use crate::ArchiveError;
/// Codec selector shared by the streaming compressor and decompressor.
///
/// Which encoder each variant maps to is decided in
/// `StreamingCompressor::compress_chunk` and
/// `StreamingDecompressor::decompress_chunk`.
#[derive(Debug, Clone)]
pub enum StreamingCodec {
/// Encoded with `compress_lz77`; `level` is clamped to 1..=22 and used as
/// the match-search depth.
Zstd { level: i32 },
/// Encoded with `compress_lz77`; `level` is clamped to 1..=9 and used as
/// the match-search depth.
Deflate { level: u32 },
/// Chunks are copied unchanged; `quality` only appears in the codec label
/// reported by `StreamingCompressor::stats`.
Brotli { quality: u32 },
/// Encoded with `lz4_compress` / decoded with `lz4_decompress`.
Lz4,
/// Chunks are copied unchanged in both directions.
Passthrough,
}
/// Largest backward distance, in bytes, that a match may reference.
const WINDOW_SIZE: usize = 4096;
/// Shortest repeat that is encoded as a match rather than as literals.
const MIN_MATCH: usize = 3;
/// Literal-length byte value that marks the end of an LZ77 stream.
const END_SENTINEL: u8 = 0xFF;

/// Compresses `data` into this module's LZ77 token stream.
///
/// The stream is a sequence of tokens, each laid out as
/// `[literal_len: u8][literals][offset: u16 LE][match_len: u8]`:
///
/// * a literal token carries 1..=0xFE literal bytes and an all-zero match
///   descriptor;
/// * a match token carries zero literals, a backward `offset` and a
///   `match_len` in `MIN_MATCH..=255`;
/// * the stream ends with `END_SENTINEL` followed by three zero bytes.
///
/// `search_depth` is the number of candidate positions examined before each
/// input position (nearest first); `0` is treated as `1`. A match never
/// extends past the position being encoded, so its length is at most its
/// offset.
pub fn compress_lz77(data: &[u8], search_depth: usize) -> Vec<u8> {
    // A literal run is flushed as soon as it reaches this size so that the
    // length byte can never collide with `END_SENTINEL`.
    const MAX_LITERAL_RUN: usize = 0xFE;

    let depth = search_depth.max(1);
    let mut encoded: Vec<u8> = Vec::with_capacity(data.len());
    // `data[run_start..pos]` is the pending, not yet emitted, literal run.
    let mut run_start = 0usize;
    let mut pos = 0usize;

    while pos < data.len() {
        match lz77_find_match(data, pos, depth) {
            Some((distance, length)) => {
                lz77_emit_literals(&mut encoded, &data[run_start..pos]);
                encoded.push(0x00);
                // `distance` is bounded by WINDOW_SIZE and `length` by 255,
                // so both casts are lossless.
                encoded.extend_from_slice(&(distance as u16).to_le_bytes());
                encoded.push(length as u8);
                pos += length;
                run_start = pos;
            }
            None => {
                pos += 1;
                if pos - run_start == MAX_LITERAL_RUN {
                    lz77_emit_literals(&mut encoded, &data[run_start..pos]);
                    run_start = pos;
                }
            }
        }
    }

    lz77_emit_literals(&mut encoded, &data[run_start..pos]);
    encoded.extend_from_slice(&[END_SENTINEL, 0x00, 0x00, 0x00]);
    encoded
}

/// Finds the longest match for `data[pos..]` among the `depth` positions
/// immediately before `pos` (limited to `WINDOW_SIZE` bytes back).
///
/// Returns `(distance, length)`; on equal lengths the nearest candidate wins.
fn lz77_find_match(data: &[u8], pos: usize, depth: usize) -> Option<(usize, usize)> {
    const MAX_MATCH: usize = 255;

    let window_start = pos.saturating_sub(WINDOW_SIZE);
    let lookahead = &data[pos..];
    let mut best: Option<(usize, usize)> = None;

    for candidate in (window_start..pos).rev().take(depth) {
        let distance = pos - candidate;
        // The match may not run into the bytes currently being encoded.
        let limit = lookahead.len().min(MAX_MATCH).min(distance);
        let length = data[candidate..candidate + limit]
            .iter()
            .zip(lookahead)
            .take_while(|(earlier, current)| earlier == current)
            .count();
        let improves = best.map_or(true, |(_, best_len)| length > best_len);
        if length >= MIN_MATCH && improves {
            best = Some((distance, length));
        }
    }
    best
}

/// Appends `run` as literal tokens (at most 0xFE bytes each), every one
/// followed by an all-zero match descriptor. An empty run emits nothing.
fn lz77_emit_literals(out: &mut Vec<u8>, run: &[u8]) {
    for chunk in run.chunks(0xFE) {
        out.push(chunk.len() as u8);
        out.extend_from_slice(chunk);
        out.extend_from_slice(&[0x00, 0x00, 0x00]);
    }
}
/// Decodes a token stream produced by `compress_lz77`.
///
/// Each token is `[literal_len: u8][literals][offset: u16 LE][match_len: u8]`.
/// A descriptor with `offset == 0 && match_len == 0` means "no match"; a
/// `literal_len` equal to `END_SENTINEL` (followed by three bytes) ends the
/// stream. Bytes after the sentinel are ignored.
///
/// # Errors
///
/// Returns `ArchiveError::Corruption` when the stream ends without a
/// sentinel, a token is truncated, a match has a zero offset with a non-zero
/// length, or a match offset reaches before the start of the output.
pub fn decompress_lz77(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
    let mut decoded: Vec<u8> = Vec::new();
    let mut cursor = 0usize;

    loop {
        let tag = match data.get(cursor) {
            Some(&byte) => byte,
            None => {
                return Err(ArchiveError::Corruption(
                    "lz77: unexpected end of stream (no sentinel)".to_string(),
                ));
            }
        };
        cursor += 1;

        if tag == END_SENTINEL {
            // The sentinel byte must be followed by its three padding bytes.
            if data.len() < cursor + 3 {
                return Err(ArchiveError::Corruption(
                    "lz77: truncated sentinel".to_string(),
                ));
            }
            return Ok(decoded);
        }

        let literal_end = cursor + usize::from(tag);
        let literals = match data.get(cursor..literal_end) {
            Some(run) => run,
            None => {
                return Err(ArchiveError::Corruption(format!(
                    "lz77: literal overflow at cursor {cursor}"
                )));
            }
        };
        decoded.extend_from_slice(literals);
        cursor = literal_end;

        let descriptor = match data.get(cursor..cursor + 3) {
            Some(bytes) => bytes,
            None => {
                return Err(ArchiveError::Corruption(format!(
                    "lz77: missing match descriptor at cursor {cursor}"
                )));
            }
        };
        let offset = usize::from(u16::from_le_bytes([descriptor[0], descriptor[1]]));
        let match_len = usize::from(descriptor[2]);
        cursor += 3;

        match (offset, match_len) {
            // Literal-only token.
            (0, 0) => continue,
            (0, _) => {
                return Err(ArchiveError::Corruption(
                    "lz77: match with zero offset".to_string(),
                ));
            }
            _ => {}
        }
        if offset > decoded.len() {
            return Err(ArchiveError::Corruption(format!(
                "lz77: match offset {offset} exceeds output length {}",
                decoded.len()
            )));
        }

        // Copy byte by byte so a match may read bytes it has just written.
        let mut source = decoded.len() - offset;
        for _ in 0..match_len {
            let byte = decoded[source];
            decoded.push(byte);
            source += 1;
        }
    }
}
/// Compresses `data` into an LZ4-style sequence stream.
///
/// Every sequence is written by `lz4_write_sequence` (token, literals,
/// 16-bit offset, match length); the remaining literals are written by
/// `lz4_write_last_sequence`. Empty input produces empty output.
///
/// For each position the 16 nearest earlier positions (within a 65535-byte
/// window) are tried, nearest first, and the longest match of at least four
/// bytes wins. A match never extends past the position being encoded.
pub fn lz4_compress(data: &[u8]) -> Vec<u8> {
    const LZ4_WINDOW: usize = 65535;
    const LZ4_MIN_MATCH: usize = 4;
    const LZ4_SEARCH_DEPTH: usize = 16;
    const LZ4_MAX_MATCH: usize = 65535 + 4;

    if data.is_empty() {
        return Vec::new();
    }

    let mut out: Vec<u8> = Vec::with_capacity(data.len() + data.len() / 4 + 16);
    // `data[anchor..pos]` holds the literals not yet written to `out`.
    let mut anchor = 0usize;
    let mut pos = 0usize;

    while pos < data.len() {
        let mut best: Option<(usize, usize)> = None;

        // Fewer than LZ4_MIN_MATCH bytes of history cannot hold a match.
        if pos >= LZ4_MIN_MATCH {
            let window_start = pos.saturating_sub(LZ4_WINDOW);
            let lookahead = &data[pos..];
            for candidate in (window_start..pos).rev().take(LZ4_SEARCH_DEPTH) {
                let distance = pos - candidate;
                let limit = lookahead.len().min(LZ4_MAX_MATCH).min(distance);
                let length = data[candidate..candidate + limit]
                    .iter()
                    .zip(lookahead)
                    .take_while(|(earlier, current)| earlier == current)
                    .count();
                let improves = best.map_or(true, |(_, best_len)| length > best_len);
                if length >= LZ4_MIN_MATCH && improves {
                    best = Some((distance, length));
                }
            }
        }

        match best {
            Some((distance, length)) => {
                lz4_write_sequence(&mut out, &data[anchor..pos], distance, length);
                pos += length;
                anchor = pos;
            }
            None => pos += 1,
        }
    }

    lz4_write_last_sequence(&mut out, &data[anchor..]);
    out
}
/// Appends the variable-length continuation of a length field: one `255`
/// byte for every full 255 in `extra`, then a final byte below 255 holding
/// the remainder (which may be zero).
fn lz4_write_extra_len(out: &mut Vec<u8>, mut extra: usize) {
    let full_bytes = extra / 255;
    out.resize(out.len() + full_bytes, 255);
    extra -= full_bytes * 255;
    out.push(extra as u8);
}
/// Appends one literal+match sequence to `out`.
///
/// Layout: token (high nibble = literal count, low nibble = `match_len - 4`,
/// each capped at 15), optional literal-count continuation, the literals,
/// the offset as `u16` little-endian, optional match-length continuation.
///
/// `match_len` is expected to be at least 4 and `offset` to fit in 16 bits;
/// `lz4_compress` only passes such values.
fn lz4_write_sequence(out: &mut Vec<u8>, literals: &[u8], offset: usize, match_len: usize) {
    let literal_count = literals.len();
    let match_extra = match_len - 4;

    let high_nibble = literal_count.min(15) as u8;
    let low_nibble = match_extra.min(15) as u8;
    out.push((high_nibble << 4) | low_nibble);

    // A nibble of 15 signals that continuation bytes follow.
    if let Some(overflow) = literal_count.checked_sub(15) {
        lz4_write_extra_len(out, overflow);
    }
    out.extend_from_slice(literals);

    out.extend_from_slice(&(offset as u16).to_le_bytes());
    if let Some(overflow) = match_extra.checked_sub(15) {
        lz4_write_extra_len(out, overflow);
    }
}
/// Appends the final, literal-only sequence: a token whose high nibble is
/// the literal count (capped at 15) and whose low nibble is zero, the
/// optional count continuation, then the literals. No offset follows.
fn lz4_write_last_sequence(out: &mut Vec<u8>, literals: &[u8]) {
    let literal_count = literals.len();
    out.push((literal_count.min(15) as u8) << 4);
    if let Some(overflow) = literal_count.checked_sub(15) {
        lz4_write_extra_len(out, overflow);
    }
    out.extend_from_slice(literals);
}
/// Reads a length continuation starting at `*cursor`: bytes are summed until
/// one below 255 is read (that byte is included in the sum).
///
/// `*cursor` is advanced past every byte consumed, including on failure.
///
/// # Errors
///
/// Returns `ArchiveError::Corruption` when `data` ends before a terminating
/// byte below 255 is found.
fn lz4_read_extra_len(data: &[u8], cursor: &mut usize) -> Result<usize, ArchiveError> {
    let mut total = 0usize;
    loop {
        let byte = match data.get(*cursor) {
            Some(&value) => usize::from(value),
            None => {
                return Err(ArchiveError::Corruption(
                    "lz4: truncated extra length".to_string(),
                ));
            }
        };
        *cursor += 1;
        total += byte;
        if byte != 255 {
            return Ok(total);
        }
    }
}
/// Decodes a sequence stream produced by `lz4_compress`.
///
/// `expected_size_hint` only sizes the initial output allocation; when it is
/// zero, four times the input length is reserved instead. Empty input
/// decodes to empty output.
///
/// A sequence whose literals reach the end of `data` is treated as the final
/// one and carries no match.
///
/// # Errors
///
/// Returns `ArchiveError::Corruption` for truncated literals, a truncated
/// offset or length continuation, a zero match offset, or an offset that
/// reaches before the start of the output.
pub fn lz4_decompress(data: &[u8], expected_size_hint: usize) -> Result<Vec<u8>, ArchiveError> {
    if data.is_empty() {
        return Ok(Vec::new());
    }

    let capacity = match expected_size_hint {
        0 => data.len() * 4,
        hint => hint,
    };
    let mut decoded: Vec<u8> = Vec::with_capacity(capacity);
    let mut cursor = 0usize;

    while cursor < data.len() {
        let token = data[cursor];
        cursor += 1;

        // High nibble: literal count, extended when it is saturated at 15.
        let mut literal_count = usize::from(token >> 4);
        if literal_count == 15 {
            literal_count += lz4_read_extra_len(data, &mut cursor)?;
        }
        let literal_end = cursor + literal_count;
        if literal_end > data.len() {
            return Err(ArchiveError::Corruption(format!(
                "lz4: literal overflow at cursor {cursor}"
            )));
        }
        decoded.extend_from_slice(&data[cursor..literal_end]);
        cursor = literal_end;

        // The final sequence ends right after its literals.
        if cursor >= data.len() {
            break;
        }

        let offset_bytes = match data.get(cursor..cursor + 2) {
            Some(bytes) => bytes,
            None => {
                return Err(ArchiveError::Corruption(
                    "lz4: truncated match offset".to_string(),
                ));
            }
        };
        let offset = usize::from(u16::from_le_bytes([offset_bytes[0], offset_bytes[1]]));
        cursor += 2;
        if offset == 0 {
            return Err(ArchiveError::Corruption(
                "lz4: zero match offset".to_string(),
            ));
        }

        // Low nibble: match length minus four, extended when saturated.
        let length_code = usize::from(token & 0x0F);
        let mut match_len = length_code + 4;
        if length_code == 15 {
            match_len += lz4_read_extra_len(data, &mut cursor)?;
        }
        if offset > decoded.len() {
            return Err(ArchiveError::Corruption(format!(
                "lz4: match offset {offset} exceeds output length {}",
                decoded.len()
            )));
        }

        // Copy byte by byte so a match may read bytes it has just written.
        let mut source = decoded.len() - offset;
        for _ in 0..match_len {
            let byte = decoded[source];
            decoded.push(byte);
            source += 1;
        }
    }

    Ok(decoded)
}
/// Snapshot of a compressor's counters, as returned by
/// `StreamingCompressor::stats`.
#[derive(Debug, Clone)]
pub struct CompressionStats {
/// Codec label such as `zstd(level=3)`, `lz4` or `passthrough`.
pub codec: String,
/// Total uncompressed bytes passed to the compressor.
pub input_bytes: u64,
/// Total bytes returned by the compressor's `compress_chunk` calls.
pub output_bytes: u64,
/// `input_bytes / output_bytes`, or `1.0` when no output has been produced.
pub compression_ratio: f64,
/// Set to `0.0` by `StreamingCompressor::stats`, which measures no timing.
pub throughput_mb_per_sec: f64,
}
/// Chunk-at-a-time compressor that keeps running input/output byte totals.
///
/// Each chunk is compressed independently of the previous ones.
pub struct StreamingCompressor {
/// Codec applied to every chunk.
pub codec: StreamingCodec,
/// Sum of the lengths of all chunks passed to `compress_chunk`.
pub input_bytes_total: u64,
/// Sum of the lengths of all buffers returned by `compress_chunk`
/// (the footer from `finish` is not counted).
pub output_bytes_total: u64,
}
impl StreamingCompressor {
    /// Creates a compressor for `codec` with both byte counters at zero.
    pub fn new(codec: StreamingCodec) -> Self {
        Self {
            codec,
            input_bytes_total: 0,
            output_bytes_total: 0,
        }
    }

    /// Compresses one chunk on its own and updates the running totals.
    ///
    /// `Zstd` and `Deflate` both use `compress_lz77`, with the level clamped
    /// to 1..=22 and 1..=9 respectively and used as the search depth. `Lz4`
    /// uses `lz4_compress`. `Brotli` and `Passthrough` return a copy of the
    /// chunk.
    pub fn compress_chunk(&mut self, chunk: &[u8]) -> Vec<u8> {
        self.input_bytes_total += chunk.len() as u64;

        let encoded = match &self.codec {
            StreamingCodec::Zstd { level } => {
                compress_lz77(chunk, (*level).clamp(1, 22) as usize)
            }
            StreamingCodec::Deflate { level } => {
                compress_lz77(chunk, (*level).clamp(1, 9) as usize)
            }
            StreamingCodec::Lz4 => lz4_compress(chunk),
            StreamingCodec::Brotli { .. } | StreamingCodec::Passthrough => chunk.to_vec(),
        };

        self.output_bytes_total += encoded.len() as u64;
        encoded
    }

    /// Returns an 8-byte footer: the bytes `4C 5A 37 00` followed by the low
    /// 32 bits of the total input length, little-endian.
    ///
    /// The footer is not added to `output_bytes_total`.
    pub fn finish(&mut self) -> Vec<u8> {
        let low_word = (self.input_bytes_total & 0xFFFF_FFFF) as u32;
        let mut footer: Vec<u8> = vec![0x4C, 0x5A, 0x37, 0x00];
        footer.extend_from_slice(&low_word.to_le_bytes());
        footer
    }

    /// Reports the codec label, byte totals and compression ratio so far.
    ///
    /// The ratio is input/output, or `1.0` while no output exists;
    /// `throughput_mb_per_sec` is always `0.0`.
    pub fn stats(&self) -> CompressionStats {
        let codec = match &self.codec {
            StreamingCodec::Zstd { level } => format!("zstd(level={level})"),
            StreamingCodec::Deflate { level } => format!("deflate(level={level})"),
            StreamingCodec::Brotli { quality } => {
                format!("brotli(quality={quality})-passthrough")
            }
            StreamingCodec::Lz4 => "lz4".to_string(),
            StreamingCodec::Passthrough => "passthrough".to_string(),
        };
        let compression_ratio = match self.output_bytes_total {
            0 => 1.0,
            produced => self.input_bytes_total as f64 / produced as f64,
        };

        CompressionStats {
            codec,
            input_bytes: self.input_bytes_total,
            output_bytes: self.output_bytes_total,
            compression_ratio,
            throughput_mb_per_sec: 0.0,
        }
    }
}
/// Chunk-at-a-time decompressor; the counterpart of `StreamingCompressor`.
pub struct StreamingDecompressor {
/// Codec used to decode every chunk.
pub codec: StreamingCodec,
/// Sum of the lengths of all buffers returned by `decompress_chunk`.
pub bytes_decompressed: u64,
}
impl StreamingDecompressor {
    /// Creates a decompressor for `codec` with the byte counter at zero.
    pub fn new(codec: StreamingCodec) -> Self {
        Self {
            codec,
            bytes_decompressed: 0,
        }
    }

    /// Decodes one complete compressed chunk and adds the decoded length to
    /// `bytes_decompressed`.
    ///
    /// `Zstd` and `Deflate` use `decompress_lz77`, `Lz4` uses
    /// `lz4_decompress` with no size hint, and `Brotli` / `Passthrough`
    /// return a copy of the chunk.
    ///
    /// # Errors
    ///
    /// Propagates the decoder's `ArchiveError`; the counter is left
    /// unchanged in that case.
    pub fn decompress_chunk(&mut self, chunk: &[u8]) -> Result<Vec<u8>, ArchiveError> {
        let decoded = match &self.codec {
            StreamingCodec::Zstd { .. } | StreamingCodec::Deflate { .. } => {
                decompress_lz77(chunk)
            }
            StreamingCodec::Lz4 => lz4_decompress(chunk, 0),
            StreamingCodec::Brotli { .. } | StreamingCodec::Passthrough => Ok(chunk.to_vec()),
        }?;

        self.bytes_decompressed += decoded.len() as u64;
        Ok(decoded)
    }
}
impl StreamingCompressor {
/// `Result`-returning wrapper around `compress_chunk`, mirroring
/// `StreamingDecompressor::write`. It always returns `Ok`.
pub fn write(&mut self, data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
Ok(self.compress_chunk(data))
}
}
impl StreamingDecompressor {
/// Alias for `decompress_chunk`: decodes one complete compressed chunk and
/// forwards any error from it.
pub fn write(&mut self, data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
self.decompress_chunk(data)
}
}
/// Builder for `StreamingCompressor`.
///
/// NOTE(review): `build` only forwards `codec`; `buffer_capacity` and
/// `chunk_size` are stored here but not passed on to the compressor.
#[derive(Debug, Clone)]
pub struct StreamingCompressorBuilder {
// Codec handed to the compressor by `build`.
codec: StreamingCodec,
// Requested buffer capacity; defaults to 65536.
buffer_capacity: usize,
// Requested chunk size; defaults to 0.
chunk_size: usize,
}
impl StreamingCompressorBuilder {
    /// Starts a builder for `codec` with a buffer capacity of 65536 and a
    /// chunk size of 0.
    #[must_use]
    pub fn new(codec: StreamingCodec) -> Self {
        const DEFAULT_BUFFER_CAPACITY: usize = 65536;
        Self {
            codec,
            buffer_capacity: DEFAULT_BUFFER_CAPACITY,
            chunk_size: 0,
        }
    }

    /// Records the requested buffer capacity.
    #[must_use]
    pub fn buffer_capacity(self, capacity: usize) -> Self {
        Self {
            buffer_capacity: capacity,
            ..self
        }
    }

    /// Records the requested chunk size.
    #[must_use]
    pub fn chunk_size(self, size: usize) -> Self {
        Self {
            chunk_size: size,
            ..self
        }
    }

    /// Creates the compressor. Only the codec is forwarded; the recorded
    /// buffer capacity and chunk size are not used here.
    #[must_use]
    pub fn build(self) -> StreamingCompressor {
        let Self { codec, .. } = self;
        StreamingCompressor::new(codec)
    }
}
/// Builder for `StreamingDecompressor`.
///
/// NOTE(review): `build` only forwards `codec`; `buffer_capacity` is stored
/// here but not passed on to the decompressor.
#[derive(Debug, Clone)]
pub struct StreamingDecompressorBuilder {
// Codec handed to the decompressor by `build`.
codec: StreamingCodec,
// Requested buffer capacity; defaults to 65536.
buffer_capacity: usize,
}
impl StreamingDecompressorBuilder {
    /// Starts a builder for `codec` with a buffer capacity of 65536.
    #[must_use]
    pub fn new(codec: StreamingCodec) -> Self {
        const DEFAULT_BUFFER_CAPACITY: usize = 65536;
        Self {
            codec,
            buffer_capacity: DEFAULT_BUFFER_CAPACITY,
        }
    }

    /// Records the requested buffer capacity.
    #[must_use]
    pub fn buffer_capacity(self, capacity: usize) -> Self {
        Self {
            buffer_capacity: capacity,
            ..self
        }
    }

    /// Creates the decompressor. Only the codec is forwarded; the recorded
    /// buffer capacity is not used here.
    #[must_use]
    pub fn build(self) -> StreamingDecompressor {
        let Self { codec, .. } = self;
        StreamingDecompressor::new(codec)
    }
}
/// Four-byte marker that starts every frame of the framed stream format.
pub const FRAME_MAGIC: [u8; 4] = [0x4F, 0x58, 0x46, 0x01];

/// Wraps `payload` in a frame: `FRAME_MAGIC`, the payload length as a
/// little-endian `u32`, then the payload bytes.
///
/// The length is cast to `u32`, so only its low 32 bits are stored.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let length_field = (payload.len() as u32).to_le_bytes();
    [&FRAME_MAGIC[..], &length_field[..], payload].concat()
}
/// Compressor that wraps each compressed chunk in a length-prefixed frame
/// (see `encode_frame`) so a decoder can find chunk boundaries in a byte
/// stream.
pub struct FramedStreamingCompressor {
// Performs the per-chunk compression and keeps the byte statistics.
inner: StreamingCompressor,
}
impl FramedStreamingCompressor {
#[must_use]
pub fn new(codec: StreamingCodec) -> Self {
Self {
inner: StreamingCompressor::new(codec),
}
}
pub fn write(&mut self, data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
let compressed = self.inner.compress_chunk(data);
Ok(encode_frame(&compressed))
}
pub fn finish(&mut self) -> Vec<u8> {
encode_frame(&[])
}
#[must_use]
pub fn stats(&self) -> CompressionStats {
self.inner.stats()
}
}
/// Position of the framed decoder within the current frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FrameParseState {
/// Collecting the four magic bytes.
Magic,
/// Collecting the four little-endian payload-length bytes.
Length,
/// Collecting the payload; `remaining` bytes are still missing.
Payload { remaining: usize },
}
/// Incremental decoder for the framed stream format: input may arrive in
/// arbitrary slices and each frame is decoded once its payload is complete.
pub struct FramedStreamingDecompressor {
// Decodes each completed frame payload.
inner: StreamingDecompressor,
// Bytes received by `write` that the parser has not consumed yet.
buf: Vec<u8>,
// Which part of the current frame is being collected.
state: FrameParseState,
// Magic bytes collected so far for the current frame.
magic_buf: Vec<u8>,
// Length bytes collected so far for the current frame.
len_buf: Vec<u8>,
// Payload bytes collected so far for the current frame.
payload_buf: Vec<u8>,
// Set once a zero-length (end-of-stream) frame has been parsed.
eos: bool,
}
impl FramedStreamingDecompressor {
#[must_use]
pub fn new(codec: StreamingCodec) -> Self {
Self {
inner: StreamingDecompressor::new(codec),
buf: Vec::new(),
state: FrameParseState::Magic,
magic_buf: Vec::with_capacity(4),
len_buf: Vec::with_capacity(4),
payload_buf: Vec::new(),
eos: false,
}
}
#[must_use]
pub fn is_eos(&self) -> bool {
self.eos
}
pub fn write(&mut self, data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
self.buf.extend_from_slice(data);
let mut out: Vec<u8> = Vec::new();
let mut cursor = 0usize;
loop {
match self.state {
FrameParseState::Magic => {
while self.magic_buf.len() < 4 && cursor < self.buf.len() {
self.magic_buf.push(self.buf[cursor]);
cursor += 1;
}
if self.magic_buf.len() < 4 {
break; }
if self.magic_buf[..4] != FRAME_MAGIC {
self.buf.drain(..cursor);
return Err(ArchiveError::Corruption(format!(
"framed: invalid magic {:?}",
&self.magic_buf[..4]
)));
}
self.magic_buf.clear();
self.state = FrameParseState::Length;
}
FrameParseState::Length => {
while self.len_buf.len() < 4 && cursor < self.buf.len() {
self.len_buf.push(self.buf[cursor]);
cursor += 1;
}
if self.len_buf.len() < 4 {
break; }
let payload_len = u32::from_le_bytes([
self.len_buf[0],
self.len_buf[1],
self.len_buf[2],
self.len_buf[3],
]) as usize;
self.len_buf.clear();
if payload_len == 0 {
self.eos = true;
self.state = FrameParseState::Magic;
break;
}
self.payload_buf.clear();
self.payload_buf.reserve(payload_len);
self.state = FrameParseState::Payload {
remaining: payload_len,
};
}
FrameParseState::Payload { remaining } => {
let available = self.buf.len() - cursor;
let take = available.min(remaining);
self.payload_buf
.extend_from_slice(&self.buf[cursor..cursor + take]);
cursor += take;
let new_remaining = remaining - take;
if new_remaining == 0 {
let decompressed = self.inner.decompress_chunk(&self.payload_buf)?;
out.extend_from_slice(&decompressed);
self.payload_buf.clear();
self.state = FrameParseState::Magic;
} else {
self.state = FrameParseState::Payload {
remaining: new_remaining,
};
break; }
}
}
}
if cursor > 0 {
self.buf.drain(..cursor);
}
Ok(out)
}
}
// Unit tests: round-trips for the raw LZ77 and LZ4 codecs, the streaming
// compressor/decompressor, the builders, and the framed stream format.
#[cfg(test)]
mod tests {
use super::*;
// ---- raw LZ77 codec ----
#[test]
fn test_lz77_roundtrip_simple() {
let data = b"hello, world! hello, world!";
let compressed = compress_lz77(data, 8);
let decompressed = decompress_lz77(&compressed).expect("decompression failed");
assert_eq!(decompressed, data);
}
#[test]
fn test_lz77_roundtrip_repetitive() {
let data: Vec<u8> = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAA".to_vec();
let compressed = compress_lz77(&data, 8);
let decompressed = decompress_lz77(&compressed).expect("decompression failed");
assert_eq!(decompressed, data);
assert!(compressed.len() < data.len(), "expected compression");
}
#[test]
fn test_lz77_empty() {
let data: &[u8] = b"";
let compressed = compress_lz77(data, 4);
let decompressed = decompress_lz77(&compressed).expect("decompression failed");
assert_eq!(decompressed, data);
}
#[test]
fn test_lz77_incompressible() {
let data: Vec<u8> = (0u8..=255).cycle().take(512).collect();
let compressed = compress_lz77(&data, 4);
let decompressed = decompress_lz77(&compressed).expect("decompression failed");
assert_eq!(decompressed, data);
}
#[test]
fn test_lz77_large_repetitive() {
let data: Vec<u8> = b"OxiMedia archive streaming compression test "
.iter()
.cycle()
.take(4096)
.copied()
.collect();
let compressed = compress_lz77(&data, 12);
let decompressed = decompress_lz77(&compressed).expect("decompression failed");
assert_eq!(decompressed, data);
}
// ---- raw LZ4 codec ----
#[test]
fn test_lz4_roundtrip_simple() {
let data = b"the quick brown fox jumps over the lazy dog";
let compressed = lz4_compress(data);
let decompressed = lz4_decompress(&compressed, data.len()).expect("lz4 decompress failed");
assert_eq!(decompressed, data);
}
#[test]
fn test_lz4_roundtrip_repetitive() {
let data: Vec<u8> = b"ABCDABCDABCDABCDABCDABCDABCDABCD".to_vec();
let compressed = lz4_compress(&data);
let decompressed = lz4_decompress(&compressed, data.len()).expect("lz4 decompress failed");
assert_eq!(decompressed, data);
}
#[test]
fn test_lz4_empty() {
let data: &[u8] = b"";
let compressed = lz4_compress(data);
let decompressed = lz4_decompress(&compressed, 0).expect("lz4 decompress failed");
assert_eq!(decompressed, data);
}
// ---- StreamingCompressor / StreamingDecompressor ----
#[test]
fn test_streaming_zstd_roundtrip() {
let original = b"hello streaming zstd! hello streaming zstd!";
let mut compressor = StreamingCompressor::new(StreamingCodec::Zstd { level: 3 });
let compressed = compressor.compress_chunk(original);
let _footer = compressor.finish();
let mut decompressor = StreamingDecompressor::new(StreamingCodec::Zstd { level: 3 });
let decompressed = decompressor
.decompress_chunk(&compressed)
.expect("streaming zstd decompress failed");
assert_eq!(decompressed.as_slice(), original.as_ref());
}
#[test]
fn test_streaming_deflate_roundtrip() {
let original = b"deflate streaming test data deflate streaming test data";
let mut compressor = StreamingCompressor::new(StreamingCodec::Deflate { level: 6 });
let compressed = compressor.compress_chunk(original);
let mut decompressor = StreamingDecompressor::new(StreamingCodec::Deflate { level: 6 });
let decompressed = decompressor
.decompress_chunk(&compressed)
.expect("streaming deflate decompress failed");
assert_eq!(decompressed.as_slice(), original.as_ref());
}
#[test]
fn test_streaming_lz4_roundtrip() {
let original = b"lz4 streaming chunk test lz4 streaming chunk test lz4";
let mut compressor = StreamingCompressor::new(StreamingCodec::Lz4);
let compressed = compressor.compress_chunk(original);
let mut decompressor = StreamingDecompressor::new(StreamingCodec::Lz4);
let decompressed = decompressor
.decompress_chunk(&compressed)
.expect("streaming lz4 decompress failed");
assert_eq!(decompressed.as_slice(), original.as_ref());
}
#[test]
fn test_streaming_passthrough_roundtrip() {
let original = b"passthrough data is not modified";
let mut compressor = StreamingCompressor::new(StreamingCodec::Passthrough);
let compressed = compressor.compress_chunk(original);
assert_eq!(compressed.as_slice(), original.as_ref());
let mut decompressor = StreamingDecompressor::new(StreamingCodec::Passthrough);
let decompressed = decompressor
.decompress_chunk(&compressed)
.expect("streaming passthrough decompress failed");
assert_eq!(decompressed.as_slice(), original.as_ref());
}
#[test]
fn test_streaming_stats() {
let data: Vec<u8> = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
.iter()
.cycle()
.take(1024)
.copied()
.collect();
let mut compressor = StreamingCompressor::new(StreamingCodec::Zstd { level: 5 });
let _ = compressor.compress_chunk(&data);
let stats = compressor.stats();
assert_eq!(stats.input_bytes, 1024);
assert!(stats.output_bytes > 0);
assert!(
stats.compression_ratio > 1.0,
"should compress repetitive data"
);
assert!(stats.codec.contains("zstd"));
}
// ---- `write` wrappers ----
#[test]
fn test_compressor_write_api_zstd() {
let data = b"write api test data write api test data";
let mut c = StreamingCompressor::new(StreamingCodec::Zstd { level: 3 });
let compressed = c.write(data).expect("compress write failed");
let mut d = StreamingDecompressor::new(StreamingCodec::Zstd { level: 3 });
let decompressed = d.write(&compressed).expect("decompress write failed");
assert_eq!(decompressed.as_slice(), data.as_ref());
}
#[test]
fn test_compressor_write_api_lz4() {
let data: Vec<u8> = b"lz4 write api "
.iter()
.cycle()
.take(200)
.copied()
.collect();
let mut c = StreamingCompressor::new(StreamingCodec::Lz4);
let compressed = c.write(&data).expect("compress failed");
let mut d = StreamingDecompressor::new(StreamingCodec::Lz4);
let decompressed = d.write(&compressed).expect("decompress failed");
assert_eq!(decompressed, data);
}
#[test]
fn test_compressor_write_api_passthrough() {
let data = b"passthrough write";
let mut c = StreamingCompressor::new(StreamingCodec::Passthrough);
let out = c.write(data).expect("passthrough failed");
assert_eq!(out.as_slice(), data.as_ref());
let mut d = StreamingDecompressor::new(StreamingCodec::Passthrough);
let back = d.write(&out).expect("decompress passthrough failed");
assert_eq!(back.as_slice(), data.as_ref());
}
#[test]
fn test_compressor_write_api_deflate() {
let data = b"deflate write api test data deflate write api test data";
let mut c = StreamingCompressor::new(StreamingCodec::Deflate { level: 6 });
let compressed = c.write(data).expect("deflate compress failed");
let mut d = StreamingDecompressor::new(StreamingCodec::Deflate { level: 6 });
let decompressed = d.write(&compressed).expect("deflate decompress failed");
assert_eq!(decompressed.as_slice(), data.as_ref());
}
// ---- builders ----
#[test]
fn test_builder_basic() {
let mut c = StreamingCompressorBuilder::new(StreamingCodec::Lz4)
.buffer_capacity(1024)
.chunk_size(512)
.build();
let data = b"builder test data";
let compressed = c.write(data).expect("builder compress failed");
let mut d = StreamingDecompressor::new(StreamingCodec::Lz4);
let back = d.write(&compressed).expect("decompress failed");
assert_eq!(back.as_slice(), data.as_ref());
}
#[test]
fn test_builder_defaults() {
let b = StreamingCompressorBuilder::new(StreamingCodec::Passthrough);
assert_eq!(b.buffer_capacity, 65536);
assert_eq!(b.chunk_size, 0);
}
#[test]
fn test_decompressor_builder_basic() {
let mut d = StreamingDecompressorBuilder::new(StreamingCodec::Passthrough)
.buffer_capacity(2048)
.build();
let out = d.write(b"hello").expect("failed");
assert_eq!(out.as_slice(), b"hello");
}
// ---- framed stream format ----
#[test]
fn test_framed_roundtrip_zstd() {
let data = b"framed zstd roundtrip data framed zstd roundtrip data";
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Zstd { level: 3 });
let frame = fc.write(data).expect("framed compress failed");
let eos = fc.finish();
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Zstd { level: 3 });
let out = fd.write(&frame).expect("framed decompress failed");
let _ = fd.write(&eos).expect("eos write failed");
assert_eq!(out.as_slice(), data.as_ref());
assert!(fd.is_eos());
}
#[test]
fn test_framed_roundtrip_lz4() {
let data: Vec<u8> = b"lz4 framed test "
.iter()
.cycle()
.take(300)
.copied()
.collect();
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Lz4);
let frame = fc.write(&data).expect("compress failed");
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Lz4);
let out = fd.write(&frame).expect("decompress failed");
assert_eq!(out, data);
}
#[test]
fn test_framed_roundtrip_passthrough() {
let data = b"passthrough framed";
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Passthrough);
let frame = fc.write(data).expect("failed");
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Passthrough);
let out = fd.write(&frame).expect("failed");
assert_eq!(out.as_slice(), data.as_ref());
}
#[test]
fn test_framed_multiple_frames() {
let chunks: &[&[u8]] = &[b"chunk one data", b"chunk two data", b"chunk three data"];
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Passthrough);
let mut all_framed: Vec<u8> = Vec::new();
for chunk in chunks {
all_framed.extend(fc.write(chunk).expect("compress failed"));
}
all_framed.extend(fc.finish());
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Passthrough);
let out = fd.write(&all_framed).expect("decompress failed");
let expected: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
assert_eq!(out, expected);
assert!(fd.is_eos());
}
// Feeds the frame one byte per call to exercise the incremental parser.
#[test]
fn test_framed_partial_feed() {
let data = b"partial feed test data partial feed test";
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Passthrough);
let frame = fc.write(data).expect("compress failed");
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Passthrough);
let mut collected: Vec<u8> = Vec::new();
for byte in &frame {
let chunk = fd
.write(std::slice::from_ref(byte))
.expect("partial feed failed");
collected.extend(chunk);
}
assert_eq!(collected.as_slice(), data.as_ref());
}
#[test]
fn test_framed_eos_marker() {
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Passthrough);
let eos = fc.finish();
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Passthrough);
assert!(!fd.is_eos());
let _ = fd.write(&eos).expect("eos failed");
assert!(fd.is_eos());
}
#[test]
fn test_framed_stats_available() {
let mut fc = FramedStreamingCompressor::new(StreamingCodec::Zstd { level: 3 });
let data: Vec<u8> = b"stats test ".iter().cycle().take(512).copied().collect();
let _ = fc.write(&data).expect("compress failed");
let stats = fc.stats();
assert_eq!(stats.input_bytes, 512);
assert!(stats.output_bytes > 0);
}
#[test]
fn test_framed_invalid_magic_returns_error() {
let bad_frame = b"\x00\x00\x00\x00\x05\x00\x00\x00hello";
let mut fd = FramedStreamingDecompressor::new(StreamingCodec::Passthrough);
let result = fd.write(bad_frame);
assert!(result.is_err(), "should detect invalid magic");
}
}