Skip to main content

s4_codec/
dispatcher.rs

1//! PUT 時にどの codec で圧縮するかを選ぶ dispatcher。
2//!
3//! Phase 1 では「常に同じ codec を選ぶ」`AlwaysDispatcher` を提供。
4//! Phase 1 後半で `SamplingDispatcher` を追加し、入力先頭の sampling で
5//! integer 主体 / text 主体 / 既圧縮 を判定して codec を切り替える。
6
7use crate::CodecKind;
8
9/// PUT body の先頭 sample から codec を選ぶ trait。
10///
11/// v0.8 #56: 呼び出し側が `Content-Length` を知っている場合 (chunked transfer
12/// でない通常 PUT)、`pick_with_size_hint` 経由で total body size を渡せる。
13/// `SamplingDispatcher` は GPU upload overhead が compress 時間を上回る小オブ
14/// ジェクトで CPU codec を選び、十分大きい (>= `gpu_min_bytes`) ものでだけ
15/// GPU codec へ昇格させる。size hint が `None` (chunked transfer) の場合は
16/// 保守的に CPU 側に倒す。
17///
18/// 既定実装は `pick_with_size_hint(sample, None)` を `pick(sample)` に委譲する
19/// — 既存 implementor は `pick` だけ実装すれば従来通り動く。
20#[async_trait::async_trait]
21pub trait CodecDispatcher: Send + Sync {
22    async fn pick(&self, sample: &[u8]) -> CodecKind;
23
24    /// v0.8 #56: size-hint aware pick. 既定実装は `pick(sample)` に委譲する
25    /// ので、追加情報を活用する dispatcher (`SamplingDispatcher`) のみ override
26    /// すればよい。`total_size = None` は「chunked transfer で content-length
27    /// が無い」ケースを表す。
28    async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
29        self.pick(sample).await
30    }
31}
32
33/// 常に同じ kind を返す dispatcher (固定 codec 運用)。
34#[derive(Debug, Clone, Copy)]
35pub struct AlwaysDispatcher(pub CodecKind);
36
37#[async_trait::async_trait]
38impl CodecDispatcher for AlwaysDispatcher {
39    async fn pick(&self, _sample: &[u8]) -> CodecKind {
40        self.0
41    }
42}
43
44/// 入力 sample を見て codec を選ぶ dispatcher。
45///
46/// 判定順 (上位優先):
47/// 1. 短すぎる入力 (<128 byte) → `default`
48/// 2. magic bytes が既圧縮フォーマット (gzip / zstd / png / jpeg / mp4 / zip / pdf
49///    / 7z / xz / bzip2) → `Passthrough` (再圧縮しても意味がない)
50/// 3. Shannon entropy が `entropy_threshold` (default 7.5 bits/byte) 以上 → `Passthrough`
51///    (高エントロピー = ほぼランダム = 圧縮余地なし)
52/// 4. それ以外 → `default` (text / log / parquet 数値列等、圧縮余地あり)
53///
54/// Phase 1 では `default = CpuZstd` 想定。Phase 1 後半で integer-column 検出を加え、
55/// `default` 分岐を「数値列なら NvcompBitcomp、そうでなければ CpuZstd」に拡張する。
56///
57/// ## v0.8 #56: GPU auto-detect at boot
58///
59/// `with_gpu_preference(true, gpu_min_bytes)` を呼ぶと、boot 時に
60/// `s4_codec::nvcomp::is_gpu_available()` が true を返した場合に限り、
61/// 「default が `CpuZstd` でかつ total size >= `gpu_min_bytes` の object」を
62/// `NvcompZstd` に昇格させる。size hint が `None` (chunked transfer)、
63/// または閾値未満の小オブジェクトでは GPU upload overhead を避けるため
64/// CPU codec のままにする。
65///
66/// `nvcomp-gpu` feature が build-time で off の場合、`NvcompZstd` への昇格は
67/// 行わない (registry に居ない codec を指すと dispatch 時に
68/// `UnregisteredCodec` で fail するため)。orchestrator は main.rs 側で
69/// `prefer_gpu = false` を強制することでこれを担保する。
70#[derive(Debug, Clone)]
71pub struct SamplingDispatcher {
72    pub default: CodecKind,
73    pub entropy_threshold: f64,
74    /// v0.8 #56: when set, route large `CpuZstd` picks through `NvcompZstd`.
75    pub prefer_gpu: bool,
76    /// v0.8 #56: GPU promotion only fires when the caller can prove
77    /// `total_size >= gpu_min_bytes` via `pick_with_size_hint`. Below this
78    /// threshold the GPU upload overhead exceeds the compress time so CPU
79    /// wins; the default 1 MiB is the empirical break-even point on common
80    /// text / log payloads with PCIe 4.0 + an A10G-class GPU.
81    pub gpu_min_bytes: usize,
82}
83
84impl SamplingDispatcher {
85    pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
86    pub const MIN_SAMPLE_BYTES: usize = 128;
87    /// v0.8 #56: 1 MiB. The empirical break-even point — below this, the
88    /// PCIe upload + kernel launch overhead dominates the GPU's compress
89    /// throughput advantage.
90    pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;
91
92    pub fn new(default: CodecKind) -> Self {
93        Self {
94            default,
95            entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
96            prefer_gpu: false,
97            gpu_min_bytes: Self::DEFAULT_GPU_MIN_BYTES,
98        }
99    }
100
101    #[must_use]
102    pub fn with_entropy_threshold(mut self, t: f64) -> Self {
103        self.entropy_threshold = t;
104        self
105    }
106
107    /// v0.8 #56: enable GPU promotion. When `prefer_gpu = true`, a `CpuZstd`
108    /// pick on a body whose `total_size >= gpu_min_bytes` is rewritten to
109    /// `NvcompZstd`. Pass `prefer_gpu = false` (the default) to disable.
110    /// The threshold is in bytes; `1_048_576` (1 MiB) is the recommended
111    /// default for PCIe 4.0 hosts.
112    #[must_use]
113    pub fn with_gpu_preference(mut self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
114        self.prefer_gpu = prefer_gpu;
115        self.gpu_min_bytes = gpu_min_bytes;
116        self
117    }
118}
119
/// Estimates the Shannon entropy of `sample` in bits per byte.
///
/// The result lies in `0.0..=8.0`; an empty sample yields `0.0`.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }

    // Histogram of how often each byte value occurs.
    let mut histogram = [0u32; 256];
    sample
        .iter()
        .for_each(|&byte| histogram[usize::from(byte)] += 1);

    let total = sample.len() as f64;

    // Accumulate -Σ p·log2(p) over the byte values that actually occur, in
    // ascending byte order; absent values contribute nothing and are skipped
    // to avoid evaluating log2(0).
    histogram
        .iter()
        .filter(|&&occurrences| occurrences != 0)
        .fold(0.0_f64, |bits, &occurrences| {
            let p = f64::from(occurrences) / total;
            bits - p * p.log2()
        })
}
140
/// Detects, from magic bytes alone, whether `sample` starts like a known
/// already-compressed or media container format.
///
/// Recognised formats: gzip, zstd, PNG, JPEG, PDF, ZIP (docx / jar / apk),
/// 7z, xz, bzip2, ISO Base Media (mp4 / m4a / mov), EBML (webm / mkv) and
/// WebP. Returns `true` when one of them matches and `false` otherwise,
/// including for an empty or too-short sample.
///
/// The bzip2 check requires the full four-byte header: `BZh` followed by the
/// block-size digit `'1'..='9'`. Matching on `BZh` alone would misclassify
/// ordinary text that merely begins with those three letters.
fn looks_already_compressed(sample: &[u8]) -> bool {
    // Formats identified purely by a fixed prefix at offset 0.
    const PREFIX_MAGICS: &[&[u8]] = &[
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG (FF D8 FF)
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP / docx / jar / apk
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        &[0x1a, 0x45, 0xdf, 0xa3],                         // webm / mkv (EBML)
    ];

    if PREFIX_MAGICS.iter().any(|magic| sample.starts_with(magic)) {
        return true;
    }

    // bzip2: "BZh" + block-size digit ('1'..='9').
    if matches!(sample, [b'B', b'Z', b'h', b'1'..=b'9', ..]) {
        return true;
    }

    // mp4 / m4a / mov (ISO Base Media): bytes 4..8 == "ftyp".
    // `get` returns `None` for samples shorter than 8 bytes instead of panicking.
    if sample.get(4..8) == Some(&b"ftyp"[..]) {
        return true;
    }

    // webp: "RIFF" <4-byte size> "WEBP".
    sample.starts_with(b"RIFF") && sample.get(8..12) == Some(&b"WEBP"[..])
}
193
194impl SamplingDispatcher {
195    /// Core sample-only decision shared by `pick` and `pick_with_size_hint`.
196    /// Returns the pre-GPU-promotion choice; the size-hint-aware caller may
197    /// rewrite a `CpuZstd` result to `NvcompZstd` when the body is big enough.
198    fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
199        if sample.len() < Self::MIN_SAMPLE_BYTES {
200            return self.default;
201        }
202        if looks_already_compressed(sample) {
203            return CodecKind::Passthrough;
204        }
205        if shannon_entropy(sample) >= self.entropy_threshold {
206            return CodecKind::Passthrough;
207        }
208        self.default
209    }
210
211    /// v0.8 #56: rewrite a `CpuZstd` pick to `NvcompZstd` when (a) GPU
212    /// preference is on, (b) the caller could prove a total body size >=
213    /// `gpu_min_bytes`. Passthrough / non-CpuZstd picks are left alone —
214    /// already-compressed bodies don't benefit from GPU compression, and
215    /// other CPU codecs (CpuGzip) imply the operator wants wire-compatible
216    /// output that NvcompZstd can't provide.
217    fn maybe_promote_to_gpu(&self, chosen: CodecKind, total_size: Option<u64>) -> CodecKind {
218        if !self.prefer_gpu {
219            return chosen;
220        }
221        if chosen != CodecKind::CpuZstd {
222            return chosen;
223        }
224        match total_size {
225            Some(n) if n >= self.gpu_min_bytes as u64 => CodecKind::NvcompZstd,
226            // No size hint (chunked transfer) → conservative, keep CpuZstd.
227            // Below threshold → GPU upload overhead exceeds compress gain.
228            _ => chosen,
229        }
230    }
231}
232
233#[async_trait::async_trait]
234impl CodecDispatcher for SamplingDispatcher {
235    async fn pick(&self, sample: &[u8]) -> CodecKind {
236        // No size hint available → never promote to GPU.
237        self.pick_from_sample(sample)
238    }
239
240    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
241        let chosen = self.pick_from_sample(sample);
242        self.maybe_promote_to_gpu(chosen, total_size)
243    }
244}
245
246/// `Box<dyn CodecDispatcher>` からも `CodecDispatcher` として使えるようにする blanket impl
247#[async_trait::async_trait]
248impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
249    async fn pick(&self, sample: &[u8]) -> CodecKind {
250        (**self).pick(sample).await
251    }
252
253    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
254        (**self).pick_with_size_hint(sample, total_size).await
255    }
256}
257
258#[async_trait::async_trait]
259impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
260    async fn pick(&self, sample: &[u8]) -> CodecKind {
261        (**self).pick(sample).await
262    }
263
264    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
265        (**self).pick_with_size_hint(sample, total_size).await
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// GPU promotion threshold used throughout the v0.8 #56 tests (1 MiB).
    const ONE_MIB: usize = 1_048_576;

    /// Low-entropy English text (a 45-byte sentence repeated 30 times, about
    /// 1.3 KiB). It passes both the magic-byte and the entropy filters, so
    /// the dispatcher falls through to its `default` codec.
    fn text_sample() -> Vec<u8> {
        "the quick brown fox jumps over the lazy dog. "
            .repeat(30)
            .into_bytes()
    }

    /// `len` bytes of deterministic, high-entropy pseudo-random data produced
    /// by an xorshift generator with a fixed seed (low byte of each state).
    fn pseudo_random_bytes(len: usize) -> Vec<u8> {
        let mut state: u64 = 0xfeed_beef_dead_c0de;
        let mut payload = Vec::with_capacity(len);
        for _ in 0..len {
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            payload.push((state & 0xff) as u8);
        }
        payload
    }

    /// Builds a payload that starts with `magic` and is padded with `fill`
    /// bytes so that it exceeds the minimum sample length.
    fn with_magic(magic: &[u8], fill: u8) -> Vec<u8> {
        let mut payload = magic.to_vec();
        payload.extend(std::iter::repeat_n(fill, 256));
        payload
    }

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn arc_dispatcher_works() {
        let d: std::sync::Arc<dyn CodecDispatcher> =
            std::sync::Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn boxed_dispatcher_forwards_size_hint() {
        // The Box impl must forward `pick_with_size_hint` to the inner
        // dispatcher rather than fall back to the trait's default, which
        // would drop the hint and never promote.
        let d: Box<dyn CodecDispatcher> =
            Box::new(SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB));
        let kind = d
            .pick_with_size_hint(&text_sample(), Some(2 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // Low-entropy English-like text.
        assert_eq!(d.pick(&text_sample()).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // 4 KiB of high-entropy pseudo-random data.
        let payload = pseudo_random_bytes(4096);
        // Its entropy should be above the default threshold (7.5).
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // gzip magic + DEFLATE method byte.
        let payload = with_magic(&[0x1f, 0x8b, 0x08], b'a');
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = with_magic(&[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], b'b');
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_jpeg_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = with_magic(&[0xff, 0xd8, 0xff, 0xe0], b'c');
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_webp_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[..4].copy_from_slice(b"RIFF");
        payload[8..12].copy_from_slice(b"WEBP");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_uniform() {
        let zeros = vec![0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    #[test]
    fn entropy_zero_for_empty_sample() {
        assert_eq!(shannon_entropy(&[]), 0.0);
    }

    #[test]
    fn entropy_full_8_for_each_byte_once() {
        // Every byte value equally often → probability 1/256 each → 8 bits.
        // The 256 distinct values are repeated four times (1024 bytes), which
        // leaves the distribution, and therefore the entropy, unchanged.
        let payload: Vec<u8> = (0..=255u8).cycle().take(1024).collect();
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }

    // ===========================================================
    // v0.8 #56: GPU auto-detect / size-hint promotion
    // ===========================================================

    #[tokio::test]
    async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB);
        let sample = text_sample();
        // 2 MiB total body — past the 1 MiB threshold → GPU promotion.
        let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_threshold_is_inclusive() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB);
        let sample = text_sample();
        // Exactly at the threshold → promoted.
        let at = d.pick_with_size_hint(&sample, Some(ONE_MIB as u64)).await;
        assert_eq!(at, CodecKind::NvcompZstd);
        // One byte below the threshold → stays on CPU.
        let below = d
            .pick_with_size_hint(&sample, Some(ONE_MIB as u64 - 1))
            .await;
        assert_eq!(below, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_keeps_small_object_on_cpu() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB);
        let sample = text_sample();
        // 100 KiB total body — under the 1 MiB threshold → GPU upload
        // overhead would exceed compress savings, stay on CPU.
        let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
        // Default — no `with_gpu_preference` call → prefer_gpu = false.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(10 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB);
        // High-entropy pseudo-random payload → entropy filter wins,
        // returns Passthrough; GPU promotion is skipped because
        // already-compressed data won't compress further on GPU either.
        let payload = pseudo_random_bytes(4096);
        let kind = d.pick_with_size_hint(&payload, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn gpu_pref_with_no_size_hint_stays_conservative() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB);
        let sample = text_sample();
        // Chunked transfer: caller has no Content-Length, so total_size =
        // None. We can't safely commit to GPU because the body might be
        // tiny — stay on CPU.
        let kind = d.pick_with_size_hint(&sample, None).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_plain_pick_never_promotes() {
        // `pick` carries no size information, so even with GPU preference
        // enabled it must return the CPU codec.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, ONE_MIB);
        assert_eq!(d.pick(&text_sample()).await, CodecKind::CpuZstd);
    }
}
435}