vortex_btrblocks/
lib.rs

1#![feature(array_chunks)]
2
3use std::fmt::Debug;
4use std::hash::Hash;
5
6use vortex_array::arrays::{ExtensionArray, ListArray, StructArray, TemporalArray};
7use vortex_array::nbytes::NBytes;
8use vortex_array::variants::{ExtensionArrayTrait, PrimitiveArrayTrait, StructArrayTrait};
9use vortex_array::{Array, ArrayRef, Canonical};
10use vortex_dtype::datetime::TemporalMetadata;
11use vortex_dtype::{DType, Nullability};
12use vortex_error::{VortexExpect, VortexResult, VortexUnwrap};
13
14pub use crate::float::FloatCompressor;
15pub use crate::integer::IntCompressor;
16pub use crate::string::StringCompressor;
17pub use crate::temporal::compress_temporal;
18
19mod float;
20pub mod integer;
21mod patches;
22mod sample;
23mod string;
24mod temporal;
25
/// Options controlling which statistics `CompressorStats::generate_opts` computes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenerateStatsOptions {
    /// Whether distinct values should be counted.
    ///
    /// `Compressor::compress` turns this off when the dictionary scheme is excluded.
    pub count_distinct_values: bool,
    // TODO: consider a `count_runs` option, and whether options should be scheme-specific.
}

impl Default for GenerateStatsOptions {
    /// Enables every available statistic.
    fn default() -> Self {
        Self {
            count_distinct_values: true,
        }
    }
}
40
/// Number of values in each sample drawn when estimating compression ratios; the total
/// number of sampled values is `SAMPLE_SIZE * sample_count`.
const SAMPLE_SIZE: u32 = 64;
42
43/// Stats for the compressor.
44pub trait CompressorStats: Debug + Clone {
45    type ArrayType: Array;
46
47    // Generate with options.
48    fn generate(input: &Self::ArrayType) -> Self {
49        Self::generate_opts(input, GenerateStatsOptions::default())
50    }
51
52    fn generate_opts(input: &Self::ArrayType, opts: GenerateStatsOptions) -> Self;
53
54    fn source(&self) -> &Self::ArrayType;
55
56    fn sample(&self, sample_size: u32, sample_count: u32) -> Self {
57        self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default())
58    }
59
60    fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self;
61}
62
63/// Top-level compression scheme trait.
64///
65/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc.
66pub trait Scheme: Debug {
67    type StatsType: CompressorStats;
68    type CodeType: Copy + Eq + Hash;
69
70    /// Scheme unique identifier.
71    fn code(&self) -> Self::CodeType;
72
73    /// True if this is the singular Constant scheme for this data type.
74    fn is_constant(&self) -> bool {
75        false
76    }
77
78    /// Estimate the compression ratio for running this scheme (and its children)
79    /// for the given input.
80    ///
81    /// Depth is the depth in the encoding tree we've already reached before considering this
82    /// scheme.
83    ///
84    /// Returns the estimated compression ratio as well as the tree of compressors to use.
85    fn expected_compression_ratio(
86        &self,
87        stats: &Self::StatsType,
88        is_sample: bool,
89        allowed_cascading: usize,
90        excludes: &[Self::CodeType],
91    ) -> VortexResult<f64> {
92        estimate_compression_ratio_with_sampling(
93            self,
94            stats,
95            is_sample,
96            allowed_cascading,
97            excludes,
98        )
99    }
100
101    /// Compress the input with this scheme, yielding a new array.
102    fn compress(
103        &self,
104        stats: &Self::StatsType,
105        is_sample: bool,
106        allowed_cascading: usize,
107        excludes: &[Self::CodeType],
108    ) -> VortexResult<ArrayRef>;
109}
110
/// A tree of scheme choices: the scheme for an array plus the schemes for its children.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemeTree {
    /// Scheme to use for the array.
    ///
    /// This is in the type-specific code space, for example either the `IntCompressor` or
    /// `FloatCompressor` code space.
    pub scheme: u8,
    /// Specified schemes to use for children.
    pub children: Vec<SchemeTree>,
}
120
121pub fn estimate_compression_ratio_with_sampling<T: Scheme + ?Sized>(
122    compressor: &T,
123    stats: &T::StatsType,
124    is_sample: bool,
125    allowed_cascading: usize,
126    excludes: &[T::CodeType],
127) -> VortexResult<f64> {
128    let sample = if is_sample {
129        stats.clone()
130    } else {
131        // We want to sample about 1% of data
132        let source_len = stats.source().len();
133
134        // We want to sample about 1% of data, while keeping a minimal sample of 640 values.
135        let sample_count = usize::max(
136            (source_len / 100) / usize::try_from(SAMPLE_SIZE).vortex_unwrap(),
137            10,
138        );
139
140        log::trace!(
141            "Sampling {} values out of {}",
142            SAMPLE_SIZE as usize * sample_count,
143            source_len
144        );
145
146        stats.sample(SAMPLE_SIZE, sample_count.try_into().vortex_unwrap())
147    };
148
149    let after = compressor
150        .compress(&sample, true, allowed_cascading, excludes)?
151        .nbytes();
152    let before = sample.source().nbytes();
153
154    log::debug!(
155        "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}",
156        before as f64 / after as f64
157    );
158
159    Ok(before as f64 / after as f64)
160}
161
/// Cascading budget handed to the type-specific compressors at the top level.
const MAX_CASCADE: usize = 3;
163
164/// A compressor for a particular input type.
165///
166/// The `Input` type should be one of the canonical array variants, e.g. `PrimitiveArray`.
167///
168/// Compressors expose a `compress` function.
169pub trait Compressor {
170    type ArrayType: Array;
171    type SchemeType: Scheme<StatsType = Self::StatsType> + ?Sized;
172
173    // Stats type instead?
174    type StatsType: CompressorStats<ArrayType = Self::ArrayType>;
175
176    fn schemes() -> &'static [&'static Self::SchemeType];
177    fn default_scheme() -> &'static Self::SchemeType;
178    fn dict_scheme_code() -> <Self::SchemeType as Scheme>::CodeType;
179
180    fn compress(
181        array: &Self::ArrayType,
182        is_sample: bool,
183        allowed_cascading: usize,
184        excludes: &[<Self::SchemeType as Scheme>::CodeType],
185    ) -> VortexResult<ArrayRef>
186    where
187        Self::SchemeType: 'static,
188    {
189        // Avoid compressing empty arrays.
190        if array.is_empty() {
191            return Ok(array.to_array());
192        }
193
194        // Generate stats on the array directly.
195        let stats = if excludes.contains(&Self::dict_scheme_code()) {
196            Self::StatsType::generate_opts(
197                array,
198                GenerateStatsOptions {
199                    count_distinct_values: false,
200                },
201            )
202        } else {
203            Self::StatsType::generate(array)
204        };
205        let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
206
207        let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
208        if output.nbytes() < array.nbytes() {
209            Ok(output)
210        } else {
211            log::debug!("resulting tree too large: {}", output.tree_display());
212            Ok(array.to_array())
213        }
214    }
215
216    fn choose_scheme(
217        stats: &Self::StatsType,
218        is_sample: bool,
219        allowed_cascading: usize,
220        excludes: &[<Self::SchemeType as Scheme>::CodeType],
221    ) -> VortexResult<&'static Self::SchemeType> {
222        let mut best_ratio = 1.0;
223        let mut best_scheme: Option<&'static Self::SchemeType> = None;
224
225        // logging helpers
226        let depth = MAX_CASCADE - allowed_cascading;
227
228        for scheme in Self::schemes().iter() {
229            if excludes.contains(&scheme.code()) {
230                continue;
231            }
232
233            // We never choose Constant for a sample
234            if is_sample && scheme.is_constant() {
235                continue;
236            }
237
238            log::trace!("depth={depth} is_sample={is_sample} trying scheme: {scheme:#?}",);
239
240            let ratio =
241                scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?;
242            log::debug!("depth={depth} is_sample={is_sample} scheme: {scheme:?} ratio = {ratio}");
243
244            if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) {
245                if ratio > best_ratio {
246                    best_ratio = ratio;
247                    best_scheme = Some(*scheme);
248                }
249            } else {
250                log::trace!(
251                    "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan."
252                );
253            }
254        }
255
256        log::debug!("depth={depth} best scheme = {best_scheme:?}  ratio = {best_ratio}");
257
258        if let Some(best) = best_scheme {
259            Ok(best)
260        } else {
261            Ok(Self::default_scheme())
262        }
263    }
264}
265
/// Stateless compressor that dispatches an array, by its canonical form, to the
/// type-specific compressors.
///
/// Construct it as the unit value `BtrBlocksCompressor` or via `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct BtrBlocksCompressor;
268
269impl BtrBlocksCompressor {
270    pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
271        self.compress_canonical(array.to_canonical()?)
272    }
273
274    pub fn compress_canonical(&self, array: Canonical) -> VortexResult<ArrayRef> {
275        match array {
276            Canonical::Null(null_array) => Ok(null_array.into_array()),
277            // TODO(aduffy): Sparse, other bool compressors.
278            Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
279            Canonical::Primitive(primitive) => {
280                if primitive.ptype().is_int() {
281                    IntCompressor::compress(&primitive, false, MAX_CASCADE, &[])
282                } else {
283                    FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[])
284                }
285            }
286            Canonical::Struct(struct_array) => {
287                let mut fields = Vec::new();
288                for idx in 0..struct_array.nfields() {
289                    let field = struct_array
290                        .maybe_null_field_by_idx(idx)
291                        .vortex_expect("field access");
292                    let compressed = self.compress(&field)?;
293                    fields.push(compressed);
294                }
295
296                Ok(StructArray::try_new(
297                    struct_array.names().clone(),
298                    fields,
299                    struct_array.len(),
300                    struct_array.validity().clone(),
301                )?
302                .into_array())
303            }
304            Canonical::List(list_array) => {
305                // Compress the inner
306                let compressed_elems = self.compress(list_array.elements())?;
307                let compressed_offsets = self.compress(list_array.offsets())?;
308
309                Ok(ListArray::try_new(
310                    compressed_elems,
311                    compressed_offsets,
312                    list_array.validity().clone(),
313                )?
314                .into_array())
315            }
316            Canonical::VarBinView(strings) => {
317                if strings
318                    .dtype()
319                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
320                {
321                    StringCompressor::compress(&strings, false, MAX_CASCADE, &[])
322                } else {
323                    // Binary arrays do not compress
324                    Ok(strings.into_array())
325                }
326            }
327            Canonical::Extension(ext_array) => {
328                // We compress Timestamp-level arrays with DateTimeParts compression
329                if let Ok(temporal_array) =
330                    TemporalArray::try_from(ext_array.to_array().into_array())
331                {
332                    if let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() {
333                        return compress_temporal(temporal_array);
334                    }
335                }
336
337                // Compress the underlying storage array.
338                let compressed_storage = self.compress(ext_array.storage())?;
339
340                Ok(
341                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
342                        .into_array(),
343                )
344            }
345        }
346    }
347}