1pub mod scale;
32pub mod memory_sim;
33pub mod failure;
34pub mod load;
35pub mod scenarios;
36
37pub use scale::*;
38pub use memory_sim::*;
39pub use failure::*;
40pub use load::*;
41pub use scenarios::*;
42
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47use serde::{Deserialize, Serialize};
48use tokio::sync::broadcast;
49
50use crate::error::Result;
51use crate::profiling::{Profiler, ProfileReport};
52
/// Complete description of one simulation run: what to simulate, how hard to
/// drive it, and for how long.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulationConfig {
    /// Identifier of the run; copied into [`SimulationResult::name`].
    pub name: String,

    /// Free-form, human-readable description of the scenario.
    pub description: String,

    /// Device population; the simulator reads `device_count`, `batch_size`
    /// and `memory_per_device` from it.
    pub scale: ScaleConfig,

    /// Synthetic allocation pattern applied on every main-phase tick.
    pub memory_pattern: MemoryPattern,

    /// Decides how many operations are simulated per main-phase tick, as a
    /// function of the time elapsed since the run started.
    pub load_pattern: LoadPattern,

    /// When `Some`, failure injection is attempted on every main-phase tick.
    pub failure_config: Option<FailureConfig>,

    /// Time budget measured from the start of `Simulator::run`; the main
    /// phase stops once the elapsed time reaches it.
    pub max_duration: Duration,

    /// Minimum interval between `SimulationEvent::Progress` events during
    /// the main phase.
    pub report_interval: Duration,

    /// NOTE(review): not read by the simulator in this module; presumably
    /// toggles extra metric collection elsewhere — confirm.
    pub detailed_metrics: bool,

    /// NOTE(review): not read by the simulator in this module; presumably
    /// seeds random generators for reproducible runs — confirm.
    pub seed: Option<u64>,
}
86
87impl Default for SimulationConfig {
88 fn default() -> Self {
89 Self {
90 name: "default_simulation".into(),
91 description: "Default simulation configuration".into(),
92 scale: ScaleConfig::default(),
93 memory_pattern: MemoryPattern::Steady,
94 load_pattern: LoadPattern::Steady { ops_per_sec: 1000 },
95 failure_config: None,
96 max_duration: Duration::from_secs(300),
97 report_interval: Duration::from_secs(10),
98 detailed_metrics: true,
99 seed: None,
100 }
101 }
102}
103
104impl SimulationConfig {
105 pub fn new(name: impl Into<String>) -> Self {
107 Self {
108 name: name.into(),
109 ..Default::default()
110 }
111 }
112
113 pub fn with_scale(mut self, scale: ScaleConfig) -> Self {
115 self.scale = scale;
116 self
117 }
118
119 pub fn with_memory_pattern(mut self, pattern: MemoryPattern) -> Self {
121 self.memory_pattern = pattern;
122 self
123 }
124
125 pub fn with_load_pattern(mut self, pattern: LoadPattern) -> Self {
127 self.load_pattern = pattern;
128 self
129 }
130
131 pub fn with_failures(mut self, config: FailureConfig) -> Self {
133 self.failure_config = Some(config);
134 self
135 }
136
137 pub fn with_max_duration(mut self, duration: Duration) -> Self {
139 self.max_duration = duration;
140 self
141 }
142
143 pub fn with_seed(mut self, seed: u64) -> Self {
145 self.seed = Some(seed);
146 self
147 }
148
149 pub fn quick_test() -> Self {
151 Self {
152 name: "quick_test".into(),
153 description: "Quick validation test".into(),
154 scale: ScaleConfig::small(),
155 max_duration: Duration::from_secs(10),
156 ..Default::default()
157 }
158 }
159
160 pub fn stress_test() -> Self {
162 Self {
163 name: "stress_test".into(),
164 description: "High-load stress test".into(),
165 scale: ScaleConfig::large(),
166 memory_pattern: MemoryPattern::HighChurn,
167 load_pattern: LoadPattern::Spike {
168 baseline: 10_000,
169 peak: 100_000,
170 spike_duration: Duration::from_secs(30),
171 },
172 max_duration: Duration::from_secs(600),
173 ..Default::default()
174 }
175 }
176
177 pub fn memory_leak_test() -> Self {
179 Self {
180 name: "memory_leak_test".into(),
181 description: "Test for memory leaks over time".into(),
182 scale: ScaleConfig::medium(),
183 memory_pattern: MemoryPattern::GrowthOnly,
184 max_duration: Duration::from_secs(300),
185 report_interval: Duration::from_secs(5),
186 ..Default::default()
187 }
188 }
189}
190
/// Lifecycle notification broadcast by [`Simulator`] to every subscriber.
#[derive(Debug, Clone)]
pub enum SimulationEvent {
    /// Sent once at the beginning of `Simulator::run`.
    Started {
        /// Copy of the configuration being run.
        config: SimulationConfig,
        /// Instant at which the run began.
        start_time: Instant,
    },

    /// Periodic snapshot sent during the main phase, at most once per
    /// `report_interval`.
    Progress {
        /// Time since the run started.
        elapsed: Duration,
        /// `elapsed / max_duration` expressed as a percentage, capped at 100.
        completion_percent: f64,
        /// Copy of the metrics at the moment of the report.
        current_metrics: SimulationMetrics,
    },

    /// Sent whenever the simulator switches phase.
    PhaseChanged {
        /// Phase that was active before the switch.
        from: SimulationPhase,
        /// Phase that is active after the switch.
        to: SimulationPhase,
    },

    /// An error report; within this module it is sent by failure injection,
    /// always with `recoverable: true`.
    Error {
        /// Description of the failure.
        message: String,
        /// Whether the simulation continues after this error.
        recoverable: bool,
    },

    /// Final event, carrying a clone of the result that `run` returns.
    Completed {
        /// The finished run's result.
        result: SimulationResult,
    },
}
224
/// Stage of a simulation run.
///
/// `Simulator::run` walks through the variants in declaration order;
/// `WarmUp` and `Running` are skipped if initialization records an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SimulationPhase {
    /// Devices are allocated in batches; also the phase of a freshly created
    /// simulator.
    Initializing,
    /// Warm-up period that simulates a fixed number of operations per tick.
    WarmUp,
    /// Main phase: load, memory pattern and optional failure injection are
    /// applied on each tick.
    Running,
    /// Devices are released in batches.
    RampDown,
    /// The profiler is stopped, and its report and leak check are collected.
    Finalizing,
    /// The run has finished.
    Completed,
}
241
/// Counters that [`Simulator`] updates while it runs.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SimulationMetrics {
    /// Bytes currently tracked by the profiler, copied from its snapshot.
    pub memory_bytes: u64,
    /// Highest `memory_bytes` value recorded.
    pub peak_memory_bytes: u64,
    /// Number of simulated devices currently allocated.
    pub active_devices: u64,
    /// Cumulative count of simulated operations, including warm-up.
    pub total_operations: u64,
    /// Throughput in operations per second.
    pub ops_per_second: f64,
    /// Number of failures injected so far.
    pub error_count: u64,
    /// Synthetic average latency, in microseconds.
    pub avg_latency_us: u64,
    /// Synthetic 99th-percentile latency, in microseconds.
    pub p99_latency_us: u64,
}
262
/// Outcome of a completed simulation run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulationResult {
    /// Name taken from the run's configuration.
    pub name: String,
    /// `true` when no errors were recorded and the injected-failure rate
    /// stayed at or below 1% of operations.
    pub passed: bool,
    /// Wall-clock duration of the entire run.
    pub duration: Duration,
    /// Metrics at the end of the run.
    pub final_metrics: SimulationMetrics,
    /// Profiler report produced during finalization (`run` always sets `Some`).
    pub memory_report: Option<ProfileReport>,
    /// Non-fatal issues, e.g. warm-up problems or potential leaks.
    pub warnings: Vec<String>,
    /// Fatal issues; any entry makes `passed` false.
    pub errors: Vec<String>,
    /// Time spent per phase.
    ///
    /// Keys are `"initialization"`, `"warmup"`, `"execution"`, `"rampdown"`
    /// and `"finalization"`; phases that were skipped have no entry.
    pub phase_durations: std::collections::HashMap<String, Duration>,
}
283
284impl SimulationResult {
285 pub fn has_errors(&self) -> bool {
287 !self.errors.is_empty()
288 }
289
290 pub fn summary(&self) -> String {
292 format!(
293 "Simulation '{}': {} in {:?}\n \
294 Devices: {}, Ops: {}, Errors: {}\n \
295 Memory: {} MB (peak: {} MB)\n \
296 Latency: avg {}us, p99 {}us",
297 self.name,
298 if self.passed { "PASSED" } else { "FAILED" },
299 self.duration,
300 self.final_metrics.active_devices,
301 self.final_metrics.total_operations,
302 self.final_metrics.error_count,
303 self.final_metrics.memory_bytes / 1024 / 1024,
304 self.final_metrics.peak_memory_bytes / 1024 / 1024,
305 self.final_metrics.avg_latency_us,
306 self.final_metrics.p99_latency_us,
307 )
308 }
309}
310
/// Drives a [`SimulationConfig`] through its phases.
///
/// It records metrics, tracks memory through a [`Profiler`], and broadcasts
/// [`SimulationEvent`]s to subscribers.
pub struct Simulator {
    /// Configuration of the run.
    config: SimulationConfig,
    /// Receives the synthetic allocations and deallocations made during the run.
    profiler: Arc<Profiler>,
    /// Live metrics, readable through `metrics()` while a run is in progress.
    metrics: Arc<RwLock<SimulationMetrics>>,
    /// Current phase, readable through `phase()`.
    phase: Arc<RwLock<SimulationPhase>>,
    /// Sender side of the event channel; receivers come from `subscribe()`.
    event_tx: broadcast::Sender<SimulationEvent>,
    /// Set to `true` by `run`; cleared by `stop()` and at the end of `run`.
    running: Arc<std::sync::atomic::AtomicBool>,
}
320
321impl Simulator {
322 pub fn new(config: SimulationConfig) -> Self {
324 let (event_tx, _) = broadcast::channel(256);
325
326 Self {
327 config,
328 profiler: Arc::new(Profiler::with_defaults()),
329 metrics: Arc::new(RwLock::new(SimulationMetrics::default())),
330 phase: Arc::new(RwLock::new(SimulationPhase::Initializing)),
331 event_tx,
332 running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
333 }
334 }
335
336 pub fn subscribe(&self) -> broadcast::Receiver<SimulationEvent> {
338 self.event_tx.subscribe()
339 }
340
341 pub fn metrics(&self) -> SimulationMetrics {
343 self.metrics.read().clone()
344 }
345
346 pub fn phase(&self) -> SimulationPhase {
348 *self.phase.read()
349 }
350
351 pub fn is_running(&self) -> bool {
353 self.running.load(std::sync::atomic::Ordering::SeqCst)
354 }
355
356 pub async fn run(&self) -> Result<SimulationResult> {
358 let start_time = Instant::now();
359
360 self.profiler.start();
362 self.running.store(true, std::sync::atomic::Ordering::SeqCst);
363
364 let _ = self.event_tx.send(SimulationEvent::Started {
366 config: self.config.clone(),
367 start_time,
368 });
369
370 let mut phase_durations = std::collections::HashMap::new();
371 let mut warnings = Vec::new();
372 let mut errors = Vec::new();
373
374 self.set_phase(SimulationPhase::Initializing);
376 let init_start = Instant::now();
377 if let Err(e) = self.run_initialization().await {
378 errors.push(format!("Initialization failed: {}", e));
379 }
380 phase_durations.insert("initialization".into(), init_start.elapsed());
381
382 if errors.is_empty() {
384 self.set_phase(SimulationPhase::WarmUp);
385 let warmup_start = Instant::now();
386 if let Err(e) = self.run_warmup().await {
387 warnings.push(format!("Warm-up warning: {}", e));
388 }
389 phase_durations.insert("warmup".into(), warmup_start.elapsed());
390 }
391
392 if errors.is_empty() {
394 self.set_phase(SimulationPhase::Running);
395 let run_start = Instant::now();
396
397 let run_result = self.run_main_phase(start_time).await;
399 if let Err(e) = run_result {
400 errors.push(format!("Execution error: {}", e));
401 }
402 phase_durations.insert("execution".into(), run_start.elapsed());
403 }
404
405 self.set_phase(SimulationPhase::RampDown);
407 let rampdown_start = Instant::now();
408 self.run_rampdown().await;
409 phase_durations.insert("rampdown".into(), rampdown_start.elapsed());
410
411 self.set_phase(SimulationPhase::Finalizing);
413 let finalize_start = Instant::now();
414
415 self.profiler.stop();
417 let memory_report = Some(self.profiler.generate_report());
418
419 let leak_warnings = self.profiler.check_leaks();
421 for leak in leak_warnings {
422 warnings.push(format!("Potential memory leak: {} - {}", leak.region, leak.message));
423 }
424
425 phase_durations.insert("finalization".into(), finalize_start.elapsed());
426
427 self.set_phase(SimulationPhase::Completed);
429 self.running.store(false, std::sync::atomic::Ordering::SeqCst);
430
431 let final_metrics = self.metrics.read().clone();
432 let passed = errors.is_empty() && self.check_success_criteria(&final_metrics);
433
434 let result = SimulationResult {
435 name: self.config.name.clone(),
436 passed,
437 duration: start_time.elapsed(),
438 final_metrics,
439 memory_report,
440 warnings,
441 errors,
442 phase_durations,
443 };
444
445 let _ = self.event_tx.send(SimulationEvent::Completed {
447 result: result.clone(),
448 });
449
450 Ok(result)
451 }
452
453 fn set_phase(&self, new_phase: SimulationPhase) {
454 let old_phase = *self.phase.read();
455 *self.phase.write() = new_phase;
456
457 let _ = self.event_tx.send(SimulationEvent::PhaseChanged {
458 from: old_phase,
459 to: new_phase,
460 });
461 }
462
463 async fn run_initialization(&self) -> Result<()> {
464 let target_devices = self.config.scale.device_count;
466 let batch_size = self.config.scale.batch_size;
467
468 tracing::info!(
469 "Initializing simulation: {} devices in batches of {}",
470 target_devices,
471 batch_size
472 );
473
474 let mut current = 0u64;
475 while current < target_devices as u64 {
476 let batch = (target_devices as u64 - current).min(batch_size as u64);
477
478 self.profiler.record_allocation(
480 "devices",
481 (batch as usize) * self.config.scale.memory_per_device,
482 );
483
484 current += batch;
485
486 {
488 let mut metrics = self.metrics.write();
489 metrics.active_devices = current;
490 metrics.memory_bytes = self.profiler.snapshot().current_bytes;
491 }
492
493 tokio::task::yield_now().await;
495 }
496
497 Ok(())
498 }
499
500 async fn run_warmup(&self) -> Result<()> {
501 let warmup_duration = Duration::from_secs(5);
503 let start = Instant::now();
504
505 while start.elapsed() < warmup_duration {
506 self.simulate_operations(100).await;
508 tokio::time::sleep(Duration::from_millis(100)).await;
509 }
510
511 Ok(())
512 }
513
514 async fn run_main_phase(&self, start_time: Instant) -> Result<()> {
515 let mut last_report = Instant::now();
516
517 while start_time.elapsed() < self.config.max_duration {
518 let ops = self.config.load_pattern.ops_for_elapsed(start_time.elapsed());
520 self.simulate_operations(ops).await;
521
522 self.apply_memory_pattern(start_time.elapsed()).await;
524
525 if let Some(ref failure_config) = self.config.failure_config {
527 self.inject_failures(failure_config).await;
528 }
529
530 if last_report.elapsed() >= self.config.report_interval {
532 let metrics = self.metrics.read().clone();
533 let completion = start_time.elapsed().as_secs_f64()
534 / self.config.max_duration.as_secs_f64()
535 * 100.0;
536
537 let _ = self.event_tx.send(SimulationEvent::Progress {
538 elapsed: start_time.elapsed(),
539 completion_percent: completion.min(100.0),
540 current_metrics: metrics,
541 });
542
543 last_report = Instant::now();
544 }
545
546 tokio::time::sleep(Duration::from_millis(10)).await;
547 }
548
549 Ok(())
550 }
551
552 async fn run_rampdown(&self) {
553 let current_devices = self.metrics.read().active_devices;
555 let batch_size = self.config.scale.batch_size as u64;
556
557 let mut remaining = current_devices;
558 while remaining > 0 {
559 let batch = remaining.min(batch_size);
560
561 self.profiler.record_deallocation(
562 "devices",
563 (batch as usize) * self.config.scale.memory_per_device,
564 );
565
566 remaining -= batch;
567
568 {
569 let mut metrics = self.metrics.write();
570 metrics.active_devices = remaining;
571 metrics.memory_bytes = self.profiler.snapshot().current_bytes;
572 }
573
574 tokio::task::yield_now().await;
575 }
576 }
577
578 async fn simulate_operations(&self, count: u64) {
579 let mut metrics = self.metrics.write();
580 metrics.total_operations += count;
581
582 let base_latency = 100u64; let jitter = (count % 50) * 2;
585 metrics.avg_latency_us = base_latency + jitter;
586 metrics.p99_latency_us = base_latency * 3 + jitter * 2;
587 }
588
589 async fn apply_memory_pattern(&self, elapsed: Duration) {
590 match &self.config.memory_pattern {
591 MemoryPattern::Steady => {
592 }
594 MemoryPattern::GrowthOnly => {
595 let growth = (elapsed.as_secs() * 1024) as usize;
597 self.profiler.record_allocation("growth", growth);
598 }
599 MemoryPattern::GrowthAndRelease => {
600 let cycle = elapsed.as_secs() % 60;
602 if cycle < 30 {
603 self.profiler.record_allocation("cyclic", 10240);
604 } else {
605 self.profiler.record_deallocation("cyclic", 10240);
606 }
607 }
608 MemoryPattern::HighChurn => {
609 self.profiler.record_allocation("churn", 1024);
611 self.profiler.record_deallocation("churn", 512);
612 }
613 MemoryPattern::Fragmentation => {
614 let sizes = [64, 256, 1024, 4096, 16384];
616 let idx = (elapsed.as_millis() as usize) % sizes.len();
617 self.profiler.record_allocation("frag", sizes[idx]);
618 if elapsed.as_millis() % 2 == 0 {
619 self.profiler.record_deallocation("frag", sizes[(idx + 2) % sizes.len()]);
620 }
621 }
622 MemoryPattern::Custom(pattern) => {
623 pattern.apply(&self.profiler, elapsed);
624 }
625 }
626
627 let snapshot = self.profiler.snapshot();
629 let mut metrics = self.metrics.write();
630 metrics.memory_bytes = snapshot.current_bytes;
631 if snapshot.current_bytes > metrics.peak_memory_bytes {
632 metrics.peak_memory_bytes = snapshot.current_bytes;
633 }
634 }
635
636 async fn inject_failures(&self, config: &FailureConfig) {
637 if config.should_inject() {
638 let mut metrics = self.metrics.write();
639 metrics.error_count += 1;
640
641 let _ = self.event_tx.send(SimulationEvent::Error {
642 message: config.next_failure_type().to_string(),
643 recoverable: true,
644 });
645 }
646 }
647
648 fn check_success_criteria(&self, metrics: &SimulationMetrics) -> bool {
649 let max_error_rate = 0.01; let error_rate = if metrics.total_operations > 0 {
652 metrics.error_count as f64 / metrics.total_operations as f64
653 } else {
654 0.0
655 };
656
657 error_rate <= max_error_rate
658 }
659
660 pub fn stop(&self) {
662 self.running.store(false, std::sync::atomic::Ordering::SeqCst);
663 }
664}
665
666impl std::fmt::Debug for Simulator {
667 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
668 f.debug_struct("Simulator")
669 .field("config", &self.config)
670 .field("phase", &self.phase())
671 .field("running", &self.is_running())
672 .finish()
673 }
674}
675
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_simulation_config_default() {
        let config = SimulationConfig::default();
        assert_eq!(config.name, "default_simulation");
        assert_eq!(config.max_duration, Duration::from_secs(300));
        assert_eq!(config.report_interval, Duration::from_secs(10));
        assert!(config.detailed_metrics);
        assert!(config.failure_config.is_none());
        assert!(config.seed.is_none());
    }

    #[test]
    fn test_simulation_config_builders() {
        let config = SimulationConfig::new("test")
            .with_scale(ScaleConfig::devices(1000))
            .with_memory_pattern(MemoryPattern::HighChurn)
            .with_max_duration(Duration::from_secs(60))
            .with_seed(7);

        assert_eq!(config.name, "test");
        assert_eq!(config.scale.device_count, 1000);
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
        assert_eq!(config.max_duration, Duration::from_secs(60));
        assert_eq!(config.seed, Some(7));
    }

    #[test]
    fn test_quick_test_config() {
        let config = SimulationConfig::quick_test();
        assert_eq!(config.name, "quick_test");
        assert_eq!(config.max_duration, Duration::from_secs(10));
    }

    #[test]
    fn test_stress_test_config() {
        let config = SimulationConfig::stress_test();
        assert_eq!(config.name, "stress_test");
        // `matches!` only yields a bool; without `assert!` it checks nothing.
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
        assert!(matches!(
            config.load_pattern,
            LoadPattern::Spike {
                baseline: 10_000,
                peak: 100_000,
                ..
            }
        ));
        assert_eq!(config.max_duration, Duration::from_secs(600));
    }

    #[test]
    fn test_memory_leak_test_config() {
        let config = SimulationConfig::memory_leak_test();
        assert_eq!(config.name, "memory_leak_test");
        assert!(matches!(config.memory_pattern, MemoryPattern::GrowthOnly));
        assert_eq!(config.report_interval, Duration::from_secs(5));
    }

    #[test]
    fn test_simulator_creation() {
        let config = SimulationConfig::quick_test();
        let simulator = Simulator::new(config);
        assert!(!simulator.is_running());
        assert_eq!(simulator.phase(), SimulationPhase::Initializing);
    }

    #[test]
    fn test_simulation_result_summary() {
        let result = SimulationResult {
            name: "test".into(),
            passed: true,
            duration: Duration::from_secs(10),
            final_metrics: SimulationMetrics {
                memory_bytes: 1024 * 1024,
                peak_memory_bytes: 2 * 1024 * 1024,
                active_devices: 100,
                total_operations: 10000,
                ops_per_second: 1000.0,
                error_count: 0,
                avg_latency_us: 100,
                p99_latency_us: 500,
            },
            memory_report: None,
            warnings: vec![],
            errors: vec![],
            phase_durations: std::collections::HashMap::new(),
        };

        assert!(!result.has_errors());
        let summary = result.summary();
        assert!(summary.contains("PASSED"));
        assert!(summary.contains("10000"));
    }

    #[tokio::test]
    async fn test_simulator_quick_run() {
        let config = SimulationConfig::new("quick")
            .with_scale(ScaleConfig::tiny())
            .with_max_duration(Duration::from_millis(100));

        let simulator = Simulator::new(config);
        let result = simulator.run().await.unwrap();

        assert!(result.passed);
        assert!(result.errors.is_empty());
        assert_eq!(result.final_metrics.active_devices, 0);
        assert_eq!(simulator.phase(), SimulationPhase::Completed);
        assert!(!simulator.is_running());
        for phase in ["initialization", "warmup", "execution", "rampdown", "finalization"] {
            assert!(
                result.phase_durations.contains_key(phase),
                "missing duration for {}",
                phase
            );
        }
    }
}