vortex_btrblocks/
lib.rs

#![feature(array_chunks)]

use std::fmt::Debug;
use std::hash::Hash;

use itertools::Itertools;
use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
use vortex_array::compress::downscale_integer_array;
use vortex_array::vtable::{VTable, ValidityHelper};
use vortex_array::{Array, ArrayRef, Canonical, IntoArray, ToCanonical};
use vortex_dtype::datetime::TemporalMetadata;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexResult, VortexUnwrap};

use crate::decimal::compress_decimal;
pub use crate::float::FloatCompressor;
pub use crate::integer::IntCompressor;
pub use crate::string::StringCompressor;
pub use crate::temporal::compress_temporal;

mod decimal;
mod float;
pub mod integer;
mod patches;
mod sample;
mod string;
mod temporal;

pub struct GenerateStatsOptions {
    pub count_distinct_values: bool,
    // pub count_runs: bool,
    // should this be scheme-specific?
}

impl Default for GenerateStatsOptions {
    fn default() -> Self {
        Self {
            count_distinct_values: true,
            // count_runs: true,
        }
    }
}

const SAMPLE_SIZE: u32 = 64;

/// Stats for the compressor.
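///
/// # Example (sketch)
///
/// A hypothetical concrete stats type `MyStats` (not part of this crate) over primitive
/// arrays would be used like this:
///
/// ```ignore
/// let stats = MyStats::generate(&array);
/// // Draw 10 samples of SAMPLE_SIZE (64) values each from the source array.
/// let sampled = stats.sample(SAMPLE_SIZE, 10);
/// ```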
pub trait CompressorStats: Debug + Clone {
    type ArrayVTable: VTable;

    // Generate stats using the default options.
    fn generate(input: &<Self::ArrayVTable as VTable>::Array) -> Self {
        Self::generate_opts(input, GenerateStatsOptions::default())
    }

    fn generate_opts(
        input: &<Self::ArrayVTable as VTable>::Array,
        opts: GenerateStatsOptions,
    ) -> Self;

    fn source(&self) -> &<Self::ArrayVTable as VTable>::Array;

    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
    }

    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
}

/// Top-level compression scheme trait.
///
/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
pub trait Scheme: Debug {
    type StatsType: CompressorStats;
    type CodeType: Copy + Eq + Hash;

    /// Scheme unique identifier.
    fn code(&self) -> Self::CodeType;

    /// True if this is the singular Constant scheme for this data type.
    fn is_constant(&self) -> bool {
        false
    }

    /// Estimate the compression ratio of running this scheme (and its children) on the
    /// given input.
    ///
    /// `allowed_cascading` is the number of additional encoding levels this scheme may still
    /// cascade into, given the depth already reached in the encoding tree.
    ///
    /// Returns the estimated compression ratio (uncompressed bytes divided by compressed
    /// bytes, so higher is better).
    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    /// Compress the input with this scheme, yielding a new array.
    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef>;
}

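/// A tree of scheme codes describing a (possibly cascading) compression choice.
///
/// # Example (sketch)
///
/// A hypothetical two-level cascade, with made-up codes standing in for a type-specific
/// code space:
///
/// ```ignore
/// let tree = SchemeTree {
///     scheme: 2, // e.g. a dictionary scheme
///     children: vec![SchemeTree { scheme: 5, children: vec![] }], // e.g. bit-packed codes
/// };
/// ```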
pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Schemes to use for the children.
    pub children: Vec<SchemeTree>,
}

pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
    compressor: &T,
    stats: &T::StatsType,
    is_sample: bool,
    allowed_cascading: usize,
    excludes: &[T::CodeType],
) -> VortexResult<f64> {
    let sample = if is_sample {
        stats.clone()
    } else {
        let source_len = stats.source().len();

        // Sample roughly 1% of the data, with a minimum of 10 samples (640 values).
        let sample_count = usize::max(
            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
            10,
        );
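        // Worked example (illustrative): for a source of 1,000,000 values,
        // sample_count = max((1_000_000 / 100) / 64, 10) = max(156, 10) = 156,
        // i.e. 156 * 64 = 9,984 sampled values, just under 1% of the input.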

        log::trace!(
            "Sampling {} values out of {}",
            SAMPLE_SIZE as usize * sample_count,
            source_len
        );

        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
    };

    let after = compressor
        .compress(&sample, true, allowed_cascading, excludes)?
        .nbytes();
    let before = sample.source().nbytes();

    log::debug!(
        "estimate_compression_ratio_with_sampling(compressor={compressor:#?}, is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
        before as f64 / after as f64
    );

    Ok(before as f64 / after as f64)
}

const MAX_CASCADE: usize = 3;

/// A compressor for a particular input type.
///
/// The input type, `<Self::ArrayVTable as VTable>::Array`, should be one of the canonical
/// array variants, e.g. `PrimitiveArray`.
///
/// Compressors expose a `compress` function.
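///
/// # Example (sketch)
///
/// The integer variant is invoked like this for a primitive integer array (mirroring the
/// call made in `BtrBlocksCompressor::compress_canonical` below):
///
/// ```ignore
/// let compressed = IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])?;
/// ```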
pub trait Compressor {
    type ArrayVTable: VTable;
    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;

    // Stats type instead?
    type StatsType: CompressorStats<ArrayVTable = Self::ArrayVTable>;

    fn schemes() -> &'static [&'static Self::SchemeType];
    fn default_scheme() -> &'static Self::SchemeType;
    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;

    fn compress(
        array: &<Self::ArrayVTable as VTable>::Array,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<ArrayRef>
    where
        Self::SchemeType: 'static,
    {
        // Avoid compressing empty arrays.
        if array.is_empty() {
            return Ok(array.to_array());
        }

        // Generate stats on the array directly.
        let stats = if excludes.contains(&Self::dict_scheme_code()) {
            Self::StatsType::generate_opts(
                array,
                GenerateStatsOptions {
                    count_distinct_values: false,
                },
            )
        } else {
            Self::StatsType::generate(array)
        };
        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;

        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.tree_display());
            Ok(array.to_array())
        }
    }

    fn choose_scheme(
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[<Self::SchemeType as Scheme>::CodeType],
    ) -> VortexResult<&'static Self::SchemeType> {
        let mut best_ratio = 1.0;
        let mut best_scheme: Option<&'static Self::SchemeType> = None;

        // Logging helper: depth in the encoding tree reached so far.
        let depth = MAX_CASCADE - allowed_cascading;

        for scheme in Self::schemes().iter() {
            if excludes.contains(&scheme.code()) {
                continue;
            }

            // We never choose Constant for a sample.
            if is_sample && scheme.is_constant() {
                continue;
            }

            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}");

            let ratio =
                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");

            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
                if ratio > best_ratio {
                    best_ratio = ratio;
                    best_scheme = Some(*scheme);
                }
            } else {
                log::trace!(
                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be subnormal, infinite, or NaN."
                );
            }
        }

        log::debug!("depth={depth} best scheme = {best_scheme:?} ratio = {best_ratio}");

        if let Some(best) = best_scheme {
            Ok(best)
        } else {
            Ok(Self::default_scheme())
        }
    }
}

#[derive(Debug)]
pub struct BtrBlocksCompressor;

impl BtrBlocksCompressor {
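    /// Canonicalize `array`, then dispatch it to the matching type-specific compressor.
    ///
    /// # Example (sketch)
    ///
    /// A minimal usage sketch, where `array` stands in for any `&dyn Array` you already have:
    ///
    /// ```ignore
    /// use vortex_btrblocks::BtrBlocksCompressor;
    ///
    /// let compressed = BtrBlocksCompressor.compress(array)?;
    /// log::debug!("{} bytes -> {} bytes", array.nbytes(), compressed.nbytes());
    /// ```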
    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
        self.compress_canonical(array.to_canonical()?)
    }

    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
        match array {
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            // TODO(aduffy): Sparse, other bool compressors.
            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
            Canonical::Primitive(primitive) => {
                if primitive.ptype().is_int() {
                    IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                } else {
                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
                }
            }
            Canonical::Decimal(decimal) => compress_decimal(&decimal),
            Canonical::Struct(struct_array) => {
                let fields = struct_array
                    .fields()
                    .iter()
                    .map(|field| self.compress(field))
                    .try_collect()?;

                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::List(list_array) => {
                // Compress the element and offset children separately.
                let compressed_elems = self.compress(list_array.elements())?;
                let compressed_offsets = IntCompressor::compress_no_dict(
                    &downscale_integer_array(list_array.offsets().clone())?.to_primitive()?,
                    false,
                    MAX_CASCADE,
                    &[],
                )?;

                Ok(ListArray::try_new(
                    compressed_elems,
                    compressed_offsets,
                    list_array.validity().clone(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
                } else {
                    // Binary (non-UTF-8) arrays are left uncompressed.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                // Timestamp arrays are compressed with DateTimeParts compression.
                if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array()) {
                    if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
                        return compress_temporal(temporal_array);
                    }
                }

                // Otherwise, compress the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
        }
    }
}