1use std::time::Duration;
38
39use async_trait::async_trait;
40use moonpool_core::TimeProvider;
41
42use crate::SimulationResult;
43use crate::providers::{SimRandomProvider, SimTimeProvider};
44use crate::runner::process::RebootKind;
45use crate::runner::tags::TagRegistry;
46use crate::sim::SimWorld;
47use crate::{assert_reachable, assert_sometimes_each};
48
49pub struct ProcessInfo {
51 pub process_ips: Vec<String>,
53 pub tag_registry: TagRegistry,
55 pub dead_count: std::rc::Rc<std::cell::Cell<usize>>,
57}
58
59pub struct FaultContext {
65 sim: SimWorld,
66 all_ips: Vec<String>,
67 process_info: ProcessInfo,
68 random: SimRandomProvider,
69 time: SimTimeProvider,
70 chaos_shutdown: tokio_util::sync::CancellationToken,
71}
72
73impl FaultContext {
74 pub fn new(
76 sim: SimWorld,
77 all_ips: Vec<String>,
78 process_info: ProcessInfo,
79 random: SimRandomProvider,
80 time: SimTimeProvider,
81 chaos_shutdown: tokio_util::sync::CancellationToken,
82 ) -> Self {
83 Self {
84 sim,
85 all_ips,
86 process_info,
87 random,
88 time,
89 chaos_shutdown,
90 }
91 }
92
93 pub fn dead_count(&self) -> usize {
95 self.process_info.dead_count.get()
96 }
97
98 pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
102 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
103 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
104 })?;
105 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
106 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
107 })?;
108 self.sim
110 .partition_pair(a_ip, b_ip, Duration::from_secs(3600))
111 }
112
113 pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
115 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
116 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
117 })?;
118 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
119 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
120 })?;
121 self.sim.restore_partition(a_ip, b_ip)
122 }
123
124 pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
126 let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
127 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
128 })?;
129 let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
130 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
131 })?;
132 self.sim.is_partitioned(a_ip, b_ip)
133 }
134
135 pub fn all_ips(&self) -> &[String] {
137 &self.all_ips
138 }
139
140 pub fn random(&self) -> &SimRandomProvider {
142 &self.random
143 }
144
145 pub fn time(&self) -> &SimTimeProvider {
147 &self.time
148 }
149
150 pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
155 &self.chaos_shutdown
156 }
157
158 pub fn process_ips(&self) -> &[String] {
160 &self.process_info.process_ips
161 }
162
163 pub fn reboot(&self, ip: &str, kind: RebootKind) -> SimulationResult<()> {
173 let recovery_range = 1000..10000;
174 let grace_range = 2000..5000;
175 self.reboot_with_delays(ip, kind, &recovery_range, &grace_range)
176 }
177
178 pub fn reboot_with_delays(
184 &self,
185 ip: &str,
186 kind: RebootKind,
187 recovery_delay_range_ms: &std::ops::Range<usize>,
188 grace_period_range_ms: &std::ops::Range<usize>,
189 ) -> SimulationResult<()> {
190 let ip_addr: std::net::IpAddr = ip.parse().map_err(|e| {
191 crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", ip, e))
192 })?;
193
194 match kind {
195 RebootKind::Graceful => {
196 assert_reachable!("reboot: graceful path");
197 let grace_ms = crate::sim::sim_random_range(grace_period_range_ms.clone()) as u64;
198 let recovery_ms =
199 crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
200 self.sim.schedule_event(
201 crate::sim::Event::ProcessGracefulShutdown {
202 ip: ip_addr,
203 grace_period_ms: grace_ms,
204 recovery_delay_ms: recovery_ms,
205 },
206 Duration::from_nanos(1),
207 );
208 tracing::info!(
209 "Initiated graceful reboot for process at IP {} (grace={}ms, recovery={}ms)",
210 ip,
211 grace_ms,
212 recovery_ms
213 );
214 }
215 RebootKind::Crash | RebootKind::CrashAndWipe => {
216 assert_reachable!("reboot: crash path");
217 self.sim.abort_all_connections_for_ip(ip_addr);
218 self.sim.simulate_crash_for_process(ip_addr, true);
220 if kind == RebootKind::CrashAndWipe {
222 self.sim.wipe_storage_for_process(ip_addr);
223 }
224 self.process_info
225 .dead_count
226 .set(self.process_info.dead_count.get() + 1);
227 let delay_ms = crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
228 let recovery_delay = Duration::from_millis(delay_ms);
229 self.sim.schedule_process_restart(ip_addr, recovery_delay);
230 tracing::info!(
231 "Crashed process at IP {} (recovery in {:?})",
232 ip,
233 recovery_delay
234 );
235 }
236 }
237
238 Ok(())
239 }
240
241 pub fn reboot_random(&self, kind: RebootKind) -> SimulationResult<Option<String>> {
246 if self.process_info.process_ips.is_empty() {
247 return Ok(None);
248 }
249 let idx = crate::sim::sim_random_range(0..self.process_info.process_ips.len());
250 let ip = self.process_info.process_ips[idx].clone();
251 self.reboot(&ip, kind)?;
252 Ok(Some(ip))
253 }
254
255 pub fn reboot_tagged(
257 &self,
258 key: &str,
259 value: &str,
260 kind: RebootKind,
261 ) -> SimulationResult<Vec<String>> {
262 let matching_ips: Vec<String> = self
263 .process_info
264 .tag_registry
265 .ips_tagged(key, value)
266 .into_iter()
267 .map(|ip| ip.to_string())
268 .collect();
269
270 for ip in &matching_ips {
271 self.reboot(ip, kind)?;
272 }
273
274 Ok(matching_ips)
275 }
276}
277
278#[async_trait(?Send)]
284pub trait FaultInjector: 'static {
285 fn name(&self) -> &str;
287
288 async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
292}
293
294pub(crate) struct AttritionInjector {
300 config: super::process::Attrition,
301}
302
303impl AttritionInjector {
304 pub(crate) fn new(config: super::process::Attrition) -> Self {
306 Self { config }
307 }
308}
309
310#[async_trait(?Send)]
311impl FaultInjector for AttritionInjector {
312 fn name(&self) -> &str {
313 "attrition"
314 }
315
316 async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
317 while !ctx.chaos_shutdown().is_cancelled() {
318 let delay_ms = crate::sim::sim_random_range(1000..5000);
320 ctx.time()
321 .sleep(Duration::from_millis(delay_ms as u64))
322 .await
323 .map_err(|e| {
324 crate::SimulationError::InvalidState(format!("sleep failed: {}", e))
325 })?;
326
327 if ctx.chaos_shutdown().is_cancelled() {
328 break;
329 }
330
331 if ctx.process_ips().is_empty() {
332 continue;
333 }
334
335 if ctx.dead_count() >= self.config.max_dead {
337 assert_reachable!("attrition: max_dead limit enforced");
338 continue;
339 }
340
341 let rand_val = crate::sim::sim_random_range(0..10000) as f64 / 10000.0;
343 let kind = self.config.choose_kind(rand_val);
344 assert_sometimes_each!("attrition_reboot_kind", [("kind", kind as i64)]);
345
346 let recovery_range = self.config.recovery_delay_ms.clone().unwrap_or(1000..10000);
348 let grace_range = self.config.grace_period_ms.clone().unwrap_or(2000..5000);
349
350 if ctx.process_ips().is_empty() {
351 continue;
352 }
353 let idx = crate::sim::sim_random_range(0..ctx.process_ips().len());
354 let ip = ctx.process_ips()[idx].to_string();
355 assert_sometimes_each!("attrition_process_targeted", [("process_idx", idx as i64)]);
356 ctx.reboot_with_delays(&ip, kind, &recovery_range, &grace_range)?;
357 }
358 Ok(())
359 }
360}