Skip to main content

vortex_compressor/
compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Cascading array compression implementation.
5
6use vortex_array::ArrayRef;
7use vortex_array::ArraySlots;
8use vortex_array::Canonical;
9use vortex_array::CanonicalValidity;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::ExtensionArray;
14use vortex_array::arrays::FixedSizeListArray;
15use vortex_array::arrays::ListArray;
16use vortex_array::arrays::ListViewArray;
17use vortex_array::arrays::PrimitiveArray;
18use vortex_array::arrays::StructArray;
19use vortex_array::arrays::Variant;
20use vortex_array::arrays::VariantArray;
21use vortex_array::arrays::extension::ExtensionArrayExt;
22use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
23use vortex_array::arrays::list::ListArrayExt;
24use vortex_array::arrays::listview::ListViewArrayExt;
25use vortex_array::arrays::listview::list_from_list_view;
26use vortex_array::arrays::primitive::PrimitiveArrayExt;
27use vortex_array::arrays::scalar_fn::AnyScalarFn;
28use vortex_array::arrays::struct_::StructArrayExt;
29use vortex_array::arrays::variant::VariantArrayExt;
30use vortex_array::scalar::Scalar;
31use vortex_error::VortexResult;
32
33use crate::builtins::IntDictScheme;
34use crate::ctx::CompressorContext;
35use crate::estimate::CompressionEstimate;
36use crate::estimate::DeferredEstimate;
37use crate::estimate::EstimateScore;
38use crate::estimate::EstimateVerdict;
39use crate::estimate::WinnerEstimate;
40use crate::estimate::estimate_compression_ratio_with_sampling;
41use crate::estimate::is_better_score;
42use crate::scheme::ChildSelection;
43use crate::scheme::DescendantExclusion;
44use crate::scheme::Scheme;
45use crate::scheme::SchemeExt;
46use crate::scheme::SchemeId;
47use crate::stats::ArrayAndStats;
48use crate::stats::GenerateStatsOptions;
49use crate::trace;
50
51/// Synthetic scheme ID used for the compressor's own root-level cascading.
52pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId {
53    name: "vortex.compressor.root",
54};
55
/// Child slot numbering used when the compressor itself cascades into a list or listview.
mod root_list_children {
    /// Index of the offsets child (used for both List and ListView).
    pub const OFFSETS: usize = 1;
    /// Index of the sizes child (ListView only).
    pub const SIZES: usize = 2;
}
63
64/// The main compressor type implementing cascading adaptive compression.
65///
66/// This compressor applies adaptive compression [`Scheme`]s to arrays based on their data types and
67/// characteristics. It recursively compresses nested structures like structs and lists, and chooses
68/// optimal compression schemes for leaf types.
69///
70/// The compressor works by:
71/// 1. Canonicalizing input arrays to a standard representation.
72/// 2. Pre-filtering schemes by [`Scheme::matches`] and exclusion rules.
73/// 3. Evaluating each matching scheme's compression estimate and resolving deferred work.
74/// 4. Compressing with the best scheme and verifying the result is smaller.
75///
76/// No scheme may appear twice in a cascade chain. The compressor enforces this automatically
77/// along with push/pull exclusion rules declared by each scheme.
78#[derive(Debug, Clone)]
79pub struct CascadingCompressor {
80    /// The enabled compression schemes.
81    schemes: Vec<&'static dyn Scheme>,
82
83    /// Descendant exclusion rules for the compressor's own cascading (e.g. excluding Dict from
84    /// list offsets).
85    root_exclusions: Vec<DescendantExclusion>,
86}
87
88impl CascadingCompressor {
89    /// Creates a new compressor with the given schemes.
90    ///
91    /// Root-level exclusion rules (e.g. excluding Dict from list offsets) are built
92    /// automatically.
93    pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
94        // Root exclusion: exclude IntDict from list/listview offsets (monotonically
95        // increasing data where dictionary encoding is wasteful).
96        let root_exclusions = vec![DescendantExclusion {
97            excluded: IntDictScheme.id(),
98            children: ChildSelection::One(root_list_children::OFFSETS),
99        }];
100        Self {
101            schemes,
102            root_exclusions,
103        }
104    }
105
106    /// Compresses an array using cascading adaptive compression.
107    ///
108    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if canonicalization or compression fails.
113    pub fn compress(
114        &self,
115        array: &ArrayRef,
116        exec_ctx: &mut ExecutionCtx,
117    ) -> VortexResult<ArrayRef> {
118        let before_nbytes = array.nbytes();
119        let span = trace::compress_span(array.len(), array.dtype(), before_nbytes);
120        let _enter = span.enter();
121
122        let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
123        let compact = canonical.compact()?;
124        let compressed = self.compress_canonical(compact, CompressorContext::new(), exec_ctx)?;
125
126        trace::record_compress_outcome(&span, before_nbytes, compressed.nbytes());
127
128        Ok(compressed)
129    }
130
131    /// Compresses a child array produced by a cascading scheme.
132    ///
133    /// If the cascade budget is exhausted, the canonical array is returned as-is. Otherwise, the
134    /// child context is created by descending and recording the parent scheme + child index, and
135    /// compression proceeds normally.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if compression fails.
140    pub fn compress_child(
141        &self,
142        child: &ArrayRef,
143        parent_ctx: &CompressorContext,
144        parent_id: SchemeId,
145        child_index: usize,
146        exec_ctx: &mut ExecutionCtx,
147    ) -> VortexResult<ArrayRef> {
148        if parent_ctx.finished_cascading() {
149            trace::cascade_exhausted(parent_id, child_index);
150            return Ok(child.clone());
151        }
152
153        let canonical = child.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
154        let compact = canonical.compact()?;
155
156        let child_ctx = parent_ctx
157            .clone()
158            .descend_with_scheme(parent_id, child_index);
159        self.compress_canonical(compact, child_ctx, exec_ctx)
160    }
161
162    /// Compresses a canonical array by dispatching to type-specific logic.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if compression of any sub-array fails.
167    fn compress_canonical(
168        &self,
169        array: Canonical,
170        compress_ctx: CompressorContext,
171        exec_ctx: &mut ExecutionCtx,
172    ) -> VortexResult<ArrayRef> {
173        match array {
174            Canonical::Null(null_array) => Ok(null_array.into_array()),
175            Canonical::Bool(bool_array) => {
176                self.choose_and_compress(Canonical::Bool(bool_array), compress_ctx, exec_ctx)
177            }
178            Canonical::Primitive(primitive) => {
179                self.choose_and_compress(Canonical::Primitive(primitive), compress_ctx, exec_ctx)
180            }
181            Canonical::Decimal(decimal) => {
182                self.choose_and_compress(Canonical::Decimal(decimal), compress_ctx, exec_ctx)
183            }
184            Canonical::Struct(struct_array) => {
185                let fields = struct_array
186                    .iter_unmasked_fields()
187                    .map(|field| self.compress(field, exec_ctx))
188                    .collect::<Result<Vec<_>, _>>()?;
189
190                Ok(StructArray::try_new(
191                    struct_array.names().clone(),
192                    fields,
193                    struct_array.len(),
194                    struct_array.validity()?,
195                )?
196                .into_array())
197            }
198            Canonical::List(list_view_array) => {
199                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
200                    let list_array = list_from_list_view(list_view_array)?;
201                    self.compress_list_array(list_array, compress_ctx, exec_ctx)
202                } else {
203                    self.compress_list_view_array(list_view_array, compress_ctx, exec_ctx)
204                }
205            }
206            Canonical::FixedSizeList(fsl_array) => {
207                let compressed_elems = self.compress(fsl_array.elements(), exec_ctx)?;
208
209                Ok(FixedSizeListArray::try_new(
210                    compressed_elems,
211                    fsl_array.list_size(),
212                    fsl_array.validity()?,
213                    fsl_array.len(),
214                )?
215                .into_array())
216            }
217            Canonical::VarBinView(varbinview) => {
218                self.choose_and_compress(Canonical::VarBinView(varbinview), compress_ctx, exec_ctx)
219            }
220            Canonical::Extension(ext_array) => {
221                // Try scheme-based compression first.
222                let scheme_compressed = self.choose_and_compress(
223                    Canonical::Extension(ext_array.clone()),
224                    compress_ctx,
225                    exec_ctx,
226                )?;
227                // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
228                if scheme_compressed.is::<AnyScalarFn>() {
229                    return Ok(scheme_compressed);
230                }
231
232                // Also compress the underlying storage array. Some extension schemes can beat the
233                // extension storage but still lose to ordinary storage compression.
234                let compressed_storage = self.compress(ext_array.storage_array(), exec_ctx)?;
235                let storage_compressed =
236                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
237                        .into_array();
238
239                if scheme_compressed.nbytes() < storage_compressed.nbytes() {
240                    Ok(scheme_compressed)
241                } else {
242                    Ok(storage_compressed)
243                }
244            }
245            Canonical::Variant(variant_array) => {
246                let core_storage =
247                    self.compress_physical_slots(variant_array.core_storage(), exec_ctx)?;
248                let shredded = variant_array
249                    .shredded()
250                    .map(|arr| {
251                        // Avoid stack-overflow for variant shredded values
252                        if arr.is::<Variant>() {
253                            self.compress_physical_slots(arr, exec_ctx)
254                        } else {
255                            self.compress(arr, exec_ctx)
256                        }
257                    })
258                    .transpose()?;
259
260                Ok(VariantArray::try_new(core_storage, shredded)?.into_array())
261            }
262        }
263    }
264
265    /// The main scheme-selection entry point for a single leaf array.
266    ///
267    /// Filters allowed schemes by [`matches`] and exclusion rules, merges their [`stats_options`]
268    /// into a single [`GenerateStatsOptions`], and picks the winner by estimated compression
269    /// ratio.
270    ///
271    /// If a winner is found and its compressed output is actually smaller, that output is
272    /// returned. Otherwise, the original array is returned unchanged.
273    ///
274    /// Empty and all-null arrays are short-circuited before any scheme evaluation.
275    ///
276    /// [`matches`]: Scheme::matches
277    /// [`stats_options`]: Scheme::stats_options
278    fn choose_and_compress(
279        &self,
280        canonical: Canonical,
281        compress_ctx: CompressorContext,
282        exec_ctx: &mut ExecutionCtx,
283    ) -> VortexResult<ArrayRef> {
284        let eligible_schemes: Vec<&'static dyn Scheme> = self
285            .schemes
286            .iter()
287            .copied()
288            .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &compress_ctx))
289            .collect();
290
291        let array: ArrayRef = canonical.into();
292
293        if eligible_schemes.is_empty() || array.is_empty() {
294            return Ok(array);
295        }
296
297        if array.all_invalid(exec_ctx)? {
298            return Ok(
299                ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
300            );
301        }
302
303        let before_nbytes = array.nbytes();
304
305        let merged_opts = eligible_schemes
306            .iter()
307            .fold(GenerateStatsOptions::default(), |acc, s| {
308                acc.merge(s.stats_options())
309            });
310        let compress_ctx = compress_ctx.with_merged_stats_options(merged_opts);
311
312        let data = ArrayAndStats::new(array, merged_opts);
313
314        let Some((winner, winner_estimate)) =
315            self.choose_best_scheme(&eligible_schemes, &data, compress_ctx.clone(), exec_ctx)?
316        else {
317            return Ok(data.into_array());
318        };
319
320        // Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the
321        // scheme name and cascade history before propagating.
322        let error_ctx = trace::enabled_error_context(&compress_ctx);
323        let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
324        let compressed = winner
325            .compress(self, &data, compress_ctx, exec_ctx)
326            .inspect_err(|err| {
327                // NB: this is the only way we can tell which scheme panicked / bailed on their
328                // data, especially for third-party schemes where the error site may not carry any
329                // compressor context.
330                trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
331            })?;
332
333        let after_nbytes = compressed.nbytes();
334        let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
335
336        // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
337        let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
338
339        trace::record_winner_compress_result(
340            after_nbytes,
341            winner_estimate.trace_ratio(),
342            actual_ratio,
343            accepted,
344        );
345
346        if accepted {
347            Ok(compressed)
348        } else {
349            Ok(data.into_array())
350        }
351    }
352
353    /// Calls [`expected_compression_ratio`] on each candidate and returns the winning scheme along
354    /// with its resolved winner estimate, or `None` if no scheme beats the canonical encoding.
355    ///
356    /// Selection runs in two passes. Pass 1 evaluates every immediate
357    /// [`CompressionEstimate::Verdict`] and tracks the running best. [`Scheme`]s returning
358    /// [`CompressionEstimate::Deferred`] are stashed for pass 2 so that we do not make any
359    /// expensive computations if we don't have to.
360    ///
361    /// Pass 2 evaluates the deferred work and, for each [`DeferredEstimate::Callback`], passes the
362    /// current best [`EstimateScore`] as an early-exit hint so the callback can return
363    /// [`EstimateVerdict::Skip`] without doing expensive work when it cannot beat the threshold.
364    ///
365    /// Ties are broken by registration order within each pass.
366    ///
367    /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio
368    fn choose_best_scheme(
369        &self,
370        schemes: &[&'static dyn Scheme],
371        data: &ArrayAndStats,
372        compress_ctx: CompressorContext,
373        exec_ctx: &mut ExecutionCtx,
374    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
375        let mut best: Option<(&'static dyn Scheme, EstimateScore)> = None;
376        let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
377
378        // Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2.
379        {
380            let _verdict_pass = trace::verdict_pass_span().entered();
381            for &scheme in schemes {
382                match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
383                    CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
384                    CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
385                        return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
386                    }
387                    CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
388                        let score = EstimateScore::FiniteCompression(ratio);
389
390                        if is_better_score(score, best.as_ref()) {
391                            best = Some((scheme, score));
392                        }
393                    }
394                    CompressionEstimate::Deferred(deferred_estimate) => {
395                        deferred.push((scheme, deferred_estimate));
396                    }
397                }
398            }
399        }
400
401        // Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can
402        // short-circuit with `Skip` when they cannot beat it.
403        for (scheme, deferred_estimate) in deferred {
404            let _span = trace::scheme_eval_span(scheme.id()).entered();
405            let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
406            match deferred_estimate {
407                DeferredEstimate::Sample => {
408                    let score = estimate_compression_ratio_with_sampling(
409                        self,
410                        scheme,
411                        data.array(),
412                        compress_ctx.clone(),
413                        exec_ctx,
414                    )?;
415
416                    if is_better_score(score, best.as_ref()) {
417                        best = Some((scheme, score));
418                    }
419                }
420                DeferredEstimate::Callback(callback) => {
421                    match callback(self, data, threshold, compress_ctx.clone(), exec_ctx)? {
422                        EstimateVerdict::Skip => {}
423                        EstimateVerdict::AlwaysUse => {
424                            return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
425                        }
426                        EstimateVerdict::Ratio(ratio) => {
427                            let score = EstimateScore::FiniteCompression(ratio);
428
429                            if is_better_score(score, best.as_ref()) {
430                                best = Some((scheme, score));
431                            }
432                        }
433                    }
434                }
435            }
436        }
437
438        Ok(best.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
439    }
440
441    // TODO(connor): Lots of room for optimization here.
442    /// Returns `true` if the candidate scheme should be excluded based on the cascade history and
443    /// exclusion rules.
444    fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
445        let id = candidate.id();
446        let history = ctx.cascade_history();
447
448        // Self-exclusion: no scheme appears twice in any chain.
449        if history.iter().any(|&(sid, _)| sid == id) {
450            return true;
451        }
452
453        let mut iter = history.iter().copied().peekable();
454
455        // The root entry is always first in the history (if present). Check if the root has
456        // excluded us.
457        if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
458            && self
459                .root_exclusions
460                .iter()
461                .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
462        {
463            return true;
464        }
465
466        // Push rules: Check if any of our ancestors have excluded us.
467        for (ancestor_id, child_idx) in iter {
468            if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
469                && ancestor
470                    .descendant_exclusions()
471                    .iter()
472                    .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
473            {
474                return true;
475            }
476        }
477
478        // Pull rules: Check if we have excluded ourselves because of our ancestors.
479        for rule in candidate.ancestor_exclusions() {
480            if history
481                .iter()
482                .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
483            {
484                return true;
485            }
486        }
487
488        false
489    }
490
491    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
492    fn compress_list_array(
493        &self,
494        list_array: ListArray,
495        compress_ctx: CompressorContext,
496        exec_ctx: &mut ExecutionCtx,
497    ) -> VortexResult<ArrayRef> {
498        let list_array = list_array.reset_offsets(true)?;
499
500        let compressed_elems = self.compress(list_array.elements(), exec_ctx)?;
501
502        // Record the root scheme with the offsets child index so root exclusion rules apply.
503        let offset_ctx =
504            compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
505        let list_offsets_primitive = list_array
506            .offsets()
507            .clone()
508            .execute::<PrimitiveArray>(exec_ctx)?
509            .narrow(exec_ctx)?;
510        let compressed_offsets = self.compress_canonical(
511            Canonical::Primitive(list_offsets_primitive),
512            offset_ctx,
513            exec_ctx,
514        )?;
515
516        Ok(
517            ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
518                .into_array(),
519        )
520    }
521
522    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
523    /// elements.
524    fn compress_list_view_array(
525        &self,
526        list_view: ListViewArray,
527        compress_ctx: CompressorContext,
528        exec_ctx: &mut ExecutionCtx,
529    ) -> VortexResult<ArrayRef> {
530        let compressed_elems = self.compress(list_view.elements(), exec_ctx)?;
531
532        let offset_ctx = compress_ctx
533            .clone()
534            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
535        let list_view_offsets_primitive = list_view
536            .offsets()
537            .clone()
538            .execute::<PrimitiveArray>(exec_ctx)?
539            .narrow(exec_ctx)?;
540        let compressed_offsets = self.compress_canonical(
541            Canonical::Primitive(list_view_offsets_primitive),
542            offset_ctx,
543            exec_ctx,
544        )?;
545
546        let sizes_ctx = compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
547        let list_view_sizes_primitive = list_view
548            .sizes()
549            .clone()
550            .execute::<PrimitiveArray>(exec_ctx)?
551            .narrow(exec_ctx)?;
552        let compressed_sizes = self.compress_canonical(
553            Canonical::Primitive(list_view_sizes_primitive),
554            sizes_ctx,
555            exec_ctx,
556        )?;
557
558        Ok(ListViewArray::try_new(
559            compressed_elems,
560            compressed_offsets,
561            compressed_sizes,
562            list_view.validity()?,
563        )?
564        .into_array())
565    }
566
567    /// Compress very child slot of the array, then re-build it from them.
568    fn compress_physical_slots(
569        &self,
570        array: &ArrayRef,
571        exec_ctx: &mut ExecutionCtx,
572    ) -> VortexResult<ArrayRef> {
573        let slots = array
574            .slots()
575            .iter()
576            .map(|slot| {
577                slot.as_ref()
578                    .map(|child| self.compress(child, exec_ctx))
579                    .transpose()
580            })
581            .collect::<VortexResult<ArraySlots>>()?;
582
583        array.clone().with_slots(slots)
584    }
585}
586
587#[cfg(test)]
588mod tests {
589    use std::sync::LazyLock;
590
591    use parking_lot::Mutex;
592    use vortex_array::ArrayRef;
593    use vortex_array::Canonical;
594    use vortex_array::VortexSessionExecute;
595    use vortex_array::arrays::BoolArray;
596    use vortex_array::arrays::Constant;
597    use vortex_array::arrays::NullArray;
598    use vortex_array::arrays::PrimitiveArray;
599    use vortex_array::session::ArraySession;
600    use vortex_array::validity::Validity;
601    use vortex_buffer::buffer;
602    use vortex_session::VortexSession;
603
604    use super::*;
605    use crate::builtins::FloatDictScheme;
606    use crate::builtins::IntDictScheme;
607    use crate::builtins::StringDictScheme;
608    use crate::ctx::CompressorContext;
609    use crate::estimate::CompressionEstimate;
610    use crate::estimate::DeferredEstimate;
611    use crate::estimate::EstimateScore;
612    use crate::estimate::EstimateVerdict;
613    use crate::estimate::WinnerEstimate;
614    use crate::scheme::SchemeExt;
615
616    static SESSION: LazyLock<VortexSession> =
617        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
618
619    fn compressor() -> CascadingCompressor {
620        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
621    }
622
623    fn estimate_test_data() -> ArrayAndStats {
624        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
625        ArrayAndStats::new(array, GenerateStatsOptions::default())
626    }
627
628    fn matches_integer_primitive(canonical: &Canonical) -> bool {
629        matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
630    }
631
632    #[derive(Debug)]
633    struct DirectRatioScheme;
634
635    impl Scheme for DirectRatioScheme {
636        fn scheme_name(&self) -> &'static str {
637            "test.direct_ratio"
638        }
639
640        fn matches(&self, canonical: &Canonical) -> bool {
641            matches_integer_primitive(canonical)
642        }
643
644        fn expected_compression_ratio(
645            &self,
646            _data: &ArrayAndStats,
647            _compress_ctx: CompressorContext,
648            _exec_ctx: &mut ExecutionCtx,
649        ) -> CompressionEstimate {
650            CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
651        }
652
653        fn compress(
654            &self,
655            _compressor: &CascadingCompressor,
656            _data: &ArrayAndStats,
657            _compress_ctx: CompressorContext,
658            _exec_ctx: &mut ExecutionCtx,
659        ) -> VortexResult<ArrayRef> {
660            unreachable!("test helper should never be selected for compression")
661        }
662    }
663
664    #[derive(Debug)]
665    struct ImmediateAlwaysUseScheme;
666
667    impl Scheme for ImmediateAlwaysUseScheme {
668        fn scheme_name(&self) -> &'static str {
669            "test.immediate_always_use"
670        }
671
672        fn matches(&self, canonical: &Canonical) -> bool {
673            matches_integer_primitive(canonical)
674        }
675
676        fn expected_compression_ratio(
677            &self,
678            _data: &ArrayAndStats,
679            _compress_ctx: CompressorContext,
680            _exec_ctx: &mut ExecutionCtx,
681        ) -> CompressionEstimate {
682            CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
683        }
684
685        fn compress(
686            &self,
687            _compressor: &CascadingCompressor,
688            _data: &ArrayAndStats,
689            _compress_ctx: CompressorContext,
690            _exec_ctx: &mut ExecutionCtx,
691        ) -> VortexResult<ArrayRef> {
692            unreachable!("test helper should never be selected for compression")
693        }
694    }
695
696    #[derive(Debug)]
697    struct CallbackAlwaysUseScheme;
698
699    impl Scheme for CallbackAlwaysUseScheme {
700        fn scheme_name(&self) -> &'static str {
701            "test.callback_always_use"
702        }
703
704        fn matches(&self, canonical: &Canonical) -> bool {
705            matches_integer_primitive(canonical)
706        }
707
708        fn expected_compression_ratio(
709            &self,
710            _data: &ArrayAndStats,
711            _compress_ctx: CompressorContext,
712            _exec_ctx: &mut ExecutionCtx,
713        ) -> CompressionEstimate {
714            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
715                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::AlwaysUse),
716            )))
717        }
718
719        fn compress(
720            &self,
721            _compressor: &CascadingCompressor,
722            _data: &ArrayAndStats,
723            _compress_ctx: CompressorContext,
724            _exec_ctx: &mut ExecutionCtx,
725        ) -> VortexResult<ArrayRef> {
726            unreachable!("test helper should never be selected for compression")
727        }
728    }
729
730    #[derive(Debug)]
731    struct CallbackSkipScheme;
732
733    impl Scheme for CallbackSkipScheme {
734        fn scheme_name(&self) -> &'static str {
735            "test.callback_skip"
736        }
737
738        fn matches(&self, canonical: &Canonical) -> bool {
739            matches_integer_primitive(canonical)
740        }
741
742        fn expected_compression_ratio(
743            &self,
744            _data: &ArrayAndStats,
745            _compress_ctx: CompressorContext,
746            _exec_ctx: &mut ExecutionCtx,
747        ) -> CompressionEstimate {
748            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
749                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Skip),
750            )))
751        }
752
753        fn compress(
754            &self,
755            _compressor: &CascadingCompressor,
756            _data: &ArrayAndStats,
757            _compress_ctx: CompressorContext,
758            _exec_ctx: &mut ExecutionCtx,
759        ) -> VortexResult<ArrayRef> {
760            unreachable!("test helper should never be selected for compression")
761        }
762    }
763
764    #[derive(Debug)]
765    struct CallbackRatioScheme;
766
767    impl Scheme for CallbackRatioScheme {
768        fn scheme_name(&self) -> &'static str {
769            "test.callback_ratio"
770        }
771
772        fn matches(&self, canonical: &Canonical) -> bool {
773            matches_integer_primitive(canonical)
774        }
775
776        fn expected_compression_ratio(
777            &self,
778            _data: &ArrayAndStats,
779            _compress_ctx: CompressorContext,
780            _exec_ctx: &mut ExecutionCtx,
781        ) -> CompressionEstimate {
782            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
783                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(3.0)),
784            )))
785        }
786
787        fn compress(
788            &self,
789            _compressor: &CascadingCompressor,
790            _data: &ArrayAndStats,
791            _compress_ctx: CompressorContext,
792            _exec_ctx: &mut ExecutionCtx,
793        ) -> VortexResult<ArrayRef> {
794            unreachable!("test helper should never be selected for compression")
795        }
796    }
797
798    #[derive(Debug)]
799    struct HugeRatioScheme;
800
801    impl Scheme for HugeRatioScheme {
802        fn scheme_name(&self) -> &'static str {
803            "test.huge_ratio"
804        }
805
806        fn matches(&self, canonical: &Canonical) -> bool {
807            matches_integer_primitive(canonical)
808        }
809
810        fn expected_compression_ratio(
811            &self,
812            _data: &ArrayAndStats,
813            _compress_ctx: CompressorContext,
814            _exec_ctx: &mut ExecutionCtx,
815        ) -> CompressionEstimate {
816            CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0))
817        }
818
819        fn compress(
820            &self,
821            _compressor: &CascadingCompressor,
822            _data: &ArrayAndStats,
823            _compress_ctx: CompressorContext,
824            _exec_ctx: &mut ExecutionCtx,
825        ) -> VortexResult<ArrayRef> {
826            unreachable!("test helper should never be selected for compression")
827        }
828    }
829
830    #[derive(Debug)]
831    struct ZeroBytesSamplingScheme;
832
833    impl Scheme for ZeroBytesSamplingScheme {
834        fn scheme_name(&self) -> &'static str {
835            "test.zero_bytes_sampling"
836        }
837
838        fn matches(&self, canonical: &Canonical) -> bool {
839            matches_integer_primitive(canonical)
840        }
841
842        fn expected_compression_ratio(
843            &self,
844            _data: &ArrayAndStats,
845            _compress_ctx: CompressorContext,
846            _exec_ctx: &mut ExecutionCtx,
847        ) -> CompressionEstimate {
848            CompressionEstimate::Deferred(DeferredEstimate::Sample)
849        }
850
851        fn compress(
852            &self,
853            _compressor: &CascadingCompressor,
854            data: &ArrayAndStats,
855            _compress_ctx: CompressorContext,
856            _exec_ctx: &mut ExecutionCtx,
857        ) -> VortexResult<ArrayRef> {
858            Ok(NullArray::new(data.array().len()).into_array())
859        }
860    }
861
862    #[test]
863    fn test_self_exclusion() {
864        let c = compressor();
865        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
866
867        // IntDictScheme is in the history, so it should be excluded.
868        assert!(c.is_excluded(&IntDictScheme, &ctx));
869    }
870
871    #[test]
872    fn test_root_exclusion_list_offsets() {
873        let c = compressor();
874        let ctx = CompressorContext::default()
875            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
876
877        // IntDict should be excluded for list offsets.
878        assert!(c.is_excluded(&IntDictScheme, &ctx));
879    }
880
881    #[test]
882    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
883        let c = compressor();
884        // FloatDict cascading through codes (child 1).
885        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
886
887        // IntDict should be excluded from FloatDict's codes child.
888        assert!(c.is_excluded(&IntDictScheme, &ctx));
889    }
890
891    #[test]
892    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
893        let c = compressor();
894        // FloatDict cascading through values (child 0).
895        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
896
897        // IntDict should also be excluded from FloatDict's values child (ALP propagation
898        // replacement).
899        assert!(c.is_excluded(&IntDictScheme, &ctx));
900    }
901
902    #[test]
903    fn test_no_exclusion_without_history() {
904        let c = compressor();
905        let ctx = CompressorContext::default();
906
907        // No history means no exclusions.
908        assert!(!c.is_excluded(&IntDictScheme, &ctx));
909    }
910
911    #[test]
912    fn immediate_always_use_wins_immediately() -> VortexResult<()> {
913        let compressor =
914            CascadingCompressor::new(vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme]);
915        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
916        let data = estimate_test_data();
917        let mut exec_ctx = SESSION.create_execution_ctx();
918
919        let winner = compressor.choose_best_scheme(
920            &schemes,
921            &data,
922            CompressorContext::new(),
923            &mut exec_ctx,
924        )?;
925
926        assert!(matches!(
927            winner,
928            Some((scheme, WinnerEstimate::AlwaysUse))
929                if scheme.id() == ImmediateAlwaysUseScheme.id()
930        ));
931        Ok(())
932    }
933
934    #[test]
935    fn callback_always_use_wins_immediately() -> VortexResult<()> {
936        let compressor =
937            CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackAlwaysUseScheme]);
938        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
939        let data = estimate_test_data();
940        let mut exec_ctx = SESSION.create_execution_ctx();
941
942        let winner = compressor.choose_best_scheme(
943            &schemes,
944            &data,
945            CompressorContext::new(),
946            &mut exec_ctx,
947        )?;
948
949        assert!(matches!(
950            winner,
951            Some((scheme, WinnerEstimate::AlwaysUse))
952                if scheme.id() == CallbackAlwaysUseScheme.id()
953        ));
954        Ok(())
955    }
956
957    #[test]
958    fn callback_skip_is_ignored() -> VortexResult<()> {
959        let compressor = CascadingCompressor::new(vec![&CallbackSkipScheme, &DirectRatioScheme]);
960        let schemes: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
961        let data = estimate_test_data();
962        let mut exec_ctx = SESSION.create_execution_ctx();
963
964        let winner = compressor.choose_best_scheme(
965            &schemes,
966            &data,
967            CompressorContext::new(),
968            &mut exec_ctx,
969        )?;
970
971        assert!(matches!(
972            winner,
973            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(2.0))))
974                if scheme.id() == DirectRatioScheme.id()
975        ));
976        Ok(())
977    }
978
979    #[test]
980    fn callback_ratio_competes_numerically() -> VortexResult<()> {
981        let compressor = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackRatioScheme]);
982        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
983        let data = estimate_test_data();
984        let mut exec_ctx = SESSION.create_execution_ctx();
985
986        let winner = compressor.choose_best_scheme(
987            &schemes,
988            &data,
989            CompressorContext::new(),
990            &mut exec_ctx,
991        )?;
992
993        assert!(matches!(
994            winner,
995            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(3.0))))
996                if scheme.id() == CallbackRatioScheme.id()
997        ));
998        Ok(())
999    }
1000
1001    #[test]
1002    fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
1003        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &ZeroBytesSamplingScheme]);
1004        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &ZeroBytesSamplingScheme];
1005        let data = estimate_test_data();
1006        let mut exec_ctx = SESSION.create_execution_ctx();
1007
1008        let winner = compressor.choose_best_scheme(
1009            &schemes,
1010            &data,
1011            CompressorContext::new(),
1012            &mut exec_ctx,
1013        )?;
1014
1015        assert!(matches!(
1016            winner,
1017            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
1018                if scheme.id() == HugeRatioScheme.id()
1019        ));
1020        Ok(())
1021    }
1022
1023    #[test]
1024    fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
1025        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &HugeRatioScheme]);
1026        let schemes: [&'static dyn Scheme; 2] = [&ZeroBytesSamplingScheme, &HugeRatioScheme];
1027        let data = estimate_test_data();
1028        let mut exec_ctx = SESSION.create_execution_ctx();
1029
1030        let winner = compressor.choose_best_scheme(
1031            &schemes,
1032            &data,
1033            CompressorContext::new(),
1034            &mut exec_ctx,
1035        )?;
1036
1037        assert!(matches!(
1038            winner,
1039            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0))))
1040                if scheme.id() == HugeRatioScheme.id()
1041        ));
1042        Ok(())
1043    }
1044
1045    #[test]
1046    fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
1047        let compressor = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]);
1048        let schemes: [&'static dyn Scheme; 1] = [&ZeroBytesSamplingScheme];
1049        let data = estimate_test_data();
1050        let mut exec_ctx = SESSION.create_execution_ctx();
1051
1052        let winner = compressor.choose_best_scheme(
1053            &schemes,
1054            &data,
1055            CompressorContext::new(),
1056            &mut exec_ctx,
1057        )?;
1058
1059        assert!(winner.is_none());
1060        Ok(())
1061    }
1062
1063    // Observer helper used by threshold-related tests. Captures the `best_so_far` value the
1064    // compressor passes to its deferred callback. `OBSERVER_LOCK` serializes tests that share
1065    // `OBSERVED_THRESHOLD` so they do not race.
1066    static OBSERVER_LOCK: Mutex<()> = Mutex::new(());
1067    static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);
1068
1069    #[derive(Debug)]
1070    struct ThresholdObservingScheme;
1071
1072    impl Scheme for ThresholdObservingScheme {
1073        fn scheme_name(&self) -> &'static str {
1074            "test.threshold_observing"
1075        }
1076
1077        fn matches(&self, canonical: &Canonical) -> bool {
1078            matches_integer_primitive(canonical)
1079        }
1080
1081        fn expected_compression_ratio(
1082            &self,
1083            _data: &ArrayAndStats,
1084            _compress_ctx: CompressorContext,
1085            _exec_ctx: &mut ExecutionCtx,
1086        ) -> CompressionEstimate {
1087            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1088                |_compressor, _data, best_so_far, _ctx, _exec_ctx| {
1089                    *OBSERVED_THRESHOLD.lock() = Some(best_so_far);
1090                    Ok(EstimateVerdict::Skip)
1091                },
1092            )))
1093        }
1094
1095        fn compress(
1096            &self,
1097            _compressor: &CascadingCompressor,
1098            _data: &ArrayAndStats,
1099            _compress_ctx: CompressorContext,
1100            _exec_ctx: &mut ExecutionCtx,
1101        ) -> VortexResult<ArrayRef> {
1102            unreachable!("test helper should never be selected for compression")
1103        }
1104    }
1105
1106    #[derive(Debug)]
1107    struct CallbackMatchingRatioScheme;
1108
1109    impl Scheme for CallbackMatchingRatioScheme {
1110        fn scheme_name(&self) -> &'static str {
1111            "test.callback_matching_ratio"
1112        }
1113
1114        fn matches(&self, canonical: &Canonical) -> bool {
1115            matches_integer_primitive(canonical)
1116        }
1117
1118        fn expected_compression_ratio(
1119            &self,
1120            _data: &ArrayAndStats,
1121            _compress_ctx: CompressorContext,
1122            _exec_ctx: &mut ExecutionCtx,
1123        ) -> CompressionEstimate {
1124            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1125                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(2.0)),
1126            )))
1127        }
1128
1129        fn compress(
1130            &self,
1131            _compressor: &CascadingCompressor,
1132            _data: &ArrayAndStats,
1133            _compress_ctx: CompressorContext,
1134            _exec_ctx: &mut ExecutionCtx,
1135        ) -> VortexResult<ArrayRef> {
1136            unreachable!("test helper should never be selected for compression")
1137        }
1138    }
1139
1140    #[test]
1141    fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
1142        // `HugeRatioScheme` returns an immediate `Ratio(100.0)` in pass 1;
1143        // `CallbackAlwaysUseScheme` returns `AlwaysUse` from its deferred callback in pass 2.
1144        // The deferred `AlwaysUse` must still win.
1145        let compressor = CascadingCompressor::new(vec![&HugeRatioScheme, &CallbackAlwaysUseScheme]);
1146        let schemes: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &CallbackAlwaysUseScheme];
1147        let data = estimate_test_data();
1148        let mut exec_ctx = SESSION.create_execution_ctx();
1149
1150        let winner = compressor.choose_best_scheme(
1151            &schemes,
1152            &data,
1153            CompressorContext::new(),
1154            &mut exec_ctx,
1155        )?;
1156
1157        assert!(matches!(
1158            winner,
1159            Some((scheme, WinnerEstimate::AlwaysUse))
1160                if scheme.id() == CallbackAlwaysUseScheme.id()
1161        ));
1162        Ok(())
1163    }
1164
1165    #[test]
1166    fn threshold_reflects_pass_one_best() -> VortexResult<()> {
1167        let _guard = OBSERVER_LOCK.lock();
1168        *OBSERVED_THRESHOLD.lock() = None;
1169
1170        let compressor =
1171            CascadingCompressor::new(vec![&DirectRatioScheme, &ThresholdObservingScheme]);
1172        let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ThresholdObservingScheme];
1173        let data = estimate_test_data();
1174        let mut exec_ctx = SESSION.create_execution_ctx();
1175
1176        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1177
1178        let observed = *OBSERVED_THRESHOLD.lock();
1179        assert!(matches!(
1180            observed,
1181            Some(Some(EstimateScore::FiniteCompression(r))) if r == 2.0
1182        ));
1183        Ok(())
1184    }
1185
1186    #[test]
1187    fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
1188        let _guard = OBSERVER_LOCK.lock();
1189        *OBSERVED_THRESHOLD.lock() = None;
1190
1191        let compressor =
1192            CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &ThresholdObservingScheme]);
1193        let schemes: [&'static dyn Scheme; 2] =
1194            [&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
1195        let data = estimate_test_data();
1196        let mut exec_ctx = SESSION.create_execution_ctx();
1197
1198        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1199
1200        // The observing callback was invoked (outer `Some`) and `best_so_far` was `None` (inner
1201        // `None`) because the zero-byte sample is never stored as the best.
1202        let observed = *OBSERVED_THRESHOLD.lock();
1203        assert_eq!(observed, Some(None));
1204        Ok(())
1205    }
1206
1207    #[test]
1208    fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
1209        let _guard = OBSERVER_LOCK.lock();
1210        *OBSERVED_THRESHOLD.lock() = None;
1211
1212        let compressor = CascadingCompressor::new(vec![&ThresholdObservingScheme]);
1213        let schemes: [&'static dyn Scheme; 1] = [&ThresholdObservingScheme];
1214        let data = estimate_test_data();
1215        let mut exec_ctx = SESSION.create_execution_ctx();
1216
1217        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1218
1219        let observed = *OBSERVED_THRESHOLD.lock();
1220        assert_eq!(observed, Some(None));
1221        Ok(())
1222    }
1223
1224    #[test]
1225    fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
1226        let _guard = OBSERVER_LOCK.lock();
1227        *OBSERVED_THRESHOLD.lock() = None;
1228
1229        // Both schemes are deferred. The first callback registers `Ratio(3.0)`; the second
1230        // callback must observe it as its threshold.
1231        let compressor =
1232            CascadingCompressor::new(vec![&CallbackRatioScheme, &ThresholdObservingScheme]);
1233        let schemes: [&'static dyn Scheme; 2] = [&CallbackRatioScheme, &ThresholdObservingScheme];
1234        let data = estimate_test_data();
1235        let mut exec_ctx = SESSION.create_execution_ctx();
1236
1237        compressor.choose_best_scheme(&schemes, &data, CompressorContext::new(), &mut exec_ctx)?;
1238
1239        let observed = *OBSERVED_THRESHOLD.lock();
1240        assert!(matches!(
1241            observed,
1242            Some(Some(EstimateScore::FiniteCompression(r))) if r == 3.0
1243        ));
1244        Ok(())
1245    }
1246
1247    #[test]
1248    fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
1249        // Both schemes produce the same `Ratio(2.0)`, one from pass 1 (immediate) and one from
1250        // pass 2 (deferred callback). Pass 1 locks in first, and strict `>` tie-breaking means
1251        // the deferred callback's equal ratio cannot displace it.
1252        let compressor =
1253            CascadingCompressor::new(vec![&CallbackMatchingRatioScheme, &DirectRatioScheme]);
1254        let schemes: [&'static dyn Scheme; 2] = [&CallbackMatchingRatioScheme, &DirectRatioScheme];
1255        let data = estimate_test_data();
1256        let mut exec_ctx = SESSION.create_execution_ctx();
1257
1258        let winner = compressor.choose_best_scheme(
1259            &schemes,
1260            &data,
1261            CompressorContext::new(),
1262            &mut exec_ctx,
1263        )?;
1264
1265        assert!(matches!(
1266            winner,
1267            Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r))))
1268                if scheme.id() == DirectRatioScheme.id() && r == 2.0
1269        ));
1270        Ok(())
1271    }
1272
1273    #[test]
1274    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
1275        let array = PrimitiveArray::new(
1276            buffer![0i32, 0, 0, 0, 0],
1277            Validity::Array(BoolArray::from_iter([false, false, false, false, false]).into_array()),
1278        )
1279        .into_array();
1280
1281        // The compressor should produce a `ConstantArray` for an all-null array regardless of
1282        // which schemes are registered.
1283        let compressor = CascadingCompressor::new(vec![&IntDictScheme]);
1284        let mut exec_ctx = SESSION.create_execution_ctx();
1285        let compressed = compressor.compress(&array, &mut exec_ctx)?;
1286        assert!(compressed.is::<Constant>());
1287        Ok(())
1288    }
1289
1290    /// Regression test for <https://github.com/vortex-data/vortex/issues/7227>.
1291    ///
1292    /// `estimate_compression_ratio_with_sampling` must use the *scheme's* stats options
1293    /// (which request distinct-value counting) rather than the context's stats options
1294    /// (which may not). With the old code this panicked inside `dictionary_encode` because
1295    /// distinct values were never computed for the sample.
1296    #[test]
1297    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
1298        // Low-cardinality float array so FloatDictScheme considers it compressible.
1299        let array = PrimitiveArray::new(
1300            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
1301            Validity::NonNullable,
1302        )
1303        .into_array();
1304
1305        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);
1306
1307        // A context with default stats_options (count_distinct_values = false) and
1308        // marked as a sample so the function skips the sampling step and compresses
1309        // the array directly.
1310        let ctx = CompressorContext::new().with_sampling();
1311
1312        // Before the fix this panicked with:
1313        //   "this must be present since `DictScheme` declared that we need distinct values"
1314        let mut exec_ctx = SESSION.create_execution_ctx();
1315        let score = estimate_compression_ratio_with_sampling(
1316            &compressor,
1317            &FloatDictScheme,
1318            &array,
1319            ctx,
1320            &mut exec_ctx,
1321        )?;
1322        assert!(matches!(score, EstimateScore::FiniteCompression(ratio) if ratio.is_finite()));
1323        Ok(())
1324    }
1325}