Skip to main content

vortex_compressor/
ctx.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Compression context for recursive compression.
5
6use std::fmt;
7
8use vortex_error::VortexExpect;
9
10use crate::compressor::ROOT_SCHEME_ID;
11use crate::scheme::SchemeId;
12use crate::stats::GenerateStatsOptions;
13
14// TODO(connor): Why is this 3??? This doesn't seem smart or adaptive.
15/// Maximum cascade depth for compression.
16pub const MAX_CASCADE: usize = 3;
17
18/// Context passed through recursive compression calls.
19///
20/// Tracks the cascade history (which schemes and child indices have been applied in the current
21/// chain) so the compressor can enforce exclusion rules and prevent cycles.
22#[derive(Debug, Clone)]
23pub struct CompressorContext {
24    /// Whether we're compressing a sample (for ratio estimation).
25    is_sample: bool,
26
27    /// Remaining cascade depth allowed.
28    allowed_cascading: usize,
29
30    /// Merged stats options from all eligible schemes at this compression site.
31    merged_stats_options: GenerateStatsOptions,
32
33    // TODO(connor): Replace this with an `im::Vector`
34    /// The cascade chain: `(scheme_id, child_index)` pairs from root to current depth.
35    /// Used for self-exclusion, push rules ([`descendant_exclusions`]), and pull rules
36    /// ([`ancestor_exclusions`]).
37    ///
38    /// [`descendant_exclusions`]: crate::scheme::Scheme::descendant_exclusions
39    /// [`ancestor_exclusions`]: crate::scheme::Scheme::ancestor_exclusions
40    cascade_history: Vec<(SchemeId, usize)>,
41}
42
43impl CompressorContext {
44    /// Creates a new `CompressorContext`.
45    ///
46    /// This should **only** be created by the compressor.
47    pub(super) fn new() -> Self {
48        Self {
49            is_sample: false,
50            allowed_cascading: MAX_CASCADE,
51            merged_stats_options: GenerateStatsOptions::default(),
52            cascade_history: Vec::new(),
53        }
54    }
55}
56
57#[cfg(test)]
58impl Default for CompressorContext {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64impl CompressorContext {
65    /// Whether this context is for sample compression (ratio estimation).
66    pub fn is_sample(&self) -> bool {
67        self.is_sample
68    }
69
70    /// Returns the merged stats generation options for this compression site.
71    pub fn merged_stats_options(&self) -> GenerateStatsOptions {
72        self.merged_stats_options
73    }
74
75    /// Returns the cascade chain of `(scheme_id, child_index)` pairs.
76    pub fn cascade_history(&self) -> &[(SchemeId, usize)] {
77        &self.cascade_history
78    }
79
80    /// Returns a display wrapper for the current cascade ancestry.
81    pub(crate) fn cascade_path(&self) -> impl fmt::Display + '_ {
82        CascadePath(&self.cascade_history)
83    }
84
85    /// Returns the current cascade ancestry depth.
86    pub(crate) fn cascade_depth(&self) -> usize {
87        self.cascade_history.len()
88    }
89
90    /// Whether cascading is exhausted (no further cascade levels allowed).
91    ///
92    /// This should only be used in the implementation of a [`Scheme`](crate::scheme::Scheme) if the
93    /// scheme knows that it's child _must_ be compressed for it to make any sense being chosen.
94    pub fn finished_cascading(&self) -> bool {
95        self.allowed_cascading == 0
96    }
97
98    /// Returns a context that disallows further cascading.
99    pub fn as_leaf(mut self) -> Self {
100        self.allowed_cascading = 0;
101        self
102    }
103
104    /// Returns a context with the given stats options.
105    pub(super) fn with_merged_stats_options(mut self, opts: GenerateStatsOptions) -> Self {
106        self.merged_stats_options = opts;
107        self
108    }
109
110    /// Returns a context marked as sample compression.
111    pub(super) fn with_sampling(mut self) -> Self {
112        self.is_sample = true;
113        self
114    }
115
116    /// Descends one level in the cascade, recording the current scheme and which child is
117    /// being compressed.
118    ///
119    /// The `child_index` identifies which child of the scheme is being compressed (e.g. for
120    /// Dict: values=0, codes=1).
121    pub(super) fn descend_with_scheme(mut self, id: SchemeId, child_index: usize) -> Self {
122        self.allowed_cascading = self
123            .allowed_cascading
124            .checked_sub(1)
125            .vortex_expect("cannot descend: cascade depth exhausted");
126        self.cascade_history.push((id, child_index));
127        self
128    }
129}
130
131/// Display wrapper for a cascade ancestry path.
132struct CascadePath<'a>(&'a [(SchemeId, usize)]);
133
134impl fmt::Display for CascadePath<'_> {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        if self.0.is_empty() {
137            return f.write_str("root");
138        }
139
140        for (index, (scheme_id, child_index)) in self.0.iter().enumerate() {
141            if index > 0 {
142                f.write_str(" > ")?;
143            }
144
145            if *scheme_id == ROOT_SCHEME_ID {
146                write!(f, "root[{child_index}]")?;
147            } else {
148                write!(f, "{scheme_id}[{child_index}]")?;
149            }
150        }
151
152        Ok(())
153    }
154}