1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
//! A simple timer, used to enqueue operations meant to be executed at
//! a given time or after a given delay.
mod shutdown;
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;
use chrono::{NaiveDateTime, Utc};
pub use shutdown::Shutdown;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::time::Instant;
type Result<T> = core::result::Result<T, TimerError>;
#[derive(Debug, Error)]
pub enum TimerError {
#[error("operation failed because the timer was closed")]
Closed,
}
/// A timer, used to schedule execution of callbacks in a given future.
pub struct Timer<F>
where
F: FnMut() -> Option<NaiveDateTime>,
F: Send + Sync,
{
callback: F,
deadline: Instant,
watchdog: (mpsc::Sender<NaiveDateTime>, mpsc::Receiver<NaiveDateTime>),
shutdown: Shutdown,
}
impl<F> Timer<F>
where
F: FnMut() -> Option<NaiveDateTime>,
F: Send + Sync,
{
/// Creates a Timer instance.
pub fn new(callback: F, shutdown: Shutdown) -> Self {
let watchdog = mpsc::channel(1);
Self {
callback,
// Since it's the first call, it starts sleeping forever.
deadline: far_future(),
watchdog,
shutdown,
}
}
/// Schedule for execution after a delay.
pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
self.scheduler().schedule(deadline).await
}
/// Creates a handler to schedule new executions.
pub fn scheduler(&self) -> Scheduler {
Scheduler(self.watchdog.0.clone())
}
}
impl<F> Future for Timer<F>
where
F: FnMut() -> Option<NaiveDateTime>,
F: Send + Sync + Unpin,
{
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let poll_fn = async move {
let Self {
callback,
deadline,
watchdog: (_, watchdog),
shutdown,
} = self.as_mut().get_mut();
let sleep = tokio::time::sleep_until(*deadline);
tokio::pin!(sleep);
// TODO(fixme): the sleep only wakes up when the time's up and receive
// a new deadline which is earlier than previous one.
loop {
tracing::debug!(
"sleeping for {} secs",
deadline.duration_since(Instant::now()).as_secs()
);
tokio::select! {
// Wait for the next run.
() = &mut sleep => {
let deadline = if let Some(new_deadline) = callback() {
let duration = Utc::now().naive_utc() - new_deadline;
let Ok(duration) = duration.to_std() else {
continue;
};
Instant::now() + duration
} else {
far_future()
};
sleep.as_mut().reset(deadline);
},
// Wait for a new deadline.
Some(new_deadline) = watchdog.recv() => {
let new_duration = new_deadline - Utc::now().naive_utc();
let Ok(new_duration) = new_duration.to_std() else {
tracing::debug!("failed to set a timer for past");
continue;
};
tracing::debug!("time will be run {} secs from now", new_duration.as_secs());
// Change the sleep time for next iteration.
let deadline = Instant::now() + new_duration;
sleep.as_mut().reset(deadline);
},
// Stop if shutdown signal is received.
_ = shutdown.recv() => {
tracing::debug!("received a SIGINT");
break;
},
}
}
};
pin!(poll_fn).poll(cx)
}
}
#[derive(Clone)]
pub struct Scheduler(mpsc::Sender<NaiveDateTime>);
impl Scheduler {
pub async fn schedule(&self, deadline: NaiveDateTime) -> Result<()> {
self.0
.send(deadline)
.await
.map_err(|_| TimerError::Closed)?;
tracing::debug!("scheduled a new execution for {}", deadline);
Ok(())
}
}
pub(crate) fn far_future() -> Instant {
// Roughly 30 years from now.
//
// API does not provide a way to obtain max `Instant` or convert specific
// date in the future to instant. 1000 years overflows on macOS, 100 years
// overflows on FreeBSD.
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}