1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Patched;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::patched::use_experimental_patches;
14use vortex_array::arrays::primitive::PrimitiveArrayExt;
15use vortex_array::scalar::Scalar;
16use vortex_compressor::builtins::FloatDictScheme;
17use vortex_compressor::builtins::StringDictScheme;
18use vortex_compressor::estimate::CompressionEstimate;
19use vortex_compressor::estimate::DeferredEstimate;
20use vortex_compressor::estimate::EstimateScore;
21use vortex_compressor::estimate::EstimateVerdict;
22use vortex_compressor::scheme::AncestorExclusion;
23use vortex_compressor::scheme::ChildSelection;
24use vortex_compressor::scheme::DescendantExclusion;
25#[cfg(feature = "unstable_encodings")]
26use vortex_compressor::scheme::SchemeId;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_err;
31use vortex_fastlanes::BitPacked;
32#[cfg(feature = "unstable_encodings")]
33use vortex_fastlanes::Delta;
34use vortex_fastlanes::FoR;
35use vortex_fastlanes::FoRArrayExt;
36use vortex_fastlanes::RLE;
37use vortex_fastlanes::RLEArrayExt;
38use vortex_fastlanes::bitpack_compress::bit_width_histogram;
39use vortex_fastlanes::bitpack_compress::bitpack_encode;
40use vortex_fastlanes::bitpack_compress::find_best_bit_width;
41use vortex_runend::RunEnd;
42use vortex_runend::compress::runend_encode;
43use vortex_sequence::sequence_encode;
44use vortex_sparse::Sparse;
45use vortex_sparse::SparseExt as _;
46use vortex_zigzag::ZigZag;
47use vortex_zigzag::ZigZagArrayExt;
48use vortex_zigzag::zigzag_encode;
49
50use crate::ArrayAndStats;
51use crate::CascadingCompressor;
52use crate::CompressorContext;
53use crate::GenerateStatsOptions;
54use crate::Scheme;
55use crate::SchemeExt;
56use crate::compress_patches;
57use crate::schemes::rle_ancestor_exclusions;
58use crate::schemes::rle_descendant_exclusions;
59
/// Marker type for the integer scheme registered under the name `vortex.int.for`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct FoRScheme;
63
/// Marker type for the integer scheme registered under the name `vortex.int.zigzag`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct ZigZagScheme;
67
/// Marker type for the integer scheme registered under the name `vortex.int.bitpacking`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct BitPackingScheme;
71
/// Marker type for the integer scheme registered under the name `vortex.int.sparse`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SparseScheme;
75
/// Marker type for the integer scheme registered under the name `vortex.int.runend`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RunEndScheme;
79
/// Marker type for the integer scheme registered under the name `vortex.int.sequence`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SequenceScheme;
83
#[cfg(feature = "pco")]
/// Marker type for the integer scheme registered under the name `vortex.int.pco`.
///
/// Only compiled when the `pco` feature is enabled.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PcoScheme;
88
89pub use vortex_compressor::builtins::IntConstantScheme;
91pub use vortex_compressor::builtins::IntDictScheme;
92pub use vortex_compressor::builtins::is_integer_primitive;
93pub use vortex_compressor::stats::IntegerStats;
94
/// Marker type for the integer scheme registered under the name `vortex.int.rle`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct IntRLEScheme;
98
/// Average run length below which [`IntRLEScheme`] reports `Skip` from
/// `expected_compression_ratio` (the comparison is a strict `<`).
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;
101
/// Average run length below which [`RunEndScheme`] reports `Skip` from
/// `expected_compression_ratio` (the comparison is a strict `<`).
const RUN_END_THRESHOLD: u32 = 4;
104
105impl Scheme for FoRScheme {
106 fn scheme_name(&self) -> &'static str {
107 "vortex.int.for"
108 }
109
110 fn matches(&self, canonical: &Canonical) -> bool {
111 is_integer_primitive(canonical)
112 }
113
114 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
116 vec![
117 AncestorExclusion {
118 ancestor: IntDictScheme.id(),
119 children: ChildSelection::One(1),
120 },
121 AncestorExclusion {
122 ancestor: FloatDictScheme.id(),
123 children: ChildSelection::One(1),
124 },
125 AncestorExclusion {
126 ancestor: StringDictScheme.id(),
127 children: ChildSelection::One(1),
128 },
129 ]
130 }
131
132 fn expected_compression_ratio(
133 &self,
134 data: &ArrayAndStats,
135 compress_ctx: CompressorContext,
136 exec_ctx: &mut ExecutionCtx,
137 ) -> CompressionEstimate {
138 if compress_ctx.finished_cascading() {
141 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
142 }
143 let stats = data.integer_stats(exec_ctx);
144
145 if stats.erased().min_is_zero() {
147 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
148 }
149
150 let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
152 Some(l) => l + 1,
153 None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
155 };
156
157 if let Some(max_log) = stats
161 .erased()
162 .max_ilog2()
163 .filter(|_| !stats.erased().min_is_negative())
165 {
166 let bitpack_bitwidth = max_log + 1;
167 if for_bitwidth >= bitpack_bitwidth {
168 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
169 }
170 }
171
172 let full_width: u32 = data
173 .array_as_primitive()
174 .ptype()
175 .bit_width()
176 .try_into()
177 .vortex_expect("bit width must fit in u32");
178
179 CompressionEstimate::Verdict(EstimateVerdict::Ratio(
180 full_width as f64 / for_bitwidth as f64,
181 ))
182 }
183
184 fn compress(
185 &self,
186 compressor: &CascadingCompressor,
187 data: &ArrayAndStats,
188 compress_ctx: CompressorContext,
189 exec_ctx: &mut ExecutionCtx,
190 ) -> VortexResult<ArrayRef> {
191 let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
192 let for_array = FoR::encode(primitive)?;
193 let biased = for_array
194 .encoded()
195 .clone()
196 .execute::<PrimitiveArray>(exec_ctx)?;
197
198 let leaf_ctx = compress_ctx.clone().as_leaf();
203 let biased_data =
204 ArrayAndStats::new(biased.into_array(), compress_ctx.merged_stats_options());
205 let compressed = BitPackingScheme.compress(compressor, &biased_data, leaf_ctx, exec_ctx)?;
206
207 let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
209 for_compressed
210 .as_ref()
211 .statistics()
212 .inherit_from(for_array.as_ref().statistics());
213
214 Ok(for_compressed.into_array())
215 }
216}
217
218impl Scheme for ZigZagScheme {
219 fn scheme_name(&self) -> &'static str {
220 "vortex.int.zigzag"
221 }
222
223 fn matches(&self, canonical: &Canonical) -> bool {
224 is_integer_primitive(canonical)
225 }
226
227 fn num_children(&self) -> usize {
229 1
230 }
231
232 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
236 vec![
237 DescendantExclusion {
238 excluded: IntDictScheme.id(),
239 children: ChildSelection::All,
240 },
241 DescendantExclusion {
242 excluded: RunEndScheme.id(),
243 children: ChildSelection::All,
244 },
245 DescendantExclusion {
246 excluded: SparseScheme.id(),
247 children: ChildSelection::All,
248 },
249 ]
250 }
251
252 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
254 vec![
255 AncestorExclusion {
256 ancestor: IntDictScheme.id(),
257 children: ChildSelection::One(1),
258 },
259 AncestorExclusion {
260 ancestor: FloatDictScheme.id(),
261 children: ChildSelection::One(1),
262 },
263 AncestorExclusion {
264 ancestor: StringDictScheme.id(),
265 children: ChildSelection::One(1),
266 },
267 ]
268 }
269
270 fn expected_compression_ratio(
271 &self,
272 data: &ArrayAndStats,
273 compress_ctx: CompressorContext,
274 exec_ctx: &mut ExecutionCtx,
275 ) -> CompressionEstimate {
276 if compress_ctx.finished_cascading() {
279 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
280 }
281 let stats = data.integer_stats(exec_ctx);
282
283 if !stats.erased().min_is_negative() {
285 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
286 }
287
288 CompressionEstimate::Deferred(DeferredEstimate::Sample)
289 }
290
291 fn compress(
292 &self,
293 compressor: &CascadingCompressor,
294 data: &ArrayAndStats,
295 compress_ctx: CompressorContext,
296 exec_ctx: &mut ExecutionCtx,
297 ) -> VortexResult<ArrayRef> {
298 let zag = zigzag_encode(data.array_as_primitive())?;
300 let encoded = zag.encoded().clone().execute::<PrimitiveArray>(exec_ctx)?;
301
302 let compressed = compressor.compress_child(
303 &encoded.into_array(),
304 &compress_ctx,
305 self.id(),
306 0,
307 exec_ctx,
308 )?;
309
310 Ok(ZigZag::try_new(compressed)?.into_array())
311 }
312}
313
314impl Scheme for BitPackingScheme {
315 fn scheme_name(&self) -> &'static str {
316 "vortex.int.bitpacking"
317 }
318
319 fn matches(&self, canonical: &Canonical) -> bool {
320 is_integer_primitive(canonical)
321 }
322
323 fn expected_compression_ratio(
324 &self,
325 data: &ArrayAndStats,
326 _compress_ctx: CompressorContext,
327 exec_ctx: &mut ExecutionCtx,
328 ) -> CompressionEstimate {
329 let stats = data.integer_stats(exec_ctx);
330
331 if stats.erased().min_is_negative() {
333 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
334 }
335
336 CompressionEstimate::Deferred(DeferredEstimate::Sample)
337 }
338
339 fn compress(
340 &self,
341 _compressor: &CascadingCompressor,
342 data: &ArrayAndStats,
343 _compress_ctx: CompressorContext,
344 exec_ctx: &mut ExecutionCtx,
345 ) -> VortexResult<ArrayRef> {
346 let primitive_array = data.array_as_primitive();
347
348 let histogram = bit_width_histogram(primitive_array, exec_ctx)?;
349 let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;
350
351 if bw as usize == primitive_array.ptype().bit_width() {
353 return Ok(primitive_array.array().clone());
354 }
355
356 let primitive_array = primitive_array.into_owned();
358 let packed = bitpack_encode(&primitive_array, bw, Some(&histogram), exec_ctx)?;
359
360 let packed_stats = packed.statistics().to_owned();
361 let ptype = packed.dtype().as_ptype();
362 let mut parts = BitPacked::into_parts(packed);
363
364 let array = if use_experimental_patches() {
365 let patches = parts.patches.take();
366 let array = BitPacked::try_new(
368 parts.packed,
369 ptype,
370 parts.validity,
371 None,
372 parts.bit_width,
373 parts.len,
374 parts.offset,
375 )?
376 .into_array();
377
378 match patches {
379 None => array,
380 Some(p) => Patched::from_array_and_patches(array, &p, exec_ctx)?
381 .with_stats_set(packed_stats)
382 .into_array(),
383 }
384 } else {
385 let patches = parts
387 .patches
388 .take()
389 .map(|p| compress_patches(p, exec_ctx))
390 .transpose()?;
391 parts.patches = patches;
392 BitPacked::try_new(
393 parts.packed,
394 ptype,
395 parts.validity,
396 parts.patches,
397 parts.bit_width,
398 parts.len,
399 parts.offset,
400 )?
401 .with_stats_set(packed_stats)
402 .into_array()
403 };
404
405 Ok(array)
406 }
407}
408
409impl Scheme for SparseScheme {
410 fn scheme_name(&self) -> &'static str {
411 "vortex.int.sparse"
412 }
413
414 fn matches(&self, canonical: &Canonical) -> bool {
415 is_integer_primitive(canonical)
416 }
417
418 fn stats_options(&self) -> GenerateStatsOptions {
419 GenerateStatsOptions {
420 count_distinct_values: true,
421 }
422 }
423
424 fn num_children(&self) -> usize {
426 2
427 }
428
429 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
432 vec![
433 DescendantExclusion {
434 excluded: IntDictScheme.id(),
435 children: ChildSelection::One(1),
436 },
437 DescendantExclusion {
438 excluded: RunEndScheme.id(),
439 children: ChildSelection::One(1),
440 },
441 DescendantExclusion {
442 excluded: IntRLEScheme.id(),
443 children: ChildSelection::One(1),
444 },
445 DescendantExclusion {
446 excluded: SparseScheme.id(),
447 children: ChildSelection::One(1),
448 },
449 ]
450 }
451
452 fn expected_compression_ratio(
453 &self,
454 data: &ArrayAndStats,
455 _compress_ctx: CompressorContext,
456 exec_ctx: &mut ExecutionCtx,
457 ) -> CompressionEstimate {
458 let len = data.array_len() as f64;
459 let stats = data.integer_stats(exec_ctx);
460 let value_count = stats.value_count();
461
462 if value_count == 0 {
464 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
465 }
466
467 if stats.null_count() as f64 / len > 0.9 {
469 return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
470 }
471
472 let (_, most_frequent_count) = stats
473 .erased()
474 .most_frequent_value_and_count()
475 .vortex_expect(
476 "this must be present since `SparseScheme` declared that we need distinct values",
477 );
478
479 if most_frequent_count == value_count {
481 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
482 }
483 debug_assert!(value_count > most_frequent_count);
484
485 let freq = most_frequent_count as f64 / value_count as f64;
487 if freq < 0.9 {
488 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
489 }
490
491 CompressionEstimate::Verdict(EstimateVerdict::Ratio(
493 value_count as f64 / (value_count - most_frequent_count) as f64,
494 ))
495 }
496
497 fn compress(
498 &self,
499 compressor: &CascadingCompressor,
500 data: &ArrayAndStats,
501 compress_ctx: CompressorContext,
502 exec_ctx: &mut ExecutionCtx,
503 ) -> VortexResult<ArrayRef> {
504 let len = data.array_len();
505 let stats = data.integer_stats(exec_ctx);
506 let array = data.array();
507
508 let (most_frequent_value, most_frequent_count) = stats
509 .erased()
510 .most_frequent_value_and_count()
511 .vortex_expect(
512 "this must be present since `SparseScheme` declared that we need distinct values",
513 );
514
515 if most_frequent_count as usize == len {
516 return Ok(ConstantArray::new(
518 Scalar::primitive_value(
519 most_frequent_value,
520 most_frequent_value.ptype(),
521 array.dtype().nullability(),
522 ),
523 len,
524 )
525 .into_array());
526 }
527
528 let sparse_encoded = Sparse::encode(
529 array,
530 Some(Scalar::primitive_value(
531 most_frequent_value,
532 most_frequent_value.ptype(),
533 array.dtype().nullability(),
534 )),
535 exec_ctx,
536 )?;
537
538 if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
539 let sparse_values_primitive = sparse
540 .patches()
541 .values()
542 .clone()
543 .execute::<PrimitiveArray>(exec_ctx)?;
544 let compressed_values = compressor.compress_child(
545 &sparse_values_primitive.into_array(),
546 &compress_ctx,
547 self.id(),
548 0,
549 exec_ctx,
550 )?;
551
552 let indices = sparse
553 .patches()
554 .indices()
555 .clone()
556 .execute::<PrimitiveArray>(exec_ctx)?
557 .narrow()?;
558
559 let compressed_indices = compressor.compress_child(
560 &indices.into_array(),
561 &compress_ctx,
562 self.id(),
563 1,
564 exec_ctx,
565 )?;
566
567 Sparse::try_new(
568 compressed_indices,
569 compressed_values,
570 sparse.len(),
571 sparse.fill_scalar().clone(),
572 )
573 .map(|a| a.into_array())
574 } else {
575 Ok(sparse_encoded)
576 }
577 }
578}
579
580impl Scheme for RunEndScheme {
581 fn scheme_name(&self) -> &'static str {
582 "vortex.int.runend"
583 }
584
585 fn matches(&self, canonical: &Canonical) -> bool {
586 is_integer_primitive(canonical)
587 }
588
589 fn num_children(&self) -> usize {
591 2
592 }
593
594 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
597 vec![
598 DescendantExclusion {
599 excluded: IntDictScheme.id(),
600 children: ChildSelection::One(1),
601 },
602 DescendantExclusion {
603 excluded: RunEndScheme.id(),
604 children: ChildSelection::One(1),
605 },
606 DescendantExclusion {
607 excluded: IntRLEScheme.id(),
608 children: ChildSelection::One(1),
609 },
610 DescendantExclusion {
611 excluded: SparseScheme.id(),
612 children: ChildSelection::One(1),
613 },
614 ]
615 }
616
617 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
620 vec![
621 AncestorExclusion {
622 ancestor: IntDictScheme.id(),
623 children: ChildSelection::One(0),
624 },
625 AncestorExclusion {
626 ancestor: FloatDictScheme.id(),
627 children: ChildSelection::One(0),
628 },
629 AncestorExclusion {
630 ancestor: StringDictScheme.id(),
631 children: ChildSelection::One(0),
632 },
633 ]
634 }
635
636 fn expected_compression_ratio(
637 &self,
638 data: &ArrayAndStats,
639 _compress_ctx: CompressorContext,
640 exec_ctx: &mut ExecutionCtx,
641 ) -> CompressionEstimate {
642 if data.integer_stats(exec_ctx).average_run_length() < RUN_END_THRESHOLD {
644 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
645 }
646
647 CompressionEstimate::Deferred(DeferredEstimate::Sample)
648 }
649
650 fn compress(
651 &self,
652 compressor: &CascadingCompressor,
653 data: &ArrayAndStats,
654 compress_ctx: CompressorContext,
655 exec_ctx: &mut ExecutionCtx,
656 ) -> VortexResult<ArrayRef> {
657 let (ends, values) = runend_encode(data.array_as_primitive(), exec_ctx);
659
660 let values_primitive = values.execute::<PrimitiveArray>(exec_ctx)?;
661 let compressed_values = compressor.compress_child(
662 &values_primitive.into_array(),
663 &compress_ctx,
664 self.id(),
665 0,
666 exec_ctx,
667 )?;
668
669 let compressed_ends =
670 compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;
671
672 Ok(unsafe {
674 RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
675 .into_array()
676 })
677 }
678}
679
680impl Scheme for SequenceScheme {
681 fn scheme_name(&self) -> &'static str {
682 "vortex.int.sequence"
683 }
684
685 fn matches(&self, canonical: &Canonical) -> bool {
686 is_integer_primitive(canonical)
687 }
688
689 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
693 vec![
694 AncestorExclusion {
695 ancestor: IntDictScheme.id(),
696 children: ChildSelection::One(1),
697 },
698 AncestorExclusion {
699 ancestor: FloatDictScheme.id(),
700 children: ChildSelection::One(1),
701 },
702 AncestorExclusion {
703 ancestor: StringDictScheme.id(),
704 children: ChildSelection::One(1),
705 },
706 ]
707 }
708
709 fn expected_compression_ratio(
710 &self,
711 data: &ArrayAndStats,
712 compress_ctx: CompressorContext,
713 exec_ctx: &mut ExecutionCtx,
714 ) -> CompressionEstimate {
715 if compress_ctx.is_sample() {
718 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
719 }
720 let stats = data.integer_stats(exec_ctx);
721
722 if stats.null_count() > 0 {
724 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
725 }
726
727 if stats
730 .distinct_count()
731 .is_some_and(|count| count as usize != data.array_len())
732 {
733 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
734 }
735
736 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
739 |_compressor, data, best_so_far, _ctx, exec_ctx| {
740 let compressed_size = 2usize;
743 let max_ratio = data.array_len() as f64 / compressed_size as f64;
744
745 let threshold = best_so_far.and_then(EstimateScore::finite_ratio);
748 if threshold.is_some_and(|t| max_ratio <= t) {
749 return Ok(EstimateVerdict::Skip);
750 }
751
752 if sequence_encode(data.array_as_primitive(), exec_ctx)?.is_none() {
755 return Ok(EstimateVerdict::Skip);
756 }
757 Ok(EstimateVerdict::Ratio(max_ratio))
759 },
760 )))
761 }
762
763 fn compress(
764 &self,
765 _compressor: &CascadingCompressor,
766 data: &ArrayAndStats,
767 _compress_ctx: CompressorContext,
768 exec_ctx: &mut ExecutionCtx,
769 ) -> VortexResult<ArrayRef> {
770 let stats = data.integer_stats(exec_ctx);
771
772 if stats.null_count() > 0 {
773 vortex_bail!("sequence encoding does not support nulls");
774 }
775 sequence_encode(data.array_as_primitive(), exec_ctx)?
776 .ok_or_else(|| vortex_err!("cannot sequence encode array"))
777 }
778}
779
/// Integer scheme `vortex.int.pco`, available only with the `pco` feature.
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    /// Returns the identifier `"vortex.int.pco"`.
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    /// Accepts only canonical arrays that are integer primitives.
    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    /// Reports `Skip` for 8-bit integers (`I8`, `U8`); every other type defers to sampling.
    fn expected_compression_ratio(
        &self,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        match data.array_as_primitive().ptype() {
            PType::I8 | PType::U8 => CompressionEstimate::Verdict(EstimateVerdict::Skip),
            _ => CompressionEstimate::Deferred(DeferredEstimate::Sample),
        }
    }

    /// Encodes `data` with Pco at `pco::DEFAULT_COMPRESSION_LEVEL`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): 8192 is passed through to `Pco::from_primitive` unchanged; its
        // meaning is not visible here — confirm against that function's signature.
        vortex_pco::Pco::from_primitive(
            data.array_as_primitive(),
            pco::DEFAULT_COMPRESSION_LEVEL,
            8192,
            exec_ctx,
        )
        .map(|encoded| encoded.into_array())
    }
}
822
823pub(crate) fn rle_compress(
825 scheme: &dyn Scheme,
826 compressor: &CascadingCompressor,
827 data: &ArrayAndStats,
828 compress_ctx: CompressorContext,
829 exec_ctx: &mut ExecutionCtx,
830) -> VortexResult<ArrayRef> {
831 let rle_array = RLE::encode(data.array_as_primitive(), exec_ctx)?;
832
833 let rle_values_primitive = rle_array
834 .values()
835 .clone()
836 .execute::<PrimitiveArray>(exec_ctx)?;
837 let compressed_values = compressor.compress_child(
838 &rle_values_primitive.into_array(),
839 &compress_ctx,
840 scheme.id(),
841 0,
842 exec_ctx,
843 )?;
844
845 #[cfg(feature = "unstable_encodings")]
847 let compressed_indices = {
848 let rle_indices_primitive = rle_array
849 .indices()
850 .clone()
851 .execute::<PrimitiveArray>(exec_ctx)?
852 .narrow()?;
853 try_compress_delta(
854 compressor,
855 &rle_indices_primitive.into_array(),
856 &compress_ctx,
857 scheme.id(),
858 1,
859 exec_ctx,
860 )?
861 };
862
863 #[cfg(not(feature = "unstable_encodings"))]
864 let compressed_indices = {
865 let rle_indices_primitive = rle_array
866 .indices()
867 .clone()
868 .execute::<PrimitiveArray>(exec_ctx)?
869 .narrow()?;
870 compressor.compress_child(
871 &rle_indices_primitive.into_array(),
872 &compress_ctx,
873 scheme.id(),
874 1,
875 exec_ctx,
876 )?
877 };
878
879 let rle_offsets_primitive = rle_array
880 .values_idx_offsets()
881 .clone()
882 .execute::<PrimitiveArray>(exec_ctx)?
883 .narrow()?;
884 let compressed_offsets = compressor.compress_child(
885 &rle_offsets_primitive.into_array(),
886 &compress_ctx,
887 scheme.id(),
888 2,
889 exec_ctx,
890 )?;
891
892 unsafe {
894 Ok(RLE::new_unchecked(
895 compressed_values,
896 compressed_indices,
897 compressed_offsets,
898 rle_array.offset(),
899 rle_array.len(),
900 )
901 .into_array())
902 }
903}
904
/// Delta-compresses `child` and compresses both resulting parts as children of `parent_id`.
///
/// The bases and the deltas are each passed to `compress_child` with the same
/// `child_index`, then combined into a `Delta` array with offset `0` and the length of
/// `child`.
///
/// # Errors
///
/// Propagates failures from execution, delta compression, child compression and
/// `Delta::try_new`.
#[cfg(feature = "unstable_encodings")]
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
    exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    let primitive = child.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let (bases, deltas) = vortex_fastlanes::delta_compress(&primitive, exec_ctx)?;

    let bases = bases.into_array();
    let compressed_bases =
        compressor.compress_child(&bases, parent_ctx, parent_id, child_index, exec_ctx)?;

    let deltas = deltas.into_array();
    let compressed_deltas =
        compressor.compress_child(&deltas, parent_ctx, parent_id, child_index, exec_ctx)?;

    let delta = Delta::try_new(compressed_bases, compressed_deltas, 0, child.len())?;
    Ok(delta.into_array())
}
934
935impl Scheme for IntRLEScheme {
936 fn scheme_name(&self) -> &'static str {
937 "vortex.int.rle"
938 }
939
940 fn matches(&self, canonical: &Canonical) -> bool {
941 is_integer_primitive(canonical)
942 }
943
944 fn num_children(&self) -> usize {
946 3
947 }
948
949 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
950 rle_descendant_exclusions()
951 }
952
953 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
954 rle_ancestor_exclusions()
955 }
956
957 fn expected_compression_ratio(
958 &self,
959 data: &ArrayAndStats,
960 compress_ctx: CompressorContext,
961 exec_ctx: &mut ExecutionCtx,
962 ) -> CompressionEstimate {
963 if compress_ctx.finished_cascading() {
965 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
966 }
967 if data.integer_stats(exec_ctx).average_run_length() < RUN_LENGTH_THRESHOLD {
968 return CompressionEstimate::Verdict(EstimateVerdict::Skip);
969 }
970
971 CompressionEstimate::Deferred(DeferredEstimate::Sample)
972 }
973
974 fn compress(
975 &self,
976 compressor: &CascadingCompressor,
977 data: &ArrayAndStats,
978 compress_ctx: CompressorContext,
979 exec_ctx: &mut ExecutionCtx,
980 ) -> VortexResult<ArrayRef> {
981 rle_compress(self, compressor, data, compress_ctx, exec_ctx)
982 }
983}
984
#[cfg(test)]
/// Round-trip and encoding-shape tests for the integer schemes.
mod tests {
    use std::iter;
    use std::sync::LazyLock;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compressing an empty array yields an empty array.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let mut ctx = SESSION.create_execution_ctx();
        let input = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);

        let output = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(output.is_empty());
        Ok(())
    }

    /// Short random runs over six distinct values end up dictionary-encoded.
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|base| 12340 * base)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Draw order matters for reproducibility: run length first, then the value.
            let run_length = rng.next_u32() % 5;
            let picked = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(picked);
            }
        }

        let mut ctx = SESSION.create_execution_ctx();
        let compressed =
            BtrBlocksCompressor::default().compress(&codes.freeze().into_array(), &mut ctx)?;

        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// A mostly-null array becomes a masked constant and still decodes to the same values.
    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let mut validity_bits = vec![false; 10];
        validity_bits.push(true);

        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(validity_bits),
        );
        let validity = array.validity()?;

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&array.into_array(), &mut ctx)?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// A nullable-typed arithmetic sequence without actual nulls is sequence-encoded.
    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let build = || PrimitiveArray::from_option_iter(values.iter().copied().map(Some));

        let mut ctx = SESSION.create_execution_ctx();
        let compressed =
            BtrBlocksCompressor::default().compress(&build().into_array(), &mut ctx)?;
        assert!(compressed.is::<Sequence>());

        let expected = build().into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// With only `IntRLEScheme` registered, long runs are RLE-encoded and round-trip.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let values: Vec<i32> = [(42i32, 100usize), (123, 200), (987, 150)]
            .into_iter()
            .flat_map(|(value, run)| iter::repeat_n(value, run))
            .collect();

        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let rle_only = CascadingCompressor::new(vec![&IntRLEScheme]);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = rle_only.compress(&input.into_array(), &mut ctx)?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    /// Slow smoke test: compressing a very large array completes without error.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        // NOTE(review): the elements are built as `f64` although the test name says
        // "int" — confirm this is intended.
        let large = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let mut ctx = SESSION.create_execution_ctx();
        BtrBlocksCompressor::default().compress(&large, &mut ctx)?;

        Ok(())
    }
}
1133
#[cfg(test)]
/// Checks which top-level encoding the default compressor picks for characteristic inputs.
mod scheme_selection_tests {
    use std::iter;
    use std::sync::LazyLock;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_session::VortexSession;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// A single repeated value compresses to a constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = vec![42; 100];
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Values in a narrow band far from zero compress to FoR.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000)
            .map(|i| (i * 37) % 100)
            .map(|offset| 1_000_000 + offset)
            .collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<FoR>());
        Ok(())
    }

    /// Small unsigned values compress to a bit-packed array that carries exact
    /// null-count, min and max statistics.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;
        assert!(compressed.is::<BitPacked>());

        let null_count = compressed.statistics().get_as::<u64>(Stat::NullCount);
        assert_eq!(null_count, Some(Precision::exact(0u64)));

        let min = compressed.statistics().get_as::<u32>(Stat::Min);
        assert_eq!(min, Some(Precision::exact(0u32)));

        let max = compressed.statistics().get_as::<u32>(Stat::Max);
        assert_eq!(max, Some(Precision::exact(15u32)));
        Ok(())
    }

    /// One dominant value with 5% exceptions compresses to a sparse array.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000)
            .map(|i| {
                if i % 20 == 0 {
                    2_000_000 + (i * 7) % 1000
                } else {
                    1_000_000
                }
            })
            .collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    /// Short random runs over six distinct values compress to a dictionary.
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|base| 12340 * base)
            .collect();
        let mut codes = Vec::with_capacity(65_535);

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Draw order matters for reproducibility: run length first, then the value.
            let run_length = rng.next_u32() % 5;
            let picked = numbers[rng.next_u32() as usize % numbers.len()];
            codes.extend(iter::repeat_n(picked, run_length as usize));
        }

        let input = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<Dict>());
        Ok(())
    }

    /// Runs of ten values that wrap around `i32::MAX` compress to a run-end array.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..100)
            .flat_map(|i| iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10))
            .collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    /// A strictly arithmetic progression compresses to a sequence array.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    /// 1024 increasing values repeated ten times each; the default compressor is expected
    /// to pick run-end encoding for this input.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1024).flat_map(|i| iter::repeat_n(i, 10)).collect();
        let input = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

        let mut ctx = SESSION.create_execution_ctx();
        let compressed = BtrBlocksCompressor::default().compress(&input.into_array(), &mut ctx)?;

        eprintln!("{}", compressed.display_tree());
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}
1286}