#![allow(dead_code, unused_imports, unused_variables, clippy::useless_vec)]
use crate::{
backends::{Backend, BackendConfig, BackendHandle, BackendType, InferenceParams},
models::ModelInfo,
performance_baseline::{PerformanceBaseline, PerformanceTarget},
};
use anyhow::Result;
use clap::{Args, Subcommand};
use std::{
collections::HashMap,
path::PathBuf,
time::{Duration, Instant},
};
use sysinfo::{CpuExt, System, SystemExt};
const VALID_BENCHMARK_TYPES: &[&str] = &["inference", "memory", "concurrent", "cache", "all"];
const VALID_MONITOR_FORMATS: &[&str] = &["json", "csv", "console"];
const VALID_REPORT_FORMATS: &[&str] = &["html", "json", "markdown", "csv"];
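/// Point-in-time process memory figures. The heap values are approximations
/// derived from RSS/VMS, since no allocator introspection is wired in.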
#[derive(Debug, Clone)]
struct MemoryUsage {
heap_used: u64,
heap_total: u64,
    rss: u64,
    vms: u64,
}
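/// A single sample taken during memory profiling, tagged with the inference
/// cycle that produced it.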
#[derive(Debug, Clone)]
struct MemorySnapshot {
timestamp: Duration,
heap_used: u64,
heap_total: u64,
rss: u64,
vms: u64,
inference_id: u32,
model_loaded: bool,
}
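/// Aggregate statistics derived from a series of `MemorySnapshot`s.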
#[derive(Debug)]
struct MemoryAnalysis {
baseline_rss: u64,
peak_rss: u64,
final_rss: u64,
total_growth: i64,
average_growth_per_cycle: f64,
memory_leak_detected: bool,
gc_efficiency: f64,
model_load_overhead: u64,
}
#[derive(Debug, Args)]
pub struct PerformanceBenchmarkArgs {
#[command(subcommand)]
pub command: PerformanceBenchmarkCommand,
}
#[derive(Debug, Subcommand)]
pub enum PerformanceBenchmarkCommand {
Baseline {
#[arg(short, long, default_value = "performance_baseline")]
output: PathBuf,
#[arg(short, long)]
targets: Option<PathBuf>,
#[arg(short, long, default_value = "gguf,onnx")]
backends: String,
#[arg(short, long, default_value = "30")]
duration: u64,
},
Benchmark {
#[arg(short, long, default_value = "all")]
bench_type: String,
#[arg(short, long, default_value = "benchmark_results")]
output: PathBuf,
#[arg(short, long)]
model: Option<String>,
#[arg(short, long, default_value = "100")]
iterations: u32,
#[arg(short, long)]
profile: bool,
},
Compare {
#[arg(short, long)]
current: PathBuf,
#[arg(short, long)]
baseline: PathBuf,
#[arg(short, long, default_value = "10.0")]
threshold: f64,
#[arg(short, long)]
report: bool,
},
Monitor {
#[arg(short, long, default_value = "60")]
duration: u64,
#[arg(short, long, default_value = "5")]
interval: u64,
#[arg(short, long, default_value = "console")]
format: String,
#[arg(short, long)]
output: Option<PathBuf>,
},
Stress {
#[arg(short, long, default_value = "10")]
clients: u32,
#[arg(short, long, default_value = "300")]
duration: u64,
#[arg(short, long)]
model: Option<String>,
#[arg(short, long, default_value = "1.0")]
rate: f64,
},
MemoryProfile {
#[arg(short, long)]
model: Option<String>,
#[arg(short, long, default_value = "50")]
cycles: u32,
#[arg(short, long)]
output: Option<PathBuf>,
#[arg(short, long)]
track: bool,
},
}
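/// Validates subcommand arguments up front so misconfigured runs fail fast
/// with a clear message instead of partway through a benchmark.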
fn validate_benchmark_args(args: &PerformanceBenchmarkArgs) -> Result<()> {
match &args.command {
PerformanceBenchmarkCommand::Baseline {
output: _,
targets: _,
backends,
duration,
} => {
if backends.is_empty() {
anyhow::bail!("Backends cannot be empty");
}
if *duration == 0 {
anyhow::bail!("Duration must be greater than 0");
}
Ok(())
}
PerformanceBenchmarkCommand::Benchmark {
bench_type,
output: _,
model: _,
iterations,
profile: _,
} => {
if !VALID_BENCHMARK_TYPES.contains(&bench_type.as_str()) {
anyhow::bail!(
"Benchmark type must be one of: {}",
VALID_BENCHMARK_TYPES.join(", ")
);
}
if *iterations == 0 {
anyhow::bail!("Iterations must be greater than 0");
}
Ok(())
}
PerformanceBenchmarkCommand::Compare {
current,
baseline,
threshold,
report: _,
} => {
if !current.exists() {
anyhow::bail!("Current results file does not exist: {}", current.display());
}
if !baseline.exists() {
anyhow::bail!(
"Baseline results file does not exist: {}",
baseline.display()
);
}
if *threshold < 0.0 {
anyhow::bail!("Threshold must be non-negative");
}
Ok(())
}
PerformanceBenchmarkCommand::Monitor {
duration: _,
interval,
format,
output: _,
} => {
if *interval == 0 {
anyhow::bail!("Interval must be greater than 0");
}
if !VALID_MONITOR_FORMATS.contains(&format.as_str()) {
anyhow::bail!(
"Monitor format must be one of: {}",
VALID_MONITOR_FORMATS.join(", ")
);
}
Ok(())
}
PerformanceBenchmarkCommand::Stress {
clients,
duration,
model: _,
rate,
} => {
if *clients == 0 {
anyhow::bail!("Number of clients must be greater than 0");
}
if *duration == 0 {
anyhow::bail!("Duration must be greater than 0");
}
if *rate <= 0.0 {
anyhow::bail!("Request rate must be positive");
}
Ok(())
}
PerformanceBenchmarkCommand::MemoryProfile {
model: _,
cycles,
output: _,
track: _,
} => {
if *cycles == 0 {
anyhow::bail!("Number of cycles must be greater than 0");
}
Ok(())
}
}
}
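/// Dispatches a validated benchmark command to its handler.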
pub async fn execute_performance_benchmark(args: PerformanceBenchmarkArgs) -> Result<()> {
validate_benchmark_args(&args)?;
match args.command {
PerformanceBenchmarkCommand::Baseline {
output,
targets,
backends,
duration,
} => establish_baseline(output, targets, backends, duration).await,
PerformanceBenchmarkCommand::Benchmark {
bench_type,
output,
model,
iterations,
profile,
} => run_benchmark(bench_type, output, model, iterations, profile).await,
PerformanceBenchmarkCommand::Compare {
current,
baseline,
threshold,
report,
} => compare_performance(current, baseline, threshold, report).await,
PerformanceBenchmarkCommand::Monitor {
duration,
interval,
format,
output,
} => monitor_performance(duration, interval, format, output).await,
PerformanceBenchmarkCommand::Stress {
clients,
duration,
model,
rate,
} => stress_test(clients, duration, model, rate).await,
PerformanceBenchmarkCommand::MemoryProfile {
model,
cycles,
output,
track,
} => memory_profile(model, cycles, output, track).await,
}
}
async fn establish_baseline(
output_dir: PathBuf,
targets_file: Option<PathBuf>,
backends_str: String,
duration: u64,
) -> Result<()> {
tracing::info!("Establishing performance baseline");
let mut baseline = PerformanceBaseline::new(output_dir);
baseline.initialize().await?;
if let Some(targets_path) = targets_file {
let targets_content = tokio::fs::read_to_string(targets_path).await?;
let custom_targets: PerformanceTarget = serde_json::from_str(&targets_content)?;
baseline.set_targets(custom_targets);
}
let backend_types: Vec<BackendType> = backends_str
.split(',')
.filter_map(|s| match s.trim().to_lowercase().as_str() {
#[cfg(feature = "gguf")]
"gguf" => Some(BackendType::Gguf),
#[cfg(feature = "onnx")]
"onnx" => Some(BackendType::Onnx),
            _ => {
                tracing::warn!("Unknown or unavailable backend type: {}", s);
                None
            }
})
.collect();
if backend_types.is_empty() {
anyhow::bail!("No valid backend types specified");
}
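    // The parsed list is used for validation only; the comprehensive baseline
    // run below does not take it as a parameter.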
baseline.run_comprehensive_baseline().await?;
tracing::info!("Performance baseline established successfully");
Ok(())
}
async fn run_benchmark(
bench_type: String,
output_dir: PathBuf,
model_name: Option<String>,
iterations: u32,
enable_profiling: bool,
) -> Result<()> {
tracing::info!(
"Running {} benchmark with {} iterations",
bench_type,
iterations
);
tokio::fs::create_dir_all(&output_dir).await?;
match bench_type.as_str() {
"inference" => run_inference_benchmark(output_dir, model_name, iterations).await,
"memory" => run_memory_benchmark(output_dir, model_name, iterations).await,
"concurrent" => run_concurrent_benchmark(output_dir, model_name, iterations).await,
"cache" => {
tracing::warn!("Cache benchmark temporarily disabled");
Ok(())
}
"all" => {
run_inference_benchmark(output_dir.clone(), model_name.clone(), iterations).await?;
run_memory_benchmark(output_dir.clone(), model_name.clone(), iterations).await?;
run_concurrent_benchmark(output_dir, model_name, iterations).await
}
_ => anyhow::bail!("Unknown benchmark type: {}", bench_type),
}
}
#[cfg(feature = "gguf")]
async fn run_inference_benchmark(
output_dir: PathBuf,
model_name: Option<String>,
iterations: u32,
) -> Result<()> {
tracing::info!("Running inference benchmark");
let temp_dir = tempfile::tempdir()?;
let model_path = temp_dir.path().join("benchmark.gguf");
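    // A minimal stand-in model file with a GGUF-style magic header; raw model
    // quality is not what this latency harness measures.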
tokio::fs::write(&model_path, b"GGUF\x00\x00\x00\x01test model").await?;
let file_path = model_path.clone();
let model = ModelInfo {
name: "benchmark.gguf".to_string(),
path: model_path.clone(),
file_path,
        size: 1024 * 1024,
        size_bytes: 1024 * 1024,
modified: chrono::Utc::now(),
backend_type: "gguf".to_string(),
format: "gguf".to_string(),
checksum: None,
metadata: HashMap::new(),
};
let backend_config = BackendConfig::default();
let mut backend = Backend::new(BackendType::Gguf, &backend_config)?;
backend.load_model(&model).await?;
let backend_handle = BackendHandle::new(backend);
let inference_params = InferenceParams {
max_tokens: 50,
temperature: 0.7,
top_k: 40,
top_p: 0.9,
stream: false,
stop_sequences: vec![],
seed: None,
};
let test_prompts = vec![
"Hello, world!",
"Explain artificial intelligence",
"Write a story",
"Solve this problem",
"What is the meaning of life?",
];
let mut latencies = Vec::new();
let mut successful_requests = 0u32;
let mut failed_requests = 0u32;
let start_time = Instant::now();
for i in 0..iterations {
let prompt = &test_prompts[i as usize % test_prompts.len()];
let inference_start = Instant::now();
match backend_handle.infer(prompt, &inference_params).await {
Ok(_) => {
latencies.push(inference_start.elapsed());
successful_requests += 1;
}
Err(e) => {
tracing::warn!("Inference failed: {}", e);
failed_requests += 1;
}
}
if i % 10 == 0 {
tracing::info!("Completed {} / {} iterations", i + 1, iterations);
}
}
let total_duration = start_time.elapsed();
latencies.sort();
let latency_ms: Vec<f64> = latencies.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
    let avg_latency = if latency_ms.is_empty() {
        0.0
    } else {
        latency_ms.iter().sum::<f64>() / latency_ms.len() as f64
    };
let p50_latency = percentile(&latency_ms, 50.0);
let p90_latency = percentile(&latency_ms, 90.0);
let p99_latency = percentile(&latency_ms, 99.0);
let throughput = successful_requests as f64 / total_duration.as_secs_f64();
let results = serde_json::json!({
"benchmark_type": "inference",
"timestamp": chrono::Utc::now().to_rfc3339(),
"iterations": iterations,
"successful_requests": successful_requests,
"failed_requests": failed_requests,
"total_duration_sec": total_duration.as_secs_f64(),
"avg_latency_ms": avg_latency,
"p50_latency_ms": p50_latency,
"p90_latency_ms": p90_latency,
"p99_latency_ms": p99_latency,
"throughput_rps": throughput,
"error_rate": failed_requests as f64 / (successful_requests + failed_requests) as f64,
});
let results_file = output_dir.join("inference_benchmark.json");
tokio::fs::write(results_file, serde_json::to_string_pretty(&results)?).await?;
tracing::info!("Inference benchmark completed:");
tracing::info!(" Average latency: {:.2} ms", avg_latency);
tracing::info!(" P99 latency: {:.2} ms", p99_latency);
tracing::info!(" Throughput: {:.1} RPS", throughput);
tracing::info!(
" Error rate: {:.2}%",
(failed_requests as f64 / (successful_requests + failed_requests) as f64) * 100.0
);
Ok(())
}
#[cfg(not(feature = "gguf"))]
async fn run_inference_benchmark(
_output_dir: PathBuf,
_model_name: Option<String>,
_iterations: u32,
) -> Result<()> {
Err(anyhow::anyhow!(
"GGUF backend not available. Enable 'gguf' feature for inference benchmarks."
))
}
#[cfg(feature = "gguf")]
async fn run_memory_benchmark(
output_dir: PathBuf,
model_name: Option<String>,
iterations: u32,
) -> Result<()> {
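    // This benchmark runs a fixed 10 load/unload cycles per model size;
    // `model_name` and `iterations` are accepted but currently unused.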
tracing::info!("Running memory benchmark");
let mut system = System::new_all();
system.refresh_all();
let initial_memory = system.used_memory();
let mut peak_memory = initial_memory;
let mut memory_samples = Vec::new();
    let model_sizes = vec![1, 5, 10, 25];
    let mut results = Vec::new();
for size_mb in model_sizes {
tracing::info!("Testing with {}MB model", size_mb);
let temp_dir = tempfile::tempdir()?;
let model_path = temp_dir.path().join(format!("model_{}mb.gguf", size_mb));
let mut content = b"GGUF\x00\x00\x00\x01".to_vec();
content.extend(vec![0u8; size_mb * 1024 * 1024 - content.len()]);
tokio::fs::write(&model_path, content).await?;
let model_size_bytes = (size_mb * 1024 * 1024) as u64;
let model_path_clone = model_path.clone();
let model = ModelInfo {
name: format!("model_{}mb.gguf", size_mb),
path: model_path_clone.clone(),
file_path: model_path_clone,
size: model_size_bytes,
size_bytes: model_size_bytes,
modified: chrono::Utc::now(),
backend_type: "gguf".to_string(),
format: "gguf".to_string(),
checksum: None,
metadata: HashMap::new(),
};
let backend_config = BackendConfig::default();
for cycle in 0..10 {
system.refresh_memory();
let cycle_start_memory = system.used_memory();
let mut backend = Backend::new(BackendType::Gguf, &backend_config)?;
backend.load_model(&model).await?;
let backend_handle = BackendHandle::new(backend);
system.refresh_memory();
let loaded_memory = system.used_memory();
peak_memory = peak_memory.max(loaded_memory);
let inference_params = InferenceParams {
max_tokens: 20,
temperature: 0.7,
top_k: 40,
top_p: 0.9,
stream: false,
stop_sequences: vec![],
seed: None,
};
for _ in 0..5 {
let _ = backend_handle.infer("Test prompt", &inference_params).await;
system.refresh_memory();
peak_memory = peak_memory.max(system.used_memory());
}
backend_handle.unload_model().await?;
system.refresh_memory();
let cycle_end_memory = system.used_memory();
memory_samples.push(serde_json::json!({
"model_size_mb": size_mb,
"cycle": cycle,
"start_memory_mb": cycle_start_memory / 1024 / 1024,
"loaded_memory_mb": loaded_memory / 1024 / 1024,
"end_memory_mb": cycle_end_memory / 1024 / 1024,
"memory_delta_mb": (loaded_memory as i64 - cycle_start_memory as i64) / 1024 / 1024,
}));
if cycle % 3 == 0 {
tracing::info!(" Completed cycle {} / 10", cycle + 1);
}
}
let avg_delta: f64 = memory_samples
.iter()
.filter(|s| s["model_size_mb"] == size_mb)
.map(|s| s["memory_delta_mb"].as_i64().unwrap_or(0) as f64)
.sum::<f64>()
/ 10.0;
results.push(serde_json::json!({
"model_size_mb": size_mb,
"avg_memory_delta_mb": avg_delta,
"memory_efficiency": size_mb as f64 / avg_delta,
}));
}
let benchmark_results = serde_json::json!({
"benchmark_type": "memory",
"timestamp": chrono::Utc::now().to_rfc3339(),
"initial_memory_mb": initial_memory / 1024 / 1024,
"peak_memory_mb": peak_memory / 1024 / 1024,
"total_peak_delta_mb": (peak_memory - initial_memory) / 1024 / 1024,
"model_results": results,
"detailed_samples": memory_samples,
});
let results_file = output_dir.join("memory_benchmark.json");
tokio::fs::write(
results_file,
serde_json::to_string_pretty(&benchmark_results)?,
)
.await?;
tracing::info!("Memory benchmark completed:");
tracing::info!(" Initial memory: {} MB", initial_memory / 1024 / 1024);
tracing::info!(" Peak memory: {} MB", peak_memory / 1024 / 1024);
tracing::info!(
" Peak delta: {} MB",
(peak_memory - initial_memory) / 1024 / 1024
);
Ok(())
}
#[cfg(not(feature = "gguf"))]
async fn run_memory_benchmark(
_output_dir: PathBuf,
_model_name: Option<String>,
_iterations: u32,
) -> Result<()> {
Err(anyhow::anyhow!(
"GGUF backend not available. Enable 'gguf' feature for memory benchmarks."
))
}
#[cfg(feature = "gguf")]
async fn run_concurrent_benchmark(
output_dir: PathBuf,
model_name: Option<String>,
iterations: u32,
) -> Result<()> {
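    // The concurrency sweep below drives the load; `model_name` and
    // `iterations` are accepted but currently unused.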
tracing::info!("Running concurrent benchmark");
let temp_dir = tempfile::tempdir()?;
let model_path = temp_dir.path().join("concurrent_test.gguf");
tokio::fs::write(&model_path, b"GGUF\x00\x00\x00\x01test model").await?;
let model_path_clone = model_path.clone();
let model = ModelInfo {
name: "concurrent_test.gguf".to_string(),
path: model_path_clone.clone(),
file_path: model_path_clone,
size: 1024 * 1024,
size_bytes: 1024 * 1024,
modified: chrono::Utc::now(),
backend_type: "gguf".to_string(),
format: "gguf".to_string(),
checksum: None,
metadata: HashMap::new(),
};
let concurrency_levels = vec![1, 2, 4, 8, 16, 32];
let mut results = Vec::new();
for concurrency in concurrency_levels {
tracing::info!("Testing concurrency level: {}", concurrency);
let backend_config = BackendConfig::default();
let mut backend = Backend::new(BackendType::Gguf, &backend_config)?;
backend.load_model(&model).await?;
let backend_handle = BackendHandle::new(backend);
let inference_params = InferenceParams {
max_tokens: 30,
temperature: 0.7,
top_k: 40,
top_p: 0.9,
stream: false,
stop_sequences: vec![],
seed: None,
};
let start_time = Instant::now();
let mut successful_requests = 0u32;
let mut failed_requests = 0u32;
let handles: Vec<_> = (0..concurrency)
.map(|i| {
let prompt = format!("Concurrent test request {}", i);
let params = inference_params.clone();
let backend_clone = backend_handle.clone();
                tokio::spawn(async move { backend_clone.infer(&prompt, &params).await })
})
.collect();
for handle in handles {
match handle.await {
Ok(Ok(_)) => successful_requests += 1,
_ => failed_requests += 1,
}
}
let duration = start_time.elapsed();
let throughput = successful_requests as f64 / duration.as_secs_f64();
results.push(serde_json::json!({
"concurrency": concurrency,
"successful_requests": successful_requests,
"failed_requests": failed_requests,
"duration_sec": duration.as_secs_f64(),
"throughput_rps": throughput,
"avg_latency_ms": (duration.as_secs_f64() * 1000.0) / successful_requests as f64,
}));
backend_handle.unload_model().await?;
}
let benchmark_results = serde_json::json!({
"benchmark_type": "concurrent",
"timestamp": chrono::Utc::now().to_rfc3339(),
"results": results,
});
let results_file = output_dir.join("concurrent_benchmark.json");
tokio::fs::write(
results_file,
serde_json::to_string_pretty(&benchmark_results)?,
)
.await?;
tracing::info!("Concurrent benchmark completed");
for result in &results {
tracing::info!(
" Concurrency {}: {:.1} RPS",
result["concurrency"],
result["throughput_rps"]
);
}
Ok(())
}
#[cfg(not(feature = "gguf"))]
async fn run_concurrent_benchmark(
_output_dir: PathBuf,
_model_name: Option<String>,
_iterations: u32,
) -> Result<()> {
Err(anyhow::anyhow!(
"GGUF backend not available. Enable 'gguf' feature for concurrent benchmarks."
))
}
async fn compare_performance(
current_file: PathBuf,
baseline_file: PathBuf,
threshold: f64,
generate_report: bool,
) -> Result<()> {
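    // `generate_report` is accepted but report generation is not wired up here.
    // Regressions use the user-supplied threshold; improvements use a fixed 5%.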
tracing::info!("Comparing performance with baseline");
    let current_content = tokio::fs::read_to_string(&current_file).await?;
let baseline_content = tokio::fs::read_to_string(&baseline_file).await?;
    let current_data: serde_json::Value = serde_json::from_str(&current_content)?;
let baseline_data: serde_json::Value = serde_json::from_str(&baseline_content)?;
let mut regressions = Vec::new();
let mut improvements = Vec::new();
if let (Some(current_latency), Some(baseline_latency)) = (
current_data.get("avg_latency_ms").and_then(|v| v.as_f64()),
baseline_data.get("avg_latency_ms").and_then(|v| v.as_f64()),
) {
let change_percent = ((current_latency - baseline_latency) / baseline_latency) * 100.0;
if change_percent > threshold {
regressions.push(format!(
"Latency regression: {:.2}% increase ({:.2}ms -> {:.2}ms)",
change_percent, baseline_latency, current_latency
));
} else if change_percent < -5.0 {
improvements.push(format!(
"Latency improvement: {:.2}% decrease ({:.2}ms -> {:.2}ms)",
-change_percent, baseline_latency, current_latency
));
}
}
if let (Some(current_throughput), Some(baseline_throughput)) = (
current_data.get("throughput_rps").and_then(|v| v.as_f64()),
baseline_data.get("throughput_rps").and_then(|v| v.as_f64()),
) {
let change_percent =
((current_throughput - baseline_throughput) / baseline_throughput) * 100.0;
if change_percent < -threshold {
regressions.push(format!(
"Throughput regression: {:.2}% decrease ({:.1} -> {:.1} RPS)",
-change_percent, baseline_throughput, current_throughput
));
} else if change_percent > 5.0 {
improvements.push(format!(
"Throughput improvement: {:.2}% increase ({:.1} -> {:.1} RPS)",
change_percent, baseline_throughput, current_throughput
));
}
}
if !regressions.is_empty() {
tracing::error!("Performance regressions detected:");
        for regression in &regressions {
tracing::error!(" {}", regression);
}
}
if !improvements.is_empty() {
tracing::info!("Performance improvements detected:");
for improvement in &improvements {
tracing::info!(" {}", improvement);
}
}
if regressions.is_empty() && improvements.is_empty() {
tracing::info!("No significant performance changes detected");
}
if !regressions.is_empty() {
anyhow::bail!("Performance regressions detected");
}
Ok(())
}
async fn monitor_performance(
duration: u64,
interval: u64,
format: String,
output_file: Option<PathBuf>,
) -> Result<()> {
tracing::info!("Starting performance monitoring for {}s", duration);
let mut system = System::new_all();
let end_time = if duration == 0 {
None
} else {
Some(Instant::now() + Duration::from_secs(duration))
};
let mut samples = Vec::new();
loop {
if let Some(end) = end_time {
if Instant::now() >= end {
break;
}
}
system.refresh_all();
let sample = serde_json::json!({
"timestamp": chrono::Utc::now().to_rfc3339(),
"cpu_usage": system.global_cpu_info().cpu_usage(),
"memory_used_mb": system.used_memory() / 1024 / 1024,
"memory_total_mb": system.total_memory() / 1024 / 1024,
"memory_usage_percent": (system.used_memory() as f64 / system.total_memory() as f64) * 100.0,
"processes": system.processes().len(),
});
match format.as_str() {
"console" => {
println!(
"{}: CPU: {:.1}%, Memory: {} MB ({:.1}%)",
chrono::Utc::now().format("%H:%M:%S"),
sample["cpu_usage"],
sample["memory_used_mb"],
sample["memory_usage_percent"]
);
}
"json" => {
println!("{}", serde_json::to_string(&sample)?);
}
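            // "csv" (the remaining validated format) is buffered here and only
            // persisted with the session output below when an output file is given.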
_ => {
samples.push(sample);
}
}
tokio::time::sleep(Duration::from_secs(interval)).await;
}
if let Some(output_path) = output_file {
let output_data = serde_json::json!({
"monitoring_session": {
"start_time": chrono::Utc::now() - chrono::Duration::seconds(duration as i64),
"duration_sec": duration,
"interval_sec": interval,
"samples": samples,
}
});
tokio::fs::write(output_path, serde_json::to_string_pretty(&output_data)?).await?;
}
Ok(())
}
async fn stress_test(
clients: u32,
duration: u64,
model_name: Option<String>,
rate: f64,
) -> Result<()> {
tracing::info!(
"Starting stress test with {} clients for {}s at {}req/s per client",
clients,
duration,
rate
);
let model = model_name.unwrap_or_else(|| "default".to_string());
let start_time = std::time::Instant::now();
let test_duration = std::time::Duration::from_secs(duration);
let mut total_requests = 0u64;
let mut successful_requests = 0u64;
let mut failed_requests = 0u64;
let mut response_times = Vec::new();
    // Shared across client tasks; a plain `u64` captured by `move` would only
    // update each task's private copy, leaving the report stuck at zero.
    let peak_memory = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let mut errors = Vec::new();
println!("🚀 Starting stress test...");
println!(
"Clients: {}, Duration: {}s, Rate: {:.1} req/s/client",
clients, duration, rate
);
println!("{}", "=".repeat(60));
let mut client_handles = Vec::new();
let request_interval = std::time::Duration::from_secs_f64(1.0 / rate);
for client_id in 0..clients {
        let model_clone = model.clone();
        let peak_memory = std::sync::Arc::clone(&peak_memory);
        let client_handle = tokio::spawn(async move {
let mut client_requests = 0u64;
let mut client_successes = 0u64;
let mut client_failures = 0u64;
let mut client_response_times = Vec::new();
let mut last_request_time = std::time::Instant::now();
loop {
let elapsed = start_time.elapsed();
if elapsed >= test_duration {
break;
}
                if last_request_time.elapsed() < request_interval {
                    // `saturating_sub` avoids a panic if the interval elapses
                    // between the check above and the subtraction here.
                    tokio::time::sleep(
                        request_interval.saturating_sub(last_request_time.elapsed()),
                    )
                    .await;
                }
let request_start = std::time::Instant::now();
let test_input = format!(
"Stress test input from client {} request {}",
client_id, client_requests
);
match simulate_inference_request(&model_clone, &test_input).await {
Ok(response_time) => {
client_successes += 1;
client_response_times.push(response_time);
}
Err(e) => {
client_failures += 1;
tracing::debug!("Client {} request failed: {}", client_id, e);
}
}
client_requests += 1;
last_request_time = std::time::Instant::now();
                if client_requests.is_multiple_of(10) {
                    let memory_usage = get_current_memory_usage();
                    peak_memory.fetch_max(memory_usage.rss, std::sync::atomic::Ordering::Relaxed);
                }
}
(
client_id,
client_requests,
client_successes,
client_failures,
client_response_times,
)
});
client_handles.push(client_handle);
}
let progress_handle = tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let elapsed = start_time.elapsed();
if elapsed >= test_duration {
break;
}
let progress = (elapsed.as_secs_f64() / test_duration.as_secs_f64()) * 100.0;
println!(
"Progress: {:.1}% ({:.0}s elapsed)",
progress,
elapsed.as_secs_f64()
);
}
});
for handle in client_handles {
match handle.await {
Ok((client_id, requests, successes, failures, response_times_vec)) => {
total_requests += requests;
successful_requests += successes;
failed_requests += failures;
response_times.extend(response_times_vec);
if failures > 0 {
errors.push(format!(
"Client {}: {} failures out of {} requests",
client_id, failures, requests
));
}
}
Err(e) => {
errors.push(format!("Client task failed: {}", e));
}
}
}
progress_handle.abort();
let actual_duration = start_time.elapsed();
let overall_throughput = total_requests as f64 / actual_duration.as_secs_f64();
    response_times.sort();
    // Convert to milliseconds so the statistics match the "ms" labels and the
    // 5s (5000ms) p99 threshold below.
    let response_times_ms: Vec<f64> = response_times
        .iter()
        .map(|d| d.as_secs_f64() * 1000.0)
        .collect();
    let avg_response_time = if response_times_ms.is_empty() {
        0.0
    } else {
        response_times_ms.iter().sum::<f64>() / response_times_ms.len() as f64
    };
    let p50 = percentile(&response_times_ms, 50.0);
    let p95 = percentile(&response_times_ms, 95.0);
    let p99 = percentile(&response_times_ms, 99.0);
println!("\n{}", "=".repeat(60));
println!("🏁 Stress Test Results");
println!("{}", "=".repeat(60));
println!("Duration: {:.2}s", actual_duration.as_secs_f64());
println!("Total Requests: {}", total_requests);
println!(
"Successful Requests: {} ({:.1}%)",
successful_requests,
(successful_requests as f64 / total_requests as f64) * 100.0
);
println!(
"Failed Requests: {} ({:.1}%)",
failed_requests,
(failed_requests as f64 / total_requests as f64) * 100.0
);
println!("Overall Throughput: {:.2} req/s", overall_throughput);
    println!(
        "Peak Memory Usage: {:.1} MB",
        peak_memory.load(std::sync::atomic::Ordering::Relaxed) as f64 / 1024.0 / 1024.0
    );
println!("\n📊 Response Time Statistics:");
println!("Average: {:.2}ms", avg_response_time);
println!("50th percentile: {:.2}ms", p50);
println!("95th percentile: {:.2}ms", p95);
println!("99th percentile: {:.2}ms", p99);
if !errors.is_empty() {
println!("\n❌ Errors encountered:");
for error in &errors {
println!(" {}", error);
}
}
    let success_rate = if total_requests == 0 {
        0.0
    } else {
        successful_requests as f64 / total_requests as f64
    };
if success_rate < 0.95 {
println!("\n⚠️ STRESS TEST FAILED: Success rate below 95%");
return Err(anyhow::anyhow!(
"Stress test failed with {:.1}% success rate",
success_rate * 100.0
));
} else if p99 > 5000.0 {
println!("\n⚠️ STRESS TEST WARNING: 99th percentile latency above 5s");
} else {
println!("\n✅ STRESS TEST PASSED: All metrics within acceptable ranges");
}
Ok(())
}
async fn memory_profile(
model_name: Option<String>,
cycles: u32,
output_file: Option<PathBuf>,
track: bool,
) -> Result<()> {
tracing::info!("Starting memory profiling for {} cycles", cycles);
let mut memory_snapshots = Vec::new();
let start_time = std::time::Instant::now();
let baseline_memory = get_memory_usage().await?;
memory_snapshots.push(MemorySnapshot {
timestamp: start_time.elapsed(),
heap_used: baseline_memory.heap_used,
heap_total: baseline_memory.heap_total,
rss: baseline_memory.rss,
vms: baseline_memory.vms,
inference_id: 0,
model_loaded: false,
});
println!("🧠 Memory Profiling Started");
println!("├─ Cycles: {}", cycles);
println!("├─ Model: {}", model_name.as_deref().unwrap_or("default"));
println!(
"├─ Tracking: {}",
if track { "enabled" } else { "disabled" }
);
println!(
"└─ Baseline Memory: {:.2} MB RSS, {:.2} MB Heap",
baseline_memory.rss as f64 / 1024.0 / 1024.0,
baseline_memory.heap_used as f64 / 1024.0 / 1024.0
);
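    // Pick a backend at compile time: GGUF if available, then ONNX, then Metal
    // (macOS with the gpu-metal feature), falling back to `BackendType::None`.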
#[cfg(feature = "gguf")]
let backend_type = BackendType::Gguf;
#[cfg(all(not(feature = "gguf"), feature = "onnx"))]
let backend_type = BackendType::Onnx;
#[cfg(all(
not(feature = "gguf"),
not(feature = "onnx"),
all(feature = "gpu-metal", target_os = "macos")
))]
let backend_type = BackendType::Metal;
#[cfg(not(any(
feature = "gguf",
feature = "onnx",
all(feature = "gpu-metal", target_os = "macos")
)))]
let backend_type = BackendType::None;
let backend_config = BackendConfig::default();
let mut backend = Backend::new(backend_type, &backend_config)?;
if let Some(ref model) = model_name {
let model_info = ModelInfo {
name: model.clone(),
path: std::path::PathBuf::from(format!("models/{}", model)),
file_path: std::path::PathBuf::from(format!("models/{}", model)),
size: 0,
size_bytes: 0,
modified: chrono::Utc::now(),
backend_type: "gguf".to_string(),
format: "gguf".to_string(),
checksum: None,
metadata: std::collections::HashMap::new(),
};
tracing::info!("Loading model for memory profiling");
backend.load_model(&model_info).await?;
let post_load_memory = get_memory_usage().await?;
memory_snapshots.push(MemorySnapshot {
timestamp: start_time.elapsed(),
heap_used: post_load_memory.heap_used,
heap_total: post_load_memory.heap_total,
rss: post_load_memory.rss,
vms: post_load_memory.vms,
inference_id: 0,
model_loaded: true,
});
let model_memory_usage = post_load_memory.rss.saturating_sub(baseline_memory.rss);
println!(
"📊 Model loaded - Memory delta: {:.2} MB",
model_memory_usage as f64 / 1024.0 / 1024.0
);
}
let test_prompt = "What is artificial intelligence?";
let params = InferenceParams {
max_tokens: 50,
temperature: 0.7,
top_k: 40,
top_p: 0.9,
stream: false,
stop_sequences: vec![],
seed: Some(42),
};
for cycle in 1..=cycles {
let cycle_start = start_time.elapsed();
        if track {
            tokio::task::yield_now().await;
            // Use the async sleep; `std::thread::sleep` would block the runtime thread.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
let pre_inference_memory = get_memory_usage().await?;
if backend.is_loaded().await {
            match backend.infer(test_prompt, &params).await {
Ok(_response) => {
let post_inference_memory = get_memory_usage().await?;
memory_snapshots.push(MemorySnapshot {
timestamp: cycle_start,
heap_used: post_inference_memory.heap_used,
heap_total: post_inference_memory.heap_total,
rss: post_inference_memory.rss,
vms: post_inference_memory.vms,
inference_id: cycle,
model_loaded: true,
});
let cycle_memory_delta = post_inference_memory
.rss
.saturating_sub(pre_inference_memory.rss);
if track {
println!(
"Cycle {}/{}: {:.2} MB delta, {:.2} MB total RSS",
cycle,
cycles,
cycle_memory_delta as f64 / 1024.0 / 1024.0,
post_inference_memory.rss as f64 / 1024.0 / 1024.0
);
}
if cycle > 1 && cycle_memory_delta > 10 * 1024 * 1024 {
tracing::warn!(
"Potential memory leak detected in cycle {}: {:.2} MB growth",
cycle,
cycle_memory_delta as f64 / 1024.0 / 1024.0
);
}
}
Err(e) => {
tracing::error!("Inference failed in cycle {}: {}", cycle, e);
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let final_memory = get_memory_usage().await?;
memory_snapshots.push(MemorySnapshot {
timestamp: start_time.elapsed(),
heap_used: final_memory.heap_used,
heap_total: final_memory.heap_total,
rss: final_memory.rss,
vms: final_memory.vms,
inference_id: cycles,
model_loaded: backend.is_loaded().await,
});
let analysis = analyze_memory_patterns(&memory_snapshots, baseline_memory);
display_memory_analysis(&analysis);
if let Some(output_path) = output_file {
save_memory_profile(&memory_snapshots, &analysis, &output_path).await?;
println!("📁 Memory profile saved to: {}", output_path.display());
}
if backend.is_loaded().await {
backend.unload_model().await?;
}
Ok(())
}
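/// Reads memory usage for the current process. On Linux this parses
/// /proc/self/status directly; elsewhere it falls back to sysinfo.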
async fn get_memory_usage() -> Result<MemoryUsage> {
#[cfg(target_os = "linux")]
{
use std::fs;
let status = fs::read_to_string("/proc/self/status")?;
let mut rss = 0u64;
let mut vms = 0u64;
        for line in status.lines() {
            if line.starts_with("VmRSS:") {
                if let Some(value) = line.split_whitespace().nth(1) {
                    // /proc/self/status reports kB; convert to bytes.
                    rss = value.parse::<u64>().unwrap_or(0) * 1024;
                }
            } else if line.starts_with("VmSize:") {
                if let Some(value) = line.split_whitespace().nth(1) {
                    vms = value.parse::<u64>().unwrap_or(0) * 1024;
                }
            }
        }
        Ok(MemoryUsage {
            // No allocator introspection is wired in, so RSS/VMS stand in for
            // the heap figures.
            heap_used: rss,
            heap_total: vms,
            rss,
            vms,
        })
}
#[cfg(not(target_os = "linux"))]
    {
        use sysinfo::{PidExt, ProcessExt};
        let mut system = System::new_all();
        system.refresh_all();
        let current_pid = sysinfo::Pid::from_u32(std::process::id());
        if let Some(process) = system.processes().get(&current_pid) {
            // The `* 1024` scaling below assumes this sysinfo version reports
            // process memory in KiB.
            let memory = process.memory();
            let virtual_memory = process.virtual_memory();
Ok(MemoryUsage {
heap_used: memory * 1024,
heap_total: virtual_memory * 1024,
rss: memory * 1024,
vms: virtual_memory * 1024,
})
} else {
Ok(MemoryUsage {
heap_used: 0,
heap_total: 0,
rss: 0,
vms: 0,
})
}
}
}
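/// Derives leak and reclamation heuristics from the snapshot series: a leak is
/// flagged when RSS grows by more than 1 MiB per inference cycle on average,
/// and "GC efficiency" is the ratio of reclaimed to allocated bytes between cycles.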
fn analyze_memory_patterns(snapshots: &[MemorySnapshot], baseline: MemoryUsage) -> MemoryAnalysis {
if snapshots.is_empty() {
return MemoryAnalysis {
baseline_rss: baseline.rss,
peak_rss: baseline.rss,
final_rss: baseline.rss,
total_growth: 0,
average_growth_per_cycle: 0.0,
memory_leak_detected: false,
gc_efficiency: 1.0,
model_load_overhead: 0,
};
}
let baseline_rss = baseline.rss;
let peak_rss = snapshots
.iter()
.map(|s| s.rss)
.max()
.unwrap_or(baseline_rss);
let final_rss = snapshots.last().map(|s| s.rss).unwrap_or(baseline_rss);
let total_growth = final_rss as i64 - baseline_rss as i64;
let model_load_overhead = snapshots
.iter()
.find(|s| s.model_loaded && s.inference_id == 0)
.map(|s| s.rss.saturating_sub(baseline_rss))
.unwrap_or(0);
let inference_snapshots: Vec<_> = snapshots.iter().filter(|s| s.inference_id > 0).collect();
let average_growth_per_cycle = if inference_snapshots.len() > 1 {
let first_inference_rss = inference_snapshots
.first()
.map(|s| s.rss)
.unwrap_or(baseline_rss);
let last_inference_rss = inference_snapshots
.last()
.map(|s| s.rss)
.unwrap_or(baseline_rss);
let cycles = inference_snapshots.len() as f64;
(last_inference_rss as f64 - first_inference_rss as f64) / cycles
} else {
0.0
};
let memory_leak_detected = average_growth_per_cycle > 1024.0 * 1024.0;
let mut memory_deltas = Vec::new();
for window in inference_snapshots.windows(2) {
let delta = window[1].rss as i64 - window[0].rss as i64;
memory_deltas.push(delta);
}
let positive_deltas: Vec<_> = memory_deltas.iter().filter(|&&d| d > 0).collect();
let negative_deltas: Vec<_> = memory_deltas.iter().filter(|&&d| d < 0).collect();
let gc_efficiency = if !positive_deltas.is_empty() && !negative_deltas.is_empty() {
let total_growth: i64 = positive_deltas.iter().map(|&&d| d).sum();
let total_reclaim: i64 = negative_deltas.iter().map(|&&d| d.abs()).sum();
if total_growth > 0 {
total_reclaim as f64 / total_growth as f64
} else {
1.0
}
} else {
1.0
};
MemoryAnalysis {
baseline_rss,
peak_rss,
final_rss,
total_growth,
average_growth_per_cycle,
memory_leak_detected,
gc_efficiency: gc_efficiency.min(1.0),
model_load_overhead,
}
}
fn display_memory_analysis(analysis: &MemoryAnalysis) {
println!("\n📊 Memory Analysis Results");
println!("┌─────────────────────────────────────────────────────────────────┐");
println!("│ Memory Report │");
println!("├─────────────────────────────────────────────────────────────────┤");
println!(
"│ Baseline RSS: {:>8.2} MB │",
analysis.baseline_rss as f64 / 1024.0 / 1024.0
);
println!(
"│ Peak RSS: {:>8.2} MB │",
analysis.peak_rss as f64 / 1024.0 / 1024.0
);
println!(
"│ Final RSS: {:>8.2} MB │",
analysis.final_rss as f64 / 1024.0 / 1024.0
);
println!(
"│ Total Growth: {:>8.2} MB │",
analysis.total_growth as f64 / 1024.0 / 1024.0
);
println!(
"│ Model Load Overhead: {:>8.2} MB │",
analysis.model_load_overhead as f64 / 1024.0 / 1024.0
);
println!(
"│ Avg Growth/Cycle: {:>8.2} KB │",
analysis.average_growth_per_cycle / 1024.0
);
println!(
"│ GC Efficiency: {:>8.1}% │",
analysis.gc_efficiency * 100.0
);
println!("├─────────────────────────────────────────────────────────────────┤");
if analysis.memory_leak_detected {
println!("│ ⚠️ MEMORY LEAK DETECTED: Consistent growth > 1MB/cycle │");
} else {
println!("│ ✅ Memory Management: No significant leaks detected │");
}
if analysis.gc_efficiency < 0.5 {
println!("│ ⚠️ GC EFFICIENCY: Low garbage collection efficiency │");
} else {
println!("│ ✅ GC Performance: Good memory reclamation efficiency │");
}
println!("└─────────────────────────────────────────────────────────────────┘");
let peak_overhead = analysis.peak_rss.saturating_sub(analysis.baseline_rss);
if peak_overhead > 500 * 1024 * 1024 {
println!("\n⚠️ HIGH MEMORY USAGE: Peak memory usage exceeds 500MB");
} else if peak_overhead > 100 * 1024 * 1024 {
println!("\n📊 MODERATE MEMORY USAGE: Peak memory usage is moderate");
} else {
println!("\n✅ LOW MEMORY USAGE: Efficient memory utilization");
}
}
async fn save_memory_profile(
snapshots: &[MemorySnapshot],
analysis: &MemoryAnalysis,
output_path: &PathBuf,
) -> Result<()> {
use std::io::Write;
let mut file = std::fs::File::create(output_path)?;
writeln!(file, "# Memory Profile Report")?;
writeln!(
file,
"Generated: {}",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
)?;
writeln!(file)?;
writeln!(file, "## Analysis Summary")?;
writeln!(
file,
"- Baseline RSS: {:.2} MB",
analysis.baseline_rss as f64 / 1024.0 / 1024.0
)?;
writeln!(
file,
"- Peak RSS: {:.2} MB",
analysis.peak_rss as f64 / 1024.0 / 1024.0
)?;
writeln!(
file,
"- Final RSS: {:.2} MB",
analysis.final_rss as f64 / 1024.0 / 1024.0
)?;
writeln!(
file,
"- Total Growth: {:.2} MB",
analysis.total_growth as f64 / 1024.0 / 1024.0
)?;
writeln!(
file,
"- Model Load Overhead: {:.2} MB",
analysis.model_load_overhead as f64 / 1024.0 / 1024.0
)?;
writeln!(
file,
"- Average Growth per Cycle: {:.2} KB",
analysis.average_growth_per_cycle / 1024.0
)?;
writeln!(
file,
"- GC Efficiency: {:.1}%",
analysis.gc_efficiency * 100.0
)?;
writeln!(
file,
"- Memory Leak Detected: {}",
analysis.memory_leak_detected
)?;
writeln!(file)?;
writeln!(file, "## Memory Snapshots")?;
writeln!(
file,
"Timestamp(ms),InferenceID,ModelLoaded,HeapUsed(MB),HeapTotal(MB),RSS(MB),VMS(MB)"
)?;
for snapshot in snapshots {
writeln!(
file,
"{},{},{},{:.2},{:.2},{:.2},{:.2}",
snapshot.timestamp.as_millis(),
snapshot.inference_id,
snapshot.model_loaded,
snapshot.heap_used as f64 / 1024.0 / 1024.0,
snapshot.heap_total as f64 / 1024.0 / 1024.0,
snapshot.rss as f64 / 1024.0 / 1024.0,
snapshot.vms as f64 / 1024.0 / 1024.0
)?;
}
file.flush()?;
Ok(())
}
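/// Linear-interpolation percentile over `sorted_data`, which must already be
/// sorted ascending; returns 0.0 for empty input. For example,
/// `percentile(&[1.0, 2.0, 3.0, 4.0], 50.0)` is 2.5.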
fn percentile(sorted_data: &[f64], percentile: f64) -> f64 {
if sorted_data.is_empty() {
return 0.0;
}
let index = (percentile / 100.0) * (sorted_data.len() - 1) as f64;
let lower = index.floor() as usize;
let upper = index.ceil() as usize;
if lower == upper {
sorted_data[lower]
} else {
let weight = index - lower as f64;
sorted_data[lower] * (1.0 - weight) + sorted_data[upper] * weight
}
}
/// Stand-in for a real inference call: sleeps proportionally to the input
/// length so the stress test produces deterministic, non-trivial latencies.
async fn simulate_inference_request(_model: &str, input: &str) -> Result<Duration> {
let start = Instant::now();
let processing_time_ms = (input.len() as u64 * 10).max(50);
tokio::time::sleep(Duration::from_millis(processing_time_ms)).await;
Ok(start.elapsed())
}
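/// Cheap snapshot for use inside client tasks. These are system-wide figures,
/// not per-process RSS/VMS, so they overestimate process usage.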
fn get_current_memory_usage() -> MemoryUsage {
let mut system = System::new_all();
system.refresh_memory();
MemoryUsage {
heap_used: system.used_memory(),
heap_total: system.total_memory(),
rss: system.used_memory(),
vms: system.total_memory(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_benchmark_invalid_type() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Benchmark {
bench_type: "invalid".to_string(),
output: PathBuf::from("output"),
model: None,
iterations: 100,
profile: false,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Benchmark type must be one of")
);
}
#[test]
fn test_validate_benchmark_zero_iterations() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Benchmark {
bench_type: "all".to_string(),
output: PathBuf::from("output"),
model: None,
iterations: 0,
profile: false,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Iterations must be greater than 0")
);
}
#[test]
fn test_validate_compare_negative_threshold() {
let temp_dir = tempfile::tempdir().unwrap();
let current_file = temp_dir.path().join("current.json");
let baseline_file = temp_dir.path().join("baseline.json");
        std::fs::write(&current_file, "{}").unwrap();
std::fs::write(&baseline_file, "{}").unwrap();
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Compare {
current: current_file,
baseline: baseline_file,
threshold: -5.0,
report: false,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Threshold must be non-negative")
);
}
#[test]
fn test_validate_monitor_invalid_format() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Monitor {
duration: 60,
interval: 5,
format: "invalid".to_string(),
output: None,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Monitor format must be one of")
);
}
#[test]
fn test_validate_monitor_zero_interval() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Monitor {
duration: 60,
interval: 0,
format: "console".to_string(),
output: None,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Interval must be greater than 0")
);
}
#[test]
fn test_validate_stress_zero_clients() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Stress {
clients: 0,
duration: 300,
model: None,
rate: 1.0,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Number of clients must be greater than 0")
);
}
#[test]
fn test_validate_stress_zero_rate() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Stress {
clients: 10,
duration: 300,
model: None,
rate: 0.0,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Request rate must be positive")
);
}
#[test]
fn test_validate_memory_profile_zero_cycles() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::MemoryProfile {
model: None,
cycles: 0,
output: None,
track: false,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Number of cycles must be greater than 0")
);
}
#[test]
fn test_validate_baseline_empty_backends() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Baseline {
output: PathBuf::from("output"),
targets: None,
backends: "".to_string(),
duration: 30,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Backends cannot be empty")
);
}
#[test]
fn test_validate_baseline_zero_duration() {
let args = PerformanceBenchmarkArgs {
command: PerformanceBenchmarkCommand::Baseline {
output: PathBuf::from("output"),
targets: None,
backends: "gguf".to_string(),
duration: 0,
},
};
let result = validate_benchmark_args(&args);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Duration must be greater than 0")
);
}
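    // The expectations below follow directly from `percentile`'s
    // linear-interpolation formula and `analyze_memory_patterns`' empty-input path.
    #[test]
    fn test_percentile_linear_interpolation() {
        let data = vec![1.0, 2.0, 3.0, 4.0];
        // index = 0.5 * 3 = 1.5, midway between 2.0 and 3.0
        assert!((percentile(&data, 50.0) - 2.5).abs() < 1e-9);
        assert_eq!(percentile(&data, 0.0), 1.0);
        assert_eq!(percentile(&data, 100.0), 4.0);
        assert_eq!(percentile(&[], 50.0), 0.0);
    }
    #[test]
    fn test_analyze_memory_patterns_empty_snapshots() {
        let baseline = MemoryUsage {
            heap_used: 0,
            heap_total: 0,
            rss: 1024,
            vms: 2048,
        };
        let analysis = analyze_memory_patterns(&[], baseline);
        assert_eq!(analysis.baseline_rss, 1024);
        assert_eq!(analysis.total_growth, 0);
        assert!(!analysis.memory_leak_detected);
    }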
}