// vortex_pco/array.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::hash::Hash;
use std::hash::Hasher;

use pco::ChunkConfig;
use pco::PagingSpec;
use pco::data_types::Number;
use pco::data_types::NumberType;
use pco::errors::PcoError;
use pco::match_number_enum;
use pco::wrapped::ChunkDecompressor;
use pco::wrapped::FileCompressor;
use pco::wrapped::FileDecompressor;
use prost::Message;
use vortex_array::Array;
use vortex_array::ArrayEq;
use vortex_array::ArrayHash;
use vortex_array::ArrayId;
use vortex_array::ArrayParts;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
use vortex_array::dtype::half;
use vortex_array::scalar::Scalar;
use vortex_array::serde::ArrayChildren;
use vortex_array::validity::Validity;
use vortex_array::vtable::OperationsVTable;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityVTable;
use vortex_array::vtable::child_to_validity;
use vortex_array::vtable::validity_to_child;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexError;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_session::VortexSession;
use vortex_session::registry::CachedId;

use crate::PcoChunkInfo;
use crate::PcoMetadata;
use crate::PcoPageInfo;

// Overall approach here:
// Chunk the array into Pco chunks (currently using the default recommended size
// for good compression), and into finer-grained Pco pages. As we go, write each
// ChunkMeta as a buffer, followed by each of that chunk's pages as a buffer. We
// store metadata for each of these "components" (chunk or page). At
// decompression time, we figure out which components we need to read and only
// process those. We only compress and decompress valid values.

// Visually, during decompression, we have an interval of pages we're
// decompressing and a tighter interval of the slice we actually care about:
// |=============values (all valid elements)==============|
// |<-n_skipped_values->|----decompressed_values------|
//                          |----slice_values----|
//                          ^                    ^
// |<---slice_value_start-->|<--slice_n_values-->|
// We then insert these values at the correct position using a primitive array
// constructor.
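
// As a concrete, hypothetical example of the diagram above: suppose there are
// 100 valid values split into pages of 10 and the slice covers valid values
// 37..73. We then decompress pages 3..8 (valid values 30..80), so
// n_skipped_values = 30, slice_value_start = 37, slice_n_values = 36, and the
// final slice into the decompressed buffer is 7..43 (value_offset = 37 - 30).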

const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

/// A [`Pco`]-encoded Vortex array.
pub type PcoArray = Array<Pco>;

impl ArrayHash for PcoData {
    fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
        self.unsliced_n_rows.hash(state);
        self.slice_start.hash(state);
        self.slice_stop.hash(state);
        // Hash chunk_metas and pages using pointer-based hashing
        for chunk_meta in &self.chunk_metas {
            chunk_meta.array_hash(state, precision);
        }
        for page in &self.pages {
            page.array_hash(state, precision);
        }
    }
}

impl ArrayEq for PcoData {
    fn array_eq(&self, other: &Self, precision: Precision) -> bool {
        if self.unsliced_n_rows != other.unsliced_n_rows
            || self.slice_start != other.slice_start
            || self.slice_stop != other.slice_stop
            || self.chunk_metas.len() != other.chunk_metas.len()
            || self.pages.len() != other.pages.len()
        {
            return false;
        }
        for (a, b) in self.chunk_metas.iter().zip(&other.chunk_metas) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        for (a, b) in self.pages.iter().zip(&other.pages) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        true
    }
}

impl VTable for Pco {
    type ArrayData = PcoData;

    type OperationsVTable = Self;
    type ValidityVTable = Self;

    fn id(&self) -> ArrayId {
        static ID: CachedId = CachedId::new("vortex.pco");
        *ID
    }

    fn validate(
        &self,
        data: &PcoData,
        dtype: &DType,
        len: usize,
        slots: &[Option<ArrayRef>],
    ) -> VortexResult<()> {
        let validity = child_to_validity(&slots[0], dtype.nullability());
        data.validate(dtype, len, &validity)
    }

    fn nbuffers(array: ArrayView<'_, Self>) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    fn serialize(
        array: ArrayView<'_, Self>,
        _session: &VortexSession,
    ) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(array.metadata.clone().encode_to_vec()))
    }

    fn deserialize(
        &self,
        dtype: &DType,
        len: usize,
        metadata: &[u8],
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
        _session: &VortexSession,
    ) -> VortexResult<ArrayParts<Self>> {
        let metadata = PcoMetadata::decode(metadata)?;
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        vortex_ensure!(buffers.len() >= metadata.chunks.len());
        let chunk_metas = buffers[..metadata.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        let expected_n_pages = metadata
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        let slots = vec![validity_to_child(&validity, len)];
        let data = PcoData::new(chunk_metas, pages, dtype.as_ptype(), metadata, len);
        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
    }

    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
        SLOT_NAMES[idx].to_string()
    }

    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        let unsliced_validity =
            child_to_validity(&array.as_ref().slots()[0], array.dtype().nullability());
        Ok(ExecutionResult::done(
            array
                .data()
                .decompress(&unsliced_validity, ctx)?
                .into_array(),
        ))
    }

    fn reduce_parent(
        array: ArrayView<'_, Self>,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}

pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
    number_type_from_ptype(dtype.as_ptype())
}

pub(crate) fn number_type_from_ptype(ptype: PType) -> NumberType {
    match ptype {
        PType::F16 => NumberType::F16,
        PType::F32 => NumberType::F32,
        PType::F64 => NumberType::F64,
        PType::I16 => NumberType::I16,
        PType::I32 => NumberType::I32,
        PType::I64 => NumberType::I64,
        PType::U16 => NumberType::U16,
        PType::U32 => NumberType::U32,
        PType::U64 => NumberType::U64,
        _ => unreachable!("PType not supported by Pco: {:?}", ptype),
    }
}

fn collect_valid(parray: ArrayView<'_, Primitive>) -> VortexResult<PrimitiveArray> {
    let mask = parray.array().validity()?.to_mask(
        parray.array().len(),
        &mut LEGACY_SESSION.create_execution_ctx(),
    )?;
    Ok(parray.array().filter(mask)?.to_primitive())
}

pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
    use pco::errors::ErrorKind::*;
    match err.kind {
        Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
        InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
        other => vortex_err!("Pco {:?} error: {}", other, err.message),
    }
}

#[derive(Clone, Debug)]
pub struct Pco;

impl Pco {
    pub(crate) fn try_new(
        dtype: DType,
        data: PcoData,
        validity: Validity,
    ) -> VortexResult<PcoArray> {
        let len = data.len();
        data.validate(&dtype, len, &validity)?;
        let slots = vec![validity_to_child(&validity, data.unsliced_n_rows())];
        Ok(unsafe {
            Array::from_parts_unchecked(ArrayParts::new(Pco, dtype, len, data).with_slots(slots))
        })
    }

    /// Compress a primitive array using pcodec.
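    ///
    /// A minimal usage sketch (hypothetical values, mirroring the test at the
    /// bottom of this file); `level` is the pco compression level, and a
    /// `values_per_page` of 0 falls back to the chunk size:
    ///
    /// ```ignore
    /// let values = PrimitiveArray::new(buffer![1u32, 2, 3], Validity::NonNullable);
    /// let pco = Pco::from_primitive(values.as_view(), 0, 128)?;
    /// ```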
    pub fn from_primitive(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<PcoArray> {
        let dtype = parray.dtype().clone();
        let validity = parray.validity()?;
        let data = PcoData::from_primitive(parray, level, values_per_page)?;
        Self::try_new(dtype, data, validity)
    }
}

/// The validity bitmap indicating which elements are non-null.
pub(super) const NUM_SLOTS: usize = 1;
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];

#[derive(Clone, Debug)]
pub struct PcoData {
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    pub(crate) pages: Vec<ByteBuffer>,
    pub(crate) metadata: PcoMetadata,
    ptype: PType,
    unsliced_n_rows: usize,
    slice_start: usize,
    slice_stop: usize,
}

impl Display for PcoData {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "ptype: {}, nrows: {}, slice: {}..{}",
            self.ptype, self.unsliced_n_rows, self.slice_start, self.slice_stop
        )
    }
}

impl PcoData {
    pub fn validate(&self, dtype: &DType, len: usize, validity: &Validity) -> VortexResult<()> {
        let _ = number_type_from_ptype(self.ptype);
        vortex_ensure!(
            dtype.as_ptype() == self.ptype,
            "expected ptype {}, got {}",
            self.ptype,
            dtype.as_ptype()
        );
        vortex_ensure!(
            dtype.nullability() == validity.nullability(),
            "expected nullability {}, got {}",
            validity.nullability(),
            dtype.nullability()
        );
        vortex_ensure!(
            self.slice_start <= self.slice_stop && self.slice_stop <= self.unsliced_n_rows,
            "invalid slice range {}..{} for {} rows",
            self.slice_start,
            self.slice_stop,
            self.unsliced_n_rows
        );
        vortex_ensure!(
            self.slice_stop - self.slice_start == len,
            "expected len {len}, got {}",
            self.slice_stop - self.slice_start
        );
        if let Some(validity_len) = validity.maybe_len() {
            vortex_ensure!(
                validity_len == self.unsliced_n_rows,
                "expected validity len {}, got {}",
                self.unsliced_n_rows,
                validity_len
            );
        }
        vortex_ensure!(
            self.chunk_metas.len() == self.metadata.chunks.len(),
            "expected {} chunk metas, got {}",
            self.metadata.chunks.len(),
            self.chunk_metas.len()
        );
        vortex_ensure!(
            self.pages.len()
                == self
                    .metadata
                    .chunks
                    .iter()
                    .map(|chunk| chunk.pages.len())
                    .sum::<usize>(),
            "page count does not match metadata"
        );
        Ok(())
    }

    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        ptype: PType,
        metadata: PcoMetadata,
        len: usize,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            ptype,
            unsliced_n_rows: len,
            slice_start: 0,
            slice_stop: len,
        }
    }

    pub fn from_primitive(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        // perhaps one day we can make this more configurable
        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![]; // the Pco component
        let mut chunk_infos = vec![]; // the Vortex metadata
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoData::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().as_ptype(),
            metadata,
            parray.len(),
        ))
    }

    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        let parray = array.try_downcast::<Primitive>().map_err(|a| {
            vortex_err!(
                "Pco can only encode primitive arrays, got {}",
                a.encoding_id()
            )
        })?;
        Self::from_primitive(parray.as_view(), level, nums_per_page)
    }

    pub fn decompress(
        &self,
        unsliced_validity: &Validity,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<PrimitiveArray> {
        // Dispatch on the concrete number type; the typed helper figures out
        // which chunks and pages we need to decompress (and with what value
        // offset into the first such page) and returns only the sliced values.
        let number_type = number_type_from_ptype(self.ptype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>(unsliced_validity, ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.ptype,
            unsliced_validity.slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    fn decompress_values_typed<T: Number>(
        &self,
        unsliced_validity: &Validity,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // To start, we figure out what range of values we need to decompress.
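        // (valid_counts_for_indices yields, for each row index, the number of
        // valid elements before it, i.e. that row's offset into the stream of
        // stored, valid-only values.)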
        let slice_value_indices = unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        // Then we decompress the pages overlapping that range into a buffer.
        // Note that these values may exceed the bounds of the slice, so we
        // need to slice later.
        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        let mut page_value_start = 0;
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // lazily initialize chunk decompressor
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // we need this page
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
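                    // SAFETY: `reserve` guarantees capacity for `new_len`;
                    // the page decompressor fills `old_len..new_len` below.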
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Slice only the values requested.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

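    /// Slices lazily: only the logical bounds move, and no pages are decoded
    /// until the array is decompressed.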
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoData {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            ..self.clone()
        }
    }

    /// Returns the number of elements in the array.
    pub fn len(&self) -> usize {
        self.slice_stop - self.slice_start
    }

    /// Returns `true` if the array contains no elements.
    pub fn is_empty(&self) -> bool {
        self.slice_stop == self.slice_start
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}

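// Note: the validity child is stored unsliced (it covers all unsliced_n_rows),
// so reads must narrow it down to the current slice window.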
impl ValidityVTable<Pco> for Pco {
    fn validity(array: ArrayView<'_, Pco>) -> VortexResult<Validity> {
        let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
        unsliced_validity.slice(array.slice_start()..array.slice_stop())
    }
}

impl OperationsVTable<Pco> for Pco {
    fn scalar_at(
        array: ArrayView<'_, Pco>,
        index: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Scalar> {
        let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
        array
            ._slice(index, index + 1)
            .decompress(&unsliced_validity, ctx)?
            .into_array()
            .execute_scalar(0, ctx)
    }
}

#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::Pco;

    #[test]
    fn test_slice_nullable() {
        // Create a nullable array with some nulls
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = Pco::from_primitive(values.as_view(), 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice to get only the non-null values in the middle
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
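
    // A sketch of a second test (assuming the same APIs as the test above):
    // force tiny pages so a slice has to skip whole pages on decompression.
    #[test]
    fn test_slice_multi_page() {
        let values = PrimitiveArray::new(
            buffer![0u64, 1, 2, 3, 4, 5, 6, 7],
            Validity::NonNullable,
        );
        // values_per_page = 2 puts each pair of values in its own Pco page.
        let pco = Pco::from_primitive(values.as_view(), 0, 2).unwrap();
        let sliced = pco.slice(3..7).unwrap();
        let expected =
            PrimitiveArray::new(buffer![3u64, 4, 5, 6], Validity::NonNullable).into_array();
        assert_arrays_eq!(sliced, expected);
    }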
}