// vortex_pco/array.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::fmt::Debug;
6use std::hash::Hash;
7
8use pco::ChunkConfig;
9use pco::PagingSpec;
10use pco::data_types::Number;
11use pco::data_types::NumberType;
12use pco::errors::PcoError;
13use pco::match_number_enum;
14use pco::wrapped::ChunkDecompressor;
15use pco::wrapped::FileCompressor;
16use pco::wrapped::FileDecompressor;
17use prost::Message;
18use vortex_array::ArrayEq;
19use vortex_array::ArrayHash;
20use vortex_array::ArrayRef;
21use vortex_array::DynArray;
22use vortex_array::ExecutionCtx;
23use vortex_array::ExecutionStep;
24use vortex_array::IntoArray;
25use vortex_array::LEGACY_SESSION;
26use vortex_array::Precision;
27use vortex_array::ProstMetadata;
28use vortex_array::ToCanonical;
29use vortex_array::VortexSessionExecute;
30use vortex_array::arrays::Primitive;
31use vortex_array::arrays::PrimitiveArray;
32use vortex_array::buffer::BufferHandle;
33use vortex_array::dtype::DType;
34use vortex_array::dtype::PType;
35use vortex_array::dtype::half;
36use vortex_array::scalar::Scalar;
37use vortex_array::serde::ArrayChildren;
38use vortex_array::stats::ArrayStats;
39use vortex_array::stats::StatsSetRef;
40use vortex_array::validity::Validity;
41use vortex_array::vtable;
42use vortex_array::vtable::ArrayId;
43use vortex_array::vtable::OperationsVTable;
44use vortex_array::vtable::VTable;
45use vortex_array::vtable::ValidityHelper;
46use vortex_array::vtable::ValiditySliceHelper;
47use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
48use vortex_array::vtable::validity_nchildren;
49use vortex_array::vtable::validity_to_child;
50use vortex_buffer::BufferMut;
51use vortex_buffer::ByteBuffer;
52use vortex_buffer::ByteBufferMut;
53use vortex_error::VortexError;
54use vortex_error::VortexExpect as _;
55use vortex_error::VortexResult;
56use vortex_error::vortex_bail;
57use vortex_error::vortex_ensure;
58use vortex_error::vortex_err;
59use vortex_error::vortex_panic;
60use vortex_session::VortexSession;
61
62use crate::PcoChunkInfo;
63use crate::PcoMetadata;
64use crate::PcoPageInfo;
65
// Overall approach here:
// Chunk the array into Pco chunks (currently using the default recommended size
// for good compression), and into finer-grained Pco pages. As we go, write each
// ChunkMeta as a buffer, followed by each of that chunk's pages as a buffer. We
// store metadata for each of these "components" (chunk or page). At
// decompression time, we figure out which components we need to read and only
// process those. We only compress and decompress valid values.

// Visually, during decompression, we have an interval of pages we're
// decompressing and a tighter interval of the slice we actually care about.
// |=============values (all valid elements)==============|
// |<-n_skipped_values->|----decompressed_values------|
//                          |----slice_values----|
//                          ^                    ^
// |<---slice_value_start-->|<--slice_n_values-->|
// We then insert these values to the correct position using a primitive array
// constructor.
// Number of values per Pco chunk. Uses Pco's default recommended size (see the
// overall-approach note above); pages subdivide chunks at a finer granularity.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

// Generates the vtable plumbing for the `Pco` encoding.
vtable!(Pco);
87
88impl VTable for Pco {
89    type Array = PcoArray;
90
91    type Metadata = ProstMetadata<PcoMetadata>;
92    type OperationsVTable = Self;
93    type ValidityVTable = ValidityVTableFromValiditySliceHelper;
94
95    fn id(_array: &Self::Array) -> ArrayId {
96        Self::ID
97    }
98
99    fn len(array: &PcoArray) -> usize {
100        array.slice_stop - array.slice_start
101    }
102
103    fn dtype(array: &PcoArray) -> &DType {
104        &array.dtype
105    }
106
107    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
108        array.stats_set.to_ref(array.as_ref())
109    }
110
111    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
112        array.dtype.hash(state);
113        array.unsliced_validity.array_hash(state, precision);
114        array.unsliced_n_rows.hash(state);
115        array.slice_start.hash(state);
116        array.slice_stop.hash(state);
117        // Hash chunk_metas and pages using pointer-based hashing
118        for chunk_meta in &array.chunk_metas {
119            chunk_meta.array_hash(state, precision);
120        }
121        for page in &array.pages {
122            page.array_hash(state, precision);
123        }
124    }
125
126    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
127        if array.dtype != other.dtype
128            || !array
129                .unsliced_validity
130                .array_eq(&other.unsliced_validity, precision)
131            || array.unsliced_n_rows != other.unsliced_n_rows
132            || array.slice_start != other.slice_start
133            || array.slice_stop != other.slice_stop
134            || array.chunk_metas.len() != other.chunk_metas.len()
135            || array.pages.len() != other.pages.len()
136        {
137            return false;
138        }
139        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
140            if !a.array_eq(b, precision) {
141                return false;
142            }
143        }
144        for (a, b) in array.pages.iter().zip(&other.pages) {
145            if !a.array_eq(b, precision) {
146                return false;
147            }
148        }
149        true
150    }
151
152    fn nbuffers(array: &PcoArray) -> usize {
153        array.chunk_metas.len() + array.pages.len()
154    }
155
156    fn buffer(array: &PcoArray, idx: usize) -> BufferHandle {
157        if idx < array.chunk_metas.len() {
158            BufferHandle::new_host(array.chunk_metas[idx].clone())
159        } else {
160            let page_idx = idx - array.chunk_metas.len();
161            BufferHandle::new_host(array.pages[page_idx].clone())
162        }
163    }
164
165    fn buffer_name(array: &PcoArray, idx: usize) -> Option<String> {
166        if idx < array.chunk_metas.len() {
167            Some(format!("chunk_meta_{idx}"))
168        } else {
169            Some(format!("page_{}", idx - array.chunk_metas.len()))
170        }
171    }
172
173    fn nchildren(array: &PcoArray) -> usize {
174        validity_nchildren(&array.unsliced_validity)
175    }
176
177    fn child(array: &PcoArray, idx: usize) -> ArrayRef {
178        validity_to_child(&array.unsliced_validity, array.unsliced_n_rows)
179            .unwrap_or_else(|| vortex_panic!("PcoArray child index {idx} out of bounds"))
180    }
181
182    fn child_name(_array: &PcoArray, idx: usize) -> String {
183        match idx {
184            0 => "validity".to_string(),
185            _ => vortex_panic!("PcoArray child_name index {idx} out of bounds"),
186        }
187    }
188
189    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
190        Ok(ProstMetadata(array.metadata.clone()))
191    }
192
193    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
194        Ok(Some(metadata.0.encode_to_vec()))
195    }
196
197    fn deserialize(
198        bytes: &[u8],
199        _dtype: &DType,
200        _len: usize,
201        _buffers: &[BufferHandle],
202        _session: &VortexSession,
203    ) -> VortexResult<Self::Metadata> {
204        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
205    }
206
207    fn build(
208        dtype: &DType,
209        len: usize,
210        metadata: &Self::Metadata,
211        buffers: &[BufferHandle],
212        children: &dyn ArrayChildren,
213    ) -> VortexResult<PcoArray> {
214        let validity = if children.is_empty() {
215            Validity::from(dtype.nullability())
216        } else if children.len() == 1 {
217            let validity = children.get(0, &Validity::DTYPE, len)?;
218            Validity::Array(validity)
219        } else {
220            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
221        };
222
223        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
224        let chunk_metas = buffers[..metadata.0.chunks.len()]
225            .iter()
226            .map(|b| b.clone().try_to_host_sync())
227            .collect::<VortexResult<Vec<_>>>()?;
228        let pages = buffers[metadata.0.chunks.len()..]
229            .iter()
230            .map(|b| b.clone().try_to_host_sync())
231            .collect::<VortexResult<Vec<_>>>()?;
232
233        let expected_n_pages = metadata
234            .0
235            .chunks
236            .iter()
237            .map(|info| info.pages.len())
238            .sum::<usize>();
239        vortex_ensure!(pages.len() == expected_n_pages);
240
241        Ok(PcoArray::new(
242            chunk_metas,
243            pages,
244            dtype.clone(),
245            metadata.0.clone(),
246            len,
247            validity,
248        ))
249    }
250
251    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
252        vortex_ensure!(
253            children.len() <= 1,
254            "PcoArray expects 0 or 1 children, got {}",
255            children.len()
256        );
257
258        if children.is_empty() {
259            array.unsliced_validity = Validity::from(array.dtype.nullability());
260        } else {
261            array.unsliced_validity =
262                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
263        }
264
265        Ok(())
266    }
267
268    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
269        Ok(ExecutionStep::Done(array.decompress(ctx)?.into_array()))
270    }
271
272    fn reduce_parent(
273        array: &Self::Array,
274        parent: &ArrayRef,
275        child_idx: usize,
276    ) -> VortexResult<Option<ArrayRef>> {
277        crate::rules::RULES.evaluate(array, parent, child_idx)
278    }
279}
280
281pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
282    let ptype = dtype.as_ptype();
283    match ptype {
284        PType::F16 => NumberType::F16,
285        PType::F32 => NumberType::F32,
286        PType::F64 => NumberType::F64,
287        PType::I16 => NumberType::I16,
288        PType::I32 => NumberType::I32,
289        PType::I64 => NumberType::I64,
290        PType::U16 => NumberType::U16,
291        PType::U32 => NumberType::U32,
292        PType::U64 => NumberType::U64,
293        _ => unreachable!("PType not supported by Pco: {:?}", ptype),
294    }
295}
296
297fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
298    let mask = parray.validity_mask()?;
299    Ok(parray.clone().into_array().filter(mask)?.to_primitive())
300}
301
302pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
303    use pco::errors::ErrorKind::*;
304    match err.kind {
305        Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
306        InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
307        other => vortex_err!("Pco {:?} error: {}", other, err.message),
308    }
309}
310
/// Marker type for the Pco encoding; serves as the `VTable` implementor.
#[derive(Debug)]
pub struct Pco;

impl Pco {
    /// Stable identifier for this encoding.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}
317
/// A primitive array compressed with Pco, stored as chunk-meta buffers plus
/// page buffers, with lazy slicing via `[slice_start, slice_stop)`.
#[derive(Clone, Debug)]
pub struct PcoArray {
    // One serialized Pco chunk meta per chunk, in chunk order.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    // All compressed pages, concatenated across chunks in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    // Vortex-side metadata: the Pco file header plus per-chunk page counts.
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    // Validity over ALL rows, ignoring the slice window.
    pub(crate) unsliced_validity: Validity,
    // Total row count before slicing.
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    // Current slice window into the unsliced rows (half-open).
    slice_start: usize,
    slice_stop: usize,
}
330
impl PcoArray {
    /// Construct a `PcoArray` from already-compressed components. The slice
    /// window starts covering all `len` rows.
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compress a primitive array using the default chunk size
    /// (`VALUES_PER_CHUNK`) and the given compression `level` and page size.
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    /// Compress a primitive array with explicit chunk and page sizes.
    /// A `values_per_page` of 0 means one page per chunk. Only valid values
    /// are compressed; nulls are reconstructed from validity at decompression.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        // perhaps one day we can make this more configurable
        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        // Drop nulls: Pco only sees the valid values.
        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![]; // the Pco component
        let mut chunk_infos = vec![]; // the Vortex metadata
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            // Monomorphize over the concrete number type to compress this chunk.
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            // One buffer per chunk meta, followed (in `page_buffers`) by that
            // chunk's pages — the order `VTable::buffer` relies on.
            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                // Record the per-page value count so decompression can skip
                // pages without reading them.
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    /// Compress an arbitrary array; errors unless it is a primitive array.
    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<Primitive>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

    /// Decompress the current slice window back into a `PrimitiveArray`,
    /// re-attaching the (sliced) validity.
    pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
        // To start, we figure out which chunks and pages we need to decompress, and with
        // what value offset into the first such page.
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
              self.decompress_values_typed::<T>(ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompress exactly the valid values covered by the slice window.
    /// See the module-level diagram for how the decompressed page interval
    /// relates to the requested slice interval.
    fn decompress_values_typed<T: Number>(
        &self,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // To start, we figure out what range of values we need to decompress.
        // `valid_counts_for_indices` maps row positions to valid-value counts,
        // converting the row-based slice into value-based bounds.
        let slice_value_indices = self
            .unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        // Then we decompress those pages into a buffer. Note that these values
        // may exceed the bounds of the slice, so we need to slice later.
        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        let mut page_value_start = 0;
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // lazily initialize chunk decompressor
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                // Everything from here on lies past the slice; stop scanning.
                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // we need this page
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // SAFETY: `reserve` above guarantees capacity for
                    // `new_len` elements; the uninitialized tail
                    // [old_len..new_len] is written by `pd.read` below.
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    // Build the chunk decompressor on first use within this
                    // chunk; reuse it for the chunk's remaining pages.
                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    // Page lies entirely before the slice: skip it, but track
                    // how many values we skipped to offset into the output.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Slice only the values requested.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    /// Zero-copy slice: narrows the window without touching compressed data.
    /// `start`/`stop` are relative to the current window. Stats are reset
    /// since they no longer describe the narrowed array.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
563
impl ValiditySliceHelper for PcoArray {
    /// Expose the full (unsliced) validity plus the current slice bounds so
    /// the generated validity vtable can slice on demand.
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}
569
570impl OperationsVTable<Pco> for Pco {
571    fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
572        let mut ctx = LEGACY_SESSION.create_execution_ctx();
573        array
574            ._slice(index, index + 1)
575            .decompress(&mut ctx)?
576            .scalar_at(0)
577    }
578}
579
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::PcoArray;

    // Verifies that a nullable array round-trips through Pco compression and
    // that slicing correctly re-aligns values against the validity mask.
    #[test]
    fn test_slice_nullable() {
        // Create a nullable array with some nulls
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        // Full round-trip: nulls preserved, valid values intact.
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice to get only the non-null values in the middle
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}
617}