Skip to main content

mabi_core/simulation/
mod.rs

1//! Simulation framework for comprehensive testing scenarios.
2//!
3//! This module provides a flexible framework for simulating various conditions
4//! that the TRAP simulator might encounter in production environments.
5//!
6//! # Features
7//!
8//! - **Scale Simulation**: Test with 50,000+ virtual devices
9//! - **Memory Patterns**: Simulate various memory allocation patterns
10//! - **Failure Injection**: Inject faults and errors for resilience testing
11//! - **Load Patterns**: Various load profiles (steady, burst, ramp, spike)
12//! - **Resource Constraints**: Simulate limited memory/CPU environments
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use mabi_core::simulation::{
18//!     Simulator, SimulationConfig, ScaleConfig,
19//!     MemoryPattern, LoadPattern,
20//! };
21//!
22//! let config = SimulationConfig::default()
23//!     .with_scale(ScaleConfig::devices(50_000))
24//!     .with_memory_pattern(MemoryPattern::GrowthAndRelease)
25//!     .with_load_pattern(LoadPattern::Burst { peak: 100_000, duration_secs: 60 });
26//!
27//! let simulator = Simulator::new(config);
28//! let results = simulator.run().await?;
29//! ```
30
31pub mod failure;
32pub mod load;
33pub mod memory_sim;
34pub mod scale;
35pub mod scenarios;
36
37pub use failure::*;
38pub use load::*;
39pub use memory_sim::*;
40pub use scale::*;
41pub use scenarios::*;
42
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47use serde::{Deserialize, Serialize};
48use tokio::sync::broadcast;
49
50use crate::error::Result;
51use crate::profiling::{ProfileReport, Profiler};
52
53/// Main simulation configuration.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct SimulationConfig {
56    /// Name of the simulation.
57    pub name: String,
58
59    /// Description of what this simulation tests.
60    pub description: String,
61
62    /// Scale configuration (number of devices, points, etc.).
63    pub scale: ScaleConfig,
64
65    /// Memory simulation pattern.
66    pub memory_pattern: MemoryPattern,
67
68    /// Load pattern to apply.
69    pub load_pattern: LoadPattern,
70
71    /// Failure injection configuration.
72    pub failure_config: Option<FailureConfig>,
73
74    /// Maximum duration for the simulation.
75    pub max_duration: Duration,
76
77    /// Interval between progress reports.
78    pub report_interval: Duration,
79
80    /// Whether to collect detailed metrics.
81    pub detailed_metrics: bool,
82
83    /// Random seed for reproducibility.
84    pub seed: Option<u64>,
85}
86
87impl Default for SimulationConfig {
88    fn default() -> Self {
89        Self {
90            name: "default_simulation".into(),
91            description: "Default simulation configuration".into(),
92            scale: ScaleConfig::default(),
93            memory_pattern: MemoryPattern::Steady,
94            load_pattern: LoadPattern::Steady { ops_per_sec: 1000 },
95            failure_config: None,
96            max_duration: Duration::from_secs(300),
97            report_interval: Duration::from_secs(10),
98            detailed_metrics: true,
99            seed: None,
100        }
101    }
102}
103
104impl SimulationConfig {
105    /// Create a new simulation config with a name.
106    pub fn new(name: impl Into<String>) -> Self {
107        Self {
108            name: name.into(),
109            ..Default::default()
110        }
111    }
112
113    /// Set the scale configuration.
114    pub fn with_scale(mut self, scale: ScaleConfig) -> Self {
115        self.scale = scale;
116        self
117    }
118
119    /// Set the memory pattern.
120    pub fn with_memory_pattern(mut self, pattern: MemoryPattern) -> Self {
121        self.memory_pattern = pattern;
122        self
123    }
124
125    /// Set the load pattern.
126    pub fn with_load_pattern(mut self, pattern: LoadPattern) -> Self {
127        self.load_pattern = pattern;
128        self
129    }
130
131    /// Set the failure configuration.
132    pub fn with_failures(mut self, config: FailureConfig) -> Self {
133        self.failure_config = Some(config);
134        self
135    }
136
137    /// Set the maximum duration.
138    pub fn with_max_duration(mut self, duration: Duration) -> Self {
139        self.max_duration = duration;
140        self
141    }
142
143    /// Set the random seed.
144    pub fn with_seed(mut self, seed: u64) -> Self {
145        self.seed = Some(seed);
146        self
147    }
148
149    /// Create a quick test configuration.
150    pub fn quick_test() -> Self {
151        Self {
152            name: "quick_test".into(),
153            description: "Quick validation test".into(),
154            scale: ScaleConfig::small(),
155            max_duration: Duration::from_secs(10),
156            ..Default::default()
157        }
158    }
159
160    /// Create a stress test configuration.
161    pub fn stress_test() -> Self {
162        Self {
163            name: "stress_test".into(),
164            description: "High-load stress test".into(),
165            scale: ScaleConfig::large(),
166            memory_pattern: MemoryPattern::HighChurn,
167            load_pattern: LoadPattern::Spike {
168                baseline: 10_000,
169                peak: 100_000,
170                spike_duration: Duration::from_secs(30),
171            },
172            max_duration: Duration::from_secs(600),
173            ..Default::default()
174        }
175    }
176
177    /// Create a memory leak detection configuration.
178    pub fn memory_leak_test() -> Self {
179        Self {
180            name: "memory_leak_test".into(),
181            description: "Test for memory leaks over time".into(),
182            scale: ScaleConfig::medium(),
183            memory_pattern: MemoryPattern::GrowthOnly,
184            max_duration: Duration::from_secs(300),
185            report_interval: Duration::from_secs(5),
186            ..Default::default()
187        }
188    }
189}
190
191/// Simulation events for progress tracking.
192#[derive(Debug, Clone)]
193pub enum SimulationEvent {
194    /// Simulation started.
195    Started {
196        config: SimulationConfig,
197        start_time: Instant,
198    },
199
200    /// Progress update.
201    Progress {
202        elapsed: Duration,
203        completion_percent: f64,
204        current_metrics: SimulationMetrics,
205    },
206
207    /// Phase changed.
208    PhaseChanged {
209        from: SimulationPhase,
210        to: SimulationPhase,
211    },
212
213    /// Error occurred.
214    Error { message: String, recoverable: bool },
215
216    /// Simulation completed.
217    Completed { result: SimulationResult },
218}
219
220/// Current phase of the simulation.
221#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
222pub enum SimulationPhase {
223    /// Initializing resources.
224    Initializing,
225    /// Warming up (pre-test stabilization).
226    WarmUp,
227    /// Main test execution.
228    Running,
229    /// Ramping down.
230    RampDown,
231    /// Collecting final metrics.
232    Finalizing,
233    /// Completed.
234    Completed,
235}
236
237/// Real-time simulation metrics.
238#[derive(Debug, Clone, Default, Serialize, Deserialize)]
239pub struct SimulationMetrics {
240    /// Current memory usage in bytes.
241    pub memory_bytes: u64,
242    /// Peak memory usage in bytes.
243    pub peak_memory_bytes: u64,
244    /// Current number of active devices.
245    pub active_devices: u64,
246    /// Total operations performed.
247    pub total_operations: u64,
248    /// Operations per second (current rate).
249    pub ops_per_second: f64,
250    /// Error count.
251    pub error_count: u64,
252    /// Average latency in microseconds.
253    pub avg_latency_us: u64,
254    /// P99 latency in microseconds.
255    pub p99_latency_us: u64,
256}
257
258/// Final simulation result.
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct SimulationResult {
261    /// Simulation name.
262    pub name: String,
263    /// Whether the simulation passed all criteria.
264    pub passed: bool,
265    /// Total duration of the simulation.
266    pub duration: Duration,
267    /// Final metrics.
268    pub final_metrics: SimulationMetrics,
269    /// Memory profiling report.
270    pub memory_report: Option<ProfileReport>,
271    /// Any warnings generated.
272    pub warnings: Vec<String>,
273    /// Any errors that occurred.
274    pub errors: Vec<String>,
275    /// Per-phase timing.
276    pub phase_durations: std::collections::HashMap<String, Duration>,
277}
278
279impl SimulationResult {
280    /// Check if there were any errors.
281    pub fn has_errors(&self) -> bool {
282        !self.errors.is_empty()
283    }
284
285    /// Get a summary string.
286    pub fn summary(&self) -> String {
287        format!(
288            "Simulation '{}': {} in {:?}\n  \
289             Devices: {}, Ops: {}, Errors: {}\n  \
290             Memory: {} MB (peak: {} MB)\n  \
291             Latency: avg {}us, p99 {}us",
292            self.name,
293            if self.passed { "PASSED" } else { "FAILED" },
294            self.duration,
295            self.final_metrics.active_devices,
296            self.final_metrics.total_operations,
297            self.final_metrics.error_count,
298            self.final_metrics.memory_bytes / 1024 / 1024,
299            self.final_metrics.peak_memory_bytes / 1024 / 1024,
300            self.final_metrics.avg_latency_us,
301            self.final_metrics.p99_latency_us,
302        )
303    }
304}
305
306/// Main simulator engine.
307pub struct Simulator {
308    config: SimulationConfig,
309    profiler: Arc<Profiler>,
310    metrics: Arc<RwLock<SimulationMetrics>>,
311    phase: Arc<RwLock<SimulationPhase>>,
312    event_tx: broadcast::Sender<SimulationEvent>,
313    running: Arc<std::sync::atomic::AtomicBool>,
314}
315
316impl Simulator {
317    /// Create a new simulator with the given configuration.
318    pub fn new(config: SimulationConfig) -> Self {
319        let (event_tx, _) = broadcast::channel(256);
320
321        Self {
322            config,
323            profiler: Arc::new(Profiler::with_defaults()),
324            metrics: Arc::new(RwLock::new(SimulationMetrics::default())),
325            phase: Arc::new(RwLock::new(SimulationPhase::Initializing)),
326            event_tx,
327            running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
328        }
329    }
330
331    /// Subscribe to simulation events.
332    pub fn subscribe(&self) -> broadcast::Receiver<SimulationEvent> {
333        self.event_tx.subscribe()
334    }
335
336    /// Get current metrics.
337    pub fn metrics(&self) -> SimulationMetrics {
338        self.metrics.read().clone()
339    }
340
341    /// Get current phase.
342    pub fn phase(&self) -> SimulationPhase {
343        *self.phase.read()
344    }
345
346    /// Check if simulation is running.
347    pub fn is_running(&self) -> bool {
348        self.running.load(std::sync::atomic::Ordering::SeqCst)
349    }
350
351    /// Run the simulation.
352    pub async fn run(&self) -> Result<SimulationResult> {
353        let start_time = Instant::now();
354
355        // Start profiling
356        self.profiler.start();
357        self.running
358            .store(true, std::sync::atomic::Ordering::SeqCst);
359
360        // Emit start event
361        let _ = self.event_tx.send(SimulationEvent::Started {
362            config: self.config.clone(),
363            start_time,
364        });
365
366        let mut phase_durations = std::collections::HashMap::new();
367        let mut warnings = Vec::new();
368        let mut errors = Vec::new();
369
370        // Phase 1: Initialize
371        self.set_phase(SimulationPhase::Initializing);
372        let init_start = Instant::now();
373        if let Err(e) = self.run_initialization().await {
374            errors.push(format!("Initialization failed: {}", e));
375        }
376        phase_durations.insert("initialization".into(), init_start.elapsed());
377
378        // Phase 2: Warm-up
379        if errors.is_empty() {
380            self.set_phase(SimulationPhase::WarmUp);
381            let warmup_start = Instant::now();
382            if let Err(e) = self.run_warmup().await {
383                warnings.push(format!("Warm-up warning: {}", e));
384            }
385            phase_durations.insert("warmup".into(), warmup_start.elapsed());
386        }
387
388        // Phase 3: Main execution
389        if errors.is_empty() {
390            self.set_phase(SimulationPhase::Running);
391            let run_start = Instant::now();
392
393            // Run until max duration or completion
394            let run_result = self.run_main_phase(start_time).await;
395            if let Err(e) = run_result {
396                errors.push(format!("Execution error: {}", e));
397            }
398            phase_durations.insert("execution".into(), run_start.elapsed());
399        }
400
401        // Phase 4: Ramp down
402        self.set_phase(SimulationPhase::RampDown);
403        let rampdown_start = Instant::now();
404        self.run_rampdown().await;
405        phase_durations.insert("rampdown".into(), rampdown_start.elapsed());
406
407        // Phase 5: Finalize
408        self.set_phase(SimulationPhase::Finalizing);
409        let finalize_start = Instant::now();
410
411        // Stop profiling and get report
412        self.profiler.stop();
413        let memory_report = Some(self.profiler.generate_report());
414
415        // Check for memory leaks
416        let leak_warnings = self.profiler.check_leaks();
417        for leak in leak_warnings {
418            warnings.push(format!(
419                "Potential memory leak: {} - {}",
420                leak.region, leak.message
421            ));
422        }
423
424        phase_durations.insert("finalization".into(), finalize_start.elapsed());
425
426        // Build result
427        self.set_phase(SimulationPhase::Completed);
428        self.running
429            .store(false, std::sync::atomic::Ordering::SeqCst);
430
431        let final_metrics = self.metrics.read().clone();
432        let passed = errors.is_empty() && self.check_success_criteria(&final_metrics);
433
434        let result = SimulationResult {
435            name: self.config.name.clone(),
436            passed,
437            duration: start_time.elapsed(),
438            final_metrics,
439            memory_report,
440            warnings,
441            errors,
442            phase_durations,
443        };
444
445        // Emit completion event
446        let _ = self.event_tx.send(SimulationEvent::Completed {
447            result: result.clone(),
448        });
449
450        Ok(result)
451    }
452
453    fn set_phase(&self, new_phase: SimulationPhase) {
454        let old_phase = *self.phase.read();
455        *self.phase.write() = new_phase;
456
457        let _ = self.event_tx.send(SimulationEvent::PhaseChanged {
458            from: old_phase,
459            to: new_phase,
460        });
461    }
462
463    async fn run_initialization(&self) -> Result<()> {
464        // Initialize based on scale config
465        let target_devices = self.config.scale.device_count;
466        let batch_size = self.config.scale.batch_size;
467
468        tracing::info!(
469            "Initializing simulation: {} devices in batches of {}",
470            target_devices,
471            batch_size
472        );
473
474        let mut current = 0u64;
475        while current < target_devices as u64 {
476            let batch = (target_devices as u64 - current).min(batch_size as u64);
477
478            // Simulate device creation
479            self.profiler.record_allocation(
480                "devices",
481                (batch as usize) * self.config.scale.memory_per_device,
482            );
483
484            current += batch;
485
486            // Update metrics
487            {
488                let mut metrics = self.metrics.write();
489                metrics.active_devices = current;
490                metrics.memory_bytes = self.profiler.snapshot().current_bytes;
491            }
492
493            // Yield to allow other tasks
494            tokio::task::yield_now().await;
495        }
496
497        Ok(())
498    }
499
500    async fn run_warmup(&self) -> Result<()> {
501        // Brief warm-up period
502        let warmup_duration = Duration::from_secs(5);
503        let start = Instant::now();
504
505        while start.elapsed() < warmup_duration {
506            // Simulate some operations
507            self.simulate_operations(100).await;
508            tokio::time::sleep(Duration::from_millis(100)).await;
509        }
510
511        Ok(())
512    }
513
514    async fn run_main_phase(&self, start_time: Instant) -> Result<()> {
515        let mut last_report = Instant::now();
516
517        while start_time.elapsed() < self.config.max_duration {
518            // Apply load pattern
519            let ops = self
520                .config
521                .load_pattern
522                .ops_for_elapsed(start_time.elapsed());
523            self.simulate_operations(ops).await;
524
525            // Apply memory pattern
526            self.apply_memory_pattern(start_time.elapsed()).await;
527
528            // Inject failures if configured
529            if let Some(ref failure_config) = self.config.failure_config {
530                self.inject_failures(failure_config).await;
531            }
532
533            // Report progress
534            if last_report.elapsed() >= self.config.report_interval {
535                let metrics = self.metrics.read().clone();
536                let completion = start_time.elapsed().as_secs_f64()
537                    / self.config.max_duration.as_secs_f64()
538                    * 100.0;
539
540                let _ = self.event_tx.send(SimulationEvent::Progress {
541                    elapsed: start_time.elapsed(),
542                    completion_percent: completion.min(100.0),
543                    current_metrics: metrics,
544                });
545
546                last_report = Instant::now();
547            }
548
549            tokio::time::sleep(Duration::from_millis(10)).await;
550        }
551
552        Ok(())
553    }
554
555    async fn run_rampdown(&self) {
556        // Gracefully release resources
557        let current_devices = self.metrics.read().active_devices;
558        let batch_size = self.config.scale.batch_size as u64;
559
560        let mut remaining = current_devices;
561        while remaining > 0 {
562            let batch = remaining.min(batch_size);
563
564            self.profiler.record_deallocation(
565                "devices",
566                (batch as usize) * self.config.scale.memory_per_device,
567            );
568
569            remaining -= batch;
570
571            {
572                let mut metrics = self.metrics.write();
573                metrics.active_devices = remaining;
574                metrics.memory_bytes = self.profiler.snapshot().current_bytes;
575            }
576
577            tokio::task::yield_now().await;
578        }
579    }
580
581    async fn simulate_operations(&self, count: u64) {
582        let mut metrics = self.metrics.write();
583        metrics.total_operations += count;
584
585        // Simulate latency
586        let base_latency = 100u64; // 100us base
587        let jitter = (count % 50) * 2;
588        metrics.avg_latency_us = base_latency + jitter;
589        metrics.p99_latency_us = base_latency * 3 + jitter * 2;
590    }
591
592    async fn apply_memory_pattern(&self, elapsed: Duration) {
593        match &self.config.memory_pattern {
594            MemoryPattern::Steady => {
595                // No additional allocations
596            }
597            MemoryPattern::GrowthOnly => {
598                // Continuous growth
599                let growth = (elapsed.as_secs() * 1024) as usize;
600                self.profiler.record_allocation("growth", growth);
601            }
602            MemoryPattern::GrowthAndRelease => {
603                // Periodic allocation and deallocation
604                let cycle = elapsed.as_secs() % 60;
605                if cycle < 30 {
606                    self.profiler.record_allocation("cyclic", 10240);
607                } else {
608                    self.profiler.record_deallocation("cyclic", 10240);
609                }
610            }
611            MemoryPattern::HighChurn => {
612                // Frequent small allocations and deallocations
613                self.profiler.record_allocation("churn", 1024);
614                self.profiler.record_deallocation("churn", 512);
615            }
616            MemoryPattern::Fragmentation => {
617                // Create fragmentation by varying sizes
618                let sizes = [64, 256, 1024, 4096, 16384];
619                let idx = (elapsed.as_millis() as usize) % sizes.len();
620                self.profiler.record_allocation("frag", sizes[idx]);
621                if elapsed.as_millis() % 2 == 0 {
622                    self.profiler
623                        .record_deallocation("frag", sizes[(idx + 2) % sizes.len()]);
624                }
625            }
626            MemoryPattern::Custom(pattern) => {
627                pattern.apply(&self.profiler, elapsed);
628            }
629        }
630
631        // Update memory metrics
632        let snapshot = self.profiler.snapshot();
633        let mut metrics = self.metrics.write();
634        metrics.memory_bytes = snapshot.current_bytes;
635        if snapshot.current_bytes > metrics.peak_memory_bytes {
636            metrics.peak_memory_bytes = snapshot.current_bytes;
637        }
638    }
639
640    async fn inject_failures(&self, config: &FailureConfig) {
641        if config.should_inject() {
642            let mut metrics = self.metrics.write();
643            metrics.error_count += 1;
644
645            let _ = self.event_tx.send(SimulationEvent::Error {
646                message: config.next_failure_type().to_string(),
647                recoverable: true,
648            });
649        }
650    }
651
652    fn check_success_criteria(&self, metrics: &SimulationMetrics) -> bool {
653        // Default success criteria
654        let max_error_rate = 0.01; // 1% error rate
655        let error_rate = if metrics.total_operations > 0 {
656            metrics.error_count as f64 / metrics.total_operations as f64
657        } else {
658            0.0
659        };
660
661        error_rate <= max_error_rate
662    }
663
664    /// Stop the simulation early.
665    pub fn stop(&self) {
666        self.running
667            .store(false, std::sync::atomic::Ordering::SeqCst);
668    }
669}
670
671impl std::fmt::Debug for Simulator {
672    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673        f.debug_struct("Simulator")
674            .field("config", &self.config)
675            .field("phase", &self.phase())
676            .field("running", &self.is_running())
677            .finish()
678    }
679}
680
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration keeps its documented name and five-minute limit.
    #[test]
    fn test_simulation_config_default() {
        let config = SimulationConfig::default();
        assert_eq!(config.name, "default_simulation");
        assert_eq!(config.max_duration, Duration::from_secs(300));
    }

    /// Builder methods apply their values on top of `new`.
    #[test]
    fn test_simulation_config_builders() {
        let config = SimulationConfig::new("test")
            .with_scale(ScaleConfig::devices(1000))
            .with_memory_pattern(MemoryPattern::HighChurn)
            .with_max_duration(Duration::from_secs(60));

        assert_eq!(config.name, "test");
        assert_eq!(config.scale.device_count, 1000);
    }

    /// The quick-test preset is limited to ten seconds.
    #[test]
    fn test_quick_test_config() {
        let config = SimulationConfig::quick_test();
        assert_eq!(config.max_duration, Duration::from_secs(10));
    }

    /// The stress-test preset uses the high-churn memory pattern.
    #[test]
    fn test_stress_test_config() {
        let config = SimulationConfig::stress_test();
        assert_eq!(config.name, "stress_test");
        // `matches!` only produces a bool; without `assert!` the check is a
        // no-op and the test could never fail on the pattern.
        assert!(matches!(config.memory_pattern, MemoryPattern::HighChurn));
    }

    /// A new simulator is idle and in the initializing phase.
    #[test]
    fn test_simulator_creation() {
        let config = SimulationConfig::quick_test();
        let simulator = Simulator::new(config);
        assert!(!simulator.is_running());
        assert_eq!(simulator.phase(), SimulationPhase::Initializing);
    }

    /// The summary mentions the verdict and the operation count.
    #[test]
    fn test_simulation_result_summary() {
        let result = SimulationResult {
            name: "test".into(),
            passed: true,
            duration: Duration::from_secs(10),
            final_metrics: SimulationMetrics {
                memory_bytes: 1024 * 1024,
                peak_memory_bytes: 2 * 1024 * 1024,
                active_devices: 100,
                total_operations: 10000,
                ops_per_second: 1000.0,
                error_count: 0,
                avg_latency_us: 100,
                p99_latency_us: 500,
            },
            memory_report: None,
            warnings: vec![],
            errors: vec![],
            phase_durations: std::collections::HashMap::new(),
        };

        let summary = result.summary();
        assert!(summary.contains("PASSED"));
        assert!(summary.contains("10000"));
    }

    /// A tiny end-to-end run passes and releases every simulated device.
    #[tokio::test]
    async fn test_simulator_quick_run() {
        let config = SimulationConfig::new("quick")
            .with_scale(ScaleConfig::tiny())
            .with_max_duration(Duration::from_millis(100));

        let simulator = Simulator::new(config);
        let result = simulator.run().await.unwrap();

        assert!(result.passed);
        // Ramp-down should have cleaned up all devices.
        assert_eq!(result.final_metrics.active_devices, 0);
    }
}
763}