use crate::config::{BatchConfig, WheelConfig};
use crate::task::{
CallbackWrapper, CompletionReceiver, TaskCompletion, TimerTask, TimerTaskWithCompletionNotifier,
};
use crate::wheel::Wheel;
use std::time::Duration;
/// Walks a periodic task through its basic life cycle: it fires, stays in the
/// wheel, fires again, and is gone once it has been cancelled.
#[test]
fn test_periodic_task_basic() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Both duration arguments are 50 ms; the callback does nothing.
    let period = Duration::from_millis(50);
    let noop = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(period, period, Some(noop), None);
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // First five ticks: one notification is expected.
    // NOTE(review): this presumes the default config ticks every 10 ms — confirm.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    let first = notifications.try_recv();
    assert!(first.is_ok(), "Should receive first notification");
    assert!(!wheel.is_empty(), "Periodic task should be reinserted");
    let still_indexed = wheel.task_index.get(task_id.key()).is_some();
    assert!(still_indexed, "Task should still be in index");

    // Next five ticks: a second notification is expected.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    let second = notifications.try_recv();
    assert!(second.is_ok(), "Should receive second notification");
    assert!(
        !wheel.is_empty(),
        "Periodic task should still be in the wheel"
    );

    // Cancelling must succeed and leave nothing behind.
    let cancelled = wheel.cancel(task_id);
    assert!(cancelled, "Should be able to cancel periodic task");
    assert!(wheel.is_empty(), "Wheel should be empty after cancellation");
}
/// Cancels a periodic task before any tick has elapsed and checks that a
/// `TaskCompletion::Cancelled` notification arrives, that the wheel is empty,
/// and that later ticks expire nothing.
#[test]
fn test_periodic_task_cancel() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let interval = Duration::from_millis(100);
    let periodic = TimerTask::new_periodic(
        interval,
        interval,
        Some(CallbackWrapper::new(|| async {})),
        None,
    );
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    let cancelled = wheel.cancel(task_id);
    assert!(cancelled, "Should successfully cancel");

    // The cancellation itself must be reported through the receiver.
    match notifications.try_recv() {
        Ok(reason) => assert_eq!(
            reason,
            TaskCompletion::Cancelled,
            "Should receive Cancelled notification"
        ),
        Err(_) => panic!("Should receive cancellation notification"),
    }
    assert!(wheel.is_empty(), "Wheel should be empty");

    // Ticking past the original deadline must not expire anything.
    for _tick in 0..20 {
        assert!(
            wheel.advance().is_empty(),
            "No tasks should expire after cancellation"
        );
    }
}
/// Advances the wheel 100 ticks with a 30 ms periodic task installed and
/// checks that at least three notifications were delivered in total.
#[test]
fn test_periodic_task_multiple_triggers() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
    let callback = CallbackWrapper::new(|| async {});
    let task = TimerTask::new_periodic(
        Duration::from_millis(30),
        Duration::from_millis(30),
        Some(callback),
        None,
    );
    let (task_with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(task);
    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, task_with_notifier);
    let mut rx = match completion_receiver {
        CompletionReceiver::Periodic(receiver) => receiver,
        _ => panic!("Expected periodic completion receiver"),
    };

    let mut trigger_count = 0;
    for _ in 0..100 {
        wheel.advance();
        // Drain every notification that became available on this tick; the
        // payload is irrelevant here, only the number of deliveries matters.
        while rx.try_recv().is_ok() {
            trigger_count += 1;
        }
    }
    assert!(
        trigger_count >= 3,
        "Should trigger at least 3 times, got {}",
        trigger_count
    );

    // Clean up so the periodic task does not outlive the assertions.
    wheel.cancel(task_id);
}
/// Checks that a periodic task with a 10 s period is indexed at level 1, both
/// when first inserted and after it has fired and been reinserted.
#[test]
fn test_periodic_task_cross_layer() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let long_period = Duration::from_secs(10);
    let noop = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(long_period, long_period, Some(noop), None);
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    assert_eq!(
        wheel.task_index.get(task_id.key()).unwrap().level,
        1,
        "Long interval task should be in L1"
    );

    // NOTE(review): 1001 ticks presumably covers the 10 s period with one
    // tick of slack at a 10 ms tick — confirm against the default config.
    (0..1001).for_each(|_| {
        wheel.advance();
    });

    let fired = notifications.try_recv();
    assert!(fired.is_ok(), "Should receive notification");

    let reinserted = wheel.task_index.get(task_id.key()).is_some();
    assert!(reinserted, "Task should be reinserted");
    assert_eq!(
        wheel.task_index.get(task_id.key()).unwrap().level,
        1,
        "Reinserted task should still be in L1"
    );

    wheel.cancel(task_id);
}
/// Inserts five periodic tasks, cancels them with a single `cancel_batch`
/// call, and checks that every receiver reports `TaskCompletion::Cancelled`
/// and that the wheel ends up empty.
#[test]
fn test_periodic_task_batch_cancel() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
    let mut task_ids = Vec::new();
    let mut receivers = Vec::new();
    for i in 0..5 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_periodic(
            Duration::from_millis(100 + i * 10),
            Duration::from_millis(100),
            Some(callback),
            None,
        );
        let (task_with_notifier, completion_receiver) =
            TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let task_id = handle.task_id();
        wheel.insert(handle, task_with_notifier);
        task_ids.push(task_id);
        // Fail loudly on an unexpected receiver kind. Silently skipping it
        // would leave `receivers` short and let the notification check below
        // pass without verifying every task.
        match completion_receiver {
            CompletionReceiver::Periodic(rx) => receivers.push(rx),
            _ => panic!("Expected periodic completion receiver"),
        }
    }

    let cancelled_count = wheel.cancel_batch(&task_ids);
    assert_eq!(cancelled_count, 5, "Should cancel all 5 tasks");

    // Every cancelled task must have reported the cancellation.
    for mut rx in receivers {
        match rx.try_recv() {
            Ok(reason) => assert_eq!(reason, TaskCompletion::Cancelled),
            Err(_) => panic!("Should receive cancellation notification"),
        }
    }
    assert!(wheel.is_empty(), "Wheel should be empty");
}
/// Uses different values for the two duration arguments (100 ms, then 50 ms)
/// and expects the first notification after ten ticks and the second one five
/// ticks later.
#[test]
fn test_periodic_task_with_initial_delay() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // NOTE(review): the first duration is presumably the initial delay and the
    // second the repeat interval — confirm against `TimerTask::new_periodic`.
    let first_duration = Duration::from_millis(100);
    let second_duration = Duration::from_millis(50);
    let noop = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(first_duration, second_duration, Some(noop), None);
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    (0..10).for_each(|_| {
        wheel.advance();
    });
    let first = notifications.try_recv();
    assert!(
        first.is_ok(),
        "Should receive first notification after initial delay"
    );

    (0..5).for_each(|_| {
        wheel.advance();
    });
    let second = notifications.try_recv();
    assert!(
        second.is_ok(),
        "Should receive second notification after interval"
    );

    wheel.cancel(task_id);
}
/// Postpones a 50 ms periodic task to 100 ms and checks three phases: nothing
/// after five ticks, a notification after ten, and another one five ticks
/// after that.
#[test]
fn test_periodic_task_postpone() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let period = Duration::from_millis(50);
    let noop = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(period, period, Some(noop), None);
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    let postponed = wheel.postpone(task_id, Duration::from_millis(100), None);
    assert!(postponed, "Should postpone periodic task");

    // Phase 1: five ticks in, the postponed task must still be silent.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    let too_early = notifications.try_recv();
    assert!(
        too_early.is_err(),
        "Should not receive notification before postponed time"
    );

    // Phase 2: five more ticks reach the postponed deadline.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    let at_deadline = notifications.try_recv();
    assert!(
        at_deadline.is_ok(),
        "Should receive notification at postponed time"
    );

    // Phase 3: another five ticks bring the next periodic firing.
    (0..5).for_each(|_| {
        wheel.advance();
    });
    let next_round = notifications.try_recv();
    assert!(
        next_round.is_ok(),
        "Should receive second notification after interval"
    );

    wheel.cancel(task_id);
}
/// Postpones a periodic task back and forth between wheel levels and checks
/// the level recorded in the task index after each move: 0 at insertion, 1
/// after postponing to 10 s, 0 again after postponing to 200 ms.
#[test]
fn test_periodic_task_postpone_cross_layer() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let period = Duration::from_millis(100);
    let noop = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(period, period, Some(noop), None);
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // Freshly inserted with a 100 ms delay.
    let location = wheel.task_index.get(task_id.key()).unwrap();
    assert_eq!(location.level, 0);

    // Pushed out to 10 s.
    let moved_up = wheel.postpone(task_id, Duration::from_secs(10), None);
    assert!(moved_up);
    let location = wheel.task_index.get(task_id.key()).unwrap();
    assert_eq!(location.level, 1);

    // Pulled back to 200 ms.
    let moved_down = wheel.postpone(task_id, Duration::from_millis(200), None);
    assert!(moved_down);
    let location = wheel.task_index.get(task_id.key()).unwrap();
    assert_eq!(location.level, 0);

    // Twenty ticks are expected to reach the 200 ms deadline.
    (0..20).for_each(|_| {
        wheel.advance();
    });
    let fired = notifications.try_recv();
    assert!(fired.is_ok(), "Should receive notification");

    wheel.cancel(task_id);
}
/// Inserts ten periodic tasks through `insert_batch`, ticks the wheel 200
/// times, and checks that all ten are still indexed until `cancel_batch`
/// removes them.
#[test]
fn test_periodic_task_batch_insert() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let handles = wheel.allocate_handles(10);
    let task_ids: Vec<_> = handles.iter().map(|handle| handle.task_id()).collect();

    // Ten tasks with first durations of 100, 110, ..., 190 ms and a 50 ms
    // second duration. Their completion receivers are dropped immediately.
    let tasks: Vec<_> = (0..10)
        .map(|i| {
            let noop = CallbackWrapper::new(|| async {});
            let periodic = TimerTask::new_periodic(
                Duration::from_millis(100 + i * 10),
                Duration::from_millis(50),
                Some(noop),
                None,
            );
            let (with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(periodic);
            with_notifier
        })
        .collect();

    wheel
        .insert_batch(handles, tasks)
        .expect("insert_batch should succeed");
    assert_eq!(wheel.task_index.len(), 10, "All tasks should be in index");

    // Expired tasks returned by `advance` are deliberately discarded.
    for _tick in 0..200 {
        let _ = wheel.advance();
    }
    assert_eq!(
        wheel.task_index.len(),
        10,
        "All periodic tasks should still be in wheel"
    );

    let cancelled = wheel.cancel_batch(&task_ids);
    assert_eq!(cancelled, 10, "Should cancel all periodic tasks");
    assert!(wheel.is_empty());
}
/// Postpones five 50 ms periodic tasks to 150 ms via `postpone_batch` and
/// checks that nothing expires in the first five ticks and that exactly five
/// expirations are reported over the following ten.
#[test]
fn test_periodic_task_batch_postpone() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Insert five identical tasks, keeping only their ids; the completion
    // receivers are dropped right away.
    let task_ids: Vec<_> = (0..5)
        .map(|_| {
            let period = Duration::from_millis(50);
            let noop = CallbackWrapper::new(|| async {});
            let periodic = TimerTask::new_periodic(period, period, Some(noop), None);
            let (with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(periodic);
            let handle = wheel.allocate_handle();
            let task_id = handle.task_id();
            wheel.insert(handle, with_notifier);
            task_id
        })
        .collect();

    let updates: Vec<_> = task_ids
        .iter()
        .copied()
        .map(|id| (id, Duration::from_millis(150)))
        .collect();
    let postponed_count = wheel.postpone_batch(updates);
    assert_eq!(postponed_count, 5, "Should postpone all 5 periodic tasks");

    // The original 50 ms deadline must no longer fire.
    for _tick in 0..5 {
        assert!(
            wheel.advance().is_empty(),
            "Tasks should not trigger before postponed time"
        );
    }

    // Count everything that expires during the next ten ticks.
    let total_triggered = (0..10).fold(0, |sum, _| sum + wheel.advance().len());
    assert_eq!(
        total_triggered, 5,
        "All 5 tasks should trigger at postponed time"
    );

    wheel.cancel_batch(&task_ids);
}
/// Runs five one-shot and five periodic tasks side by side and checks that,
/// after fifteen ticks, ten expirations were reported, the one-shot tasks are
/// gone from the index, and the periodic ones are still present.
#[test]
fn test_mixed_oneshot_and_periodic_tasks() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // One-shot tasks with delays of 100, 110, ..., 140 ms.
    let mut oneshot_ids = Vec::new();
    for i in 0..5 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
        let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let task_id = handle.task_id();
        wheel.insert(handle, task_with_notifier);
        oneshot_ids.push(task_id);
    }

    // Periodic tasks, all built with 100 ms for both duration arguments.
    // Their receivers are stored (never read) for the rest of the test.
    let mut periodic_ids = Vec::new();
    let mut periodic_receivers = Vec::new();
    for _ in 0..5 {
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_periodic(
            Duration::from_millis(100),
            Duration::from_millis(100),
            Some(callback),
            None,
        );
        let (task_with_notifier, completion_receiver) =
            TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let task_id = handle.task_id();
        wheel.insert(handle, task_with_notifier);
        periodic_ids.push(task_id);
        // Fail loudly on an unexpected receiver kind instead of silently
        // skipping it, matching the other periodic-task tests.
        match completion_receiver {
            CompletionReceiver::Periodic(rx) => periodic_receivers.push(rx),
            _ => panic!("Expected periodic completion receiver"),
        }
    }
    assert_eq!(wheel.task_index.len(), 10, "Should have 10 tasks total");

    // Fifteen ticks: every one-shot task and the first round of every
    // periodic task are expected to expire.
    let mut total_expired = 0;
    for _ in 0..15 {
        let expired = wheel.advance();
        total_expired += expired.len();
    }
    assert_eq!(
        total_expired, 10,
        "Should have triggered oneshot and periodic tasks"
    );
    assert_eq!(
        wheel.task_index.len(),
        5,
        "Only periodic tasks should remain"
    );
    for id in &oneshot_ids {
        assert!(
            wheel.task_index.get(id.key()).is_none(),
            "Oneshot task should be removed"
        );
    }
    for id in &periodic_ids {
        assert!(
            wheel.task_index.get(id.key()).is_some(),
            "Periodic task should still be present"
        );
    }

    wheel.cancel_batch(&periodic_ids);
    assert!(wheel.is_empty());
}
/// Postpones a periodic task to 100 ms while handing `postpone` a replacement
/// callback, then checks that a notification arrives after ten ticks.
#[test]
fn test_periodic_task_postpone_with_callback() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    let period = Duration::from_millis(50);
    let old_callback = CallbackWrapper::new(|| async {});
    let periodic = TimerTask::new_periodic(period, period, Some(old_callback), None);
    let (with_notifier, completion_receiver) =
        TimerTaskWithCompletionNotifier::from_timer_task(periodic);

    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, with_notifier);

    let mut notifications = if let CompletionReceiver::Periodic(receiver) = completion_receiver {
        receiver
    } else {
        panic!("Expected periodic completion receiver")
    };

    // Postpone and pass a new callback in the same call.
    let new_callback = CallbackWrapper::new(|| async {});
    let postponed = wheel.postpone(task_id, Duration::from_millis(100), Some(new_callback));
    assert!(postponed);

    (0..10).for_each(|_| {
        wheel.advance();
    });
    let fired = notifications.try_recv();
    assert!(fired.is_ok(), "Should receive notification with new callback");

    wheel.cancel(task_id);
}