1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Patched;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::patched::use_experimental_patches;
14use vortex_array::arrays::primitive::PrimitiveArrayExt;
15use vortex_array::scalar::Scalar;
16use vortex_compressor::builtins::FloatDictScheme;
17use vortex_compressor::builtins::StringDictScheme;
18use vortex_compressor::estimate::CompressionEstimate;
19use vortex_compressor::estimate::DeferredEstimate;
20use vortex_compressor::estimate::EstimateScore;
21use vortex_compressor::estimate::EstimateVerdict;
22use vortex_compressor::scheme::AncestorExclusion;
23use vortex_compressor::scheme::ChildSelection;
24use vortex_compressor::scheme::DescendantExclusion;
25#[cfg(feature = "unstable_encodings")]
26use vortex_compressor::scheme::SchemeId;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_err;
31use vortex_fastlanes::BitPacked;
32#[cfg(feature = "unstable_encodings")]
33use vortex_fastlanes::Delta;
34use vortex_fastlanes::FoR;
35use vortex_fastlanes::FoRArrayExt;
36use vortex_fastlanes::RLE;
37use vortex_fastlanes::RLEArrayExt;
38use vortex_fastlanes::bitpack_compress::bit_width_histogram;
39use vortex_fastlanes::bitpack_compress::bitpack_encode;
40use vortex_fastlanes::bitpack_compress::find_best_bit_width;
41use vortex_runend::RunEnd;
42use vortex_runend::compress::runend_encode;
43use vortex_sequence::sequence_encode;
44use vortex_sparse::Sparse;
45use vortex_zigzag::ZigZag;
46use vortex_zigzag::ZigZagArrayExt;
47use vortex_zigzag::zigzag_encode;
48
49use crate::ArrayAndStats;
50use crate::CascadingCompressor;
51use crate::CompressorContext;
52use crate::GenerateStatsOptions;
53use crate::Scheme;
54use crate::SchemeExt;
55use crate::compress_patches;
56use crate::schemes::rle_ancestor_exclusions;
57use crate::schemes::rle_descendant_exclusions;
58
/// Frame-of-reference scheme: stores values relative to a reference scalar and
/// bit-packs the biased residuals (see `FoRScheme::compress`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FoRScheme;

/// ZigZag scheme: maps signed integers onto unsigned ones so a child scheme
/// (typically bit-packing) can compress the result.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// Bit-packing scheme: packs integers at a reduced bit width, carrying
/// exceptional values as patches.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse scheme: splits off the most frequent value as a fill scalar and
/// compresses the remaining indices/values separately.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Run-end scheme: encodes runs as separate run-end and run-value children.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence scheme: recognizes arrays representable by `sequence_encode`
/// (e.g. regular progressions) and stores them in constant space.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco scheme (behind the `pco` feature): delegates to the `pco` codec.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;

// Re-export the generic integer schemes/stats so callers only need this module.
pub use vortex_compressor::builtins::IntConstantScheme;
pub use vortex_compressor::builtins::IntDictScheme;
pub use vortex_compressor::builtins::is_integer_primitive;
pub use vortex_compressor::stats::IntegerStats;

/// RLE scheme over integers using the fastlanes `RLE` encoding
/// (values / indices / value-index-offset children, see `rle_compress`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IntRLEScheme;

// Minimum average run length before `IntRLEScheme` will sample RLE.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

// Minimum average run length before `RunEndScheme` will sample run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;
impl Scheme for FoRScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.for"
    }

    /// FoR only applies to canonical integer primitive arrays.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Never apply FoR beneath child 1 of any dictionary scheme
    /// (presumably the codes child — TODO confirm against the dict schemes).
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    /// Estimates the ratio as `native bit width / bits needed for (max - min)`,
    /// skipping cases where FoR cannot beat plain bit-packing.
    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // No cascade budget left: FoR always recurses into bit-packing, so bail.
        if compress_ctx.finished_cascading() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        let stats = data.integer_stats(exec_ctx);

        // min == 0 means subtracting the reference changes nothing.
        if stats.erased().min_is_zero() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // Bits required to represent the biased range; `None` (range == 0)
        // means a constant array, which other schemes handle better.
        let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
            Some(l) => l + 1,
            None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
        };

        // For all-non-negative data, plain bit-packing needs `max_ilog2 + 1`
        // bits; if FoR needs at least as many, it adds no value on top.
        if let Some(max_log) = stats
            .erased()
            .max_ilog2()
            .filter(|_| !stats.erased().min_is_negative())
        {
            let bitpack_bitwidth = max_log + 1;
            if for_bitwidth >= bitpack_bitwidth {
                return CompressionEstimate::Verdict(EstimateVerdict::Skip);
            }
        }

        let full_width: u32 = data
            .array_as_primitive()
            .ptype()
            .bit_width()
            .try_into()
            .vortex_expect("bit width must fit in u32");

        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
            full_width as f64 / for_bitwidth as f64,
        ))
    }

    /// FoR-encodes the array, bit-packs the biased residuals as a leaf (no
    /// further cascading), and rebuilds the FoR wrapper around the result.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
        let for_array = FoR::encode(primitive)?;
        let biased = for_array
            .encoded()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?;

        // Compress the residuals directly with bit-packing as a leaf context.
        let leaf_ctx = compress_ctx.clone().as_leaf();
        let biased_data =
            ArrayAndStats::new(biased.into_array(), compress_ctx.merged_stats_options());
        let compressed = BitPackingScheme.compress(compressor, &biased_data, leaf_ctx, exec_ctx)?;

        let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
        // Carry over statistics from the uncompressed FoR array.
        for_compressed
            .as_ref()
            .statistics()
            .inherit_from(for_array.as_ref().statistics());

        Ok(for_compressed.into_array())
    }
}
216
217impl Scheme for ZigZagScheme {
218 fn scheme_name(&self) -> &'static str {
219 "vortex.int.zigzag"
220 }
221
222 fn matches(&self, canonical: &Canonical) -> bool {
223 is_integer_primitive(canonical)
224 }
225
226 fn num_children(&self) -> usize {
228 1
229 }
230
231 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
235 vec![
236 DescendantExclusion {
237 excluded: IntDictScheme.id(),
238 children: ChildSelection::All,
239 },
240 DescendantExclusion {
241 excluded: RunEndScheme.id(),
242 children: ChildSelection::All,
243 },
244 DescendantExclusion {
245 excluded: SparseScheme.id(),
246 children: ChildSelection::All,
247 },
248 ]
249 }
250
251 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
253 vec![
254 AncestorExclusion {
255 ancestor: IntDictScheme.id(),
256 children: ChildSelection::One(1),
257 },
258 AncestorExclusion {
259 ancestor: FloatDictScheme.id(),
260 children: ChildSelection::One(1),
261 },
262 AncestorExclusion {
263 ancestor: StringDictScheme.id(),
264 children: ChildSelection::One(1),
265 },
266 ]
267 }
268
269 fn expected_compression_ratio(
270 &self,
271 data: &ArrayAndStats,
272 compress_ctx: CompressorContext,
273 exec_ctx: &mut ExecutionCtx,
274 ) -> CompressionEstimate {
275 if compress_ctx.finished_cascading() {
278 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
279 }
280 let stats = data.integer_stats(exec_ctx);
281
282 if !stats.erased().min_is_negative() {
284 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
285 }
286
287 CompressionEstimate::Deferred(DeferredEstimate::Sample)
288 }
289
290 fn compress(
291 &self,
292 compressor: &CascadingCompressor,
293 data: &ArrayAndStats,
294 compress_ctx: CompressorContext,
295 exec_ctx: &mut ExecutionCtx,
296 ) -> VortexResult<ArrayRef> {
297 let zag = zigzag_encode(data.array_as_primitive())?;
299 let encoded = zag.encoded().clone().execute::<PrimitiveArray>(exec_ctx)?;
300
301 let compressed = compressor.compress_child(
302 &encoded.into_array(),
303 &compress_ctx,
304 self.id(),
305 0,
306 exec_ctx,
307 )?;
308
309 Ok(ZigZag::try_new(compressed)?.into_array())
310 }
311}
312
impl Scheme for BitPackingScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.bitpacking"
    }

    /// Bit-packing only applies to canonical integer primitive arrays.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let stats = data.integer_stats(exec_ctx);

        // Negative values are skipped here; they are handled by FoR/ZigZag
        // before bit-packing — presumably because packed widths only shrink
        // non-negative ranges.
        if stats.erased().min_is_negative() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Picks the best bit width from a histogram, packs the values, and
    /// attaches outlier patches either inline (default) or via a wrapping
    /// `Patched` array (experimental).
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let primitive_array = data.array_as_primitive();

        // Choose the cheapest bit width based on the value distribution.
        let histogram = bit_width_histogram(primitive_array, exec_ctx)?;
        let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;

        // Packing at the full native width gains nothing; return the input.
        if bw as usize == primitive_array.ptype().bit_width() {
            return Ok(primitive_array.array().clone());
        }

        let primitive_array = primitive_array.into_owned();
        let packed = bitpack_encode(&primitive_array, bw, Some(&histogram), exec_ctx)?;

        // Preserve the stats computed on the packed array across rebuilds.
        let packed_stats = packed.statistics().to_owned();
        let ptype = packed.dtype().as_ptype();
        let mut parts = BitPacked::into_parts(packed);

        let array = if use_experimental_patches() {
            // Experimental path: strip the patches out of the BitPacked array
            // and re-apply them through a wrapping `Patched` array.
            let patches = parts.patches.take();
            let array = BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                None,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .into_array();

            match patches {
                None => array,
                Some(p) => Patched::from_array_and_patches(array, &p, exec_ctx)?
                    .with_stats_set(packed_stats)
                    .into_array(),
            }
        } else {
            // Default path: compress the patches in place, then rebuild the
            // BitPacked array with them attached.
            let patches = parts
                .patches
                .take()
                .map(|p| compress_patches(p, exec_ctx))
                .transpose()?;
            parts.patches = patches;
            BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                parts.patches,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .with_stats_set(packed_stats)
            .into_array()
        };

        Ok(array)
    }
}
407
impl Scheme for SparseScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sparse"
    }

    /// Sparse only applies to canonical integer primitive arrays.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Sparse needs distinct-value counts so the estimate can find the most
    /// frequent value (see the `vortex_expect`s below).
    fn stats_options(&self) -> GenerateStatsOptions {
        GenerateStatsOptions {
            count_distinct_values: true,
        }
    }

    /// Two children: 0 = patch values, 1 = patch indices (see `compress`).
    fn num_children(&self) -> usize {
        2
    }

    /// Encodings that must not appear beneath child 1 (the patch indices).
    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    /// Estimates how much of the array collapses into the fill value:
    /// `value_count / (value_count - most_frequent_count)`.
    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let len = data.array_len() as f64;
        let stats = data.integer_stats(exec_ctx);
        let value_count = stats.value_count();

        // All-null arrays: nothing for sparse to exploit.
        if value_count == 0 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // Mostly-null arrays compress to the non-null values alone.
        if stats.null_count() as f64 / len > 0.9 {
            return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
        }

        let (_, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // Every value identical: a constant encoding is the better fit.
        if most_frequent_count == value_count {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        debug_assert!(value_count > most_frequent_count);

        // Only worthwhile when the dominant value covers >= 90% of values.
        let freq = most_frequent_count as f64 / value_count as f64;
        if freq < 0.9 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
            value_count as f64 / (value_count - most_frequent_count) as f64,
        ))
    }

    /// Sparse-encodes with the most frequent value as fill, then recursively
    /// compresses the patch values (child 0) and narrowed indices (child 1).
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let len = data.array_len();
        let stats = data.integer_stats(exec_ctx);
        let array = data.array();

        let (most_frequent_value, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // Degenerate case: every element is the fill value → constant array.
        if most_frequent_count as usize == len {
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    most_frequent_value,
                    most_frequent_value.ptype(),
                    array.dtype().nullability(),
                ),
                len,
            )
            .into_array());
        }

        let sparse_encoded = Sparse::encode(
            array,
            Some(Scalar::primitive_value(
                most_frequent_value,
                most_frequent_value.ptype(),
                array.dtype().nullability(),
            )),
            exec_ctx,
        )?;

        // `Sparse::encode` may decline; only recurse when it produced Sparse.
        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
            let sparse_values_primitive = sparse
                .patches()
                .values()
                .clone()
                .execute::<PrimitiveArray>(exec_ctx)?;
            let compressed_values = compressor.compress_child(
                &sparse_values_primitive.into_array(),
                &compress_ctx,
                self.id(),
                0,
                exec_ctx,
            )?;

            // Narrow the indices to the smallest fitting integer type first.
            let indices = sparse
                .patches()
                .indices()
                .clone()
                .execute::<PrimitiveArray>(exec_ctx)?
                .narrow()?;

            let compressed_indices = compressor.compress_child(
                &indices.into_array(),
                &compress_ctx,
                self.id(),
                1,
                exec_ctx,
            )?;

            Sparse::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            Ok(sparse_encoded)
        }
    }
}
578
579impl Scheme for RunEndScheme {
580 fn scheme_name(&self) -> &'static str {
581 "vortex.int.runend"
582 }
583
584 fn matches(&self, canonical: &Canonical) -> bool {
585 is_integer_primitive(canonical)
586 }
587
588 fn num_children(&self) -> usize {
590 2
591 }
592
593 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
596 vec![
597 DescendantExclusion {
598 excluded: IntDictScheme.id(),
599 children: ChildSelection::One(1),
600 },
601 DescendantExclusion {
602 excluded: RunEndScheme.id(),
603 children: ChildSelection::One(1),
604 },
605 DescendantExclusion {
606 excluded: IntRLEScheme.id(),
607 children: ChildSelection::One(1),
608 },
609 DescendantExclusion {
610 excluded: SparseScheme.id(),
611 children: ChildSelection::One(1),
612 },
613 ]
614 }
615
616 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
619 vec![
620 AncestorExclusion {
621 ancestor: IntDictScheme.id(),
622 children: ChildSelection::One(0),
623 },
624 AncestorExclusion {
625 ancestor: FloatDictScheme.id(),
626 children: ChildSelection::One(0),
627 },
628 AncestorExclusion {
629 ancestor: StringDictScheme.id(),
630 children: ChildSelection::One(0),
631 },
632 ]
633 }
634
635 fn expected_compression_ratio(
636 &self,
637 data: &ArrayAndStats,
638 _compress_ctx: CompressorContext,
639 exec_ctx: &mut ExecutionCtx,
640 ) -> CompressionEstimate {
641 if data.integer_stats(exec_ctx).average_run_length() < RUN_END_THRESHOLD {
643 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
644 }
645
646 CompressionEstimate::Deferred(DeferredEstimate::Sample)
647 }
648
649 fn compress(
650 &self,
651 compressor: &CascadingCompressor,
652 data: &ArrayAndStats,
653 compress_ctx: CompressorContext,
654 exec_ctx: &mut ExecutionCtx,
655 ) -> VortexResult<ArrayRef> {
656 let (ends, values) = runend_encode(data.array_as_primitive(), exec_ctx);
658
659 let values_primitive = values.execute::<PrimitiveArray>(exec_ctx)?;
660 let compressed_values = compressor.compress_child(
661 &values_primitive.into_array(),
662 &compress_ctx,
663 self.id(),
664 0,
665 exec_ctx,
666 )?;
667
668 let compressed_ends =
669 compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;
670
671 Ok(unsafe {
673 RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
674 .into_array()
675 })
676 }
677}
678
impl Scheme for SequenceScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sequence"
    }

    /// Sequence only applies to canonical integer primitive arrays.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Never apply sequence beneath child 1 of any dictionary scheme.
    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(1),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        // Skipped during sampling — presumably because a sample being a
        // sequence does not imply the full array is one. TODO confirm.
        if compress_ctx.is_sample() {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }
        let stats = data.integer_stats(exec_ctx);

        // Sequence encoding cannot represent nulls (see `compress`).
        if stats.null_count() > 0 {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        // A known distinct count smaller than the length rules out an
        // all-distinct sequence cheaply.
        if stats
            .distinct_count()
            .is_some_and(|count| count as usize != data.array_len())
        {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, data, best_so_far, _ctx, exec_ctx| {
                // A sequence compresses to ~2 stored values, so this is the
                // best ratio the scheme could possibly achieve.
                let compressed_size = 2usize;
                let max_ratio = data.array_len() as f64 / compressed_size as f64;

                // If some other scheme already beats our ceiling, don't even
                // attempt the (exact) sequence check.
                let threshold = best_so_far.and_then(EstimateScore::finite_ratio);
                if threshold.is_some_and(|t| max_ratio <= t) {
                    return Ok(EstimateVerdict::Skip);
                }

                // The real test: does the whole array sequence-encode?
                if sequence_encode(data.array_as_primitive(), exec_ctx)?.is_none() {
                    return Ok(EstimateVerdict::Skip);
                }
                Ok(EstimateVerdict::Ratio(max_ratio))
            },
        )))
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let stats = data.integer_stats(exec_ctx);

        if stats.null_count() > 0 {
            vortex_bail!("sequence encoding does not support nulls");
        }
        sequence_encode(data.array_as_primitive(), exec_ctx)?
            .ok_or_else(|| vortex_err!("cannot sequence encode array"))
    }
}
778
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    /// Pco only applies to canonical integer primitive arrays.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        // 8-bit types are skipped — presumably pco has too much overhead to
        // pay off on single-byte values. TODO confirm.
        if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) {
            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
        }

        CompressionEstimate::Deferred(DeferredEstimate::Sample)
    }

    /// Delegates to the pco codec at its default compression level with a
    /// fixed chunk size of 8192 values.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        Ok(vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )?
        .into_array())
    }
}
821
/// Shared RLE compression used by the integer (and possibly other) RLE
/// schemes: RLE-encodes the input, then recursively compresses the three
/// children — values (0), indices (1), and value-index offsets (2) — and
/// reassembles the `RLE` array.
///
/// With the `unstable_encodings` feature the indices child is additionally
/// delta-compressed via `try_compress_delta`.
pub(crate) fn rle_compress(
    scheme: &dyn Scheme,
    compressor: &CascadingCompressor,
    data: &ArrayAndStats,
    compress_ctx: CompressorContext,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let rle_array = RLE::encode(data.array_as_primitive(), exec_ctx)?;

    // Child 0: the distinct run values.
    let rle_values_primitive = rle_array
        .values()
        .clone()
        .execute::<PrimitiveArray>(exec_ctx)?;
    let compressed_values = compressor.compress_child(
        &rle_values_primitive.into_array(),
        &compress_ctx,
        scheme.id(),
        0,
        exec_ctx,
    )?;

    // Child 1 (unstable path): narrow the indices, then delta-compress them.
    #[cfg(feature = "unstable_encodings")]
    let compressed_indices = {
        let rle_indices_primitive = rle_array
            .indices()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        try_compress_delta(
            compressor,
            &rle_indices_primitive.into_array(),
            &compress_ctx,
            scheme.id(),
            1,
            exec_ctx,
        )?
    };

    // Child 1 (stable path): narrow the indices and compress them directly.
    #[cfg(not(feature = "unstable_encodings"))]
    let compressed_indices = {
        let rle_indices_primitive = rle_array
            .indices()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow()?;
        compressor.compress_child(
            &rle_indices_primitive.into_array(),
            &compress_ctx,
            scheme.id(),
            1,
            exec_ctx,
        )?
    };

    // Child 2: the value-index offsets, also narrowed first.
    let rle_offsets_primitive = rle_array
        .values_idx_offsets()
        .clone()
        .execute::<PrimitiveArray>(exec_ctx)?
        .narrow()?;
    let compressed_offsets = compressor.compress_child(
        &rle_offsets_primitive.into_array(),
        &compress_ctx,
        scheme.id(),
        2,
        exec_ctx,
    )?;

    // SAFETY (pre-existing unchecked call): all three children are compressed
    // forms of the components produced by `RLE::encode` for this input, with
    // the original offset and length preserved.
    unsafe {
        Ok(RLE::new_unchecked(
            compressed_values,
            compressed_indices,
            compressed_offsets,
            rle_array.offset(),
            rle_array.len(),
        )
        .into_array())
    }
}
903
/// Delta-compresses `child` (splitting it into bases + deltas) and recursively
/// compresses both parts before reassembling a `Delta` array.
///
/// NOTE(review): both the bases and the deltas are compressed under the same
/// `child_index` of the parent scheme — confirm this is intended rather than
/// distinct child indices for the two sub-arrays.
#[cfg(feature = "unstable_encodings")]
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let child_primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let (bases, deltas) = vortex_fastlanes::delta_compress(&child_primitive, exec_ctx)?;

    let compressed_bases = compressor.compress_child(
        &bases.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;
    let compressed_deltas = compressor.compress_child(
        &deltas.into_array(),
        parent_ctx,
        parent_id,
        child_index,
        exec_ctx,
    )?;

    Delta::try_new(compressed_bases, compressed_deltas, 0, child.len()).map(IntoArray::into_array)
}
933
934impl Scheme for IntRLEScheme {
935 fn scheme_name(&self) -> &'static str {
936 "vortex.int.rle"
937 }
938
939 fn matches(&self, canonical: &Canonical) -> bool {
940 is_integer_primitive(canonical)
941 }
942
943 fn num_children(&self) -> usize {
945 3
946 }
947
948 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
949 rle_descendant_exclusions()
950 }
951
952 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
953 rle_ancestor_exclusions()
954 }
955
956 fn expected_compression_ratio(
957 &self,
958 data: &ArrayAndStats,
959 compress_ctx: CompressorContext,
960 exec_ctx: &mut ExecutionCtx,
961 ) -> CompressionEstimate {
962 if compress_ctx.finished_cascading() {
964 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
965 }
966 if data.integer_stats(exec_ctx).average_run_length() < RUN_LENGTH_THRESHOLD {
967 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
968 }
969
970 CompressionEstimate::Deferred(DeferredEstimate::Sample)
971 }
972
973 fn compress(
974 &self,
975 compressor: &CascadingCompressor,
976 data: &ArrayAndStats,
977 compress_ctx: CompressorContext,
978 exec_ctx: &mut ExecutionCtx,
979 ) -> VortexResult<ArrayRef> {
980 rle_compress(self, compressor, data, compress_ctx, exec_ctx)
981 }
982}
983
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    // One shared session so each test doesn't rebuild the array runtime.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    // Compressing an empty array must succeed and yield an empty result.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(result.is_empty());
        Ok(())
    }

    // Short runs drawn from a tiny value set should select dictionary encoding.
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Six widely-spaced values so the data is dict- but not bitpack-friendly.
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) .collect_vec();

        // Deterministic RNG: runs of length 0..5 of random dictionary values.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(
            &codes.freeze().into_array(),
            &mut SESSION.create_execution_ctx(),
        )?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    // A mostly-null array should compress to a masked constant and still
    // round-trip the single valid value.
    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        // Null slots may canonicalize to any value; only the valid slot matters.
        let decoded = compressed;
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    // An arithmetic progression wrapped in Some(..) should still pick Sequence.
    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sequence>());

        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    // With only IntRLEScheme available, long runs must produce RLE and
    // round-trip exactly.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed =
            compressor.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    // Smoke test on a large input; only run on CI unless slow tests are skipped.
    // NOTE(review): despite the `_int` name, this builds f64 data — confirm
    // whether an integer dtype was intended.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim, &mut SESSION.create_execution_ctx())?;

        Ok(())
    }
}
1132
// Tests asserting which encoding the full compressor selects for
// characteristic integer inputs.
#[cfg(test)]
mod scheme_selection_tests {
    use std::iter;
    use std::sync::LazyLock;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    // One shared session so each test doesn't rebuild the array runtime.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    // All-equal values → constant encoding.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    // Large base with a small spread → frame-of-reference.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    // Small non-negative values → bit-packing, with stats preserved.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    // One dominant value (95%) with scattered exceptions → sparse.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    // Short runs over a tiny value set → dictionary.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) .collect();

        // Deterministic RNG: runs of length 0..5 of random dictionary values.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    // Long runs of near-i32::MAX values (wrapping) → run-end encoding.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    // A strict arithmetic progression → sequence encoding.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    // Many distinct run values: the full compressor currently prefers RunEnd
    // over RLE here (the assertion checks RunEnd despite the test's name).
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array(), &mut SESSION.create_execution_ctx())?;
        eprintln!("{}", compressed.display_tree());
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}