use std::time::Duration;
use tokio::sync::SetOnce;
use tokio::sync::watch;
use tokio::time::timeout;
use tracing::warn;
use crate::InitError;
use crate::Module;
use crate::PostInitError;
use crate::PreInitError;
/// Coordinates graceful shutdown.
///
/// Shutdown moves through three observable states: not started, started
/// (waiting for blockers or the grace period), and done.
pub struct Shutdown {
    /// Empty until shutdown has been started; afterwards holds the
    /// completion state of the running shutdown.
    ongoing: SetOnce<Ongoing>,
    /// Sender side of a watch channel. Every receiver handed out by
    /// [`Shutdown::block`] counts as one active blocker.
    blockers: watch::Sender<()>,
    /// Configuration supplied at module setup.
    setup: ShutdownSetup,
}
/// Configuration for the [`Shutdown`] module.
#[derive(Debug)]
pub struct ShutdownSetup {
    /// Maximum time to wait for outstanding blockers once shutdown has
    /// started before shutdown is marked as done regardless.
    pub grace_period: Duration,
}

impl Default for ShutdownSetup {
    /// Uses a grace period of one second.
    fn default() -> Self {
        const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(1);

        Self {
            grace_period: DEFAULT_GRACE_PERIOD,
        }
    }
}
impl Shutdown {
    /// Registers a shutdown blocker and returns its guard.
    ///
    /// The guard is a receiver of the internal watch channel. Once shutdown
    /// has started, it is only marked as done after every guard has been
    /// dropped or the configured grace period has elapsed, whichever comes
    /// first.
    pub fn block(&self) -> impl Drop + Send + Sync + 'static {
        self.blockers.subscribe()
    }

    /// Returns `true` once shutdown has been started (or forced to done).
    pub fn has_started(&self) -> bool {
        self.ongoing.initialized()
    }

    /// Returns `true` once shutdown has been started and marked as done.
    pub fn is_done(&self) -> bool {
        self.ongoing.get().is_some_and(Ongoing::is_done)
    }

    /// Waits until shutdown has been started.
    pub async fn wait_for_started(&self) {
        self.ongoing.wait().await;
    }

    /// Waits until shutdown has been started and then marked as done.
    pub async fn wait_for_done(&self) {
        let ongoing = self.ongoing.wait().await;
        ongoing.wait_done().await;
    }

    /// Starts shutdown.
    ///
    /// Calling this more than once, or after [`Shutdown::force_done`], has no
    /// further effect.
    pub fn start(&self) {
        // An error only means shutdown was already started; nothing to do.
        let _ = self.ongoing.set(Ongoing(SetOnce::new()));
    }

    /// Marks shutdown as done immediately, without waiting for blockers or
    /// the grace period.
    ///
    /// If shutdown has not been started yet, it is started in the
    /// already-done state. If it is already in progress, the running
    /// shutdown is marked as done so that waiters are released right away.
    pub fn force_done(&self) {
        let already_done = Ongoing(SetOnce::new_with(Some(())));
        if self.ongoing.set(already_done).is_err() {
            // `set` fails only when a value is already stored, i.e. shutdown
            // was started earlier. Previously this case was silently ignored,
            // leaving the forced shutdown waiting for blockers or the timeout.
            if let Some(ongoing) = self.ongoing.get() {
                ongoing.set_done();
            }
        }
    }
}
/// Completion state of a shutdown that has been started.
///
/// The wrapped cell is empty while shutdown is in progress and holds `()`
/// once shutdown has been marked as done.
struct Ongoing(SetOnce<()>);

impl Ongoing {
    /// Returns `true` if shutdown has been marked as done.
    fn is_done(&self) -> bool {
        let Self(done) = self;
        done.initialized()
    }

    /// Waits until shutdown has been marked as done.
    async fn wait_done(&self) {
        let Self(done) = self;
        done.wait().await;
    }

    /// Marks shutdown as done; a repeated call is a no-op.
    fn set_done(&self) {
        let Self(done) = self;
        // An error here only means the flag was already set.
        let _ignored = done.set(());
    }
}
impl Module for Shutdown {
type Setup = ShutdownSetup;
type PreInit = ShutdownPreInit;
async fn pre_init(setup: Self::Setup) -> Result<Self::PreInit, PreInitError> {
Ok(ShutdownPreInit { setup })
}
type Dependencies = ();
async fn init(
pre_init: Self::PreInit,
_dependencies: &mut Self::Dependencies,
) -> Result<Self, InitError> {
Ok(Shutdown {
ongoing: SetOnce::new(),
blockers: Default::default(),
setup: pre_init.setup,
})
}
async fn post_init(&'static self) -> Result<(), PostInitError> {
tokio::spawn(async move {
let ongoing = self.ongoing.wait().await;
if let Err(_elapsed) = timeout(self.setup.grace_period, self.blockers.closed()).await {
warn!(grace_period = ?self.setup.grace_period, "Graceful shutdown reached timeout. Forcing shutdown...");
}
ongoing.set_done();
});
Ok(())
}
}
/// Intermediate state of the [`Shutdown`] module between `pre_init` and
/// `init`.
pub struct ShutdownPreInit {
    /// Setup passed through unchanged from `pre_init` to `init`.
    setup: ShutdownSetup,
}