1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::IntoArray;
9use vortex_array::LEGACY_SESSION;
10use vortex_array::ToCanonical;
11use vortex_array::VortexSessionExecute;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::Patched;
14use vortex_array::arrays::patched::use_experimental_patches;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::scalar::Scalar;
17use vortex_compressor::builtins::FloatDictScheme;
18use vortex_compressor::builtins::StringDictScheme;
19use vortex_compressor::estimate::CompressionEstimate;
20use vortex_compressor::estimate::DeferredEstimate;
21use vortex_compressor::estimate::EstimateVerdict;
22use vortex_compressor::scheme::AncestorExclusion;
23use vortex_compressor::scheme::ChildSelection;
24use vortex_compressor::scheme::DescendantExclusion;
25#[cfg(feature = "unstable_encodings")]
26use vortex_compressor::scheme::SchemeId;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_err;
31use vortex_fastlanes::BitPacked;
32#[cfg(feature = "unstable_encodings")]
33use vortex_fastlanes::Delta;
34use vortex_fastlanes::FoR;
35use vortex_fastlanes::FoRArrayExt;
36use vortex_fastlanes::RLE;
37use vortex_fastlanes::RLEArrayExt;
38use vortex_fastlanes::bitpack_compress::bit_width_histogram;
39use vortex_fastlanes::bitpack_compress::bitpack_encode;
40use vortex_fastlanes::bitpack_compress::find_best_bit_width;
41use vortex_runend::RunEnd;
42use vortex_runend::compress::runend_encode;
43use vortex_sequence::sequence_encode;
44use vortex_sparse::Sparse;
45use vortex_zigzag::ZigZag;
46use vortex_zigzag::ZigZagArrayExt;
47use vortex_zigzag::zigzag_encode;
48
49use crate::ArrayAndStats;
50use crate::CascadingCompressor;
51use crate::CompressorContext;
52use crate::GenerateStatsOptions;
53use crate::Scheme;
54use crate::SchemeExt;
55use crate::compress_patches;
56use crate::schemes::rle_ancestor_exclusions;
57use crate::schemes::rle_descendant_exclusions;
58
/// Frame-of-reference scheme: `FoR::encode` subtracts a reference value and the residuals are
/// then bit-packed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FoRScheme;

/// ZigZag scheme: runs `zigzag_encode` and cascades into the single encoded child.
/// Only considered when the data contains negative values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZigZagScheme;

/// Bit-packing scheme: packs integers at the width picked by `find_best_bit_width`, with any
/// exceptions carried as patches.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BitPackingScheme;

/// Sparse scheme: the most frequent value becomes the fill, and the remaining positions are
/// stored as (compressed) patch indices and values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SparseScheme;

/// Run-end scheme: splits the data into run ends and run values with `runend_encode`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RunEndScheme;

/// Sequence scheme: replaces the array with the result of `sequence_encode` when it succeeds
/// (for example `0, 7, 14, ...`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SequenceScheme;

/// Pco scheme: compresses the whole array with `vortex_pco::Pco`. Only available with the
/// `pco` feature.
#[cfg(feature = "pco")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PcoScheme;
87
88pub use vortex_compressor::builtins::IntConstantScheme;
90pub use vortex_compressor::builtins::IntDictScheme;
91pub use vortex_compressor::builtins::is_integer_primitive;
92pub use vortex_compressor::stats::IntegerStats;
93
/// Integer run-length scheme built on the FastLanes `RLE` encoding (see `rle_compress`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct IntRLEScheme;

/// `IntRLEScheme` skips arrays whose average run length is below this value.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

/// `RunEndScheme` skips arrays whose average run length is below this value.
const RUN_END_THRESHOLD: u32 = 4;
103
104impl Scheme for FoRScheme {
105 fn scheme_name(&self) -> &'static str {
106 "vortex.int.for"
107 }
108
109 fn matches(&self, canonical: &Canonical) -> bool {
110 is_integer_primitive(canonical)
111 }
112
113 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
115 vec![
116 AncestorExclusion {
117 ancestor: IntDictScheme.id(),
118 children: ChildSelection::One(1),
119 },
120 AncestorExclusion {
121 ancestor: FloatDictScheme.id(),
122 children: ChildSelection::One(1),
123 },
124 AncestorExclusion {
125 ancestor: StringDictScheme.id(),
126 children: ChildSelection::One(1),
127 },
128 ]
129 }
130
131 fn expected_compression_ratio(
132 &self,
133 data: &mut ArrayAndStats,
134 ctx: CompressorContext,
135 ) -> CompressionEstimate {
136 if ctx.finished_cascading() {
139 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
140 }
141
142 let stats = data.integer_stats();
143
144 if stats.erased().min_is_zero() {
146 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
147 }
148
149 let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
151 Some(l) => l + 1,
152 None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
154 };
155
156 if let Some(max_log) = stats
160 .erased()
161 .max_ilog2()
162 .filter(|_| !stats.erased().min_is_negative())
164 {
165 let bitpack_bitwidth = max_log + 1;
166 if for_bitwidth >= bitpack_bitwidth {
167 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
168 }
169 }
170
171 let full_width: u32 = data
172 .array_as_primitive()
173 .ptype()
174 .bit_width()
175 .try_into()
176 .vortex_expect("bit width must fit in u32");
177
178 CompressionEstimate::Verdict(EstimateVerdict::Ratio(
179 full_width as f64 / for_bitwidth as f64,
180 ))
181 }
182
183 fn compress(
184 &self,
185 compressor: &CascadingCompressor,
186 data: &mut ArrayAndStats,
187 ctx: CompressorContext,
188 ) -> VortexResult<ArrayRef> {
189 let primitive = data.array().to_primitive();
190 let for_array = FoR::encode(primitive)?;
191 let biased = for_array.encoded().to_primitive();
192
193 let leaf_ctx = ctx.clone().as_leaf();
198 let mut biased_data = ArrayAndStats::new(biased.into_array(), ctx.merged_stats_options());
199 let compressed = BitPackingScheme.compress(compressor, &mut biased_data, leaf_ctx)?;
200
201 let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
203 for_compressed
204 .as_ref()
205 .statistics()
206 .inherit_from(for_array.as_ref().statistics());
207
208 Ok(for_compressed.into_array())
209 }
210}
211
212impl Scheme for ZigZagScheme {
213 fn scheme_name(&self) -> &'static str {
214 "vortex.int.zigzag"
215 }
216
217 fn matches(&self, canonical: &Canonical) -> bool {
218 is_integer_primitive(canonical)
219 }
220
221 fn num_children(&self) -> usize {
223 1
224 }
225
226 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
230 vec![
231 DescendantExclusion {
232 excluded: IntDictScheme.id(),
233 children: ChildSelection::All,
234 },
235 DescendantExclusion {
236 excluded: RunEndScheme.id(),
237 children: ChildSelection::All,
238 },
239 DescendantExclusion {
240 excluded: SparseScheme.id(),
241 children: ChildSelection::All,
242 },
243 ]
244 }
245
246 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
248 vec![
249 AncestorExclusion {
250 ancestor: IntDictScheme.id(),
251 children: ChildSelection::One(1),
252 },
253 AncestorExclusion {
254 ancestor: FloatDictScheme.id(),
255 children: ChildSelection::One(1),
256 },
257 AncestorExclusion {
258 ancestor: StringDictScheme.id(),
259 children: ChildSelection::One(1),
260 },
261 ]
262 }
263
264 fn expected_compression_ratio(
265 &self,
266 data: &mut ArrayAndStats,
267 ctx: CompressorContext,
268 ) -> CompressionEstimate {
269 if ctx.finished_cascading() {
272 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
273 }
274
275 let stats = data.integer_stats();
276
277 if !stats.erased().min_is_negative() {
279 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
280 }
281
282 CompressionEstimate::Deferred(DeferredEstimate::Sample)
283 }
284
285 fn compress(
286 &self,
287 compressor: &CascadingCompressor,
288 data: &mut ArrayAndStats,
289 ctx: CompressorContext,
290 ) -> VortexResult<ArrayRef> {
291 let zag = zigzag_encode(data.array_as_primitive())?;
293 let encoded = zag.encoded().to_primitive();
294
295 let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?;
296
297 tracing::debug!("zigzag output: {}", compressed.encoding_id());
298
299 Ok(ZigZag::try_new(compressed)?.into_array())
300 }
301}
302
303impl Scheme for BitPackingScheme {
304 fn scheme_name(&self) -> &'static str {
305 "vortex.int.bitpacking"
306 }
307
308 fn matches(&self, canonical: &Canonical) -> bool {
309 is_integer_primitive(canonical)
310 }
311
312 fn expected_compression_ratio(
313 &self,
314 data: &mut ArrayAndStats,
315 _ctx: CompressorContext,
316 ) -> CompressionEstimate {
317 let stats = data.integer_stats();
318
319 if stats.erased().min_is_negative() {
321 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
322 }
323
324 CompressionEstimate::Deferred(DeferredEstimate::Sample)
325 }
326
327 fn compress(
328 &self,
329 _compressor: &CascadingCompressor,
330 data: &mut ArrayAndStats,
331 _ctx: CompressorContext,
332 ) -> VortexResult<ArrayRef> {
333 let primitive_array = data.array_as_primitive();
334
335 let histogram = bit_width_histogram(primitive_array)?;
336 let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;
337
338 if bw as usize == primitive_array.ptype().bit_width() {
340 return Ok(primitive_array.array().clone());
341 }
342
343 let primitive_array = primitive_array.into_owned();
345 let packed = bitpack_encode(&primitive_array, bw, Some(&histogram))?;
346
347 let packed_stats = packed.statistics().to_owned();
348 let ptype = packed.dtype().as_ptype();
349 let mut parts = BitPacked::into_parts(packed);
350
351 let array = if use_experimental_patches() {
352 let patches = parts.patches.take();
353 let array = BitPacked::try_new(
355 parts.packed,
356 ptype,
357 parts.validity,
358 None,
359 parts.bit_width,
360 parts.len,
361 parts.offset,
362 )?
363 .into_array();
364
365 match patches {
366 None => array,
367 Some(p) => Patched::from_array_and_patches(
368 array,
369 &p,
370 &mut LEGACY_SESSION.create_execution_ctx(),
371 )?
372 .with_stats_set(packed_stats)
373 .into_array(),
374 }
375 } else {
376 let patches = parts.patches.take().map(compress_patches).transpose()?;
378 parts.patches = patches;
379 BitPacked::try_new(
380 parts.packed,
381 ptype,
382 parts.validity,
383 parts.patches,
384 parts.bit_width,
385 parts.len,
386 parts.offset,
387 )?
388 .with_stats_set(packed_stats)
389 .into_array()
390 };
391
392 Ok(array)
393 }
394}
395
396impl Scheme for SparseScheme {
397 fn scheme_name(&self) -> &'static str {
398 "vortex.int.sparse"
399 }
400
401 fn matches(&self, canonical: &Canonical) -> bool {
402 is_integer_primitive(canonical)
403 }
404
405 fn stats_options(&self) -> GenerateStatsOptions {
406 GenerateStatsOptions {
407 count_distinct_values: true,
408 }
409 }
410
411 fn num_children(&self) -> usize {
413 2
414 }
415
416 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
419 vec![
420 DescendantExclusion {
421 excluded: IntDictScheme.id(),
422 children: ChildSelection::One(1),
423 },
424 DescendantExclusion {
425 excluded: RunEndScheme.id(),
426 children: ChildSelection::One(1),
427 },
428 DescendantExclusion {
429 excluded: IntRLEScheme.id(),
430 children: ChildSelection::One(1),
431 },
432 DescendantExclusion {
433 excluded: SparseScheme.id(),
434 children: ChildSelection::One(1),
435 },
436 ]
437 }
438
439 fn expected_compression_ratio(
440 &self,
441 data: &mut ArrayAndStats,
442 _ctx: CompressorContext,
443 ) -> CompressionEstimate {
444 let len = data.array_len() as f64;
445 let stats = data.integer_stats();
446 let value_count = stats.value_count();
447
448 if value_count == 0 {
450 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
451 }
452
453 if stats.null_count() as f64 / len > 0.9 {
455 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
456 }
457
458 let (_, most_frequent_count) = stats
459 .erased()
460 .most_frequent_value_and_count()
461 .vortex_expect(
462 "this must be present since `SparseScheme` declared that we need distinct values",
463 );
464
465 if most_frequent_count == value_count {
467 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
468 }
469 debug_assert!(value_count > most_frequent_count);
470
471 let freq = most_frequent_count as f64 / value_count as f64;
473 if freq < 0.9 {
474 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
475 }
476
477 CompressionEstimate::Verdict(EstimateVerdict::Ratio(
479 value_count as f64 / (value_count - most_frequent_count) as f64,
480 ))
481 }
482
483 fn compress(
484 &self,
485 compressor: &CascadingCompressor,
486 data: &mut ArrayAndStats,
487 ctx: CompressorContext,
488 ) -> VortexResult<ArrayRef> {
489 let len = data.array_len();
490 let stats = data.integer_stats().clone();
492 let array = data.array();
493
494 let (most_frequent_value, most_frequent_count) = stats
495 .erased()
496 .most_frequent_value_and_count()
497 .vortex_expect(
498 "this must be present since `SparseScheme` declared that we need distinct values",
499 );
500
501 if most_frequent_count as usize == len {
502 return Ok(ConstantArray::new(
504 Scalar::primitive_value(
505 most_frequent_value,
506 most_frequent_value.ptype(),
507 array.dtype().nullability(),
508 ),
509 len,
510 )
511 .into_array());
512 }
513
514 let sparse_encoded = Sparse::encode(
515 array,
516 Some(Scalar::primitive_value(
517 most_frequent_value,
518 most_frequent_value.ptype(),
519 array.dtype().nullability(),
520 )),
521 )?;
522
523 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
524 let compressed_values = compressor.compress_child(
525 &sparse.patches().values().to_primitive().into_array(),
526 &ctx,
527 self.id(),
528 0,
529 )?;
530
531 let indices = sparse.patches().indices().to_primitive().narrow()?;
532
533 let compressed_indices =
534 compressor.compress_child(&indices.into_array(), &ctx, self.id(), 1)?;
535
536 Sparse::try_new(
537 compressed_indices,
538 compressed_values,
539 sparse.len(),
540 sparse.fill_scalar().clone(),
541 )
542 .map(|a| a.into_array())
543 } else {
544 Ok(sparse_encoded)
545 }
546 }
547}
548
549impl Scheme for RunEndScheme {
550 fn scheme_name(&self) -> &'static str {
551 "vortex.int.runend"
552 }
553
554 fn matches(&self, canonical: &Canonical) -> bool {
555 is_integer_primitive(canonical)
556 }
557
558 fn num_children(&self) -> usize {
560 2
561 }
562
563 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
566 vec![
567 DescendantExclusion {
568 excluded: IntDictScheme.id(),
569 children: ChildSelection::One(1),
570 },
571 DescendantExclusion {
572 excluded: RunEndScheme.id(),
573 children: ChildSelection::One(1),
574 },
575 DescendantExclusion {
576 excluded: IntRLEScheme.id(),
577 children: ChildSelection::One(1),
578 },
579 DescendantExclusion {
580 excluded: SparseScheme.id(),
581 children: ChildSelection::One(1),
582 },
583 ]
584 }
585
586 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
589 vec![
590 AncestorExclusion {
591 ancestor: IntDictScheme.id(),
592 children: ChildSelection::One(0),
593 },
594 AncestorExclusion {
595 ancestor: FloatDictScheme.id(),
596 children: ChildSelection::One(0),
597 },
598 AncestorExclusion {
599 ancestor: StringDictScheme.id(),
600 children: ChildSelection::One(0),
601 },
602 ]
603 }
604
605 fn expected_compression_ratio(
606 &self,
607 data: &mut ArrayAndStats,
608 _ctx: CompressorContext,
609 ) -> CompressionEstimate {
610 if data.integer_stats().average_run_length() < RUN_END_THRESHOLD {
612 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
613 }
614
615 CompressionEstimate::Deferred(DeferredEstimate::Sample)
616 }
617
618 fn compress(
619 &self,
620 compressor: &CascadingCompressor,
621 data: &mut ArrayAndStats,
622 ctx: CompressorContext,
623 ) -> VortexResult<ArrayRef> {
624 let (ends, values) = runend_encode(data.array_as_primitive());
626
627 let compressed_values =
628 compressor.compress_child(&values.to_primitive().into_array(), &ctx, self.id(), 0)?;
629
630 let compressed_ends = compressor.compress_child(&ends.into_array(), &ctx, self.id(), 1)?;
631
632 Ok(unsafe {
634 RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
635 .into_array()
636 })
637 }
638}
639
640impl Scheme for SequenceScheme {
641 fn scheme_name(&self) -> &'static str {
642 "vortex.int.sequence"
643 }
644
645 fn matches(&self, canonical: &Canonical) -> bool {
646 is_integer_primitive(canonical)
647 }
648
649 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
653 vec![
654 AncestorExclusion {
655 ancestor: IntDictScheme.id(),
656 children: ChildSelection::One(1),
657 },
658 AncestorExclusion {
659 ancestor: FloatDictScheme.id(),
660 children: ChildSelection::One(1),
661 },
662 AncestorExclusion {
663 ancestor: StringDictScheme.id(),
664 children: ChildSelection::One(1),
665 },
666 ]
667 }
668
669 fn expected_compression_ratio(
670 &self,
671 data: &mut ArrayAndStats,
672 ctx: CompressorContext,
673 ) -> CompressionEstimate {
674 if ctx.is_sample() {
677 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
678 }
679
680 let stats = data.integer_stats();
681
682 if stats.null_count() > 0 {
684 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
685 }
686
687 if stats
690 .distinct_count()
691 .is_some_and(|count| count as usize != data.array_len())
692 {
693 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
694 }
695
696 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
700 |_compressor, data, _ctx| {
701 let Some(encoded) = sequence_encode(data.array_as_primitive())? else {
702 return Ok(EstimateVerdict::Skip);
704 };
705
706 Ok(EstimateVerdict::Ratio(encoded.len() as f64 / 2.0))
710 },
711 )))
712 }
713
714 fn compress(
715 &self,
716 _compressor: &CascadingCompressor,
717 data: &mut ArrayAndStats,
718 _ctx: CompressorContext,
719 ) -> VortexResult<ArrayRef> {
720 let stats = data.integer_stats();
721
722 if stats.null_count() > 0 {
723 vortex_bail!("sequence encoding does not support nulls");
724 }
725 sequence_encode(data.array_as_primitive())?
726 .ok_or_else(|| vortex_err!("cannot sequence encode array"))
727 }
728}
729
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Skips 8-bit integer types; every other integer type is sampled.
    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        match data.array_as_primitive().ptype() {
            PType::I8 | PType::U8 => CompressionEstimate::Verdict(EstimateVerdict::Skip),
            _ => CompressionEstimate::Deferred(DeferredEstimate::Sample),
        }
    }

    /// Compresses the array with Pco at the default compression level.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // 8192 is passed through to `Pco::from_primitive` unchanged (unit not visible here).
        let encoded = vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
        )?;
        Ok(encoded.into_array())
    }
}
769
770pub(crate) fn rle_compress(
772 scheme: &dyn Scheme,
773 compressor: &CascadingCompressor,
774 data: &mut ArrayAndStats,
775 ctx: CompressorContext,
776) -> VortexResult<ArrayRef> {
777 let rle_array = RLE::encode(data.array_as_primitive())?;
778
779 let compressed_values = compressor.compress_child(
780 &rle_array.values().to_primitive().into_array(),
781 &ctx,
782 scheme.id(),
783 0,
784 )?;
785
786 #[cfg(feature = "unstable_encodings")]
788 let compressed_indices = try_compress_delta(
789 compressor,
790 &rle_array.indices().to_primitive().narrow()?.into_array(),
791 &ctx,
792 scheme.id(),
793 1,
794 )?;
795
796 #[cfg(not(feature = "unstable_encodings"))]
797 let compressed_indices = compressor.compress_child(
798 &rle_array.indices().to_primitive().narrow()?.into_array(),
799 &ctx,
800 scheme.id(),
801 1,
802 )?;
803
804 let compressed_offsets = compressor.compress_child(
805 &rle_array
806 .values_idx_offsets()
807 .to_primitive()
808 .narrow()?
809 .into_array(),
810 &ctx,
811 scheme.id(),
812 2,
813 )?;
814
815 unsafe {
817 Ok(RLE::new_unchecked(
818 compressed_values,
819 compressed_indices,
820 compressed_offsets,
821 rle_array.offset(),
822 rle_array.len(),
823 )
824 .into_array())
825 }
826}
827
/// Delta-encodes `child` and compresses both Delta parts as child `child_index` of
/// `parent_id`.
///
/// The bases and the deltas are both compressed under the same child slot of the parent.
#[cfg(feature = "unstable_encodings")]
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
) -> VortexResult<ArrayRef> {
    let (bases, deltas) =
        vortex_fastlanes::delta_compress(&child.to_primitive(), &mut compressor.execution_ctx())?;

    let compress_part =
        |part: ArrayRef| compressor.compress_child(&part, parent_ctx, parent_id, child_index);

    let compressed_bases = compress_part(bases.into_array())?;
    let compressed_deltas = compress_part(deltas.into_array())?;

    Delta::try_new(compressed_bases, compressed_deltas, 0, child.len()).map(IntoArray::into_array)
}
846
847impl Scheme for IntRLEScheme {
848 fn scheme_name(&self) -> &'static str {
849 "vortex.int.rle"
850 }
851
852 fn matches(&self, canonical: &Canonical) -> bool {
853 is_integer_primitive(canonical)
854 }
855
856 fn num_children(&self) -> usize {
858 3
859 }
860
861 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
862 rle_descendant_exclusions()
863 }
864
865 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
866 rle_ancestor_exclusions()
867 }
868
869 fn expected_compression_ratio(
870 &self,
871 data: &mut ArrayAndStats,
872 ctx: CompressorContext,
873 ) -> CompressionEstimate {
874 if ctx.finished_cascading() {
876 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
877 }
878
879 if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD {
880 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
881 }
882
883 CompressionEstimate::Deferred(DeferredEstimate::Sample)
884 }
885
886 fn compress(
887 &self,
888 compressor: &CascadingCompressor,
889 data: &mut ArrayAndStats,
890 ctx: CompressorContext,
891 ) -> VortexResult<ArrayRef> {
892 rle_compress(self, compressor, data, ctx)
893 }
894}
895
#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    /// An empty input compresses to an empty output.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let input = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable).into_array();

        let output = btr.compress(&input)?;
        assert!(output.is_empty());
        Ok(())
    }

    /// A handful of widely spaced values, repeated in short random runs, should end up
    /// dictionary-encoded.
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            (0..run_length).for_each(|_| codes.push(value));
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&codes.freeze().into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// A u8 array where only the last slot is valid compresses to a masked constant.
    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        // Differs from the input only in null slots.
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// A nullable array with no actual nulls can still become a `Sequence`.
    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.iter().copied().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());

        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Three long runs, compressed with only `IntRLEScheme` available, round-trip through RLE.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42i32, 100)
            .chain(iter::repeat_n(123i32, 200))
            .chain(iter::repeat_n(987i32, 150))
            .collect();

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed = compressor.compress(&array.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Slow smoke test (CI only). Despite the name, the elements are `f64`.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim)?;

        Ok(())
    }
}
1033
/// Checks which encoding the default compressor picks for representative integer inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    /// A single repeated value becomes a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Values in a narrow band far from zero are frame-of-reference encoded.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    /// Small non-negative values are bit-packed and keep their exact statistics.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    /// One dominant value with 5% exceptions becomes a sparse array.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000)
            .map(|i| {
                if i % 20 == 0 {
                    2_000_000 + (i * 7) % 1000
                } else {
                    1_000_000
                }
            })
            .collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    /// A few widely spaced values in short random runs become dictionary-encoded.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i)
            .collect();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Runs of length 10 over values near `i32::MAX` (wrapping) become run-end encoded.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    /// An arithmetic progression becomes a sequence.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    /// Runs of length 10 over increasing values. Despite the test name, the default compressor
    /// is expected to pick run-end encoding here.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}