use tokio::sync::watch;
use super::ShutdownReceiver;
/// Owner of a shutdown flag that is shared with receivers through a
/// `tokio::sync::watch` channel.
///
/// The channel carries a single `bool`: it starts as `false` and is set to
/// `true` by `signal`.
#[derive(Debug)]
pub struct ShutdownWatch {
/// Sending half of the watch channel; new receivers are created from it on demand.
tx: watch::Sender<bool>,
}
impl ShutdownWatch {
    /// Creates a watch whose shutdown flag starts out as `false`.
    ///
    /// The receiver returned alongside the sender is not kept; receivers are
    /// handed out later through [`subscribe`](Self::subscribe) or
    /// [`raw_receiver`](Self::raw_receiver).
    pub fn new() -> Self {
        let (sender, _initial_receiver) = watch::channel(false);
        Self { tx: sender }
    }

    /// Returns a new [`ShutdownReceiver`] wrapping a fresh receiver of this watch.
    pub fn subscribe(&self) -> ShutdownReceiver {
        let receiver = self.raw_receiver();
        ShutdownReceiver::from_watch(receiver)
    }

    /// Sets the shutdown flag to `true`.
    ///
    /// The previous flag value returned by `send_replace` is intentionally
    /// discarded, so calling this more than once is allowed.
    pub fn signal(&self) {
        let _previous = self.tx.send_replace(true);
    }

    /// Returns the current value of the shutdown flag.
    pub fn is_shutdown(&self) -> bool {
        let signaled = *self.tx.borrow();
        signaled
    }

    /// Returns a new, unwrapped `watch::Receiver<bool>` subscribed to this watch.
    pub fn raw_receiver(&self) -> watch::Receiver<bool> {
        self.tx.subscribe()
    }
}
impl Default for ShutdownWatch {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::timeout;

    /// Upper bound on how long a receiver may take to observe the signal.
    const OBSERVE_TIMEOUT: Duration = Duration::from_millis(50);

    /// Receivers created before `signal` must all finish `wait_cancelled`.
    #[tokio::test]
    async fn signal_wakes_all_subscribers() {
        let shutdown = ShutdownWatch::new();
        let mut receivers = [
            shutdown.subscribe(),
            shutdown.subscribe(),
            shutdown.subscribe(),
        ];

        shutdown.signal();

        for receiver in receivers.iter_mut() {
            timeout(OBSERVE_TIMEOUT, receiver.wait_cancelled())
                .await
                .expect("subscriber did not observe signal");
        }
    }

    /// A receiver created after `signal` must still see the shutdown.
    #[tokio::test]
    async fn late_subscriber_sees_already_signaled() {
        let shutdown = ShutdownWatch::new();
        shutdown.signal();

        let mut late_receiver = shutdown.subscribe();
        timeout(OBSERVE_TIMEOUT, late_receiver.wait_cancelled())
            .await
            .expect("late subscriber did not observe prior signal");
    }

    /// Signaling several times in a row must leave the watch signaled.
    #[tokio::test]
    async fn signal_is_idempotent() {
        let shutdown = ShutdownWatch::new();
        for _ in 0..3 {
            shutdown.signal();
        }

        let mut receiver = shutdown.subscribe();
        timeout(OBSERVE_TIMEOUT, receiver.wait_cancelled())
            .await
            .expect("triple-signaled watch did not fire");
    }

    /// `is_shutdown` is `false` before `signal` and `true` afterwards.
    #[test]
    fn is_shutdown_reflects_state() {
        let shutdown = ShutdownWatch::new();
        assert!(!shutdown.is_shutdown());

        shutdown.signal();
        assert!(shutdown.is_shutdown());
    }
}