
vortex_pco/array.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use pco::ChunkConfig;
use pco::PagingSpec;
use pco::data_types::Number;
use pco::data_types::NumberType;
use pco::errors::PcoError;
use pco::match_number_enum;
use pco::wrapped::ChunkDecompressor;
use pco::wrapped::FileCompressor;
use pco::wrapped::FileDecompressor;
use prost::Message;
use vortex_array::ArrayEq;
use vortex_array::ArrayHash;
use vortex_array::ArrayRef;
use vortex_array::DynArray;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
use vortex_array::dtype::half;
use vortex_array::scalar::Scalar;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::OperationsVTable;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityHelper;
use vortex_array::vtable::ValiditySliceHelper;
use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
use vortex_array::vtable::validity_nchildren;
use vortex_array::vtable::validity_to_child;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexError;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

use crate::PcoChunkInfo;
use crate::PcoMetadata;
use crate::PcoPageInfo;

// Overall approach here:
// Chunk the array into Pco chunks (currently using the default recommended size
// for good compression), and into finer-grained Pco pages. As we go, write each
// ChunkMeta as a buffer, followed by each of that chunk's pages as a buffer. We
// store metadata for each of these "components" (chunk or page). At
// decompression time, we figure out which components we need to read and only
// process those. We only compress and decompress valid values.
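// For example (illustrative sizes only): 100_000 valid values with
// values_per_chunk = 40_000 and values_per_page = 10_000 yield three chunks and
// ten pages, exposed as the buffers chunk_meta_0..chunk_meta_2 followed by
// page_0..page_9, with each page's value count recorded in PcoMetadata.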

// Visually, during decompression, we have an interval of pages we're
// decompressing and a tighter interval of the slice we actually care about.
// |=============values (all valid elements)==============|
// |<-n_skipped_values->|----decompressed_values------|
//                          |----slice_values----|
//                          ^                    ^
// |<---slice_value_start-->|<--slice_n_values-->|
// We then insert these values at the correct position using a primitive array
// constructor.
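// Worked example (made-up numbers): if the slice covers valid values
// [1_000, 1_500) and the first decompressed page starts at valid value 800,
// then n_skipped_values = 800, slice_value_start = 1_000, slice_n_values = 500,
// and we keep decompressed_values[200..700], since
// value_offset = slice_value_start - n_skipped_values = 200.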

const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

vtable!(Pco);

impl VTable for Pco {
    type Array = PcoArray;

    type Metadata = ProstMetadata<PcoMetadata>;
    type OperationsVTable = Self;
    type ValidityVTable = ValidityVTableFromValiditySliceHelper;

    fn vtable(_array: &Self::Array) -> &Self {
        &Pco
    }

    fn id(&self) -> ArrayId {
        Self::ID
    }

    fn len(array: &PcoArray) -> usize {
        array.slice_stop - array.slice_start
    }

    fn dtype(array: &PcoArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
        array.dtype.hash(state);
        array.unsliced_validity.array_hash(state, precision);
        array.unsliced_n_rows.hash(state);
        array.slice_start.hash(state);
        array.slice_stop.hash(state);
        // Hash chunk_metas and pages using pointer-based hashing
        for chunk_meta in &array.chunk_metas {
            chunk_meta.array_hash(state, precision);
        }
        for page in &array.pages {
            page.array_hash(state, precision);
        }
    }

    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
        if array.dtype != other.dtype
            || !array
                .unsliced_validity
                .array_eq(&other.unsliced_validity, precision)
            || array.unsliced_n_rows != other.unsliced_n_rows
            || array.slice_start != other.slice_start
            || array.slice_stop != other.slice_stop
            || array.chunk_metas.len() != other.chunk_metas.len()
            || array.pages.len() != other.pages.len()
        {
            return false;
        }
        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        for (a, b) in array.pages.iter().zip(&other.pages) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        true
    }

    fn nbuffers(array: &PcoArray) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: &PcoArray, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: &PcoArray, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    fn nchildren(array: &PcoArray) -> usize {
        validity_nchildren(&array.unsliced_validity)
    }

    fn child(array: &PcoArray, idx: usize) -> ArrayRef {
        validity_to_child(&array.unsliced_validity, array.unsliced_n_rows)
            .unwrap_or_else(|| vortex_panic!("PcoArray child index {idx} out of bounds"))
    }

    fn child_name(_array: &PcoArray, idx: usize) -> String {
        match idx {
            0 => "validity".to_string(),
            _ => vortex_panic!("PcoArray child_name index {idx} out of bounds"),
        }
    }

    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(array.metadata.clone()))
    }

    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.0.encode_to_vec()))
    }

    fn deserialize(
        bytes: &[u8],
        _dtype: &DType,
        _len: usize,
        _buffers: &[BufferHandle],
        _session: &VortexSession,
    ) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
    }

    fn build(
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<PcoArray> {
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
        let chunk_metas = buffers[..metadata.0.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.0.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        let expected_n_pages = metadata
            .0
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        Ok(PcoArray::new(
            chunk_metas,
            pages,
            dtype.clone(),
            metadata.0.clone(),
            len,
            validity,
        ))
    }

    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        vortex_ensure!(
            children.len() <= 1,
            "PcoArray expects 0 or 1 children, got {}",
            children.len()
        );

        if children.is_empty() {
            array.unsliced_validity = Validity::from(array.dtype.nullability());
        } else {
            array.unsliced_validity =
                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
        }

        Ok(())
    }

    fn execute(array: Arc<Self::Array>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        Ok(ExecutionResult::done(array.decompress(ctx)?.into_array()))
    }

    fn reduce_parent(
        array: &Self::Array,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}

pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
    let ptype = dtype.as_ptype();
    match ptype {
        PType::F16 => NumberType::F16,
        PType::F32 => NumberType::F32,
        PType::F64 => NumberType::F64,
        PType::I16 => NumberType::I16,
        PType::I32 => NumberType::I32,
        PType::I64 => NumberType::I64,
        PType::U16 => NumberType::U16,
        PType::U32 => NumberType::U32,
        PType::U64 => NumberType::U64,
        _ => unreachable!("PType not supported by Pco: {:?}", ptype),
    }
}

fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
    let mask = parray.validity_mask()?;
    Ok(parray.clone().into_array().filter(mask)?.to_primitive())
}

pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
    use pco::errors::ErrorKind::*;
    match err.kind {
        Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
        InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
        other => vortex_err!("Pco {:?} error: {}", other, err.message),
    }
}

#[derive(Clone, Debug)]
pub struct Pco;

impl Pco {
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}

#[derive(Clone, Debug)]
pub struct PcoArray {
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    pub(crate) pages: Vec<ByteBuffer>,
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    pub(crate) unsliced_validity: Validity,
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    slice_start: usize,
    slice_stop: usize,
}

impl PcoArray {
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: len,
        }
    }
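
    /// Compresses `parray` into a `PcoArray` using the default chunk size
    /// (`VALUES_PER_CHUNK`). `level` is forwarded to Pco's compression level and
    /// `values_per_page` bounds how many values land in each Pco page; passing
    /// `0` uses one page per chunk.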
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        // perhaps one day we can make this more configurable
        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![]; // the Pco component
        let mut chunk_infos = vec![]; // the Vortex metadata
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<Primitive>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

    pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
        // To start, we figure out which chunks and pages we need to decompress, and with
        // what value offset into the first such page.
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
              self.decompress_values_typed::<T>(ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    fn decompress_values_typed<T: Number>(
        &self,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // To start, we figure out what range of values we need to decompress.
        let slice_value_indices = self
            .unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        // Then we decompress those pages into a buffer. Note that these values
        // may exceed the bounds of the slice, so we need to slice later.
        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        let mut page_value_start = 0;
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // lazily initialize chunk decompressor
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // we need this page
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Slice only the values requested.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}

impl ValiditySliceHelper for PcoArray {
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}

impl OperationsVTable<Pco> for Pco {
    fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        array
            ._slice(index, index + 1)
            .decompress(&mut ctx)?
            .scalar_at(0)
    }
}

#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::PcoArray;

    #[test]
    fn test_slice_nullable() {
        // Create a nullable array with some nulls
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice to get only the non-null values in the middle
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
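
    // A further check (sketch, using only the APIs exercised above): force a
    // small values_per_page so the slice below spans more than one Pco page,
    // then verify the round-trip against a plain primitive array.
    #[test]
    fn test_slice_across_pages() {
        let values = PrimitiveArray::new(
            buffer![0u64, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            Validity::NonNullable,
        );
        // values_per_page = 4 splits the ten values across several pages.
        let pco = PcoArray::from_primitive(&values, 0, 4).unwrap();

        let sliced = pco.slice(3..9).unwrap();
        let expected =
            PrimitiveArray::new(buffer![3u64, 4, 5, 6, 7, 8], Validity::NonNullable).into_array();
        assert_arrays_eq!(sliced, expected);
    }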
}