use bytes::{BufMut, Bytes, BytesMut};
use std::io::{Read, Write};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
#[cfg(feature = "compression")]
use snap;
/// Errors produced by compression, decompression, and frame decoding.
#[derive(Debug, Error)]
pub enum CompressionError {
    /// The LZ4 backend reported a failure (backend message passed through).
    #[error("LZ4 compression failed: {0}")]
    Lz4Error(String),
    /// The Zstandard backend reported a failure.
    #[error("Zstd compression failed: {0}")]
    ZstdError(String),
    /// The Snappy backend reported a failure.
    #[error("Snappy compression failed: {0}")]
    SnappyError(String),
    /// The frame header was truncated or malformed.
    #[error("Invalid compression header")]
    InvalidHeader,
    /// The output buffer cannot hold the decompressed payload.
    #[error("Decompression buffer too small: need {needed}, have {available}")]
    BufferTooSmall { needed: usize, available: usize },
    /// The flag byte's algorithm id is not one of the known algorithms.
    #[error("Unknown compression algorithm: {0}")]
    UnknownAlgorithm(u8),
    /// The CRC32 stored in the frame does not match the decompressed payload.
    #[error("Checksum mismatch: expected {expected:#010x}, got {actual:#010x}")]
    ChecksumMismatch { expected: u32, actual: u32 },
    /// Generic corruption that is not attributable to a specific field.
    #[error("Data corruption detected")]
    DataCorruption,
    /// An underlying I/O error from a streaming reader/writer.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}

/// Module-wide result alias; all fallible APIs here use `CompressionError`.
pub type Result<T> = std::result::Result<T, CompressionError>;
/// Supported compression algorithms.
///
/// The `repr(u8)` discriminant doubles as the on-wire id stored in the low
/// three bits of the frame flag byte (see `from_flags` / `to_flags`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[repr(u8)]
pub enum CompressionAlgorithm {
    /// No compression; payload is stored verbatim.
    #[default]
    None = 0,
    Lz4 = 1,
    Zstd = 2,
    Snappy = 3,
}
impl CompressionAlgorithm {
    /// Every supported algorithm, ordered by its wire id (index == id).
    pub const ALL: [CompressionAlgorithm; 4] = [
        CompressionAlgorithm::None,
        CompressionAlgorithm::Lz4,
        CompressionAlgorithm::Zstd,
        CompressionAlgorithm::Snappy,
    ];

    /// Decodes the algorithm from the low three bits of a frame flag byte.
    ///
    /// Ids 4..=7 are unassigned and yield `UnknownAlgorithm`.
    pub fn from_flags(flags: u8) -> Result<Self> {
        let id = flags & 0x07;
        Self::ALL
            .get(id as usize)
            .copied()
            .ok_or(CompressionError::UnknownAlgorithm(id))
    }

    /// Encodes this algorithm plus the optional-field markers into a flag
    /// byte: bit 0x10 marks a trailing original-size field, bit 0x20 a CRC32.
    pub fn to_flags(self, has_size: bool, has_checksum: bool) -> u8 {
        let size_bit = if has_size { 0x10 } else { 0 };
        let checksum_bit = if has_checksum { 0x20 } else { 0 };
        self as u8 | size_bit | checksum_bit
    }

    /// Canonical lowercase name; inverse of `parse`.
    pub fn name(&self) -> &'static str {
        match self {
            Self::None => "none",
            Self::Lz4 => "lz4",
            Self::Zstd => "zstd",
            Self::Snappy => "snappy",
        }
    }

    /// Parses a case-insensitive algorithm name, accepting a few aliases.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "none" | "uncompressed" => Some(Self::None),
            "lz4" => Some(Self::Lz4),
            "zstd" | "zstandard" => Some(Self::Zstd),
            "snappy" => Some(Self::Snappy),
            _ => None,
        }
    }

    /// Maps to the codec id used by the Kafka record-batch format
    /// (note: Kafka's ids differ from our wire ids; 1 is gzip, unsupported).
    pub fn kafka_type_id(&self) -> i8 {
        match self {
            Self::None => 0,
            Self::Snappy => 2,
            Self::Lz4 => 3,
            Self::Zstd => 4,
        }
    }

    /// Inverse of `kafka_type_id`; unsupported ids (including gzip = 1)
    /// yield `None`.
    pub fn from_kafka_type_id(id: i8) -> Option<Self> {
        match id {
            0 => Some(Self::None),
            2 => Some(Self::Snappy),
            3 => Some(Self::Lz4),
            4 => Some(Self::Zstd),
            _ => None,
        }
    }

    /// True for every algorithm except `None`.
    pub fn is_compressed(&self) -> bool {
        *self != Self::None
    }
}
impl std::fmt::Display for CompressionAlgorithm {
    /// Displays the canonical lowercase name (e.g. "lz4").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
impl std::str::FromStr for CompressionAlgorithm {
    type Err = CompressionError;

    /// Delegates to `parse`. A bad name has no numeric id, so the error
    /// carries 0 as a placeholder.
    fn from_str(s: &str) -> Result<Self> {
        match Self::parse(s) {
            Some(algorithm) => Ok(algorithm),
            None => Err(CompressionError::UnknownAlgorithm(0)),
        }
    }
}
/// Abstract effort level, translated per-backend by `lz4_acceleration`
/// and `zstd_level`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionLevel {
    /// Favor throughput over ratio.
    Fast,
    #[default]
    Default,
    /// Favor ratio over throughput.
    Best,
    /// Raw backend-specific value; interpretation differs per algorithm.
    Custom(i32),
}
impl CompressionLevel {
    /// LZ4 block-mode acceleration factor (higher = faster, worse ratio).
    ///
    /// `Best` deliberately maps to 1: LZ4 expresses "best" through its HC
    /// mode (see `compress_lz4`), not through acceleration. 65537 is used as
    /// a "maximum speed" factor — NOTE(review): presumably chosen to exceed
    /// LZ4's internal acceleration cap; confirm against the lz4 docs.
    pub fn lz4_acceleration(&self) -> i32 {
        match self {
            Self::Fast => 65537,
            // Identical arms merged; both use the backend default.
            Self::Default | Self::Best => 1,
            Self::Custom(n) => *n,
        }
    }

    /// Zstandard compression level; `Custom` values are clamped to the
    /// valid 1..=22 range.
    pub fn zstd_level(&self) -> i32 {
        match self {
            Self::Fast => 1,
            Self::Default => 3,
            Self::Best => 19,
            // Clamp the value directly rather than the old
            // `n.clamp(&1, &22).to_owned()` reference juggling.
            Self::Custom(n) => (*n).clamp(1, 22),
        }
    }

    /// Human-readable level name for logs/metrics.
    pub fn name(&self) -> &'static str {
        match self {
            Self::Fast => "fast",
            Self::Default => "default",
            Self::Best => "best",
            Self::Custom(_) => "custom",
        }
    }
}
/// Tuning knobs for a `Compressor`.
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Algorithm used when `adaptive` is false (adaptive mode may override).
    pub algorithm: CompressionAlgorithm,
    /// Effort level forwarded to the backend.
    pub level: CompressionLevel,
    /// Payloads smaller than this many bytes are stored uncompressed.
    pub min_size: usize,
    /// If compressed/original exceeds this ratio, keep the data uncompressed.
    pub ratio_threshold: f32,
    /// Pick the algorithm per payload from an entropy estimate.
    pub adaptive: bool,
    /// Append a CRC32 of the original payload to each frame.
    pub checksum: bool,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
algorithm: CompressionAlgorithm::Lz4,
level: CompressionLevel::Default,
min_size: 64, ratio_threshold: 0.95, adaptive: true,
checksum: false, }
}
}
impl CompressionConfig {
pub fn builder() -> CompressionConfigBuilder {
CompressionConfigBuilder::default()
}
pub fn low_latency() -> Self {
Self {
algorithm: CompressionAlgorithm::Lz4,
level: CompressionLevel::Fast,
min_size: 128,
ratio_threshold: 0.90,
adaptive: false,
checksum: false,
}
}
pub fn storage() -> Self {
Self {
algorithm: CompressionAlgorithm::Zstd,
level: CompressionLevel::Default,
min_size: 32,
ratio_threshold: 0.98,
adaptive: true,
checksum: true, }
}
pub fn network() -> Self {
Self {
algorithm: CompressionAlgorithm::Zstd,
level: CompressionLevel::Fast,
min_size: 64,
ratio_threshold: 0.95,
adaptive: true,
checksum: false,
}
}
pub fn kafka_compatible() -> Self {
Self {
algorithm: CompressionAlgorithm::Snappy,
level: CompressionLevel::Default,
min_size: 64,
ratio_threshold: 0.95,
adaptive: false,
checksum: false, }
}
}
/// Fluent builder for `CompressionConfig`; obtain one via
/// `CompressionConfig::builder()`.
#[derive(Debug, Default)]
pub struct CompressionConfigBuilder {
    /// Accumulated configuration, starting from `CompressionConfig::default()`.
    config: CompressionConfig,
}
impl CompressionConfigBuilder {
    /// Shared plumbing: apply one mutation to the pending config.
    fn update(mut self, apply: impl FnOnce(&mut CompressionConfig)) -> Self {
        apply(&mut self.config);
        self
    }

    /// Sets the (non-adaptive) compression algorithm.
    pub fn algorithm(self, algorithm: CompressionAlgorithm) -> Self {
        self.update(|c| c.algorithm = algorithm)
    }

    /// Sets the backend effort level.
    pub fn level(self, level: CompressionLevel) -> Self {
        self.update(|c| c.level = level)
    }

    /// Sets the minimum payload size eligible for compression.
    pub fn min_size(self, size: usize) -> Self {
        self.update(|c| c.min_size = size)
    }

    /// Sets the keep-compressed ratio threshold, clamped to [0.0, 1.0].
    pub fn ratio_threshold(self, threshold: f32) -> Self {
        self.update(|c| c.ratio_threshold = threshold.clamp(0.0, 1.0))
    }

    /// Enables or disables entropy-based algorithm selection.
    pub fn adaptive(self, enabled: bool) -> Self {
        self.update(|c| c.adaptive = enabled)
    }

    /// Enables or disables the per-frame CRC32 checksum.
    pub fn checksum(self, enabled: bool) -> Self {
        self.update(|c| c.checksum = enabled)
    }

    /// Finalizes and returns the configuration.
    pub fn build(self) -> CompressionConfig {
        self.config
    }
}
/// Compresses `data` with LZ4 block mode (no embedded size prefix),
/// mapping the abstract level onto lz4's FAST / HC modes.
fn compress_lz4(data: &[u8], level: CompressionLevel) -> Result<Vec<u8>> {
    use lz4::block::CompressionMode;
    let mode = match level {
        CompressionLevel::Fast => CompressionMode::FAST(65537),
        CompressionLevel::Default => CompressionMode::DEFAULT,
        CompressionLevel::Best => CompressionMode::HIGHCOMPRESSION(9),
        // Positive custom values select acceleration; zero/negative select
        // high-compression depth (negated back to a positive depth).
        CompressionLevel::Custom(n) if n > 0 => CompressionMode::FAST(n),
        CompressionLevel::Custom(n) => CompressionMode::HIGHCOMPRESSION(-n),
    };
    lz4::block::compress(data, Some(mode), false)
        .map_err(|e| CompressionError::Lz4Error(e.to_string()))
}
/// Decompresses an LZ4 block.
///
/// When the frame carried no original size, falls back to a 4x-expansion
/// guess (lz4 errors out if the guess is too small — the caller then sees
/// `Lz4Error`).
///
/// # Errors
/// `BufferTooSmall` if the size would not fit the `i32` the lz4 API takes
/// (the old code cast with `as i32`, which silently wraps for huge sizes);
/// `Lz4Error` for backend failures.
fn decompress_lz4(data: &[u8], original_size: Option<usize>) -> Result<Vec<u8>> {
    // saturating_mul avoids a debug-mode overflow panic on absurd inputs.
    let uncompressed_size = original_size.unwrap_or(data.len().saturating_mul(4));
    if uncompressed_size > i32::MAX as usize {
        return Err(CompressionError::BufferTooSmall {
            needed: uncompressed_size,
            available: i32::MAX as usize,
        });
    }
    lz4::block::decompress(data, Some(uncompressed_size as i32))
        .map_err(|e| CompressionError::Lz4Error(e.to_string()))
}
/// Compresses `data` with Zstandard at the level derived from `level`.
fn compress_zstd(data: &[u8], level: CompressionLevel) -> Result<Vec<u8>> {
    zstd::bulk::compress(data, level.zstd_level())
        .map_err(|e| CompressionError::ZstdError(e.to_string()))
}
/// Decompresses a Zstandard payload.
///
/// Capped at 16 MiB of output as a decompression-bomb guard; larger frames
/// fail with `ZstdError`.
fn decompress_zstd(data: &[u8]) -> Result<Vec<u8>> {
    const MAX_DECOMPRESSED_SIZE: usize = 16 * 1024 * 1024;
    zstd::bulk::decompress(data, MAX_DECOMPRESSED_SIZE)
        .map_err(|e| CompressionError::ZstdError(e.to_string()))
}
/// Compresses `data` with raw (unframed) Snappy.
fn compress_snappy(data: &[u8]) -> Result<Vec<u8>> {
    snap::raw::Encoder::new()
        .compress_vec(data)
        .map_err(|e| CompressionError::SnappyError(e.to_string()))
}
/// Decompresses a raw (unframed) Snappy payload.
fn decompress_snappy(data: &[u8]) -> Result<Vec<u8>> {
    snap::raw::Decoder::new()
        .decompress_vec(data)
        .map_err(|e| CompressionError::SnappyError(e.to_string()))
}
/// CRC32 of `data`, used for the optional frame checksum field.
#[inline]
fn crc32_checksum(data: &[u8]) -> u32 {
    crc32fast::hash(data)
}
/// Frame-level compressor/decompressor with shared statistics.
#[derive(Debug, Clone)]
pub struct Compressor {
    /// Tuning knobs; each clone carries its own copy.
    config: CompressionConfig,
    /// Shared across clones (`Arc`) so statistics aggregate over all of them.
    stats: Arc<CompressionStatsCollector>,
}
impl Compressor {
    /// Creates a compressor with `CompressionConfig::default()` and fresh stats.
    pub fn new() -> Self {
        Self {
            config: CompressionConfig::default(),
            stats: Arc::new(CompressionStatsCollector::new()),
        }
    }

    /// Creates a compressor with an explicit configuration.
    pub fn with_config(config: CompressionConfig) -> Self {
        Self {
            config,
            stats: Arc::new(CompressionStatsCollector::new()),
        }
    }

    /// Read-only access to the active configuration.
    pub fn config(&self) -> &CompressionConfig {
        &self.config
    }

    /// Point-in-time copy of the accumulated statistics.
    pub fn stats(&self) -> CompressionStatsSnapshot {
        self.stats.snapshot()
    }

    /// Compresses `data` into a self-describing frame decodable by `decompress`.
    ///
    /// Payloads smaller than `min_size`, payloads whose selected algorithm is
    /// `None`, and payloads that do not shrink below `ratio_threshold` are
    /// stored uncompressed (flag byte + optional checksum + raw bytes).
    pub fn compress(&self, data: &[u8]) -> Result<Bytes> {
        if data.len() < self.config.min_size {
            self.stats.record_skipped(data.len());
            return Ok(self.encode_uncompressed(data));
        }
        // Adaptive mode picks the algorithm from an entropy estimate.
        let algorithm = if self.config.adaptive {
            self.select_algorithm(data)
        } else {
            self.config.algorithm
        };
        let compressed = match algorithm {
            CompressionAlgorithm::None => {
                self.stats.record_skipped(data.len());
                return Ok(self.encode_uncompressed(data));
            }
            CompressionAlgorithm::Lz4 => compress_lz4(data, self.config.level)?,
            CompressionAlgorithm::Zstd => compress_zstd(data, self.config.level)?,
            CompressionAlgorithm::Snappy => compress_snappy(data)?,
        };
        // Keep the original bytes if compression did not buy enough.
        let ratio = compressed.len() as f32 / data.len() as f32;
        if ratio > self.config.ratio_threshold {
            self.stats.record_skipped(data.len());
            return Ok(self.encode_uncompressed(data));
        }
        self.stats
            .record_compression(algorithm, data.len(), compressed.len());
        self.encode_compressed(algorithm, data.len(), &compressed, data)
    }

    /// Like `compress`, but forces `algorithm` and skips both the adaptive
    /// selection and the compression-ratio check.
    pub fn compress_with(&self, data: &[u8], algorithm: CompressionAlgorithm) -> Result<Bytes> {
        if algorithm == CompressionAlgorithm::None || data.len() < self.config.min_size {
            self.stats.record_skipped(data.len());
            return Ok(self.encode_uncompressed(data));
        }
        let compressed = match algorithm {
            // Handled by the early return above.
            CompressionAlgorithm::None => unreachable!(),
            CompressionAlgorithm::Lz4 => compress_lz4(data, self.config.level)?,
            CompressionAlgorithm::Zstd => compress_zstd(data, self.config.level)?,
            CompressionAlgorithm::Snappy => compress_snappy(data)?,
        };
        self.stats
            .record_compression(algorithm, data.len(), compressed.len());
        self.encode_compressed(algorithm, data.len(), &compressed, data)
    }

    /// Decodes a frame produced by `compress` / `compress_with`.
    ///
    /// Frame layout: 1 flag byte, then (if bit 0x10 set) a little-endian u32
    /// original size, then (if bit 0x20 set) a little-endian CRC32 of the
    /// original payload, then the (possibly compressed) payload.
    ///
    /// # Errors
    /// `InvalidHeader` on truncation, `UnknownAlgorithm` on a bad flag byte,
    /// `ChecksumMismatch` when the stored CRC32 disagrees, plus backend errors.
    pub fn decompress(&self, data: &[u8]) -> Result<Bytes> {
        if data.is_empty() {
            return Err(CompressionError::InvalidHeader);
        }
        let flags = data[0];
        let algorithm = CompressionAlgorithm::from_flags(flags)?;
        let has_size = (flags & 0x10) != 0;
        let has_checksum = (flags & 0x20) != 0;
        let mut offset = 1;
        let original_size = if has_size {
            if data.len() < offset + 4 {
                return Err(CompressionError::InvalidHeader);
            }
            // Length checked above, so try_into cannot fail here.
            let size_bytes: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
            offset += 4;
            Some(u32::from_le_bytes(size_bytes) as usize)
        } else {
            None
        };
        let expected_checksum = if has_checksum {
            if data.len() < offset + 4 {
                return Err(CompressionError::InvalidHeader);
            }
            let checksum_bytes: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
            offset += 4;
            Some(u32::from_le_bytes(checksum_bytes))
        } else {
            None
        };
        let payload = &data[offset..];
        let decompressed = match algorithm {
            CompressionAlgorithm::None => payload.to_vec(),
            CompressionAlgorithm::Lz4 => decompress_lz4(payload, original_size)?,
            CompressionAlgorithm::Zstd => decompress_zstd(payload)?,
            CompressionAlgorithm::Snappy => decompress_snappy(payload)?,
        };
        // The stored CRC32 covers the *original* bytes, so verify after
        // decompression.
        if let Some(expected) = expected_checksum {
            let actual = crc32_checksum(&decompressed);
            if actual != expected {
                return Err(CompressionError::ChecksumMismatch { expected, actual });
            }
        }
        self.stats
            .record_decompression(algorithm, payload.len(), decompressed.len());
        Ok(Bytes::from(decompressed))
    }

    /// Trial-compresses `data` with every backend and reports the sizes,
    /// the entropy estimate, and the algorithm the adaptive heuristic picks.
    pub fn analyze(&self, data: &[u8]) -> CompressionAnalysis {
        let lz4_result = compress_lz4(data, self.config.level);
        let zstd_result = compress_zstd(data, self.config.level);
        let snappy_result = compress_snappy(data);
        CompressionAnalysis {
            original_size: data.len(),
            lz4_size: lz4_result.as_ref().map(|v| v.len()).ok(),
            zstd_size: zstd_result.as_ref().map(|v| v.len()).ok(),
            snappy_size: snappy_result.as_ref().map(|v| v.len()).ok(),
            entropy: estimate_entropy(data),
            recommended: self.select_algorithm(data),
        }
    }

    /// Entropy-based heuristic for adaptive mode:
    /// very high entropy (already compressed/random) → don't compress;
    /// low entropy or large payloads → Zstd (best ratio);
    /// moderate entropy → Snappy; otherwise LZ4.
    fn select_algorithm(&self, data: &[u8]) -> CompressionAlgorithm {
        if data.len() < self.config.min_size {
            return CompressionAlgorithm::None;
        }
        let entropy = estimate_entropy(data);
        if entropy > 7.5 {
            return CompressionAlgorithm::None;
        }
        if entropy < 4.5 || data.len() > 64 * 1024 {
            return CompressionAlgorithm::Zstd;
        }
        if entropy < 6.0 {
            return CompressionAlgorithm::Snappy;
        }
        CompressionAlgorithm::Lz4
    }

    /// Frames `data` verbatim: flag byte (+ optional CRC32), no size field.
    fn encode_uncompressed(&self, data: &[u8]) -> Bytes {
        let has_checksum = self.config.checksum;
        let header_size = 1 + if has_checksum { 4 } else { 0 };
        let mut buf = BytesMut::with_capacity(header_size + data.len());
        buf.put_u8(CompressionAlgorithm::None.to_flags(false, has_checksum));
        if has_checksum {
            buf.put_u32_le(crc32_checksum(data));
        }
        buf.put_slice(data);
        buf.freeze()
    }

    /// Frames a compressed payload: flag byte, original size, optional CRC32
    /// of the *original* bytes, then the compressed payload.
    fn encode_compressed(
        &self,
        algorithm: CompressionAlgorithm,
        original_size: usize,
        compressed: &[u8],
        original_data: &[u8],
    ) -> Result<Bytes> {
        let has_checksum = self.config.checksum;
        let header_size = 5 + if has_checksum { 4 } else { 0 };
        let mut buf = BytesMut::with_capacity(header_size + compressed.len());
        buf.put_u8(algorithm.to_flags(true, has_checksum));
        // NOTE(review): an original_size above u32::MAX would silently
        // truncate here — confirm payloads are bounded upstream.
        buf.put_u32_le(original_size as u32);
        if has_checksum {
            buf.put_u32_le(crc32_checksum(original_data));
        }
        buf.put_slice(compressed);
        Ok(buf.freeze())
    }
}
impl Default for Compressor {
fn default() -> Self {
Self::new()
}
}
/// Lock-free statistics shared by all clones of a `Compressor`.
///
/// All counters are monotonically increasing and updated with relaxed
/// atomics (totals only, no cross-counter consistency guarantees).
#[derive(Debug, Default)]
pub struct CompressionStatsCollector {
    // Per-algorithm totals: compressed bytes out, original bytes in, op count.
    lz4_compressed_bytes: AtomicU64,
    lz4_original_bytes: AtomicU64,
    lz4_operations: AtomicU64,
    zstd_compressed_bytes: AtomicU64,
    zstd_original_bytes: AtomicU64,
    zstd_operations: AtomicU64,
    snappy_compressed_bytes: AtomicU64,
    snappy_original_bytes: AtomicU64,
    snappy_operations: AtomicU64,
    // Decompression totals (not broken down by algorithm).
    decompressed_bytes: AtomicU64,
    decompress_operations: AtomicU64,
    // Payloads deliberately left uncompressed (too small / poor ratio).
    skipped_bytes: AtomicU64,
    skipped_operations: AtomicU64,
}
impl CompressionStatsCollector {
    /// Creates a zeroed collector.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one compression operation to the counters for `algorithm`.
    /// `None` is a no-op — uncompressed writes go through `record_skipped`.
    pub fn record_compression(
        &self,
        algorithm: CompressionAlgorithm,
        original: usize,
        compressed: usize,
    ) {
        // Resolve the (original, compressed, ops) counter triple once, then
        // update all three in one place.
        let counters = match algorithm {
            CompressionAlgorithm::Lz4 => Some((
                &self.lz4_original_bytes,
                &self.lz4_compressed_bytes,
                &self.lz4_operations,
            )),
            CompressionAlgorithm::Zstd => Some((
                &self.zstd_original_bytes,
                &self.zstd_compressed_bytes,
                &self.zstd_operations,
            )),
            CompressionAlgorithm::Snappy => Some((
                &self.snappy_original_bytes,
                &self.snappy_compressed_bytes,
                &self.snappy_operations,
            )),
            CompressionAlgorithm::None => None,
        };
        if let Some((original_total, compressed_total, operations)) = counters {
            original_total.fetch_add(original as u64, Ordering::Relaxed);
            compressed_total.fetch_add(compressed as u64, Ordering::Relaxed);
            operations.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Adds one decompression operation. The algorithm and compressed size
    /// are accepted for interface symmetry but not currently tracked.
    pub fn record_decompression(
        &self,
        _algorithm: CompressionAlgorithm,
        _compressed: usize,
        decompressed: usize,
    ) {
        self.decompressed_bytes
            .fetch_add(decompressed as u64, Ordering::Relaxed);
        self.decompress_operations.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a payload that was deliberately stored uncompressed.
    pub fn record_skipped(&self, size: usize) {
        self.skipped_bytes.fetch_add(size as u64, Ordering::Relaxed);
        self.skipped_operations.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies all counters into a plain snapshot. Loads are relaxed, so the
    /// snapshot is not an atomic cross-counter view under concurrent updates.
    pub fn snapshot(&self) -> CompressionStatsSnapshot {
        CompressionStatsSnapshot {
            lz4_compressed_bytes: self.lz4_compressed_bytes.load(Ordering::Relaxed),
            lz4_original_bytes: self.lz4_original_bytes.load(Ordering::Relaxed),
            lz4_operations: self.lz4_operations.load(Ordering::Relaxed),
            zstd_compressed_bytes: self.zstd_compressed_bytes.load(Ordering::Relaxed),
            zstd_original_bytes: self.zstd_original_bytes.load(Ordering::Relaxed),
            zstd_operations: self.zstd_operations.load(Ordering::Relaxed),
            snappy_compressed_bytes: self.snappy_compressed_bytes.load(Ordering::Relaxed),
            snappy_original_bytes: self.snappy_original_bytes.load(Ordering::Relaxed),
            snappy_operations: self.snappy_operations.load(Ordering::Relaxed),
            decompressed_bytes: self.decompressed_bytes.load(Ordering::Relaxed),
            decompress_operations: self.decompress_operations.load(Ordering::Relaxed),
            skipped_bytes: self.skipped_bytes.load(Ordering::Relaxed),
            skipped_operations: self.skipped_operations.load(Ordering::Relaxed),
        }
    }
}
/// Plain-data, point-in-time view of `CompressionStatsCollector` counters.
#[derive(Debug, Clone, Default)]
pub struct CompressionStatsSnapshot {
    pub lz4_compressed_bytes: u64,
    pub lz4_original_bytes: u64,
    pub lz4_operations: u64,
    pub zstd_compressed_bytes: u64,
    pub zstd_original_bytes: u64,
    pub zstd_operations: u64,
    pub snappy_compressed_bytes: u64,
    pub snappy_original_bytes: u64,
    pub snappy_operations: u64,
    pub decompressed_bytes: u64,
    pub decompress_operations: u64,
    pub skipped_bytes: u64,
    pub skipped_operations: u64,
}

/// compressed/original ratio, or `None` when no bytes went through
/// that algorithm.
fn ratio_or_none(compressed: u64, original: u64) -> Option<f64> {
    if original == 0 {
        None
    } else {
        Some(compressed as f64 / original as f64)
    }
}

impl CompressionStatsSnapshot {
    /// LZ4 compressed/original byte ratio, if any LZ4 work happened.
    pub fn lz4_ratio(&self) -> Option<f64> {
        ratio_or_none(self.lz4_compressed_bytes, self.lz4_original_bytes)
    }

    /// Zstd compressed/original byte ratio, if any Zstd work happened.
    pub fn zstd_ratio(&self) -> Option<f64> {
        ratio_or_none(self.zstd_compressed_bytes, self.zstd_original_bytes)
    }

    /// Snappy compressed/original byte ratio, if any Snappy work happened.
    pub fn snappy_ratio(&self) -> Option<f64> {
        ratio_or_none(self.snappy_compressed_bytes, self.snappy_original_bytes)
    }

    /// Total bytes saved across all algorithms (saturates at zero if
    /// compression ever expanded the data).
    pub fn bytes_saved(&self) -> u64 {
        let original =
            self.lz4_original_bytes + self.zstd_original_bytes + self.snappy_original_bytes;
        let compressed =
            self.lz4_compressed_bytes + self.zstd_compressed_bytes + self.snappy_compressed_bytes;
        original.saturating_sub(compressed)
    }
}
/// Result of trial-compressing one payload with every backend
/// (see `Compressor::analyze`).
#[derive(Debug, Clone)]
pub struct CompressionAnalysis {
    /// Size of the uncompressed input.
    pub original_size: usize,
    /// Trial output sizes; `None` when that backend returned an error.
    pub lz4_size: Option<usize>,
    pub zstd_size: Option<usize>,
    pub snappy_size: Option<usize>,
    /// Shannon entropy estimate of the input (bits per byte, 0.0–8.0).
    pub entropy: f32,
    /// Algorithm the adaptive heuristic would pick for this payload.
    pub recommended: CompressionAlgorithm,
}
impl CompressionAnalysis {
    /// Shared plumbing: trial size as a fraction of the original.
    fn ratio_of(&self, size: Option<usize>) -> Option<f32> {
        size.map(|s| s as f32 / self.original_size as f32)
    }

    /// LZ4 trial ratio, if the trial succeeded.
    pub fn lz4_ratio(&self) -> Option<f32> {
        self.ratio_of(self.lz4_size)
    }

    /// Zstd trial ratio, if the trial succeeded.
    pub fn zstd_ratio(&self) -> Option<f32> {
        self.ratio_of(self.zstd_size)
    }

    /// Snappy trial ratio, if the trial succeeded.
    pub fn snappy_ratio(&self) -> Option<f32> {
        self.ratio_of(self.snappy_size)
    }

    /// Smallest successful trial size across all backends.
    pub fn best_size(&self) -> Option<usize> {
        [self.lz4_size, self.zstd_size, self.snappy_size]
            .into_iter()
            .flatten()
            .min()
    }

    /// Backend with the smallest trial output that actually beat the
    /// original size; `None` (the algorithm) when nothing did.
    pub fn best_algorithm(&self) -> CompressionAlgorithm {
        let candidates = [
            (CompressionAlgorithm::Lz4, self.lz4_size),
            (CompressionAlgorithm::Zstd, self.zstd_size),
            (CompressionAlgorithm::Snappy, self.snappy_size),
        ];
        let mut best = (CompressionAlgorithm::None, self.original_size);
        for (algorithm, trial) in candidates {
            if let Some(size) = trial {
                if size < best.1 {
                    best = (algorithm, size);
                }
            }
        }
        best.0
    }
}
/// Shannon entropy estimate in bits per byte (0.0–8.0), computed over at
/// most the first 4 KiB of `data` as a cheap compressibility proxy.
fn estimate_entropy(data: &[u8]) -> f32 {
    if data.is_empty() {
        return 0.0;
    }
    let sample = &data[..data.len().min(4096)];
    // Byte-value histogram over the sample.
    let mut freq = [0u32; 256];
    for &byte in sample {
        freq[byte as usize] += 1;
    }
    let len = sample.len() as f32;
    // H = -sum(p * log2 p) over byte values that actually occur.
    freq.iter()
        .filter(|&&count| count > 0)
        .map(|&count| {
            let p = count as f32 / len;
            -(p * p.log2())
        })
        .sum()
}
/// Push-style compressing writer; wraps an underlying `Write` sink.
pub struct StreamingCompressor<W: Write> {
    /// Active backend encoder (or passthrough for `None`).
    encoder: StreamingEncoder<W>,
}

/// Backend-specific streaming encoders.
enum StreamingEncoder<W: Write> {
    Lz4(lz4::Encoder<W>),
    Zstd(zstd::Encoder<'static, W>),
    // Boxed — presumably to keep the enum small since the snappy frame
    // encoder carries a large inline buffer; confirm before changing.
    Snappy(Box<snap::write::FrameEncoder<W>>),
    None(W),
}
impl<W: Write> StreamingCompressor<W> {
pub fn new(
writer: W,
algorithm: CompressionAlgorithm,
level: CompressionLevel,
) -> Result<Self> {
let encoder = match algorithm {
CompressionAlgorithm::None => StreamingEncoder::None(writer),
CompressionAlgorithm::Lz4 => {
let encoder = lz4::EncoderBuilder::new()
.level(level.lz4_acceleration().try_into().unwrap_or(4))
.build(writer)
.map_err(|e| CompressionError::Lz4Error(e.to_string()))?;
StreamingEncoder::Lz4(encoder)
}
CompressionAlgorithm::Zstd => {
let encoder = zstd::Encoder::new(writer, level.zstd_level())
.map_err(|e| CompressionError::ZstdError(e.to_string()))?;
StreamingEncoder::Zstd(encoder)
}
CompressionAlgorithm::Snappy => {
let encoder = snap::write::FrameEncoder::new(writer);
StreamingEncoder::Snappy(Box::new(encoder))
}
};
Ok(Self { encoder })
}
pub fn write(&mut self, data: &[u8]) -> Result<usize> {
match &mut self.encoder {
StreamingEncoder::None(w) => Ok(w.write(data)?),
StreamingEncoder::Lz4(e) => Ok(e.write(data)?),
StreamingEncoder::Zstd(e) => Ok(e.write(data)?),
StreamingEncoder::Snappy(e) => Ok(e.write(data)?),
}
}
pub fn finish(self) -> Result<W> {
match self.encoder {
StreamingEncoder::None(w) => Ok(w),
StreamingEncoder::Lz4(e) => {
let (w, result) = e.finish();
result.map_err(|e| CompressionError::Lz4Error(e.to_string()))?;
Ok(w)
}
StreamingEncoder::Zstd(e) => e
.finish()
.map_err(|e| CompressionError::ZstdError(e.to_string())),
StreamingEncoder::Snappy(e) => e
.into_inner()
.map_err(|e| CompressionError::SnappyError(e.to_string())),
}
}
}
/// Pull-style decompressing reader; wraps an underlying `Read` source.
pub struct StreamingDecompressor<R: Read> {
    /// Active backend decoder (or passthrough for `None`).
    decoder: StreamingDecoder<R>,
}

/// Backend-specific streaming decoders.
enum StreamingDecoder<R: Read> {
    Lz4(lz4::Decoder<R>),
    // zstd's decoder requires buffered input, hence the BufReader wrapper.
    Zstd(zstd::Decoder<'static, std::io::BufReader<R>>),
    Snappy(snap::read::FrameDecoder<R>),
    None(R),
}
impl<R: Read> StreamingDecompressor<R> {
    /// Wraps `reader` in a streaming decoder for `algorithm`.
    ///
    /// # Errors
    /// Returns the backend's error if the decoder cannot be constructed
    /// (e.g. a bad stream header).
    pub fn new(reader: R, algorithm: CompressionAlgorithm) -> Result<Self> {
        let decoder = match algorithm {
            CompressionAlgorithm::None => StreamingDecoder::None(reader),
            CompressionAlgorithm::Lz4 => StreamingDecoder::Lz4(
                lz4::Decoder::new(reader)
                    .map_err(|e| CompressionError::Lz4Error(e.to_string()))?,
            ),
            CompressionAlgorithm::Zstd => StreamingDecoder::Zstd(
                zstd::Decoder::new(reader)
                    .map_err(|e| CompressionError::ZstdError(e.to_string()))?,
            ),
            CompressionAlgorithm::Snappy => {
                StreamingDecoder::Snappy(snap::read::FrameDecoder::new(reader))
            }
        };
        Ok(Self { decoder })
    }

    /// Reads decompressed bytes into `buf`. Mirrors `Read::read`: returns
    /// the byte count, with 0 signalling end of stream.
    pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let n = match &mut self.decoder {
            StreamingDecoder::None(r) => r.read(buf)?,
            StreamingDecoder::Lz4(d) => d.read(buf)?,
            StreamingDecoder::Zstd(d) => d.read(buf)?,
            StreamingDecoder::Snappy(d) => d.read(buf)?,
        };
        Ok(n)
    }
}
/// Accumulates length-prefixed messages and compresses them as one frame.
pub struct BatchCompressor {
    compressor: Compressor,
    /// Concatenated `[u32-le length | payload]` records.
    buffer: BytesMut,
    /// Start offset of each message in `buffer`.
    /// NOTE(review): written but never read — decoding relies solely on the
    /// in-band length prefixes; candidate for removal.
    message_offsets: Vec<u32>,
}
impl BatchCompressor {
pub fn new(config: CompressionConfig) -> Self {
Self {
compressor: Compressor::with_config(config),
buffer: BytesMut::with_capacity(64 * 1024),
message_offsets: Vec::with_capacity(100),
}
}
pub fn add(&mut self, data: &[u8]) {
self.message_offsets.push(self.buffer.len() as u32);
self.buffer.put_u32_le(data.len() as u32);
self.buffer.put_slice(data);
}
pub fn finish(self) -> Result<CompressedBatch> {
let message_count = self.message_offsets.len();
let uncompressed_size = self.buffer.len();
let compressed = self.compressor.compress(&self.buffer)?;
Ok(CompressedBatch {
data: compressed,
message_count,
uncompressed_size,
})
}
pub fn reset(&mut self) {
self.buffer.clear();
self.message_offsets.clear();
}
}
/// A compressed frame containing multiple length-prefixed messages,
/// produced by `BatchCompressor::finish`.
#[derive(Debug, Clone)]
pub struct CompressedBatch {
    /// The compressed frame (decodable by `Compressor::decompress`).
    pub data: Bytes,
    /// Number of messages the batch contains.
    pub message_count: usize,
    /// Total size of the length-prefixed records before compression.
    pub uncompressed_size: usize,
}
impl CompressedBatch {
    /// Decompresses the frame and returns an iterator over the messages.
    /// A default-configured `Compressor` suffices: decoding reads the frame
    /// header, not the config.
    pub fn decompress(&self) -> Result<BatchIterator> {
        let data = Compressor::new().decompress(&self.data)?;
        Ok(BatchIterator { data, position: 0 })
    }

    /// Compressed/uncompressed size ratio (lower is better).
    pub fn ratio(&self) -> f32 {
        self.data.len() as f32 / self.uncompressed_size as f32
    }
}
/// Iterates over the length-prefixed messages in a decompressed batch.
pub struct BatchIterator {
    /// Decompressed record stream: repeated `[u32-le length | payload]`.
    data: Bytes,
    /// Byte offset of the next unread record.
    position: usize,
}
impl Iterator for BatchIterator {
    type Item = Bytes;

    /// Yields the next message, or `None` at end of data or on a truncated
    /// record (a bad length prefix simply ends iteration).
    fn next(&mut self) -> Option<Self::Item> {
        let total = self.data.len();
        // Need a complete 4-byte length prefix.
        if self.position + 4 > total {
            return None;
        }
        let header: [u8; 4] = self.data[self.position..self.position + 4].try_into().ok()?;
        self.position += 4;
        let len = u32::from_le_bytes(header) as usize;
        // Need the full payload the prefix promises.
        if self.position + len > total {
            return None;
        }
        let start = self.position;
        self.position += len;
        // `Bytes::slice` is a cheap refcounted view, not a copy.
        Some(self.data.slice(start..start + len))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// LZ4 round-trip: output shrinks and decompresses back exactly.
    #[test]
    fn test_compress_decompress_lz4() {
        let data = b"Hello, World! This is a test of LZ4 compression. ".repeat(100);
        let compressor = Compressor::with_config(CompressionConfig {
            algorithm: CompressionAlgorithm::Lz4,
            adaptive: false,
            ..Default::default()
        });
        let compressed = compressor.compress(&data).unwrap();
        assert!(compressed.len() < data.len());
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(&decompressed[..], &data[..]);
    }

    /// Zstd round-trip: output shrinks and decompresses back exactly.
    #[test]
    fn test_compress_decompress_zstd() {
        let data = b"Hello, World! This is a test of Zstd compression. ".repeat(100);
        let compressor = Compressor::with_config(CompressionConfig {
            algorithm: CompressionAlgorithm::Zstd,
            adaptive: false,
            ..Default::default()
        });
        let compressed = compressor.compress(&data).unwrap();
        assert!(compressed.len() < data.len());
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(&decompressed[..], &data[..]);
    }

    /// Payloads below min_size are framed uncompressed:
    /// 1 flag byte + 4 payload bytes (no checksum in the default config).
    #[test]
    fn test_small_payload_not_compressed() {
        let data = b"tiny";
        let compressor = Compressor::new();
        let compressed = compressor.compress(data).unwrap();
        assert_eq!(compressed.len(), 5);
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(&decompressed[..], &data[..]);
    }

    /// Low-entropy input should steer the adaptive heuristic to Zstd.
    #[test]
    fn test_adaptive_algorithm_selection() {
        let compressor = Compressor::with_config(CompressionConfig {
            adaptive: true,
            ..Default::default()
        });
        let text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
        let algo = compressor.select_algorithm(text.as_bytes());
        assert_eq!(algo, CompressionAlgorithm::Zstd);
    }

    /// Batch round-trip: good ratio, all messages recovered in order.
    #[test]
    fn test_batch_compression() {
        let config = CompressionConfig::default();
        let mut batch = BatchCompressor::new(config);
        for i in 0..100 {
            let msg = format!("Message {} with some content to compress", i);
            batch.add(msg.as_bytes());
        }
        let compressed = batch.finish().unwrap();
        assert!(compressed.ratio() < 0.5);
        let messages: Vec<_> = compressed.decompress().unwrap().collect();
        assert_eq!(messages.len(), 100);
        assert_eq!(&messages[0][..], b"Message 0 with some content to compress");
    }

    /// Entropy estimate: near 0 for constant data, near 8 for uniform bytes.
    #[test]
    fn test_entropy_estimation() {
        let low = b"aaaaaaaaaaaaaaaa";
        assert!(estimate_entropy(low) < 1.0);
        let high: Vec<u8> = (0..=255).collect();
        assert!(estimate_entropy(&high) > 7.0);
    }

    /// A successful compression increments at least one per-algorithm counter.
    #[test]
    fn test_compression_stats() {
        let data = b"Test data for compression statistics analysis ".repeat(50);
        let compressor = Compressor::new();
        let _ = compressor.compress(&data).unwrap();
        let stats = compressor.stats();
        assert!(
            stats.lz4_operations > 0 || stats.zstd_operations > 0 || stats.snappy_operations > 0
        );
    }

    /// Snappy round-trip: output shrinks and decompresses back exactly.
    #[test]
    fn test_compress_decompress_snappy() {
        let data = b"Hello, World! This is a test of Snappy compression. ".repeat(100);
        let compressor = Compressor::with_config(CompressionConfig {
            algorithm: CompressionAlgorithm::Snappy,
            adaptive: false,
            ..Default::default()
        });
        let compressed = compressor.compress(&data).unwrap();
        assert!(compressed.len() < data.len());
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(&decompressed[..], &data[..]);
    }

    /// Analyze succeeds on all backends and finds a size below the original.
    #[test]
    fn test_compression_analysis() {
        let data = b"Test data for compression analysis with multiple algorithms ".repeat(50);
        let compressor = Compressor::new();
        let analysis = compressor.analyze(&data);
        assert!(analysis.lz4_size.is_some());
        assert!(analysis.zstd_size.is_some());
        assert!(analysis.snappy_size.is_some());
        assert!(analysis.best_size().unwrap() < data.len());
    }

    /// Kafka codec ids round-trip through both mapping directions.
    #[test]
    fn test_kafka_type_ids() {
        assert_eq!(CompressionAlgorithm::None.kafka_type_id(), 0);
        assert_eq!(CompressionAlgorithm::Snappy.kafka_type_id(), 2);
        assert_eq!(CompressionAlgorithm::Lz4.kafka_type_id(), 3);
        assert_eq!(CompressionAlgorithm::Zstd.kafka_type_id(), 4);
        assert_eq!(
            CompressionAlgorithm::from_kafka_type_id(0),
            Some(CompressionAlgorithm::None)
        );
        assert_eq!(
            CompressionAlgorithm::from_kafka_type_id(2),
            Some(CompressionAlgorithm::Snappy)
        );
        assert_eq!(
            CompressionAlgorithm::from_kafka_type_id(3),
            Some(CompressionAlgorithm::Lz4)
        );
        assert_eq!(
            CompressionAlgorithm::from_kafka_type_id(4),
            Some(CompressionAlgorithm::Zstd)
        );
    }
}