Skip to main content

moonpool_sim/runner/
builder.rs

1//! Simulation builder pattern for configuring and running experiments.
2//!
3//! This module provides the main SimulationBuilder type for setting up
4//! and executing simulation experiments.
5
6use std::collections::HashMap;
7use std::ops::{Range, RangeInclusive};
8use std::time::{Duration, Instant};
9use tracing::instrument;
10
11use crate::SimulationError;
12use crate::chaos::invariant_trait::Invariant;
13use crate::runner::fault_injector::FaultInjector;
14use crate::runner::process::{Attrition, Process};
15use crate::runner::tags::TagDistribution;
16use crate::runner::workload::Workload;
17
18use super::orchestrator::{IterationManager, MetricsCollector, WorkloadOrchestrator};
19
/// Client identity information for a single workload instance.
///
/// Resolved once per iteration from the entry's `ClientId` strategy and
/// passed to the orchestrator alongside the workload it describes.
#[derive(Debug, Clone, Copy)]
pub(crate) struct WorkloadClientInfo {
    /// The resolved client ID for this instance.
    pub(crate) client_id: usize,
    /// Total number of workload instances sharing this builder entry.
    pub(crate) client_count: usize,
}
28
/// Resolved workload entries for a single iteration.
///
/// The three vectors are parallel: index `i` in each refers to the same
/// workload instance.
struct ResolvedEntries {
    /// Concrete workload instances to run this iteration.
    workloads: Vec<Box<dyn Workload>>,
    /// `return_map[i] = Some(entry_idx)` means `workloads[i]` should be
    /// returned to `entries[entry_idx]` after the iteration.
    /// `None` marks factory-created workloads, which are dropped instead.
    return_map: Vec<Option<usize>>,
    /// Client identity info parallel to `workloads`.
    client_info: Vec<WorkloadClientInfo>,
}
38use super::report::{SimulationMetrics, SimulationReport};
39
/// Configuration for how many iterations a simulation should run.
///
/// Provides flexible control over simulation execution duration and completion criteria.
#[derive(Debug, Clone)]
pub enum IterationControl {
    /// Run a fixed number of iterations with specific seeds.
    FixedCount(usize),
    /// Run for a specific duration of wall-clock time.
    TimeLimit(Duration),
    /// Stop when all `assert_sometimes!` have been reached AND a new seed
    /// produces no new code coverage. Requires exploration to be enabled
    /// (the builder panics in `run()` otherwise).
    /// The `max_iterations` field is a safety cap.
    UntilConverged {
        /// Maximum number of seeds before stopping regardless.
        max_iterations: usize,
    },
}
57
/// How many instances of a workload to spawn per iteration.
///
/// Use `Fixed` for deterministic topologies or `Random` for chaos testing
/// with varying cluster sizes.
///
/// # Examples
///
/// ```ignore
/// // Always 3 replicas
/// WorkloadCount::Fixed(3)
///
/// // 1 to 5 replicas, randomized per iteration
/// WorkloadCount::Random(1..6)
/// ```
#[derive(Debug, Clone)]
pub enum WorkloadCount {
    /// Spawn exactly N instances every iteration.
    Fixed(usize),
    /// Spawn a random number of instances in `[start..end)` per iteration,
    /// using the simulation RNG (deterministic per seed).
    Random(Range<usize>),
}
80
81impl WorkloadCount {
82    /// Resolve the count for the current iteration.
83    /// For `Random`, uses the sim RNG which must already be seeded.
84    fn resolve(&self) -> usize {
85        match self {
86            WorkloadCount::Fixed(n) => *n,
87            WorkloadCount::Random(range) => crate::sim::sim_random_range(range.clone()),
88        }
89    }
90}
91
/// Strategy for assigning client IDs to workload instances.
///
/// Inspired by FoundationDB's `WorkloadContext.clientId`, but more
/// programmable. The resolved client ID is available via
/// [`SimContext::client_id()`](super::context::SimContext::client_id).
///
/// # Examples
///
/// ```ignore
/// // FDB-style sequential: IDs 0, 1, 2
/// ClientId::Fixed(0)
///
/// // Sequential starting from 10: IDs 10, 11, 12
/// ClientId::Fixed(10)
///
/// // Random IDs in [100..200) per instance
/// ClientId::RandomRange(100..200)
/// ```
#[derive(Debug, Clone, PartialEq)]
pub enum ClientId {
    /// Sequential IDs starting from `base`: instance 0 gets `base`,
    /// instance 1 gets `base + 1`, and so on.
    Fixed(usize),
    /// Random ID drawn from `[start..end)` per instance,
    /// using the simulation RNG (deterministic per seed).
    /// IDs are not guaranteed unique across instances.
    RandomRange(Range<usize>),
}
120
121impl Default for ClientId {
122    fn default() -> Self {
123        Self::Fixed(0)
124    }
125}
126
127impl ClientId {
128    /// Resolve a client ID for the given instance index.
129    fn resolve(&self, index: usize) -> usize {
130        match self {
131            ClientId::Fixed(base) => base + index,
132            ClientId::RandomRange(range) => crate::sim::sim_random_range(range.clone()),
133        }
134    }
135}
136
/// How many process instances to spawn per iteration.
///
/// Use `Fixed` for deterministic topologies or `Range` for chaos testing
/// with varying cluster sizes.
///
/// # Examples
///
/// ```ignore
/// // Always 3 server processes
/// ProcessCount::Fixed(3)
///
/// // 3 to 7 server processes, randomized per iteration
/// ProcessCount::Range(3..=7)
/// ```
#[derive(Debug, Clone, PartialEq)]
pub enum ProcessCount {
    /// Spawn exactly N process instances every iteration.
    Fixed(usize),
    /// Spawn a random number in `[start..=end]` per iteration,
    /// using the simulation RNG (deterministic per seed).
    Range(RangeInclusive<usize>),
}
159
160impl ProcessCount {
161    /// Resolve the count for the current iteration.
162    fn resolve(&self) -> usize {
163        match self {
164            ProcessCount::Fixed(n) => *n,
165            ProcessCount::Range(range) => {
166                let start = *range.start();
167                let end = *range.end() + 1; // RangeInclusive -> exclusive for sim_random_range
168                if start >= end {
169                    return start;
170                }
171                crate::sim::sim_random_range(start..end)
172            }
173        }
174    }
175}
176
177impl From<usize> for ProcessCount {
178    fn from(n: usize) -> Self {
179        ProcessCount::Fixed(n)
180    }
181}
182
183impl From<RangeInclusive<usize>> for ProcessCount {
184    fn from(range: RangeInclusive<usize>) -> Self {
185        ProcessCount::Range(range)
186    }
187}
188
/// Internal storage for a process entry in the builder.
pub(crate) struct ProcessEntry {
    /// How many instances to spawn per iteration (fixed or seeded-random).
    pub(crate) count: ProcessCount,
    /// Creates a fresh process instance on every boot/reboot.
    pub(crate) factory: Box<dyn Fn() -> Box<dyn Process>>,
    /// Tag dimensions distributed round-robin across instances.
    pub(crate) tags: TagDistribution,
    /// Base process name, captured from a sample instance in `processes()`.
    pub(crate) name: String,
}
196
/// Internal storage for workload entries in the builder.
enum WorkloadEntry {
    /// Single instance, reused across iterations (from `.workload()`).
    /// The `Option` is taken at iteration start and refilled afterwards
    /// by `return_entries`.
    Instance(Option<Box<dyn Workload>>, ClientId),
    /// Factory-based, fresh instances per iteration (from `.workloads()`).
    Factory {
        /// Instances to spawn each iteration.
        count: WorkloadCount,
        /// Client ID assignment strategy for spawned instances.
        client_id: ClientId,
        /// Builds instance `i` (0-based) every iteration.
        factory: Box<dyn Fn(usize) -> Box<dyn Workload>>,
    },
}
208
/// Builder pattern for configuring and running simulation experiments.
pub struct SimulationBuilder {
    /// How many iterations to run and when to stop.
    iteration_control: IterationControl,
    /// Workload entries (single instances and factories).
    entries: Vec<WorkloadEntry>,
    /// At most one `.processes()` configuration (later calls overwrite).
    process_entry: Option<ProcessEntry>,
    /// Built-in attrition config; only active with a chaos duration.
    attrition: Option<Attrition>,
    /// Explicit debug seeds; empty means seeds are generated.
    seeds: Vec<u64>,
    /// Whether to randomize the network configuration per seed.
    use_random_config: bool,
    /// Invariants checked during simulation; reset each iteration.
    invariants: Vec<Box<dyn Invariant>>,
    /// Custom fault injectors, run during the chaos phase.
    fault_injectors: Vec<Box<dyn FaultInjector>>,
    /// Length of the chaos phase; `None` disables fault injection.
    chaos_duration: Option<Duration>,
    /// Fork-based multiverse exploration config, if enabled.
    exploration_config: Option<moonpool_explorer::ExplorationConfig>,
    /// Bug recipe whose RNG breakpoints are applied for replay.
    replay_recipe: Option<super::report::BugRecipe>,
    /// User hooks invoked at the start of each iteration.
    before_iteration_hooks: Vec<Box<dyn FnMut()>>,
    /// Wall-clock threshold for warning about slow seeds.
    seed_warning_timeout: Option<Duration>,
}
225
impl Default for SimulationBuilder {
    /// Equivalent to [`SimulationBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
231
232impl SimulationBuilder {
233    /// Create a new empty simulation builder.
234    pub fn new() -> Self {
235        Self {
236            iteration_control: IterationControl::FixedCount(1),
237            entries: Vec::new(),
238            process_entry: None,
239            attrition: None,
240            seeds: Vec::new(),
241            use_random_config: false,
242            invariants: Vec::new(),
243            fault_injectors: Vec::new(),
244            chaos_duration: None,
245            exploration_config: None,
246            replay_recipe: None,
247            before_iteration_hooks: Vec::new(),
248            seed_warning_timeout: None,
249        }
250    }
251
252    /// Add a single workload instance to the simulation.
253    ///
254    /// The instance is reused across iterations (the `run()` method is called
255    /// each iteration on the same struct). Gets `client_id = 0`, `client_count = 1`.
256    pub fn workload(mut self, w: impl Workload) -> Self {
257        self.entries.push(WorkloadEntry::Instance(
258            Some(Box::new(w)),
259            ClientId::default(),
260        ));
261        self
262    }
263
264    /// Add a single workload instance with a custom client ID strategy.
265    ///
266    /// Like [`workload()`](Self::workload), but the resolved client ID is
267    /// available via [`SimContext::client_id()`](super::context::SimContext::client_id).
268    pub fn workload_with_client_id(mut self, client_id: ClientId, w: impl Workload) -> Self {
269        self.entries
270            .push(WorkloadEntry::Instance(Some(Box::new(w)), client_id));
271        self
272    }
273
274    /// Add server processes to the simulation.
275    ///
276    /// Processes represent the **system under test** — they can be killed and
277    /// restarted (rebooted). A fresh instance is created from the factory on
278    /// every boot.
279    ///
280    /// The `count` parameter accepts either a fixed `usize` or a
281    /// `RangeInclusive<usize>` for seeded random count per iteration.
282    ///
283    /// Only one `.processes()` call is supported per builder. Subsequent calls
284    /// overwrite the previous one.
285    ///
286    /// # Examples
287    ///
288    /// ```ignore
289    /// // Fixed 3 server processes
290    /// builder.processes(3, || Box::new(MyNode::new()))
291    ///
292    /// // 3 to 7 processes, randomized per iteration
293    /// builder.processes(3..=7, || Box::new(MyNode::new()))
294    /// ```
295    pub fn processes(
296        mut self,
297        count: impl Into<ProcessCount>,
298        factory: impl Fn() -> Box<dyn Process> + 'static,
299    ) -> Self {
300        let sample = factory();
301        let name = sample.name().to_string();
302        drop(sample);
303        self.process_entry = Some(ProcessEntry {
304            count: count.into(),
305            factory: Box::new(factory),
306            tags: TagDistribution::new(),
307            name,
308        });
309        self
310    }
311
312    /// Attach tag distribution to the last `.processes()` call.
313    ///
314    /// Tags are distributed round-robin across process instances. Each tag
315    /// dimension is distributed independently.
316    ///
317    /// # Errors
318    ///
319    /// Returns `SimulationError::InvalidState` if called without a preceding
320    /// `.processes()` call.
321    ///
322    /// # Examples
323    ///
324    /// ```ignore
325    /// // 5 processes: dc cycles east/west/eu, rack cycles r1/r2
326    /// builder.processes(5, || Box::new(MyNode::new()))
327    ///     .tags(&[
328    ///         ("dc", &["east", "west", "eu"]),
329    ///         ("rack", &["r1", "r2"]),
330    ///     ])?
331    /// ```
332    pub fn tags(mut self, dimensions: &[(&str, &[&str])]) -> Result<Self, SimulationError> {
333        let entry = self.process_entry.as_mut().ok_or_else(|| {
334            SimulationError::InvalidState("tags() must be called after processes()".into())
335        })?;
336        for (key, values) in dimensions {
337            entry.tags.add(key, values);
338        }
339        Ok(self)
340    }
341
    /// Set built-in attrition for automatic process reboots during chaos phase.
    ///
    /// Attrition randomly kills and restarts server processes. It respects
    /// `max_dead` to limit the number of simultaneously dead processes.
    ///
    /// **Requires** [`.chaos_duration()`](Self::chaos_duration) — attrition injectors
    /// only run during the chaos phase. Without a chaos duration, the injector
    /// will not be spawned.
    ///
    /// Later calls overwrite the previous config. For custom fault injection,
    /// use `.fault()` with a [`FaultInjector`] instead.
    pub fn attrition(mut self, config: Attrition) -> Self {
        self.attrition = Some(config);
        self
    }
356
357    /// Add multiple workload instances from a factory.
358    ///
359    /// The factory receives an instance index (0-based) and must return a fresh
360    /// workload. Instances are created each iteration and dropped afterward.
361    /// Client IDs default to sequential starting from 0 (FDB-style).
362    ///
363    /// The workload is responsible for its own `name()` — use the index to
364    /// produce unique names when count > 1 (e.g., `format!("client-{i}")`).
365    ///
366    /// # Examples
367    ///
368    /// ```ignore
369    /// // 3 fixed replicas
370    /// builder.workloads(WorkloadCount::Fixed(3), |i| Box::new(ReplicaWorkload::new(i)))
371    ///
372    /// // 1–5 random clients
373    /// builder.workloads(WorkloadCount::Random(1..6), |i| Box::new(ClientWorkload::new(i)))
374    /// ```
375    pub fn workloads(
376        mut self,
377        count: WorkloadCount,
378        factory: impl Fn(usize) -> Box<dyn Workload> + 'static,
379    ) -> Self {
380        self.entries.push(WorkloadEntry::Factory {
381            count,
382            client_id: ClientId::default(),
383            factory: Box::new(factory),
384        });
385        self
386    }
387
388    /// Add multiple workload instances with a custom client ID strategy.
389    ///
390    /// Like [`workloads()`](Self::workloads), but client IDs are assigned
391    /// according to the given [`ClientId`] strategy instead of sequential.
392    ///
393    /// # Examples
394    ///
395    /// ```ignore
396    /// // 3 clients with random IDs in [100..200)
397    /// builder.workloads_with_client_id(
398    ///     WorkloadCount::Fixed(3),
399    ///     ClientId::RandomRange(100..200),
400    ///     |i| Box::new(ClientWorkload::new(i)),
401    /// )
402    /// ```
403    pub fn workloads_with_client_id(
404        mut self,
405        count: WorkloadCount,
406        client_id: ClientId,
407        factory: impl Fn(usize) -> Box<dyn Workload> + 'static,
408    ) -> Self {
409        self.entries.push(WorkloadEntry::Factory {
410            count,
411            client_id,
412            factory: Box::new(factory),
413        });
414        self
415    }
416
417    /// Add an invariant to be checked after every simulation event.
418    pub fn invariant(mut self, i: impl Invariant) -> Self {
419        self.invariants.push(Box::new(i));
420        self
421    }
422
423    /// Add a closure-based invariant.
424    pub fn invariant_fn(
425        mut self,
426        name: impl Into<String>,
427        f: impl Fn(&crate::chaos::StateHandle, u64) + 'static,
428    ) -> Self {
429        self.invariants.push(crate::chaos::invariant_fn(name, f));
430        self
431    }
432
433    /// Add a fault injector to run during the chaos phase.
434    pub fn fault(mut self, f: impl FaultInjector) -> Self {
435        self.fault_injectors.push(Box::new(f));
436        self
437    }
438
    /// Set the chaos phase duration.
    ///
    /// When set, fault injectors run concurrently with workloads for this
    /// duration. After it elapses, faults stop and the system continues
    /// until all workloads complete. A settle phase then drains remaining
    /// events before checks run.
    ///
    /// Without a chaos duration, fault injectors (including attrition)
    /// are not active.
    pub fn chaos_duration(mut self, duration: Duration) -> Self {
        self.chaos_duration = Some(duration);
        self
    }
449
450    /// Set the number of iterations to run.
451    pub fn set_iterations(mut self, iterations: usize) -> Self {
452        self.iteration_control = IterationControl::FixedCount(iterations);
453        self
454    }
455
    /// Set the wall-clock time threshold for warning about slow seeds.
    ///
    /// When a seed takes longer than this duration, a `tracing::warn!` is emitted.
    /// If not set, no slow-seed warnings are produced.
    pub fn seed_warning_timeout(mut self, timeout: Duration) -> Self {
        self.seed_warning_timeout = Some(timeout);
        self
    }
464
    /// Set the iteration control strategy.
    ///
    /// This is the general form; `set_iterations`, `set_time_limit`, and
    /// `until_converged` cover the common cases.
    pub fn set_iteration_control(mut self, control: IterationControl) -> Self {
        self.iteration_control = control;
        self
    }
470
471    /// Run for a specific wall-clock time duration.
472    pub fn set_time_limit(mut self, duration: Duration) -> Self {
473        self.iteration_control = IterationControl::TimeLimit(duration);
474        self
475    }
476
477    /// Run until exploration has converged: all `assert_sometimes!` assertions
478    /// have been reached and no new coverage was found on the last seed.
479    ///
480    /// Requires `.enable_exploration()` to be configured.
481    /// `max_iterations` is a safety cap to prevent infinite loops.
482    pub fn until_converged(mut self, max_iterations: usize) -> Self {
483        self.iteration_control = IterationControl::UntilConverged { max_iterations };
484        self
485    }
486
    /// Register a callback invoked at the start of each simulation iteration.
    ///
    /// Use this to reset shared state (directories, membership, stores) that
    /// lives outside the builder and is shared via `Rc` across iterations.
    /// Hooks run in registration order, before invariants are reset and the
    /// RNG is re-seeded.
    pub fn before_iteration(mut self, f: impl FnMut() + 'static) -> Self {
        self.before_iteration_hooks.push(Box::new(f));
        self
    }
495
    /// Set specific seeds for deterministic debugging and regression testing.
    ///
    /// Replaces any previously set seeds.
    pub fn set_debug_seeds(mut self, seeds: Vec<u64>) -> Self {
        self.seeds = seeds;
        self
    }
501
    /// Enable randomized network configuration for chaos testing.
    ///
    /// Each iteration then uses `NetworkConfiguration::random_for_seed()`
    /// instead of the default configuration.
    pub fn random_network(mut self) -> Self {
        self.use_random_config = true;
        self
    }
507
    /// Enable fork-based multiverse exploration.
    ///
    /// When enabled, the simulation will fork child processes at assertion
    /// discovery points to explore alternate timelines with different seeds.
    /// Required for [`IterationControl::UntilConverged`].
    pub fn enable_exploration(mut self, config: moonpool_explorer::ExplorationConfig) -> Self {
        self.exploration_config = Some(config);
        self
    }
516
    /// Set a bug recipe for deterministic replay.
    ///
    /// The builder applies the recipe's RNG breakpoints after its own
    /// initialization, ensuring they survive internal resets.
    pub fn replay_recipe(mut self, recipe: super::report::BugRecipe) -> Self {
        self.replay_recipe = Some(recipe);
        self
    }
525
526    /// Resolve all entries into a flat workload list for one iteration.
527    fn resolve_entries(&mut self) -> ResolvedEntries {
528        let mut workloads = Vec::new();
529        let mut return_map = Vec::new();
530        let mut client_info = Vec::new();
531
532        for (entry_idx, entry) in self.entries.iter_mut().enumerate() {
533            match entry {
534                WorkloadEntry::Instance(opt, cid) => {
535                    if let Some(w) = opt.take() {
536                        return_map.push(Some(entry_idx));
537                        client_info.push(WorkloadClientInfo {
538                            client_id: cid.resolve(0),
539                            client_count: 1,
540                        });
541                        workloads.push(w);
542                    }
543                }
544                WorkloadEntry::Factory {
545                    count,
546                    client_id,
547                    factory,
548                } => {
549                    let n = count.resolve();
550                    for i in 0..n {
551                        return_map.push(None);
552                        client_info.push(WorkloadClientInfo {
553                            client_id: client_id.resolve(i),
554                            client_count: n,
555                        });
556                        workloads.push(factory(i));
557                    }
558                }
559            }
560        }
561
562        ResolvedEntries {
563            workloads,
564            return_map,
565            client_info,
566        }
567    }
568
569    /// Return instance-based workloads to their entry slots after an iteration.
570    fn return_entries(
571        &mut self,
572        workloads: Vec<Box<dyn Workload>>,
573        return_map: Vec<Option<usize>>,
574    ) {
575        for (w, slot) in workloads.into_iter().zip(return_map) {
576            if let Some(entry_idx) = slot
577                && let WorkloadEntry::Instance(opt, _) = &mut self.entries[entry_idx]
578            {
579                *opt = Some(w);
580            }
581            // Factory-created workloads are dropped
582        }
583    }
584
585    #[instrument(skip_all)]
586    /// Run the simulation and generate a report.
587    ///
588    /// Creates a fresh tokio `LocalRuntime` per iteration for full isolation —
589    /// all tasks are killed when the runtime is dropped at iteration end.
590    pub fn run(mut self) -> SimulationReport {
591        if self.entries.is_empty() {
592            return SimulationReport {
593                iterations: 0,
594                successful_runs: 0,
595                failed_runs: 0,
596                metrics: SimulationMetrics::default(),
597                individual_metrics: Vec::new(),
598                seeds_used: Vec::new(),
599                seeds_failing: Vec::new(),
600                assertion_results: HashMap::new(),
601                assertion_violations: Vec::new(),
602                coverage_violations: Vec::new(),
603                exploration: None,
604                assertion_details: Vec::new(),
605                bucket_summaries: Vec::new(),
606                convergence_timeout: false,
607            };
608        }
609
610        // Initialize iteration state
611        let mut iteration_manager =
612            IterationManager::new(self.iteration_control.clone(), self.seeds.clone());
613        let mut metrics_collector = MetricsCollector::new();
614
615        // Progress reporting: compute milestone interval (every ~10%)
616        let progress_milestone = iteration_manager
617            .max_iterations()
618            .map(|max| std::cmp::max(max / 10, 1));
619
620        // Accumulators for multi-seed exploration stats
621        let mut total_exploration_timelines: u64 = 0;
622        let mut total_exploration_fork_points: u64 = 0;
623        let mut total_exploration_bugs: u64 = 0;
624        let mut bug_recipes: Vec<super::report::BugRecipe> = Vec::new();
625        let mut per_seed_timelines: Vec<u64> = Vec::new();
626
627        // Convergence tracking (used only with UntilConverged)
628        let mut reached_sometimes: std::collections::HashSet<String> =
629            std::collections::HashSet::new();
630        let mut prev_coverage_bits: u32 = 0;
631        let mut converged = false;
632
633        // Initialize assertion table (unconditional — works even without exploration)
634        if let Err(e) = moonpool_explorer::init_assertions() {
635            tracing::error!("Failed to initialize assertion table: {}", e);
636        }
637
638        // Initialize exploration if configured
639        if let Some(ref config) = self.exploration_config {
640            moonpool_explorer::set_rng_hooks(crate::sim::rng_call_count, |seed| {
641                crate::sim::set_sim_seed(seed);
642                crate::sim::reset_rng_call_count();
643            });
644            if let Err(e) = moonpool_explorer::init(config.clone()) {
645                tracing::error!("Failed to initialize exploration: {}", e);
646            }
647        }
648
649        // Validate UntilConverged requires exploration
650        if matches!(
651            self.iteration_control,
652            IterationControl::UntilConverged { .. }
653        ) && self.exploration_config.is_none()
654        {
655            panic!(
656                "IterationControl::UntilConverged requires enable_exploration() to be configured"
657            );
658        }
659
660        while iteration_manager.should_continue() {
661            let seed = iteration_manager.next_iteration();
662            let iteration_count = iteration_manager.current_iteration();
663
664            // Preserve assertion data across iterations so the final report
665            // reflects all seeds, not just the last one.  For exploration runs,
666            // prepare_next_seed() also does a selective reset of coverage state.
667            if iteration_count > 1 {
668                if let Some(ref config) = self.exploration_config {
669                    moonpool_explorer::prepare_next_seed(config.global_energy);
670                }
671                crate::chaos::assertions::skip_next_assertion_reset();
672            }
673
674            // Run user-provided reset hooks
675            for hook in &mut self.before_iteration_hooks {
676                hook();
677            }
678
679            // Reset invariants for new seed (StateHandle is recreated each iteration,
680            // so invariant cursors and tracking state must be cleared)
681            for invariant in &mut self.invariants {
682                invariant.reset();
683            }
684
685            // Prepare clean state for this iteration
686            crate::sim::reset_sim_rng();
687            crate::sim::set_sim_seed(seed);
688            crate::chaos::reset_always_violations();
689
690            // Initialize buggify system for this iteration
691            // Use moderate probabilities: 50% activation rate, 25% firing rate
692            crate::chaos::buggify_init(0.5, 0.25);
693
694            // Snapshot coverage before this seed for convergence detection
695            if matches!(
696                self.iteration_control,
697                IterationControl::UntilConverged { .. }
698            ) {
699                prev_coverage_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
700            }
701
702            // Resolve workload entries into concrete instances for this iteration
703            // (WorkloadCount::Random and ClientId::RandomRange use the sim RNG, already seeded above)
704            let ResolvedEntries {
705                workloads,
706                return_map,
707                client_info,
708            } = self.resolve_entries();
709
710            // Compute workload name/IP pairs from resolved workloads
711            let workload_info: Vec<(String, String)> = workloads
712                .iter()
713                .enumerate()
714                .map(|(i, w)| (w.name().to_string(), format!("10.0.0.{}", i + 1)))
715                .collect();
716
717            // Resolve process configuration (if any)
718            let process_config = self.process_entry.as_ref().map(
719                |entry| -> super::orchestrator::ProcessConfig<'_> {
720                    let count = entry.count.resolve();
721                    let mut registry = crate::runner::tags::TagRegistry::new();
722                    let mut ips = Vec::with_capacity(count);
723                    let mut info = Vec::with_capacity(count);
724                    let base_name = &entry.name;
725                    for i in 0..count {
726                        let ip = format!("10.0.1.{}", i + 1);
727                        let ip_addr: std::net::IpAddr = ip.parse().expect("valid process IP");
728                        let tags = entry.tags.resolve(i);
729                        registry.register(ip_addr, tags);
730                        ips.push(ip.clone());
731                        let name = if count == 1 {
732                            base_name.clone()
733                        } else {
734                            format!("{}-{}", base_name, i)
735                        };
736                        info.push((name, ip));
737                    }
738                    super::orchestrator::ProcessConfig {
739                        factory: &*entry.factory,
740                        info,
741                        ips,
742                        tag_registry: registry,
743                    }
744                },
745            );
746
747            // Create fresh NetworkConfiguration for this iteration
748            let network_config = if self.use_random_config {
749                crate::NetworkConfiguration::random_for_seed()
750            } else {
751                crate::NetworkConfiguration::default()
752            };
753
754            // Create shared SimWorld for this iteration using fresh network config
755            let sim = crate::sim::SimWorld::new_with_network_config_and_seed(network_config, seed);
756
757            // Apply replay breakpoints after SimWorld creation (which resets RNG state)
758            if let Some(ref br) = self.replay_recipe {
759                crate::sim::set_rng_breakpoints(br.recipe.clone());
760            }
761
762            let start_time = Instant::now();
763
764            // Move fault injectors to orchestrator, get them back after
765            let mut fault_injectors = std::mem::take(&mut self.fault_injectors);
766
767            // Add built-in attrition injector if configured
768            if let Some(ref attrition) = self.attrition {
769                fault_injectors.push(Box::new(
770                    crate::runner::fault_injector::AttritionInjector::new(attrition.clone()),
771                ));
772            }
773
774            // Create a fresh tokio runtime per iteration for complete isolation.
775            // When this runtime is dropped, ALL tasks are killed — no orphan
776            // tasks leak between iterations.
777            let mut seed_bytes = [0u8; 32];
778            seed_bytes[..8].copy_from_slice(&seed.to_le_bytes());
779            let rng_seed = tokio::runtime::RngSeed::from_bytes(&seed_bytes);
780
781            let local_runtime = tokio::runtime::Builder::new_current_thread()
782                .enable_time()
783                .rng_seed(rng_seed)
784                .build_local(Default::default())
785                .expect("per-iteration runtime");
786
787            // Borrow self fields before the async block so we don't move self
788            let invariants_ref = &self.invariants;
789            let chaos_duration = self.chaos_duration;
790
791            // Execute workloads using orchestrator inside this iteration's runtime
792            let orchestration_result = local_runtime.block_on(async move {
793                WorkloadOrchestrator::orchestrate_workloads(
794                    workloads,
795                    fault_injectors,
796                    invariants_ref,
797                    &workload_info,
798                    &client_info,
799                    process_config,
800                    seed,
801                    sim,
802                    chaos_duration,
803                    iteration_count,
804                )
805                .await
806            });
807
808            match orchestration_result {
809                Ok((returned_workloads, returned_injectors, all_results, sim_metrics)) => {
810                    // Return Instance workloads to their entry slots
811                    self.return_entries(returned_workloads, return_map);
812                    self.fault_injectors = returned_injectors;
813
814                    let wall_time = start_time.elapsed();
815                    let has_violations = crate::chaos::has_always_violations();
816
817                    metrics_collector.record_iteration(
818                        seed,
819                        wall_time,
820                        &all_results,
821                        has_violations,
822                        sim_metrics,
823                    );
824
825                    // Progress: warn on slow seeds
826                    if let Some(threshold) = self.seed_warning_timeout
827                        && wall_time > threshold
828                    {
829                        tracing::warn!(
830                            seed,
831                            wall_time_ms = wall_time.as_millis() as u64,
832                            threshold_ms = threshold.as_millis() as u64,
833                            "seed took {:.2}s (threshold: {}s)",
834                            wall_time.as_secs_f64(),
835                            threshold.as_secs(),
836                        );
837                    }
838
839                    // Progress: milestone reporting
840                    if let Some(interval) = progress_milestone
841                        && iteration_count.is_multiple_of(interval)
842                    {
843                        let max = iteration_manager
844                            .max_iterations()
845                            .unwrap_or(iteration_count);
846                        let pct = (iteration_count as f64 / max as f64) * 100.0;
847                        tracing::info!(
848                            iteration = iteration_count,
849                            total = max,
850                            "[{}/{}] {:.0}% complete",
851                            iteration_count,
852                            max,
853                            pct,
854                        );
855                    }
856                }
857                Err((faulty_seeds_from_deadlock, failed_count)) => {
858                    // Handle deadlock case - merge with existing state and return early
859                    metrics_collector.add_faulty_seeds(faulty_seeds_from_deadlock);
860                    metrics_collector.add_failed_runs(failed_count);
861
862                    // Create early exit report
863                    let assertion_results = crate::chaos::assertion_results();
864                    let (assertion_violations, coverage_violations) =
865                        crate::chaos::validate_assertion_contracts();
866                    crate::chaos::buggify_reset();
867
868                    return metrics_collector.generate_report(
869                        iteration_count,
870                        iteration_manager.seeds_used().to_vec(),
871                        assertion_results,
872                        assertion_violations,
873                        coverage_violations,
874                        None,
875                        Vec::new(),
876                        Vec::new(),
877                        false,
878                    );
879                }
880            }
881
882            // Accumulate exploration stats across seeds (before reset)
883            if self.exploration_config.is_some() {
884                if let Some(stats) = moonpool_explorer::exploration_stats() {
885                    per_seed_timelines.push(stats.total_timelines);
886                    total_exploration_timelines += stats.total_timelines;
887                    total_exploration_fork_points += stats.fork_points;
888                    total_exploration_bugs += stats.bug_found;
889                } else {
890                    per_seed_timelines.push(0);
891                }
892                if let Some(recipe) = moonpool_explorer::bug_recipe() {
893                    bug_recipes.push(super::report::BugRecipe { seed, recipe });
894                }
895            }
896
897            // Accumulate which Sometimes/Reachable assertions have been reached
898            // (must read before next prepare_next_seed resets pass_count)
899            if matches!(
900                self.iteration_control,
901                IterationControl::UntilConverged { .. }
902            ) {
903                let slots = moonpool_explorer::assertion_read_all();
904                for slot in &slots {
905                    if let Some(kind) = moonpool_explorer::AssertKind::from_u8(slot.kind)
906                        && matches!(
907                            kind,
908                            moonpool_explorer::AssertKind::Sometimes
909                                | moonpool_explorer::AssertKind::Reachable
910                        )
911                    {
912                        if slot.pass_count > 0 {
913                            reached_sometimes.insert(slot.msg.clone());
914                        } else if !reached_sometimes.contains(&slot.msg) {
915                            tracing::warn!(
916                                "UNREACHED slot: kind={:?} msg={:?} pass={} fail={}",
917                                kind,
918                                slot.msg,
919                                slot.pass_count,
920                                slot.fail_count
921                            );
922                        }
923                    }
924                }
925
926                // Check convergence from seed 2 onward (need baseline for coverage delta)
927                if iteration_count >= 2 {
928                    // Count unique message strings (not raw slots) to handle
929                    // any residual duplicate slots from the fork allocation race.
930                    let all_sometimes_count = slots
931                        .iter()
932                        .filter(|s| {
933                            moonpool_explorer::AssertKind::from_u8(s.kind)
934                                .map(|k| {
935                                    matches!(
936                                        k,
937                                        moonpool_explorer::AssertKind::Sometimes
938                                            | moonpool_explorer::AssertKind::Reachable
939                                    )
940                                })
941                                .unwrap_or(false)
942                        })
943                        .map(|s| s.msg.clone())
944                        .collect::<std::collections::HashSet<_>>()
945                        .len();
946                    let all_reached =
947                        all_sometimes_count > 0 && reached_sometimes.len() >= all_sometimes_count;
948
949                    let current_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
950                    let no_new_coverage = current_bits == prev_coverage_bits;
951
952                    tracing::warn!(
953                        "convergence: seed={} reached={}/{} coverage={}->{} delta={}",
954                        iteration_count,
955                        reached_sometimes.len(),
956                        all_sometimes_count,
957                        prev_coverage_bits,
958                        current_bits,
959                        current_bits.saturating_sub(prev_coverage_bits),
960                    );
961
962                    if all_reached && no_new_coverage {
963                        tracing::info!(
964                            "Converged after {} seeds: all {} sometimes reached, no new coverage",
965                            iteration_count,
966                            all_sometimes_count
967                        );
968                        converged = true;
969                    }
970                }
971            }
972
973            // Reset buggify state after each iteration to ensure clean state
974            crate::chaos::buggify_reset();
975
976            if converged {
977                break;
978            }
979        }
980
981        // End of main iteration loop
982        //
983        // Data collection: read ALL shared memory BEFORE any cleanup.
984        // cleanup() calls cleanup_assertions() which frees the assertion
985        // table and each-bucket table.
986
987        // 1. Read exploration-specific data (freed by cleanup)
988        let exploration_report = if self.exploration_config.is_some() {
989            let final_stats = moonpool_explorer::exploration_stats();
990            // The per-iteration capture above should have caught all recipes.
991            // No fallback needed since we capture after every iteration.
992            let coverage_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
993
994            Some(super::report::ExplorationReport {
995                total_timelines: total_exploration_timelines,
996                fork_points: total_exploration_fork_points,
997                bugs_found: total_exploration_bugs,
998                bug_recipes,
999                energy_remaining: final_stats.as_ref().map(|s| s.global_energy).unwrap_or(0),
1000                realloc_pool_remaining: final_stats
1001                    .as_ref()
1002                    .map(|s| s.realloc_pool_remaining)
1003                    .unwrap_or(0),
1004                coverage_bits,
1005                coverage_total: (moonpool_explorer::coverage::COVERAGE_MAP_SIZE * 8) as u32,
1006                sancov_edges_total: final_stats
1007                    .as_ref()
1008                    .map(|s| s.sancov_edges_total)
1009                    .unwrap_or(0),
1010                sancov_edges_covered: final_stats
1011                    .as_ref()
1012                    .map(|s| s.sancov_edges_covered)
1013                    .unwrap_or(0),
1014                converged,
1015                per_seed_timelines,
1016            })
1017        } else {
1018            None
1019        };
1020
1021        // 2. Read assertion + bucket data (freed by cleanup/cleanup_assertions)
1022        let assertion_results = crate::chaos::assertion_results();
1023        let (assertion_violations, coverage_violations) =
1024            crate::chaos::validate_assertion_contracts();
1025        let raw_assertion_slots = moonpool_explorer::assertion_read_all();
1026        let raw_each_buckets = moonpool_explorer::each_bucket_read_all();
1027
1028        // 3. Now safe to free all shared memory
1029        if self.exploration_config.is_some() {
1030            moonpool_explorer::cleanup();
1031        } else {
1032            moonpool_explorer::cleanup_assertions();
1033        }
1034
1035        // 4. Build rich assertion details from raw slot snapshots
1036        let assertion_details = build_assertion_details(&raw_assertion_slots);
1037
1038        // 5. Build bucket summaries by grouping EachBuckets by site
1039        let bucket_summaries = build_bucket_summaries(&raw_each_buckets);
1040
1041        let iteration_count = iteration_manager.current_iteration();
1042
1043        // Detect convergence timeout: UntilConverged was used but we didn't converge
1044        let convergence_timeout = matches!(
1045            self.iteration_control,
1046            IterationControl::UntilConverged { .. }
1047        ) && !converged;
1048
1049        // Final buggify reset to ensure no impact on subsequent code
1050        crate::chaos::buggify_reset();
1051
1052        metrics_collector.generate_report(
1053            iteration_count,
1054            iteration_manager.seeds_used().to_vec(),
1055            assertion_results,
1056            assertion_violations,
1057            coverage_violations,
1058            exploration_report,
1059            assertion_details,
1060            bucket_summaries,
1061            convergence_timeout,
1062        )
1063    }
1064}
1065
1066/// Build [`AssertionDetail`] vec from raw assertion slot snapshots.
1067fn build_assertion_details(
1068    slots: &[moonpool_explorer::AssertionSlotSnapshot],
1069) -> Vec<super::report::AssertionDetail> {
1070    use super::report::{AssertionDetail, AssertionStatus};
1071    use moonpool_explorer::AssertKind;
1072
1073    slots
1074        .iter()
1075        .filter_map(|slot| {
1076            let kind = AssertKind::from_u8(slot.kind)?;
1077            let total = slot.pass_count.saturating_add(slot.fail_count);
1078
1079            // Skip unvisited assertions
1080            if total == 0 && slot.frontier == 0 {
1081                return None;
1082            }
1083
1084            let status = match kind {
1085                AssertKind::Always
1086                | AssertKind::AlwaysOrUnreachable
1087                | AssertKind::NumericAlways => {
1088                    if slot.fail_count > 0 {
1089                        AssertionStatus::Fail
1090                    } else {
1091                        AssertionStatus::Pass
1092                    }
1093                }
1094                AssertKind::Sometimes | AssertKind::NumericSometimes => {
1095                    if slot.pass_count > 0 {
1096                        AssertionStatus::Pass
1097                    } else {
1098                        AssertionStatus::Miss
1099                    }
1100                }
1101                AssertKind::Reachable => {
1102                    if slot.pass_count > 0 {
1103                        AssertionStatus::Pass
1104                    } else {
1105                        AssertionStatus::Miss
1106                    }
1107                }
1108                AssertKind::Unreachable => {
1109                    if slot.pass_count > 0 {
1110                        AssertionStatus::Fail
1111                    } else {
1112                        AssertionStatus::Pass
1113                    }
1114                }
1115                AssertKind::BooleanSometimesAll => {
1116                    if slot.frontier > 0 {
1117                        AssertionStatus::Pass
1118                    } else {
1119                        AssertionStatus::Miss
1120                    }
1121                }
1122            };
1123
1124            Some(AssertionDetail {
1125                msg: slot.msg.clone(),
1126                kind,
1127                pass_count: slot.pass_count,
1128                fail_count: slot.fail_count,
1129                watermark: slot.watermark,
1130                frontier: slot.frontier,
1131                status,
1132            })
1133        })
1134        .collect()
1135}
1136
1137/// Build [`BucketSiteSummary`] vec by grouping [`EachBucket`]s by site message.
1138fn build_bucket_summaries(
1139    buckets: &[moonpool_explorer::EachBucket],
1140) -> Vec<super::report::BucketSiteSummary> {
1141    use super::report::BucketSiteSummary;
1142    use std::collections::HashMap;
1143
1144    let mut sites: HashMap<u32, BucketSiteSummary> = HashMap::new();
1145
1146    for bucket in buckets {
1147        let entry = sites
1148            .entry(bucket.site_hash)
1149            .or_insert_with(|| BucketSiteSummary {
1150                msg: bucket.msg_str().to_string(),
1151                buckets_discovered: 0,
1152                total_hits: 0,
1153            });
1154
1155        entry.buckets_discovered += 1;
1156        entry.total_hits += bucket.pass_count as u64;
1157    }
1158
1159    let mut summaries: Vec<_> = sites.into_values().collect();
1160    summaries.sort_by(|a, b| b.total_hits.cmp(&a.total_hits));
1161    summaries
1162}
1163
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use moonpool_core::RandomProvider;

    use crate::SimulationResult;
    use crate::runner::context::SimContext;

    /// Trivial workload that always succeeds immediately.
    struct BasicWorkload;

    #[async_trait(?Send)]
    impl Workload for BasicWorkload {
        fn name(&self) -> &str {
            "test_workload"
        }

        async fn run(&mut self, _ctx: &SimContext) -> SimulationResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_simulation_builder_basic() {
        let report = SimulationBuilder::new()
            .workload(BasicWorkload)
            .set_iterations(3)
            .set_debug_seeds(vec![1, 2, 3])
            .run();

        assert_eq!(report.seeds_used, vec![1, 2, 3]);
        assert_eq!(report.iterations, 3);
        assert_eq!(report.failed_runs, 0);
        assert_eq!(report.successful_runs, 3);
        assert_eq!(report.success_rate(), 100.0);
    }

    /// Workload whose outcome depends deterministically on the seeded RNG:
    /// it fails whenever its first random draw is even.
    struct FailingWorkload;

    #[async_trait(?Send)]
    impl Workload for FailingWorkload {
        fn name(&self) -> &str {
            "failing_workload"
        }

        async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
            let roll: u32 = ctx.random().random_range(0..100);
            if roll.is_multiple_of(2) {
                Err(crate::SimulationError::InvalidState(
                    "Test failure".to_string(),
                ))
            } else {
                Ok(())
            }
        }
    }

    #[test]
    fn test_simulation_builder_with_failures() {
        let report = SimulationBuilder::new()
            .workload(FailingWorkload)
            .set_iterations(10)
            .run();

        assert_eq!(report.iterations, 10);
        let accounted = report.successful_runs + report.failed_runs;
        assert_eq!(accounted, 10, "all iterations should be accounted for");
        assert!(
            report.failed_runs > 0,
            "expected at least one failure across 10 seeds"
        );
        assert!(
            report.successful_runs > 0,
            "expected at least one success across 10 seeds"
        );
    }
}