vortex_btrblocks/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! Vortex's [BtrBlocks]-inspired adaptive compression framework.
7//!
8//! This crate provides a sophisticated multi-level compression system that adaptively selects
9//! optimal compression schemes based on data characteristics. The compressor analyzes arrays
10//! to determine the best encoding strategy, supporting cascaded compression with multiple
11//! encoding layers for maximum efficiency.
12//!
13//! # Key Features
14//!
15//! - **Adaptive Compression**: Automatically selects the best compression scheme based on data patterns
16//! - **Type-Specific Compressors**: Specialized compression for integers, floats, strings, and temporal data
17//! - **Cascaded Encoding**: Multiple compression layers can be applied for optimal results
18//! - **Statistical Analysis**: Uses data sampling and statistics to predict compression ratios
19//! - **Recursive Structure Handling**: Compresses nested structures like structs and lists
20//!
21//! # Example
22//!
23//! ```rust
24//! use vortex_btrblocks::BtrBlocksCompressor;
25//! use vortex_array::Array;
26//!
27//! let compressor = BtrBlocksCompressor::default();
28//! // let compressed = compressor.compress(&array)?;
29//! ```
30//!
31//! [BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf
32
33use std::fmt::Debug;
34use std::hash::Hash;
35
36use vortex_array::arrays::{
37    ExtensionArray, FixedSizeListArray, ListArray, StructArray, TemporalArray, list_from_list_view,
38};
39use vortex_array::vtable::{VTable, ValidityHelper};
40use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
41use vortex_dtype::datetime::TemporalMetadata;
42use vortex_dtype::{DType, Nullability};
43use vortex_error::{VortexResult, VortexUnwrap};
44
45use crate::decimal::compress_decimal;
46pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
47pub use crate::float::{FloatCompressor, FloatStats};
48pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
49pub use crate::integer::{IntCompressor, IntegerStats};
50pub use crate::string::{StringCompressor, StringStats};
51pub use crate::temporal::compress_temporal;
52
53mod decimal;
54mod float;
55mod integer;
56mod patches;
57mod rle;
58mod sample;
59mod string;
60mod temporal;
61
/// Configures how stats are generated.
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted during stats generation.
    ///
    /// Counting distinct values is only needed when evaluating dictionary schemes,
    /// so callers may disable it when dictionary encoding is excluded.
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}
69
70impl Default for GenerateStatsOptions {
71    fn default() -> Self {
72        Self {
73            count_distinct_values: true,
74            // count_runs: true,
75        }
76    }
77}
78
/// Number of values in a single sample chunk (see `estimate_compression_ratio_with_sampling`,
/// which draws `sample_count` chunks of this size).
const SAMPLE_SIZE: u32 = 64;
80
/// Stats for the compressor.
///
/// Implementations pair a source array with statistics used by compression schemes
/// to estimate compression ratios without compressing the full array.
pub trait CompressorStats: Debug + Clone {
    /// The type of the underlying source array vtable.
    type ArrayVTable: VTable;

    /// Generates stats with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generates stats with provided options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// Returns the underlying source array that statistics were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Sample the array with default options.
    ///
    /// Draws roughly `sample_size * sample_count` values in total (see the logging in
    /// `estimate_compression_ratio_with_sampling`); the exact sampling strategy is
    /// implementation-defined.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Sample the array with provided options, returning stats computed over the sample.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}
108
/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    /// Type of the stats generated by the compression scheme.
    type StatsType: CompressorStats;
    /// Type of the code used to uniquely identify the compression scheme.
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    ///
    /// Defaults to `false`. `Compressor::choose_scheme` uses this to avoid ever
    /// selecting the Constant scheme based on a sample.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio for running this scheme (and its children)
    /// for the given input.
    ///
    /// `is_sample` indicates that `stats` was already computed over a sample, in which
    /// case no further sampling is performed. `allowed_cascading` is the number of
    /// additional encoding layers that may still be applied, and `excludes` lists
    /// scheme codes that must not be used.
    ///
    /// Returns the estimated compression ratio (source bytes / compressed bytes);
    /// higher is better.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}
158
159fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
160    compressor: &T,
161    stats: &T::StatsType,
162    is_sample: bool,
163    allowed_cascading: usize,
164    excludes: &[T::CodeType],
165) -> VortexResult<f64> {
166    let sample = if is_sample {
167        stats.clone()
168    } else {
169        // We want to sample about 1% of data
170        let source_len = stats.source().len();
171
172        // We want to sample about 1% of data, while keeping a minimal sample of 640 values.
173        let sample_count = usize::max(
174            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
175            10,
176        );
177
178        log::trace!(
179            "Sampling {} values out of {}",
180            SAMPLE_SIZE as usize * sample_count,
181            source_len
182        );
183
184        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
185    };
186
187    let after = compressor
188        .compress(&sample, true, allowed_cascading, excludes)?
189        .nbytes();
190    let before = sample.source().nbytes();
191
192    log::debug!(
193        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
194        before as f64 / after as f64
195    );
196
197    Ok(before as f64 / after as f64)
198}
199
/// Maximum number of cascaded encoding layers the compressors will apply.
const MAX_CASCADE: usize = 3;
201
202/// A compressor for a particular input type.
203///
204/// This trait defines the interface for type-specific compressors that can adaptively
205/// choose and apply compression schemes based on data characteristics. Compressors
206/// analyze input arrays, select optimal compression schemes, and handle cascading
207/// compression with multiple encoding layers.
208///
209/// The compressor works by generating statistics on the input data, evaluating
210/// available compression schemes, and selecting the one with the best compression ratio.
211pub trait Compressor {
212    /// The VTable type for arrays this compressor operates on.
213    type ArrayVTable: VTable;
214    /// The compression scheme type used by this compressor.
215    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
216    /// The statistics type used to analyze arrays for compression.
217    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
218
219    /// Returns all available compression schemes for this compressor.
220    fn schemes() -> &'static [&'static Self::SchemeType];
221    /// Returns the default fallback compression scheme.
222    fn default_scheme() -> &'static Self::SchemeType;
223    /// Returns the scheme code for dictionary compression.
224    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
225
226    /// Compresses an array using the optimal compression scheme.
227    ///
228    /// Generates statistics on the input array, selects the best compression scheme,
229    /// and applies it. Returns the original array if compression would increase size.
230    fn compress(
231        array: &<Self::ArrayVTable as VTable>::Array,
232        is_sample: bool,
233        allowed_cascading: usize,
234        excludes: &[<Self::SchemeType as Scheme>::CodeType],
235    ) -> VortexResult<ArrayRef>
236    where
237        Self::SchemeType: 'static,
238    {
239        // Avoid compressing empty arrays.
240        if array.is_empty() {
241            return Ok(array.to_array());
242        }
243
244        // Generate stats on the array directly.
245        let stats = if excludes.contains(&Self::dict_scheme_code()) {
246            Self::StatsType::generate_opts(
247                array,
248                GenerateStatsOptions {
249                    count_distinct_values: false,
250                },
251            )
252        } else {
253            Self::StatsType::generate(array)
254        };
255        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
256
257        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
258        if output.nbytes() < array.nbytes() {
259            Ok(output)
260        } else {
261            log::debug!("resulting tree too large: {}", output.display_tree());
262            Ok(array.to_array())
263        }
264    }
265
266    /// Selects the best compression scheme based on expected compression ratios.
267    ///
268    /// Evaluates all available schemes against the provided statistics and returns
269    /// the one with the highest compression ratio. Falls back to the default scheme
270    /// if no scheme provides compression benefits.
271    fn choose_scheme(
272        stats: &Self::StatsType,
273        is_sample: bool,
274        allowed_cascading: usize,
275        excludes: &[<Self::SchemeType as Scheme>::CodeType],
276    ) -> VortexResult<&'static Self::SchemeType> {
277        let mut best_ratio = 1.0;
278        let mut best_scheme: Option<&'static Self::SchemeType> = None;
279
280        // logging helpers
281        let depth = MAX_CASCADE - allowed_cascading;
282
283        for scheme in Self::schemes().iter() {
284            if excludes.contains(&scheme.code()) {
285                continue;
286            }
287
288            // We never choose Constant for a sample
289            if is_sample && scheme.is_constant() {
290                continue;
291            }
292
293            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
294
295            let ratio =
296                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
297            log::trace!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
298
299            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
300                if ratio > best_ratio {
301                    best_ratio = ratio;
302                    best_scheme = Some(*scheme);
303                }
304            } else {
305                log::trace!(
306                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
307                );
308            }
309        }
310
311        log::trace!("depth={depth} best scheme = {best_scheme:?}  ratio = {best_ratio}");
312
313        if let Some(best) = best_scheme {
314            Ok(best)
315        } else {
316            Ok(Self::default_scheme())
317        }
318    }
319}
320
321/// The main compressor type implementing BtrBlocks-inspired compression.
322///
323/// This compressor applies adaptive compression schemes to arrays based on their data types
324/// and characteristics. It recursively compresses nested structures like structs and lists,
325/// and chooses optimal compression schemes for primitive types.
326///
327/// The compressor works by:
328/// 1. Canonicalizing input arrays to a standard representation
329/// 2. Analyzing data characteristics to choose optimal compression schemes
330/// 3. Recursively compressing nested structures
331/// 4. Applying type-specific compression for primitives, strings, and temporal data
332///
333/// # Examples
334///
335/// ```rust
336/// use vortex_btrblocks::BtrBlocksCompressor;
337/// use vortex_array::Array;
338///
339/// let compressor = BtrBlocksCompressor::default();
340/// // let compressed = compressor.compress(&array)?;
341/// ```
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// Whether to exclude ints from dictionary encoding.
    ///
    /// When `true`, integer arrays will not use dictionary compression schemes,
    /// which can be useful when the data has high cardinality or when dictionary
    /// overhead would exceed compression benefits.
    ///
    /// Defaults to `false` (dictionary encoding is considered for integers).
    pub exclude_int_dict_encoding: bool,
}
351
352impl BtrBlocksCompressor {
353    /// Compresses an array using BtrBlocks-inspired compression.
354    ///
355    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
356    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
357        // Canonicalize the array
358        let canonical = array.to_canonical();
359
360        // Compact it, removing any wasted space before we attempt to compress it
361        let compact = canonical.compact()?;
362
363        self.compress_canonical(compact)
364    }
365
    /// Compresses a canonical array by dispatching to type-specific compressors.
    ///
    /// Recursively compresses nested structures (structs, lists, fixed-size lists,
    /// extension arrays) and applies optimal schemes for each data type. Null, Bool,
    /// and binary (non-UTF-8) arrays are currently passed through uncompressed.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            // Null arrays pass through unchanged.
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            // Integers and floats have dedicated compressors; integer dictionary
            // encoding can be disabled via `exclude_int_dict_encoding`.
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            // Structs: compress each field independently, keeping names and validity.
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // TODO(joe): We might want to write list views in the future and choose between
                // list and list view.
                let list_array = list_from_list_view(list_view_array);

                // Reset the offsets to remove garbage data that might prevent us from narrowing our
                // offsets (there could be a large amount of trailing garbage data that the current
                // views do not reference at all).
                let list_array = list_array.reset_offsets(true)?;

                let compressed_elems = self.compress(list_array.elements())?;

                // Note that since the type of our offsets are not encoded in our `DType`, and since
                // we guarantee above that all elements are referenced by offsets, we may narrow the
                // widths.
                // Offsets are compressed without dictionary encoding.
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().narrow()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            // Fixed-size lists have no offsets; only the elements are compressed.
            Canonical::FixedSizeList(fsl_array) => {
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                // Only UTF-8 arrays go through the string compressor.
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary arrays do not compress
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp-level arrays with DateTimeParts compression
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array, preserving the extension dtype.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
471}