1use dashmap::DashMap;
44use parking_lot::RwLock;
45use std::sync::Arc;
46use std::time::{Duration, Instant};
47use thiserror::Error;
48use tokio::time::sleep;
49
50#[derive(Debug, Error)]
52pub enum SimulatorError {
53 #[error("Simulator is not running")]
55 NotRunning,
56
57 #[error("Simulator is already running")]
59 AlreadyRunning,
60
61 #[error("Invalid configuration: {0}")]
63 InvalidConfig(String),
64
65 #[error("Internal error: {0}")]
67 Internal(String),
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum NetworkCondition {
73 Perfect,
75 Good,
77 Fair,
79 Poor,
81 VeryPoor,
83 Mobile3G,
85 Mobile4G,
87 Mobile5G,
89 Satellite,
91 Custom,
93}
94
95#[derive(Debug, Clone)]
97pub struct SimulatorConfig {
98 pub base_latency_ms: u64,
100
101 pub latency_variance_ms: u64,
103
104 pub packet_loss_rate: f64,
106
107 pub bandwidth_limit_bps: u64,
109
110 pub spike_probability: f64,
112
113 pub spike_multiplier: u64,
115
116 pub enable_reordering: bool,
118
119 pub reorder_probability: f64,
121
122 pub max_reorder_delay_ms: u64,
124}
125
126impl Default for SimulatorConfig {
127 fn default() -> Self {
128 Self {
129 base_latency_ms: 0,
130 latency_variance_ms: 0,
131 packet_loss_rate: 0.0,
132 bandwidth_limit_bps: 0,
133 spike_probability: 0.0,
134 spike_multiplier: 10,
135 enable_reordering: false,
136 reorder_probability: 0.0,
137 max_reorder_delay_ms: 100,
138 }
139 }
140}
141
142impl SimulatorConfig {
143 pub fn from_condition(condition: NetworkCondition) -> Self {
145 match condition {
146 NetworkCondition::Perfect => Self::default(),
147 NetworkCondition::Good => Self {
148 base_latency_ms: 5,
149 latency_variance_ms: 2,
150 packet_loss_rate: 0.001,
151 ..Default::default()
152 },
153 NetworkCondition::Fair => Self {
154 base_latency_ms: 50,
155 latency_variance_ms: 10,
156 packet_loss_rate: 0.01,
157 spike_probability: 0.05,
158 ..Default::default()
159 },
160 NetworkCondition::Poor => Self {
161 base_latency_ms: 200,
162 latency_variance_ms: 50,
163 packet_loss_rate: 0.05,
164 spike_probability: 0.1,
165 spike_multiplier: 5,
166 ..Default::default()
167 },
168 NetworkCondition::VeryPoor => Self {
169 base_latency_ms: 500,
170 latency_variance_ms: 150,
171 packet_loss_rate: 0.1,
172 spike_probability: 0.2,
173 spike_multiplier: 10,
174 enable_reordering: true,
175 reorder_probability: 0.05,
176 ..Default::default()
177 },
178 NetworkCondition::Mobile3G => Self {
179 base_latency_ms: 100,
180 latency_variance_ms: 50,
181 packet_loss_rate: 0.02,
182 bandwidth_limit_bps: 384_000, spike_probability: 0.1,
184 ..Default::default()
185 },
186 NetworkCondition::Mobile4G => Self {
187 base_latency_ms: 50,
188 latency_variance_ms: 20,
189 packet_loss_rate: 0.01,
190 bandwidth_limit_bps: 10_000_000, spike_probability: 0.05,
192 ..Default::default()
193 },
194 NetworkCondition::Mobile5G => Self {
195 base_latency_ms: 10,
196 latency_variance_ms: 5,
197 packet_loss_rate: 0.005,
198 bandwidth_limit_bps: 100_000_000, ..Default::default()
200 },
201 NetworkCondition::Satellite => Self {
202 base_latency_ms: 600,
203 latency_variance_ms: 100,
204 packet_loss_rate: 0.03,
205 bandwidth_limit_bps: 1_000_000, spike_probability: 0.15,
207 ..Default::default()
208 },
209 NetworkCondition::Custom => Self::default(),
210 }
211 }
212
213 pub fn validate(&self) -> Result<(), SimulatorError> {
215 if self.packet_loss_rate < 0.0 || self.packet_loss_rate > 1.0 {
216 return Err(SimulatorError::InvalidConfig(
217 "packet_loss_rate must be between 0.0 and 1.0".to_string(),
218 ));
219 }
220
221 if self.spike_probability < 0.0 || self.spike_probability > 1.0 {
222 return Err(SimulatorError::InvalidConfig(
223 "spike_probability must be between 0.0 and 1.0".to_string(),
224 ));
225 }
226
227 if self.reorder_probability < 0.0 || self.reorder_probability > 1.0 {
228 return Err(SimulatorError::InvalidConfig(
229 "reorder_probability must be between 0.0 and 1.0".to_string(),
230 ));
231 }
232
233 Ok(())
234 }
235}
236
237#[derive(Debug, Clone, Default)]
239pub struct SimulatorStats {
240 pub packets_processed: u64,
242
243 pub packets_dropped: u64,
245
246 pub packets_delayed: u64,
248
249 pub packets_reordered: u64,
251
252 pub bytes_processed: u64,
254
255 pub avg_latency_ms: f64,
257
258 pub max_latency_ms: u64,
260
261 pub latency_spikes: u64,
263
264 pub start_time: Option<Instant>,
266
267 pub duration: Duration,
269}
270
271impl SimulatorStats {
272 pub fn packet_loss_rate(&self) -> f64 {
274 if self.packets_processed == 0 {
275 0.0
276 } else {
277 self.packets_dropped as f64 / self.packets_processed as f64
278 }
279 }
280
281 pub fn throughput_bps(&self) -> f64 {
283 if self.duration.as_secs_f64() == 0.0 {
284 0.0
285 } else {
286 self.bytes_processed as f64 / self.duration.as_secs_f64()
287 }
288 }
289}
290
291pub struct NetworkSimulator {
293 config: SimulatorConfig,
294 stats: Arc<RwLock<SimulatorStats>>,
295 running: Arc<RwLock<bool>>,
296 partitions: Arc<DashMap<String, Vec<String>>>,
297}
298
299impl NetworkSimulator {
300 pub fn new(config: SimulatorConfig) -> Self {
302 Self {
303 config,
304 stats: Arc::new(RwLock::new(SimulatorStats::default())),
305 running: Arc::new(RwLock::new(false)),
306 partitions: Arc::new(DashMap::new()),
307 }
308 }
309
310 pub fn from_condition(condition: NetworkCondition) -> Self {
312 Self::new(SimulatorConfig::from_condition(condition))
313 }
314
315 pub async fn start(&self) -> Result<(), SimulatorError> {
317 self.config.validate()?;
318
319 let mut running = self.running.write();
320 if *running {
321 return Err(SimulatorError::AlreadyRunning);
322 }
323
324 *running = true;
325
326 let mut stats = self.stats.write();
327 stats.start_time = Some(Instant::now());
328 stats.duration = Duration::from_secs(0);
329
330 Ok(())
331 }
332
333 pub async fn stop(&self) -> Result<(), SimulatorError> {
335 let mut running = self.running.write();
336 if !*running {
337 return Err(SimulatorError::NotRunning);
338 }
339
340 *running = false;
341
342 let mut stats = self.stats.write();
343 if let Some(start_time) = stats.start_time {
344 stats.duration = start_time.elapsed();
345 }
346
347 Ok(())
348 }
349
350 pub fn is_running(&self) -> bool {
352 *self.running.read()
353 }
354
355 pub async fn delay_packet(&self, packet_size: usize) -> Result<bool, SimulatorError> {
357 if !self.is_running() {
358 return Ok(true);
359 }
360
361 {
363 let mut stats = self.stats.write();
364 stats.packets_processed += 1;
365 stats.bytes_processed += packet_size as u64;
366 }
367
368 if rand::random::<f64>() < self.config.packet_loss_rate {
370 let mut stats = self.stats.write();
371 stats.packets_dropped += 1;
372 return Ok(false); }
374
375 let mut latency_ms = self.config.base_latency_ms;
377
378 if self.config.latency_variance_ms > 0 {
380 let variance = (rand::random::<f64>() * self.config.latency_variance_ms as f64) as u64;
381 latency_ms += variance;
382 }
383
384 if rand::random::<f64>() < self.config.spike_probability {
386 latency_ms *= self.config.spike_multiplier;
387 let mut stats = self.stats.write();
388 stats.latency_spikes += 1;
389 }
390
391 {
393 let mut stats = self.stats.write();
394 if latency_ms > stats.max_latency_ms {
395 stats.max_latency_ms = latency_ms;
396 }
397
398 let alpha = 0.3;
400 stats.avg_latency_ms = alpha * latency_ms as f64 + (1.0 - alpha) * stats.avg_latency_ms;
401 }
402
403 if latency_ms > 0 {
405 sleep(Duration::from_millis(latency_ms)).await;
406
407 let mut stats = self.stats.write();
408 stats.packets_delayed += 1;
409 }
410
411 if self.config.enable_reordering && rand::random::<f64>() < self.config.reorder_probability
413 {
414 let reorder_delay =
415 (rand::random::<f64>() * self.config.max_reorder_delay_ms as f64) as u64;
416 sleep(Duration::from_millis(reorder_delay)).await;
417
418 let mut stats = self.stats.write();
419 stats.packets_reordered += 1;
420 }
421
422 Ok(true) }
424
425 pub fn create_partition(&self, group1: Vec<String>, group2: Vec<String>) {
427 for peer in &group1 {
428 self.partitions.insert(peer.clone(), group2.clone());
429 }
430 for peer in &group2 {
431 self.partitions.insert(peer.clone(), group1.clone());
432 }
433 }
434
435 pub fn clear_partitions(&self) {
437 self.partitions.clear();
438 }
439
440 pub fn is_partitioned(&self, peer1: &str, peer2: &str) -> bool {
442 if let Some(partitioned_peers) = self.partitions.get(peer1) {
443 partitioned_peers.contains(&peer2.to_string())
444 } else {
445 false
446 }
447 }
448
449 pub fn stats(&self) -> SimulatorStats {
451 let mut stats = self.stats.read().clone();
452 if self.is_running() {
453 if let Some(start_time) = stats.start_time {
454 stats.duration = start_time.elapsed();
455 }
456 }
457 stats
458 }
459
460 pub fn reset_stats(&self) {
462 let mut stats = self.stats.write();
463 *stats = SimulatorStats::default();
464 if self.is_running() {
465 stats.start_time = Some(Instant::now());
466 }
467 }
468
469 pub fn config(&self) -> &SimulatorConfig {
471 &self.config
472 }
473
474 pub fn update_config(&mut self, config: SimulatorConfig) -> Result<(), SimulatorError> {
476 if self.is_running() {
477 return Err(SimulatorError::Internal(
478 "Cannot update config while running".to_string(),
479 ));
480 }
481
482 config.validate()?;
483 self.config = config;
484 Ok(())
485 }
486}
487
488#[cfg(test)]
489mod tests {
490 use super::*;
491
492 #[test]
493 fn test_simulator_creation() {
494 let config = SimulatorConfig::default();
495 let simulator = NetworkSimulator::new(config);
496 assert!(!simulator.is_running());
497 }
498
499 #[test]
500 fn test_network_conditions() {
501 let conditions = vec![
502 NetworkCondition::Perfect,
503 NetworkCondition::Good,
504 NetworkCondition::Fair,
505 NetworkCondition::Poor,
506 NetworkCondition::VeryPoor,
507 NetworkCondition::Mobile3G,
508 NetworkCondition::Mobile4G,
509 NetworkCondition::Mobile5G,
510 NetworkCondition::Satellite,
511 ];
512
513 for condition in conditions {
514 let config = SimulatorConfig::from_condition(condition);
515 assert!(config.validate().is_ok());
516 }
517 }
518
519 #[tokio::test]
520 async fn test_start_stop() {
521 let simulator = NetworkSimulator::from_condition(NetworkCondition::Good);
522
523 assert!(!simulator.is_running());
524
525 simulator.start().await.unwrap();
526 assert!(simulator.is_running());
527
528 simulator.stop().await.unwrap();
529 assert!(!simulator.is_running());
530 }
531
532 #[tokio::test]
533 async fn test_packet_delay() {
534 let config = SimulatorConfig {
535 base_latency_ms: 10,
536 packet_loss_rate: 0.0,
537 ..Default::default()
538 };
539
540 let simulator = NetworkSimulator::new(config);
541 simulator.start().await.unwrap();
542
543 let start = Instant::now();
544 let delivered = simulator.delay_packet(1024).await.unwrap();
545 let elapsed = start.elapsed();
546
547 assert!(delivered);
548 assert!(elapsed >= Duration::from_millis(10));
549
550 simulator.stop().await.unwrap();
551 }
552
553 #[tokio::test]
554 async fn test_packet_loss() {
555 let config = SimulatorConfig {
556 packet_loss_rate: 1.0, ..Default::default()
558 };
559
560 let simulator = NetworkSimulator::new(config);
561 simulator.start().await.unwrap();
562
563 let delivered = simulator.delay_packet(1024).await.unwrap();
564 assert!(!delivered); let stats = simulator.stats();
567 assert_eq!(stats.packets_dropped, 1);
568
569 simulator.stop().await.unwrap();
570 }
571
572 #[test]
573 fn test_partitions() {
574 let simulator = NetworkSimulator::from_condition(NetworkCondition::Good);
575
576 let group1 = vec!["peer1".to_string(), "peer2".to_string()];
577 let group2 = vec!["peer3".to_string(), "peer4".to_string()];
578
579 simulator.create_partition(group1, group2);
580
581 assert!(simulator.is_partitioned("peer1", "peer3"));
582 assert!(simulator.is_partitioned("peer2", "peer4"));
583 assert!(!simulator.is_partitioned("peer1", "peer2"));
584
585 simulator.clear_partitions();
586 assert!(!simulator.is_partitioned("peer1", "peer3"));
587 }
588
589 #[tokio::test]
590 async fn test_statistics() {
591 let config = SimulatorConfig {
592 base_latency_ms: 5,
593 packet_loss_rate: 0.0,
594 ..Default::default()
595 };
596
597 let simulator = NetworkSimulator::new(config);
598 simulator.start().await.unwrap();
599
600 for _ in 0..10 {
601 simulator.delay_packet(1024).await.unwrap();
602 }
603
604 let stats = simulator.stats();
605 assert_eq!(stats.packets_processed, 10);
606 assert_eq!(stats.bytes_processed, 10240);
607 assert!(stats.avg_latency_ms > 0.0);
608
609 simulator.stop().await.unwrap();
610 }
611
612 #[test]
613 fn test_invalid_config() {
614 let config = SimulatorConfig {
615 packet_loss_rate: 1.5, ..Default::default()
617 };
618
619 assert!(config.validate().is_err());
620 }
621
622 #[test]
623 fn test_stats_calculation() {
624 let stats = SimulatorStats {
625 packets_processed: 100,
626 packets_dropped: 5,
627 bytes_processed: 102400,
628 duration: Duration::from_secs(10),
629 ..Default::default()
630 };
631
632 assert_eq!(stats.packet_loss_rate(), 0.05);
633 assert_eq!(stats.throughput_bps(), 10240.0);
634 }
635
636 #[tokio::test]
637 async fn test_config_update() {
638 let mut simulator = NetworkSimulator::from_condition(NetworkCondition::Good);
639
640 let new_config = SimulatorConfig::from_condition(NetworkCondition::Poor);
641 assert!(simulator.update_config(new_config).is_ok());
642
643 simulator.start().await.unwrap();
644
645 let invalid_config = SimulatorConfig {
646 packet_loss_rate: -0.5,
647 ..Default::default()
648 };
649 assert!(simulator.update_config(invalid_config).is_err());
650 }
651
652 #[tokio::test]
653 async fn test_reset_stats() {
654 let simulator = NetworkSimulator::from_condition(NetworkCondition::Good);
655 simulator.start().await.unwrap();
656
657 simulator.delay_packet(1024).await.unwrap();
658 assert_eq!(simulator.stats().packets_processed, 1);
659
660 simulator.reset_stats();
661 assert_eq!(simulator.stats().packets_processed, 0);
662
663 simulator.stop().await.unwrap();
664 }
665}