use std::io::{self, Read, Write};
use std::sync::Arc;
use oxiarc_core::cancel::CancellationToken;
use oxiarc_core::progress::ProgressHandle;
use crate::compress;
use crate::crc32c::{crc32c, masked_crc32c};
use crate::decompress;
use crate::error::SnappyError;
use crate::pool::{PoolInner, SnappyPool};
/// The complete stream-identifier chunk that opens a framed stream: chunk type
/// `0xFF`, a 3-byte little-endian body length of 6, then the ASCII body `sNaPpY`.
const STREAM_IDENTIFIER: [u8; 10] = [0xFF, 0x06, 0x00, 0x00, b's', b'N', b'a', b'P', b'p', b'Y'];
/// Chunk type of the OxiArc dictionary-info chunk. It lies in the `0x80..=0xFE`
/// range that the decoders in this file skip when they do not handle it.
const CHUNK_TYPE_OXIARC_DICT: u8 = 0xFE;
/// Magic prefix at the start of the dictionary-info chunk body.
const OXIARC_DICT_MAGIC: &[u8] = b"OXIAD";
/// Body of a stream-identifier chunk (the identifier without its 4-byte header).
const STREAM_BODY: [u8; 6] = *b"sNaPpY";
/// Chunk type for a masked-CRC + Snappy-compressed block.
const CHUNK_TYPE_COMPRESSED: u8 = 0x00;
/// Chunk type for a masked-CRC + raw (stored) block.
const CHUNK_TYPE_UNCOMPRESSED: u8 = 0x01;
/// Chunk type of the stream-identifier chunk.
const CHUNK_TYPE_STREAM_ID: u8 = 0xFF;
/// Largest number of uncompressed bytes carried by one data chunk.
const MAX_UNCOMPRESSED_CHUNK_SIZE: usize = 1 << 16;
/// Streaming Snappy framing-format encoder wrapping a [`Write`] sink.
///
/// Written bytes are accumulated into chunks of at most
/// `MAX_UNCOMPRESSED_CHUNK_SIZE` bytes. Each full chunk, and any remainder on
/// `flush`, `finish` or drop, is emitted as a compressed or uncompressed frame
/// chunk carrying a masked CRC-32C of the uncompressed data.
pub struct FrameEncoder<W: Write> {
    /// Destination writer; `None` once `finish` has taken it back.
    inner: Option<W>,
    /// Uncompressed bytes waiting to be emitted as the next chunk.
    buffer: Vec<u8>,
    /// Whether the 10-byte stream identifier has been written yet.
    header_written: bool,
    /// Optional sink notified with the cumulative byte count after each chunk.
    progress: Option<ProgressHandle>,
    /// Optional token checked before each chunk is emitted.
    cancel: Option<CancellationToken>,
    /// Total uncompressed bytes emitted so far (the value reported to `progress`).
    bytes_processed: u64,
    /// Optional shared pool that supplies scratch buffers for chunk data.
    pool: Option<SnappyPool>,
}
impl<W: Write> FrameEncoder<W> {
    /// Creates an encoder that writes a Snappy framed stream into `inner`.
    ///
    /// The stream identifier is written lazily, just before the first data, so
    /// an encoder that never receives any bytes produces no output at all.
    pub fn new(inner: W) -> Self {
        Self {
            inner: Some(inner),
            buffer: Vec::with_capacity(MAX_UNCOMPRESSED_CHUNK_SIZE),
            header_written: false,
            progress: None,
            cancel: None,
            bytes_processed: 0,
            pool: None,
        }
    }

    /// Creates an encoder that borrows its chunk scratch buffers from `pool`.
    pub fn with_pool(inner: W, pool: &SnappyPool) -> Self {
        let mut enc = Self::new(inner);
        enc.pool = Some(pool.clone());
        enc
    }

    /// Attaches a progress sink that is called after every emitted chunk with
    /// the cumulative number of uncompressed bytes encoded so far.
    pub fn with_progress(mut self, handle: ProgressHandle) -> Self {
        self.progress = Some(handle);
        self
    }

    /// Attaches a cancellation token that is checked before each chunk is emitted.
    pub fn with_cancel(mut self, token: CancellationToken) -> Self {
        self.cancel = Some(token);
        self
    }

    /// Emits any buffered data as a final chunk and returns the underlying writer.
    ///
    /// # Errors
    /// Returns I/O errors from the writer, or an error if the cancellation
    /// token has been triggered.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_buffer()?;
        self.inner
            .take()
            .ok_or_else(|| io::Error::other("encoder already finished"))
    }

    /// Writes the stream identifier once, before the first chunk.
    fn ensure_header(&mut self) -> io::Result<()> {
        if !self.header_written {
            if let Some(ref mut w) = self.inner {
                w.write_all(&STREAM_IDENTIFIER)?;
            }
            self.header_written = true;
        }
        Ok(())
    }

    /// Emits the buffered bytes (if any) as one frame chunk.
    ///
    /// The pending bytes are moved out of `self.buffer` before writing. If the
    /// write fails, those bytes are lost, but the allocation is still recycled:
    /// it goes back to the pool, or becomes `self.buffer` again when unpooled.
    fn flush_buffer(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        self.ensure_header()?;
        if let Some(ref token) = self.cancel {
            token
                .check()
                .map_err(|_| io::Error::other("operation cancelled"))?;
        }
        let chunk_len = self.buffer.len() as u64;
        let data = self.take_buffer();
        let written = self.write_chunk(&data);
        // Recycle before propagating any error so pooled scratch is never leaked.
        self.recycle_buffer(data);
        written?;
        self.bytes_processed += chunk_len;
        if let Some(ref handle) = self.progress {
            handle.on_progress(self.bytes_processed, None);
        }
        Ok(())
    }

    /// Moves the pending bytes out of `self.buffer` and returns them.
    ///
    /// `self.buffer` is left empty. With a pool, the replacement is a pooled
    /// scratch buffer (or a fresh one sized `ENCODER_SCRATCH_CAP`). Without a
    /// pool, it is an unallocated `Vec`, which `recycle_buffer` later swaps
    /// back for the original allocation.
    fn take_buffer(&mut self) -> Vec<u8> {
        let replacement = match self.pool {
            Some(ref snappy_pool) => {
                let pi = &snappy_pool.inner;
                let mut guard = pi.encoder_scratch.lock().unwrap_or_else(|e| e.into_inner());
                match guard.pop() {
                    Some(mut b) => {
                        pi.encoder_scratch_hits
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        b.clear();
                        b
                    }
                    None => {
                        pi.encoder_scratch_allocs
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        Vec::with_capacity(crate::pool::ENCODER_SCRATCH_CAP)
                    }
                }
            }
            None => Vec::new(),
        };
        std::mem::replace(&mut self.buffer, replacement)
    }

    /// Returns a spent chunk buffer for reuse.
    ///
    /// Pooled: the buffer is cleared and pushed back if the pool is below `cap`.
    /// Unpooled: it becomes `self.buffer` again, so the next chunk reuses the
    /// 64 KiB allocation instead of regrowing a fresh `Vec` from zero.
    fn recycle_buffer(&mut self, mut spent: Vec<u8>) {
        spent.clear();
        match self.pool {
            Some(ref snappy_pool) => {
                let pi = &snappy_pool.inner;
                let mut guard = pi.encoder_scratch.lock().unwrap_or_else(|e| e.into_inner());
                if guard.len() < pi.cap {
                    guard.push(spent);
                }
            }
            None => {
                // `take_buffer` left an empty, unallocated Vec behind.
                if self.buffer.is_empty() {
                    self.buffer = spent;
                }
            }
        }
    }

    /// Writes `data` as a single chunk.
    ///
    /// A compressed chunk is used only when it is strictly smaller than the
    /// input; otherwise the data is stored uncompressed. The masked CRC-32C
    /// always covers the uncompressed bytes.
    fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
        let writer = self
            .inner
            .as_mut()
            .ok_or_else(|| io::Error::other("encoder already finished"))?;
        let checksum = masked_crc32c(data);
        let compressed = compress::compress(data);
        if compressed.len() < data.len() {
            let chunk_len = 4 + compressed.len();
            write_chunk_header(writer, CHUNK_TYPE_COMPRESSED, chunk_len)?;
            writer.write_all(&checksum.to_le_bytes())?;
            writer.write_all(&compressed)?;
        } else {
            let chunk_len = 4 + data.len();
            write_chunk_header(writer, CHUNK_TYPE_UNCOMPRESSED, chunk_len)?;
            writer.write_all(&checksum.to_le_bytes())?;
            writer.write_all(data)?;
        }
        Ok(())
    }
}
impl<W: Write> Write for FrameEncoder<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
self.ensure_header()?;
let mut written = 0;
while written < buf.len() {
let remaining_capacity = MAX_UNCOMPRESSED_CHUNK_SIZE - self.buffer.len();
let to_copy = remaining_capacity.min(buf.len() - written);
self.buffer
.extend_from_slice(&buf[written..written + to_copy]);
written += to_copy;
if self.buffer.len() >= MAX_UNCOMPRESSED_CHUNK_SIZE {
self.flush_buffer()?;
}
}
Ok(written)
}
fn flush(&mut self) -> io::Result<()> {
self.flush_buffer()?;
if let Some(ref mut w) = self.inner {
w.flush()?;
}
Ok(())
}
}
impl<W: Write> Drop for FrameEncoder<W> {
    /// Best-effort emission of pending data when the encoder is dropped
    /// without `finish`. Errors cannot be reported from `drop`, so they are
    /// discarded; call `finish` to observe them.
    fn drop(&mut self) {
        if self.buffer.is_empty() || self.inner.is_none() {
            return;
        }
        let _ = self.flush_buffer();
    }
}
/// Streaming Snappy framing-format decoder wrapping a [`Read`] source.
///
/// Chunks are decoded one at a time into an internal buffer, which is then
/// handed out to callers of [`Read::read`].
pub struct FrameDecoder<R: Read> {
    /// Source of framed bytes.
    inner: R,
    /// Decoded bytes of the most recently read data chunk.
    output_buffer: Vec<u8>,
    /// Index of the next byte of `output_buffer` to return to the caller.
    output_pos: usize,
    /// Whether the leading stream identifier has been read and checked.
    header_validated: bool,
    /// Set once the source is exhausted; later reads return 0.
    at_eof: bool,
    /// Optional sink notified with the cumulative decoded byte count per chunk.
    progress: Option<ProgressHandle>,
    /// Optional token checked before each chunk is read.
    cancel: Option<CancellationToken>,
    /// Total decoded bytes produced so far (the value reported to `progress`).
    bytes_processed: u64,
    /// Optional shared pool that supplies scratch buffers for raw chunk bodies.
    pool: Option<Arc<PoolInner>>,
}
impl<R: Read> FrameDecoder<R> {
    /// Creates a decoder that reads a Snappy framed stream from `inner`.
    ///
    /// Nothing is read until the first call to [`Read::read`].
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            output_buffer: Vec::new(),
            output_pos: 0,
            header_validated: false,
            at_eof: false,
            progress: None,
            cancel: None,
            bytes_processed: 0,
            pool: None,
        }
    }

    /// Creates a decoder that borrows raw chunk scratch buffers from `pool`.
    pub fn with_pool(inner: R, pool: &SnappyPool) -> Self {
        let mut dec = Self::new(inner);
        dec.pool = Some(Arc::clone(&pool.inner));
        dec
    }

    /// Attaches a progress sink that is called after every data chunk with
    /// the cumulative number of decoded bytes.
    pub fn with_progress(mut self, handle: ProgressHandle) -> Self {
        self.progress = Some(handle);
        self
    }

    /// Attaches a cancellation token that is checked before each chunk is read.
    pub fn with_cancel(mut self, token: CancellationToken) -> Self {
        self.cancel = Some(token);
        self
    }

    /// Reads and checks the 10-byte stream identifier (once).
    ///
    /// A completely empty source is accepted as an empty stream and sets
    /// `at_eof`. A source that ends part-way through the identifier is an
    /// `UnexpectedEof` error rather than a silently empty stream.
    fn validate_header(&mut self) -> io::Result<()> {
        if self.header_validated {
            return Ok(());
        }
        let mut header = [0u8; 10];
        if !self.read_exact_or_eof(&mut header)? {
            self.at_eof = true;
            return Ok(());
        }
        if header != STREAM_IDENTIFIER {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                SnappyError::InvalidStreamIdentifier.to_string(),
            ));
        }
        self.header_validated = true;
        Ok(())
    }

    /// Fills `buf` completely, distinguishing a clean end of stream from truncation.
    ///
    /// Returns `Ok(false)` if the source is exhausted before any byte of `buf`
    /// is read and `Ok(true)` once `buf` is full. Returns an `UnexpectedEof`
    /// error if the source ends after a partial fill. `Interrupted` reads are
    /// retried, as `read_exact` does.
    fn read_exact_or_eof(&mut self, buf: &mut [u8]) -> io::Result<bool> {
        let mut filled = 0;
        while filled < buf.len() {
            match self.inner.read(&mut buf[filled..]) {
                Ok(0) if filled == 0 => return Ok(false),
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "snappy frame stream truncated",
                    ));
                }
                Ok(n) => filled += n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(true)
    }

    /// Discards exactly `len` bytes from the source by streaming them into a
    /// sink, instead of allocating a `len`-byte buffer (up to 16 MiB for a
    /// 24-bit chunk length).
    fn skip_exact(&mut self, len: usize) -> io::Result<()> {
        let wanted = len as u64;
        let skipped = io::copy(&mut (&mut self.inner).take(wanted), &mut io::sink())?;
        if skipped < wanted {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "skippable chunk truncated",
            ));
        }
        Ok(())
    }

    /// Reads and processes the next chunk.
    ///
    /// Returns `Ok(false)` at a clean end of stream (no bytes left before a
    /// chunk header), and `Ok(true)` after any chunk has been consumed.
    /// Reserved unskippable types (`0x02..=0x7F`) are errors; other unknown
    /// types are skipped.
    fn read_next_chunk(&mut self) -> io::Result<bool> {
        if let Some(ref token) = self.cancel {
            token
                .check()
                .map_err(|_| io::Error::other("operation cancelled"))?;
        }
        let mut chunk_header = [0u8; 4];
        if !self.read_exact_or_eof(&mut chunk_header)? {
            self.at_eof = true;
            return Ok(false);
        }
        let chunk_type = chunk_header[0];
        let chunk_len = (chunk_header[1] as usize)
            | ((chunk_header[2] as usize) << 8)
            | ((chunk_header[3] as usize) << 16);
        match chunk_type {
            CHUNK_TYPE_COMPRESSED => {
                self.read_compressed_chunk(chunk_len)?;
                self.emit_chunk_progress();
                Ok(true)
            }
            CHUNK_TYPE_UNCOMPRESSED => {
                self.read_uncompressed_chunk(chunk_len)?;
                self.emit_chunk_progress();
                Ok(true)
            }
            CHUNK_TYPE_STREAM_ID => {
                self.read_stream_identifier_chunk(chunk_len)?;
                Ok(true)
            }
            0x02..=0x7F => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                SnappyError::InvalidChunkType { chunk_type }.to_string(),
            )),
            _ => {
                self.skip_exact(chunk_len)?;
                Ok(true)
            }
        }
    }

    /// Adds the just-decoded chunk size to the running total and reports it.
    fn emit_chunk_progress(&mut self) {
        let chunk_size = self.output_buffer.len() as u64;
        self.bytes_processed += chunk_size;
        if let Some(ref handle) = self.progress {
            handle.on_progress(self.bytes_processed, None);
        }
    }

    /// Returns a zero-filled buffer of `chunk_len` bytes, reusing a pooled one when available.
    fn acquire_decoder_scratch(&mut self, chunk_len: usize) -> Vec<u8> {
        if let Some(ref pool_inner) = self.pool {
            let mut guard = pool_inner
                .decoder_scratch
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            if let Some(mut b) = guard.pop() {
                pool_inner
                    .decoder_scratch_hits
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                b.clear();
                b.resize(chunk_len, 0);
                return b;
            }
            pool_inner
                .decoder_scratch_allocs
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
        vec![0u8; chunk_len]
    }

    /// Returns a scratch buffer to the pool (if pooled and below capacity).
    fn release_decoder_scratch(&self, mut buf: Vec<u8>) {
        if let Some(ref pool_inner) = self.pool {
            buf.clear();
            let mut guard = pool_inner
                .decoder_scratch
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            if guard.len() < pool_inner.cap {
                guard.push(buf);
            }
        }
    }

    /// Reads a `chunk_len`-byte chunk body into a scratch buffer.
    ///
    /// The scratch buffer is released back to the pool if the read fails.
    fn read_chunk_body(&mut self, chunk_len: usize) -> io::Result<Vec<u8>> {
        let mut chunk_data = self.acquire_decoder_scratch(chunk_len);
        if let Err(e) = self.inner.read_exact(&mut chunk_data) {
            self.release_decoder_scratch(chunk_data);
            return Err(e);
        }
        Ok(chunk_data)
    }

    /// Compares a stored masked CRC-32C against the one computed over `data`.
    fn verify_checksum(expected: u32, data: &[u8]) -> io::Result<()> {
        let computed = masked_crc32c(data);
        if expected != computed {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                SnappyError::ChecksumMismatch { expected, computed }.to_string(),
            ));
        }
        Ok(())
    }

    /// Reads a compressed chunk (4-byte masked CRC + Snappy block) and makes
    /// its verified contents the current output buffer.
    fn read_compressed_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
        if chunk_len < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "compressed chunk too short for checksum",
            ));
        }
        let chunk_data = self.read_chunk_body(chunk_len)?;
        let expected_checksum =
            u32::from_le_bytes([chunk_data[0], chunk_data[1], chunk_data[2], chunk_data[3]]);
        let decompressed = decompress::decompress(&chunk_data[4..])
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()));
        // Release the scratch before propagating a decompression error.
        self.release_decoder_scratch(chunk_data);
        let decompressed = decompressed?;
        Self::verify_checksum(expected_checksum, &decompressed)?;
        self.output_buffer = decompressed;
        self.output_pos = 0;
        Ok(())
    }

    /// Reads an uncompressed chunk (4-byte masked CRC + raw bytes) and makes
    /// its verified contents the current output buffer.
    fn read_uncompressed_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
        if chunk_len < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "uncompressed chunk too short for checksum",
            ));
        }
        let chunk_data = self.read_chunk_body(chunk_len)?;
        let expected_checksum =
            u32::from_le_bytes([chunk_data[0], chunk_data[1], chunk_data[2], chunk_data[3]]);
        let data_slice = chunk_data[4..].to_vec();
        self.release_decoder_scratch(chunk_data);
        Self::verify_checksum(expected_checksum, &data_slice)?;
        self.output_buffer = data_slice;
        self.output_pos = 0;
        Ok(())
    }

    /// Validates a repeated stream-identifier chunk, as found in concatenated streams.
    fn read_stream_identifier_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
        if chunk_len != 6 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid stream identifier length",
            ));
        }
        let mut body = [0u8; 6];
        self.inner.read_exact(&mut body)?;
        if body != STREAM_BODY {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                SnappyError::InvalidStreamIdentifier.to_string(),
            ));
        }
        Ok(())
    }
}
impl<R: Read> Read for FrameDecoder<R> {
    /// Copies decoded bytes into `buf`, pulling further chunks from the source
    /// as needed. Returns `Ok(0)` once the stream is exhausted.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        if !(self.header_validated || self.at_eof) {
            self.validate_header()?;
        }
        while !self.at_eof {
            let pending = &self.output_buffer[self.output_pos..];
            if !pending.is_empty() {
                let n = pending.len().min(buf.len());
                buf[..n].copy_from_slice(&pending[..n]);
                self.output_pos += n;
                return Ok(n);
            }
            if !self.read_next_chunk()? {
                break;
            }
        }
        Ok(0)
    }
}
/// Writes a 4-byte frame chunk header to `writer`: the chunk type followed by
/// the body length as a 24-bit little-endian integer. Bits of `data_len`
/// above bit 23 are dropped.
fn write_chunk_header(writer: &mut impl Write, chunk_type: u8, data_len: usize) -> io::Result<()> {
    let len_le = (data_len as u64).to_le_bytes();
    writer.write_all(&[chunk_type, len_le[0], len_le[1], len_le[2]])
}
/// Compresses `input` into a complete Snappy framed stream, drawing chunk
/// scratch buffers from `pool`.
///
/// # Errors
/// Returns any error reported by the underlying [`FrameEncoder`].
pub fn compress_frame_pooled(input: &[u8], pool: &SnappyPool) -> io::Result<Vec<u8>> {
    let mut framed = Vec::new();
    {
        let mut encoder = FrameEncoder::with_pool(&mut framed, pool);
        encoder.write_all(input)?;
        encoder.finish()?;
    }
    Ok(framed)
}
/// Compresses `input` into a framed stream whose blocks may reference `dict`.
///
/// Layout:
/// - the stream identifier;
/// - an OxiArc dictionary-info chunk (type `0xFE`) holding the magic
///   `OXIAD`, the CRC-32C of the dictionary and its length (both u32 LE);
/// - data chunks of at most `MAX_UNCOMPRESSED_CHUNK_SIZE` bytes each.
///
/// Only the last 65536 bytes of `dict` are used.
pub fn compress_frame_with_dict(input: &[u8], dict: &[u8]) -> Vec<u8> {
    let dict = &dict[dict.len().saturating_sub(65536)..];

    let mut info = Vec::with_capacity(13);
    info.extend_from_slice(OXIARC_DICT_MAGIC);
    info.extend_from_slice(&crc32c(dict).to_le_bytes());
    info.extend_from_slice(&(dict.len() as u32).to_le_bytes());

    let mut framed = Vec::new();
    framed.extend_from_slice(&STREAM_IDENTIFIER);
    write_chunk_header_vec(&mut framed, CHUNK_TYPE_OXIARC_DICT, info.len());
    framed.extend_from_slice(&info);

    for block in input.chunks(MAX_UNCOMPRESSED_CHUNK_SIZE) {
        let checksum = masked_crc32c(block).to_le_bytes();
        let packed = compress::compress_block_with_dict(block, dict);
        // Store raw unless compression actually shrinks the block.
        let (kind, payload): (u8, &[u8]) = if packed.len() < block.len() {
            (CHUNK_TYPE_COMPRESSED, &packed[..])
        } else {
            (CHUNK_TYPE_UNCOMPRESSED, block)
        };
        write_chunk_header_vec(&mut framed, kind, checksum.len() + payload.len());
        framed.extend_from_slice(&checksum);
        framed.extend_from_slice(payload);
    }
    framed
}
pub fn decompress_frame_with_dict(input: &[u8], dict: &[u8]) -> Result<Vec<u8>, SnappyError> {
let dict = if dict.len() > 65536 {
&dict[dict.len() - 65536..]
} else {
dict
};
let mut pos = 0usize;
if pos + 10 > input.len() {
return Err(SnappyError::UnexpectedEof {
context: "stream identifier",
});
}
if input[pos..pos + 10] != STREAM_IDENTIFIER[..] {
return Err(SnappyError::InvalidStreamIdentifier);
}
pos += 10;
if pos + 4 > input.len() {
return Err(SnappyError::UnexpectedEof {
context: "dict-info chunk header",
});
}
let dict_chunk_type = input[pos];
let dict_chunk_body_len = (input[pos + 1] as usize)
| ((input[pos + 2] as usize) << 8)
| ((input[pos + 3] as usize) << 16);
pos += 4;
if dict_chunk_type != CHUNK_TYPE_OXIARC_DICT {
return Err(SnappyError::CorruptedData {
message: format!(
"expected OxiArc dict-info chunk (0xFE), found {dict_chunk_type:#04x}"
),
});
}
if dict_chunk_body_len < 13 {
return Err(SnappyError::CorruptedData {
message: format!("OxiArc dict-info chunk body too short: {dict_chunk_body_len} bytes"),
});
}
if pos + dict_chunk_body_len > input.len() {
return Err(SnappyError::UnexpectedEof {
context: "dict-info chunk body",
});
}
let dict_body = &input[pos..pos + dict_chunk_body_len];
pos += dict_chunk_body_len;
if &dict_body[..5] != OXIARC_DICT_MAGIC {
return Err(SnappyError::CorruptedData {
message: "OxiArc dict-info magic mismatch".to_string(),
});
}
let stored_crc = u32::from_le_bytes([dict_body[5], dict_body[6], dict_body[7], dict_body[8]]);
let stored_len =
u32::from_le_bytes([dict_body[9], dict_body[10], dict_body[11], dict_body[12]]) as usize;
let computed_crc = crc32c(dict);
if computed_crc != stored_crc {
return Err(SnappyError::ChecksumMismatch {
expected: stored_crc,
computed: computed_crc,
});
}
if dict.len() != stored_len {
return Err(SnappyError::CorruptedData {
message: format!(
"dict length mismatch: frame has {stored_len} bytes, supplied dict is {} bytes",
dict.len()
),
});
}
let mut output = Vec::new();
while pos < input.len() {
if pos + 4 > input.len() {
return Err(SnappyError::UnexpectedEof {
context: "chunk header",
});
}
let chunk_type = input[pos];
let chunk_body_len = (input[pos + 1] as usize)
| ((input[pos + 2] as usize) << 8)
| ((input[pos + 3] as usize) << 16);
pos += 4;
if pos + chunk_body_len > input.len() {
return Err(SnappyError::UnexpectedEof {
context: "chunk body",
});
}
let chunk_body = &input[pos..pos + chunk_body_len];
pos += chunk_body_len;
match chunk_type {
CHUNK_TYPE_COMPRESSED => {
if chunk_body.len() < 4 {
return Err(SnappyError::CorruptedData {
message: "compressed chunk too short for checksum".to_string(),
});
}
let expected_checksum = u32::from_le_bytes([
chunk_body[0],
chunk_body[1],
chunk_body[2],
chunk_body[3],
]);
let compressed_payload = &chunk_body[4..];
let decompressed =
decompress::decompress_block_with_dict(compressed_payload, dict)?;
let computed_checksum = masked_crc32c(&decompressed);
if expected_checksum != computed_checksum {
return Err(SnappyError::ChecksumMismatch {
expected: expected_checksum,
computed: computed_checksum,
});
}
output.extend_from_slice(&decompressed);
}
CHUNK_TYPE_UNCOMPRESSED => {
if chunk_body.len() < 4 {
return Err(SnappyError::CorruptedData {
message: "uncompressed chunk too short for checksum".to_string(),
});
}
let expected_checksum = u32::from_le_bytes([
chunk_body[0],
chunk_body[1],
chunk_body[2],
chunk_body[3],
]);
let raw_data = &chunk_body[4..];
let computed_checksum = masked_crc32c(raw_data);
if expected_checksum != computed_checksum {
return Err(SnappyError::ChecksumMismatch {
expected: expected_checksum,
computed: computed_checksum,
});
}
output.extend_from_slice(raw_data);
}
CHUNK_TYPE_STREAM_ID => {
}
0x02..=0x7F => {
return Err(SnappyError::InvalidChunkType { chunk_type });
}
_ => {
}
}
}
Ok(output)
}
/// Appends a 4-byte frame chunk header to `output`: the chunk type followed by
/// the body length as a 24-bit little-endian integer. Bits of `data_len`
/// above bit 23 are dropped.
fn write_chunk_header_vec(output: &mut Vec<u8>, chunk_type: u8, data_len: usize) {
    let len_le = (data_len as u64).to_le_bytes();
    output.extend_from_slice(&[chunk_type, len_le[0], len_le[1], len_le[2]]);
}
#[cfg(test)]
mod tests {
    //! Round-trip, boundary, progress, cancellation and corruption tests for
    //! the streaming `FrameEncoder` / `FrameDecoder`.
    use super::*;

    /// A short input round-trips, and the output starts with the stream identifier.
    #[test]
    fn test_frame_roundtrip_small() {
        let data = b"Hello, World! This is a test of Snappy framing.";
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        assert_eq!(&compressed[..10], &STREAM_IDENTIFIER);
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data);
    }

    /// Empty input round-trips to empty output.
    #[test]
    fn test_frame_roundtrip_empty() {
        let data = b"";
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data);
    }

    /// 100 000 bytes span more than one chunk and still round-trip.
    #[test]
    fn test_frame_roundtrip_large() {
        let mut data = Vec::with_capacity(100_000);
        for i in 0..100_000u32 {
            data.push((i % 256) as u8);
        }
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data);
    }

    /// Highly repetitive input compresses smaller than the original and round-trips.
    #[test]
    fn test_frame_roundtrip_repeated() {
        let data = vec![0xAB; 200_000];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        assert!(compressed.len() < data.len());
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data);
    }

    /// Many small writes produce the same decodable stream as one large write.
    #[test]
    fn test_frame_incremental_write() {
        let data = b"Hello, this is a test of incremental writing to the encoder.";
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            for chunk in data.chunks(5) {
                encoder.write_all(chunk).expect("write should succeed");
            }
            encoder.finish().expect("finish should succeed");
        }
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data);
    }

    /// Reading through a small 7-byte buffer reassembles the full output.
    #[test]
    fn test_frame_incremental_read() {
        let data = b"Test data for incremental reading from the decoder.";
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        let mut buf = [0u8; 7];
        loop {
            let n = decoder.read(&mut buf).expect("read should succeed");
            if n == 0 {
                break;
            }
            output.extend_from_slice(&buf[..n]);
        }
        assert_eq!(output, data);
    }

    /// Ten bytes that are not the stream identifier are rejected.
    #[test]
    fn test_frame_decoder_invalid_header() {
        let bad_data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09];
        let mut decoder = FrameDecoder::new(&bad_data[..]);
        let mut output = Vec::new();
        let result = decoder.read_to_end(&mut output);
        assert!(result.is_err());
    }

    /// A completely empty source decodes as an empty stream.
    #[test]
    fn test_frame_decoder_empty_input() {
        let empty: &[u8] = &[];
        let mut decoder = FrameDecoder::new(empty);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("empty input should succeed");
        assert!(output.is_empty());
    }

    /// Chunk headers encode the length as 24-bit little-endian.
    #[test]
    fn test_write_chunk_header() {
        let mut buf = Vec::new();
        write_chunk_header(&mut buf, 0x00, 0x123456).expect("should succeed");
        assert_eq!(buf, vec![0x00, 0x56, 0x34, 0x12]);
    }

    /// The stream identifier is type 0xFF, length 6, body "sNaPpY".
    #[test]
    fn test_stream_identifier_constant() {
        assert_eq!(STREAM_IDENTIFIER[0], 0xFF);
        assert_eq!(STREAM_IDENTIFIER[1], 0x06);
        assert_eq!(STREAM_IDENTIFIER[2], 0x00);
        assert_eq!(STREAM_IDENTIFIER[3], 0x00);
        assert_eq!(&STREAM_IDENTIFIER[4..], b"sNaPpY");
    }

    // Explicit imports for the progress/cancellation tests below; they shadow
    // the same names brought in by `use super::*`.
    use oxiarc_core::cancel::CancellationToken;
    use oxiarc_core::progress::{ProgressHandle, ProgressSink};
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Progress sink that only counts how many times it was called.
    struct CountingSink {
        calls: AtomicUsize,
    }

    impl CountingSink {
        fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
            }
        }

        fn call_count(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }
    }

    impl ProgressSink for CountingSink {
        fn on_progress(&self, _processed: u64, _total: Option<u64>) {
            self.calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Progress sink that records every reported value, for monotonicity checks.
    struct MonotonicSink {
        values: std::sync::Mutex<Vec<u64>>,
    }

    impl MonotonicSink {
        fn new() -> Self {
            Self {
                values: std::sync::Mutex::new(Vec::new()),
            }
        }

        fn values(&self) -> Vec<u64> {
            self.values
                .lock()
                .unwrap_or_else(|p| p.into_inner())
                .clone()
        }
    }

    impl ProgressSink for MonotonicSink {
        fn on_progress(&self, processed: u64, _total: Option<u64>) {
            let mut guard = self.values.lock().unwrap_or_else(|p| p.into_inner());
            guard.push(processed);
        }
    }

    /// 128 KiB of pseudo-random bytes (two full chunks) triggers at least two
    /// progress callbacks on each side, and decoder progress never decreases.
    #[test]
    fn test_progress_counting_sink() {
        let data: Vec<u8> = (0..131_072u64)
            .map(|i| (i.wrapping_mul(6_364_136_223_846_793_005_u64) >> 56) as u8)
            .collect();
        let enc_sink = Arc::new(CountingSink::new());
        let enc_handle: ProgressHandle = enc_sink.clone();
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed).with_progress(enc_handle);
            encoder
                .write_all(&data)
                .expect("encode write should succeed");
            encoder.finish().expect("encode finish should succeed");
        }
        assert!(
            enc_sink.call_count() >= 2,
            "encoder on_progress called {} times, expected >= 2",
            enc_sink.call_count()
        );
        let dec_sink = Arc::new(MonotonicSink::new());
        let dec_handle: ProgressHandle = dec_sink.clone();
        let mut decoder = FrameDecoder::new(&compressed[..]).with_progress(dec_handle);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("decode should succeed");
        assert_eq!(output, data, "decoded data must match original");
        let values = dec_sink.values();
        assert!(
            values.len() >= 2,
            "decoder on_progress called {} times, expected >= 2",
            values.len()
        );
        for window in values.windows(2) {
            assert!(
                window[1] >= window[0],
                "progress values not monotonic: {} followed by {}",
                window[0],
                window[1]
            );
        }
    }

    /// A token cancelled up front makes encoding fail, either in write or finish.
    #[test]
    fn test_cancellation_encoder_pre_cancelled() {
        let data: Vec<u8> = vec![0xBEu8; 131_072];
        let token = CancellationToken::new();
        token.cancel();
        let mut compressed = Vec::new();
        let mut encoder = FrameEncoder::new(&mut compressed).with_cancel(token);
        let write_result = encoder.write_all(&data);
        let finish_result = encoder.finish();
        let triggered = write_result.is_err() || finish_result.is_err();
        assert!(
            triggered,
            "expected a cancellation error but neither write nor finish returned Err"
        );
    }

    /// A token cancelled up front makes decoding a valid stream fail.
    #[test]
    fn test_cancellation_decoder_pre_cancelled() {
        let data: Vec<u8> = vec![0xCAu8; 131_072];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder
                .write_all(&data)
                .expect("encode write should succeed");
            encoder.finish().expect("encode finish should succeed");
        }
        let token = CancellationToken::new();
        token.cancel();
        let mut decoder = FrameDecoder::new(&compressed[..]).with_cancel(token);
        let mut output = Vec::new();
        let result = decoder.read_to_end(&mut output);
        assert!(result.is_err(), "expected cancellation error from decoder");
    }

    /// Exactly one maximum-size chunk round-trips.
    #[test]
    fn test_frame_max_size_chunk() {
        let data: Vec<u8> = (0..MAX_UNCOMPRESSED_CHUNK_SIZE)
            .map(|i| (i % 251) as u8)
            .collect();
        assert_eq!(data.len(), MAX_UNCOMPRESSED_CHUNK_SIZE);
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data, "max-size chunk roundtrip failed");
    }

    /// One byte over the chunk limit yields exactly two data chunks and round-trips.
    #[test]
    fn test_frame_just_over_max_chunk() {
        let size = MAX_UNCOMPRESSED_CHUNK_SIZE + 1;
        let data: Vec<u8> = (0..size).map(|i| (i % 253) as u8).collect();
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        // Walk the chunk headers after the 10-byte stream identifier.
        let payload = &compressed[10..];
        let mut data_chunk_count = 0usize;
        let mut pos = 0usize;
        while pos + 4 <= payload.len() {
            let chunk_type = payload[pos];
            let chunk_len = (payload[pos + 1] as usize)
                | ((payload[pos + 2] as usize) << 8)
                | ((payload[pos + 3] as usize) << 16);
            if chunk_type == CHUNK_TYPE_COMPRESSED || chunk_type == CHUNK_TYPE_UNCOMPRESSED {
                data_chunk_count += 1;
            }
            pos += 4 + chunk_len;
        }
        assert_eq!(
            data_chunk_count, 2,
            "expected exactly 2 data chunks for 65537-byte input, got {data_chunk_count}"
        );
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data, "just-over-max chunk roundtrip failed");
    }

    /// Sanity bounds on the block compressor's worst-case output size.
    #[test]
    fn test_block_max_compress_len() {
        use crate::compress::max_compress_len;
        let max_len_65536 = max_compress_len(MAX_UNCOMPRESSED_CHUNK_SIZE);
        assert!(
            max_len_65536 >= MAX_UNCOMPRESSED_CHUNK_SIZE,
            "max_compress_len(65536) = {max_len_65536}, expected >= 65536"
        );
        let max_len_0 = max_compress_len(0);
        assert!(
            max_len_0 >= 1,
            "max_compress_len(0) = {max_len_0}, expected >= 1"
        );
    }

    /// A stream cut in half must fail to decode rather than succeed silently.
    #[test]
    fn test_decompress_truncated_frame() {
        let data = vec![b'X'; 1000];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let truncated_len = compressed.len() / 2;
        let truncated = &compressed[..truncated_len];
        let mut decoder = FrameDecoder::new(truncated);
        let mut output = Vec::new();
        let result = decoder.read_to_end(&mut output);
        assert!(
            result.is_err(),
            "expected error on truncated input, but read_to_end succeeded"
        );
    }

    /// Flipping a CRC byte of the first data chunk must be detected.
    #[test]
    fn test_decompress_corrupt_crc() {
        let data = vec![b'A'; 500];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        // Offset 14 = 10-byte stream identifier + 4-byte chunk header, i.e. the
        // first byte of the first data chunk's checksum.
        let crc_offset = 14;
        assert!(
            crc_offset < compressed.len(),
            "compressed output is too short to contain a CRC field"
        );
        let mut corrupt = compressed.clone();
        corrupt[crc_offset] ^= 0xFF;
        let mut decoder = FrameDecoder::new(&corrupt[..]);
        let mut output = Vec::new();
        let result = decoder.read_to_end(&mut output);
        assert!(
            result.is_err(),
            "expected checksum error on corrupt CRC, but read_to_end succeeded"
        );
    }

    /// A full chunk of zeros compresses to under a quarter of its size and round-trips.
    #[test]
    fn test_compress_all_zeros() {
        let data = vec![0u8; MAX_UNCOMPRESSED_CHUNK_SIZE];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        assert!(
            compressed.len() < data.len() / 4,
            "expected compressed output much smaller than {}, got {}",
            data.len(),
            compressed.len()
        );
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data, "all-zeros roundtrip failed");
    }

    /// A full chunk of 0xFF bytes round-trips.
    #[test]
    fn test_compress_all_ones() {
        let data = vec![0xFFu8; MAX_UNCOMPRESSED_CHUNK_SIZE];
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let mut decoder = FrameDecoder::new(&compressed[..]);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("read should succeed");
        assert_eq!(output, data, "all-0xFF roundtrip failed");
    }

    /// Decoding through a 1-byte `BufReader` exercises many short reads of the source.
    #[test]
    fn test_frame_decoder_incremental_max_size() {
        use std::io::BufReader;
        let data: Vec<u8> = (0..MAX_UNCOMPRESSED_CHUNK_SIZE)
            .map(|i| (i % 199) as u8)
            .collect();
        let mut compressed = Vec::new();
        {
            let mut encoder = FrameEncoder::new(&mut compressed);
            encoder.write_all(&data).expect("write should succeed");
            encoder.finish().expect("finish should succeed");
        }
        let buf_reader = BufReader::with_capacity(1, &compressed[..]);
        let mut decoder = FrameDecoder::new(buf_reader);
        let mut output = Vec::new();
        decoder
            .read_to_end(&mut output)
            .expect("incremental read should succeed");
        assert_eq!(
            output, data,
            "incremental max-size chunk decoder roundtrip failed"
        );
    }
}