1use async_trait::async_trait;
2use std::future::Future;
3use std::sync::{
4 atomic::{AtomicBool, AtomicU64, Ordering},
5 Arc,
6};
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, watch, Mutex};
9use tokio::task::JoinHandle;
10
11#[cfg(feature = "logging")]
12use log::debug;
13
14use crate::errors::TimerError;
15
16mod driver;
17mod runtime;
18
19#[cfg(test)]
20mod tests;
21
/// Capacity passed to `broadcast::channel` when a `Timer` creates its event channel.
const TIMER_EVENT_BUFFER: usize = 64;
23
/// Lifecycle state reported by `Timer::get_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerState {
    /// Set when a run starts unpaused and when a paused run is resumed.
    Running,
    /// Set by a successful pause request, or at start when the run is created paused.
    Paused,
    /// Initial state of a new timer; also set when the active run is cancelled or replaced.
    Stopped,
}
31
/// Why a timer run ended; carried in `TimerOutcome::reason`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerFinishReason {
    /// NOTE(review): presumably reported when a run ends on its own; it is not
    /// produced in this file — confirm in the `runtime` module.
    Completed,
    /// NOTE(review): presumably reported after a `Stop` command is handled by the
    /// run task — confirm in the `runtime` module.
    Stopped,
    /// Reported by `Timer::cancel`, which aborts the run task.
    Cancelled,
    /// Reported for the previous run when a new run is started on the same timer.
    Replaced,
}
40
/// Statistics for a timer run.
///
/// The timer resets these to `TimerStatistics::default()` whenever a new run
/// starts. The fields are updated outside this file (presumably by the
/// `runtime` module — TODO confirm), so the per-field notes below are hedged.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TimerStatistics {
    /// Presumably the total number of callback executions — verify in `runtime`.
    pub execution_count: usize,
    /// Presumably the number of executions that returned `Ok` — verify in `runtime`.
    pub successful_executions: usize,
    /// Presumably the number of executions that returned `Err` — verify in `runtime`.
    pub failed_executions: usize,
    /// Presumably the time accumulated by the run — verify in `runtime`.
    pub elapsed_time: Duration,
    /// Error value returned by `Timer::get_last_error`; `None` after a reset.
    pub last_error: Option<TimerError>,
}
55
/// Final result of a single timer run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerOutcome {
    /// Identifier of the run, as returned by the `start_*` methods.
    pub run_id: u64,
    /// Why the run ended.
    pub reason: TimerFinishReason,
    /// Statistics captured for the run when the outcome was built.
    pub statistics: TimerStatistics,
}
66
/// Notifications delivered to `TimerEvents` subscribers.
///
/// Only `Started` and `IntervalAdjusted` are emitted from this file; the other
/// variants are presumably emitted by the `runtime` module — TODO confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimerEvent {
    /// Emitted when a run is started, before its task is spawned.
    Started {
        /// Identifier of the new run.
        run_id: u64,
        /// Interval (or one-shot delay) the run was started with.
        interval: Duration,
        /// `true` for recurring runs, `false` for one-shot runs.
        recurring: bool,
        /// Expiration count the run was started with, if any.
        expiration_count: Option<usize>,
    },
    /// NOTE(review): presumably emitted once the run task handles a pause — confirm.
    Paused {
        /// Identifier of the affected run.
        run_id: u64,
    },
    /// NOTE(review): presumably emitted once the run task handles a resume — confirm.
    Resumed {
        /// Identifier of the affected run.
        run_id: u64,
    },
    /// Emitted by `Timer::request_adjust_interval` after the new interval is stored.
    IntervalAdjusted {
        /// Identifier of the active run.
        run_id: u64,
        /// The newly requested interval.
        interval: Duration,
    },
    /// Returned by `TimerEvents::wait_tick`.
    /// NOTE(review): presumably one per callback execution — confirm in `runtime`.
    Tick {
        /// Identifier of the run.
        run_id: u64,
        /// Statistics attached to the event.
        statistics: TimerStatistics,
    },
    /// NOTE(review): presumably emitted when the callback returns `Err` — confirm.
    CallbackFailed {
        /// Identifier of the run.
        run_id: u64,
        /// The error attached to the event.
        error: TimerError,
        /// Statistics attached to the event.
        statistics: TimerStatistics,
    },
    /// Carries the final outcome of a run; unwrapped by `TimerEvents::wait_finished`.
    Finished(TimerOutcome),
}
97
98pub struct TimerEvents {
100 receiver: broadcast::Receiver<TimerEvent>,
101}
102
103impl TimerEvents {
104 pub fn try_recv(&mut self) -> Option<TimerEvent> {
106 loop {
107 match self.receiver.try_recv() {
108 Ok(event) => return Some(event),
109 Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
110 Err(broadcast::error::TryRecvError::Empty)
111 | Err(broadcast::error::TryRecvError::Closed) => return None,
112 }
113 }
114 }
115
116 pub async fn recv(&mut self) -> Option<TimerEvent> {
118 loop {
119 match self.receiver.recv().await {
120 Ok(event) => return Some(event),
121 Err(broadcast::error::RecvError::Lagged(_)) => continue,
122 Err(broadcast::error::RecvError::Closed) => return None,
123 }
124 }
125 }
126
127 pub async fn wait_started(&mut self) -> Option<TimerEvent> {
129 loop {
130 if let event @ TimerEvent::Started { .. } = self.recv().await? {
131 return Some(event);
132 }
133 }
134 }
135
136 pub async fn wait_tick(&mut self) -> Option<TimerEvent> {
138 loop {
139 if let event @ TimerEvent::Tick { .. } = self.recv().await? {
140 return Some(event);
141 }
142 }
143 }
144
145 pub async fn wait_finished(&mut self) -> Option<TimerOutcome> {
147 loop {
148 if let TimerEvent::Finished(outcome) = self.recv().await? {
149 return Some(outcome);
150 }
151 }
152 }
153}
154
155pub struct TimerCompletion {
157 receiver: watch::Receiver<Option<TimerOutcome>>,
158}
159
160impl TimerCompletion {
161 pub fn latest(&self) -> Option<TimerOutcome> {
163 self.receiver.borrow().clone()
164 }
165
166 pub async fn wait(&mut self) -> Option<TimerOutcome> {
168 loop {
169 if let Some(outcome) = self.receiver.borrow_and_update().clone() {
170 return Some(outcome);
171 }
172
173 if self.receiver.changed().await.is_err() {
174 return self.receiver.borrow_and_update().clone();
175 }
176 }
177 }
178
179 pub async fn wait_for_run(&mut self, run_id: u64) -> Option<TimerOutcome> {
181 loop {
182 let outcome = self.wait().await?;
183 if outcome.run_id == run_id {
184 return Some(outcome);
185 }
186 }
187 }
188}
189
/// Asynchronous callback that a timer run executes.
///
/// Implementors must be `Send + Sync`. Closures of the form
/// `Fn() -> impl Future<Output = Result<(), TimerError>>` implement this trait
/// through the blanket impl in this module.
#[async_trait]
pub trait TimerCallback: Send + Sync {
    /// Runs the callback once.
    ///
    /// NOTE(review): how an `Err` is handled (statistics, `CallbackFailed`
    /// event) is decided by the `runtime` module and is not visible here.
    async fn execute(&self) -> Result<(), TimerError>;
}
196
197#[async_trait]
198impl<F, Fut> TimerCallback for F
199where
200 F: Fn() -> Fut + Send + Sync,
201 Fut: Future<Output = Result<(), TimerError>> + Send,
202{
203 async fn execute(&self) -> Result<(), TimerError> {
204 (self)().await
205 }
206}
207
/// Control messages sent to the active run's task over its unbounded mpsc channel.
pub(super) enum TimerCommand {
    /// Sent by `Timer::request_pause`.
    Pause,
    /// Sent by `Timer::request_resume`.
    Resume,
    /// Sent by `Timer::request_stop`.
    Stop,
    /// Sent by `Timer::request_cancel`. (`Timer::cancel` aborts the task directly
    /// instead of sending this command.)
    Cancel,
    /// Sent by `Timer::request_adjust_interval` with the new interval.
    SetInterval(Duration),
}
215
/// State shared between all clones of a `Timer` and the spawned run task.
pub(super) struct TimerInner {
    /// Current lifecycle state; starts as `TimerState::Stopped`.
    pub(super) state: Mutex<TimerState>,
    /// Join handle of the spawned run task, if one has been stored.
    pub(super) handle: Mutex<Option<JoinHandle<()>>>,
    /// Sender half of the active run's command channel; taken when the run is cancelled.
    pub(super) command_tx: Mutex<Option<mpsc::UnboundedSender<TimerCommand>>>,
    /// Most recently configured interval; `Duration::ZERO` until a run is started.
    pub(super) interval: Mutex<Duration>,
    /// Expiration count the current run was started with, if any.
    pub(super) expiration_count: Mutex<Option<usize>>,
    /// Statistics of the current run; reset when a new run starts.
    pub(super) statistics: Mutex<TimerStatistics>,
    /// Outcome returned by `Timer::last_outcome`; cleared when a new run starts.
    pub(super) last_outcome: Mutex<Option<TimerOutcome>>,
    /// Watch channel observed by `TimerCompletion` and `Timer::join`; reset to
    /// `None` when a new run starts.
    pub(super) completion_tx: watch::Sender<Option<TimerOutcome>>,
    /// Broadcast channel feeding `TimerEvents` subscribers.
    pub(super) event_tx: broadcast::Sender<TimerEvent>,
    /// Flag toggled by `Timer::set_events_enabled`.
    /// NOTE(review): presumably consulted by `runtime::emit_event` — confirm.
    pub(super) events_enabled: AtomicBool,
    /// Handle used to spawn the run task.
    pub(super) runtime: driver::RuntimeHandle,
    /// Source of run identifiers; starts at 1 and is incremented for each started run.
    pub(super) next_run_id: AtomicU64,
    /// Identifier of the active run; `0` is treated as "no active run".
    pub(super) active_run_id: AtomicU64,
}
231
/// Kind of run a `TimerBuilder` will start.
#[derive(Debug, Clone, Copy)]
enum TimerKind {
    /// One-shot run; the payload is passed to `start_internal` as the delay
    /// with `recurring = false`.
    Once(Duration),
    /// Recurring run; the payload is passed to `start_internal` as the interval
    /// with `recurring = true`.
    Recurring(Duration),
}
237
/// Builder that configures and starts a new `Timer`.
pub struct TimerBuilder {
    /// Whether the run is one-shot or recurring, together with its duration.
    kind: TimerKind,
    /// Optional expiration count; only forwarded for recurring runs.
    expiration_count: Option<usize>,
    /// When `true`, the run is started in the `Paused` state.
    start_paused: bool,
    /// Whether the created timer starts with events enabled.
    events_enabled: bool,
}
245
/// Handle to an asynchronous timer.
///
/// Cloning is cheap: every clone shares the same `TimerInner` through an `Arc`.
#[derive(Clone)]
pub struct Timer {
    /// Shared state of the timer.
    inner: Arc<TimerInner>,
}
251
252impl Default for Timer {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
258impl Timer {
259 pub fn new() -> Self {
261 Self::new_with_events(true)
262 }
263
264 pub fn new_silent() -> Self {
266 Self::new_with_events(false)
267 }
268
269 fn new_with_events(events_enabled: bool) -> Self {
270 let (completion_tx, _completion_rx) = watch::channel(None);
271 let (event_tx, _event_rx) = broadcast::channel(TIMER_EVENT_BUFFER);
272
273 Self {
274 inner: Arc::new(TimerInner {
275 state: Mutex::new(TimerState::Stopped),
276 handle: Mutex::new(None),
277 command_tx: Mutex::new(None),
278 interval: Mutex::new(Duration::ZERO),
279 expiration_count: Mutex::new(None),
280 statistics: Mutex::new(TimerStatistics::default()),
281 last_outcome: Mutex::new(None),
282 completion_tx,
283 event_tx,
284 events_enabled: AtomicBool::new(events_enabled),
285 runtime: driver::RuntimeHandle,
286 next_run_id: AtomicU64::new(1),
287 active_run_id: AtomicU64::new(0),
288 }),
289 }
290 }
291
292 pub fn once(delay: Duration) -> TimerBuilder {
294 TimerBuilder::once(delay)
295 }
296
297 pub fn recurring(interval: Duration) -> TimerBuilder {
299 TimerBuilder::recurring(interval)
300 }
301
302 pub fn subscribe(&self) -> TimerEvents {
304 TimerEvents {
305 receiver: self.inner.event_tx.subscribe(),
306 }
307 }
308
309 pub fn completion(&self) -> TimerCompletion {
311 TimerCompletion {
312 receiver: self.inner.completion_tx.subscribe(),
313 }
314 }
315
316 pub async fn start_once<F>(&self, delay: Duration, callback: F) -> Result<u64, TimerError>
318 where
319 F: TimerCallback + 'static,
320 {
321 self.start_internal(delay, callback, false, None, false)
322 .await
323 }
324
325 pub async fn start_once_fn<F, Fut>(
327 &self,
328 delay: Duration,
329 callback: F,
330 ) -> Result<u64, TimerError>
331 where
332 F: Fn() -> Fut + Send + Sync + 'static,
333 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
334 {
335 self.start_once(delay, callback).await
336 }
337
338 pub async fn start_recurring<F>(
340 &self,
341 interval: Duration,
342 callback: F,
343 expiration_count: Option<usize>,
344 ) -> Result<u64, TimerError>
345 where
346 F: TimerCallback + 'static,
347 {
348 self.start_internal(interval, callback, true, expiration_count, false)
349 .await
350 }
351
352 pub async fn start_recurring_fn<F, Fut>(
354 &self,
355 interval: Duration,
356 callback: F,
357 expiration_count: Option<usize>,
358 ) -> Result<u64, TimerError>
359 where
360 F: Fn() -> Fut + Send + Sync + 'static,
361 Fut: Future<Output = Result<(), TimerError>> + Send + 'static,
362 {
363 self.start_recurring(interval, callback, expiration_count)
364 .await
365 }
366
367 pub async fn pause(&self) -> Result<(), TimerError> {
369 self.ensure_not_reentrant(
370 "pause() cannot be awaited from the timer's active callback; use request_pause().",
371 )?;
372 self.request_pause().await
373 }
374
375 pub async fn request_pause(&self) -> Result<(), TimerError> {
377 let _run_id = self
378 .active_run_id()
379 .await
380 .ok_or_else(TimerError::not_running)?;
381 let mut state = self.inner.state.lock().await;
382 if *state != TimerState::Running {
383 return Err(TimerError::not_running());
384 }
385
386 *state = TimerState::Paused;
387 drop(state);
388
389 self.send_command(TimerCommand::Pause).await;
390
391 #[cfg(feature = "logging")]
392 debug!("Timer paused.");
393
394 Ok(())
395 }
396
397 pub async fn resume(&self) -> Result<(), TimerError> {
399 self.ensure_not_reentrant(
400 "resume() cannot be awaited from the timer's active callback; use request_resume().",
401 )?;
402 self.request_resume().await
403 }
404
405 pub async fn request_resume(&self) -> Result<(), TimerError> {
407 let _run_id = self
408 .active_run_id()
409 .await
410 .ok_or_else(TimerError::not_paused)?;
411 let mut state = self.inner.state.lock().await;
412 if *state != TimerState::Paused {
413 return Err(TimerError::not_paused());
414 }
415
416 *state = TimerState::Running;
417 drop(state);
418
419 self.send_command(TimerCommand::Resume).await;
420
421 #[cfg(feature = "logging")]
422 debug!("Timer resumed.");
423
424 Ok(())
425 }
426
427 pub async fn stop(&self) -> Result<TimerOutcome, TimerError> {
429 self.ensure_not_reentrant(
430 "stop() cannot be awaited from the timer's active callback; use request_stop().",
431 )?;
432 let run_id = self
433 .active_run_id()
434 .await
435 .ok_or_else(TimerError::not_running)?;
436 self.request_stop().await?;
437 self.join_run(run_id).await
438 }
439
440 pub async fn request_stop(&self) -> Result<(), TimerError> {
442 self.active_run_id()
443 .await
444 .ok_or_else(TimerError::not_running)?;
445 self.send_command(TimerCommand::Stop).await;
446 Ok(())
447 }
448
449 pub async fn cancel(&self) -> Result<TimerOutcome, TimerError> {
451 self.ensure_not_reentrant(
452 "cancel() cannot be awaited from the timer's active callback; use request_cancel().",
453 )?;
454 self.cancel_with_reason(TimerFinishReason::Cancelled).await
455 }
456
457 pub async fn request_cancel(&self) -> Result<(), TimerError> {
459 self.active_run_id()
460 .await
461 .ok_or_else(TimerError::not_running)?;
462 self.send_command(TimerCommand::Cancel).await;
463 Ok(())
464 }
465
466 pub async fn adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
468 self.ensure_not_reentrant(
469 "adjust_interval() cannot be awaited from the timer's active callback; use request_adjust_interval().",
470 )?;
471 self.request_adjust_interval(new_interval).await
472 }
473
474 pub async fn request_adjust_interval(&self, new_interval: Duration) -> Result<(), TimerError> {
476 if new_interval.is_zero() {
477 return Err(TimerError::invalid_parameter(
478 "Interval must be greater than zero.",
479 ));
480 }
481
482 let run_id = self
483 .active_run_id()
484 .await
485 .ok_or_else(TimerError::not_running)?;
486 *self.inner.interval.lock().await = new_interval;
487 self.send_command(TimerCommand::SetInterval(new_interval))
488 .await;
489 runtime::emit_event(
490 &self.inner,
491 TimerEvent::IntervalAdjusted {
492 run_id,
493 interval: new_interval,
494 },
495 );
496
497 #[cfg(feature = "logging")]
498 debug!("Timer interval adjusted.");
499
500 Ok(())
501 }
502
503 pub async fn join(&self) -> Result<TimerOutcome, TimerError> {
505 self.ensure_not_reentrant(
506 "join() cannot be awaited from the timer's active callback; use completion().wait() from another task instead.",
507 )?;
508 if let Some(run_id) = self.active_run_id().await {
509 return self.join_run(run_id).await;
510 }
511
512 self.inner
513 .last_outcome
514 .lock()
515 .await
516 .clone()
517 .ok_or_else(TimerError::not_running)
518 }
519
520 pub async fn wait(&self) {
522 let _ = self.join().await;
523 }
524
525 pub async fn get_statistics(&self) -> TimerStatistics {
527 self.inner.statistics.lock().await.clone()
528 }
529
530 pub async fn get_state(&self) -> TimerState {
532 *self.inner.state.lock().await
533 }
534
535 pub async fn get_interval(&self) -> Duration {
537 *self.inner.interval.lock().await
538 }
539
540 pub async fn get_expiration_count(&self) -> Option<usize> {
542 *self.inner.expiration_count.lock().await
543 }
544
545 pub async fn get_last_error(&self) -> Option<TimerError> {
547 self.inner.statistics.lock().await.last_error.clone()
548 }
549
550 pub async fn last_outcome(&self) -> Option<TimerOutcome> {
552 self.inner.last_outcome.lock().await.clone()
553 }
554
555 pub fn set_events_enabled(&self, enabled: bool) {
557 self.inner.events_enabled.store(enabled, Ordering::SeqCst);
558 }
559
560 async fn start_internal<F>(
561 &self,
562 interval: Duration,
563 callback: F,
564 recurring: bool,
565 expiration_count: Option<usize>,
566 start_paused: bool,
567 ) -> Result<u64, TimerError>
568 where
569 F: TimerCallback + 'static,
570 {
571 if interval.is_zero() {
572 return Err(TimerError::invalid_parameter(
573 "Interval must be greater than zero.",
574 ));
575 }
576
577 if recurring && matches!(expiration_count, Some(0)) {
578 return Err(TimerError::invalid_parameter(
579 "Expiration count must be greater than zero.",
580 ));
581 }
582
583 self.ensure_not_reentrant(
584 "starting a new run from the timer's active callback is not supported; spawn a separate task instead.",
585 )?;
586
587 let _ = self.cancel_with_reason(TimerFinishReason::Replaced).await;
588
589 let run_id = self.inner.next_run_id.fetch_add(1, Ordering::SeqCst);
590 let (tx, rx) = mpsc::unbounded_channel();
591
592 {
593 *self.inner.state.lock().await = if start_paused {
594 TimerState::Paused
595 } else {
596 TimerState::Running
597 };
598 *self.inner.command_tx.lock().await = Some(tx);
599 *self.inner.interval.lock().await = interval;
600 *self.inner.expiration_count.lock().await = expiration_count;
601 *self.inner.statistics.lock().await = TimerStatistics::default();
602 *self.inner.last_outcome.lock().await = None;
603 self.inner.completion_tx.send_replace(None);
604 }
605 self.inner.active_run_id.store(run_id, Ordering::SeqCst);
606
607 runtime::emit_event(
608 &self.inner,
609 TimerEvent::Started {
610 run_id,
611 interval,
612 recurring,
613 expiration_count,
614 },
615 );
616
617 let inner = Arc::clone(&self.inner);
618 let handle = self.inner.runtime.spawn(async move {
619 let scoped_inner = Arc::clone(&inner);
620 runtime::with_run_context(&scoped_inner, run_id, async move {
621 runtime::run_timer(
622 inner,
623 run_id,
624 interval,
625 recurring,
626 expiration_count,
627 callback,
628 rx,
629 )
630 .await;
631 })
632 .await;
633 });
634
635 *self.inner.handle.lock().await = Some(handle);
636
637 #[cfg(feature = "logging")]
638 debug!("Timer started.");
639
640 Ok(run_id)
641 }
642
643 async fn active_run_id(&self) -> Option<u64> {
644 match self.inner.active_run_id.load(Ordering::SeqCst) {
645 0 => None,
646 run_id => Some(run_id),
647 }
648 }
649
650 async fn send_command(&self, command: TimerCommand) {
651 if let Some(tx) = self.inner.command_tx.lock().await.as_ref() {
652 let _ = tx.send(command);
653 }
654 }
655
656 fn ensure_not_reentrant(&self, message: &'static str) -> Result<(), TimerError> {
657 if runtime::is_current_run(&self.inner) {
658 return Err(TimerError::reentrant_operation(message));
659 }
660
661 Ok(())
662 }
663
664 async fn cancel_with_reason(
665 &self,
666 reason: TimerFinishReason,
667 ) -> Result<TimerOutcome, TimerError> {
668 let run_id = self
669 .active_run_id()
670 .await
671 .ok_or_else(TimerError::not_running)?;
672
673 let _ = self.inner.command_tx.lock().await.take();
674 let handle = self.inner.handle.lock().await.take();
675 *self.inner.state.lock().await = TimerState::Stopped;
676
677 if let Some(handle) = handle {
678 handle.abort();
679 let _ = handle.await;
680 }
681
682 let statistics = self.get_statistics().await;
683 let outcome = TimerOutcome {
684 run_id,
685 reason,
686 statistics,
687 };
688
689 runtime::finish_run(&self.inner, outcome.clone()).await;
690 Ok(outcome)
691 }
692
693 async fn join_run(&self, run_id: u64) -> Result<TimerOutcome, TimerError> {
694 let mut completion_rx = self.inner.completion_tx.subscribe();
695
696 loop {
697 if let Some(outcome) = completion_rx.borrow().clone() {
698 if outcome.run_id == run_id {
699 return Ok(outcome);
700 }
701 }
702
703 if completion_rx.changed().await.is_err() {
704 return completion_rx
705 .borrow()
706 .clone()
707 .ok_or_else(TimerError::not_running);
708 }
709 }
710 }
711}
712
713impl TimerBuilder {
714 pub fn once(delay: Duration) -> Self {
716 Self {
717 kind: TimerKind::Once(delay),
718 expiration_count: None,
719 start_paused: false,
720 events_enabled: true,
721 }
722 }
723
724 pub fn recurring(interval: Duration) -> Self {
726 Self {
727 kind: TimerKind::Recurring(interval),
728 expiration_count: None,
729 start_paused: false,
730 events_enabled: true,
731 }
732 }
733
734 pub fn expiration_count(mut self, expiration_count: usize) -> Self {
736 self.expiration_count = Some(expiration_count);
737 self
738 }
739
740 pub fn paused_start(mut self) -> Self {
742 self.start_paused = true;
743 self
744 }
745
746 pub fn with_events_disabled(mut self) -> Self {
748 self.events_enabled = false;
749 self
750 }
751
752 pub async fn start<F>(self, callback: F) -> Result<Timer, TimerError>
754 where
755 F: TimerCallback + 'static,
756 {
757 let timer = Timer::new_with_events(self.events_enabled);
758 if self.start_paused {
759 *timer.inner.state.lock().await = TimerState::Paused;
760 }
761
762 match self.kind {
763 TimerKind::Once(delay) => {
764 let _ = timer
765 .start_internal(delay, callback, false, None, self.start_paused)
766 .await?;
767 }
768 TimerKind::Recurring(interval) => {
769 let _ = timer
770 .start_internal(
771 interval,
772 callback,
773 true,
774 self.expiration_count,
775 self.start_paused,
776 )
777 .await?;
778 }
779 }
780 Ok(timer)
781 }
782}