use std::time::Duration;
use timer_lib::{RecurringSchedule, Timer, TimerEvent, TimerFinishReason, TimerRegistry};
use tokio::task::yield_now;
use tokio::time::{advance, Instant};
/// Yields to the tokio scheduler five times in a row.
///
/// The tests call this after moving the clock; it is presumably meant to let
/// the timer's background work make progress before assertions run.
async fn settle() {
    let mut remaining_yields = 5;
    while remaining_yields > 0 {
        yield_now().await;
        remaining_yields -= 1;
    }
}
/// Starts a one-shot timer from a plain closure, advances the paused clock by
/// the timer's delay, and checks that `join` reports the same run id with a
/// `Completed` finish reason.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn timer_closure_api_is_simple_to_use() {
    let delay = Duration::from_secs(1);
    let one_shot = Timer::new();
    let started_run = one_shot
        .start_once(delay, || async { Ok(()) })
        .await
        .unwrap();

    advance(delay).await;
    settle().await;

    let finished = one_shot.join().await.unwrap();
    assert_eq!(finished.run_id, started_run);
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Subscribes to a timer's event stream before starting a one-shot run and
/// checks that `Started` and `Tick` events carry the run id returned by
/// `start_once`, and that a finished event is delivered afterwards.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn timer_events_are_consumable_from_the_public_api() {
    let delay = Duration::from_secs(1);
    let timer = Timer::new();
    let mut feed = timer.subscribe();
    let expected_run = timer
        .start_once(delay, || async { Ok(()) })
        .await
        .unwrap();

    // The start event is awaited before the clock is moved.
    let started = feed.wait_started().await;
    assert!(matches!(
        started,
        Some(TimerEvent::Started { run_id: seen, .. }) if seen == expected_run
    ));

    advance(delay).await;
    settle().await;

    let ticked = feed.wait_tick().await;
    assert!(matches!(
        ticked,
        Some(TimerEvent::Tick { run_id: seen, .. }) if seen == expected_run
    ));

    let finished = feed.wait_finished().await;
    assert!(finished.is_some());
}
/// Drives a recurring timer through pause, resume and stop, checking that the
/// subscription's `wait_paused`, `wait_resumed` and `wait_stopped` helpers each
/// observe the matching event. The value from `wait_stopped` must equal the
/// value returned by `stop`.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn lifecycle_wait_helpers_are_consumable_from_the_public_api() {
    let schedule = RecurringSchedule::new(Duration::from_secs(2));
    let timer = Timer::new();
    let mut feed = timer.subscribe();
    timer
        .start_recurring(schedule, || async { Ok(()) })
        .await
        .unwrap();
    settle().await;

    timer.pause().await.unwrap();
    let paused = feed.wait_paused().await;
    assert!(matches!(paused, Some(TimerEvent::Paused { .. })));

    timer.resume().await.unwrap();
    let resumed = feed.wait_resumed().await;
    assert!(matches!(resumed, Some(TimerEvent::Resumed { .. })));

    let stop_result = timer.stop().await.unwrap();
    let stop_event = feed.wait_stopped().await.unwrap();
    assert_eq!(stop_event, stop_result);
}
/// Starts a one-shot timer directly through the registry, confirms the
/// returned id can be looked up with `get`, and checks that the returned timer
/// completes once the clock has advanced by its delay.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn registry_spawn_helpers_reduce_boilerplate() {
    let delay = Duration::from_secs(2);
    let registry = TimerRegistry::new();
    let (id, handle) = registry
        .start_once(delay, || async { Ok(()) })
        .await
        .unwrap();
    assert!(registry.get(id).await.is_some());

    advance(delay).await;
    settle().await;

    let finished = handle.join().await.unwrap();
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Builds and starts a one-shot timer via `Timer::once(..).start(..)` and
/// checks that it finishes with `Completed` after the clock advances by the
/// configured delay.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn builder_api_is_simple_to_use() {
    let delay = Duration::from_secs(3);
    let built = Timer::once(delay)
        .start(|| async { Ok(()) })
        .await
        .unwrap();

    advance(delay).await;
    settle().await;

    let finished = built.join().await.unwrap();
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Configures a recurring schedule with a 2 s initial delay and an expiration
/// count of one. After 1 s the execution count must still be zero; after one
/// more second the timer is expected to join with `Completed`.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn recurring_initial_delay_is_available_from_the_public_api() {
    let schedule = RecurringSchedule::new(Duration::from_secs(5))
        .with_initial_delay(Duration::from_secs(2))
        .with_expiration_count(1);
    let timer = Timer::recurring(schedule)
        .start(|| async { Ok(()) })
        .await
        .unwrap();

    // The initial delay is crossed in two equal steps.
    let half_of_initial_delay = Duration::from_secs(1);

    advance(half_of_initial_delay).await;
    settle().await;
    assert_eq!(timer.get_statistics().await.execution_count, 0);

    advance(half_of_initial_delay).await;
    settle().await;
    let finished = timer.join().await.unwrap();
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Starts a one-shot timer whose callback sleeps for 10 s while the builder's
/// callback timeout is 2 s. After advancing past the start delay and then the
/// timeout, the outcome must record one failed execution whose last error
/// reports `is_callback_timed_out()`.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn callback_timeout_is_available_from_the_public_api() {
    let start_delay = Duration::from_secs(1);
    let timeout = Duration::from_secs(2);
    let timer = Timer::once(start_delay)
        .callback_timeout(timeout)
        .start(|| async {
            tokio::time::sleep(Duration::from_secs(10)).await;
            Ok::<(), timer_lib::TimerError>(())
        })
        .await
        .unwrap();
    settle().await;

    // First cross the start delay, then the callback timeout, settling after each.
    for step in [start_delay, timeout] {
        advance(step).await;
        settle().await;
    }

    let outcome = timer.join().await.unwrap();
    let stats = &outcome.statistics;
    assert_eq!(stats.failed_executions, 1);
    let timed_out = stats
        .last_error
        .as_ref()
        .is_some_and(|error| error.is_callback_timed_out());
    assert!(timed_out);
}
/// Starts a one-shot timer with `max_retries(1)` whose callback fails on its
/// first invocation and succeeds afterwards. The callback must have been
/// invoked twice, and the statistics must show one failed and one successful
/// execution.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn retry_policy_is_available_from_the_public_api() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let attempts = Arc::new(AtomicUsize::new(0));
    let shared_counter = Arc::clone(&attempts);
    let delay = Duration::from_secs(1);

    let timer = Timer::once(delay)
        .max_retries(1)
        .start(move || {
            let counter = Arc::clone(&shared_counter);
            async move {
                // `fetch_add` returns the previous value, so 0 marks the first call.
                match counter.fetch_add(1, Ordering::SeqCst) {
                    0 => Err(timer_lib::TimerError::callback_failed("retry")),
                    _ => Ok::<(), timer_lib::TimerError>(()),
                }
            }
        })
        .await
        .unwrap();
    settle().await;

    advance(delay).await;
    settle().await;

    let outcome = timer.join().await.unwrap();
    assert_eq!(attempts.load(Ordering::SeqCst), 2);
    assert_eq!(outcome.statistics.failed_executions, 1);
    assert_eq!(outcome.statistics.successful_executions, 1);
}
/// Obtains a completion handle before starting a one-shot run, then checks
/// that `wait_for_run` yields an outcome with the same run id and a
/// `Completed` finish reason.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn completion_api_is_simple_to_consume() {
    let delay = Duration::from_secs(1);
    let timer = Timer::new();
    let mut waiter = timer.completion();
    let started_run = timer
        .start_once(delay, || async { Ok(()) })
        .await
        .unwrap();

    advance(delay).await;
    settle().await;

    let finished = waiter.wait_for_run(started_run).await.unwrap();
    assert_eq!(finished.run_id, started_run);
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Exercises the registry's bulk helpers with two one-shot timers: `contains`,
/// `stop` on a known id, `cancel` on an id that was never handed out,
/// `join_all`, `clear` and `is_empty`.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn registry_ergonomics_cover_common_bulk_operations() {
    let short_delay = Duration::from_secs(1);
    let long_delay = Duration::from_secs(2);
    let registry = TimerRegistry::new();

    let (first_id, _first_timer) = registry
        .start_once(short_delay, || async { Ok(()) })
        .await
        .unwrap();
    let (second_id, second_timer) = registry
        .start_once(long_delay, || async { Ok(()) })
        .await
        .unwrap();

    let first_is_registered = registry.contains(first_id).await;
    assert!(first_is_registered);
    assert!(registry.stop(first_id).await.unwrap().is_some());
    // This id was not returned by either `start_once` call above.
    assert!(registry.cancel(999_999).await.unwrap().is_none());

    advance(long_delay).await;
    settle().await;

    let joined = registry.join_all().await;
    let second_completed = joined.iter().any(|(id, result)| {
        *id == second_id && result.reason == TimerFinishReason::Completed
    });
    assert!(second_completed);

    let second_outcome = second_timer.join().await.unwrap();
    assert_eq!(second_outcome.reason, TimerFinishReason::Completed);

    let removed = registry.clear().await;
    assert_eq!(removed, 2);
    let now_empty = registry.is_empty().await;
    assert!(now_empty);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn registry_pause_and_resume_helpers_are_available_from_the_public_api() {
let registry = TimerRegistry::new();
let (timer_id, timer) = registry
.start_recurring(
RecurringSchedule::new(Duration::from_secs(2)).with_expiration_count(1),
|| async { Ok(()) },
)
.await
.unwrap();
settle().await;
assert!(registry.pause(timer_id).await.unwrap());
assert_eq!(timer.get_state().await, timer_lib::TimerState::Paused);
advance(Duration::from_secs(5)).await;
settle().await;
assert_eq!(timer.get_statistics().await.execution_count, 0);
registry.resume_all().await;
advance(Duration::from_secs(2)).await;
settle().await;
assert_eq!(
timer.join().await.unwrap().reason,
TimerFinishReason::Completed
);
}
/// Chains the `RecurringSchedule` builder methods (`with_initial_delay`,
/// `fixed_rate`, `with_expiration_count`) and checks that the resulting timer
/// completes after the clock advances by the initial delay.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn recurring_schedule_is_the_public_configuration_entry_point() {
    let initial_delay = Duration::from_secs(1);
    let schedule = RecurringSchedule::new(Duration::from_secs(3))
        .with_initial_delay(initial_delay)
        .fixed_rate()
        .with_expiration_count(1);
    let timer = Timer::recurring(schedule)
        .start(|| async { Ok(()) })
        .await
        .unwrap();
    settle().await;

    advance(initial_delay).await;
    settle().await;

    let finished = timer.join().await.unwrap();
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Schedules a callback at an absolute deadline 5 s from now. After 4 s the
/// execution count must still be zero; after the final second the timer is
/// expected to join with `Completed`.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn start_at_is_available_from_the_public_api() {
    let timer = Timer::new();
    let lead_time = Duration::from_secs(5);
    let deadline = Instant::now() + lead_time;
    timer.start_at(deadline, || async { Ok(()) }).await.unwrap();

    // Stop one second short of the deadline.
    advance(Duration::from_secs(4)).await;
    settle().await;
    assert_eq!(timer.get_statistics().await.execution_count, 0);

    // Cover the remaining second.
    advance(Duration::from_secs(1)).await;
    settle().await;
    let finished = timer.join().await.unwrap();
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}
/// Attaches a label and a tag through the builder and checks that both show up
/// in the timer's snapshot metadata. After inserting the timer into a registry,
/// `list` and `find_by_label` must both surface it by label.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn labels_and_registry_snapshots_are_available_from_the_public_api() {
    let labelled = Timer::once(Duration::from_secs(1))
        .label("billing")
        .tag("tenant", "acme")
        .start(|| async { Ok(()) })
        .await
        .unwrap();

    let snapshot = labelled.snapshot().await;
    let metadata = &snapshot.metadata;
    assert_eq!(metadata.label.as_deref(), Some("billing"));
    assert_eq!(
        metadata.tags.get("tenant").map(String::as_str),
        Some("acme")
    );

    let registry = TimerRegistry::new();
    let id = registry.insert(labelled.clone()).await;

    let listed = registry.list().await;
    let appears_in_listing = listed
        .iter()
        .any(|entry| entry.id == id && entry.metadata.label.as_deref() == Some("billing"));
    assert!(appears_in_listing);

    let found = registry.find_by_label("billing").await;
    assert_eq!(found, vec![id]);
}
/// Combines schedule jitter with a retry policy that uses a fixed backoff. The
/// callback fails on its first invocation and succeeds on the next, so the test
/// expects two invocations, a `Completed` outcome, and one failed plus one
/// successful execution in the statistics.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn jitter_and_retry_backoff_are_available_from_the_public_api() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let attempts = Arc::new(AtomicUsize::new(0));
    let shared_counter = Arc::clone(&attempts);

    let schedule = RecurringSchedule::new(Duration::from_secs(2))
        .with_jitter(Duration::from_secs(1))
        .with_expiration_count(1);
    let timer = Timer::recurring(schedule)
        .max_retries(1)
        .fixed_backoff(Duration::from_secs(3))
        .start(move || {
            let counter = Arc::clone(&shared_counter);
            async move {
                // `fetch_add` returns the previous value, so 0 marks the first call.
                match counter.fetch_add(1, Ordering::SeqCst) {
                    0 => Err(timer_lib::TimerError::callback_failed("boom")),
                    _ => Ok::<(), timer_lib::TimerError>(()),
                }
            }
        })
        .await
        .unwrap();

    // NOTE(review): 6 s presumably covers interval (2 s) + jitter (up to 1 s)
    // + backoff (3 s) — confirm against the library's scheduling rules.
    let whole_window = Duration::from_secs(6);
    advance(whole_window).await;
    settle().await;

    let outcome = timer.join().await.unwrap();
    assert_eq!(attempts.load(Ordering::SeqCst), 2);
    assert_eq!(outcome.reason, TimerFinishReason::Completed);
    assert_eq!(outcome.statistics.failed_executions, 1);
    assert_eq!(outcome.statistics.successful_executions, 1);
}
/// Only compiled with the `test-util` feature. Uses the mocked runtime returned
/// by `Timer::new_mocked` instead of tokio's paused clock: a callback is
/// scheduled 3 s ahead, must not have run after 2 s, and is expected to
/// complete once the mocked runtime has advanced one more second.
#[cfg(feature = "test-util")]
#[tokio::test(flavor = "current_thread")]
async fn mock_runtime_is_available_from_the_public_api() {
    let (timer, clock) = Timer::new_mocked();
    let lead_time = Duration::from_secs(3);
    let deadline = clock.now() + lead_time;
    timer.start_at(deadline, || async { Ok(()) }).await.unwrap();

    // Stop one second short of the deadline.
    clock.advance(Duration::from_secs(2)).await;
    assert_eq!(timer.get_statistics().await.execution_count, 0);

    // Cover the remaining second.
    clock.advance(Duration::from_secs(1)).await;
    let finished = timer.join().await.unwrap();
    assert_eq!(finished.reason, TimerFinishReason::Completed);
}