vortex_btrblocks/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::ALPArray;
8use vortex_alp::ALPVTable;
9use vortex_alp::RDEncoder;
10use vortex_array::ArrayRef;
11use vortex_array::IntoArray;
12use vortex_array::ToCanonical;
13use vortex_array::arrays::ConstantArray;
14use vortex_array::arrays::DictArray;
15use vortex_array::arrays::MaskedArray;
16use vortex_array::arrays::PrimitiveVTable;
17use vortex_array::vtable::ArrayVTableExt;
18use vortex_array::vtable::ValidityHelper;
19use vortex_dtype::PType;
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_error::vortex_panic;
23use vortex_scalar::Scalar;
24use vortex_sparse::SparseArray;
25use vortex_sparse::SparseVTable;
26
27pub use self::stats::FloatStats;
28use crate::Compressor;
29use crate::CompressorStats;
30use crate::GenerateStatsOptions;
31use crate::Scheme;
32use crate::estimate_compression_ratio_with_sampling;
33use crate::float::dictionary::dictionary_encode;
34use crate::integer;
35use crate::integer::IntCompressor;
36use crate::integer::IntegerStats;
37use crate::patches::compress_patches;
38use crate::rle::RLEScheme;
39
40pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
41
42impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
43
44/// [`Compressor`] for floating-point numbers.
45pub struct FloatCompressor;
46
47impl Compressor for FloatCompressor {
48    type ArrayVTable = PrimitiveVTable;
49    type SchemeType = dyn FloatScheme;
50    type StatsType = FloatStats;
51
52    fn schemes() -> &'static [&'static Self::SchemeType] {
53        &[
54            &UncompressedScheme,
55            &ConstantScheme,
56            &ALPScheme,
57            &ALPRDScheme,
58            &DictScheme,
59            &NullDominated,
60            &RLE_FLOAT_SCHEME,
61        ]
62    }
63
64    fn default_scheme() -> &'static Self::SchemeType {
65        &UncompressedScheme
66    }
67
68    fn dict_scheme_code() -> FloatCode {
69        DICT_SCHEME
70    }
71}
72
73const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
74const CONSTANT_SCHEME: FloatCode = FloatCode(1);
75const ALP_SCHEME: FloatCode = FloatCode(2);
76const ALPRD_SCHEME: FloatCode = FloatCode(3);
77const DICT_SCHEME: FloatCode = FloatCode(4);
78const RUN_END_SCHEME: FloatCode = FloatCode(5);
79const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);
80
81const SPARSE_SCHEME: FloatCode = FloatCode(7);
82
/// Scheme that returns the source array unchanged.
#[derive(Clone, Copy, Debug)]
struct UncompressedScheme;

/// Scheme that replaces an array holding a single distinct value with a constant array.
#[derive(Clone, Copy, Debug)]
struct ConstantScheme;

/// Scheme that ALP-encodes the floats and cascades the encoded integers into the integer
/// compressor.
#[derive(Clone, Copy, Debug)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with the ALP-RD encoder.
#[derive(Clone, Copy, Debug)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the floats and compresses codes and values separately.
#[derive(Clone, Copy, Debug)]
struct DictScheme;

/// Scheme for arrays that are more than 90% null; it stores the array in a sparse encoding.
#[derive(Clone, Copy, Debug)]
pub struct NullDominated;
100
101pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
102    RUN_LENGTH_SCHEME,
103    |values, is_sample, allowed_cascading, excludes| {
104        FloatCompressor::compress(values, is_sample, allowed_cascading, excludes)
105    },
106);
107
108impl Scheme for UncompressedScheme {
109    type StatsType = FloatStats;
110    type CodeType = FloatCode;
111
112    fn code(&self) -> FloatCode {
113        UNCOMPRESSED_SCHEME
114    }
115
116    fn expected_compression_ratio(
117        &self,
118        _stats: &Self::StatsType,
119        _is_sample: bool,
120        _allowed_cascading: usize,
121        _excludes: &[FloatCode],
122    ) -> VortexResult<f64> {
123        Ok(1.0)
124    }
125
126    fn compress(
127        &self,
128        stats: &Self::StatsType,
129        _is_sample: bool,
130        _allowed_cascading: usize,
131        _excludes: &[FloatCode],
132    ) -> VortexResult<ArrayRef> {
133        Ok(stats.source().to_array())
134    }
135}
136
137impl Scheme for ConstantScheme {
138    type StatsType = FloatStats;
139    type CodeType = FloatCode;
140
141    fn code(&self) -> FloatCode {
142        CONSTANT_SCHEME
143    }
144
145    fn expected_compression_ratio(
146        &self,
147        stats: &Self::StatsType,
148        is_sample: bool,
149        _allowed_cascading: usize,
150        _excludes: &[FloatCode],
151    ) -> VortexResult<f64> {
152        // Never select Constant when sampling
153        if is_sample {
154            return Ok(0.0);
155        }
156
157        if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
158            return Ok(0.0);
159        }
160
161        // Can only have 1 distinct value
162        if stats.distinct_values_count != 1 {
163            return Ok(0.0);
164        }
165
166        Ok(stats.value_count as f64)
167    }
168
169    fn compress(
170        &self,
171        stats: &Self::StatsType,
172        _is_sample: bool,
173        _allowed_cascading: usize,
174        _excludes: &[FloatCode],
175    ) -> VortexResult<ArrayRef> {
176        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
177
178        match scalar_idx {
179            Some(idx) => {
180                let scalar = stats.source().scalar_at(idx);
181                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
182                if !stats.source().all_valid() {
183                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
184                } else {
185                    Ok(const_arr)
186                }
187            }
188            None => Ok(ConstantArray::new(
189                Scalar::null(stats.src.dtype().clone()),
190                stats.src.len(),
191            )
192            .into_array()),
193        }
194    }
195}
196
/// Opaque one-byte code identifying a float compression scheme.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct FloatCode(u8);
199
200impl Scheme for ALPScheme {
201    type StatsType = FloatStats;
202    type CodeType = FloatCode;
203
204    fn code(&self) -> FloatCode {
205        ALP_SCHEME
206    }
207
208    fn expected_compression_ratio(
209        &self,
210        stats: &Self::StatsType,
211        is_sample: bool,
212        allowed_cascading: usize,
213        excludes: &[FloatCode],
214    ) -> VortexResult<f64> {
215        // We don't support ALP for f16
216        if stats.source().ptype() == PType::F16 {
217            return Ok(0.0);
218        }
219
220        if allowed_cascading == 0 {
221            // ALP does not compress on its own, we need to be able to cascade it with
222            // an integer compressor.
223            return Ok(0.0);
224        }
225
226        estimate_compression_ratio_with_sampling(
227            self,
228            stats,
229            is_sample,
230            allowed_cascading,
231            excludes,
232        )
233    }
234
235    fn compress(
236        &self,
237        stats: &FloatStats,
238        is_sample: bool,
239        allowed_cascading: usize,
240        excludes: &[FloatCode],
241    ) -> VortexResult<ArrayRef> {
242        let alp_encoded = ALPVTable
243            .as_vtable()
244            .encode(&stats.source().to_canonical(), None)?
245            .vortex_expect("Input is a supported floating point array");
246        let alp = alp_encoded.as_::<ALPVTable>();
247        let alp_ints = alp.encoded().to_primitive();
248
249        // Compress the ALP ints.
250        // Patches are not compressed. They should be infrequent, and if they are not then we want
251        // to keep them linear for easy indexing.
252        let mut int_excludes = Vec::new();
253        if excludes.contains(&DICT_SCHEME) {
254            int_excludes.push(integer::DictScheme.code());
255        }
256        if excludes.contains(&RUN_END_SCHEME) {
257            int_excludes.push(integer::RunEndScheme.code());
258        }
259
260        let compressed_alp_ints =
261            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
262
263        let patches = alp.patches().map(compress_patches).transpose()?;
264
265        Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
266    }
267}
268
269impl Scheme for ALPRDScheme {
270    type StatsType = FloatStats;
271    type CodeType = FloatCode;
272
273    fn code(&self) -> FloatCode {
274        ALPRD_SCHEME
275    }
276
277    fn expected_compression_ratio(
278        &self,
279        stats: &Self::StatsType,
280        is_sample: bool,
281        allowed_cascading: usize,
282        excludes: &[FloatCode],
283    ) -> VortexResult<f64> {
284        if stats.source().ptype() == PType::F16 {
285            return Ok(0.0);
286        }
287
288        estimate_compression_ratio_with_sampling(
289            self,
290            stats,
291            is_sample,
292            allowed_cascading,
293            excludes,
294        )
295    }
296
297    fn compress(
298        &self,
299        stats: &Self::StatsType,
300        _is_sample: bool,
301        _allowed_cascading: usize,
302        _excludes: &[FloatCode],
303    ) -> VortexResult<ArrayRef> {
304        let encoder = match stats.source().ptype() {
305            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
306            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
307            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
308        };
309
310        let mut alp_rd = encoder.encode(stats.source());
311
312        let patches = alp_rd
313            .left_parts_patches()
314            .map(compress_patches)
315            .transpose()?;
316        alp_rd.replace_left_parts_patches(patches);
317
318        Ok(alp_rd.into_array())
319    }
320}
321
322impl Scheme for DictScheme {
323    type StatsType = FloatStats;
324    type CodeType = FloatCode;
325
326    fn code(&self) -> FloatCode {
327        DICT_SCHEME
328    }
329
330    fn expected_compression_ratio(
331        &self,
332        stats: &Self::StatsType,
333        is_sample: bool,
334        allowed_cascading: usize,
335        excludes: &[FloatCode],
336    ) -> VortexResult<f64> {
337        if stats.value_count == 0 {
338            return Ok(0.0);
339        }
340
341        // If the array is high cardinality (>50% unique values) skip.
342        if stats.distinct_values_count > stats.value_count / 2 {
343            return Ok(0.0);
344        }
345
346        // Take a sample and run compression on the sample to determine before/after size.
347        estimate_compression_ratio_with_sampling(
348            self,
349            stats,
350            is_sample,
351            allowed_cascading,
352            excludes,
353        )
354    }
355
356    fn compress(
357        &self,
358        stats: &Self::StatsType,
359        is_sample: bool,
360        allowed_cascading: usize,
361        _excludes: &[FloatCode],
362    ) -> VortexResult<ArrayRef> {
363        let dict_array = dictionary_encode(stats);
364
365        // Only compress the codes.
366        let codes_stats = IntegerStats::generate_opts(
367            &dict_array.codes().to_primitive().narrow()?,
368            GenerateStatsOptions {
369                count_distinct_values: false,
370            },
371        );
372        let codes_scheme = IntCompressor::choose_scheme(
373            &codes_stats,
374            is_sample,
375            allowed_cascading - 1,
376            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
377        )?;
378        let compressed_codes = codes_scheme.compress(
379            &codes_stats,
380            is_sample,
381            allowed_cascading - 1,
382            &[integer::DictScheme.code()],
383        )?;
384
385        let compressed_values = FloatCompressor::compress(
386            &dict_array.values().to_primitive(),
387            is_sample,
388            allowed_cascading - 1,
389            &[DICT_SCHEME],
390        )?;
391
392        // SAFETY: compressing codes or values does not alter the invariants
393        unsafe {
394            Ok(
395                DictArray::new_unchecked(compressed_codes, compressed_values)
396                    .set_all_values_referenced(dict_array.has_all_values_referenced())
397                    .into_array(),
398            )
399        }
400    }
401}
402
403impl Scheme for NullDominated {
404    type StatsType = FloatStats;
405    type CodeType = FloatCode;
406
407    fn code(&self) -> Self::CodeType {
408        SPARSE_SCHEME
409    }
410
411    fn expected_compression_ratio(
412        &self,
413        stats: &Self::StatsType,
414        _is_sample: bool,
415        allowed_cascading: usize,
416        _excludes: &[Self::CodeType],
417    ) -> VortexResult<f64> {
418        // Only use `SparseScheme` if we can cascade.
419        if allowed_cascading == 0 {
420            return Ok(0.0);
421        }
422
423        if stats.value_count == 0 {
424            // All nulls should use ConstantScheme
425            return Ok(0.0);
426        }
427
428        // If the majority is null, will compress well.
429        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
430            return Ok(stats.src.len() as f64 / stats.value_count as f64);
431        }
432
433        // Otherwise we don't go this route
434        Ok(0.0)
435    }
436
437    fn compress(
438        &self,
439        stats: &Self::StatsType,
440        is_sample: bool,
441        allowed_cascading: usize,
442        _excludes: &[Self::CodeType],
443    ) -> VortexResult<ArrayRef> {
444        assert!(allowed_cascading > 0);
445
446        // We pass None as we only run this pathway for NULL-dominated float arrays
447        let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
448
449        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
450            // Compress the values
451            let new_excludes = vec![integer::SparseScheme.code()];
452
453            // Don't attempt to compress the non-null values
454
455            let indices = sparse.patches().indices().to_primitive().narrow()?;
456            let compressed_indices = IntCompressor::compress_no_dict(
457                &indices,
458                is_sample,
459                allowed_cascading - 1,
460                &new_excludes,
461            )?;
462
463            SparseArray::try_new(
464                compressed_indices,
465                sparse.patches().values().clone(),
466                sparse.len(),
467                sparse.fill_scalar().clone(),
468            )
469            .map(|a| a.into_array())
470        } else {
471            Ok(sparse_encoded)
472        }
473    }
474}
475
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_dtype::Nullability;
    use vortex_sparse::SparseVTable;

    use crate::Compressor;
    use crate::CompressorStats;
    use crate::MAX_CASCADE;
    use crate::Scheme;
    use crate::float::FloatCompressor;
    use crate::float::RLE_FLOAT_SCHEME;

    /// Compressing an empty array must succeed and stay empty.
    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Compressing a low-cardinality array with run length 1 must succeed and keep the length.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            // Cycle through the 50 values 0.0..=49.0: few distinct values, and every run has
            // length 1.
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());

        // Whatever scheme was picked, compression must not change the number of elements.
        assert_eq!(compressed.len(), 1024);
    }

    /// The float RLE scheme must round-trip three long runs of distinct values.
    #[test]
    fn test_rle_compression() {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(1.5f32, 100));
        values.extend(iter::repeat_n(2.7f32, 200));
        values.extend(iter::repeat_n(3.15f32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = crate::float::FloatStats::generate(&array);
        let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[]).unwrap();

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed.as_ref(), expected.as_ref());
    }

    /// An array with 6 special float values followed by 90 nulls must end up sparse-encoded.
    #[test]
    fn test_sparse_compression() {
        let mut array = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        array.append_value(f32::NAN);
        array.append_value(-f32::NAN);
        array.append_value(f32::INFINITY);
        array.append_value(-f32::INFINITY);
        array.append_value(0.0f32);
        array.append_value(-0.0f32);
        array.append_nulls(90);

        let floats = array.finish_into_primitive();

        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        assert!(compressed.is::<SparseVTable>());
    }
}