1use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, Instant};
13
14use crate::error::Result;
15
/// Tuning knobs for a single load-test run.
#[derive(Debug, Clone)]
pub struct LoadTestConfig {
    /// Number of simulated concurrent users (one spawned worker task each).
    pub concurrent_users: u32,
    /// Window over which workers are started; start times are spread evenly
    /// across it.
    pub ramp_up: Duration,
    /// Steady-state run time measured after the ramp-up window.
    pub duration: Duration,
    /// Optional cap on requests issued by each worker; `None` = unlimited.
    pub requests_per_user: Option<u32>,
    /// Optional aggregate requests-per-second target, divided evenly across
    /// all workers; `None` = run as fast as possible.
    pub target_rps: Option<f64>,
}
30
31impl Default for LoadTestConfig {
32 fn default() -> Self {
33 Self {
34 concurrent_users: 10,
35 ramp_up: Duration::from_secs(10),
36 duration: Duration::from_secs(60),
37 requests_per_user: None,
38 target_rps: None,
39 }
40 }
41}
42
43impl LoadTestConfig {
44 #[must_use]
46 pub fn light() -> Self {
47 Self {
48 concurrent_users: 5,
49 ramp_up: Duration::from_secs(5),
50 duration: Duration::from_secs(30),
51 ..Default::default()
52 }
53 }
54
55 #[must_use]
57 pub fn moderate() -> Self {
58 Self {
59 concurrent_users: 50,
60 ramp_up: Duration::from_secs(30),
61 duration: Duration::from_secs(120),
62 ..Default::default()
63 }
64 }
65
66 #[must_use]
68 pub fn heavy() -> Self {
69 Self {
70 concurrent_users: 200,
71 ramp_up: Duration::from_secs(60),
72 duration: Duration::from_secs(300),
73 ..Default::default()
74 }
75 }
76
77 #[must_use]
79 pub fn quick() -> Self {
80 Self {
81 concurrent_users: 4,
82 ramp_up: Duration::from_millis(100),
83 duration: Duration::from_millis(500),
84 requests_per_user: Some(10),
85 target_rps: None,
86 }
87 }
88}
89
/// Counters shared by all worker tasks during a run.
struct LoadMetrics {
    /// Every request issued, successful or not.
    total_requests: AtomicU64,
    /// Requests whose handler reported success.
    successful: AtomicU64,
    /// Requests whose handler reported failure.
    failed: AtomicU64,
    /// Latency samples in microseconds, recorded for successful requests only.
    latencies_us: Mutex<Vec<u64>>,
}
97
98impl Default for LoadMetrics {
99 fn default() -> Self {
100 Self {
101 total_requests: AtomicU64::new(0),
102 successful: AtomicU64::new(0),
103 failed: AtomicU64::new(0),
104 latencies_us: Mutex::new(Vec::new()),
105 }
106 }
107}
108
109impl LoadMetrics {
110 fn record_success(&self, latency_us: u64) {
111 self.total_requests.fetch_add(1, Ordering::Relaxed);
112 self.successful.fetch_add(1, Ordering::Relaxed);
113 if let Ok(mut latencies) = self.latencies_us.lock() {
114 latencies.push(latency_us);
115 }
116 }
117
118 fn record_failure(&self) {
119 self.total_requests.fetch_add(1, Ordering::Relaxed);
120 self.failed.fetch_add(1, Ordering::Relaxed);
121 }
122
123 fn get_latencies(&self) -> Vec<u64> {
124 self.latencies_us
125 .lock()
126 .map(|l| l.clone())
127 .unwrap_or_default()
128 }
129}
130
/// Per-request callback: invoked as `handler(user_id, request_id)` and
/// returns `true` when the simulated request succeeded.
pub type RequestHandler = Arc<dyn Fn(u32, u64) -> bool + Send + Sync>;
134
/// Drives a configurable number of concurrent worker tasks against a request
/// handler and aggregates the outcomes into a `LoadTestReport`.
pub struct LoadTester {
    /// Parameters controlling concurrency, pacing, and run duration.
    config: LoadTestConfig,
    /// Optional custom request handler; when `None`, a built-in simulated
    /// handler (100 µs sleep, every 100th request fails) is used.
    handler: Option<RequestHandler>,
}
140
141impl LoadTester {
142 #[must_use]
144 pub const fn new(config: LoadTestConfig) -> Self {
145 Self {
146 config,
147 handler: None,
148 }
149 }
150
151 #[must_use]
155 pub fn with_handler(mut self, handler: RequestHandler) -> Self {
156 self.handler = Some(handler);
157 self
158 }
159
160 #[allow(clippy::too_many_lines)]
170 pub async fn run(&self) -> Result<LoadTestReport> {
171 tracing::info!(
172 users = self.config.concurrent_users,
173 duration = ?self.config.duration,
174 ramp_up = ?self.config.ramp_up,
175 "starting load test"
176 );
177
178 let metrics = Arc::new(LoadMetrics::default());
179 let start_time = Instant::now();
180 let test_end = start_time + self.config.ramp_up + self.config.duration;
181
182 let mut handles = Vec::with_capacity(self.config.concurrent_users as usize);
184 let ramp_delay = if self.config.concurrent_users > 1 {
185 self.config.ramp_up.as_millis() as u64
186 / (u64::from(self.config.concurrent_users) - 1).max(1)
187 } else {
188 0
189 };
190
191 let concurrent_users = self.config.concurrent_users;
193
194 for user_id in 0..self.config.concurrent_users {
195 let metrics = Arc::clone(&metrics);
196 let requests_per_user = self.config.requests_per_user;
197 let target_rps = self.config.target_rps;
198 let request_handler = self.handler.clone();
199
200 let worker_start_delay = Duration::from_millis(ramp_delay * u64::from(user_id));
202
203 handles.push(tokio::spawn(async move {
204 tokio::time::sleep(worker_start_delay).await;
206
207 let mut request_id = 0u64;
208 let interval = target_rps
209 .map(|rps| Duration::from_secs_f64(1.0 / rps * f64::from(concurrent_users)));
210
211 loop {
212 if Instant::now() >= test_end {
214 break;
215 }
216 if requests_per_user.is_some_and(|max| request_id >= u64::from(max)) {
217 break;
218 }
219
220 let req_start = Instant::now();
222 let success = if let Some(ref h) = request_handler {
223 h(user_id, request_id)
224 } else {
225 tokio::time::sleep(Duration::from_micros(100)).await;
227 !request_id.is_multiple_of(100) };
229 let latency_us = req_start.elapsed().as_micros() as u64;
230
231 if success {
232 metrics.record_success(latency_us);
233 } else {
234 metrics.record_failure();
235 }
236
237 request_id += 1;
238
239 if let Some(delay) = interval {
241 tokio::time::sleep(delay).await;
242 }
243 }
244 }));
245 }
246
247 for handle in handles {
249 let _ = handle.await;
250 }
251
252 let elapsed = start_time.elapsed();
253
254 let total_requests = metrics.total_requests.load(Ordering::Relaxed);
256 let successful = metrics.successful.load(Ordering::Relaxed);
257 let failed = metrics.failed.load(Ordering::Relaxed);
258
259 let mut latencies = metrics.get_latencies();
260 latencies.sort_unstable();
261
262 let (p50, p95, p99) = if latencies.is_empty() {
263 (0, 0, 0)
264 } else {
265 (
266 percentile(&latencies, 50),
267 percentile(&latencies, 95),
268 percentile(&latencies, 99),
269 )
270 };
271
272 let throughput_rps = if elapsed.as_secs_f64() > 0.0 {
273 total_requests as f64 / elapsed.as_secs_f64()
274 } else {
275 0.0
276 };
277
278 let error_rate = if total_requests > 0 {
279 failed as f64 / total_requests as f64
280 } else {
281 0.0
282 };
283
284 tracing::info!(
285 total = total_requests,
286 successful = successful,
287 failed = failed,
288 throughput_rps = format!("{throughput_rps:.2}"),
289 p50_us = p50,
290 p95_us = p95,
291 p99_us = p99,
292 "load test completed"
293 );
294
295 Ok(LoadTestReport {
296 total_requests,
297 successful,
298 failed,
299 latency_p50_us: p50,
300 latency_p95_us: p95,
301 latency_p99_us: p99,
302 throughput_rps,
303 error_rate,
304 })
305 }
306
307 #[must_use]
309 pub const fn config(&self) -> &LoadTestConfig {
310 &self.config
311 }
312}
313
314impl Default for LoadTester {
315 fn default() -> Self {
316 Self::new(LoadTestConfig::default())
317 }
318}
319
/// Nearest-rank percentile of a non-decreasing slice, clamped to the last
/// element; returns `0` for an empty slice.
fn percentile(sorted: &[u64], p: usize) -> u64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0;
    };
    sorted[(sorted.len() * p / 100).min(last)]
}
328
/// Aggregated results of a completed load-test run.
#[derive(Debug, Clone)]
pub struct LoadTestReport {
    /// Total number of requests issued.
    pub total_requests: u64,
    /// Requests that reported success.
    pub successful: u64,
    /// Requests that reported failure.
    pub failed: u64,
    /// 50th-percentile latency of successful requests, in microseconds.
    pub latency_p50_us: u64,
    /// 95th-percentile latency, in microseconds.
    pub latency_p95_us: u64,
    /// 99th-percentile latency, in microseconds.
    pub latency_p99_us: u64,
    /// Observed throughput over the whole run (ramp-up included).
    pub throughput_rps: f64,
    /// `failed / total_requests`, or `0.0` when no requests ran.
    pub error_rate: f64,
}
349
350impl LoadTestReport {
351 #[must_use]
353 pub fn passed(&self) -> bool {
354 self.error_rate < 0.01
355 }
356
357 #[must_use]
359 pub fn success_rate(&self) -> f64 {
360 if self.total_requests > 0 {
361 self.successful as f64 / self.total_requests as f64
362 } else {
363 0.0
364 }
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// Preset constructors expose the expected concurrency levels.
    #[test]
    fn test_load_test_config_presets() {
        assert_eq!(LoadTestConfig::light().concurrent_users, 5);
        assert_eq!(LoadTestConfig::moderate().concurrent_users, 50);
        assert_eq!(LoadTestConfig::heavy().concurrent_users, 200);

        let quick = LoadTestConfig::quick();
        assert_eq!(quick.concurrent_users, 4);
        assert_eq!(quick.requests_per_user, Some(10));
    }

    /// Built-in handler fails request id 0 (every 100th) for each of the
    /// 4 users: 4 x 10 = 40 requests, 4 failures.
    #[tokio::test]
    async fn test_load_tester_run_quick() {
        let report = LoadTester::new(LoadTestConfig::quick())
            .run()
            .await
            .expect("quick load test should complete");

        assert_eq!(report.total_requests, 40);
        assert_eq!(report.failed, 4);
        assert_eq!(report.successful, 36);
        assert!(report.throughput_rps > 0.0);
        assert!(report.latency_p50_us > 0);
    }

    /// Custom handler fails on ids 0 and 5 for each of the 2 users.
    #[tokio::test]
    async fn test_load_tester_with_custom_handler() {
        let handler: RequestHandler = Arc::new(|_user_id, request_id| request_id % 5 != 0);

        let config = LoadTestConfig {
            concurrent_users: 2,
            ramp_up: Duration::from_millis(10),
            duration: Duration::from_millis(100),
            requests_per_user: Some(10),
            target_rps: None,
        };

        let report = LoadTester::new(config)
            .with_handler(handler)
            .run()
            .await
            .unwrap();

        assert_eq!(report.total_requests, 20);
        assert_eq!(report.failed, 4);
        assert_eq!(report.successful, 16);
    }

    #[tokio::test]
    async fn test_load_tester_all_success() {
        let always_ok: RequestHandler = Arc::new(|_, _| true);

        let config = LoadTestConfig {
            concurrent_users: 2,
            ramp_up: Duration::from_millis(10),
            duration: Duration::from_millis(100),
            requests_per_user: Some(5),
            target_rps: None,
        };

        let report = LoadTester::new(config)
            .with_handler(always_ok)
            .run()
            .await
            .unwrap();

        assert_eq!(report.total_requests, 10);
        assert_eq!(report.failed, 0);
        assert_eq!(report.successful, 10);
        assert!(report.passed());
        assert!((report.success_rate() - 1.0).abs() < 0.001);
        assert!((report.error_rate - 0.0).abs() < 0.001);
    }

    #[tokio::test]
    async fn test_load_tester_all_failure() {
        let always_err: RequestHandler = Arc::new(|_, _| false);

        let config = LoadTestConfig {
            concurrent_users: 2,
            ramp_up: Duration::from_millis(10),
            duration: Duration::from_millis(100),
            requests_per_user: Some(5),
            target_rps: None,
        };

        let report = LoadTester::new(config)
            .with_handler(always_err)
            .run()
            .await
            .unwrap();

        assert_eq!(report.total_requests, 10);
        assert_eq!(report.failed, 10);
        assert_eq!(report.successful, 0);
        assert!(!report.passed());
        assert!((report.success_rate() - 0.0).abs() < 0.001);
        assert!((report.error_rate - 1.0).abs() < 0.001);
    }

    /// 0.5% error rate is under the 1% pass threshold.
    #[test]
    fn test_load_test_report_passed() {
        let report = LoadTestReport {
            total_requests: 1000,
            successful: 995,
            failed: 5,
            latency_p50_us: 1000,
            latency_p95_us: 5000,
            latency_p99_us: 10000,
            throughput_rps: 100.0,
            error_rate: 0.005,
        };

        assert!(report.passed());
        assert!((report.success_rate() - 0.995).abs() < 0.001);
    }

    /// 10% error rate exceeds the 1% pass threshold.
    #[test]
    fn test_load_test_report_failed() {
        let report = LoadTestReport {
            total_requests: 100,
            successful: 90,
            failed: 10,
            latency_p50_us: 1000,
            latency_p95_us: 5000,
            latency_p99_us: 10000,
            throughput_rps: 100.0,
            error_rate: 0.10,
        };

        assert!(!report.passed());
        assert!((report.success_rate() - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_percentile_calculation() {
        let sorted: Vec<u64> = (1..=10).collect();

        assert_eq!(percentile(&sorted, 0), 1);
        assert_eq!(percentile(&sorted, 50), 6);
        assert_eq!(percentile(&sorted, 90), 10);
        assert_eq!(percentile(&sorted, 100), 10);
    }

    #[test]
    fn test_percentile_empty() {
        assert_eq!(percentile(&[], 50), 0);
    }

    /// Hammers the shared metrics from 10 OS threads and checks that the
    /// atomic counters and latency samples add up exactly.
    #[test]
    fn test_load_metrics_thread_safety() {
        let metrics = Arc::new(LoadMetrics::default());

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let m = Arc::clone(&metrics);
                std::thread::spawn(move || {
                    for i in 0u64..100 {
                        if i % 10 == 0 {
                            m.record_failure();
                        } else {
                            m.record_success(i * 100);
                        }
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(metrics.total_requests.load(Ordering::Relaxed), 1000);
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
        assert_eq!(metrics.successful.load(Ordering::Relaxed), 900);
        assert_eq!(metrics.get_latencies().len(), 900);
    }
}