vortex_btrblocks/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! Vortex's [BtrBlocks]-inspired adaptive compression framework.
7//!
8//! This crate provides a sophisticated multi-level compression system that adaptively selects
9//! optimal compression schemes based on data characteristics. The compressor analyzes arrays
10//! to determine the best encoding strategy, supporting cascaded compression with multiple
11//! encoding layers for maximum efficiency.
12//!
13//! # Key Features
14//!
15//! - **Adaptive Compression**: Automatically selects the best compression scheme based on data patterns
16//! - **Type-Specific Compressors**: Specialized compression for integers, floats, strings, and temporal data
17//! - **Cascaded Encoding**: Multiple compression layers can be applied for optimal results
18//! - **Statistical Analysis**: Uses data sampling and statistics to predict compression ratios
19//! - **Recursive Structure Handling**: Compresses nested structures like structs and lists
20//!
21//! # Example
22//!
23//! ```rust
24//! use vortex_btrblocks::BtrBlocksCompressor;
25//! use vortex_array::Array;
26//!
27//! let compressor = BtrBlocksCompressor::default();
28//! // let compressed = compressor.compress(&array)?;
29//! ```
30//!
31//! [BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf
32
33use std::fmt::Debug;
34use std::hash::Hash;
35
36use vortex_array::arrays::{
37 ExtensionArray, FixedSizeListArray, ListArray, StructArray, TemporalArray, list_from_list_view,
38};
39use vortex_array::vtable::{VTable, ValidityHelper};
40use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
41use vortex_dtype::datetime::TemporalMetadata;
42use vortex_dtype::{DType, Nullability};
43use vortex_error::{VortexResult, VortexUnwrap};
44
45use crate::decimal::compress_decimal;
46pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
47pub use crate::float::{FloatCompressor, FloatStats};
48pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
49pub use crate::integer::{IntCompressor, IntegerStats};
50pub use crate::string::{StringCompressor, StringStats};
51pub use crate::temporal::compress_temporal;
52
53mod decimal;
54mod float;
55mod integer;
56mod patches;
57mod rle;
58mod sample;
59mod string;
60mod temporal;
61
/// Configures how stats are generated.
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted during stats generation.
    ///
    /// Counting distinct values can be expensive; it is disabled when dictionary
    /// encoding (its consumer) is excluded.
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}
69
70impl Default for GenerateStatsOptions {
71 fn default() -> Self {
72 Self {
73 count_distinct_values: true,
74 // count_runs: true,
75 }
76 }
77}
78
/// Number of values in each sampled chunk when estimating compression ratios.
const SAMPLE_SIZE: u32 = 64;
80
/// Stats for the compressor.
///
/// Implementations summarize a source array so that compression schemes can estimate
/// compression ratios without compressing the full input.
pub trait CompressorStats: Debug + Clone {
    /// The type of the underlying source array vtable.
    type ArrayVTable: VTable;

    /// Generates stats with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generates stats with provided options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// Returns the underlying source array that statistics were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Sample the array with default options.
    ///
    /// Draws `sample_count` chunks of `sample_size` values each, i.e. roughly
    /// `sample_size * sample_count` values in total.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Sample the array with provided options.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}
108
109/// Top-level compression scheme trait.
110///
111/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
112pub trait Scheme: Debug {
113 /// Type of the stats generated by the compression scheme.
114 type StatsType: CompressorStats;
115 /// Type of the code used to uniquely identify the compression scheme.
116 type CodeType: Copy + Eq + Hash;
117
118 /// Scheme unique identifier.
119 fn code(&self) -> Self::CodeType;
120
121 /// True if this is the singular Constant scheme for this data type.
122 fn is_constant(&self) -> bool {
123 false
124 }
125
126 /// Estimate the compression ratio for running this scheme (and its children)
127 /// for the given input.
128 ///
129 /// Depth is the depth in the encoding tree we've already reached before considering this
130 /// scheme.
131 ///
132 /// Returns the estimated compression ratio as well as the tree of compressors to use.
133 fn expected_compression_ratio(
134 &self,
135 stats: &Self::StatsType,
136 is_sample: bool,
137 allowed_cascading: usize,
138 excludes: &[Self::CodeType],
139 ) -> VortexResult<f64> {
140 estimate_compression_ratio_with_sampling(
141 self,
142 stats,
143 is_sample,
144 allowed_cascading,
145 excludes,
146 )
147 }
148
149 /// Compress the input with this scheme, yielding a new array.
150 fn compress(
151 &self,
152 stats: &Self::StatsType,
153 is_sample: bool,
154 allowed_cascading: usize,
155 excludes: &[Self::CodeType],
156 ) -> VortexResult<ArrayRef>;
157}
158
159fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
160 compressor: &T,
161 stats: &T::StatsType,
162 is_sample: bool,
163 allowed_cascading: usize,
164 excludes: &[T::CodeType],
165) -> VortexResult<f64> {
166 let sample = if is_sample {
167 stats.clone()
168 } else {
169 // We want to sample about 1% of data
170 let source_len = stats.source().len();
171
172 // We want to sample about 1% of data, while keeping a minimal sample of 640 values.
173 let sample_count = usize::max(
174 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
175 10,
176 );
177
178 log::trace!(
179 "Sampling {} values out of {}",
180 SAMPLE_SIZE as usize * sample_count,
181 source_len
182 );
183
184 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
185 };
186
187 let after = compressor
188 .compress(&sample, true, allowed_cascading, excludes)?
189 .nbytes();
190 let before = sample.source().nbytes();
191
192 log::debug!(
193 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
194 before as f64 / after as f64
195 );
196
197 Ok(before as f64 / after as f64)
198}
199
/// Maximum number of cascaded encoding layers applied during compression.
const MAX_CASCADE: usize = 3;
201
202/// A compressor for a particular input type.
203///
204/// This trait defines the interface for type-specific compressors that can adaptively
205/// choose and apply compression schemes based on data characteristics. Compressors
206/// analyze input arrays, select optimal compression schemes, and handle cascading
207/// compression with multiple encoding layers.
208///
209/// The compressor works by generating statistics on the input data, evaluating
210/// available compression schemes, and selecting the one with the best compression ratio.
211pub trait Compressor {
212 /// The VTable type for arrays this compressor operates on.
213 type ArrayVTable: VTable;
214 /// The compression scheme type used by this compressor.
215 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
216 /// The statistics type used to analyze arrays for compression.
217 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
218
219 /// Returns all available compression schemes for this compressor.
220 fn schemes() -> &'static [&'static Self::SchemeType];
221 /// Returns the default fallback compression scheme.
222 fn default_scheme() -> &'static Self::SchemeType;
223 /// Returns the scheme code for dictionary compression.
224 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
225
226 /// Compresses an array using the optimal compression scheme.
227 ///
228 /// Generates statistics on the input array, selects the best compression scheme,
229 /// and applies it. Returns the original array if compression would increase size.
230 fn compress(
231 array: &<Self::ArrayVTable as VTable>::Array,
232 is_sample: bool,
233 allowed_cascading: usize,
234 excludes: &[<Self::SchemeType as Scheme>::CodeType],
235 ) -> VortexResult<ArrayRef>
236 where
237 Self::SchemeType: 'static,
238 {
239 // Avoid compressing empty arrays.
240 if array.is_empty() {
241 return Ok(array.to_array());
242 }
243
244 // Generate stats on the array directly.
245 let stats = if excludes.contains(&Self::dict_scheme_code()) {
246 Self::StatsType::generate_opts(
247 array,
248 GenerateStatsOptions {
249 count_distinct_values: false,
250 },
251 )
252 } else {
253 Self::StatsType::generate(array)
254 };
255 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
256
257 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
258 if output.nbytes() < array.nbytes() {
259 Ok(output)
260 } else {
261 log::debug!("resulting tree too large: {}", output.display_tree());
262 Ok(array.to_array())
263 }
264 }
265
266 /// Selects the best compression scheme based on expected compression ratios.
267 ///
268 /// Evaluates all available schemes against the provided statistics and returns
269 /// the one with the highest compression ratio. Falls back to the default scheme
270 /// if no scheme provides compression benefits.
271 fn choose_scheme(
272 stats: &Self::StatsType,
273 is_sample: bool,
274 allowed_cascading: usize,
275 excludes: &[<Self::SchemeType as Scheme>::CodeType],
276 ) -> VortexResult<&'static Self::SchemeType> {
277 let mut best_ratio = 1.0;
278 let mut best_scheme: Option<&'static Self::SchemeType> = None;
279
280 // logging helpers
281 let depth = MAX_CASCADE - allowed_cascading;
282
283 for scheme in Self::schemes().iter() {
284 if excludes.contains(&scheme.code()) {
285 continue;
286 }
287
288 // We never choose Constant for a sample
289 if is_sample && scheme.is_constant() {
290 continue;
291 }
292
293 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
294
295 let ratio =
296 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
297 log::trace!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
298
299 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
300 if ratio > best_ratio {
301 best_ratio = ratio;
302 best_scheme = Some(*scheme);
303 }
304 } else {
305 log::trace!(
306 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
307 );
308 }
309 }
310
311 log::trace!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
312
313 if let Some(best) = best_scheme {
314 Ok(best)
315 } else {
316 Ok(Self::default_scheme())
317 }
318 }
319}
320
/// The main compressor type implementing BtrBlocks-inspired compression.
///
/// This compressor applies adaptive compression schemes to arrays based on their data types
/// and characteristics. It recursively compresses nested structures like structs and lists,
/// and chooses optimal compression schemes for primitive types.
///
/// The compressor works by:
/// 1. Canonicalizing input arrays to a standard representation
/// 2. Analyzing data characteristics to choose optimal compression schemes
/// 3. Recursively compressing nested structures
/// 4. Applying type-specific compression for primitives, strings, and temporal data
///
/// The `Default` instance leaves integer dictionary encoding enabled
/// (`exclude_int_dict_encoding == false`).
///
/// # Examples
///
/// ```rust
/// use vortex_btrblocks::BtrBlocksCompressor;
/// use vortex_array::Array;
///
/// let compressor = BtrBlocksCompressor::default();
/// // let compressed = compressor.compress(&array)?;
/// ```
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// Whether to exclude ints from dictionary encoding.
    ///
    /// When `true`, integer arrays will not use dictionary compression schemes,
    /// which can be useful when the data has high cardinality or when dictionary
    /// overhead would exceed compression benefits.
    pub exclude_int_dict_encoding: bool,
}
351
impl BtrBlocksCompressor {
    /// Compresses an array using BtrBlocks-inspired compression.
    ///
    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array
        let canonical = array.to_canonical();

        // Compact it, removing any wasted space before we attempt to compress it
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    /// Compresses a canonical array by dispatching to type-specific compressors.
    ///
    /// Recursively compresses nested structures and applies optimal schemes for each data type.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            // Null and Bool arrays pass through unchanged.
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                // Integers and floats have dedicated compressors; integer dictionary
                // encoding may be disabled via `exclude_int_dict_encoding`.
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                // Recursively compress each field, then rebuild the struct around
                // the compressed children.
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // TODO(joe): We might want to write list views in the future and choose between
                // list and list view.
                let list_array = list_from_list_view(list_view_array);

                // Reset the offsets to remove garbage data that might prevent us from narrowing our
                // offsets (there could be a large amount of trailing garbage data that the current
                // views do not reference at all).
                let list_array = list_array.reset_offsets(true)?;

                let compressed_elems = self.compress(list_array.elements())?;

                // Note that since the type of our offsets is not encoded in our `DType`, and since
                // we guarantee above that all elements are referenced by offsets, we may narrow the
                // widths.

                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().narrow()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::FixedSizeList(fsl_array) => {
                // Fixed-size lists carry no offsets; only the elements are compressed.
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                // Only Utf8 arrays go through the string compressor.
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary arrays do not compress
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp-level arrays with DateTimeParts compression
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}