vortex_btrblocks/
lib.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3
4use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
5use vortex_array::nbytes::NBytes;
6use vortex_array::variants::{ExtensionArrayTrait, PrimitiveArrayTrait, StructArrayTrait};
7use vortex_array::{Array, ArrayRef, Canonical};
8use vortex_datetime_dtype::TemporalMetadata;
9use vortex_dtype::{DType, Nullability};
10use vortex_error::{VortexExpect, VortexResult};
11
12pub use crate::float::FloatCompressor;
13pub use crate::integer::IntCompressor;
14pub use crate::string::StringCompressor;
15pub use crate::temporal::compress_temporal;
16
17mod downscale;
18mod float;
19pub mod integer;
20mod patches;
21mod sample;
22mod string;
23mod temporal;
24
/// Options controlling which statistics are gathered when stats are generated for an array.
///
/// The [`Default`] value enables every statistic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted.
    ///
    /// `Compressor::compress` turns this off when the dictionary scheme is excluded.
    pub count_distinct_values: bool,
    // TODO: a `count_runs` flag has been considered as well; it is an open question whether
    // options like these should be scheme-specific.
}

impl Default for GenerateStatsOptions {
    /// Returns options with distinct-value counting enabled.
    ///
    /// This is written by hand because the desired default is `true`, whereas a derived
    /// `Default` would yield `false`.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
39
40/// Stats for the compressor.
41pub trait CompressorStats: Clone {
42    type ArrayType: Array;
43
44    // Generate with options.
45    fn generate(input: &Self::ArrayType) -> Self {
46        Self::generate_opts(input, GenerateStatsOptions::default())
47    }
48
49    fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self;
50
51    fn source(&self) -> &Self::ArrayType;
52
53    fn sample(&self, sample_size: u16, sample_count: u16) -> Self {
54        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
55    }
56
57    fn sample_opts(&self, sample_size: u16, sample_count: u16, opts: GenerateStatsOptions) -> Self;
58}
59
60/// Top-level compression scheme trait.
61///
62/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
63pub trait Scheme: Debug {
64    type StatsType: CompressorStats;
65    type CodeType: Copy + Eq + Hash;
66
67    /// Scheme unique identifier.
68    fn code(&self) -> Self::CodeType;
69
70    /// True if this is the singular Constant scheme for this data type.
71    fn is_constant(&self) -> bool {
72        false
73    }
74
75    /// Estimate the compression ratio for running this scheme (and its children)
76    /// for the given input.
77    ///
78    /// Depth is the depth in the encoding tree we've already reached before considering this
79    /// scheme.
80    ///
81    /// Returns the estimated compression ratio as well as the tree of compressors to use.
82    fn expected_compression_ratio(
83        &self,
84        stats: &Self::StatsType,
85        is_sample: bool,
86        allowed_cascading: usize,
87        excludes: &[Self::CodeType],
88    ) -> VortexResult<f64> {
89        estimate_compression_ratio_with_sampling(
90            self,
91            stats,
92            is_sample,
93            allowed_cascading,
94            excludes,
95        )
96    }
97
98    /// Compress the input with this scheme, yielding a new array.
99    fn compress(
100        &self,
101        stats: &Self::StatsType,
102        is_sample: bool,
103        allowed_cascading: usize,
104        excludes: &[Self::CodeType],
105    ) -> VortexResult<ArrayRef>;
106}
107
/// A tree of scheme choices: the scheme for an array plus the schemes for its children.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Specified schemes to use for children.
    pub children: Vec<SchemeTree>,
}
117
118pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
119    compressor: &T,
120    stats: &T::StatsType,
121    is_sample: bool,
122    allowed_cascading: usize,
123    excludes: &[T::CodeType],
124) -> VortexResult<f64> {
125    let sample = if is_sample {
126        stats.clone()
127    } else {
128        stats.sample(64, 10)
129    };
130
131    let after = compressor
132        .compress(&sample, true, allowed_cascading, excludes)?
133        .nbytes();
134    let before = sample.source().nbytes();
135
136    log::debug!(
137        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
138        before as f64 / after as f64
139    );
140
141    Ok(before as f64 / after as f64)
142}
143
144const MAX_CASCADE: usize = 3;
145
146/// A compressor for a particular input type.
147///
148/// The `Input` type should be one of the canonical array variants, e.g. `PrimitiveArray`.
149///
150/// Compressors expose a `compress` function.
151pub trait Compressor {
152    type ArrayType: Array;
153    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
154
155    // Stats type instead?
156    type StatsType: CompressorStats<ArrayType = Self::ArrayType>;
157
158    fn schemes() -> &'static [&'static Self::SchemeType];
159    fn default_scheme() -> &'static Self::SchemeType;
160    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
161
162    fn compress(
163        array: &Self::ArrayType,
164        is_sample: bool,
165        allowed_cascading: usize,
166        excludes: &[<Self::SchemeType as Scheme>::CodeType],
167    ) -> VortexResult<ArrayRef>
168    where
169        Self::SchemeType: 'static,
170    {
171        // Avoid compressing empty arrays.
172        if array.is_empty() {
173            return Ok(array.to_array());
174        }
175
176        // Generate stats on the array directly.
177        let stats = if excludes.contains(&Self::dict_scheme_code()) {
178            Self::StatsType::generate_opts(
179                array,
180                GenerateStatsOptions {
181                    count_distinct_values: false,
182                },
183            )
184        } else {
185            Self::StatsType::generate(array)
186        };
187        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
188
189        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
190        if output.nbytes() < array.nbytes() {
191            Ok(output)
192        } else {
193            log::debug!("resulting tree too large: {}", output.tree_display());
194            Ok(array.to_array())
195        }
196    }
197
198    fn choose_scheme(
199        stats: &Self::StatsType,
200        is_sample: bool,
201        allowed_cascading: usize,
202        excludes: &[<Self::SchemeType as Scheme>::CodeType],
203    ) -> VortexResult<&'static Self::SchemeType> {
204        let mut best_ratio = 1.0;
205        let mut best_scheme: Option<&'static Self::SchemeType> = None;
206
207        // logging helpers
208        let depth = MAX_CASCADE - allowed_cascading;
209
210        for scheme in Self::schemes().iter() {
211            if excludes.contains(&scheme.code()) {
212                continue;
213            }
214
215            // We never choose Constant for a sample
216            if is_sample && scheme.is_constant() {
217                continue;
218            }
219
220            log::debug!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
221
222            let ratio =
223                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
224            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:#?} ratio = {ratio}");
225
226            if ratio > best_ratio {
227                best_ratio = ratio;
228                let _ = best_scheme.insert(*scheme);
229            }
230        }
231
232        log::trace!("depth={depth} best scheme = {best_scheme:#?}  ratio = {best_ratio}");
233
234        if let Some(best) = best_scheme {
235            Ok(best)
236        } else {
237            Ok(Self::default_scheme())
238        }
239    }
240}
241
242#[derive(Debug)]
243pub struct BtrBlocksCompressor;
244
245impl BtrBlocksCompressor {
246    #[allow(clippy::only_used_in_recursion)]
247    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
248        match array.to_canonical()? {
249            Canonical::Null(null_array) => Ok(null_array.into_array()),
250            // TODO(aduffy): Sparse, other bool compressors.
251            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
252            Canonical::Primitive(primitive) => {
253                if primitive.ptype().is_int() {
254                    IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
255                } else {
256                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
257                }
258            }
259            Canonical::Struct(struct_array) => {
260                let mut fields = Vec::new();
261                for idx in 0..struct_array.nfields() {
262                    let field = struct_array
263                        .maybe_null_field_by_idx(idx)
264                        .vortex_expect("field access");
265                    let compressed = self.compress(&field)?;
266                    fields.push(compressed);
267                }
268
269                Ok(StructArray::try_new(
270                    struct_array.names().clone(),
271                    fields,
272                    struct_array.len(),
273                    struct_array.validity().clone(),
274                )?
275                .into_array())
276            }
277            Canonical::List(list_array) => {
278                // Compress the inner
279                let compressed_elems = self.compress(list_array.elements())?;
280                let compressed_offsets = self.compress(list_array.offsets())?;
281
282                Ok(ListArray::try_new(
283                    compressed_elems,
284                    compressed_offsets,
285                    list_array.validity().clone(),
286                )?
287                .into_array())
288            }
289            Canonical::VarBinView(strings) => {
290                if strings
291                    .dtype()
292                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
293                {
294                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
295                } else {
296                    // Binary arrays do not compress
297                    Ok(strings.into_array())
298                }
299            }
300            Canonical::Extension(ext_array) => {
301                // We compress Timestamp-level arrays with DateTimeParts compression
302                if let Ok(temporal_array) =
303                    TemporalArray::try_from(ext_array.to_array().into_array())
304                {
305                    if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
306                        return compress_temporal(temporal_array);
307                    }
308                }
309
310                // Compress the underlying storage array.
311                let compressed_storage = self.compress(ext_array.storage())?;
312
313                Ok(
314                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
315                        .into_array(),
316                )
317            }
318        }
319    }
320}