vortex_layout/layouts/compact.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_array::Array;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::FixedSizeListArray;
use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::narrowed_decimal;
use vortex_array::vtable::ValidityHelper;
use vortex_decimal_byte_parts::DecimalBytePartsArray;
use vortex_dtype::PType;
use vortex_error::VortexResult;
use vortex_pco::PcoArray;
use vortex_scalar::DecimalType;
use vortex_zstd::ZstdArray;

fn is_pco_number_type(ptype: PType) -> bool {
    matches!(
        ptype,
        PType::F16
            | PType::F32
            | PType::F64
            | PType::I16
            | PType::I32
            | PType::I64
            | PType::U16
            | PType::U32
            | PType::U64
    )
}

/// A simple compressor that uses the "compact" strategy:
/// - Pco for supported numeric types (16-, 32-, and 64-bit floats and ints)
/// - Zstd for other primitive types and for variable-length binary views
///
/// Nested types (structs, lists, fixed-size lists, and extensions) are compressed
/// recursively; arrays that do not get smaller are kept in their canonical form.
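///
/// A rough usage sketch, assuming an `ArrayRef` named `array` is already in scope
/// (the levels and page size below are illustrative, not prescribed defaults):
///
/// ```ignore
/// let compressor = CompactCompressor::default()
///     .with_pco_level(8)
///     .with_zstd_level(3)
///     .with_values_per_page(8192);
/// let compressed = compressor.compress(array.as_ref())?;
/// ```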
#[derive(Debug, Clone)]
pub struct CompactCompressor {
    pco_level: usize,
    zstd_level: i32,
    values_per_page: usize,
}

impl CompactCompressor {
    pub fn with_pco_level(mut self, level: usize) -> Self {
        self.pco_level = level;
        self
    }

    pub fn with_zstd_level(mut self, level: i32) -> Self {
        self.zstd_level = level;
        self
    }

    /// Sets the number of non-null primitive values to store per
    /// separately-decompressible page/frame.
    ///
    /// Fewer values per page can reduce the time to query a small slice of rows, but too
    /// few can increase compressed size and (de)compression time. A value of 0 yields
    /// maximally-large pages; the `Default` implementation uses 8192.
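    ///
    /// For example, a workload dominated by small point reads might opt for smaller
    /// pages (the value below is only illustrative):
    ///
    /// ```ignore
    /// let compressor = CompactCompressor::default().with_values_per_page(1024);
    /// ```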
    pub fn with_values_per_page(mut self, values_per_page: usize) -> Self {
        self.values_per_page = values_per_page;
        self
    }

    /// Canonicalize an array and compress it using the compact strategy.
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress_canonical(array.to_canonical())
    }

    /// Compress a single canonical array using the compact strategy.
    ///
    /// Returns the uncompressed canonical array if compression does not reduce its size.
    pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult<ArrayRef> {
        let uncompressed_nbytes = canonical.as_ref().nbytes();
        let compressed = match &canonical {
            // TODO compress BoolArrays
            Canonical::Primitive(primitive) => {
                // pco for applicable numbers, zstd for everything else
                let ptype = primitive.ptype();

                if is_pco_number_type(ptype) {
                    let pco_array =
                        PcoArray::from_primitive(primitive, self.pco_level, self.values_per_page)?;
                    pco_array.into_array()
                } else {
                    let zstd_array = ZstdArray::from_primitive(
                        primitive,
                        self.zstd_level,
                        self.values_per_page,
                    )?;
                    zstd_array.into_array()
                }
            }
            Canonical::Decimal(decimal) => {
                let decimal = narrowed_decimal(decimal.clone());
                let validity = decimal.validity();
                let int_values = match decimal.values_type() {
                    DecimalType::I8 => {
                        PrimitiveArray::new(decimal.buffer::<i8>(), validity.clone())
                    }
                    DecimalType::I16 => {
                        PrimitiveArray::new(decimal.buffer::<i16>(), validity.clone())
                    }
                    DecimalType::I32 => {
                        PrimitiveArray::new(decimal.buffer::<i32>(), validity.clone())
                    }
                    DecimalType::I64 => {
                        PrimitiveArray::new(decimal.buffer::<i64>(), validity.clone())
                    }
                    _ => {
                        // Vortex lacks support for i128 and i256.
                        return Ok(canonical.into_array());
                    }
                };
                let compressed = self.compress_canonical(Canonical::Primitive(int_values))?;
                DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array()
            }
            Canonical::VarBinView(vbv) => {
                // always zstd
                ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.values_per_page)?
                    .into_array()
            }
            Canonical::Struct(struct_array) => {
                // recurse
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<VortexResult<Vec<_>>>()?;

                StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array()
            }
            Canonical::List(listview) => {
                let compressed_elems = self.compress(listview.elements())?;

                // Note that since the type of our offsets and sizes is not encoded in our `DType`,
                // we can narrow the widths.
                let compressed_offsets =
                    self.compress(&listview.offsets().to_primitive().narrow()?.into_array())?;
                let compressed_sizes =
                    self.compress(&listview.sizes().to_primitive().narrow()?.into_array())?;

                // SAFETY: Since compression does not change the logical values of arrays, this is
                // effectively the same array but represented differently, so all invariants that
                // were previously upheld by the valid `ListViewArray` are still upheld.
                // If the original was zero-copyable to list, compression maintains that property.
                unsafe {
                    ListViewArray::new_unchecked(
                        compressed_elems,
                        compressed_offsets,
                        compressed_sizes,
                        listview.validity().clone(),
                    )
                    .with_zero_copy_to_list(listview.is_zero_copy_to_list())
                }
                .into_array()
            }
            Canonical::FixedSizeList(fsl) => {
                let compressed_elems = self.compress(fsl.elements())?;

                FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl.list_size(),
                    fsl.validity().clone(),
                    fsl.len(),
                )?
                .into_array()
            }
            Canonical::Extension(ext_array) => {
                let compressed_storage = self.compress(ext_array.storage())?;

                ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array()
            }
            _ => return Ok(canonical.into_array()),
        };

        if compressed.nbytes() >= uncompressed_nbytes {
            return Ok(canonical.into_array());
        }
        Ok(compressed)
    }
}

impl Default for CompactCompressor {
    fn default() -> Self {
        Self {
            pco_level: pco::DEFAULT_COMPRESSION_LEVEL,
            zstd_level: 3,
            // This is probably high enough to not hurt performance or
            // compression. It also currently aligns with the default strategy's
            // number of rows per statistic, which allows efficient pushdown
            // (but nothing enforces this).
            values_per_page: 8192,
        }
    }
}

#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_dtype::FieldName;

    use super::*;

    #[test]
    fn test_compact_compressor_struct_with_mixed_types() {
        let compressor = CompactCompressor::default();

        // Create a struct array containing various types
        let columns = vec![
            // Pco types
            PrimitiveArray::new(buffer![1.0f64, 2.0, 3.0, 4.0, 5.0], Validity::NonNullable),
            PrimitiveArray::new(buffer![10i32, 20, 30, 40, 50], Validity::NonNullable),
            // Zstd types
            PrimitiveArray::new(buffer![11u8, 22, 33, 44, 55], Validity::NonNullable),
        ]
        .iter()
        .map(|a| a.clone().into_array())
        .collect::<Vec<_>>();
        let field_names: Vec<FieldName> =
            vec!["f64_field".into(), "i32_field".into(), "u8_field".into()];

        let n_rows = columns[0].len();
        let struct_array = StructArray::try_new(
            field_names.clone().into(),
            columns.clone(),
            n_rows,
            Validity::NonNullable,
        )
        .unwrap();

        // Compress the struct array
        let compressed = compressor.compress(struct_array.as_ref()).unwrap();

        // Verify we can decompress back to original
        let decompressed = compressed.to_canonical().into_array();
        assert_eq!(decompressed.len(), n_rows);
        let decompressed_struct = decompressed.to_struct();

        // Verify each field can be accessed and has correct data
        for (i, name) in decompressed_struct.names().iter().enumerate() {
            assert_eq!(name, field_names[i]);
            let decompressed_array = decompressed_struct.field_by_name(name).unwrap().clone();
            assert_eq!(decompressed_array.len(), n_rows);

            assert_arrays_eq!(decompressed_array.as_ref(), columns[i].as_ref());
        }
    }
}