1use crate::CodecKind;
8
9#[async_trait::async_trait]
21pub trait CodecDispatcher: Send + Sync {
22 async fn pick(&self, sample: &[u8]) -> CodecKind;
23
24 async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
29 self.pick(sample).await
30 }
31}
32
33#[derive(Debug, Clone, Copy)]
35pub struct AlwaysDispatcher(pub CodecKind);
36
37#[async_trait::async_trait]
38impl CodecDispatcher for AlwaysDispatcher {
39 async fn pick(&self, _sample: &[u8]) -> CodecKind {
40 self.0
41 }
42}
43
44#[derive(Debug, Clone)]
71pub struct SamplingDispatcher {
72 pub default: CodecKind,
73 pub entropy_threshold: f64,
74 pub prefer_gpu: bool,
76 pub gpu_min_bytes: usize,
82}
83
84impl SamplingDispatcher {
85 pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
86 pub const MIN_SAMPLE_BYTES: usize = 128;
87 pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;
91
92 pub fn new(default: CodecKind) -> Self {
93 Self {
94 default,
95 entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
96 prefer_gpu: false,
97 gpu_min_bytes: Self::DEFAULT_GPU_MIN_BYTES,
98 }
99 }
100
101 #[must_use]
102 pub fn with_entropy_threshold(mut self, t: f64) -> Self {
103 self.entropy_threshold = t;
104 self
105 }
106
107 #[must_use]
113 pub fn with_gpu_preference(mut self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
114 self.prefer_gpu = prefer_gpu;
115 self.gpu_min_bytes = gpu_min_bytes;
116 self
117 }
118}
119
/// Computes the Shannon entropy of `sample`'s byte distribution, in bits per byte.
///
/// The result lies in `0.0..=8.0`. It is `0.0` for an empty sample or one made
/// of a single repeated byte value. It is `8.0` when all 256 byte values occur
/// equally often.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }
    // `usize` counters cannot overflow: no count can exceed `sample.len()`.
    // A narrower counter could wrap (or panic in debug builds) on huge inputs.
    let mut counts = [0usize; 256];
    for &byte in sample {
        counts[usize::from(byte)] += 1;
    }
    let total = sample.len() as f64;
    counts
        .iter()
        .filter(|&&count| count != 0)
        // Counts above 2^53 lose precision in the cast, which is negligible here.
        .map(|&count| count as f64 / total)
        .fold(0.0, |entropy, p| entropy - p * p.log2())
}
140
/// Returns `true` when `sample` starts with the signature of a format whose
/// payload is already compressed.
///
/// Compressing such a payload again would spend CPU for little or no size
/// reduction. Only fixed-offset signatures are checked; anything unrecognised
/// returns `false`. Containers that can hold uncompressed parts (PDF, ZIP with
/// stored entries) are still treated as compressed, as a heuristic.
fn looks_already_compressed(sample: &[u8]) -> bool {
    // Signatures that must appear at offset 0.
    const PREFIX_SIGNATURES: &[&[u8]] = &[
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd frame
        &[0x04, 0x22, 0x4d, 0x18],                         // LZ4 frame
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG
        b"GIF87a",                                         // GIF (LZW)
        b"GIF89a",                                         // GIF (LZW)
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP local header (docx, jar, ...)
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        b"Rar!\x1a\x07",                                   // RAR (v1.5-v4 and v5)
        &[0x1a, 0x45, 0xdf, 0xa3],                         // EBML: Matroska / WebM
    ];

    if PREFIX_SIGNATURES
        .iter()
        .any(|&signature| sample.starts_with(signature))
    {
        return true;
    }
    // bzip2: "BZh" followed by a block-size digit '1'..='9'. Requiring the
    // digit keeps ordinary text that merely begins with "BZh" from matching.
    if matches!(sample, [b'B', b'Z', b'h', b'1'..=b'9', ..]) {
        return true;
    }
    // ISO base media file format (MP4, MOV, HEIF/AVIF, ...): box type "ftyp" at offset 4.
    if sample.get(4..8) == Some(b"ftyp".as_slice()) {
        return true;
    }
    // WebP: RIFF container whose form type at offset 8 is "WEBP"; other RIFF
    // forms (e.g. WAV) are not matched.
    sample.starts_with(b"RIFF") && sample.get(8..12) == Some(b"WEBP".as_slice())
}
193
194impl SamplingDispatcher {
195 fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
199 if sample.len() < Self::MIN_SAMPLE_BYTES {
200 return self.default;
201 }
202 if looks_already_compressed(sample) {
203 return CodecKind::Passthrough;
204 }
205 if shannon_entropy(sample) >= self.entropy_threshold {
206 return CodecKind::Passthrough;
207 }
208 self.default
209 }
210
211 fn maybe_promote_to_gpu(&self, chosen: CodecKind, total_size: Option<u64>) -> CodecKind {
218 if !self.prefer_gpu {
219 return chosen;
220 }
221 if chosen != CodecKind::CpuZstd {
222 return chosen;
223 }
224 match total_size {
225 Some(n) if n >= self.gpu_min_bytes as u64 => CodecKind::NvcompZstd,
226 _ => chosen,
229 }
230 }
231}
232
233#[async_trait::async_trait]
234impl CodecDispatcher for SamplingDispatcher {
235 async fn pick(&self, sample: &[u8]) -> CodecKind {
236 self.pick_from_sample(sample)
238 }
239
240 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
241 let chosen = self.pick_from_sample(sample);
242 self.maybe_promote_to_gpu(chosen, total_size)
243 }
244}
245
246#[async_trait::async_trait]
248impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
249 async fn pick(&self, sample: &[u8]) -> CodecKind {
250 (**self).pick(sample).await
251 }
252
253 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
254 (**self).pick_with_size_hint(sample, total_size).await
255 }
256}
257
258#[async_trait::async_trait]
259impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
260 async fn pick(&self, sample: &[u8]) -> CodecKind {
261 (**self).pick(sample).await
262 }
263
264 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
265 (**self).pick_with_size_hint(sample, total_size).await
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// GPU promotion threshold used by the GPU-preference tests (1 MiB).
    const GPU_MIN_BYTES: usize = 1_048_576;

    /// Repetitive English text: low entropy and longer than `MIN_SAMPLE_BYTES`.
    fn text_sample() -> Vec<u8> {
        "the quick brown fox jumps over the lazy dog. "
            .repeat(30)
            .into_bytes()
    }

    /// Deterministic xorshift64 byte stream with high entropy and no known
    /// file signature at the start.
    fn pseudo_random_bytes(len: usize) -> Vec<u8> {
        let mut state: u64 = 0xfeed_beef_dead_c0de;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                // Keep only the low byte of each step; the truncation is intended.
                (state & 0xff) as u8
            })
            .collect()
    }

    /// `magic` followed by `filler_len` copies of `filler`.
    ///
    /// The result is long enough to be classified and has low entropy, so only
    /// the signature can mark it as compressed.
    fn with_magic(magic: &[u8], filler: u8, filler_len: usize) -> Vec<u8> {
        let mut payload = magic.to_vec();
        payload.extend(std::iter::repeat_n(filler, filler_len));
        payload
    }

    /// Sampling dispatcher defaulting to CPU zstd with GPU promotion enabled.
    fn gpu_dispatcher() -> SamplingDispatcher {
        SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, GPU_MIN_BYTES)
    }

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn always_dispatcher_ignores_size_hint() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        let kind = d.pick_with_size_hint(b"any input", Some(u64::MAX)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn boxed_dispatcher_forwards_size_hint_override() {
        let d: Box<dyn CodecDispatcher> = Box::new(gpu_dispatcher());
        let kind = d
            .pick_with_size_hint(&text_sample(), Some(2 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn arc_dispatcher_works() {
        let d: Arc<dyn CodecDispatcher> = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(&text_sample()).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = pseudo_random_bytes(4096);
        let e = shannon_entropy(&payload);
        // Guard the fixture itself: the generator must produce high-entropy bytes.
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_respects_custom_entropy_threshold() {
        // Text has strictly positive entropy, so a zero cutoff marks it incompressible.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_entropy_threshold(0.0);
        assert_eq!(d.pick(&text_sample()).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = with_magic(&[0x1f, 0x8b, 0x08], b'a', 256);
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_zstd_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = with_magic(&[0x28, 0xb5, 0x2f, 0xfd], b'a', 256);
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_bzip2_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = with_magic(b"BZh9", b'a', 256);
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = with_magic(&[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], b'b', 256);
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_constant_bytes() {
        let zeros = vec![0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    #[test]
    fn entropy_is_8_when_every_byte_value_equally_frequent() {
        // Each of the 256 byte values appears exactly four times.
        let payload: Vec<u8> = (0..=255u8).cycle().take(4 * 256).collect();
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }

    #[tokio::test]
    async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
        let kind = gpu_dispatcher()
            .pick_with_size_hint(&text_sample(), Some(2 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_promotes_at_exact_threshold() {
        let kind = gpu_dispatcher()
            .pick_with_size_hint(&text_sample(), Some(GPU_MIN_BYTES as u64))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_keeps_small_object_on_cpu() {
        let kind = gpu_dispatcher()
            .pick_with_size_hint(&text_sample(), Some(100 * 1024))
            .await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let kind = d
            .pick_with_size_hint(&text_sample(), Some(10 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
        let payload = pseudo_random_bytes(4096);
        let kind = gpu_dispatcher()
            .pick_with_size_hint(&payload, Some(8 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn gpu_pref_with_no_size_hint_stays_conservative() {
        let kind = gpu_dispatcher()
            .pick_with_size_hint(&text_sample(), None)
            .await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }
}