use std::time::Duration;
use async_trait::async_trait;
use moonpool_sim::{
Attrition, FaultContext, FaultInjector, Invariant, NetworkProvider, Process, RebootKind,
SIM_FAULT_TIMELINE, SimContext, SimFaultEvent, SimulationBuilder, SimulationResult,
StateHandle, TcpListenerTrait, TimeProvider, Workload, assert_always,
};
use std::cell::Cell;
/// Simulated process that echoes a single chunk of bytes per connection.
///
/// It binds a listener on its own IP and keeps accepting connections until
/// the shutdown token is cancelled.
struct EchoProcess;

#[async_trait(?Send)]
impl Process for EchoProcess {
    fn name(&self) -> &str {
        "echo"
    }

    /// Accept loop: for every accepted connection, read once (at most 64
    /// bytes) and write the same bytes back. Accept errors are logged and the
    /// loop continues; read and write errors are ignored.
    ///
    /// Returns `Ok(())` once the shutdown token is cancelled, or the bind
    /// error if the listener cannot be created.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let listener = ctx.network().bind(ctx.my_ip()).await?;
        tracing::debug!("EchoProcess bound to {}", ctx.my_ip());

        loop {
            // Wait for either the next connection or the shutdown signal.
            let accepted = tokio::select! {
                conn = listener.accept() => conn,
                _ = ctx.shutdown().cancelled() => return Ok(()),
            };

            match accepted {
                Err(e) => {
                    tracing::debug!("EchoProcess accept error (expected under chaos): {}", e);
                }
                Ok((mut stream, _)) => {
                    use tokio::io::{AsyncReadExt, AsyncWriteExt};

                    let mut buf = [0u8; 64];
                    // A failed or empty read means there is nothing to echo.
                    let received = stream.read(&mut buf).await.ok().filter(|&n| n > 0);
                    if let Some(n) = received {
                        // Best effort: a failed write is deliberately ignored.
                        let _ = stream.write_all(&buf[..n]).await;
                    }
                }
            }
        }
    }
}
/// Workload that checks the topology lists at least one process IP and then
/// idles until shutdown.
struct ProcessMonitorWorkload;

#[async_trait(?Send)]
impl Workload for ProcessMonitorWorkload {
    fn name(&self) -> &str {
        "monitor"
    }

    /// Panics if the topology reports no process IPs; otherwise waits for the
    /// shutdown token to be cancelled and returns `Ok(())`.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let topology_is_populated = !ctx.topology().all_process_ips().is_empty();
        assert!(
            topology_is_populated,
            "workload should see process IPs in topology"
        );

        ctx.shutdown().cancelled().await;
        Ok(())
    }
}
/// Boots three `EchoProcess` instances next to `ProcessMonitorWorkload` for a
/// single iteration (seed 42) and expects exactly one successful run and no
/// failed runs.
#[test]
fn test_process_boot_and_topology() {
    let seeds = vec![42];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(EchoProcess))
        .workload(ProcessMonitorWorkload)
        .set_iterations(1)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(report.successful_runs, 1, "simulation should succeed");
    assert_eq!(report.failed_runs, 0);
}
/// Workload that verifies how many processes exist and how the `dc` tag is
/// spread across them, then idles until shutdown.
struct TagVerifierWorkload;

#[async_trait(?Send)]
impl Workload for TagVerifierWorkload {
    fn name(&self) -> &str {
        "tag_verifier"
    }

    /// Panics unless the topology reports 4 processes in total, 2 tagged
    /// `dc=east` and 2 tagged `dc=west`. Afterwards waits for the shutdown
    /// token to be cancelled and returns `Ok(())`.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let total = ctx.topology().all_process_ips().len();
        assert_eq!(total, 4, "should have 4 processes");

        let east = ctx.topology().ips_tagged("dc", "east").len();
        let west = ctx.topology().ips_tagged("dc", "west").len();
        assert_eq!(east, 2, "should have 2 east processes");
        assert_eq!(west, 2, "should have 2 west processes");

        ctx.shutdown().cancelled().await;
        Ok(())
    }
}
/// Runs four `EchoProcess` instances tagged with `dc` values `east`/`west`
/// and lets `TagVerifierWorkload` check the resulting distribution.
/// One iteration, seed 42; expects one successful run.
#[test]
fn test_process_tags_round_robin() {
    let seeds = vec![42];

    let report = SimulationBuilder::new()
        .processes(4, || Box::new(EchoProcess))
        .tags(&[("dc", &["east", "west"])])
        .expect("tags after processes")
        .workload(TagVerifierWorkload)
        .set_iterations(1)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(report.successful_runs, 1, "tag verification should succeed");
}
/// Fault injector that issues a single `RebootKind::Crash` reboot of a random
/// process shortly after starting.
struct RebootOnceInjector;

#[async_trait(?Send)]
impl FaultInjector for RebootOnceInjector {
    fn name(&self) -> &str {
        "reboot_once"
    }

    /// Sleeps 100 ms, then — unless chaos shutdown was already signalled —
    /// calls `reboot_random(RebootKind::Crash)` and logs the IP if one is
    /// returned. Finally waits for the chaos shutdown token.
    ///
    /// Returns the error from `reboot_random` if it fails.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
        let initial_delay = Duration::from_millis(100);
        // The outcome of the sleep is intentionally ignored.
        let _ = ctx.time().sleep(initial_delay).await;

        let chaos_active = !ctx.chaos_shutdown().is_cancelled();
        if chaos_active {
            if let Some(ip) = ctx.reboot_random(RebootKind::Crash)? {
                tracing::info!("Rebooted process at {}", ip);
            }
        }

        ctx.chaos_shutdown().cancelled().await;
        Ok(())
    }
}
/// Workload that sleeps for the wrapped duration and then finishes.
struct TimedWorkload(Duration);

#[async_trait(?Send)]
impl Workload for TimedWorkload {
    fn name(&self) -> &str {
        "timed"
    }

    /// Sleeps for `self.0` using the context's time provider.
    ///
    /// A sleep failure is reported as `SimulationError::InvalidState` with the
    /// underlying error in the message.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let nap = self.0;
        match ctx.time().sleep(nap).await {
            Ok(_) => Ok(()),
            Err(e) => Err(moonpool_sim::SimulationError::InvalidState(format!(
                "sleep failed: {}",
                e
            ))),
        }
    }
}
/// Runs three `EchoProcess` instances with a 15 s `TimedWorkload`, the
/// `RebootOnceInjector` fault and a 5 s chaos duration over seeds 42, 123 and
/// 999. No iteration may fail.
#[test]
fn test_manual_reboot_via_fault_injector() {
    let workload_duration = Duration::from_secs(15);
    let chaos_window = Duration::from_secs(5);
    let seeds = vec![42, 123, 999];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(EchoProcess))
        .workload(TimedWorkload(workload_duration))
        .fault(RebootOnceInjector)
        .chaos_duration(chaos_window)
        .set_iterations(3)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "all iterations should succeed after reboot + recovery"
    );
}
/// Runs three `EchoProcess` instances with a 25 s `TimedWorkload` under the
/// built-in attrition configuration below (no explicit recovery delay or
/// grace period), with a 10 s chaos duration over seeds 42, 123 and 999.
/// No iteration may fail.
#[test]
fn test_builtin_attrition() {
    let attrition = Attrition {
        max_dead: 1,
        prob_graceful: 0.3,
        prob_crash: 0.5,
        prob_wipe: 0.2,
        recovery_delay_ms: None,
        grace_period_ms: None,
    };
    let workload_duration = Duration::from_secs(25);
    let chaos_window = Duration::from_secs(10);
    let seeds = vec![42, 123, 999];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(EchoProcess))
        .workload(TimedWorkload(workload_duration))
        .attrition(attrition)
        .chaos_duration(chaos_window)
        .set_iterations(3)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "attrition should not cause workload failures"
    );
}
/// Fault injector that issues one `RebootKind::Crash` reboot for the
/// processes tagged `dc=east`.
struct RebootTaggedInjector;

#[async_trait(?Send)]
impl FaultInjector for RebootTaggedInjector {
    fn name(&self) -> &str {
        "reboot_tagged"
    }

    /// Sleeps 100 ms, then — unless chaos shutdown was already signalled —
    /// calls `reboot_tagged("dc", "east", RebootKind::Crash)` and logs how
    /// many entries it returned. Finally waits for the chaos shutdown token.
    ///
    /// Returns the error from `reboot_tagged` if it fails.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
        let initial_delay = Duration::from_millis(100);
        // The outcome of the sleep is intentionally ignored.
        let _ = ctx.time().sleep(initial_delay).await;

        let chaos_active = !ctx.chaos_shutdown().is_cancelled();
        if chaos_active {
            let east = ctx.reboot_tagged("dc", "east", RebootKind::Crash)?;
            tracing::info!("Rebooted {} east processes", east.len());
        }

        ctx.chaos_shutdown().cancelled().await;
        Ok(())
    }
}
/// Runs four `EchoProcess` instances tagged with `dc` values `east`/`west`,
/// a 20 s `TimedWorkload`, the `RebootTaggedInjector` fault and a 5 s chaos
/// duration. One iteration, seed 42; expects no failed runs.
#[test]
fn test_tag_based_reboot() {
    let workload_duration = Duration::from_secs(20);
    let chaos_window = Duration::from_secs(5);
    let seeds = vec![42];

    let report = SimulationBuilder::new()
        .processes(4, || Box::new(EchoProcess))
        .tags(&[("dc", &["east", "west"])])
        .expect("tags after processes")
        .workload(TimedWorkload(workload_duration))
        .fault(RebootTaggedInjector)
        .chaos_duration(chaos_window)
        .set_iterations(1)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(report.failed_runs, 0);
}
/// Process that requires a `role` tag on itself and otherwise just waits for
/// shutdown.
struct TagAwareProcess;

#[async_trait(?Send)]
impl Process for TagAwareProcess {
    fn name(&self) -> &str {
        "tag_aware"
    }

    /// Looks up the `role` entry in this process's own tags and logs it.
    ///
    /// Returns `SimulationError::InvalidState` when the tag is absent;
    /// otherwise waits for the shutdown token and returns `Ok(())`.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let my_tags = ctx.topology().my_tags();
        let role = my_tags.get("role");
        tracing::info!("Process at {} has role={:?}", ctx.my_ip(), role);

        match role {
            None => Err(moonpool_sim::SimulationError::InvalidState(
                "process should have a role tag".into(),
            )),
            Some(_) => {
                ctx.shutdown().cancelled().await;
                Ok(())
            }
        }
    }
}
/// Runs three `TagAwareProcess` instances tagged with `role` values
/// `leader`/`follower` alongside a 1 s `TimedWorkload`. One iteration,
/// seed 42; expects one successful run.
#[test]
fn test_process_reads_own_tags() {
    let workload_duration = Duration::from_secs(1);
    let seeds = vec![42];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(TagAwareProcess))
        .tags(&[("role", &["leader", "follower"])])
        .expect("tags after processes")
        .workload(TimedWorkload(workload_duration))
        .set_iterations(1)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(report.successful_runs, 1);
}
/// Echo-style process that exits cleanly as soon as its shutdown token is
/// cancelled.
///
/// It binds a listener on its own IP and, for every accepted connection,
/// reads once (at most 64 bytes) and writes the same bytes back.
struct GracefulProcess;

#[async_trait(?Send)]
impl Process for GracefulProcess {
    fn name(&self) -> &str {
        "graceful"
    }

    /// Serves connections until shutdown is signalled, then logs and returns
    /// `Ok(())`. Returns the bind error if the listener cannot be created.
    ///
    /// Accept errors are logged at debug level instead of being silently
    /// dropped, so failures under chaos remain visible in traces. Read and
    /// write errors on an accepted stream are still ignored.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let listener = ctx.network().bind(ctx.my_ip()).await?;
        tracing::info!("GracefulProcess bound to {}", ctx.my_ip());

        loop {
            tokio::select! {
                result = listener.accept() => {
                    match result {
                        Ok((mut stream, _)) => {
                            use tokio::io::{AsyncReadExt, AsyncWriteExt};

                            let mut buf = [0u8; 64];
                            match stream.read(&mut buf).await {
                                Ok(n) if n > 0 => {
                                    // Best effort: a failed write is deliberately ignored.
                                    let _ = stream.write_all(&buf[..n]).await;
                                }
                                // Nothing to echo on an empty or failed read.
                                _ => {}
                            }
                        }
                        Err(e) => {
                            tracing::debug!(
                                "GracefulProcess accept error (expected under chaos): {}",
                                e
                            );
                        }
                    }
                }
                _ = ctx.shutdown().cancelled() => {
                    tracing::info!("GracefulProcess at {} saw shutdown, exiting cleanly", ctx.my_ip());
                    return Ok(());
                }
            }
        }
    }
}
/// Fault injector that issues a single `RebootKind::Graceful` reboot of a
/// random process shortly after starting.
struct GracefulRebootInjector;

#[async_trait(?Send)]
impl FaultInjector for GracefulRebootInjector {
    fn name(&self) -> &str {
        "graceful_reboot"
    }

    /// Sleeps 100 ms, then — unless chaos shutdown was already signalled —
    /// calls `reboot_random(RebootKind::Graceful)` and logs the IP if one is
    /// returned. Finally waits for the chaos shutdown token.
    ///
    /// Returns the error from `reboot_random` if it fails.
    async fn inject(&mut self, ctx: &FaultContext) -> SimulationResult<()> {
        let initial_delay = Duration::from_millis(100);
        // The outcome of the sleep is intentionally ignored.
        let _ = ctx.time().sleep(initial_delay).await;

        let chaos_active = !ctx.chaos_shutdown().is_cancelled();
        if chaos_active {
            if let Some(ip) = ctx.reboot_random(RebootKind::Graceful)? {
                tracing::info!("Initiated graceful reboot for {}", ip);
            }
        }

        ctx.chaos_shutdown().cancelled().await;
        Ok(())
    }
}
/// Runs three `GracefulProcess` instances with a 25 s `TimedWorkload`, the
/// `GracefulRebootInjector` fault and a 10 s chaos duration over seeds 42,
/// 123 and 999. No iteration may fail.
#[test]
fn test_graceful_reboot_signals_shutdown_token() {
    let workload_duration = Duration::from_secs(25);
    let chaos_window = Duration::from_secs(10);
    let seeds = vec![42, 123, 999];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(GracefulProcess))
        .workload(TimedWorkload(workload_duration))
        .fault(GracefulRebootInjector)
        .chaos_duration(chaos_window)
        .set_iterations(3)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "graceful reboot should succeed — process exits cleanly on shutdown signal"
    );
}
/// Process that never observes its shutdown token: after binding a listener
/// it only sleeps in an endless loop.
struct StuckProcess;

#[async_trait(?Send)]
impl Process for StuckProcess {
    fn name(&self) -> &str {
        "stuck"
    }

    /// Binds a listener on this process's IP (kept alive but never used) and
    /// then sleeps one second at a time forever. The only way this returns is
    /// with the bind error.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let _listener = ctx.network().bind(ctx.my_ip()).await?;
        tracing::info!("StuckProcess bound to {}", ctx.my_ip());

        let tick = Duration::from_secs(1);
        loop {
            // Sleep results are ignored on purpose; the loop has no exit.
            let _ = ctx.time().sleep(tick).await;
        }
    }
}
/// Runs three `StuckProcess` instances with a 25 s `TimedWorkload`, the
/// `GracefulRebootInjector` fault and a 10 s chaos duration over seeds 42,
/// 123 and 999. No iteration may fail.
#[test]
fn test_graceful_reboot_force_kills_stuck_process() {
    let workload_duration = Duration::from_secs(25);
    let chaos_window = Duration::from_secs(10);
    let seeds = vec![42, 123, 999];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(StuckProcess))
        .workload(TimedWorkload(workload_duration))
        .fault(GracefulRebootInjector)
        .chaos_duration(chaos_window)
        .set_iterations(3)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "stuck process should be force-killed after grace period and restarted"
    );
}
/// Invariant over the simulator fault timeline that checks the timing of
/// graceful-shutdown, force-kill and restart events for each IP.
struct RebootTimingInvariant {
    /// Timeline length seen by the previous check; used to skip re-checking
    /// when no new entries have been recorded.
    last_checked: Cell<usize>,
}

impl RebootTimingInvariant {
    /// Creates an invariant that has not yet seen any timeline entries.
    fn new() -> Self {
        let last_checked = Cell::new(0);
        Self { last_checked }
    }
}

impl Invariant for RebootTimingInvariant {
    fn name(&self) -> &str {
        "reboot_timing"
    }

    /// Re-validates the whole fault timeline whenever its length has changed.
    ///
    /// Two properties are asserted with `assert_always!`:
    /// 1. For each `ProcessForceKill`, the time elapsed since the most recent
    ///    earlier `ProcessGracefulShutdown` for the same IP equals that
    ///    shutdown's `grace_period_ms`.
    /// 2. For each `ProcessRestart`, its timestamp is strictly later than the
    ///    most recent earlier `ProcessForceKill` for the same IP.
    ///
    /// Events without a matching earlier event are not checked. Does nothing
    /// if the timeline is unavailable.
    fn check(&self, state: &StateHandle, _sim_time_ms: u64) {
        let Some(timeline) = state.timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE) else {
            return;
        };
        let entries = timeline.all();

        // Record the current length; bail out if it matches the previous one.
        let seen = entries.len();
        if self.last_checked.replace(seen) == seen {
            return;
        }

        // Pass 1: a force-kill must land exactly one grace period after the
        // latest graceful shutdown of the same IP.
        for (idx, entry) in entries.iter().enumerate() {
            let SimFaultEvent::ProcessForceKill { ip } = &entry.event else {
                continue;
            };
            for j in (0..idx).rev() {
                let SimFaultEvent::ProcessGracefulShutdown {
                    ip: shutdown_ip,
                    grace_period_ms,
                } = &entries[j].event
                else {
                    continue;
                };
                if shutdown_ip != ip {
                    continue;
                }
                let actual_delta = entry.time_ms - entries[j].time_ms;
                assert_always!(
                    actual_delta == *grace_period_ms,
                    format!(
                        "Grace period mismatch for {}: expected {}ms, got {}ms",
                        ip, grace_period_ms, actual_delta
                    )
                );
                // Only the most recent matching shutdown is relevant.
                break;
            }
        }

        // Pass 2: a restart must come strictly after the latest force-kill of
        // the same IP.
        for (idx, entry) in entries.iter().enumerate() {
            let SimFaultEvent::ProcessRestart { ip } = &entry.event else {
                continue;
            };
            for j in (0..idx).rev() {
                let SimFaultEvent::ProcessForceKill { ip: killed_ip } = &entries[j].event else {
                    continue;
                };
                if killed_ip != ip {
                    continue;
                }
                assert_always!(
                    entry.time_ms > entries[j].time_ms,
                    format!(
                        "ProcessRestart at {}ms should be after ProcessForceKill at {}ms for {}",
                        entry.time_ms, entries[j].time_ms, ip
                    )
                );
                // Only the most recent matching force-kill is relevant.
                break;
            }
        }
    }
}
/// Runs three `GracefulProcess` instances with a 25 s `TimedWorkload`, the
/// `GracefulRebootInjector` fault and the `RebootTimingInvariant`, using a
/// 10 s chaos duration over seeds 42, 123 and 999. No iteration may fail.
#[test]
fn test_graceful_reboot_timing_invariant() {
    let workload_duration = Duration::from_secs(25);
    let chaos_window = Duration::from_secs(10);
    let seeds = vec![42, 123, 999];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(GracefulProcess))
        .workload(TimedWorkload(workload_duration))
        .fault(GracefulRebootInjector)
        .invariant(RebootTimingInvariant::new())
        .chaos_duration(chaos_window)
        .set_iterations(3)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "graceful reboot timing invariant should pass"
    );
}
/// Runs three `EchoProcess` instances with a 25 s `TimedWorkload` under
/// attrition with explicit recovery-delay (500..2000) and grace-period
/// (1000..3000) ranges, checked by `RebootTimingInvariant`. Uses a 10 s chaos
/// duration over seeds 42, 123, 999, 7 and 314. No iteration may fail.
#[test]
fn test_attrition_timing_invariant() {
    let attrition = Attrition {
        max_dead: 1,
        prob_graceful: 0.5,
        prob_crash: 0.3,
        prob_wipe: 0.2,
        recovery_delay_ms: Some(500..2000),
        grace_period_ms: Some(1000..3000),
    };
    let workload_duration = Duration::from_secs(25);
    let chaos_window = Duration::from_secs(10);
    let seeds = vec![42, 123, 999, 7, 314];

    let report = SimulationBuilder::new()
        .processes(3, || Box::new(EchoProcess))
        .workload(TimedWorkload(workload_duration))
        .attrition(attrition)
        .invariant(RebootTimingInvariant::new())
        .chaos_duration(chaos_window)
        .set_iterations(5)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "attrition with timing invariant should pass"
    );
}
/// Runs five `EchoProcess` instances with a 25 s `TimedWorkload` under
/// crash-only attrition (`prob_crash: 1.0`, `max_dead: 1`) with a 500..2000
/// recovery-delay range. Uses a 10 s chaos duration over seeds 42, 123, 999,
/// 7 and 314. No iteration may fail.
#[test]
fn test_max_dead_limits_concurrent_kills_via_attrition() {
    let attrition = Attrition {
        max_dead: 1,
        prob_graceful: 0.0,
        prob_crash: 1.0,
        prob_wipe: 0.0,
        recovery_delay_ms: Some(500..2000),
        grace_period_ms: None,
    };
    let workload_duration = Duration::from_secs(25);
    let chaos_window = Duration::from_secs(10);
    let seeds = vec![42, 123, 999, 7, 314];

    let report = SimulationBuilder::new()
        .processes(5, || Box::new(EchoProcess))
        .workload(TimedWorkload(workload_duration))
        .attrition(attrition)
        .chaos_duration(chaos_window)
        .set_iterations(5)
        .set_debug_seeds(seeds)
        .run();

    assert_eq!(
        report.failed_runs, 0,
        "attrition with max_dead=1 should not cause failures"
    );
}