#![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs, missing_debug_implementations)]
#![doc(test(attr(deny(warnings))))]
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
#[cfg(feature = "async-std")]
use async_std::task;
use futures_intrusive::{
buffer::{FixedHeapBuf, GrowingHeapBuf, RingBuf},
channel::shared::{generic_channel, ChannelReceiveFuture, GenericReceiver},
};
use futures_timer::Delay;
use pin_project_lite::pin_project;
use std::{
error::Error,
fmt::{self, Display},
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
#[cfg(feature = "tokio")]
use tokio::task;
pub use futures_intrusive::channel::shared::{GenericSender, Receiver};
/// A queue of items that are released only after a per-item delay.
///
/// Items are added with `insert` / `insert_at`. Every inserted item gets its
/// own spawned task; once the item's delay elapses the value is sent on the
/// `expired` channel, whose receiving half is handed out by `delay_queue()`.
///
/// The queue only wraps the sending half of that channel, so cloning it (via
/// the derived `Clone`) yields another producer for the same receiver.
#[derive(Debug, Clone)]
pub struct DelayQueue<T: 'static, A: RingBuf<Item = T>> {
// Sending half of the channel on which expired items are delivered.
expired: GenericSender<parking_lot::RawMutex, T, A>,
}
/// Handle to a single item that was inserted into a `DelayQueue`.
///
/// The handle can change the item's delay (`reset`, `reset_at`) or cancel the
/// item altogether (`cancel`). All of these consume the handle; the reset
/// methods hand it back on success.
#[derive(Debug)]
pub struct DelayHandle {
// Sending half of the per-item channel that carries `DelayReset` messages to
// the task waiting on the item's delay.
reset: GenericSender<parking_lot::RawMutex, DelayReset, FixedHeapBuf<DelayReset>>,
}
// Control message sent from a `DelayHandle` to the future that waits on the
// corresponding item's delay.
enum DelayReset {
// Replace the current delay with a fresh one of the given duration.
NewDuration(Duration),
// Abort the delay; the item is dropped and never delivered as expired.
Cancel,
}
/// Error returned by `DelayHandle` operations when the reset/cancel request
/// could not be delivered to the task that waits on the item's delay.
///
/// The type carries no data, so equality is total; it therefore derives `Eq`
/// alongside `PartialEq`, which lets it be used wherever full equivalence is
/// required (e.g. inside other `Eq` types).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ErrorAlreadyExpired {}
// Marks `ErrorAlreadyExpired` as a standard error type.
impl Error for ErrorAlreadyExpired {
// Short static description of the error.
// NOTE(review): `Error::description` is deprecated in std in favour of
// `Display`; this override is kept so existing callers of `description()`
// still receive this text. Note the capitalisation differs from the
// `Display` output ("Delay already expired").
fn description(&self) -> &str {
"delay already expired"
}
}
impl Display for ErrorAlreadyExpired {
    /// Writes the fixed, human-readable message for this error.
    ///
    /// The message is emitted verbatim; width/alignment flags of the
    /// surrounding format specification are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Delay already expired")
    }
}
impl DelayHandle {
    /// Re-arms the item's delay so that it expires at the instant `when`.
    ///
    /// An instant that is not in the future is treated as a zero-length
    /// delay. The remaining time is computed once, against `Instant::now()`,
    /// and then forwarded to [`DelayHandle::reset`].
    ///
    /// # Errors
    ///
    /// Returns `ErrorAlreadyExpired` if the request could not be delivered to
    /// the task waiting on the delay.
    pub async fn reset_at(self, when: Instant) -> Result<Self, ErrorAlreadyExpired> {
        // Saturates to zero when `when` is at or before the current instant.
        let remaining = when.saturating_duration_since(Instant::now());
        self.reset(remaining).await
    }

    /// Re-arms the item's delay to `dur`, counted from the moment the waiting
    /// task processes the request.
    ///
    /// On success the handle is returned so it can be used again.
    ///
    /// # Errors
    ///
    /// Returns `ErrorAlreadyExpired` if the request could not be delivered to
    /// the task waiting on the delay.
    pub async fn reset(self, dur: Duration) -> Result<Self, ErrorAlreadyExpired> {
        let delivery = self.reset.send(DelayReset::NewDuration(dur)).await;
        match delivery {
            Ok(()) => Ok(self),
            Err(_) => Err(ErrorAlreadyExpired {}),
        }
    }

    /// Cancels the delay; the item will not be delivered as expired.
    ///
    /// # Errors
    ///
    /// Returns `ErrorAlreadyExpired` if the request could not be delivered to
    /// the task waiting on the delay.
    pub async fn cancel(self) -> Result<(), ErrorAlreadyExpired> {
        let delivery = self.reset.send(DelayReset::Cancel).await;
        match delivery {
            Ok(()) => Ok(()),
            Err(_) => Err(ErrorAlreadyExpired {}),
        }
    }
}
/// Creates a new `DelayQueue` together with the receiver on which expired
/// items are delivered.
///
/// The underlying channel uses a `GrowingHeapBuf` created with an initial
/// capacity of `0`.
pub fn delay_queue<T: 'static + Send>() -> (
    DelayQueue<T, GrowingHeapBuf<T>>,
    GenericReceiver<parking_lot::RawMutex, T, GrowingHeapBuf<T>>,
) {
    let (expired, receiver) =
        generic_channel::<parking_lot::RawMutex, T, GrowingHeapBuf<T>>(0);
    let queue = DelayQueue { expired };
    (queue, receiver)
}
pin_project! {
    // Future that resolves to `Some(value)` once the item's delay elapses, or
    // to `None` when the owning `DelayHandle` cancels it first.
    struct DelayedItem<T> {
        // The item to hand out on expiry; taken exactly once when the delay fires.
        value: Option<T>,
        // Timer for the currently active delay.
        delay: Delay,
        // Receiving half of the handle's control channel; used to start a new
        // receive operation after each processed message.
        reset_rx: GenericReceiver<parking_lot::RawMutex, DelayReset, FixedHeapBuf<DelayReset>>,
        // In-flight receive operation on `reset_rx`. It is structurally pinned
        // so it can be polled in place without `unsafe`, and so that the
        // compiler (rather than a convention) guarantees it is not moved
        // between polls.
        #[pin]
        reset: ChannelReceiveFuture<parking_lot::RawMutex, DelayReset>,
        // Set once the control channel reported closure (handle gone); from
        // then on only the timer is polled.
        handle_dropped: bool,
    }
}

impl<T> Future for DelayedItem<T> {
    type Output = Option<T>;

    /// Drains all pending control messages, then polls the timer.
    ///
    /// Returns `Ready(None)` on cancellation, `Ready(value)` when the delay
    /// has elapsed, and `Pending` otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        if !*this.handle_dropped {
            // Process every control message that is already available. Each
            // `Ready` consumes the current receive future, so a new one is
            // installed before looping again.
            while let Poll::Ready(message) = this.reset.as_mut().poll(cx) {
                match message {
                    Some(DelayReset::Cancel) => return Poll::Ready(None),
                    Some(DelayReset::NewDuration(dur)) => *this.delay = Delay::new(dur),
                    None => {
                        // Channel closed: stop listening for control messages
                        // and let the current delay run to completion.
                        *this.handle_dropped = true;
                        break;
                    }
                }
                // `Pin::set` drops the finished future in place and stores the
                // replacement without ever moving a polled future.
                this.reset.set(this.reset_rx.receive());
            }
        }

        // Polling the (possibly replaced) timer here also registers the waker
        // for a freshly created `Delay`.
        match Pin::new(&mut *this.delay).poll(cx) {
            Poll::Ready(_) => Poll::Ready(this.value.take()),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl<T, A> DelayQueue<T, A>
where
    T: 'static + Send,
    A: 'static + RingBuf<Item = T> + Send,
{
    /// Inserts `value` so that it expires after `dur`.
    ///
    /// Returns a `DelayHandle` that can reset or cancel the delay.
    pub fn insert(&self, value: T, dur: Duration) -> DelayHandle {
        self.new_handle_with_future(value, dur)
    }

    /// Inserts `value` so that it expires at the instant `when`.
    ///
    /// An instant that is not in the future results in a zero-length delay.
    /// Returns a `DelayHandle` that can reset or cancel the delay.
    pub fn insert_at(&self, value: T, when: Instant) -> DelayHandle {
        // Saturates to zero when `when` is at or before the current instant.
        let remaining = when.saturating_duration_since(Instant::now());
        self.new_handle_with_future(value, remaining)
    }

    /// Spawns the task that waits for the delay and forwards the value.
    ///
    /// A dedicated control channel is created per item: its sender ends up in
    /// the returned `DelayHandle`, its receiver in the spawned `DelayedItem`.
    fn new_handle_with_future(&self, value: T, dur: Duration) -> DelayHandle {
        let (control_tx, control_rx) =
            generic_channel::<parking_lot::RawMutex, _, FixedHeapBuf<_>>(0);
        let expired_tx = self.expired.clone();
        // Start the first receive before the receiver is moved into the item.
        let first_reset = control_rx.receive();
        let pending = DelayedItem {
            value: Some(value),
            delay: Delay::new(dur),
            reset_rx: control_rx,
            reset: first_reset,
            handle_dropped: false,
        };

        task::spawn(async move {
            match pending.await {
                Some(item) => {
                    // A failed send is deliberately ignored: there is nothing
                    // left to do with the item in that case.
                    let _ = expired_tx.send(item).await;
                }
                // Cancelled: nothing to deliver.
                None => {}
            }
        });

        DelayHandle { reset: control_tx }
    }
}
// Timing-based tests; they run on the async-std test runtime and rely on
// `async_std::future::timeout` to bound how long each receive may take.
#[cfg(test)]
mod tests {
use super::*;
use async_std::future::timeout;
// Items must come out in order of expiry, not in order of insertion.
#[async_std::test]
async fn insert() {
let (delay_queue, rx) = delay_queue::<i32>();
// The returned handles are dropped immediately; the items must still expire.
delay_queue.insert(1, Duration::from_millis(10));
delay_queue.insert(2, Duration::from_millis(5));
// Item 2 has the shorter delay (5 ms) and must arrive first.
assert_eq!(
timeout(Duration::from_millis(8), rx.receive()).await,
Ok(Some(2))
);
// Item 1 (10 ms) must follow within the remaining time budget.
assert_eq!(
timeout(Duration::from_millis(7), rx.receive()).await,
Ok(Some(1))
);
}
// Resetting a long delay to a shorter one makes the item expire early, both
// via a relative duration (`reset`) and an absolute instant (`reset_at`).
#[async_std::test]
async fn reset() {
let (delay_queue, rx) = delay_queue::<i32>();
let delay_handle = delay_queue.insert(1, Duration::from_millis(100));
assert!(delay_handle.reset(Duration::from_millis(20)).await.is_ok());
// Without the reset the item would only expire after 100 ms.
assert_eq!(
timeout(Duration::from_millis(40), rx.receive()).await,
Ok(Some(1))
);
let delay_handle = delay_queue.insert(2, Duration::from_millis(100));
assert!(delay_handle
.reset_at(Instant::now() + Duration::from_millis(20))
.await
.is_ok());
assert_eq!(
timeout(Duration::from_millis(40), rx.receive()).await,
Ok(Some(2))
);
}
// A cancelled item is never delivered, and cancelling returns promptly.
#[async_std::test]
async fn cancel() {
let (delay_queue, rx) = delay_queue::<i32>();
let delay_handle = delay_queue.insert(1, Duration::from_millis(200));
task::sleep(Duration::from_millis(50)).await;
let instant = Instant::now();
assert!(delay_handle.cancel().await.is_ok());
// The cancel request itself must not wait for the remaining delay.
assert!(instant.elapsed() < Duration::from_millis(10));
// Well past the original 200 ms delay nothing may have been received.
assert!(timeout(Duration::from_millis(500), rx.receive())
.await
.is_err());
}
}