// duende_test/load.rs

1//! Load testing for daemons.
2//!
3//! # Toyota Way: Heijunka (平準化)
4//! Level loading to understand capacity limits.
5//!
6//! # Implementation
7//! Uses concurrent workers via tokio to simulate load.
8//! Collects latency metrics and computes percentiles.
9
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, Instant};
13
14use crate::error::Result;
15
/// Load test configuration.
#[derive(Debug, Clone)]
pub struct LoadTestConfig {
    /// Number of concurrent users/workers.
    pub concurrent_users: u32,
    /// Ramp-up duration (time to reach full concurrency).
    pub ramp_up: Duration,
    /// Test duration (after ramp-up completes).
    pub duration: Duration,
    /// Requests per user (if set, stops after this many requests per user).
    pub requests_per_user: Option<u32>,
    /// Target requests per second (rate limiting).
    pub target_rps: Option<f64>,
}

impl Default for LoadTestConfig {
    /// Baseline configuration: 10 users, 10s ramp-up, 60s steady-state,
    /// no per-user request cap, no rate limit.
    fn default() -> Self {
        Self {
            concurrent_users: 10,
            ramp_up: Duration::from_secs(10),
            duration: Duration::from_secs(60),
            requests_per_user: None,
            target_rps: None,
        }
    }
}

impl LoadTestConfig {
    /// Builds a preset from a concurrency level and second-granularity
    /// timings, inheriting the request cap and rate limit from `default()`.
    fn preset(concurrent_users: u32, ramp_up_secs: u64, duration_secs: u64) -> Self {
        Self {
            concurrent_users,
            ramp_up: Duration::from_secs(ramp_up_secs),
            duration: Duration::from_secs(duration_secs),
            ..Self::default()
        }
    }

    /// Creates a light load test config.
    #[must_use]
    pub fn light() -> Self {
        Self::preset(5, 5, 30)
    }

    /// Creates a moderate load test config.
    #[must_use]
    pub fn moderate() -> Self {
        Self::preset(50, 30, 120)
    }

    /// Creates a heavy load test config.
    #[must_use]
    pub fn heavy() -> Self {
        Self::preset(200, 60, 300)
    }

    /// Creates a quick config for testing (short durations).
    #[must_use]
    pub fn quick() -> Self {
        Self {
            concurrent_users: 4,
            ramp_up: Duration::from_millis(100),
            duration: Duration::from_millis(500),
            requests_per_user: Some(10),
            ..Self::default()
        }
    }
}
89
/// Shared metrics for concurrent load test workers.
///
/// Counters are lock-free atomics; per-request latency samples are kept
/// under a mutex so percentiles can be computed after the run.
#[derive(Default)]
struct LoadMetrics {
    /// Total requests attempted (successes + failures).
    total_requests: AtomicU64,
    /// Requests that completed successfully.
    successful: AtomicU64,
    /// Requests that failed.
    failed: AtomicU64,
    /// Latency samples in microseconds (successful requests only).
    latencies_us: Mutex<Vec<u64>>,
}

impl LoadMetrics {
    /// Records a successful request and its latency in microseconds.
    ///
    /// A poisoned mutex (some worker panicked while holding the lock) is
    /// recovered via `PoisonError::into_inner`: a plain `Vec<u64>` has no
    /// invariants that poisoning could break, and silently dropping the
    /// sample (the previous behavior) would skew the percentiles.
    fn record_success(&self, latency_us: u64) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.successful.fetch_add(1, Ordering::Relaxed);
        self.latencies_us
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(latency_us);
    }

    /// Records a failed request (no latency sample is retained).
    fn record_failure(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.failed.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns a snapshot of all recorded latency samples.
    ///
    /// Recovers from mutex poisoning instead of returning an empty vec,
    /// so a panicked worker cannot erase already-collected data.
    fn get_latencies(&self) -> Vec<u64> {
        self.latencies_us
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}
130
/// Request handler function type.
/// Takes a user_id and request_id, returns success (true) or failure (false).
///
/// The `Send + Sync` bounds let one handler be shared (via `Arc`) across
/// all spawned tokio workers.
pub type RequestHandler = Arc<dyn Fn(u32, u64) -> bool + Send + Sync>;

/// Load tester for daemons.
pub struct LoadTester {
    // Concurrency, durations, request cap, and rate limit for the run.
    config: LoadTestConfig,
    // Optional custom request handler; when `None`, `run` falls back to a
    // built-in simulated workload (~100us sleep, 1% failure rate).
    handler: Option<RequestHandler>,
}
140
impl LoadTester {
    /// Creates a new load tester.
    ///
    /// No handler is attached; `run` uses the built-in simulated workload
    /// unless [`Self::with_handler`] is called first.
    #[must_use]
    pub const fn new(config: LoadTestConfig) -> Self {
        Self {
            config,
            handler: None,
        }
    }

    /// Sets a custom request handler for the load test.
    ///
    /// The handler receives (user_id, request_id) and returns true for success.
    ///
    /// NOTE(review): the handler is invoked synchronously inside an async
    /// task, so a long-blocking handler will stall its tokio worker thread.
    #[must_use]
    pub fn with_handler(mut self, handler: RequestHandler) -> Self {
        self.handler = Some(handler);
        self
    }

    /// Runs the load test with concurrent workers.
    ///
    /// # Load Test Phases (Toyota Way: Heijunka)
    /// 1. Ramp-up: Gradually spawn workers to avoid thundering herd
    /// 2. Steady-state: All workers active, collecting metrics
    /// 3. Cool-down: Workers complete, aggregate results
    ///
    /// # Errors
    /// Returns an error if the test infrastructure fails.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&self) -> Result<LoadTestReport> {
        tracing::info!(
            users = self.config.concurrent_users,
            duration = ?self.config.duration,
            ramp_up = ?self.config.ramp_up,
            "starting load test"
        );

        let metrics = Arc::new(LoadMetrics::default());
        let start_time = Instant::now();
        // Absolute deadline: every worker stops once ramp-up + steady-state
        // wall-clock time has elapsed, regardless of when it started.
        let test_end = start_time + self.config.ramp_up + self.config.duration;

        // Spawn concurrent workers
        let mut handles = Vec::with_capacity(self.config.concurrent_users as usize);
        // Per-worker start stagger in milliseconds, chosen so the last worker
        // starts right at the end of the ramp-up window. Integer division: a
        // ramp-up shorter than (users - 1) ms collapses to a 0ms stagger.
        // The u128 -> u64 cast truncates only for absurdly long durations.
        let ramp_delay = if self.config.concurrent_users > 1 {
            self.config.ramp_up.as_millis() as u64
                / (u64::from(self.config.concurrent_users) - 1).max(1)
        } else {
            0
        };

        // Copy config values for worker closures
        let concurrent_users = self.config.concurrent_users;

        for user_id in 0..self.config.concurrent_users {
            let metrics = Arc::clone(&metrics);
            let requests_per_user = self.config.requests_per_user;
            let target_rps = self.config.target_rps;
            let request_handler = self.handler.clone();

            // Calculate when this worker should start (ramp-up)
            let worker_start_delay = Duration::from_millis(ramp_delay * u64::from(user_id));

            handles.push(tokio::spawn(async move {
                // Wait for ramp-up
                tokio::time::sleep(worker_start_delay).await;

                let mut request_id = 0u64;
                // Per-worker pacing: the aggregate target RPS is split across
                // workers, so each worker waits (users / rps) seconds between
                // requests. The sleep is added AFTER the request, so the
                // achieved rate sits slightly below target when requests are
                // slow.
                let interval = target_rps
                    .map(|rps| Duration::from_secs_f64(1.0 / rps * f64::from(concurrent_users)));

                loop {
                    // Check termination conditions
                    if Instant::now() >= test_end {
                        break;
                    }
                    if requests_per_user.is_some_and(|max| request_id >= u64::from(max)) {
                        break;
                    }

                    // Execute request
                    let req_start = Instant::now();
                    let success = if let Some(ref h) = request_handler {
                        h(user_id, request_id)
                    } else {
                        // Default: simulate 100us work with 1% failure rate
                        tokio::time::sleep(Duration::from_micros(100)).await;
                        !request_id.is_multiple_of(100) // 1% failure
                    };
                    // Truncating u128 -> u64 cast; safe for realistic latencies.
                    let latency_us = req_start.elapsed().as_micros() as u64;

                    if success {
                        metrics.record_success(latency_us);
                    } else {
                        metrics.record_failure();
                    }

                    request_id += 1;

                    // Rate limiting
                    if let Some(delay) = interval {
                        tokio::time::sleep(delay).await;
                    }
                }
            }));
        }

        // Wait for all workers
        for handle in handles {
            // A panicked worker is deliberately ignored: whatever it recorded
            // before panicking is already in the shared `metrics`.
            let _ = handle.await;
        }

        // Note: elapsed includes the ramp-up phase, so throughput is averaged
        // over the full wall-clock run, not just steady-state.
        let elapsed = start_time.elapsed();

        // Compute report
        let total_requests = metrics.total_requests.load(Ordering::Relaxed);
        let successful = metrics.successful.load(Ordering::Relaxed);
        let failed = metrics.failed.load(Ordering::Relaxed);

        // Percentiles require an ascending sort; order of arrival is irrelevant.
        let mut latencies = metrics.get_latencies();
        latencies.sort_unstable();

        let (p50, p95, p99) = if latencies.is_empty() {
            (0, 0, 0)
        } else {
            (
                percentile(&latencies, 50),
                percentile(&latencies, 95),
                percentile(&latencies, 99),
            )
        };

        // Guard against division by zero on a degenerate zero-length run.
        let throughput_rps = if elapsed.as_secs_f64() > 0.0 {
            total_requests as f64 / elapsed.as_secs_f64()
        } else {
            0.0
        };

        let error_rate = if total_requests > 0 {
            failed as f64 / total_requests as f64
        } else {
            0.0
        };

        tracing::info!(
            total = total_requests,
            successful = successful,
            failed = failed,
            throughput_rps = format!("{throughput_rps:.2}"),
            p50_us = p50,
            p95_us = p95,
            p99_us = p99,
            "load test completed"
        );

        Ok(LoadTestReport {
            total_requests,
            successful,
            failed,
            latency_p50_us: p50,
            latency_p95_us: p95,
            latency_p99_us: p99,
            throughput_rps,
            error_rate,
        })
    }

    /// Returns the test config.
    #[must_use]
    pub const fn config(&self) -> &LoadTestConfig {
        &self.config
    }
}
313
314impl Default for LoadTester {
315    fn default() -> Self {
316        Self::new(LoadTestConfig::default())
317    }
318}
319
/// Computes percentile from sorted slice.
///
/// The index is `len * p / 100`, clamped to the last element; an empty
/// slice yields 0. (This intentionally matches the module's historical
/// indexing, not the textbook nearest-rank formula.)
fn percentile(sorted: &[u64], p: usize) -> u64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0;
    };
    let idx = usize::min(sorted.len() * p / 100, last);
    sorted[idx]
}
328
/// Load test report.
#[derive(Debug, Clone)]
pub struct LoadTestReport {
    /// Total requests made.
    pub total_requests: u64,
    /// Successful requests.
    pub successful: u64,
    /// Failed requests.
    pub failed: u64,
    /// P50 latency in microseconds.
    pub latency_p50_us: u64,
    /// P95 latency in microseconds.
    pub latency_p95_us: u64,
    /// P99 latency in microseconds.
    pub latency_p99_us: u64,
    /// Throughput in requests per second.
    pub throughput_rps: f64,
    /// Error rate (0.0 to 1.0).
    pub error_rate: f64,
}

impl LoadTestReport {
    /// Default acceptance threshold: error rate must stay strictly below 1%.
    const DEFAULT_MAX_ERROR_RATE: f64 = 0.01;

    /// Returns true if the test passed (error rate below 1%).
    #[must_use]
    pub fn passed(&self) -> bool {
        self.passed_with_threshold(Self::DEFAULT_MAX_ERROR_RATE)
    }

    /// Returns true if the error rate is strictly below `max_error_rate`.
    ///
    /// Generalizes [`Self::passed`] for callers that need stricter or
    /// looser acceptance criteria than the default 1%.
    #[must_use]
    pub fn passed_with_threshold(&self, max_error_rate: f64) -> bool {
        self.error_rate < max_error_rate
    }

    /// Returns success rate (0.0 to 1.0).
    ///
    /// Yields 0.0 when no requests were made, avoiding a NaN from 0/0.
    #[must_use]
    pub fn success_rate(&self) -> f64 {
        if self.total_requests > 0 {
            self.successful as f64 / self.total_requests as f64
        } else {
            0.0
        }
    }
}
367
#[cfg(test)]
mod tests {
    use super::*;

    // Preset constructors must expose the documented concurrency levels.
    #[test]
    fn test_load_test_config_presets() {
        let light = LoadTestConfig::light();
        assert_eq!(light.concurrent_users, 5);

        let moderate = LoadTestConfig::moderate();
        assert_eq!(moderate.concurrent_users, 50);

        let heavy = LoadTestConfig::heavy();
        assert_eq!(heavy.concurrent_users, 200);

        let quick = LoadTestConfig::quick();
        assert_eq!(quick.concurrent_users, 4);
        assert_eq!(quick.requests_per_user, Some(10));
    }

    // End-to-end run with the default (built-in) handler.
    // Relies on all 40 requests finishing well inside the ~600ms deadline,
    // since each simulated request only sleeps ~100us.
    #[tokio::test]
    async fn test_load_tester_run_quick() {
        // Use quick config for fast test execution
        let tester = LoadTester::new(LoadTestConfig::quick());
        let report = tester.run().await;

        assert!(report.is_ok());
        let report = report.unwrap();

        // With 4 users * 10 requests each = 40 total requests
        assert_eq!(report.total_requests, 40);
        // Default handler has 1% failure rate (request_id % 100 == 0 fails)
        // With request_ids 0-9 per user, request 0 fails for each user = 4 failures
        assert_eq!(report.failed, 4);
        assert_eq!(report.successful, 36);
        assert!(report.throughput_rps > 0.0);
        assert!(report.latency_p50_us > 0);
    }

    // A custom handler's verdicts must flow through into the report counts.
    #[tokio::test]
    async fn test_load_tester_with_custom_handler() {
        let handler: RequestHandler = Arc::new(|_user_id, request_id| {
            // Fail every 5th request
            request_id % 5 != 0
        });

        let config = LoadTestConfig {
            concurrent_users: 2,
            ramp_up: Duration::from_millis(10),
            duration: Duration::from_millis(100),
            requests_per_user: Some(10),
            target_rps: None,
        };

        let tester = LoadTester::new(config).with_handler(handler);
        let report = tester.run().await.unwrap();

        // 2 users * 10 requests = 20 total
        assert_eq!(report.total_requests, 20);
        // Requests 0, 5 fail for each user = 4 failures
        assert_eq!(report.failed, 4);
        assert_eq!(report.successful, 16);
    }

    // All-success run: zero error rate, passed(), success_rate() == 1.
    #[tokio::test]
    async fn test_load_tester_all_success() {
        let handler: RequestHandler = Arc::new(|_, _| true);

        let config = LoadTestConfig {
            concurrent_users: 2,
            ramp_up: Duration::from_millis(10),
            duration: Duration::from_millis(100),
            requests_per_user: Some(5),
            target_rps: None,
        };

        let tester = LoadTester::new(config).with_handler(handler);
        let report = tester.run().await.unwrap();

        assert_eq!(report.total_requests, 10);
        assert_eq!(report.failed, 0);
        assert_eq!(report.successful, 10);
        assert!(report.passed());
        assert!((report.success_rate() - 1.0).abs() < 0.001);
        assert!((report.error_rate - 0.0).abs() < 0.001);
    }

    // All-failure run: error rate 1.0 and the pass check must reject it.
    #[tokio::test]
    async fn test_load_tester_all_failure() {
        let handler: RequestHandler = Arc::new(|_, _| false);

        let config = LoadTestConfig {
            concurrent_users: 2,
            ramp_up: Duration::from_millis(10),
            duration: Duration::from_millis(100),
            requests_per_user: Some(5),
            target_rps: None,
        };

        let tester = LoadTester::new(config).with_handler(handler);
        let report = tester.run().await.unwrap();

        assert_eq!(report.total_requests, 10);
        assert_eq!(report.failed, 10);
        assert_eq!(report.successful, 0);
        assert!(!report.passed());
        assert!((report.success_rate() - 0.0).abs() < 0.001);
        assert!((report.error_rate - 1.0).abs() < 0.001);
    }

    // 0.5% error rate is below the 1% pass threshold.
    #[test]
    fn test_load_test_report_passed() {
        let report = LoadTestReport {
            total_requests: 1000,
            successful: 995,
            failed: 5,
            latency_p50_us: 1000,
            latency_p95_us: 5000,
            latency_p99_us: 10000,
            throughput_rps: 100.0,
            error_rate: 0.005,
        };

        assert!(report.passed());
        assert!((report.success_rate() - 0.995).abs() < 0.001);
    }

    // 10% error rate exceeds the 1% pass threshold.
    #[test]
    fn test_load_test_report_failed() {
        let report = LoadTestReport {
            total_requests: 100,
            successful: 90,
            failed: 10,
            latency_p50_us: 1000,
            latency_p95_us: 5000,
            latency_p99_us: 10000,
            throughput_rps: 100.0,
            error_rate: 0.10, // 10% error rate
        };

        assert!(!report.passed()); // >1% error rate
        assert!((report.success_rate() - 0.9).abs() < 0.001);
    }

    // Pins the module's percentile index convention (len * p / 100).
    #[test]
    fn test_percentile_calculation() {
        let sorted = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        // percentile uses (len * p / 100).min(len-1) for index
        // len=10, p=0 => idx=0 => value=1
        // len=10, p=50 => idx=5 => value=6
        // len=10, p=90 => idx=9 => value=10
        // len=10, p=100 => idx=10.min(9)=9 => value=10
        assert_eq!(percentile(&sorted, 0), 1);
        assert_eq!(percentile(&sorted, 50), 6);
        assert_eq!(percentile(&sorted, 90), 10);
        assert_eq!(percentile(&sorted, 100), 10);
    }

    // Empty input must yield 0, not panic.
    #[test]
    fn test_percentile_empty() {
        let empty: Vec<u64> = vec![];
        assert_eq!(percentile(&empty, 50), 0);
    }

    // Hammers the shared metrics from 10 OS threads to check that the
    // atomic counters and the mutex-guarded latency vec stay consistent.
    #[test]
    fn test_load_metrics_thread_safety() {
        let metrics = Arc::new(LoadMetrics::default());
        let mut handles = vec![];

        for _ in 0..10 {
            let m = Arc::clone(&metrics);
            handles.push(std::thread::spawn(move || {
                for i in 0..100 {
                    if i % 10 == 0 {
                        m.record_failure();
                    } else {
                        m.record_success(i * 100);
                    }
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(metrics.total_requests.load(Ordering::Relaxed), 1000);
        assert_eq!(metrics.failed.load(Ordering::Relaxed), 100);
        assert_eq!(metrics.successful.load(Ordering::Relaxed), 900);

        let latencies = metrics.get_latencies();
        assert_eq!(latencies.len(), 900);
    }
}