Skip to main content

moonpool_sim/runner/
fault_injector.rs

1//! Fault injection for simulation chaos testing.
2//!
3//! [`FaultInjector`] defines fault injection strategies (partitions, connection drops, etc.)
4//! that run during the chaos phase of a simulation. [`FaultContext`] provides access to
5//! `SimWorld` fault injection primitives.
6//!
7//! [`PhaseConfig`] controls the two-phase chaos/recovery lifecycle:
8//! - **Chaos phase**: Workloads + fault injectors run concurrently
9//! - **Recovery phase**: Fault injectors stopped, workloads continue, system heals
10//!
11//! # Usage
12//!
13//! ```ignore
//! use async_trait::async_trait;
//! use moonpool_sim::{FaultInjector, FaultContext, PhaseConfig, SimulationResult};
15//! use std::time::Duration;
16//!
17//! struct RandomPartition { probability: f64 }
18//!
19//! #[async_trait(?Send)]
20//! impl FaultInjector for RandomPartition {
21//!     fn name(&self) -> &str { "random_partition" }
22//!     async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
23//!         let ips = ctx.all_ips();
24//!         while !ctx.chaos_shutdown().is_cancelled() {
25//!             if ctx.random().random_bool(self.probability) && ips.len() >= 2 {
26//!                 ctx.partition(&ips[0], &ips[1])?;
27//!                 ctx.time().sleep(Duration::from_secs(5)).await?;
28//!                 ctx.heal_partition(&ips[0], &ips[1])?;
29//!             }
30//!             ctx.time().sleep(Duration::from_secs(1)).await?;
31//!         }
32//!         Ok(())
33//!     }
34//! }
35//! ```
36
37use std::time::Duration;
38
39use async_trait::async_trait;
40
41use crate::SimulationResult;
42use crate::providers::{SimRandomProvider, SimTimeProvider};
43use crate::sim::SimWorld;
44
45/// Context for fault injectors — gives access to SimWorld fault injection methods.
46///
47/// Unlike `SimContext` (which workloads receive), `FaultContext` provides direct
48/// access to network partitioning and other fault primitives that normal workloads
49/// should not use.
50pub struct FaultContext {
51    sim: SimWorld,
52    all_ips: Vec<String>,
53    random: SimRandomProvider,
54    time: SimTimeProvider,
55    chaos_shutdown: tokio_util::sync::CancellationToken,
56}
57
58impl FaultContext {
59    /// Create a new fault context.
60    pub fn new(
61        sim: SimWorld,
62        all_ips: Vec<String>,
63        random: SimRandomProvider,
64        time: SimTimeProvider,
65        chaos_shutdown: tokio_util::sync::CancellationToken,
66    ) -> Self {
67        Self {
68            sim,
69            all_ips,
70            random,
71            time,
72            chaos_shutdown,
73        }
74    }
75
76    /// Create a bidirectional network partition between two IPs.
77    ///
78    /// The partition persists until [`heal_partition`](Self::heal_partition) is called.
79    pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
80        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
81            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
82        })?;
83        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
84            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
85        })?;
86        // Use a long duration — heal_partition is the expected way to undo
87        self.sim
88            .partition_pair(a_ip, b_ip, Duration::from_secs(3600))
89    }
90
91    /// Remove a network partition between two IPs.
92    pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
93        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
94            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
95        })?;
96        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
97            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
98        })?;
99        self.sim.restore_partition(a_ip, b_ip)
100    }
101
102    /// Check whether two IPs are partitioned.
103    pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
104        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
105            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
106        })?;
107        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
108            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
109        })?;
110        self.sim.is_partitioned(a_ip, b_ip)
111    }
112
113    /// Get all IP addresses in the simulation.
114    pub fn all_ips(&self) -> &[String] {
115        &self.all_ips
116    }
117
118    /// Get the seeded random provider.
119    pub fn random(&self) -> &SimRandomProvider {
120        &self.random
121    }
122
123    /// Get the simulated time provider.
124    pub fn time(&self) -> &SimTimeProvider {
125        &self.time
126    }
127
128    /// Get the chaos-phase shutdown token.
129    ///
130    /// This token is cancelled at the chaos→recovery boundary,
131    /// signaling fault injectors to stop.
132    pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
133        &self.chaos_shutdown
134    }
135}
136
/// A fault injector that introduces failures during the chaos phase.
///
/// Fault injectors run concurrently with workloads during the chaos phase.
/// When `PhaseConfig` is used, they are signaled to stop via
/// `ctx.chaos_shutdown()` at the chaos→recovery boundary.
///
/// The trait is declared with `#[async_trait(?Send)]`, so the future returned by
/// `inject` is not required to be `Send`; implementors may hold non-`Send` state
/// across `.await` points. The `'static` supertrait bound means implementors
/// cannot borrow non-`'static` data.
#[async_trait(?Send)]
pub trait FaultInjector: 'static {
    /// Name of this fault injector for reporting.
    fn name(&self) -> &str;

    /// Inject faults using the provided context.
    ///
    /// Should respect `ctx.chaos_shutdown()` to allow graceful termination, e.g.
    /// by looping only while the token is not cancelled, as in the module-level
    /// example.
    ///
    /// # Errors
    ///
    /// Returns any `SimulationError` the implementation produces (for instance,
    /// one propagated from a `FaultContext` call); how the runner handles it is
    /// not defined here.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
}
152
/// Timing for a two-phase (chaos, then recovery) simulation run.
///
/// Follows the TigerBeetle VOPR-style lifecycle:
/// 1. **Chaos** — for `chaos_duration`, workloads and fault injectors run side by
///    side, with invariants checked after every simulation event.
/// 2. **Recovery** — for `recovery_duration`, fault injectors are stopped while
///    workloads keep running so the system can heal, verifying that it converges
///    once faults cease.
#[derive(Clone, Debug)]
pub struct PhaseConfig {
    /// How long fault injectors and workloads run together.
    pub chaos_duration: Duration,
    /// How long workloads keep running after fault injection has stopped.
    pub recovery_duration: Duration,
}