vortex_btrblocks/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_dict::DictArray;
11use vortex_dtype::PType;
12use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13
14use self::stats::FloatStats;
15use crate::float::dictionary::dictionary_encode;
16use crate::integer::{IntCompressor, IntegerStats};
17use crate::patches::compress_patches;
18use crate::{
19    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20    estimate_compression_ratio_with_sampling, integer,
21};
22
23pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
24
25impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26
27pub struct FloatCompressor;
28
29impl Compressor for FloatCompressor {
30    type ArrayVTable = PrimitiveVTable;
31    type SchemeType = dyn FloatScheme;
32    type StatsType = FloatStats;
33
34    fn schemes() -> &'static [&'static Self::SchemeType] {
35        &[
36            &UncompressedScheme,
37            &ConstantScheme,
38            &ALPScheme,
39            &ALPRDScheme,
40            &DictScheme,
41        ]
42    }
43
44    fn default_scheme() -> &'static Self::SchemeType {
45        &UncompressedScheme
46    }
47
48    fn dict_scheme_code() -> FloatCode {
49        DICT_SCHEME
50    }
51}
52
53const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
54const CONSTANT_SCHEME: FloatCode = FloatCode(1);
55const ALP_SCHEME: FloatCode = FloatCode(2);
56const ALPRD_SCHEME: FloatCode = FloatCode(3);
57const DICT_SCHEME: FloatCode = FloatCode(4);
58const RUNEND_SCHEME: FloatCode = FloatCode(5);
59
/// Keeps the float array as-is.
#[derive(Clone, Copy, Debug)]
struct UncompressedScheme;

/// Replaces an array holding a single value with a `ConstantArray`.
#[derive(Clone, Copy, Debug)]
struct ConstantScheme;

/// ALP encoding; the encoded integers are cascaded into the integer compressor.
#[derive(Clone, Copy, Debug)]
struct ALPScheme;

/// ALP-RD encoding via `RDEncoder`.
#[derive(Clone, Copy, Debug)]
struct ALPRDScheme;

/// Dictionary encoding; codes and values are compressed recursively.
#[derive(Clone, Copy, Debug)]
struct DictScheme;
74
75impl Scheme for UncompressedScheme {
76    type StatsType = FloatStats;
77    type CodeType = FloatCode;
78
79    fn code(&self) -> FloatCode {
80        UNCOMPRESSED_SCHEME
81    }
82
83    fn expected_compression_ratio(
84        &self,
85        _stats: &Self::StatsType,
86        _is_sample: bool,
87        _allowed_cascading: usize,
88        _excludes: &[FloatCode],
89    ) -> VortexResult<f64> {
90        Ok(1.0)
91    }
92
93    fn compress(
94        &self,
95        stats: &Self::StatsType,
96        _is_sample: bool,
97        _allowed_cascading: usize,
98        _excludes: &[FloatCode],
99    ) -> VortexResult<ArrayRef> {
100        Ok(stats.source().to_array())
101    }
102}
103
104impl Scheme for ConstantScheme {
105    type StatsType = FloatStats;
106    type CodeType = FloatCode;
107
108    fn code(&self) -> FloatCode {
109        CONSTANT_SCHEME
110    }
111
112    fn expected_compression_ratio(
113        &self,
114        stats: &Self::StatsType,
115        is_sample: bool,
116        _allowed_cascading: usize,
117        _excludes: &[FloatCode],
118    ) -> VortexResult<f64> {
119        // Never select Constant when sampling
120        if is_sample {
121            return Ok(0.0);
122        }
123
124        // Can only have 1 distinct value
125        if stats.distinct_values_count > 1 {
126            return Ok(0.0);
127        }
128
129        // Cannot have mix of nulls and non-nulls
130        if stats.null_count > 0 && stats.value_count > 0 {
131            return Ok(0.0);
132        }
133
134        Ok(stats.value_count as f64)
135    }
136
137    fn compress(
138        &self,
139        stats: &Self::StatsType,
140        _is_sample: bool,
141        _allowed_cascading: usize,
142        _excludes: &[FloatCode],
143    ) -> VortexResult<ArrayRef> {
144        let scalar = stats
145            .source()
146            .as_constant()
147            .vortex_expect("must be constant");
148
149        Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
150    }
151}
152
/// Identifier of a float compression scheme.
///
/// Returned by each scheme's `code()` and used in the `excludes` lists passed to the
/// compressor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FloatCode(u8);
155
156impl Scheme for ALPScheme {
157    type StatsType = FloatStats;
158    type CodeType = FloatCode;
159
160    fn code(&self) -> FloatCode {
161        ALP_SCHEME
162    }
163
164    fn expected_compression_ratio(
165        &self,
166        stats: &Self::StatsType,
167        is_sample: bool,
168        allowed_cascading: usize,
169        excludes: &[FloatCode],
170    ) -> VortexResult<f64> {
171        // We don't support ALP for f16
172        if stats.source().ptype() == PType::F16 {
173            return Ok(0.0);
174        }
175
176        if allowed_cascading == 0 {
177            // ALP does not compress on its own, we need to be able to cascade it with
178            // an integer compressor.
179            return Ok(0.0);
180        }
181
182        estimate_compression_ratio_with_sampling(
183            self,
184            stats,
185            is_sample,
186            allowed_cascading,
187            excludes,
188        )
189    }
190
191    fn compress(
192        &self,
193        stats: &FloatStats,
194        is_sample: bool,
195        allowed_cascading: usize,
196        excludes: &[FloatCode],
197    ) -> VortexResult<ArrayRef> {
198        let alp_encoded = ALPEncoding
199            .encode(&stats.source().to_canonical(), None)?
200            .vortex_expect("Input is a supported floating point array");
201        let alp = alp_encoded.as_::<ALPVTable>();
202        let alp_ints = alp.encoded().to_primitive();
203
204        // Compress the ALP ints.
205        // Patches are not compressed. They should be infrequent, and if they are not then we want
206        // to keep them linear for easy indexing.
207        let mut int_excludes = Vec::new();
208        if excludes.contains(&DICT_SCHEME) {
209            int_excludes.push(integer::DictScheme.code());
210        }
211        if excludes.contains(&RUNEND_SCHEME) {
212            int_excludes.push(integer::RunEndScheme.code());
213        }
214
215        let compressed_alp_ints =
216            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
217
218        let patches = alp.patches().map(compress_patches).transpose()?;
219
220        Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
221    }
222}
223
224impl Scheme for ALPRDScheme {
225    type StatsType = FloatStats;
226    type CodeType = FloatCode;
227
228    fn code(&self) -> FloatCode {
229        ALPRD_SCHEME
230    }
231
232    fn expected_compression_ratio(
233        &self,
234        stats: &Self::StatsType,
235        is_sample: bool,
236        allowed_cascading: usize,
237        excludes: &[FloatCode],
238    ) -> VortexResult<f64> {
239        if stats.source().ptype() == PType::F16 {
240            return Ok(0.0);
241        }
242
243        estimate_compression_ratio_with_sampling(
244            self,
245            stats,
246            is_sample,
247            allowed_cascading,
248            excludes,
249        )
250    }
251
252    fn compress(
253        &self,
254        stats: &Self::StatsType,
255        _is_sample: bool,
256        _allowed_cascading: usize,
257        _excludes: &[FloatCode],
258    ) -> VortexResult<ArrayRef> {
259        let encoder = match stats.source().ptype() {
260            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
261            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
262            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
263        };
264
265        let mut alp_rd = encoder.encode(stats.source());
266
267        let patches = alp_rd
268            .left_parts_patches()
269            .map(compress_patches)
270            .transpose()?;
271        alp_rd.replace_left_parts_patches(patches);
272
273        Ok(alp_rd.into_array())
274    }
275}
276
277impl Scheme for DictScheme {
278    type StatsType = FloatStats;
279    type CodeType = FloatCode;
280
281    fn code(&self) -> FloatCode {
282        DICT_SCHEME
283    }
284
285    fn expected_compression_ratio(
286        &self,
287        stats: &Self::StatsType,
288        is_sample: bool,
289        allowed_cascading: usize,
290        excludes: &[FloatCode],
291    ) -> VortexResult<f64> {
292        if stats.value_count == 0 {
293            return Ok(0.0);
294        }
295
296        // If the array is high cardinality (>50% unique values) skip.
297        if stats.distinct_values_count > stats.value_count / 2 {
298            return Ok(0.0);
299        }
300
301        // Take a sample and run compression on the sample to determine before/after size.
302        estimate_compression_ratio_with_sampling(
303            self,
304            stats,
305            is_sample,
306            allowed_cascading,
307            excludes,
308        )
309    }
310
311    fn compress(
312        &self,
313        stats: &Self::StatsType,
314        is_sample: bool,
315        allowed_cascading: usize,
316        _excludes: &[FloatCode],
317    ) -> VortexResult<ArrayRef> {
318        let dict_array = dictionary_encode(stats);
319
320        // Only compress the codes.
321        let codes_stats = IntegerStats::generate_opts(
322            &dict_array.codes().to_primitive().downcast()?,
323            GenerateStatsOptions {
324                count_distinct_values: false,
325            },
326        );
327        let codes_scheme = IntCompressor::choose_scheme(
328            &codes_stats,
329            is_sample,
330            allowed_cascading - 1,
331            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
332        )?;
333        let compressed_codes = codes_scheme.compress(
334            &codes_stats,
335            is_sample,
336            allowed_cascading - 1,
337            &[integer::DictScheme.code()],
338        )?;
339
340        let compressed_values = FloatCompressor::compress(
341            &dict_array.values().to_primitive(),
342            is_sample,
343            allowed_cascading - 1,
344            &[DICT_SCHEME],
345        )?;
346
347        // SAFETY: compressing codes or values does not alter the invariants
348        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
349    }
350}
351
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Cycle through 50 distinct values (0.0, 1.0, ..., 49.0). Adjacent values always
        // differ, so the average run length is 1, which should rule out run-end encoding.
        // The low cardinality (50 distinct out of 1024) should favour dictionary encoding.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());

        // Compression must be lossless.
        assert_eq!(compressed.len(), floats.len());
        assert_eq!(
            compressed.to_primitive().as_slice::<f32>(),
            floats.as_slice::<f32>()
        );
    }
}