vortex_btrblocks/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub(crate) mod dictionary;
5mod stats;
6
7use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_dict::DictArray;
11use vortex_dtype::PType;
12use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13
14pub use self::stats::FloatStats;
15use crate::float::dictionary::dictionary_encode;
16use crate::integer::{IntCompressor, IntegerStats};
17use crate::patches::compress_patches;
18use crate::{
19    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20    estimate_compression_ratio_with_sampling, integer,
21};
22
/// Marker trait for compression schemes that operate on floating-point arrays.
///
/// It narrows [`Scheme`] to implementations whose statistics type is [`FloatStats`] and whose
/// code type is [`FloatCode`], which lets them be handled uniformly as `dyn FloatScheme`.
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}

// Blanket implementation: any `Scheme` with the matching associated types is a `FloatScheme`.
impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26
27/// [`Compressor`] for floating-point numbers.
28pub struct FloatCompressor;
29
30impl Compressor for FloatCompressor {
31    type ArrayVTable = PrimitiveVTable;
32    type SchemeType = dyn FloatScheme;
33    type StatsType = FloatStats;
34
35    fn schemes() -> &'static [&'static Self::SchemeType] {
36        &[
37            &UncompressedScheme,
38            &ConstantScheme,
39            &ALPScheme,
40            &ALPRDScheme,
41            &DictScheme,
42        ]
43    }
44
45    fn default_scheme() -> &'static Self::SchemeType {
46        &UncompressedScheme
47    }
48
49    fn dict_scheme_code() -> FloatCode {
50        DICT_SCHEME
51    }
52}
53
// Codes identifying the float schemes. Each scheme returns its own code from `Scheme::code`,
// and `excludes` slices are expressed in terms of these codes.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
const ALP_SCHEME: FloatCode = FloatCode(2);
const ALPRD_SCHEME: FloatCode = FloatCode(3);
const DICT_SCHEME: FloatCode = FloatCode(4);
// No scheme visible in this file reports this code; it is only looked up in `excludes` by
// `ALPScheme::compress`, which forwards the exclusion to the integer run-end scheme.
const RUNEND_SCHEME: FloatCode = FloatCode(5);
60
/// Scheme that hands the source array back unchanged.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that replaces an array holding a single distinct value with a [`ConstantArray`].
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes the floats and cascades the encoded integers through
/// [`IntCompressor`].
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes `f32`/`f64` arrays with [`RDEncoder`].
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the floats, then compresses codes and values separately.
#[derive(Debug, Copy, Clone)]
struct DictScheme;
75
76impl Scheme for UncompressedScheme {
77    type StatsType = FloatStats;
78    type CodeType = FloatCode;
79
80    fn code(&self) -> FloatCode {
81        UNCOMPRESSED_SCHEME
82    }
83
84    fn expected_compression_ratio(
85        &self,
86        _stats: &Self::StatsType,
87        _is_sample: bool,
88        _allowed_cascading: usize,
89        _excludes: &[FloatCode],
90    ) -> VortexResult<f64> {
91        Ok(1.0)
92    }
93
94    fn compress(
95        &self,
96        stats: &Self::StatsType,
97        _is_sample: bool,
98        _allowed_cascading: usize,
99        _excludes: &[FloatCode],
100    ) -> VortexResult<ArrayRef> {
101        Ok(stats.source().to_array())
102    }
103}
104
105impl Scheme for ConstantScheme {
106    type StatsType = FloatStats;
107    type CodeType = FloatCode;
108
109    fn code(&self) -> FloatCode {
110        CONSTANT_SCHEME
111    }
112
113    fn expected_compression_ratio(
114        &self,
115        stats: &Self::StatsType,
116        is_sample: bool,
117        _allowed_cascading: usize,
118        _excludes: &[FloatCode],
119    ) -> VortexResult<f64> {
120        // Never select Constant when sampling
121        if is_sample {
122            return Ok(0.0);
123        }
124
125        // Can only have 1 distinct value
126        if stats.distinct_values_count > 1 {
127            return Ok(0.0);
128        }
129
130        // Cannot have mix of nulls and non-nulls
131        if stats.null_count > 0 && stats.value_count > 0 {
132            return Ok(0.0);
133        }
134
135        Ok(stats.value_count as f64)
136    }
137
138    fn compress(
139        &self,
140        stats: &Self::StatsType,
141        _is_sample: bool,
142        _allowed_cascading: usize,
143        _excludes: &[FloatCode],
144    ) -> VortexResult<ArrayRef> {
145        let scalar = stats
146            .source()
147            .as_constant()
148            .vortex_expect("must be constant");
149
150        Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
151    }
152}
153
/// Identifier of a float compression scheme, wrapping a private `u8`.
///
/// Schemes report their code through `Scheme::code`, and callers pass slices of codes as
/// `excludes` to rule schemes out.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
156
157impl Scheme for ALPScheme {
158    type StatsType = FloatStats;
159    type CodeType = FloatCode;
160
161    fn code(&self) -> FloatCode {
162        ALP_SCHEME
163    }
164
165    fn expected_compression_ratio(
166        &self,
167        stats: &Self::StatsType,
168        is_sample: bool,
169        allowed_cascading: usize,
170        excludes: &[FloatCode],
171    ) -> VortexResult<f64> {
172        // We don't support ALP for f16
173        if stats.source().ptype() == PType::F16 {
174            return Ok(0.0);
175        }
176
177        if allowed_cascading == 0 {
178            // ALP does not compress on its own, we need to be able to cascade it with
179            // an integer compressor.
180            return Ok(0.0);
181        }
182
183        estimate_compression_ratio_with_sampling(
184            self,
185            stats,
186            is_sample,
187            allowed_cascading,
188            excludes,
189        )
190    }
191
192    fn compress(
193        &self,
194        stats: &FloatStats,
195        is_sample: bool,
196        allowed_cascading: usize,
197        excludes: &[FloatCode],
198    ) -> VortexResult<ArrayRef> {
199        let alp_encoded = ALPEncoding
200            .encode(&stats.source().to_canonical(), None)?
201            .vortex_expect("Input is a supported floating point array");
202        let alp = alp_encoded.as_::<ALPVTable>();
203        let alp_ints = alp.encoded().to_primitive();
204
205        // Compress the ALP ints.
206        // Patches are not compressed. They should be infrequent, and if they are not then we want
207        // to keep them linear for easy indexing.
208        let mut int_excludes = Vec::new();
209        if excludes.contains(&DICT_SCHEME) {
210            int_excludes.push(integer::DictScheme.code());
211        }
212        if excludes.contains(&RUNEND_SCHEME) {
213            int_excludes.push(integer::RunEndScheme.code());
214        }
215
216        let compressed_alp_ints =
217            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
218
219        let patches = alp.patches().map(compress_patches).transpose()?;
220
221        Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array())
222    }
223}
224
225impl Scheme for ALPRDScheme {
226    type StatsType = FloatStats;
227    type CodeType = FloatCode;
228
229    fn code(&self) -> FloatCode {
230        ALPRD_SCHEME
231    }
232
233    fn expected_compression_ratio(
234        &self,
235        stats: &Self::StatsType,
236        is_sample: bool,
237        allowed_cascading: usize,
238        excludes: &[FloatCode],
239    ) -> VortexResult<f64> {
240        if stats.source().ptype() == PType::F16 {
241            return Ok(0.0);
242        }
243
244        estimate_compression_ratio_with_sampling(
245            self,
246            stats,
247            is_sample,
248            allowed_cascading,
249            excludes,
250        )
251    }
252
253    fn compress(
254        &self,
255        stats: &Self::StatsType,
256        _is_sample: bool,
257        _allowed_cascading: usize,
258        _excludes: &[FloatCode],
259    ) -> VortexResult<ArrayRef> {
260        let encoder = match stats.source().ptype() {
261            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
262            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
263            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
264        };
265
266        let mut alp_rd = encoder.encode(stats.source());
267
268        let patches = alp_rd
269            .left_parts_patches()
270            .map(compress_patches)
271            .transpose()?;
272        alp_rd.replace_left_parts_patches(patches);
273
274        Ok(alp_rd.into_array())
275    }
276}
277
278impl Scheme for DictScheme {
279    type StatsType = FloatStats;
280    type CodeType = FloatCode;
281
282    fn code(&self) -> FloatCode {
283        DICT_SCHEME
284    }
285
286    fn expected_compression_ratio(
287        &self,
288        stats: &Self::StatsType,
289        is_sample: bool,
290        allowed_cascading: usize,
291        excludes: &[FloatCode],
292    ) -> VortexResult<f64> {
293        if stats.value_count == 0 {
294            return Ok(0.0);
295        }
296
297        // If the array is high cardinality (>50% unique values) skip.
298        if stats.distinct_values_count > stats.value_count / 2 {
299            return Ok(0.0);
300        }
301
302        // Take a sample and run compression on the sample to determine before/after size.
303        estimate_compression_ratio_with_sampling(
304            self,
305            stats,
306            is_sample,
307            allowed_cascading,
308            excludes,
309        )
310    }
311
312    fn compress(
313        &self,
314        stats: &Self::StatsType,
315        is_sample: bool,
316        allowed_cascading: usize,
317        _excludes: &[FloatCode],
318    ) -> VortexResult<ArrayRef> {
319        let dict_array = dictionary_encode(stats);
320
321        // Only compress the codes.
322        let codes_stats = IntegerStats::generate_opts(
323            &dict_array.codes().to_primitive().downcast()?,
324            GenerateStatsOptions {
325                count_distinct_values: false,
326            },
327        );
328        let codes_scheme = IntCompressor::choose_scheme(
329            &codes_stats,
330            is_sample,
331            allowed_cascading - 1,
332            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
333        )?;
334        let compressed_codes = codes_scheme.compress(
335            &codes_stats,
336            is_sample,
337            allowed_cascading - 1,
338            &[integer::DictScheme.code()],
339        )?;
340
341        let compressed_values = FloatCompressor::compress(
342            &dict_array.values().to_primitive(),
343            is_sample,
344            allowed_cascading - 1,
345            &[DICT_SCHEME],
346        )?;
347
348        // SAFETY: compressing codes or values does not alter the invariants
349        unsafe { Ok(DictArray::new_unchecked(compressed_codes, compressed_values).into_array()) }
350    }
351}
352
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every slot with a repeating 0..50 ramp: 50 distinct values, and no two
        // neighbouring elements are equal, so every run has length 1.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());

        // Whichever encoding is selected, compression must preserve the logical length.
        assert_eq!(compressed.len(), 1024);
    }
}