pub mod config;
pub mod error;
mod service;
pub mod task;
pub mod timer;
pub mod wheel;
#[cfg(test)]
mod tests;
pub use service::{TaskNotification, TimerService};
pub use task::CompletionReceiver;
pub use task::{CallbackWrapper, TaskCompletion, TaskId, TimerTask};
pub use timer::TimerWheel;
pub use timer::handle::{
BatchHandle, BatchHandleWithCompletion, TimerHandle, TimerHandleWithCompletion,
};
pub use lite_sync::spsc;
#[cfg(test)]
mod integration_tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
#[tokio::test]
async fn test_basic_timer() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
timer.register(handle, task);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_multiple_timers() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
for i in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(
Duration::from_millis(10 * (i + 1)),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
timer.register(handle, task);
}
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(counter.load(Ordering::SeqCst), 10);
}
#[tokio::test]
async fn test_timer_cancellation() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let mut handles = Vec::new();
for _ in 0..5 {
let counter_clone = Arc::clone(&counter);
let alloc_handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(
Duration::from_millis(100),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer.register(alloc_handle, task);
handles.push(handle);
}
for i in 0..3 {
let cancel_result = handles[i].cancel();
assert!(cancel_result);
}
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_completion_notification_once() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let alloc_handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer.register(alloc_handle, task);
let (rx, _handle) = handle.into_parts();
match rx {
task::CompletionReceiver::OneShot(receiver) => {
receiver.recv().await.unwrap();
}
_ => panic!("Expected OneShot completion receiver"),
}
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_notify_only_timer_once() {
let timer = TimerWheel::with_defaults();
let alloc_handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
let handle = timer.register(alloc_handle, task);
let (rx, _handle) = handle.into_parts();
match rx {
task::CompletionReceiver::OneShot(receiver) => {
receiver.recv().await.unwrap();
}
_ => panic!("Expected OneShot completion receiver"),
}
}
#[tokio::test]
async fn test_batch_completion_notifications() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let handles = timer.allocate_handles(5);
let tasks: Vec<_> = (0..5)
.map(|i| {
let counter = Arc::clone(&counter);
let delay = Duration::from_millis(50 + i as u64 * 10);
let callback = CallbackWrapper::new(move || {
let counter = Arc::clone(&counter);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
});
TimerTask::new_oneshot(delay, Some(callback))
})
.collect();
let batch = timer
.register_batch(handles, tasks)
.expect("register_batch should succeed");
let (receivers, _batch_handle) = batch.into_parts();
for rx in receivers {
match rx {
task::CompletionReceiver::OneShot(receiver) => {
receiver.recv().await.unwrap();
}
_ => panic!("Expected OneShot completion receiver"),
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 5);
}
#[tokio::test]
async fn test_completion_reason_expired() {
let timer = TimerWheel::with_defaults();
let alloc_handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
let handle = timer.register(alloc_handle, task);
let (rx, _handle) = handle.into_parts();
let result = match rx {
task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
_ => panic!("Expected OneShot completion receiver"),
};
assert_eq!(result, TaskCompletion::Called);
}
#[tokio::test]
async fn test_completion_reason_cancelled() {
let timer = TimerWheel::with_defaults();
let alloc_handle = timer.allocate_handle();
let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
let handle = timer.register(alloc_handle, task);
let cancelled = handle.cancel();
assert!(cancelled);
let (rx, _handle) = handle.into_parts();
let result = match rx {
task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
_ => panic!("Expected OneShot completion receiver"),
};
assert_eq!(result, TaskCompletion::Cancelled);
}
#[tokio::test]
async fn test_batch_completion_reasons() {
let timer = TimerWheel::with_defaults();
let handles = timer.allocate_handles(5);
let tasks: Vec<_> = (0..5)
.map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
.collect();
let batch = timer
.register_batch(handles, tasks)
.expect("register_batch should succeed");
let task_ids: Vec<_> = batch.task_ids().to_vec();
let (mut receivers, _batch_handle) = batch.into_parts();
timer.cancel_batch(&task_ids[0..3]);
for rx in receivers.drain(0..3) {
let result = match rx {
task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
_ => panic!("Expected OneShot completion receiver"),
};
assert_eq!(result, TaskCompletion::Cancelled);
}
timer.cancel_batch(&task_ids[3..5]);
for rx in receivers {
let result = match rx {
task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
_ => panic!("Expected OneShot completion receiver"),
};
assert_eq!(result, TaskCompletion::Cancelled);
}
}
}