rust_supervisor/child_runner/
runner.rs1use crate::child_runner::run_exit::TaskExit;
7use crate::error::types::SupervisorError;
8use crate::readiness::signal::{ReadinessPolicy, ReadinessState, ReadySignal};
9use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
10use crate::task::context::TaskContext;
11use tokio::sync::{watch, watch::Receiver};
12use tokio::task::{AbortHandle, JoinHandle};
13use tokio::time::Instant;
14use tokio_util::sync::CancellationToken;
15
16#[derive(Debug, Clone)]
18pub struct ChildRunReport {
19 pub runtime: ChildRuntime,
21 pub exit: TaskExit,
23 pub became_ready: bool,
25}
26
27#[derive(Debug)]
29pub struct ChildRunHandle {
30 pub cancellation_token: CancellationToken,
32 pub abort_handle: AbortHandle,
34 pub completion_receiver: Receiver<Option<Result<ChildRunReport, SupervisorError>>>,
36 pub heartbeat_receiver: watch::Receiver<Option<Instant>>,
38 pub readiness_receiver: watch::Receiver<ReadinessState>,
40}
41
/// Stateless entry point for spawning child tasks and collecting their run
/// reports.
#[derive(Clone, Debug, Default)]
pub struct ChildRunner;
45
46impl ChildRunner {
47 pub fn new() -> Self {
63 Self
64 }
65
66 pub async fn run_once(&self, runtime: ChildRuntime) -> Result<ChildRunReport, SupervisorError> {
76 let mut completion_receiver = self.spawn_once(runtime)?.completion_receiver;
77 wait_for_report(&mut completion_receiver).await
78 }
79
80 pub fn spawn_once(&self, mut runtime: ChildRuntime) -> Result<ChildRunHandle, SupervisorError> {
90 #[cfg(debug_assertions)]
91 if crate::test_support::child_spawn::take_child_spawn_failure_attempt(&runtime.id) {
92 return Err(SupervisorError::InvalidTransition {
93 message: "test hook: child spawn_once failure".to_owned(),
94 });
95 }
96 let factory =
97 runtime.spec.factory.clone().ok_or_else(|| {
98 SupervisorError::fatal_config("worker child requires a task factory")
99 })?;
100 runtime.status = ChildRuntimeStatus::Starting;
101 let (ready_signal, ready_receiver) = ReadySignal::new();
102 let cancellation_token = CancellationToken::new();
103 let (ctx, heartbeat_receiver) = TaskContext::with_ready_signal_and_cancellation_token(
104 runtime.id.clone(),
105 runtime.path.clone(),
106 runtime.generation,
107 runtime.child_start_count,
108 ready_signal,
109 cancellation_token.clone(),
110 );
111 mark_immediate_ready(runtime.spec.readiness_policy, &ctx, &mut runtime);
112 runtime.status = ChildRuntimeStatus::Running;
113 let (completion_sender, completion_receiver) = watch::channel(None);
114 let child_task = tokio::spawn(factory.build(ctx));
115 let abort_handle = child_task.abort_handle();
116 let run_ready_receiver = ready_receiver.clone();
117 tokio::spawn(async move {
118 let report = run_factory(runtime, run_ready_receiver, child_task).await;
119 let _ignored = completion_sender.send(Some(report));
120 });
121 Ok(ChildRunHandle {
122 cancellation_token,
123 abort_handle,
124 completion_receiver,
125 heartbeat_receiver,
126 readiness_receiver: ready_receiver.clone(),
127 })
128 }
129}
130
131fn mark_immediate_ready(policy: ReadinessPolicy, ctx: &TaskContext, runtime: &mut ChildRuntime) {
143 if policy.is_immediate() {
144 ctx.mark_ready();
145 runtime.status = ChildRuntimeStatus::Ready;
146 }
147}
148
149async fn run_factory(
160 mut runtime: ChildRuntime,
161 ready_receiver: watch::Receiver<ReadinessState>,
162 task: JoinHandle<crate::task::factory::TaskResult>,
163) -> Result<ChildRunReport, SupervisorError> {
164 match task.await {
165 Ok(result) => {
166 let exit = TaskExit::from_task_result(result);
167 let became_ready = observe_ready(ready_receiver);
168 if became_ready {
169 runtime.status = ChildRuntimeStatus::Ready;
170 }
171 runtime.last_exit = Some(exit.clone());
172 Ok(ChildRunReport {
173 runtime,
174 exit,
175 became_ready,
176 })
177 }
178 Err(error) if error.is_panic() => {
179 let exit = TaskExit::Panicked(String::from("task panicked"));
180 runtime.last_exit = Some(exit.clone());
181 Ok(ChildRunReport {
182 runtime,
183 exit,
184 became_ready: observe_ready(ready_receiver),
185 })
186 }
187 Err(_error) => {
188 let exit = TaskExit::Cancelled;
189 runtime.last_exit = Some(exit.clone());
190 Ok(ChildRunReport {
191 runtime,
192 exit,
193 became_ready: observe_ready(ready_receiver),
194 })
195 }
196 }
197}
198
199fn observe_ready(ready_receiver: watch::Receiver<ReadinessState>) -> bool {
209 matches!(*ready_receiver.borrow(), ReadinessState::Ready)
210}
211
212pub(crate) async fn wait_for_report(
222 completion_receiver: &mut Receiver<Option<Result<ChildRunReport, SupervisorError>>>,
223) -> Result<ChildRunReport, SupervisorError> {
224 loop {
225 if let Some(result) = completion_receiver.borrow().clone() {
226 return result;
227 }
228 if completion_receiver.changed().await.is_err() {
229 return Err(SupervisorError::InvalidTransition {
230 message: "child run report channel closed before completion".to_owned(),
231 });
232 }
233 }
234}