vortex_btrblocks/lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

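//! BtrBlocks-style cascading compression for Vortex arrays.
//!
//! The entry point is [`BtrBlocksCompressor`], which canonicalizes its input and dispatches to a
//! type-specific [`Compressor`] (integers, floats, strings, and so on). Each compressor samples
//! the input, estimates the compression ratio of every candidate [`Scheme`], picks the best one,
//! and recursively compresses child arrays up to `MAX_CASCADE` levels deep.
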
use std::fmt::Debug;
use std::hash::Hash;

use itertools::Itertools;
use vortex_array::arrays::{
    ExtensionArray, FixedSizeListArray, ListArray, StructArray, TemporalArray,
};
use vortex_array::vtable::{VTable, ValidityHelper};
use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
use vortex_dtype::datetime::TemporalMetadata;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexResult, VortexUnwrap};

use crate::decimal::compress_decimal;
pub use crate::float::FloatCompressor;
pub use crate::integer::IntCompressor;
pub use crate::string::StringCompressor;
pub use crate::temporal::compress_temporal;

mod decimal;
mod float;
pub mod integer;
mod patches;
mod sample;
mod string;
mod temporal;

pub struct GenerateStatsOptions {
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}

impl Default for GenerateStatsOptions {
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}

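/// Number of values per sample: ratio estimation draws `sample_count` samples of this size, i.e.
/// `SAMPLE_SIZE * sample_count` values in total (see
/// [`estimate_compression_ratio_with_sampling`]).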
const SAMPLE_SIZE: u32 = 64;

/// Stats for the compressor.
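///
/// # Example
///
/// A minimal sketch of how a stats type is typically driven (the generic helper below is
/// illustrative, not part of this crate):
///
/// ```ignore
/// use vortex_array::vtable::VTable;
///
/// fn sampled_stats<S: CompressorStats>(array: &<S::ArrayVTable as VTable>::Array) -> S {
///     // Stats over the full array, then re-derived over 10 samples of 64 values each.
///     let full = S::generate(array);
///     full.sample(64, 10)
/// }
/// ```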
pub trait CompressorStats: Debug + Clone {
    type ArrayVTable: VTable;

    /// Generate stats with default options
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generate stats with provided options
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}

/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    type StatsType: CompressorStats;
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio for running this scheme (and its children)
    /// for the given input.
    ///
    /// `is_sample` indicates that `stats` were already computed over a sample, so no further
    /// sampling is performed. `allowed_cascading` is the number of additional encoding levels
    /// that may still be cascaded beneath this scheme, and `excludes` lists scheme codes that
    /// must not be considered.
    ///
    /// Returns the estimated compression ratio (uncompressed bytes over compressed bytes, so
    /// larger is better).
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}

pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Specified schemes to use for children.
    pub children: Vec<SchemeTree>,
}

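/// Estimate the compression ratio that `compressor` would achieve over the array described by
/// `stats`.
///
/// Unless `is_sample` is already set, only about 1% of the source values (and at least 640) are
/// sampled and compressed. The ratio is uncompressed bytes divided by compressed bytes, so values
/// above 1.0 mean the scheme is a net win.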
pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
    compressor: &T,
    stats: &T::StatsType,
    is_sample: bool,
    allowed_cascading: usize,
    excludes: &[T::CodeType],
) -> VortexResult<f64> {
    let sample = if is_sample {
        stats.clone()
    } else {
        let source_len = stats.source().len();

        // We want to sample about 1% of the data, while keeping a minimum sample of 640 values.
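        // For example, a source of 1,000,000 values gives (1_000_000 / 100) / 64 = 156 samples
        // of SAMPLE_SIZE values each, i.e. 9,984 values (roughly 1% of the input).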
        let sample_count = usize::max(
            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
            10,
        );

        log::trace!(
            "Sampling {} values out of {}",
            SAMPLE_SIZE as usize * sample_count,
            source_len
        );

        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
    };

    let after = compressor
        .compress(&sample, true, allowed_cascading, excludes)?
        .nbytes();
    let before = sample.source().nbytes();

    log::debug!(
        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
        before as f64 / after as f64
    );

    Ok(before as f64 / after as f64)
}

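/// Maximum cascade depth: at most this many compression schemes are nested on top of one another.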
const MAX_CASCADE: usize = 3;

/// A compressor for a particular input type.
///
/// The associated `ArrayVTable` determines the input array type, which should be one of the
/// canonical array variants, e.g. `PrimitiveArray`.
///
/// Compressors expose a `compress` function.
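///
/// # Example
///
/// A minimal sketch of invoking an implementor directly, mirroring how [`BtrBlocksCompressor`]
/// drives [`IntCompressor`] below (obtaining the `primitive` array is elided):
///
/// ```ignore
/// // `false` = the input is not a sample; start with the full cascade budget and no
/// // excluded scheme codes.
/// let compressed = IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])?;
/// ```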
pub trait Compressor {
    type ArrayVTable: VTable;
    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;

    // Stats type instead?
    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;

    fn schemes() -> &'static [&'static Self::SchemeType];
    fn default_scheme() -> &'static Self::SchemeType;
    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;

    fn compress(
        array: &<Self::ArrayVTable as VTable>::Array,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef>
    where
        Self::SchemeType: 'static,
    {
        // Avoid compressing empty arrays.
        if array.is_empty() {
            return Ok(array.to_array());
        }

        // Generate stats on the array directly.
        let stats = if excludes.contains(&Self::dict_scheme_code()) {
            Self::StatsType::generate_opts(
                array,
                GenerateStatsOptions {
                    count_distinct_values: false,
                },
            )
        } else {
            Self::StatsType::generate(array)
        };
        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;

        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.display_tree());
            Ok(array.to_array())
        }
    }

    fn choose_scheme(
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // Logging helper: depth in the cascade reached so far.
        let depth = MAX_CASCADE - allowed_cascading;

        for scheme in Self::schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }

            // We never choose Constant for a sample
            if is_sample && scheme.is_constant() {
                continue;
            }

            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}");

            let ratio =
                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");

            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
                if ratio > best_ratio {
                    best_ratio = ratio;
                    best_scheme = Some(*scheme);
                }
            } else {
                log::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
                );
            }
        }

        log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");

        if let Some(best) = best_scheme {
            Ok(best)
        } else {
            Ok(Self::default_scheme())
        }
    }
}

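/// Compressor that canonicalizes an arbitrary Vortex array and applies BtrBlocks-style
/// compression to each of its parts.
///
/// # Example
///
/// A minimal usage sketch (constructing `array` is elided; any `&dyn Array` works):
///
/// ```ignore
/// let compressor = BtrBlocksCompressor::default();
/// // Returns a (typically smaller) array with the same logical dtype and length.
/// let compressed = compressor.compress(&array)?;
/// ```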
#[derive(Default, Debug, Clone)]
pub struct BtrBlocksCompressor {
    pub exclude_int_dict_encoding: bool,
}

impl BtrBlocksCompressor {
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array
        let canonical = array.to_canonical();

        // Compact it, removing any wasted space before we attempt to compress it
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    if self.exclude_int_dict_encoding {
                        IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[])
                    } else {
                        IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                    }
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .try_collect()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_array) => {
                // Compress the list elements and offsets separately.
                let compressed_elems = self.compress(list_array.elements())?;
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &list_array.offsets().to_primitive().downcast()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::FixedSizeList(fsl_array) => {
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity().clone(),
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary (non-UTF-8) arrays are left uncompressed.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // We compress Timestamp-level arrays with DateTimeParts compression
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}