Skip to main content

timer_lib/timer/
mod.rs

1use async_trait::async_trait;
2use std::collections::BTreeMap;
3use std::future::Future;
4use std::sync::{
5    atomic::{AtomicBool, AtomicU64, Ordering},
6    Arc,
7};
8use std::time::Duration;
9use tokio::sync::{broadcast, mpsc, watch, Mutex};
10use tokio::task::JoinHandle;
11use tokio::time::Instant;
12
13#[cfg(feature = "logging")]
14use log::debug;
15
16use crate::errors::TimerError;
17
18pub(crate) mod driver;
19mod runtime;
20
21#[cfg(test)]
22mod tests;
23
24#[cfg(feature = "test-util")]
25pub use driver::MockRuntime;
26
27const TIMER_EVENT_BUFFER: usize = 64;
28
29fn saturating_mul_duration(duration: Duration, multiplier: u32) -> Duration {
30    let nanos = duration.as_nanos();
31    let scaled = nanos.saturating_mul(multiplier as u128);
32    let capped = scaled.min(u64::MAX as u128) as u64;
33    Duration::from_nanos(capped)
34}
35
36/// Represents the state of a timer.
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum TimerState {
39    Running,
40    Paused,
41    Stopped,
42}
43
44/// Indicates how a timer run ended.
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum TimerFinishReason {
47    Completed,
48    Stopped,
49    Cancelled,
50    Replaced,
51}
52
53/// Statistics for a timer run.
54#[derive(Debug, Clone, Default, PartialEq, Eq)]
55pub struct TimerStatistics {
56    /// Number of callback executions attempted during the current run.
57    pub execution_count: usize,
58    /// Number of successful callback executions.
59    pub successful_executions: usize,
60    /// Number of failed callback executions.
61    pub failed_executions: usize,
62    /// Total elapsed time since the current run started.
63    pub elapsed_time: Duration,
64    /// The most recent callback error observed in the current run.
65    pub last_error: Option<TimerError>,
66}
67
68/// Describes the result of a completed timer run.
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct TimerOutcome {
71    /// The completed run identifier.
72    pub run_id: u64,
73    /// The reason the run ended.
74    pub reason: TimerFinishReason,
75    /// Final run statistics.
76    pub statistics: TimerStatistics,
77}
78
79/// Metadata attached to a timer for observability.
80#[derive(Debug, Clone, Default, PartialEq, Eq)]
81pub struct TimerMetadata {
82    /// Optional human-readable label for the timer.
83    pub label: Option<String>,
84    /// Arbitrary key-value metadata associated with the timer.
85    pub tags: BTreeMap<String, String>,
86}
87
88/// Snapshot of the current or most recent timer state.
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct TimerSnapshot {
91    /// The timer state observed for the snapshot.
92    pub state: TimerState,
93    /// The effective interval configured for the timer.
94    pub interval: Duration,
95    /// The optional recurring execution limit.
96    pub expiration_count: Option<usize>,
97    /// Statistics for the current or most recent run.
98    pub statistics: TimerStatistics,
99    /// The most recent completed outcome, if any.
100    pub last_outcome: Option<TimerOutcome>,
101    /// Metadata associated with the timer.
102    pub metadata: TimerMetadata,
103}
104
105/// Defines how recurring timers schedule the next execution.
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum RecurringCadence {
108    FixedDelay,
109    FixedRate,
110}
111
112/// Configures the schedule for a recurring timer.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub struct RecurringSchedule {
115    interval: Duration,
116    initial_delay: Option<Duration>,
117    cadence: RecurringCadence,
118    expiration_count: Option<usize>,
119    jitter: Option<Duration>,
120}
121
122impl RecurringSchedule {
123    /// Creates a recurring schedule from a fixed interval.
124    pub fn new(interval: Duration) -> Self {
125        Self {
126            interval,
127            initial_delay: None,
128            cadence: RecurringCadence::FixedDelay,
129            expiration_count: None,
130            jitter: None,
131        }
132    }
133
134    /// Returns the recurring interval.
135    pub fn interval(self) -> Duration {
136        self.interval
137    }
138
139    /// Returns the initial delay before the first execution.
140    pub fn initial_delay(self) -> Option<Duration> {
141        self.initial_delay
142    }
143
144    /// Returns the cadence used for the recurring schedule.
145    pub fn cadence(self) -> RecurringCadence {
146        self.cadence
147    }
148
149    /// Returns the optional execution limit.
150    pub fn expiration_count(self) -> Option<usize> {
151        self.expiration_count
152    }
153
154    /// Returns the optional jitter applied to recurring sleeps.
155    pub fn jitter(self) -> Option<Duration> {
156        self.jitter
157    }
158
159    /// Sets an initial delay before the first recurring execution.
160    pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
161        self.initial_delay = Some(initial_delay);
162        self
163    }
164
165    /// Sets the cadence used for subsequent executions.
166    pub fn with_cadence(mut self, cadence: RecurringCadence) -> Self {
167        self.cadence = cadence;
168        self
169    }
170
171    /// Uses fixed-delay cadence semantics.
172    pub fn fixed_delay(mut self) -> Self {
173        self.cadence = RecurringCadence::FixedDelay;
174        self
175    }
176
177    /// Uses fixed-rate cadence semantics.
178    pub fn fixed_rate(mut self) -> Self {
179        self.cadence = RecurringCadence::FixedRate;
180        self
181    }
182
183    /// Limits the number of recurring executions.
184    pub fn with_expiration_count(mut self, expiration_count: usize) -> Self {
185        self.expiration_count = Some(expiration_count);
186        self
187    }
188
189    /// Adds bounded jitter to recurring delays.
190    pub fn with_jitter(mut self, jitter: Duration) -> Self {
191        self.jitter = Some(jitter);
192        self
193    }
194}
195
196/// Configures retry behavior for failed callback executions.
197#[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub struct RetryPolicy {
199    max_retries: usize,
200    backoff: RetryBackoff,
201}
202
203impl RetryPolicy {
204    /// Creates a retry policy with the provided retry limit.
205    pub fn new(max_retries: usize) -> Self {
206        Self {
207            max_retries,
208            backoff: RetryBackoff::Immediate,
209        }
210    }
211
212    /// Returns the maximum number of retry attempts after the first failure.
213    pub fn max_retries(self) -> usize {
214        self.max_retries
215    }
216
217    /// Returns the configured retry backoff strategy.
218    pub fn backoff(self) -> RetryBackoff {
219        self.backoff
220    }
221
222    /// Sets the backoff strategy used between retry attempts.
223    pub fn with_backoff(mut self, backoff: RetryBackoff) -> Self {
224        self.backoff = backoff;
225        self
226    }
227
228    fn delay_for_retry(self, retry_number: usize) -> Duration {
229        self.backoff.delay_for_retry(retry_number)
230    }
231}
232
233/// Defines how retries should back off after callback failures.
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum RetryBackoff {
236    Immediate,
237    Fixed(Duration),
238    Linear(Duration),
239    Exponential(Duration),
240}
241
242impl RetryBackoff {
243    fn delay_for_retry(self, retry_number: usize) -> Duration {
244        match self {
245            Self::Immediate => Duration::ZERO,
246            Self::Fixed(delay) => delay,
247            Self::Linear(step) => saturating_mul_duration(step, retry_number as u32),
248            Self::Exponential(base) => {
249                let exponent = retry_number.saturating_sub(1) as u32;
250                let multiplier = 1_u32.checked_shl(exponent.min(31)).unwrap_or(u32::MAX);
251                saturating_mul_duration(base, multiplier)
252            }
253        }
254    }
255}
256
257/// Event stream item produced by a timer.
258#[derive(Debug, Clone, PartialEq, Eq)]
259pub enum TimerEvent {
260    Started {
261        run_id: u64,
262        interval: Duration,
263        recurring: bool,
264        expiration_count: Option<usize>,
265        metadata: TimerMetadata,
266    },
267    Paused {
268        run_id: u64,
269    },
270    Resumed {
271        run_id: u64,
272    },
273    IntervalAdjusted {
274        run_id: u64,
275        interval: Duration,
276    },
277    Tick {
278        run_id: u64,
279        statistics: TimerStatistics,
280    },
281    CallbackFailed {
282        run_id: u64,
283        error: TimerError,
284        statistics: TimerStatistics,
285    },
286    Finished(TimerOutcome),
287}
288
289/// Subscription handle for timer events.
290pub struct TimerEvents {
291    receiver: broadcast::Receiver<TimerEvent>,
292}
293
294impl TimerEvents {
295    /// Attempts to receive the next timer event without waiting.
296    pub fn try_recv(&mut self) -> Option<TimerEvent> {
297        loop {
298            match self.receiver.try_recv() {
299                Ok(event) => return Some(event),
300                Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
301                Err(broadcast::error::TryRecvError::Empty)
302                | Err(broadcast::error::TryRecvError::Closed) => return None,
303            }
304        }
305    }
306
307    /// Waits for the next timer event.
308    pub async fn recv(&mut self) -> Option<TimerEvent> {
309        loop {
310            match self.receiver.recv().await {
311                Ok(event) => return Some(event),
312                Err(broadcast::error::RecvError::Lagged(_)) => continue,
313                Err(broadcast::error::RecvError::Closed) => return None,
314            }
315        }
316    }
317
318    /// Waits for the next start event.
319    pub async fn wait_started(&mut self) -> Option<TimerEvent> {
320        loop {
321            if let event @ TimerEvent::Started { .. } = self.recv().await? {
322                return Some(event);
323            }
324        }
325    }
326
327    /// Waits for the next tick event.
328    pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
329        loop {
330            if let event @ TimerEvent::Tick { .. } = self.recv().await? {
331                return Some(event);
332            }
333        }
334    }
335
336    /// Waits for the next paused event.
337    pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
338        loop {
339            if let event @ TimerEvent::Paused { .. } = self.recv().await? {
340                return Some(event);
341            }
342        }
343    }
344
345    /// Waits for the next resumed event.
346    pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
347        loop {
348            if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
349                return Some(event);
350            }
351        }
352    }
353
354    /// Waits for the next finished event.
355    pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
356        loop {
357            if let TimerEvent::Finished(outcome) = self.recv().await? {
358                return Some(outcome);
359            }
360        }
361    }
362
363    /// Waits for the next finished event with a stopped outcome.
364    pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
365        self.wait_finished_reason(TimerFinishReason::Stopped).await
366    }
367
368    /// Waits for the next finished event with a cancelled outcome.
369    pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
370        self.wait_finished_reason(TimerFinishReason::Cancelled)
371            .await
372    }
373
374    async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
375        loop {
376            let outcome = self.wait_finished().await?;
377            if outcome.reason == reason {
378                return Some(outcome);
379            }
380        }
381    }
382}
383
384/// Lossless subscription handle for completed timer runs.
385pub struct TimerCompletion {
386    receiver: watch::Receiver<Option<TimerOutcome>>,
387}
388
389impl TimerCompletion {
390    /// Returns the latest completed outcome, if one has been observed.
391    pub fn latest(&self) -> Option<TimerOutcome> {
392        self.receiver.borrow().clone()
393    }
394
395    /// Waits for the next unseen completed outcome.
396    pub async fn wait(&mut self) -> Option<TimerOutcome> {
397        loop {
398            if let Some(outcome) = self.receiver.borrow_and_update().clone() {
399                return Some(outcome);
400            }
401
402            if self.receiver.changed().await.is_err() {
403                return self.receiver.borrow_and_update().clone();
404            }
405        }
406    }
407
408    /// Waits for a specific run to complete.
409    pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
410        loop {
411            let outcome = self.wait().await?;
412            if outcome.run_id == run_id {
413                return Some(outcome);
414            }
415        }
416    }
417}
418
419/// A trait for timer callbacks.
420#[async_trait]
421pub trait TimerCallback: Send + Sync {
422    /// The function to execute when the timer triggers.
423    async fn execute(&self) -> Result<(), TimerError>;
424}
425
426#[async_trait]
427impl<F, Fut> TimerCallback for F
428where
429    F: Fn() -> Fut + Send + Sync,
430    Fut: Future<Output = Result<(), TimerError>> + Send,
431{
432    async fn execute(&self) -> Result<(), TimerError> {
433        (self)().await
434    }
435}
436
437pub(super) enum TimerCommand {
438    Pause,
439    Resume,
440    Stop,
441    Cancel,
442    SetInterval(Duration),
443}
444
445pub(super) struct TimerInner {
446    pub(super) state: Mutex<TimerState>,
447    pub(super) handle: Mutex<Option<JoinHandle<()>>>,
448    pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
449    pub(super) interval: Mutex<Duration>,
450    pub(super) expiration_count: Mutex<Option<usize>>,
451    pub(super) metadata: Mutex<TimerMetadata>,
452    pub(super) statistics: Mutex<TimerStatistics>,
453    pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
454    pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
455    pub(super) event_tx: broadcast::Sender<TimerEvent>,
456    pub(super) events_enabled: AtomicBool,
457    pub(super) runtime: driver::RuntimeHandle,
458    pub(super) next_run_id: AtomicU64,
459    pub(super) active_run_id: AtomicU64,
460}
461
462#[derive(Debug, Clone)]
463pub(super) struct RunConfig {
464    pub(super) interval: Duration,
465    pub(super) start_deadline: Option<Instant>,
466    pub(super) initial_delay: Option<Duration>,
467    pub(super) jitter: Option<Duration>,
468    pub(super) callback_timeout: Option<Duration>,
469    pub(super) retry_policy: Option<RetryPolicy>,
470    pub(super) recurring: bool,
471    pub(super) cadence: RecurringCadence,
472    pub(super) expiration_count: Option<usize>,
473    pub(super) metadata: TimerMetadata,
474}
475
476#[derive(Debug, Clone, Copy)]
477enum TimerKind {
478    Once(Duration),
479    At(Instant),
480    Recurring(RecurringSchedule),
481}
482
483/// Builds and starts a timer with less boilerplate.
484pub struct TimerBuilder {
485    kind: TimerKind,
486    callback_timeout: Option<Duration>,
487    retry_policy: Option<RetryPolicy>,
488    start_paused: bool,
489    events_enabled: bool,
490    metadata: TimerMetadata,
491}
492
493#[derive(Clone)]
494/// Timer handle for managing one-time and recurring tasks.
495pub struct Timer {
496    inner: Arc<TimerInner>,
497}
498
499impl Default for Timer {
500    fn default() -> Self {
501        Self::new()
502    }
503}
504
505impl Timer {
506    /// Creates a new timer.
507    pub fn new() -> Self {
508        Self::new_with_runtime(driver::RuntimeHandle::default(), true)
509    }
510
511    /// Creates a new timer with broadcast events disabled.
512    pub fn new_silent() -> Self {
513        Self::new_with_runtime(driver::RuntimeHandle::default(), false)
514    }
515
516    pub(crate) fn new_with_runtime(runtime: driver::RuntimeHandle, events_enabled: bool) -> Self {
517        let (completion_tx, _completion_rx) = watch::channel(None);
518        let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
519
520        Self {
521            inner: Arc::new(TimerInner {
522                state: Mutex::new(TimerState::Stopped),
523                handle: Mutex::new(None),
524                command_tx: Mutex::new(None),
525                interval: Mutex::new(Duration::ZERO),
526                expiration_count: Mutex::new(None),
527                metadata: Mutex::new(TimerMetadata::default()),
528                statistics: Mutex::new(TimerStatistics::default()),
529                last_outcome: Mutex::new(None),
530                completion_tx,
531                event_tx,
532                events_enabled: AtomicBool::new(events_enabled),
533                runtime,
534                next_run_id: AtomicU64::new(1),
535                active_run_id: AtomicU64::new(0),
536            }),
537        }
538    }
539
540    /// Creates a timer builder configured for a one-time run.
541    pub fn once(delay: Duration) -> TimerBuilder {
542        TimerBuilder::once(delay)
543    }
544
545    /// Creates a timer builder configured for a one-time run at a deadline.
546    pub fn at(deadline: Instant) -> TimerBuilder {
547        TimerBuilder::at(deadline)
548    }
549
550    /// Creates a timer builder configured for a recurring schedule.
551    pub fn recurring(schedule: RecurringSchedule) -> TimerBuilder {
552        TimerBuilder::recurring(schedule)
553    }
554
555    /// Subscribes to future timer events.
556    pub fn subscribe(&self) -> TimerEvents {
557        TimerEvents {
558            receiver: self.inner.event_tx.subscribe(),
559        }
560    }
561
562    /// Subscribes to completed runs without loss.
563    pub fn completion(&self) -> TimerCompletion {
564        TimerCompletion {
565            receiver: self.inner.completion_tx.subscribe(),
566        }
567    }
568
569    /// Creates a new timer backed by a manually-driven test runtime.
570    #[cfg(feature = "test-util")]
571    pub fn new_mocked() -> (Self, MockRuntime) {
572        let runtime = MockRuntime::new();
573        (Self::new_with_runtime(runtime.handle(), true), runtime)
574    }
575
576    /// Starts a one-time timer.
577    pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
578    where
579        F: TimerCallback + 'static,
580    {
581        let metadata = self.inner.metadata.lock().await.clone();
582        self.start_internal(
583            RunConfig {
584                interval: delay,
585                start_deadline: None,
586                initial_delay: None,
587                jitter: None,
588                callback_timeout: None,
589                retry_policy: None,
590                recurring: false,
591                cadence: RecurringCadence::FixedDelay,
592                expiration_count: None,
593                metadata,
594            },
595            callback,
596            false,
597        )
598        .await
599    }
600
601    /// Starts a one-time timer from an async closure.
602    pub async fn start_once_fn<F, Fut>(
603        &self,
604        delay: Duration,
605        callback: F,
606    ) -> Result<u64, TimerError>
607    where
608        F: Fn() -> Fut + Send + Sync + 'static,
609        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
610    {
611        self.start_once(delay, callback).await
612    }
613
614    /// Starts a one-time timer that fires at the provided deadline.
615    pub async fn start_at<F>(&self, deadline: Instant, callback: F) -> Result<u64, TimerError>
616    where
617        F: TimerCallback + 'static,
618    {
619        let now = self.inner.runtime.now();
620        let metadata = self.inner.metadata.lock().await.clone();
621        self.start_internal(
622            RunConfig {
623                interval: deadline.saturating_duration_since(now),
624                start_deadline: Some(deadline),
625                initial_delay: None,
626                jitter: None,
627                callback_timeout: None,
628                retry_policy: None,
629                recurring: false,
630                cadence: RecurringCadence::FixedDelay,
631                expiration_count: None,
632                metadata,
633            },
634            callback,
635            false,
636        )
637        .await
638    }
639
640    /// Starts a one-time timer from an async closure at the provided deadline.
641    pub async fn start_at_fn<F, Fut>(
642        &self,
643        deadline: Instant,
644        callback: F,
645    ) -> Result<u64, TimerError>
646    where
647        F: Fn() -> Fut + Send + Sync + 'static,
648        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
649    {
650        self.start_at(deadline, callback).await
651    }
652
653    /// Starts a recurring timer with the provided schedule.
654    pub async fn start_recurring<F>(
655        &self,
656        schedule: RecurringSchedule,
657        callback: F,
658    ) -> Result<u64, TimerError>
659    where
660        F: TimerCallback + 'static,
661    {
662        let metadata = self.inner.metadata.lock().await.clone();
663        self.start_internal(
664            RunConfig {
665                interval: schedule.interval,
666                start_deadline: None,
667                initial_delay: schedule.initial_delay,
668                jitter: schedule.jitter,
669                callback_timeout: None,
670                retry_policy: None,
671                recurring: true,
672                cadence: schedule.cadence,
673                expiration_count: schedule.expiration_count,
674                metadata,
675            },
676            callback,
677            false,
678        )
679        .await
680    }
681
682    /// Starts a recurring timer from an async closure.
683    pub async fn start_recurring_fn<F, Fut>(
684        &self,
685        schedule: RecurringSchedule,
686        callback: F,
687    ) -> Result<u64, TimerError>
688    where
689        F: Fn() -> Fut + Send + Sync + 'static,
690        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
691    {
692        self.start_recurring(schedule, callback).await
693    }
694
695    /// Pauses a running timer.
696    pub async fn pause(&self) -> Result<(), TimerError> {
697        self.ensure_not_reentrant(
698            "pause() cannot be awaited from the timer's active callback; use request_pause().",
699        )?;
700        self.request_pause().await
701    }
702
703    /// Requests that the current run pause after the current callback or sleep edge.
704    pub async fn request_pause(&self) -> Result<(), TimerError> {
705        let _run_id = self
706            .active_run_id()
707            .await
708            .ok_or_else(TimerError::not_running)?;
709        let mut state = self.inner.state.lock().await;
710        if *state != TimerState::Running {
711            return Err(TimerError::not_running());
712        }
713
714        *state = TimerState::Paused;
715        drop(state);
716
717        self.send_command(TimerCommand::Pause).await;
718
719        #[cfg(feature = "logging")]
720        debug!("Timer paused.");
721
722        Ok(())
723    }
724
725    /// Resumes a paused timer.
726    pub async fn resume(&self) -> Result<(), TimerError> {
727        self.ensure_not_reentrant(
728            "resume() cannot be awaited from the timer's active callback; use request_resume().",
729        )?;
730        self.request_resume().await
731    }
732
733    /// Requests that a paused timer resume.
734    pub async fn request_resume(&self) -> Result<(), TimerError> {
735        let _run_id = self
736            .active_run_id()
737            .await
738            .ok_or_else(TimerError::not_paused)?;
739        let mut state = self.inner.state.lock().await;
740        if *state != TimerState::Paused {
741            return Err(TimerError::not_paused());
742        }
743
744        *state = TimerState::Running;
745        drop(state);
746
747        self.send_command(TimerCommand::Resume).await;
748
749        #[cfg(feature = "logging")]
750        debug!("Timer resumed.");
751
752        Ok(())
753    }
754
755    /// Stops the timer after the current callback finishes.
756    pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
757        self.ensure_not_reentrant(
758            "stop() cannot be awaited from the timer's active callback; use request_stop().",
759        )?;
760        let run_id = self
761            .active_run_id()
762            .await
763            .ok_or_else(TimerError::not_running)?;
764        self.request_stop().await?;
765        self.join_run(run_id).await
766    }
767
768    /// Requests a graceful stop without waiting for the outcome.
769    pub async fn request_stop(&self) -> Result<(), TimerError> {
770        self.active_run_id()
771            .await
772            .ok_or_else(TimerError::not_running)?;
773        self.send_command(TimerCommand::Stop).await;
774        Ok(())
775    }
776
777    /// Cancels the timer immediately and aborts the callback task.
778    pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
779        self.ensure_not_reentrant(
780            "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
781        )?;
782        self.cancel_with_reason(TimerFinishReason::Cancelled).await
783    }
784
785    /// Requests cancellation without aborting the active callback task.
786    pub async fn request_cancel(&self) -> Result<(), TimerError> {
787        self.active_run_id()
788            .await
789            .ok_or_else(TimerError::not_running)?;
790        self.send_command(TimerCommand::Cancel).await;
791        Ok(())
792    }
793
794    /// Adjusts the interval of a running or paused timer.
795    pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
796        self.ensure_not_reentrant(
797            "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
798        )?;
799        self.request_adjust_interval(new_interval).await
800    }
801
802    /// Requests an interval adjustment for the current run.
803    pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
804        if new_interval.is_zero() {
805            return Err(TimerError::invalid_parameter(
806                "Interval must be greater than zero.",
807            ));
808        }
809
810        let run_id = self
811            .active_run_id()
812            .await
813            .ok_or_else(TimerError::not_running)?;
814        *self.inner.interval.lock().await = new_interval;
815        self.send_command(TimerCommand::SetInterval(new_interval))
816            .await;
817        runtime::emit_event(
818            &self.inner,
819            TimerEvent::IntervalAdjusted {
820                run_id,
821                interval: new_interval,
822            },
823        );
824
825        #[cfg(feature = "logging")]
826        debug!("Timer interval adjusted.");
827
828        Ok(())
829    }
830
831    /// Waits for the current run and returns the completed outcome.
832    pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
833        self.ensure_not_reentrant(
834            "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
835        )?;
836        if let Some(run_id) = self.active_run_id().await {
837            return self.join_run(run_id).await;
838        }
839
840        self.inner
841            .last_outcome
842            .lock()
843            .await
844            .clone()
845            .ok_or_else(TimerError::not_running)
846    }
847
848    /// Waits until the current run has completed or been stopped.
849    pub async fn wait(&self) {
850        let _ = self.join().await;
851    }
852
853    /// Gets the timer's statistics for the current or most recent run.
854    pub async fn get_statistics(&self) -> TimerStatistics {
855        self.inner.statistics.lock().await.clone()
856    }
857
858    /// Gets the current state of the timer.
859    pub async fn get_state(&self) -> TimerState {
860        *self.inner.state.lock().await
861    }
862
863    /// Gets the timer interval for the current or next run.
864    pub async fn get_interval(&self) -> Duration {
865        *self.inner.interval.lock().await
866    }
867
868    /// Gets the configured expiration count for the current or next run.
869    pub async fn get_expiration_count(&self) -> Option<usize> {
870        *self.inner.expiration_count.lock().await
871    }
872
873    /// Gets the most recent callback error observed for the current or most recent run.
874    pub async fn get_last_error(&self) -> Option<TimerError> {
875        self.inner.statistics.lock().await.last_error.clone()
876    }
877
878    /// Returns the metadata currently associated with the timer.
879    pub async fn metadata(&self) -> TimerMetadata {
880        self.inner.metadata.lock().await.clone()
881    }
882
883    /// Returns the current label, if one has been assigned.
884    pub async fn label(&self) -> Option<String> {
885        self.inner.metadata.lock().await.label.clone()
886    }
887
888    /// Sets the timer label used for diagnostics and registry introspection.
889    pub async fn set_label(&self, label: impl Into<String>) {
890        self.inner.metadata.lock().await.label = Some(label.into());
891    }
892
893    /// Sets or replaces a metadata tag on the timer.
894    pub async fn set_tag(&self, key: impl Into<String>, value: impl Into<String>) {
895        self.inner
896            .metadata
897            .lock()
898            .await
899            .tags
900            .insert(key.into(), value.into());
901    }
902
903    /// Captures a snapshot of the timer's current observable state.
904    pub async fn snapshot(&self) -> TimerSnapshot {
905        TimerSnapshot {
906            state: self.get_state().await,
907            interval: self.get_interval().await,
908            expiration_count: self.get_expiration_count().await,
909            statistics: self.get_statistics().await,
910            last_outcome: self.last_outcome().await,
911            metadata: self.metadata().await,
912        }
913    }
914
915    /// Gets the most recent completed run outcome.
916    pub async fn last_outcome(&self) -> Option<TimerOutcome> {
917        self.inner.last_outcome.lock().await.clone()
918    }
919
920    /// Enables or disables broadcast event emission for future runtime events.
921    pub fn set_events_enabled(&self, enabled: bool) {
922        self.inner.events_enabled.store(enabled, Ordering::SeqCst);
923    }
924
925    async fn start_internal<F>(
926        &self,
927        config: RunConfig,
928        callback: F,
929        start_paused: bool,
930    ) -> Result<u64, TimerError>
931    where
932        F: TimerCallback + 'static,
933    {
934        if config.interval.is_zero() && config.start_deadline.is_none() {
935            return Err(TimerError::invalid_parameter(
936                "Interval must be greater than zero.",
937            ));
938        }
939
940        if config.recurring && matches!(config.expiration_count, Some(0)) {
941            return Err(TimerError::invalid_parameter(
942                "Expiration count must be greater than zero.",
943            ));
944        }
945
946        if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
947            return Err(TimerError::invalid_parameter(
948                "Initial delay must be greater than zero.",
949            ));
950        }
951
952        if config.jitter.is_some_and(|jitter| jitter.is_zero()) {
953            return Err(TimerError::invalid_parameter(
954                "Jitter must be greater than zero.",
955            ));
956        }
957
958        if config
959            .callback_timeout
960            .is_some_and(|timeout| timeout.is_zero())
961        {
962            return Err(TimerError::invalid_parameter(
963                "Callback timeout must be greater than zero.",
964            ));
965        }
966
967        if config.retry_policy.is_some_and(|policy| {
968            matches!(
969                policy.backoff(),
970                RetryBackoff::Fixed(duration)
971                    | RetryBackoff::Linear(duration)
972                    | RetryBackoff::Exponential(duration) if duration.is_zero()
973            )
974        }) {
975            return Err(TimerError::invalid_parameter(
976                "Retry backoff must be greater than zero.",
977            ));
978        }
979
980        self.ensure_not_reentrant(
981            "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
982        )?;
983
984        let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
985
986        let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
987        let (tx, rx) = mpsc::unbounded_channel();
988
989        {
990            *self.inner.state.lock().await = if start_paused {
991                TimerState::Paused
992            } else {
993                TimerState::Running
994            };
995            *self.inner.command_tx.lock().await = Some(tx);
996            *self.inner.interval.lock().await = config.interval;
997            *self.inner.expiration_count.lock().await = config.expiration_count;
998            *self.inner.metadata.lock().await = config.metadata.clone();
999            *self.inner.statistics.lock().await = TimerStatistics::default();
1000            *self.inner.last_outcome.lock().await = None;
1001            self.inner.completion_tx.send_replace(None);
1002        }
1003        self.inner.active_run_id.store(run_id, Ordering::SeqCst);
1004
1005        runtime::emit_event(
1006            &self.inner,
1007            TimerEvent::Started {
1008                run_id,
1009                interval: config.interval,
1010                recurring: config.recurring,
1011                expiration_count: config.expiration_count,
1012                metadata: config.metadata.clone(),
1013            },
1014        );
1015
1016        let inner = Arc::clone(&self.inner);
1017        let handle = self.inner.runtime.spawn(async move {
1018            let scoped_inner = Arc::clone(&inner);
1019            runtime::with_run_context(&scoped_inner, run_id, async move {
1020                runtime::run_timer(inner, run_id, config, callback, rx).await;
1021            })
1022            .await;
1023        });
1024
1025        *self.inner.handle.lock().await = Some(handle);
1026
1027        #[cfg(feature = "logging")]
1028        debug!("Timer started.");
1029
1030        Ok(run_id)
1031    }
1032
1033    async fn active_run_id(&self) -> Option<u64> {
1034        match self.inner.active_run_id.load(Ordering::SeqCst) {
1035            0 => None,
1036            run_id => Some(run_id),
1037        }
1038    }
1039
1040    async fn send_command(&self, command: TimerCommand) {
1041        if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
1042            let _ = tx.send(command);
1043        }
1044    }
1045
1046    fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
1047        if runtime::is_current_run(&self.inner) {
1048            return Err(TimerError::reentrant_operation(message));
1049        }
1050
1051        Ok(())
1052    }
1053
1054    async fn cancel_with_reason(
1055        &self,
1056        reason: TimerFinishReason,
1057    ) -> Result<TimerOutcome, TimerError> {
1058        let run_id = self
1059            .active_run_id()
1060            .await
1061            .ok_or_else(TimerError::not_running)?;
1062
1063        let _ = self.inner.command_tx.lock().await.take();
1064        let handle = self.inner.handle.lock().await.take();
1065        *self.inner.state.lock().await = TimerState::Stopped;
1066
1067        if let Some(handle) = handle {
1068            handle.abort();
1069            let _ = handle.await;
1070        }
1071
1072        let statistics = self.get_statistics().await;
1073        let outcome = TimerOutcome {
1074            run_id,
1075            reason,
1076            statistics,
1077        };
1078
1079        runtime::finish_run(&self.inner, outcome.clone()).await;
1080        Ok(outcome)
1081    }
1082
1083    async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
1084        let mut completion_rx = self.inner.completion_tx.subscribe();
1085
1086        loop {
1087            if let Some(outcome) = completion_rx.borrow().clone() {
1088                if outcome.run_id == run_id {
1089                    return Ok(outcome);
1090                }
1091            }
1092
1093            if completion_rx.changed().await.is_err() {
1094                return completion_rx
1095                    .borrow()
1096                    .clone()
1097                    .ok_or_else(TimerError::not_running);
1098            }
1099        }
1100    }
1101}
1102
1103impl TimerBuilder {
1104    /// Creates a builder for a one-time timer.
1105    pub fn once(delay: Duration) -> Self {
1106        Self {
1107            kind: TimerKind::Once(delay),
1108            callback_timeout: None,
1109            retry_policy: None,
1110            start_paused: false,
1111            events_enabled: true,
1112            metadata: TimerMetadata::default(),
1113        }
1114    }
1115
1116    /// Creates a builder for a one-time timer at a deadline.
1117    pub fn at(deadline: Instant) -> Self {
1118        Self {
1119            kind: TimerKind::At(deadline),
1120            callback_timeout: None,
1121            retry_policy: None,
1122            start_paused: false,
1123            events_enabled: true,
1124            metadata: TimerMetadata::default(),
1125        }
1126    }
1127
1128    /// Creates a builder for a recurring schedule.
1129    pub fn recurring(schedule: RecurringSchedule) -> Self {
1130        Self {
1131            kind: TimerKind::Recurring(schedule),
1132            callback_timeout: None,
1133            retry_policy: None,
1134            start_paused: false,
1135            events_enabled: true,
1136            metadata: TimerMetadata::default(),
1137        }
1138    }
1139
1140    /// Sets a timeout for each callback execution.
1141    pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
1142        self.callback_timeout = Some(callback_timeout);
1143        self
1144    }
1145
1146    /// Retries failed callback executions according to the provided policy.
1147    pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
1148        self.retry_policy = Some(retry_policy);
1149        self
1150    }
1151
1152    /// Retries failed callback executions up to `max_retries` times.
1153    pub fn max_retries(mut self, max_retries: usize) -> Self {
1154        self.retry_policy = Some(RetryPolicy::new(max_retries));
1155        self
1156    }
1157
1158    /// Applies a fixed delay between retry attempts.
1159    pub fn fixed_backoff(mut self, delay: Duration) -> Self {
1160        self.retry_policy = Some(
1161            self.retry_policy
1162                .unwrap_or_else(|| RetryPolicy::new(0))
1163                .with_backoff(RetryBackoff::Fixed(delay)),
1164        );
1165        self
1166    }
1167
1168    /// Applies a linearly increasing delay between retry attempts.
1169    pub fn linear_backoff(mut self, step: Duration) -> Self {
1170        self.retry_policy = Some(
1171            self.retry_policy
1172                .unwrap_or_else(|| RetryPolicy::new(0))
1173                .with_backoff(RetryBackoff::Linear(step)),
1174        );
1175        self
1176    }
1177
1178    /// Applies an exponentially increasing delay between retry attempts.
1179    pub fn exponential_backoff(mut self, base: Duration) -> Self {
1180        self.retry_policy = Some(
1181            self.retry_policy
1182                .unwrap_or_else(|| RetryPolicy::new(0))
1183                .with_backoff(RetryBackoff::Exponential(base)),
1184        );
1185        self
1186    }
1187
1188    /// Starts the timer in the paused state.
1189    pub fn paused_start(mut self) -> Self {
1190        self.start_paused = true;
1191        self
1192    }
1193
1194    /// Disables broadcast event emission for the timer.
1195    pub fn with_events_disabled(mut self) -> Self {
1196        self.events_enabled = false;
1197        self
1198    }
1199
1200    /// Sets a label for the started timer.
1201    pub fn label(mut self, label: impl Into<String>) -> Self {
1202        self.metadata.label = Some(label.into());
1203        self
1204    }
1205
1206    /// Adds a metadata tag for the started timer.
1207    pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1208        self.metadata.tags.insert(key.into(), value.into());
1209        self
1210    }
1211
1212    /// Starts the configured timer and returns the handle.
1213    pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
1214    where
1215        F: TimerCallback + 'static,
1216    {
1217        let Self {
1218            kind,
1219            callback_timeout,
1220            retry_policy,
1221            start_paused,
1222            events_enabled,
1223            metadata,
1224        } = self;
1225
1226        let timer = Timer::new_with_runtime(driver::RuntimeHandle::default(), events_enabled);
1227        if start_paused {
1228            *timer.inner.state.lock().await = TimerState::Paused;
1229        }
1230
1231        match kind {
1232            TimerKind::Once(delay) => {
1233                let _ = timer
1234                    .start_internal(
1235                        RunConfig {
1236                            interval: delay,
1237                            start_deadline: None,
1238                            initial_delay: None,
1239                            jitter: None,
1240                            callback_timeout,
1241                            retry_policy,
1242                            recurring: false,
1243                            cadence: RecurringCadence::FixedDelay,
1244                            expiration_count: None,
1245                            metadata: metadata.clone(),
1246                        },
1247                        callback,
1248                        start_paused,
1249                    )
1250                    .await?;
1251            }
1252            TimerKind::At(deadline) => {
1253                let now = timer.inner.runtime.now();
1254                let _ = timer
1255                    .start_internal(
1256                        RunConfig {
1257                            interval: deadline.saturating_duration_since(now),
1258                            start_deadline: Some(deadline),
1259                            initial_delay: None,
1260                            jitter: None,
1261                            callback_timeout,
1262                            retry_policy,
1263                            recurring: false,
1264                            cadence: RecurringCadence::FixedDelay,
1265                            expiration_count: None,
1266                            metadata: metadata.clone(),
1267                        },
1268                        callback,
1269                        start_paused,
1270                    )
1271                    .await?;
1272            }
1273            TimerKind::Recurring(schedule) => {
1274                let _ = timer
1275                    .start_internal(
1276                        RunConfig {
1277                            interval: schedule.interval,
1278                            start_deadline: None,
1279                            initial_delay: schedule.initial_delay,
1280                            jitter: schedule.jitter,
1281                            callback_timeout,
1282                            retry_policy,
1283                            recurring: true,
1284                            cadence: schedule.cadence,
1285                            expiration_count: schedule.expiration_count,
1286                            metadata,
1287                        },
1288                        callback,
1289                        start_paused,
1290                    )
1291                    .await?;
1292            }
1293        }
1294        Ok(timer)
1295    }
1296}