1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//! A simple timer, used to enqueue operations meant to be executed at
//! a given time or after a given delay.

mod shutdown;

use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;

use chrono::{NaiveDateTime, Utc};
pub use shutdown::Shutdown;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::time::Instant;

/// Shorthand for a `Result` whose error type is [`TimerError`].
type Result<T> = core::result::Result<T, TimerError>;

/// Errors reported by the timer's scheduling API.
#[derive(Debug, Error)]
pub enum TimerError {
    /// Returned by [`Scheduler::schedule`] when sending the deadline over the
    /// timer's channel fails.
    #[error("operation failed because the timer was closed")]
    Closed,
}

/// A timer, used to schedule execution of a callback at a given point in time.
///
/// The timer is itself a [`Future`]: when polled it waits for the current
/// deadline, for a new deadline sent through a [`Scheduler`], or for the
/// shutdown signal, whichever comes first.
pub struct Timer<F>
where
    F: FnMut() -> Option<NaiveDateTime>,
    F: Send + Sync,
{
    /// Invoked when the current deadline elapses. A `Some` return value is
    /// used as the next time to run; `None` parks the timer at `far_future()`.
    callback: F,
    /// Instant the timer sleeps until when polled; `new` initialises it to
    /// `far_future()`.
    deadline: Instant,
    /// `(sender, receiver)` pair of the channel carrying newly requested
    /// deadlines. The sender is cloned into [`Scheduler`] handles; the
    /// receiver is read while the timer is polled.
    watchdog: (mpsc::Sender<NaiveDateTime>, mpsc::Receiver<NaiveDateTime>),
    /// Shutdown handle; the timer stops once `shutdown.recv()` completes.
    shutdown: Shutdown,
}

impl<F> Timer<F>
where
    F: FnMut() -> Option<NaiveDateTime>,
    F: Send + Sync,
{
    /// Builds a timer around `callback`, stopping when `shutdown` fires.
    ///
    /// The new timer has no real deadline yet: it is parked at
    /// `far_future()` until a deadline is submitted through
    /// [`Timer::schedule`] or a [`Scheduler`] handle.
    pub fn new(callback: F, shutdown: Shutdown) -> Self {
        // Single-slot channel used to hand new deadlines to the timer.
        let (sender, receiver) = mpsc::channel(1);
        // Nothing is scheduled yet, so start out sleeping "forever".
        let deadline = far_future();

        Self {
            callback,
            deadline,
            watchdog: (sender, receiver),
            shutdown,
        }
    }

    /// Requests that the timer fire at `deadline`.
    ///
    /// `deadline` is an absolute point in time (compared against the current
    /// UTC time when the timer processes it), not a relative delay. This is a
    /// convenience wrapper around `self.scheduler().schedule(deadline)`.
    ///
    /// # Errors
    ///
    /// Returns [`TimerError::Closed`] if the deadline cannot be delivered to
    /// the timer's channel.
    pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
        let handle = self.scheduler();
        handle.schedule(deadline).await
    }

    /// Returns a cloneable handle that can submit new deadlines to this timer.
    pub fn scheduler(&self) -> Scheduler {
        let (sender, _) = &self.watchdog;
        Scheduler(sender.clone())
    }
}

impl<F> Future for Timer<F>
where
    F: FnMut() -> Option<NaiveDateTime>,
    F: Send + Sync + Unpin,
{
    type Output = ();

    /// Drives the timer.
    ///
    /// Each call waits on three events at once:
    ///
    /// * the current deadline elapsing, which invokes the callback and
    ///   re-arms the timer with the time the callback returns (or parks it at
    ///   `far_future()` when the callback returns `None`);
    /// * a new deadline arriving from a [`Scheduler`], which replaces the
    ///   current deadline unless it already lies in the past;
    /// * the shutdown signal, which completes the future with `()`.
    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        // NOTE(review): this async block, and the `sleep` it owns, is rebuilt
        // on every call to `poll` and dropped when `poll` returns. Only
        // `self.deadline` survives between polls, which is why every
        // reschedule below is written back to it. Whether a timer registration
        // made by a dropped `Sleep` can still wake this task is not visible
        // here; if it cannot, the `Sleep` should be stored in `Timer` instead
        // (TODO confirm).
        let poll_fn = async move {
            let Self {
                callback,
                deadline,
                watchdog: (_, watchdog),
                shutdown,
            } = self.as_mut().get_mut();

            let sleep = tokio::time::sleep_until(*deadline);
            tokio::pin!(sleep);

            // TODO(fixme): the sleep only wakes up when the time's up and
            // receives a new deadline which is earlier than the previous one.
            loop {
                tracing::debug!(
                    "sleeping for {} secs",
                    deadline.duration_since(Instant::now()).as_secs()
                );

                tokio::select! {
                    // Wait for the next run.
                    () = &mut sleep => {
                        *deadline = match callback() {
                            Some(next_run) => {
                                // Time remaining until the requested run. A
                                // time that is already overdue is clamped to a
                                // zero delay so the callback runs again as
                                // soon as possible instead of being skipped.
                                let delay = (next_run - Utc::now().naive_utc())
                                    .to_std()
                                    .unwrap_or_default();

                                Instant::now() + delay
                            }
                            // Nothing left to run: park the timer.
                            None => far_future(),
                        };

                        sleep.as_mut().reset(*deadline);
                    },
                    // Wait for a new deadline.
                    Some(new_deadline) = watchdog.recv() => {
                        let new_duration = new_deadline - Utc::now().naive_utc();

                        // `to_std` fails for negative durations, i.e. for
                        // deadlines in the past; those requests are ignored
                        // and the current deadline stays in effect.
                        let Ok(new_duration) = new_duration.to_std() else {
                            tracing::debug!("failed to set a timer for past");
                            continue;
                        };

                        tracing::debug!("time will be run {} secs from now", new_duration.as_secs());

                        // Remember the new deadline and re-arm the sleep for
                        // the next iteration.
                        *deadline = Instant::now() + new_duration;
                        sleep.as_mut().reset(*deadline);
                    },
                    // Stop if shutdown signal is received.
                    _ = shutdown.recv() => {
                        tracing::debug!("received a SIGINT");
                        break;
                    },
                }
            }
        };

        pin!(poll_fn).poll(cx)
    }
}

/// Cloneable handle for submitting new deadlines to a [`Timer`].
///
/// Wraps a clone of the sending half of the timer's deadline channel.
#[derive(Clone)]
pub struct Scheduler(mpsc::Sender<NaiveDateTime>);

impl Scheduler {
    pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
        self.0
            .send(deadline)
            .await
            .map_err(|_| TimerError::Closed)?;

        tracing::debug!("scheduled a new execution for {}", deadline);

        Ok(())
    }
}

/// Returns an [`Instant`] roughly 30 years from now, used as a stand-in for
/// "never".
///
/// The API offers neither a maximum `Instant` nor a conversion from a calendar
/// date, and larger offsets overflow on some platforms (1000 years on macOS,
/// 100 years on FreeBSD), hence the 30-year horizon.
pub(crate) fn far_future() -> Instant {
    const SECS_PER_DAY: u64 = 86400;
    const HORIZON_DAYS: u64 = 365 * 30;

    let horizon = Duration::from_secs(SECS_PER_DAY * HORIZON_DAYS);
    Instant::now() + horizon
}