#![allow(dead_code)]
use tokio::{
sync::mpsc::{
channel,
Receiver,
Sender,
},
task::JoinHandle,
time::{
Duration,
Instant,
},
};
/// Policy for a new deadline request that arrives while another deadline is
/// still armed.
///
/// The policy is only consulted when a deadline is already armed; when none
/// is armed, the requested deadline is always applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnConflict {
/// Keep the currently armed deadline and discard the new one.
Ignore,
/// Replace the currently armed deadline with the new one.
Overwrite,
/// Keep whichever of the armed and the new deadline is earlier.
Min,
/// Keep whichever of the armed and the new deadline is later.
Max,
}
/// Request sent from a `DeadlineClock` handle to its background task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ControlMessage {
/// Arm the timer for `deadline`; `on_conflict` decides what happens when a
/// deadline is already armed.
Set {
// Instant at which the timer should fire.
deadline: Instant,
// Resolution policy against an already armed deadline.
on_conflict: OnConflict,
},
/// Disarm the timer so that it does not fire until it is set again.
Clear,
}
/// Background task that drives a single, re-targetable timer.
///
/// The task reacts to two things:
///
/// * messages on `control_rx`, which arm, re-target or disarm the timer, and
/// * expiry of the armed timer, upon which the deadline that fired is sent on
///   `event_tx` and the timer is disarmed again.
///
/// The task ends when `control_rx` is closed or when sending on `event_tx`
/// fails because its receiver is gone.
///
/// Note that the expiry event is sent with an awaited `send` inside the
/// branch handler: while that send is waiting for capacity on `event_tx`, no
/// further control messages are read.
async fn deadline_clock_task(
    event_tx: Sender<Instant>,
    mut control_rx: Receiver<ControlMessage>,
) {
    // `armed` gates the timer branch of the `select!` below. The `Sleep` itself
    // always exists and is only re-targeted, so a disarmed timer is simply not
    // polled.
    let mut armed = false;
    let timer = tokio::time::sleep(Duration::from_secs(0));
    tokio::pin!(timer);

    loop {
        tokio::select! {
            message = control_rx.recv() => {
                let request = match message {
                    Some(request) => request,
                    // The control channel is closed: nobody can drive us anymore.
                    None => break,
                };
                match request {
                    ControlMessage::Clear => armed = false,
                    ControlMessage::Set { deadline, on_conflict } => {
                        // `None` means "leave the timer untouched".
                        let target = match (armed, on_conflict) {
                            (true, OnConflict::Ignore) => None,
                            (false, _) | (true, OnConflict::Overwrite) => Some(deadline),
                            (true, OnConflict::Min) => Some(timer.deadline().min(deadline)),
                            (true, OnConflict::Max) => Some(timer.deadline().max(deadline)),
                        };
                        if let Some(target) = target {
                            timer.as_mut().reset(target);
                        }
                        armed = true;
                    }
                }
            }
            _ = &mut timer, if armed => {
                // Disarm first so the elapsed timer is not polled again until
                // a new deadline is set.
                armed = false;
                let fired = timer.deadline();
                if event_tx.send(fired).await.is_err() {
                    break;
                }
            }
        }
    }
}
/// Handle to a single resettable deadline timer that runs in a background
/// task.
///
/// Requests to set or clear the deadline travel over `control`; each expiry
/// is reported back over `event`. The background task stops once the control
/// sender held here is dropped.
#[derive(Debug)]
pub struct DeadlineClock {
    /// Receives the deadline `Instant` every time the armed deadline expires.
    event: Receiver<Instant>,
    /// Sends `Set` / `Clear` requests to the background task.
    control: Sender<ControlMessage>,
    /// Join handle of the spawned background task. It is only stored, never
    /// awaited or aborted by this type.
    _handle: JoinHandle<()>,
}
impl DeadlineClock {
    /// Creates a clock with no deadline armed and spawns its background task.
    ///
    /// Both the event channel and the control channel are created with a
    /// capacity of one message.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when it is called outside of a Tokio runtime
    /// context.
    pub fn new() -> Self {
        let (event_tx, event_rx) = channel(1);
        let (control_tx, control_rx) = channel(1);
        let handle = tokio::spawn(deadline_clock_task(event_tx, control_rx));
        Self {
            event: event_rx,
            control: control_tx,
            _handle: handle,
        }
    }

    /// Waits for the next expiry event and returns the deadline that fired.
    ///
    /// The returned `Instant` is the deadline the timer was armed with, not
    /// the moment at which the event is observed.
    ///
    /// # Panics
    ///
    /// Panics if the event channel has been closed, which means the
    /// background task has terminated.
    pub async fn wait(&mut self) -> Instant {
        self.event
            .recv()
            .await
            .expect("Deadline clock task has panicked")
    }

    /// Requests that the timer fire at `deadline`.
    ///
    /// `on_conflict` is only consulted by the background task when a deadline
    /// is already armed; otherwise `deadline` is applied as is. The future
    /// completes once the request has been accepted by the control channel.
    ///
    /// # Panics
    ///
    /// Panics if the control channel has been closed, which means the
    /// background task has terminated.
    pub async fn set_deadline(&self, deadline: Instant, on_conflict: OnConflict) {
        self.control
            .send(ControlMessage::Set {
                deadline,
                on_conflict,
            })
            .await
            .expect("Deadline clock task has panicked");
    }

    /// Requests that the timer fire `after` from now.
    ///
    /// Shorthand for [`set_deadline`](Self::set_deadline) with
    /// `Instant::now() + after`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as
    /// [`set_deadline`](Self::set_deadline).
    pub async fn set_timeout(&self, after: Duration, on_conflict: OnConflict) {
        self.set_deadline(Instant::now() + after, on_conflict).await;
    }

    /// Requests that the armed deadline, if any, be disarmed.
    ///
    /// This only stops the timer from firing; an expiry event that has
    /// already been queued on the event channel is not retracted.
    ///
    /// # Panics
    ///
    /// Panics if the control channel has been closed, which means the
    /// background task has terminated.
    pub async fn clear(&self) {
        self.control
            .send(ControlMessage::Clear)
            .await
            .expect("Deadline clock task has panicked");
    }
}

impl Default for DeadlineClock {
    /// Equivalent to [`DeadlineClock::new`], including its panic behavior
    /// outside of a Tokio runtime context.
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::{
        sync::mpsc,
        time::{
            self,
            timeout,
            Duration,
        },
    };

    /// Exercises set / clear / overwrite against the real clock.
    #[tokio::test]
    async fn deadline_clock_realtime() {
        let short = Duration::from_millis(10);
        let window = Duration::from_millis(20);
        let long = Duration::from_millis(100);
        let mut clock = DeadlineClock::new();

        // An armed deadline fires within the observation window.
        clock.set_timeout(short, OnConflict::Overwrite).await;
        timeout(window, clock.wait()).await.expect("Timed out");

        // A cleared deadline does not fire.
        clock.set_timeout(short, OnConflict::Overwrite).await;
        clock.clear().await;
        timeout(window, clock.wait())
            .await
            .expect_err("Completed unexpectedly");

        // Overwriting with a later deadline postpones the event.
        clock.set_timeout(short, OnConflict::Overwrite).await;
        clock.set_timeout(long, OnConflict::Overwrite).await;
        timeout(window, clock.wait())
            .await
            .expect_err("Completed unexpectedly");
        timeout(long, clock.wait()).await.expect("Timed out");
    }

    /// With a paused clock, the event shows up only after the deadline passed.
    #[tokio::test(start_paused = true)]
    async fn deadline_clock_mocktime_expiration() {
        let mut clock = DeadlineClock::new();
        clock
            .set_timeout(Duration::from_millis(10), OnConflict::Overwrite)
            .await;

        // Nothing is queued right after arming.
        assert_eq!(
            clock.event.try_recv(),
            Err(mpsc::error::TryRecvError::Empty)
        );

        // Still nothing halfway to the deadline.
        time::sleep(Duration::from_millis(5)).await;
        assert_eq!(
            clock.event.try_recv(),
            Err(mpsc::error::TryRecvError::Empty)
        );

        // Past the deadline the event is available.
        time::sleep(Duration::from_millis(10)).await;
        assert!(clock.event.try_recv().is_ok());
    }

    /// A deadline that already lies in the past fires right away, every time.
    #[tokio::test(start_paused = true)]
    async fn deadline_clock_setting_deadline_to_past_triggers_it() {
        let mut clock = DeadlineClock::new();
        let in_past1 = Instant::now() - Duration::from_millis(200);
        let in_past2 = Instant::now() - Duration::from_millis(100);
        let tick = Duration::from_millis(1);

        clock.set_deadline(in_past1, OnConflict::Overwrite).await;
        time::sleep(tick).await;
        assert!(clock.event.try_recv().is_ok());

        clock.set_deadline(in_past2, OnConflict::Overwrite).await;
        time::sleep(tick).await;
        assert!(clock.event.try_recv().is_ok());
    }
}