timer_rs/
lib.rs

//! A simple timer, used to enqueue operations meant to be executed at
//! a given time or after a given delay.
3
4use std::future::{Future, IntoFuture};
5use std::pin::Pin;
6use std::time::Duration;
7
8use chrono::{NaiveDateTime, Utc};
9use futures::FutureExt as _;
10use thiserror::Error;
11use tokio::sync::mpsc;
12use tokio::time::Instant;
13
14type Result<T> = core::result::Result<T, TimerError>;
15
16#[derive(Debug, Error)]
17pub enum TimerError {
18    #[error("operation failed because the timer was closed")]
19    Closed,
20}
21
22/// A timer, used to schedule execution of callbacks in a given future.
23///
24/// If the callback does not provide the new time period, the next execution
25/// will be scheduled for a distant future.
26pub struct Timer {
27    callback: Pin<Box<dyn Fn() -> Option<NaiveDateTime> + Send>>,
28    deadline: Instant,
29    watchdog: (mpsc::Sender<NaiveDateTime>, mpsc::Receiver<NaiveDateTime>),
30    shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
31}
32
33impl Timer {
34    /// Creates a Timer instance.
35    pub fn new(
36        callback: impl Fn() -> Option<NaiveDateTime> + Send + 'static,
37    ) -> Self {
38        let watchdog = mpsc::channel(1);
39
40        Self {
41            callback: Box::pin(callback),
42            // Since it's the first call, it starts sleeping forever.
43            deadline: far_future(),
44            watchdog,
45            shutdown: None,
46        }
47    }
48
49    pub fn with_graceful_shutdown<O>(
50        self,
51        shutdown: impl Future<Output = O> + Send + 'static,
52    ) -> Self {
53        Self {
54            shutdown: Some(Box::pin(shutdown.map(|_| ()))),
55            ..self
56        }
57    }
58
59    /// Schedule for execution after a delay.
60    pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
61        self.scheduler().schedule(deadline).await
62    }
63
64    /// Creates a handler to schedule new executions.
65    pub fn scheduler(&self) -> Scheduler {
66        Scheduler(self.watchdog.0.clone())
67    }
68}
69
70impl IntoFuture for Timer {
71    type Output = ();
72    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
73
74    fn into_future(self) -> Self::IntoFuture {
75        async move {
76            let Self {
77                callback,
78                deadline,
79                watchdog: (_, watchdog),
80                shutdown,
81            } = self;
82
83            let sleep = tokio::time::sleep_until(deadline);
84            let mut shutdown = shutdown.unwrap_or_else(|| Box::pin(futures::future::pending()));
85
86            futures::pin_mut!(sleep);
87            futures::pin_mut!(watchdog);
88
89            loop {
90                let duration = sleep.deadline() - Instant::now();
91                tracing::trace!("sleeping for {} secs", duration.as_secs());
92
93                tokio::select! {
94                    // Wait for a new deadline.
95                    Some(new_deadline) = watchdog.recv() => {
96                        let new_duration = new_deadline - Utc::now().naive_utc();
97                        let Ok(new_duration) = new_duration.to_std() else {
98                            tracing::trace!("unable to schedule a timer for a time in the past");
99                            continue;
100                        };
101
102                        tracing::trace!("task will be executed {} secs from now", new_duration.as_secs());
103
104                        // Change the sleep time for next iteration.
105                        let deadline = Instant::now() + new_duration;
106                        sleep.as_mut().reset(deadline);
107                    },
108                    // Wait for the next run.
109                    () = &mut sleep => {
110                        tracing::trace!("timer elapsed");
111                        let deadline = if let Some(new_deadline) = (callback)() {
112                            let duration = Utc::now().naive_utc() - new_deadline;
113                            let Ok(duration) = duration.to_std() else {
114                                continue;
115                            };
116
117                            Instant::now() + duration
118                        } else {
119                            far_future()
120                        };
121
122                        sleep.as_mut().reset(deadline);
123                    },
124                    _ = &mut shutdown => {
125                        tracing::trace!("received shutdown signal");
126                        break;
127                    }
128                }
129            }
130        }.boxed()
131    }
132}
133
134#[derive(Clone)]
135pub struct Scheduler(mpsc::Sender<NaiveDateTime>);
136
137impl Scheduler {
138    pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
139        self.0
140            .send(deadline)
141            .await
142            .map_err(|_| TimerError::Closed)?;
143
144        tracing::trace!("scheduled a new execution for {}", deadline);
145
146        Ok(())
147    }
148
149    pub fn blocking_schedule(&self, deadline: NaiveDateTime) -> Result<()> {
150        self.0
151            .blocking_send(deadline)
152            .map_err(|_| TimerError::Closed)?;
153
154        tracing::trace!("scheduled a new execution for {}", deadline);
155
156        Ok(())
157    }
158}
159
/// Returns an instant roughly 30 years from now, used as a stand-in for
/// "never".
pub(crate) fn far_future() -> Instant {
    // There is no API to obtain the maximum `Instant` or to convert a specific
    // future date into one. Larger offsets are not safe everywhere: 1000 years
    // overflows on macOS and 100 years overflows on FreeBSD.
    const THIRTY_YEARS: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);

    Instant::now() + THIRTY_YEARS
}
167}