// mabi_modbus/testing/performance.rs

1//! Performance validation framework for large-scale testing.
2//!
3//! Provides infrastructure to validate:
4//! - 10,000+ simultaneous connections
5//! - 100,000+ transactions per second
6//! - Sub-10ms P99 latency under load
7
8use std::net::SocketAddr;
9use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use parking_lot::RwLock;
14use tokio::net::TcpStream;
15use tokio::sync::{Semaphore, broadcast};
16use tokio::time::timeout;
17
18use super::report::TestMetrics;
19
/// Performance validation configuration.
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Target number of simultaneous connections.
    pub target_connections: usize,
    /// Target transactions per second.
    pub target_tps: u64,
    /// Test duration.
    pub duration: Duration,
    /// Connection ramp-up time.
    pub ramp_up: Duration,
    /// Server address to test.
    pub server_addr: SocketAddr,
    /// Maximum acceptable P99 latency.
    pub max_p99_latency: Duration,
    /// Maximum acceptable error rate (0.0 - 1.0).
    pub max_error_rate: f64,
    /// Number of warmup iterations.
    pub warmup_iterations: usize,
    /// Report interval for progress updates.
    pub report_interval: Duration,
}

impl Default for PerformanceConfig {
    fn default() -> Self {
        Self {
            target_connections: 1000,
            target_tps: 10_000,
            duration: Duration::from_secs(60),
            ramp_up: Duration::from_secs(10),
            // Infallible construction; avoids parse().unwrap() on a literal.
            server_addr: SocketAddr::from(([127, 0, 0, 1], 502)),
            max_p99_latency: Duration::from_millis(10),
            max_error_rate: 0.01,
            warmup_iterations: 100,
            report_interval: Duration::from_secs(5),
        }
    }
}

impl PerformanceConfig {
    /// Create configuration for 10K connection test.
    pub fn ten_thousand_connections() -> Self {
        Self {
            target_connections: 10_000,
            target_tps: 50_000,
            duration: Duration::from_secs(120),
            ramp_up: Duration::from_secs(30),
            max_p99_latency: Duration::from_millis(50),
            ..Default::default()
        }
    }

    /// Create configuration for 100K TPS test.
    pub fn hundred_thousand_tps() -> Self {
        Self {
            target_connections: 5_000,
            target_tps: 100_000,
            duration: Duration::from_secs(60),
            ramp_up: Duration::from_secs(15),
            max_p99_latency: Duration::from_millis(10),
            ..Default::default()
        }
    }

    /// Create configuration for stress test (looser latency/error bounds).
    pub fn stress_test() -> Self {
        Self {
            target_connections: 50_000,
            target_tps: 500_000,
            duration: Duration::from_secs(300),
            ramp_up: Duration::from_secs(60),
            max_p99_latency: Duration::from_millis(100),
            max_error_rate: 0.05,
            ..Default::default()
        }
    }

    /// Set target connections.
    pub fn with_connections(mut self, n: usize) -> Self {
        self.target_connections = n;
        self
    }

    /// Set target TPS.
    pub fn with_tps(mut self, tps: u64) -> Self {
        self.target_tps = tps;
        self
    }

    /// Set test duration.
    pub fn with_duration(mut self, duration: Duration) -> Self {
        self.duration = duration;
        self
    }

    /// Set server address.
    pub fn with_server(mut self, addr: SocketAddr) -> Self {
        self.server_addr = addr;
        self
    }
}
121
/// Performance targets to validate.
#[derive(Debug, Clone)]
pub struct PerformanceTarget {
    /// Human-readable description of the target.
    pub name: String,
    /// Which measured quantity this target applies to.
    pub metric: TargetMetric,
    /// Threshold value the measurement is compared against.
    pub threshold: f64,
    /// How the measured value must relate to the threshold.
    pub comparison: Comparison,
}

/// Metric kinds a target can be expressed over.
#[derive(Debug, Clone, Copy)]
pub enum TargetMetric {
    Connections,
    Tps,
    P50Latency,
    P95Latency,
    P99Latency,
    ErrorRate,
    MemoryMb,
}

/// Direction of the threshold comparison.
#[derive(Debug, Clone, Copy)]
pub enum Comparison {
    GreaterThan,
    LessThan,
    GreaterOrEqual,
    LessOrEqual,
}

impl PerformanceTarget {
    /// Internal constructor shared by the preset helpers below.
    fn build(name: String, metric: TargetMetric, threshold: f64, comparison: Comparison) -> Self {
        Self { name, metric, threshold, comparison }
    }

    /// Target: at least `n` total connections.
    pub fn min_connections(n: usize) -> Self {
        Self::build(
            format!("Min {} connections", n),
            TargetMetric::Connections,
            n as f64,
            Comparison::GreaterOrEqual,
        )
    }

    /// Target: at least `tps` transactions per second.
    pub fn min_tps(tps: u64) -> Self {
        Self::build(
            format!("Min {} TPS", tps),
            TargetMetric::Tps,
            tps as f64,
            Comparison::GreaterOrEqual,
        )
    }

    /// Target: P99 latency of at most `ms` milliseconds.
    pub fn max_p99_latency(ms: u64) -> Self {
        Self::build(
            format!("Max P99 latency {}ms", ms),
            TargetMetric::P99Latency,
            ms as f64,
            Comparison::LessOrEqual,
        )
    }

    /// Target: error rate of at most `rate` (0.0 - 1.0).
    pub fn max_error_rate(rate: f64) -> Self {
        Self::build(
            format!("Max error rate {:.1}%", rate * 100.0),
            TargetMetric::ErrorRate,
            rate,
            Comparison::LessOrEqual,
        )
    }

    /// Check if the target is met by the measured `value`.
    pub fn check(&self, value: f64) -> bool {
        match self.comparison {
            Comparison::GreaterThan => value > self.threshold,
            Comparison::LessThan => value < self.threshold,
            Comparison::GreaterOrEqual => value >= self.threshold,
            Comparison::LessOrEqual => value <= self.threshold,
        }
    }
}
197
/// Result of a single target validation.
#[derive(Debug, Clone)]
pub struct TargetResult {
    /// The target that was evaluated.
    pub target: PerformanceTarget,
    /// The measured value that was compared against the target's threshold.
    pub actual_value: f64,
    /// Whether `target.check(actual_value)` held.
    pub passed: bool,
}
205
/// Validation result containing all target results.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// True when every target passed and the test run itself completed without error.
    pub passed: bool,
    /// Per-target outcomes.
    pub targets: Vec<TargetResult>,
    /// Aggregate metrics gathered during the run.
    pub metrics: TestMetrics,
    /// Wall-clock duration of the run.
    pub duration: Duration,
    /// Error from the test run, if it failed.
    pub error_message: Option<String>,
}
215
216impl ValidationResult {
217    /// Get a summary of the validation.
218    pub fn summary(&self) -> String {
219        let passed_count = self.targets.iter().filter(|t| t.passed).count();
220        let total = self.targets.len();
221        format!(
222            "{} ({}/{} targets passed)",
223            if self.passed { "PASSED" } else { "FAILED" },
224            passed_count,
225            total
226        )
227    }
228}
229
/// Performance validator for running large-scale tests.
pub struct PerformanceValidator {
    /// Performance configuration (public for test access).
    pub config: PerformanceConfig,
    /// Targets evaluated against the final metrics snapshot after a run.
    targets: Vec<PerformanceTarget>,
    /// Set for the duration of a run; cleared to stop the reporter and connection tasks.
    running: Arc<AtomicBool>,
    /// Shared counters updated by the spawned connection tasks.
    metrics: Arc<LiveMetrics>,
}
238
/// Live metrics collector during test execution.
///
/// Counters are atomics updated concurrently by connection tasks; latencies
/// are sampled into a Vec behind an RwLock (see `record_request`).
struct LiveMetrics {
    /// Total requests attempted (success + failure).
    requests_total: AtomicU64,
    /// Requests recorded as successful.
    requests_success: AtomicU64,
    /// Requests recorded as failed.
    requests_failed: AtomicU64,
    /// Connections currently open.
    connections_active: AtomicU64,
    /// Connections ever opened (never decremented).
    connections_total: AtomicU64,
    /// Sampled request latencies used for percentile estimates.
    latencies: RwLock<Vec<Duration>>,
}
248
249impl LiveMetrics {
250    fn new() -> Self {
251        Self {
252            requests_total: AtomicU64::new(0),
253            requests_success: AtomicU64::new(0),
254            requests_failed: AtomicU64::new(0),
255            connections_active: AtomicU64::new(0),
256            connections_total: AtomicU64::new(0),
257            latencies: RwLock::new(Vec::with_capacity(100_000)),
258        }
259    }
260
261    fn record_request(&self, success: bool, latency: Duration) {
262        self.requests_total.fetch_add(1, Ordering::Relaxed);
263        if success {
264            self.requests_success.fetch_add(1, Ordering::Relaxed);
265        } else {
266            self.requests_failed.fetch_add(1, Ordering::Relaxed);
267        }
268
269        // Sample latencies (keep every Nth to manage memory)
270        let total = self.requests_total.load(Ordering::Relaxed);
271        if total % 100 == 0 {
272            self.latencies.write().push(latency);
273        }
274    }
275
276    fn add_connection(&self) {
277        self.connections_active.fetch_add(1, Ordering::Relaxed);
278        self.connections_total.fetch_add(1, Ordering::Relaxed);
279    }
280
281    fn remove_connection(&self) {
282        self.connections_active.fetch_sub(1, Ordering::Relaxed);
283    }
284
285    fn snapshot(&self) -> MetricsSnapshot {
286        let mut latencies = self.latencies.read().clone();
287        latencies.sort();
288
289        let p50 = latencies.get(latencies.len() / 2).copied();
290        let p95 = latencies.get(latencies.len() * 95 / 100).copied();
291        let p99 = latencies.get(latencies.len() * 99 / 100).copied();
292
293        let total = self.requests_total.load(Ordering::Relaxed);
294        let success = self.requests_success.load(Ordering::Relaxed);
295        let failed = self.requests_failed.load(Ordering::Relaxed);
296
297        MetricsSnapshot {
298            requests_total: total,
299            requests_success: success,
300            requests_failed: failed,
301            connections_active: self.connections_active.load(Ordering::Relaxed),
302            connections_total: self.connections_total.load(Ordering::Relaxed),
303            p50_latency: p50,
304            p95_latency: p95,
305            p99_latency: p99,
306            error_rate: if total > 0 { failed as f64 / total as f64 } else { 0.0 },
307        }
308    }
309}
310
/// Point-in-time copy of the live metrics, with percentile estimates
/// computed from the sampled latencies (`None` when no samples exist).
#[derive(Debug, Clone)]
struct MetricsSnapshot {
    /// Total requests attempted (success + failure).
    requests_total: u64,
    /// Requests recorded as successful.
    requests_success: u64,
    /// Requests recorded as failed.
    requests_failed: u64,
    /// Connections open at snapshot time.
    connections_active: u64,
    /// Connections ever opened during the run.
    connections_total: u64,
    /// Estimated median latency from the sample set.
    p50_latency: Option<Duration>,
    /// Estimated 95th-percentile latency from the sample set.
    p95_latency: Option<Duration>,
    /// Estimated 99th-percentile latency from the sample set.
    p99_latency: Option<Duration>,
    /// failed / total, or 0.0 when no requests were made.
    error_rate: f64,
}
323
324impl PerformanceValidator {
325    /// Create a new performance validator.
326    pub fn new(config: PerformanceConfig) -> Self {
327        Self {
328            config,
329            targets: Vec::new(),
330            running: Arc::new(AtomicBool::new(false)),
331            metrics: Arc::new(LiveMetrics::new()),
332        }
333    }
334
335    /// Create validator for 10K connection test.
336    pub fn ten_thousand_connections() -> Self {
337        let config = PerformanceConfig::ten_thousand_connections();
338        let mut validator = Self::new(config);
339        validator.add_target(PerformanceTarget::min_connections(10_000));
340        validator.add_target(PerformanceTarget::max_p99_latency(50));
341        validator.add_target(PerformanceTarget::max_error_rate(0.01));
342        validator
343    }
344
345    /// Create validator for 100K TPS test.
346    pub fn hundred_thousand_tps() -> Self {
347        let config = PerformanceConfig::hundred_thousand_tps();
348        let mut validator = Self::new(config);
349        validator.add_target(PerformanceTarget::min_tps(100_000));
350        validator.add_target(PerformanceTarget::max_p99_latency(10));
351        validator.add_target(PerformanceTarget::max_error_rate(0.01));
352        validator
353    }
354
355    /// Add a performance target.
356    pub fn add_target(&mut self, target: PerformanceTarget) {
357        self.targets.push(target);
358    }
359
360    /// Set server address.
361    pub fn server_addr(mut self, addr: SocketAddr) -> Self {
362        self.config.server_addr = addr;
363        self
364    }
365
366    /// Run the performance validation test.
367    pub async fn run(&self) -> Result<ValidationResult, String> {
368        tracing::info!(
369            "Starting performance validation: {} connections, {} TPS target",
370            self.config.target_connections,
371            self.config.target_tps
372        );
373
374        self.running.store(true, Ordering::SeqCst);
375        let start = Instant::now();
376
377        // Spawn progress reporter
378        let metrics = self.metrics.clone();
379        let report_interval = self.config.report_interval;
380        let running = self.running.clone();
381        let reporter = tokio::spawn(async move {
382            while running.load(Ordering::Relaxed) {
383                tokio::time::sleep(report_interval).await;
384                let snapshot = metrics.snapshot();
385                tracing::info!(
386                    "Progress: {} requests ({} success, {} failed), {} active connections, P99: {:?}",
387                    snapshot.requests_total,
388                    snapshot.requests_success,
389                    snapshot.requests_failed,
390                    snapshot.connections_active,
391                    snapshot.p99_latency,
392                );
393            }
394        });
395
396        // Run the test
397        let result = self.run_test().await;
398
399        // Stop reporter
400        self.running.store(false, Ordering::SeqCst);
401        let _ = reporter.await;
402
403        let duration = start.elapsed();
404
405        // Build final metrics
406        let snapshot = self.metrics.snapshot();
407        let tps = if duration.as_secs() > 0 {
408            snapshot.requests_total / duration.as_secs()
409        } else {
410            0
411        };
412
413        // Validate targets
414        let mut target_results = Vec::new();
415        let mut all_passed = true;
416
417        for target in &self.targets {
418            let value = match target.metric {
419                TargetMetric::Connections => snapshot.connections_total as f64,
420                TargetMetric::Tps => tps as f64,
421                TargetMetric::P50Latency => {
422                    snapshot.p50_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0)
423                }
424                TargetMetric::P95Latency => {
425                    snapshot.p95_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0)
426                }
427                TargetMetric::P99Latency => {
428                    snapshot.p99_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0)
429                }
430                TargetMetric::ErrorRate => snapshot.error_rate,
431                TargetMetric::MemoryMb => 0.0, // TODO: Integrate with memory profiler
432            };
433
434            let passed = target.check(value);
435            if !passed {
436                all_passed = false;
437            }
438
439            target_results.push(TargetResult {
440                target: target.clone(),
441                actual_value: value,
442                passed,
443            });
444        }
445
446        let test_metrics = TestMetrics {
447            total_requests: snapshot.requests_total,
448            successful_requests: snapshot.requests_success,
449            failed_requests: snapshot.requests_failed,
450            total_connections: snapshot.connections_total,
451            peak_connections: snapshot.connections_active, // Approximation
452            avg_tps: tps,
453            peak_tps: tps, // Would need more tracking for accurate peak
454            p50_latency_ms: snapshot.p50_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0),
455            p95_latency_ms: snapshot.p95_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0),
456            p99_latency_ms: snapshot.p99_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0),
457            error_rate: snapshot.error_rate,
458            memory_peak_mb: 0.0, // TODO: Integrate with memory profiler
459        };
460
461        Ok(ValidationResult {
462            passed: all_passed && result.is_ok(),
463            targets: target_results,
464            metrics: test_metrics,
465            duration,
466            error_message: result.err(),
467        })
468    }
469
470    async fn run_test(&self) -> Result<(), String> {
471        let semaphore = Arc::new(Semaphore::new(self.config.target_connections));
472        let (shutdown_tx, _) = broadcast::channel::<()>(1);
473
474        // Calculate request rate per connection
475        let connections = self.config.target_connections;
476        let target_tps = self.config.target_tps;
477        let requests_per_connection = (target_tps as f64 / connections as f64).max(1.0);
478        let request_interval = Duration::from_secs_f64(1.0 / requests_per_connection);
479
480        // Spawn connection tasks with ramp-up
481        let ramp_up_interval = self.config.ramp_up.as_millis() as usize / connections.max(1);
482        let mut handles = Vec::with_capacity(connections);
483
484        for i in 0..connections {
485            if !self.running.load(Ordering::Relaxed) {
486                break;
487            }
488
489            // Ramp-up delay
490            if ramp_up_interval > 0 && i > 0 {
491                tokio::time::sleep(Duration::from_millis(ramp_up_interval as u64)).await;
492            }
493
494            let permit = semaphore.clone().acquire_owned().await.map_err(|e| e.to_string())?;
495            let metrics = self.metrics.clone();
496            let server_addr = self.config.server_addr;
497            let duration = self.config.duration;
498            let running = self.running.clone();
499            let mut shutdown_rx = shutdown_tx.subscribe();
500
501            let handle = tokio::spawn(async move {
502                let _permit = permit;
503
504                // Connect to server
505                let stream = match timeout(Duration::from_secs(10), TcpStream::connect(server_addr)).await {
506                    Ok(Ok(s)) => s,
507                    Ok(Err(e)) => {
508                        tracing::debug!("Connection failed: {}", e);
509                        return;
510                    }
511                    Err(_) => {
512                        tracing::debug!("Connection timeout");
513                        return;
514                    }
515                };
516
517                metrics.add_connection();
518                let start = Instant::now();
519
520                // Send requests until duration expires or shutdown
521                while running.load(Ordering::Relaxed) && start.elapsed() < duration {
522                    tokio::select! {
523                        _ = shutdown_rx.recv() => break,
524                        _ = tokio::time::sleep(request_interval) => {
525                            let req_start = Instant::now();
526                            let success = Self::send_modbus_request(&stream).await;
527                            let latency = req_start.elapsed();
528                            metrics.record_request(success, latency);
529                        }
530                    }
531                }
532
533                metrics.remove_connection();
534            });
535
536            handles.push(handle);
537        }
538
539        // Wait for test duration
540        tokio::time::sleep(self.config.duration).await;
541
542        // Signal shutdown
543        let _ = shutdown_tx.send(());
544        self.running.store(false, Ordering::SeqCst);
545
546        // Wait for all connections to close
547        for handle in handles {
548            let _ = handle.await;
549        }
550
551        Ok(())
552    }
553
554    /// Send a simple Modbus read request.
555    async fn send_modbus_request(stream: &TcpStream) -> bool {
556        #![allow(unused_imports)]
557        use tokio::io::AsyncWriteExt;
558
559        // Simple Modbus TCP read holding registers request
560        // MBAP Header (7 bytes) + PDU (5 bytes)
561        let request: [u8; 12] = [
562            0x00, 0x01,             // Transaction ID
563            0x00, 0x00,             // Protocol ID
564            0x00, 0x06,             // Length
565            0x01,                   // Unit ID
566            0x03,                   // Function code: Read Holding Registers
567            0x00, 0x00,             // Starting address
568            0x00, 0x01,             // Quantity
569        ];
570
571        let mut response = [0u8; 256];
572
573        // We need to use try_write/try_read since we have &TcpStream not &mut
574        // This is a simplified version - in production, use proper framing
575        match stream.try_write(&request) {
576            Ok(_) => {
577                // Wait a bit for response
578                tokio::time::sleep(Duration::from_micros(100)).await;
579                match stream.try_read(&mut response) {
580                    Ok(n) if n > 0 => true,
581                    _ => false,
582                }
583            }
584            Err(_) => false,
585        }
586    }
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config carries the documented baseline values.
    #[test]
    fn test_performance_config_default() {
        let cfg = PerformanceConfig::default();
        assert_eq!(cfg.target_connections, 1000);
        assert_eq!(cfg.target_tps, 10_000);
    }

    /// Preset constructors set their headline numbers.
    #[test]
    fn test_performance_config_presets() {
        assert_eq!(
            PerformanceConfig::ten_thousand_connections().target_connections,
            10_000
        );
        assert_eq!(
            PerformanceConfig::hundred_thousand_tps().target_tps,
            100_000
        );
    }

    /// Threshold comparisons behave as documented at and around the boundary.
    #[test]
    fn test_performance_target_check() {
        let min_conns = PerformanceTarget::min_connections(10_000);
        for (value, expected) in [(10_000.0, true), (15_000.0, true), (9_999.0, false)] {
            assert_eq!(min_conns.check(value), expected);
        }

        let p99 = PerformanceTarget::max_p99_latency(10);
        for (value, expected) in [(5.0, true), (10.0, true), (15.0, false)] {
            assert_eq!(p99.check(value), expected);
        }
    }

    /// Counters track connections and request outcomes.
    #[test]
    fn test_live_metrics() {
        let m = LiveMetrics::new();

        m.add_connection();
        assert_eq!(m.connections_active.load(Ordering::Relaxed), 1);

        m.record_request(true, Duration::from_millis(5));
        assert_eq!(m.requests_total.load(Ordering::Relaxed), 1);
        assert_eq!(m.requests_success.load(Ordering::Relaxed), 1);

        m.record_request(false, Duration::from_millis(10));
        assert_eq!(m.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(m.requests_failed.load(Ordering::Relaxed), 1);

        m.remove_connection();
        assert_eq!(m.connections_active.load(Ordering::Relaxed), 0);
    }
}