use crate::error::CodecError;
const DEFAULT_BLOCK_SIZE: usize = 4096;
pub fn encode(data: &[u8]) -> Vec<u8> {
encode_with_block_size(data, DEFAULT_BLOCK_SIZE)
}
pub fn encode_with_block_size(data: &[u8], block_size: usize) -> Vec<u8> {
let block_size = block_size.max(64); let block_count = if data.is_empty() {
0
} else {
data.len().div_ceil(block_size)
};
let mut out = Vec::with_capacity(12 + block_count * 4 + data.len());
out.extend_from_slice(&(data.len() as u32).to_le_bytes());
out.extend_from_slice(&(block_size as u32).to_le_bytes());
out.extend_from_slice(&(block_count as u32).to_le_bytes());
let lengths_offset = out.len();
out.resize(lengths_offset + block_count * 4, 0);
for (i, chunk) in data.chunks(block_size).enumerate() {
let compressed = lz4_flex::compress_prepend_size(chunk);
let compressed_len = compressed.len() as u32;
let table_pos = lengths_offset + i * 4;
out[table_pos..table_pos + 4].copy_from_slice(&compressed_len.to_le_bytes());
out.extend_from_slice(&compressed);
}
out
}
pub fn decode(data: &[u8]) -> Result<Vec<u8>, CodecError> {
let header = read_header(data)?;
if header.block_count == 0 {
return Ok(Vec::new());
}
let mut result = Vec::with_capacity(header.uncompressed_size);
let mut block_offset = header.data_offset;
for i in 0..header.block_count {
let compressed_len = header.block_lengths[i];
let block_end = block_offset + compressed_len;
if block_end > data.len() {
return Err(CodecError::Truncated {
expected: block_end,
actual: data.len(),
});
}
let block_data = &data[block_offset..block_end];
let decompressed = lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
CodecError::DecompressFailed {
detail: format!("LZ4 block {i}: {e}"),
}
})?;
result.extend_from_slice(&decompressed);
block_offset = block_end;
}
if result.len() != header.uncompressed_size {
return Err(CodecError::Corrupt {
detail: format!(
"uncompressed size mismatch: header says {}, got {}",
header.uncompressed_size,
result.len()
),
});
}
Ok(result)
}
pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
let header = read_header(data)?;
if block_idx >= header.block_count {
return Err(CodecError::Corrupt {
detail: format!(
"block index {block_idx} out of range (block_count={})",
header.block_count
),
});
}
let mut block_offset = header.data_offset;
for i in 0..block_idx {
block_offset += header.block_lengths[i];
}
let compressed_len = header.block_lengths[block_idx];
let block_end = block_offset + compressed_len;
if block_end > data.len() {
return Err(CodecError::Truncated {
expected: block_end,
actual: data.len(),
});
}
let block_data = &data[block_offset..block_end];
lz4_flex::decompress_size_prepended(block_data).map_err(|e| CodecError::DecompressFailed {
detail: format!("LZ4 block {block_idx}: {e}"),
})
}
struct Lz4Header {
uncompressed_size: usize,
block_count: usize,
block_lengths: Vec<usize>,
data_offset: usize,
}
fn read_header(data: &[u8]) -> Result<Lz4Header, CodecError> {
if data.len() < 12 {
return Err(CodecError::Truncated {
expected: 12,
actual: data.len(),
});
}
let uncompressed_size = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
let _block_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
let block_count = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
let lengths_end = 12 + block_count * 4;
if data.len() < lengths_end {
return Err(CodecError::Truncated {
expected: lengths_end,
actual: data.len(),
});
}
let block_lengths: Vec<usize> = data[12..lengths_end]
.chunks_exact(4)
.map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as usize)
.collect();
Ok(Lz4Header {
uncompressed_size,
block_count,
block_lengths,
data_offset: lengths_end,
})
}
pub struct Lz4Encoder {
buf: Vec<u8>,
block_size: usize,
}
impl Lz4Encoder {
pub fn new() -> Self {
Self {
buf: Vec::with_capacity(4096),
block_size: DEFAULT_BLOCK_SIZE,
}
}
pub fn with_block_size(block_size: usize) -> Self {
Self {
buf: Vec::with_capacity(block_size),
block_size: block_size.max(64),
}
}
pub fn push(&mut self, data: &[u8]) {
self.buf.extend_from_slice(data);
}
pub fn len(&self) -> usize {
self.buf.len()
}
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
pub fn finish(self) -> Vec<u8> {
encode_with_block_size(&self.buf, self.block_size)
}
}
impl Default for Lz4Encoder {
fn default() -> Self {
Self::new()
}
}
pub struct Lz4Decoder;
impl Lz4Decoder {
pub fn decode_all(data: &[u8]) -> Result<Vec<u8>, CodecError> {
decode(data)
}
pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
decode_block(data, block_idx)
}
pub fn block_count(data: &[u8]) -> Result<usize, CodecError> {
let header = read_header(data)?;
Ok(header.block_count)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_roundtrip() {
let encoded = encode(&[]);
let decoded = decode(&encoded).unwrap();
assert!(decoded.is_empty());
}
#[test]
fn small_data_roundtrip() {
let data = b"hello world, this is a log message";
let encoded = encode(data);
let decoded = decode(&encoded).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn large_data_multiple_blocks() {
let mut data = Vec::new();
for i in 0..1000 {
let line = format!(
"2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}\n",
i % 60,
10000 + i,
i * 3 + 1
);
data.extend_from_slice(line.as_bytes());
}
let encoded = encode(&data);
let decoded = decode(&encoded).unwrap();
assert_eq!(decoded, data);
let ratio = data.len() as f64 / encoded.len() as f64;
assert!(
ratio > 2.0,
"expected >2x compression for structured logs, got {ratio:.1}x"
);
}
#[test]
fn random_access_block() {
let data: Vec<u8> = (0..20000).map(|i| (i % 256) as u8).collect();
let block_size = 4096;
let encoded = encode_with_block_size(&data, block_size);
let block_count = Lz4Decoder::block_count(&encoded).unwrap();
assert_eq!(block_count, data.len().div_ceil(block_size));
let mut reassembled = Vec::new();
for i in 0..block_count {
let block = decode_block(&encoded, i).unwrap();
reassembled.extend_from_slice(&block);
}
assert_eq!(reassembled, data);
}
#[test]
fn out_of_range_block_index() {
let data = b"some data here";
let encoded = encode(data);
assert!(decode_block(&encoded, 999).is_err());
}
#[test]
fn compressible_log_data() {
let line = "2024-01-15 ERROR database connection timeout host=db-prod-01 retry=3\n";
let data: Vec<u8> = line.as_bytes().repeat(500);
let encoded = encode(&data);
let decoded = decode(&encoded).unwrap();
assert_eq!(decoded, data);
let ratio = data.len() as f64 / encoded.len() as f64;
assert!(
ratio > 3.0,
"highly repetitive logs should compress >3x, got {ratio:.1}x"
);
}
#[test]
fn incompressible_data() {
let mut data = vec![0u8; 10_000];
let mut rng: u64 = 9999;
for byte in &mut data {
rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
*byte = (rng >> 33) as u8;
}
let encoded = encode(&data);
let decoded = decode(&encoded).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn streaming_encoder() {
let parts = [b"hello ".as_ref(), b"world".as_ref(), b" test".as_ref()];
let full: Vec<u8> = parts.iter().flat_map(|p| p.iter().copied()).collect();
let mut enc = Lz4Encoder::new();
for part in &parts {
enc.push(part);
}
let encoded = enc.finish();
let decoded = decode(&encoded).unwrap();
assert_eq!(decoded, full);
}
#[test]
fn custom_block_size() {
let data = vec![42u8; 10_000];
let encoded = encode_with_block_size(&data, 1024);
let decoded = decode(&encoded).unwrap();
assert_eq!(decoded, data);
let block_count = Lz4Decoder::block_count(&encoded).unwrap();
assert_eq!(block_count, 10); }
#[test]
fn truncated_input_errors() {
assert!(decode(&[]).is_err());
assert!(decode(&[0; 8]).is_err()); }
}