vortex_compressor/estimate.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Compression ratio estimation types and sampling-based estimation.
5
6use std::fmt;
7
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_error::VortexResult;
13
14use crate::CascadingCompressor;
15use crate::ctx::CompressorContext;
16use crate::sample::SAMPLE_SIZE;
17use crate::sample::sample;
18use crate::sample::sample_count_approx_one_percent;
19use crate::scheme::Scheme;
20use crate::scheme::SchemeExt;
21use crate::stats::ArrayAndStats;
22use crate::trace;
23
24/// Closure type for [`DeferredEstimate::Callback`].
25///
26/// The compressor calls this with the same arguments it would pass to sampling, plus the best
27/// [`EstimateScore`] observed so far (if any). The closure must resolve directly to a terminal
28/// [`EstimateVerdict`].
29///
30/// The `best_so_far` threshold is an early-exit hint. If your scheme's maximum achievable
31/// compression ratio is not strictly greater than
32/// `best_so_far.and_then(EstimateScore::finite_ratio)`, you should return
33/// [`EstimateVerdict::Skip`]. Returning a ratio equal to the threshold is permitted but will
34/// lose to the prior best due to strict `>` tie-breaking in the selector. Use the threshold
35/// only as an early-exit hint, never to perform additional work.
36#[rustfmt::skip]
37pub type EstimateFn = dyn FnOnce(
38 &CascadingCompressor,
39 &ArrayAndStats,
40 Option<EstimateScore>,
41 CompressorContext,
42 &mut ExecutionCtx,
43 ) -> VortexResult<EstimateVerdict>
44 + Send
45 + Sync;
46
47/// The result of a [`Scheme`]'s compression ratio estimation.
48///
49/// This type is returned by [`Scheme::expected_compression_ratio`] to tell the compressor how
50/// promising this scheme is for a given array without performing any expensive work.
51///
52/// [`CompressionEstimate::Verdict`] means the scheme already knows the terminal answer.
53/// [`CompressionEstimate::Deferred`] means the compressor must do extra work before the scheme can
54/// produce a terminal answer.
55#[derive(Debug)]
56pub enum CompressionEstimate {
57 /// The scheme already knows the terminal estimation verdict.
58 Verdict(EstimateVerdict),
59
60 /// The compressor must perform deferred work to resolve the terminal estimation verdict.
61 Deferred(DeferredEstimate),
62}
63
/// The terminal answer to a compression estimate request.
#[derive(Debug)]
pub enum EstimateVerdict {
    /// Do not use this scheme for this array.
    Skip,

    /// Always use this scheme, as it is definitively the best choice.
    ///
    /// Some examples include constant detection, decimal byte parts, and temporal decomposition.
    ///
    /// The compressor will select this scheme immediately without evaluating further candidates.
    /// Schemes that return `AlwaysUse` must be mutually exclusive per canonical type (enforced by
    /// [`Scheme::matches`]), otherwise the winner depends silently on registration order.
    ///
    /// [`Scheme::matches`]: crate::scheme::Scheme::matches
    AlwaysUse,

    /// The estimated compression ratio; higher is better.
    ///
    /// Only ratios strictly greater than `1.0` are considered by the compressor. A ratio of
    /// `1.0` or less means the scheme is no better than the canonical encoding. A ratio that
    /// merely ties the current best does not replace it, because the selector compares with a
    /// strict `>`.
    Ratio(f64),
}
85
86/// Deferred work that can resolve to a terminal [`EstimateVerdict`].
87pub enum DeferredEstimate {
88 /// The scheme cannot cheaply estimate its ratio, so the compressor should compress a small
89 /// sample to determine effectiveness.
90 Sample,
91
92 /// A fallible estimation requiring a custom expensive computation.
93 ///
94 /// Use this only when the scheme needs to perform trial encoding or other costly checks to
95 /// determine its compression ratio. The callback returns an [`EstimateVerdict`] directly, so
96 /// it cannot request more sampling or another deferred callback.
97 ///
98 /// The compressor evaluates all immediate [`CompressionEstimate::Verdict`] results before
99 /// invoking any deferred callback, and passes the best [`EstimateScore`] observed so far to
100 /// the callback. This lets the callback return [`EstimateVerdict::Skip`] without performing
101 /// expensive work when its maximum achievable ratio cannot beat the current best. See
102 /// [`EstimateFn`] for the full contract.
103 Callback(Box<EstimateFn>),
104}
105
/// Ranking value used to compare compression candidates that did not return a terminal verdict.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EstimateScore {
    /// A finite compression ratio; larger values mean smaller output and are preferred.
    FiniteCompression(f64),
    /// The trial compression emitted zero bytes.
    ///
    /// No finite ratio exists in this case, so this score is never eligible for scheme
    /// selection.
    ///
    /// TODO(connor): A zero-byte sample is usually the sampler landing on an all-null sample.
    /// Refine this so genuine zero-byte wins can be told apart from sampling artifacts.
    ZeroBytes,
}
119
120impl EstimateScore {
121 /// Converts measured sample sizes into a ranked estimate.
122 pub(super) fn from_sample_sizes(before_nbytes: u64, after_nbytes: u64) -> Self {
123 if after_nbytes == 0 {
124 Self::ZeroBytes
125 } else {
126 Self::FiniteCompression(before_nbytes as f64 / after_nbytes as f64)
127 }
128 }
129
130 /// Returns the finite compression ratio, or [`None`] for the zero-byte special case.
131 ///
132 /// Callers comparing a scheme's maximum achievable ratio against a "best so far" threshold
133 /// should use this to extract a numeric value from an [`EstimateScore`].
134 pub fn finite_ratio(self) -> Option<f64> {
135 match self {
136 Self::FiniteCompression(ratio) => Some(ratio),
137 Self::ZeroBytes => None,
138 }
139 }
140
141 /// Returns whether this estimate is eligible to compete.
142 fn is_valid(self) -> bool {
143 match self {
144 Self::FiniteCompression(ratio) => {
145 ratio.is_finite() && !ratio.is_subnormal() && ratio > 1.0
146 }
147 Self::ZeroBytes => false,
148 }
149 }
150
151 /// Returns whether this estimate beats another valid estimate.
152 fn beats(self, other: Self) -> bool {
153 match (self, other) {
154 (Self::ZeroBytes, _) => false,
155 (Self::FiniteCompression(_), Self::ZeroBytes) => true,
156 (Self::FiniteCompression(ratio), Self::FiniteCompression(best_ratio)) => {
157 ratio > best_ratio
158 }
159 }
160 }
161}
162
163/// Winner estimate carried from scheme selection into result tracing.
164#[derive(Debug, Clone, Copy, PartialEq)]
165pub(super) enum WinnerEstimate {
166 /// The scheme must be used immediately.
167 AlwaysUse,
168 /// The scheme won by a ranked estimate.
169 Score(EstimateScore),
170}
171
172impl WinnerEstimate {
173 /// Returns the traceable numeric ratio for the winning estimate.
174 pub(super) fn trace_ratio(self) -> Option<f64> {
175 match self {
176 Self::AlwaysUse => None,
177 Self::Score(score) => score.finite_ratio(),
178 }
179 }
180}
181
182/// Returns `true` if `score` beats the current best estimate.
183pub(super) fn is_better_score(
184 score: EstimateScore,
185 best: Option<&(&'static dyn Scheme, EstimateScore)>,
186) -> bool {
187 score.is_valid() && best.is_none_or(|(_, best_score)| score.beats(*best_score))
188}
189
190/// Estimates compression ratio by compressing a ~1% sample of the data.
191///
192/// Creates a new [`ArrayAndStats`] for the sample so that stats are generated from the sample, not
193/// the full array.
194///
195/// # Errors
196///
197/// Returns an error if sample compression fails.
198pub(super) fn estimate_compression_ratio_with_sampling<S: Scheme + ?Sized>(
199 compressor: &CascadingCompressor,
200 scheme: &S,
201 array: &ArrayRef,
202 compress_ctx: CompressorContext,
203 exec_ctx: &mut ExecutionCtx,
204) -> VortexResult<EstimateScore> {
205 let sample_array = if compress_ctx.is_sample() {
206 array.clone()
207 } else {
208 let sample_count = sample_count_approx_one_percent(array.len());
209 // `ArrayAndStats` expects a canonical array (so that it can easily compute lazy stats).
210 let canonical: Canonical = sample(array, SAMPLE_SIZE, sample_count).execute(exec_ctx)?;
211 canonical.into_array()
212 };
213
214 let sample_data = ArrayAndStats::new(sample_array, scheme.stats_options());
215 let error_ctx = trace::enabled_error_context(&compress_ctx);
216 let sample_ctx = compress_ctx.with_sampling();
217
218 let compressed = match scheme.compress(compressor, &sample_data, sample_ctx, exec_ctx) {
219 Ok(compressed) => compressed,
220 Err(err) => {
221 trace::sample_compress_failed(scheme.id(), error_ctx.as_ref(), &err);
222 return Err(err);
223 }
224 };
225
226 let after = compressed.nbytes();
227 let before = sample_data.array().nbytes();
228
229 let score = EstimateScore::from_sample_sizes(before, after);
230
231 if matches!(score, EstimateScore::ZeroBytes) {
232 trace::zero_byte_sample_result(scheme.id(), before);
233 }
234
235 Ok(score)
236}
237
238impl fmt::Debug for DeferredEstimate {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 match self {
241 DeferredEstimate::Sample => write!(f, "Sample"),
242 DeferredEstimate::Callback(_) => write!(f, "Callback(..)"),
243 }
244 }
245}