1use parking_lot::RwLock;
41use rand::Rng;
42use serde::{Deserialize, Serialize};
43use std::collections::HashMap;
44use std::sync::Arc;
45use std::time::{Duration, Instant};
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct LoadTestConfig {
50 pub duration: Duration,
52 pub connection_target: usize,
54 pub query_rate: u64,
56 pub bandwidth_target: u64,
58 pub provider_publish_rate: u64,
60 pub concurrent_operations: usize,
62 pub memory_limit: u64,
64 pub warmup_duration: Duration,
66 pub rampup_duration: Duration,
68}
69
70impl Default for LoadTestConfig {
71 fn default() -> Self {
72 Self {
73 duration: Duration::from_secs(300), connection_target: 100,
75 query_rate: 10,
76 bandwidth_target: 10_000_000, provider_publish_rate: 5,
78 concurrent_operations: 50,
79 memory_limit: 512 * 1024 * 1024, warmup_duration: Duration::from_secs(10),
81 rampup_duration: Duration::from_secs(30),
82 }
83 }
84}
85
86impl LoadTestConfig {
87 pub fn light() -> Self {
89 Self {
90 duration: Duration::from_secs(60),
91 connection_target: 20,
92 query_rate: 5,
93 bandwidth_target: 1_000_000, provider_publish_rate: 2,
95 concurrent_operations: 10,
96 memory_limit: 128 * 1024 * 1024, warmup_duration: Duration::from_secs(5),
98 rampup_duration: Duration::from_secs(10),
99 }
100 }
101
102 pub fn moderate() -> Self {
104 Self::default()
105 }
106
107 pub fn heavy() -> Self {
109 Self {
110 duration: Duration::from_secs(600), connection_target: 500,
112 query_rate: 100,
113 bandwidth_target: 100_000_000, provider_publish_rate: 20,
115 concurrent_operations: 200,
116 memory_limit: 2 * 1024 * 1024 * 1024, warmup_duration: Duration::from_secs(30),
118 rampup_duration: Duration::from_secs(60),
119 }
120 }
121
122 pub fn extreme() -> Self {
124 Self {
125 duration: Duration::from_secs(1200), connection_target: 2000,
127 query_rate: 500,
128 bandwidth_target: 1_000_000_000, provider_publish_rate: 100,
130 concurrent_operations: 1000,
131 memory_limit: 8 * 1024 * 1024 * 1024, warmup_duration: Duration::from_secs(60),
133 rampup_duration: Duration::from_secs(120),
134 }
135 }
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
140pub enum LoadTestType {
141 ConnectionStress,
143 DhtQueryStorm,
145 BandwidthSaturation,
147 ProviderFlood,
149 ConcurrentOps,
151 MemoryPressure,
153 ComprehensiveSuite,
155}
156
157impl LoadTestType {
158 pub fn name(&self) -> &'static str {
160 match self {
161 Self::ConnectionStress => "Connection Stress Test",
162 Self::DhtQueryStorm => "DHT Query Storm",
163 Self::BandwidthSaturation => "Bandwidth Saturation Test",
164 Self::ProviderFlood => "Provider Record Flood",
165 Self::ConcurrentOps => "Concurrent Operations Test",
166 Self::MemoryPressure => "Memory Pressure Test",
167 Self::ComprehensiveSuite => "Comprehensive Suite",
168 }
169 }
170
171 pub fn description(&self) -> &'static str {
173 match self {
174 Self::ConnectionStress => "Tests network behavior under many simultaneous connections",
175 Self::DhtQueryStorm => "Stress-tests DHT with high volume of queries",
176 Self::BandwidthSaturation => "Tests throughput limits and bandwidth handling",
177 Self::ProviderFlood => "Tests provider record publishing and querying at scale",
178 Self::ConcurrentOps => "Tests system behavior under many concurrent operations",
179 Self::MemoryPressure => "Tests behavior under memory constraints",
180 Self::ComprehensiveSuite => "Runs all load tests sequentially",
181 }
182 }
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct LoadTestResults {
188 pub test_type: LoadTestType,
190 pub passed: bool,
192 pub duration: Duration,
194 pub peak_connections: usize,
196 pub average_latency: Duration,
198 pub p95_latency: Duration,
200 pub p99_latency: Duration,
202 pub total_queries: u64,
204 pub successful_queries: u64,
206 pub failed_queries: u64,
208 pub total_bytes_sent: u64,
210 pub total_bytes_received: u64,
212 pub peak_memory_usage: u64,
214 pub average_memory_usage: u64,
216 pub throughput_bps: u64,
218 pub query_rate_achieved: f64,
220 pub errors: Vec<String>,
222 pub performance_timeline: HashMap<String, Vec<(Duration, f64)>>,
224}
225
226impl LoadTestResults {
227 pub fn new(test_type: LoadTestType) -> Self {
229 Self {
230 test_type,
231 passed: false,
232 duration: Duration::ZERO,
233 peak_connections: 0,
234 average_latency: Duration::ZERO,
235 p95_latency: Duration::ZERO,
236 p99_latency: Duration::ZERO,
237 total_queries: 0,
238 successful_queries: 0,
239 failed_queries: 0,
240 total_bytes_sent: 0,
241 total_bytes_received: 0,
242 peak_memory_usage: 0,
243 average_memory_usage: 0,
244 throughput_bps: 0,
245 query_rate_achieved: 0.0,
246 errors: Vec::new(),
247 performance_timeline: HashMap::new(),
248 }
249 }
250
251 pub fn success_rate(&self) -> f64 {
253 if self.total_queries == 0 {
254 return 0.0;
255 }
256 (self.successful_queries as f64 / self.total_queries as f64) * 100.0
257 }
258
259 pub fn throughput_human(&self) -> String {
261 crate::utils::format_bandwidth(self.throughput_bps as usize)
262 }
263
264 pub fn summary(&self) -> String {
266 format!(
267 "{}: {} | Connections: {} | Latency: {:?} (avg), {:?} (p95) | \
268 Queries: {}/{} ({:.1}%) | Throughput: {} | Memory: {}",
269 self.test_type.name(),
270 if self.passed { "PASS" } else { "FAIL" },
271 self.peak_connections,
272 self.average_latency,
273 self.p95_latency,
274 self.successful_queries,
275 self.total_queries,
276 self.success_rate(),
277 self.throughput_human(),
278 crate::utils::format_bytes(self.peak_memory_usage as usize),
279 )
280 }
281}
282
283pub struct LoadTester {
285 config: LoadTestConfig,
286 metrics: Arc<RwLock<LoadTestMetrics>>,
287}
288
289#[derive(Debug, Default, Clone)]
291pub struct LoadTestMetrics {
292 pub start_time: Option<Instant>,
294 pub connections: usize,
296 pub peak_connections: usize,
298 pub queries_sent: u64,
300 pub queries_succeeded: u64,
302 pub queries_failed: u64,
304 pub bytes_sent: u64,
306 pub bytes_received: u64,
308 pub latencies: Vec<Duration>,
310 pub memory_samples: Vec<u64>,
312 pub errors: Vec<String>,
314}
315
316impl LoadTester {
317 pub fn new(config: LoadTestConfig) -> Self {
319 Self {
320 config,
321 metrics: Arc::new(RwLock::new(LoadTestMetrics::default())),
322 }
323 }
324
325 pub fn run_test(&mut self, test_type: LoadTestType) -> Result<LoadTestResults, LoadTestError> {
327 match test_type {
328 LoadTestType::ConnectionStress => self.run_connection_stress(),
329 LoadTestType::DhtQueryStorm => self.run_dht_query_storm(),
330 LoadTestType::BandwidthSaturation => self.run_bandwidth_saturation(),
331 LoadTestType::ProviderFlood => self.run_provider_flood(),
332 LoadTestType::ConcurrentOps => self.run_concurrent_ops(),
333 LoadTestType::MemoryPressure => self.run_memory_pressure(),
334 LoadTestType::ComprehensiveSuite => self.run_comprehensive_suite(),
335 }
336 }
337
338 fn run_connection_stress(&mut self) -> Result<LoadTestResults, LoadTestError> {
340 let start = Instant::now();
341 let mut results = LoadTestResults::new(LoadTestType::ConnectionStress);
342
343 *self.metrics.write() = LoadTestMetrics {
345 start_time: Some(start),
346 ..Default::default()
347 };
348
349 let target = self.config.connection_target;
351 let mut current_connections = 0;
352
353 while current_connections < target {
354 current_connections += 1;
355 {
356 let mut metrics = self.metrics.write();
357 metrics.connections = current_connections;
358 metrics.peak_connections = metrics.peak_connections.max(current_connections);
359 }
360
361 std::thread::sleep(Duration::from_millis(10));
363 }
364
365 std::thread::sleep(self.config.duration);
367
368 let metrics = self.metrics.read();
370 results.peak_connections = metrics.peak_connections;
371 results.duration = start.elapsed();
372 results.passed = metrics.peak_connections >= self.config.connection_target;
373
374 Ok(results)
375 }
376
377 fn run_dht_query_storm(&mut self) -> Result<LoadTestResults, LoadTestError> {
379 let start = Instant::now();
380 let mut results = LoadTestResults::new(LoadTestType::DhtQueryStorm);
381
382 *self.metrics.write() = LoadTestMetrics {
384 start_time: Some(start),
385 ..Default::default()
386 };
387
388 let target_queries =
389 (self.config.query_rate as f64 * self.config.duration.as_secs_f64()).max(1.0) as u64;
390
391 for _ in 0..target_queries {
392 self.metrics.write().queries_sent += 1;
394
395 let mut rng = rand::rng();
397 if rng.random::<f64>() < 0.95 {
398 self.metrics.write().queries_succeeded += 1;
399 } else {
400 self.metrics.write().queries_failed += 1;
401 }
402
403 let latency_ms = rng.random_range(10..100);
405 let latency = Duration::from_millis(latency_ms);
406 self.metrics.write().latencies.push(latency);
407
408 std::thread::sleep(Duration::from_micros(1000 / self.config.query_rate.max(1)));
409 }
410
411 let metrics = self.metrics.read();
413 results.total_queries = metrics.queries_sent;
414 results.successful_queries = metrics.queries_succeeded;
415 results.failed_queries = metrics.queries_failed;
416 results.duration = start.elapsed();
417 results.query_rate_achieved = results.total_queries as f64 / results.duration.as_secs_f64();
418
419 if !metrics.latencies.is_empty() {
420 let mut sorted_latencies = metrics.latencies.clone();
421 sorted_latencies.sort();
422
423 let sum: Duration = sorted_latencies.iter().sum();
424 results.average_latency = sum / sorted_latencies.len() as u32;
425
426 let p95_idx = (sorted_latencies.len() as f64 * 0.95) as usize;
427 let p99_idx = (sorted_latencies.len() as f64 * 0.99) as usize;
428 results.p95_latency = sorted_latencies
429 .get(p95_idx)
430 .copied()
431 .unwrap_or(Duration::ZERO);
432 results.p99_latency = sorted_latencies
433 .get(p99_idx)
434 .copied()
435 .unwrap_or(Duration::ZERO);
436 }
437
438 results.passed = results.success_rate() >= 95.0;
439
440 Ok(results)
441 }
442
443 fn run_bandwidth_saturation(&mut self) -> Result<LoadTestResults, LoadTestError> {
445 let start = Instant::now();
446 let mut results = LoadTestResults::new(LoadTestType::BandwidthSaturation);
447
448 *self.metrics.write() = LoadTestMetrics {
450 start_time: Some(start),
451 ..Default::default()
452 };
453
454 let target_bytes = (self.config.bandwidth_target as f64
455 * self.config.duration.as_secs_f64())
456 .max(1024.0) as u64;
457 let mut bytes_transferred = 0u64;
458
459 while bytes_transferred < target_bytes {
460 let chunk_size = 1024 * 1024; bytes_transferred += chunk_size;
462
463 self.metrics.write().bytes_sent += chunk_size / 2;
464 self.metrics.write().bytes_received += chunk_size / 2;
465
466 std::thread::sleep(Duration::from_millis(10));
467 }
468
469 let metrics = self.metrics.read();
471 results.total_bytes_sent = metrics.bytes_sent;
472 results.total_bytes_received = metrics.bytes_received;
473 results.duration = start.elapsed();
474 results.throughput_bps =
475 (metrics.bytes_sent + metrics.bytes_received) / results.duration.as_secs().max(1);
476 results.passed = results.throughput_bps >= self.config.bandwidth_target;
477
478 Ok(results)
479 }
480
481 fn run_provider_flood(&mut self) -> Result<LoadTestResults, LoadTestError> {
483 let start = Instant::now();
484 let mut results = LoadTestResults::new(LoadTestType::ProviderFlood);
485
486 *self.metrics.write() = LoadTestMetrics {
488 start_time: Some(start),
489 ..Default::default()
490 };
491
492 let target_records = (self.config.provider_publish_rate as f64
493 * self.config.duration.as_secs_f64())
494 .max(1.0) as u64;
495
496 for _ in 0..target_records {
497 self.metrics.write().queries_sent += 1;
499
500 let mut rng = rand::rng();
501 if rng.random::<f64>() < 0.98 {
502 self.metrics.write().queries_succeeded += 1;
503 } else {
504 self.metrics.write().queries_failed += 1;
505 }
506
507 std::thread::sleep(Duration::from_micros(
508 1000 / self.config.provider_publish_rate.max(1),
509 ));
510 }
511
512 let metrics = self.metrics.read();
514 results.total_queries = metrics.queries_sent;
515 results.successful_queries = metrics.queries_succeeded;
516 results.failed_queries = metrics.queries_failed;
517 results.duration = start.elapsed();
518 results.passed = results.success_rate() >= 98.0;
519
520 Ok(results)
521 }
522
523 fn run_concurrent_ops(&mut self) -> Result<LoadTestResults, LoadTestError> {
525 let start = Instant::now();
526 let mut results = LoadTestResults::new(LoadTestType::ConcurrentOps);
527
528 *self.metrics.write() = LoadTestMetrics {
530 start_time: Some(start),
531 ..Default::default()
532 };
533
534 let mut rng = rand::rng();
536 for _ in 0..self.config.concurrent_operations {
537 self.metrics.write().queries_sent += 1;
538
539 if rng.random::<f64>() < 0.90 {
540 self.metrics.write().queries_succeeded += 1;
541 } else {
542 self.metrics.write().queries_failed += 1;
543 }
544 }
545
546 std::thread::sleep(self.config.duration);
547
548 let metrics = self.metrics.read();
550 results.total_queries = metrics.queries_sent;
551 results.successful_queries = metrics.queries_succeeded;
552 results.failed_queries = metrics.queries_failed;
553 results.duration = start.elapsed();
554 results.passed = results.success_rate() >= 90.0;
555
556 Ok(results)
557 }
558
559 fn run_memory_pressure(&mut self) -> Result<LoadTestResults, LoadTestError> {
561 let start = Instant::now();
562 let mut results = LoadTestResults::new(LoadTestType::MemoryPressure);
563
564 *self.metrics.write() = LoadTestMetrics {
566 start_time: Some(start),
567 ..Default::default()
568 };
569
570 let samples = (self.config.duration.as_secs() / 10).max(1);
572 let step = self.config.memory_limit / samples;
573
574 for i in 0..samples {
575 let memory_used = step * (i + 1);
576 self.metrics.write().memory_samples.push(memory_used);
577 std::thread::sleep(Duration::from_secs(10));
578 }
579
580 let metrics = self.metrics.read();
582 if !metrics.memory_samples.is_empty() {
583 results.peak_memory_usage = *metrics.memory_samples.iter().max().unwrap();
584 results.average_memory_usage =
585 metrics.memory_samples.iter().sum::<u64>() / metrics.memory_samples.len() as u64;
586 }
587 results.duration = start.elapsed();
588 results.passed = results.peak_memory_usage <= self.config.memory_limit;
589
590 Ok(results)
591 }
592
593 fn run_comprehensive_suite(&mut self) -> Result<LoadTestResults, LoadTestError> {
595 let start = Instant::now();
596 let mut combined = LoadTestResults::new(LoadTestType::ComprehensiveSuite);
597
598 let tests = vec![
599 LoadTestType::ConnectionStress,
600 LoadTestType::DhtQueryStorm,
601 LoadTestType::BandwidthSaturation,
602 LoadTestType::ProviderFlood,
603 LoadTestType::ConcurrentOps,
604 LoadTestType::MemoryPressure,
605 ];
606
607 let mut all_passed = true;
608
609 for test_type in tests {
610 match self.run_test(test_type) {
611 Ok(result) => {
612 if !result.passed {
613 all_passed = false;
614 combined.errors.push(format!("{} failed", test_type.name()));
615 }
616 combined.total_queries += result.total_queries;
618 combined.successful_queries += result.successful_queries;
619 combined.failed_queries += result.failed_queries;
620 combined.peak_connections =
621 combined.peak_connections.max(result.peak_connections);
622 combined.peak_memory_usage =
623 combined.peak_memory_usage.max(result.peak_memory_usage);
624 }
625 Err(e) => {
626 all_passed = false;
627 combined.errors.push(format!("{}: {}", test_type.name(), e));
628 }
629 }
630 }
631
632 combined.duration = start.elapsed();
633 combined.passed = all_passed;
634
635 Ok(combined)
636 }
637
638 pub fn get_metrics_snapshot(&self) -> LoadTestMetrics {
640 self.metrics.read().clone()
641 }
642}
643
644#[derive(Debug, thiserror::Error)]
646pub enum LoadTestError {
647 #[error("Load test failed: {0}")]
648 TestFailed(String),
649
650 #[error("Configuration error: {0}")]
651 ConfigError(String),
652
653 #[error("Timeout reached")]
654 Timeout,
655
656 #[error("Resource limit exceeded: {0}")]
657 ResourceLimit(String),
658}
659
660#[cfg(test)]
661mod tests {
662 use super::*;
663
664 #[test]
665 fn test_config_presets() {
666 let light = LoadTestConfig::light();
667 assert_eq!(light.connection_target, 20);
668
669 let moderate = LoadTestConfig::moderate();
670 assert_eq!(moderate.connection_target, 100);
671
672 let heavy = LoadTestConfig::heavy();
673 assert_eq!(heavy.connection_target, 500);
674
675 let extreme = LoadTestConfig::extreme();
676 assert_eq!(extreme.connection_target, 2000);
677 }
678
679 #[test]
680 fn test_load_test_types() {
681 assert_eq!(
682 LoadTestType::ConnectionStress.name(),
683 "Connection Stress Test"
684 );
685 assert!(!LoadTestType::DhtQueryStorm.description().is_empty());
686 }
687
688 #[test]
689 fn test_results_creation() {
690 let results = LoadTestResults::new(LoadTestType::ConnectionStress);
691 assert_eq!(results.test_type, LoadTestType::ConnectionStress);
692 assert!(!results.passed);
693 assert_eq!(results.total_queries, 0);
694 }
695
696 #[test]
697 fn test_success_rate() {
698 let mut results = LoadTestResults::new(LoadTestType::DhtQueryStorm);
699 results.total_queries = 100;
700 results.successful_queries = 95;
701 assert_eq!(results.success_rate(), 95.0);
702 }
703
704 #[test]
705 fn test_tester_creation() {
706 let config = LoadTestConfig::light();
707 let tester = LoadTester::new(config);
708 assert_eq!(tester.config.connection_target, 20);
709 }
710
711 #[test]
712 fn test_connection_stress() {
713 let config = LoadTestConfig {
714 duration: Duration::from_millis(100),
715 connection_target: 10,
716 ..LoadTestConfig::light()
717 };
718 let mut tester = LoadTester::new(config);
719 let results = tester.run_test(LoadTestType::ConnectionStress).unwrap();
720 assert!(results.peak_connections > 0);
721 }
722
723 #[test]
724 fn test_dht_query_storm() {
725 let config = LoadTestConfig {
726 duration: Duration::from_millis(100),
727 query_rate: 10,
728 ..LoadTestConfig::light()
729 };
730 let mut tester = LoadTester::new(config);
731 let results = tester.run_test(LoadTestType::DhtQueryStorm).unwrap();
732 assert!(results.total_queries > 0);
733 }
734
735 #[test]
736 fn test_bandwidth_saturation() {
737 let config = LoadTestConfig {
738 duration: Duration::from_millis(100),
739 bandwidth_target: 1_000_000,
740 ..LoadTestConfig::light()
741 };
742 let mut tester = LoadTester::new(config);
743 let results = tester.run_test(LoadTestType::BandwidthSaturation).unwrap();
744 assert!(results.total_bytes_sent > 0 || results.total_bytes_received > 0);
745 }
746
747 #[test]
748 fn test_provider_flood() {
749 let config = LoadTestConfig {
750 duration: Duration::from_millis(100),
751 provider_publish_rate: 10,
752 ..LoadTestConfig::light()
753 };
754 let mut tester = LoadTester::new(config);
755 let results = tester.run_test(LoadTestType::ProviderFlood).unwrap();
756 assert!(results.total_queries > 0);
757 }
758
759 #[test]
760 fn test_concurrent_ops() {
761 let config = LoadTestConfig {
762 duration: Duration::from_millis(100),
763 concurrent_operations: 20,
764 ..LoadTestConfig::light()
765 };
766 let mut tester = LoadTester::new(config);
767 let results = tester.run_test(LoadTestType::ConcurrentOps).unwrap();
768 assert_eq!(results.total_queries, 20);
769 }
770
771 #[test]
772 fn test_memory_pressure() {
773 let config = LoadTestConfig {
774 duration: Duration::from_millis(100),
775 memory_limit: 100 * 1024 * 1024,
776 ..LoadTestConfig::light()
777 };
778 let mut tester = LoadTester::new(config);
779 let results = tester.run_test(LoadTestType::MemoryPressure).unwrap();
780 assert!(results.peak_memory_usage > 0);
781 }
782
783 #[test]
784 fn test_results_summary() {
785 let mut results = LoadTestResults::new(LoadTestType::ConnectionStress);
786 results.passed = true;
787 results.peak_connections = 100;
788 let summary = results.summary();
789 assert!(summary.contains("PASS"));
790 assert!(summary.contains("100"));
791 }
792
793 #[test]
794 fn test_metrics_snapshot() {
795 let config = LoadTestConfig::light();
796 let tester = LoadTester::new(config);
797 let snapshot = tester.get_metrics_snapshot();
798 assert_eq!(snapshot.connections, 0);
799 }
800}