reaction_plugin/shutdown.rs
1//! Helper module that provides structures to ease the quitting process when having multiple tokio tasks.
2//!
3//! It defines a [`ShutdownController`], that permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit.
4//!
5//! You can have it as an attribute of your plugin struct.
6//! ```
7//! struct MyPlugin {
8//! shutdown: ShutdownController
9//! }
10//! ```
11//!
12//! You can then give a [`ShutdownToken`] to other tasks when creating them:
13//!
14//! ```
15//! impl PluginInfo for MyPlugin {
16//! async fn start(&mut self) -> RemoteResult<()> {
17//! let token = self.shutdown.token();
18//!
19//! tokio::spawn(async move {
20//! token.wait().await;
21//! eprintln!("DEBUG shutdown asked to quit, now quitting")
22//! })
23//! }
24//! }
25//! ```
26//!
27//! On closing, calling [`ShutdownController::ask_shutdown`] will inform all tasks waiting on [`ShutdownToken::wait`] that it's time to leave.
28//! Then we can wait for [`ShutdownController::wait_all_task_shutdown`] to complete.
29//!
30//! ```
31//! impl PluginInfo for MyPlugin {
32//! async fn close(self) -> RemoteResult<()> {
33//! self.shutdown.ask_shutdown();
34//! self.shutdown.wait_all_task_shutdown().await;
35//! Ok(())
36//! }
37//! }
38//! ```
39//!
40//! [`ShutdownDelegate::handle_quit_signals`] permits to handle SIGHUP, SIGINT and SIGTERM by gracefully shutting down tasks.
41
42use tokio::signal::unix::{SignalKind, signal};
43use tokio_util::{
44 sync::{CancellationToken, WaitForCancellationFuture},
45 task::task_tracker::{TaskTracker, TaskTrackerToken},
46};
47
48/// Permits to keep track of ongoing tasks, ask them to shutdown and wait for all of them to quit.
49/// Stupid wrapper around [`tokio_util::sync::CancellationToken`] and [`tokio_util::task::task_tracker::TaskTracker`].
50#[derive(Default, Clone)]
51pub struct ShutdownController {
52 shutdown_notifyer: CancellationToken,
53 task_tracker: TaskTracker,
54}
55
56impl ShutdownController {
57 pub fn new() -> Self {
58 Self::default()
59 }
60
61 /// Ask for all tasks to quit
62 pub fn ask_shutdown(&self) {
63 self.shutdown_notifyer.cancel();
64 self.task_tracker.close();
65 }
66
67 /// Wait for all tasks to quit.
68 /// This task may return even without having called [`ShutdownController::ask_shutdown`]
69 /// first, if all tasks quit by themselves.
70 pub async fn wait_all_task_shutdown(self) {
71 self.task_tracker.close();
72 self.task_tracker.wait().await;
73 }
74
75 /// Returns a new shutdown token, to be held by a task.
76 pub fn token(&self) -> ShutdownToken {
77 ShutdownToken::new(self.shutdown_notifyer.clone(), self.task_tracker.token())
78 }
79
80 /// Returns a [`ShutdownDelegate`], which is able to ask for shutdown,
81 /// without counting as a task that needs to be awaited.
82 pub fn delegate(&self) -> ShutdownDelegate {
83 ShutdownDelegate(self.shutdown_notifyer.clone())
84 }
85
86 /// Returns a future that will resolve only when a shutdown request happened.
87 pub fn wait(&self) -> WaitForCancellationFuture<'_> {
88 self.shutdown_notifyer.cancelled()
89 }
90}
91
92/// Permits to ask for shutdown, without counting as a task that needs to be awaited.
93pub struct ShutdownDelegate(CancellationToken);
94
95impl ShutdownDelegate {
96 /// Ask for all tasks to quit
97 pub fn ask_shutdown(&self) {
98 self.0.cancel();
99 }
100
101 /// Ensure [`Self::ask_shutdown`] is called whenever we receive SIGHUP,
102 /// SIGTERM or SIGINT. Spawns a task that consumes self.
103 pub fn handle_quit_signals(self) -> Result<(), String> {
104 let err_str = |err| format!("could not register signal: {err}");
105
106 let mut sighup = signal(SignalKind::hangup()).map_err(err_str)?;
107 let mut sigint = signal(SignalKind::interrupt()).map_err(err_str)?;
108 let mut sigterm = signal(SignalKind::terminate()).map_err(err_str)?;
109
110 tokio::spawn(async move {
111 let signal = tokio::select! {
112 _ = sighup.recv() => "SIGHUP",
113 _ = sigint.recv() => "SIGINT",
114 _ = sigterm.recv() => "SIGTERM",
115 };
116 eprintln!("received {signal}, closing...");
117 self.ask_shutdown();
118 });
119 Ok(())
120 }
121}
122
123/// Created by a [`ShutdownController`].
124/// Serves two purposes:
125///
126/// - Wait for a shutdown request to happen with [`Self::wait`]
127/// - Keep track of the current task. While this token is held,
128/// [`ShutdownController::wait_all_task_shutdown`] will block.
129#[derive(Clone)]
130pub struct ShutdownToken {
131 shutdown_notifyer: CancellationToken,
132 _task_tracker_token: TaskTrackerToken,
133}
134
135impl ShutdownToken {
136 fn new(shutdown_notifyer: CancellationToken, _task_tracker_token: TaskTrackerToken) -> Self {
137 Self {
138 shutdown_notifyer,
139 _task_tracker_token,
140 }
141 }
142
143 /// Returns underlying [`CancellationToken`] and [`TaskTrackerToken`], consuming self.
144 pub fn split(self) -> (CancellationToken, TaskTrackerToken) {
145 (self.shutdown_notifyer, self._task_tracker_token)
146 }
147
148 /// Returns a future that will resolve only when a shutdown request happened.
149 pub fn wait(&self) -> WaitForCancellationFuture<'_> {
150 self.shutdown_notifyer.cancelled()
151 }
152
153 /// Returns true if the shutdown request happened
154 pub fn is_shutdown(&self) -> bool {
155 self.shutdown_notifyer.is_cancelled()
156 }
157
158 /// Ask for all tasks to quit
159 pub fn ask_shutdown(&self) {
160 self.shutdown_notifyer.cancel();
161 }
162}