Skip to main content

vortex_btrblocks/compressor/integer/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub(crate) mod dictionary;
5pub(super) mod stats;
6
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use enum_iterator::Sequence;
11pub use stats::IntegerStats;
12use vortex_array::ArrayRef;
13use vortex_array::Canonical;
14use vortex_array::IntoArray;
15use vortex_array::ToCanonical;
16use vortex_array::arrays::ConstantArray;
17use vortex_array::arrays::DictArray;
18use vortex_array::arrays::MaskedArray;
19use vortex_array::arrays::PrimitiveArray;
20use vortex_array::arrays::PrimitiveVTable;
21use vortex_array::scalar::Scalar;
22use vortex_array::vtable::VTable;
23use vortex_array::vtable::ValidityHelper;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_err;
28use vortex_fastlanes::FoRArray;
29use vortex_fastlanes::bitpack_compress::bit_width_histogram;
30use vortex_fastlanes::bitpack_compress::bitpack_encode;
31use vortex_fastlanes::bitpack_compress::find_best_bit_width;
32use vortex_runend::RunEndArray;
33use vortex_runend::compress::runend_encode;
34use vortex_sequence::sequence_encode;
35use vortex_sparse::SparseArray;
36use vortex_sparse::SparseVTable;
37use vortex_zigzag::ZigZagArray;
38use vortex_zigzag::zigzag_encode;
39
40use self::dictionary::dictionary_encode;
41use crate::BtrBlocksCompressor;
42use crate::CanonicalCompressor;
43use crate::Compressor;
44use crate::CompressorContext;
45use crate::CompressorStats;
46use crate::Excludes;
47use crate::GenerateStatsOptions;
48use crate::Scheme;
49use crate::SchemeExt;
50use crate::compressor::patches::compress_patches;
51use crate::compressor::rle;
52use crate::compressor::rle::RLEScheme;
53
/// All available integer compression schemes.
///
/// NOTE: `UncompressedScheme` is deliberately not listed here; it is only the
/// fallback returned by `IntCompressor::default_scheme`.
pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[
    &ConstantScheme,
    &FORScheme,
    &ZigZagScheme,
    &BitPackingScheme,
    &SparseScheme,
    &DictScheme,
    &RunEndScheme,
    &SequenceScheme,
    // Generic RLE is a pre-built scheme value rather than a unit struct.
    &RLE_INTEGER_SCHEME,
    // Pco support is compiled in only when the "pco" feature is enabled.
    #[cfg(feature = "pco")]
    &PcoScheme,
];
68
/// [`Compressor`] for signed and unsigned integers.
///
/// A thin, copyable view that borrows the parent compressor so scheme lookups
/// can delegate to the parent's configured `int_schemes()`.
#[derive(Clone, Copy)]
pub struct IntCompressor<'a> {
    /// Reference to the parent compressor.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
75
76impl<'a> Compressor for IntCompressor<'a> {
77    type ArrayVTable = PrimitiveVTable;
78    type SchemeType = dyn IntegerScheme;
79    type StatsType = IntegerStats;
80
81    fn schemes(&self) -> &[&'static dyn IntegerScheme] {
82        self.btr_blocks_compressor.int_schemes()
83    }
84
85    fn default_scheme(&self) -> &'static Self::SchemeType {
86        &UncompressedScheme
87    }
88
89    fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
90        if self
91            .btr_blocks_compressor
92            .int_schemes()
93            .iter()
94            .any(|s| s.code() == IntCode::Dict)
95        {
96            IntegerStats::generate_opts(
97                array,
98                GenerateStatsOptions {
99                    count_distinct_values: true,
100                },
101            )
102        } else {
103            IntegerStats::generate_opts(
104                array,
105                GenerateStatsOptions {
106                    count_distinct_values: false,
107                },
108            )
109        }
110    }
111}
112
/// Marker trait for integer compression schemes: any [`Scheme`] keyed by
/// [`IntegerStats`] / [`IntCode`] that is also `Send + Sync`.
pub trait IntegerScheme:
    Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}
117
// Auto-impl: every qualifying `Scheme` is automatically an `IntegerScheme`,
// so implementors never need to name this trait explicitly.
impl<T> IntegerScheme for T where
    T: Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}
123
124impl PartialEq for dyn IntegerScheme {
125    fn eq(&self, other: &Self) -> bool {
126        self.code() == other.code()
127    }
128}
129
// Equality delegates to `IntCode`, which is derived `Eq`, so this is total.
impl Eq for dyn IntegerScheme {}
131
// Hash must agree with `PartialEq`: both delegate to `code()`, so equal
// schemes always hash identically.
impl Hash for dyn IntegerScheme {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.code().hash(state)
    }
}
137
/// Unique identifier for integer compression schemes.
///
/// NOTE: `Ord`/`PartialOrd` and `Sequence` are derived, so the declaration
/// order of variants is significant — append new variants rather than
/// reordering existing ones.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum IntCode {
    /// No compression applied.
    Uncompressed,
    /// Constant encoding for arrays with a single distinct value.
    Constant,
    /// Frame of Reference encoding - subtracts minimum value then bitpacks.
    For,
    /// ZigZag encoding - transforms negative integers to positive for better bitpacking.
    ZigZag,
    /// BitPacking encoding - compresses non-negative integers by reducing bit width.
    BitPacking,
    /// Sparse encoding - optimizes null-dominated or single-value-dominated arrays.
    Sparse,
    /// Dictionary encoding - creates a dictionary of unique values.
    Dict,
    /// Run-end encoding - run-length encoding with end positions.
    RunEnd,
    /// Sequence encoding - detects sequential patterns.
    Sequence,
    /// RLE encoding - generic run-length encoding.
    Rle,
    /// Pco (pcodec) compression for integers.
    Pco,
}
164
/// Fallback scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Constant encoding for arrays with a single distinct value.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConstantScheme;

/// Frame of Reference encoding - subtracts the minimum value then bitpacks.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FORScheme;

/// ZigZag encoding - maps negative integers to positive for better bitpacking.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// BitPacking encoding - stores non-negative integers with a reduced bit width.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse encoding - separates a dominant (or null) fill value from patches.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Dictionary encoding - a table of distinct values plus per-row codes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;

/// Run-end encoding - run-length encoding with run end positions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence encoding - detects arithmetic (base + multiplier) sequences.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco (pcodec) compression for integers.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
199
/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;

/// Configuration for integer RLE compression.
///
/// Plugged into the generic RLE machinery through the `rle::RLEConfig` impl.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct IntRLEConfig;
206
impl rle::RLEConfig for IntRLEConfig {
    type Stats = IntegerStats;
    type Code = IntCode;

    const CODE: IntCode = IntCode::Rle;

    /// Compress the RLE values child by recursing into the canonical compressor.
    fn compress_values(
        compressor: &BtrBlocksCompressor,
        values: &PrimitiveArray,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into())
    }
}
222
/// RLE scheme for integer compression.
///
/// Pre-built instance; registered in [`ALL_INT_SCHEMES`].
pub const RLE_INTEGER_SCHEME: RLEScheme<IntRLEConfig> = RLEScheme::new();
225
impl Scheme for UncompressedScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Uncompressed
    }

    /// Always reports 1.0: leaving data uncompressed never shrinks it, so this
    /// only wins when no other scheme beats the break-even ratio.
    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        _stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // no compression
        Ok(1.0)
    }

    /// Returns the source array unchanged.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        Ok(stats.source().to_array())
    }
}
255
impl Scheme for ConstantScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Constant
    }

    fn is_constant(&self) -> bool {
        true
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Never yield ConstantScheme for a sample, it could be a false-positive.
        if ctx.is_sample {
            return Ok(0.0);
        }

        // Only arrays with one distinct value can be constant compressed.
        if stats.distinct_values_count > 1 {
            return Ok(0.0);
        }

        // All values collapse into a single scalar, so the ratio is the value count.
        Ok(stats.value_count as f64)
    }

    /// Compress to a [`ConstantArray`] holding the first valid value.
    ///
    /// If the array is partially null, the constant is wrapped in a
    /// [`MaskedArray`] carrying the source validity; an all-null array becomes
    /// a constant null scalar.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Find the first valid (non-null) position, if any.
        let scalar_idx =
            (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));

        match scalar_idx {
            Some(idx) => {
                let scalar = stats.source().scalar_at(idx)?;
                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
                if !stats.source().all_valid()? {
                    // Preserve the null positions by masking with the source validity.
                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
                } else {
                    Ok(const_arr)
                }
            }
            // No valid values at all: the whole array is null.
            None => Ok(ConstantArray::new(
                Scalar::null(stats.src.dtype().clone()),
                stats.src.len(),
            )
            .into_array()),
        }
    }
}
316
impl Scheme for FORScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::For
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only apply if we are not at the leaf: FoR always cascades into
        // bitpacking (see `compress`).
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        // All-null cannot be FOR compressed.
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // Only apply when the min is not already zero — otherwise biasing by
        // the minimum would be a no-op.
        if stats.typed.min_is_zero() {
            return Ok(0.0);
        }

        // Bit width of the source primitive type (e.g. 32 for i32).
        let full_width: u32 = stats
            .src
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");
        // Bits needed for the biased range: floor(log2(max - min)) + 1.
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, we should use a different compression scheme
            // as we don't want to bitpack down to 0 bits.
            None => return Ok(0.0),
        };

        // If we're not saving at least 1 byte, don't bother with FOR
        if full_width - bw < 8 {
            return Ok(0.0);
        }

        Ok(full_width as f64 / bw as f64)
    }

    /// Bias the array by its reference value (FoR), then bitpack the residuals.
    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let for_array = FoRArray::encode(stats.src.clone())?;
        let biased = for_array.encoded().to_primitive();
        // The bitpacking child never needs distinct-value counts.
        let biased_stats = IntegerStats::generate_opts(
            &biased,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let leaf_ctx = CompressorContext {
            is_sample: ctx.is_sample,
            allowed_cascading: 0,
        };
        let compressed =
            BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes)?;

        let for_compressed = FoRArray::try_new(compressed, for_array.reference_scalar().clone())?;
        // Re-encoding the child does not change logical stats, so inherit them.
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());
        Ok(for_compressed.into_array())
    }
}
404
405impl Scheme for ZigZagScheme {
406    type StatsType = IntegerStats;
407    type CodeType = IntCode;
408
409    fn code(&self) -> IntCode {
410        IntCode::ZigZag
411    }
412
413    fn expected_compression_ratio(
414        &self,
415        compressor: &BtrBlocksCompressor,
416        stats: &IntegerStats,
417        ctx: CompressorContext,
418        excludes: &[IntCode],
419    ) -> VortexResult<f64> {
420        // ZigZag is only useful when we cascade it with another encoding
421        if ctx.allowed_cascading == 0 {
422            return Ok(0.0);
423        }
424
425        // Don't try and compress all-null arrays
426        if stats.value_count == 0 {
427            return Ok(0.0);
428        }
429
430        // ZigZag is only useful when there are negative values.
431        if !stats.typed.min_is_negative() {
432            return Ok(0.0);
433        }
434
435        // Run compression on a sample to see how it performs.
436        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
437    }
438
439    fn compress(
440        &self,
441        compressor: &BtrBlocksCompressor,
442        stats: &IntegerStats,
443        ctx: CompressorContext,
444        excludes: &[IntCode],
445    ) -> VortexResult<ArrayRef> {
446        // Zigzag encode the values, then recursively compress the inner values.
447        let zag = zigzag_encode(stats.src.clone())?;
448        let encoded = zag.encoded().to_primitive();
449
450        // ZigZag should be after Dict, RunEnd or Sparse.
451        // We should only do these "container" style compressors once.
452        let mut new_excludes = vec![
453            ZigZagScheme.code(),
454            DictScheme.code(),
455            RunEndScheme.code(),
456            SparseScheme.code(),
457        ];
458        new_excludes.extend_from_slice(excludes);
459
460        let compressed = compressor.compress_canonical(
461            Canonical::Primitive(encoded),
462            ctx.descend(),
463            Excludes::int_only(&new_excludes),
464        )?;
465
466        tracing::debug!("zigzag output: {}", compressed.encoding_id());
467
468        Ok(ZigZagArray::try_new(compressed)?.into_array())
469    }
470}
471
impl Scheme for BitPackingScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::BitPacking
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // BitPacking only works for non-negative values
        if stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Don't compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // Run compression on a sample to see how it performs.
        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    /// Bitpack to the best bit width; values that exceed it become patches,
    /// which are themselves compressed.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let histogram = bit_width_histogram(stats.source())?;
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
        // If best bw is determined to be the current bit-width, return the original array.
        if bw as usize == stats.source().ptype().bit_width() {
            return Ok(stats.source().clone().into_array());
        }
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;

        // Compress the overflow patches too, if any were produced.
        let patches = packed.patches().map(compress_patches).transpose()?;
        packed.replace_patches(patches);

        Ok(packed.into_array())
    }
}
521
impl Scheme for SparseScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Sparse
    }

    /// Estimate the benefit of sparse encoding.
    ///
    /// Sparse pays off when a single value (or null) dominates: only the
    /// positions and values of the exceptions need to be stored.
    // NOTE(review): the original comment here read "We can avoid asserting the
    // encoding tree instead" — presumably meaning the cascading guard below
    // makes the assert in `compress` redundant; confirm the intent.
    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only use `SparseScheme` if we can cascade.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            // All nulls should use ConstantScheme
            return Ok(0.0);
        }

        // If the majority is null, will compress well.
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
        }

        // See if the top value accounts for >= 90% of the set values.
        let (_, top_count) = stats.typed.top_value_and_count();

        if top_count == stats.value_count {
            // top_value is the only value, should use ConstantScheme instead
            return Ok(0.0);
        }

        let freq = top_count as f64 / stats.value_count as f64;
        if freq >= 0.9 {
            // We only store the positions of the non-top values.
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
        }

        Ok(0.0)
    }

    /// Sparse-encode around the most frequent value, then cascade-compress the
    /// patch values and patch indices separately.
    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
        if top_count as usize == stats.src.len() {
            // top_value is the only value, use ConstantScheme
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    top_pvalue,
                    top_pvalue.ptype(),
                    stats.src.dtype().nullability(),
                ),
                stats.src.len(),
            )
            .into_array());
        }

        // Encode with the top value as the fill; everything else becomes patches.
        let sparse_encoded = SparseArray::encode(
            stats.src.as_ref(),
            Some(Scalar::primitive_value(
                top_pvalue,
                top_pvalue.ptype(),
                stats.src.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
            // Compress the values; exclude nested Sparse and Dict from the children.
            let mut new_excludes = vec![SparseScheme.code(), IntCode::Dict];
            new_excludes.extend_from_slice(excludes);

            let compressed_values = compressor.compress_canonical(
                Canonical::Primitive(sparse.patches().values().to_primitive()),
                ctx.descend(),
                Excludes::int_only(&new_excludes),
            )?;

            // `narrow()` presumably shrinks the index ptype before compression — confirm.
            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices = compressor.compress_canonical(
                Canonical::Primitive(indices),
                ctx.descend(),
                Excludes::int_only(&new_excludes),
            )?;

            SparseArray::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            // Encoding fell back to a non-sparse representation; return it as-is.
            Ok(sparse_encoded)
        }
    }
}
632
633impl Scheme for DictScheme {
634    type StatsType = IntegerStats;
635    type CodeType = IntCode;
636
637    fn code(&self) -> IntCode {
638        IntCode::Dict
639    }
640
641    fn expected_compression_ratio(
642        &self,
643        _compressor: &BtrBlocksCompressor,
644        stats: &IntegerStats,
645        ctx: CompressorContext,
646        _excludes: &[IntCode],
647    ) -> VortexResult<f64> {
648        // Dict should not be terminal.
649        if ctx.allowed_cascading == 0 {
650            return Ok(0.0);
651        }
652
653        if stats.value_count == 0 {
654            return Ok(0.0);
655        }
656
657        // If > 50% of the values are distinct, skip dict.
658        if stats.distinct_values_count > stats.value_count / 2 {
659            return Ok(0.0);
660        }
661
662        // Ignore nulls encoding for the estimate. We only focus on values.
663        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
664
665        // Assume codes are compressed RLE + BitPacking.
666        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
667
668        let n_runs = (stats.value_count / stats.average_run_length) as usize;
669
670        // Assume that codes will either be BitPack or RLE-BitPack
671        let codes_size_bp = (codes_bw * stats.value_count) as usize;
672        let codes_size_rle_bp = usize::checked_mul((codes_bw + 32) as usize, n_runs);
673
674        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp.unwrap_or(usize::MAX));
675
676        let before = stats.value_count as usize * stats.source().ptype().bit_width();
677
678        Ok(before as f64 / (values_size + codes_size) as f64)
679    }
680
681    fn compress(
682        &self,
683        compressor: &BtrBlocksCompressor,
684        stats: &IntegerStats,
685        ctx: CompressorContext,
686        excludes: &[IntCode],
687    ) -> VortexResult<ArrayRef> {
688        assert!(ctx.allowed_cascading > 0);
689
690        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
691        //    RLE or FOR + BP. Cascading probably wastes some time here.
692
693        let dict = dictionary_encode(stats);
694
695        // Cascade the codes child
696        // Don't allow SequenceArray as the codes child as it merely adds extra indirection without actually compressing data.
697        let mut new_excludes = vec![IntCode::Dict, IntCode::Sequence];
698        new_excludes.extend_from_slice(excludes);
699
700        let compressed_codes = compressor.compress_canonical(
701            Canonical::Primitive(dict.codes().to_primitive().narrow()?),
702            ctx.descend(),
703            Excludes::int_only(&new_excludes),
704        )?;
705
706        // SAFETY: compressing codes does not change their values
707        unsafe {
708            Ok(
709                DictArray::new_unchecked(compressed_codes, dict.values().clone())
710                    .set_all_values_referenced(dict.has_all_values_referenced())
711                    .into_array(),
712            )
713        }
714    }
715}
716
impl Scheme for RunEndScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::RunEnd
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // If the run length is below the threshold, drop it.
        if stats.average_run_length < RUN_END_THRESHOLD {
            return Ok(0.0);
        }

        // The ends/values children are themselves compressed, so run-end needs
        // cascading budget.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Run compression on a sample, see how it performs.
        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    /// Run-end encode the array, then cascade-compress both children.
    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);

        // Split into run end positions and run values.
        let (ends, values) = runend_encode(&stats.src);

        // Exclude nested RunEnd and Dict from the children.
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
        new_excludes.extend_from_slice(excludes);

        let compressed_ends = compressor.compress_canonical(
            Canonical::Primitive(ends.to_primitive()),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        let compressed_values = compressor.compress_canonical(
            Canonical::Primitive(values.to_primitive()),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        // SAFETY: compression doesn't affect invariants
        unsafe {
            Ok(
                RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
                    .into_array(),
            )
        }
    }
}
781
impl Scheme for SequenceScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> Self::CodeType {
        IntCode::Sequence
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        // Sequence encoding cannot represent nulls (see `compress`).
        if stats.null_count > 0 {
            return Ok(0.0);
        }

        // If the distinct_values_count was computed (!= u32::MAX)
        // then all values in a sequence must be unique — use it as a cheap
        // pre-filter before attempting the full encode below.
        if stats.distinct_values_count != u32::MAX
            && stats.distinct_values_count as usize != stats.src.len()
        {
            return Ok(0.0);
        }

        // Since two values are required to store base and multiplier the
        // compression ratio is divided by 2.
        Ok(sequence_encode(&stats.src)?
            .map(|_| stats.src.len() as f64 / 2.0)
            .unwrap_or(0.0))
    }

    /// Encode as an arithmetic sequence; errors if no sequence fits or the
    /// array contains nulls.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        if stats.null_count > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}
829
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Pco
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Pco does not support I8 or U8.
        if matches!(
            stats.src.ptype(),
            vortex_dtype::PType::I8 | vortex_dtype::PType::U8
        ) {
            return Ok(0.0);
        }

        // Run compression on a sample to see how it performs.
        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    /// Compress the whole array with pcodec at the default compression level.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        Ok(vortex_pco::PcoArray::from_primitive(
            stats.source(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            // 8192 values per chunk — presumably pco's recommended chunk size; confirm.
            8192,
        )?
        .into_array())
    }
}
872
873#[cfg(test)]
874mod tests {
875    use std::iter;
876
877    use itertools::Itertools;
878    use rand::RngCore;
879    use rand::SeedableRng;
880    use rand::rngs::StdRng;
881    use vortex_array::Array;
882    use vortex_array::IntoArray;
883    use vortex_array::ToCanonical;
884    use vortex_array::arrays::DictVTable;
885    use vortex_array::arrays::PrimitiveArray;
886    use vortex_array::assert_arrays_eq;
887    use vortex_array::validity::Validity;
888    use vortex_array::vtable::ValidityHelper;
889    use vortex_buffer::Buffer;
890    use vortex_buffer::BufferMut;
891    use vortex_buffer::buffer;
892    use vortex_error::VortexResult;
893    use vortex_sequence::SequenceVTable;
894    use vortex_sparse::SparseVTable;
895
896    use super::IntegerStats;
897    use super::RLE_INTEGER_SCHEME;
898    use super::SequenceScheme;
899    use super::SparseScheme;
900    use crate::BtrBlocksCompressor;
901    use crate::CompressorContext;
902    use crate::CompressorExt;
903    use crate::CompressorStats;
904    use crate::Scheme;
905
906    #[test]
907    fn test_empty() -> VortexResult<()> {
908        // Make sure empty array compression does not fail
909        let btr = BtrBlocksCompressor::default();
910        let result = btr.integer_compressor().compress(
911            &btr,
912            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
913            CompressorContext::default(),
914            &[],
915        )?;
916
917        assert!(result.is_empty());
918        Ok(())
919    }
920
921    #[test]
922    fn test_dict_encodable() -> VortexResult<()> {
923        let mut codes = BufferMut::<i32>::with_capacity(65_535);
924        // Write some runs of length 3 of a handful of different values. Interrupted by some
925        // one-off values.
926
927        let numbers = [0, 10, 50, 100, 1000, 3000]
928            .into_iter()
929            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
930            .collect_vec();
931
932        let mut rng = StdRng::seed_from_u64(1u64);
933        while codes.len() < 64000 {
934            let run_length = rng.next_u32() % 5;
935            let value = numbers[rng.next_u32() as usize % numbers.len()];
936            for _ in 0..run_length {
937                codes.push(value);
938            }
939        }
940
941        let primitive = codes.freeze().into_array().to_primitive();
942        let btr = BtrBlocksCompressor::default();
943        let compressed = btr.integer_compressor().compress(
944            &btr,
945            &primitive,
946            CompressorContext::default(),
947            &[],
948        )?;
949        assert!(compressed.is::<DictVTable>());
950        Ok(())
951    }
952
953    #[test]
954    fn sparse_with_nulls() -> VortexResult<()> {
955        let array = PrimitiveArray::new(
956            buffer![189u8, 189, 189, 0, 46],
957            Validity::from_iter(vec![true, true, true, true, false]),
958        );
959        let btr = BtrBlocksCompressor::default();
960        let compressed = SparseScheme.compress(
961            &btr,
962            &IntegerStats::generate(&array),
963            CompressorContext::default(),
964            &[],
965        )?;
966        assert!(compressed.is::<SparseVTable>());
967        let decoded = compressed.clone();
968        let expected =
969            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
970                .into_array();
971        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
972        Ok(())
973    }
974
975    #[test]
976    fn sparse_mostly_nulls() -> VortexResult<()> {
977        let array = PrimitiveArray::new(
978            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
979            Validity::from_iter(vec![
980                false, false, false, false, false, false, false, false, false, false, true,
981            ]),
982        );
983        let btr = BtrBlocksCompressor::default();
984        let compressed = SparseScheme.compress(
985            &btr,
986            &IntegerStats::generate(&array),
987            CompressorContext::default(),
988            &[],
989        )?;
990        assert!(compressed.is::<SparseVTable>());
991        let decoded = compressed.clone();
992        let expected = PrimitiveArray::new(
993            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
994            array.validity().clone(),
995        )
996        .into_array();
997        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
998        Ok(())
999    }
1000
1001    #[test]
1002    fn nullable_sequence() -> VortexResult<()> {
1003        let values = (0i32..20).step_by(7).collect_vec();
1004        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
1005        let btr = BtrBlocksCompressor::default();
1006        let compressed = SequenceScheme.compress(
1007            &btr,
1008            &IntegerStats::generate(&array),
1009            CompressorContext::default(),
1010            &[],
1011        )?;
1012        assert!(compressed.is::<SequenceVTable>());
1013        let decoded = compressed;
1014        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
1015        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
1016        Ok(())
1017    }
1018
1019    #[test]
1020    fn test_rle_compression() -> VortexResult<()> {
1021        let mut values = Vec::new();
1022        values.extend(iter::repeat_n(42i32, 100));
1023        values.extend(iter::repeat_n(123i32, 200));
1024        values.extend(iter::repeat_n(987i32, 150));
1025
1026        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
1027        let btr = BtrBlocksCompressor::default();
1028        let compressed = RLE_INTEGER_SCHEME.compress(
1029            &btr,
1030            &IntegerStats::generate(&array),
1031            CompressorContext::default(),
1032            &[],
1033        )?;
1034
1035        let decoded = compressed;
1036        let expected = Buffer::copy_from(&values).into_array();
1037        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
1038        Ok(())
1039    }
1040
1041    #[test_with::env(CI)]
1042    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
1043    fn compress_large_int() -> VortexResult<()> {
1044        const NUM_LISTS: usize = 10_000;
1045        const ELEMENTS_PER_LIST: usize = 5_000;
1046
1047        let prim = (0..NUM_LISTS)
1048            .flat_map(|list_idx| {
1049                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
1050            })
1051            .collect::<PrimitiveArray>()
1052            .into_array();
1053
1054        let btr = BtrBlocksCompressor::default();
1055        drop(btr.compress(prim.as_ref())?);
1056
1057        Ok(())
1058    }
1059}
1060
/// Tests to verify that each integer compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    // Hoisted from `test_dict_compressed`'s body for consistency with the
    // rest of the file's module-level import style.
    use rand::RngCore;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPackedVTable;
    use vortex_fastlanes::FoRVTable;
    use vortex_fastlanes::RLEVTable;
    use vortex_runend::RunEndVTable;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;

    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;

    /// A single repeated value must be compressed as a constant.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Large values in a narrow range should favor frame-of-reference.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<FoRVTable>());
        Ok(())
    }

    /// Small values (< 16) fit in few bits and should be bit-packed.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<BitPackedVTable>());
        Ok(())
    }

    /// A dominant value with 5% outliers should be sparse-encoded.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SparseVTable>());
        Ok(())
    }

    /// A small value alphabet over ~64K elements should dictionary-encode.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// Long runs (length 10) of distinct values should run-end-encode.
    /// Values straddle i32::MAX via wrapping_add, so frame-of-reference-style
    /// schemes are less attractive.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RunEndVTable>());
        Ok(())
    }

    /// An exact arithmetic progression should be sequence-encoded.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SequenceVTable>());
        Ok(())
    }

    /// Runs of length 100 over ten values should select the RLE scheme.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..10 {
            values.extend(iter::repeat_n(i, 100));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RLEVTable>());
        Ok(())
    }
}