use std::io::{Read, Write};
use bytes::Bytes;
use lz4_flex::frame::{BlockMode, BlockSize, FrameDecoder, FrameEncoder, FrameInfo};
use crate::CompressionError;
/// Builds the LZ4 frame settings used by [`compress`].
///
/// The frame uses `BlockSize::Max64KB` blocks in `BlockMode::Independent`,
/// with both per-block checksums and the content checksum disabled.
fn frame_info() -> FrameInfo {
    let info = FrameInfo::new();
    let info = info.block_size(BlockSize::Max64KB);
    let info = info.block_mode(BlockMode::Independent);
    let info = info.block_checksums(false);
    info.content_checksum(false)
}
/// Compresses `data` into a single LZ4 frame configured by [`frame_info`].
///
/// # Errors
///
/// An I/O failure while writing into the encoder is propagated through `?`
/// (converted into [`CompressionError`]). A failure while finalizing the
/// frame is reported as [`CompressionError::InvalidData`] with an
/// `"lz4 finish: …"` message.
pub fn compress(data: &[u8]) -> Result<Bytes, CompressionError> {
    // Start the output buffer at the input size to limit regrowth.
    let sink = Vec::with_capacity(data.len());
    let mut writer = FrameEncoder::with_frame_info(frame_info(), sink);
    writer.write_all(data)?;

    match writer.finish() {
        Ok(framed) => Ok(Bytes::from(framed)),
        Err(e) => Err(CompressionError::InvalidData(format!("lz4 finish: {e}"))),
    }
}
/// Decompresses an LZ4 frame, refusing to yield more than `max_output` bytes.
///
/// # Errors
///
/// * [`CompressionError::InvalidData`] when `data` is empty or the decoder
///   reports an error while reading.
/// * [`CompressionError::TooLarge`] when the decoded output is longer than
///   `max_output`.
pub fn decompress(data: &[u8], max_output: usize) -> Result<Bytes, CompressionError> {
    if data.is_empty() {
        return Err(CompressionError::InvalidData("empty lz4 payload".into()));
    }

    // Read at most one byte past the limit: enough to detect an oversized
    // payload without decoding all of it.
    let read_cap = (max_output as u64).saturating_add(1);
    let mut reader = FrameDecoder::new(data).take(read_cap);

    // Initial guess of twice the compressed size, never above the caller's limit.
    let initial_capacity = data.len().saturating_mul(2).min(max_output);
    let mut decoded = Vec::with_capacity(initial_capacity);

    if let Err(e) = reader.read_to_end(&mut decoded) {
        return Err(CompressionError::InvalidData(format!("lz4 decode: {e}")));
    }

    if decoded.len() <= max_output {
        Ok(Bytes::from(decoded))
    } else {
        Err(CompressionError::TooLarge { limit: max_output })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Small sample payload used by the basic round-trip test.
    const HELLO: &[u8] = b"hello kafka, this is a moderately repetitive payload to compress";
    /// Output limit (256 MiB) that none of the payloads below exceed.
    const BIG_CAP: usize = 256 * 1024 * 1024;

    /// Compressing then decompressing returns the original bytes.
    #[test]
    fn roundtrip() {
        let compressed = compress(HELLO).unwrap();
        let restored = decompress(&compressed, BIG_CAP).unwrap();
        assert!(restored.as_ref() == HELLO);
    }

    /// An empty input is rejected as invalid data.
    #[test]
    fn decompress_empty_rejected() {
        let outcome = decompress(b"", BIG_CAP);
        assert!(matches!(outcome, Err(CompressionError::InvalidData(_))));
    }

    /// Bytes that are not an LZ4 frame are rejected as invalid data.
    #[test]
    fn decompress_garbage_rejected() {
        let outcome = decompress(b"this is not lz4", BIG_CAP);
        assert!(matches!(outcome, Err(CompressionError::InvalidData(_))));
    }

    /// A 128 KiB payload survives a round trip unchanged.
    #[test]
    fn larger_payload_roundtrips() {
        let payload = vec![0xABu8; 128 * 1024];
        let compressed = compress(&payload).unwrap();
        let restored = decompress(&compressed, BIG_CAP).unwrap();
        assert!(restored.as_ref() == payload.as_slice());
    }

    /// A 64 MiB run of zeros is refused under a 1 KiB limit, yet still
    /// decompresses in full when the limit is large enough.
    #[test]
    fn decompression_bomb_rejected() {
        let zeros = vec![0u8; 64 * 1024 * 1024];
        let compressed = compress(&zeros).unwrap();

        let capped = decompress(&compressed, 1024);
        assert!(matches!(
            capped,
            Err(CompressionError::TooLarge { limit: 1024 })
        ));

        let restored = decompress(&compressed, BIG_CAP).unwrap();
        assert!(restored.len() == zeros.len());
    }
}