moonpool_sim/runner/
builder.rs

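//! Simulation builder pattern for configuring and running experiments.
//!
//! This module provides [`SimulationBuilder`], the main entry point for
//! registering workloads and configuring iterations, seeds, network chaos and
//! invariants before executing the simulation and collecting a report.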
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::time::{Duration, Instant};
10use tracing::instrument;
11
12use crate::{InvariantCheck, SimulationResult};
13
14use super::orchestrator::{IterationManager, MetricsCollector, WorkloadOrchestrator};
15use super::report::{SimulationMetrics, SimulationReport};
16use super::topology::{Workload, WorkloadTopology};
17
/// Configuration for how many iterations a simulation should run.
///
/// Selected through `SimulationBuilder::set_iterations`,
/// `SimulationBuilder::set_time_limit`,
/// `SimulationBuilder::run_until_all_sometimes_reached`, or directly with
/// `SimulationBuilder::set_iteration_control`. A new builder starts with
/// `FixedCount(1)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IterationControl {
    /// Run exactly this many iterations.
    ///
    /// The variant carries only the count; explicit seeds are supplied separately
    /// via `SimulationBuilder::set_debug_seeds`.
    FixedCount(usize),
    /// Keep starting new iterations until this much wall-clock time has elapsed.
    TimeLimit(Duration),
    /// Run until every `sometimes_assert!` assertion has been reached.
    ///
    /// The wrapped value is a safety limit (presumably a maximum iteration count;
    /// it is interpreted by the iteration manager) so the run terminates even if
    /// some assertion is never hit.
    UntilAllSometimesReached(usize),
}
30
31/// Type alias for workload function signature to reduce complexity.
32pub(crate) type WorkloadFn = Box<
33    dyn Fn(
34        crate::SimRandomProvider,
35        crate::SimNetworkProvider,
36        crate::SimTimeProvider,
37        crate::TokioTaskProvider,
38        WorkloadTopology,
39    ) -> Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>,
40>;
41
42/// Builder pattern for configuring and running simulation experiments.
43pub struct SimulationBuilder {
44    iteration_control: IterationControl,
45    workloads: Vec<Workload>,
46    seeds: Vec<u64>,
47    next_ip: u32, // For auto-assigning IP addresses starting from 10.0.0.1
48    use_random_config: bool,
49    invariants: Vec<InvariantCheck>,
50}
51
52impl Default for SimulationBuilder {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl SimulationBuilder {
59    /// Create a new empty simulation builder.
60    pub fn new() -> Self {
61        Self {
62            iteration_control: IterationControl::FixedCount(1),
63            workloads: Vec::new(),
64            seeds: Vec::new(),
65            next_ip: 1, // Start from 10.0.0.1
66            use_random_config: false,
67            invariants: Vec::new(),
68        }
69    }
70
71    /// Register a workload with the simulation builder.
72    ///
73    /// # Arguments
74    /// * `name` - Name for the workload (for reporting purposes)
75    /// * `workload` - Async function that takes a RandomProvider, NetworkProvider, TimeProvider, TaskProvider, and WorkloadTopology and returns simulation metrics
76    pub fn register_workload<S, F, Fut>(mut self, name: S, workload: F) -> Self
77    where
78        S: Into<String>,
79        F: Fn(
80                crate::SimRandomProvider,
81                crate::SimNetworkProvider,
82                crate::SimTimeProvider,
83                crate::TokioTaskProvider,
84                WorkloadTopology,
85            ) -> Fut
86            + 'static,
87        Fut: Future<Output = SimulationResult<SimulationMetrics>> + 'static,
88    {
89        // Auto-assign IP address starting from 10.0.0.1
90        let ip_address = format!("10.0.0.{}", self.next_ip);
91        self.next_ip += 1;
92
93        let boxed_workload = Box::new(
94            move |random_provider, provider, time_provider, task_provider, topology| {
95                let fut = workload(
96                    random_provider,
97                    provider,
98                    time_provider,
99                    task_provider,
100                    topology,
101                );
102                Box::pin(fut) as Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>
103            },
104        );
105
106        self.workloads.push(Workload {
107            name: name.into(),
108            ip_address,
109            workload: boxed_workload,
110        });
111        self
112    }
113
114    /// Set the number of iterations to run.
115    pub fn set_iterations(mut self, iterations: usize) -> Self {
116        self.iteration_control = IterationControl::FixedCount(iterations);
117        self
118    }
119
120    /// Set the iteration control strategy.
121    pub fn set_iteration_control(mut self, control: IterationControl) -> Self {
122        self.iteration_control = control;
123        self
124    }
125
126    /// Run for a specific wall-clock time duration.
127    pub fn set_time_limit(mut self, duration: Duration) -> Self {
128        self.iteration_control = IterationControl::TimeLimit(duration);
129        self
130    }
131
132    /// Run until all sometimes_assert! assertions have been reached.
133    pub fn run_until_all_sometimes_reached(mut self, safety_limit: usize) -> Self {
134        self.iteration_control = IterationControl::UntilAllSometimesReached(safety_limit);
135        self
136    }
137
138    /// Set specific seeds for deterministic debugging and regression testing.
139    ///
140    /// This method is specifically designed for debugging scenarios where you need
141    /// to reproduce specific problematic behavior. Unlike `set_seeds()`, the name
142    /// makes it clear this is for debugging/testing specific scenarios.
143    ///
144    /// **Key differences from `set_seeds()`:**
145    /// - **Intent**: Clearly indicates debugging/testing purpose
146    /// - **Usage**: Typically used with `FixedCount(1)` for reproducing exact scenarios
147    /// - **Documentation**: Self-documenting that these seeds are for specific test cases
148    ///
149    /// **Common use cases:**
150    /// - Reproducing TCP ordering bugs (e.g., seed 42 revealed the ordering issue)
151    /// - Regression testing for specific edge cases
152    /// - Deterministic testing in CI/CD pipelines
153    /// - Investigating assertion failures at specific seeds
154    ///
155    /// Example: `set_debug_seeds(vec![42])` with `FixedCount(1)` ensures the test
156    /// always runs with seed 42, making it reproducible for debugging the TCP ordering fix.
157    pub fn set_debug_seeds(mut self, seeds: Vec<u64>) -> Self {
158        self.seeds = seeds;
159        self
160    }
161
162    /// Enable randomized network configuration for chaos testing
163    pub fn use_random_config(mut self) -> Self {
164        self.use_random_config = true;
165        self
166    }
167
168    /// Register invariant check functions to be executed after every simulation event.
169    ///
170    /// Invariants receive a snapshot of all actor states and the current simulation time,
171    /// and should panic if any global property is violated.
172    ///
173    /// # Arguments
174    /// * `invariants` - Vector of invariant check functions
175    ///
176    /// # Example
177    /// ```ignore
178    /// SimulationBuilder::new()
179    ///     .with_invariants(vec![
180    ///         Box::new(|states, _time| {
181    ///             let total_sent: u64 = states.values()
182    ///                 .filter_map(|v| v.get("messages_sent").and_then(|s| s.as_u64()))
183    ///                 .sum();
184    ///             let total_received: u64 = states.values()
185    ///                 .filter_map(|v| v.get("messages_received").and_then(|r| r.as_u64()))
186    ///                 .sum();
187    ///             assert!(total_received <= total_sent, "Message conservation violated");
188    ///         })
189    ///     ])
190    /// ```
191    pub fn with_invariants(mut self, invariants: Vec<InvariantCheck>) -> Self {
192        self.invariants = invariants;
193        self
194    }
195
196    #[instrument(skip_all)]
197    /// Run the simulation and generate a report.
198    pub async fn run(self) -> SimulationReport {
199        if self.workloads.is_empty() {
200            return SimulationReport {
201                iterations: 0,
202                successful_runs: 0,
203                failed_runs: 0,
204                metrics: SimulationMetrics::default(),
205                individual_metrics: Vec::new(),
206                seeds_used: Vec::new(),
207                seeds_failing: Vec::new(),
208                assertion_results: HashMap::new(),
209                assertion_violations: Vec::new(),
210            };
211        }
212
213        // Initialize iteration state
214        let mut iteration_manager =
215            IterationManager::new(self.iteration_control.clone(), self.seeds.clone());
216        let mut metrics_collector = MetricsCollector::new();
217
218        while iteration_manager.should_continue() {
219            let seed = iteration_manager.next_iteration();
220            let iteration_count = iteration_manager.current_iteration();
221
222            // Prepare clean state for this iteration
223            crate::sim::reset_sim_rng();
224            crate::sim::set_sim_seed(seed);
225
226            // Initialize buggify system for this iteration
227            // Use moderate probabilities: 50% activation rate, 25% firing rate
228            crate::chaos::buggify_init(0.5, 0.25);
229
230            // Create fresh NetworkConfiguration for this iteration
231            let network_config = if self.use_random_config {
232                crate::NetworkConfiguration::random_for_seed()
233            } else {
234                crate::NetworkConfiguration::default()
235            };
236
237            // Create shared SimWorld for this iteration using fresh network config
238            let sim = crate::sim::SimWorld::new_with_network_config_and_seed(network_config, seed);
239            let provider = sim.network_provider();
240
241            let start_time = Instant::now();
242
243            // Create shutdown signal for this iteration
244            let shutdown_signal = tokio_util::sync::CancellationToken::new();
245
246            // Execute workloads using orchestrator
247            let orchestration_result = WorkloadOrchestrator::orchestrate_workloads(
248                &self.workloads,
249                seed,
250                provider,
251                sim,
252                shutdown_signal,
253                iteration_count,
254                &self.invariants,
255            )
256            .await;
257
258            let (all_results, sim_metrics) = match orchestration_result {
259                Ok((results, metrics)) => (results, metrics),
260                Err((faulty_seeds_from_deadlock, failed_count)) => {
261                    // Handle deadlock case - merge with existing state and return early
262                    metrics_collector.add_faulty_seeds(faulty_seeds_from_deadlock);
263                    metrics_collector.add_failed_runs(failed_count);
264
265                    // Create early exit report
266                    let assertion_results = crate::chaos::get_assertion_results();
267                    let assertion_violations = crate::chaos::validate_assertion_contracts();
268                    crate::chaos::buggify_reset();
269
270                    return metrics_collector.generate_report(
271                        iteration_count,
272                        iteration_manager.seeds_used().to_vec(),
273                        assertion_results,
274                        assertion_violations,
275                    );
276                }
277            };
278
279            let wall_time = start_time.elapsed();
280
281            // Record iteration results using metrics collector
282            metrics_collector.record_iteration(seed, wall_time, all_results, sim_metrics);
283
284            // Reset buggify state after each iteration to ensure clean state
285            crate::chaos::buggify_reset();
286        }
287
288        // End of main iteration loop
289
290        // Log summary of all seeds used
291        let iteration_count = iteration_manager.current_iteration();
292        let (successful_runs, failed_runs) = metrics_collector.current_stats();
293        tracing::info!(
294            "📊 Simulation completed: {}/{} iterations successful",
295            successful_runs,
296            iteration_count
297        );
298        tracing::info!("🌱 Seeds used: {:?}", iteration_manager.seeds_used());
299        if failed_runs > 0 {
300            tracing::warn!(
301                "⚠️ {} iterations failed - check logs above for failing seeds",
302                failed_runs
303            );
304        }
305
306        // Collect assertion results and validate them
307        let assertion_results = crate::chaos::get_assertion_results();
308        let assertion_violations = crate::chaos::validate_assertion_contracts();
309
310        // Final buggify reset to ensure no impact on subsequent code
311        crate::chaos::buggify_reset();
312
313        metrics_collector.generate_report(
314            iteration_count,
315            iteration_manager.seeds_used().to_vec(),
316            assertion_results,
317            assertion_violations,
318        )
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::RandomProvider;

    /// Drive `fut` to completion on a fresh single-threaded local runtime with
    /// the IO and time drivers enabled.
    fn block_on_local<F: Future>(fut: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build_local(Default::default())
            .expect("Failed to build local runtime")
            .block_on(fut)
    }

    #[test]
    fn test_simulation_builder_basic() {
        let report = block_on_local(
            SimulationBuilder::new()
                .register_workload(
                    "test_workload",
                    |random, _provider, _time_provider, _task_provider, _topology| async move {
                        Ok(SimulationMetrics {
                            simulated_time: Duration::from_millis(random.random_range(0..100)),
                            events_processed: random.random_range(0..10),
                            ..Default::default()
                        })
                    },
                )
                .set_iterations(3)
                .set_debug_seeds(vec![1, 2, 3])
                .run(),
        );

        assert_eq!(report.iterations, 3);
        assert_eq!(report.successful_runs, 3);
        assert_eq!(report.failed_runs, 0);
        assert_eq!(report.success_rate(), 100.0);

        // Check that seeds were used correctly
        assert_eq!(report.seeds_used, vec![1, 2, 3]);
    }

    #[test]
    fn test_simulation_builder_with_failures() {
        let report = block_on_local(
            SimulationBuilder::new()
                .register_workload(
                    "failing_workload",
                    |random, _provider, _time_provider, _task_provider, _topology| async move {
                        // Use deterministic approach: fail if random number is even, succeed if odd
                        let random_num = random.random_range(0..100);
                        if random_num % 2 == 0 {
                            Err(crate::SimulationError::InvalidState(
                                "Test failure".to_string(),
                            ))
                        } else {
                            Ok(SimulationMetrics {
                                simulated_time: Duration::from_millis(100),
                                events_processed: 5,
                                ..Default::default()
                            })
                        }
                    },
                )
                .set_iterations(4)
                .set_debug_seeds(vec![1, 2, 5, 6]) // Seeds chosen to give 2 even, 2 odd draws
                .run(),
        );

        assert_eq!(report.iterations, 4);
        assert_eq!(report.successful_runs, 2);
        assert_eq!(report.failed_runs, 2);
        assert_eq!(report.success_rate(), 50.0);

        // Only successful runs should contribute to averages
        assert_eq!(report.average_simulated_time(), Duration::from_millis(100));
        assert_eq!(report.average_events_processed(), 5.0);
    }

    // Formatting a report is synchronous, so no async runtime is needed here.
    #[test]
    fn test_simulation_report_display() {
        let metrics = SimulationMetrics {
            simulated_time: Duration::from_millis(200),
            events_processed: 10,
            ..Default::default()
        };

        let report = SimulationReport {
            iterations: 2,
            successful_runs: 2,
            failed_runs: 0,
            metrics,
            individual_metrics: vec![],
            seeds_used: vec![1, 2],
            seeds_failing: vec![42],
            assertion_results: HashMap::new(),
            assertion_violations: Vec::new(),
        };

        let display = format!("{}", report);
        assert!(display.contains("Iterations: 2"));
        assert!(display.contains("Success Rate: 100.00%"));
    }

    #[test]
    fn test_simulation_builder_with_network_config() {
        // Uses the default NetworkConfiguration (`use_random_config` is not called).
        let report = block_on_local(
            SimulationBuilder::new()
                .register_workload(
                    "network_test",
                    |_seed, _provider, _time_provider, _task_provider, _topology| async move {
                        Ok(SimulationMetrics {
                            simulated_time: Duration::from_millis(50),
                            events_processed: 10,
                            ..Default::default()
                        })
                    },
                )
                .set_iterations(2)
                .set_debug_seeds(vec![42, 43])
                .run(),
        );

        assert_eq!(report.iterations, 2);
        assert_eq!(report.successful_runs, 2);
        assert_eq!(report.failed_runs, 0);
        assert_eq!(report.success_rate(), 100.0);

        // The workload reports 50ms of simulated time per iteration, so the
        // average must be at least that.
        assert!(report.average_simulated_time() >= Duration::from_millis(50));
    }

    #[test]
    fn test_simulation_builder_with_random_network_config() {
        // Exercises the `NetworkConfiguration::random_for_seed()` branch of `run`.
        let report = block_on_local(
            SimulationBuilder::new()
                .register_workload(
                    "random_network_test",
                    |_seed, _provider, _time_provider, _task_provider, _topology| async move {
                        Ok(SimulationMetrics {
                            simulated_time: Duration::from_millis(50),
                            events_processed: 10,
                            ..Default::default()
                        })
                    },
                )
                .set_iterations(2)
                .set_debug_seeds(vec![42, 43])
                .use_random_config()
                .run(),
        );

        assert_eq!(report.iterations, 2);
        assert_eq!(report.successful_runs, 2);
        assert_eq!(report.failed_runs, 0);
        assert_eq!(report.seeds_used, vec![42, 43]);
    }

    #[test]
    fn test_multiple_workloads() {
        let report = block_on_local(
            SimulationBuilder::new()
                .register_workload(
                    "workload1",
                    |random, _provider, _time_provider, _task_provider, _topology| async move {
                        Ok(SimulationMetrics {
                            simulated_time: Duration::from_millis(random.random_range(0..50)),
                            events_processed: random.random_range(0..5),
                            ..Default::default()
                        })
                    },
                )
                .register_workload(
                    "workload2",
                    |random, _provider, _time_provider, _task_provider, _topology| async move {
                        Ok(SimulationMetrics {
                            simulated_time: Duration::from_millis(random.random_range(0..50)),
                            events_processed: random.random_range(0..5),
                            ..Default::default()
                        })
                    },
                )
                .set_iterations(2)
                .set_debug_seeds(vec![10, 20])
                .run(),
        );

        assert_eq!(report.iterations, 2);
        assert_eq!(report.successful_runs, 2);
        assert_eq!(report.failed_runs, 0);
        assert_eq!(report.success_rate(), 100.0);
        assert_eq!(report.seeds_used, vec![10, 20]);
    }

    #[test]
    fn test_run_without_workloads_returns_empty_report() {
        let report = block_on_local(SimulationBuilder::new().set_iterations(5).run());

        assert_eq!(report.iterations, 0);
        assert_eq!(report.successful_runs, 0);
        assert_eq!(report.failed_runs, 0);
        assert!(report.seeds_used.is_empty());
        assert!(report.seeds_failing.is_empty());
    }

    #[test]
    fn test_workload_ip_assignment() {
        let builder = SimulationBuilder::new()
            .register_workload(
                "first",
                |_random, _provider, _time_provider, _task_provider, _topology| async move {
                    Ok(SimulationMetrics::default())
                },
            )
            .register_workload(
                "second",
                |_random, _provider, _time_provider, _task_provider, _topology| async move {
                    Ok(SimulationMetrics::default())
                },
            );

        assert_eq!(builder.workloads.len(), 2);
        assert_eq!(builder.workloads[0].name, "first");
        assert_eq!(builder.workloads[0].ip_address, "10.0.0.1");
        assert_eq!(builder.workloads[1].name, "second");
        assert_eq!(builder.workloads[1].ip_address, "10.0.0.2");
        assert_eq!(builder.next_ip, 3);
    }

    #[test]
    fn test_default_matches_new() {
        let builder = SimulationBuilder::default();

        assert!(matches!(
            builder.iteration_control,
            IterationControl::FixedCount(1)
        ));
        assert!(builder.workloads.is_empty());
        assert!(builder.seeds.is_empty());
        assert_eq!(builder.next_ip, 1);
        assert!(!builder.use_random_config);
        assert!(builder.invariants.is_empty());
    }
}