
vortex_btrblocks/schemes/integer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Integer compression schemes.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Patched;
use vortex_array::arrays::patched::use_experimental_patches;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::scalar::Scalar;
use vortex_compressor::builtins::FloatDictScheme;
use vortex_compressor::builtins::StringDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
#[cfg(feature = "unstable_encodings")]
use vortex_compressor::scheme::SchemeId;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_fastlanes::BitPacked;
#[cfg(feature = "unstable_encodings")]
use vortex_fastlanes::Delta;
use vortex_fastlanes::FoR;
use vortex_fastlanes::FoRArrayExt;
use vortex_fastlanes::RLE;
use vortex_fastlanes::RLEArrayExt;
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
use vortex_fastlanes::bitpack_compress::bitpack_encode;
use vortex_fastlanes::bitpack_compress::find_best_bit_width;
use vortex_runend::RunEnd;
use vortex_runend::compress::runend_encode;
use vortex_sequence::sequence_encode;
use vortex_sparse::Sparse;
use vortex_zigzag::ZigZag;
use vortex_zigzag::ZigZagArrayExt;
use vortex_zigzag::zigzag_encode;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::GenerateStatsOptions;
use crate::Scheme;
use crate::SchemeExt;
use crate::compress_patches;
use crate::schemes::rle_ancestor_exclusions;
use crate::schemes::rle_descendant_exclusions;

/// Frame of Reference encoding.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FoRScheme;

/// ZigZag encoding for negative integers.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// BitPacking encoding for non-negative integers.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse encoding for single-value-dominated arrays.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Run-end encoding with end positions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence encoding for sequential patterns.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco (pcodec) compression for integers.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;

// Re-export builtin schemes from vortex-compressor.
pub use vortex_compressor::builtins::IntConstantScheme;
pub use vortex_compressor::builtins::IntDictScheme;
pub use vortex_compressor::builtins::is_integer_primitive;
pub use vortex_compressor::stats::IntegerStats;

/// RLE scheme for integer arrays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IntRLEScheme;

/// Threshold for the average run length in an array before we consider run-length encoding.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;
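// Illustration (not from the original source, assuming average run length = array length / number
// of runs): an array like [3, 3, 3, 3, 9, 9, 9, 9] has 8 elements in 2 runs, so its average run
// length is 4 and it just meets both thresholds.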

impl Scheme for FoRScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.for"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Dict codes always start at 0, so FoR (which subtracts the min) is a no-op.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
        // the same size.
        if ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        let stats = data.integer_stats();

        // Only apply when the min is not already zero.
        if stats.erased().min_is_zero() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // Difference between max and min.
        let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, then we should be compressing this as a constant array.
            None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
        };

        // If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width
        // compared to BitPacking, don't use FoR since it has a small amount of overhead (storing
        // the reference) for effectively no benefit.
        if let Some(max_log) = stats
            .erased()
            .max_ilog2()
            // Only skip FoR when min >= 0, otherwise BitPacking can't be applied without ZigZag.
            .filter(|_| !stats.erased().min_is_negative())
        {
            let bitpack_bitwidth = max_log + 1;
            if for_bitwidth >= bitpack_bitwidth {
                return CompressionEstimate::Verdict(EstimateVerdict::Skip);
            }
        }
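        // Illustration (hypothetical values, not from the original source): for an array whose
        // values all lie in [1_000_000, 1_000_063], max - min = 63 gives for_bitwidth = 6, while
        // plain BitPacking would need ilog2(1_000_063) + 1 = 20 bits, so FoR (followed by
        // BitPacking) is worth trying.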

        let full_width: u32 = data
            .array_as_primitive()
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");

        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
            full_width as f64 / for_bitwidth as f64,
        ))
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array().to_primitive();
        let for_array = FoR::encode(primitive)?;
        let biased = for_array.encoded().to_primitive();

        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let leaf_ctx = ctx.clone().as_leaf();
        let mut biased_data = ArrayAndStats::new(biased.into_array(), ctx.merged_stats_options());
        let compressed = BitPackingScheme.compress(compressor, &mut biased_data, leaf_ctx)?;

        // TODO(connor): This should really be `new_unchecked`.
        let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());

        Ok(for_compressed.into_array())
    }
}

impl Scheme for ZigZagScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.zigzag"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: encoded=0.
    fn num_children(&self) -> usize {
        1
    }

    /// ZigZag is a bijective value transform that preserves cardinality, run patterns, and value
    /// dominance. If Dict, RunEnd, or Sparse lost on the original array, they will lose on ZigZag's
    /// output too, so we skip evaluating them.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::All,
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::All,
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::All,
            },
        ]
    }

    /// Dict codes are unsigned integers (0..cardinality). ZigZag only helps negatives.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // ZigZag only transforms negative values to positive. Without further compression,
        // the output is the same size.
        if ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        let stats = data.integer_stats();

        // ZigZag is only useful when there are negative values.
        if !stats.erased().min_is_negative() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // Zigzag encode the values, then recursively compress the inner values.
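        // ZigZag maps signed integers to unsigned ones as 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...,
        // so small-magnitude negatives become small non-negative values that downstream schemes
        // (e.g. BitPacking) can compress well. (Explanatory note, not in the original source.)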
        let zag = zigzag_encode(data.array_as_primitive())?;
        let encoded = zag.encoded().to_primitive();

        let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?;

        tracing::debug!("zigzag output: {}", compressed.encoding_id());

        Ok(ZigZag::try_new(compressed)?.into_array())
    }
}

impl Scheme for BitPackingScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.bitpacking"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        let stats = data.integer_stats();

        // BitPacking only works for non-negative values.
        if stats.erased().min_is_negative() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive_array = data.array_as_primitive();

        let histogram = bit_width_histogram(primitive_array)?;
        let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;
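        // The chosen width trades the size of the packed values against the overhead of patching
        // values that are too wide for it, e.g. an array of mostly 4-bit values with a few large
        // outliers can pack at 4 bits and store the outliers as patches.
        // (Explanatory note, not in the original source.)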

        // If the best bit width equals the type's full bit width, bit-packing saves nothing, so
        // return the original array.
        if bw as usize == primitive_array.ptype().bit_width() {
            return Ok(primitive_array.array().clone());
        }

        // Otherwise we can bitpack the array.
        let primitive_array = primitive_array.into_owned();
        let packed = bitpack_encode(&primitive_array, bw, Some(&histogram))?;

        let packed_stats = packed.statistics().to_owned();
        let ptype = packed.dtype().as_ptype();
        let mut parts = BitPacked::into_parts(packed);

        let array = if use_experimental_patches() {
            let patches = parts.patches.take();
            // Transpose patches into G-ALP style PatchedArray, wrapping an inner BitPackedArray.
            let array = BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                None,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .into_array();

            match patches {
                None => array,
                Some(p) => Patched::from_array_and_patches(
                    array,
                    &p,
                    &mut LEGACY_SESSION.create_execution_ctx(),
                )?
                .with_stats_set(packed_stats)
                .into_array(),
            }
        } else {
            // Compress patches and place back into BitPackedArray.
            let patches = parts.patches.take().map(compress_patches).transpose()?;
            parts.patches = patches;
            BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                parts.patches,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .with_stats_set(packed_stats)
            .into_array()
        };

        Ok(array)
    }
}

impl Scheme for SparseScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sparse"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn stats_options(&self) -> GenerateStatsOptions {
        GenerateStatsOptions {
            count_distinct_values: true,
        }
    }

    /// Children: values=0, indices=1.
    fn num_children(&self) -> usize {
        2
    }

    /// Sparse indices (child 1) are monotonically increasing positions with all unique values.
    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        let len = data.array_len() as f64;
        let stats = data.integer_stats();
        let value_count = stats.value_count();

        // All-null arrays should be compressed as constant instead anyway.
        if value_count == 0 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // If the majority (more than 90%) of values are null, this will compress well.
        if stats.null_count() as f64 / len > 0.9 {
            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
        }

        let (_, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // If the most frequent value is the only value, we should compress as constant instead.
        if most_frequent_count == value_count {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        debug_assert!(value_count > most_frequent_count);

        // See if the most frequent value accounts for >= 90% of the set values.
        let freq = most_frequent_count as f64 / value_count as f64;
        if freq < 0.9 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // We only store the positions of the non-top values.
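        // Illustration (hypothetical numbers, not from the original source): with 10_000 set
        // values of which 9_500 equal the most frequent value, the estimated ratio is
        // 10_000 / (10_000 - 9_500) = 20x.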
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
            value_count as f64 / (value_count - most_frequent_count) as f64,
        ))
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let len = data.array_len();
        // TODO(connor): Fight the borrow checker (needs interior mutability)!
        let stats = data.integer_stats().clone();
        let array = data.array();

        let (most_frequent_value, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        if most_frequent_count as usize == len {
            // If the most frequent value is the only value, we should compress as constant instead.
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    most_frequent_value,
                    most_frequent_value.ptype(),
                    array.dtype().nullability(),
                ),
                len,
            )
            .into_array());
        }

        let sparse_encoded = Sparse::encode(
            array,
            Some(Scalar::primitive_value(
                most_frequent_value,
                most_frequent_value.ptype(),
                array.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
            let compressed_values = compressor.compress_child(
                &sparse.patches().values().to_primitive().into_array(),
                &ctx,
                self.id(),
                0,
            )?;

            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices =
                compressor.compress_child(&indices.into_array(), &ctx, self.id(), 1)?;

            Sparse::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            Ok(sparse_encoded)
        }
    }
}

impl Scheme for RunEndScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.runend"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: values=0, ends=1.
    fn num_children(&self) -> usize {
        2
    }

    /// RunEnd ends (child 1) are monotonically increasing positions with all unique values.
    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    /// Dict values (child 0) are all unique by definition, so run-end encoding them is
    /// pointless. Codes (child 1) can have runs and may benefit from RunEnd.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(0),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        // If the average run length is below the threshold, skip this scheme.
        if data.integer_stats().average_run_length() < RUN_END_THRESHOLD {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // Run-end encode the array into run values and run ends.
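        // Illustration (not from the original source): [5, 5, 5, 7, 7] run-end encodes to
        // values = [5, 7] and ends = [3, 5], where each end marks the position just past its run.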
        let (ends, values) = runend_encode(data.array_as_primitive());

        let compressed_values =
            compressor.compress_child(&values.to_primitive().into_array(), &ctx, self.id(), 0)?;

        let compressed_ends = compressor.compress_child(&ends.into_array(), &ctx, self.id(), 1)?;

        // SAFETY: compression doesn't affect invariants.
        Ok(unsafe {
            RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
                .into_array()
        })
    }
}

impl Scheme for SequenceScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sequence"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Sequence encoding on dictionary codes just adds a layer of indirection without compressing
    /// the data. Dict codes are compact integers that benefit from BitPacking or FoR, not from
    /// sequence detection.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // It is pointless to check whether a sample forms a sequence, since the sample does not
        // correspond to the entire array.
        if ctx.is_sample() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        let stats = data.integer_stats();

        // `SequenceArray` does not support nulls.
        if stats.null_count() > 0 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // If the distinct value count was computed and not all values are unique, then this
        // cannot be encoded as a sequence array.
        if stats
            .distinct_count()
            .is_some_and(|count| count as usize != data.array_len())
        {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // TODO(connor): Why do we sequence encode the whole thing and then throw it away? And then
        // why do we divide the ratio by 2???

        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, data, _ctx| {
                let Some(encoded) = sequence_encode(data.array_as_primitive())? else {
                    // If we are unable to sequence encode this array, make sure we skip.
                    return Ok(EstimateVerdict::Skip);
                };

                // TODO(connor): This doesn't really make sense?
                // Since two values are required to store the base and multiplier, the compression
                // ratio is divided by 2.
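                // Illustration (not from the original source): an array like [0, 7, 14, 21, ...]
                // collapses to just a base (0) and a multiplier (7), hence the fixed cost of two
                // values regardless of array length.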
                Ok(EstimateVerdict::Ratio(encoded.len() as f64 / 2.0))
            },
        )))
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let stats = data.integer_stats();

        if stats.null_count() > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(data.array_as_primitive())?
            .ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}

#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        // Pco does not support I8 or U8.
        if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        Ok(vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?
        .into_array())
    }
}

/// Shared compression logic for RLE schemes.
pub(crate) fn rle_compress(
    scheme: &dyn Scheme,
    compressor: &CascadingCompressor,
    data: &mut ArrayAndStats,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    let rle_array = RLE::encode(data.array_as_primitive())?;
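    // RLE::encode produces three children (values, indices, and value-index offsets, matching the
    // child layout documented on `IntRLEScheme`); each is compressed recursively below.
    // (Explanatory note, not in the original source.)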

    let compressed_values = compressor.compress_child(
        &rle_array.values().to_primitive().into_array(),
        &ctx,
        scheme.id(),
        0,
    )?;

    // Delta is an unstable encoding; once we deem it stable, we can always switch over to it.
    #[cfg(feature = "unstable_encodings")]
    let compressed_indices = try_compress_delta(
        compressor,
        &rle_array.indices().to_primitive().narrow()?.into_array(),
        &ctx,
        scheme.id(),
        1,
    )?;

    #[cfg(not(feature = "unstable_encodings"))]
    let compressed_indices = compressor.compress_child(
        &rle_array.indices().to_primitive().narrow()?.into_array(),
        &ctx,
        scheme.id(),
        1,
    )?;

    let compressed_offsets = compressor.compress_child(
        &rle_array
            .values_idx_offsets()
            .to_primitive()
            .narrow()?
            .into_array(),
        &ctx,
        scheme.id(),
        2,
    )?;

    // SAFETY: Recursive compression doesn't affect the invariants.
    unsafe {
        Ok(RLE::new_unchecked(
            compressed_values,
            compressed_indices,
            compressed_offsets,
            rle_array.offset(),
            rle_array.len(),
        )
        .into_array())
    }
}

#[cfg(feature = "unstable_encodings")]
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
) -> VortexResult<ArrayRef> {
    let (bases, deltas) =
        vortex_fastlanes::delta_compress(&child.to_primitive(), &mut compressor.execution_ctx())?;

    let compressed_bases =
        compressor.compress_child(&bases.into_array(), parent_ctx, parent_id, child_index)?;
    let compressed_deltas =
        compressor.compress_child(&deltas.into_array(), parent_ctx, parent_id, child_index)?;

    Delta::try_new(compressed_bases, compressed_deltas, 0, child.len()).map(IntoArray::into_array)
}

impl Scheme for IntRLEScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.rle"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: values=0, indices=1, offsets=2.
    fn num_children(&self) -> usize {
        3
    }

    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        rle_descendant_exclusions()
    }

    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        rle_ancestor_exclusions()
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> CompressionEstimate {
        // RLE is only useful when we cascade it with another encoding.
        if ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        rle_compress(self, compressor, data, ctx)
    }
}

#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    #[test]
    fn test_empty() -> VortexResult<()> {
        // Make sure empty array compression does not fail.
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = btr.compress(&array.into_array())?;

        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write short runs (length 0-4) of a handful of distinct values, interrupted by some
        // one-off values.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&codes.freeze().into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        let decoded = compressed;
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());

        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed = compressor.compress(&array.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim)?;

        Ok(())
    }
}

/// Tests to verify that each integer compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        eprintln!("{}", compressed.display_tree());
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}