tokio_task_manager/
lib.rs

//! Crate which provides sync primitives whose main goal is
//! to allow an async application that uses the Tokio runtime
//! to shut down gracefully, meaning that ideally the server process exits
//! only once all active tasks are done with their ongoing work.
5//!
6//! ## Example
7//!
8//! ```rust
9//! use std::time::Duration;
10//! use tokio_task_manager::TaskManager;
11//!
12//! #[tokio::main]
13//! async fn main() {
14//!     // An application requires only a single TaskManager,
15//!     // created usually where you also build and start the Tokio runtime.
16//!     let tm = TaskManager::new(Duration::from_millis(200));
17//!
18//!     // In this example we spawn 10 tasks,
19//!     // where each of them also spawns a task themselves,
20//!     // resulting in a total of 20 tasks which we'll want to wait for.
21//!     let (tx, mut rx) = tokio::sync::mpsc::channel(20);
22//!     for i in 0..10 {
23//!         let tx = tx.clone();
24//!         let n = i;
25//!
26//!         // create a task per task that we spawn, such that:
27//!         // - the application can wait until the task is dropped,
28//!         //   identifying the spawned task is finished;
//!         // - the spawned task knows when the application is gracefully shutting down (via `.wait()`);
30//!         let mut task = tm.task();
31//!         tokio::spawn(async move {
32//!             // spawn also child task to test task cloning,
33//!             // a task is typically cloned for tasks within tasks,
34//!             // each cloned task also needs to be dropped prior to
35//!             // the application being able to gracefully shut down.
36//!             let mut child_task = task.clone();
37//!             let child_tx = tx.clone();
38//!             let m = n;
39//!             tokio::spawn(async move {
40//!                 tokio::time::sleep(Duration::from_millis(m * 10)).await;
41//!                 // Using the tokio::select! macro you can allow a task
42//!                 // to either get to its desired work, or quit already
43//!                 // in case the application is planning to shut down.
44//!                 //
45//!                 // A typical use case of this is a server which is waiting
46//!                 // for an incoming request, which is a text-book example
47//!                 // of a task in idle state.
48//!                 tokio::select! {
49//!                     result = child_tx.send((m+1)*10) => assert!(result.is_ok()),
50//!                     _ = child_task.wait() => (),
51//!                 }
52//!             });
53//!             // Do the actual work.
54//!             tokio::time::sleep(Duration::from_millis(n * 10)).await;
55//!             tokio::select! {
56//!                 result = tx.send(n) => assert!(result.is_ok()),
57//!                 _ = task.wait() => (),
58//!             }
59//!         });
60//!     }
61//!
62//!     // we also create a task for something that will never finish,
63//!     // just to show that the tokio::select! approach does work...
64//!     let mut task = tm.task();
65//!     tokio::spawn(async move {
66//!         // spawn also child task to test task cloning
67//!         let mut child_task = task.clone();
68//!         tokio::spawn(async move {
69//!             // should shut down rather than block for too long
70//!             tokio::select! {
71//!                 _ = child_task.wait() => (),
72//!                 _ = tokio::time::sleep(Duration::from_secs(60)) => (),
73//!             }
74//!         });
75//!         // should shut down rather than block for too long
76//!         tokio::select! {
77//!             _ = task.wait() => (),
78//!             _ = tokio::time::sleep(Duration::from_secs(60)) => (),
79//!         }
80//!     });
81//!
82//!     // sleep for 100ms, just to ensure that all child tasks have been spawned as well
83//!     tokio::time::sleep(Duration::from_millis(100)).await;
84//!
85//!     // drop our sender such that rx.recv().await will return None,
86//!     // once our other senders have been dropped and the channel's buffer is empty
87//!     drop(tx);
88//!
89//!     // notify all spawned tasks that we wish to gracefully shut down
90//!     // and wait until they do. The resulting boolean is true if the
91//!     // waiting terminated gracefully (meaning all tasks quit on their own while they were idle).
92//!     assert!(tm.wait().await);
93//!
94//!     // collect all our results,
95//!     // which we can do all at once given the channel
96//!     // was created with a sufficient buffer size.
97//!     let mut results = Vec::with_capacity(20);
98//!     while let Some(n) = rx.recv().await {
99//!         results.push(n);
100//!     }
101//!
//!     // verify that we received all expected results
103//!     results.sort_unstable();
104//!     assert_eq!(
105//!         &results,
106//!         &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
107//!     );
108//! }
109//! ```
110//!
//! If your application's root tasks are infinite loops, you might wish
//! to shut down gracefully only once a SIGINT (CTRL+C) signal has been received.
//! In that case, instead of `tm.wait().await`, use
//! `tm.shutdown_gracefully_on_ctrl_c().await`.
115
116use std::time::Duration;
117use tokio::signal;
118use tokio::sync::{broadcast, mpsc};
119use tokio::time::timeout;
120use tracing::debug;
121
122/// A task allows a shutdown to happen gracefully by waiting
123/// until the spawned Tokio task is finished and it also allows
124/// the Tokio task to know when a shutdown is requested,
125/// to be listened to on an idle moment (e.g. while waiting for an incoming network request).
126pub struct Task {
127    tx: mpsc::Sender<()>,
128    btx: broadcast::Sender<()>,
129    rx: broadcast::Receiver<()>,
130}
131
132impl Task {
133    /// Wait for a global shutdown signal to be received,
134    /// when it is received it is expected that the Tokio task exits immediately.
135    pub async fn wait(&mut self) {
136        _ = self.rx.recv().await;
137        debug!("task received shutdown signal");
138    }
139}
140
141impl Clone for Task {
142    fn clone(&self) -> Self {
143        Self {
144            tx: self.tx.clone(),
145            btx: self.btx.clone(),
146            rx: self.btx.subscribe(),
147        }
148    }
149}
150
151/// The task manager is used similar to how in Go a WaitGroup is used.
152/// It provides us with the ability to gracefully shutdown the application
153/// giving the chance for each spawned task to gracefully exit during an idle moment.
154///
155/// Normally the graceful shutdown will be induced by a system signal (e.g. SIGINT),
156/// but spawned tasks can also induce it themselves if required for critical reasons.
157pub struct TaskManager {
158    wait_timeout: Duration,
159
160    btx: broadcast::Sender<()>,
161    rtx: broadcast::Receiver<()>,
162    tx: mpsc::Sender<()>,
163    rx: mpsc::Receiver<()>,
164}
165
166impl TaskManager {
167    /// Create a new task manager.
168    /// There should be only one manager per application.
169    pub fn new(wait_timeout: Duration) -> Self {
170        let (btx, rtx) = broadcast::channel(1);
171        let (tx, rx) = mpsc::channel(1);
172
173        Self {
174            wait_timeout,
175
176            btx,
177            rtx,
178            tx,
179            rx,
180        }
181    }
182
183    // Spawn a task, to be used for any Tokio task spawned,
184    // which will ensure that any task spawned gets the chance to gracefully shutdown.
185    pub fn task(&self) -> Task {
186        Task {
187            tx: self.tx.clone(),
188            btx: self.btx.clone(),
189            rx: self.btx.subscribe(),
190        }
191    }
192
193    /// Wait for all tasks to finish,
194    /// or until the defined timeout has been reached.
195    ///
196    /// Returns a boolean indicating if the shutdown was graceful.
197    pub async fn wait(self) -> bool {
198        self.shutdown(false).await
199    }
200
201    /// Block the shutdown process until a CTRL+C signal has been received,
202    /// and shutdown gracefully once received.
203    ///
204    /// In case no tasks are active we will immediately return as well,
205    /// preventing programs from halting in case infinite loop tasks
206    /// exited early due to an error.
207    ///
208    /// Returns a boolean indicating if the shutdown was graceful,
209    /// or in case the process shut down early (no tasks = graceful by definition).
210    pub async fn shutdown_gracefully_on_ctrl_c(self) -> bool {
211        self.shutdown(true).await
212    }
213
214    async fn shutdown(mut self, block_until_signal: bool) -> bool {
215        // drop our own sender, such that we can check if our rx.recv exits with an error,
216        // which would mean that all active tasks have quit
217        drop(self.tx);
218
219        // only if the user wishes to block until a signal is received will we do so,
220        // for some purposes this however not desired and instead a downstream
221        // signal from manager to tasks is desired as a trigger instead
222        if block_until_signal {
223            tokio::select! {
224                _ = self.rx.recv() => {
225                    debug!("task manager has been shut down due to no active tasks");
226                    // exit early, as no signalling and waiting has to be done when no tasks are active
227                    return true;
228                },
229                _ = signal::ctrl_c() => {
230                    debug!("ctrl+c signal received: starting graceful shutdown of task manager");
231                }
232            };
233        }
234
235        // signal that all tasks have to stop once they can
236        if let Err(err) = self.btx.send(()) {
237            debug!(
238                "task manager received error while sending broadcast shutdown signal: {}",
239                err
240            );
241        }
242        // drop our own receiver
243        drop(self.rtx);
244
245        if let Err(err) = timeout(self.wait_timeout, async move { _ = self.rx.recv().await }).await
246        {
247            debug!("task manager received error while waiting: {}", err);
248            return false;
249        }
250
251        true
252    }
253}
254
// Only exists when running doctests: the README is attached as this item's
// documentation so that its code examples are compiled and run as doctests.
#[cfg(doctest)]
#[doc = include_str!("../README.md")]
pub struct ReadmeDocTests;
258
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn task_manager_zero_task_wait() {
        let tm = TaskManager::new(Duration::from_secs(1));
        assert!(tm.wait().await);
    }

    #[tokio::test]
    async fn task_manager_zero_task_shutdown_gracefully_on_ctrl_c() {
        // Without any active task the manager must return right away,
        // without waiting for a CTRL+C signal.
        let tm = TaskManager::new(Duration::from_secs(1));
        assert!(tm.shutdown_gracefully_on_ctrl_c().await);
    }

    #[tokio::test]
    async fn task_manager_graceful_shutdown() {
        let tm = TaskManager::new(Duration::from_millis(200));
        let (tx, mut rx) = tokio::sync::mpsc::channel(20);
        for n in 0..10 {
            let tx = tx.clone();
            let mut task = tm.task();
            tokio::spawn(async move {
                // spawn also a child task to test task cloning
                let mut child_task = task.clone();
                let child_tx = tx.clone();
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(n * 10)).await;
                    tokio::select! {
                        result = child_tx.send((n + 1) * 10) => assert!(result.is_ok()),
                        _ = child_task.wait() => (),
                    }
                });
                // do the actual work
                tokio::time::sleep(Duration::from_millis(n * 10)).await;
                tokio::select! {
                    result = tx.send(n) => assert!(result.is_ok()),
                    _ = task.wait() => (),
                }
            });
        }
        let mut task = tm.task();
        tokio::spawn(async move {
            // spawn also a child task to test task cloning
            let mut child_task = task.clone();
            tokio::spawn(async move {
                // should shut down rather than block for too long
                tokio::select! {
                    _ = child_task.wait() => (),
                    _ = tokio::time::sleep(Duration::from_secs(60)) => (),
                }
            });
            // should shut down rather than block for too long
            tokio::select! {
                _ = task.wait() => (),
                _ = tokio::time::sleep(Duration::from_secs(60)) => (),
            }
        });
        tokio::time::sleep(Duration::from_millis(100)).await;
        drop(tx);
        assert!(tm.wait().await);
        let mut results = Vec::with_capacity(20);
        while let Some(n) = rx.recv().await {
            results.push(n);
        }
        results.sort_unstable();
        assert_eq!(
            &results,
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
        );
    }

    #[tokio::test]
    async fn task_manager_shutdown_timeout() {
        let tm = TaskManager::new(Duration::from_millis(10));

        let mut task = tm.task();
        tokio::spawn(async move {
            // The signal is received (the previous `let _ = task.wait();` dropped the
            // future without awaiting it, so it never listened), but the task then
            // takes far longer than the 10ms timeout, which must force a shutdown.
            task.wait().await;
            tokio::time::sleep(Duration::from_secs(60)).await;
            panic!("never should reach here");
        });

        assert!(!tm.wait().await);
    }

    #[tokio::test]
    async fn task_manager_shutdown_timeout_when_signal_ignored() {
        let tm = TaskManager::new(Duration::from_millis(10));

        let task = tm.task();
        tokio::spawn(async move {
            // Keep the task alive without ever listening for the shutdown signal.
            let _task = task;
            tokio::time::sleep(Duration::from_secs(60)).await;
            panic!("never should reach here");
        });

        assert!(!tm.wait().await);
    }
}