rust_supervisor/child_runner/
runner.rs1use crate::child_runner::attempt::TaskExit;
7use crate::error::types::SupervisorError;
8use crate::readiness::signal::{ReadinessPolicy, ReadySignal};
9use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
10use crate::task::context::TaskContext;
11use tokio::sync::watch;
12
13#[derive(Debug, Clone)]
15pub struct ChildRunReport {
16 pub runtime: ChildRuntime,
18 pub exit: TaskExit,
20 pub became_ready: bool,
22}
23
/// Runner that executes one attempt of a worker child; it holds no state.
#[derive(Debug, Clone, Default)]
pub struct ChildRunner;
27
28impl ChildRunner {
29 pub fn new() -> Self {
45 Self
46 }
47
48 pub async fn run_once(
58 &self,
59 mut runtime: ChildRuntime,
60 ) -> Result<ChildRunReport, SupervisorError> {
61 let factory =
62 runtime.spec.factory.clone().ok_or_else(|| {
63 SupervisorError::fatal_config("worker child requires a task factory")
64 })?;
65 runtime.status = ChildRuntimeStatus::Starting;
66 let (ready_signal, ready_receiver) = ReadySignal::new();
67 let (ctx, _heartbeat_receiver) = TaskContext::with_ready_signal(
68 runtime.id.clone(),
69 runtime.path.clone(),
70 runtime.generation,
71 runtime.attempt,
72 ready_signal,
73 );
74 mark_immediate_ready(runtime.spec.readiness_policy, &ctx, &mut runtime);
75 runtime.status = ChildRuntimeStatus::Running;
76 let exit = run_factory(factory, ctx).await;
77 let became_ready = observe_ready(ready_receiver);
78 if became_ready {
79 runtime.status = ChildRuntimeStatus::Ready;
80 }
81 runtime.last_exit = Some(exit.clone());
82 Ok(ChildRunReport {
83 runtime,
84 exit,
85 became_ready,
86 })
87 }
88}
89
90fn mark_immediate_ready(policy: ReadinessPolicy, ctx: &TaskContext, runtime: &mut ChildRuntime) {
102 if policy.is_immediate() {
103 ctx.mark_ready();
104 runtime.status = ChildRuntimeStatus::Ready;
105 }
106}
107
108async fn run_factory(
119 factory: std::sync::Arc<dyn crate::task::factory::TaskFactory>,
120 ctx: TaskContext,
121) -> TaskExit {
122 let task = tokio::spawn(factory.build(ctx));
123 match task.await {
124 Ok(result) => TaskExit::from_task_result(result),
125 Err(error) if error.is_panic() => TaskExit::Panicked(String::from("task panicked")),
126 Err(_error) => TaskExit::Cancelled,
127 }
128}
129
130fn observe_ready(ready_receiver: watch::Receiver<bool>) -> bool {
140 *ready_receiver.borrow()
141}