Skip to main content

vortex_compressor/
compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Cascading array compression implementation.
5
6use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::CanonicalValidity;
9use vortex_array::ExecutionCtx;
10use vortex_array::IntoArray;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::ExtensionArray;
13use vortex_array::arrays::FixedSizeListArray;
14use vortex_array::arrays::ListArray;
15use vortex_array::arrays::ListViewArray;
16use vortex_array::arrays::PrimitiveArray;
17use vortex_array::arrays::StructArray;
18use vortex_array::arrays::extension::ExtensionArrayExt;
19use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
20use vortex_array::arrays::list::ListArrayExt;
21use vortex_array::arrays::listview::ListViewArrayExt;
22use vortex_array::arrays::listview::list_from_list_view;
23use vortex_array::arrays::primitive::PrimitiveArrayExt;
24use vortex_array::arrays::scalar_fn::AnyScalarFn;
25use vortex_array::arrays::struct_::StructArrayExt;
26use vortex_array::dtype::DType;
27use vortex_array::dtype::Nullability;
28use vortex_array::scalar::Scalar;
29use vortex_error::VortexResult;
30use vortex_error::vortex_bail;
31
32use crate::builtins::IntDictScheme;
33use crate::ctx::CompressorContext;
34use crate::estimate::CompressionEstimate;
35use crate::estimate::DeferredEstimate;
36use crate::estimate::EstimateScore;
37use crate::estimate::EstimateVerdict;
38use crate::estimate::WinnerEstimate;
39use crate::estimate::estimate_compression_ratio_with_sampling;
40use crate::estimate::is_better_score;
41use crate::scheme::ChildSelection;
42use crate::scheme::DescendantExclusion;
43use crate::scheme::Scheme;
44use crate::scheme::SchemeExt;
45use crate::scheme::SchemeId;
46use crate::stats::ArrayAndStats;
47use crate::stats::GenerateStatsOptions;
48use crate::trace;
49
50/// Synthetic scheme ID under which the compressor records its own root-level cascading steps.
51pub(crate) const ROOT_SCHEME_ID: SchemeId = SchemeId {
52    name: "vortex.compressor.root",
53};
54
55/// Child indices the compressor records when it cascades into list/listview children itself.
56mod root_list_children {
57    /// Index recorded for the offsets child of a List or ListView.
58    pub const OFFSETS: usize = 1;
59    /// Index recorded for the sizes child of a ListView.
60    pub const SIZES: usize = 2;
61}
62
63/// Cascading, adaptive array compressor.
64///
65/// Applies compression [`Scheme`]s to arrays according to their data types and characteristics.
66/// Nested structures such as structs and lists are compressed recursively, while leaf arrays go
67/// to whichever enabled scheme is estimated to compress them best.
68///
69/// The compressor works by:
70/// 1. Canonicalizing input arrays to a standard representation.
71/// 2. Pre-filtering schemes by [`Scheme::matches`] and exclusion rules.
72/// 3. Evaluating each matching scheme's compression estimate and resolving deferred work.
73/// 4. Compressing with the best scheme and verifying the result is smaller.
74///
75/// No scheme may appear twice in a cascade chain. The compressor enforces this automatically
76/// along with push/pull exclusion rules declared by each scheme.
77#[derive(Debug, Clone)]
78pub struct CascadingCompressor {
79    /// The enabled compression schemes, consulted in the order they were supplied.
80    schemes: Vec<&'static dyn Scheme>,
81
82    /// Descendant exclusion rules applied to the compressor's own cascading steps (e.g. keeping
83    /// Dict away from list offsets).
84    root_exclusions: Vec<DescendantExclusion>,
85}
86
87impl CascadingCompressor {
88    /// Creates a new compressor with the given schemes.
89    ///
90    /// Root-level exclusion rules (e.g. excluding Dict from list offsets) are built
91    /// automatically.
92    pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
93        // Root exclusion: exclude IntDict from list/listview offsets (monotonically
94        // increasing data where dictionary encoding is wasteful).
95        let root_exclusions = vec![DescendantExclusion {
96            excluded: IntDictScheme.id(),
97            children: ChildSelection::One(root_list_children::OFFSETS),
98        }];
99        Self {
100            schemes,
101            root_exclusions,
102        }
103    }
104
105    /// Compresses an array using cascading adaptive compression.
106    ///
107    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if canonicalization or compression fails.
112    pub fn compress(
113        &self,
114        array: &ArrayRef,
115        exec_ctx: &mut ExecutionCtx,
116    ) -> VortexResult<ArrayRef> {
117        let before_nbytes = array.nbytes();
118        let span = trace::compress_span(array.len(), array.dtype(), before_nbytes);
119        let _enter = span.enter();
120
121        let canonical = array.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
122        let compact = canonical.compact()?;
123        let compressed = self.compress_canonical(compact, CompressorContext::new(), exec_ctx)?;
124
125        trace::record_compress_outcome(&span, before_nbytes, compressed.nbytes());
126
127        Ok(compressed)
128    }
129
130    /// Compresses a child array produced by a cascading scheme.
131    ///
132    /// If the cascade budget is exhausted, the canonical array is returned as-is. Otherwise, the
133    /// child context is created by descending and recording the parent scheme + child index, and
134    /// compression proceeds normally.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if compression fails.
139    pub fn compress_child(
140        &self,
141        child: &ArrayRef,
142        parent_ctx: &CompressorContext,
143        parent_id: SchemeId,
144        child_index: usize,
145        exec_ctx: &mut ExecutionCtx,
146    ) -> VortexResult<ArrayRef> {
147        if parent_ctx.finished_cascading() {
148            trace::cascade_exhausted(parent_id, child_index);
149            return Ok(child.clone());
150        }
151
152        let canonical = child.clone().execute::<CanonicalValidity>(exec_ctx)?.0;
153        let compact = canonical.compact()?;
154
155        let child_ctx = parent_ctx
156            .clone()
157            .descend_with_scheme(parent_id, child_index);
158        self.compress_canonical(compact, child_ctx, exec_ctx)
159    }
160
161    /// Compresses a canonical array by dispatching to type-specific logic.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if compression of any sub-array fails.
166    fn compress_canonical(
167        &self,
168        array: Canonical,
169        compress_ctx: CompressorContext,
170        exec_ctx: &mut ExecutionCtx,
171    ) -> VortexResult<ArrayRef> {
172        match array {
173            Canonical::Null(null_array) => Ok(null_array.into_array()),
174            Canonical::Bool(bool_array) => {
175                self.choose_and_compress(Canonical::Bool(bool_array), compress_ctx, exec_ctx)
176            }
177            Canonical::Primitive(primitive) => {
178                self.choose_and_compress(Canonical::Primitive(primitive), compress_ctx, exec_ctx)
179            }
180            Canonical::Decimal(decimal) => {
181                self.choose_and_compress(Canonical::Decimal(decimal), compress_ctx, exec_ctx)
182            }
183            Canonical::Struct(struct_array) => {
184                let fields = struct_array
185                    .iter_unmasked_fields()
186                    .map(|field| self.compress(field, exec_ctx))
187                    .collect::<Result<Vec<_>, _>>()?;
188
189                Ok(StructArray::try_new(
190                    struct_array.names().clone(),
191                    fields,
192                    struct_array.len(),
193                    struct_array.validity()?,
194                )?
195                .into_array())
196            }
197            Canonical::List(list_view_array) => {
198                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
199                    let list_array = list_from_list_view(list_view_array)?;
200                    self.compress_list_array(list_array, compress_ctx, exec_ctx)
201                } else {
202                    self.compress_list_view_array(list_view_array, compress_ctx, exec_ctx)
203                }
204            }
205            Canonical::FixedSizeList(fsl_array) => {
206                let compressed_elems = self.compress(fsl_array.elements(), exec_ctx)?;
207
208                Ok(FixedSizeListArray::try_new(
209                    compressed_elems,
210                    fsl_array.list_size(),
211                    fsl_array.validity()?,
212                    fsl_array.len(),
213                )?
214                .into_array())
215            }
216            Canonical::VarBinView(strings) => {
217                if strings
218                    .dtype()
219                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
220                {
221                    self.choose_and_compress(Canonical::VarBinView(strings), compress_ctx, exec_ctx)
222                } else {
223                    // We do not compress binary arrays.
224                    Ok(strings.into_array())
225                }
226            }
227            Canonical::Extension(ext_array) => {
228                let before_nbytes = ext_array.as_ref().nbytes();
229
230                // Try scheme-based compression first.
231                let result = self.choose_and_compress(
232                    Canonical::Extension(ext_array.clone()),
233                    compress_ctx,
234                    exec_ctx,
235                )?;
236                if result.nbytes() < before_nbytes {
237                    return Ok(result);
238                }
239
240                // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
241                if result.is::<AnyScalarFn>() {
242                    return Ok(result);
243                }
244
245                // Otherwise, fall back to compressing the underlying storage array.
246                let compressed_storage = self.compress(ext_array.storage_array(), exec_ctx)?;
247
248                Ok(
249                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
250                        .into_array(),
251                )
252            }
253            Canonical::Variant(_) => {
254                vortex_bail!("Variant arrays can not be compressed")
255            }
256        }
257    }
258
259    /// The main scheme-selection entry point for a single leaf array.
260    ///
261    /// Filters allowed schemes by [`matches`] and exclusion rules, merges their [`stats_options`]
262    /// into a single [`GenerateStatsOptions`], and picks the winner by estimated compression
263    /// ratio.
264    ///
265    /// If a winner is found and its compressed output is actually smaller, that output is
266    /// returned. Otherwise, the original array is returned unchanged.
267    ///
268    /// Empty and all-null arrays are short-circuited before any scheme evaluation.
269    ///
270    /// [`matches`]: Scheme::matches
271    /// [`stats_options`]: Scheme::stats_options
272    fn choose_and_compress(
273        &self,
274        canonical: Canonical,
275        compress_ctx: CompressorContext,
276        exec_ctx: &mut ExecutionCtx,
277    ) -> VortexResult<ArrayRef> {
278        let eligible_schemes: Vec<&'static dyn Scheme> = self
279            .schemes
280            .iter()
281            .copied()
282            .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &compress_ctx))
283            .collect();
284
285        let array: ArrayRef = canonical.into();
286
287        if eligible_schemes.is_empty() || array.is_empty() {
288            return Ok(array);
289        }
290
291        if array.all_invalid(exec_ctx)? {
292            return Ok(
293                ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
294            );
295        }
296
297        let before_nbytes = array.nbytes();
298
299        let merged_opts = eligible_schemes
300            .iter()
301            .fold(GenerateStatsOptions::default(), |acc, s| {
302                acc.merge(s.stats_options())
303            });
304        let compress_ctx = compress_ctx.with_merged_stats_options(merged_opts);
305
306        let data = ArrayAndStats::new(array, merged_opts);
307
308        let Some((winner, winner_estimate)) =
309            self.choose_best_scheme(&eligible_schemes, &data, compress_ctx.clone(), exec_ctx)?
310        else {
311            return Ok(data.into_array());
312        };
313
314        // Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the
315        // scheme name and cascade history before propagating.
316        let error_ctx = trace::enabled_error_context(&compress_ctx);
317        let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
318        let compressed = winner
319            .compress(self, &data, compress_ctx, exec_ctx)
320            .inspect_err(|err| {
321                // NB: this is the only way we can tell which scheme panicked / bailed on their
322                // data, especially for third-party schemes where the error site may not carry any
323                // compressor context.
324                trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
325            })?;
326
327        let after_nbytes = compressed.nbytes();
328        let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
329
330        // TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
331        let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
332
333        trace::record_winner_compress_result(
334            after_nbytes,
335            winner_estimate.trace_ratio(),
336            actual_ratio,
337            accepted,
338        );
339
340        if accepted {
341            Ok(compressed)
342        } else {
343            Ok(data.into_array())
344        }
345    }
346
347    /// Calls [`expected_compression_ratio`] on each candidate and returns the winning scheme along
348    /// with its resolved winner estimate, or `None` if no scheme beats the canonical encoding.
349    ///
350    /// Selection runs in two passes. Pass 1 evaluates every immediate
351    /// [`CompressionEstimate::Verdict`] and tracks the running best. [`Scheme`]s returning
352    /// [`CompressionEstimate::Deferred`] are stashed for pass 2 so that we do not make any
353    /// expensive computations if we don't have to.
354    ///
355    /// Pass 2 evaluates the deferred work and, for each [`DeferredEstimate::Callback`], passes the
356    /// current best [`EstimateScore`] as an early-exit hint so the callback can return
357    /// [`EstimateVerdict::Skip`] without doing expensive work when it cannot beat the threshold.
358    ///
359    /// Ties are broken by registration order within each pass.
360    ///
361    /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio
362    fn choose_best_scheme(
363        &self,
364        schemes: &[&'static dyn Scheme],
365        data: &ArrayAndStats,
366        compress_ctx: CompressorContext,
367        exec_ctx: &mut ExecutionCtx,
368    ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> {
369        let mut best: Option<(&'static dyn Scheme, EstimateScore)> = None;
370        let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
371
372        // Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2.
373        {
374            let _verdict_pass = trace::verdict_pass_span().entered();
375            for &scheme in schemes {
376                match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
377                    CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
378                    CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
379                        return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
380                    }
381                    CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
382                        let score = EstimateScore::FiniteCompression(ratio);
383
384                        if is_better_score(score, best.as_ref()) {
385                            best = Some((scheme, score));
386                        }
387                    }
388                    CompressionEstimate::Deferred(deferred_estimate) => {
389                        deferred.push((scheme, deferred_estimate));
390                    }
391                }
392            }
393        }
394
395        // Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can
396        // short-circuit with `Skip` when they cannot beat it.
397        for (scheme, deferred_estimate) in deferred {
398            let _span = trace::scheme_eval_span(scheme.id()).entered();
399            let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
400            match deferred_estimate {
401                DeferredEstimate::Sample => {
402                    let score = estimate_compression_ratio_with_sampling(
403                        self,
404                        scheme,
405                        data.array(),
406                        compress_ctx.clone(),
407                        exec_ctx,
408                    )?;
409
410                    if is_better_score(score, best.as_ref()) {
411                        best = Some((scheme, score));
412                    }
413                }
414                DeferredEstimate::Callback(callback) => {
415                    match callback(self, data, threshold, compress_ctx.clone(), exec_ctx)? {
416                        EstimateVerdict::Skip => {}
417                        EstimateVerdict::AlwaysUse => {
418                            return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
419                        }
420                        EstimateVerdict::Ratio(ratio) => {
421                            let score = EstimateScore::FiniteCompression(ratio);
422
423                            if is_better_score(score, best.as_ref()) {
424                                best = Some((scheme, score));
425                            }
426                        }
427                    }
428                }
429            }
430        }
431
432        Ok(best.map(|(scheme, score)| (scheme, WinnerEstimate::Score(score))))
433    }
434
435    // TODO(connor): Lots of room for optimization here.
436    /// Returns `true` if the candidate scheme should be excluded based on the cascade history and
437    /// exclusion rules.
438    fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
439        let id = candidate.id();
440        let history = ctx.cascade_history();
441
442        // Self-exclusion: no scheme appears twice in any chain.
443        if history.iter().any(|&(sid, _)| sid == id) {
444            return true;
445        }
446
447        let mut iter = history.iter().copied().peekable();
448
449        // The root entry is always first in the history (if present). Check if the root has
450        // excluded us.
451        if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
452            && self
453                .root_exclusions
454                .iter()
455                .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
456        {
457            return true;
458        }
459
460        // Push rules: Check if any of our ancestors have excluded us.
461        for (ancestor_id, child_idx) in iter {
462            if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
463                && ancestor
464                    .descendant_exclusions()
465                    .iter()
466                    .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
467            {
468                return true;
469            }
470        }
471
472        // Pull rules: Check if we have excluded ourselves because of our ancestors.
473        for rule in candidate.ancestor_exclusions() {
474            if history
475                .iter()
476                .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
477            {
478                return true;
479            }
480        }
481
482        false
483    }
484
485    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
486    fn compress_list_array(
487        &self,
488        list_array: ListArray,
489        compress_ctx: CompressorContext,
490        exec_ctx: &mut ExecutionCtx,
491    ) -> VortexResult<ArrayRef> {
492        let list_array = list_array.reset_offsets(true)?;
493
494        let compressed_elems = self.compress(list_array.elements(), exec_ctx)?;
495
496        // Record the root scheme with the offsets child index so root exclusion rules apply.
497        let offset_ctx =
498            compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
499        let list_offsets_primitive = list_array
500            .offsets()
501            .clone()
502            .execute::<PrimitiveArray>(exec_ctx)?
503            .narrow()?;
504        let compressed_offsets = self.compress_canonical(
505            Canonical::Primitive(list_offsets_primitive),
506            offset_ctx,
507            exec_ctx,
508        )?;
509
510        Ok(
511            ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
512                .into_array(),
513        )
514    }
515
516    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
517    /// elements.
518    fn compress_list_view_array(
519        &self,
520        list_view: ListViewArray,
521        compress_ctx: CompressorContext,
522        exec_ctx: &mut ExecutionCtx,
523    ) -> VortexResult<ArrayRef> {
524        let compressed_elems = self.compress(list_view.elements(), exec_ctx)?;
525
526        let offset_ctx = compress_ctx
527            .clone()
528            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
529        let list_view_offsets_primitive = list_view
530            .offsets()
531            .clone()
532            .execute::<PrimitiveArray>(exec_ctx)?
533            .narrow()?;
534        let compressed_offsets = self.compress_canonical(
535            Canonical::Primitive(list_view_offsets_primitive),
536            offset_ctx,
537            exec_ctx,
538        )?;
539
540        let sizes_ctx = compress_ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
541        let list_view_sizes_primitive = list_view
542            .sizes()
543            .clone()
544            .execute::<PrimitiveArray>(exec_ctx)?
545            .narrow()?;
546        let compressed_sizes = self.compress_canonical(
547            Canonical::Primitive(list_view_sizes_primitive),
548            sizes_ctx,
549            exec_ctx,
550        )?;
551
552        Ok(ListViewArray::try_new(
553            compressed_elems,
554            compressed_offsets,
555            compressed_sizes,
556            list_view.validity()?,
557        )?
558        .into_array())
559    }
560}
561
562#[cfg(test)]
563mod tests {
564    use std::sync::LazyLock;
565
566    use parking_lot::Mutex;
567    use vortex_array::ArrayRef;
568    use vortex_array::Canonical;
569    use vortex_array::VortexSessionExecute;
570    use vortex_array::arrays::BoolArray;
571    use vortex_array::arrays::Constant;
572    use vortex_array::arrays::NullArray;
573    use vortex_array::arrays::PrimitiveArray;
574    use vortex_array::session::ArraySession;
575    use vortex_array::validity::Validity;
576    use vortex_buffer::buffer;
577    use vortex_session::VortexSession;
578
579    use super::*;
580    use crate::builtins::FloatDictScheme;
581    use crate::builtins::IntDictScheme;
582    use crate::builtins::StringDictScheme;
583    use crate::ctx::CompressorContext;
584    use crate::estimate::CompressionEstimate;
585    use crate::estimate::DeferredEstimate;
586    use crate::estimate::EstimateScore;
587    use crate::estimate::EstimateVerdict;
588    use crate::estimate::WinnerEstimate;
589    use crate::scheme::SchemeExt;
590
591    static SESSION: LazyLock<VortexSession> =
592        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
593
594    fn compressor() -> CascadingCompressor {
595        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
596    }
597
598    fn estimate_test_data() -> ArrayAndStats {
599        let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array();
600        ArrayAndStats::new(array, GenerateStatsOptions::default())
601    }
602
603    fn matches_integer_primitive(canonical: &Canonical) -> bool {
604        matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int())
605    }
606
607    #[derive(Debug)]
608    struct DirectRatioScheme;
609
610    impl Scheme for DirectRatioScheme {
611        fn scheme_name(&self) -> &'static str {
612            "test.direct_ratio"
613        }
614
615        fn matches(&self, canonical: &Canonical) -> bool {
616            matches_integer_primitive(canonical)
617        }
618
619        fn expected_compression_ratio(
620            &self,
621            _data: &ArrayAndStats,
622            _compress_ctx: CompressorContext,
623            _exec_ctx: &mut ExecutionCtx,
624        ) -> CompressionEstimate {
625            CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0))
626        }
627
628        fn compress(
629            &self,
630            _compressor: &CascadingCompressor,
631            _data: &ArrayAndStats,
632            _compress_ctx: CompressorContext,
633            _exec_ctx: &mut ExecutionCtx,
634        ) -> VortexResult<ArrayRef> {
635            unreachable!("test helper should never be selected for compression")
636        }
637    }
638
639    #[derive(Debug)]
640    struct ImmediateAlwaysUseScheme;
641
642    impl Scheme for ImmediateAlwaysUseScheme {
643        fn scheme_name(&self) -> &'static str {
644            "test.immediate_always_use"
645        }
646
647        fn matches(&self, canonical: &Canonical) -> bool {
648            matches_integer_primitive(canonical)
649        }
650
651        fn expected_compression_ratio(
652            &self,
653            _data: &ArrayAndStats,
654            _compress_ctx: CompressorContext,
655            _exec_ctx: &mut ExecutionCtx,
656        ) -> CompressionEstimate {
657            CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
658        }
659
660        fn compress(
661            &self,
662            _compressor: &CascadingCompressor,
663            _data: &ArrayAndStats,
664            _compress_ctx: CompressorContext,
665            _exec_ctx: &mut ExecutionCtx,
666        ) -> VortexResult<ArrayRef> {
667            unreachable!("test helper should never be selected for compression")
668        }
669    }
670
671    #[derive(Debug)]
672    struct CallbackAlwaysUseScheme;
673
674    impl Scheme for CallbackAlwaysUseScheme {
675        fn scheme_name(&self) -> &'static str {
676            "test.callback_always_use"
677        }
678
679        fn matches(&self, canonical: &Canonical) -> bool {
680            matches_integer_primitive(canonical)
681        }
682
683        fn expected_compression_ratio(
684            &self,
685            _data: &ArrayAndStats,
686            _compress_ctx: CompressorContext,
687            _exec_ctx: &mut ExecutionCtx,
688        ) -> CompressionEstimate {
689            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
690                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::AlwaysUse),
691            )))
692        }
693
694        fn compress(
695            &self,
696            _compressor: &CascadingCompressor,
697            _data: &ArrayAndStats,
698            _compress_ctx: CompressorContext,
699            _exec_ctx: &mut ExecutionCtx,
700        ) -> VortexResult<ArrayRef> {
701            unreachable!("test helper should never be selected for compression")
702        }
703    }
704
705    #[derive(Debug)]
706    struct CallbackSkipScheme;
707
708    impl Scheme for CallbackSkipScheme {
709        fn scheme_name(&self) -> &'static str {
710            "test.callback_skip"
711        }
712
713        fn matches(&self, canonical: &Canonical) -> bool {
714            matches_integer_primitive(canonical)
715        }
716
717        fn expected_compression_ratio(
718            &self,
719            _data: &ArrayAndStats,
720            _compress_ctx: CompressorContext,
721            _exec_ctx: &mut ExecutionCtx,
722        ) -> CompressionEstimate {
723            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
724                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Skip),
725            )))
726        }
727
728        fn compress(
729            &self,
730            _compressor: &CascadingCompressor,
731            _data: &ArrayAndStats,
732            _compress_ctx: CompressorContext,
733            _exec_ctx: &mut ExecutionCtx,
734        ) -> VortexResult<ArrayRef> {
735            unreachable!("test helper should never be selected for compression")
736        }
737    }
738
739    #[derive(Debug)]
740    struct CallbackRatioScheme;
741
742    impl Scheme for CallbackRatioScheme {
743        fn scheme_name(&self) -> &'static str {
744            "test.callback_ratio"
745        }
746
747        fn matches(&self, canonical: &Canonical) -> bool {
748            matches_integer_primitive(canonical)
749        }
750
751        fn expected_compression_ratio(
752            &self,
753            _data: &ArrayAndStats,
754            _compress_ctx: CompressorContext,
755            _exec_ctx: &mut ExecutionCtx,
756        ) -> CompressionEstimate {
757            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
758                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(3.0)),
759            )))
760        }
761
762        fn compress(
763            &self,
764            _compressor: &CascadingCompressor,
765            _data: &ArrayAndStats,
766            _compress_ctx: CompressorContext,
767            _exec_ctx: &mut ExecutionCtx,
768        ) -> VortexResult<ArrayRef> {
769            unreachable!("test helper should never be selected for compression")
770        }
771    }
772
773    #[derive(Debug)]
774    struct HugeRatioScheme;
775
776    impl Scheme for HugeRatioScheme {
777        fn scheme_name(&self) -> &'static str {
778            "test.huge_ratio"
779        }
780
781        fn matches(&self, canonical: &Canonical) -> bool {
782            matches_integer_primitive(canonical)
783        }
784
785        fn expected_compression_ratio(
786            &self,
787            _data: &ArrayAndStats,
788            _compress_ctx: CompressorContext,
789            _exec_ctx: &mut ExecutionCtx,
790        ) -> CompressionEstimate {
791            CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0))
792        }
793
794        fn compress(
795            &self,
796            _compressor: &CascadingCompressor,
797            _data: &ArrayAndStats,
798            _compress_ctx: CompressorContext,
799            _exec_ctx: &mut ExecutionCtx,
800        ) -> VortexResult<ArrayRef> {
801            unreachable!("test helper should never be selected for compression")
802        }
803    }
804
805    #[derive(Debug)]
806    struct ZeroBytesSamplingScheme;
807
808    impl Scheme for ZeroBytesSamplingScheme {
809        fn scheme_name(&self) -> &'static str {
810            "test.zero_bytes_sampling"
811        }
812
813        fn matches(&self, canonical: &Canonical) -> bool {
814            matches_integer_primitive(canonical)
815        }
816
817        fn expected_compression_ratio(
818            &self,
819            _data: &ArrayAndStats,
820            _compress_ctx: CompressorContext,
821            _exec_ctx: &mut ExecutionCtx,
822        ) -> CompressionEstimate {
823            CompressionEstimate::Deferred(DeferredEstimate::Sample)
824        }
825
826        fn compress(
827            &self,
828            _compressor: &CascadingCompressor,
829            data: &ArrayAndStats,
830            _compress_ctx: CompressorContext,
831            _exec_ctx: &mut ExecutionCtx,
832        ) -> VortexResult<ArrayRef> {
833            Ok(NullArray::new(data.array().len()).into_array())
834        }
835    }
836
/// A scheme already present in the cascade history must be reported as excluded.
#[test]
fn test_self_exclusion() {
    let history_ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);
    let cascade = compressor();

    // IntDictScheme is in the history, so it should be excluded.
    let excluded = cascade.is_excluded(&IntDictScheme, &history_ctx);
    assert!(excluded);
}
845
/// `IntDictScheme` must be excluded when descending from the root scheme into the list
/// offsets child.
#[test]
fn test_root_exclusion_list_offsets() {
    let offsets_ctx = CompressorContext::default()
        .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
    let cascade = compressor();

    // IntDict should be excluded for list offsets.
    let excluded = cascade.is_excluded(&IntDictScheme, &offsets_ctx);
    assert!(excluded);
}
855
/// Descending through `FloatDictScheme` child 1 (codes) must exclude `IntDictScheme`.
#[test]
fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
    // FloatDict cascading through codes (child 1).
    let codes_ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);
    let cascade = compressor();

    // IntDict should be excluded from FloatDict's codes child.
    let excluded = cascade.is_excluded(&IntDictScheme, &codes_ctx);
    assert!(excluded);
}
865
/// Descending through `FloatDictScheme` child 0 (values) must exclude `IntDictScheme`.
#[test]
fn test_push_rule_float_dict_excludes_int_dict_from_values() {
    // FloatDict cascading through values (child 0).
    let values_ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);
    let cascade = compressor();

    // IntDict should also be excluded from FloatDict's values child (ALP propagation
    // replacement).
    let excluded = cascade.is_excluded(&IntDictScheme, &values_ctx);
    assert!(excluded);
}
876
/// With a fresh context (no cascade history) `IntDictScheme` must not be excluded.
#[test]
fn test_no_exclusion_without_history() {
    let empty_ctx = CompressorContext::default();
    let cascade = compressor();

    // No history means no exclusions.
    let excluded = cascade.is_excluded(&IntDictScheme, &empty_ctx);
    assert!(!excluded);
}
885
/// `ImmediateAlwaysUseScheme` must be selected with `WinnerEstimate::AlwaysUse` even though
/// `DirectRatioScheme` is listed ahead of it.
#[test]
fn immediate_always_use_wins_immediately() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme];
    let cascade = CascadingCompressor::new(vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let always_use_won = match winner {
        Some((scheme, WinnerEstimate::AlwaysUse)) => {
            scheme.id() == ImmediateAlwaysUseScheme.id()
        }
        _ => false,
    };
    assert!(always_use_won);
    Ok(())
}
908
/// `CallbackAlwaysUseScheme` must be selected with `WinnerEstimate::AlwaysUse` when it
/// competes against `DirectRatioScheme`.
#[test]
fn callback_always_use_wins_immediately() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme];
    let cascade = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackAlwaysUseScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let always_use_won = match winner {
        Some((scheme, WinnerEstimate::AlwaysUse)) => {
            scheme.id() == CallbackAlwaysUseScheme.id()
        }
        _ => false,
    };
    assert!(always_use_won);
    Ok(())
}
931
/// With `CallbackSkipScheme` listed first, `DirectRatioScheme` must still be selected with
/// a finite score of `2.0`.
#[test]
fn callback_skip_is_ignored() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme];
    let cascade = CascadingCompressor::new(vec![&CallbackSkipScheme, &DirectRatioScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let direct_won = match winner {
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(2.0)))) => {
            scheme.id() == DirectRatioScheme.id()
        }
        _ => false,
    };
    assert!(direct_won);
    Ok(())
}
953
/// `CallbackRatioScheme` must be selected with a finite score of `3.0` when it competes
/// against `DirectRatioScheme`.
#[test]
fn callback_ratio_competes_numerically() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme];
    let cascade = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackRatioScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let callback_won = match winner {
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(3.0)))) => {
            scheme.id() == CallbackRatioScheme.id()
        }
        _ => false,
    };
    assert!(callback_won);
    Ok(())
}
975
/// With `HugeRatioScheme` listed before `ZeroBytesSamplingScheme`, the finite `100.0`
/// score of `HugeRatioScheme` must be the winner.
#[test]
fn zero_byte_sample_loses_to_finite_ratio() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &ZeroBytesSamplingScheme];
    let cascade = CascadingCompressor::new(vec![&HugeRatioScheme, &ZeroBytesSamplingScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let huge_ratio_won = match winner {
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0)))) => {
            scheme.id() == HugeRatioScheme.id()
        }
        _ => false,
    };
    assert!(huge_ratio_won);
    Ok(())
}
997
/// Same contest as the previous ordering but with `ZeroBytesSamplingScheme` listed first:
/// `HugeRatioScheme` and its finite `100.0` score must still win.
#[test]
fn finite_ratio_displaces_zero_byte_sample() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 2] = [&ZeroBytesSamplingScheme, &HugeRatioScheme];
    let cascade = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &HugeRatioScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let huge_ratio_won = match winner {
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(100.0)))) => {
            scheme.id() == HugeRatioScheme.id()
        }
        _ => false,
    };
    assert!(huge_ratio_won);
    Ok(())
}
1019
/// When `ZeroBytesSamplingScheme` is the only candidate, no scheme may be selected.
#[test]
fn zero_byte_sample_alone_selects_no_scheme() -> VortexResult<()> {
    let candidates: [&'static dyn Scheme; 1] = [&ZeroBytesSamplingScheme];
    let cascade = CascadingCompressor::new(vec![&ZeroBytesSamplingScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let something_selected = winner.is_some();
    assert!(!something_selected);
    Ok(())
}
1037
1038    // Observer helper used by threshold-related tests. Captures the `best_so_far` value the
1039    // compressor passes to its deferred callback. `OBSERVER_LOCK` serializes tests that share
1040    // `OBSERVED_THRESHOLD` so they do not race.
1041    static OBSERVER_LOCK: Mutex<()> = Mutex::new(());
1042    static OBSERVED_THRESHOLD: Mutex<Option<Option<EstimateScore>>> = Mutex::new(None);
1043
1044    #[derive(Debug)]
1045    struct ThresholdObservingScheme;
1046
1047    impl Scheme for ThresholdObservingScheme {
1048        fn scheme_name(&self) -> &'static str {
1049            "test.threshold_observing"
1050        }
1051
1052        fn matches(&self, canonical: &Canonical) -> bool {
1053            matches_integer_primitive(canonical)
1054        }
1055
1056        fn expected_compression_ratio(
1057            &self,
1058            _data: &ArrayAndStats,
1059            _compress_ctx: CompressorContext,
1060            _exec_ctx: &mut ExecutionCtx,
1061        ) -> CompressionEstimate {
1062            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1063                |_compressor, _data, best_so_far, _ctx, _exec_ctx| {
1064                    *OBSERVED_THRESHOLD.lock() = Some(best_so_far);
1065                    Ok(EstimateVerdict::Skip)
1066                },
1067            )))
1068        }
1069
1070        fn compress(
1071            &self,
1072            _compressor: &CascadingCompressor,
1073            _data: &ArrayAndStats,
1074            _compress_ctx: CompressorContext,
1075            _exec_ctx: &mut ExecutionCtx,
1076        ) -> VortexResult<ArrayRef> {
1077            unreachable!("test helper should never be selected for compression")
1078        }
1079    }
1080
1081    #[derive(Debug)]
1082    struct CallbackMatchingRatioScheme;
1083
1084    impl Scheme for CallbackMatchingRatioScheme {
1085        fn scheme_name(&self) -> &'static str {
1086            "test.callback_matching_ratio"
1087        }
1088
1089        fn matches(&self, canonical: &Canonical) -> bool {
1090            matches_integer_primitive(canonical)
1091        }
1092
1093        fn expected_compression_ratio(
1094            &self,
1095            _data: &ArrayAndStats,
1096            _compress_ctx: CompressorContext,
1097            _exec_ctx: &mut ExecutionCtx,
1098        ) -> CompressionEstimate {
1099            CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
1100                |_compressor, _data, _ctx, _exec_ctx, _best_so_far| Ok(EstimateVerdict::Ratio(2.0)),
1101            )))
1102        }
1103
1104        fn compress(
1105            &self,
1106            _compressor: &CascadingCompressor,
1107            _data: &ArrayAndStats,
1108            _compress_ctx: CompressorContext,
1109            _exec_ctx: &mut ExecutionCtx,
1110        ) -> VortexResult<ArrayRef> {
1111            unreachable!("test helper should never be selected for compression")
1112        }
1113    }
1114
/// A deferred `AlwaysUse` must beat an immediate `Ratio(100.0)`.
#[test]
fn callback_always_use_overrides_pass_one_best() -> VortexResult<()> {
    // `HugeRatioScheme` returns an immediate `Ratio(100.0)` in pass 1;
    // `CallbackAlwaysUseScheme` returns `AlwaysUse` from its deferred callback in pass 2.
    // The deferred `AlwaysUse` must still win.
    let candidates: [&'static dyn Scheme; 2] = [&HugeRatioScheme, &CallbackAlwaysUseScheme];
    let cascade = CascadingCompressor::new(vec![&HugeRatioScheme, &CallbackAlwaysUseScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let always_use_won = match winner {
        Some((scheme, WinnerEstimate::AlwaysUse)) => {
            scheme.id() == CallbackAlwaysUseScheme.id()
        }
        _ => false,
    };
    assert!(always_use_won);
    Ok(())
}
1139
/// The observing callback must receive `DirectRatioScheme`'s finite `2.0` score as its
/// `best_so_far` argument.
#[test]
fn threshold_reflects_pass_one_best() -> VortexResult<()> {
    // Keep the guard alive for the whole test; clear any value left by an earlier test.
    let _guard = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ThresholdObservingScheme];
    let cascade = CascadingCompressor::new(vec![&DirectRatioScheme, &ThresholdObservingScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let observed = *OBSERVED_THRESHOLD.lock();
    let saw_pass_one_best = match observed {
        Some(Some(EstimateScore::FiniteCompression(r))) => r == 2.0,
        _ => false,
    };
    assert!(saw_pass_one_best);
    Ok(())
}
1160
/// When the only earlier candidate is `ZeroBytesSamplingScheme`, the observing callback
/// must be invoked with `best_so_far == None`.
#[test]
fn threshold_is_none_when_only_prior_is_zero_bytes() -> VortexResult<()> {
    // Keep the guard alive for the whole test; clear any value left by an earlier test.
    let _guard = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: [&'static dyn Scheme; 2] =
        [&ZeroBytesSamplingScheme, &ThresholdObservingScheme];
    let cascade =
        CascadingCompressor::new(vec![&ZeroBytesSamplingScheme, &ThresholdObservingScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    // The observing callback was invoked (outer `Some`) and `best_so_far` was `None` (inner
    // `None`) because the zero-byte sample is never stored as the best.
    let expected: Option<Option<EstimateScore>> = Some(None);
    let observed = *OBSERVED_THRESHOLD.lock();
    assert_eq!(observed, expected);
    Ok(())
}
1181
/// When `ThresholdObservingScheme` is the only candidate, its callback must be invoked with
/// `best_so_far == None`.
#[test]
fn threshold_is_none_when_no_prior_scheme() -> VortexResult<()> {
    // Keep the guard alive for the whole test; clear any value left by an earlier test.
    let _guard = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    let candidates: [&'static dyn Scheme; 1] = [&ThresholdObservingScheme];
    let cascade = CascadingCompressor::new(vec![&ThresholdObservingScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    // Outer `Some`: the callback ran. Inner `None`: it was given no best score.
    let expected: Option<Option<EstimateScore>> = Some(None);
    let observed = *OBSERVED_THRESHOLD.lock();
    assert_eq!(observed, expected);
    Ok(())
}
1198
/// A score produced by an earlier deferred callback must be visible to a later one.
#[test]
fn threshold_updates_from_earlier_deferred_callback() -> VortexResult<()> {
    // Keep the guard alive for the whole test; clear any value left by an earlier test.
    let _guard = OBSERVER_LOCK.lock();
    *OBSERVED_THRESHOLD.lock() = None;

    // Both schemes are deferred. The first callback registers `Ratio(3.0)`; the second
    // callback must observe it as its threshold.
    let candidates: [&'static dyn Scheme; 2] = [&CallbackRatioScheme, &ThresholdObservingScheme];
    let cascade =
        CascadingCompressor::new(vec![&CallbackRatioScheme, &ThresholdObservingScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let observed = *OBSERVED_THRESHOLD.lock();
    let saw_earlier_ratio = match observed {
        Some(Some(EstimateScore::FiniteCompression(r))) => r == 3.0,
        _ => false,
    };
    assert!(saw_earlier_ratio);
    Ok(())
}
1221
/// On an exact ratio tie, the immediate `DirectRatioScheme` must win over the deferred
/// `CallbackMatchingRatioScheme`, even though the deferred scheme is listed first.
#[test]
fn ratio_tie_between_immediate_and_deferred_favors_immediate() -> VortexResult<()> {
    // Both schemes produce the same `Ratio(2.0)`, one from pass 1 (immediate) and one from
    // pass 2 (deferred callback). Pass 1 locks in first, and strict `>` tie-breaking means
    // the deferred callback's equal ratio cannot displace it.
    let candidates: [&'static dyn Scheme; 2] = [&CallbackMatchingRatioScheme, &DirectRatioScheme];
    let cascade =
        CascadingCompressor::new(vec![&CallbackMatchingRatioScheme, &DirectRatioScheme]);
    let input = estimate_test_data();
    let mut exec_ctx = SESSION.create_execution_ctx();

    let root_ctx = CompressorContext::new();
    let winner = cascade.choose_best_scheme(&candidates, &input, root_ctx, &mut exec_ctx)?;

    let immediate_won = match winner {
        Some((scheme, WinnerEstimate::Score(EstimateScore::FiniteCompression(r)))) => {
            scheme.id() == DirectRatioScheme.id() && r == 2.0
        }
        _ => false,
    };
    assert!(immediate_won);
    Ok(())
}
1247
/// Compressing a primitive array whose validity mask is all `false` must yield a
/// `Constant` array.
#[test]
fn all_null_array_compresses_to_constant() -> VortexResult<()> {
    // Five slots, every one of them marked invalid.
    let all_invalid = BoolArray::from_iter([false; 5]).into_array();
    let input =
        PrimitiveArray::new(buffer![0i32, 0, 0, 0, 0], Validity::Array(all_invalid)).into_array();

    // The compressor should produce a `ConstantArray` for an all-null array regardless of
    // which schemes are registered.
    let cascade = CascadingCompressor::new(vec![&IntDictScheme]);
    let mut exec_ctx = SESSION.create_execution_ctx();
    let compressed = cascade.compress(&input, &mut exec_ctx)?;

    let is_constant = compressed.is::<Constant>();
    assert!(is_constant);
    Ok(())
}
1264
/// Regression test for <https://github.com/vortex-data/vortex/issues/7227>.
///
/// `estimate_compression_ratio_with_sampling` has to compute stats with the options the
/// *scheme* asks for (which include distinct-value counting), not with the options carried
/// by the context (which may omit it). The previous behaviour panicked inside
/// `dictionary_encode` because distinct values had never been computed for the sample.
#[test]
fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
    // Low-cardinality float array so FloatDictScheme considers it compressible.
    let values = buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0];
    let input = PrimitiveArray::new(values, Validity::NonNullable).into_array();

    let cascade = CascadingCompressor::new(vec![&FloatDictScheme]);

    // A context with default stats_options (count_distinct_values = false) and
    // marked as a sample so the function skips the sampling step and compresses
    // the array directly.
    let sampling_ctx = CompressorContext::new().with_sampling();

    // Before the fix this panicked with:
    //   "this must be present since `DictScheme` declared that we need distinct values"
    let mut exec_ctx = SESSION.create_execution_ctx();
    let score = estimate_compression_ratio_with_sampling(
        &cascade,
        &FloatDictScheme,
        &input,
        sampling_ctx,
        &mut exec_ctx,
    )?;

    let finite_ratio = match score {
        EstimateScore::FiniteCompression(ratio) => ratio.is_finite(),
        _ => false,
    };
    assert!(finite_ratio);
    Ok(())
}
1300}