use std::time::Duration;
use async_trait::async_trait;
use moonpool_core::TimeProvider;
use crate::SimulationResult;
use crate::providers::{SimRandomProvider, SimTimeProvider};
use crate::runner::process::RebootKind;
use crate::runner::tags::TagRegistry;
use crate::sim::SimWorld;
use crate::{assert_reachable, assert_sometimes_each};
/// Process-level bookkeeping that a [`FaultContext`] uses to pick and track
/// reboot targets.
pub struct ProcessInfo {
/// IP addresses of the processes, stored as strings. Entries are parsed into
/// `std::net::IpAddr` when a reboot is issued for them.
pub process_ips: Vec<String>,
/// Registry queried by `FaultContext::reboot_tagged` to find the IPs that
/// carry a given tag key/value pair.
pub tag_registry: TagRegistry,
/// Shared counter read by `FaultContext::dead_count` and incremented by the
/// crash reboot paths in this file.
/// NOTE(review): presumably decremented elsewhere when a process restarts —
/// that code is not visible here; confirm.
pub dead_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
/// Handle given to fault injectors: bundles the simulated world with the
/// process list, providers, and shutdown signal needed to inject faults.
pub struct FaultContext {
// Simulated world that partitions, crashes, and restarts are applied to.
sim: SimWorld,
// Process IPs, tag registry, and the shared dead-process counter.
process_info: ProcessInfo,
// Random provider exposed to injectors via `random()`.
random: SimRandomProvider,
// Time provider exposed to injectors via `time()`.
time: SimTimeProvider,
// Token injectors poll to find out when chaos should stop.
chaos_shutdown: tokio_util::sync::CancellationToken,
}
impl FaultContext {
#[must_use]
pub fn new(
sim: SimWorld,
process_info: ProcessInfo,
random: SimRandomProvider,
time: SimTimeProvider,
chaos_shutdown: tokio_util::sync::CancellationToken,
) -> Self {
Self {
sim,
process_info,
random,
time,
chaos_shutdown,
}
}
#[must_use]
pub fn dead_count(&self) -> usize {
self.process_info
.dead_count
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
let a_ip: std::net::IpAddr = a
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{a}': {e}")))?;
let b_ip: std::net::IpAddr = b
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{b}': {e}")))?;
self.sim.partition_pair(a_ip, b_ip, Duration::from_hours(1))
}
pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
let a_ip: std::net::IpAddr = a
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{a}': {e}")))?;
let b_ip: std::net::IpAddr = b
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{b}': {e}")))?;
self.sim.restore_partition(a_ip, b_ip)
}
pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
let a_ip: std::net::IpAddr = a
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{a}': {e}")))?;
let b_ip: std::net::IpAddr = b
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{b}': {e}")))?;
self.sim.is_partitioned(a_ip, b_ip)
}
#[must_use]
pub fn random(&self) -> &SimRandomProvider {
&self.random
}
#[must_use]
pub fn time(&self) -> &SimTimeProvider {
&self.time
}
#[must_use]
pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
&self.chaos_shutdown
}
#[must_use]
pub fn process_ips(&self) -> &[String] {
&self.process_info.process_ips
}
pub fn reboot(&self, ip: &str, kind: RebootKind) -> SimulationResult<()> {
let recovery_range = 1000..10000;
let grace_range = 2000..5000;
self.reboot_with_delays(ip, kind, &recovery_range, &grace_range)
}
pub fn reboot_with_delays(
&self,
ip: &str,
kind: RebootKind,
recovery_delay_range_ms: &std::ops::Range<usize>,
grace_period_range_ms: &std::ops::Range<usize>,
) -> SimulationResult<()> {
let ip_addr: std::net::IpAddr = ip
.parse()
.map_err(|e| crate::SimulationError::InvalidState(format!("invalid IP '{ip}': {e}")))?;
match kind {
RebootKind::Graceful => {
assert_reachable!("reboot: graceful path");
let grace_ms = crate::sim::sim_random_range(grace_period_range_ms.clone()) as u64;
let recovery_ms =
crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
self.sim.schedule_event(
crate::sim::Event::ProcessGracefulShutdown {
ip: ip_addr,
grace_period_ms: grace_ms,
recovery_delay_ms: recovery_ms,
},
Duration::from_nanos(1),
);
tracing::info!(
"Initiated graceful reboot for process at IP {} (grace={}ms, recovery={}ms)",
ip,
grace_ms,
recovery_ms
);
}
RebootKind::Crash | RebootKind::CrashAndWipe => {
assert_reachable!("reboot: crash path");
self.sim.abort_all_connections_for_ip(ip_addr);
self.sim.simulate_crash_for_process(ip_addr, true);
if kind == RebootKind::CrashAndWipe {
self.sim.wipe_storage_for_process(ip_addr);
}
self.process_info
.dead_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let delay_ms = crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
let recovery_delay = Duration::from_millis(delay_ms);
self.sim.schedule_process_restart(ip_addr, recovery_delay);
tracing::info!(
"Crashed process at IP {} (recovery in {:?})",
ip,
recovery_delay
);
}
}
Ok(())
}
pub fn reboot_random(&self, kind: RebootKind) -> SimulationResult<Option<String>> {
if self.process_info.process_ips.is_empty() {
return Ok(None);
}
let idx = crate::sim::sim_random_range(0..self.process_info.process_ips.len());
let ip = self.process_info.process_ips[idx].clone();
self.reboot(&ip, kind)?;
Ok(Some(ip))
}
pub fn reboot_tagged(
&self,
key: &str,
value: &str,
kind: RebootKind,
) -> SimulationResult<Vec<String>> {
let matching_ips: Vec<String> = self
.process_info
.tag_registry
.ips_tagged(key, value)
.into_iter()
.map(|ip| ip.to_string())
.collect();
for ip in &matching_ips {
self.reboot(ip, kind)?;
}
Ok(matching_ips)
}
}
/// Interface for components that inject faults into a simulation through a
/// [`FaultContext`].
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait FaultInjector: Send + Sync + 'static {
/// Returns the injector's name.
/// NOTE(review): presumably used for logging/reporting — callers are not
/// visible in this file; confirm.
fn name(&self) -> &str;
/// Runs the injector, using `ctx` to apply faults.
///
/// The one implementation visible in this file loops until
/// `ctx.chaos_shutdown()` is cancelled and returns the first error it hits.
async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
}
/// Fault injector that periodically reboots a randomly chosen process,
/// driven by an `Attrition` configuration.
pub(crate) struct AttritionInjector {
// Supplies `max_dead`, the optional delay ranges, and `choose_kind`.
config: super::process::Attrition,
}
impl AttritionInjector {
/// Creates an injector that takes ownership of `config`.
pub(crate) fn new(config: super::process::Attrition) -> Self {
Self { config }
}
}
#[async_trait]
impl FaultInjector for AttritionInjector {
    /// Returns the fixed name `"attrition"`.
    fn name(&self) -> &'static str {
        "attrition"
    }

    /// Repeatedly sleeps for a random 1000..5000 ms and then reboots one
    /// randomly chosen process, until `ctx.chaos_shutdown()` is cancelled.
    ///
    /// An iteration is skipped (nothing is rebooted) when there are no known
    /// processes or when `ctx.dead_count()` has reached `config.max_dead`.
    /// The reboot kind comes from `config.choose_kind`; delay ranges come
    /// from the config, falling back to 1000..10000 ms (recovery) and
    /// 2000..5000 ms (grace period).
    ///
    /// # Errors
    ///
    /// Returns `SimulationError::InvalidState` if the sleep fails, or the
    /// error from `FaultContext::reboot_with_delays`.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
        // The configured ranges do not change while the loop runs, so resolve
        // the defaults once instead of re-cloning them every iteration.
        let recovery_range = self.config.recovery_delay_ms.clone().unwrap_or(1000..10000);
        let grace_range = self.config.grace_period_ms.clone().unwrap_or(2000..5000);

        while !ctx.chaos_shutdown().is_cancelled() {
            let delay_ms = crate::sim::sim_random_range(1000..5000);
            ctx.time()
                .sleep(Duration::from_millis(
                    u64::try_from(delay_ms).expect("delay_ms is non-negative"),
                ))
                .await
                .map_err(|e| crate::SimulationError::InvalidState(format!("sleep failed: {e}")))?;

            // Shutdown may have been requested while sleeping; do not inject
            // a fault after that point.
            if ctx.chaos_shutdown().is_cancelled() {
                break;
            }

            let process_ips = ctx.process_ips();
            if process_ips.is_empty() {
                continue;
            }
            if ctx.dead_count() >= self.config.max_dead {
                assert_reachable!("attrition: max_dead limit enforced");
                continue;
            }

            // Random draws below stay in the order kind -> target index so
            // the deterministic random stream is unchanged.
            let rand_val = f64::from(crate::sim::sim_random_range(0..10000)) / 10000.0;
            let kind = self.config.choose_kind(rand_val);
            assert_sometimes_each!("attrition_reboot_kind", [("kind", kind as i64)]);

            let idx = crate::sim::sim_random_range(0..process_ips.len());
            let ip = &process_ips[idx];
            assert_sometimes_each!(
                "attrition_process_targeted",
                [("process_idx", i64::try_from(idx).unwrap_or(i64::MAX))]
            );
            ctx.reboot_with_delays(ip, kind, &recovery_range, &grace_range)?;
        }
        Ok(())
    }
}