use super::error::SupervisorError;
use super::runtime::{SupervisorCommand, SupervisorRuntime};
use super::spec::SupervisorSpec;
use crate::restart::RestartPolicy;
use crate::types::{ChildId, ChildInfo};
use crate::worker::{Worker, WorkerSpec};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
/// Handle used to send commands to a running supervisor.
///
/// The handle owns only a shared name and the sending half of the
/// supervisor's command channel, so it is cheap to clone and every clone
/// talks to the same supervisor.
pub struct SupervisorHandle<W: Worker> {
    /// Supervisor name, shared between all clones of the handle.
    pub(crate) name: Arc<String>,
    /// Sending half of the unbounded command channel read by the runtime.
    pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
}

// `Clone` is written by hand on purpose: `#[derive(Clone)]` would add an
// implicit `W: Clone` bound, making the handle non-cloneable for worker types
// that are not `Clone`, even though cloning only touches the `Arc` and the
// channel sender.
impl<W: Worker> Clone for SupervisorHandle<W> {
    fn clone(&self) -> Self {
        Self {
            name: Arc::clone(&self.name),
            control_tx: self.control_tx.clone(),
        }
    }
}
impl<W: Worker> SupervisorHandle<W> {
/// Starts a supervisor described by `spec` and returns a handle to it.
///
/// An unbounded command channel is created, a [`SupervisorRuntime`] is built
/// around its receiving half (together with a clone of the sender), and the
/// runtime is driven on a newly spawned Tokio task. Once `run` returns, the
/// task emits a debug event carrying the supervisor name.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime context.
#[must_use]
pub fn start(spec: SupervisorSpec<W>) -> Self {
    // Capture the name first: `spec` is moved into the runtime below.
    let name = Arc::new(spec.name.clone());

    let (control_tx, control_rx) = mpsc::unbounded_channel();
    let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());

    // The background task needs its own reference for the final log line.
    let task_name = Arc::clone(&name);
    tokio::spawn(async move {
        runtime.run().await;
        tracing::debug!(name = %*task_name, "supervisor stopped");
    });

    Self { name, control_tx }
}
/// Asks the supervisor to start a new child.
///
/// A [`WorkerSpec`] is built from `id`, `factory` and `restart_policy` and
/// sent to the runtime in a `StartChild` command; the call then waits for
/// the runtime's reply.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// closed or the reply channel is dropped without an answer. Any error
/// contained in the runtime's reply is returned unchanged.
pub async fn start_child(
    &self,
    id: impl Into<String>,
    factory: impl Fn() -> W + Send + Sync + 'static,
    restart_policy: RestartPolicy,
) -> Result<ChildId, SupervisorError> {
    let shutting_down = || SupervisorError::ShuttingDown(self.name().to_owned());

    let (respond_to, reply_rx) = oneshot::channel();
    let command = SupervisorCommand::StartChild {
        spec: WorkerSpec::new(id, factory, restart_policy),
        respond_to,
    };

    // A failed send means the runtime has dropped its receiver.
    if self.control_tx.send(command).is_err() {
        return Err(shutting_down());
    }

    match reply_rx.await {
        Ok(outcome) => outcome,
        // The runtime dropped the reply sender without answering.
        Err(_) => Err(shutting_down()),
    }
}
/// Asks the supervisor to start a new child via the `StartChildLinked`
/// command.
///
/// Works like `start_child`, but additionally forwards `timeout` to the
/// runtime unchanged.
/// NOTE(review): `timeout` presumably bounds how long the runtime waits for
/// the child to come up — confirm against the runtime implementation.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// closed or the reply channel is dropped without an answer. Any error
/// contained in the runtime's reply is returned unchanged.
pub async fn start_child_linked(
    &self,
    id: impl Into<String>,
    factory: impl Fn() -> W + Send + Sync + 'static,
    restart_policy: RestartPolicy,
    timeout: std::time::Duration,
) -> Result<ChildId, SupervisorError> {
    let shutting_down = || SupervisorError::ShuttingDown(self.name().to_owned());

    let (respond_to, reply_rx) = oneshot::channel();
    let command = SupervisorCommand::StartChildLinked {
        spec: WorkerSpec::new(id, factory, restart_policy),
        timeout,
        respond_to,
    };

    // A failed send means the runtime has dropped its receiver.
    if self.control_tx.send(command).is_err() {
        return Err(shutting_down());
    }

    // A dropped reply sender is reported the same way as a closed channel.
    reply_rx.await.unwrap_or_else(|_| Err(shutting_down()))
}
/// Asks the supervisor to terminate the child identified by `id`.
///
/// An owned copy of `id` is sent in a `TerminateChild` command and the call
/// waits for the runtime's reply.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// closed or the reply channel is dropped without an answer. Any error
/// contained in the runtime's reply is returned unchanged.
pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
    let shutting_down = || SupervisorError::ShuttingDown(self.name().to_owned());

    let (respond_to, reply_rx) = oneshot::channel();
    let command = SupervisorCommand::TerminateChild {
        id: id.to_owned(),
        respond_to,
    };

    // A failed send means the runtime has dropped its receiver.
    if self.control_tx.send(command).is_err() {
        return Err(shutting_down());
    }

    match reply_rx.await {
        Ok(outcome) => outcome,
        // The runtime dropped the reply sender without answering.
        Err(_) => Err(shutting_down()),
    }
}
/// Requests the supervisor's list of children.
///
/// Sends a `WhichChildren` command and waits for the runtime's reply.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// closed or the reply channel is dropped without an answer. Any error
/// contained in the runtime's reply is returned unchanged.
pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
    let shutting_down = || SupervisorError::ShuttingDown(self.name().to_owned());

    let (respond_to, reply_rx) = oneshot::channel();
    let command = SupervisorCommand::WhichChildren { respond_to };

    // A failed send means the runtime has dropped its receiver.
    if self.control_tx.send(command).is_err() {
        return Err(shutting_down());
    }

    // A dropped reply sender is reported the same way as a closed channel.
    reply_rx.await.unwrap_or_else(|_| Err(shutting_down()))
}
/// Sends a `Shutdown` command to the supervisor.
///
/// The call returns as soon as the command has been queued; it does not
/// wait for any acknowledgement from the runtime.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// already closed.
// Nothing is awaited here, so the lint is silenced deliberately.
// NOTE(review): presumably kept `async` for parity with the other handle
// methods — confirm before changing the signature.
#[allow(clippy::unused_async)]
pub async fn shutdown(&self) -> Result<(), SupervisorError> {
    match self.control_tx.send(SupervisorCommand::Shutdown) {
        Ok(()) => Ok(()),
        Err(_) => Err(SupervisorError::ShuttingDown(self.name().to_owned())),
    }
}
/// Returns the supervisor's name as stored in this handle.
#[must_use]
pub fn name(&self) -> &str {
    // Deref coercion: `&Arc<String>` -> `&String` -> `&str`.
    &self.name
}
/// Queries the supervisor for its restart strategy.
///
/// Sends a `GetRestartStrategy` command and waits for the value the runtime
/// replies with.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// closed or the reply channel is dropped without an answer.
pub async fn restart_strategy(
    &self,
) -> Result<crate::restart::RestartStrategy, SupervisorError> {
    let shutting_down = || SupervisorError::ShuttingDown(self.name().to_owned());

    let (respond_to, reply_rx) = oneshot::channel();
    let command = SupervisorCommand::GetRestartStrategy { respond_to };

    // A failed send means the runtime has dropped its receiver.
    if self.control_tx.send(command).is_err() {
        return Err(shutting_down());
    }

    match reply_rx.await {
        Ok(strategy) => Ok(strategy),
        // The runtime dropped the reply sender without answering.
        Err(_) => Err(shutting_down()),
    }
}
/// Queries the supervisor for its uptime.
///
/// Sends a `GetUptime` command and waits for the value the runtime replies
/// with.
/// NOTE(review): the unit of the returned value is defined by the runtime
/// (presumably seconds) — confirm against the runtime implementation.
///
/// # Errors
///
/// Returns [`SupervisorError::ShuttingDown`] when the command channel is
/// closed or the reply channel is dropped without an answer.
pub async fn uptime(&self) -> Result<u64, SupervisorError> {
    let shutting_down = || SupervisorError::ShuttingDown(self.name().to_owned());

    let (respond_to, reply_rx) = oneshot::channel();
    let command = SupervisorCommand::GetUptime { respond_to };

    // A failed send means the runtime has dropped its receiver.
    if self.control_tx.send(command).is_err() {
        return Err(shutting_down());
    }

    match reply_rx.await {
        Ok(elapsed) => Ok(elapsed),
        // The runtime dropped the reply sender without answering.
        Err(_) => Err(shutting_down()),
    }
}
}