use crate::task::{CallbackWrapper, CompletionReceiver, TaskCompletion, TimerTask};
use crate::timer::TimerWheel;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
/// Checks that a periodic task keeps firing and reports each run.
///
/// The task is built from two 50 ms durations and a callback that bumps a
/// shared counter. After sleeping 250 ms the test expects at least three
/// callback runs, and at least three `TaskCompletion::Called` notifications
/// already queued on the periodic completion receiver.
#[tokio::test]
async fn test_periodic_basic() {
    let timer = TimerWheel::with_defaults();
    let executions = Arc::new(AtomicU32::new(0));
    let callback_executions = Arc::clone(&executions);

    // Each invocation hands a fresh Arc clone to the future it returns.
    let callback = CallbackWrapper::new(move || {
        let executions = Arc::clone(&callback_executions);
        async move {
            executions.fetch_add(1, Ordering::SeqCst);
        }
    });
    let task = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(callback),
        None,
    );

    let reserved = timer.allocate_handle();
    // The handle half is bound to a named local so it lives until the test ends.
    let (mut rx, _task_handle) = timer.register(reserved, task).into_parts();

    tokio::time::sleep(Duration::from_millis(250)).await;

    let observed = executions.load(Ordering::SeqCst);
    assert!(
        observed >= 3,
        "Expected at least 3 executions, got {}",
        observed
    );

    let receiver = match &mut rx {
        CompletionReceiver::Periodic(receiver) => receiver,
        _ => panic!("Expected Periodic completion receiver"),
    };

    // Drain whatever is already queued; every entry must be a `Called` event.
    let delivered = std::iter::from_fn(|| receiver.try_recv().ok())
        .inspect(|completion| assert_eq!(*completion, TaskCompletion::Called))
        .count();
    assert!(
        delivered >= 3,
        "Expected at least 3 notifications, got {}",
        delivered
    );
}
/// Checks that cancelling a periodic task stops further callback runs.
///
/// The test waits 80 ms, requires at least one run, cancels through the
/// handle returned by `register`, then waits another 150 ms and allows at
/// most one additional run to have been counted after the cancel.
#[tokio::test]
async fn test_periodic_cancel() {
    let timer = TimerWheel::with_defaults();
    let runs = Arc::new(AtomicU32::new(0));
    let callback_runs = Arc::clone(&runs);

    let callback = CallbackWrapper::new(move || {
        let runs = Arc::clone(&callback_runs);
        async move {
            runs.fetch_add(1, Ordering::SeqCst);
        }
    });
    let task = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(callback),
        None,
    );

    let reserved = timer.allocate_handle();
    let handle = timer.register(reserved, task);

    tokio::time::sleep(Duration::from_millis(80)).await;
    let before = runs.load(Ordering::SeqCst);
    assert!(before >= 1, "Expected at least 1 execution before cancel");

    let cancelled = handle.cancel();
    assert!(cancelled);

    tokio::time::sleep(Duration::from_millis(150)).await;
    let after = runs.load(Ordering::SeqCst);

    // Tolerate a single extra run counted between the first sample and the cancel.
    let extra_runs = after - before;
    assert!(
        extra_runs <= 1,
        "Task should stop after cancel, before: {}, after: {}",
        before,
        after
    );
}
/// Checks that cancelling a periodic task emits a `Cancelled` notification.
///
/// After letting the task run for 80 ms the test cancels it, waits 50 ms,
/// and then scans the notifications queued on the periodic completion
/// receiver for a `TaskCompletion::Cancelled` entry.
#[tokio::test]
async fn test_periodic_cancel_notification() {
    let timer = TimerWheel::with_defaults();
    let runs = Arc::new(AtomicU32::new(0));
    let callback_runs = Arc::clone(&runs);

    let callback = CallbackWrapper::new(move || {
        let runs = Arc::clone(&callback_runs);
        async move {
            runs.fetch_add(1, Ordering::SeqCst);
        }
    });
    let task = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(callback),
        None,
    );

    let reserved = timer.allocate_handle();
    let (mut rx, handle) = timer.register(reserved, task).into_parts();

    tokio::time::sleep(Duration::from_millis(80)).await;
    let cancelled = handle.cancel();
    assert!(cancelled);
    tokio::time::sleep(Duration::from_millis(50)).await;

    let receiver = match &mut rx {
        CompletionReceiver::Periodic(receiver) => receiver,
        _ => panic!("Expected Periodic completion receiver"),
    };

    // Earlier notifications are skipped; scanning stops at the first
    // `Cancelled` entry or when the queue has nothing more to yield.
    let found_cancelled = std::iter::from_fn(|| receiver.try_recv().ok())
        .any(|completion| completion == TaskCompletion::Cancelled);
    assert!(
        found_cancelled,
        "Expected to receive Cancelled notification"
    );
}
/// Checks that postponing a periodic task delays its first run.
///
/// Right after registration the task is postponed by 150 ms with no
/// replacement callback. The counter must still be zero 80 ms in, and must
/// be at least one after a further 150 ms.
#[tokio::test]
async fn test_periodic_postpone() {
    let timer = TimerWheel::with_defaults();
    let runs = Arc::new(AtomicU32::new(0));
    let callback_runs = Arc::clone(&runs);

    let callback = CallbackWrapper::new(move || {
        let runs = Arc::clone(&callback_runs);
        async move {
            runs.fetch_add(1, Ordering::SeqCst);
        }
    });
    let task = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(callback),
        None,
    );

    let reserved = timer.allocate_handle();
    // Read the id before `register` takes the allocated handle by value.
    let task_id = reserved.task_id();
    let _task_handle = timer.register(reserved, task);

    let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
    assert!(postponed);

    tokio::time::sleep(Duration::from_millis(80)).await;
    let early = runs.load(Ordering::SeqCst);
    assert_eq!(early, 0, "Task should not trigger at original time");

    tokio::time::sleep(Duration::from_millis(150)).await;
    let late = runs.load(Ordering::SeqCst);
    assert!(
        late >= 1,
        "Task should trigger after postpone, got count: {}",
        late
    );
}
/// Checks that `postpone` can swap in a replacement callback.
///
/// The original callback adds 1 to a shared counter and the replacement adds
/// 10. After postponing by 100 ms and sleeping 200 ms, the counter must be
/// at least 10 and a multiple of 10, which is what the test uses as evidence
/// that only the replacement callback ran.
#[tokio::test]
async fn test_periodic_postpone_with_callback() {
    let timer = TimerWheel::with_defaults();
    let total = Arc::new(AtomicU32::new(0));
    let total_for_original = Arc::clone(&total);
    let total_for_replacement = Arc::clone(&total);

    let original_callback = CallbackWrapper::new(move || {
        let total = Arc::clone(&total_for_original);
        async move {
            total.fetch_add(1, Ordering::SeqCst);
        }
    });
    let task = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(original_callback),
        None,
    );

    let reserved = timer.allocate_handle();
    let task_id = reserved.task_id();
    let _task_handle = timer.register(reserved, task);

    // The replacement adds 10 so its runs are distinguishable from the original's.
    let replacement_callback = CallbackWrapper::new(move || {
        let total = Arc::clone(&total_for_replacement);
        async move {
            total.fetch_add(10, Ordering::SeqCst);
        }
    });
    let postponed = timer.postpone(
        task_id,
        Duration::from_millis(100),
        Some(replacement_callback),
    );
    assert!(postponed);

    tokio::time::sleep(Duration::from_millis(200)).await;

    let observed = total.load(Ordering::SeqCst);
    assert!(
        observed >= 10,
        "New callback should be used, got count: {}",
        observed
    );
    // Any run of the original callback would leave a non-zero remainder.
    let remainder = observed % 10;
    assert_eq!(
        remainder, 0,
        "Count should be multiple of 10, got: {}",
        observed
    );
}
/// Checks that completion notifications can be awaited as they arrive.
///
/// Rather than polling with `try_recv`, the test awaits `recv()` in a loop
/// raced against a 250 ms sleep. Every notification received must be
/// `TaskCompletion::Called`, and at least three must arrive before the sleep
/// finishes or the receiver yields `None`.
#[tokio::test]
async fn test_periodic_completion_receiver() {
    let timer = TimerWheel::with_defaults();
    let runs = Arc::new(AtomicU32::new(0));
    let callback_runs = Arc::clone(&runs);

    let callback = CallbackWrapper::new(move || {
        let runs = Arc::clone(&callback_runs);
        async move {
            runs.fetch_add(1, Ordering::SeqCst);
        }
    });
    let task = TimerTask::new_periodic(
        Duration::from_millis(50),
        Duration::from_millis(50),
        Some(callback),
        None,
    );

    let reserved = timer.allocate_handle();
    let (mut rx, _task_handle) = timer.register(reserved, task).into_parts();

    let mut received = 0;
    let receiver = match &mut rx {
        CompletionReceiver::Periodic(receiver) => receiver,
        _ => panic!("Expected Periodic completion receiver"),
    };

    // One sleep future bounds the whole loop; it is pinned so that it can be
    // polled by reference on every iteration.
    let deadline = tokio::time::sleep(Duration::from_millis(250));
    tokio::pin!(deadline);

    loop {
        tokio::select! {
            _ = &mut deadline => break,
            next = receiver.recv() => {
                if let Some(completion) = next {
                    assert_eq!(completion, TaskCompletion::Called);
                    received += 1;
                } else {
                    // `recv()` yielded `None`: nothing more will arrive.
                    break;
                }
            }
        }
    }

    assert!(
        received >= 3,
        "Expected at least 3 notifications, got {}",
        received
    );
}