Skip to main content

vortex_compressor/
compressor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Cascading array compression implementation.
5
6use std::sync::Arc;
7
8use parking_lot::Mutex;
9use parking_lot::MutexGuard;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::CanonicalValidity;
13use vortex_array::ExecutionCtx;
14use vortex_array::IntoArray;
15use vortex_array::LEGACY_SESSION;
16use vortex_array::ToCanonical;
17use vortex_array::VortexSessionExecute;
18use vortex_array::arrays::ConstantArray;
19use vortex_array::arrays::ExtensionArray;
20use vortex_array::arrays::FixedSizeListArray;
21use vortex_array::arrays::ListArray;
22use vortex_array::arrays::ListViewArray;
23use vortex_array::arrays::StructArray;
24use vortex_array::arrays::extension::ExtensionArrayExt;
25use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
26use vortex_array::arrays::list::ListArrayExt;
27use vortex_array::arrays::listview::ListViewArrayExt;
28use vortex_array::arrays::listview::list_from_list_view;
29use vortex_array::arrays::primitive::PrimitiveArrayExt;
30use vortex_array::arrays::struct_::StructArrayExt;
31use vortex_array::dtype::DType;
32use vortex_array::dtype::Nullability;
33use vortex_array::scalar::Scalar;
34use vortex_error::VortexResult;
35use vortex_error::vortex_bail;
36use vortex_error::vortex_panic;
37
38use crate::builtins::IntDictScheme;
39use crate::ctx::CompressorContext;
40use crate::estimate::CompressionEstimate;
41use crate::estimate::estimate_compression_ratio_with_sampling;
42use crate::estimate::is_better_ratio;
43use crate::scheme::ChildSelection;
44use crate::scheme::DescendantExclusion;
45use crate::scheme::Scheme;
46use crate::scheme::SchemeExt;
47use crate::scheme::SchemeId;
48use crate::stats::ArrayAndStats;
49use crate::stats::GenerateStatsOptions;
50
/// The implicit root scheme ID for the compressor's own cascading (e.g. list offset compression).
///
/// Recorded in the cascade history when the compressor itself descends into list/listview
/// children, so that `root_exclusions` rules can be matched against it.
///
/// This is the **only** [`SchemeId`] that is not auto-provided via [`SchemeExt`].
const ROOT_SCHEME_ID: SchemeId = SchemeId {
    name: "vortex.compressor.root",
};
57
/// Child indices for the compressor's list/listview compression.
///
/// NOTE(review): index 0 is never referenced here — presumably it is reserved for the
/// elements child; confirm against the scheme child-numbering convention.
mod root_list_children {
    /// List/ListView offsets child.
    pub const OFFSETS: usize = 1;
    /// ListView sizes child.
    pub const SIZES: usize = 2;
}
65
/// The main compressor type implementing cascading adaptive compression.
///
/// This compressor applies adaptive compression [`Scheme`]s to arrays based on their data types and
/// characteristics. It recursively compresses nested structures like structs and lists, and chooses
/// optimal compression schemes for leaf types.
///
/// The compressor works by:
/// 1. Canonicalizing input arrays to a standard representation.
/// 2. Pre-filtering schemes by [`Scheme::matches`] and exclusion rules.
/// 3. Evaluating each matching scheme's compression ratio on a sample.
/// 4. Compressing with the best scheme and verifying the result is smaller.
///
/// No scheme may appear twice in a cascade chain. The compressor enforces this automatically
/// along with push/pull exclusion rules declared by each scheme.
#[derive(Debug, Clone)]
pub struct CascadingCompressor {
    /// The enabled compression schemes.
    ///
    /// Registration order matters: ties in estimated ratio are broken by position in this list.
    schemes: Vec<&'static dyn Scheme>,

    /// Descendant exclusion rules for the compressor's own cascading (e.g. excluding Dict from
    /// list offsets). Matched against [`ROOT_SCHEME_ID`] entries in the cascade history.
    root_exclusions: Vec<DescendantExclusion>,

    /// Shared execution context for array operations during compression.
    ///
    /// This should have low contention as we only execute arrays one at a time during compression.
    ctx: Arc<Mutex<ExecutionCtx>>,
}
94
95impl CascadingCompressor {
96    /// Creates a new compressor with the given schemes.
97    ///
98    /// Root-level exclusion rules (e.g. excluding Dict from list offsets) are built
99    /// automatically.
100    pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
101        // Root exclusion: exclude IntDict from list/listview offsets (monotonically
102        // increasing data where dictionary encoding is wasteful).
103        let root_exclusions = vec![DescendantExclusion {
104            excluded: IntDictScheme.id(),
105            children: ChildSelection::One(root_list_children::OFFSETS),
106        }];
107        Self {
108            schemes,
109            root_exclusions,
110            // TODO(connor): The caller should probably pass this in.
111            ctx: Arc::new(Mutex::new(LEGACY_SESSION.create_execution_ctx())),
112        }
113    }
114
    /// Acquires the shared execution context lock and returns the guard.
    ///
    /// The guard dereferences to [`ExecutionCtx`]; contention should be low since arrays are
    /// executed one at a time during compression.
    pub fn execution_ctx(&self) -> MutexGuard<'_, ExecutionCtx> {
        self.ctx.lock()
    }
119
120    /// Compresses an array using cascading adaptive compression.
121    ///
122    /// First canonicalizes and compacts the array, then applies optimal compression schemes.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if canonicalization or compression fails.
127    pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
128        let canonical = array
129            .clone()
130            .execute::<CanonicalValidity>(&mut self.execution_ctx())?
131            .0;
132
133        // Compact it, removing any wasted space before we attempt to compress it.
134        let compact = canonical.compact()?;
135
136        self.compress_canonical(compact, CompressorContext::new())
137    }
138
139    /// Compresses a child array produced by a cascading scheme.
140    ///
141    /// If the cascade budget is exhausted, the canonical array is returned as-is. Otherwise,
142    /// the child context is created by descending and recording the parent scheme + child
143    /// index, and compression proceeds normally.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if compression fails.
148    pub fn compress_child(
149        &self,
150        child: &ArrayRef,
151        parent_ctx: &CompressorContext,
152        parent_id: SchemeId,
153        child_index: usize,
154    ) -> VortexResult<ArrayRef> {
155        if parent_ctx.finished_cascading() {
156            return Ok(child.clone());
157        }
158
159        let canonical = child
160            .clone()
161            .execute::<CanonicalValidity>(&mut self.execution_ctx())?
162            .0;
163        let compact = canonical.compact()?;
164
165        let child_ctx = parent_ctx
166            .clone()
167            .descend_with_scheme(parent_id, child_index);
168        self.compress_canonical(compact, child_ctx)
169    }
170
    /// Compresses a canonical array by dispatching to type-specific logic.
    ///
    /// Leaf types (bool, primitive, decimal, utf8) go through scheme selection; container
    /// types (struct, list, fixed-size list) are rebuilt with recursively compressed children.
    ///
    /// # Errors
    ///
    /// Returns an error if compression of any sub-array fails.
    fn compress_canonical(
        &self,
        array: Canonical,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        match array {
            // Null arrays carry no data worth compressing.
            Canonical::Null(null_array) => Ok(null_array.into_array()),
            Canonical::Bool(bool_array) => {
                self.choose_and_compress(Canonical::Bool(bool_array), ctx)
            }
            Canonical::Primitive(primitive) => {
                self.choose_and_compress(Canonical::Primitive(primitive), ctx)
            }
            Canonical::Decimal(decimal) => {
                self.choose_and_compress(Canonical::Decimal(decimal), ctx)
            }
            Canonical::Struct(struct_array) => {
                // NOTE(review): each field goes through `self.compress`, which starts a fresh
                // `CompressorContext`, so `ctx`'s cascade history is not threaded into struct
                // fields — confirm that is intended.
                let fields = struct_array
                    .iter_unmasked_fields()
                    .map(|field| self.compress(field))
                    .collect::<Result<Vec<_>, _>>()?;

                // Rebuild the struct over the compressed fields, preserving validity.
                Ok(StructArray::try_new(
                    struct_array.names().clone(),
                    fields,
                    struct_array.len(),
                    struct_array.validity()?,
                )?
                .into_array())
            }
            Canonical::List(list_view_array) => {
                // Prefer the plain list representation when the conversion is free (or the
                // array has no elements); otherwise compress the list-view form directly.
                if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
                    let list_array = list_from_list_view(list_view_array)?;
                    self.compress_list_array(list_array, ctx)
                } else {
                    self.compress_list_view_array(list_view_array, ctx)
                }
            }
            Canonical::FixedSizeList(fsl_array) => {
                // Only the elements need compressing; list size and validity are preserved.
                // NOTE(review): elements also restart with a fresh context via `self.compress`.
                let compressed_elems = self.compress(fsl_array.elements())?;

                Ok(FixedSizeListArray::try_new(
                    compressed_elems,
                    fsl_array.list_size(),
                    fsl_array.validity()?,
                    fsl_array.len(),
                )?
                .into_array())
            }
            Canonical::VarBinView(strings) => {
                // Only UTF-8 arrays go through scheme selection (nullability ignored).
                if strings
                    .dtype()
                    .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
                {
                    self.choose_and_compress(Canonical::VarBinView(strings), ctx)
                } else {
                    // We do not compress binary arrays.
                    Ok(strings.into_array())
                }
            }
            Canonical::Extension(ext_array) => {
                let before_nbytes = ext_array.as_ref().nbytes();

                // Try scheme-based compression first.
                let result =
                    self.choose_and_compress(Canonical::Extension(ext_array.clone()), ctx)?;
                if result.nbytes() < before_nbytes {
                    return Ok(result);
                }

                // Otherwise, fall back to compressing the underlying storage array.
                let compressed_storage = self.compress(ext_array.storage_array())?;

                Ok(
                    ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
                        .into_array(),
                )
            }
            Canonical::Variant(_) => {
                vortex_bail!("Variant arrays can not be compressed")
            }
        }
    }
259
    /// The main scheme-selection entry point for a single leaf array.
    ///
    /// Filters allowed schemes by [`matches`] and exclusion rules, merges their [`stats_options`]
    /// into a single [`GenerateStatsOptions`], then delegates to [`choose_scheme`] to pick the
    /// winner by estimated compression ratio.
    ///
    /// If a winner is found and its compressed output is actually smaller, that output is returned.
    /// Otherwise, the original array is returned unchanged.
    ///
    /// Empty and all-null arrays are short-circuited before any scheme evaluation.
    ///
    /// [`matches`]: Scheme::matches
    /// [`stats_options`]: Scheme::stats_options
    /// [`choose_scheme`]: Self::choose_scheme
    fn choose_and_compress(
        &self,
        canonical: Canonical,
        ctx: CompressorContext,
    ) -> VortexResult<ArrayRef> {
        // Pre-filter: a scheme must both match the canonical array and survive the cascade
        // history / exclusion rules.
        let eligible_schemes: Vec<&'static dyn Scheme> = self
            .schemes
            .iter()
            .copied()
            .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &ctx))
            .collect();

        let array: ArrayRef = canonical.into();

        // If there are no schemes that we can compress into, then just return it uncompressed.
        // NOTE(review): this early return also skips the all-null -> ConstantArray shortcut
        // below, so an all-null array with no matching scheme stays uncompressed — confirm
        // that is intended.
        if eligible_schemes.is_empty() {
            return Ok(array);
        }

        // Nothing to compress if empty or all-null.
        if array.is_empty() {
            return Ok(array);
        }
        if array.all_invalid()? {
            return Ok(
                ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
            );
        }

        // Baseline size used to decide whether the winner's output is actually an improvement.
        let before_nbytes = array.nbytes();

        // Merge every eligible scheme's stats requirements so stats are generated once.
        let merged_opts = eligible_schemes
            .iter()
            .fold(GenerateStatsOptions::default(), |acc, s| {
                acc.merge(s.stats_options())
            });
        // NOTE(review): `merged_opts` is used again for `ArrayAndStats::new` below, which
        // relies on `GenerateStatsOptions` being `Copy` — otherwise this would not compile.
        let ctx = ctx.with_merged_stats_options(merged_opts);

        let mut data = ArrayAndStats::new(array, merged_opts);

        if let Some(winner) = self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())? {
            // TODO(connor): Add a tracing warning here if compression with the chosen scheme
            // failed, since there was likely more we could have done while choosing schemes.

            // Sampling and estimation chose a scheme, so let's compress the whole array with it.
            let compressed = winner.compress(self, &mut data, ctx)?;

            // Only choose the compressed array if it is smaller than the canonical one.
            if compressed.nbytes() < before_nbytes {
                // TODO(connor): Add a tracing warning here too.
                return Ok(compressed);
            }
        }

        // No scheme improved on the original.
        Ok(data.into_array())
    }
331
332    /// Calls [`expected_compression_ratio`] on each candidate and returns the scheme with the
333    /// highest ratio, or `None` if no scheme exceeds 1.0. Ties are broken by registration order
334    /// (earlier in the list wins).
335    ///
336    /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio
337    fn choose_best_scheme(
338        &self,
339        schemes: &[&'static dyn Scheme],
340        data: &mut ArrayAndStats,
341        ctx: CompressorContext,
342    ) -> VortexResult<Option<&'static dyn Scheme>> {
343        let mut best: Option<(&'static dyn Scheme, f64)> = None;
344
345        // TODO(connor): Might want to use an `im` data structure inside of `ctx` if the clones here
346        // are expensive.
347        for &scheme in schemes {
348            let estimate = scheme.expected_compression_ratio(data, ctx.clone());
349
350            match estimate {
351                CompressionEstimate::Skip => {}
352                CompressionEstimate::AlwaysUse => return Ok(Some(scheme)),
353                CompressionEstimate::Ratio(ratio) => {
354                    if is_better_ratio(ratio, &best) {
355                        best = Some((scheme, ratio));
356                    }
357                }
358                CompressionEstimate::Sample => {
359                    let sample_ratio = estimate_compression_ratio_with_sampling(
360                        scheme,
361                        self,
362                        data.array(),
363                        ctx.clone(),
364                    )?;
365
366                    if is_better_ratio(sample_ratio, &best) {
367                        best = Some((scheme, sample_ratio));
368                    }
369                }
370                // TODO(connor): Is there a way to deduplicate some of this code?
371                CompressionEstimate::Estimate(estimate_callback) => {
372                    let estimate = estimate_callback(self, data, ctx.clone())?;
373
374                    match estimate {
375                        CompressionEstimate::Skip => {}
376                        CompressionEstimate::AlwaysUse => return Ok(Some(scheme)),
377                        CompressionEstimate::Ratio(ratio) => {
378                            if is_better_ratio(ratio, &best) {
379                                best = Some((scheme, ratio));
380                            }
381                        }
382                        e @ (CompressionEstimate::Sample | CompressionEstimate::Estimate(_)) => {
383                            vortex_panic!(
384                                "an estimation function returned an invalid variant {e:?}"
385                            )
386                        }
387                    }
388                }
389            }
390
391            // tracing::debug!(scheme = %scheme.id(), estimate, "evaluated compression ratio");
392        }
393
394        Ok(best.map(|(s, _)| s))
395    }
396
397    // TODO(connor): Lots of room for optimization here.
398    /// Returns `true` if the candidate scheme should be excluded based on the cascade history and
399    /// exclusion rules.
400    fn is_excluded(&self, candidate: &dyn Scheme, ctx: &CompressorContext) -> bool {
401        let id = candidate.id();
402        let history = ctx.cascade_history();
403
404        // Self-exclusion: no scheme appears twice in any chain.
405        if history.iter().any(|&(sid, _)| sid == id) {
406            return true;
407        }
408
409        let mut iter = history.iter().copied().peekable();
410
411        // The root entry is always first in the history (if present). Check if the root has
412        // excluded us.
413        if let Some((_, child_idx)) = iter.next_if(|&(sid, _)| sid == ROOT_SCHEME_ID)
414            && self
415                .root_exclusions
416                .iter()
417                .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
418        {
419            return true;
420        }
421
422        // Push rules: Check if any of our ancestors have excluded us.
423        for (ancestor_id, child_idx) in iter {
424            if let Some(ancestor) = self.schemes.iter().find(|s| s.id() == ancestor_id)
425                && ancestor
426                    .descendant_exclusions()
427                    .iter()
428                    .any(|rule| rule.excluded == id && rule.children.contains(child_idx))
429            {
430                return true;
431            }
432        }
433
434        // Pull rules: Check if we have excluded ourselves because of our ancestors.
435        for rule in candidate.ancestor_exclusions() {
436            if history
437                .iter()
438                .any(|(sid, cidx)| *sid == rule.ancestor && rule.children.contains(*cidx))
439            {
440                return true;
441            }
442        }
443
444        false
445    }
446
447    /// Compresses a [`ListArray`] by narrowing offsets and recursively compressing elements.
448    fn compress_list_array(
449        &self,
450        list_array: ListArray,
451        ctx: CompressorContext,
452    ) -> VortexResult<ArrayRef> {
453        let list_array = list_array.reset_offsets(true)?;
454
455        let compressed_elems = self.compress(list_array.elements())?;
456
457        // Record the root scheme with the offsets child index so root exclusion rules apply.
458        let offset_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
459        let compressed_offsets = self.compress_canonical(
460            Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
461            offset_ctx,
462        )?;
463
464        Ok(
465            ListArray::try_new(compressed_elems, compressed_offsets, list_array.validity()?)?
466                .into_array(),
467        )
468    }
469
470    /// Compresses a [`ListViewArray`] by narrowing offsets/sizes and recursively compressing
471    /// elements.
472    fn compress_list_view_array(
473        &self,
474        list_view: ListViewArray,
475        ctx: CompressorContext,
476    ) -> VortexResult<ArrayRef> {
477        let compressed_elems = self.compress(list_view.elements())?;
478
479        let offset_ctx = ctx
480            .clone()
481            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);
482        let compressed_offsets = self.compress_canonical(
483            Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
484            offset_ctx,
485        )?;
486
487        let sizes_ctx = ctx.descend_with_scheme(ROOT_SCHEME_ID, root_list_children::SIZES);
488        let compressed_sizes = self.compress_canonical(
489            Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
490            sizes_ctx,
491        )?;
492
493        Ok(ListViewArray::try_new(
494            compressed_elems,
495            compressed_offsets,
496            compressed_sizes,
497            list_view.validity()?,
498        )?
499        .into_array())
500    }
501}
502
#[cfg(test)]
mod tests {
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::Constant;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use super::*;
    use crate::builtins::FloatDictScheme;
    use crate::builtins::IntDictScheme;
    use crate::builtins::StringDictScheme;
    use crate::ctx::CompressorContext;
    use crate::scheme::SchemeExt;

    /// Builds a compressor with the three built-in dictionary schemes used by these tests.
    fn compressor() -> CascadingCompressor {
        CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme])
    }

    #[test]
    fn test_self_exclusion() {
        let c = compressor();
        let ctx = CompressorContext::default().descend_with_scheme(IntDictScheme.id(), 0);

        // IntDictScheme is in the history, so it should be excluded.
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_root_exclusion_list_offsets() {
        let c = compressor();
        let ctx = CompressorContext::default()
            .descend_with_scheme(ROOT_SCHEME_ID, root_list_children::OFFSETS);

        // IntDict should be excluded for list offsets.
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_codes() {
        let c = compressor();
        // FloatDict cascading through codes (child 1).
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 1);

        // IntDict should be excluded from FloatDict's codes child.
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_push_rule_float_dict_excludes_int_dict_from_values() {
        let c = compressor();
        // FloatDict cascading through values (child 0).
        let ctx = CompressorContext::default().descend_with_scheme(FloatDictScheme.id(), 0);

        // IntDict should also be excluded from FloatDict's values child (ALP propagation
        // replacement).
        assert!(c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn test_no_exclusion_without_history() {
        let c = compressor();
        let ctx = CompressorContext::default();

        // No history means no exclusions.
        assert!(!c.is_excluded(&IntDictScheme, &ctx));
    }

    #[test]
    fn all_null_array_compresses_to_constant() -> VortexResult<()> {
        // Five physical zeros, all masked invalid by the validity array.
        let array = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::Array(BoolArray::from_iter([false, false, false, false, false]).into_array()),
        )
        .into_array();

        // The compressor should produce a `ConstantArray` for an all-null array regardless of
        // which schemes are registered.
        let compressor = CascadingCompressor::new(vec![&IntDictScheme]);
        let compressed = compressor.compress(&array)?;
        assert!(compressed.is::<Constant>());
        Ok(())
    }

    /// Regression test for <https://github.com/vortex-data/vortex/issues/7227>.
    ///
    /// `estimate_compression_ratio_with_sampling` must use the *scheme's* stats options
    /// (which request distinct-value counting) rather than the context's stats options
    /// (which may not). With the old code this panicked inside `dictionary_encode` because
    /// distinct values were never computed for the sample.
    #[test]
    fn sampling_uses_scheme_stats_options() -> VortexResult<()> {
        // Low-cardinality float array so FloatDictScheme considers it compressible.
        let array = PrimitiveArray::new(
            buffer![1.0f32, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 2.0],
            Validity::NonNullable,
        )
        .into_array();

        let compressor = CascadingCompressor::new(vec![&FloatDictScheme]);

        // A context with default stats_options (count_distinct_values = false) and
        // marked as a sample so the function skips the sampling step and compresses
        // the array directly.
        let ctx = CompressorContext::new().with_sampling();

        // Before the fix this panicked with:
        //   "this must be present since `DictScheme` declared that we need distinct values"
        let ratio =
            estimate_compression_ratio_with_sampling(&FloatDictScheme, &compressor, &array, ctx)?;
        assert!(ratio.is_finite());
        Ok(())
    }
}