use alloc::{boxed::Box, vec::Vec};
use core::convert::TryInto;
#[cfg(feature = "hash")]
use twox_hash::XxHash64;
#[cfg(feature = "hash")]
use core::hash::Hasher;
use super::{
CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
match_generator::MatchGeneratorDriver,
};
use crate::common::MAX_BLOCK_SIZE;
use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
use crate::io::{Read, Write};
/// Drives compression of one frame: pulls uncompressed bytes from a source `R`,
/// encodes them with the matcher `M`, and writes the finished frame to a drain `W`.
pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
/// Input reader; installed with `set_source` and unwrapped by `compress`.
uncompressed_data: Option<R>,
/// Output writer; installed with `set_drain` and unwrapped by `compress`.
compressed_data: Option<W>,
/// Level handed to the matcher and the block encoder on the next `compress`.
compression_level: CompressionLevel,
/// Dictionary installed through `set_dictionary`, if any.
dictionary: Option<crate::decoding::Dictionary>,
/// Encoder-side entropy tables derived from `dictionary` when it was installed.
dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
/// One-shot size hint; `compress` takes it, leaving `None` behind.
source_size_hint: Option<u64>,
/// Matcher plus the entropy/offset state that `compress` resets and then updates.
state: CompressState<M>,
/// Copied into `FrameHeader::magicless` when the frame header is built.
magicless: bool,
/// Running XxHash64 over the uncompressed input; its low 32 bits end the frame.
#[cfg(feature = "hash")]
hasher: XxHash64,
}
/// Encoder tables derived once from a dictionary in `set_dictionary`, so that
/// every `compress` call can restore them by cloning.
#[derive(Clone, Default)]
struct CachedDictionaryEntropy {
/// Huffman encoder table converted from the dictionary, if the conversion yielded one.
huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
/// Literal-length FSE table converted from the dictionary, if any.
ll_previous: Option<PreviousFseTable>,
/// Match-length FSE table converted from the dictionary, if any.
ml_previous: Option<PreviousFseTable>,
/// Offset FSE table converted from the dictionary, if any.
of_previous: Option<PreviousFseTable>,
}
/// Describes which FSE table is recorded in one of the `*_previous` slots.
#[derive(Clone)]
pub(crate) enum PreviousFseTable {
/// Stands for the default table that the caller passes to `as_table`.
Default,
/// An owned, non-default table.
Custom(Box<FSETable>),
/// No table is available (`as_table` returns `None`); carries one byte.
// NOTE(review): presumably the byte is the repeated RLE symbol — confirm with the block encoder.
Rle(u8),
}
impl PreviousFseTable {
    /// Resolves this entry to a concrete table.
    ///
    /// `Default` resolves to the `default` table supplied by the caller,
    /// `Custom` to the boxed table it owns, and `Rle` to `None` because it
    /// carries no table at all.
    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> Option<&'a FSETable> {
        match self {
            Self::Rle(_) => None,
            Self::Custom(boxed) => Some(&**boxed),
            Self::Default => Some(default),
        }
    }
}
/// The default FSE tables together with the table recorded as "previous" for each of
/// the three sequence code kinds (literal lengths, match lengths, offsets).
pub(crate) struct FseTables {
/// Built-in literal-length table, produced by `default_ll_table()`.
pub(crate) ll_default: crate::fse::fse_encoder::FseDefaultTable,
/// Previously recorded literal-length table, or `None` when nothing is recorded.
pub(crate) ll_previous: Option<PreviousFseTable>,
/// Built-in match-length table, produced by `default_ml_table()`.
pub(crate) ml_default: crate::fse::fse_encoder::FseDefaultTable,
/// Previously recorded match-length table, or `None` when nothing is recorded.
pub(crate) ml_previous: Option<PreviousFseTable>,
/// Built-in offset table, produced by `default_of_table()`.
pub(crate) of_default: crate::fse::fse_encoder::FseDefaultTable,
/// Previously recorded offset table, or `None` when nothing is recorded.
pub(crate) of_previous: Option<PreviousFseTable>,
}
impl FseTables {
/// Creates the table set with the three built-in default tables and no
/// previous table recorded for any code kind.
pub fn new() -> Self {
Self {
ll_default: default_ll_table(),
ll_previous: None,
ml_default: default_ml_table(),
ml_previous: None,
of_default: default_of_table(),
of_previous: None,
}
}
/// Borrows the default literal-length table as a plain `&FSETable`.
// NOTE(review): the lint is silenced presumably because `FseDefaultTable` is a
// reference-like alias in some configurations — confirm against its definition.
#[inline]
#[allow(clippy::borrow_deref_ref)]
pub(crate) fn ll_default_ref(&self) -> &FSETable {
&*self.ll_default
}
/// Borrows the default match-length table as a plain `&FSETable`.
#[inline]
#[allow(clippy::borrow_deref_ref)]
pub(crate) fn ml_default_ref(&self) -> &FSETable {
&*self.ml_default
}
/// Borrows the default offset table as a plain `&FSETable`.
#[inline]
#[allow(clippy::borrow_deref_ref)]
pub(crate) fn of_default_ref(&self) -> &FSETable {
&*self.of_default
}
}
// Lower bound applied to any split position returned by `donor_optimal_block_size`.
const PRESPLIT_BLOCK_MIN: usize = 3500;
// Divisor of the split threshold: threshold = p50 * (BASE + penalty) / PENALTY_RATE.
const PRESPLIT_THRESHOLD_PENALTY_RATE: u64 = 16;
// Numerator of the split threshold before the penalty is added (16 - 2 = 14).
const PRESPLIT_THRESHOLD_BASE: u64 = PRESPLIT_THRESHOLD_PENALTY_RATE - 2;
// Initial penalty of the chunk splitter; it drops by one per merged chunk down to zero.
const PRESPLIT_THRESHOLD_PENALTY: i32 = 3;
// Size of the chunks compared by `donor_split_block_by_chunks` (8 KiB).
const PRESPLIT_CHUNK_SIZE: usize = 8 << 10;
// Largest hash width accepted by `presplit_hash2`; also fixes the fingerprint table size.
const PRESPLIT_HASH_LOG_MAX: usize = 10;
// Number of buckets in a fingerprint (1 << 10 = 1024).
const PRESPLIT_HASH_TABLE_SIZE: usize = 1 << PRESPLIT_HASH_LOG_MAX;
// Multiplier used by `presplit_hash2` to mix a 16-bit sample into a bucket index.
const PRESPLIT_KNUTH: u32 = 0x9E37_79B9;
// Length of the head, middle and tail samples taken by `donor_split_block_from_borders`.
const PRESPLIT_BORDERS_SEGMENT: usize = 512;
/// Histogram of hashed samples taken from a stretch of input, used to decide
/// whether two stretches look statistically different.
#[derive(Clone)]
struct PreSplitFingerprint {
/// Per-bucket sample counts; only the first `1 << hash_log` buckets are compared.
events: [u32; PRESPLIT_HASH_TABLE_SIZE],
/// Event total used to scale bucket counts when two fingerprints are compared.
nb_events: usize,
}
impl Default for PreSplitFingerprint {
    /// Builds an empty fingerprint: every bucket is zero and no events are counted.
    fn default() -> Self {
        let events = [0u32; PRESPLIT_HASH_TABLE_SIZE];
        Self {
            nb_events: 0,
            events,
        }
    }
}
/// Maps the sample at the start of `bytes` to a bucket index of `hash_log` bits.
///
/// With `hash_log == 8` the bucket is simply the first byte. Otherwise the first
/// two bytes are read as a little-endian `u16`, multiplied by `PRESPLIT_KNUTH`,
/// and the top `hash_log` bits of the 32-bit product are kept.
///
/// Panics if `bytes` is shorter than the sample it has to read.
fn presplit_hash2(bytes: &[u8], hash_log: usize) -> usize {
    debug_assert!(hash_log >= 8);
    match hash_log {
        8 => bytes[0] as usize,
        _ => {
            debug_assert!(hash_log <= PRESPLIT_HASH_LOG_MAX);
            let pair = u32::from(u16::from_le_bytes([bytes[0], bytes[1]]));
            let mixed = pair.wrapping_mul(PRESPLIT_KNUTH);
            (mixed >> (32 - hash_log)) as usize
        }
    }
}
/// Resets `fp` and fills it with a sampled fingerprint of `src`.
///
/// Every `sampling_rate`-th position of `src` (leaving room for a two-byte
/// sample) is hashed with `presplit_hash2` and counted in its bucket. Inputs
/// shorter than two bytes leave `fp` empty.
///
/// `fp.nb_events` is set to `limit / sampling_rate` (rounded down), which can be
/// one less than the number of samples actually counted; that value is kept
/// as-is because the comparison thresholds are tuned against it.
///
/// # Panics
///
/// Panics if `sampling_rate` is zero (the previous manual loop never
/// terminated in that case).
fn presplit_record_fingerprint(
    fp: &mut PreSplitFingerprint,
    src: &[u8],
    sampling_rate: usize,
    hash_log: usize,
) {
    fp.events.fill(0);
    fp.nb_events = 0;
    if src.len() < 2 {
        return;
    }
    // The last valid start position must still have a second byte after it.
    let limit = src.len() - 1;
    // `step_by` rejects a zero step up front instead of looping forever.
    for start in (0..limit).step_by(sampling_rate) {
        fp.events[presplit_hash2(&src[start..], hash_log)] += 1;
    }
    fp.nb_events = limit / sampling_rate;
}
/// Resets `fp` and records a plain byte histogram of `src`: bucket `b` counts
/// the occurrences of byte value `b`, and `nb_events` becomes `src.len()`.
fn presplit_record_byte_histogram(fp: &mut PreSplitFingerprint, src: &[u8]) {
    fp.events.fill(0);
    src.iter()
        .for_each(|&byte| fp.events[usize::from(byte)] += 1);
    fp.nb_events = src.len();
}
/// Measures how far apart two fingerprints are over their first `1 << hash_log` buckets.
///
/// Each bucket count is scaled by the *other* fingerprint's event total so that
/// fingerprints built from different numbers of samples are comparable; the
/// result is the sum of the absolute per-bucket differences.
///
/// The products are computed in 128 bits and every per-bucket term is clamped
/// to `u64::MAX` before being added with saturation, so an oversized term can
/// never wrap around into a small distance.
///
/// Panics if `1 << hash_log` exceeds the fingerprint table size.
fn presplit_distance(lhs: &PreSplitFingerprint, rhs: &PreSplitFingerprint, hash_log: usize) -> u64 {
    let slots = 1usize << hash_log;
    let lhs_total = lhs.nb_events as u128;
    let rhs_total = rhs.nb_events as u128;
    lhs.events[..slots]
        .iter()
        .zip(&rhs.events[..slots])
        .fold(0u64, |distance, (&left_count, &right_count)| {
            let left = u128::from(left_count) * rhs_total;
            let right = u128::from(right_count) * lhs_total;
            // Clamp instead of truncating: a plain `as u64` would drop high bits.
            let gap = left.abs_diff(right).min(u128::from(u64::MAX)) as u64;
            distance.saturating_add(gap)
        })
}
/// Decides whether `new_fp` deviates enough from `reference` to justify a split.
///
/// The deviation returned by `presplit_distance` is compared against
/// `p50 * (PRESPLIT_THRESHOLD_BASE + penalty) / PRESPLIT_THRESHOLD_PENALTY_RATE`,
/// where `p50` is the product of both event totals. A larger `penalty` therefore
/// raises the bar for reporting a difference.
fn presplit_fingerprints_differ(
    reference: &PreSplitFingerprint,
    new_fp: &PreSplitFingerprint,
    penalty: i32,
    hash_log: usize,
) -> bool {
    debug_assert!(reference.nb_events > 0);
    debug_assert!(new_fp.nb_events > 0);
    let scale = reference.nb_events as u64 * new_fp.nb_events as u64;
    let measured = presplit_distance(reference, new_fp, hash_log);
    let numerator = PRESPLIT_THRESHOLD_BASE + penalty as u64;
    let cutoff = scale.saturating_mul(numerator) / PRESPLIT_THRESHOLD_PENALTY_RATE;
    measured >= cutoff
}
fn presplit_merge_events(acc: &mut PreSplitFingerprint, new_fp: &PreSplitFingerprint) {
for idx in 0..PRESPLIT_HASH_TABLE_SIZE {
acc.events[idx] = acc.events[idx].saturating_add(new_fp.events[idx]);
}
acc.nb_events = acc.nb_events.saturating_add(new_fp.nb_events);
}
/// Scans `block` chunk by chunk and returns the offset of the first chunk whose
/// fingerprint differs from everything seen before it, or `block.len()` when no
/// chunk stands out.
///
/// `level` (1..=4) selects the sampling rate and hash width: higher levels
/// sample more densely. Chunks that do not trigger a split are merged into the
/// running fingerprint, and the penalty that makes early splits harder decays
/// by one per merged chunk until it reaches zero.
fn donor_split_block_by_chunks(block: &[u8], level: usize) -> usize {
    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
    debug_assert!((1..=4).contains(&level));
    let (sampling_rate, hash_log) = match level {
        1 => (43, 8),
        2 => (11, 9),
        3 => (5, 10),
        _ => (1, 10),
    };
    let mut history = PreSplitFingerprint::default();
    let mut candidate = PreSplitFingerprint::default();
    let mut penalty = PRESPLIT_THRESHOLD_PENALTY;
    // The first chunk seeds the history; it is never a split candidate itself.
    presplit_record_fingerprint(
        &mut history,
        &block[..PRESPLIT_CHUNK_SIZE],
        sampling_rate,
        hash_log,
    );
    let later_chunks = block[PRESPLIT_CHUNK_SIZE..].chunks_exact(PRESPLIT_CHUNK_SIZE);
    for (index, chunk) in later_chunks.enumerate() {
        presplit_record_fingerprint(&mut candidate, chunk, sampling_rate, hash_log);
        if presplit_fingerprints_differ(&history, &candidate, penalty, hash_log) {
            // `index` counts chunks after the first one, hence the `+ 1`.
            return (index + 1) * PRESPLIT_CHUNK_SIZE;
        }
        presplit_merge_events(&mut history, &candidate);
        penalty = (penalty - 1).max(0);
    }
    block.len()
}
/// Cheap split heuristic that only inspects three short samples of `block`.
///
/// Byte histograms of the first and last `PRESPLIT_BORDERS_SEGMENT` bytes are
/// compared; if they do not differ the whole block is kept. Otherwise a sample
/// from the middle decides where to cut: at 64 KiB when the middle is roughly
/// equidistant from both ends, at 32 KiB when it resembles the end more, and
/// at 96 KiB when it resembles the beginning more.
fn donor_split_block_from_borders(block: &[u8]) -> usize {
    debug_assert_eq!(block.len(), MAX_BLOCK_SIZE as usize);
    let len = block.len();
    let mut head = PreSplitFingerprint::default();
    let mut tail = PreSplitFingerprint::default();
    presplit_record_byte_histogram(&mut head, &block[..PRESPLIT_BORDERS_SEGMENT]);
    presplit_record_byte_histogram(&mut tail, &block[len - PRESPLIT_BORDERS_SEGMENT..]);
    let borders_differ = presplit_fingerprints_differ(&head, &tail, 0, 8);
    if !borders_differ {
        return len;
    }

    let centre_start = len / 2 - PRESPLIT_BORDERS_SEGMENT / 2;
    let mut centre = PreSplitFingerprint::default();
    presplit_record_byte_histogram(
        &mut centre,
        &block[centre_start..centre_start + PRESPLIT_BORDERS_SEGMENT],
    );
    let to_head = presplit_distance(&head, &centre, 8);
    let to_tail = presplit_distance(&tail, &centre, 8);
    let min_gap = (PRESPLIT_BORDERS_SEGMENT as u64) * (PRESPLIT_BORDERS_SEGMENT as u64) / 3;

    match (to_head.abs_diff(to_tail) < min_gap, to_head > to_tail) {
        (true, _) => 64 * 1024,
        (false, true) => 32 * 1024,
        (false, false) => 96 * 1024,
    }
}
/// Chooses the pre-split heuristic for a compression level.
///
/// Numeric levels 11..=15 use split level 0 (border sampling), 16..=22 use
/// split level 4 (densest chunk scan); every other level disables pre-splitting.
fn donor_pre_split_level(level: CompressionLevel) -> Option<usize> {
    let CompressionLevel::Level(numeric) = level else {
        return None;
    };
    match numeric {
        11..=15 => Some(0),
        16..=22 => Some(4),
        _ => None,
    }
}
/// Benchmark entry point that runs one splitter heuristic on a full-size block.
///
/// `split_level` 0 runs the border heuristic; 1..=4 run the chunk scan at that
/// level. Returns the split position reported by the heuristic.
///
/// # Panics
///
/// Panics if `block` is not exactly `MAX_BLOCK_SIZE` bytes long or if
/// `split_level` is greater than 4.
#[cfg(feature = "bench_internals")]
pub(crate) fn block_splitter_decision_for_bench(block: &[u8], split_level: usize) -> usize {
    assert_eq!(
        block.len(),
        MAX_BLOCK_SIZE as usize,
        "block_splitter_decision_for_bench expects exactly MAX_BLOCK_SIZE bytes"
    );
    assert!(
        split_level <= 4,
        "block_splitter_decision_for_bench: split_level must be in 0..=4, got {split_level}"
    );
    match split_level {
        0 => donor_split_block_from_borders(block),
        chunk_level => donor_split_block_by_chunks(block, chunk_level),
    }
}
/// Picks how many bytes of `block` should go into the next compressed block.
///
/// Pre-splitting is attempted only when the level enables it, a full
/// `MAX_BLOCK_SIZE` block is available both in `remaining_src_size` and in
/// `block_size_max`, and `savings` so far is at least 3 bytes. In every other
/// case the size is `min(remaining_src_size, block_size_max)`, except that low
/// savings on a full block return `MAX_BLOCK_SIZE` directly.
///
/// A split position is always kept within `PRESPLIT_BLOCK_MIN..=MAX_BLOCK_SIZE`.
pub(crate) fn donor_optimal_block_size(
    level: CompressionLevel,
    block: &[u8],
    remaining_src_size: usize,
    block_size_max: usize,
    savings: i64,
) -> usize {
    let full_block = MAX_BLOCK_SIZE as usize;
    let unsplit = remaining_src_size.min(block_size_max);

    let split_level = match donor_pre_split_level(level) {
        Some(split_level) => split_level,
        None => return unsplit,
    };
    if remaining_src_size < full_block || block_size_max < full_block {
        return unsplit;
    }
    // Data that has not compressed so far is not worth analysing.
    if savings < 3 {
        return full_block;
    }
    if block.len() < full_block {
        return unsplit;
    }

    let window = &block[..full_block];
    let raw_split = match split_level {
        0 => donor_split_block_from_borders(window),
        chunk_level => donor_split_block_by_chunks(window, chunk_level),
    };
    raw_split.max(PRESPLIT_BLOCK_MIN).min(full_block)
}
/// Mutable encoder state that `FrameCompressor::compress` resets at the start
/// of a frame and hands to the block encoder for every block.
pub(crate) struct CompressState<M: Matcher> {
/// Match finder that supplies input buffers and produces sequences.
pub(crate) matcher: M,
/// Huffman table recorded for reuse; cleared or seeded from the dictionary per frame.
pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
/// Default and previously recorded FSE tables.
pub(crate) fse_tables: FseTables,
/// Scratch storage owned on behalf of the block encoder.
pub(crate) block_scratch: crate::encoding::blocks::CompressedBlockScratch,
/// Repeat-offset history; starts at `[1, 4, 8]` or at the dictionary's values.
pub(crate) offset_hist: [u32; 3],
/// Strategy tag derived from the compression level.
pub(crate) strategy_tag: crate::encoding::strategy::StrategyTag,
}
impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
    /// Creates a compressor that uses the built-in [`MatchGeneratorDriver`].
    ///
    /// No source, drain, dictionary or size hint is set yet; the remaining state
    /// is initialised exactly as in [`FrameCompressor::new_with_matcher`].
    pub fn new(compression_level: CompressionLevel) -> Self {
        // NOTE(review): the arguments are presumably a 128 KiB slice size and a
        // single-slice window — confirm against `MatchGeneratorDriver::new`.
        let matcher = MatchGeneratorDriver::new(1024 * 128, 1);
        Self::new_with_matcher(matcher, compression_level)
    }
}
impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
/// Creates a compressor around a caller-supplied matcher.
///
/// The compressor starts without source, drain, dictionary or size hint, with
/// repeat offsets `[1, 4, 8]`, fresh FSE tables and scratch space, and a
/// strategy tag derived from `compression_level`.
pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
    use crate::encoding::blocks::CompressedBlockScratch;
    use crate::encoding::strategy::StrategyTag;

    let state = CompressState {
        matcher,
        last_huff_table: None,
        fse_tables: FseTables::new(),
        block_scratch: CompressedBlockScratch::new(),
        offset_hist: [1, 4, 8],
        strategy_tag: StrategyTag::for_compression_level(compression_level),
    };
    Self {
        uncompressed_data: None,
        compressed_data: None,
        compression_level,
        dictionary: None,
        dictionary_entropy_cache: None,
        source_size_hint: None,
        state,
        magicless: false,
        #[cfg(feature = "hash")]
        hasher: XxHash64::with_seed(0),
    }
}
/// Stores the `magicless` flag that `compress` copies into the frame header.
pub fn set_magicless(&mut self, magicless: bool) {
self.magicless = magicless;
}
/// Installs the reader that `compress` will consume and returns the reader
/// that was installed before, if any.
pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
    core::mem::replace(&mut self.uncompressed_data, Some(uncompressed_data))
}
/// Installs the writer that receives the compressed frame and returns the
/// writer that was installed before, if any.
pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
    core::mem::replace(&mut self.compressed_data, Some(compressed_data))
}
/// Records the expected total input size for the next `compress` call.
///
/// `compress` consumes the hint: it forwards it to the matcher, uses it to size
/// the output buffer, and only considers a single-segment header when a hint
/// was given.
pub fn set_source_size_hint(&mut self, size: u64) {
self.source_size_hint = Some(size);
}
/// Compresses everything readable from the source into one frame and writes
/// that frame to the drain.
///
/// All blocks are buffered in memory first. The frame header is written only
/// after the input is exhausted, because it records the total uncompressed size.
///
/// # Panics
///
/// Panics if no source or drain has been set, if reading from the source or
/// writing to the drain fails, or if the matcher reports a window size of zero.
pub fn compress(&mut self) {
// Remember the hint before it is taken below; it still drives buffer sizing
// and the single-segment decision.
let initial_size_hint = self.source_size_hint;
let source_size_hint_known = initial_size_hint.is_some();
// Dictionary state is used only for compressed levels, when the matcher can be
// primed, and when a dictionary is actually installed.
let use_dictionary_state =
!matches!(self.compression_level, CompressionLevel::Uncompressed)
&& self.state.matcher.supports_dictionary_priming()
&& self.dictionary.is_some();
if let Some(size_hint) = self.source_size_hint.take() {
self.state.matcher.set_source_size_hint(size_hint);
}
// Reset per-frame state so a reused compressor starts from a clean slate.
self.state.matcher.reset(self.compression_level);
self.state.offset_hist = [1, 4, 8];
self.state.strategy_tag =
crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level);
let cached_entropy = if use_dictionary_state {
self.dictionary_entropy_cache.as_ref()
} else {
None
};
// Prime the matcher and the repeat offsets from the dictionary.
if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
self.state.offset_hist = dict.offset_hist;
self.state
.matcher
.prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
}
// Restore the dictionary's entropy tables, or clear whatever a previous frame left.
if let Some(cache) = cached_entropy {
self.state.last_huff_table.clone_from(&cache.huff);
} else {
self.state.last_huff_table = None;
}
if let Some(cache) = cached_entropy {
self.state
.fse_tables
.ll_previous
.clone_from(&cache.ll_previous);
self.state
.fse_tables
.ml_previous
.clone_from(&cache.ml_previous);
self.state
.fse_tables
.of_previous
.clone_from(&cache.of_previous);
} else {
self.state.fse_tables.ll_previous = None;
self.state.fse_tables.ml_previous = None;
self.state.fse_tables.of_previous = None;
}
// Only `Custom` tables are passed to the matcher as entropy seeds.
let ll_entropy = cached_entropy.and_then(|cache| match cache.ll_previous.as_ref() {
Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
_ => None,
});
let ml_entropy = cached_entropy.and_then(|cache| match cache.ml_previous.as_ref() {
Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
_ => None,
});
let of_entropy = cached_entropy.and_then(|cache| match cache.of_previous.as_ref() {
Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
_ => None,
});
self.state.matcher.seed_dictionary_entropy(
self.state.last_huff_table.as_ref(),
ll_entropy,
ml_entropy,
of_entropy,
);
// Restart the checksum for this frame.
#[cfg(feature = "hash")]
{
self.hasher = XxHash64::with_seed(0);
}
let source = self.uncompressed_data.as_mut().unwrap();
let drain = self.compressed_data.as_mut().unwrap();
let window_size = self.state.matcher.window_size();
assert!(
window_size != 0,
"matcher reported window_size == 0, which is invalid"
);
// Initial capacity of the block buffer, scaled down for small hinted inputs.
const ALL_BLOCKS_TINY_THRESHOLD: u64 = 4 * 1024;
const ALL_BLOCKS_SMALL_THRESHOLD: u64 = 64 * 1024;
const ALL_BLOCKS_TINY_CAP: usize = 4 * 1024;
const ALL_BLOCKS_SMALL_CAP: usize = 16 * 1024;
const ALL_BLOCKS_DEFAULT_CAP: usize = 130 * 1024;
let initial_all_blocks_cap = match initial_size_hint {
Some(h) if h <= ALL_BLOCKS_TINY_THRESHOLD => ALL_BLOCKS_TINY_CAP,
Some(h) if h <= ALL_BLOCKS_SMALL_THRESHOLD => ALL_BLOCKS_SMALL_CAP,
_ => ALL_BLOCKS_DEFAULT_CAP,
};
let mut all_blocks: Vec<u8> = Vec::with_capacity(initial_all_blocks_cap);
let mut total_uncompressed: u64 = 0;
// Tail of a block that the pre-splitter cut off; it starts the next block.
let mut pending_input: Vec<u8> = Vec::new();
let mut reached_eof = false;
// Running total of input bytes minus output bytes; gates pre-splitting.
let mut savings = 0i64;
loop {
let block_capacity = MAX_BLOCK_SIZE as usize;
let had_pending = !pending_input.is_empty();
// Reuse the pending tail as the buffer, otherwise ask the matcher for space.
let mut uncompressed_data = if had_pending {
core::mem::take(&mut pending_input)
} else {
self.state.matcher.get_next_space()
};
let mut filled = if had_pending {
uncompressed_data.len()
} else {
0
};
if uncompressed_data.len() < block_capacity {
uncompressed_data.resize(block_capacity, 0);
}
// Fill the buffer up to one full block or until the source is exhausted.
'read_loop: loop {
if reached_eof || filled == block_capacity {
break 'read_loop;
}
let new_bytes = source
.read(&mut uncompressed_data[filled..block_capacity])
.unwrap();
if new_bytes == 0 {
reached_eof = true;
break 'read_loop;
}
filled += new_bytes;
total_uncompressed += new_bytes as u64;
}
uncompressed_data.truncate(filled);
let mut last_block = reached_eof;
let remaining_for_split = if reached_eof {
uncompressed_data.len()
} else {
block_capacity
};
// Only full blocks of compressed levels are candidates for pre-splitting.
if !matches!(self.compression_level, CompressionLevel::Uncompressed)
&& uncompressed_data.len() == block_capacity
{
let block_len = donor_optimal_block_size(
self.compression_level,
&uncompressed_data,
remaining_for_split,
block_capacity,
savings,
);
if block_len < uncompressed_data.len() {
// Keep the tail for the next iteration; this block can no longer be the last.
pending_input = uncompressed_data.split_off(block_len);
if pending_input.capacity() < block_capacity {
pending_input.reserve_exact(block_capacity - pending_input.len());
}
last_block = false;
}
}
#[cfg(feature = "hash")]
self.hasher.write(&uncompressed_data);
// Nothing left to encode: terminate the frame with an empty raw last block.
if uncompressed_data.is_empty() {
let header = BlockHeader {
last_block: true,
block_type: crate::blocks::block::BlockType::Raw,
block_size: 0,
};
header.serialize(&mut all_blocks);
break;
}
match self.compression_level {
CompressionLevel::Uncompressed => {
let header = BlockHeader {
last_block,
block_type: crate::blocks::block::BlockType::Raw,
block_size: uncompressed_data.len().try_into().unwrap(),
};
header.serialize(&mut all_blocks);
all_blocks.extend_from_slice(&uncompressed_data);
// A raw block costs 3 bytes more than its payload.
savings +=
uncompressed_data.len() as i64 - (3 + uncompressed_data.len()) as i64;
}
CompressionLevel::Fastest
| CompressionLevel::Default
| CompressionLevel::Better
| CompressionLevel::Best
| CompressionLevel::Level(_) => {
let before_len = all_blocks.len();
let block_len = uncompressed_data.len();
compress_block_encoded(
&mut self.state,
self.compression_level,
last_block,
uncompressed_data,
&mut all_blocks,
);
savings += block_len as i64 - (all_blocks.len() - before_len) as i64;
}
}
if last_block && pending_input.is_empty() {
break;
}
}
// Single-segment frames need a size hint, no dictionary, and a total size
// of at least 512 bytes that fits in the window.
let single_segment = !use_dictionary_state
&& source_size_hint_known
&& total_uncompressed >= 512
&& total_uncompressed <= window_size;
let header = FrameHeader {
frame_content_size: Some(total_uncompressed),
single_segment,
content_checksum: cfg!(feature = "hash"),
dictionary_id: if use_dictionary_state {
self.dictionary.as_ref().map(|dict| dict.id as u64)
} else {
None
},
window_size: if single_segment {
None
} else {
Some(window_size)
},
magicless: self.magicless,
};
let mut header_buf: Vec<u8> = Vec::with_capacity(14);
header.serialize(&mut header_buf);
// Output order: frame header, all buffered blocks, then the checksum.
drain.write_all(&header_buf).unwrap();
drain.write_all(&all_blocks).unwrap();
#[cfg(feature = "hash")]
{
// Only the low 32 bits of the 64-bit hash are written.
let content_checksum = self.hasher.finish();
drain
.write_all(&(content_checksum as u32).to_le_bytes())
.unwrap();
}
}
/// Returns a mutable reference to the installed source, or `None` if no source is set.
pub fn source_mut(&mut self) -> Option<&mut R> {
self.uncompressed_data.as_mut()
}
/// Returns a mutable reference to the installed drain, or `None` if no drain is set.
pub fn drain_mut(&mut self) -> Option<&mut W> {
self.compressed_data.as_mut()
}
/// Returns a shared reference to the installed source, or `None` if no source is set.
pub fn source(&self) -> Option<&R> {
self.uncompressed_data.as_ref()
}
/// Returns a shared reference to the installed drain, or `None` if no drain is set.
pub fn drain(&self) -> Option<&W> {
self.compressed_data.as_ref()
}
/// Removes and returns the installed source, leaving the compressor without one.
pub fn take_source(&mut self) -> Option<R> {
    core::mem::take(&mut self.uncompressed_data)
}
/// Removes and returns the installed drain, leaving the compressor without one.
pub fn take_drain(&mut self) -> Option<W> {
    core::mem::take(&mut self.compressed_data)
}
/// Installs `match_generator` as the active matcher and returns the matcher
/// it replaces.
pub fn replace_matcher(&mut self, match_generator: M) -> M {
    core::mem::replace(&mut self.state.matcher, match_generator)
}
/// Changes the compression level used by later `compress` calls and returns
/// the level that was configured before.
pub fn set_compression_level(
    &mut self,
    compression_level: CompressionLevel,
) -> CompressionLevel {
    core::mem::replace(&mut self.compression_level, compression_level)
}
/// Returns the compression level currently configured.
pub fn compression_level(&self) -> CompressionLevel {
self.compression_level
}
pub fn set_dictionary(
&mut self,
dictionary: crate::decoding::Dictionary,
) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
{
if dictionary.id == 0 {
return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
}
if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
return Err(
crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
index: index as u8,
},
);
}
self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
huff: dictionary.huf.table.to_encoder_table(),
ll_previous: dictionary
.fse
.literal_lengths
.to_encoder_table()
.map(|table| PreviousFseTable::Custom(Box::new(table))),
ml_previous: dictionary
.fse
.match_lengths
.to_encoder_table()
.map(|table| PreviousFseTable::Custom(Box::new(table))),
of_previous: dictionary
.fse
.offsets
.to_encoder_table()
.map(|table| PreviousFseTable::Custom(Box::new(table))),
});
Ok(self.dictionary.replace(dictionary))
}
/// Decodes `raw_dictionary` and installs the result via `set_dictionary`.
///
/// # Errors
///
/// Returns the decoding error if the bytes are not a valid dictionary, or any
/// error reported by `set_dictionary`.
pub fn set_dictionary_from_bytes(
    &mut self,
    raw_dictionary: &[u8],
) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
{
    self.set_dictionary(crate::decoding::Dictionary::decode_dict(raw_dictionary)?)
}
/// Removes the installed dictionary together with its cached entropy tables
/// and returns the dictionary, if one was installed.
pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
    let removed = self.dictionary.take();
    self.dictionary_entropy_cache = None;
    removed
}
}
#[cfg(test)]
mod tests {
#[cfg(all(feature = "dict_builder", feature = "std"))]
use alloc::format;
use alloc::vec;
use super::FrameCompressor;
use crate::blocks::block::BlockType;
use crate::common::{MAGIC_NUM, MAX_BLOCK_SIZE};
use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
use crate::encoding::{Matcher, Sequence};
use alloc::vec::Vec;
/// Produces `len` deterministic pseudo-random bytes from `seed`.
///
/// Each byte is taken from bit 33 upwards of a 64-bit linear congruential
/// generator that is advanced once per byte.
fn generate_data(seed: u64, len: usize) -> Vec<u8> {
    let mut lcg = seed;
    (0..len)
        .map(|_| {
            lcg = lcg
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            (lcg >> 33) as u8
        })
        .collect()
}
/// Parses the frame header of `frame` and returns the type of the first block
/// that follows it. Panics if either header fails to parse.
fn first_block_type(frame: &[u8]) -> BlockType {
    let header_len = read_frame_header(frame)
        .expect("frame header should parse")
        .1 as usize;
    let mut block_dec = block_decoder::new();
    let parsed = block_dec
        .read_block_header(&frame[header_len..])
        .expect("block header should parse");
    parsed.0.block_type
}
/// For every named level and a range of input sizes, checks that the frame
/// header carries the exact content size in a non-empty FCS field and that the
/// C zstd decoder reproduces the input.
#[cfg(feature = "std")]
#[test]
fn fcs_header_written_and_c_zstd_compatible() {
let levels = [
crate::encoding::CompressionLevel::Uncompressed,
crate::encoding::CompressionLevel::Fastest,
crate::encoding::CompressionLevel::Default,
crate::encoding::CompressionLevel::Better,
crate::encoding::CompressionLevel::Best,
];
// Inputs: empty, one byte, a short text line, 300 bytes, and 100 000 bytes.
let fcs_2byte = vec![0xCDu8; 300]; let large = vec![0xABu8; 100_000];
let inputs: [&[u8]; 5] = [
&[],
&[0x00],
b"abcdefghijklmnopqrstuvwxy\n",
&fcs_2byte,
&large,
];
for level in levels {
for data in &inputs {
let compressed = crate::encoding::compress_to_vec(*data, level);
let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
.unwrap()
.0;
assert_eq!(
header.frame_content_size(),
data.len() as u64,
"FCS mismatch for len={} level={:?}",
data.len(),
level,
);
assert_ne!(
header.descriptor.frame_content_size_bytes().unwrap(),
0,
"FCS field must be present for len={} level={:?}",
data.len(),
level,
);
// Cross-check with the reference decoder.
let mut decoded = Vec::new();
zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
|e| {
panic!(
"C zstd decode failed for len={} level={level:?}: {e}",
data.len()
)
},
);
assert_eq!(
decoded.as_slice(),
*data,
"C zstd roundtrip failed for len={}",
data.len()
);
}
}
}
/// A hinted 2047-byte input compressed at `Fastest` must still decode with the
/// C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
    let payload = vec![0xAB; 2047];
    let mut frame = Vec::new();
    {
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor.set_source_size_hint(payload.len() as u64);
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
    }
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, payload);
}
/// A hinted 1 KiB input at `Default` must produce a single-segment header with
/// the exact content size, and the frame must decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn small_hinted_default_frame_uses_single_segment_header() {
    let payload = generate_data(0xD15E_A5ED, 1024);
    let mut frame = Vec::new();
    {
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
        compressor.set_source_size_hint(payload.len() as u64);
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut frame);
        compressor.compress();
    }
    let parsed_header = read_frame_header(frame.as_slice()).unwrap().0;
    assert!(
        parsed_header.descriptor.single_segment_flag(),
        "small hinted default frames should use single-segment header for Rust/FFI parity"
    );
    assert_eq!(parsed_header.frame_content_size(), payload.len() as u64);
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip)
        .expect("ffi decoder must accept single-segment small hinted default frame");
    assert_eq!(roundtrip, payload);
}
/// Same single-segment expectation as the `Default` test, but for the numeric
/// levels 0 and 3.
#[cfg(feature = "std")]
#[test]
fn small_hinted_numeric_default_levels_use_single_segment_header() {
    let payload = generate_data(0xA11C_E003, 1024);
    let numeric_levels = [
        super::CompressionLevel::Level(0),
        super::CompressionLevel::Level(3),
    ];
    for level in numeric_levels {
        let mut frame = Vec::new();
        {
            let mut compressor = FrameCompressor::new(level);
            compressor.set_source_size_hint(payload.len() as u64);
            compressor.set_source(payload.as_slice());
            compressor.set_drain(&mut frame);
            compressor.compress();
        }
        let parsed_header = read_frame_header(frame.as_slice()).unwrap().0;
        assert!(
            parsed_header.descriptor.single_segment_flag(),
            "small hinted numeric default level frames should use single-segment header (level={level:?})"
        );
        assert_eq!(parsed_header.frame_content_size(), payload.len() as u64);
        let mut roundtrip = Vec::new();
        if let Err(e) = zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip) {
            panic!(
                "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
            );
        }
        assert_eq!(roundtrip, payload);
    }
}
/// Round-trips hinted frames through the C zstd decoder for a matrix of levels,
/// sizes and seeds, and additionally pins the single-segment flag at the
/// 511/512-byte boundary.
#[cfg(feature = "std")]
#[test]
fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
let levels = [
super::CompressionLevel::Fastest,
super::CompressionLevel::Default,
super::CompressionLevel::Better,
super::CompressionLevel::Best,
super::CompressionLevel::Level(-1),
super::CompressionLevel::Level(2),
super::CompressionLevel::Level(3),
super::CompressionLevel::Level(4),
super::CompressionLevel::Level(11),
];
let sizes = [
511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
];
// The effective generator seed is `seed + seed_idx`.
for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
for &size in &sizes {
let data = generate_data(seed + seed_idx as u64, size);
for &level in &levels {
let compressed = {
let mut compressor = FrameCompressor::new(level);
compressor.set_source_size_hint(data.len() as u64);
compressor.set_source(data.as_slice());
let mut out = Vec::new();
compressor.set_drain(&mut out);
compressor.compress();
out
};
// Single-segment is expected from 512 bytes upwards, not at 511.
if matches!(size, 511 | 512) {
let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
assert_eq!(
frame_header.descriptor.single_segment_flag(),
size == 512,
"single_segment 511/512 boundary mismatch: level={level:?} size={size}"
);
}
let mut decoded = Vec::new();
zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
|e| {
panic!(
"ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
seed + seed_idx as u64
)
},
);
assert_eq!(
decoded,
data,
"hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
seed + seed_idx as u64
);
}
}
}
}
/// Checks that every listed level emits a single-segment header for hinted
/// inputs of a little over 1 KiB, and that the frames decode with C zstd.
#[cfg(feature = "std")]
#[test]
fn hinted_levels_use_single_segment_header_symmetrically() {
let levels = [
super::CompressionLevel::Fastest,
super::CompressionLevel::Default,
super::CompressionLevel::Better,
super::CompressionLevel::Best,
super::CompressionLevel::Level(0),
super::CompressionLevel::Level(2),
super::CompressionLevel::Level(3),
super::CompressionLevel::Level(4),
super::CompressionLevel::Level(11),
];
for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
// Sizes are 1024, 1121 and 1218 bytes, one per seed.
let size = 1024 + seed_idx * 97;
let data = generate_data(seed, size);
for &level in &levels {
let compressed = {
let mut compressor = FrameCompressor::new(level);
compressor.set_source_size_hint(data.len() as u64);
compressor.set_source(data.as_slice());
let mut out = Vec::new();
compressor.set_drain(&mut out);
compressor.compress();
out
};
let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
assert!(
frame_header.descriptor.single_segment_flag(),
"hinted frame should be single-segment for level={level:?} size={}",
data.len()
);
assert_eq!(frame_header.frame_content_size(), data.len() as u64);
let mut decoded = Vec::new();
zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
panic!(
"ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
data.len()
)
});
assert_eq!(decoded, data);
}
}
}
/// Pins the single-segment boundary for hinted frames: 511 bytes must not set
/// the flag, 512 bytes must, for every listed level and seed.
#[cfg(feature = "std")]
#[test]
fn hinted_levels_pin_511_512_single_segment_boundary() {
let levels = [
super::CompressionLevel::Fastest,
super::CompressionLevel::Default,
super::CompressionLevel::Better,
super::CompressionLevel::Best,
super::CompressionLevel::Level(0),
super::CompressionLevel::Level(2),
super::CompressionLevel::Level(3),
super::CompressionLevel::Level(4),
super::CompressionLevel::Level(11),
];
// The effective generator seed is `seed + seed_idx`.
for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
for &size in &[511usize, 512] {
let data = generate_data(seed + seed_idx as u64, size);
for &level in &levels {
let compressed = {
let mut compressor = FrameCompressor::new(level);
compressor.set_source_size_hint(data.len() as u64);
compressor.set_source(data.as_slice());
let mut out = Vec::new();
compressor.set_drain(&mut out);
compressor.compress();
out
};
let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
assert_eq!(
frame_header.descriptor.single_segment_flag(),
size == 512,
"single_segment 511/512 boundary mismatch: level={level:?} size={size}"
);
let mut decoded = Vec::new();
zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
|e| {
panic!(
"ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
seed + seed_idx as u64
)
},
);
assert_eq!(decoded, data);
}
}
}
}
/// 10 KiB of pseudo-random bytes at `Fastest` must be stored as a raw first
/// block and still decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn fastest_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Fastest;
    let input = generate_data(0xC0FF_EE11, 10 * 1024);
    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
/// 10 KiB of pseudo-random bytes at `Default` must be stored as a raw first
/// block and still decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn default_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Default;
    let input = generate_data(0xD15E_A5ED, 10 * 1024);
    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
/// 10 KiB of pseudo-random bytes at `Best` must be stored as a raw first block
/// and still decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn best_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Best;
    let input = generate_data(0xB35C_AFE1, 10 * 1024);
    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
/// 10 KiB of pseudo-random bytes at numeric level 2 must be stored as a raw
/// first block and still decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn level2_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Level(2);
    let input = generate_data(0xA11C_E222, 10 * 1024);
    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
/// 10 KiB of pseudo-random bytes at `Better` must be stored as a raw first
/// block and still decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn better_random_block_uses_raw_fast_path() {
    let level = super::CompressionLevel::Better;
    let input = generate_data(0xBE77_E111, 10 * 1024);
    let frame = crate::encoding::compress_to_vec(input.as_slice(), level);
    assert_eq!(first_block_type(&frame), BlockType::Raw);
    let mut roundtrip = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut roundtrip).unwrap();
    assert_eq!(roundtrip, input);
}
/// 16 KiB of a repeated log line must not be stored raw: for each level the
/// first block is not `Raw`, the frame is smaller than the input, and the C
/// zstd decoder reproduces the input.
#[cfg(feature = "std")]
#[test]
fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
    const LINE: &[u8] =
        b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
    const TOTAL_LEN: usize = 16 * 1024;
    // Repeat the line and cut the final repetition off at exactly 16 KiB.
    let data: Vec<u8> = LINE.iter().copied().cycle().take(TOTAL_LEN).collect();

    let levels = [
        super::CompressionLevel::Fastest,
        super::CompressionLevel::Default,
        super::CompressionLevel::Level(3),
        super::CompressionLevel::Better,
        super::CompressionLevel::Best,
    ];
    for level in levels {
        let compressed = crate::encoding::compress_to_vec(data.as_slice(), level);
        assert_ne!(first_block_type(&compressed), BlockType::Raw);
        assert!(
            compressed.len() < data.len(),
            "compressible input should remain compressible for level={level:?}"
        );
        let mut decoded = Vec::new();
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, data);
    }
}
/// For 4 KiB of a repeated log line compressed with a size hint, every listed
/// level must emit a single-segment header, avoid a raw first block, shrink the
/// input, and decode with the C zstd decoder.
#[cfg(feature = "std")]
#[test]
fn hinted_small_compressible_frames_use_single_segment_across_levels() {
let mut data = Vec::with_capacity(4 * 1024);
const LINE: &[u8] =
b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
// Repeat the line, truncating the final copy so the input is exactly 4 KiB.
while data.len() < 4 * 1024 {
let remaining = 4 * 1024 - data.len();
data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
}
for level in [
super::CompressionLevel::Fastest,
super::CompressionLevel::Default,
super::CompressionLevel::Better,
super::CompressionLevel::Best,
super::CompressionLevel::Level(0),
super::CompressionLevel::Level(3),
super::CompressionLevel::Level(4),
super::CompressionLevel::Level(11),
] {
let compressed = {
let mut compressor = FrameCompressor::new(level);
compressor.set_source_size_hint(data.len() as u64);
compressor.set_source(data.as_slice());
let mut out = Vec::new();
compressor.set_drain(&mut out);
compressor.compress();
out
};
let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
assert!(
frame_header.descriptor.single_segment_flag(),
"hinted small compressible frame should use single-segment (level={level:?})"
);
assert_ne!(
first_block_type(&compressed),
BlockType::Raw,
"compressible hinted frame should stay off raw fast path (level={level:?})"
);
assert!(
compressed.len() < data.len(),
"compressible hinted frame should still shrink (level={level:?})"
);
let mut decoded = Vec::new();
zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
.unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
assert_eq!(decoded, data);
}
}
/// Test-only [`Matcher`] that never searches for matches: its `start_matching`
/// reports the committed buffer as one literals-only sequence. It is used by the
/// test that checks a matcher without dictionary priming does not advertise a
/// dictionary id.
struct NoDictionaryMatcher {
/// Buffer most recently passed to `commit_space`; emptied by `reset`.
last_space: Vec<u8>,
/// Value returned by `window_size()`, also used as the length of the zeroed
/// buffers handed out by `get_next_space`.
window_size: u64,
}
impl NoDictionaryMatcher {
    /// Builds a matcher with the given window size and no committed data yet.
    fn new(window_size: u64) -> Self {
        let last_space = Vec::new();
        Self {
            window_size,
            last_space,
        }
    }
}
impl Matcher for NoDictionaryMatcher {
    /// Hands out a fresh zero-filled buffer of `window_size` bytes.
    fn get_next_space(&mut self) -> Vec<u8> {
        let len = self.window_size as usize;
        vec![0u8; len]
    }

    /// Returns the buffer stored by the latest `commit_space` call.
    fn get_last_space(&mut self) -> &[u8] {
        &self.last_space[..]
    }

    /// Replaces the stored buffer with `space`.
    fn commit_space(&mut self, space: Vec<u8>) {
        self.last_space = space;
    }

    /// Nothing to skip: this matcher keeps no match-finding state.
    fn skip_matching(&mut self) {}

    /// Reports the whole stored buffer as a single literals-only sequence.
    fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
        let literals = &self.last_space[..];
        handle_sequence(Sequence::Literals { literals });
    }

    /// Empties the stored buffer; the requested level is ignored.
    fn reset(&mut self, _level: super::CompressionLevel) {
        self.last_space.truncate(0);
    }

    /// Returns the window size supplied at construction.
    fn window_size(&self) -> u64 {
        self.window_size
    }
}
/// The emitted frame must begin with the little-endian magic number.
#[test]
fn frame_starts_with_magic_num() {
    let payload: &[u8] = &[1, 2, 3];
    let mut frame: Vec<u8> = Vec::new();

    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder.set_source(payload);
    encoder.set_drain(&mut frame);
    encoder.compress();

    let magic = MAGIC_NUM.to_le_bytes();
    assert!(frame.starts_with(&magic));
}
/// Compresses a three-byte input in uncompressed mode and verifies the result.
///
/// The test used to assert nothing, so it could only detect a panic. It now
/// also checks that a frame was written and that the crate's own decoder
/// restores the original bytes.
#[test]
fn very_simple_raw_compress() {
    let mock_data = [1_u8, 2, 3].as_slice();
    let mut output: Vec<u8> = Vec::new();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    compressor.set_source(mock_data);
    compressor.set_drain(&mut output);
    compressor.compress();

    assert!(
        !output.is_empty(),
        "compressing a non-empty input must emit a frame"
    );

    // Roundtrip through the in-crate decoder, as the sibling tests do.
    let mut decoder = FrameDecoder::new();
    let mut decoded = Vec::with_capacity(mock_data.len());
    decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
    assert_eq!(decoded, mock_data);
}
/// Roundtrips a multi-block input made of long constant runs through both the
/// in-crate decoder and the FFI decoder.
#[test]
fn very_simple_compress() {
    // (fill byte, run length) pairs, appended in order.
    let runs: [(u8, usize); 5] = [
        (0, 1 << 17),
        (1, (1 << 17) - 1),
        (2, (1 << 18) - 1),
        (2, 1 << 17),
        (3, (1 << 17) - 1),
    ];
    let mut mock_data: Vec<u8> = Vec::new();
    for (fill, run_len) in runs {
        mock_data.resize(mock_data.len() + run_len, fill);
    }

    let mut output: Vec<u8> = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder.set_source(mock_data.as_slice());
    encoder.set_drain(&mut output);
    encoder.compress();

    // In-crate decoder.
    let mut own_decoded = Vec::with_capacity(mock_data.len());
    let mut decoder = FrameDecoder::new();
    decoder
        .decode_all_to_vec(&output, &mut own_decoded)
        .unwrap();
    assert_eq!(mock_data, own_decoded);

    // FFI decoder.
    let mut ffi_decoded = Vec::new();
    zstd::stream::copy_decode(output.as_slice(), &mut ffi_decoded).unwrap();
    assert_eq!(mock_data, ffi_decoded);
}
/// Roundtrips 512 KiB of zero bytes through the in-crate decoder.
#[test]
fn rle_compress() {
    let zeros = vec![0u8; 1 << 19];
    let mut frame: Vec<u8> = Vec::new();

    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder.set_source(zeros.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    let mut restored = Vec::with_capacity(zeros.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut restored).unwrap();
    assert_eq!(zeros, restored);
}
/// Roundtrips a five-byte input through both the in-crate and FFI decoders.
#[test]
fn aaa_compress() {
    let mock_data: Vec<u8> = vec![0, 1, 3, 4, 5];
    let mut frame: Vec<u8> = Vec::new();

    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder.set_source(mock_data.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    // In-crate decoder.
    let mut own_decoded = Vec::with_capacity(mock_data.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut own_decoded).unwrap();
    assert_eq!(mock_data, own_decoded);

    // FFI decoder.
    let mut ffi_decoded = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut ffi_decoded).unwrap();
    assert_eq!(mock_data, ffi_decoded);
}
/// Compressing with a dictionary must record its id in the frame header, fail
/// to decode without the dictionary, and roundtrip with it in both decoders.
#[test]
fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let encoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let decoder_dict = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
    let expected_id = decoder_dict.id;

    // Payload: eight copies of the first 2 KiB of the dictionary content.
    let data = decoder_dict.dict_content[..2048].repeat(8);

    let mut with_dict = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Fastest);

    // The first attach has nothing to replace.
    let replaced = encoder
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(
        replaced.is_none(),
        "first dictionary insert should return None"
    );

    // The second attach hands back the dictionary installed above.
    let replaced = encoder
        .set_dictionary(encoder_dict)
        .expect("valid dictionary should attach")
        .expect("set_dictionary_from_bytes inserted previous dictionary");
    assert_eq!(replaced.id, expected_id);

    encoder.set_source(data.as_slice());
    encoder.set_drain(&mut with_dict);
    encoder.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(expected_id));

    // Decoding without the dictionary must be rejected.
    let mut missing_dict_target = Vec::with_capacity(data.len());
    let err = FrameDecoder::new()
        .decode_all_to_vec(&with_dict, &mut missing_dict_target)
        .unwrap_err();
    assert!(
        matches!(
            &err,
            crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
        ),
        "dict-compressed stream should require dictionary id, got: {err:?}"
    );

    // In-crate decoder with the dictionary registered.
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    let mut decoded = Vec::with_capacity(data.len());
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, data);

    // FFI decoder with the same dictionary bytes.
    let mut ffi_decoded = Vec::with_capacity(data.len());
    let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
    let ffi_written = ffi_decoder
        .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
        .unwrap();
    assert_eq!(ffi_written, data.len());
    assert_eq!(ffi_decoded, data);
}
/// Trains a raw dictionary with the dict builder, then checks that compressing
/// a small related payload with it roundtrips, advertises the dictionary id,
/// and yields a smaller frame than compressing without a dictionary.
#[cfg(all(feature = "dict_builder", feature = "std"))]
#[test]
fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
    use std::io::Cursor;

    // Training corpus: 256 short key/value lines.
    let training: Vec<u8> = (0..256u32)
        .flat_map(|idx| format!("tenant=demo table=orders key={idx} region=eu\n").into_bytes())
        .collect();

    let mut raw_dict = Vec::new();
    crate::dictionary::create_raw_dict_from_source(
        Cursor::new(training.as_slice()),
        training.len(),
        &mut raw_dict,
        4096,
    )
    .expect("dict_builder training should succeed");
    assert!(
        !raw_dict.is_empty(),
        "dict_builder produced an empty dictionary"
    );

    let dict_id = 0xD1C7_0008;
    let encoder_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
    let decoder_dict =
        crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();

    // Payload: 96 lines sharing vocabulary with the training corpus.
    let payload: Vec<u8> = (0..96u32)
        .flat_map(|idx| {
            format!(
                "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
            )
            .into_bytes()
        })
        .collect();

    // Baseline without a dictionary.
    let mut without_dict = Vec::new();
    let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
    baseline.set_source(payload.as_slice());
    baseline.set_drain(&mut without_dict);
    baseline.compress();

    // Same payload with the trained dictionary attached.
    let mut with_dict = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Fastest);
    encoder
        .set_dictionary(encoder_dict)
        .expect("valid dict_builder dictionary should attach");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut with_dict);
    encoder.compress();

    let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
        .expect("encoded stream should have a frame header");
    assert_eq!(frame_header.dictionary_id(), Some(dict_id));

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
    assert_eq!(decoded, payload);

    assert!(
        with_dict.len() < without_dict.len(),
        "trained dictionary should improve compression for this small payload"
    );
}
/// After attaching a dictionary from bytes and compressing an empty input, the
/// compressor state must hold a previous Huffman table and previous LL/ML/OF
/// FSE tables.
#[test]
fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
    let dict_raw = include_bytes!("../../dict_tests/dictionary");
    let input: &[u8] = &[];
    let mut output = Vec::new();

    let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
    let replaced = compressor
        .set_dictionary_from_bytes(dict_raw)
        .expect("dictionary bytes should parse");
    assert!(replaced.is_none());

    compressor.set_source(input);
    compressor.set_drain(&mut output);
    compressor.compress();

    // (seeded?, failure message) for each entropy table, checked in order.
    let state = &compressor.state;
    let seeded = [
        (
            state.last_huff_table.is_some(),
            "dictionary entropy should seed previous huffman table before first block",
        ),
        (
            state.fse_tables.ll_previous.is_some(),
            "dictionary entropy should seed previous ll table before first block",
        ),
        (
            state.fse_tables.ml_previous.is_some(),
            "dictionary entropy should seed previous ml table before first block",
        ),
        (
            state.fse_tables.of_previous.is_some(),
            "dictionary entropy should seed previous of table before first block",
        ),
    ];
    for (is_seeded, message) in seeded {
        assert!(is_seeded, "{message}");
    }
}
/// `set_dictionary` must reject a dictionary whose id is zero.
#[test]
fn set_dictionary_rejects_zero_dictionary_id() {
    use crate::decoding::errors::DictionaryDecodeError;
    use crate::decoding::scratch::{FSEScratch, HuffmanScratch};
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let zero_id_dict = crate::decoding::Dictionary {
        id: 0,
        fse: FSEScratch::new(),
        huf: HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [1, 4, 8],
    };

    // No source or drain is attached, so the type parameters are spelled out.
    let mut compressor = FrameCompressor::<&[u8], Vec<u8>, MatchGeneratorDriver>::new(
        super::CompressionLevel::Fastest,
    );
    let outcome = compressor.set_dictionary(zero_id_dict);
    assert!(matches!(
        outcome,
        Err(DictionaryDecodeError::ZeroDictionaryId)
    ));
}
/// `set_dictionary` must reject a dictionary whose first repeat offset is zero
/// and report the offending index.
#[test]
fn set_dictionary_rejects_zero_repeat_offsets() {
    use crate::decoding::errors::DictionaryDecodeError;
    use crate::decoding::scratch::{FSEScratch, HuffmanScratch};
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let zero_offset_dict = crate::decoding::Dictionary {
        id: 1,
        fse: FSEScratch::new(),
        huf: HuffmanScratch::new(),
        dict_content: vec![1, 2, 3],
        offset_hist: [0, 4, 8],
    };

    // No source or drain is attached, so the type parameters are spelled out.
    let mut compressor = FrameCompressor::<&[u8], Vec<u8>, MatchGeneratorDriver>::new(
        super::CompressionLevel::Fastest,
    );
    let outcome = compressor.set_dictionary(zero_offset_dict);
    assert!(matches!(
        outcome,
        Err(DictionaryDecodeError::ZeroRepeatOffsetInDictionary { index: 0 })
    ));
}
/// With `CompressionLevel::Uncompressed`, attaching a dictionary must not put a
/// dictionary id in the frame header, and the frame must decode without one.
#[test]
fn uncompressed_mode_does_not_require_dictionary() {
    let payload = b"plain-bytes-that-should-stay-raw";
    let dict_id = 0xABCD_0001;
    let dictionary =
        crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
            .expect("raw dictionary should be valid");

    let mut frame = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Uncompressed);
    encoder
        .set_dictionary(dictionary)
        .expect("dictionary should attach in uncompressed mode");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a header");
    assert_eq!(
        header.dictionary_id(),
        None,
        "raw/uncompressed frames must not advertise dictionary dependency"
    );

    // No dictionary is registered with the decoder.
    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
/// Attaching a dictionary must leave the advertised window size equal to the
/// no-dictionary baseline, and a payload longer than that window must still
/// roundtrip.
#[test]
fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
    use crate::decoding::Dictionary;
    use crate::encoding::match_generator::MatchGeneratorDriver;

    let dict_id = 0xABCD_0002;
    let encoder_dict = Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");
    let decoder_dict = Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");
    let payload = b"abcdefgh".repeat(512 * 1024 / 8 + 64);

    // Baseline: same matcher configuration, no dictionary.
    let mut baseline_frame = Vec::new();
    let mut baseline = FrameCompressor::new_with_matcher(
        MatchGeneratorDriver::new(1024, 1),
        super::CompressionLevel::Fastest,
    );
    baseline.set_source(payload.as_slice());
    baseline.set_drain(&mut baseline_frame);
    baseline.compress();
    let (baseline_header, _) =
        crate::decoding::frame::read_frame_header(baseline_frame.as_slice())
            .expect("baseline frame should have a header");
    let no_dict_window = baseline_header
        .window_size()
        .expect("window size should be present");

    // Same payload with the dictionary attached.
    let mut dict_frame = Vec::new();
    let mut encoder = FrameCompressor::new_with_matcher(
        MatchGeneratorDriver::new(1024, 1),
        super::CompressionLevel::Fastest,
    );
    encoder
        .set_dictionary(encoder_dict)
        .expect("dictionary should attach");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut dict_frame);
    encoder.compress();
    let (dict_header, _) = crate::decoding::frame::read_frame_header(dict_frame.as_slice())
        .expect("encoded frame should have a header");
    let advertised_window = dict_header
        .window_size()
        .expect("window size should be present");

    assert_eq!(
        advertised_window, no_dict_window,
        "dictionary priming must not inflate advertised window size"
    );
    assert!(
        payload.len() > advertised_window as usize,
        "test must cross the advertised window boundary"
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    decoder.decode_all_to_vec(&dict_frame, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
/// With a dictionary attached, a (deliberately too small) source-size hint of 1
/// must not advertise a larger window than the unhinted run, and the hinted
/// frame must still roundtrip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
    use crate::decoding::Dictionary;

    let dict_id = 0xABCD_0004;
    // Every dictionary instance in this test is built from identical content.
    let build_dict = || Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    // Reads the window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let encoder_dict = build_dict();
    let decoder_dict = build_dict();
    let payload = b"abcdabcdabcdabcd".repeat(128);

    // Run with the source-size hint.
    let mut hinted_output = Vec::new();
    let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
    hinted.set_dictionary(encoder_dict).unwrap();
    hinted.set_source_size_hint(1);
    hinted.set_source(payload.as_slice());
    hinted.set_drain(&mut hinted_output);
    hinted.compress();

    // Run without any hint.
    let mut no_hint_output = Vec::new();
    let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
    no_hint.set_dictionary(build_dict()).unwrap();
    no_hint.set_source(payload.as_slice());
    no_hint.set_drain(&mut no_hint_output);
    no_hint.compress();

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
/// With a dictionary attached and an exact source-size hint for a 4 KiB
/// payload, the advertised window must not exceed the unhinted run's window,
/// and the hinted frame must still roundtrip.
#[test]
fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
    use crate::decoding::Dictionary;

    let dict_id = 0xABCD_0005;
    // Every dictionary instance in this test is built from identical content.
    let build_dict = || Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
    // Reads the window size advertised in a frame's header.
    let window_of = |frame: &[u8]| {
        crate::decoding::frame::read_frame_header(frame)
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present")
    };

    let encoder_dict = build_dict();
    let decoder_dict = build_dict();
    let payload = b"abcd".repeat(1024);
    let payload_len = payload.len() as u64;

    // Run with the exact source-size hint.
    let mut hinted_output = Vec::new();
    let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
    hinted.set_dictionary(encoder_dict).unwrap();
    hinted.set_source_size_hint(payload_len);
    hinted.set_source(payload.as_slice());
    hinted.set_drain(&mut hinted_output);
    hinted.compress();

    // Run without any hint.
    let mut no_hint_output = Vec::new();
    let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
    no_hint.set_dictionary(build_dict()).unwrap();
    no_hint.set_source(payload.as_slice());
    no_hint.set_drain(&mut no_hint_output);
    no_hint.compress();

    let hinted_window = window_of(hinted_output.as_slice());
    let no_hint_window = window_of(no_hint_output.as_slice());
    assert!(
        hinted_window <= no_hint_window,
        "source-size hint should not increase advertised window with dictionary priming",
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.add_dict(decoder_dict).unwrap();
    decoder
        .decode_all_to_vec(&hinted_output, &mut decoded)
        .unwrap();
    assert_eq!(decoded, payload);
}
/// When the compressor runs with `NoDictionaryMatcher`, attaching a dictionary
/// must not put a dictionary id in the frame header, and the frame must decode
/// without any dictionary registered.
#[test]
fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
    let payload = b"abcdefghabcdefgh";
    let dict_id = 0xABCD_0003;
    let dictionary = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
        .expect("raw dictionary should be valid");

    let mut frame = Vec::new();
    let mut encoder = FrameCompressor::new_with_matcher(
        NoDictionaryMatcher::new(64),
        super::CompressionLevel::Fastest,
    );
    encoder
        .set_dictionary(dictionary)
        .expect("dictionary should attach");
    encoder.set_source(payload.as_slice());
    encoder.set_drain(&mut frame);
    encoder.compress();

    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice())
        .expect("encoded frame should have a header");
    assert_eq!(
        header.dictionary_id(),
        None,
        "matchers that do not support dictionary priming must not advertise dictionary dependency"
    );

    let mut decoded = Vec::with_capacity(payload.len());
    let mut decoder = FrameDecoder::new();
    decoder.decode_all_to_vec(&frame, &mut decoded).unwrap();
    assert_eq!(decoded, payload);
}
/// Compressing the same data twice with one reused compressor must give two
/// frames that each carry a checksum matching the decoder's computed one, and
/// whose checksums equal each other.
#[cfg(feature = "hash")]
#[test]
fn checksum_two_frames_reused_compressor() {
    /// Fully decodes one frame and returns the bytes together with the checksum
    /// read from the frame and the checksum the decoder calculated.
    fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
        let mut remaining = compressed;
        let mut decoder = FrameDecoder::new();
        decoder.reset(&mut remaining).unwrap();
        while !decoder.is_finished() {
            decoder
                .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
                .unwrap();
        }
        let mut decoded = Vec::new();
        decoder.collect_to_writer(&mut decoded).unwrap();
        let from_data = decoder.get_checksum_from_data();
        let calculated = decoder.get_calculated_checksum();
        (decoded, from_data, calculated)
    }

    let data: Vec<u8> = (0..=u8::MAX).cycle().take(1024).collect();
    let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);

    // Both frames are produced before either is read back: the compressor
    // keeps the drains mutably borrowed for as long as it is in use.
    let mut first_frame = Vec::new();
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut first_frame);
    compressor.compress();

    let mut second_frame = Vec::new();
    compressor.set_source(data.as_slice());
    compressor.set_drain(&mut second_frame);
    compressor.compress();

    let (first_decoded, first_from_data, first_calculated) = decode_and_collect(&first_frame);
    assert_eq!(first_decoded, data, "frame 1: decoded data mismatch");
    assert_eq!(
        first_from_data, first_calculated,
        "frame 1: checksum mismatch"
    );

    let (second_decoded, second_from_data, second_calculated) =
        decode_and_collect(&second_frame);
    assert_eq!(second_decoded, data, "frame 2: decoded data mismatch");
    assert_eq!(
        second_from_data, second_calculated,
        "frame 2: checksum mismatch"
    );

    assert_eq!(
        first_from_data, second_from_data,
        "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
    );
}
/// Replays saved fuzz artifacts, if the artifact directory exists.
///
/// For each regular file: encode with the FFI encoder and decode with both
/// in-crate decoders, then encode with the in-crate encoder (uncompressed and
/// fastest) and decode with the FFI decoder. Every roundtrip must reproduce the
/// file contents.
#[cfg(feature = "std")]
#[test]
fn fuzz_targets() {
    use std::io::Read;

    const ARTIFACT_DIR: &str = "fuzz/artifacts/interop";

    /// Decodes through the in-crate streaming decoder.
    fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
        let mut result: Vec<u8> = Vec::new();
        let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
        decoder.read_to_end(&mut result).expect("Decoding failed");
        result
    }

    /// Decodes through the in-crate frame decoder, draining output in steps.
    fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
        let mut decoder = crate::decoding::FrameDecoder::new();
        decoder.reset(&mut data).unwrap();
        let mut result = vec![];
        while !decoder.is_finished() || decoder.can_collect() > 0 {
            let strategy = crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024);
            decoder.decode_blocks(&mut data, strategy).unwrap();
            decoder.collect_to_writer(&mut result).unwrap();
        }
        result
    }

    /// Encodes with the FFI encoder at level 3.
    fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        zstd::stream::encode_all(std::io::Cursor::new(data), 3)
    }

    /// Encodes with the in-crate encoder at `CompressionLevel::Uncompressed`.
    fn encode_szstd_uncompressed(data: &mut dyn std::io::Read) -> Vec<u8> {
        let mut input = Vec::new();
        data.read_to_end(&mut input).unwrap();
        let level = crate::encoding::CompressionLevel::Uncompressed;
        crate::encoding::compress_to_vec(input.as_slice(), level)
    }

    /// Encodes with the in-crate encoder at `CompressionLevel::Fastest`.
    fn encode_szstd_compressed(data: &mut dyn std::io::Read) -> Vec<u8> {
        let mut input = Vec::new();
        data.read_to_end(&mut input).unwrap();
        let level = crate::encoding::CompressionLevel::Fastest;
        crate::encoding::compress_to_vec(input.as_slice(), level)
    }

    /// Decodes with the FFI decoder.
    fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        let mut output = Vec::new();
        zstd::stream::copy_decode(data, &mut output).map(|_| output)
    }

    // Nothing to replay when the artifact directory is absent or unreadable.
    if !std::fs::exists(ARTIFACT_DIR).unwrap_or(false) {
        return;
    }

    for entry in std::fs::read_dir(ARTIFACT_DIR).unwrap() {
        let entry = entry.unwrap();
        if !entry.file_type().unwrap().is_file() {
            continue;
        }
        let contents = std::fs::read(entry.path()).unwrap();
        let data = contents.as_slice();

        // FFI encoder -> in-crate decoders.
        let compressed = encode_zstd(data).unwrap();
        let decoded = decode_szstd(&mut compressed.as_slice());
        let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
        assert!(
            decoded == data,
            "Decoded data did not match the original input during decompression"
        );
        assert_eq!(
            decoded2, data,
            "Decoded data did not match the original input during decompression"
        );

        // In-crate encoder (uncompressed) -> FFI decoder.
        let mut input = data;
        let compressed = encode_szstd_uncompressed(&mut input);
        let decoded = decode_zstd(&compressed).unwrap();
        assert_eq!(
            decoded, data,
            "Decoded data did not match the original input during compression"
        );

        // In-crate encoder (fastest) -> FFI decoder.
        let mut input = data;
        let compressed = encode_szstd_compressed(&mut input);
        let decoded = decode_zstd(&compressed).unwrap();
        assert_eq!(
            decoded, data,
            "Decoded data did not match the original input during compression"
        );
    }
}
/// A full-size block filled with one byte value must not be split: the
/// returned split point equals the block length.
#[test]
fn donor_split_block_from_borders_keeps_homogeneous_block() {
    let block_len = MAX_BLOCK_SIZE as usize;
    let homogeneous = vec![0xAAu8; block_len];
    assert_eq!(
        super::donor_split_block_from_borders(&homogeneous),
        block_len
    );
}
/// A full-size block whose first half is zeros and whose second half is a
/// varying byte pattern must be split at 64 KiB.
#[test]
fn donor_split_block_from_borders_returns_midpoint_for_centred_transition() {
    let block_len = MAX_BLOCK_SIZE as usize;
    let transition = block_len / 2;
    // Zeros before the transition, `i % 251 + 1` from the transition onward.
    let block: Vec<u8> = (0..block_len)
        .map(|i| if i < transition { 0 } else { (i % 251 + 1) as u8 })
        .collect();

    let split = super::donor_split_block_from_borders(&block);
    assert_eq!(
        split,
        64 * 1024,
        "centred-transition fixture must take the symmetric \
         midpoint arm (`abs_diff < min_distance`), got {split}"
    );
}
/// Pins the value `donor_pre_split_level` returns for a set of compression
/// levels: `None` for the lower ones listed, `Some(0)` for levels 11 and 15,
/// `Some(4)` for levels 16 and 22.
#[test]
fn donor_pre_split_level_dispatches_by_compression_level() {
    use crate::encoding::CompressionLevel;

    // (level, expected pre-split level), checked in order.
    let expectations = [
        (CompressionLevel::Fastest, None),
        (CompressionLevel::Default, None),
        (CompressionLevel::Better, None),
        (CompressionLevel::Level(7), None),
        (CompressionLevel::Level(11), Some(0)),
        (CompressionLevel::Level(15), Some(0)),
        (CompressionLevel::Level(16), Some(4)),
        (CompressionLevel::Level(22), Some(4)),
    ];
    for (level, expected) in expectations {
        assert_eq!(super::donor_pre_split_level(level), expected);
    }
}
/// Compresses 256 KiB whose two halves follow different byte patterns at
/// `Level(13)` and checks the in-crate decoder restores it exactly.
#[test]
fn level_13_borders_split_roundtrips_through_own_decoder() {
    use crate::encoding::CompressionLevel;

    const TOTAL_LEN: usize = 256 * 1024;
    const PATTERN_SWITCH: usize = 128 * 1024;

    // First half cycles through 0..8; second half uses `i % 251 + 1`.
    let data: Vec<u8> = (0..TOTAL_LEN)
        .map(|i| {
            if i < PATTERN_SWITCH {
                (i & 0x07) as u8
            } else {
                (i % 251 + 1) as u8
            }
        })
        .collect();

    let mut compressed = Vec::new();
    let mut encoder = FrameCompressor::new(CompressionLevel::Level(13));
    encoder.set_source(data.as_slice());
    encoder.set_drain(&mut compressed);
    encoder.compress();

    let mut remaining = compressed.as_slice();
    let mut decoder = FrameDecoder::new();
    decoder
        .reset(&mut remaining)
        .expect("frame header should parse");
    while !decoder.is_finished() {
        decoder
            .decode_blocks(&mut remaining, crate::decoding::BlockDecodingStrategy::All)
            .expect("decode should succeed");
    }

    let mut decoded = Vec::with_capacity(data.len());
    decoder.collect_to_writer(&mut decoded).unwrap();
    assert_eq!(decoded, data, "roundtrip must reproduce the input verbatim");
}
/// After `set_compression_level` followed by `compress`, the compressor state's
/// `strategy_tag` must match the tag for the new level rather than the one
/// chosen at construction.
#[cfg(feature = "std")]
#[test]
fn set_compression_level_then_compress_refreshes_strategy_tag() {
    use super::CompressionLevel;
    use crate::encoding::strategy::StrategyTag;

    let starting_level = CompressionLevel::Fastest;
    let new_level = CompressionLevel::Level(20);

    let mut compressor = FrameCompressor::new(starting_level);
    let initial_tag = compressor.state.strategy_tag;
    assert_eq!(
        initial_tag,
        StrategyTag::for_compression_level(starting_level),
        "construction-time strategy_tag must reflect initial level",
    );

    // Switch level, then run one compression so the state is refreshed.
    let input = vec![0xABu8; 256];
    let mut sink = Vec::new();
    compressor.set_compression_level(new_level);
    compressor.set_source(input.as_slice());
    compressor.set_drain(&mut sink);
    compressor.compress();

    let new_tag = compressor.state.strategy_tag;
    let expected = StrategyTag::for_compression_level(new_level);
    assert_eq!(
        new_tag, expected,
        "strategy_tag must follow set_compression_level → compress, \
         got {new_tag:?} expected {expected:?}",
    );
    assert_eq!(
        expected,
        StrategyTag::BtUltra2,
        "test fixture invariant: Level(20) must resolve to BtUltra2 \
         so the post-switch tag visibly crosses the band boundary",
    );
    assert_ne!(
        new_tag, initial_tag,
        "test fixture invariant: chosen levels must resolve to \
         different StrategyTag variants",
    );
}
/// A frame written in magicless mode must not start with the magic number,
/// must roundtrip through a decoder configured as magicless, and must be
/// rejected by a default decoder with a bad-magic or skip-frame header error.
#[test]
fn magicless_frame_omits_magic_and_roundtrips() {
    use crate::common::MAGIC_NUM;
    use crate::decoding::errors::{FrameDecoderError, ReadFrameHeaderError};

    let input: Vec<u8> = (0..512u32).map(|i| (i ^ 0xA5) as u8).collect();

    let mut output: Vec<u8> = Vec::new();
    let mut encoder = FrameCompressor::new(super::CompressionLevel::Default);
    encoder.set_magicless(true);
    encoder.set_source(input.as_slice());
    encoder.set_drain(&mut output);
    encoder.compress();

    let magic = MAGIC_NUM.to_le_bytes();
    assert!(
        !output.starts_with(&magic),
        "magicless frame must omit the 4-byte magic prefix",
    );

    // Decoder configured as magicless: full roundtrip.
    let mut cursor: &[u8] = output.as_slice();
    let mut magicless_decoder = crate::decoding::FrameDecoder::new();
    magicless_decoder.set_magicless(true);
    magicless_decoder.init(&mut cursor).expect("magicless init");
    magicless_decoder
        .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
        .expect("decode_blocks");
    let mut decoded: Vec<u8> = Vec::new();
    magicless_decoder
        .collect_to_writer(&mut decoded)
        .expect("collect_to_writer");
    assert_eq!(decoded, input, "magicless roundtrip must preserve bytes");

    // Default decoder: header parsing must fail.
    let mut std_decoder = crate::decoding::FrameDecoder::new();
    match std_decoder.init(output.as_slice()) {
        Err(FrameDecoderError::ReadFrameHeaderError(
            ReadFrameHeaderError::BadMagicNumber(_) | ReadFrameHeaderError::SkipFrame { .. },
        )) => {}
        other => panic!(
            "standard decoder must reject a magicless frame with \
             ReadFrameHeaderError::BadMagicNumber or SkipFrame, got {other:?}",
        ),
    }
}
}