1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
//! # Tokio Shutdown
//!
//! Tiny crate that allows to wait for a stop signal across multiple threads. Helpful mostly in
//! server applications that run indefinitely and need a signal for graceful shutdowns.
//!
//! # Examples
//!
//! This example installs the global shutdown handler and will print a message once a single is
//! received. For demonstration purposes it creates other tasks that wait for shutdown as well.
//!
//! ```no_run
//! use tokio_shutdown::Shutdown;
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//! let shutdown = Shutdown::new().expect("shutdown creation works on first call");
//!
//! // Pass a copy of the shutdown handler to another task.
//! // Clones of `Shutdown` are cheap.
//! let clone = shutdown.clone();
//! tokio::spawn(async move {
//! clone.handle().await;
//! println!("task 1 shutting down");
//! });
//!
//! // Alternatively, pass a new handle to the new task.
//! // Both this and the above way work, choose whatever works best for your use case.
//! let handle = shutdown.handle();
//! tokio::spawn(async move {
//! handle.await;
//! println!("task 2 shutting down");
//! });
//!
//! shutdown.handle().await;
//! println!("application shutting down");
//! }
//! ```
//!
//! Please have a look at the examples directory of this project for further usage instructions.
#![forbid(unsafe_code)]
#![deny(rust_2018_idioms, clippy::all, clippy::pedantic)]
use std::{
error::Error,
fmt::{self, Display},
future::Future,
sync::atomic::{AtomicBool, Ordering},
};
use tokio::{signal, sync::watch};
/// Error that occurs when the [`Shutdown::new`] function is called more than once in a process
/// lifetime.
#[derive(Debug, PartialEq, Eq)]
pub struct AlreadyCreatedError;
impl Error for AlreadyCreatedError {}
impl Display for AlreadyCreatedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("shutdown handler already created")
}
}
static CREATED: AtomicBool = AtomicBool::new(false);
/// The global shutdown handler for an application. It can be cloned cheaply wherever needed.
///
/// New handles can be created with the [`handle`](Self::handle) function, which creates futures
/// that will complete once a shutdown signal is received.
#[derive(Clone)]
pub struct Shutdown {
receiver: watch::Receiver<()>,
}
impl Shutdown {
/// Create a new shutdown handle. This can only be called once per application instance.
///
/// Signal handles can only be registered once for the duration of the entire process and
/// creating another shutdown handler would break the previous one without notice.
///
/// # Errors
///
/// If this function is called more than once during the lifetime of a process, an error will be
/// returned.
pub fn new() -> Result<Shutdown, AlreadyCreatedError> {
if (CREATED).swap(true, Ordering::SeqCst) {
return Err(AlreadyCreatedError);
}
let (tx, rx) = watch::channel(());
let handle = register_handlers();
tokio::spawn(async move {
handle.await;
tx.send(()).ok();
});
Ok(Self { receiver: rx })
}
/// Create a new handle that can be awaited on. The future will complete once a shutdown signal
/// is received.
pub fn handle(&self) -> impl Future<Output = ()> {
let mut rx = self.receiver.clone();
async move {
rx.changed().await.ok();
}
}
}
fn register_handlers() -> impl Future<Output = ()> {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
async {
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
#[cfg(feature = "tracing")]
tracing::info!("shutdown signal received");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn fail_create_two_instances() {
assert!(Shutdown::new().is_ok());
assert_eq!(Some(AlreadyCreatedError), Shutdown::new().err());
}
}