use futures::future;
use kestrel_timer::config::ServiceConfig;
use kestrel_timer::{CallbackWrapper, CompletionReceiver, TimerTask, TimerWheel};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
#[tokio::test]
async fn test_large_scale_timers() {
let timer = Arc::new(TimerWheel::with_defaults());
let counter = Arc::new(AtomicU32::new(0));
const TIMER_COUNT: u32 = 10_000;
let start = Instant::now();
let mut futures = Vec::new();
for i in 0..TIMER_COUNT {
let timer_clone = Arc::clone(&timer);
let counter_clone = Arc::clone(&counter);
let delay = Duration::from_millis(10 + (i % 100) as u64);
let future = async move {
let task = TimerTask::new_oneshot(
delay,
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer_clone.allocate_handle();
timer_clone.register(handle, task)
};
futures.push(future);
}
future::join_all(futures).await;
println!(
"Creation of {} timers took: {:?}",
TIMER_COUNT,
start.elapsed()
);
tokio::time::sleep(Duration::from_millis(200)).await;
let count = counter.load(Ordering::SeqCst);
println!("Number of triggered timers: {}", count);
assert_eq!(count, TIMER_COUNT, "All timers should have been triggered");
}
#[tokio::test]
async fn test_timer_precision() {
let timer = TimerWheel::with_defaults();
let start_time = Arc::new(parking_lot::Mutex::new(None::<Instant>));
let end_time = Arc::new(parking_lot::Mutex::new(None::<Instant>));
*start_time.lock() = Some(Instant::now());
let end_clone = Arc::clone(&end_time);
let task = TimerTask::new_oneshot(
Duration::from_millis(100),
Some(CallbackWrapper::new(move || {
let end_time = Arc::clone(&end_clone);
async move {
*end_time.lock() = Some(Instant::now());
}
})),
);
let allocated_handle = timer.allocate_handle();
let handle = timer.register(allocated_handle, task);
let (rx, _handle) = handle.into_parts();
match rx {
CompletionReceiver::OneShot(receiver) => {
receiver.recv().await.unwrap();
}
_ => {}
}
tokio::time::sleep(Duration::from_millis(20)).await;
let start = start_time.lock().expect("start_time should be set");
let end = end_time
.lock()
.expect("end_time should be set after timer completion");
let elapsed = end.duration_since(start);
println!("Expected delay: 100ms, actual delay: {:?}", elapsed);
assert!(
elapsed >= Duration::from_millis(80) && elapsed <= Duration::from_millis(180),
"Timer precision is within acceptable range, actual delay: {:?}",
elapsed
);
}
#[tokio::test]
async fn test_concurrent_operations() {
let timer = Arc::new(TimerWheel::with_defaults());
let counter = Arc::new(AtomicU32::new(0));
let mut all_futures = Vec::new();
for _ in 0..5 {
for _ in 0..1000 {
let timer_clone = Arc::clone(&timer);
let counter_clone = Arc::clone(&counter);
let future = async move {
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer_clone.allocate_handle();
timer_clone.register(handle, task)
};
all_futures.push(future);
}
}
future::join_all(all_futures).await;
tokio::time::sleep(Duration::from_millis(150)).await;
let count = counter.load(Ordering::SeqCst);
println!("Number of triggered timers: {}", count);
assert_eq!(count, 5000, "All timers should have been triggered");
}
#[tokio::test]
async fn test_timer_with_different_delays() {
let timer = TimerWheel::with_defaults();
let results = Arc::new(parking_lot::Mutex::new(Vec::new()));
let delays = vec![10, 20, 30, 50, 100, 150, 200];
let mut handles = Vec::new();
for (idx, &delay_ms) in delays.iter().enumerate() {
let results_clone = Arc::clone(&results);
let task = TimerTask::new_oneshot(
Duration::from_millis(delay_ms),
Some(CallbackWrapper::new(move || {
let results = Arc::clone(&results_clone);
async move {
results.lock().push((idx, delay_ms));
}
})),
);
let allocated_handle = timer.allocate_handle();
let handle = timer.register(allocated_handle, task);
handles.push(handle);
}
for handle in handles {
let (rx, _handle) = handle.into_parts();
match rx {
CompletionReceiver::OneShot(receiver) => {
receiver.recv().await.unwrap();
}
_ => {}
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
let final_results = results.lock();
println!("Trigger order: {:?}", final_results);
assert_eq!(
final_results.len(),
delays.len(),
"All timers should have been triggered"
);
}
#[tokio::test]
async fn test_memory_efficiency() {
let timer = Arc::new(TimerWheel::with_defaults());
let mut create_futures = Vec::new();
for _ in 0..5000 {
let timer_clone = Arc::clone(&timer);
let future = async move {
let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
let handle = timer_clone.allocate_handle();
timer_clone.register(handle, task)
};
create_futures.push(future);
}
let handles = future::join_all(create_futures).await;
let cancelled_count = handles
.into_iter()
.map(|handle| handle.cancel())
.filter(|&success| success)
.count();
println!("Number of cancelled timers: {}", cancelled_count);
assert_eq!(cancelled_count, 5000);
}
#[tokio::test]
async fn test_batch_schedule() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
const BATCH_SIZE: usize = 100;
let start = Instant::now();
let handles = timer.allocate_handles(BATCH_SIZE);
let tasks: Vec<_> = (0..BATCH_SIZE)
.map(|i| {
let counter_clone = Arc::clone(&counter);
let delay = Duration::from_millis(50 + (i % 10) as u64);
let callback = Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
}));
TimerTask::new_oneshot(delay, callback)
})
.collect();
let batch = timer.register_batch(handles, tasks).unwrap();
println!(
"Batch scheduling of {} timers took: {:?}",
BATCH_SIZE,
start.elapsed()
);
assert_eq!(batch.len(), BATCH_SIZE);
tokio::time::sleep(Duration::from_millis(150)).await;
let count = counter.load(Ordering::SeqCst);
println!("Number of triggered timers: {}", count);
assert_eq!(
count, BATCH_SIZE as u32,
"All 100 timers should have been triggered"
);
}
#[tokio::test]
async fn test_batch_cancel() {
let timer = TimerWheel::with_defaults();
const TIMER_COUNT: usize = 500;
let handles = timer.allocate_handles(TIMER_COUNT);
let tasks: Vec<_> = (0..TIMER_COUNT)
.map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
.collect();
let batch = timer.register_batch(handles, tasks).unwrap();
assert_eq!(batch.len(), TIMER_COUNT);
let start = Instant::now();
let cancelled = batch.cancel_all();
let elapsed = start.elapsed();
println!("Batch canceling {} timers took: {:?}", TIMER_COUNT, elapsed);
assert_eq!(
cancelled, TIMER_COUNT,
"All 500 timers should have been successfully cancelled"
);
}
#[tokio::test]
async fn test_batch_cancel_partial() {
let timer = TimerWheel::with_defaults();
let allocated_handles = timer.allocate_handles(10);
let tasks: Vec<_> = (0..10)
.map(|_| TimerTask::new_oneshot(Duration::from_millis(100), None))
.collect();
let batch = timer.register_batch(allocated_handles, tasks).unwrap();
let mut handles = batch.into_handles();
let remaining_handles = handles.split_off(5);
let mut cancelled_count = 0;
for handle in handles {
if handle.cancel() {
cancelled_count += 1;
}
}
assert_eq!(cancelled_count, 5);
tokio::time::sleep(Duration::from_millis(200)).await;
let mut cancelled_after = 0;
for handle in remaining_handles {
if handle.cancel() {
cancelled_after += 1;
}
}
assert_eq!(
cancelled_after, 0,
"Triggered timers should not be cancelled"
);
}
#[tokio::test]
async fn test_batch_cancel_no_wait() {
let timer = TimerWheel::with_defaults();
let allocated_handles = timer.allocate_handles(100);
let tasks: Vec<_> = (0..100)
.map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
.collect();
let batch = timer.register_batch(allocated_handles, tasks).unwrap();
let start = Instant::now();
let _ = batch.cancel_all();
let elapsed = start.elapsed();
println!("Batch cancel (without waiting) took: {:?}", elapsed);
assert!(
elapsed < Duration::from_millis(10),
"Operation without waiting should be very fast"
);
}
#[tokio::test]
async fn test_postpone_single_timer() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let allocated_handle = timer.allocate_handle();
let task_id = allocated_handle.task_id();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer.register(allocated_handle, task);
let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
assert!(postponed, "Task should have been successfully postponed");
tokio::time::sleep(Duration::from_millis(70)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Task should not trigger at original time"
);
let (rx, _handle) = handle.into_parts();
let result = match rx {
CompletionReceiver::OneShot(receiver) => {
tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await.unwrap()
}
_ => panic!("Expected OneShot receiver"),
};
assert!(result.is_ok(), "Task should trigger at postponed time");
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"Task should have been executed once"
);
}
#[tokio::test]
async fn test_postpone_with_new_callback() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone1 = Arc::clone(&counter); let counter_clone2 = Arc::clone(&counter);
let allocated_handle = timer.allocate_handle();
let task_id = allocated_handle.task_id();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone1);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer.register(allocated_handle, task);
let postponed = timer.postpone(
task_id,
Duration::from_millis(100),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone2);
async move {
counter.fetch_add(10, Ordering::SeqCst);
}
})),
);
assert!(
postponed,
"Task should have been successfully postponed and replaced callback"
);
let (rx, _handle) = handle.into_parts();
let result = match rx {
CompletionReceiver::OneShot(receiver) => {
tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await.unwrap()
}
_ => panic!("Expected OneShot receiver"),
};
assert!(result.is_ok(), "Task should trigger");
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
10,
"New callback should have been executed (add 10 instead of 1)"
);
}
#[tokio::test]
async fn test_batch_postpone() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
const BATCH_SIZE: usize = 100;
let mut task_ids = Vec::new();
for _ in 0..BATCH_SIZE {
let counter_clone = Arc::clone(&counter);
let allocated_handle = timer.allocate_handle();
let task_id = allocated_handle.task_id();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
task_ids.push((task_id, Duration::from_millis(150)));
timer.register(allocated_handle, task);
}
let start = Instant::now();
let postponed = timer.postpone_batch(task_ids);
let elapsed = start.elapsed();
println!("Batch postponing {} timers took: {:?}", BATCH_SIZE, elapsed);
assert_eq!(
postponed, BATCH_SIZE,
"All tasks should have been successfully postponed"
);
tokio::time::sleep(Duration::from_millis(70)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Task should not trigger at original time"
);
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
BATCH_SIZE as u32,
"All tasks should have been executed"
);
}
#[tokio::test]
async fn test_postpone_batch_with_callbacks() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
const BATCH_SIZE: usize = 50;
let mut task_ids = Vec::new();
for _ in 0..BATCH_SIZE {
let allocated_handle = timer.allocate_handle();
let task_id = allocated_handle.task_id();
let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
task_ids.push(task_id);
timer.register(allocated_handle, task);
}
let updates: Vec<_> = task_ids
.into_iter()
.map(|id| {
let counter = Arc::clone(&counter);
(
id,
Duration::from_millis(150),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
)
})
.collect();
let start = Instant::now();
let postponed = timer.postpone_batch_with_callbacks(updates);
let elapsed = start.elapsed();
println!(
"Batch postponing and replacing callbacks {} timers took: {:?}",
BATCH_SIZE, elapsed
);
assert_eq!(
postponed, BATCH_SIZE,
"All tasks should have been successfully postponed"
);
tokio::time::sleep(Duration::from_millis(70)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Task should not trigger at original time"
);
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
BATCH_SIZE as u32,
"All new callbacks should have been executed"
);
}
#[tokio::test]
async fn test_postpone_multiple_times() {
let timer = TimerWheel::with_defaults();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let allocated_handle = timer.allocate_handle();
let task_id = allocated_handle.task_id();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = timer.register(allocated_handle, task);
assert!(timer.postpone(task_id, Duration::from_millis(100), None));
tokio::time::sleep(Duration::from_millis(60)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Task should not trigger after first postpone"
);
assert!(timer.postpone(task_id, Duration::from_millis(150), None));
tokio::time::sleep(Duration::from_millis(60)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Task should not trigger after second postpone"
);
assert!(timer.postpone(task_id, Duration::from_millis(100), None));
let (rx, _handle) = handle.into_parts();
let result = match rx {
CompletionReceiver::OneShot(receiver) => {
tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await.unwrap()
}
_ => panic!("Expected OneShot receiver"),
};
assert!(result.is_ok(), "Task should finally trigger");
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"Task should have been executed only once"
);
}
#[tokio::test]
async fn test_postpone_with_service() {
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let allocated_handle = service.allocate_handle();
let task_id = allocated_handle.task_id();
let task = TimerTask::new_oneshot(
Duration::from_millis(50),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
service.register(allocated_handle, task).unwrap();
let postponed = service.postpone(task_id, Duration::from_millis(150), None);
assert!(postponed, "Task should have been successfully postponed");
tokio::time::sleep(Duration::from_millis(70)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Task should not trigger at original time"
);
let rx = service.take_receiver().unwrap();
let result = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
assert!(result.is_ok(), "Should receive timeout notification");
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"Task should have been executed"
);
}
#[tokio::test]
async fn test_single_wheel_multiple_services() {
let timer = Arc::new(TimerWheel::with_defaults());
let counter = Arc::new(AtomicU32::new(0));
const SERVICE_COUNT: usize = 10;
const TASKS_PER_SERVICE: usize = 100;
let start = Instant::now();
let mut services = Vec::new();
for _ in 0..SERVICE_COUNT {
let service = timer.create_service(ServiceConfig::default());
services.push(service);
}
let mut handles = Vec::new();
for mut service in services {
let counter_clone = Arc::clone(&counter);
let handle = tokio::spawn(async move {
for i in 0..TASKS_PER_SERVICE {
let counter_inner = Arc::clone(&counter_clone);
let task = TimerTask::new_oneshot(
Duration::from_millis(50 + (i % 20) as u64),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_inner);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = service.allocate_handle();
if let Ok(_) = service.register(handle, task) {
}
}
service.take_receiver()
});
handles.push(handle);
}
let mut all_receivers = Vec::new();
for handle in handles {
if let Ok(receiver) = handle.await {
if let Some(rx) = receiver {
all_receivers.push(rx);
}
}
}
println!(
"From {} services concurrently creating {} timers took: {:?}",
SERVICE_COUNT,
SERVICE_COUNT * TASKS_PER_SERVICE,
start.elapsed()
);
tokio::time::sleep(Duration::from_millis(200)).await;
let count = counter.load(Ordering::SeqCst);
println!(
"Triggered timers: {} / {}",
count,
SERVICE_COUNT * TASKS_PER_SERVICE
);
assert_eq!(
count,
(SERVICE_COUNT * TASKS_PER_SERVICE) as u32,
"All timers from different services should have been correctly triggered"
);
}
#[tokio::test]
async fn test_multiple_services_concurrent_operations() {
let timer = Arc::new(TimerWheel::with_defaults());
let counter = Arc::new(AtomicU32::new(0));
const SERVICE_COUNT: usize = 5;
let start = Instant::now();
let mut handles = Vec::new();
for service_idx in 0..SERVICE_COUNT {
let timer_clone = Arc::clone(&timer);
let counter_clone = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let service = timer_clone.create_service(ServiceConfig::default());
let mut task_ids = Vec::new();
for _ in 0..100 {
let counter_inner = Arc::clone(&counter_clone);
let task = TimerTask::new_oneshot(
Duration::from_millis(100),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter_inner);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = service.allocate_handle();
let task_id = handle.task_id();
task_ids.push(task_id);
if let Ok(_) = service.register(handle, task) {
}
}
match service_idx % 3 {
0 => {
for task_id in task_ids.iter().step_by(2) {
service.postpone(*task_id, Duration::from_millis(150), None);
}
}
1 => {
for task_id in task_ids.iter().step_by(3) {
timer_clone.cancel(*task_id);
}
}
_ => {
}
}
(service_idx, task_ids.len())
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
println!(
"Multiple services concurrent operations took: {:?}",
start.elapsed()
);
tokio::time::sleep(Duration::from_millis(250)).await;
let count = counter.load(Ordering::SeqCst);
println!("Final triggered timer count: {}", count);
assert!(
count >= 400 && count <= 450,
"Considering cancellation operations, the number of triggered timers should be in a reasonable range, actual: {}",
count
);
}
#[tokio::test]
async fn test_service_isolation() {
let timer = Arc::new(TimerWheel::with_defaults());
let counter1 = Arc::new(AtomicU32::new(0));
let counter2 = Arc::new(AtomicU32::new(0));
let service1 = timer.create_service(ServiceConfig::default());
let service2 = timer.create_service(ServiceConfig::default());
let mut task_ids_1 = Vec::new();
for _ in 0..100 {
let counter = Arc::clone(&counter1);
let task = TimerTask::new_oneshot(
Duration::from_millis(80),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = service1.allocate_handle();
task_ids_1.push(handle.task_id());
service1.register(handle, task).unwrap();
}
for _ in 0..100 {
let counter = Arc::clone(&counter2);
let task = TimerTask::new_oneshot(
Duration::from_millis(80),
Some(CallbackWrapper::new(move || {
let counter = Arc::clone(&counter);
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})),
);
let handle = service2.allocate_handle();
service2.register(handle, task).unwrap();
}
for task_id in task_ids_1 {
timer.cancel(task_id);
}
tokio::time::sleep(Duration::from_millis(150)).await;
let count1 = counter1.load(Ordering::SeqCst);
let count2 = counter2.load(Ordering::SeqCst);
println!("Service 1 triggered count: {}", count1);
println!("Service 2 triggered count: {}", count2);
assert_eq!(count1, 0, "Service 1 all tasks should have been cancelled");
assert_eq!(count2, 100, "Service 2 tasks should not be affected");
}