vortex_btrblocks/
lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt::Debug;
use std::hash::Hash;

use itertools::Itertools;
use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
use vortex_array::compress::downscale_integer_array;
use vortex_array::vtable::{VTable, ValidityHelper};
use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
use vortex_dtype::datetime::TemporalMetadata;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexResult, VortexUnwrap};

use crate::decimal::compress_decimal;
pub use crate::float::FloatCompressor;
pub use crate::integer::IntCompressor;
pub use crate::string::StringCompressor;
pub use crate::temporal::compress_temporal;

mod decimal;
mod float;
pub mod integer;
mod patches;
mod sample;
mod string;
mod temporal;

pub struct GenerateStatsOptions {
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}

impl Default for GenerateStatsOptions {
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}

const SAMPLE_SIZE: u32 = 64;

/// Statistics about an array, used to drive compression scheme selection.
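///
/// Implementations produce stats for the full array via `generate`/`generate_opts`, and
/// for a subset via `sample`/`sample_opts`; a sample covers roughly
/// `sample_size * sample_count` values of the source array (see
/// `estimate_compression_ratio_with_sampling` below).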
pub trait CompressorStats: Debug + Clone {
    type ArrayVTable: VTable;

    /// Generate stats with default options
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generate stats with provided options
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}

/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
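///
/// A scheme is chosen by `Compressor::choose_scheme`: each candidate reports an
/// `expected_compression_ratio` (estimated on a sample), the best ratio above 1.0 wins,
/// and the winning scheme's `compress` is then run over the full input.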
pub trait Scheme: Debug {
    type StatsType: CompressorStats;
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio of running this scheme (and its children)
    /// on the given input.
    ///
    /// `allowed_cascading` is the number of cascade levels still available beneath this
    /// scheme in the encoding tree.
    ///
    /// Returns the estimated compression ratio.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}

pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Specified schemes to use for children.
    pub children: Vec<SchemeTree>,
}

pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
    compressor: &T,
    stats: &T::StatsType,
    is_sample: bool,
    allowed_cascading: usize,
    excludes: &[T::CodeType],
) -> VortexResult<f64> {
    let sample = if is_sample {
        stats.clone()
    } else {
        let source_len = stats.source().len();

        // We want to sample about 1% of the data, while keeping a minimum sample of 640 values.
        let sample_count = usize::max(
            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
            10,
        );
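        // Illustrative arithmetic: for a source of 1_000_000 values, sample_count =
        // max((1_000_000 / 100) / 64, 10) = 156, so roughly 156 * 64 = 9_984 values
        // (about 1%) are compressed to produce the estimate.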

        log::trace!(
            "Sampling {} values out of {}",
            SAMPLE_SIZE as usize * sample_count,
            source_len
        );

        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
    };

    let after = compressor
        .compress(&sample, true, allowed_cascading, excludes)?
        .nbytes();
    let before = sample.source().nbytes();

    log::debug!(
        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
        before as f64 / after as f64
    );

    Ok(before as f64 / after as f64)
}

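/// Maximum cascade depth: the budget of nested encodings handed to each type-specific
/// compressor by `BtrBlocksCompressor::compress_canonical`.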
const MAX_CASCADE: usize = 3;

/// A compressor for a particular input type.
///
/// The compressor's `ArrayVTable::Array` type should be one of the canonical array
/// variants, e.g. `PrimitiveArray`.
///
/// Compressors expose a `compress` function.
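///
/// # Example
///
/// A minimal sketch of driving a concrete compressor directly, mirroring how
/// `BtrBlocksCompressor` uses `IntCompressor` below (`primitive` stands in for any
/// integer `PrimitiveArray`; the cascade budget `3` matches `MAX_CASCADE`):
///
/// ```ignore
/// use vortex_btrblocks::{Compressor, IntCompressor};
///
/// // Not a sample, full cascade budget, no schemes excluded.
/// let compressed = IntCompressor::compress(&primitive, false, 3, &[])?;
/// assert!(compressed.nbytes() <= primitive.nbytes());
/// ```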
pub trait Compressor {
    type ArrayVTable: VTable;
    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;

    // Stats type instead?
    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;

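    // Required items: the candidate schemes for this type, the fallback scheme used when
    // no candidate beats a ratio of 1.0, and the code of the dictionary scheme (so stats
    // can skip distinct-value counting when dictionary encoding is excluded).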
    fn schemes() -> &'static [&'static Self::SchemeType];
    fn default_scheme() -> &'static Self::SchemeType;
    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;

    fn compress(
        array: &<Self::ArrayVTable as VTable>::Array,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef>
    where
        Self::SchemeType: 'static,
    {
        // Avoid compressing empty arrays.
        if array.is_empty() {
            return Ok(array.to_array());
        }

        // Generate stats on the array directly.
        let stats = if excludes.contains(&Self::dict_scheme_code()) {
            Self::StatsType::generate_opts(
                array,
                GenerateStatsOptions {
                    count_distinct_values: false,
                },
            )
        } else {
            Self::StatsType::generate(array)
        };
        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;

        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.display_tree());
            Ok(array.to_array())
        }
    }

    fn choose_scheme(
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // logging helpers
        let depth = MAX_CASCADE - allowed_cascading;

        for scheme in Self::schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }

            // We never choose Constant for a sample
            if is_sample && scheme.is_constant() {
                continue;
            }

            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}");

            let ratio =
                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");

            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
                if ratio > best_ratio {
                    best_ratio = ratio;
                    best_scheme = Some(*scheme);
                }
            } else {
                log::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
                );
            }
        }

        log::debug!("depth={depth} best scheme = {best_scheme:?}  ratio = {best_ratio}");

        if let Some(best) = best_scheme {
            Ok(best)
        } else {
            Ok(Self::default_scheme())
        }
    }
}

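/// Top-level BtrBlocks-style compressor: canonicalizes and compacts the input, then
/// dispatches to a type-specific compressor for each column.
///
/// # Example
///
/// A minimal sketch, assuming `array` is any `ArrayRef` already in hand:
///
/// ```ignore
/// use vortex_btrblocks::BtrBlocksCompressor;
///
/// // Returns a compressed array with the same logical dtype and length.
/// let compressed = BtrBlocksCompressor.compress(&array)?;
/// ```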
#[derive(Debug, Clone)]
pub struct BtrBlocksCompressor;

impl BtrBlocksCompressor {
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        // Canonicalize the array
        let canonical = array.to_canonical()?;

        // Compact it, removing any wasted space before we attempt to compress it
        let compact = canonical.compact()?;

        self.compress_canonical(compact)
    }

    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .try_collect()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_array) => {
                // Compress the element values, then the (downscaled) offsets.
                let compressed_elems = self.compress(list_array.elements())?;
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &downscale_integer_array(list_array.offsets().clone())?.to_primitive()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary arrays are not compressed; pass them through as-is.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // Compress timestamp-typed extension arrays with DateTimeParts compression.
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
                    && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
                {
                    return compress_temporal(temporal_array);
                }

                // Compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}