use crate::error::{CacheError, Result};
use bytes::Bytes;
use std::collections::HashMap;
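/// Compression codecs the cache can apply; `None` stores data untouched.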
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CompressionCodec {
None,
Lz4,
Zstd,
Snappy,
}
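/// Speed/ratio presets that are mapped onto codec-specific levels (see `to_zstd_level`).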
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
Fast,
Default,
Best,
}
impl CompressionLevel {
pub fn to_zstd_level(&self) -> i32 {
match self {
CompressionLevel::Fast => 1,
CompressionLevel::Default => 3,
CompressionLevel::Best => 19,
}
}
}
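/// Measurements taken for a single (de)compression operation, used to score codecs.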
#[derive(Debug, Clone)]
pub struct CompressionStats {
pub original_size: usize,
pub compressed_size: usize,
pub compression_time_us: u64,
pub decompression_time_us: u64,
pub codec: CompressionCodec,
}
impl CompressionStats {
pub fn compression_ratio(&self) -> f64 {
if self.compressed_size > 0 {
self.original_size as f64 / self.compressed_size as f64
} else {
1.0
}
}
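/// Compression throughput in MB/s, measured against the original (uncompressed) size.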
pub fn compression_throughput_mbps(&self) -> f64 {
if self.compression_time_us > 0 {
let mb = self.original_size as f64 / (1024.0 * 1024.0);
let seconds = self.compression_time_us as f64 / 1_000_000.0;
mb / seconds
} else {
0.0
}
}
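/// Decompression throughput in MB/s, measured against the compressed input size.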
pub fn decompression_throughput_mbps(&self) -> f64 {
if self.decompression_time_us > 0 {
let mb = self.compressed_size as f64 / (1024.0 * 1024.0);
let seconds = self.decompression_time_us as f64 / 1_000_000.0;
mb / seconds
} else {
0.0
}
}
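/// Compression ratio multiplied by compression throughput; higher values rank a codec better in `select_codec`.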
pub fn efficiency_score(&self) -> f64 {
self.compression_ratio() * self.compression_throughput_mbps()
}
}
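/// Coarse classification of cached payloads, used to key performance history and pick heuristics.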
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
Binary,
Text,
Image,
Numerical,
Compressed,
}
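/// Compressor that records per-(codec, data type) statistics and uses them to pick codecs.
///
/// Illustrative usage (a sketch; `payload` stands in for any `&[u8]` supplied by the caller):
///
/// ```ignore
/// let mut compressor = AdaptiveCompressor::new().with_level(CompressionLevel::Fast);
/// let codec = compressor.select_codec(DataType::Text);
/// let packed = compressor.compress(payload, codec, DataType::Text)?;
/// ```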
pub struct AdaptiveCompressor {
performance_history: HashMap<(CompressionCodec, DataType), Vec<CompressionStats>>,
default_level: CompressionLevel,
min_compress_size: usize,
max_history: usize,
}
impl AdaptiveCompressor {
pub fn new() -> Self {
Self {
performance_history: HashMap::new(),
default_level: CompressionLevel::Default,
min_compress_size: 1024,
max_history: 100,
}
}
pub fn with_level(mut self, level: CompressionLevel) -> Self {
self.default_level = level;
self
}
pub fn with_min_size(mut self, size: usize) -> Self {
self.min_compress_size = size;
self
}
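/// Compresses `data` with the requested `codec` and records timing/ratio statistics under `data_type`.
///
/// Payloads smaller than the configured minimum are returned as-is, so callers should store
/// them with `CompressionCodec::None` rather than asking `decompress` to decode raw bytes.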
pub fn compress(
&mut self,
data: &[u8],
codec: CompressionCodec,
data_type: DataType,
) -> Result<Bytes> {
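// Small payloads are not worth compressing: hand back a plain copy.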
if data.len() < self.min_compress_size {
return Ok(Bytes::copy_from_slice(data));
}
let start = std::time::Instant::now();
let compressed = match codec {
CompressionCodec::None => Bytes::copy_from_slice(data),
CompressionCodec::Lz4 => self.compress_lz4(data)?,
CompressionCodec::Zstd => self.compress_zstd(data)?,
CompressionCodec::Snappy => self.compress_snappy(data)?,
};
let compression_time_us = start.elapsed().as_micros() as u64;
let stats = CompressionStats {
original_size: data.len(),
compressed_size: compressed.len(),
compression_time_us,
decompression_time_us: 0,
codec,
};
self.record_stats(data_type, stats);
Ok(compressed)
}
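/// Decompresses `data` that was produced with the given `codec`.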
pub fn decompress(&mut self, data: &[u8], codec: CompressionCodec) -> Result<Bytes> {
let start = std::time::Instant::now();
let decompressed = match codec {
CompressionCodec::None => Bytes::copy_from_slice(data),
CompressionCodec::Lz4 => self.decompress_lz4(data)?,
CompressionCodec::Zstd => self.decompress_zstd(data)?,
CompressionCodec::Snappy => self.decompress_snappy(data)?,
};
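// Decompression time is measured here but not yet fed back into performance_history.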
let _decompression_time_us = start.elapsed().as_micros() as u64;
Ok(decompressed)
}
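/// Picks the codec with the best average efficiency score observed for `data_type`,
/// falling back to a static heuristic when no history has been recorded yet.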
pub fn select_codec(&self, data_type: DataType) -> CompressionCodec {
let mut best_codec = CompressionCodec::Lz4;
let mut best_score = 0.0;
for codec in &[
CompressionCodec::Lz4,
CompressionCodec::Zstd,
CompressionCodec::Snappy,
] {
if let Some(stats_vec) = self.performance_history.get(&(*codec, data_type)) {
if !stats_vec.is_empty() {
let avg_score: f64 =
stats_vec.iter().map(|s| s.efficiency_score()).sum::<f64>()
/ stats_vec.len() as f64;
if avg_score > best_score {
best_score = avg_score;
best_codec = *codec;
}
}
}
}
if best_score == 0.0 {
return self.heuristic_codec(data_type);
}
best_codec
}
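/// Static fallback used until real measurements exist for a data type.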
fn heuristic_codec(&self, data_type: DataType) -> CompressionCodec {
match data_type {
DataType::Binary => CompressionCodec::Lz4,
DataType::Text => CompressionCodec::Zstd,
DataType::Image => CompressionCodec::Lz4,
DataType::Numerical => CompressionCodec::Zstd,
DataType::Compressed => CompressionCodec::None,
}
}
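/// LZ4 block compression does not embed the original length, so the output is framed with a
/// little-endian i32 length prefix that `decompress_lz4` reads back (capping payloads at i32::MAX bytes).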
fn compress_lz4(&self, data: &[u8]) -> Result<Bytes> {
let compressed =
oxiarc_lz4::compress_block(data).map_err(|e| CacheError::Compression(e.to_string()))?;
let orig_size = data.len() as i32;
let mut result = Vec::with_capacity(4 + compressed.len());
result.extend_from_slice(&orig_size.to_le_bytes());
result.extend_from_slice(&compressed);
Ok(Bytes::from(result))
}
fn decompress_lz4(&self, data: &[u8]) -> Result<Bytes> {
if data.len() < 4 {
return Err(CacheError::Decompression("LZ4 data too short".to_string()));
}
let orig_size = i32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
let decompressed = oxiarc_lz4::decompress_block(&data[4..], orig_size)
.map_err(|e| CacheError::Decompression(e.to_string()))?;
Ok(Bytes::from(decompressed))
}
fn compress_zstd(&self, data: &[u8]) -> Result<Bytes> {
let level = self.default_level.to_zstd_level();
oxiarc_zstd::encode_all(data, level)
.map(Bytes::from)
.map_err(|e| CacheError::Compression(e.to_string()))
}
fn decompress_zstd(&self, data: &[u8]) -> Result<Bytes> {
oxiarc_zstd::decode_all(data)
.map(Bytes::from)
.map_err(|e| CacheError::Decompression(e.to_string()))
}
fn compress_snappy(&self, data: &[u8]) -> Result<Bytes> {
Ok(Bytes::from(oxiarc_snappy::compress(data)))
}
fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
oxiarc_snappy::decompress(data)
.map(Bytes::from)
.map_err(|e| CacheError::Decompression(e.to_string()))
}
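/// Appends `stats` to the bounded per-(codec, data type) history, evicting the oldest sample
/// once `max_history` is exceeded.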
fn record_stats(&mut self, data_type: DataType, stats: CompressionStats) {
let key = (stats.codec, data_type);
let history = self.performance_history.entry(key).or_default();
history.push(stats);
if history.len() > self.max_history {
history.remove(0);
}
}
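/// Mean compression ratio observed for a (codec, data type) pair, if any samples exist.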
pub fn avg_compression_ratio(
&self,
codec: CompressionCodec,
data_type: DataType,
) -> Option<f64> {
self.performance_history
.get(&(codec, data_type))
.and_then(|stats_vec| {
if stats_vec.is_empty() {
None
} else {
let avg = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
/ stats_vec.len() as f64;
Some(avg)
}
})
}
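/// Aggregates the recorded history into per-(codec, data type) averages.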
pub fn get_performance_stats(
&self,
) -> HashMap<(CompressionCodec, DataType), PerformanceMetrics> {
let mut result = HashMap::new();
for (key, stats_vec) in &self.performance_history {
if stats_vec.is_empty() {
continue;
}
let avg_ratio = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
/ stats_vec.len() as f64;
let avg_comp_throughput = stats_vec
.iter()
.map(|s| s.compression_throughput_mbps())
.sum::<f64>()
/ stats_vec.len() as f64;
// Only samples that actually recorded a decompression time contribute to the average.
let decomp_samples: Vec<f64> = stats_vec
.iter()
.filter(|s| s.decompression_time_us > 0)
.map(|s| s.decompression_throughput_mbps())
.collect();
let avg_decomp_throughput = if decomp_samples.is_empty() {
0.0
} else {
decomp_samples.iter().sum::<f64>() / decomp_samples.len() as f64
};
result.insert(
*key,
PerformanceMetrics {
avg_compression_ratio: avg_ratio,
avg_compression_throughput_mbps: avg_comp_throughput,
avg_decompression_throughput_mbps: avg_decomp_throughput,
sample_count: stats_vec.len(),
},
);
}
result
}
pub fn clear_history(&mut self) {
self.performance_history.clear();
}
}
impl Default for AdaptiveCompressor {
fn default() -> Self {
Self::new()
}
}
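/// Averages derived from the recorded `CompressionStats` for one (codec, data type) pair.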
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
pub avg_compression_ratio: f64,
pub avg_compression_throughput_mbps: f64,
pub avg_decompression_throughput_mbps: f64,
pub sample_count: usize,
}
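/// A compressed payload bundled with the metadata needed to decompress and validate it.
///
/// Illustrative round trip (a sketch; `compressor`, `bytes`, and `original` come from the caller):
///
/// ```ignore
/// let packed = CompressedData::new(bytes.to_vec(), CompressionCodec::Zstd, original.len());
/// let restored = packed.decompress(&mut compressor)?;
/// assert_eq!(restored.len(), original.len());
/// ```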
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedData {
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
pub codec: CompressionCodec,
pub original_size: usize,
}
impl CompressedData {
pub fn new(data: Vec<u8>, codec: CompressionCodec, original_size: usize) -> Self {
Self {
data,
codec,
original_size,
}
}
pub fn decompress(&self, compressor: &mut AdaptiveCompressor) -> Result<Bytes> {
let decompressed = compressor.decompress(&self.data, self.codec)?;
if decompressed.len() != self.original_size {
return Err(CacheError::Decompression(format!(
"Size mismatch: expected {}, got {}",
self.original_size,
decompressed.len()
)));
}
Ok(decompressed)
}
pub fn compressed_size(&self) -> usize {
self.data.len()
}
pub fn compression_ratio(&self) -> f64 {
if !self.data.is_empty() {
self.original_size as f64 / self.data.len() as f64
} else {
1.0
}
}
}
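// Local helper named after the `serde_bytes` crate; it currently just delegates to the default
// `Vec<u8>` (de)serialization and leaves room to swap in a more compact encoding later.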
mod serde_bytes {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
bytes.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
Vec::<u8>::deserialize(deserializer)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lz4_compression() {
let mut compressor = AdaptiveCompressor::new();
let data = b"Hello, World! ".repeat(100);
let compressed = compressor
.compress(&data, CompressionCodec::Lz4, DataType::Text)
.expect("compression failed");
assert!(compressed.len() < data.len());
let decompressed = compressor
.decompress(&compressed, CompressionCodec::Lz4)
.expect("decompression failed");
assert_eq!(decompressed.as_ref(), &data[..]);
}
#[test]
fn test_zstd_compression() {
let mut compressor = AdaptiveCompressor::new();
let data = b"Test data for compression ".repeat(50);
let compressed = compressor
.compress(&data, CompressionCodec::Zstd, DataType::Text)
.expect("compression failed");
assert!(compressed.len() < data.len());
let decompressed = compressor
.decompress(&compressed, CompressionCodec::Zstd)
.expect("decompression failed");
assert_eq!(decompressed.as_ref(), &data[..]);
}
#[test]
fn test_snappy_compression() {
let mut compressor = AdaptiveCompressor::new();
let data = b"Snappy compression test ".repeat(50);
let compressed = compressor
.compress(&data, CompressionCodec::Snappy, DataType::Binary)
.expect("compression failed");
assert!(compressed.len() < data.len());
let decompressed = compressor
.decompress(&compressed, CompressionCodec::Snappy)
.expect("decompression failed");
assert_eq!(decompressed.as_ref(), &data[..]);
}
#[test]
fn test_codec_selection() {
let compressor = AdaptiveCompressor::new();
assert_eq!(
compressor.select_codec(DataType::Text),
CompressionCodec::Zstd
);
assert_eq!(
compressor.select_codec(DataType::Binary),
CompressionCodec::Lz4
);
assert_eq!(
compressor.select_codec(DataType::Compressed),
CompressionCodec::None
);
}
#[test]
fn test_min_compress_size() {
let mut compressor = AdaptiveCompressor::new().with_min_size(1000);
let small_data = b"small";
let result = compressor
.compress(small_data, CompressionCodec::Lz4, DataType::Binary)
.expect("compression failed");
assert_eq!(result.len(), small_data.len());
}
#[test]
fn test_compression_stats() {
let stats = CompressionStats {
original_size: 1000,
compressed_size: 500,
compression_time_us: 1000,
decompression_time_us: 500,
codec: CompressionCodec::Lz4,
};
approx::assert_relative_eq!(stats.compression_ratio(), 2.0, epsilon = 0.01);
assert!(stats.compression_throughput_mbps() > 0.0);
assert!(stats.decompression_throughput_mbps() > 0.0);
}
#[test]
fn test_compressed_data() {
let mut compressor = AdaptiveCompressor::new();
let original = b"Test data for compression ratio validation! ".repeat(100);
let compressed_bytes = compressor
.compress(&original, CompressionCodec::Zstd, DataType::Binary)
.expect("compression failed");
let compressed_data = CompressedData::new(
compressed_bytes.to_vec(),
CompressionCodec::Zstd,
original.len(),
);
assert!(compressed_data.compression_ratio() > 1.0);
let decompressed = compressed_data
.decompress(&mut compressor)
.expect("decompression failed");
assert_eq!(decompressed.as_ref(), &original[..]);
}
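// Sketch of how recorded statistics become queryable; exact ratios depend on the codec backends.
#[test]
fn test_history_tracking() {
let mut compressor = AdaptiveCompressor::new();
let data = b"History tracking sample text ".repeat(100);
compressor
.compress(&data, CompressionCodec::Zstd, DataType::Text)
.expect("compression failed");
let ratio = compressor
.avg_compression_ratio(CompressionCodec::Zstd, DataType::Text)
.expect("no history recorded");
assert!(ratio >= 1.0);
let metrics = compressor.get_performance_stats();
assert_eq!(
metrics[&(CompressionCodec::Zstd, DataType::Text)].sample_count,
1
);
compressor.clear_history();
assert!(compressor
.avg_compression_ratio(CompressionCodec::Zstd, DataType::Text)
.is_none());
}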
}