// mlt_core/frames/v01/stream/optimizer.rs
use num_traits::{AsPrimitive as _, PrimInt as _, WrappingSub, Zero as _};
use zigzag::ZigZag;

use crate::MltResult;
use crate::v01::EncodedStreamData;
use crate::v01::stream::IntEncoder;

/// Inputs with at most this many values are profiled in full; for longer
/// inputs it is the lower bound on the sample length.
const MIN_SAMPLE: usize = 512;

/// Upper bound on the number of values profiled for a single input.
const MAX_SAMPLE: usize = 4096;

/// Average run length (sampled values per run of equal adjacent values) at or
/// above which RLE-based encoders are kept as candidates.
const RLE_MIN_AVG_RUN_LENGTH: f64 = 2.0;

/// Number of bits by which the widest delta must be narrower than the widest
/// raw value before delta encoders are considered for unsorted data.
const DELTA_BIT_SAVINGS_THRESHOLD: u8 = 4;
23
/// Statistics collected from a sample of an integer stream, used to decide
/// which encoders are worth trying on the full data.
#[derive(Clone, Debug, Default)]
pub struct DataProfile {
    /// Number of values in the profiled sample (recorded only).
    _sample_len: usize,

    /// Sample length divided by the number of runs of equal adjacent values.
    avg_run_length: f64,

    /// `true` when the sample never decreases or never increases.
    is_sorted: bool,

    /// Bits needed to represent the largest sampled value.
    max_bit_width: u8,

    /// Bits needed to represent the largest zigzag-encoded difference between
    /// adjacent sampled values.
    delta_max_bit_width: u8,
}
62
63impl DataProfile {
64 #[must_use]
66 #[expect(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
67 fn profile<T>(sample: &[T::UInt]) -> Self
68 where
69 T: ZigZag,
70 <T as ZigZag>::UInt: WrappingSub,
71 {
72 if sample.is_empty() {
73 return Self::default();
74 }
75
76 let mut runs: usize = 1;
77 let mut is_sorted_rising = true;
78 let mut is_sorted_falling = true;
79 let mut max_val: T::UInt = sample[0];
80 let mut max_delta: T::UInt = T::UInt::zero();
81 let mut prev = sample[0];
82
83 for &v in &sample[1..] {
84 if v != prev {
85 runs += 1;
86 }
87 if v < prev {
88 is_sorted_rising = false;
89 } else if prev < v {
90 is_sorted_falling = false;
91 }
92 let delta_bits: T::UInt = v.wrapping_sub(&prev);
93 let delta_signed: T = delta_bits.as_();
94 let zz = T::encode(delta_signed);
95 max_delta = max_delta.max(zz);
96 max_val = v.max(max_val);
97 prev = v;
98 }
99
100 Self {
101 _sample_len: sample.len(),
102 avg_run_length: sample.len() as f64 / runs as f64,
103 is_sorted: is_sorted_rising || is_sorted_falling,
104 max_bit_width: (T::zero().leading_zeros() - max_val.leading_zeros()) as u8,
105 delta_max_bit_width: (T::zero().leading_zeros() - max_delta.leading_zeros()) as u8,
106 }
107 }
108
109 #[must_use]
111 pub fn prune_candidates<T>(values: &[T::UInt]) -> Vec<IntEncoder>
112 where
113 T: ZigZag,
114 <T as ZigZag>::UInt: WrappingSub,
115 {
116 if values.is_empty() {
117 return vec![IntEncoder::plain()];
118 }
119
120 let target = sample_size(values.len());
121 let sample = block_sample(values, target);
122
123 let profile = DataProfile::profile::<T>(sample);
124 profile.candidates(T::zero().count_zeros() == 32)
125 }
126
127 pub fn compete_u32(candidates: &[IntEncoder], data: &[u32]) -> IntEncoder {
128 candidates
129 .iter()
130 .copied()
131 .min_by_key(|&enc| encoded_size_u32(data, enc))
132 .unwrap_or_else(IntEncoder::fastpfor)
133 }
134 pub fn compete_u64(candidates: &[IntEncoder], data: &[u64]) -> IntEncoder {
135 candidates
136 .iter()
137 .copied()
138 .min_by_key(|&enc| encoded_size_u64(data, enc))
139 .unwrap_or_else(IntEncoder::varint)
140 }
141
142 #[must_use]
151 fn candidates(&self, fastpfor_is_allowed: bool) -> Vec<IntEncoder> {
152 let mut out = Vec::with_capacity(8);
153
154 if self.delta_is_beneficial() && self.rle_is_viable() {
156 if fastpfor_is_allowed {
157 out.push(IntEncoder::delta_rle_fastpfor());
158 }
159 out.push(IntEncoder::delta_rle_varint());
160 }
161
162 if self.delta_is_beneficial() {
164 if fastpfor_is_allowed {
165 out.push(IntEncoder::delta_fastpfor());
166 }
167 out.push(IntEncoder::delta_varint());
168 }
169
170 if self.rle_is_viable() {
172 if fastpfor_is_allowed {
173 out.push(IntEncoder::rle_fastpfor());
174 }
175 out.push(IntEncoder::rle_varint());
176 }
177
178 if fastpfor_is_allowed {
180 out.push(IntEncoder::fastpfor());
181 }
182 out.push(IntEncoder::varint());
183
184 out
185 }
186
187 #[must_use]
192 fn rle_is_viable(&self) -> bool {
193 self.avg_run_length >= RLE_MIN_AVG_RUN_LENGTH
194 }
195
196 #[must_use]
198 fn delta_is_beneficial(&self) -> bool {
199 let bit_width_saving = self.max_bit_width.saturating_sub(self.delta_max_bit_width);
200 self.is_sorted || bit_width_saving >= DELTA_BIT_SAVINGS_THRESHOLD
201 }
202}
203
/// Returns a contiguous window of `target` elements centred on the middle of
/// `values`, or all of `values` when it has at most `target` elements.
fn block_sample<T: Clone + Copy>(values: &[T], target: usize) -> &[T] {
    let len = values.len();
    if len > target {
        // Align the window's midpoint with the slice's midpoint.
        let start = (len / 2).saturating_sub(target / 2);
        let end = start + target;
        &values[start..end]
    } else {
        values
    }
}
213
214#[inline]
220fn sample_size(len: usize) -> usize {
221 if len <= MIN_SAMPLE {
222 len
223 } else {
224 (len / 100).clamp(MIN_SAMPLE, MAX_SAMPLE)
225 }
226}
227
228fn encoded_size_u32(values: &[u32], encoder: IntEncoder) -> usize {
234 let result: MltResult<_> = (|| {
235 let (physical_u32s, _logical_enc) = encoder.logical.encode_u32s(values)?;
236 let (data, _physical_enc) = encoder.physical.encode_u32s(physical_u32s)?;
237 Ok(data_byte_len(data))
238 })();
239 result.unwrap_or(usize::MAX)
240}
241
242fn encoded_size_u64(values: &[u64], encoder: IntEncoder) -> usize {
243 let result: MltResult<_> = (|| {
244 let (physical_u64s, _logical_enc) = encoder.logical.encode_u64s(values)?;
245 let (data, _physical_enc) = encoder.physical.encode_u64s(physical_u64s)?;
246 Ok(data_byte_len(data))
247 })();
248 result.unwrap_or(usize::MAX)
249}
250
251fn data_byte_len(data: EncodedStreamData) -> usize {
253 match data {
254 EncodedStreamData::VarInt(v) | EncodedStreamData::Encoded(v) => v.len(),
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v01::PhysicalEncoder;

    /// Strictly increasing data has no repeated values, so RLE is pruned.
    #[test]
    fn candidates_rle_excluded_when_short_runs() {
        let data: Vec<u32> = (0..100).collect();
        let candidates = DataProfile::prune_candidates::<i32>(&data);
        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: Delta,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
    }

    /// 64-bit streams must never be offered a FastPFOR physical encoder.
    #[test]
    fn candidates_u64_never_includes_fastpfor() {
        let data: Vec<u64> = (0..200).collect();
        let candidates = DataProfile::prune_candidates::<i64>(&data);
        for enc in &candidates {
            assert_ne!(
                enc.physical,
                PhysicalEncoder::FastPFOR,
                "FastPFOR invalid for u64"
            );
        }

        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u64(&candidates, &data);
        assert_eq!(enc, IntEncoder::delta_varint());
    }

    #[test]
    fn select_u32_sequential_picks_delta() {
        let data: Vec<u32> = (0..1_000).collect();
        let candidates = DataProfile::prune_candidates::<i32>(&data);
        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: Delta,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u32(&candidates, &data);
        assert_eq!(enc, IntEncoder::delta_fastpfor());
    }

    /// Constant data is both sorted and one long run, so every family is kept.
    #[test]
    fn select_u32_constant_picks_rle() {
        let data = vec![1234u32; 500];
        let candidates = DataProfile::prune_candidates::<i32>(&data);
        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: DeltaRle,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: DeltaRle,
                physical: VarInt,
            },
            IntEncoder {
                logical: Delta,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: Rle,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: Rle,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: FastPFOR,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u32(&candidates, &data);
        assert_eq!(enc, IntEncoder::rle_varint());
    }

    #[test]
    fn select_u64_sequential_picks_delta() {
        let data: Vec<u64> = (0u64..500).collect();
        let candidates = DataProfile::prune_candidates::<i64>(&data);
        insta::assert_debug_snapshot!(candidates, @"
        [
            IntEncoder {
                logical: Delta,
                physical: VarInt,
            },
            IntEncoder {
                logical: None,
                physical: VarInt,
            },
        ]
        ");
        let enc = DataProfile::compete_u64(&candidates, &data);
        assert_eq!(enc, IntEncoder::delta_varint());
    }

    /// Empty 32-bit input: pruning yields only `plain`, and the 32-bit
    /// competition returns it.
    #[test]
    fn select_u32_empty_fallback() {
        let candidates = DataProfile::prune_candidates::<i32>(&[]);
        assert_eq!(candidates, vec![IntEncoder::plain()]);
        // Use the 32-bit competition here so the test covers the width it names.
        let enc = DataProfile::compete_u32(&candidates, &[]);
        assert_eq!(enc, IntEncoder::plain());
    }

    /// Empty 64-bit input: pruning yields only `plain`, and the 64-bit
    /// competition returns it.
    #[test]
    fn select_u64_empty_fallback() {
        let candidates = DataProfile::prune_candidates::<i64>(&[]);
        assert_eq!(candidates, vec![IntEncoder::plain()]);
        // Use the 64-bit competition here so the test covers the width it names.
        let enc = DataProfile::compete_u64(&candidates, &[]);
        assert_eq!(enc, IntEncoder::plain());
    }
}