Skip to main content

vortex_btrblocks/schemes/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Float compression schemes.
5
6use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::IntoArray;
16use vortex_array::LEGACY_SESSION;
17use vortex_array::ToCanonical;
18use vortex_array::VortexSessionExecute;
19use vortex_array::arrays::Patched;
20use vortex_array::arrays::patched::use_experimental_patches;
21use vortex_array::arrays::primitive::PrimitiveArrayExt;
22use vortex_array::dtype::PType;
23use vortex_compressor::estimate::CompressionEstimate;
24use vortex_compressor::estimate::DeferredEstimate;
25use vortex_compressor::estimate::EstimateVerdict;
26use vortex_compressor::scheme::ChildSelection;
27use vortex_compressor::scheme::DescendantExclusion;
28use vortex_error::VortexResult;
29use vortex_error::vortex_panic;
30use vortex_sparse::Sparse;
31
32use super::integer::SparseScheme as IntSparseScheme;
33use crate::ArrayAndStats;
34use crate::CascadingCompressor;
35use crate::CompressorContext;
36use crate::Scheme;
37use crate::SchemeExt;
38use crate::compress_patches;
39use crate::schemes::rle_ancestor_exclusions;
40use crate::schemes::rle_descendant_exclusions;
41
/// ALP (Adaptive Lossless floating-Point) encoding.
///
/// Encodes floats as integers plus exponents (with patches for values that
/// don't round-trip); the integer payload is then cascaded into a child
/// compression scheme. Not applicable to `f16`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPScheme;
45
/// ALPRD (ALP with Real Double) encoding variant.
///
/// Splits each float's bits into left/right parts, dictionary-encoding the
/// left parts (see `RDEncoder`). Supports `f32`/`f64` only — not `f16`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPRDScheme;
49
/// Sparse encoding for null-dominated float arrays.
///
/// This is the same as the integer `SparseScheme`, but we only use this for null-dominated arrays.
/// Selected only when more than 90% of entries are null (see
/// `expected_compression_ratio` in the `Scheme` impl).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominatedSparseScheme;
55
/// Pco (pcodec) compression for floats.
///
/// Only compiled in when the `pco` cargo feature is enabled.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
60
61// Re-export builtin schemes from vortex-compressor.
62pub use vortex_compressor::builtins::FloatConstantScheme;
63pub use vortex_compressor::builtins::FloatDictScheme;
64pub use vortex_compressor::builtins::is_float_primitive;
65pub use vortex_compressor::stats::FloatStats;
66
/// RLE scheme for float arrays.
///
/// The actual run-length compression is shared with the integer path
/// (`super::integer::rle_compress`); this type only supplies the float-side
/// matching and estimation.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FloatRLEScheme;
70
71impl Scheme for ALPScheme {
72    fn scheme_name(&self) -> &'static str {
73        "vortex.float.alp"
74    }
75
76    fn matches(&self, canonical: &Canonical) -> bool {
77        is_float_primitive(canonical)
78    }
79
80    /// Children: encoded_ints=0.
81    fn num_children(&self) -> usize {
82        1
83    }
84
85    fn expected_compression_ratio(
86        &self,
87        data: &mut ArrayAndStats,
88        ctx: CompressorContext,
89    ) -> CompressionEstimate {
90        // ALP encodes floats as integers. Without integer compression afterward, the encoded ints
91        // are the same size.
92        if ctx.finished_cascading() {
93            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
94        }
95
96        // We don't support ALP for f16.
97        if data.array_as_primitive().ptype() == PType::F16 {
98            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
99        }
100
101        CompressionEstimate::Deferred(DeferredEstimate::Sample)
102    }
103
104    fn compress(
105        &self,
106        compressor: &CascadingCompressor,
107        data: &mut ArrayAndStats,
108        ctx: CompressorContext,
109    ) -> VortexResult<ArrayRef> {
110        let alp_encoded = alp_encode(
111            data.array_as_primitive(),
112            None,
113            &mut LEGACY_SESSION.create_execution_ctx(),
114        )?;
115
116        // Compress the ALP ints.
117        let compressed_alp_ints =
118            compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?;
119
120        let alp_stats = alp_encoded.as_array().statistics().to_owned();
121        let exponents = alp_encoded.exponents();
122
123        if use_experimental_patches() {
124            let patches = alp_encoded.patches();
125
126            // Create ALP array without interior patches.
127            let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
128
129            match patches {
130                None => Ok(alp_array),
131                Some(p) => Ok(Patched::from_array_and_patches(
132                    alp_array,
133                    &p,
134                    &mut LEGACY_SESSION.create_execution_ctx(),
135                )?
136                .with_stats_set(alp_stats)
137                .into_array()),
138            }
139        } else {
140            let patches = alp_encoded.patches().map(compress_patches).transpose()?;
141
142            Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
143        }
144    }
145}
146
147impl Scheme for ALPRDScheme {
148    fn scheme_name(&self) -> &'static str {
149        "vortex.float.alprd"
150    }
151
152    fn matches(&self, canonical: &Canonical) -> bool {
153        is_float_primitive(canonical)
154    }
155
156    fn expected_compression_ratio(
157        &self,
158        data: &mut ArrayAndStats,
159        _ctx: CompressorContext,
160    ) -> CompressionEstimate {
161        // We don't support ALPRD for f16.
162        if data.array_as_primitive().ptype() == PType::F16 {
163            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
164        }
165
166        CompressionEstimate::Deferred(DeferredEstimate::Sample)
167    }
168
169    fn compress(
170        &self,
171        _compressor: &CascadingCompressor,
172        data: &mut ArrayAndStats,
173        _ctx: CompressorContext,
174    ) -> VortexResult<ArrayRef> {
175        let primitive_array = data.array_as_primitive();
176
177        let encoder = match primitive_array.ptype() {
178            PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
179            PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
180            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
181        };
182
183        let alp_rd = encoder.encode(primitive_array);
184        let dtype = alp_rd.dtype().clone();
185        let right_bit_width = alp_rd.right_bit_width();
186        let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
187        parts.left_parts_patches = parts.left_parts_patches.map(compress_patches).transpose()?;
188
189        Ok(vortex_alp::ALPRD::try_new(
190            dtype,
191            parts.left_parts,
192            parts.left_parts_dictionary,
193            parts.right_parts,
194            right_bit_width,
195            parts.left_parts_patches,
196        )?
197        .into_array())
198    }
199}
200
201impl Scheme for NullDominatedSparseScheme {
202    fn scheme_name(&self) -> &'static str {
203        "vortex.float.sparse"
204    }
205
206    fn matches(&self, canonical: &Canonical) -> bool {
207        is_float_primitive(canonical)
208    }
209
210    /// Children: indices=0.
211    fn num_children(&self) -> usize {
212        1
213    }
214
215    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
216    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
217        vec![DescendantExclusion {
218            excluded: IntSparseScheme.id(),
219            children: ChildSelection::All,
220        }]
221    }
222
223    fn expected_compression_ratio(
224        &self,
225        data: &mut ArrayAndStats,
226        _ctx: CompressorContext,
227    ) -> CompressionEstimate {
228        let len = data.array_len() as f64;
229        let stats = data.float_stats();
230        let value_count = stats.value_count();
231
232        // All-null arrays should be compressed as constant instead anyways.
233        if value_count == 0 {
234            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
235        }
236
237        // If the majority (90%) of values is null, this will compress well.
238        if stats.null_count() as f64 / len > 0.9 {
239            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
240        }
241
242        // Otherwise we don't go this route.
243        CompressionEstimate::Verdict(EstimateVerdict::Skip)
244    }
245
246    fn compress(
247        &self,
248        compressor: &CascadingCompressor,
249        data: &mut ArrayAndStats,
250        ctx: CompressorContext,
251    ) -> VortexResult<ArrayRef> {
252        // We pass None as we only run this pathway for NULL-dominated float arrays.
253        let sparse_encoded = Sparse::encode(data.array(), None)?;
254
255        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
256            let indices = sparse.patches().indices().to_primitive().narrow()?;
257            let compressed_indices =
258                compressor.compress_child(&indices.into_array(), &ctx, self.id(), 0)?;
259
260            Sparse::try_new(
261                compressed_indices,
262                sparse.patches().values().clone(),
263                sparse.len(),
264                sparse.fill_scalar().clone(),
265            )
266            .map(|a| a.into_array())
267        } else {
268            Ok(sparse_encoded)
269        }
270    }
271}
272
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        // No cheap heuristic available for pco; always decide by sampling.
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // Default compression level; 8192 is presumably the chunk size — confirm
        // against `Pco::from_primitive`.
        let pco_array = vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?;
        Ok(pco_array.into_array())
    }
}
305
306impl Scheme for FloatRLEScheme {
307    fn scheme_name(&self) -> &'static str {
308        "vortex.float.rle"
309    }
310
311    fn matches(&self, canonical: &Canonical) -> bool {
312        is_float_primitive(canonical)
313    }
314
315    /// Children: values=0, indices=1, offsets=2.
316    fn num_children(&self) -> usize {
317        3
318    }
319
320    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
321        rle_descendant_exclusions()
322    }
323
324    fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
325        rle_ancestor_exclusions()
326    }
327
328    fn expected_compression_ratio(
329        &self,
330        data: &mut ArrayAndStats,
331        ctx: CompressorContext,
332    ) -> CompressionEstimate {
333        // RLE is only useful when we cascade it with another encoding.
334        if ctx.finished_cascading() {
335            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
336        }
337
338        if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
339            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
340        }
341
342        CompressionEstimate::Deferred(DeferredEstimate::Sample)
343    }
344
345    fn compress(
346        &self,
347        compressor: &CascadingCompressor,
348        data: &mut ArrayAndStats,
349        ctx: CompressorContext,
350    ) -> VortexResult<ArrayRef> {
351        super::integer::rle_compress(self, compressor, data, ctx)
352    }
353}
354
#[cfg(test)]
mod tests {
    use std::iter;

    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// Compressing an empty float array must succeed and yield an empty result.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();
        let result = btr.compress(&array)?;

        assert!(result.is_empty());
        Ok(())
    }

    /// A repeating pattern of 50 distinct f32 values should compress to a
    /// dictionary encoding.
    #[test]
    fn test_compress() -> VortexResult<()> {
        // 1024 values cycling through 0.0..=49.0.
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let array = values.into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert_eq!(compressed.len(), 1024);

        // Pin the chosen encoding via its metadata display string.
        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Long runs of identical floats should be picked up by the RLE scheme and
    /// must round-trip losslessly.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        // Three long runs: 100 x 1.5, 200 x 2.7, 150 x 3.15.
        let mut values = Vec::new();
        values.extend(iter::repeat_n(1.5f32, 100));
        values.extend(iter::repeat_n(2.7f32, 200));
        values.extend(iter::repeat_n(3.15f32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        // Restrict the compressor to the RLE scheme only.
        let compressor = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let compressed = compressor.compress(&array.into_array())?;
        assert!(compressed.is::<RLE>());

        // Verify lossless round-trip against the original values.
        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// A 90%-null array (with special float values among the non-nulls) should
    /// compress via the sparse scheme.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        // Six non-null values exercising NaN/infinity/signed-zero edge cases,
        // followed by 90 nulls.
        let mut array = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        array.append_value(f32::NAN);
        array.append_value(-f32::NAN);
        array.append_value(f32::INFINITY);
        array.append_value(-f32::INFINITY);
        array.append_value(0.0f32);
        array.append_value(-0.0f32);
        array.append_nulls(90);

        let array = array.finish_into_primitive().into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array)?;
        assert_eq!(compressed.len(), 96);

        // Pin the chosen encoding via its metadata display string.
        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
450
/// Tests to verify that each float compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// A single repeated value must be recognized as a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<f64> = vec![42.5; 100];
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Decimal floats with a fixed scale (multiples of 0.01) should select ALP.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|i| (i as f64) * 0.01).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<ALP>());
        Ok(())
    }

    /// Few distinct values cycling through the array: expect ALP on top with a
    /// dictionary-encoded integer child.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = (0..1000)
            .map(|i| distinct_values[i % distinct_values.len()])
            .collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    /// A 95%-null array should still compress and preserve its length.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for i in 0..5 {
            builder.append_value(i as f64);
        }
        builder.append_nulls(95);
        let array = builder.finish_into_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        // Verify the compressed array preserves values.
        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}