1pub(crate) mod dictionary;
5pub(super) mod stats;
6
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use enum_iterator::Sequence;
11pub use stats::IntegerStats;
12use vortex_array::ArrayRef;
13use vortex_array::Canonical;
14use vortex_array::IntoArray;
15use vortex_array::ToCanonical;
16use vortex_array::arrays::ConstantArray;
17use vortex_array::arrays::DictArray;
18use vortex_array::arrays::MaskedArray;
19use vortex_array::arrays::PrimitiveArray;
20use vortex_array::arrays::PrimitiveVTable;
21use vortex_array::scalar::Scalar;
22use vortex_array::vtable::VTable;
23use vortex_array::vtable::ValidityHelper;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_err;
28use vortex_fastlanes::FoRArray;
29use vortex_fastlanes::bitpack_compress::bit_width_histogram;
30use vortex_fastlanes::bitpack_compress::bitpack_encode;
31use vortex_fastlanes::bitpack_compress::find_best_bit_width;
32use vortex_runend::RunEndArray;
33use vortex_runend::compress::runend_encode;
34use vortex_sequence::sequence_encode;
35use vortex_sparse::SparseArray;
36use vortex_sparse::SparseVTable;
37use vortex_zigzag::ZigZagArray;
38use vortex_zigzag::zigzag_encode;
39
40use self::dictionary::dictionary_encode;
41use crate::BtrBlocksCompressor;
42use crate::CanonicalCompressor;
43use crate::Compressor;
44use crate::CompressorContext;
45use crate::CompressorStats;
46use crate::Excludes;
47use crate::GenerateStatsOptions;
48use crate::Scheme;
49use crate::SchemeExt;
50use crate::compressor::patches::compress_patches;
51use crate::compressor::rle;
52use crate::compressor::rle::RLEScheme;
53
/// Every integer scheme defined in this module except [`UncompressedScheme`].
///
/// [`PcoScheme`] is only part of the list when the `pco` feature is enabled.
pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[
    &ConstantScheme,
    &FORScheme,
    &ZigZagScheme,
    &BitPackingScheme,
    &SparseScheme,
    &DictScheme,
    &RunEndScheme,
    &SequenceScheme,
    &RLE_INTEGER_SCHEME,
    #[cfg(feature = "pco")]
    &PcoScheme,
];
68
/// Integer [`Compressor`] that borrows a [`CanonicalCompressor`] to discover its schemes.
#[derive(Clone, Copy)]
pub struct IntCompressor<'a> {
    /// Compressor whose `int_schemes()` supplies the integer schemes used by this type.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
75
76impl<'a> Compressor for IntCompressor<'a> {
77 type ArrayVTable = PrimitiveVTable;
78 type SchemeType = dyn IntegerScheme;
79 type StatsType = IntegerStats;
80
81 fn schemes(&self) -> &[&'static dyn IntegerScheme] {
82 self.btr_blocks_compressor.int_schemes()
83 }
84
85 fn default_scheme(&self) -> &'static Self::SchemeType {
86 &UncompressedScheme
87 }
88
89 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
90 if self
91 .btr_blocks_compressor
92 .int_schemes()
93 .iter()
94 .any(|s| s.code() == IntCode::Dict)
95 {
96 IntegerStats::generate_opts(
97 array,
98 GenerateStatsOptions {
99 count_distinct_values: true,
100 },
101 )
102 } else {
103 IntegerStats::generate_opts(
104 array,
105 GenerateStatsOptions {
106 count_distinct_values: false,
107 },
108 )
109 }
110 }
111}
112
/// Marker trait for any [`Scheme`] whose stats are [`IntegerStats`] and whose code type is
/// [`IntCode`], and which is also `Send + Sync`.
pub trait IntegerScheme:
    Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}
117
/// Blanket implementation: every type meeting the supertrait bounds is an [`IntegerScheme`].
impl<T> IntegerScheme for T where
    T: Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}
123
124impl PartialEq for dyn IntegerScheme {
125 fn eq(&self, other: &Self) -> bool {
126 self.code() == other.code()
127 }
128}
129
/// Marks the code-based equality on `dyn IntegerScheme` as a full equivalence relation.
impl Eq for dyn IntegerScheme {}
131
132impl Hash for dyn IntegerScheme {
133 fn hash<H: Hasher>(&self, state: &mut H) {
134 self.code().hash(state)
135 }
136}
137
/// Identifier for each integer compression scheme in this module.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum IntCode {
    /// Code reported by [`UncompressedScheme`].
    Uncompressed,
    /// Code reported by [`ConstantScheme`].
    Constant,
    /// Code reported by [`FORScheme`].
    For,
    /// Code reported by [`ZigZagScheme`].
    ZigZag,
    /// Code reported by [`BitPackingScheme`].
    BitPacking,
    /// Code reported by [`SparseScheme`].
    Sparse,
    /// Code reported by [`DictScheme`].
    Dict,
    /// Code reported by [`RunEndScheme`].
    RunEnd,
    /// Code reported by [`SequenceScheme`].
    Sequence,
    /// Code used by [`IntRLEConfig`] (and therefore by [`RLE_INTEGER_SCHEME`]).
    Rle,
    /// Code reported by `PcoScheme`, which only exists with the `pco` feature.
    Pco,
}
164
/// Scheme that returns the source array unchanged.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]

pub struct UncompressedScheme;
168
/// Scheme that emits a `ConstantArray` (masked when the source has nulls).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]

pub struct ConstantScheme;
172
/// Scheme that encodes with `FoRArray` and bit-packs the encoded child.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]

pub struct FORScheme;
176
/// Scheme that zigzag-encodes the source and cascades compression into the encoded child.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;
179
/// Scheme that bit-packs the source at the width chosen by `find_best_bit_width`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;
182
/// Scheme that sparse-encodes the source around its most frequent value.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;
185
/// Scheme that dictionary-encodes the source and cascades compression into the codes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;
188
/// Scheme that run-end-encodes the source and cascades compression into ends and values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;
191
/// Scheme that encodes the source with `sequence_encode`; it rejects arrays containing nulls.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;
194
/// Scheme that encodes the source as a `vortex_pco::PcoArray`; requires the `pco` feature.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;
199
/// Minimum `average_run_length` below which [`RunEndScheme`] reports a ratio of `0.0`.
const RUN_END_THRESHOLD: u32 = 4;
202
/// [`rle::RLEConfig`] implementation that wires the generic RLE scheme to integer stats/codes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct IntRLEConfig;
206
207impl rle::RLEConfig for IntRLEConfig {
208 type Stats = IntegerStats;
209 type Code = IntCode;
210
211 const CODE: IntCode = IntCode::Rle;
212
213 fn compress_values(
214 compressor: &BtrBlocksCompressor,
215 values: &PrimitiveArray,
216 ctx: CompressorContext,
217 excludes: &[IntCode],
218 ) -> VortexResult<ArrayRef> {
219 compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into())
220 }
221}
222
/// The RLE scheme instantiated with [`IntRLEConfig`]; listed in [`ALL_INT_SCHEMES`].
pub const RLE_INTEGER_SCHEME: RLEScheme<IntRLEConfig> = RLEScheme::new();
225
226impl Scheme for UncompressedScheme {
227 type StatsType = IntegerStats;
228 type CodeType = IntCode;
229
230 fn code(&self) -> IntCode {
231 IntCode::Uncompressed
232 }
233
234 fn expected_compression_ratio(
235 &self,
236 _compressor: &BtrBlocksCompressor,
237 _stats: &IntegerStats,
238 _ctx: CompressorContext,
239 _excludes: &[IntCode],
240 ) -> VortexResult<f64> {
241 Ok(1.0)
243 }
244
245 fn compress(
246 &self,
247 _compressor: &BtrBlocksCompressor,
248 stats: &IntegerStats,
249 _ctx: CompressorContext,
250 _excludes: &[IntCode],
251 ) -> VortexResult<ArrayRef> {
252 Ok(stats.source().to_array())
253 }
254}
255
256impl Scheme for ConstantScheme {
257 type StatsType = IntegerStats;
258 type CodeType = IntCode;
259
260 fn code(&self) -> IntCode {
261 IntCode::Constant
262 }
263
264 fn is_constant(&self) -> bool {
265 true
266 }
267
268 fn expected_compression_ratio(
269 &self,
270 _compressor: &BtrBlocksCompressor,
271 stats: &IntegerStats,
272 ctx: CompressorContext,
273 _excludes: &[IntCode],
274 ) -> VortexResult<f64> {
275 if ctx.is_sample {
277 return Ok(0.0);
278 }
279
280 if stats.distinct_values_count > 1 {
282 return Ok(0.0);
283 }
284
285 Ok(stats.value_count as f64)
286 }
287
288 fn compress(
289 &self,
290 _compressor: &BtrBlocksCompressor,
291 stats: &IntegerStats,
292 _ctx: CompressorContext,
293 _excludes: &[IntCode],
294 ) -> VortexResult<ArrayRef> {
295 let scalar_idx =
296 (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));
297
298 match scalar_idx {
299 Some(idx) => {
300 let scalar = stats.source().scalar_at(idx)?;
301 let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
302 if !stats.source().all_valid()? {
303 Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
304 } else {
305 Ok(const_arr)
306 }
307 }
308 None => Ok(ConstantArray::new(
309 Scalar::null(stats.src.dtype().clone()),
310 stats.src.len(),
311 )
312 .into_array()),
313 }
314 }
315}
316
317impl Scheme for FORScheme {
318 type StatsType = IntegerStats;
319 type CodeType = IntCode;
320
321 fn code(&self) -> IntCode {
322 IntCode::For
323 }
324
325 fn expected_compression_ratio(
326 &self,
327 _compressor: &BtrBlocksCompressor,
328 stats: &IntegerStats,
329 ctx: CompressorContext,
330 _excludes: &[IntCode],
331 ) -> VortexResult<f64> {
332 if ctx.allowed_cascading == 0 {
334 return Ok(0.0);
335 }
336
337 if stats.value_count == 0 {
339 return Ok(0.0);
340 }
341
342 if stats.typed.min_is_zero() {
344 return Ok(0.0);
345 }
346
347 let full_width: u32 = stats
349 .src
350 .ptype()
351 .bit_width()
352 .try_into()
353 .vortex_expect("bit width must fit in u32");
354 let bw = match stats.typed.max_minus_min().checked_ilog2() {
355 Some(l) => l + 1,
356 None => return Ok(0.0),
359 };
360
361 if full_width - bw < 8 {
363 return Ok(0.0);
364 }
365
366 Ok(full_width as f64 / bw as f64)
367 }
368
369 fn compress(
370 &self,
371 compressor: &BtrBlocksCompressor,
372 stats: &IntegerStats,
373 ctx: CompressorContext,
374 excludes: &[IntCode],
375 ) -> VortexResult<ArrayRef> {
376 let for_array = FoRArray::encode(stats.src.clone())?;
377 let biased = for_array.encoded().to_primitive();
378 let biased_stats = IntegerStats::generate_opts(
379 &biased,
380 GenerateStatsOptions {
381 count_distinct_values: false,
382 },
383 );
384
385 let leaf_ctx = CompressorContext {
390 is_sample: ctx.is_sample,
391 allowed_cascading: 0,
392 };
393 let compressed =
394 BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes)?;
395
396 let for_compressed = FoRArray::try_new(compressed, for_array.reference_scalar().clone())?;
397 for_compressed
398 .as_ref()
399 .statistics()
400 .inherit_from(for_array.as_ref().statistics());
401 Ok(for_compressed.into_array())
402 }
403}
404
405impl Scheme for ZigZagScheme {
406 type StatsType = IntegerStats;
407 type CodeType = IntCode;
408
409 fn code(&self) -> IntCode {
410 IntCode::ZigZag
411 }
412
413 fn expected_compression_ratio(
414 &self,
415 compressor: &BtrBlocksCompressor,
416 stats: &IntegerStats,
417 ctx: CompressorContext,
418 excludes: &[IntCode],
419 ) -> VortexResult<f64> {
420 if ctx.allowed_cascading == 0 {
422 return Ok(0.0);
423 }
424
425 if stats.value_count == 0 {
427 return Ok(0.0);
428 }
429
430 if !stats.typed.min_is_negative() {
432 return Ok(0.0);
433 }
434
435 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
437 }
438
439 fn compress(
440 &self,
441 compressor: &BtrBlocksCompressor,
442 stats: &IntegerStats,
443 ctx: CompressorContext,
444 excludes: &[IntCode],
445 ) -> VortexResult<ArrayRef> {
446 let zag = zigzag_encode(stats.src.clone())?;
448 let encoded = zag.encoded().to_primitive();
449
450 let mut new_excludes = vec![
453 ZigZagScheme.code(),
454 DictScheme.code(),
455 RunEndScheme.code(),
456 SparseScheme.code(),
457 ];
458 new_excludes.extend_from_slice(excludes);
459
460 let compressed = compressor.compress_canonical(
461 Canonical::Primitive(encoded),
462 ctx.descend(),
463 Excludes::int_only(&new_excludes),
464 )?;
465
466 tracing::debug!("zigzag output: {}", compressed.encoding_id());
467
468 Ok(ZigZagArray::try_new(compressed)?.into_array())
469 }
470}
471
472impl Scheme for BitPackingScheme {
473 type StatsType = IntegerStats;
474 type CodeType = IntCode;
475
476 fn code(&self) -> IntCode {
477 IntCode::BitPacking
478 }
479
480 fn expected_compression_ratio(
481 &self,
482 compressor: &BtrBlocksCompressor,
483 stats: &IntegerStats,
484 ctx: CompressorContext,
485 excludes: &[IntCode],
486 ) -> VortexResult<f64> {
487 if stats.typed.min_is_negative() {
489 return Ok(0.0);
490 }
491
492 if stats.value_count == 0 {
494 return Ok(0.0);
495 }
496
497 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
498 }
499
500 fn compress(
501 &self,
502 _compressor: &BtrBlocksCompressor,
503 stats: &IntegerStats,
504 _ctx: CompressorContext,
505 _excludes: &[IntCode],
506 ) -> VortexResult<ArrayRef> {
507 let histogram = bit_width_histogram(stats.source())?;
508 let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
509 if bw as usize == stats.source().ptype().bit_width() {
511 return Ok(stats.source().clone().into_array());
512 }
513 let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
514
515 let patches = packed.patches().map(compress_patches).transpose()?;
516 packed.replace_patches(patches);
517
518 Ok(packed.into_array())
519 }
520}
521
522impl Scheme for SparseScheme {
523 type StatsType = IntegerStats;
524 type CodeType = IntCode;
525
526 fn code(&self) -> IntCode {
527 IntCode::Sparse
528 }
529
530 fn expected_compression_ratio(
532 &self,
533 _compressor: &BtrBlocksCompressor,
534 stats: &IntegerStats,
535 ctx: CompressorContext,
536 _excludes: &[IntCode],
537 ) -> VortexResult<f64> {
538 if ctx.allowed_cascading == 0 {
540 return Ok(0.0);
541 }
542
543 if stats.value_count == 0 {
544 return Ok(0.0);
546 }
547
548 if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
550 return Ok(stats.src.len() as f64 / stats.value_count as f64);
551 }
552
553 let (_, top_count) = stats.typed.top_value_and_count();
555
556 if top_count == stats.value_count {
557 return Ok(0.0);
559 }
560
561 let freq = top_count as f64 / stats.value_count as f64;
562 if freq >= 0.9 {
563 return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
565 }
566
567 Ok(0.0)
568 }
569
570 fn compress(
571 &self,
572 compressor: &BtrBlocksCompressor,
573 stats: &IntegerStats,
574 ctx: CompressorContext,
575 excludes: &[IntCode],
576 ) -> VortexResult<ArrayRef> {
577 assert!(ctx.allowed_cascading > 0);
578 let (top_pvalue, top_count) = stats.typed.top_value_and_count();
579 if top_count as usize == stats.src.len() {
580 return Ok(ConstantArray::new(
582 Scalar::primitive_value(
583 top_pvalue,
584 top_pvalue.ptype(),
585 stats.src.dtype().nullability(),
586 ),
587 stats.src.len(),
588 )
589 .into_array());
590 }
591
592 let sparse_encoded = SparseArray::encode(
593 stats.src.as_ref(),
594 Some(Scalar::primitive_value(
595 top_pvalue,
596 top_pvalue.ptype(),
597 stats.src.dtype().nullability(),
598 )),
599 )?;
600
601 if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
602 let mut new_excludes = vec![SparseScheme.code(), IntCode::Dict];
604 new_excludes.extend_from_slice(excludes);
605
606 let compressed_values = compressor.compress_canonical(
607 Canonical::Primitive(sparse.patches().values().to_primitive()),
608 ctx.descend(),
609 Excludes::int_only(&new_excludes),
610 )?;
611
612 let indices = sparse.patches().indices().to_primitive().narrow()?;
613
614 let compressed_indices = compressor.compress_canonical(
615 Canonical::Primitive(indices),
616 ctx.descend(),
617 Excludes::int_only(&new_excludes),
618 )?;
619
620 SparseArray::try_new(
621 compressed_indices,
622 compressed_values,
623 sparse.len(),
624 sparse.fill_scalar().clone(),
625 )
626 .map(|a| a.into_array())
627 } else {
628 Ok(sparse_encoded)
629 }
630 }
631}
632
633impl Scheme for DictScheme {
634 type StatsType = IntegerStats;
635 type CodeType = IntCode;
636
637 fn code(&self) -> IntCode {
638 IntCode::Dict
639 }
640
641 fn expected_compression_ratio(
642 &self,
643 _compressor: &BtrBlocksCompressor,
644 stats: &IntegerStats,
645 ctx: CompressorContext,
646 _excludes: &[IntCode],
647 ) -> VortexResult<f64> {
648 if ctx.allowed_cascading == 0 {
650 return Ok(0.0);
651 }
652
653 if stats.value_count == 0 {
654 return Ok(0.0);
655 }
656
657 if stats.distinct_values_count > stats.value_count / 2 {
659 return Ok(0.0);
660 }
661
662 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
664
665 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
667
668 let n_runs = (stats.value_count / stats.average_run_length) as usize;
669
670 let codes_size_bp = (codes_bw * stats.value_count) as usize;
672 let codes_size_rle_bp = usize::checked_mul((codes_bw + 32) as usize, n_runs);
673
674 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp.unwrap_or(usize::MAX));
675
676 let before = stats.value_count as usize * stats.source().ptype().bit_width();
677
678 Ok(before as f64 / (values_size + codes_size) as f64)
679 }
680
681 fn compress(
682 &self,
683 compressor: &BtrBlocksCompressor,
684 stats: &IntegerStats,
685 ctx: CompressorContext,
686 excludes: &[IntCode],
687 ) -> VortexResult<ArrayRef> {
688 assert!(ctx.allowed_cascading > 0);
689
690 let dict = dictionary_encode(stats);
694
695 let mut new_excludes = vec![IntCode::Dict, IntCode::Sequence];
698 new_excludes.extend_from_slice(excludes);
699
700 let compressed_codes = compressor.compress_canonical(
701 Canonical::Primitive(dict.codes().to_primitive().narrow()?),
702 ctx.descend(),
703 Excludes::int_only(&new_excludes),
704 )?;
705
706 unsafe {
708 Ok(
709 DictArray::new_unchecked(compressed_codes, dict.values().clone())
710 .set_all_values_referenced(dict.has_all_values_referenced())
711 .into_array(),
712 )
713 }
714 }
715}
716
717impl Scheme for RunEndScheme {
718 type StatsType = IntegerStats;
719 type CodeType = IntCode;
720
721 fn code(&self) -> IntCode {
722 IntCode::RunEnd
723 }
724
725 fn expected_compression_ratio(
726 &self,
727 compressor: &BtrBlocksCompressor,
728 stats: &IntegerStats,
729 ctx: CompressorContext,
730 excludes: &[IntCode],
731 ) -> VortexResult<f64> {
732 if stats.average_run_length < RUN_END_THRESHOLD {
734 return Ok(0.0);
735 }
736
737 if ctx.allowed_cascading == 0 {
738 return Ok(0.0);
739 }
740
741 self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
743 }
744
745 fn compress(
746 &self,
747 compressor: &BtrBlocksCompressor,
748 stats: &IntegerStats,
749 ctx: CompressorContext,
750 excludes: &[IntCode],
751 ) -> VortexResult<ArrayRef> {
752 assert!(ctx.allowed_cascading > 0);
753
754 let (ends, values) = runend_encode(&stats.src);
756
757 let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
758 new_excludes.extend_from_slice(excludes);
759
760 let compressed_ends = compressor.compress_canonical(
761 Canonical::Primitive(ends.to_primitive()),
762 ctx.descend(),
763 Excludes::int_only(&new_excludes),
764 )?;
765
766 let compressed_values = compressor.compress_canonical(
767 Canonical::Primitive(values.to_primitive()),
768 ctx.descend(),
769 Excludes::int_only(&new_excludes),
770 )?;
771
772 unsafe {
774 Ok(
775 RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
776 .into_array(),
777 )
778 }
779 }
780}
781
782impl Scheme for SequenceScheme {
783 type StatsType = IntegerStats;
784 type CodeType = IntCode;
785
786 fn code(&self) -> Self::CodeType {
787 IntCode::Sequence
788 }
789
790 fn expected_compression_ratio(
791 &self,
792 _compressor: &BtrBlocksCompressor,
793 stats: &Self::StatsType,
794 _ctx: CompressorContext,
795 _excludes: &[Self::CodeType],
796 ) -> VortexResult<f64> {
797 if stats.null_count > 0 {
798 return Ok(0.0);
799 }
800
801 if stats.distinct_values_count != u32::MAX
804 && stats.distinct_values_count as usize != stats.src.len()
805 {
806 return Ok(0.0);
807 }
808
809 Ok(sequence_encode(&stats.src)?
812 .map(|_| stats.src.len() as f64 / 2.0)
813 .unwrap_or(0.0))
814 }
815
816 fn compress(
817 &self,
818 _compressor: &BtrBlocksCompressor,
819 stats: &Self::StatsType,
820 _ctx: CompressorContext,
821 _excludes: &[Self::CodeType],
822 ) -> VortexResult<ArrayRef> {
823 if stats.null_count > 0 {
824 vortex_bail!("sequence encoding does not support nulls");
825 }
826 sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
827 }
828}
829
/// Pco encoding; only compiled with the `pco` feature.
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> Self::CodeType {
        IntCode::Pco
    }

    /// Returns `0.0` for `I8` and `U8` sources; otherwise estimates the ratio by sampling.
    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        let is_single_byte = matches!(
            stats.src.ptype(),
            vortex_dtype::PType::I8 | vortex_dtype::PType::U8
        );
        if is_single_byte {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    /// Builds a `PcoArray` from the source at pco's default compression level.
    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        let encoded = vortex_pco::PcoArray::from_primitive(
            stats.source(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?;
        Ok(encoded.into_array())
    }
}
872
/// Tests that call the integer compressor or individual schemes directly.
#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use rand::RngCore;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;

    use super::IntegerStats;
    use super::RLE_INTEGER_SCHEME;
    use super::SequenceScheme;
    use super::SparseScheme;
    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;
    use crate::CompressorStats;
    use crate::Scheme;

    /// Compressing an empty array yields an empty result.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let result = btr.integer_compressor().compress(
            &btr,
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            CompressorContext::default(),
            &[],
        )?;

        assert!(result.is_empty());
        Ok(())
    }

    /// Six distinct large values repeated in short random runs are expected to pick Dict.
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect_vec();

        // Fixed seed keeps the generated data deterministic.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Run lengths are 0..=4, so a draw may push nothing.
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.integer_compressor().compress(
            &btr,
            &primitive,
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// `SparseScheme` on an array whose last element is null.
    #[test]
    fn sparse_with_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let btr = BtrBlocksCompressor::default();
        let compressed = SparseScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        // The expected buffer holds 0 in the null slot; validity is taken from the input.
        let expected =
            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
                .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// `SparseScheme` on an array where only the last element is valid.
    #[test]
    fn sparse_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let btr = BtrBlocksCompressor::default();
        let compressed = SparseScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        // The expected buffer holds 0 in every null slot; validity is taken from the input.
        let expected = PrimitiveArray::new(
            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
            array.validity().clone(),
        )
        .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// `SequenceScheme` on an array built from `Option`s that are all `Some`.
    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
        let btr = BtrBlocksCompressor::default();
        let compressed = SequenceScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SequenceVTable>());
        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// `RLE_INTEGER_SCHEME` output compares equal to the input of three long runs.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = RLE_INTEGER_SCHEME.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;

        let decoded = compressed;
        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// Smoke test that compresses a 50M-element array and only checks for success.
    ///
    /// Gated through `test_with` on the `CI` and `VORTEX_SKIP_SLOW_TESTS` environment
    /// variables.
    // NOTE(review): despite the name, the generated elements are `f64`, not integers.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        drop(btr.compress(prim.as_ref())?);

        Ok(())
    }
}
1060
/// Tests asserting which encoding the default compressor selects for a given data shape.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPackedVTable;
    use vortex_fastlanes::FoRVTable;
    use vortex_fastlanes::RLEVTable;
    use vortex_runend::RunEndVTable;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;

    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;

    /// 100 copies of one value should be constant-encoded.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Values in a span of 100 above 1_000_000 should be FoR-encoded.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<FoRVTable>());
        Ok(())
    }

    /// Unsigned values cycling through 0..16 should be bit-packed.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<BitPackedVTable>());
        Ok(())
    }

    /// One dominant value with every 20th element differing should be sparse-encoded.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SparseVTable>());
        Ok(())
    }

    /// Six distinct large values in short random runs should be dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        use rand::RngCore;
        use rand::SeedableRng;
        use rand::rngs::StdRng;

        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect();

        // Fixed seed keeps the generated data deterministic.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Run lengths are 0..=4, so a draw may push nothing.
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// 100 runs of length 10 should be run-end-encoded.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            // `wrapping_add` lets the values wrap past `i32::MAX` once `i` exceeds 50.
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RunEndVTable>());
        Ok(())
    }

    /// An arithmetic progression with step 7 should be sequence-encoded.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SequenceVTable>());
        Ok(())
    }

    /// Ten runs of length 100 over the values 0..10 should be RLE-encoded.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..10 {
            values.extend(iter::repeat_n(i, 100));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RLEVTable>());
        Ok(())
    }
}