simple_delay_queue/
lib.rs

//! A crate providing a `TimeQueue` that delays yielding inserted elements until a fixed timeout
//! has elapsed. Each inserted element is stored together with its expiration time (the current
//! Tokio `Instant` plus the constant timeout). Because the timeout is constant, elements
//! naturally expire in FIFO order; pop is O(1) and push is amortized O(1).
//!
//! # Differences from `tokio::time::DelayQueue`
//!
//! The `TimeQueue` in this crate is designed to be simpler and faster than
//! `tokio::time::DelayQueue`. While `DelayQueue` offers more features, such as the ability to
//! reset timeouts and to remove elements before they expire, `TimeQueue` focuses on providing a
//! minimalistic and efficient implementation for cases where these additional features are not
//! needed.
//!
//! Key differences:
//! - **Fixed Timeout**: `TimeQueue` uses one constant timeout for all elements, whereas
//!   `DelayQueue` allows specifying a different timeout for each element.
//! - **FIFO Order**: Elements in `TimeQueue` expire in the order they were inserted, ensuring
//!   FIFO order. `DelayQueue` does not guarantee FIFO order if elements have different timeouts.
//! - **Performance**: `TimeQueue` only needs a `VecDeque` and a single timer, so push and pop
//!   are constant-time, making it faster for use cases where the additional features of
//!   `DelayQueue` are not required.
20use std::{
21    collections::VecDeque,
22    pin::Pin,
23    task::{Context, Poll},
24    time::Duration,
25};
26
27use futures_core::Stream;
28use tokio::time::{sleep_until, Instant, Sleep};
29
30/// A time queue that delays yielding inserted elements until a fixed timeout
31/// has elapsed. Each inserted element is stored together with its expiration
32/// time (current Tokio Instant plus the constant timeout). Since the timeout
33/// is constant, the elements naturally expire in FIFO order, and both push
34/// and pop operations are O(1).
35pub struct TimeQueue<T> {
36    /// Constant timeout duration for every element.
37    timeout: Duration,
38    /// FIFO queue storing elements paired with their expiration time.
39    queue: VecDeque<(Instant, T)>,
40    /// The currently active timer to wake up the task when the next element expires.
41    /// The sleep future is stored pinned manually.
42    timer: Option<Pin<Box<Sleep>>>,
43}
44
45impl<T> TimeQueue<T> {
46    /// Creates a new `TimeQueue` with the given timeout.
47    pub fn new(timeout: Duration) -> Self {
48        Self {
49            timeout,
50            queue: VecDeque::new(),
51            timer: None,
52        }
53    }
54
55    /// Creates a new `TimeQueue` with the given timeout and reserves capacity for the underlying queue.
56    pub fn with_capacity(timeout: Duration, capacity: usize) -> Self {
57        Self {
58            timeout,
59            queue: VecDeque::with_capacity(capacity),
60            timer: None,
61        }
62    }
63
64    /// Inserts an element into the queue. The element will be yielded after
65    /// `timeout` has elapsed from the time of insertion.
66    pub fn push(&mut self, element: T) {
67        // Compute the expiration time based on Tokio's clock.
68        let expire_time = Instant::now() + self.timeout;
69        self.queue.push_back((expire_time, element));
70
71        // If there is no timer active, set one.
72        if self.timer.is_none() {
73            self.set_timer();
74        }
75    }
76
77    #[inline(always)]
78    pub fn is_empty(&self) -> bool {
79        self.queue.is_empty()
80    }
81
82    #[inline(always)]
83    pub fn len(&self) -> usize {
84        self.queue.len()
85    }
86
87    /// Sets (or resets) the timer to fire at the expiration time of the element
88    /// at the front of the queue.
89    fn set_timer(&mut self) {
90        if let Some(&(instant, _)) = self.queue.front() {
91            // Create a new sleep future and pin it manually.
92            self.timer = Some(Box::pin(sleep_until(instant)));
93        }
94    }
95}
96
97impl<T> Stream for TimeQueue<T> {
98    type Item = T;
99
100    /// Polls the stream to yield the next element whose timeout has expired.
101    ///
102    /// If no element is ready, this method registers the current task to be
103    /// woken when the next element's timeout is reached.
104    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105        // If the queue is empty, clear any timer and return Pending.
106        if self.queue.is_empty() {
107            self.timer = None;
108            return Poll::Pending;
109        }
110
111        // Ensure there is an active timer.
112        if self.timer.is_none() {
113            self.set_timer();
114        }
115
116        // Now, we have a timer. It is stored as a Pin<Box<Sleep>>,
117        // so we can poll it using its pinned projection.
118        let timer = self.timer.as_mut().expect("timer should be set");
119        if timer.as_mut().poll(cx).is_pending() {
120            return Poll::Pending;
121        }
122
123        // Timer has fired; remove the expired element from the front.
124        let (_instant, element) = self.queue.pop_front().expect("queue was non-empty");
125
126        // Reset the timer for the next element if any remain.
127        if !self.queue.is_empty() {
128            self.set_timer();
129        } else {
130            self.timer = None;
131        }
132
133        Poll::Ready(Some(element))
134    }
135}
136
137// It is safe to implement Unpin manually because our type does not use any self-referential patterns.
138impl<T> Unpin for TimeQueue<T> {}
139
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;
    use std::time::Duration;
    use tokio::time::advance;

    /// Timeout used by every test queue.
    const TIMEOUT: Duration = Duration::from_secs(600);

    /// Races `queue.next()` against a sleep of `grace_ms` milliseconds and
    /// panics with `failure` if the queue yields first. The race is biased so
    /// the queue is always polled before the sleep.
    async fn expect_pending<T>(queue: &mut TimeQueue<T>, grace_ms: u64, failure: &str) {
        tokio::select! {
            biased;
            _ = queue.next() => panic!("{}", failure),
            _ = tokio::time::sleep(Duration::from_millis(grace_ms)) => {},
        }
    }

    /// A queue that never received an element must stay pending.
    #[tokio::test(start_paused = true)]
    async fn test_empty_queue() {
        let mut queue: TimeQueue<u64> = TimeQueue::new(TIMEOUT);
        // No insertions happened, so the stream has nothing to yield.
        expect_pending(&mut queue, 50, "Queue should be empty and pending").await;
    }

    /// A freshly pushed element must not come out before its timeout.
    #[tokio::test(start_paused = true)]
    async fn test_element_not_ready_immediately() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);
        // Polling right after the push must not return the element.
        expect_pending(&mut queue, 50, "Element should not be ready immediately").await;
    }

    /// Elements pushed at different times come out in FIFO order, each one
    /// only after its own timeout has elapsed.
    #[tokio::test(start_paused = true)]
    async fn test_time_queue_order_with_insertion_gap() {
        let half = TIMEOUT / 2;
        let mut queue = TimeQueue::new(TIMEOUT);

        // First element at t = 0, second one half a timeout later.
        queue.push(1);
        advance(half).await;
        queue.push(2);

        // After another half timeout only the first element has expired.
        advance(half).await;
        assert_eq!(queue.next().await, Some(1));
        expect_pending(&mut queue, 10, "Second element should not be ready yet").await;

        // One more half timeout brings the second element past its deadline.
        advance(half).await;
        assert_eq!(queue.next().await, Some(2));
    }

    /// Polling repeatedly before the deadline keeps returning pending.
    #[tokio::test(start_paused = true)]
    async fn test_repeated_polling() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);

        // Several polls, all well inside the timeout window.
        for _attempt in 0..5 {
            expect_pending(&mut queue, 10, "Element should not be ready yet").await;
        }

        // Once the timeout has passed the element is delivered.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));
    }

    /// Pushing into a queue that was drained by an earlier expiry starts a
    /// fresh timeout for the new element.
    #[tokio::test(start_paused = true)]
    async fn test_insert_after_timeout() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));

        // The queue is empty again; the next push must wait its full timeout.
        queue.push(200);
        expect_pending(&mut queue, 10, "Element should not be ready immediately").await;

        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(200));
    }

    /// Pushes interleaved with polls: the timer must be re-armed correctly
    /// and the elements must still come out in insertion order.
    #[tokio::test(start_paused = true)]
    async fn test_interleaved_inserts() {
        let mut queue = TimeQueue::new(TIMEOUT);

        // Two elements pushed at the same instant.
        queue.push(10);
        queue.push(20);

        // Let the timeout of both elements elapse and take the first one.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(10));

        // Push a third element while the second one is still queued.
        queue.push(30);

        // The second element expired together with the first.
        assert_eq!(queue.next().await, Some(20));

        // The third element has to wait for its own timeout.
        expect_pending(
            &mut queue,
            10,
            "Newly inserted element should not be ready immediately",
        )
        .await;
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(30));
    }

    /// Dropping a pending `next()` future must not lose the queued element.
    #[tokio::test(start_paused = true)]
    async fn test_poll_next_cancellation_safety() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);

        let noop = futures_util::task::noop_waker();
        let mut cx = Context::from_waker(&noop);

        {
            // Poll `next()` by hand exactly once; it must still be pending.
            let mut pending_next = Box::pin(queue.next());
            assert!(pending_next.as_mut().poll(&mut cx).is_pending());
            // Leaving this scope drops the future, simulating cancellation.
        }

        // The element must still be in the queue.
        assert_eq!(queue.len(), 1);

        // After the timeout the element is delivered as usual.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(42));
    }
}