Skip to main content

vortex_btrblocks/compressor/integer/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub(crate) mod dictionary;
5pub(super) mod stats;
6
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use enum_iterator::Sequence;
11pub use stats::IntegerStats;
12use vortex_array::ArrayRef;
13use vortex_array::Canonical;
14use vortex_array::IntoArray;
15use vortex_array::ToCanonical;
16use vortex_array::arrays::ConstantArray;
17use vortex_array::arrays::DictArray;
18use vortex_array::arrays::MaskedArray;
19use vortex_array::arrays::PrimitiveArray;
20use vortex_array::arrays::PrimitiveVTable;
21use vortex_array::scalar::Scalar;
22use vortex_array::vtable::VTable;
23use vortex_array::vtable::ValidityHelper;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_err;
28use vortex_fastlanes::FoRArray;
29use vortex_fastlanes::bitpack_compress::bit_width_histogram;
30use vortex_fastlanes::bitpack_compress::bitpack_encode;
31use vortex_fastlanes::bitpack_compress::find_best_bit_width;
32use vortex_runend::RunEndArray;
33use vortex_runend::compress::runend_encode;
34use vortex_sequence::sequence_encode;
35use vortex_sparse::SparseArray;
36use vortex_sparse::SparseVTable;
37use vortex_zigzag::ZigZagArray;
38use vortex_zigzag::zigzag_encode;
39
40use self::dictionary::dictionary_encode;
41use crate::BtrBlocksCompressor;
42use crate::CanonicalCompressor;
43use crate::Compressor;
44use crate::CompressorContext;
45use crate::CompressorStats;
46use crate::Excludes;
47use crate::GenerateStatsOptions;
48use crate::Scheme;
49use crate::SchemeExt;
50use crate::compressor::patches::compress_patches;
51use crate::compressor::rle;
52use crate::compressor::rle::RLEScheme;
53
/// All available integer compression schemes.
///
/// NOTE(review): tie-breaking between schemes appears to use the derived
/// ordering of [`IntCode`] (see the note on that enum), not the position in
/// this slice — confirm before relying on list order.
pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[
    &ConstantScheme,
    &FORScheme,
    &ZigZagScheme,
    &BitPackingScheme,
    &SparseScheme,
    &DictScheme,
    &RunEndScheme,
    &SequenceScheme,
    &RLE_INTEGER_SCHEME,
    #[cfg(feature = "pco")]
    &PcoScheme,
];
68
/// [`Compressor`] for signed and unsigned integers.
///
/// Borrows the parent compressor so recursive (cascading) compression can
/// re-enter the full scheme-selection machinery.
#[derive(Clone, Copy)]
pub struct IntCompressor<'a> {
    /// Reference to the parent compressor.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
75
76impl<'a> Compressor for IntCompressor<'a> {
77    type ArrayVTable = PrimitiveVTable;
78    type SchemeType = dyn IntegerScheme;
79    type StatsType = IntegerStats;
80
81    fn schemes(&self) -> &[&'static dyn IntegerScheme] {
82        self.btr_blocks_compressor.int_schemes()
83    }
84
85    fn default_scheme(&self) -> &'static Self::SchemeType {
86        &UncompressedScheme
87    }
88
89    fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
90        if self
91            .btr_blocks_compressor
92            .int_schemes()
93            .iter()
94            .any(|s| s.code() == IntCode::Dict)
95        {
96            IntegerStats::generate_opts(
97                array,
98                GenerateStatsOptions {
99                    count_distinct_values: true,
100                },
101            )
102        } else {
103            IntegerStats::generate_opts(
104                array,
105                GenerateStatsOptions {
106                    count_distinct_values: false,
107                },
108            )
109        }
110    }
111}
112
/// Marker trait for integer compression schemes.
///
/// Blanket-implemented for every `Scheme` parameterized over
/// [`IntegerStats`] / [`IntCode`], so concrete schemes never implement this
/// directly.
pub trait IntegerScheme:
    Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}

// Auto-impl
impl<T> IntegerScheme for T where
    T: Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}

// Trait-object identity is defined solely by the scheme's `IntCode`.
impl PartialEq for dyn IntegerScheme {
    fn eq(&self, other: &Self) -> bool {
        self.code() == other.code()
    }
}

impl Eq for dyn IntegerScheme {}

// Hash must agree with `PartialEq`: both delegate to `code()`.
impl Hash for dyn IntegerScheme {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.code().hash(state)
    }
}
137
/// Unique identifier for integer compression schemes.
///
/// NOTE: Variant order matters for tie-breaking (the derived `Ord` follows
/// declaration order); `For` must precede `BitPacking` to avoid unnecessary
/// patches.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum IntCode {
    /// No compression applied.
    Uncompressed,
    /// Constant encoding for arrays with a single distinct value.
    Constant,
    /// Frame of Reference encoding - subtracts minimum value then bitpacks.
    For,
    /// BitPacking encoding - compresses non-negative integers by reducing bit width.
    BitPacking,
    /// ZigZag encoding - transforms negative integers to positive for better bitpacking.
    ZigZag,
    /// Sparse encoding - optimizes null-dominated or single-value-dominated arrays.
    Sparse,
    /// Dictionary encoding - creates a dictionary of unique values.
    Dict,
    /// Run-end encoding - run-length encoding with end positions.
    RunEnd,
    /// Sequence encoding - detects sequential patterns.
    Sequence,
    /// RLE encoding - generic run-length encoding.
    Rle,
    /// Pco (pcodec) compression for integers.
    Pco,
}
166
/// Fallback scheme that stores the values uncompressed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Constant encoding for arrays with a single distinct value.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConstantScheme;

/// Frame-of-Reference encoding: subtract the minimum, then bitpack.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FORScheme;

/// ZigZag encoding: maps signed integers onto unsigned for better bitpacking.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// BitPacking encoding: reduces the physical bit width of non-negative values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse encoding for arrays dominated by nulls or a single top value.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Dictionary encoding over the distinct values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;

/// Run-end encoding (run-length encoding with end positions).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence encoding for arithmetic progressions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco (pcodec) compression for integers.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
201
/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;

/// Configuration for integer RLE compression.
///
/// Plugs [`IntegerStats`] / [`IntCode`] into the generic [`RLEScheme`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct IntRLEConfig;
208
impl rle::RLEConfig for IntRLEConfig {
    type Stats = IntegerStats;
    type Code = IntCode;

    const CODE: IntCode = IntCode::Rle;

    /// Recursively compress the run values with the full integer compressor.
    fn compress_values(
        compressor: &BtrBlocksCompressor,
        values: &PrimitiveArray,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into())
    }
}
224
/// RLE scheme for integer compression, instantiated from [`IntRLEConfig`].
pub const RLE_INTEGER_SCHEME: RLEScheme<IntRLEConfig> = RLEScheme::new();
227
impl Scheme for UncompressedScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Uncompressed
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        _stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // no compression
        Ok(1.0)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Pass the source array through unchanged.
        Ok(stats.source().to_array())
    }
}
257
impl Scheme for ConstantScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Constant
    }

    fn is_constant(&self) -> bool {
        true
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Never yield ConstantScheme for a sample, it could be a false-positive.
        if ctx.is_sample {
            return Ok(0.0);
        }

        // Only arrays with a single distinct value can be constant compressed.
        if stats.distinct_values_count > 1 {
            return Ok(0.0);
        }

        Ok(stats.value_count as f64)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Find the index of the first valid (non-null) element, if any.
        let scalar_idx =
            (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));

        match scalar_idx {
            Some(idx) => {
                let scalar = stats.source().scalar_at(idx)?;
                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
                if !stats.source().all_valid()? {
                    // Mix of the constant value and nulls: keep the source validity mask.
                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
                } else {
                    Ok(const_arr)
                }
            }
            // All-null array: encode as a constant null scalar.
            None => Ok(ConstantArray::new(
                Scalar::null(stats.src.dtype().clone()),
                stats.src.len(),
            )
            .into_array()),
        }
    }
}
318
impl Scheme for FORScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::For
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only apply if we are not at the leaf
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        // All-null cannot be FOR compressed.
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // Only apply when the min is not already zero.
        if stats.typed.min_is_zero() {
            return Ok(0.0);
        }

        // Difference between max and min
        let full_width: u32 = stats
            .src
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");
        // Bits needed to represent (max - min) after subtracting the reference.
        let for_bw = match stats.typed.max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, we should use a different compression scheme
            // as we don't want to bitpack down to 0 bits.
            None => return Ok(0.0),
        };

        // If BitPacking could apply (non-negative values) and FOR doesn't reduce bit width
        // compared to BitPacking, don't use FOR since it has overhead (storing reference).
        // Only skip FOR when min >= 0, otherwise BitPacking can't apply directly.
        if let Some(max_log) = stats
            .typed
            .max_ilog2()
            .filter(|_| !stats.typed.min_is_negative())
        {
            let bitpack_bw = max_log + 1;
            if for_bw >= bitpack_bw {
                return Ok(0.0);
            }
        }

        // Estimated ratio: original bit width over the post-FOR bit width.
        Ok(full_width as f64 / for_bw as f64)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Subtract the reference (minimum) from every element.
        let for_array = FoRArray::encode(stats.src.clone())?;
        let biased = for_array.encoded().to_primitive();
        let biased_stats = IntegerStats::generate_opts(
            &biased,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let leaf_ctx = CompressorContext {
            is_sample: ctx.is_sample,
            allowed_cascading: 0,
        };
        let compressed =
            BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes)?;

        let for_compressed = FoRArray::try_new(compressed, for_array.reference_scalar().clone())?;
        // Carry statistics over from the un-bitpacked FoR array.
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());
        Ok(for_compressed.into_array())
    }
}
415
impl Scheme for ZigZagScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::ZigZag
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // ZigZag is only useful when we cascade it with another encoding
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Don't try and compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // ZigZag is only useful when there are negative values.
        if !stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Run compression on a sample to see how it performs.
        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Zigzag encode the values, then recursively compress the inner values.
        let zag = zigzag_encode(stats.src.clone())?;
        let encoded = zag.encoded().to_primitive();

        // ZigZag should be after Dict, RunEnd or Sparse.
        // We should only do these "container" style compressors once.
        let mut new_excludes = vec![
            ZigZagScheme.code(),
            DictScheme.code(),
            RunEndScheme.code(),
            SparseScheme.code(),
        ];
        new_excludes.extend_from_slice(excludes);

        let compressed = compressor.compress_canonical(
            Canonical::Primitive(encoded),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        tracing::debug!("zigzag output: {}", compressed.encoding_id());

        Ok(ZigZagArray::try_new(compressed)?.into_array())
    }
}
482
impl Scheme for BitPackingScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::BitPacking
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // BitPacking only works for non-negative values
        if stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Don't compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Pick the bit width that minimizes packed size + patch overhead.
        let histogram = bit_width_histogram(stats.source())?;
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
        // If best bw is determined to be the current bit-width, return the original array.
        if bw as usize == stats.source().ptype().bit_width() {
            return Ok(stats.source().clone().into_array());
        }
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;

        // Values wider than `bw` become patches; compress those too.
        let patches = packed.patches().map(compress_patches).transpose()?;
        packed.replace_patches(patches);

        Ok(packed.into_array())
    }
}
532
impl Scheme for SparseScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Sparse
    }

    // NOTE(review): the estimate below is a pure heuristic from the stats;
    // presumably the intent of the original comment was that we could verify
    // the resulting encoding tree instead — confirm.
    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only use `SparseScheme` if we can cascade.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            // All nulls should use ConstantScheme
            return Ok(0.0);
        }

        // If the majority is null, will compress well.
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
        }

        // See if the top value accounts for >= 90% of the set values.
        let (_, top_count) = stats.typed.top_value_and_count();

        if top_count == stats.value_count {
            // top_value is the only value, should use ConstantScheme instead
            return Ok(0.0);
        }

        let freq = top_count as f64 / stats.value_count as f64;
        if freq >= 0.9 {
            // We only store the positions of the non-top values.
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
        }

        Ok(0.0)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
        if top_count as usize == stats.src.len() {
            // top_value is the only value, use ConstantScheme
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    top_pvalue,
                    top_pvalue.ptype(),
                    stats.src.dtype().nullability(),
                ),
                stats.src.len(),
            )
            .into_array());
        }

        // Encode with the top value as the fill; everything else becomes patches.
        let sparse_encoded = SparseArray::encode(
            stats.src.as_ref(),
            Some(Scalar::primitive_value(
                top_pvalue,
                top_pvalue.ptype(),
                stats.src.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
            // Compress the values
            let mut new_excludes = vec![SparseScheme.code(), IntCode::Dict];
            new_excludes.extend_from_slice(excludes);

            let compressed_values = compressor.compress_canonical(
                Canonical::Primitive(sparse.patches().values().to_primitive()),
                ctx.descend(),
                Excludes::int_only(&new_excludes),
            )?;

            // Narrow the index type before compressing the positions.
            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices = compressor.compress_canonical(
                Canonical::Primitive(indices),
                ctx.descend(),
                Excludes::int_only(&new_excludes),
            )?;

            SparseArray::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            // Encoder chose not to produce a sparse array; return it as-is.
            Ok(sparse_encoded)
        }
    }
}
643
644impl Scheme for DictScheme {
645    type StatsType = IntegerStats;
646    type CodeType = IntCode;
647
648    fn code(&self) -> IntCode {
649        IntCode::Dict
650    }
651
652    fn expected_compression_ratio(
653        &self,
654        _compressor: &BtrBlocksCompressor,
655        stats: &IntegerStats,
656        ctx: CompressorContext,
657        _excludes: &[IntCode],
658    ) -> VortexResult<f64> {
659        // Dict should not be terminal.
660        if ctx.allowed_cascading == 0 {
661            return Ok(0.0);
662        }
663
664        if stats.value_count == 0 {
665            return Ok(0.0);
666        }
667
668        // If > 50% of the values are distinct, skip dict.
669        if stats.distinct_values_count > stats.value_count / 2 {
670            return Ok(0.0);
671        }
672
673        // Ignore nulls encoding for the estimate. We only focus on values.
674        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
675
676        // Assume codes are compressed RLE + BitPacking.
677        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
678
679        let n_runs = (stats.value_count / stats.average_run_length) as usize;
680
681        // Assume that codes will either be BitPack or RLE-BitPack
682        let codes_size_bp = (codes_bw * stats.value_count) as usize;
683        let codes_size_rle_bp = usize::checked_mul((codes_bw + 32) as usize, n_runs);
684
685        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp.unwrap_or(usize::MAX));
686
687        let before = stats.value_count as usize * stats.source().ptype().bit_width();
688
689        Ok(before as f64 / (values_size + codes_size) as f64)
690    }
691
692    fn compress(
693        &self,
694        compressor: &BtrBlocksCompressor,
695        stats: &IntegerStats,
696        ctx: CompressorContext,
697        excludes: &[IntCode],
698    ) -> VortexResult<ArrayRef> {
699        assert!(ctx.allowed_cascading > 0);
700
701        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
702        //    RLE or FOR + BP. Cascading probably wastes some time here.
703
704        let dict = dictionary_encode(stats);
705
706        // Cascade the codes child
707        // Don't allow SequenceArray as the codes child as it merely adds extra indirection without actually compressing data.
708        let mut new_excludes = vec![IntCode::Dict, IntCode::Sequence];
709        new_excludes.extend_from_slice(excludes);
710
711        let compressed_codes = compressor.compress_canonical(
712            Canonical::Primitive(dict.codes().to_primitive().narrow()?),
713            ctx.descend(),
714            Excludes::int_only(&new_excludes),
715        )?;
716
717        // SAFETY: compressing codes does not change their values
718        unsafe {
719            Ok(
720                DictArray::new_unchecked(compressed_codes, dict.values().clone())
721                    .set_all_values_referenced(dict.has_all_values_referenced())
722                    .into_array(),
723            )
724        }
725    }
726}
727
impl Scheme for RunEndScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::RunEnd
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // If the run length is below the threshold, drop it.
        if stats.average_run_length < RUN_END_THRESHOLD {
            return Ok(0.0);
        }

        // Run-end must cascade into its ends/values children.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Run compression on a sample, see how it performs.
        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);

        // run-end encode the ends
        let (ends, values) = runend_encode(&stats.src);

        // Don't re-enter the "container" style compressors for the children.
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
        new_excludes.extend_from_slice(excludes);

        let compressed_ends = compressor.compress_canonical(
            Canonical::Primitive(ends.to_primitive()),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        let compressed_values = compressor.compress_canonical(
            Canonical::Primitive(values.to_primitive()),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        // SAFETY: compression doesn't affect invariants
        unsafe {
            Ok(
                RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
                    .into_array(),
            )
        }
    }
}
792
impl Scheme for SequenceScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> Self::CodeType {
        IntCode::Sequence
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        // Sequence encoding cannot represent nulls.
        if stats.null_count > 0 {
            return Ok(0.0);
        }

        // If the distinct_values_count was computed (!= u32::MAX)
        // Then all values in a sequence must be unique.
        if stats.distinct_values_count != u32::MAX
            && stats.distinct_values_count as usize != stats.src.len()
        {
            return Ok(0.0);
        }

        // Since two values are required to store base and multiplier the
        // compression ratio is divided by 2.
        Ok(sequence_encode(&stats.src)?
            .map(|_| stats.src.len() as f64 / 2.0)
            .unwrap_or(0.0))
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        if stats.null_count > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}
840
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Pco
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Pco does not support I8 or U8.
        if matches!(
            stats.src.ptype(),
            vortex_array::dtype::PType::I8 | vortex_array::dtype::PType::U8
        ) {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): 8192 looks like a chunk/page size passed to pco — confirm
        // against `PcoArray::from_primitive` before documenting it as such.
        Ok(vortex_pco::PcoArray::from_primitive(
            stats.source(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?
        .into_array())
    }
}
883
884#[cfg(test)]
885mod tests {
886    use std::iter;
887
888    use itertools::Itertools;
889    use rand::RngCore;
890    use rand::SeedableRng;
891    use rand::rngs::StdRng;
892    use vortex_array::Array;
893    use vortex_array::IntoArray;
894    use vortex_array::ToCanonical;
895    use vortex_array::arrays::DictVTable;
896    use vortex_array::arrays::PrimitiveArray;
897    use vortex_array::assert_arrays_eq;
898    use vortex_array::validity::Validity;
899    use vortex_array::vtable::ValidityHelper;
900    use vortex_buffer::Buffer;
901    use vortex_buffer::BufferMut;
902    use vortex_buffer::buffer;
903    use vortex_error::VortexResult;
904    use vortex_sequence::SequenceVTable;
905    use vortex_sparse::SparseVTable;
906
907    use super::IntegerStats;
908    use super::RLE_INTEGER_SCHEME;
909    use super::SequenceScheme;
910    use super::SparseScheme;
911    use crate::BtrBlocksCompressor;
912    use crate::CompressorContext;
913    use crate::CompressorExt;
914    use crate::CompressorStats;
915    use crate::Scheme;
916
    #[test]
    fn test_empty() -> VortexResult<()> {
        // Make sure empty array compression does not fail
        let btr = BtrBlocksCompressor::default();
        let result = btr.integer_compressor().compress(
            &btr,
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            CompressorContext::default(),
            &[],
        )?;

        // Whatever encoding is chosen, the result must still be empty.
        assert!(result.is_empty());
        Ok(())
    }
931
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write short runs of a handful of different values, interrupted by some
        // one-off values.
        // NOTE(review): the original comment said "runs of length 3", but
        // `rng.next_u32() % 5` produces run lengths of 0..=4.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) // must be big enough to not prefer fastlanes.bitpacked
            .collect_vec();

        // Fixed seed keeps the test deterministic.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.integer_compressor().compress(
            &btr,
            &primitive,
            CompressorContext::default(),
            &[],
        )?;
        // Few distinct values with short runs should favor dictionary encoding.
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }
963
    #[test]
    fn sparse_with_nulls() -> VortexResult<()> {
        // Top value 189 dominates; the last slot is null.
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let btr = BtrBlocksCompressor::default();
        let compressed = SparseScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        // Round-trip must preserve valid values; the null slot's stored value
        // is unspecified, so the expected buffer holds 0 there.
        let expected =
            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
                .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }
985
986    #[test]
987    fn sparse_mostly_nulls() -> VortexResult<()> {
988        let array = PrimitiveArray::new(
989            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
990            Validity::from_iter(vec![
991                false, false, false, false, false, false, false, false, false, false, true,
992            ]),
993        );
994        let btr = BtrBlocksCompressor::default();
995        let compressed = SparseScheme.compress(
996            &btr,
997            &IntegerStats::generate(&array),
998            CompressorContext::default(),
999            &[],
1000        )?;
1001        assert!(compressed.is::<SparseVTable>());
1002        let decoded = compressed.clone();
1003        let expected = PrimitiveArray::new(
1004            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
1005            array.validity().clone(),
1006        )
1007        .into_array();
1008        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
1009        Ok(())
1010    }
1011
1012    #[test]
1013    fn nullable_sequence() -> VortexResult<()> {
1014        let values = (0i32..20).step_by(7).collect_vec();
1015        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
1016        let btr = BtrBlocksCompressor::default();
1017        let compressed = SequenceScheme.compress(
1018            &btr,
1019            &IntegerStats::generate(&array),
1020            CompressorContext::default(),
1021            &[],
1022        )?;
1023        assert!(compressed.is::<SequenceVTable>());
1024        let decoded = compressed;
1025        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
1026        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
1027        Ok(())
1028    }
1029
1030    #[test]
1031    fn test_rle_compression() -> VortexResult<()> {
1032        let mut values = Vec::new();
1033        values.extend(iter::repeat_n(42i32, 100));
1034        values.extend(iter::repeat_n(123i32, 200));
1035        values.extend(iter::repeat_n(987i32, 150));
1036
1037        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
1038        let btr = BtrBlocksCompressor::default();
1039        let compressed = RLE_INTEGER_SCHEME.compress(
1040            &btr,
1041            &IntegerStats::generate(&array),
1042            CompressorContext::default(),
1043            &[],
1044        )?;
1045
1046        let decoded = compressed;
1047        let expected = Buffer::copy_from(&values).into_array();
1048        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
1049        Ok(())
1050    }
1051
1052    #[test_with::env(CI)]
1053    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
1054    fn compress_large_int() -> VortexResult<()> {
1055        const NUM_LISTS: usize = 10_000;
1056        const ELEMENTS_PER_LIST: usize = 5_000;
1057
1058        let prim = (0..NUM_LISTS)
1059            .flat_map(|list_idx| {
1060                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
1061            })
1062            .collect::<PrimitiveArray>()
1063            .into_array();
1064
1065        let btr = BtrBlocksCompressor::default();
1066        drop(btr.compress(prim.as_ref())?);
1067
1068        Ok(())
1069    }
1070}
1071
/// Tests to verify that each integer compression scheme produces the expected encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPackedVTable;
    use vortex_fastlanes::FoRVTable;
    use vortex_fastlanes::RLEVTable;
    use vortex_runend::RunEndVTable;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;

    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;

    /// A single repeated value must be recognized as a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let data = vec![42i32; 100];
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Values clustered in a narrow band around a large base should be
    /// frame-of-reference encoded.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let data: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<FoRVTable>());
        Ok(())
    }

    /// Small unsigned values (all < 16) should be bitpacked.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let data: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<BitPackedVTable>());
        Ok(())
    }

    /// Mostly one value with occasional far-away exceptions — a sparse layout.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        // Every 20th slot is an outlier near 2M; everything else is the constant 1M.
        let data: Vec<i32> = (0..1000)
            .map(|i| {
                if i % 20 == 0 {
                    2_000_000 + (i * 7) % 1000
                } else {
                    1_000_000
                }
            })
            .collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SparseVTable>());
        Ok(())
    }

    /// Low-cardinality data with large values should be dictionary encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        use rand::RngCore;
        use rand::SeedableRng;
        use rand::rngs::StdRng;

        // Six widely-spaced dictionary values; must be big enough to not prefer
        // fastlanes.bitpacked.
        let dictionary: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect();

        // Fixed seed for deterministic data. The two RNG draws per iteration must
        // stay in this order to reproduce the original sequence.
        let mut rng = StdRng::seed_from_u64(1u64);
        let mut samples = Vec::with_capacity(65_535);
        while samples.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = dictionary[rng.next_u32() as usize % dictionary.len()];
            samples.extend(iter::repeat_n(value, run_length as usize));
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&samples), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// Long runs of distinct, near-i32::MAX values should be run-end encoded.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let data: Vec<i32> = (0..100)
            .flat_map(|i| iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10))
            .collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RunEndVTable>());
        Ok(())
    }

    /// An exact arithmetic progression should be sequence encoded.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let data: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SequenceVTable>());
        Ok(())
    }

    /// Several medium-length runs of small values should select the fastlanes RLE
    /// encoding.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let data: Vec<i32> = (0..10).flat_map(|i| iter::repeat_n(i, 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RLEVTable>());
        Ok(())
    }
}