Skip to main content

s4_codec/
dispatcher.rs

1//! PUT 時にどの codec で圧縮するかを選ぶ dispatcher。
2//!
3//! Phase 1 では「常に同じ codec を選ぶ」`AlwaysDispatcher` を提供。
4//! Phase 1 後半で `SamplingDispatcher` を追加し、入力先頭の sampling で
5//! integer 主体 / text 主体 / 既圧縮 を判定して codec を切り替える。
6
7use crate::CodecKind;
8
9/// PUT body の先頭 sample から codec を選ぶ trait。
10#[async_trait::async_trait]
11pub trait CodecDispatcher: Send + Sync {
12    async fn pick(&self, sample: &[u8]) -> CodecKind;
13}
14
15/// 常に同じ kind を返す dispatcher (固定 codec 運用)。
16#[derive(Debug, Clone, Copy)]
17pub struct AlwaysDispatcher(pub CodecKind);
18
19#[async_trait::async_trait]
20impl CodecDispatcher for AlwaysDispatcher {
21    async fn pick(&self, _sample: &[u8]) -> CodecKind {
22        self.0
23    }
24}
25
26/// 入力 sample を見て codec を選ぶ dispatcher。
27///
28/// 判定順 (上位優先):
29/// 1. 短すぎる入力 (<128 byte) → `default`
30/// 2. magic bytes が既圧縮フォーマット (gzip / zstd / png / jpeg / mp4 / zip / pdf
31///    / 7z / xz / bzip2) → `Passthrough` (再圧縮しても意味がない)
32/// 3. Shannon entropy が `entropy_threshold` (default 7.5 bits/byte) 以上 → `Passthrough`
33///    (高エントロピー = ほぼランダム = 圧縮余地なし)
34/// 4. それ以外 → `default` (text / log / parquet 数値列等、圧縮余地あり)
35///
36/// Phase 1 では `default = CpuZstd` 想定。Phase 1 後半で integer-column 検出を加え、
37/// `default` 分岐を「数値列なら NvcompBitcomp、そうでなければ CpuZstd」に拡張する。
38#[derive(Debug, Clone)]
39pub struct SamplingDispatcher {
40    pub default: CodecKind,
41    pub entropy_threshold: f64,
42}
43
44impl SamplingDispatcher {
45    pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
46    pub const MIN_SAMPLE_BYTES: usize = 128;
47
48    pub fn new(default: CodecKind) -> Self {
49        Self {
50            default,
51            entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
52        }
53    }
54
55    #[must_use]
56    pub fn with_entropy_threshold(mut self, t: f64) -> Self {
57        self.entropy_threshold = t;
58        self
59    }
60}
61
/// Estimates the Shannon entropy of `sample` in bits per byte.
///
/// The result lies in `0..=8`; an empty sample yields `0.0`.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }

    // Histogram of byte values.
    let mut histogram = [0u32; 256];
    sample
        .iter()
        .for_each(|&byte| histogram[usize::from(byte)] += 1);

    let total = sample.len() as f64;
    // Accumulate -p * log2(p) over the byte values that actually occur,
    // in ascending byte order.
    histogram
        .iter()
        .filter(|&&occurrences| occurrences != 0)
        .map(|&occurrences| f64::from(occurrences) / total)
        .fold(0.0, |bits, p| bits - p * p.log2())
}
82
/// Detects the magic bytes of already-compressed data.
///
/// Returns `true` when `sample` carries one of the known signatures,
/// `false` otherwise (including when the sample is too short to hold one).
fn looks_already_compressed(sample: &[u8]) -> bool {
    // Signatures that must appear at offset 0.
    const LEADING_MAGICS: &[&[u8]] = &[
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG (FF D8 FF)
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP / docx / jar / apk
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        b"BZh",                                            // bzip2
        &[0x1a, 0x45, 0xdf, 0xa3],                         // webm / mkv (EBML)
    ];

    if LEADING_MAGICS.iter().any(|magic| sample.starts_with(magic)) {
        return true;
    }

    // mp4 / m4a / mov (ISO Base Media): bytes 4..8 == "ftyp"
    let iso_base_media = sample.get(4..8) == Some(&b"ftyp"[..]);
    // webp (RIFF .... WEBP): RIFF header with the "WEBP" form type at bytes 8..12
    let webp = sample.starts_with(b"RIFF") && sample.get(8..12) == Some(&b"WEBP"[..]);

    iso_base_media || webp
}
135
136#[async_trait::async_trait]
137impl CodecDispatcher for SamplingDispatcher {
138    async fn pick(&self, sample: &[u8]) -> CodecKind {
139        if sample.len() < Self::MIN_SAMPLE_BYTES {
140            return self.default;
141        }
142        if looks_already_compressed(sample) {
143            return CodecKind::Passthrough;
144        }
145        if shannon_entropy(sample) >= self.entropy_threshold {
146            return CodecKind::Passthrough;
147        }
148        self.default
149    }
150}
151
152/// `Box<dyn CodecDispatcher>` からも `CodecDispatcher` として使えるようにする blanket impl
153#[async_trait::async_trait]
154impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
155    async fn pick(&self, sample: &[u8]) -> CodecKind {
156        (**self).pick(sample).await
157    }
158}
159
160#[async_trait::async_trait]
161impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
162    async fn pick(&self, sample: &[u8]) -> CodecKind {
163        (**self).pick(sample).await
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Sampling dispatcher that falls back to `CpuZstd`; shared by the sampling tests.
    fn zstd_sampler() -> SamplingDispatcher {
        SamplingDispatcher::new(CodecKind::CpuZstd)
    }

    /// `magic` followed by 256 copies of `filler`, which is long enough to get
    /// past the minimum-sample-size check.
    fn with_magic(magic: &[u8], filler: u8) -> Vec<u8> {
        let mut payload = magic.to_vec();
        payload.resize(magic.len() + 256, filler);
        payload
    }

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let picked = AlwaysDispatcher(CodecKind::CpuZstd)
            .pick(b"any input")
            .await;
        assert_eq!(picked, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let boxed: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        let picked = boxed.pick(b"x").await;
        assert_eq!(picked, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let picked = zstd_sampler().pick(b"short").await;
        assert_eq!(picked, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        // Roughly 1 KB of English-looking text (low entropy).
        let text = b"the quick brown fox jumps over the lazy dog. ".repeat(30);
        let picked = zstd_sampler().pick(&text).await;
        assert_eq!(picked, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        // 4096 high-entropy bytes: the low byte of each xorshift state.
        let mut state: u64 = 0xfeed_beef_dead_c0de;
        let payload: Vec<u8> = (0..4096)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state & 0xff) as u8
            })
            .collect();

        // The entropy is expected to exceed the default threshold (7.5).
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );

        let picked = zstd_sampler().pick(&payload).await;
        assert_eq!(picked, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        // gzip magic + DEFLATE method byte
        let payload = with_magic(&[0x1f, 0x8b, 0x08], b'a');
        let picked = zstd_sampler().pick(&payload).await;
        assert_eq!(picked, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let payload = with_magic(&[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], b'b');
        let picked = zstd_sampler().pick(&payload).await;
        assert_eq!(picked, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        // 256 zero bytes with the "ftyp" box type at offset 4.
        let mut payload = [0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        let picked = zstd_sampler().pick(&payload).await;
        assert_eq!(picked, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_uniform() {
        // A single repeated byte value carries no information.
        assert_eq!(shannon_entropy(&[0u8; 1024]), 0.0);
    }

    #[test]
    fn entropy_full_8_for_each_byte_once() {
        // Every byte value equally often -> probability 1/256 each -> 8 bits.
        // The 0..=255 sequence is repeated four times (1024 bytes); repeating
        // it leaves the entropy unchanged.
        let payload: Vec<u8> = (0..=u8::MAX).cycle().take(1024).collect();
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }
}