use async_trait::async_trait;
use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, watch, Mutex};
use tokio::task::JoinHandle;
use tokio::time::Instant;
#[cfg(feature = "logging")]
use log::debug;
use crate::errors::TimerError;
pub(crate) mod driver;
mod runtime;
#[cfg(test)]
mod tests;
#[cfg(feature = "test-util")]
pub use driver::MockRuntime;
const TIMER_EVENT_BUFFER: usize = 64;
fn saturating_mul_duration(duration: Duration, multiplier: u32) -> Duration {
let nanos = duration.as_nanos();
let scaled = nanos.saturating_mul(multiplier as u128);
let capped = scaled.min(u64::MAX as u128) as u64;
Duration::from_nanos(capped)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerState {
Running,
Paused,
Stopped,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerFinishReason {
Completed,
Stopped,
Cancelled,
Replaced,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TimerStatistics {
pub execution_count: usize,
pub successful_executions: usize,
pub failed_executions: usize,
pub elapsed_time: Duration,
pub last_error: Option<TimerError>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerOutcome {
pub run_id: u64,
pub reason: TimerFinishReason,
pub statistics: TimerStatistics,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TimerMetadata {
pub label: Option<String>,
pub tags: BTreeMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerSnapshot {
pub state: TimerState,
pub interval: Duration,
pub expiration_count: Option<usize>,
pub statistics: TimerStatistics,
pub last_outcome: Option<TimerOutcome>,
pub metadata: TimerMetadata,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecurringCadence {
FixedDelay,
FixedRate,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecurringSchedule {
interval: Duration,
initial_delay: Option<Duration>,
cadence: RecurringCadence,
expiration_count: Option<usize>,
jitter: Option<Duration>,
}
impl RecurringSchedule {
pub fn new(interval: Duration) -> Self {
Self {
interval,
initial_delay: None,
cadence: RecurringCadence::FixedDelay,
expiration_count: None,
jitter: None,
}
}
pub fn interval(self) -> Duration {
self.interval
}
pub fn initial_delay(self) -> Option<Duration> {
self.initial_delay
}
pub fn cadence(self) -> RecurringCadence {
self.cadence
}
pub fn expiration_count(self) -> Option<usize> {
self.expiration_count
}
pub fn jitter(self) -> Option<Duration> {
self.jitter
}
pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
self.initial_delay = Some(initial_delay);
self
}
pub fn with_cadence(mut self, cadence: RecurringCadence) -> Self {
self.cadence = cadence;
self
}
pub fn fixed_delay(mut self) -> Self {
self.cadence = RecurringCadence::FixedDelay;
self
}
pub fn fixed_rate(mut self) -> Self {
self.cadence = RecurringCadence::FixedRate;
self
}
pub fn with_expiration_count(mut self, expiration_count: usize) -> Self {
self.expiration_count = Some(expiration_count);
self
}
pub fn with_jitter(mut self, jitter: Duration) -> Self {
self.jitter = Some(jitter);
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
max_retries: usize,
backoff: RetryBackoff,
}
impl RetryPolicy {
pub fn new(max_retries: usize) -> Self {
Self {
max_retries,
backoff: RetryBackoff::Immediate,
}
}
pub fn max_retries(self) -> usize {
self.max_retries
}
pub fn backoff(self) -> RetryBackoff {
self.backoff
}
pub fn with_backoff(mut self, backoff: RetryBackoff) -> Self {
self.backoff = backoff;
self
}
fn delay_for_retry(self, retry_number: usize) -> Duration {
self.backoff.delay_for_retry(retry_number)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryBackoff {
Immediate,
Fixed(Duration),
Linear(Duration),
Exponential(Duration),
}
impl RetryBackoff {
fn delay_for_retry(self, retry_number: usize) -> Duration {
match self {
Self::Immediate => Duration::ZERO,
Self::Fixed(delay) => delay,
Self::Linear(step) => saturating_mul_duration(step, retry_number as u32),
Self::Exponential(base) => {
let exponent = retry_number.saturating_sub(1) as u32;
let multiplier = 1_u32.checked_shl(exponent.min(31)).unwrap_or(u32::MAX);
saturating_mul_duration(base, multiplier)
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimerEvent {
Started {
run_id: u64,
interval: Duration,
recurring: bool,
expiration_count: Option<usize>,
metadata: TimerMetadata,
},
Paused {
run_id: u64,
},
Resumed {
run_id: u64,
},
IntervalAdjusted {
run_id: u64,
interval: Duration,
},
Tick {
run_id: u64,
statistics: TimerStatistics,
},
CallbackFailed {
run_id: u64,
error: TimerError,
statistics: TimerStatistics,
},
Finished(TimerOutcome),
}
pub struct TimerEvents {
receiver: broadcast::Receiver<TimerEvent>,
}
impl TimerEvents {
pub fn try_recv(&mut self) -> Option<TimerEvent> {
loop {
match self.receiver.try_recv() {
Ok(event) => return Some(event),
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(broadcast::error::TryRecvError::Empty)
| Err(broadcast::error::TryRecvError::Closed) => return None,
}
}
}
pub async fn recv(&mut self) -> Option<TimerEvent> {
loop {
match self.receiver.recv().await {
Ok(event) => return Some(event),
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub async fn wait_started(&mut self) -> Option<TimerEvent> {
loop {
if let event @ TimerEvent::Started { .. } = self.recv().await? {
return Some(event);
}
}
}
pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
loop {
if let event @ TimerEvent::Tick { .. } = self.recv().await? {
return Some(event);
}
}
}
pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
loop {
if let event @ TimerEvent::Paused { .. } = self.recv().await? {
return Some(event);
}
}
}
pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
loop {
if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
return Some(event);
}
}
}
pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
loop {
if let TimerEvent::Finished(outcome) = self.recv().await? {
return Some(outcome);
}
}
}
pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
self.wait_finished_reason(TimerFinishReason::Stopped).await
}
pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
self.wait_finished_reason(TimerFinishReason::Cancelled)
.await
}
async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
loop {
let outcome = self.wait_finished().await?;
if outcome.reason == reason {
return Some(outcome);
}
}
}
}
pub struct TimerCompletion {
receiver: watch::Receiver<Option<TimerOutcome>>,
}
impl TimerCompletion {
pub fn latest(&self) -> Option<TimerOutcome> {
self.receiver.borrow().clone()
}
pub async fn wait(&mut self) -> Option<TimerOutcome> {
loop {
if let Some(outcome) = self.receiver.borrow_and_update().clone() {
return Some(outcome);
}
if self.receiver.changed().await.is_err() {
return self.receiver.borrow_and_update().clone();
}
}
}
pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
loop {
let outcome = self.wait().await?;
if outcome.run_id == run_id {
return Some(outcome);
}
}
}
}
#[async_trait]
pub trait TimerCallback: Send + Sync {
async fn execute(&self) -> Result<(), TimerError>;
}
#[async_trait]
impl<F, Fut> TimerCallback for F
where
F: Fn() -> Fut + Send + Sync,
Fut: Future<Output = Result<(), TimerError>> + Send,
{
async fn execute(&self) -> Result<(), TimerError> {
(self)().await
}
}
pub(super) enum TimerCommand {
Pause,
Resume,
Stop,
Cancel,
SetInterval(Duration),
}
pub(super) struct TimerInner {
pub(super) state: Mutex<TimerState>,
pub(super) handle: Mutex<Option<JoinHandle<()>>>,
pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
pub(super) interval: Mutex<Duration>,
pub(super) expiration_count: Mutex<Option<usize>>,
pub(super) metadata: Mutex<TimerMetadata>,
pub(super) statistics: Mutex<TimerStatistics>,
pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
pub(super) event_tx: broadcast::Sender<TimerEvent>,
pub(super) events_enabled: AtomicBool,
pub(super) runtime: driver::RuntimeHandle,
pub(super) next_run_id: AtomicU64,
pub(super) active_run_id: AtomicU64,
}
#[derive(Debug, Clone)]
pub(super) struct RunConfig {
pub(super) interval: Duration,
pub(super) start_deadline: Option<Instant>,
pub(super) initial_delay: Option<Duration>,
pub(super) jitter: Option<Duration>,
pub(super) callback_timeout: Option<Duration>,
pub(super) retry_policy: Option<RetryPolicy>,
pub(super) recurring: bool,
pub(super) cadence: RecurringCadence,
pub(super) expiration_count: Option<usize>,
pub(super) metadata: TimerMetadata,
}
#[derive(Debug, Clone, Copy)]
enum TimerKind {
Once(Duration),
At(Instant),
Recurring(RecurringSchedule),
}
pub struct TimerBuilder {
kind: TimerKind,
callback_timeout: Option<Duration>,
retry_policy: Option<RetryPolicy>,
start_paused: bool,
events_enabled: bool,
metadata: TimerMetadata,
}
#[derive(Clone)]
pub struct Timer {
inner: Arc<TimerInner>,
}
impl Default for Timer {
fn default() -> Self {
Self::new()
}
}
impl Timer {
pub fn new() -> Self {
Self::new_with_runtime(driver::RuntimeHandle::default(), true)
}
pub fn new_silent() -> Self {
Self::new_with_runtime(driver::RuntimeHandle::default(), false)
}
pub(crate) fn new_with_runtime(runtime: driver::RuntimeHandle, events_enabled: bool) -> Self {
let (completion_tx, _completion_rx) = watch::channel(None);
let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
Self {
inner: Arc::new(TimerInner {
state: Mutex::new(TimerState::Stopped),
handle: Mutex::new(None),
command_tx: Mutex::new(None),
interval: Mutex::new(Duration::ZERO),
expiration_count: Mutex::new(None),
metadata: Mutex::new(TimerMetadata::default()),
statistics: Mutex::new(TimerStatistics::default()),
last_outcome: Mutex::new(None),
completion_tx,
event_tx,
events_enabled: AtomicBool::new(events_enabled),
runtime,
next_run_id: AtomicU64::new(1),
active_run_id: AtomicU64::new(0),
}),
}
}
pub fn once(delay: Duration) -> TimerBuilder {
TimerBuilder::once(delay)
}
pub fn at(deadline: Instant) -> TimerBuilder {
TimerBuilder::at(deadline)
}
pub fn recurring(schedule: RecurringSchedule) -> TimerBuilder {
TimerBuilder::recurring(schedule)
}
pub fn subscribe(&self) -> TimerEvents {
TimerEvents {
receiver: self.inner.event_tx.subscribe(),
}
}
pub fn completion(&self) -> TimerCompletion {
TimerCompletion {
receiver: self.inner.completion_tx.subscribe(),
}
}
#[cfg(feature = "test-util")]
pub fn new_mocked() -> (Self, MockRuntime) {
let runtime = MockRuntime::new();
(Self::new_with_runtime(runtime.handle(), true), runtime)
}
pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
where
F: TimerCallback + 'static,
{
let metadata = self.inner.metadata.lock().await.clone();
self.start_internal(
RunConfig {
interval: delay,
start_deadline: None,
initial_delay: None,
jitter: None,
callback_timeout: None,
retry_policy: None,
recurring: false,
cadence: RecurringCadence::FixedDelay,
expiration_count: None,
metadata,
},
callback,
false,
)
.await
}
pub async fn start_once_fn<F, Fut>(
&self,
delay: Duration,
callback: F,
) -> Result<u64, TimerError>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
{
self.start_once(delay, callback).await
}
pub async fn start_at<F>(&self, deadline: Instant, callback: F) -> Result<u64, TimerError>
where
F: TimerCallback + 'static,
{
let now = self.inner.runtime.now();
let metadata = self.inner.metadata.lock().await.clone();
self.start_internal(
RunConfig {
interval: deadline.saturating_duration_since(now),
start_deadline: Some(deadline),
initial_delay: None,
jitter: None,
callback_timeout: None,
retry_policy: None,
recurring: false,
cadence: RecurringCadence::FixedDelay,
expiration_count: None,
metadata,
},
callback,
false,
)
.await
}
pub async fn start_at_fn<F, Fut>(
&self,
deadline: Instant,
callback: F,
) -> Result<u64, TimerError>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
{
self.start_at(deadline, callback).await
}
pub async fn start_recurring<F>(
&self,
schedule: RecurringSchedule,
callback: F,
) -> Result<u64, TimerError>
where
F: TimerCallback + 'static,
{
let metadata = self.inner.metadata.lock().await.clone();
self.start_internal(
RunConfig {
interval: schedule.interval,
start_deadline: None,
initial_delay: schedule.initial_delay,
jitter: schedule.jitter,
callback_timeout: None,
retry_policy: None,
recurring: true,
cadence: schedule.cadence,
expiration_count: schedule.expiration_count,
metadata,
},
callback,
false,
)
.await
}
pub async fn start_recurring_fn<F, Fut>(
&self,
schedule: RecurringSchedule,
callback: F,
) -> Result<u64, TimerError>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
{
self.start_recurring(schedule, callback).await
}
pub async fn pause(&self) -> Result<(), TimerError> {
self.ensure_not_reentrant(
"pause() cannot be awaited from the timer's active callback; use request_pause().",
)?;
self.request_pause().await
}
pub async fn request_pause(&self) -> Result<(), TimerError> {
let _run_id = self
.active_run_id()
.await
.ok_or_else(TimerError::not_running)?;
let mut state = self.inner.state.lock().await;
if *state != TimerState::Running {
return Err(TimerError::not_running());
}
*state = TimerState::Paused;
drop(state);
self.send_command(TimerCommand::Pause).await;
#[cfg(feature = "logging")]
debug!("Timer paused.");
Ok(())
}
pub async fn resume(&self) -> Result<(), TimerError> {
self.ensure_not_reentrant(
"resume() cannot be awaited from the timer's active callback; use request_resume().",
)?;
self.request_resume().await
}
pub async fn request_resume(&self) -> Result<(), TimerError> {
let _run_id = self
.active_run_id()
.await
.ok_or_else(TimerError::not_paused)?;
let mut state = self.inner.state.lock().await;
if *state != TimerState::Paused {
return Err(TimerError::not_paused());
}
*state = TimerState::Running;
drop(state);
self.send_command(TimerCommand::Resume).await;
#[cfg(feature = "logging")]
debug!("Timer resumed.");
Ok(())
}
pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
self.ensure_not_reentrant(
"stop() cannot be awaited from the timer's active callback; use request_stop().",
)?;
let run_id = self
.active_run_id()
.await
.ok_or_else(TimerError::not_running)?;
self.request_stop().await?;
self.join_run(run_id).await
}
pub async fn request_stop(&self) -> Result<(), TimerError> {
self.active_run_id()
.await
.ok_or_else(TimerError::not_running)?;
self.send_command(TimerCommand::Stop).await;
Ok(())
}
pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
self.ensure_not_reentrant(
"cancel() cannot be awaited from the timer's active callback; use request_cancel().",
)?;
self.cancel_with_reason(TimerFinishReason::Cancelled).await
}
pub async fn request_cancel(&self) -> Result<(), TimerError> {
self.active_run_id()
.await
.ok_or_else(TimerError::not_running)?;
self.send_command(TimerCommand::Cancel).await;
Ok(())
}
pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
self.ensure_not_reentrant(
"adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
)?;
self.request_adjust_interval(new_interval).await
}
pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
if new_interval.is_zero() {
return Err(TimerError::invalid_parameter(
"Interval must be greater than zero.",
));
}
let run_id = self
.active_run_id()
.await
.ok_or_else(TimerError::not_running)?;
*self.inner.interval.lock().await = new_interval;
self.send_command(TimerCommand::SetInterval(new_interval))
.await;
runtime::emit_event(
&self.inner,
TimerEvent::IntervalAdjusted {
run_id,
interval: new_interval,
},
);
#[cfg(feature = "logging")]
debug!("Timer interval adjusted.");
Ok(())
}
pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
self.ensure_not_reentrant(
"join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
)?;
if let Some(run_id) = self.active_run_id().await {
return self.join_run(run_id).await;
}
self.inner
.last_outcome
.lock()
.await
.clone()
.ok_or_else(TimerError::not_running)
}
pub async fn wait(&self) {
let _ = self.join().await;
}
pub async fn get_statistics(&self) -> TimerStatistics {
self.inner.statistics.lock().await.clone()
}
pub async fn get_state(&self) -> TimerState {
*self.inner.state.lock().await
}
pub async fn get_interval(&self) -> Duration {
*self.inner.interval.lock().await
}
pub async fn get_expiration_count(&self) -> Option<usize> {
*self.inner.expiration_count.lock().await
}
pub async fn get_last_error(&self) -> Option<TimerError> {
self.inner.statistics.lock().await.last_error.clone()
}
pub async fn metadata(&self) -> TimerMetadata {
self.inner.metadata.lock().await.clone()
}
pub async fn label(&self) -> Option<String> {
self.inner.metadata.lock().await.label.clone()
}
pub async fn set_label(&self, label: impl Into<String>) {
self.inner.metadata.lock().await.label = Some(label.into());
}
pub async fn set_tag(&self, key: impl Into<String>, value: impl Into<String>) {
self.inner
.metadata
.lock()
.await
.tags
.insert(key.into(), value.into());
}
pub async fn snapshot(&self) -> TimerSnapshot {
TimerSnapshot {
state: self.get_state().await,
interval: self.get_interval().await,
expiration_count: self.get_expiration_count().await,
statistics: self.get_statistics().await,
last_outcome: self.last_outcome().await,
metadata: self.metadata().await,
}
}
pub async fn last_outcome(&self) -> Option<TimerOutcome> {
self.inner.last_outcome.lock().await.clone()
}
pub fn set_events_enabled(&self, enabled: bool) {
self.inner.events_enabled.store(enabled, Ordering::SeqCst);
}
async fn start_internal<F>(
&self,
config: RunConfig,
callback: F,
start_paused: bool,
) -> Result<u64, TimerError>
where
F: TimerCallback + 'static,
{
if config.interval.is_zero() && config.start_deadline.is_none() {
return Err(TimerError::invalid_parameter(
"Interval must be greater than zero.",
));
}
if config.recurring && matches!(config.expiration_count, Some(0)) {
return Err(TimerError::invalid_parameter(
"Expiration count must be greater than zero.",
));
}
if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
return Err(TimerError::invalid_parameter(
"Initial delay must be greater than zero.",
));
}
if config.jitter.is_some_and(|jitter| jitter.is_zero()) {
return Err(TimerError::invalid_parameter(
"Jitter must be greater than zero.",
));
}
if config
.callback_timeout
.is_some_and(|timeout| timeout.is_zero())
{
return Err(TimerError::invalid_parameter(
"Callback timeout must be greater than zero.",
));
}
if config.retry_policy.is_some_and(|policy| {
matches!(
policy.backoff(),
RetryBackoff::Fixed(duration)
| RetryBackoff::Linear(duration)
| RetryBackoff::Exponential(duration) if duration.is_zero()
)
}) {
return Err(TimerError::invalid_parameter(
"Retry backoff must be greater than zero.",
));
}
self.ensure_not_reentrant(
"starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
)?;
let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = mpsc::unbounded_channel();
{
*self.inner.state.lock().await = if start_paused {
TimerState::Paused
} else {
TimerState::Running
};
*self.inner.command_tx.lock().await = Some(tx);
*self.inner.interval.lock().await = config.interval;
*self.inner.expiration_count.lock().await = config.expiration_count;
*self.inner.metadata.lock().await = config.metadata.clone();
*self.inner.statistics.lock().await = TimerStatistics::default();
*self.inner.last_outcome.lock().await = None;
self.inner.completion_tx.send_replace(None);
}
self.inner.active_run_id.store(run_id, Ordering::SeqCst);
runtime::emit_event(
&self.inner,
TimerEvent::Started {
run_id,
interval: config.interval,
recurring: config.recurring,
expiration_count: config.expiration_count,
metadata: config.metadata.clone(),
},
);
let inner = Arc::clone(&self.inner);
let handle = self.inner.runtime.spawn(async move {
let scoped_inner = Arc::clone(&inner);
runtime::with_run_context(&scoped_inner, run_id, async move {
runtime::run_timer(inner, run_id, config, callback, rx).await;
})
.await;
});
*self.inner.handle.lock().await = Some(handle);
#[cfg(feature = "logging")]
debug!("Timer started.");
Ok(run_id)
}
async fn active_run_id(&self) -> Option<u64> {
match self.inner.active_run_id.load(Ordering::SeqCst) {
0 => None,
run_id => Some(run_id),
}
}
async fn send_command(&self, command: TimerCommand) {
if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
let _ = tx.send(command);
}
}
fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
if runtime::is_current_run(&self.inner) {
return Err(TimerError::reentrant_operation(message));
}
Ok(())
}
async fn cancel_with_reason(
&self,
reason: TimerFinishReason,
) -> Result<TimerOutcome, TimerError> {
let run_id = self
.active_run_id()
.await
.ok_or_else(TimerError::not_running)?;
let _ = self.inner.command_tx.lock().await.take();
let handle = self.inner.handle.lock().await.take();
*self.inner.state.lock().await = TimerState::Stopped;
if let Some(handle) = handle {
handle.abort();
let _ = handle.await;
}
let statistics = self.get_statistics().await;
let outcome = TimerOutcome {
run_id,
reason,
statistics,
};
runtime::finish_run(&self.inner, outcome.clone()).await;
Ok(outcome)
}
async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
let mut completion_rx = self.inner.completion_tx.subscribe();
loop {
if let Some(outcome) = completion_rx.borrow().clone() {
if outcome.run_id == run_id {
return Ok(outcome);
}
}
if completion_rx.changed().await.is_err() {
return completion_rx
.borrow()
.clone()
.ok_or_else(TimerError::not_running);
}
}
}
}
impl TimerBuilder {
pub fn once(delay: Duration) -> Self {
Self {
kind: TimerKind::Once(delay),
callback_timeout: None,
retry_policy: None,
start_paused: false,
events_enabled: true,
metadata: TimerMetadata::default(),
}
}
pub fn at(deadline: Instant) -> Self {
Self {
kind: TimerKind::At(deadline),
callback_timeout: None,
retry_policy: None,
start_paused: false,
events_enabled: true,
metadata: TimerMetadata::default(),
}
}
pub fn recurring(schedule: RecurringSchedule) -> Self {
Self {
kind: TimerKind::Recurring(schedule),
callback_timeout: None,
retry_policy: None,
start_paused: false,
events_enabled: true,
metadata: TimerMetadata::default(),
}
}
pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
self.callback_timeout = Some(callback_timeout);
self
}
pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
self.retry_policy = Some(retry_policy);
self
}
pub fn max_retries(mut self, max_retries: usize) -> Self {
self.retry_policy = Some(RetryPolicy::new(max_retries));
self
}
pub fn fixed_backoff(mut self, delay: Duration) -> Self {
self.retry_policy = Some(
self.retry_policy
.unwrap_or_else(|| RetryPolicy::new(0))
.with_backoff(RetryBackoff::Fixed(delay)),
);
self
}
pub fn linear_backoff(mut self, step: Duration) -> Self {
self.retry_policy = Some(
self.retry_policy
.unwrap_or_else(|| RetryPolicy::new(0))
.with_backoff(RetryBackoff::Linear(step)),
);
self
}
pub fn exponential_backoff(mut self, base: Duration) -> Self {
self.retry_policy = Some(
self.retry_policy
.unwrap_or_else(|| RetryPolicy::new(0))
.with_backoff(RetryBackoff::Exponential(base)),
);
self
}
pub fn paused_start(mut self) -> Self {
self.start_paused = true;
self
}
pub fn with_events_disabled(mut self) -> Self {
self.events_enabled = false;
self
}
pub fn label(mut self, label: impl Into<String>) -> Self {
self.metadata.label = Some(label.into());
self
}
pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.tags.insert(key.into(), value.into());
self
}
pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
where
F: TimerCallback + 'static,
{
let Self {
kind,
callback_timeout,
retry_policy,
start_paused,
events_enabled,
metadata,
} = self;
let timer = Timer::new_with_runtime(driver::RuntimeHandle::default(), events_enabled);
if start_paused {
*timer.inner.state.lock().await = TimerState::Paused;
}
match kind {
TimerKind::Once(delay) => {
let _ = timer
.start_internal(
RunConfig {
interval: delay,
start_deadline: None,
initial_delay: None,
jitter: None,
callback_timeout,
retry_policy,
recurring: false,
cadence: RecurringCadence::FixedDelay,
expiration_count: None,
metadata: metadata.clone(),
},
callback,
start_paused,
)
.await?;
}
TimerKind::At(deadline) => {
let now = timer.inner.runtime.now();
let _ = timer
.start_internal(
RunConfig {
interval: deadline.saturating_duration_since(now),
start_deadline: Some(deadline),
initial_delay: None,
jitter: None,
callback_timeout,
retry_policy,
recurring: false,
cadence: RecurringCadence::FixedDelay,
expiration_count: None,
metadata: metadata.clone(),
},
callback,
start_paused,
)
.await?;
}
TimerKind::Recurring(schedule) => {
let _ = timer
.start_internal(
RunConfig {
interval: schedule.interval,
start_deadline: None,
initial_delay: schedule.initial_delay,
jitter: schedule.jitter,
callback_timeout,
retry_policy,
recurring: true,
cadence: schedule.cadence,
expiration_count: schedule.expiration_count,
metadata,
},
callback,
start_paused,
)
.await?;
}
}
Ok(timer)
}
}