Skip to main content

moonpool_sim/runner/
fault_injector.rs

//! Fault injection for simulation chaos testing.
//!
//! [`FaultInjector`] defines fault injection strategies (partitions, connection drops, etc.)
//! that run during the chaos phase of a simulation. [`FaultContext`] provides access to
//! `SimWorld` fault injection primitives.
//!
//! [`PhaseConfig`] controls the two-phase chaos/recovery lifecycle:
//! - **Chaos phase**: Workloads + fault injectors run concurrently
//! - **Recovery phase**: Fault injectors stopped, workloads continue, system heals
//!
//! # Usage
//!
//! ```ignore
//! use moonpool_sim::{FaultInjector, FaultContext, PhaseConfig, SimulationResult};
//! use std::time::Duration;
//!
//! struct RandomPartition { probability: f64 }
//!
//! #[async_trait(?Send)]
//! impl FaultInjector for RandomPartition {
//!     fn name(&self) -> &str { "random_partition" }
//!     async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
//!         let ips = ctx.all_ips();
//!         while !ctx.chaos_shutdown().is_cancelled() {
//!             if ctx.random().random_bool(self.probability) && ips.len() >= 2 {
//!                 ctx.partition(&ips[0], &ips[1])?;
//!                 ctx.time().sleep(Duration::from_secs(5)).await?;
//!                 ctx.heal_partition(&ips[0], &ips[1])?;
//!             }
//!             ctx.time().sleep(Duration::from_secs(1)).await?;
//!         }
//!         Ok(())
//!     }
//! }
//! ```
36
37use std::time::Duration;
38
39use async_trait::async_trait;
40use moonpool_core::TimeProvider;
41
42use crate::SimulationResult;
43use crate::providers::{SimRandomProvider, SimTimeProvider};
44use crate::runner::process::RebootKind;
45use crate::runner::tags::TagRegistry;
46use crate::sim::SimWorld;
47use crate::{assert_reachable, assert_sometimes_each};
48
49/// Process-related state for fault injection targeting.
50pub struct ProcessInfo {
51    /// Server process IP addresses.
52    pub process_ips: Vec<String>,
53    /// Tag registry mapping process IPs to their resolved tags.
54    pub tag_registry: TagRegistry,
55    /// Shared count of currently dead (killed but not yet restarted) processes.
56    pub dead_count: std::rc::Rc<std::cell::Cell<usize>>,
57}
58
59/// Context for fault injectors — gives access to SimWorld fault injection methods.
60///
61/// Unlike `SimContext` (which workloads receive), `FaultContext` provides direct
62/// access to network partitioning, reboot, and other fault primitives that normal
63/// workloads should not use.
64pub struct FaultContext {
65    sim: SimWorld,
66    all_ips: Vec<String>,
67    process_info: ProcessInfo,
68    random: SimRandomProvider,
69    time: SimTimeProvider,
70    chaos_shutdown: tokio_util::sync::CancellationToken,
71}
72
73impl FaultContext {
74    /// Create a new fault context with process information.
75    pub fn new(
76        sim: SimWorld,
77        all_ips: Vec<String>,
78        process_info: ProcessInfo,
79        random: SimRandomProvider,
80        time: SimTimeProvider,
81        chaos_shutdown: tokio_util::sync::CancellationToken,
82    ) -> Self {
83        Self {
84            sim,
85            all_ips,
86            process_info,
87            random,
88            time,
89            chaos_shutdown,
90        }
91    }
92
93    /// Get the number of currently dead (killed but not yet restarted) processes.
94    pub fn dead_count(&self) -> usize {
95        self.process_info.dead_count.get()
96    }
97
98    /// Create a bidirectional network partition between two IPs.
99    ///
100    /// The partition persists until [`heal_partition`](Self::heal_partition) is called.
101    pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
102        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
103            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
104        })?;
105        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
106            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
107        })?;
108        // Use a long duration — heal_partition is the expected way to undo
109        self.sim
110            .partition_pair(a_ip, b_ip, Duration::from_secs(3600))
111    }
112
113    /// Remove a network partition between two IPs.
114    pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
115        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
116            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
117        })?;
118        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
119            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
120        })?;
121        self.sim.restore_partition(a_ip, b_ip)
122    }
123
124    /// Check whether two IPs are partitioned.
125    pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
126        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
127            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
128        })?;
129        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
130            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
131        })?;
132        self.sim.is_partitioned(a_ip, b_ip)
133    }
134
135    /// Get all IP addresses in the simulation.
136    pub fn all_ips(&self) -> &[String] {
137        &self.all_ips
138    }
139
140    /// Get the seeded random provider.
141    pub fn random(&self) -> &SimRandomProvider {
142        &self.random
143    }
144
145    /// Get the simulated time provider.
146    pub fn time(&self) -> &SimTimeProvider {
147        &self.time
148    }
149
150    /// Get the chaos-phase shutdown token.
151    ///
152    /// This token is cancelled at the chaos→recovery boundary,
153    /// signaling fault injectors to stop.
154    pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
155        &self.chaos_shutdown
156    }
157
158    /// Get all server process IPs.
159    pub fn process_ips(&self) -> &[String] {
160        &self.process_info.process_ips
161    }
162
163    /// Reboot a specific process by IP.
164    ///
165    /// For [`RebootKind::Graceful`]: schedules a `ProcessGracefulShutdown` event.
166    /// The orchestrator cancels the per-process shutdown token, giving the process
167    /// a grace period to drain buffers and clean up. After the grace period,
168    /// a force-kill aborts the task and connections, then schedules restart.
169    ///
170    /// For [`RebootKind::Crash`] and [`RebootKind::CrashAndWipe`]: immediately
171    /// aborts all connections and schedules a `ProcessRestart` event.
172    pub fn reboot(&self, ip: &str, kind: RebootKind) -> SimulationResult<()> {
173        let recovery_range = 1000..10000;
174        let grace_range = 2000..5000;
175        self.reboot_with_delays(ip, kind, &recovery_range, &grace_range)
176    }
177
178    /// Reboot a process with custom delay ranges.
179    ///
180    /// Like [`reboot`](Self::reboot) but with configurable recovery delay and
181    /// grace period ranges (in milliseconds). Used by [`AttritionInjector`] to
182    /// pass through [`Attrition`](super::process::Attrition) configuration.
183    pub fn reboot_with_delays(
184        &self,
185        ip: &str,
186        kind: RebootKind,
187        recovery_delay_range_ms: &std::ops::Range<usize>,
188        grace_period_range_ms: &std::ops::Range<usize>,
189    ) -> SimulationResult<()> {
190        let ip_addr: std::net::IpAddr = ip.parse().map_err(|e| {
191            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", ip, e))
192        })?;
193
194        match kind {
195            RebootKind::Graceful => {
196                assert_reachable!("reboot: graceful path");
197                let grace_ms = crate::sim::sim_random_range(grace_period_range_ms.clone()) as u64;
198                let recovery_ms =
199                    crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
200                self.sim.schedule_event(
201                    crate::sim::Event::ProcessGracefulShutdown {
202                        ip: ip_addr,
203                        grace_period_ms: grace_ms,
204                        recovery_delay_ms: recovery_ms,
205                    },
206                    Duration::from_nanos(1),
207                );
208                tracing::info!(
209                    "Initiated graceful reboot for process at IP {} (grace={}ms, recovery={}ms)",
210                    ip,
211                    grace_ms,
212                    recovery_ms
213                );
214            }
215            RebootKind::Crash | RebootKind::CrashAndWipe => {
216                assert_reachable!("reboot: crash path");
217                self.sim.abort_all_connections_for_ip(ip_addr);
218                self.process_info
219                    .dead_count
220                    .set(self.process_info.dead_count.get() + 1);
221                let delay_ms = crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
222                let recovery_delay = Duration::from_millis(delay_ms);
223                self.sim.schedule_process_restart(ip_addr, recovery_delay);
224                tracing::info!(
225                    "Crashed process at IP {} (recovery in {:?})",
226                    ip,
227                    recovery_delay
228                );
229            }
230        }
231
232        Ok(())
233    }
234
235    /// Reboot a random alive server process.
236    ///
237    /// Picks a random process from the process IP list and reboots it.
238    /// Returns `Ok(None)` if no processes are available.
239    pub fn reboot_random(&self, kind: RebootKind) -> SimulationResult<Option<String>> {
240        if self.process_info.process_ips.is_empty() {
241            return Ok(None);
242        }
243        let idx = crate::sim::sim_random_range(0..self.process_info.process_ips.len());
244        let ip = self.process_info.process_ips[idx].clone();
245        self.reboot(&ip, kind)?;
246        Ok(Some(ip))
247    }
248
249    /// Reboot all processes matching a tag key=value pair.
250    pub fn reboot_tagged(
251        &self,
252        key: &str,
253        value: &str,
254        kind: RebootKind,
255    ) -> SimulationResult<Vec<String>> {
256        let matching_ips: Vec<String> = self
257            .process_info
258            .tag_registry
259            .ips_tagged(key, value)
260            .into_iter()
261            .map(|ip| ip.to_string())
262            .collect();
263
264        for ip in &matching_ips {
265            self.reboot(ip, kind)?;
266        }
267
268        Ok(matching_ips)
269    }
270}
271
272/// A fault injector that introduces failures during the chaos phase.
273///
274/// Fault injectors run concurrently with workloads during the chaos phase.
275/// When `PhaseConfig` is used, they are signaled to stop via
276/// `ctx.chaos_shutdown()` at the chaos→recovery boundary.
277#[async_trait(?Send)]
278pub trait FaultInjector: 'static {
279    /// Name of this fault injector for reporting.
280    fn name(&self) -> &str;
281
282    /// Inject faults using the provided context.
283    ///
284    /// Should respect `ctx.chaos_shutdown()` to allow graceful termination.
285    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
286}
287
/// Two-phase simulation configuration.
///
/// Controls the TigerBeetle VOPR-style chaos/recovery lifecycle:
/// 1. **Chaos phase** (`chaos_duration`): Workloads + fault injectors run concurrently.
///    Invariants are checked after every simulation event.
/// 2. **Recovery phase** (`recovery_duration`): Fault injectors stopped, workloads
///    continue, system heals. Verifies convergence after faults cease.
///
/// Both fields are plain [`Duration`]s, so equality is total and the type
/// derives `Eq` alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PhaseConfig {
    /// Duration of the chaos phase (faults + workloads run concurrently).
    pub chaos_duration: Duration,
    /// Duration of the recovery phase (faults stopped, workloads continue).
    pub recovery_duration: Duration,
}
302
303/// Built-in fault injector that randomly reboots server processes.
304///
305/// Active only during the chaos phase. Respects `max_dead` to limit the
306/// number of simultaneously dead processes. The reboot type is chosen by
307/// weighted probability from the [`Attrition`](super::process::Attrition) config.
308pub(crate) struct AttritionInjector {
309    config: super::process::Attrition,
310}
311
312impl AttritionInjector {
313    /// Create a new attrition injector from the given configuration.
314    pub(crate) fn new(config: super::process::Attrition) -> Self {
315        Self { config }
316    }
317}
318
319#[async_trait(?Send)]
320impl FaultInjector for AttritionInjector {
321    fn name(&self) -> &str {
322        "attrition"
323    }
324
325    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
326        while !ctx.chaos_shutdown().is_cancelled() {
327            // Random delay between reboot attempts (1-5 seconds)
328            let delay_ms = crate::sim::sim_random_range(1000..5000);
329            ctx.time()
330                .sleep(Duration::from_millis(delay_ms as u64))
331                .await
332                .map_err(|e| {
333                    crate::SimulationError::InvalidState(format!("sleep failed: {}", e))
334                })?;
335
336            if ctx.chaos_shutdown().is_cancelled() {
337                break;
338            }
339
340            if ctx.process_ips().is_empty() {
341                continue;
342            }
343
344            // Respect max_dead: skip this cycle if already at the limit
345            if ctx.dead_count() >= self.config.max_dead {
346                assert_reachable!("attrition: max_dead limit enforced");
347                continue;
348            }
349
350            // Choose reboot kind by weighted probability
351            let rand_val = crate::sim::sim_random_range(0..10000) as f64 / 10000.0;
352            let kind = self.config.choose_kind(rand_val);
353            assert_sometimes_each!("attrition_reboot_kind", [("kind", kind as i64)]);
354
355            // Use configured delay ranges (or defaults)
356            let recovery_range = self.config.recovery_delay_ms.clone().unwrap_or(1000..10000);
357            let grace_range = self.config.grace_period_ms.clone().unwrap_or(2000..5000);
358
359            if ctx.process_ips().is_empty() {
360                continue;
361            }
362            let idx = crate::sim::sim_random_range(0..ctx.process_ips().len());
363            let ip = ctx.process_ips()[idx].to_string();
364            assert_sometimes_each!("attrition_process_targeted", [("process_idx", idx as i64)]);
365            ctx.reboot_with_delays(&ip, kind, &recovery_range, &grace_range)?;
366        }
367        Ok(())
368    }
369}