// moonpool_sim/runner/builder.rs
1//! Simulation builder pattern for configuring and running experiments.
2//!
3//! This module provides the main SimulationBuilder type for setting up
4//! and executing simulation experiments.
5
6use std::collections::HashMap;
7use std::ops::{Range, RangeInclusive};
8use std::time::{Duration, Instant};
9use tracing::instrument;
10
11use crate::chaos::invariant_trait::Invariant;
12use crate::runner::fault_injector::{FaultInjector, PhaseConfig};
13use crate::runner::process::{Attrition, Process};
14use crate::runner::tags::TagDistribution;
15use crate::runner::workload::Workload;
16
17use super::orchestrator::{IterationManager, MetricsCollector, WorkloadOrchestrator};
18
/// Client identity information for a single workload instance.
///
/// Produced while resolving builder entries each iteration and handed to the
/// orchestrator alongside the workload list.
#[derive(Debug, Clone, Copy)]
pub(crate) struct WorkloadClientInfo {
    /// The resolved client ID for this instance.
    pub(crate) client_id: usize,
    /// Total number of workload instances sharing this builder entry.
    pub(crate) client_count: usize,
}
27
28/// Resolved workload entries for a single iteration.
29struct ResolvedEntries {
30    workloads: Vec<Box<dyn Workload>>,
31    /// `return_map[i] = Some(entry_idx)` means `workloads[i]` should be
32    /// returned to `entries[entry_idx]` after the iteration.
33    return_map: Vec<Option<usize>>,
34    /// Client identity info parallel to `workloads`.
35    client_info: Vec<WorkloadClientInfo>,
36}
37use super::report::{SimulationMetrics, SimulationReport};
38
/// Configuration for how many iterations a simulation should run.
///
/// Provides flexible control over simulation execution duration and completion criteria.
#[derive(Debug, Clone)]
pub enum IterationControl {
    /// Run a fixed number of iterations with specific seeds.
    FixedCount(usize),
    /// Run for a specific duration of wall-clock time.
    TimeLimit(Duration),
    /// Stop when all `assert_sometimes!` have been reached AND a new seed
    /// produces no new code coverage. Requires exploration to be enabled.
    /// The `max_iterations` field is a safety cap.
    UntilConverged {
        /// Maximum number of seeds before stopping regardless.
        max_iterations: usize,
    },
}
56
/// How many instances of a workload to spawn per iteration.
///
/// Use `Fixed` for deterministic topologies or `Random` for chaos testing
/// with varying cluster sizes.
///
/// # Examples
///
/// ```ignore
/// // Always 3 replicas
/// WorkloadCount::Fixed(3)
///
/// // 1 to 5 replicas, randomized per iteration
/// WorkloadCount::Random(1..6)
/// ```
#[derive(Debug, Clone)]
pub enum WorkloadCount {
    /// Spawn exactly N instances every iteration.
    Fixed(usize),
    /// Spawn a random number of instances in `[start..end)` per iteration,
    /// using the simulation RNG (deterministic per seed).
    Random(Range<usize>),
}
79
80impl WorkloadCount {
81    /// Resolve the count for the current iteration.
82    /// For `Random`, uses the sim RNG which must already be seeded.
83    fn resolve(&self) -> usize {
84        match self {
85            WorkloadCount::Fixed(n) => *n,
86            WorkloadCount::Random(range) => crate::sim::sim_random_range(range.clone()),
87        }
88    }
89}
90
/// Strategy for assigning client IDs to workload instances.
///
/// Inspired by FoundationDB's `WorkloadContext.clientId`, but more
/// programmable. The resolved client ID is available via
/// [`SimContext::client_id()`](super::context::SimContext::client_id).
///
/// # Examples
///
/// ```ignore
/// // FDB-style sequential: IDs 0, 1, 2
/// ClientId::Fixed(0)
///
/// // Sequential starting from 10: IDs 10, 11, 12
/// ClientId::Fixed(10)
///
/// // Random IDs in [100..200) per instance
/// ClientId::RandomRange(100..200)
/// ```
#[derive(Debug, Clone, PartialEq)]
pub enum ClientId {
    /// Sequential IDs starting from `base`: instance 0 gets `base`,
    /// instance 1 gets `base + 1`, and so on.
    Fixed(usize),
    /// Random ID drawn from `[start..end)` per instance,
    /// using the simulation RNG (deterministic per seed).
    /// IDs are not guaranteed unique across instances.
    RandomRange(Range<usize>),
}
119
120impl Default for ClientId {
121    fn default() -> Self {
122        Self::Fixed(0)
123    }
124}
125
126impl ClientId {
127    /// Resolve a client ID for the given instance index.
128    fn resolve(&self, index: usize) -> usize {
129        match self {
130            ClientId::Fixed(base) => base + index,
131            ClientId::RandomRange(range) => crate::sim::sim_random_range(range.clone()),
132        }
133    }
134}
135
/// How many process instances to spawn per iteration.
///
/// Use `Fixed` for deterministic topologies or `Range` for chaos testing
/// with varying cluster sizes.
///
/// # Examples
///
/// ```ignore
/// // Always 3 server processes
/// ProcessCount::Fixed(3)
///
/// // 3 to 7 server processes, randomized per iteration
/// ProcessCount::Range(3..=7)
/// ```
#[derive(Debug, Clone, PartialEq)]
pub enum ProcessCount {
    /// Spawn exactly N process instances every iteration.
    Fixed(usize),
    /// Spawn a random number in `[start..=end]` per iteration,
    /// using the simulation RNG (deterministic per seed).
    Range(RangeInclusive<usize>),
}
158
159impl ProcessCount {
160    /// Resolve the count for the current iteration.
161    fn resolve(&self) -> usize {
162        match self {
163            ProcessCount::Fixed(n) => *n,
164            ProcessCount::Range(range) => {
165                let start = *range.start();
166                let end = *range.end() + 1; // RangeInclusive -> exclusive for sim_random_range
167                if start >= end {
168                    return start;
169                }
170                crate::sim::sim_random_range(start..end)
171            }
172        }
173    }
174}
175
176impl From<usize> for ProcessCount {
177    fn from(n: usize) -> Self {
178        ProcessCount::Fixed(n)
179    }
180}
181
182impl From<RangeInclusive<usize>> for ProcessCount {
183    fn from(range: RangeInclusive<usize>) -> Self {
184        ProcessCount::Range(range)
185    }
186}
187
188/// Internal storage for a process entry in the builder.
189pub(crate) struct ProcessEntry {
190    pub(crate) count: ProcessCount,
191    pub(crate) factory: Box<dyn Fn() -> Box<dyn Process>>,
192    pub(crate) tags: TagDistribution,
193    pub(crate) name: String,
194}
195
196/// Internal storage for workload entries in the builder.
197enum WorkloadEntry {
198    /// Single instance, reused across iterations (from `.workload()`).
199    Instance(Option<Box<dyn Workload>>, ClientId),
200    /// Factory-based, fresh instances per iteration (from `.workloads()`).
201    Factory {
202        count: WorkloadCount,
203        client_id: ClientId,
204        factory: Box<dyn Fn(usize) -> Box<dyn Workload>>,
205    },
206}
207
208/// Builder pattern for configuring and running simulation experiments.
209pub struct SimulationBuilder {
210    iteration_control: IterationControl,
211    entries: Vec<WorkloadEntry>,
212    process_entry: Option<ProcessEntry>,
213    attrition: Option<Attrition>,
214    seeds: Vec<u64>,
215    use_random_config: bool,
216    invariants: Vec<Box<dyn Invariant>>,
217    fault_injectors: Vec<Box<dyn FaultInjector>>,
218    phase_config: Option<PhaseConfig>,
219    exploration_config: Option<moonpool_explorer::ExplorationConfig>,
220    replay_recipe: Option<super::report::BugRecipe>,
221    before_iteration_hooks: Vec<Box<dyn FnMut()>>,
222}
223
224impl Default for SimulationBuilder {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230impl SimulationBuilder {
231    /// Create a new empty simulation builder.
232    pub fn new() -> Self {
233        Self {
234            iteration_control: IterationControl::FixedCount(1),
235            entries: Vec::new(),
236            process_entry: None,
237            attrition: None,
238            seeds: Vec::new(),
239            use_random_config: false,
240            invariants: Vec::new(),
241            fault_injectors: Vec::new(),
242            phase_config: None,
243            exploration_config: None,
244            replay_recipe: None,
245            before_iteration_hooks: Vec::new(),
246        }
247    }
248
249    /// Add a single workload instance to the simulation.
250    ///
251    /// The instance is reused across iterations (the `run()` method is called
252    /// each iteration on the same struct). Gets `client_id = 0`, `client_count = 1`.
253    pub fn workload(mut self, w: impl Workload) -> Self {
254        self.entries.push(WorkloadEntry::Instance(
255            Some(Box::new(w)),
256            ClientId::default(),
257        ));
258        self
259    }
260
261    /// Add a single workload instance with a custom client ID strategy.
262    ///
263    /// Like [`workload()`](Self::workload), but the resolved client ID is
264    /// available via [`SimContext::client_id()`](super::context::SimContext::client_id).
265    pub fn workload_with_client_id(mut self, client_id: ClientId, w: impl Workload) -> Self {
266        self.entries
267            .push(WorkloadEntry::Instance(Some(Box::new(w)), client_id));
268        self
269    }
270
271    /// Add server processes to the simulation.
272    ///
273    /// Processes represent the **system under test** — they can be killed and
274    /// restarted (rebooted). A fresh instance is created from the factory on
275    /// every boot.
276    ///
277    /// The `count` parameter accepts either a fixed `usize` or a
278    /// `RangeInclusive<usize>` for seeded random count per iteration.
279    ///
280    /// Only one `.processes()` call is supported per builder. Subsequent calls
281    /// overwrite the previous one.
282    ///
283    /// # Examples
284    ///
285    /// ```ignore
286    /// // Fixed 3 server processes
287    /// builder.processes(3, || Box::new(MyNode::new()))
288    ///
289    /// // 3 to 7 processes, randomized per iteration
290    /// builder.processes(3..=7, || Box::new(MyNode::new()))
291    /// ```
292    pub fn processes(
293        mut self,
294        count: impl Into<ProcessCount>,
295        factory: impl Fn() -> Box<dyn Process> + 'static,
296    ) -> Self {
297        let sample = factory();
298        let name = sample.name().to_string();
299        drop(sample);
300        self.process_entry = Some(ProcessEntry {
301            count: count.into(),
302            factory: Box::new(factory),
303            tags: TagDistribution::new(),
304            name,
305        });
306        self
307    }
308
309    /// Attach tag distribution to the last `.processes()` call.
310    ///
311    /// Tags are distributed round-robin across process instances. Each tag
312    /// dimension is distributed independently.
313    ///
314    /// # Panics
315    ///
316    /// Panics if called without a preceding `.processes()` call.
317    ///
318    /// # Examples
319    ///
320    /// ```ignore
321    /// // 5 processes: dc cycles east/west/eu, rack cycles r1/r2
322    /// builder.processes(5, || Box::new(MyNode::new()))
323    ///     .tags(&[
324    ///         ("dc", &["east", "west", "eu"]),
325    ///         ("rack", &["r1", "r2"]),
326    ///     ])
327    /// ```
328    pub fn tags(mut self, dimensions: &[(&str, &[&str])]) -> Self {
329        let entry = self
330            .process_entry
331            .as_mut()
332            .expect("tags() must be called after processes()");
333        for (key, values) in dimensions {
334            entry.tags.add(key, values);
335        }
336        self
337    }
338
339    /// Set built-in attrition for automatic process reboots during chaos phase.
340    ///
341    /// Attrition randomly kills and restarts server processes. It respects
342    /// `max_dead` to limit the number of simultaneously dead processes.
343    ///
344    /// **Requires** [`.phases()`](Self::phases) — attrition injectors only run during
345    /// the chaos phase. Without a phase config, the injector will not be spawned.
346    ///
347    /// For custom fault injection, use `.fault()` with a [`FaultInjector`] instead.
348    pub fn attrition(mut self, config: Attrition) -> Self {
349        self.attrition = Some(config);
350        self
351    }
352
353    /// Add multiple workload instances from a factory.
354    ///
355    /// The factory receives an instance index (0-based) and must return a fresh
356    /// workload. Instances are created each iteration and dropped afterward.
357    /// Client IDs default to sequential starting from 0 (FDB-style).
358    ///
359    /// The workload is responsible for its own `name()` — use the index to
360    /// produce unique names when count > 1 (e.g., `format!("client-{i}")`).
361    ///
362    /// # Examples
363    ///
364    /// ```ignore
365    /// // 3 fixed replicas
366    /// builder.workloads(WorkloadCount::Fixed(3), |i| Box::new(ReplicaWorkload::new(i)))
367    ///
368    /// // 1–5 random clients
369    /// builder.workloads(WorkloadCount::Random(1..6), |i| Box::new(ClientWorkload::new(i)))
370    /// ```
371    pub fn workloads(
372        mut self,
373        count: WorkloadCount,
374        factory: impl Fn(usize) -> Box<dyn Workload> + 'static,
375    ) -> Self {
376        self.entries.push(WorkloadEntry::Factory {
377            count,
378            client_id: ClientId::default(),
379            factory: Box::new(factory),
380        });
381        self
382    }
383
384    /// Add multiple workload instances with a custom client ID strategy.
385    ///
386    /// Like [`workloads()`](Self::workloads), but client IDs are assigned
387    /// according to the given [`ClientId`] strategy instead of sequential.
388    ///
389    /// # Examples
390    ///
391    /// ```ignore
392    /// // 3 clients with random IDs in [100..200)
393    /// builder.workloads_with_client_id(
394    ///     WorkloadCount::Fixed(3),
395    ///     ClientId::RandomRange(100..200),
396    ///     |i| Box::new(ClientWorkload::new(i)),
397    /// )
398    /// ```
399    pub fn workloads_with_client_id(
400        mut self,
401        count: WorkloadCount,
402        client_id: ClientId,
403        factory: impl Fn(usize) -> Box<dyn Workload> + 'static,
404    ) -> Self {
405        self.entries.push(WorkloadEntry::Factory {
406            count,
407            client_id,
408            factory: Box::new(factory),
409        });
410        self
411    }
412
413    /// Add an invariant to be checked after every simulation event.
414    pub fn invariant(mut self, i: impl Invariant) -> Self {
415        self.invariants.push(Box::new(i));
416        self
417    }
418
419    /// Add a closure-based invariant.
420    pub fn invariant_fn(
421        mut self,
422        name: impl Into<String>,
423        f: impl Fn(&crate::chaos::StateHandle, u64) + 'static,
424    ) -> Self {
425        self.invariants.push(crate::chaos::invariant_fn(name, f));
426        self
427    }
428
429    /// Add a fault injector to run during the chaos phase.
430    pub fn fault(mut self, f: impl FaultInjector) -> Self {
431        self.fault_injectors.push(Box::new(f));
432        self
433    }
434
435    /// Set two-phase chaos/recovery configuration.
436    pub fn phases(mut self, config: PhaseConfig) -> Self {
437        self.phase_config = Some(config);
438        self
439    }
440
441    /// Set the number of iterations to run.
442    pub fn set_iterations(mut self, iterations: usize) -> Self {
443        self.iteration_control = IterationControl::FixedCount(iterations);
444        self
445    }
446
447    /// Set the iteration control strategy.
448    pub fn set_iteration_control(mut self, control: IterationControl) -> Self {
449        self.iteration_control = control;
450        self
451    }
452
453    /// Run for a specific wall-clock time duration.
454    pub fn set_time_limit(mut self, duration: Duration) -> Self {
455        self.iteration_control = IterationControl::TimeLimit(duration);
456        self
457    }
458
459    /// Run until exploration has converged: all `assert_sometimes!` assertions
460    /// have been reached and no new coverage was found on the last seed.
461    ///
462    /// Requires `.enable_exploration()` to be configured.
463    /// `max_iterations` is a safety cap to prevent infinite loops.
464    pub fn until_converged(mut self, max_iterations: usize) -> Self {
465        self.iteration_control = IterationControl::UntilConverged { max_iterations };
466        self
467    }
468
469    /// Register a callback invoked at the start of each simulation iteration.
470    ///
471    /// Use this to reset shared state (directories, membership, stores) that
472    /// lives outside the builder and is shared via `Rc` across iterations.
473    pub fn before_iteration(mut self, f: impl FnMut() + 'static) -> Self {
474        self.before_iteration_hooks.push(Box::new(f));
475        self
476    }
477
478    /// Set specific seeds for deterministic debugging and regression testing.
479    pub fn set_debug_seeds(mut self, seeds: Vec<u64>) -> Self {
480        self.seeds = seeds;
481        self
482    }
483
484    /// Enable randomized network configuration for chaos testing.
485    pub fn random_network(mut self) -> Self {
486        self.use_random_config = true;
487        self
488    }
489
490    /// Enable fork-based multiverse exploration.
491    ///
492    /// When enabled, the simulation will fork child processes at assertion
493    /// discovery points to explore alternate timelines with different seeds.
494    pub fn enable_exploration(mut self, config: moonpool_explorer::ExplorationConfig) -> Self {
495        self.exploration_config = Some(config);
496        self
497    }
498
499    /// Set a bug recipe for deterministic replay.
500    ///
501    /// The builder applies the recipe's RNG breakpoints after its own
502    /// initialization, ensuring they survive internal resets.
503    pub fn replay_recipe(mut self, recipe: super::report::BugRecipe) -> Self {
504        self.replay_recipe = Some(recipe);
505        self
506    }
507
508    /// Resolve all entries into a flat workload list for one iteration.
509    fn resolve_entries(&mut self) -> ResolvedEntries {
510        let mut workloads = Vec::new();
511        let mut return_map = Vec::new();
512        let mut client_info = Vec::new();
513
514        for (entry_idx, entry) in self.entries.iter_mut().enumerate() {
515            match entry {
516                WorkloadEntry::Instance(opt, cid) => {
517                    if let Some(w) = opt.take() {
518                        return_map.push(Some(entry_idx));
519                        client_info.push(WorkloadClientInfo {
520                            client_id: cid.resolve(0),
521                            client_count: 1,
522                        });
523                        workloads.push(w);
524                    }
525                }
526                WorkloadEntry::Factory {
527                    count,
528                    client_id,
529                    factory,
530                } => {
531                    let n = count.resolve();
532                    for i in 0..n {
533                        return_map.push(None);
534                        client_info.push(WorkloadClientInfo {
535                            client_id: client_id.resolve(i),
536                            client_count: n,
537                        });
538                        workloads.push(factory(i));
539                    }
540                }
541            }
542        }
543
544        ResolvedEntries {
545            workloads,
546            return_map,
547            client_info,
548        }
549    }
550
551    /// Return instance-based workloads to their entry slots after an iteration.
552    fn return_entries(
553        &mut self,
554        workloads: Vec<Box<dyn Workload>>,
555        return_map: Vec<Option<usize>>,
556    ) {
557        for (w, slot) in workloads.into_iter().zip(return_map) {
558            if let Some(entry_idx) = slot
559                && let WorkloadEntry::Instance(opt, _) = &mut self.entries[entry_idx]
560            {
561                *opt = Some(w);
562            }
563            // Factory-created workloads are dropped
564        }
565    }
566
567    #[instrument(skip_all)]
568    /// Run the simulation and generate a report.
569    ///
570    /// Creates a fresh tokio `LocalRuntime` per iteration for full isolation —
571    /// all tasks are killed when the runtime is dropped at iteration end.
572    pub fn run(mut self) -> SimulationReport {
573        if self.entries.is_empty() {
574            return SimulationReport {
575                iterations: 0,
576                successful_runs: 0,
577                failed_runs: 0,
578                metrics: SimulationMetrics::default(),
579                individual_metrics: Vec::new(),
580                seeds_used: Vec::new(),
581                seeds_failing: Vec::new(),
582                assertion_results: HashMap::new(),
583                assertion_violations: Vec::new(),
584                coverage_violations: Vec::new(),
585                exploration: None,
586                assertion_details: Vec::new(),
587                bucket_summaries: Vec::new(),
588                convergence_timeout: false,
589            };
590        }
591
592        // Initialize iteration state
593        let mut iteration_manager =
594            IterationManager::new(self.iteration_control.clone(), self.seeds.clone());
595        let mut metrics_collector = MetricsCollector::new();
596
597        // Accumulators for multi-seed exploration stats
598        let mut total_exploration_timelines: u64 = 0;
599        let mut total_exploration_fork_points: u64 = 0;
600        let mut total_exploration_bugs: u64 = 0;
601        let mut bug_recipes: Vec<super::report::BugRecipe> = Vec::new();
602        let mut per_seed_timelines: Vec<u64> = Vec::new();
603
604        // Convergence tracking (used only with UntilConverged)
605        let mut reached_sometimes: std::collections::HashSet<String> =
606            std::collections::HashSet::new();
607        let mut prev_coverage_bits: u32 = 0;
608        let mut converged = false;
609
610        // Initialize assertion table (unconditional — works even without exploration)
611        if let Err(e) = moonpool_explorer::init_assertions() {
612            tracing::error!("Failed to initialize assertion table: {}", e);
613        }
614
615        // Initialize exploration if configured
616        if let Some(ref config) = self.exploration_config {
617            moonpool_explorer::set_rng_hooks(crate::sim::get_rng_call_count, |seed| {
618                crate::sim::set_sim_seed(seed);
619                crate::sim::reset_rng_call_count();
620            });
621            if let Err(e) = moonpool_explorer::init(config.clone()) {
622                tracing::error!("Failed to initialize exploration: {}", e);
623            }
624        }
625
626        // Validate UntilConverged requires exploration
627        if matches!(
628            self.iteration_control,
629            IterationControl::UntilConverged { .. }
630        ) && self.exploration_config.is_none()
631        {
632            panic!(
633                "IterationControl::UntilConverged requires enable_exploration() to be configured"
634            );
635        }
636
637        while iteration_manager.should_continue() {
638            let seed = iteration_manager.next_iteration();
639            let iteration_count = iteration_manager.current_iteration();
640
641            // Preserve assertion data across iterations so the final report
642            // reflects all seeds, not just the last one.  For exploration runs,
643            // prepare_next_seed() also does a selective reset of coverage state.
644            if iteration_count > 1 {
645                if let Some(ref config) = self.exploration_config {
646                    moonpool_explorer::prepare_next_seed(config.global_energy);
647                }
648                crate::chaos::assertions::skip_next_assertion_reset();
649            }
650
651            // Run user-provided reset hooks
652            for hook in &mut self.before_iteration_hooks {
653                hook();
654            }
655
656            // Prepare clean state for this iteration
657            crate::sim::reset_sim_rng();
658            crate::sim::set_sim_seed(seed);
659            crate::chaos::reset_always_violations();
660
661            // Initialize buggify system for this iteration
662            // Use moderate probabilities: 50% activation rate, 25% firing rate
663            crate::chaos::buggify_init(0.5, 0.25);
664
665            // Snapshot coverage before this seed for convergence detection
666            if matches!(
667                self.iteration_control,
668                IterationControl::UntilConverged { .. }
669            ) {
670                prev_coverage_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
671            }
672
673            // Resolve workload entries into concrete instances for this iteration
674            // (WorkloadCount::Random and ClientId::RandomRange use the sim RNG, already seeded above)
675            let ResolvedEntries {
676                workloads,
677                return_map,
678                client_info,
679            } = self.resolve_entries();
680
681            // Compute workload name/IP pairs from resolved workloads
682            let workload_info: Vec<(String, String)> = workloads
683                .iter()
684                .enumerate()
685                .map(|(i, w)| (w.name().to_string(), format!("10.0.0.{}", i + 1)))
686                .collect();
687
688            // Resolve process configuration (if any)
689            let process_config = self.process_entry.as_ref().map(
690                |entry| -> super::orchestrator::ProcessConfig<'_> {
691                    let count = entry.count.resolve();
692                    let mut registry = crate::runner::tags::TagRegistry::new();
693                    let mut ips = Vec::with_capacity(count);
694                    let mut info = Vec::with_capacity(count);
695                    let base_name = &entry.name;
696                    for i in 0..count {
697                        let ip = format!("10.0.1.{}", i + 1);
698                        let ip_addr: std::net::IpAddr = ip.parse().expect("valid process IP");
699                        let tags = entry.tags.resolve(i);
700                        registry.register(ip_addr, tags);
701                        ips.push(ip.clone());
702                        let name = if count == 1 {
703                            base_name.clone()
704                        } else {
705                            format!("{}-{}", base_name, i)
706                        };
707                        info.push((name, ip));
708                    }
709                    super::orchestrator::ProcessConfig {
710                        factory: &*entry.factory,
711                        info,
712                        ips,
713                        tag_registry: registry,
714                    }
715                },
716            );
717
718            // Create fresh NetworkConfiguration for this iteration
719            let network_config = if self.use_random_config {
720                crate::NetworkConfiguration::random_for_seed()
721            } else {
722                crate::NetworkConfiguration::default()
723            };
724
725            // Create shared SimWorld for this iteration using fresh network config
726            let sim = crate::sim::SimWorld::new_with_network_config_and_seed(network_config, seed);
727
728            // Apply replay breakpoints after SimWorld creation (which resets RNG state)
729            if let Some(ref br) = self.replay_recipe {
730                crate::sim::set_rng_breakpoints(br.recipe.clone());
731            }
732
733            let start_time = Instant::now();
734
735            // Move fault injectors to orchestrator, get them back after
736            let mut fault_injectors = std::mem::take(&mut self.fault_injectors);
737
738            // Add built-in attrition injector if configured
739            if let Some(ref attrition) = self.attrition {
740                fault_injectors.push(Box::new(
741                    crate::runner::fault_injector::AttritionInjector::new(attrition.clone()),
742                ));
743            }
744
745            // Create a fresh tokio runtime per iteration for complete isolation.
746            // When this runtime is dropped, ALL tasks are killed — no orphan
747            // tasks leak between iterations.
748            let mut seed_bytes = [0u8; 32];
749            seed_bytes[..8].copy_from_slice(&seed.to_le_bytes());
750            let rng_seed = tokio::runtime::RngSeed::from_bytes(&seed_bytes);
751
752            let local_runtime = tokio::runtime::Builder::new_current_thread()
753                .enable_time()
754                .rng_seed(rng_seed)
755                .build_local(Default::default())
756                .expect("per-iteration runtime");
757
758            // Borrow self fields before the async block so we don't move self
759            let invariants_ref = &self.invariants;
760            let phase_ref = self.phase_config.as_ref();
761
762            // Execute workloads using orchestrator inside this iteration's runtime
763            let orchestration_result = local_runtime.block_on(async move {
764                WorkloadOrchestrator::orchestrate_workloads(
765                    workloads,
766                    fault_injectors,
767                    invariants_ref,
768                    &workload_info,
769                    &client_info,
770                    process_config,
771                    seed,
772                    sim,
773                    phase_ref,
774                    iteration_count,
775                )
776                .await
777            });
778
779            match orchestration_result {
780                Ok((returned_workloads, returned_injectors, all_results, sim_metrics)) => {
781                    // Return Instance workloads to their entry slots
782                    self.return_entries(returned_workloads, return_map);
783                    self.fault_injectors = returned_injectors;
784
785                    let wall_time = start_time.elapsed();
786                    let has_violations = crate::chaos::has_always_violations();
787
788                    metrics_collector.record_iteration(
789                        seed,
790                        wall_time,
791                        &all_results,
792                        has_violations,
793                        sim_metrics,
794                    );
795                }
796                Err((faulty_seeds_from_deadlock, failed_count)) => {
797                    // Handle deadlock case - merge with existing state and return early
798                    metrics_collector.add_faulty_seeds(faulty_seeds_from_deadlock);
799                    metrics_collector.add_failed_runs(failed_count);
800
801                    // Create early exit report
802                    let assertion_results = crate::chaos::get_assertion_results();
803                    let (assertion_violations, coverage_violations) =
804                        crate::chaos::validate_assertion_contracts();
805                    crate::chaos::buggify_reset();
806
807                    return metrics_collector.generate_report(
808                        iteration_count,
809                        iteration_manager.seeds_used().to_vec(),
810                        assertion_results,
811                        assertion_violations,
812                        coverage_violations,
813                        None,
814                        Vec::new(),
815                        Vec::new(),
816                        false,
817                    );
818                }
819            }
820
821            // Accumulate exploration stats across seeds (before reset)
822            if self.exploration_config.is_some() {
823                if let Some(stats) = moonpool_explorer::get_exploration_stats() {
824                    per_seed_timelines.push(stats.total_timelines);
825                    total_exploration_timelines += stats.total_timelines;
826                    total_exploration_fork_points += stats.fork_points;
827                    total_exploration_bugs += stats.bug_found;
828                } else {
829                    per_seed_timelines.push(0);
830                }
831                if let Some(recipe) = moonpool_explorer::get_bug_recipe() {
832                    bug_recipes.push(super::report::BugRecipe { seed, recipe });
833                }
834            }
835
836            // Accumulate which Sometimes/Reachable assertions have been reached
837            // (must read before next prepare_next_seed resets pass_count)
838            if matches!(
839                self.iteration_control,
840                IterationControl::UntilConverged { .. }
841            ) {
842                let slots = moonpool_explorer::assertion_read_all();
843                for slot in &slots {
844                    if let Some(kind) = moonpool_explorer::AssertKind::from_u8(slot.kind)
845                        && matches!(
846                            kind,
847                            moonpool_explorer::AssertKind::Sometimes
848                                | moonpool_explorer::AssertKind::Reachable
849                        )
850                    {
851                        if slot.pass_count > 0 {
852                            reached_sometimes.insert(slot.msg.clone());
853                        } else if !reached_sometimes.contains(&slot.msg) {
854                            tracing::warn!(
855                                "UNREACHED slot: kind={:?} msg={:?} pass={} fail={}",
856                                kind,
857                                slot.msg,
858                                slot.pass_count,
859                                slot.fail_count
860                            );
861                        }
862                    }
863                }
864
865                // Check convergence from seed 2 onward (need baseline for coverage delta)
866                if iteration_count >= 2 {
867                    // Count unique message strings (not raw slots) to handle
868                    // any residual duplicate slots from the fork allocation race.
869                    let all_sometimes_count = slots
870                        .iter()
871                        .filter(|s| {
872                            moonpool_explorer::AssertKind::from_u8(s.kind)
873                                .map(|k| {
874                                    matches!(
875                                        k,
876                                        moonpool_explorer::AssertKind::Sometimes
877                                            | moonpool_explorer::AssertKind::Reachable
878                                    )
879                                })
880                                .unwrap_or(false)
881                        })
882                        .map(|s| s.msg.clone())
883                        .collect::<std::collections::HashSet<_>>()
884                        .len();
885                    let all_reached =
886                        all_sometimes_count > 0 && reached_sometimes.len() >= all_sometimes_count;
887
888                    let current_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
889                    let no_new_coverage = current_bits == prev_coverage_bits;
890
891                    tracing::warn!(
892                        "convergence: seed={} reached={}/{} coverage={}->{} delta={}",
893                        iteration_count,
894                        reached_sometimes.len(),
895                        all_sometimes_count,
896                        prev_coverage_bits,
897                        current_bits,
898                        current_bits.saturating_sub(prev_coverage_bits),
899                    );
900
901                    if all_reached && no_new_coverage {
902                        tracing::info!(
903                            "Converged after {} seeds: all {} sometimes reached, no new coverage",
904                            iteration_count,
905                            all_sometimes_count
906                        );
907                        converged = true;
908                    }
909                }
910            }
911
912            // Reset buggify state after each iteration to ensure clean state
913            crate::chaos::buggify_reset();
914
915            if converged {
916                break;
917            }
918        }
919
920        // End of main iteration loop
921        //
922        // Data collection: read ALL shared memory BEFORE any cleanup.
923        // cleanup() calls cleanup_assertions() which frees the assertion
924        // table and each-bucket table.
925
926        // 1. Read exploration-specific data (freed by cleanup)
927        let exploration_report = if self.exploration_config.is_some() {
928            let final_stats = moonpool_explorer::get_exploration_stats();
929            // The per-iteration capture above should have caught all recipes.
930            // No fallback needed since we capture after every iteration.
931            let coverage_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
932
933            Some(super::report::ExplorationReport {
934                total_timelines: total_exploration_timelines,
935                fork_points: total_exploration_fork_points,
936                bugs_found: total_exploration_bugs,
937                bug_recipes,
938                energy_remaining: final_stats.as_ref().map(|s| s.global_energy).unwrap_or(0),
939                realloc_pool_remaining: final_stats
940                    .as_ref()
941                    .map(|s| s.realloc_pool_remaining)
942                    .unwrap_or(0),
943                coverage_bits,
944                coverage_total: (moonpool_explorer::coverage::COVERAGE_MAP_SIZE * 8) as u32,
945                sancov_edges_total: final_stats
946                    .as_ref()
947                    .map(|s| s.sancov_edges_total)
948                    .unwrap_or(0),
949                sancov_edges_covered: final_stats
950                    .as_ref()
951                    .map(|s| s.sancov_edges_covered)
952                    .unwrap_or(0),
953                converged,
954                per_seed_timelines,
955            })
956        } else {
957            None
958        };
959
960        // 2. Read assertion + bucket data (freed by cleanup/cleanup_assertions)
961        let assertion_results = crate::chaos::get_assertion_results();
962        let (assertion_violations, coverage_violations) =
963            crate::chaos::validate_assertion_contracts();
964        let raw_assertion_slots = moonpool_explorer::assertion_read_all();
965        let raw_each_buckets = moonpool_explorer::each_bucket_read_all();
966
967        // 3. Now safe to free all shared memory
968        if self.exploration_config.is_some() {
969            moonpool_explorer::cleanup();
970        } else {
971            moonpool_explorer::cleanup_assertions();
972        }
973
974        // 4. Build rich assertion details from raw slot snapshots
975        let assertion_details = build_assertion_details(&raw_assertion_slots);
976
977        // 5. Build bucket summaries by grouping EachBuckets by site
978        let bucket_summaries = build_bucket_summaries(&raw_each_buckets);
979
980        let iteration_count = iteration_manager.current_iteration();
981
982        // Detect convergence timeout: UntilConverged was used but we didn't converge
983        let convergence_timeout = matches!(
984            self.iteration_control,
985            IterationControl::UntilConverged { .. }
986        ) && !converged;
987
988        // Final buggify reset to ensure no impact on subsequent code
989        crate::chaos::buggify_reset();
990
991        metrics_collector.generate_report(
992            iteration_count,
993            iteration_manager.seeds_used().to_vec(),
994            assertion_results,
995            assertion_violations,
996            coverage_violations,
997            exploration_report,
998            assertion_details,
999            bucket_summaries,
1000            convergence_timeout,
1001        )
1002    }
1003}
1004
1005/// Build [`AssertionDetail`] vec from raw assertion slot snapshots.
1006fn build_assertion_details(
1007    slots: &[moonpool_explorer::AssertionSlotSnapshot],
1008) -> Vec<super::report::AssertionDetail> {
1009    use super::report::{AssertionDetail, AssertionStatus};
1010    use moonpool_explorer::AssertKind;
1011
1012    slots
1013        .iter()
1014        .filter_map(|slot| {
1015            let kind = AssertKind::from_u8(slot.kind)?;
1016            let total = slot.pass_count.saturating_add(slot.fail_count);
1017
1018            // Skip unvisited assertions
1019            if total == 0 && slot.frontier == 0 {
1020                return None;
1021            }
1022
1023            let status = match kind {
1024                AssertKind::Always
1025                | AssertKind::AlwaysOrUnreachable
1026                | AssertKind::NumericAlways => {
1027                    if slot.fail_count > 0 {
1028                        AssertionStatus::Fail
1029                    } else {
1030                        AssertionStatus::Pass
1031                    }
1032                }
1033                AssertKind::Sometimes | AssertKind::NumericSometimes => {
1034                    if slot.pass_count > 0 {
1035                        AssertionStatus::Pass
1036                    } else {
1037                        AssertionStatus::Miss
1038                    }
1039                }
1040                AssertKind::Reachable => {
1041                    if slot.pass_count > 0 {
1042                        AssertionStatus::Pass
1043                    } else {
1044                        AssertionStatus::Miss
1045                    }
1046                }
1047                AssertKind::Unreachable => {
1048                    if slot.pass_count > 0 {
1049                        AssertionStatus::Fail
1050                    } else {
1051                        AssertionStatus::Pass
1052                    }
1053                }
1054                AssertKind::BooleanSometimesAll => {
1055                    if slot.frontier > 0 {
1056                        AssertionStatus::Pass
1057                    } else {
1058                        AssertionStatus::Miss
1059                    }
1060                }
1061            };
1062
1063            Some(AssertionDetail {
1064                msg: slot.msg.clone(),
1065                kind,
1066                pass_count: slot.pass_count,
1067                fail_count: slot.fail_count,
1068                watermark: slot.watermark,
1069                frontier: slot.frontier,
1070                status,
1071            })
1072        })
1073        .collect()
1074}
1075
1076/// Build [`BucketSiteSummary`] vec by grouping [`EachBucket`]s by site message.
1077fn build_bucket_summaries(
1078    buckets: &[moonpool_explorer::EachBucket],
1079) -> Vec<super::report::BucketSiteSummary> {
1080    use super::report::BucketSiteSummary;
1081    use std::collections::HashMap;
1082
1083    let mut sites: HashMap<u32, BucketSiteSummary> = HashMap::new();
1084
1085    for bucket in buckets {
1086        let entry = sites
1087            .entry(bucket.site_hash)
1088            .or_insert_with(|| BucketSiteSummary {
1089                msg: bucket.msg_str().to_string(),
1090                buckets_discovered: 0,
1091                total_hits: 0,
1092            });
1093
1094        entry.buckets_discovered += 1;
1095        entry.total_hits += bucket.pass_count as u64;
1096    }
1097
1098    let mut summaries: Vec<_> = sites.into_values().collect();
1099    summaries.sort_by(|a, b| b.total_hits.cmp(&a.total_hits));
1100    summaries
1101}
1102
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use moonpool_core::RandomProvider;

    use crate::SimulationResult;
    use crate::runner::context::SimContext;

    /// Workload that completes successfully on every run.
    struct AlwaysOkWorkload;

    #[async_trait(?Send)]
    impl Workload for AlwaysOkWorkload {
        fn name(&self) -> &str {
            "test_workload"
        }

        async fn run(&mut self, _ctx: &SimContext) -> SimulationResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_simulation_builder_basic() {
        let report = SimulationBuilder::new()
            .workload(AlwaysOkWorkload)
            .set_iterations(3)
            .set_debug_seeds(vec![1, 2, 3])
            .run();

        // Every iteration succeeded, and the exact seeds we pinned were used.
        assert_eq!(report.iterations, 3);
        assert_eq!(report.successful_runs, 3);
        assert_eq!(report.failed_runs, 0);
        assert_eq!(report.success_rate(), 100.0);
        assert_eq!(report.seeds_used, vec![1, 2, 3]);
    }

    /// Workload whose outcome is a deterministic function of the seed:
    /// it fails whenever the first random draw is even.
    struct CoinFlipWorkload;

    #[async_trait(?Send)]
    impl Workload for CoinFlipWorkload {
        fn name(&self) -> &str {
            "failing_workload"
        }

        async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
            // Deterministic: fail if first random number is even
            let draw: u32 = ctx.random().random_range(0..100);
            match draw % 2 {
                0 => Err(crate::SimulationError::InvalidState(
                    "Test failure".to_string(),
                )),
                _ => Ok(()),
            }
        }
    }

    #[test]
    fn test_simulation_builder_with_failures() {
        let report = SimulationBuilder::new()
            .workload(CoinFlipWorkload)
            .set_iterations(10)
            .run();

        assert_eq!(report.iterations, 10);
        assert_eq!(
            report.successful_runs + report.failed_runs,
            10,
            "all iterations should be accounted for"
        );
        // With 10 seeds and a roughly even split, both outcomes should occur.
        assert!(
            report.failed_runs > 0,
            "expected at least one failure across 10 seeds"
        );
        assert!(
            report.successful_runs > 0,
            "expected at least one success across 10 seeds"
        );
    }
}