1use std::collections::HashMap;
7use std::ops::{Range, RangeInclusive};
8use std::time::{Duration, Instant};
9use tracing::instrument;
10
11use crate::chaos::invariant_trait::Invariant;
12use crate::runner::fault_injector::{FaultInjector, PhaseConfig};
13use crate::runner::process::{Attrition, Process};
14use crate::runner::tags::TagDistribution;
15use crate::runner::workload::Workload;
16
17use super::orchestrator::{IterationManager, MetricsCollector, WorkloadOrchestrator};
18
19#[derive(Debug, Clone, Copy)]
21pub(crate) struct WorkloadClientInfo {
22 pub(crate) client_id: usize,
24 pub(crate) client_count: usize,
26}
27
28struct ResolvedEntries {
30 workloads: Vec<Box<dyn Workload>>,
31 return_map: Vec<Option<usize>>,
34 client_info: Vec<WorkloadClientInfo>,
36}
37use super::report::{SimulationMetrics, SimulationReport};
38
39#[derive(Debug, Clone)]
43pub enum IterationControl {
44 FixedCount(usize),
46 TimeLimit(Duration),
48 UntilConverged {
52 max_iterations: usize,
54 },
55}
56
57#[derive(Debug, Clone)]
72pub enum WorkloadCount {
73 Fixed(usize),
75 Random(Range<usize>),
78}
79
80impl WorkloadCount {
81 fn resolve(&self) -> usize {
84 match self {
85 WorkloadCount::Fixed(n) => *n,
86 WorkloadCount::Random(range) => crate::sim::sim_random_range(range.clone()),
87 }
88 }
89}
90
91#[derive(Debug, Clone, PartialEq)]
110pub enum ClientId {
111 Fixed(usize),
114 RandomRange(Range<usize>),
118}
119
120impl Default for ClientId {
121 fn default() -> Self {
122 Self::Fixed(0)
123 }
124}
125
126impl ClientId {
127 fn resolve(&self, index: usize) -> usize {
129 match self {
130 ClientId::Fixed(base) => base + index,
131 ClientId::RandomRange(range) => crate::sim::sim_random_range(range.clone()),
132 }
133 }
134}
135
136#[derive(Debug, Clone, PartialEq)]
151pub enum ProcessCount {
152 Fixed(usize),
154 Range(RangeInclusive<usize>),
157}
158
159impl ProcessCount {
160 fn resolve(&self) -> usize {
162 match self {
163 ProcessCount::Fixed(n) => *n,
164 ProcessCount::Range(range) => {
165 let start = *range.start();
166 let end = *range.end() + 1; if start >= end {
168 return start;
169 }
170 crate::sim::sim_random_range(start..end)
171 }
172 }
173 }
174}
175
176impl From<usize> for ProcessCount {
177 fn from(n: usize) -> Self {
178 ProcessCount::Fixed(n)
179 }
180}
181
182impl From<RangeInclusive<usize>> for ProcessCount {
183 fn from(range: RangeInclusive<usize>) -> Self {
184 ProcessCount::Range(range)
185 }
186}
187
188pub(crate) struct ProcessEntry {
190 pub(crate) count: ProcessCount,
191 pub(crate) factory: Box<dyn Fn() -> Box<dyn Process>>,
192 pub(crate) tags: TagDistribution,
193 pub(crate) name: String,
194}
195
196enum WorkloadEntry {
198 Instance(Option<Box<dyn Workload>>, ClientId),
200 Factory {
202 count: WorkloadCount,
203 client_id: ClientId,
204 factory: Box<dyn Fn(usize) -> Box<dyn Workload>>,
205 },
206}
207
208pub struct SimulationBuilder {
210 iteration_control: IterationControl,
211 entries: Vec<WorkloadEntry>,
212 process_entry: Option<ProcessEntry>,
213 attrition: Option<Attrition>,
214 seeds: Vec<u64>,
215 use_random_config: bool,
216 invariants: Vec<Box<dyn Invariant>>,
217 fault_injectors: Vec<Box<dyn FaultInjector>>,
218 phase_config: Option<PhaseConfig>,
219 exploration_config: Option<moonpool_explorer::ExplorationConfig>,
220 replay_recipe: Option<super::report::BugRecipe>,
221 before_iteration_hooks: Vec<Box<dyn FnMut()>>,
222}
223
224impl Default for SimulationBuilder {
225 fn default() -> Self {
226 Self::new()
227 }
228}
229
230impl SimulationBuilder {
231 pub fn new() -> Self {
233 Self {
234 iteration_control: IterationControl::FixedCount(1),
235 entries: Vec::new(),
236 process_entry: None,
237 attrition: None,
238 seeds: Vec::new(),
239 use_random_config: false,
240 invariants: Vec::new(),
241 fault_injectors: Vec::new(),
242 phase_config: None,
243 exploration_config: None,
244 replay_recipe: None,
245 before_iteration_hooks: Vec::new(),
246 }
247 }
248
249 pub fn workload(mut self, w: impl Workload) -> Self {
254 self.entries.push(WorkloadEntry::Instance(
255 Some(Box::new(w)),
256 ClientId::default(),
257 ));
258 self
259 }
260
261 pub fn workload_with_client_id(mut self, client_id: ClientId, w: impl Workload) -> Self {
266 self.entries
267 .push(WorkloadEntry::Instance(Some(Box::new(w)), client_id));
268 self
269 }
270
271 pub fn processes(
293 mut self,
294 count: impl Into<ProcessCount>,
295 factory: impl Fn() -> Box<dyn Process> + 'static,
296 ) -> Self {
297 let sample = factory();
298 let name = sample.name().to_string();
299 drop(sample);
300 self.process_entry = Some(ProcessEntry {
301 count: count.into(),
302 factory: Box::new(factory),
303 tags: TagDistribution::new(),
304 name,
305 });
306 self
307 }
308
309 pub fn tags(mut self, dimensions: &[(&str, &[&str])]) -> Self {
329 let entry = self
330 .process_entry
331 .as_mut()
332 .expect("tags() must be called after processes()");
333 for (key, values) in dimensions {
334 entry.tags.add(key, values);
335 }
336 self
337 }
338
339 pub fn attrition(mut self, config: Attrition) -> Self {
349 self.attrition = Some(config);
350 self
351 }
352
353 pub fn workloads(
372 mut self,
373 count: WorkloadCount,
374 factory: impl Fn(usize) -> Box<dyn Workload> + 'static,
375 ) -> Self {
376 self.entries.push(WorkloadEntry::Factory {
377 count,
378 client_id: ClientId::default(),
379 factory: Box::new(factory),
380 });
381 self
382 }
383
384 pub fn workloads_with_client_id(
400 mut self,
401 count: WorkloadCount,
402 client_id: ClientId,
403 factory: impl Fn(usize) -> Box<dyn Workload> + 'static,
404 ) -> Self {
405 self.entries.push(WorkloadEntry::Factory {
406 count,
407 client_id,
408 factory: Box::new(factory),
409 });
410 self
411 }
412
413 pub fn invariant(mut self, i: impl Invariant) -> Self {
415 self.invariants.push(Box::new(i));
416 self
417 }
418
419 pub fn invariant_fn(
421 mut self,
422 name: impl Into<String>,
423 f: impl Fn(&crate::chaos::StateHandle, u64) + 'static,
424 ) -> Self {
425 self.invariants.push(crate::chaos::invariant_fn(name, f));
426 self
427 }
428
429 pub fn fault(mut self, f: impl FaultInjector) -> Self {
431 self.fault_injectors.push(Box::new(f));
432 self
433 }
434
435 pub fn phases(mut self, config: PhaseConfig) -> Self {
437 self.phase_config = Some(config);
438 self
439 }
440
441 pub fn set_iterations(mut self, iterations: usize) -> Self {
443 self.iteration_control = IterationControl::FixedCount(iterations);
444 self
445 }
446
447 pub fn set_iteration_control(mut self, control: IterationControl) -> Self {
449 self.iteration_control = control;
450 self
451 }
452
453 pub fn set_time_limit(mut self, duration: Duration) -> Self {
455 self.iteration_control = IterationControl::TimeLimit(duration);
456 self
457 }
458
459 pub fn until_converged(mut self, max_iterations: usize) -> Self {
465 self.iteration_control = IterationControl::UntilConverged { max_iterations };
466 self
467 }
468
469 pub fn before_iteration(mut self, f: impl FnMut() + 'static) -> Self {
474 self.before_iteration_hooks.push(Box::new(f));
475 self
476 }
477
478 pub fn set_debug_seeds(mut self, seeds: Vec<u64>) -> Self {
480 self.seeds = seeds;
481 self
482 }
483
484 pub fn random_network(mut self) -> Self {
486 self.use_random_config = true;
487 self
488 }
489
490 pub fn enable_exploration(mut self, config: moonpool_explorer::ExplorationConfig) -> Self {
495 self.exploration_config = Some(config);
496 self
497 }
498
499 pub fn replay_recipe(mut self, recipe: super::report::BugRecipe) -> Self {
504 self.replay_recipe = Some(recipe);
505 self
506 }
507
508 fn resolve_entries(&mut self) -> ResolvedEntries {
510 let mut workloads = Vec::new();
511 let mut return_map = Vec::new();
512 let mut client_info = Vec::new();
513
514 for (entry_idx, entry) in self.entries.iter_mut().enumerate() {
515 match entry {
516 WorkloadEntry::Instance(opt, cid) => {
517 if let Some(w) = opt.take() {
518 return_map.push(Some(entry_idx));
519 client_info.push(WorkloadClientInfo {
520 client_id: cid.resolve(0),
521 client_count: 1,
522 });
523 workloads.push(w);
524 }
525 }
526 WorkloadEntry::Factory {
527 count,
528 client_id,
529 factory,
530 } => {
531 let n = count.resolve();
532 for i in 0..n {
533 return_map.push(None);
534 client_info.push(WorkloadClientInfo {
535 client_id: client_id.resolve(i),
536 client_count: n,
537 });
538 workloads.push(factory(i));
539 }
540 }
541 }
542 }
543
544 ResolvedEntries {
545 workloads,
546 return_map,
547 client_info,
548 }
549 }
550
551 fn return_entries(
553 &mut self,
554 workloads: Vec<Box<dyn Workload>>,
555 return_map: Vec<Option<usize>>,
556 ) {
557 for (w, slot) in workloads.into_iter().zip(return_map) {
558 if let Some(entry_idx) = slot
559 && let WorkloadEntry::Instance(opt, _) = &mut self.entries[entry_idx]
560 {
561 *opt = Some(w);
562 }
563 }
565 }
566
567 #[instrument(skip_all)]
568 pub fn run(mut self) -> SimulationReport {
573 if self.entries.is_empty() {
574 return SimulationReport {
575 iterations: 0,
576 successful_runs: 0,
577 failed_runs: 0,
578 metrics: SimulationMetrics::default(),
579 individual_metrics: Vec::new(),
580 seeds_used: Vec::new(),
581 seeds_failing: Vec::new(),
582 assertion_results: HashMap::new(),
583 assertion_violations: Vec::new(),
584 coverage_violations: Vec::new(),
585 exploration: None,
586 assertion_details: Vec::new(),
587 bucket_summaries: Vec::new(),
588 convergence_timeout: false,
589 };
590 }
591
592 let mut iteration_manager =
594 IterationManager::new(self.iteration_control.clone(), self.seeds.clone());
595 let mut metrics_collector = MetricsCollector::new();
596
597 let mut total_exploration_timelines: u64 = 0;
599 let mut total_exploration_fork_points: u64 = 0;
600 let mut total_exploration_bugs: u64 = 0;
601 let mut bug_recipes: Vec<super::report::BugRecipe> = Vec::new();
602 let mut per_seed_timelines: Vec<u64> = Vec::new();
603
604 let mut reached_sometimes: std::collections::HashSet<String> =
606 std::collections::HashSet::new();
607 let mut prev_coverage_bits: u32 = 0;
608 let mut converged = false;
609
610 if let Err(e) = moonpool_explorer::init_assertions() {
612 tracing::error!("Failed to initialize assertion table: {}", e);
613 }
614
615 if let Some(ref config) = self.exploration_config {
617 moonpool_explorer::set_rng_hooks(crate::sim::get_rng_call_count, |seed| {
618 crate::sim::set_sim_seed(seed);
619 crate::sim::reset_rng_call_count();
620 });
621 if let Err(e) = moonpool_explorer::init(config.clone()) {
622 tracing::error!("Failed to initialize exploration: {}", e);
623 }
624 }
625
626 if matches!(
628 self.iteration_control,
629 IterationControl::UntilConverged { .. }
630 ) && self.exploration_config.is_none()
631 {
632 panic!(
633 "IterationControl::UntilConverged requires enable_exploration() to be configured"
634 );
635 }
636
637 while iteration_manager.should_continue() {
638 let seed = iteration_manager.next_iteration();
639 let iteration_count = iteration_manager.current_iteration();
640
641 if iteration_count > 1 {
645 if let Some(ref config) = self.exploration_config {
646 moonpool_explorer::prepare_next_seed(config.global_energy);
647 }
648 crate::chaos::assertions::skip_next_assertion_reset();
649 }
650
651 for hook in &mut self.before_iteration_hooks {
653 hook();
654 }
655
656 crate::sim::reset_sim_rng();
658 crate::sim::set_sim_seed(seed);
659 crate::chaos::reset_always_violations();
660
661 crate::chaos::buggify_init(0.5, 0.25);
664
665 if matches!(
667 self.iteration_control,
668 IterationControl::UntilConverged { .. }
669 ) {
670 prev_coverage_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
671 }
672
673 let ResolvedEntries {
676 workloads,
677 return_map,
678 client_info,
679 } = self.resolve_entries();
680
681 let workload_info: Vec<(String, String)> = workloads
683 .iter()
684 .enumerate()
685 .map(|(i, w)| (w.name().to_string(), format!("10.0.0.{}", i + 1)))
686 .collect();
687
688 let process_config = self.process_entry.as_ref().map(
690 |entry| -> super::orchestrator::ProcessConfig<'_> {
691 let count = entry.count.resolve();
692 let mut registry = crate::runner::tags::TagRegistry::new();
693 let mut ips = Vec::with_capacity(count);
694 let mut info = Vec::with_capacity(count);
695 let base_name = &entry.name;
696 for i in 0..count {
697 let ip = format!("10.0.1.{}", i + 1);
698 let ip_addr: std::net::IpAddr = ip.parse().expect("valid process IP");
699 let tags = entry.tags.resolve(i);
700 registry.register(ip_addr, tags);
701 ips.push(ip.clone());
702 let name = if count == 1 {
703 base_name.clone()
704 } else {
705 format!("{}-{}", base_name, i)
706 };
707 info.push((name, ip));
708 }
709 super::orchestrator::ProcessConfig {
710 factory: &*entry.factory,
711 info,
712 ips,
713 tag_registry: registry,
714 }
715 },
716 );
717
718 let network_config = if self.use_random_config {
720 crate::NetworkConfiguration::random_for_seed()
721 } else {
722 crate::NetworkConfiguration::default()
723 };
724
725 let sim = crate::sim::SimWorld::new_with_network_config_and_seed(network_config, seed);
727
728 if let Some(ref br) = self.replay_recipe {
730 crate::sim::set_rng_breakpoints(br.recipe.clone());
731 }
732
733 let start_time = Instant::now();
734
735 let mut fault_injectors = std::mem::take(&mut self.fault_injectors);
737
738 if let Some(ref attrition) = self.attrition {
740 fault_injectors.push(Box::new(
741 crate::runner::fault_injector::AttritionInjector::new(attrition.clone()),
742 ));
743 }
744
745 let mut seed_bytes = [0u8; 32];
749 seed_bytes[..8].copy_from_slice(&seed.to_le_bytes());
750 let rng_seed = tokio::runtime::RngSeed::from_bytes(&seed_bytes);
751
752 let local_runtime = tokio::runtime::Builder::new_current_thread()
753 .enable_time()
754 .rng_seed(rng_seed)
755 .build_local(Default::default())
756 .expect("per-iteration runtime");
757
758 let invariants_ref = &self.invariants;
760 let phase_ref = self.phase_config.as_ref();
761
762 let orchestration_result = local_runtime.block_on(async move {
764 WorkloadOrchestrator::orchestrate_workloads(
765 workloads,
766 fault_injectors,
767 invariants_ref,
768 &workload_info,
769 &client_info,
770 process_config,
771 seed,
772 sim,
773 phase_ref,
774 iteration_count,
775 )
776 .await
777 });
778
779 match orchestration_result {
780 Ok((returned_workloads, returned_injectors, all_results, sim_metrics)) => {
781 self.return_entries(returned_workloads, return_map);
783 self.fault_injectors = returned_injectors;
784
785 let wall_time = start_time.elapsed();
786 let has_violations = crate::chaos::has_always_violations();
787
788 metrics_collector.record_iteration(
789 seed,
790 wall_time,
791 &all_results,
792 has_violations,
793 sim_metrics,
794 );
795 }
796 Err((faulty_seeds_from_deadlock, failed_count)) => {
797 metrics_collector.add_faulty_seeds(faulty_seeds_from_deadlock);
799 metrics_collector.add_failed_runs(failed_count);
800
801 let assertion_results = crate::chaos::get_assertion_results();
803 let (assertion_violations, coverage_violations) =
804 crate::chaos::validate_assertion_contracts();
805 crate::chaos::buggify_reset();
806
807 return metrics_collector.generate_report(
808 iteration_count,
809 iteration_manager.seeds_used().to_vec(),
810 assertion_results,
811 assertion_violations,
812 coverage_violations,
813 None,
814 Vec::new(),
815 Vec::new(),
816 false,
817 );
818 }
819 }
820
821 if self.exploration_config.is_some() {
823 if let Some(stats) = moonpool_explorer::get_exploration_stats() {
824 per_seed_timelines.push(stats.total_timelines);
825 total_exploration_timelines += stats.total_timelines;
826 total_exploration_fork_points += stats.fork_points;
827 total_exploration_bugs += stats.bug_found;
828 } else {
829 per_seed_timelines.push(0);
830 }
831 if let Some(recipe) = moonpool_explorer::get_bug_recipe() {
832 bug_recipes.push(super::report::BugRecipe { seed, recipe });
833 }
834 }
835
836 if matches!(
839 self.iteration_control,
840 IterationControl::UntilConverged { .. }
841 ) {
842 let slots = moonpool_explorer::assertion_read_all();
843 for slot in &slots {
844 if let Some(kind) = moonpool_explorer::AssertKind::from_u8(slot.kind)
845 && matches!(
846 kind,
847 moonpool_explorer::AssertKind::Sometimes
848 | moonpool_explorer::AssertKind::Reachable
849 )
850 {
851 if slot.pass_count > 0 {
852 reached_sometimes.insert(slot.msg.clone());
853 } else if !reached_sometimes.contains(&slot.msg) {
854 tracing::warn!(
855 "UNREACHED slot: kind={:?} msg={:?} pass={} fail={}",
856 kind,
857 slot.msg,
858 slot.pass_count,
859 slot.fail_count
860 );
861 }
862 }
863 }
864
865 if iteration_count >= 2 {
867 let all_sometimes_count = slots
870 .iter()
871 .filter(|s| {
872 moonpool_explorer::AssertKind::from_u8(s.kind)
873 .map(|k| {
874 matches!(
875 k,
876 moonpool_explorer::AssertKind::Sometimes
877 | moonpool_explorer::AssertKind::Reachable
878 )
879 })
880 .unwrap_or(false)
881 })
882 .map(|s| s.msg.clone())
883 .collect::<std::collections::HashSet<_>>()
884 .len();
885 let all_reached =
886 all_sometimes_count > 0 && reached_sometimes.len() >= all_sometimes_count;
887
888 let current_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
889 let no_new_coverage = current_bits == prev_coverage_bits;
890
891 tracing::warn!(
892 "convergence: seed={} reached={}/{} coverage={}->{} delta={}",
893 iteration_count,
894 reached_sometimes.len(),
895 all_sometimes_count,
896 prev_coverage_bits,
897 current_bits,
898 current_bits.saturating_sub(prev_coverage_bits),
899 );
900
901 if all_reached && no_new_coverage {
902 tracing::info!(
903 "Converged after {} seeds: all {} sometimes reached, no new coverage",
904 iteration_count,
905 all_sometimes_count
906 );
907 converged = true;
908 }
909 }
910 }
911
912 crate::chaos::buggify_reset();
914
915 if converged {
916 break;
917 }
918 }
919
920 let exploration_report = if self.exploration_config.is_some() {
928 let final_stats = moonpool_explorer::get_exploration_stats();
929 let coverage_bits = moonpool_explorer::explored_map_bits_set().unwrap_or(0);
932
933 Some(super::report::ExplorationReport {
934 total_timelines: total_exploration_timelines,
935 fork_points: total_exploration_fork_points,
936 bugs_found: total_exploration_bugs,
937 bug_recipes,
938 energy_remaining: final_stats.as_ref().map(|s| s.global_energy).unwrap_or(0),
939 realloc_pool_remaining: final_stats
940 .as_ref()
941 .map(|s| s.realloc_pool_remaining)
942 .unwrap_or(0),
943 coverage_bits,
944 coverage_total: (moonpool_explorer::coverage::COVERAGE_MAP_SIZE * 8) as u32,
945 sancov_edges_total: final_stats
946 .as_ref()
947 .map(|s| s.sancov_edges_total)
948 .unwrap_or(0),
949 sancov_edges_covered: final_stats
950 .as_ref()
951 .map(|s| s.sancov_edges_covered)
952 .unwrap_or(0),
953 converged,
954 per_seed_timelines,
955 })
956 } else {
957 None
958 };
959
960 let assertion_results = crate::chaos::get_assertion_results();
962 let (assertion_violations, coverage_violations) =
963 crate::chaos::validate_assertion_contracts();
964 let raw_assertion_slots = moonpool_explorer::assertion_read_all();
965 let raw_each_buckets = moonpool_explorer::each_bucket_read_all();
966
967 if self.exploration_config.is_some() {
969 moonpool_explorer::cleanup();
970 } else {
971 moonpool_explorer::cleanup_assertions();
972 }
973
974 let assertion_details = build_assertion_details(&raw_assertion_slots);
976
977 let bucket_summaries = build_bucket_summaries(&raw_each_buckets);
979
980 let iteration_count = iteration_manager.current_iteration();
981
982 let convergence_timeout = matches!(
984 self.iteration_control,
985 IterationControl::UntilConverged { .. }
986 ) && !converged;
987
988 crate::chaos::buggify_reset();
990
991 metrics_collector.generate_report(
992 iteration_count,
993 iteration_manager.seeds_used().to_vec(),
994 assertion_results,
995 assertion_violations,
996 coverage_violations,
997 exploration_report,
998 assertion_details,
999 bucket_summaries,
1000 convergence_timeout,
1001 )
1002 }
1003}
1004
1005fn build_assertion_details(
1007 slots: &[moonpool_explorer::AssertionSlotSnapshot],
1008) -> Vec<super::report::AssertionDetail> {
1009 use super::report::{AssertionDetail, AssertionStatus};
1010 use moonpool_explorer::AssertKind;
1011
1012 slots
1013 .iter()
1014 .filter_map(|slot| {
1015 let kind = AssertKind::from_u8(slot.kind)?;
1016 let total = slot.pass_count.saturating_add(slot.fail_count);
1017
1018 if total == 0 && slot.frontier == 0 {
1020 return None;
1021 }
1022
1023 let status = match kind {
1024 AssertKind::Always
1025 | AssertKind::AlwaysOrUnreachable
1026 | AssertKind::NumericAlways => {
1027 if slot.fail_count > 0 {
1028 AssertionStatus::Fail
1029 } else {
1030 AssertionStatus::Pass
1031 }
1032 }
1033 AssertKind::Sometimes | AssertKind::NumericSometimes => {
1034 if slot.pass_count > 0 {
1035 AssertionStatus::Pass
1036 } else {
1037 AssertionStatus::Miss
1038 }
1039 }
1040 AssertKind::Reachable => {
1041 if slot.pass_count > 0 {
1042 AssertionStatus::Pass
1043 } else {
1044 AssertionStatus::Miss
1045 }
1046 }
1047 AssertKind::Unreachable => {
1048 if slot.pass_count > 0 {
1049 AssertionStatus::Fail
1050 } else {
1051 AssertionStatus::Pass
1052 }
1053 }
1054 AssertKind::BooleanSometimesAll => {
1055 if slot.frontier > 0 {
1056 AssertionStatus::Pass
1057 } else {
1058 AssertionStatus::Miss
1059 }
1060 }
1061 };
1062
1063 Some(AssertionDetail {
1064 msg: slot.msg.clone(),
1065 kind,
1066 pass_count: slot.pass_count,
1067 fail_count: slot.fail_count,
1068 watermark: slot.watermark,
1069 frontier: slot.frontier,
1070 status,
1071 })
1072 })
1073 .collect()
1074}
1075
1076fn build_bucket_summaries(
1078 buckets: &[moonpool_explorer::EachBucket],
1079) -> Vec<super::report::BucketSiteSummary> {
1080 use super::report::BucketSiteSummary;
1081 use std::collections::HashMap;
1082
1083 let mut sites: HashMap<u32, BucketSiteSummary> = HashMap::new();
1084
1085 for bucket in buckets {
1086 let entry = sites
1087 .entry(bucket.site_hash)
1088 .or_insert_with(|| BucketSiteSummary {
1089 msg: bucket.msg_str().to_string(),
1090 buckets_discovered: 0,
1091 total_hits: 0,
1092 });
1093
1094 entry.buckets_discovered += 1;
1095 entry.total_hits += bucket.pass_count as u64;
1096 }
1097
1098 let mut summaries: Vec<_> = sites.into_values().collect();
1099 summaries.sort_by(|a, b| b.total_hits.cmp(&a.total_hits));
1100 summaries
1101}
1102
1103#[cfg(test)]
1104mod tests {
1105 use super::*;
1106 use async_trait::async_trait;
1107 use moonpool_core::RandomProvider;
1108
1109 use crate::SimulationResult;
1110 use crate::runner::context::SimContext;
1111
1112 struct BasicWorkload;
1113
1114 #[async_trait(?Send)]
1115 impl Workload for BasicWorkload {
1116 fn name(&self) -> &str {
1117 "test_workload"
1118 }
1119
1120 async fn run(&mut self, _ctx: &SimContext) -> SimulationResult<()> {
1121 Ok(())
1122 }
1123 }
1124
1125 #[test]
1126 fn test_simulation_builder_basic() {
1127 let report = SimulationBuilder::new()
1128 .workload(BasicWorkload)
1129 .set_iterations(3)
1130 .set_debug_seeds(vec![1, 2, 3])
1131 .run();
1132
1133 assert_eq!(report.iterations, 3);
1134 assert_eq!(report.successful_runs, 3);
1135 assert_eq!(report.failed_runs, 0);
1136 assert_eq!(report.success_rate(), 100.0);
1137 assert_eq!(report.seeds_used, vec![1, 2, 3]);
1138 }
1139
1140 struct FailingWorkload;
1141
1142 #[async_trait(?Send)]
1143 impl Workload for FailingWorkload {
1144 fn name(&self) -> &str {
1145 "failing_workload"
1146 }
1147
1148 async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
1149 let random_num: u32 = ctx.random().random_range(0..100);
1151 if random_num % 2 == 0 {
1152 return Err(crate::SimulationError::InvalidState(
1153 "Test failure".to_string(),
1154 ));
1155 }
1156 Ok(())
1157 }
1158 }
1159
1160 #[test]
1161 fn test_simulation_builder_with_failures() {
1162 let report = SimulationBuilder::new()
1163 .workload(FailingWorkload)
1164 .set_iterations(10)
1165 .run();
1166
1167 assert_eq!(report.iterations, 10);
1168 assert_eq!(
1169 report.successful_runs + report.failed_runs,
1170 10,
1171 "all iterations should be accounted for"
1172 );
1173 assert!(
1174 report.failed_runs > 0,
1175 "expected at least one failure across 10 seeds"
1176 );
1177 assert!(
1178 report.successful_runs > 0,
1179 "expected at least one success across 10 seeds"
1180 );
1181 }
1182}