1use log::{error, info, trace, warn};
2use std::time::Duration;
3use tokio::sync::broadcast;
4use tokio::sync::watch;
5use tokio::time::sleep;
6
7#[derive(Clone)]
9pub struct ShutdownReceiver {
10 shutdown_rx: watch::Receiver<bool>,
11 _shutdown_confirm_tx: broadcast::Sender<bool>,
12}
13
14impl ShutdownReceiver {
15 pub async fn watch(&mut self) {
20 if let Err(e) = self.shutdown_rx.changed().await {
21 error!(target:"rustydht_lib::ShutdownReceiver", "Error watching shutdown_rx. Sender has dropped? Err:{:?}", e);
22 }
23 }
24
25 pub fn spawn_with_shutdown<T>(
28 mut shutdown: ShutdownReceiver,
29 todo: T,
30 task_name: impl std::fmt::Display + Send + 'static + Sync,
31 timeout: Option<Duration>,
32 ) where
33 T: std::future::Future + Send + 'static,
34 T::Output: Send + 'static,
35 {
36 tokio::spawn(async move {
37 trace!(target: "rustydht_lib::ShutdownReceiver", "Task '{}' starting up", task_name);
38 tokio::select! {
39 _ = shutdown.watch() => {}
40 _ = todo => {}
41 _ = async {
42 match timeout {
43 Some(timeout) => {
44 sleep(timeout).await;
45 trace!(target: "rustydht_lib::ShutdownReceiver", "Task '{}' timed out", task_name);
46 }
47 None => {std::future::pending::<bool>().await;}
48 };
49 } => {}
50 }
51 trace!(target: "rustydht_lib::ShutdownReceiver", "Task '{}' terminating", task_name);
52 });
53 }
54}
55
56pub struct ShutdownSender {
58 shutdown_tx: watch::Sender<bool>,
59 shutdown_confirm_rx: broadcast::Receiver<bool>,
60}
61
62impl ShutdownSender {
63 pub async fn shutdown(&mut self) {
67 info!(target: "rustydht_lib::ShutdownSender", "Sending shutdown signal to tasks");
68 if let Err(e) = self.shutdown_tx.send(true) {
69 warn!(target: "rustydht_lib::ShutdownSender","Failed to send shutdown signal - likely all tasks are already stopped. Error: {:?}", e);
70 }
71 if self.shutdown_confirm_rx.recv().await.is_err() {
72 }
74 info!(target: "rustydht_lib::ShutdownSender","All tasks have stopped");
75 }
76}
77
78pub fn create_shutdown() -> (ShutdownSender, ShutdownReceiver) {
85 let (shutdown_tx, shutdown_rx) = watch::channel(false);
87
88 let (shutdown_confirm_tx, shutdown_confirm_rx) = broadcast::channel::<bool>(1);
90
91 (
92 ShutdownSender {
93 shutdown_tx,
94 shutdown_confirm_rx,
95 },
96 ShutdownReceiver {
97 shutdown_rx,
98 _shutdown_confirm_tx: shutdown_confirm_tx,
99 },
100 )
101}