#![cfg(feature = "benchmarks")]
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use crate::{
storage::sstable::compression::{Compression, CompressionAlgorithm},
Config, Platform, Result,
};
use super::{
utils::{generate_test_data, MemoryMonitor, PrecisionTimer},
BenchmarkResult, PRDTargets,
};
pub struct CompressionBenchmarks {
#[allow(dead_code)]
platform: Arc<Platform>,
#[allow(dead_code)]
config: Config,
}
impl CompressionBenchmarks {
pub async fn new(platform: Arc<Platform>, config: &Config) -> Result<Self> {
Ok(Self {
platform,
config: config.clone(),
})
}
pub async fn run_compression_performance_tests(
&self,
_test_data_dir: &Path,
targets: &PRDTargets,
) -> Result<Vec<BenchmarkResult>> {
let mut results = Vec::new();
println!("🗜️ Starting Cassandra 5+ Compression Benchmarks");
let test_sizes = vec![1.0, 10.0, 50.0, 100.0, 500.0];
let algorithms = vec![
CompressionAlgorithm::Lz4,
CompressionAlgorithm::Snappy,
CompressionAlgorithm::Deflate,
];
for &size_mb in &test_sizes {
println!("\n📏 Testing with {:.1} MB test data", size_mb);
let test_data = generate_test_data(size_mb);
println!(" Generated {} bytes of test data", test_data.len());
for algorithm in &algorithms {
let algo_name = format!("{:?}", algorithm);
println!(" 🔄 Testing {} compression...", algo_name);
let compress_result = self
.benchmark_compression(&test_data, *algorithm, size_mb, targets)
.await?;
results.push(compress_result);
let decompress_result = self
.benchmark_decompression(&test_data, *algorithm, size_mb, targets)
.await?;
results.push(decompress_result);
let roundtrip_result = self
.benchmark_roundtrip(&test_data, *algorithm, size_mb, targets)
.await?;
results.push(roundtrip_result);
}
}
if self.should_run_stress_test() {
println!("\n🏋️ Running large file stress test (1GB)");
let stress_results = self.run_large_file_stress_test(targets).await?;
results.extend(stress_results);
}
Ok(results)
}
async fn benchmark_compression(
&self,
test_data: &[u8],
algorithm: CompressionAlgorithm,
size_mb: f64,
targets: &PRDTargets,
) -> Result<BenchmarkResult> {
let algo_name = format!("{:?}", algorithm);
let benchmark_name = format!("Compression_{}_{}MB", algo_name, size_mb);
let compression = Compression::new(algorithm)?;
let mut memory_monitor = MemoryMonitor::new();
for _ in 0..3 {
let _ = compression.compress(&test_data[..1024.min(test_data.len())]);
}
let timer = PrecisionTimer::start();
memory_monitor.sample();
let compressed_data = compression.compress(test_data)?;
memory_monitor.sample();
let duration = timer.elapsed_duration();
let throughput_mb_per_sec = size_mb / duration.as_secs_f64();
let memory_usage_mb = memory_monitor.peak_usage_mb();
let memory_efficiency = size_mb / memory_usage_mb.max(0.1);
let compression_ratio = compressed_data.len() as f64 / test_data.len() as f64;
let meets_target = throughput_mb_per_sec >= targets.parse_speed_mb_per_sec
&& memory_usage_mb <= targets.memory_limit_mb;
let target_comparison = if meets_target {
format!(
"✅ Meets PRD targets ({:.1} MB/s, {:.1} MB)",
throughput_mb_per_sec, memory_usage_mb
)
} else {
format!(
"❌ Below targets ({:.1} MB/s vs {:.1}, {:.1} MB vs {:.1})",
throughput_mb_per_sec,
targets.parse_speed_mb_per_sec,
memory_usage_mb,
targets.memory_limit_mb
)
};
let mut details = HashMap::new();
details.insert("compression_ratio".to_string(), compression_ratio);
details.insert(
"compressed_size_mb".to_string(),
compressed_data.len() as f64 / 1024.0 / 1024.0,
);
details.insert(
"space_saved_percent".to_string(),
(1.0 - compression_ratio) * 100.0,
);
println!(
" ✅ {} compression: {:.2} MB/s, {:.1} MB memory, {:.1}% compression",
algo_name,
throughput_mb_per_sec,
memory_usage_mb,
(1.0 - compression_ratio) * 100.0
);
Ok(BenchmarkResult {
benchmark_name,
file_size_mb: size_mb,
duration,
throughput_mb_per_sec,
memory_usage_mb,
memory_efficiency,
compression_ratio: Some(compression_ratio),
operations_per_second: 1.0 / duration.as_secs_f64(), meets_prd_target: meets_target,
target_comparison,
details,
})
}
async fn benchmark_decompression(
&self,
test_data: &[u8],
algorithm: CompressionAlgorithm,
size_mb: f64,
targets: &PRDTargets,
) -> Result<BenchmarkResult> {
let algo_name = format!("{:?}", algorithm);
let benchmark_name = format!("Decompression_{}_{}MB", algo_name, size_mb);
let compression = Compression::new(algorithm)?;
let compressed_data = compression.compress(test_data)?;
let mut memory_monitor = MemoryMonitor::new();
for _ in 0..3 {
let sample_size = 1024.min(compressed_data.len());
let sample = &compressed_data[..sample_size];
if sample.len() >= 4 {
let _ = compression.decompress(sample);
}
}
let timer = PrecisionTimer::start();
memory_monitor.sample();
let decompressed_data = compression.decompress(&compressed_data)?;
memory_monitor.sample();
let duration = timer.elapsed_duration();
assert_eq!(
decompressed_data.len(),
test_data.len(),
"Decompressed data size mismatch"
);
assert_eq!(
decompressed_data, test_data,
"Decompressed data content mismatch"
);
let throughput_mb_per_sec = size_mb / duration.as_secs_f64();
let memory_usage_mb = memory_monitor.peak_usage_mb();
let memory_efficiency = size_mb / memory_usage_mb.max(0.1);
let compression_ratio = compressed_data.len() as f64 / test_data.len() as f64;
let decompression_target = targets.parse_speed_mb_per_sec * 1.5; let meets_target = throughput_mb_per_sec >= decompression_target
&& memory_usage_mb <= targets.memory_limit_mb;
let target_comparison = if meets_target {
format!(
"✅ Meets PRD targets ({:.1} MB/s, {:.1} MB)",
throughput_mb_per_sec, memory_usage_mb
)
} else {
format!(
"❌ Below targets ({:.1} MB/s vs {:.1}, {:.1} MB vs {:.1})",
throughput_mb_per_sec,
decompression_target,
memory_usage_mb,
targets.memory_limit_mb
)
};
let mut details = HashMap::new();
details.insert("compression_ratio".to_string(), compression_ratio);
details.insert(
"compressed_size_mb".to_string(),
compressed_data.len() as f64 / 1024.0 / 1024.0,
);
details.insert(
"decompression_speed_ratio".to_string(),
throughput_mb_per_sec / targets.parse_speed_mb_per_sec,
);
println!(
" ✅ {} decompression: {:.2} MB/s, {:.1} MB memory",
algo_name, throughput_mb_per_sec, memory_usage_mb
);
Ok(BenchmarkResult {
benchmark_name,
file_size_mb: size_mb,
duration,
throughput_mb_per_sec,
memory_usage_mb,
memory_efficiency,
compression_ratio: Some(compression_ratio),
operations_per_second: 1.0 / duration.as_secs_f64(),
meets_prd_target: meets_target,
target_comparison,
details,
})
}
async fn benchmark_roundtrip(
&self,
test_data: &[u8],
algorithm: CompressionAlgorithm,
size_mb: f64,
targets: &PRDTargets,
) -> Result<BenchmarkResult> {
let algo_name = format!("{:?}", algorithm);
let benchmark_name = format!("Roundtrip_{}_{}MB", algo_name, size_mb);
let compression = Compression::new(algorithm)?;
let mut memory_monitor = MemoryMonitor::new();
let timer = PrecisionTimer::start();
memory_monitor.sample();
let compressed_data = compression.compress(test_data)?;
memory_monitor.sample();
let decompressed_data = compression.decompress(&compressed_data)?;
memory_monitor.sample();
let duration = timer.elapsed_duration();
assert_eq!(
decompressed_data, test_data,
"Round-trip data integrity check failed"
);
let throughput_mb_per_sec = size_mb / duration.as_secs_f64();
let memory_usage_mb = memory_monitor.peak_usage_mb();
let memory_efficiency = size_mb / memory_usage_mb.max(0.1);
let compression_ratio = compressed_data.len() as f64 / test_data.len() as f64;
let roundtrip_target = targets.parse_speed_mb_per_sec * 0.75;
let meets_target =
throughput_mb_per_sec >= roundtrip_target && memory_usage_mb <= targets.memory_limit_mb;
let target_comparison = if meets_target {
format!(
"✅ Meets PRD targets ({:.1} MB/s, {:.1} MB)",
throughput_mb_per_sec, memory_usage_mb
)
} else {
format!(
"❌ Below targets ({:.1} MB/s vs {:.1}, {:.1} MB vs {:.1})",
throughput_mb_per_sec, roundtrip_target, memory_usage_mb, targets.memory_limit_mb
)
};
let mut details = HashMap::new();
details.insert("compression_ratio".to_string(), compression_ratio);
details.insert(
"space_saved_mb".to_string(),
(test_data.len() - compressed_data.len()) as f64 / 1024.0 / 1024.0,
);
details.insert(
"roundtrip_efficiency".to_string(),
throughput_mb_per_sec / targets.parse_speed_mb_per_sec,
);
println!(
" ✅ {} round-trip: {:.2} MB/s, {:.1} MB memory",
algo_name, throughput_mb_per_sec, memory_usage_mb
);
Ok(BenchmarkResult {
benchmark_name,
file_size_mb: size_mb,
duration,
throughput_mb_per_sec,
memory_usage_mb,
memory_efficiency,
compression_ratio: Some(compression_ratio),
operations_per_second: 1.0 / duration.as_secs_f64(),
meets_prd_target: meets_target,
target_comparison,
details,
})
}
async fn run_large_file_stress_test(
&self,
targets: &PRDTargets,
) -> Result<Vec<BenchmarkResult>> {
let mut results = Vec::new();
let algorithms = vec![
CompressionAlgorithm::Lz4, CompressionAlgorithm::Snappy, ];
for algorithm in algorithms {
let algo_name = format!("{:?}", algorithm);
println!(" 🏋️ Large file stress test: {} with 1GB data", algo_name);
let chunk_size_mb = 50.0; let num_chunks = (1000.0 / chunk_size_mb) as usize;
let benchmark_name = format!("LargeFile_{}_1GB", algo_name);
let compression = Compression::new(algorithm)?;
let mut memory_monitor = MemoryMonitor::new();
let timer = PrecisionTimer::start();
let mut total_original_size = 0;
let mut total_compressed_size = 0;
memory_monitor.sample();
for chunk_idx in 0..num_chunks {
let chunk_data = generate_test_data(chunk_size_mb);
total_original_size += chunk_data.len();
let compressed_chunk = compression.compress(&chunk_data)?;
total_compressed_size += compressed_chunk.len();
let decompressed_chunk = compression.decompress(&compressed_chunk)?;
assert_eq!(decompressed_chunk.len(), chunk_data.len());
memory_monitor.sample();
if chunk_idx % 5 == 0 {
println!(
" Progress: {}/{} chunks processed",
chunk_idx + 1,
num_chunks
);
}
}
let duration = timer.elapsed_duration();
let size_gb = total_original_size as f64 / 1024.0 / 1024.0 / 1024.0;
let throughput_mb_per_sec =
(total_original_size as f64 / 1024.0 / 1024.0) / duration.as_secs_f64();
let memory_usage_mb = memory_monitor.peak_usage_mb();
let compression_ratio = total_compressed_size as f64 / total_original_size as f64;
let large_file_speed_target = targets.parse_speed_mb_per_sec * 0.8; let meets_target = throughput_mb_per_sec >= large_file_speed_target
&& memory_usage_mb <= targets.memory_limit_mb;
let target_comparison = if meets_target {
format!(
"✅ Meets large file targets ({:.1} MB/s, {:.1} MB)",
throughput_mb_per_sec, memory_usage_mb
)
} else {
format!(
"❌ Below large file targets ({:.1} MB/s vs {:.1}, {:.1} MB vs {:.1})",
throughput_mb_per_sec,
large_file_speed_target,
memory_usage_mb,
targets.memory_limit_mb
)
};
let mut details = HashMap::new();
details.insert("compression_ratio".to_string(), compression_ratio);
details.insert("total_size_gb".to_string(), size_gb);
details.insert("chunks_processed".to_string(), num_chunks as f64);
details.insert(
"space_saved_mb".to_string(),
(total_original_size - total_compressed_size) as f64 / 1024.0 / 1024.0,
);
println!(
" ✅ {} 1GB test: {:.2} MB/s, {:.1} MB memory, {:.1}% compression",
algo_name,
throughput_mb_per_sec,
memory_usage_mb,
(1.0 - compression_ratio) * 100.0
);
results.push(BenchmarkResult {
benchmark_name,
file_size_mb: 1024.0, duration,
throughput_mb_per_sec,
memory_usage_mb,
memory_efficiency: 1024.0 / memory_usage_mb.max(0.1),
compression_ratio: Some(compression_ratio),
operations_per_second: num_chunks as f64 / duration.as_secs_f64(),
meets_prd_target: meets_target,
target_comparison,
details,
});
}
Ok(results)
}
fn should_run_stress_test(&self) -> bool {
let available_memory_gb = self.get_available_memory_gb();
available_memory_gb >= 4.0
}
fn get_available_memory_gb(&self) -> f64 {
#[cfg(target_os = "macos")]
{
self.get_available_memory_macos()
}
#[cfg(target_os = "linux")]
{
self.get_available_memory_linux()
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
{
8.0 }
}
#[cfg(target_os = "macos")]
fn get_available_memory_macos(&self) -> f64 {
use std::process::{Command, Stdio};
let output = Command::new("vm_stat").stdout(Stdio::piped()).output();
if let Ok(output) = output {
let _vm_stat = String::from_utf8_lossy(&output.stdout);
8.0 } else {
8.0
}
}
#[cfg(target_os = "linux")]
fn get_available_memory_linux(&self) -> f64 {
use std::fs;
if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
for line in meminfo.lines() {
if line.starts_with("MemAvailable:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(kb) = parts[1].parse::<f64>() {
return kb / 1024.0 / 1024.0; }
}
}
}
}
8.0 }
}
#[cfg(test)]
mod tests {
use super::*;
#[allow(unused_imports)]
use tempfile::TempDir;
#[tokio::test]
async fn test_compression_benchmarks_creation() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = CompressionBenchmarks::new(platform, &config).await;
assert!(benchmarks.is_ok());
}
#[tokio::test]
async fn test_compression_benchmark() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = CompressionBenchmarks::new(platform, &config).await.unwrap();
let test_data = generate_test_data(1.0); let targets = PRDTargets::default();
let result = benchmarks
.benchmark_compression(&test_data, CompressionAlgorithm::Lz4, 1.0, &targets)
.await;
assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.file_size_mb, 1.0);
assert!(result.throughput_mb_per_sec > 0.0);
assert!(result.compression_ratio.is_some());
}
#[tokio::test]
async fn test_decompression_benchmark() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = CompressionBenchmarks::new(platform, &config).await.unwrap();
let test_data = generate_test_data(1.0); let targets = PRDTargets::default();
let result = benchmarks
.benchmark_decompression(&test_data, CompressionAlgorithm::Lz4, 1.0, &targets)
.await;
assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.file_size_mb, 1.0);
assert!(result.throughput_mb_per_sec > 0.0);
}
#[tokio::test]
async fn test_roundtrip_benchmark() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = CompressionBenchmarks::new(platform, &config).await.unwrap();
let test_data = generate_test_data(1.0); let targets = PRDTargets::default();
let result = benchmarks
.benchmark_roundtrip(&test_data, CompressionAlgorithm::Lz4, 1.0, &targets)
.await;
assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.file_size_mb, 1.0);
assert!(result.throughput_mb_per_sec > 0.0);
assert!(result.compression_ratio.is_some());
}
}