
vortex_btrblocks/schemes/integer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Integer compression schemes.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Patched;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::patched::use_experimental_patches;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::scalar::Scalar;
use vortex_compressor::builtins::FloatDictScheme;
use vortex_compressor::builtins::StringDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateScore;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
#[cfg(feature = "unstable_encodings")]
use vortex_compressor::scheme::SchemeId;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_fastlanes::BitPacked;
#[cfg(feature = "unstable_encodings")]
use vortex_fastlanes::Delta;
use vortex_fastlanes::FoR;
use vortex_fastlanes::FoRArrayExt;
use vortex_fastlanes::RLE;
use vortex_fastlanes::RLEArrayExt;
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
use vortex_fastlanes::bitpack_compress::bitpack_encode;
use vortex_fastlanes::bitpack_compress::find_best_bit_width;
use vortex_runend::RunEnd;
use vortex_runend::compress::runend_encode;
use vortex_sequence::sequence_encode;
use vortex_sparse::Sparse;
use vortex_zigzag::ZigZag;
use vortex_zigzag::ZigZagArrayExt;
use vortex_zigzag::zigzag_encode;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::GenerateStatsOptions;
use crate::Scheme;
use crate::SchemeExt;
use crate::compress_patches;
use crate::schemes::rle_ancestor_exclusions;
use crate::schemes::rle_descendant_exclusions;

/// Frame of Reference encoding.
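/// For example, `[1_000_000, 1_000_003, 1_000_007]` is stored as the reference `1_000_000`
/// plus the biased values `[0, 3, 7]`, which then bit-pack down to a few bits each.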
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FoRScheme;

/// ZigZag encoding for negative integers.
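/// The usual ZigZag mapping interleaves signs (`0 → 0`, `-1 → 1`, `1 → 2`, `-2 → 3`, ...),
/// so small-magnitude negatives become small unsigned values that bit-pack well.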
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// BitPacking encoding for non-negative integers.
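/// For example, `u32` values that never exceed 1000 fit in 10 bits each instead of 32.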
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse encoding for single-value-dominated arrays.
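/// An array that is, say, 95% one fill value only needs to store the positions and values
/// of the remaining 5%.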
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Run-end encoding with end positions.
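/// Conceptually, `[7, 7, 7, 9, 9]` becomes run values `[7, 9]` with run ends `[3, 5]`.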
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence encoding for sequential patterns.
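/// An arithmetic progression such as `[3, 10, 17, 24]` is fully described by two scalars,
/// its base (`3`) and its step (`7`).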
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco (pcodec) compression for integers.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;

// Re-export builtin schemes from vortex-compressor.
pub use vortex_compressor::builtins::IntConstantScheme;
pub use vortex_compressor::builtins::IntDictScheme;
pub use vortex_compressor::builtins::is_integer_primitive;
pub use vortex_compressor::stats::IntegerStats;

/// RLE scheme for integer arrays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IntRLEScheme;

/// Threshold for the average run length in an array before we consider run-length encoding.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;
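// With a threshold of 4, runs must on average span at least four consecutive equal values
// (roughly, an array of 4096 elements with at most 1024 runs) before RLE or RunEnd is considered.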

impl Scheme for FoRScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.for"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Dict codes always start at 0, so FoR (which subtracts the min) is a no-op.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
        // the same size.
        if compress_ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        let stats = data.integer_stats(exec_ctx);

        // Only apply when the min is not already zero.
        if stats.erased().min_is_zero() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // Difference between max and min.
        let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, then we should be compressing this as a constant array.
            None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
        };

        // If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width
        // compared to BitPacking, don't use FoR since it has a small amount of overhead (storing
        // the reference) for effectively no benefit.
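        // For example, values in 1_000_000..=1_000_127 have for_bitwidth = 7 but a
        // bitpack_bitwidth of 20, so FoR helps; values in 1..=127 need 7 bits either way,
        // so FoR would be pure overhead.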
        if let Some(max_log) = stats
            .erased()
            .max_ilog2()
            // Only skip FoR when min >= 0, otherwise BitPacking can't be applied without ZigZag.
            .filter(|_| !stats.erased().min_is_negative())
        {
            let bitpack_bitwidth = max_log + 1;
            if for_bitwidth >= bitpack_bitwidth {
                return CompressionEstimate::Verdict(EstimateVerdict::Skip);
            }
        }

        let full_width: u32 = data
            .array_as_primitive()
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");

        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
            full_width as f64 / for_bitwidth as f64,
        ))
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
        let for_array = FoR::encode(primitive)?;
        let biased = for_array
            .encoded()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?;

        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let leaf_ctx = compress_ctx.clone().as_leaf();
        let biased_data =
            ArrayAndStats::new(biased.into_array(), compress_ctx.merged_stats_options());
        let compressed = BitPackingScheme.compress(compressor, &biased_data, leaf_ctx, exec_ctx)?;

        // TODO(connor): This should really be `new_unchecked`.
        let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());

        Ok(for_compressed.into_array())
    }
}

impl Scheme for ZigZagScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.zigzag"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: encoded=0.
    fn num_children(&self) -> usize {
        1
    }

    /// ZigZag is a bijective value transform that preserves cardinality, run patterns, and value
    /// dominance. If Dict, RunEnd, or Sparse lost on the original array, they will lose on ZigZag's
    /// output too, so we skip evaluating them.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::All,
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::All,
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::All,
            },
        ]
    }

    /// Dict codes are unsigned integers (0..cardinality). ZigZag only helps negatives.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // ZigZag only transforms negative values to positive. Without further compression,
        // the output is the same size.
        if compress_ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        let stats = data.integer_stats(exec_ctx);

        // ZigZag is only useful when there are negative values.
        if !stats.erased().min_is_negative() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // Zigzag encode the values, then recursively compress the inner values.
        let zag = zigzag_encode(data.array_as_primitive())?;
        let encoded = zag.encoded().clone().execute::<PrimitiveArray>(exec_ctx)?;

        let compressed = compressor.compress_child(
            &encoded.into_array(),
            &compress_ctx,
            self.id(),
            0,
            exec_ctx,
        )?;

        Ok(ZigZag::try_new(compressed)?.into_array())
    }
}

impl Scheme for BitPackingScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.bitpacking"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let stats = data.integer_stats(exec_ctx);

        // BitPacking only works for non-negative values.
        if stats.erased().min_is_negative() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let primitive_array = data.array_as_primitive();

        let histogram = bit_width_histogram(primitive_array, exec_ctx)?;
        let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;

        // If the best bit width equals the type's current bit width, bitpacking won't help, so
        // return the original array.
        if bw as usize == primitive_array.ptype().bit_width() {
            return Ok(primitive_array.array().clone());
        }

        // Otherwise we can bitpack the array.
        let primitive_array = primitive_array.into_owned();
        let packed = bitpack_encode(&primitive_array, bw, Some(&histogram), exec_ctx)?;

        let packed_stats = packed.statistics().to_owned();
        let ptype = packed.dtype().as_ptype();
        let mut parts = BitPacked::into_parts(packed);

        let array = if use_experimental_patches() {
            let patches = parts.patches.take();
            // Transpose patches into G-ALP style PatchedArray, wrapping an inner BitPackedArray.
            let array = BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                None,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .into_array();

            match patches {
                None => array,
                Some(p) => Patched::from_array_and_patches(array, &p, exec_ctx)?
                    .with_stats_set(packed_stats)
                    .into_array(),
            }
        } else {
            // Compress patches and place back into BitPackedArray.
            let patches = parts
                .patches
                .take()
                .map(|p| compress_patches(p, exec_ctx))
                .transpose()?;
            parts.patches = patches;
            BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                parts.patches,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .with_stats_set(packed_stats)
            .into_array()
        };

        Ok(array)
    }
}

impl Scheme for SparseScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sparse"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn stats_options(&self) -> GenerateStatsOptions {
        GenerateStatsOptions {
            count_distinct_values: true,
        }
    }

    /// Children: values=0, indices=1.
    fn num_children(&self) -> usize {
        2
    }

    /// Sparse indices (child 1) are monotonically increasing positions with all unique values.
    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let len = data.array_len() as f64;
        let stats = data.integer_stats(exec_ctx);
        let value_count = stats.value_count();

        // All-null arrays should be compressed as constant instead anyway.
        if value_count == 0 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // If more than 90% of the values are null, this will compress well.
        if stats.null_count() as f64 / len > 0.9 {
            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
        }

        let (_, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // If the most frequent value is the only value, we should compress as constant instead.
        if most_frequent_count == value_count {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        debug_assert!(value_count > most_frequent_count);

        // See if the most frequent value accounts for >= 90% of the set values.
        let freq = most_frequent_count as f64 / value_count as f64;
        if freq < 0.9 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // We only store the positions of the non-top values.
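        // e.g. 1000 set values of which 950 are the top value: ratio = 1000 / 50 = 20.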
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
            value_count as f64 / (value_count - most_frequent_count) as f64,
        ))
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let len = data.array_len();
        let stats = data.integer_stats(exec_ctx);
        let array = data.array();

        let (most_frequent_value, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        if most_frequent_count as usize == len {
            // If the most frequent value is the only value, we should compress as constant instead.
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    most_frequent_value,
                    most_frequent_value.ptype(),
                    array.dtype().nullability(),
                ),
                len,
            )
            .into_array());
        }

        let sparse_encoded = Sparse::encode(
            array,
            Some(Scalar::primitive_value(
                most_frequent_value,
                most_frequent_value.ptype(),
                array.dtype().nullability(),
            )),
            exec_ctx,
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
            let sparse_values_primitive = sparse
                .patches()
                .values()
                .clone()
                .execute::<PrimitiveArray>(exec_ctx)?;
            let compressed_values = compressor.compress_child(
                &sparse_values_primitive.into_array(),
                &compress_ctx,
                self.id(),
                0,
                exec_ctx,
            )?;

            let indices = sparse
                .patches()
                .indices()
                .clone()
                .execute::<PrimitiveArray>(exec_ctx)?
                .narrow()?;

            let compressed_indices = compressor.compress_child(
                &indices.into_array(),
                &compress_ctx,
                self.id(),
                1,
                exec_ctx,
            )?;

            Sparse::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            Ok(sparse_encoded)
        }
    }
}

impl Scheme for RunEndScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.runend"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: values=0, ends=1.
    fn num_children(&self) -> usize {
        2
    }

    /// RunEnd ends (child 1) are monotonically increasing positions with all unique values.
    /// Dict, RunEnd, RLE, and Sparse are all pointless on such data.
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    /// Dict values (child 0) are all unique by definition, so run-end encoding them is
    /// pointless. Codes (child 1) can have runs and may benefit from RunEnd.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(0),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // If the average run length is below the threshold, skip this scheme.
        if data.integer_stats(exec_ctx).average_run_length() < RUN_END_THRESHOLD {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // Run-end encode the array into run values and run ends.
        let (ends, values) = runend_encode(data.array_as_primitive(), exec_ctx);

        let values_primitive = values.execute::<PrimitiveArray>(exec_ctx)?;
        let compressed_values = compressor.compress_child(
            &values_primitive.into_array(),
            &compress_ctx,
            self.id(),
            0,
            exec_ctx,
        )?;

        let compressed_ends =
            compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;

        // SAFETY: compression doesn't affect invariants.
        Ok(unsafe {
            RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
                .into_array()
        })
    }
}

impl Scheme for SequenceScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sequence"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Sequence encoding on dictionary codes just adds a layer of indirection without compressing
    /// the data. Dict codes are compact integers that benefit from BitPacking or FoR, not from
    /// sequence detection.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // It is pointless to check whether a sample is a sequence, since the sample does not
        // represent the entire array.
        if compress_ctx.is_sample() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        let stats = data.integer_stats(exec_ctx);

        // `SequenceArray` does not support nulls.
        if stats.null_count() > 0 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // If the distinct_values_count was computed, and not all values are unique, then this
        // cannot be encoded as a sequence array.
        if stats
            .distinct_count()
            .is_some_and(|count| count as usize != data.array_len())
        {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // TODO(connor): `sequence_encode` allocates the encoded array just to confirm feasibility.
        // A cheaper `is_sequence` probe would let us skip the allocation entirely.
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, data, best_so_far, _ctx, exec_ctx| {
                // `SequenceArray` stores exactly two scalars (base and multiplier), so the best
                // achievable compression ratio is `array_len / 2`.
                let compressed_size = 2usize;
                let max_ratio = data.array_len() as f64 / compressed_size as f64;
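                // e.g. a 65_536-element sequence caps out at a ratio of 32_768.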

                // If we cannot beat the best so far, then we do not want to even try sequence
                // encoding the data.
                let threshold = best_so_far.and_then(EstimateScore::finite_ratio);
                if threshold.is_some_and(|t| max_ratio <= t) {
                    return Ok(EstimateVerdict::Skip);
                }

                // TODO(connor): We should pass this array back to the compressor when we do decide
                // to sequence encode, so that we do not need to recompress.
                if sequence_encode(data.array_as_primitive(), exec_ctx)?.is_none() {
                    return Ok(EstimateVerdict::Skip);
                }
                // TODO(connor): Should we get the actual ratio here?
                Ok(EstimateVerdict::Ratio(max_ratio))
            },
        )))
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let stats = data.integer_stats(exec_ctx);

        if stats.null_count() > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(data.array_as_primitive(), exec_ctx)?
            .ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}

#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        // Pco does not support I8 or U8.
        if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        Ok(vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?
        .into_array())
    }
}

/// Shared compression logic for RLE schemes.
pub(crate) fn rle_compress(
    scheme: &dyn Scheme,
    compressor: &CascadingCompressor,
    data: &ArrayAndStats,
    compress_ctx: CompressorContext,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let rle_array = RLE::encode(data.array_as_primitive(), exec_ctx)?;

    let rle_values_primitive = rle_array
        .values()
        .clone()
        .execute::<PrimitiveArray>(exec_ctx)?;
    let compressed_values = compressor.compress_child(
        &rle_values_primitive.into_array(),
        &compress_ctx,
        scheme.id(),
        0,
        exec_ctx,
    )?;

    // Delta is an unstable encoding; once we deem it stable, we can switch over to it unconditionally.
    #[cfg(feature = "unstable_encodings")]
    let compressed_indices = {
        let rle_indices_primitive = rle_array
            .indices()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        try_compress_delta(
            compressor,
            &rle_indices_primitive.into_array(),
            &compress_ctx,
            scheme.id(),
            1,
            exec_ctx,
        )?
    };

    #[cfg(not(feature = "unstable_encodings"))]
    let compressed_indices = {
        let rle_indices_primitive = rle_array
            .indices()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        compressor.compress_child(
            &rle_indices_primitive.into_array(),
            &compress_ctx,
            scheme.id(),
            1,
            exec_ctx,
        )?
    };

    let rle_offsets_primitive = rle_array
        .values_idx_offsets()
        .clone()
        .execute::<PrimitiveArray>(exec_ctx)?
        .narrow()?;
    let compressed_offsets = compressor.compress_child(
        &rle_offsets_primitive.into_array(),
        &compress_ctx,
        scheme.id(),
        2,
        exec_ctx,
    )?;

    // SAFETY: Recursive compression doesn't affect the invariants.
    unsafe {
        Ok(RLE::new_unchecked(
            compressed_values,
            compressed_indices,
            compressed_offsets,
            rle_array.offset(),
            rle_array.len(),
        )
        .into_array())
    }
}

#[cfg(feature = "unstable_encodings")]
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let child_primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let (bases, deltas) = vortex_fastlanes::delta_compress(&child_primitive, exec_ctx)?;

    let compressed_bases = compressor.compress_child(
        &bases.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;
    let compressed_deltas = compressor.compress_child(
        &deltas.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;

    Delta::try_new(compressed_bases, compressed_deltas, 0, child.len()).map(IntoArray::into_array)
}

impl Scheme for IntRLEScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.rle"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Children: values=0, indices=1, offsets=2.
    fn num_children(&self) -> usize {
        3
    }

    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        rle_descendant_exclusions()
    }

    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        rle_ancestor_exclusions()
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // RLE is only useful when we cascade it with another encoding.
        if compress_ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        if data.integer_stats(exec_ctx).average_run_length() < RUN_LENGTH_THRESHOLD {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        rle_compress(self, compressor, data, compress_ctx, exec_ctx)
    }
}

#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    #[test]
    fn test_empty() -> VortexResult<()> {
        // Make sure empty array compression does not fail.
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write short, randomly sized runs (lengths 0-4) of a handful of distinct values,
        // including plenty of one-off values.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(
            &codes.freeze().into_array(),
            &mut SESSION.create_execution_ctx(),
        )?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        let decoded = compressed;
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sequence>());

        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed =
            compressor.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim, &mut SESSION.create_execution_ctx())?;

        Ok(())
    }
}

/// Tests to verify that each integer compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;
    use std::sync::LazyLock;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        eprintln!("{}", compressed.display_tree());
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}