rustydht_lib/
shutdown.rs

1use log::{error, info, trace, warn};
2use std::time::Duration;
3use tokio::sync::broadcast;
4use tokio::sync::watch;
5use tokio::time::sleep;
6
7/// Contains methods to wait for a "clean shutdown" signal in asynchronous tasks.
8#[derive(Clone)]
9pub struct ShutdownReceiver {
10    shutdown_rx: watch::Receiver<bool>,
11    _shutdown_confirm_tx: broadcast::Sender<bool>,
12}
13
14impl ShutdownReceiver {
15    /// Waits for this ShutdownReceiver's corresponding ShutdownSender to signal
16    /// that it's time to shutdown. Doesn't return until then.
17    ///
18    /// The ShutdownReceiver MUST be dropped as a result of this method returning.
19    pub async fn watch(&mut self) {
20        if let Err(e) = self.shutdown_rx.changed().await {
21            error!(target:"rustydht_lib::ShutdownReceiver", "Error watching shutdown_rx. Sender has dropped? Err:{:?}", e);
22        }
23    }
24
25    /// Spawn a new async task that will automatically be dropped when the provided
26    /// ShutdownReceiver is signaled.
27    pub fn spawn_with_shutdown<T>(
28        mut shutdown: ShutdownReceiver,
29        todo: T,
30        task_name: impl std::fmt::Display + Send + 'static + Sync,
31        timeout: Option<Duration>,
32    ) where
33        T: std::future::Future + Send + 'static,
34        T::Output: Send + 'static,
35    {
36        tokio::spawn(async move {
37            trace!(target: "rustydht_lib::ShutdownReceiver", "Task '{}' starting up", task_name);
38            tokio::select! {
39                _ = shutdown.watch() => {}
40                _ = todo => {}
41                _ = async {
42                    match timeout {
43                        Some(timeout) => {
44                            sleep(timeout).await;
45                            trace!(target: "rustydht_lib::ShutdownReceiver", "Task '{}' timed out", task_name);
46                        }
47                        None => {std::future::pending::<bool>().await;}
48                    };
49                } => {}
50            }
51            trace!(target: "rustydht_lib::ShutdownReceiver", "Task '{}' terminating", task_name);
52        });
53    }
54}
55
56/// Contains methods to send a "clean shutdown" signal to asynchronous tasks.
57pub struct ShutdownSender {
58    shutdown_tx: watch::Sender<bool>,
59    shutdown_confirm_rx: broadcast::Receiver<bool>,
60}
61
62impl ShutdownSender {
63    /// Signals all async tasks waiting on the corresponding [ShutdownReceiver](crate::shutdown::ShutdownReceiver) to stop.
64    ///
65    /// Awaits until they have all shutdown (technically, until all corresponding ShutdownReceivers have been dropped).
66    pub async fn shutdown(&mut self) {
67        info!(target: "rustydht_lib::ShutdownSender", "Sending shutdown signal to tasks");
68        if let Err(e) = self.shutdown_tx.send(true) {
69            warn!(target: "rustydht_lib::ShutdownSender","Failed to send shutdown signal - likely all tasks are already stopped. Error: {:?}", e);
70        }
71        if self.shutdown_confirm_rx.recv().await.is_err() {
72            // This error is expected
73        }
74        info!(target: "rustydht_lib::ShutdownSender","All tasks have stopped");
75    }
76}
77
78/// Create a linked ShutdownSender and ShutdownReceiver pair. The receiver's
79/// [watch](crate::shutdown::ShutdownReceiver::watch) method will not return
80/// until the ShutdownSender's [shutdown](crate::shutdown::ShutdownSender::shutdown)
81/// method is called.
82///
83/// The ShutdownReceiver can (and should) be cloned and reused across many async tasks.
84pub fn create_shutdown() -> (ShutdownSender, ShutdownReceiver) {
85    // We use this channel to send a shutdown notification to everybody
86    let (shutdown_tx, shutdown_rx) = watch::channel(false);
87
88    // We use this channel to determine when all tasks have shutdown
89    let (shutdown_confirm_tx, shutdown_confirm_rx) = broadcast::channel::<bool>(1);
90
91    (
92        ShutdownSender {
93            shutdown_tx,
94            shutdown_confirm_rx,
95        },
96        ShutdownReceiver {
97            shutdown_rx,
98            _shutdown_confirm_tx: shutdown_confirm_tx,
99        },
100    )
101}