Skip to main content

rust_supervisor/runtime/
control_loop.rs

1//! Runtime control loop.
2//!
3//! This module executes control-plane commands, receives child attempt exits,
4//! and applies supervisor restart strategy decisions.
5
6use crate::child_runner::attempt::TaskExit;
7use crate::child_runner::runner::{ChildRunReport, ChildRunner};
8use crate::control::command::{CommandResult, ControlCommand, CurrentState, ManagedChildState};
9use crate::error::types::SupervisorError;
10use crate::id::types::ChildId;
11use crate::policy::backoff::BackoffPolicy;
12use crate::policy::decision::{
13    PolicyEngine, RestartDecision, RestartPolicy, TaskExit as PolicyTaskExit,
14};
15use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
16use crate::registry::store::RegistryStore;
17use crate::shutdown::coordinator::ShutdownCoordinator;
18use crate::shutdown::stage::{ShutdownCause, ShutdownPolicy};
19use crate::spec::child::RestartPolicy as ChildRestartPolicy;
20use crate::spec::supervisor::SupervisorSpec;
21use crate::tree::builder::SupervisorTree;
22use crate::tree::order::{restart_execution_plan, startup_order};
23use std::collections::HashMap;
24use std::time::Duration;
25use tokio::sync::{broadcast, mpsc, oneshot};
26
27/// Message consumed by the runtime control loop.
28#[derive(Debug)]
29pub enum RuntimeCommand {
30    /// Control command sent from [`crate::control::handle::SupervisorHandle`].
31    Control {
32        /// Command to execute.
33        command: ControlCommand,
34        /// Reply channel used to return the command result.
35        reply_sender: oneshot::Sender<Result<CommandResult, SupervisorError>>,
36    },
37    /// Child attempt finished and must be evaluated by runtime policy.
38    ChildExited {
39        /// Report returned by the child runner.
40        report: Box<ChildRunReport>,
41    },
42    /// Child attempt could not start.
43    ChildStartFailed {
44        /// Child identifier whose attempt failed before execution.
45        child_id: ChildId,
46        /// Diagnostic message for the failed attempt.
47        message: String,
48    },
49}
50
51/// Mutable state owned by the control loop.
52#[derive(Debug)]
53pub struct RuntimeControlState {
54    /// Shutdown state machine used by tree-level shutdown commands.
55    shutdown: ShutdownCoordinator,
56    /// Runtime child states set by explicit control commands.
57    children: HashMap<ChildId, ManagedChildState>,
58    /// Dynamic child manifests accepted after startup.
59    manifests: Vec<String>,
60    /// Registry that owns declared child runtime records.
61    registry: RegistryStore,
62    /// Built supervisor tree used for order and scope planning.
63    tree: SupervisorTree,
64    /// Supervisor specification that owns strategy and dynamic policies.
65    spec: SupervisorSpec,
66    /// Policy engine used to convert task exits into restart decisions.
67    policy_engine: PolicyEngine,
68    /// Sender used by spawned child attempts to report runtime messages.
69    command_sender: mpsc::Sender<RuntimeCommand>,
70}
71
72impl RuntimeControlState {
73    /// Creates control state from a supervisor specification.
74    ///
75    /// # Arguments
76    ///
77    /// - `spec`: Supervisor declaration that owns children and strategy.
78    /// - `shutdown_policy`: Policy used by the shutdown coordinator.
79    /// - `command_sender`: Sender used by child attempts to report exits.
80    ///
81    /// # Returns
82    ///
83    /// Returns a [`RuntimeControlState`] value.
84    pub fn new(
85        spec: SupervisorSpec,
86        shutdown_policy: ShutdownPolicy,
87        command_sender: mpsc::Sender<RuntimeCommand>,
88    ) -> Result<Self, SupervisorError> {
89        let tree = SupervisorTree::build(&spec)?;
90        let mut registry = RegistryStore::new();
91        registry.register_tree(&tree)?;
92        Ok(Self {
93            shutdown: ShutdownCoordinator::new(shutdown_policy),
94            children: HashMap::new(),
95            manifests: Vec::new(),
96            registry,
97            tree,
98            spec,
99            policy_engine: PolicyEngine::new(),
100            command_sender,
101        })
102    }
103
104    /// Starts every declared child in supervisor startup order.
105    ///
106    /// # Arguments
107    ///
108    /// This function has no arguments.
109    ///
110    /// # Returns
111    ///
112    /// This function does not return a value.
113    pub fn start_declared_children(&mut self) {
114        let child_ids = startup_order(&self.tree)
115            .into_iter()
116            .map(|node| node.child.id.clone())
117            .collect::<Vec<_>>();
118        for child_id in child_ids {
119            self.spawn_child_attempt(child_id, false, Duration::ZERO);
120        }
121    }
122
123    /// Executes one control command.
124    ///
125    /// # Arguments
126    ///
127    /// - `command`: Command received by the runtime.
128    ///
129    /// # Returns
130    ///
131    /// Returns a command result.
132    pub fn execute_control(
133        &mut self,
134        command: ControlCommand,
135    ) -> Result<CommandResult, SupervisorError> {
136        match command {
137            ControlCommand::AddChild { child_manifest, .. } => {
138                self.ensure_dynamic_child_allowed()?;
139                self.manifests.push(child_manifest.clone());
140                Ok(CommandResult::ChildAdded { child_manifest })
141            }
142            ControlCommand::RemoveChild { child_id, .. } => {
143                Ok(self.set_child_state(child_id, ManagedChildState::Removed))
144            }
145            ControlCommand::RestartChild { child_id, .. } => {
146                self.spawn_child_attempt(child_id.clone(), true, Duration::ZERO);
147                Ok(self.set_child_state(child_id, ManagedChildState::Running))
148            }
149            ControlCommand::PauseChild { child_id, .. } => {
150                Ok(self.set_child_state(child_id, ManagedChildState::Paused))
151            }
152            ControlCommand::ResumeChild { child_id, .. } => {
153                Ok(self.set_child_state(child_id, ManagedChildState::Running))
154            }
155            ControlCommand::QuarantineChild { child_id, .. } => {
156                Ok(self.set_child_state(child_id, ManagedChildState::Quarantined))
157            }
158            ControlCommand::ShutdownTree { meta } => {
159                let cause = ShutdownCause::new(meta.requested_by, meta.reason);
160                let result = self.shutdown.request_stop(cause);
161                self.shutdown.advance();
162                self.shutdown.advance();
163                self.shutdown.advance();
164                self.shutdown.advance();
165                self.shutdown.complete();
166                Ok(CommandResult::Shutdown { result })
167            }
168            ControlCommand::CurrentState { .. } => Ok(CommandResult::CurrentState {
169                state: CurrentState {
170                    child_count: self.dynamic_child_count(),
171                    shutdown_completed: self.shutdown.phase()
172                        == crate::shutdown::stage::ShutdownPhase::Completed,
173                },
174            }),
175        }
176    }
177
178    /// Applies policy to a completed child attempt.
179    ///
180    /// # Arguments
181    ///
182    /// - `report`: Completed child attempt report.
183    /// - `event_sender`: Event channel used for lifecycle text.
184    ///
185    /// # Returns
186    ///
187    /// This function does not return a value.
188    pub fn handle_child_exit(
189        &mut self,
190        report: ChildRunReport,
191        event_sender: &broadcast::Sender<String>,
192    ) {
193        let child_id = report.runtime.id.clone();
194        self.record_child_exit(report);
195        let _ignored = event_sender.send(format!("child_exit:{child_id}"));
196        if !self.should_apply_automatic_policy(&child_id) {
197            return;
198        }
199        let Some(decision) = self.restart_decision(&child_id) else {
200            return;
201        };
202        self.execute_restart_decision(child_id, decision, event_sender);
203    }
204
205    /// Records a failed child start.
206    ///
207    /// # Arguments
208    ///
209    /// - `child_id`: Child identifier whose attempt failed.
210    /// - `message`: Diagnostic error message.
211    /// - `event_sender`: Event channel used for lifecycle text.
212    ///
213    /// # Returns
214    ///
215    /// This function does not return a value.
216    pub fn handle_child_start_failed(
217        &mut self,
218        child_id: ChildId,
219        message: String,
220        event_sender: &broadcast::Sender<String>,
221    ) {
222        let _ignored = event_sender.send(format!("child_start_failed:{child_id}:{message}"));
223        let _result = self.set_child_state(child_id, ManagedChildState::Quarantined);
224    }
225
226    /// Sets a child state and reports whether the operation was idempotent.
227    ///
228    /// # Arguments
229    ///
230    /// - `child_id`: Target child identifier.
231    /// - `next`: Requested managed child state.
232    ///
233    /// # Returns
234    ///
235    /// Returns a [`CommandResult::ChildState`] value.
236    fn set_child_state(&mut self, child_id: ChildId, next: ManagedChildState) -> CommandResult {
237        let previous = self.children.insert(child_id.clone(), next);
238        CommandResult::ChildState {
239            child_id,
240            state: next,
241            idempotent: previous == Some(next),
242        }
243    }
244
245    /// Records the completed attempt in the registry.
246    ///
247    /// # Arguments
248    ///
249    /// - `report`: Completed child attempt report.
250    ///
251    /// # Returns
252    ///
253    /// This function does not return a value.
254    fn record_child_exit(&mut self, report: ChildRunReport) {
255        let child_id = report.runtime.id.clone();
256        if let Some(runtime) = self.registry.child_mut(&child_id) {
257            runtime.last_exit = Some(report.exit);
258            runtime.status = ChildRuntimeStatus::Exited;
259            runtime.generation = report.runtime.generation;
260            runtime.attempt = report.runtime.attempt;
261            runtime.restart_count = report.runtime.restart_count;
262        }
263    }
264
265    /// Reports whether automatic policy may still act on a child.
266    ///
267    /// # Arguments
268    ///
269    /// - `child_id`: Child whose latest exit is being evaluated.
270    ///
271    /// # Returns
272    ///
273    /// Returns `true` when the runtime may restart the child.
274    fn should_apply_automatic_policy(&self, child_id: &ChildId) -> bool {
275        if self.shutdown.phase() != crate::shutdown::stage::ShutdownPhase::Idle {
276            return false;
277        }
278        !matches!(
279            self.children.get(child_id),
280            Some(ManagedChildState::Paused)
281                | Some(ManagedChildState::Quarantined)
282                | Some(ManagedChildState::Removed)
283        )
284    }
285
286    /// Calculates a restart decision for the latest child exit.
287    ///
288    /// # Arguments
289    ///
290    /// - `child_id`: Child whose latest exit is being evaluated.
291    ///
292    /// # Returns
293    ///
294    /// Returns a restart decision when the child is known.
295    fn restart_decision(&self, child_id: &ChildId) -> Option<RestartDecision> {
296        let runtime = self.registry.child(child_id)?;
297        let exit = runtime.last_exit.as_ref()?;
298        let policy_exit = policy_task_exit(exit);
299        let restart_policy = restart_policy(runtime.spec.restart_policy);
300        let backoff = backoff_policy(runtime.spec.backoff_policy);
301        Some(self.policy_engine.decide(
302            restart_policy,
303            policy_exit,
304            runtime.attempt.value,
305            &backoff,
306        ))
307    }
308
309    /// Executes a restart decision after a child exit.
310    ///
311    /// # Arguments
312    ///
313    /// - `failed_child`: Child whose exit triggered the decision.
314    /// - `decision`: Restart decision returned by the policy engine.
315    /// - `event_sender`: Event channel used for lifecycle text.
316    ///
317    /// # Returns
318    ///
319    /// This function does not return a value.
320    fn execute_restart_decision(
321        &mut self,
322        failed_child: ChildId,
323        decision: RestartDecision,
324        event_sender: &broadcast::Sender<String>,
325    ) {
326        match decision {
327            RestartDecision::RestartAfter { delay } => {
328                self.restart_strategy_scope(failed_child, delay, event_sender);
329            }
330            RestartDecision::Quarantine => {
331                let _result = self.set_child_state(failed_child, ManagedChildState::Quarantined);
332            }
333            RestartDecision::ShutdownTree => {
334                let cause = ShutdownCause::new("runtime", "policy requested tree shutdown");
335                let _result = self.shutdown.request_stop(cause);
336            }
337            RestartDecision::EscalateToParent | RestartDecision::DoNotRestart => {}
338        }
339    }
340
341    /// Restarts every child selected by the current execution plan.
342    ///
343    /// # Arguments
344    ///
345    /// - `failed_child`: Child whose exit triggered the restart scope.
346    /// - `delay`: Delay before every selected child is restarted.
347    /// - `event_sender`: Event channel used for lifecycle text.
348    ///
349    /// # Returns
350    ///
351    /// This function does not return a value.
352    fn restart_strategy_scope(
353        &mut self,
354        failed_child: ChildId,
355        delay: Duration,
356        event_sender: &broadcast::Sender<String>,
357    ) {
358        let plan = restart_execution_plan(&self.tree, &self.spec, &failed_child);
359        let scope_label = child_scope_label(&plan.scope);
360        let group_label = plan.group.as_deref().unwrap_or("supervisor");
361        let _ignored = event_sender.send(format!(
362            "restart_plan:{:?}:{group_label}:{scope_label}",
363            plan.strategy
364        ));
365        for child_id in plan.scope {
366            self.spawn_child_attempt(child_id, true, delay);
367        }
368    }
369
370    /// Ensures that the dynamic supervisor accepts another child manifest.
371    ///
372    /// # Arguments
373    ///
374    /// This function has no arguments.
375    ///
376    /// # Returns
377    ///
378    /// Returns `Ok(())` when another dynamic child can be added.
379    fn ensure_dynamic_child_allowed(&self) -> Result<(), SupervisorError> {
380        let current_child_count = self.dynamic_child_count();
381        if self
382            .spec
383            .dynamic_supervisor_policy
384            .allows_addition(current_child_count)
385        {
386            return Ok(());
387        }
388        Err(SupervisorError::InvalidTransition {
389            message: "dynamic supervisor child limit reached".to_owned(),
390        })
391    }
392
393    /// Counts declared and dynamic child records.
394    ///
395    /// # Arguments
396    ///
397    /// This function has no arguments.
398    ///
399    /// # Returns
400    ///
401    /// Returns the number of declared children plus accepted dynamic manifests.
402    fn dynamic_child_count(&self) -> usize {
403        self.registry
404            .declaration_order()
405            .len()
406            .saturating_add(self.manifests.len())
407    }
408
409    /// Spawns one child attempt and reports the exit back to this control loop.
410    ///
411    /// # Arguments
412    ///
413    /// - `child_id`: Child that should run.
414    /// - `is_restart`: Whether this attempt is a restart attempt.
415    /// - `delay`: Delay before the attempt starts.
416    ///
417    /// # Returns
418    ///
419    /// This function does not return a value.
420    fn spawn_child_attempt(&mut self, child_id: ChildId, is_restart: bool, delay: Duration) {
421        let Some(runtime) = self.prepare_child_attempt(&child_id, is_restart) else {
422            return;
423        };
424        let sender = self.command_sender.clone();
425        tokio::spawn(async move {
426            if !delay.is_zero() {
427                tokio::time::sleep(delay).await;
428            }
429            let child_id = runtime.id.clone();
430            let result = ChildRunner::new().run_once(runtime).await;
431            send_child_result(sender, child_id, result).await;
432        });
433    }
434
435    /// Prepares registry state for one child attempt.
436    ///
437    /// # Arguments
438    ///
439    /// - `child_id`: Child that should run.
440    /// - `is_restart`: Whether this attempt is a restart attempt.
441    ///
442    /// # Returns
443    ///
444    /// Returns a runtime record for the child runner.
445    fn prepare_child_attempt(
446        &mut self,
447        child_id: &ChildId,
448        is_restart: bool,
449    ) -> Option<ChildRuntime> {
450        let runtime = self.registry.child_mut(child_id)?;
451        if is_restart {
452            runtime.attempt = runtime.attempt.next();
453            runtime.generation = runtime.generation.next();
454            runtime.restart_count = runtime.restart_count.saturating_add(1);
455        }
456        runtime.status = ChildRuntimeStatus::Starting;
457        self.children
458            .insert(child_id.clone(), ManagedChildState::Running);
459        Some(runtime.clone())
460    }
461}
462
463/// Runs the control loop until all command senders are dropped.
464///
465/// # Arguments
466///
467/// - `state`: Runtime state initialized from the supervisor specification.
468/// - `receiver`: Runtime command receiver.
469/// - `event_sender`: Event channel used for audit text.
470///
471/// # Returns
472///
473/// This function returns when `receiver` is closed.
474pub async fn run_control_loop(
475    mut state: RuntimeControlState,
476    mut receiver: mpsc::Receiver<RuntimeCommand>,
477    event_sender: broadcast::Sender<String>,
478) {
479    state.start_declared_children();
480    while let Some(message) = receiver.recv().await {
481        match message {
482            RuntimeCommand::Control {
483                command,
484                reply_sender,
485            } => {
486                let command_name = command_name(&command);
487                let result = state.execute_control(command);
488                let _ignored = event_sender.send(format!("control_command:{command_name}"));
489                let _ignored = reply_sender.send(result);
490            }
491            RuntimeCommand::ChildExited { report } => {
492                state.handle_child_exit(*report, &event_sender);
493            }
494            RuntimeCommand::ChildStartFailed { child_id, message } => {
495                state.handle_child_start_failed(child_id, message, &event_sender);
496            }
497        }
498    }
499}
500
501/// Returns a stable command name for audit text.
502///
503/// # Arguments
504///
505/// - `command`: Command being executed.
506///
507/// # Returns
508///
509/// Returns a static command name.
510fn command_name(command: &ControlCommand) -> &'static str {
511    match command {
512        ControlCommand::AddChild { .. } => "add_child",
513        ControlCommand::RemoveChild { .. } => "remove_child",
514        ControlCommand::RestartChild { .. } => "restart_child",
515        ControlCommand::PauseChild { .. } => "pause_child",
516        ControlCommand::ResumeChild { .. } => "resume_child",
517        ControlCommand::QuarantineChild { .. } => "quarantine_child",
518        ControlCommand::ShutdownTree { .. } => "shutdown_tree",
519        ControlCommand::CurrentState { .. } => "current_state",
520    }
521}
522
523/// Sends a child run result back to the control loop.
524///
525/// # Arguments
526///
527/// - `sender`: Runtime command sender.
528/// - `child_id`: Child identifier used when the run fails before reporting.
529/// - `result`: Child run result.
530///
531/// # Returns
532///
533/// This function does not return a value.
534async fn send_child_result(
535    sender: mpsc::Sender<RuntimeCommand>,
536    child_id: ChildId,
537    result: Result<ChildRunReport, SupervisorError>,
538) {
539    let message = match result {
540        Ok(report) => RuntimeCommand::ChildExited {
541            report: Box::new(report),
542        },
543        Err(error) => RuntimeCommand::ChildStartFailed {
544            child_id,
545            message: error.to_string(),
546        },
547    };
548    let _ignored = sender.send(message).await;
549}
550
551/// Maps child restart policy into policy-engine restart policy.
552///
553/// # Arguments
554///
555/// - `policy`: Restart policy stored on the child declaration.
556///
557/// # Returns
558///
559/// Returns the equivalent policy-engine value.
560fn restart_policy(policy: ChildRestartPolicy) -> RestartPolicy {
561    match policy {
562        ChildRestartPolicy::Permanent => RestartPolicy::Permanent,
563        ChildRestartPolicy::Transient => RestartPolicy::Transient,
564        ChildRestartPolicy::Temporary => RestartPolicy::Temporary,
565    }
566}
567
568/// Maps child backoff policy into policy-engine backoff policy.
569///
570/// # Arguments
571///
572/// - `policy`: Backoff policy stored on the child declaration.
573///
574/// # Returns
575///
576/// Returns the equivalent policy-engine value.
577fn backoff_policy(policy: crate::spec::child::BackoffPolicy) -> BackoffPolicy {
578    let jitter_percent = (policy.jitter_ratio * 100.0).round().clamp(0.0, 100.0) as u8;
579    BackoffPolicy::new(
580        policy.initial_delay,
581        policy.max_delay,
582        jitter_percent,
583        policy.max_delay,
584    )
585}
586
587/// Maps a child-runner exit into policy-engine task exit.
588///
589/// # Arguments
590///
591/// - `exit`: Exit reported by the child runner.
592///
593/// # Returns
594///
595/// Returns the policy-engine exit value.
596fn policy_task_exit(exit: &TaskExit) -> PolicyTaskExit {
597    match exit.failure_kind() {
598        Some(kind) => PolicyTaskExit::Failed { kind: kind.into() },
599        None => PolicyTaskExit::Succeeded,
600    }
601}
602
603/// Formats a restart scope for lifecycle events.
604///
605/// # Arguments
606///
607/// - `scope`: Child identifiers selected by strategy.
608///
609/// # Returns
610///
611/// Returns a comma-separated child identifier list.
612fn child_scope_label(scope: &[ChildId]) -> String {
613    scope
614        .iter()
615        .map(|child_id| child_id.value.clone())
616        .collect::<Vec<_>>()
617        .join(",")
618}