Skip to main content

vortex_btrblocks/
canonical_compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Canonical array compression implementation.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::CanonicalValidity;
9use vortex_array::DynArray;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical;
13use vortex_array::VortexSessionExecute;
14use vortex_array::arrays::ConstantArray;
15use vortex_array::arrays::ExtensionArray;
16use vortex_array::arrays::FixedSizeListArray;
17use vortex_array::arrays::ListArray;
18use vortex_array::arrays::ListViewArray;
19use vortex_array::arrays::StructArray;
20use vortex_array::arrays::TemporalArray;
21use vortex_array::arrays::listview::list_from_list_view;
22use vortex_array::compute::Cost;
23use vortex_array::compute::IsConstantOpts;
24use vortex_array::compute::is_constant_opts;
25use vortex_array::dtype::DType;
26use vortex_array::dtype::Nullability;
27use vortex_array::extension::datetime::TemporalMetadata;
28use vortex_array::vtable::ValidityHelper;
29use vortex_error::VortexResult;
30
31use crate::BtrBlocksCompressorBuilder;
32use crate::CompressorContext;
33use crate::CompressorExt;
34use crate::Excludes;
35use crate::FloatCompressor;
36use crate::IntCode;
37use crate::IntCompressor;
38use crate::StringCompressor;
39use crate::compressor::decimal::compress_decimal;
40use crate::compressor::float::FloatScheme;
41use crate::compressor::integer::IntegerScheme;
42use crate::compressor::string::StringScheme;
43use crate::compressor::temporal::compress_temporal;
44
45/// Trait for compressors that can compress canonical arrays.
46///
47/// Provides access to configured compression schemes and the ability to
48/// compress canonical arrays recursively.
49pub trait CanonicalCompressor {
50    /// Compresses a canonical array with the specified options.
51    fn compress_canonical(
52        &self,
53        array: Canonical,
54        ctx: CompressorContext,
55        excludes: Excludes,
56    ) -> VortexResult<ArrayRef>;
57
58    /// Returns the enabled integer compression schemes.
59    fn int_schemes(&self) -> &[&'static dyn IntegerScheme];
60
61    /// Returns the enabled float compression schemes.
62    fn float_schemes(&self) -> &[&'static dyn FloatScheme];
63
64    /// Returns the enabled string compression schemes.
65    fn string_schemes(&self) -> &[&'static dyn StringScheme];
66}
67
68/// The main compressor type implementing BtrBlocks-inspired compression.
69///
70/// This compressor applies adaptive compression schemes to arrays based on their data types
71/// and characteristics. It recursively compresses nested structures like structs and lists,
72/// and chooses optimal compression schemes for primitive types.
73///
74/// The compressor works by:
75/// 1. Canonicalizing input arrays to a standard representation
76/// 2. Analyzing data characteristics to choose optimal compression schemes
77/// 3. Recursively compressing nested structures
78/// 4. Applying type-specific compression for primitives, strings, and temporal data
79///
80/// Use [`BtrBlocksCompressorBuilder`] to configure which compression schemes are enabled.
81///
82/// # Examples
83///
84/// ```rust
85/// use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, IntCode};
86///
87/// // Default compressor - all schemes allowed
88/// let compressor = BtrBlocksCompressor::default();
89///
90/// // Exclude specific schemes using the builder
91/// let compressor = BtrBlocksCompressorBuilder::default()
92///     .exclude_int([IntCode::Dict])
93///     .build();
94/// ```
95#[derive(Clone)]
96pub struct BtrBlocksCompressor {
97    /// Integer compressor with configured schemes.
98    pub int_schemes: Vec<&'static dyn IntegerScheme>,
99
100    /// Float compressor with configured schemes.
101    pub float_schemes: Vec<&'static dyn FloatScheme>,
102
103    /// String compressor with configured schemes.
104    pub string_schemes: Vec<&'static dyn StringScheme>,
105}
106
107impl Default for BtrBlocksCompressor {
108    fn default() -> Self {
109        BtrBlocksCompressorBuilder::default().build()
110    }
111}
112
113impl BtrBlocksCompressor {
114    /// Compresses an array using BtrBlocks-inspired compression.
115    ///
116    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
117    pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
118        // Canonicalize the array
119        // TODO(joe): receive `ctx` and use it.
120        let canonical = array
121            .clone()
122            .execute::<CanonicalValidity>(&mut LEGACY_SESSION.create_execution_ctx())?
123            .0;
124
125        // Compact it, removing any wasted space before we attempt to compress it
126        let compact = canonical.compact()?;
127
128        self.compress_canonical(compact, CompressorContext::default(), Excludes::none())
129    }
130
131    pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> {
132        IntCompressor {
133            btr_blocks_compressor: self,
134        }
135    }
136
137    pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> {
138        FloatCompressor {
139            btr_blocks_compressor: self,
140        }
141    }
142
143    pub(crate) fn string_compressor(&self) -> StringCompressor<'_> {
144        StringCompressor {
145            btr_blocks_compressor: self,
146        }
147    }
148
149    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
150    fn compress_list_array(
151        &self,
152        list_array: ListArray,
153        ctx: CompressorContext,
154    ) -> VortexResult<ArrayRef> {
155        // Reset the offsets to remove garbage data that might prevent us from narrowing our
156        // offsets (there could be a large amount of trailing garbage data that the current
157        // views do not reference at all).
158        let list_array = list_array.reset_offsets(true)?;
159
160        let compressed_elems = self.compress(list_array.elements())?;
161
162        // Note that since the type of our offsets are not encoded in our `DType`, and since
163        // we guarantee above that all elements are referenced by offsets, we may narrow the
164        // widths.
165        let compressed_offsets = self.compress_canonical(
166            Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
167            ctx,
168            Excludes::from(&[IntCode::Dict]),
169        )?;
170
171        Ok(ListArray::try_new(
172            compressed_elems,
173            compressed_offsets,
174            list_array.validity().clone(),
175        )?
176        .into_array())
177    }
178
179    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
180    /// elements.
181    fn compress_list_view_array(
182        &self,
183        list_view: ListViewArray,
184        ctx: CompressorContext,
185    ) -> VortexResult<ArrayRef> {
186        let compressed_elems = self.compress(list_view.elements())?;
187        let compressed_offsets = self.compress_canonical(
188            Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
189            ctx,
190            Excludes::none(),
191        )?;
192        let compressed_sizes = self.compress_canonical(
193            Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
194            ctx,
195            Excludes::none(),
196        )?;
197        Ok(ListViewArray::try_new(
198            compressed_elems,
199            compressed_offsets,
200            compressed_sizes,
201            list_view.validity().clone(),
202        )?
203        .into_array())
204    }
205}
206
207impl CanonicalCompressor for BtrBlocksCompressor {
208    /// Compresses a canonical array by dispatching to type-specific compressors.
209    ///
210    /// Recursively compresses nested structures and applies optimal schemes for each data type.
211    fn compress_canonical(
212        &self,
213        array: Canonical,
214        ctx: CompressorContext,
215        excludes: Excludes,
216    ) -> VortexResult<ArrayRef> {
217        match array {
218            Canonical::Null(null_array) => Ok(null_array.into_array()),
219            // TODO(aduffy): Sparse, other bool compressors.
220            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
221            Canonical::Primitive(primitive) => {
222                if primitive.ptype().is_int() {
223                    self.integer_compressor()
224                        .compress(self, &primitive, ctx, excludes.int)
225                } else {
226                    self.float_compressor()
227                        .compress(self, &primitive, ctx, excludes.float)
228                }
229            }
230            Canonical::Decimal(decimal) => compress_decimal(self, &decimal),
231            Canonical::Struct(struct_array) => {
232                let fields = struct_array
233                    .unmasked_fields()
234                    .iter()
235                    .map(|field| self.compress(field))
236                    .collect::<Result<Vec<_>, _>>()?;
237
238                Ok(StructArray::try_new(
239                    struct_array.names().clone(),
240                    fields,
241                    struct_array.len(),
242                    struct_array.validity().clone(),
243                )?
244                .into_array())
245            }
246            Canonical::List(list_view_array) => {
247                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
248                    // Offsets are already monotonic and non-overlapping, so we
249                    // can drop the sizes array and compress as a ListArray.
250                    let list_array = list_from_list_view(list_view_array)?;
251                    self.compress_list_array(list_array, ctx)
252                } else {
253                    self.compress_list_view_array(list_view_array, ctx)
254                }
255            }
256            Canonical::FixedSizeList(fsl_array) => {
257                let compressed_elems = self.compress(fsl_array.elements())?;
258
259                Ok(FixedSizeListArray::try_new(
260                    compressed_elems,
261                    fsl_array.list_size(),
262                    fsl_array.validity().clone(),
263                    fsl_array.len(),
264                )?
265                .into_array())
266            }
267            Canonical::VarBinView(strings) => {
268                if strings
269                    .dtype()
270                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
271                {
272                    self.string_compressor()
273                        .compress(self, &strings, ctx, excludes.string)
274                } else {
275                    // Binary arrays do not compress
276                    Ok(strings.into_array())
277                }
278            }
279            Canonical::Extension(ext_array) => {
280                // We compress Timestamp-level arrays with DateTimeParts compression
281                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.clone().into_array())
282                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
283                {
284                    if is_constant_opts(
285                        &ext_array.clone().into_array(),
286                        &IsConstantOpts {
287                            cost: Cost::Canonicalize,
288                        },
289                    )?
290                    .unwrap_or_default()
291                    {
292                        return Ok(ConstantArray::new(
293                            temporal_array.as_ref().scalar_at(0)?,
294                            ext_array.len(),
295                        )
296                        .into_array());
297                    }
298                    return compress_temporal(self, temporal_array);
299                }
300
301                // Compress the underlying storage array.
302                let compressed_storage = self.compress(ext_array.storage())?;
303
304                Ok(
305                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
306                        .into_array(),
307                )
308            }
309        }
310    }
311
312    fn int_schemes(&self) -> &[&'static dyn IntegerScheme] {
313        &self.int_schemes
314    }
315
316    fn float_schemes(&self) -> &[&'static dyn FloatScheme] {
317        &self.float_schemes
318    }
319
320    fn string_schemes(&self) -> &[&'static dyn StringScheme] {
321        &self.string_schemes
322    }
323}
324
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ListVTable;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::arrays::ListViewVTable;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Round-trips list views through the compressor.
    ///
    /// Checks that the values survive. Also checks whether the compressor picked the
    /// `ListArray` representation (`expect_list == true`) or kept the `ListViewArray` one.
    #[rstest]
    // Offsets are monotonic and non-overlapping, so the sizes can be dropped.
    #[case::zctl(
        // SAFETY: offsets [0, 3] with sizes [3, 2] cover elements 0..3 and 3..5 exactly once
        // and in order. Every view is therefore in bounds of the 5 elements, and the
        // zero-copy-to-list flag truthfully describes the layout.
        unsafe {
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    // Every list views the same elements, so the view representation must be kept.
    #[case::overlapping(
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    // With no elements at all every list is empty. The sizes are redundant even without the
    // zero-copy flag, so a `ListArray` is produced.
    #[case::empty_elements(
        ListViewArray::new(
            Buffer::<i32>::empty().into_array(),
            buffer![0i32, 0].into_array(),
            buffer![0i32, 0].into_array(),
            Validity::NonNullable,
        ),
        true,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let array_ref = input.clone().into_array();
        let result = BtrBlocksCompressor::default().compress(&array_ref)?;
        if expect_list {
            assert!(result.as_opt::<ListVTable>().is_some());
        } else {
            assert!(result.as_opt::<ListViewVTable>().is_some());
        }
        assert_arrays_eq!(result, input);
        Ok(())
    }
}
375}