vortex_btrblocks/integer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

pub mod dictionary;
mod stats;

use std::fmt::Debug;
use std::hash::Hash;

pub use stats::IntegerStats;
use vortex_array::arrays::{
    ConstantArray, DictArray, MaskedArray, PrimitiveArray, PrimitiveVTable,
};
use vortex_array::vtable::ValidityHelper;
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
use vortex_error::{VortexResult, VortexUnwrap, vortex_bail, vortex_err};
use vortex_fastlanes::FoRArray;
use vortex_fastlanes::bitpack_compress::{
    bit_width_histogram, bitpack_encode, find_best_bit_width,
};
use vortex_runend::RunEndArray;
use vortex_runend::compress::runend_encode;
use vortex_scalar::Scalar;
use vortex_sequence::sequence_encode;
use vortex_sparse::{SparseArray, SparseVTable};
use vortex_zigzag::{ZigZagArray, zigzag_encode};

use crate::integer::dictionary::dictionary_encode;
use crate::patches::compress_patches;
use crate::rle::RLEScheme;
use crate::{
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
    estimate_compression_ratio_with_sampling,
};

/// [`Compressor`] for signed and unsigned integers.
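///
/// A minimal usage sketch (hypothetical input; the call shape mirrors the tests at the
/// bottom of this file):
///
/// ```ignore
/// use vortex_array::arrays::PrimitiveArray;
/// use vortex_array::validity::Validity;
/// use vortex_buffer::buffer;
///
/// let ints = PrimitiveArray::new(buffer![1i32, 2, 3], Validity::NonNullable);
/// let compressed = IntCompressor::compress(&ints, /* is_sample */ false, /* allowed_cascading */ 3, &[]).unwrap();
/// ```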
pub struct IntCompressor;

impl Compressor for IntCompressor {
    type ArrayVTable = PrimitiveVTable;
    type SchemeType = dyn IntegerScheme;
    type StatsType = IntegerStats;

    fn schemes() -> &'static [&'static dyn IntegerScheme] {
        &[
            &ConstantScheme,
            &FORScheme,
            &ZigZagScheme,
            &BitPackingScheme,
            &SparseScheme,
            &DictScheme,
            &RunEndScheme,
            &SequenceScheme,
            &RLE_INTEGER_SCHEME,
        ]
    }

    fn default_scheme() -> &'static Self::SchemeType {
        &UncompressedScheme
    }

    fn dict_scheme_code() -> IntCode {
        DICT_SCHEME
    }
}

impl IntCompressor {
    pub(crate) fn compress_no_dict(
        array: &PrimitiveArray,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let stats = IntegerStats::generate_opts(
            array,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;

        if output.nbytes() < array.nbytes() {
            Ok(output)
        } else {
            log::debug!("resulting tree too large: {}", output.display_tree());
            Ok(array.to_array())
        }
    }
}

pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}

// Auto-impl
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);

const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUN_END_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
const RUN_LENGTH_SCHEME: IntCode = IntCode(9);

#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;

#[derive(Debug, Copy, Clone)]
pub struct SequenceScheme;

/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;
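// With an average run length of at least RUN_END_THRESHOLD, each (end, value) pair in a
// run-end array replaces roughly four or more values, so run-end encoding has room to pay off.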

pub const RLE_INTEGER_SCHEME: RLEScheme<IntegerStats, IntCode> = RLEScheme::new(
    RUN_LENGTH_SCHEME,
    |values, is_sample, allowed_cascading, excludes| {
        IntCompressor::compress_no_dict(values, is_sample, allowed_cascading, excludes)
    },
);

impl Scheme for UncompressedScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        UNCOMPRESSED_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        _stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // no compression
        Ok(1.0)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        Ok(stats.source().to_array())
    }
}

impl Scheme for ConstantScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        CONSTANT_SCHEME
    }

    fn is_constant(&self) -> bool {
        true
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Never yield ConstantScheme for a sample, as it could be a false positive.
        if is_sample {
            return Ok(0.0);
        }

        // Only arrays with one distinct value can be constant compressed.
        if stats.distinct_values_count > 1 {
            return Ok(0.0);
        }

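        // e.g. (illustrative) 1_000 non-null identical values collapse to a single constant
        // scalar, so the estimated ratio is simply the value count.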
        Ok(stats.value_count as f64)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx));

        match scalar_idx {
            Some(idx) => {
                let scalar = stats.source().scalar_at(idx);
                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
                if !stats.source().all_valid() {
                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
                } else {
                    Ok(const_arr)
                }
            }
            None => Ok(ConstantArray::new(
                Scalar::null(stats.src.dtype().clone()),
                stats.src.len(),
            )
            .into_array()),
        }
    }
}

impl Scheme for FORScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        FOR_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only apply if we are not at the leaf
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        // All-null arrays cannot be FOR compressed.
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // Only apply when the min is not already zero.
        if stats.typed.min_is_zero() {
            return Ok(0.0);
        }

        // Difference between max and min
        let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // If max - min == 0, we should use a different compression scheme,
            // as we don't want to bitpack down to 0 bits.
            None => return Ok(0.0),
        };

        // If we're not saving at least 1 byte, don't bother with FOR
        if full_width - bw < 8 {
            return Ok(0.0);
        }

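        // e.g. (illustrative) i32 values in [1_000, 1_015]: max - min = 15, so bw = 4,
        // full_width = 32, and the estimated ratio is 32 / 4 = 8.0.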
        Ok(full_width as f64 / bw as f64)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        _allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let for_array = FoRArray::encode(stats.src.clone())?;
        let biased = for_array.encoded().to_primitive();
        let biased_stats = IntegerStats::generate_opts(
            &biased,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        // Immediately bitpack. If any other scheme were preferable, it would be chosen instead
        // of bitpacking.
        // NOTE: we could delegate in the future if we had another downstream codec that performs
        //  as well.
        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;

        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
    }
}

impl Scheme for ZigZagScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        ZIGZAG_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // ZigZag is only useful when we cascade it with another encoding
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Don't try to compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // ZigZag is only useful when there are negative values.
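        // ZigZag maps small-magnitude signed values to small unsigned values
        // (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...), which downstream bitpacking can exploit.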
        if !stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Run compression on a sample to see how it performs.
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Zigzag encode the values, then recursively compress the inner values.
        let zag = zigzag_encode(stats.src.clone())?;
        let encoded = zag.encoded().to_primitive();

        // ZigZag should come after Dict, RunEnd, or Sparse.
        // We should only apply these "container" style compressors once.
        let mut new_excludes = vec![
            ZigZagScheme.code(),
            DictScheme.code(),
            RunEndScheme.code(),
            SparseScheme.code(),
        ];
        new_excludes.extend_from_slice(excludes);

        let compressed =
            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;

        log::debug!("zigzag output: {}", compressed.display_tree());

        Ok(ZigZagArray::try_new(compressed)?.into_array())
    }
}

impl Scheme for BitPackingScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        BITPACKING_SCHEME
    }

    #[allow(clippy::cast_possible_truncation)]
    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // BitPacking only works for non-negative values
        if stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        // Don't compress all-null arrays
        if stats.value_count == 0 {
            return Ok(0.0);
        }

        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    #[allow(clippy::cast_possible_truncation)]
    fn compress(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let histogram = bit_width_histogram(stats.source())?;
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
        // If the best bit width equals the current bit width, return the original array.
        if bw as usize == stats.source().ptype().bit_width() {
            return Ok(stats.source().clone().into_array());
        }
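        // e.g. (illustrative) u32 values that are mostly < 256 with a few large outliers:
        // the histogram-driven choice is typically 8 bits, and the outliers become patches.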
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;

        let patches = packed.patches().map(compress_patches).transpose()?;
        packed.replace_patches(patches);

        Ok(packed.into_array())
    }
}

impl Scheme for SparseScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        SPARSE_SCHEME
    }

    // We can avoid asserting the encoding tree instead.
    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Only use `SparseScheme` if we can cascade.
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            // All nulls should use ConstantScheme
            return Ok(0.0);
        }

        // If the majority is null, this will compress well.
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
        }

        // See if the top value accounts for >= 90% of the set values.
        let (_, top_count) = stats.typed.top_value_and_count();

        if top_count == stats.value_count {
            // top_value is the only value; should use ConstantScheme instead
            return Ok(0.0);
        }

        let freq = top_count as f64 / stats.value_count as f64;
        if freq >= 0.9 {
            // We only store the positions of the non-top values.
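            // e.g. (illustrative) 1_000 set values of which 950 equal the top value:
            // the estimate is 1_000 / 50 = 20.0.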
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
        }

        Ok(0.0)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(allowed_cascading > 0);
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
        if top_count as usize == stats.src.len() {
            // top_value is the only value, use ConstantScheme
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    top_pvalue,
                    top_pvalue.ptype(),
                    stats.src.dtype().nullability(),
                ),
                stats.src.len(),
            )
            .into_array());
        }

        let sparse_encoded = SparseArray::encode(
            stats.src.as_ref(),
            Some(Scalar::primitive_value(
                top_pvalue,
                top_pvalue.ptype(),
                stats.src.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
            // Compress the values
            let mut new_excludes = vec![SparseScheme.code()];
            new_excludes.extend_from_slice(excludes);

            let compressed_values = IntCompressor::compress_no_dict(
                &sparse.patches().values().to_primitive(),
                is_sample,
                allowed_cascading - 1,
                &new_excludes,
            )?;

            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices = IntCompressor::compress_no_dict(
                &indices,
                is_sample,
                allowed_cascading - 1,
                &new_excludes,
            )?;

            SparseArray::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            Ok(sparse_encoded)
        }
    }
}

impl Scheme for DictScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        DICT_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        _is_sample: bool,
        allowed_cascading: usize,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Dict should not be terminal.
        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // If > 50% of the values are distinct, skip dict.
        if stats.distinct_values_count > stats.value_count / 2 {
            return Ok(0.0);
        }

        // Ignore null encoding for the estimate; we only focus on the values.
        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;

        // Assume codes are compressed with RLE + BitPacking.
        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();

        let n_runs = stats.value_count / stats.average_run_length;

        // Assume that codes will either be BitPacked or RLE + BitPacked.
        let codes_size_bp = (codes_bw * stats.value_count) as usize;
        let codes_size_rle_bp = (codes_bw + 32) * n_runs;

        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);

        let before = stats.value_count as usize * stats.source().ptype().bit_width();

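        // e.g. (illustrative) 65_536 u64 values drawn from 16 distinct values: the dictionary
        // costs 16 * 64 bits while each code needs only ~5 bits (or fewer runs under RLE),
        // so the estimate lands well above 1.0.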
        Ok(before as f64 / (values_size + codes_size) as f64)
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(allowed_cascading > 0);

        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
        //    RLE or FOR + BP. Cascading probably wastes some time here.

        let dict = dictionary_encode(stats);

        // Cascade the codes child.
        // Don't allow SequenceArray as the codes child, as it merely adds extra indirection
        // without actually compressing data.
        let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME];
        new_excludes.extend_from_slice(excludes);

        let compressed_codes = IntCompressor::compress_no_dict(
            &dict.codes().to_primitive().narrow()?,
            is_sample,
            allowed_cascading - 1,
            &new_excludes,
        )?;

        // SAFETY: compressing codes does not change their values
        unsafe {
            Ok(DictArray::new_unchecked(compressed_codes, dict.values().clone()).into_array())
        }
    }
}

impl Scheme for RunEndScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        RUN_END_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // If the average run length is below the threshold, skip run-end encoding.
        if stats.average_run_length < RUN_END_THRESHOLD {
            return Ok(0.0);
        }

        if allowed_cascading == 0 {
            return Ok(0.0);
        }

        // Run compression on a sample to see how it performs.
        estimate_compression_ratio_with_sampling(
            self,
            stats,
            is_sample,
            allowed_cascading,
            excludes,
        )
    }

    fn compress(
        &self,
        stats: &IntegerStats,
        is_sample: bool,
        allowed_cascading: usize,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(allowed_cascading > 0);

        // Run-end encode the array into run ends and run values.
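        // e.g. (illustrative) [7, 7, 7, 2, 2] becomes ends [3, 5] and values [7, 2].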
        let (ends, values) = runend_encode(&stats.src);

        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
        new_excludes.extend_from_slice(excludes);

        let ends_stats = IntegerStats::generate_opts(
            &ends.to_primitive(),
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );
        let ends_scheme = IntCompressor::choose_scheme(
            &ends_stats,
            is_sample,
            allowed_cascading - 1,
            &new_excludes,
        )?;
        let compressed_ends =
            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;

        let compressed_values = IntCompressor::compress_no_dict(
            &values.to_primitive(),
            is_sample,
            allowed_cascading - 1,
            &new_excludes,
        )?;

        // SAFETY: compression doesn't affect invariants
        unsafe {
            Ok(
                RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
                    .into_array(),
            )
        }
    }
}

impl Scheme for SequenceScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> Self::CodeType {
        SEQUENCE_SCHEME
    }

    fn expected_compression_ratio(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        if stats.null_count > 0 {
            return Ok(0.0);
        }
        // Since two values are required to store the base and multiplier, the
        // compression ratio is divided by 2.
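        // e.g. (illustrative) [2, 5, 8, 11] is base 2 with step 3, so 4 values compress
        // into 2 scalars for an estimated ratio of 4 / 2 = 2.0.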
        Ok(sequence_encode(&stats.src)?
            .map(|_| stats.src.len() as f64 / 2.0)
            .unwrap_or(0.0))
    }

    fn compress(
        &self,
        stats: &Self::StatsType,
        _is_sample: bool,
        _allowed_cascading: usize,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        if stats.null_count > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}

#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::arrays::{DictEncoding, PrimitiveArray};
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical, assert_arrays_eq};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_sequence::SequenceEncoding;
    use vortex_sparse::SparseEncoding;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::integer::{
        IntCompressor, IntegerStats, RLE_INTEGER_SCHEME, SequenceScheme, SparseScheme,
    };
    use crate::{Compressor, CompressorStats, Scheme};

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = IntCompressor::compress(
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write short runs of a handful of different values, interrupted by some
        // one-off values.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), DictEncoding.id());
    }

    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        // A test that's meant to mirror the WindowName column from ClickBench.
        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let random = (rng.next_u32() as usize) % 1_000_000;
            if visited.contains(&random) {
                continue;
            }
            visited.insert(random);
            // Insert one of 100 possible values (multiples of 5).
            values[random] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.display_tree());
    }

    #[test]
    fn sparse_with_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        let decoded = compressed.clone();
        let expected =
            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
                .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }

    #[test]
    fn sparse_mostly_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        let decoded = compressed.clone();
        let expected = PrimitiveArray::new(
            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
            array.validity().clone(),
        )
        .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }

    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
        let compressed = SequenceScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SequenceEncoding.id());
        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }

    #[test]
    fn test_rle_compression() {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressed = RLE_INTEGER_SCHEME
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();

        let decoded = compressed;
        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
    }
}
908}