use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
/// Receiving side of the shutdown flag.
///
/// Wraps a `tokio::sync::watch` receiver carrying a single `bool`: the
/// channel is created with `false` and [`ShutdownController::shutdown`]
/// publishes `true`. The type is `Clone`, so each task can hold its own copy.
#[derive(Clone)]
pub struct ShutdownSignal {
// Watch receiver for the shutdown flag (`true` means shutdown was requested).
rx: watch::Receiver<bool>,
}
impl ShutdownSignal {
    /// Returns the current value of the shutdown flag without waiting.
    pub fn is_shutdown(&self) -> bool {
        let requested: bool = *self.rx.borrow();
        requested
    }

    /// Suspends until the shutdown flag is observed as `true`.
    ///
    /// Also returns if waiting for a change reports an error; per tokio's
    /// `watch` documentation that happens once the sender has been dropped.
    /// In that case the flag may still be `false` when this returns.
    pub async fn wait(&mut self) {
        loop {
            // Check first so an already-published shutdown returns immediately.
            if self.is_shutdown() {
                break;
            }
            // No read guard is alive here, so nothing is held across the await.
            let change = self.rx.changed().await;
            if change.is_err() {
                break;
            }
        }
    }
}
/// Sending side of the shutdown flag.
///
/// Owns the `tokio::sync::watch` sender; calling `shutdown` publishes `true`
/// and `signal` hands out additional [`ShutdownSignal`] receivers.
pub struct ShutdownController {
// Watch sender for the shutdown flag; the channel starts out as `false`.
tx: watch::Sender<bool>,
}
impl ShutdownController {
    /// Creates a controller together with its first [`ShutdownSignal`].
    ///
    /// The underlying watch channel starts with the flag set to `false`.
    pub fn new() -> (Self, ShutdownSignal) {
        let (tx, rx) = watch::channel(false);
        (Self { tx }, ShutdownSignal { rx })
    }

    /// Sets the shutdown flag to `true` and wakes every waiting signal.
    ///
    /// Calling this more than once is harmless; the flag simply stays `true`.
    pub fn shutdown(&self) {
        // `Sender::send` returns an error and leaves the stored value
        // untouched when no receiver is alive (e.g. a controller built via
        // `Default`, which drops its initial signal). A signal obtained later
        // through `signal()` would then still read `false`. `send_replace`
        // always stores the value, so late subscribers observe the shutdown.
        let _previous = self.tx.send_replace(true);
        tracing::info!("shutdown signaled");
    }

    /// Returns a new [`ShutdownSignal`] subscribed to this controller.
    ///
    /// The new signal reads the flag's current value, so a signal created
    /// after `shutdown` reports `true` immediately.
    pub fn signal(&self) -> ShutdownSignal {
        ShutdownSignal {
            rx: self.tx.subscribe(),
        }
    }
}
impl Default for ShutdownController {
    /// Builds a controller via [`ShutdownController::new`] and discards the
    /// signal that comes with it; receivers can be obtained later through
    /// [`ShutdownController::signal`].
    fn default() -> Self {
        let (controller, _initial_signal) = Self::new();
        controller
    }
}
pub fn install_signal_handler() -> (Arc<ShutdownController>, ShutdownSignal) {
let (controller, signal) = ShutdownController::new();
let controller = Arc::new(controller);
let ctrl = controller.clone();
tokio::spawn(async move {
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
tokio::select! {
_ = ctrl_c => {
tracing::info!("received SIGINT");
}
_ = sigterm.recv() => {
tracing::info!("received SIGTERM");
}
}
}
#[cfg(not(unix))]
{
let _ = ctrl_c.await;
tracing::info!("received SIGINT");
}
ctrl.shutdown();
});
(controller, signal)
}
/// Waits for shutdown, then runs a drain routine under a time limit.
///
/// Steps:
/// 1. Await `signal.wait()`.
/// 2. Call `drain_fn` and await the future it returns, bounded by `timeout`.
/// 3. Log whether draining finished in time or the limit was exceeded.
///
/// The function returns in both cases; it does not terminate the process
/// itself, so any forced exit is the caller's responsibility. The logged
/// timeout is in whole seconds (`Duration::as_secs`).
pub async fn shutdown_with_timeout<F, Fut>(
    signal: &mut ShutdownSignal,
    timeout: Duration,
    drain_fn: F,
) where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    signal.wait().await;

    let limit_secs = timeout.as_secs();
    tracing::info!(
        "shutdown initiated, draining in-flight requests (timeout: {}s)...",
        limit_secs
    );

    // `drain_fn` is only invoked after the shutdown signal was observed.
    let drained_in_time = tokio::time::timeout(timeout, drain_fn()).await.is_ok();
    if drained_in_time {
        tracing::info!("graceful shutdown complete");
    } else {
        tracing::warn!("shutdown timeout exceeded, forcing exit");
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created signal reports that no shutdown was requested.
    #[test]
    fn signal_starts_false() {
        let (_ctl, sig) = ShutdownController::new();
        let requested = sig.is_shutdown();
        assert!(!requested);
    }

    /// The initial signal observes a shutdown issued by its controller.
    #[test]
    fn signal_after_shutdown() {
        let (ctl, sig) = ShutdownController::new();
        ctl.shutdown();
        let requested = sig.is_shutdown();
        assert!(requested);
    }

    /// Every signal handed out by `signal()` sees the same flag transition.
    #[test]
    fn multiple_signals() {
        let (ctl, _initial) = ShutdownController::new();
        let first = ctl.signal();
        let second = ctl.signal();

        assert!(!first.is_shutdown());
        assert!(!second.is_shutdown());

        ctl.shutdown();

        assert!(first.is_shutdown());
        assert!(second.is_shutdown());
    }

    /// A task blocked in `wait()` is released once shutdown is signaled.
    #[tokio::test]
    async fn wait_for_shutdown() {
        let (ctl, mut sig) = ShutdownController::new();
        let waiter = tokio::spawn(async move {
            sig.wait().await;
            true
        });

        // Give the spawned task a moment to start waiting.
        tokio::time::sleep(Duration::from_millis(10)).await;
        ctl.shutdown();

        assert!(waiter.await.unwrap());
    }

    /// With shutdown already signaled, a short drain finishes within the limit.
    #[tokio::test]
    async fn shutdown_with_timeout_completes() {
        let (ctl, mut sig) = ShutdownController::new();
        ctl.shutdown();

        let drain = || async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        };
        shutdown_with_timeout(&mut sig, Duration::from_secs(5), drain).await;
    }
}