use crate::child_runner::attempt::TaskExit;
use crate::error::types::SupervisorError;
use crate::readiness::signal::{ReadinessPolicy, ReadySignal};
use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
use crate::task::context::TaskContext;
use tokio::sync::watch;
/// Result of one `ChildRunner::run_once` call.
///
/// Bundles the child's runtime record as it looked when the run finished with
/// the exit value of the task and whether readiness was observed.
#[derive(Debug, Clone)]
pub struct ChildRunReport {
/// Runtime record handed to `run_once`, returned with its `status` and
/// `last_exit` fields updated by the run.
pub runtime: ChildRuntime,
/// How the task ended; the same value is also stored in `runtime.last_exit`.
pub exit: TaskExit,
/// `true` when the readiness channel held `true` after the task finished.
pub became_ready: bool,
}
/// Field-less runner type whose methods execute a child's task.
///
/// It carries no data, so `ChildRunner::new()` and `ChildRunner::default()`
/// produce the same value.
#[derive(Debug, Clone, Default)]
pub struct ChildRunner;
impl ChildRunner {
    /// Creates a runner. The type has no fields, so this is equivalent to `Default`.
    pub fn new() -> Self {
        Self
    }

    /// Runs the child's task one time and reports how it went.
    ///
    /// Steps, in order:
    /// 1. Clone the task factory out of `runtime.spec`; fail if there is none.
    /// 2. Set the status to `Starting`, then build a ready signal and a task
    ///    context from the runtime's id, path, generation and attempt.
    /// 3. If the readiness policy is immediate, mark the context ready right away.
    /// 4. Set the status to `Running` and await the task until it exits.
    /// 5. Read the readiness channel; if it holds `true`, set the status to `Ready`.
    /// 6. Store a copy of the exit in `runtime.last_exit` and return the report.
    ///
    /// # Errors
    ///
    /// Returns the error built by `SupervisorError::fatal_config` when
    /// `runtime.spec.factory` is `None`. In that case nothing else is touched.
    pub async fn run_once(
        &self,
        mut runtime: ChildRuntime,
    ) -> Result<ChildRunReport, SupervisorError> {
        let Some(task_factory) = runtime.spec.factory.clone() else {
            return Err(SupervisorError::fatal_config(
                "worker child requires a task factory",
            ));
        };

        runtime.status = ChildRuntimeStatus::Starting;

        let (signal, readiness) = ReadySignal::new();
        // The second element stays bound to a named variable on purpose: binding
        // it to `_` would drop it immediately instead of at the end of this function.
        let (task_ctx, _heartbeats) = TaskContext::with_ready_signal(
            runtime.id.clone(),
            runtime.path.clone(),
            runtime.generation,
            runtime.attempt,
            signal,
        );

        mark_immediate_ready(runtime.spec.readiness_policy, &task_ctx, &mut runtime);
        runtime.status = ChildRuntimeStatus::Running;

        // The context is moved into the task; readiness is read back through the
        // receiver half once the task has finished.
        let exit = run_factory(task_factory, task_ctx).await;

        let became_ready = observe_ready(readiness);
        if became_ready {
            runtime.status = ChildRuntimeStatus::Ready;
        }

        runtime.last_exit = Some(exit.clone());
        let report = ChildRunReport {
            runtime,
            exit,
            became_ready,
        };
        Ok(report)
    }
}
/// Marks the child ready up front when `policy` is an immediate-readiness policy.
///
/// When `policy.is_immediate()` is true, calls `ctx.mark_ready()` and sets
/// `runtime.status` to `Ready`. Otherwise neither argument is touched.
fn mark_immediate_ready(policy: ReadinessPolicy, ctx: &TaskContext, runtime: &mut ChildRuntime) {
    if !policy.is_immediate() {
        return;
    }

    ctx.mark_ready();
    runtime.status = ChildRuntimeStatus::Ready;
}
async fn run_factory(
factory: std::sync::Arc<dyn crate::task::factory::TaskFactory>,
ctx: TaskContext,
) -> TaskExit {
let task = tokio::spawn(factory.build(ctx));
match task.await {
Ok(result) => TaskExit::from_task_result(result),
Err(error) if error.is_panic() => TaskExit::Panicked(String::from("task panicked")),
Err(_error) => TaskExit::Cancelled,
}
}
/// Returns the value currently held by the readiness watch channel.
///
/// The receiver is consumed; only the latest value is read, no waiting is done.
fn observe_ready(ready_receiver: watch::Receiver<bool>) -> bool {
    let latest = ready_receiver.borrow();
    *latest
}