#![cfg(feature = "benchmarks")]
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use crate::{Config, Platform, Result};
use super::{
utils::{generate_test_data, MemoryMonitor, PrecisionTimer},
BenchmarkResult, PRDTargets,
};
/// Memory-usage benchmark suite.
///
/// Drives the access-pattern benchmarks, the aggregate efficiency analysis and
/// the leak-detection run implemented on this type.
pub struct MemoryBenchmarks {
// Platform handle supplied to `new`; the methods visible in this file never
// read it, hence the `dead_code` allowance.
#[allow(dead_code)]
platform: Arc<Platform>,
// Private copy of the configuration cloned in `new`; likewise not read by
// the methods visible in this file.
#[allow(dead_code)]
config: Config,
}
impl MemoryBenchmarks {
/// Builds a benchmark suite from a shared platform handle and a configuration.
///
/// The configuration is cloned so the suite owns its own copy. The body never
/// fails; the `Result` return type is part of the existing interface.
pub async fn new(platform: Arc<Platform>, config: &Config) -> Result<Self> {
    let config = config.clone();
    Ok(Self { platform, config })
}
pub async fn run_comprehensive_memory_tests(
&self,
_test_data_dir: &Path,
targets: &PRDTargets,
) -> Result<Vec<BenchmarkResult>> {
let mut results = Vec::new();
println!("💾 Starting Cassandra 5+ Memory Usage Benchmarks");
let test_sizes_mb = vec![
1.0, 10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, ];
println!(
"📊 Testing memory usage across file sizes: {:?} MB",
test_sizes_mb
);
for &size_mb in &test_sizes_mb {
println!("\n📏 Memory benchmark for {:.0} MB file", size_mb);
if size_mb >= 500.0 && !self.has_sufficient_memory(size_mb) {
println!(
" ⏭️ Skipping {:.0} MB test due to insufficient system memory",
size_mb
);
continue;
}
let memory_tests = vec![
("Sequential", MemoryAccessPattern::Sequential),
("Random", MemoryAccessPattern::Random),
("Streaming", MemoryAccessPattern::Streaming),
("Chunked", MemoryAccessPattern::Chunked),
];
for (pattern_name, pattern) in memory_tests {
let result = self
.benchmark_memory_usage(size_mb, pattern, pattern_name, targets)
.await?;
results.push(result);
}
}
let efficiency_result = self.analyze_memory_efficiency(&results, targets).await?;
results.push(efficiency_result);
let leak_result = self.detect_memory_leaks(targets).await?;
results.push(leak_result);
Ok(results)
}
/// Benchmarks one access pattern over freshly generated test data of `size_mb`.
///
/// Peak and average memory are taken from a `MemoryMonitor` sampled during the
/// run; throughput and operations per second are derived from the elapsed time.
/// The result meets the target only when peak memory is within
/// `targets.memory_limit_mb` and throughput reaches at least half of
/// `targets.parse_speed_mb_per_sec`.
///
/// `target_comparison` names the criterion that actually failed: the memory
/// limit when it is exceeded, otherwise the throughput requirement.
///
/// Errors from the access-pattern routine are propagated.
async fn benchmark_memory_usage(
    &self,
    size_mb: f64,
    pattern: MemoryAccessPattern,
    pattern_name: &str,
    targets: &PRDTargets,
) -> Result<BenchmarkResult> {
    let benchmark_name = format!("Memory_{}_{:.0}MB", pattern_name, size_mb);
    let test_data = generate_test_data(size_mb);
    let mut memory_monitor = MemoryMonitor::new();
    // Keep the optimizer from treating the generated data as unused.
    // NOTE(review): the reason this is skipped on MSVC is not visible here.
    #[cfg(not(target_env = "msvc"))]
    {
        std::hint::black_box(&test_data);
    }
    let timer = PrecisionTimer::start();
    // Baseline sample before any access happens.
    memory_monitor.sample();
    let operations_count = match pattern {
        MemoryAccessPattern::Sequential => {
            self.sequential_access(&test_data, &mut memory_monitor)
                .await?
        }
        MemoryAccessPattern::Random => {
            self.random_access(&test_data, &mut memory_monitor).await?
        }
        MemoryAccessPattern::Streaming => {
            self.streaming_access(&test_data, &mut memory_monitor)
                .await?
        }
        MemoryAccessPattern::Chunked => {
            self.chunked_access(&test_data, &mut memory_monitor).await?
        }
    };
    let duration = timer.elapsed_duration();
    let memory_usage_mb = memory_monitor.peak_usage_mb();
    let avg_memory_mb = memory_monitor.average_usage_mb();
    let throughput_mb_per_sec = size_mb / duration.as_secs_f64();
    // Clamp the divisor so a near-zero peak reading cannot blow up the ratio.
    let memory_efficiency = size_mb / memory_usage_mb.max(0.1);
    let operations_per_second = operations_count as f64 / duration.as_secs_f64();

    let required_throughput = targets.parse_speed_mb_per_sec * 0.5;
    let within_memory_limit = memory_usage_mb <= targets.memory_limit_mb;
    let fast_enough = throughput_mb_per_sec >= required_throughput;
    let meets_target = within_memory_limit && fast_enough;
    let target_comparison = if meets_target {
        format!(
            "✅ Meets memory targets ({:.1} MB ≤ {})",
            memory_usage_mb, targets.memory_limit_mb
        )
    } else if !within_memory_limit {
        format!(
            "❌ Exceeds memory limit ({:.1} MB > {})",
            memory_usage_mb, targets.memory_limit_mb
        )
    } else {
        // Memory is fine; the run failed on throughput, so report that rather
        // than a false "exceeds memory limit" comparison.
        format!(
            "❌ Throughput below target ({:.2} MB/s < {:.2} MB/s)",
            throughput_mb_per_sec, required_throughput
        )
    };

    let mut details = HashMap::new();
    details.insert("peak_memory_mb".to_string(), memory_usage_mb);
    details.insert("average_memory_mb".to_string(), avg_memory_mb);
    details.insert("memory_per_mb_ratio".to_string(), memory_usage_mb / size_mb);
    // The pattern is recorded as its numeric discriminant.
    details.insert("access_pattern".to_string(), pattern as u8 as f64);
    println!(
        " ✅ {} {:.0}MB: {:.1} MB peak memory ({:.2}x file size), {:.2} MB/s",
        pattern_name,
        size_mb,
        memory_usage_mb,
        memory_usage_mb / size_mb,
        throughput_mb_per_sec
    );
    Ok(BenchmarkResult {
        benchmark_name,
        file_size_mb: size_mb,
        duration,
        throughput_mb_per_sec,
        memory_usage_mb,
        memory_efficiency,
        compression_ratio: None,
        operations_per_second,
        meets_prd_target: meets_target,
        target_comparison,
        details,
    })
}
/// Walks `data` front to back in 64 KiB slices and returns the slice count.
///
/// The monitor is sampled after every 100th slice and once more at the end.
async fn sequential_access(&self, data: &[u8], monitor: &mut MemoryMonitor) -> Result<usize> {
    const SLICE_LEN: usize = 64 * 1024;
    let mut visited = 0;
    for slice in data.chunks(SLICE_LEN) {
        std::hint::black_box(slice);
        visited += 1;
        if visited % 100 == 0 {
            monitor.sample();
        }
    }
    monitor.sample();
    Ok(visited)
}
/// Reads up to 1000 4 KiB windows of `data` at pseudo-random offsets.
///
/// Offsets come from a fixed-seed linear congruential step, so the sequence is
/// reproducible between runs. Returns the number of reads performed, which is
/// zero when `data` is shorter than one window. The monitor is sampled on every
/// 100th iteration (starting with the first) and once more at the end.
async fn random_access(&self, data: &[u8], monitor: &mut MemoryMonitor) -> Result<usize> {
    let chunk_size = 4 * 1024;
    let num_operations = 1000.min(data.len() / chunk_size);
    // Valid start offsets are 0..=len - chunk_size, i.e. len - chunk_size + 1
    // of them. The `+ 1` keeps the modulus non-zero when the buffer is exactly
    // one window long (a zero modulus panics) and makes the last window
    // reachable.
    let offset_count = data.len().saturating_sub(chunk_size) + 1;
    let mut seed = 12345u64;
    for i in 0..num_operations {
        seed = seed.wrapping_mul(1_103_515_245).wrapping_add(12345);
        // Reduce in u64 first so 32-bit targets do not truncate the seed; the
        // remainder is below `offset_count` and therefore fits in usize.
        let offset = (seed % offset_count as u64) as usize;
        let chunk = &data[offset..offset + chunk_size];
        std::hint::black_box(chunk);
        if i % 100 == 0 {
            monitor.sample();
        }
    }
    monitor.sample();
    Ok(num_operations)
}
/// Copies `data` through one reusable 8 KiB buffer, block by block.
///
/// Returns the number of blocks copied. The monitor is sampled after every
/// 50th block and once more at the end.
async fn streaming_access(&self, data: &[u8], monitor: &mut MemoryMonitor) -> Result<usize> {
    const BUFFER_LEN: usize = 8 * 1024;
    let mut staging = vec![0u8; BUFFER_LEN];
    let mut copied_blocks = 0;
    for block in data.chunks(BUFFER_LEN) {
        // The final block may be shorter than the buffer.
        let filled = &mut staging[..block.len()];
        filled.copy_from_slice(block);
        std::hint::black_box(&*filled);
        copied_blocks += 1;
        if copied_blocks % 50 == 0 {
            monitor.sample();
        }
    }
    monitor.sample();
    Ok(copied_blocks)
}
/// Processes `data` in 1 MiB chunks, copying each into a fresh `Vec`.
///
/// After each copy the task sleeps for 1 ms, then the monitor is sampled while
/// the copy is still alive. A final sample is taken after the loop. Returns the
/// number of chunks processed.
async fn chunked_access(&self, data: &[u8], monitor: &mut MemoryMonitor) -> Result<usize> {
    const CHUNK_LEN: usize = 1024 * 1024;
    let mut processed = 0;
    for window in data.chunks(CHUNK_LEN) {
        let owned = window.to_vec();
        std::hint::black_box(&owned);
        tokio::time::sleep(Duration::from_millis(1)).await;
        processed += 1;
        // Sample before releasing the copy so it counts towards the reading.
        monitor.sample();
        drop(owned);
    }
    monitor.sample();
    Ok(processed)
}
/// Aggregates earlier memory benchmark results into one summary result.
///
/// Only results whose name starts with `"Memory_"` are considered. The summary
/// records the mean `memory_efficiency`, the highest `memory_usage_mb` and the
/// scaling factor from `calculate_memory_scaling`. The target is met when the
/// highest usage is within `targets.memory_limit_mb` and the mean efficiency is
/// at least 1.0.
///
/// # Errors
/// Returns an invalid-operation error when no matching results exist.
async fn analyze_memory_efficiency(
    &self,
    results: &[BenchmarkResult],
    targets: &PRDTargets,
) -> Result<BenchmarkResult> {
    let analyzed: Vec<&BenchmarkResult> = results
        .iter()
        .filter(|entry| entry.benchmark_name.starts_with("Memory_"))
        .collect();
    let analyzed_count = analyzed.len();
    if analyzed_count == 0 {
        return Err(crate::Error::invalid_operation(
            "No memory results to analyze".to_string(),
        ));
    }

    let efficiency_total = analyzed
        .iter()
        .map(|entry| entry.memory_efficiency)
        .sum::<f64>();
    let avg_efficiency: f64 = efficiency_total / analyzed_count as f64;
    let max_memory_usage: f64 = analyzed
        .iter()
        .fold(0.0_f64, |highest, entry| highest.max(entry.memory_usage_mb));
    let memory_scaling_factor = self.calculate_memory_scaling(&analyzed);

    let within_limit = max_memory_usage <= targets.memory_limit_mb;
    let meets_target = within_limit && avg_efficiency >= 1.0;
    let target_comparison = match meets_target {
        true => format!(
            "✅ Memory efficiency acceptable (max: {:.1} MB, avg efficiency: {:.2})",
            max_memory_usage, avg_efficiency
        ),
        false => format!(
            "❌ Memory efficiency needs improvement (max: {:.1} MB > {}, efficiency: {:.2})",
            max_memory_usage, targets.memory_limit_mb, avg_efficiency
        ),
    };

    let details = HashMap::from([
        ("average_efficiency".to_string(), avg_efficiency),
        ("max_memory_usage_mb".to_string(), max_memory_usage),
        ("memory_scaling_factor".to_string(), memory_scaling_factor),
        ("tests_analyzed".to_string(), analyzed_count as f64),
    ]);

    println!(
        " 📊 Memory Efficiency Analysis: {:.2} avg efficiency, {:.1} MB max usage",
        avg_efficiency, max_memory_usage
    );

    // Size, duration and rate fields are zeroed: this is a summary, not a run.
    Ok(BenchmarkResult {
        benchmark_name: "Memory_Efficiency_Analysis".to_string(),
        file_size_mb: 0.0,
        duration: Duration::from_secs(0),
        throughput_mb_per_sec: 0.0,
        memory_usage_mb: max_memory_usage,
        memory_efficiency: avg_efficiency,
        compression_ratio: None,
        operations_per_second: 0.0,
        meets_prd_target: meets_target,
        target_comparison,
        details,
    })
}
/// Estimates how memory usage grows relative to file size.
///
/// Results are stably sorted by `file_size_mb`. For each adjacent pair the
/// ratio of memory growth to size growth is computed, and the mean of those
/// ratios is returned. Pairs whose smaller entry has a non-positive size are
/// ignored. Returns 1.0 when fewer than two results are given or no pair
/// qualifies.
fn calculate_memory_scaling(&self, results: &[&BenchmarkResult]) -> f64 {
    if results.len() < 2 {
        return 1.0;
    }

    let mut by_size: Vec<&BenchmarkResult> = results.to_vec();
    by_size.sort_by(|lhs, rhs| {
        lhs.file_size_mb
            .partial_cmp(&rhs.file_size_mb)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    let factors: Vec<f64> = by_size
        .windows(2)
        .filter(|pair| pair[0].file_size_mb > 0.0)
        .map(|pair| {
            let (smaller, larger) = (pair[0], pair[1]);
            let size_ratio = larger.file_size_mb / smaller.file_size_mb;
            // Clamp the divisor so a near-zero reading cannot blow up the ratio.
            let memory_ratio = larger.memory_usage_mb / smaller.memory_usage_mb.max(0.1);
            memory_ratio / size_ratio
        })
        .collect();

    match factors.len() {
        0 => 1.0,
        count => factors.iter().sum::<f64>() / count as f64,
    }
}
/// Repeats a 10 MB processing cycle 20 times and checks for memory growth.
///
/// After each cycle the monitor is sampled and its peak reading recorded. The
/// slope of those readings (from `analyze_memory_growth`) above 0.1 MB per
/// iteration is flagged as a potential leak. The target is met when no leak is
/// flagged and the final reading is within `targets.memory_limit_mb`.
///
/// NOTE(review): the recorded value is `peak_usage_mb()`, which presumably never
/// decreases; confirm that this is the intended signal for leak detection.
async fn detect_memory_leaks(&self, targets: &PRDTargets) -> Result<BenchmarkResult> {
    println!(" 🔍 Running memory leak detection test...");

    let test_size_mb = 10.0;
    let iterations: usize = 20;
    let mut memory_samples = Vec::new();
    let mut monitor = MemoryMonitor::new();
    let timer = PrecisionTimer::start();

    for i in 0..iterations {
        let test_data = generate_test_data(test_size_mb);
        let _processed = self.simulate_processing(&test_data).await?;
        monitor.sample();
        let observed_mb = monitor.peak_usage_mb();
        memory_samples.push(observed_mb);
        drop(test_data);

        // Every fifth iteration: pause briefly, then report progress.
        if i % 5 == 0 {
            tokio::time::sleep(Duration::from_millis(10)).await;
            println!(
                " Iteration {}/{}: {:.1} MB",
                i + 1,
                iterations,
                observed_mb
            );
        }
    }

    let duration = timer.elapsed_duration();
    let memory_growth = self.analyze_memory_growth(&memory_samples);
    let final_memory = memory_samples.last().copied().unwrap_or(0.0);
    let initial_memory = memory_samples.first().copied().unwrap_or(0.0);

    let has_potential_leak = memory_growth > 0.1;
    let exceeds_memory_limit = final_memory > targets.memory_limit_mb;
    let meets_target = !(has_potential_leak || exceeds_memory_limit);

    // A suspected leak takes precedence over the memory-limit message.
    let target_comparison = if has_potential_leak {
        format!(
            "⚠️ Potential memory leak detected ({:.2} MB/iter growth)",
            memory_growth
        )
    } else if exceeds_memory_limit {
        format!(
            "❌ Memory usage too high ({:.1} MB > {})",
            final_memory, targets.memory_limit_mb
        )
    } else {
        format!(
            "✅ No memory leaks detected ({:.2} MB/iter growth)",
            memory_growth
        )
    };

    let leak_flag = if has_potential_leak { 1.0 } else { 0.0 };
    let details = HashMap::from([
        ("memory_growth_mb_per_iter".to_string(), memory_growth),
        ("initial_memory_mb".to_string(), initial_memory),
        ("final_memory_mb".to_string(), final_memory),
        ("iterations".to_string(), iterations as f64),
        ("potential_leak".to_string(), leak_flag),
    ]);

    println!(
        " 🔍 Leak Detection: {:.2} MB/iter growth, final: {:.1} MB",
        memory_growth, final_memory
    );

    let total_processed_mb = test_size_mb * iterations as f64;
    let elapsed_secs = duration.as_secs_f64();
    Ok(BenchmarkResult {
        benchmark_name: "Memory_Leak_Detection".to_string(),
        file_size_mb: total_processed_mb,
        duration,
        throughput_mb_per_sec: total_processed_mb / elapsed_secs,
        memory_usage_mb: final_memory,
        memory_efficiency: total_processed_mb / final_memory.max(0.1),
        compression_ratio: None,
        operations_per_second: iterations as f64 / elapsed_secs,
        meets_prd_target: meets_target,
        target_comparison,
        details,
    })
}
/// Returns the least-squares slope of `samples` against their indices.
///
/// The sample at position `i` is treated as the point `(i, samples[i])`, so the
/// result is the average change per step. Fewer than two samples yield 0.0.
fn analyze_memory_growth(&self, samples: &[f64]) -> f64 {
    let count = samples.len();
    if count < 2 {
        return 0.0;
    }

    let indices = || (0..count).map(|i| i as f64);
    let sum_x = indices().sum::<f64>();
    let sum_y = samples.iter().sum::<f64>();
    let sum_xy = samples
        .iter()
        .enumerate()
        .map(|(i, &y)| i as f64 * y)
        .sum::<f64>();
    let sum_x_squared = indices().map(|x| x.powi(2)).sum::<f64>();

    let n = count as f64;
    let numerator = n * sum_xy - sum_x * sum_y;
    let denominator = n * sum_x_squared - sum_x.powi(2);
    numerator / denominator
}
/// Performs allocation-heavy work on `data` for the leak-detection run.
///
/// Returns `data` with every 1024-byte block reversed in place. Along the way
/// it also builds several throwaway values (a lossy UTF-8 view, a small map of
/// the first 100 bytes, and a nested-then-flattened copy) that are dropped on
/// return.
async fn simulate_processing(&self, data: &[u8]) -> Result<Vec<u8>> {
    const BLOCK_LEN: usize = 1024;

    let mut reversed_blocks = Vec::new();
    for block in data.chunks(BLOCK_LEN) {
        let mut scratch = block.to_vec();
        scratch.reverse();
        reversed_blocks.extend_from_slice(&scratch);
    }

    // The remaining values exist only to exercise the allocator.
    let _lossy_text = String::from_utf8_lossy(data);

    let mut leading_bytes = HashMap::new();
    for (index, &value) in data.iter().enumerate().take(100) {
        leading_bytes.insert(index, value);
    }

    let blocks: Vec<Vec<u8>> = data.chunks(BLOCK_LEN).map(<[u8]>::to_vec).collect();
    let _rejoined: Vec<u8> = blocks.into_iter().flatten().collect();

    Ok(reversed_blocks)
}
/// Reports whether available memory is at least twice `required_mb`.
///
/// `required_mb` is converted to gigabytes (divided by 1024) before it is
/// compared with `get_available_memory_gb`.
fn has_sufficient_memory(&self, required_mb: f64) -> bool {
    let needed_with_headroom_gb = (required_mb / 1024.0) * 2.0;
    self.get_available_memory_gb() >= needed_with_headroom_gb
}
/// Returns the available system memory in gigabytes for the current target.
///
/// macOS and Linux delegate to their platform-specific helpers; every other
/// target uses a fixed 8.0.
fn get_available_memory_gb(&self) -> f64 {
    // Exactly one of these bindings survives conditional compilation.
    #[cfg(target_os = "macos")]
    let available_gb = self.get_memory_macos();
    #[cfg(target_os = "linux")]
    let available_gb = self.get_memory_linux();
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    let available_gb = 8.0;

    available_gb
}
/// Returns the assumed available memory on macOS, in gigabytes.
///
/// No system query is performed; the value is a fixed constant.
#[cfg(target_os = "macos")]
fn get_memory_macos(&self) -> f64 {
    const ASSUMED_AVAILABLE_GB: f64 = 8.0;
    ASSUMED_AVAILABLE_GB
}
/// Reads the `MemAvailable` entry of `/proc/meminfo` and returns it in gigabytes.
///
/// The second whitespace-separated field of the first parseable
/// `MemAvailable:` line is divided by 1024 twice. Falls back to 8.0 when the
/// file cannot be read or no such line parses.
#[cfg(target_os = "linux")]
fn get_memory_linux(&self) -> f64 {
    use std::fs;

    fs::read_to_string("/proc/meminfo")
        .ok()
        .and_then(|meminfo| {
            meminfo
                .lines()
                .filter(|line| line.starts_with("MemAvailable:"))
                .find_map(|line| line.split_whitespace().nth(1)?.parse::<f64>().ok())
        })
        .map(|kb| kb / 1024.0 / 1024.0)
        .unwrap_or(8.0)
}
}
/// Access strategies exercised by the memory benchmarks.
///
/// The explicit discriminants matter: the pattern is cast to `u8` and then
/// `f64` when stored under the `"access_pattern"` key of a result's details.
#[derive(Debug, Clone, Copy)]
enum MemoryAccessPattern {
// Front-to-back walk over the data in 64 KiB slices.
Sequential = 0,
// Up to 1000 reads of 4 KiB at pseudo-random offsets.
Random = 1,
// Copies the data through one reusable 8 KiB buffer.
Streaming = 2,
// Copies the data into a freshly allocated 1 MiB chunk at a time.
Chunked = 3,
}
// Unit tests for the memory benchmark helpers. The async tests build a
// `Platform` from `Config::default()` before constructing the suite.
#[cfg(test)]
mod tests {
use super::*;
// Constructing the suite from a default configuration succeeds.
#[tokio::test]
async fn test_memory_benchmarks_creation() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = MemoryBenchmarks::new(platform, &config).await;
assert!(benchmarks.is_ok());
}
// Sequential access over generated data (size argument 1.0) performs at
// least one operation.
#[tokio::test]
async fn test_sequential_access() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = MemoryBenchmarks::new(platform, &config).await.unwrap();
let test_data = generate_test_data(1.0); let mut monitor = MemoryMonitor::new();
let operations = benchmarks.sequential_access(&test_data, &mut monitor).await;
assert!(operations.is_ok());
assert!(operations.unwrap() > 0);
}
// The growth slope is positive for steadily rising samples and stays below
// the 0.1 leak threshold for samples that hover around a constant.
#[tokio::test]
async fn test_memory_growth_analysis() {
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await.unwrap());
let benchmarks = MemoryBenchmarks::new(platform, &config).await.unwrap();
let samples = vec![10.0, 10.5, 11.0, 11.5, 12.0];
let growth = benchmarks.analyze_memory_growth(&samples);
assert!(growth > 0.0);
let stable_samples = vec![10.0, 10.1, 9.9, 10.0, 10.1];
let stable_growth = benchmarks.analyze_memory_growth(&stable_samples);
assert!(stable_growth.abs() < 0.1); }
// Pins the numeric discriminants, which are written into result details.
#[test]
fn test_memory_access_pattern_enum() {
assert_eq!(MemoryAccessPattern::Sequential as u8, 0);
assert_eq!(MemoryAccessPattern::Random as u8, 1);
assert_eq!(MemoryAccessPattern::Streaming as u8, 2);
assert_eq!(MemoryAccessPattern::Chunked as u8, 3);
}
}