use crate::CodecKind;
/// Chooses which [`CodecKind`] should be used to encode a payload, based on a sample of its bytes.
///
/// The `Send + Sync` supertraits let a single dispatcher be shared across async tasks.
#[async_trait::async_trait]
pub trait CodecDispatcher: Send + Sync {
    /// Picks a codec by inspecting `sample` (assumed to be the leading bytes of the payload).
    async fn pick(&self, sample: &[u8]) -> CodecKind;

    /// Like [`pick`](Self::pick), but also receives the payload's total size when it is known.
    ///
    /// The default implementation ignores the size hint and simply delegates to `pick`;
    /// implementors that care about payload size override this.
    async fn pick_with_size_hint(&self, sample: &[u8], _total_size: Option<u64>) -> CodecKind {
        Self::pick(self, sample).await
    }
}
/// Dispatcher that ignores its input entirely and always answers with the wrapped [`CodecKind`].
#[derive(Debug, Clone, Copy)]
pub struct AlwaysDispatcher(pub CodecKind);

#[async_trait::async_trait]
impl CodecDispatcher for AlwaysDispatcher {
    /// Returns the configured codec; `_sample` is never inspected.
    async fn pick(&self, _sample: &[u8]) -> CodecKind {
        let Self(kind) = *self;
        kind
    }
}
/// Content-sniffing dispatcher: skips compression for samples that already look compressed or
/// random, otherwise uses `default` (optionally promoted to a GPU codec for large payloads).
#[derive(Debug, Clone)]
pub struct SamplingDispatcher {
    /// Codec returned whenever the sample gives no reason to skip compression.
    pub default: CodecKind,
    /// Shannon entropy, in bits per byte, at or above which a sample is treated as
    /// incompressible.
    pub entropy_threshold: f64,
    /// Whether `CpuZstd` choices may be promoted to `NvcompZstd` for large payloads.
    pub prefer_gpu: bool,
    /// Minimum total payload size, in bytes, required for GPU promotion.
    pub gpu_min_bytes: usize,
}

impl SamplingDispatcher {
    /// Entropy threshold installed by [`SamplingDispatcher::new`], in bits per byte.
    pub const DEFAULT_ENTROPY_THRESHOLD: f64 = 7.5;
    /// Samples shorter than this fall back to `default` without being inspected.
    pub const MIN_SAMPLE_BYTES: usize = 128;
    /// GPU promotion threshold installed by [`SamplingDispatcher::new`] (1 MiB).
    pub const DEFAULT_GPU_MIN_BYTES: usize = 1_048_576;

    /// Creates a dispatcher that falls back to `default`, uses the default entropy threshold,
    /// and has GPU promotion switched off.
    pub fn new(default: CodecKind) -> Self {
        let entropy_threshold = Self::DEFAULT_ENTROPY_THRESHOLD;
        let gpu_min_bytes = Self::DEFAULT_GPU_MIN_BYTES;
        Self {
            default,
            entropy_threshold,
            prefer_gpu: false,
            gpu_min_bytes,
        }
    }

    /// Returns this dispatcher with its entropy threshold replaced by `t` (bits per byte).
    #[must_use]
    pub fn with_entropy_threshold(self, t: f64) -> Self {
        Self {
            entropy_threshold: t,
            ..self
        }
    }

    /// Returns this dispatcher with GPU promotion toggled by `prefer_gpu`, applying only to
    /// payloads of at least `gpu_min_bytes` bytes.
    #[must_use]
    pub fn with_gpu_preference(self, prefer_gpu: bool, gpu_min_bytes: usize) -> Self {
        Self {
            prefer_gpu,
            gpu_min_bytes,
            ..self
        }
    }
}
/// Computes the Shannon entropy of `sample`, in bits per byte.
///
/// An empty slice yields `0.0`. For non-empty input the value lies in `[0.0, 8.0]`: `0.0` when
/// every byte is identical, `8.0` when all 256 byte values occur equally often.
fn shannon_entropy(sample: &[u8]) -> f64 {
    if sample.is_empty() {
        return 0.0;
    }

    let mut histogram = [0u32; 256];
    sample
        .iter()
        .for_each(|&byte| histogram[usize::from(byte)] += 1);

    let total = sample.len() as f64;
    // Sequential fold keeps the same accumulation order as a plain loop: -sum(p * log2 p).
    histogram
        .iter()
        .filter(|&&count| count != 0)
        .fold(0.0, |acc, &count| {
            let p = f64::from(count) / total;
            acc - p * p.log2()
        })
}
/// Returns `true` when `sample` starts with the signature of a format whose payload is already
/// compressed or entropy-coded, so another general-purpose compression pass would mostly waste
/// CPU.
///
/// Only the leading bytes are examined; a sample too short to contain a signature simply does
/// not match it. Recognised: gzip, zstd, LZ4 frame, bzip2, xz, 7z, ZIP (and ZIP-based
/// containers), PNG, JPEG, GIF, WebP, PDF, ISO base media (`ftyp`: MP4/MOV/HEIF/...), and EBML
/// (Matroska/WebM).
fn looks_already_compressed(sample: &[u8]) -> bool {
    // Formats identified by a fixed byte signature at offset 0.
    const PREFIX_SIGNATURES: &[&[u8]] = &[
        &[0x1f, 0x8b],                                     // gzip
        &[0x28, 0xb5, 0x2f, 0xfd],                         // zstd frame
        &[0x04, 0x22, 0x4d, 0x18],                         // LZ4 frame (magic 0x184D2204, LE)
        &[0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], // PNG
        &[0xff, 0xd8, 0xff],                               // JPEG
        b"GIF87a",                                         // GIF (LZW-coded)
        b"GIF89a",                                         // GIF (LZW-coded)
        b"%PDF-",                                          // PDF (streams usually Flate-coded)
        &[0x50, 0x4b, 0x03, 0x04],                         // ZIP local file header
        &[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c],             // 7z
        &[0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00],             // xz
        &[0x1a, 0x45, 0xdf, 0xa3],                         // EBML (Matroska / WebM)
    ];

    if PREFIX_SIGNATURES.iter().any(|sig| sample.starts_with(sig)) {
        return true;
    }
    // bzip2: "BZh" followed by the block-size digit '1'..='9'. Requiring the digit keeps plain
    // text that merely begins with "BZh" from being misclassified as compressed.
    if let [b'B', b'Z', b'h', b'1'..=b'9', ..] = sample {
        return true;
    }
    // ISO base media: a 4-byte box size, then the `ftyp` box type.
    if sample.get(4..8) == Some(b"ftyp".as_slice()) {
        return true;
    }
    // WebP: a RIFF container whose form type is `WEBP` (other RIFF forms such as WAVE may hold
    // uncompressed data, so they are deliberately not matched).
    sample.starts_with(b"RIFF") && sample.get(8..12) == Some(b"WEBP".as_slice())
}
impl SamplingDispatcher {
    /// Classifies `sample` without any knowledge of the total payload size.
    ///
    /// Samples shorter than `MIN_SAMPLE_BYTES` go straight to `default`. Longer samples become
    /// `Passthrough` if they carry a known compressed-format signature or their entropy reaches
    /// `entropy_threshold`; otherwise `default` is used. The entropy is only computed when the
    /// signature check does not already match.
    fn pick_from_sample(&self, sample: &[u8]) -> CodecKind {
        let too_small = sample.len() < Self::MIN_SAMPLE_BYTES;
        if too_small {
            self.default
        } else if looks_already_compressed(sample)
            || shannon_entropy(sample) >= self.entropy_threshold
        {
            CodecKind::Passthrough
        } else {
            self.default
        }
    }

    /// Upgrades a `CpuZstd` choice to `NvcompZstd` when GPU use is enabled and the payload is
    /// known to be at least `gpu_min_bytes` long. Every other choice, and any unknown size,
    /// is returned unchanged.
    fn maybe_promote_to_gpu(&self, chosen: CodecKind, total_size: Option<u64>) -> CodecKind {
        let large_enough = total_size.is_some_and(|n| n >= self.gpu_min_bytes as u64);
        if self.prefer_gpu && chosen == CodecKind::CpuZstd && large_enough {
            CodecKind::NvcompZstd
        } else {
            chosen
        }
    }
}
#[async_trait::async_trait]
impl CodecDispatcher for SamplingDispatcher {
    /// Sample-only decision; GPU promotion never applies because no size is known.
    async fn pick(&self, sample: &[u8]) -> CodecKind {
        self.pick_from_sample(sample)
    }

    /// Sample-based decision, then optionally promoted to the GPU codec using `total_size`.
    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
        self.maybe_promote_to_gpu(self.pick_from_sample(sample), total_size)
    }
}
/// Lets `Box<dyn CodecDispatcher>` (or any boxed dispatcher) be used as a dispatcher itself.
///
/// Both methods forward explicitly, so an inner override of `pick_with_size_hint` is honoured
/// rather than replaced by the trait's default.
#[async_trait::async_trait]
impl<T> CodecDispatcher for Box<T>
where
    T: CodecDispatcher + ?Sized,
{
    async fn pick(&self, sample: &[u8]) -> CodecKind {
        let inner: &T = self;
        inner.pick(sample).await
    }

    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
        let inner: &T = self;
        inner.pick_with_size_hint(sample, total_size).await
    }
}
/// Lets a shared `Arc<dyn CodecDispatcher>` (or any `Arc`-wrapped dispatcher) be used directly.
///
/// Both methods forward explicitly, so an inner override of `pick_with_size_hint` is honoured
/// rather than replaced by the trait's default.
#[async_trait::async_trait]
impl<T> CodecDispatcher for std::sync::Arc<T>
where
    T: CodecDispatcher + ?Sized,
{
    async fn pick(&self, sample: &[u8]) -> CodecKind {
        let inner: &T = self;
        inner.pick(sample).await
    }

    async fn pick_with_size_hint(&self, sample: &[u8], total_size: Option<u64>) -> CodecKind {
        let inner: &T = self;
        inner.pick_with_size_hint(sample, total_size).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Low-entropy English text, repeated until it is well over `MIN_SAMPLE_BYTES` long.
    fn text_sample() -> Vec<u8> {
        "the quick brown fox jumps over the lazy dog. "
            .repeat(30)
            .into_bytes()
    }

    /// Deterministic xorshift64 byte stream. It is close enough to uniform that its entropy sits
    /// near the 8 bits/byte maximum, without pulling in an RNG crate.
    fn pseudo_random_bytes(len: usize) -> Vec<u8> {
        let mut state: u64 = 0xfeed_beef_dead_c0de;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state & 0xff) as u8
            })
            .collect()
    }

    #[tokio::test]
    async fn always_dispatcher_returns_configured_kind() {
        let d = AlwaysDispatcher(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"any input").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn boxed_dispatcher_works() {
        let d: Box<dyn CodecDispatcher> = Box::new(AlwaysDispatcher(CodecKind::Passthrough));
        assert_eq!(d.pick(b"x").await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_short_sample_uses_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(b"short").await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_text_picks_default() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        assert_eq!(d.pick(&text_sample()).await, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn sampling_random_bytes_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let payload = pseudo_random_bytes(4096);
        let e = shannon_entropy(&payload);
        assert!(
            e > 7.5,
            "expected high entropy on pseudo-random bytes, got {e}"
        );
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_gzip_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0x1f, 0x8b, 0x08];
        payload.extend(std::iter::repeat_n(b'a', 256));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_png_magic_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a];
        payload.extend(std::iter::repeat_n(b'b', 256));
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn sampling_mp4_ftyp_picks_passthrough() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let mut payload = vec![0u8; 256];
        payload[4..8].copy_from_slice(b"ftyp");
        assert_eq!(d.pick(&payload).await, CodecKind::Passthrough);
    }

    #[test]
    fn entropy_zero_for_uniform() {
        let zeros = vec![0u8; 1024];
        assert_eq!(shannon_entropy(&zeros), 0.0);
    }

    #[tokio::test]
    async fn gpu_pref_promotes_large_text_to_nvcomp_zstd() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        let kind = d
            .pick_with_size_hint(&sample, Some(2 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::NvcompZstd);
    }

    #[tokio::test]
    async fn gpu_pref_keeps_small_object_on_cpu() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, Some(100 * 1024)).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_off_keeps_cpu_even_for_large_object() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd);
        let sample = text_sample();
        let kind = d
            .pick_with_size_hint(&sample, Some(10 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[tokio::test]
    async fn gpu_pref_does_not_override_passthrough_on_high_entropy() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let payload = pseudo_random_bytes(4096);
        let kind = d
            .pick_with_size_hint(&payload, Some(8 * 1024 * 1024))
            .await;
        assert_eq!(kind, CodecKind::Passthrough);
    }

    #[tokio::test]
    async fn gpu_pref_with_no_size_hint_stays_conservative() {
        let d = SamplingDispatcher::new(CodecKind::CpuZstd).with_gpu_preference(true, 1_048_576);
        let sample = text_sample();
        let kind = d.pick_with_size_hint(&sample, None).await;
        assert_eq!(kind, CodecKind::CpuZstd);
    }

    #[test]
    fn entropy_full_8_for_every_byte_value_equally_often() {
        // Every byte value appears exactly four times, which is a perfectly uniform distribution.
        let payload = (0..=255u8).collect::<Vec<u8>>().repeat(4);
        let e = shannon_entropy(&payload);
        assert!((e - 8.0).abs() < 0.0001, "expected 8.0, got {e}");
    }
}