use std::sync::OnceLock;
use tokio_util::sync::CancellationToken;
/// Process-wide shutdown token shared by every function in this module.
///
/// Starts out unset and is created lazily through `get_or_init`; readers that
/// use `get` observe `None` until that has happened.
static TOKEN: OnceLock<CancellationToken> = OnceLock::new();
/// Returns a handle to the process-wide shutdown token.
///
/// The underlying token is created on the first call. Every handle returned
/// here is a clone of that single token, so a cancellation issued through any
/// of them is visible through all of them.
pub fn token() -> CancellationToken {
    let shared = TOKEN.get_or_init(CancellationToken::new);
    CancellationToken::clone(shared)
}
pub fn is_shutdown() -> bool {
TOKEN.get().is_some_and(CancellationToken::is_cancelled)
}
pub fn trigger() {
if let Some(t) = TOKEN.get() {
t.cancel();
}
}
/// Spawns a background task that turns an OS shutdown signal into a
/// cancellation of the process-wide token, and returns a handle to that token.
///
/// The task performs these steps in order:
/// 1. waits for a shutdown signal (see `wait_for_signal`);
/// 2. sleeps for the pre-stop delay, if it is non-zero;
/// 3. when a metrics feature is enabled and the runtime context reports
///    Kubernetes, increments `pod_eviction_received_total`;
/// 4. cancels the token.
///
/// # Panics
///
/// Relies on `tokio::spawn`, which panics when called outside a Tokio runtime.
#[must_use]
pub fn install_signal_handler() -> CancellationToken {
    let shared = token();
    let for_task = shared.clone();

    let watcher = async move {
        wait_for_signal().await;

        // Hold off cancellation for the configured grace period; per the log
        // message below this is meant to cover K8s endpoint removal.
        let grace = std::time::Duration::from_secs(prestop_delay_secs());
        if !grace.is_zero() {
            #[cfg(feature = "logger")]
            tracing::info!(
                delay_secs = grace.as_secs(),
                "Pre-stop delay: waiting for K8s endpoint removal"
            );
            tokio::time::sleep(grace).await;
        }

        #[cfg(any(feature = "metrics", feature = "otel-metrics"))]
        if crate::env::runtime_context().is_kubernetes() {
            metrics::counter!("pod_eviction_received_total").increment(1);
        }

        for_task.cancel();
        #[cfg(feature = "logger")]
        tracing::info!("Shutdown signal received, cancelling all tasks");
    };

    // The join handle is intentionally not kept: the task runs detached.
    tokio::spawn(watcher);
    shared
}
fn prestop_delay_secs() -> u64 {
if let Ok(val) = std::env::var("PRESTOP_DELAY_SECS")
&& let Ok(secs) = val.parse::<u64>()
{
return secs;
}
if crate::env::runtime_context().is_kubernetes() {
5
} else {
0
}
}
async fn wait_for_signal() {
let ctrl_c = async {
if let Err(e) = tokio::signal::ctrl_c().await {
tracing::error!(error = %e, "Failed to install Ctrl+C handler");
std::future::pending::<()>().await;
}
};
#[cfg(unix)]
let terminate = async {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut sig) => {
sig.recv().await;
}
Err(e) => {
tracing::error!(
error = %e,
"Failed to install SIGTERM handler, only Ctrl+C will trigger shutdown"
);
std::future::pending::<()>().await;
}
}
};
#[cfg(unix)]
tokio::select! {
() = ctrl_c => {},
() = terminate => {},
}
#[cfg(not(unix))]
ctrl_c.await;
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): every test below builds its own `CancellationToken`
    // instead of going through the global `TOKEN` — presumably to keep tests
    // independent of shared process state; confirm.

    // A freshly constructed token has not been cancelled.
    #[test]
    fn token_is_not_cancelled_initially() {
        let fresh = CancellationToken::new();
        assert!(!fresh.is_cancelled());
    }

    // `cancel` flips the token from "live" to "cancelled".
    #[test]
    fn trigger_cancels_token() {
        let subject = CancellationToken::new();
        assert!(!subject.is_cancelled());
        subject.cancel();
        assert!(subject.is_cancelled());
    }

    // Clones observe a cancellation issued through the token they came from.
    #[test]
    fn token_is_cloneable_and_shared() {
        let source = CancellationToken::new();
        let handles = [source.clone(), source.clone()];
        assert!(!handles[0].is_cancelled());
        assert!(!handles[1].is_cancelled());
        source.cancel();
        assert!(handles[0].is_cancelled());
        assert!(handles[1].is_cancelled());
    }

    // Cancelling twice leaves the token cancelled.
    #[test]
    fn multiple_triggers_are_idempotent() {
        let subject = CancellationToken::new();
        for _ in 0..2 {
            subject.cancel();
        }
        assert!(subject.is_cancelled());
    }

    // Awaiting `cancelled()` returns once another task cancels a clone.
    #[tokio::test]
    async fn cancelled_future_resolves_after_cancel() {
        let waiter = CancellationToken::new();
        let canceller = waiter.clone();
        let pause = std::time::Duration::from_millis(10);
        tokio::spawn(async move {
            tokio::time::sleep(pause).await;
            canceller.cancel();
        });
        waiter.cancelled().await;
        assert!(waiter.is_cancelled());
    }

    // Cancelling a parent token also cancels its child tokens.
    #[tokio::test]
    async fn child_token_cancelled_by_parent() {
        let root = CancellationToken::new();
        let leaf = root.child_token();
        assert!(!leaf.is_cancelled());
        root.cancel();
        assert!(leaf.is_cancelled());
    }
}