1#![forbid(unsafe_code)]
2pub mod agent;
5pub mod campaign;
6pub mod clock;
7pub mod network;
8pub mod oracle;
9pub mod rng;
10
11use anyhow::{Result, bail};
12use serde::{Deserialize, Serialize};
13
14use crate::agent::{AgentId, AgentState, SimulatedAgent};
15use crate::clock::{ClockConfig, ClockSpec, SimulatedClock};
16use crate::network::{FaultConfig, NetworkMessage, SimulatedNetwork};
17use crate::oracle::{ConvergenceOracle, ConvergenceReport};
18use crate::rng::DeterministicRng;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22pub enum DropReason {
23 RandomLoss,
25 Partition,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub struct TraceEvent {
32 pub round: u64,
34 pub kind: TraceEventKind,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub enum TraceEventKind {
41 Emit {
43 agent: AgentId,
45 event_id: u64,
47 seq: u64,
49 },
50 Send {
52 from: AgentId,
54 to: AgentId,
56 event_id: u64,
58 delay_rounds: u8,
60 duplicated: bool,
62 },
63 Drop {
65 from: AgentId,
67 to: AgentId,
69 event_id: u64,
71 reason: DropReason,
73 },
74 Deliver {
76 from: AgentId,
78 to: AgentId,
80 event_id: u64,
82 },
83 Reorder {
85 delivered_count: usize,
87 },
88 Partition {
90 agent: AgentId,
92 isolated: bool,
94 },
95 ClockFreeze {
97 agent: AgentId,
99 until_round: u64,
101 },
102 Reconcile {
104 agent_a: AgentId,
106 agent_b: AgentId,
108 a_to_b: usize,
110 b_to_a: usize,
112 },
113}
114
115#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
117pub struct SimulationConfig {
118 pub seed: u64,
120 pub agent_count: usize,
122 pub rounds: u64,
124 pub fanout: usize,
126 pub fault: FaultConfig,
128 pub clock: ClockConfig,
130 #[serde(default = "default_reconciliation_rounds")]
132 pub reconciliation_rounds: u8,
133}
134
/// Fallback for `SimulationConfig::reconciliation_rounds`, used both by serde
/// when the field is missing and by the `Default` implementation.
const fn default_reconciliation_rounds() -> u8 {
    const FALLBACK_ROUNDS: u8 = 3;
    FALLBACK_ROUNDS
}
138
139impl Default for SimulationConfig {
140 fn default() -> Self {
141 Self {
142 seed: 42,
143 agent_count: 4,
144 rounds: 24,
145 fanout: 2,
146 fault: FaultConfig::default(),
147 clock: ClockConfig::default(),
148 reconciliation_rounds: default_reconciliation_rounds(),
149 }
150 }
151}
152
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
155pub struct SeedReplay {
156 pub config: SimulationConfig,
158}
159
160impl SeedReplay {
161 #[must_use]
163 pub fn from_config(config: &SimulationConfig) -> Self {
164 Self {
165 config: config.clone(),
166 }
167 }
168
169 pub fn replay(&self) -> Result<SimulationResult> {
175 let mut simulator = Simulator::new(self.config.clone())?;
176 simulator.run()
177 }
178}
179
180#[derive(Debug, Clone, PartialEq, Eq)]
182pub struct SimulationResult {
183 pub trace: Vec<TraceEvent>,
185 pub states: Vec<AgentState>,
187 pub convergence: ConvergenceReport,
189 pub interesting_state_reached: bool,
191}
192
193impl SimulationResult {
194 #[must_use]
196 pub fn trace_fingerprint(&self) -> u64 {
197 self.trace.iter().fold(0_u64, |acc, item| {
198 let encoded = format!("{item:?}");
199 encoded.as_bytes().iter().fold(acc, |inner, byte| {
200 inner.wrapping_mul(131).wrapping_add(u64::from(*byte))
201 })
202 })
203 }
204}
205
206pub struct Simulator {
208 config: SimulationConfig,
209 agents: Vec<SimulatedAgent>,
210 clocks: Vec<SimulatedClock>,
211 clock_unfreeze_round: Vec<Option<u64>>,
212 network: SimulatedNetwork,
213 partitioned: Vec<bool>,
214 rng: DeterministicRng,
215}
216
217impl Simulator {
218 pub fn new(config: SimulationConfig) -> Result<Self> {
224 if config.agent_count == 0 {
225 bail!("agent_count must be > 0");
226 }
227 if config.rounds == 0 {
228 bail!("rounds must be > 0");
229 }
230
231 let mut rng = DeterministicRng::new(config.seed);
232 let mut clocks = Vec::with_capacity(config.agent_count);
233
234 for _ in 0..config.agent_count {
235 let drift_ppm = sample_signed_i32(&mut rng, config.clock.max_abs_drift_ppm);
236 let skew_millis = sample_signed_i64(&mut rng, config.clock.max_abs_skew_millis);
237 clocks.push(SimulatedClock::new(ClockSpec {
238 base_millis: config.clock.base_millis,
239 tick_millis: config.clock.tick_millis,
240 drift_ppm,
241 skew_millis,
242 }));
243 }
244
245 let agents = (0..config.agent_count)
246 .map(SimulatedAgent::new)
247 .collect::<Vec<_>>();
248
249 Ok(Self {
250 clock_unfreeze_round: vec![None; config.agent_count],
251 partitioned: vec![false; config.agent_count],
252 network: SimulatedNetwork::new(config.fault),
253 config,
254 agents,
255 clocks,
256 rng,
257 })
258 }
259
260 pub fn run(&mut self) -> Result<SimulationResult> {
266 let mut trace = Vec::new();
267
268 for round in 0..self.config.rounds {
269 self.progress_clock_freezes(round);
270 self.maybe_toggle_partition(round, &mut trace);
271 self.maybe_freeze_clock(round, &mut trace);
272
273 for agent_idx in 0..self.agents.len() {
274 let emitted = self.agents[agent_idx].emit_event();
275 trace.push(TraceEvent {
276 round,
277 kind: TraceEventKind::Emit {
278 agent: emitted.source,
279 event_id: emitted.event_id,
280 seq: emitted.seq,
281 },
282 });
283
284 let targets = self.pick_targets(agent_idx);
285 for target in targets {
286 let message = NetworkMessage {
287 from: agent_idx,
288 to: target,
289 event_id: emitted.event_id,
290 seq: emitted.seq,
291 };
292
293 let partition_blocked = self.network.is_partitioned(message.from)
294 || self.network.is_partitioned(message.to);
295
296 let outcome = self.network.send(message, round, &mut self.rng);
297
298 if outcome.dropped {
299 trace.push(TraceEvent {
300 round,
301 kind: TraceEventKind::Drop {
302 from: message.from,
303 to: message.to,
304 event_id: message.event_id,
305 reason: if partition_blocked {
306 DropReason::Partition
307 } else {
308 DropReason::RandomLoss
309 },
310 },
311 });
312 } else {
313 trace.push(TraceEvent {
314 round,
315 kind: TraceEventKind::Send {
316 from: message.from,
317 to: message.to,
318 event_id: message.event_id,
319 delay_rounds: outcome.delay_rounds,
320 duplicated: outcome.duplicated,
321 },
322 });
323 }
324 }
325 }
326
327 self.deliver_round(round, &mut trace);
328 }
329
330 self.final_drain(&mut trace);
331 self.reconcile(&mut trace);
332
333 let states = self
334 .agents
335 .iter()
336 .map(SimulatedAgent::snapshot)
337 .collect::<Vec<_>>();
338
339 let convergence = ConvergenceOracle::evaluate(&states);
340 let interesting_state_reached = trace.iter().any(|event| {
341 matches!(
342 event.kind,
343 TraceEventKind::Drop { .. }
344 | TraceEventKind::Reorder { .. }
345 | TraceEventKind::Partition { .. }
346 | TraceEventKind::ClockFreeze { .. }
347 )
348 });
349
350 Ok(SimulationResult {
351 trace,
352 states,
353 convergence,
354 interesting_state_reached,
355 })
356 }
357
358 fn deliver_round(&mut self, round: u64, trace: &mut Vec<TraceEvent>) {
359 let delivered = self.network.deliver_ready(round, &mut self.rng);
360
361 if delivered.reordered {
362 trace.push(TraceEvent {
363 round,
364 kind: TraceEventKind::Reorder {
365 delivered_count: delivered.delivered.len(),
366 },
367 });
368 }
369
370 for message in delivered.delivered {
371 if let Some(agent) = self.agents.get_mut(message.to) {
372 agent.observe_event(message.event_id);
373 }
374 trace.push(TraceEvent {
375 round,
376 kind: TraceEventKind::Deliver {
377 from: message.from,
378 to: message.to,
379 event_id: message.event_id,
380 },
381 });
382 }
383 }
384
385 fn final_drain(&mut self, trace: &mut Vec<TraceEvent>) {
386 let mut drain_round = self.config.rounds;
387 let drain_limit = self.config.rounds.saturating_add(1_000);
388
389 while self.network.pending_len() > 0 && drain_round < drain_limit {
390 self.deliver_round(drain_round, trace);
391 drain_round = drain_round.saturating_add(1);
392 }
393 }
394
395 fn reconcile(&mut self, trace: &mut Vec<TraceEvent>) {
396 if self.config.reconciliation_rounds == 0 {
397 return;
398 }
399
400 let n = self.agents.len();
401 if n < 2 {
402 return;
403 }
404
405 for round_offset in 0..u64::from(self.config.reconciliation_rounds) {
406 if self.all_agents_converged() {
408 break;
409 }
410
411 let synthetic_round = self
412 .config
413 .rounds
414 .saturating_add(1001)
415 .saturating_add(round_offset);
416
417 let len_u64 = u64::try_from(n).unwrap_or(1);
419 for a in 0..n {
420 let mut b = usize::try_from(self.rng.next_bounded(len_u64)).unwrap_or(0);
421 while b == a {
422 b = usize::try_from(self.rng.next_bounded(len_u64)).unwrap_or(0);
423 }
424 self.reconcile_pair(a, b, synthetic_round, trace);
425 }
426 }
427
428 if !self.all_agents_converged() {
432 let synthetic_round = self
433 .config
434 .rounds
435 .saturating_add(1001)
436 .saturating_add(u64::from(self.config.reconciliation_rounds));
437 for a in 0..n.saturating_sub(1) {
439 self.reconcile_pair(a, a + 1, synthetic_round, trace);
440 }
441 for a in (0..n.saturating_sub(1)).rev() {
443 self.reconcile_pair(a + 1, a, synthetic_round, trace);
444 }
445 }
446 }
447
448 fn all_agents_converged(&self) -> bool {
449 let first = &self.agents[0].snapshot().known_events;
450 self.agents[1..]
451 .iter()
452 .all(|a| a.snapshot().known_events == *first)
453 }
454
455 fn reconcile_pair(&mut self, a: AgentId, b: AgentId, round: u64, trace: &mut Vec<TraceEvent>) {
456 let snap_a = self.agents[a].snapshot().known_events;
457 let snap_b = self.agents[b].snapshot().known_events;
458 let a_to_b: Vec<u64> = snap_a.difference(&snap_b).copied().collect();
459 let b_to_a: Vec<u64> = snap_b.difference(&snap_a).copied().collect();
460
461 let a_to_b_count = a_to_b.len();
462 let b_to_a_count = b_to_a.len();
463
464 for event_id in &a_to_b {
465 self.agents[b].observe_event(*event_id);
466 }
467 for event_id in &b_to_a {
468 self.agents[a].observe_event(*event_id);
469 }
470
471 trace.push(TraceEvent {
472 round,
473 kind: TraceEventKind::Reconcile {
474 agent_a: a,
475 agent_b: b,
476 a_to_b: a_to_b_count,
477 b_to_a: b_to_a_count,
478 },
479 });
480 }
481
482 fn pick_targets(&mut self, source: AgentId) -> Vec<AgentId> {
483 if self.agents.len() <= 1 {
484 return Vec::new();
485 }
486
487 let max_targets = self.config.fanout.min(self.agents.len().saturating_sub(1));
488 let mut targets = Vec::new();
489
490 while targets.len() < max_targets {
491 let len_u64 = u64::try_from(self.agents.len()).unwrap_or(1);
492 let candidate_u64 = self.rng.next_bounded(len_u64);
493 let candidate = usize::try_from(candidate_u64).unwrap_or(0);
494 if candidate != source && !targets.contains(&candidate) {
495 targets.push(candidate);
496 }
497 }
498
499 targets
500 }
501
502 fn maybe_toggle_partition(&mut self, round: u64, trace: &mut Vec<TraceEvent>) {
503 if !self
504 .rng
505 .hit_rate_percent(self.config.fault.partition_rate_percent)
506 {
507 return;
508 }
509
510 let len_u64 = u64::try_from(self.partitioned.len()).unwrap_or(1);
511 let idx_u64 = self.rng.next_bounded(len_u64);
512 let idx = usize::try_from(idx_u64).unwrap_or(0);
513
514 self.partitioned[idx] = !self.partitioned[idx];
515 self.network.set_partitioned(idx, self.partitioned[idx]);
516
517 trace.push(TraceEvent {
518 round,
519 kind: TraceEventKind::Partition {
520 agent: idx,
521 isolated: self.partitioned[idx],
522 },
523 });
524 }
525
526 fn progress_clock_freezes(&mut self, round: u64) {
527 for (idx, maybe_until) in self.clock_unfreeze_round.iter_mut().enumerate() {
528 if let Some(until_round) = *maybe_until
529 && round >= until_round
530 {
531 if let Some(clock) = self.clocks.get_mut(idx) {
532 clock.unfreeze();
533 }
534 *maybe_until = None;
535 }
536 }
537 }
538
539 fn maybe_freeze_clock(&mut self, round: u64, trace: &mut Vec<TraceEvent>) {
540 if self.config.fault.freeze_duration_rounds == 0 {
541 return;
542 }
543 if !self
544 .rng
545 .hit_rate_percent(self.config.fault.freeze_rate_percent)
546 {
547 return;
548 }
549
550 let len_u64 = u64::try_from(self.clocks.len()).unwrap_or(1);
551 let idx_u64 = self.rng.next_bounded(len_u64);
552 let idx = usize::try_from(idx_u64).unwrap_or(0);
553
554 if let Some(clock) = self.clocks.get_mut(idx)
555 && !clock.is_frozen()
556 {
557 clock.freeze(round);
558 let until_round =
559 round.saturating_add(u64::from(self.config.fault.freeze_duration_rounds));
560 self.clock_unfreeze_round[idx] = Some(until_round);
561 trace.push(TraceEvent {
562 round,
563 kind: TraceEventKind::ClockFreeze {
564 agent: idx,
565 until_round,
566 },
567 });
568 }
569 }
570}
571
572pub fn sometimes_reaches_interesting_state(base: &SimulationConfig, attempts: u32) -> Result<bool> {
578 let mut seed = base.seed;
579
580 for _ in 0..attempts {
581 let mut config = base.clone();
582 config.seed = seed;
583
584 let mut simulator = Simulator::new(config)?;
585 let result = simulator.run()?;
586 if result.interesting_state_reached {
587 return Ok(true);
588 }
589
590 seed = seed.saturating_add(1);
591 }
592
593 Ok(false)
594}
595
596fn sample_signed_i32(rng: &mut DeterministicRng, max_abs: i32) -> i32 {
597 if max_abs <= 0 {
598 return 0;
599 }
600
601 let span = i64::from(max_abs)
602 .saturating_mul(2)
603 .saturating_add(1)
604 .max(1);
605 let span_u64 = u64::try_from(span).unwrap_or(1);
606 let sampled = i64::try_from(rng.next_bounded(span_u64)).unwrap_or(0) - i64::from(max_abs);
607
608 i32::try_from(sampled).unwrap_or(0)
609}
610
611fn sample_signed_i64(rng: &mut DeterministicRng, max_abs: i64) -> i64 {
612 if max_abs <= 0 {
613 return 0;
614 }
615
616 let span = max_abs.saturating_mul(2).saturating_add(1).max(1);
617 let span_u64 = u64::try_from(span).unwrap_or(1);
618 let sampled = i64::try_from(rng.next_bounded(span_u64)).unwrap_or(0);
619
620 sampled.saturating_sub(max_abs)
621}
622
#[cfg(test)]
mod tests {
    use crate::agent::AgentState;
    use crate::oracle::ConvergenceOracle;

    use super::{
        FaultConfig, SeedReplay, SimulationConfig, Simulator, sometimes_reaches_interesting_state,
    };

    /// Five-agent scenario with a mild fault mix, shared by the two
    /// reconciliation tests; only the seed and the reconciliation budget vary.
    fn mild_fault_config(seed: u64, reconciliation_rounds: u8) -> SimulationConfig {
        SimulationConfig {
            seed,
            agent_count: 5,
            rounds: 24,
            fanout: 2,
            fault: FaultConfig {
                max_delay_rounds: 3,
                drop_rate_percent: 10,
                duplicate_rate_percent: 5,
                reorder_rate_percent: 10,
                partition_rate_percent: 5,
                freeze_rate_percent: 5,
                freeze_duration_rounds: 2,
            },
            clock: Default::default(),
            reconciliation_rounds,
        }
    }

    /// Two simulators built from the same configuration must record the same
    /// trace and therefore the same fingerprint.
    #[test]
    fn same_seed_produces_identical_trace() {
        let config = SimulationConfig {
            seed: 7,
            rounds: 16,
            ..SimulationConfig::default()
        };

        let mut left = Simulator::new(config.clone()).expect("valid config");
        let mut right = Simulator::new(config).expect("valid config");

        let left_result = left.run().expect("run left");
        let right_result = right.run().expect("run right");

        assert_eq!(left_result.trace, right_result.trace);
        assert_eq!(
            left_result.trace_fingerprint(),
            right_result.trace_fingerprint()
        );
    }

    /// Replaying a captured configuration reproduces both trace and final states.
    #[test]
    fn seed_replay_reproduces_execution() {
        let config = SimulationConfig {
            seed: 1234,
            rounds: 20,
            ..SimulationConfig::default()
        };

        let original = Simulator::new(config.clone())
            .expect("valid config")
            .run()
            .expect("original run");

        let replayed = SeedReplay::from_config(&config)
            .replay()
            .expect("replayed run");

        assert_eq!(original.trace, replayed.trace);
        assert_eq!(original.states, replayed.states);
    }

    /// With aggressive fault rates the run must record at least one fault.
    #[test]
    fn network_faults_are_observable() {
        let fault = FaultConfig {
            max_delay_rounds: 3,
            drop_rate_percent: 40,
            duplicate_rate_percent: 30,
            reorder_rate_percent: 40,
            partition_rate_percent: 30,
            freeze_rate_percent: 30,
            freeze_duration_rounds: 2,
        };
        let config = SimulationConfig {
            seed: 99,
            rounds: 12,
            fanout: 3,
            fault,
            ..SimulationConfig::default()
        };

        let result = Simulator::new(config)
            .expect("valid config")
            .run()
            .expect("run");

        assert!(result.interesting_state_reached);
    }

    /// The oracle flags the agent whose event set differs from the first agent's.
    #[test]
    fn convergence_oracle_detects_divergence() {
        let complete = AgentState {
            id: 0,
            known_events: [1_u64, 2_u64, 3_u64].into_iter().collect(),
        };
        let incomplete = AgentState {
            id: 1,
            known_events: [1_u64, 3_u64].into_iter().collect(),
        };

        let report = ConvergenceOracle::evaluate(&[complete, incomplete]);

        assert!(!report.converged);
        assert_eq!(report.divergent_agents, vec![1]);
    }

    /// With reconciliation enabled every agent ends up with the same event set.
    #[test]
    fn reconciliation_heals_dropped_messages() {
        let mut simulator = Simulator::new(mild_fault_config(42, 3)).expect("valid config");
        let result = simulator.run().expect("run");

        let reference = &result.states[0].known_events;
        for state in result.states.iter().skip(1) {
            assert_eq!(
                &state.known_events, reference,
                "agent {} diverges after reconciliation",
                state.id
            );
        }
    }

    /// With reconciliation disabled at least one of the first twenty seeds
    /// must leave the agents in disagreement.
    #[test]
    fn reconciliation_disabled_allows_divergence() {
        let found_divergence = (0..20).any(|seed| {
            let mut simulator = Simulator::new(mild_fault_config(seed, 0)).expect("valid config");
            let result = simulator.run().expect("run");

            let reference = &result.states[0].known_events;
            result.states[1..]
                .iter()
                .any(|state| &state.known_events != reference)
        });

        assert!(
            found_divergence,
            "expected at least one seed to diverge without reconciliation"
        );
    }

    /// Sweeping a dozen seeds with moderate fault rates finds an interesting run.
    #[test]
    fn sometimes_assertion_reaches_interesting_state() {
        let fault = FaultConfig {
            max_delay_rounds: 2,
            drop_rate_percent: 20,
            duplicate_rate_percent: 15,
            reorder_rate_percent: 20,
            partition_rate_percent: 15,
            freeze_rate_percent: 15,
            freeze_duration_rounds: 2,
        };
        let base = SimulationConfig {
            seed: 500,
            rounds: 8,
            fault,
            ..SimulationConfig::default()
        };

        let seen = sometimes_reaches_interesting_state(&base, 12).expect("sometimes assertion");
        assert!(seen);
    }
}