simple_delay_queue/lib.rs
1//! A crate providing a `TimeQueue` that delays yielding inserted elements until a fixed timeout
2//! has elapsed. Each inserted element is stored together with its expiration time (current Tokio
3//! `Instant` plus the constant timeout). Since the timeout is constant, the elements naturally
4//! expire in FIFO order, and both push and pop operations are O(1).
5//!
//! # Differences from `tokio_util::time::DelayQueue`
//!
//! The `TimeQueue` in this crate is designed to be simpler and faster than `tokio_util::time::DelayQueue`.
9//! While `DelayQueue` offers more features such as the ability to reset timeouts and remove elements
10//! before they expire, `TimeQueue` focuses on providing a minimalistic and efficient implementation
11//! for cases where these additional features are not needed.
12//!
13//! Key differences:
14//! - **Fixed Timeout**: `TimeQueue` uses a constant timeout for all elements, whereas `DelayQueue`
15//! allows specifying different timeouts for each element.
16//! - **FIFO Order**: Elements in `TimeQueue` expire in the order they were inserted, ensuring FIFO
17//! order. `DelayQueue` does not guarantee FIFO order if elements have different timeouts.
18//! - **Performance**: `TimeQueue` is optimized for performance with O(1) push and pop operations,
19//! making it faster for use cases where the additional features of `DelayQueue` are not required.
20use futures_core::Stream;
21use std::{
22 collections::VecDeque,
23 pin::Pin,
24 task::{ready, Context, Poll},
25 time::Duration,
26};
27use tokio::time::{sleep_until, Instant, Sleep};
28
29/// A time queue that delays yielding inserted elements until a fixed timeout
30/// has elapsed. Each inserted element is stored together with its expiration
31/// time (current Tokio Instant plus the constant timeout). Since the timeout
32/// is constant, the elements naturally expire in FIFO order, and both push
33/// and pop operations are O(1).
34pub struct TimeQueue<T> {
35 /// Constant timeout duration for every element.
36 timeout: Duration,
37 /// FIFO queue storing elements paired with their expiration time.
38 queue: VecDeque<(Instant, T)>,
39 /// The currently active timer to wake up the task when the next element expires.
40 /// The sleep future is stored pinned manually.
41 timer: Pin<Box<Sleep>>,
42}
43
44impl<T> TimeQueue<T> {
45 /// Creates a new `TimeQueue` with the given timeout.
46 pub fn new(timeout: Duration) -> Self {
47 Self::with_capacity(timeout, 0)
48 }
49
50 /// Creates a new `TimeQueue` with the given timeout and reserves capacity for the underlying queue.
51 pub fn with_capacity(timeout: Duration, capacity: usize) -> Self {
52 Self {
53 timeout,
54 queue: VecDeque::with_capacity(capacity),
55 timer: Box::pin(sleep_until(Instant::now() + timeout)),
56 }
57 }
58
59 /// Inserts an element into the queue. The element will be yielded after
60 /// `timeout` has elapsed from the time of insertion.
61 pub fn push(&mut self, element: T) {
62 // Compute the expiration time based on Tokio's clock.
63 let expire_time = Instant::now() + self.timeout;
64 self.queue.push_back((expire_time, element));
65 }
66
67 #[inline(always)]
68 pub fn is_empty(&self) -> bool {
69 self.queue.is_empty()
70 }
71
72 #[inline(always)]
73 pub fn len(&self) -> usize {
74 self.queue.len()
75 }
76}
77
78impl<T> Stream for TimeQueue<T> {
79 type Item = T;
80
81 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 // Early return if the queue is empty.
83 let Some(&(expiration, _)) = self.queue.front() else {
84 return Poll::Pending;
85 };
86
87 // If the expiration time has passed, return the element immediately.
88 if expiration <= Instant::now() {
89 return Poll::Ready(self.queue.pop_front().map(|(_, elem)| elem));
90 }
91
92 // Ensure a timer is scheduled for the front element.
93 if self.timer.deadline() < expiration {
94 self.timer.as_mut().reset(expiration);
95 }
96
97 // Poll the timer using the ready! macro.
98 let _ = ready!(self.timer.as_mut().poll(cx));
99
100 // Timer expired, so pop and return the element.
101 Poll::Ready(self.queue.pop_front().map(|(_, elem)| elem))
102 }
103}
104
105// It is safe to implement Unpin manually because our type does not use any self-referential patterns.
106impl<T> Unpin for TimeQueue<T> {}
107
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;
    use std::time::Duration;
    use tokio::time::advance;

    /// Timeout used by every queue in these tests.
    const TIMEOUT: Duration = Duration::from_secs(600);

    /// Races `queue.next()` against a sleep of `wait` (the queue is checked
    /// first) and panics with `message` if the queue yields an element.
    async fn expect_pending<T>(queue: &mut TimeQueue<T>, wait: Duration, message: &str) {
        tokio::select! {
            biased;
            _ = queue.next() => panic!("{}", message),
            _ = tokio::time::sleep(wait) => {},
        }
    }

    /// An empty queue stays pending when polled.
    #[tokio::test(start_paused = true)]
    async fn test_empty_queue() {
        let mut queue: TimeQueue<u64> = TimeQueue::new(TIMEOUT);
        // Nothing was inserted, so nothing may ever be yielded.
        expect_pending(
            &mut queue,
            Duration::from_millis(50),
            "Queue should be empty and pending",
        )
        .await;
    }

    /// A freshly inserted element is not yielded before its timeout.
    #[tokio::test(start_paused = true)]
    async fn test_element_not_ready_immediately() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);
        // Polling right after the insertion must not produce the element.
        expect_pending(
            &mut queue,
            Duration::from_millis(50),
            "Element should not be ready immediately",
        )
        .await;
    }

    /// Elements inserted at different times come out in FIFO order, each only
    /// after its own timeout has elapsed.
    #[tokio::test(start_paused = true)]
    async fn test_time_queue_order_with_insertion_gap() {
        let half = TIMEOUT / 2;
        let mut queue = TimeQueue::new(TIMEOUT);

        // First element now, second one half a timeout later.
        queue.push(1);
        advance(half).await;
        queue.push(2);

        // After another half timeout only the first element has expired.
        advance(half).await;
        assert_eq!(queue.next().await, Some(1));
        expect_pending(
            &mut queue,
            Duration::from_millis(10),
            "Second element should not be ready yet",
        )
        .await;

        // One more half timeout and the second element is due as well.
        advance(half).await;
        assert_eq!(queue.next().await, Some(2));
    }

    /// Polling repeatedly before the timeout keeps returning pending.
    #[tokio::test(start_paused = true)]
    async fn test_repeated_polling() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);

        // Several polls, all well within the timeout period.
        for _ in 0..5 {
            expect_pending(
                &mut queue,
                Duration::from_millis(10),
                "Element should not be ready yet",
            )
            .await;
        }

        // Once the timeout has passed the element is delivered.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));
    }

    /// Inserting after an earlier element was delivered starts a fresh timeout.
    #[tokio::test(start_paused = true)]
    async fn test_insert_after_timeout() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));

        // A second element goes in only after the first one has expired ...
        queue.push(200);
        // ... and must not be available right away.
        expect_pending(
            &mut queue,
            Duration::from_millis(10),
            "Element should not be ready immediately",
        )
        .await;

        // It becomes available after its own timeout.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(200));
    }

    /// Interleaving inserts and polls re-arms the timer correctly and keeps
    /// the elements in order.
    #[tokio::test(start_paused = true)]
    async fn test_interleaved_inserts() {
        let mut queue = TimeQueue::new(TIMEOUT);

        // Two elements inserted back to back.
        queue.push(10);
        queue.push(20);

        // Let the timeout of the first element pass.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(10));

        // A third element is inserted straight away.
        queue.push(30);

        // Element 20 has expired too, so it is delivered next.
        assert_eq!(queue.next().await, Some(20));

        // Element 30 has to wait for its own timeout.
        expect_pending(
            &mut queue,
            Duration::from_millis(10),
            "Newly inserted element should not be ready immediately",
        )
        .await;
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(30));
    }

    /// Dropping a pending `next()` future must not lose the queued element.
    #[tokio::test(start_paused = true)]
    async fn test_poll_next_cancellation_safety() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);

        let waker = futures_util::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        {
            // Poll once by hand so the timer gets armed, then let the future go
            // out of scope to simulate cancellation.
            let mut pending = Box::pin(queue.next());
            assert!(pending.as_mut().poll(&mut cx).is_pending());
        }

        // The element must still be queued.
        assert_eq!(queue.len(), 1);

        // After the timeout it is delivered as usual.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(42));
    }
}