vortex_btrblocks/
float.rs

1mod dictionary;
2mod stats;
3
4use vortex_alp::{ALPArray, RDEncoder, alp_encode};
5use vortex_array::arrays::{ConstantArray, PrimitiveArray};
6use vortex_array::variants::PrimitiveArrayTrait;
7use vortex_array::{Array, ArrayRef, ArrayStatistics, ToCanonical};
8use vortex_dict::DictArray;
9use vortex_dtype::PType;
10use vortex_error::{VortexExpect, VortexResult, vortex_panic};
11use vortex_runend::RunEndArray;
12use vortex_runend::compress::runend_encode;
13
14use self::stats::FloatStats;
15use crate::float::dictionary::dictionary_encode;
16use crate::integer::{IntCompressor, IntegerStats};
17use crate::patches::compress_patches;
18use crate::{
19    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20    estimate_compression_ratio_with_sampling, integer,
21};
22
/// Threshold for the average run length in an array before we consider run-end encoding.
///
/// `RunEndScheme::expected_compression_ratio` reports a ratio of `0.0` whenever
/// `FloatStats::average_run_length` is strictly below this value.
const RUN_END_THRESHOLD: u32 = 3;
25
/// Marker trait for every [`Scheme`] that operates on [`FloatStats`] and is identified by a
/// [`FloatCode`].
///
/// It declares no methods of its own; `FloatCompressor` uses it as the trait object type
/// `dyn FloatScheme`.
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}

// Blanket impl: any type satisfying the supertrait bound is automatically a `FloatScheme`.
impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
29
/// [`Compressor`] over [`PrimitiveArray`]s that selects among the float schemes in this module.
pub struct FloatCompressor;
31
32impl Compressor for FloatCompressor {
33    type ArrayType = PrimitiveArray;
34    type SchemeType = dyn FloatScheme;
35    type StatsType = FloatStats;
36
37    fn schemes() -> &'static [&'static Self::SchemeType] {
38        &[
39            &UncompressedScheme,
40            &ConstantScheme,
41            &ALPScheme,
42            &ALPRDScheme,
43            &DictScheme,
44        ]
45    }
46
47    fn default_scheme() -> &'static Self::SchemeType {
48        &UncompressedScheme
49    }
50
51    fn dict_scheme_code() -> FloatCode {
52        DICT_SCHEME
53    }
54}
55
// Codes identifying each float scheme. Every constant is the value returned by the
// `Scheme::code` implementation of the scheme struct with the matching name.

/// Code of `UncompressedScheme`.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
/// Code of `ConstantScheme`.
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
/// Code of `ALPScheme`.
const ALP_SCHEME: FloatCode = FloatCode(2);
/// Code of `ALPRDScheme`.
const ALPRD_SCHEME: FloatCode = FloatCode(3);
/// Code of `DictScheme`.
const DICT_SCHEME: FloatCode = FloatCode(4);
/// Code of `RunEndScheme`.
const RUNEND_SCHEME: FloatCode = FloatCode(5);
62
/// Scheme that returns the source array as-is, with an expected ratio of `1.0`.
#[derive(Debug, Copy, Clone)]
struct UncompressedScheme;

/// Scheme that replaces the source with a `ConstantArray` built from its constant scalar.
#[derive(Debug, Copy, Clone)]
struct ConstantScheme;

/// Scheme that ALP-encodes the source and passes the encoded integers to `IntCompressor`.
#[derive(Debug, Copy, Clone)]
struct ALPScheme;

/// Scheme that encodes the source with an `RDEncoder` and compresses the left-parts patches.
#[derive(Debug, Copy, Clone)]
struct ALPRDScheme;

/// Scheme that dictionary-encodes the source and then compresses both codes and values.
#[derive(Debug, Copy, Clone)]
struct DictScheme;

/// Scheme that run-end encodes the source and integer-compresses the run ends.
#[derive(Debug, Copy, Clone)]
struct RunEndScheme;
80
81impl Scheme for UncompressedScheme {
82    type StatsType = FloatStats;
83    type CodeType = FloatCode;
84
85    fn code(&self) -> FloatCode {
86        UNCOMPRESSED_SCHEME
87    }
88
89    fn expected_compression_ratio(
90        &self,
91        _stats: &Self::StatsType,
92        _is_sample: bool,
93        _allowed_cascading: usize,
94        _excludes: &[FloatCode],
95    ) -> VortexResult<f64> {
96        Ok(1.0)
97    }
98
99    fn compress(
100        &self,
101        stats: &Self::StatsType,
102        _is_sample: bool,
103        _allowed_cascading: usize,
104        _excludes: &[FloatCode],
105    ) -> VortexResult<ArrayRef> {
106        Ok(stats.source().to_array())
107    }
108}
109
110impl Scheme for ConstantScheme {
111    type StatsType = FloatStats;
112    type CodeType = FloatCode;
113
114    fn code(&self) -> FloatCode {
115        CONSTANT_SCHEME
116    }
117
118    fn expected_compression_ratio(
119        &self,
120        stats: &Self::StatsType,
121        is_sample: bool,
122        _allowed_cascading: usize,
123        _excludes: &[FloatCode],
124    ) -> VortexResult<f64> {
125        // Never select Constant when sampling
126        if is_sample {
127            return Ok(0.0);
128        }
129
130        // Can only have 1 distinct value
131        if stats.distinct_values_count > 1 {
132            return Ok(0.0);
133        }
134
135        // Cannot have mix of nulls and non-nulls
136        if stats.null_count > 0 && stats.value_count > 0 {
137            return Ok(0.0);
138        }
139
140        Ok(stats.value_count as f64)
141    }
142
143    fn compress(
144        &self,
145        stats: &Self::StatsType,
146        _is_sample: bool,
147        _allowed_cascading: usize,
148        _excludes: &[FloatCode],
149    ) -> VortexResult<ArrayRef> {
150        let scalar = stats
151            .source()
152            .as_constant()
153            .vortex_expect("must be constant");
154
155        Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
156    }
157}
158
/// Identifier of a float compression scheme, wrapping a `u8`.
///
/// Returned by `Scheme::code` for the float schemes and used in their `excludes` lists.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct FloatCode(u8);
161
162impl Scheme for ALPScheme {
163    type StatsType = FloatStats;
164    type CodeType = FloatCode;
165
166    fn code(&self) -> FloatCode {
167        ALP_SCHEME
168    }
169
170    fn expected_compression_ratio(
171        &self,
172        stats: &Self::StatsType,
173        is_sample: bool,
174        allowed_cascading: usize,
175        excludes: &[FloatCode],
176    ) -> VortexResult<f64> {
177        // We don't support ALP for f16
178        if stats.source().ptype() == PType::F16 {
179            return Ok(0.0);
180        }
181
182        if allowed_cascading == 0 {
183            // ALP does not compress on its own, we need to be able to cascade it with
184            // an integer compressor.
185            return Ok(0.0);
186        }
187
188        estimate_compression_ratio_with_sampling(
189            self,
190            stats,
191            is_sample,
192            allowed_cascading,
193            excludes,
194        )
195    }
196
197    fn compress(
198        &self,
199        stats: &FloatStats,
200        is_sample: bool,
201        allowed_cascading: usize,
202        excludes: &[FloatCode],
203    ) -> VortexResult<ArrayRef> {
204        let alp = alp_encode(stats.source())?;
205        let alp_ints = alp.encoded().to_primitive()?;
206
207        // Compress the ALP ints.
208        // Patches are not compressed. They should be infrequent, and if they are not then we want
209        // to keep them linear for easy indexing.
210        let mut int_excludes = Vec::new();
211        if excludes.contains(&DICT_SCHEME) {
212            int_excludes.push(integer::DictScheme.code());
213        }
214        if excludes.contains(&RUNEND_SCHEME) {
215            int_excludes.push(integer::RunEndScheme.code());
216        }
217
218        let compressed_alp_ints =
219            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
220
221        let patches = alp.patches().map(compress_patches).transpose()?;
222
223        Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
224    }
225}
226
227impl Scheme for ALPRDScheme {
228    type StatsType = FloatStats;
229    type CodeType = FloatCode;
230
231    fn code(&self) -> FloatCode {
232        ALPRD_SCHEME
233    }
234
235    fn expected_compression_ratio(
236        &self,
237        stats: &Self::StatsType,
238        is_sample: bool,
239        allowed_cascading: usize,
240        excludes: &[FloatCode],
241    ) -> VortexResult<f64> {
242        if stats.source().ptype() == PType::F16 {
243            return Ok(0.0);
244        }
245
246        estimate_compression_ratio_with_sampling(
247            self,
248            stats,
249            is_sample,
250            allowed_cascading,
251            excludes,
252        )
253    }
254
255    fn compress(
256        &self,
257        stats: &Self::StatsType,
258        _is_sample: bool,
259        _allowed_cascading: usize,
260        _excludes: &[FloatCode],
261    ) -> VortexResult<ArrayRef> {
262        let encoder = match stats.source().ptype() {
263            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
264            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
265            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
266        };
267
268        let mut alp_rd = encoder.encode(stats.source());
269
270        let patches = alp_rd
271            .left_parts_patches()
272            .map(compress_patches)
273            .transpose()?;
274        alp_rd.replace_left_parts_patches(patches);
275
276        Ok(alp_rd.into_array())
277    }
278}
279
280impl Scheme for DictScheme {
281    type StatsType = FloatStats;
282    type CodeType = FloatCode;
283
284    fn code(&self) -> FloatCode {
285        DICT_SCHEME
286    }
287
288    fn expected_compression_ratio(
289        &self,
290        stats: &Self::StatsType,
291        is_sample: bool,
292        allowed_cascading: usize,
293        excludes: &[FloatCode],
294    ) -> VortexResult<f64> {
295        if stats.value_count == 0 {
296            return Ok(0.0);
297        }
298
299        // If the array is high cardinality (>50% unique values) skip.
300        if stats.distinct_values_count > stats.value_count / 2 {
301            return Ok(0.0);
302        }
303
304        // Take a sample and run compression on the sample to determine before/after size.
305        estimate_compression_ratio_with_sampling(
306            self,
307            stats,
308            is_sample,
309            allowed_cascading,
310            excludes,
311        )
312    }
313
314    fn compress(
315        &self,
316        stats: &Self::StatsType,
317        is_sample: bool,
318        allowed_cascading: usize,
319        _excludes: &[FloatCode],
320    ) -> VortexResult<ArrayRef> {
321        let dict_array = dictionary_encode(stats)?;
322
323        // Only compress the codes.
324        let codes_stats = IntegerStats::generate_opts(
325            &dict_array.codes().to_primitive()?,
326            GenerateStatsOptions {
327                count_distinct_values: false,
328            },
329        );
330        let codes_scheme = IntCompressor::choose_scheme(
331            &codes_stats,
332            is_sample,
333            allowed_cascading - 1,
334            &[integer::DictScheme.code()],
335        )?;
336        let compressed_codes = codes_scheme.compress(
337            &codes_stats,
338            is_sample,
339            allowed_cascading - 1,
340            &[integer::DictScheme.code()],
341        )?;
342
343        let compressed_values = FloatCompressor::compress(
344            &dict_array.values().to_primitive()?,
345            is_sample,
346            allowed_cascading - 1,
347            &[DICT_SCHEME],
348        )?;
349
350        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
351    }
352}
353
354impl Scheme for RunEndScheme {
355    type StatsType = FloatStats;
356    type CodeType = FloatCode;
357
358    fn code(&self) -> FloatCode {
359        RUNEND_SCHEME
360    }
361
362    fn expected_compression_ratio(
363        &self,
364        stats: &Self::StatsType,
365        is_sample: bool,
366        allowed_cascading: usize,
367        excludes: &[FloatCode],
368    ) -> VortexResult<f64> {
369        if stats.average_run_length < RUN_END_THRESHOLD {
370            return Ok(0.0);
371        }
372
373        estimate_compression_ratio_with_sampling(
374            self,
375            stats,
376            is_sample,
377            allowed_cascading,
378            excludes,
379        )
380    }
381
382    fn compress(
383        &self,
384        stats: &FloatStats,
385        is_sample: bool,
386        allowed_cascading: usize,
387        _excludes: &[FloatCode],
388    ) -> VortexResult<ArrayRef> {
389        let (ends, values) = runend_encode(stats.source())?;
390        // Integer compress the ends, leave the values uncompressed.
391        let compressed_ends = IntCompressor::compress(
392            &ends,
393            is_sample,
394            allowed_cascading - 1,
395            &[
396                integer::RunEndScheme.code(),
397                integer::DictScheme.code(),
398                integer::SparseScheme.code(),
399            ],
400        )?;
401
402        Ok(RunEndArray::try_new(compressed_ends, values)?.into_array())
403    }
404}
405
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Compressing a low-cardinality array with no runs must succeed and keep its length.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every slot with a cycle of 50 distinct values (0.0 through 49.0).
        for i in 0..1024 {
            // Only 50 distinct values occur among the 1024 entries, which should favour
            // dictionary encoding. No two neighbours are equal, so the average run length
            // is 1 and run-end encoding is ruled out.
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.tree_display());

        // Whatever encoding was chosen, it must describe the same number of elements.
        assert_eq!(compressed.len(), floats.len());
    }
}
445}