Skip to main content

mlt_core/frames/v01/stream/
optimizer.rs

1use num_traits::{AsPrimitive as _, PrimInt as _, WrappingSub, Zero as _};
2use zigzag::ZigZag;
3
4use crate::MltResult;
5use crate::v01::EncodedStreamData;
6use crate::v01::stream::IntEncoder;
7
/// Lower bound on the sample size chosen for long streams.
///
/// A stream holding at most this many values is analysed in its entirety
/// instead of being sampled.
const MIN_SAMPLE: usize = 512;

/// Upper bound on the sample size chosen for long streams.
const MAX_SAMPLE: usize = 4096;

/// Average run length (values per run) at or above which RLE is worth trying.
const RLE_MIN_AVG_RUN_LENGTH: f64 = 2.0;

/// Minimum number of bits by which zigzag-delta encoding has to narrow the
/// widest value before Delta is tried on a stream that is not sorted.
const DELTA_BIT_SAVINGS_THRESHOLD: u8 = 4;
23
/// Sampling-based encoder selection
///
/// # Strategy
///
/// 1. [`Self::prune_candidates`] - **"Prune"**:
///    Compute lightweight statistics over a representative sample
///    of the data (average run length, sort order, max bit-width) and use
///    them to prune obviously unsuitable candidates early.
/// 2. [`Self::compete_u32`] / [`Self::compete_u64`] - **"Compete"**:
///    Encode the data with every surviving candidate and pick the one whose
///    encoded output is smallest.
///
///    On equal sizes the candidate listed first by the pruning step wins.
///    That list places
///    - `FastPFOR` before `VarInt` within each logical transform, and
///    - logical transforms from most to least complex
///      (DeltaRle, Delta, Rle, None).
///
///    As a result, ties are currently resolved in favour of the *more*
///    complex logical transform.
///    NOTE(review): if decode cost should break ties instead, the logical
///    order of the candidate list has to be reversed — confirm the intent.
#[derive(Debug, Clone, Default)]
pub struct DataProfile {
    /// Number of values in the sample that was analyzed.
    _sample_len: usize,

    /// Average run length in the sample.
    ///
    /// A run is a maximal sequence of identical consecutive values.
    /// `avg_run_length = sample_len / num_runs`.
    avg_run_length: f64,

    /// `true` if the sample values are sorted in ascending or descending order.
    is_sorted: bool,

    /// Maximum number of bits required to represent any value in the sample
    /// (`T::BITS - v.leading_zeros()`).
    max_bit_width: u8,

    /// Maximum bit-width after zigzag-delta encoding.
    ///
    /// A value lower than `max_bit_width` signals that Delta compression will
    /// reduce value magnitudes and therefore benefit downstream integer
    /// encoders.
    delta_max_bit_width: u8,
}
62
63impl DataProfile {
64    /// Profile a `u32` sample in a single pass.
65    #[must_use]
66    #[expect(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
67    fn profile<T>(sample: &[T::UInt]) -> Self
68    where
69        T: ZigZag,
70        <T as ZigZag>::UInt: WrappingSub,
71    {
72        if sample.is_empty() {
73            return Self::default();
74        }
75
76        let mut runs: usize = 1;
77        let mut is_sorted_rising = true;
78        let mut is_sorted_falling = true;
79        let mut max_val: T::UInt = sample[0];
80        let mut max_delta: T::UInt = T::UInt::zero();
81        let mut prev = sample[0];
82
83        for &v in &sample[1..] {
84            if v != prev {
85                runs += 1;
86            }
87            if v < prev {
88                is_sorted_rising = false;
89            } else if prev < v {
90                is_sorted_falling = false;
91            }
92            let delta_bits: T::UInt = v.wrapping_sub(&prev);
93            let delta_signed: T = delta_bits.as_();
94            let zz = T::encode(delta_signed);
95            max_delta = max_delta.max(zz);
96            max_val = v.max(max_val);
97            prev = v;
98        }
99
100        Self {
101            _sample_len: sample.len(),
102            avg_run_length: sample.len() as f64 / runs as f64,
103            is_sorted: is_sorted_rising || is_sorted_falling,
104            max_bit_width: (T::zero().leading_zeros() - max_val.leading_zeros()) as u8,
105            delta_max_bit_width: (T::zero().leading_zeros() - max_delta.leading_zeros()) as u8,
106        }
107    }
108
109    /// Profile a representative sample to prune unsuitable candidates.
110    #[must_use]
111    pub fn prune_candidates<T>(values: &[T::UInt]) -> Vec<IntEncoder>
112    where
113        T: ZigZag,
114        <T as ZigZag>::UInt: WrappingSub,
115    {
116        if values.is_empty() {
117            return vec![IntEncoder::plain()];
118        }
119
120        let target = sample_size(values.len());
121        let sample = block_sample(values, target);
122
123        let profile = DataProfile::profile::<T>(sample);
124        profile.candidates(T::zero().count_zeros() == 32)
125    }
126
127    pub fn compete_u32(candidates: &[IntEncoder], data: &[u32]) -> IntEncoder {
128        candidates
129            .iter()
130            .copied()
131            .min_by_key(|&enc| encoded_size_u32(data, enc))
132            .unwrap_or_else(IntEncoder::fastpfor)
133    }
134    pub fn compete_u64(candidates: &[IntEncoder], data: &[u64]) -> IntEncoder {
135        candidates
136            .iter()
137            .copied()
138            .min_by_key(|&enc| encoded_size_u64(data, enc))
139            .unwrap_or_else(IntEncoder::varint)
140    }
141
142    /// Return the list of `Encoder` variants worth trying for `u32` data given the
143    /// supplied profile.
144    ///
145    /// `FastPFOR` is always preferred over `VarInt`;
146    /// `VarInt` is included as a fallback and for compatibility with gzip-compressed output.
147    ///
148    /// The returned vec is ordered from most- to least-complex so the competition
149    /// loop breaks ties deterministically (first match wins on equal sizes).
150    #[must_use]
151    fn candidates(&self, fastpfor_is_allowed: bool) -> Vec<IntEncoder> {
152        let mut out = Vec::with_capacity(8);
153
154        // DeltaRle – only when both transforms pay off.
155        if self.delta_is_beneficial() && self.rle_is_viable() {
156            if fastpfor_is_allowed {
157                out.push(IntEncoder::delta_rle_fastpfor());
158            }
159            out.push(IntEncoder::delta_rle_varint());
160        }
161
162        // Delta-only.
163        if self.delta_is_beneficial() {
164            if fastpfor_is_allowed {
165                out.push(IntEncoder::delta_fastpfor());
166            }
167            out.push(IntEncoder::delta_varint());
168        }
169
170        // RLE-only (no delta).
171        if self.rle_is_viable() {
172            if fastpfor_is_allowed {
173                out.push(IntEncoder::rle_fastpfor());
174            }
175            out.push(IntEncoder::rle_varint());
176        }
177
178        // Plain FastPFOR / VarInt are always candidates.
179        if fastpfor_is_allowed {
180            out.push(IntEncoder::fastpfor());
181        }
182        out.push(IntEncoder::varint());
183
184        out
185    }
186
187    /// Returns `true` if RLE is a sensible candidate based on this profile.
188    ///
189    /// An average run length above the threshold means values repeat frequently
190    /// enough that the run-length and unique-value arrays will be compact.
191    #[must_use]
192    fn rle_is_viable(&self) -> bool {
193        self.avg_run_length >= RLE_MIN_AVG_RUN_LENGTH
194    }
195
196    /// Returns `true` if Delta encoding is expected to be beneficial.
197    #[must_use]
198    fn delta_is_beneficial(&self) -> bool {
199        let bit_width_saving = self.max_bit_width.saturating_sub(self.delta_max_bit_width);
200        self.is_sorted || bit_width_saving >= DELTA_BIT_SAVINGS_THRESHOLD
201    }
202}
203
/// Return a contiguous window of at most `target` values, centred in `values`.
///
/// A contiguous block rather than a strided sample keeps runs and
/// neighbouring-value deltas intact. This should keep the RLE/Delta
/// statistics measured on the window representative of the full stream.
/// When `values` holds no more than `target` elements, the whole slice is
/// returned.
fn block_sample<T>(values: &[T], target: usize) -> &[T] {
    if values.len() <= target {
        return values;
    }
    // Always the middle of the stream. `start + target <= values.len()`
    // holds here because `target < values.len()`.
    let start = (values.len() / 2).saturating_sub(target / 2);
    &values[start..start + target]
}
213
214/// Compute the target sample size from the full stream length.
215///
216/// - Streams shorter than `MIN_SAMPLE` are sampled fully.
217/// - Larger streams are sampled at ~1 % of their length, clamped to
218///   `[MIN_SAMPLE, MAX_SAMPLE]`.
219#[inline]
220fn sample_size(len: usize) -> usize {
221    if len <= MIN_SAMPLE {
222        len
223    } else {
224        (len / 100).clamp(MIN_SAMPLE, MAX_SAMPLE)
225    }
226}
227
228/// Encode `values` with `encoder` and return the number of bytes in the
229/// physical payload (excluding stream metadata).
230///
231/// Returns `usize::MAX` on error so that a broken candidate is always ranked
232/// last.
233fn encoded_size_u32(values: &[u32], encoder: IntEncoder) -> usize {
234    let result: MltResult<_> = (|| {
235        let (physical_u32s, _logical_enc) = encoder.logical.encode_u32s(values)?;
236        let (data, _physical_enc) = encoder.physical.encode_u32s(physical_u32s)?;
237        Ok(data_byte_len(data))
238    })();
239    result.unwrap_or(usize::MAX)
240}
241
242fn encoded_size_u64(values: &[u64], encoder: IntEncoder) -> usize {
243    let result: MltResult<_> = (|| {
244        let (physical_u64s, _logical_enc) = encoder.logical.encode_u64s(values)?;
245        let (data, _physical_enc) = encoder.physical.encode_u64s(physical_u64s)?;
246        Ok(data_byte_len(data))
247    })();
248    result.unwrap_or(usize::MAX)
249}
250
251/// Return the byte length stored inside an `EncodedStreamData`.
252fn data_byte_len(data: EncodedStreamData) -> usize {
253    match data {
254        EncodedStreamData::VarInt(v) | EncodedStreamData::Encoded(v) => v.len(),
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v01::PhysicalEncoder;

    #[test]
    fn candidates_rle_excluded_when_short_runs() {
        // All-distinct stream → avg_run_length == 1 → no RLE candidate.
        let data: Vec<u32> = (0..100).collect();
        let candidates = DataProfile::prune_candidates::<i32>(&data);
        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: Delta,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
    }

    #[test]
    fn candidates_u64_never_includes_fastpfor() {
        // FastPFOR is a 32-bit-only codec and must never appear for u64 streams.
        let data: Vec<u64> = (0..200).collect();
        let candidates = DataProfile::prune_candidates::<i64>(&data);
        for enc in &candidates {
            assert_ne!(
                enc.physical,
                PhysicalEncoder::FastPFOR,
                "FastPFOR invalid for u64"
            );
        }

        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u64(&candidates, &data);
        assert_eq!(enc, IntEncoder::delta_varint());
    }

    #[test]
    fn select_u32_sequential_picks_delta() {
        let data: Vec<u32> = (0..1_000).collect();
        let enc = DataProfile::prune_candidates::<i32>(&data);
        insta::assert_debug_snapshot!(enc, @"
        [
            IntEncoder {
                logical: Delta,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u32(&enc, &data);
        assert_eq!(enc, IntEncoder::delta_fastpfor());
    }

    #[test]
    fn select_u32_constant_picks_rle() {
        let data = vec![1234u32; 500];
        let enc = DataProfile::prune_candidates::<i32>(&data);
        insta::assert_debug_snapshot!(enc, @"
        [
            IntEncoder {
                logical: DeltaRle,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: DeltaRle,
                physical: VarInt,
            },
            IntEncoder {
                logical: Delta,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: Rle,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Rle,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u32(&enc, &data);
        assert_eq!(enc, IntEncoder::rle_varint());
    }

    #[test]
    fn select_u64_sequential_picks_delta() {
        let data: Vec<u64> = (0u64..500).collect();
        let enc = DataProfile::prune_candidates::<i64>(&data);
        insta::assert_debug_snapshot!(enc, @"
        [
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u64(&enc, &data);
        assert_eq!(enc, IntEncoder::delta_varint());
    }

    #[test]
    fn select_u32_empty_fallback() {
        // Empty u32 input: only the plain encoder is offered, and the u32
        // competition hands it back.
        let enc = DataProfile::prune_candidates::<i32>(&[]);
        assert_eq!(enc, vec![IntEncoder::plain()]);
        let enc = DataProfile::compete_u32(&enc, &[]);
        assert_eq!(enc, IntEncoder::plain());
    }

    #[test]
    fn select_u64_empty_fallback() {
        // Empty u64 input: only the plain encoder is offered, and the u64
        // competition hands it back.
        let enc = DataProfile::prune_candidates::<i64>(&[]);
        assert_eq!(enc, vec![IntEncoder::plain()]);
        let enc = DataProfile::compete_u64(&enc, &[]);
        assert_eq!(enc, IntEncoder::plain());
    }
}