1use crate::CodecKind;
8
/// Strategy for choosing the [`CodecKind`] used to encode a payload.
///
/// Implementations inspect a byte sample and return a codec choice. The trait requires
/// `Send + Sync`, so a dispatcher can be shared between tasks. This module also forwards
/// the trait through `Box<T>` and `Arc<T>`, so `Box<dyn CodecDispatcher>` and
/// `Arc<dyn CodecDispatcher>` are dispatchers themselves.
#[async_trait::async_trait]
pub trait CodecDispatcher: Send + Sync {
    /// Returns the codec to use for the data that `sample` was taken from.
    ///
    /// NOTE(review): `sample` is presumably a prefix of the full payload rather than the
    /// whole payload — confirm against callers.
    async fn pick(&self, sample: &[u8]) -> CodecKind;
}
14
/// Dispatcher that ignores the sample and always returns the wrapped [`CodecKind`].
#[derive(Debug, Clone, Copy)]
pub struct AlwaysDispatcher(pub CodecKind);
18
19#[async_trait::async_trait]
20impl CodecDispatcher for AlwaysDispatcher {
21 async fn pick(&self, _sample: &[u8]) -> CodecKind {
22 self.0
23 }
24}
25
/// Dispatcher that inspects the sample and chooses between `default` and
/// `CodecKind::Passthrough`.
///
/// A sample gets `CodecKind::Passthrough` when both of these hold:
/// - it is at least [`SamplingDispatcher::MIN_SAMPLE_BYTES`] long;
/// - it either starts with a recognised compressed/media file signature, or its byte-level
///   Shannon entropy is at or above `entropy_threshold`.
///
/// Every other sample gets `default`.
#[derive(Debug, Clone)]
pub struct SamplingDispatcher {
    /// Codec returned whenever the sample is not classified as incompressible.
    pub default: CodecKind,
    /// Shannon-entropy cut-off in bits per byte (the measure ranges from 0.0 to 8.0).
    /// Samples scoring at or above it get `CodecKind::Passthrough`.
    pub entropy_threshold: f64,
}
43
44impl SamplingDispatcher {
45 pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
46 pub const MIN_SAMPLE_BYTES: usize = 128;
47
48 pub fn new(default: CodecKind) -> Self {
49 Self {
50 default,
51 entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
52 }
53 }
54
55 #[must_use]
56 pub fn with_entropy_threshold(mut self, t: f64) -> Self {
57 self.entropy_threshold = t;
58 self
59 }
60}
61
/// Shannon entropy of the byte-value distribution of `sample`, in bits per byte.
///
/// The result lies in `0.0..=8.0`:
/// - `0.0` for an empty sample, or for one made of a single repeated byte value;
/// - `8.0` when all 256 byte values occur equally often.
///
/// An `n`-byte sample can never score above `log2(n)`, so short samples read low.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }
    // `usize` counters cannot overflow: a slice holds at most `isize::MAX` elements.
    // Narrower counters would panic (debug) or wrap (release) on multi-GiB samples.
    let mut counts = [0usize; 256];
    for &b in sample {
        counts[usize::from(b)] += 1;
    }
    let n = sample.len() as f64;
    counts
        .iter()
        .filter(|&&c| c != 0)
        .map(|&c| c as f64 / n)
        // Fold from +0.0 by subtraction so a constant sample yields +0.0 (not -0.0).
        .fold(0.0, |entropy, p| entropy - p * p.log2())
}
82
/// Returns `true` if `sample` starts with the signature of a format whose contents are
/// already compressed, or otherwise recognised as not worth recompressing.
///
/// Recognised formats:
/// - archives and compressed streams: gzip, zstd, ZIP, 7z, xz, bzip2;
/// - images and documents: PNG, JPEG, PDF, RIFF/WebP;
/// - media containers: ISO base media (`ftyp` box, e.g. MP4) and EBML (Matroska/WebM).
///
/// Only the leading bytes are examined. A sample too short to hold a given signature
/// simply does not match it.
fn looks_already_compressed(sample: &[u8]) -> bool {
    // Signatures that must appear at the very start of the sample.
    const LEADING_MAGICS: &[&[u8]] = &[
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd frame
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP local file header
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        &[0x1a, 0x45, 0xdf, 0xa3],                         // EBML (Matroska / WebM)
    ];

    if LEADING_MAGICS.iter().any(|&magic| sample.starts_with(magic)) {
        return true;
    }

    // bzip2 stream header: "BZh" followed by a block-size digit '1'..='9'.
    // Requiring the digit stops plain text that merely begins with "BZh" from being
    // treated as compressed.
    let is_bzip2 = matches!(sample, [b'B', b'Z', b'h', b'1'..=b'9', ..]);
    // ISO base media file: a 4-byte box size, then the box type "ftyp".
    let is_iso_bmff = sample.get(4..8) == Some(b"ftyp".as_slice());
    // RIFF container whose form type is "WEBP"; other RIFF forms (e.g. WAVE) do not match.
    let is_webp = sample.starts_with(b"RIFF") && sample.get(8..12) == Some(b"WEBP".as_slice());

    is_bzip2 || is_iso_bmff || is_webp
}
135
136#[async_trait::async_trait]
137impl CodecDispatcher for SamplingDispatcher {
138 async fn pick(&self, sample: &[u8]) -> CodecKind {
139 if sample.len() < Self::MIN_SAMPLE_BYTES {
140 return self.default;
141 }
142 if looks_already_compressed(sample) {
143 return CodecKind::Passthrough;
144 }
145 if shannon_entropy(sample) >= self.entropy_threshold {
146 return CodecKind::Passthrough;
147 }
148 self.default
149 }
150}
151
152#[async_trait::async_trait]
154impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
155 async fn pick(&self, sample: &[u8]) -> CodecKind {
156 (**self).pick(sample).await
157 }
158}
159
160#[async_trait::async_trait]
161impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
162 async fn pick(&self, sample: &[u8]) -> CodecKind {
163 (**self).pick(sample).await
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn arc_dispatcher_works() {
        let d: std::sync::Arc<dyn CodecDispatcher> =
            std::sync::Arc::new(AlwaysDispatcher(CodecKind::CpuZstd));
        assert_eq!(d.pick(b"x").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let text: Vec<u8> = "the quick brown fox jumps over the lazy dog. "
            .repeat(30)
            .into_bytes();
        assert_eq!(d.pick(&text).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // xorshift64 generator: deterministic pseudo-random bytes without extra crates.
        let mut state: u64 = 0xfeed_beef_dead_c0de;
        let mut payload = Vec::with_capacity(4096);
        for _ in 0..4096 {
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            payload.push((state & 0xff) as u8);
        }
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0x1f, 0x8b, 0x08];
        payload.extend(std::iter::repeat_n(b'a', 256));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a];
        payload.extend(std::iter::repeat_n(b'b', 256));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_empty_sample() {
        assert_eq!(shannon_entropy(b""), 0.0);
    }

    #[test]
    fn entropy_zero_for_constant_bytes() {
        let zeros = [0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    #[test]
    fn entropy_is_8_when_all_byte_values_equally_frequent() {
        let one_of_each: Vec<u8> = (0..=255).collect();
        // Every byte value appears exactly four times.
        let payload = one_of_each.repeat(4);
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }
}