shutdown_async/
lib.rs

1//! A library for gracefully shutting down asynchronous applications
2//!
3//! This may be useful when you want to allow all in-flight processing
4//! to complete before shutting down in order to maintain a consistent state.
5//!
6//! # Examples
7//!
8//! ```
9//! use shutdown_async::ShutdownController;
10//!
11//! #[tokio::main]
12//! async fn main() {
13//!   let shutdown = ShutdownController::new();
14//!   
15//!   tokio::task::spawn({
16//!     let mut monitor = shutdown.subscribe();
17//!     async move {
18//!       // Wait for something to happen
19//!       tokio::select! {
20//!        _ = monitor.recv() => { println!("shutdown initiated"); }
21//!        _ = tokio::time::sleep(ONE_YEAR) => { println!("one year has passed!"); }
22//!       }
23//!     }
24//!   });
25//!
26//!   shutdown.shutdown().await;
27//! }
28//!
29//! static ONE_YEAR: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24 * 365);
30//! ```
31use tokio::sync::{broadcast, mpsc};
32
33/// A [`ShutdownController`] is used to control the shutdown of an application.
34///
35/// This is accomplished by creating a [`ShutdownMonitor`] instance for each task
36/// that should be monitored. When [`ShutdownController::shutdown`] is called,
37/// all [`ShutdownMonitor`] instances will be notified that shutdown has started.
38///
39/// # Examples
40///
41/// ```
42/// use shutdown_async::ShutdownController;
43///
44/// #[tokio::main]
45/// async fn main() {
46///   let shutdown = ShutdownController::new();
47///   
48///   tokio::task::spawn({
49///     let mut monitor = shutdown.subscribe();
50///     async move {
51///       // Wait for something to happen
52///       tokio::select! {
53///        _ = monitor.recv() => { println!("shutdown initiated"); }
54///        _ = tokio::time::sleep(ONE_YEAR) => { println!("one year has passed!"); }
55///       }
56///     }
57///   });
58///
59///   shutdown.shutdown().await;
60/// }
61///
62/// static ONE_YEAR: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24 * 365);
63/// ```
64pub struct ShutdownController {
65    /// Used to tell all [`ShutdownMonitor`] instances that shutdown has started.
66    notify_shutdown: broadcast::Sender<()>,
67
68    /// Implicitly used to determine when all [`ShutdownMonitor`] instances have been dropped.
69    task_tracker: mpsc::Sender<()>,
70
71    /// Used to determine when all tasks have finished. Calling `recv()` on this channel
72    /// will return when all of the send halves of the `task_tracker` channel have been dropped.
73    task_waiter: mpsc::Receiver<()>,
74}
75
76impl ShutdownController {
77    /// Create a new [`ShutdownController`].
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// let shutdown = shutdown_async::ShutdownController::new();
83    /// ```
84    pub fn new() -> Self {
85        let (notify_shutdown, _) = broadcast::channel::<()>(1);
86        let (task_tracker, task_waiter) = mpsc::channel::<()>(1);
87
88        Self {
89            notify_shutdown,
90            task_tracker,
91            task_waiter,
92        }
93    }
94
95    /// Create a new [`ShutdownMonitor`] instance that can listen for the shutdown signal.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// let shutdown = shutdown_async::ShutdownController::new();
101    /// let monitor = shutdown.subscribe();
102    pub fn subscribe(&self) -> ShutdownMonitor {
103        ShutdownMonitor::new(self.notify_shutdown.subscribe(), self.task_tracker.clone())
104    }
105
106    /// Begin shutting down and wait for all [`ShutdownMonitor`] instances to be dropped.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// #[tokio::main]
112    /// async fn main() {
113    ///  let shutdown = shutdown_async::ShutdownController::new();
114    ///
115    ///  // ... do stuff ...
116    ///
117    ///  // Tell all tasks to shutdown
118    ///  shutdown.shutdown().await;
119    /// }
120    /// ```
121    pub async fn shutdown(mut self) {
122        // Notify all tasks that shutdown has started
123        drop(self.notify_shutdown);
124
125        // Destroy our mpsc::Sender so that the mpsc::Receiver::recv() will return immediately
126        // once all tasks have completed (i.e. dropped their mpsc::Sender)
127        drop(self.task_tracker);
128
129        // Wait for all tasks to finish
130        let _ = self.task_waiter.recv().await;
131    }
132}
133
134impl Default for ShutdownController {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140/// A [`ShutdownMonitor`] listens for the shutdown signal from a [`ShutdownController`] and
141/// tracks that the signal has been received.
142///
143/// Callers may query for whether the shutdown signal has been received or not.
144///
145/// # Examples
146///
147/// ```
148/// use shutdown_async::ShutdownMonitor;
149///
150/// async fn run(monitor: &mut ShutdownMonitor) {
151///   while !monitor.is_shutdown() {
152///       tokio::select! {
153///        _ = monitor.recv() => { println!("shutdown initiated"); }
154///        _ = async { /* do work */ } => { println!("one year has passed!"); }
155///       }
156///   }
157/// }
158/// ```
159pub struct ShutdownMonitor {
160    /// `true` if the shutdown signal has been received
161    shutdown_received: bool,
162
163    /// The receive half of the channel used to listen for shutdown.
164    shutdown_notifier: broadcast::Receiver<()>,
165
166    /// Implicitly used to help [`ShutdownController`] understand when the program
167    /// has completed shutdown.
168    _task_tracker: mpsc::Sender<()>,
169}
170
171impl ShutdownMonitor {
172    fn new(
173        shutdown_notifier: broadcast::Receiver<()>,
174        _task_tracker: mpsc::Sender<()>,
175    ) -> ShutdownMonitor {
176        ShutdownMonitor {
177            shutdown_received: false,
178            shutdown_notifier,
179            _task_tracker,
180        }
181    }
182
183    /// Returns `true` if the shutdown signal has been received, and `false` otherwise.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// #[tokio::main]
189    /// async fn main() {
190    ///   let shutdown = shutdown_async::ShutdownController::new();
191    ///   let mut monitor = shutdown.subscribe();
192    ///
193    ///   // Assert that the monitor has not yet received the shutdown signal
194    ///   assert!(!monitor.is_shutdown());
195    /// }
196    /// ```
197    pub fn is_shutdown(&self) -> bool {
198        self.shutdown_received
199    }
200
201    /// Receive the shutdown notice, waiting if necessary.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// async fn long_lived_task(mut monitor: shutdown_async::ShutdownMonitor) {
207    ///    // Wait for the shutdown signal
208    ///    monitor.recv().await;
209    /// }
210    /// ```
211    pub async fn recv(&mut self) {
212        // If the shutdown signal has already been received, then return
213        // immediately.
214        if self.shutdown_received {
215            return;
216        }
217
218        // Cannot receive a "lag error" as only one value is ever sent.
219        let _ = self.shutdown_notifier.recv().await;
220
221        // Remember that the signal has been received.
222        self.shutdown_received = true;
223    }
224}