Skip to main content

timer_lib/timer/
mod.rs

1use async_trait::async_trait;
2use std::future::Future;
3use std::sync::{
4    atomic::{AtomicBool, AtomicU64, Ordering},
5    Arc,
6};
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, watch, Mutex};
9use tokio::task::JoinHandle;
10
11#[cfg(feature = "logging")]
12use log::debug;
13
14use crate::errors::TimerError;
15
16mod driver;
17mod runtime;
18
19#[cfg(test)]
20mod tests;
21
22const TIMER_EVENT_BUFFER: usize = 64;
23
24/// Represents the state of a timer.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum TimerState {
27    Running,
28    Paused,
29    Stopped,
30}
31
32/// Indicates how a timer run ended.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum TimerFinishReason {
35    Completed,
36    Stopped,
37    Cancelled,
38    Replaced,
39}
40
41/// Statistics for a timer run.
42#[derive(Debug, Clone, Default, PartialEq, Eq)]
43pub struct TimerStatistics {
44    /// Number of callback executions attempted during the current run.
45    pub execution_count: usize,
46    /// Number of successful callback executions.
47    pub successful_executions: usize,
48    /// Number of failed callback executions.
49    pub failed_executions: usize,
50    /// Total elapsed time since the current run started.
51    pub elapsed_time: Duration,
52    /// The most recent callback error observed in the current run.
53    pub last_error: Option<TimerError>,
54}
55
56/// Describes the result of a completed timer run.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct TimerOutcome {
59    /// The completed run identifier.
60    pub run_id: u64,
61    /// The reason the run ended.
62    pub reason: TimerFinishReason,
63    /// Final run statistics.
64    pub statistics: TimerStatistics,
65}
66
67/// Event stream item produced by a timer.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub enum TimerEvent {
70    Started {
71        run_id: u64,
72        interval: Duration,
73        recurring: bool,
74        expiration_count: Option<usize>,
75    },
76    Paused {
77        run_id: u64,
78    },
79    Resumed {
80        run_id: u64,
81    },
82    IntervalAdjusted {
83        run_id: u64,
84        interval: Duration,
85    },
86    Tick {
87        run_id: u64,
88        statistics: TimerStatistics,
89    },
90    CallbackFailed {
91        run_id: u64,
92        error: TimerError,
93        statistics: TimerStatistics,
94    },
95    Finished(TimerOutcome),
96}
97
98/// Subscription handle for timer events.
99pub struct TimerEvents {
100    receiver: broadcast::Receiver<TimerEvent>,
101}
102
103impl TimerEvents {
104    /// Attempts to receive the next timer event without waiting.
105    pub fn try_recv(&mut self) -> Option<TimerEvent> {
106        loop {
107            match self.receiver.try_recv() {
108                Ok(event) => return Some(event),
109                Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
110                Err(broadcast::error::TryRecvError::Empty)
111                | Err(broadcast::error::TryRecvError::Closed) => return None,
112            }
113        }
114    }
115
116    /// Waits for the next timer event.
117    pub async fn recv(&mut self) -> Option<TimerEvent> {
118        loop {
119            match self.receiver.recv().await {
120                Ok(event) => return Some(event),
121                Err(broadcast::error::RecvError::Lagged(_)) => continue,
122                Err(broadcast::error::RecvError::Closed) => return None,
123            }
124        }
125    }
126
127    /// Waits for the next start event.
128    pub async fn wait_started(&mut self) -> Option<TimerEvent> {
129        loop {
130            if let event @ TimerEvent::Started { .. } = self.recv().await? {
131                return Some(event);
132            }
133        }
134    }
135
136    /// Waits for the next tick event.
137    pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
138        loop {
139            if let event @ TimerEvent::Tick { .. } = self.recv().await? {
140                return Some(event);
141            }
142        }
143    }
144
145    /// Waits for the next finished event.
146    pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
147        loop {
148            if let TimerEvent::Finished(outcome) = self.recv().await? {
149                return Some(outcome);
150            }
151        }
152    }
153}
154
155/// Lossless subscription handle for completed timer runs.
156pub struct TimerCompletion {
157    receiver: watch::Receiver<Option<TimerOutcome>>,
158}
159
160impl TimerCompletion {
161    /// Returns the latest completed outcome, if one has been observed.
162    pub fn latest(&self) -> Option<TimerOutcome> {
163        self.receiver.borrow().clone()
164    }
165
166    /// Waits for the next unseen completed outcome.
167    pub async fn wait(&mut self) -> Option<TimerOutcome> {
168        loop {
169            if let Some(outcome) = self.receiver.borrow_and_update().clone() {
170                return Some(outcome);
171            }
172
173            if self.receiver.changed().await.is_err() {
174                return self.receiver.borrow_and_update().clone();
175            }
176        }
177    }
178
179    /// Waits for a specific run to complete.
180    pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
181        loop {
182            let outcome = self.wait().await?;
183            if outcome.run_id == run_id {
184                return Some(outcome);
185            }
186        }
187    }
188}
189
190/// A trait for timer callbacks.
191#[async_trait]
192pub trait TimerCallback: Send + Sync {
193    /// The function to execute when the timer triggers.
194    async fn execute(&self) -> Result<(), TimerError>;
195}
196
197#[async_trait]
198impl<F, Fut> TimerCallback for F
199where
200    F: Fn() -> Fut + Send + Sync,
201    Fut: Future<Output = Result<(), TimerError>> + Send,
202{
203    async fn execute(&self) -> Result<(), TimerError> {
204        (self)().await
205    }
206}
207
208pub(super) enum TimerCommand {
209    Pause,
210    Resume,
211    Stop,
212    Cancel,
213    SetInterval(Duration),
214}
215
216pub(super) struct TimerInner {
217    pub(super) state: Mutex<TimerState>,
218    pub(super) handle: Mutex<Option<JoinHandle<()>>>,
219    pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
220    pub(super) interval: Mutex<Duration>,
221    pub(super) expiration_count: Mutex<Option<usize>>,
222    pub(super) statistics: Mutex<TimerStatistics>,
223    pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
224    pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
225    pub(super) event_tx: broadcast::Sender<TimerEvent>,
226    pub(super) events_enabled: AtomicBool,
227    pub(super) runtime: driver::RuntimeHandle,
228    pub(super) next_run_id: AtomicU64,
229    pub(super) active_run_id: AtomicU64,
230}
231
232#[derive(Debug, Clone, Copy)]
233enum TimerKind {
234    Once(Duration),
235    Recurring(Duration),
236}
237
238/// Builds and starts a timer with less boilerplate.
239pub struct TimerBuilder {
240    kind: TimerKind,
241    expiration_count: Option<usize>,
242    start_paused: bool,
243    events_enabled: bool,
244}
245
246#[derive(Clone)]
247/// Timer handle for managing one-time and recurring tasks.
248pub struct Timer {
249    inner: Arc<TimerInner>,
250}
251
252impl Default for Timer {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
258impl Timer {
259    /// Creates a new timer.
260    pub fn new() -> Self {
261        Self::new_with_events(true)
262    }
263
264    /// Creates a new timer with broadcast events disabled.
265    pub fn new_silent() -> Self {
266        Self::new_with_events(false)
267    }
268
269    fn new_with_events(events_enabled: bool) -> Self {
270        let (completion_tx, _completion_rx) = watch::channel(None);
271        let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
272
273        Self {
274            inner: Arc::new(TimerInner {
275                state: Mutex::new(TimerState::Stopped),
276                handle: Mutex::new(None),
277                command_tx: Mutex::new(None),
278                interval: Mutex::new(Duration::ZERO),
279                expiration_count: Mutex::new(None),
280                statistics: Mutex::new(TimerStatistics::default()),
281                last_outcome: Mutex::new(None),
282                completion_tx,
283                event_tx,
284                events_enabled: AtomicBool::new(events_enabled),
285                runtime: driver::RuntimeHandle,
286                next_run_id: AtomicU64::new(1),
287                active_run_id: AtomicU64::new(0),
288            }),
289        }
290    }
291
292    /// Creates a timer builder configured for a one-time run.
293    pub fn once(delay: Duration) -> TimerBuilder {
294        TimerBuilder::once(delay)
295    }
296
297    /// Creates a timer builder configured for a recurring run.
298    pub fn recurring(interval: Duration) -> TimerBuilder {
299        TimerBuilder::recurring(interval)
300    }
301
302    /// Subscribes to future timer events.
303    pub fn subscribe(&self) -> TimerEvents {
304        TimerEvents {
305            receiver: self.inner.event_tx.subscribe(),
306        }
307    }
308
309    /// Subscribes to completed runs without loss.
310    pub fn completion(&self) -> TimerCompletion {
311        TimerCompletion {
312            receiver: self.inner.completion_tx.subscribe(),
313        }
314    }
315
316    /// Starts a one-time timer.
317    pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
318    where
319        F: TimerCallback + 'static,
320    {
321        self.start_internal(delay, callback, false, None, false)
322            .await
323    }
324
325    /// Starts a one-time timer from an async closure.
326    pub async fn start_once_fn<F, Fut>(
327        &self,
328        delay: Duration,
329        callback: F,
330    ) -> Result<u64, TimerError>
331    where
332        F: Fn() -> Fut + Send + Sync + 'static,
333        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
334    {
335        self.start_once(delay, callback).await
336    }
337
338    /// Starts a recurring timer with an optional expiration count.
339    pub async fn start_recurring<F>(
340        &self,
341        interval: Duration,
342        callback: F,
343        expiration_count: Option<usize>,
344    ) -> Result<u64, TimerError>
345    where
346        F: TimerCallback + 'static,
347    {
348        self.start_internal(interval, callback, true, expiration_count, false)
349            .await
350    }
351
352    /// Starts a recurring timer from an async closure.
353    pub async fn start_recurring_fn<F, Fut>(
354        &self,
355        interval: Duration,
356        callback: F,
357        expiration_count: Option<usize>,
358    ) -> Result<u64, TimerError>
359    where
360        F: Fn() -> Fut + Send + Sync + 'static,
361        Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
362    {
363        self.start_recurring(interval, callback, expiration_count)
364            .await
365    }
366
367    /// Pauses a running timer.
368    pub async fn pause(&self) -> Result<(), TimerError> {
369        self.ensure_not_reentrant(
370            "pause() cannot be awaited from the timer's active callback; use request_pause().",
371        )?;
372        self.request_pause().await
373    }
374
375    /// Requests that the current run pause after the current callback or sleep edge.
376    pub async fn request_pause(&self) -> Result<(), TimerError> {
377        let _run_id = self
378            .active_run_id()
379            .await
380            .ok_or_else(TimerError::not_running)?;
381        let mut state = self.inner.state.lock().await;
382        if *state != TimerState::Running {
383            return Err(TimerError::not_running());
384        }
385
386        *state = TimerState::Paused;
387        drop(state);
388
389        self.send_command(TimerCommand::Pause).await;
390
391        #[cfg(feature = "logging")]
392        debug!("Timer paused.");
393
394        Ok(())
395    }
396
397    /// Resumes a paused timer.
398    pub async fn resume(&self) -> Result<(), TimerError> {
399        self.ensure_not_reentrant(
400            "resume() cannot be awaited from the timer's active callback; use request_resume().",
401        )?;
402        self.request_resume().await
403    }
404
405    /// Requests that a paused timer resume.
406    pub async fn request_resume(&self) -> Result<(), TimerError> {
407        let _run_id = self
408            .active_run_id()
409            .await
410            .ok_or_else(TimerError::not_paused)?;
411        let mut state = self.inner.state.lock().await;
412        if *state != TimerState::Paused {
413            return Err(TimerError::not_paused());
414        }
415
416        *state = TimerState::Running;
417        drop(state);
418
419        self.send_command(TimerCommand::Resume).await;
420
421        #[cfg(feature = "logging")]
422        debug!("Timer resumed.");
423
424        Ok(())
425    }
426
427    /// Stops the timer after the current callback finishes.
428    pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
429        self.ensure_not_reentrant(
430            "stop() cannot be awaited from the timer's active callback; use request_stop().",
431        )?;
432        let run_id = self
433            .active_run_id()
434            .await
435            .ok_or_else(TimerError::not_running)?;
436        self.request_stop().await?;
437        self.join_run(run_id).await
438    }
439
440    /// Requests a graceful stop without waiting for the outcome.
441    pub async fn request_stop(&self) -> Result<(), TimerError> {
442        self.active_run_id()
443            .await
444            .ok_or_else(TimerError::not_running)?;
445        self.send_command(TimerCommand::Stop).await;
446        Ok(())
447    }
448
449    /// Cancels the timer immediately and aborts the callback task.
450    pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
451        self.ensure_not_reentrant(
452            "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
453        )?;
454        self.cancel_with_reason(TimerFinishReason::Cancelled).await
455    }
456
457    /// Requests cancellation without aborting the active callback task.
458    pub async fn request_cancel(&self) -> Result<(), TimerError> {
459        self.active_run_id()
460            .await
461            .ok_or_else(TimerError::not_running)?;
462        self.send_command(TimerCommand::Cancel).await;
463        Ok(())
464    }
465
466    /// Adjusts the interval of a running or paused timer.
467    pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
468        self.ensure_not_reentrant(
469            "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
470        )?;
471        self.request_adjust_interval(new_interval).await
472    }
473
474    /// Requests an interval adjustment for the current run.
475    pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
476        if new_interval.is_zero() {
477            return Err(TimerError::invalid_parameter(
478                "Interval must be greater than zero.",
479            ));
480        }
481
482        let run_id = self
483            .active_run_id()
484            .await
485            .ok_or_else(TimerError::not_running)?;
486        *self.inner.interval.lock().await = new_interval;
487        self.send_command(TimerCommand::SetInterval(new_interval))
488            .await;
489        runtime::emit_event(
490            &self.inner,
491            TimerEvent::IntervalAdjusted {
492                run_id,
493                interval: new_interval,
494            },
495        );
496
497        #[cfg(feature = "logging")]
498        debug!("Timer interval adjusted.");
499
500        Ok(())
501    }
502
503    /// Waits for the current run and returns the completed outcome.
504    pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
505        self.ensure_not_reentrant(
506            "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
507        )?;
508        if let Some(run_id) = self.active_run_id().await {
509            return self.join_run(run_id).await;
510        }
511
512        self.inner
513            .last_outcome
514            .lock()
515            .await
516            .clone()
517            .ok_or_else(TimerError::not_running)
518    }
519
520    /// Waits until the current run has completed or been stopped.
521    pub async fn wait(&self) {
522        let _ = self.join().await;
523    }
524
525    /// Gets the timer's statistics for the current or most recent run.
526    pub async fn get_statistics(&self) -> TimerStatistics {
527        self.inner.statistics.lock().await.clone()
528    }
529
530    /// Gets the current state of the timer.
531    pub async fn get_state(&self) -> TimerState {
532        *self.inner.state.lock().await
533    }
534
535    /// Gets the timer interval for the current or next run.
536    pub async fn get_interval(&self) -> Duration {
537        *self.inner.interval.lock().await
538    }
539
540    /// Gets the configured expiration count for the current or next run.
541    pub async fn get_expiration_count(&self) -> Option<usize> {
542        *self.inner.expiration_count.lock().await
543    }
544
545    /// Gets the most recent callback error observed for the current or most recent run.
546    pub async fn get_last_error(&self) -> Option<TimerError> {
547        self.inner.statistics.lock().await.last_error.clone()
548    }
549
550    /// Gets the most recent completed run outcome.
551    pub async fn last_outcome(&self) -> Option<TimerOutcome> {
552        self.inner.last_outcome.lock().await.clone()
553    }
554
555    /// Enables or disables broadcast event emission for future runtime events.
556    pub fn set_events_enabled(&self, enabled: bool) {
557        self.inner.events_enabled.store(enabled, Ordering::SeqCst);
558    }
559
560    async fn start_internal<F>(
561        &self,
562        interval: Duration,
563        callback: F,
564        recurring: bool,
565        expiration_count: Option<usize>,
566        start_paused: bool,
567    ) -> Result<u64, TimerError>
568    where
569        F: TimerCallback + 'static,
570    {
571        if interval.is_zero() {
572            return Err(TimerError::invalid_parameter(
573                "Interval must be greater than zero.",
574            ));
575        }
576
577        if recurring && matches!(expiration_count, Some(0)) {
578            return Err(TimerError::invalid_parameter(
579                "Expiration count must be greater than zero.",
580            ));
581        }
582
583        self.ensure_not_reentrant(
584            "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
585        )?;
586
587        let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
588
589        let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
590        let (tx, rx) = mpsc::unbounded_channel();
591
592        {
593            *self.inner.state.lock().await = if start_paused {
594                TimerState::Paused
595            } else {
596                TimerState::Running
597            };
598            *self.inner.command_tx.lock().await = Some(tx);
599            *self.inner.interval.lock().await = interval;
600            *self.inner.expiration_count.lock().await = expiration_count;
601            *self.inner.statistics.lock().await = TimerStatistics::default();
602            *self.inner.last_outcome.lock().await = None;
603            self.inner.completion_tx.send_replace(None);
604        }
605        self.inner.active_run_id.store(run_id, Ordering::SeqCst);
606
607        runtime::emit_event(
608            &self.inner,
609            TimerEvent::Started {
610                run_id,
611                interval,
612                recurring,
613                expiration_count,
614            },
615        );
616
617        let inner = Arc::clone(&self.inner);
618        let handle = self.inner.runtime.spawn(async move {
619            let scoped_inner = Arc::clone(&inner);
620            runtime::with_run_context(&scoped_inner, run_id, async move {
621                runtime::run_timer(
622                    inner,
623                    run_id,
624                    interval,
625                    recurring,
626                    expiration_count,
627                    callback,
628                    rx,
629                )
630                .await;
631            })
632            .await;
633        });
634
635        *self.inner.handle.lock().await = Some(handle);
636
637        #[cfg(feature = "logging")]
638        debug!("Timer started.");
639
640        Ok(run_id)
641    }
642
643    async fn active_run_id(&self) -> Option<u64> {
644        match self.inner.active_run_id.load(Ordering::SeqCst) {
645            0 => None,
646            run_id => Some(run_id),
647        }
648    }
649
650    async fn send_command(&self, command: TimerCommand) {
651        if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
652            let _ = tx.send(command);
653        }
654    }
655
656    fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
657        if runtime::is_current_run(&self.inner) {
658            return Err(TimerError::reentrant_operation(message));
659        }
660
661        Ok(())
662    }
663
664    async fn cancel_with_reason(
665        &self,
666        reason: TimerFinishReason,
667    ) -> Result<TimerOutcome, TimerError> {
668        let run_id = self
669            .active_run_id()
670            .await
671            .ok_or_else(TimerError::not_running)?;
672
673        let _ = self.inner.command_tx.lock().await.take();
674        let handle = self.inner.handle.lock().await.take();
675        *self.inner.state.lock().await = TimerState::Stopped;
676
677        if let Some(handle) = handle {
678            handle.abort();
679            let _ = handle.await;
680        }
681
682        let statistics = self.get_statistics().await;
683        let outcome = TimerOutcome {
684            run_id,
685            reason,
686            statistics,
687        };
688
689        runtime::finish_run(&self.inner, outcome.clone()).await;
690        Ok(outcome)
691    }
692
693    async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
694        let mut completion_rx = self.inner.completion_tx.subscribe();
695
696        loop {
697            if let Some(outcome) = completion_rx.borrow().clone() {
698                if outcome.run_id == run_id {
699                    return Ok(outcome);
700                }
701            }
702
703            if completion_rx.changed().await.is_err() {
704                return completion_rx
705                    .borrow()
706                    .clone()
707                    .ok_or_else(TimerError::not_running);
708            }
709        }
710    }
711}
712
713impl TimerBuilder {
714    /// Creates a builder for a one-time timer.
715    pub fn once(delay: Duration) -> Self {
716        Self {
717            kind: TimerKind::Once(delay),
718            expiration_count: None,
719            start_paused: false,
720            events_enabled: true,
721        }
722    }
723
724    /// Creates a builder for a recurring timer.
725    pub fn recurring(interval: Duration) -> Self {
726        Self {
727            kind: TimerKind::Recurring(interval),
728            expiration_count: None,
729            start_paused: false,
730            events_enabled: true,
731        }
732    }
733
734    /// Sets the expiration count for a recurring timer.
735    pub fn expiration_count(mut self, expiration_count: usize) -> Self {
736        self.expiration_count = Some(expiration_count);
737        self
738    }
739
740    /// Starts the timer in the paused state.
741    pub fn paused_start(mut self) -> Self {
742        self.start_paused = true;
743        self
744    }
745
746    /// Disables broadcast event emission for the timer.
747    pub fn with_events_disabled(mut self) -> Self {
748        self.events_enabled = false;
749        self
750    }
751
752    /// Starts the configured timer and returns the handle.
753    pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
754    where
755        F: TimerCallback + 'static,
756    {
757        let timer = Timer::new_with_events(self.events_enabled);
758        if self.start_paused {
759            *timer.inner.state.lock().await = TimerState::Paused;
760        }
761
762        match self.kind {
763            TimerKind::Once(delay) => {
764                let _ = timer
765                    .start_internal(delay, callback, false, None, self.start_paused)
766                    .await?;
767            }
768            TimerKind::Recurring(interval) => {
769                let _ = timer
770                    .start_internal(
771                        interval,
772                        callback,
773                        true,
774                        self.expiration_count,
775                        self.start_paused,
776                    )
777                    .await?;
778            }
779        }
780        Ok(timer)
781    }
782}