1use vortex_array::ArrayRef;
7use vortex_array::ArraySlots;
8use vortex_array::Canonical;
9use vortex_array::CanonicalValidity;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::ExtensionArray;
14use vortex_array::arrays::FixedSizeListArray;
15use vortex_array::arrays::ListArray;
16use vortex_array::arrays::ListViewArray;
17use vortex_array::arrays::PrimitiveArray;
18use vortex_array::arrays::StructArray;
19use vortex_array::arrays::Variant;
20use vortex_array::arrays::VariantArray;
21use vortex_array::arrays::extension::ExtensionArrayExt;
22use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
23use vortex_array::arrays::list::ListArrayExt;
24use vortex_array::arrays::listview::ListViewArrayExt;
25use vortex_array::arrays::listview::list_from_list_view;
26use vortex_array::arrays::primitive::PrimitiveArrayExt;
27use vortex_array::arrays::scalar_fn::AnyScalarFn;
28use vortex_array::arrays::struct_::StructArrayExt;
29use vortex_array::arrays::variant::VariantArrayExt;
30use vortex_array::scalar::Scalar;
31use vortex_error::VortexResult;
32
33use crate::builtins::IntDictScheme;
34use crate::ctx::CompressorContext;
35use crate::estimate::CompressionEstimate;
36use crate::estimate::DeferredEstimate;
37use crate::estimate::EstimateScore;
38use crate::estimate::EstimateVerdict;
39use crate::estimate::WinnerEstimate;
40use crate::estimate::estimate_compression_ratio_with_sampling;
41use crate::estimate::is_better_score;
42use crate::scheme::ChildSelection;
43use crate::scheme::DescendantExclusion;
44use crate::scheme::Scheme;
45use crate::scheme::SchemeExt;
46use crate::scheme::SchemeId;
47use crate::stats::ArrayAndStats;
48use crate::stats::GenerateStatsOptions;
49use crate::trace;
50
51pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId {
53 name: "vortex.compressor.root",
54};
55
/// Child indices recorded under [`ROOT_SCHEME_ID`] when the compressor splits a list into its
/// component arrays.
mod root_list_children {
    /// Child index used when compressing the sizes array of a list view.
    pub const SIZES: usize = 2;

    /// Child index used when compressing the offsets array of a list or list view.
    pub const OFFSETS: usize = 1;
}
63
#[derive(Debug, Clone)]
/// Compressor that chooses, for each canonical array, the best of a fixed set of [`Scheme`]s, and
/// lets the chosen scheme cascade into compressing its own children via `compress_child`.
pub struct CascadingCompressor {
    /// Candidate schemes. They are considered in this order when choosing how to compress an
    /// array, so the order decides which scheme wins when estimates tie.
    schemes: Vec<&'static dyn Scheme>,

    /// Exclusion rules keyed on [`ROOT_SCHEME_ID`]. They apply to children that the compressor
    /// itself produces (list offsets and sizes) rather than to children produced by a scheme.
    /// They are consulted by `is_excluded`.
    root_exclusions: Vec<DescendantExclusion>,
}
87
88impl CascadingCompressor {
89 pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
94 let root_exclusions = vec![DescendantExclusion {
97 excluded: IntDictScheme.id(),
98 children: ChildSelection::One(root_list_children::OFFSETS),
99 }];
100 Self {
101 schemes,
102 root_exclusions,
103 }
104 }
105
106 pub fn compress(
114 &self,
115 array: &ArrayRef,
116 exec_ctx: &mut ExecutionCtx,
117 ) -> VortexResult<ArrayRef> {
118 let before_nbytes = array.nbytes();
119 let span = trace::compress_span(array.len(), array.dtype(), before_nbytes);
120 let _enter = span.enter();
121
122 let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
123 let compact = canonical.compact()?;
124 let compressed = self.compress_canonical(compact, CompressorContext::new(), exec_ctx)?;
125
126 trace::record_compress_outcome(&span, before_nbytes, compressed.nbytes());
127
128 Ok(compressed)
129 }
130
131 pub fn compress_child(
141 &self,
142 child: &ArrayRef,
143 parent_ctx: &CompressorContext,
144 parent_id: SchemeId,
145 child_index: usize,
146 exec_ctx: &mut ExecutionCtx,
147 ) -> VortexResult<ArrayRef> {
148 if parent_ctx.finished_cascading() {
149 trace::cascade_exhausted(parent_id, child_index);
150 return Ok(child.clone());
151 }
152
153 let canonical = child.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
154 let compact = canonical.compact()?;
155
156 let child_ctx = parent_ctx
157 .clone()
158 .descend_with_scheme(parent_id, child_index);
159 self.compress_canonical(compact, child_ctx, exec_ctx)
160 }
161
162 fn compress_canonical(
168 &self,
169 array: Canonical,
170 compress_ctx: CompressorContext,
171 exec_ctx: &mut ExecutionCtx,
172 ) -> VortexResult<ArrayRef> {
173 match array {
174 Canonical::Null(null_array) => Ok(null_array.into_array()),
175 Canonical::Bool(bool_array) => {
176 self.choose_and_compress(Canonical::Bool(bool_array), compress_ctx, exec_ctx)
177 }
178 Canonical::Primitive(primitive) => {
179 self.choose_and_compress(Canonical::Primitive(primitive), compress_ctx, exec_ctx)
180 }
181 Canonical::Decimal(decimal) => {
182 self.choose_and_compress(Canonical::Decimal(decimal), compress_ctx, exec_ctx)
183 }
184 Canonical::Struct(struct_array) => {
185 let fields = struct_array
186 .iter_unmasked_fields()
187 .map(|field| self.compress(field, exec_ctx))
188 .collect::<Result<Vec<_>, _>>()?;
189
190 Ok(StructArray::try_new(
191 struct_array.names().clone(),
192 fields,
193 struct_array.len(),
194 struct_array.validity()?,
195 )?
196 .into_array())
197 }
198 Canonical::List(list_view_array) => {
199 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
200 let list_array = list_from_list_view(list_view_array)?;
201 self.compress_list_array(list_array, compress_ctx, exec_ctx)
202 } else {
203 self.compress_list_view_array(list_view_array, compress_ctx, exec_ctx)
204 }
205 }
206 Canonical::FixedSizeList(fsl_array) => {
207 let compressed_elems = self.compress(fsl_array.elements(), exec_ctx)?;
208
209 Ok(FixedSizeListArray::try_new(
210 compressed_elems,
211 fsl_array.list_size(),
212 fsl_array.validity()?,
213 fsl_array.len(),
214 )?
215 .into_array())
216 }
217 Canonical::VarBinView(varbinview) => {
218 self.choose_and_compress(Canonical::VarBinView(varbinview), compress_ctx, exec_ctx)
219 }
220 Canonical::Extension(ext_array) => {
221 let scheme_compressed = self.choose_and_compress(
223 Canonical::Extension(ext_array.clone()),
224 compress_ctx,
225 exec_ctx,
226 )?;
227 if scheme_compressed.is::<AnyScalarFn>() {
229 return Ok(scheme_compressed);
230 }
231
232 let compressed_storage = self.compress(ext_array.storage_array(), exec_ctx)?;
235 let storage_compressed =
236 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
237 .into_array();
238
239 if scheme_compressed.nbytes() < storage_compressed.nbytes() {
240 Ok(scheme_compressed)
241 } else {
242 Ok(storage_compressed)
243 }
244 }
245 Canonical::Variant(variant_array) => {
246 let core_storage =
247 self.compress_physical_slots(variant_array.core_storage(), exec_ctx)?;
248 let shredded = variant_array
249 .shredded()
250 .map(|arr| {
251 if arr.is::<Variant>() {
253 self.compress_physical_slots(arr, exec_ctx)
254 } else {
255 self.compress(arr, exec_ctx)
256 }
257 })
258 .transpose()?;
259
260 Ok(VariantArray::try_new(core_storage, shredded)?.into_array())
261 }
262 }
263 }
264
265 fn choose_and_compress(
279 &self,
280 canonical: Canonical,
281 compress_ctx: CompressorContext,
282 exec_ctx: &mut ExecutionCtx,
283 ) -> VortexResult<ArrayRef> {
284 let eligible_schemes: Vec<&'static dyn Scheme> = self
285 .schemes
286 .iter()
287 .copied()
288 .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &compress_ctx))
289 .collect();
290
291 let array: ArrayRef = canonical.into();
292
293 if eligible_schemes.is_empty() || array.is_empty() {
294 return Ok(array);
295 }
296
297 if array.all_invalid(exec_ctx)? {
298 return Ok(
299 ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
300 );
301 }
302
303 let before_nbytes = array.nbytes();
304
305 let merged_opts = eligible_schemes
306 .iter()
307 .fold(GenerateStatsOptions::default(), |acc, s| {
308 acc.merge(s.stats_options())
309 });
310 let compress_ctx = compress_ctx.with_merged_stats_options(merged_opts);
311
312 let data = ArrayAndStats::new(array, merged_opts);
313
314 let Some((winner, winner_estimate)) =
315 self.choose_best_scheme(&eligible_schemes, &data, compress_ctx.clone(), exec_ctx)?
316 else {
317 return Ok(data.into_array());
318 };
319
320 let error_ctx = trace::enabled_error_context(&compress_ctx);
323 let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
324 let compressed = winner
325 .compress(self, &data, compress_ctx, exec_ctx)
326 .inspect_err(|err| {
327 trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
331 })?;
332
333 let after_nbytes = compressed.nbytes();
334 let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
335
336 let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
338
339 trace::record_winner_compress_result(
340 after_nbytes,
341 winner_estimate.trace_ratio(),
342 actual_ratio,
343 accepted,
344 );
345
346 if accepted {
347 Ok(compressed)
348 } else {
349 Ok(data.into_array())
350 }
351 }
352
353 fn choose_best_scheme(
369 &self,
370 schemes: &[&'static dyn Scheme],
371 data: &ArrayAndStats,
372 compress_ctx: CompressorContext,
373 exec_ctx: &mut ExecutionCtx,
374 ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
375 let mut best: Option<(&'static dyn Scheme, EstimateScore)> = None;
376 let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
377
378 {
380 let _verdict_pass = trace::verdict_pass_span().entered();
381 for &scheme in schemes {
382 match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
383 CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
384 CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
385 return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
386 }
387 CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
388 let score = EstimateScore::FiniteCompression(ratio);
389
390 if is_better_score(score, best.as_ref()) {
391 best = Some((scheme, score));
392 }
393 }
394 CompressionEstimate::Deferred(deferred_estimate) => {
395 deferred.push((scheme, deferred_estimate));
396 }
397 }
398 }
399 }
400
401 for (scheme, deferred_estimate) in deferred {
404 let _span = trace::scheme_eval_span(scheme.id()).entered();
405 let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
406 match deferred_estimate {
407 DeferredEstimate::Sample => {
408 let score = estimate_compression_ratio_with_sampling(
409 self,
410 scheme,
411 data.array(),
412 compress_ctx.clone(),
413 exec_ctx,
414 )?;
415
416 if is_better_score(score, best.as_ref()) {
417 best = Some((scheme, score));
418 }
419 }
420 DeferredEstimate::Callback(callback) => {
421 match callback(self, data, threshold, compress_ctx.clone(), exec_ctx)? {
422 EstimateVerdict::Skip => {}
423 EstimateVerdict::AlwaysUse => {
424 return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
425 }
426 EstimateVerdict::Ratio(ratio) => {
427 let score = EstimateScore::FiniteCompression(ratio);
428
429 if is_better_score(score, best.as_ref()) {
430 best = Some((scheme, score));
431 }
432 }
433 }
434 }
435 }
436 }
437
438 Ok(best.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
439 }
440
441 fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
445 let id = candidate.id();
446 let history = ctx.cascade_history();
447
448 if history.iter().any(|&(sid, _)| sid == id) {
450 return true;
451 }
452
453 let mut iter = history.iter().copied().peekable();
454
455 if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
458 && self
459 .root_exclusions
460 .iter()
461 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
462 {
463 return true;
464 }
465
466 for (ancestor_id, child_idx) in iter {
468 if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
469 && ancestor
470 .descendant_exclusions()
471 .iter()
472 .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
473 {
474 return true;
475 }
476 }
477
478 for rule in candidate.ancestor_exclusions() {
480 if history
481 .iter()
482 .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
483 {
484 return true;
485 }
486 }
487
488 false
489 }
490
491 fn compress_list_array(
493 &self,
494 list_array: ListArray,
495 compress_ctx: CompressorContext,
496 exec_ctx: &mut ExecutionCtx,
497 ) -> VortexResult<ArrayRef> {
498 let list_array = list_array.reset_offsets(true)?;
499
500 let compressed_elems = self.compress(list_array.elements(), exec_ctx)?;
501
502 let offset_ctx =
504 compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
505 let list_offsets_primitive = list_array
506 .offsets()
507 .clone()
508 .execute::<PrimitiveArray>(exec_ctx)?
509 .narrow(exec_ctx)?;
510 let compressed_offsets = self.compress_canonical(
511 Canonical::Primitive(list_offsets_primitive),
512 offset_ctx,
513 exec_ctx,
514 )?;
515
516 Ok(
517 ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
518 .into_array(),
519 )
520 }
521
522 fn compress_list_view_array(
525 &self,
526 list_view: ListViewArray,
527 compress_ctx: CompressorContext,
528 exec_ctx: &mut ExecutionCtx,
529 ) -> VortexResult<ArrayRef> {
530 let compressed_elems = self.compress(list_view.elements(), exec_ctx)?;
531
532 let offset_ctx = compress_ctx
533 .clone()
534 .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
535 let list_view_offsets_primitive = list_view
536 .offsets()
537 .clone()
538 .execute::<PrimitiveArray>(exec_ctx)?
539 .narrow(exec_ctx)?;
540 let compressed_offsets = self.compress_canonical(
541 Canonical::Primitive(list_view_offsets_primitive),
542 offset_ctx,
543 exec_ctx,
544 )?;
545
546 let sizes_ctx = compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
547 let list_view_sizes_primitive = list_view
548 .sizes()
549 .clone()
550 .execute::<PrimitiveArray>(exec_ctx)?
551 .narrow(exec_ctx)?;
552 let compressed_sizes = self.compress_canonical(
553 Canonical::Primitive(list_view_sizes_primitive),
554 sizes_ctx,
555 exec_ctx,
556 )?;
557
558 Ok(ListViewArray::try_new(
559 compressed_elems,
560 compressed_offsets,
561 compressed_sizes,
562 list_view.validity()?,
563 )?
564 .into_array())
565 }
566
567 fn compress_physical_slots(
569 &self,
570 array: &ArrayRef,
571 exec_ctx: &mut ExecutionCtx,
572 ) -> VortexResult<ArrayRef> {
573 let slots = array
574 .slots()
575 .iter()
576 .map(|slot| {
577 slot.as_ref()
578 .map(|child| self.compress(child, exec_ctx))
579 .transpose()
580 })
581 .collect::<VortexResult<ArraySlots>>()?;
582
583 array.clone().with_slots(slots)
584 }
585}
586
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use parking_lot::Mutex;
    use vortex_array::ArrayRef;
    use vortex_array::Canonical;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::NullArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::session::ArraySession;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use super::*;
    use crate::builtins::FloatDictScheme;
    use crate::builtins::IntDictScheme;
    use crate::builtins::StringDictScheme;
    use crate::ctx::CompressorContext;
    use crate::estimate::CompressionEstimate;
    use crate::estimate::DeferredEstimate;
    use crate::estimate::EstimateScore;
    use crate::estimate::EstimateVerdict;
    use crate::estimate::WinnerEstimate;
    use crate::scheme::SchemeExt;

    /// Shared session used to create execution contexts in these tests.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Compressor with the three built-in dictionary schemes registered.
    fn compressor() -> CascadingCompressor {
        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
    }

    /// Small non-nullable `i32` input for the scheme-selection tests.
    fn estimate_test_data() -> ArrayAndStats {
        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
        ArrayAndStats::new(array, GenerateStatsOptions::default())
    }

    /// Returns true for canonical primitive arrays with an integer ptype.
    fn matches_integer_primitive(canonical: &Canonical) -> bool {
        matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
    }

    // Defines a unit-struct test scheme that:
    // - matches integer primitives,
    // - answers `expected_compression_ratio` with `$estimate`,
    // - panics if it is ever asked to compress.
    //
    // Deferred callbacks are invoked as `(compressor, data, best_so_far, ctx, exec_ctx)`, so
    // closures passed in `$estimate` name their parameters in that order.
    macro_rules! estimate_only_scheme {
        ($ty:ident, $name:literal, $estimate:expr) => {
            #[derive(Debug)]
            struct $ty;

            impl Scheme for $ty {
                fn scheme_name(&self) -> &'static str {
                    $name
                }

                fn matches(&self, canonical: &Canonical) -> bool {
                    matches_integer_primitive(canonical)
                }

                fn expected_compression_ratio(
                    &self,
                    _data: &ArrayAndStats,
                    _compress_ctx: CompressorContext,
                    _exec_ctx: &mut ExecutionCtx,
                ) -> CompressionEstimate {
                    $estimate
                }

                fn compress(
                    &self,
                    _compressor: &CascadingCompressor,
                    _data: &ArrayAndStats,
                    _compress_ctx: CompressorContext,
                    _exec_ctx: &mut ExecutionCtx,
                ) -> VortexResult<ArrayRef> {
                    unreachable!("test helper should never be selected for compression")
                }
            }
        };
    }

    estimate_only_scheme!(
        DirectRatioScheme,
        "test.direct_ratio",
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
    );

    estimate_only_scheme!(
        ImmediateAlwaysUseScheme,
        "test.immediate_always_use",
        CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
    );

    estimate_only_scheme!(
        CallbackAlwaysUseScheme,
        "test.callback_always_use",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _best_so_far, _ctx, _exec_ctx| Ok(EstimateVerdict::AlwaysUse),
        )))
    );

    estimate_only_scheme!(
        CallbackSkipScheme,
        "test.callback_skip",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _best_so_far, _ctx, _exec_ctx| Ok(EstimateVerdict::Skip),
        )))
    );

    estimate_only_scheme!(
        CallbackRatioScheme,
        "test.callback_ratio",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _best_so_far, _ctx, _exec_ctx| Ok(EstimateVerdict::Ratio(3.0)),
        )))
    );

    estimate_only_scheme!(
        HugeRatioScheme,
        "test.huge_ratio",
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0))
    );

    /// Sampling-based scheme whose compressed output is a `NullArray`. It presumably yields a
    /// zero-byte sample, judging by the test names; confirm against
    /// `estimate_compression_ratio_with_sampling`.
    #[derive(Debug)]
    struct ZeroBytesSamplingScheme;

    impl Scheme for ZeroBytesSamplingScheme {
        fn scheme_name(&self) -> &'static str {
            "test.zero_bytes_sampling"
        }

        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }

        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Deferred(DeferredEstimate::Sample)
        }

        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            Ok(NullArray::new(data.array().len()).into_array())
        }
    }

    #[test]
    fn test_self_exclusion() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);

        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_root_exclusion_list_offsets() {
        let c = compressor();
        let ctx = CompressorContext::default()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);

        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);

        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);

        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_no_exclusion_without_history() {
        let c = compressor();
        let ctx = CompressorContext::default();

        assert!(!c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn immediate_always_use_wins_immediately() -> VortexResult<()> {
        let compressor =
            CascadingCompressor::new(vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == ImmediateAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_always_use_wins_immediately() -> VortexResult<()> {
        let compressor =
            CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackAlwaysUseScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_skip_is_ignored() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&CallbackSkipScheme, &DirectRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(2.0))))
                if scheme.id() == DirectRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_ratio_competes_numerically() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(3.0))))
                if scheme.id() == CallbackRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &ZeroBytesSamplingScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &ZeroBytesSamplingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
                if scheme.id() == HugeRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &HugeRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&ZeroBytesSamplingScheme, &HugeRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
                if scheme.id() == HugeRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]);
        let schemes: [&'static dyn Scheme; 1] = [&ZeroBytesSamplingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(winner.is_none());
        Ok(())
    }

    /// Serializes the tests that reset and read `OBSERVED_THRESHOLD`, because the test harness
    /// may run tests concurrently.
    static OBSERVER_LOCK: Mutex<()> = Mutex::new(());
    /// Last `best_so_far` value passed to `ThresholdObservingScheme`'s callback. The outer
    /// `None` means the callback has not run since the last reset.
    static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);

    estimate_only_scheme!(
        ThresholdObservingScheme,
        "test.threshold_observing",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, best_so_far, _ctx, _exec_ctx| {
                *OBSERVED_THRESHOLD.lock() = Some(best_so_far);
                Ok(EstimateVerdict::Skip)
            },
        )))
    );

    estimate_only_scheme!(
        CallbackMatchingRatioScheme,
        "test.callback_matching_ratio",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _best_so_far, _ctx, _exec_ctx| Ok(EstimateVerdict::Ratio(2.0)),
        )))
    );

    #[test]
    fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &CallbackAlwaysUseScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &CallbackAlwaysUseScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn threshold_reflects_pass_one_best() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;

        let compressor =
            CascadingCompressor::new(vec![&DirectRatioScheme, &ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;

        let observed = *OBSERVED_THRESHOLD.lock();
        assert!(matches!(
            observed,
            Some(Some(EstimateScore::FiniteCompression(r))) if r == 2.0
        ));
        Ok(())
    }

    #[test]
    fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;

        let compressor =
            CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 2] =
            [&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;

        let observed = *OBSERVED_THRESHOLD.lock();
        assert_eq!(observed, Some(None));
        Ok(())
    }

    #[test]
    fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;

        let compressor = CascadingCompressor::new(vec![&ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 1] = [&ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;

        let observed = *OBSERVED_THRESHOLD.lock();
        assert_eq!(observed, Some(None));
        Ok(())
    }

    #[test]
    fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
        let _guard = OBSERVER_LOCK.lock();
        *OBSERVED_THRESHOLD.lock() = None;

        let compressor =
            CascadingCompressor::new(vec![&CallbackRatioScheme, &ThresholdObservingScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&CallbackRatioScheme, &ThresholdObservingScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;

        let observed = *OBSERVED_THRESHOLD.lock();
        assert!(matches!(
            observed,
            Some(Some(EstimateScore::FiniteCompression(r))) if r == 3.0
        ));
        Ok(())
    }

    #[test]
    fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
        let compressor =
            CascadingCompressor::new(vec![&CallbackMatchingRatioScheme, &DirectRatioScheme]);
        let schemes: [&'static dyn Scheme; 2] = [&CallbackMatchingRatioScheme, &DirectRatioScheme];
        let data = estimate_test_data();
        let mut exec_ctx = SESSION.create_execution_ctx();

        let winner = compressor.choose_best_scheme(
            &schemes,
            &data,
            CompressorContext::new(),
            &mut exec_ctx,
        )?;

        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
                if scheme.id() == DirectRatioScheme.id() && r == 2.0
        ));
        Ok(())
    }

    #[test]
    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::Array(BoolArray::from_iter([false, false, false, false, false]).into_array()),
        )
        .into_array();

        let compressor = CascadingCompressor::new(vec![&IntDictScheme]);
        let mut exec_ctx = SESSION.create_execution_ctx();
        let compressed = compressor.compress(&array, &mut exec_ctx)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    #[test]
    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
            Validity::NonNullable,
        )
        .into_array();

        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);

        let ctx = CompressorContext::new().with_sampling();

        let mut exec_ctx = SESSION.create_execution_ctx();
        let score = estimate_compression_ratio_with_sampling(
            &compressor,
            &FloatDictScheme,
            &array,
            ctx,
            &mut exec_ctx,
        )?;
        assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
        Ok(())
    }
}