use crate::task::{CallbackWrapper, CompletionReceiver, TimerTask};
use crate::timer::TimerWheel;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
#[tokio::test]
async fn test_postpone_timer() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let allocate_handle = timer.allocate_handle();
let task_id = allocate_handle.task_id();
let handle = timer.register(allocate_handle, task);
let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
assert!(postponed);
tokio::time::sleep(Duration::from_millis(70)).await;
assert_eq!(counter.load(Ordering::SeqCst), 0);
let (rx, _handle) = handle.into_parts();
let result = match rx {
CompletionReceiver::OneShot(receiver) => {
tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await.unwrap()
}
_ => panic!("Expected OneShot completion receiver"),
};
assert!(result.is_ok());
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_postpone_with_callback() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone1 = Arc::clone(&counter);
let counter_clone2 = Arc::clone(&counter);
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone1);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let allocate_handle = timer.allocate_handle();
let task_id = allocate_handle.task_id();
let handle = timer.register(allocate_handle, task);
let postponed = timer.postpone(
task_id,
Duration::from_millis(100),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone2);
async move {
counter.fetch_add(10, Ordering::SeqCst);
}
})),
);
assert!(postponed);
let (rx, _handle) = handle.into_parts();
let result = match rx {
CompletionReceiver::OneShot(receiver) => {
tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await.unwrap()
}
_ => panic!("Expected OneShot completion receiver"),
};
assert!(result.is_ok());
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(counter.load(Ordering::SeqCst), 10);
}
#[tokio::test]
async fn test_postpone_keeps_completion_receiver_valid() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let allocate_handle = timer.allocate_handle();
let task_id = allocate_handle.task_id();
let handle = timer.register(allocate_handle, task);
timer.postpone(task_id, Duration::from_millis(100), None);
let (rx, _handle) = handle.into_parts();
let result = match rx {
CompletionReceiver::OneShot(receiver) => {
tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await.unwrap()
}
_ => panic!("Expected OneShot completion receiver"),
};
assert!(
result.is_ok(),
"Completion receiver should still work after postpone"
);
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}