mod support;
use criterion::{Criterion, SamplingMode, Throughput, criterion_group, criterion_main};
use std::hint::black_box;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use structured_zstd::decoding::FrameDecoder;
use structured_zstd::dictionary::{
FastCoverOptions, FinalizeOptions, finalize_raw_dict, train_fastcover_raw_from_slice,
};
use support::{
LevelConfig, Scenario, ScenarioClass, benchmark_scenarios, supported_levels_filtered,
};
/// Process-wide cache of the benchmark scenarios. It starts empty and is
/// filled on first access through `benchmark_scenarios_cached`.
static BENCHMARK_SCENARIOS: OnceLock<Vec<Scenario>> = OnceLock::new();
/// Compresses `input` into a zstd frame by driving the C library's streaming
/// API (`ZSTD_compressStream2`) directly through `zstd_sys`.
///
/// The compression context is configured with the requested `level`, a
/// checksum only when the `hash` feature is enabled, the content-size flag,
/// a window log of 14 for inputs of at most 16 KiB, and the pledged source
/// size. NOTE(review): these settings presumably mirror the framing choices
/// of the pure-Rust encoder so both sides are comparable — confirm.
///
/// Returns the compressed bytes.
///
/// # Panics
/// Panics if the context cannot be created or if any zstd call reports an
/// error. The context is not freed on those panic paths.
fn ffi_encode_to_vec(input: &[u8], level: i32) -> Vec<u8> {
use zstd::zstd_safe::zstd_sys;
let cctx = unsafe { zstd_sys::ZSTD_createCCtx() };
assert!(!cctx.is_null(), "ZSTD_createCCtx returned null");
// Everything below talks to the raw C API, hence one large unsafe region.
unsafe {
let rc = zstd_sys::ZSTD_CCtx_setParameter(
cctx,
zstd_sys::ZSTD_cParameter::ZSTD_c_compressionLevel,
level,
);
assert!(
zstd_sys::ZSTD_isError(rc) == 0,
"set compressionLevel failed"
);
// The checksum is emitted only when this crate is built with the `hash` feature.
let rc = zstd_sys::ZSTD_CCtx_setParameter(
cctx,
zstd_sys::ZSTD_cParameter::ZSTD_c_checksumFlag,
if cfg!(feature = "hash") { 1 } else { 0 },
);
assert!(zstd_sys::ZSTD_isError(rc) == 0, "set checksumFlag failed");
let rc = zstd_sys::ZSTD_CCtx_setParameter(
cctx,
zstd_sys::ZSTD_cParameter::ZSTD_c_contentSizeFlag,
1,
);
assert!(
zstd_sys::ZSTD_isError(rc) == 0,
"set contentSizeFlag failed"
);
// Inputs of at most 16 KiB (1 << 14) get an explicit window log of 14.
if input.len() <= (1 << 14) {
let rc = zstd_sys::ZSTD_CCtx_setParameter(
cctx,
zstd_sys::ZSTD_cParameter::ZSTD_c_windowLog,
14,
);
assert!(zstd_sys::ZSTD_isError(rc) == 0, "set windowLog failed");
}
// Declare the total input size up front, before any data is streamed.
let rc = zstd_sys::ZSTD_CCtx_setPledgedSrcSize(cctx, input.len() as u64);
assert!(zstd_sys::ZSTD_isError(rc) == 0, "setPledgedSrcSize failed");
// Use the library's recommended streaming buffer sizes for both directions.
let recommended_in = zstd_sys::ZSTD_CStreamInSize();
let recommended_out = zstd_sys::ZSTD_CStreamOutSize();
let mut output: Vec<u8> = Vec::new();
let mut chunk = vec![0u8; recommended_out];
let mut in_pos: usize = 0;
// Outer loop: feed the input in windows of at most `recommended_in` bytes.
loop {
let chunk_end = (in_pos + recommended_in).min(input.len());
// `src` always points at the start of `input`; the current window is
// expressed through `pos` (start offset) and `size` (end offset).
let mut zin = zstd_sys::ZSTD_inBuffer {
src: input.as_ptr() as *const core::ffi::c_void,
size: chunk_end,
pos: in_pos,
};
// The window that reaches the end of the input also ends the frame.
let mode = if chunk_end == input.len() {
zstd_sys::ZSTD_EndDirective::ZSTD_e_end
} else {
zstd_sys::ZSTD_EndDirective::ZSTD_e_continue
};
// Inner loop: keep calling the compressor until this window is done,
// draining the scratch output buffer into `output` after every call.
loop {
let mut zout = zstd_sys::ZSTD_outBuffer {
dst: chunk.as_mut_ptr() as *mut core::ffi::c_void,
size: chunk.len(),
pos: 0,
};
let remaining = zstd_sys::ZSTD_compressStream2(cctx, &mut zout, &mut zin, mode);
assert!(
zstd_sys::ZSTD_isError(remaining) == 0,
"ZSTD_compressStream2 failed (code = {remaining})"
);
output.extend_from_slice(&chunk[..zout.pos]);
// In end mode the frame is finished once the call returns 0; in
// continue mode the window is finished once all of it was consumed.
let frame_complete =
matches!(mode, zstd_sys::ZSTD_EndDirective::ZSTD_e_end) && remaining == 0;
let chunk_consumed = matches!(mode, zstd_sys::ZSTD_EndDirective::ZSTD_e_continue)
&& zin.pos == zin.size;
if frame_complete || chunk_consumed {
break;
}
}
in_pos = zin.pos;
// Stop after the final (end-mode) window has been fully consumed.
if in_pos == input.len() && matches!(mode, zstd_sys::ZSTD_EndDirective::ZSTD_e_end) {
break;
}
}
// Release the context on the success path.
zstd_sys::ZSTD_freeCCtx(cctx);
output
}
}
/// Owning handle for a raw zstd decompression context from the C library.
/// The pointer is released by this type's `Drop` implementation.
struct FfiDCtxHandle {
// Raw context pointer; `new` asserts that it is non-null.
ptr: *mut zstd::zstd_safe::zstd_sys::ZSTD_DCtx_s,
}
impl FfiDCtxHandle {
    /// Creates a new decompression context through the C library.
    ///
    /// # Panics
    /// Panics if the library returns a null context.
    fn new() -> Self {
        // SAFETY: `ZSTD_createDCtx` takes no arguments; the returned pointer
        // is checked for null before it is stored.
        let raw = unsafe { zstd::zstd_safe::zstd_sys::ZSTD_createDCtx() };
        assert!(!raw.is_null(), "ZSTD_createDCtx returned null");
        Self { ptr: raw }
    }

    /// Decompresses `compressed` into `output` with this context and returns
    /// the number of bytes the library reports as written.
    ///
    /// # Panics
    /// Panics if the library reports an error code.
    fn decompress_into(&mut self, compressed: &[u8], output: &mut [u8]) -> usize {
        use zstd::zstd_safe::zstd_sys;

        let dst = output.as_mut_ptr().cast::<core::ffi::c_void>();
        let src = compressed.as_ptr().cast::<core::ffi::c_void>();

        // SAFETY: `self.ptr` was null-checked in `new`, and both
        // pointer/length pairs come from live slices borrowed for this call.
        let written = unsafe {
            zstd_sys::ZSTD_decompressDCtx(self.ptr, dst, output.len(), src, compressed.len())
        };
        // SAFETY: `ZSTD_isError` only inspects the integer code.
        let is_error = unsafe { zstd_sys::ZSTD_isError(written) } != 0;
        assert!(
            !is_error,
            "ZSTD_decompressDCtx failed (code = {written})"
        );
        written
    }
}
impl Drop for FfiDCtxHandle {
    /// Returns the decompression context to the C library.
    fn drop(&mut self) {
        use zstd::zstd_safe::zstd_sys;

        // SAFETY: `self.ptr` came from `ZSTD_createDCtx` and is freed exactly
        // once, here. The status code is intentionally discarded.
        let _ = unsafe { zstd_sys::ZSTD_freeDCtx(self.ptr) };
    }
}
/// One-shot convenience wrapper: creates a temporary C decompression context,
/// decompresses `compressed` into `output`, and returns the byte count written.
fn ffi_decompress_into(compressed: &[u8], output: &mut [u8]) -> usize {
    FfiDCtxHandle::new().decompress_into(compressed, output)
}
/// Returns the shared scenario list, building it with `benchmark_scenarios`
/// the first time it is requested.
fn benchmark_scenarios_cached() -> &'static [Scenario] {
    let scenarios: &'static Vec<Scenario> = BENCHMARK_SCENARIOS.get_or_init(benchmark_scenarios);
    scenarios.as_slice()
}
/// Reports whether the `STRUCTURED_ZSTD_EMIT_REPORT` environment variable
/// asks for report lines.
///
/// Only the exact values `1`, `true` and `TRUE` enable reporting; an unset,
/// non-Unicode, or any other value disables it.
fn emit_reports_enabled() -> bool {
    let setting = std::env::var("STRUCTURED_ZSTD_EMIT_REPORT");
    match setting.as_deref() {
        Ok("1") | Ok("true") | Ok("TRUE") => true,
        Ok(_) | Err(_) => false,
    }
}
/// Registers one Criterion group per (level, scenario) pair that measures
/// whole-buffer compression with the pure-Rust encoder (`pure_rust`) and with
/// the C library (`c_ffi`).
///
/// When reporting is enabled, each pair is first compressed once with both
/// encoders so the size and frame-header report lines can be printed.
fn bench_compress(c: &mut Criterion) {
    let reports_wanted = emit_reports_enabled();

    for scenario in benchmark_scenarios_cached() {
        let input: &[u8] = &scenario.bytes[..];

        for level in supported_levels_filtered() {
            if reports_wanted {
                let rust_frame =
                    structured_zstd::encoding::compress_slice_to_vec(input, level.rust_level);
                let ffi_frame = ffi_encode_to_vec(input, level.ffi_level);
                emit_report_line(scenario, level, &rust_frame, &ffi_frame);
                emit_frame_header_report(scenario, level, "rust", &rust_frame);
                emit_frame_header_report(scenario, level, "ffi", &ffi_frame);
            }

            let mut group = c.benchmark_group(format!(
                "compress/{}/{}/{}",
                level.name, scenario.id, "matrix"
            ));
            configure_group(&mut group, scenario);
            group.throughput(Throughput::Bytes(scenario.throughput_bytes()));

            group.bench_function("pure_rust", |bencher| {
                bencher.iter(|| {
                    let frame =
                        structured_zstd::encoding::compress_slice_to_vec(input, level.rust_level);
                    black_box(frame)
                })
            });
            group.bench_function("c_ffi", |bencher| {
                bencher.iter(|| {
                    let frame = ffi_encode_to_vec(input, level.ffi_level);
                    black_box(frame)
                })
            });

            group.finish();
        }
    }
}
/// Registers decompression benchmarks for every (scenario, level) pair.
///
/// Each pair is compressed once with the pure-Rust encoder and once with the
/// C library; both resulting frames are then handed to
/// `bench_decompress_source`, labelled `rust_stream` and `c_stream`.
fn bench_decompress(c: &mut Criterion) {
    let emit_reports = emit_reports_enabled();

    for scenario in benchmark_scenarios_cached() {
        for level in supported_levels_filtered() {
            let rust_frame = structured_zstd::encoding::compress_slice_to_vec(
                &scenario.bytes[..],
                level.rust_level,
            );
            let ffi_frame = ffi_encode_to_vec(&scenario.bytes[..], level.ffi_level);
            let expected_len = scenario.len();

            // Same order as the frames were produced: Rust-encoded first.
            let sources: [(&'static str, &[u8]); 2] = [
                ("rust_stream", &rust_frame[..]),
                ("c_stream", &ffi_frame[..]),
            ];
            for (source, frame) in sources {
                bench_decompress_source(
                    c,
                    scenario,
                    level,
                    source,
                    frame,
                    expected_len,
                    emit_reports,
                );
            }
        }
    }
}
/// Benchmarks decompression of one compressed frame with both decoders.
///
/// The frame is first checked against the scenario's original bytes with both
/// decoders. A Criterion group named after the level, scenario and `source`
/// then measures the pure-Rust `FrameDecoder` (`pure_rust`) and the C library
/// (`c_ffi`), each reusing one decoder and one output buffer across
/// iterations. `_emit_reports` is accepted but not used.
fn bench_decompress_source(
    c: &mut Criterion,
    scenario: &Scenario,
    level: LevelConfig,
    source: &'static str,
    compressed: &[u8],
    expected_len: usize,
    _emit_reports: bool,
) {
    assert_decompress_matches_reference(scenario, compressed, expected_len);

    let mut group = c.benchmark_group(format!(
        "decompress/{}/{}/{}/matrix",
        level.name, scenario.id, source
    ));
    configure_group(&mut group, scenario);
    group.throughput(Throughput::Bytes(scenario.throughput_bytes()));

    group.bench_function("pure_rust", |bencher| {
        let mut sink = vec![0u8; expected_len];
        let mut decoder = FrameDecoder::new();
        bencher.iter(|| {
            let produced = decoder
                .decode_all(black_box(compressed), &mut sink)
                .unwrap();
            black_box(&sink[..produced]);
            assert_eq!(produced, expected_len);
        })
    });

    group.bench_function("c_ffi", |bencher| {
        let mut context = FfiDCtxHandle::new();
        let mut sink = vec![0u8; expected_len];
        bencher.iter(|| {
            let produced = context.decompress_into(black_box(compressed), &mut sink);
            assert_eq!(produced, expected_len);
            black_box(&sink[..produced]);
        })
    });

    group.finish();
}
/// Sanity check run before benchmarking: decodes `compressed` with the
/// pure-Rust decoder and then with the C library, asserting each time that
/// exactly `expected_len` bytes come back and that they equal the scenario's
/// original bytes.
///
/// # Panics
/// Panics on a decode error or on any length or content mismatch.
fn assert_decompress_matches_reference(
    scenario: &Scenario,
    compressed: &[u8],
    expected_len: usize,
) {
    // Pure-Rust decoder.
    let mut rust_out = vec![0u8; expected_len];
    let mut decoder = FrameDecoder::new();
    let rust_len = decoder.decode_all(compressed, &mut rust_out).unwrap();
    assert_eq!(rust_len, expected_len);
    assert_eq!(&rust_out[..rust_len], scenario.bytes.as_slice());

    // C library decoder.
    let mut ffi_out = vec![0u8; expected_len];
    let ffi_len = ffi_decompress_into(compressed, &mut ffi_out);
    assert_eq!(ffi_len, expected_len);
    assert_eq!(&ffi_out[..ffi_len], scenario.bytes.as_slice());
}
/// Registers the dictionary benchmarks for `Small` and `Corpus` scenarios.
///
/// For each eligible scenario this:
/// 1. trains and finalizes a dictionary once with the pure-Rust FastCOVER
///    path and once with `zstd::dict::from_samples`, timing both; a failure
///    in any of these one-off steps prints a `BENCH_WARN` line and skips the
///    scenario;
/// 2. registers a `dict-train` group measuring both training paths;
/// 3. registers, per level, a `compress-dict` group measuring C-library
///    compression with and without the FFI-trained dictionary.
///
/// Report lines are printed only when reporting is enabled.
fn bench_dictionary(c: &mut Criterion) {
let emit_reports = emit_reports_enabled();
for scenario in benchmark_scenarios_cached().iter() {
// Other scenario classes are not dictionary-benchmarked.
if !matches!(scenario.class, ScenarioClass::Small | ScenarioClass::Corpus) {
continue;
}
// `sample_count` is only used in the warning messages below.
let sample_count = training_sample_count(&scenario.bytes);
let total_training_bytes = scenario.bytes.len();
// The C trainer receives the whole scenario as a single sample.
let ffi_samples = [scenario.bytes.as_slice()];
// Cap the dictionary at 64 bytes less than the training data; the cap is
// applied last, so it wins over the 256-byte floor.
let max_dict_size = total_training_bytes.saturating_sub(64);
let dict_size = dictionary_size_for(scenario.len())
.max(256)
.min(max_dict_size);
// Raw-content budget left for FastCOVER once the finalized header is accounted for.
let Ok(rust_content_budget) =
finalized_training_content_budget(scenario.bytes.as_slice(), dict_size)
else {
eprintln!(
"BENCH_WARN skipping Rust FastCOVER dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={}) due to finalized content budget error",
scenario.id, sample_count, total_training_bytes, dict_size
);
continue;
};
let fastcover_options = fastcover_fixed_options();
// The Rust timing below spans both raw training and finalization.
let rust_train_started = Instant::now();
let Ok((rust_raw_dictionary, rust_tuned)) = train_fastcover_raw_from_slice(
scenario.bytes.as_slice(),
rust_content_budget,
&fastcover_options,
) else {
eprintln!(
"BENCH_WARN skipping Rust FastCOVER dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={})",
scenario.id, sample_count, total_training_bytes, dict_size
);
continue;
};
let Ok(rust_dictionary) = finalize_raw_dict(
rust_raw_dictionary.as_slice(),
scenario.bytes.as_slice(),
dict_size,
FinalizeOptions::default(),
) else {
eprintln!(
"BENCH_WARN skipping Rust FastCOVER finalization benchmark for {} (samples={}, total_training_bytes={}, dict_size={})",
scenario.id, sample_count, total_training_bytes, dict_size
);
continue;
};
let rust_train_ms = rust_train_started.elapsed().as_secs_f64() * 1_000.0;
// One-off C-library training run, timed the same way.
let ffi_train_started = Instant::now();
let Ok(ffi_dictionary) = zstd::dict::from_samples(&ffi_samples, dict_size) else {
eprintln!(
"BENCH_WARN skipping dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={})",
scenario.id,
ffi_samples.len(),
total_training_bytes,
dict_size
);
continue;
};
let ffi_train_ms = ffi_train_started.elapsed().as_secs_f64() * 1_000.0;
if emit_reports {
emit_dictionary_training_report(
scenario,
DictTrainingMetrics {
training_bytes: total_training_bytes,
dict_bytes_requested: dict_size,
rust_train_ms,
ffi_train_ms,
rust_dict_bytes: rust_dictionary.len(),
ffi_dict_bytes: ffi_dictionary.len(),
rust_fastcover_score: rust_tuned.score,
},
);
}
// Training benchmark group: throughput is expressed in training bytes.
let benchmark_name = format!("dict-train/na/{}/{}", scenario.id, "matrix");
let mut group = c.benchmark_group(benchmark_name);
configure_group(&mut group, scenario);
group.throughput(Throughput::Bytes(total_training_bytes as u64));
group.bench_function("pure_rust", |b| {
b.iter(|| {
// Measured work: raw FastCOVER training followed by finalization.
let (raw_dict, tuned) = train_fastcover_raw_from_slice(
scenario.bytes.as_slice(),
rust_content_budget,
&fastcover_options,
)
.expect("fastcover training should succeed");
let dict = finalize_raw_dict(
raw_dict.as_slice(),
scenario.bytes.as_slice(),
dict_size,
FinalizeOptions::default(),
)
.expect("fastcover dictionary finalization should succeed");
black_box((dict.len(), tuned.score));
})
});
group.bench_function("c_ffi", |b| {
b.iter(|| {
black_box(
zstd::dict::from_samples(&ffi_samples, dict_size)
.expect("ffi dictionary training should succeed")
.len(),
)
})
});
group.finish();
// Compression with/without the FFI-trained dictionary, per level.
for level in supported_levels_filtered() {
// These one-off compressions run regardless of `emit_reports`; their
// outputs are only consumed by the report below.
let mut no_dict = zstd::bulk::Compressor::new(level.ffi_level).unwrap();
let mut with_dict =
zstd::bulk::Compressor::with_dictionary(level.ffi_level, &ffi_dictionary).unwrap();
let no_dict_bytes = no_dict.compress(&scenario.bytes).unwrap();
let with_dict_bytes = with_dict.compress(&scenario.bytes).unwrap();
if emit_reports {
emit_dictionary_report(
scenario,
level,
ffi_dictionary.len(),
ffi_train_ms,
&no_dict_bytes,
&with_dict_bytes,
);
}
let benchmark_name =
format!("compress-dict/{}/{}/{}", level.name, scenario.id, "matrix");
let mut group = c.benchmark_group(benchmark_name);
configure_group(&mut group, scenario);
group.throughput(Throughput::Bytes(scenario.throughput_bytes()));
// Each benchmark builds its compressor once, outside the measured closure.
group.bench_function("c_ffi_without_dict", |b| {
let mut compressor = zstd::bulk::Compressor::new(level.ffi_level).unwrap();
b.iter(|| black_box(compressor.compress(&scenario.bytes).unwrap()))
});
group.bench_function("c_ffi_with_dict", |b| {
let mut compressor =
zstd::bulk::Compressor::with_dictionary(level.ffi_level, &ffi_dictionary)
.unwrap();
b.iter(|| black_box(compressor.compress(&scenario.bytes).unwrap()))
});
group.finish();
}
}
}
/// Applies per-class Criterion settings to `group`.
///
/// | class             | samples | measurement | warm-up override |
/// |-------------------|---------|-------------|------------------|
/// | Small             | 30      | 3 s         | none             |
/// | Corpus / Entropy  | 10      | 8 s         | none             |
/// | Large / Silesia   | 10      | 10 s        | 500 ms           |
///
/// Every class uses flat sampling.
fn configure_group<M: criterion::measurement::Measurement>(
    group: &mut criterion::BenchmarkGroup<'_, M>,
    scenario: &Scenario,
) {
    let (samples, measurement_secs, warm_up_override) = match scenario.class {
        ScenarioClass::Small => (30, 3, None),
        ScenarioClass::Corpus | ScenarioClass::Entropy => (10, 8, None),
        ScenarioClass::Large | ScenarioClass::Silesia => {
            (10, 10, Some(Duration::from_millis(500)))
        }
    };

    group.sample_size(samples);
    group.measurement_time(Duration::from_secs(measurement_secs));
    if let Some(warm_up) = warm_up_override {
        group.warm_up_time(warm_up);
    }
    group.sampling_mode(SamplingMode::Flat);
}
/// Prints a `REPORT_HDR` line describing the frame header of `compressed`.
///
/// The byte at offset 4 is read as the frame header descriptor. From it the
/// function derives the single-segment and checksum flags, the sizes of the
/// dictionary-ID and frame-content-size fields, and the total header length
/// (4-byte magic, descriptor, optional window descriptor, dictionary ID,
/// content size). Inputs shorter than 5 bytes produce a `parse=error` line.
fn emit_frame_header_report(
    scenario: &Scenario,
    level: LevelConfig,
    encoder: &'static str,
    compressed: &[u8],
) {
    let Some(&descriptor) = compressed.get(4) else {
        println!(
            "REPORT_HDR scenario={} level={} encoder={} parse=error",
            scenario.id, level.name, encoder
        );
        return;
    };

    let single_segment = (descriptor & 0b0010_0000) != 0;
    let checksum = (descriptor & 0b0000_0100) != 0;

    // Low two bits select the dictionary-ID field width.
    const DICT_ID_WIDTHS: [u8; 4] = [0, 1, 2, 4];
    let dict_id_bytes: u8 = DICT_ID_WIDTHS[usize::from(descriptor & 0b11)];

    // Top two bits select the content-size width; flag 0 still carries one
    // byte when the frame is single-segment.
    let fcs_bytes: u8 = match descriptor >> 6 {
        0 => u8::from(single_segment),
        flag => 1 << flag,
    };

    // The window descriptor byte is present only for multi-segment frames.
    let window_descriptor_bytes = u16::from(!single_segment);
    let header_bytes =
        4u16 + 1 + window_descriptor_bytes + u16::from(dict_id_bytes) + u16::from(fcs_bytes);

    println!(
        "REPORT_HDR scenario={} level={} encoder={} header_bytes={} single_segment={} checksum={} fcs_bytes={} dict_id_bytes={}",
        scenario.id,
        level.name,
        encoder,
        header_bytes,
        single_segment,
        checksum,
        fcs_bytes,
        dict_id_bytes,
    );
}
/// Prints a `REPORT` line with the compressed sizes and compression ratios
/// (compressed / input) of the pure-Rust and C encoders for one scenario and
/// level. Ratios are reported as 0.0 when the scenario is empty.
fn emit_report_line(
    scenario: &Scenario,
    level: LevelConfig,
    rust_compressed: &[u8],
    ffi_compressed: &[u8],
) {
    let input_len = scenario.len() as f64;
    let escaped_label = escape_report_label(&scenario.label);

    // Guard against dividing by a zero-length input.
    let ratio_of = |compressed: &[u8]| -> f64 {
        if input_len > 0.0 {
            compressed.len() as f64 / input_len
        } else {
            0.0
        }
    };
    let rust_ratio = ratio_of(rust_compressed);
    let ffi_ratio = ratio_of(ffi_compressed);

    println!(
        "REPORT scenario={} label=\"{}\" level={} input_bytes={} rust_bytes={} ffi_bytes={} rust_ratio={:.6} ffi_ratio={:.6}",
        scenario.id,
        escaped_label,
        level.name,
        scenario.len(),
        rust_compressed.len(),
        ffi_compressed.len(),
        rust_ratio,
        ffi_ratio
    );
}
/// Prints a `REPORT_DICT` line comparing C-library output sizes and ratios
/// with and without a dictionary, alongside the dictionary size and its
/// training time. Ratios are reported as 0.0 when the scenario is empty.
fn emit_dictionary_report(
    scenario: &Scenario,
    level: LevelConfig,
    dict_bytes: usize,
    train_ms: f64,
    no_dict_bytes: &[u8],
    with_dict_bytes: &[u8],
) {
    let input_len = scenario.len() as f64;
    let escaped_label = escape_report_label(&scenario.label);

    // Guard against dividing by a zero-length input.
    let ratio_of = |compressed: &[u8]| -> f64 {
        if input_len > 0.0 {
            compressed.len() as f64 / input_len
        } else {
            0.0
        }
    };
    let no_dict_ratio = ratio_of(no_dict_bytes);
    let with_dict_ratio = ratio_of(with_dict_bytes);

    println!(
        "REPORT_DICT scenario={} label=\"{}\" level={} dict_bytes={} train_ms={:.3} ffi_no_dict_bytes={} ffi_with_dict_bytes={} ffi_no_dict_ratio={:.6} ffi_with_dict_ratio={:.6}",
        scenario.id,
        escaped_label,
        level.name,
        dict_bytes,
        train_ms,
        no_dict_bytes.len(),
        with_dict_bytes.len(),
        no_dict_ratio,
        with_dict_ratio
    );
}
fn emit_dictionary_training_report(scenario: &Scenario, metrics: DictTrainingMetrics) {
let escaped_label = escape_report_label(&scenario.label);
println!(
"REPORT_DICT_TRAIN scenario={} label=\"{}\" training_bytes={} dict_bytes_requested={} rust_train_ms={:.3} ffi_train_ms={:.3} rust_dict_bytes={} ffi_dict_bytes={} rust_fastcover_score={}",
scenario.id,
escaped_label,
metrics.training_bytes,
metrics.dict_bytes_requested,
metrics.rust_train_ms,
metrics.ffi_train_ms,
metrics.rust_dict_bytes,
metrics.ffi_dict_bytes,
metrics.rust_fastcover_score
);
}
/// Values printed on a `REPORT_DICT_TRAIN` line by
/// `emit_dictionary_training_report`.
struct DictTrainingMetrics {
/// Number of bytes of training data.
training_bytes: usize,
/// Dictionary size that was requested from both trainers.
dict_bytes_requested: usize,
/// Wall-clock milliseconds for the pure-Rust training run.
rust_train_ms: f64,
/// Wall-clock milliseconds for the C-library training run.
ffi_train_ms: f64,
/// Size in bytes of the dictionary produced by the pure-Rust path.
rust_dict_bytes: usize,
/// Size in bytes of the dictionary produced by the C library.
ffi_dict_bytes: usize,
/// Score reported by the pure-Rust FastCOVER trainer.
rust_fastcover_score: usize,
}
/// Estimates how many bytes of raw dictionary content fit within `dict_size`
/// once the finalized dictionary's header is accounted for.
///
/// An 8-byte zero placeholder is finalized against `sample`; whatever the
/// finalized result holds beyond those 8 bytes is treated as header overhead
/// and subtracted (saturating at zero) from `dict_size`.
///
/// # Errors
/// Propagates any error returned by `finalize_raw_dict`.
fn finalized_training_content_budget(sample: &[u8], dict_size: usize) -> std::io::Result<usize> {
    let placeholder = [0u8; 8];
    let finalized_len = finalize_raw_dict(
        &placeholder[..],
        sample,
        dict_size,
        FinalizeOptions::default(),
    )?
    .len();
    let header_overhead = finalized_len.saturating_sub(placeholder.len());
    Ok(dict_size.saturating_sub(header_overhead))
}
/// Counts how many training samples `source` would yield.
///
/// The input is cut into chunks of `len / 16` bytes (rounded up, clamped to
/// 256..=8192); at most the first 64 chunks are considered and chunks shorter
/// than 64 bytes are ignored. If fewer than two chunks qualify, the input is
/// split in half instead: two samples when both halves hold at least 64
/// bytes, otherwise a single sample (with a `BENCH_WARN` on stderr).
fn training_sample_count(source: &[u8]) -> usize {
    const MIN_SAMPLE_BYTES: usize = 64;
    const MAX_SAMPLES: usize = 64;

    let chunk_len = source.len().div_ceil(16).clamp(256, 8192);
    let usable_chunks = source
        .chunks(chunk_len)
        .take(MAX_SAMPLES)
        .filter(|piece| piece.len() >= MIN_SAMPLE_BYTES)
        .count();
    if usable_chunks >= 2 {
        return usable_chunks;
    }

    // Fallback: try to split the input into two halves.
    let (front, back) = source.split_at(source.len() / 2);
    if front.len() >= MIN_SAMPLE_BYTES && back.len() >= MIN_SAMPLE_BYTES {
        return 2;
    }

    eprintln!(
        "BENCH_WARN tiny dictionary training input ({} bytes), using a single sample fallback",
        source.len()
    );
    1
}
/// Picks a dictionary size for an input of `input_len` bytes: one eighth of
/// the input (rounded up), kept within 256 bytes and 16 KiB inclusive.
fn dictionary_size_for(input_len: usize) -> usize {
    const SMALLEST: usize = 256;
    const LARGEST: usize = 16 * 1024;

    let one_eighth = input_len.div_ceil(8);
    one_eighth.max(SMALLEST).min(LARGEST)
}
/// Builds the fixed FastCOVER configuration used by the dictionary
/// benchmarks: optimisation disabled, `accel = 4`, `k = 256`, `d = 8`,
/// `f = 20`, and library defaults for every other field.
fn fastcover_fixed_options() -> FastCoverOptions {
    let defaults = FastCoverOptions::default();
    FastCoverOptions {
        k: 256,
        d: 8,
        f: 20,
        accel: 4,
        optimize: false,
        ..defaults
    }
}
/// Escapes `label` for use inside a double-quoted report field by prefixing
/// every backslash and every double quote with a backslash.
fn escape_report_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '\\' || ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
// Bundle the three benchmark entry points into the `benches` group and hand
// that group to Criterion's `criterion_main!` macro.
criterion_group!(benches, bench_compress, bench_decompress, bench_dictionary);
criterion_main!(benches);