vortex_pco/array.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp;
use std::fmt::Debug;
use std::hash::Hash;

use pco::ChunkConfig;
use pco::PagingSpec;
use pco::data_types::Number;
use pco::data_types::NumberType;
use pco::errors::PcoError;
use pco::match_number_enum;
use pco::wrapped::ChunkDecompressor;
use pco::wrapped::FileCompressor;
use pco::wrapped::FileDecompressor;
use prost::Message;
use vortex_array::ArrayEq;
use vortex_array::ArrayHash;
use vortex_array::ArrayRef;
use vortex_array::DynArray;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::ToCanonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
use vortex_array::dtype::half;
use vortex_array::scalar::Scalar;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::OperationsVTable;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityHelper;
use vortex_array::vtable::ValiditySliceHelper;
use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
use vortex_array::vtable::validity_nchildren;
use vortex_array::vtable::validity_to_child;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexError;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

use crate::PcoChunkInfo;
use crate::PcoMetadata;
use crate::PcoPageInfo;

// Overall approach here:
// Chunk the array into Pco chunks (currently using the default recommended size
// for good compression), and into finer-grained Pco pages. As we go, write each
// ChunkMeta as a buffer, followed by each of that chunk's pages as a buffer. We
// store metadata for each of these "components" (chunk or page). At
// decompression time, we figure out which components we need to read and only
// process those. We only compress and decompress valid values.

// Visually, during decompression, we have an interval of pages we're
// decompressing and a tighter interval of the slice we actually care about.
// |=============values (all valid elements)==============|
// |<-n_skipped_values->|----decompressed_values------|
//                          |----slice_values----|
//                          ^                    ^
// |<---slice_value_start-->|<--slice_n_values-->|
// We then insert these values to the correct position using a primitive array
// constructor.
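//
// A concrete walk-through of the bookkeeping above (illustrative numbers, not
// tied to real defaults): with 4-value pages and a slice whose valid values span
// positions 5..9, the first page is skipped (n_skipped_values = 4), the pages
// covering values 4..12 are decompressed, and the final slice keeps
// decompressed_values[1..5] because value_offset = slice_value_start - n_skipped_values = 1.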

const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

vtable!(Pco);

impl VTable for PcoVTable {
    type Array = PcoArray;

    type Metadata = ProstMetadata<PcoMetadata>;
    type OperationsVTable = Self;
    type ValidityVTable = ValidityVTableFromValiditySliceHelper;

    fn id(_array: &Self::Array) -> ArrayId {
        Self::ID
    }

    fn len(array: &PcoArray) -> usize {
        array.slice_stop - array.slice_start
    }

    fn dtype(array: &PcoArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
        array.dtype.hash(state);
        array.unsliced_validity.array_hash(state, precision);
        array.unsliced_n_rows.hash(state);
        array.slice_start.hash(state);
        array.slice_stop.hash(state);
        // Hash chunk_metas and pages using pointer-based hashing
        for chunk_meta in &array.chunk_metas {
            chunk_meta.array_hash(state, precision);
        }
        for page in &array.pages {
            page.array_hash(state, precision);
        }
    }

    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
        if array.dtype != other.dtype
            || !array
                .unsliced_validity
                .array_eq(&other.unsliced_validity, precision)
            || array.unsliced_n_rows != other.unsliced_n_rows
            || array.slice_start != other.slice_start
            || array.slice_stop != other.slice_stop
            || array.chunk_metas.len() != other.chunk_metas.len()
            || array.pages.len() != other.pages.len()
        {
            return false;
        }
        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        for (a, b) in array.pages.iter().zip(&other.pages) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        true
    }

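    // Buffers are exposed as all chunk metas first, followed by all page buffers in
    // chunk order (the same layout `build` expects when reconstructing the array).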
    fn nbuffers(array: &PcoArray) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: &PcoArray, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: &PcoArray, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    fn nchildren(array: &PcoArray) -> usize {
        validity_nchildren(&array.unsliced_validity)
    }

    fn child(array: &PcoArray, idx: usize) -> ArrayRef {
        validity_to_child(&array.unsliced_validity, array.unsliced_n_rows)
            .unwrap_or_else(|| vortex_panic!("PcoArray child index {idx} out of bounds"))
    }

    fn child_name(_array: &PcoArray, idx: usize) -> String {
        match idx {
            0 => "validity".to_string(),
            _ => vortex_panic!("PcoArray child_name index {idx} out of bounds"),
        }
    }

    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(array.metadata.clone()))
    }

    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.0.encode_to_vec()))
    }

    fn deserialize(
        bytes: &[u8],
        _dtype: &DType,
        _len: usize,
        _buffers: &[BufferHandle],
        _session: &VortexSession,
    ) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
    }

    fn build(
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<PcoArray> {
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
        let chunk_metas = buffers[..metadata.0.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.0.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        let expected_n_pages = metadata
            .0
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        Ok(PcoArray::new(
            chunk_metas,
            pages,
            dtype.clone(),
            metadata.0.clone(),
            len,
            validity,
        ))
    }

    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        vortex_ensure!(
            children.len() <= 1,
            "PcoArray expects 0 or 1 children, got {}",
            children.len()
        );

        if children.is_empty() {
            array.unsliced_validity = Validity::from(array.dtype.nullability());
        } else {
            array.unsliced_validity =
                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
        }

        Ok(())
    }

    fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
        Ok(ExecutionStep::Done(array.decompress()?.into_array()))
    }

    fn reduce_parent(
        array: &Self::Array,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}

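/// Maps a primitive Vortex [`DType`] to the corresponding Pco [`NumberType`].
/// Pco only defines 16/32/64-bit number types, so 8-bit and non-numeric ptypes are
/// unreachable here; callers must only pass dtypes that Pco can encode.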
pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
    let ptype = dtype.as_ptype();
    match ptype {
        PType::F16 => NumberType::F16,
        PType::F32 => NumberType::F32,
        PType::F64 => NumberType::F64,
        PType::I16 => NumberType::I16,
        PType::I32 => NumberType::I32,
        PType::I64 => NumberType::I64,
        PType::U16 => NumberType::U16,
        PType::U32 => NumberType::U32,
        PType::U64 => NumberType::U64,
        _ => unreachable!("PType not supported by Pco: {:?}", ptype),
    }
}

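/// Drops null positions so only valid values are handed to the Pco compressor
/// (see the overview comment at the top of this file: only valid values are
/// compressed and decompressed).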
fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
    let mask = parray.validity_mask()?;
    Ok(parray.clone().into_array().filter(mask)?.to_primitive())
}

pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
    use pco::errors::ErrorKind::*;
    match err.kind {
        Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
        InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
        other => vortex_err!("Pco {:?} error: {}", other, err.message),
    }
}

#[derive(Debug)]
pub struct PcoVTable;

impl PcoVTable {
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}

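/// A primitive array compressed with Pco (pcodec).
///
/// The compressed bytes live in `chunk_metas` and `pages`, while `metadata` records
/// how many values each page holds. Slicing is lazy: only `slice_start`/`slice_stop`
/// change, and the validity and row count stay unsliced.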
#[derive(Clone, Debug)]
pub struct PcoArray {
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    pub(crate) pages: Vec<ByteBuffer>,
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    pub(crate) unsliced_validity: Validity,
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    slice_start: usize,
    slice_stop: usize,
}

impl PcoArray {
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: len,
        }
    }

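    /// Compresses a primitive array at the given Pco compression `level`, using the
    /// default chunk size (`VALUES_PER_CHUNK`) and `values_per_page` values per page
    /// (0 falls back to one page per chunk).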
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        // perhaps one day we can make this more configurable
        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![]; // the Pco component
        let mut chunk_infos = vec![]; // the Vortex metadata
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

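    /// Decompresses only the sliced region of this array into a [`PrimitiveArray`].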
    pub fn decompress(&self) -> VortexResult<PrimitiveArray> {
        // To start, we figure out which chunks and pages we need to decompress, and with
        // what value offset into the first such page.
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>()?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    fn decompress_values_typed<T: Number>(&self) -> VortexResult<ByteBuffer> {
        // To start, we figure out what range of values we need to decompress.
        let slice_value_indices = self
            .unsliced_validity
            .to_mask(self.unsliced_n_rows)
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        // Then we decompress those pages into a buffer. Note that these values
        // may exceed the bounds of the slice, so we need to slice later.
        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        let mut page_value_start = 0;
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // lazily initialize chunk decompressor
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // we need this page
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
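                    // The freshly extended region `old_len..new_len` is written in
                    // full by `pd.read` below before the buffer is frozen or read.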
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Slice only the values requested.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

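    /// Lazily slices the array: only the slice offsets change here; the compressed
    /// buffers are untouched and narrowing happens at decompression time.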
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}

impl ValiditySliceHelper for PcoArray {
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}

impl OperationsVTable<PcoVTable> for PcoVTable {
    fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
        array._slice(index, index + 1).decompress()?.scalar_at(0)
    }
}

#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::PcoArray;

    #[test]
    fn test_slice_nullable() {
        // Create a nullable array with some nulls
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice to get only the non-null values in the middle
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
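
    // A minimal sketch of slicing across page and chunk boundaries, assuming tiny
    // chunk/page sizes (4 values per chunk, 2 per page) so the array spans several
    // Pco components; the values and sizes here are illustrative only.
    #[test]
    fn test_slice_across_pages() {
        let values = PrimitiveArray::new(
            buffer![1u64, 2, 3, 4, 5, 6, 7, 8],
            Validity::NonNullable,
        );
        let pco = PcoArray::from_primitive_with_values_per_chunk(&values, 0, 4, 2).unwrap();

        // The slice starts and ends mid-page and crosses the chunk boundary.
        let sliced = pco.slice(3..6).unwrap();
        let expected =
            PrimitiveArray::new(buffer![4u64, 5, 6], Validity::NonNullable).into_array();
        assert_arrays_eq!(sliced, expected);
    }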
}