vortex_btrblocks/float.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

pub(crate) mod dictionary;
mod stats;

use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
use vortex_array::arrays::{ConstantArray, MaskedArray, PrimitiveVTable};
use vortex_array::vtable::ValidityHelper;
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
use vortex_dict::DictArray;
use vortex_dtype::PType;
use vortex_error::{VortexExpect, VortexResult, vortex_panic};
use vortex_scalar::Scalar;

pub use self::stats::FloatStats;
use crate::float::dictionary::dictionary_encode;
use crate::integer::{IntCompressor, IntegerStats};
use crate::patches::compress_patches;
use crate::rle::RLEScheme;
use crate::{
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
    estimate_compression_ratio_with_sampling, integer,
};

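/// A compression [`Scheme`] over [`FloatStats`] keyed by [`FloatCode`].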
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}

impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}

/// [`Compressor`] for floating-point numbers.
pub struct FloatCompressor;

impl Compressor for FloatCompressor {
    type ArrayVTable = PrimitiveVTable;
    type SchemeType = dyn FloatScheme;
    type StatsType = FloatStats;

    fn schemes() -> &'static [&'static Self::SchemeType] {
        &[
            &UncompressedScheme,
            &ConstantScheme,
            &ALPScheme,
            &ALPRDScheme,
            &DictScheme,
            &RLE_FLOAT_SCHEME,
        ]
    }

    fn default_scheme() -> &'static Self::SchemeType {
        &UncompressedScheme
    }

    fn dict_scheme_code() -> FloatCode {
        DICT_SCHEME
    }
}

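// Codes identifying each float compression scheme.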
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
const ALP_SCHEME: FloatCode = FloatCode(2);
const ALPRD_SCHEME: FloatCode = FloatCode(3);
const DICT_SCHEME: FloatCode = FloatCode(4);
const RUN_END_SCHEME: FloatCode = FloatCode(5);
const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6);

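/// Passes the source array through unchanged (compression ratio 1.0).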
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

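/// Encodes arrays with exactly one distinct value as a [`ConstantArray`], preserving validity.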
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

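/// ALP encoding; the encoded integers are cascaded into the integer compressor.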
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

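/// ALP-RD encoding for f32/f64, with compressed left-parts patches.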
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

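/// Dictionary-encodes low-cardinality float arrays, then compresses codes and values separately.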
#[derive(Debug, Copy, Clone)]
struct DictScheme;

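/// Run-length encoding scheme for floats, recursing into [`FloatCompressor`] for further compression.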
pub const RLE_FLOAT_SCHEME: RLEScheme<FloatStats, FloatCode> = RLEScheme::new(
    RUN_LENGTH_SCHEME,
    |values, is_sample, allowed_cascading, excludes| {
        FloatCompressor::compress(values, is_sample, allowed_cascading, excludes)
    },
);

impl Scheme for UncompressedScheme {
    type StatsType = FloatStats;
    type CodeType = FloatCode;

    fn code(&self) -> FloatCode {
        UNCOMPRESSED_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        _stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[FloatCode],
    ) -> VortexResult<f64> {
        Ok(1.0)
    }

    fn compress(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[FloatCode],
    ) -> VortexResult<ArrayRef> {
        Ok(stats.source().to_array())
    }
}

impl Scheme for ConstantScheme {
    type StatsType = FloatStats;
    type CodeType = FloatCode;

    fn code(&self) -> FloatCode {
        CONSTANT_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[FloatCode],
    ) -> VortexResult<f64> {
        // Never select Constant when sampling
        if is_sample {
            return Ok(0.0);
        }

        if stats.null_count as usize == stats.src.len() || stats.value_count == 0 {
            return Ok(0.0);
        }

        // Can only have 1 distinct value
        if stats.distinct_values_count != 1 {
            return Ok(0.0);
        }

        Ok(stats.value_count as f64)
    }

    fn compress(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[FloatCode],
    ) -> VortexResult<ArrayRef> {
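        // Use the first valid element as the constant; if there is none, the array is all-null.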
        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));

        match scalar_idx {
            Some(idx) => {
                let scalar = stats.source().scalar_at(idx);
                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
                if !stats.source().all_valid() {
                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
                } else {
                    Ok(const_arr)
                }
            }
            None => Ok(ConstantArray::new(
                Scalar::null(stats.src.dtype().clone()),
                stats.src.len(),
            )
            .into_array()),
        }
    }
}

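/// Code identifying a float compression scheme.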
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);

impl Scheme for ALPScheme {
    type StatsType = FloatStats;
    type CodeType = FloatCode;

    fn code(&self) -> FloatCode {
        ALP_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[FloatCode],
    ) -> VortexResult<f64> {
        // We don't support ALP for f16
        if stats.source().ptype() == PType::F16 {
            return Ok(0.0);
        }

        if allowed_cascading == 0 {
            // ALP does not compress on its own; we need to be able to cascade it with
            // an integer compressor.
            return Ok(0.0);
        }

        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &FloatStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[FloatCode],
    ) -> VortexResult<ArrayRef> {
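        // ALP-encode the floats into integers plus exception patches.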
        let alp_encoded = ALPEncoding
            .encode(&stats.source().to_canonical(), None)?
            .vortex_expect("Input is a supported floating point array");
        let alp = alp_encoded.as_::<ALPVTable>();
        let alp_ints = alp.encoded().to_primitive();

        // Compress the ALP ints.
        // Patches are not compressed. They should be infrequent, and if they are not, we want
        // to keep them linear for easy indexing.
        let mut int_excludes = Vec::new();
        if excludes.contains(&DICT_SCHEME) {
            int_excludes.push(integer::DictScheme.code());
        }
        if excludes.contains(&RUN_END_SCHEME) {
            int_excludes.push(integer::RunEndScheme.code());
        }

        let compressed_alp_ints =
            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;

        let patches = alp.patches().map(compress_patches).transpose()?;

        Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
    }
}

impl Scheme for ALPRDScheme {
    type StatsType = FloatStats;
    type CodeType = FloatCode;

    fn code(&self) -> FloatCode {
        ALPRD_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[FloatCode],
    ) -> VortexResult<f64> {
        if stats.source().ptype() == PType::F16 {
            return Ok(0.0);
        }

        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[FloatCode],
    ) -> VortexResult<ArrayRef> {
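        // The RD encoder supports only f32 and f64 inputs.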
        let encoder = match stats.source().ptype() {
            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
        };

        let mut alp_rd = encoder.encode(stats.source());

        let patches = alp_rd
            .left_parts_patches()
            .map(compress_patches)
            .transpose()?;
        alp_rd.replace_left_parts_patches(patches);

        Ok(alp_rd.into_array())
    }
}

impl Scheme for DictScheme {
    type StatsType = FloatStats;
    type CodeType = FloatCode;

    fn code(&self) -> FloatCode {
        DICT_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[FloatCode],
    ) -> VortexResult<f64> {
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // If the array is high cardinality (>50% unique values), skip.
        if stats.distinct_values_count > stats.value_count / 2 {
            return Ok(0.0);
        }

        // Take a sample and run compression on the sample to determine before/after size.
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &Self::StatsType,
        is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[FloatCode],
    ) -> VortexResult<ArrayRef> {
        let dict_array = dictionary_encode(stats);

        // Only compress the codes.
        let codes_stats = IntegerStats::generate_opts(
            &dict_array.codes().to_primitive().narrow()?,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );
        let codes_scheme = IntCompressor::choose_scheme(
            &codes_stats,
            is_sample,
            allowed_cascading - 1,
            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
        )?;
        let compressed_codes = codes_scheme.compress(
            &codes_stats,
            is_sample,
            allowed_cascading - 1,
            &[integer::DictScheme.code()],
        )?;

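        // Compress the dictionary values, excluding dictionary encoding so the values are not re-encoded.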
        let compressed_values = FloatCompressor::compress(
            &dict_array.values().to_primitive(),
            is_sample,
            allowed_cascading - 1,
            &[DICT_SCHEME],
        )?;

        // SAFETY: compressing codes or values does not alter the invariants
        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
    }
}

#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::{FloatCompressor, RLE_FLOAT_SCHEME};
    use crate::{Compressor, CompressorStats, MAX_CASCADE, Scheme};

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every position with a value cycling through 0..50.
        // This should force dictionary encoding and exclude run-end, since the
        // average run length is 1.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree())
    }

    #[test]
    fn test_rle_compression() {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(1.5f32, 100));
        values.extend(iter::repeat_n(2.7f32, 200));
        values.extend(iter::repeat_n(3.15f32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let stats = crate::float::FloatStats::generate(&array);
        let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[]).unwrap();

        let decoded = compressed.to_primitive();
        assert_eq!(decoded.as_slice::<f32>(), values.as_slice());
    }
}