// vortex_pco/array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::fmt::Debug;
6use std::hash::Hash;
7
8use pco::ChunkConfig;
9use pco::PagingSpec;
10use pco::data_types::Number;
11use pco::data_types::NumberType;
12use pco::errors::PcoError;
13use pco::match_number_enum;
14use pco::wrapped::ChunkDecompressor;
15use pco::wrapped::FileCompressor;
16use pco::wrapped::FileDecompressor;
17use prost::Message;
18use vortex_array::ArrayBufferVisitor;
19use vortex_array::ArrayChildVisitor;
20use vortex_array::ArrayEq;
21use vortex_array::ArrayHash;
22use vortex_array::ArrayRef;
23use vortex_array::ExecutionCtx;
24use vortex_array::IntoArray;
25use vortex_array::Precision;
26use vortex_array::ProstMetadata;
27use vortex_array::ToCanonical;
28use vortex_array::arrays::PrimitiveArray;
29use vortex_array::arrays::PrimitiveVTable;
30use vortex_array::buffer::BufferHandle;
31use vortex_array::compute::filter;
32use vortex_array::scalar::Scalar;
33use vortex_array::serde::ArrayChildren;
34use vortex_array::stats::ArrayStats;
35use vortex_array::stats::StatsSetRef;
36use vortex_array::validity::Validity;
37use vortex_array::vtable;
38use vortex_array::vtable::ArrayId;
39use vortex_array::vtable::BaseArrayVTable;
40use vortex_array::vtable::OperationsVTable;
41use vortex_array::vtable::VTable;
42use vortex_array::vtable::ValidityHelper;
43use vortex_array::vtable::ValiditySliceHelper;
44use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
45use vortex_array::vtable::VisitorVTable;
46use vortex_array::vtable::validity_nchildren;
47use vortex_buffer::BufferMut;
48use vortex_buffer::ByteBuffer;
49use vortex_buffer::ByteBufferMut;
50use vortex_dtype::DType;
51use vortex_dtype::PType;
52use vortex_dtype::half;
53use vortex_error::VortexError;
54use vortex_error::VortexExpect as _;
55use vortex_error::VortexResult;
56use vortex_error::vortex_bail;
57use vortex_error::vortex_ensure;
58use vortex_error::vortex_err;
59use vortex_session::VortexSession;
60
61use crate::PcoChunkInfo;
62use crate::PcoMetadata;
63use crate::PcoPageInfo;
64
65// Overall approach here:
66// Chunk the array into Pco chunks (currently using the default recommended size
67// for good compression), and into finer-grained Pco pages. As we go, write each
68// ChunkMeta as a buffer, followed by each of that chunk's pages as a buffer. We
69// store metadata for each of these "components" (chunk or page). At
70// decompression time, we figure out which components we need to read and only
71// process those. We only compress and decompress valid values.
72
73// Visually, during decompression, we have an interval of pages we're
74// decompressing and a tighter interval of the slice we actually care about.
75// |=============values (all valid elements)==============|
76// |<-n_skipped_values->|----decompressed_values------|
77//                          |----slice_values----|
78//                          ^                    ^
79// |<---slice_value_start-->|<--slice_n_values-->|
80// We then insert these values to the correct position using a primitive array
81// constructor.
82
// Target number of valid values per Pco chunk. Per the note above, this uses
// Pco's default recommended size for good compression.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

// Declares the `PcoArray`/`PcoVTable` wiring (see `vortex_array::vtable!`).
vtable!(Pco);
86
87impl VTable for PcoVTable {
88    type Array = PcoArray;
89
90    type Metadata = ProstMetadata<PcoMetadata>;
91
92    type ArrayVTable = Self;
93    type OperationsVTable = Self;
94    type ValidityVTable = ValidityVTableFromValiditySliceHelper;
95    type VisitorVTable = Self;
96
97    fn id(_array: &Self::Array) -> ArrayId {
98        Self::ID
99    }
100
101    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
102        Ok(ProstMetadata(array.metadata.clone()))
103    }
104
105    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
106        Ok(Some(metadata.0.encode_to_vec()))
107    }
108
109    fn deserialize(
110        bytes: &[u8],
111        _dtype: &DType,
112        _len: usize,
113        _buffers: &[BufferHandle],
114        _session: &VortexSession,
115    ) -> VortexResult<Self::Metadata> {
116        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
117    }
118
119    fn build(
120        dtype: &DType,
121        len: usize,
122        metadata: &Self::Metadata,
123        buffers: &[BufferHandle],
124        children: &dyn ArrayChildren,
125    ) -> VortexResult<PcoArray> {
126        let validity = if children.is_empty() {
127            Validity::from(dtype.nullability())
128        } else if children.len() == 1 {
129            let validity = children.get(0, &Validity::DTYPE, len)?;
130            Validity::Array(validity)
131        } else {
132            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
133        };
134
135        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
136        let chunk_metas = buffers[..metadata.0.chunks.len()]
137            .iter()
138            .map(|b| b.clone().try_to_host_sync())
139            .collect::<VortexResult<Vec<_>>>()?;
140        let pages = buffers[metadata.0.chunks.len()..]
141            .iter()
142            .map(|b| b.clone().try_to_host_sync())
143            .collect::<VortexResult<Vec<_>>>()?;
144
145        let expected_n_pages = metadata
146            .0
147            .chunks
148            .iter()
149            .map(|info| info.pages.len())
150            .sum::<usize>();
151        vortex_ensure!(pages.len() == expected_n_pages);
152
153        Ok(PcoArray::new(
154            chunk_metas,
155            pages,
156            dtype.clone(),
157            metadata.0.clone(),
158            len,
159            validity,
160        ))
161    }
162
163    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
164        vortex_ensure!(
165            children.len() <= 1,
166            "PcoArray expects 0 or 1 children, got {}",
167            children.len()
168        );
169
170        if children.is_empty() {
171            array.unsliced_validity = Validity::from(array.dtype.nullability());
172        } else {
173            array.unsliced_validity =
174                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
175        }
176
177        Ok(())
178    }
179
180    fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
181        Ok(array.decompress()?.into_array())
182    }
183
184    fn reduce_parent(
185        array: &Self::Array,
186        parent: &ArrayRef,
187        child_idx: usize,
188    ) -> VortexResult<Option<ArrayRef>> {
189        crate::rules::RULES.evaluate(array, parent, child_idx)
190    }
191}
192
193pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
194    let ptype = dtype.as_ptype();
195    match ptype {
196        PType::F16 => NumberType::F16,
197        PType::F32 => NumberType::F32,
198        PType::F64 => NumberType::F64,
199        PType::I16 => NumberType::I16,
200        PType::I32 => NumberType::I32,
201        PType::I64 => NumberType::I64,
202        PType::U16 => NumberType::U16,
203        PType::U32 => NumberType::U32,
204        PType::U64 => NumberType::U64,
205        _ => unreachable!("PType not supported by Pco: {:?}", ptype),
206    }
207}
208
209fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
210    let mask = parray.validity_mask()?;
211    Ok(filter(&parray.to_array(), &mask)?.to_primitive())
212}
213
214pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
215    use pco::errors::ErrorKind::*;
216    match err.kind {
217        Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
218        InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
219        other => vortex_err!("Pco {:?} error: {}", other, err.message),
220    }
221}
222
/// Marker type carrying the vtable implementation for the Pco encoding.
#[derive(Debug)]
pub struct PcoVTable;

impl PcoVTable {
    /// Stable identifier for the Pco encoding.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}
229
/// A primitive array compressed with Pco, stored as per-chunk metadata
/// buffers plus per-page compressed buffers (see the module comment above).
#[derive(Clone, Debug)]
pub struct PcoArray {
    // One serialized Pco chunk-meta buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    // Compressed page buffers for all chunks, flattened in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    // Pco file header plus the chunk/page layout (value counts per page).
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    // Validity of the FULL (unsliced) array; only valid values are compressed.
    pub(crate) unsliced_validity: Validity,
    // Row count of the full (unsliced) array.
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    // Slicing is lazy: these bounds (into the unsliced rows) define the
    // logical view; decompression only materializes this range.
    slice_start: usize,
    slice_stop: usize,
}
242
impl PcoArray {
    /// Assemble a `PcoArray` from already-compressed components.
    ///
    /// `chunk_metas` holds one serialized Pco chunk-meta buffer per chunk and
    /// `pages` holds the compressed page buffers for all chunks, flattened in
    /// chunk order. `len` and `validity` describe the full (unsliced) array.
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            // A freshly built array is an identity slice over all rows.
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compress `parray` with Pco at compression `level`, using the default
    /// chunk size. A `values_per_page` of 0 means one page per chunk.
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    /// Compress `parray`, splitting the valid values into chunks of
    /// `values_per_chunk` and pages of (up to) `values_per_page` values.
    ///
    /// Writes one chunk-meta buffer per chunk and one buffer per page, and
    /// records each page's value count in the Vortex metadata so decompression
    /// can locate the pages it needs without decoding them.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        // A page size of 0 falls back to one page per chunk.
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        // perhaps one day we can make this more configurable
        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        // Only valid values are compressed; nulls are re-inserted at
        // decompression time from the validity mask.
        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![]; // the Pco component
        let mut chunk_infos = vec![]; // the Vortex metadata
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            // Dispatch on the runtime number type to get a typed slice of
            // this chunk's values for the compressor.
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            // One buffer for this chunk's meta ...
            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            // ... then one buffer per page, recording each page's value count.
            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    /// Compress any primitive-encoded array; errors for other encodings.
    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

    /// Decompress the currently sliced rows back into a `PrimitiveArray`,
    /// re-inserting nulls from the (sliced) validity mask.
    pub fn decompress(&self) -> VortexResult<PrimitiveArray> {
        // To start, we figure out which chunks and pages we need to decompress, and with
        // what value offset into the first such page.
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
              self.decompress_values_typed::<T>()
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompress exactly the valid values covered by the current slice.
    ///
    /// Pages are decoded whole, so the decoded values may extend past the
    /// slice on both sides (see the module-level diagram); the result is
    /// trimmed at the end.
    #[allow(clippy::unwrap_in_result, clippy::unwrap_used)]
    fn decompress_values_typed<T: Number>(&self) -> ByteBuffer {
        // To start, we figure out what range of values we need to decompress.
        // These are indices into the dense stream of VALID values, obtained by
        // counting valid rows before each slice bound.
        let slice_value_indices = self
            .unsliced_validity
            .to_mask(self.unsliced_n_rows)
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        // Then we decompress those pages into a buffer. Note that these values
        // may exceed the bounds of the slice, so we need to slice later.
        let (fd, _) = FileDecompressor::new(self.metadata.header.as_slice())
            .map_err(vortex_err_from_pco)
            .vortex_expect("FileDecompressor::new should succeed with valid header");
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0; // index into the flattened `self.pages`
        let mut page_value_start = 0; // value offset of the current page
        let mut n_skipped_values = 0; // values in pages skipped before the slice
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // Created lazily so chunks that don't overlap the slice never pay
            // for chunk-meta decoding.
            let mut cd: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                // Past the end of the slice: nothing further is needed. Note
                // that `page_value_start` is not advanced by this break, so
                // every page of every LATER chunk also hits this break
                // immediately — the stale `page_idx` is never used to index
                // `self.pages` again.
                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // we need this page
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // NOTE(review): exposes uninitialized tail to `pd.read`
                    // below; relies on `read` filling the whole extended
                    // region — confirm against the pco contract.
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let chunk_meta_bytes: &[u8] = chunk_meta.as_ref();
                    let page: &[u8] = self.pages[page_idx].as_ref();
                    if cd.is_none() {
                        let (new_cd, _) = fd
                            .chunk_decompressor(chunk_meta_bytes)
                            .map_err(vortex_err_from_pco)
                            .vortex_expect(
                                "chunk_decompressor should succeed with valid chunk metadata",
                            );
                        cd = Some(new_cd);
                    }
                    let mut pd = cd
                        .as_mut()
                        .unwrap()
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)
                        .vortex_expect("page_decompressor should succeed with valid page data");
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)
                        .vortex_expect("decompress should succeed with valid compressed data");
                } else {
                    // Entirely before the slice: count it so we can locate the
                    // slice within the decompressed buffer afterwards.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Slice only the values requested.
        let value_offset = slice_value_start - n_skipped_values;
        decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer()
    }

    /// Lazy slice: compose the new bounds with the existing slice offsets and
    /// reset the cached stats; no data is copied or decompressed.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    /// Start of the logical view, in unsliced-row coordinates.
    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    /// End (exclusive) of the logical view, in unsliced-row coordinates.
    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    /// Row count of the full array before any slicing.
    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
474
impl ValiditySliceHelper for PcoArray {
    /// Expose the full-length validity together with the current slice bounds
    /// so `ValidityVTableFromValiditySliceHelper` can derive this array's
    /// validity view.
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}
480
481impl BaseArrayVTable<PcoVTable> for PcoVTable {
482    fn len(array: &PcoArray) -> usize {
483        array.slice_stop - array.slice_start
484    }
485
486    fn dtype(array: &PcoArray) -> &DType {
487        &array.dtype
488    }
489
490    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
491        array.stats_set.to_ref(array.as_ref())
492    }
493
494    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
495        array.dtype.hash(state);
496        array.unsliced_validity.array_hash(state, precision);
497        array.unsliced_n_rows.hash(state);
498        array.slice_start.hash(state);
499        array.slice_stop.hash(state);
500        // Hash chunk_metas and pages using pointer-based hashing
501        for chunk_meta in &array.chunk_metas {
502            chunk_meta.array_hash(state, precision);
503        }
504        for page in &array.pages {
505            page.array_hash(state, precision);
506        }
507    }
508
509    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
510        if array.dtype != other.dtype
511            || !array
512                .unsliced_validity
513                .array_eq(&other.unsliced_validity, precision)
514            || array.unsliced_n_rows != other.unsliced_n_rows
515            || array.slice_start != other.slice_start
516            || array.slice_stop != other.slice_stop
517            || array.chunk_metas.len() != other.chunk_metas.len()
518            || array.pages.len() != other.pages.len()
519        {
520            return false;
521        }
522        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
523            if !a.array_eq(b, precision) {
524                return false;
525            }
526        }
527        for (a, b) in array.pages.iter().zip(&other.pages) {
528            if !a.array_eq(b, precision) {
529                return false;
530            }
531        }
532        true
533    }
534}
535
536impl OperationsVTable<PcoVTable> for PcoVTable {
537    fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
538        array._slice(index, index + 1).decompress()?.scalar_at(0)
539    }
540}
541
542impl VisitorVTable<PcoVTable> for PcoVTable {
543    fn visit_buffers(array: &PcoArray, visitor: &mut dyn ArrayBufferVisitor) {
544        for (i, buffer) in array.chunk_metas.iter().enumerate() {
545            visitor.visit_buffer_handle(
546                &format!("chunk_meta_{i}"),
547                &BufferHandle::new_host(buffer.clone()),
548            );
549        }
550        for (i, buffer) in array.pages.iter().enumerate() {
551            visitor.visit_buffer_handle(
552                &format!("page_{i}"),
553                &BufferHandle::new_host(buffer.clone()),
554            );
555        }
556    }
557
558    fn nbuffers(array: &PcoArray) -> usize {
559        array.chunk_metas.len() + array.pages.len()
560    }
561
562    fn visit_children(array: &PcoArray, visitor: &mut dyn ArrayChildVisitor) {
563        visitor.visit_validity(&array.unsliced_validity, array.unsliced_n_rows());
564    }
565
566    fn nchildren(array: &PcoArray) -> usize {
567        validity_nchildren(&array.unsliced_validity)
568    }
569}
570
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;

    use crate::PcoArray;

    // Round-trips a nullable array through Pco and checks that slicing the
    // compressed array decompresses exactly the requested rows.
    #[test]
    fn test_slice_nullable() {
        // Create a nullable array with some nulls
        let values = PrimitiveArray::new(
            Buffer::copy_from(vec![10u32, 20, 30, 40, 50, 60]),
            Validity::from_iter([false, true, true, true, true, false]),
        );
        // Compression level 0, up to 128 values per page — the six values fit
        // in a single chunk/page.
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice to get only the non-null values in the middle
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}