async_shutdown/lib.rs
//! Runtime-agnostic one-stop solution for graceful shutdown in asynchronous code.
//!
//! This crate addresses three separate but related problems regarding graceful shutdown:
//! * You have to be able to stop running futures when a shutdown signal is given.
//! * You have to be able to wait for futures to finish potential clean-up.
//! * You want to know why the shutdown was triggered (for example to set your process exit code).
//!
//! All of these problems are handled by the [`ShutdownManager`] struct.
//!
//! # Stopping running futures
//! You can get a future to wait for the shutdown signal with [`ShutdownManager::wait_shutdown_triggered()`].
//! In this case you must write your async code to react to the shutdown signal appropriately.
//!
//! Alternatively, you can wrap a future to be cancelled (by being dropped) when the shutdown is triggered with [`ShutdownManager::wrap_cancel()`].
//! This doesn't require the wrapped future to know anything about the shutdown signal,
//! but it also doesn't allow the future to run custom shutdown code.
//!
//! To trigger the shutdown signal, simply call [`ShutdownManager::trigger_shutdown(reason)`][`ShutdownManager::trigger_shutdown()`].
//! The shutdown reason can be any type, as long as it implements [`Clone`].
//! If you want to pass a non-[`Clone`] object or an object that is expensive to clone, you can wrap it in an [`Arc`].
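//!
//! A minimal sketch of `wrap_cancel`, assuming tokio as the runtime (any executor should do)
//! and `&str` as an arbitrary reason type:
//! ```rust
//! use async_shutdown::ShutdownManager;
//!
//! #[tokio::main]
//! async fn main() {
//!     let shutdown = ShutdownManager::new();
//!
//!     // This future never completes on its own, so only a shutdown can end it.
//!     let cancellable = shutdown.wrap_cancel(std::future::pending::<()>());
//!
//!     shutdown.trigger_shutdown("stop requested").unwrap();
//!
//!     // The wrapped future is dropped and the shutdown reason is returned as an error.
//!     assert_eq!(cancellable.await, Err("stop requested"));
//! }
//! ```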
//!
//! # Waiting for futures to complete
//! You may also want to wait for some futures to complete before actually shutting down instead of just dropping them.
//! This might be important to shut down cleanly and prevent data loss.
//! You can do that with [`ShutdownManager::wait_shutdown_complete()`].
//! That function returns a future that only completes when the shutdown is "completed".
//!
//! You must also prevent the shutdown from completing too early by calling [`ShutdownManager::delay_shutdown_token()`] or [`ShutdownManager::wrap_delay_shutdown()`].
//! The [`ShutdownManager::delay_shutdown_token()`] function gives you a [`DelayShutdownToken`] which prevents the shutdown from completing.
//! To allow the shutdown to finish, simply drop the token.
//! Alternatively, [`ShutdownManager::wrap_delay_shutdown()`] wraps an existing future,
//! and will prevent the shutdown from completing until the future either completes or is dropped.
//!
//! Note that you can only delay the shutdown completion if it has not completed already.
//! If the shutdown is already complete those functions will return an error.
//!
//! You can also use a token to wrap a future with [`DelayShutdownToken::wrap_future()`].
//! If you already have a token, this allows you to wrap a future without having to worry that the shutdown might already be completed.
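//!
//! The interplay between triggering and completing a shutdown can be sketched without a runtime,
//! using the synchronous status checks (`&str` is an arbitrary reason type here):
//! ```rust
//! use async_shutdown::ShutdownManager;
//!
//! fn main() {
//!     let shutdown = ShutdownManager::<&str>::new();
//!     let token = shutdown.delay_shutdown_token().unwrap();
//!
//!     shutdown.trigger_shutdown("done").unwrap();
//!     // Triggered, but the outstanding token keeps the shutdown from completing.
//!     assert!(shutdown.is_shutdown_triggered());
//!     assert!(!shutdown.is_shutdown_completed());
//!
//!     drop(token);
//!     assert!(shutdown.is_shutdown_completed());
//!
//!     // Once the shutdown has completed, it can no longer be delayed.
//!     assert!(shutdown.delay_shutdown_token().is_err());
//! }
//! ```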
//!
//! # Automatically triggering shutdowns
//! You can also trigger a shutdown automatically using a [`TriggerShutdownToken`].
//! Call [`ShutdownManager::trigger_shutdown_token()`] to obtain the token.
//! When the token is dropped, a shutdown is triggered.
//!
//! You can use [`ShutdownManager::wrap_trigger_shutdown()`] or [`TriggerShutdownToken::wrap_future()`] to wrap a future.
//! When the wrapped future completes (or when it is dropped) it will trigger a shutdown.
//! This can be used as a convenient way to trigger a shutdown when a vital task stops.
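//!
//! A small sketch of the token behaviour, with `&str` as an arbitrary reason type:
//! ```rust
//! use async_shutdown::ShutdownManager;
//!
//! fn main() {
//!     let shutdown = ShutdownManager::new();
//!     let token = shutdown.trigger_shutdown_token("vital task stopped");
//!     let clone = token.clone();
//!     assert!(!shutdown.is_shutdown_triggered());
//!
//!     // Dropping any one of the clones triggers the shutdown.
//!     drop(clone);
//!     assert_eq!(shutdown.shutdown_reason(), Some("vital task stopped"));
//!
//!     // Dropping the remaining token has no further effect.
//!     drop(token);
//!     assert_eq!(shutdown.shutdown_reason(), Some("vital task stopped"));
//! }
//! ```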
//!
//! # Futures versus Tasks
//! Be careful when using `JoinHandle`s as if they were regular futures.
//! Depending on your async runtime, dropping a `JoinHandle` doesn't normally cause the task to stop.
//! It may simply detach the join handle from the task, meaning that your task is still running.
//! If you're not careful, this could still cause data loss on shutdown.
//! As a rule of thumb, you should usually wrap futures *before* you spawn them on a new task.
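//!
//! A sketch of that rule of thumb, assuming tokio as the runtime: the future is wrapped first,
//! so the delay token lives inside the task and dropping the `JoinHandle` does not affect it.
//! ```rust
//! use async_shutdown::ShutdownManager;
//!
//! #[tokio::main]
//! async fn main() {
//!     let shutdown = ShutdownManager::<i32>::new();
//!
//!     // Wrap the future *before* spawning it.
//!     let task = shutdown.wrap_delay_shutdown(async {
//!         // Clean-up work would go here.
//!     }).unwrap();
//!     let handle = tokio::spawn(task);
//!
//!     // With tokio this only detaches the task, which keeps running.
//!     drop(handle);
//!
//!     shutdown.trigger_shutdown(0).unwrap();
//!     // Completes only after the spawned task has finished (or was dropped).
//!     assert_eq!(shutdown.wait_shutdown_complete().await, 0);
//! }
//! ```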
//!
//! # Example
//!
//! This example is a tokio-based TCP echo server.
//! It simply echoes everything it receives from a peer back to that same peer,
//! and it uses this crate for graceful shutdown.
//!
//! This example is also available in the repository under the name [`tcp-echo-server`] if you want to run it locally.
//!
//! [`tcp-echo-server`]: https://github.com/de-vri-es/async-shutdown-rs/blob/main/examples/tcp-echo-server.rs
//!
//! ```no_run
//! use async_shutdown::ShutdownManager;
//! use std::net::SocketAddr;
//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//! use tokio::net::{TcpListener, TcpStream};
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create a new shutdown object.
//!     // We will clone it into all tasks that need it.
//!     let shutdown = ShutdownManager::new();
//!
//!     // Spawn a task to wait for CTRL+C and trigger a shutdown.
//!     tokio::spawn({
//!         let shutdown = shutdown.clone();
//!         async move {
//!             if let Err(e) = tokio::signal::ctrl_c().await {
//!                 eprintln!("Failed to wait for CTRL+C: {}", e);
//!                 std::process::exit(1);
//!             } else {
//!                 eprintln!("\nReceived interrupt signal. Shutting down server...");
//!                 shutdown.trigger_shutdown(0).ok();
//!             }
//!         }
//!     });
//!
//!     // Run the server and trigger the shutdown with a non-zero exit code if we had an error.
//!     match run_server(shutdown.clone(), "[::]:9372").await {
//!         Ok(()) => {
//!             shutdown.trigger_shutdown(0).ok();
//!         },
//!         Err(e) => {
//!             eprintln!("Server task finished with an error: {}", e);
//!             shutdown.trigger_shutdown(1).ok();
//!         },
//!     }
//!
//!     // Wait for clients to run their cleanup code, then exit.
//!     // Without this, background tasks could be killed before they can run their cleanup code.
//!     let exit_code = shutdown.wait_shutdown_complete().await;
//!
//!     std::process::exit(exit_code);
//! }
//!
//! async fn run_server(shutdown: ShutdownManager<i32>, bind_address: &str) -> std::io::Result<()> {
//!     let server = TcpListener::bind(&bind_address).await?;
//!     eprintln!("Server listening on {}", bind_address);
//!
//!     // Simply use `wrap_cancel` for everything, since we do not need clean-up for the listening socket.
//!     // See `handle_client` for a case where a future is given the time to perform logging after the shutdown was triggered.
//!     while let Ok(connection) = shutdown.wrap_cancel(server.accept()).await {
//!         let (stream, address) = connection?;
//!         tokio::spawn(handle_client(shutdown.clone(), stream, address));
//!     }
//!
//!     Ok(())
//! }
//!
//! async fn handle_client(shutdown: ShutdownManager<i32>, mut stream: TcpStream, address: SocketAddr) {
//!     eprintln!("Accepted new connection from {}", address);
//!
//!     // Make sure the shutdown doesn't complete until the delay token is dropped.
//!     //
//!     // Getting the token will fail if the shutdown has already completed,
//!     // in which case we just log a message and return.
//!     //
//!     // If you already have a future that should be allowed to complete,
//!     // you can also use `shutdown.wrap_delay_shutdown(...)`.
//!     // Here it is easier to use a token though.
//!     let _delay_token = match shutdown.delay_shutdown_token() {
//!         Ok(token) => token,
//!         Err(_) => {
//!             eprintln!("Shutdown already completed, closing connection with {}", address);
//!             return;
//!         }
//!     };
//!
//!     // Now run the echo loop, but cancel it when the shutdown is triggered.
//!     match shutdown.wrap_cancel(echo_loop(&mut stream)).await {
//!         Ok(Err(e)) => eprintln!("Error in connection {}: {}", address, e),
//!         Ok(Ok(())) => eprintln!("Connection closed by {}", address),
//!         Err(_exit_code) => eprintln!("Shutdown triggered, closing connection with {}", address),
//!     }
//!
//!     // The delay token will be dropped here, allowing the shutdown to complete.
//! }
//!
//! async fn echo_loop(stream: &mut TcpStream) -> std::io::Result<()> {
//!     // Echo everything we receive back to the peer in a loop.
//!     let mut buffer = vec![0; 512];
//!     loop {
//!         let read = stream.read(&mut buffer).await?;
//!         if read == 0 {
//!             break;
//!         }
//!         // Use `write_all` so that a partial write can not silently drop data.
//!         stream.write_all(&buffer[..read]).await?;
//!     }
//!
//!     Ok(())
//! }
//! ```

#![warn(missing_docs)]

use std::future::Future;
use std::sync::{Arc, Mutex};

mod shutdown_complete;
pub use shutdown_complete::ShutdownComplete;

mod shutdown_signal;
pub use shutdown_signal::ShutdownSignal;

mod wrap_cancel;
pub use wrap_cancel::WrapCancel;

mod wrap_trigger_shutdown;
pub use wrap_trigger_shutdown::WrapTriggerShutdown;

mod wrap_delay_shutdown;
pub use wrap_delay_shutdown::WrapDelayShutdown;

mod waker_list;
use waker_list::WakerList;

/// Shutdown manager for asynchronous tasks and futures.
///
/// The shutdown manager allows you to:
/// * Signal futures to shut down or forcibly cancel them (by dropping them).
/// * Wait for futures to perform their clean-up after a shutdown was triggered.
/// * Retrieve the shutdown reason after the shutdown was triggered.
///
/// The shutdown manager can be cloned and shared with multiple tasks.
/// Each clone uses the same internal state.
#[derive(Clone)]
pub struct ShutdownManager<T: Clone> {
    inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}

impl<T: Clone> ShutdownManager<T> {
    /// Create a new shutdown manager.
    #[inline]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(ShutdownManagerInner::new())),
        }
    }

    /// Check if the shutdown has been triggered.
    #[inline]
    pub fn is_shutdown_triggered(&self) -> bool {
        self.inner.lock().unwrap().shutdown_reason.is_some()
    }

    /// Check if the shutdown has completed.
    #[inline]
    pub fn is_shutdown_completed(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.shutdown_reason.is_some() && inner.delay_tokens == 0
    }

    /// Get the shutdown reason, if the shutdown has been triggered.
    ///
    /// Returns [`None`] if the shutdown has not been triggered yet.
    #[inline]
    pub fn shutdown_reason(&self) -> Option<T> {
        self.inner.lock().unwrap().shutdown_reason.clone()
    }

    /// Asynchronously wait for the shutdown to be triggered.
    ///
    /// This returns a future that completes when the shutdown is triggered.
    /// The future can be cloned and sent to other threads or tasks freely.
    ///
    /// If the shutdown is already triggered, the returned future immediately resolves.
    ///
    /// You can also use `ShutdownSignal::wrap_cancel()` of the returned object
    /// to automatically cancel a future when the shutdown signal is received.
    /// This is identical to `Self::wrap_cancel()`.
    #[inline]
    pub fn wait_shutdown_triggered(&self) -> ShutdownSignal<T> {
        ShutdownSignal {
            inner: self.inner.clone(),
            waker_token: None,
        }
    }

    /// Asynchronously wait for the shutdown to complete.
    ///
    /// This returns a future that completes when the shutdown is complete.
    /// The future can be cloned and sent to other threads or tasks freely.
    ///
    /// The shutdown is complete when it has been triggered, all [`DelayShutdownToken`]s are dropped
    /// and all [`WrapDelayShutdown`] futures have completed or are dropped.
    #[inline]
    pub fn wait_shutdown_complete(&self) -> ShutdownComplete<T> {
        ShutdownComplete {
            inner: self.inner.clone(),
            waker_token: None,
        }
    }

    /// Trigger the shutdown.
    ///
    /// This will cause all [`ShutdownSignal`] and [`WrapCancel`] futures associated with this shutdown manager to be resolved.
    ///
    /// The shutdown will not be considered complete until all [`DelayShutdownTokens`][DelayShutdownToken] are dropped.
    ///
    /// If the shutdown was already started, this function returns an error.
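    ///
    /// A minimal sketch of the error case, using `&str` as an arbitrary reason type:
    /// the first reason wins and the second one is handed back in the error.
    /// ```rust
    /// use async_shutdown::ShutdownManager;
    ///
    /// fn main() {
    ///     let shutdown = ShutdownManager::new();
    ///     assert!(shutdown.trigger_shutdown("first").is_ok());
    ///
    ///     // The second call fails and reports both reasons.
    ///     let error = shutdown.trigger_shutdown("second").unwrap_err();
    ///     assert_eq!(error.shutdown_reason, "first");
    ///     assert_eq!(error.ignored_reason, "second");
    ///     assert_eq!(shutdown.shutdown_reason(), Some("first"));
    /// }
    /// ```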
    #[inline]
    pub fn trigger_shutdown(&self, reason: T) -> Result<(), ShutdownAlreadyStarted<T>> {
        self.inner.lock().unwrap().shutdown(reason)
    }

    /// Wrap a future so that it is cancelled (dropped) when the shutdown is triggered.
    ///
    /// The returned future completes with `Err(shutdown_reason)` if the shutdown is triggered,
    /// and with `Ok(x)` if the wrapped future completes first.
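    ///
    /// A short sketch of the `Ok` case, assuming tokio as the runtime and `()` as a placeholder reason type:
    /// ```rust
    /// use async_shutdown::ShutdownManager;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let shutdown = ShutdownManager::<()>::new();
    ///
    ///     // No shutdown is triggered, so the wrapped future runs to completion.
    ///     let result = shutdown.wrap_cancel(async { 42 }).await;
    ///     assert_eq!(result, Ok(42));
    /// }
    /// ```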
    #[inline]
    pub fn wrap_cancel<F: Future>(&self, future: F) -> WrapCancel<T, F> {
        self.wait_shutdown_triggered().wrap_cancel(future)
    }

    /// Wrap a future to cause a shutdown when the future completes or when it is dropped.
    #[inline]
    pub fn wrap_trigger_shutdown<F: Future>(&self, shutdown_reason: T, future: F) -> WrapTriggerShutdown<T, F> {
        self.trigger_shutdown_token(shutdown_reason).wrap_future(future)
    }

    /// Wrap a future to delay shutdown completion until the wrapped future completes or until it is dropped.
    ///
    /// The returned future transparently completes with the value of the wrapped future.
    /// However, the shutdown will not be considered complete until the wrapped future completes or is dropped.
    ///
    /// If the shutdown has already completed, this function returns an error.
    #[inline]
    pub fn wrap_delay_shutdown<F: Future>(&self, future: F) -> Result<WrapDelayShutdown<T, F>, ShutdownAlreadyCompleted<T>> {
        Ok(self.delay_shutdown_token()?.wrap_future(future))
    }

    /// Get a token that delays shutdown completion as long as it exists.
    ///
    /// The manager keeps track of all the tokens it hands out.
    /// The tokens can be cloned and sent to different threads and tasks.
    /// All tokens (including the clones) must be dropped before the shutdown is considered to be complete.
    ///
    /// If the shutdown has already completed, this function returns an error.
    ///
    /// If you want to delay the shutdown until a future completes,
    /// consider using [`Self::wrap_delay_shutdown()`] instead.
    #[inline]
    pub fn delay_shutdown_token(&self) -> Result<DelayShutdownToken<T>, ShutdownAlreadyCompleted<T>> {
        let mut inner = self.inner.lock().unwrap();
        // Shutdown already completed, can't delay completion anymore.
        if inner.delay_tokens == 0 {
            if let Some(reason) = &inner.shutdown_reason {
                return Err(ShutdownAlreadyCompleted::new(reason.clone()));
            }
        }

        inner.increase_delay_count();
        Ok(DelayShutdownToken {
            inner: self.inner.clone(),
        })
    }

    /// Get a token that triggers a shutdown when dropped.
    ///
    /// When a [`TriggerShutdownToken`] is dropped, the shutdown is triggered automatically.
    /// This applies to *any* token.
    /// If you clone a token five times and drop one of them, it will trigger a shutdown.
    ///
    /// You can also use [`Self::wrap_trigger_shutdown()`] to wrap a future so that a shutdown is triggered
    /// when the future completes or if it is dropped.
    #[inline]
    pub fn trigger_shutdown_token(&self, shutdown_reason: T) -> TriggerShutdownToken<T> {
        TriggerShutdownToken {
            shutdown_reason: Arc::new(Mutex::new(Some(shutdown_reason))),
            inner: self.inner.clone(),
        }
    }
}

impl<T: Clone> Default for ShutdownManager<T> {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

/// Token that delays shutdown completion as long as it exists.
///
/// The token can be cloned and sent to different threads and tasks freely.
///
/// All clones must be dropped before the shutdown can complete.
pub struct DelayShutdownToken<T: Clone> {
    inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}

impl<T: Clone> DelayShutdownToken<T> {
    /// Wrap a future to delay shutdown completion until the wrapped future completes or until it is dropped.
    ///
    /// This consumes the token to avoid keeping an unused token around by accident, which would delay shutdown indefinitely.
    /// If you wish to use the token multiple times, you can clone it first:
    /// ```
    /// # let shutdown = async_shutdown::ShutdownManager::<()>::new();
    /// # let delay_shutdown_token = shutdown.delay_shutdown_token().unwrap();
    /// # let future = async { () };
    /// let future = delay_shutdown_token.clone().wrap_future(future);
    /// ```
    ///
    /// The returned future transparently completes with the value of the wrapped future.
    /// However, the shutdown will not be considered complete until the future completes or is dropped.
    #[inline]
    pub fn wrap_future<F: Future>(self, future: F) -> WrapDelayShutdown<T, F> {
        WrapDelayShutdown {
            delay_token: Some(self),
            future,
        }
    }
}

impl<T: Clone> Clone for DelayShutdownToken<T> {
    #[inline]
    fn clone(&self) -> Self {
        self.inner.lock().unwrap().increase_delay_count();
        DelayShutdownToken {
            inner: self.inner.clone(),
        }
    }
}

impl<T: Clone> Drop for DelayShutdownToken<T> {
    #[inline]
    fn drop(&mut self) {
        self.inner.lock().unwrap().decrease_delay_count();
    }
}

/// Token that triggers a shutdown when it is dropped.
///
/// The token can be cloned and sent to different threads and tasks freely.
/// If *one* of the cloned tokens is dropped, a shutdown is triggered,
/// even if the rest of the clones still exist.
#[derive(Clone)]
pub struct TriggerShutdownToken<T: Clone> {
    shutdown_reason: Arc<Mutex<Option<T>>>,
    inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}

impl<T: Clone> TriggerShutdownToken<T> {
    /// Wrap a future to trigger a shutdown when it completes or is dropped.
    ///
    /// This consumes the token to avoid accidentally dropping the token
    /// after wrapping a future and instantly causing a shutdown.
    ///
    /// If you need to keep the token around, you can clone it first:
    /// ```
    /// # let trigger_shutdown_token = async_shutdown::ShutdownManager::new().trigger_shutdown_token(());
    /// # let future = async { () };
    /// let future = trigger_shutdown_token.clone().wrap_future(future);
    /// ```
    #[inline]
    pub fn wrap_future<F: Future>(self, future: F) -> WrapTriggerShutdown<T, F> {
        WrapTriggerShutdown {
            trigger_shutdown_token: Some(self),
            future,
        }
    }

    /// Drop the token without causing a shutdown.
    ///
    /// This is equivalent to calling [`std::mem::forget()`] on the token.
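    ///
    /// A small sketch, with `&str` as an arbitrary reason type:
    /// ```rust
    /// use async_shutdown::ShutdownManager;
    ///
    /// fn main() {
    ///     let shutdown = ShutdownManager::new();
    ///
    ///     // The token is discarded without running its `Drop` implementation.
    ///     shutdown.trigger_shutdown_token("never used").forget();
    ///     assert!(!shutdown.is_shutdown_triggered());
    /// }
    /// ```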
    #[inline]
    pub fn forget(self) {
        std::mem::forget(self)
    }
}

impl<T: Clone> Drop for TriggerShutdownToken<T> {
    #[inline]
    fn drop(&mut self) {
        let mut inner = self.inner.lock().unwrap();
        let reason = self.shutdown_reason.lock().unwrap().take();
        if let Some(reason) = reason {
            inner.shutdown(reason).ok();
        }
    }
}

struct ShutdownManagerInner<T> {
    /// The shutdown reason.
    shutdown_reason: Option<T>,

    /// Number of delay tokens in existence.
    ///
    /// Must reach 0 before shutdown can complete.
    delay_tokens: usize,

    /// Tasks to wake when a shutdown is triggered.
    on_shutdown: WakerList,

    /// Tasks to wake when the shutdown is complete.
    on_shutdown_complete: WakerList,
}

impl<T: Clone> ShutdownManagerInner<T> {
    fn new() -> Self {
        Self {
            shutdown_reason: None,
            delay_tokens: 0,
            on_shutdown_complete: WakerList::new(),
            on_shutdown: WakerList::new(),
        }
    }

    fn increase_delay_count(&mut self) {
        self.delay_tokens += 1;
    }

    fn decrease_delay_count(&mut self) {
        self.delay_tokens -= 1;
        if self.delay_tokens == 0 {
            self.notify_shutdown_complete();
        }
    }

    fn shutdown(&mut self, reason: T) -> Result<(), ShutdownAlreadyStarted<T>> {
        match &self.shutdown_reason {
            Some(original_reason) => {
                Err(ShutdownAlreadyStarted::new(original_reason.clone(), reason))
            },
            None => {
                self.shutdown_reason = Some(reason);
                self.on_shutdown.wake_all();
                if self.delay_tokens == 0 {
                    self.notify_shutdown_complete()
                }
                Ok(())
            },
        }
    }

    fn notify_shutdown_complete(&mut self) {
        self.on_shutdown_complete.wake_all();
    }
}

/// Error returned when you try to trigger the shutdown multiple times on the same [`ShutdownManager`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShutdownAlreadyStarted<T> {
    /// The shutdown reason of the already started shutdown.
    pub shutdown_reason: T,

    /// The provided reason that was ignored because the shutdown was already started.
    pub ignored_reason: T,
}

impl<T> ShutdownAlreadyStarted<T> {
    pub(crate) const fn new(shutdown_reason: T, ignored_reason: T) -> Self {
        Self { shutdown_reason, ignored_reason }
    }
}

impl<T: std::fmt::Debug> std::error::Error for ShutdownAlreadyStarted<T> {}

impl<T> std::fmt::Display for ShutdownAlreadyStarted<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "shutdown has already started, can not trigger shutdown again")
    }
}

/// Error returned when trying to delay a shutdown that has already completed.
#[derive(Debug)]
#[non_exhaustive]
pub struct ShutdownAlreadyCompleted<T> {
    /// The shutdown reason of the already completed shutdown.
    pub shutdown_reason: T,
}

impl<T> ShutdownAlreadyCompleted<T> {
    pub(crate) const fn new(shutdown_reason: T) -> Self {
        Self { shutdown_reason }
    }
}

impl<T: std::fmt::Debug> std::error::Error for ShutdownAlreadyCompleted<T> {}

impl<T> std::fmt::Display for ShutdownAlreadyCompleted<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "shutdown has already completed, can not delay shutdown completion")
    }
}