vortex_btrblocks/lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![deny(missing_docs)]

//! Vortex's [BtrBlocks]-inspired adaptive compression framework.
//!
//! This crate provides a sophisticated multi-level compression system that adaptively selects
//! optimal compression schemes based on data characteristics. The compressor analyzes arrays
//! to determine the best encoding strategy, supporting cascaded compression with multiple
//! encoding layers for maximum efficiency.
//!
//! # Key Features
//!
//! - **Adaptive Compression**: Automatically selects the best compression scheme based on data patterns
//! - **Type-Specific Compressors**: Specialized compression for integers, floats, strings, and temporal data
//! - **Cascaded Encoding**: Multiple compression layers can be applied for optimal results
//! - **Statistical Analysis**: Uses data sampling and statistics to predict compression ratios
//! - **Recursive Structure Handling**: Compresses nested structures like structs and lists
//!
//! # Example
//!
//! ```rust
//! use vortex_btrblocks::BtrBlocksCompressor;
//! use vortex_array::Array;
//!
//! let compressor = BtrBlocksCompressor::default();
//! // let compressed = compressor.compress(&array)?;
//! ```
//!
//! [BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf

use std::fmt::Debug;
use std::hash::Hash;

use vortex_array::Array;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::FixedSizeListArray;
use vortex_array::arrays::ListArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::TemporalArray;
use vortex_array::arrays::list_from_list_view;
use vortex_array::compute::Cost;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityHelper;
use vortex_dtype::DType;
use vortex_dtype::Nullability;
use vortex_dtype::datetime::TemporalMetadata;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::decimal::compress_decimal;
pub use crate::float::FloatCompressor;
pub use crate::float::FloatStats;
pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
pub use crate::integer::IntCompressor;
pub use crate::integer::IntegerStats;
pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
pub use crate::string::StringCompressor;
pub use crate::string::StringStats;
pub use crate::temporal::compress_temporal;

mod decimal;
mod float;
mod integer;
mod patches;
mod rle;
mod sample;
mod string;
mod temporal;

/// Configures how stats are generated.
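///
/// For example, to skip counting distinct values during stats generation:
///
/// ```rust
/// use vortex_btrblocks::GenerateStatsOptions;
///
/// let opts = GenerateStatsOptions {
///     count_distinct_values: false,
/// };
/// ```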
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted during stats generation.
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // Should this be scheme-specific?
}

impl Default for GenerateStatsOptions {
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}

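/// Number of values drawn per sampled chunk.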
const SAMPLE_SIZE: u32 = 64;

/// Stats for the compressor.
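///
/// A typical flow (a sketch, following the module-level example's convention of
/// leaving construction of the input to the caller; `array` is assumed to be a
/// primitive integer array, for which [`IntegerStats`] provides the stats):
///
/// ```rust
/// use vortex_btrblocks::{CompressorStats, IntegerStats};
///
/// // let stats = IntegerStats::generate(&array);
/// // let sample = stats.sample(64, 10); // 10 chunks of 64 values each
/// ```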
pub trait CompressorStats: Debug + Clone {
    /// The type of the underlying source array vtable.
    type ArrayVTable: VTable;

    /// Generates stats with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generates stats with provided options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// Returns the underlying source array that statistics were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Sample the array with default options.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Sample the array with provided options.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}

/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    /// Type of the stats generated by the compression scheme.
    type StatsType: CompressorStats;
    /// Type of the code used to uniquely identify the compression scheme.
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimates the compression ratio of running this scheme (and its children)
    /// on the given input.
    ///
    /// `allowed_cascading` is the number of additional encoding layers that may still
    /// be applied beneath this scheme.
    ///
    /// Returns the estimated compression ratio, computed as uncompressed size over
    /// compressed size, so values above 1.0 indicate a net size reduction.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}

fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
    compressor: &T,
    stats: &T::StatsType,
    is_sample: bool,
    allowed_cascading: usize,
    excludes: &[T::CodeType],
) -> VortexResult<f64> {
    let sample = if is_sample {
        stats.clone()
    } else {
        let source_len = stats.source().len();

        // We want to sample about 1% of the data, while keeping a minimum sample of
        // 640 values (10 chunks of SAMPLE_SIZE = 64).
        let sample_count = usize::max(
            (source_len / 100)
                / usize::try_from(SAMPLE_SIZE).vortex_expect("SAMPLE_SIZE must fit in usize"),
            10,
        );
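        // For example, source_len = 1_000_000 yields (10_000 / 64) = 156 chunks of 64
        // values (~9_984 sampled); inputs under 64_000 values fall back to the minimum
        // of 10 chunks.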

        tracing::trace!(
            "Sampling {} values out of {}",
            SAMPLE_SIZE as usize * sample_count,
            source_len
        );

        stats.sample(
            SAMPLE_SIZE,
            sample_count
                .try_into()
                .vortex_expect("sample count must fit in u32"),
        )
    };

    let after = compressor
        .compress(&sample, true, allowed_cascading, excludes)?
        .nbytes();
    let before = sample.source().nbytes();

    tracing::debug!(
        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
        before as f64 / after as f64
    );

    Ok(before as f64 / after as f64)
}

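/// Maximum depth of the encoding cascade, i.e. how many encoding layers may be
/// stacked on top of one another.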
const MAX_CASCADE: usize = 3;

/// A compressor for a particular input type.
///
/// This trait defines the interface for type-specific compressors that can adaptively
/// choose and apply compression schemes based on data characteristics. Compressors
/// analyze input arrays, select optimal compression schemes, and handle cascading
/// compression with multiple encoding layers.
///
/// The compressor works by generating statistics on the input data, evaluating
/// available compression schemes, and selecting the one with the best compression ratio.
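///
/// For example, a primitive integer array can be compressed directly with
/// [`IntCompressor`] (a sketch; `primitive` is assumed to be an existing
/// `PrimitiveArray`, and the cascade depth of 3 mirrors the crate's internal limit):
///
/// ```rust
/// use vortex_btrblocks::{Compressor, IntCompressor};
///
/// // let compressed = IntCompressor::compress(&primitive, false, 3, &[])?;
/// ```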
pub trait Compressor {
    /// The VTable type for arrays this compressor operates on.
    type ArrayVTable: VTable;
    /// The compression scheme type used by this compressor.
    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
    /// The statistics type used to analyze arrays for compression.
    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;

    /// Returns all available compression schemes for this compressor.
    fn schemes() -> &'static [&'static Self::SchemeType];
    /// Returns the default fallback compression scheme.
    fn default_scheme() -> &'static Self::SchemeType;
    /// Returns the scheme code for dictionary compression.
    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;

    /// Compresses an array using the optimal compression scheme.
    ///
    /// Generates statistics on the input array, selects the best compression scheme,
    /// and applies it. Returns the original array if compression would increase size.
    fn compress(
        array: &<Self::ArrayVTable as VTable>::Array,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef>
    where
        Self::SchemeType: 'static,
    {
        // Avoid compressing empty arrays.
        if array.is_empty() {
            return Ok(array.to_array());
        }

        // Generate stats on the array directly.
        let stats = if excludes.contains(&Self::dict_scheme_code()) {
            Self::StatsType::generate_opts(
                array,
                GenerateStatsOptions {
                    count_distinct_values: false,
                },
            )
        } else {
            Self::StatsType::generate(array)
        };
        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;

        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            tracing::debug!("resulting tree too large: {}", output.display_tree());
            Ok(array.to_array())
        }
    }

    /// Selects the best compression scheme based on expected compression ratios.
    ///
    /// Evaluates all available schemes against the provided statistics and returns
    /// the one with the highest compression ratio. Falls back to the default scheme
    /// if no scheme provides compression benefits.
    #[allow(clippy::cognitive_complexity)]
    fn choose_scheme(
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
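        // Start from a ratio of 1.0 so a scheme is only selected if it strictly beats
        // leaving the data uncompressed.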
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // Depth already reached in the encoding tree; used for logging.
        let depth = MAX_CASCADE - allowed_cascading;

        for scheme in Self::schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }

            // We never choose Constant for a sample
            if is_sample && scheme.is_constant() {
                continue;
            }

            tracing::trace!(is_sample, depth, ?scheme, "Trying compression scheme");

            let ratio =
                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
            tracing::trace!(
                is_sample,
                depth,
                ratio,
                ?scheme,
                "Expected compression result"
            );

            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
                if ratio > best_ratio {
                    best_ratio = ratio;
                    best_scheme = Some(*scheme);
                }
            } else {
                tracing::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
                );
            }
        }

        tracing::trace!(depth, scheme = ?best_scheme, ratio = best_ratio, "best scheme found");

        if let Some(best) = best_scheme {
            Ok(best)
        } else {
            Ok(Self::default_scheme())
        }
    }
}

/// The main compressor type implementing BtrBlocks-inspired compression.
///
/// This compressor applies adaptive compression schemes to arrays based on their data types
/// and characteristics. It recursively compresses nested structures like structs and lists,
/// and chooses optimal compression schemes for primitive types.
///
/// The compressor works by:
/// 1. Canonicalizing input arrays to a standard representation
/// 2. Analyzing data characteristics to choose optimal compression schemes
/// 3. Recursively compressing nested structures
/// 4. Applying type-specific compression for primitives, strings, and temporal data
///
/// # Examples
///
/// ```rust
/// use vortex_btrblocks::BtrBlocksCompressor;
/// use vortex_array::Array;
///
/// let compressor = BtrBlocksCompressor::default();
/// // let compressed = compressor.compress(&array)?;
/// ```
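///
/// To disable dictionary encoding for integer arrays, e.g. for high-cardinality data
/// where the dictionary overhead outweighs its benefit:
///
/// ```rust
/// use vortex_btrblocks::BtrBlocksCompressor;
///
/// let compressor = BtrBlocksCompressor {
///     exclude_int_dict_encoding: true,
/// };
/// ```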
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// Whether to exclude ints from dictionary encoding.
    ///
    /// When `true`, integer arrays will not use dictionary compression schemes,
    /// which can be useful when the data has high cardinality or when dictionary
    /// overhead would exceed compression benefits.
    pub exclude_int_dict_encoding: bool,
}

impl BtrBlocksCompressor {
    /// Compresses an array using BtrBlocks-inspired compression.
    ///
    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array
        let canonical = array.to_canonical();

        // Compact it, removing any wasted space before we attempt to compress it
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    /// Compresses a canonical array by dispatching to type-specific compressors.
    ///
    /// Recursively compresses nested structures and applies optimal schemes for each data type.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // TODO(joe): We might want to write list views in the future and choose between
                // list and list view.
                let list_array = list_from_list_view(list_view_array);

                // Reset the offsets to remove garbage data that might prevent us from narrowing our
                // offsets (there could be a large amount of trailing garbage data that the current
                // views do not reference at all).
                let list_array = list_array.reset_offsets(true)?;

                let compressed_elems = self.compress(list_array.elements())?;

                // Note that since the type of our offsets is not encoded in our `DType`, and since
                // we guarantee above that all elements are referenced by offsets, we may narrow
                // their widths.
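                // (For example, 64-bit offsets whose maximum value fits in 32 bits can
                // be narrowed to a 32-bit type; the exact widths involved depend on the
                // input.)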

                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().narrow()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::FixedSizeList(fsl_array) => {
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // We don't currently compress binary arrays.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp extension arrays with DateTimeParts compression.
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    if temporal_array.as_ref().is_constant_opts(Cost::Canonicalize) {
                        return Ok(ConstantArray::new(
                            temporal_array.as_ref().scalar_at(0),
                            ext_array.len(),
                        )
                        .into_array());
                    }
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}