vortex_btrblocks/
lib.rs

#![feature(array_chunks)]

use std::fmt::Debug;
use std::hash::Hash;

use itertools::Itertools;
use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
use vortex_array::vtable::{VTable, ValidityHelper};
use vortex_array::{Array, ArrayRef, Canonical, IntoArray};
use vortex_dtype::datetime::TemporalMetadata;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexResult, VortexUnwrap};

use crate::decimal::compress_decimal;
pub use crate::float::FloatCompressor;
pub use crate::integer::IntCompressor;
pub use crate::string::StringCompressor;
pub use crate::temporal::compress_temporal;

mod decimal;
mod float;
pub mod integer;
mod patches;
mod sample;
mod string;
mod temporal;

pub struct GenerateStatsOptions {
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}

impl Default for GenerateStatsOptions {
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}

/// Number of values in each sample chunk drawn by the compressors.
const SAMPLE_SIZE: u32 = 64;

/// Statistics used by a compressor to choose a compression scheme.
pub trait CompressorStats: Debug + Clone {
    type ArrayVTable: VTable;

    /// Generate stats for the input with default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    /// Generate stats for the input with explicit options.
    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    /// The array these stats were generated from.
    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    /// Stats over a sample of `sample_count` chunks of `sample_size` values each, using
    /// default options.
    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    /// Stats over a sample of `sample_count` chunks of `sample_size` values each, with
    /// explicit options.
    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}
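
// Illustrative sketch of the intended flow (see `estimate_compression_ratio_with_sampling`
// below); `SomeStats` is a hypothetical implementation used only for this example:
//
//     let stats = SomeStats::generate(&array);
//     // Re-estimate over a sample: 10 chunks of SAMPLE_SIZE = 64 values each.
//     let sampled = stats.sample(SAMPLE_SIZE, 10);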

/// Top-level compression scheme trait.
///
/// Implementations are specialized for each data type, e.g. see `IntegerScheme`,
/// `FloatScheme`, etc.
pub trait Scheme: Debug {
    type StatsType: CompressorStats;
    type CodeType: Copy + Eq + Hash;

    /// Unique identifier for this scheme.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio of running this scheme (and its children) on the
    /// given input.
    ///
    /// `allowed_cascading` is the number of additional cascaded encodings that may still be
    /// applied beneath this scheme, i.e. `MAX_CASCADE` minus the depth already reached in the
    /// encoding tree.
    ///
    /// Returns the estimated compression ratio.
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}

pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Schemes to use for the children.
    pub children: Vec<SchemeTree>,
}
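
// Illustrative only: a hypothetical two-level tree in which the array itself is encoded with
// scheme code 2 and its single child array with scheme code 0. The concrete code values are
// placeholders and are interpreted in the code space of the type-specific compressor.
//
//     let tree = SchemeTree {
//         scheme: 2,
//         children: vec![SchemeTree { scheme: 0, children: vec![] }],
//     };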

/// Estimate the compression ratio of `compressor` on the source array behind `stats`,
/// compressing a small sample when the input is not already a sample.
pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
    compressor: &T,
    stats: &T::StatsType,
    is_sample: bool,
    allowed_cascading: usize,
    excludes: &[T::CodeType],
) -> VortexResult<f64> {
    let sample = if is_sample {
        stats.clone()
    } else {
        let source_len = stats.source().len();

        // We want to sample about 1% of the data, while keeping a minimum sample of 640 values.
        let sample_count = usize::max(
            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
            10,
        );
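
        // Worked example (illustrative): for a source of 1,000,000 values this is
        // max((1_000_000 / 100) / 64, 10) = 156 chunks of SAMPLE_SIZE = 64 values, i.e.
        // roughly 10k values or ~1% of the input; small inputs are clamped to 10 chunks,
        // i.e. 10 * 64 = 640 values.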

        log::trace!(
            "Sampling {} values out of {}",
            SAMPLE_SIZE as usize * sample_count,
            source_len
        );

        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
    };

    let after = compressor
        .compress(&sample, true, allowed_cascading, excludes)?
        .nbytes();
    let before = sample.source().nbytes();
    let ratio = before as f64 / after as f64;

    log::debug!(
        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {ratio}"
    );

    Ok(ratio)
}

/// Maximum depth of cascaded encodings that the compressors will apply.
const MAX_CASCADE: usize = 3;

/// A compressor for a particular input type.
///
/// The input array type (determined by `ArrayVTable`) should be one of the canonical array
/// variants, e.g. `PrimitiveArray`.
///
/// Compressors expose a `compress` function.
pub trait Compressor {
    type ArrayVTable: VTable;
    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;

    // Stats type instead?
    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;

    fn schemes() -> &'static [&'static Self::SchemeType];
    fn default_scheme() -> &'static Self::SchemeType;
    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;

    fn compress(
        array: &<Self::ArrayVTable as VTable>::Array,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef>
    where
        Self::SchemeType: 'static,
    {
        // Avoid compressing empty arrays.
        if array.is_empty() {
            return Ok(array.to_array());
        }

        // Generate stats on the array directly. If dictionary encoding is excluded, skip the
        // distinct-value count.
        let stats = if excludes.contains(&Self::dict_scheme_code()) {
            Self::StatsType::generate_opts(
                array,
                GenerateStatsOptions {
                    count_distinct_values: false,
                },
            )
        } else {
            Self::StatsType::generate(array)
        };
        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;

        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.tree_display());
            Ok(array.to_array())
        }
    }

    fn choose_scheme(
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // Depth already reached in the encoding tree, used for logging.
        let depth = MAX_CASCADE - allowed_cascading;

        for scheme in Self::schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }

            // We never choose Constant for a sample.
            if is_sample && scheme.is_constant() {
                continue;
            }

            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}");

            let ratio =
                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");

            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
                if ratio > best_ratio {
                    best_ratio = ratio;
                    best_scheme = Some(*scheme);
                }
            } else {
                log::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or NaN."
                );
            }
        }

        log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");

        if let Some(best) = best_scheme {
            Ok(best)
        } else {
            Ok(Self::default_scheme())
        }
    }
}
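
// Example (see `BtrBlocksCompressor::compress_canonical` below): the concrete compressors such
// as `IntCompressor`, `FloatCompressor` and `StringCompressor` are invoked on a full
// (non-sample) canonical array with the full cascade budget and no excluded schemes, e.g.
//
//     IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])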

/// Top-level compressor: canonicalizes the input and compresses each canonical variant with
/// the appropriate type-specific compressor.
#[derive(Debug)]
pub struct BtrBlocksCompressor;

impl BtrBlocksCompressor {
    /// Canonicalize the array and compress it.
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress_canonical(array.to_canonical()?)
    }

    /// Compress a canonical array by dispatching on its variant.
    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .try_collect()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_array) => {
                // Compress the elements and offsets arrays independently.
                let compressed_elems = self.compress(list_array.elements())?;
                let compressed_offsets = self.compress(list_array.offsets())?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary (non-UTF-8) arrays are left uncompressed.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // Compress Timestamp extension arrays with DateTimeParts compression.
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array()) {
                    if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
                        return compress_temporal(temporal_array);
                    }
                }

                // Otherwise compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}
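
// Hedged usage sketch (illustrative, not a doctest): how the input array is constructed depends
// on the vortex-array API in use, so `array` below stands in for any in-memory vortex array.
//
//     let array: ArrayRef = /* any vortex array, e.g. a primitive array of integers */;
//     let compressed = BtrBlocksCompressor.compress(&array)?;
//     log::info!("compressed {} -> {} bytes", array.nbytes(), compressed.nbytes());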