Skip to main content

vortex_compressor/
compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Cascading array compression implementation.
5
6use std::sync::Arc;
7
8use parking_lot::Mutex;
9use parking_lot::MutexGuard;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::CanonicalValidity;
13use vortex_array::ExecutionCtx;
14use vortex_array::IntoArray;
15use vortex_array::LEGACY_SESSION;
16use vortex_array::ToCanonical;
17use vortex_array::VortexSessionExecute;
18use vortex_array::arrays::ConstantArray;
19use vortex_array::arrays::ExtensionArray;
20use vortex_array::arrays::FixedSizeListArray;
21use vortex_array::arrays::ListArray;
22use vortex_array::arrays::ListViewArray;
23use vortex_array::arrays::StructArray;
24use vortex_array::arrays::extension::ExtensionArrayExt;
25use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
26use vortex_array::arrays::list::ListArrayExt;
27use vortex_array::arrays::listview::ListViewArrayExt;
28use vortex_array::arrays::listview::list_from_list_view;
29use vortex_array::arrays::primitive::PrimitiveArrayExt;
30use vortex_array::arrays::struct_::StructArrayExt;
31use vortex_array::dtype::DType;
32use vortex_array::dtype::Nullability;
33use vortex_array::scalar::Scalar;
34use vortex_error::VortexResult;
35use vortex_error::vortex_bail;
36
37use crate::builtins::IntDictScheme;
38use crate::ctx::CompressorContext;
39use crate::estimate::CompressionEstimate;
40use crate::estimate::DeferredEstimate;
41use crate::estimate::EstimateVerdict;
42use crate::estimate::estimate_compression_ratio_with_sampling;
43use crate::estimate::is_better_ratio;
44use crate::scheme::ChildSelection;
45use crate::scheme::DescendantExclusion;
46use crate::scheme::Scheme;
47use crate::scheme::SchemeExt;
48use crate::scheme::SchemeId;
49use crate::stats::ArrayAndStats;
50use crate::stats::GenerateStatsOptions;
51
52/// The implicit root scheme ID for the compressor's own cascading (e.g. list offset compression).
53///
54/// This is the **only** [`SchemeId`] that is not auto-provided via [`SchemeExt`].
55const ROOT_SCHEME_ID: SchemeId = SchemeId {
56    name: "vortex.compressor.root",
57};
58
/// Child indices the compressor records in the cascade history for its own list/listview work.
mod root_list_children {
    /// Index of a ListView's sizes child.
    pub const SIZES: usize = 2;
    /// Index of the offsets child of a List or ListView.
    pub const OFFSETS: usize = 1;
}
66
/// Outcome of scheme selection once every deferred estimate has been resolved.
#[derive(Clone, Copy, Debug, PartialEq)]
enum WinnerEstimate {
    /// An estimate demanded that this scheme be used unconditionally.
    AlwaysUse,
    /// This scheme won on its numeric compression ratio.
    Ratio(f64),
}
75
76/// The main compressor type implementing cascading adaptive compression.
77///
78/// This compressor applies adaptive compression [`Scheme`]s to arrays based on their data types and
79/// characteristics. It recursively compresses nested structures like structs and lists, and chooses
80/// optimal compression schemes for leaf types.
81///
82/// The compressor works by:
83/// 1. Canonicalizing input arrays to a standard representation.
84/// 2. Pre-filtering schemes by [`Scheme::matches`] and exclusion rules.
85/// 3. Evaluating each matching scheme's compression estimate and resolving deferred work.
86/// 4. Compressing with the best scheme and verifying the result is smaller.
87///
88/// No scheme may appear twice in a cascade chain. The compressor enforces this automatically
89/// along with push/pull exclusion rules declared by each scheme.
90#[derive(Debug, Clone)]
91pub struct CascadingCompressor {
92    /// The enabled compression schemes.
93    schemes: Vec<&'static dyn Scheme>,
94
95    /// Descendant exclusion rules for the compressor's own cascading (e.g. excluding Dict from
96    /// list offsets).
97    root_exclusions: Vec<DescendantExclusion>,
98
99    /// Shared execution context for array operations during compression.
100    ///
101    /// This should have low contention as we only execute arrays one at a time during compression.
102    ctx: Arc<Mutex<ExecutionCtx>>,
103}
104
105impl CascadingCompressor {
106    /// Creates a new compressor with the given schemes.
107    ///
108    /// Root-level exclusion rules (e.g. excluding Dict from list offsets) are built
109    /// automatically.
110    pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
111        // Root exclusion: exclude IntDict from list/listview offsets (monotonically
112        // increasing data where dictionary encoding is wasteful).
113        let root_exclusions = vec![DescendantExclusion {
114            excluded: IntDictScheme.id(),
115            children: ChildSelection::One(root_list_children::OFFSETS),
116        }];
117        Self {
118            schemes,
119            root_exclusions,
120            // TODO(connor): The caller should probably pass this in.
121            ctx: Arc::new(Mutex::new(LEGACY_SESSION.create_execution_ctx())),
122        }
123    }
124
125    /// Returns a mutable borrow of the execution context.
126    pub fn execution_ctx(&self) -> MutexGuard<'_, ExecutionCtx> {
127        self.ctx.lock()
128    }
129
130    /// Compresses an array using cascading adaptive compression.
131    ///
132    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if canonicalization or compression fails.
137    pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
138        let canonical = array
139            .clone()
140            .execute::<CanonicalValidity>(&mut self.execution_ctx())?
141            .0;
142
143        // Compact it, removing any wasted space before we attempt to compress it.
144        let compact = canonical.compact()?;
145
146        self.compress_canonical(compact, CompressorContext::new())
147    }
148
149    /// Compresses a child array produced by a cascading scheme.
150    ///
151    /// If the cascade budget is exhausted, the canonical array is returned as-is. Otherwise,
152    /// the child context is created by descending and recording the parent scheme + child
153    /// index, and compression proceeds normally.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if compression fails.
158    pub fn compress_child(
159        &self,
160        child: &ArrayRef,
161        parent_ctx: &CompressorContext,
162        parent_id: SchemeId,
163        child_index: usize,
164    ) -> VortexResult<ArrayRef> {
165        if parent_ctx.finished_cascading() {
166            return Ok(child.clone());
167        }
168
169        let canonical = child
170            .clone()
171            .execute::<CanonicalValidity>(&mut self.execution_ctx())?
172            .0;
173        let compact = canonical.compact()?;
174
175        let child_ctx = parent_ctx
176            .clone()
177            .descend_with_scheme(parent_id, child_index);
178        self.compress_canonical(compact, child_ctx)
179    }
180
181    /// Compresses a canonical array by dispatching to type-specific logic.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if compression of any sub-array fails.
186    fn compress_canonical(
187        &self,
188        array: Canonical,
189        ctx: CompressorContext,
190    ) -> VortexResult<ArrayRef> {
191        match array {
192            Canonical::Null(null_array) => Ok(null_array.into_array()),
193            Canonical::Bool(bool_array) => {
194                self.choose_and_compress(Canonical::Bool(bool_array), ctx)
195            }
196            Canonical::Primitive(primitive) => {
197                self.choose_and_compress(Canonical::Primitive(primitive), ctx)
198            }
199            Canonical::Decimal(decimal) => {
200                self.choose_and_compress(Canonical::Decimal(decimal), ctx)
201            }
202            Canonical::Struct(struct_array) => {
203                let fields = struct_array
204                    .iter_unmasked_fields()
205                    .map(|field| self.compress(field))
206                    .collect::<Result<Vec<_>, _>>()?;
207
208                Ok(StructArray::try_new(
209                    struct_array.names().clone(),
210                    fields,
211                    struct_array.len(),
212                    struct_array.validity()?,
213                )?
214                .into_array())
215            }
216            Canonical::List(list_view_array) => {
217                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
218                    let list_array = list_from_list_view(list_view_array)?;
219                    self.compress_list_array(list_array, ctx)
220                } else {
221                    self.compress_list_view_array(list_view_array, ctx)
222                }
223            }
224            Canonical::FixedSizeList(fsl_array) => {
225                let compressed_elems = self.compress(fsl_array.elements())?;
226
227                Ok(FixedSizeListArray::try_new(
228                    compressed_elems,
229                    fsl_array.list_size(),
230                    fsl_array.validity()?,
231                    fsl_array.len(),
232                )?
233                .into_array())
234            }
235            Canonical::VarBinView(strings) => {
236                if strings
237                    .dtype()
238                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
239                {
240                    self.choose_and_compress(Canonical::VarBinView(strings), ctx)
241                } else {
242                    // We do not compress binary arrays.
243                    Ok(strings.into_array())
244                }
245            }
246            Canonical::Extension(ext_array) => {
247                let before_nbytes = ext_array.as_ref().nbytes();
248
249                // Try scheme-based compression first.
250                let result =
251                    self.choose_and_compress(Canonical::Extension(ext_array.clone()), ctx)?;
252                if result.nbytes() < before_nbytes {
253                    return Ok(result);
254                }
255
256                // Otherwise, fall back to compressing the underlying storage array.
257                let compressed_storage = self.compress(ext_array.storage_array())?;
258
259                Ok(
260                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
261                        .into_array(),
262                )
263            }
264            Canonical::Variant(_) => {
265                vortex_bail!("Variant arrays can not be compressed")
266            }
267        }
268    }
269
270    /// The main scheme-selection entry point for a single leaf array.
271    ///
272    /// Filters allowed schemes by [`matches`] and exclusion rules, merges their [`stats_options`]
273    /// into a single [`GenerateStatsOptions`], then delegates to [`choose_scheme`] to pick the
274    /// winner by estimated compression ratio.
275    ///
276    /// If a winner is found and its compressed output is actually smaller, that output is returned.
277    /// Otherwise, the original array is returned unchanged.
278    ///
279    /// Empty and all-null arrays are short-circuited before any scheme evaluation.
280    ///
281    /// [`matches`]: Scheme::matches
282    /// [`stats_options`]: Scheme::stats_options
283    /// [`choose_scheme`]: Self::choose_scheme
284    fn choose_and_compress(
285        &self,
286        canonical: Canonical,
287        ctx: CompressorContext,
288    ) -> VortexResult<ArrayRef> {
289        let eligible_schemes: Vec<&'static dyn Scheme> = self
290            .schemes
291            .iter()
292            .copied()
293            .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &ctx))
294            .collect();
295
296        let array: ArrayRef = canonical.into();
297
298        // If there are no schemes that we can compress into, then just return it uncompressed.
299        if eligible_schemes.is_empty() {
300            return Ok(array);
301        }
302
303        // Nothing to compress if empty or all-null.
304        if array.is_empty() {
305            return Ok(array);
306        }
307        if array.all_invalid(&mut LEGACY_SESSION.create_execution_ctx())? {
308            return Ok(
309                ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
310            );
311        }
312
313        let before_nbytes = array.nbytes();
314
315        let merged_opts = eligible_schemes
316            .iter()
317            .fold(GenerateStatsOptions::default(), |acc, s| {
318                acc.merge(s.stats_options())
319            });
320        let ctx = ctx.with_merged_stats_options(merged_opts);
321
322        let mut data = ArrayAndStats::new(array, merged_opts);
323
324        // TODO(connor): Add tracing support for logging the winner estimate.
325        if let Some((winner, _winner_estimate)) =
326            self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())?
327        {
328            // Sampling and estimation chose a scheme, so let's compress the whole array with it.
329            let compressed = winner.compress(self, &mut data, ctx)?;
330
331            // TODO(connor): Add a tracing warning here if compression with the chosen scheme
332            // failed, since there was likely more we could have done while choosing schemes.
333
334            // Only choose the compressed array if it is smaller than the canonical one.
335            if compressed.nbytes() < before_nbytes {
336                // TODO(connor): Add a tracing warning here too.
337                return Ok(compressed);
338            }
339        }
340
341        // No scheme improved on the original.
342        Ok(data.into_array())
343    }
344
345    /// Calls [`expected_compression_ratio`] on each candidate and returns the winning scheme and
346    /// resolved winner estimate, or `None` if no scheme exceeds 1.0. Ties are broken by
347    /// registration order (earlier in the list wins).
348    ///
349    /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio
350    fn choose_best_scheme(
351        &self,
352        schemes: &[&'static dyn Scheme],
353        data: &mut ArrayAndStats,
354        ctx: CompressorContext,
355    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
356        let mut best: Option<(&'static dyn Scheme, f64)> = None;
357
358        // TODO(connor): Might want to use an `im` data structure inside of `ctx` if the clones here
359        // are expensive.
360        for &scheme in schemes {
361            let estimate = scheme.expected_compression_ratio(data, ctx.clone());
362
363            // TODO(connor): Rather than computing the deferred estimates eagerly, it would be
364            // better to look at all quick estimates and see if it makes sense to sample at all.
365            match estimate {
366                CompressionEstimate::Verdict(verdict) => {
367                    if let Some(winner_estimate) =
368                        Self::check_and_update_estimate_verdict(&mut best, scheme, verdict)
369                    {
370                        return Ok(Some((scheme, winner_estimate)));
371                    }
372                }
373                CompressionEstimate::Deferred(DeferredEstimate::Sample) => {
374                    let sample_ratio = estimate_compression_ratio_with_sampling(
375                        scheme,
376                        self,
377                        data.array(),
378                        ctx.clone(),
379                    )?;
380
381                    if is_better_ratio(sample_ratio, &best) {
382                        best = Some((scheme, sample_ratio));
383                    }
384                }
385                CompressionEstimate::Deferred(DeferredEstimate::Callback(estimate_callback)) => {
386                    let verdict = estimate_callback(self, data, ctx.clone())?;
387                    if let Some(winner_estimate) =
388                        Self::check_and_update_estimate_verdict(&mut best, scheme, verdict)
389                    {
390                        return Ok(Some((scheme, winner_estimate)));
391                    }
392                }
393            }
394        }
395
396        Ok(best.map(|(scheme, ratio)| (scheme, WinnerEstimate::Ratio(ratio))))
397    }
398
399    /// Updates `best` from a terminal estimate verdict.
400    fn check_and_update_estimate_verdict(
401        best: &mut Option<(&'static dyn Scheme, f64)>,
402        scheme: &'static dyn Scheme,
403        verdict: EstimateVerdict,
404    ) -> Option<WinnerEstimate> {
405        match verdict {
406            EstimateVerdict::Skip => None,
407            EstimateVerdict::AlwaysUse => Some(WinnerEstimate::AlwaysUse),
408            EstimateVerdict::Ratio(ratio) => {
409                if is_better_ratio(ratio, &*best) {
410                    *best = Some((scheme, ratio));
411                }
412                None
413            }
414        }
415    }
416
417    // TODO(connor): Lots of room for optimization here.
418    /// Returns `true` if the candidate scheme should be excluded based on the cascade history and
419    /// exclusion rules.
420    fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
421        let id = candidate.id();
422        let history = ctx.cascade_history();
423
424        // Self-exclusion: no scheme appears twice in any chain.
425        if history.iter().any(|&(sid, _)| sid == id) {
426            return true;
427        }
428
429        let mut iter = history.iter().copied().peekable();
430
431        // The root entry is always first in the history (if present). Check if the root has
432        // excluded us.
433        if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
434            && self
435                .root_exclusions
436                .iter()
437                .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
438        {
439            return true;
440        }
441
442        // Push rules: Check if any of our ancestors have excluded us.
443        for (ancestor_id, child_idx) in iter {
444            if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
445                && ancestor
446                    .descendant_exclusions()
447                    .iter()
448                    .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
449            {
450                return true;
451            }
452        }
453
454        // Pull rules: Check if we have excluded ourselves because of our ancestors.
455        for rule in candidate.ancestor_exclusions() {
456            if history
457                .iter()
458                .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
459            {
460                return true;
461            }
462        }
463
464        false
465    }
466
467    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
468    fn compress_list_array(
469        &self,
470        list_array: ListArray,
471        ctx: CompressorContext,
472    ) -> VortexResult<ArrayRef> {
473        let list_array = list_array.reset_offsets(true)?;
474
475        let compressed_elems = self.compress(list_array.elements())?;
476
477        // Record the root scheme with the offsets child index so root exclusion rules apply.
478        let offset_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
479        let compressed_offsets = self.compress_canonical(
480            Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
481            offset_ctx,
482        )?;
483
484        Ok(
485            ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
486                .into_array(),
487        )
488    }
489
490    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
491    /// elements.
492    fn compress_list_view_array(
493        &self,
494        list_view: ListViewArray,
495        ctx: CompressorContext,
496    ) -> VortexResult<ArrayRef> {
497        let compressed_elems = self.compress(list_view.elements())?;
498
499        let offset_ctx = ctx
500            .clone()
501            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
502        let compressed_offsets = self.compress_canonical(
503            Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
504            offset_ctx,
505        )?;
506
507        let sizes_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
508        let compressed_sizes = self.compress_canonical(
509            Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
510            sizes_ctx,
511        )?;
512
513        Ok(ListViewArray::try_new(
514            compressed_elems,
515            compressed_offsets,
516            compressed_sizes,
517            list_view.validity()?,
518        )?
519        .into_array())
520    }
521}
522
#[cfg(test)]
mod tests {
    use vortex_array::ArrayRef;
    use vortex_array::Canonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use super::*;
    use crate::builtins::FloatDictScheme;
    use crate::builtins::IntDictScheme;
    use crate::builtins::StringDictScheme;
    use crate::ctx::CompressorContext;
    use crate::estimate::CompressionEstimate;
    use crate::estimate::DeferredEstimate;
    use crate::estimate::EstimateVerdict;
    use crate::scheme::SchemeExt;

    /// Compressor with the three dictionary schemes registered.
    fn compressor() -> CascadingCompressor {
        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
    }

    /// A small non-nullable `i32` array wrapped with default stats options.
    fn estimate_test_data() -> ArrayAndStats {
        let values = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable);
        ArrayAndStats::new(values.into_array(), GenerateStatsOptions::default())
    }

    /// Shared `matches` predicate for the test schemes: integer primitives only.
    fn matches_integer_primitive(canonical: &Canonical) -> bool {
        let Canonical::Primitive(primitive) = canonical else {
            return false;
        };
        primitive.ptype().is_int()
    }

    /// Declares a unit test scheme that matches integer primitives, reports a fixed estimate,
    /// and must never actually be asked to compress.
    macro_rules! estimate_only_scheme {
        ($ty:ident, $name:literal, $estimate:expr) => {
            #[derive(Debug)]
            struct $ty;

            impl Scheme for $ty {
                fn scheme_name(&self) -> &'static str {
                    $name
                }

                fn matches(&self, canonical: &Canonical) -> bool {
                    matches_integer_primitive(canonical)
                }

                fn expected_compression_ratio(
                    &self,
                    _data: &mut ArrayAndStats,
                    _ctx: CompressorContext,
                ) -> CompressionEstimate {
                    $estimate
                }

                fn compress(
                    &self,
                    _compressor: &CascadingCompressor,
                    _data: &mut ArrayAndStats,
                    _ctx: CompressorContext,
                ) -> VortexResult<ArrayRef> {
                    unreachable!("test helper should never be selected for compression")
                }
            }
        };
    }

    // Reports a ratio of 2.0 directly.
    estimate_only_scheme!(
        DirectRatioScheme,
        "test.direct_ratio",
        CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
    );

    // Demands to be used without any deferred work.
    estimate_only_scheme!(
        ImmediateAlwaysUseScheme,
        "test.immediate_always_use",
        CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
    );

    // Demands to be used, but only once its callback runs.
    estimate_only_scheme!(
        CallbackAlwaysUseScheme,
        "test.callback_always_use",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _ctx| Ok(EstimateVerdict::AlwaysUse),
        )))
    );

    // Opts out via its callback.
    estimate_only_scheme!(
        CallbackSkipScheme,
        "test.callback_skip",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _ctx| Ok(EstimateVerdict::Skip),
        )))
    );

    // Reports a ratio of 3.0 through its callback.
    estimate_only_scheme!(
        CallbackRatioScheme,
        "test.callback_ratio",
        CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
            |_compressor, _data, _ctx| Ok(EstimateVerdict::Ratio(3.0)),
        )))
    );

    /// Runs `choose_best_scheme` over `schemes` (also registered on the compressor) with the
    /// shared test data and a fresh context.
    fn pick_winner(
        schemes: [&'static dyn Scheme; 2],
    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
        let compressor = CascadingCompressor::new(schemes.to_vec());
        let mut data = estimate_test_data();
        compressor.choose_best_scheme(&schemes, &mut data, CompressorContext::new())
    }

    #[test]
    fn test_self_exclusion() {
        // IntDictScheme is already in the cascade history, so it may not appear again.
        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_root_exclusion_list_offsets() {
        // The root rule keeps IntDict away from list offsets.
        let ctx = CompressorContext::default()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
        // Cascading through FloatDict's codes (child 1) must rule out IntDict.
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
        // Cascading through FloatDict's values (child 0) must also rule out IntDict (ALP
        // propagation replacement).
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
        assert!(compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_no_exclusion_without_history() {
        // With an empty history nothing can be excluded.
        let ctx = CompressorContext::default();
        assert!(!compressor().is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn immediate_always_use_wins_immediately() -> VortexResult<()> {
        let winner = pick_winner([&DirectRatioScheme, &ImmediateAlwaysUseScheme])?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == ImmediateAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_always_use_wins_immediately() -> VortexResult<()> {
        let winner = pick_winner([&DirectRatioScheme, &CallbackAlwaysUseScheme])?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::AlwaysUse))
                if scheme.id() == CallbackAlwaysUseScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_skip_is_ignored() -> VortexResult<()> {
        let winner = pick_winner([&CallbackSkipScheme, &DirectRatioScheme])?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Ratio(2.0)))
                if scheme.id() == DirectRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn callback_ratio_competes_numerically() -> VortexResult<()> {
        let winner = pick_winner([&DirectRatioScheme, &CallbackRatioScheme])?;
        assert!(matches!(
            winner,
            Some((scheme, WinnerEstimate::Ratio(3.0)))
                if scheme.id() == CallbackRatioScheme.id()
        ));
        Ok(())
    }

    #[test]
    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
        let validity = Validity::Array(BoolArray::from_iter([false; 5]).into_array());
        let all_null = PrimitiveArray::new(buffer![0i32, 0, 0, 0, 0], validity).into_array();

        // With at least one eligible scheme registered (IntDict matches i32), an all-null array
        // must come back as a `ConstantArray`.
        let compressed = CascadingCompressor::new(vec![&IntDictScheme]).compress(&all_null)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Regression test for <https://github.com/vortex-data/vortex/issues/7227>.
    ///
    /// `estimate_compression_ratio_with_sampling` must use the *scheme's* stats options, which
    /// request distinct-value counting, rather than the context's stats options, which may not.
    /// The old code panicked inside `dictionary_encode` because distinct values were never
    /// computed for the sample.
    #[test]
    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
        // Only two distinct values, so FloatDictScheme should consider it compressible.
        let floats = PrimitiveArray::new(
            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
            Validity::NonNullable,
        )
        .into_array();
        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);

        // Default stats options (no distinct-value counting), flagged as a sample so the
        // estimator compresses the array directly instead of sampling it first.
        let sample_ctx = CompressorContext::new().with_sampling();

        // Before the fix this panicked with:
        //   "this must be present since `DictScheme` declared that we need distinct values"
        let ratio = estimate_compression_ratio_with_sampling(
            &FloatDictScheme,
            &compressor,
            &floats,
            sample_ctx,
        )?;
        assert!(ratio.is_finite());
        Ok(())
    }
}