// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![deny(missing_docs)]

//! Vortex's [BtrBlocks]-inspired adaptive compression framework.
//!
//! This crate provides a sophisticated multi-level compression system that adaptively selects
//! optimal compression schemes based on data characteristics. The compressor analyzes arrays
//! to determine the best encoding strategy, supporting cascaded compression with multiple
//! encoding layers for maximum efficiency.
//!
//! # Key Features
//!
//! - **Adaptive Compression**: Automatically selects the best compression scheme based on data patterns
//! - **Type-Specific Compressors**: Specialized compression for integers, floats, strings, and temporal data
//! - **Cascaded Encoding**: Multiple compression layers can be applied for optimal results
//! - **Statistical Analysis**: Uses data sampling and statistics to predict compression ratios
//! - **Recursive Structure Handling**: Compresses nested structures like structs and lists
//!
//! # Example
//!
//! ```rust
//! use vortex_btrblocks::BtrBlocksCompressor;
//! use vortex_array::Array;
//!
//! let compressor = BtrBlocksCompressor::default();
//! // let compressed = compressor.compress(&array)?;
//! ```
//!
//! [BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf
33use std::fmt::Debug;
34use std::hash::Hash;
35
36use vortex_array::Array;
37use vortex_array::ArrayRef;
38use vortex_array::Canonical;
39use vortex_array::IntoArray;
40use vortex_array::ToCanonical;
41use vortex_array::arrays::ExtensionArray;
42use vortex_array::arrays::FixedSizeListArray;
43use vortex_array::arrays::ListArray;
44use vortex_array::arrays::StructArray;
45use vortex_array::arrays::TemporalArray;
46use vortex_array::arrays::list_from_list_view;
47use vortex_array::vtable::VTable;
48use vortex_array::vtable::ValidityHelper;
49use vortex_dtype::DType;
50use vortex_dtype::Nullability;
51use vortex_dtype::datetime::TemporalMetadata;
52use vortex_error::VortexResult;
53use vortex_error::VortexUnwrap;
54
55use crate::decimal::compress_decimal;
56pub use crate::float::FloatCompressor;
57pub use crate::float::FloatStats;
58pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode;
59pub use crate::integer::IntCompressor;
60pub use crate::integer::IntegerStats;
61pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode;
62pub use crate::string::StringCompressor;
63pub use crate::string::StringStats;
64pub use crate::temporal::compress_temporal;
65
66mod decimal;
67mod float;
68mod integer;
69mod patches;
70mod rle;
71mod sample;
72mod string;
73mod temporal;
74
/// Configures how stats are generated.
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted during stats generation.
    ///
    /// Counting distinct values can be expensive; callers that have already
    /// ruled out dictionary encoding disable it (see `Compressor::compress`).
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}
82
83impl Default for GenerateStatsOptions {
84 fn default() -> Self {
85 Self {
86 count_distinct_values: true,
87 // count_runs: true,
88 }
89 }
90}
91
/// Number of values per sampled chunk when estimating compression ratios
/// (total sampled values = `SAMPLE_SIZE * sample_count`).
const SAMPLE_SIZE: u32 = 64;
93
/// Stats for the compressor.
///
/// Implementations carry summary statistics about a source array and keep a
/// reference to that array. Stats can also be re-derived over a sample of the
/// source, which lets schemes estimate compression ratios without compressing
/// the full input.
pub trait CompressorStats: Debug + Clone {
    /// The type of the underlying source array vtable.
    type ArrayVTable: VTable;

    /// Generates stats with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generates stats with provided options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// Returns the underlying source array that statistics were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Sample the array with default options.
    ///
    /// Callers pass chunk size and chunk count separately; the total sampled
    /// size is `sample_size * sample_count` values.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Sample the array with provided options.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}
121
/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    /// Type of the stats generated by the compression scheme.
    type StatsType: CompressorStats;
    /// Type of the code used to uniquely identify the compression scheme.
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    ///
    /// Defaults to `false`; only the Constant scheme overrides this. Used by
    /// `Compressor::choose_scheme` to avoid picking Constant from a sample.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio for running this scheme (and its children)
    /// for the given input.
    ///
    /// Depth is the depth in the encoding tree we've already reached before considering this
    /// scheme.
    ///
    /// Returns the estimated compression ratio as well as the tree of compressors to use.
    /// The default implementation compresses a small sample and compares sizes.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    ///
    /// `excludes` lists scheme codes that must not be used for any cascaded
    /// child encodings; `allowed_cascading` bounds remaining recursion depth.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}
171
172fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
173 compressor: &T,
174 stats: &T::StatsType,
175 is_sample: bool,
176 allowed_cascading: usize,
177 excludes: &[T::CodeType],
178) -> VortexResult<f64> {
179 let sample = if is_sample {
180 stats.clone()
181 } else {
182 // We want to sample about 1% of data
183 let source_len = stats.source().len();
184
185 // We want to sample about 1% of data, while keeping a minimal sample of 640 values.
186 let sample_count = usize::max(
187 (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
188 10,
189 );
190
191 log::trace!(
192 "Sampling {} values out of {}",
193 SAMPLE_SIZE as usize * sample_count,
194 source_len
195 );
196
197 stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
198 };
199
200 let after = compressor
201 .compress(&sample, true, allowed_cascading, excludes)?
202 .nbytes();
203 let before = sample.source().nbytes();
204
205 log::debug!(
206 "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
207 before as f64 / after as f64
208 );
209
210 Ok(before as f64 / after as f64)
211}
212
/// Maximum number of cascaded encoding layers; `choose_scheme` derives its logging
/// depth as `MAX_CASCADE - allowed_cascading`.
const MAX_CASCADE: usize = 3;
214
215/// A compressor for a particular input type.
216///
217/// This trait defines the interface for type-specific compressors that can adaptively
218/// choose and apply compression schemes based on data characteristics. Compressors
219/// analyze input arrays, select optimal compression schemes, and handle cascading
220/// compression with multiple encoding layers.
221///
222/// The compressor works by generating statistics on the input data, evaluating
223/// available compression schemes, and selecting the one with the best compression ratio.
224pub trait Compressor {
225 /// The VTable type for arrays this compressor operates on.
226 type ArrayVTable: VTable;
227 /// The compression scheme type used by this compressor.
228 type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
229 /// The statistics type used to analyze arrays for compression.
230 type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;
231
232 /// Returns all available compression schemes for this compressor.
233 fn schemes() -> &'static [&'static Self::SchemeType];
234 /// Returns the default fallback compression scheme.
235 fn default_scheme() -> &'static Self::SchemeType;
236 /// Returns the scheme code for dictionary compression.
237 fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
238
239 /// Compresses an array using the optimal compression scheme.
240 ///
241 /// Generates statistics on the input array, selects the best compression scheme,
242 /// and applies it. Returns the original array if compression would increase size.
243 fn compress(
244 array: &<Self::ArrayVTable as VTable>::Array,
245 is_sample: bool,
246 allowed_cascading: usize,
247 excludes: &[<Self::SchemeType as Scheme>::CodeType],
248 ) -> VortexResult<ArrayRef>
249 where
250 Self::SchemeType: 'static,
251 {
252 // Avoid compressing empty arrays.
253 if array.is_empty() {
254 return Ok(array.to_array());
255 }
256
257 // Generate stats on the array directly.
258 let stats = if excludes.contains(&Self::dict_scheme_code()) {
259 Self::StatsType::generate_opts(
260 array,
261 GenerateStatsOptions {
262 count_distinct_values: false,
263 },
264 )
265 } else {
266 Self::StatsType::generate(array)
267 };
268 let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
269
270 let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
271 if output.nbytes() < array.nbytes() {
272 Ok(output)
273 } else {
274 log::debug!("resulting tree too large: {}", output.display_tree());
275 Ok(array.to_array())
276 }
277 }
278
279 /// Selects the best compression scheme based on expected compression ratios.
280 ///
281 /// Evaluates all available schemes against the provided statistics and returns
282 /// the one with the highest compression ratio. Falls back to the default scheme
283 /// if no scheme provides compression benefits.
284 fn choose_scheme(
285 stats: &Self::StatsType,
286 is_sample: bool,
287 allowed_cascading: usize,
288 excludes: &[<Self::SchemeType as Scheme>::CodeType],
289 ) -> VortexResult<&'static Self::SchemeType> {
290 let mut best_ratio = 1.0;
291 let mut best_scheme: Option<&'static Self::SchemeType> = None;
292
293 // logging helpers
294 let depth = MAX_CASCADE - allowed_cascading;
295
296 for scheme in Self::schemes().iter() {
297 if excludes.contains(&scheme.code()) {
298 continue;
299 }
300
301 // We never choose Constant for a sample
302 if is_sample && scheme.is_constant() {
303 continue;
304 }
305
306 log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
307
308 let ratio =
309 scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
310 log::trace!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
311
312 if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
313 if ratio > best_ratio {
314 best_ratio = ratio;
315 best_scheme = Some(*scheme);
316 }
317 } else {
318 log::trace!(
319 "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
320 );
321 }
322 }
323
324 log::trace!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");
325
326 if let Some(best) = best_scheme {
327 Ok(best)
328 } else {
329 Ok(Self::default_scheme())
330 }
331 }
332}
333
/// The main compressor type implementing BtrBlocks-inspired compression.
///
/// This compressor applies adaptive compression schemes to arrays based on their data types
/// and characteristics. It recursively compresses nested structures like structs and lists,
/// and chooses optimal compression schemes for primitive types.
///
/// The compressor works by:
/// 1. Canonicalizing input arrays to a standard representation
/// 2. Analyzing data characteristics to choose optimal compression schemes
/// 3. Recursively compressing nested structures
/// 4. Applying type-specific compression for primitives, strings, and temporal data
///
/// # Examples
///
/// ```rust
/// use vortex_btrblocks::BtrBlocksCompressor;
///
/// let compressor = BtrBlocksCompressor::default();
/// // let compressed = compressor.compress(&array)?;
/// ```
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// Whether to exclude ints from dictionary encoding.
    ///
    /// When `true`, integer arrays will not use dictionary compression schemes,
    /// which can be useful when the data has high cardinality or when dictionary
    /// overhead would exceed compression benefits. Defaults to `false`.
    pub exclude_int_dict_encoding: bool,
}
364
impl BtrBlocksCompressor {
    /// Compresses an array using BtrBlocks-inspired compression.
    ///
    /// First canonicalizes and compacts the array, then applies optimal compression schemes
    /// via [`Self::compress_canonical`]. This is also the entry point used when recursing
    /// into struct fields and list/extension children.
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array
        let canonical = array.to_canonical();

        // Compact it, removing any wasted space before we attempt to compress it
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    /// Compresses a canonical array by dispatching to type-specific compressors.
    ///
    /// Recursively compresses nested structures and applies optimal schemes for each data type.
    /// Null and Bool arrays are currently passed through unchanged.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            // Null arrays pass through unchanged.
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    // Honor the config flag that disables dictionary encoding for ints.
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    // Non-integer primitives go through the float compressor.
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                // Compress each field independently; validity is preserved as-is.
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // TODO(joe): We might want to write list views in the future and chose between
                // list and list view.
                let list_array = list_from_list_view(list_view_array);

                // Reset the offsets to remove garbage data that might prevent us from narrowing our
                // offsets (there could be a large amount of trailing garbage data that the current
                // views do not reference at all).
                let list_array = list_array.reset_offsets(true)?;

                let compressed_elems = self.compress(list_array.elements())?;

                // Note that since the type of our offsets are not encoded in our `DType`, and since
                // we guarantee above that all elements are referenced by offsets, we may narrow the
                // widths.

                // Offsets skip dictionary encoding and are narrowed to the smallest width first.
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().narrow()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::FixedSizeList(fsl_array) => {
                // Fixed-size lists have no offsets; only the flattened elements are compressed.
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                // Only UTF-8 arrays (regardless of nullability) go through the string compressor.
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary arrays do not compress
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp-level arrays with DateTimeParts compression
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}
484}