Skip to main content

vortex_btrblocks/schemes/
float.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Float compression schemes.
5
6use vortex_alp::ALP;
7use vortex_alp::ALPArrayExt;
8use vortex_alp::ALPArraySlotsExt;
9use vortex_alp::ALPRDArrayExt;
10use vortex_alp::ALPRDArrayOwnedExt;
11use vortex_alp::RDEncoder;
12use vortex_alp::alp_encode;
13use vortex_array::ArrayRef;
14use vortex_array::Canonical;
15use vortex_array::ExecutionCtx;
16use vortex_array::IntoArray;
17use vortex_array::arrays::Patched;
18use vortex_array::arrays::PrimitiveArray;
19use vortex_array::arrays::patched::use_experimental_patches;
20use vortex_array::arrays::primitive::PrimitiveArrayExt;
21use vortex_array::dtype::PType;
22use vortex_compressor::estimate::CompressionEstimate;
23use vortex_compressor::estimate::DeferredEstimate;
24use vortex_compressor::estimate::EstimateVerdict;
25use vortex_compressor::scheme::ChildSelection;
26use vortex_compressor::scheme::DescendantExclusion;
27use vortex_error::VortexResult;
28use vortex_error::vortex_panic;
29use vortex_sparse::Sparse;
30
31use super::integer::SparseScheme as IntSparseScheme;
32use crate::ArrayAndStats;
33use crate::CascadingCompressor;
34use crate::CompressorContext;
35use crate::Scheme;
36use crate::SchemeExt;
37use crate::compress_patches;
38use crate::schemes::rle_ancestor_exclusions;
39use crate::schemes::rle_descendant_exclusions;
40
/// ALP (Adaptive Lossless floating-Point) encoding.
///
/// Encodes f32/f64 floats as integers (plus patches for values that don't
/// round-trip) and cascades integer compression onto the encoded child; see the
/// `Scheme` impl below. f16 arrays are skipped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPScheme;
44
/// ALPRD (ALP with Real Double) encoding variant.
///
/// Splits each float into left/right bit parts — the left parts are
/// dictionary-coded and may carry patches, the right parts use a fixed bit
/// width (see `compress` below). f16 arrays are skipped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ALPRDScheme;
48
/// Sparse encoding for null-dominated float arrays.
///
/// This is the same as the integer `SparseScheme`, but we only use this for null-dominated arrays:
/// the estimate below only accepts arrays that are more than 90% null.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NullDominatedSparseScheme;
54
/// Pco (pcodec) compression for floats.
///
/// Only compiled in with the `pco` feature; compresses the whole primitive
/// array via `vortex_pco::Pco::from_primitive` at the default compression
/// level (see the `Scheme` impl below).
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
59
60// Re-export builtin schemes from vortex-compressor.
61pub use vortex_compressor::builtins::FloatConstantScheme;
62pub use vortex_compressor::builtins::FloatDictScheme;
63pub use vortex_compressor::builtins::is_float_primitive;
64pub use vortex_compressor::stats::FloatStats;
65
/// RLE scheme for float arrays.
///
/// Delegates the actual encoding to the shared integer RLE implementation
/// (`super::integer::rle_compress`); children are values, indices, and offsets.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FloatRLEScheme;
69
70impl Scheme for ALPScheme {
71    fn scheme_name(&self) -> &'static str {
72        "vortex.float.alp"
73    }
74
75    fn matches(&self, canonical: &Canonical) -> bool {
76        is_float_primitive(canonical)
77    }
78
79    /// Children: encoded_ints=0.
80    fn num_children(&self) -> usize {
81        1
82    }
83
84    fn expected_compression_ratio(
85        &self,
86        data: &ArrayAndStats,
87        compress_ctx: CompressorContext,
88        _exec_ctx: &mut ExecutionCtx,
89    ) -> CompressionEstimate {
90        // ALP encodes floats as integers. Without integer compression afterward, the encoded ints
91        // are the same size.
92        if compress_ctx.finished_cascading() {
93            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
94        }
95
96        // We don't support ALP for f16.
97        if data.array_as_primitive().ptype() == PType::F16 {
98            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
99        }
100
101        CompressionEstimate::Deferred(DeferredEstimate::Sample)
102    }
103
104    fn compress(
105        &self,
106        compressor: &CascadingCompressor,
107        data: &ArrayAndStats,
108        compress_ctx: CompressorContext,
109        exec_ctx: &mut ExecutionCtx,
110    ) -> VortexResult<ArrayRef> {
111        let alp_encoded = alp_encode(data.array_as_primitive(), None, exec_ctx)?;
112
113        // Compress the ALP ints.
114        let compressed_alp_ints = compressor.compress_child(
115            alp_encoded.encoded(),
116            &compress_ctx,
117            self.id(),
118            0,
119            exec_ctx,
120        )?;
121
122        let alp_stats = alp_encoded.as_array().statistics().to_owned();
123        let exponents = alp_encoded.exponents();
124
125        if use_experimental_patches() {
126            let patches = alp_encoded.patches();
127
128            // Create ALP array without interior patches.
129            let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
130
131            match patches {
132                None => Ok(alp_array),
133                Some(p) => Ok(Patched::from_array_and_patches(alp_array, &p, exec_ctx)?
134                    .with_stats_set(alp_stats)
135                    .into_array()),
136            }
137        } else {
138            let patches = alp_encoded
139                .patches()
140                .map(|p| compress_patches(p, exec_ctx))
141                .transpose()?;
142
143            Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
144        }
145    }
146}
147
148impl Scheme for ALPRDScheme {
149    fn scheme_name(&self) -> &'static str {
150        "vortex.float.alprd"
151    }
152
153    fn matches(&self, canonical: &Canonical) -> bool {
154        is_float_primitive(canonical)
155    }
156
157    fn expected_compression_ratio(
158        &self,
159        data: &ArrayAndStats,
160        _compress_ctx: CompressorContext,
161        _exec_ctx: &mut ExecutionCtx,
162    ) -> CompressionEstimate {
163        // We don't support ALPRD for f16.
164        if data.array_as_primitive().ptype() == PType::F16 {
165            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
166        }
167
168        CompressionEstimate::Deferred(DeferredEstimate::Sample)
169    }
170
171    fn compress(
172        &self,
173        _compressor: &CascadingCompressor,
174        data: &ArrayAndStats,
175        _compress_ctx: CompressorContext,
176        exec_ctx: &mut ExecutionCtx,
177    ) -> VortexResult<ArrayRef> {
178        let primitive_array = data.array_as_primitive();
179
180        let encoder = match primitive_array.ptype() {
181            PType::F32 => RDEncoder::new(primitive_array.as_slice::<f32>()),
182            PType::F64 => RDEncoder::new(primitive_array.as_slice::<f64>()),
183            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
184        };
185
186        let alp_rd = encoder.encode(primitive_array, exec_ctx);
187        let dtype = alp_rd.dtype().clone();
188        let right_bit_width = alp_rd.right_bit_width();
189        let mut parts = ALPRDArrayOwnedExt::into_data_parts(alp_rd);
190        parts.left_parts_patches = parts
191            .left_parts_patches
192            .map(|p| compress_patches(p, exec_ctx))
193            .transpose()?;
194
195        Ok(vortex_alp::ALPRD::try_new(
196            dtype,
197            parts.left_parts,
198            parts.left_parts_dictionary,
199            parts.right_parts,
200            right_bit_width,
201            parts.left_parts_patches,
202            exec_ctx,
203        )?
204        .into_array())
205    }
206}
207
208impl Scheme for NullDominatedSparseScheme {
209    fn scheme_name(&self) -> &'static str {
210        "vortex.float.sparse"
211    }
212
213    fn matches(&self, canonical: &Canonical) -> bool {
214        is_float_primitive(canonical)
215    }
216
217    /// Children: indices=0.
218    fn num_children(&self) -> usize {
219        1
220    }
221
222    /// The indices of a null-dominated sparse array should not be sparse-encoded again.
223    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
224        vec![DescendantExclusion {
225            excluded: IntSparseScheme.id(),
226            children: ChildSelection::All,
227        }]
228    }
229
230    fn expected_compression_ratio(
231        &self,
232        data: &ArrayAndStats,
233        _compress_ctx: CompressorContext,
234        exec_ctx: &mut ExecutionCtx,
235    ) -> CompressionEstimate {
236        let len = data.array_len() as f64;
237        let stats = data.float_stats(exec_ctx);
238        let value_count = stats.value_count();
239
240        // All-null arrays should be compressed as constant instead anyways.
241        if value_count == 0 {
242            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
243        }
244
245        // If the majority (90%) of values is null, this will compress well.
246        if stats.null_count() as f64 / len > 0.9 {
247            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
248        }
249
250        // Otherwise we don't go this route.
251        CompressionEstimate::Verdict(EstimateVerdict::Skip)
252    }
253
254    fn compress(
255        &self,
256        compressor: &CascadingCompressor,
257        data: &ArrayAndStats,
258        compress_ctx: CompressorContext,
259        exec_ctx: &mut ExecutionCtx,
260    ) -> VortexResult<ArrayRef> {
261        // We pass None as we only run this pathway for NULL-dominated float arrays.
262        let sparse_encoded = Sparse::encode(data.array(), None, exec_ctx)?;
263
264        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
265            let indices = sparse
266                .patches()
267                .indices()
268                .clone()
269                .execute::<PrimitiveArray>(exec_ctx)?
270                .narrow()?;
271            let compressed_indices = compressor.compress_child(
272                &indices.into_array(),
273                &compress_ctx,
274                self.id(),
275                0,
276                exec_ctx,
277            )?;
278
279            Sparse::try_new(
280                compressed_indices,
281                sparse.patches().values().clone(),
282                sparse.len(),
283                sparse.fill_scalar().clone(),
284            )
285            .map(|a| a.into_array())
286        } else {
287            Ok(sparse_encoded)
288        }
289    }
290}
291
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.float.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_float_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // There is no cheap upfront estimate for pcodec; always decide by sampling.
        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Compress the whole primitive array with pcodec at the default level.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): 8192 looks like a chunk-size argument to `from_primitive`;
        // confirm against the vortex_pco API.
        let encoded = vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?;
        Ok(encoded.into_array())
    }
}
327
328impl Scheme for FloatRLEScheme {
329    fn scheme_name(&self) -> &'static str {
330        "vortex.float.rle"
331    }
332
333    fn matches(&self, canonical: &Canonical) -> bool {
334        is_float_primitive(canonical)
335    }
336
337    /// Children: values=0, indices=1, offsets=2.
338    fn num_children(&self) -> usize {
339        3
340    }
341
342    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
343        rle_descendant_exclusions()
344    }
345
346    fn ancestor_exclusions(&self) -> Vec<vortex_compressor::scheme::AncestorExclusion> {
347        rle_ancestor_exclusions()
348    }
349
350    fn expected_compression_ratio(
351        &self,
352        data: &ArrayAndStats,
353        compress_ctx: CompressorContext,
354        exec_ctx: &mut ExecutionCtx,
355    ) -> CompressionEstimate {
356        // RLE is only useful when we cascade it with another encoding.
357        if compress_ctx.finished_cascading() {
358            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
359        }
360
361        if data.float_stats(exec_ctx).average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
362            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
363        }
364
365        CompressionEstimate::Deferred(DeferredEstimate::Sample)
366    }
367
368    fn compress(
369        &self,
370        compressor: &CascadingCompressor,
371        data: &ArrayAndStats,
372        compress_ctx: CompressorContext,
373        exec_ctx: &mut ExecutionCtx,
374    ) -> VortexResult<ArrayRef> {
375        super::integer::rle_compress(self, compressor, data, compress_ctx, exec_ctx)
376    }
377}
378
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::display::DisplayOptions;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::buffer_mut;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::float::FloatRLEScheme;

    // One session shared by all tests in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    // Compressing an empty, non-nullable f32 array should succeed and stay empty.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable).into_array();
        let result = btr.compress(&array, &mut SESSION.create_execution_ctx())?;

        assert!(result.is_empty());
        Ok(())
    }

    // A low-cardinality array (50 distinct values over 1024 elements) should end
    // up dictionary-encoded.
    #[test]
    fn test_compress() -> VortexResult<()> {
        let mut values = buffer_mut![1.0f32; 1024];
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let array = values.into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array, &mut SESSION.create_execution_ctx())?;
        assert_eq!(compressed.len(), 1024);

        // Pin the chosen encoding via the metadata display string.
        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.dict(f32, len=1024)");

        Ok(())
    }

    // Long runs of repeated values should round-trip through the RLE scheme.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(1.5f32, 100));
        values.extend(iter::repeat_n(2.7f32, 200));
        values.extend(iter::repeat_n(3.15f32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        // Restrict the compressor to the RLE scheme so it must be chosen.
        let compressor = CascadingCompressor::new(vec![&FloatRLEScheme]);
        let compressed =
            compressor.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    // A mostly-null array containing special float values (NaN, ±inf, ±0.0)
    // should be sparse-encoded while preserving length and values.
    #[test]
    fn test_sparse_compression() -> VortexResult<()> {
        let mut array = PrimitiveBuilder::<f32>::with_capacity(Nullability::Nullable, 100);
        array.append_value(f32::NAN);
        array.append_value(-f32::NAN);
        array.append_value(f32::INFINITY);
        array.append_value(-f32::INFINITY);
        array.append_value(0.0f32);
        array.append_value(-0.0f32);
        array.append_nulls(90);

        let array = array.finish_into_primitive().into_array();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array, &mut SESSION.create_execution_ctx())?;
        assert_eq!(compressed.len(), 96);

        let display = compressed
            .display_as(DisplayOptions::MetadataOnly)
            .to_string()
            .to_lowercase();
        assert_eq!(display, "vortex.sparse(f32?, len=96)");

        Ok(())
    }
}
482
/// Tests to verify that each float compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::sync::LazyLock;

    use vortex_alp::ALP;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::PrimitiveBuilder;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    // One session shared by all tests in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    // A single repeated value should select constant encoding.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<f64> = vec![42.5; 100];
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    // Decimals sharing a common scale factor should select ALP.
    #[test]
    fn test_alp_compressed() -> VortexResult<()> {
        let values: Vec<f64> = (0..1000).map(|i| (i as f64) * 0.01).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<ALP>());
        Ok(())
    }

    // Few distinct values: ALP on the outside, with a dictionary-encoded child.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let distinct_values = [1.1, 2.2, 3.3, 4.4, 5.5];
        let values: Vec<f64> = (0..1000)
            .map(|i| distinct_values[i % distinct_values.len()])
            .collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<ALP>());
        assert!(compressed.children()[0].is::<Dict>());
        Ok(())
    }

    // 95% nulls: compression must preserve the logical length.
    #[test]
    fn test_null_dominated_compressed() -> VortexResult<()> {
        let mut builder = PrimitiveBuilder::<f64>::with_capacity(Nullability::Nullable, 100);
        for i in 0..5 {
            builder.append_value(i as f64);
        }
        builder.append_nulls(95);
        let array = builder.finish_into_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        // Verify the compressed array preserves values.
        assert_eq!(compressed.len(), 100);
        Ok(())
    }
}