use crate::child_runner::run_exit::TaskExit;
use crate::error::types::SupervisorError;
use crate::readiness::signal::{ReadinessPolicy, ReadinessState, ReadySignal};
use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
use crate::task::context::TaskContext;
use tokio::sync::{watch, watch::Receiver};
use tokio::task::{AbortHandle, JoinHandle};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
/// Outcome of one completed child run, as published on the completion channel.
#[derive(Debug, Clone)]
pub struct ChildRunReport {
/// Runtime record the run was started with, handed back with `last_exit` filled in.
pub runtime: ChildRuntime,
/// How the child task ended (converted task result, panic, or cancellation).
pub exit: TaskExit,
/// Whether the readiness channel read `ReadinessState::Ready` once the task had ended.
pub became_ready: bool,
}
/// Control and observation endpoints for a child task started by
/// [`ChildRunner::spawn_once`].
#[derive(Debug)]
pub struct ChildRunHandle {
    /// Token whose clone was handed to the child's `TaskContext`.
    pub cancellation_token: CancellationToken,
    /// Abort handle of the spawned child task.
    pub abort_handle: AbortHandle,
    /// Starts as `None`; set to `Some(report)` once the run has finished.
    pub completion_receiver: watch::Receiver<Option<Result<ChildRunReport, SupervisorError>>>,
    /// Heartbeat channel returned by the `TaskContext` constructor.
    /// NOTE(review): presumably carries the instant of the latest heartbeat — confirm
    /// against `TaskContext`.
    pub heartbeat_receiver: watch::Receiver<Option<Instant>>,
    /// Receiving side of the `ReadySignal` created for this run.
    pub readiness_receiver: watch::Receiver<ReadinessState>,
}
/// Stateless launcher that spawns a child task and reports how it ended.
#[derive(Debug, Clone, Default)]
pub struct ChildRunner;
impl ChildRunner {
    /// Creates a runner. The type holds no state, so this is equivalent to
    /// `ChildRunner::default()`.
    pub fn new() -> Self {
        Self
    }

    /// Spawns the child described by `runtime` and waits for its report.
    ///
    /// Only the completion receiver of the spawned handle is kept; the other
    /// handle fields are dropped before waiting starts.
    ///
    /// # Errors
    ///
    /// Returns any error from [`ChildRunner::spawn_once`], or the error produced by
    /// `wait_for_report` when the completion channel closes without a report.
    pub async fn run_once(&self, runtime: ChildRuntime) -> Result<ChildRunReport, SupervisorError> {
        let mut completion_receiver = self.spawn_once(runtime)?.completion_receiver;
        wait_for_report(&mut completion_receiver).await
    }

    /// Starts the child task on the Tokio runtime and returns the handle used to
    /// cancel, abort and observe it.
    ///
    /// Two tasks are spawned: the child task built by the factory, and a detached
    /// watcher that awaits the child and publishes the resulting report on the
    /// completion channel.
    ///
    /// # Errors
    ///
    /// * `SupervisorError::InvalidTransition` when the debug-only test hook requests
    ///   a spawn failure for this child id.
    /// * The error built by `SupervisorError::fatal_config` when the child spec has
    ///   no task factory.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn spawn_once(&self, mut runtime: ChildRuntime) -> Result<ChildRunHandle, SupervisorError> {
        // Debug builds only: lets tests force a spawn failure for a given child id.
        #[cfg(debug_assertions)]
        if crate::test_support::child_spawn::take_child_spawn_failure_attempt(&runtime.id) {
            return Err(SupervisorError::InvalidTransition {
                message: "test hook: child spawn_once failure".to_owned(),
            });
        }

        let factory = runtime
            .spec
            .factory
            .clone()
            .ok_or_else(|| SupervisorError::fatal_config("worker child requires a task factory"))?;

        runtime.status = ChildRuntimeStatus::Starting;

        let (ready_signal, ready_receiver) = ReadySignal::new();
        let cancellation_token = CancellationToken::new();
        let (ctx, heartbeat_receiver) = TaskContext::with_ready_signal_and_cancellation_token(
            runtime.id.clone(),
            runtime.path.clone(),
            runtime.generation,
            runtime.child_start_count,
            ready_signal,
            cancellation_token.clone(),
        );

        mark_immediate_ready(runtime.spec.readiness_policy, &ctx, &mut runtime);
        // NOTE(review): this overwrites the `Ready` status that `mark_immediate_ready`
        // may just have stored; the readiness signal itself is unaffected. Confirm
        // that `Running` is the intended status at spawn time for immediate policies.
        runtime.status = ChildRuntimeStatus::Running;

        let (completion_sender, completion_receiver) = watch::channel(None);
        let child_task = tokio::spawn(factory.build(ctx));
        let abort_handle = child_task.abort_handle();

        // The watcher gets its own receiver; the original moves into the handle below.
        let run_ready_receiver = ready_receiver.clone();
        tokio::spawn(async move {
            let report = run_factory(runtime, run_ready_receiver, child_task).await;
            // A send error only means every receiver is gone, so nobody is waiting.
            let _ignored = completion_sender.send(Some(report));
        });

        Ok(ChildRunHandle {
            cancellation_token,
            abort_handle,
            completion_receiver,
            heartbeat_receiver,
            readiness_receiver: ready_receiver,
        })
    }
}
/// Marks the child ready up front when its readiness policy is immediate.
///
/// For an immediate policy this signals readiness through `ctx` and stores
/// `ChildRuntimeStatus::Ready` in `runtime`; for any other policy it does nothing.
fn mark_immediate_ready(policy: ReadinessPolicy, ctx: &TaskContext, runtime: &mut ChildRuntime) {
    if !policy.is_immediate() {
        return;
    }
    ctx.mark_ready();
    runtime.status = ChildRuntimeStatus::Ready;
}
/// Awaits the spawned child task and turns its outcome into a [`ChildRunReport`].
///
/// * Join succeeded: the exit comes from `TaskExit::from_task_result`, and the
///   runtime status is promoted to `Ready` if readiness was observed.
/// * Join failed with a panic: the exit is `TaskExit::Panicked("task panicked")`.
/// * Any other join failure: the exit is `TaskExit::Cancelled`.
///
/// In the two failure cases the runtime status is left as it was passed in.
/// `runtime.last_exit` is always set to the computed exit. As written, this
/// function always returns `Ok`.
async fn run_factory(
    mut runtime: ChildRuntime,
    ready_receiver: watch::Receiver<ReadinessState>,
    task: JoinHandle<crate::task::factory::TaskResult>,
) -> Result<ChildRunReport, SupervisorError> {
    // `joined_cleanly` records whether the task was joined without a panic or abort;
    // only that path may promote the status to `Ready`.
    let (exit, joined_cleanly) = match task.await {
        Ok(task_result) => (TaskExit::from_task_result(task_result), true),
        Err(join_error) if join_error.is_panic() => {
            (TaskExit::Panicked(String::from("task panicked")), false)
        }
        Err(_join_error) => (TaskExit::Cancelled, false),
    };

    // Readiness is read after the task has ended, so it reflects the last state
    // the channel held for this run.
    let became_ready = observe_ready(ready_receiver);
    if joined_cleanly && became_ready {
        runtime.status = ChildRuntimeStatus::Ready;
    }
    runtime.last_exit = Some(exit.clone());

    Ok(ChildRunReport {
        runtime,
        exit,
        became_ready,
    })
}
/// Returns `true` when the value currently held by the readiness channel is
/// `ReadinessState::Ready`. Consumes the receiver.
fn observe_ready(ready_receiver: watch::Receiver<ReadinessState>) -> bool {
    let current_state = ready_receiver.borrow();
    matches!(*current_state, ReadinessState::Ready)
}
/// Waits until the completion channel holds a report and returns a clone of it.
///
/// If a report is already present it is returned without waiting.
///
/// # Errors
///
/// Returns the error stored in the published report, or
/// `SupervisorError::InvalidTransition` when waiting for a change fails because
/// the channel was closed before any report was published.
pub(crate) async fn wait_for_report(
    completion_receiver: &mut Receiver<Option<Result<ChildRunReport, SupervisorError>>>,
) -> Result<ChildRunReport, SupervisorError> {
    loop {
        // Take a snapshot so the borrow guard is released before awaiting.
        let published = completion_receiver.borrow().clone();
        match published {
            Some(outcome) => return outcome,
            None => {
                completion_receiver.changed().await.map_err(|_closed| {
                    SupervisorError::InvalidTransition {
                        message: "child run report channel closed before completion".to_owned(),
                    }
                })?;
            }
        }
    }
}