tokio_task_manager/lib.rs
//! Crate which provides sync primitives whose main goal is
//! to allow an async application that makes use of the Tokio runtime
//! to shut down gracefully, meaning that ideally the server process exits
//! only when all active tasks are done with their ongoing work.
5//!
6//! ## Example
7//!
8//! ```rust
9//! use std::time::Duration;
10//! use tokio_task_manager::TaskManager;
11//!
12//! #[tokio::main]
13//! async fn main() {
14//! // An application requires only a single TaskManager,
15//! // created usually where you also build and start the Tokio runtime.
16//! let tm = TaskManager::new(Duration::from_millis(200));
17//!
18//! // In this example we spawn 10 tasks,
19//! // where each of them also spawns a task themselves,
20//! // resulting in a total of 20 tasks which we'll want to wait for.
21//! let (tx, mut rx) = tokio::sync::mpsc::channel(20);
22//! for i in 0..10 {
23//! let tx = tx.clone();
24//! let n = i;
25//!
//! // create a task handle for each Tokio task that we spawn, such that:
//! // - the application can wait until the handle is dropped,
//! //   which indicates that the spawned task has finished;
//! // - the spawned task knows when the application is gracefully shutting down (.wait);
30//! let mut task = tm.task();
31//! tokio::spawn(async move {
32//! // spawn also child task to test task cloning,
33//! // a task is typically cloned for tasks within tasks,
34//! // each cloned task also needs to be dropped prior to
35//! // the application being able to gracefully shut down.
36//! let mut child_task = task.clone();
37//! let child_tx = tx.clone();
38//! let m = n;
39//! tokio::spawn(async move {
40//! tokio::time::sleep(Duration::from_millis(m * 10)).await;
41//! // Using the tokio::select! macro you can allow a task
42//! // to either get to its desired work, or quit already
43//! // in case the application is planning to shut down.
44//! //
45//! // A typical use case of this is a server which is waiting
46//! // for an incoming request, which is a text-book example
47//! // of a task in idle state.
48//! tokio::select! {
49//! result = child_tx.send((m+1)*10) => assert!(result.is_ok()),
50//! _ = child_task.wait() => (),
51//! }
52//! });
53//! // Do the actual work.
54//! tokio::time::sleep(Duration::from_millis(n * 10)).await;
55//! tokio::select! {
56//! result = tx.send(n) => assert!(result.is_ok()),
57//! _ = task.wait() => (),
58//! }
59//! });
60//! }
61//!
62//! // we also create a task for something that will never finish,
63//! // just to show that the tokio::select! approach does work...
64//! let mut task = tm.task();
65//! tokio::spawn(async move {
66//! // spawn also child task to test task cloning
67//! let mut child_task = task.clone();
68//! tokio::spawn(async move {
69//! // should shut down rather than block for too long
70//! tokio::select! {
71//! _ = child_task.wait() => (),
72//! _ = tokio::time::sleep(Duration::from_secs(60)) => (),
73//! }
74//! });
75//! // should shut down rather than block for too long
76//! tokio::select! {
77//! _ = task.wait() => (),
78//! _ = tokio::time::sleep(Duration::from_secs(60)) => (),
79//! }
80//! });
81//!
82//! // sleep for 100ms, just to ensure that all child tasks have been spawned as well
83//! tokio::time::sleep(Duration::from_millis(100)).await;
84//!
85//! // drop our sender such that rx.recv().await will return None,
86//! // once our other senders have been dropped and the channel's buffer is empty
87//! drop(tx);
88//!
89//! // notify all spawned tasks that we wish to gracefully shut down
90//! // and wait until they do. The resulting boolean is true if the
91//! // waiting terminated gracefully (meaning all tasks quit on their own while they were idle).
92//! assert!(tm.wait().await);
93//!
94//! // collect all our results,
95//! // which we can do all at once given the channel
96//! // was created with a sufficient buffer size.
97//! let mut results = Vec::with_capacity(20);
98//! while let Some(n) = rx.recv().await {
99//! results.push(n);
100//! }
101//!
//! // test to prove we received all expected results
103//! results.sort_unstable();
104//! assert_eq!(
105//! &results,
106//! &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
107//! );
108//! }
109//! ```
110//!
//! In case your application's root tasks are infinite loops, you might wish
//! to shut down gracefully only once a SIGINT (CTRL+C) signal has been received.
//! In that case, call `tm.shutdown_gracefully_on_ctrl_c().await`
//! instead of `tm.wait().await`.
115
116use std::time::Duration;
117use tokio::signal;
118use tokio::sync::{broadcast, mpsc};
119use tokio::time::timeout;
120use tracing::debug;
121
122/// A task allows a shutdown to happen gracefully by waiting
123/// until the spawned Tokio task is finished and it also allows
124/// the Tokio task to know when a shutdown is requested,
125/// to be listened to on an idle moment (e.g. while waiting for an incoming network request).
126pub struct Task {
127 tx: mpsc::Sender<()>,
128 btx: broadcast::Sender<()>,
129 rx: broadcast::Receiver<()>,
130}
131
132impl Task {
133 /// Wait for a global shutdown signal to be received,
134 /// when it is received it is expected that the Tokio task exits immediately.
135 pub async fn wait(&mut self) {
136 _ = self.rx.recv().await;
137 debug!("task received shutdown signal");
138 }
139}
140
141impl Clone for Task {
142 fn clone(&self) -> Self {
143 Self {
144 tx: self.tx.clone(),
145 btx: self.btx.clone(),
146 rx: self.btx.subscribe(),
147 }
148 }
149}
150
151/// The task manager is used similar to how in Go a WaitGroup is used.
152/// It provides us with the ability to gracefully shutdown the application
153/// giving the chance for each spawned task to gracefully exit during an idle moment.
154///
155/// Normally the graceful shutdown will be induced by a system signal (e.g. SIGINT),
156/// but spawned tasks can also induce it themselves if required for critical reasons.
157pub struct TaskManager {
158 wait_timeout: Duration,
159
160 btx: broadcast::Sender<()>,
161 rtx: broadcast::Receiver<()>,
162 tx: mpsc::Sender<()>,
163 rx: mpsc::Receiver<()>,
164}
165
166impl TaskManager {
167 /// Create a new task manager.
168 /// There should be only one manager per application.
169 pub fn new(wait_timeout: Duration) -> Self {
170 let (btx, rtx) = broadcast::channel(1);
171 let (tx, rx) = mpsc::channel(1);
172
173 Self {
174 wait_timeout,
175
176 btx,
177 rtx,
178 tx,
179 rx,
180 }
181 }
182
183 // Spawn a task, to be used for any Tokio task spawned,
184 // which will ensure that any task spawned gets the chance to gracefully shutdown.
185 pub fn task(&self) -> Task {
186 Task {
187 tx: self.tx.clone(),
188 btx: self.btx.clone(),
189 rx: self.btx.subscribe(),
190 }
191 }
192
193 /// Wait for all tasks to finish,
194 /// or until the defined timeout has been reached.
195 ///
196 /// Returns a boolean indicating if the shutdown was graceful.
197 pub async fn wait(self) -> bool {
198 self.shutdown(false).await
199 }
200
201 /// Block the shutdown process until a CTRL+C signal has been received,
202 /// and shutdown gracefully once received.
203 ///
204 /// In case no tasks are active we will immediately return as well,
205 /// preventing programs from halting in case infinite loop tasks
206 /// exited early due to an error.
207 ///
208 /// Returns a boolean indicating if the shutdown was graceful,
209 /// or in case the process shut down early (no tasks = graceful by definition).
210 pub async fn shutdown_gracefully_on_ctrl_c(self) -> bool {
211 self.shutdown(true).await
212 }
213
214 async fn shutdown(mut self, block_until_signal: bool) -> bool {
215 // drop our own sender, such that we can check if our rx.recv exits with an error,
216 // which would mean that all active tasks have quit
217 drop(self.tx);
218
219 // only if the user wishes to block until a signal is received will we do so,
220 // for some purposes this however not desired and instead a downstream
221 // signal from manager to tasks is desired as a trigger instead
222 if block_until_signal {
223 tokio::select! {
224 _ = self.rx.recv() => {
225 debug!("task manager has been shut down due to no active tasks");
226 // exit early, as no signalling and waiting has to be done when no tasks are active
227 return true;
228 },
229 _ = signal::ctrl_c() => {
230 debug!("ctrl+c signal received: starting graceful shutdown of task manager");
231 }
232 };
233 }
234
235 // signal that all tasks have to stop once they can
236 if let Err(err) = self.btx.send(()) {
237 debug!(
238 "task manager received error while sending broadcast shutdown signal: {}",
239 err
240 );
241 }
242 // drop our own receiver
243 drop(self.rtx);
244
245 if let Err(err) = timeout(self.wait_timeout, async move { _ = self.rx.recv().await }).await
246 {
247 debug!("task manager received error while waiting: {}", err);
248 return false;
249 }
250
251 true
252 }
253}
254
// Attaches the contents of the README as documentation to an item that only exists
// when compiling doctests, so that the code examples in the README are compiled
// (and run) as doctests without being part of the regular public API.
#[doc = include_str!("../README.md")]
#[cfg(doctest)]
pub struct ReadmeDocTests;
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Without any task, waiting finishes gracefully right away.
    #[tokio::test]
    async fn task_manager_zero_task_wait() {
        let tm = TaskManager::new(Duration::from_secs(1));
        assert!(tm.wait().await);
    }

    /// Without any task, the ctrl+c variant returns early instead of blocking on the signal.
    #[tokio::test]
    async fn task_manager_zero_task_shutdown_gracefully_on_ctrl_c_() {
        let tm = TaskManager::new(Duration::from_secs(1));
        assert!(tm.shutdown_gracefully_on_ctrl_c().await);
    }

    /// Tasks (and cloned child tasks) that finish their work or react to the
    /// shutdown signal in time result in a graceful shutdown.
    #[tokio::test]
    async fn task_manager_graceful_shutdown() {
        let tm = TaskManager::new(Duration::from_millis(200));
        let (tx, mut rx) = tokio::sync::mpsc::channel(20);
        for i in 0..10 {
            let tx = tx.clone();
            let n = i;
            let mut task = tm.task();
            tokio::spawn(async move {
                // spawn also child task to test task cloning
                let mut child_task = task.clone();
                let child_tx = tx.clone();
                let m = n;
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(m * 10)).await;
                    tokio::select! {
                        result = child_tx.send((m+1)*10) => assert!(result.is_ok()),
                        _ = child_task.wait() => (),
                    }
                });
                // do the actual work
                tokio::time::sleep(Duration::from_millis(n * 10)).await;
                tokio::select! {
                    result = tx.send(n) => assert!(result.is_ok()),
                    _ = task.wait() => (),
                }
            });
        }
        let mut task = tm.task();
        tokio::spawn(async move {
            // spawn also child task to test task cloning
            let mut child_task = task.clone();
            tokio::spawn(async move {
                // should shut down rather than block for too long
                tokio::select! {
                    _ = child_task.wait() => (),
                    _ = tokio::time::sleep(Duration::from_secs(60)) => (),
                }
            });
            // should shut down rather than block for too long
            tokio::select! {
                _ = task.wait() => (),
                _ = tokio::time::sleep(Duration::from_secs(60)) => (),
            }
        });
        // give all (child) tasks the time to be spawned and to deliver their result
        tokio::time::sleep(Duration::from_millis(100)).await;
        // drop our own sender so that `rx.recv()` yields `None` once the buffer is drained
        drop(tx);
        assert!(tm.wait().await);
        let mut results = Vec::with_capacity(20);
        while let Some(n) = rx.recv().await {
            results.push(n);
        }
        results.sort_unstable();
        assert_eq!(
            &results,
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
        );
    }

    /// A task that keeps running long after the shutdown signal makes the
    /// manager give up once its timeout has elapsed.
    #[tokio::test]
    async fn task_manager_shutdown_timeout() {
        let tm = TaskManager::new(Duration::from_millis(10));

        let mut task = tm.task();
        tokio::spawn(async move {
            // the future has to be awaited: merely creating it (and dropping it)
            // would never actually wait for the shutdown signal
            task.wait().await;
            // task takes too long after the signal, manager should stop waiting for it
            tokio::time::sleep(Duration::from_secs(60)).await;
            panic!("never should reach here");
        });

        assert!(!tm.wait().await);
    }
}
348}