// vortex_btrblocks/lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! Vortex's [BtrBlocks]-inspired adaptive compression framework.
7//!
8//! This crate provides a sophisticated multi-level compression system that adaptively selects
9//! optimal compression schemes based on data characteristics. The compressor analyzes arrays
10//! to determine the best encoding strategy, supporting cascaded compression with multiple
11//! encoding layers for maximum efficiency.
12//!
13//! # Key Features
14//!
15//! - **Adaptive Compression**: Automatically selects the best compression scheme based on data patterns
16//! - **Type-Specific Compressors**: Specialized compression for integers, floats, strings, and temporal data
17//! - **Cascaded Encoding**: Multiple compression layers can be applied for optimal results
18//! - **Statistical Analysis**: Uses data sampling and statistics to predict compression ratios
19//! - **Recursive Structure Handling**: Compresses nested structures like structs and lists
20//!
21//! # Example
22//!
23//! ```rust
24//! use vortex_btrblocks::BtrBlocksCompressor;
25//! use vortex_array::Array;
26//!
27//! let compressor = BtrBlocksCompressor::default();
28//! // let compressed = compressor.compress(&array)?;
29//! ```
30//!
31//! [BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf
32
33use std::fmt::Debug;
34use std::hash::Hash;
35
36use vortex_array::Array;
37use vortex_array::ArrayRef;
38use vortex_array::Canonical;
39use vortex_array::IntoArray;
40use vortex_array::ToCanonical;
41use vortex_array::arrays::ExtensionArray;
42use vortex_array::arrays::FixedSizeListArray;
43use vortex_array::arrays::ListArray;
44use vortex_array::arrays::StructArray;
45use vortex_array::arrays::TemporalArray;
46use vortex_array::arrays::list_from_list_view;
47use vortex_array::vtable::VTable;
48use vortex_array::vtable::ValidityHelper;
49use vortex_dtype::DType;
50use vortex_dtype::Nullability;
51use vortex_dtype::datetime::TemporalMetadata;
52use vortex_error::VortexResult;
53use vortex_error::VortexUnwrap;
54
55use crate::decimal::compress_decimal;
56pub use crate::float::FloatCompressor;
57pub use crate::float::FloatStats;
58pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
59pub use crate::integer::IntCompressor;
60pub use crate::integer::IntegerStats;
61pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
62pub use crate::string::StringCompressor;
63pub use crate::string::StringStats;
64pub use crate::temporal::compress_temporal;
65
66mod decimal;
67mod float;
68mod integer;
69mod patches;
70mod rle;
71mod sample;
72mod string;
73mod temporal;
74
75/// Configures how stats are generated.
/// Configures how stats are generated.
#[derive(Debug, Clone, Copy)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted during stats generation.
    ///
    /// Counting distinct values can be expensive, and is only needed by schemes that depend
    /// on cardinality (e.g. dictionary encoding), so callers may disable it.
    pub count_distinct_values: bool,
    // TODO: should run counts be generated too, and should that be scheme-specific?
    // pub count_runs: bool,
}

impl Default for GenerateStatsOptions {
    /// By default all stats are generated, including the distinct-value count.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}
91
92const SAMPLE_SIZE: u32 = 64;
93
94/// Stats for the compressor.
/// Stats for the compressor.
///
/// Implementations pre-compute summary statistics over a source array so that compression
/// schemes can cheaply estimate their effectiveness without compressing the full input.
pub trait CompressorStats: Debug + Clone {
    /// The type of the underlying source array vtable.
    type ArrayVTable: VTable;

    /// Generates stats with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generates stats with provided options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// Returns the underlying source array that statistics were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Sample the array with default options.
    ///
    /// `sample_count` runs of `sample_size` values are taken, i.e. roughly
    /// `sample_size * sample_count` values in total; the returned stats describe the sample.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Sample the array with provided options.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}
121
122/// Top-level compression scheme trait.
123///
124/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    /// Type of the stats generated by the compression scheme.
    type StatsType: CompressorStats;
    /// Type of the code used to uniquely identify the compression scheme.
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    ///
    /// Defaults to `false`.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio for running this scheme (and its children)
    /// for the given input.
    ///
    /// Depth is the depth in the encoding tree we've already reached before considering this
    /// scheme.
    ///
    /// `excludes` lists scheme codes that must not be considered further down the tree.
    ///
    /// Returns the estimated compression ratio (uncompressed bytes / compressed bytes). The
    /// default implementation compresses a sample of the input and compares byte sizes.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    ///
    /// `is_sample` indicates `stats` describe a sample rather than the full array;
    /// `allowed_cascading` bounds how many further encoding layers may be applied.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}
171
172fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
173    compressor: &T,
174    stats: &T::StatsType,
175    is_sample: bool,
176    allowed_cascading: usize,
177    excludes: &[T::CodeType],
178) -> VortexResult<f64> {
179    let sample = if is_sample {
180        stats.clone()
181    } else {
182        // We want to sample about 1% of data
183        let source_len = stats.source().len();
184
185        // We want to sample about 1% of data, while keeping a minimal sample of 640 values.
186        let sample_count = usize::max(
187            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
188            10,
189        );
190
191        log::trace!(
192            "Sampling {} values out of {}",
193            SAMPLE_SIZE as usize * sample_count,
194            source_len
195        );
196
197        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
198    };
199
200    let after = compressor
201        .compress(&sample, true, allowed_cascading, excludes)?
202        .nbytes();
203    let before = sample.source().nbytes();
204
205    log::debug!(
206        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
207        before as f64 / after as f64
208    );
209
210    Ok(before as f64 / after as f64)
211}
212
213const MAX_CASCADE: usize = 3;
214
215/// A compressor for a particular input type.
216///
217/// This trait defines the interface for type-specific compressors that can adaptively
218/// choose and apply compression schemes based on data characteristics. Compressors
219/// analyze input arrays, select optimal compression schemes, and handle cascading
220/// compression with multiple encoding layers.
221///
222/// The compressor works by generating statistics on the input data, evaluating
223/// available compression schemes, and selecting the one with the best compression ratio.
224pub trait Compressor {
225    /// The VTable type for arrays this compressor operates on.
226    type ArrayVTable: VTable;
227    /// The compression scheme type used by this compressor.
228    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
229    /// The statistics type used to analyze arrays for compression.
230    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
231
232    /// Returns all available compression schemes for this compressor.
233    fn schemes() -> &'static [&'static Self::SchemeType];
234    /// Returns the default fallback compression scheme.
235    fn default_scheme() -> &'static Self::SchemeType;
236    /// Returns the scheme code for dictionary compression.
237    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
238
239    /// Compresses an array using the optimal compression scheme.
240    ///
241    /// Generates statistics on the input array, selects the best compression scheme,
242    /// and applies it. Returns the original array if compression would increase size.
243    fn compress(
244        array: &<Self::ArrayVTable as VTable>::Array,
245        is_sample: bool,
246        allowed_cascading: usize,
247        excludes: &[<Self::SchemeType as Scheme>::CodeType],
248    ) -> VortexResult<ArrayRef>
249    where
250        Self::SchemeType: 'static,
251    {
252        // Avoid compressing empty arrays.
253        if array.is_empty() {
254            return Ok(array.to_array());
255        }
256
257        // Generate stats on the array directly.
258        let stats = if excludes.contains(&Self::dict_scheme_code()) {
259            Self::StatsType::generate_opts(
260                array,
261                GenerateStatsOptions {
262                    count_distinct_values: false,
263                },
264            )
265        } else {
266            Self::StatsType::generate(array)
267        };
268        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
269
270        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
271        if output.nbytes() < array.nbytes() {
272            Ok(output)
273        } else {
274            log::debug!("resulting tree too large: {}", output.display_tree());
275            Ok(array.to_array())
276        }
277    }
278
279    /// Selects the best compression scheme based on expected compression ratios.
280    ///
281    /// Evaluates all available schemes against the provided statistics and returns
282    /// the one with the highest compression ratio. Falls back to the default scheme
283    /// if no scheme provides compression benefits.
284    fn choose_scheme(
285        stats: &Self::StatsType,
286        is_sample: bool,
287        allowed_cascading: usize,
288        excludes: &[<Self::SchemeType as Scheme>::CodeType],
289    ) -> VortexResult<&'static Self::SchemeType> {
290        let mut best_ratio = 1.0;
291        let mut best_scheme: Option<&'static Self::SchemeType> = None;
292
293        // logging helpers
294        let depth = MAX_CASCADE - allowed_cascading;
295
296        for scheme in Self::schemes().iter() {
297            if excludes.contains(&scheme.code()) {
298                continue;
299            }
300
301            // We never choose Constant for a sample
302            if is_sample && scheme.is_constant() {
303                continue;
304            }
305
306            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
307
308            let ratio =
309                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
310            log::trace!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
311
312            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
313                if ratio > best_ratio {
314                    best_ratio = ratio;
315                    best_scheme = Some(*scheme);
316                }
317            } else {
318                log::trace!(
319                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
320                );
321            }
322        }
323
324        log::trace!("depth={depth} best scheme = {best_scheme:?}  ratio = {best_ratio}");
325
326        if let Some(best) = best_scheme {
327            Ok(best)
328        } else {
329            Ok(Self::default_scheme())
330        }
331    }
332}
333
334/// The main compressor type implementing BtrBlocks-inspired compression.
335///
336/// This compressor applies adaptive compression schemes to arrays based on their data types
337/// and characteristics. It recursively compresses nested structures like structs and lists,
338/// and chooses optimal compression schemes for primitive types.
339///
340/// The compressor works by:
341/// 1. Canonicalizing input arrays to a standard representation
342/// 2. Analyzing data characteristics to choose optimal compression schemes
343/// 3. Recursively compressing nested structures
344/// 4. Applying type-specific compression for primitives, strings, and temporal data
345///
346/// # Examples
347///
348/// ```rust
349/// use vortex_btrblocks::BtrBlocksCompressor;
350/// use vortex_array::Array;
351///
352/// let compressor = BtrBlocksCompressor::default();
353/// // let compressed = compressor.compress(&array)?;
354/// ```
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// Whether to exclude ints from dictionary encoding.
    ///
    /// When `true`, integer arrays will not use dictionary compression schemes,
    /// which can be useful when the data has high cardinality or when dictionary
    /// overhead would exceed compression benefits. The derived `Default` leaves this
    /// `false`, i.e. dictionary encoding is considered for integers.
    pub exclude_int_dict_encoding: bool,
}
364
impl BtrBlocksCompressor {
    /// Compresses an array using BtrBlocks-inspired compression.
    ///
    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while compacting or compressing the array.
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array
        let canonical = array.to_canonical();

        // Compact it, removing any wasted space before we attempt to compress it
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    /// Compresses a canonical array by dispatching to type-specific compressors.
    ///
    /// Recursively compresses nested structures and applies optimal schemes for each data type.
    /// Null, bool, and non-UTF-8 binary arrays are currently passed through uncompressed.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            // Null arrays are returned as-is; there is nothing to compress.
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            // Integers and floats each have their own scheme set; the int-dictionary
            // opt-out flag only affects the integer path.
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            // Compress each struct field independently, then reassemble the struct with
            // its original names, length, and validity.
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // TODO(joe): We might want to write list views in the future and chose between
                // list and list view.
                let list_array = list_from_list_view(list_view_array);

                // Reset the offsets to remove garbage data that might prevent us from narrowing our
                // offsets (there could be a large amount of trailing garbage data that the current
                // views do not reference at all).
                let list_array = list_array.reset_offsets(true)?;

                let compressed_elems = self.compress(list_array.elements())?;

                // Note that since the type of our offsets are not encoded in our `DType`, and since
                // we guarantee above that all elements are referenced by offsets, we may narrow the
                // widths.

                // Offsets take the no-dict path — presumably because dictionary encoding is of
                // little value for monotone offsets. NOTE(review): confirm this rationale.
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().narrow()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            // Fixed-size lists carry no offsets array, so only the elements are compressed.
            Canonical::FixedSizeList(fsl_array) => {
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            // Only UTF-8 var-bin views go through the string compressor.
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary arrays do not compress
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp-level arrays with DateTimeParts compression
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}
484}