Skip to main content

vortex_compressor/
compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Cascading array compression implementation.
5
6use vortex_array::ArrayRef;
7use vortex_array::ArraySlots;
8use vortex_array::Canonical;
9use vortex_array::CanonicalValidity;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::ExtensionArray;
14use vortex_array::arrays::FixedSizeListArray;
15use vortex_array::arrays::ListArray;
16use vortex_array::arrays::ListViewArray;
17use vortex_array::arrays::PrimitiveArray;
18use vortex_array::arrays::StructArray;
19use vortex_array::arrays::Variant;
20use vortex_array::arrays::VariantArray;
21use vortex_array::arrays::extension::ExtensionArrayExt;
22use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
23use vortex_array::arrays::list::ListArrayExt;
24use vortex_array::arrays::listview::ListViewArrayExt;
25use vortex_array::arrays::listview::list_from_list_view;
26use vortex_array::arrays::primitive::PrimitiveArrayExt;
27use vortex_array::arrays::scalar_fn::AnyScalarFn;
28use vortex_array::arrays::struct_::StructArrayExt;
29use vortex_array::arrays::variant::VariantArrayExt;
30use vortex_array::dtype::DType;
31use vortex_array::dtype::Nullability;
32use vortex_array::scalar::Scalar;
33use vortex_error::VortexResult;
34
35use crate::builtins::IntDictScheme;
36use crate::ctx::CompressorContext;
37use crate::estimate::CompressionEstimate;
38use crate::estimate::DeferredEstimate;
39use crate::estimate::EstimateScore;
40use crate::estimate::EstimateVerdict;
41use crate::estimate::WinnerEstimate;
42use crate::estimate::estimate_compression_ratio_with_sampling;
43use crate::estimate::is_better_score;
44use crate::scheme::ChildSelection;
45use crate::scheme::DescendantExclusion;
46use crate::scheme::Scheme;
47use crate::scheme::SchemeExt;
48use crate::scheme::SchemeId;
49use crate::stats::ArrayAndStats;
50use crate::stats::GenerateStatsOptions;
51use crate::trace;
52
/// Synthetic scheme ID used for the compressor's own root-level cascading.
///
/// This is not expected to correspond to any registered [`Scheme`]. The compressor records it in
/// the cascade history (paired with a `root_list_children` index) when it compresses list and
/// list-view offsets/sizes itself. `is_excluded` then matches such history entries against the
/// compressor's `root_exclusions` rather than against a scheme's descendant exclusions.
pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId {
    name: "vortex.compressor.root",
};
57
/// Child-slot indices recorded under [`ROOT_SCHEME_ID`] when the compressor itself compresses
/// the integer children of list and list-view arrays.
mod root_list_children {
    /// Slot index of the offsets child; used for both `ListArray` and `ListViewArray`.
    pub const OFFSETS: usize = 1;

    /// Slot index of the sizes child; only `ListViewArray` has one.
    pub const SIZES: usize = 2;
}
65
/// The main compressor type implementing cascading adaptive compression.
///
/// This compressor applies adaptive compression [`Scheme`]s to arrays based on their data types and
/// characteristics. It recursively compresses nested structures (structs, lists, fixed-size lists,
/// extension and variant arrays) and chooses a compression scheme for leaf types.
///
/// The compressor works by:
/// 1. Canonicalizing and compacting input arrays.
/// 2. Pre-filtering schemes by [`Scheme::matches`] and exclusion rules.
/// 3. Evaluating each matching scheme's compression estimate and resolving deferred work.
/// 4. Compressing with the best scheme and keeping the result only if it is smaller. Results that
///    are scalar-function arrays are currently accepted regardless of size (see the
///    "L2 denormalization" TODOs).
///
/// No scheme may appear twice in a cascade chain. The compressor enforces this automatically,
/// along with the push/pull exclusion rules declared by each scheme and its own root-level rules
/// for the list offsets/sizes it compresses itself.
#[derive(Debug, Clone)]
pub struct CascadingCompressor {
    /// The enabled compression schemes, in registration order. Scheme selection uses this order
    /// to break ties within an evaluation pass.
    schemes: Vec<&'static dyn Scheme>,

    /// Descendant exclusion rules for the compressor's own cascading (e.g. excluding Dict from
    /// list offsets). These apply to cascade-history entries recorded under `ROOT_SCHEME_ID`.
    root_exclusions: Vec<DescendantExclusion>,
}
89
90impl CascadingCompressor {
91    /// Creates a new compressor with the given schemes.
92    ///
93    /// Root-level exclusion rules (e.g. excluding Dict from list offsets) are built
94    /// automatically.
95    pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
96        // Root exclusion: exclude IntDict from list/listview offsets (monotonically
97        // increasing data where dictionary encoding is wasteful).
98        let root_exclusions = vec![DescendantExclusion {
99            excluded: IntDictScheme.id(),
100            children: ChildSelection::One(root_list_children::OFFSETS),
101        }];
102        Self {
103            schemes,
104            root_exclusions,
105        }
106    }
107
108    /// Compresses an array using cascading adaptive compression.
109    ///
110    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if canonicalization or compression fails.
115    pub fn compress(
116        &self,
117        array: &ArrayRef,
118        exec_ctx: &mut ExecutionCtx,
119    ) -> VortexResult<ArrayRef> {
120        let before_nbytes = array.nbytes();
121        let span = trace::compress_span(array.len(), array.dtype(), before_nbytes);
122        let _enter = span.enter();
123
124        let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
125        let compact = canonical.compact()?;
126        let compressed = self.compress_canonical(compact, CompressorContext::new(), exec_ctx)?;
127
128        trace::record_compress_outcome(&span, before_nbytes, compressed.nbytes());
129
130        Ok(compressed)
131    }
132
133    /// Compresses a child array produced by a cascading scheme.
134    ///
135    /// If the cascade budget is exhausted, the canonical array is returned as-is. Otherwise, the
136    /// child context is created by descending and recording the parent scheme + child index, and
137    /// compression proceeds normally.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if compression fails.
142    pub fn compress_child(
143        &self,
144        child: &ArrayRef,
145        parent_ctx: &CompressorContext,
146        parent_id: SchemeId,
147        child_index: usize,
148        exec_ctx: &mut ExecutionCtx,
149    ) -> VortexResult<ArrayRef> {
150        if parent_ctx.finished_cascading() {
151            trace::cascade_exhausted(parent_id, child_index);
152            return Ok(child.clone());
153        }
154
155        let canonical = child.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
156        let compact = canonical.compact()?;
157
158        let child_ctx = parent_ctx
159            .clone()
160            .descend_with_scheme(parent_id, child_index);
161        self.compress_canonical(compact, child_ctx, exec_ctx)
162    }
163
164    /// Compresses a canonical array by dispatching to type-specific logic.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if compression of any sub-array fails.
169    fn compress_canonical(
170        &self,
171        array: Canonical,
172        compress_ctx: CompressorContext,
173        exec_ctx: &mut ExecutionCtx,
174    ) -> VortexResult<ArrayRef> {
175        match array {
176            Canonical::Null(null_array) => Ok(null_array.into_array()),
177            Canonical::Bool(bool_array) => {
178                self.choose_and_compress(Canonical::Bool(bool_array), compress_ctx, exec_ctx)
179            }
180            Canonical::Primitive(primitive) => {
181                self.choose_and_compress(Canonical::Primitive(primitive), compress_ctx, exec_ctx)
182            }
183            Canonical::Decimal(decimal) => {
184                self.choose_and_compress(Canonical::Decimal(decimal), compress_ctx, exec_ctx)
185            }
186            Canonical::Struct(struct_array) => {
187                let fields = struct_array
188                    .iter_unmasked_fields()
189                    .map(|field| self.compress(field, exec_ctx))
190                    .collect::<Result<Vec<_>, _>>()?;
191
192                Ok(StructArray::try_new(
193                    struct_array.names().clone(),
194                    fields,
195                    struct_array.len(),
196                    struct_array.validity()?,
197                )?
198                .into_array())
199            }
200            Canonical::List(list_view_array) => {
201                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
202                    let list_array = list_from_list_view(list_view_array)?;
203                    self.compress_list_array(list_array, compress_ctx, exec_ctx)
204                } else {
205                    self.compress_list_view_array(list_view_array, compress_ctx, exec_ctx)
206                }
207            }
208            Canonical::FixedSizeList(fsl_array) => {
209                let compressed_elems = self.compress(fsl_array.elements(), exec_ctx)?;
210
211                Ok(FixedSizeListArray::try_new(
212                    compressed_elems,
213                    fsl_array.list_size(),
214                    fsl_array.validity()?,
215                    fsl_array.len(),
216                )?
217                .into_array())
218            }
219            Canonical::VarBinView(strings) => {
220                if strings
221                    .dtype()
222                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
223                {
224                    self.choose_and_compress(Canonical::VarBinView(strings), compress_ctx, exec_ctx)
225                } else {
226                    // We do not compress binary arrays.
227                    Ok(strings.into_array())
228                }
229            }
230            Canonical::Extension(ext_array) => {
231                let before_nbytes = ext_array.as_ref().nbytes();
232
233                // Try scheme-based compression first.
234                let result = self.choose_and_compress(
235                    Canonical::Extension(ext_array.clone()),
236                    compress_ctx,
237                    exec_ctx,
238                )?;
239                if result.nbytes() < before_nbytes {
240                    return Ok(result);
241                }
242
243                // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
244                if result.is::<AnyScalarFn>() {
245                    return Ok(result);
246                }
247
248                // Otherwise, fall back to compressing the underlying storage array.
249                let compressed_storage = self.compress(ext_array.storage_array(), exec_ctx)?;
250
251                Ok(
252                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
253                        .into_array(),
254                )
255            }
256            Canonical::Variant(variant_array) => {
257                let core_storage =
258                    self.compress_physical_slots(variant_array.core_storage(), exec_ctx)?;
259                let shredded = variant_array
260                    .shredded()
261                    .map(|arr| {
262                        // Avoid stack-overflow for variant shredded values
263                        if arr.is::<Variant>() {
264                            self.compress_physical_slots(arr, exec_ctx)
265                        } else {
266                            self.compress(arr, exec_ctx)
267                        }
268                    })
269                    .transpose()?;
270
271                Ok(VariantArray::try_new(core_storage, shredded)?.into_array())
272            }
273        }
274    }
275
276    /// The main scheme-selection entry point for a single leaf array.
277    ///
278    /// Filters allowed schemes by [`matches`] and exclusion rules, merges their [`stats_options`]
279    /// into a single [`GenerateStatsOptions`], and picks the winner by estimated compression
280    /// ratio.
281    ///
282    /// If a winner is found and its compressed output is actually smaller, that output is
283    /// returned. Otherwise, the original array is returned unchanged.
284    ///
285    /// Empty and all-null arrays are short-circuited before any scheme evaluation.
286    ///
287    /// [`matches`]: Scheme::matches
288    /// [`stats_options`]: Scheme::stats_options
289    fn choose_and_compress(
290        &self,
291        canonical: Canonical,
292        compress_ctx: CompressorContext,
293        exec_ctx: &mut ExecutionCtx,
294    ) -> VortexResult<ArrayRef> {
295        let eligible_schemes: Vec<&'static dyn Scheme> = self
296            .schemes
297            .iter()
298            .copied()
299            .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &compress_ctx))
300            .collect();
301
302        let array: ArrayRef = canonical.into();
303
304        if eligible_schemes.is_empty() || array.is_empty() {
305            return Ok(array);
306        }
307
308        if array.all_invalid(exec_ctx)? {
309            return Ok(
310                ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
311            );
312        }
313
314        let before_nbytes = array.nbytes();
315
316        let merged_opts = eligible_schemes
317            .iter()
318            .fold(GenerateStatsOptions::default(), |acc, s| {
319                acc.merge(s.stats_options())
320            });
321        let compress_ctx = compress_ctx.with_merged_stats_options(merged_opts);
322
323        let data = ArrayAndStats::new(array, merged_opts);
324
325        let Some((winner, winner_estimate)) =
326            self.choose_best_scheme(&eligible_schemes, &data, compress_ctx.clone(), exec_ctx)?
327        else {
328            return Ok(data.into_array());
329        };
330
331        // Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the
332        // scheme name and cascade history before propagating.
333        let error_ctx = trace::enabled_error_context(&compress_ctx);
334        let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
335        let compressed = winner
336            .compress(self, &data, compress_ctx, exec_ctx)
337            .inspect_err(|err| {
338                // NB: this is the only way we can tell which scheme panicked / bailed on their
339                // data, especially for third-party schemes where the error site may not carry any
340                // compressor context.
341                trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
342            })?;
343
344        let after_nbytes = compressed.nbytes();
345        let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
346
347        // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
348        let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
349
350        trace::record_winner_compress_result(
351            after_nbytes,
352            winner_estimate.trace_ratio(),
353            actual_ratio,
354            accepted,
355        );
356
357        if accepted {
358            Ok(compressed)
359        } else {
360            Ok(data.into_array())
361        }
362    }
363
364    /// Calls [`expected_compression_ratio`] on each candidate and returns the winning scheme along
365    /// with its resolved winner estimate, or `None` if no scheme beats the canonical encoding.
366    ///
367    /// Selection runs in two passes. Pass 1 evaluates every immediate
368    /// [`CompressionEstimate::Verdict`] and tracks the running best. [`Scheme`]s returning
369    /// [`CompressionEstimate::Deferred`] are stashed for pass 2 so that we do not make any
370    /// expensive computations if we don't have to.
371    ///
372    /// Pass 2 evaluates the deferred work and, for each [`DeferredEstimate::Callback`], passes the
373    /// current best [`EstimateScore`] as an early-exit hint so the callback can return
374    /// [`EstimateVerdict::Skip`] without doing expensive work when it cannot beat the threshold.
375    ///
376    /// Ties are broken by registration order within each pass.
377    ///
378    /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio
379    fn choose_best_scheme(
380        &self,
381        schemes: &[&'static dyn Scheme],
382        data: &ArrayAndStats,
383        compress_ctx: CompressorContext,
384        exec_ctx: &mut ExecutionCtx,
385    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
386        let mut best: Option<(&'static dyn Scheme, EstimateScore)> = None;
387        let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
388
389        // Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2.
390        {
391            let _verdict_pass = trace::verdict_pass_span().entered();
392            for &scheme in schemes {
393                match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
394                    CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
395                    CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
396                        return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
397                    }
398                    CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
399                        let score = EstimateScore::FiniteCompression(ratio);
400
401                        if is_better_score(score, best.as_ref()) {
402                            best = Some((scheme, score));
403                        }
404                    }
405                    CompressionEstimate::Deferred(deferred_estimate) => {
406                        deferred.push((scheme, deferred_estimate));
407                    }
408                }
409            }
410        }
411
412        // Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can
413        // short-circuit with `Skip` when they cannot beat it.
414        for (scheme, deferred_estimate) in deferred {
415            let _span = trace::scheme_eval_span(scheme.id()).entered();
416            let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
417            match deferred_estimate {
418                DeferredEstimate::Sample => {
419                    let score = estimate_compression_ratio_with_sampling(
420                        self,
421                        scheme,
422                        data.array(),
423                        compress_ctx.clone(),
424                        exec_ctx,
425                    )?;
426
427                    if is_better_score(score, best.as_ref()) {
428                        best = Some((scheme, score));
429                    }
430                }
431                DeferredEstimate::Callback(callback) => {
432                    match callback(self, data, threshold, compress_ctx.clone(), exec_ctx)? {
433                        EstimateVerdict::Skip => {}
434                        EstimateVerdict::AlwaysUse => {
435                            return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
436                        }
437                        EstimateVerdict::Ratio(ratio) => {
438                            let score = EstimateScore::FiniteCompression(ratio);
439
440                            if is_better_score(score, best.as_ref()) {
441                                best = Some((scheme, score));
442                            }
443                        }
444                    }
445                }
446            }
447        }
448
449        Ok(best.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
450    }
451
452    // TODO(connor): Lots of room for optimization here.
453    /// Returns `true` if the candidate scheme should be excluded based on the cascade history and
454    /// exclusion rules.
455    fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
456        let id = candidate.id();
457        let history = ctx.cascade_history();
458
459        // Self-exclusion: no scheme appears twice in any chain.
460        if history.iter().any(|&(sid, _)| sid == id) {
461            return true;
462        }
463
464        let mut iter = history.iter().copied().peekable();
465
466        // The root entry is always first in the history (if present). Check if the root has
467        // excluded us.
468        if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
469            && self
470                .root_exclusions
471                .iter()
472                .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
473        {
474            return true;
475        }
476
477        // Push rules: Check if any of our ancestors have excluded us.
478        for (ancestor_id, child_idx) in iter {
479            if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
480                && ancestor
481                    .descendant_exclusions()
482                    .iter()
483                    .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
484            {
485                return true;
486            }
487        }
488
489        // Pull rules: Check if we have excluded ourselves because of our ancestors.
490        for rule in candidate.ancestor_exclusions() {
491            if history
492                .iter()
493                .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
494            {
495                return true;
496            }
497        }
498
499        false
500    }
501
502    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
503    fn compress_list_array(
504        &self,
505        list_array: ListArray,
506        compress_ctx: CompressorContext,
507        exec_ctx: &mut ExecutionCtx,
508    ) -> VortexResult<ArrayRef> {
509        let list_array = list_array.reset_offsets(true)?;
510
511        let compressed_elems = self.compress(list_array.elements(), exec_ctx)?;
512
513        // Record the root scheme with the offsets child index so root exclusion rules apply.
514        let offset_ctx =
515            compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
516        let list_offsets_primitive = list_array
517            .offsets()
518            .clone()
519            .execute::<PrimitiveArray>(exec_ctx)?
520            .narrow(exec_ctx)?;
521        let compressed_offsets = self.compress_canonical(
522            Canonical::Primitive(list_offsets_primitive),
523            offset_ctx,
524            exec_ctx,
525        )?;
526
527        Ok(
528            ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
529                .into_array(),
530        )
531    }
532
533    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
534    /// elements.
535    fn compress_list_view_array(
536        &self,
537        list_view: ListViewArray,
538        compress_ctx: CompressorContext,
539        exec_ctx: &mut ExecutionCtx,
540    ) -> VortexResult<ArrayRef> {
541        let compressed_elems = self.compress(list_view.elements(), exec_ctx)?;
542
543        let offset_ctx = compress_ctx
544            .clone()
545            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
546        let list_view_offsets_primitive = list_view
547            .offsets()
548            .clone()
549            .execute::<PrimitiveArray>(exec_ctx)?
550            .narrow(exec_ctx)?;
551        let compressed_offsets = self.compress_canonical(
552            Canonical::Primitive(list_view_offsets_primitive),
553            offset_ctx,
554            exec_ctx,
555        )?;
556
557        let sizes_ctx = compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
558        let list_view_sizes_primitive = list_view
559            .sizes()
560            .clone()
561            .execute::<PrimitiveArray>(exec_ctx)?
562            .narrow(exec_ctx)?;
563        let compressed_sizes = self.compress_canonical(
564            Canonical::Primitive(list_view_sizes_primitive),
565            sizes_ctx,
566            exec_ctx,
567        )?;
568
569        Ok(ListViewArray::try_new(
570            compressed_elems,
571            compressed_offsets,
572            compressed_sizes,
573            list_view.validity()?,
574        )?
575        .into_array())
576    }
577
578    /// Compress very child slot of the array, then re-build it from them.
579    fn compress_physical_slots(
580        &self,
581        array: &ArrayRef,
582        exec_ctx: &mut ExecutionCtx,
583    ) -> VortexResult<ArrayRef> {
584        let slots = array
585            .slots()
586            .iter()
587            .map(|slot| {
588                slot.as_ref()
589                    .map(|child| self.compress(child, exec_ctx))
590                    .transpose()
591            })
592            .collect::<VortexResult<ArraySlots>>()?;
593
594        array.clone().with_slots(slots)
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use std::sync::LazyLock;
601
602    use parking_lot::Mutex;
603    use vortex_array::ArrayRef;
604    use vortex_array::Canonical;
605    use vortex_array::VortexSessionExecute;
606    use vortex_array::arrays::BoolArray;
607    use vortex_array::arrays::Constant;
608    use vortex_array::arrays::NullArray;
609    use vortex_array::arrays::PrimitiveArray;
610    use vortex_array::session::ArraySession;
611    use vortex_array::validity::Validity;
612    use vortex_buffer::buffer;
613    use vortex_session::VortexSession;
614
615    use super::*;
616    use crate::builtins::FloatDictScheme;
617    use crate::builtins::IntDictScheme;
618    use crate::builtins::StringDictScheme;
619    use crate::ctx::CompressorContext;
620    use crate::estimate::CompressionEstimate;
621    use crate::estimate::DeferredEstimate;
622    use crate::estimate::EstimateScore;
623    use crate::estimate::EstimateVerdict;
624    use crate::estimate::WinnerEstimate;
625    use crate::scheme::SchemeExt;
626
    /// Lazily-built session shared by the tests: an empty `VortexSession` extended with
    /// `ArraySession`.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
629
630    fn compressor() -> CascadingCompressor {
631        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
632    }
633
634    fn estimate_test_data() -> ArrayAndStats {
635        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
636        ArrayAndStats::new(array, GenerateStatsOptions::default())
637    }
638
639    fn matches_integer_primitive(canonical: &Canonical) -> bool {
640        matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
641    }
642
    /// Test scheme for integer primitives that immediately reports a compression ratio of 2.0.
    ///
    /// Its `compress` is `unreachable!`, so tests may only use it in scheme selection, never as
    /// the scheme that actually compresses.
    #[derive(Debug)]
    struct DirectRatioScheme;

    impl Scheme for DirectRatioScheme {
        fn scheme_name(&self) -> &'static str {
            "test.direct_ratio"
        }

        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }

        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
        }

        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }
674
    /// Test scheme for integer primitives whose immediate verdict is `AlwaysUse`.
    ///
    /// Its `compress` is `unreachable!`, so tests may only use it in scheme selection, never as
    /// the scheme that actually compresses.
    #[derive(Debug)]
    struct ImmediateAlwaysUseScheme;

    impl Scheme for ImmediateAlwaysUseScheme {
        fn scheme_name(&self) -> &'static str {
            "test.immediate_always_use"
        }

        fn matches(&self, canonical: &Canonical) -> bool {
            matches_integer_primitive(canonical)
        }

        fn expected_compression_ratio(
            &self,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> CompressionEstimate {
            CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
        }

        fn compress(
            &self,
            _compressor: &CascadingCompressor,
            _data: &ArrayAndStats,
            _compress_ctx: CompressorContext,
            _exec_ctx: &mut ExecutionCtx,
        ) -> VortexResult<ArrayRef> {
            unreachable!("test helper should never be selected for compression")
        }
    }
706
707    #[derive(Debug)]
708    struct CallbackAlwaysUseScheme;
709
710    impl Scheme for CallbackAlwaysUseScheme {
711        fn scheme_name(&self) -> &'static str {
712            "test.callback_always_use"
713        }
714
715        fn matches(&self, canonical: &Canonical) -> bool {
716            matches_integer_primitive(canonical)
717        }
718
719        fn expected_compression_ratio(
720            &self,
721            _data: &ArrayAndStats,
722            _compress_ctx: CompressorContext,
723            _exec_ctx: &mut ExecutionCtx,
724        ) -> CompressionEstimate {
725            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
726                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::AlwaysUse),
727            )))
728        }
729
730        fn compress(
731            &self,
732            _compressor: &CascadingCompressor,
733            _data: &ArrayAndStats,
734            _compress_ctx: CompressorContext,
735            _exec_ctx: &mut ExecutionCtx,
736        ) -> VortexResult<ArrayRef> {
737            unreachable!("test helper should never be selected for compression")
738        }
739    }
740
741    #[derive(Debug)]
742    struct CallbackSkipScheme;
743
744    impl Scheme for CallbackSkipScheme {
745        fn scheme_name(&self) -> &'static str {
746            "test.callback_skip"
747        }
748
749        fn matches(&self, canonical: &Canonical) -> bool {
750            matches_integer_primitive(canonical)
751        }
752
753        fn expected_compression_ratio(
754            &self,
755            _data: &ArrayAndStats,
756            _compress_ctx: CompressorContext,
757            _exec_ctx: &mut ExecutionCtx,
758        ) -> CompressionEstimate {
759            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
760                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Skip),
761            )))
762        }
763
764        fn compress(
765            &self,
766            _compressor: &CascadingCompressor,
767            _data: &ArrayAndStats,
768            _compress_ctx: CompressorContext,
769            _exec_ctx: &mut ExecutionCtx,
770        ) -> VortexResult<ArrayRef> {
771            unreachable!("test helper should never be selected for compression")
772        }
773    }
774
775    #[derive(Debug)]
776    struct CallbackRatioScheme;
777
778    impl Scheme for CallbackRatioScheme {
779        fn scheme_name(&self) -> &'static str {
780            "test.callback_ratio"
781        }
782
783        fn matches(&self, canonical: &Canonical) -> bool {
784            matches_integer_primitive(canonical)
785        }
786
787        fn expected_compression_ratio(
788            &self,
789            _data: &ArrayAndStats,
790            _compress_ctx: CompressorContext,
791            _exec_ctx: &mut ExecutionCtx,
792        ) -> CompressionEstimate {
793            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
794                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(3.0)),
795            )))
796        }
797
798        fn compress(
799            &self,
800            _compressor: &CascadingCompressor,
801            _data: &ArrayAndStats,
802            _compress_ctx: CompressorContext,
803            _exec_ctx: &mut ExecutionCtx,
804        ) -> VortexResult<ArrayRef> {
805            unreachable!("test helper should never be selected for compression")
806        }
807    }
808
809    #[derive(Debug)]
810    struct HugeRatioScheme;
811
812    impl Scheme for HugeRatioScheme {
813        fn scheme_name(&self) -> &'static str {
814            "test.huge_ratio"
815        }
816
817        fn matches(&self, canonical: &Canonical) -> bool {
818            matches_integer_primitive(canonical)
819        }
820
821        fn expected_compression_ratio(
822            &self,
823            _data: &ArrayAndStats,
824            _compress_ctx: CompressorContext,
825            _exec_ctx: &mut ExecutionCtx,
826        ) -> CompressionEstimate {
827            CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0))
828        }
829
830        fn compress(
831            &self,
832            _compressor: &CascadingCompressor,
833            _data: &ArrayAndStats,
834            _compress_ctx: CompressorContext,
835            _exec_ctx: &mut ExecutionCtx,
836        ) -> VortexResult<ArrayRef> {
837            unreachable!("test helper should never be selected for compression")
838        }
839    }
840
841    #[derive(Debug)]
842    struct ZeroBytesSamplingScheme;
843
844    impl Scheme for ZeroBytesSamplingScheme {
845        fn scheme_name(&self) -> &'static str {
846            "test.zero_bytes_sampling"
847        }
848
849        fn matches(&self, canonical: &Canonical) -> bool {
850            matches_integer_primitive(canonical)
851        }
852
853        fn expected_compression_ratio(
854            &self,
855            _data: &ArrayAndStats,
856            _compress_ctx: CompressorContext,
857            _exec_ctx: &mut ExecutionCtx,
858        ) -> CompressionEstimate {
859            CompressionEstimate::Deferred(DeferredEstimate::Sample)
860        }
861
862        fn compress(
863            &self,
864            _compressor: &CascadingCompressor,
865            data: &ArrayAndStats,
866            _compress_ctx: CompressorContext,
867            _exec_ctx: &mut ExecutionCtx,
868        ) -> VortexResult<ArrayRef> {
869            Ok(NullArray::new(data.array().len()).into_array())
870        }
871    }
872
873    #[test]
874    fn test_self_exclusion() {
875        let c = compressor();
876        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
877
878        // IntDictScheme is in the history, so it should be excluded.
879        assert!(c.is_excluded(&IntDictScheme, &ctx));
880    }
881
882    #[test]
883    fn test_root_exclusion_list_offsets() {
884        let c = compressor();
885        let ctx = CompressorContext::default()
886            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
887
888        // IntDict should be excluded for list offsets.
889        assert!(c.is_excluded(&IntDictScheme, &ctx));
890    }
891
892    #[test]
893    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
894        let c = compressor();
895        // FloatDict cascading through codes (child 1).
896        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
897
898        // IntDict should be excluded from FloatDict's codes child.
899        assert!(c.is_excluded(&IntDictScheme, &ctx));
900    }
901
902    #[test]
903    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
904        let c = compressor();
905        // FloatDict cascading through values (child 0).
906        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
907
908        // IntDict should also be excluded from FloatDict's values child (ALP propagation
909        // replacement).
910        assert!(c.is_excluded(&IntDictScheme, &ctx));
911    }
912
913    #[test]
914    fn test_no_exclusion_without_history() {
915        let c = compressor();
916        let ctx = CompressorContext::default();
917
918        // No history means no exclusions.
919        assert!(!c.is_excluded(&IntDictScheme, &ctx));
920    }
921
922    #[test]
923    fn immediate_always_use_wins_immediately() -> VortexResult<()> {
924        let compressor =
925            CascadingCompressor::new(vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme]);
926        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
927        let data = estimate_test_data();
928        let mut exec_ctx = SESSION.create_execution_ctx();
929
930        let winner = compressor.choose_best_scheme(
931            &schemes,
932            &data,
933            CompressorContext::new(),
934            &mut exec_ctx,
935        )?;
936
937        assert!(matches!(
938            winner,
939            Some((scheme, WinnerEstimate::AlwaysUse))
940                if scheme.id() == ImmediateAlwaysUseScheme.id()
941        ));
942        Ok(())
943    }
944
945    #[test]
946    fn callback_always_use_wins_immediately() -> VortexResult<()> {
947        let compressor =
948            CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackAlwaysUseScheme]);
949        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
950        let data = estimate_test_data();
951        let mut exec_ctx = SESSION.create_execution_ctx();
952
953        let winner = compressor.choose_best_scheme(
954            &schemes,
955            &data,
956            CompressorContext::new(),
957            &mut exec_ctx,
958        )?;
959
960        assert!(matches!(
961            winner,
962            Some((scheme, WinnerEstimate::AlwaysUse))
963                if scheme.id() == CallbackAlwaysUseScheme.id()
964        ));
965        Ok(())
966    }
967
968    #[test]
969    fn callback_skip_is_ignored() -> VortexResult<()> {
970        let compressor = CascadingCompressor::new(vec![&CallbackSkipScheme, &DirectRatioScheme]);
971        let schemes: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
972        let data = estimate_test_data();
973        let mut exec_ctx = SESSION.create_execution_ctx();
974
975        let winner = compressor.choose_best_scheme(
976            &schemes,
977            &data,
978            CompressorContext::new(),
979            &mut exec_ctx,
980        )?;
981
982        assert!(matches!(
983            winner,
984            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(2.0))))
985                if scheme.id() == DirectRatioScheme.id()
986        ));
987        Ok(())
988    }
989
990    #[test]
991    fn callback_ratio_competes_numerically() -> VortexResult<()> {
992        let compressor = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackRatioScheme]);
993        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
994        let data = estimate_test_data();
995        let mut exec_ctx = SESSION.create_execution_ctx();
996
997        let winner = compressor.choose_best_scheme(
998            &schemes,
999            &data,
1000            CompressorContext::new(),
1001            &mut exec_ctx,
1002        )?;
1003
1004        assert!(matches!(
1005            winner,
1006            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(3.0))))
1007                if scheme.id() == CallbackRatioScheme.id()
1008        ));
1009        Ok(())
1010    }
1011
1012    #[test]
1013    fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
1014        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &ZeroBytesSamplingScheme]);
1015        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &ZeroBytesSamplingScheme];
1016        let data = estimate_test_data();
1017        let mut exec_ctx = SESSION.create_execution_ctx();
1018
1019        let winner = compressor.choose_best_scheme(
1020            &schemes,
1021            &data,
1022            CompressorContext::new(),
1023            &mut exec_ctx,
1024        )?;
1025
1026        assert!(matches!(
1027            winner,
1028            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
1029                if scheme.id() == HugeRatioScheme.id()
1030        ));
1031        Ok(())
1032    }
1033
1034    #[test]
1035    fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
1036        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &HugeRatioScheme]);
1037        let schemes: [&'static dyn Scheme; 2] = [&ZeroBytesSamplingScheme, &HugeRatioScheme];
1038        let data = estimate_test_data();
1039        let mut exec_ctx = SESSION.create_execution_ctx();
1040
1041        let winner = compressor.choose_best_scheme(
1042            &schemes,
1043            &data,
1044            CompressorContext::new(),
1045            &mut exec_ctx,
1046        )?;
1047
1048        assert!(matches!(
1049            winner,
1050            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
1051                if scheme.id() == HugeRatioScheme.id()
1052        ));
1053        Ok(())
1054    }
1055
1056    #[test]
1057    fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
1058        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]);
1059        let schemes: [&'static dyn Scheme; 1] = [&ZeroBytesSamplingScheme];
1060        let data = estimate_test_data();
1061        let mut exec_ctx = SESSION.create_execution_ctx();
1062
1063        let winner = compressor.choose_best_scheme(
1064            &schemes,
1065            &data,
1066            CompressorContext::new(),
1067            &mut exec_ctx,
1068        )?;
1069
1070        assert!(winner.is_none());
1071        Ok(())
1072    }
1073
    // Shared state for the threshold tests that follow.
    //
    // `OBSERVED_THRESHOLD` is written by `ThresholdObservingScheme`'s deferred callback with the
    // `best_so_far` value the compressor handed it. The outer `Option` records whether the
    // callback ran at all (each test resets it to `None` first). The inner `Option` is the
    // threshold itself, which the tests expect to be `None` when no earlier scheme produced a
    // usable score.
    //
    // `OBSERVER_LOCK` is held for the whole body of every test that touches `OBSERVED_THRESHOLD`.
    // This serializes those tests so their writes cannot interleave when the harness runs tests
    // in parallel.
    static OBSERVER_LOCK: Mutex<()> = Mutex::new(());
    static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);
1079
1080    #[derive(Debug)]
1081    struct ThresholdObservingScheme;
1082
1083    impl Scheme for ThresholdObservingScheme {
1084        fn scheme_name(&self) -> &'static str {
1085            "test.threshold_observing"
1086        }
1087
1088        fn matches(&self, canonical: &Canonical) -> bool {
1089            matches_integer_primitive(canonical)
1090        }
1091
1092        fn expected_compression_ratio(
1093            &self,
1094            _data: &ArrayAndStats,
1095            _compress_ctx: CompressorContext,
1096            _exec_ctx: &mut ExecutionCtx,
1097        ) -> CompressionEstimate {
1098            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1099                |_compressor, _data, best_so_far, _ctx, _exec_ctx| {
1100                    *OBSERVED_THRESHOLD.lock() = Some(best_so_far);
1101                    Ok(EstimateVerdict::Skip)
1102                },
1103            )))
1104        }
1105
1106        fn compress(
1107            &self,
1108            _compressor: &CascadingCompressor,
1109            _data: &ArrayAndStats,
1110            _compress_ctx: CompressorContext,
1111            _exec_ctx: &mut ExecutionCtx,
1112        ) -> VortexResult<ArrayRef> {
1113            unreachable!("test helper should never be selected for compression")
1114        }
1115    }
1116
1117    #[derive(Debug)]
1118    struct CallbackMatchingRatioScheme;
1119
1120    impl Scheme for CallbackMatchingRatioScheme {
1121        fn scheme_name(&self) -> &'static str {
1122            "test.callback_matching_ratio"
1123        }
1124
1125        fn matches(&self, canonical: &Canonical) -> bool {
1126            matches_integer_primitive(canonical)
1127        }
1128
1129        fn expected_compression_ratio(
1130            &self,
1131            _data: &ArrayAndStats,
1132            _compress_ctx: CompressorContext,
1133            _exec_ctx: &mut ExecutionCtx,
1134        ) -> CompressionEstimate {
1135            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1136                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(2.0)),
1137            )))
1138        }
1139
1140        fn compress(
1141            &self,
1142            _compressor: &CascadingCompressor,
1143            _data: &ArrayAndStats,
1144            _compress_ctx: CompressorContext,
1145            _exec_ctx: &mut ExecutionCtx,
1146        ) -> VortexResult<ArrayRef> {
1147            unreachable!("test helper should never be selected for compression")
1148        }
1149    }
1150
1151    #[test]
1152    fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
1153        // `HugeRatioScheme` returns an immediate `Ratio(100.0)` in pass 1;
1154        // `CallbackAlwaysUseScheme` returns `AlwaysUse` from its deferred callback in pass 2.
1155        // The deferred `AlwaysUse` must still win.
1156        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &CallbackAlwaysUseScheme]);
1157        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &CallbackAlwaysUseScheme];
1158        let data = estimate_test_data();
1159        let mut exec_ctx = SESSION.create_execution_ctx();
1160
1161        let winner = compressor.choose_best_scheme(
1162            &schemes,
1163            &data,
1164            CompressorContext::new(),
1165            &mut exec_ctx,
1166        )?;
1167
1168        assert!(matches!(
1169            winner,
1170            Some((scheme, WinnerEstimate::AlwaysUse))
1171                if scheme.id() == CallbackAlwaysUseScheme.id()
1172        ));
1173        Ok(())
1174    }
1175
1176    #[test]
1177    fn threshold_reflects_pass_one_best() -> VortexResult<()> {
1178        let _guard = OBSERVER_LOCK.lock();
1179        *OBSERVED_THRESHOLD.lock() = None;
1180
1181        let compressor =
1182            CascadingCompressor::new(vec![&DirectRatioScheme, &ThresholdObservingScheme]);
1183        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ThresholdObservingScheme];
1184        let data = estimate_test_data();
1185        let mut exec_ctx = SESSION.create_execution_ctx();
1186
1187        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1188
1189        let observed = *OBSERVED_THRESHOLD.lock();
1190        assert!(matches!(
1191            observed,
1192            Some(Some(EstimateScore::FiniteCompression(r))) if r == 2.0
1193        ));
1194        Ok(())
1195    }
1196
1197    #[test]
1198    fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
1199        let _guard = OBSERVER_LOCK.lock();
1200        *OBSERVED_THRESHOLD.lock() = None;
1201
1202        let compressor =
1203            CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &ThresholdObservingScheme]);
1204        let schemes: [&'static dyn Scheme; 2] =
1205            [&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
1206        let data = estimate_test_data();
1207        let mut exec_ctx = SESSION.create_execution_ctx();
1208
1209        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1210
1211        // The observing callback was invoked (outer `Some`) and `best_so_far` was `None` (inner
1212        // `None`) because the zero-byte sample is never stored as the best.
1213        let observed = *OBSERVED_THRESHOLD.lock();
1214        assert_eq!(observed, Some(None));
1215        Ok(())
1216    }
1217
1218    #[test]
1219    fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
1220        let _guard = OBSERVER_LOCK.lock();
1221        *OBSERVED_THRESHOLD.lock() = None;
1222
1223        let compressor = CascadingCompressor::new(vec![&ThresholdObservingScheme]);
1224        let schemes: [&'static dyn Scheme; 1] = [&ThresholdObservingScheme];
1225        let data = estimate_test_data();
1226        let mut exec_ctx = SESSION.create_execution_ctx();
1227
1228        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1229
1230        let observed = *OBSERVED_THRESHOLD.lock();
1231        assert_eq!(observed, Some(None));
1232        Ok(())
1233    }
1234
1235    #[test]
1236    fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
1237        let _guard = OBSERVER_LOCK.lock();
1238        *OBSERVED_THRESHOLD.lock() = None;
1239
1240        // Both schemes are deferred. The first callback registers `Ratio(3.0)`; the second
1241        // callback must observe it as its threshold.
1242        let compressor =
1243            CascadingCompressor::new(vec![&CallbackRatioScheme, &ThresholdObservingScheme]);
1244        let schemes: [&'static dyn Scheme; 2] = [&CallbackRatioScheme, &ThresholdObservingScheme];
1245        let data = estimate_test_data();
1246        let mut exec_ctx = SESSION.create_execution_ctx();
1247
1248        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1249
1250        let observed = *OBSERVED_THRESHOLD.lock();
1251        assert!(matches!(
1252            observed,
1253            Some(Some(EstimateScore::FiniteCompression(r))) if r == 3.0
1254        ));
1255        Ok(())
1256    }
1257
1258    #[test]
1259    fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
1260        // Both schemes produce the same `Ratio(2.0)`, one from pass 1 (immediate) and one from
1261        // pass 2 (deferred callback). Pass 1 locks in first, and strict `>` tie-breaking means
1262        // the deferred callback's equal ratio cannot displace it.
1263        let compressor =
1264            CascadingCompressor::new(vec![&CallbackMatchingRatioScheme, &DirectRatioScheme]);
1265        let schemes: [&'static dyn Scheme; 2] = [&CallbackMatchingRatioScheme, &DirectRatioScheme];
1266        let data = estimate_test_data();
1267        let mut exec_ctx = SESSION.create_execution_ctx();
1268
1269        let winner = compressor.choose_best_scheme(
1270            &schemes,
1271            &data,
1272            CompressorContext::new(),
1273            &mut exec_ctx,
1274        )?;
1275
1276        assert!(matches!(
1277            winner,
1278            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
1279                if scheme.id() == DirectRatioScheme.id() && r == 2.0
1280        ));
1281        Ok(())
1282    }
1283
1284    #[test]
1285    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
1286        let array = PrimitiveArray::new(
1287            buffer![0i32, 0, 0, 0, 0],
1288            Validity::Array(BoolArray::from_iter([false, false, false, false, false]).into_array()),
1289        )
1290        .into_array();
1291
1292        // The compressor should produce a `ConstantArray` for an all-null array regardless of
1293        // which schemes are registered.
1294        let compressor = CascadingCompressor::new(vec![&IntDictScheme]);
1295        let mut exec_ctx = SESSION.create_execution_ctx();
1296        let compressed = compressor.compress(&array, &mut exec_ctx)?;
1297        assert!(compressed.is::<Constant>());
1298        Ok(())
1299    }
1300
1301    /// Regression test for <https://github.com/vortex-data/vortex/issues/7227>.
1302    ///
1303    /// `estimate_compression_ratio_with_sampling` must use the *scheme's* stats options
1304    /// (which request distinct-value counting) rather than the context's stats options
1305    /// (which may not). With the old code this panicked inside `dictionary_encode` because
1306    /// distinct values were never computed for the sample.
1307    #[test]
1308    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
1309        // Low-cardinality float array so FloatDictScheme considers it compressible.
1310        let array = PrimitiveArray::new(
1311            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
1312            Validity::NonNullable,
1313        )
1314        .into_array();
1315
1316        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);
1317
1318        // A context with default stats_options (count_distinct_values = false) and
1319        // marked as a sample so the function skips the sampling step and compresses
1320        // the array directly.
1321        let ctx = CompressorContext::new().with_sampling();
1322
1323        // Before the fix this panicked with:
1324        //   "this must be present since `DictScheme` declared that we need distinct values"
1325        let mut exec_ctx = SESSION.create_execution_ctx();
1326        let score = estimate_compression_ratio_with_sampling(
1327            &compressor,
1328            &FloatDictScheme,
1329            &array,
1330            ctx,
1331            &mut exec_ctx,
1332        )?;
1333        assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
1334        Ok(())
1335    }
1336}