use crate::error::{CacheError, CacheResult};
use crate::config::L2Config;
use bytes::Bytes;
use lz4::{Decoder, EncoderBuilder};
use std::io::{Read, Write};
use std::sync::Arc;
/// LZ4 compressor whose behavior (enable flag, size window, level) is read
/// from an [`L2Config`].
///
/// The configuration is held behind an `Arc`, so cloning a `Compressor`
/// shares the same configuration instead of copying it.
#[derive(Debug, Clone)]
pub struct Compressor {
// Immutable L2 settings; only the LZ4-related fields are read by `Compressor`'s methods.
l2_config: Arc<L2Config>,
}
/// Outcome of [`Compressor::compress`].
#[derive(Debug, Clone)]
pub struct CompressionResult {
/// Bytes to store: the LZ4 frame when `is_compressed` is true, otherwise a copy of the input.
pub compressed_data: Bytes,
/// Length of the input passed to `compress`.
pub original_size: usize,
/// Length of `compressed_data` (equals `original_size` when not compressed).
pub compressed_size: usize,
/// `compressed_size / original_size`; `1.0` when the data was passed through.
pub compression_ratio: f64,
/// Whether `compressed_data` must be LZ4-decoded before use.
pub is_compressed: bool,
}
/// Outcome of [`Compressor::decompress`].
#[derive(Debug, Clone)]
pub struct DecompressionResult {
/// The restored (or passed-through) bytes.
pub data: Bytes,
/// Length of `data`.
pub size: usize,
}
impl Compressor {
    /// Compressed output is kept only when `compressed_size / original_size`
    /// is strictly below this ratio; otherwise the original bytes are stored.
    const MAX_KEPT_COMPRESSION_RATIO: f64 = 0.8;

    /// Builds a compressor that follows the LZ4 settings in `l2_config`.
    ///
    /// The configuration is cloned, so later changes to the caller's value do
    /// not affect this compressor.
    pub fn new_from_l2_config(l2_config: &L2Config) -> Self {
        Self {
            l2_config: Arc::new(l2_config.clone()),
        }
    }

    /// Builds a compressor that never compresses (`enable_lz4 == false`).
    ///
    /// All other settings are zeroed or set to inert values; of these, the
    /// methods on `Compressor` only read the LZ4-related fields.
    pub fn new_disabled() -> Self {
        let disabled_config = L2Config {
            enable_l2_cache: false,
            data_dir: None,
            clear_on_startup: false,
            max_disk_size: 0,
            write_buffer_size: 0,
            max_write_buffer_number: 0,
            block_cache_size: 0,
            background_threads: 0,
            enable_lz4: false,
            compression_threshold: 0,
            compression_max_threshold: 0,
            compression_level: 1,
            cache_size_mb: 0,
            max_file_size_mb: 0,
            smart_flush_enabled: false,
            smart_flush_base_interval_ms: 0,
            smart_flush_min_interval_ms: 0,
            smart_flush_max_interval_ms: 0,
            smart_flush_write_rate_threshold: 0,
            smart_flush_accumulated_bytes_threshold: 0,
            cache_warmup_strategy: crate::config::CacheWarmupStrategy::None,
            zstd_compression_level: None,
            l2_write_strategy: "never".to_string(),
            l2_write_threshold: 0,
            l2_write_ttl_threshold: 0,
        };
        Self {
            l2_config: Arc::new(disabled_config),
        }
    }

    /// Compresses `data` with LZ4 when the configuration allows it and it pays off.
    ///
    /// The input is passed through unchanged (`is_compressed == false`,
    /// `compression_ratio == 1.0`) when LZ4 is disabled, when `data.len()` is
    /// outside `[compression_threshold, compression_max_threshold]`, or when
    /// the compressed form is not below `MAX_KEPT_COMPRESSION_RATIO` of the input.
    ///
    /// # Errors
    /// Returns a compression error if the LZ4 encoder cannot be built or fails.
    pub fn compress(&self, data: &[u8]) -> CacheResult<CompressionResult> {
        if !self.should_compress(data) {
            return Ok(Self::passthrough(data));
        }

        let compressed = self.compress_lz4(data)?;
        let original_size = data.len();
        let compressed_size = compressed.len();
        let compression_ratio = compressed_size as f64 / original_size as f64;

        // Written as `<` so that a NaN ratio (0 / 0) also falls through to the
        // passthrough branch instead of being reported as compressed.
        if compression_ratio < Self::MAX_KEPT_COMPRESSION_RATIO {
            Ok(CompressionResult {
                compressed_data: Bytes::from(compressed),
                original_size,
                compressed_size,
                compression_ratio,
                is_compressed: true,
            })
        } else {
            Ok(Self::passthrough(data))
        }
    }

    /// Restores data previously produced by [`Compressor::compress`].
    ///
    /// When `is_compressed` is false the bytes are copied as-is; otherwise they
    /// are decoded as an LZ4 frame. This does not consult `enable_lz4`, so data
    /// written while compression was enabled stays readable after disabling it.
    ///
    /// # Errors
    /// Returns a compression error if the LZ4 decoder cannot be built or the
    /// input is not a valid LZ4 stream.
    pub fn decompress(&self, compressed_data: &[u8], is_compressed: bool) -> CacheResult<DecompressionResult> {
        if !is_compressed {
            return Ok(DecompressionResult {
                data: Bytes::copy_from_slice(compressed_data),
                size: compressed_data.len(),
            });
        }
        let decompressed_data = self.decompress_lz4(compressed_data)?;
        let size = decompressed_data.len();
        Ok(DecompressionResult {
            data: Bytes::from(decompressed_data),
            size,
        })
    }

    /// Builds the result used when `data` is stored uncompressed (a copy, ratio 1.0).
    fn passthrough(data: &[u8]) -> CompressionResult {
        CompressionResult {
            compressed_data: Bytes::copy_from_slice(data),
            original_size: data.len(),
            compressed_size: data.len(),
            compression_ratio: 1.0,
            is_compressed: false,
        }
    }

    /// Whether `compress` should attempt LZ4 on `data` at all.
    fn should_compress(&self, data: &[u8]) -> bool {
        self.is_compression_candidate(data.len())
    }

    /// Shared gate for `compress` and `estimate_compressed_size`: LZ4 must be
    /// enabled and `size` must lie in `[compression_threshold, compression_max_threshold]`.
    fn is_compression_candidate(&self, size: usize) -> bool {
        let cfg = &self.l2_config;
        cfg.enable_lz4
            && (cfg.compression_threshold..=cfg.compression_max_threshold).contains(&size)
    }

    /// Heuristic pre-check: counts how many of the 256 possible byte values
    /// occur in the first (up to) 256 bytes and accepts a 10%..=80% spread.
    /// Inputs shorter than 64 bytes are rejected.
    // Not currently called by `compress`; kept as an optional heuristic, so
    // silence the unused-method warning rather than delete it.
    #[allow(dead_code)]
    fn estimate_compressibility(&self, data: &[u8]) -> bool {
        if data.len() < 64 {
            return false;
        }
        let sample = &data[..data.len().min(256)];
        let mut seen = [false; 256];
        for &byte in sample {
            seen[byte as usize] = true;
        }
        let unique_bytes = seen.iter().filter(|&&present| present).count();
        let uniqueness_ratio = unique_bytes as f64 / 256.0;
        (0.1..=0.8).contains(&uniqueness_ratio)
    }

    /// Encodes `data` as a complete LZ4 frame at the configured level.
    fn compress_lz4(&self, data: &[u8]) -> CacheResult<Vec<u8>> {
        let mut encoder = EncoderBuilder::new()
            .level(self.l2_config.compression_level as u32)
            .build(Vec::new())
            .map_err(|e| CacheError::compression_error(&format!("创建 LZ4 编码器失败: {}", e)))?;
        encoder
            .write_all(data)
            .map_err(|e| CacheError::compression_error(&format!("LZ4 压缩写入失败: {}", e)))?;
        // `finish` returns the writer even on error, so the result must be checked separately.
        let (compressed_data, result) = encoder.finish();
        result.map_err(|e| CacheError::compression_error(&format!("LZ4 压缩完成失败: {}", e)))?;
        Ok(compressed_data)
    }

    /// Decodes an LZ4 frame into a freshly allocated buffer.
    fn decompress_lz4(&self, compressed_data: &[u8]) -> CacheResult<Vec<u8>> {
        let mut decoder = Decoder::new(compressed_data)
            .map_err(|e| CacheError::compression_error(&format!("创建 LZ4 解码器失败: {}", e)))?;
        let mut decompressed_data = Vec::new();
        decoder
            .read_to_end(&mut decompressed_data)
            .map_err(|e| CacheError::compression_error(&format!("LZ4 解压缩失败: {}", e)))?;
        Ok(decompressed_data)
    }

    /// The configuration this compressor was built with.
    pub fn config(&self) -> &L2Config {
        &self.l2_config
    }

    /// Rough estimate of the stored size for an input of `original_size` bytes.
    ///
    /// Returns `original_size` unchanged whenever `compress` would not attempt
    /// compression (LZ4 disabled or size outside the configured window).
    /// Otherwise it applies a fixed per-level ratio; this is a heuristic, not a
    /// measurement.
    pub fn estimate_compressed_size(&self, original_size: usize) -> usize {
        // Use the same gate as `compress`, so disabled or out-of-window inputs
        // are never predicted to shrink.
        if !self.is_compression_candidate(original_size) {
            return original_size;
        }
        let estimated_ratio = match self.l2_config.compression_level {
            1..=3 => 0.7,
            4..=6 => 0.6,
            7..=9 => 0.5,
            10..=12 => 0.4,
            _ => 0.6,
        };
        (original_size as f64 * estimated_ratio) as usize
    }
}
/// Running counters for compression / decompression activity.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Number of results passed to `record_compression` (compressed or skipped).
    pub total_compressions: u64,
    /// Number of `record_decompression` calls.
    pub total_decompressions: u64,
    /// Sum of `original_size` over recorded compressions.
    pub total_original_bytes: u64,
    /// Sum of bytes actually stored (original size for skipped results).
    pub total_compressed_bytes: u64,
    /// Recorded results that were passed through uncompressed.
    pub skipped_compressions: u64,
    /// Number of `record_compression_failure` calls.
    pub compression_failures: u64,
    /// Number of `record_decompression_failure` calls.
    pub decompression_failures: u64,
}

impl CompressionStats {
    /// Creates an all-zero statistics record (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one `CompressionResult`.
    ///
    /// Passed-through results (`is_compressed == false`) count as skipped and
    /// contribute their original size to `total_compressed_bytes`, so the
    /// overall ratio reflects what was actually stored.
    pub fn record_compression(&mut self, result: &CompressionResult) {
        self.total_compressions += 1;
        self.total_original_bytes += result.original_size as u64;
        if result.is_compressed {
            self.total_compressed_bytes += result.compressed_size as u64;
        } else {
            self.skipped_compressions += 1;
            self.total_compressed_bytes += result.original_size as u64;
        }
    }

    /// Counts one decompression; the size argument is currently ignored.
    pub fn record_decompression(&mut self, _size: usize) {
        self.total_decompressions += 1;
    }

    /// Counts one failed compression (does not touch `total_compressions`).
    pub fn record_compression_failure(&mut self) {
        self.compression_failures += 1;
    }

    /// Counts one failed decompression (does not touch `total_decompressions`).
    pub fn record_decompression_failure(&mut self) {
        self.decompression_failures += 1;
    }

    /// `total_compressed_bytes / total_original_bytes`, or `1.0` when nothing
    /// has been recorded.
    pub fn overall_compression_ratio(&self) -> f64 {
        if self.total_original_bytes == 0 {
            return 1.0;
        }
        self.total_compressed_bytes as f64 / self.total_original_bytes as f64
    }

    /// Bytes saved by compression overall (never negative).
    pub fn bytes_saved(&self) -> u64 {
        self.total_original_bytes
            .saturating_sub(self.total_compressed_bytes)
    }

    /// `(total_compressions - compression_failures) / total_compressions`,
    /// clamped at `0.0`; `0.0` when nothing has been recorded.
    ///
    /// NOTE(review): the failure recorders do not bump `total_compressions`,
    /// so failures can exceed the total. The subtraction saturates instead of
    /// overflowing, which previously panicked in debug builds.
    pub fn compression_success_rate(&self) -> f64 {
        Self::success_rate(self.total_compressions, self.compression_failures)
    }

    /// Same as `compression_success_rate`, for decompressions.
    pub fn decompression_success_rate(&self) -> f64 {
        Self::success_rate(self.total_decompressions, self.decompression_failures)
    }

    /// Clears every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::default();
    }

    /// Shared success-rate formula; saturates so `failures > total` yields 0.0.
    fn success_rate(total: u64, failures: u64) -> f64 {
        if total == 0 {
            return 0.0;
        }
        total.saturating_sub(failures) as f64 / total as f64
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::L2Config;

    /// Compressor with LZ4 enabled at level 4 for inputs of 100 bytes up to 1 MiB.
    fn create_test_compressor() -> Compressor {
        let config = L2Config {
            enable_l2_cache: true,
            data_dir: None,
            clear_on_startup: false,
            max_disk_size: 1024 * 1024 * 1024,
            write_buffer_size: 64 * 1024 * 1024,
            max_write_buffer_number: 3,
            block_cache_size: 32 * 1024 * 1024,
            background_threads: 2,
            enable_lz4: true,
            compression_threshold: 100,
            compression_max_threshold: 1024 * 1024,
            compression_level: 4,
            cache_size_mb: 512,
            max_file_size_mb: 1024,
            smart_flush_enabled: true,
            smart_flush_base_interval_ms: 100,
            smart_flush_min_interval_ms: 20,
            smart_flush_max_interval_ms: 500,
            smart_flush_write_rate_threshold: 10000,
            smart_flush_accumulated_bytes_threshold: 4 * 1024 * 1024,
            cache_warmup_strategy: crate::config::CacheWarmupStrategy::None,
            zstd_compression_level: None,
            l2_write_strategy: "write_through".to_string(),
            l2_write_threshold: 1024,
            l2_write_ttl_threshold: 300,
        };
        Compressor::new_from_l2_config(&config)
    }

    /// Inputs below `compression_threshold` are stored verbatim.
    #[test]
    fn test_compress_small_data() {
        let compressor = create_test_compressor();
        let data = b"small";
        let result = compressor.compress(data).unwrap();
        assert!(!result.is_compressed);
        assert_eq!(result.compressed_data.as_ref(), data);
    }

    /// Highly repetitive input above the threshold is actually compressed.
    #[test]
    fn test_compress_large_data() {
        let compressor = create_test_compressor();
        let data = b"Hello, World! This is a test string that should be compressed.".repeat(20);
        let result = compressor.compress(&data).unwrap();
        assert!(result.is_compressed);
        assert!(result.compressed_size < result.original_size);
    }

    /// Compressing then decompressing restores the original bytes, and the
    /// LZ4 path (not the passthrough path) is the one being exercised.
    #[test]
    fn test_compress_decompress_roundtrip() {
        let compressor = create_test_compressor();
        let original_data = b"Hello, World! This is a test string that should be compressed.".repeat(10);
        let compress_result = compressor.compress(&original_data).unwrap();
        assert!(compress_result.is_compressed);
        let decompress_result = compressor
            .decompress(&compress_result.compressed_data, compress_result.is_compressed)
            .unwrap();
        assert_eq!(decompress_result.data.as_ref(), original_data.as_slice());
        assert_eq!(decompress_result.size, original_data.len());
    }

    /// A disabled compressor never compresses, regardless of input size.
    #[test]
    fn test_disabled_compressor_passes_through() {
        let compressor = Compressor::new_disabled();
        let data = b"Hello, World! This is a test string that should be compressed.".repeat(20);
        let result = compressor.compress(&data).unwrap();
        assert!(!result.is_compressed);
        assert_eq!(result.compressed_data.as_ref(), data.as_slice());
        assert_eq!(result.compression_ratio, 1.0);
    }

    /// Stats accumulate sizes and derive ratio / savings from them.
    #[test]
    fn test_compression_stats() {
        let mut stats = CompressionStats::new();
        let result = CompressionResult {
            compressed_data: Bytes::from(vec![1, 2, 3]),
            original_size: 100,
            compressed_size: 50,
            compression_ratio: 0.5,
            is_compressed: true,
        };
        stats.record_compression(&result);
        assert_eq!(stats.total_compressions, 1);
        assert_eq!(stats.total_original_bytes, 100);
        assert_eq!(stats.total_compressed_bytes, 50);
        assert_eq!(stats.overall_compression_ratio(), 0.5);
        assert_eq!(stats.bytes_saved(), 50);
    }
}