use tokio::signal;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Signal {
Int,
Term,
Quit,
Hup,
User1,
User2,
}
pub async fn try_wait() -> std::io::Result<()> {
#[cfg(feature = "tracing")]
tracing::debug!("waiting for shutdown signal (Ctrl+C or SIGTERM)");
let ctrl_c = signal::ctrl_c();
#[cfg(unix)]
let mut terminate = signal::unix::signal(signal::unix::SignalKind::terminate())?;
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
res = ctrl_c => {
#[cfg(feature = "tracing")]
tracing::info!("received Ctrl+C signal");
res
},
_ = async {
#[cfg(unix)]
{
terminate.recv().await;
}
#[cfg(not(unix))]
{
let () = terminate.await;
}
} => {
#[cfg(feature = "tracing")]
tracing::info!("received SIGTERM signal");
Ok(())
},
}
}
pub async fn wait() {
try_wait().await.expect("failed to install signal handler");
}
#[cfg(all(unix, feature = "rt", feature = "sync"))]
pub async fn wait_for(signals: &[Signal]) -> std::io::Result<Signal> {
use signal::unix::{SignalKind, signal};
use tokio::sync::mpsc;
#[cfg(feature = "tracing")]
tracing::debug!(?signals, "waiting for specified signals");
let (tx, mut rx) = mpsc::channel(1);
let mut listeners = Vec::with_capacity(signals.len());
for &sig in signals {
let kind = match sig {
Signal::Int => SignalKind::interrupt(),
Signal::Term => SignalKind::terminate(),
Signal::Quit => SignalKind::quit(),
Signal::Hup => SignalKind::hangup(),
Signal::User1 => SignalKind::user_defined1(),
Signal::User2 => SignalKind::user_defined2(),
};
let stream = signal(kind)?;
listeners.push((sig, stream));
}
for (sig, mut stream) in listeners {
let tx = tx.clone();
tokio::spawn(async move {
stream.recv().await;
let _ = tx.send(sig).await;
});
}
drop(tx);
let received = rx
.recv()
.await
.expect("signal listeners task failed unexpectedly");
#[cfg(feature = "tracing")]
tracing::info!(?received, "received signal");
Ok(received)
}
#[cfg(feature = "cancel")]
pub async fn shutdown_signal(token: crate::CancellationToken) {
#[cfg(feature = "tracing")]
tracing::debug!("waiting for shutdown signal or cancellation token");
tokio::select! {
_ = wait() => {
#[cfg(feature = "tracing")]
tracing::info!("shutdown triggered by system signal");
},
_ = token.cancelled() => {
#[cfg(feature = "tracing")]
tracing::info!("shutdown triggered by cancellation token");
},
}
}
pub async fn wait_timeout(
duration: std::time::Duration,
) -> Result<(), tokio::time::error::Elapsed> {
#[cfg(feature = "tracing")]
tracing::debug!(?duration, "waiting for shutdown signal with timeout");
let result = tokio::time::timeout(duration, wait()).await;
#[cfg(feature = "tracing")]
if result.is_err() {
tracing::debug!("shutdown wait timed out");
}
result
}