1use async_trait::async_trait;
2use std::collections::BTreeMap;
3use std::future::Future;
4use std::sync::{
5 atomic::{AtomicBool, AtomicU64, Ordering},
6 Arc,
7};
8use std::time::Duration;
9use tokio::sync::{broadcast, mpsc, watch, Mutex};
10use tokio::task::JoinHandle;
11use tokio::time::Instant;
12
13#[cfg(feature = "logging")]
14use log::debug;
15
16use crate::errors::TimerError;
17
18pub(crate) mod driver;
19mod runtime;
20
21#[cfg(test)]
22mod tests;
23
24#[cfg(feature = "test-util")]
25pub use driver::MockRuntime;
26
/// Capacity passed to `broadcast::channel` when a timer's event channel is created.
const TIMER_EVENT_BUFFER: usize = 64;
28
/// Multiplies `duration` by `multiplier` without overflowing.
///
/// The product is formed on the nanosecond count in `u128`; anything that does not fit
/// in a `u64` nanosecond count is clamped to `u64::MAX` nanoseconds.
fn saturating_mul_duration(duration: Duration, multiplier: u32) -> Duration {
    let product = duration.as_nanos().saturating_mul(u128::from(multiplier));
    // A failed narrowing means the product exceeds `u64::MAX`, i.e. the clamp applies.
    let nanos = u64::try_from(product).unwrap_or(u64::MAX);
    Duration::from_nanos(nanos)
}
35
/// Lifecycle state of a [`Timer`], as returned by `Timer::get_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerState {
    /// Set when a run starts unpaused or when a paused run is resumed.
    Running,
    /// Set when a run is started paused or when a pause is requested.
    Paused,
    /// Initial state of a freshly created timer; also set when a run is cancelled.
    Stopped,
}
43
/// Why a timer run ended; carried in [`TimerOutcome::reason`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerFinishReason {
    /// NOTE(review): not produced in this file; presumably reported when a run finishes
    /// on its own — confirm in `runtime`.
    Completed,
    /// The reason `TimerEvents::wait_stopped` filters for.
    /// NOTE(review): presumably produced in response to a stop command — confirm in `runtime`.
    Stopped,
    /// Used by `Timer::cancel` when it aborts the active run.
    Cancelled,
    /// Used when starting a new run cancels the previously active one.
    Replaced,
}
52
/// Statistics collected for a timer run.
///
/// `Timer::start_internal` resets the timer's statistics to `Default` when a run starts;
/// the code that updates the counters is not part of this file.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TimerStatistics {
    /// NOTE(review): presumably the number of callback executions so far — confirm in `runtime`.
    pub execution_count: usize,
    /// NOTE(review): presumably executions whose callback returned `Ok` — confirm in `runtime`.
    pub successful_executions: usize,
    /// NOTE(review): presumably executions whose callback failed — confirm in `runtime`.
    pub failed_executions: usize,
    /// NOTE(review): presumably the time the run has been active — confirm how it is measured.
    pub elapsed_time: Duration,
    /// Most recently recorded error, if any; exposed through `Timer::get_last_error`.
    pub last_error: Option<TimerError>,
}
67
/// Final result of a timer run.
///
/// This is the value carried by the completion channel, by `TimerEvent::Finished`, and
/// returned from `Timer::stop`, `Timer::cancel` and `Timer::join`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerOutcome {
    /// Identifier of the run, as returned by the `start_*` methods.
    pub run_id: u64,
    /// Why the run ended.
    pub reason: TimerFinishReason,
    /// Statistics captured when the outcome was built.
    pub statistics: TimerStatistics,
}
78
/// Free-form descriptive data attached to a timer.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TimerMetadata {
    /// Optional label, set through `Timer::set_label` or `TimerBuilder::label`.
    pub label: Option<String>,
    /// Key/value tags, set through `Timer::set_tag` or `TimerBuilder::tag`.
    pub tags: BTreeMap<String, String>,
}
87
/// Point-in-time view of a timer, assembled by `Timer::snapshot`.
///
/// Each field is read through its own accessor, one after another, so the fields are not
/// captured under a single lock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerSnapshot {
    /// Value of `Timer::get_state`.
    pub state: TimerState,
    /// Value of `Timer::get_interval`.
    pub interval: Duration,
    /// Value of `Timer::get_expiration_count`.
    pub expiration_count: Option<usize>,
    /// Value of `Timer::get_statistics`.
    pub statistics: TimerStatistics,
    /// Value of `Timer::last_outcome`.
    pub last_outcome: Option<TimerOutcome>,
    /// Value of `Timer::metadata`.
    pub metadata: TimerMetadata,
}
104
/// Spacing mode of a recurring timer.
///
/// NOTE(review): the scheduling behaviour behind each variant is implemented outside this
/// file; only the selection is modelled here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecurringCadence {
    FixedDelay,
    FixedRate,
}

/// Immutable description of a recurring run, built with chained `with_*` calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecurringSchedule {
    interval: Duration,
    initial_delay: Option<Duration>,
    cadence: RecurringCadence,
    expiration_count: Option<usize>,
    jitter: Option<Duration>,
}

impl RecurringSchedule {
    /// Creates a schedule with the given interval, `FixedDelay` cadence and no initial
    /// delay, expiration count or jitter.
    pub fn new(interval: Duration) -> Self {
        let cadence = RecurringCadence::FixedDelay;
        Self {
            interval,
            initial_delay: None,
            cadence,
            expiration_count: None,
            jitter: None,
        }
    }

    /// Returns the configured interval.
    pub fn interval(self) -> Duration {
        self.interval
    }

    /// Returns the configured initial delay, if any.
    pub fn initial_delay(self) -> Option<Duration> {
        self.initial_delay
    }

    /// Returns the configured cadence.
    pub fn cadence(self) -> RecurringCadence {
        self.cadence
    }

    /// Returns the configured expiration count, if any.
    pub fn expiration_count(self) -> Option<usize> {
        self.expiration_count
    }

    /// Returns the configured jitter, if any.
    pub fn jitter(self) -> Option<Duration> {
        self.jitter
    }

    /// Returns a copy of the schedule with the initial delay set.
    pub fn with_initial_delay(self, initial_delay: Duration) -> Self {
        Self {
            initial_delay: Some(initial_delay),
            ..self
        }
    }

    /// Returns a copy of the schedule with the given cadence.
    pub fn with_cadence(self, cadence: RecurringCadence) -> Self {
        Self { cadence, ..self }
    }

    /// Shorthand for `with_cadence(RecurringCadence::FixedDelay)`.
    pub fn fixed_delay(self) -> Self {
        self.with_cadence(RecurringCadence::FixedDelay)
    }

    /// Shorthand for `with_cadence(RecurringCadence::FixedRate)`.
    pub fn fixed_rate(self) -> Self {
        self.with_cadence(RecurringCadence::FixedRate)
    }

    /// Returns a copy of the schedule with the expiration count set.
    pub fn with_expiration_count(self, expiration_count: usize) -> Self {
        Self {
            expiration_count: Some(expiration_count),
            ..self
        }
    }

    /// Returns a copy of the schedule with the jitter set.
    pub fn with_jitter(self, jitter: Duration) -> Self {
        Self {
            jitter: Some(jitter),
            ..self
        }
    }
}
195
196#[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub struct RetryPolicy {
199 max_retries: usize,
200 backoff: RetryBackoff,
201}
202
203impl RetryPolicy {
204 pub fn new(max_retries: usize) -> Self {
206 Self {
207 max_retries,
208 backoff: RetryBackoff::Immediate,
209 }
210 }
211
212 pub fn max_retries(self) -> usize {
214 self.max_retries
215 }
216
217 pub fn backoff(self) -> RetryBackoff {
219 self.backoff
220 }
221
222 pub fn with_backoff(mut self, backoff: RetryBackoff) -> Self {
224 self.backoff = backoff;
225 self
226 }
227
228 fn delay_for_retry(self, retry_number: usize) -> Duration {
229 self.backoff.delay_for_retry(retry_number)
230 }
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum RetryBackoff {
236 Immediate,
237 Fixed(Duration),
238 Linear(Duration),
239 Exponential(Duration),
240}
241
242impl RetryBackoff {
243 fn delay_for_retry(self, retry_number: usize) -> Duration {
244 match self {
245 Self::Immediate => Duration::ZERO,
246 Self::Fixed(delay) => delay,
247 Self::Linear(step) => saturating_mul_duration(step, retry_number as u32),
248 Self::Exponential(base) => {
249 let exponent = retry_number.saturating_sub(1) as u32;
250 let multiplier = 1_u32.checked_shl(exponent.min(31)).unwrap_or(u32::MAX);
251 saturating_mul_duration(base, multiplier)
252 }
253 }
254 }
255}
256
/// Lifecycle notification delivered to [`TimerEvents`] subscribers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimerEvent {
    /// Emitted by `Timer::start_internal` once a new run has been set up; carries the
    /// run's interval, recurrence flag, expiration count and metadata.
    Started {
        run_id: u64,
        interval: Duration,
        recurring: bool,
        expiration_count: Option<usize>,
        metadata: TimerMetadata,
    },
    /// NOTE(review): emitted outside this file, presumably when a pause takes effect —
    /// confirm in `runtime`.
    Paused {
        run_id: u64,
    },
    /// NOTE(review): emitted outside this file, presumably when a resume takes effect —
    /// confirm in `runtime`.
    Resumed {
        run_id: u64,
    },
    /// Emitted by `Timer::request_adjust_interval` with the newly requested interval.
    IntervalAdjusted {
        run_id: u64,
        interval: Duration,
    },
    /// NOTE(review): emitted outside this file, presumably after a callback execution —
    /// confirm in `runtime`.
    Tick {
        run_id: u64,
        statistics: TimerStatistics,
    },
    /// NOTE(review): emitted outside this file, presumably when a callback execution
    /// fails — confirm in `runtime`.
    CallbackFailed {
        run_id: u64,
        error: TimerError,
        statistics: TimerStatistics,
    },
    /// Carries the outcome of a finished run; this is what `TimerEvents::wait_finished`
    /// waits for.
    Finished(TimerOutcome),
}
288
289pub struct TimerEvents {
291 receiver: broadcast::Receiver<TimerEvent>,
292}
293
294impl TimerEvents {
295 pub fn try_recv(&mut self) -> Option<TimerEvent> {
297 loop {
298 match self.receiver.try_recv() {
299 Ok(event) => return Some(event),
300 Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
301 Err(broadcast::error::TryRecvError::Empty)
302 | Err(broadcast::error::TryRecvError::Closed) => return None,
303 }
304 }
305 }
306
307 pub async fn recv(&mut self) -> Option<TimerEvent> {
309 loop {
310 match self.receiver.recv().await {
311 Ok(event) => return Some(event),
312 Err(broadcast::error::RecvError::Lagged(_)) => continue,
313 Err(broadcast::error::RecvError::Closed) => return None,
314 }
315 }
316 }
317
318 pub async fn wait_started(&mut self) -> Option<TimerEvent> {
320 loop {
321 if let event @ TimerEvent::Started { .. } = self.recv().await? {
322 return Some(event);
323 }
324 }
325 }
326
327 pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
329 loop {
330 if let event @ TimerEvent::Tick { .. } = self.recv().await? {
331 return Some(event);
332 }
333 }
334 }
335
336 pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
338 loop {
339 if let event @ TimerEvent::Paused { .. } = self.recv().await? {
340 return Some(event);
341 }
342 }
343 }
344
345 pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
347 loop {
348 if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
349 return Some(event);
350 }
351 }
352 }
353
354 pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
356 loop {
357 if let TimerEvent::Finished(outcome) = self.recv().await? {
358 return Some(outcome);
359 }
360 }
361 }
362
363 pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
365 self.wait_finished_reason(TimerFinishReason::Stopped).await
366 }
367
368 pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
370 self.wait_finished_reason(TimerFinishReason::Cancelled)
371 .await
372 }
373
374 async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
375 loop {
376 let outcome = self.wait_finished().await?;
377 if outcome.reason == reason {
378 return Some(outcome);
379 }
380 }
381 }
382}
383
384pub struct TimerCompletion {
386 receiver: watch::Receiver<Option<TimerOutcome>>,
387}
388
389impl TimerCompletion {
390 pub fn latest(&self) -> Option<TimerOutcome> {
392 self.receiver.borrow().clone()
393 }
394
395 pub async fn wait(&mut self) -> Option<TimerOutcome> {
397 loop {
398 if let Some(outcome) = self.receiver.borrow_and_update().clone() {
399 return Some(outcome);
400 }
401
402 if self.receiver.changed().await.is_err() {
403 return self.receiver.borrow_and_update().clone();
404 }
405 }
406 }
407
408 pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
410 loop {
411 let outcome = self.wait().await?;
412 if outcome.run_id == run_id {
413 return Some(outcome);
414 }
415 }
416 }
417}
418
419#[async_trait]
421pub trait TimerCallback: Send + Sync {
422 async fn execute(&self) -> Result<(), TimerError>;
424}
425
426#[async_trait]
427impl<F, Fut> TimerCallback for F
428where
429 F: Fn() -> Fut + Send + Sync,
430 Fut: Future<Output = Result<(), TimerError>> + Send,
431{
432 async fn execute(&self) -> Result<(), TimerError> {
433 (self)().await
434 }
435}
436
/// Control message sent from a `Timer` handle over the active run's unbounded command
/// channel; the receiving end is handed to `runtime::run_timer`.
pub(super) enum TimerCommand {
    /// Sent by `Timer::request_pause`.
    Pause,
    /// Sent by `Timer::request_resume`.
    Resume,
    /// Sent by `Timer::request_stop`.
    Stop,
    /// Sent by `Timer::request_cancel`.
    Cancel,
    /// Sent by `Timer::request_adjust_interval` with the new interval.
    SetInterval(Duration),
}
444
/// State shared by every clone of a `Timer` and by the spawned run task.
pub(super) struct TimerInner {
    /// Current lifecycle state.
    pub(super) state: Mutex<TimerState>,
    /// Join handle of the task spawned for the current run; taken when the run is cancelled.
    pub(super) handle: Mutex<Option<JoinHandle<()>>>,
    /// Sender side of the current run's command channel; cleared when the run is cancelled.
    pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
    /// Interval of the current run; updated by `request_adjust_interval`.
    pub(super) interval: Mutex<Duration>,
    /// Expiration count of the current run, if one was configured.
    pub(super) expiration_count: Mutex<Option<usize>>,
    /// Label and tags; replaced by the run configuration's metadata when a run starts.
    pub(super) metadata: Mutex<TimerMetadata>,
    /// Statistics of the current run; reset to `Default` when a run starts.
    pub(super) statistics: Mutex<TimerStatistics>,
    /// Cleared when a run starts.
    /// NOTE(review): presumably filled in by `runtime::finish_run` — confirm.
    pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
    /// Watch channel observed by `TimerCompletion` and `join_run`; reset to `None` when a
    /// run starts.
    pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
    /// Broadcast channel that `Timer::subscribe` attaches receivers to.
    pub(super) event_tx: broadcast::Sender<TimerEvent>,
    /// Flag set at construction and by `Timer::set_events_enabled`.
    /// NOTE(review): presumably consulted by `runtime::emit_event` — confirm.
    pub(super) events_enabled: AtomicBool,
    /// Runtime handle used to read the current time and to spawn the run task.
    pub(super) runtime: driver::RuntimeHandle,
    /// Source of run identifiers; starts at 1 and is incremented for every run.
    pub(super) next_run_id: AtomicU64,
    /// Identifier of the active run; `0` is read as "no active run".
    pub(super) active_run_id: AtomicU64,
}
461
/// Parameters of a single run, validated by `Timer::start_internal` and then handed to
/// `runtime::run_timer`.
#[derive(Debug, Clone)]
pub(super) struct RunConfig {
    /// Delay or interval of the run; must be non-zero unless `start_deadline` is set.
    pub(super) interval: Duration,
    /// Absolute deadline; set only by the deadline-based start paths.
    pub(super) start_deadline: Option<Instant>,
    /// Initial delay taken from the recurring schedule; must be non-zero when present.
    pub(super) initial_delay: Option<Duration>,
    /// Jitter taken from the recurring schedule; must be non-zero when present.
    pub(super) jitter: Option<Duration>,
    /// Callback timeout taken from the builder; must be non-zero when present.
    pub(super) callback_timeout: Option<Duration>,
    /// Retry policy taken from the builder; a non-`Immediate` backoff must be non-zero.
    pub(super) retry_policy: Option<RetryPolicy>,
    /// `true` for recurring runs, `false` for one-shot runs.
    pub(super) recurring: bool,
    /// Cadence taken from the recurring schedule; one-shot runs pass `FixedDelay`.
    pub(super) cadence: RecurringCadence,
    /// Expiration count; `Some(0)` is rejected for recurring runs.
    pub(super) expiration_count: Option<usize>,
    /// Metadata stored on the timer and included in the `Started` event.
    pub(super) metadata: TimerMetadata,
}
475
/// Which kind of run a `TimerBuilder` will start.
#[derive(Debug, Clone, Copy)]
enum TimerKind {
    /// One-shot run after the given delay.
    Once(Duration),
    /// One-shot run at the given deadline.
    At(Instant),
    /// Recurring run following the given schedule.
    Recurring(RecurringSchedule),
}
482
/// Fluent configuration for creating and starting a `Timer` in one step; created through
/// `Timer::once`, `Timer::at` or `Timer::recurring` and consumed by `TimerBuilder::start`.
pub struct TimerBuilder {
    /// Kind of run to start.
    kind: TimerKind,
    /// Optional callback timeout forwarded to the run configuration.
    callback_timeout: Option<Duration>,
    /// Optional retry policy forwarded to the run configuration.
    retry_policy: Option<RetryPolicy>,
    /// Whether the run starts in the `Paused` state.
    start_paused: bool,
    /// Value of the `events_enabled` flag the timer is created with.
    events_enabled: bool,
    /// Label and tags forwarded to the run configuration.
    metadata: TimerMetadata,
}
492
493#[derive(Clone)]
494pub struct Timer {
496 inner: Arc<TimerInner>,
497}
498
499impl Default for Timer {
500 fn default() -> Self {
501 Self::new()
502 }
503}
504
505impl Timer {
506 pub fn new() -> Self {
508 Self::new_with_runtime(driver::RuntimeHandle::default(), true)
509 }
510
511 pub fn new_silent() -> Self {
513 Self::new_with_runtime(driver::RuntimeHandle::default(), false)
514 }
515
516 pub(crate) fn new_with_runtime(runtime: driver::RuntimeHandle, events_enabled: bool) -> Self {
517 let (completion_tx, _completion_rx) = watch::channel(None);
518 let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
519
520 Self {
521 inner: Arc::new(TimerInner {
522 state: Mutex::new(TimerState::Stopped),
523 handle: Mutex::new(None),
524 command_tx: Mutex::new(None),
525 interval: Mutex::new(Duration::ZERO),
526 expiration_count: Mutex::new(None),
527 metadata: Mutex::new(TimerMetadata::default()),
528 statistics: Mutex::new(TimerStatistics::default()),
529 last_outcome: Mutex::new(None),
530 completion_tx,
531 event_tx,
532 events_enabled: AtomicBool::new(events_enabled),
533 runtime,
534 next_run_id: AtomicU64::new(1),
535 active_run_id: AtomicU64::new(0),
536 }),
537 }
538 }
539
540 pub fn once(delay: Duration) -> TimerBuilder {
542 TimerBuilder::once(delay)
543 }
544
545 pub fn at(deadline: Instant) -> TimerBuilder {
547 TimerBuilder::at(deadline)
548 }
549
550 pub fn recurring(schedule: RecurringSchedule) -> TimerBuilder {
552 TimerBuilder::recurring(schedule)
553 }
554
555 pub fn subscribe(&self) -> TimerEvents {
557 TimerEvents {
558 receiver: self.inner.event_tx.subscribe(),
559 }
560 }
561
562 pub fn completion(&self) -> TimerCompletion {
564 TimerCompletion {
565 receiver: self.inner.completion_tx.subscribe(),
566 }
567 }
568
569 #[cfg(feature = "test-util")]
571 pub fn new_mocked() -> (Self, MockRuntime) {
572 let runtime = MockRuntime::new();
573 (Self::new_with_runtime(runtime.handle(), true), runtime)
574 }
575
576 pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
578 where
579 F: TimerCallback + 'static,
580 {
581 let metadata = self.inner.metadata.lock().await.clone();
582 self.start_internal(
583 RunConfig {
584 interval: delay,
585 start_deadline: None,
586 initial_delay: None,
587 jitter: None,
588 callback_timeout: None,
589 retry_policy: None,
590 recurring: false,
591 cadence: RecurringCadence::FixedDelay,
592 expiration_count: None,
593 metadata,
594 },
595 callback,
596 false,
597 )
598 .await
599 }
600
601 pub async fn start_once_fn<F, Fut>(
603 &self,
604 delay: Duration,
605 callback: F,
606 ) -> Result<u64, TimerError>
607 where
608 F: Fn() -> Fut + Send + Sync + 'static,
609 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
610 {
611 self.start_once(delay, callback).await
612 }
613
614 pub async fn start_at<F>(&self, deadline: Instant, callback: F) -> Result<u64, TimerError>
616 where
617 F: TimerCallback + 'static,
618 {
619 let now = self.inner.runtime.now();
620 let metadata = self.inner.metadata.lock().await.clone();
621 self.start_internal(
622 RunConfig {
623 interval: deadline.saturating_duration_since(now),
624 start_deadline: Some(deadline),
625 initial_delay: None,
626 jitter: None,
627 callback_timeout: None,
628 retry_policy: None,
629 recurring: false,
630 cadence: RecurringCadence::FixedDelay,
631 expiration_count: None,
632 metadata,
633 },
634 callback,
635 false,
636 )
637 .await
638 }
639
640 pub async fn start_at_fn<F, Fut>(
642 &self,
643 deadline: Instant,
644 callback: F,
645 ) -> Result<u64, TimerError>
646 where
647 F: Fn() -> Fut + Send + Sync + 'static,
648 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
649 {
650 self.start_at(deadline, callback).await
651 }
652
653 pub async fn start_recurring<F>(
655 &self,
656 schedule: RecurringSchedule,
657 callback: F,
658 ) -> Result<u64, TimerError>
659 where
660 F: TimerCallback + 'static,
661 {
662 let metadata = self.inner.metadata.lock().await.clone();
663 self.start_internal(
664 RunConfig {
665 interval: schedule.interval,
666 start_deadline: None,
667 initial_delay: schedule.initial_delay,
668 jitter: schedule.jitter,
669 callback_timeout: None,
670 retry_policy: None,
671 recurring: true,
672 cadence: schedule.cadence,
673 expiration_count: schedule.expiration_count,
674 metadata,
675 },
676 callback,
677 false,
678 )
679 .await
680 }
681
682 pub async fn start_recurring_fn<F, Fut>(
684 &self,
685 schedule: RecurringSchedule,
686 callback: F,
687 ) -> Result<u64, TimerError>
688 where
689 F: Fn() -> Fut + Send + Sync + 'static,
690 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
691 {
692 self.start_recurring(schedule, callback).await
693 }
694
695 pub async fn pause(&self) -> Result<(), TimerError> {
697 self.ensure_not_reentrant(
698 "pause() cannot be awaited from the timer's active callback; use request_pause().",
699 )?;
700 self.request_pause().await
701 }
702
703 pub async fn request_pause(&self) -> Result<(), TimerError> {
705 let _run_id = self
706 .active_run_id()
707 .await
708 .ok_or_else(TimerError::not_running)?;
709 let mut state = self.inner.state.lock().await;
710 if *state != TimerState::Running {
711 return Err(TimerError::not_running());
712 }
713
714 *state = TimerState::Paused;
715 drop(state);
716
717 self.send_command(TimerCommand::Pause).await;
718
719 #[cfg(feature = "logging")]
720 debug!("Timer paused.");
721
722 Ok(())
723 }
724
725 pub async fn resume(&self) -> Result<(), TimerError> {
727 self.ensure_not_reentrant(
728 "resume() cannot be awaited from the timer's active callback; use request_resume().",
729 )?;
730 self.request_resume().await
731 }
732
733 pub async fn request_resume(&self) -> Result<(), TimerError> {
735 let _run_id = self
736 .active_run_id()
737 .await
738 .ok_or_else(TimerError::not_paused)?;
739 let mut state = self.inner.state.lock().await;
740 if *state != TimerState::Paused {
741 return Err(TimerError::not_paused());
742 }
743
744 *state = TimerState::Running;
745 drop(state);
746
747 self.send_command(TimerCommand::Resume).await;
748
749 #[cfg(feature = "logging")]
750 debug!("Timer resumed.");
751
752 Ok(())
753 }
754
755 pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
757 self.ensure_not_reentrant(
758 "stop() cannot be awaited from the timer's active callback; use request_stop().",
759 )?;
760 let run_id = self
761 .active_run_id()
762 .await
763 .ok_or_else(TimerError::not_running)?;
764 self.request_stop().await?;
765 self.join_run(run_id).await
766 }
767
768 pub async fn request_stop(&self) -> Result<(), TimerError> {
770 self.active_run_id()
771 .await
772 .ok_or_else(TimerError::not_running)?;
773 self.send_command(TimerCommand::Stop).await;
774 Ok(())
775 }
776
777 pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
779 self.ensure_not_reentrant(
780 "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
781 )?;
782 self.cancel_with_reason(TimerFinishReason::Cancelled).await
783 }
784
785 pub async fn request_cancel(&self) -> Result<(), TimerError> {
787 self.active_run_id()
788 .await
789 .ok_or_else(TimerError::not_running)?;
790 self.send_command(TimerCommand::Cancel).await;
791 Ok(())
792 }
793
794 pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
796 self.ensure_not_reentrant(
797 "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
798 )?;
799 self.request_adjust_interval(new_interval).await
800 }
801
802 pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
804 if new_interval.is_zero() {
805 return Err(TimerError::invalid_parameter(
806 "Interval must be greater than zero.",
807 ));
808 }
809
810 let run_id = self
811 .active_run_id()
812 .await
813 .ok_or_else(TimerError::not_running)?;
814 *self.inner.interval.lock().await = new_interval;
815 self.send_command(TimerCommand::SetInterval(new_interval))
816 .await;
817 runtime::emit_event(
818 &self.inner,
819 TimerEvent::IntervalAdjusted {
820 run_id,
821 interval: new_interval,
822 },
823 );
824
825 #[cfg(feature = "logging")]
826 debug!("Timer interval adjusted.");
827
828 Ok(())
829 }
830
831 pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
833 self.ensure_not_reentrant(
834 "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
835 )?;
836 if let Some(run_id) = self.active_run_id().await {
837 return self.join_run(run_id).await;
838 }
839
840 self.inner
841 .last_outcome
842 .lock()
843 .await
844 .clone()
845 .ok_or_else(TimerError::not_running)
846 }
847
848 pub async fn wait(&self) {
850 let _ = self.join().await;
851 }
852
853 pub async fn get_statistics(&self) -> TimerStatistics {
855 self.inner.statistics.lock().await.clone()
856 }
857
858 pub async fn get_state(&self) -> TimerState {
860 *self.inner.state.lock().await
861 }
862
863 pub async fn get_interval(&self) -> Duration {
865 *self.inner.interval.lock().await
866 }
867
868 pub async fn get_expiration_count(&self) -> Option<usize> {
870 *self.inner.expiration_count.lock().await
871 }
872
873 pub async fn get_last_error(&self) -> Option<TimerError> {
875 self.inner.statistics.lock().await.last_error.clone()
876 }
877
878 pub async fn metadata(&self) -> TimerMetadata {
880 self.inner.metadata.lock().await.clone()
881 }
882
883 pub async fn label(&self) -> Option<String> {
885 self.inner.metadata.lock().await.label.clone()
886 }
887
888 pub async fn set_label(&self, label: impl Into<String>) {
890 self.inner.metadata.lock().await.label = Some(label.into());
891 }
892
893 pub async fn set_tag(&self, key: impl Into<String>, value: impl Into<String>) {
895 self.inner
896 .metadata
897 .lock()
898 .await
899 .tags
900 .insert(key.into(), value.into());
901 }
902
903 pub async fn snapshot(&self) -> TimerSnapshot {
905 TimerSnapshot {
906 state: self.get_state().await,
907 interval: self.get_interval().await,
908 expiration_count: self.get_expiration_count().await,
909 statistics: self.get_statistics().await,
910 last_outcome: self.last_outcome().await,
911 metadata: self.metadata().await,
912 }
913 }
914
915 pub async fn last_outcome(&self) -> Option<TimerOutcome> {
917 self.inner.last_outcome.lock().await.clone()
918 }
919
920 pub fn set_events_enabled(&self, enabled: bool) {
922 self.inner.events_enabled.store(enabled, Ordering::SeqCst);
923 }
924
925 async fn start_internal<F>(
926 &self,
927 config: RunConfig,
928 callback: F,
929 start_paused: bool,
930 ) -> Result<u64, TimerError>
931 where
932 F: TimerCallback + 'static,
933 {
934 if config.interval.is_zero() && config.start_deadline.is_none() {
935 return Err(TimerError::invalid_parameter(
936 "Interval must be greater than zero.",
937 ));
938 }
939
940 if config.recurring && matches!(config.expiration_count, Some(0)) {
941 return Err(TimerError::invalid_parameter(
942 "Expiration count must be greater than zero.",
943 ));
944 }
945
946 if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
947 return Err(TimerError::invalid_parameter(
948 "Initial delay must be greater than zero.",
949 ));
950 }
951
952 if config.jitter.is_some_and(|jitter| jitter.is_zero()) {
953 return Err(TimerError::invalid_parameter(
954 "Jitter must be greater than zero.",
955 ));
956 }
957
958 if config
959 .callback_timeout
960 .is_some_and(|timeout| timeout.is_zero())
961 {
962 return Err(TimerError::invalid_parameter(
963 "Callback timeout must be greater than zero.",
964 ));
965 }
966
967 if config.retry_policy.is_some_and(|policy| {
968 matches!(
969 policy.backoff(),
970 RetryBackoff::Fixed(duration)
971 | RetryBackoff::Linear(duration)
972 | RetryBackoff::Exponential(duration) if duration.is_zero()
973 )
974 }) {
975 return Err(TimerError::invalid_parameter(
976 "Retry backoff must be greater than zero.",
977 ));
978 }
979
980 self.ensure_not_reentrant(
981 "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
982 )?;
983
984 let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
985
986 let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
987 let (tx, rx) = mpsc::unbounded_channel();
988
989 {
990 *self.inner.state.lock().await = if start_paused {
991 TimerState::Paused
992 } else {
993 TimerState::Running
994 };
995 *self.inner.command_tx.lock().await = Some(tx);
996 *self.inner.interval.lock().await = config.interval;
997 *self.inner.expiration_count.lock().await = config.expiration_count;
998 *self.inner.metadata.lock().await = config.metadata.clone();
999 *self.inner.statistics.lock().await = TimerStatistics::default();
1000 *self.inner.last_outcome.lock().await = None;
1001 self.inner.completion_tx.send_replace(None);
1002 }
1003 self.inner.active_run_id.store(run_id, Ordering::SeqCst);
1004
1005 runtime::emit_event(
1006 &self.inner,
1007 TimerEvent::Started {
1008 run_id,
1009 interval: config.interval,
1010 recurring: config.recurring,
1011 expiration_count: config.expiration_count,
1012 metadata: config.metadata.clone(),
1013 },
1014 );
1015
1016 let inner = Arc::clone(&self.inner);
1017 let handle = self.inner.runtime.spawn(async move {
1018 let scoped_inner = Arc::clone(&inner);
1019 runtime::with_run_context(&scoped_inner, run_id, async move {
1020 runtime::run_timer(inner, run_id, config, callback, rx).await;
1021 })
1022 .await;
1023 });
1024
1025 *self.inner.handle.lock().await = Some(handle);
1026
1027 #[cfg(feature = "logging")]
1028 debug!("Timer started.");
1029
1030 Ok(run_id)
1031 }
1032
1033 async fn active_run_id(&self) -> Option<u64> {
1034 match self.inner.active_run_id.load(Ordering::SeqCst) {
1035 0 => None,
1036 run_id => Some(run_id),
1037 }
1038 }
1039
1040 async fn send_command(&self, command: TimerCommand) {
1041 if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
1042 let _ = tx.send(command);
1043 }
1044 }
1045
1046 fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
1047 if runtime::is_current_run(&self.inner) {
1048 return Err(TimerError::reentrant_operation(message));
1049 }
1050
1051 Ok(())
1052 }
1053
1054 async fn cancel_with_reason(
1055 &self,
1056 reason: TimerFinishReason,
1057 ) -> Result<TimerOutcome, TimerError> {
1058 let run_id = self
1059 .active_run_id()
1060 .await
1061 .ok_or_else(TimerError::not_running)?;
1062
1063 let _ = self.inner.command_tx.lock().await.take();
1064 let handle = self.inner.handle.lock().await.take();
1065 *self.inner.state.lock().await = TimerState::Stopped;
1066
1067 if let Some(handle) = handle {
1068 handle.abort();
1069 let _ = handle.await;
1070 }
1071
1072 let statistics = self.get_statistics().await;
1073 let outcome = TimerOutcome {
1074 run_id,
1075 reason,
1076 statistics,
1077 };
1078
1079 runtime::finish_run(&self.inner, outcome.clone()).await;
1080 Ok(outcome)
1081 }
1082
1083 async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
1084 let mut completion_rx = self.inner.completion_tx.subscribe();
1085
1086 loop {
1087 if let Some(outcome) = completion_rx.borrow().clone() {
1088 if outcome.run_id == run_id {
1089 return Ok(outcome);
1090 }
1091 }
1092
1093 if completion_rx.changed().await.is_err() {
1094 return completion_rx
1095 .borrow()
1096 .clone()
1097 .ok_or_else(TimerError::not_running);
1098 }
1099 }
1100 }
1101}
1102
1103impl TimerBuilder {
1104 pub fn once(delay: Duration) -> Self {
1106 Self {
1107 kind: TimerKind::Once(delay),
1108 callback_timeout: None,
1109 retry_policy: None,
1110 start_paused: false,
1111 events_enabled: true,
1112 metadata: TimerMetadata::default(),
1113 }
1114 }
1115
1116 pub fn at(deadline: Instant) -> Self {
1118 Self {
1119 kind: TimerKind::At(deadline),
1120 callback_timeout: None,
1121 retry_policy: None,
1122 start_paused: false,
1123 events_enabled: true,
1124 metadata: TimerMetadata::default(),
1125 }
1126 }
1127
1128 pub fn recurring(schedule: RecurringSchedule) -> Self {
1130 Self {
1131 kind: TimerKind::Recurring(schedule),
1132 callback_timeout: None,
1133 retry_policy: None,
1134 start_paused: false,
1135 events_enabled: true,
1136 metadata: TimerMetadata::default(),
1137 }
1138 }
1139
1140 pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
1142 self.callback_timeout = Some(callback_timeout);
1143 self
1144 }
1145
1146 pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
1148 self.retry_policy = Some(retry_policy);
1149 self
1150 }
1151
1152 pub fn max_retries(mut self, max_retries: usize) -> Self {
1154 self.retry_policy = Some(RetryPolicy::new(max_retries));
1155 self
1156 }
1157
1158 pub fn fixed_backoff(mut self, delay: Duration) -> Self {
1160 self.retry_policy = Some(
1161 self.retry_policy
1162 .unwrap_or_else(|| RetryPolicy::new(0))
1163 .with_backoff(RetryBackoff::Fixed(delay)),
1164 );
1165 self
1166 }
1167
1168 pub fn linear_backoff(mut self, step: Duration) -> Self {
1170 self.retry_policy = Some(
1171 self.retry_policy
1172 .unwrap_or_else(|| RetryPolicy::new(0))
1173 .with_backoff(RetryBackoff::Linear(step)),
1174 );
1175 self
1176 }
1177
1178 pub fn exponential_backoff(mut self, base: Duration) -> Self {
1180 self.retry_policy = Some(
1181 self.retry_policy
1182 .unwrap_or_else(|| RetryPolicy::new(0))
1183 .with_backoff(RetryBackoff::Exponential(base)),
1184 );
1185 self
1186 }
1187
1188 pub fn paused_start(mut self) -> Self {
1190 self.start_paused = true;
1191 self
1192 }
1193
1194 pub fn with_events_disabled(mut self) -> Self {
1196 self.events_enabled = false;
1197 self
1198 }
1199
1200 pub fn label(mut self, label: impl Into<String>) -> Self {
1202 self.metadata.label = Some(label.into());
1203 self
1204 }
1205
1206 pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1208 self.metadata.tags.insert(key.into(), value.into());
1209 self
1210 }
1211
1212 pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
1214 where
1215 F: TimerCallback + 'static,
1216 {
1217 let Self {
1218 kind,
1219 callback_timeout,
1220 retry_policy,
1221 start_paused,
1222 events_enabled,
1223 metadata,
1224 } = self;
1225
1226 let timer = Timer::new_with_runtime(driver::RuntimeHandle::default(), events_enabled);
1227 if start_paused {
1228 *timer.inner.state.lock().await = TimerState::Paused;
1229 }
1230
1231 match kind {
1232 TimerKind::Once(delay) => {
1233 let _ = timer
1234 .start_internal(
1235 RunConfig {
1236 interval: delay,
1237 start_deadline: None,
1238 initial_delay: None,
1239 jitter: None,
1240 callback_timeout,
1241 retry_policy,
1242 recurring: false,
1243 cadence: RecurringCadence::FixedDelay,
1244 expiration_count: None,
1245 metadata: metadata.clone(),
1246 },
1247 callback,
1248 start_paused,
1249 )
1250 .await?;
1251 }
1252 TimerKind::At(deadline) => {
1253 let now = timer.inner.runtime.now();
1254 let _ = timer
1255 .start_internal(
1256 RunConfig {
1257 interval: deadline.saturating_duration_since(now),
1258 start_deadline: Some(deadline),
1259 initial_delay: None,
1260 jitter: None,
1261 callback_timeout,
1262 retry_policy,
1263 recurring: false,
1264 cadence: RecurringCadence::FixedDelay,
1265 expiration_count: None,
1266 metadata: metadata.clone(),
1267 },
1268 callback,
1269 start_paused,
1270 )
1271 .await?;
1272 }
1273 TimerKind::Recurring(schedule) => {
1274 let _ = timer
1275 .start_internal(
1276 RunConfig {
1277 interval: schedule.interval,
1278 start_deadline: None,
1279 initial_delay: schedule.initial_delay,
1280 jitter: schedule.jitter,
1281 callback_timeout,
1282 retry_policy,
1283 recurring: true,
1284 cadence: schedule.cadence,
1285 expiration_count: schedule.expiration_count,
1286 metadata,
1287 },
1288 callback,
1289 start_paused,
1290 )
1291 .await?;
1292 }
1293 }
1294 Ok(timer)
1295 }
1296}