use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::time::delay_for;
use os_clock::{self, Clock, ThreadCPUClock};
use futures::executor::block_on;
async fn delay_until_clock<C: Clock>(clock: C, timeout_at: Duration) -> std::io::Result<()> {
loop {
let current = clock.get_time()?;
if current >= timeout_at {
return Ok(());
}
delay_for(timeout_at - current).await;
}
}
pub struct TimeoutListener(Box<dyn (FnOnce()) + Send>);
impl<L: (FnOnce()) + Send> From<L> for TimeoutListener
where
L: 'static,
{
fn from(listener: L) -> TimeoutListener {
TimeoutListener(Box::new(listener))
}
}
struct TimeoutRequest<C: Clock> {
cancel_rx: oneshot::Receiver<()>,
result_tx: oneshot::Sender<TimeoutResult>,
on_timeout: TimeoutListener,
clock: C,
timeout_at: Duration,
}
impl<C: Clock> TimeoutRequest<C> {
async fn run(self) {
let on_timeout = self.on_timeout;
let cancel_rx = self.cancel_rx;
let result = tokio::select! {
Ok(_) = cancel_rx => {
TimeoutResult::Cancelled
},
result = delay_until_clock(self.clock, self.timeout_at) => {
match result {
Ok(_) => {
on_timeout.0();
TimeoutResult::TimedOut
},
Err(err) => TimeoutResult::StatusError(err)
}
}
};
let _ = self.result_tx.send(result);
}
}
pub enum TimeoutResult {
TimedOut,
Cancelled,
StatusError(std::io::Error),
}
pub struct TimeoutHandle {
cancel_tx: oneshot::Sender<()>,
result_rx: oneshot::Receiver<TimeoutResult>,
}
impl TimeoutHandle {
pub fn cancel_ignored(self) {
let _ = self.cancel_tx.send(());
}
pub async fn cancel(self) -> TimeoutResult {
let _ = self.cancel_tx.send(());
self.result_rx.await.unwrap()
}
pub fn cancel_sync(self) -> TimeoutResult {
block_on(self.cancel())
}
}
pub struct Timer<C: Clock>
where
C: 'static,
{
tx: mpsc::UnboundedSender<TimeoutRequest<C>>,
}
impl<C: Clock> Timer<C> {
pub fn set_timeout<T: Into<TimeoutListener>>(
&self,
clock: C,
duration: Duration,
callback: T,
) -> TimeoutHandle {
let timeout_at = clock.get_time().unwrap() + duration;
let (cancel_tx, cancel_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
self.tx
.send(TimeoutRequest {
cancel_rx,
result_tx,
timeout_at,
on_timeout: callback.into(),
clock,
})
.map_err(|_| {})
.unwrap();
return TimeoutHandle {
cancel_tx,
result_rx,
};
}
pub fn spawn() -> Self {
let (timeout_req_tx, mut timeout_req_rx) = mpsc::unbounded_channel::<TimeoutRequest<C>>();
tokio::spawn(async move {
while let Some(timeout) = timeout_req_rx.recv().await {
tokio::spawn(timeout.run());
}
});
return Timer { tx: timeout_req_tx };
}
}
impl Timer<ThreadCPUClock> {
pub fn set_timeout_on_current_thread_cpu_usage<T: Into<TimeoutListener>>(
&self,
duration: Duration,
callback: T,
) -> TimeoutHandle {
let clock = os_clock::cpu_clock_for_current_thread().unwrap();
self.set_timeout(clock, duration, callback)
}
}