Skip to main content

rust_supervisor/child_runner/
runner.rs

1//! Minimal child runner.
2//!
3//! This module starts one child attempt, advances readiness state, and records
4//! the resulting task exit.
5
6use crate::child_runner::attempt::TaskExit;
7use crate::error::types::SupervisorError;
8use crate::readiness::signal::{ReadinessPolicy, ReadySignal};
9use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
10use crate::task::context::TaskContext;
11use tokio::sync::watch;
12
13/// Result of running one child attempt.
14#[derive(Debug, Clone)]
15pub struct ChildRunReport {
16    /// Runtime record after the attempt.
17    pub runtime: ChildRuntime,
18    /// Final task exit classification.
19    pub exit: TaskExit,
20    /// Whether the task became ready during the attempt.
21    pub became_ready: bool,
22}
23
/// Stateless runner that drives exactly one child attempt per call.
///
/// The type carries no fields; every input arrives through the
/// [`ChildRuntime`] handed to its methods.
#[derive(Clone, Debug, Default)]
pub struct ChildRunner;
27
28impl ChildRunner {
29    /// Creates a child runner.
30    ///
31    /// # Arguments
32    ///
33    /// This function has no arguments.
34    ///
35    /// # Returns
36    ///
37    /// Returns a [`ChildRunner`].
38    ///
39    /// # Examples
40    ///
41    /// ```
42    /// let _runner = rust_supervisor::child_runner::runner::ChildRunner::new();
43    /// ```
44    pub fn new() -> Self {
45        Self
46    }
47
48    /// Runs one child attempt.
49    ///
50    /// # Arguments
51    ///
52    /// - `runtime`: Runtime record for the child attempt.
53    ///
54    /// # Returns
55    ///
56    /// Returns a [`ChildRunReport`] when the child owns a task factory.
57    pub async fn run_once(
58        &self,
59        mut runtime: ChildRuntime,
60    ) -> Result<ChildRunReport, SupervisorError> {
61        let factory =
62            runtime.spec.factory.clone().ok_or_else(|| {
63                SupervisorError::fatal_config("worker child requires a task factory")
64            })?;
65        runtime.status = ChildRuntimeStatus::Starting;
66        let (ready_signal, ready_receiver) = ReadySignal::new();
67        let (ctx, _heartbeat_receiver) = TaskContext::with_ready_signal(
68            runtime.id.clone(),
69            runtime.path.clone(),
70            runtime.generation,
71            runtime.attempt,
72            ready_signal,
73        );
74        mark_immediate_ready(runtime.spec.readiness_policy, &ctx, &mut runtime);
75        runtime.status = ChildRuntimeStatus::Running;
76        let exit = run_factory(factory, ctx).await;
77        let became_ready = observe_ready(ready_receiver);
78        if became_ready {
79            runtime.status = ChildRuntimeStatus::Ready;
80        }
81        runtime.last_exit = Some(exit.clone());
82        Ok(ChildRunReport {
83            runtime,
84            exit,
85            became_ready,
86        })
87    }
88}
89
90/// Marks a runtime ready when policy requires immediate readiness.
91///
92/// # Arguments
93///
94/// - `policy`: Readiness policy attached to the child.
95/// - `ctx`: Task context that owns the readiness sender.
96/// - `runtime`: Runtime record whose status should advance.
97///
98/// # Returns
99///
100/// This function does not return a value.
101fn mark_immediate_ready(policy: ReadinessPolicy, ctx: &TaskContext, runtime: &mut ChildRuntime) {
102    if policy.is_immediate() {
103        ctx.mark_ready();
104        runtime.status = ChildRuntimeStatus::Ready;
105    }
106}
107
108/// Runs a factory and classifies the result.
109///
110/// # Arguments
111///
112/// - `factory`: Task factory for this child.
113/// - `ctx`: Per-attempt task context.
114///
115/// # Returns
116///
117/// Returns the classified task exit.
118async fn run_factory(
119    factory: std::sync::Arc<dyn crate::task::factory::TaskFactory>,
120    ctx: TaskContext,
121) -> TaskExit {
122    let task = tokio::spawn(factory.build(ctx));
123    match task.await {
124        Ok(result) => TaskExit::from_task_result(result),
125        Err(error) if error.is_panic() => TaskExit::Panicked(String::from("task panicked")),
126        Err(_error) => TaskExit::Cancelled,
127    }
128}
129
130/// Observes whether readiness was reported.
131///
132/// # Arguments
133///
134/// - `ready_receiver`: Receiver that stores the latest readiness value.
135///
136/// # Returns
137///
138/// Returns `true` when the receiver observed readiness.
139fn observe_ready(ready_receiver: watch::Receiver<bool>) -> bool {
140    *ready_receiver.borrow()
141}