Skip to main content

vortex_btrblocks/schemes/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Float compression schemes.
5
6use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::IntoArray;
16use vortex_array::ToCanonical;
17use vortex_array::arrays::primitive::PrimitiveArrayExt;
18use vortex_array::dtype::PType;
19use vortex_compressor::estimate::CompressionEstimate;
20use vortex_compressor::scheme::ChildSelection;
21use vortex_compressor::scheme::DescendantExclusion;
22use vortex_error::VortexResult;
23use vortex_error::vortex_panic;
24use vortex_sparse::Sparse;
25
26use super::integer::SparseScheme as IntSparseScheme;
27use crate::ArrayAndStats;
28use crate::CascadingCompressor;
29use crate::CompressorContext;
30use crate::Scheme;
31use crate::SchemeExt;
32use crate::compress_patches;
33use crate::schemes::rle_ancestor_exclusions;
34use crate::schemes::rle_descendant_exclusions;
35
/// Compression scheme that applies ALP (Adaptive Lossless floating-Point) encoding to float
/// arrays.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPScheme;
39
/// ALPRD (ALP for Real Doubles) encoding variant.
///
/// ALP-RD is the variant of ALP described in the ALP paper for floats that do not encode well
/// with plain ALP.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPRDScheme;
43
/// Sparse encoding scheme for float arrays that consist mostly of nulls.
///
/// It mirrors the integer `SparseScheme`, except that it is only ever selected for
/// null-dominated arrays.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominatedSparseScheme;
49
/// Float compression scheme backed by Pco (pcodec).
///
/// Only available when the `pco` feature is enabled.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
54
55// Re-export builtin schemes from vortex-compressor.
56pub use vortex_compressor::builtins::FloatConstantScheme;
57pub use vortex_compressor::builtins::FloatDictScheme;
58pub use vortex_compressor::builtins::is_float_primitive;
59pub use vortex_compressor::stats::FloatStats;
60
/// Run-length encoding (RLE) scheme for float arrays.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FloatRLEScheme;
64
65impl Scheme for ALPScheme {
66    fn scheme_name(&self) -> &'static str {
67        "vortex.float.alp"
68    }
69
70    fn matches(&self, canonical: &Canonical) -> bool {
71        is_float_primitive(canonical)
72    }
73
74    /// Children: encoded_ints=0.
75    fn num_children(&self) -> usize {
76        1
77    }
78
79    fn expected_compression_ratio(
80        &self,
81        data: &mut ArrayAndStats,
82        ctx: CompressorContext,
83    ) -> CompressionEstimate {
84        // ALP encodes floats as integers. Without integer compression afterward, the encoded ints
85        // are the same size.
86        if ctx.finished_cascading() {
87            return CompressionEstimate::Skip;
88        }
89
90        // We don't support ALP for f16.
91        if data.array_as_primitive().ptype() == PType::F16 {
92            return CompressionEstimate::Skip;
93        }
94
95        CompressionEstimate::Sample
96    }
97
98    fn compress(
99        &self,
100        compressor: &CascadingCompressor,
101        data: &mut ArrayAndStats,
102        ctx: CompressorContext,
103    ) -> VortexResult<ArrayRef> {
104        let alp_encoded = alp_encode(&data.array_as_primitive(), None)?;
105
106        // Compress the ALP ints.
107        let compressed_alp_ints =
108            compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?;
109
110        // Patches are not compressed. They should be infrequent, and if they are not then we want
111        // to keep them linear for easy indexing.
112        let patches = alp_encoded.patches().map(compress_patches).transpose()?;
113
114        Ok(ALP::new(compressed_alp_ints, alp_encoded.exponents(), patches).into_array())
115    }
116}
117
118impl Scheme for ALPRDScheme {
119    fn scheme_name(&self) -> &'static str {
120        "vortex.float.alprd"
121    }
122
123    fn matches(&self, canonical: &Canonical) -> bool {
124        is_float_primitive(canonical)
125    }
126
127    fn expected_compression_ratio(
128        &self,
129        data: &mut ArrayAndStats,
130        _ctx: CompressorContext,
131    ) -> CompressionEstimate {
132        // We don't support ALPRD for f16.
133        if data.array_as_primitive().ptype() == PType::F16 {
134            return CompressionEstimate::Skip;
135        }
136
137        CompressionEstimate::Sample
138    }
139
140    fn compress(
141        &self,
142        _compressor: &CascadingCompressor,
143        data: &mut ArrayAndStats,
144        _ctx: CompressorContext,
145    ) -> VortexResult<ArrayRef> {
146        let primitive_array = data.array_as_primitive();
147
148        let encoder = match primitive_array.ptype() {
149            PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
150            PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
151            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
152        };
153
154        let alp_rd = encoder.encode(&primitive_array);
155        let dtype = alp_rd.dtype().clone();
156        let right_bit_width = alp_rd.right_bit_width();
157        let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
158        parts.left_parts_patches = parts.left_parts_patches.map(compress_patches).transpose()?;
159
160        Ok(vortex_alp::ALPRD::try_new(
161            dtype,
162            parts.left_parts,
163            parts.left_parts_dictionary,
164            parts.right_parts,
165            right_bit_width,
166            parts.left_parts_patches,
167        )?
168        .into_array())
169    }
170}
171
172impl Scheme for NullDominatedSparseScheme {
173    fn scheme_name(&self) -> &'static str {
174        "vortex.float.sparse"
175    }
176
177    fn matches(&self, canonical: &Canonical) -> bool {
178        is_float_primitive(canonical)
179    }
180
181    /// Children: indices=0.
182    fn num_children(&self) -> usize {
183        1
184    }
185
186    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
187    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
188        vec![DescendantExclusion {
189            excluded: IntSparseScheme.id(),
190            children: ChildSelection::All,
191        }]
192    }
193
194    fn expected_compression_ratio(
195        &self,
196        data: &mut ArrayAndStats,
197        _ctx: CompressorContext,
198    ) -> CompressionEstimate {
199        let len = data.array_len() as f64;
200        let stats = data.float_stats();
201        let value_count = stats.value_count();
202
203        // All-null arrays should be compressed as constant instead anyways.
204        if value_count == 0 {
205            return CompressionEstimate::Skip;
206        }
207
208        // If the majority (90%) of values is null, this will compress well.
209        if stats.null_count() as f64 / len > 0.9 {
210            return CompressionEstimate::Ratio(len / value_count as f64);
211        }
212
213        // Otherwise we don't go this route.
214        CompressionEstimate::Skip
215    }
216
217    fn compress(
218        &self,
219        compressor: &CascadingCompressor,
220        data: &mut ArrayAndStats,
221        ctx: CompressorContext,
222    ) -> VortexResult<ArrayRef> {
223        // We pass None as we only run this pathway for NULL-dominated float arrays.
224        let sparse_encoded = Sparse::encode(data.array(), None)?;
225
226        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
227            let indices = sparse.patches().indices().to_primitive().narrow()?;
228            let compressed_indices =
229                compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
230
231            Sparse::try_new(
232                compressed_indices,
233                sparse.patches().values().clone(),
234                sparse.len(),
235                sparse.fill_scalar().clone(),
236            )
237            .map(|a| a.into_array())
238        } else {
239            Ok(sparse_encoded)
240        }
241    }
242}
243
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    /// Name under which this scheme is identified.
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    /// Applies to every canonical array accepted by `is_float_primitive`.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    /// Always asks for sampling; no statistics are consulted.
    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        CompressionEstimate::Sample
    }

    /// Encodes the primitive array with Pco at the default compression level.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array_as_primitive();

        // NOTE(review): the meaning of the third argument (8192) is not visible from this
        // file — confirm against `Pco::from_primitive`.
        let encoded =
            vortex_pco::Pco::from_primitive(&primitive, pco::DEFAULT_COMPRESSION_LEVEL, 8192)?;

        Ok(encoded.into_array())
    }
}
276
277impl Scheme for FloatRLEScheme {
278    fn scheme_name(&self) -> &'static str {
279        "vortex.float.rle"
280    }
281
282    fn matches(&self, canonical: &Canonical) -> bool {
283        is_float_primitive(canonical)
284    }
285
286    /// Children: values=0, indices=1, offsets=2.
287    fn num_children(&self) -> usize {
288        3
289    }
290
291    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
292        rle_descendant_exclusions()
293    }
294
295    fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
296        rle_ancestor_exclusions()
297    }
298
299    fn expected_compression_ratio(
300        &self,
301        data: &mut ArrayAndStats,
302        ctx: CompressorContext,
303    ) -> CompressionEstimate {
304        // RLE is only useful when we cascade it with another encoding.
305        if ctx.finished_cascading() {
306            return CompressionEstimate::Skip;
307        }
308
309        if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
310            return CompressionEstimate::Skip;
311        }
312
313        CompressionEstimate::Sample
314    }
315
316    fn compress(
317        &self,
318        compressor: &CascadingCompressor,
319        data: &mut ArrayAndStats,
320        ctx: CompressorContext,
321    ) -> VortexResult<ArrayRef> {
322        super::integer::rle_compress(self, compressor, data, ctx)
323    }
324}
325
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// Compressing an empty float array yields an empty array.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();

        let compressed = BtrBlocksCompressor::default().compress(&empty)?;

        assert!(compressed.is_empty());
        Ok(())
    }

    /// 1024 floats cycling through 50 distinct values end up dictionary-encoded.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        for (i, slot) in values.iter_mut().enumerate() {
            *slot = (i % 50) as f32;
        }

        let compressed = BtrBlocksCompressor::default().compress(&values.into_array())?;
        assert_eq!(compressed.len(), 1024);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Three long runs compress to RLE with only `FloatRLEScheme` registered, and the
    /// compressed array compares equal to the input values.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let rle_only = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let compressed = rle_only.compress(&input.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Six special float values followed by 90 nulls are sparse-encoded.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        let special_values = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for value in special_values {
            builder.append_value(value);
        }
        builder.append_nulls(90);

        let input = builder.finish_into_primitive().into_array();
        let compressed = BtrBlocksCompressor::default().compress(&input)?;
        assert_eq!(compressed.len(), 96);

        let summary = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(summary, "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
421
/// Checks which encoding the default compressor picks for a handful of characteristic float
/// inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// A single repeated value becomes a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<f64> = vec![42.5; 100];
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Evenly spaced two-decimal values are ALP-encoded.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|i| (i as f64) * 0.01).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert!(compressed.is::<ALP>());
        Ok(())
    }

    /// Five distinct values repeated 200 times each produce an ALP array whose first child is
    /// dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = distinct_values.iter().copied().cycle().take(1000).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    /// A null-dominated array (5 values, 95 nulls) compresses without changing its length.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for i in 0..5 {
            builder.append_value(i as f64);
        }
        builder.append_nulls(95);
        let input = builder.finish_into_primitive();

        let compressed = BtrBlocksCompressor::default().compress(&input.into_array())?;

        // Only the length is checked here; the chosen encoding and the values are not.
        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}
487}