moonpool_sim/runner/fault_injector.rs
//! Fault injection for simulation chaos testing.
//!
//! [`FaultInjector`] is the trait implemented by fault injection strategies (network
//! partitions, connection drops, etc.) that run during the chaos phase of a simulation.
//! [`FaultContext`] gives those strategies access to the `SimWorld` fault injection
//! primitives.
//!
//! [`PhaseConfig`] controls the two-phase chaos/recovery lifecycle:
//! - **Chaos phase**: workloads and fault injectors run concurrently.
//! - **Recovery phase**: fault injectors are stopped while workloads continue, giving the
//!   system time to heal.
//!
//! # Usage
//!
//! ```ignore
//! use async_trait::async_trait;
//! use moonpool_sim::{FaultInjector, FaultContext, PhaseConfig, SimulationResult};
//! use std::time::Duration;
//!
//! struct RandomPartition { probability: f64 }
//!
//! #[async_trait(?Send)]
//! impl FaultInjector for RandomPartition {
//!     fn name(&self) -> &str { "random_partition" }
//!
//!     async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
//!         let ips = ctx.all_ips();
//!         // Keep injecting until the chaos phase ends.
//!         while !ctx.chaos_shutdown().is_cancelled() {
//!             if ctx.random().random_bool(self.probability) && ips.len() >= 2 {
//!                 ctx.partition(&ips[0], &ips[1])?;
//!                 ctx.time().sleep(Duration::from_secs(5)).await?;
//!                 ctx.heal_partition(&ips[0], &ips[1])?;
//!             }
//!             ctx.time().sleep(Duration::from_secs(1)).await?;
//!         }
//!         Ok(())
//!     }
//! }
//! ```
36
37use std::time::Duration;
38
39use async_trait::async_trait;
40
41use crate::SimulationResult;
42use crate::providers::{SimRandomProvider, SimTimeProvider};
43use crate::sim::SimWorld;
44
/// Context for fault injectors — gives access to SimWorld fault injection methods.
///
/// Unlike `SimContext` (which workloads receive), `FaultContext` provides direct
/// access to network partitioning and other fault primitives that normal workloads
/// should not use. It also carries the seeded randomness, simulated time provider,
/// and chaos-phase shutdown token that an injector needs.
pub struct FaultContext {
    /// Simulation world whose partition primitives this context forwards to.
    sim: SimWorld,
    /// All IP addresses in the simulation, exposed via `all_ips()`.
    all_ips: Vec<String>,
    /// Seeded random provider, exposed via `random()`.
    random: SimRandomProvider,
    /// Simulated time provider, exposed via `time()`.
    time: SimTimeProvider,
    /// Token cancelled at the chaos→recovery boundary, exposed via `chaos_shutdown()`.
    chaos_shutdown: tokio_util::sync::CancellationToken,
}
57
58impl FaultContext {
59 /// Create a new fault context.
60 pub fn new(
61 sim: SimWorld,
62 all_ips: Vec<String>,
63 random: SimRandomProvider,
64 time: SimTimeProvider,
65 chaos_shutdown: tokio_util::sync::CancellationToken,
66 ) -> Self {
67 Self {
68 sim,
69 all_ips,
70 random,
71 time,
72 chaos_shutdown,
73 }
74 }
75
76 /// Create a bidirectional network partition between two IPs.
77 ///
78 /// The partition persists until [`heal_partition`](Self::heal_partition) is called.
79 pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
80 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
81 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
82 })?;
83 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
84 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
85 })?;
86 // Use a long duration — heal_partition is the expected way to undo
87 self.sim
88 .partition_pair(a_ip, b_ip, Duration::from_secs(3600))
89 }
90
91 /// Remove a network partition between two IPs.
92 pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
93 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
94 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
95 })?;
96 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
97 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
98 })?;
99 self.sim.restore_partition(a_ip, b_ip)
100 }
101
102 /// Check whether two IPs are partitioned.
103 pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
104 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
105 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
106 })?;
107 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
108 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
109 })?;
110 self.sim.is_partitioned(a_ip, b_ip)
111 }
112
113 /// Get all IP addresses in the simulation.
114 pub fn all_ips(&self) -> &[String] {
115 &self.all_ips
116 }
117
118 /// Get the seeded random provider.
119 pub fn random(&self) -> &SimRandomProvider {
120 &self.random
121 }
122
123 /// Get the simulated time provider.
124 pub fn time(&self) -> &SimTimeProvider {
125 &self.time
126 }
127
128 /// Get the chaos-phase shutdown token.
129 ///
130 /// This token is cancelled at the chaos→recovery boundary,
131 /// signaling fault injectors to stop.
132 pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
133 &self.chaos_shutdown
134 }
135}
136
137/// A fault injector that introduces failures during the chaos phase.
138///
139/// Fault injectors run concurrently with workloads during the chaos phase.
140/// When `PhaseConfig` is used, they are signaled to stop via
141/// `ctx.chaos_shutdown()` at the chaos→recovery boundary.
142#[async_trait(?Send)]
143pub trait FaultInjector: 'static {
144 /// Name of this fault injector for reporting.
145 fn name(&self) -> &str;
146
147 /// Inject faults using the provided context.
148 ///
149 /// Should respect `ctx.chaos_shutdown()` to allow graceful termination.
150 async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
151}
152
/// Timing for a two-phase (chaos, then recovery) simulation run.
///
/// Follows the TigerBeetle VOPR-style lifecycle:
/// 1. **Chaos** — for `chaos_duration`, workloads and fault injectors execute side by
///    side, and invariants are checked after every simulation event.
/// 2. **Recovery** — for `recovery_duration`, fault injectors are halted while workloads
///    keep going, so the run can confirm the system converges once faults stop.
#[derive(Clone, Debug)]
pub struct PhaseConfig {
    /// How long faults and workloads run together.
    pub chaos_duration: Duration,
    /// How long workloads keep running after faults have stopped.
    pub recovery_duration: Duration,
}