Skip to main content

timer_lib/timer/
mod.rs

1use async_trait::async_trait;
2use std::future::Future;
3use std::sync::{
4    atomic::{AtomicBool, AtomicU64, Ordering},
5    Arc,
6};
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, watch, Mutex};
9use tokio::task::JoinHandle;
10
11#[cfg(feature = "logging")]
12use log::debug;
13
14use crate::errors::TimerError;
15
16mod driver;
17mod runtime;
18
19#[cfg(test)]
20mod tests;
21
/// Capacity handed to `broadcast::channel` when a timer's event channel is created.
const TIMER_EVENT_BUFFER: usize = 64;
23
/// Represents the state of a timer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerState {
    /// Set when a run is started unpaused and when a paused run is resumed.
    Running,
    /// Set by a pause request, or at start when the run is started paused.
    Paused,
    /// Initial state of a freshly created timer; also set when a run is cancelled.
    Stopped,
}
31
/// Indicates how a timer run ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerFinishReason {
    /// Presumably the run finished on its own; produced outside the code in this file.
    Completed,
    /// Presumably the result of a graceful stop request; produced outside the code in this file.
    Stopped,
    /// The run was cancelled through `Timer::cancel`.
    Cancelled,
    /// The run was cancelled because a new run was started on the same timer.
    Replaced,
}
40
/// Statistics for a timer run.
///
/// The timer resets these to `TimerStatistics::default()` whenever a new run is started.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TimerStatistics {
    /// Number of callback executions attempted during the current run.
    pub execution_count: usize,
    /// Number of successful callback executions.
    pub successful_executions: usize,
    /// Number of failed callback executions.
    pub failed_executions: usize,
    /// Total elapsed time since the current run started.
    pub elapsed_time: Duration,
    /// The most recent callback error observed in the current run.
    pub last_error: Option<TimerError>,
}
55
/// Describes the result of a completed timer run.
///
/// Returned by `Timer::stop`, `Timer::cancel` and `Timer::join`, carried by
/// `TimerEvent::Finished`, and published on the completion channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerOutcome {
    /// The completed run identifier.
    pub run_id: u64,
    /// The reason the run ended.
    pub reason: TimerFinishReason,
    /// Final run statistics.
    pub statistics: TimerStatistics,
}
66
/// Selects how a recurring timer schedules its next execution.
///
/// The scheduling semantics of each variant are implemented by the timer runtime,
/// which is not part of this block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecurringCadence {
    FixedDelay,
    FixedRate,
}

/// Schedule description for a recurring timer.
///
/// Built with [`RecurringSchedule::new`] and refined through the chaining methods;
/// the value is `Copy`, so every method takes and returns it by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecurringSchedule {
    interval: Duration,
    initial_delay: Option<Duration>,
    cadence: RecurringCadence,
    expiration_count: Option<usize>,
}

impl RecurringSchedule {
    /// Builds a schedule with the given interval, no initial delay, fixed-delay
    /// cadence and no execution limit.
    pub fn new(interval: Duration) -> Self {
        RecurringSchedule {
            interval,
            cadence: RecurringCadence::FixedDelay,
            initial_delay: None,
            expiration_count: None,
        }
    }

    /// The interval between executions.
    pub fn interval(self) -> Duration {
        let Self { interval, .. } = self;
        interval
    }

    /// The delay before the first execution, if one was configured.
    pub fn initial_delay(self) -> Option<Duration> {
        let Self { initial_delay, .. } = self;
        initial_delay
    }

    /// The configured cadence.
    pub fn cadence(self) -> RecurringCadence {
        let Self { cadence, .. } = self;
        cadence
    }

    /// The execution limit, if one was configured.
    pub fn expiration_count(self) -> Option<usize> {
        let Self {
            expiration_count, ..
        } = self;
        expiration_count
    }

    /// Returns the schedule with `initial_delay` applied before the first execution.
    pub fn with_initial_delay(self, initial_delay: Duration) -> Self {
        Self {
            initial_delay: Some(initial_delay),
            ..self
        }
    }

    /// Returns the schedule with the given cadence.
    pub fn with_cadence(self, cadence: RecurringCadence) -> Self {
        Self { cadence, ..self }
    }

    /// Returns the schedule with [`RecurringCadence::FixedDelay`].
    pub fn fixed_delay(self) -> Self {
        Self {
            cadence: RecurringCadence::FixedDelay,
            ..self
        }
    }

    /// Returns the schedule with [`RecurringCadence::FixedRate`].
    pub fn fixed_rate(self) -> Self {
        Self {
            cadence: RecurringCadence::FixedRate,
            ..self
        }
    }

    /// Returns the schedule limited to `expiration_count` executions.
    pub fn with_expiration_count(self, expiration_count: usize) -> Self {
        Self {
            expiration_count: Some(expiration_count),
            ..self
        }
    }
}
144
/// Retry settings applied when a callback execution fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
    max_retries: usize,
}

impl RetryPolicy {
    /// Builds a policy allowing up to `max_retries` retries.
    pub fn new(max_retries: usize) -> Self {
        RetryPolicy { max_retries }
    }

    /// Maximum number of retry attempts after the first failure.
    pub fn max_retries(self) -> usize {
        let RetryPolicy { max_retries } = self;
        max_retries
    }
}
162
/// Event stream item produced by a timer.
///
/// `Started` and `IntervalAdjusted` are emitted by the `Timer` handle in this file;
/// the remaining variants are presumably emitted by the timer runtime (not shown here).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimerEvent {
    /// A new run was started with the given configuration.
    Started {
        run_id: u64,
        interval: Duration,
        recurring: bool,
        expiration_count: Option<usize>,
    },
    /// The identified run was paused.
    Paused {
        run_id: u64,
    },
    /// The identified run was resumed.
    Resumed {
        run_id: u64,
    },
    /// The interval of the identified run was changed to `interval`.
    IntervalAdjusted {
        run_id: u64,
        interval: Duration,
    },
    /// Carries a statistics snapshot for the identified run.
    Tick {
        run_id: u64,
        statistics: TimerStatistics,
    },
    /// A callback execution failed with `error`.
    CallbackFailed {
        run_id: u64,
        error: TimerError,
        statistics: TimerStatistics,
    },
    /// A run ended; the payload describes how.
    Finished(TimerOutcome),
}
193
194/// Subscription handle for timer events.
195pub struct TimerEvents {
196    receiver: broadcast::Receiver<TimerEvent>,
197}
198
199impl TimerEvents {
200    /// Attempts to receive the next timer event without waiting.
201    pub fn try_recv(&mut self) -> Option<TimerEvent> {
202        loop {
203            match self.receiver.try_recv() {
204                Ok(event) => return Some(event),
205                Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
206                Err(broadcast::error::TryRecvError::Empty)
207                | Err(broadcast::error::TryRecvError::Closed) => return None,
208            }
209        }
210    }
211
212    /// Waits for the next timer event.
213    pub async fn recv(&mut self) -> Option<TimerEvent> {
214        loop {
215            match self.receiver.recv().await {
216                Ok(event) => return Some(event),
217                Err(broadcast::error::RecvError::Lagged(_)) => continue,
218                Err(broadcast::error::RecvError::Closed) => return None,
219            }
220        }
221    }
222
223    /// Waits for the next start event.
224    pub async fn wait_started(&mut self) -> Option<TimerEvent> {
225        loop {
226            if let event @ TimerEvent::Started { .. } = self.recv().await? {
227                return Some(event);
228            }
229        }
230    }
231
232    /// Waits for the next tick event.
233    pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
234        loop {
235            if let event @ TimerEvent::Tick { .. } = self.recv().await? {
236                return Some(event);
237            }
238        }
239    }
240
241    /// Waits for the next paused event.
242    pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
243        loop {
244            if let event @ TimerEvent::Paused { .. } = self.recv().await? {
245                return Some(event);
246            }
247        }
248    }
249
250    /// Waits for the next resumed event.
251    pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
252        loop {
253            if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
254                return Some(event);
255            }
256        }
257    }
258
259    /// Waits for the next finished event.
260    pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
261        loop {
262            if let TimerEvent::Finished(outcome) = self.recv().await? {
263                return Some(outcome);
264            }
265        }
266    }
267
268    /// Waits for the next finished event with a stopped outcome.
269    pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
270        self.wait_finished_reason(TimerFinishReason::Stopped).await
271    }
272
273    /// Waits for the next finished event with a cancelled outcome.
274    pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
275        self.wait_finished_reason(TimerFinishReason::Cancelled)
276            .await
277    }
278
279    async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
280        loop {
281            let outcome = self.wait_finished().await?;
282            if outcome.reason == reason {
283                return Some(outcome);
284            }
285        }
286    }
287}
288
289/// Lossless subscription handle for completed timer runs.
290pub struct TimerCompletion {
291    receiver: watch::Receiver<Option<TimerOutcome>>,
292}
293
294impl TimerCompletion {
295    /// Returns the latest completed outcome, if one has been observed.
296    pub fn latest(&self) -> Option<TimerOutcome> {
297        self.receiver.borrow().clone()
298    }
299
300    /// Waits for the next unseen completed outcome.
301    pub async fn wait(&mut self) -> Option<TimerOutcome> {
302        loop {
303            if let Some(outcome) = self.receiver.borrow_and_update().clone() {
304                return Some(outcome);
305            }
306
307            if self.receiver.changed().await.is_err() {
308                return self.receiver.borrow_and_update().clone();
309            }
310        }
311    }
312
313    /// Waits for a specific run to complete.
314    pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
315        loop {
316            let outcome = self.wait().await?;
317            if outcome.run_id == run_id {
318                return Some(outcome);
319            }
320        }
321    }
322}
323
/// A trait for timer callbacks.
///
/// Implementors must be `Send + Sync`. This module also provides a blanket
/// implementation for `Fn() -> Future` closures.
#[async_trait]
pub trait TimerCallback: Send + Sync {
    /// The function to execute when the timer triggers.
    ///
    /// # Errors
    ///
    /// Returns a `TimerError` when the callback fails.
    async fn execute(&self) -> Result<(), TimerError>;
}
330
331#[async_trait]
332impl<F, Fut> TimerCallback for F
333where
334    F: Fn() -> Fut + Send + Sync,
335    Fut: Future<Output = Result<(), TimerError>> + Send,
336{
337    async fn execute(&self) -> Result<(), TimerError> {
338        (self)().await
339    }
340}
341
/// Control messages sent from a `Timer` handle to the task driving the active run.
pub(super) enum TimerCommand {
    /// Sent by `Timer::request_pause`.
    Pause,
    /// Sent by `Timer::request_resume`.
    Resume,
    /// Sent by `Timer::request_stop`.
    Stop,
    /// Sent by `Timer::request_cancel`.
    Cancel,
    /// Sent by `Timer::request_adjust_interval` with the new interval.
    SetInterval(Duration),
}
349
/// State shared between every clone of a `Timer` handle and the spawned run task.
pub(super) struct TimerInner {
    /// Current lifecycle state; starts as `TimerState::Stopped`.
    pub(super) state: Mutex<TimerState>,
    /// Join handle of the task spawned for the current run, if any.
    pub(super) handle: Mutex<Option<JoinHandle<()>>>,
    /// Command sender for the current run; `None` before the first run and after cancellation.
    pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
    /// Interval of the current run; starts as `Duration::ZERO`.
    pub(super) interval: Mutex<Duration>,
    /// Execution limit configured for the current run.
    pub(super) expiration_count: Mutex<Option<usize>>,
    /// Statistics of the current run; reset when a run starts.
    pub(super) statistics: Mutex<TimerStatistics>,
    /// Outcome of the most recent run; cleared when a new run starts.
    pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
    /// Watch sender used for completion subscriptions; reset to `None` when a run starts.
    pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
    /// Broadcast sender used for event subscriptions.
    pub(super) event_tx: broadcast::Sender<TimerEvent>,
    /// Flag written by `Timer::set_events_enabled`; presumably read by `runtime::emit_event`.
    pub(super) events_enabled: AtomicBool,
    /// Handle used to spawn the run task.
    pub(super) runtime: driver::RuntimeHandle,
    /// Identifier handed to the next run; starts at 1 and is incremented per start.
    pub(super) next_run_id: AtomicU64,
    /// Identifier of the active run; 0 means no run is active.
    pub(super) active_run_id: AtomicU64,
}
365
/// Configuration of a single run, validated by `Timer::start_internal` and then
/// handed to `runtime::run_timer`.
#[derive(Debug, Clone, Copy)]
pub(super) struct RunConfig {
    /// Delay for one-time runs or interval for recurring runs; must be non-zero.
    pub(super) interval: Duration,
    /// Optional delay before the first execution; must be non-zero when set.
    pub(super) initial_delay: Option<Duration>,
    /// Optional per-callback timeout; must be non-zero when set.
    pub(super) callback_timeout: Option<Duration>,
    /// Optional retry policy for failed callbacks.
    pub(super) retry_policy: Option<RetryPolicy>,
    /// `true` for recurring runs, `false` for one-time runs.
    pub(super) recurring: bool,
    /// Cadence used by recurring runs.
    pub(super) cadence: RecurringCadence,
    /// Optional execution limit; `Some(0)` is rejected for recurring runs.
    pub(super) expiration_count: Option<usize>,
}
376
/// The kind of run a `TimerBuilder` will start.
#[derive(Debug, Clone, Copy)]
enum TimerKind {
    /// One-time run firing after the given delay.
    Once(Duration),
    /// Recurring run following the given schedule.
    Recurring(RecurringSchedule),
}
382
/// Builds and starts a timer with less boilerplate.
///
/// Created through `Timer::once` / `Timer::recurring` (or the equivalent builder
/// constructors) and consumed by `TimerBuilder::start`.
pub struct TimerBuilder {
    /// One-time delay or recurring schedule to run.
    kind: TimerKind,
    /// Optional timeout applied to each callback execution.
    callback_timeout: Option<Duration>,
    /// Optional retry policy for failed callback executions.
    retry_policy: Option<RetryPolicy>,
    /// Whether the run begins in the paused state.
    start_paused: bool,
    /// Whether the created timer has broadcast events enabled.
    events_enabled: bool,
}
391
#[derive(Clone)]
/// Timer handle for managing one-time and recurring tasks.
///
/// Cloning produces another handle to the same shared state; all clones control
/// the same run.
pub struct Timer {
    inner: Arc<TimerInner>,
}
397
398impl Default for Timer {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
404impl Timer {
405    /// Creates a new timer.
406    pub fn new() -> Self {
407        Self::new_with_events(true)
408    }
409
410    /// Creates a new timer with broadcast events disabled.
411    pub fn new_silent() -> Self {
412        Self::new_with_events(false)
413    }
414
415    fn new_with_events(events_enabled: bool) -> Self {
416        let (completion_tx, _completion_rx) = watch::channel(None);
417        let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
418
419        Self {
420            inner: Arc::new(TimerInner {
421                state: Mutex::new(TimerState::Stopped),
422                handle: Mutex::new(None),
423                command_tx: Mutex::new(None),
424                interval: Mutex::new(Duration::ZERO),
425                expiration_count: Mutex::new(None),
426                statistics: Mutex::new(TimerStatistics::default()),
427                last_outcome: Mutex::new(None),
428                completion_tx,
429                event_tx,
430                events_enabled: AtomicBool::new(events_enabled),
431                runtime: driver::RuntimeHandle,
432                next_run_id: AtomicU64::new(1),
433                active_run_id: AtomicU64::new(0),
434            }),
435        }
436    }
437
438    /// Creates a timer builder configured for a one-time run.
439    pub fn once(delay: Duration) -> TimerBuilder {
440        TimerBuilder::once(delay)
441    }
442
443    /// Creates a timer builder configured for a recurring schedule.
444    pub fn recurring(schedule: RecurringSchedule) -> TimerBuilder {
445        TimerBuilder::recurring(schedule)
446    }
447
448    /// Subscribes to future timer events.
449    pub fn subscribe(&self) -> TimerEvents {
450        TimerEvents {
451            receiver: self.inner.event_tx.subscribe(),
452        }
453    }
454
455    /// Subscribes to completed runs without loss.
456    pub fn completion(&self) -> TimerCompletion {
457        TimerCompletion {
458            receiver: self.inner.completion_tx.subscribe(),
459        }
460    }
461
462    /// Starts a one-time timer.
463    pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
464    where
465        F: TimerCallback + 'static,
466    {
467        self.start_internal(
468            RunConfig {
469                interval: delay,
470                initial_delay: None,
471                callback_timeout: None,
472                retry_policy: None,
473                recurring: false,
474                cadence: RecurringCadence::FixedDelay,
475                expiration_count: None,
476            },
477            callback,
478            false,
479        )
480        .await
481    }
482
483    /// Starts a one-time timer from an async closure.
484    pub async fn start_once_fn<F, Fut>(
485        &self,
486        delay: Duration,
487        callback: F,
488    ) -> Result<u64, TimerError>
489    where
490        F: Fn() -> Fut + Send + Sync + 'static,
491        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
492    {
493        self.start_once(delay, callback).await
494    }
495
496    /// Starts a recurring timer with the provided schedule.
497    pub async fn start_recurring<F>(
498        &self,
499        schedule: RecurringSchedule,
500        callback: F,
501    ) -> Result<u64, TimerError>
502    where
503        F: TimerCallback + 'static,
504    {
505        self.start_internal(
506            RunConfig {
507                interval: schedule.interval,
508                initial_delay: schedule.initial_delay,
509                callback_timeout: None,
510                retry_policy: None,
511                recurring: true,
512                cadence: schedule.cadence,
513                expiration_count: schedule.expiration_count,
514            },
515            callback,
516            false,
517        )
518        .await
519    }
520
521    /// Starts a recurring timer from an async closure.
522    pub async fn start_recurring_fn<F, Fut>(
523        &self,
524        schedule: RecurringSchedule,
525        callback: F,
526    ) -> Result<u64, TimerError>
527    where
528        F: Fn() -> Fut + Send + Sync + 'static,
529        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
530    {
531        self.start_recurring(schedule, callback).await
532    }
533
534    /// Pauses a running timer.
535    pub async fn pause(&self) -> Result<(), TimerError> {
536        self.ensure_not_reentrant(
537            "pause() cannot be awaited from the timer's active callback; use request_pause().",
538        )?;
539        self.request_pause().await
540    }
541
542    /// Requests that the current run pause after the current callback or sleep edge.
543    pub async fn request_pause(&self) -> Result<(), TimerError> {
544        let _run_id = self
545            .active_run_id()
546            .await
547            .ok_or_else(TimerError::not_running)?;
548        let mut state = self.inner.state.lock().await;
549        if *state != TimerState::Running {
550            return Err(TimerError::not_running());
551        }
552
553        *state = TimerState::Paused;
554        drop(state);
555
556        self.send_command(TimerCommand::Pause).await;
557
558        #[cfg(feature = "logging")]
559        debug!("Timer paused.");
560
561        Ok(())
562    }
563
564    /// Resumes a paused timer.
565    pub async fn resume(&self) -> Result<(), TimerError> {
566        self.ensure_not_reentrant(
567            "resume() cannot be awaited from the timer's active callback; use request_resume().",
568        )?;
569        self.request_resume().await
570    }
571
572    /// Requests that a paused timer resume.
573    pub async fn request_resume(&self) -> Result<(), TimerError> {
574        let _run_id = self
575            .active_run_id()
576            .await
577            .ok_or_else(TimerError::not_paused)?;
578        let mut state = self.inner.state.lock().await;
579        if *state != TimerState::Paused {
580            return Err(TimerError::not_paused());
581        }
582
583        *state = TimerState::Running;
584        drop(state);
585
586        self.send_command(TimerCommand::Resume).await;
587
588        #[cfg(feature = "logging")]
589        debug!("Timer resumed.");
590
591        Ok(())
592    }
593
594    /// Stops the timer after the current callback finishes.
595    pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
596        self.ensure_not_reentrant(
597            "stop() cannot be awaited from the timer's active callback; use request_stop().",
598        )?;
599        let run_id = self
600            .active_run_id()
601            .await
602            .ok_or_else(TimerError::not_running)?;
603        self.request_stop().await?;
604        self.join_run(run_id).await
605    }
606
607    /// Requests a graceful stop without waiting for the outcome.
608    pub async fn request_stop(&self) -> Result<(), TimerError> {
609        self.active_run_id()
610            .await
611            .ok_or_else(TimerError::not_running)?;
612        self.send_command(TimerCommand::Stop).await;
613        Ok(())
614    }
615
616    /// Cancels the timer immediately and aborts the callback task.
617    pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
618        self.ensure_not_reentrant(
619            "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
620        )?;
621        self.cancel_with_reason(TimerFinishReason::Cancelled).await
622    }
623
624    /// Requests cancellation without aborting the active callback task.
625    pub async fn request_cancel(&self) -> Result<(), TimerError> {
626        self.active_run_id()
627            .await
628            .ok_or_else(TimerError::not_running)?;
629        self.send_command(TimerCommand::Cancel).await;
630        Ok(())
631    }
632
633    /// Adjusts the interval of a running or paused timer.
634    pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
635        self.ensure_not_reentrant(
636            "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
637        )?;
638        self.request_adjust_interval(new_interval).await
639    }
640
641    /// Requests an interval adjustment for the current run.
642    pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
643        if new_interval.is_zero() {
644            return Err(TimerError::invalid_parameter(
645                "Interval must be greater than zero.",
646            ));
647        }
648
649        let run_id = self
650            .active_run_id()
651            .await
652            .ok_or_else(TimerError::not_running)?;
653        *self.inner.interval.lock().await = new_interval;
654        self.send_command(TimerCommand::SetInterval(new_interval))
655            .await;
656        runtime::emit_event(
657            &self.inner,
658            TimerEvent::IntervalAdjusted {
659                run_id,
660                interval: new_interval,
661            },
662        );
663
664        #[cfg(feature = "logging")]
665        debug!("Timer interval adjusted.");
666
667        Ok(())
668    }
669
670    /// Waits for the current run and returns the completed outcome.
671    pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
672        self.ensure_not_reentrant(
673            "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
674        )?;
675        if let Some(run_id) = self.active_run_id().await {
676            return self.join_run(run_id).await;
677        }
678
679        self.inner
680            .last_outcome
681            .lock()
682            .await
683            .clone()
684            .ok_or_else(TimerError::not_running)
685    }
686
687    /// Waits until the current run has completed or been stopped.
688    pub async fn wait(&self) {
689        let _ = self.join().await;
690    }
691
692    /// Gets the timer's statistics for the current or most recent run.
693    pub async fn get_statistics(&self) -> TimerStatistics {
694        self.inner.statistics.lock().await.clone()
695    }
696
697    /// Gets the current state of the timer.
698    pub async fn get_state(&self) -> TimerState {
699        *self.inner.state.lock().await
700    }
701
702    /// Gets the timer interval for the current or next run.
703    pub async fn get_interval(&self) -> Duration {
704        *self.inner.interval.lock().await
705    }
706
707    /// Gets the configured expiration count for the current or next run.
708    pub async fn get_expiration_count(&self) -> Option<usize> {
709        *self.inner.expiration_count.lock().await
710    }
711
712    /// Gets the most recent callback error observed for the current or most recent run.
713    pub async fn get_last_error(&self) -> Option<TimerError> {
714        self.inner.statistics.lock().await.last_error.clone()
715    }
716
717    /// Gets the most recent completed run outcome.
718    pub async fn last_outcome(&self) -> Option<TimerOutcome> {
719        self.inner.last_outcome.lock().await.clone()
720    }
721
722    /// Enables or disables broadcast event emission for future runtime events.
723    pub fn set_events_enabled(&self, enabled: bool) {
724        self.inner.events_enabled.store(enabled, Ordering::SeqCst);
725    }
726
727    async fn start_internal<F>(
728        &self,
729        config: RunConfig,
730        callback: F,
731        start_paused: bool,
732    ) -> Result<u64, TimerError>
733    where
734        F: TimerCallback + 'static,
735    {
736        if config.interval.is_zero() {
737            return Err(TimerError::invalid_parameter(
738                "Interval must be greater than zero.",
739            ));
740        }
741
742        if config.recurring && matches!(config.expiration_count, Some(0)) {
743            return Err(TimerError::invalid_parameter(
744                "Expiration count must be greater than zero.",
745            ));
746        }
747
748        if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
749            return Err(TimerError::invalid_parameter(
750                "Initial delay must be greater than zero.",
751            ));
752        }
753
754        if config
755            .callback_timeout
756            .is_some_and(|timeout| timeout.is_zero())
757        {
758            return Err(TimerError::invalid_parameter(
759                "Callback timeout must be greater than zero.",
760            ));
761        }
762
763        self.ensure_not_reentrant(
764            "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
765        )?;
766
767        let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
768
769        let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
770        let (tx, rx) = mpsc::unbounded_channel();
771
772        {
773            *self.inner.state.lock().await = if start_paused {
774                TimerState::Paused
775            } else {
776                TimerState::Running
777            };
778            *self.inner.command_tx.lock().await = Some(tx);
779            *self.inner.interval.lock().await = config.interval;
780            *self.inner.expiration_count.lock().await = config.expiration_count;
781            *self.inner.statistics.lock().await = TimerStatistics::default();
782            *self.inner.last_outcome.lock().await = None;
783            self.inner.completion_tx.send_replace(None);
784        }
785        self.inner.active_run_id.store(run_id, Ordering::SeqCst);
786
787        runtime::emit_event(
788            &self.inner,
789            TimerEvent::Started {
790                run_id,
791                interval: config.interval,
792                recurring: config.recurring,
793                expiration_count: config.expiration_count,
794            },
795        );
796
797        let inner = Arc::clone(&self.inner);
798        let handle = self.inner.runtime.spawn(async move {
799            let scoped_inner = Arc::clone(&inner);
800            runtime::with_run_context(&scoped_inner, run_id, async move {
801                runtime::run_timer(inner, run_id, config, callback, rx).await;
802            })
803            .await;
804        });
805
806        *self.inner.handle.lock().await = Some(handle);
807
808        #[cfg(feature = "logging")]
809        debug!("Timer started.");
810
811        Ok(run_id)
812    }
813
814    async fn active_run_id(&self) -> Option<u64> {
815        match self.inner.active_run_id.load(Ordering::SeqCst) {
816            0 => None,
817            run_id => Some(run_id),
818        }
819    }
820
821    async fn send_command(&self, command: TimerCommand) {
822        if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
823            let _ = tx.send(command);
824        }
825    }
826
827    fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
828        if runtime::is_current_run(&self.inner) {
829            return Err(TimerError::reentrant_operation(message));
830        }
831
832        Ok(())
833    }
834
    /// Tears down the active run and records an outcome with the given `reason`.
    ///
    /// Used both for user cancellation (`Cancelled`) and for replacing a run when a
    /// new one is started (`Replaced`).
    ///
    /// # Errors
    ///
    /// Returns the "not running" error when no run is active.
    async fn cancel_with_reason(
        &self,
        reason: TimerFinishReason,
    ) -> Result<TimerOutcome, TimerError> {
        let run_id = self
            .active_run_id()
            .await
            .ok_or_else(TimerError::not_running)?;

        // Drop the command sender and detach the join handle before touching the task.
        let _ = self.inner.command_tx.lock().await.take();
        let handle = self.inner.handle.lock().await.take();
        *self.inner.state.lock().await = TimerState::Stopped;

        if let Some(handle) = handle {
            // Abort the run task and wait until it has actually terminated; the
            // join result (including the cancellation error) is ignored.
            handle.abort();
            let _ = handle.await;
        }

        // Snapshot the statistics only after the task is gone.
        let statistics = self.get_statistics().await;
        let outcome = TimerOutcome {
            run_id,
            reason,
            statistics,
        };

        // NOTE(review): `finish_run` presumably publishes the outcome and clears the
        // active run id; it lives in the runtime module, so confirm there.
        runtime::finish_run(&self.inner, outcome.clone()).await;
        Ok(outcome)
    }
863
864    async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
865        let mut completion_rx = self.inner.completion_tx.subscribe();
866
867        loop {
868            if let Some(outcome) = completion_rx.borrow().clone() {
869                if outcome.run_id == run_id {
870                    return Ok(outcome);
871                }
872            }
873
874            if completion_rx.changed().await.is_err() {
875                return completion_rx
876                    .borrow()
877                    .clone()
878                    .ok_or_else(TimerError::not_running);
879            }
880        }
881    }
882}
883
884impl TimerBuilder {
885    /// Creates a builder for a one-time timer.
886    pub fn once(delay: Duration) -> Self {
887        Self {
888            kind: TimerKind::Once(delay),
889            callback_timeout: None,
890            retry_policy: None,
891            start_paused: false,
892            events_enabled: true,
893        }
894    }
895
896    /// Creates a builder for a recurring schedule.
897    pub fn recurring(schedule: RecurringSchedule) -> Self {
898        Self {
899            kind: TimerKind::Recurring(schedule),
900            callback_timeout: None,
901            retry_policy: None,
902            start_paused: false,
903            events_enabled: true,
904        }
905    }
906
907    /// Sets a timeout for each callback execution.
908    pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
909        self.callback_timeout = Some(callback_timeout);
910        self
911    }
912
913    /// Retries failed callback executions according to the provided policy.
914    pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
915        self.retry_policy = Some(retry_policy);
916        self
917    }
918
919    /// Retries failed callback executions up to `max_retries` times.
920    pub fn max_retries(mut self, max_retries: usize) -> Self {
921        self.retry_policy = Some(RetryPolicy::new(max_retries));
922        self
923    }
924
925    /// Starts the timer in the paused state.
926    pub fn paused_start(mut self) -> Self {
927        self.start_paused = true;
928        self
929    }
930
931    /// Disables broadcast event emission for the timer.
932    pub fn with_events_disabled(mut self) -> Self {
933        self.events_enabled = false;
934        self
935    }
936
937    /// Starts the configured timer and returns the handle.
938    pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
939    where
940        F: TimerCallback + 'static,
941    {
942        let timer = Timer::new_with_events(self.events_enabled);
943        if self.start_paused {
944            *timer.inner.state.lock().await = TimerState::Paused;
945        }
946
947        match self.kind {
948            TimerKind::Once(delay) => {
949                let _ = timer
950                    .start_internal(
951                        RunConfig {
952                            interval: delay,
953                            initial_delay: None,
954                            callback_timeout: self.callback_timeout,
955                            retry_policy: self.retry_policy,
956                            recurring: false,
957                            cadence: RecurringCadence::FixedDelay,
958                            expiration_count: None,
959                        },
960                        callback,
961                        self.start_paused,
962                    )
963                    .await?;
964            }
965            TimerKind::Recurring(schedule) => {
966                let _ = timer
967                    .start_internal(
968                        RunConfig {
969                            interval: schedule.interval,
970                            initial_delay: schedule.initial_delay,
971                            callback_timeout: self.callback_timeout,
972                            retry_policy: self.retry_policy,
973                            recurring: true,
974                            cadence: schedule.cadence,
975                            expiration_count: schedule.expiration_count,
976                        },
977                        callback,
978                        self.start_paused,
979                    )
980                    .await?;
981            }
982        }
983        Ok(timer)
984    }
985}