codex_memory/performance/
stress_testing.rs

1//! Stress testing implementation for finding system breaking points
2
3use super::{PerformanceMetrics, PerformanceTestResult, StressTestConfig, TestType};
4use crate::memory::models::{CreateMemoryRequest, SearchRequest};
5use crate::memory::{MemoryRepository, MemoryTier};
6use anyhow::Result;
7use chrono::Utc;
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use sysinfo::System;
12use tokio::sync::{RwLock, Semaphore};
13use tokio::time;
14use tracing::{debug, info, warn};
15
16/// Stress testing orchestrator
17pub struct StressTester {
18    config: StressTestConfig,
19    repository: Arc<MemoryRepository>,
20    metrics: Arc<StressTestMetrics>,
21    connection_semaphore: Arc<Semaphore>,
22}
23
24/// Metrics collected during stress testing
25struct StressTestMetrics {
26    total_requests: AtomicU64,
27    successful_requests: AtomicU64,
28    failed_requests: AtomicU64,
29    current_connections: AtomicU64,
30    max_connections_reached: AtomicU64,
31    system_metrics: RwLock<SystemMetrics>,
32    breaking_point_found: AtomicBool,
33    breaking_point_connections: AtomicU64,
34}
35
/// Rolling CPU and memory readings gathered by the background monitor task.
#[derive(Debug, Default)]
struct SystemMetrics {
    /// Recent CPU readings (average over all cores), oldest first.
    cpu_samples: Vec<f32>,
    /// Recent used-memory readings as reported by `sysinfo`, oldest first.
    memory_samples: Vec<u64>,
    /// Highest CPU reading observed since the test started.
    max_cpu_usage: f32,
    /// Highest used-memory reading observed since the test started.
    max_memory_usage: u64,
}
43
44impl StressTester {
45    pub fn new(config: StressTestConfig, repository: Arc<MemoryRepository>) -> Self {
46        // Limit concurrent connections to avoid pool exhaustion
47        // Use 70% of max connections as per CLAUDE.md best practices
48        let max_concurrent = (config.max_connections as f64 * 0.7) as usize;
49
50        Self {
51            config,
52            repository,
53            metrics: Arc::new(StressTestMetrics {
54                total_requests: AtomicU64::new(0),
55                successful_requests: AtomicU64::new(0),
56                failed_requests: AtomicU64::new(0),
57                current_connections: AtomicU64::new(0),
58                max_connections_reached: AtomicU64::new(0),
59                system_metrics: RwLock::new(SystemMetrics::default()),
60                breaking_point_found: AtomicBool::new(false),
61                breaking_point_connections: AtomicU64::new(0),
62            }),
63            connection_semaphore: Arc::new(Semaphore::new(max_concurrent.max(10))),
64        }
65    }
66
67    /// Run stress test to find breaking points
68    pub async fn run_stress_test(&self) -> Result<PerformanceTestResult> {
69        info!(
70            "Starting stress test with max {} connections",
71            self.config.max_connections
72        );
73
74        let test_start = Utc::now();
75        let start_time = Instant::now();
76
77        // Start system monitoring
78        let monitor_handle = self.start_system_monitoring();
79
80        // Gradually increase load until breaking point
81        let result = self.find_breaking_point().await;
82
83        // Stop monitoring
84        self.metrics
85            .breaking_point_found
86            .store(true, Ordering::Relaxed);
87        monitor_handle.await?;
88
89        let test_end = Utc::now();
90        let duration = start_time.elapsed();
91
92        // Calculate final metrics
93        let metrics = self.calculate_metrics().await?;
94
95        // Check for stress test specific violations
96        let sla_violations = self.check_stress_violations(&metrics);
97
98        let result = PerformanceTestResult {
99            test_name: "Stress Test".to_string(),
100            test_type: TestType::Stress,
101            start_time: test_start,
102            end_time: test_end,
103            duration,
104            metrics,
105            sla_violations,
106            passed: result.is_ok(),
107        };
108
109        info!(
110            "Stress test completed. Breaking point: {} connections",
111            self.metrics
112                .breaking_point_connections
113                .load(Ordering::Relaxed)
114        );
115
116        Ok(result)
117    }
118
119    /// Find the system breaking point by gradually increasing load
120    async fn find_breaking_point(&self) -> Result<()> {
121        let mut current_load = 10;
122        let load_increment = 10;
123        let stabilization_time = Duration::from_secs(5);
124
125        while current_load <= self.config.max_connections {
126            info!("Testing with {} concurrent connections", current_load);
127
128            // Spawn connections
129            let mut handles = Vec::new();
130
131            for conn_id in 0..current_load {
132                let repository = Arc::clone(&self.repository);
133                let metrics = Arc::clone(&self.metrics);
134                let semaphore = Arc::clone(&self.connection_semaphore);
135                let should_stop = Arc::new(AtomicBool::new(false));
136                let stop_clone = Arc::clone(&should_stop);
137
138                let handle = tokio::spawn(async move {
139                    // Acquire semaphore permit to limit concurrent connections
140                    let _permit = match semaphore.acquire().await {
141                        Ok(permit) => permit,
142                        Err(_) => {
143                            debug!("Failed to acquire semaphore permit");
144                            return;
145                        }
146                    };
147
148                    metrics.current_connections.fetch_add(1, Ordering::Relaxed);
149
150                    // Update max connections if needed
151                    let current = metrics.current_connections.load(Ordering::Relaxed);
152                    let mut max = metrics.max_connections_reached.load(Ordering::Relaxed);
153                    while current > max {
154                        match metrics.max_connections_reached.compare_exchange(
155                            max,
156                            current,
157                            Ordering::Relaxed,
158                            Ordering::Relaxed,
159                        ) {
160                            Ok(_) => break,
161                            Err(x) => max = x,
162                        }
163                    }
164
165                    // Continuously make requests
166                    while !stop_clone.load(Ordering::Relaxed) {
167                        let request_start = Instant::now();
168
169                        let result = Self::stress_operation(&repository, conn_id).await;
170
171                        metrics.total_requests.fetch_add(1, Ordering::Relaxed);
172
173                        match result {
174                            Ok(_) => {
175                                metrics.successful_requests.fetch_add(1, Ordering::Relaxed);
176                            }
177                            Err(e) => {
178                                metrics.failed_requests.fetch_add(1, Ordering::Relaxed);
179                                debug!("Request failed: {}", e);
180                            }
181                        }
182
183                        // Small delay to prevent CPU spinning
184                        if request_start.elapsed() < Duration::from_millis(10) {
185                            time::sleep(Duration::from_millis(10)).await;
186                        }
187                    }
188
189                    metrics.current_connections.fetch_sub(1, Ordering::Relaxed);
190                });
191
192                handles.push((handle, should_stop));
193            }
194
195            // Let the system stabilize
196            time::sleep(stabilization_time).await;
197
198            // Check if system is under stress
199            if self.is_system_under_stress().await {
200                warn!("System under stress at {} connections", current_load);
201                self.metrics
202                    .breaking_point_found
203                    .store(true, Ordering::Relaxed);
204                self.metrics
205                    .breaking_point_connections
206                    .store(current_load as u64, Ordering::Relaxed);
207
208                // Stop all connections
209                for (_, should_stop) in &handles {
210                    should_stop.store(true, Ordering::Relaxed);
211                }
212
213                // Wait for all to complete
214                for (handle, _) in handles {
215                    handle.await?;
216                }
217
218                break;
219            }
220
221            // Check error rate
222            let total = self.metrics.total_requests.load(Ordering::Relaxed);
223            let failed = self.metrics.failed_requests.load(Ordering::Relaxed);
224
225            if total > 0 && (failed as f64 / total as f64) > 0.1 {
226                warn!("High error rate detected at {} connections", current_load);
227                self.metrics
228                    .breaking_point_found
229                    .store(true, Ordering::Relaxed);
230                self.metrics
231                    .breaking_point_connections
232                    .store(current_load as u64, Ordering::Relaxed);
233
234                // Stop all connections
235                for (_, should_stop) in &handles {
236                    should_stop.store(true, Ordering::Relaxed);
237                }
238
239                // Wait for all to complete
240                for (handle, _) in handles {
241                    handle.await?;
242                }
243
244                break;
245            }
246
247            // Stop current load level
248            for (_, should_stop) in &handles {
249                should_stop.store(true, Ordering::Relaxed);
250            }
251
252            // Wait for all to complete
253            for (handle, _) in handles {
254                handle.await?;
255            }
256
257            // Increase load
258            current_load += load_increment;
259        }
260
261        Ok(())
262    }
263
264    /// Perform a stress test operation
265    async fn stress_operation(repository: &Arc<MemoryRepository>, conn_id: usize) -> Result<()> {
266        use rand::Rng;
267
268        // Generate random value before any await
269        let operation = {
270            let mut rng = rand::thread_rng();
271            rng.gen_range(0..3)
272        };
273
274        // Heavy operations to stress the system
275        match operation {
276            0 => {
277                // Bulk write
278                for i in 0..10 {
279                    let content = format!("{}_{}_{}_{}", "x".repeat(250), conn_id, i, Utc::now());
280                    let request = CreateMemoryRequest {
281                        content,
282                        embedding: None,
283                        tier: Some(MemoryTier::Working),
284                        importance_score: Some(0.5),
285                        metadata: Some(
286                            serde_json::json!({"stress_test": true, "conn_id": conn_id}),
287                        ),
288                        parent_id: None,
289                        expires_at: None,
290                    };
291                    repository.create_memory(request).await?;
292                }
293            }
294            1 => {
295                // Complex search
296                let query_num = {
297                    let mut rng = rand::thread_rng();
298                    rng.gen_range(0..100)
299                };
300                let query = format!("stress test query {query_num}");
301                let search_request = SearchRequest {
302                    query_text: Some(query),
303                    query_embedding: None,
304                    search_type: None,
305                    hybrid_weights: None,
306                    tier: None,
307                    date_range: None,
308                    importance_range: None,
309                    metadata_filters: None,
310                    tags: None,
311                    limit: Some(100),
312                    offset: None,
313                    cursor: None,
314                    similarity_threshold: None,
315                    include_metadata: None,
316                    include_facets: None,
317                    ranking_boost: None,
318                    explain_score: None,
319                };
320                repository.search_memories_simple(search_request).await?;
321            }
322            2 => {
323                // Bulk read - attempt to read random UUIDs (most will fail, which is fine for stress testing)
324                for _ in 0..20 {
325                    let id = uuid::Uuid::new_v4();
326                    let _ = repository.get_memory(id).await;
327                }
328            }
329            _ => unreachable!(),
330        }
331
332        Ok(())
333    }
334
335    /// Check if system is under stress based on CPU and memory thresholds
336    async fn is_system_under_stress(&self) -> bool {
337        let metrics = self.metrics.system_metrics.read().await;
338
339        // Check CPU threshold
340        if metrics.max_cpu_usage > self.config.cpu_pressure_threshold as f32 {
341            return true;
342        }
343
344        // Check memory threshold
345        let mut sys = System::new();
346        sys.refresh_memory();
347
348        let total_memory = sys.total_memory();
349        let used_memory = sys.used_memory();
350        let memory_usage_percent = (used_memory as f64 / total_memory as f64) * 100.0;
351
352        memory_usage_percent > self.config.memory_pressure_threshold as f64
353    }
354
355    /// Start monitoring system metrics
356    fn start_system_monitoring(&self) -> tokio::task::JoinHandle<()> {
357        let metrics = Arc::clone(&self.metrics);
358
359        tokio::spawn(async move {
360            let mut sys = System::new();
361            let mut interval = time::interval(Duration::from_secs(1));
362
363            while !metrics.breaking_point_found.load(Ordering::Relaxed) {
364                interval.tick().await;
365
366                // Refresh system info
367                sys.refresh_cpu();
368                sys.refresh_memory();
369
370                // Get CPU usage - calculate average across all CPUs
371                let cpu_usage = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>()
372                    / sys.cpus().len() as f32;
373
374                // Get memory usage
375                let used_memory = sys.used_memory();
376
377                // Store samples
378                let mut system_metrics = metrics.system_metrics.write().await;
379                system_metrics.cpu_samples.push(cpu_usage);
380                system_metrics.memory_samples.push(used_memory);
381
382                // Update maximums
383                if cpu_usage > system_metrics.max_cpu_usage {
384                    system_metrics.max_cpu_usage = cpu_usage;
385                }
386
387                if used_memory > system_metrics.max_memory_usage {
388                    system_metrics.max_memory_usage = used_memory;
389                }
390
391                // Keep only last 60 samples
392                if system_metrics.cpu_samples.len() > 60 {
393                    system_metrics.cpu_samples.remove(0);
394                }
395                if system_metrics.memory_samples.len() > 60 {
396                    system_metrics.memory_samples.remove(0);
397                }
398            }
399        })
400    }
401
402    /// Calculate performance metrics from stress test
403    async fn calculate_metrics(&self) -> Result<PerformanceMetrics> {
404        let total_requests = self.metrics.total_requests.load(Ordering::Relaxed);
405        let successful_requests = self.metrics.successful_requests.load(Ordering::Relaxed);
406        let failed_requests = self.metrics.failed_requests.load(Ordering::Relaxed);
407
408        let error_rate = if total_requests > 0 {
409            (failed_requests as f64 / total_requests as f64) * 100.0
410        } else {
411            0.0
412        };
413
414        let system_metrics = self.metrics.system_metrics.read().await;
415
416        // Calculate averages
417        let cpu_usage_avg = if !system_metrics.cpu_samples.is_empty() {
418            system_metrics.cpu_samples.iter().sum::<f32>() / system_metrics.cpu_samples.len() as f32
419        } else {
420            0.0
421        } as f64;
422
423        let memory_usage_avg = if !system_metrics.memory_samples.is_empty() {
424            system_metrics.memory_samples.iter().sum::<u64>()
425                / system_metrics.memory_samples.len() as u64
426        } else {
427            0
428        } as f64;
429
430        Ok(PerformanceMetrics {
431            total_requests,
432            successful_requests,
433            failed_requests,
434            throughput_rps: 0.0, // Not relevant for stress test
435            latency_p50_ms: 0,
436            latency_p95_ms: 0,
437            latency_p99_ms: 0,
438            latency_max_ms: 0,
439            error_rate,
440            cpu_usage_avg,
441            memory_usage_avg,
442            cache_hit_ratio: 0.0,
443            db_connections_used: self.metrics.max_connections_reached.load(Ordering::Relaxed)
444                as u32,
445            network_bytes_sent: 0,
446            network_bytes_received: 0,
447        })
448    }
449
450    /// Check for stress test specific violations
451    fn check_stress_violations(&self, metrics: &PerformanceMetrics) -> Vec<super::SlaViolation> {
452        let mut violations = Vec::new();
453
454        // Check if breaking point was found too early
455        let breaking_point = self
456            .metrics
457            .breaking_point_connections
458            .load(Ordering::Relaxed);
459
460        if breaking_point > 0 && breaking_point < 100 {
461            violations.push(super::SlaViolation {
462                metric: "Breaking Point".to_string(),
463                threshold: 100.0,
464                actual_value: breaking_point as f64,
465                severity: super::ViolationSeverity::Critical,
466                timestamp: Utc::now(),
467            });
468        }
469
470        // Check error rate under stress
471        if metrics.error_rate > 10.0 {
472            violations.push(super::SlaViolation {
473                metric: "Stress Error Rate".to_string(),
474                threshold: 10.0,
475                actual_value: metrics.error_rate,
476                severity: super::ViolationSeverity::Warning,
477                timestamp: Utc::now(),
478            });
479        }
480
481        violations
482    }
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// `StressTestConfig::default()` must keep the expected stress-test defaults.
    #[test]
    fn test_stress_config_defaults() {
        let defaults = StressTestConfig::default();

        assert_eq!(
            (
                defaults.max_connections,
                defaults.memory_pressure_threshold,
                defaults.cpu_pressure_threshold,
            ),
            (10000, 80, 90)
        );
        assert!(!defaults.chaos_testing_enabled);
    }
}