shutdown_async/lib.rs
1//! A library for gracefully shutting down asynchronous applications
2//!
3//! This may be useful when you want to allow all in-flight processing
4//! to complete before shutting down in order to maintain a consistent state.
5//!
6//! # Examples
7//!
8//! ```
9//! use shutdown_async::ShutdownController;
10//!
11//! #[tokio::main]
12//! async fn main() {
13//! let shutdown = ShutdownController::new();
14//!
15//! tokio::task::spawn({
16//! let mut monitor = shutdown.subscribe();
17//! async move {
18//! // Wait for something to happen
19//! tokio::select! {
20//! _ = monitor.recv() => { println!("shutdown initiated"); }
21//! _ = tokio::time::sleep(ONE_YEAR) => { println!("one year has passed!"); }
22//! }
23//! }
24//! });
25//!
26//! shutdown.shutdown().await;
27//! }
28//!
29//! static ONE_YEAR: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24 * 365);
30//! ```
31use tokio::sync::{broadcast, mpsc};
32
33/// A [`ShutdownController`] is used to control the shutdown of an application.
34///
35/// This is accomplished by creating a [`ShutdownMonitor`] instance for each task
36/// that should be monitored. When [`ShutdownController::shutdown`] is called,
37/// all [`ShutdownMonitor`] instances will be notified that shutdown has started.
38///
39/// # Examples
40///
41/// ```
42/// use shutdown_async::ShutdownController;
43///
44/// #[tokio::main]
45/// async fn main() {
46/// let shutdown = ShutdownController::new();
47///
48/// tokio::task::spawn({
49/// let mut monitor = shutdown.subscribe();
50/// async move {
51/// // Wait for something to happen
52/// tokio::select! {
53/// _ = monitor.recv() => { println!("shutdown initiated"); }
54/// _ = tokio::time::sleep(ONE_YEAR) => { println!("one year has passed!"); }
55/// }
56/// }
57/// });
58///
59/// shutdown.shutdown().await;
60/// }
61///
62/// static ONE_YEAR: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24 * 365);
63/// ```
64pub struct ShutdownController {
65 /// Used to tell all [`ShutdownMonitor`] instances that shutdown has started.
66 notify_shutdown: broadcast::Sender<()>,
67
68 /// Implicitly used to determine when all [`ShutdownMonitor`] instances have been dropped.
69 task_tracker: mpsc::Sender<()>,
70
71 /// Used to determine when all tasks have finished. Calling `recv()` on this channel
72 /// will return when all of the send halves of the `task_tracker` channel have been dropped.
73 task_waiter: mpsc::Receiver<()>,
74}
75
76impl ShutdownController {
77 /// Create a new [`ShutdownController`].
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// let shutdown = shutdown_async::ShutdownController::new();
83 /// ```
84 pub fn new() -> Self {
85 let (notify_shutdown, _) = broadcast::channel::<()>(1);
86 let (task_tracker, task_waiter) = mpsc::channel::<()>(1);
87
88 Self {
89 notify_shutdown,
90 task_tracker,
91 task_waiter,
92 }
93 }
94
95 /// Create a new [`ShutdownMonitor`] instance that can listen for the shutdown signal.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// let shutdown = shutdown_async::ShutdownController::new();
101 /// let monitor = shutdown.subscribe();
102 pub fn subscribe(&self) -> ShutdownMonitor {
103 ShutdownMonitor::new(self.notify_shutdown.subscribe(), self.task_tracker.clone())
104 }
105
106 /// Begin shutting down and wait for all [`ShutdownMonitor`] instances to be dropped.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// #[tokio::main]
112 /// async fn main() {
113 /// let shutdown = shutdown_async::ShutdownController::new();
114 ///
115 /// // ... do stuff ...
116 ///
117 /// // Tell all tasks to shutdown
118 /// shutdown.shutdown().await;
119 /// }
120 /// ```
121 pub async fn shutdown(mut self) {
122 // Notify all tasks that shutdown has started
123 drop(self.notify_shutdown);
124
125 // Destroy our mpsc::Sender so that the mpsc::Receiver::recv() will return immediately
126 // once all tasks have completed (i.e. dropped their mpsc::Sender)
127 drop(self.task_tracker);
128
129 // Wait for all tasks to finish
130 let _ = self.task_waiter.recv().await;
131 }
132}
133
134impl Default for ShutdownController {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140/// A [`ShutdownMonitor`] listens for the shutdown signal from a [`ShutdownController`] and
141/// tracks that the signal has been received.
142///
143/// Callers may query for whether the shutdown signal has been received or not.
144///
145/// # Examples
146///
147/// ```
148/// use shutdown_async::ShutdownMonitor;
149///
150/// async fn run(monitor: &mut ShutdownMonitor) {
151/// while !monitor.is_shutdown() {
152/// tokio::select! {
153/// _ = monitor.recv() => { println!("shutdown initiated"); }
154/// _ = async { /* do work */ } => { println!("one year has passed!"); }
155/// }
156/// }
157/// }
158/// ```
159pub struct ShutdownMonitor {
160 /// `true` if the shutdown signal has been received
161 shutdown_received: bool,
162
163 /// The receive half of the channel used to listen for shutdown.
164 shutdown_notifier: broadcast::Receiver<()>,
165
166 /// Implicitly used to help [`ShutdownController`] understand when the program
167 /// has completed shutdown.
168 _task_tracker: mpsc::Sender<()>,
169}
170
171impl ShutdownMonitor {
172 fn new(
173 shutdown_notifier: broadcast::Receiver<()>,
174 _task_tracker: mpsc::Sender<()>,
175 ) -> ShutdownMonitor {
176 ShutdownMonitor {
177 shutdown_received: false,
178 shutdown_notifier,
179 _task_tracker,
180 }
181 }
182
183 /// Returns `true` if the shutdown signal has been received, and `false` otherwise.
184 ///
185 /// # Examples
186 ///
187 /// ```
188 /// #[tokio::main]
189 /// async fn main() {
190 /// let shutdown = shutdown_async::ShutdownController::new();
191 /// let mut monitor = shutdown.subscribe();
192 ///
193 /// // Assert that the monitor has not yet received the shutdown signal
194 /// assert!(!monitor.is_shutdown());
195 /// }
196 /// ```
197 pub fn is_shutdown(&self) -> bool {
198 self.shutdown_received
199 }
200
201 /// Receive the shutdown notice, waiting if necessary.
202 ///
203 /// # Examples
204 ///
205 /// ```
206 /// async fn long_lived_task(mut monitor: shutdown_async::ShutdownMonitor) {
207 /// // Wait for the shutdown signal
208 /// monitor.recv().await;
209 /// }
210 /// ```
211 pub async fn recv(&mut self) {
212 // If the shutdown signal has already been received, then return
213 // immediately.
214 if self.shutdown_received {
215 return;
216 }
217
218 // Cannot receive a "lag error" as only one value is ever sent.
219 let _ = self.shutdown_notifier.recv().await;
220
221 // Remember that the signal has been received.
222 self.shutdown_received = true;
223 }
224}