1pub mod failure;
32pub mod load;
33pub mod memory_sim;
34pub mod scale;
35pub mod scenarios;
36
37pub use failure::*;
38pub use load::*;
39pub use memory_sim::*;
40pub use scale::*;
41pub use scenarios::*;
42
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47use serde::{Deserialize, Serialize};
48use tokio::sync::broadcast;
49
50use crate::error::Result;
51use crate::profiling::{ProfileReport, Profiler};
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct SimulationConfig {
56 pub name: String,
58
59 pub description: String,
61
62 pub scale: ScaleConfig,
64
65 pub memory_pattern: MemoryPattern,
67
68 pub load_pattern: LoadPattern,
70
71 pub failure_config: Option<FailureConfig>,
73
74 pub max_duration: Duration,
76
77 pub report_interval: Duration,
79
80 pub detailed_metrics: bool,
82
83 pub seed: Option<u64>,
85}
86
87impl Default for SimulationConfig {
88 fn default() -> Self {
89 Self {
90 name: "default_simulation".into(),
91 description: "Default simulation configuration".into(),
92 scale: ScaleConfig::default(),
93 memory_pattern: MemoryPattern::Steady,
94 load_pattern: LoadPattern::Steady { ops_per_sec: 1000 },
95 failure_config: None,
96 max_duration: Duration::from_secs(300),
97 report_interval: Duration::from_secs(10),
98 detailed_metrics: true,
99 seed: None,
100 }
101 }
102}
103
104impl SimulationConfig {
105 pub fn new(name: impl Into<String>) -> Self {
107 Self {
108 name: name.into(),
109 ..Default::default()
110 }
111 }
112
113 pub fn with_scale(mut self, scale: ScaleConfig) -> Self {
115 self.scale = scale;
116 self
117 }
118
119 pub fn with_memory_pattern(mut self, pattern: MemoryPattern) -> Self {
121 self.memory_pattern = pattern;
122 self
123 }
124
125 pub fn with_load_pattern(mut self, pattern: LoadPattern) -> Self {
127 self.load_pattern = pattern;
128 self
129 }
130
131 pub fn with_failures(mut self, config: FailureConfig) -> Self {
133 self.failure_config = Some(config);
134 self
135 }
136
137 pub fn with_max_duration(mut self, duration: Duration) -> Self {
139 self.max_duration = duration;
140 self
141 }
142
143 pub fn with_seed(mut self, seed: u64) -> Self {
145 self.seed = Some(seed);
146 self
147 }
148
149 pub fn quick_test() -> Self {
151 Self {
152 name: "quick_test".into(),
153 description: "Quick validation test".into(),
154 scale: ScaleConfig::small(),
155 max_duration: Duration::from_secs(10),
156 ..Default::default()
157 }
158 }
159
160 pub fn stress_test() -> Self {
162 Self {
163 name: "stress_test".into(),
164 description: "High-load stress test".into(),
165 scale: ScaleConfig::large(),
166 memory_pattern: MemoryPattern::HighChurn,
167 load_pattern: LoadPattern::Spike {
168 baseline: 10_000,
169 peak: 100_000,
170 spike_duration: Duration::from_secs(30),
171 },
172 max_duration: Duration::from_secs(600),
173 ..Default::default()
174 }
175 }
176
177 pub fn memory_leak_test() -> Self {
179 Self {
180 name: "memory_leak_test".into(),
181 description: "Test for memory leaks over time".into(),
182 scale: ScaleConfig::medium(),
183 memory_pattern: MemoryPattern::GrowthOnly,
184 max_duration: Duration::from_secs(300),
185 report_interval: Duration::from_secs(5),
186 ..Default::default()
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
193pub enum SimulationEvent {
194 Started {
196 config: SimulationConfig,
197 start_time: Instant,
198 },
199
200 Progress {
202 elapsed: Duration,
203 completion_percent: f64,
204 current_metrics: SimulationMetrics,
205 },
206
207 PhaseChanged {
209 from: SimulationPhase,
210 to: SimulationPhase,
211 },
212
213 Error { message: String, recoverable: bool },
215
216 Completed { result: SimulationResult },
218}
219
220#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
222pub enum SimulationPhase {
223 Initializing,
225 WarmUp,
227 Running,
229 RampDown,
231 Finalizing,
233 Completed,
235}
236
237#[derive(Debug, Clone, Default, Serialize, Deserialize)]
239pub struct SimulationMetrics {
240 pub memory_bytes: u64,
242 pub peak_memory_bytes: u64,
244 pub active_devices: u64,
246 pub total_operations: u64,
248 pub ops_per_second: f64,
250 pub error_count: u64,
252 pub avg_latency_us: u64,
254 pub p99_latency_us: u64,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct SimulationResult {
261 pub name: String,
263 pub passed: bool,
265 pub duration: Duration,
267 pub final_metrics: SimulationMetrics,
269 pub memory_report: Option<ProfileReport>,
271 pub warnings: Vec<String>,
273 pub errors: Vec<String>,
275 pub phase_durations: std::collections::HashMap<String, Duration>,
277}
278
279impl SimulationResult {
280 pub fn has_errors(&self) -> bool {
282 !self.errors.is_empty()
283 }
284
285 pub fn summary(&self) -> String {
287 format!(
288 "Simulation '{}': {} in {:?}\n \
289 Devices: {}, Ops: {}, Errors: {}\n \
290 Memory: {} MB (peak: {} MB)\n \
291 Latency: avg {}us, p99 {}us",
292 self.name,
293 if self.passed { "PASSED" } else { "FAILED" },
294 self.duration,
295 self.final_metrics.active_devices,
296 self.final_metrics.total_operations,
297 self.final_metrics.error_count,
298 self.final_metrics.memory_bytes / 1024 / 1024,
299 self.final_metrics.peak_memory_bytes / 1024 / 1024,
300 self.final_metrics.avg_latency_us,
301 self.final_metrics.p99_latency_us,
302 )
303 }
304}
305
306pub struct Simulator {
308 config: SimulationConfig,
309 profiler: Arc<Profiler>,
310 metrics: Arc<RwLock<SimulationMetrics>>,
311 phase: Arc<RwLock<SimulationPhase>>,
312 event_tx: broadcast::Sender<SimulationEvent>,
313 running: Arc<std::sync::atomic::AtomicBool>,
314}
315
316impl Simulator {
317 pub fn new(config: SimulationConfig) -> Self {
319 let (event_tx, _) = broadcast::channel(256);
320
321 Self {
322 config,
323 profiler: Arc::new(Profiler::with_defaults()),
324 metrics: Arc::new(RwLock::new(SimulationMetrics::default())),
325 phase: Arc::new(RwLock::new(SimulationPhase::Initializing)),
326 event_tx,
327 running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
328 }
329 }
330
331 pub fn subscribe(&self) -> broadcast::Receiver<SimulationEvent> {
333 self.event_tx.subscribe()
334 }
335
336 pub fn metrics(&self) -> SimulationMetrics {
338 self.metrics.read().clone()
339 }
340
341 pub fn phase(&self) -> SimulationPhase {
343 *self.phase.read()
344 }
345
346 pub fn is_running(&self) -> bool {
348 self.running.load(std::sync::atomic::Ordering::SeqCst)
349 }
350
351 pub async fn run(&self) -> Result<SimulationResult> {
353 let start_time = Instant::now();
354
355 self.profiler.start();
357 self.running
358 .store(true, std::sync::atomic::Ordering::SeqCst);
359
360 let _ = self.event_tx.send(SimulationEvent::Started {
362 config: self.config.clone(),
363 start_time,
364 });
365
366 let mut phase_durations = std::collections::HashMap::new();
367 let mut warnings = Vec::new();
368 let mut errors = Vec::new();
369
370 self.set_phase(SimulationPhase::Initializing);
372 let init_start = Instant::now();
373 if let Err(e) = self.run_initialization().await {
374 errors.push(format!("Initialization failed: {}", e));
375 }
376 phase_durations.insert("initialization".into(), init_start.elapsed());
377
378 if errors.is_empty() {
380 self.set_phase(SimulationPhase::WarmUp);
381 let warmup_start = Instant::now();
382 if let Err(e) = self.run_warmup().await {
383 warnings.push(format!("Warm-up warning: {}", e));
384 }
385 phase_durations.insert("warmup".into(), warmup_start.elapsed());
386 }
387
388 if errors.is_empty() {
390 self.set_phase(SimulationPhase::Running);
391 let run_start = Instant::now();
392
393 let run_result = self.run_main_phase(start_time).await;
395 if let Err(e) = run_result {
396 errors.push(format!("Execution error: {}", e));
397 }
398 phase_durations.insert("execution".into(), run_start.elapsed());
399 }
400
401 self.set_phase(SimulationPhase::RampDown);
403 let rampdown_start = Instant::now();
404 self.run_rampdown().await;
405 phase_durations.insert("rampdown".into(), rampdown_start.elapsed());
406
407 self.set_phase(SimulationPhase::Finalizing);
409 let finalize_start = Instant::now();
410
411 self.profiler.stop();
413 let memory_report = Some(self.profiler.generate_report());
414
415 let leak_warnings = self.profiler.check_leaks();
417 for leak in leak_warnings {
418 warnings.push(format!(
419 "Potential memory leak: {} - {}",
420 leak.region, leak.message
421 ));
422 }
423
424 phase_durations.insert("finalization".into(), finalize_start.elapsed());
425
426 self.set_phase(SimulationPhase::Completed);
428 self.running
429 .store(false, std::sync::atomic::Ordering::SeqCst);
430
431 let final_metrics = self.metrics.read().clone();
432 let passed = errors.is_empty() && self.check_success_criteria(&final_metrics);
433
434 let result = SimulationResult {
435 name: self.config.name.clone(),
436 passed,
437 duration: start_time.elapsed(),
438 final_metrics,
439 memory_report,
440 warnings,
441 errors,
442 phase_durations,
443 };
444
445 let _ = self.event_tx.send(SimulationEvent::Completed {
447 result: result.clone(),
448 });
449
450 Ok(result)
451 }
452
453 fn set_phase(&self, new_phase: SimulationPhase) {
454 let old_phase = *self.phase.read();
455 *self.phase.write() = new_phase;
456
457 let _ = self.event_tx.send(SimulationEvent::PhaseChanged {
458 from: old_phase,
459 to: new_phase,
460 });
461 }
462
463 async fn run_initialization(&self) -> Result<()> {
464 let target_devices = self.config.scale.device_count;
466 let batch_size = self.config.scale.batch_size;
467
468 tracing::info!(
469 "Initializing simulation: {} devices in batches of {}",
470 target_devices,
471 batch_size
472 );
473
474 let mut current = 0u64;
475 while current < target_devices as u64 {
476 let batch = (target_devices as u64 - current).min(batch_size as u64);
477
478 self.profiler.record_allocation(
480 "devices",
481 (batch as usize) * self.config.scale.memory_per_device,
482 );
483
484 current += batch;
485
486 {
488 let mut metrics = self.metrics.write();
489 metrics.active_devices = current;
490 metrics.memory_bytes = self.profiler.snapshot().current_bytes;
491 }
492
493 tokio::task::yield_now().await;
495 }
496
497 Ok(())
498 }
499
500 async fn run_warmup(&self) -> Result<()> {
501 let warmup_duration = Duration::from_secs(5);
503 let start = Instant::now();
504
505 while start.elapsed() < warmup_duration {
506 self.simulate_operations(100).await;
508 tokio::time::sleep(Duration::from_millis(100)).await;
509 }
510
511 Ok(())
512 }
513
514 async fn run_main_phase(&self, start_time: Instant) -> Result<()> {
515 let mut last_report = Instant::now();
516
517 while start_time.elapsed() < self.config.max_duration {
518 let ops = self
520 .config
521 .load_pattern
522 .ops_for_elapsed(start_time.elapsed());
523 self.simulate_operations(ops).await;
524
525 self.apply_memory_pattern(start_time.elapsed()).await;
527
528 if let Some(ref failure_config) = self.config.failure_config {
530 self.inject_failures(failure_config).await;
531 }
532
533 if last_report.elapsed() >= self.config.report_interval {
535 let metrics = self.metrics.read().clone();
536 let completion = start_time.elapsed().as_secs_f64()
537 / self.config.max_duration.as_secs_f64()
538 * 100.0;
539
540 let _ = self.event_tx.send(SimulationEvent::Progress {
541 elapsed: start_time.elapsed(),
542 completion_percent: completion.min(100.0),
543 current_metrics: metrics,
544 });
545
546 last_report = Instant::now();
547 }
548
549 tokio::time::sleep(Duration::from_millis(10)).await;
550 }
551
552 Ok(())
553 }
554
555 async fn run_rampdown(&self) {
556 let current_devices = self.metrics.read().active_devices;
558 let batch_size = self.config.scale.batch_size as u64;
559
560 let mut remaining = current_devices;
561 while remaining > 0 {
562 let batch = remaining.min(batch_size);
563
564 self.profiler.record_deallocation(
565 "devices",
566 (batch as usize) * self.config.scale.memory_per_device,
567 );
568
569 remaining -= batch;
570
571 {
572 let mut metrics = self.metrics.write();
573 metrics.active_devices = remaining;
574 metrics.memory_bytes = self.profiler.snapshot().current_bytes;
575 }
576
577 tokio::task::yield_now().await;
578 }
579 }
580
581 async fn simulate_operations(&self, count: u64) {
582 let mut metrics = self.metrics.write();
583 metrics.total_operations += count;
584
585 let base_latency = 100u64; let jitter = (count % 50) * 2;
588 metrics.avg_latency_us = base_latency + jitter;
589 metrics.p99_latency_us = base_latency * 3 + jitter * 2;
590 }
591
592 async fn apply_memory_pattern(&self, elapsed: Duration) {
593 match &self.config.memory_pattern {
594 MemoryPattern::Steady => {
595 }
597 MemoryPattern::GrowthOnly => {
598 let growth = (elapsed.as_secs() * 1024) as usize;
600 self.profiler.record_allocation("growth", growth);
601 }
602 MemoryPattern::GrowthAndRelease => {
603 let cycle = elapsed.as_secs() % 60;
605 if cycle < 30 {
606 self.profiler.record_allocation("cyclic", 10240);
607 } else {
608 self.profiler.record_deallocation("cyclic", 10240);
609 }
610 }
611 MemoryPattern::HighChurn => {
612 self.profiler.record_allocation("churn", 1024);
614 self.profiler.record_deallocation("churn", 512);
615 }
616 MemoryPattern::Fragmentation => {
617 let sizes = [64, 256, 1024, 4096, 16384];
619 let idx = (elapsed.as_millis() as usize) % sizes.len();
620 self.profiler.record_allocation("frag", sizes[idx]);
621 if elapsed.as_millis() % 2 == 0 {
622 self.profiler
623 .record_deallocation("frag", sizes[(idx + 2) % sizes.len()]);
624 }
625 }
626 MemoryPattern::Custom(pattern) => {
627 pattern.apply(&self.profiler, elapsed);
628 }
629 }
630
631 let snapshot = self.profiler.snapshot();
633 let mut metrics = self.metrics.write();
634 metrics.memory_bytes = snapshot.current_bytes;
635 if snapshot.current_bytes > metrics.peak_memory_bytes {
636 metrics.peak_memory_bytes = snapshot.current_bytes;
637 }
638 }
639
640 async fn inject_failures(&self, config: &FailureConfig) {
641 if config.should_inject() {
642 let mut metrics = self.metrics.write();
643 metrics.error_count += 1;
644
645 let _ = self.event_tx.send(SimulationEvent::Error {
646 message: config.next_failure_type().to_string(),
647 recoverable: true,
648 });
649 }
650 }
651
652 fn check_success_criteria(&self, metrics: &SimulationMetrics) -> bool {
653 let max_error_rate = 0.01; let error_rate = if metrics.total_operations > 0 {
656 metrics.error_count as f64 / metrics.total_operations as f64
657 } else {
658 0.0
659 };
660
661 error_rate <= max_error_rate
662 }
663
664 pub fn stop(&self) {
666 self.running
667 .store(false, std::sync::atomic::Ordering::SeqCst);
668 }
669}
670
671impl std::fmt::Debug for Simulator {
672 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673 f.debug_struct("Simulator")
674 .field("config", &self.config)
675 .field("phase", &self.phase())
676 .field("running", &self.is_running())
677 .finish()
678 }
679}
680
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_simulation_config_default() {
        let config = SimulationConfig::default();
        assert_eq!(config.name, "default_simulation");
        assert_eq!(config.max_duration, Duration::from_secs(300));
    }

    #[test]
    fn test_simulation_config_builders() {
        let config = SimulationConfig::new("test")
            .with_scale(ScaleConfig::devices(1000))
            .with_memory_pattern(MemoryPattern::HighChurn)
            .with_max_duration(Duration::from_secs(60))
            .with_seed(42);

        assert_eq!(config.name, "test");
        assert_eq!(config.scale.device_count, 1000);
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
        assert_eq!(config.max_duration, Duration::from_secs(60));
        assert_eq!(config.seed, Some(42));
    }

    #[test]
    fn test_quick_test_config() {
        let config = SimulationConfig::quick_test();
        assert_eq!(config.max_duration, Duration::from_secs(10));
    }

    #[test]
    fn test_stress_test_config() {
        let config = SimulationConfig::stress_test();
        assert_eq!(config.name, "stress_test");
        // `matches!` only evaluates to a bool; it must be asserted to test anything.
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
    }

    #[test]
    fn test_simulator_creation() {
        let config = SimulationConfig::quick_test();
        let simulator = Simulator::new(config);
        assert!(!simulator.is_running());
        assert_eq!(simulator.phase(), SimulationPhase::Initializing);
    }

    #[test]
    fn test_set_phase_publishes_transition() {
        let simulator = Simulator::new(SimulationConfig::quick_test());
        let mut events = simulator.subscribe();

        simulator.set_phase(SimulationPhase::Running);

        assert_eq!(simulator.phase(), SimulationPhase::Running);
        match events.try_recv() {
            Ok(SimulationEvent::PhaseChanged { from, to }) => {
                assert_eq!(from, SimulationPhase::Initializing);
                assert_eq!(to, SimulationPhase::Running);
            }
            other => panic!("expected PhaseChanged event, got {:?}", other),
        }
    }

    #[test]
    fn test_success_criteria_error_rate_threshold() {
        let simulator = Simulator::new(SimulationConfig::quick_test());

        let at_limit = SimulationMetrics {
            total_operations: 100,
            error_count: 1,
            ..Default::default()
        };
        let over_limit = SimulationMetrics {
            total_operations: 100,
            error_count: 2,
            ..Default::default()
        };

        assert!(simulator.check_success_criteria(&at_limit));
        assert!(!simulator.check_success_criteria(&over_limit));
        assert!(simulator.check_success_criteria(&SimulationMetrics::default()));
    }

    #[test]
    fn test_simulation_result_summary() {
        let mut result = SimulationResult {
            name: "test".into(),
            passed: true,
            duration: Duration::from_secs(10),
            final_metrics: SimulationMetrics {
                memory_bytes: 1024 * 1024,
                peak_memory_bytes: 2 * 1024 * 1024,
                active_devices: 100,
                total_operations: 10000,
                ops_per_second: 1000.0,
                error_count: 0,
                avg_latency_us: 100,
                p99_latency_us: 500,
            },
            memory_report: None,
            warnings: vec![],
            errors: vec![],
            phase_durations: std::collections::HashMap::new(),
        };

        let summary = result.summary();
        assert!(summary.contains("PASSED"));
        assert!(summary.contains("10000"));
        assert!(summary.contains("Memory: 1 MB (peak: 2 MB)"));
        assert!(!result.has_errors());

        result.passed = false;
        result.errors.push("boom".into());
        assert!(result.summary().contains("FAILED"));
        assert!(result.has_errors());
    }

    #[tokio::test]
    async fn test_simulator_quick_run() {
        let config = SimulationConfig::new("quick")
            .with_scale(ScaleConfig::tiny())
            .with_max_duration(Duration::from_millis(100));

        let simulator = Simulator::new(config);
        let result = simulator.run().await.unwrap();

        assert!(result.passed);
        assert_eq!(result.final_metrics.active_devices, 0);
        assert!(!simulator.is_running());
        assert_eq!(simulator.phase(), SimulationPhase::Completed);
    }
}