Skip to main content

rust_supervisor/child_runner/
runner.rs

//! Minimal child runner.
//!
//! This module starts a single child task (one start attempt), advances its
//! readiness state, and records the resulting task exit.
5
6use crate::child_runner::run_exit::TaskExit;
7use crate::error::types::SupervisorError;
8use crate::readiness::signal::{ReadinessPolicy, ReadinessState, ReadySignal};
9use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
10use crate::task::context::TaskContext;
11use tokio::sync::{watch, watch::Receiver};
12use tokio::task::{AbortHandle, JoinHandle};
13use tokio::time::Instant;
14use tokio_util::sync::CancellationToken;
15
16/// Result of running one child child_start_count.
17#[derive(Debug, Clone)]
18pub struct ChildRunReport {
19    /// Runtime record after the child_start_count.
20    pub runtime: ChildRuntime,
21    /// Final task exit classification.
22    pub exit: TaskExit,
23    /// Whether the task became ready during the child_start_count.
24    pub became_ready: bool,
25}
26
27/// Handle for one running child child_start_count.
28#[derive(Debug)]
29pub struct ChildRunHandle {
30    /// Runtime cancellation token shared with the task context.
31    pub cancellation_token: CancellationToken,
32    /// Abort handle attached to the real child future.
33    pub abort_handle: AbortHandle,
34    /// Receiver that observes the completed child run report.
35    pub completion_receiver: Receiver<Option<Result<ChildRunReport, SupervisorError>>>,
36    /// Receiver that observes the latest child heartbeat.
37    pub heartbeat_receiver: watch::Receiver<Option<Instant>>,
38    /// Receiver that observes the latest child readiness state.
39    pub readiness_receiver: watch::Receiver<ReadinessState>,
40}
41
/// Stateless runner that spawns and observes one child start attempt at a time.
#[derive(Debug, Clone, Default)]
pub struct ChildRunner;
45
46impl ChildRunner {
47    /// Creates a child runner.
48    ///
49    /// # Arguments
50    ///
51    /// This function has no arguments.
52    ///
53    /// # Returns
54    ///
55    /// Returns a [`ChildRunner`].
56    ///
57    /// # Examples
58    ///
59    /// ```
60    /// let _runner = rust_supervisor::child_runner::runner::ChildRunner::new();
61    /// ```
62    pub fn new() -> Self {
63        Self
64    }
65
66    /// Runs one child child_start_count.
67    ///
68    /// # Arguments
69    ///
70    /// - `runtime`: Runtime record for the child task.
71    ///
72    /// # Returns
73    ///
74    /// Returns a [`ChildRunReport`] when the child owns a task factory.
75    pub async fn run_once(&self, runtime: ChildRuntime) -> Result<ChildRunReport, SupervisorError> {
76        let mut completion_receiver = self.spawn_once(runtime)?.completion_receiver;
77        wait_for_report(&mut completion_receiver).await
78    }
79
80    /// Spawns one child task and returns cancellation and abort handles.
81    ///
82    /// # Arguments
83    ///
84    /// - `runtime`: Runtime record for the child task.
85    ///
86    /// # Returns
87    ///
88    /// Returns a [`ChildRunHandle`] when the child owns a task factory.
89    pub fn spawn_once(&self, mut runtime: ChildRuntime) -> Result<ChildRunHandle, SupervisorError> {
90        #[cfg(debug_assertions)]
91        if crate::test_support::child_spawn::take_child_spawn_failure_attempt(&runtime.id) {
92            return Err(SupervisorError::InvalidTransition {
93                message: "test hook: child spawn_once failure".to_owned(),
94            });
95        }
96        let factory =
97            runtime.spec.factory.clone().ok_or_else(|| {
98                SupervisorError::fatal_config("worker child requires a task factory")
99            })?;
100        runtime.status = ChildRuntimeStatus::Starting;
101        let (ready_signal, ready_receiver) = ReadySignal::new();
102        let cancellation_token = CancellationToken::new();
103        let (ctx, heartbeat_receiver) = TaskContext::with_ready_signal_and_cancellation_token(
104            runtime.id.clone(),
105            runtime.path.clone(),
106            runtime.generation,
107            runtime.child_start_count,
108            ready_signal,
109            cancellation_token.clone(),
110        );
111        mark_immediate_ready(runtime.spec.readiness_policy, &ctx, &mut runtime);
112        runtime.status = ChildRuntimeStatus::Running;
113        let (completion_sender, completion_receiver) = watch::channel(None);
114        let child_task = tokio::spawn(factory.build(ctx));
115        let abort_handle = child_task.abort_handle();
116        let run_ready_receiver = ready_receiver.clone();
117        tokio::spawn(async move {
118            let report = run_factory(runtime, run_ready_receiver, child_task).await;
119            let _ignored = completion_sender.send(Some(report));
120        });
121        Ok(ChildRunHandle {
122            cancellation_token,
123            abort_handle,
124            completion_receiver,
125            heartbeat_receiver,
126            readiness_receiver: ready_receiver.clone(),
127        })
128    }
129}
130
131/// Marks a runtime ready when policy requires immediate readiness.
132///
133/// # Arguments
134///
135/// - `policy`: Readiness policy attached to the child.
136/// - `ctx`: Task context that owns the readiness sender.
137/// - `runtime`: Runtime record whose status should advance.
138///
139/// # Returns
140///
141/// This function does not return a value.
142fn mark_immediate_ready(policy: ReadinessPolicy, ctx: &TaskContext, runtime: &mut ChildRuntime) {
143    if policy.is_immediate() {
144        ctx.mark_ready();
145        runtime.status = ChildRuntimeStatus::Ready;
146    }
147}
148
149/// Runs a factory and classifies the result.
150///
151/// # Arguments
152///
153/// - `factory`: Task factory for this child.
154/// - `ctx`: Per-child_start_count task context.
155///
156/// # Returns
157///
158/// Returns the classified task exit.
159async fn run_factory(
160    mut runtime: ChildRuntime,
161    ready_receiver: watch::Receiver<ReadinessState>,
162    task: JoinHandle<crate::task::factory::TaskResult>,
163) -> Result<ChildRunReport, SupervisorError> {
164    match task.await {
165        Ok(result) => {
166            let exit = TaskExit::from_task_result(result);
167            let became_ready = observe_ready(ready_receiver);
168            if became_ready {
169                runtime.status = ChildRuntimeStatus::Ready;
170            }
171            runtime.last_exit = Some(exit.clone());
172            Ok(ChildRunReport {
173                runtime,
174                exit,
175                became_ready,
176            })
177        }
178        Err(error) if error.is_panic() => {
179            let exit = TaskExit::Panicked(String::from("task panicked"));
180            runtime.last_exit = Some(exit.clone());
181            Ok(ChildRunReport {
182                runtime,
183                exit,
184                became_ready: observe_ready(ready_receiver),
185            })
186        }
187        Err(_error) => {
188            let exit = TaskExit::Cancelled;
189            runtime.last_exit = Some(exit.clone());
190            Ok(ChildRunReport {
191                runtime,
192                exit,
193                became_ready: observe_ready(ready_receiver),
194            })
195        }
196    }
197}
198
199/// Observes whether readiness was reported.
200///
201/// # Arguments
202///
203/// - `ready_receiver`: Receiver that stores the latest readiness value.
204///
205/// # Returns
206///
207/// Returns `true` when the receiver observed readiness.
208fn observe_ready(ready_receiver: watch::Receiver<ReadinessState>) -> bool {
209    matches!(*ready_receiver.borrow(), ReadinessState::Ready)
210}
211
212/// Waits for the report sender to publish a child run report.
213///
214/// # Arguments
215///
216/// - `completion_receiver`: Receiver published by the run observer task.
217///
218/// # Returns
219///
220/// Returns the completed run report.
221pub(crate) async fn wait_for_report(
222    completion_receiver: &mut Receiver<Option<Result<ChildRunReport, SupervisorError>>>,
223) -> Result<ChildRunReport, SupervisorError> {
224    loop {
225        if let Some(result) = completion_receiver.borrow().clone() {
226            return result;
227        }
228        if completion_receiver.changed().await.is_err() {
229            return Err(SupervisorError::InvalidTransition {
230                message: "child run report channel closed before completion".to_owned(),
231            });
232        }
233    }
234}