vortex_btrblocks/schemes/integer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Integer compression schemes.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Patched;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::scalar::Scalar;
use vortex_compressor::builtins::FloatDictScheme;
use vortex_compressor::builtins::StringDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
#[cfg(feature = "unstable_encodings")]
use vortex_compressor::scheme::SchemeId;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_fastlanes::BitPacked;
#[cfg(feature = "unstable_encodings")]
use vortex_fastlanes::Delta;
use vortex_fastlanes::FoR;
use vortex_fastlanes::FoRArrayExt;
use vortex_fastlanes::RLE;
use vortex_fastlanes::RLEArrayExt;
use vortex_fastlanes::USE_EXPERIMENTAL_PATCHES;
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
use vortex_fastlanes::bitpack_compress::bitpack_encode;
use vortex_fastlanes::bitpack_compress::find_best_bit_width;
use vortex_runend::RunEnd;
use vortex_runend::compress::runend_encode;
use vortex_sequence::sequence_encode;
use vortex_sparse::Sparse;
use vortex_zigzag::ZigZag;
use vortex_zigzag::ZigZagArrayExt;
use vortex_zigzag::zigzag_encode;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::GenerateStatsOptions;
use crate::Scheme;
use crate::SchemeExt;
use crate::compress_patches;
use crate::schemes::rle_ancestor_exclusions;
use crate::schemes::rle_descendant_exclusions;

/// Frame of Reference encoding.
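///
/// FoR stores each value as `value - reference`, where the reference is the
/// array's minimum, so narrow-range data needs fewer bits downstream. As a
/// purely illustrative example, `[1_000_000, 1_000_003, 1_000_001]` becomes
/// reference `1_000_000` plus encoded offsets `[0, 3, 1]`, which bitpack into
/// 2 bits apiece.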
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FoRScheme;

/// ZigZag encoding for integer arrays containing negative values.
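///
/// ZigZag interleaves signed values into unsigned ones so that small-magnitude
/// negatives stay small: `0, -1, 1, -2, 2` map to `0, 1, 2, 3, 4`. A minimal
/// sketch of the standard scalar transform (illustrative only; the
/// `vortex_zigzag` crate's `zigzag_encode` operates on whole arrays, as in
/// `compress` below):
///
/// ```
/// fn zigzag32(n: i32) -> u32 {
///     // The arithmetic right shift broadcasts the sign bit, flipping all
///     // bits of the left-shifted value for negative inputs.
///     ((n << 1) ^ (n >> 31)) as u32
/// }
/// assert_eq!(zigzag32(-1), 1);
/// assert_eq!(zigzag32(2), 4);
/// ```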
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// BitPacking encoding for non-negative integers.
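///
/// Values are packed at the smallest bit width that covers the bulk of the
/// data; the rare wider values are kept as patches. As a rough illustration,
/// 1024 `u32` values that are all below 16 fit in 4 bits each, shrinking
/// 4 KiB of data to 512 bytes.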
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse encoding for single-value-dominated arrays.
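///
/// Stores a single fill value plus (position, value) patches for the
/// exceptions. For example, `[5, 5, 9, 5, 5, 5]` becomes fill `5` with one
/// patch recording the `9` at position `2`.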
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Run-end encoding with end positions.
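///
/// For example, `[7, 7, 7, 9, 9]` becomes run values `[7, 9]` with end
/// positions `[3, 5]`.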
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence encoding for sequential patterns.
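///
/// Detects arithmetic progressions of the form `base + i * multiplier`; for
/// example, `[3, 10, 17, 24]` is stored as just base `3` and multiplier `7`.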
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco (pcodec) compression for integers.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;

// Re-export builtin schemes from vortex-compressor.
pub use vortex_compressor::builtins::IntConstantScheme;
pub use vortex_compressor::builtins::IntDictScheme;
pub use vortex_compressor::builtins::is_integer_primitive;
pub use vortex_compressor::stats::IntegerStats;

/// RLE scheme for integer arrays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IntRLEScheme;

/// Threshold for the average run length in an array before we consider run-length encoding.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;
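// As an arithmetic illustration, [7, 7, 7, 7, 9, 9, 9, 9] has two runs of
// length 4, so its average run length is exactly 4 and just meets both
// thresholds.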

impl Scheme for FoRScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.for"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Dict codes always start at 0, so FoR (which subtracts the min) is a no-op.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
        // the same size.
        if ctx.finished_cascading() {
            return CompressionEstimate::Skip;
        }

        let stats = data.integer_stats();

        // Only apply when the min is not already zero.
        if stats.erased().min_is_zero() {
            return CompressionEstimate::Skip;
        }

        // Difference between max and min.
        let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, then we should be compressing this as a constant array.
            None => return CompressionEstimate::Skip,
        };

        // If BitPacking can be applied (only non-negative values) and FoR doesn't reduce the bit
        // width compared to BitPacking, don't use FoR, since it adds a small amount of overhead
        // (storing the reference) for effectively no benefit.
        if let Some(max_log) = stats
            .erased()
            .max_ilog2()
            // Only skip FoR when min >= 0, otherwise BitPacking can't be applied without ZigZag.
            .filter(|_| !stats.erased().min_is_negative())
        {
            let bitpack_bitwidth = max_log + 1;
            if for_bitwidth >= bitpack_bitwidth {
                return CompressionEstimate::Skip;
            }
        }

        let full_width: u32 = data
            .array_as_primitive()
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");

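        // Illustrative math: u64 values whose max - min fits in 10 bits give an
        // estimated ratio of 64 / 10 = 6.4.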
        CompressionEstimate::Ratio(full_width as f64 / for_bitwidth as f64)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array().to_primitive();
        let for_array = FoR::encode(primitive)?;
        let biased = for_array.encoded().to_primitive();

        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let leaf_ctx = ctx.clone().as_leaf();
        let mut biased_data = ArrayAndStats::new(biased.into_array(), ctx.merged_stats_options());
        let compressed = BitPackingScheme.compress(compressor, &mut biased_data, leaf_ctx)?;

        // TODO(connor): This should really be `new_unchecked`.
        let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());

        Ok(for_compressed.into_array())
    }
}

impl Scheme for ZigZagScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.zigzag"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: encoded=0.
    fn num_children(&self) -> usize {
        1
    }

    /// ZigZag is a bijective value transform that preserves cardinality, run patterns, and value
    /// dominance. If Dict, RunEnd, or Sparse lost on the original array, they will lose on ZigZag's
    /// output too, so we skip evaluating them.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::All,
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::All,
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::All,
            },
        ]
    }

    /// Dict codes are unsigned integers (0..cardinality). ZigZag only helps negatives.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // ZigZag only transforms negative values to positive. Without further compression,
        // the output is the same size.
        if ctx.finished_cascading() {
            return CompressionEstimate::Skip;
        }

        let stats = data.integer_stats();

        // ZigZag is only useful when there are negative values.
        if !stats.erased().min_is_negative() {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // ZigZag encode the values, then recursively compress the inner values.
        let zag = zigzag_encode(data.array_as_primitive())?;
        let encoded = zag.encoded().to_primitive();

        let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?;

        tracing::debug!("zigzag output: {}", compressed.encoding_id());

        Ok(ZigZag::try_new(compressed)?.into_array())
    }
}

impl Scheme for BitPackingScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.bitpacking"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        let stats = data.integer_stats();

        // BitPacking only works for non-negative values.
        if stats.erased().min_is_negative() {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive_array = data.array_as_primitive();

        let histogram = bit_width_histogram(&primitive_array)?;
        let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;

        // If the best bit width equals the array's current bit width, packing saves nothing, so
        // return the original array.
        if bw as usize == primitive_array.ptype().bit_width() {
            return Ok(primitive_array.into_array());
        }

        // Otherwise we can bitpack the array.
        let packed = bitpack_encode(&primitive_array, bw, Some(&histogram))?;

        let packed_stats = packed.statistics().to_owned();
        let ptype = packed.dtype().as_ptype();
        let mut parts = BitPacked::into_parts(packed);

        let array = if *USE_EXPERIMENTAL_PATCHES {
            let patches = parts.patches.take();
            // Transpose patches into a G-ALP style PatchedArray wrapping an inner BitPackedArray.
            let array = BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                None,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .into_array();

            match patches {
                None => array,
                Some(p) => Patched::from_array_and_patches(
                    array,
                    &p,
                    &mut LEGACY_SESSION.create_execution_ctx(),
                )?
                .with_stats_set(packed_stats)
                .into_array(),
            }
        } else {
            // Compress patches and place them back into the BitPackedArray.
            let patches = parts.patches.take().map(compress_patches).transpose()?;
            parts.patches = patches;
            BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                parts.patches,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .with_stats_set(packed_stats)
            .into_array()
        };

        Ok(array)
    }
}

impl Scheme for SparseScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sparse"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn stats_options(&self) -> GenerateStatsOptions {
        GenerateStatsOptions {
            count_distinct_values: true,
        }
    }

    /// Children: values=0, indices=1.
    fn num_children(&self) -> usize {
        2
    }

    /// Sparse indices (child 1) are monotonically increasing positions with all unique values.
    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        let len = data.array_len() as f64;
        let stats = data.integer_stats();
        let value_count = stats.value_count();

        // All-null arrays should be compressed as constant instead anyway.
        if value_count == 0 {
            return CompressionEstimate::Skip;
        }

        // If the vast majority (more than 90%) of values are null, this will compress well.
        if stats.null_count() as f64 / len > 0.9 {
            return CompressionEstimate::Ratio(len / value_count as f64);
        }

        let (_, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // If the most frequent value is the only value, we should compress as constant instead.
        if most_frequent_count == value_count {
            return CompressionEstimate::Skip;
        }
        debug_assert!(value_count > most_frequent_count);

        // See if the most frequent value accounts for >= 90% of the set values.
        let freq = most_frequent_count as f64 / value_count as f64;
        if freq < 0.9 {
            return CompressionEstimate::Skip;
        }

        // We only store the positions of the non-top values.
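        // Illustrative math: 1000 set values, 950 of them the most frequent,
        // estimate a ratio of 1000 / 50 = 20.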
        CompressionEstimate::Ratio(value_count as f64 / (value_count - most_frequent_count) as f64)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let len = data.array_len();
        // TODO(connor): Fight the borrow checker (needs interior mutability)!
        let stats = data.integer_stats().clone();
        let array = data.array();

        let (most_frequent_value, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        if most_frequent_count as usize == len {
            // If the most frequent value is the only value, we should compress as constant instead.
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    most_frequent_value,
                    most_frequent_value.ptype(),
                    array.dtype().nullability(),
                ),
                len,
            )
            .into_array());
        }

        let sparse_encoded = Sparse::encode(
            array,
            Some(Scalar::primitive_value(
                most_frequent_value,
                most_frequent_value.ptype(),
                array.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
            let compressed_values = compressor.compress_child(
                &sparse.patches().values().to_primitive().into_array(),
                &ctx,
                self.id(),
                0,
            )?;

            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices =
                compressor.compress_child(&indices.into_array(), &ctx, self.id(), 1)?;

            Sparse::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            Ok(sparse_encoded)
        }
    }
}

impl Scheme for RunEndScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.runend"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: values=0, ends=1.
    fn num_children(&self) -> usize {
        2
    }

    /// RunEnd ends (child 1) are monotonically increasing positions with all unique values.
    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    /// Dict values (child 0) are all unique by definition, so run-end encoding them is
    /// pointless. Codes (child 1) can have runs and may benefit from RunEnd.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(0),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        // If the average run length is below the threshold, skip run-end encoding.
        if data.integer_stats().average_run_length() < RUN_END_THRESHOLD {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // Run-end encode the array into run ends and run values.
        let (ends, values) = runend_encode(data.array_as_primitive().as_view());

        let compressed_values =
            compressor.compress_child(&values.to_primitive().into_array(), &ctx, self.id(), 0)?;

        let compressed_ends = compressor.compress_child(&ends.into_array(), &ctx, self.id(), 1)?;

        // SAFETY: compression doesn't affect invariants.
        Ok(unsafe {
            RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
                .into_array()
        })
    }
}

impl Scheme for SequenceScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sequence"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Sequence encoding on dictionary codes just adds a layer of indirection without compressing
    /// the data. Dict codes are compact integers that benefit from BitPacking or FoR, not from
    /// sequence detection.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // It is pointless to check whether a sample is a sequence, since the sample does not
        // correspond to the entire array.
        if ctx.is_sample() {
            return CompressionEstimate::Skip;
        }

        let stats = data.integer_stats();

        // `SequenceArray` does not support nulls.
        if stats.null_count() > 0 {
            return CompressionEstimate::Skip;
        }

        // If the distinct value count was computed and not all values are unique, then this
        // cannot be encoded as a sequence array.
        if stats
            .distinct_count()
            .is_some_and(|count| count as usize != data.array_len())
        {
            return CompressionEstimate::Skip;
        }

        // TODO(connor): Why do we sequence encode the whole thing and then throw it away? And then
        // why do we divide the ratio by 2???

        CompressionEstimate::Estimate(Box::new(|_compressor, data, _ctx| {
            let Some(encoded) = sequence_encode(&data.array_as_primitive())? else {
                // If we are unable to sequence encode this array, make sure we skip.
                return Ok(CompressionEstimate::Skip);
            };

            // TODO(connor): This doesn't really make sense?
            // Since two values are required to store the base and multiplier, the compression
            // ratio is divided by 2.
            Ok(CompressionEstimate::Ratio(encoded.len() as f64 / 2.0))
        }))
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let stats = data.integer_stats();

        if stats.null_count() > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(&data.array_as_primitive())?
            .ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}

#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        // Pco does not support I8 or U8.
        if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        Ok(vortex_pco::Pco::from_primitive(
            &data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?
        .into_array())
    }
}

/// Shared compression logic for RLE schemes.
pub(crate) fn rle_compress(
    scheme: &dyn Scheme,
    compressor: &CascadingCompressor,
    data: &mut ArrayAndStats,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    let array = data.array_as_primitive();
    let rle_array = RLE::encode(&array)?;

    let compressed_values = compressor.compress_child(
        &rle_array.values().to_primitive().into_array(),
        &ctx,
        scheme.id(),
        0,
    )?;

    // Delta is an unstable encoding; once we deem it stable, we can switch over to it
    // unconditionally.
    #[cfg(feature = "unstable_encodings")]
    let compressed_indices = try_compress_delta(
        compressor,
        &rle_array.indices().to_primitive().narrow()?.into_array(),
        &ctx,
        scheme.id(),
        1,
    )?;

    #[cfg(not(feature = "unstable_encodings"))]
    let compressed_indices = compressor.compress_child(
        &rle_array.indices().to_primitive().narrow()?.into_array(),
        &ctx,
        scheme.id(),
        1,
    )?;

    let compressed_offsets = compressor.compress_child(
        &rle_array
            .values_idx_offsets()
            .to_primitive()
            .narrow()?
            .into_array(),
        &ctx,
        scheme.id(),
        2,
    )?;

    // SAFETY: Recursive compression doesn't affect the invariants.
    unsafe {
        Ok(RLE::new_unchecked(
            compressed_values,
            compressed_indices,
            compressed_offsets,
            rle_array.offset(),
            rle_array.len(),
        )
        .into_array())
    }
}

#[cfg(feature = "unstable_encodings")]
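/// Delta-compresses `child` into bases plus per-element deltas and then
/// recursively compresses both parts. As a rough illustration, `[100, 101,
/// 103, 106]` has deltas `[1, 2, 3]` over a base of `100` (the FastLanes
/// layout is chunked and keeps multiple bases, but the idea is the same).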
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
) -> VortexResult<ArrayRef> {
    let (bases, deltas) =
        vortex_fastlanes::delta_compress(&child.to_primitive(), &mut compressor.execution_ctx())?;

    let compressed_bases =
        compressor.compress_child(&bases.into_array(), parent_ctx, parent_id, child_index)?;
    let compressed_deltas =
        compressor.compress_child(&deltas.into_array(), parent_ctx, parent_id, child_index)?;

    Delta::try_new(compressed_bases, compressed_deltas, 0, child.len()).map(IntoArray::into_array)
}

impl Scheme for IntRLEScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.rle"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: values=0, indices=1, offsets=2.
    fn num_children(&self) -> usize {
        3
    }

    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        rle_descendant_exclusions()
    }

    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        rle_ancestor_exclusions()
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // RLE is only useful when we cascade it with another encoding.
        if ctx.finished_cascading() {
            return CompressionEstimate::Skip;
        }

        if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        rle_compress(self, compressor, data, ctx)
    }
}

#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    #[test]
    fn test_empty() -> VortexResult<()> {
        // Make sure empty array compression does not fail.
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = btr.compress(&array.into_array())?;

        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write short runs (zero to four repeats) of a handful of different values,
        // interrupted by some one-off values.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&codes.freeze().into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        let decoded = compressed;
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());

        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed = compressor.compress(&array.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim)?;

        Ok(())
    }
}

/// Tests to verify that each integer compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        eprintln!("{}", compressed.display_tree());
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}