1use std::future::{Future, IntoFuture};
5use std::pin::Pin;
6use std::time::Duration;
7
8use chrono::{NaiveDateTime, Utc};
9use futures::FutureExt as _;
10use thiserror::Error;
11use tokio::sync::mpsc;
12use tokio::time::Instant;
13
14type Result<T> = core::result::Result<T, TimerError>;
15
16#[derive(Debug, Error)]
17pub enum TimerError {
18 #[error("operation failed because the timer was closed")]
19 Closed,
20}
21
22pub struct Timer {
27 callback: Pin<Box<dyn Fn() -> Option<NaiveDateTime> + Send>>,
28 deadline: Instant,
29 watchdog: (mpsc::Sender<NaiveDateTime>, mpsc::Receiver<NaiveDateTime>),
30 shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
31}
32
33impl Timer {
34 pub fn new(
36 callback: impl Fn() -> Option<NaiveDateTime> + Send + 'static,
37 ) -> Self {
38 let watchdog = mpsc::channel(1);
39
40 Self {
41 callback: Box::pin(callback),
42 deadline: far_future(),
44 watchdog,
45 shutdown: None,
46 }
47 }
48
49 pub fn with_graceful_shutdown<O>(
50 self,
51 shutdown: impl Future<Output = O> + Send + 'static,
52 ) -> Self {
53 Self {
54 shutdown: Some(Box::pin(shutdown.map(|_| ()))),
55 ..self
56 }
57 }
58
59 pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
61 self.scheduler().schedule(deadline).await
62 }
63
64 pub fn scheduler(&self) -> Scheduler {
66 Scheduler(self.watchdog.0.clone())
67 }
68}
69
70impl IntoFuture for Timer {
71 type Output = ();
72 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
73
74 fn into_future(self) -> Self::IntoFuture {
75 async move {
76 let Self {
77 callback,
78 deadline,
79 watchdog: (_, watchdog),
80 shutdown,
81 } = self;
82
83 let sleep = tokio::time::sleep_until(deadline);
84 let mut shutdown = shutdown.unwrap_or_else(|| Box::pin(futures::future::pending()));
85
86 futures::pin_mut!(sleep);
87 futures::pin_mut!(watchdog);
88
89 loop {
90 let duration = sleep.deadline() - Instant::now();
91 tracing::trace!("sleeping for {} secs", duration.as_secs());
92
93 tokio::select! {
94 Some(new_deadline) = watchdog.recv() => {
96 let new_duration = new_deadline - Utc::now().naive_utc();
97 let Ok(new_duration) = new_duration.to_std() else {
98 tracing::trace!("unable to schedule a timer for a time in the past");
99 continue;
100 };
101
102 tracing::trace!("task will be executed {} secs from now", new_duration.as_secs());
103
104 let deadline = Instant::now() + new_duration;
106 sleep.as_mut().reset(deadline);
107 },
108 () = &mut sleep => {
110 tracing::trace!("timer elapsed");
111 let deadline = if let Some(new_deadline) = (callback)() {
112 let duration = Utc::now().naive_utc() - new_deadline;
113 let Ok(duration) = duration.to_std() else {
114 continue;
115 };
116
117 Instant::now() + duration
118 } else {
119 far_future()
120 };
121
122 sleep.as_mut().reset(deadline);
123 },
124 _ = &mut shutdown => {
125 tracing::trace!("received shutdown signal");
126 break;
127 }
128 }
129 }
130 }.boxed()
131 }
132}
133
134#[derive(Clone)]
135pub struct Scheduler(mpsc::Sender<NaiveDateTime>);
136
137impl Scheduler {
138 pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
139 self.0
140 .send(deadline)
141 .await
142 .map_err(|_| TimerError::Closed)?;
143
144 tracing::trace!("scheduled a new execution for {}", deadline);
145
146 Ok(())
147 }
148
149 pub fn blocking_schedule(&self, deadline: NaiveDateTime) -> Result<()> {
150 self.0
151 .blocking_send(deadline)
152 .map_err(|_| TimerError::Closed)?;
153
154 tracing::trace!("scheduled a new execution for {}", deadline);
155
156 Ok(())
157 }
158}
159
160pub(crate) fn far_future() -> Instant {
161 Instant::now() + Duration::from_secs(86400 * 365 * 30)
167}