timed_queue/
lib.rs

1//! `timed-queue` provides `TimedQueue`, a set of objects and the minimum time at which they should be returned.
2//!
3//! # Example
4//! Imagine the "new messages" queue of an SMTP server implementation. Delivery should be attempted immediately for new messages.
5//! Messages for which delivery fails should be retried after 30 minutes.
6//!
7//!```
8//! fn server_loop<I: IntoIterator<Item = MailMessage>>(tq: TimedQueue<MailMessage>, messages: I) {
9//!     for m in messages {
10//!         tq.enqueue(m, None);
11//!     }
12//! }
13//!
14//! async fn delivery_loop(tq: TimedQueue<MailMessage>) {
15//!     loop {
16//!         let (msg, _) = tq.dequeue().await;
17//!         if try_deliver(msg).await.is_err() {
18//!             tq.enqueue(msg, Some(Instant::now() + Duration::from_secs(30 * 60)));
19//!         }
20//!     }
21//! }
22//!
23//! #[tokio::main]
24//! async fn main() {
25//!     let tq = TimedQueue::new();
26//!     let tq2 = tq.clone();
27//!     std::thread::spawn(move || server_loop(tq, get_message_stream()));
28//!     tokio::spawn(delivery_loop(tq2));
29//! }
30//! ```
31use std::cmp::Reverse;
32use std::collections::BinaryHeap;
33use std::sync::Arc;
34use std::sync::Mutex;
35use std::time::Duration;
36use std::time::Instant;
37
38use tokio::sync::Notify;
39use tokio::time::timeout;
40
41#[derive(Eq, PartialEq, Ord, PartialOrd)]
42struct Item<T>
43where
44    T: Ord,
45{
46    expiration: Reverse<Option<Instant>>,
47    inner: T,
48}
49
50struct SharedInner<T>
51where
52    T: Ord,
53{
54    storage: Mutex<BinaryHeap<Item<T>>>,
55    notify: Notify,
56}
57
58#[derive(Clone)]
59pub struct TimedQueue<T>
60where
61    T: Ord,
62{
63    inner: Arc<SharedInner<T>>,
64}
65
66impl<T> TimedQueue<T>
67where
68    T: Ord,
69{
70    pub fn new() -> Self {
71        Self {
72            inner: Arc::new(SharedInner {
73                storage: Mutex::new(BinaryHeap::new()),
74                notify: Notify::new(),
75            }),
76        }
77    }
78    pub fn enqueue(&self, t: T, expiration: Option<Instant>) {
79        self.inner.storage.lock().unwrap().push(Item {
80            expiration: Reverse(expiration),
81            inner: t,
82        });
83        self.inner.notify.notify_one();
84    }
85
86    fn peek_inner(&self) -> Result<(T, Option<Instant>), Option<Duration>> {
87        let now = Instant::now();
88        let mut lock = self.inner.storage.lock().unwrap();
89        let (ready, duration) = match lock.peek() {
90            Some(Item {
91                expiration: Reverse(Some(expiration)),
92                ..
93            }) => {
94                if *expiration < now {
95                    (true, None)
96                } else {
97                    (false, Some(*expiration - now))
98                }
99            }
100            Some(Item {
101                expiration: Reverse(None),
102                ..
103            }) => (true, None),
104            None => (false, None),
105        };
106        if ready {
107            let Item {
108                expiration: Reverse(expiration),
109                inner: item,
110            } = lock.pop().unwrap();
111            Ok((item, expiration))
112        } else {
113            Err(duration)
114        }
115    }
116
117    pub async fn dequeue(&self) -> (T, Option<Instant>) {
118        loop {
119            match self.peek_inner() {
120                Ok((item, duration)) => {
121                    break (item, duration);
122                }
123                Err(Some(duration)) => {
124                    let _ = timeout(duration, self.inner.notify.notified()).await;
125                }
126                Err(None) => {
127                    self.inner.notify.notified().await;
128                }
129            }
130        }
131    }
132}