vortex_compressor/estimate.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Compression ratio estimation types and sampling-based estimation.
5
6use std::fmt;
7
8use vortex_array::ArrayRef;
9use vortex_array::Canonical;
10use vortex_array::IntoArray;
11use vortex_error::VortexResult;
12
13use crate::CascadingCompressor;
14use crate::ctx::CompressorContext;
15use crate::sample::SAMPLE_SIZE;
16use crate::sample::sample;
17use crate::sample::sample_count_approx_one_percent;
18use crate::scheme::Scheme;
19use crate::stats::ArrayAndStats;
20
21/// Closure type for [`DeferredEstimate::Callback`].
22///
23/// The compressor calls this with the same arguments it would pass to sampling. The closure must
24/// resolve directly to a terminal [`EstimateVerdict`].
25#[rustfmt::skip]
26pub type EstimateFn = dyn FnOnce(
27 &CascadingCompressor,
28 &mut ArrayAndStats,
29 CompressorContext,
30 ) -> VortexResult<EstimateVerdict>
31 + Send
32 + Sync;
33
34/// The result of a [`Scheme`]'s compression ratio estimation.
35///
36/// This type is returned by [`Scheme::expected_compression_ratio`] to tell the compressor how
37/// promising this scheme is for a given array without performing any expensive work.
38///
39/// [`CompressionEstimate::Verdict`] means the scheme already knows the terminal answer.
40/// [`CompressionEstimate::Deferred`] means the compressor must do extra work before the scheme can
41/// produce a terminal answer.
42#[derive(Debug)]
43pub enum CompressionEstimate {
44 /// The scheme already knows the terminal estimation verdict.
45 Verdict(EstimateVerdict),
46
47 /// The compressor must perform deferred work to resolve the terminal estimation verdict.
48 Deferred(DeferredEstimate),
49}
50
51/// The terminal answer to a compression estimate request.
52#[derive(Debug)]
53pub enum EstimateVerdict {
54 /// Do not use this scheme for this array.
55 Skip,
56
57 /// Always use this scheme, as it is definitively the best choice.
58 ///
59 /// Some examples include constant detection, decimal byte parts, and temporal decomposition.
60 ///
61 /// The compressor will select this scheme immediately without evaluating further candidates.
62 /// Schemes that return `AlwaysUse` must be mutually exclusive per canonical type (enforced by
63 /// [`Scheme::matches`]), otherwise the winner depends silently on registration order.
64 ///
65 /// [`Scheme::matches`]: crate::scheme::Scheme::matches
66 AlwaysUse,
67
68 /// The estimated compression ratio. This must be greater than `1.0` to be considered by the
69 /// compressor, otherwise it is worse than the canonical encoding.
70 Ratio(f64),
71}
72
73/// Deferred work that can resolve to a terminal [`EstimateVerdict`].
74pub enum DeferredEstimate {
75 /// The scheme cannot cheaply estimate its ratio, so the compressor should compress a small
76 /// sample to determine effectiveness.
77 Sample,
78
79 /// A fallible estimation requiring a custom expensive computation.
80 ///
81 /// Use this only when the scheme needs to perform trial encoding or other costly checks to
82 /// determine its compression ratio. The callback returns an [`EstimateVerdict`] directly, so
83 /// it cannot request more sampling or another deferred callback.
84 Callback(Box<EstimateFn>),
85}
86
87/// Returns `true` if `ratio` is a valid compression ratio (> 1.0, finite, not subnormal) that
88/// beats the current best.
89pub(super) fn is_better_ratio(ratio: f64, best: &Option<(&'static dyn Scheme, f64)>) -> bool {
90 ratio.is_finite() && !ratio.is_subnormal() && ratio > 1.0 && best.is_none_or(|(_, r)| ratio > r)
91}
92
93/// Estimates compression ratio by compressing a ~1% sample of the data.
94///
95/// Creates a new [`ArrayAndStats`] for the sample so that stats are generated from the sample, not
96/// the full array.
97///
98/// # Errors
99///
100/// Returns an error if sample compression fails.
101pub(super) fn estimate_compression_ratio_with_sampling<S: Scheme + ?Sized>(
102 scheme: &S,
103 compressor: &CascadingCompressor,
104 array: &ArrayRef,
105 ctx: CompressorContext,
106) -> VortexResult<f64> {
107 let sample_array = if ctx.is_sample() {
108 array.clone()
109 } else {
110 let source_len = array.len();
111 let sample_count = sample_count_approx_one_percent(source_len);
112
113 tracing::trace!(
114 "Sampling {} values out of {}",
115 SAMPLE_SIZE as u64 * sample_count as u64,
116 source_len
117 );
118
119 // `ArrayAndStats` expects a canonical array (so that it can easily compute lazy stats).
120 let canonical: Canonical =
121 sample(array, SAMPLE_SIZE, sample_count).execute(&mut compressor.execution_ctx())?;
122 canonical.into_array()
123 };
124
125 let mut sample_data = ArrayAndStats::new(sample_array, scheme.stats_options());
126 let sample_ctx = ctx.with_sampling();
127
128 let after = scheme
129 .compress(compressor, &mut sample_data, sample_ctx)?
130 .nbytes();
131 let before = sample_data.array().nbytes();
132
133 // TODO(connor): Issue https://github.com/vortex-data/vortex/issues/7268.
134 // if after == 0 {
135 // tracing::warn!(
136 // scheme = %scheme.id(),
137 // "sample compressed to 0 bytes, which should only happen for constant arrays",
138 // );
139 // }
140
141 let ratio = before as f64 / after as f64;
142
143 tracing::debug!("estimate_compression_ratio_with_sampling(compressor={scheme:#?}) = {ratio}",);
144
145 Ok(ratio)
146}
147
148impl fmt::Debug for DeferredEstimate {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 match self {
151 DeferredEstimate::Sample => write!(f, "Sample"),
152 DeferredEstimate::Callback(_) => write!(f, "Callback(..)"),
153 }
154 }
155}