1use crate::child_runner::attempt::TaskExit;
7use crate::child_runner::runner::{ChildRunReport, ChildRunner};
8use crate::control::command::{CommandResult, ControlCommand, CurrentState, ManagedChildState};
9use crate::error::types::SupervisorError;
10use crate::id::types::ChildId;
11use crate::policy::backoff::BackoffPolicy;
12use crate::policy::decision::{
13 PolicyEngine, RestartDecision, RestartPolicy, TaskExit as PolicyTaskExit,
14};
15use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
16use crate::registry::store::RegistryStore;
17use crate::shutdown::coordinator::ShutdownCoordinator;
18use crate::shutdown::stage::{ShutdownCause, ShutdownPolicy};
19use crate::spec::child::RestartPolicy as ChildRestartPolicy;
20use crate::spec::supervisor::SupervisorSpec;
21use crate::tree::builder::SupervisorTree;
22use crate::tree::order::{restart_execution_plan, startup_order};
23use std::collections::HashMap;
24use std::time::Duration;
25use tokio::sync::{broadcast, mpsc, oneshot};
26
/// Messages consumed by [`run_control_loop`].
///
/// Control requests arrive with a reply channel; the two child variants are produced by
/// `send_child_result` from the outcome of a single child run.
#[derive(Debug)]
pub enum RuntimeCommand {
    /// A control command to execute against the runtime state.
    Control {
        /// The command to execute.
        command: ControlCommand,
        /// One-shot channel on which the command's result is sent back.
        reply_sender: oneshot::Sender<Result<CommandResult, SupervisorError>>,
    },
    /// A child run returned a report (sent when `run_once` yields `Ok`).
    ChildExited {
        /// Report of the finished run, boxed.
        report: Box<ChildRunReport>,
    },
    /// A child run returned an error instead of a report (sent when `run_once` yields `Err`).
    ChildStartFailed {
        /// Identifier of the child whose run failed.
        child_id: ChildId,
        /// The error rendered with `to_string()`.
        message: String,
    },
}
50
/// Mutable state owned by the runtime control loop.
#[derive(Debug)]
pub struct RuntimeControlState {
    /// Coordinator that tracks the shutdown phase of the tree.
    shutdown: ShutdownCoordinator,
    /// Last control-level state recorded per child (running, paused, quarantined, removed).
    children: HashMap<ChildId, ManagedChildState>,
    /// Manifests accepted through `AddChild`; only the strings are stored here.
    manifests: Vec<String>,
    /// Registry holding the runtime record of every child registered from the tree.
    registry: RegistryStore,
    /// Supervisor tree built from `spec`.
    tree: SupervisorTree,
    /// Supervisor specification the tree was built from.
    spec: SupervisorSpec,
    /// Engine that turns a child exit into a restart decision.
    policy_engine: PolicyEngine,
    /// Sender cloned into every spawned child task so it can report its result.
    command_sender: mpsc::Sender<RuntimeCommand>,
}
71
72impl RuntimeControlState {
73 pub fn new(
85 spec: SupervisorSpec,
86 shutdown_policy: ShutdownPolicy,
87 command_sender: mpsc::Sender<RuntimeCommand>,
88 ) -> Result<Self, SupervisorError> {
89 let tree = SupervisorTree::build(&spec)?;
90 let mut registry = RegistryStore::new();
91 registry.register_tree(&tree)?;
92 Ok(Self {
93 shutdown: ShutdownCoordinator::new(shutdown_policy),
94 children: HashMap::new(),
95 manifests: Vec::new(),
96 registry,
97 tree,
98 spec,
99 policy_engine: PolicyEngine::new(),
100 command_sender,
101 })
102 }
103
104 pub fn start_declared_children(&mut self) {
114 let child_ids = startup_order(&self.tree)
115 .into_iter()
116 .map(|node| node.child.id.clone())
117 .collect::<Vec<_>>();
118 for child_id in child_ids {
119 self.spawn_child_attempt(child_id, false, Duration::ZERO);
120 }
121 }
122
123 pub fn execute_control(
133 &mut self,
134 command: ControlCommand,
135 ) -> Result<CommandResult, SupervisorError> {
136 match command {
137 ControlCommand::AddChild { child_manifest, .. } => {
138 self.ensure_dynamic_child_allowed()?;
139 self.manifests.push(child_manifest.clone());
140 Ok(CommandResult::ChildAdded { child_manifest })
141 }
142 ControlCommand::RemoveChild { child_id, .. } => {
143 Ok(self.set_child_state(child_id, ManagedChildState::Removed))
144 }
145 ControlCommand::RestartChild { child_id, .. } => {
146 self.spawn_child_attempt(child_id.clone(), true, Duration::ZERO);
147 Ok(self.set_child_state(child_id, ManagedChildState::Running))
148 }
149 ControlCommand::PauseChild { child_id, .. } => {
150 Ok(self.set_child_state(child_id, ManagedChildState::Paused))
151 }
152 ControlCommand::ResumeChild { child_id, .. } => {
153 Ok(self.set_child_state(child_id, ManagedChildState::Running))
154 }
155 ControlCommand::QuarantineChild { child_id, .. } => {
156 Ok(self.set_child_state(child_id, ManagedChildState::Quarantined))
157 }
158 ControlCommand::ShutdownTree { meta } => {
159 let cause = ShutdownCause::new(meta.requested_by, meta.reason);
160 let result = self.shutdown.request_stop(cause);
161 self.shutdown.advance();
162 self.shutdown.advance();
163 self.shutdown.advance();
164 self.shutdown.advance();
165 self.shutdown.complete();
166 Ok(CommandResult::Shutdown { result })
167 }
168 ControlCommand::CurrentState { .. } => Ok(CommandResult::CurrentState {
169 state: CurrentState {
170 child_count: self.dynamic_child_count(),
171 shutdown_completed: self.shutdown.phase()
172 == crate::shutdown::stage::ShutdownPhase::Completed,
173 },
174 }),
175 }
176 }
177
178 pub fn handle_child_exit(
189 &mut self,
190 report: ChildRunReport,
191 event_sender: &broadcast::Sender<String>,
192 ) {
193 let child_id = report.runtime.id.clone();
194 self.record_child_exit(report);
195 let _ignored = event_sender.send(format!("child_exit:{child_id}"));
196 if !self.should_apply_automatic_policy(&child_id) {
197 return;
198 }
199 let Some(decision) = self.restart_decision(&child_id) else {
200 return;
201 };
202 self.execute_restart_decision(child_id, decision, event_sender);
203 }
204
205 pub fn handle_child_start_failed(
217 &mut self,
218 child_id: ChildId,
219 message: String,
220 event_sender: &broadcast::Sender<String>,
221 ) {
222 let _ignored = event_sender.send(format!("child_start_failed:{child_id}:{message}"));
223 let _result = self.set_child_state(child_id, ManagedChildState::Quarantined);
224 }
225
226 fn set_child_state(&mut self, child_id: ChildId, next: ManagedChildState) -> CommandResult {
237 let previous = self.children.insert(child_id.clone(), next);
238 CommandResult::ChildState {
239 child_id,
240 state: next,
241 idempotent: previous == Some(next),
242 }
243 }
244
245 fn record_child_exit(&mut self, report: ChildRunReport) {
255 let child_id = report.runtime.id.clone();
256 if let Some(runtime) = self.registry.child_mut(&child_id) {
257 runtime.last_exit = Some(report.exit);
258 runtime.status = ChildRuntimeStatus::Exited;
259 runtime.generation = report.runtime.generation;
260 runtime.attempt = report.runtime.attempt;
261 runtime.restart_count = report.runtime.restart_count;
262 }
263 }
264
265 fn should_apply_automatic_policy(&self, child_id: &ChildId) -> bool {
275 if self.shutdown.phase() != crate::shutdown::stage::ShutdownPhase::Idle {
276 return false;
277 }
278 !matches!(
279 self.children.get(child_id),
280 Some(ManagedChildState::Paused)
281 | Some(ManagedChildState::Quarantined)
282 | Some(ManagedChildState::Removed)
283 )
284 }
285
286 fn restart_decision(&self, child_id: &ChildId) -> Option<RestartDecision> {
296 let runtime = self.registry.child(child_id)?;
297 let exit = runtime.last_exit.as_ref()?;
298 let policy_exit = policy_task_exit(exit);
299 let restart_policy = restart_policy(runtime.spec.restart_policy);
300 let backoff = backoff_policy(runtime.spec.backoff_policy);
301 Some(self.policy_engine.decide(
302 restart_policy,
303 policy_exit,
304 runtime.attempt.value,
305 &backoff,
306 ))
307 }
308
309 fn execute_restart_decision(
321 &mut self,
322 failed_child: ChildId,
323 decision: RestartDecision,
324 event_sender: &broadcast::Sender<String>,
325 ) {
326 match decision {
327 RestartDecision::RestartAfter { delay } => {
328 self.restart_strategy_scope(failed_child, delay, event_sender);
329 }
330 RestartDecision::Quarantine => {
331 let _result = self.set_child_state(failed_child, ManagedChildState::Quarantined);
332 }
333 RestartDecision::ShutdownTree => {
334 let cause = ShutdownCause::new("runtime", "policy requested tree shutdown");
335 let _result = self.shutdown.request_stop(cause);
336 }
337 RestartDecision::EscalateToParent | RestartDecision::DoNotRestart => {}
338 }
339 }
340
341 fn restart_strategy_scope(
353 &mut self,
354 failed_child: ChildId,
355 delay: Duration,
356 event_sender: &broadcast::Sender<String>,
357 ) {
358 let plan = restart_execution_plan(&self.tree, &self.spec, &failed_child);
359 let scope_label = child_scope_label(&plan.scope);
360 let group_label = plan.group.as_deref().unwrap_or("supervisor");
361 let _ignored = event_sender.send(format!(
362 "restart_plan:{:?}:{group_label}:{scope_label}",
363 plan.strategy
364 ));
365 for child_id in plan.scope {
366 self.spawn_child_attempt(child_id, true, delay);
367 }
368 }
369
370 fn ensure_dynamic_child_allowed(&self) -> Result<(), SupervisorError> {
380 let current_child_count = self.dynamic_child_count();
381 if self
382 .spec
383 .dynamic_supervisor_policy
384 .allows_addition(current_child_count)
385 {
386 return Ok(());
387 }
388 Err(SupervisorError::InvalidTransition {
389 message: "dynamic supervisor child limit reached".to_owned(),
390 })
391 }
392
393 fn dynamic_child_count(&self) -> usize {
403 self.registry
404 .declaration_order()
405 .len()
406 .saturating_add(self.manifests.len())
407 }
408
409 fn spawn_child_attempt(&mut self, child_id: ChildId, is_restart: bool, delay: Duration) {
421 let Some(runtime) = self.prepare_child_attempt(&child_id, is_restart) else {
422 return;
423 };
424 let sender = self.command_sender.clone();
425 tokio::spawn(async move {
426 if !delay.is_zero() {
427 tokio::time::sleep(delay).await;
428 }
429 let child_id = runtime.id.clone();
430 let result = ChildRunner::new().run_once(runtime).await;
431 send_child_result(sender, child_id, result).await;
432 });
433 }
434
435 fn prepare_child_attempt(
446 &mut self,
447 child_id: &ChildId,
448 is_restart: bool,
449 ) -> Option<ChildRuntime> {
450 let runtime = self.registry.child_mut(child_id)?;
451 if is_restart {
452 runtime.attempt = runtime.attempt.next();
453 runtime.generation = runtime.generation.next();
454 runtime.restart_count = runtime.restart_count.saturating_add(1);
455 }
456 runtime.status = ChildRuntimeStatus::Starting;
457 self.children
458 .insert(child_id.clone(), ManagedChildState::Running);
459 Some(runtime.clone())
460 }
461}
462
463pub async fn run_control_loop(
475 mut state: RuntimeControlState,
476 mut receiver: mpsc::Receiver<RuntimeCommand>,
477 event_sender: broadcast::Sender<String>,
478) {
479 state.start_declared_children();
480 while let Some(message) = receiver.recv().await {
481 match message {
482 RuntimeCommand::Control {
483 command,
484 reply_sender,
485 } => {
486 let command_name = command_name(&command);
487 let result = state.execute_control(command);
488 let _ignored = event_sender.send(format!("control_command:{command_name}"));
489 let _ignored = reply_sender.send(result);
490 }
491 RuntimeCommand::ChildExited { report } => {
492 state.handle_child_exit(*report, &event_sender);
493 }
494 RuntimeCommand::ChildStartFailed { child_id, message } => {
495 state.handle_child_start_failed(child_id, message, &event_sender);
496 }
497 }
498 }
499}
500
501fn command_name(command: &ControlCommand) -> &'static str {
511 match command {
512 ControlCommand::AddChild { .. } => "add_child",
513 ControlCommand::RemoveChild { .. } => "remove_child",
514 ControlCommand::RestartChild { .. } => "restart_child",
515 ControlCommand::PauseChild { .. } => "pause_child",
516 ControlCommand::ResumeChild { .. } => "resume_child",
517 ControlCommand::QuarantineChild { .. } => "quarantine_child",
518 ControlCommand::ShutdownTree { .. } => "shutdown_tree",
519 ControlCommand::CurrentState { .. } => "current_state",
520 }
521}
522
523async fn send_child_result(
535 sender: mpsc::Sender<RuntimeCommand>,
536 child_id: ChildId,
537 result: Result<ChildRunReport, SupervisorError>,
538) {
539 let message = match result {
540 Ok(report) => RuntimeCommand::ChildExited {
541 report: Box::new(report),
542 },
543 Err(error) => RuntimeCommand::ChildStartFailed {
544 child_id,
545 message: error.to_string(),
546 },
547 };
548 let _ignored = sender.send(message).await;
549}
550
551fn restart_policy(policy: ChildRestartPolicy) -> RestartPolicy {
561 match policy {
562 ChildRestartPolicy::Permanent => RestartPolicy::Permanent,
563 ChildRestartPolicy::Transient => RestartPolicy::Transient,
564 ChildRestartPolicy::Temporary => RestartPolicy::Temporary,
565 }
566}
567
568fn backoff_policy(policy: crate::spec::child::BackoffPolicy) -> BackoffPolicy {
578 let jitter_percent = (policy.jitter_ratio * 100.0).round().clamp(0.0, 100.0) as u8;
579 BackoffPolicy::new(
580 policy.initial_delay,
581 policy.max_delay,
582 jitter_percent,
583 policy.max_delay,
584 )
585}
586
587fn policy_task_exit(exit: &TaskExit) -> PolicyTaskExit {
597 match exit.failure_kind() {
598 Some(kind) => PolicyTaskExit::Failed { kind: kind.into() },
599 None => PolicyTaskExit::Succeeded,
600 }
601}
602
603fn child_scope_label(scope: &[ChildId]) -> String {
613 scope
614 .iter()
615 .map(|child_id| child_id.value.clone())
616 .collect::<Vec<_>>()
617 .join(",")
618}