Skip to main content

vortex_btrblocks/
canonical_compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Canonical array compression implementation.
5
6use vortex_array::Array;
7use vortex_array::ArrayRef;
8use vortex_array::Canonical;
9use vortex_array::IntoArray;
10use vortex_array::ToCanonical;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::ExtensionArray;
13use vortex_array::arrays::FixedSizeListArray;
14use vortex_array::arrays::ListArray;
15use vortex_array::arrays::ListViewArray;
16use vortex_array::arrays::StructArray;
17use vortex_array::arrays::TemporalArray;
18use vortex_array::arrays::list_from_list_view;
19use vortex_array::compute::Cost;
20use vortex_array::compute::IsConstantOpts;
21use vortex_array::compute::is_constant_opts;
22use vortex_array::vtable::ValidityHelper;
23use vortex_dtype::DType;
24use vortex_dtype::Nullability;
25use vortex_dtype::datetime::TemporalMetadata;
26use vortex_error::VortexResult;
27
28use crate::BtrBlocksCompressorBuilder;
29use crate::CompressorContext;
30use crate::CompressorExt;
31use crate::Excludes;
32use crate::FloatCompressor;
33use crate::IntCode;
34use crate::IntCompressor;
35use crate::StringCompressor;
36use crate::compressor::decimal::compress_decimal;
37use crate::compressor::float::FloatScheme;
38use crate::compressor::integer::IntegerScheme;
39use crate::compressor::string::StringScheme;
40use crate::compressor::temporal::compress_temporal;
41
42/// Trait for compressors that can compress canonical arrays.
43///
44/// Provides access to configured compression schemes and the ability to
45/// compress canonical arrays recursively.
46pub trait CanonicalCompressor {
47    /// Compresses a canonical array with the specified options.
48    fn compress_canonical(
49        &self,
50        array: Canonical,
51        ctx: CompressorContext,
52        excludes: Excludes,
53    ) -> VortexResult<ArrayRef>;
54
55    /// Returns the enabled integer compression schemes.
56    fn int_schemes(&self) -> &[&'static dyn IntegerScheme];
57
58    /// Returns the enabled float compression schemes.
59    fn float_schemes(&self) -> &[&'static dyn FloatScheme];
60
61    /// Returns the enabled string compression schemes.
62    fn string_schemes(&self) -> &[&'static dyn StringScheme];
63}
64
65/// The main compressor type implementing BtrBlocks-inspired compression.
66///
67/// This compressor applies adaptive compression schemes to arrays based on their data types
68/// and characteristics. It recursively compresses nested structures like structs and lists,
69/// and chooses optimal compression schemes for primitive types.
70///
71/// The compressor works by:
72/// 1. Canonicalizing input arrays to a standard representation
73/// 2. Analyzing data characteristics to choose optimal compression schemes
74/// 3. Recursively compressing nested structures
75/// 4. Applying type-specific compression for primitives, strings, and temporal data
76///
77/// Use [`BtrBlocksCompressorBuilder`] to configure which compression schemes are enabled.
78///
79/// # Examples
80///
81/// ```rust
82/// use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, IntCode};
83///
84/// // Default compressor - all schemes allowed
85/// let compressor = BtrBlocksCompressor::default();
86///
87/// // Exclude specific schemes using the builder
88/// let compressor = BtrBlocksCompressorBuilder::default()
89///     .exclude_int([IntCode::Dict])
90///     .build();
91/// ```
92#[derive(Clone)]
93pub struct BtrBlocksCompressor {
94    /// Integer compressor with configured schemes.
95    pub int_schemes: Vec<&'static dyn IntegerScheme>,
96
97    /// Float compressor with configured schemes.
98    pub float_schemes: Vec<&'static dyn FloatScheme>,
99
100    /// String compressor with configured schemes.
101    pub string_schemes: Vec<&'static dyn StringScheme>,
102}
103
104impl Default for BtrBlocksCompressor {
105    fn default() -> Self {
106        BtrBlocksCompressorBuilder::default().build()
107    }
108}
109
110impl BtrBlocksCompressor {
111    /// Compresses an array using BtrBlocks-inspired compression.
112    ///
113    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
114    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
115        // Canonicalize the array
116        let canonical = array.to_canonical()?;
117
118        // Compact it, removing any wasted space before we attempt to compress it
119        let compact = canonical.compact()?;
120
121        self.compress_canonical(compact, CompressorContext::default(), Excludes::none())
122    }
123
124    pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> {
125        IntCompressor {
126            btr_blocks_compressor: self,
127        }
128    }
129
130    pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> {
131        FloatCompressor {
132            btr_blocks_compressor: self,
133        }
134    }
135
136    pub(crate) fn string_compressor(&self) -> StringCompressor<'_> {
137        StringCompressor {
138            btr_blocks_compressor: self,
139        }
140    }
141
142    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
143    fn compress_list_array(
144        &self,
145        list_array: ListArray,
146        ctx: CompressorContext,
147    ) -> VortexResult<ArrayRef> {
148        // Reset the offsets to remove garbage data that might prevent us from narrowing our
149        // offsets (there could be a large amount of trailing garbage data that the current
150        // views do not reference at all).
151        let list_array = list_array.reset_offsets(true)?;
152
153        let compressed_elems = self.compress(list_array.elements())?;
154
155        // Note that since the type of our offsets are not encoded in our `DType`, and since
156        // we guarantee above that all elements are referenced by offsets, we may narrow the
157        // widths.
158        let compressed_offsets = self.compress_canonical(
159            Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
160            ctx,
161            Excludes::from(&[IntCode::Dict]),
162        )?;
163
164        Ok(ListArray::try_new(
165            compressed_elems,
166            compressed_offsets,
167            list_array.validity().clone(),
168        )?
169        .into_array())
170    }
171
172    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
173    /// elements.
174    fn compress_list_view_array(
175        &self,
176        list_view: ListViewArray,
177        ctx: CompressorContext,
178    ) -> VortexResult<ArrayRef> {
179        let compressed_elems = self.compress(list_view.elements())?;
180        let compressed_offsets = self.compress_canonical(
181            Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
182            ctx,
183            Excludes::none(),
184        )?;
185        let compressed_sizes = self.compress_canonical(
186            Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
187            ctx,
188            Excludes::none(),
189        )?;
190        Ok(ListViewArray::try_new(
191            compressed_elems,
192            compressed_offsets,
193            compressed_sizes,
194            list_view.validity().clone(),
195        )?
196        .into_array())
197    }
198}
199
200impl CanonicalCompressor for BtrBlocksCompressor {
201    /// Compresses a canonical array by dispatching to type-specific compressors.
202    ///
203    /// Recursively compresses nested structures and applies optimal schemes for each data type.
204    fn compress_canonical(
205        &self,
206        array: Canonical,
207        ctx: CompressorContext,
208        excludes: Excludes,
209    ) -> VortexResult<ArrayRef> {
210        match array {
211            Canonical::Null(null_array) => Ok(null_array.into_array()),
212            // TODO(aduffy): Sparse, other bool compressors.
213            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
214            Canonical::Primitive(primitive) => {
215                if primitive.ptype().is_int() {
216                    self.integer_compressor()
217                        .compress(self, &primitive, ctx, excludes.int)
218                } else {
219                    self.float_compressor()
220                        .compress(self, &primitive, ctx, excludes.float)
221                }
222            }
223            Canonical::Decimal(decimal) => compress_decimal(self, &decimal),
224            Canonical::Struct(struct_array) => {
225                let fields = struct_array
226                    .unmasked_fields()
227                    .iter()
228                    .map(|field| self.compress(field))
229                    .collect::<Result<Vec<_>, _>>()?;
230
231                Ok(StructArray::try_new(
232                    struct_array.names().clone(),
233                    fields,
234                    struct_array.len(),
235                    struct_array.validity().clone(),
236                )?
237                .into_array())
238            }
239            Canonical::List(list_view_array) => {
240                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
241                    // Offsets are already monotonic and non-overlapping, so we
242                    // can drop the sizes array and compress as a ListArray.
243                    let list_array = list_from_list_view(list_view_array)?;
244                    self.compress_list_array(list_array, ctx)
245                } else {
246                    self.compress_list_view_array(list_view_array, ctx)
247                }
248            }
249            Canonical::FixedSizeList(fsl_array) => {
250                let compressed_elems = self.compress(fsl_array.elements())?;
251
252                Ok(FixedSizeListArray::try_new(
253                    compressed_elems,
254                    fsl_array.list_size(),
255                    fsl_array.validity().clone(),
256                    fsl_array.len(),
257                )?
258                .into_array())
259            }
260            Canonical::VarBinView(strings) => {
261                if strings
262                    .dtype()
263                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
264                {
265                    self.string_compressor()
266                        .compress(self, &strings, ctx, excludes.string)
267                } else {
268                    // Binary arrays do not compress
269                    Ok(strings.into_array())
270                }
271            }
272            Canonical::Extension(ext_array) => {
273                // We compress Timestamp-level arrays with DateTimeParts compression
274                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
275                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
276                {
277                    if is_constant_opts(
278                        temporal_array.as_ref(),
279                        &IsConstantOpts {
280                            cost: Cost::Canonicalize,
281                        },
282                    )?
283                    .unwrap_or_default()
284                    {
285                        return Ok(ConstantArray::new(
286                            temporal_array.as_ref().scalar_at(0)?,
287                            ext_array.len(),
288                        )
289                        .into_array());
290                    }
291                    return compress_temporal(self, temporal_array);
292                }
293
294                // Compress the underlying storage array.
295                let compressed_storage = self.compress(ext_array.storage())?;
296
297                Ok(
298                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
299                        .into_array(),
300                )
301            }
302        }
303    }
304
305    fn int_schemes(&self) -> &[&'static dyn IntegerScheme] {
306        &self.int_schemes
307    }
308
309    fn float_schemes(&self) -> &[&'static dyn FloatScheme] {
310        &self.float_schemes
311    }
312
313    fn string_schemes(&self) -> &[&'static dyn StringScheme] {
314        &self.string_schemes
315    }
316}
317
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ListVTable;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::arrays::ListViewVTable;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Round-trips list views through the compressor. Each case checks the output encoding
    /// (`ListArray` vs `ListViewArray`) and that the logical values are preserved.
    #[rstest]
    // Flagged zero-copy-to-list: the sizes are dropped and a `ListArray` comes back.
    #[case::zctl(
        unsafe {
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    // Overlapping views cannot be expressed as a `ListArray`, so the result stays a list view.
    #[case::overlapping(
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    // Only empty lists over an empty elements array: this takes the `elements().is_empty()`
    // shortcut to a `ListArray`.
    #[case::empty_elements(
        ListViewArray::new(
            Buffer::<i32>::empty().into_array(),
            buffer![0i32, 0].into_array(),
            buffer![0i32, 0].into_array(),
            Validity::NonNullable,
        ),
        true,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let result = BtrBlocksCompressor::default().compress(input.as_ref())?;
        if expect_list {
            assert!(result.as_opt::<ListVTable>().is_some());
        } else {
            assert!(result.as_opt::<ListViewVTable>().is_some());
        }
        assert_arrays_eq!(result, input);
        Ok(())
    }
}
367}