1use crate::CodecKind;
8
9#[async_trait::async_trait]
21pub trait CodecDispatcher: Send + Sync {
22 async fn pick(&self, sample: &[u8]) -> CodecKind;
23
24 async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
29 self.pick(sample).await
30 }
31}
32
33#[derive(Debug, Clone, Copy)]
35pub struct AlwaysDispatcher(pub CodecKind);
36
37#[async_trait::async_trait]
38impl CodecDispatcher for AlwaysDispatcher {
39 async fn pick(&self, _sample: &[u8]) -> CodecKind {
40 self.0
41 }
42}
43
44#[derive(Debug, Clone)]
71pub struct SamplingDispatcher {
72 pub default: CodecKind,
73 pub entropy_threshold: f64,
74 pub prefer_gpu: bool,
76 pub gpu_min_bytes: usize,
82 pub prefer_columnar_gpu: bool,
91}
92
93impl SamplingDispatcher {
94 pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
95 pub const MIN_SAMPLE_BYTES: usize = 128;
96 pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;
100
101 pub fn new(default: CodecKind) -> Self {
102 Self {
103 default,
104 entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
105 prefer_gpu: false,
106 gpu_min_bytes: Self::DEFAULT_GPU_MIN_BYTES,
107 prefer_columnar_gpu: false,
108 }
109 }
110
111 #[must_use]
122 pub fn with_columnar_gpu_preference(mut self, on: bool) -> Self {
123 self.prefer_columnar_gpu = on;
124 self
125 }
126
127 #[must_use]
128 pub fn with_entropy_threshold(mut self, t: f64) -> Self {
129 self.entropy_threshold = t;
130 self
131 }
132
133 #[must_use]
139 pub fn with_gpu_preference(mut self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
140 self.prefer_gpu = prefer_gpu;
141 self.gpu_min_bytes = gpu_min_bytes;
142 self
143 }
144}
145
/// Shannon entropy of the byte distribution of `sample`, in bits per byte.
///
/// The result lies in `0.0..=8.0`:
/// - `0.0` for an empty sample or one made of a single repeated byte;
/// - `8.0` when all 256 byte values occur equally often.
///
/// Histogram counters are `usize`, so very large slices cannot overflow them.
/// The previous `u32` counters overflowed once a byte value occurred more than
/// `u32::MAX` times: a panic in debug builds, a silently wrong result in release.
// `usize -> f64` casts only lose precision beyond 2^53 bytes, far past any real sample.
#[allow(clippy::cast_precision_loss)]
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }
    let mut counts = [0usize; 256];
    for &b in sample {
        counts[usize::from(b)] += 1;
    }
    let n = sample.len() as f64;
    // Accumulate by subtraction in byte-value order, matching -Σ p·log2(p).
    counts
        .iter()
        .filter(|&&c| c != 0)
        .fold(0.0, |entropy, &c| {
            let p = c as f64 / n;
            entropy - p * p.log2()
        })
}
166
/// Smallest sample, in bytes, on which the columnar-integer heuristic runs:
/// 64 rows of the widest stride it probes (8 bytes).
const COLUMNAR_MIN_SAMPLE: usize = 8 * 64;
/// Minimum spread, in bits per byte, between the most and least random byte
/// lane for a sample to be classified as a fixed-width integer column.
const COLUMNAR_ENTROPY_GAP: f64 = 4.0;
/// Shannon entropy (bits per byte) of the bytes at offsets `pos`,
/// `pos + stride`, `pos + 2 * stride`, … of `sample`.
///
/// Viewing `sample` as rows of `stride` bytes, this is the entropy of byte
/// column `pos`. Returns `0.0` when no byte is selected (`pos >= sample.len()`).
///
/// Counters are `usize`, so long lanes cannot overflow them as the previous
/// `u32` counters could.
///
/// # Panics
///
/// - Panics if `stride == 0`, in every build profile, via `Iterator::step_by`.
///   Previously a zero stride made release builds loop forever.
/// - Debug builds additionally assert `pos < stride`.
// `usize -> f64` casts only lose precision beyond 2^53 bytes, far past any real sample.
#[allow(clippy::cast_precision_loss)]
fn entropy_at_stride_position(sample: &[u8], stride: usize, pos: usize) -> f64 {
    debug_assert!(pos < stride);
    debug_assert!(stride > 0);
    let mut counts = [0usize; 256];
    let mut n = 0usize;
    for &b in sample.iter().skip(pos).step_by(stride) {
        counts[usize::from(b)] += 1;
        n += 1;
    }
    if n == 0 {
        return 0.0;
    }
    let nf = n as f64;
    counts
        .iter()
        .filter(|&&c| c != 0)
        .fold(0.0, |e, &c| {
            let p = c as f64 / nf;
            e - p * p.log2()
        })
}
208
209fn looks_columnar_integer(sample: &[u8]) -> bool {
218 if sample.len() < COLUMNAR_MIN_SAMPLE {
219 return false;
220 }
221 for &stride in &[4usize, 8usize] {
222 if sample.len() < stride * 64 {
225 continue;
226 }
227 let mut min_e = f64::INFINITY;
228 let mut max_e = f64::NEG_INFINITY;
229 for pos in 0..stride {
230 let e = entropy_at_stride_position(sample, stride, pos);
231 if e < min_e {
232 min_e = e;
233 }
234 if e > max_e {
235 max_e = e;
236 }
237 }
238 if max_e - min_e >= COLUMNAR_ENTROPY_GAP {
239 return true;
240 }
241 }
242 false
243}
244
/// Returns `true` when `sample` begins with the signature of a format that is
/// usually already compressed, so compressing it again is unlikely to pay off.
///
/// Recognised formats:
/// - compressors: gzip, zstd, xz, bzip2;
/// - archives: ZIP, 7-Zip;
/// - images: PNG, JPEG, WebP (RIFF container);
/// - documents: PDF;
/// - media: ISO base media files (`ftyp` box at offset 4), Matroska/WebM (EBML header).
fn looks_already_compressed(sample: &[u8]) -> bool {
    /// Signatures that must appear at offset 0.
    const LEADING_MAGIC: &[&[u8]] = &[
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd frame
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP local file header
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7-Zip
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        b"BZh",                                            // bzip2
        &[0x1a, 0x45, 0xdf, 0xa3],                         // EBML (Matroska / WebM)
    ];

    if LEADING_MAGIC.iter().any(|&magic| sample.starts_with(magic)) {
        return true;
    }
    // ISO base media file format: 4-byte box size, then the `ftyp` box type.
    let is_iso_bmff = sample.get(4..8) == Some(&b"ftyp"[..]);
    // WebP: `RIFF`, a 4-byte chunk size, then `WEBP`.
    let is_webp = sample.starts_with(b"RIFF") && sample.get(8..12) == Some(&b"WEBP"[..]);
    is_iso_bmff || is_webp
}
297
298impl SamplingDispatcher {
299 fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
303 if sample.len() < Self::MIN_SAMPLE_BYTES {
304 return self.default;
305 }
306 if looks_already_compressed(sample) {
307 return CodecKind::Passthrough;
308 }
309 if shannon_entropy(sample) >= self.entropy_threshold {
310 return CodecKind::Passthrough;
311 }
312 self.default
313 }
314
315 fn maybe_promote_to_gpu(
327 &self,
328 chosen: CodecKind,
329 sample: &[u8],
330 total_size: Option<u64>,
331 ) -> CodecKind {
332 if !self.prefer_gpu {
333 return chosen;
334 }
335 if chosen != CodecKind::CpuZstd {
336 return chosen;
337 }
338 let big_enough = match total_size {
339 Some(n) => n >= self.gpu_min_bytes as u64,
340 None => return chosen,
342 };
343 if !big_enough {
344 return chosen;
345 }
346 if self.prefer_columnar_gpu && looks_columnar_integer(sample) {
347 CodecKind::NvcompBitcomp
348 } else {
349 CodecKind::NvcompZstd
350 }
351 }
352}
353
354#[async_trait::async_trait]
355impl CodecDispatcher for SamplingDispatcher {
356 async fn pick(&self, sample: &[u8]) -> CodecKind {
357 self.pick_from_sample(sample)
359 }
360
361 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
362 let chosen = self.pick_from_sample(sample);
363 self.maybe_promote_to_gpu(chosen, sample, total_size)
364 }
365}
366
367#[async_trait::async_trait]
369impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
370 async fn pick(&self, sample: &[u8]) -> CodecKind {
371 (**self).pick(sample).await
372 }
373
374 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
375 (**self).pick_with_size_hint(sample, total_size).await
376 }
377}
378
379#[async_trait::async_trait]
380impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
381 async fn pick(&self, sample: &[u8]) -> CodecKind {
382 (**self).pick(sample).await
383 }
384
385 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
386 (**self).pick_with_size_hint(sample, total_size).await
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain English text (1350 bytes): low entropy, no magic bytes, no byte-lane structure.
    fn text_sample() -> Vec<u8> {
        "the quick brown fox jumps over the lazy dog. "
            .repeat(30)
            .into_bytes()
    }

    /// Deterministic xorshift64 byte stream, used as reproducible "random" data.
    fn xorshift_bytes(seed: u64, len: usize) -> Vec<u8> {
        let mut state = seed;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state & 0xff) as u8
            })
            .collect()
    }

    /// 1024 little-endian `u32` values counting up from zero (4096 bytes).
    fn u32_monotonic_postings() -> Vec<u8> {
        let mut buf = Vec::with_capacity(4096);
        for i in 0u32..1024 {
            buf.extend_from_slice(&i.to_le_bytes());
        }
        buf
    }

    /// 512 little-endian `u64` values: a large base plus a fixed step of 137 (4096 bytes).
    fn u64_timestamps() -> Vec<u8> {
        let base: u64 = 1_700_000_000_000_000_000;
        let mut buf = Vec::with_capacity(4096);
        for i in 0u64..512 {
            buf.extend_from_slice(&(base + i * 137).to_le_bytes());
        }
        buf
    }

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn boxed_dispatcher_forwards_size_hint() {
        let d: Box<dyn CodecDispatcher> = Box::new(
            SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576),
        );
        let kind = d
            .pick_with_size_hint(&text_sample(), Some(2 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn arc_dispatcher_forwards_size_hint() {
        let d: std::sync::Arc<dyn CodecDispatcher> = std::sync::Arc::new(
            SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576),
        );
        let kind = d
            .pick_with_size_hint(&text_sample(), Some(2 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(&text_sample()).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = xorshift_bytes(0xfeed_beef_dead_c0de, 4096);
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0x1f, 0x8b, 0x08];
        payload.extend(std::iter::repeat_n(b'a', 256));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a];
        payload.extend(std::iter::repeat_n(b'b', 256));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_uniform() {
        let zeros = vec![0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    #[test]
    fn stride_entropy_separates_byte_lanes() {
        let buf = u32_monotonic_postings();
        // Lane 0 cycles through all 256 values equally; lane 3 is always zero.
        assert!((entropy_at_stride_position(&buf, 4, 0) - 8.0).abs() < 1e-9);
        assert_eq!(entropy_at_stride_position(&buf, 4, 3), 0.0);
    }

    #[tokio::test]
    async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_keeps_small_object_on_cpu() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(10 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let payload = xorshift_bytes(0xfeed_beef_dead_c0de, 4096);
        let kind = d.pick_with_size_hint(&payload, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn gpu_pref_with_no_size_hint_stays_conservative() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, None).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[test]
    fn columnar_detect_flags_u32_postings() {
        assert!(looks_columnar_integer(&u32_monotonic_postings()));
    }

    #[test]
    fn columnar_detect_flags_u64_timestamps() {
        assert!(looks_columnar_integer(&u64_timestamps()));
    }

    #[test]
    fn columnar_detect_rejects_english_text() {
        let text: Vec<u8> = "the quick brown fox jumps over the lazy dog. "
            .repeat(50)
            .into_bytes();
        assert!(!looks_columnar_integer(&text));
    }

    #[test]
    fn columnar_detect_rejects_random_bytes() {
        let payload = xorshift_bytes(0xa5a5_5a5a_dead_beef, 4096);
        assert!(!looks_columnar_integer(&payload));
    }

    #[test]
    fn columnar_detect_rejects_too_small_sample() {
        // 64 u32 values = 256 bytes, below COLUMNAR_MIN_SAMPLE.
        let mut buf = Vec::with_capacity(256);
        for i in 0u32..64 {
            buf.extend_from_slice(&i.to_le_bytes());
        }
        assert!(!looks_columnar_integer(&buf));
    }

    #[tokio::test]
    async fn gpu_pref_columnar_promotes_postings_to_bitcomp() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompBitcomp);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_promotes_timestamps_to_bitcomp() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = u64_timestamps();
        let kind = d.pick_with_size_hint(&sample, Some(4 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompBitcomp);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_falls_through_to_zstd_on_text() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_off_keeps_postings_on_zstd() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_respects_size_threshold() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[test]
    fn entropy_full_8_for_each_byte_once() {
        // Every byte value appears equally often (four copies of 0..=255).
        let payload = (0..=255u8).collect::<Vec<u8>>().repeat(4);
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }
}