1use std::net::SocketAddr;
9use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use parking_lot::RwLock;
14use tokio::net::TcpStream;
15use tokio::sync::{Semaphore, broadcast};
16use tokio::time::timeout;
17
18use super::report::TestMetrics;
19
/// Tunable parameters for one performance-validation run.
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Number of concurrent client connections to establish.
    pub target_connections: usize,
    /// Aggregate requests-per-second the clients should generate.
    pub target_tps: u64,
    /// How long each connection drives load once it is established.
    pub duration: Duration,
    /// Period over which connections are opened gradually instead of all at once.
    pub ramp_up: Duration,
    /// Address of the server under test (default is the Modbus TCP port 502 on loopback).
    pub server_addr: SocketAddr,
    /// P99 latency budget. NOTE(review): set by the presets but not read by
    /// `run()` in this file (targets are registered separately) — confirm consumers.
    pub max_p99_latency: Duration,
    /// Allowed failure fraction (0.01 == 1%). NOTE(review): same caveat as above.
    pub max_error_rate: f64,
    /// Iterations intended to run before measurement starts.
    /// NOTE(review): not referenced anywhere in this file — verify it is consumed elsewhere.
    pub warmup_iterations: usize,
    /// How often the background reporter logs a progress snapshot.
    pub report_interval: Duration,
}
42
43impl Default for PerformanceConfig {
44 fn default() -> Self {
45 Self {
46 target_connections: 1000,
47 target_tps: 10_000,
48 duration: Duration::from_secs(60),
49 ramp_up: Duration::from_secs(10),
50 server_addr: "127.0.0.1:502".parse().unwrap(),
51 max_p99_latency: Duration::from_millis(10),
52 max_error_rate: 0.01,
53 warmup_iterations: 100,
54 report_interval: Duration::from_secs(5),
55 }
56 }
57}
58
59impl PerformanceConfig {
60 pub fn ten_thousand_connections() -> Self {
62 Self {
63 target_connections: 10_000,
64 target_tps: 50_000,
65 duration: Duration::from_secs(120),
66 ramp_up: Duration::from_secs(30),
67 max_p99_latency: Duration::from_millis(50),
68 ..Default::default()
69 }
70 }
71
72 pub fn hundred_thousand_tps() -> Self {
74 Self {
75 target_connections: 5_000,
76 target_tps: 100_000,
77 duration: Duration::from_secs(60),
78 ramp_up: Duration::from_secs(15),
79 max_p99_latency: Duration::from_millis(10),
80 ..Default::default()
81 }
82 }
83
84 pub fn stress_test() -> Self {
86 Self {
87 target_connections: 50_000,
88 target_tps: 500_000,
89 duration: Duration::from_secs(300),
90 ramp_up: Duration::from_secs(60),
91 max_p99_latency: Duration::from_millis(100),
92 max_error_rate: 0.05,
93 ..Default::default()
94 }
95 }
96
97 pub fn with_connections(mut self, n: usize) -> Self {
99 self.target_connections = n;
100 self
101 }
102
103 pub fn with_tps(mut self, tps: u64) -> Self {
105 self.target_tps = tps;
106 self
107 }
108
109 pub fn with_duration(mut self, duration: Duration) -> Self {
111 self.duration = duration;
112 self
113 }
114
115 pub fn with_server(mut self, addr: SocketAddr) -> Self {
117 self.server_addr = addr;
118 self
119 }
120}
121
/// A single pass/fail criterion evaluated against the final run metrics.
#[derive(Debug, Clone)]
pub struct PerformanceTarget {
    /// Human-readable label used in reports.
    pub name: String,
    /// Which measured quantity the threshold applies to.
    pub metric: TargetMetric,
    /// Threshold value, in the metric's native unit (count, TPS, ms, ratio, MB).
    pub threshold: f64,
    /// How the measured value is compared against `threshold`.
    pub comparison: Comparison,
}
130
/// The measurable quantity a [`PerformanceTarget`] applies to.
#[derive(Debug, Clone, Copy)]
pub enum TargetMetric {
    /// Total connections established over the run.
    Connections,
    /// Average transactions per second over the run.
    Tps,
    /// Median request latency, in milliseconds.
    P50Latency,
    /// 95th-percentile request latency, in milliseconds.
    P95Latency,
    /// 99th-percentile request latency, in milliseconds.
    P99Latency,
    /// Failed-request fraction, 0.0–1.0.
    ErrorRate,
    /// Peak memory in MB. NOTE(review): not measured in this file — targets on
    /// this metric are evaluated against 0.0; confirm before relying on it.
    MemoryMb,
}
141
/// Direction of the threshold comparison for a [`PerformanceTarget`].
#[derive(Debug, Clone, Copy)]
pub enum Comparison {
    /// Strictly greater than the threshold.
    GreaterThan,
    /// Strictly less than the threshold.
    LessThan,
    /// At least the threshold (boundary passes).
    GreaterOrEqual,
    /// At most the threshold (boundary passes).
    LessOrEqual,
}
149
150impl PerformanceTarget {
151 pub fn min_connections(n: usize) -> Self {
152 Self {
153 name: format!("Min {} connections", n),
154 metric: TargetMetric::Connections,
155 threshold: n as f64,
156 comparison: Comparison::GreaterOrEqual,
157 }
158 }
159
160 pub fn min_tps(tps: u64) -> Self {
161 Self {
162 name: format!("Min {} TPS", tps),
163 metric: TargetMetric::Tps,
164 threshold: tps as f64,
165 comparison: Comparison::GreaterOrEqual,
166 }
167 }
168
169 pub fn max_p99_latency(ms: u64) -> Self {
170 Self {
171 name: format!("Max P99 latency {}ms", ms),
172 metric: TargetMetric::P99Latency,
173 threshold: ms as f64,
174 comparison: Comparison::LessOrEqual,
175 }
176 }
177
178 pub fn max_error_rate(rate: f64) -> Self {
179 Self {
180 name: format!("Max error rate {:.1}%", rate * 100.0),
181 metric: TargetMetric::ErrorRate,
182 threshold: rate,
183 comparison: Comparison::LessOrEqual,
184 }
185 }
186
187 pub fn check(&self, value: f64) -> bool {
189 match self.comparison {
190 Comparison::GreaterThan => value > self.threshold,
191 Comparison::LessThan => value < self.threshold,
192 Comparison::GreaterOrEqual => value >= self.threshold,
193 Comparison::LessOrEqual => value <= self.threshold,
194 }
195 }
196}
197
/// Outcome of evaluating one [`PerformanceTarget`] against the final metrics.
#[derive(Debug, Clone)]
pub struct TargetResult {
    /// The criterion that was evaluated.
    pub target: PerformanceTarget,
    /// The measured value the threshold was compared against.
    pub actual_value: f64,
    /// Whether the comparison succeeded.
    pub passed: bool,
}
205
/// Aggregate outcome of one validation run.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// True only when every target passed and the test harness itself succeeded.
    pub passed: bool,
    /// Per-target evaluation results, in registration order.
    pub targets: Vec<TargetResult>,
    /// Final measured metrics for the run.
    pub metrics: TestMetrics,
    /// Wall-clock length of the whole run (includes ramp-up).
    pub duration: Duration,
    /// Error reported by the harness itself, if any.
    pub error_message: Option<String>,
}
215
216impl ValidationResult {
217 pub fn summary(&self) -> String {
219 let passed_count = self.targets.iter().filter(|t| t.passed).count();
220 let total = self.targets.len();
221 format!(
222 "{} ({}/{} targets passed)",
223 if self.passed { "PASSED" } else { "FAILED" },
224 passed_count,
225 total
226 )
227 }
228}
229
/// Drives a load test against a server and checks the results against a set
/// of registered [`PerformanceTarget`]s.
pub struct PerformanceValidator {
    /// Load-generation parameters for this run.
    pub config: PerformanceConfig,
    /// Pass/fail criteria evaluated once the run completes.
    targets: Vec<PerformanceTarget>,
    /// Shared stop flag observed by the worker tasks and the progress reporter.
    running: Arc<AtomicBool>,
    /// Live counters shared with every spawned connection task.
    metrics: Arc<LiveMetrics>,
}
238
/// Counters updated concurrently by every connection task; atomics for the
/// hot-path counts, a lock only for the (sampled) latency list.
struct LiveMetrics {
    /// All requests attempted.
    requests_total: AtomicU64,
    /// Requests that received a response.
    requests_success: AtomicU64,
    /// Requests that failed or got no response.
    requests_failed: AtomicU64,
    /// Connections currently open.
    connections_active: AtomicU64,
    /// Connections ever established (lifetime total).
    connections_total: AtomicU64,
    /// Sampled request latencies (roughly 1 in 100 — see `record_request`).
    latencies: RwLock<Vec<Duration>>,
}
248
249impl LiveMetrics {
250 fn new() -> Self {
251 Self {
252 requests_total: AtomicU64::new(0),
253 requests_success: AtomicU64::new(0),
254 requests_failed: AtomicU64::new(0),
255 connections_active: AtomicU64::new(0),
256 connections_total: AtomicU64::new(0),
257 latencies: RwLock::new(Vec::with_capacity(100_000)),
258 }
259 }
260
261 fn record_request(&self, success: bool, latency: Duration) {
262 self.requests_total.fetch_add(1, Ordering::Relaxed);
263 if success {
264 self.requests_success.fetch_add(1, Ordering::Relaxed);
265 } else {
266 self.requests_failed.fetch_add(1, Ordering::Relaxed);
267 }
268
269 let total = self.requests_total.load(Ordering::Relaxed);
271 if total % 100 == 0 {
272 self.latencies.write().push(latency);
273 }
274 }
275
276 fn add_connection(&self) {
277 self.connections_active.fetch_add(1, Ordering::Relaxed);
278 self.connections_total.fetch_add(1, Ordering::Relaxed);
279 }
280
281 fn remove_connection(&self) {
282 self.connections_active.fetch_sub(1, Ordering::Relaxed);
283 }
284
285 fn snapshot(&self) -> MetricsSnapshot {
286 let mut latencies = self.latencies.read().clone();
287 latencies.sort();
288
289 let p50 = latencies.get(latencies.len() / 2).copied();
290 let p95 = latencies.get(latencies.len() * 95 / 100).copied();
291 let p99 = latencies.get(latencies.len() * 99 / 100).copied();
292
293 let total = self.requests_total.load(Ordering::Relaxed);
294 let success = self.requests_success.load(Ordering::Relaxed);
295 let failed = self.requests_failed.load(Ordering::Relaxed);
296
297 MetricsSnapshot {
298 requests_total: total,
299 requests_success: success,
300 requests_failed: failed,
301 connections_active: self.connections_active.load(Ordering::Relaxed),
302 connections_total: self.connections_total.load(Ordering::Relaxed),
303 p50_latency: p50,
304 p95_latency: p95,
305 p99_latency: p99,
306 error_rate: if total > 0 { failed as f64 / total as f64 } else { 0.0 },
307 }
308 }
309}
310
/// Point-in-time copy of [`LiveMetrics`] with percentiles already derived.
#[derive(Debug, Clone)]
struct MetricsSnapshot {
    /// All requests attempted so far.
    requests_total: u64,
    /// Requests that received a response.
    requests_success: u64,
    /// Requests that failed or got no response.
    requests_failed: u64,
    /// Connections open at snapshot time.
    connections_active: u64,
    /// Connections ever established.
    connections_total: u64,
    /// Median sampled latency; None when no samples were collected.
    p50_latency: Option<Duration>,
    /// 95th-percentile sampled latency; None when no samples were collected.
    p95_latency: Option<Duration>,
    /// 99th-percentile sampled latency; None when no samples were collected.
    p99_latency: Option<Duration>,
    /// Failed / total ratio; 0.0 before any request completes.
    error_rate: f64,
}
323
324impl PerformanceValidator {
325 pub fn new(config: PerformanceConfig) -> Self {
327 Self {
328 config,
329 targets: Vec::new(),
330 running: Arc::new(AtomicBool::new(false)),
331 metrics: Arc::new(LiveMetrics::new()),
332 }
333 }
334
335 pub fn ten_thousand_connections() -> Self {
337 let config = PerformanceConfig::ten_thousand_connections();
338 let mut validator = Self::new(config);
339 validator.add_target(PerformanceTarget::min_connections(10_000));
340 validator.add_target(PerformanceTarget::max_p99_latency(50));
341 validator.add_target(PerformanceTarget::max_error_rate(0.01));
342 validator
343 }
344
345 pub fn hundred_thousand_tps() -> Self {
347 let config = PerformanceConfig::hundred_thousand_tps();
348 let mut validator = Self::new(config);
349 validator.add_target(PerformanceTarget::min_tps(100_000));
350 validator.add_target(PerformanceTarget::max_p99_latency(10));
351 validator.add_target(PerformanceTarget::max_error_rate(0.01));
352 validator
353 }
354
355 pub fn add_target(&mut self, target: PerformanceTarget) {
357 self.targets.push(target);
358 }
359
360 pub fn server_addr(mut self, addr: SocketAddr) -> Self {
362 self.config.server_addr = addr;
363 self
364 }
365
366 pub async fn run(&self) -> Result<ValidationResult, String> {
368 tracing::info!(
369 "Starting performance validation: {} connections, {} TPS target",
370 self.config.target_connections,
371 self.config.target_tps
372 );
373
374 self.running.store(true, Ordering::SeqCst);
375 let start = Instant::now();
376
377 let metrics = self.metrics.clone();
379 let report_interval = self.config.report_interval;
380 let running = self.running.clone();
381 let reporter = tokio::spawn(async move {
382 while running.load(Ordering::Relaxed) {
383 tokio::time::sleep(report_interval).await;
384 let snapshot = metrics.snapshot();
385 tracing::info!(
386 "Progress: {} requests ({} success, {} failed), {} active connections, P99: {:?}",
387 snapshot.requests_total,
388 snapshot.requests_success,
389 snapshot.requests_failed,
390 snapshot.connections_active,
391 snapshot.p99_latency,
392 );
393 }
394 });
395
396 let result = self.run_test().await;
398
399 self.running.store(false, Ordering::SeqCst);
401 let _ = reporter.await;
402
403 let duration = start.elapsed();
404
405 let snapshot = self.metrics.snapshot();
407 let tps = if duration.as_secs() > 0 {
408 snapshot.requests_total / duration.as_secs()
409 } else {
410 0
411 };
412
413 let mut target_results = Vec::new();
415 let mut all_passed = true;
416
417 for target in &self.targets {
418 let value = match target.metric {
419 TargetMetric::Connections => snapshot.connections_total as f64,
420 TargetMetric::Tps => tps as f64,
421 TargetMetric::P50Latency => {
422 snapshot.p50_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0)
423 }
424 TargetMetric::P95Latency => {
425 snapshot.p95_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0)
426 }
427 TargetMetric::P99Latency => {
428 snapshot.p99_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0)
429 }
430 TargetMetric::ErrorRate => snapshot.error_rate,
431 TargetMetric::MemoryMb => 0.0, };
433
434 let passed = target.check(value);
435 if !passed {
436 all_passed = false;
437 }
438
439 target_results.push(TargetResult {
440 target: target.clone(),
441 actual_value: value,
442 passed,
443 });
444 }
445
446 let test_metrics = TestMetrics {
447 total_requests: snapshot.requests_total,
448 successful_requests: snapshot.requests_success,
449 failed_requests: snapshot.requests_failed,
450 total_connections: snapshot.connections_total,
451 peak_connections: snapshot.connections_active, avg_tps: tps,
453 peak_tps: tps, p50_latency_ms: snapshot.p50_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0),
455 p95_latency_ms: snapshot.p95_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0),
456 p99_latency_ms: snapshot.p99_latency.map(|d| d.as_millis() as f64).unwrap_or(0.0),
457 error_rate: snapshot.error_rate,
458 memory_peak_mb: 0.0, };
460
461 Ok(ValidationResult {
462 passed: all_passed && result.is_ok(),
463 targets: target_results,
464 metrics: test_metrics,
465 duration,
466 error_message: result.err(),
467 })
468 }
469
470 async fn run_test(&self) -> Result<(), String> {
471 let semaphore = Arc::new(Semaphore::new(self.config.target_connections));
472 let (shutdown_tx, _) = broadcast::channel::<()>(1);
473
474 let connections = self.config.target_connections;
476 let target_tps = self.config.target_tps;
477 let requests_per_connection = (target_tps as f64 / connections as f64).max(1.0);
478 let request_interval = Duration::from_secs_f64(1.0 / requests_per_connection);
479
480 let ramp_up_interval = self.config.ramp_up.as_millis() as usize / connections.max(1);
482 let mut handles = Vec::with_capacity(connections);
483
484 for i in 0..connections {
485 if !self.running.load(Ordering::Relaxed) {
486 break;
487 }
488
489 if ramp_up_interval > 0 && i > 0 {
491 tokio::time::sleep(Duration::from_millis(ramp_up_interval as u64)).await;
492 }
493
494 let permit = semaphore.clone().acquire_owned().await.map_err(|e| e.to_string())?;
495 let metrics = self.metrics.clone();
496 let server_addr = self.config.server_addr;
497 let duration = self.config.duration;
498 let running = self.running.clone();
499 let mut shutdown_rx = shutdown_tx.subscribe();
500
501 let handle = tokio::spawn(async move {
502 let _permit = permit;
503
504 let stream = match timeout(Duration::from_secs(10), TcpStream::connect(server_addr)).await {
506 Ok(Ok(s)) => s,
507 Ok(Err(e)) => {
508 tracing::debug!("Connection failed: {}", e);
509 return;
510 }
511 Err(_) => {
512 tracing::debug!("Connection timeout");
513 return;
514 }
515 };
516
517 metrics.add_connection();
518 let start = Instant::now();
519
520 while running.load(Ordering::Relaxed) && start.elapsed() < duration {
522 tokio::select! {
523 _ = shutdown_rx.recv() => break,
524 _ = tokio::time::sleep(request_interval) => {
525 let req_start = Instant::now();
526 let success = Self::send_modbus_request(&stream).await;
527 let latency = req_start.elapsed();
528 metrics.record_request(success, latency);
529 }
530 }
531 }
532
533 metrics.remove_connection();
534 });
535
536 handles.push(handle);
537 }
538
539 tokio::time::sleep(self.config.duration).await;
541
542 let _ = shutdown_tx.send(());
544 self.running.store(false, Ordering::SeqCst);
545
546 for handle in handles {
548 let _ = handle.await;
549 }
550
551 Ok(())
552 }
553
554 async fn send_modbus_request(stream: &TcpStream) -> bool {
556 #![allow(unused_imports)]
557 use tokio::io::AsyncWriteExt;
558
559 let request: [u8; 12] = [
562 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01, ];
570
571 let mut response = [0u8; 256];
572
573 match stream.try_write(&request) {
576 Ok(_) => {
577 tokio::time::sleep(Duration::from_micros(100)).await;
579 match stream.try_read(&mut response) {
580 Ok(n) if n > 0 => true,
581 _ => false,
582 }
583 }
584 Err(_) => false,
585 }
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_performance_config_default() {
        // The baseline profile is 1k connections at 10k TPS.
        let cfg = PerformanceConfig::default();
        assert_eq!(cfg.target_connections, 1000);
        assert_eq!(cfg.target_tps, 10_000);
    }

    #[test]
    fn test_performance_config_presets() {
        // Each preset pins its headline number.
        assert_eq!(
            PerformanceConfig::ten_thousand_connections().target_connections,
            10_000
        );
        assert_eq!(
            PerformanceConfig::hundred_thousand_tps().target_tps,
            100_000
        );
    }

    #[test]
    fn test_performance_target_check() {
        // GreaterOrEqual: boundary passes, below fails.
        let floor = PerformanceTarget::min_connections(10_000);
        assert!(floor.check(10_000.0));
        assert!(floor.check(15_000.0));
        assert!(!floor.check(9_999.0));

        // LessOrEqual: boundary passes, above fails.
        let ceiling = PerformanceTarget::max_p99_latency(10);
        assert!(ceiling.check(5.0));
        assert!(ceiling.check(10.0));
        assert!(!ceiling.check(15.0));
    }

    #[test]
    fn test_live_metrics() {
        let m = LiveMetrics::new();

        // Connection gauge goes up and back down.
        m.add_connection();
        assert_eq!(m.connections_active.load(Ordering::Relaxed), 1);

        // A success bumps total + success counters.
        m.record_request(true, Duration::from_millis(5));
        assert_eq!(m.requests_total.load(Ordering::Relaxed), 1);
        assert_eq!(m.requests_success.load(Ordering::Relaxed), 1);

        // A failure bumps total + failed counters.
        m.record_request(false, Duration::from_millis(10));
        assert_eq!(m.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(m.requests_failed.load(Ordering::Relaxed), 1);

        m.remove_connection();
        assert_eq!(m.connections_active.load(Ordering::Relaxed), 0);
    }
}