use crate::child_runner::attempt::TaskExit;
use crate::child_runner::runner::{ChildRunReport, ChildRunner};
use crate::control::command::{CommandResult, ControlCommand, CurrentState, ManagedChildState};
use crate::error::types::SupervisorError;
use crate::id::types::ChildId;
use crate::policy::backoff::BackoffPolicy;
use crate::policy::decision::{
PolicyEngine, RestartDecision, RestartPolicy, TaskExit as PolicyTaskExit,
};
use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
use crate::registry::store::RegistryStore;
use crate::shutdown::coordinator::ShutdownCoordinator;
use crate::shutdown::stage::{ShutdownCause, ShutdownPolicy};
use crate::spec::child::RestartPolicy as ChildRestartPolicy;
use crate::spec::supervisor::SupervisorSpec;
use crate::tree::builder::SupervisorTree;
use crate::tree::order::{restart_execution_plan, startup_order};
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, oneshot};
/// Messages consumed by [`run_control_loop`].
///
/// Operator commands and child-attempt outcomes share this one channel, so
/// all state mutation happens on the control-loop task.
#[derive(Debug)]
pub enum RuntimeCommand {
    /// An operator command plus the one-shot channel its result is sent on.
    Control {
        command: ControlCommand,
        reply_sender: oneshot::Sender<Result<CommandResult, SupervisorError>>,
    },
    /// A child attempt ran to completion; sent by `send_child_result` when the
    /// runner returned `Ok`. The report is boxed (presumably to keep the enum
    /// small — NOTE(review): confirm).
    ChildExited {
        report: Box<ChildRunReport>,
    },
    /// The runner returned an error for a child attempt; `message` is the
    /// error's display text as produced by `send_child_result`.
    ChildStartFailed {
        child_id: ChildId,
        message: String,
    },
}
/// Mutable supervisor state owned by the control loop.
#[derive(Debug)]
pub struct RuntimeControlState {
    // Shutdown progress; automatic restart policy is applied only while the
    // phase is `Idle`.
    shutdown: ShutdownCoordinator,
    // Operator/policy-level state per child (Running, Paused, Quarantined, ...).
    children: HashMap<ChildId, ManagedChildState>,
    // Manifests accepted via `AddChild`. In this file they are only stored and
    // counted; no task is spawned from them.
    manifests: Vec<String>,
    // Runtime records (status, attempt, generation, last exit) of declared children.
    registry: RegistryStore,
    // Tree built from `spec`; drives startup order and restart scope.
    tree: SupervisorTree,
    spec: SupervisorSpec,
    policy_engine: PolicyEngine,
    // Cloned into every spawned child task so results flow back to the loop.
    command_sender: mpsc::Sender<RuntimeCommand>,
}
impl RuntimeControlState {
    /// Builds the runtime state for `spec` without starting any child.
    ///
    /// The supervisor tree is built from `spec` and registered in a fresh
    /// [`RegistryStore`]; the shutdown coordinator is created from
    /// `shutdown_policy`. `command_sender` is cloned into every spawned child
    /// task so attempt results come back as [`RuntimeCommand`]s.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`SupervisorTree::build`] or by
    /// [`RegistryStore::register_tree`].
    pub fn new(
        spec: SupervisorSpec,
        shutdown_policy: ShutdownPolicy,
        command_sender: mpsc::Sender<RuntimeCommand>,
    ) -> Result<Self, SupervisorError> {
        let tree = SupervisorTree::build(&spec)?;
        let mut registry = RegistryStore::new();
        registry.register_tree(&tree)?;
        Ok(Self {
            shutdown: ShutdownCoordinator::new(shutdown_policy),
            children: HashMap::new(),
            manifests: Vec::new(),
            registry,
            tree,
            spec,
            policy_engine: PolicyEngine::new(),
            command_sender,
        })
    }

    /// Spawns the first attempt of every declared child, in `startup_order`.
    ///
    /// Attempts start immediately (zero delay) and are not counted as
    /// restarts. Ids missing from the registry are skipped silently. Spawning
    /// does not wait for a child, so the order only fixes the spawn order.
    pub fn start_declared_children(&mut self) {
        let child_ids = startup_order(&self.tree)
            .into_iter()
            .map(|node| node.child.id.clone())
            .collect::<Vec<_>>();
        for child_id in child_ids {
            self.spawn_child_attempt(child_id, false, Duration::ZERO);
        }
    }

    /// Applies one operator command and returns its result.
    ///
    /// * `AddChild` records the manifest, subject to the dynamic-child limit;
    ///   no task is spawned for it here.
    /// * `RemoveChild`, `PauseChild`, `ResumeChild` and `QuarantineChild` only
    ///   update the management state; they neither stop nor start tasks.
    /// * `RestartChild` marks the child `Running` and spawns a restart attempt.
    ///   Any attempt that is still running is not stopped.
    /// * `ShutdownTree` requests a stop and steps the coordinator to completion.
    /// * `CurrentState` reports the child count and whether shutdown completed.
    ///
    /// # Errors
    ///
    /// `AddChild` fails with [`SupervisorError::InvalidTransition`] when the
    /// dynamic supervisor policy refuses another child.
    pub fn execute_control(
        &mut self,
        command: ControlCommand,
    ) -> Result<CommandResult, SupervisorError> {
        match command {
            ControlCommand::AddChild { child_manifest, .. } => {
                self.ensure_dynamic_child_allowed()?;
                self.manifests.push(child_manifest.clone());
                Ok(CommandResult::ChildAdded { child_manifest })
            }
            ControlCommand::RemoveChild { child_id, .. } => {
                Ok(self.set_child_state(child_id, ManagedChildState::Removed))
            }
            ControlCommand::RestartChild { child_id, .. } => {
                // Record the transition BEFORE spawning. `prepare_child_attempt`
                // itself writes `Running` into `children`. Recording afterwards
                // made every restart of a registered child report
                // `idempotent: true`, even from Paused/Quarantined/Removed.
                let result = self.set_child_state(child_id.clone(), ManagedChildState::Running);
                self.spawn_child_attempt(child_id, true, Duration::ZERO);
                Ok(result)
            }
            ControlCommand::PauseChild { child_id, .. } => {
                Ok(self.set_child_state(child_id, ManagedChildState::Paused))
            }
            ControlCommand::ResumeChild { child_id, .. } => {
                Ok(self.set_child_state(child_id, ManagedChildState::Running))
            }
            ControlCommand::QuarantineChild { child_id, .. } => {
                Ok(self.set_child_state(child_id, ManagedChildState::Quarantined))
            }
            ControlCommand::ShutdownTree { meta } => {
                let cause = ShutdownCause::new(meta.requested_by, meta.reason);
                let result = self.shutdown.request_stop(cause);
                // Step the coordinator through its remaining stages right away.
                // NOTE(review): no child task is stopped or awaited here; confirm
                // the coordinator stages are pure bookkeeping.
                self.shutdown.advance();
                self.shutdown.advance();
                self.shutdown.advance();
                self.shutdown.advance();
                self.shutdown.complete();
                Ok(CommandResult::Shutdown { result })
            }
            ControlCommand::CurrentState { .. } => Ok(CommandResult::CurrentState {
                state: CurrentState {
                    child_count: self.dynamic_child_count(),
                    shutdown_completed: self.shutdown.phase()
                        == crate::shutdown::stage::ShutdownPhase::Completed,
                },
            }),
        }
    }

    /// Handles a finished child attempt.
    ///
    /// Steps:
    /// 1. Records the exit in the registry.
    /// 2. Broadcasts `child_exit:{id}` (send errors, e.g. no subscribers, are
    ///    ignored).
    /// 3. If automatic policy still applies, computes and executes a restart
    ///    decision.
    pub fn handle_child_exit(
        &mut self,
        report: ChildRunReport,
        event_sender: &broadcast::Sender<String>,
    ) {
        let child_id = report.runtime.id.clone();
        self.record_child_exit(report);
        let _ignored = event_sender.send(format!("child_exit:{child_id}"));
        if !self.should_apply_automatic_policy(&child_id) {
            return;
        }
        let Some(decision) = self.restart_decision(&child_id) else {
            return;
        };
        self.execute_restart_decision(child_id, decision, event_sender);
    }

    /// Handles a runner error for a child attempt.
    ///
    /// Broadcasts `child_start_failed:{id}:{message}` and quarantines the
    /// child, so automatic policy no longer applies to it.
    /// NOTE(review): the registry status is left as `Starting` (set by
    /// `prepare_child_attempt`); confirm that is intended.
    pub fn handle_child_start_failed(
        &mut self,
        child_id: ChildId,
        message: String,
        event_sender: &broadcast::Sender<String>,
    ) {
        let _ignored = event_sender.send(format!("child_start_failed:{child_id}:{message}"));
        let _result = self.set_child_state(child_id, ManagedChildState::Quarantined);
    }

    /// Overwrites the management state of `child_id` with `next`.
    ///
    /// The returned result is `idempotent` when the previous state was
    /// already `next`.
    fn set_child_state(&mut self, child_id: ChildId, next: ManagedChildState) -> CommandResult {
        let previous = self.children.insert(child_id.clone(), next);
        CommandResult::ChildState {
            child_id,
            state: next,
            idempotent: previous == Some(next),
        }
    }

    /// Copies the exit and the attempt counters from `report` into the
    /// registry entry, then marks it `Exited`. Unknown ids are ignored.
    ///
    /// NOTE(review): reports are not checked against the current generation.
    /// A late report from an attempt superseded by `RestartChild` can
    /// overwrite newer counters here.
    fn record_child_exit(&mut self, report: ChildRunReport) {
        let child_id = report.runtime.id.clone();
        if let Some(runtime) = self.registry.child_mut(&child_id) {
            runtime.last_exit = Some(report.exit);
            runtime.status = ChildRuntimeStatus::Exited;
            runtime.generation = report.runtime.generation;
            runtime.attempt = report.runtime.attempt;
            runtime.restart_count = report.runtime.restart_count;
        }
    }

    /// Returns `false` once shutdown has left `Idle`, or when the child is
    /// Paused, Quarantined or Removed; `true` otherwise, including for
    /// children with no recorded management state.
    fn should_apply_automatic_policy(&self, child_id: &ChildId) -> bool {
        if self.shutdown.phase() != crate::shutdown::stage::ShutdownPhase::Idle {
            return false;
        }
        !matches!(
            self.children.get(child_id),
            Some(ManagedChildState::Paused)
                | Some(ManagedChildState::Quarantined)
                | Some(ManagedChildState::Removed)
        )
    }

    /// Asks the policy engine what to do after the child's last exit.
    ///
    /// Returns `None` if the child is unknown or has no recorded exit.
    fn restart_decision(&self, child_id: &ChildId) -> Option<RestartDecision> {
        let runtime = self.registry.child(child_id)?;
        let exit = runtime.last_exit.as_ref()?;
        let policy_exit = policy_task_exit(exit);
        let restart_policy = restart_policy(runtime.spec.restart_policy);
        let backoff = backoff_policy(runtime.spec.backoff_policy);
        Some(self.policy_engine.decide(
            restart_policy,
            policy_exit,
            runtime.attempt.value,
            &backoff,
        ))
    }

    /// Executes a policy decision for `failed_child`.
    ///
    /// `ShutdownTree` only requests a stop; it does not advance the
    /// coordinator. `EscalateToParent` and `DoNotRestart` are no-ops here.
    /// NOTE(review): confirm escalation is handled elsewhere.
    fn execute_restart_decision(
        &mut self,
        failed_child: ChildId,
        decision: RestartDecision,
        event_sender: &broadcast::Sender<String>,
    ) {
        match decision {
            RestartDecision::RestartAfter { delay } => {
                self.restart_strategy_scope(failed_child, delay, event_sender);
            }
            RestartDecision::Quarantine => {
                let _result = self.set_child_state(failed_child, ManagedChildState::Quarantined);
            }
            RestartDecision::ShutdownTree => {
                let cause = ShutdownCause::new("runtime", "policy requested tree shutdown");
                let _result = self.shutdown.request_stop(cause);
            }
            RestartDecision::EscalateToParent | RestartDecision::DoNotRestart => {}
        }
    }

    /// Restarts every child in the strategy scope of `failed_child` after
    /// `delay`.
    ///
    /// Broadcasts `restart_plan:{strategy:?}:{group}:{ids}` first; the group
    /// falls back to `supervisor`. Children in the scope that are still
    /// running are not stopped before their new attempt is spawned.
    fn restart_strategy_scope(
        &mut self,
        failed_child: ChildId,
        delay: Duration,
        event_sender: &broadcast::Sender<String>,
    ) {
        let plan = restart_execution_plan(&self.tree, &self.spec, &failed_child);
        let scope_label = child_scope_label(&plan.scope);
        let group_label = plan.group.as_deref().unwrap_or("supervisor");
        let _ignored = event_sender.send(format!(
            "restart_plan:{:?}:{group_label}:{scope_label}",
            plan.strategy
        ));
        for child_id in plan.scope {
            self.spawn_child_attempt(child_id, true, delay);
        }
    }

    /// Checks the dynamic supervisor policy against the current child count.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::InvalidTransition`] when the limit is reached.
    fn ensure_dynamic_child_allowed(&self) -> Result<(), SupervisorError> {
        let current_child_count = self.dynamic_child_count();
        if self
            .spec
            .dynamic_supervisor_policy
            .allows_addition(current_child_count)
        {
            return Ok(());
        }
        Err(SupervisorError::InvalidTransition {
            message: "dynamic supervisor child limit reached".to_owned(),
        })
    }

    /// Declared children plus added manifests (saturating). Removed children
    /// are not subtracted.
    fn dynamic_child_count(&self) -> usize {
        self.registry
            .declaration_order()
            .len()
            .saturating_add(self.manifests.len())
    }

    /// Prepares the registry entry and spawns a tokio task for one attempt.
    ///
    /// The task sleeps for `delay` (if non-zero), runs the child once, and
    /// reports the outcome over `command_sender`. Returns without spawning if
    /// `child_id` is not registered.
    fn spawn_child_attempt(&mut self, child_id: ChildId, is_restart: bool, delay: Duration) {
        let Some(runtime) = self.prepare_child_attempt(&child_id, is_restart) else {
            return;
        };
        let sender = self.command_sender.clone();
        tokio::spawn(async move {
            if !delay.is_zero() {
                tokio::time::sleep(delay).await;
            }
            let child_id = runtime.id.clone();
            let result = ChildRunner::new().run_once(runtime).await;
            send_child_result(sender, child_id, result).await;
        });
    }

    /// Updates the registry entry for a new attempt and returns a snapshot.
    ///
    /// On restarts, the attempt and generation are advanced and the restart
    /// count is incremented (saturating). The status becomes `Starting` and
    /// the management state `Running`. Returns `None` for unknown ids.
    fn prepare_child_attempt(
        &mut self,
        child_id: &ChildId,
        is_restart: bool,
    ) -> Option<ChildRuntime> {
        let runtime = self.registry.child_mut(child_id)?;
        if is_restart {
            runtime.attempt = runtime.attempt.next();
            runtime.generation = runtime.generation.next();
            runtime.restart_count = runtime.restart_count.saturating_add(1);
        }
        runtime.status = ChildRuntimeStatus::Starting;
        self.children
            .insert(child_id.clone(), ManagedChildState::Running);
        Some(runtime.clone())
    }
}
/// Runs the supervisor control loop until the command channel closes.
///
/// Declared children are started first. Each received [`RuntimeCommand`] is
/// then applied to `state`:
/// * Control commands get a `control_command:{name}` event and a reply on
///   their one-shot channel.
/// * Child outcomes are routed to the exit / start-failure handlers.
///
/// Event and reply send failures are ignored.
///
/// NOTE(review): `state` holds a `command_sender` clone. If it pairs with
/// `receiver`, `recv` never yields `None` while this loop runs, so the loop
/// does not end on its own — confirm the intended termination path.
pub async fn run_control_loop(
    mut state: RuntimeControlState,
    mut receiver: mpsc::Receiver<RuntimeCommand>,
    event_sender: broadcast::Sender<String>,
) {
    state.start_declared_children();
    loop {
        let Some(incoming) = receiver.recv().await else {
            break;
        };
        match incoming {
            RuntimeCommand::ChildExited { report } => {
                state.handle_child_exit(*report, &event_sender);
            }
            RuntimeCommand::ChildStartFailed { child_id, message } => {
                state.handle_child_start_failed(child_id, message, &event_sender);
            }
            RuntimeCommand::Control {
                command,
                reply_sender,
            } => {
                let label = command_name(&command);
                let outcome = state.execute_control(command);
                let _ignored = event_sender.send(format!("control_command:{label}"));
                let _ignored = reply_sender.send(outcome);
            }
        }
    }
}
fn command_name(command: &ControlCommand) -> &'static str {
match command {
ControlCommand::AddChild { .. } => "add_child",
ControlCommand::RemoveChild { .. } => "remove_child",
ControlCommand::RestartChild { .. } => "restart_child",
ControlCommand::PauseChild { .. } => "pause_child",
ControlCommand::ResumeChild { .. } => "resume_child",
ControlCommand::QuarantineChild { .. } => "quarantine_child",
ControlCommand::ShutdownTree { .. } => "shutdown_tree",
ControlCommand::CurrentState { .. } => "current_state",
}
}
async fn send_child_result(
sender: mpsc::Sender<RuntimeCommand>,
child_id: ChildId,
result: Result<ChildRunReport, SupervisorError>,
) {
let message = match result {
Ok(report) => RuntimeCommand::ChildExited {
report: Box::new(report),
},
Err(error) => RuntimeCommand::ChildStartFailed {
child_id,
message: error.to_string(),
},
};
let _ignored = sender.send(message).await;
}
fn restart_policy(policy: ChildRestartPolicy) -> RestartPolicy {
match policy {
ChildRestartPolicy::Permanent => RestartPolicy::Permanent,
ChildRestartPolicy::Transient => RestartPolicy::Transient,
ChildRestartPolicy::Temporary => RestartPolicy::Temporary,
}
}
/// Converts a spec-level backoff configuration into the policy engine's
/// [`BackoffPolicy`].
///
/// `jitter_ratio` is scaled to a percentage, rounded, and clamped to
/// `0..=100` before the `u8` cast. A NaN ratio survives the clamp and the
/// `as` cast turns it into 0.
///
/// NOTE(review): `max_delay` is passed as both the second and the fourth
/// argument of `BackoffPolicy::new`. Confirm the fourth parameter really is
/// meant to be the max delay and not a separate setting.
fn backoff_policy(policy: crate::spec::child::BackoffPolicy) -> BackoffPolicy {
    let rounded_percent = (policy.jitter_ratio * 100.0).round();
    let jitter_percent = rounded_percent.clamp(0.0, 100.0) as u8;
    let ceiling = policy.max_delay;
    BackoffPolicy::new(policy.initial_delay, ceiling, jitter_percent, ceiling)
}
/// Converts a runner exit into the policy engine's exit type.
///
/// An exit with a failure kind becomes `Failed`; any other exit becomes
/// `Succeeded`.
fn policy_task_exit(exit: &TaskExit) -> PolicyTaskExit {
    exit.failure_kind()
        .map_or(PolicyTaskExit::Succeeded, |kind| PolicyTaskExit::Failed {
            kind: kind.into(),
        })
}
/// Joins the ids in `scope` with commas (e.g. `a,b,c`); an empty scope
/// yields an empty string.
fn child_scope_label(scope: &[ChildId]) -> String {
    let mut label = String::new();
    for (position, child_id) in scope.iter().enumerate() {
        if position > 0 {
            label.push(',');
        }
        label.push_str(&child_id.value);
    }
    label
}