use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tracing::field::valuable;
use crate::chaos::fault_events::{SIM_FAULT_TRAIL, SimFaultEvent};
use crate::chaos::state_handle::StateHandle;
use crate::observability::SimulationLayerHandle;
use crate::runner::builder::WorkloadClientInfo;
use crate::runner::context::SimContext;
use crate::runner::fault_injector::{FaultContext, FaultInjector};
use crate::runner::process::Process;
use crate::runner::tags::{ProcessTags, TagRegistry};
use crate::runner::topology::{TopologyFactory, TopologyInputs};
use crate::runner::workload::Workload;
use crate::{
SimulationResult, assert_always_less_than_or_equal_to, assert_reachable, chaos::AssertionStats,
};
use super::report::SimulationMetrics;
/// Default run-phase time budget: one hour.
///
/// Presumably the default for `run_time_budget`, which `check_run_time_budget`
/// compares against elapsed simulated time — confirm at the builder.
pub(crate) const DEFAULT_RUN_TIME_BUDGET: Duration = Duration::from_secs(60 * 60);
/// Tracks consecutive scheduler iterations in which nothing observable
/// happened, and reports a deadlock once that streak exceeds a threshold.
#[derive(Debug, Default)]
pub(crate) struct DeadlockDetector {
    /// Length of the current streak of idle iterations.
    no_progress_count: usize,
    /// Idle iterations tolerated before `check_deadlock` returns `true`.
    threshold: usize,
}

impl DeadlockDetector {
    /// Creates a detector that tolerates `threshold` consecutive idle iterations.
    pub(crate) fn new(threshold: usize) -> Self {
        Self {
            threshold,
            no_progress_count: 0,
        }
    }

    /// Records one iteration and returns `true` when the idle streak is now
    /// longer than the threshold.
    ///
    /// An iteration is idle only when no events were pending at its start
    /// (`initial_event_count == 0`), none are pending at its end
    /// (`event_count == 0`), and the handle count did not change. Any other
    /// iteration resets the streak and returns `false`.
    pub(crate) fn check_deadlock(
        &mut self,
        handles_count: usize,
        initial_handle_count: usize,
        event_count: usize,
        initial_event_count: usize,
    ) -> bool {
        let idle = initial_event_count == 0
            && event_count == 0
            && handles_count == initial_handle_count;
        if !idle {
            self.reset();
            return false;
        }
        self.no_progress_count += 1;
        self.no_progress_count > self.threshold
    }

    /// Current length of the idle streak.
    pub(crate) fn no_progress_count(&self) -> usize {
        self.no_progress_count
    }

    /// Clears the idle streak.
    pub(crate) fn reset(&mut self) {
        self.no_progress_count = 0;
    }
}
/// Description of the server processes the orchestrator boots alongside the
/// workloads.
pub(crate) struct ProcessConfig<'a> {
/// Builds a fresh process instance; called once per IP at boot and again on
/// every restart.
pub(crate) factory: &'a dyn Fn() -> Box<dyn Process>,
/// Entity entries appended after the workload entries to form the shared
/// entity list; the second element is the IP (first element presumably a
/// name/identifier — TODO confirm).
pub(crate) info: Vec<(String, String)>,
/// One IP string per process; must parse as `IpAddr`. A process's index here
/// is its client id.
pub(crate) ips: Vec<String>,
/// Registry consulted for each process's tags.
pub(crate) tag_registry: TagRegistry,
}
/// Owns the running process tasks and provides the operations used for process
/// fault events: graceful shutdown, force kill, and restart.
///
/// `handles`, `process_tokens` and `ips` are parallel vectors indexed by a
/// process's position in `ips`.
struct ProcessManager<'a> {
/// Builds a replacement process on restart; `None` for an empty manager.
factory: Option<&'a dyn Fn() -> Box<dyn Process>>,
/// Task running each process; `None` once taken and aborted.
handles: Vec<Option<tokio::task::JoinHandle<()>>>,
/// Each process's child of the global shutdown signal; `None` after a force kill.
process_tokens: Vec<Option<tokio_util::sync::CancellationToken>>,
/// Process IPs as strings; lookups compare against `IpAddr::to_string()`.
ips: Vec<String>,
/// Registry used to look up tags for restarted processes.
tag_registry: TagRegistry,
/// Full entity list passed into the topology of restarted processes.
all_entities: Vec<(String, String)>,
/// Number of processes currently counted as down; shared with fault injectors.
dead_count: Arc<AtomicUsize>,
}
impl<'a> ProcessManager<'a> {
    /// Creates a manager with no factory and no processes.
    ///
    /// Every IP lookup fails on this manager, so process fault events are
    /// logged as targeting an unknown IP and otherwise ignored.
    fn new_empty() -> Self {
        Self {
            factory: None,
            handles: Vec::new(),
            process_tokens: Vec::new(),
            ips: Vec::new(),
            tag_registry: TagRegistry::default(),
            all_entities: Vec::new(),
            dead_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Creates a manager over processes that have already been booted.
    ///
    /// `handles`, `process_tokens` and `ips` must be parallel: slot `i`
    /// belongs to the process at `ips[i]`. The dead-process count starts at 0.
    fn new(
        factory: &'a dyn Fn() -> Box<dyn Process>,
        handles: Vec<Option<tokio::task::JoinHandle<()>>>,
        process_tokens: Vec<Option<tokio_util::sync::CancellationToken>>,
        ips: Vec<String>,
        tag_registry: TagRegistry,
        all_entities: Vec<(String, String)>,
    ) -> Self {
        Self {
            factory: Some(factory),
            handles,
            process_tokens,
            ips,
            tag_registry,
            all_entities,
            dead_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Returns a shared handle to the dead-process counter. Fault injectors
    /// receive it through `ProcessInfo`.
    fn dead_count(&self) -> Arc<AtomicUsize> {
        Arc::clone(&self.dead_count)
    }

    /// Returns the slot of the process whose configured IP string equals
    /// `ip.to_string()`.
    fn index_for_ip(&self, ip: std::net::IpAddr) -> Option<usize> {
        let ip_str = ip.to_string();
        self.ips.iter().position(|p| p == &ip_str)
    }

    /// Cancels the shutdown token of the process at `ip` and counts it as dead.
    ///
    /// Unknown IPs are logged and ignored. A process without a token
    /// (force-killed and not yet restarted) is left alone.
    ///
    /// NOTE(review): two graceful shutdowns of the same process without a
    /// restart in between count it twice. Confirm the event source never emits
    /// that sequence.
    fn signal_graceful_shutdown(&mut self, ip: std::net::IpAddr) {
        let Some(idx) = self.index_for_ip(ip) else {
            tracing::warn!("ProcessGracefulShutdown for unknown IP {}", ip);
            return;
        };
        if let Some(token) = &self.process_tokens[idx] {
            token.cancel();
            // Assert on the value this increment produced rather than
            // re-loading, so the check covers exactly the count we created.
            let dead_now = self.dead_count.fetch_add(1, Ordering::Relaxed) + 1;
            assert_always_less_than_or_equal_to!(
                dead_now,
                self.ips.len(),
                "dead_count <= process_count"
            );
            assert_reachable!("process_manager: graceful shutdown signaled");
            tracing::info!(
                "Signaled graceful shutdown for process at {} (index {})",
                ip,
                idx
            );
        }
    }

    /// Aborts the task of the process at `ip` immediately and drops its
    /// shutdown token.
    ///
    /// Unknown IPs are logged and ignored. The dead-process count is not
    /// changed here.
    fn abort_process(&mut self, ip: std::net::IpAddr) {
        let Some(idx) = self.index_for_ip(ip) else {
            tracing::warn!("ProcessForceKill for unknown IP {}", ip);
            return;
        };
        if let Some(handle) = self.handles[idx].take() {
            handle.abort();
            tracing::info!("Force-killed process at {} (index {})", ip, idx);
        }
        self.process_tokens[idx] = None;
    }

    /// Replaces the process at `ip` with a freshly built instance.
    ///
    /// Steps:
    /// - abort the old task, if any;
    /// - give the new instance a new child of `shutdown_signal`;
    /// - rebuild its topology and `SimContext`, then spawn it;
    /// - decrement the dead-process count, never below zero.
    ///
    /// Unknown IPs, or a manager without a factory, are logged and ignored.
    ///
    /// NOTE(review): the decrement happens even if this process was never
    /// counted as dead (e.g. force-killed without a graceful shutdown). That
    /// would credit another process's slot; confirm the event ordering.
    fn handle_restart(
        &mut self,
        ip: std::net::IpAddr,
        sim: &crate::sim::WeakSimWorld,
        seed: u64,
        state: &StateHandle,
        obs: &SimulationLayerHandle,
        shutdown_signal: &tokio_util::sync::CancellationToken,
    ) {
        let ip_str = ip.to_string();
        let Some(idx) = self.index_for_ip(ip) else {
            tracing::warn!("ProcessRestart for unknown IP {}", ip);
            return;
        };
        let Some(factory) = self.factory else {
            tracing::warn!("ProcessRestart but no process factory configured");
            return;
        };
        if let Some(handle) = self.handles[idx].take() {
            handle.abort();
        }
        let process_token = shutdown_signal.child_token();
        self.process_tokens[idx] = Some(process_token.clone());
        let mut process = factory();
        let process_tags = self.tag_registry.tags_for(ip).cloned().unwrap_or_default();
        let topology = TopologyFactory::create_topology_with_processes(TopologyInputs {
            ip: &ip_str,
            client_id: idx,
            client_count: self.ips.len(),
            all_entities: &self.all_entities,
            process_ips: &self.ips,
            my_tags: process_tags,
            tag_registry: self.tag_registry.clone(),
            shutdown_signal: process_token,
        });
        let providers = crate::SimProviders::new(sim.clone(), seed, ip);
        let ctx = SimContext::new(providers, topology, state.clone(), obs.clone());
        let ip_for_log = ip_str.clone();
        let handle = tokio::spawn(async move {
            if let Err(e) = process.run(&ctx).await {
                tracing::debug!("Restarted process at {} exited: {}", ip_for_log, e);
            }
        });
        self.handles[idx] = Some(handle);
        // Saturating decrement as one atomic step. The counter is shared
        // through `Arc` with fault injectors, so a separate load followed by
        // `fetch_sub` is a check-then-act. `Err` just means it was already 0,
        // which is the intended saturation.
        let _ = self
            .dead_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
        assert_reachable!("process_manager: process restarted");
        tracing::info!("Process at {} restarted (index {})", ip_str, idx);
    }

    /// Aborts every process task still held by the manager.
    ///
    /// Tokens are left in place.
    fn abort_all(&mut self) {
        for handle in self.handles.iter_mut().filter_map(Option::take) {
            handle.abort();
        }
    }
}
/// Stateless driver for the setup → run (with optional chaos) → settle → check
/// lifecycle of a set of workloads.
pub(crate) struct WorkloadOrchestrator;
/// A finished run task: the workload handed back together with its `run` result.
type WorkloadResult = (Box<dyn Workload>, SimulationResult<()>);
/// A finished setup task: the workload, its context (reused for the run
/// phase), and the `setup` result.
type SetupTaskOutput = (Box<dyn Workload>, SimContext, SimulationResult<()>);
/// Join handle of a spawned setup task.
type SetupHandle = tokio::task::JoinHandle<SetupTaskOutput>;
/// Per-process task slots; `None` once a task has been taken and aborted.
type ProcessHandleSlots = Vec<Option<tokio::task::JoinHandle<()>>>;
/// Per-process shutdown tokens; `None` after a force kill.
type ProcessTokenSlots = Vec<Option<tokio_util::sync::CancellationToken>>;
/// Fault-injector task slots; `None` once the task has been reaped.
type InjectorHandleSlots = Vec<Option<tokio::task::JoinHandle<InjectorResult>>>;
/// Workload run-task slots; `None` once the task has been collected.
type WorkloadHandleSlots = Vec<Option<tokio::task::JoinHandle<WorkloadResult>>>;
/// Everything the check phase needs to build fresh workload contexts and run
/// each workload's `check`.
struct CheckPhaseInputs<'a> {
sim: &'a mut crate::sim::SimWorld,
/// Workloads returned from the run phase.
workloads: Vec<Box<dyn Workload>>,
workload_info: &'a [(String, String)],
client_info: &'a [WorkloadClientInfo],
all_entities: &'a [(String, String)],
process_ips: &'a [String],
tag_registry: &'a TagRegistry,
shutdown_signal: &'a tokio_util::sync::CancellationToken,
seed: u64,
state: &'a StateHandle,
obs: &'a SimulationLayerHandle,
}
/// Shared environment for a phase that steps the simulation while applying
/// process fault events (used by the setup phase).
struct PhaseEnv<'a, 'pm> {
sim: &'a mut crate::sim::SimWorld,
process_manager: &'a mut ProcessManager<'pm>,
seed: u64,
state: &'a StateHandle,
obs: &'a SimulationLayerHandle,
shutdown_signal: &'a tokio_util::sync::CancellationToken,
}
/// Borrowed state driven by `drive_run_phase`.
///
/// `workload_handles`, `workload_collected` and `injector_handles` are
/// mutated in place as tasks finish.
struct RunPhaseInputs<'a, 'pm> {
sim: &'a mut crate::sim::SimWorld,
process_manager: &'a mut ProcessManager<'pm>,
obs: &'a SimulationLayerHandle,
state: &'a StateHandle,
/// Global shutdown token, triggered once any workload finishes or a guard breaches.
shutdown_signal: &'a tokio_util::sync::CancellationToken,
/// Cancelled when the chaos window ends, to stop the fault injectors.
chaos_shutdown: &'a tokio_util::sync::CancellationToken,
chaos_duration: Option<Duration>,
/// Every entity IP; partitions between them are healed when chaos ends.
all_ips: &'a [String],
workload_handles: &'a mut WorkloadHandleSlots,
/// Results of finished workloads, indexed like `workload_handles`.
workload_collected: &'a mut [Option<WorkloadResult>],
injector_handles: &'a mut InjectorHandleSlots,
seed: u64,
iteration_count: usize,
run_time_budget: Duration,
}
/// Inputs for `build_workload_contexts`: one `SimContext` is built per
/// `workload_info` entry, with `client_info` indexed in parallel.
struct WorkloadContextEnv<'a> {
workload_info: &'a [(String, String)],
client_info: &'a [WorkloadClientInfo],
all_entities: &'a [(String, String)],
process_ips: &'a [String],
tag_registry: &'a TagRegistry,
shutdown_signal: &'a tokio_util::sync::CancellationToken,
sim: &'a crate::sim::SimWorld,
seed: u64,
state: &'a StateHandle,
obs: &'a SimulationLayerHandle,
}
/// A finished fault-injector task: the injector plus its `inject` result.
type InjectorResult = (Box<dyn FaultInjector>, SimulationResult<()>);
/// Owned inputs for one call to `WorkloadOrchestrator::orchestrate_workloads`.
pub(crate) struct OrchestrateInputs<'a> {
pub(crate) workloads: Vec<Box<dyn Workload>>,
pub(crate) fault_injectors: Vec<Box<dyn FaultInjector>>,
pub(crate) obs: SimulationLayerHandle,
/// One entry per workload; the second element is the workload's IP.
pub(crate) workload_info: &'a [(String, String)],
/// Client id/count per workload, indexed like `workload_info`.
pub(crate) client_info: &'a [WorkloadClientInfo],
/// `None` when the simulation boots no server processes.
pub(crate) process_config: Option<ProcessConfig<'a>>,
pub(crate) seed: u64,
pub(crate) sim: crate::sim::SimWorld,
/// Length of the chaos window in simulated time; `None` leaves the fault
/// injectors unstarted.
pub(crate) chaos_duration: Option<Duration>,
/// Iteration number; in this file it only appears in diagnostics.
pub(crate) iteration_count: usize,
/// Simulated-time budget for the run phase before a forced shutdown.
pub(crate) run_time_budget: Duration,
}
/// Everything handed back from one orchestration.
pub(crate) struct OrchestrateOutput {
pub(crate) workloads: Vec<Box<dyn Workload>>,
pub(crate) fault_injectors: Vec<Box<dyn FaultInjector>>,
/// Run results (one per workload), setup results when setup failed, or a
/// single settle error.
pub(crate) results: Vec<SimulationResult<()>>,
pub(crate) metrics: SimulationMetrics,
}
/// Inputs for `finalize_orchestration` (settle + check after the run phase).
struct FinalizeOrchestration<'a, 'pm> {
sim: &'a mut crate::sim::SimWorld,
process_manager: &'a mut ProcessManager<'pm>,
returned_workloads: Vec<Box<dyn Workload>>,
returned_injectors: Vec<Box<dyn FaultInjector>>,
results: Vec<SimulationResult<()>>,
seed: u64,
state: &'a StateHandle,
obs: &'a SimulationLayerHandle,
shutdown_signal: &'a tokio_util::sync::CancellationToken,
workload_info: &'a [(String, String)],
client_info: &'a [WorkloadClientInfo],
all_entities: &'a [(String, String)],
process_ips: &'a [String],
tag_registry: &'a TagRegistry,
}
/// Topology data derived once per orchestration and shared by every phase.
struct TopologyMetadata {
process_ips: Vec<String>,
tag_registry: TagRegistry,
/// Workload entries followed by process entries.
all_entities: Vec<(String, String)>,
}
/// Inputs for `boot_and_setup`.
struct BootAndSetupInputs<'a, 'pm> {
process_config: Option<ProcessConfig<'pm>>,
workloads: Vec<Box<dyn Workload>>,
workload_info: &'a [(String, String)],
client_info: &'a [WorkloadClientInfo],
all_entities: &'a [(String, String)],
process_ips: &'a [String],
tag_registry: &'a TagRegistry,
sim: &'a mut crate::sim::SimWorld,
seed: u64,
state: &'a StateHandle,
obs: &'a SimulationLayerHandle,
shutdown_signal: &'a tokio_util::sync::CancellationToken,
}
/// Result of the boot + setup phase.
enum BootAndSetupOutcome<'pm> {
/// Every setup succeeded; contexts are reused for the run phase.
Continue {
workloads: Vec<Box<dyn Workload>>,
contexts: Vec<SimContext>,
process_manager: ProcessManager<'pm>,
},
/// At least one setup failed or panicked; process tasks were already aborted.
SetupFailed {
workloads: Vec<Box<dyn Workload>>,
results: Vec<SimulationResult<()>>,
},
}
/// Inputs for `do_chaos_and_run_phase`.
struct ChaosAndRunInputs<'a, 'pm> {
sim: &'a mut crate::sim::SimWorld,
process_manager: &'a mut ProcessManager<'pm>,
workloads: Vec<Box<dyn Workload>>,
contexts: Vec<SimContext>,
fault_injectors: Vec<Box<dyn FaultInjector>>,
chaos_duration: Option<Duration>,
all_entities: &'a [(String, String)],
state: &'a StateHandle,
obs: &'a SimulationLayerHandle,
shutdown_signal: &'a tokio_util::sync::CancellationToken,
seed: u64,
iteration_count: usize,
run_time_budget: Duration,
}
/// Workloads, injectors and per-workload results gathered after the run phase.
struct ChaosAndRunOutput {
returned_workloads: Vec<Box<dyn Workload>>,
returned_injectors: Vec<Box<dyn FaultInjector>>,
results: Vec<SimulationResult<()>>,
}
/// Snapshot consumed by `check_run_time_budget`.
struct BudgetGuardInputs {
/// Workloads still running; the guard is inert when this is 0.
current_active: usize,
/// Current simulated time.
now: Duration,
/// Simulated time at which the run phase started.
run_start: Duration,
run_time_budget: Duration,
/// Simulated time of the first budget breach, if one has happened.
budget_breach_time: Option<Duration>,
seed: u64,
iteration_count: usize,
}
/// Inputs for `evaluate_stall_guards`, evaluated once per run-loop iteration.
struct StallGuardInputs<'a> {
sim: &'a crate::sim::SimWorld,
deadlock_detector: &'a mut DeadlockDetector,
run_start: Duration,
run_time_budget: Duration,
/// Set to the current simulated time when the budget is first breached.
budget_breach_time: &'a mut Option<Duration>,
shutdown_triggered: bool,
current_active: usize,
/// Active workload count at the start of the iteration.
initial_handle_count: usize,
/// Pending event count at the start of the iteration.
initial_event_count: usize,
seed: u64,
iteration_count: usize,
}
/// Snapshot consumed by `check_no_progress`.
struct NoProgressInputs {
current_active: usize,
initial_handle_count: usize,
/// Pending event count at the end of the iteration.
event_count: usize,
initial_event_count: usize,
seed: u64,
iteration_count: usize,
}
/// Verdict of one round of stall checks in the run loop.
#[derive(Clone, Copy)]
enum StallOutcome {
    /// Nothing wrong.
    Ok,
    /// A limit was hit; the run loop triggers shutdown and keeps going.
    Breached,
    /// Progress is impossible; the run loop aborts the iteration.
    Deadlock,
}

impl StallOutcome {
    /// Combines two verdicts, keeping the more severe one
    /// (`Deadlock` > `Breached` > `Ok`).
    fn merge(self, other: Self) -> Self {
        match (self, other) {
            (Self::Ok, rhs) => rhs,
            (lhs, Self::Ok) => lhs,
            (Self::Deadlock, _) | (_, Self::Deadlock) => Self::Deadlock,
            (Self::Breached, Self::Breached) => Self::Breached,
        }
    }
}
impl WorkloadOrchestrator {
/// Runs one complete orchestration: setup, chaos + run, settle, and check.
///
/// Builds the shared topology metadata and boots the configured processes. It
/// then runs every workload's `setup`, followed by its `run`, with fault
/// injectors active when `chaos_duration` is set. Finally it hands off to
/// `finalize_orchestration` for the settle and check phases.
///
/// A failed workload `setup` is not an error: it yields `Ok` with the setup
/// results and skips the later phases.
///
/// # Errors
///
/// Returns `Err((vec![seed], 1))` from any phase that cannot proceed, e.g. a
/// context that cannot be built or a detected deadlock. The tuple is presumably
/// (failing seeds, failure count) — confirm with the caller.
pub(crate) async fn orchestrate_workloads(
inputs: OrchestrateInputs<'_>,
) -> Result<OrchestrateOutput, (Vec<u64>, usize)> {
let OrchestrateInputs {
workloads,
fault_injectors,
obs,
workload_info,
client_info,
process_config,
seed,
mut sim,
chaos_duration,
iteration_count,
run_time_budget,
} = inputs;
tracing::debug!(
"Orchestrating {} workload(s), {} fault injector(s), {} process(es)",
workloads.len(),
fault_injectors.len(),
process_config.as_ref().map_or(0, |pc| pc.ips.len()),
);
// Topology data passed to every phase: process IPs, tag registry, and the
// combined workload + process entity list.
let TopologyMetadata {
process_ips,
tag_registry,
all_entities,
} = Self::build_topology_metadata(workload_info, process_config.as_ref());
// One state handle and one global shutdown token are passed to every
// phase of this orchestration.
let state = StateHandle::new();
let shutdown_signal = tokio_util::sync::CancellationToken::new();
let (workloads, contexts, mut process_manager) =
match Self::boot_and_setup(BootAndSetupInputs {
process_config,
workloads,
workload_info,
client_info,
all_entities: &all_entities,
process_ips: &process_ips,
tag_registry: &tag_registry,
sim: &mut sim,
seed,
state: &state,
obs: &obs,
shutdown_signal: &shutdown_signal,
})
.await?
{
BootAndSetupOutcome::Continue {
workloads,
contexts,
process_manager,
} => (workloads, contexts, process_manager),
// Setup failed: return the setup results immediately; the run,
// settle and check phases are skipped.
BootAndSetupOutcome::SetupFailed { workloads, results } => {
return Ok(OrchestrateOutput {
workloads,
fault_injectors,
results,
metrics: sim.extract_metrics(),
});
}
};
let ChaosAndRunOutput {
returned_workloads,
returned_injectors,
results,
} = Self::do_chaos_and_run_phase(ChaosAndRunInputs {
sim: &mut sim,
process_manager: &mut process_manager,
workloads,
contexts,
fault_injectors,
chaos_duration,
all_entities: &all_entities,
state: &state,
obs: &obs,
shutdown_signal: &shutdown_signal,
seed,
iteration_count,
run_time_budget,
})
.await?;
// Settle and check; this also aborts the remaining process tasks.
Self::finalize_orchestration(FinalizeOrchestration {
sim: &mut sim,
process_manager: &mut process_manager,
returned_workloads,
returned_injectors,
results,
seed,
state: &state,
obs: &obs,
shutdown_signal: &shutdown_signal,
workload_info,
client_info,
all_entities: &all_entities,
process_ips: &process_ips,
tag_registry: &tag_registry,
})
.await
}
fn build_topology_metadata(
workload_info: &[(String, String)],
process_config: Option<&ProcessConfig<'_>>,
) -> TopologyMetadata {
let process_ips = process_config.map(|pc| pc.ips.clone()).unwrap_or_default();
let tag_registry = process_config
.map(|pc| pc.tag_registry.clone())
.unwrap_or_default();
let all_entities = workload_info
.iter()
.chain(process_config.map_or(&[][..], |pc| pc.info.as_slice()))
.cloned()
.collect();
TopologyMetadata {
process_ips,
tag_registry,
all_entities,
}
}
/// Boots the configured processes, builds one context per workload, and runs
/// every workload's `setup` while stepping the simulation.
///
/// Returns `SetupFailed` (with all process tasks aborted) when any setup
/// reports an error or panics. Otherwise returns `Continue` with the
/// workloads, their contexts (reused for the run phase) and the live process
/// manager.
///
/// # Errors
///
/// Returns `Err((vec![seed], 1))` when the processes cannot be booted or a
/// workload context cannot be built. In the latter case the processes that
/// were already booted are aborted first, so no process task outlives the
/// failed orchestration.
async fn boot_and_setup<'pm>(
    inputs: BootAndSetupInputs<'_, 'pm>,
) -> Result<BootAndSetupOutcome<'pm>, (Vec<u64>, usize)> {
    let BootAndSetupInputs {
        process_config,
        workloads,
        workload_info,
        client_info,
        all_entities,
        process_ips,
        tag_registry,
        sim,
        seed,
        state,
        obs,
        shutdown_signal,
    } = inputs;
    let mut process_manager = Self::boot_and_wrap_process_manager(
        process_config,
        all_entities,
        sim,
        seed,
        state,
        obs,
        shutdown_signal,
    )
    .map_err(|()| (vec![seed], 1usize))?;
    let contexts = match Self::build_workload_contexts(&WorkloadContextEnv {
        workload_info,
        client_info,
        all_entities,
        process_ips,
        tag_registry,
        shutdown_signal,
        sim,
        seed,
        state,
        obs,
    }) {
        Ok(contexts) => contexts,
        Err(()) => {
            // The processes booted above are already running. Don't leave
            // them detached on the runtime when we bail out.
            process_manager.abort_all();
            return Err((vec![seed], 1));
        }
    };
    let (workloads, contexts, setup_results, setup_failed) = Self::do_setup_phase(
        workloads,
        contexts,
        PhaseEnv {
            sim,
            process_manager: &mut process_manager,
            seed,
            state,
            obs,
            shutdown_signal,
        },
    )
    .await;
    if setup_failed {
        process_manager.abort_all();
        return Ok(BootAndSetupOutcome::SetupFailed {
            workloads,
            results: setup_results,
        });
    }
    Ok(BootAndSetupOutcome::Continue {
        workloads,
        contexts,
        process_manager,
    })
}
/// Runs the chaos + run phase.
///
/// Starts the fault injectors when a chaos window is configured and spawns
/// every workload's `run`. It then drives the simulation until all workloads
/// finish, and finally gathers the injectors and the per-workload results.
///
/// # Errors
///
/// Returns `Err((vec![seed], 1))` if an injector context cannot be created or
/// `drive_run_phase` detects a deadlock.
async fn do_chaos_and_run_phase(
inputs: ChaosAndRunInputs<'_, '_>,
) -> Result<ChaosAndRunOutput, (Vec<u64>, usize)> {
let ChaosAndRunInputs {
sim,
process_manager,
workloads,
contexts,
fault_injectors,
chaos_duration,
all_entities,
state,
obs,
shutdown_signal,
seed,
iteration_count,
run_time_budget,
} = inputs;
// Separate from the global shutdown token: cancelled only when the chaos
// window ends, to stop the injectors.
let chaos_shutdown = tokio_util::sync::CancellationToken::new();
// IPs of every entity; used to heal all partitions when chaos ends.
let all_ips: Vec<String> = all_entities.iter().map(|(_, ip)| ip.clone()).collect();
// Without a chaos window the injectors come back unstarted in
// `parked_injectors`.
let (mut injector_handles, parked_injectors) = Self::start_fault_injectors(
fault_injectors,
chaos_duration,
sim,
process_manager,
seed,
&chaos_shutdown,
)
.map_err(|()| (vec![seed], 1usize))?;
let total_workloads = workloads.len();
let mut workload_handles: WorkloadHandleSlots = Self::spawn_run_tasks(workloads, contexts);
// Filled in, per index, as each run task finishes.
let mut workload_collected: Vec<Option<WorkloadResult>> =
(0..total_workloads).map(|_| None).collect();
Self::drive_run_phase(RunPhaseInputs {
sim,
process_manager,
obs,
state,
shutdown_signal,
chaos_shutdown: &chaos_shutdown,
chaos_duration,
all_ips: &all_ips,
workload_handles: &mut workload_handles,
workload_collected: &mut workload_collected,
injector_handles: &mut injector_handles,
seed,
iteration_count,
run_time_budget,
})
.await?;
let returned_injectors =
Self::collect_injector_results(parked_injectors, injector_handles).await;
let (returned_workloads, results) =
Self::collect_workload_results(workload_collected, total_workloads);
Ok(ChaosAndRunOutput {
returned_workloads,
returned_injectors,
results,
})
}
/// Finishes an orchestration after the run phase.
///
/// Steps:
/// 1. Abort every process task.
/// 2. Run the settle phase.
/// 3. Build fresh contexts and run each workload's `check`.
/// 4. Extract metrics and, when this is an explorer child process, exit via
///    `maybe_exit_child`.
///
/// If settling reports an error, that error becomes the only result and the
/// check phase is skipped.
///
/// NOTE(review): `check` failures are only logged by `run_check_phase`. They
/// do not appear in `results` and do not affect the child exit code — confirm
/// that is intended.
///
/// # Errors
///
/// Returns `Err((vec![seed], 1))` if the check-phase contexts cannot be built.
async fn finalize_orchestration(
inputs: FinalizeOrchestration<'_, '_>,
) -> Result<OrchestrateOutput, (Vec<u64>, usize)> {
let FinalizeOrchestration {
sim,
process_manager,
returned_workloads,
returned_injectors,
results,
seed,
state,
obs,
shutdown_signal,
workload_info,
client_info,
all_entities,
process_ips,
tag_registry,
} = inputs;
process_manager.abort_all();
if let Some(settle_err) = Self::settle_phase(sim) {
return Ok(OrchestrateOutput {
workloads: returned_workloads,
fault_injectors: returned_injectors,
results: vec![Err(settle_err)],
metrics: sim.extract_metrics(),
});
}
let final_workloads = Self::do_check_phase(CheckPhaseInputs {
sim,
workloads: returned_workloads,
workload_info,
client_info,
all_entities,
process_ips,
tag_registry,
shutdown_signal,
seed,
state,
obs,
})
.await
.map_err(|()| (vec![seed], 1usize))?;
let metrics = sim.extract_metrics();
// In an explorer child this presumably terminates the process; otherwise a
// no-op.
Self::maybe_exit_child(&results);
Ok(OrchestrateOutput {
workloads: final_workloads,
fault_injectors: returned_injectors,
results,
metrics,
})
}
/// When running as an explorer child process, exits it with code 0 if every
/// result is `Ok` and no "always" assertion was violated, or 42 otherwise.
/// Does nothing outside an explorer child.
fn maybe_exit_child(results: &[SimulationResult<()>]) {
    if moonpool_explorer::explorer_is_child() {
        // Violations are only consulted when every result succeeded.
        let failed = results.iter().any(std::result::Result::is_err)
            || crate::chaos::has_always_violations();
        moonpool_explorer::exit_child(if failed { 42 } else { 0 });
    }
}
/// Builds fresh contexts for every workload and runs each workload's `check`.
///
/// # Errors
///
/// Returns `Err(())` if the contexts cannot be built; no check runs then.
async fn do_check_phase(inputs: CheckPhaseInputs<'_>) -> Result<Vec<Box<dyn Workload>>, ()> {
    let check_contexts = {
        let env = WorkloadContextEnv {
            workload_info: inputs.workload_info,
            client_info: inputs.client_info,
            all_entities: inputs.all_entities,
            process_ips: inputs.process_ips,
            tag_registry: inputs.tag_registry,
            shutdown_signal: inputs.shutdown_signal,
            sim: &*inputs.sim,
            seed: inputs.seed,
            state: inputs.state,
            obs: inputs.obs,
        };
        Self::build_workload_contexts(&env)?
    };
    let checked = Self::run_check_phase(inputs.sim, inputs.workloads, check_contexts).await;
    Ok(checked)
}
async fn do_setup_phase(
workloads: Vec<Box<dyn Workload>>,
contexts: Vec<SimContext>,
env: PhaseEnv<'_, '_>,
) -> (
Vec<Box<dyn Workload>>,
Vec<SimContext>,
Vec<SimulationResult<()>>,
bool,
) {
let setup_handles = Self::spawn_setup_tasks(workloads, contexts);
Self::cooperative_loop_until_done(
env.sim,
env.process_manager,
env.seed,
env.state,
env.obs,
env.shutdown_signal,
&setup_handles,
)
.await;
Self::collect_setup_results(setup_handles).await
}
/// Spawns one task per `(workload, context)` pair that runs `setup`.
///
/// Each task hands back the workload, its context and the setup result.
/// Pairs are formed positionally, so extra elements on either side are ignored.
fn spawn_setup_tasks(
    workloads: Vec<Box<dyn Workload>>,
    contexts: Vec<SimContext>,
) -> Vec<SetupHandle> {
    workloads
        .into_iter()
        .zip(contexts)
        .map(|(workload, ctx)| {
            tokio::spawn(async move {
                let mut job = workload;
                let outcome = job.setup(&ctx).await;
                (job, ctx, outcome)
            })
        })
        .collect()
}
/// Spawns one task per `(workload, context)` pair that runs `run`.
///
/// Each task hands back the workload and its result. Every handle is wrapped
/// in `Some` so the run loop can take it once the task finishes.
fn spawn_run_tasks(
    workloads: Vec<Box<dyn Workload>>,
    contexts: Vec<SimContext>,
) -> WorkloadHandleSlots {
    workloads
        .into_iter()
        .zip(contexts)
        .map(|(workload, ctx)| {
            let task = tokio::spawn(async move {
                let mut job = workload;
                let outcome = job.run(&ctx).await;
                (job, outcome)
            });
            Some(task)
        })
        .collect()
}
/// Drives the run phase until every workload task has finished.
///
/// Each iteration:
/// - ends the chaos window once `chaos_duration` has elapsed, cancelling
///   `chaos_shutdown` and healing partitions;
/// - steps the simulation once if events are pending, publishes the new time,
///   and applies any process events;
/// - collects finished workloads and reaps finished injectors;
/// - evaluates the stall guards.
///
/// The first finished workload triggers the global shutdown signal, and so
/// does a stall-guard breach.
///
/// # Errors
///
/// Returns `Err((vec![seed], 1))` when a deadlock is detected. Before
/// returning, every still-running workload, injector and process task is
/// aborted, because the callers propagate the error with `?` and never reach
/// the normal cleanup.
async fn drive_run_phase(inputs: RunPhaseInputs<'_, '_>) -> Result<(), (Vec<u64>, usize)> {
    let RunPhaseInputs {
        sim,
        process_manager,
        obs,
        state,
        shutdown_signal,
        chaos_shutdown,
        chaos_duration,
        all_ips,
        workload_handles,
        workload_collected,
        injector_handles,
        seed,
        iteration_count,
        run_time_budget,
    } = inputs;
    // The chaos window and the run-time budget are measured from the same instant.
    let run_start = sim.current_time();
    let chaos_start = run_start;
    let mut chaos_ended = chaos_duration.is_none();
    let mut deadlock_detector = DeadlockDetector::new(3);
    let mut shutdown_triggered = false;
    let mut budget_breach_time: Option<Duration> = None;
    let mut loop_count: u64 = 0;
    loop {
        let active_workloads = workload_handles.iter().filter(|h| h.is_some()).count();
        if active_workloads == 0 {
            break;
        }
        loop_count += 1;
        if loop_count.is_multiple_of(100) {
            tracing::debug!(
                "Cooperative loop iteration {}, {} handles active, {} pending events",
                loop_count,
                active_workloads,
                sim.pending_event_count()
            );
        }
        let initial_handle_count = active_workloads;
        let initial_event_count = sim.pending_event_count();
        if !chaos_ended && Self::should_end_chaos(sim, chaos_start, chaos_duration) {
            tracing::debug!("Chaos phase ended");
            chaos_shutdown.cancel();
            Self::heal_all_partitions(sim, all_ips);
            chaos_ended = true;
            assert_reachable!("phase: chaos ended");
        }
        if sim.pending_event_count() > 0 {
            sim.step();
            obs.set_sim_time_ms(
                u64::try_from(sim.current_time().as_millis()).unwrap_or(u64::MAX),
            );
            Self::handle_process_events(
                sim,
                process_manager,
                seed,
                state,
                obs,
                shutdown_signal,
            );
        }
        let any_finished =
            Self::collect_finished_workloads(workload_handles, workload_collected).await;
        if any_finished && !shutdown_triggered {
            Self::trigger_shutdown(sim, shutdown_signal);
            shutdown_triggered = true;
        }
        Self::reap_finished_injectors(injector_handles).await;
        let current_active = workload_handles.iter().filter(|h| h.is_some()).count();
        let stall = Self::evaluate_stall_guards(StallGuardInputs {
            sim,
            deadlock_detector: &mut deadlock_detector,
            run_start,
            run_time_budget,
            budget_breach_time: &mut budget_breach_time,
            shutdown_triggered,
            current_active,
            initial_handle_count,
            initial_event_count,
            seed,
            iteration_count,
        });
        match stall {
            StallOutcome::Ok => {}
            StallOutcome::Breached => {
                Self::trigger_shutdown(sim, shutdown_signal);
                shutdown_triggered = true;
                deadlock_detector.reset();
            }
            StallOutcome::Deadlock => {
                // The callers abandon this run on error, so tear down
                // everything still running here rather than leaving the
                // tasks detached on the runtime.
                for handle in workload_handles.iter_mut().filter_map(Option::take) {
                    handle.abort();
                }
                for handle in injector_handles.iter_mut().filter_map(Option::take) {
                    handle.abort();
                }
                process_manager.abort_all();
                return Err((vec![seed], 1));
            }
        }
        if current_active > 0 {
            tokio::task::yield_now().await;
        }
    }
    Ok(())
}
/// Runs both stall guards for the current iteration and merges their verdicts.
///
/// The run-time budget guard runs first. If it reports `Breached`, the current
/// simulated time is stored in `budget_breach_time`, so later checks measure
/// the post-shutdown grace period from it. The no-progress guard runs next,
/// and the more severe verdict wins.
fn evaluate_stall_guards(inputs: StallGuardInputs<'_>) -> StallOutcome {
    let now = inputs.sim.current_time();
    let budget_verdict = Self::check_run_time_budget(&BudgetGuardInputs {
        current_active: inputs.current_active,
        now,
        run_start: inputs.run_start,
        run_time_budget: inputs.run_time_budget,
        budget_breach_time: *inputs.budget_breach_time,
        seed: inputs.seed,
        iteration_count: inputs.iteration_count,
    });
    if matches!(budget_verdict, StallOutcome::Breached) {
        *inputs.budget_breach_time = Some(now);
    }
    let progress_verdict = Self::check_no_progress(
        inputs.deadlock_detector,
        inputs.shutdown_triggered,
        &NoProgressInputs {
            current_active: inputs.current_active,
            initial_handle_count: inputs.initial_handle_count,
            event_count: inputs.sim.pending_event_count(),
            initial_event_count: inputs.initial_event_count,
            seed: inputs.seed,
            iteration_count: inputs.iteration_count,
        },
    );
    budget_verdict.merge(progress_verdict)
}
/// Feeds one iteration into the deadlock detector and interprets the outcome.
///
/// - Below the threshold: `Ok`.
/// - Threshold exceeded before shutdown was triggered: `Breached`, so the run
///   loop triggers shutdown to unblock workloads.
/// - Threshold exceeded after shutdown: `Deadlock`.
fn check_no_progress(
    deadlock_detector: &mut DeadlockDetector,
    shutdown_triggered: bool,
    inputs: &NoProgressInputs,
) -> StallOutcome {
    let stalled = deadlock_detector.check_deadlock(
        inputs.current_active,
        inputs.initial_handle_count,
        inputs.event_count,
        inputs.initial_event_count,
    );
    match (stalled, shutdown_triggered) {
        (false, _) => StallOutcome::Ok,
        (true, true) => {
            tracing::error!(
                "DEADLOCK detected on iteration {} with seed {}: {} tasks remaining after {} no-progress iterations",
                inputs.iteration_count,
                inputs.seed,
                inputs.current_active,
                deadlock_detector.no_progress_count()
            );
            StallOutcome::Deadlock
        }
        (true, false) => {
            tracing::warn!(
                "No progress detected on iteration {} with seed {}: {} tasks remaining. Triggering shutdown to unblock workloads.",
                inputs.iteration_count,
                inputs.seed,
                inputs.current_active,
            );
            StallOutcome::Breached
        }
    }
}
/// Enforces the simulated-time budget of the run phase.
///
/// While workloads remain active:
/// - Before any breach: returns `Breached` once more than `run_time_budget`
///   has elapsed since `run_start`.
/// - After a breach (shutdown already triggered at `budget_breach_time`):
///   returns `Deadlock` once another full budget has elapsed since then.
///
/// Returns `Ok` whenever no workload is active.
fn check_run_time_budget(inputs: &BudgetGuardInputs) -> StallOutcome {
    if inputs.current_active == 0 {
        return StallOutcome::Ok;
    }
    let run_elapsed = inputs.now.saturating_sub(inputs.run_start);
    if let Some(breach) = inputs.budget_breach_time {
        let since_breach = inputs.now.saturating_sub(breach);
        if since_breach <= inputs.run_time_budget {
            return StallOutcome::Ok;
        }
        tracing::error!(
            "DEADLOCK detected on iteration {} with seed {}: run-phase virtual time advanced {:?} (budget {:?}) and kept climbing for another {:?} after shutdown with {} workload(s) still running — self-perpetuating timer making no workload progress",
            inputs.iteration_count,
            inputs.seed,
            run_elapsed,
            inputs.run_time_budget,
            since_breach,
            inputs.current_active,
        );
        return StallOutcome::Deadlock;
    }
    if run_elapsed <= inputs.run_time_budget {
        return StallOutcome::Ok;
    }
    tracing::warn!(
        "Run-phase virtual-time budget exceeded on iteration {} with seed {}: simulated time advanced {:?} (budget {:?}) with {} workload(s) still running. Triggering shutdown to unblock workloads.",
        inputs.iteration_count,
        inputs.seed,
        run_elapsed,
        inputs.run_time_budget,
        inputs.current_active,
    );
    StallOutcome::Breached
}
/// Spawns one `inject` task per fault injector when a chaos window is set.
///
/// Each injector gets:
/// - its own upgraded simulation handle;
/// - a snapshot of the process IPs and tag registry;
/// - the shared dead-process counter;
/// - a random provider seeded with `seed`;
/// - `chaos_shutdown` as its stop signal.
///
/// Without a chaos window no task is spawned and every injector is returned
/// untouched in the second tuple element.
///
/// # Errors
///
/// Returns `Err(())` if the simulation handle cannot be upgraded.
fn start_fault_injectors(
    fault_injectors: Vec<Box<dyn FaultInjector>>,
    chaos_duration: Option<Duration>,
    sim: &crate::sim::SimWorld,
    process_manager: &ProcessManager<'_>,
    seed: u64,
    chaos_shutdown: &tokio_util::sync::CancellationToken,
) -> Result<(InjectorHandleSlots, Vec<Box<dyn FaultInjector>>), ()> {
    if chaos_duration.is_none() {
        return Ok((Vec::new(), fault_injectors));
    }
    let mut running: InjectorHandleSlots = Vec::with_capacity(fault_injectors.len());
    for injector in fault_injectors {
        let injector_sim = sim.downgrade().upgrade().map_err(|_| ())?;
        let process_info = crate::runner::fault_injector::ProcessInfo {
            process_ips: process_manager.ips.clone(),
            tag_registry: process_manager.tag_registry.clone(),
            dead_count: process_manager.dead_count(),
        };
        let fault_ctx = FaultContext::new(
            injector_sim,
            process_info,
            crate::SimRandomProvider::new(seed),
            sim.time_provider(),
            chaos_shutdown.clone(),
        );
        running.push(Some(tokio::spawn(async move {
            let mut active = injector;
            let outcome = active.inject(&fault_ctx).await;
            (active, outcome)
        })));
    }
    Ok((running, Vec::new()))
}
/// Boots the configured processes and wraps them in a `ProcessManager`.
///
/// Without a process config, returns an empty manager.
///
/// # Errors
///
/// Propagates `Err(())` from `boot_processes`.
fn boot_and_wrap_process_manager<'pm>(
    process_config: Option<ProcessConfig<'pm>>,
    all_entities: &[(String, String)],
    sim: &crate::sim::SimWorld,
    seed: u64,
    state: &StateHandle,
    obs: &SimulationLayerHandle,
    shutdown_signal: &tokio_util::sync::CancellationToken,
) -> Result<ProcessManager<'pm>, ()> {
    let (handles, tokens) = Self::boot_processes(
        process_config.as_ref(),
        all_entities,
        sim,
        seed,
        state,
        obs,
        shutdown_signal,
    )?;
    let manager = if let Some(pc) = process_config {
        ProcessManager::new(
            pc.factory,
            handles,
            tokens,
            pc.ips,
            pc.tag_registry,
            all_entities.to_vec(),
        )
    } else {
        ProcessManager::new_empty()
    };
    Ok(manager)
}
fn boot_processes(
process_config: Option<&ProcessConfig<'_>>,
all_entities: &[(String, String)],
sim: &crate::sim::SimWorld,
seed: u64,
state: &StateHandle,
obs: &SimulationLayerHandle,
shutdown_signal: &tokio_util::sync::CancellationToken,
) -> Result<(ProcessHandleSlots, ProcessTokenSlots), ()> {
let mut process_handles: ProcessHandleSlots = Vec::new();
let mut process_tokens: ProcessTokenSlots = Vec::new();
let Some(pc) = process_config else {
return Ok((process_handles, process_tokens));
};
for (i, ip) in pc.ips.iter().enumerate() {
let mut process = (pc.factory)();
let ip_addr: std::net::IpAddr = ip.parse().map_err(|_| ())?;
let process_tags = pc
.tag_registry
.tags_for(ip_addr)
.cloned()
.unwrap_or_default();
let process_token = shutdown_signal.child_token();
let topology = TopologyFactory::create_topology_with_processes(TopologyInputs {
ip,
client_id: i,
client_count: pc.ips.len(),
all_entities,
process_ips: &pc.ips,
my_tags: process_tags,
tag_registry: pc.tag_registry.clone(),
shutdown_signal: process_token.clone(),
});
let providers = crate::SimProviders::new(sim.downgrade(), seed, ip_addr);
let ctx = SimContext::new(providers, topology, state.clone(), obs.clone());
let ip_for_log = ip.clone();
let handle = tokio::spawn(async move {
if let Err(e) = process.run(&ctx).await {
tracing::debug!("Process at {} exited: {}", ip_for_log, e);
}
});
process_handles.push(Some(handle));
process_tokens.push(Some(process_token));
tracing::debug!("Booted process {} at {}", i, ip);
}
Ok((process_handles, process_tokens))
}
async fn run_check_phase(
sim: &mut crate::sim::SimWorld,
workloads: Vec<Box<dyn Workload>>,
contexts: Vec<SimContext>,
) -> Vec<Box<dyn Workload>> {
let mut check_handles = Vec::with_capacity(workloads.len());
for (workload, ctx) in workloads.into_iter().zip(contexts) {
let handle = tokio::spawn(async move {
let mut w = workload;
let result = w.check(&ctx).await;
if let Err(ref e) = result {
tracing::error!("Workload '{}' check failed: {}", w.name(), e);
}
w
});
check_handles.push(handle);
}
loop {
if check_handles
.iter()
.all(tokio::task::JoinHandle::is_finished)
{
break;
}
if sim.pending_event_count() > 0 {
sim.step();
}
tokio::task::yield_now().await;
}
let mut final_workloads = Vec::with_capacity(check_handles.len());
for handle in check_handles {
match handle.await {
Ok(w) => final_workloads.push(w),
Err(_) => {
tracing::error!("Check task panicked");
}
}
}
final_workloads
}
fn build_workload_contexts(env: &WorkloadContextEnv<'_>) -> Result<Vec<SimContext>, ()> {
let mut contexts = Vec::with_capacity(env.workload_info.len());
for (i, (_, ip)) in env.workload_info.iter().enumerate() {
let WorkloadClientInfo {
client_id,
client_count,
} = env.client_info[i];
let ip_addr: std::net::IpAddr = ip.parse().map_err(|_| ())?;
let topology = TopologyFactory::create_topology_with_processes(TopologyInputs {
ip,
client_id,
client_count,
all_entities: env.all_entities,
process_ips: env.process_ips,
my_tags: ProcessTags::default(),
tag_registry: env.tag_registry.clone(),
shutdown_signal: env.shutdown_signal.clone(),
});
let providers = crate::SimProviders::new(env.sim.downgrade(), env.seed, ip_addr);
let ctx = SimContext::new(providers, topology, env.state.clone(), env.obs.clone());
contexts.push(ctx);
}
Ok(contexts)
}
/// Returns `true` once at least `chaos_duration` of simulated time has passed
/// since `chaos_start`. Always `false` when no chaos window is configured.
fn should_end_chaos(
    sim: &crate::sim::SimWorld,
    chaos_start: Duration,
    chaos_duration: Option<Duration>,
) -> bool {
    let since_start = sim.current_time().saturating_sub(chaos_start);
    match chaos_duration {
        Some(window) => since_start >= window,
        None => false,
    }
}
/// Moves every finished workload task's output into `workload_collected` at
/// the same index, emptying its slot in `workload_handles`.
///
/// A panicked task is logged and leaves its `workload_collected` entry
/// untouched. Returns `true` if at least one slot was emptied this call.
async fn collect_finished_workloads(
    workload_handles: &mut [Option<tokio::task::JoinHandle<WorkloadResult>>],
    workload_collected: &mut [Option<WorkloadResult>],
) -> bool {
    let mut reaped_any = false;
    for (slot_index, slot) in workload_handles.iter_mut().enumerate() {
        let ready = slot
            .as_ref()
            .is_some_and(tokio::task::JoinHandle::is_finished);
        if !ready {
            continue;
        }
        let handle = slot.take().expect("workload handle is finished");
        reaped_any = true;
        if let Ok((workload, result)) = handle.await {
            tracing::debug!("Workload '{}' completed", workload.name());
            workload_collected[slot_index] = Some((workload, result));
        } else {
            tracing::error!("Workload task panicked");
        }
    }
    reaped_any
}
async fn reap_finished_injectors(
injector_handles: &mut [Option<tokio::task::JoinHandle<InjectorResult>>],
) {
for handle_opt in injector_handles {
let finished = handle_opt
.as_ref()
.is_some_and(tokio::task::JoinHandle::is_finished);
if finished {
let handle = handle_opt.take().expect("injector handle is finished");
match handle.await {
Ok((_injector, _result)) => {
tracing::debug!("Fault injector completed");
}
Err(_) => {
tracing::error!("Fault injector task panicked");
}
}
}
}
}
async fn collect_injector_results(
mut returned: Vec<Box<dyn FaultInjector>>,
mut injector_handles: Vec<Option<tokio::task::JoinHandle<InjectorResult>>>,
) -> Vec<Box<dyn FaultInjector>> {
for handle_opt in &mut injector_handles {
if let Some(handle) = handle_opt.take() {
if handle.is_finished() {
if let Ok((injector, _)) = handle.await {
returned.push(injector);
}
} else {
handle.abort();
}
}
}
returned
}
/// Splits the collected workload slots into the returned workloads and one
/// result per slot.
///
/// A filled slot contributes its workload to the first vector and its result to
/// the second. An empty slot (no result was collected for that workload) adds
/// only an `InvalidState("Task panicked")` error to the results. The workload
/// vector can therefore be shorter than the result vector, and the two are not
/// index-aligned. `total_workloads` is used only as a capacity hint.
fn collect_workload_results(
    workload_collected: Vec<Option<WorkloadResult>>,
    total_workloads: usize,
) -> (Vec<Box<dyn Workload>>, Vec<SimulationResult<()>>) {
    let mut workloads_out = Vec::with_capacity(total_workloads);
    let mut results_out = Vec::with_capacity(total_workloads);
    for slot in workload_collected {
        let Some((workload, outcome)) = slot else {
            results_out.push(Err(crate::SimulationError::InvalidState(
                "Task panicked".to_string(),
            )));
            continue;
        };
        workloads_out.push(workload);
        results_out.push(outcome);
    }
    (workloads_out, results_out)
}
/// Drives the simulation cooperatively until every task in `handles` has finished.
///
/// Each pass works as follows:
/// - if events are pending, it processes exactly one (`sim.step()`) and lets
///   `handle_process_events` react to it;
/// - it then always yields to the Tokio scheduler, so the awaited tasks can run.
///
/// NOTE(review): when no events are pending and tasks are still running, this
/// spins on `yield_now` with no progress bound. It relies on the tasks
/// eventually finishing or scheduling events.
async fn cooperative_loop_until_done<T: 'static>(
    sim: &mut crate::sim::SimWorld,
    process_manager: &mut ProcessManager<'_>,
    seed: u64,
    state: &StateHandle,
    obs: &SimulationLayerHandle,
    shutdown_signal: &tokio_util::sync::CancellationToken,
    handles: &[tokio::task::JoinHandle<T>],
) {
    while !handles.iter().all(tokio::task::JoinHandle::is_finished) {
        let has_pending_work = sim.pending_event_count() > 0;
        if has_pending_work {
            sim.step();
            Self::handle_process_events(sim, process_manager, seed, state, obs, shutdown_signal);
        }
        tokio::task::yield_now().await;
    }
}
async fn collect_setup_results(
setup_handles: Vec<SetupHandle>,
) -> (
Vec<Box<dyn Workload>>,
Vec<SimContext>,
Vec<SimulationResult<()>>,
bool,
) {
let mut workloads = Vec::with_capacity(setup_handles.len());
let mut contexts = Vec::with_capacity(setup_handles.len());
let mut setup_failed = false;
let mut setup_results: Vec<SimulationResult<()>> = Vec::new();
for handle in setup_handles {
if let Ok((w, ctx, result)) = handle.await {
if let Err(ref e) = result {
tracing::error!("Workload '{}' setup failed: {}", w.name(), e);
setup_failed = true;
}
setup_results.push(result);
workloads.push(w);
contexts.push(ctx);
} else {
tracing::error!("Setup task panicked");
setup_failed = true;
setup_results.push(Err(crate::SimulationError::InvalidState(
"Setup task panicked".to_string(),
)));
}
}
(workloads, contexts, setup_results, setup_failed)
}
/// Drains the remaining simulation events one step at a time.
///
/// Returns `None` once no events are pending. If events are still queued after
/// more than five minutes of `sim.current_time()` have passed since entry, it
/// logs the situation and returns `SimulationError::SettleTimeout` with the
/// pending count and the elapsed time. The timeout is only checked between
/// steps.
fn settle_phase(sim: &mut crate::sim::SimWorld) -> Option<crate::SimulationError> {
    let started_at = sim.current_time();
    let budget = Duration::from_mins(5);
    loop {
        let pending_events = sim.pending_event_count();
        if pending_events == 0 {
            return None;
        }
        let elapsed = sim.current_time().saturating_sub(started_at);
        if elapsed > budget {
            tracing::error!(
                "Settle timeout: {} events still pending after {:?}",
                pending_events,
                elapsed
            );
            return Some(crate::SimulationError::SettleTimeout {
                pending_events,
                elapsed,
            });
        }
        sim.step();
    }
}
/// Reacts to a process-lifecycle event produced by the most recent simulation step.
///
/// Inspects `sim.last_processed_event()` and handles three event kinds; any
/// other event (or none) is ignored:
///
/// * `ProcessGracefulShutdown`:
///   - records the fault on the sim fault trail;
///   - asks `process_manager` to shut down the process at `ip` gracefully;
///   - schedules a `ProcessForceKill` for the same `ip` after `grace_period_ms`,
///     carrying `recovery_delay_ms` forward.
/// * `ProcessForceKill`:
///   - records the fault;
///   - aborts the process at `ip` and every connection for `ip`;
///   - schedules a restart after `recovery_delay_ms`.
/// * `ProcessRestart`:
///   - records the fault;
///   - delegates to `ProcessManager::handle_restart` with a weak handle to `sim`.
///
/// `seed`, `state`, `obs`, and `shutdown_signal` are only forwarded to the
/// restart path.
fn handle_process_events(
sim: &mut crate::sim::SimWorld,
process_manager: &mut ProcessManager<'_>,
seed: u64,
state: &StateHandle,
obs: &SimulationLayerHandle,
shutdown_signal: &tokio_util::sync::CancellationToken,
) {
match sim.last_processed_event() {
Some(crate::sim::Event::ProcessGracefulShutdown {
ip,
grace_period_ms,
recovery_delay_ms,
}) => {
assert_reachable!("event: ProcessGracefulShutdown");
let event = SimFaultEvent::ProcessGracefulShutdown {
ip: ip.to_string(),
grace_period_ms,
};
tracing::info!(
capture = true,
trail = SIM_FAULT_TRAIL,
source = "sim",
event = valuable(&event),
);
process_manager.signal_graceful_shutdown(ip);
// The force kill is scheduled unconditionally, even if the process exits
// during the grace period. NOTE(review): presumably `abort_process` tolerates
// an already-exited process; confirm.
sim.schedule_event(
crate::sim::Event::ProcessForceKill {
ip,
recovery_delay_ms,
},
Duration::from_millis(grace_period_ms),
);
}
// NOTE(review): unlike the other two arms, this arm has no `assert_reachable!`.
// Confirm whether that is intentional.
Some(crate::sim::Event::ProcessForceKill {
ip,
recovery_delay_ms,
}) => {
let event = SimFaultEvent::ProcessForceKill { ip: ip.to_string() };
tracing::info!(
capture = true,
trail = SIM_FAULT_TRAIL,
source = "sim",
event = valuable(&event),
);
process_manager.abort_process(ip);
// Tear down the killed process's connections too, so peers do not keep
// talking to a dead endpoint.
sim.abort_all_connections_for_ip(ip);
sim.schedule_process_restart(ip, Duration::from_millis(recovery_delay_ms));
}
Some(crate::sim::Event::ProcessRestart { ip }) => {
assert_reachable!("event: ProcessRestart");
let event = SimFaultEvent::ProcessRestart { ip: ip.to_string() };
tracing::info!(
capture = true,
trail = SIM_FAULT_TRAIL,
source = "sim",
event = valuable(&event),
);
// A weak handle is passed, presumably so the restarted process does not keep
// the world alive. Confirm against `ProcessManager::handle_restart`.
let weak_sim = sim.downgrade();
process_manager.handle_restart(ip, &weak_sim, seed, state, obs, shutdown_signal);
}
_ => {}
}
}
/// Starts shutting the simulation down.
///
/// It does three things, in order:
/// - cancels `shutdown_signal`;
/// - schedules a `Shutdown` event 1ns from now;
/// - schedules 99 synthetic `Timer` events at 1ns..=99ns, with task ids
///   `u64::MAX - 1` down to `u64::MAX - 99`.
///
/// NOTE(review): the synthetic timers appear intended to keep the event queue
/// non-empty for a few steps, so tasks get a chance to observe the
/// cancellation. Confirm against the scheduler's handling of unknown task ids.
fn trigger_shutdown(
    sim: &mut crate::sim::SimWorld,
    shutdown_signal: &tokio_util::sync::CancellationToken,
) {
    tracing::debug!("Triggering shutdown signal");
    shutdown_signal.cancel();
    sim.schedule_event(crate::sim::Event::Shutdown, Duration::from_nanos(1));
    for offset in 1_u64..100 {
        let wake_timer = crate::sim::Event::Timer {
            task_id: u64::MAX - offset,
        };
        sim.schedule_event(wake_timer, Duration::from_nanos(offset));
    }
}
fn heal_all_partitions(sim: &mut crate::sim::SimWorld, all_ips: &[String]) {
for i in 0..all_ips.len() {
for j in (i + 1)..all_ips.len() {
if let (Ok(a_ip), Ok(b_ip)) = (
all_ips[i].parse::<std::net::IpAddr>(),
all_ips[j].parse::<std::net::IpAddr>(),
) {
let _ = sim.restore_partition(a_ip, b_ip);
}
}
}
}
}
/// Decides how many simulation iterations to run and which seed each one uses.
///
/// Caller-provided seeds are used first, in order. Once they run out, further
/// seeds are derived by hashing a wall-clock-based `base_seed` together with
/// the iteration index. Every seed handed out is recorded, so the run can be
/// reported and replayed from [`IterationManager::seeds_used`].
pub(crate) struct IterationManager {
    /// Stopping rule for the run.
    control: super::builder::IterationControl,
    /// Seeds in iteration order: the provided ones followed by any derived ones.
    seeds: Vec<u64>,
    /// Wall-clock-derived entropy used to derive seeds beyond the provided list.
    base_seed: u64,
    /// Number of iterations started so far (also the index of the next seed).
    iteration_count: usize,
    /// Construction instant, used by `IterationControl::TimeLimit`.
    start_time: Instant,
}
impl IterationManager {
    /// Creates a manager for the given stopping rule.
    ///
    /// `initial_seeds` are used verbatim for the first iterations. The base seed
    /// for later iterations is taken from the wall clock:
    /// - nanoseconds since the Unix epoch, saturating at `u64::MAX`;
    /// - `12345` if the system clock reports a time before the epoch.
    pub(crate) fn new(control: super::builder::IterationControl, initial_seeds: Vec<u64>) -> Self {
        let base_seed = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .map_or(12345, |d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX));
        Self {
            control,
            seeds: initial_seeds,
            base_seed,
            iteration_count: 0,
            start_time: Instant::now(),
        }
    }
    /// Returns `true` while another iteration should be started.
    ///
    /// Count-bounded modes stop after their configured number of iterations.
    /// `TimeLimit` stops once the wall-clock time since construction reaches
    /// the limit.
    pub(crate) fn should_continue(&self) -> bool {
        match &self.control {
            super::builder::IterationControl::FixedCount(count)
            | super::builder::IterationControl::UntilConverged {
                max_iterations: count,
            }
            | super::builder::IterationControl::CoveragePlateau {
                max_iterations: count,
                ..
            } => self.iteration_count < *count,
            super::builder::IterationControl::TimeLimit(duration) => {
                self.start_time.elapsed() < *duration
            }
        }
    }
    /// Advances to the next iteration and returns the seed it should use.
    ///
    /// Provided seeds are returned in order first. After they run out, a new
    /// seed is derived by hashing `base_seed` and the current iteration index
    /// with `DefaultHasher`. The derived seed is appended to the seed list so
    /// that [`Self::seeds_used`] stays complete.
    ///
    /// `DefaultHasher`'s algorithm is not guaranteed to be stable across Rust
    /// releases. Replays should therefore use the recorded seed rather than
    /// re-deriving it.
    pub(crate) fn next_iteration(&mut self) -> u64 {
        // `seeds.len() >= iteration_count` always holds, because every derived
        // seed is pushed. `get` therefore returns `None` exactly when the
        // provided seeds are exhausted.
        let seed = if let Some(&provided) = self.seeds.get(self.iteration_count) {
            provided
        } else {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            self.base_seed.hash(&mut hasher);
            self.iteration_count.hash(&mut hasher);
            let new_seed = hasher.finish();
            self.seeds.push(new_seed);
            new_seed
        };
        self.iteration_count += 1;
        // Time-limited runs have no iteration cap. The previous message printed
        // a misleading "/0" total for them, and repeated the iteration number.
        match self.max_iterations() {
            Some(max) => tracing::info!(
                "Starting iteration {}/{} with seed {}",
                self.iteration_count,
                max,
                seed
            ),
            None => tracing::info!(
                "Starting iteration {} with seed {} (time-limited run)",
                self.iteration_count,
                seed
            ),
        }
        seed
    }
    /// Number of iterations started so far (1-based after the first `next_iteration`).
    pub(crate) fn current_iteration(&self) -> usize {
        self.iteration_count
    }
    /// The iteration cap, or `None` for time-limited runs.
    pub(crate) fn max_iterations(&self) -> Option<usize> {
        match &self.control {
            super::builder::IterationControl::FixedCount(count)
            | super::builder::IterationControl::UntilConverged {
                max_iterations: count,
            }
            | super::builder::IterationControl::CoveragePlateau {
                max_iterations: count,
                ..
            } => Some(*count),
            super::builder::IterationControl::TimeLimit(_) => None,
        }
    }
    /// Seeds actually handed out so far, in iteration order.
    pub(crate) fn seeds_used(&self) -> &[u64] {
        &self.seeds[..self.iteration_count]
    }
}
pub(crate) struct MetricsCollector {
successful_runs: usize,
failed_runs: usize,
aggregated_metrics: SimulationMetrics,
individual_metrics: Vec<SimulationResult<SimulationMetrics>>,
faulty_seeds: Vec<u64>,
}
impl MetricsCollector {
pub(crate) fn new() -> Self {
Self {
successful_runs: 0,
failed_runs: 0,
aggregated_metrics: SimulationMetrics::default(),
individual_metrics: Vec::new(),
faulty_seeds: Vec::new(),
}
}
pub(crate) fn record_iteration(
&mut self,
seed: u64,
wall_time: Duration,
all_results: &[SimulationResult<()>],
has_assertion_violations: bool,
sim_metrics: SimulationMetrics,
) {
let workloads_ok = all_results.iter().all(std::result::Result::is_ok);
let all_ok = workloads_ok && !has_assertion_violations;
if all_ok {
self.record_success(seed, wall_time, sim_metrics);
} else {
self.record_failure(seed);
}
}
fn record_success(&mut self, seed: u64, wall_time: Duration, sim_metrics: SimulationMetrics) {
self.successful_runs += 1;
tracing::info!("Iteration completed successfully with seed {}", seed);
self.aggregated_metrics.wall_time += wall_time;
self.aggregated_metrics.simulated_time += sim_metrics.simulated_time;
self.aggregated_metrics.events_processed += sim_metrics.events_processed;
let mut individual = sim_metrics;
individual.wall_time = wall_time;
self.individual_metrics.push(Ok(individual));
}
fn record_failure(&mut self, seed: u64) {
self.failed_runs += 1;
tracing::error!("Iteration FAILED with seed {}", seed);
self.individual_metrics
.push(Err(crate::SimulationError::InvalidState(format!(
"One or more workloads failed (seed {seed})"
))));
self.faulty_seeds.push(seed);
}
pub(crate) fn add_faulty_seeds(&mut self, mut seeds: Vec<u64>) {
self.faulty_seeds.append(&mut seeds);
}
pub(crate) fn add_failed_runs(&mut self, count: usize) {
self.failed_runs += count;
}
pub(crate) fn generate_report(
self,
inputs: GenerateReportInputs,
) -> super::report::SimulationReport {
let GenerateReportInputs {
iteration_count,
seeds_used,
assertion_results,
assertion_violations,
coverage_violations,
exploration,
assertion_details,
bucket_summaries,
convergence_timeout,
} = inputs;
super::report::SimulationReport {
iterations: iteration_count,
successful_runs: self.successful_runs,
failed_runs: self.failed_runs,
metrics: self.aggregated_metrics,
individual_metrics: self.individual_metrics,
seeds_used,
seeds_failing: self.faulty_seeds,
assertion_results,
assertion_violations,
coverage_violations,
exploration,
assertion_details,
bucket_summaries,
convergence_timeout,
}
}
}
/// Run-level data that `MetricsCollector::generate_report` combines with its own
/// counters to build a `SimulationReport`.
///
/// Each field is copied into the report field of the same name, except
/// `iteration_count`, which becomes `SimulationReport::iterations`.
pub(crate) struct GenerateReportInputs {
    // --- run bookkeeping ---
    /// Total number of iterations executed.
    pub(crate) iteration_count: usize,
    /// Seed of every iteration, in execution order.
    pub(crate) seeds_used: Vec<u64>,
    /// Whether the run stopped because of a convergence timeout.
    pub(crate) convergence_timeout: bool,

    // --- assertions ---
    /// Per-assertion statistics, keyed by assertion name.
    pub(crate) assertion_results: HashMap<String, AssertionStats>,
    /// Human-readable descriptions of violated assertions.
    pub(crate) assertion_violations: Vec<String>,
    /// Detailed per-assertion records for the report.
    pub(crate) assertion_details: Vec<super::report::AssertionDetail>,
    /// Per-site bucket summaries for the report.
    pub(crate) bucket_summaries: Vec<super::report::BucketSiteSummary>,

    // --- coverage / exploration ---
    /// Human-readable descriptions of coverage violations.
    pub(crate) coverage_violations: Vec<String>,
    /// Exploration summary, when exploration was enabled.
    pub(crate) exploration: Option<super::report::ExplorationReport>,
}