vortex_btrblocks/
integer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod dictionary;
5mod stats;
6
7use std::fmt::Debug;
8use std::hash::Hash;
9
10pub use stats::IntegerStats;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::ToCanonical;
14use vortex_array::arrays::ConstantArray;
15use vortex_array::arrays::DictArray;
16use vortex_array::arrays::MaskedArray;
17use vortex_array::arrays::PrimitiveArray;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::vtable::ValidityHelper;
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_error::vortex_bail;
23use vortex_error::vortex_err;
24use vortex_fastlanes::FoRArray;
25use vortex_fastlanes::bitpack_compress::bit_width_histogram;
26use vortex_fastlanes::bitpack_compress::bitpack_encode;
27use vortex_fastlanes::bitpack_compress::find_best_bit_width;
28use vortex_runend::RunEndArray;
29use vortex_runend::compress::runend_encode;
30use vortex_scalar::Scalar;
31use vortex_sequence::sequence_encode;
32use vortex_sparse::SparseArray;
33use vortex_sparse::SparseVTable;
34use vortex_zigzag::ZigZagArray;
35use vortex_zigzag::zigzag_encode;
36
37use crate::Compressor;
38use crate::CompressorStats;
39use crate::GenerateStatsOptions;
40use crate::Scheme;
41use crate::estimate_compression_ratio_with_sampling;
42use crate::integer::dictionary::dictionary_encode;
43use crate::patches::compress_patches;
44use crate::rle::RLEScheme;
45
46/// [`Compressor`] for signed and unsigned integers.
47pub struct IntCompressor;
48
49impl Compressor for IntCompressor {
50    type ArrayVTable = PrimitiveVTable;
51    type SchemeType = dyn IntegerScheme;
52    type StatsType = IntegerStats;
53
54    fn schemes() -> &'static [&'static dyn IntegerScheme] {
55        &[
56            &ConstantScheme,
57            &FORScheme,
58            &ZigZagScheme,
59            &BitPackingScheme,
60            &SparseScheme,
61            &DictScheme,
62            &RunEndScheme,
63            &SequenceScheme,
64            &RLE_INTEGER_SCHEME,
65        ]
66    }
67
68    fn default_scheme() -> &'static Self::SchemeType {
69        &UncompressedScheme
70    }
71
72    fn dict_scheme_code() -> IntCode {
73        DICT_SCHEME
74    }
75}
76
77impl IntCompressor {
78    pub(crate) fn compress_no_dict(
79        array: &PrimitiveArray,
80        is_sample: bool,
81        allowed_cascading: usize,
82        excludes: &[IntCode],
83    ) -> VortexResult<ArrayRef> {
84        let stats = IntegerStats::generate_opts(
85            array,
86            GenerateStatsOptions {
87                count_distinct_values: false,
88            },
89        );
90
91        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
92        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
93
94        if output.nbytes() < array.nbytes() {
95            Ok(output)
96        } else {
97            tracing::debug!("resulting tree too large: {}", output.display_tree());
98            Ok(array.to_array())
99        }
100    }
101}
102
103pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
104
105// Auto-impl
106impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
107
108#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
109pub struct IntCode(u8);
110
111const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
112const CONSTANT_SCHEME: IntCode = IntCode(1);
113const FOR_SCHEME: IntCode = IntCode(2);
114const ZIGZAG_SCHEME: IntCode = IntCode(3);
115const BITPACKING_SCHEME: IntCode = IntCode(4);
116const SPARSE_SCHEME: IntCode = IntCode(5);
117const DICT_SCHEME: IntCode = IntCode(6);
118const RUN_END_SCHEME: IntCode = IntCode(7);
119const SEQUENCE_SCHEME: IntCode = IntCode(8);
120const RUN_LENGTH_SCHEME: IntCode = IntCode(9);
121
122#[derive(Debug, Copy, Clone)]
123pub struct UncompressedScheme;
124
125#[derive(Debug, Copy, Clone)]
126pub struct ConstantScheme;
127
128#[derive(Debug, Copy, Clone)]
129pub struct FORScheme;
130
131#[derive(Debug, Copy, Clone)]
132pub struct ZigZagScheme;
133
134#[derive(Debug, Copy, Clone)]
135pub struct BitPackingScheme;
136
137#[derive(Debug, Copy, Clone)]
138pub struct SparseScheme;
139
140#[derive(Debug, Copy, Clone)]
141pub struct DictScheme;
142
143#[derive(Debug, Copy, Clone)]
144pub struct RunEndScheme;
145
146#[derive(Debug, Copy, Clone)]
147pub struct SequenceScheme;
148
149/// Threshold for the average run length in an array before we consider run-end encoding.
150const RUN_END_THRESHOLD: u32 = 4;
151
152pub const RLE_INTEGER_SCHEME: RLEScheme<IntegerStats, IntCode> = RLEScheme::new(
153    RUN_LENGTH_SCHEME,
154    |values, is_sample, allowed_cascading, excludes| {
155        IntCompressor::compress_no_dict(values, is_sample, allowed_cascading, excludes)
156    },
157);
158
159impl Scheme for UncompressedScheme {
160    type StatsType = IntegerStats;
161    type CodeType = IntCode;
162
163    fn code(&self) -> IntCode {
164        UNCOMPRESSED_SCHEME
165    }
166
167    fn expected_compression_ratio(
168        &self,
169        _stats: &IntegerStats,
170        _is_sample: bool,
171        _allowed_cascading: usize,
172        _excludes: &[IntCode],
173    ) -> VortexResult<f64> {
174        // no compression
175        Ok(1.0)
176    }
177
178    fn compress(
179        &self,
180        stats: &IntegerStats,
181        _is_sample: bool,
182        _allowed_cascading: usize,
183        _excludes: &[IntCode],
184    ) -> VortexResult<ArrayRef> {
185        Ok(stats.source().to_array())
186    }
187}
188
189impl Scheme for ConstantScheme {
190    type StatsType = IntegerStats;
191    type CodeType = IntCode;
192
193    fn code(&self) -> IntCode {
194        CONSTANT_SCHEME
195    }
196
197    fn is_constant(&self) -> bool {
198        true
199    }
200
201    fn expected_compression_ratio(
202        &self,
203        stats: &IntegerStats,
204        is_sample: bool,
205        _allowed_cascading: usize,
206        _excludes: &[IntCode],
207    ) -> VortexResult<f64> {
208        // Never yield ConstantScheme for a sample, it could be a false-positive.
209        if is_sample {
210            return Ok(0.0);
211        }
212
213        // Only arrays with one distinct values can be constant compressed.
214        if stats.distinct_values_count > 1 {
215            return Ok(0.0);
216        }
217
218        Ok(stats.value_count as f64)
219    }
220
221    fn compress(
222        &self,
223        stats: &IntegerStats,
224        _is_sample: bool,
225        _allowed_cascading: usize,
226        _excludes: &[IntCode],
227    ) -> VortexResult<ArrayRef> {
228        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));
229
230        match scalar_idx {
231            Some(idx) => {
232                let scalar = stats.source().scalar_at(idx);
233                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
234                if !stats.source().all_valid() {
235                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
236                } else {
237                    Ok(const_arr)
238                }
239            }
240            None => Ok(ConstantArray::new(
241                Scalar::null(stats.src.dtype().clone()),
242                stats.src.len(),
243            )
244            .into_array()),
245        }
246    }
247}
248
249impl Scheme for FORScheme {
250    type StatsType = IntegerStats;
251    type CodeType = IntCode;
252
253    fn code(&self) -> IntCode {
254        FOR_SCHEME
255    }
256
257    fn expected_compression_ratio(
258        &self,
259        stats: &IntegerStats,
260        _is_sample: bool,
261        allowed_cascading: usize,
262        _excludes: &[IntCode],
263    ) -> VortexResult<f64> {
264        // Only apply if we are not at the leaf
265        if allowed_cascading == 0 {
266            return Ok(0.0);
267        }
268
269        // All-null cannot be FOR compressed.
270        if stats.value_count == 0 {
271            return Ok(0.0);
272        }
273
274        // Only apply when the min is not already zero.
275        if stats.typed.min_is_zero() {
276            return Ok(0.0);
277        }
278
279        // Difference between max and min
280        let full_width: u32 = stats
281            .src
282            .ptype()
283            .bit_width()
284            .try_into()
285            .vortex_expect("bit width must fit in u32");
286        let bw = match stats.typed.max_minus_min().checked_ilog2() {
287            Some(l) => l + 1,
288            // If max-min == 0, it we should use a different compression scheme
289            // as we don't want to bitpack down to 0 bits.
290            None => return Ok(0.0),
291        };
292
293        // If we're not saving at least 1 byte, don't bother with FOR
294        if full_width - bw < 8 {
295            return Ok(0.0);
296        }
297
298        Ok(full_width as f64 / bw as f64)
299    }
300
301    fn compress(
302        &self,
303        stats: &IntegerStats,
304        is_sample: bool,
305        _allowed_cascading: usize,
306        excludes: &[IntCode],
307    ) -> VortexResult<ArrayRef> {
308        let for_array = FoRArray::encode(stats.src.clone())?;
309        let biased = for_array.encoded().to_primitive();
310        let biased_stats = IntegerStats::generate_opts(
311            &biased,
312            GenerateStatsOptions {
313                count_distinct_values: false,
314            },
315        );
316
317        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
318        // of bitpacking.
319        // NOTE: we could delegate in the future if we had another downstream codec that performs
320        //  as well.
321        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
322
323        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
324    }
325}
326
327impl Scheme for ZigZagScheme {
328    type StatsType = IntegerStats;
329    type CodeType = IntCode;
330
331    fn code(&self) -> IntCode {
332        ZIGZAG_SCHEME
333    }
334
335    fn expected_compression_ratio(
336        &self,
337        stats: &IntegerStats,
338        is_sample: bool,
339        allowed_cascading: usize,
340        excludes: &[IntCode],
341    ) -> VortexResult<f64> {
342        // ZigZag is only useful when we cascade it with another encoding
343        if allowed_cascading == 0 {
344            return Ok(0.0);
345        }
346
347        // Don't try and compress all-null arrays
348        if stats.value_count == 0 {
349            return Ok(0.0);
350        }
351
352        // ZigZag is only useful when there are negative values.
353        if !stats.typed.min_is_negative() {
354            return Ok(0.0);
355        }
356
357        // Run compression on a sample to see how it performs.
358        estimate_compression_ratio_with_sampling(
359            self,
360            stats,
361            is_sample,
362            allowed_cascading,
363            excludes,
364        )
365    }
366
367    fn compress(
368        &self,
369        stats: &IntegerStats,
370        is_sample: bool,
371        allowed_cascading: usize,
372        excludes: &[IntCode],
373    ) -> VortexResult<ArrayRef> {
374        // Zigzag encode the values, then recursively compress the inner values.
375        let zag = zigzag_encode(stats.src.clone())?;
376        let encoded = zag.encoded().to_primitive();
377
378        // ZigZag should be after Dict, RunEnd or Sparse.
379        // We should only do these "container" style compressors once.
380        let mut new_excludes = vec![
381            ZigZagScheme.code(),
382            DictScheme.code(),
383            RunEndScheme.code(),
384            SparseScheme.code(),
385        ];
386        new_excludes.extend_from_slice(excludes);
387
388        let compressed =
389            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
390
391        tracing::debug!("zigzag output: {}", compressed.display_tree());
392
393        Ok(ZigZagArray::try_new(compressed)?.into_array())
394    }
395}
396
397impl Scheme for BitPackingScheme {
398    type StatsType = IntegerStats;
399    type CodeType = IntCode;
400
401    fn code(&self) -> IntCode {
402        BITPACKING_SCHEME
403    }
404
405    fn expected_compression_ratio(
406        &self,
407        stats: &IntegerStats,
408        is_sample: bool,
409        allowed_cascading: usize,
410        excludes: &[IntCode],
411    ) -> VortexResult<f64> {
412        // BitPacking only works for non-negative values
413        if stats.typed.min_is_negative() {
414            return Ok(0.0);
415        }
416
417        // Don't compress all-null arrays
418        if stats.value_count == 0 {
419            return Ok(0.0);
420        }
421
422        estimate_compression_ratio_with_sampling(
423            self,
424            stats,
425            is_sample,
426            allowed_cascading,
427            excludes,
428        )
429    }
430
431    fn compress(
432        &self,
433        stats: &IntegerStats,
434        _is_sample: bool,
435        _allowed_cascading: usize,
436        _excludes: &[IntCode],
437    ) -> VortexResult<ArrayRef> {
438        let histogram = bit_width_histogram(stats.source())?;
439        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
440        // If best bw is determined to be the current bit-width, return the original array.
441        if bw as usize == stats.source().ptype().bit_width() {
442            return Ok(stats.source().clone().into_array());
443        }
444        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
445
446        let patches = packed.patches().map(compress_patches).transpose()?;
447        packed.replace_patches(patches);
448
449        Ok(packed.into_array())
450    }
451}
452
453impl Scheme for SparseScheme {
454    type StatsType = IntegerStats;
455    type CodeType = IntCode;
456
457    fn code(&self) -> IntCode {
458        SPARSE_SCHEME
459    }
460
461    // We can avoid asserting the encoding tree instead.
462    fn expected_compression_ratio(
463        &self,
464        stats: &IntegerStats,
465        _is_sample: bool,
466        allowed_cascading: usize,
467        _excludes: &[IntCode],
468    ) -> VortexResult<f64> {
469        // Only use `SparseScheme` if we can cascade.
470        if allowed_cascading == 0 {
471            return Ok(0.0);
472        }
473
474        if stats.value_count == 0 {
475            // All nulls should use ConstantScheme
476            return Ok(0.0);
477        }
478
479        // If the majority is null, will compress well.
480        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
481            return Ok(stats.src.len() as f64 / stats.value_count as f64);
482        }
483
484        // See if the top value accounts for >= 90% of the set values.
485        let (_, top_count) = stats.typed.top_value_and_count();
486
487        if top_count == stats.value_count {
488            // top_value is the only value, should use ConstantScheme instead
489            return Ok(0.0);
490        }
491
492        let freq = top_count as f64 / stats.value_count as f64;
493        if freq >= 0.9 {
494            // We only store the positions of the non-top values.
495            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
496        }
497
498        Ok(0.0)
499    }
500
501    fn compress(
502        &self,
503        stats: &IntegerStats,
504        is_sample: bool,
505        allowed_cascading: usize,
506        excludes: &[IntCode],
507    ) -> VortexResult<ArrayRef> {
508        assert!(allowed_cascading > 0);
509        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
510        if top_count as usize == stats.src.len() {
511            // top_value is the only value, use ConstantScheme
512            return Ok(ConstantArray::new(
513                Scalar::primitive_value(
514                    top_pvalue,
515                    top_pvalue.ptype(),
516                    stats.src.dtype().nullability(),
517                ),
518                stats.src.len(),
519            )
520            .into_array());
521        }
522
523        let sparse_encoded = SparseArray::encode(
524            stats.src.as_ref(),
525            Some(Scalar::primitive_value(
526                top_pvalue,
527                top_pvalue.ptype(),
528                stats.src.dtype().nullability(),
529            )),
530        )?;
531
532        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
533            // Compress the values
534            let mut new_excludes = vec![SparseScheme.code()];
535            new_excludes.extend_from_slice(excludes);
536
537            let compressed_values = IntCompressor::compress_no_dict(
538                &sparse.patches().values().to_primitive(),
539                is_sample,
540                allowed_cascading - 1,
541                &new_excludes,
542            )?;
543
544            let indices = sparse.patches().indices().to_primitive().narrow()?;
545
546            let compressed_indices = IntCompressor::compress_no_dict(
547                &indices,
548                is_sample,
549                allowed_cascading - 1,
550                &new_excludes,
551            )?;
552
553            SparseArray::try_new(
554                compressed_indices,
555                compressed_values,
556                sparse.len(),
557                sparse.fill_scalar().clone(),
558            )
559            .map(|a| a.into_array())
560        } else {
561            Ok(sparse_encoded)
562        }
563    }
564}
565
566impl Scheme for DictScheme {
567    type StatsType = IntegerStats;
568    type CodeType = IntCode;
569
570    fn code(&self) -> IntCode {
571        DICT_SCHEME
572    }
573
574    fn expected_compression_ratio(
575        &self,
576        stats: &IntegerStats,
577        _is_sample: bool,
578        allowed_cascading: usize,
579        _excludes: &[IntCode],
580    ) -> VortexResult<f64> {
581        // Dict should not be terminal.
582        if allowed_cascading == 0 {
583            return Ok(0.0);
584        }
585
586        if stats.value_count == 0 {
587            return Ok(0.0);
588        }
589
590        // If > 50% of the values are distinct, skip dict.
591        if stats.distinct_values_count > stats.value_count / 2 {
592            return Ok(0.0);
593        }
594
595        // Ignore nulls encoding for the estimate. We only focus on values.
596        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
597
598        // Assume codes are compressed RLE + BitPacking.
599        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
600
601        let n_runs = (stats.value_count / stats.average_run_length) as usize;
602
603        // Assume that codes will either be BitPack or RLE-BitPack
604        let codes_size_bp = (codes_bw * stats.value_count) as usize;
605        let codes_size_rle_bp = usize::checked_mul((codes_bw + 32) as usize, n_runs);
606
607        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp.unwrap_or(usize::MAX));
608
609        let before = stats.value_count as usize * stats.source().ptype().bit_width();
610
611        Ok(before as f64 / (values_size + codes_size) as f64)
612    }
613
614    fn compress(
615        &self,
616        stats: &IntegerStats,
617        is_sample: bool,
618        allowed_cascading: usize,
619        excludes: &[IntCode],
620    ) -> VortexResult<ArrayRef> {
621        assert!(allowed_cascading > 0);
622
623        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
624        //    RLE or FOR + BP. Cascading probably wastes some time here.
625
626        let dict = dictionary_encode(stats);
627
628        // Cascade the codes child
629        // Don't allow SequenceArray as the codes child as it merely adds extra indirection without actually compressing data.
630        let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME];
631        new_excludes.extend_from_slice(excludes);
632
633        let compressed_codes = IntCompressor::compress_no_dict(
634            &dict.codes().to_primitive().narrow()?,
635            is_sample,
636            allowed_cascading - 1,
637            &new_excludes,
638        )?;
639
640        // SAFETY: compressing codes does not change their values
641        unsafe {
642            Ok(
643                DictArray::new_unchecked(compressed_codes, dict.values().clone())
644                    .set_all_values_referenced(dict.has_all_values_referenced())
645                    .into_array(),
646            )
647        }
648    }
649}
650
651impl Scheme for RunEndScheme {
652    type StatsType = IntegerStats;
653    type CodeType = IntCode;
654
655    fn code(&self) -> IntCode {
656        RUN_END_SCHEME
657    }
658
659    fn expected_compression_ratio(
660        &self,
661        stats: &IntegerStats,
662        is_sample: bool,
663        allowed_cascading: usize,
664        excludes: &[IntCode],
665    ) -> VortexResult<f64> {
666        // If the run length is below the threshold, drop it.
667        if stats.average_run_length < RUN_END_THRESHOLD {
668            return Ok(0.0);
669        }
670
671        if allowed_cascading == 0 {
672            return Ok(0.0);
673        }
674
675        // Run compression on a sample, see how it performs.
676        estimate_compression_ratio_with_sampling(
677            self,
678            stats,
679            is_sample,
680            allowed_cascading,
681            excludes,
682        )
683    }
684
685    fn compress(
686        &self,
687        stats: &IntegerStats,
688        is_sample: bool,
689        allowed_cascading: usize,
690        excludes: &[IntCode],
691    ) -> VortexResult<ArrayRef> {
692        assert!(allowed_cascading > 0);
693
694        // run-end encode the ends
695        let (ends, values) = runend_encode(&stats.src);
696
697        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
698        new_excludes.extend_from_slice(excludes);
699
700        let ends_stats = IntegerStats::generate_opts(
701            &ends.to_primitive(),
702            GenerateStatsOptions {
703                count_distinct_values: false,
704            },
705        );
706        let ends_scheme = IntCompressor::choose_scheme(
707            &ends_stats,
708            is_sample,
709            allowed_cascading - 1,
710            &new_excludes,
711        )?;
712        let compressed_ends =
713            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
714
715        let compressed_values = IntCompressor::compress_no_dict(
716            &values.to_primitive(),
717            is_sample,
718            allowed_cascading - 1,
719            &new_excludes,
720        )?;
721
722        // SAFETY: compression doesn't affect invariants
723        unsafe {
724            Ok(
725                RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
726                    .into_array(),
727            )
728        }
729    }
730}
731
732impl Scheme for SequenceScheme {
733    type StatsType = IntegerStats;
734    type CodeType = IntCode;
735
736    fn code(&self) -> Self::CodeType {
737        SEQUENCE_SCHEME
738    }
739
740    fn expected_compression_ratio(
741        &self,
742        stats: &Self::StatsType,
743        _is_sample: bool,
744        _allowed_cascading: usize,
745        _excludes: &[Self::CodeType],
746    ) -> VortexResult<f64> {
747        if stats.null_count > 0 {
748            return Ok(0.0);
749        }
750        // Since two values are required to store base and multiplier the
751        // compression ratio is divided by 2.
752        Ok(sequence_encode(&stats.src)?
753            .map(|_| stats.src.len() as f64 / 2.0)
754            .unwrap_or(0.0))
755    }
756
757    fn compress(
758        &self,
759        stats: &Self::StatsType,
760        _is_sample: bool,
761        _allowed_cascading: usize,
762        _excludes: &[Self::CodeType],
763    ) -> VortexResult<ArrayRef> {
764        if stats.null_count > 0 {
765            vortex_bail!("sequence encoding does not support nulls");
766        }
767        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
768    }
769}
770
771#[cfg(test)]
772mod tests {
773    use std::iter;
774
775    use itertools::Itertools;
776    use rand::RngCore;
777    use rand::SeedableRng;
778    use rand::rngs::StdRng;
779    use vortex_array::Array;
780    use vortex_array::IntoArray;
781    use vortex_array::ToCanonical;
782    use vortex_array::arrays::DictVTable;
783    use vortex_array::arrays::PrimitiveArray;
784    use vortex_array::assert_arrays_eq;
785    use vortex_array::validity::Validity;
786    use vortex_array::vtable::ValidityHelper;
787    use vortex_buffer::Buffer;
788    use vortex_buffer::BufferMut;
789    use vortex_buffer::buffer;
790    use vortex_error::VortexResult;
791    use vortex_sequence::SequenceVTable;
792    use vortex_sparse::SparseVTable;
793
794    use crate::Compressor;
795    use crate::CompressorStats;
796    use crate::FloatCompressor;
797    use crate::Scheme;
798    use crate::integer::IntCompressor;
799    use crate::integer::IntegerStats;
800    use crate::integer::RLE_INTEGER_SCHEME;
801    use crate::integer::SequenceScheme;
802    use crate::integer::SparseScheme;
803
804    #[test]
805    fn test_empty() {
806        // Make sure empty array compression does not fail
807        let result = IntCompressor::compress(
808            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
809            false,
810            3,
811            &[],
812        )
813        .unwrap();
814
815        assert!(result.is_empty());
816    }
817
818    #[test]
819    fn test_dict_encodable() {
820        let mut codes = BufferMut::<i32>::with_capacity(65_535);
821        // Write some runs of length 3 of a handful of different values. Interrupted by some
822        // one-off values.
823
824        let numbers = [0, 10, 50, 100, 1000, 3000]
825            .into_iter()
826            .map(|i| 1234 * i)
827            .collect_vec();
828
829        let mut rng = StdRng::seed_from_u64(1u64);
830        while codes.len() < 64000 {
831            let run_length = rng.next_u32() % 5;
832            let value = numbers[rng.next_u32() as usize % numbers.len()];
833            for _ in 0..run_length {
834                codes.push(value);
835            }
836        }
837
838        let primitive = codes.freeze().into_array().to_primitive();
839        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
840        assert!(compressed.is::<DictVTable>());
841    }
842
843    #[test]
844    fn sparse_with_nulls() {
845        let array = PrimitiveArray::new(
846            buffer![189u8, 189, 189, 0, 46],
847            Validity::from_iter(vec![true, true, true, true, false]),
848        );
849        let compressed = SparseScheme
850            .compress(&IntegerStats::generate(&array), false, 3, &[])
851            .unwrap();
852        assert!(compressed.is::<SparseVTable>());
853        let decoded = compressed.clone();
854        let expected =
855            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
856                .into_array();
857        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
858    }
859
860    #[test]
861    fn sparse_mostly_nulls() {
862        let array = PrimitiveArray::new(
863            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
864            Validity::from_iter(vec![
865                false, false, false, false, false, false, false, false, false, false, true,
866            ]),
867        );
868        let compressed = SparseScheme
869            .compress(&IntegerStats::generate(&array), false, 3, &[])
870            .unwrap();
871        assert!(compressed.is::<SparseVTable>());
872        let decoded = compressed.clone();
873        let expected = PrimitiveArray::new(
874            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
875            array.validity().clone(),
876        )
877        .into_array();
878        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
879    }
880
881    #[test]
882    fn nullable_sequence() {
883        let values = (0i32..20).step_by(7).collect_vec();
884        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
885        let compressed = SequenceScheme
886            .compress(&IntegerStats::generate(&array), false, 3, &[])
887            .unwrap();
888        assert!(compressed.is::<SequenceVTable>());
889        let decoded = compressed;
890        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
891        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
892    }
893
894    #[test]
895    fn test_rle_compression() {
896        let mut values = Vec::new();
897        values.extend(iter::repeat_n(42i32, 100));
898        values.extend(iter::repeat_n(123i32, 200));
899        values.extend(iter::repeat_n(987i32, 150));
900
901        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
902        let compressed = RLE_INTEGER_SCHEME
903            .compress(&IntegerStats::generate(&array), false, 3, &[])
904            .unwrap();
905
906        let decoded = compressed;
907        let expected = Buffer::copy_from(&values).into_array();
908        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
909    }
910
911    #[test_with::env(CI)]
912    fn compress_large_int() -> VortexResult<()> {
913        const NUM_LISTS: usize = 10_000;
914        const ELEMENTS_PER_LIST: usize = 5_000;
915
916        let prim = (0..NUM_LISTS)
917            .flat_map(|list_idx| {
918                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
919            })
920            .collect::<PrimitiveArray>();
921
922        drop(FloatCompressor::compress(&prim, false, 3, &[])?);
923
924        Ok(())
925    }
926}