use ddex_builder::{Builder, BuildRequest};
use std::time::{Instant, Duration};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Semaphore;
pub mod memory_monitor;
pub mod large_catalog;
pub mod concurrent_processing;
pub mod benchmarks;
#[derive(Debug, Clone)]
pub struct StressTestConfig {
pub max_file_size: usize,
pub max_tracks: usize,
pub max_nesting_depth: usize,
pub memory_limit_mb: usize,
pub operation_timeout_secs: u64,
pub concurrency_level: usize,
pub enable_memory_tracking: bool,
pub enable_profiling: bool,
}
impl Default for StressTestConfig {
fn default() -> Self {
Self {
max_file_size: 100 * 1024 * 1024, max_tracks: 10_000, max_nesting_depth: 50, memory_limit_mb: 1024, operation_timeout_secs: 300, concurrency_level: 100, enable_memory_tracking: true,
enable_profiling: true,
}
}
}
#[derive(Debug, Clone)]
pub struct StressTestResult {
pub test_name: String,
pub success: bool,
pub duration: Duration,
pub memory_usage: MemoryUsage,
pub performance_metrics: PerformanceMetrics,
pub error: Option<String>,
}
#[derive(Debug, Clone)]
pub struct MemoryUsage {
pub peak_memory_mb: f64,
pub average_memory_mb: f64,
pub memory_allocations: usize,
pub memory_deallocations: usize,
pub gc_collections: usize,
}
impl Default for MemoryUsage {
fn default() -> Self {
Self {
peak_memory_mb: 0.0,
average_memory_mb: 0.0,
memory_allocations: 0,
memory_deallocations: 0,
gc_collections: 0,
}
}
}
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
pub throughput_mb_per_sec: f64,
pub operations_per_second: f64,
pub cpu_usage_percent: f64,
pub io_read_mb: f64,
pub io_write_mb: f64,
}
impl Default for PerformanceMetrics {
fn default() -> Self {
Self {
throughput_mb_per_sec: 0.0,
operations_per_second: 0.0,
cpu_usage_percent: 0.0,
io_read_mb: 0.0,
io_write_mb: 0.0,
}
}
}
pub struct StressTestRunner {
config: StressTestConfig,
memory_monitor: Arc<MemoryMonitor>,
}
impl StressTestRunner {
pub fn new(config: StressTestConfig) -> Self {
let memory_monitor = Arc::new(MemoryMonitor::new(config.enable_memory_tracking));
Self {
config,
memory_monitor,
}
}
pub async fn run_all_stress_tests(&self) -> Result<Vec<StressTestResult>, Box<dyn std::error::Error>> {
let mut results = Vec::new();
results.push(self.test_large_file_processing().await?);
results.push(self.test_many_tracks_processing().await?);
results.push(self.test_deep_nesting_processing().await?);
results.push(self.test_concurrent_processing().await?);
results.push(self.test_memory_pressure().await?);
results.push(self.test_sustained_load().await?);
Ok(results)
}
pub async fn test_large_file_processing(&self) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let test_name = "Large File Processing".to_string();
let start_time = Instant::now();
self.memory_monitor.start_monitoring();
let large_request = self.generate_large_catalog_request()?;
let result = tokio::time::timeout(
Duration::from_secs(self.config.operation_timeout_secs),
self.process_large_request(large_request)
).await;
let duration = start_time.elapsed();
let memory_usage = self.memory_monitor.get_usage_stats();
let performance_metrics = self.calculate_performance_metrics(&memory_usage, duration);
self.memory_monitor.stop_monitoring();
match result {
Ok(Ok(_)) => Ok(StressTestResult {
test_name,
success: true,
duration,
memory_usage,
performance_metrics,
error: None,
}),
Ok(Err(e)) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some(e.to_string()),
}),
Err(_) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some("Operation timed out".to_string()),
}),
}
}
pub async fn test_many_tracks_processing(&self) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let test_name = "Many Tracks Processing".to_string();
let start_time = Instant::now();
self.memory_monitor.start_monitoring();
let many_tracks_request = self.generate_many_tracks_request()?;
let result = tokio::time::timeout(
Duration::from_secs(self.config.operation_timeout_secs),
self.process_many_tracks_request(many_tracks_request)
).await;
let duration = start_time.elapsed();
let memory_usage = self.memory_monitor.get_usage_stats();
let performance_metrics = self.calculate_performance_metrics(&memory_usage, duration);
self.memory_monitor.stop_monitoring();
match result {
Ok(Ok(_)) => Ok(StressTestResult {
test_name,
success: true,
duration,
memory_usage,
performance_metrics,
error: None,
}),
Ok(Err(e)) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some(e.to_string()),
}),
Err(_) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some("Operation timed out".to_string()),
}),
}
}
pub async fn test_deep_nesting_processing(&self) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let test_name = "Deep Nesting Processing".to_string();
let start_time = Instant::now();
self.memory_monitor.start_monitoring();
let deep_nested_request = self.generate_deep_nested_request()?;
let result = tokio::time::timeout(
Duration::from_secs(self.config.operation_timeout_secs),
self.process_deep_nested_request(deep_nested_request)
).await;
let duration = start_time.elapsed();
let memory_usage = self.memory_monitor.get_usage_stats();
let performance_metrics = self.calculate_performance_metrics(&memory_usage, duration);
self.memory_monitor.stop_monitoring();
match result {
Ok(Ok(_)) => Ok(StressTestResult {
test_name,
success: true,
duration,
memory_usage,
performance_metrics,
error: None,
}),
Ok(Err(e)) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some(e.to_string()),
}),
Err(_) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some("Operation timed out".to_string()),
}),
}
}
pub async fn test_concurrent_processing(&self) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let test_name = "Concurrent Processing".to_string();
let start_time = Instant::now();
self.memory_monitor.start_monitoring();
let semaphore = Arc::new(Semaphore::new(self.config.concurrency_level));
let successful_operations = Arc::new(AtomicUsize::new(0));
let failed_operations = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for i in 0..self.config.concurrency_level * 2 { let sem = Arc::clone(&semaphore);
let successful = Arc::clone(&successful_operations);
let failed = Arc::clone(&failed_operations);
let timeout_secs = self.config.operation_timeout_secs;
let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
let request = Self::generate_simple_request(i);
let result = tokio::time::timeout(
Duration::from_secs(timeout_secs),
Self::process_simple_request(request)
).await;
match result {
Ok(Ok(_)) => successful.fetch_add(1, Ordering::Relaxed),
_ => failed.fetch_add(1, Ordering::Relaxed),
};
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let duration = start_time.elapsed();
let memory_usage = self.memory_monitor.get_usage_stats();
let performance_metrics = self.calculate_performance_metrics(&memory_usage, duration);
self.memory_monitor.stop_monitoring();
let successful = successful_operations.load(Ordering::Relaxed);
let failed = failed_operations.load(Ordering::Relaxed);
let total = successful + failed;
let success_rate = if total > 0 { successful as f64 / total as f64 } else { 0.0 };
Ok(StressTestResult {
test_name: format!("{} ({}/{} ops succeeded, {:.1}% success rate)",
test_name, successful, total, success_rate * 100.0),
success: success_rate > 0.95, duration,
memory_usage,
performance_metrics,
error: if success_rate <= 0.95 {
Some(format!("Success rate {:.1}% below threshold", success_rate * 100.0))
} else {
None
},
})
}
pub async fn test_memory_pressure(&self) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let test_name = "Memory Pressure Processing".to_string();
let start_time = Instant::now();
self.memory_monitor.start_monitoring();
let memory_pressure_size = self.config.memory_limit_mb / 4; let _pressure_blocks: Vec<Vec<u8>> = (0..memory_pressure_size)
.map(|_| vec![0u8; 1024 * 1024]) .collect();
let request = self.generate_medium_request()?;
let result = tokio::time::timeout(
Duration::from_secs(self.config.operation_timeout_secs),
self.process_medium_request(request)
).await;
let duration = start_time.elapsed();
let memory_usage = self.memory_monitor.get_usage_stats();
let performance_metrics = self.calculate_performance_metrics(&memory_usage, duration);
self.memory_monitor.stop_monitoring();
match result {
Ok(Ok(_)) => Ok(StressTestResult {
test_name,
success: true,
duration,
memory_usage,
performance_metrics,
error: None,
}),
Ok(Err(e)) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some(e.to_string()),
}),
Err(_) => Ok(StressTestResult {
test_name,
success: false,
duration,
memory_usage,
performance_metrics,
error: Some("Operation timed out under memory pressure".to_string()),
}),
}
}
pub async fn test_sustained_load(&self) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let test_name = "Sustained Load Processing".to_string();
let start_time = Instant::now();
self.memory_monitor.start_monitoring();
let operations_count = Arc::new(AtomicUsize::new(0));
let successful_operations = Arc::new(AtomicUsize::new(0));
let test_duration = Duration::from_secs(60);
let end_time = Instant::now() + test_duration;
let mut handles = Vec::new();
for worker_id in 0..10 { let operations = Arc::clone(&operations_count);
let successful = Arc::clone(&successful_operations);
let handle = tokio::spawn(async move {
let mut local_operations = 0;
let mut local_successful = 0;
while Instant::now() < end_time {
let request = Self::generate_simple_request(worker_id * 1000 + local_operations);
match Self::process_simple_request(request).await {
Ok(_) => local_successful += 1,
Err(_) => {} }
local_operations += 1;
tokio::time::sleep(Duration::from_millis(10)).await;
}
operations.fetch_add(local_operations, Ordering::Relaxed);
successful.fetch_add(local_successful, Ordering::Relaxed);
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let duration = start_time.elapsed();
let memory_usage = self.memory_monitor.get_usage_stats();
let performance_metrics = self.calculate_performance_metrics(&memory_usage, duration);
self.memory_monitor.stop_monitoring();
let total_operations = operations_count.load(Ordering::Relaxed);
let successful = successful_operations.load(Ordering::Relaxed);
let success_rate = if total_operations > 0 { successful as f64 / total_operations as f64 } else { 0.0 };
Ok(StressTestResult {
test_name: format!("{} ({} ops in {:?}, {:.1}% success rate)",
test_name, total_operations, duration, success_rate * 100.0),
success: success_rate > 0.90 && total_operations > 100, duration,
memory_usage,
performance_metrics,
error: if success_rate <= 0.90 || total_operations <= 100 {
Some(format!("Insufficient throughput or success rate: {} ops, {:.1}% success",
total_operations, success_rate * 100.0))
} else {
None
},
})
}
fn generate_large_catalog_request(&self) -> Result<LargeBuildRequest, Box<dyn std::error::Error>> {
Ok(LargeBuildRequest {
size_mb: self.config.max_file_size / (1024 * 1024),
track_count: 1000,
complexity: "high".to_string(),
})
}
fn generate_many_tracks_request(&self) -> Result<ManyTracksBuildRequest, Box<dyn std::error::Error>> {
Ok(ManyTracksBuildRequest {
track_count: self.config.max_tracks,
complexity: "medium".to_string(),
})
}
fn generate_deep_nested_request(&self) -> Result<DeepNestedBuildRequest, Box<dyn std::error::Error>> {
Ok(DeepNestedBuildRequest {
nesting_depth: self.config.max_nesting_depth,
complexity: "deep".to_string(),
})
}
fn generate_medium_request(&self) -> Result<MediumBuildRequest, Box<dyn std::error::Error>> {
Ok(MediumBuildRequest {
track_count: 100,
complexity: "medium".to_string(),
})
}
fn generate_simple_request(id: usize) -> SimpleBuildRequest {
SimpleBuildRequest {
id,
complexity: "simple".to_string(),
}
}
async fn process_large_request(&self, request: LargeBuildRequest) -> Result<String, Box<dyn std::error::Error>> {
let content = format!("Large catalog with {} MB, {} tracks", request.size_mb, request.track_count);
Ok(format!("<?xml version=\"1.0\"?><LargeCatalog>{}</LargeCatalog>", content))
}
async fn process_many_tracks_request(&self, request: ManyTracksBuildRequest) -> Result<String, Box<dyn std::error::Error>> {
let content = format!("Album with {} tracks", request.track_count);
Ok(format!("<?xml version=\"1.0\"?><ManyTracks>{}</ManyTracks>", content))
}
async fn process_deep_nested_request(&self, request: DeepNestedBuildRequest) -> Result<String, Box<dyn std::error::Error>> {
let content = format!("Structure with {} levels of nesting", request.nesting_depth);
Ok(format!("<?xml version=\"1.0\"?><DeepNested>{}</DeepNested>", content))
}
async fn process_medium_request(&self, request: MediumBuildRequest) -> Result<String, Box<dyn std::error::Error>> {
let content = format!("Medium album with {} tracks", request.track_count);
Ok(format!("<?xml version=\"1.0\"?><Medium>{}</Medium>", content))
}
async fn process_simple_request(request: SimpleBuildRequest) -> Result<String, Box<dyn std::error::Error>> {
let content = format!("Simple release #{}", request.id);
Ok(format!("<?xml version=\"1.0\"?><Simple>{}</Simple>", content))
}
fn calculate_performance_metrics(&self, memory_usage: &MemoryUsage, duration: Duration) -> PerformanceMetrics {
let duration_secs = duration.as_secs_f64();
PerformanceMetrics {
throughput_mb_per_sec: memory_usage.peak_memory_mb / duration_secs,
operations_per_second: 1.0 / duration_secs, cpu_usage_percent: 0.0, io_read_mb: 0.0, io_write_mb: 0.0, }
}
}
#[derive(Debug, Clone)]
pub struct LargeBuildRequest {
pub size_mb: usize,
pub track_count: usize,
pub complexity: String,
}
#[derive(Debug, Clone)]
pub struct ManyTracksBuildRequest {
pub track_count: usize,
pub complexity: String,
}
#[derive(Debug, Clone)]
pub struct DeepNestedBuildRequest {
pub nesting_depth: usize,
pub complexity: String,
}
#[derive(Debug, Clone)]
pub struct MediumBuildRequest {
pub track_count: usize,
pub complexity: String,
}
#[derive(Debug, Clone)]
pub struct SimpleBuildRequest {
pub id: usize,
pub complexity: String,
}
pub struct MemoryMonitor {
enabled: bool,
start_memory: Option<usize>,
peak_memory: Arc<AtomicUsize>,
samples: Arc<std::sync::Mutex<Vec<usize>>>,
}
impl MemoryMonitor {
pub fn new(enabled: bool) -> Self {
Self {
enabled,
start_memory: None,
peak_memory: Arc::new(AtomicUsize::new(0)),
samples: Arc::new(std::sync::Mutex::new(Vec::new())),
}
}
pub fn start_monitoring(&self) {
if !self.enabled {
return;
}
let peak = Arc::clone(&self.peak_memory);
let samples = Arc::clone(&self.samples);
tokio::spawn(async move {
while peak.load(Ordering::Relaxed) != usize::MAX { if let Ok(current_memory) = Self::get_current_memory_usage() {
peak.fetch_max(current_memory, Ordering::Relaxed);
if let Ok(mut samples_vec) = samples.lock() {
samples_vec.push(current_memory);
}
}
tokio::time::sleep(Duration::from_millis(100)).await; }
});
}
pub fn stop_monitoring(&self) {
if self.enabled {
self.peak_memory.store(usize::MAX, Ordering::Relaxed); }
}
pub fn get_usage_stats(&self) -> MemoryUsage {
if !self.enabled {
return MemoryUsage::default();
}
let peak_bytes = self.peak_memory.load(Ordering::Relaxed);
let average_bytes = if let Ok(samples) = self.samples.lock() {
if samples.is_empty() {
0.0
} else {
samples.iter().sum::<usize>() as f64 / samples.len() as f64
}
} else {
0.0
};
MemoryUsage {
peak_memory_mb: peak_bytes as f64 / (1024.0 * 1024.0),
average_memory_mb: average_bytes / (1024.0 * 1024.0),
memory_allocations: 0, memory_deallocations: 0,
gc_collections: 0,
}
}
fn get_current_memory_usage() -> Result<usize, Box<dyn std::error::Error>> {
Ok(1024 * 1024) }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stress_config_creation() {
let config = StressTestConfig::default();
assert_eq!(config.max_file_size, 100 * 1024 * 1024);
assert_eq!(config.max_tracks, 10_000);
assert!(config.enable_memory_tracking);
}
#[test]
fn test_memory_monitor_creation() {
let monitor = MemoryMonitor::new(true);
assert!(monitor.enabled);
let monitor_disabled = MemoryMonitor::new(false);
assert!(!monitor_disabled.enabled);
}
#[tokio::test]
async fn test_simple_request_processing() {
let request = StressTestRunner::generate_simple_request(42);
assert_eq!(request.id, 42);
let result = StressTestRunner::process_simple_request(request).await;
assert!(result.is_ok());
let xml = result.unwrap();
assert!(xml.contains("Simple release #42"));
}
#[tokio::test]
async fn test_stress_runner_creation() {
let config = StressTestConfig::default();
let runner = StressTestRunner::new(config);
assert!(runner.memory_monitor.enabled);
}
#[tokio::test]
#[ignore] async fn test_concurrent_stress() {
let config = StressTestConfig {
concurrency_level: 10, operation_timeout_secs: 30,
..Default::default()
};
let runner = StressTestRunner::new(config);
let result = runner.test_concurrent_processing().await;
assert!(result.is_ok());
let stress_result = result.unwrap();
println!("Concurrent stress test: {}", stress_result.test_name);
println!("Success: {}", stress_result.success);
println!("Duration: {:?}", stress_result.duration);
assert!(stress_result.success);
}
}