simple_delay_queue/
lib.rs

1//! A crate providing a `TimeQueue` that delays yielding inserted elements until a fixed timeout
2//! has elapsed. Each inserted element is stored together with its expiration time (current Tokio
3//! `Instant` plus the constant timeout). Since the timeout is constant, the elements naturally
4//! expire in FIFO order, and both push and pop operations are O(1).
5//!
6//! # Differences with `tokio::time::DelayQueue`
7//!
8//! The `TimeQueue` in this crate is designed to be simpler and faster than `tokio::time::DelayQueue`.
9//! While `DelayQueue` offers more features such as the ability to reset timeouts and remove elements
10//! before they expire, `TimeQueue` focuses on providing a minimalistic and efficient implementation
11//! for cases where these additional features are not needed.
12//!
13//! Key differences:
14//! - **Fixed Timeout**: `TimeQueue` uses a constant timeout for all elements, whereas `DelayQueue`
15//!   allows specifying different timeouts for each element.
16//! - **FIFO Order**: Elements in `TimeQueue` expire in the order they were inserted, ensuring FIFO
17//!   order. `DelayQueue` does not guarantee FIFO order if elements have different timeouts.
18//! - **Performance**: `TimeQueue` is optimized for performance with O(1) push and pop operations,
19//!   making it faster for use cases where the additional features of `DelayQueue` are not required.
20use futures_core::Stream;
21use std::{
22    collections::VecDeque,
23    pin::Pin,
24    task::{ready, Context, Poll},
25    time::Duration,
26};
27use tokio::time::{sleep_until, Instant, Sleep};
28
29/// A time queue that delays yielding inserted elements until a fixed timeout
30/// has elapsed. Each inserted element is stored together with its expiration
31/// time (current Tokio Instant plus the constant timeout). Since the timeout
32/// is constant, the elements naturally expire in FIFO order, and both push
33/// and pop operations are O(1).
34pub struct TimeQueue<T> {
35    /// Constant timeout duration for every element.
36    timeout: Duration,
37    /// FIFO queue storing elements paired with their expiration time.
38    queue: VecDeque<(Instant, T)>,
39    /// The currently active timer to wake up the task when the next element expires.
40    /// The sleep future is stored pinned manually.
41    timer: Pin<Box<Sleep>>,
42}
43
44impl<T> TimeQueue<T> {
45    /// Creates a new `TimeQueue` with the given timeout.
46    pub fn new(timeout: Duration) -> Self {
47        Self::with_capacity(timeout, 0)
48    }
49
50    /// Creates a new `TimeQueue` with the given timeout and reserves capacity for the underlying queue.
51    pub fn with_capacity(timeout: Duration, capacity: usize) -> Self {
52        Self {
53            timeout,
54            queue: VecDeque::with_capacity(capacity),
55            timer: Box::pin(sleep_until(Instant::now() + timeout)),
56        }
57    }
58
59    /// Inserts an element into the queue. The element will be yielded after
60    /// `timeout` has elapsed from the time of insertion.
61    pub fn push(&mut self, element: T) {
62        // Compute the expiration time based on Tokio's clock.
63        let expire_time = Instant::now() + self.timeout;
64        self.queue.push_back((expire_time, element));
65    }
66
67    #[inline(always)]
68    pub fn is_empty(&self) -> bool {
69        self.queue.is_empty()
70    }
71
72    #[inline(always)]
73    pub fn len(&self) -> usize {
74        self.queue.len()
75    }
76}
77
78impl<T> Stream for TimeQueue<T> {
79    type Item = T;
80
81    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82        // Early return if the queue is empty.
83        let Some(&(expiration, _)) = self.queue.front() else {
84            return Poll::Pending;
85        };
86
87        // If the expiration time has passed, return the element immediately.
88        if expiration <= Instant::now() {
89            return Poll::Ready(self.queue.pop_front().map(|(_, elem)| elem));
90        }
91
92        // Ensure a timer is scheduled for the front element.
93        if self.timer.deadline() < expiration {
94            self.timer.as_mut().reset(expiration);
95        }
96
97        // Poll the timer using the ready! macro.
98        let _ = ready!(self.timer.as_mut().poll(cx));
99
100        // Timer expired, so pop and return the element.
101        Poll::Ready(self.queue.pop_front().map(|(_, elem)| elem))
102    }
103}
104
105// It is safe to implement Unpin manually because our type does not use any self-referential patterns.
106impl<T> Unpin for TimeQueue<T> {}
107
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;
    use std::time::Duration;
    use tokio::time::advance;

    /// Timeout used by every test. The Tokio clock is paused, so no real time is spent waiting.
    const TIMEOUT: Duration = Duration::from_secs(600);

    /// Polls `queue` while `wait` of Tokio time passes and panics with `failure` if the queue
    /// yields an element in the meantime. The queue is always polled before the sleep.
    async fn expect_pending<T>(queue: &mut TimeQueue<T>, wait: Duration, failure: &str) {
        tokio::select! {
            biased;
            _ = queue.next() => panic!("{failure}"),
            _ = tokio::time::sleep(wait) => {},
        }
    }

    /// A queue that never received an element stays pending when polled.
    #[tokio::test(start_paused = true)]
    async fn test_empty_queue() {
        let mut queue: TimeQueue<u64> = TimeQueue::new(TIMEOUT);

        expect_pending(
            &mut queue,
            Duration::from_millis(50),
            "Queue should be empty and pending",
        )
        .await;
    }

    /// A freshly pushed element is withheld until its timeout has elapsed.
    #[tokio::test(start_paused = true)]
    async fn test_element_not_ready_immediately() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);

        expect_pending(
            &mut queue,
            Duration::from_millis(50),
            "Element should not be ready immediately",
        )
        .await;
    }

    /// Elements pushed at different times come out in FIFO order, each one only
    /// after its own timeout has elapsed.
    #[tokio::test(start_paused = true)]
    async fn test_time_queue_order_with_insertion_gap() {
        let half = TIMEOUT / 2;
        let mut queue = TimeQueue::new(TIMEOUT);

        // First element at t = 0, second one half a timeout later.
        queue.push(1);
        advance(half).await;
        queue.push(2);

        // After a full timeout only the first element has expired.
        advance(half).await;
        assert_eq!(queue.next().await, Some(1));
        expect_pending(
            &mut queue,
            Duration::from_millis(10),
            "Second element should not be ready yet",
        )
        .await;

        // Another half timeout later the second element has expired as well.
        advance(half).await;
        assert_eq!(queue.next().await, Some(2));
    }

    /// Polling repeatedly before the timeout keeps returning pending.
    #[tokio::test(start_paused = true)]
    async fn test_repeated_polling() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);

        // Several short polls, all well within the timeout.
        for _ in 0..5 {
            expect_pending(
                &mut queue,
                Duration::from_millis(10),
                "Element should not be ready yet",
            )
            .await;
        }

        // Once the timeout has passed the element is delivered.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));
    }

    /// An element pushed after an earlier one was already delivered gets a full timeout of its own.
    #[tokio::test(start_paused = true)]
    async fn test_insert_after_timeout() {
        let mut queue = TimeQueue::new(TIMEOUT);

        queue.push(100);
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));

        // The queue is empty again; the next element starts a fresh timeout.
        queue.push(200);
        expect_pending(
            &mut queue,
            Duration::from_millis(10),
            "Element should not be ready immediately",
        )
        .await;

        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(200));
    }

    /// Pushing between polls re-arms the timer correctly and keeps the FIFO order.
    #[tokio::test(start_paused = true)]
    async fn test_interleaved_inserts() {
        let mut queue = TimeQueue::new(TIMEOUT);

        // Two elements pushed at the same instant.
        queue.push(10);
        queue.push(20);

        // Both have expired after one timeout; take the first.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(10));

        // Push a third element while the second is still queued.
        queue.push(30);

        // The second element expired together with the first one.
        assert_eq!(queue.next().await, Some(20));

        // The third element must wait for its own timeout.
        expect_pending(
            &mut queue,
            Duration::from_millis(10),
            "Newly inserted element should not be ready immediately",
        )
        .await;
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(30));
    }

    /// Dropping a pending `next()` future loses no element (`poll_next` is cancellation safe).
    #[tokio::test(start_paused = true)]
    async fn test_poll_next_cancellation_safety() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);

        let waker = futures_util::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        {
            // Poll once by hand so the timer gets armed, then cancel by leaving the scope.
            let mut pending_next = Box::pin(queue.next());
            assert!(pending_next.as_mut().poll(&mut cx).is_pending());
        }

        // The element is still queued after the cancellation.
        assert_eq!(queue.len(), 1);

        // It is delivered as usual once its timeout has elapsed.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(42));
    }
}
265}