// codex_memory/performance/stress_testing.rs
use super::{PerformanceMetrics, PerformanceTestResult, StressTestConfig, TestType};
4use crate::memory::models::{CreateMemoryRequest, SearchRequest};
5use crate::memory::{MemoryRepository, MemoryTier};
6use anyhow::Result;
7use chrono::Utc;
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use sysinfo::System;
12use tokio::sync::{RwLock, Semaphore};
13use tokio::time;
14use tracing::{debug, info, warn};
15
/// Drives a stress test against a [`MemoryRepository`], ramping up the number
/// of concurrent worker tasks until a breaking point is detected.
pub struct StressTester {
    // Test limits and thresholds (max connections, CPU/memory pressure, ...).
    config: StressTestConfig,
    // Repository under test; cloned (Arc) into every spawned worker task.
    repository: Arc<MemoryRepository>,
    // Counters and samples shared between workers, the monitor, and reporting.
    metrics: Arc<StressTestMetrics>,
    // Caps concurrently active workers at ~70% of `config.max_connections`.
    connection_semaphore: Arc<Semaphore>,
}
23
/// Counters and samples shared between worker tasks, the system-monitor task,
/// and the reporting code. All counters use relaxed atomics — they are
/// statistics, not synchronization.
struct StressTestMetrics {
    // Every attempted operation, success or failure.
    total_requests: AtomicU64,
    // Operations that returned Ok.
    successful_requests: AtomicU64,
    // Operations that returned Err.
    failed_requests: AtomicU64,
    // Workers currently inside their request loop.
    current_connections: AtomicU64,
    // High-water mark of `current_connections`.
    max_connections_reached: AtomicU64,
    // CPU/memory samples collected by the monitor task.
    system_metrics: RwLock<SystemMetrics>,
    // Set when a breaking point is found; also used as the monitor's stop flag.
    breaking_point_found: AtomicBool,
    // Load level (connection count) at which stress was first detected.
    breaking_point_connections: AtomicU64,
}
35
/// Rolling window (last 60 samples, one per second) of host CPU and memory
/// usage, plus the peaks observed over the whole test run.
#[derive(Debug, Default)]
struct SystemMetrics {
    // Average CPU usage across all cores, in percent, one entry per sample.
    cpu_samples: Vec<f32>,
    // Used memory as reported by sysinfo, one entry per sample.
    memory_samples: Vec<u64>,
    max_cpu_usage: f32,
    max_memory_usage: u64,
}
43
44impl StressTester {
45 pub fn new(config: StressTestConfig, repository: Arc<MemoryRepository>) -> Self {
46 let max_concurrent = (config.max_connections as f64 * 0.7) as usize;
49
50 Self {
51 config,
52 repository,
53 metrics: Arc::new(StressTestMetrics {
54 total_requests: AtomicU64::new(0),
55 successful_requests: AtomicU64::new(0),
56 failed_requests: AtomicU64::new(0),
57 current_connections: AtomicU64::new(0),
58 max_connections_reached: AtomicU64::new(0),
59 system_metrics: RwLock::new(SystemMetrics::default()),
60 breaking_point_found: AtomicBool::new(false),
61 breaking_point_connections: AtomicU64::new(0),
62 }),
63 connection_semaphore: Arc::new(Semaphore::new(max_concurrent.max(10))),
64 }
65 }
66
67 pub async fn run_stress_test(&self) -> Result<PerformanceTestResult> {
69 info!(
70 "Starting stress test with max {} connections",
71 self.config.max_connections
72 );
73
74 let test_start = Utc::now();
75 let start_time = Instant::now();
76
77 let monitor_handle = self.start_system_monitoring();
79
80 let result = self.find_breaking_point().await;
82
83 self.metrics
85 .breaking_point_found
86 .store(true, Ordering::Relaxed);
87 monitor_handle.await?;
88
89 let test_end = Utc::now();
90 let duration = start_time.elapsed();
91
92 let metrics = self.calculate_metrics().await?;
94
95 let sla_violations = self.check_stress_violations(&metrics);
97
98 let result = PerformanceTestResult {
99 test_name: "Stress Test".to_string(),
100 test_type: TestType::Stress,
101 start_time: test_start,
102 end_time: test_end,
103 duration,
104 metrics,
105 sla_violations,
106 passed: result.is_ok(),
107 };
108
109 info!(
110 "Stress test completed. Breaking point: {} connections",
111 self.metrics
112 .breaking_point_connections
113 .load(Ordering::Relaxed)
114 );
115
116 Ok(result)
117 }
118
119 async fn find_breaking_point(&self) -> Result<()> {
121 let mut current_load = 10;
122 let load_increment = 10;
123 let stabilization_time = Duration::from_secs(5);
124
125 while current_load <= self.config.max_connections {
126 info!("Testing with {} concurrent connections", current_load);
127
128 let mut handles = Vec::new();
130
131 for conn_id in 0..current_load {
132 let repository = Arc::clone(&self.repository);
133 let metrics = Arc::clone(&self.metrics);
134 let semaphore = Arc::clone(&self.connection_semaphore);
135 let should_stop = Arc::new(AtomicBool::new(false));
136 let stop_clone = Arc::clone(&should_stop);
137
138 let handle = tokio::spawn(async move {
139 let _permit = match semaphore.acquire().await {
141 Ok(permit) => permit,
142 Err(_) => {
143 debug!("Failed to acquire semaphore permit");
144 return;
145 }
146 };
147
148 metrics.current_connections.fetch_add(1, Ordering::Relaxed);
149
150 let current = metrics.current_connections.load(Ordering::Relaxed);
152 let mut max = metrics.max_connections_reached.load(Ordering::Relaxed);
153 while current > max {
154 match metrics.max_connections_reached.compare_exchange(
155 max,
156 current,
157 Ordering::Relaxed,
158 Ordering::Relaxed,
159 ) {
160 Ok(_) => break,
161 Err(x) => max = x,
162 }
163 }
164
165 while !stop_clone.load(Ordering::Relaxed) {
167 let request_start = Instant::now();
168
169 let result = Self::stress_operation(&repository, conn_id).await;
170
171 metrics.total_requests.fetch_add(1, Ordering::Relaxed);
172
173 match result {
174 Ok(_) => {
175 metrics.successful_requests.fetch_add(1, Ordering::Relaxed);
176 }
177 Err(e) => {
178 metrics.failed_requests.fetch_add(1, Ordering::Relaxed);
179 debug!("Request failed: {}", e);
180 }
181 }
182
183 if request_start.elapsed() < Duration::from_millis(10) {
185 time::sleep(Duration::from_millis(10)).await;
186 }
187 }
188
189 metrics.current_connections.fetch_sub(1, Ordering::Relaxed);
190 });
191
192 handles.push((handle, should_stop));
193 }
194
195 time::sleep(stabilization_time).await;
197
198 if self.is_system_under_stress().await {
200 warn!("System under stress at {} connections", current_load);
201 self.metrics
202 .breaking_point_found
203 .store(true, Ordering::Relaxed);
204 self.metrics
205 .breaking_point_connections
206 .store(current_load as u64, Ordering::Relaxed);
207
208 for (_, should_stop) in &handles {
210 should_stop.store(true, Ordering::Relaxed);
211 }
212
213 for (handle, _) in handles {
215 handle.await?;
216 }
217
218 break;
219 }
220
221 let total = self.metrics.total_requests.load(Ordering::Relaxed);
223 let failed = self.metrics.failed_requests.load(Ordering::Relaxed);
224
225 if total > 0 && (failed as f64 / total as f64) > 0.1 {
226 warn!("High error rate detected at {} connections", current_load);
227 self.metrics
228 .breaking_point_found
229 .store(true, Ordering::Relaxed);
230 self.metrics
231 .breaking_point_connections
232 .store(current_load as u64, Ordering::Relaxed);
233
234 for (_, should_stop) in &handles {
236 should_stop.store(true, Ordering::Relaxed);
237 }
238
239 for (handle, _) in handles {
241 handle.await?;
242 }
243
244 break;
245 }
246
247 for (_, should_stop) in &handles {
249 should_stop.store(true, Ordering::Relaxed);
250 }
251
252 for (handle, _) in handles {
254 handle.await?;
255 }
256
257 current_load += load_increment;
259 }
260
261 Ok(())
262 }
263
264 async fn stress_operation(repository: &Arc<MemoryRepository>, conn_id: usize) -> Result<()> {
266 use rand::Rng;
267
268 let operation = {
270 let mut rng = rand::thread_rng();
271 rng.gen_range(0..3)
272 };
273
274 match operation {
276 0 => {
277 for i in 0..10 {
279 let content = format!("{}_{}_{}_{}", "x".repeat(250), conn_id, i, Utc::now());
280 let request = CreateMemoryRequest {
281 content,
282 embedding: None,
283 tier: Some(MemoryTier::Working),
284 importance_score: Some(0.5),
285 metadata: Some(
286 serde_json::json!({"stress_test": true, "conn_id": conn_id}),
287 ),
288 parent_id: None,
289 expires_at: None,
290 };
291 repository.create_memory(request).await?;
292 }
293 }
294 1 => {
295 let query_num = {
297 let mut rng = rand::thread_rng();
298 rng.gen_range(0..100)
299 };
300 let query = format!("stress test query {query_num}");
301 let search_request = SearchRequest {
302 query_text: Some(query),
303 query_embedding: None,
304 search_type: None,
305 hybrid_weights: None,
306 tier: None,
307 date_range: None,
308 importance_range: None,
309 metadata_filters: None,
310 tags: None,
311 limit: Some(100),
312 offset: None,
313 cursor: None,
314 similarity_threshold: None,
315 include_metadata: None,
316 include_facets: None,
317 ranking_boost: None,
318 explain_score: None,
319 };
320 repository.search_memories_simple(search_request).await?;
321 }
322 2 => {
323 for _ in 0..20 {
325 let id = uuid::Uuid::new_v4();
326 let _ = repository.get_memory(id).await;
327 }
328 }
329 _ => unreachable!(),
330 }
331
332 Ok(())
333 }
334
335 async fn is_system_under_stress(&self) -> bool {
337 let metrics = self.metrics.system_metrics.read().await;
338
339 if metrics.max_cpu_usage > self.config.cpu_pressure_threshold as f32 {
341 return true;
342 }
343
344 let mut sys = System::new();
346 sys.refresh_memory();
347
348 let total_memory = sys.total_memory();
349 let used_memory = sys.used_memory();
350 let memory_usage_percent = (used_memory as f64 / total_memory as f64) * 100.0;
351
352 memory_usage_percent > self.config.memory_pressure_threshold as f64
353 }
354
355 fn start_system_monitoring(&self) -> tokio::task::JoinHandle<()> {
357 let metrics = Arc::clone(&self.metrics);
358
359 tokio::spawn(async move {
360 let mut sys = System::new();
361 let mut interval = time::interval(Duration::from_secs(1));
362
363 while !metrics.breaking_point_found.load(Ordering::Relaxed) {
364 interval.tick().await;
365
366 sys.refresh_cpu();
368 sys.refresh_memory();
369
370 let cpu_usage = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>()
372 / sys.cpus().len() as f32;
373
374 let used_memory = sys.used_memory();
376
377 let mut system_metrics = metrics.system_metrics.write().await;
379 system_metrics.cpu_samples.push(cpu_usage);
380 system_metrics.memory_samples.push(used_memory);
381
382 if cpu_usage > system_metrics.max_cpu_usage {
384 system_metrics.max_cpu_usage = cpu_usage;
385 }
386
387 if used_memory > system_metrics.max_memory_usage {
388 system_metrics.max_memory_usage = used_memory;
389 }
390
391 if system_metrics.cpu_samples.len() > 60 {
393 system_metrics.cpu_samples.remove(0);
394 }
395 if system_metrics.memory_samples.len() > 60 {
396 system_metrics.memory_samples.remove(0);
397 }
398 }
399 })
400 }
401
402 async fn calculate_metrics(&self) -> Result<PerformanceMetrics> {
404 let total_requests = self.metrics.total_requests.load(Ordering::Relaxed);
405 let successful_requests = self.metrics.successful_requests.load(Ordering::Relaxed);
406 let failed_requests = self.metrics.failed_requests.load(Ordering::Relaxed);
407
408 let error_rate = if total_requests > 0 {
409 (failed_requests as f64 / total_requests as f64) * 100.0
410 } else {
411 0.0
412 };
413
414 let system_metrics = self.metrics.system_metrics.read().await;
415
416 let cpu_usage_avg = if !system_metrics.cpu_samples.is_empty() {
418 system_metrics.cpu_samples.iter().sum::<f32>() / system_metrics.cpu_samples.len() as f32
419 } else {
420 0.0
421 } as f64;
422
423 let memory_usage_avg = if !system_metrics.memory_samples.is_empty() {
424 system_metrics.memory_samples.iter().sum::<u64>()
425 / system_metrics.memory_samples.len() as u64
426 } else {
427 0
428 } as f64;
429
430 Ok(PerformanceMetrics {
431 total_requests,
432 successful_requests,
433 failed_requests,
434 throughput_rps: 0.0, latency_p50_ms: 0,
436 latency_p95_ms: 0,
437 latency_p99_ms: 0,
438 latency_max_ms: 0,
439 error_rate,
440 cpu_usage_avg,
441 memory_usage_avg,
442 cache_hit_ratio: 0.0,
443 db_connections_used: self.metrics.max_connections_reached.load(Ordering::Relaxed)
444 as u32,
445 network_bytes_sent: 0,
446 network_bytes_received: 0,
447 })
448 }
449
450 fn check_stress_violations(&self, metrics: &PerformanceMetrics) -> Vec<super::SlaViolation> {
452 let mut violations = Vec::new();
453
454 let breaking_point = self
456 .metrics
457 .breaking_point_connections
458 .load(Ordering::Relaxed);
459
460 if breaking_point > 0 && breaking_point < 100 {
461 violations.push(super::SlaViolation {
462 metric: "Breaking Point".to_string(),
463 threshold: 100.0,
464 actual_value: breaking_point as f64,
465 severity: super::ViolationSeverity::Critical,
466 timestamp: Utc::now(),
467 });
468 }
469
470 if metrics.error_rate > 10.0 {
472 violations.push(super::SlaViolation {
473 metric: "Stress Error Rate".to_string(),
474 threshold: 10.0,
475 actual_value: metrics.error_rate,
476 severity: super::ViolationSeverity::Warning,
477 timestamp: Utc::now(),
478 });
479 }
480
481 violations
482 }
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stress configuration must match the documented limits.
    #[test]
    fn test_stress_config_defaults() {
        let cfg = StressTestConfig::default();

        assert!(!cfg.chaos_testing_enabled);
        assert_eq!(cfg.cpu_pressure_threshold, 90);
        assert_eq!(cfg.memory_pressure_threshold, 80);
        assert_eq!(cfg.max_connections, 10_000);
    }
}
497}