use tokio::sync::broadcast;
use tracing::warn;
/// Listens for a shutdown signal delivered over a `tokio` broadcast channel.
///
/// The signal is considered received after the first completed call to
/// `recv`; from then on `is_shutdown` reports `true` and further `recv`
/// calls return immediately.
pub struct Shutdown<T> {
    /// Receiving half of the broadcast channel the signal arrives on.
    notify: broadcast::Receiver<T>,
    /// `true` once a `recv` call has run to completion.
    is_shutdown: bool,
}
impl<T> Shutdown<T>
where
T: Clone,
{
pub fn new(capacity: usize) -> (Shutdown<T>, broadcast::Sender<T>) {
let (tx, _) = broadcast::channel(capacity);
let shutdown = Shutdown {
is_shutdown: false,
notify: tx.subscribe(),
};
(shutdown, tx)
}
pub fn is_shutdown(&self) -> bool {
self.is_shutdown
}
pub async fn recv(&mut self) {
if self.is_shutdown {
return;
}
let result = self.notify.recv().await;
if result.is_err() {
warn!("Failed to receive shutdown signal");
}
self.is_shutdown = true;
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent before `recv` is awaited flips the shutdown flag.
    #[tokio::test]
    async fn shutdown_signal_received() {
        let (mut shutdown, sender) = Shutdown::new(1);
        sender.send(()).unwrap();
        shutdown.recv().await;
        assert!(shutdown.is_shutdown());
    }

    /// A freshly created `Shutdown` reports that no signal has been seen.
    #[tokio::test]
    async fn shutdown_signal_not_received() {
        let (shutdown, _) = Shutdown::<()>::new(1);
        assert!(!shutdown.is_shutdown());
    }

    /// Every listener subscribed to the same sender observes the signal.
    #[tokio::test]
    async fn shutdown_signal_multiple_receivers() {
        let (mut shutdown1, sender) = Shutdown::new(1);
        // Second listener on the same channel. The private fields are
        // reachable here because this is a child module of the defining one.
        let mut shutdown2 = Shutdown {
            is_shutdown: false,
            notify: sender.subscribe(),
        };

        sender.send(()).unwrap();

        shutdown1.recv().await;
        shutdown2.recv().await;
        assert!(shutdown1.is_shutdown());
        assert!(shutdown2.is_shutdown());
    }

    /// A second `recv` after the signal returns immediately instead of
    /// waiting for another message.
    #[tokio::test]
    async fn shutdown_signal_already_received() {
        let (mut shutdown, sender) = Shutdown::new(1);
        sender.send(()).unwrap();
        shutdown.recv().await;
        shutdown.recv().await;
        assert!(shutdown.is_shutdown());
    }

    /// Dropping the only sender ends the wait and counts as a shutdown.
    #[tokio::test]
    async fn shutdown_when_sender_dropped() {
        let (mut shutdown, sender) = Shutdown::<()>::new(1);
        drop(sender);
        shutdown.recv().await;
        assert!(shutdown.is_shutdown());
    }
}