use std::time::Instant;
use async_trait::async_trait;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use super::context::BootstrapCtx;
use super::errors::{BootstrapError, ShutdownError};
use super::health::SubsystemHealth;
pub struct SubsystemHandle {
task: Option<JoinHandle<()>>,
pub shutdown_tx: watch::Sender<bool>,
pub name: &'static str,
}
impl SubsystemHandle {
pub fn new(name: &'static str, task: JoinHandle<()>, shutdown_tx: watch::Sender<bool>) -> Self {
Self {
task: Some(task),
shutdown_tx,
name,
}
}
pub async fn shutdown_and_wait(mut self, deadline: Instant) -> Result<(), ShutdownError> {
let _ = self.shutdown_tx.send(true);
let task = match self.task.take() {
Some(t) => t,
None => return Ok(()),
};
let timeout = deadline.saturating_duration_since(Instant::now());
match tokio::time::timeout(timeout, task).await {
Ok(Ok(())) => Ok(()),
Ok(Err(join_err)) if join_err.is_panic() => {
Err(ShutdownError::Panicked { name: self.name })
}
Ok(Err(_)) => {
Ok(())
}
Err(_elapsed) => Err(ShutdownError::DeadlineExceeded { name: self.name }),
}
}
}
impl Drop for SubsystemHandle {
fn drop(&mut self) {
let _ = self.shutdown_tx.send(true);
}
}
#[async_trait]
pub trait ClusterSubsystem: Send + Sync {
fn name(&self) -> &'static str;
fn dependencies(&self) -> &'static [&'static str];
async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError>;
async fn shutdown(&self, deadline: Instant) -> Result<(), ShutdownError>;
fn health(&self) -> SubsystemHealth;
}