use std::{sync::Arc, time::Duration};
use tokio::sync::broadcast;
use crate::error::RuntimeError;
use crate::events::EventKind;
use crate::tasks::TaskSpec;
use super::supervisor::Supervisor;
#[derive(Clone)]
pub struct SupervisorHandle {
inner: Arc<Supervisor>,
}
impl std::fmt::Debug for SupervisorHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SupervisorHandle")
.field("supervisor", &self.inner)
.finish()
}
}
impl SupervisorHandle {
pub(crate) fn new(supervisor: Arc<Supervisor>) -> Self {
Self { inner: supervisor }
}
pub fn add(&self, spec: TaskSpec) -> Result<(), RuntimeError> {
self.inner.add_task(spec)
}
pub async fn add_and_wait(
&self,
spec: TaskSpec,
timeout: Duration,
) -> Result<(), RuntimeError> {
let target: Arc<str> = Arc::from(spec.task().name());
let mut rx = self.inner.subscribe_bus();
self.inner.add_task(spec)?;
let target2 = Arc::clone(&target);
let wait = async move {
loop {
match rx.recv().await {
Ok(ev)
if ev.kind == EventKind::TaskAdded
&& ev.task.as_deref() == Some(&*target2) =>
{
return Ok(());
}
Ok(_) => continue,
Err(broadcast::error::RecvError::Lagged(_)) => {
if self.inner.registry_contains(&target2).await {
return Ok(());
}
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
}
}
if self.inner.registry_contains(&target2).await {
Ok(())
} else {
Err(RuntimeError::TaskAddTimeout {
name: target2,
timeout,
})
}
};
match tokio::time::timeout(timeout, wait).await {
Ok(result) => result,
Err(_) => Err(RuntimeError::TaskAddTimeout {
name: target,
timeout,
}),
}
}
pub fn remove(&self, name: &str) -> Result<(), RuntimeError> {
self.inner.remove_task(name)
}
pub async fn list(&self) -> Vec<Arc<str>> {
self.inner.list_tasks().await
}
pub async fn snapshot(&self) -> Vec<Arc<str>> {
self.inner.snapshot().await
}
pub async fn is_alive(&self, name: &str) -> bool {
self.inner.is_alive(name).await
}
pub async fn cancel(&self, name: &str) -> Result<bool, RuntimeError> {
self.inner.cancel(name).await
}
pub async fn cancel_with_timeout(
&self,
name: &str,
wait_for: Duration,
) -> Result<bool, RuntimeError> {
self.inner.cancel_with_timeout(name, wait_for).await
}
pub async fn shutdown(self) -> Result<(), RuntimeError> {
self.inner.shutdown().await
}
#[cfg(feature = "controller")]
pub async fn submit(
&self,
spec: crate::controller::ControllerSpec,
) -> Result<(), crate::controller::ControllerError> {
self.inner.submit(spec).await
}
#[cfg(feature = "controller")]
pub fn try_submit(
&self,
spec: crate::controller::ControllerSpec,
) -> Result<(), crate::controller::ControllerError> {
self.inner.try_submit(spec)
}
}