vortex_btrblocks/
float.rs

1mod dictionary;
2mod stats;
3
4use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
5use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
6use vortex_array::{ArrayRef, IntoArray, ToCanonical};
7use vortex_dict::DictArray;
8use vortex_dtype::PType;
9use vortex_error::{VortexExpect, VortexResult, vortex_panic};
10use vortex_runend::RunEndArray;
11use vortex_runend::compress::runend_encode;
12
13use self::stats::FloatStats;
14use crate::float::dictionary::dictionary_encode;
15use crate::integer::{IntCompressor, IntegerStats};
16use crate::patches::compress_patches;
17use crate::{
18    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
19    estimate_compression_ratio_with_sampling, integer,
20};
21
/// Minimum average run length an array must reach before run-end encoding is considered.
///
/// `RunEndScheme::expected_compression_ratio` reports a ratio of `0.0` for any array whose
/// `average_run_length` statistic is below this value.
const RUN_END_THRESHOLD: u32 = 3;
24
25pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26
27impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
28
29pub struct FloatCompressor;
30
31impl Compressor for FloatCompressor {
32    type ArrayVTable = PrimitiveVTable;
33    type SchemeType = dyn FloatScheme;
34    type StatsType = FloatStats;
35
36    fn schemes() -> &'static [&'static Self::SchemeType] {
37        &[
38            &UncompressedScheme,
39            &ConstantScheme,
40            &ALPScheme,
41            &ALPRDScheme,
42            &DictScheme,
43        ]
44    }
45
46    fn default_scheme() -> &'static Self::SchemeType {
47        &UncompressedScheme
48    }
49
50    fn dict_scheme_code() -> FloatCode {
51        DICT_SCHEME
52    }
53}
54
// Codes identifying each float scheme. Every scheme returns its constant from
// `Scheme::code`, and callers pass these codes in `excludes` lists to rule schemes out.

/// Code returned by `UncompressedScheme::code`.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
/// Code returned by `ConstantScheme::code`.
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
/// Code returned by `ALPScheme::code`.
const ALP_SCHEME: FloatCode = FloatCode(2);
/// Code returned by `ALPRDScheme::code`.
const ALPRD_SCHEME: FloatCode = FloatCode(3);
/// Code returned by `DictScheme::code`.
const DICT_SCHEME: FloatCode = FloatCode(4);
/// Code returned by `RunEndScheme::code`.
const RUNEND_SCHEME: FloatCode = FloatCode(5);
61
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that replaces a constant array with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes the floats and compresses the encoded integers with `IntCompressor`.
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with `RDEncoder` (ALP-RD).
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the array and compresses the codes and the values.
#[derive(Debug, Copy, Clone)]
struct DictScheme;

/// Scheme that run-end encodes the array and integer-compresses the run ends.
#[derive(Debug, Copy, Clone)]
struct RunEndScheme;
79
80impl Scheme for UncompressedScheme {
81    type StatsType = FloatStats;
82    type CodeType = FloatCode;
83
84    fn code(&self) -> FloatCode {
85        UNCOMPRESSED_SCHEME
86    }
87
88    fn expected_compression_ratio(
89        &self,
90        _stats: &Self::StatsType,
91        _is_sample: bool,
92        _allowed_cascading: usize,
93        _excludes: &[FloatCode],
94    ) -> VortexResult<f64> {
95        Ok(1.0)
96    }
97
98    fn compress(
99        &self,
100        stats: &Self::StatsType,
101        _is_sample: bool,
102        _allowed_cascading: usize,
103        _excludes: &[FloatCode],
104    ) -> VortexResult<ArrayRef> {
105        Ok(stats.source().to_array())
106    }
107}
108
109impl Scheme for ConstantScheme {
110    type StatsType = FloatStats;
111    type CodeType = FloatCode;
112
113    fn code(&self) -> FloatCode {
114        CONSTANT_SCHEME
115    }
116
117    fn expected_compression_ratio(
118        &self,
119        stats: &Self::StatsType,
120        is_sample: bool,
121        _allowed_cascading: usize,
122        _excludes: &[FloatCode],
123    ) -> VortexResult<f64> {
124        // Never select Constant when sampling
125        if is_sample {
126            return Ok(0.0);
127        }
128
129        // Can only have 1 distinct value
130        if stats.distinct_values_count > 1 {
131            return Ok(0.0);
132        }
133
134        // Cannot have mix of nulls and non-nulls
135        if stats.null_count > 0 && stats.value_count > 0 {
136            return Ok(0.0);
137        }
138
139        Ok(stats.value_count as f64)
140    }
141
142    fn compress(
143        &self,
144        stats: &Self::StatsType,
145        _is_sample: bool,
146        _allowed_cascading: usize,
147        _excludes: &[FloatCode],
148    ) -> VortexResult<ArrayRef> {
149        let scalar = stats
150            .source()
151            .as_constant()
152            .vortex_expect("must be constant");
153
154        Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
155    }
156}
157
/// Identifier for a float compression scheme.
///
/// Wraps a `u8`; the values used in this file are the `*_SCHEME` constants.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
160
161impl Scheme for ALPScheme {
162    type StatsType = FloatStats;
163    type CodeType = FloatCode;
164
165    fn code(&self) -> FloatCode {
166        ALP_SCHEME
167    }
168
169    fn expected_compression_ratio(
170        &self,
171        stats: &Self::StatsType,
172        is_sample: bool,
173        allowed_cascading: usize,
174        excludes: &[FloatCode],
175    ) -> VortexResult<f64> {
176        // We don't support ALP for f16
177        if stats.source().ptype() == PType::F16 {
178            return Ok(0.0);
179        }
180
181        if allowed_cascading == 0 {
182            // ALP does not compress on its own, we need to be able to cascade it with
183            // an integer compressor.
184            return Ok(0.0);
185        }
186
187        estimate_compression_ratio_with_sampling(
188            self,
189            stats,
190            is_sample,
191            allowed_cascading,
192            excludes,
193        )
194    }
195
196    fn compress(
197        &self,
198        stats: &FloatStats,
199        is_sample: bool,
200        allowed_cascading: usize,
201        excludes: &[FloatCode],
202    ) -> VortexResult<ArrayRef> {
203        let alp_encoded = ALPEncoding
204            .encode(&stats.source().to_canonical()?, None)?
205            .vortex_expect("Input is a supported floating point array");
206        let alp = alp_encoded.as_::<ALPVTable>();
207        let alp_ints = alp.encoded().to_primitive()?;
208
209        // Compress the ALP ints.
210        // Patches are not compressed. They should be infrequent, and if they are not then we want
211        // to keep them linear for easy indexing.
212        let mut int_excludes = Vec::new();
213        if excludes.contains(&DICT_SCHEME) {
214            int_excludes.push(integer::DictScheme.code());
215        }
216        if excludes.contains(&RUNEND_SCHEME) {
217            int_excludes.push(integer::RunEndScheme.code());
218        }
219
220        let compressed_alp_ints =
221            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
222
223        let patches = alp.patches().map(compress_patches).transpose()?;
224
225        Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
226    }
227}
228
229impl Scheme for ALPRDScheme {
230    type StatsType = FloatStats;
231    type CodeType = FloatCode;
232
233    fn code(&self) -> FloatCode {
234        ALPRD_SCHEME
235    }
236
237    fn expected_compression_ratio(
238        &self,
239        stats: &Self::StatsType,
240        is_sample: bool,
241        allowed_cascading: usize,
242        excludes: &[FloatCode],
243    ) -> VortexResult<f64> {
244        if stats.source().ptype() == PType::F16 {
245            return Ok(0.0);
246        }
247
248        estimate_compression_ratio_with_sampling(
249            self,
250            stats,
251            is_sample,
252            allowed_cascading,
253            excludes,
254        )
255    }
256
257    fn compress(
258        &self,
259        stats: &Self::StatsType,
260        _is_sample: bool,
261        _allowed_cascading: usize,
262        _excludes: &[FloatCode],
263    ) -> VortexResult<ArrayRef> {
264        let encoder = match stats.source().ptype() {
265            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
266            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
267            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
268        };
269
270        let mut alp_rd = encoder.encode(stats.source());
271
272        let patches = alp_rd
273            .left_parts_patches()
274            .map(compress_patches)
275            .transpose()?;
276        alp_rd.replace_left_parts_patches(patches);
277
278        Ok(alp_rd.into_array())
279    }
280}
281
282impl Scheme for DictScheme {
283    type StatsType = FloatStats;
284    type CodeType = FloatCode;
285
286    fn code(&self) -> FloatCode {
287        DICT_SCHEME
288    }
289
290    fn expected_compression_ratio(
291        &self,
292        stats: &Self::StatsType,
293        is_sample: bool,
294        allowed_cascading: usize,
295        excludes: &[FloatCode],
296    ) -> VortexResult<f64> {
297        if stats.value_count == 0 {
298            return Ok(0.0);
299        }
300
301        // If the array is high cardinality (>50% unique values) skip.
302        if stats.distinct_values_count > stats.value_count / 2 {
303            return Ok(0.0);
304        }
305
306        // Take a sample and run compression on the sample to determine before/after size.
307        estimate_compression_ratio_with_sampling(
308            self,
309            stats,
310            is_sample,
311            allowed_cascading,
312            excludes,
313        )
314    }
315
316    fn compress(
317        &self,
318        stats: &Self::StatsType,
319        is_sample: bool,
320        allowed_cascading: usize,
321        _excludes: &[FloatCode],
322    ) -> VortexResult<ArrayRef> {
323        let dict_array = dictionary_encode(stats)?;
324
325        // Only compress the codes.
326        let codes_stats = IntegerStats::generate_opts(
327            &dict_array.codes().to_primitive()?,
328            GenerateStatsOptions {
329                count_distinct_values: false,
330            },
331        );
332        let codes_scheme = IntCompressor::choose_scheme(
333            &codes_stats,
334            is_sample,
335            allowed_cascading - 1,
336            &[integer::DictScheme.code()],
337        )?;
338        let compressed_codes = codes_scheme.compress(
339            &codes_stats,
340            is_sample,
341            allowed_cascading - 1,
342            &[integer::DictScheme.code()],
343        )?;
344
345        let compressed_values = FloatCompressor::compress(
346            &dict_array.values().to_primitive()?,
347            is_sample,
348            allowed_cascading - 1,
349            &[DICT_SCHEME],
350        )?;
351
352        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
353    }
354}
355
356impl Scheme for RunEndScheme {
357    type StatsType = FloatStats;
358    type CodeType = FloatCode;
359
360    fn code(&self) -> FloatCode {
361        RUNEND_SCHEME
362    }
363
364    fn expected_compression_ratio(
365        &self,
366        stats: &Self::StatsType,
367        is_sample: bool,
368        allowed_cascading: usize,
369        excludes: &[FloatCode],
370    ) -> VortexResult<f64> {
371        if stats.average_run_length < RUN_END_THRESHOLD {
372            return Ok(0.0);
373        }
374
375        estimate_compression_ratio_with_sampling(
376            self,
377            stats,
378            is_sample,
379            allowed_cascading,
380            excludes,
381        )
382    }
383
384    fn compress(
385        &self,
386        stats: &FloatStats,
387        is_sample: bool,
388        allowed_cascading: usize,
389        _excludes: &[FloatCode],
390    ) -> VortexResult<ArrayRef> {
391        let (ends, values) = runend_encode(stats.source())?;
392        // Integer compress the ends, leave the values uncompressed.
393        let compressed_ends = IntCompressor::compress(
394            &ends,
395            is_sample,
396            allowed_cascading - 1,
397            &[
398                integer::RunEndScheme.code(),
399                integer::DictScheme.code(),
400                integer::SparseScheme.code(),
401            ],
402        )?;
403
404        Ok(RunEndArray::try_new(compressed_ends, values)?.into_array())
405    }
406}
407
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty array must succeed and yield an empty result.
    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Compressing a low-cardinality array must succeed and preserve the array length.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every slot with a repeating 0.0..=49.0 cycle. The data then has only 50
        // distinct values and no two adjacent elements are equal, so every run has length 1.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.tree_display());

        // Whatever encoding is chosen, the logical length must not change.
        assert_eq!(compressed.len(), floats.len());
    }
}
447}