Skip to main content

moonpool_sim/runner/
fault_injector.rs

1//! Fault injection for simulation chaos testing.
2//!
3//! [`FaultInjector`] defines fault injection strategies (partitions, connection drops, etc.)
4//! that run during the chaos phase of a simulation. [`FaultContext`] provides access to
5//! `SimWorld` fault injection primitives.
6//!
7//! When `chaos_duration` is configured on the builder, fault injectors run concurrently
8//! with workloads. At the chaos boundary, `ctx.chaos_shutdown()` is cancelled and the
9//! system settles before running workload checks.
10//!
11//! # Usage
12//!
13//! ```ignore
14//! use moonpool_sim::{FaultInjector, FaultContext, SimulationResult};
15//! use std::time::Duration;
16//!
17//! struct RandomPartition { probability: f64 }
18//!
19//! #[async_trait(?Send)]
20//! impl FaultInjector for RandomPartition {
21//!     fn name(&self) -> &str { "random_partition" }
22//!     async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
23//!         let ips = ctx.all_ips();
24//!         while !ctx.chaos_shutdown().is_cancelled() {
25//!             if ctx.random().random_bool(self.probability) && ips.len() >= 2 {
26//!                 ctx.partition(&ips[0], &ips[1])?;
27//!                 ctx.time().sleep(Duration::from_secs(5)).await?;
28//!                 ctx.heal_partition(&ips[0], &ips[1])?;
29//!             }
30//!             ctx.time().sleep(Duration::from_secs(1)).await?;
31//!         }
32//!         Ok(())
33//!     }
34//! }
35//! ```
36
37use std::time::Duration;
38
39use async_trait::async_trait;
40use moonpool_core::TimeProvider;
41
42use crate::SimulationResult;
43use crate::providers::{SimRandomProvider, SimTimeProvider};
44use crate::runner::process::RebootKind;
45use crate::runner::tags::TagRegistry;
46use crate::sim::SimWorld;
47use crate::{assert_reachable, assert_sometimes_each};
48
/// Process-related state for fault injection targeting.
///
/// Handed to [`FaultContext::new`] and consulted whenever a fault injector
/// selects or reboots a server process.
pub struct ProcessInfo {
    /// Server process IP addresses.
    ///
    /// Stored as strings; they are parsed into `std::net::IpAddr` at the point
    /// where a reboot is actually issued.
    pub process_ips: Vec<String>,
    /// Tag registry mapping process IPs to their resolved tags.
    pub tag_registry: TagRegistry,
    /// Shared count of currently dead (killed but not yet restarted) processes.
    ///
    /// Held behind `Rc<Cell<_>>` so it can be read and bumped through a shared
    /// reference. `FaultContext` increments it on crash-style reboots; no
    /// decrement happens in this file.
    // NOTE(review): presumably the orchestrator decrements this on restart — confirm.
    pub dead_count: std::rc::Rc<std::cell::Cell<usize>>,
}
58
/// Context for fault injectors — gives access to SimWorld fault injection methods.
///
/// Unlike `SimContext` (which workloads receive), `FaultContext` provides direct
/// access to network partitioning, reboot, and other fault primitives that normal
/// workloads should not use.
pub struct FaultContext {
    /// Simulation world that partition, crash, and scheduling calls are forwarded to.
    sim: SimWorld,
    /// Every IP address in the simulation, as exposed by `all_ips()`.
    all_ips: Vec<String>,
    /// Server process IPs, tag registry, and the shared dead-process counter.
    process_info: ProcessInfo,
    /// Seeded random provider exposed to injectors via `random()`.
    random: SimRandomProvider,
    /// Simulated time provider exposed to injectors via `time()`.
    time: SimTimeProvider,
    /// Token injectors poll (via `chaos_shutdown()`) to learn the chaos phase is over.
    chaos_shutdown: tokio_util::sync::CancellationToken,
}
72
73impl FaultContext {
74    /// Create a new fault context with process information.
75    pub fn new(
76        sim: SimWorld,
77        all_ips: Vec<String>,
78        process_info: ProcessInfo,
79        random: SimRandomProvider,
80        time: SimTimeProvider,
81        chaos_shutdown: tokio_util::sync::CancellationToken,
82    ) -> Self {
83        Self {
84            sim,
85            all_ips,
86            process_info,
87            random,
88            time,
89            chaos_shutdown,
90        }
91    }
92
93    /// Get the number of currently dead (killed but not yet restarted) processes.
94    pub fn dead_count(&self) -> usize {
95        self.process_info.dead_count.get()
96    }
97
98    /// Create a bidirectional network partition between two IPs.
99    ///
100    /// The partition persists until [`heal_partition`](Self::heal_partition) is called.
101    pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
102        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
103            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
104        })?;
105        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
106            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
107        })?;
108        // Use a long duration — heal_partition is the expected way to undo
109        self.sim
110            .partition_pair(a_ip, b_ip, Duration::from_secs(3600))
111    }
112
113    /// Remove a network partition between two IPs.
114    pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
115        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
116            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
117        })?;
118        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
119            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
120        })?;
121        self.sim.restore_partition(a_ip, b_ip)
122    }
123
124    /// Check whether two IPs are partitioned.
125    pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
126        let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
127            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
128        })?;
129        let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
130            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
131        })?;
132        self.sim.is_partitioned(a_ip, b_ip)
133    }
134
135    /// Get all IP addresses in the simulation.
136    pub fn all_ips(&self) -> &[String] {
137        &self.all_ips
138    }
139
140    /// Get the seeded random provider.
141    pub fn random(&self) -> &SimRandomProvider {
142        &self.random
143    }
144
145    /// Get the simulated time provider.
146    pub fn time(&self) -> &SimTimeProvider {
147        &self.time
148    }
149
150    /// Get the chaos-phase shutdown token.
151    ///
152    /// This token is cancelled at the chaos→recovery boundary,
153    /// signaling fault injectors to stop.
154    pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
155        &self.chaos_shutdown
156    }
157
158    /// Get all server process IPs.
159    pub fn process_ips(&self) -> &[String] {
160        &self.process_info.process_ips
161    }
162
163    /// Reboot a specific process by IP.
164    ///
165    /// For [`RebootKind::Graceful`]: schedules a `ProcessGracefulShutdown` event.
166    /// The orchestrator cancels the per-process shutdown token, giving the process
167    /// a grace period to drain buffers and clean up. After the grace period,
168    /// a force-kill aborts the task and connections, then schedules restart.
169    ///
170    /// For [`RebootKind::Crash`] and [`RebootKind::CrashAndWipe`]: immediately
171    /// aborts all connections and schedules a `ProcessRestart` event.
172    pub fn reboot(&self, ip: &str, kind: RebootKind) -> SimulationResult<()> {
173        let recovery_range = 1000..10000;
174        let grace_range = 2000..5000;
175        self.reboot_with_delays(ip, kind, &recovery_range, &grace_range)
176    }
177
178    /// Reboot a process with custom delay ranges.
179    ///
180    /// Like [`reboot`](Self::reboot) but with configurable recovery delay and
181    /// grace period ranges (in milliseconds). Used by [`AttritionInjector`] to
182    /// pass through [`Attrition`](super::process::Attrition) configuration.
183    pub fn reboot_with_delays(
184        &self,
185        ip: &str,
186        kind: RebootKind,
187        recovery_delay_range_ms: &std::ops::Range<usize>,
188        grace_period_range_ms: &std::ops::Range<usize>,
189    ) -> SimulationResult<()> {
190        let ip_addr: std::net::IpAddr = ip.parse().map_err(|e| {
191            crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", ip, e))
192        })?;
193
194        match kind {
195            RebootKind::Graceful => {
196                assert_reachable!("reboot: graceful path");
197                let grace_ms = crate::sim::sim_random_range(grace_period_range_ms.clone()) as u64;
198                let recovery_ms =
199                    crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
200                self.sim.schedule_event(
201                    crate::sim::Event::ProcessGracefulShutdown {
202                        ip: ip_addr,
203                        grace_period_ms: grace_ms,
204                        recovery_delay_ms: recovery_ms,
205                    },
206                    Duration::from_nanos(1),
207                );
208                tracing::info!(
209                    "Initiated graceful reboot for process at IP {} (grace={}ms, recovery={}ms)",
210                    ip,
211                    grace_ms,
212                    recovery_ms
213                );
214            }
215            RebootKind::Crash | RebootKind::CrashAndWipe => {
216                assert_reachable!("reboot: crash path");
217                self.sim.abort_all_connections_for_ip(ip_addr);
218                // Crash storage for this process
219                self.sim.simulate_crash_for_process(ip_addr, true);
220                // Wipe storage if CrashAndWipe
221                if kind == RebootKind::CrashAndWipe {
222                    self.sim.wipe_storage_for_process(ip_addr);
223                }
224                self.process_info
225                    .dead_count
226                    .set(self.process_info.dead_count.get() + 1);
227                let delay_ms = crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
228                let recovery_delay = Duration::from_millis(delay_ms);
229                self.sim.schedule_process_restart(ip_addr, recovery_delay);
230                tracing::info!(
231                    "Crashed process at IP {} (recovery in {:?})",
232                    ip,
233                    recovery_delay
234                );
235            }
236        }
237
238        Ok(())
239    }
240
241    /// Reboot a random alive server process.
242    ///
243    /// Picks a random process from the process IP list and reboots it.
244    /// Returns `Ok(None)` if no processes are available.
245    pub fn reboot_random(&self, kind: RebootKind) -> SimulationResult<Option<String>> {
246        if self.process_info.process_ips.is_empty() {
247            return Ok(None);
248        }
249        let idx = crate::sim::sim_random_range(0..self.process_info.process_ips.len());
250        let ip = self.process_info.process_ips[idx].clone();
251        self.reboot(&ip, kind)?;
252        Ok(Some(ip))
253    }
254
255    /// Reboot all processes matching a tag key=value pair.
256    pub fn reboot_tagged(
257        &self,
258        key: &str,
259        value: &str,
260        kind: RebootKind,
261    ) -> SimulationResult<Vec<String>> {
262        let matching_ips: Vec<String> = self
263            .process_info
264            .tag_registry
265            .ips_tagged(key, value)
266            .into_iter()
267            .map(|ip| ip.to_string())
268            .collect();
269
270        for ip in &matching_ips {
271            self.reboot(ip, kind)?;
272        }
273
274        Ok(matching_ips)
275    }
276}
277
/// A fault injector that introduces failures during the chaos phase.
///
/// Fault injectors run concurrently with workloads when `chaos_duration` is set.
/// They are signaled to stop via `ctx.chaos_shutdown()` when the chaos duration
/// elapses. After all workloads complete, the system settles before checks run.
///
/// Declared with `#[async_trait(?Send)]`, so the future returned by
/// [`inject`](Self::inject) is not required to be `Send`.
#[async_trait(?Send)]
pub trait FaultInjector: 'static {
    /// Name of this fault injector for reporting.
    fn name(&self) -> &str;

    /// Inject faults using the provided context.
    ///
    /// Should respect `ctx.chaos_shutdown()` to allow graceful termination.
    ///
    /// Implementations receive `&mut self`, so they may keep state across
    /// iterations, and reach the fault primitives through `ctx`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when a fault primitive or a simulated
    /// sleep fails.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
}
293
/// Built-in fault injector that randomly reboots server processes.
///
/// Active only during the chaos phase. Respects `max_dead` to limit the
/// number of simultaneously dead processes. The reboot type is chosen by
/// weighted probability from the [`Attrition`](super::process::Attrition) config.
pub(crate) struct AttritionInjector {
    /// Attrition settings: the `max_dead` cap, the reboot-kind selection
    /// (`choose_kind`), and the optional recovery-delay / grace-period ranges.
    config: super::process::Attrition,
}
302
303impl AttritionInjector {
304    /// Create a new attrition injector from the given configuration.
305    pub(crate) fn new(config: super::process::Attrition) -> Self {
306        Self { config }
307    }
308}
309
310#[async_trait(?Send)]
311impl FaultInjector for AttritionInjector {
312    fn name(&self) -> &str {
313        "attrition"
314    }
315
316    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
317        while !ctx.chaos_shutdown().is_cancelled() {
318            // Random delay between reboot attempts (1-5 seconds)
319            let delay_ms = crate::sim::sim_random_range(1000..5000);
320            ctx.time()
321                .sleep(Duration::from_millis(delay_ms as u64))
322                .await
323                .map_err(|e| {
324                    crate::SimulationError::InvalidState(format!("sleep failed: {}", e))
325                })?;
326
327            if ctx.chaos_shutdown().is_cancelled() {
328                break;
329            }
330
331            if ctx.process_ips().is_empty() {
332                continue;
333            }
334
335            // Respect max_dead: skip this cycle if already at the limit
336            if ctx.dead_count() >= self.config.max_dead {
337                assert_reachable!("attrition: max_dead limit enforced");
338                continue;
339            }
340
341            // Choose reboot kind by weighted probability
342            let rand_val = crate::sim::sim_random_range(0..10000) as f64 / 10000.0;
343            let kind = self.config.choose_kind(rand_val);
344            assert_sometimes_each!("attrition_reboot_kind", [("kind", kind as i64)]);
345
346            // Use configured delay ranges (or defaults)
347            let recovery_range = self.config.recovery_delay_ms.clone().unwrap_or(1000..10000);
348            let grace_range = self.config.grace_period_ms.clone().unwrap_or(2000..5000);
349
350            if ctx.process_ips().is_empty() {
351                continue;
352            }
353            let idx = crate::sim::sim_random_range(0..ctx.process_ips().len());
354            let ip = ctx.process_ips()[idx].to_string();
355            assert_sometimes_each!("attrition_process_targeted", [("process_idx", idx as i64)]);
356            ctx.reboot_with_delays(&ip, kind, &recovery_range, &grace_range)?;
357        }
358        Ok(())
359    }
360}