Skip to main content

mabi_core/simulation/
mod.rs

1//! Simulation framework for comprehensive testing scenarios.
2//!
3//! This module provides a flexible framework for simulating various conditions
4//! that the TRAP simulator might encounter in production environments.
5//!
6//! # Features
7//!
8//! - **Scale Simulation**: Test with 50,000+ virtual devices
9//! - **Memory Patterns**: Simulate various memory allocation patterns
10//! - **Failure Injection**: Inject faults and errors for resilience testing
11//! - **Load Patterns**: Various load profiles (steady, burst, ramp, spike)
12//! - **Resource Constraints**: Simulate limited memory/CPU environments
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use mabi_core::simulation::{
18//!     Simulator, SimulationConfig, ScaleConfig,
19//!     MemoryPattern, LoadPattern,
20//! };
21//!
22//! let config = SimulationConfig::default()
23//!     .with_scale(ScaleConfig::devices(50_000))
24//!     .with_memory_pattern(MemoryPattern::GrowthAndRelease)
25//!     .with_load_pattern(LoadPattern::Burst { peak: 100_000, duration_secs: 60 });
26//!
27//! let simulator = Simulator::new(config);
28//! let results = simulator.run().await?;
29//! ```
30
31pub mod scale;
32pub mod memory_sim;
33pub mod failure;
34pub mod load;
35pub mod scenarios;
36
37pub use scale::*;
38pub use memory_sim::*;
39pub use failure::*;
40pub use load::*;
41pub use scenarios::*;
42
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47use serde::{Deserialize, Serialize};
48use tokio::sync::broadcast;
49
50use crate::error::Result;
51use crate::profiling::{Profiler, ProfileReport};
52
/// Main simulation configuration.
///
/// Build one from `SimulationConfig::default()` / `SimulationConfig::new`
/// plus the `with_*` builder methods, or start from one of the presets
/// (`quick_test`, `stress_test`, `memory_leak_test`). The type derives
/// serde's `Serialize`/`Deserialize`, so configurations can be
/// (de)serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulationConfig {
    /// Name of the simulation; copied into `SimulationResult::name`.
    pub name: String,

    /// Description of what this simulation tests.
    pub description: String,

    /// Scale configuration (number of devices, points, etc.).
    ///
    /// `Simulator` reads `device_count`, `batch_size` and `memory_per_device`
    /// from it during initialization and ramp-down.
    pub scale: ScaleConfig,

    /// Memory simulation pattern, applied on every main-phase tick.
    pub memory_pattern: MemoryPattern,

    /// Load pattern; decides how many operations run on each main-phase tick.
    pub load_pattern: LoadPattern,

    /// Failure injection configuration; `None` disables failure injection.
    pub failure_config: Option<FailureConfig>,

    /// Maximum duration for the simulation.
    ///
    /// Measured from the start of `Simulator::run`, so time spent in
    /// initialization and warm-up counts against this budget.
    pub max_duration: Duration,

    /// Interval between progress reports (`SimulationEvent::Progress`).
    pub report_interval: Duration,

    /// Whether to collect detailed metrics.
    ///
    /// NOTE(review): not consulted by `Simulator` in this module; confirm
    /// whether another module reads it.
    pub detailed_metrics: bool,

    /// Random seed for reproducibility.
    ///
    /// NOTE(review): not consulted by `Simulator` in this module; confirm
    /// whether another module reads it.
    pub seed: Option<u64>,
}
86
87impl Default for SimulationConfig {
88    fn default() -> Self {
89        Self {
90            name: "default_simulation".into(),
91            description: "Default simulation configuration".into(),
92            scale: ScaleConfig::default(),
93            memory_pattern: MemoryPattern::Steady,
94            load_pattern: LoadPattern::Steady { ops_per_sec: 1000 },
95            failure_config: None,
96            max_duration: Duration::from_secs(300),
97            report_interval: Duration::from_secs(10),
98            detailed_metrics: true,
99            seed: None,
100        }
101    }
102}
103
104impl SimulationConfig {
105    /// Create a new simulation config with a name.
106    pub fn new(name: impl Into<String>) -> Self {
107        Self {
108            name: name.into(),
109            ..Default::default()
110        }
111    }
112
113    /// Set the scale configuration.
114    pub fn with_scale(mut self, scale: ScaleConfig) -> Self {
115        self.scale = scale;
116        self
117    }
118
119    /// Set the memory pattern.
120    pub fn with_memory_pattern(mut self, pattern: MemoryPattern) -> Self {
121        self.memory_pattern = pattern;
122        self
123    }
124
125    /// Set the load pattern.
126    pub fn with_load_pattern(mut self, pattern: LoadPattern) -> Self {
127        self.load_pattern = pattern;
128        self
129    }
130
131    /// Set the failure configuration.
132    pub fn with_failures(mut self, config: FailureConfig) -> Self {
133        self.failure_config = Some(config);
134        self
135    }
136
137    /// Set the maximum duration.
138    pub fn with_max_duration(mut self, duration: Duration) -> Self {
139        self.max_duration = duration;
140        self
141    }
142
143    /// Set the random seed.
144    pub fn with_seed(mut self, seed: u64) -> Self {
145        self.seed = Some(seed);
146        self
147    }
148
149    /// Create a quick test configuration.
150    pub fn quick_test() -> Self {
151        Self {
152            name: "quick_test".into(),
153            description: "Quick validation test".into(),
154            scale: ScaleConfig::small(),
155            max_duration: Duration::from_secs(10),
156            ..Default::default()
157        }
158    }
159
160    /// Create a stress test configuration.
161    pub fn stress_test() -> Self {
162        Self {
163            name: "stress_test".into(),
164            description: "High-load stress test".into(),
165            scale: ScaleConfig::large(),
166            memory_pattern: MemoryPattern::HighChurn,
167            load_pattern: LoadPattern::Spike {
168                baseline: 10_000,
169                peak: 100_000,
170                spike_duration: Duration::from_secs(30),
171            },
172            max_duration: Duration::from_secs(600),
173            ..Default::default()
174        }
175    }
176
177    /// Create a memory leak detection configuration.
178    pub fn memory_leak_test() -> Self {
179        Self {
180            name: "memory_leak_test".into(),
181            description: "Test for memory leaks over time".into(),
182            scale: ScaleConfig::medium(),
183            memory_pattern: MemoryPattern::GrowthOnly,
184            max_duration: Duration::from_secs(300),
185            report_interval: Duration::from_secs(5),
186            ..Default::default()
187        }
188    }
189}
190
/// Simulation events for progress tracking.
///
/// `Simulator::run` broadcasts these to every receiver obtained from
/// `Simulator::subscribe`. Sends are best-effort: the simulator ignores the
/// result of each send (for example, when nobody is subscribed).
#[derive(Debug, Clone)]
pub enum SimulationEvent {
    /// Simulation started; emitted once at the beginning of `run`.
    Started {
        /// Copy of the configuration being run.
        config: SimulationConfig,
        /// Instant at which `run` was entered; `max_duration` is measured from here.
        start_time: Instant,
    },

    /// Progress update, emitted about every `report_interval` during the main phase.
    Progress {
        /// Time since the simulation started.
        elapsed: Duration,
        /// `elapsed / max_duration * 100`, capped at 100.
        completion_percent: f64,
        /// Copy of the live metrics at the time of the report.
        current_metrics: SimulationMetrics,
    },

    /// Phase changed.
    PhaseChanged {
        /// Phase before the transition.
        from: SimulationPhase,
        /// Phase after the transition.
        to: SimulationPhase,
    },

    /// Error occurred.
    ///
    /// Injected failures from this module's `Simulator` are always sent with
    /// `recoverable: true`.
    Error {
        /// Human-readable description of the error.
        message: String,
        /// Whether the simulation continues after this error.
        recoverable: bool,
    },

    /// Simulation completed.
    Completed {
        /// Final result; a clone of the value returned by `run`.
        result: SimulationResult,
    },
}
224
/// Current phase of the simulation.
///
/// `Simulator::run` moves through the variants in declaration order. It
/// skips `WarmUp` and `Running` when initialization fails. A freshly
/// constructed `Simulator` reports `Initializing` before `run` is called.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SimulationPhase {
    /// Initializing resources (creating simulated devices).
    Initializing,
    /// Warming up (pre-test stabilization).
    WarmUp,
    /// Main test execution.
    Running,
    /// Ramping down (releasing simulated devices).
    RampDown,
    /// Collecting final metrics and the profiling report.
    Finalizing,
    /// Completed.
    Completed,
}
241
/// Real-time simulation metrics.
///
/// `Simulator` keeps a live copy behind a lock. `Simulator::metrics`,
/// `SimulationEvent::Progress` and `SimulationResult::final_metrics` all
/// hand out clones of it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SimulationMetrics {
    /// Current memory usage in bytes, as reported by the profiler snapshot.
    pub memory_bytes: u64,
    /// Peak memory usage in bytes observed so far.
    pub peak_memory_bytes: u64,
    /// Current number of active (simulated) devices.
    pub active_devices: u64,
    /// Total operations performed, including warm-up operations.
    pub total_operations: u64,
    /// Operations per second (current rate).
    pub ops_per_second: f64,
    /// Error count; `Simulator` increments it once per injected failure.
    pub error_count: u64,
    /// Average latency in microseconds.
    ///
    /// Synthetic when produced by `Simulator`: it is derived from the size
    /// of the most recent operation batch rather than measured.
    pub avg_latency_us: u64,
    /// P99 latency in microseconds (synthetic in the same way as `avg_latency_us`).
    pub p99_latency_us: u64,
}
262
/// Final simulation result.
///
/// Returned by `Simulator::run` and also carried by `SimulationEvent::Completed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulationResult {
    /// Simulation name (taken from `SimulationConfig::name`).
    pub name: String,
    /// Whether the simulation passed all criteria.
    ///
    /// `Simulator::run` sets this to `true` only when `errors` is empty.
    /// The ratio of `final_metrics.error_count` to
    /// `final_metrics.total_operations` must also be at most 1%.
    pub passed: bool,
    /// Elapsed time of the whole simulation, from the start of `run`.
    pub duration: Duration,
    /// Metrics as they stood when the run finished (after ramp-down).
    pub final_metrics: SimulationMetrics,
    /// Memory profiling report; `Simulator::run` always fills this in.
    pub memory_report: Option<ProfileReport>,
    /// Non-fatal issues, e.g. warm-up problems or potential memory leaks.
    pub warnings: Vec<String>,
    /// Errors that mark the run as failed (initialization or main-phase failures).
    pub errors: Vec<String>,
    /// Per-phase timing, keyed by phase name.
    ///
    /// `Simulator::run` writes these keys: `"initialization"`, `"warmup"`,
    /// `"execution"`, `"rampdown"` and `"finalization"`. The `"warmup"` and
    /// `"execution"` keys are absent when initialization failed.
    pub phase_durations: std::collections::HashMap<String, Duration>,
}
283
284impl SimulationResult {
285    /// Check if there were any errors.
286    pub fn has_errors(&self) -> bool {
287        !self.errors.is_empty()
288    }
289
290    /// Get a summary string.
291    pub fn summary(&self) -> String {
292        format!(
293            "Simulation '{}': {} in {:?}\n  \
294             Devices: {}, Ops: {}, Errors: {}\n  \
295             Memory: {} MB (peak: {} MB)\n  \
296             Latency: avg {}us, p99 {}us",
297            self.name,
298            if self.passed { "PASSED" } else { "FAILED" },
299            self.duration,
300            self.final_metrics.active_devices,
301            self.final_metrics.total_operations,
302            self.final_metrics.error_count,
303            self.final_metrics.memory_bytes / 1024 / 1024,
304            self.final_metrics.peak_memory_bytes / 1024 / 1024,
305            self.final_metrics.avg_latency_us,
306            self.final_metrics.p99_latency_us,
307        )
308    }
309}
310
/// Main simulator engine.
///
/// Drives a `SimulationConfig` through the phases of `SimulationPhase`.
/// Along the way it records simulated allocations in a `Profiler` and
/// publishes `SimulationEvent`s on a broadcast channel. Mutable state lives
/// behind `RwLock`s and an atomic flag, so the `&self` getters (`metrics`,
/// `phase`, `is_running`) can observe a run in progress.
pub struct Simulator {
    /// Configuration this simulator executes.
    config: SimulationConfig,
    /// Records simulated allocations/deallocations and produces the final report.
    profiler: Arc<Profiler>,
    /// Live metrics, updated throughout `run`.
    metrics: Arc<RwLock<SimulationMetrics>>,
    /// Current phase; changed via `set_phase`, which also emits `PhaseChanged`.
    phase: Arc<RwLock<SimulationPhase>>,
    /// Sender side of the event channel; receivers come from `subscribe`.
    event_tx: broadcast::Sender<SimulationEvent>,
    /// Set when `run` starts; cleared by `stop` or when `run` finishes.
    running: Arc<std::sync::atomic::AtomicBool>,
}
320
321impl Simulator {
322    /// Create a new simulator with the given configuration.
323    pub fn new(config: SimulationConfig) -> Self {
324        let (event_tx, _) = broadcast::channel(256);
325
326        Self {
327            config,
328            profiler: Arc::new(Profiler::with_defaults()),
329            metrics: Arc::new(RwLock::new(SimulationMetrics::default())),
330            phase: Arc::new(RwLock::new(SimulationPhase::Initializing)),
331            event_tx,
332            running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
333        }
334    }
335
336    /// Subscribe to simulation events.
337    pub fn subscribe(&self) -> broadcast::Receiver<SimulationEvent> {
338        self.event_tx.subscribe()
339    }
340
341    /// Get current metrics.
342    pub fn metrics(&self) -> SimulationMetrics {
343        self.metrics.read().clone()
344    }
345
346    /// Get current phase.
347    pub fn phase(&self) -> SimulationPhase {
348        *self.phase.read()
349    }
350
351    /// Check if simulation is running.
352    pub fn is_running(&self) -> bool {
353        self.running.load(std::sync::atomic::Ordering::SeqCst)
354    }
355
356    /// Run the simulation.
357    pub async fn run(&self) -> Result<SimulationResult> {
358        let start_time = Instant::now();
359
360        // Start profiling
361        self.profiler.start();
362        self.running.store(true, std::sync::atomic::Ordering::SeqCst);
363
364        // Emit start event
365        let _ = self.event_tx.send(SimulationEvent::Started {
366            config: self.config.clone(),
367            start_time,
368        });
369
370        let mut phase_durations = std::collections::HashMap::new();
371        let mut warnings = Vec::new();
372        let mut errors = Vec::new();
373
374        // Phase 1: Initialize
375        self.set_phase(SimulationPhase::Initializing);
376        let init_start = Instant::now();
377        if let Err(e) = self.run_initialization().await {
378            errors.push(format!("Initialization failed: {}", e));
379        }
380        phase_durations.insert("initialization".into(), init_start.elapsed());
381
382        // Phase 2: Warm-up
383        if errors.is_empty() {
384            self.set_phase(SimulationPhase::WarmUp);
385            let warmup_start = Instant::now();
386            if let Err(e) = self.run_warmup().await {
387                warnings.push(format!("Warm-up warning: {}", e));
388            }
389            phase_durations.insert("warmup".into(), warmup_start.elapsed());
390        }
391
392        // Phase 3: Main execution
393        if errors.is_empty() {
394            self.set_phase(SimulationPhase::Running);
395            let run_start = Instant::now();
396
397            // Run until max duration or completion
398            let run_result = self.run_main_phase(start_time).await;
399            if let Err(e) = run_result {
400                errors.push(format!("Execution error: {}", e));
401            }
402            phase_durations.insert("execution".into(), run_start.elapsed());
403        }
404
405        // Phase 4: Ramp down
406        self.set_phase(SimulationPhase::RampDown);
407        let rampdown_start = Instant::now();
408        self.run_rampdown().await;
409        phase_durations.insert("rampdown".into(), rampdown_start.elapsed());
410
411        // Phase 5: Finalize
412        self.set_phase(SimulationPhase::Finalizing);
413        let finalize_start = Instant::now();
414
415        // Stop profiling and get report
416        self.profiler.stop();
417        let memory_report = Some(self.profiler.generate_report());
418
419        // Check for memory leaks
420        let leak_warnings = self.profiler.check_leaks();
421        for leak in leak_warnings {
422            warnings.push(format!("Potential memory leak: {} - {}", leak.region, leak.message));
423        }
424
425        phase_durations.insert("finalization".into(), finalize_start.elapsed());
426
427        // Build result
428        self.set_phase(SimulationPhase::Completed);
429        self.running.store(false, std::sync::atomic::Ordering::SeqCst);
430
431        let final_metrics = self.metrics.read().clone();
432        let passed = errors.is_empty() && self.check_success_criteria(&final_metrics);
433
434        let result = SimulationResult {
435            name: self.config.name.clone(),
436            passed,
437            duration: start_time.elapsed(),
438            final_metrics,
439            memory_report,
440            warnings,
441            errors,
442            phase_durations,
443        };
444
445        // Emit completion event
446        let _ = self.event_tx.send(SimulationEvent::Completed {
447            result: result.clone(),
448        });
449
450        Ok(result)
451    }
452
453    fn set_phase(&self, new_phase: SimulationPhase) {
454        let old_phase = *self.phase.read();
455        *self.phase.write() = new_phase;
456
457        let _ = self.event_tx.send(SimulationEvent::PhaseChanged {
458            from: old_phase,
459            to: new_phase,
460        });
461    }
462
463    async fn run_initialization(&self) -> Result<()> {
464        // Initialize based on scale config
465        let target_devices = self.config.scale.device_count;
466        let batch_size = self.config.scale.batch_size;
467
468        tracing::info!(
469            "Initializing simulation: {} devices in batches of {}",
470            target_devices,
471            batch_size
472        );
473
474        let mut current = 0u64;
475        while current < target_devices as u64 {
476            let batch = (target_devices as u64 - current).min(batch_size as u64);
477
478            // Simulate device creation
479            self.profiler.record_allocation(
480                "devices",
481                (batch as usize) * self.config.scale.memory_per_device,
482            );
483
484            current += batch;
485
486            // Update metrics
487            {
488                let mut metrics = self.metrics.write();
489                metrics.active_devices = current;
490                metrics.memory_bytes = self.profiler.snapshot().current_bytes;
491            }
492
493            // Yield to allow other tasks
494            tokio::task::yield_now().await;
495        }
496
497        Ok(())
498    }
499
500    async fn run_warmup(&self) -> Result<()> {
501        // Brief warm-up period
502        let warmup_duration = Duration::from_secs(5);
503        let start = Instant::now();
504
505        while start.elapsed() < warmup_duration {
506            // Simulate some operations
507            self.simulate_operations(100).await;
508            tokio::time::sleep(Duration::from_millis(100)).await;
509        }
510
511        Ok(())
512    }
513
514    async fn run_main_phase(&self, start_time: Instant) -> Result<()> {
515        let mut last_report = Instant::now();
516
517        while start_time.elapsed() < self.config.max_duration {
518            // Apply load pattern
519            let ops = self.config.load_pattern.ops_for_elapsed(start_time.elapsed());
520            self.simulate_operations(ops).await;
521
522            // Apply memory pattern
523            self.apply_memory_pattern(start_time.elapsed()).await;
524
525            // Inject failures if configured
526            if let Some(ref failure_config) = self.config.failure_config {
527                self.inject_failures(failure_config).await;
528            }
529
530            // Report progress
531            if last_report.elapsed() >= self.config.report_interval {
532                let metrics = self.metrics.read().clone();
533                let completion = start_time.elapsed().as_secs_f64()
534                    / self.config.max_duration.as_secs_f64()
535                    * 100.0;
536
537                let _ = self.event_tx.send(SimulationEvent::Progress {
538                    elapsed: start_time.elapsed(),
539                    completion_percent: completion.min(100.0),
540                    current_metrics: metrics,
541                });
542
543                last_report = Instant::now();
544            }
545
546            tokio::time::sleep(Duration::from_millis(10)).await;
547        }
548
549        Ok(())
550    }
551
552    async fn run_rampdown(&self) {
553        // Gracefully release resources
554        let current_devices = self.metrics.read().active_devices;
555        let batch_size = self.config.scale.batch_size as u64;
556
557        let mut remaining = current_devices;
558        while remaining > 0 {
559            let batch = remaining.min(batch_size);
560
561            self.profiler.record_deallocation(
562                "devices",
563                (batch as usize) * self.config.scale.memory_per_device,
564            );
565
566            remaining -= batch;
567
568            {
569                let mut metrics = self.metrics.write();
570                metrics.active_devices = remaining;
571                metrics.memory_bytes = self.profiler.snapshot().current_bytes;
572            }
573
574            tokio::task::yield_now().await;
575        }
576    }
577
578    async fn simulate_operations(&self, count: u64) {
579        let mut metrics = self.metrics.write();
580        metrics.total_operations += count;
581
582        // Simulate latency
583        let base_latency = 100u64; // 100us base
584        let jitter = (count % 50) * 2;
585        metrics.avg_latency_us = base_latency + jitter;
586        metrics.p99_latency_us = base_latency * 3 + jitter * 2;
587    }
588
589    async fn apply_memory_pattern(&self, elapsed: Duration) {
590        match &self.config.memory_pattern {
591            MemoryPattern::Steady => {
592                // No additional allocations
593            }
594            MemoryPattern::GrowthOnly => {
595                // Continuous growth
596                let growth = (elapsed.as_secs() * 1024) as usize;
597                self.profiler.record_allocation("growth", growth);
598            }
599            MemoryPattern::GrowthAndRelease => {
600                // Periodic allocation and deallocation
601                let cycle = elapsed.as_secs() % 60;
602                if cycle < 30 {
603                    self.profiler.record_allocation("cyclic", 10240);
604                } else {
605                    self.profiler.record_deallocation("cyclic", 10240);
606                }
607            }
608            MemoryPattern::HighChurn => {
609                // Frequent small allocations and deallocations
610                self.profiler.record_allocation("churn", 1024);
611                self.profiler.record_deallocation("churn", 512);
612            }
613            MemoryPattern::Fragmentation => {
614                // Create fragmentation by varying sizes
615                let sizes = [64, 256, 1024, 4096, 16384];
616                let idx = (elapsed.as_millis() as usize) % sizes.len();
617                self.profiler.record_allocation("frag", sizes[idx]);
618                if elapsed.as_millis() % 2 == 0 {
619                    self.profiler.record_deallocation("frag", sizes[(idx + 2) % sizes.len()]);
620                }
621            }
622            MemoryPattern::Custom(pattern) => {
623                pattern.apply(&self.profiler, elapsed);
624            }
625        }
626
627        // Update memory metrics
628        let snapshot = self.profiler.snapshot();
629        let mut metrics = self.metrics.write();
630        metrics.memory_bytes = snapshot.current_bytes;
631        if snapshot.current_bytes > metrics.peak_memory_bytes {
632            metrics.peak_memory_bytes = snapshot.current_bytes;
633        }
634    }
635
636    async fn inject_failures(&self, config: &FailureConfig) {
637        if config.should_inject() {
638            let mut metrics = self.metrics.write();
639            metrics.error_count += 1;
640
641            let _ = self.event_tx.send(SimulationEvent::Error {
642                message: config.next_failure_type().to_string(),
643                recoverable: true,
644            });
645        }
646    }
647
648    fn check_success_criteria(&self, metrics: &SimulationMetrics) -> bool {
649        // Default success criteria
650        let max_error_rate = 0.01; // 1% error rate
651        let error_rate = if metrics.total_operations > 0 {
652            metrics.error_count as f64 / metrics.total_operations as f64
653        } else {
654            0.0
655        };
656
657        error_rate <= max_error_rate
658    }
659
660    /// Stop the simulation early.
661    pub fn stop(&self) {
662        self.running.store(false, std::sync::atomic::Ordering::SeqCst);
663    }
664}
665
666impl std::fmt::Debug for Simulator {
667    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
668        f.debug_struct("Simulator")
669            .field("config", &self.config)
670            .field("phase", &self.phase())
671            .field("running", &self.is_running())
672            .finish()
673    }
674}
675
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `SimulationResult` for summary/`has_errors` tests.
    fn sample_result(passed: bool, errors: Vec<String>) -> SimulationResult {
        SimulationResult {
            name: "test".into(),
            passed,
            duration: Duration::from_secs(10),
            final_metrics: SimulationMetrics {
                memory_bytes: 1024 * 1024,
                peak_memory_bytes: 2 * 1024 * 1024,
                active_devices: 100,
                total_operations: 10000,
                ops_per_second: 1000.0,
                error_count: 0,
                avg_latency_us: 100,
                p99_latency_us: 500,
            },
            memory_report: None,
            warnings: vec![],
            errors,
            phase_durations: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn test_simulation_config_default() {
        let config = SimulationConfig::default();
        assert_eq!(config.name, "default_simulation");
        assert_eq!(config.max_duration, Duration::from_secs(300));
        assert_eq!(config.report_interval, Duration::from_secs(10));
        assert!(config.failure_config.is_none());
        assert!(config.seed.is_none());
    }

    #[test]
    fn test_simulation_config_builders() {
        let config = SimulationConfig::new("test")
            .with_scale(ScaleConfig::devices(1000))
            .with_memory_pattern(MemoryPattern::HighChurn)
            .with_max_duration(Duration::from_secs(60))
            .with_seed(42);

        assert_eq!(config.name, "test");
        assert_eq!(config.scale.device_count, 1000);
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
        assert_eq!(config.max_duration, Duration::from_secs(60));
        assert_eq!(config.seed, Some(42));
    }

    #[test]
    fn test_quick_test_config() {
        let config = SimulationConfig::quick_test();
        assert_eq!(config.name, "quick_test");
        assert_eq!(config.max_duration, Duration::from_secs(10));
    }

    #[test]
    fn test_stress_test_config() {
        let config = SimulationConfig::stress_test();
        assert_eq!(config.name, "stress_test");
        // `matches!` only evaluates to a bool; it must be wrapped in `assert!`.
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
        assert!(matches!(config.load_pattern, LoadPattern::Spike { .. }));
        assert_eq!(config.max_duration, Duration::from_secs(600));
    }

    #[test]
    fn test_memory_leak_test_config() {
        let config = SimulationConfig::memory_leak_test();
        assert_eq!(config.name, "memory_leak_test");
        assert!(matches!(config.memory_pattern, MemoryPattern::GrowthOnly));
        assert_eq!(config.report_interval, Duration::from_secs(5));
    }

    #[test]
    fn test_simulator_creation() {
        let config = SimulationConfig::quick_test();
        let simulator = Simulator::new(config);
        assert!(!simulator.is_running());
        assert_eq!(simulator.phase(), SimulationPhase::Initializing);
    }

    #[test]
    fn test_simulation_result_summary() {
        let result = sample_result(true, vec![]);

        let summary = result.summary();
        assert!(summary.contains("'test'"));
        assert!(summary.contains("PASSED"));
        assert!(summary.contains("10000"));
        assert!(!result.has_errors());
    }

    #[test]
    fn test_simulation_result_failed_summary() {
        let result = sample_result(false, vec!["boom".into()]);

        assert!(result.summary().contains("FAILED"));
        assert!(result.has_errors());
    }

    #[tokio::test]
    async fn test_simulator_quick_run() {
        let config = SimulationConfig::new("quick")
            .with_scale(ScaleConfig::tiny())
            .with_max_duration(Duration::from_millis(100));

        let simulator = Simulator::new(config);
        let result = simulator.run().await.unwrap();

        assert!(result.passed);
        assert!(!result.has_errors());
        // All devices should be released during ramp-down.
        assert_eq!(result.final_metrics.active_devices, 0);
        assert!(result.memory_report.is_some());
        assert_eq!(simulator.phase(), SimulationPhase::Completed);
        assert!(!simulator.is_running());
    }
}