use tokio::sync::watch;
/// Receiving side of a shutdown flag carried over a `tokio::sync::watch` channel.
///
/// The channel value is a `bool`; the methods on this type treat `true` as
/// "cancelled" (see `is_cancelled` and `wait_cancelled`).
#[derive(Debug)]
pub struct ShutdownReceiver {
// Underlying watch receiver whose current value is the shutdown flag.
rx: watch::Receiver<bool>,
}
impl ShutdownReceiver {
pub(super) fn from_watch(rx: watch::Receiver<bool>) -> Self {
Self { rx }
}
pub async fn wait_cancelled(&mut self) {
if *self.rx.borrow() {
return;
}
while self.rx.changed().await.is_ok() {
if *self.rx.borrow() {
return;
}
}
}
pub fn is_cancelled(&self) -> bool {
*self.rx.borrow()
}
}
#[cfg(test)]
mod tests {
    // `ShutdownWatch` lives two modules up; its `new`, `subscribe` and `signal`
    // methods are defined outside this file.
    use super::super::ShutdownWatch;
    use std::time::Duration;

    /// A task blocked in `wait_cancelled` stays pending until `signal()` is
    /// called, and then completes within the timeout.
    #[tokio::test]
    async fn wait_cancelled_returns_on_signal() {
        let shutdown = ShutdownWatch::new();
        let mut receiver = shutdown.subscribe();

        let waiter = tokio::spawn(async move {
            receiver.wait_cancelled().await;
        });

        // Give the spawned task a chance to run; it must still be waiting.
        let settle = Duration::from_millis(10);
        tokio::time::sleep(settle).await;
        assert!(!waiter.is_finished());

        shutdown.signal();

        let deadline = Duration::from_millis(100);
        let joined = tokio::time::timeout(deadline, waiter).await;
        joined
            .expect("task did not wake after signal")
            .expect("task panicked");
    }

    /// A receiver subscribed after the signal was sent does not block.
    #[tokio::test]
    async fn wait_cancelled_returns_immediately_if_already_signaled() {
        let shutdown = ShutdownWatch::new();
        shutdown.signal();
        let mut receiver = shutdown.subscribe();

        let deadline = Duration::from_millis(10);
        let outcome = tokio::time::timeout(deadline, receiver.wait_cancelled()).await;
        outcome.expect("already-signaled receiver did not return immediately");
    }

    /// Losing a `select!` race leaves the receiver usable and still
    /// reporting "not cancelled".
    #[tokio::test]
    async fn wait_cancelled_survives_select() {
        let shutdown = ShutdownWatch::new();
        let mut receiver = shutdown.subscribe();

        let tick = Duration::from_millis(20);
        let winner = tokio::select! {
            _ = receiver.wait_cancelled() => "shutdown",
            _ = tokio::time::sleep(tick) => "tick",
        };

        assert_eq!(winner, "tick");
        assert!(!receiver.is_cancelled());
    }

    /// `is_cancelled` is `false` before `signal()` and `true` afterwards.
    #[tokio::test]
    async fn is_cancelled_reflects_state() {
        let shutdown = ShutdownWatch::new();
        let receiver = shutdown.subscribe();

        assert!(!receiver.is_cancelled());
        shutdown.signal();
        assert!(receiver.is_cancelled());
    }
}