rocketmq_rust/
shutdown.rs1use tokio::sync::broadcast;
16use tracing::warn;
17
18pub struct Shutdown<T> {
19 is_shutdown: bool,
21
22 notify: broadcast::Receiver<T>,
24}
25
26impl<T> Shutdown<T>
27where
28 T: Clone,
29{
30 pub fn new(capacity: usize) -> (Shutdown<T>, broadcast::Sender<T>) {
32 let (tx, _) = broadcast::channel(capacity);
33 let shutdown = Shutdown {
34 is_shutdown: false,
35 notify: tx.subscribe(),
36 };
37 (shutdown, tx)
38 }
39
40 pub fn is_shutdown(&self) -> bool {
42 self.is_shutdown
43 }
44
45 pub async fn recv(&mut self) {
47 if self.is_shutdown {
50 return;
51 }
52
53 let result = self.notify.recv().await;
55 if result.is_err() {
56 warn!("Failed to receive shutdown signal");
57 }
58
59 self.is_shutdown = true;
61 }
62}
63
64#[cfg(test)]
65mod tests {
66 use super::*;
67
68 #[tokio::test]
69 async fn shutdown_signal_received() {
70 let (mut shutdown, sender) = Shutdown::new(1);
71 sender.send(()).unwrap();
72 shutdown.recv().await;
73 assert!(shutdown.is_shutdown());
74 }
75
76 #[tokio::test]
77 async fn shutdown_signal_not_received() {
78 let (shutdown, _) = Shutdown::<()>::new(1);
79 assert!(!shutdown.is_shutdown());
80 }
81
82 #[tokio::test]
83 async fn shutdown_signal_multiple_receivers() {
84 let (mut shutdown1, sender) = Shutdown::new(1);
85 sender.send(()).unwrap();
86 shutdown1.recv().await;
87
88 assert!(shutdown1.is_shutdown());
89 }
90
91 #[tokio::test]
92 async fn shutdown_signal_already_received() {
93 let (mut shutdown, sender) = Shutdown::new(1);
94 sender.send(()).unwrap();
95 shutdown.recv().await;
96 shutdown.recv().await; assert!(shutdown.is_shutdown());
98 }
99}