1use vortex_array::ArrayRef;
7use vortex_array::ArraySlots;
8use vortex_array::Canonical;
9use vortex_array::CanonicalValidity;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::ExtensionArray;
14use vortex_array::arrays::FixedSizeListArray;
15use vortex_array::arrays::ListArray;
16use vortex_array::arrays::ListViewArray;
17use vortex_array::arrays::PrimitiveArray;
18use vortex_array::arrays::StructArray;
19use vortex_array::arrays::Variant;
20use vortex_array::arrays::VariantArray;
21use vortex_array::arrays::extension::ExtensionArrayExt;
22use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
23use vortex_array::arrays::list::ListArrayExt;
24use vortex_array::arrays::listview::ListViewArrayExt;
25use vortex_array::arrays::listview::list_from_list_view;
26use vortex_array::arrays::primitive::PrimitiveArrayExt;
27use vortex_array::arrays::scalar_fn::AnyScalarFn;
28use vortex_array::arrays::struct_::StructArrayExt;
29use vortex_array::arrays::variant::VariantArrayExt;
30use vortex_array::dtype::DType;
31use vortex_array::dtype::Nullability;
32use vortex_array::scalar::Scalar;
33use vortex_error::VortexResult;
34
35use crate::builtins::IntDictScheme;
36use crate::ctx::CompressorContext;
37use crate::estimate::CompressionEstimate;
38use crate::estimate::DeferredEstimate;
39use crate::estimate::EstimateScore;
40use crate::estimate::EstimateVerdict;
41use crate::estimate::WinnerEstimate;
42use crate::estimate::estimate_compression_ratio_with_sampling;
43use crate::estimate::is_better_score;
44use crate::scheme::ChildSelection;
45use crate::scheme::DescendantExclusion;
46use crate::scheme::Scheme;
47use crate::scheme::SchemeExt;
48use crate::scheme::SchemeId;
49use crate::stats::ArrayAndStats;
50use crate::stats::GenerateStatsOptions;
51use crate::trace;
52
/// Scheme id that stands for the compressor itself in a cascade history.
///
/// Within this file it tags the contexts created for list offsets/sizes, and
/// `is_excluded` looks for it at the front of the history to apply `root_exclusions`.
pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId {
    name: "vortex.compressor.root",
};
57
/// Child indices recorded together with [`ROOT_SCHEME_ID`] when the compressor descends
/// into the primitive children of list-like arrays.
mod root_list_children {
    /// Index used for the offsets child of list and list-view arrays.
    pub const OFFSETS: usize = 1;
    /// Index used for the sizes child of list-view arrays.
    pub const SIZES: usize = 2;
}
65
/// Compressor that chooses among a fixed set of [`Scheme`]s for each canonical array and
/// recurses into the children of nested arrays.
#[derive(Debug, Clone)]
pub struct CascadingCompressor {
    /// Candidate schemes, consulted in the order they were supplied to `new`.
    schemes: Vec<&'static dyn Scheme>,

    /// Exclusion rules applied when the first cascade-history entry is [`ROOT_SCHEME_ID`].
    root_exclusions: Vec<DescendantExclusion>,
}
89
90impl CascadingCompressor {
91 pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
96 let root_exclusions = vec![DescendantExclusion {
99 excluded: IntDictScheme.id(),
100 children: ChildSelection::One(root_list_children::OFFSETS),
101 }];
102 Self {
103 schemes,
104 root_exclusions,
105 }
106 }
107
108 pub fn compress(
116 &self,
117 array: &ArrayRef,
118 exec_ctx: &mut ExecutionCtx,
119 ) -> VortexResult<ArrayRef> {
120 let before_nbytes = array.nbytes();
121 let span = trace::compress_span(array.len(), array.dtype(), before_nbytes);
122 let _enter = span.enter();
123
124 let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
125 let compact = canonical.compact()?;
126 let compressed = self.compress_canonical(compact, CompressorContext::new(), exec_ctx)?;
127
128 trace::record_compress_outcome(&span, before_nbytes, compressed.nbytes());
129
130 Ok(compressed)
131 }
132
133 pub fn compress_child(
143 &self,
144 child: &ArrayRef,
145 parent_ctx: &CompressorContext,
146 parent_id: SchemeId,
147 child_index: usize,
148 exec_ctx: &mut ExecutionCtx,
149 ) -> VortexResult<ArrayRef> {
150 if parent_ctx.finished_cascading() {
151 trace::cascade_exhausted(parent_id, child_index);
152 return Ok(child.clone());
153 }
154
155 let canonical = child.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
156 let compact = canonical.compact()?;
157
158 let child_ctx = parent_ctx
159 .clone()
160 .descend_with_scheme(parent_id, child_index);
161 self.compress_canonical(compact, child_ctx, exec_ctx)
162 }
163
164 fn compress_canonical(
170 &self,
171 array: Canonical,
172 compress_ctx: CompressorContext,
173 exec_ctx: &mut ExecutionCtx,
174 ) -> VortexResult<ArrayRef> {
175 match array {
176 Canonical::Null(null_array) => Ok(null_array.into_array()),
177 Canonical::Bool(bool_array) => {
178 self.choose_and_compress(Canonical::Bool(bool_array), compress_ctx, exec_ctx)
179 }
180 Canonical::Primitive(primitive) => {
181 self.choose_and_compress(Canonical::Primitive(primitive), compress_ctx, exec_ctx)
182 }
183 Canonical::Decimal(decimal) => {
184 self.choose_and_compress(Canonical::Decimal(decimal), compress_ctx, exec_ctx)
185 }
186 Canonical::Struct(struct_array) => {
187 let fields = struct_array
188 .iter_unmasked_fields()
189 .map(|field| self.compress(field, exec_ctx))
190 .collect::<Result<Vec<_>, _>>()?;
191
192 Ok(StructArray::try_new(
193 struct_array.names().clone(),
194 fields,
195 struct_array.len(),
196 struct_array.validity()?,
197 )?
198 .into_array())
199 }
200 Canonical::List(list_view_array) => {
201 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
202 let list_array = list_from_list_view(list_view_array)?;
203 self.compress_list_array(list_array, compress_ctx, exec_ctx)
204 } else {
205 self.compress_list_view_array(list_view_array, compress_ctx, exec_ctx)
206 }
207 }
208 Canonical::FixedSizeList(fsl_array) => {
209 let compressed_elems = self.compress(fsl_array.elements(), exec_ctx)?;
210
211 Ok(FixedSizeListArray::try_new(
212 compressed_elems,
213 fsl_array.list_size(),
214 fsl_array.validity()?,
215 fsl_array.len(),
216 )?
217 .into_array())
218 }
219 Canonical::VarBinView(strings) => {
220 if strings
221 .dtype()
222 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
223 {
224 self.choose_and_compress(Canonical::VarBinView(strings), compress_ctx, exec_ctx)
225 } else {
226 Ok(strings.into_array())
228 }
229 }
230 Canonical::Extension(ext_array) => {
231 let before_nbytes = ext_array.as_ref().nbytes();
232
233 let result = self.choose_and_compress(
235 Canonical::Extension(ext_array.clone()),
236 compress_ctx,
237 exec_ctx,
238 )?;
239 if result.nbytes() < before_nbytes {
240 return Ok(result);
241 }
242
243 if result.is::<AnyScalarFn>() {
245 return Ok(result);
246 }
247
248 let compressed_storage = self.compress(ext_array.storage_array(), exec_ctx)?;
250
251 Ok(
252 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
253 .into_array(),
254 )
255 }
256 Canonical::Variant(variant_array) => {
257 let core_storage =
258 self.compress_physical_slots(variant_array.core_storage(), exec_ctx)?;
259 let shredded = variant_array
260 .shredded()
261 .map(|arr| {
262 if arr.is::<Variant>() {
264 self.compress_physical_slots(arr, exec_ctx)
265 } else {
266 self.compress(arr, exec_ctx)
267 }
268 })
269 .transpose()?;
270
271 Ok(VariantArray::try_new(core_storage, shredded)?.into_array())
272 }
273 }
274 }
275
276 fn choose_and_compress(
290 &self,
291 canonical: Canonical,
292 compress_ctx: CompressorContext,
293 exec_ctx: &mut ExecutionCtx,
294 ) -> VortexResult<ArrayRef> {
295 let eligible_schemes: Vec<&'static dyn Scheme> = self
296 .schemes
297 .iter()
298 .copied()
299 .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &compress_ctx))
300 .collect();
301
302 let array: ArrayRef = canonical.into();
303
304 if eligible_schemes.is_empty() || array.is_empty() {
305 return Ok(array);
306 }
307
308 if array.all_invalid(exec_ctx)? {
309 return Ok(
310 ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
311 );
312 }
313
314 let before_nbytes = array.nbytes();
315
316 let merged_opts = eligible_schemes
317 .iter()
318 .fold(GenerateStatsOptions::default(), |acc, s| {
319 acc.merge(s.stats_options())
320 });
321 let compress_ctx = compress_ctx.with_merged_stats_options(merged_opts);
322
323 let data = ArrayAndStats::new(array, merged_opts);
324
325 let Some((winner, winner_estimate)) =
326 self.choose_best_scheme(&eligible_schemes, &data, compress_ctx.clone(), exec_ctx)?
327 else {
328 return Ok(data.into_array());
329 };
330
331 let error_ctx = trace::enabled_error_context(&compress_ctx);
334 let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
335 let compressed = winner
336 .compress(self, &data, compress_ctx, exec_ctx)
337 .inspect_err(|err| {
338 trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
342 })?;
343
344 let after_nbytes = compressed.nbytes();
345 let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
346
347 let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
349
350 trace::record_winner_compress_result(
351 after_nbytes,
352 winner_estimate.trace_ratio(),
353 actual_ratio,
354 accepted,
355 );
356
357 if accepted {
358 Ok(compressed)
359 } else {
360 Ok(data.into_array())
361 }
362 }
363
364 fn choose_best_scheme(
380 &self,
381 schemes: &[&'static dyn Scheme],
382 data: &ArrayAndStats,
383 compress_ctx: CompressorContext,
384 exec_ctx: &mut ExecutionCtx,
385 ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
386 let mut best: Option<(&'static dyn Scheme, EstimateScore)> = None;
387 let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
388
389 {
391 let _verdict_pass = trace::verdict_pass_span().entered();
392 for &scheme in schemes {
393 match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
394 CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
395 CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
396 return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
397 }
398 CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
399 let score = EstimateScore::FiniteCompression(ratio);
400
401 if is_better_score(score, best.as_ref()) {
402 best = Some((scheme, score));
403 }
404 }
405 CompressionEstimate::Deferred(deferred_estimate) => {
406 deferred.push((scheme, deferred_estimate));
407 }
408 }
409 }
410 }
411
412 for (scheme, deferred_estimate) in deferred {
415 let _span = trace::scheme_eval_span(scheme.id()).entered();
416 let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
417 match deferred_estimate {
418 DeferredEstimate::Sample => {
419 let score = estimate_compression_ratio_with_sampling(
420 self,
421 scheme,
422 data.array(),
423 compress_ctx.clone(),
424 exec_ctx,
425 )?;
426
427 if is_better_score(score, best.as_ref()) {
428 best = Some((scheme, score));
429 }
430 }
431 DeferredEstimate::Callback(callback) => {
432 match callback(self, data, threshold, compress_ctx.clone(), exec_ctx)? {
433 EstimateVerdict::Skip => {}
434 EstimateVerdict::AlwaysUse => {
435 return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
436 }
437 EstimateVerdict::Ratio(ratio) => {
438 let score = EstimateScore::FiniteCompression(ratio);
439
440 if is_better_score(score, best.as_ref()) {
441 best = Some((scheme, score));
442 }
443 }
444 }
445 }
446 }
447 }
448
449 Ok(best.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
450 }
451
452 fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
456 let id = candidate.id();
457 let history = ctx.cascade_history();
458
459 if history.iter().any(|&(sid, _)| sid == id) {
461 return true;
462 }
463
464 let mut iter = history.iter().copied().peekable();
465
466 if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
469 && self
470 .root_exclusions
471 .iter()
472 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
473 {
474 return true;
475 }
476
477 for (ancestor_id, child_idx) in iter {
479 if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
480 && ancestor
481 .descendant_exclusions()
482 .iter()
483 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
484 {
485 return true;
486 }
487 }
488
489 for rule in candidate.ancestor_exclusions() {
491 if history
492 .iter()
493 .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
494 {
495 return true;
496 }
497 }
498
499 false
500 }
501
502 fn compress_list_array(
504 &self,
505 list_array: ListArray,
506 compress_ctx: CompressorContext,
507 exec_ctx: &mut ExecutionCtx,
508 ) -> VortexResult<ArrayRef> {
509 let list_array = list_array.reset_offsets(true)?;
510
511 let compressed_elems = self.compress(list_array.elements(), exec_ctx)?;
512
513 let offset_ctx =
515 compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
516 let list_offsets_primitive = list_array
517 .offsets()
518 .clone()
519 .execute::<PrimitiveArray>(exec_ctx)?
520 .narrow(exec_ctx)?;
521 let compressed_offsets = self.compress_canonical(
522 Canonical::Primitive(list_offsets_primitive),
523 offset_ctx,
524 exec_ctx,
525 )?;
526
527 Ok(
528 ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
529 .into_array(),
530 )
531 }
532
533 fn compress_list_view_array(
536 &self,
537 list_view: ListViewArray,
538 compress_ctx: CompressorContext,
539 exec_ctx: &mut ExecutionCtx,
540 ) -> VortexResult<ArrayRef> {
541 let compressed_elems = self.compress(list_view.elements(), exec_ctx)?;
542
543 let offset_ctx = compress_ctx
544 .clone()
545 .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
546 let list_view_offsets_primitive = list_view
547 .offsets()
548 .clone()
549 .execute::<PrimitiveArray>(exec_ctx)?
550 .narrow(exec_ctx)?;
551 let compressed_offsets = self.compress_canonical(
552 Canonical::Primitive(list_view_offsets_primitive),
553 offset_ctx,
554 exec_ctx,
555 )?;
556
557 let sizes_ctx = compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
558 let list_view_sizes_primitive = list_view
559 .sizes()
560 .clone()
561 .execute::<PrimitiveArray>(exec_ctx)?
562 .narrow(exec_ctx)?;
563 let compressed_sizes = self.compress_canonical(
564 Canonical::Primitive(list_view_sizes_primitive),
565 sizes_ctx,
566 exec_ctx,
567 )?;
568
569 Ok(ListViewArray::try_new(
570 compressed_elems,
571 compressed_offsets,
572 compressed_sizes,
573 list_view.validity()?,
574 )?
575 .into_array())
576 }
577
578 fn compress_physical_slots(
580 &self,
581 array: &ArrayRef,
582 exec_ctx: &mut ExecutionCtx,
583 ) -> VortexResult<ArrayRef> {
584 let slots = array
585 .slots()
586 .iter()
587 .map(|slot| {
588 slot.as_ref()
589 .map(|child| self.compress(child, exec_ctx))
590 .transpose()
591 })
592 .collect::<VortexResult<ArraySlots>>()?;
593
594 array.clone().with_slots(slots)
595 }
596}
597
598#[cfg(test)]
599mod tests {
600 use std::sync::LazyLock;
601
602 use parking_lot::Mutex;
603 use vortex_array::ArrayRef;
604 use vortex_array::Canonical;
605 use vortex_array::VortexSessionExecute;
606 use vortex_array::arrays::BoolArray;
607 use vortex_array::arrays::Constant;
608 use vortex_array::arrays::NullArray;
609 use vortex_array::arrays::PrimitiveArray;
610 use vortex_array::session::ArraySession;
611 use vortex_array::validity::Validity;
612 use vortex_buffer::buffer;
613 use vortex_session::VortexSession;
614
615 use super::*;
616 use crate::builtins::FloatDictScheme;
617 use crate::builtins::IntDictScheme;
618 use crate::builtins::StringDictScheme;
619 use crate::ctx::CompressorContext;
620 use crate::estimate::CompressionEstimate;
621 use crate::estimate::DeferredEstimate;
622 use crate::estimate::EstimateScore;
623 use crate::estimate::EstimateVerdict;
624 use crate::estimate::WinnerEstimate;
625 use crate::scheme::SchemeExt;
626
/// Lazily created session shared by the tests; execution contexts are created from it.
static SESSION: LazyLock<VortexSession> =
    LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
629
630 fn compressor() -> CascadingCompressor {
631 CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
632 }
633
634 fn estimate_test_data() -> ArrayAndStats {
635 let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
636 ArrayAndStats::new(array, GenerateStatsOptions::default())
637 }
638
639 fn matches_integer_primitive(canonical: &Canonical) -> bool {
640 matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
641 }
642
643 #[derive(Debug)]
644 struct DirectRatioScheme;
645
646 impl Scheme for DirectRatioScheme {
647 fn scheme_name(&self) -> &'static str {
648 "test.direct_ratio"
649 }
650
651 fn matches(&self, canonical: &Canonical) -> bool {
652 matches_integer_primitive(canonical)
653 }
654
655 fn expected_compression_ratio(
656 &self,
657 _data: &ArrayAndStats,
658 _compress_ctx: CompressorContext,
659 _exec_ctx: &mut ExecutionCtx,
660 ) -> CompressionEstimate {
661 CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
662 }
663
664 fn compress(
665 &self,
666 _compressor: &CascadingCompressor,
667 _data: &ArrayAndStats,
668 _compress_ctx: CompressorContext,
669 _exec_ctx: &mut ExecutionCtx,
670 ) -> VortexResult<ArrayRef> {
671 unreachable!("test helper should never be selected for compression")
672 }
673 }
674
675 #[derive(Debug)]
676 struct ImmediateAlwaysUseScheme;
677
678 impl Scheme for ImmediateAlwaysUseScheme {
679 fn scheme_name(&self) -> &'static str {
680 "test.immediate_always_use"
681 }
682
683 fn matches(&self, canonical: &Canonical) -> bool {
684 matches_integer_primitive(canonical)
685 }
686
687 fn expected_compression_ratio(
688 &self,
689 _data: &ArrayAndStats,
690 _compress_ctx: CompressorContext,
691 _exec_ctx: &mut ExecutionCtx,
692 ) -> CompressionEstimate {
693 CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
694 }
695
696 fn compress(
697 &self,
698 _compressor: &CascadingCompressor,
699 _data: &ArrayAndStats,
700 _compress_ctx: CompressorContext,
701 _exec_ctx: &mut ExecutionCtx,
702 ) -> VortexResult<ArrayRef> {
703 unreachable!("test helper should never be selected for compression")
704 }
705 }
706
707 #[derive(Debug)]
708 struct CallbackAlwaysUseScheme;
709
710 impl Scheme for CallbackAlwaysUseScheme {
711 fn scheme_name(&self) -> &'static str {
712 "test.callback_always_use"
713 }
714
715 fn matches(&self, canonical: &Canonical) -> bool {
716 matches_integer_primitive(canonical)
717 }
718
719 fn expected_compression_ratio(
720 &self,
721 _data: &ArrayAndStats,
722 _compress_ctx: CompressorContext,
723 _exec_ctx: &mut ExecutionCtx,
724 ) -> CompressionEstimate {
725 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
726 |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::AlwaysUse),
727 )))
728 }
729
730 fn compress(
731 &self,
732 _compressor: &CascadingCompressor,
733 _data: &ArrayAndStats,
734 _compress_ctx: CompressorContext,
735 _exec_ctx: &mut ExecutionCtx,
736 ) -> VortexResult<ArrayRef> {
737 unreachable!("test helper should never be selected for compression")
738 }
739 }
740
741 #[derive(Debug)]
742 struct CallbackSkipScheme;
743
744 impl Scheme for CallbackSkipScheme {
745 fn scheme_name(&self) -> &'static str {
746 "test.callback_skip"
747 }
748
749 fn matches(&self, canonical: &Canonical) -> bool {
750 matches_integer_primitive(canonical)
751 }
752
753 fn expected_compression_ratio(
754 &self,
755 _data: &ArrayAndStats,
756 _compress_ctx: CompressorContext,
757 _exec_ctx: &mut ExecutionCtx,
758 ) -> CompressionEstimate {
759 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
760 |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Skip),
761 )))
762 }
763
764 fn compress(
765 &self,
766 _compressor: &CascadingCompressor,
767 _data: &ArrayAndStats,
768 _compress_ctx: CompressorContext,
769 _exec_ctx: &mut ExecutionCtx,
770 ) -> VortexResult<ArrayRef> {
771 unreachable!("test helper should never be selected for compression")
772 }
773 }
774
775 #[derive(Debug)]
776 struct CallbackRatioScheme;
777
778 impl Scheme for CallbackRatioScheme {
779 fn scheme_name(&self) -> &'static str {
780 "test.callback_ratio"
781 }
782
783 fn matches(&self, canonical: &Canonical) -> bool {
784 matches_integer_primitive(canonical)
785 }
786
787 fn expected_compression_ratio(
788 &self,
789 _data: &ArrayAndStats,
790 _compress_ctx: CompressorContext,
791 _exec_ctx: &mut ExecutionCtx,
792 ) -> CompressionEstimate {
793 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
794 |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(3.0)),
795 )))
796 }
797
798 fn compress(
799 &self,
800 _compressor: &CascadingCompressor,
801 _data: &ArrayAndStats,
802 _compress_ctx: CompressorContext,
803 _exec_ctx: &mut ExecutionCtx,
804 ) -> VortexResult<ArrayRef> {
805 unreachable!("test helper should never be selected for compression")
806 }
807 }
808
809 #[derive(Debug)]
810 struct HugeRatioScheme;
811
812 impl Scheme for HugeRatioScheme {
813 fn scheme_name(&self) -> &'static str {
814 "test.huge_ratio"
815 }
816
817 fn matches(&self, canonical: &Canonical) -> bool {
818 matches_integer_primitive(canonical)
819 }
820
821 fn expected_compression_ratio(
822 &self,
823 _data: &ArrayAndStats,
824 _compress_ctx: CompressorContext,
825 _exec_ctx: &mut ExecutionCtx,
826 ) -> CompressionEstimate {
827 CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0))
828 }
829
830 fn compress(
831 &self,
832 _compressor: &CascadingCompressor,
833 _data: &ArrayAndStats,
834 _compress_ctx: CompressorContext,
835 _exec_ctx: &mut ExecutionCtx,
836 ) -> VortexResult<ArrayRef> {
837 unreachable!("test helper should never be selected for compression")
838 }
839 }
840
841 #[derive(Debug)]
842 struct ZeroBytesSamplingScheme;
843
844 impl Scheme for ZeroBytesSamplingScheme {
845 fn scheme_name(&self) -> &'static str {
846 "test.zero_bytes_sampling"
847 }
848
849 fn matches(&self, canonical: &Canonical) -> bool {
850 matches_integer_primitive(canonical)
851 }
852
853 fn expected_compression_ratio(
854 &self,
855 _data: &ArrayAndStats,
856 _compress_ctx: CompressorContext,
857 _exec_ctx: &mut ExecutionCtx,
858 ) -> CompressionEstimate {
859 CompressionEstimate::Deferred(DeferredEstimate::Sample)
860 }
861
862 fn compress(
863 &self,
864 _compressor: &CascadingCompressor,
865 data: &ArrayAndStats,
866 _compress_ctx: CompressorContext,
867 _exec_ctx: &mut ExecutionCtx,
868 ) -> VortexResult<ArrayRef> {
869 Ok(NullArray::new(data.array().len()).into_array())
870 }
871 }
872
/// A scheme already present in the cascade history is excluded.
#[test]
fn test_self_exclusion() {
    let history_with_int_dict =
        CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);

    let excluded = compressor().is_excluded(&IntDictScheme, &history_with_int_dict);
    assert!(excluded);
}
881
/// The root exclusion keeps IntDict away from the list offsets child.
#[test]
fn test_root_exclusion_list_offsets() {
    let offsets_ctx = CompressorContext::default()
        .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);

    let excluded = compressor().is_excluded(&IntDictScheme, &offsets_ctx);
    assert!(excluded);
}
891
/// Descending through FloatDict child 1 (the codes child, per the test name) excludes
/// IntDict.
#[test]
fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
    let codes_ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);

    let excluded = compressor().is_excluded(&IntDictScheme, &codes_ctx);
    assert!(excluded);
}
901
/// Descending through FloatDict child 0 (the values child, per the test name) excludes
/// IntDict.
#[test]
fn test_push_rule_float_dict_excludes_int_dict_from_values() {
    let values_ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);

    let excluded = compressor().is_excluded(&IntDictScheme, &values_ctx);
    assert!(excluded);
}
912
/// With an empty cascade history nothing excludes IntDict.
#[test]
fn test_no_exclusion_without_history() {
    let empty_ctx = CompressorContext::default();

    let excluded = compressor().is_excluded(&IntDictScheme, &empty_ctx);
    assert!(!excluded);
}
921
/// An immediate `AlwaysUse` verdict wins over an earlier finite ratio.
#[test]
fn immediate_always_use_wins_immediately() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> =
        vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = ImmediateAlwaysUseScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::AlwaysUse)) if scheme.id() == expected_id
    ));
    Ok(())
}
944
/// An `AlwaysUse` verdict from a deferred callback wins over an immediate ratio.
#[test]
fn callback_always_use_wins_immediately() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> =
        vec![&DirectRatioScheme, &CallbackAlwaysUseScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = CallbackAlwaysUseScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::AlwaysUse)) if scheme.id() == expected_id
    ));
    Ok(())
}
967
/// A callback that answers `Skip` leaves the immediate 2.0 ratio as the winner.
#[test]
fn callback_skip_is_ignored() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> = vec![&CallbackSkipScheme, &DirectRatioScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = DirectRatioScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
            if r == 2.0 && scheme.id() == expected_id
    ));
    Ok(())
}
989
/// A callback ratio of 3.0 beats an immediate ratio of 2.0.
#[test]
fn callback_ratio_competes_numerically() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> = vec![&DirectRatioScheme, &CallbackRatioScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = CallbackRatioScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
            if r == 3.0 && scheme.id() == expected_id
    ));
    Ok(())
}
1011
/// A sampled estimate from the zero-byte scheme does not displace a finite 100.0 ratio.
#[test]
fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> =
        vec![&HugeRatioScheme, &ZeroBytesSamplingScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = HugeRatioScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
            if r == 100.0 && scheme.id() == expected_id
    ));
    Ok(())
}
1033
/// Same outcome as the previous ordering when the zero-byte scheme is listed first.
#[test]
fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> =
        vec![&ZeroBytesSamplingScheme, &HugeRatioScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = HugeRatioScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
            if r == 100.0 && scheme.id() == expected_id
    ));
    Ok(())
}
1055
/// With only the zero-byte sampling scheme available, no winner is selected.
#[test]
fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> = vec![&ZeroBytesSamplingScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    assert!(winner.is_none());
    Ok(())
}
1073
/// Held by each test that resets and reads `OBSERVED_THRESHOLD`, so those tests do not
/// interleave.
static OBSERVER_LOCK: Mutex<()> = Mutex::new(());
/// Threshold last seen by `ThresholdObservingScheme`'s callback. The outer `Option` is
/// `None` after a test resets it and `Some(..)` once the callback has run.
static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);
1079
1080 #[derive(Debug)]
1081 struct ThresholdObservingScheme;
1082
1083 impl Scheme for ThresholdObservingScheme {
1084 fn scheme_name(&self) -> &'static str {
1085 "test.threshold_observing"
1086 }
1087
1088 fn matches(&self, canonical: &Canonical) -> bool {
1089 matches_integer_primitive(canonical)
1090 }
1091
1092 fn expected_compression_ratio(
1093 &self,
1094 _data: &ArrayAndStats,
1095 _compress_ctx: CompressorContext,
1096 _exec_ctx: &mut ExecutionCtx,
1097 ) -> CompressionEstimate {
1098 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1099 |_compressor, _data, best_so_far, _ctx, _exec_ctx| {
1100 *OBSERVED_THRESHOLD.lock() = Some(best_so_far);
1101 Ok(EstimateVerdict::Skip)
1102 },
1103 )))
1104 }
1105
1106 fn compress(
1107 &self,
1108 _compressor: &CascadingCompressor,
1109 _data: &ArrayAndStats,
1110 _compress_ctx: CompressorContext,
1111 _exec_ctx: &mut ExecutionCtx,
1112 ) -> VortexResult<ArrayRef> {
1113 unreachable!("test helper should never be selected for compression")
1114 }
1115 }
1116
1117 #[derive(Debug)]
1118 struct CallbackMatchingRatioScheme;
1119
1120 impl Scheme for CallbackMatchingRatioScheme {
1121 fn scheme_name(&self) -> &'static str {
1122 "test.callback_matching_ratio"
1123 }
1124
1125 fn matches(&self, canonical: &Canonical) -> bool {
1126 matches_integer_primitive(canonical)
1127 }
1128
1129 fn expected_compression_ratio(
1130 &self,
1131 _data: &ArrayAndStats,
1132 _compress_ctx: CompressorContext,
1133 _exec_ctx: &mut ExecutionCtx,
1134 ) -> CompressionEstimate {
1135 CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1136 |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(2.0)),
1137 )))
1138 }
1139
1140 fn compress(
1141 &self,
1142 _compressor: &CascadingCompressor,
1143 _data: &ArrayAndStats,
1144 _compress_ctx: CompressorContext,
1145 _exec_ctx: &mut ExecutionCtx,
1146 ) -> VortexResult<ArrayRef> {
1147 unreachable!("test helper should never be selected for compression")
1148 }
1149 }
1150
/// A deferred `AlwaysUse` wins even when pass one already found a 100.0 ratio.
#[test]
fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> =
        vec![&HugeRatioScheme, &CallbackAlwaysUseScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = CallbackAlwaysUseScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::AlwaysUse)) if scheme.id() == expected_id
    ));
    Ok(())
}
1175
/// The callback sees the 2.0 score established by the immediate verdict in pass one.
#[test]
fn threshold_reflects_pass_one_best() -> VortexResult<()> {
    let _serialized = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: Vec<&'static dyn Scheme> =
        vec![&DirectRatioScheme, &ThresholdObservingScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let _ = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let seen = *OBSERVED_THRESHOLD.lock();
    assert!(matches!(
        seen,
        Some(Some(EstimateScore::FiniteCompression(r))) if r == 2.0
    ));
    Ok(())
}
1196
/// A preceding zero-byte sampling scheme does not produce a threshold for the callback.
#[test]
fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
    let _serialized = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: Vec<&'static dyn Scheme> =
        vec![&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let _ = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    // The callback ran (outer `Some`) but was handed no threshold (inner `None`).
    let seen = *OBSERVED_THRESHOLD.lock();
    assert_eq!(seen, Some(None));
    Ok(())
}
1217
/// With no other scheme registered, the callback is handed no threshold.
#[test]
fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
    let _serialized = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: Vec<&'static dyn Scheme> = vec![&ThresholdObservingScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let _ = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    // The callback ran (outer `Some`) but was handed no threshold (inner `None`).
    let seen = *OBSERVED_THRESHOLD.lock();
    assert_eq!(seen, Some(None));
    Ok(())
}
1234
/// A ratio returned by an earlier deferred callback becomes the threshold for a later one.
#[test]
fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
    let _serialized = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: Vec<&'static dyn Scheme> =
        vec![&CallbackRatioScheme, &ThresholdObservingScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let _ = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let seen = *OBSERVED_THRESHOLD.lock();
    assert!(matches!(
        seen,
        Some(Some(EstimateScore::FiniteCompression(r))) if r == 3.0
    ));
    Ok(())
}
1257
/// When an immediate and a deferred estimate both report 2.0, the immediate scheme wins
/// even though the deferred scheme is listed first.
#[test]
fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
    let candidates: Vec<&'static dyn Scheme> =
        vec![&CallbackMatchingRatioScheme, &DirectRatioScheme];
    let compressor = CascadingCompressor::new(candidates.clone());
    let data = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let winner = compressor.choose_best_scheme(
        &candidates,
        &data,
        CompressorContext::new(),
        &mut exec_ctx,
    )?;

    let expected_id = DirectRatioScheme.id();
    assert!(matches!(
        winner,
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
            if scheme.id() == expected_id && r == 2.0
    ));
    Ok(())
}
1283
/// An array whose validity marks every element invalid compresses to a constant array.
#[test]
fn all_null_array_compresses_to_constant() -> VortexResult<()> {
    let all_invalid = BoolArray::from_iter([false; 5]).into_array();
    let input = PrimitiveArray::new(buffer![0i32, 0, 0, 0, 0], Validity::Array(all_invalid))
        .into_array();

    let compressor = CascadingCompressor::new(vec![&IntDictScheme]);
    let mut exec_ctx = SESSION.create_execution_ctx();

    let output = compressor.compress(&input, &mut exec_ctx)?;
    assert!(output.is::<Constant>());
    Ok(())
}
1300
/// Sampling a float array with `FloatDictScheme` under a sampling context yields a
/// finite compression ratio.
#[test]
fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
    let values = buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0];
    let input = PrimitiveArray::new(values, Validity::NonNullable).into_array();

    let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);
    let sampling_ctx = CompressorContext::new().with_sampling();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let score = estimate_compression_ratio_with_sampling(
        &compressor,
        &FloatDictScheme,
        &input,
        sampling_ctx,
        &mut exec_ctx,
    )?;

    assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
    Ok(())
}
1336}