use crate::types::{ChildExitReason, ChildId};
use crate::worker::Worker;
use tokio::sync::mpsc;
pub(crate) struct WorkerTermination {
pub id: ChildId,
pub reason: ChildExitReason,
}
pub(crate) async fn run_worker<W: Worker, Cmd>(
supervisor_name: String,
worker_id: ChildId,
mut worker: W,
control_tx: mpsc::UnboundedSender<Cmd>,
init_tx: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
) where
Cmd: From<WorkerTermination>,
{
let qualified_name = format!("{supervisor_name}/{worker_id}");
match worker.initialize().await {
Ok(()) => {
if let Some(tx) = init_tx {
let _send = tx.send(Ok(()));
}
}
Err(err) => {
tracing::error!(
worker = %qualified_name,
error = %err,
"worker initialization failed"
);
if let Some(tx) = init_tx {
let _send = tx.send(Err(err.to_string()));
}
let _send = control_tx.send(
WorkerTermination {
id: worker_id,
reason: ChildExitReason::Abnormal,
}
.into(),
);
return;
}
}
tracing::debug!(worker = %qualified_name, "worker started");
let exit_reason = match worker.run().await {
Ok(()) => {
tracing::debug!(worker = %qualified_name, "worker completed normally");
ChildExitReason::Normal
}
Err(err) => {
tracing::warn!(
worker = %qualified_name,
error = %err,
"worker failed"
);
ChildExitReason::Abnormal
}
};
if let Err(err) = worker.shutdown().await {
tracing::error!(
worker = %qualified_name,
error = %err,
"worker shutdown failed"
);
}
tracing::debug!(worker = %qualified_name, "worker stopped");
let _send = control_tx.send(
WorkerTermination {
id: worker_id,
reason: exit_reason,
}
.into(),
);
}