// vortex_layout/layouts/compact.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::arrays::{
5    ExtensionArray, FixedSizeListArray, ListArray, PrimitiveArray, StructArray, narrowed_decimal,
6};
7use vortex_array::vtable::ValidityHelper;
8use vortex_array::{Array, ArrayRef, Canonical, IntoArray};
9use vortex_decimal_byte_parts::DecimalBytePartsArray;
10use vortex_dtype::PType;
11use vortex_error::VortexResult;
12use vortex_pco::PcoArray;
13use vortex_scalar::DecimalValueType;
14use vortex_zstd::ZstdArray;
15
16fn is_pco_number_type(ptype: PType) -> bool {
17    matches!(
18        ptype,
19        PType::F16
20            | PType::F32
21            | PType::F64
22            | PType::I16
23            | PType::I32
24            | PType::I64
25            | PType::U16
26            | PType::U32
27            | PType::U64
28    )
29}
30
/// A simple compressor that uses the "compact" strategy:
/// - Pco for supported numeric types (16, 32, and 64-bit floats and ints)
/// - Zstd for everything else (primitive arrays only)
#[derive(Debug, Clone)]
pub struct CompactCompressor {
    /// Pco compression level, forwarded to `PcoArray::from_primitive`.
    pco_level: usize,
    /// Zstd compression level, forwarded to the `ZstdArray` constructors.
    zstd_level: i32,
    /// Number of non-null primitive values per separately-decompressible
    /// page/frame; 0 means maximally-large pages (see `with_values_per_page`).
    values_per_page: usize,
}
40
41impl CompactCompressor {
42    pub fn with_pco_level(mut self, level: usize) -> Self {
43        self.pco_level = level;
44        self
45    }
46
47    pub fn with_zstd_level(mut self, level: i32) -> Self {
48        self.zstd_level = level;
49        self
50    }
51
52    /// Sets the number of non-null primitive values to store per
53    /// separately-decompressible page/frame.
54    ///
55    /// Fewer values per page can reduce the time to query a small slice of rows, but too
56    /// few can increase compressed size and (de)compression time. The default is 0, which
57    /// is used for maximally-large pages.
58    pub fn with_values_per_page(mut self, values_per_page: usize) -> Self {
59        self.values_per_page = values_per_page;
60        self
61    }
62
63    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
64        self.compress_canonical(array.to_canonical())
65    }
66
67    /// Compress a single array using the compact strategy
68    pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult<ArrayRef> {
69        let uncompressed_nbytes = canonical.as_ref().nbytes();
70        let compressed = match &canonical {
71            // TODO compress BoolArrays
72            Canonical::Primitive(primitive) => {
73                // pco for applicable numbers, zstd for everything else
74                let ptype = primitive.ptype();
75
76                if is_pco_number_type(ptype) {
77                    let pco_array =
78                        PcoArray::from_primitive(primitive, self.pco_level, self.values_per_page)?;
79                    pco_array.into_array()
80                } else {
81                    let zstd_array = ZstdArray::from_primitive(
82                        primitive,
83                        self.zstd_level,
84                        self.values_per_page,
85                    )?;
86                    zstd_array.into_array()
87                }
88            }
89            Canonical::Decimal(decimal) => {
90                let decimal = narrowed_decimal(decimal.clone());
91                let validity = decimal.validity();
92                let int_values = match decimal.values_type() {
93                    DecimalValueType::I8 => {
94                        PrimitiveArray::new(decimal.buffer::<i8>(), validity.clone())
95                    }
96                    DecimalValueType::I16 => {
97                        PrimitiveArray::new(decimal.buffer::<i16>(), validity.clone())
98                    }
99                    DecimalValueType::I32 => {
100                        PrimitiveArray::new(decimal.buffer::<i32>(), validity.clone())
101                    }
102                    DecimalValueType::I64 => {
103                        PrimitiveArray::new(decimal.buffer::<i64>(), validity.clone())
104                    }
105                    _ => {
106                        // Vortex lacks support for i128 and i256.
107                        return Ok(canonical.into_array());
108                    }
109                };
110                let compressed = self.compress_canonical(Canonical::Primitive(int_values))?;
111                DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array()
112            }
113            Canonical::VarBinView(vbv) => {
114                // always zstd
115                ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.values_per_page)?
116                    .into_array()
117            }
118            Canonical::Struct(struct_array) => {
119                // recurse
120                let fields = struct_array
121                    .fields()
122                    .iter()
123                    .map(|field| self.compress(field))
124                    .collect::<VortexResult<Vec<_>>>()?;
125
126                StructArray::try_new(
127                    struct_array.names().clone(),
128                    fields,
129                    struct_array.len(),
130                    struct_array.validity().clone(),
131                )?
132                .into_array()
133            }
134            Canonical::List(list_array) => {
135                // recurse
136                let compressed_elems = self.compress(list_array.elements())?;
137                let compressed_offsets = self.compress(list_array.offsets())?;
138
139                ListArray::try_new(
140                    compressed_elems,
141                    compressed_offsets,
142                    list_array.validity().clone(),
143                )?
144                .into_array()
145            }
146            Canonical::FixedSizeList(list_array) => {
147                // recurse
148                let compressed_elems = self.compress(list_array.elements())?;
149
150                FixedSizeListArray::try_new(
151                    compressed_elems,
152                    list_array.list_size(),
153                    list_array.validity().clone(),
154                    list_array.len(),
155                )?
156                .into_array()
157            }
158            Canonical::Extension(ext_array) => {
159                // recurse
160                let compressed_storage = self.compress(ext_array.storage())?;
161
162                ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array()
163            }
164            _ => return Ok(canonical.into_array()),
165        };
166
167        if compressed.nbytes() >= uncompressed_nbytes {
168            return Ok(canonical.into_array());
169        }
170        Ok(compressed)
171    }
172}
173
174impl Default for CompactCompressor {
175    fn default() -> Self {
176        Self {
177            pco_level: pco::DEFAULT_COMPRESSION_LEVEL,
178            zstd_level: 3,
179            // This is probably high enough to not hurt performance or
180            // compression. It also currently aligns with the default strategy's
181            // number of rows per statistic, which allows efficient pushdown
182            // (but nothing enforces this).
183            values_per_page: 8192,
184        }
185    }
186}
187
#[cfg(test)]
mod tests {
    use vortex_array::arrays::{PrimitiveArray, StructArray};
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::FieldName;

    use super::*;

    /// Round-trips a struct of mixed primitive columns through the compact
    /// compressor and checks every value survives intact.
    #[test]
    fn test_compact_compressor_struct_with_mixed_types() {
        let compressor = CompactCompressor::default();

        // Two Pco-eligible columns (f64, i32) plus a u8 column that must
        // fall back to Zstd.
        let original_columns = vec![
            PrimitiveArray::new(buffer![1.0f64, 2.0, 3.0, 4.0, 5.0], Validity::NonNullable)
                .into_array(),
            PrimitiveArray::new(buffer![10i32, 20, 30, 40, 50], Validity::NonNullable)
                .into_array(),
            PrimitiveArray::new(buffer![11u8, 22, 33, 44, 55], Validity::NonNullable)
                .into_array(),
        ];
        let names: Vec<FieldName> =
            vec!["f64_field".into(), "i32_field".into(), "u8_field".into()];

        let row_count = original_columns[0].len();
        let struct_array = StructArray::try_new(
            names.clone().into(),
            original_columns.clone(),
            row_count,
            Validity::NonNullable,
        )
        .unwrap();

        // Compress, then decompress back to canonical form.
        let compressed = compressor.compress(struct_array.as_ref()).unwrap();
        let roundtripped = compressed.to_canonical().into_array();
        assert_eq!(roundtripped.len(), row_count);
        let roundtripped_struct = roundtripped.to_struct();

        // Every field must keep its name and its values.
        for (col_idx, name) in roundtripped_struct.names().iter().enumerate() {
            assert_eq!(name, names[col_idx]);
            let column = roundtripped_struct
                .field_by_name(name)
                .unwrap()
                .to_primitive();
            // is there no direct way to assert_eq on (primitive) arrays?
            assert_eq!(column.len(), row_count);

            for row in 0..row_count {
                assert_eq!(column.scalar_at(row), original_columns[col_idx].scalar_at(row));
            }
        }
    }
}
248}