1#![forbid(unsafe_code)]
2pub mod agent;
5pub mod campaign;
6pub mod clock;
7pub mod network;
8pub mod oracle;
9pub mod rng;
10
11use anyhow::{Result, bail};
12use serde::{Deserialize, Serialize};
13
14use crate::agent::{AgentId, AgentState, SimulatedAgent};
15use crate::clock::{ClockConfig, ClockSpec, SimulatedClock};
16use crate::network::{FaultConfig, NetworkMessage, SimulatedNetwork};
17use crate::oracle::{ConvergenceOracle, ConvergenceReport};
18use crate::rng::DeterministicRng;
19
/// Why a message was recorded as dropped in the simulation trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DropReason {
    /// The network reported a drop while neither endpoint was marked partitioned.
    RandomLoss,
    /// The network reported a drop while the sender or the receiver was marked partitioned.
    Partition,
}
28
/// One entry of a simulation trace: what happened, and in which round.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceEvent {
    /// Round in which the event was recorded. Events produced while draining
    /// in-flight messages carry round numbers starting at the configured `rounds`.
    pub round: u64,
    /// The kind of event and its payload.
    pub kind: TraceEventKind,
}
37
/// Everything the simulator can record in a trace.
///
/// The `Debug` rendering of these values is what `SimulationResult::trace_fingerprint`
/// hashes, so renaming or reordering variants or fields changes fingerprints.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TraceEventKind {
    /// An agent produced a new event.
    Emit {
        /// Source agent reported by the emitted event.
        agent: AgentId,
        /// Identifier of the emitted event.
        event_id: u64,
        /// Sequence number reported by the emitted event.
        seq: u64,
    },
    /// A message was handed to the network and was not dropped.
    Send {
        /// Sending agent.
        from: AgentId,
        /// Receiving agent.
        to: AgentId,
        /// Identifier of the event carried by the message.
        event_id: u64,
        /// Delay, in rounds, as reported by the network for this send.
        delay_rounds: u8,
        /// Whether the network reported the message as duplicated.
        duplicated: bool,
    },
    /// A message was handed to the network and was dropped.
    Drop {
        /// Sending agent.
        from: AgentId,
        /// Intended receiving agent.
        to: AgentId,
        /// Identifier of the event carried by the message.
        event_id: u64,
        /// Whether a partitioned endpoint or random loss is blamed for the drop.
        reason: DropReason,
    },
    /// A message came out of the network and was handed to its receiver.
    Deliver {
        /// Sending agent.
        from: AgentId,
        /// Receiving agent.
        to: AgentId,
        /// Identifier of the event carried by the message.
        event_id: u64,
    },
    /// The network reported that a delivery batch was reordered.
    Reorder {
        /// Number of messages in the batch that was reordered.
        delivered_count: usize,
    },
    /// The partition flag of an agent was toggled.
    Partition {
        /// Agent whose flag changed.
        agent: AgentId,
        /// New value of the flag (`true` means the agent is now marked partitioned).
        isolated: bool,
    },
    /// The clock of an agent was frozen.
    ClockFreeze {
        /// Agent whose clock was frozen.
        agent: AgentId,
        /// The clock is released at the start of the first round `>= until_round`.
        until_round: u64,
    },
}
103
/// Full description of one simulation run; together with the code it determines the trace.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SimulationConfig {
    /// Seed for the deterministic RNG that drives every random decision.
    pub seed: u64,
    /// Number of simulated agents. `Simulator::new` rejects `0`.
    pub agent_count: usize,
    /// Number of regular rounds to simulate. `Simulator::new` rejects `0`.
    pub rounds: u64,
    /// How many distinct peers each emitted event is sent to, capped at `agent_count - 1`.
    pub fanout: usize,
    /// Fault-injection settings: handed to the simulated network, and also the source of
    /// the partition and clock-freeze rates used by the simulator itself.
    pub fault: FaultConfig,
    /// Settings the per-agent clocks are built from (base, tick, drift and skew bounds).
    pub clock: ClockConfig,
}
120
121impl Default for SimulationConfig {
122 fn default() -> Self {
123 Self {
124 seed: 42,
125 agent_count: 4,
126 rounds: 24,
127 fanout: 2,
128 fault: FaultConfig::default(),
129 clock: ClockConfig::default(),
130 }
131 }
132}
133
/// A stored configuration (seed included) from which a run can be repeated.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SeedReplay {
    /// The configuration that `replay` feeds to a fresh `Simulator`.
    pub config: SimulationConfig,
}
140
141impl SeedReplay {
142 #[must_use]
144 pub fn from_config(config: &SimulationConfig) -> Self {
145 Self {
146 config: config.clone(),
147 }
148 }
149
150 pub fn replay(&self) -> Result<SimulationResult> {
156 let mut simulator = Simulator::new(self.config.clone())?;
157 simulator.run()
158 }
159}
160
/// Everything a finished simulation run produces.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimulationResult {
    /// Every recorded event, in the order it was recorded.
    pub trace: Vec<TraceEvent>,
    /// Snapshot of each agent taken after the final drain.
    pub states: Vec<AgentState>,
    /// Verdict of the convergence oracle over `states`.
    pub convergence: ConvergenceReport,
    /// `true` when the trace contains at least one `Drop`, `Reorder`, `Partition`
    /// or `ClockFreeze` event.
    pub interesting_state_reached: bool,
}
173
174impl SimulationResult {
175 #[must_use]
177 pub fn trace_fingerprint(&self) -> u64 {
178 self.trace.iter().fold(0_u64, |acc, item| {
179 let encoded = format!("{item:?}");
180 encoded.as_bytes().iter().fold(acc, |inner, byte| {
181 inner.wrapping_mul(131).wrapping_add(u64::from(*byte))
182 })
183 })
184 }
185}
186
/// Drives a set of simulated agents, clocks and a faulty network from one seeded RNG.
pub struct Simulator {
    /// The configuration this simulator was built from (validated by `new`).
    config: SimulationConfig,
    /// One agent per index in `0..agent_count`.
    agents: Vec<SimulatedAgent>,
    /// One clock per agent, with drift and skew sampled at construction.
    clocks: Vec<SimulatedClock>,
    /// Per agent: the round at which its frozen clock is released, or `None`
    /// when no release is pending.
    clock_unfreeze_round: Vec<Option<u64>>,
    /// The simulated network that messages are sent through and delivered from.
    network: SimulatedNetwork,
    /// Per agent partition flag; each change is also pushed to `network`.
    partitioned: Vec<bool>,
    /// The single RNG behind every random decision made by the simulator.
    rng: DeterministicRng,
}
197
198impl Simulator {
199 pub fn new(config: SimulationConfig) -> Result<Self> {
205 if config.agent_count == 0 {
206 bail!("agent_count must be > 0");
207 }
208 if config.rounds == 0 {
209 bail!("rounds must be > 0");
210 }
211
212 let mut rng = DeterministicRng::new(config.seed);
213 let mut clocks = Vec::with_capacity(config.agent_count);
214
215 for _ in 0..config.agent_count {
216 let drift_ppm = sample_signed_i32(&mut rng, config.clock.max_abs_drift_ppm);
217 let skew_millis = sample_signed_i64(&mut rng, config.clock.max_abs_skew_millis);
218 clocks.push(SimulatedClock::new(ClockSpec {
219 base_millis: config.clock.base_millis,
220 tick_millis: config.clock.tick_millis,
221 drift_ppm,
222 skew_millis,
223 }));
224 }
225
226 let agents = (0..config.agent_count)
227 .map(SimulatedAgent::new)
228 .collect::<Vec<_>>();
229
230 Ok(Self {
231 clock_unfreeze_round: vec![None; config.agent_count],
232 partitioned: vec![false; config.agent_count],
233 network: SimulatedNetwork::new(config.fault),
234 config,
235 agents,
236 clocks,
237 rng,
238 })
239 }
240
241 pub fn run(&mut self) -> Result<SimulationResult> {
247 let mut trace = Vec::new();
248
249 for round in 0..self.config.rounds {
250 self.progress_clock_freezes(round);
251 self.maybe_toggle_partition(round, &mut trace);
252 self.maybe_freeze_clock(round, &mut trace);
253
254 for agent_idx in 0..self.agents.len() {
255 let emitted = self.agents[agent_idx].emit_event();
256 trace.push(TraceEvent {
257 round,
258 kind: TraceEventKind::Emit {
259 agent: emitted.source,
260 event_id: emitted.event_id,
261 seq: emitted.seq,
262 },
263 });
264
265 let targets = self.pick_targets(agent_idx);
266 for target in targets {
267 let message = NetworkMessage {
268 from: agent_idx,
269 to: target,
270 event_id: emitted.event_id,
271 seq: emitted.seq,
272 };
273
274 let partition_blocked = self.network.is_partitioned(message.from)
275 || self.network.is_partitioned(message.to);
276
277 let outcome = self.network.send(message, round, &mut self.rng);
278
279 if outcome.dropped {
280 trace.push(TraceEvent {
281 round,
282 kind: TraceEventKind::Drop {
283 from: message.from,
284 to: message.to,
285 event_id: message.event_id,
286 reason: if partition_blocked {
287 DropReason::Partition
288 } else {
289 DropReason::RandomLoss
290 },
291 },
292 });
293 } else {
294 trace.push(TraceEvent {
295 round,
296 kind: TraceEventKind::Send {
297 from: message.from,
298 to: message.to,
299 event_id: message.event_id,
300 delay_rounds: outcome.delay_rounds,
301 duplicated: outcome.duplicated,
302 },
303 });
304 }
305 }
306 }
307
308 self.deliver_round(round, &mut trace);
309 }
310
311 self.final_drain(&mut trace);
312
313 let states = self
314 .agents
315 .iter()
316 .map(SimulatedAgent::snapshot)
317 .collect::<Vec<_>>();
318
319 let convergence = ConvergenceOracle::evaluate(&states);
320 let interesting_state_reached = trace.iter().any(|event| {
321 matches!(
322 event.kind,
323 TraceEventKind::Drop { .. }
324 | TraceEventKind::Reorder { .. }
325 | TraceEventKind::Partition { .. }
326 | TraceEventKind::ClockFreeze { .. }
327 )
328 });
329
330 Ok(SimulationResult {
331 trace,
332 states,
333 convergence,
334 interesting_state_reached,
335 })
336 }
337
338 fn deliver_round(&mut self, round: u64, trace: &mut Vec<TraceEvent>) {
339 let delivered = self.network.deliver_ready(round, &mut self.rng);
340
341 if delivered.reordered {
342 trace.push(TraceEvent {
343 round,
344 kind: TraceEventKind::Reorder {
345 delivered_count: delivered.delivered.len(),
346 },
347 });
348 }
349
350 for message in delivered.delivered {
351 if let Some(agent) = self.agents.get_mut(message.to) {
352 agent.observe_event(message.event_id);
353 }
354 trace.push(TraceEvent {
355 round,
356 kind: TraceEventKind::Deliver {
357 from: message.from,
358 to: message.to,
359 event_id: message.event_id,
360 },
361 });
362 }
363 }
364
365 fn final_drain(&mut self, trace: &mut Vec<TraceEvent>) {
366 let mut drain_round = self.config.rounds;
367 let drain_limit = self.config.rounds.saturating_add(1_000);
368
369 while self.network.pending_len() > 0 && drain_round < drain_limit {
370 self.deliver_round(drain_round, trace);
371 drain_round = drain_round.saturating_add(1);
372 }
373 }
374
375 fn pick_targets(&mut self, source: AgentId) -> Vec<AgentId> {
376 if self.agents.len() <= 1 {
377 return Vec::new();
378 }
379
380 let max_targets = self.config.fanout.min(self.agents.len().saturating_sub(1));
381 let mut targets = Vec::new();
382
383 while targets.len() < max_targets {
384 let len_u64 = u64::try_from(self.agents.len()).unwrap_or(1);
385 let candidate_u64 = self.rng.next_bounded(len_u64);
386 let candidate = usize::try_from(candidate_u64).unwrap_or(0);
387 if candidate != source && !targets.contains(&candidate) {
388 targets.push(candidate);
389 }
390 }
391
392 targets
393 }
394
395 fn maybe_toggle_partition(&mut self, round: u64, trace: &mut Vec<TraceEvent>) {
396 if !self
397 .rng
398 .hit_rate_percent(self.config.fault.partition_rate_percent)
399 {
400 return;
401 }
402
403 let len_u64 = u64::try_from(self.partitioned.len()).unwrap_or(1);
404 let idx_u64 = self.rng.next_bounded(len_u64);
405 let idx = usize::try_from(idx_u64).unwrap_or(0);
406
407 self.partitioned[idx] = !self.partitioned[idx];
408 self.network.set_partitioned(idx, self.partitioned[idx]);
409
410 trace.push(TraceEvent {
411 round,
412 kind: TraceEventKind::Partition {
413 agent: idx,
414 isolated: self.partitioned[idx],
415 },
416 });
417 }
418
419 fn progress_clock_freezes(&mut self, round: u64) {
420 for (idx, maybe_until) in self.clock_unfreeze_round.iter_mut().enumerate() {
421 if let Some(until_round) = *maybe_until
422 && round >= until_round
423 {
424 if let Some(clock) = self.clocks.get_mut(idx) {
425 clock.unfreeze();
426 }
427 *maybe_until = None;
428 }
429 }
430 }
431
432 fn maybe_freeze_clock(&mut self, round: u64, trace: &mut Vec<TraceEvent>) {
433 if self.config.fault.freeze_duration_rounds == 0 {
434 return;
435 }
436 if !self
437 .rng
438 .hit_rate_percent(self.config.fault.freeze_rate_percent)
439 {
440 return;
441 }
442
443 let len_u64 = u64::try_from(self.clocks.len()).unwrap_or(1);
444 let idx_u64 = self.rng.next_bounded(len_u64);
445 let idx = usize::try_from(idx_u64).unwrap_or(0);
446
447 if let Some(clock) = self.clocks.get_mut(idx)
448 && !clock.is_frozen()
449 {
450 clock.freeze(round);
451 let until_round =
452 round.saturating_add(u64::from(self.config.fault.freeze_duration_rounds));
453 self.clock_unfreeze_round[idx] = Some(until_round);
454 trace.push(TraceEvent {
455 round,
456 kind: TraceEventKind::ClockFreeze {
457 agent: idx,
458 until_round,
459 },
460 });
461 }
462 }
463}
464
465pub fn sometimes_reaches_interesting_state(base: &SimulationConfig, attempts: u32) -> Result<bool> {
471 let mut seed = base.seed;
472
473 for _ in 0..attempts {
474 let mut config = base.clone();
475 config.seed = seed;
476
477 let mut simulator = Simulator::new(config)?;
478 let result = simulator.run()?;
479 if result.interesting_state_reached {
480 return Ok(true);
481 }
482
483 seed = seed.saturating_add(1);
484 }
485
486 Ok(false)
487}
488
489fn sample_signed_i32(rng: &mut DeterministicRng, max_abs: i32) -> i32 {
490 if max_abs <= 0 {
491 return 0;
492 }
493
494 let span = i64::from(max_abs)
495 .saturating_mul(2)
496 .saturating_add(1)
497 .max(1);
498 let span_u64 = u64::try_from(span).unwrap_or(1);
499 let sampled = i64::try_from(rng.next_bounded(span_u64)).unwrap_or(0) - i64::from(max_abs);
500
501 i32::try_from(sampled).unwrap_or(0)
502}
503
504fn sample_signed_i64(rng: &mut DeterministicRng, max_abs: i64) -> i64 {
505 if max_abs <= 0 {
506 return 0;
507 }
508
509 let span = max_abs.saturating_mul(2).saturating_add(1).max(1);
510 let span_u64 = u64::try_from(span).unwrap_or(1);
511 let sampled = i64::try_from(rng.next_bounded(span_u64)).unwrap_or(0);
512
513 sampled.saturating_sub(max_abs)
514}
515
#[cfg(test)]
mod tests {
    use crate::agent::AgentState;
    use crate::oracle::ConvergenceOracle;

    use super::{
        FaultConfig, SeedReplay, SimulationConfig, Simulator, sometimes_reaches_interesting_state,
    };

    /// Two simulators built from the same configuration must produce the same trace
    /// and the same fingerprint.
    #[test]
    fn same_seed_produces_identical_trace() {
        let shared = SimulationConfig {
            seed: 7,
            rounds: 16,
            ..SimulationConfig::default()
        };

        let mut first = Simulator::new(shared.clone()).expect("valid config");
        let mut second = Simulator::new(shared).expect("valid config");

        let first_result = first.run().expect("run left");
        let second_result = second.run().expect("run right");

        assert_eq!(first_result.trace, second_result.trace);

        let first_print = first_result.trace_fingerprint();
        let second_print = second_result.trace_fingerprint();
        assert_eq!(first_print, second_print);
    }

    /// Replaying a stored configuration must reproduce both the trace and the final states.
    #[test]
    fn seed_replay_reproduces_execution() {
        let config = SimulationConfig {
            seed: 1234,
            rounds: 20,
            ..SimulationConfig::default()
        };

        let original = Simulator::new(config.clone())
            .expect("valid config")
            .run()
            .expect("original run");

        let replayed = SeedReplay::from_config(&config)
            .replay()
            .expect("replayed run");

        assert_eq!(original.trace, replayed.trace);
        assert_eq!(original.states, replayed.states);
    }

    /// With aggressive fault rates, a single run must record at least one fault event.
    #[test]
    fn network_faults_are_observable() {
        let fault = FaultConfig {
            max_delay_rounds: 3,
            drop_rate_percent: 40,
            duplicate_rate_percent: 30,
            reorder_rate_percent: 40,
            partition_rate_percent: 30,
            freeze_rate_percent: 30,
            freeze_duration_rounds: 2,
        };
        let config = SimulationConfig {
            seed: 99,
            rounds: 12,
            fanout: 3,
            fault,
            ..SimulationConfig::default()
        };

        let outcome = Simulator::new(config)
            .expect("valid config")
            .run()
            .expect("run");

        assert!(outcome.interesting_state_reached);
    }

    /// The oracle must flag the agent that is missing an event the other agent knows.
    #[test]
    fn convergence_oracle_detects_divergence() {
        let complete = AgentState {
            id: 0,
            known_events: [1_u64, 2_u64, 3_u64].into_iter().collect(),
        };
        let lagging = AgentState {
            id: 1,
            known_events: [1_u64, 3_u64].into_iter().collect(),
        };

        let report = ConvergenceOracle::evaluate(&[complete, lagging]);

        assert!(!report.converged);
        assert_eq!(report.divergent_agents, vec![1]);
    }

    /// Sweeping a dozen consecutive seeds with moderate fault rates must hit an
    /// interesting state at least once.
    #[test]
    fn sometimes_assertion_reaches_interesting_state() {
        let fault = FaultConfig {
            max_delay_rounds: 2,
            drop_rate_percent: 20,
            duplicate_rate_percent: 15,
            reorder_rate_percent: 20,
            partition_rate_percent: 15,
            freeze_rate_percent: 15,
            freeze_duration_rounds: 2,
        };
        let base = SimulationConfig {
            seed: 500,
            rounds: 8,
            fault,
            ..SimulationConfig::default()
        };

        let seen = sometimes_reaches_interesting_state(&base, 12).expect("sometimes assertion");
        assert!(seen);
    }
}