Skip to main content

vortex_btrblocks/schemes/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Float compression schemes.
5
6use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::ExecutionCtx;
16use vortex_array::IntoArray;
17use vortex_array::arrays::Patched;
18use vortex_array::arrays::PrimitiveArray;
19use vortex_array::arrays::patched::use_experimental_patches;
20use vortex_array::arrays::primitive::PrimitiveArrayExt;
21use vortex_array::dtype::PType;
22use vortex_compressor::estimate::CompressionEstimate;
23use vortex_compressor::estimate::DeferredEstimate;
24use vortex_compressor::estimate::EstimateVerdict;
25use vortex_compressor::scheme::ChildSelection;
26use vortex_compressor::scheme::DescendantExclusion;
27use vortex_error::VortexResult;
28use vortex_error::vortex_panic;
29use vortex_sparse::Sparse;
30use vortex_sparse::SparseExt as _;
31
32use super::integer::SparseScheme as IntSparseScheme;
33use crate::ArrayAndStats;
34use crate::CascadingCompressor;
35use crate::CompressorContext;
36use crate::Scheme;
37use crate::SchemeExt;
38use crate::compress_patches;
39use crate::schemes::rle_ancestor_exclusions;
40use crate::schemes::rle_descendant_exclusions;
41
/// ALP (Adaptive Lossless floating-Point) encoding.
///
/// Floats are encoded as integers with `alp_encode`, and the encoded integers are handed to the
/// cascading compressor as this scheme's single child. `f16` inputs are skipped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPScheme;
45
/// ALP-RD (ALP for Real Doubles) encoding variant.
///
/// Handles `f32` and `f64` primitives; `f16` inputs are skipped. Its `compress` does not cascade
/// into child compression: only the left-parts patches are passed through `compress_patches`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPRDScheme;
49
/// Sparse encoding for null-dominated float arrays.
///
/// This is the same as the integer `SparseScheme`, but we only use this for null-dominated
/// arrays: it is selected only when more than 90% of the entries are null and at least one entry
/// is non-null.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominatedSparseScheme;
55
/// Pco (pcodec) compression for floats.
///
/// Only compiled when the `pco` feature is enabled.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
60
61// Re-export builtin schemes from vortex-compressor.
62pub use vortex_compressor::builtins::FloatConstantScheme;
63pub use vortex_compressor::builtins::FloatDictScheme;
64pub use vortex_compressor::builtins::is_float_primitive;
65pub use vortex_compressor::stats::FloatStats;
66
/// RLE scheme for float arrays.
///
/// Compression is delegated to `super::integer::rle_compress`. The scheme is skipped once
/// cascading has finished or when the average run length is below
/// `super::integer::RUN_LENGTH_THRESHOLD`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FloatRLEScheme;
70
71impl Scheme for ALPScheme {
72    fn scheme_name(&self) -> &'static str {
73        "vortex.float.alp"
74    }
75
76    fn matches(&self, canonical: &Canonical) -> bool {
77        is_float_primitive(canonical)
78    }
79
80    /// Children: encoded_ints=0.
81    fn num_children(&self) -> usize {
82        1
83    }
84
85    fn expected_compression_ratio(
86        &self,
87        data: &ArrayAndStats,
88        compress_ctx: CompressorContext,
89        _exec_ctx: &mut ExecutionCtx,
90    ) -> CompressionEstimate {
91        // ALP encodes floats as integers. Without integer compression afterward, the encoded ints
92        // are the same size.
93        if compress_ctx.finished_cascading() {
94            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
95        }
96
97        // We don't support ALP for f16.
98        if data.array_as_primitive().ptype() == PType::F16 {
99            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
100        }
101
102        CompressionEstimate::Deferred(DeferredEstimate::Sample)
103    }
104
105    fn compress(
106        &self,
107        compressor: &CascadingCompressor,
108        data: &ArrayAndStats,
109        compress_ctx: CompressorContext,
110        exec_ctx: &mut ExecutionCtx,
111    ) -> VortexResult<ArrayRef> {
112        let alp_encoded = alp_encode(data.array_as_primitive(), None, exec_ctx)?;
113
114        // Compress the ALP ints.
115        let compressed_alp_ints = compressor.compress_child(
116            alp_encoded.encoded(),
117            &compress_ctx,
118            self.id(),
119            0,
120            exec_ctx,
121        )?;
122
123        let alp_stats = alp_encoded.as_array().statistics().to_owned();
124        let exponents = alp_encoded.exponents();
125
126        if use_experimental_patches() {
127            let patches = alp_encoded.patches();
128
129            // Create ALP array without interior patches.
130            let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
131
132            match patches {
133                None => Ok(alp_array),
134                Some(p) => Ok(Patched::from_array_and_patches(alp_array, &p, exec_ctx)?
135                    .with_stats_set(alp_stats)
136                    .into_array()),
137            }
138        } else {
139            let patches = alp_encoded
140                .patches()
141                .map(|p| compress_patches(p, exec_ctx))
142                .transpose()?;
143
144            Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
145        }
146    }
147}
148
149impl Scheme for ALPRDScheme {
150    fn scheme_name(&self) -> &'static str {
151        "vortex.float.alprd"
152    }
153
154    fn matches(&self, canonical: &Canonical) -> bool {
155        is_float_primitive(canonical)
156    }
157
158    fn expected_compression_ratio(
159        &self,
160        data: &ArrayAndStats,
161        _compress_ctx: CompressorContext,
162        _exec_ctx: &mut ExecutionCtx,
163    ) -> CompressionEstimate {
164        // We don't support ALPRD for f16.
165        if data.array_as_primitive().ptype() == PType::F16 {
166            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
167        }
168
169        CompressionEstimate::Deferred(DeferredEstimate::Sample)
170    }
171
172    fn compress(
173        &self,
174        _compressor: &CascadingCompressor,
175        data: &ArrayAndStats,
176        _compress_ctx: CompressorContext,
177        exec_ctx: &mut ExecutionCtx,
178    ) -> VortexResult<ArrayRef> {
179        let primitive_array = data.array_as_primitive();
180
181        let encoder = match primitive_array.ptype() {
182            PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
183            PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
184            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
185        };
186
187        let alp_rd = encoder.encode(primitive_array, exec_ctx);
188        let dtype = alp_rd.dtype().clone();
189        let right_bit_width = alp_rd.right_bit_width();
190        let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
191        parts.left_parts_patches = parts
192            .left_parts_patches
193            .map(|p| compress_patches(p, exec_ctx))
194            .transpose()?;
195
196        Ok(vortex_alp::ALPRD::try_new(
197            dtype,
198            parts.left_parts,
199            parts.left_parts_dictionary,
200            parts.right_parts,
201            right_bit_width,
202            parts.left_parts_patches,
203            exec_ctx,
204        )?
205        .into_array())
206    }
207}
208
209impl Scheme for NullDominatedSparseScheme {
210    fn scheme_name(&self) -> &'static str {
211        "vortex.float.sparse"
212    }
213
214    fn matches(&self, canonical: &Canonical) -> bool {
215        is_float_primitive(canonical)
216    }
217
218    /// Children: indices=0.
219    fn num_children(&self) -> usize {
220        1
221    }
222
223    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
224    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
225        vec![DescendantExclusion {
226            excluded: IntSparseScheme.id(),
227            children: ChildSelection::All,
228        }]
229    }
230
231    fn expected_compression_ratio(
232        &self,
233        data: &ArrayAndStats,
234        _compress_ctx: CompressorContext,
235        exec_ctx: &mut ExecutionCtx,
236    ) -> CompressionEstimate {
237        let len = data.array_len() as f64;
238        let stats = data.float_stats(exec_ctx);
239        let value_count = stats.value_count();
240
241        // All-null arrays should be compressed as constant instead anyways.
242        if value_count == 0 {
243            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
244        }
245
246        // If the majority (90%) of values is null, this will compress well.
247        if stats.null_count() as f64 / len > 0.9 {
248            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
249        }
250
251        // Otherwise we don't go this route.
252        CompressionEstimate::Verdict(EstimateVerdict::Skip)
253    }
254
255    fn compress(
256        &self,
257        compressor: &CascadingCompressor,
258        data: &ArrayAndStats,
259        compress_ctx: CompressorContext,
260        exec_ctx: &mut ExecutionCtx,
261    ) -> VortexResult<ArrayRef> {
262        // We pass None as we only run this pathway for NULL-dominated float arrays.
263        let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
264
265        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
266            let indices = sparse
267                .patches()
268                .indices()
269                .clone()
270                .execute::<PrimitiveArray>(exec_ctx)?
271                .narrow()?;
272            let compressed_indices = compressor.compress_child(
273                &indices.into_array(),
274                &compress_ctx,
275                self.id(),
276                0,
277                exec_ctx,
278            )?;
279
280            Sparse::try_new(
281                compressed_indices,
282                sparse.patches().values().clone(),
283                sparse.len(),
284                sparse.fill_scalar().clone(),
285            )
286            .map(|a| a.into_array())
287        } else {
288            Ok(sparse_encoded)
289        }
290    }
291}
292
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    /// Delegates to [`is_float_primitive`].
    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    /// Always defers the estimate to sampling.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// Builds a Pco array from the primitive input at pco's default compression level.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let floats = data.array_as_primitive();

        // NOTE(review): 8192 is presumably the number of values per page; confirm against
        // `vortex_pco::Pco::from_primitive`.
        let pco_array = vortex_pco::Pco::from_primitive(
            floats,
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?;

        Ok(pco_array.into_array())
    }
}
328
329impl Scheme for FloatRLEScheme {
330    fn scheme_name(&self) -> &'static str {
331        "vortex.float.rle"
332    }
333
334    fn matches(&self, canonical: &Canonical) -> bool {
335        is_float_primitive(canonical)
336    }
337
338    /// Children: values=0, indices=1, offsets=2.
339    fn num_children(&self) -> usize {
340        3
341    }
342
343    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
344        rle_descendant_exclusions()
345    }
346
347    fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
348        rle_ancestor_exclusions()
349    }
350
351    fn expected_compression_ratio(
352        &self,
353        data: &ArrayAndStats,
354        compress_ctx: CompressorContext,
355        exec_ctx: &mut ExecutionCtx,
356    ) -> CompressionEstimate {
357        // RLE is only useful when we cascade it with another encoding.
358        if compress_ctx.finished_cascading() {
359            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
360        }
361
362        if data.float_stats(exec_ctx).average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
363            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
364        }
365
366        CompressionEstimate::Deferred(DeferredEstimate::Sample)
367    }
368
369    fn compress(
370        &self,
371        compressor: &CascadingCompressor,
372        data: &ArrayAndStats,
373        compress_ctx: CompressorContext,
374        exec_ctx: &mut ExecutionCtx,
375    ) -> VortexResult<ArrayRef> {
376        super::integer::rle_compress(self, compressor, data, compress_ctx, exec_ctx)
377    }
378}
379
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compressing an empty `f32` array yields an empty array.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let empty = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&empty, &mut ctx)?;

        assert!(compressed.is_empty());
        Ok(())
    }

    /// 1024 values cycling through 50 distinct floats end up dictionary-encoded.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        (0..1024).for_each(|i| values[i] = (i % 50) as f32);

        let input = values.into_array();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 1024);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.dict(f32, len=1024)");

        Ok(())
    }

    /// Three long runs compress to RLE and compare equal to the original values.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<f32> = iter::repeat_n(1.5f32, 100)
            .chain(iter::repeat_n(2.7f32, 200))
            .chain(iter::repeat_n(3.15f32, 150))
            .collect();

        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        // Only the float RLE scheme is registered, so it is the only candidate.
        let compressor = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&input.into_array(), &mut ctx)?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Six special float values followed by 90 nulls are sparse-encoded.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);

        let special_values = [
            f32::NAN,
            -f32::NAN,
            f32::INFINITY,
            -f32::INFINITY,
            0.0f32,
            -0.0f32,
        ];
        for special in special_values {
            builder.append_value(special);
        }
        builder.append_nulls(90);

        let input = builder.finish_into_primitive().into_array();
        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input, &mut ctx)?;
        assert_eq!(compressed.len(), 96);

        let rendered = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string();
        assert_eq!(rendered.to_lowercase(), "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
483
/// Checks which encoding the default compressor selects for representative float inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A single repeated value becomes a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values = vec![42.5f64; 100];
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Multiples of 0.01 are ALP-encoded.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|step| (step as f64) * 0.01).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<ALP>());
        Ok(())
    }

    /// Five distinct values repeated cyclically give an ALP array whose first child is a dict.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = distinct_values.iter().copied().cycle().take(1000).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    /// Five values followed by 95 nulls compress to an array of the same length.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for value in [0.0f64, 1.0, 2.0, 3.0, 4.0] {
            builder.append_value(value);
        }
        builder.append_nulls(95);
        let input = builder.finish_into_primitive();

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        // Only the length is asserted; the chosen encoding and the values are not inspected.
        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}