Skip to main content

vortex_btrblocks/
canonical_compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Canonical array compression implementation.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::CanonicalValidity;
9use vortex_array::DynArray;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical;
13use vortex_array::VortexSessionExecute;
14use vortex_array::aggregate_fn::fns::is_constant::is_constant;
15use vortex_array::arrays::ConstantArray;
16use vortex_array::arrays::ExtensionArray;
17use vortex_array::arrays::FixedSizeListArray;
18use vortex_array::arrays::ListArray;
19use vortex_array::arrays::ListViewArray;
20use vortex_array::arrays::StructArray;
21use vortex_array::arrays::TemporalArray;
22use vortex_array::arrays::listview::list_from_list_view;
23use vortex_array::dtype::DType;
24use vortex_array::dtype::Nullability;
25use vortex_array::extension::datetime::TemporalMetadata;
26use vortex_array::vtable::ValidityHelper;
27use vortex_error::VortexResult;
28use vortex_error::vortex_bail;
29
30use crate::BtrBlocksCompressorBuilder;
31use crate::CompressorContext;
32use crate::CompressorExt;
33use crate::Excludes;
34use crate::FloatCompressor;
35use crate::IntCode;
36use crate::IntCompressor;
37use crate::StringCompressor;
38use crate::compressor::decimal::compress_decimal;
39use crate::compressor::float::FloatScheme;
40use crate::compressor::integer::IntegerScheme;
41use crate::compressor::string::StringScheme;
42use crate::compressor::temporal::compress_temporal;
43
44/// Trait for compressors that can compress canonical arrays.
45///
46/// Provides access to configured compression schemes and the ability to
47/// compress canonical arrays recursively.
48pub trait CanonicalCompressor {
49    /// Compresses a canonical array with the specified options.
50    fn compress_canonical(
51        &self,
52        array: Canonical,
53        ctx: CompressorContext,
54        excludes: Excludes,
55    ) -> VortexResult<ArrayRef>;
56
57    /// Returns the enabled integer compression schemes.
58    fn int_schemes(&self) -> &[&'static dyn IntegerScheme];
59
60    /// Returns the enabled float compression schemes.
61    fn float_schemes(&self) -> &[&'static dyn FloatScheme];
62
63    /// Returns the enabled string compression schemes.
64    fn string_schemes(&self) -> &[&'static dyn StringScheme];
65}
66
67/// The main compressor type implementing BtrBlocks-inspired compression.
68///
69/// This compressor applies adaptive compression schemes to arrays based on their data types
70/// and characteristics. It recursively compresses nested structures like structs and lists,
71/// and chooses optimal compression schemes for primitive types.
72///
73/// The compressor works by:
74/// 1. Canonicalizing input arrays to a standard representation
75/// 2. Analyzing data characteristics to choose optimal compression schemes
76/// 3. Recursively compressing nested structures
77/// 4. Applying type-specific compression for primitives, strings, and temporal data
78///
79/// Use [`BtrBlocksCompressorBuilder`] to configure which compression schemes are enabled.
80///
81/// # Examples
82///
83/// ```rust
84/// use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, IntCode};
85///
86/// // Default compressor - all schemes allowed
87/// let compressor = BtrBlocksCompressor::default();
88///
89/// // Exclude specific schemes using the builder
90/// let compressor = BtrBlocksCompressorBuilder::default()
91///     .exclude_int([IntCode::Dict])
92///     .build();
93/// ```
94#[derive(Clone)]
95pub struct BtrBlocksCompressor {
96    /// Integer compressor with configured schemes.
97    pub int_schemes: Vec<&'static dyn IntegerScheme>,
98
99    /// Float compressor with configured schemes.
100    pub float_schemes: Vec<&'static dyn FloatScheme>,
101
102    /// String compressor with configured schemes.
103    pub string_schemes: Vec<&'static dyn StringScheme>,
104}
105
106impl Default for BtrBlocksCompressor {
107    fn default() -> Self {
108        BtrBlocksCompressorBuilder::default().build()
109    }
110}
111
112impl BtrBlocksCompressor {
113    /// Compresses an array using BtrBlocks-inspired compression.
114    ///
115    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
116    pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
117        // Canonicalize the array
118        // TODO(joe): receive `ctx` and use it.
119        let canonical = array
120            .clone()
121            .execute::<CanonicalValidity>(&mut LEGACY_SESSION.create_execution_ctx())?
122            .0;
123
124        // Compact it, removing any wasted space before we attempt to compress it
125        let compact = canonical.compact()?;
126
127        self.compress_canonical(compact, CompressorContext::default(), Excludes::none())
128    }
129
130    pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> {
131        IntCompressor {
132            btr_blocks_compressor: self,
133        }
134    }
135
136    pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> {
137        FloatCompressor {
138            btr_blocks_compressor: self,
139        }
140    }
141
142    pub(crate) fn string_compressor(&self) -> StringCompressor<'_> {
143        StringCompressor {
144            btr_blocks_compressor: self,
145        }
146    }
147
148    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
149    fn compress_list_array(
150        &self,
151        list_array: ListArray,
152        ctx: CompressorContext,
153    ) -> VortexResult<ArrayRef> {
154        // Reset the offsets to remove garbage data that might prevent us from narrowing our
155        // offsets (there could be a large amount of trailing garbage data that the current
156        // views do not reference at all).
157        let list_array = list_array.reset_offsets(true)?;
158
159        let compressed_elems = self.compress(list_array.elements())?;
160
161        // Note that since the type of our offsets are not encoded in our `DType`, and since
162        // we guarantee above that all elements are referenced by offsets, we may narrow the
163        // widths.
164        let compressed_offsets = self.compress_canonical(
165            Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
166            ctx,
167            Excludes::from(&[IntCode::Dict]),
168        )?;
169
170        Ok(ListArray::try_new(
171            compressed_elems,
172            compressed_offsets,
173            list_array.validity().clone(),
174        )?
175        .into_array())
176    }
177
178    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
179    /// elements.
180    fn compress_list_view_array(
181        &self,
182        list_view: ListViewArray,
183        ctx: CompressorContext,
184    ) -> VortexResult<ArrayRef> {
185        let compressed_elems = self.compress(list_view.elements())?;
186        let compressed_offsets = self.compress_canonical(
187            Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
188            ctx,
189            Excludes::none(),
190        )?;
191        let compressed_sizes = self.compress_canonical(
192            Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
193            ctx,
194            Excludes::none(),
195        )?;
196        Ok(ListViewArray::try_new(
197            compressed_elems,
198            compressed_offsets,
199            compressed_sizes,
200            list_view.validity().clone(),
201        )?
202        .into_array())
203    }
204}
205
206impl CanonicalCompressor for BtrBlocksCompressor {
207    /// Compresses a canonical array by dispatching to type-specific compressors.
208    ///
209    /// Recursively compresses nested structures and applies optimal schemes for each data type.
210    fn compress_canonical(
211        &self,
212        array: Canonical,
213        ctx: CompressorContext,
214        excludes: Excludes,
215    ) -> VortexResult<ArrayRef> {
216        match array {
217            Canonical::Null(null_array) => Ok(null_array.into_array()),
218            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
219            Canonical::Primitive(primitive) => {
220                if primitive.ptype().is_int() {
221                    self.integer_compressor()
222                        .compress(self, &primitive, ctx, excludes.int)
223                } else {
224                    self.float_compressor()
225                        .compress(self, &primitive, ctx, excludes.float)
226                }
227            }
228            Canonical::Decimal(decimal) => compress_decimal(self, &decimal),
229            Canonical::Struct(struct_array) => {
230                let fields = struct_array
231                    .unmasked_fields()
232                    .iter()
233                    .map(|field| self.compress(field))
234                    .collect::<Result<Vec<_>, _>>()?;
235
236                Ok(StructArray::try_new(
237                    struct_array.names().clone(),
238                    fields,
239                    struct_array.len(),
240                    struct_array.validity().clone(),
241                )?
242                .into_array())
243            }
244            Canonical::List(list_view_array) => {
245                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
246                    // Offsets are already monotonic and non-overlapping, so we
247                    // can drop the sizes array and compress as a ListArray.
248                    let list_array = list_from_list_view(list_view_array)?;
249                    self.compress_list_array(list_array, ctx)
250                } else {
251                    self.compress_list_view_array(list_view_array, ctx)
252                }
253            }
254            Canonical::FixedSizeList(fsl_array) => {
255                let compressed_elems = self.compress(fsl_array.elements())?;
256
257                Ok(FixedSizeListArray::try_new(
258                    compressed_elems,
259                    fsl_array.list_size(),
260                    fsl_array.validity().clone(),
261                    fsl_array.len(),
262                )?
263                .into_array())
264            }
265            Canonical::VarBinView(strings) => {
266                if strings
267                    .dtype()
268                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
269                {
270                    self.string_compressor()
271                        .compress(self, &strings, ctx, excludes.string)
272                } else {
273                    // Binary arrays do not compress
274                    Ok(strings.into_array())
275                }
276            }
277            Canonical::Extension(ext_array) => {
278                // We compress Timestamp-level arrays with DateTimeParts compression
279                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.clone().into_array())
280                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
281                {
282                    let mut ctx = LEGACY_SESSION.create_execution_ctx();
283                    if is_constant(&ext_array.clone().into_array(), &mut ctx)? {
284                        return Ok(ConstantArray::new(
285                            temporal_array.as_ref().scalar_at(0)?,
286                            ext_array.len(),
287                        )
288                        .into_array());
289                    }
290                    return compress_temporal(self, temporal_array);
291                }
292
293                // Compress the underlying storage array.
294                let compressed_storage = self.compress(ext_array.storage_array())?;
295
296                Ok(
297                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
298                        .into_array(),
299                )
300            }
301            Canonical::Variant(_) => {
302                vortex_bail!("Variant arrays can not be compressed")
303            }
304        }
305    }
306
307    fn int_schemes(&self) -> &[&'static dyn IntegerScheme] {
308        &self.int_schemes
309    }
310
311    fn float_schemes(&self) -> &[&'static dyn FloatScheme] {
312        &self.float_schemes
313    }
314
315    fn string_schemes(&self) -> &[&'static dyn StringScheme] {
316        &self.string_schemes
317    }
318}
319
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::List;
    use vortex_array::arrays::ListView;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Compressing a list view must preserve its contents, and the resulting encoding must be
    /// `List` when the input is flagged zero-copy-to-list and `ListView` otherwise.
    #[rstest]
    #[case::zctl(
        // SAFETY: offsets [0, 3] with sizes [3, 2] tile the five elements in order with no
        // gaps or overlap; presumably this is what `new_unchecked` and the
        // zero-copy-to-list flag require (confirm against their contracts).
        unsafe {
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    #[case::overlapping(
        // Every list views the same three elements, so the views overlap.
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let uncompressed = input.clone().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&uncompressed)?;

        let has_expected_encoding = if expect_list {
            compressed.as_opt::<List>().is_some()
        } else {
            compressed.as_opt::<ListView>().is_some()
        };
        assert!(
            has_expected_encoding,
            "unexpected encoding (expect_list = {expect_list})"
        );

        assert_arrays_eq!(compressed, input);
        Ok(())
    }
}