vortex_btrblocks/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, DictArray, MaskedArray, PrimitiveVTable};
9use vortex_array::vtable::ValidityHelper;
10use vortex_array::{ArrayRef, IntoArray, ToCanonical};
11use vortex_dtype::PType;
12use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13use vortex_scalar::Scalar;
14use vortex_sparse::{SparseArray, SparseVTable};
15
16pub use self::stats::FloatStats;
17use crate::float::dictionary::dictionary_encode;
18use crate::integer::{IntCompressor, IntegerStats};
19use crate::patches::compress_patches;
20use crate::rle::RLEScheme;
21use crate::{
22    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
23    estimate_compression_ratio_with_sampling, integer,
24};
25
26pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
27
28impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
29
30/// [`Compressor`] for floating-point numbers.
31pub struct FloatCompressor;
32
33impl Compressor for FloatCompressor {
34    type ArrayVTable = PrimitiveVTable;
35    type SchemeType = dyn FloatScheme;
36    type StatsType = FloatStats;
37
38    fn schemes() -> &'static [&'static Self::SchemeType] {
39        &[
40            &UncompressedScheme,
41            &ConstantScheme,
42            &ALPScheme,
43            &ALPRDScheme,
44            &DictScheme,
45            &NullDominated,
46            &RLE_FLOAT_SCHEME,
47        ]
48    }
49
50    fn default_scheme() -> &'static Self::SchemeType {
51        &UncompressedScheme
52    }
53
54    fn dict_scheme_code() -> FloatCode {
55        DICT_SCHEME
56    }
57}
58
59const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
60const CONSTANT_SCHEME: FloatCode = FloatCode(1);
61const ALP_SCHEME: FloatCode = FloatCode(2);
62const ALPRD_SCHEME: FloatCode = FloatCode(3);
63const DICT_SCHEME: FloatCode = FloatCode(4);
64const RUN_END_SCHEME: FloatCode = FloatCode(5);
65const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);
66
67const SPARSE_SCHEME: FloatCode = FloatCode(7);
68
/// Scheme that returns the source array as-is.
#[derive(Clone, Copy, Debug)]
struct UncompressedScheme;

/// Scheme that encodes an array holding a single distinct value as a constant.
#[derive(Clone, Copy, Debug)]
struct ConstantScheme;

/// Scheme that ALP-encodes floats and cascades into the integer compressor.
#[derive(Clone, Copy, Debug)]
struct ALPScheme;

/// Scheme that encodes floats with the ALP-RD encoder.
#[derive(Clone, Copy, Debug)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes floats, then compresses codes and values separately.
#[derive(Clone, Copy, Debug)]
struct DictScheme;

/// Scheme that sparse-encodes arrays consisting mostly of nulls.
#[derive(Clone, Copy, Debug)]
pub struct NullDominated;
86
87pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
88    RUN_LENGTH_SCHEME,
89    |values, is_sample, allowed_cascading, excludes| {
90        FloatCompressor::compress(values, is_sample, allowed_cascading, excludes)
91    },
92);
93
94impl Scheme for UncompressedScheme {
95    type StatsType = FloatStats;
96    type CodeType = FloatCode;
97
98    fn code(&self) -> FloatCode {
99        UNCOMPRESSED_SCHEME
100    }
101
102    fn expected_compression_ratio(
103        &self,
104        _stats: &Self::StatsType,
105        _is_sample: bool,
106        _allowed_cascading: usize,
107        _excludes: &[FloatCode],
108    ) -> VortexResult<f64> {
109        Ok(1.0)
110    }
111
112    fn compress(
113        &self,
114        stats: &Self::StatsType,
115        _is_sample: bool,
116        _allowed_cascading: usize,
117        _excludes: &[FloatCode],
118    ) -> VortexResult<ArrayRef> {
119        Ok(stats.source().to_array())
120    }
121}
122
123impl Scheme for ConstantScheme {
124    type StatsType = FloatStats;
125    type CodeType = FloatCode;
126
127    fn code(&self) -> FloatCode {
128        CONSTANT_SCHEME
129    }
130
131    fn expected_compression_ratio(
132        &self,
133        stats: &Self::StatsType,
134        is_sample: bool,
135        _allowed_cascading: usize,
136        _excludes: &[FloatCode],
137    ) -> VortexResult<f64> {
138        // Never select Constant when sampling
139        if is_sample {
140            return Ok(0.0);
141        }
142
143        if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
144            return Ok(0.0);
145        }
146
147        // Can only have 1 distinct value
148        if stats.distinct_values_count != 1 {
149            return Ok(0.0);
150        }
151
152        Ok(stats.value_count as f64)
153    }
154
155    fn compress(
156        &self,
157        stats: &Self::StatsType,
158        _is_sample: bool,
159        _allowed_cascading: usize,
160        _excludes: &[FloatCode],
161    ) -> VortexResult<ArrayRef> {
162        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
163
164        match scalar_idx {
165            Some(idx) => {
166                let scalar = stats.source().scalar_at(idx);
167                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
168                if !stats.source().all_valid() {
169                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
170                } else {
171                    Ok(const_arr)
172                }
173            }
174            None => Ok(ConstantArray::new(
175                Scalar::null(stats.src.dtype().clone()),
176                stats.src.len(),
177            )
178            .into_array()),
179        }
180    }
181}
182
/// Opaque `u8` identifier for a float compression scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FloatCode(u8);
185
186impl Scheme for ALPScheme {
187    type StatsType = FloatStats;
188    type CodeType = FloatCode;
189
190    fn code(&self) -> FloatCode {
191        ALP_SCHEME
192    }
193
194    fn expected_compression_ratio(
195        &self,
196        stats: &Self::StatsType,
197        is_sample: bool,
198        allowed_cascading: usize,
199        excludes: &[FloatCode],
200    ) -> VortexResult<f64> {
201        // We don't support ALP for f16
202        if stats.source().ptype() == PType::F16 {
203            return Ok(0.0);
204        }
205
206        if allowed_cascading == 0 {
207            // ALP does not compress on its own, we need to be able to cascade it with
208            // an integer compressor.
209            return Ok(0.0);
210        }
211
212        estimate_compression_ratio_with_sampling(
213            self,
214            stats,
215            is_sample,
216            allowed_cascading,
217            excludes,
218        )
219    }
220
221    fn compress(
222        &self,
223        stats: &FloatStats,
224        is_sample: bool,
225        allowed_cascading: usize,
226        excludes: &[FloatCode],
227    ) -> VortexResult<ArrayRef> {
228        let alp_encoded = ALPEncoding
229            .encode(&stats.source().to_canonical(), None)?
230            .vortex_expect("Input is a supported floating point array");
231        let alp = alp_encoded.as_::<ALPVTable>();
232        let alp_ints = alp.encoded().to_primitive();
233
234        // Compress the ALP ints.
235        // Patches are not compressed. They should be infrequent, and if they are not then we want
236        // to keep them linear for easy indexing.
237        let mut int_excludes = Vec::new();
238        if excludes.contains(&DICT_SCHEME) {
239            int_excludes.push(integer::DictScheme.code());
240        }
241        if excludes.contains(&RUN_END_SCHEME) {
242            int_excludes.push(integer::RunEndScheme.code());
243        }
244
245        let compressed_alp_ints =
246            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
247
248        let patches = alp.patches().map(compress_patches).transpose()?;
249
250        Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
251    }
252}
253
254impl Scheme for ALPRDScheme {
255    type StatsType = FloatStats;
256    type CodeType = FloatCode;
257
258    fn code(&self) -> FloatCode {
259        ALPRD_SCHEME
260    }
261
262    fn expected_compression_ratio(
263        &self,
264        stats: &Self::StatsType,
265        is_sample: bool,
266        allowed_cascading: usize,
267        excludes: &[FloatCode],
268    ) -> VortexResult<f64> {
269        if stats.source().ptype() == PType::F16 {
270            return Ok(0.0);
271        }
272
273        estimate_compression_ratio_with_sampling(
274            self,
275            stats,
276            is_sample,
277            allowed_cascading,
278            excludes,
279        )
280    }
281
282    fn compress(
283        &self,
284        stats: &Self::StatsType,
285        _is_sample: bool,
286        _allowed_cascading: usize,
287        _excludes: &[FloatCode],
288    ) -> VortexResult<ArrayRef> {
289        let encoder = match stats.source().ptype() {
290            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
291            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
292            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
293        };
294
295        let mut alp_rd = encoder.encode(stats.source());
296
297        let patches = alp_rd
298            .left_parts_patches()
299            .map(compress_patches)
300            .transpose()?;
301        alp_rd.replace_left_parts_patches(patches);
302
303        Ok(alp_rd.into_array())
304    }
305}
306
307impl Scheme for DictScheme {
308    type StatsType = FloatStats;
309    type CodeType = FloatCode;
310
311    fn code(&self) -> FloatCode {
312        DICT_SCHEME
313    }
314
315    fn expected_compression_ratio(
316        &self,
317        stats: &Self::StatsType,
318        is_sample: bool,
319        allowed_cascading: usize,
320        excludes: &[FloatCode],
321    ) -> VortexResult<f64> {
322        if stats.value_count == 0 {
323            return Ok(0.0);
324        }
325
326        // If the array is high cardinality (>50% unique values) skip.
327        if stats.distinct_values_count > stats.value_count / 2 {
328            return Ok(0.0);
329        }
330
331        // Take a sample and run compression on the sample to determine before/after size.
332        estimate_compression_ratio_with_sampling(
333            self,
334            stats,
335            is_sample,
336            allowed_cascading,
337            excludes,
338        )
339    }
340
341    fn compress(
342        &self,
343        stats: &Self::StatsType,
344        is_sample: bool,
345        allowed_cascading: usize,
346        _excludes: &[FloatCode],
347    ) -> VortexResult<ArrayRef> {
348        let dict_array = dictionary_encode(stats);
349
350        // Only compress the codes.
351        let codes_stats = IntegerStats::generate_opts(
352            &dict_array.codes().to_primitive().narrow()?,
353            GenerateStatsOptions {
354                count_distinct_values: false,
355            },
356        );
357        let codes_scheme = IntCompressor::choose_scheme(
358            &codes_stats,
359            is_sample,
360            allowed_cascading - 1,
361            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
362        )?;
363        let compressed_codes = codes_scheme.compress(
364            &codes_stats,
365            is_sample,
366            allowed_cascading - 1,
367            &[integer::DictScheme.code()],
368        )?;
369
370        let compressed_values = FloatCompressor::compress(
371            &dict_array.values().to_primitive(),
372            is_sample,
373            allowed_cascading - 1,
374            &[DICT_SCHEME],
375        )?;
376
377        // SAFETY: compressing codes or values does not alter the invariants
378        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
379    }
380}
381
382impl Scheme for NullDominated {
383    type StatsType = FloatStats;
384    type CodeType = FloatCode;
385
386    fn code(&self) -> Self::CodeType {
387        SPARSE_SCHEME
388    }
389
390    fn expected_compression_ratio(
391        &self,
392        stats: &Self::StatsType,
393        _is_sample: bool,
394        allowed_cascading: usize,
395        _excludes: &[Self::CodeType],
396    ) -> VortexResult<f64> {
397        // Only use `SparseScheme` if we can cascade.
398        if allowed_cascading == 0 {
399            return Ok(0.0);
400        }
401
402        if stats.value_count == 0 {
403            // All nulls should use ConstantScheme
404            return Ok(0.0);
405        }
406
407        // If the majority is null, will compress well.
408        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
409            return Ok(stats.src.len() as f64 / stats.value_count as f64);
410        }
411
412        // Otherwise we don't go this route
413        Ok(0.0)
414    }
415
416    fn compress(
417        &self,
418        stats: &Self::StatsType,
419        is_sample: bool,
420        allowed_cascading: usize,
421        _excludes: &[Self::CodeType],
422    ) -> VortexResult<ArrayRef> {
423        assert!(allowed_cascading > 0);
424
425        // We pass None as we only run this pathway for NULL-dominated float arrays
426        let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?;
427
428        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
429            // Compress the values
430            let new_excludes = vec![integer::SparseScheme.code()];
431
432            // Don't attempt to compress the non-null values
433
434            let indices = sparse.patches().indices().to_primitive().narrow()?;
435            let compressed_indices = IntCompressor::compress_no_dict(
436                &indices,
437                is_sample,
438                allowed_cascading - 1,
439                &new_excludes,
440            )?;
441
442            SparseArray::try_new(
443                compressed_indices,
444                sparse.patches().values().clone(),
445                sparse.len(),
446                sparse.fill_scalar().clone(),
447            )
448            .map(|a| a.into_array())
449        } else {
450            Ok(sparse_encoded)
451        }
452    }
453}
454
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::{ArrayBuilder, PrimitiveBuilder};
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical, assert_arrays_eq};
    use vortex_buffer::{Buffer, buffer_mut};
    use vortex_dtype::Nullability;
    use vortex_sparse::SparseEncoding;

    use crate::float::{FloatCompressor, RLE_FLOAT_SCHEME};
    use crate::{Compressor, CompressorStats, MAX_CASCADE, Scheme};

    /// Compressing an empty array must succeed and yield an empty result.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable);

        let compressed = FloatCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(compressed.is_empty());
    }

    /// Smoke test: compression of a low-cardinality array without runs must not fail.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every slot so the values cycle through 0..50: only 50 distinct values,
        // and no two neighbours are equal.
        for idx in 0..1024 {
            values[idx] = (idx % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree())
    }

    /// The RLE scheme must round-trip three long runs of distinct values.
    #[test]
    fn test_rle_compression() {
        let runs = [(1.5f32, 100), (2.7f32, 200), (3.15f32, 150)];
        let mut values = Vec::new();
        for (value, count) in runs {
            values.extend(iter::repeat_n(value, count));
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = crate::float::FloatStats::generate(&array);
        let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[]).unwrap();

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed.as_ref(), expected.as_ref());
    }

    /// Six special float values followed by 90 nulls must end up sparse-encoded.
    #[test]
    fn test_sparse_compression() {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        builder.append_value(f32::NAN);
        builder.append_value(-f32::NAN);
        builder.append_value(f32::INFINITY);
        builder.append_value(-f32::INFINITY);
        builder.append_value(0.0f32);
        builder.append_value(-0.0f32);
        builder.append_nulls(90);

        let floats = builder.finish_into_primitive();

        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();

        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
    }
}