Skip to main content

timer_lib/timer/
mod.rs

1use async_trait::async_trait;
2use std::future::Future;
3use std::sync::{
4    atomic::{AtomicBool, AtomicU64, Ordering},
5    Arc,
6};
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, watch, Mutex};
9use tokio::task::JoinHandle;
10
11#[cfg(feature = "logging")]
12use log::debug;
13
14use crate::errors::TimerError;
15
16mod driver;
17mod runtime;
18
19#[cfg(test)]
20mod tests;
21
/// Capacity handed to `broadcast::channel` when a timer's event channel is created.
const TIMER_EVENT_BUFFER: usize = 64;
23
/// Lifecycle state reported by a timer handle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimerState {
    /// The timer is running.
    Running,
    /// The timer has been paused.
    Paused,
    /// The timer is stopped; this is also the state of a freshly created timer.
    Stopped,
}
31
/// Reason recorded in a [`TimerOutcome`] for why a run ended.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimerFinishReason {
    /// The run ended on its own (presumably after its last scheduled execution).
    Completed,
    /// The run ended in response to a stop request.
    Stopped,
    /// The run was cancelled.
    Cancelled,
    /// The run was cancelled because a new run was started on the same timer.
    Replaced,
}
40
41/// Statistics for a timer run.
42#[derive(Debug, Clone, Default, PartialEq, Eq)]
43pub struct TimerStatistics {
44    /// Number of callback executions attempted during the current run.
45    pub execution_count: usize,
46    /// Number of successful callback executions.
47    pub successful_executions: usize,
48    /// Number of failed callback executions.
49    pub failed_executions: usize,
50    /// Total elapsed time since the current run started.
51    pub elapsed_time: Duration,
52    /// The most recent callback error observed in the current run.
53    pub last_error: Option<TimerError>,
54}
55
56/// Describes the result of a completed timer run.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct TimerOutcome {
59    /// The completed run identifier.
60    pub run_id: u64,
61    /// The reason the run ended.
62    pub reason: TimerFinishReason,
63    /// Final run statistics.
64    pub statistics: TimerStatistics,
65}
66
/// Retry limit applied to failed callback executions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RetryPolicy {
    max_retries: usize,
}

impl RetryPolicy {
    /// Builds a policy that allows at most `max_retries` retries.
    pub fn new(max_retries: usize) -> Self {
        RetryPolicy { max_retries }
    }

    /// Returns the maximum number of retry attempts after the first failure.
    pub fn max_retries(self) -> usize {
        let RetryPolicy { max_retries } = self;
        max_retries
    }
}
84
85/// Event stream item produced by a timer.
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub enum TimerEvent {
88    Started {
89        run_id: u64,
90        interval: Duration,
91        recurring: bool,
92        expiration_count: Option<usize>,
93    },
94    Paused {
95        run_id: u64,
96    },
97    Resumed {
98        run_id: u64,
99    },
100    IntervalAdjusted {
101        run_id: u64,
102        interval: Duration,
103    },
104    Tick {
105        run_id: u64,
106        statistics: TimerStatistics,
107    },
108    CallbackFailed {
109        run_id: u64,
110        error: TimerError,
111        statistics: TimerStatistics,
112    },
113    Finished(TimerOutcome),
114}
115
116/// Subscription handle for timer events.
117pub struct TimerEvents {
118    receiver: broadcast::Receiver<TimerEvent>,
119}
120
121impl TimerEvents {
122    /// Attempts to receive the next timer event without waiting.
123    pub fn try_recv(&mut self) -> Option<TimerEvent> {
124        loop {
125            match self.receiver.try_recv() {
126                Ok(event) => return Some(event),
127                Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
128                Err(broadcast::error::TryRecvError::Empty)
129                | Err(broadcast::error::TryRecvError::Closed) => return None,
130            }
131        }
132    }
133
134    /// Waits for the next timer event.
135    pub async fn recv(&mut self) -> Option<TimerEvent> {
136        loop {
137            match self.receiver.recv().await {
138                Ok(event) => return Some(event),
139                Err(broadcast::error::RecvError::Lagged(_)) => continue,
140                Err(broadcast::error::RecvError::Closed) => return None,
141            }
142        }
143    }
144
145    /// Waits for the next start event.
146    pub async fn wait_started(&mut self) -> Option<TimerEvent> {
147        loop {
148            if let event @ TimerEvent::Started { .. } = self.recv().await? {
149                return Some(event);
150            }
151        }
152    }
153
154    /// Waits for the next tick event.
155    pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
156        loop {
157            if let event @ TimerEvent::Tick { .. } = self.recv().await? {
158                return Some(event);
159            }
160        }
161    }
162
163    /// Waits for the next paused event.
164    pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
165        loop {
166            if let event @ TimerEvent::Paused { .. } = self.recv().await? {
167                return Some(event);
168            }
169        }
170    }
171
172    /// Waits for the next resumed event.
173    pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
174        loop {
175            if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
176                return Some(event);
177            }
178        }
179    }
180
181    /// Waits for the next finished event.
182    pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
183        loop {
184            if let TimerEvent::Finished(outcome) = self.recv().await? {
185                return Some(outcome);
186            }
187        }
188    }
189
190    /// Waits for the next finished event with a stopped outcome.
191    pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
192        self.wait_finished_reason(TimerFinishReason::Stopped).await
193    }
194
195    /// Waits for the next finished event with a cancelled outcome.
196    pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
197        self.wait_finished_reason(TimerFinishReason::Cancelled)
198            .await
199    }
200
201    async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
202        loop {
203            let outcome = self.wait_finished().await?;
204            if outcome.reason == reason {
205                return Some(outcome);
206            }
207        }
208    }
209}
210
211/// Lossless subscription handle for completed timer runs.
212pub struct TimerCompletion {
213    receiver: watch::Receiver<Option<TimerOutcome>>,
214}
215
216impl TimerCompletion {
217    /// Returns the latest completed outcome, if one has been observed.
218    pub fn latest(&self) -> Option<TimerOutcome> {
219        self.receiver.borrow().clone()
220    }
221
222    /// Waits for the next unseen completed outcome.
223    pub async fn wait(&mut self) -> Option<TimerOutcome> {
224        loop {
225            if let Some(outcome) = self.receiver.borrow_and_update().clone() {
226                return Some(outcome);
227            }
228
229            if self.receiver.changed().await.is_err() {
230                return self.receiver.borrow_and_update().clone();
231            }
232        }
233    }
234
235    /// Waits for a specific run to complete.
236    pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
237        loop {
238            let outcome = self.wait().await?;
239            if outcome.run_id == run_id {
240                return Some(outcome);
241            }
242        }
243    }
244}
245
/// Work executed by a timer each time it fires.
///
/// Implementors must be `Send + Sync`. The file also provides a blanket
/// implementation for `Fn() -> Future` closures.
#[async_trait]
pub trait TimerCallback: Send + Sync {
    /// Runs the callback once, returning `Err` to report a failed execution.
    async fn execute(&self) -> Result<(), TimerError>;
}
252
253#[async_trait]
254impl<F, Fut> TimerCallback for F
255where
256    F: Fn() -> Fut + Send + Sync,
257    Fut: Future<Output = Result<(), TimerError>> + Send,
258{
259    async fn execute(&self) -> Result<(), TimerError> {
260        (self)().await
261    }
262}
263
/// Control message sent from a `Timer` handle to the task driving the active run.
pub(super) enum TimerCommand {
    /// Sent by `Timer::request_pause`.
    Pause,
    /// Sent by `Timer::request_resume`.
    Resume,
    /// Sent by `Timer::request_stop`.
    Stop,
    /// Sent by `Timer::request_cancel`.
    Cancel,
    /// Sent by `Timer::request_adjust_interval` with the new interval.
    SetInterval(Duration),
}
271
/// State shared by every clone of a `Timer` handle and by the spawned run task.
pub(super) struct TimerInner {
    /// Current lifecycle state.
    pub(super) state: Mutex<TimerState>,
    /// Join handle of the spawned run task, if one has been stored.
    pub(super) handle: Mutex<Option<JoinHandle<()>>>,
    /// Sender used to deliver `TimerCommand`s to the active run; `None` when absent.
    pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
    /// Interval for the current or next run.
    pub(super) interval: Mutex<Duration>,
    /// Configured expiration count for the current or next run.
    pub(super) expiration_count: Mutex<Option<usize>>,
    /// Statistics for the current or most recent run; reset when a run starts.
    pub(super) statistics: Mutex<TimerStatistics>,
    /// Outcome of the most recent completed run; cleared when a run starts.
    pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
    /// Watch channel publishing the latest run outcome (`None` while none is published).
    pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
    /// Broadcast channel carrying `TimerEvent`s to subscribers.
    pub(super) event_tx: broadcast::Sender<TimerEvent>,
    /// Flag toggled by `Timer::set_events_enabled`.
    pub(super) events_enabled: AtomicBool,
    /// Handle used to spawn the run task.
    pub(super) runtime: driver::RuntimeHandle,
    /// Source of run identifiers; starts at 1.
    pub(super) next_run_id: AtomicU64,
    /// Identifier of the active run; 0 means no run is active.
    pub(super) active_run_id: AtomicU64,
}
287
288#[derive(Debug, Clone, Copy)]
289pub(super) struct RunConfig {
290    pub(super) interval: Duration,
291    pub(super) initial_delay: Option<Duration>,
292    pub(super) callback_timeout: Option<Duration>,
293    pub(super) retry_policy: Option<RetryPolicy>,
294    pub(super) recurring: bool,
295    pub(super) expiration_count: Option<usize>,
296}
297
/// Kind of run a [`TimerBuilder`] will start.
#[derive(Clone, Copy, Debug)]
enum TimerKind {
    /// A one-time run using the given delay.
    Once(Duration),
    /// A recurring run.
    Recurring {
        /// Period of the recurring run.
        interval: Duration,
        /// Optional initial delay for the recurring run.
        initial_delay: Option<Duration>,
    },
}
306
/// Builds and starts a timer with less boilerplate.
pub struct TimerBuilder {
    /// One-time or recurring, together with the associated delay/interval.
    kind: TimerKind,
    /// Expiration count; only forwarded for recurring timers.
    expiration_count: Option<usize>,
    /// Optional timeout for each callback execution.
    callback_timeout: Option<Duration>,
    /// Optional retry policy for failed callback executions.
    retry_policy: Option<RetryPolicy>,
    /// Whether the timer starts in the paused state.
    start_paused: bool,
    /// Whether broadcast event emission is enabled for the built timer.
    events_enabled: bool,
}
316
317#[derive(Clone)]
318/// Timer handle for managing one-time and recurring tasks.
319pub struct Timer {
320    inner: Arc<TimerInner>,
321}
322
323impl Default for Timer {
324    fn default() -> Self {
325        Self::new()
326    }
327}
328
329impl Timer {
330    /// Creates a new timer.
331    pub fn new() -> Self {
332        Self::new_with_events(true)
333    }
334
335    /// Creates a new timer with broadcast events disabled.
336    pub fn new_silent() -> Self {
337        Self::new_with_events(false)
338    }
339
340    fn new_with_events(events_enabled: bool) -> Self {
341        let (completion_tx, _completion_rx) = watch::channel(None);
342        let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
343
344        Self {
345            inner: Arc::new(TimerInner {
346                state: Mutex::new(TimerState::Stopped),
347                handle: Mutex::new(None),
348                command_tx: Mutex::new(None),
349                interval: Mutex::new(Duration::ZERO),
350                expiration_count: Mutex::new(None),
351                statistics: Mutex::new(TimerStatistics::default()),
352                last_outcome: Mutex::new(None),
353                completion_tx,
354                event_tx,
355                events_enabled: AtomicBool::new(events_enabled),
356                runtime: driver::RuntimeHandle,
357                next_run_id: AtomicU64::new(1),
358                active_run_id: AtomicU64::new(0),
359            }),
360        }
361    }
362
363    /// Creates a timer builder configured for a one-time run.
364    pub fn once(delay: Duration) -> TimerBuilder {
365        TimerBuilder::once(delay)
366    }
367
368    /// Creates a timer builder configured for a recurring run.
369    pub fn recurring(interval: Duration) -> TimerBuilder {
370        TimerBuilder::recurring(interval)
371    }
372
373    /// Subscribes to future timer events.
374    pub fn subscribe(&self) -> TimerEvents {
375        TimerEvents {
376            receiver: self.inner.event_tx.subscribe(),
377        }
378    }
379
380    /// Subscribes to completed runs without loss.
381    pub fn completion(&self) -> TimerCompletion {
382        TimerCompletion {
383            receiver: self.inner.completion_tx.subscribe(),
384        }
385    }
386
387    /// Starts a one-time timer.
388    pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
389    where
390        F: TimerCallback + 'static,
391    {
392        self.start_internal(
393            RunConfig {
394                interval: delay,
395                initial_delay: None,
396                callback_timeout: None,
397                retry_policy: None,
398                recurring: false,
399                expiration_count: None,
400            },
401            callback,
402            false,
403        )
404        .await
405    }
406
407    /// Starts a one-time timer from an async closure.
408    pub async fn start_once_fn<F, Fut>(
409        &self,
410        delay: Duration,
411        callback: F,
412    ) -> Result<u64, TimerError>
413    where
414        F: Fn() -> Fut + Send + Sync + 'static,
415        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
416    {
417        self.start_once(delay, callback).await
418    }
419
420    /// Starts a recurring timer with an optional expiration count.
421    pub async fn start_recurring<F>(
422        &self,
423        interval: Duration,
424        callback: F,
425        expiration_count: Option<usize>,
426    ) -> Result<u64, TimerError>
427    where
428        F: TimerCallback + 'static,
429    {
430        self.start_recurring_with_delay(interval, None, callback, expiration_count)
431            .await
432    }
433
434    /// Starts a recurring timer from an async closure.
435    pub async fn start_recurring_fn<F, Fut>(
436        &self,
437        interval: Duration,
438        callback: F,
439        expiration_count: Option<usize>,
440    ) -> Result<u64, TimerError>
441    where
442        F: Fn() -> Fut + Send + Sync + 'static,
443        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
444    {
445        self.start_recurring(interval, callback, expiration_count)
446            .await
447    }
448
449    /// Starts a recurring timer with an optional initial delay.
450    pub async fn start_recurring_with_delay<F>(
451        &self,
452        interval: Duration,
453        initial_delay: Option<Duration>,
454        callback: F,
455        expiration_count: Option<usize>,
456    ) -> Result<u64, TimerError>
457    where
458        F: TimerCallback + 'static,
459    {
460        self.start_internal(
461            RunConfig {
462                interval,
463                initial_delay,
464                callback_timeout: None,
465                retry_policy: None,
466                recurring: true,
467                expiration_count,
468            },
469            callback,
470            false,
471        )
472        .await
473    }
474
475    /// Pauses a running timer.
476    pub async fn pause(&self) -> Result<(), TimerError> {
477        self.ensure_not_reentrant(
478            "pause() cannot be awaited from the timer's active callback; use request_pause().",
479        )?;
480        self.request_pause().await
481    }
482
483    /// Requests that the current run pause after the current callback or sleep edge.
484    pub async fn request_pause(&self) -> Result<(), TimerError> {
485        let _run_id = self
486            .active_run_id()
487            .await
488            .ok_or_else(TimerError::not_running)?;
489        let mut state = self.inner.state.lock().await;
490        if *state != TimerState::Running {
491            return Err(TimerError::not_running());
492        }
493
494        *state = TimerState::Paused;
495        drop(state);
496
497        self.send_command(TimerCommand::Pause).await;
498
499        #[cfg(feature = "logging")]
500        debug!("Timer paused.");
501
502        Ok(())
503    }
504
505    /// Resumes a paused timer.
506    pub async fn resume(&self) -> Result<(), TimerError> {
507        self.ensure_not_reentrant(
508            "resume() cannot be awaited from the timer's active callback; use request_resume().",
509        )?;
510        self.request_resume().await
511    }
512
513    /// Requests that a paused timer resume.
514    pub async fn request_resume(&self) -> Result<(), TimerError> {
515        let _run_id = self
516            .active_run_id()
517            .await
518            .ok_or_else(TimerError::not_paused)?;
519        let mut state = self.inner.state.lock().await;
520        if *state != TimerState::Paused {
521            return Err(TimerError::not_paused());
522        }
523
524        *state = TimerState::Running;
525        drop(state);
526
527        self.send_command(TimerCommand::Resume).await;
528
529        #[cfg(feature = "logging")]
530        debug!("Timer resumed.");
531
532        Ok(())
533    }
534
535    /// Stops the timer after the current callback finishes.
536    pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
537        self.ensure_not_reentrant(
538            "stop() cannot be awaited from the timer's active callback; use request_stop().",
539        )?;
540        let run_id = self
541            .active_run_id()
542            .await
543            .ok_or_else(TimerError::not_running)?;
544        self.request_stop().await?;
545        self.join_run(run_id).await
546    }
547
548    /// Requests a graceful stop without waiting for the outcome.
549    pub async fn request_stop(&self) -> Result<(), TimerError> {
550        self.active_run_id()
551            .await
552            .ok_or_else(TimerError::not_running)?;
553        self.send_command(TimerCommand::Stop).await;
554        Ok(())
555    }
556
557    /// Cancels the timer immediately and aborts the callback task.
558    pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
559        self.ensure_not_reentrant(
560            "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
561        )?;
562        self.cancel_with_reason(TimerFinishReason::Cancelled).await
563    }
564
565    /// Requests cancellation without aborting the active callback task.
566    pub async fn request_cancel(&self) -> Result<(), TimerError> {
567        self.active_run_id()
568            .await
569            .ok_or_else(TimerError::not_running)?;
570        self.send_command(TimerCommand::Cancel).await;
571        Ok(())
572    }
573
574    /// Adjusts the interval of a running or paused timer.
575    pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
576        self.ensure_not_reentrant(
577            "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
578        )?;
579        self.request_adjust_interval(new_interval).await
580    }
581
582    /// Requests an interval adjustment for the current run.
583    pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
584        if new_interval.is_zero() {
585            return Err(TimerError::invalid_parameter(
586                "Interval must be greater than zero.",
587            ));
588        }
589
590        let run_id = self
591            .active_run_id()
592            .await
593            .ok_or_else(TimerError::not_running)?;
594        *self.inner.interval.lock().await = new_interval;
595        self.send_command(TimerCommand::SetInterval(new_interval))
596            .await;
597        runtime::emit_event(
598            &self.inner,
599            TimerEvent::IntervalAdjusted {
600                run_id,
601                interval: new_interval,
602            },
603        );
604
605        #[cfg(feature = "logging")]
606        debug!("Timer interval adjusted.");
607
608        Ok(())
609    }
610
611    /// Waits for the current run and returns the completed outcome.
612    pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
613        self.ensure_not_reentrant(
614            "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
615        )?;
616        if let Some(run_id) = self.active_run_id().await {
617            return self.join_run(run_id).await;
618        }
619
620        self.inner
621            .last_outcome
622            .lock()
623            .await
624            .clone()
625            .ok_or_else(TimerError::not_running)
626    }
627
628    /// Waits until the current run has completed or been stopped.
629    pub async fn wait(&self) {
630        let _ = self.join().await;
631    }
632
633    /// Gets the timer's statistics for the current or most recent run.
634    pub async fn get_statistics(&self) -> TimerStatistics {
635        self.inner.statistics.lock().await.clone()
636    }
637
638    /// Gets the current state of the timer.
639    pub async fn get_state(&self) -> TimerState {
640        *self.inner.state.lock().await
641    }
642
643    /// Gets the timer interval for the current or next run.
644    pub async fn get_interval(&self) -> Duration {
645        *self.inner.interval.lock().await
646    }
647
648    /// Gets the configured expiration count for the current or next run.
649    pub async fn get_expiration_count(&self) -> Option<usize> {
650        *self.inner.expiration_count.lock().await
651    }
652
653    /// Gets the most recent callback error observed for the current or most recent run.
654    pub async fn get_last_error(&self) -> Option<TimerError> {
655        self.inner.statistics.lock().await.last_error.clone()
656    }
657
658    /// Gets the most recent completed run outcome.
659    pub async fn last_outcome(&self) -> Option<TimerOutcome> {
660        self.inner.last_outcome.lock().await.clone()
661    }
662
663    /// Enables or disables broadcast event emission for future runtime events.
664    pub fn set_events_enabled(&self, enabled: bool) {
665        self.inner.events_enabled.store(enabled, Ordering::SeqCst);
666    }
667
668    async fn start_internal<F>(
669        &self,
670        config: RunConfig,
671        callback: F,
672        start_paused: bool,
673    ) -> Result<u64, TimerError>
674    where
675        F: TimerCallback + 'static,
676    {
677        if config.interval.is_zero() {
678            return Err(TimerError::invalid_parameter(
679                "Interval must be greater than zero.",
680            ));
681        }
682
683        if config.recurring && matches!(config.expiration_count, Some(0)) {
684            return Err(TimerError::invalid_parameter(
685                "Expiration count must be greater than zero.",
686            ));
687        }
688
689        if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
690            return Err(TimerError::invalid_parameter(
691                "Initial delay must be greater than zero.",
692            ));
693        }
694
695        if config
696            .callback_timeout
697            .is_some_and(|timeout| timeout.is_zero())
698        {
699            return Err(TimerError::invalid_parameter(
700                "Callback timeout must be greater than zero.",
701            ));
702        }
703
704        self.ensure_not_reentrant(
705            "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
706        )?;
707
708        let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
709
710        let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
711        let (tx, rx) = mpsc::unbounded_channel();
712
713        {
714            *self.inner.state.lock().await = if start_paused {
715                TimerState::Paused
716            } else {
717                TimerState::Running
718            };
719            *self.inner.command_tx.lock().await = Some(tx);
720            *self.inner.interval.lock().await = config.interval;
721            *self.inner.expiration_count.lock().await = config.expiration_count;
722            *self.inner.statistics.lock().await = TimerStatistics::default();
723            *self.inner.last_outcome.lock().await = None;
724            self.inner.completion_tx.send_replace(None);
725        }
726        self.inner.active_run_id.store(run_id, Ordering::SeqCst);
727
728        runtime::emit_event(
729            &self.inner,
730            TimerEvent::Started {
731                run_id,
732                interval: config.interval,
733                recurring: config.recurring,
734                expiration_count: config.expiration_count,
735            },
736        );
737
738        let inner = Arc::clone(&self.inner);
739        let handle = self.inner.runtime.spawn(async move {
740            let scoped_inner = Arc::clone(&inner);
741            runtime::with_run_context(&scoped_inner, run_id, async move {
742                runtime::run_timer(inner, run_id, config, callback, rx).await;
743            })
744            .await;
745        });
746
747        *self.inner.handle.lock().await = Some(handle);
748
749        #[cfg(feature = "logging")]
750        debug!("Timer started.");
751
752        Ok(run_id)
753    }
754
755    async fn active_run_id(&self) -> Option<u64> {
756        match self.inner.active_run_id.load(Ordering::SeqCst) {
757            0 => None,
758            run_id => Some(run_id),
759        }
760    }
761
762    async fn send_command(&self, command: TimerCommand) {
763        if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
764            let _ = tx.send(command);
765        }
766    }
767
768    fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
769        if runtime::is_current_run(&self.inner) {
770            return Err(TimerError::reentrant_operation(message));
771        }
772
773        Ok(())
774    }
775
776    async fn cancel_with_reason(
777        &self,
778        reason: TimerFinishReason,
779    ) -> Result<TimerOutcome, TimerError> {
780        let run_id = self
781            .active_run_id()
782            .await
783            .ok_or_else(TimerError::not_running)?;
784
785        let _ = self.inner.command_tx.lock().await.take();
786        let handle = self.inner.handle.lock().await.take();
787        *self.inner.state.lock().await = TimerState::Stopped;
788
789        if let Some(handle) = handle {
790            handle.abort();
791            let _ = handle.await;
792        }
793
794        let statistics = self.get_statistics().await;
795        let outcome = TimerOutcome {
796            run_id,
797            reason,
798            statistics,
799        };
800
801        runtime::finish_run(&self.inner, outcome.clone()).await;
802        Ok(outcome)
803    }
804
805    async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
806        let mut completion_rx = self.inner.completion_tx.subscribe();
807
808        loop {
809            if let Some(outcome) = completion_rx.borrow().clone() {
810                if outcome.run_id == run_id {
811                    return Ok(outcome);
812                }
813            }
814
815            if completion_rx.changed().await.is_err() {
816                return completion_rx
817                    .borrow()
818                    .clone()
819                    .ok_or_else(TimerError::not_running);
820            }
821        }
822    }
823}
824
825impl TimerBuilder {
826    /// Creates a builder for a one-time timer.
827    pub fn once(delay: Duration) -> Self {
828        Self {
829            kind: TimerKind::Once(delay),
830            expiration_count: None,
831            callback_timeout: None,
832            retry_policy: None,
833            start_paused: false,
834            events_enabled: true,
835        }
836    }
837
838    /// Creates a builder for a recurring timer.
839    pub fn recurring(interval: Duration) -> Self {
840        Self {
841            kind: TimerKind::Recurring {
842                interval,
843                initial_delay: None,
844            },
845            expiration_count: None,
846            callback_timeout: None,
847            retry_policy: None,
848            start_paused: false,
849            events_enabled: true,
850        }
851    }
852
853    /// Sets the expiration count for a recurring timer.
854    pub fn expiration_count(mut self, expiration_count: usize) -> Self {
855        self.expiration_count = Some(expiration_count);
856        self
857    }
858
859    /// Sets an initial delay before the first recurring execution.
860    pub fn initial_delay(mut self, initial_delay: Duration) -> Self {
861        if let TimerKind::Recurring {
862            initial_delay: builder_initial_delay,
863            ..
864        } = &mut self.kind
865        {
866            *builder_initial_delay = Some(initial_delay);
867        }
868
869        self
870    }
871
872    /// Sets a timeout for each callback execution.
873    pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
874        self.callback_timeout = Some(callback_timeout);
875        self
876    }
877
878    /// Retries failed callback executions according to the provided policy.
879    pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
880        self.retry_policy = Some(retry_policy);
881        self
882    }
883
884    /// Retries failed callback executions up to `max_retries` times.
885    pub fn max_retries(mut self, max_retries: usize) -> Self {
886        self.retry_policy = Some(RetryPolicy::new(max_retries));
887        self
888    }
889
890    /// Starts the timer in the paused state.
891    pub fn paused_start(mut self) -> Self {
892        self.start_paused = true;
893        self
894    }
895
896    /// Disables broadcast event emission for the timer.
897    pub fn with_events_disabled(mut self) -> Self {
898        self.events_enabled = false;
899        self
900    }
901
902    /// Starts the configured timer and returns the handle.
903    pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
904    where
905        F: TimerCallback + 'static,
906    {
907        let timer = Timer::new_with_events(self.events_enabled);
908        if self.start_paused {
909            *timer.inner.state.lock().await = TimerState::Paused;
910        }
911
912        match self.kind {
913            TimerKind::Once(delay) => {
914                let _ = timer
915                    .start_internal(
916                        RunConfig {
917                            interval: delay,
918                            initial_delay: None,
919                            callback_timeout: self.callback_timeout,
920                            retry_policy: self.retry_policy,
921                            recurring: false,
922                            expiration_count: None,
923                        },
924                        callback,
925                        self.start_paused,
926                    )
927                    .await?;
928            }
929            TimerKind::Recurring {
930                interval,
931                initial_delay,
932            } => {
933                let _ = timer
934                    .start_internal(
935                        RunConfig {
936                            interval,
937                            initial_delay,
938                            callback_timeout: self.callback_timeout,
939                            retry_policy: self.retry_policy,
940                            recurring: true,
941                            expiration_count: self.expiration_count,
942                        },
943                        callback,
944                        self.start_paused,
945                    )
946                    .await?;
947            }
948        }
949        Ok(timer)
950    }
951}