use tokio::sync::watch;
/// A cloneable cancellation signal built on a `tokio::sync::watch` channel.
///
/// The handle keeps both halves of a `watch` channel that carries a single
/// `bool` flag. Cloning the handle clones both halves, so every clone
/// observes and can set the same flag.
///
/// `Debug` is derived so the handle can be logged and embedded in other
/// `#[derive(Debug)]` types.
#[derive(Clone, Debug)]
pub struct CancellationHandle {
    /// Sending half, used to publish the cancellation flag.
    tx: watch::Sender<bool>,
    /// Receiving half, used to read the flag and to wait for it to change.
    rx: watch::Receiver<bool>,
}
impl CancellationHandle {
    /// Creates a handle whose cancellation flag starts out as `false`.
    pub fn new() -> Self {
        let (sender, receiver) = watch::channel(false);
        Self {
            tx: sender,
            rx: receiver,
        }
    }

    /// Sets the cancellation flag to `true`.
    ///
    /// Every call sends `true` again, so calling this repeatedly leaves the
    /// flag set. The result of the send is deliberately discarded.
    pub fn cancel(&self) {
        let _ = self.tx.send(true);
    }

    /// Returns the current value of the cancellation flag without waiting.
    pub fn is_cancelled(&self) -> bool {
        let flag = self.rx.borrow();
        *flag
    }

    /// Completes once the cancellation flag is observed to be `true`.
    ///
    /// Returns right away when the flag is already set. Otherwise it waits
    /// for change notifications on a private clone of the receiver and
    /// re-checks the flag after each one. It also returns if waiting for a
    /// change reports an error.
    pub async fn cancelled(&self) {
        // Work on a clone so that waiting does not require `&mut self`.
        let mut watcher = self.rx.clone();

        // The borrow taken in the loop condition is released before the
        // body runs, so no guard is held across the `.await`.
        while !*watcher.borrow() {
            let wait_result = watcher.changed().await;
            if wait_result.is_err() {
                break;
            }
        }
    }
}
impl Default for CancellationHandle {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A freshly created handle reports that it is not cancelled.
    #[tokio::test]
    async fn test_new_handle_not_cancelled() {
        let fresh = CancellationHandle::new();
        let cancelled = fresh.is_cancelled();
        assert!(!cancelled);
    }

    /// `cancel` makes `is_cancelled` return `true`.
    #[tokio::test]
    async fn test_cancel_sets_flag() {
        let subject = CancellationHandle::new();
        subject.cancel();
        assert!(subject.is_cancelled());
    }

    /// Cancelling twice leaves the handle cancelled.
    #[tokio::test]
    async fn test_cancel_idempotent() {
        let subject = CancellationHandle::new();
        for _ in 0..2 {
            subject.cancel();
        }
        assert!(subject.is_cancelled());
    }

    /// Cancelling the original handle is visible through a clone.
    #[tokio::test]
    async fn test_clone_shares_state() {
        let original = CancellationHandle::new();
        let observer = original.clone();
        original.cancel();
        assert!(observer.is_cancelled());
    }

    /// `cancelled()` completes after another task cancels a clone.
    #[tokio::test]
    async fn test_cancelled_future_resolves() {
        let waiter = CancellationHandle::new();
        let trigger = waiter.clone();

        // Cancel from a separate task after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            trigger.cancel();
        });

        let outcome = tokio::time::timeout(Duration::from_secs(1), waiter.cancelled()).await;
        outcome.expect("cancelled() should resolve within timeout");
    }

    /// `cancelled()` completes within the timeout when the flag is already set.
    #[tokio::test]
    async fn test_cancelled_future_immediate_if_already_cancelled() {
        let subject = CancellationHandle::new();
        subject.cancel();

        let outcome = tokio::time::timeout(Duration::from_millis(10), subject.cancelled()).await;
        outcome.expect("cancelled() should resolve immediately when already cancelled");
    }
}