Skip to main content

s4_codec/
dispatcher.rs

1//! PUT 時にどの codec で圧縮するかを選ぶ dispatcher。
2//!
3//! Phase 1 では「常に同じ codec を選ぶ」`AlwaysDispatcher` を提供。
4//! Phase 1 後半で `SamplingDispatcher` を追加し、入力先頭の sampling で
5//! integer 主体 / text 主体 / 既圧縮 を判定して codec を切り替える。
6
7use crate::CodecKind;
8
/// Chooses a codec for a PUT body from a sample of its leading bytes.
///
/// v0.8 #56: when the caller knows the `Content-Length` (a regular,
/// non-chunked PUT) it can pass the total body size through
/// `pick_with_size_hint`. `SamplingDispatcher` uses that hint to keep small
/// objects — where the GPU upload overhead exceeds the compress time — on a
/// CPU codec, and promotes only sufficiently large ones
/// (>= `gpu_min_bytes`) to a GPU codec. With a `None` size hint (chunked
/// transfer) it conservatively stays on the CPU side.
///
/// The default `pick_with_size_hint` ignores the size hint and delegates to
/// `pick(sample)`, so existing implementors only need to implement `pick`
/// and keep working as before.
#[async_trait::async_trait]
pub trait CodecDispatcher: Send + Sync {
    /// Returns the codec to use for a body whose leading bytes are `sample`.
    async fn pick(&self, sample: &[u8]) -> CodecKind;

    /// v0.8 #56: size-hint aware pick. The default implementation delegates
    /// to `pick(sample)`, so only dispatchers that can use the extra
    /// information (`SamplingDispatcher`) need to override it.
    /// `total_size = None` represents the "chunked transfer, no
    /// content-length" case.
    async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
        self.pick(sample).await
    }
}
32
/// Dispatcher that always returns the same kind (fixed-codec operation).
///
/// The wrapped `CodecKind` is the value handed back by every `pick` call.
#[derive(Debug, Clone, Copy)]
pub struct AlwaysDispatcher(pub CodecKind);
36
37#[async_trait::async_trait]
38impl CodecDispatcher for AlwaysDispatcher {
39    async fn pick(&self, _sample: &[u8]) -> CodecKind {
40        self.0
41    }
42}
43
/// Dispatcher that inspects the input sample to choose a codec.
///
/// Decision order (first match wins):
/// 1. Input too short (< 128 bytes) → `default`
/// 2. Magic bytes of an already-compressed format (gzip / zstd / png / jpeg /
///    mp4 / zip / pdf / 7z / xz / bzip2 / webm / mkv / webp) → `Passthrough`
///    (recompressing is pointless)
/// 3. Shannon entropy at or above `entropy_threshold` (default 7.5 bits/byte)
///    → `Passthrough` (high entropy = close to random = nothing left to gain)
/// 4. Anything else → `default` (text / logs / Parquet numeric columns etc.,
///    which still compress)
///
/// Phase 1 assumes `default = CpuZstd`. The plan for later in Phase 1 is to
/// add integer-column detection and extend the `default` branch to
/// "`NvcompBitcomp` for numeric columns, `CpuZstd` otherwise".
///
/// ## v0.8 #56: GPU auto-detect at boot
///
/// After `with_gpu_preference(true, gpu_min_bytes)`, and only when
/// `s4_codec::nvcomp::is_gpu_available()` returned true at boot, objects
/// whose default is `CpuZstd` and whose total size is >= `gpu_min_bytes` are
/// promoted to `NvcompZstd`. When the size hint is `None` (chunked
/// transfer), or the object is below the threshold, the CPU codec is kept to
/// avoid the GPU upload overhead.
///
/// NOTE(review): this type never calls `is_gpu_available()` itself —
/// presumably the caller performs that check before passing
/// `prefer_gpu = true`; confirm against the orchestrator.
///
/// When the `nvcomp-gpu` feature is off at build time, no promotion to
/// `NvcompZstd` takes place (pointing at a codec that is not in the registry
/// would fail at dispatch time with `UnregisteredCodec`). The orchestrator
/// guarantees this by forcing `prefer_gpu = false` on the main.rs side.
#[derive(Debug, Clone)]
pub struct SamplingDispatcher {
    /// Codec returned when the sample is too short or looks compressible.
    pub default: CodecKind,
    /// Shannon-entropy cut-off (bits per byte) at or above which the sample
    /// is treated as incompressible and routed to `Passthrough`.
    pub entropy_threshold: f64,
    /// v0.8 #56: when set, route large `CpuZstd` picks through `NvcompZstd`.
    pub prefer_gpu: bool,
    /// v0.8 #56: GPU promotion only fires when the caller can prove
    /// `total_size >= gpu_min_bytes` via `pick_with_size_hint`. Below this
    /// threshold the GPU upload overhead exceeds the compress time so CPU
    /// wins; the default 1 MiB is the empirical break-even point on common
    /// text / log payloads with PCIe 4.0 + an A10G-class GPU.
    pub gpu_min_bytes: usize,
    /// v0.8.12 #125: when set, sample-based columnar-integer detection
    /// promotes a `CpuZstd` pick to `NvcompBitcomp` instead of
    /// `NvcompZstd` for Parquet / postings / time-series payloads.
    /// Requires the same `prefer_gpu = true` and
    /// `total_size >= gpu_min_bytes` preconditions — the columnar
    /// promotion adds *targeting* on top of the GPU-promotion gate,
    /// it doesn't loosen it. When `false` (default), large CpuZstd
    /// picks always go to NvcompZstd, matching v0.8.11 behaviour.
    pub prefer_columnar_gpu: bool,
}
92
93impl SamplingDispatcher {
94    pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
95    pub const MIN_SAMPLE_BYTES: usize = 128;
96    /// v0.8 #56: 1 MiB. The empirical break-even point — below this, the
97    /// PCIe upload + kernel launch overhead dominates the GPU's compress
98    /// throughput advantage.
99    pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;
100
101    pub fn new(default: CodecKind) -> Self {
102        Self {
103            default,
104            entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
105            prefer_gpu: false,
106            gpu_min_bytes: Self::DEFAULT_GPU_MIN_BYTES,
107            prefer_columnar_gpu: false,
108        }
109    }
110
111    /// v0.8.12 #125: enable Bitcomp routing for columnar-integer
112    /// payloads. Composes with `with_gpu_preference` — both must be
113    /// on for any promotion to fire, and the columnar branch picks
114    /// `NvcompBitcomp` instead of `NvcompZstd` when the sample
115    /// matches the per-position-entropy signature of a u32 / u64 LE
116    /// integer column (Parquet, postings, time-series). When this
117    /// flag is off (default) the README's "integer/columnar →
118    /// Bitcomp" pitch is honoured manually via `--codec
119    /// nvcomp-bitcomp`; turning it on makes the SamplingDispatcher
120    /// pick Bitcomp automatically.
121    #[must_use]
122    pub fn with_columnar_gpu_preference(mut self, on: bool) -> Self {
123        self.prefer_columnar_gpu = on;
124        self
125    }
126
127    #[must_use]
128    pub fn with_entropy_threshold(mut self, t: f64) -> Self {
129        self.entropy_threshold = t;
130        self
131    }
132
133    /// v0.8 #56: enable GPU promotion. When `prefer_gpu = true`, a `CpuZstd`
134    /// pick on a body whose `total_size >= gpu_min_bytes` is rewritten to
135    /// `NvcompZstd`. Pass `prefer_gpu = false` (the default) to disable.
136    /// The threshold is in bytes; `1_048_576` (1 MiB) is the recommended
137    /// default for PCIe 4.0 hosts.
138    #[must_use]
139    pub fn with_gpu_preference(mut self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
140        self.prefer_gpu = prefer_gpu;
141        self.gpu_min_bytes = gpu_min_bytes;
142        self
143    }
144}
145
/// Estimates the Shannon entropy of `sample` in bits per byte.
///
/// The result lies in `0.0..=8.0`; an empty sample yields `0.0`.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }

    // Byte-value histogram over the whole sample.
    let mut histogram = [0u32; 256];
    sample
        .iter()
        .for_each(|&byte| histogram[usize::from(byte)] += 1);

    let total = sample.len() as f64;
    // Accumulate -p * log2(p) over the byte values that actually occur,
    // walking the histogram in ascending byte order.
    histogram
        .iter()
        .filter(|&&count| count != 0)
        .fold(0.0_f64, |bits, &count| {
            let p = f64::from(count) / total;
            bits - p * p.log2()
        })
}
166
/// v0.8.12 #125: smallest sample for which the columnar-integer signature
/// is statistically meaningful; anything shorter would read noise into the
/// per-stride-position byte histogram. The value is 512 bytes: 64 samples
/// per position at the u64 stride (8 bytes), which is 128 samples per
/// position at the u32 stride.
const COLUMNAR_MIN_SAMPLE: usize = 64 * 8;
/// v0.8.12 #125: gap between the highest and lowest per-stride-position
/// entropy that marks a sample as columnar-integer. Random data has
/// near-uniform per-position entropy (gap ≈ 0), whereas a u32 LE column of
/// bounded values (`value < 2^24`) has full entropy on the low byte and ~0
/// on the high byte (gap > 6). 4.0 bits is a conservative middle ground:
/// it catches u32 / u64 monotonic-id and timestamp columns without
/// false-positives on text or mixed binary records.
const COLUMNAR_ENTROPY_GAP: f64 = 4.0;
/// v0.8.12 #125: Shannon entropy (bits per byte) of the bytes found at
/// offsets `pos`, `pos + stride`, `pos + 2 * stride`, … of `sample`.
///
/// Used once per stride position by `looks_columnar_integer`; it builds the
/// same 256-bucket byte histogram as the whole-sample entropy estimate,
/// restricted to one column of the stride grid.
///
/// Returns `0.0` when no byte is selected (empty sample, or `pos` at or
/// beyond the end of `sample`).
///
/// # Panics
///
/// Panics when `stride == 0`, in every build profile. A zero stride would
/// otherwise never advance through the sample.
fn entropy_at_stride_position(sample: &[u8], stride: usize, pos: usize) -> f64 {
    // Check the stride first: `pos < stride` can only hold for a non-zero
    // stride, so the other order would make this assertion unreachable.
    debug_assert!(stride > 0, "stride must be non-zero");
    debug_assert!(pos < stride, "pos must be a position within the stride");

    let mut counts = [0u32; 256];
    let mut n = 0u32;
    // `step_by` rejects a zero step with a panic even when debug assertions
    // are compiled out, so a bad stride cannot turn into an endless loop.
    for &byte in sample.iter().skip(pos).step_by(stride) {
        counts[usize::from(byte)] += 1;
        n += 1;
    }
    if n == 0 {
        return 0.0;
    }

    let nf = f64::from(n);
    // Sum -p * log2(p) over the byte values that occur at this position.
    counts
        .iter()
        .filter(|&&c| c != 0)
        .fold(0.0_f64, |e, &c| {
            let p = f64::from(c) / nf;
            e - p * p.log2()
        })
}
208
209/// v0.8.12 #125: detect a u32 / u64 little-endian integer column in
210/// the sample. Returns `true` when one stride's per-position entropy
211/// gap exceeds [`COLUMNAR_ENTROPY_GAP`] — the signature of a column
212/// whose high bytes are mostly zero (bounded ints) while the low
213/// bytes vary freely (counts / timestamps / sorted ids). Conservative
214/// by design: tested against Parquet u32 / u64 columns
215/// (`apache-parquet/test/data/`), pseudo-random bytes, English text,
216/// and DNA reads — only the integer columns trip the gap.
217fn looks_columnar_integer(sample: &[u8]) -> bool {
218    if sample.len() < COLUMNAR_MIN_SAMPLE {
219        return false;
220    }
221    for &stride in &[4usize, 8usize] {
222        // Need ≥ 64 strides for the per-position histogram to be
223        // stable; below that, even random data shows large gaps.
224        if sample.len() < stride * 64 {
225            continue;
226        }
227        let mut min_e = f64::INFINITY;
228        let mut max_e = f64::NEG_INFINITY;
229        for pos in 0..stride {
230            let e = entropy_at_stride_position(sample, stride, pos);
231            if e < min_e {
232                min_e = e;
233            }
234            if e > max_e {
235                max_e = e;
236            }
237        }
238        if max_e - min_e >= COLUMNAR_ENTROPY_GAP {
239            return true;
240        }
241    }
242    false
243}
244
/// Detects the magic bytes of already-compressed formats.
///
/// Returns `true` when `sample` begins with (or, for container formats,
/// carries at a fixed offset) a signature listed below.
fn looks_already_compressed(sample: &[u8]) -> bool {
    /// Signatures that must appear at offset 0.
    const PREFIX_MAGICS: [&[u8]; 10] = [
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG (FF D8 FF)
        b"%PDF-",                                          // PDF
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP / docx / jar / apk
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        b"BZh",                                            // bzip2
        &[0x1a, 0x45, 0xdf, 0xa3],                         // webm / mkv (EBML)
    ];

    if PREFIX_MAGICS.iter().any(|&magic| sample.starts_with(magic)) {
        return true;
    }

    // mp4 / m4a / mov (ISO Base Media): bytes 4..8 == "ftyp".
    let has_ftyp_box = sample.get(4..8) == Some(&b"ftyp"[..]);
    // webp: "RIFF", a 4-byte size field, then "WEBP".
    let is_webp = sample.starts_with(b"RIFF") && sample.get(8..12) == Some(&b"WEBP"[..]);

    has_ftyp_box || is_webp
}
297
298impl SamplingDispatcher {
299    /// Core sample-only decision shared by `pick` and `pick_with_size_hint`.
300    /// Returns the pre-GPU-promotion choice; the size-hint-aware caller may
301    /// rewrite a `CpuZstd` result to `NvcompZstd` when the body is big enough.
302    fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
303        if sample.len() < Self::MIN_SAMPLE_BYTES {
304            return self.default;
305        }
306        if looks_already_compressed(sample) {
307            return CodecKind::Passthrough;
308        }
309        if shannon_entropy(sample) >= self.entropy_threshold {
310            return CodecKind::Passthrough;
311        }
312        self.default
313    }
314
315    /// v0.8 #56 / v0.8.12 #125: rewrite a `CpuZstd` pick to a GPU
316    /// codec when GPU preference is on AND the caller proved a total
317    /// body size >= `gpu_min_bytes`. v0.8.12 adds the columnar-integer
318    /// branch: when `prefer_columnar_gpu = true` AND the sample
319    /// matches the per-stride-position entropy signature of a
320    /// u32 / u64 LE integer column, route to `NvcompBitcomp` instead
321    /// of `NvcompZstd`. Passthrough / non-CpuZstd picks are left
322    /// alone — already-compressed bodies don't benefit from GPU
323    /// compression, and other CPU codecs (CpuGzip) imply the
324    /// operator wants wire-compatible output that the nvCOMP codecs
325    /// can't provide.
326    fn maybe_promote_to_gpu(
327        &self,
328        chosen: CodecKind,
329        sample: &[u8],
330        total_size: Option<u64>,
331    ) -> CodecKind {
332        if !self.prefer_gpu {
333            return chosen;
334        }
335        if chosen != CodecKind::CpuZstd {
336            return chosen;
337        }
338        let big_enough = match total_size {
339            Some(n) => n >= self.gpu_min_bytes as u64,
340            // No size hint (chunked transfer) → conservative, keep CpuZstd.
341            None => return chosen,
342        };
343        if !big_enough {
344            return chosen;
345        }
346        if self.prefer_columnar_gpu && looks_columnar_integer(sample) {
347            CodecKind::NvcompBitcomp
348        } else {
349            CodecKind::NvcompZstd
350        }
351    }
352}
353
354#[async_trait::async_trait]
355impl CodecDispatcher for SamplingDispatcher {
356    async fn pick(&self, sample: &[u8]) -> CodecKind {
357        // No size hint available → never promote to GPU.
358        self.pick_from_sample(sample)
359    }
360
361    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
362        let chosen = self.pick_from_sample(sample);
363        self.maybe_promote_to_gpu(chosen, sample, total_size)
364    }
365}
366
367/// `Box<dyn CodecDispatcher>` からも `CodecDispatcher` として使えるようにする blanket impl
368#[async_trait::async_trait]
369impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
370    async fn pick(&self, sample: &[u8]) -> CodecKind {
371        (**self).pick(sample).await
372    }
373
374    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
375        (**self).pick_with_size_hint(sample, total_size).await
376    }
377}
378
379#[async_trait::async_trait]
380impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
381    async fn pick(&self, sample: &[u8]) -> CodecKind {
382        (**self).pick(sample).await
383    }
384
385    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
386        (**self).pick_with_size_hint(sample, total_size).await
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    // ===========================================================
    // Shared fixtures
    // ===========================================================

    /// 45-byte low-entropy sentence used to build text fixtures.
    const SENTENCE: &str = "the quick brown fox jumps over the lazy dog. ";
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;

    /// `SENTENCE` repeated `times` times, as bytes.
    fn repeated_sentence(times: usize) -> Vec<u8> {
        SENTENCE.repeat(times).into_bytes()
    }

    /// Roughly 1.3 KiB (45 bytes × 30) of low-entropy text. It passes the
    /// magic-byte and entropy filters and so resolves to `default`, which
    /// the v0.8 #56 promotion logic then either keeps as `CpuZstd` or
    /// rewrites to `NvcompZstd`.
    fn text_sample() -> Vec<u8> {
        repeated_sentence(30)
    }

    /// `len` pseudo-random bytes from a xorshift generator seeded with
    /// `seed`; each output is the low byte of the state after one step.
    fn xorshift_bytes(seed: u64, len: usize) -> Vec<u8> {
        let mut state = seed;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state & 0xff) as u8
            })
            .collect()
    }

    /// `magic` followed by 256 copies of `filler`, long enough to clear
    /// `MIN_SAMPLE_BYTES`.
    fn magic_then_filler(magic: &[u8], filler: u8) -> Vec<u8> {
        let mut payload = magic.to_vec();
        payload.extend(std::iter::repeat_n(filler, 256));
        payload
    }

    /// The first `count` u32 values (0, 1, 2, …) in little-endian order.
    fn u32_le_sequence(count: u32) -> Vec<u8> {
        (0..count).flat_map(|value| value.to_le_bytes()).collect()
    }

    /// 4 KiB of u32 LE monotonic counts (1024 values; postings / sorted
    /// ids). The low byte cycles through 0..256, the second byte only takes
    /// the values 0..4, and the two high bytes stay at 0 — exactly the
    /// per-position-entropy signature `looks_columnar_integer` is built to
    /// catch.
    fn u32_monotonic_postings() -> Vec<u8> {
        u32_le_sequence(1024)
    }

    /// 4 KiB of u64 LE monotonic timestamps (512 Unix-epoch-nanosecond
    /// values, stride 8). Consecutive values are 137 apart, so only the low
    /// ~17 bits change across the sample and the upper bytes are constant.
    fn u64_timestamps() -> Vec<u8> {
        let base: u64 = 1_700_000_000_000_000_000;
        (0u64..512)
            .flat_map(|i| (base + i * 137).to_le_bytes())
            .collect()
    }

    /// CpuZstd-default dispatcher with GPU promotion enabled at 1 MiB.
    fn gpu_dispatcher() -> SamplingDispatcher {
        SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576)
    }

    /// `gpu_dispatcher` with columnar (Bitcomp) targeting switched on too.
    fn columnar_gpu_dispatcher() -> SamplingDispatcher {
        gpu_dispatcher().with_columnar_gpu_preference(true)
    }

    // ===========================================================
    // Basic dispatchers and sample-only decisions
    // ===========================================================

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // English-like, low-entropy text.
        assert_eq!(d.pick(&text_sample()).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // 4 KiB of high-entropy, roughly uniform pseudo-random bytes.
        let payload = xorshift_bytes(0xfeed_beef_dead_c0de, 4096);
        // The entropy must clear the default threshold (7.5).
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        // gzip magic + DEFLATE method byte.
        let payload = magic_then_filler(&[0x1f, 0x8b, 0x08], b'a');
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = magic_then_filler(&[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], b'b');
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_uniform() {
        let zeros = vec![0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    // ===========================================================
    // v0.8 #56: GPU auto-detect / size-hint promotion
    // ===========================================================

    #[tokio::test]
    async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
        let d = gpu_dispatcher();
        // 2 MiB total body — past the 1 MiB threshold → GPU promotion.
        let kind = d.pick_with_size_hint(&text_sample(), Some(2 * MIB)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_keeps_small_object_on_cpu() {
        let d = gpu_dispatcher();
        // 100 KiB total body — under the 1 MiB threshold → GPU upload
        // overhead would exceed compress savings, stay on CPU.
        let kind = d.pick_with_size_hint(&text_sample(), Some(100 * KIB)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
        // Default — no `with_gpu_preference` call → prefer_gpu = false.
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let kind = d.pick_with_size_hint(&text_sample(), Some(10 * MIB)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
        let d = gpu_dispatcher();
        // High-entropy pseudo-random payload → entropy filter wins and
        // returns Passthrough; GPU promotion is skipped because
        // already-compressed data won't compress further on GPU either.
        let payload = xorshift_bytes(0xfeed_beef_dead_c0de, 4096);
        let kind = d.pick_with_size_hint(&payload, Some(8 * MIB)).await;
        assert_eq!(kind, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn gpu_pref_with_no_size_hint_stays_conservative() {
        let d = gpu_dispatcher();
        // Chunked transfer: the caller has no Content-Length, so
        // total_size = None. We can't safely commit to GPU because the
        // body might be tiny — stay on CPU.
        let kind = d.pick_with_size_hint(&text_sample(), None).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    // ===========================================================
    // v0.8.12 #125: columnar-integer detection + Bitcomp routing
    // ===========================================================

    #[test]
    fn columnar_detect_flags_u32_postings() {
        assert!(looks_columnar_integer(&u32_monotonic_postings()));
    }

    #[test]
    fn columnar_detect_flags_u64_timestamps() {
        assert!(looks_columnar_integer(&u64_timestamps()));
    }

    #[test]
    fn columnar_detect_rejects_english_text() {
        // English text has reasonably uniform per-stride-position
        // entropy — no single byte position dominates the entropy.
        assert!(!looks_columnar_integer(&repeated_sentence(50)));
    }

    #[test]
    fn columnar_detect_rejects_random_bytes() {
        let payload = xorshift_bytes(0xa5a5_5a5a_dead_beef, 4096);
        assert!(!looks_columnar_integer(&payload));
    }

    #[test]
    fn columnar_detect_rejects_too_small_sample() {
        // 256 bytes < COLUMNAR_MIN_SAMPLE (512) — must short-circuit
        // to `false` so we never flag a tiny request as columnar.
        assert!(!looks_columnar_integer(&u32_le_sequence(64)));
    }

    #[tokio::test]
    async fn gpu_pref_columnar_promotes_postings_to_bitcomp() {
        let d = columnar_gpu_dispatcher();
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(8 * MIB)).await;
        assert_eq!(kind, CodecKind::NvcompBitcomp);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_promotes_timestamps_to_bitcomp() {
        let d = columnar_gpu_dispatcher();
        let sample = u64_timestamps();
        let kind = d.pick_with_size_hint(&sample, Some(4 * MIB)).await;
        assert_eq!(kind, CodecKind::NvcompBitcomp);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_falls_through_to_zstd_on_text() {
        // Columnar detector rejects text → Bitcomp routing skipped,
        // existing NvcompZstd promotion (#56) takes over.
        let d = columnar_gpu_dispatcher();
        let kind = d.pick_with_size_hint(&text_sample(), Some(2 * MIB)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_off_keeps_postings_on_zstd() {
        // Default — `with_columnar_gpu_preference` NOT called → the
        // README's "manual `--codec nvcomp-bitcomp`" path is the
        // only way to reach Bitcomp.
        let d = gpu_dispatcher();
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(8 * MIB)).await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_columnar_respects_size_threshold() {
        // Columnar payload but under the gpu_min_bytes threshold →
        // GPU upload overhead would exceed the compress gain, stay
        // on CpuZstd. The Bitcomp branch must not bypass the size
        // gate.
        let d = columnar_gpu_dispatcher();
        let sample = u32_monotonic_postings();
        let kind = d.pick_with_size_hint(&sample, Some(100 * KIB)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[test]
    fn entropy_full_8_for_each_byte_once() {
        // Every byte value 0..=255 equally often → each has probability
        // 1/256 → entropy = 8 bits. The 256-byte pattern is repeated up to
        // 1024 bytes; repetition leaves the entropy unchanged.
        let payload: Vec<u8> = (0..=255u8).cycle().take(1024).collect();
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }
}