use std::time::Duration;
use async_trait::async_trait;
use moonpool_core::TimeProvider;
use crate::SimulationResult;
use crate::providers::{SimRandomProvider, SimTimeProvider};
use crate::runner::process::RebootKind;
use crate::runner::tags::TagRegistry;
use crate::sim::SimWorld;
use crate::{assert_reachable, assert_sometimes_each};
/// Process-related state bundled together and handed to a `FaultContext`.
pub struct ProcessInfo {
/// IP addresses (as strings) of the processes that fault injection may target;
/// `FaultContext::reboot_random` picks its victim from this list.
pub process_ips: Vec<String>,
/// Registry queried by `FaultContext::reboot_tagged` through `ips_tagged(key, value)`.
pub tag_registry: TagRegistry,
/// Shared counter read by `FaultContext::dead_count` and incremented by one on every
/// crash-style reboot performed in this file.
/// NOTE(review): nothing visible here decrements it — presumably the restart path
/// elsewhere does; confirm.
pub dead_count: std::rc::Rc<std::cell::Cell<usize>>,
}
/// Handle given to fault injectors: wraps the simulation world together with the
/// IP lists, providers and shutdown token that its methods operate on.
pub struct FaultContext {
// Simulation world that partitions, crashes, wipes and restarts are issued against.
sim: SimWorld,
// Returned verbatim by `all_ips()`.
// NOTE(review): presumably every IP in the simulation, not only processes — confirm.
all_ips: Vec<String>,
// Process IP list, tag registry and shared dead-process counter.
process_info: ProcessInfo,
// Exposed through `random()`. The methods in this file draw their own random values
// from `crate::sim::sim_random_range` instead of using this provider.
random: SimRandomProvider,
// Exposed through `time()`; the attrition injector sleeps on it between reboots.
time: SimTimeProvider,
// Exposed through `chaos_shutdown()`; the attrition injector stops once it is cancelled.
chaos_shutdown: tokio_util::sync::CancellationToken,
}
impl FaultContext {
pub fn new(
sim: SimWorld,
all_ips: Vec<String>,
process_info: ProcessInfo,
random: SimRandomProvider,
time: SimTimeProvider,
chaos_shutdown: tokio_util::sync::CancellationToken,
) -> Self {
Self {
sim,
all_ips,
process_info,
random,
time,
chaos_shutdown,
}
}
pub fn dead_count(&self) -> usize {
self.process_info.dead_count.get()
}
pub fn partition(&self, a: &str, b: &str) -> SimulationResult<()> {
let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
})?;
let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
})?;
self.sim
.partition_pair(a_ip, b_ip, Duration::from_secs(3600))
}
pub fn heal_partition(&self, a: &str, b: &str) -> SimulationResult<()> {
let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
})?;
let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
})?;
self.sim.restore_partition(a_ip, b_ip)
}
pub fn is_partitioned(&self, a: &str, b: &str) -> SimulationResult<bool> {
let a_ip: std::net::IpAddr = a.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", a, e))
})?;
let b_ip: std::net::IpAddr = b.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", b, e))
})?;
self.sim.is_partitioned(a_ip, b_ip)
}
pub fn all_ips(&self) -> &[String] {
&self.all_ips
}
pub fn random(&self) -> &SimRandomProvider {
&self.random
}
pub fn time(&self) -> &SimTimeProvider {
&self.time
}
pub fn chaos_shutdown(&self) -> &tokio_util::sync::CancellationToken {
&self.chaos_shutdown
}
pub fn process_ips(&self) -> &[String] {
&self.process_info.process_ips
}
pub fn reboot(&self, ip: &str, kind: RebootKind) -> SimulationResult<()> {
let recovery_range = 1000..10000;
let grace_range = 2000..5000;
self.reboot_with_delays(ip, kind, &recovery_range, &grace_range)
}
pub fn reboot_with_delays(
&self,
ip: &str,
kind: RebootKind,
recovery_delay_range_ms: &std::ops::Range<usize>,
grace_period_range_ms: &std::ops::Range<usize>,
) -> SimulationResult<()> {
let ip_addr: std::net::IpAddr = ip.parse().map_err(|e| {
crate::SimulationError::InvalidState(format!("invalid IP '{}': {}", ip, e))
})?;
match kind {
RebootKind::Graceful => {
assert_reachable!("reboot: graceful path");
let grace_ms = crate::sim::sim_random_range(grace_period_range_ms.clone()) as u64;
let recovery_ms =
crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
self.sim.schedule_event(
crate::sim::Event::ProcessGracefulShutdown {
ip: ip_addr,
grace_period_ms: grace_ms,
recovery_delay_ms: recovery_ms,
},
Duration::from_nanos(1),
);
tracing::info!(
"Initiated graceful reboot for process at IP {} (grace={}ms, recovery={}ms)",
ip,
grace_ms,
recovery_ms
);
}
RebootKind::Crash | RebootKind::CrashAndWipe => {
assert_reachable!("reboot: crash path");
self.sim.abort_all_connections_for_ip(ip_addr);
self.sim.simulate_crash_for_process(ip_addr, true);
if kind == RebootKind::CrashAndWipe {
self.sim.wipe_storage_for_process(ip_addr);
}
self.process_info
.dead_count
.set(self.process_info.dead_count.get() + 1);
let delay_ms = crate::sim::sim_random_range(recovery_delay_range_ms.clone()) as u64;
let recovery_delay = Duration::from_millis(delay_ms);
self.sim.schedule_process_restart(ip_addr, recovery_delay);
tracing::info!(
"Crashed process at IP {} (recovery in {:?})",
ip,
recovery_delay
);
}
}
Ok(())
}
pub fn reboot_random(&self, kind: RebootKind) -> SimulationResult<Option<String>> {
if self.process_info.process_ips.is_empty() {
return Ok(None);
}
let idx = crate::sim::sim_random_range(0..self.process_info.process_ips.len());
let ip = self.process_info.process_ips[idx].clone();
self.reboot(&ip, kind)?;
Ok(Some(ip))
}
pub fn reboot_tagged(
&self,
key: &str,
value: &str,
kind: RebootKind,
) -> SimulationResult<Vec<String>> {
let matching_ips: Vec<String> = self
.process_info
.tag_registry
.ips_tagged(key, value)
.into_iter()
.map(|ip| ip.to_string())
.collect();
for ip in &matching_ips {
self.reboot(ip, kind)?;
}
Ok(matching_ips)
}
}
/// A pluggable source of faults that is driven with a [`FaultContext`].
///
/// Declared with `#[async_trait(?Send)]`, so the future returned by `inject`
/// is not required to be `Send`.
#[async_trait(?Send)]
pub trait FaultInjector: 'static {
/// Short name identifying this injector.
fn name(&self) -> &str;
/// Runs the injector against `ctx`, returning an error if injection fails.
async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()>;
}
/// Fault injector named `"attrition"`: repeatedly reboots randomly chosen
/// processes according to an `Attrition` configuration.
pub(crate) struct AttritionInjector {
// Supplies `max_dead`, the optional delay ranges, and `choose_kind`.
config: super::process::Attrition,
}
impl AttritionInjector {
pub(crate) fn new(config: super::process::Attrition) -> Self {
Self { config }
}
}
#[async_trait(?Send)]
impl FaultInjector for AttritionInjector {
    /// Always `"attrition"`.
    fn name(&self) -> &str {
        "attrition"
    }

    /// Runs the attrition loop until the chaos-shutdown token is cancelled.
    ///
    /// Each round sleeps for a random `1000..5000` ms on the context's time
    /// provider, then — unless shutdown was requested, there are no processes,
    /// or `dead_count` has reached `config.max_dead` — picks a reboot kind via
    /// `config.choose_kind` and a random target process, and reboots it with
    /// the configured delay ranges (defaults: recovery `1000..10000` ms,
    /// grace `2000..5000` ms).
    ///
    /// # Errors
    ///
    /// Returns `InvalidState` if the sleep fails, and propagates any error from
    /// `FaultContext::reboot_with_delays`.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
        // The configuration is not modified while the loop runs, so the effective
        // ranges are resolved once instead of being re-cloned every round.
        let recovery_range = self.config.recovery_delay_ms.clone().unwrap_or(1000..10000);
        let grace_range = self.config.grace_period_ms.clone().unwrap_or(2000..5000);

        while !ctx.chaos_shutdown().is_cancelled() {
            let delay_ms = crate::sim::sim_random_range(1000..5000);
            ctx.time()
                .sleep(Duration::from_millis(delay_ms as u64))
                .await
                .map_err(|e| {
                    crate::SimulationError::InvalidState(format!("sleep failed: {}", e))
                })?;

            // Shutdown may have been requested while sleeping.
            if ctx.chaos_shutdown().is_cancelled() {
                break;
            }
            // Nothing to reboot. This single guard also protects the `0..len`
            // draw below: the list cannot change before it, since nothing in
            // between awaits or mutates the context.
            if ctx.process_ips().is_empty() {
                continue;
            }
            if ctx.dead_count() >= self.config.max_dead {
                assert_reachable!("attrition: max_dead limit enforced");
                continue;
            }

            // Map a uniform draw from 0..10000 onto [0, 1) to select the reboot kind.
            let rand_val = crate::sim::sim_random_range(0..10000) as f64 / 10000.0;
            let kind = self.config.choose_kind(rand_val);
            assert_sometimes_each!("attrition_reboot_kind", [("kind", kind as i64)]);

            let idx = crate::sim::sim_random_range(0..ctx.process_ips().len());
            // Borrow the target address; no owned copy is needed for the call.
            let ip = ctx.process_ips()[idx].as_str();
            assert_sometimes_each!("attrition_process_targeted", [("process_idx", idx as i64)]);
            ctx.reboot_with_delays(ip, kind, &recovery_range, &grace_range)?;
        }
        Ok(())
    }
}