Skip to main content

reaction_plugin/
shutdown.rs

1//! Helper module that provides structures to ease the quitting process when having multiple tokio tasks.
2//!
3//! It defines a [`ShutdownController`], that permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit.
4//!
5//! You can have it as an attribute of your plugin struct.
6//! ```
7//! struct MyPlugin {
8//!   shutdown: ShutdownController
9//! }
10//! ```
11//!
12//! You can then give a [`ShutdownToken`] to other tasks when creating them:
13//!
14//! ```
15//! impl PluginInfo for MyPlugin {
16//!     async fn start(&mut self) -> RemoteResult<()> {
17//!         let token = self.shutdown.token();
18//!
19//!         tokio::spawn(async move {
20//!            token.wait().await;
21//!            eprintln!("DEBUG shutdown asked to quit, now quitting")
22//!         })
23//!     }
24//! }
25//! ```
26//!
27//! On closing, calling [`ShutdownController::ask_shutdown`] will inform all tasks waiting on [`ShutdownToken::wait`] that it's time to leave.
28//! Then we can wait for [`ShutdownController::wait_all_task_shutdown`] to complete.
29//!
30//! ```
31//! impl PluginInfo for MyPlugin {
32//!     async fn close(self) -> RemoteResult<()> {
33//!         self.shutdown.ask_shutdown();
34//!         self.shutdown.wait_all_task_shutdown().await;
35//!         Ok(())
36//!     }
37//! }
38//! ```
39//!
40//! [`ShutdownDelegate::handle_quit_signals`] permits to handle SIGHUP, SIGINT and SIGTERM by gracefully shutting down tasks.
41
42use tokio::signal::unix::{SignalKind, signal};
43use tokio_util::{
44    sync::{CancellationToken, WaitForCancellationFuture},
45    task::task_tracker::{TaskTracker, TaskTrackerToken},
46};
47
48/// Permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit.
49/// Stupid wrapper around [`tokio_util::sync::CancellationToken`] and [`tokio_util::task::task_tracker::TaskTracker`].
50#[derive(Default, Clone)]
51pub struct ShutdownController {
52    shutdown_notifyer: CancellationToken,
53    task_tracker: TaskTracker,
54}
55
56impl ShutdownController {
57    pub fn new() -> Self {
58        Self::default()
59    }
60
61    /// Ask for all tasks to quit
62    pub fn ask_shutdown(&self) {
63        self.shutdown_notifyer.cancel();
64        self.task_tracker.close();
65    }
66
67    /// Wait for all tasks to quit.
68    /// This task may return even without having called [`ShutdownController::ask_shutdown`]
69    /// first, if all tasks quit by themselves.
70    pub async fn wait_all_task_shutdown(self) {
71        self.task_tracker.close();
72        self.task_tracker.wait().await;
73    }
74
75    /// Returns a new shutdown token, to be held by a task.
76    pub fn token(&self) -> ShutdownToken {
77        ShutdownToken::new(self.shutdown_notifyer.clone(), self.task_tracker.token())
78    }
79
80    /// Returns a [`ShutdownDelegate`], which is able to ask for shutdown,
81    /// without counting as a task that needs to be awaited.
82    pub fn delegate(&self) -> ShutdownDelegate {
83        ShutdownDelegate(self.shutdown_notifyer.clone())
84    }
85
86    /// Returns a future that will resolve only when a shutdown request happened.
87    pub fn wait(&self) -> WaitForCancellationFuture<'_> {
88        self.shutdown_notifyer.cancelled()
89    }
90}
91
92/// Permits to ask for shutdown, without counting as a task that needs to be awaited.
93pub struct ShutdownDelegate(CancellationToken);
94
95impl ShutdownDelegate {
96    /// Ask for all tasks to quit
97    pub fn ask_shutdown(&self) {
98        self.0.cancel();
99    }
100
101    /// Ensure [`Self::ask_shutdown`] is called whenever we receive SIGHUP,
102    /// SIGTERM or SIGINT. Spawns a task that consumes self.
103    pub fn handle_quit_signals(self) -> Result<(), String> {
104        let err_str = |err| format!("could not register signal: {err}");
105
106        let mut sighup = signal(SignalKind::hangup()).map_err(err_str)?;
107        let mut sigint = signal(SignalKind::interrupt()).map_err(err_str)?;
108        let mut sigterm = signal(SignalKind::terminate()).map_err(err_str)?;
109
110        tokio::spawn(async move {
111            let signal = tokio::select! {
112                _ = sighup.recv() => "SIGHUP",
113                _ = sigint.recv() => "SIGINT",
114                _ = sigterm.recv() => "SIGTERM",
115            };
116            eprintln!("received {signal}, closing...");
117            self.ask_shutdown();
118        });
119        Ok(())
120    }
121}
122
123/// Created by a [`ShutdownController`].
124/// Serves two purposes:
125///
126/// - Wait for a shutdown request to happen with [`Self::wait`]
127/// - Keep track of the current task. While this token is held,
128///   [`ShutdownController::wait_all_task_shutdown`] will block.
129#[derive(Clone)]
130pub struct ShutdownToken {
131    shutdown_notifyer: CancellationToken,
132    _task_tracker_token: TaskTrackerToken,
133}
134
135impl ShutdownToken {
136    fn new(shutdown_notifyer: CancellationToken, _task_tracker_token: TaskTrackerToken) -> Self {
137        Self {
138            shutdown_notifyer,
139            _task_tracker_token,
140        }
141    }
142
143    /// Returns underlying [`CancellationToken`] and [`TaskTrackerToken`], consuming self.
144    pub fn split(self) -> (CancellationToken, TaskTrackerToken) {
145        (self.shutdown_notifyer, self._task_tracker_token)
146    }
147
148    /// Returns a future that will resolve only when a shutdown request happened.
149    pub fn wait(&self) -> WaitForCancellationFuture<'_> {
150        self.shutdown_notifyer.cancelled()
151    }
152
153    /// Returns true if the shutdown request happened
154    pub fn is_shutdown(&self) -> bool {
155        self.shutdown_notifyer.is_cancelled()
156    }
157
158    /// Ask for all tasks to quit
159    pub fn ask_shutdown(&self) {
160        self.shutdown_notifyer.cancel();
161    }
162}