use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone)]
pub struct TaskHandle {
token: CancellationToken,
}
impl TaskHandle {
pub fn cancel(&self) {
self.token.cancel();
}
pub fn is_cancelled(&self) -> bool {
self.token.is_cancelled()
}
}
pub fn task_register(period: Duration, callback: Arc<dyn Fn() + Send + Sync>) -> TaskHandle {
let token = CancellationToken::new();
let child = token.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(period);
interval.tick().await;
loop {
tokio::select! {
() = child.cancelled() => break,
_ = interval.tick() => callback(),
}
}
});
TaskHandle { token }
}
pub fn task_schedule_once(delay: Duration, callback: Box<dyn FnOnce() + Send>) -> TaskHandle {
let token = CancellationToken::new();
let child = token.clone();
tokio::spawn(async move {
tokio::select! {
() = child.cancelled() => {}
() = tokio::time::sleep(delay) => {
callback();
}
}
});
TaskHandle { token }
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use super::*;
#[tokio::test(start_paused = true)]
async fn periodic_task_fires_multiple_times() {
let n = Arc::new(AtomicUsize::new(0));
let nn = n.clone();
let handle = task_register(
Duration::from_millis(5),
Arc::new(move || {
nn.fetch_add(1, Ordering::Relaxed);
}),
);
tokio::task::yield_now().await;
for _ in 0..8 {
tokio::time::advance(Duration::from_millis(5)).await;
tokio::task::yield_now().await;
}
handle.cancel();
let after_cancel = n.load(Ordering::Relaxed);
assert!(after_cancel >= 2, "expected >=2 fires, got {after_cancel}");
tokio::time::advance(Duration::from_millis(20)).await;
tokio::task::yield_now().await;
let final_count = n.load(Ordering::Relaxed);
assert!(
final_count <= after_cancel + 1,
"task fired after cancel: before={after_cancel} after={final_count}"
);
}
#[tokio::test]
async fn cancel_before_first_tick_suppresses_callback() {
let n = Arc::new(AtomicUsize::new(0));
let nn = n.clone();
let handle = task_register(
Duration::from_millis(50),
Arc::new(move || {
nn.fetch_add(1, Ordering::Relaxed);
}),
);
handle.cancel();
tokio::time::sleep(Duration::from_millis(80)).await;
assert_eq!(n.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn one_shot_fires_exactly_once() {
let n = Arc::new(AtomicUsize::new(0));
let nn = n.clone();
let _handle = task_schedule_once(
Duration::from_millis(5),
Box::new(move || {
nn.fetch_add(1, Ordering::Relaxed);
}),
);
tokio::time::sleep(Duration::from_millis(40)).await;
assert_eq!(n.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn one_shot_can_be_cancelled() {
let n = Arc::new(AtomicUsize::new(0));
let nn = n.clone();
let handle = task_schedule_once(
Duration::from_millis(50),
Box::new(move || {
nn.fetch_add(1, Ordering::Relaxed);
}),
);
handle.cancel();
tokio::time::sleep(Duration::from_millis(80)).await;
assert_eq!(n.load(Ordering::Relaxed), 0);
assert!(handle.is_cancelled());
}
}