moonpool_sim/runner/fault_injector.rs
1//! Fault injection for simulation chaos testing.
2//!
3//! [`FaultInjector`] defines fault injection strategies (partitions, connection drops, etc.)
4//! that run during the chaos phase of a simulation. [`FaultContext`] provides access to
5//! `SimWorld` fault injection primitives.
6//!
7//! [`PhaseConfig`] controls the two-phase chaos/recovery lifecycle:
8//! - **Chaos phase**: Workloads + fault injectors run concurrently
9//! - **Recovery phase**: Fault injectors stopped, workloads continue, system heals
10//!
11//! # Usage
12//!
13//! ```ignore
//! use async_trait::async_trait;
//! use moonpool_sim::{FaultInjector, FaultContext, PhaseConfig, SimulationResult};
15//! use std::time::Duration;
16//!
17//! struct RandomPartition { probability: f64 }
18//!
19//! #[async_trait(?Send)]
20//! impl FaultInjector for RandomPartition {
21//! fn name(&self) -> &str { "random_partition" }
22//! async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
23//! let ips = ctx.all_ips();
24//! while !ctx.chaos_shutdown().is_cancelled() {
25//! if ctx.random().random_bool(self.probability) && ips.len() >= 2 {
26//! ctx.partition(&ips[0], &ips[1])?;
27//! ctx.time().sleep(Duration::from_secs(5)).await?;
28//! ctx.heal_partition(&ips[0], &ips[1])?;
29//! }
30//! ctx.time().sleep(Duration::from_secs(1)).await?;
31//! }
32//! Ok(())
33//! }
34//! }
35//! ```
36
37use std::time::Duration;
38
39use async_trait::async_trait;
40use moonpool_core::TimeProvider;
41
42use crate::SimulationResult;
43use crate::providers::{SimRandomProvider, SimTimeProvider};
44use crate::runner::process::RebootKind;
45use crate::runner::tags::TagRegistry;
46use crate::sim::SimWorld;
47use crate::{assert_reachable, assert_sometimes_each};
48
49/// Process-related state for fault injection targeting.
50pub struct ProcessInfo {
51 /// Server process IP addresses.
52 pub process_ips: Vec<String>,
53 /// Tag registry mapping process IPs to their resolved tags.
54 pub tag_registry: TagRegistry,
55 /// Shared count of currently dead (killed but not yet restarted) processes.
56 pub dead_count: std::rc::Rc<std::cell::Cell<usize>>,
57}
58
59/// Context for fault injectors — gives access to SimWorld fault injection methods.
60///
61/// Unlike `SimContext` (which workloads receive), `FaultContext` provides direct
62/// access to network partitioning, reboot, and other fault primitives that normal
63/// workloads should not use.
64pub struct FaultContext {
65 sim: SimWorld,
66 all_ips: Vec<String>,
67 process_info: ProcessInfo,
68 random: SimRandomProvider,
69 time: SimTimeProvider,
70 chaos_shutdown: tokio_util::sync::CancellationToken,
71}
72
73impl FaultContext {
74 /// Create a new fault context with process information.
75 pub fn new(
76 sim: SimWorld,
77 all_ips: Vec<String>,
78 process_info: ProcessInfo,
79 random: SimRandomProvider,
80 time: SimTimeProvider,
81 chaos_shutdown: tokio_util::sync::CancellationToken,
82 ) -> Self {
83 Self {
84 sim,
85 all_ips,
86 process_info,
87 random,
88 time,
89 chaos_shutdown,
90 }
91 }
92
93 /// Get the number of currently dead (killed but not yet restarted) processes.
94 pub fn dead_count(&self) -> usize {
95 self.process_info.dead_count.get()
96 }
97
98 /// Create a bidirectional network partition between two IPs.
99 ///
100 /// The partition persists until [`heal_partition`](Self::heal_partition) is called.
101 pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
102 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
103 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
104 })?;
105 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
106 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
107 })?;
108 // Use a long duration — heal_partition is the expected way to undo
109 self.sim
110 .partition_pair(a_ip, b_ip, Duration::from_secs(3600))
111 }
112
113 /// Remove a network partition between two IPs.
114 pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
115 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
116 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
117 })?;
118 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
119 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
120 })?;
121 self.sim.restore_partition(a_ip, b_ip)
122 }
123
124 /// Check whether two IPs are partitioned.
125 pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
126 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
127 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
128 })?;
129 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
130 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
131 })?;
132 self.sim.is_partitioned(a_ip, b_ip)
133 }
134
135 /// Get all IP addresses in the simulation.
136 pub fn all_ips(&self) -> &[String] {
137 &self.all_ips
138 }
139
140 /// Get the seeded random provider.
141 pub fn random(&self) -> &SimRandomProvider {
142 &self.random
143 }
144
145 /// Get the simulated time provider.
146 pub fn time(&self) -> &SimTimeProvider {
147 &self.time
148 }
149
150 /// Get the chaos-phase shutdown token.
151 ///
152 /// This token is cancelled at the chaos→recovery boundary,
153 /// signaling fault injectors to stop.
154 pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
155 &self.chaos_shutdown
156 }
157
158 /// Get all server process IPs.
159 pub fn process_ips(&self) -> &[String] {
160 &self.process_info.process_ips
161 }
162
163 /// Reboot a specific process by IP.
164 ///
165 /// For [`RebootKind::Graceful`]: schedules a `ProcessGracefulShutdown` event.
166 /// The orchestrator cancels the per-process shutdown token, giving the process
167 /// a grace period to drain buffers and clean up. After the grace period,
168 /// a force-kill aborts the task and connections, then schedules restart.
169 ///
170 /// For [`RebootKind::Crash`] and [`RebootKind::CrashAndWipe`]: immediately
171 /// aborts all connections and schedules a `ProcessRestart` event.
172 pub fn reboot(&self, ip: &str, kind: RebootKind) -> SimulationResult<()> {
173 let recovery_range = 1000..10000;
174 let grace_range = 2000..5000;
175 self.reboot_with_delays(ip, kind, &recovery_range, &grace_range)
176 }
177
178 /// Reboot a process with custom delay ranges.
179 ///
180 /// Like [`reboot`](Self::reboot) but with configurable recovery delay and
181 /// grace period ranges (in milliseconds). Used by [`AttritionInjector`] to
182 /// pass through [`Attrition`](super::process::Attrition) configuration.
183 pub fn reboot_with_delays(
184 &self,
185 ip: &str,
186 kind: RebootKind,
187 recovery_delay_range_ms: &std::ops::Range<usize>,
188 grace_period_range_ms: &std::ops::Range<usize>,
189 ) -> SimulationResult<()> {
190 let ip_addr: std::net::IpAddr = ip.parse().map_err(|e| {
191 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", ip, e))
192 })?;
193
194 match kind {
195 RebootKind::Graceful => {
196 assert_reachable!("reboot: graceful path");
197 let grace_ms = crate::sim::sim_random_range(grace_period_range_ms.clone()) as u64;
198 let recovery_ms =
199 crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
200 self.sim.schedule_event(
201 crate::sim::Event::ProcessGracefulShutdown {
202 ip: ip_addr,
203 grace_period_ms: grace_ms,
204 recovery_delay_ms: recovery_ms,
205 },
206 Duration::from_nanos(1),
207 );
208 tracing::info!(
209 "Initiated graceful reboot for process at IP {} (grace={}ms, recovery={}ms)",
210 ip,
211 grace_ms,
212 recovery_ms
213 );
214 }
215 RebootKind::Crash | RebootKind::CrashAndWipe => {
216 assert_reachable!("reboot: crash path");
217 self.sim.abort_all_connections_for_ip(ip_addr);
218 self.process_info
219 .dead_count
220 .set(self.process_info.dead_count.get() + 1);
221 let delay_ms = crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
222 let recovery_delay = Duration::from_millis(delay_ms);
223 self.sim.schedule_process_restart(ip_addr, recovery_delay);
224 tracing::info!(
225 "Crashed process at IP {} (recovery in {:?})",
226 ip,
227 recovery_delay
228 );
229 }
230 }
231
232 Ok(())
233 }
234
235 /// Reboot a random alive server process.
236 ///
237 /// Picks a random process from the process IP list and reboots it.
238 /// Returns `Ok(None)` if no processes are available.
239 pub fn reboot_random(&self, kind: RebootKind) -> SimulationResult<Option<String>> {
240 if self.process_info.process_ips.is_empty() {
241 return Ok(None);
242 }
243 let idx = crate::sim::sim_random_range(0..self.process_info.process_ips.len());
244 let ip = self.process_info.process_ips[idx].clone();
245 self.reboot(&ip, kind)?;
246 Ok(Some(ip))
247 }
248
249 /// Reboot all processes matching a tag key=value pair.
250 pub fn reboot_tagged(
251 &self,
252 key: &str,
253 value: &str,
254 kind: RebootKind,
255 ) -> SimulationResult<Vec<String>> {
256 let matching_ips: Vec<String> = self
257 .process_info
258 .tag_registry
259 .ips_tagged(key, value)
260 .into_iter()
261 .map(|ip| ip.to_string())
262 .collect();
263
264 for ip in &matching_ips {
265 self.reboot(ip, kind)?;
266 }
267
268 Ok(matching_ips)
269 }
270}
271
272/// A fault injector that introduces failures during the chaos phase.
273///
274/// Fault injectors run concurrently with workloads during the chaos phase.
275/// When `PhaseConfig` is used, they are signaled to stop via
276/// `ctx.chaos_shutdown()` at the chaos→recovery boundary.
277#[async_trait(?Send)]
278pub trait FaultInjector: 'static {
279 /// Name of this fault injector for reporting.
280 fn name(&self) -> &str;
281
282 /// Inject faults using the provided context.
283 ///
284 /// Should respect `ctx.chaos_shutdown()` to allow graceful termination.
285 async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
286}
287
/// Two-phase simulation configuration.
///
/// Controls the TigerBeetle VOPR-style chaos/recovery lifecycle:
/// 1. **Chaos phase** (`chaos_duration`): workloads and fault injectors run
///    concurrently. Invariants are checked after every simulation event.
/// 2. **Recovery phase** (`recovery_duration`): fault injectors are stopped while
///    workloads continue and the system heals. Verifies convergence after faults
///    cease.
///
/// Both fields are plain [`Duration`]s, so the config supports exact equality
/// (`Eq`) and hashing and can be used as a set or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhaseConfig {
    /// Duration of the chaos phase (faults + workloads run concurrently).
    pub chaos_duration: Duration,
    /// Duration of the recovery phase (faults stopped, workloads continue).
    pub recovery_duration: Duration,
}
302
303/// Built-in fault injector that randomly reboots server processes.
304///
305/// Active only during the chaos phase. Respects `max_dead` to limit the
306/// number of simultaneously dead processes. The reboot type is chosen by
307/// weighted probability from the [`Attrition`](super::process::Attrition) config.
308pub(crate) struct AttritionInjector {
309 config: super::process::Attrition,
310}
311
312impl AttritionInjector {
313 /// Create a new attrition injector from the given configuration.
314 pub(crate) fn new(config: super::process::Attrition) -> Self {
315 Self { config }
316 }
317}
318
319#[async_trait(?Send)]
320impl FaultInjector for AttritionInjector {
321 fn name(&self) -> &str {
322 "attrition"
323 }
324
325 async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
326 while !ctx.chaos_shutdown().is_cancelled() {
327 // Random delay between reboot attempts (1-5 seconds)
328 let delay_ms = crate::sim::sim_random_range(1000..5000);
329 ctx.time()
330 .sleep(Duration::from_millis(delay_ms as u64))
331 .await
332 .map_err(|e| {
333 crate::SimulationError::InvalidState(format!("sleep failed: {}", e))
334 })?;
335
336 if ctx.chaos_shutdown().is_cancelled() {
337 break;
338 }
339
340 if ctx.process_ips().is_empty() {
341 continue;
342 }
343
344 // Respect max_dead: skip this cycle if already at the limit
345 if ctx.dead_count() >= self.config.max_dead {
346 assert_reachable!("attrition: max_dead limit enforced");
347 continue;
348 }
349
350 // Choose reboot kind by weighted probability
351 let rand_val = crate::sim::sim_random_range(0..10000) as f64 / 10000.0;
352 let kind = self.config.choose_kind(rand_val);
353 assert_sometimes_each!("attrition_reboot_kind", [("kind", kind as i64)]);
354
355 // Use configured delay ranges (or defaults)
356 let recovery_range = self.config.recovery_delay_ms.clone().unwrap_or(1000..10000);
357 let grace_range = self.config.grace_period_ms.clone().unwrap_or(2000..5000);
358
359 if ctx.process_ips().is_empty() {
360 continue;
361 }
362 let idx = crate::sim::sim_random_range(0..ctx.process_ips().len());
363 let ip = ctx.process_ips()[idx].to_string();
364 assert_sometimes_each!("attrition_process_targeted", [("process_idx", idx as i64)]);
365 ctx.reboot_with_delays(&ip, kind, &recovery_range, &grace_range)?;
366 }
367 Ok(())
368 }
369}