s4_codec/dispatcher.rs
1//! PUT 時にどの codec で圧縮するかを選ぶ dispatcher。
2//!
3//! Phase 1 では「常に同じ codec を選ぶ」`AlwaysDispatcher` を提供。
4//! Phase 1 後半で `SamplingDispatcher` を追加し、入力先頭の sampling で
5//! integer 主体 / text 主体 / 既圧縮 を判定して codec を切り替える。
6
7use crate::CodecKind;
8
9/// PUT body の先頭 sample から codec を選ぶ trait。
10///
11/// v0.8 #56: 呼び出し側が `Content-Length` を知っている場合 (chunked transfer
12/// でない通常 PUT)、`pick_with_size_hint` 経由で total body size を渡せる。
13/// `SamplingDispatcher` は GPU upload overhead が compress 時間を上回る小オブ
14/// ジェクトで CPU codec を選び、十分大きい (>= `gpu_min_bytes`) ものでだけ
15/// GPU codec へ昇格させる。size hint が `None` (chunked transfer) の場合は
16/// 保守的に CPU 側に倒す。
17///
18/// 既定実装は `pick_with_size_hint(sample, None)` を `pick(sample)` に委譲する
19/// — 既存 implementor は `pick` だけ実装すれば従来通り動く。
20#[async_trait::async_trait]
21pub trait CodecDispatcher: Send + Sync {
22 async fn pick(&self, sample: &[u8]) -> CodecKind;
23
24 /// v0.8 #56: size-hint aware pick. 既定実装は `pick(sample)` に委譲する
25 /// ので、追加情報を活用する dispatcher (`SamplingDispatcher`) のみ override
26 /// すればよい。`total_size = None` は「chunked transfer で content-length
27 /// が無い」ケースを表す。
28 async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
29 self.pick(sample).await
30 }
31}
32
33/// 常に同じ kind を返す dispatcher (固定 codec 運用)。
34#[derive(Debug, Clone, Copy)]
35pub struct AlwaysDispatcher(pub CodecKind);
36
37#[async_trait::async_trait]
38impl CodecDispatcher for AlwaysDispatcher {
39 async fn pick(&self, _sample: &[u8]) -> CodecKind {
40 self.0
41 }
42}
43
44/// 入力 sample を見て codec を選ぶ dispatcher。
45///
46/// 判定順 (上位優先):
47/// 1. 短すぎる入力 (<128 byte) → `default`
48/// 2. magic bytes が既圧縮フォーマット (gzip / zstd / png / jpeg / mp4 / zip / pdf
49/// / 7z / xz / bzip2) → `Passthrough` (再圧縮しても意味がない)
50/// 3. Shannon entropy が `entropy_threshold` (default 7.5 bits/byte) 以上 → `Passthrough`
51/// (高エントロピー = ほぼランダム = 圧縮余地なし)
52/// 4. それ以外 → `default` (text / log / parquet 数値列等、圧縮余地あり)
53///
54/// Phase 1 では `default = CpuZstd` 想定。Phase 1 後半で integer-column 検出を加え、
55/// `default` 分岐を「数値列なら NvcompBitcomp、そうでなければ CpuZstd」に拡張する。
56///
57/// ## v0.8 #56: GPU auto-detect at boot
58///
59/// `with_gpu_preference(true, gpu_min_bytes)` を呼ぶと、boot 時に
60/// `s4_codec::nvcomp::is_gpu_available()` が true を返した場合に限り、
61/// 「default が `CpuZstd` でかつ total size >= `gpu_min_bytes` の object」を
62/// `NvcompZstd` に昇格させる。size hint が `None` (chunked transfer)、
63/// または閾値未満の小オブジェクトでは GPU upload overhead を避けるため
64/// CPU codec のままにする。
65///
66/// `nvcomp-gpu` feature が build-time で off の場合、`NvcompZstd` への昇格は
67/// 行わない (registry に居ない codec を指すと dispatch 時に
68/// `UnregisteredCodec` で fail するため)。orchestrator は main.rs 側で
69/// `prefer_gpu = false` を強制することでこれを担保する。
70#[derive(Debug, Clone)]
71pub struct SamplingDispatcher {
72 pub default: CodecKind,
73 pub entropy_threshold: f64,
74 /// v0.8 #56: when set, route large `CpuZstd` picks through `NvcompZstd`.
75 pub prefer_gpu: bool,
76 /// v0.8 #56: GPU promotion only fires when the caller can prove
77 /// `total_size >= gpu_min_bytes` via `pick_with_size_hint`. Below this
78 /// threshold the GPU upload overhead exceeds the compress time so CPU
79 /// wins; the default 1 MiB is the empirical break-even point on common
80 /// text / log payloads with PCIe 4.0 + an A10G-class GPU.
81 pub gpu_min_bytes: usize,
82 /// v0.8.12 #125: when set, sample-based columnar-integer detection
83 /// promotes a `CpuZstd` pick to `NvcompBitcomp` instead of
84 /// `NvcompZstd` for Parquet / postings / time-series payloads.
85 /// Requires the same `prefer_gpu = true` and
86 /// `total_size >= gpu_min_bytes` preconditions — the columnar
87 /// promotion adds *targeting* on top of the GPU-promotion gate,
88 /// it doesn't loosen it. When `false` (default), large CpuZstd
89 /// picks always go to NvcompZstd, matching v0.8.11 behaviour.
90 pub prefer_columnar_gpu: bool,
91}
92
93impl SamplingDispatcher {
94 pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
95 pub const MIN_SAMPLE_BYTES: usize = 128;
96 /// v0.8 #56: 1 MiB. The empirical break-even point — below this, the
97 /// PCIe upload + kernel launch overhead dominates the GPU's compress
98 /// throughput advantage.
99 pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;
100
101 pub fn new(default: CodecKind) -> Self {
102 Self {
103 default,
104 entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
105 prefer_gpu: false,
106 gpu_min_bytes: Self::DEFAULT_GPU_MIN_BYTES,
107 prefer_columnar_gpu: false,
108 }
109 }
110
111 /// v0.8.12 #125: enable Bitcomp routing for columnar-integer
112 /// payloads. Composes with `with_gpu_preference` — both must be
113 /// on for any promotion to fire, and the columnar branch picks
114 /// `NvcompBitcomp` instead of `NvcompZstd` when the sample
115 /// matches the per-position-entropy signature of a u32 / u64 LE
116 /// integer column (Parquet, postings, time-series). When this
117 /// flag is off (default) the README's "integer/columnar →
118 /// Bitcomp" pitch is honoured manually via `--codec
119 /// nvcomp-bitcomp`; turning it on makes the SamplingDispatcher
120 /// pick Bitcomp automatically.
121 #[must_use]
122 pub fn with_columnar_gpu_preference(mut self, on: bool) -> Self {
123 self.prefer_columnar_gpu = on;
124 self
125 }
126
127 #[must_use]
128 pub fn with_entropy_threshold(mut self, t: f64) -> Self {
129 self.entropy_threshold = t;
130 self
131 }
132
133 /// v0.8 #56: enable GPU promotion. When `prefer_gpu = true`, a `CpuZstd`
134 /// pick on a body whose `total_size >= gpu_min_bytes` is rewritten to
135 /// `NvcompZstd`. Pass `prefer_gpu = false` (the default) to disable.
136 /// The threshold is in bytes; `1_048_576` (1 MiB) is the recommended
137 /// default for PCIe 4.0 hosts.
138 #[must_use]
139 pub fn with_gpu_preference(mut self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
140 self.prefer_gpu = prefer_gpu;
141 self.gpu_min_bytes = gpu_min_bytes;
142 self
143 }
144}
145
/// Estimates the Shannon entropy of `sample` in bits per byte.
///
/// The result lies in `0.0..=8.0`; an empty sample yields `0.0`.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }
    // Histogram of byte values over the whole sample.
    let mut histogram = [0u32; 256];
    sample
        .iter()
        .for_each(|&byte| histogram[usize::from(byte)] += 1);

    let total = sample.len() as f64;
    // H = -Σ p·log2(p), accumulated in byte-value order over the non-empty bins.
    histogram
        .iter()
        .filter(|&&count| count != 0)
        .map(|&count| f64::from(count) / total)
        .fold(0.0_f64, |bits, p| bits - p * p.log2())
}
166
/// v0.8.12 #125: smallest sample for which the columnar-integer signature
/// is considered statistically meaningful. With fewer bytes the
/// per-stride-position byte histogram would mostly be noise. 512 bytes give
/// 128 samples per position at a u32 stride and 64 at a u64 stride.
const COLUMNAR_MIN_SAMPLE: usize = 512;
/// v0.8.12 #125: difference between the highest and lowest
/// per-stride-position entropy at (or above) which a sample is flagged as
/// columnar-integer. Random data has near-uniform per-position entropy
/// (gap ≈ 0); a u32 LE column of bounded values (`value < 2^24`) has full
/// entropy on the low byte and ~0 entropy on the high byte (gap > 6).
/// 4.0 bits is a conservative middle ground meant to catch u32 / u64
/// monotonic-id and timestamp columns without false positives on text or
/// mixed binary records.
const COLUMNAR_ENTROPY_GAP: f64 = 4.0;
/// v0.8.12 #125: Shannon entropy (bits per byte) of the bytes found at
/// offsets `pos`, `pos + stride`, `pos + 2 * stride`, … of `sample`.
///
/// Called once per stride position by `looks_columnar_integer`. Returns
/// `0.0` when no byte lies at any of those offsets. Debug builds assert
/// `pos < stride` and `stride > 0`.
fn entropy_at_stride_position(sample: &[u8], stride: usize, pos: usize) -> f64 {
    debug_assert!(pos < stride);
    debug_assert!(stride > 0);

    // Histogram restricted to the selected stride position.
    let mut histogram = [0u32; 256];
    let mut visited = 0u32;
    let mut cursor = pos;
    while let Some(&byte) = sample.get(cursor) {
        histogram[usize::from(byte)] += 1;
        visited += 1;
        cursor += stride;
    }
    if visited == 0 {
        return 0.0;
    }

    let total = f64::from(visited);
    histogram
        .iter()
        .filter(|&&count| count != 0)
        .map(|&count| f64::from(count) / total)
        .fold(0.0_f64, |bits, p| bits - p * p.log2())
}
208
209/// v0.8.12 #125: detect a u32 / u64 little-endian integer column in
210/// the sample. Returns `true` when one stride's per-position entropy
211/// gap exceeds [`COLUMNAR_ENTROPY_GAP`] — the signature of a column
212/// whose high bytes are mostly zero (bounded ints) while the low
213/// bytes vary freely (counts / timestamps / sorted ids). Conservative
214/// by design: tested against Parquet u32 / u64 columns
215/// (`apache-parquet/test/data/`), pseudo-random bytes, English text,
216/// and DNA reads — only the integer columns trip the gap.
217fn looks_columnar_integer(sample: &[u8]) -> bool {
218 if sample.len() < COLUMNAR_MIN_SAMPLE {
219 return false;
220 }
221 for &stride in &[4usize, 8usize] {
222 // Need ≥ 64 strides for the per-position histogram to be
223 // stable; below that, even random data shows large gaps.
224 if sample.len() < stride * 64 {
225 continue;
226 }
227 let mut min_e = f64::INFINITY;
228 let mut max_e = f64::NEG_INFINITY;
229 for pos in 0..stride {
230 let e = entropy_at_stride_position(sample, stride, pos);
231 if e < min_e {
232 min_e = e;
233 }
234 if e > max_e {
235 max_e = e;
236 }
237 }
238 if max_e - min_e >= COLUMNAR_ENTROPY_GAP {
239 return true;
240 }
241 }
242 false
243}
244
245/// v0.8.15 M-7: confirm that the bytes *after* the magic-byte prefix
246/// look like compressed data (high entropy), not benign text whose
247/// leading 2-3 bytes happen to spell the magic. Returns `true` when
248/// the post-magic window has entropy `>= threshold` (default 7.5).
249/// Operates on `sample[16..]` ── 16 bytes of skip is enough to clear
250/// every magic this dispatcher knows about while leaving plenty of
251/// runway for the entropy estimate to be statistically meaningful.
252/// Returns `true` (skip the check) when the sample is too short to
253/// inspect, so behaviour matches the pre-M-7 path for tiny samples.
254fn post_magic_entropy_high(sample: &[u8], threshold: f64) -> bool {
255 const SKIP: usize = 16;
256 if sample.len() <= SKIP + 32 {
257 return true;
258 }
259 shannon_entropy(&sample[SKIP..]) >= threshold
260}
261
/// Detects the magic bytes of already-compressed formats.
///
/// Returns `true` when `sample` starts with one of the known signatures,
/// or carries `ftyp` at bytes 4..8 (ISO Base Media), or is a RIFF
/// container tagged `WEBP` at bytes 8..12.
fn looks_already_compressed(sample: &[u8]) -> bool {
    // Signatures that sit at offset 0 of the sample.
    const LEADING_MAGICS: [&[u8]; 10] = [
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG (FF D8 FF)
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP / docx / jar / apk
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        b"BZh",                                            // bzip2
        &[0x1a, 0x45, 0xdf, 0xa3],                         // webm / mkv (EBML)
    ];

    if LEADING_MAGICS.iter().any(|&magic| sample.starts_with(magic)) {
        return true;
    }
    // mp4 / m4a / mov (ISO Base Media): bytes 4..8 == "ftyp"
    if sample.get(4..8) == Some(&b"ftyp"[..]) {
        return true;
    }
    // webp (RIFF .... WEBP)
    sample.starts_with(b"RIFF") && sample.get(8..12) == Some(&b"WEBP"[..])
}
314
315impl SamplingDispatcher {
316 /// Core sample-only decision shared by `pick` and `pick_with_size_hint`.
317 /// Returns the pre-GPU-promotion choice; the size-hint-aware caller may
318 /// rewrite a `CpuZstd` result to `NvcompZstd` when the body is big enough.
319 ///
320 /// # Adversarial limitations (v0.8.15 M-6 / M-7)
321 ///
322 /// The sample is just the prefix the listener captured (typically
323 /// the first 4 KiB). An attacker who controls the upload bytes
324 /// can:
325 ///
326 /// - **Trick passthrough into firing** by prefixing a gzip / zstd
327 /// magic and following it with 10 GiB of zeros, costing the
328 /// gateway disk space the operator expected to save. Mitigated
329 /// by requiring the post-magic window to *also* show high
330 /// entropy — real compressed bytes have both, an unscrupulous
331 /// text payload won't.
332 /// - **Trick passthrough into NOT firing** by prefixing 4 KiB of
333 /// zeros to an already-compressed body, costing CPU on a
334 /// useless compress pass. The dispatcher cannot defend against
335 /// this without re-sampling other windows (a v0.8.15 follow-up;
336 /// would require listener-side changes to capture multiple
337 /// windows, not just the prefix).
338 ///
339 /// The sample-only path is "best-effort", not "adversarial".
340 /// Operators who need an adversarial guarantee should set
341 /// `--dispatcher always --codec cpu-zstd` (compress everything)
342 /// or `--codec passthrough` (compress nothing) and bypass the
343 /// sampler entirely.
344 fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
345 if sample.len() < Self::MIN_SAMPLE_BYTES {
346 return self.default;
347 }
348 // v0.8.15 M-7: magic-byte passthrough is only honoured when
349 // the post-magic window *also* exhibits high entropy. A user
350 // log file that happens to start with `BZh` (or any other
351 // 2-3 byte magic by coincidence) won't have a high-entropy
352 // body — those should keep being compressed, not silently
353 // passthrough'd. Real compressed data has both signals.
354 if looks_already_compressed(sample)
355 && post_magic_entropy_high(sample, self.entropy_threshold)
356 {
357 return CodecKind::Passthrough;
358 }
359 if shannon_entropy(sample) >= self.entropy_threshold {
360 return CodecKind::Passthrough;
361 }
362 self.default
363 }
364
365 /// v0.8 #56 / v0.8.12 #125: rewrite a `CpuZstd` pick to a GPU
366 /// codec when GPU preference is on AND the caller proved a total
367 /// body size >= `gpu_min_bytes`. v0.8.12 adds the columnar-integer
368 /// branch: when `prefer_columnar_gpu = true` AND the sample
369 /// matches the per-stride-position entropy signature of a
370 /// u32 / u64 LE integer column, route to `NvcompBitcomp` instead
371 /// of `NvcompZstd`. Passthrough / non-CpuZstd picks are left
372 /// alone — already-compressed bodies don't benefit from GPU
373 /// compression, and other CPU codecs (CpuGzip) imply the
374 /// operator wants wire-compatible output that the nvCOMP codecs
375 /// can't provide.
376 fn maybe_promote_to_gpu(
377 &self,
378 chosen: CodecKind,
379 sample: &[u8],
380 total_size: Option<u64>,
381 ) -> CodecKind {
382 if !self.prefer_gpu {
383 return chosen;
384 }
385 if chosen != CodecKind::CpuZstd {
386 return chosen;
387 }
388 let big_enough = match total_size {
389 Some(n) => n >= self.gpu_min_bytes as u64,
390 // No size hint (chunked transfer) → conservative, keep CpuZstd.
391 None => return chosen,
392 };
393 if !big_enough {
394 return chosen;
395 }
396 if self.prefer_columnar_gpu && looks_columnar_integer(sample) {
397 CodecKind::NvcompBitcomp
398 } else {
399 CodecKind::NvcompZstd
400 }
401 }
402}
403
404#[async_trait::async_trait]
405impl CodecDispatcher for SamplingDispatcher {
406 async fn pick(&self, sample: &[u8]) -> CodecKind {
407 // No size hint available → never promote to GPU.
408 self.pick_from_sample(sample)
409 }
410
411 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
412 let chosen = self.pick_from_sample(sample);
413 self.maybe_promote_to_gpu(chosen, sample, total_size)
414 }
415}
416
417/// `Box<dyn CodecDispatcher>` からも `CodecDispatcher` として使えるようにする blanket impl
418#[async_trait::async_trait]
419impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
420 async fn pick(&self, sample: &[u8]) -> CodecKind {
421 (**self).pick(sample).await
422 }
423
424 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
425 (**self).pick_with_size_hint(sample, total_size).await
426 }
427}
428
429#[async_trait::async_trait]
430impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
431 async fn pick(&self, sample: &[u8]) -> CodecKind {
432 (**self).pick(sample).await
433 }
434
435 async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
436 (**self).pick_with_size_hint(sample, total_size).await
437 }
438}
439
#[cfg(test)]
mod tests {
    use super::*;

    /// Low-entropy English filler used by the text fixtures (45 bytes).
    const SENTENCE: &str = "the quick brown fox jumps over the lazy dog. ";

    // ===========================================================
    // Shared fixtures
    // ===========================================================

    /// Deterministic high-entropy bytes: the low byte of each successive
    /// xorshift (13 / 7 / 17) state, starting from `seed`.
    fn pseudo_random_bytes(seed: u64, len: usize) -> Vec<u8> {
        let mut state = seed;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state & 0xff) as u8
            })
            .collect()
    }

    /// `SENTENCE` repeated `times` times, as bytes.
    fn repeated_sentence(times: usize) -> Vec<u8> {
        SENTENCE.repeat(times).into_bytes()
    }

    /// Roughly 1.3 KiB of low-entropy text (30 repetitions of `SENTENCE`).
    /// The magic-byte and entropy rules both fall through to `default`,
    /// which the v0.8 #56 promotion logic then either keeps as `CpuZstd`
    /// or rewrites to `NvcompZstd`.
    fn text_sample() -> Vec<u8> {
        repeated_sentence(30)
    }

    /// 4 KiB of u32 LE monotonic counts (1024 values; postings / sorted
    /// ids). The low byte cycles through 0..256, the second byte only takes
    /// four values, and the two high bytes stay at 0 — exactly the
    /// per-position-entropy signature `looks_columnar_integer` is built to
    /// catch.
    fn u32_monotonic_postings() -> Vec<u8> {
        let mut buf = Vec::with_capacity(4096);
        for i in 0u32..1024 {
            buf.extend_from_slice(&i.to_le_bytes());
        }
        buf
    }

    /// 4 KiB of u64 LE near-monotonic timestamps: 512 values spaced 137
    /// apart from a fixed Unix-epoch-nanosecond base (stride 8), so only
    /// the low-order bytes change while the high bytes stay constant.
    fn u64_timestamps() -> Vec<u8> {
        let base: u64 = 1_700_000_000_000_000_000;
        let mut buf = Vec::with_capacity(4096);
        for i in 0u64..512 {
            buf.extend_from_slice(&(base + i * 137).to_le_bytes());
        }
        buf
    }

    // ===========================================================
    // Basic dispatch
    // ===========================================================

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // English-like text (low entropy).
        let text = repeated_sentence(30);
        assert_eq!(d.pick(&text).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // 4 KiB of high-entropy, uniform-ish pseudo-random data.
        let payload = pseudo_random_bytes(0xfeed_beef_dead_c0de, 4096);
        // The entropy should exceed the default threshold (7.5).
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // v0.8.15 M-7: the post-magic window must also look like
        // compressed bytes (high entropy) for passthrough to fire, so the
        // body is random-ish rather than a repeated `a`.
        let mut payload = vec![0x1f, 0x8b, 0x08]; // gzip magic + DEFLATE method
        payload.extend(pseudo_random_bytes(0xdead_c0de_feed_beef, 512));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    /// v0.8.15 M-7: a user log file starting with `BZh` followed by
    /// English text (low entropy) MUST NOT trigger passthrough — the
    /// pre-M-7 magic-byte check fired on that prefix alone, silently
    /// skipping compression on customer logs that happened to begin
    /// with bzip2's 3-byte magic.
    #[tokio::test]
    async fn sampling_magic_prefix_but_low_entropy_body_compresses() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = b"BZh just a log line\n".to_vec();
        // Append low-entropy English text to fill the sample window.
        payload.extend(repeated_sentence(20));
        assert_eq!(d.pick(&payload).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // v0.8.15 M-7: real PNG bytes have high entropy after the magic —
        // the pseudo-random fill exercises the "magic + post-magic high
        // entropy" branch.
        let mut payload = vec![0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a];
        payload.extend(pseudo_random_bytes(0xc0de_f00d_dead_face, 512));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // v0.8.15 M-7: same shape — magic at bytes 4..8 plus a
        // high-entropy body afterwards for the post-magic check.
        let mut payload = vec![0u8; 8];
        payload[4..8].copy_from_slice(b"ftyp");
        payload.extend(pseudo_random_bytes(0x1234_5678_dead_beef, 512));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_uniform() {
        let zeros = vec![0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    // ===========================================================
    // v0.8 #56: GPU auto-detect / size-hint promotion
    // ===========================================================

    #[tokio::test]
    async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        // 2 MiB total body — past the 1 MiB threshold → GPU promotion.
        let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_keeps_small_object_on_cpu() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        // 100 KiB total body — under the 1 MiB threshold → GPU upload
        // overhead would exceed compress savings, stay on CPU.
        let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
        // Default — no `with_gpu_preference` call → prefer_gpu = false.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(10 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        // High-entropy pseudo-random payload → the entropy filter wins and
        // returns Passthrough; GPU promotion is skipped because
        // already-compressed data won't compress further on GPU either.
        let payload = pseudo_random_bytes(0xfeed_beef_dead_c0de, 4096);
        let kind = d.pick_with_size_hint(&payload, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn gpu_pref_with_no_size_hint_stays_conservative() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        // Chunked transfer: the caller has no Content-Length, so
        // total_size = None. We can't safely commit to GPU because the
        // body might be tiny — stay on CPU.
        let kind = d.pick_with_size_hint(&sample, None).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    // ===========================================================
    // v0.8.12 #125: columnar-integer detection + Bitcomp routing
    // ===========================================================

    #[test]
    fn columnar_detect_flags_u32_postings() {
        assert!(looks_columnar_integer(&u32_monotonic_postings()));
    }

    #[test]
    fn columnar_detect_flags_u64_timestamps() {
        assert!(looks_columnar_integer(&u64_timestamps()));
    }

    #[test]
    fn columnar_detect_rejects_english_text() {
        let text = repeated_sentence(50);
        // English text has reasonably uniform per-stride-position
        // entropy — no single byte position dominates the entropy.
        assert!(!looks_columnar_integer(&text));
    }

    #[test]
    fn columnar_detect_rejects_random_bytes() {
        let payload = pseudo_random_bytes(0xa5a5_5a5a_dead_beef, 4096);
        assert!(!looks_columnar_integer(&payload));
    }

    #[test]
    fn columnar_detect_rejects_too_small_sample() {
        // 256 bytes < COLUMNAR_MIN_SAMPLE (512) — must short-circuit
        // to `false` so we never flag a tiny request as columnar.
        let mut buf = Vec::with_capacity(256);
        for i in 0u32..64 {
            buf.extend_from_slice(&i.to_le_bytes());
        }
        assert!(!looks_columnar_integer(&buf));
    }

    #[tokio::test]
    async fn gpu_pref_columnar_promotes_postings_to_bitcomp() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompBitcomp);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_promotes_timestamps_to_bitcomp() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = u64_timestamps();
        let kind = d.pick_with_size_hint(&sample, Some(4 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompBitcomp);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_falls_through_to_zstd_on_text() {
        // The columnar detector rejects text → Bitcomp routing is skipped
        // and the existing NvcompZstd promotion (#56) takes over.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_off_keeps_postings_on_zstd() {
        // Default — `with_columnar_gpu_preference` NOT called → the
        // README's "manual `--codec nvcomp-bitcomp`" path is the only
        // way to reach Bitcomp.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(8 * 1024 * 1024)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_respects_size_threshold() {
        // Columnar payload but under the gpu_min_bytes threshold → GPU
        // upload overhead would exceed the compress gain, stay on
        // CpuZstd. The Bitcomp branch must not bypass the size gate.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd)
            .with_gpu_preference(true, 1_048_576)
            .with_columnar_gpu_preference(true);
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[test]
    fn entropy_full_8_for_each_byte_once() {
        // Every byte value 0..=255 exactly once → each has probability
        // 1/256 → entropy = 8 bits.
        let every_byte: Vec<u8> = (0..=255).collect();
        // Replicate to 1024 bytes; the byte distribution (and therefore
        // the entropy) is unchanged by repetition.
        let payload = every_byte.repeat(4);
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }
}