use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
#[must_use]
pub fn poker(
interval: Duration,
cancel: CancellationToken,
) -> (Arc<Notify>, tokio::task::JoinHandle<()>) {
let notify = Arc::new(Notify::new());
let n = notify.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = cancel.cancelled() => break,
() = tokio::time::sleep(interval) => { n.notify_one(); }
}
}
});
(notify, handle)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
#[tokio::test]
async fn periodic_notification() {
let cancel = CancellationToken::new();
let (notify, handle) = poker(Duration::from_millis(1), cancel.clone());
notify.notified().await;
cancel.cancel();
drop(handle.await);
}
#[tokio::test]
async fn cancellation_stops_poker() {
let cancel = CancellationToken::new();
let (notify, handle) = poker(Duration::from_millis(1), cancel.clone());
notify.notified().await;
cancel.cancel();
drop(handle.await);
let counter = Arc::new(AtomicU32::new(0));
let counter_c = Arc::clone(&counter);
let notify_c = Arc::clone(¬ify);
let watcher_cancel = CancellationToken::new();
let watcher_cancel_c = watcher_cancel.clone();
let watcher = tokio::spawn(async move {
loop {
tokio::select! {
() = watcher_cancel_c.cancelled() => break,
() = notify_c.notified() => {
counter_c.fetch_add(1, Ordering::Relaxed);
}
}
}
});
tokio::task::yield_now().await;
tokio::task::yield_now().await;
watcher_cancel.cancel();
drop(watcher.await);
assert_eq!(
counter.load(Ordering::Relaxed),
0,
"no pokes should arrive after cancel + join"
);
}
#[tokio::test(start_paused = true)]
async fn stored_permit_semantics() {
let cancel = CancellationToken::new();
let (notify, handle) = poker(Duration::from_millis(1), cancel.clone());
tokio::task::yield_now().await;
tokio::task::yield_now().await;
notify.notified().await;
cancel.cancel();
drop(handle.await);
}
}