vortex_btrblocks/integer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

pub mod dictionary;
mod stats;

use std::fmt::Debug;
use std::hash::Hash;

pub use stats::IntegerStats;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::DictArray;
use vortex_array::arrays::MaskedArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::vtable::ValidityHelper;
use vortex_error::VortexResult;
use vortex_error::VortexUnwrap;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_fastlanes::FoRArray;
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
use vortex_fastlanes::bitpack_compress::bitpack_encode;
use vortex_fastlanes::bitpack_compress::find_best_bit_width;
use vortex_runend::RunEndArray;
use vortex_runend::compress::runend_encode;
use vortex_scalar::Scalar;
use vortex_sequence::sequence_encode;
use vortex_sparse::SparseArray;
use vortex_sparse::SparseVTable;
use vortex_zigzag::ZigZagArray;
use vortex_zigzag::zigzag_encode;

use crate::Compressor;
use crate::CompressorStats;
use crate::GenerateStatsOptions;
use crate::Scheme;
use crate::estimate_compression_ratio_with_sampling;
use crate::integer::dictionary::dictionary_encode;
use crate::patches::compress_patches;
use crate::rle::RLEScheme;

/// [`Compressor`] for signed and unsigned integers.
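///
/// A minimal usage sketch, mirroring the tests at the bottom of this file (`compress` comes from
/// the [`Compressor`] trait; the cascading budget of 3 and the input values are arbitrary):
///
/// ```ignore
/// use vortex_array::arrays::PrimitiveArray;
/// use vortex_array::validity::Validity;
/// use vortex_buffer::buffer;
///
/// let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable);
/// // is_sample = false, allow up to 3 levels of cascading, exclude no schemes.
/// let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
/// ```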
pub struct IntCompressor;

impl Compressor for IntCompressor {
    type ArrayVTable = PrimitiveVTable;
    type SchemeType = dyn IntegerScheme;
    type StatsType = IntegerStats;

    fn schemes() -> &'static [&'static dyn IntegerScheme] {
        &[
            &ConstantScheme,
            &FORScheme,
            &ZigZagScheme,
            &BitPackingScheme,
            &SparseScheme,
            &DictScheme,
            &RunEndScheme,
            &SequenceScheme,
            &RLE_INTEGER_SCHEME,
        ]
    }

    fn default_scheme() -> &'static Self::SchemeType {
        &UncompressedScheme
    }

    fn dict_scheme_code() -> IntCode {
        DICT_SCHEME
    }
}

impl IntCompressor {
    pub(crate) fn compress_no_dict(
        array: &PrimitiveArray,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let stats = IntegerStats::generate_opts(
            array,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;

        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.display_tree());
            Ok(array.to_array())
        }
    }
}

pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}

// Auto-impl
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);

const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUN_END_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
const RUN_LENGTH_SCHEME: IntCode = IntCode(9);

#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;

#[derive(Debug, Copy, Clone)]
pub struct SequenceScheme;

/// Threshold for the average run length in an array before we consider run-end encoding.
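///
/// For example, `[7, 7, 7, 7, 9, 9, 9, 9]` has an average run length of 4 and meets the
/// threshold, while mostly-unique data with an average run length near 1 does not.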
const RUN_END_THRESHOLD: u32 = 4;

pub const RLE_INTEGER_SCHEME: RLEScheme<IntegerStats, IntCode> = RLEScheme::new(
    RUN_LENGTH_SCHEME,
    |values, is_sample, allowed_cascading, excludes| {
        IntCompressor::compress_no_dict(values, is_sample, allowed_cascading, excludes)
    },
);

impl Scheme for UncompressedScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        UNCOMPRESSED_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        _stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // no compression
        Ok(1.0)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        Ok(stats.source().to_array())
    }
}

impl Scheme for ConstantScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        CONSTANT_SCHEME
    }

    fn is_constant(&self) -> bool {
        true
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Never yield ConstantScheme for a sample; it could be a false positive.
        if is_sample {
            return Ok(0.0);
        }

        // Only arrays with exactly one distinct value can be constant compressed.
        if stats.distinct_values_count > 1 {
            return Ok(0.0);
        }

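        // A constant array materializes a single value for the whole column, so the estimated
        // ratio scales with the number of values it replaces.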
        Ok(stats.value_count as f64)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));

        match scalar_idx {
            Some(idx) => {
                let scalar = stats.source().scalar_at(idx);
                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
                if !stats.source().all_valid() {
                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
                } else {
                    Ok(const_arr)
                }
            }
            None => Ok(ConstantArray::new(
                Scalar::null(stats.src.dtype().clone()),
                stats.src.len(),
            )
            .into_array()),
        }
    }
}

impl Scheme for FORScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        FOR_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only apply if we are not at the leaf
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        // All-null cannot be FOR compressed.
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // Only apply when the min is not already zero.
        if stats.typed.min_is_zero() {
            return Ok(0.0);
        }

        // Difference between max and min
        let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, we should use a different compression scheme,
            // as we don't want to bitpack down to 0 bits.
            None => return Ok(0.0),
        };

        // If we're not saving at least 1 byte, don't bother with FOR
        if full_width - bw < 8 {
            return Ok(0.0);
        }

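        // Illustrative example: for u32 values in [1_000_000, 1_000_200], max - min = 200,
        // so bw = 8 and full_width = 32, giving an estimated ratio of 4.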
        Ok(full_width as f64 / bw as f64)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        _allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let for_array = FoRArray::encode(stats.src.clone())?;
        let biased = for_array.encoded().to_primitive();
        let biased_stats = IntegerStats::generate_opts(
            &biased,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;

        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
    }
}

impl Scheme for ZigZagScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        ZIGZAG_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // ZigZag is only useful when we cascade it with another encoding
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Don't try and compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // ZigZag is only useful when there are negative values.
        if !stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Run compression on a sample to see how it performs.
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Zigzag encode the values, then recursively compress the inner values.
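        // ZigZag maps signed to unsigned (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...), so values of
        // small magnitude become small unsigned values that bitpack well.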
        let zag = zigzag_encode(stats.src.clone())?;
        let encoded = zag.encoded().to_primitive();

        // ZigZag should be after Dict, RunEnd or Sparse.
        // We should only do these "container" style compressors once.
        let mut new_excludes = vec![
            ZigZagScheme.code(),
            DictScheme.code(),
            RunEndScheme.code(),
            SparseScheme.code(),
        ];
        new_excludes.extend_from_slice(excludes);

        let compressed =
            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;

        log::debug!("zigzag output: {}", compressed.display_tree());

        Ok(ZigZagArray::try_new(compressed)?.into_array())
    }
}

impl Scheme for BitPackingScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        BITPACKING_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // BitPacking only works for non-negative values
        if stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Don't compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let histogram = bit_width_histogram(stats.source())?;
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
        // If best bw is determined to be the current bit-width, return the original array.
        if bw as usize == stats.source().ptype().bit_width() {
            return Ok(stats.source().clone().into_array());
        }
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;

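        // Values that do not fit into `bw` bits are kept as exception patches; compress those too.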
        let patches = packed.patches().map(compress_patches).transpose()?;
        packed.replace_patches(patches);

        Ok(packed.into_array())
    }
}

impl Scheme for SparseScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        SPARSE_SCHEME
    }

    // We can avoid asserting the encoding tree instead.
    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only use `SparseScheme` if we can cascade.
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            // All nulls should use ConstantScheme
            return Ok(0.0);
        }

        // If the majority of values are null, this will compress well.
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
        }

        // See if the top value accounts for >= 90% of the non-null values.
        let (_, top_count) = stats.typed.top_value_and_count();

        if top_count == stats.value_count {
            // top_value is the only value, should use ConstantScheme instead
            return Ok(0.0);
        }

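        // Illustrative example: with 1_000 non-null values of which 950 equal the top value,
        // freq = 0.95 and the estimated ratio below is 1_000 / 50 = 20.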
        let freq = top_count as f64 / stats.value_count as f64;
        if freq >= 0.9 {
            // We only store the positions of the non-top values.
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
        }

        Ok(0.0)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(allowed_cascading > 0);
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
        if top_count as usize == stats.src.len() {
            // top_value is the only value, use ConstantScheme
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    top_pvalue,
                    top_pvalue.ptype(),
                    stats.src.dtype().nullability(),
                ),
                stats.src.len(),
            )
            .into_array());
        }

        let sparse_encoded = SparseArray::encode(
            stats.src.as_ref(),
            Some(Scalar::primitive_value(
                top_pvalue,
                top_pvalue.ptype(),
                stats.src.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
            // Compress the values
            let mut new_excludes = vec![SparseScheme.code()];
            new_excludes.extend_from_slice(excludes);

            let compressed_values = IntCompressor::compress_no_dict(
                &sparse.patches().values().to_primitive(),
                is_sample,
                allowed_cascading - 1,
                &new_excludes,
            )?;

            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices = IntCompressor::compress_no_dict(
                &indices,
                is_sample,
                allowed_cascading - 1,
                &new_excludes,
            )?;

            SparseArray::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            Ok(sparse_encoded)
        }
    }
}

impl Scheme for DictScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        DICT_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Dict should not be terminal.
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // If > 50% of the values are distinct, skip dict.
        if stats.distinct_values_count > stats.value_count / 2 {
            return Ok(0.0);
        }

        // Ignore null encoding for the estimate; we only focus on the values.
        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;

        // Bit width needed to represent the dictionary codes.
        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();

        let n_runs = stats.value_count / stats.average_run_length;

        // Assume the codes will be either bitpacked or run-end encoded + bitpacked.
        let codes_size_bp = (codes_bw * stats.value_count) as usize;
        let codes_size_rle_bp = (codes_bw + 32) * n_runs;

        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);

        let before = stats.value_count as usize * stats.source().ptype().bit_width();

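        // The estimate charges the dictionary values once (values_size) plus the cheaper of the
        // two code layouts, and compares that against storing every value at full width.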
        Ok(before as f64 / (values_size + codes_size) as f64)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(allowed_cascading > 0);

        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
        //    RLE or FOR + BP. Cascading probably wastes some time here.

        let dict = dictionary_encode(stats);

        // Cascade the codes child
        // Don't allow SequenceArray as the codes child as it merely adds extra indirection without actually compressing data.
        let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME];
        new_excludes.extend_from_slice(excludes);

        let compressed_codes = IntCompressor::compress_no_dict(
            &dict.codes().to_primitive().narrow()?,
            is_sample,
            allowed_cascading - 1,
            &new_excludes,
        )?;

        // SAFETY: compressing codes does not change their values
        unsafe {
            Ok(
                DictArray::new_unchecked(compressed_codes, dict.values().clone())
                    .set_all_values_referenced(dict.has_all_values_referenced())
                    .into_array(),
            )
        }
    }
}

impl Scheme for RunEndScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        RUN_END_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // If the average run length is below the threshold, skip run-end encoding.
        if stats.average_run_length < RUN_END_THRESHOLD {
            return Ok(0.0);
        }

        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Run compression on a sample, see how it performs.
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(allowed_cascading > 0);

        // Run-end encode the array into run ends and run values.
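        // e.g. [5, 5, 5, 9, 9] becomes ends [3, 5] and values [5, 9].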
        let (ends, values) = runend_encode(&stats.src);

        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
        new_excludes.extend_from_slice(excludes);

        let ends_stats = IntegerStats::generate_opts(
            &ends.to_primitive(),
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );
        let ends_scheme = IntCompressor::choose_scheme(
            &ends_stats,
            is_sample,
            allowed_cascading - 1,
            &new_excludes,
        )?;
        let compressed_ends =
            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;

        let compressed_values = IntCompressor::compress_no_dict(
            &values.to_primitive(),
            is_sample,
            allowed_cascading - 1,
            &new_excludes,
        )?;

        // SAFETY: compression doesn't affect invariants
        unsafe {
            Ok(
                RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
                    .into_array(),
            )
        }
    }
}

impl Scheme for SequenceScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> Self::CodeType {
        SEQUENCE_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        if stats.null_count > 0 {
            return Ok(0.0);
        }
        // A sequence is fully described by two values (base and multiplier), so the
        // compression ratio is the array length divided by 2.
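        // Illustrative example: an arithmetic sequence like [3, 10, 17, 24] collapses to a base
        // and a step, so four values are stored as two and the estimated ratio is 4 / 2 = 2.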
        Ok(sequence_encode(&stats.src)?
            .map(|_| stats.src.len() as f64 / 2.0)
            .unwrap_or(0.0))
    }

    fn compress(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        if stats.null_count > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}

#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use log::LevelFilter;
    use rand::RngCore;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_buffer::buffer_mut;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::Compressor;
    use crate::CompressorStats;
    use crate::Scheme;
    use crate::integer::IntCompressor;
    use crate::integer::IntegerStats;
    use crate::integer::RLE_INTEGER_SCHEME;
    use crate::integer::SequenceScheme;
    use crate::integer::SparseScheme;

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = IntCompressor::compress(
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write short runs (up to length 4) of a handful of different values, interrupted by
        // the occasional one-off value.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        assert!(compressed.is::<DictVTable>());
    }

    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        // A test that's meant to mirror the WindowName column from ClickBench.
        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let random = (rng.next_u32() as usize) % 1_000_000;
            if visited.contains(&random) {
                continue;
            }
            visited.insert(random);
            // Overwrite this position with one of 100 possible values (multiples of 5).
            values[random] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.display_tree());
    }

    #[test]
    fn sparse_with_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        let expected =
            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
                .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }

    #[test]
    fn sparse_mostly_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        let expected = PrimitiveArray::new(
            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
            array.validity().clone(),
        )
        .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }

    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
        let compressed = SequenceScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert!(compressed.is::<SequenceVTable>());
        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }

    #[test]
    fn test_rle_compression() {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressed = RLE_INTEGER_SCHEME
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();

        let decoded = compressed;
        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }
932}