1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::IntoArray;
9use vortex_array::LEGACY_SESSION;
10use vortex_array::ToCanonical;
11use vortex_array::VortexSessionExecute;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::Patched;
14use vortex_array::arrays::primitive::PrimitiveArrayExt;
15use vortex_array::scalar::Scalar;
16use vortex_compressor::builtins::FloatDictScheme;
17use vortex_compressor::builtins::StringDictScheme;
18use vortex_compressor::estimate::CompressionEstimate;
19use vortex_compressor::scheme::AncestorExclusion;
20use vortex_compressor::scheme::ChildSelection;
21use vortex_compressor::scheme::DescendantExclusion;
22#[cfg(feature = "unstable_encodings")]
23use vortex_compressor::scheme::SchemeId;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_err;
28use vortex_fastlanes::BitPacked;
29#[cfg(feature = "unstable_encodings")]
30use vortex_fastlanes::Delta;
31use vortex_fastlanes::FoR;
32use vortex_fastlanes::FoRArrayExt;
33use vortex_fastlanes::RLE;
34use vortex_fastlanes::RLEArrayExt;
35use vortex_fastlanes::USE_EXPERIMENTAL_PATCHES;
36use vortex_fastlanes::bitpack_compress::bit_width_histogram;
37use vortex_fastlanes::bitpack_compress::bitpack_encode;
38use vortex_fastlanes::bitpack_compress::find_best_bit_width;
39use vortex_runend::RunEnd;
40use vortex_runend::compress::runend_encode;
41use vortex_sequence::sequence_encode;
42use vortex_sparse::Sparse;
43use vortex_zigzag::ZigZag;
44use vortex_zigzag::ZigZagArrayExt;
45use vortex_zigzag::zigzag_encode;
46
47use crate::ArrayAndStats;
48use crate::CascadingCompressor;
49use crate::CompressorContext;
50use crate::GenerateStatsOptions;
51use crate::Scheme;
52use crate::SchemeExt;
53use crate::compress_patches;
54use crate::schemes::rle_ancestor_exclusions;
55use crate::schemes::rle_descendant_exclusions;
56
/// Frame-of-reference scheme: biases values by a reference scalar and
/// bit-packs the residuals (see `FoRScheme::compress`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FoRScheme;

/// ZigZag scheme; only considered when the array minimum is negative
/// (see `ZigZagScheme::expected_compression_ratio`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZigZagScheme;

/// Bit-packing scheme for non-negative integers; skipped when the minimum is
/// negative.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BitPackingScheme;

/// Sparse scheme for arrays dominated by a single frequent value (or that are
/// mostly null).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SparseScheme;

/// Run-end scheme; sampled only when the average run length reaches
/// [`RUN_END_THRESHOLD`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RunEndScheme;

/// Sequence scheme for arrays that `sequence_encode` can represent
/// (no nulls, all values distinct).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SequenceScheme;

/// Pco scheme; skipped for 8-bit integers.
#[cfg(feature = "pco")]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PcoScheme;

// Builtin schemes and stats re-exported from the generic compressor crate.
pub use vortex_compressor::builtins::IntConstantScheme;
pub use vortex_compressor::builtins::IntDictScheme;
pub use vortex_compressor::builtins::is_integer_primitive;
pub use vortex_compressor::stats::IntegerStats;

/// RLE scheme; sampled only when the average run length reaches
/// [`RUN_LENGTH_THRESHOLD`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IntRLEScheme;

/// Minimum average run length before `IntRLEScheme` is sampled.
pub(crate) const RUN_LENGTH_THRESHOLD: u32 = 4;

/// Minimum average run length before `RunEndScheme` is sampled.
const RUN_END_THRESHOLD: u32 = 4;
102impl Scheme for FoRScheme {
103 fn scheme_name(&self) -> &'static str {
104 "vortex.int.for"
105 }
106
107 fn matches(&self, canonical: &Canonical) -> bool {
108 is_integer_primitive(canonical)
109 }
110
111 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
113 vec![
114 AncestorExclusion {
115 ancestor: IntDictScheme.id(),
116 children: ChildSelection::One(1),
117 },
118 AncestorExclusion {
119 ancestor: FloatDictScheme.id(),
120 children: ChildSelection::One(1),
121 },
122 AncestorExclusion {
123 ancestor: StringDictScheme.id(),
124 children: ChildSelection::One(1),
125 },
126 ]
127 }
128
129 fn expected_compression_ratio(
130 &self,
131 data: &mut ArrayAndStats,
132 ctx: CompressorContext,
133 ) -> CompressionEstimate {
134 if ctx.finished_cascading() {
137 return CompressionEstimate::Skip;
138 }
139
140 let stats = data.integer_stats();
141
142 if stats.erased().min_is_zero() {
144 return CompressionEstimate::Skip;
145 }
146
147 let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
149 Some(l) => l + 1,
150 None => return CompressionEstimate::Skip,
152 };
153
154 if let Some(max_log) = stats
158 .erased()
159 .max_ilog2()
160 .filter(|_| !stats.erased().min_is_negative())
162 {
163 let bitpack_bitwidth = max_log + 1;
164 if for_bitwidth >= bitpack_bitwidth {
165 return CompressionEstimate::Skip;
166 }
167 }
168
169 let full_width: u32 = data
170 .array_as_primitive()
171 .ptype()
172 .bit_width()
173 .try_into()
174 .vortex_expect("bit width must fit in u32");
175
176 CompressionEstimate::Ratio(full_width as f64 / for_bitwidth as f64)
177 }
178
179 fn compress(
180 &self,
181 compressor: &CascadingCompressor,
182 data: &mut ArrayAndStats,
183 ctx: CompressorContext,
184 ) -> VortexResult<ArrayRef> {
185 let primitive = data.array().to_primitive();
186 let for_array = FoR::encode(primitive)?;
187 let biased = for_array.encoded().to_primitive();
188
189 let leaf_ctx = ctx.clone().as_leaf();
194 let mut biased_data = ArrayAndStats::new(biased.into_array(), ctx.merged_stats_options());
195 let compressed = BitPackingScheme.compress(compressor, &mut biased_data, leaf_ctx)?;
196
197 let for_compressed = FoR::try_new(compressed, for_array.reference_scalar().clone())?;
199 for_compressed
200 .as_ref()
201 .statistics()
202 .inherit_from(for_array.as_ref().statistics());
203
204 Ok(for_compressed.into_array())
205 }
206}
207
208impl Scheme for ZigZagScheme {
209 fn scheme_name(&self) -> &'static str {
210 "vortex.int.zigzag"
211 }
212
213 fn matches(&self, canonical: &Canonical) -> bool {
214 is_integer_primitive(canonical)
215 }
216
217 fn num_children(&self) -> usize {
219 1
220 }
221
222 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
226 vec![
227 DescendantExclusion {
228 excluded: IntDictScheme.id(),
229 children: ChildSelection::All,
230 },
231 DescendantExclusion {
232 excluded: RunEndScheme.id(),
233 children: ChildSelection::All,
234 },
235 DescendantExclusion {
236 excluded: SparseScheme.id(),
237 children: ChildSelection::All,
238 },
239 ]
240 }
241
242 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
244 vec![
245 AncestorExclusion {
246 ancestor: IntDictScheme.id(),
247 children: ChildSelection::One(1),
248 },
249 AncestorExclusion {
250 ancestor: FloatDictScheme.id(),
251 children: ChildSelection::One(1),
252 },
253 AncestorExclusion {
254 ancestor: StringDictScheme.id(),
255 children: ChildSelection::One(1),
256 },
257 ]
258 }
259
260 fn expected_compression_ratio(
261 &self,
262 data: &mut ArrayAndStats,
263 ctx: CompressorContext,
264 ) -> CompressionEstimate {
265 if ctx.finished_cascading() {
268 return CompressionEstimate::Skip;
269 }
270
271 let stats = data.integer_stats();
272
273 if !stats.erased().min_is_negative() {
275 return CompressionEstimate::Skip;
276 }
277
278 CompressionEstimate::Sample
279 }
280
281 fn compress(
282 &self,
283 compressor: &CascadingCompressor,
284 data: &mut ArrayAndStats,
285 ctx: CompressorContext,
286 ) -> VortexResult<ArrayRef> {
287 let zag = zigzag_encode(data.array_as_primitive())?;
289 let encoded = zag.encoded().to_primitive();
290
291 let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?;
292
293 tracing::debug!("zigzag output: {}", compressed.encoding_id());
294
295 Ok(ZigZag::try_new(compressed)?.into_array())
296 }
297}
298
impl Scheme for BitPackingScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.bitpacking"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        let stats = data.integer_stats();

        // Negative minima are skipped; `ZigZagScheme` targets those arrays.
        if stats.erased().min_is_negative() {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive_array = data.array_as_primitive();

        // Pick the cheapest packing width from the histogram of value widths.
        let histogram = bit_width_histogram(&primitive_array)?;
        let bw = find_best_bit_width(primitive_array.ptype(), &histogram)?;

        // Best width equals the native width: packing would save nothing, so
        // return the input unchanged.
        if bw as usize == primitive_array.ptype().bit_width() {
            return Ok(primitive_array.into_array());
        }

        let packed = bitpack_encode(&primitive_array, bw, Some(&histogram))?;

        // Capture stats/ptype before decomposing the packed array into parts.
        let packed_stats = packed.statistics().to_owned();
        let ptype = packed.dtype().as_ptype();
        let mut parts = BitPacked::into_parts(packed);

        let array = if *USE_EXPERIMENTAL_PATCHES {
            // Experimental path: strip the patches off the BitPacked array and
            // re-apply them via a wrapping `Patched` array instead.
            let patches = parts.patches.take();
            let array = BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                None,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .into_array();

            match patches {
                None => array,
                Some(p) => Patched::from_array_and_patches(
                    array,
                    &p,
                    &mut LEGACY_SESSION.create_execution_ctx(),
                )?
                .with_stats_set(packed_stats)
                .into_array(),
            }
        } else {
            // Stable path: compress the patches and store them back inline on
            // the BitPacked array.
            let patches = parts.patches.take().map(compress_patches).transpose()?;
            parts.patches = patches;
            BitPacked::try_new(
                parts.packed,
                ptype,
                parts.validity,
                parts.patches,
                parts.bit_width,
                parts.len,
                parts.offset,
            )?
            .with_stats_set(packed_stats)
            .into_array()
        };

        Ok(array)
    }
}
390
impl Scheme for SparseScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.sparse"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn stats_options(&self) -> GenerateStatsOptions {
        // Distinct-value counting is required so that
        // `most_frequent_value_and_count` is available below.
        GenerateStatsOptions {
            count_distinct_values: true,
        }
    }

    fn num_children(&self) -> usize {
        // Two children: patch values (0) and patch indices (1).
        2
    }

    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        // Keep these schemes out of child 1 (the patch indices).
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        let len = data.array_len() as f64;
        let stats = data.integer_stats();
        let value_count = stats.value_count();

        if value_count == 0 {
            return CompressionEstimate::Skip;
        }

        // Mostly-null fast path: only the non-null values would be stored.
        if stats.null_count() as f64 / len > 0.9 {
            return CompressionEstimate::Ratio(len / value_count as f64);
        }

        let (_, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // Every value is the same: a constant, not a sparse array.
        if most_frequent_count == value_count {
            return CompressionEstimate::Skip;
        }
        debug_assert!(value_count > most_frequent_count);

        // Only worthwhile when one value dominates (>90% of non-null values).
        let freq = most_frequent_count as f64 / value_count as f64;
        if freq < 0.9 {
            return CompressionEstimate::Skip;
        }

        // Estimated ratio: all values vs. just the exceptions kept as patches.
        CompressionEstimate::Ratio(value_count as f64 / (value_count - most_frequent_count) as f64)
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let len = data.array_len();
        let stats = data.integer_stats().clone();
        let array = data.array();

        let (most_frequent_value, most_frequent_count) = stats
            .erased()
            .most_frequent_value_and_count()
            .vortex_expect(
                "this must be present since `SparseScheme` declared that we need distinct values",
            );

        // Degenerate case: the dominant value fills the array — emit a constant.
        if most_frequent_count as usize == len {
            return Ok(ConstantArray::new(
                Scalar::primitive_value(
                    most_frequent_value,
                    most_frequent_value.ptype(),
                    array.dtype().nullability(),
                ),
                len,
            )
            .into_array());
        }

        // Encode with the dominant value as the fill scalar.
        let sparse_encoded = Sparse::encode(
            array,
            Some(Scalar::primitive_value(
                most_frequent_value,
                most_frequent_value.ptype(),
                array.dtype().nullability(),
            )),
        )?;

        if let Some(sparse) = sparse_encoded.as_opt::<Sparse>() {
            // Recursively compress the patch values (child 0) ...
            let compressed_values = compressor.compress_child(
                &sparse.patches().values().to_primitive().into_array(),
                &ctx,
                self.id(),
                0,
            )?;

            // ... and the patch indices (child 1), narrowed to the smallest
            // index type first.
            let indices = sparse.patches().indices().to_primitive().narrow()?;

            let compressed_indices =
                compressor.compress_child(&indices.into_array(), &ctx, self.id(), 1)?;

            Sparse::try_new(
                compressed_indices,
                compressed_values,
                sparse.len(),
                sparse.fill_scalar().clone(),
            )
            .map(|a| a.into_array())
        } else {
            // `Sparse::encode` may return a non-Sparse array; pass it through.
            Ok(sparse_encoded)
        }
    }
}
541
impl Scheme for RunEndScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.runend"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn num_children(&self) -> usize {
        // Two children: run values (0) and run ends (1).
        2
    }

    fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
        // Keep these schemes out of child 1 (the run ends).
        vec![
            DescendantExclusion {
                excluded: IntDictScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: RunEndScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: IntRLEScheme.id(),
                children: ChildSelection::One(1),
            },
            DescendantExclusion {
                excluded: SparseScheme.id(),
                children: ChildSelection::One(1),
            },
        ]
    }

    fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
        // Never apply run-end beneath child 0 of the dictionary schemes
        // (note: other schemes in this file exclude child 1 instead).
        vec![
            AncestorExclusion {
                ancestor: IntDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: FloatDictScheme.id(),
                children: ChildSelection::One(0),
            },
            AncestorExclusion {
                ancestor: StringDictScheme.id(),
                children: ChildSelection::One(0),
            },
        ]
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        // Short runs won't amortize the ends/values overhead.
        if data.integer_stats().average_run_length() < RUN_END_THRESHOLD {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let (ends, values) = runend_encode(data.array_as_primitive().as_view());

        // Recursively compress values (child 0) and ends (child 1).
        let compressed_values =
            compressor.compress_child(&values.to_primitive().into_array(), &ctx, self.id(), 0)?;

        let compressed_ends = compressor.compress_child(&ends.into_array(), &ctx, self.id(), 1)?;

        // SAFETY: `ends`/`values` come straight from `runend_encode` over this
        // same array, and offset 0 / `array_len` span the full encoding. The
        // run-end invariants are assumed to survive child compression —
        // TODO(review) confirm `compress_child` is value-preserving here.
        Ok(unsafe {
            RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
                .into_array()
        })
    }
}
632
633impl Scheme for SequenceScheme {
634 fn scheme_name(&self) -> &'static str {
635 "vortex.int.sequence"
636 }
637
638 fn matches(&self, canonical: &Canonical) -> bool {
639 is_integer_primitive(canonical)
640 }
641
642 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
646 vec![
647 AncestorExclusion {
648 ancestor: IntDictScheme.id(),
649 children: ChildSelection::One(1),
650 },
651 AncestorExclusion {
652 ancestor: FloatDictScheme.id(),
653 children: ChildSelection::One(1),
654 },
655 AncestorExclusion {
656 ancestor: StringDictScheme.id(),
657 children: ChildSelection::One(1),
658 },
659 ]
660 }
661
662 fn expected_compression_ratio(
663 &self,
664 data: &mut ArrayAndStats,
665 ctx: CompressorContext,
666 ) -> CompressionEstimate {
667 if ctx.is_sample() {
670 return CompressionEstimate::Skip;
671 }
672
673 let stats = data.integer_stats();
674
675 if stats.null_count() > 0 {
677 return CompressionEstimate::Skip;
678 }
679
680 if stats
683 .distinct_count()
684 .is_some_and(|count| count as usize != data.array_len())
685 {
686 return CompressionEstimate::Skip;
687 }
688
689 CompressionEstimate::Estimate(Box::new(|_compressor, data, _ctx| {
693 let Some(encoded) = sequence_encode(&data.array_as_primitive())? else {
694 return Ok(CompressionEstimate::Skip);
696 };
697
698 Ok(CompressionEstimate::Ratio(encoded.len() as f64 / 2.0))
702 }))
703 }
704
705 fn compress(
706 &self,
707 _compressor: &CascadingCompressor,
708 data: &mut ArrayAndStats,
709 _ctx: CompressorContext,
710 ) -> VortexResult<ArrayRef> {
711 let stats = data.integer_stats();
712
713 if stats.null_count() > 0 {
714 vortex_bail!("sequence encoding does not support nulls");
715 }
716 sequence_encode(&data.array_as_primitive())?
717 .ok_or_else(|| vortex_err!("cannot sequence encode array"))
718 }
719}
720
#[cfg(feature = "pco")]
impl Scheme for PcoScheme {
    fn scheme_name(&self) -> &'static str {
        "vortex.int.pco"
    }

    fn matches(&self, canonical: &Canonical) -> bool {
        is_integer_primitive(canonical)
    }

    fn expected_compression_ratio(
        &self,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> CompressionEstimate {
        use vortex_array::dtype::PType;

        // Single-byte integers are skipped.
        let ptype = data.array_as_primitive().ptype();
        if matches!(ptype, PType::I8 | PType::U8) {
            return CompressionEstimate::Skip;
        }

        CompressionEstimate::Sample
    }

    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &mut ArrayAndStats,
        _ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        let primitive = data.array_as_primitive();
        // Default compression level with a chunk size of 8192 values.
        let encoded =
            vortex_pco::Pco::from_primitive(&primitive, pco::DEFAULT_COMPRESSION_LEVEL, 8192)?;
        Ok(encoded.into_array())
    }
}
760
/// Shared RLE compression body (`pub(crate)` so multiple RLE schemes can reuse
/// it): RLE-encodes the array, then recursively compresses the values
/// (child 0), indices (child 1), and value-index offsets (child 2).
pub(crate) fn rle_compress(
    scheme: &dyn Scheme,
    compressor: &CascadingCompressor,
    data: &mut ArrayAndStats,
    ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
    let array = data.array_as_primitive();
    let rle_array = RLE::encode(&array)?;

    let compressed_values = compressor.compress_child(
        &rle_array.values().to_primitive().into_array(),
        &ctx,
        scheme.id(),
        0,
    )?;

    // With unstable encodings, the indices additionally go through delta
    // compression; otherwise they are compressed directly.
    #[cfg(feature = "unstable_encodings")]
    let compressed_indices = try_compress_delta(
        compressor,
        &rle_array.indices().to_primitive().narrow()?.into_array(),
        &ctx,
        scheme.id(),
        1,
    )?;

    #[cfg(not(feature = "unstable_encodings"))]
    let compressed_indices = compressor.compress_child(
        &rle_array.indices().to_primitive().narrow()?.into_array(),
        &ctx,
        scheme.id(),
        1,
    )?;

    let compressed_offsets = compressor.compress_child(
        &rle_array
            .values_idx_offsets()
            .to_primitive()
            .narrow()?
            .into_array(),
        &ctx,
        scheme.id(),
        2,
    )?;

    // SAFETY: all three children derive from `RLE::encode` over the same
    // array, and offset/len are taken from that encoding unchanged. The RLE
    // invariants are assumed to survive child compression — TODO(review)
    // confirm `compress_child` is value-preserving here.
    unsafe {
        Ok(RLE::new_unchecked(
            compressed_values,
            compressed_indices,
            compressed_offsets,
            rle_array.offset(),
            rle_array.len(),
        )
        .into_array())
    }
}
819
#[cfg(feature = "unstable_encodings")]
/// Delta-compresses `child` into per-chunk bases plus fastlanes deltas, then
/// recursively compresses each stream and wraps the result in a `Delta` array.
fn try_compress_delta(
    compressor: &CascadingCompressor,
    child: &ArrayRef,
    parent_ctx: &CompressorContext,
    parent_id: SchemeId,
    child_index: usize,
) -> VortexResult<ArrayRef> {
    let (bases, deltas) =
        vortex_fastlanes::delta_compress(&child.to_primitive(), &mut compressor.execution_ctx())?;

    // NOTE(review): both streams are compressed under the same `child_index`
    // slot of the parent — confirm the compressor tolerates two children
    // sharing one index here.
    let compressed_bases =
        compressor.compress_child(&bases.into_array(), parent_ctx, parent_id, child_index)?;
    let compressed_deltas =
        compressor.compress_child(&deltas.into_array(), parent_ctx, parent_id, child_index)?;

    Delta::try_new(compressed_bases, compressed_deltas, 0, child.len()).map(IntoArray::into_array)
}
838
839impl Scheme for IntRLEScheme {
840 fn scheme_name(&self) -> &'static str {
841 "vortex.int.rle"
842 }
843
844 fn matches(&self, canonical: &Canonical) -> bool {
845 is_integer_primitive(canonical)
846 }
847
848 fn num_children(&self) -> usize {
850 3
851 }
852
853 fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
854 rle_descendant_exclusions()
855 }
856
857 fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
858 rle_ancestor_exclusions()
859 }
860
861 fn expected_compression_ratio(
862 &self,
863 data: &mut ArrayAndStats,
864 ctx: CompressorContext,
865 ) -> CompressionEstimate {
866 if ctx.finished_cascading() {
868 return CompressionEstimate::Skip;
869 }
870
871 if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD {
872 return CompressionEstimate::Skip;
873 }
874
875 CompressionEstimate::Sample
876 }
877
878 fn compress(
879 &self,
880 compressor: &CascadingCompressor,
881 data: &mut ArrayAndStats,
882 ctx: CompressorContext,
883 ) -> VortexResult<ArrayRef> {
884 rle_compress(self, compressor, data, ctx)
885 }
886}
887
#[cfg(test)]
mod tests {
    use std::iter;

    use itertools::Itertools;
    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::Masked;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::BufferMut;
    use vortex_buffer::buffer;
    use vortex_compressor::CascadingCompressor;
    use vortex_error::VortexResult;
    use vortex_fastlanes::RLE;
    use vortex_sequence::Sequence;

    use crate::BtrBlocksCompressor;
    use crate::schemes::integer::IntRLEScheme;

    // Compressing an empty primitive array must succeed and stay empty.
    #[test]
    fn test_empty() -> VortexResult<()> {
        let btr = BtrBlocksCompressor::default();
        let array = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = btr.compress(&array.into_array())?;

        assert!(result.is_empty());
        Ok(())
    }

    // Low-cardinality data with short runs should be dictionary-encoded.
    #[test]
    fn test_dict_encodable() -> VortexResult<()> {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) .collect_vec();

        // Seeded RNG keeps the generated runs deterministic across runs.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&codes.freeze().into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    // A mostly-null array with constant non-null backing values should compress
    // to a Masked array over a Constant, and still round-trip correctly.
    #[test]
    fn constant_mostly_nulls() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let validity = array.validity()?;

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;

        assert!(compressed.is::<Masked>());
        assert!(compressed.children()[0].is::<Constant>());

        // Null slots compare as zeroed values under the preserved validity.
        let decoded = compressed;
        let expected =
            PrimitiveArray::new(buffer![0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46], validity).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    // An arithmetic sequence in a nullable (but all-valid) array should still
    // be detected by the sequence scheme.
    #[test]
    fn nullable_sequence() -> VortexResult<()> {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());

        let decoded = compressed;
        let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array();
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    // Forcing only IntRLEScheme must produce an RLE array that round-trips.
    #[test]
    fn test_rle_compression() -> VortexResult<()> {
        let mut values = Vec::new();
        values.extend(iter::repeat_n(42i32, 100));
        values.extend(iter::repeat_n(123i32, 200));
        values.extend(iter::repeat_n(987i32, 150));

        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let compressor = CascadingCompressor::new(vec![&IntRLEScheme]);
        let compressed = compressor.compress(&array.into_array())?;
        assert!(compressed.is::<RLE>());

        let expected = Buffer::copy_from(&values).into_array();
        assert_arrays_eq!(compressed, expected);
        Ok(())
    }

    // Smoke test: 50M-element array must compress without error. Gated to CI
    // and skippable since it is slow.
    // NOTE(review): the values are f64 despite the `int` test name — confirm
    // that exercising the float path here is intended.
    #[test_with::env(CI)]
    #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
    fn compress_large_int() -> VortexResult<()> {
        const NUM_LISTS: usize = 10_000;
        const ELEMENTS_PER_LIST: usize = 5_000;

        let prim = (0..NUM_LISTS)
            .flat_map(|list_idx| {
                (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64)
            })
            .collect::<PrimitiveArray>()
            .into_array();

        let btr = BtrBlocksCompressor::default();
        btr.compress(&prim)?;

        Ok(())
    }
}
1025
#[cfg(test)]
// Tests asserting which scheme the default compressor picks for characteristic
// data shapes; each input is crafted so exactly one scheme should win.
mod scheme_selection_tests {
    use std::iter;

    use rand::Rng;
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use vortex_array::IntoArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::Dict;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::expr::stats::StatsProviderExt;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_fastlanes::BitPacked;
    use vortex_fastlanes::FoR;
    use vortex_runend::RunEnd;
    use vortex_sequence::Sequence;
    use vortex_sparse::Sparse;

    use crate::BtrBlocksCompressor;

    // A single repeated value should become a Constant array.
    #[test]
    fn test_constant_compressed() -> VortexResult<()> {
        let values: Vec<i32> = iter::repeat_n(42, 100).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    // Large values in a narrow band (1M..1M+100) favor frame-of-reference.
    #[test]
    fn test_for_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<FoR>());
        Ok(())
    }

    // Small non-negative values (0..16) favor bit-packing, and the compressed
    // array must retain exact null-count/min/max statistics.
    #[test]
    fn test_bitpacking_compressed() -> VortexResult<()> {
        let values: Vec<u32> = (0..1000).map(|i| i % 16).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<BitPacked>());
        assert_eq!(
            compressed.statistics().get_as::<u64>(Stat::NullCount),
            Some(Precision::exact(0u64))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Min),
            Some(Precision::exact(0u32))
        );
        assert_eq!(
            compressed.statistics().get_as::<u32>(Stat::Max),
            Some(Precision::exact(15u32))
        );
        Ok(())
    }

    // 95% of values equal one fill value; exceptions every 20th slot favor
    // the sparse scheme.
    #[test]
    fn test_sparse_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1000 {
            if i % 20 == 0 {
                values.push(2_000_000 + (i * 7) % 1000);
            } else {
                values.push(1_000_000);
            }
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sparse>());
        Ok(())
    }

    // Low-cardinality data with short random runs favors dictionary encoding
    // (mirrors `tests::test_dict_encodable`).
    #[test]
    fn test_dict_compressed() -> VortexResult<()> {
        let mut codes = Vec::with_capacity(65_535);
        let numbers: Vec<i32> = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 12340 * i) .collect();

        // Seeded RNG keeps the generated runs deterministic across runs.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Dict>());
        Ok(())
    }

    // Long runs of distinct values (wrapping around i32::MAX) favor run-end
    // encoding.
    #[test]
    fn test_runend_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..100 {
            values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }

    // A strict arithmetic progression (step 7) favors the sequence scheme.
    #[test]
    fn test_sequence_compressed() -> VortexResult<()> {
        let values: Vec<i32> = (0..1000).map(|i| i * 7).collect();
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        assert!(compressed.is::<Sequence>());
        Ok(())
    }

    // NOTE(review): named `rle` but asserts `RunEnd` wins for this data —
    // confirm that RunEnd outbidding IntRLE here is the intended outcome.
    // The `eprintln!` looks like a debugging leftover.
    #[test]
    fn test_rle_compressed() -> VortexResult<()> {
        let mut values: Vec<i32> = Vec::new();
        for i in 0..1024 {
            values.extend(iter::repeat_n(i, 10));
        }
        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
        let btr = BtrBlocksCompressor::default();
        let compressed = btr.compress(&array.into_array())?;
        eprintln!("{}", compressed.display_tree());
        assert!(compressed.is::<RunEnd>());
        Ok(())
    }
}
1171}