1use async_trait::async_trait;
2use std::future::Future;
3use std::sync::{
4 atomic::{AtomicBool, AtomicU64, Ordering},
5 Arc,
6};
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, watch, Mutex};
9use tokio::task::JoinHandle;
10
11#[cfg(feature = "logging")]
12use log::debug;
13
14use crate::errors::TimerError;
15
16mod driver;
17mod runtime;
18
19#[cfg(test)]
20mod tests;
21
/// Capacity handed to `broadcast::channel` when a timer's event channel is
/// created.
const TIMER_EVENT_BUFFER: usize = 64;
23
/// Lifecycle state of a timer, as returned by `Timer::get_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerState {
    /// A run has been started (or resumed) and is not paused.
    Running,
    /// The run was paused, or was started in the paused state.
    Paused,
    /// No run is in progress; this is also the state of a freshly created timer.
    Stopped,
}
31
/// Why a timer run ended; carried in `TimerOutcome::reason`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerFinishReason {
    /// Reported by the run loop (defined outside this file); presumably the run
    /// reached its natural end — confirm against the runtime module.
    Completed,
    /// The reason `TimerEvents::wait_stopped` filters on; produced outside this
    /// file, presumably in response to a stop request.
    Stopped,
    /// The run was aborted through `Timer::cancel`.
    Cancelled,
    /// The run was aborted because a new run was started on the same timer.
    Replaced,
}
40
41#[derive(Debug, Clone, Default, PartialEq, Eq)]
43pub struct TimerStatistics {
44 pub execution_count: usize,
46 pub successful_executions: usize,
48 pub failed_executions: usize,
50 pub elapsed_time: Duration,
52 pub last_error: Option<TimerError>,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct TimerOutcome {
59 pub run_id: u64,
61 pub reason: TimerFinishReason,
63 pub statistics: TimerStatistics,
65}
66
/// Scheduling mode of a recurring timer.
///
/// The scheduling itself is implemented outside this file; the variant notes
/// below are assumptions based on the names and should be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecurringCadence {
    /// Default cadence of `RecurringSchedule::new`. Presumably the interval is
    /// measured from the end of the previous execution — TODO confirm.
    FixedDelay,
    /// Presumably executions are scheduled on a fixed period regardless of how
    /// long each callback takes — TODO confirm.
    FixedRate,
}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub struct RecurringSchedule {
77 interval: Duration,
78 initial_delay: Option<Duration>,
79 cadence: RecurringCadence,
80 expiration_count: Option<usize>,
81}
82
83impl RecurringSchedule {
84 pub fn new(interval: Duration) -> Self {
86 Self {
87 interval,
88 initial_delay: None,
89 cadence: RecurringCadence::FixedDelay,
90 expiration_count: None,
91 }
92 }
93
94 pub fn interval(self) -> Duration {
96 self.interval
97 }
98
99 pub fn initial_delay(self) -> Option<Duration> {
101 self.initial_delay
102 }
103
104 pub fn cadence(self) -> RecurringCadence {
106 self.cadence
107 }
108
109 pub fn expiration_count(self) -> Option<usize> {
111 self.expiration_count
112 }
113
114 pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
116 self.initial_delay = Some(initial_delay);
117 self
118 }
119
120 pub fn with_cadence(mut self, cadence: RecurringCadence) -> Self {
122 self.cadence = cadence;
123 self
124 }
125
126 pub fn fixed_delay(mut self) -> Self {
128 self.cadence = RecurringCadence::FixedDelay;
129 self
130 }
131
132 pub fn fixed_rate(mut self) -> Self {
134 self.cadence = RecurringCadence::FixedRate;
135 self
136 }
137
138 pub fn with_expiration_count(mut self, expiration_count: usize) -> Self {
140 self.expiration_count = Some(expiration_count);
141 self
142 }
143}
144
/// Retry configuration attached to a timer through `TimerBuilder`.
///
/// How retries are carried out is decided by the run loop, outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Upper bound stored by `RetryPolicy::new`.
    max_retries: usize,
}

impl RetryPolicy {
    /// Creates a policy that stores `max_retries`.
    pub fn new(max_retries: usize) -> Self {
        RetryPolicy { max_retries }
    }

    /// Returns the stored `max_retries` value.
    pub fn max_retries(self) -> usize {
        let Self { max_retries } = self;
        max_retries
    }
}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
165pub enum TimerEvent {
166 Started {
167 run_id: u64,
168 interval: Duration,
169 recurring: bool,
170 expiration_count: Option<usize>,
171 },
172 Paused {
173 run_id: u64,
174 },
175 Resumed {
176 run_id: u64,
177 },
178 IntervalAdjusted {
179 run_id: u64,
180 interval: Duration,
181 },
182 Tick {
183 run_id: u64,
184 statistics: TimerStatistics,
185 },
186 CallbackFailed {
187 run_id: u64,
188 error: TimerError,
189 statistics: TimerStatistics,
190 },
191 Finished(TimerOutcome),
192}
193
194pub struct TimerEvents {
196 receiver: broadcast::Receiver<TimerEvent>,
197}
198
199impl TimerEvents {
200 pub fn try_recv(&mut self) -> Option<TimerEvent> {
202 loop {
203 match self.receiver.try_recv() {
204 Ok(event) => return Some(event),
205 Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
206 Err(broadcast::error::TryRecvError::Empty)
207 | Err(broadcast::error::TryRecvError::Closed) => return None,
208 }
209 }
210 }
211
212 pub async fn recv(&mut self) -> Option<TimerEvent> {
214 loop {
215 match self.receiver.recv().await {
216 Ok(event) => return Some(event),
217 Err(broadcast::error::RecvError::Lagged(_)) => continue,
218 Err(broadcast::error::RecvError::Closed) => return None,
219 }
220 }
221 }
222
223 pub async fn wait_started(&mut self) -> Option<TimerEvent> {
225 loop {
226 if let event @ TimerEvent::Started { .. } = self.recv().await? {
227 return Some(event);
228 }
229 }
230 }
231
232 pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
234 loop {
235 if let event @ TimerEvent::Tick { .. } = self.recv().await? {
236 return Some(event);
237 }
238 }
239 }
240
241 pub async fn wait_paused(&mut self) -> Option<TimerEvent> {
243 loop {
244 if let event @ TimerEvent::Paused { .. } = self.recv().await? {
245 return Some(event);
246 }
247 }
248 }
249
250 pub async fn wait_resumed(&mut self) -> Option<TimerEvent> {
252 loop {
253 if let event @ TimerEvent::Resumed { .. } = self.recv().await? {
254 return Some(event);
255 }
256 }
257 }
258
259 pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
261 loop {
262 if let TimerEvent::Finished(outcome) = self.recv().await? {
263 return Some(outcome);
264 }
265 }
266 }
267
268 pub async fn wait_stopped(&mut self) -> Option<TimerOutcome> {
270 self.wait_finished_reason(TimerFinishReason::Stopped).await
271 }
272
273 pub async fn wait_cancelled(&mut self) -> Option<TimerOutcome> {
275 self.wait_finished_reason(TimerFinishReason::Cancelled)
276 .await
277 }
278
279 async fn wait_finished_reason(&mut self, reason: TimerFinishReason) -> Option<TimerOutcome> {
280 loop {
281 let outcome = self.wait_finished().await?;
282 if outcome.reason == reason {
283 return Some(outcome);
284 }
285 }
286 }
287}
288
289pub struct TimerCompletion {
291 receiver: watch::Receiver<Option<TimerOutcome>>,
292}
293
294impl TimerCompletion {
295 pub fn latest(&self) -> Option<TimerOutcome> {
297 self.receiver.borrow().clone()
298 }
299
300 pub async fn wait(&mut self) -> Option<TimerOutcome> {
302 loop {
303 if let Some(outcome) = self.receiver.borrow_and_update().clone() {
304 return Some(outcome);
305 }
306
307 if self.receiver.changed().await.is_err() {
308 return self.receiver.borrow_and_update().clone();
309 }
310 }
311 }
312
313 pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
315 loop {
316 let outcome = self.wait().await?;
317 if outcome.run_id == run_id {
318 return Some(outcome);
319 }
320 }
321 }
322}
323
324#[async_trait]
326pub trait TimerCallback: Send + Sync {
327 async fn execute(&self) -> Result<(), TimerError>;
329}
330
331#[async_trait]
332impl<F, Fut> TimerCallback for F
333where
334 F: Fn() -> Fut + Send + Sync,
335 Fut: Future<Output = Result<(), TimerError>> + Send,
336{
337 async fn execute(&self) -> Result<(), TimerError> {
338 (self)().await
339 }
340}
341
342pub(super) enum TimerCommand {
343 Pause,
344 Resume,
345 Stop,
346 Cancel,
347 SetInterval(Duration),
348}
349
350pub(super) struct TimerInner {
351 pub(super) state: Mutex<TimerState>,
352 pub(super) handle: Mutex<Option<JoinHandle<()>>>,
353 pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
354 pub(super) interval: Mutex<Duration>,
355 pub(super) expiration_count: Mutex<Option<usize>>,
356 pub(super) statistics: Mutex<TimerStatistics>,
357 pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
358 pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
359 pub(super) event_tx: broadcast::Sender<TimerEvent>,
360 pub(super) events_enabled: AtomicBool,
361 pub(super) runtime: driver::RuntimeHandle,
362 pub(super) next_run_id: AtomicU64,
363 pub(super) active_run_id: AtomicU64,
364}
365
366#[derive(Debug, Clone, Copy)]
367pub(super) struct RunConfig {
368 pub(super) interval: Duration,
369 pub(super) initial_delay: Option<Duration>,
370 pub(super) callback_timeout: Option<Duration>,
371 pub(super) retry_policy: Option<RetryPolicy>,
372 pub(super) recurring: bool,
373 pub(super) cadence: RecurringCadence,
374 pub(super) expiration_count: Option<usize>,
375}
376
377#[derive(Debug, Clone, Copy)]
378enum TimerKind {
379 Once(Duration),
380 Recurring(RecurringSchedule),
381}
382
383pub struct TimerBuilder {
385 kind: TimerKind,
386 callback_timeout: Option<Duration>,
387 retry_policy: Option<RetryPolicy>,
388 start_paused: bool,
389 events_enabled: bool,
390}
391
392#[derive(Clone)]
393pub struct Timer {
395 inner: Arc<TimerInner>,
396}
397
398impl Default for Timer {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
404impl Timer {
405 pub fn new() -> Self {
407 Self::new_with_events(true)
408 }
409
410 pub fn new_silent() -> Self {
412 Self::new_with_events(false)
413 }
414
415 fn new_with_events(events_enabled: bool) -> Self {
416 let (completion_tx, _completion_rx) = watch::channel(None);
417 let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
418
419 Self {
420 inner: Arc::new(TimerInner {
421 state: Mutex::new(TimerState::Stopped),
422 handle: Mutex::new(None),
423 command_tx: Mutex::new(None),
424 interval: Mutex::new(Duration::ZERO),
425 expiration_count: Mutex::new(None),
426 statistics: Mutex::new(TimerStatistics::default()),
427 last_outcome: Mutex::new(None),
428 completion_tx,
429 event_tx,
430 events_enabled: AtomicBool::new(events_enabled),
431 runtime: driver::RuntimeHandle,
432 next_run_id: AtomicU64::new(1),
433 active_run_id: AtomicU64::new(0),
434 }),
435 }
436 }
437
438 pub fn once(delay: Duration) -> TimerBuilder {
440 TimerBuilder::once(delay)
441 }
442
443 pub fn recurring(schedule: RecurringSchedule) -> TimerBuilder {
445 TimerBuilder::recurring(schedule)
446 }
447
448 pub fn subscribe(&self) -> TimerEvents {
450 TimerEvents {
451 receiver: self.inner.event_tx.subscribe(),
452 }
453 }
454
455 pub fn completion(&self) -> TimerCompletion {
457 TimerCompletion {
458 receiver: self.inner.completion_tx.subscribe(),
459 }
460 }
461
462 pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
464 where
465 F: TimerCallback + 'static,
466 {
467 self.start_internal(
468 RunConfig {
469 interval: delay,
470 initial_delay: None,
471 callback_timeout: None,
472 retry_policy: None,
473 recurring: false,
474 cadence: RecurringCadence::FixedDelay,
475 expiration_count: None,
476 },
477 callback,
478 false,
479 )
480 .await
481 }
482
483 pub async fn start_once_fn<F, Fut>(
485 &self,
486 delay: Duration,
487 callback: F,
488 ) -> Result<u64, TimerError>
489 where
490 F: Fn() -> Fut + Send + Sync + 'static,
491 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
492 {
493 self.start_once(delay, callback).await
494 }
495
496 pub async fn start_recurring<F>(
498 &self,
499 schedule: RecurringSchedule,
500 callback: F,
501 ) -> Result<u64, TimerError>
502 where
503 F: TimerCallback + 'static,
504 {
505 self.start_internal(
506 RunConfig {
507 interval: schedule.interval,
508 initial_delay: schedule.initial_delay,
509 callback_timeout: None,
510 retry_policy: None,
511 recurring: true,
512 cadence: schedule.cadence,
513 expiration_count: schedule.expiration_count,
514 },
515 callback,
516 false,
517 )
518 .await
519 }
520
521 pub async fn start_recurring_fn<F, Fut>(
523 &self,
524 schedule: RecurringSchedule,
525 callback: F,
526 ) -> Result<u64, TimerError>
527 where
528 F: Fn() -> Fut + Send + Sync + 'static,
529 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
530 {
531 self.start_recurring(schedule, callback).await
532 }
533
534 pub async fn pause(&self) -> Result<(), TimerError> {
536 self.ensure_not_reentrant(
537 "pause() cannot be awaited from the timer's active callback; use request_pause().",
538 )?;
539 self.request_pause().await
540 }
541
542 pub async fn request_pause(&self) -> Result<(), TimerError> {
544 let _run_id = self
545 .active_run_id()
546 .await
547 .ok_or_else(TimerError::not_running)?;
548 let mut state = self.inner.state.lock().await;
549 if *state != TimerState::Running {
550 return Err(TimerError::not_running());
551 }
552
553 *state = TimerState::Paused;
554 drop(state);
555
556 self.send_command(TimerCommand::Pause).await;
557
558 #[cfg(feature = "logging")]
559 debug!("Timer paused.");
560
561 Ok(())
562 }
563
564 pub async fn resume(&self) -> Result<(), TimerError> {
566 self.ensure_not_reentrant(
567 "resume() cannot be awaited from the timer's active callback; use request_resume().",
568 )?;
569 self.request_resume().await
570 }
571
572 pub async fn request_resume(&self) -> Result<(), TimerError> {
574 let _run_id = self
575 .active_run_id()
576 .await
577 .ok_or_else(TimerError::not_paused)?;
578 let mut state = self.inner.state.lock().await;
579 if *state != TimerState::Paused {
580 return Err(TimerError::not_paused());
581 }
582
583 *state = TimerState::Running;
584 drop(state);
585
586 self.send_command(TimerCommand::Resume).await;
587
588 #[cfg(feature = "logging")]
589 debug!("Timer resumed.");
590
591 Ok(())
592 }
593
594 pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
596 self.ensure_not_reentrant(
597 "stop() cannot be awaited from the timer's active callback; use request_stop().",
598 )?;
599 let run_id = self
600 .active_run_id()
601 .await
602 .ok_or_else(TimerError::not_running)?;
603 self.request_stop().await?;
604 self.join_run(run_id).await
605 }
606
607 pub async fn request_stop(&self) -> Result<(), TimerError> {
609 self.active_run_id()
610 .await
611 .ok_or_else(TimerError::not_running)?;
612 self.send_command(TimerCommand::Stop).await;
613 Ok(())
614 }
615
616 pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
618 self.ensure_not_reentrant(
619 "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
620 )?;
621 self.cancel_with_reason(TimerFinishReason::Cancelled).await
622 }
623
624 pub async fn request_cancel(&self) -> Result<(), TimerError> {
626 self.active_run_id()
627 .await
628 .ok_or_else(TimerError::not_running)?;
629 self.send_command(TimerCommand::Cancel).await;
630 Ok(())
631 }
632
633 pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
635 self.ensure_not_reentrant(
636 "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
637 )?;
638 self.request_adjust_interval(new_interval).await
639 }
640
641 pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
643 if new_interval.is_zero() {
644 return Err(TimerError::invalid_parameter(
645 "Interval must be greater than zero.",
646 ));
647 }
648
649 let run_id = self
650 .active_run_id()
651 .await
652 .ok_or_else(TimerError::not_running)?;
653 *self.inner.interval.lock().await = new_interval;
654 self.send_command(TimerCommand::SetInterval(new_interval))
655 .await;
656 runtime::emit_event(
657 &self.inner,
658 TimerEvent::IntervalAdjusted {
659 run_id,
660 interval: new_interval,
661 },
662 );
663
664 #[cfg(feature = "logging")]
665 debug!("Timer interval adjusted.");
666
667 Ok(())
668 }
669
670 pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
672 self.ensure_not_reentrant(
673 "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
674 )?;
675 if let Some(run_id) = self.active_run_id().await {
676 return self.join_run(run_id).await;
677 }
678
679 self.inner
680 .last_outcome
681 .lock()
682 .await
683 .clone()
684 .ok_or_else(TimerError::not_running)
685 }
686
687 pub async fn wait(&self) {
689 let _ = self.join().await;
690 }
691
692 pub async fn get_statistics(&self) -> TimerStatistics {
694 self.inner.statistics.lock().await.clone()
695 }
696
697 pub async fn get_state(&self) -> TimerState {
699 *self.inner.state.lock().await
700 }
701
702 pub async fn get_interval(&self) -> Duration {
704 *self.inner.interval.lock().await
705 }
706
707 pub async fn get_expiration_count(&self) -> Option<usize> {
709 *self.inner.expiration_count.lock().await
710 }
711
712 pub async fn get_last_error(&self) -> Option<TimerError> {
714 self.inner.statistics.lock().await.last_error.clone()
715 }
716
717 pub async fn last_outcome(&self) -> Option<TimerOutcome> {
719 self.inner.last_outcome.lock().await.clone()
720 }
721
722 pub fn set_events_enabled(&self, enabled: bool) {
724 self.inner.events_enabled.store(enabled, Ordering::SeqCst);
725 }
726
727 async fn start_internal<F>(
728 &self,
729 config: RunConfig,
730 callback: F,
731 start_paused: bool,
732 ) -> Result<u64, TimerError>
733 where
734 F: TimerCallback + 'static,
735 {
736 if config.interval.is_zero() {
737 return Err(TimerError::invalid_parameter(
738 "Interval must be greater than zero.",
739 ));
740 }
741
742 if config.recurring && matches!(config.expiration_count, Some(0)) {
743 return Err(TimerError::invalid_parameter(
744 "Expiration count must be greater than zero.",
745 ));
746 }
747
748 if config.initial_delay.is_some_and(|delay| delay.is_zero()) {
749 return Err(TimerError::invalid_parameter(
750 "Initial delay must be greater than zero.",
751 ));
752 }
753
754 if config
755 .callback_timeout
756 .is_some_and(|timeout| timeout.is_zero())
757 {
758 return Err(TimerError::invalid_parameter(
759 "Callback timeout must be greater than zero.",
760 ));
761 }
762
763 self.ensure_not_reentrant(
764 "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
765 )?;
766
767 let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
768
769 let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
770 let (tx, rx) = mpsc::unbounded_channel();
771
772 {
773 *self.inner.state.lock().await = if start_paused {
774 TimerState::Paused
775 } else {
776 TimerState::Running
777 };
778 *self.inner.command_tx.lock().await = Some(tx);
779 *self.inner.interval.lock().await = config.interval;
780 *self.inner.expiration_count.lock().await = config.expiration_count;
781 *self.inner.statistics.lock().await = TimerStatistics::default();
782 *self.inner.last_outcome.lock().await = None;
783 self.inner.completion_tx.send_replace(None);
784 }
785 self.inner.active_run_id.store(run_id, Ordering::SeqCst);
786
787 runtime::emit_event(
788 &self.inner,
789 TimerEvent::Started {
790 run_id,
791 interval: config.interval,
792 recurring: config.recurring,
793 expiration_count: config.expiration_count,
794 },
795 );
796
797 let inner = Arc::clone(&self.inner);
798 let handle = self.inner.runtime.spawn(async move {
799 let scoped_inner = Arc::clone(&inner);
800 runtime::with_run_context(&scoped_inner, run_id, async move {
801 runtime::run_timer(inner, run_id, config, callback, rx).await;
802 })
803 .await;
804 });
805
806 *self.inner.handle.lock().await = Some(handle);
807
808 #[cfg(feature = "logging")]
809 debug!("Timer started.");
810
811 Ok(run_id)
812 }
813
814 async fn active_run_id(&self) -> Option<u64> {
815 match self.inner.active_run_id.load(Ordering::SeqCst) {
816 0 => None,
817 run_id => Some(run_id),
818 }
819 }
820
821 async fn send_command(&self, command: TimerCommand) {
822 if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
823 let _ = tx.send(command);
824 }
825 }
826
827 fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
828 if runtime::is_current_run(&self.inner) {
829 return Err(TimerError::reentrant_operation(message));
830 }
831
832 Ok(())
833 }
834
835 async fn cancel_with_reason(
836 &self,
837 reason: TimerFinishReason,
838 ) -> Result<TimerOutcome, TimerError> {
839 let run_id = self
840 .active_run_id()
841 .await
842 .ok_or_else(TimerError::not_running)?;
843
844 let _ = self.inner.command_tx.lock().await.take();
845 let handle = self.inner.handle.lock().await.take();
846 *self.inner.state.lock().await = TimerState::Stopped;
847
848 if let Some(handle) = handle {
849 handle.abort();
850 let _ = handle.await;
851 }
852
853 let statistics = self.get_statistics().await;
854 let outcome = TimerOutcome {
855 run_id,
856 reason,
857 statistics,
858 };
859
860 runtime::finish_run(&self.inner, outcome.clone()).await;
861 Ok(outcome)
862 }
863
864 async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
865 let mut completion_rx = self.inner.completion_tx.subscribe();
866
867 loop {
868 if let Some(outcome) = completion_rx.borrow().clone() {
869 if outcome.run_id == run_id {
870 return Ok(outcome);
871 }
872 }
873
874 if completion_rx.changed().await.is_err() {
875 return completion_rx
876 .borrow()
877 .clone()
878 .ok_or_else(TimerError::not_running);
879 }
880 }
881 }
882}
883
884impl TimerBuilder {
885 pub fn once(delay: Duration) -> Self {
887 Self {
888 kind: TimerKind::Once(delay),
889 callback_timeout: None,
890 retry_policy: None,
891 start_paused: false,
892 events_enabled: true,
893 }
894 }
895
896 pub fn recurring(schedule: RecurringSchedule) -> Self {
898 Self {
899 kind: TimerKind::Recurring(schedule),
900 callback_timeout: None,
901 retry_policy: None,
902 start_paused: false,
903 events_enabled: true,
904 }
905 }
906
907 pub fn callback_timeout(mut self, callback_timeout: Duration) -> Self {
909 self.callback_timeout = Some(callback_timeout);
910 self
911 }
912
913 pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
915 self.retry_policy = Some(retry_policy);
916 self
917 }
918
919 pub fn max_retries(mut self, max_retries: usize) -> Self {
921 self.retry_policy = Some(RetryPolicy::new(max_retries));
922 self
923 }
924
925 pub fn paused_start(mut self) -> Self {
927 self.start_paused = true;
928 self
929 }
930
931 pub fn with_events_disabled(mut self) -> Self {
933 self.events_enabled = false;
934 self
935 }
936
937 pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
939 where
940 F: TimerCallback + 'static,
941 {
942 let timer = Timer::new_with_events(self.events_enabled);
943 if self.start_paused {
944 *timer.inner.state.lock().await = TimerState::Paused;
945 }
946
947 match self.kind {
948 TimerKind::Once(delay) => {
949 let _ = timer
950 .start_internal(
951 RunConfig {
952 interval: delay,
953 initial_delay: None,
954 callback_timeout: self.callback_timeout,
955 retry_policy: self.retry_policy,
956 recurring: false,
957 cadence: RecurringCadence::FixedDelay,
958 expiration_count: None,
959 },
960 callback,
961 self.start_paused,
962 )
963 .await?;
964 }
965 TimerKind::Recurring(schedule) => {
966 let _ = timer
967 .start_internal(
968 RunConfig {
969 interval: schedule.interval,
970 initial_delay: schedule.initial_delay,
971 callback_timeout: self.callback_timeout,
972 retry_policy: self.retry_policy,
973 recurring: true,
974 cadence: schedule.cadence,
975 expiration_count: schedule.expiration_count,
976 },
977 callback,
978 self.start_paused,
979 )
980 .await?;
981 }
982 }
983 Ok(timer)
984 }
985}