1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::CanonicalValidity;
9use vortex_array::ExecutionCtx;
10use vortex_array::IntoArray;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::ExtensionArray;
13use vortex_array::arrays::FixedSizeListArray;
14use vortex_array::arrays::ListArray;
15use vortex_array::arrays::ListViewArray;
16use vortex_array::arrays::PrimitiveArray;
17use vortex_array::arrays::StructArray;
18use vortex_array::arrays::extension::ExtensionArrayExt;
19use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
20use vortex_array::arrays::list::ListArrayExt;
21use vortex_array::arrays::listview::ListViewArrayExt;
22use vortex_array::arrays::listview::list_from_list_view;
23use vortex_array::arrays::primitive::PrimitiveArrayExt;
24use vortex_array::arrays::scalar_fn::AnyScalarFn;
25use vortex_array::arrays::struct_::StructArrayExt;
26use vortex_array::dtype::DType;
27use vortex_array::dtype::Nullability;
28use vortex_array::scalar::Scalar;
29use vortex_error::VortexResult;
30use vortex_error::vortex_bail;
31
32use crate::builtins::IntDictScheme;
33use crate::ctx::CompressorContext;
34use crate::estimate::CompressionEstimate;
35use crate::estimate::DeferredEstimate;
36use crate::estimate::EstimateScore;
37use crate::estimate::EstimateVerdict;
38use crate::estimate::WinnerEstimate;
39use crate::estimate::estimate_compression_ratio_with_sampling;
40use crate::estimate::is_better_score;
41use crate::scheme::ChildSelection;
42use crate::scheme::DescendantExclusion;
43use crate::scheme::Scheme;
44use crate::scheme::SchemeExt;
45use crate::scheme::SchemeId;
46use crate::stats::ArrayAndStats;
47use crate::stats::GenerateStatsOptions;
48use crate::trace;
49
50pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId {
52 name: "vortex.compressor.root",
53};
54
/// Child indices recorded under `ROOT_SCHEME_ID` when the compressor descends into the
/// components of a list or list-view array.
mod root_list_children {
    /// Child index used for a list's (or list view's) offsets array.
    pub const OFFSETS: usize = 1;
    /// Child index used for a list view's sizes array; the slot right after `OFFSETS`.
    pub const SIZES: usize = OFFSETS + 1;
}
62
63#[derive(Debug, Clone)]
78pub struct CascadingCompressor {
79 schemes: Vec<&'static dyn Scheme>,
81
82 root_exclusions: Vec<DescendantExclusion>,
85}
86
87impl CascadingCompressor {
88 pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
93 let root_exclusions = vec![DescendantExclusion {
96 excluded: IntDictScheme.id(),
97 children: ChildSelection::One(root_list_children::OFFSETS),
98 }];
99 Self {
100 schemes,
101 root_exclusions,
102 }
103 }
104
105 pub fn compress(
113 &self,
114 array: &ArrayRef,
115 exec_ctx: &mut ExecutionCtx,
116 ) -> VortexResult<ArrayRef> {
117 let before_nbytes = array.nbytes();
118 let span = trace::compress_span(array.len(), array.dtype(), before_nbytes);
119 let _enter = span.enter();
120
121 let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
122 let compact = canonical.compact()?;
123 let compressed = self.compress_canonical(compact, CompressorContext::new(), exec_ctx)?;
124
125 trace::record_compress_outcome(&span, before_nbytes, compressed.nbytes());
126
127 Ok(compressed)
128 }
129
130 pub fn compress_child(
140 &self,
141 child: &ArrayRef,
142 parent_ctx: &CompressorContext,
143 parent_id: SchemeId,
144 child_index: usize,
145 exec_ctx: &mut ExecutionCtx,
146 ) -> VortexResult<ArrayRef> {
147 if parent_ctx.finished_cascading() {
148 trace::cascade_exhausted(parent_id, child_index);
149 return Ok(child.clone());
150 }
151
152 let canonical = child.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
153 let compact = canonical.compact()?;
154
155 let child_ctx = parent_ctx
156 .clone()
157 .descend_with_scheme(parent_id, child_index);
158 self.compress_canonical(compact, child_ctx, exec_ctx)
159 }
160
161 fn compress_canonical(
167 &self,
168 array: Canonical,
169 compress_ctx: CompressorContext,
170 exec_ctx: &mut ExecutionCtx,
171 ) -> VortexResult<ArrayRef> {
172 match array {
173 Canonical::Null(null_array) => Ok(null_array.into_array()),
174 Canonical::Bool(bool_array) => {
175 self.choose_and_compress(Canonical::Bool(bool_array), compress_ctx, exec_ctx)
176 }
177 Canonical::Primitive(primitive) => {
178 self.choose_and_compress(Canonical::Primitive(primitive), compress_ctx, exec_ctx)
179 }
180 Canonical::Decimal(decimal) => {
181 self.choose_and_compress(Canonical::Decimal(decimal), compress_ctx, exec_ctx)
182 }
183 Canonical::Struct(struct_array) => {
184 let fields = struct_array
185 .iter_unmasked_fields()
186 .map(|field| self.compress(field, exec_ctx))
187 .collect::<Result<Vec<_>, _>>()?;
188
189 Ok(StructArray::try_new(
190 struct_array.names().clone(),
191 fields,
192 struct_array.len(),
193 struct_array.validity()?,
194 )?
195 .into_array())
196 }
197 Canonical::List(list_view_array) => {
198 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
199 let list_array = list_from_list_view(list_view_array)?;
200 self.compress_list_array(list_array, compress_ctx, exec_ctx)
201 } else {
202 self.compress_list_view_array(list_view_array, compress_ctx, exec_ctx)
203 }
204 }
205 Canonical::FixedSizeList(fsl_array) => {
206 let compressed_elems = self.compress(fsl_array.elements(), exec_ctx)?;
207
208 Ok(FixedSizeListArray::try_new(
209 compressed_elems,
210 fsl_array.list_size(),
211 fsl_array.validity()?,
212 fsl_array.len(),
213 )?
214 .into_array())
215 }
216 Canonical::VarBinView(strings) => {
217 if strings
218 .dtype()
219 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
220 {
221 self.choose_and_compress(Canonical::VarBinView(strings), compress_ctx, exec_ctx)
222 } else {
223 Ok(strings.into_array())
225 }
226 }
227 Canonical::Extension(ext_array) => {
228 let before_nbytes = ext_array.as_ref().nbytes();
229
230 let result = self.choose_and_compress(
232 Canonical::Extension(ext_array.clone()),
233 compress_ctx,
234 exec_ctx,
235 )?;
236 if result.nbytes() < before_nbytes {
237 return Ok(result);
238 }
239
240 if result.is::<AnyScalarFn>() {
242 return Ok(result);
243 }
244
245 let compressed_storage = self.compress(ext_array.storage_array(), exec_ctx)?;
247
248 Ok(
249 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
250 .into_array(),
251 )
252 }
253 Canonical::Variant(_) => {
254 vortex_bail!("Variant arrays can not be compressed")
255 }
256 }
257 }
258
259 fn choose_and_compress(
273 &self,
274 canonical: Canonical,
275 compress_ctx: CompressorContext,
276 exec_ctx: &mut ExecutionCtx,
277 ) -> VortexResult<ArrayRef> {
278 let eligible_schemes: Vec<&'static dyn Scheme> = self
279 .schemes
280 .iter()
281 .copied()
282 .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &compress_ctx))
283 .collect();
284
285 let array: ArrayRef = canonical.into();
286
287 if eligible_schemes.is_empty() || array.is_empty() {
288 return Ok(array);
289 }
290
291 if array.all_invalid(exec_ctx)? {
292 return Ok(
293 ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
294 );
295 }
296
297 let before_nbytes = array.nbytes();
298
299 let merged_opts = eligible_schemes
300 .iter()
301 .fold(GenerateStatsOptions::default(), |acc, s| {
302 acc.merge(s.stats_options())
303 });
304 let compress_ctx = compress_ctx.with_merged_stats_options(merged_opts);
305
306 let data = ArrayAndStats::new(array, merged_opts);
307
308 let Some((winner, winner_estimate)) =
309 self.choose_best_scheme(&eligible_schemes, &data, compress_ctx.clone(), exec_ctx)?
310 else {
311 return Ok(data.into_array());
312 };
313
314 let error_ctx = trace::enabled_error_context(&compress_ctx);
317 let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
318 let compressed = winner
319 .compress(self, &data, compress_ctx, exec_ctx)
320 .inspect_err(|err| {
321 trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
325 })?;
326
327 let after_nbytes = compressed.nbytes();
328 let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
329
330 let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
332
333 trace::record_winner_compress_result(
334 after_nbytes,
335 winner_estimate.trace_ratio(),
336 actual_ratio,
337 accepted,
338 );
339
340 if accepted {
341 Ok(compressed)
342 } else {
343 Ok(data.into_array())
344 }
345 }
346
347 fn choose_best_scheme(
363 &self,
364 schemes: &[&'static dyn Scheme],
365 data: &ArrayAndStats,
366 compress_ctx: CompressorContext,
367 exec_ctx: &mut ExecutionCtx,
368 ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
369 let mut best: Option<(&'static dyn Scheme, EstimateScore)> = None;
370 let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
371
372 {
374 let _verdict_pass = trace::verdict_pass_span().entered();
375 for &scheme in schemes {
376 match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
377 CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
378 CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
379 return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
380 }
381 CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
382 let score = EstimateScore::FiniteCompression(ratio);
383
384 if is_better_score(score, best.as_ref()) {
385 best = Some((scheme, score));
386 }
387 }
388 CompressionEstimate::Deferred(deferred_estimate) => {
389 deferred.push((scheme, deferred_estimate));
390 }
391 }
392 }
393 }
394
395 for (scheme, deferred_estimate) in deferred {
398 let _span = trace::scheme_eval_span(scheme.id()).entered();
399 let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
400 match deferred_estimate {
401 DeferredEstimate::Sample => {
402 let score = estimate_compression_ratio_with_sampling(
403 self,
404 scheme,
405 data.array(),
406 compress_ctx.clone(),
407 exec_ctx,
408 )?;
409
410 if is_better_score(score, best.as_ref()) {
411 best = Some((scheme, score));
412 }
413 }
414 DeferredEstimate::Callback(callback) => {
415 match callback(self, data, threshold, compress_ctx.clone(), exec_ctx)? {
416 EstimateVerdict::Skip => {}
417 EstimateVerdict::AlwaysUse => {
418 return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
419 }
420 EstimateVerdict::Ratio(ratio) => {
421 let score = EstimateScore::FiniteCompression(ratio);
422
423 if is_better_score(score, best.as_ref()) {
424 best = Some((scheme, score));
425 }
426 }
427 }
428 }
429 }
430 }
431
432 Ok(best.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
433 }
434
435 fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
439 let id = candidate.id();
440 let history = ctx.cascade_history();
441
442 if history.iter().any(|&(sid, _)| sid == id) {
444 return true;
445 }
446
447 let mut iter = history.iter().copied().peekable();
448
449 if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
452 && self
453 .root_exclusions
454 .iter()
455 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
456 {
457 return true;
458 }
459
460 for (ancestor_id, child_idx) in iter {
462 if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
463 && ancestor
464 .descendant_exclusions()
465 .iter()
466 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
467 {
468 return true;
469 }
470 }
471
472 for rule in candidate.ancestor_exclusions() {
474 if history
475 .iter()
476 .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
477 {
478 return true;
479 }
480 }
481
482 false
483 }
484
485 fn compress_list_array(
487 &self,
488 list_array: ListArray,
489 compress_ctx: CompressorContext,
490 exec_ctx: &mut ExecutionCtx,
491 ) -> VortexResult<ArrayRef> {
492 let list_array = list_array.reset_offsets(true)?;
493
494 let compressed_elems = self.compress(list_array.elements(), exec_ctx)?;
495
496 let offset_ctx =
498 compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
499 let list_offsets_primitive = list_array
500 .offsets()
501 .clone()
502 .execute::<PrimitiveArray>(exec_ctx)?
503 .narrow()?;
504 let compressed_offsets = self.compress_canonical(
505 Canonical::Primitive(list_offsets_primitive),
506 offset_ctx,
507 exec_ctx,
508 )?;
509
510 Ok(
511 ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
512 .into_array(),
513 )
514 }
515
516 fn compress_list_view_array(
519 &self,
520 list_view: ListViewArray,
521 compress_ctx: CompressorContext,
522 exec_ctx: &mut ExecutionCtx,
523 ) -> VortexResult<ArrayRef> {
524 let compressed_elems = self.compress(list_view.elements(), exec_ctx)?;
525
526 let offset_ctx = compress_ctx
527 .clone()
528 .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
529 let list_view_offsets_primitive = list_view
530 .offsets()
531 .clone()
532 .execute::<PrimitiveArray>(exec_ctx)?
533 .narrow()?;
534 let compressed_offsets = self.compress_canonical(
535 Canonical::Primitive(list_view_offsets_primitive),
536 offset_ctx,
537 exec_ctx,
538 )?;
539
540 let sizes_ctx = compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
541 let list_view_sizes_primitive = list_view
542 .sizes()
543 .clone()
544 .execute::<PrimitiveArray>(exec_ctx)?
545 .narrow()?;
546 let compressed_sizes = self.compress_canonical(
547 Canonical::Primitive(list_view_sizes_primitive),
548 sizes_ctx,
549 exec_ctx,
550 )?;
551
552 Ok(ListViewArray::try_new(
553 compressed_elems,
554 compressed_offsets,
555 compressed_sizes,
556 list_view.validity()?,
557 )?
558 .into_array())
559 }
560}
561
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use parking_lot::Mutex;
    use vortex_array::ArrayRef;
    use vortex_array::Canonical;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::NullArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use super::*;
    use crate::builtins::FloatDictScheme;
    use crate::builtins::IntDictScheme;
    use crate::builtins::StringDictScheme;
    use crate::ctx::CompressorContext;
    use crate::estimate::CompressionEstimate;
    use crate::estimate::DeferredEstimate;
    use crate::estimate::EstimateScore;
    use crate::estimate::EstimateVerdict;
    use crate::estimate::WinnerEstimate;
    use crate::scheme::SchemeExt;

    /// Session used to create execution contexts in every test.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Serializes the tests that reset and read `OBSERVED_THRESHOLD`.
    static OBSERVER_LOCK: Mutex<()> = Mutex::new(());

    /// Threshold last passed to `ThresholdObservingScheme`'s callback.
    /// Outer `None` means the callback never ran.
    static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);

    fn compressor() -> CascadingCompressor {
        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
    }

    fn estimate_test_data() -> ArrayAndStats {
        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
        ArrayAndStats::new(array, GenerateStatsOptions::default())
    }

    fn matches_integer_primitive(canonical: &Canonical) -> bool {
        matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
    }

    /// Runs `choose_best_scheme` over `schemes`, using a compressor registered with the same
    /// schemes in the same order and the shared integer test data.
    fn pick_winner(
        schemes: &[&'static dyn Scheme],
    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
        let compressor = CascadingCompressor::new(schemes.to_vec());
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();
        compressor.choose_best_scheme(schemes, &data, CompressorContext::new(), &mut exec_ctx)
    }

    /// Clears the observed threshold, runs scheme selection, and returns what
    /// `ThresholdObservingScheme` recorded. Holds `OBSERVER_LOCK` throughout.
    fn observe_threshold(
        schemes: &[&'static dyn Scheme],
    ) -> VortexResult<Option<Option<EstimateScore>>> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;
        pick_winner(schemes)?;
        Ok(*OBSERVED_THRESHOLD.lock())
    }

    /// Builds a deferred callback estimate that always answers `$verdict`.
    /// The callback arguments are, in order: compressor, data, best-so-far threshold,
    /// context, execution context.
    macro_rules! deferred_verdict {
        ($verdict:expr) => {
            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
                |_compressor, _data, _best_so_far, _ctx, _exec_ctx| Ok($verdict),
            )))
        };
    }

    /// Declares a unit test scheme that matches integer primitives, answers
    /// `expected_compression_ratio` with `$estimate`, and panics if asked to compress.
    macro_rules! estimate_only_scheme {
        ($scheme:ident, $name:literal, $estimate:expr $(,)?) => {
            #[derive(Debug)]
            struct $scheme;

            impl Scheme for $scheme {
                fn scheme_name(&self) -> &'static str {
                    $name
                }

                fn matches(&self, canonical: &Canonical) -> bool {
                    matches_integer_primitive(canonical)
                }

                fn expected_compression_ratio(
                    &self,
                    _data: &ArrayAndStats,
                    _compress_ctx: CompressorContext,
                    _exec_ctx: &mut ExecutionCtx,
                ) -> CompressionEstimate {
                    $estimate
                }

                fn compress(
                    &self,
                    _compressor: &CascadingCompressor,
                    _data: &ArrayAndStats,
                    _compress_ctx: CompressorContext,
                    _exec_ctx: &mut ExecutionCtx,
                ) -> VortexResult<ArrayRef> {
                    unreachable!("test helper should never be selected for compression")
                }
            }
        };
    }

    estimate_only_scheme!(
        DirectRatioScheme,
        "test.direct_ratio",
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0)),
    );
    estimate_only_scheme!(
        ImmediateAlwaysUseScheme,
        "test.immediate_always_use",
        CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse),
    );
    estimate_only_scheme!(
        CallbackAlwaysUseScheme,
        "test.callback_always_use",
        deferred_verdict!(EstimateVerdict::AlwaysUse),
    );
    estimate_only_scheme!(
        CallbackSkipScheme,
        "test.callback_skip",
        deferred_verdict!(EstimateVerdict::Skip),
    );
    estimate_only_scheme!(
        CallbackRatioScheme,
        "test.callback_ratio",
        deferred_verdict!(EstimateVerdict::Ratio(3.0)),
    );
    estimate_only_scheme!(
        HugeRatioScheme,
        "test.huge_ratio",
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0)),
    );
    estimate_only_scheme!(
        CallbackMatchingRatioScheme,
        "test.callback_matching_ratio",
        deferred_verdict!(EstimateVerdict::Ratio(2.0)),
    );
    estimate_only_scheme!(
        ThresholdObservingScheme,
        "test.threshold_observing",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, best_so_far, _ctx, _exec_ctx| {
                *OBSERVED_THRESHOLD.lock() = Some(best_so_far);
                Ok(EstimateVerdict::Skip)
            },
        ))),
    );

    /// Defers to sampling. Its output is a `NullArray`, which the zero-byte tests rely on.
    #[derive(Debug)]
    struct ZeroBytesSamplingScheme;

    impl Scheme for ZeroBytesSamplingScheme {
        fn scheme_name(&self) -> &'static str {
            "test.zero_bytes_sampling"
        }

        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }

        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Sample)
        }

        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            Ok(NullArray::new(data.array().len()).into_array())
        }
    }

    #[test]
    fn test_self_exclusion() {
        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_root_exclusion_list_offsets() {
        let ctx = CompressorContext::default()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_no_exclusion_without_history() {
        let ctx = CompressorContext::default();
        assert!(!compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn immediate_always_use_wins_immediately() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == ImmediateAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_always_use_wins_immediately() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_skip_is_ignored() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(2.0))))
                if scheme.id() == DirectRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_ratio_competes_numerically() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(3.0))))
                if scheme.id() == CallbackRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &ZeroBytesSamplingScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
                if scheme.id() == HugeRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&ZeroBytesSamplingScheme, &HugeRatioScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
                if scheme.id() == HugeRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 1] = [&ZeroBytesSamplingScheme];
        assert!(pick_winner(&schemes)?.is_none());
        Ok(())
    }

    #[test]
    fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &CallbackAlwaysUseScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn threshold_reflects_pass_one_best() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ThresholdObservingScheme];
        let observed = observe_threshold(&schemes)?;

        assert!(matches!(
            observed,
            Some(Some(EstimateScore::FiniteCompression(r))) if r == 2.0
        ));
        Ok(())
    }

    #[test]
    fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] =
            [&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
        assert_eq!(observe_threshold(&schemes)?, Some(None));
        Ok(())
    }

    #[test]
    fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 1] = [&ThresholdObservingScheme];
        assert_eq!(observe_threshold(&schemes)?, Some(None));
        Ok(())
    }

    #[test]
    fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&CallbackRatioScheme, &ThresholdObservingScheme];
        let observed = observe_threshold(&schemes)?;

        assert!(matches!(
            observed,
            Some(Some(EstimateScore::FiniteCompression(r))) if r == 3.0
        ));
        Ok(())
    }

    #[test]
    fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
        let schemes: [&'static dyn Scheme; 2] = [&CallbackMatchingRatioScheme, &DirectRatioScheme];
        let winner = pick_winner(&schemes)?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
                if scheme.id() == DirectRatioScheme.id() && r == 2.0
        ));
        Ok(())
    }

    #[test]
    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
        let validity = BoolArray::from_iter([false, false, false, false, false]).into_array();
        let array =
            PrimitiveArray::new(buffer![0i32, 0, 0, 0, 0], Validity::Array(validity)).into_array();

        let mut exec_ctx = SESSION.create_execution_ctx();
        let compressed = CascadingCompressor::new(vec![&IntDictScheme])
            .compress(&array, &mut exec_ctx)?;

        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
            Validity::NonNullable,
        )
        .into_array();

        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);
        let mut exec_ctx = SESSION.create_execution_ctx();

        let score = estimate_compression_ratio_with_sampling(
            &compressor,
            &FloatDictScheme,
            &array,
            CompressorContext::new().with_sampling(),
            &mut exec_ctx,
        )?;

        assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
        Ok(())
    }
}