1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//! A simple timer, used to enqueue operations meant to be executed at
//! a given time or after a given delay.

use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::time::Duration;

use chrono::{NaiveDateTime, Utc};
use futures::FutureExt as _;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::time::Instant;

/// Result type used by this module's fallible operations; the error is always
/// [`TimerError`].
type Result<T> = core::result::Result<T, TimerError>;

/// Errors reported by the timer's scheduling API.
#[derive(Debug, Error)]
pub enum TimerError {
    /// The new deadline could not be sent over the timer's channel.
    #[error("operation failed because the timer was closed")]
    Closed,
}

/// A timer, used to schedule execution of callbacks in a given future.
///
/// If the callback does not provide the new time period, the next execution
/// will be scheduled for a distant future.
///
/// The timer does nothing until it is turned into a future (via
/// [`IntoFuture`]) and polled.
pub struct Timer {
    /// Invoked every time the timer elapses. Its optional return value
    /// requests the time of the next execution.
    callback: Pin<Box<dyn Fn() -> Option<NaiveDateTime> + Send>>,
    /// Deadline the timer loop starts sleeping on.
    deadline: Instant,
    /// Both halves of the channel carrying newly requested deadlines: the
    /// sender is cloned into [`Scheduler`] handles, the receiver is drained
    /// by the timer loop.
    watchdog: (mpsc::Sender<NaiveDateTime>, mpsc::Receiver<NaiveDateTime>),
    /// Optional future whose completion makes the timer loop exit.
    shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl Timer {
    /// Creates a Timer instance.
    pub fn new(
        callback: impl Fn() -> Option<NaiveDateTime> + Send + 'static,
    ) -> Self {
        let watchdog = mpsc::channel(1);

        Self {
            callback: Box::pin(callback),
            // Since it's the first call, it starts sleeping forever.
            deadline: far_future(),
            watchdog,
            shutdown: None,
        }
    }

    pub fn with_graceful_shutdown<O>(
        self,
        shutdown: impl Future<Output = O> + Send + 'static,
    ) -> Self {
        Self {
            shutdown: Some(Box::pin(shutdown.map(|_| ()))),
            ..self
        }
    }

    /// Schedule for execution after a delay.
    pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
        self.scheduler().schedule(deadline).await
    }

    /// Creates a handler to schedule new executions.
    pub fn scheduler(&self) -> Scheduler {
        Scheduler(self.watchdog.0.clone())
    }
}

impl IntoFuture for Timer {
    type Output = ();
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output>>>;

    fn into_future(self) -> Self::IntoFuture {
        async move {
            let Self {
                callback,
                deadline,
                watchdog: (_, watchdog),
                shutdown,
            } = self;

            let sleep = tokio::time::sleep_until(deadline);
            let mut shutdown = shutdown.unwrap_or_else(|| Box::pin(futures::future::pending()));

            futures::pin_mut!(sleep);
            futures::pin_mut!(watchdog);

            loop {
                let duration = sleep.deadline() - Instant::now();
                tracing::trace!("sleeping for {} secs", duration.as_secs());

                tokio::select! {
                    // Wait for a new deadline.
                    Some(new_deadline) = watchdog.recv() => {
                        let new_duration = new_deadline - Utc::now().naive_utc();
                        let Ok(new_duration) = new_duration.to_std() else {
                            tracing::trace!("unable to schedule a timer for a time in the past");
                            continue;
                        };

                        tracing::trace!("task will be executed {} secs from now", new_duration.as_secs());

                        // Change the sleep time for next iteration.
                        let deadline = Instant::now() + new_duration;
                        sleep.as_mut().reset(deadline);
                    },
                    // Wait for the next run.
                    () = &mut sleep => {
                        tracing::trace!("timer elapsed");
                        let deadline = if let Some(new_deadline) = (callback)() {
                            let duration = Utc::now().naive_utc() - new_deadline;
                            let Ok(duration) = duration.to_std() else {
                                continue;
                            };

                            Instant::now() + duration
                        } else {
                            far_future()
                        };

                        sleep.as_mut().reset(deadline);
                    },
                    _ = &mut shutdown => {
                        tracing::trace!("received shutdown signal");
                        break;
                    }
                }
            }
        }.boxed()
    }
}

/// A cloneable handle used to submit new deadlines to a [`Timer`].
///
/// It wraps the sending half of the timer's deadline channel and is obtained
/// through [`Timer::scheduler`].
#[derive(Clone)]
pub struct Scheduler(mpsc::Sender<NaiveDateTime>);

impl Scheduler {
    pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
        self.0
            .send(deadline)
            .await
            .map_err(|_| TimerError::Closed)?;

        tracing::trace!("scheduled a new execution for {}", deadline);

        Ok(())
    }
}

/// Returns an instant roughly 30 years from now, used as a stand-in for
/// "never".
pub(crate) fn far_future() -> Instant {
    // The API offers neither a maximum `Instant` nor a conversion from a
    // calendar date to an `Instant`, so a large offset from "now" is used
    // instead. The offset is kept at 30 years because 1000 years overflows on
    // macOS and 100 years overflows on FreeBSD.
    const SECS_PER_DAY: u64 = 86400;
    const DAYS_PER_YEAR: u64 = 365;
    const YEARS: u64 = 30;

    let horizon = Duration::from_secs(SECS_PER_DAY * DAYS_PER_YEAR * YEARS);
    Instant::now() + horizon
}