//! Runtime agnostic one-stop solution for graceful shutdown in asynchronous code.
//!
//! This crate addresses three separate but related problems regarding graceful shutdown:
//! * You have to be able to stop running futures when a shutdown signal is given.
//! * You have to be able to wait for futures to finish potential clean-up.
//! * You want to know why the shutdown was triggered (for example to set your process exit code).
//!
//! All of these problems are handled by the [`ShutdownManager`] struct.
//!
//! # Stopping running futures
//! You can get a future to wait for the shutdown signal with [`ShutdownManager::wait_shutdown_triggered()`].
//! In this case you must write your async code to react to the shutdown signal appropriately.
//!
//! Alternatively, you can wrap a future to be cancelled (by being dropped) when the shutdown is triggered with [`ShutdownManager::wrap_cancel()`].
//! This doesn't require the wrapped future to know anything about the shutdown signal,
//! but it also doesn't allow the future to run custom shutdown code.
//!
//! To trigger the shutdown signal, simply call [`ShutdownManager::trigger_shutdown(reason)`][`ShutdownManager::trigger_shutdown()`].
//! The shutdown reason can be any type, as long as it implements [`Clone`].
//! If you want to pass a non-[`Clone`] object or an object that is expensive to clone, you can wrap it in an [`Arc`].
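//!
//! As a minimal sketch of the paragraph above, the snippet below triggers a shutdown with a reason wrapped in an [`Arc`].
//! The `Fatal` type and its messages are hypothetical and exist only for this illustration.
//! It uses only the synchronous part of the API, so no async runtime is needed.
//!
//! ```rust
//! use async_shutdown::ShutdownManager;
//! use std::sync::Arc;
//!
//! // Hypothetical reason type that deliberately does not implement `Clone`.
//! #[derive(Debug)]
//! struct Fatal(String);
//!
//! let shutdown = ShutdownManager::<Arc<Fatal>>::new();
//! assert!(!shutdown.is_shutdown_triggered());
//!
//! // The first trigger wins and its reason is stored.
//! shutdown.trigger_shutdown(Arc::new(Fatal("disk full".into()))).unwrap();
//! assert!(shutdown.is_shutdown_triggered());
//! assert_eq!(shutdown.shutdown_reason().unwrap().0, "disk full");
//!
//! // A second trigger is rejected, and the error carries both reasons.
//! let error = shutdown.trigger_shutdown(Arc::new(Fatal("too late".into()))).unwrap_err();
//! assert_eq!(error.shutdown_reason.0, "disk full");
//! assert_eq!(error.ignored_reason.0, "too late");
//! ```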
//!
//! # Waiting for futures to complete
//! You may also want to wait for some futures to complete before actually shutting down instead of just dropping them.
//! This might be important to cleanly shut down and prevent data loss.
//! You can do that with [`ShutdownManager::wait_shutdown_complete()`].
//! That function returns a future that only completes when the shutdown is "completed":
//! the shutdown has been triggered and nothing is delaying it anymore.
//!
//! You must also prevent the shutdown from completing too early by calling [`ShutdownManager::delay_shutdown_token()`] or [`ShutdownManager::wrap_delay_shutdown()`].
//! The [`ShutdownManager::delay_shutdown_token()`] function gives you a [`DelayShutdownToken`] which prevents the shutdown from completing.
//! To allow the shutdown to finish, simply drop the token.
//! Alternatively, [`ShutdownManager::wrap_delay_shutdown()`] wraps an existing future,
//! and will prevent the shutdown from completing until the future either completes or is dropped.
//!
//! Note that you can only delay the shutdown completion if it has not completed already.
//! If the shutdown is already complete, those functions will return an error.
//!
//! You can also use a token to wrap a future with [`DelayShutdownToken::wrap_future()`].
//! If you already have a token, this allows you to wrap a future without having to worry that the shutdown might already be completed.
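//!
//! A minimal sketch of how delay tokens interact with completion, using only the synchronous API so that no async runtime is required.
//! The `&str` reason is an arbitrary choice for this illustration.
//!
//! ```rust
//! use async_shutdown::ShutdownManager;
//!
//! let shutdown = ShutdownManager::<&str>::new();
//! let token = shutdown.delay_shutdown_token().unwrap();
//!
//! // The shutdown is triggered, but the token keeps it from completing.
//! shutdown.trigger_shutdown("done").unwrap();
//! assert!(shutdown.is_shutdown_triggered());
//! assert!(!shutdown.is_shutdown_completed());
//!
//! // Clones count as tokens too: all of them must be dropped.
//! let clone = token.clone();
//! drop(token);
//! assert!(!shutdown.is_shutdown_completed());
//! drop(clone);
//! assert!(shutdown.is_shutdown_completed());
//!
//! // Once the shutdown has completed, it can no longer be delayed.
//! assert!(shutdown.delay_shutdown_token().is_err());
//! ```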
//!
//! # Automatically triggering shutdowns
//! You can also trigger a shutdown automatically using a [`TriggerShutdownToken`].
//! Call [`ShutdownManager::trigger_shutdown_token()`] to obtain the token.
//! When the token is dropped, a shutdown is triggered.
//!
//! You can use [`ShutdownManager::wrap_trigger_shutdown()`] or [`TriggerShutdownToken::wrap_future()`] to wrap a future.
//! When the wrapped future completes (or when it is dropped) it will trigger a shutdown.
//! This can be used as a convenient way to trigger a shutdown when a vital task stops.
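//!
//! The sketch below illustrates the token behaviour described above with the synchronous API only.
//! The reason strings are hypothetical.
//!
//! ```rust
//! use async_shutdown::ShutdownManager;
//!
//! let shutdown = ShutdownManager::<&str>::new();
//! let token = shutdown.trigger_shutdown_token("vital task stopped");
//! let clone = token.clone();
//! assert!(!shutdown.is_shutdown_triggered());
//!
//! // Dropping any one clone triggers the shutdown, even though `token` still exists.
//! drop(clone);
//! assert_eq!(shutdown.shutdown_reason(), Some("vital task stopped"));
//!
//! // A token that is no longer needed can be discarded without triggering anything.
//! let other = ShutdownManager::<&str>::new();
//! other.trigger_shutdown_token("never reported").forget();
//! assert!(!other.is_shutdown_triggered());
//! ```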
//!
//! # Futures versus Tasks
//! Be careful when using `JoinHandle`s as if they were regular futures.
//! Depending on your async runtime, dropping a `JoinHandle` usually does not stop the task.
//! It may simply detach the join handle from the task, meaning that your task is still running.
//! If you're not careful, this could still cause data loss on shutdown.
//! As a rule of thumb, you should usually wrap futures *before* you spawn them on a new task.
//!
//! # Example
//!
//! This example is a tokio-based TCP echo server.
//! It simply echoes everything it receives from a peer back to that same peer,
//! and it uses this crate for graceful shutdown.
//!
//! This example is also available in the repository under the name [`tcp-echo-server`] if you want to run it locally.
//!
//! [`tcp-echo-server`]: https://github.com/de-vri-es/async-shutdown-rs/blob/main/examples/tcp-echo-server.rs
//!
//! ```no_run
//! use async_shutdown::ShutdownManager;
//! use std::net::SocketAddr;
//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//! use tokio::net::{TcpListener, TcpStream};
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create a new shutdown object.
//!     // We will clone it into all tasks that need it.
//!     let shutdown = ShutdownManager::new();
//!
//!     // Spawn a task to wait for CTRL+C and trigger a shutdown.
//!     tokio::spawn({
//!         let shutdown = shutdown.clone();
//!         async move {
//!             if let Err(e) = tokio::signal::ctrl_c().await {
//!                 eprintln!("Failed to wait for CTRL+C: {}", e);
//!                 std::process::exit(1);
//!             } else {
//!                 eprintln!("\nReceived interrupt signal. Shutting down server...");
//!                 shutdown.trigger_shutdown(0).ok();
//!             }
//!         }
//!     });
//!
//!     // Run the server and trigger the shutdown with a non-zero exit code if we had an error.
//!     // The result of `trigger_shutdown()` is ignored: the shutdown may already have been triggered by CTRL+C.
//!     match run_server(shutdown.clone(), "[::]:9372").await {
//!         Ok(()) => {
//!             shutdown.trigger_shutdown(0).ok();
//!         },
//!         Err(e) => {
//!             eprintln!("Server task finished with an error: {}", e);
//!             shutdown.trigger_shutdown(1).ok();
//!         },
//!     }
//!
//!     // Wait for clients to run their cleanup code, then exit.
//!     // Without this, background tasks could be killed before they can run their cleanup code.
//!     // The future resolves to the shutdown reason, which we use as the exit code.
//!     let exit_code = shutdown.wait_shutdown_complete().await;
//!
//!     std::process::exit(exit_code);
//! }
//!
//! async fn run_server(shutdown: ShutdownManager<i32>, bind_address: &str) -> std::io::Result<()> {
//!     let server = TcpListener::bind(&bind_address).await?;
//!     eprintln!("Server listening on {}", bind_address);
//!
//!     // Simply use `wrap_cancel` for everything, since we do not need clean-up for the listening socket.
//!     // See `handle_client` for a case where a future is given the time to perform logging after the shutdown was triggered.
//!     while let Ok(connection) = shutdown.wrap_cancel(server.accept()).await {
//!         let (stream, address) = connection?;
//!         tokio::spawn(handle_client(shutdown.clone(), stream, address));
//!     }
//!
//!     Ok(())
//! }
//!
//! async fn handle_client(shutdown: ShutdownManager<i32>, mut stream: TcpStream, address: SocketAddr) {
//!     eprintln!("Accepted new connection from {}", address);
//!
//!     // Make sure the shutdown doesn't complete until the delay token is dropped.
//!     //
//!     // Getting the token will fail if the shutdown has already completed,
//!     // in which case we just log a message and return.
//!     //
//!     // If you already have a future that should be allowed to complete,
//!     // you can also use `shutdown.wrap_delay_shutdown(...)`.
//!     // Here it is easier to use a token though.
//!     let _delay_token = match shutdown.delay_shutdown_token() {
//!         Ok(token) => token,
//!         Err(_) => {
//!             eprintln!("Shutdown already completed, closing connection with {}", address);
//!             return;
//!         }
//!     };
//!
//!     // Now run the echo loop, but cancel it when the shutdown is triggered.
//!     match shutdown.wrap_cancel(echo_loop(&mut stream)).await {
//!         Ok(Err(e)) => eprintln!("Error in connection {}: {}", address, e),
//!         Ok(Ok(())) => eprintln!("Connection closed by {}", address),
//!         Err(_exit_code) => eprintln!("Shutdown triggered, closing connection with {}", address),
//!     }
//!
//!     // The delay token will be dropped here, allowing the shutdown to complete.
//! }
//!
//! async fn echo_loop(stream: &mut TcpStream) -> std::io::Result<()> {
//!     // Echo everything we receive back to the peer in a loop.
//!     let mut buffer = vec![0; 512];
//!     loop {
//!         let read = stream.read(&mut buffer).await?;
//!         if read == 0 {
//!             break;
//!         }
//!         // Use `write_all()`: a plain `write()` may only write part of the buffer.
//!         stream.write_all(&buffer[..read]).await?;
//!     }
//!
//!     Ok(())
//! }
//! ```

#![warn(missing_docs)]

use std::future::Future;
use std::sync::{Arc, Mutex};

mod shutdown_complete;
pub use shutdown_complete::ShutdownComplete;

mod shutdown_signal;
pub use shutdown_signal::ShutdownSignal;

mod wrap_cancel;
pub use wrap_cancel::WrapCancel;

mod wrap_trigger_shutdown;
pub use wrap_trigger_shutdown::WrapTriggerShutdown;

mod wrap_delay_shutdown;
pub use wrap_delay_shutdown::WrapDelayShutdown;

mod waker_list;
use waker_list::WakerList;

/// Shutdown manager for asynchronous tasks and futures.
///
/// The shutdown manager allows you to:
/// * Signal futures to shut down or forcibly cancel them (by dropping them).
/// * Wait for futures to perform their clean-up after a shutdown was triggered.
/// * Retrieve the shutdown reason after the shutdown was triggered.
///
/// The shutdown manager can be cloned and shared with multiple tasks.
/// Each clone uses the same internal state.
#[derive(Clone)]
pub struct ShutdownManager<T: Clone> {
	inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}

impl<T: Clone> ShutdownManager<T> {
	/// Create a new shutdown manager.
	#[inline]
	pub fn new() -> Self {
		Self {
			inner: Arc::new(Mutex::new(ShutdownManagerInner::new())),
		}
	}

	/// Check if the shutdown has been triggered.
	#[inline]
	pub fn is_shutdown_triggered(&self) -> bool {
		self.inner.lock().unwrap().shutdown_reason.is_some()
	}

	/// Check if the shutdown has completed.
	#[inline]
	pub fn is_shutdown_completed(&self) -> bool {
		let inner = self.inner.lock().unwrap();
		inner.shutdown_reason.is_some() && inner.delay_tokens == 0
	}

	/// Get the shutdown reason, if the shutdown has been triggered.
	///
	/// Returns [`None`] if the shutdown has not been triggered yet.
	#[inline]
	pub fn shutdown_reason(&self) -> Option<T> {
		self.inner.lock().unwrap().shutdown_reason.clone()
	}

	/// Asynchronously wait for the shutdown to be triggered.
	///
	/// This returns a future that completes when the shutdown is triggered.
	/// The future can be cloned and sent to other threads or tasks freely.
	///
	/// If the shutdown is already triggered, the returned future immediately resolves.
	///
	/// You can also use [`ShutdownSignal::wrap_cancel()`] of the returned object
	/// to automatically cancel a future when the shutdown signal is received.
	/// This is identical to [`Self::wrap_cancel()`].
	#[inline]
	pub fn wait_shutdown_triggered(&self) -> ShutdownSignal<T> {
		ShutdownSignal {
			inner: self.inner.clone(),
			waker_token: None,
		}
	}

	/// Asynchronously wait for the shutdown to complete.
	///
	/// This returns a future that completes when the shutdown is complete.
	/// The future can be cloned and sent to other threads or tasks freely.
	///
	/// The shutdown is complete when it has been triggered, all [`DelayShutdownTokens`][DelayShutdownToken] are dropped
	/// and all [`WrapDelayShutdown`] futures have completed or are dropped.
	#[inline]
	pub fn wait_shutdown_complete(&self) -> ShutdownComplete<T> {
		ShutdownComplete {
			inner: self.inner.clone(),
			waker_token: None,
		}
	}

	/// Trigger the shutdown.
	///
	/// This will cause all [`ShutdownSignal`] and [`WrapCancel`] futures associated with this shutdown manager to be resolved.
	///
	/// The shutdown will not be considered complete until all [`DelayShutdownTokens`][DelayShutdownToken] are dropped.
	///
	/// If the shutdown was already started, this function returns an error.
	#[inline]
	pub fn trigger_shutdown(&self, reason: T) -> Result<(), ShutdownAlreadyStarted<T>> {
		self.inner.lock().unwrap().shutdown(reason)
	}

	/// Wrap a future so that it is cancelled (dropped) when the shutdown is triggered.
	///
	/// The returned future completes with `Err(shutdown_reason)` if the shutdown is triggered,
	/// and with `Ok(x)` if the wrapped future completes first.
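	///
	/// The following is a minimal sketch of both outcomes.
	/// To stay runtime agnostic it drives the futures with a tiny hand-written `block_on` helper,
	/// which is an assumption of this example and not part of the crate; any async runtime should work as well.
	/// ```rust
	/// use async_shutdown::ShutdownManager;
	/// use std::future::Future;
	/// use std::pin::pin;
	/// use std::sync::Arc;
	/// use std::task::{Context, Poll, Wake, Waker};
	///
	/// // Hypothetical helper type: wakes the thread that is blocked in `block_on()`.
	/// struct ThreadWaker(std::thread::Thread);
	///
	/// impl Wake for ThreadWaker {
	///     fn wake(self: Arc<Self>) {
	///         self.0.unpark();
	///     }
	/// }
	///
	/// // Hypothetical helper: poll a future on the current thread, parking until it is woken.
	/// fn block_on<F: Future>(future: F) -> F::Output {
	///     let waker = Waker::from(Arc::new(ThreadWaker(std::thread::current())));
	///     let mut context = Context::from_waker(&waker);
	///     let mut future = pin!(future);
	///     loop {
	///         match future.as_mut().poll(&mut context) {
	///             Poll::Ready(value) => return value,
	///             Poll::Pending => std::thread::park(),
	///         }
	///     }
	/// }
	///
	/// let shutdown = ShutdownManager::<&str>::new();
	///
	/// // No shutdown yet: the wrapped future completes first.
	/// assert_eq!(block_on(shutdown.wrap_cancel(async { 42 })), Ok(42));
	///
	/// // After the trigger, even a future that never completes on its own is cancelled.
	/// shutdown.trigger_shutdown("stop").unwrap();
	/// assert_eq!(block_on(shutdown.wrap_cancel(std::future::pending::<()>())), Err("stop"));
	/// ```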
	#[inline]
	pub fn wrap_cancel<F: Future>(&self, future: F) -> WrapCancel<T, F> {
		self.wait_shutdown_triggered().wrap_cancel(future)
	}

	/// Wrap a future to cause a shutdown when the future completes or when it is dropped.
	#[inline]
	pub fn wrap_trigger_shutdown<F: Future>(&self, shutdown_reason: T, future: F) -> WrapTriggerShutdown<T, F> {
		self.trigger_shutdown_token(shutdown_reason).wrap_future(future)
	}

	/// Wrap a future to delay shutdown completion until the wrapped future completes or until it is dropped.
	///
	/// The returned future transparently completes with the value of the wrapped future.
	/// However, the shutdown will not be considered complete until the wrapped future completes or is dropped.
	///
	/// If the shutdown has already completed, this function returns an error.
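	///
	/// As a small sketch using only the synchronous API, dropping the wrapped future without ever polling it releases the delay again.
	/// The `()` reason and the never-completing future are arbitrary choices for this illustration.
	/// ```rust
	/// let shutdown = async_shutdown::ShutdownManager::<()>::new();
	/// let wrapped = shutdown.wrap_delay_shutdown(std::future::pending::<()>()).unwrap();
	///
	/// // The wrapped future still exists, so the shutdown can not complete yet.
	/// shutdown.trigger_shutdown(()).unwrap();
	/// assert!(!shutdown.is_shutdown_completed());
	///
	/// drop(wrapped);
	/// assert!(shutdown.is_shutdown_completed());
	///
	/// // Once the shutdown has completed, wrapping another future fails.
	/// assert!(shutdown.wrap_delay_shutdown(std::future::pending::<()>()).is_err());
	/// ```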
	#[inline]
	pub fn wrap_delay_shutdown<F: Future>(&self, future: F) -> Result<WrapDelayShutdown<T, F>, ShutdownAlreadyCompleted<T>> {
		Ok(self.delay_shutdown_token()?.wrap_future(future))
	}

	/// Get a token that delays shutdown completion as long as it exists.
	///
	/// The manager keeps track of all the tokens it hands out.
	/// The tokens can be cloned and sent to different threads and tasks.
	/// All tokens (including the clones) must be dropped before the shutdown is considered to be complete.
	///
	/// If the shutdown has already completed, this function returns an error.
	///
	/// If you want to delay the shutdown until a future completes,
	/// consider using [`Self::wrap_delay_shutdown()`] instead.
	#[inline]
	pub fn delay_shutdown_token(&self) -> Result<DelayShutdownToken<T>, ShutdownAlreadyCompleted<T>> {
		let mut inner = self.inner.lock().unwrap();
		// Shutdown already completed, can't delay completion anymore.
		if inner.delay_tokens == 0 {
			if let Some(reason) = &inner.shutdown_reason {
				return Err(ShutdownAlreadyCompleted::new(reason.clone()));
			}
		}

		inner.increase_delay_count();
		Ok(DelayShutdownToken {
			inner: self.inner.clone(),
		})
	}

	/// Get a token that triggers a shutdown when dropped.
	///
	/// When a [`TriggerShutdownToken`] is dropped, the shutdown is triggered automatically.
	/// This applies to *any* token.
	/// If you clone a token five times and drop one of them, it will trigger a shutdown.
	///
	/// You can also use [`Self::wrap_trigger_shutdown()`] to wrap a future so that a shutdown is triggered
	/// when the future completes or if it is dropped.
	#[inline]
	pub fn trigger_shutdown_token(&self, shutdown_reason: T) -> TriggerShutdownToken<T> {
		TriggerShutdownToken {
			shutdown_reason: Arc::new(Mutex::new(Some(shutdown_reason))),
			inner: self.inner.clone(),
		}
	}
}

impl<T: Clone> Default for ShutdownManager<T> {
	#[inline]
	fn default() -> Self {
		Self::new()
	}
}

/// Token that delays shutdown completion as long as it exists.
///
/// The token can be cloned and sent to different threads and tasks freely.
///
/// All clones must be dropped before the shutdown can complete.
pub struct DelayShutdownToken<T: Clone> {
	inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}

impl<T: Clone> DelayShutdownToken<T> {
	/// Wrap a future to delay shutdown completion until the wrapped future completes or until it is dropped.
	///
	/// This consumes the token to avoid keeping an unused token around by accident, which would delay shutdown indefinitely.
	/// If you wish to use the token multiple times, you can clone it first:
	/// ```
	/// # let shutdown = async_shutdown::ShutdownManager::<()>::new();
	/// # let delay_shutdown_token = shutdown.delay_shutdown_token().unwrap();
	/// # let future = async { () };
	/// let future = delay_shutdown_token.clone().wrap_future(future);
	/// ```
	///
	/// The returned future transparently completes with the value of the wrapped future.
	/// However, the shutdown will not be considered complete until the future completes or is dropped.
	#[inline]
	pub fn wrap_future<F: Future>(self, future: F) -> WrapDelayShutdown<T, F> {
		WrapDelayShutdown {
			delay_token: Some(self),
			future,
		}
	}
}

impl<T: Clone> Clone for DelayShutdownToken<T> {
	#[inline]
	fn clone(&self) -> Self {
		self.inner.lock().unwrap().increase_delay_count();
		DelayShutdownToken {
			inner: self.inner.clone(),
		}
	}
}

impl<T: Clone> Drop for DelayShutdownToken<T> {
	#[inline]
	fn drop(&mut self) {
		self.inner.lock().unwrap().decrease_delay_count();
	}
}

/// Token that triggers a shutdown when it is dropped.
///
/// The token can be cloned and sent to different threads and tasks freely.
/// If *one* of the cloned tokens is dropped, a shutdown is triggered,
/// even if the rest of the clones still exist.
#[derive(Clone)]
pub struct TriggerShutdownToken<T: Clone> {
	shutdown_reason: Arc<Mutex<Option<T>>>,
	inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}

impl<T: Clone> TriggerShutdownToken<T> {
	/// Wrap a future to trigger a shutdown when it completes or is dropped.
	///
	/// This consumes the token to avoid accidentally dropping the token
	/// after wrapping a future and instantly causing a shutdown.
	///
	/// If you need to keep the token around, you can clone it first:
	/// ```
	/// # let trigger_shutdown_token = async_shutdown::ShutdownManager::new().trigger_shutdown_token(());
	/// # let future = async { () };
	/// let future = trigger_shutdown_token.clone().wrap_future(future);
	/// ```
	#[inline]
	pub fn wrap_future<F: Future>(self, future: F) -> WrapTriggerShutdown<T, F> {
		WrapTriggerShutdown {
			trigger_shutdown_token: Some(self),
			future,
		}
	}

	/// Drop the token without causing a shutdown.
	///
	/// This is equivalent to calling [`std::mem::forget()`] on the token.
	#[inline]
	pub fn forget(self) {
		std::mem::forget(self)
	}
}

impl<T: Clone> Drop for TriggerShutdownToken<T> {
	#[inline]
	fn drop(&mut self) {
		let mut inner = self.inner.lock().unwrap();
		let reason = self.shutdown_reason.lock().unwrap().take();
		if let Some(reason) = reason {
			inner.shutdown(reason).ok();
		}
	}
}

struct ShutdownManagerInner<T> {
	/// The shutdown reason.
	shutdown_reason: Option<T>,

	/// Number of delay tokens in existence.
	///
	/// Must reach 0 before shutdown can complete.
	delay_tokens: usize,

	/// Tasks to wake when a shutdown is triggered.
	on_shutdown: WakerList,

	/// Tasks to wake when the shutdown is complete.
	on_shutdown_complete: WakerList,
}

impl<T: Clone> ShutdownManagerInner<T> {
	fn new() -> Self {
		Self {
			shutdown_reason: None,
			delay_tokens: 0,
			on_shutdown: WakerList::new(),
			on_shutdown_complete: WakerList::new(),
		}
	}

	fn increase_delay_count(&mut self) {
		self.delay_tokens += 1;
	}

	fn decrease_delay_count(&mut self) {
		self.delay_tokens -= 1;
		if self.delay_tokens == 0 {
			self.notify_shutdown_complete();
		}
	}

	fn shutdown(&mut self, reason: T) -> Result<(), ShutdownAlreadyStarted<T>> {
		match &self.shutdown_reason {
			Some(original_reason) => {
				Err(ShutdownAlreadyStarted::new(original_reason.clone(), reason))
			},
			None => {
				self.shutdown_reason = Some(reason);
				self.on_shutdown.wake_all();
				if self.delay_tokens == 0 {
					self.notify_shutdown_complete();
				}
				Ok(())
			},
		}
	}

	fn notify_shutdown_complete(&mut self) {
		self.on_shutdown_complete.wake_all();
	}
}

/// Error returned when you try to trigger the shutdown multiple times on the same [`ShutdownManager`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShutdownAlreadyStarted<T> {
	/// The shutdown reason of the already started shutdown.
	pub shutdown_reason: T,

	/// The provided reason that was ignored because the shutdown was already started.
	pub ignored_reason: T,
}

impl<T> ShutdownAlreadyStarted<T> {
	pub(crate) const fn new(shutdown_reason: T, ignored_reason: T) -> Self {
		Self { shutdown_reason, ignored_reason }
	}
}

impl<T: std::fmt::Debug> std::error::Error for ShutdownAlreadyStarted<T> {}

impl<T> std::fmt::Display for ShutdownAlreadyStarted<T> {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		write!(f, "shutdown has already started, can not trigger shutdown again")
	}
}

/// Error returned when trying to delay a shutdown that has already completed.
#[derive(Debug)]
#[non_exhaustive]
pub struct ShutdownAlreadyCompleted<T> {
	/// The shutdown reason of the already completed shutdown.
	pub shutdown_reason: T,
}

impl<T> ShutdownAlreadyCompleted<T> {
	pub(crate) const fn new(shutdown_reason: T) -> Self {
		Self { shutdown_reason }
	}
}

impl<T: std::fmt::Debug> std::error::Error for ShutdownAlreadyCompleted<T> {}

impl<T> std::fmt::Display for ShutdownAlreadyCompleted<T> {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		write!(f, "shutdown has already completed, can not delay shutdown completion")
	}
}