Skip to main content

vortex_btrblocks/schemes/
integer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Integer compression schemes.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Patched;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::patched::use_experimental_patches;
14use vortex_array::arrays::primitive::PrimitiveArrayExt;
15use vortex_array::scalar::Scalar;
16use vortex_compressor::builtins::FloatDictScheme;
17use vortex_compressor::builtins::StringDictScheme;
18use vortex_compressor::estimate::CompressionEstimate;
19use vortex_compressor::estimate::DeferredEstimate;
20use vortex_compressor::estimate::EstimateScore;
21use vortex_compressor::estimate::EstimateVerdict;
22use vortex_compressor::scheme::AncestorExclusion;
23use vortex_compressor::scheme::ChildSelection;
24use vortex_compressor::scheme::DescendantExclusion;
25#[cfg(feature = "unstable_encodings")]
26use vortex_compressor::scheme::SchemeId;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_err;
31use vortex_fastlanes::BitPacked;
32#[cfg(feature = "unstable_encodings")]
33use vortex_fastlanes::Delta;
34use vortex_fastlanes::FoR;
35use vortex_fastlanes::FoRArrayExt;
36use vortex_fastlanes::RLE;
37use vortex_fastlanes::RLEArrayExt;
38use vortex_fastlanes::bitpack_compress::bit_width_histogram;
39use vortex_fastlanes::bitpack_compress::bitpack_encode;
40use vortex_fastlanes::bitpack_compress::find_best_bit_width;
41use vortex_runend::RunEnd;
42use vortex_runend::compress::runend_encode;
43use vortex_sequence::sequence_encode;
44use vortex_sparse::Sparse;
45use vortex_sparse::SparseExt as _;
46use vortex_zigzag::ZigZag;
47use vortex_zigzag::ZigZagArrayExt;
48use vortex_zigzag::zigzag_encode;
49
50use crate::ArrayAndStats;
51use crate::CascadingCompressor;
52use crate::CompressorContext;
53use crate::GenerateStatsOptions;
54use crate::Scheme;
55use crate::SchemeExt;
56use crate::compress_patches;
57use crate::schemes::rle_ancestor_exclusions;
58use crate::schemes::rle_descendant_exclusions;
59
/// Frame-of-Reference scheme: rebases integer values on their minimum.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FoRScheme;

/// ZigZag scheme: remaps signed integers so that negative values become non-negative.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZigZagScheme;

/// Bit-packing scheme for integers that are all non-negative.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BitPackingScheme;

/// Sparse scheme for arrays dominated by one value (or by nulls).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SparseScheme;

/// Run-end scheme: stores the run values together with the positions where runs end.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RunEndScheme;

/// Sequence scheme for arrays that a `SequenceArray` (base + multiplier) can represent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SequenceScheme;

/// Pco (pcodec) scheme for integers; only available with the `pco` feature.
#[cfg(feature = "pco")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PcoScheme;
88
89// Re-export builtin schemes from vortex-compressor.
90pub use vortex_compressor::builtins::IntConstantScheme;
91pub use vortex_compressor::builtins::IntDictScheme;
92pub use vortex_compressor::builtins::is_integer_primitive;
93pub use vortex_compressor::stats::IntegerStats;
94
/// Run-length (RLE) scheme for integer arrays.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct IntRLEScheme;

/// Minimum average run length an array needs before run-length encoding is considered.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

/// Minimum average run length an array needs before run-end encoding is considered.
const RUN_END_THRESHOLD: u32 = 4;
104
105impl Scheme for FoRScheme {
106    fn scheme_name(&self) -> &'static str {
107        "vortex.int.for"
108    }
109
110    fn matches(&self, canonical: &Canonical) -> bool {
111        is_integer_primitive(canonical)
112    }
113
114    /// Dict codes always start at 0, so FoR (which subtracts the min) is a no-op.
115    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
116        vec![
117            AncestorExclusion {
118                ancestor: IntDictScheme.id(),
119                children: ChildSelection::One(1),
120            },
121            AncestorExclusion {
122                ancestor: FloatDictScheme.id(),
123                children: ChildSelection::One(1),
124            },
125            AncestorExclusion {
126                ancestor: StringDictScheme.id(),
127                children: ChildSelection::One(1),
128            },
129        ]
130    }
131
132    fn expected_compression_ratio(
133        &self,
134        data: &ArrayAndStats,
135        compress_ctx: CompressorContext,
136        exec_ctx: &mut ExecutionCtx,
137    ) -> CompressionEstimate {
138        // FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
139        // the same size.
140        if compress_ctx.finished_cascading() {
141            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
142        }
143        let stats = data.integer_stats(exec_ctx);
144
145        // Only apply when the min is not already zero.
146        if stats.erased().min_is_zero() {
147            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
148        }
149
150        // Difference between max and min.
151        let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
152            Some(l) => l + 1,
153            // If max-min == 0, the we should be compressing this as a constant array.
154            None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
155        };
156
157        // If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width
158        // compared to BitPacking, don't use FoR since it has a small amount of overhead (storing
159        // the reference) for effectively no benefits.
160        if let Some(max_log) = stats
161            .erased()
162            .max_ilog2()
163            // Only skip FoR when min >= 0, otherwise BitPacking can't be applied without ZigZag.
164            .filter(|_| !stats.erased().min_is_negative())
165        {
166            let bitpack_bitwidth = max_log + 1;
167            if for_bitwidth >= bitpack_bitwidth {
168                return CompressionEstimate::Verdict(EstimateVerdict::Skip);
169            }
170        }
171
172        let full_width: u32 = data
173            .array_as_primitive()
174            .ptype()
175            .bit_width()
176            .try_into()
177            .vortex_expect("bit width must fit in u32");
178
179        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
180            full_width as f64 / for_bitwidth as f64,
181        ))
182    }
183
184    fn compress(
185        &self,
186        compressor: &CascadingCompressor,
187        data: &ArrayAndStats,
188        compress_ctx: CompressorContext,
189        exec_ctx: &mut ExecutionCtx,
190    ) -> VortexResult<ArrayRef> {
191        let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
192        let for_array = FoR::encode(primitive)?;
193        let biased = for_array
194            .encoded()
195            .clone()
196            .execute::<PrimitiveArray>(exec_ctx)?;
197
198        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
199        // of bitpacking.
200        // NOTE: we could delegate in the future if we had another downstream codec that performs
201        //  as well.
202        let leaf_ctx = compress_ctx.clone().as_leaf();
203        let biased_data =
204            ArrayAndStats::new(biased.into_array(), compress_ctx.merged_stats_options());
205        let compressed = BitPackingScheme.compress(compressor, &biased_data, leaf_ctx, exec_ctx)?;
206
207        // TODO(connor): This should really be `new_unchecked`.
208        let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
209        for_compressed
210            .as_ref()
211            .statistics()
212            .inherit_from(for_array.as_ref().statistics());
213
214        Ok(for_compressed.into_array())
215    }
216}
217
218impl Scheme for ZigZagScheme {
219    fn scheme_name(&self) -> &'static str {
220        "vortex.int.zigzag"
221    }
222
223    fn matches(&self, canonical: &Canonical) -> bool {
224        is_integer_primitive(canonical)
225    }
226
227    /// Children: encoded=0.
228    fn num_children(&self) -> usize {
229        1
230    }
231
232    /// ZigZag is a bijective value transform that preserves cardinality, run patterns, and value
233    /// dominance. If Dict, RunEnd, or Sparse lost on the original array, they will lose on ZigZag's
234    /// output too, so we skip evaluating them.
235    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
236        vec![
237            DescendantExclusion {
238                excluded: IntDictScheme.id(),
239                children: ChildSelection::All,
240            },
241            DescendantExclusion {
242                excluded: RunEndScheme.id(),
243                children: ChildSelection::All,
244            },
245            DescendantExclusion {
246                excluded: SparseScheme.id(),
247                children: ChildSelection::All,
248            },
249        ]
250    }
251
252    /// Dict codes are unsigned integers (0..cardinality). ZigZag only helps negatives.
253    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
254        vec![
255            AncestorExclusion {
256                ancestor: IntDictScheme.id(),
257                children: ChildSelection::One(1),
258            },
259            AncestorExclusion {
260                ancestor: FloatDictScheme.id(),
261                children: ChildSelection::One(1),
262            },
263            AncestorExclusion {
264                ancestor: StringDictScheme.id(),
265                children: ChildSelection::One(1),
266            },
267        ]
268    }
269
270    fn expected_compression_ratio(
271        &self,
272        data: &ArrayAndStats,
273        compress_ctx: CompressorContext,
274        exec_ctx: &mut ExecutionCtx,
275    ) -> CompressionEstimate {
276        // ZigZag only transforms negative values to positive. Without further compression,
277        // the output is the same size.
278        if compress_ctx.finished_cascading() {
279            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
280        }
281        let stats = data.integer_stats(exec_ctx);
282
283        // ZigZag is only useful when there are negative values.
284        if !stats.erased().min_is_negative() {
285            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
286        }
287
288        CompressionEstimate::Deferred(DeferredEstimate::Sample)
289    }
290
291    fn compress(
292        &self,
293        compressor: &CascadingCompressor,
294        data: &ArrayAndStats,
295        compress_ctx: CompressorContext,
296        exec_ctx: &mut ExecutionCtx,
297    ) -> VortexResult<ArrayRef> {
298        // Zigzag encode the values, then recursively compress the inner values.
299        let zag = zigzag_encode(data.array_as_primitive())?;
300        let encoded = zag.encoded().clone().execute::<PrimitiveArray>(exec_ctx)?;
301
302        let compressed = compressor.compress_child(
303            &encoded.into_array(),
304            &compress_ctx,
305            self.id(),
306            0,
307            exec_ctx,
308        )?;
309
310        Ok(ZigZag::try_new(compressed)?.into_array())
311    }
312}
313
314impl Scheme for BitPackingScheme {
315    fn scheme_name(&self) -> &'static str {
316        "vortex.int.bitpacking"
317    }
318
319    fn matches(&self, canonical: &Canonical) -> bool {
320        is_integer_primitive(canonical)
321    }
322
323    fn expected_compression_ratio(
324        &self,
325        data: &ArrayAndStats,
326        _compress_ctx: CompressorContext,
327        exec_ctx: &mut ExecutionCtx,
328    ) -> CompressionEstimate {
329        let stats = data.integer_stats(exec_ctx);
330
331        // BitPacking only works for non-negative values.
332        if stats.erased().min_is_negative() {
333            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
334        }
335
336        CompressionEstimate::Deferred(DeferredEstimate::Sample)
337    }
338
339    fn compress(
340        &self,
341        _compressor: &CascadingCompressor,
342        data: &ArrayAndStats,
343        _compress_ctx: CompressorContext,
344        exec_ctx: &mut ExecutionCtx,
345    ) -> VortexResult<ArrayRef> {
346        let primitive_array = data.array_as_primitive();
347
348        let histogram = bit_width_histogram(primitive_array, exec_ctx)?;
349        let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;
350
351        // If best bw is determined to be the current bit-width, return the original array.
352        if bw as usize == primitive_array.ptype().bit_width() {
353            return Ok(primitive_array.array().clone());
354        }
355
356        // Otherwise we can bitpack the array.
357        let primitive_array = primitive_array.into_owned();
358        let packed = bitpack_encode(&primitive_array, bw, Some(&histogram), exec_ctx)?;
359
360        let packed_stats = packed.statistics().to_owned();
361        let ptype = packed.dtype().as_ptype();
362        let mut parts = BitPacked::into_parts(packed);
363
364        let array = if use_experimental_patches() {
365            let patches = parts.patches.take();
366            // Transpose patches into G-ALP style PatchedArray, wrapping an inner BitPackedArray.
367            let array = BitPacked::try_new(
368                parts.packed,
369                ptype,
370                parts.validity,
371                None,
372                parts.bit_width,
373                parts.len,
374                parts.offset,
375            )?
376            .into_array();
377
378            match patches {
379                None => array,
380                Some(p) => Patched::from_array_and_patches(array, &p, exec_ctx)?
381                    .with_stats_set(packed_stats)
382                    .into_array(),
383            }
384        } else {
385            // Compress patches and place back into BitPackedArray.
386            let patches = parts
387                .patches
388                .take()
389                .map(|p| compress_patches(p, exec_ctx))
390                .transpose()?;
391            parts.patches = patches;
392            BitPacked::try_new(
393                parts.packed,
394                ptype,
395                parts.validity,
396                parts.patches,
397                parts.bit_width,
398                parts.len,
399                parts.offset,
400            )?
401            .with_stats_set(packed_stats)
402            .into_array()
403        };
404
405        Ok(array)
406    }
407}
408
409impl Scheme for SparseScheme {
410    fn scheme_name(&self) -> &'static str {
411        "vortex.int.sparse"
412    }
413
414    fn matches(&self, canonical: &Canonical) -> bool {
415        is_integer_primitive(canonical)
416    }
417
418    fn stats_options(&self) -> GenerateStatsOptions {
419        GenerateStatsOptions {
420            count_distinct_values: true,
421        }
422    }
423
424    /// Children: values=0, indices=1.
425    fn num_children(&self) -> usize {
426        2
427    }
428
429    /// Sparse indices (child 1) are monotonically increasing positions with all unique values.
430    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
431    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
432        vec![
433            DescendantExclusion {
434                excluded: IntDictScheme.id(),
435                children: ChildSelection::One(1),
436            },
437            DescendantExclusion {
438                excluded: RunEndScheme.id(),
439                children: ChildSelection::One(1),
440            },
441            DescendantExclusion {
442                excluded: IntRLEScheme.id(),
443                children: ChildSelection::One(1),
444            },
445            DescendantExclusion {
446                excluded: SparseScheme.id(),
447                children: ChildSelection::One(1),
448            },
449        ]
450    }
451
452    fn expected_compression_ratio(
453        &self,
454        data: &ArrayAndStats,
455        _compress_ctx: CompressorContext,
456        exec_ctx: &mut ExecutionCtx,
457    ) -> CompressionEstimate {
458        let len = data.array_len() as f64;
459        let stats = data.integer_stats(exec_ctx);
460        let value_count = stats.value_count();
461
462        // All-null arrays should be compressed as constant instead anyways.
463        if value_count == 0 {
464            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
465        }
466
467        // If the majority (90%) of values is null, this will compress well.
468        if stats.null_count() as f64 / len > 0.9 {
469            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
470        }
471
472        let (_, most_frequent_count) = stats
473            .erased()
474            .most_frequent_value_and_count()
475            .vortex_expect(
476                "this must be present since `SparseScheme` declared that we need distinct values",
477            );
478
479        // If the most frequent value is the only value, we should compress as constant instead.
480        if most_frequent_count == value_count {
481            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
482        }
483        debug_assert!(value_count > most_frequent_count);
484
485        // See if the most frequent value accounts for >= 90% of the set values.
486        let freq = most_frequent_count as f64 / value_count as f64;
487        if freq < 0.9 {
488            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
489        }
490
491        // We only store the positions of the non-top values.
492        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
493            value_count as f64 / (value_count - most_frequent_count) as f64,
494        ))
495    }
496
497    fn compress(
498        &self,
499        compressor: &CascadingCompressor,
500        data: &ArrayAndStats,
501        compress_ctx: CompressorContext,
502        exec_ctx: &mut ExecutionCtx,
503    ) -> VortexResult<ArrayRef> {
504        let len = data.array_len();
505        let stats = data.integer_stats(exec_ctx);
506        let array = data.array();
507
508        let (most_frequent_value, most_frequent_count) = stats
509            .erased()
510            .most_frequent_value_and_count()
511            .vortex_expect(
512                "this must be present since `SparseScheme` declared that we need distinct values",
513            );
514
515        if most_frequent_count as usize == len {
516            // If the most frequent value is the only value, we should compress as constant instead.
517            return Ok(ConstantArray::new(
518                Scalar::primitive_value(
519                    most_frequent_value,
520                    most_frequent_value.ptype(),
521                    array.dtype().nullability(),
522                ),
523                len,
524            )
525            .into_array());
526        }
527
528        let sparse_encoded = Sparse::encode(
529            array,
530            Some(Scalar::primitive_value(
531                most_frequent_value,
532                most_frequent_value.ptype(),
533                array.dtype().nullability(),
534            )),
535            exec_ctx,
536        )?;
537
538        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
539            let sparse_values_primitive = sparse
540                .patches()
541                .values()
542                .clone()
543                .execute::<PrimitiveArray>(exec_ctx)?;
544            let compressed_values = compressor.compress_child(
545                &sparse_values_primitive.into_array(),
546                &compress_ctx,
547                self.id(),
548                0,
549                exec_ctx,
550            )?;
551
552            let indices = sparse
553                .patches()
554                .indices()
555                .clone()
556                .execute::<PrimitiveArray>(exec_ctx)?
557                .narrow()?;
558
559            let compressed_indices = compressor.compress_child(
560                &indices.into_array(),
561                &compress_ctx,
562                self.id(),
563                1,
564                exec_ctx,
565            )?;
566
567            Sparse::try_new(
568                compressed_indices,
569                compressed_values,
570                sparse.len(),
571                sparse.fill_scalar().clone(),
572            )
573            .map(|a| a.into_array())
574        } else {
575            Ok(sparse_encoded)
576        }
577    }
578}
579
580impl Scheme for RunEndScheme {
581    fn scheme_name(&self) -> &'static str {
582        "vortex.int.runend"
583    }
584
585    fn matches(&self, canonical: &Canonical) -> bool {
586        is_integer_primitive(canonical)
587    }
588
589    /// Children: values=0, ends=1.
590    fn num_children(&self) -> usize {
591        2
592    }
593
594    /// RunEnd ends (child 1) are monotonically increasing positions with all unique values.
595    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
596    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
597        vec![
598            DescendantExclusion {
599                excluded: IntDictScheme.id(),
600                children: ChildSelection::One(1),
601            },
602            DescendantExclusion {
603                excluded: RunEndScheme.id(),
604                children: ChildSelection::One(1),
605            },
606            DescendantExclusion {
607                excluded: IntRLEScheme.id(),
608                children: ChildSelection::One(1),
609            },
610            DescendantExclusion {
611                excluded: SparseScheme.id(),
612                children: ChildSelection::One(1),
613            },
614        ]
615    }
616
617    /// Dict values (child 0) are all unique by definition, so run-end encoding them is
618    /// pointless. Codes (child 1) can have runs and may benefit from RunEnd.
619    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
620        vec![
621            AncestorExclusion {
622                ancestor: IntDictScheme.id(),
623                children: ChildSelection::One(0),
624            },
625            AncestorExclusion {
626                ancestor: FloatDictScheme.id(),
627                children: ChildSelection::One(0),
628            },
629            AncestorExclusion {
630                ancestor: StringDictScheme.id(),
631                children: ChildSelection::One(0),
632            },
633        ]
634    }
635
636    fn expected_compression_ratio(
637        &self,
638        data: &ArrayAndStats,
639        _compress_ctx: CompressorContext,
640        exec_ctx: &mut ExecutionCtx,
641    ) -> CompressionEstimate {
642        // If the run length is below the threshold, drop it.
643        if data.integer_stats(exec_ctx).average_run_length() < RUN_END_THRESHOLD {
644            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
645        }
646
647        CompressionEstimate::Deferred(DeferredEstimate::Sample)
648    }
649
650    fn compress(
651        &self,
652        compressor: &CascadingCompressor,
653        data: &ArrayAndStats,
654        compress_ctx: CompressorContext,
655        exec_ctx: &mut ExecutionCtx,
656    ) -> VortexResult<ArrayRef> {
657        // Run-end encode the ends.
658        let (ends, values) = runend_encode(data.array_as_primitive(), exec_ctx);
659
660        let values_primitive = values.execute::<PrimitiveArray>(exec_ctx)?;
661        let compressed_values = compressor.compress_child(
662            &values_primitive.into_array(),
663            &compress_ctx,
664            self.id(),
665            0,
666            exec_ctx,
667        )?;
668
669        let compressed_ends =
670            compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;
671
672        // SAFETY: compression doesn't affect invariants.
673        Ok(unsafe {
674            RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
675                .into_array()
676        })
677    }
678}
679
680impl Scheme for SequenceScheme {
681    fn scheme_name(&self) -> &'static str {
682        "vortex.int.sequence"
683    }
684
685    fn matches(&self, canonical: &Canonical) -> bool {
686        is_integer_primitive(canonical)
687    }
688
689    /// Sequence encoding on dictionary codes just adds a layer of indirection without compressing
690    /// the data. Dict codes are compact integers that benefit from BitPacking or FoR, not from
691    /// sequence detection.
692    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
693        vec![
694            AncestorExclusion {
695                ancestor: IntDictScheme.id(),
696                children: ChildSelection::One(1),
697            },
698            AncestorExclusion {
699                ancestor: FloatDictScheme.id(),
700                children: ChildSelection::One(1),
701            },
702            AncestorExclusion {
703                ancestor: StringDictScheme.id(),
704                children: ChildSelection::One(1),
705            },
706        ]
707    }
708
709    fn expected_compression_ratio(
710        &self,
711        data: &ArrayAndStats,
712        compress_ctx: CompressorContext,
713        exec_ctx: &mut ExecutionCtx,
714    ) -> CompressionEstimate {
715        // It is pointless checking if a sample is a sequence since it will not correspond to the
716        // entire array.
717        if compress_ctx.is_sample() {
718            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
719        }
720        let stats = data.integer_stats(exec_ctx);
721
722        // `SequenceArray` does not support nulls.
723        if stats.null_count() > 0 {
724            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
725        }
726
727        // If the distinct_values_count was computed, and not all values are unique, then this
728        // cannot be encoded as a sequence array.
729        if stats
730            .distinct_count()
731            .is_some_and(|count| count as usize != data.array_len())
732        {
733            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
734        }
735
736        // TODO(connor): `sequence_encode` allocates the encoded array just to confirm feasibility.
737        // A cheaper `is_sequence` probe would let us skip the allocation entirely.
738        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
739            |_compressor, data, best_so_far, _ctx, exec_ctx| {
740                // `SequenceArray` stores exactly two scalars (base and multiplier), so the best
741                // achievable compression ratio is `array_len / 2`.
742                let compressed_size = 2usize;
743                let max_ratio = data.array_len() as f64 / compressed_size as f64;
744
745                // If we cannot beat the best so far, then we do not want to even try sequence
746                // encoding the data.
747                let threshold = best_so_far.and_then(EstimateScore::finite_ratio);
748                if threshold.is_some_and(|t| max_ratio <= t) {
749                    return Ok(EstimateVerdict::Skip);
750                }
751
752                // TODO(connor): We should pass this array back to the compressor in the case that
753                // we do want to sequence encode this so that we do not need to recompress.
754                if sequence_encode(data.array_as_primitive(), exec_ctx)?.is_none() {
755                    return Ok(EstimateVerdict::Skip);
756                }
757                // TODO(connor): Should we get the actual ratio here?
758                Ok(EstimateVerdict::Ratio(max_ratio))
759            },
760        )))
761    }
762
763    fn compress(
764        &self,
765        _compressor: &CascadingCompressor,
766        data: &ArrayAndStats,
767        _compress_ctx: CompressorContext,
768        exec_ctx: &mut ExecutionCtx,
769    ) -> VortexResult<ArrayRef> {
770        let stats = data.integer_stats(exec_ctx);
771
772        if stats.null_count() > 0 {
773            vortex_bail!("sequence encoding does not support nulls");
774        }
775        sequence_encode(data.array_as_primitive(), exec_ctx)?
776            .ok_or_else(|| vortex_err!("cannot sequence encode array"))
777    }
778}
779
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        // Pco cannot handle 8-bit integers; everything else is sampled.
        match data.array_as_primitive().ptype() {
            PType::I8 | PType::U8 => CompressionEstimate::Verdict(EstimateVerdict::Skip),
            _ => CompressionEstimate::Deferred(DeferredEstimate::Sample),
        }
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array_as_primitive();
        // NOTE(review): 8192 is presumably the values-per-page setting of
        // `Pco::from_primitive`; confirm against vortex_pco.
        let pco = vortex_pco::Pco::from_primitive(
            primitive,
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?;
        Ok(pco.into_array())
    }
}
822
823/// Shared compression logic for RLE schemes.
824pub(crate) fn rle_compress(
825    scheme: &dyn Scheme,
826    compressor: &CascadingCompressor,
827    data: &ArrayAndStats,
828    compress_ctx: CompressorContext,
829    exec_ctx: &mut ExecutionCtx,
830) -> VortexResult<ArrayRef> {
831    let rle_array = RLE::encode(data.array_as_primitive(), exec_ctx)?;
832
833    let rle_values_primitive = rle_array
834        .values()
835        .clone()
836        .execute::<PrimitiveArray>(exec_ctx)?;
837    let compressed_values = compressor.compress_child(
838        &rle_values_primitive.into_array(),
839        &compress_ctx,
840        scheme.id(),
841        0,
842        exec_ctx,
843    )?;
844
845    // Delta is an unstable encoding, once we deem it stable we can switch over to this always.
846    #[cfg(feature = "unstable_encodings")]
847    let compressed_indices = {
848        let rle_indices_primitive = rle_array
849            .indices()
850            .clone()
851            .execute::<PrimitiveArray>(exec_ctx)?
852            .narrow()?;
853        try_compress_delta(
854            compressor,
855            &rle_indices_primitive.into_array(),
856            &compress_ctx,
857            scheme.id(),
858            1,
859            exec_ctx,
860        )?
861    };
862
863    #[cfg(not(feature = "unstable_encodings"))]
864    let compressed_indices = {
865        let rle_indices_primitive = rle_array
866            .indices()
867            .clone()
868            .execute::<PrimitiveArray>(exec_ctx)?
869            .narrow()?;
870        compressor.compress_child(
871            &rle_indices_primitive.into_array(),
872            &compress_ctx,
873            scheme.id(),
874            1,
875            exec_ctx,
876        )?
877    };
878
879    let rle_offsets_primitive = rle_array
880        .values_idx_offsets()
881        .clone()
882        .execute::<PrimitiveArray>(exec_ctx)?
883        .narrow()?;
884    let compressed_offsets = compressor.compress_child(
885        &rle_offsets_primitive.into_array(),
886        &compress_ctx,
887        scheme.id(),
888        2,
889        exec_ctx,
890    )?;
891
892    // SAFETY: Recursive compression doesn't affect the invariants.
893    unsafe {
894        Ok(RLE::new_unchecked(
895            compressed_values,
896            compressed_indices,
897            compressed_offsets,
898            rle_array.offset(),
899            rle_array.len(),
900        )
901        .into_array())
902    }
903}
904
/// Delta-encodes `child` and recursively compresses the resulting bases and deltas.
///
/// Both Delta children are compressed under the same `child_index` slot of the parent scheme
/// (`parent_id`), so the parent's exclusion rules for that slot apply to each of them.
///
/// # Errors
///
/// Propagates errors from executing `child`, from `delta_compress`, from child compression, and
/// from `Delta::try_new`.
#[cfg(feature = "unstable_encodings")]
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let (bases, deltas) = vortex_fastlanes::delta_compress(&primitive, exec_ctx)?;

    let bases = compressor.compress_child(
        &bases.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;
    let deltas = compressor.compress_child(
        &deltas.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;

    let delta = Delta::try_new(bases, deltas, 0, child.len())?;
    Ok(delta.into_array())
}
934
935impl Scheme for IntRLEScheme {
936    fn scheme_name(&self) -> &'static str {
937        "vortex.int.rle"
938    }
939
940    fn matches(&self, canonical: &Canonical) -> bool {
941        is_integer_primitive(canonical)
942    }
943
944    /// Children: values=0, indices=1, offsets=2.
945    fn num_children(&self) -> usize {
946        3
947    }
948
949    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
950        rle_descendant_exclusions()
951    }
952
953    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
954        rle_ancestor_exclusions()
955    }
956
957    fn expected_compression_ratio(
958        &self,
959        data: &ArrayAndStats,
960        compress_ctx: CompressorContext,
961        exec_ctx: &mut ExecutionCtx,
962    ) -> CompressionEstimate {
963        // RLE is only useful when we cascade it with another encoding.
964        if compress_ctx.finished_cascading() {
965            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
966        }
967        if data.integer_stats(exec_ctx).average_run_length() < RUN_LENGTH_THRESHOLD {
968            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
969        }
970
971        CompressionEstimate::Deferred(DeferredEstimate::Sample)
972    }
973
974    fn compress(
975        &self,
976        compressor: &CascadingCompressor,
977        data: &ArrayAndStats,
978        compress_ctx: CompressorContext,
979        exec_ctx: &mut ExecutionCtx,
980    ) -> VortexResult<ArrayRef> {
981        rle_compress(self, compressor, data, compress_ctx, exec_ctx)
982    }
983}
984
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    #[test]
    fn test_empty() -> VortexResult<()> {
        // Make sure empty array compression does not fail.
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write runs of random length 0..=4 drawn from a handful of distinct values. Zero-length
        // runs write nothing, and length-1 runs act as one-off values interrupting longer runs.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(
            &codes.freeze().into_array(),
            &mut SESSION.create_execution_ctx(),
        )?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        // Only the last slot is valid, so the null slots may hold any value after compression.
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sequence>());

        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed =
            compressor.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Smoke test: compressing a large (10_000 * 5_000 element) array must not fail.
    ///
    /// NOTE(review): despite the name, the values are built `as f64`, so the input is an f64
    /// array rather than an integer one — confirm whether an integer dtype was intended.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim, &mut SESSION.create_execution_ctx())?;

        Ok(())
    }
}
1133
/// Tests to verify that each integer compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;
    use std::sync::LazyLock;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect();

        // Runs of random length 0..=4 drawn from the handful of values above.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        // 1024 distinct values, each repeated 10 times. With the default scheme set this
        // run-heavy input is expected to end up as RunEnd rather than the FastLanes RLE scheme.
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}