1pub(crate) mod dictionary;
5pub(super) mod stats;
6
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use enum_iterator::Sequence;
11pub use stats::IntegerStats;
12use vortex_array::ArrayRef;
13use vortex_array::Canonical;
14use vortex_array::IntoArray;
15use vortex_array::ToCanonical;
16use vortex_array::arrays::ConstantArray;
17use vortex_array::arrays::DictArray;
18use vortex_array::arrays::MaskedArray;
19use vortex_array::arrays::PrimitiveArray;
20use vortex_array::arrays::PrimitiveVTable;
21use vortex_array::scalar::Scalar;
22use vortex_array::vtable::VTable;
23use vortex_array::vtable::ValidityHelper;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_err;
28use vortex_fastlanes::FoRArray;
29use vortex_fastlanes::bitpack_compress::bit_width_histogram;
30use vortex_fastlanes::bitpack_compress::bitpack_encode;
31use vortex_fastlanes::bitpack_compress::find_best_bit_width;
32use vortex_runend::RunEndArray;
33use vortex_runend::compress::runend_encode;
34use vortex_sequence::sequence_encode;
35use vortex_sparse::SparseArray;
36use vortex_sparse::SparseVTable;
37use vortex_zigzag::ZigZagArray;
38use vortex_zigzag::zigzag_encode;
39
40use self::dictionary::dictionary_encode;
41use crate::BtrBlocksCompressor;
42use crate::CanonicalCompressor;
43use crate::Compressor;
44use crate::CompressorContext;
45use crate::CompressorStats;
46use crate::Excludes;
47use crate::GenerateStatsOptions;
48use crate::Scheme;
49use crate::SchemeExt;
50use crate::compressor::patches::compress_patches;
51use crate::compressor::rle;
52use crate::compressor::rle::RLEScheme;
53
/// The full set of integer compression schemes shipped with this crate.
///
/// `UncompressedScheme` is deliberately absent: it is the compressor's
/// fallback (see `IntCompressor::default_scheme`) rather than a candidate.
pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[
    &ConstantScheme,
    &FORScheme,
    &ZigZagScheme,
    &BitPackingScheme,
    &SparseScheme,
    &DictScheme,
    &RunEndScheme,
    &SequenceScheme,
    &RLE_INTEGER_SCHEME,
    // Pco is an optional dependency and only compiled in with the feature.
    #[cfg(feature = "pco")]
    &PcoScheme,
];
68
/// Compressor for primitive integer arrays.
#[derive(Clone, Copy)]
pub struct IntCompressor<'a> {
    // Parent compressor: supplies the configured integer schemes and is used
    // by schemes to recursively compress child arrays.
    pub btr_blocks_compressor: &'a dyn CanonicalCompressor,
}
75
76impl<'a> Compressor for IntCompressor<'a> {
77 type ArrayVTable = PrimitiveVTable;
78 type SchemeType = dyn IntegerScheme;
79 type StatsType = IntegerStats;
80
81 fn schemes(&self) -> &[&'static dyn IntegerScheme] {
82 self.btr_blocks_compressor.int_schemes()
83 }
84
85 fn default_scheme(&self) -> &'static Self::SchemeType {
86 &UncompressedScheme
87 }
88
89 fn gen_stats(&self, array: &<Self::ArrayVTable as VTable>::Array) -> Self::StatsType {
90 if self
91 .btr_blocks_compressor
92 .int_schemes()
93 .iter()
94 .any(|s| s.code() == IntCode::Dict)
95 {
96 IntegerStats::generate_opts(
97 array,
98 GenerateStatsOptions {
99 count_distinct_values: true,
100 },
101 )
102 } else {
103 IntegerStats::generate_opts(
104 array,
105 GenerateStatsOptions {
106 count_distinct_values: false,
107 },
108 )
109 }
110 }
111}
112
/// An integer compression [`Scheme`]: any scheme whose statistics type is
/// [`IntegerStats`] and whose code type is [`IntCode`].
pub trait IntegerScheme:
    Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}

// Blanket impl: every qualifying `Scheme` automatically is an `IntegerScheme`.
impl<T> IntegerScheme for T where
    T: Scheme<StatsType = IntegerStats, CodeType = IntCode> + Send + Sync
{
}
123
// Trait objects are compared and hashed purely by their scheme code, so two
// instances of the same scheme are interchangeable in sets/maps.
impl PartialEq for dyn IntegerScheme {
    fn eq(&self, other: &Self) -> bool {
        self.code() == other.code()
    }
}

impl Eq for dyn IntegerScheme {}

impl Hash for dyn IntegerScheme {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Must stay consistent with `PartialEq`: equal codes hash equally.
        self.code().hash(state)
    }
}
137
/// Identifier for each integer compression scheme.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
pub enum IntCode {
    /// No compression; data stored as-is.
    Uncompressed,
    /// Single repeated value.
    Constant,
    /// Frame-of-reference.
    For,
    /// FastLanes bit-packing.
    BitPacking,
    /// Signed-to-unsigned zigzag transform.
    ZigZag,
    /// Sparse patches over a dominant fill value.
    Sparse,
    /// Dictionary encoding.
    Dict,
    /// Run-end encoding.
    RunEnd,
    /// Arithmetic sequence encoding.
    Sequence,
    /// Run-length encoding.
    Rle,
    /// Pco codec (feature-gated).
    Pco,
}
166
/// Leaves the input unchanged; always reports a compression ratio of 1.0.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UncompressedScheme;

/// Encodes arrays with at most one distinct valid value as a `ConstantArray`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConstantScheme;

/// Frame-of-reference: subtracts the minimum and bit-packs the residuals.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FORScheme;

/// Zigzag-maps signed values to unsigned, then recursively compresses.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// FastLanes bit-packing with compressed patches for outliers.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse patches over the most frequent value (or over nulls).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Dictionary encoding with recursively compressed codes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DictScheme;

/// Run-end encoding with recursively compressed ends and values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Arithmetic sequence encoding via `sequence_encode`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco codec; only available with the `pco` feature.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;

/// Minimum average run length before run-end encoding is considered.
const RUN_END_THRESHOLD: u32 = 4;

/// RLE configuration for integer arrays (see [`rle::RLEConfig`]).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct IntRLEConfig;
208
impl rle::RLEConfig for IntRLEConfig {
    type Stats = IntegerStats;
    type Code = IntCode;

    const CODE: IntCode = IntCode::Rle;

    /// Recursively compress the run values of an RLE-encoded array by handing
    /// them back to the integer compressor.
    fn compress_values(
        compressor: &BtrBlocksCompressor,
        values: &PrimitiveArray,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into())
    }
}
224
/// The generic RLE scheme instantiated for integer arrays.
pub const RLE_INTEGER_SCHEME: RLEScheme<IntRLEConfig> = RLEScheme::new();
227
impl Scheme for UncompressedScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Uncompressed
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        _stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Storing the data as-is neither shrinks nor grows it.
        Ok(1.0)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Return the source array unchanged.
        Ok(stats.source().to_array())
    }
}
257
impl Scheme for ConstantScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Constant
    }

    fn is_constant(&self) -> bool {
        true
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // A constant sample says nothing about the full array, so never pick
        // this scheme based on a sample.
        if ctx.is_sample {
            return Ok(0.0);
        }

        // Only applicable when every valid value is identical.
        if stats.distinct_values_count > 1 {
            return Ok(0.0);
        }

        Ok(stats.value_count as f64)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Find the first valid element; its value is the constant.
        let scalar_idx =
            (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false));

        match scalar_idx {
            Some(idx) => {
                let scalar = stats.source().scalar_at(idx)?;
                let const_arr = ConstantArray::new(scalar, stats.src.len()).into_array();
                if !stats.source().all_valid()? {
                    // Preserve nulls by masking the constant array with the
                    // source's validity.
                    Ok(MaskedArray::try_new(const_arr, stats.src.validity().clone())?.into_array())
                } else {
                    Ok(const_arr)
                }
            }
            // No valid element at all: the array is entirely null.
            None => Ok(ConstantArray::new(
                Scalar::null(stats.src.dtype().clone()),
                stats.src.len(),
            )
            .into_array()),
        }
    }
}
318
impl Scheme for FORScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::For
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // FoR always cascades into bit-packing, so it needs cascading budget.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // With a zero minimum, subtracting the reference is a no-op; plain
        // bit-packing would do the same job without the extra layer.
        if stats.typed.min_is_zero() {
            return Ok(0.0);
        }

        let full_width: u32 = stats
            .src
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");
        // Bit width required by the residuals, i.e. ilog2(max - min) + 1.
        let for_bw = match stats.typed.max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            // max == min: all residuals are zero; not worth FoR here.
            None => return Ok(0.0),
        };

        // If plain bit-packing (only applicable to non-negative data) would
        // already reach an equal or smaller width, don't bother with FoR.
        if let Some(max_log) = stats
            .typed
            .max_ilog2()
            .filter(|_| !stats.typed.min_is_negative())
        {
            let bitpack_bw = max_log + 1;
            if for_bw >= bitpack_bw {
                return Ok(0.0);
            }
        }

        Ok(full_width as f64 / for_bw as f64)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Subtract the reference (minimum) from every value.
        let for_array = FoRArray::encode(stats.src.clone())?;
        let biased = for_array.encoded().to_primitive();
        let biased_stats = IntegerStats::generate_opts(
            &biased,
            GenerateStatsOptions {
                count_distinct_values: false,
            },
        );

        // Terminate the cascade here: the residuals are always bit-packed
        // directly, with no further scheme selection.
        let leaf_ctx = CompressorContext {
            is_sample: ctx.is_sample,
            allowed_cascading: 0,
        };
        let compressed =
            BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes)?;

        let for_compressed = FoRArray::try_new(compressed, for_array.reference_scalar().clone())?;
        // Carry over statistics already computed for the unpacked FoR array.
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());
        Ok(for_compressed.into_array())
    }
}
415
impl Scheme for ZigZagScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::ZigZag
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // ZigZag is a transform, not a terminal encoding; it needs budget to
        // compress its own output.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // ZigZag only helps when negative values are present.
        if !stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let zag = zigzag_encode(stats.src.clone())?;
        let encoded = zag.encoded().to_primitive();

        // Exclude zigzag itself plus dict/run-end/sparse for the child;
        // presumably those value-identity-based schemes gain nothing from the
        // transform and should have been applied before it — confirm.
        let mut new_excludes = vec![
            ZigZagScheme.code(),
            DictScheme.code(),
            RunEndScheme.code(),
            SparseScheme.code(),
        ];
        new_excludes.extend_from_slice(excludes);

        let compressed = compressor.compress_canonical(
            Canonical::Primitive(encoded),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        tracing::debug!("zigzag output: {}", compressed.encoding_id());

        Ok(ZigZagArray::try_new(compressed)?.into_array())
    }
}
482
impl Scheme for BitPackingScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::BitPacking
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Bit-packing drops leading zero bits, so negative values (which
        // sign-extend with ones) are not supported.
        if stats.typed.min_is_negative() {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        let histogram = bit_width_histogram(stats.source())?;
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
        // Chosen width equals the native width: nothing to pack, return the
        // input unchanged.
        if bw as usize == stats.source().ptype().bit_width() {
            return Ok(stats.source().clone().into_array());
        }
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;

        // Outliers wider than `bw` are stored as patches; compress those too.
        let patches = packed.patches().map(compress_patches).transpose()?;
        packed.replace_patches(patches);

        Ok(packed.into_array())
    }
}
532
impl Scheme for SparseScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Sparse
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Sparse recursively compresses its patches, so cascading is required.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        if stats.value_count == 0 {
            return Ok(0.0);
        }

        // More than 90% nulls: the few valid values become sparse patches.
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
        }

        let (_, top_count) = stats.typed.top_value_and_count();

        // A single repeated value is a constant, not a sparse array.
        if top_count == stats.value_count {
            return Ok(0.0);
        }

        // At least 90% of values equal the top value: only the rest become
        // patches, so the ratio is valid / non-top values.
        let freq = top_count as f64 / stats.value_count as f64;
        if freq >= 0.9 {
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
        }

        Ok(0.0)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
        // Degenerate case: every element equals the top value.
        if top_count as usize == stats.src.len() {
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    top_pvalue,
                    top_pvalue.ptype(),
                    stats.src.dtype().nullability(),
                ),
                stats.src.len(),
            )
            .into_array());
        }

        // Encode with the most frequent value as the fill; everything else
        // ends up in the patches.
        let sparse_encoded = SparseArray::encode(
            stats.src.as_ref(),
            Some(Scalar::primitive_value(
                top_pvalue,
                top_pvalue.ptype(),
                stats.src.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
            // Avoid recursing back into sparse/dict on the patch arrays.
            let mut new_excludes = vec![SparseScheme.code(), IntCode::Dict];
            new_excludes.extend_from_slice(excludes);

            let compressed_values = compressor.compress_canonical(
                Canonical::Primitive(sparse.patches().values().to_primitive()),
                ctx.descend(),
                Excludes::int_only(&new_excludes),
            )?;

            // Narrow indices to the smallest fitting integer type first.
            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices = compressor.compress_canonical(
                Canonical::Primitive(indices),
                ctx.descend(),
                Excludes::int_only(&new_excludes),
            )?;

            SparseArray::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            // Encoder decided sparse wasn't applicable; pass its result through.
            Ok(sparse_encoded)
        }
    }
}
643
644impl Scheme for DictScheme {
645 type StatsType = IntegerStats;
646 type CodeType = IntCode;
647
648 fn code(&self) -> IntCode {
649 IntCode::Dict
650 }
651
652 fn expected_compression_ratio(
653 &self,
654 _compressor: &BtrBlocksCompressor,
655 stats: &IntegerStats,
656 ctx: CompressorContext,
657 _excludes: &[IntCode],
658 ) -> VortexResult<f64> {
659 if ctx.allowed_cascading == 0 {
661 return Ok(0.0);
662 }
663
664 if stats.value_count == 0 {
665 return Ok(0.0);
666 }
667
668 if stats.distinct_values_count > stats.value_count / 2 {
670 return Ok(0.0);
671 }
672
673 let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
675
676 let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
678
679 let n_runs = (stats.value_count / stats.average_run_length) as usize;
680
681 let codes_size_bp = (codes_bw * stats.value_count) as usize;
683 let codes_size_rle_bp = usize::checked_mul((codes_bw + 32) as usize, n_runs);
684
685 let codes_size = usize::min(codes_size_bp, codes_size_rle_bp.unwrap_or(usize::MAX));
686
687 let before = stats.value_count as usize * stats.source().ptype().bit_width();
688
689 Ok(before as f64 / (values_size + codes_size) as f64)
690 }
691
692 fn compress(
693 &self,
694 compressor: &BtrBlocksCompressor,
695 stats: &IntegerStats,
696 ctx: CompressorContext,
697 excludes: &[IntCode],
698 ) -> VortexResult<ArrayRef> {
699 assert!(ctx.allowed_cascading > 0);
700
701 let dict = dictionary_encode(stats);
705
706 let mut new_excludes = vec![IntCode::Dict, IntCode::Sequence];
709 new_excludes.extend_from_slice(excludes);
710
711 let compressed_codes = compressor.compress_canonical(
712 Canonical::Primitive(dict.codes().to_primitive().narrow()?),
713 ctx.descend(),
714 Excludes::int_only(&new_excludes),
715 )?;
716
717 unsafe {
719 Ok(
720 DictArray::new_unchecked(compressed_codes, dict.values().clone())
721 .set_all_values_referenced(dict.has_all_values_referenced())
722 .into_array(),
723 )
724 }
725 }
726}
727
impl Scheme for RunEndScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::RunEnd
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // Runs must be long enough on average for the ends array to pay off.
        if stats.average_run_length < RUN_END_THRESHOLD {
            return Ok(0.0);
        }

        // Ends and values are recursively compressed, so cascading is needed.
        if ctx.allowed_cascading == 0 {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &IntegerStats,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        assert!(ctx.allowed_cascading > 0);

        let (ends, values) = runend_encode(&stats.src);

        // Don't re-apply run-end (or dict) to the child arrays.
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
        new_excludes.extend_from_slice(excludes);

        let compressed_ends = compressor.compress_canonical(
            Canonical::Primitive(ends.to_primitive()),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        let compressed_values = compressor.compress_canonical(
            Canonical::Primitive(values.to_primitive()),
            ctx.descend(),
            Excludes::int_only(&new_excludes),
        )?;

        unsafe {
            // SAFETY: `ends`/`values` come from `runend_encode` on this exact
            // array and are recompressed losslessly, so the run-end invariants
            // (monotonic ends covering `stats.src.len()` from offset 0) hold.
            Ok(
                RunEndArray::new_unchecked(compressed_ends, compressed_values, 0, stats.src.len())
                    .into_array(),
            )
        }
    }
}
792
impl Scheme for SequenceScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> Self::CodeType {
        IntCode::Sequence
    }

    fn expected_compression_ratio(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<f64> {
        // Sequence encoding cannot represent nulls (see `compress`).
        if stats.null_count > 0 {
            return Ok(0.0);
        }

        // A strict arithmetic sequence has all-distinct values, so anything
        // with repeats cannot be one. NOTE(review): `u32::MAX` appears to be
        // the sentinel for "distinct count not computed" — confirm against
        // `IntegerStats::generate_opts`.
        if stats.distinct_values_count != u32::MAX
            && stats.distinct_values_count as usize != stats.src.len()
        {
            return Ok(0.0);
        }

        // Trial-encode: if it succeeds, report a fixed ratio; otherwise 0.
        Ok(sequence_encode(&stats.src)?
            .map(|_| stats.src.len() as f64 / 2.0)
            .unwrap_or(0.0))
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[Self::CodeType],
    ) -> VortexResult<ArrayRef> {
        if stats.null_count > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}
840
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    type StatsType = IntegerStats;
    type CodeType = IntCode;

    fn code(&self) -> IntCode {
        IntCode::Pco
    }

    fn expected_compression_ratio(
        &self,
        compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        ctx: CompressorContext,
        excludes: &[IntCode],
    ) -> VortexResult<f64> {
        // 8-bit types are skipped — presumably not worth pco's overhead, or
        // unsupported by the codec; confirm against vortex_pco.
        if matches!(
            stats.src.ptype(),
            vortex_array::dtype::PType::I8 | vortex_array::dtype::PType::U8
        ) {
            return Ok(0.0);
        }

        self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
    }

    fn compress(
        &self,
        _compressor: &BtrBlocksCompressor,
        stats: &Self::StatsType,
        _ctx: CompressorContext,
        _excludes: &[IntCode],
    ) -> VortexResult<ArrayRef> {
        // Encode in 8192-value chunks at pco's default compression level.
        Ok(vortex_pco::PcoArray::from_primitive(
            stats.source(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?
        .into_array())
    }
}
883
#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use rand::RngCore;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;

    use super::IntegerStats;
    use super::RLE_INTEGER_SCHEME;
    use super::SequenceScheme;
    use super::SparseScheme;
    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;
    use crate::CompressorStats;
    use crate::Scheme;

    /// Compressing an empty primitive array must succeed and stay empty.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let result = btr.integer_compressor().compress(
            &btr,
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            CompressorContext::default(),
            &[],
        )?;

        assert!(result.is_empty());
        Ok(())
    }

    /// Data with few distinct values in short random runs should be selected
    /// for dictionary encoding by the full compressor.
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect_vec();

        // Deterministic RNG so the test is reproducible.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive();
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.integer_compressor().compress(
            &btr,
            &primitive,
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// Sparse encoding must round-trip an array that contains nulls.
    #[test]
    fn sparse_with_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let btr = BtrBlocksCompressor::default();
        let compressed = SparseScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        // The last slot is null, so its stored value is irrelevant (0 here).
        let expected =
            PrimitiveArray::new(buffer![189u8, 189, 189, 0, 0], array.validity().clone())
                .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// Sparse encoding must also handle the mostly-null fast path.
    #[test]
    fn sparse_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let btr = BtrBlocksCompressor::default();
        let compressed = SparseScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SparseVTable>());
        let decoded = compressed.clone();
        // Null slots decode as the fill value (0); only index 10 is valid.
        let expected = PrimitiveArray::new(
            buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46],
            array.validity().clone(),
        )
        .into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// An arithmetic progression wrapped in a nullable (but all-valid) array
    /// must still sequence-encode and round-trip.
    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
        let btr = BtrBlocksCompressor::default();
        let compressed = SequenceScheme.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;
        assert!(compressed.is::<SequenceVTable>());
        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// RLE-compressing long constant runs must round-trip to the original.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = RLE_INTEGER_SCHEME.compress(
            &btr,
            &IntegerStats::generate(&array),
            CompressorContext::default(),
            &[],
        )?;

        let decoded = compressed;
        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
        Ok(())
    }

    /// Smoke test: compressing a very large array must not panic or error.
    /// Only runs in CI (and skips when slow tests are disabled).
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        drop(btr.compress(prim.as_ref())?);

        Ok(())
    }
}
1071
// End-to-end tests asserting which scheme the compressor *selects* for inputs
// crafted to favor each encoding.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    use vortex_array::arrays::ConstantVTable;
    use vortex_array::arrays::DictVTable;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPackedVTable;
    use vortex_fastlanes::FoRVTable;
    use vortex_fastlanes::RLEVTable;
    use vortex_runend::RunEndVTable;
    use vortex_sequence::SequenceVTable;
    use vortex_sparse::SparseVTable;

    use crate::BtrBlocksCompressor;
    use crate::CompressorContext;
    use crate::CompressorExt;

    /// A single repeated value should select constant encoding.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<ConstantVTable>());
        Ok(())
    }

    /// Large values in a narrow band should select frame-of-reference.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<FoRVTable>());
        Ok(())
    }

    /// Small non-negative values should select bit-packing.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<BitPackedVTable>());
        Ok(())
    }

    /// One dominant value with ~5% exceptions should select sparse encoding.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SparseVTable>());
        Ok(())
    }

    /// Few distinct values in short random runs should select dictionary
    /// encoding.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        use rand::RngCore;
        use rand::SeedableRng;
        use rand::rngs::StdRng;

        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect();

        // Deterministic RNG so the test is reproducible.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<DictVTable>());
        Ok(())
    }

    /// Long runs of many distinct values should select run-end encoding.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            // Values straddle i32::MAX to wrap; each value repeats 10 times.
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RunEndVTable>());
        Ok(())
    }

    /// A perfect arithmetic progression should select sequence encoding.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<SequenceVTable>());
        Ok(())
    }

    /// A few very long runs should select RLE.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..10 {
            values.extend(iter::repeat_n(i, 100));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed =
            btr.integer_compressor()
                .compress(&btr, &array, CompressorContext::default(), &[])?;
        assert!(compressed.is::<RLEVTable>());
        Ok(())
    }
}