vortex_btrblocks/
lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt::Debug;
use std::hash::Hash;

use itertools::Itertools;
use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
use vortex_array::vtable::{VTable, ValidityHelper};
use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
use vortex_dtype::datetime::TemporalMetadata;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexResult, VortexUnwrap};

use crate::decimal::compress_decimal;
pub use crate::float::FloatCompressor;
pub use crate::integer::IntCompressor;
pub use crate::string::StringCompressor;
pub use crate::temporal::compress_temporal;

mod decimal;
mod float;
pub mod integer;
mod patches;
mod sample;
mod string;
mod temporal;

/// Options controlling which statistics are gathered when generating compressor stats.
pub struct GenerateStatsOptions {
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}

impl Default for GenerateStatsOptions {
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}

/// Number of values in each sampled chunk.
const SAMPLE_SIZE: u32 = 64;

/// Statistics over an input array, used by compressors to choose and drive compression schemes.
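///
/// Implementations summarize one canonical array type (via `ArrayVTable`) and can re-derive
/// themselves over a sample of that array. A rough usage sketch, where `MyStats` is a
/// hypothetical implementation (not part of this crate):
///
/// ```ignore
/// let stats = MyStats::generate(&array); // stats over the full array
/// let sample = stats.sample(64, 10);     // stats over 10 sampled chunks of 64 values each
/// ```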
pub trait CompressorStats: Debug + Clone {
    type ArrayVTable: VTable;

    /// Generate stats with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generate stats with the provided options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// The source array these stats were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Generate stats over a sample of the source array, using default options.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Generate stats over a sample of `sample_count` chunks of `sample_size` values each.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}

/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    type StatsType: CompressorStats;
    type CodeType: Copy + Eq + Hash;

    /// Unique identifier for this scheme.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio for running this scheme (and its children)
    /// for the given input.
    ///
    /// `allowed_cascading` is the number of nested encodings that may still be applied beneath
    /// this scheme (i.e. `MAX_CASCADE` minus the depth already reached in the encoding tree),
    /// and `excludes` lists scheme codes that must not be considered.
    ///
    /// Returns the estimated compression ratio.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}

/// A tree of chosen schemes: one code for the array itself and a subtree per child.
pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Schemes to use for the children.
    pub children: Vec<SchemeTree>,
}

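/// Estimate the compression ratio achieved by `compressor` on the array described by `stats`.
///
/// Unless the stats already describe a sample (`is_sample`), a fresh sample of roughly 1% of the
/// source values is drawn first. The returned ratio is uncompressed bytes over compressed bytes,
/// so values above 1.0 mean the scheme shrinks the data.
///
/// As a worked example of the sampling arithmetic below: with `SAMPLE_SIZE = 64`, a source of
/// 1,000,000 values gives `sample_count = max(1_000_000 / 100 / 64, 10) = 156` chunks, i.e.
/// roughly 10,000 sampled values (about 1%), while sources shorter than 64,000 values fall back
/// to the 10-chunk (640-value) minimum.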
pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
    compressor: &T,
    stats: &T::StatsType,
    is_sample: bool,
    allowed_cascading: usize,
    excludes: &[T::CodeType],
) -> VortexResult<f64> {
    let sample = if is_sample {
        stats.clone()
    } else {
        let source_len = stats.source().len();

        // We want to sample about 1% of the data, while keeping a minimum sample of 640 values
        // (10 chunks of SAMPLE_SIZE).
        let sample_count = usize::max(
            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
            10,
        );

        log::trace!(
            "Sampling {} values out of {}",
            SAMPLE_SIZE as usize * sample_count,
            source_len
        );

        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
    };

    let after = compressor
        .compress(&sample, true, allowed_cascading, excludes)?
        .nbytes();
    let before = sample.source().nbytes();

    log::debug!(
        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
        before as f64 / after as f64
    );

    Ok(before as f64 / after as f64)
}

/// Maximum depth of cascaded (nested) encodings the compressor will apply.
const MAX_CASCADE: usize = 3;

/// A compressor for a particular input type.
///
/// The input array type (determined by `ArrayVTable`) should be one of the canonical array
/// variants, e.g. `PrimitiveArray`.
///
/// Compressors expose a `compress` function.
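///
/// For example, the integer compressor is invoked elsewhere in this crate roughly like this
/// (a sketch of the call shape, mirroring `BtrBlocksCompressor::compress_canonical` below):
///
/// ```ignore
/// // Compress a primitive integer array, allowing up to MAX_CASCADE nested encodings and
/// // excluding no schemes.
/// let compressed = IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])?;
/// ```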
pub trait Compressor {
    type ArrayVTable: VTable;
    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;

    // Stats type instead?
    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;

    /// All candidate schemes this compressor may choose from.
    fn schemes() -> &'static [&'static Self::SchemeType];
    /// Fallback scheme used when no candidate improves on the input.
    fn default_scheme() -> &'static Self::SchemeType;
    /// Code of the dictionary scheme for this compressor.
    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;

    fn compress(
        array: &<Self::ArrayVTable as VTable>::Array,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef>
    where
        Self::SchemeType: 'static,
    {
        // Avoid compressing empty arrays.
        if array.is_empty() {
            return Ok(array.to_array());
        }

        // Generate stats on the array directly. If dictionary encoding is excluded, skip the
        // potentially expensive distinct-value count.
        let stats = if excludes.contains(&Self::dict_scheme_code()) {
            Self::StatsType::generate_opts(
                array,
                GenerateStatsOptions {
                    count_distinct_values: false,
                },
            )
        } else {
            Self::StatsType::generate(array)
        };
        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;

        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.display_tree());
            Ok(array.to_array())
        }
    }

    /// Choose the scheme with the highest estimated compression ratio, falling back to the
    /// default scheme when no candidate beats a ratio of 1.0.
    fn choose_scheme(
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // Depth already reached in the encoding tree; used only for logging.
        let depth = MAX_CASCADE - allowed_cascading;

        for scheme in Self::schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }

            // We never choose Constant for a sample.
            if is_sample && scheme.is_constant() {
                continue;
            }

            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}");

            let ratio =
                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");

            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
                if ratio > best_ratio {
                    best_ratio = ratio;
                    best_scheme = Some(*scheme);
                }
            } else {
                log::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be subnormal, infinite, or NaN."
                );
            }
        }

        log::debug!("depth={depth} best scheme = {best_scheme:?}  ratio = {best_ratio}");

        if let Some(best) = best_scheme {
            Ok(best)
        } else {
            Ok(Self::default_scheme())
        }
    }
}

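/// Top-level BtrBlocks-style compressor: canonicalizes the input array and dispatches each
/// canonical variant to the matching type-specific compressor.
///
/// A minimal usage sketch; the array construction is illustrative and assumes
/// `PrimitiveArray::from_iter` from `vortex_array` (not otherwise used in this file):
///
/// ```ignore
/// use vortex_array::arrays::PrimitiveArray;
/// use vortex_array::IntoArray;
/// use vortex_btrblocks::BtrBlocksCompressor;
///
/// let array = PrimitiveArray::from_iter(0i64..1_000_000).into_array();
/// let compressed = BtrBlocksCompressor::default().compress(&array)?;
/// // `compressed` holds the same logical values, typically in far fewer bytes.
/// ```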
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    /// If true, dictionary encoding is not considered for integer arrays.
    pub exclude_int_dict_encoding: bool,
}

impl BtrBlocksCompressor {
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array.
        let canonical = array.to_canonical()?;

        // Compact it, removing any wasted space, before we attempt to compress it.
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .try_collect()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_array) => {
                // Compress the inner elements and the offsets separately.
                let compressed_elems = self.compress(list_array.elements())?;
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive()?.downcast()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary (non-UTF-8) arrays are currently left uncompressed.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp-typed temporal arrays with DateTimeParts compression.
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Otherwise, compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}