use crate::CodecKind;
#[async_trait::async_trait]
pub trait CodecDispatcher: Send + Sync {
async fn pick(&self, sample: &[u8]) -> CodecKind;
async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
self.pick(sample).await
}
}
#[derive(Debug, Clone, Copy)]
pub struct AlwaysDispatcher(pub CodecKind);
#[async_trait::async_trait]
impl CodecDispatcher for AlwaysDispatcher {
async fn pick(&self, _sample: &[u8]) -> CodecKind {
self.0
}
}
#[derive(Debug, Clone)]
pub struct SamplingDispatcher {
pub default: CodecKind,
pub entropy_threshold: f64,
pub prefer_gpu: bool,
pub gpu_min_bytes: usize,
pub prefer_columnar_gpu: bool,
}
impl SamplingDispatcher {
pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
pub const MIN_SAMPLE_BYTES: usize = 128;
pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;
pub fn new(default: CodecKind) -> Self {
Self {
default,
entropy_threshold: Self::DEFAULT_ENTROPY_THRESHOLD,
prefer_gpu: false,
gpu_min_bytes: Self::DEFAULT_GPU_MIN_BYTES,
prefer_columnar_gpu: false,
}
}
#[must_use]
pub fn with_columnar_gpu_preference(mut self, on: bool) -> Self {
self.prefer_columnar_gpu = on;
self
}
#[must_use]
pub fn with_entropy_threshold(mut self, t: f64) -> Self {
self.entropy_threshold = t;
self
}
#[must_use]
pub fn with_gpu_preference(mut self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
self.prefer_gpu = prefer_gpu;
self.gpu_min_bytes = gpu_min_bytes;
self
}
}
fn shannon_entropy(sample: &[u8]) -> f64 {
if sample.is_empty() {
return 0.0;
}
let mut counts = [0u32; 256];
for &b in sample {
counts[b as usize] += 1;
}
let n = sample.len() as f64;
let mut entropy = 0.0;
for c in counts {
if c == 0 {
continue;
}
let p = f64::from(c) / n;
entropy -= p * p.log2();
}
entropy
}
const COLUMNAR_MIN_SAMPLE: usize = 512;
const COLUMNAR_ENTROPY_GAP: f64 = 4.0;
fn entropy_at_stride_position(sample: &[u8], stride: usize, pos: usize) -> f64 {
debug_assert!(pos < stride);
debug_assert!(stride > 0);
let mut counts = [0u32; 256];
let mut n = 0u32;
let mut i = pos;
while i < sample.len() {
counts[sample[i] as usize] += 1;
n += 1;
i += stride;
}
if n == 0 {
return 0.0;
}
let nf = f64::from(n);
let mut e = 0.0;
for c in counts {
if c == 0 {
continue;
}
let p = f64::from(c) / nf;
e -= p * p.log2();
}
e
}
fn looks_columnar_integer(sample: &[u8]) -> bool {
if sample.len() < COLUMNAR_MIN_SAMPLE {
return false;
}
for &stride in &[4usize, 8usize] {
if sample.len() < stride * 64 {
continue;
}
let mut min_e = f64::INFINITY;
let mut max_e = f64::NEG_INFINITY;
for pos in 0..stride {
let e = entropy_at_stride_position(sample, stride, pos);
if e < min_e {
min_e = e;
}
if e > max_e {
max_e = e;
}
}
if max_e - min_e >= COLUMNAR_ENTROPY_GAP {
return true;
}
}
false
}
fn looks_already_compressed(sample: &[u8]) -> bool {
if sample.starts_with(&[0x1f, 0x8b]) {
return true;
}
if sample.starts_with(&[0x28, 0xb5, 0x2f, 0xfd]) {
return true;
}
if sample.starts_with(&[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]) {
return true;
}
if sample.len() >= 3 && sample[0] == 0xff && sample[1] == 0xd8 && sample[2] == 0xff {
return true;
}
if sample.starts_with(b"%PDF-") {
return true;
}
if sample.starts_with(&[0x50, 0x4b, 0x03, 0x04]) {
return true;
}
if sample.starts_with(&[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c]) {
return true;
}
if sample.starts_with(&[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00]) {
return true;
}
if sample.starts_with(b"BZh") {
return true;
}
if sample.len() >= 8 && &sample[4..8] == b"ftyp" {
return true;
}
if sample.starts_with(&[0x1a, 0x45, 0xdf, 0xa3]) {
return true;
}
if sample.len() >= 12 && sample.starts_with(b"RIFF") && &sample[8..12] == b"WEBP" {
return true;
}
false
}
impl SamplingDispatcher {
fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
if sample.len() < Self::MIN_SAMPLE_BYTES {
return self.default;
}
if looks_already_compressed(sample) {
return CodecKind::Passthrough;
}
if shannon_entropy(sample) >= self.entropy_threshold {
return CodecKind::Passthrough;
}
self.default
}
fn maybe_promote_to_gpu(
&self,
chosen: CodecKind,
sample: &[u8],
total_size: Option<u64>,
) -> CodecKind {
if !self.prefer_gpu {
return chosen;
}
if chosen != CodecKind::CpuZstd {
return chosen;
}
let big_enough = match total_size {
Some(n) => n >= self.gpu_min_bytes as u64,
None => return chosen,
};
if !big_enough {
return chosen;
}
if self.prefer_columnar_gpu && looks_columnar_integer(sample) {
CodecKind::NvcompBitcomp
} else {
CodecKind::NvcompZstd
}
}
}
#[async_trait::async_trait]
impl CodecDispatcher for SamplingDispatcher {
async fn pick(&self, sample: &[u8]) -> CodecKind {
self.pick_from_sample(sample)
}
async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
let chosen = self.pick_from_sample(sample);
self.maybe_promote_to_gpu(chosen, sample, total_size)
}
}
#[async_trait::async_trait]
impl<T: CodecDispatcher + ?Sized> CodecDispatcher for Box<T> {
async fn pick(&self, sample: &[u8]) -> CodecKind {
(**self).pick(sample).await
}
async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
(**self).pick_with_size_hint(sample, total_size).await
}
}
#[async_trait::async_trait]
impl<T: CodecDispatcher + ?Sized> CodecDispatcher for std::sync::Arc<T> {
async fn pick(&self, sample: &[u8]) -> CodecKind {
(**self).pick(sample).await
}
async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
(**self).pick_with_size_hint(sample, total_size).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn always_dispatcher_returns_configured_kind() {
let d = AlwaysDispatcher(CodecKind::CpuZstd);
assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
}
#[tokio::test]
async fn boxed_dispatcher_works() {
let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
}
#[tokio::test]
async fn sampling_short_sample_uses_default() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
}
#[tokio::test]
async fn sampling_text_picks_default() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
let text: Vec<u8> = "the quick brown fox jumps over the lazy dog. "
.repeat(30)
.into_bytes();
assert_eq!(d.pick(&text).await, CodecKind::CpuZstd);
}
#[tokio::test]
async fn sampling_random_bytes_picks_passthrough() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
let mut state: u64 = 0xfeed_beef_dead_c0de;
let mut payload = Vec::with_capacity(4096);
for _ in 0..4096 {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
payload.push((state & 0xff) as u8);
}
let e = shannon_entropy(&payload);
assert!(
e > 7.5,
"expected high entropy on pseudo-random bytes, got {e}"
);
assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
}
#[tokio::test]
async fn sampling_gzip_magic_picks_passthrough() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
let mut payload = vec![0x1f, 0x8b, 0x08]; payload.extend(std::iter::repeat_n(b'a', 256));
assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
}
#[tokio::test]
async fn sampling_png_magic_picks_passthrough() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
let mut payload = vec![0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a];
payload.extend(std::iter::repeat_n(b'b', 256));
assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
}
#[tokio::test]
async fn sampling_mp4_ftyp_picks_passthrough() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
let mut payload = vec![0u8; 256];
payload[4..8].copy_from_slice(b"ftyp");
assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
}
#[test]
fn entropy_zero_for_uniform() {
let zeros = vec![0u8; 1024];
assert_eq!(shannon_entropy(&zeros), 0.0);
}
fn text_sample() -> Vec<u8> {
"the quick brown fox jumps over the lazy dog. "
.repeat(30)
.into_bytes()
}
#[tokio::test]
async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
let sample = text_sample();
let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::NvcompZstd);
}
#[tokio::test]
async fn gpu_pref_keeps_small_object_on_cpu() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
let sample = text_sample();
let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
assert_eq!(kind, CodecKind::CpuZstd);
}
#[tokio::test]
async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd);
let sample = text_sample();
let kind = d.pick_with_size_hint(&sample, Some(10 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::CpuZstd);
}
#[tokio::test]
async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
let mut state: u64 = 0xfeed_beef_dead_c0de;
let mut payload = Vec::with_capacity(4096);
for _ in 0..4096 {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
payload.push((state & 0xff) as u8);
}
let kind = d.pick_with_size_hint(&payload, Some(8 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::Passthrough);
}
#[tokio::test]
async fn gpu_pref_with_no_size_hint_stays_conservative() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
let sample = text_sample();
let kind = d.pick_with_size_hint(&sample, None).await;
assert_eq!(kind, CodecKind::CpuZstd);
}
fn u32_monotonic_postings() -> Vec<u8> {
let mut buf = Vec::with_capacity(4096);
for i in 0u32..1024 {
buf.extend_from_slice(&i.to_le_bytes());
}
buf
}
fn u64_timestamps() -> Vec<u8> {
let base: u64 = 1_700_000_000_000_000_000;
let mut buf = Vec::with_capacity(4096);
for i in 0u64..512 {
buf.extend_from_slice(&(base + i * 137).to_le_bytes());
}
buf
}
#[test]
fn columnar_detect_flags_u32_postings() {
assert!(looks_columnar_integer(&u32_monotonic_postings()));
}
#[test]
fn columnar_detect_flags_u64_timestamps() {
assert!(looks_columnar_integer(&u64_timestamps()));
}
#[test]
fn columnar_detect_rejects_english_text() {
let text: Vec<u8> = "the quick brown fox jumps over the lazy dog. "
.repeat(50)
.into_bytes();
assert!(!looks_columnar_integer(&text));
}
#[test]
fn columnar_detect_rejects_random_bytes() {
let mut state: u64 = 0xa5a5_5a5a_dead_beef;
let mut payload = Vec::with_capacity(4096);
for _ in 0..4096 {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
payload.push((state & 0xff) as u8);
}
assert!(!looks_columnar_integer(&payload));
}
#[test]
fn columnar_detect_rejects_too_small_sample() {
let mut buf = Vec::with_capacity(256);
for i in 0u32..64 {
buf.extend_from_slice(&i.to_le_bytes());
}
assert!(!looks_columnar_integer(&buf));
}
#[tokio::test]
async fn gpu_pref_columnar_promotes_postings_to_bitcomp() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd)
.with_gpu_preference(true, 1_048_576)
.with_columnar_gpu_preference(true);
let sample = u32_monotonic_postings();
let kind = d.pick_with_size_hint(&sample, Some(8 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::NvcompBitcomp);
}
#[tokio::test]
async fn gpu_pref_columnar_promotes_timestamps_to_bitcomp() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd)
.with_gpu_preference(true, 1_048_576)
.with_columnar_gpu_preference(true);
let sample = u64_timestamps();
let kind = d.pick_with_size_hint(&sample, Some(4 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::NvcompBitcomp);
}
#[tokio::test]
async fn gpu_pref_columnar_falls_through_to_zstd_on_text() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd)
.with_gpu_preference(true, 1_048_576)
.with_columnar_gpu_preference(true);
let sample = text_sample();
let kind = d.pick_with_size_hint(&sample, Some(2 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::NvcompZstd);
}
#[tokio::test]
async fn gpu_pref_columnar_off_keeps_postings_on_zstd() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
let sample = u32_monotonic_postings();
let kind = d.pick_with_size_hint(&sample, Some(8 * 1024 * 1024)).await;
assert_eq!(kind, CodecKind::NvcompZstd);
}
#[tokio::test]
async fn gpu_pref_columnar_respects_size_threshold() {
let d = SamplingDispatcher::new(CodecKind::CpuZstd)
.with_gpu_preference(true, 1_048_576)
.with_columnar_gpu_preference(true);
let sample = u32_monotonic_postings();
let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
assert_eq!(kind, CodecKind::CpuZstd);
}
#[test]
fn entropy_full_8_for_each_byte_once() {
let mut payload: Vec<u8> = (0..=255).collect();
let copy = payload.clone();
for _ in 0..3 {
payload.extend_from_slice(©);
}
let e = shannon_entropy(&payload);
assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
}
}