1use async_trait::async_trait;
2use std::future::Future;
3use std::sync::{
4 atomic::{AtomicBool, AtomicU64, Ordering},
5 Arc,
6};
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, watch, Mutex};
9use tokio::task::JoinHandle;
10
11#[cfg(feature = "logging")]
12use log::debug;
13
14use crate::errors::TimerError;
15
16mod driver;
17mod runtime;
18
19#[cfg(test)]
20mod tests;
21
/// Capacity passed to `broadcast::channel` when a timer's event channel is created.
const TIMER_EVENT_BUFFER: usize = 64;
23
/// Lifecycle state recorded for a timer.
///
/// A newly constructed timer starts out as [`TimerState::Stopped`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimerState {
    /// A run is active and not paused.
    Running,
    /// A run exists but has been paused.
    Paused,
    /// No run is in progress.
    Stopped,
}
31
/// Why a timer run ended, as reported in a [`TimerOutcome`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimerFinishReason {
    /// The run ended on its own (produced by the run loop, which is not part of this file section).
    Completed,
    /// The run ended in response to a stop request.
    Stopped,
    /// The run was aborted through `Timer::cancel`.
    Cancelled,
    /// The run was aborted because a new run was started on the same timer.
    Replaced,
}
40
41#[derive(Debug, Clone, Default, PartialEq, Eq)]
43pub struct TimerStatistics {
44 pub execution_count: usize,
46 pub successful_executions: usize,
48 pub failed_executions: usize,
50 pub elapsed_time: Duration,
52 pub last_error: Option<TimerError>,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct TimerOutcome {
59 pub run_id: u64,
61 pub reason: TimerFinishReason,
63 pub statistics: TimerStatistics,
65}
66
/// Retry configuration attached to a timer run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RetryPolicy {
    /// Upper bound on retries, exactly as supplied to [`RetryPolicy::new`].
    max_retries: usize,
}

impl RetryPolicy {
    /// Builds a policy that allows up to `max_retries` retries.
    pub fn new(max_retries: usize) -> Self {
        RetryPolicy { max_retries }
    }

    /// Returns the retry limit this policy was created with.
    pub fn max_retries(self) -> usize {
        let RetryPolicy { max_retries } = self;
        max_retries
    }
}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87pub enum TimerEvent {
88 Started {
89 run_id: u64,
90 interval: Duration,
91 recurring: bool,
92 expiration_count: Option<usize>,
93 },
94 Paused {
95 run_id: u64,
96 },
97 Resumed {
98 run_id: u64,
99 },
100 IntervalAdjusted {
101 run_id: u64,
102 interval: Duration,
103 },
104 Tick {
105 run_id: u64,
106 statistics: TimerStatistics,
107 },
108 CallbackFailed {
109 run_id: u64,
110 error: TimerError,
111 statistics: TimerStatistics,
112 },
113 Finished(TimerOutcome),
114}
115
116pub struct TimerEvents {
118 receiver: broadcast::Receiver<TimerEvent>,
119}
120
121impl TimerEvents {
122 pub fn try_recv(&mut self) -> Option<TimerEvent> {
124 loop {
125 match self.receiver.try_recv() {
126 Ok(event) => return Some(event),
127 Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
128 Err(broadcast::error::TryRecvError::Empty)
129 | Err(broadcast::error::TryRecvError::Closed) => return None,
130 }
131 }
132 }
133
134 pub async fn recv(&mut self) -> Option<TimerEvent> {
136 loop {
137 match self.receiver.recv().await {
138 Ok(event) => return Some(event),
139 Err(broadcast::error::RecvError::Lagged(_)) => continue,
140 Err(broadcast::error::RecvError::Closed) => return None,
141 }
142 }
143 }
144
145 pub async fn wait_started(&mut self) -> Option<TimerEvent> {
147 loop {
148 if let event @ TimerEvent::Started { .. } = self.recv().await? {
149 return Some(event);
150 }
151 }
152 }
153
154 pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
156 loop {
157 if let event @ TimerEvent::Tick { .. } = self.recv().await? {
158 return Some(event);
159 }
160 }
161 }
162
163 pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
165 loop {
166 if let event @ TimerEvent::Paused { .. } = self.recv().await? {
167 return Some(event);
168 }
169 }
170 }
171
172 pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
174 loop {
175 if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
176 return Some(event);
177 }
178 }
179 }
180
181 pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
183 loop {
184 if let TimerEvent::Finished(outcome) = self.recv().await? {
185 return Some(outcome);
186 }
187 }
188 }
189
190 pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
192 self.wait_finished_reason(TimerFinishReason::Stopped).await
193 }
194
195 pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
197 self.wait_finished_reason(TimerFinishReason::Cancelled)
198 .await
199 }
200
201 async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
202 loop {
203 let outcome = self.wait_finished().await?;
204 if outcome.reason == reason {
205 return Some(outcome);
206 }
207 }
208 }
209}
210
211pub struct TimerCompletion {
213 receiver: watch::Receiver<Option<TimerOutcome>>,
214}
215
216impl TimerCompletion {
217 pub fn latest(&self) -> Option<TimerOutcome> {
219 self.receiver.borrow().clone()
220 }
221
222 pub async fn wait(&mut self) -> Option<TimerOutcome> {
224 loop {
225 if let Some(outcome) = self.receiver.borrow_and_update().clone() {
226 return Some(outcome);
227 }
228
229 if self.receiver.changed().await.is_err() {
230 return self.receiver.borrow_and_update().clone();
231 }
232 }
233 }
234
235 pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
237 loop {
238 let outcome = self.wait().await?;
239 if outcome.run_id == run_id {
240 return Some(outcome);
241 }
242 }
243 }
244}
245
246#[async_trait]
248pub trait TimerCallback: Send + Sync {
249 async fn execute(&self) -> Result<(), TimerError>;
251}
252
253#[async_trait]
254impl<F, Fut> TimerCallback for F
255where
256 F: Fn() -> Fut + Send + Sync,
257 Fut: Future<Output = Result<(), TimerError>> + Send,
258{
259 async fn execute(&self) -> Result<(), TimerError> {
260 (self)().await
261 }
262}
263
264pub(super) enum TimerCommand {
265 Pause,
266 Resume,
267 Stop,
268 Cancel,
269 SetInterval(Duration),
270}
271
272pub(super) struct TimerInner {
273 pub(super) state: Mutex<TimerState>,
274 pub(super) handle: Mutex<Option<JoinHandle<()>>>,
275 pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
276 pub(super) interval: Mutex<Duration>,
277 pub(super) expiration_count: Mutex<Option<usize>>,
278 pub(super) statistics: Mutex<TimerStatistics>,
279 pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
280 pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
281 pub(super) event_tx: broadcast::Sender<TimerEvent>,
282 pub(super) events_enabled: AtomicBool,
283 pub(super) runtime: driver::RuntimeHandle,
284 pub(super) next_run_id: AtomicU64,
285 pub(super) active_run_id: AtomicU64,
286}
287
288#[derive(Debug, Clone, Copy)]
289pub(super) struct RunConfig {
290 pub(super) interval: Duration,
291 pub(super) initial_delay: Option<Duration>,
292 pub(super) callback_timeout: Option<Duration>,
293 pub(super) retry_policy: Option<RetryPolicy>,
294 pub(super) recurring: bool,
295 pub(super) expiration_count: Option<usize>,
296}
297
/// Shape of the run a `TimerBuilder` will start.
#[derive(Clone, Copy, Debug)]
enum TimerKind {
    /// One-shot run with the given delay.
    Once(Duration),
    /// Recurring run with a period and an optional initial delay.
    Recurring {
        interval: Duration,
        initial_delay: Option<Duration>,
    },
}
306
307pub struct TimerBuilder {
309 kind: TimerKind,
310 expiration_count: Option<usize>,
311 callback_timeout: Option<Duration>,
312 retry_policy: Option<RetryPolicy>,
313 start_paused: bool,
314 events_enabled: bool,
315}
316
317#[derive(Clone)]
318pub struct Timer {
320 inner: Arc<TimerInner>,
321}
322
323impl Default for Timer {
324 fn default() -> Self {
325 Self::new()
326 }
327}
328
329impl Timer {
330 pub fn new() -> Self {
332 Self::new_with_events(true)
333 }
334
335 pub fn new_silent() -> Self {
337 Self::new_with_events(false)
338 }
339
340 fn new_with_events(events_enabled: bool) -> Self {
341 let (completion_tx, _completion_rx) = watch::channel(None);
342 let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
343
344 Self {
345 inner: Arc::new(TimerInner {
346 state: Mutex::new(TimerState::Stopped),
347 handle: Mutex::new(None),
348 command_tx: Mutex::new(None),
349 interval: Mutex::new(Duration::ZERO),
350 expiration_count: Mutex::new(None),
351 statistics: Mutex::new(TimerStatistics::default()),
352 last_outcome: Mutex::new(None),
353 completion_tx,
354 event_tx,
355 events_enabled: AtomicBool::new(events_enabled),
356 runtime: driver::RuntimeHandle,
357 next_run_id: AtomicU64::new(1),
358 active_run_id: AtomicU64::new(0),
359 }),
360 }
361 }
362
363 pub fn once(delay: Duration) -> TimerBuilder {
365 TimerBuilder::once(delay)
366 }
367
368 pub fn recurring(interval: Duration) -> TimerBuilder {
370 TimerBuilder::recurring(interval)
371 }
372
373 pub fn subscribe(&self) -> TimerEvents {
375 TimerEvents {
376 receiver: self.inner.event_tx.subscribe(),
377 }
378 }
379
380 pub fn completion(&self) -> TimerCompletion {
382 TimerCompletion {
383 receiver: self.inner.completion_tx.subscribe(),
384 }
385 }
386
387 pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
389 where
390 F: TimerCallback + 'static,
391 {
392 self.start_internal(
393 RunConfig {
394 interval: delay,
395 initial_delay: None,
396 callback_timeout: None,
397 retry_policy: None,
398 recurring: false,
399 expiration_count: None,
400 },
401 callback,
402 false,
403 )
404 .await
405 }
406
407 pub async fn start_once_fn<F, Fut>(
409 &self,
410 delay: Duration,
411 callback: F,
412 ) -> Result<u64, TimerError>
413 where
414 F: Fn() -> Fut + Send + Sync + 'static,
415 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
416 {
417 self.start_once(delay, callback).await
418 }
419
420 pub async fn start_recurring<F>(
422 &self,
423 interval: Duration,
424 callback: F,
425 expiration_count: Option<usize>,
426 ) -> Result<u64, TimerError>
427 where
428 F: TimerCallback + 'static,
429 {
430 self.start_recurring_with_delay(interval, None, callback, expiration_count)
431 .await
432 }
433
434 pub async fn start_recurring_fn<F, Fut>(
436 &self,
437 interval: Duration,
438 callback: F,
439 expiration_count: Option<usize>,
440 ) -> Result<u64, TimerError>
441 where
442 F: Fn() -> Fut + Send + Sync + 'static,
443 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
444 {
445 self.start_recurring(interval, callback, expiration_count)
446 .await
447 }
448
449 pub async fn start_recurring_with_delay<F>(
451 &self,
452 interval: Duration,
453 initial_delay: Option<Duration>,
454 callback: F,
455 expiration_count: Option<usize>,
456 ) -> Result<u64, TimerError>
457 where
458 F: TimerCallback + 'static,
459 {
460 self.start_internal(
461 RunConfig {
462 interval,
463 initial_delay,
464 callback_timeout: None,
465 retry_policy: None,
466 recurring: true,
467 expiration_count,
468 },
469 callback,
470 false,
471 )
472 .await
473 }
474
475 pub async fn pause(&self) -> Result<(), TimerError> {
477 self.ensure_not_reentrant(
478 "pause() cannot be awaited from the timer's active callback; use request_pause().",
479 )?;
480 self.request_pause().await
481 }
482
483 pub async fn request_pause(&self) -> Result<(), TimerError> {
485 let _run_id = self
486 .active_run_id()
487 .await
488 .ok_or_else(TimerError::not_running)?;
489 let mut state = self.inner.state.lock().await;
490 if *state != TimerState::Running {
491 return Err(TimerError::not_running());
492 }
493
494 *state = TimerState::Paused;
495 drop(state);
496
497 self.send_command(TimerCommand::Pause).await;
498
499 #[cfg(feature = "logging")]
500 debug!("Timer paused.");
501
502 Ok(())
503 }
504
505 pub async fn resume(&self) -> Result<(), TimerError> {
507 self.ensure_not_reentrant(
508 "resume() cannot be awaited from the timer's active callback; use request_resume().",
509 )?;
510 self.request_resume().await
511 }
512
513 pub async fn request_resume(&self) -> Result<(), TimerError> {
515 let _run_id = self
516 .active_run_id()
517 .await
518 .ok_or_else(TimerError::not_paused)?;
519 let mut state = self.inner.state.lock().await;
520 if *state != TimerState::Paused {
521 return Err(TimerError::not_paused());
522 }
523
524 *state = TimerState::Running;
525 drop(state);
526
527 self.send_command(TimerCommand::Resume).await;
528
529 #[cfg(feature = "logging")]
530 debug!("Timer resumed.");
531
532 Ok(())
533 }
534
535 pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
537 self.ensure_not_reentrant(
538 "stop() cannot be awaited from the timer's active callback; use request_stop().",
539 )?;
540 let run_id = self
541 .active_run_id()
542 .await
543 .ok_or_else(TimerError::not_running)?;
544 self.request_stop().await?;
545 self.join_run(run_id).await
546 }
547
548 pub async fn request_stop(&self) -> Result<(), TimerError> {
550 self.active_run_id()
551 .await
552 .ok_or_else(TimerError::not_running)?;
553 self.send_command(TimerCommand::Stop).await;
554 Ok(())
555 }
556
557 pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
559 self.ensure_not_reentrant(
560 "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
561 )?;
562 self.cancel_with_reason(TimerFinishReason::Cancelled).await
563 }
564
565 pub async fn request_cancel(&self) -> Result<(), TimerError> {
567 self.active_run_id()
568 .await
569 .ok_or_else(TimerError::not_running)?;
570 self.send_command(TimerCommand::Cancel).await;
571 Ok(())
572 }
573
574 pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
576 self.ensure_not_reentrant(
577 "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
578 )?;
579 self.request_adjust_interval(new_interval).await
580 }
581
582 pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
584 if new_interval.is_zero() {
585 return Err(TimerError::invalid_parameter(
586 "Interval must be greater than zero.",
587 ));
588 }
589
590 let run_id = self
591 .active_run_id()
592 .await
593 .ok_or_else(TimerError::not_running)?;
594 *self.inner.interval.lock().await = new_interval;
595 self.send_command(TimerCommand::SetInterval(new_interval))
596 .await;
597 runtime::emit_event(
598 &self.inner,
599 TimerEvent::IntervalAdjusted {
600 run_id,
601 interval: new_interval,
602 },
603 );
604
605 #[cfg(feature = "logging")]
606 debug!("Timer interval adjusted.");
607
608 Ok(())
609 }
610
611 pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
613 self.ensure_not_reentrant(
614 "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
615 )?;
616 if let Some(run_id) = self.active_run_id().await {
617 return self.join_run(run_id).await;
618 }
619
620 self.inner
621 .last_outcome
622 .lock()
623 .await
624 .clone()
625 .ok_or_else(TimerError::not_running)
626 }
627
628 pub async fn wait(&self) {
630 let _ = self.join().await;
631 }
632
633 pub async fn get_statistics(&self) -> TimerStatistics {
635 self.inner.statistics.lock().await.clone()
636 }
637
638 pub async fn get_state(&self) -> TimerState {
640 *self.inner.state.lock().await
641 }
642
643 pub async fn get_interval(&self) -> Duration {
645 *self.inner.interval.lock().await
646 }
647
648 pub async fn get_expiration_count(&self) -> Option<usize> {
650 *self.inner.expiration_count.lock().await
651 }
652
653 pub async fn get_last_error(&self) -> Option<TimerError> {
655 self.inner.statistics.lock().await.last_error.clone()
656 }
657
658 pub async fn last_outcome(&self) -> Option<TimerOutcome> {
660 self.inner.last_outcome.lock().await.clone()
661 }
662
663 pub fn set_events_enabled(&self, enabled: bool) {
665 self.inner.events_enabled.store(enabled, Ordering::SeqCst);
666 }
667
668 async fn start_internal<F>(
669 &self,
670 config: RunConfig,
671 callback: F,
672 start_paused: bool,
673 ) -> Result<u64, TimerError>
674 where
675 F: TimerCallback + 'static,
676 {
677 if config.interval.is_zero() {
678 return Err(TimerError::invalid_parameter(
679 "Interval must be greater than zero.",
680 ));
681 }
682
683 if config.recurring && matches!(config.expiration_count, Some(0)) {
684 return Err(TimerError::invalid_parameter(
685 "Expiration count must be greater than zero.",
686 ));
687 }
688
689 if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
690 return Err(TimerError::invalid_parameter(
691 "Initial delay must be greater than zero.",
692 ));
693 }
694
695 if config
696 .callback_timeout
697 .is_some_and(|timeout| timeout.is_zero())
698 {
699 return Err(TimerError::invalid_parameter(
700 "Callback timeout must be greater than zero.",
701 ));
702 }
703
704 self.ensure_not_reentrant(
705 "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
706 )?;
707
708 let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
709
710 let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
711 let (tx, rx) = mpsc::unbounded_channel();
712
713 {
714 *self.inner.state.lock().await = if start_paused {
715 TimerState::Paused
716 } else {
717 TimerState::Running
718 };
719 *self.inner.command_tx.lock().await = Some(tx);
720 *self.inner.interval.lock().await = config.interval;
721 *self.inner.expiration_count.lock().await = config.expiration_count;
722 *self.inner.statistics.lock().await = TimerStatistics::default();
723 *self.inner.last_outcome.lock().await = None;
724 self.inner.completion_tx.send_replace(None);
725 }
726 self.inner.active_run_id.store(run_id, Ordering::SeqCst);
727
728 runtime::emit_event(
729 &self.inner,
730 TimerEvent::Started {
731 run_id,
732 interval: config.interval,
733 recurring: config.recurring,
734 expiration_count: config.expiration_count,
735 },
736 );
737
738 let inner = Arc::clone(&self.inner);
739 let handle = self.inner.runtime.spawn(async move {
740 let scoped_inner = Arc::clone(&inner);
741 runtime::with_run_context(&scoped_inner, run_id, async move {
742 runtime::run_timer(inner, run_id, config, callback, rx).await;
743 })
744 .await;
745 });
746
747 *self.inner.handle.lock().await = Some(handle);
748
749 #[cfg(feature = "logging")]
750 debug!("Timer started.");
751
752 Ok(run_id)
753 }
754
755 async fn active_run_id(&self) -> Option<u64> {
756 match self.inner.active_run_id.load(Ordering::SeqCst) {
757 0 => None,
758 run_id => Some(run_id),
759 }
760 }
761
762 async fn send_command(&self, command: TimerCommand) {
763 if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
764 let _ = tx.send(command);
765 }
766 }
767
768 fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
769 if runtime::is_current_run(&self.inner) {
770 return Err(TimerError::reentrant_operation(message));
771 }
772
773 Ok(())
774 }
775
776 async fn cancel_with_reason(
777 &self,
778 reason: TimerFinishReason,
779 ) -> Result<TimerOutcome, TimerError> {
780 let run_id = self
781 .active_run_id()
782 .await
783 .ok_or_else(TimerError::not_running)?;
784
785 let _ = self.inner.command_tx.lock().await.take();
786 let handle = self.inner.handle.lock().await.take();
787 *self.inner.state.lock().await = TimerState::Stopped;
788
789 if let Some(handle) = handle {
790 handle.abort();
791 let _ = handle.await;
792 }
793
794 let statistics = self.get_statistics().await;
795 let outcome = TimerOutcome {
796 run_id,
797 reason,
798 statistics,
799 };
800
801 runtime::finish_run(&self.inner, outcome.clone()).await;
802 Ok(outcome)
803 }
804
805 async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
806 let mut completion_rx = self.inner.completion_tx.subscribe();
807
808 loop {
809 if let Some(outcome) = completion_rx.borrow().clone() {
810 if outcome.run_id == run_id {
811 return Ok(outcome);
812 }
813 }
814
815 if completion_rx.changed().await.is_err() {
816 return completion_rx
817 .borrow()
818 .clone()
819 .ok_or_else(TimerError::not_running);
820 }
821 }
822 }
823}
824
825impl TimerBuilder {
826 pub fn once(delay: Duration) -> Self {
828 Self {
829 kind: TimerKind::Once(delay),
830 expiration_count: None,
831 callback_timeout: None,
832 retry_policy: None,
833 start_paused: false,
834 events_enabled: true,
835 }
836 }
837
838 pub fn recurring(interval: Duration) -> Self {
840 Self {
841 kind: TimerKind::Recurring {
842 interval,
843 initial_delay: None,
844 },
845 expiration_count: None,
846 callback_timeout: None,
847 retry_policy: None,
848 start_paused: false,
849 events_enabled: true,
850 }
851 }
852
853 pub fn expiration_count(mut self, expiration_count: usize) -> Self {
855 self.expiration_count = Some(expiration_count);
856 self
857 }
858
859 pub fn initial_delay(mut self, initial_delay: Duration) -> Self {
861 if let TimerKind::Recurring {
862 initial_delay: builder_initial_delay,
863 ..
864 } = &mut self.kind
865 {
866 *builder_initial_delay = Some(initial_delay);
867 }
868
869 self
870 }
871
872 pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
874 self.callback_timeout = Some(callback_timeout);
875 self
876 }
877
878 pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
880 self.retry_policy = Some(retry_policy);
881 self
882 }
883
884 pub fn max_retries(mut self, max_retries: usize) -> Self {
886 self.retry_policy = Some(RetryPolicy::new(max_retries));
887 self
888 }
889
890 pub fn paused_start(mut self) -> Self {
892 self.start_paused = true;
893 self
894 }
895
896 pub fn with_events_disabled(mut self) -> Self {
898 self.events_enabled = false;
899 self
900 }
901
902 pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
904 where
905 F: TimerCallback + 'static,
906 {
907 let timer = Timer::new_with_events(self.events_enabled);
908 if self.start_paused {
909 *timer.inner.state.lock().await = TimerState::Paused;
910 }
911
912 match self.kind {
913 TimerKind::Once(delay) => {
914 let _ = timer
915 .start_internal(
916 RunConfig {
917 interval: delay,
918 initial_delay: None,
919 callback_timeout: self.callback_timeout,
920 retry_policy: self.retry_policy,
921 recurring: false,
922 expiration_count: None,
923 },
924 callback,
925 self.start_paused,
926 )
927 .await?;
928 }
929 TimerKind::Recurring {
930 interval,
931 initial_delay,
932 } => {
933 let _ = timer
934 .start_internal(
935 RunConfig {
936 interval,
937 initial_delay,
938 callback_timeout: self.callback_timeout,
939 retry_policy: self.retry_policy,
940 recurring: true,
941 expiration_count: self.expiration_count,
942 },
943 callback,
944 self.start_paused,
945 )
946 .await?;
947 }
948 }
949 Ok(timer)
950 }
951}