futures_delay_queue/
lib.rs

1//! A queue of delayed elements backed by [`futures-timer`](https://crates.io/crates/futures-timer)
2//! that can be used with both:
3//!
4//! - [`async-std`](https://crates.io/crates/async-std) as default, and
5//! - [`tokio`](https://crates.io/crates/tokio) with feature `tokio` (and
6//!   *disabled* default features)
7//!
8//! An element is inserted into the [`DelayQueue`] and will be yielded once the
9//! specified deadline has been reached.
10//!
11//! The delayed items can be consumed through a channel returned at creation.
12//!
13//! ## Implementation
14//!
15//! The delays are spawned and a timeout races against a reset channel that can
16//! be triggered with the [`DelayHandle`]. If the timeout occurs before
17//! cancelation or a reset the item is yielded through the receiver channel.
18//!
19//! ## Usage
20//!
21//! A [`DelayQueue`] and a channel for receiving the expired items is created
22//! using the [`delay_queue`] function.
23//!
24//! Elements are inserted into [`DelayQueue`] using the
25//! [`insert()`](DelayQueue::insert) or [`insert_at()`](DelayQueue::insert_at)
26//! methods. A deadline is provided with the item and a [`DelayHandle`] is
27//! returned. The delay handle is used to remove the entry.
28//!
29//! The delays can be configured with the [`reset_at()`](DelayHandle::reset_at)
30//! or the [`reset()`](DelayHandle::reset) method or canceled by calling the
31//! [`cancel()`](DelayHandle::cancel) method. Dropping the handle will not
32//! cancel the delay.
33//!
34//! Modification of the delay fails if the delayed item expired in the meantime.
35//! In this case an [`ErrorAlreadyExpired`] will be returned. If modification
36//! succeeds the handle will be returned back to the caller.
37//!
38//! ## Example
39//!
40//! ```rust
41//! use futures_delay_queue::delay_queue;
42//! use std::time::Duration;
43//!
44//! #[async_std::main]
45//! async fn main() {
46//!     let (delay_queue, rx) = delay_queue::<i32>();
47//!
48//!     let delay_handle = delay_queue.insert(1, Duration::from_millis(20));
49//!     assert!(delay_handle.reset(Duration::from_millis(40)).await.is_ok());
50//!
51//!     let delay_handle = delay_queue.insert(2, Duration::from_millis(10));
52//!     assert!(delay_handle.cancel().await.is_ok());
53//!
54//!     let delay_handle = delay_queue.insert(3, Duration::from_millis(30));
55//!
56//!     assert_eq!(rx.receive().await, Some(3));
57//!     assert_eq!(rx.receive().await, Some(1));
58//!
59//!     drop(delay_queue);
60//!     assert_eq!(rx.receive().await, None);
61//! }
62//! ```
63
64#![cfg_attr(feature = "docs", feature(doc_cfg))]
65#![warn(missing_docs, missing_debug_implementations)]
66#![doc(test(attr(deny(warnings))))]
67#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
68
69#[cfg(feature = "async-std")]
70use async_std::task;
71use futures_intrusive::{
72    buffer::{FixedHeapBuf, GrowingHeapBuf, RingBuf},
73    channel::shared::{generic_channel, ChannelReceiveFuture, GenericReceiver},
74};
75use futures_timer::Delay;
76use pin_project_lite::pin_project;
77use std::{
78    error::Error,
79    fmt::{self, Display},
80    future::Future,
81    pin::Pin,
82    task::{Context, Poll},
83    time::{Duration, Instant},
84};
85#[cfg(feature = "tokio")]
86use tokio::task;
87
88pub use futures_intrusive::channel::shared::{GenericSender, Receiver};
89
90/// A queue for managing delayed items.
91#[derive(Debug, Clone)]
92pub struct DelayQueue<T: 'static, A: RingBuf<Item = T>> {
93    // Used to send the expired items.
94    expired: GenericSender<parking_lot::RawMutex, T, A>,
95}
96
97/// A handle to cancel the corresponding delay of an item in `DelayQueue`.
98#[derive(Debug)]
99pub struct DelayHandle {
100    // Used to change the delay.
101    reset: GenericSender<parking_lot::RawMutex, DelayReset, FixedHeapBuf<DelayReset>>,
102}
103
104enum DelayReset {
105    NewDuration(Duration),
106    Cancel,
107}
108
109/// The error type for delays that are modified after they have already expired
110/// in the `DelayQueue`.
111#[derive(Debug, Clone, Copy, PartialEq)]
112pub struct ErrorAlreadyExpired {}
113
114impl Error for ErrorAlreadyExpired {
115    fn description(&self) -> &str {
116        "delay already expired"
117    }
118}
119
120impl Display for ErrorAlreadyExpired {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        write!(f, "Delay already expired")
123    }
124}
125
126impl DelayHandle {
127    /// Resets the delay of the corresponding item to `when` and returns a new
128    /// `DelayHandle` on success.
129    pub async fn reset_at(self, when: Instant) -> Result<Self, ErrorAlreadyExpired> {
130        let now = Instant::now();
131        let dur = if when <= now {
132            Duration::from_nanos(0)
133        } else {
134            when - now
135        };
136        self.reset(dur).await
137    }
138
139    /// Resets the delay of the corresponding item to now + `dur` and returns a
140    /// new `DelayHandle`on success.
141    pub async fn reset(self, dur: Duration) -> Result<Self, ErrorAlreadyExpired> {
142        self.reset
143            .send(DelayReset::NewDuration(dur))
144            .await
145            .map_err(|_| ErrorAlreadyExpired {})?;
146        Ok(self)
147    }
148
149    /// Cancels the delay.
150    pub async fn cancel(self) -> Result<(), ErrorAlreadyExpired> {
151        self.reset
152            .send(DelayReset::Cancel)
153            .await
154            .map_err(|_| ErrorAlreadyExpired {})
155    }
156}
157
158/// Creates a dynamically growing delay queue and a multi consumer channel for
159/// receiving expired items.
160///
161/// # Example
162///
163/// ```
164/// # async_std::task::block_on(async {
165/// #
166/// use futures_delay_queue::delay_queue;
167/// use std::time::Duration;
168///
169/// let (delay_queue, expired_items) = delay_queue();
170/// delay_queue.insert(1, Duration::from_millis(10));
171///
172/// // approximately 10ms later
173/// assert_eq!(expired_items.receive().await, Some(1));
174/// #
175/// # })
176/// ```
177pub fn delay_queue<T: 'static + Send>() -> (
178    DelayQueue<T, GrowingHeapBuf<T>>,
179    GenericReceiver<parking_lot::RawMutex, T, GrowingHeapBuf<T>>,
180) {
181    let (tx, rx) = generic_channel(0);
182    (DelayQueue { expired: tx }, rx)
183}
184
185pin_project! {
186    struct DelayedItem<T> {
187        value: Option<T>,
188        delay: Delay,
189        reset_rx: GenericReceiver<parking_lot::RawMutex, DelayReset, FixedHeapBuf<DelayReset>>,
190        reset: ChannelReceiveFuture<parking_lot::RawMutex, DelayReset>,
191        handle_dropped: bool,
192    }
193}
194
195impl<T> Future for DelayedItem<T> {
196    type Output = Option<T>;
197
198    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199        if !self.handle_dropped {
200            // Make sure the reset future is polled at least once to be awaken on new
201            // messages.
202            // Check if we got a reset or got canceled
203            // `new_unchecked` is ok because the value is never used again after being
204            // dropped.
205            while let Poll::Ready(v) = unsafe { Pin::new_unchecked(&mut self.reset).poll(cx) } {
206                match v {
207                    Some(reset) => match reset {
208                        DelayReset::Cancel => return Poll::Ready(None),
209                        DelayReset::NewDuration(dur) => self.delay = Delay::new(dur),
210                    },
211                    // handle got dropped, from now on we wait until the item expires
212                    None => {
213                        self.handle_dropped = true;
214                        // Channel is closed.
215                        break;
216                    }
217                }
218                // Future is consumed, ask for a new one and poll it (loop).
219                self.reset = self.reset_rx.receive();
220            }
221        }
222
223        // expired?
224        match Pin::new(&mut self.delay).poll(cx) {
225            Poll::Ready(_) => Poll::Ready(self.value.take()),
226            Poll::Pending => Poll::Pending,
227        }
228    }
229}
230
231impl<T, A> DelayQueue<T, A>
232where
233    T: 'static + Send,
234    A: 'static + RingBuf<Item = T> + Send,
235{
236    /// Inserts an item into the delay queue that will be yielded after `dur`
237    /// has passed.
238    pub fn insert(&self, value: T, dur: Duration) -> DelayHandle {
239        self.new_handle_with_future(value, dur)
240    }
241
242    /// Inserts an item into the delay queue that will be yielded at `when`.
243    pub fn insert_at(&self, value: T, when: Instant) -> DelayHandle {
244        let now = Instant::now();
245        let dur = if now >= when {
246            Duration::from_nanos(0)
247        } else {
248            when - now
249        };
250        self.new_handle_with_future(value, dur)
251    }
252
253    fn new_handle_with_future(&self, value: T, dur: Duration) -> DelayHandle {
254        let (reset_tx, reset_rx) = generic_channel::<parking_lot::RawMutex, _, FixedHeapBuf<_>>(0);
255        let expired = self.expired.clone();
256        let reset = reset_rx.receive();
257        let delayed_item = DelayedItem {
258            value: Some(value),
259            delay: Delay::new(dur),
260            reset_rx,
261            reset,
262            handle_dropped: false,
263        };
264        task::spawn(async move {
265            if let Some(v) = delayed_item.await {
266                let _ = expired.send(v).await;
267            }
268        });
269        DelayHandle { reset: reset_tx }
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use async_std::future::timeout;
277
278    #[async_std::test]
279    async fn insert() {
280        let (delay_queue, rx) = delay_queue::<i32>();
281        delay_queue.insert(1, Duration::from_millis(10));
282        delay_queue.insert(2, Duration::from_millis(5));
283        assert_eq!(
284            timeout(Duration::from_millis(8), rx.receive()).await,
285            Ok(Some(2))
286        );
287        assert_eq!(
288            timeout(Duration::from_millis(7), rx.receive()).await,
289            Ok(Some(1))
290        );
291    }
292
293    #[async_std::test]
294    async fn reset() {
295        let (delay_queue, rx) = delay_queue::<i32>();
296        let delay_handle = delay_queue.insert(1, Duration::from_millis(100));
297        assert!(delay_handle.reset(Duration::from_millis(20)).await.is_ok());
298
299        assert_eq!(
300            timeout(Duration::from_millis(40), rx.receive()).await,
301            Ok(Some(1))
302        );
303
304        let delay_handle = delay_queue.insert(2, Duration::from_millis(100));
305        assert!(delay_handle
306            .reset_at(Instant::now() + Duration::from_millis(20))
307            .await
308            .is_ok());
309
310        assert_eq!(
311            timeout(Duration::from_millis(40), rx.receive()).await,
312            Ok(Some(2))
313        );
314    }
315
316    #[async_std::test]
317    async fn cancel() {
318        let (delay_queue, rx) = delay_queue::<i32>();
319        let delay_handle = delay_queue.insert(1, Duration::from_millis(200));
320        // Make sure the future is polled at least once
321        task::sleep(Duration::from_millis(50)).await;
322        let instant = Instant::now();
323        assert!(delay_handle.cancel().await.is_ok());
324        assert!(instant.elapsed() < Duration::from_millis(10));
325        assert!(timeout(Duration::from_millis(500), rx.receive())
326            .await
327            .is_err());
328    }
329}