simple_delay_queue/lib.rs
1//! A crate providing a `TimeQueue` that delays yielding inserted elements until a fixed timeout
2//! has elapsed. Each inserted element is stored together with its expiration time (current Tokio
3//! `Instant` plus the constant timeout). Since the timeout is constant, the elements naturally
4//! expire in FIFO order, and both push and pop operations are O(1).
5//!
//! # Differences from `tokio_util::time::DelayQueue`
//!
//! The `TimeQueue` in this crate is designed to be simpler and faster than
//! `tokio_util::time::DelayQueue`. While `DelayQueue` offers more features, such as the ability
//! to reset timeouts and remove elements before they expire, `TimeQueue` focuses on providing a
//! minimalistic and efficient implementation for cases where these additional features are not
//! needed.
12//!
13//! Key differences:
14//! - **Fixed Timeout**: `TimeQueue` uses a constant timeout for all elements, whereas `DelayQueue`
15//! allows specifying different timeouts for each element.
16//! - **FIFO Order**: Elements in `TimeQueue` expire in the order they were inserted, ensuring FIFO
17//! order. `DelayQueue` does not guarantee FIFO order if elements have different timeouts.
18//! - **Performance**: `TimeQueue` is optimized for performance with O(1) push and pop operations,
19//! making it faster for use cases where the additional features of `DelayQueue` are not required.
20use std::{
21 collections::VecDeque,
22 pin::Pin,
23 task::{Context, Poll},
24 time::Duration,
25};
26
27use futures_core::Stream;
28use tokio::time::{sleep_until, Instant, Sleep};
29
30/// A time queue that delays yielding inserted elements until a fixed timeout
31/// has elapsed. Each inserted element is stored together with its expiration
32/// time (current Tokio Instant plus the constant timeout). Since the timeout
33/// is constant, the elements naturally expire in FIFO order, and both push
34/// and pop operations are O(1).
35pub struct TimeQueue<T> {
36 /// Constant timeout duration for every element.
37 timeout: Duration,
38 /// FIFO queue storing elements paired with their expiration time.
39 queue: VecDeque<(Instant, T)>,
40 /// The currently active timer to wake up the task when the next element expires.
41 /// The sleep future is stored pinned manually.
42 timer: Option<Pin<Box<Sleep>>>,
43}
44
45impl<T> TimeQueue<T> {
46 /// Creates a new `TimeQueue` with the given timeout.
47 pub fn new(timeout: Duration) -> Self {
48 Self {
49 timeout,
50 queue: VecDeque::new(),
51 timer: None,
52 }
53 }
54
55 /// Creates a new `TimeQueue` with the given timeout and reserves capacity for the underlying queue.
56 pub fn with_capacity(timeout: Duration, capacity: usize) -> Self {
57 Self {
58 timeout,
59 queue: VecDeque::with_capacity(capacity),
60 timer: None,
61 }
62 }
63
64 /// Inserts an element into the queue. The element will be yielded after
65 /// `timeout` has elapsed from the time of insertion.
66 pub fn push(&mut self, element: T) {
67 // Compute the expiration time based on Tokio's clock.
68 let expire_time = Instant::now() + self.timeout;
69 self.queue.push_back((expire_time, element));
70
71 // If there is no timer active, set one.
72 if self.timer.is_none() {
73 self.set_timer();
74 }
75 }
76
77 #[inline(always)]
78 pub fn is_empty(&self) -> bool {
79 self.queue.is_empty()
80 }
81
82 #[inline(always)]
83 pub fn len(&self) -> usize {
84 self.queue.len()
85 }
86
87 /// Sets (or resets) the timer to fire at the expiration time of the element
88 /// at the front of the queue.
89 fn set_timer(&mut self) {
90 if let Some(&(instant, _)) = self.queue.front() {
91 // Create a new sleep future and pin it manually.
92 self.timer = Some(Box::pin(sleep_until(instant)));
93 }
94 }
95}
96
97impl<T> Stream for TimeQueue<T> {
98 type Item = T;
99
100 /// Polls the stream to yield the next element whose timeout has expired.
101 ///
102 /// If no element is ready, this method registers the current task to be
103 /// woken when the next element's timeout is reached.
104 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 // If the queue is empty, clear any timer and return Pending.
106 if self.queue.is_empty() {
107 self.timer = None;
108 return Poll::Pending;
109 }
110
111 // Ensure there is an active timer.
112 if self.timer.is_none() {
113 self.set_timer();
114 }
115
116 // Now, we have a timer. It is stored as a Pin<Box<Sleep>>,
117 // so we can poll it using its pinned projection.
118 let timer = self.timer.as_mut().expect("timer should be set");
119 if timer.as_mut().poll(cx).is_pending() {
120 return Poll::Pending;
121 }
122
123 // Timer has fired; remove the expired element from the front.
124 let (_instant, element) = self.queue.pop_front().expect("queue was non-empty");
125
126 // Reset the timer for the next element if any remain.
127 if !self.queue.is_empty() {
128 self.set_timer();
129 } else {
130 self.timer = None;
131 }
132
133 Poll::Ready(Some(element))
134 }
135}
136
137// It is safe to implement Unpin manually because our type does not use any self-referential patterns.
138impl<T> Unpin for TimeQueue<T> {}
139
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;
    use std::time::Duration;
    use tokio::time::advance;

    /// Timeout used by every test queue.
    const TIMEOUT: Duration = Duration::from_secs(600);

    /// Asserts that `queue` yields nothing for the next `grace` of (virtual) time.
    ///
    /// The queue is polled first (`biased`), so an element that is already due makes the
    /// helper panic with `message` instead of being masked by the sleep branch.
    async fn assert_pending<T>(queue: &mut TimeQueue<T>, grace: Duration, message: &str) {
        tokio::select! {
            biased;
            _ = queue.next() => panic!("{}", message),
            _ = tokio::time::sleep(grace) => {},
        }
    }

    /// An empty queue never yields an element; polling it stays pending.
    #[tokio::test(start_paused = true)]
    async fn test_empty_queue() {
        let mut queue = TimeQueue::<u64>::new(TIMEOUT);
        assert_pending(
            &mut queue,
            Duration::from_millis(50),
            "Queue should be empty and pending",
        )
        .await;
    }

    /// A freshly pushed element is not yielded before its timeout has elapsed.
    #[tokio::test(start_paused = true)]
    async fn test_element_not_ready_immediately() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);
        assert_pending(
            &mut queue,
            Duration::from_millis(50),
            "Element should not be ready immediately",
        )
        .await;
    }

    /// Elements pushed at different times come out in FIFO order, each one only after its
    /// own timeout has elapsed.
    #[tokio::test(start_paused = true)]
    async fn test_time_queue_order_with_insertion_gap() {
        let half = TIMEOUT / 2;
        let mut queue = TimeQueue::new(TIMEOUT);

        // First element at t = 0, second one half a timeout later.
        queue.push(1);
        advance(half).await;
        queue.push(2);

        // At t = TIMEOUT only the first element is due.
        advance(half).await;
        assert_eq!(queue.next().await, Some(1));
        assert_pending(
            &mut queue,
            Duration::from_millis(10),
            "Second element should not be ready yet",
        )
        .await;

        // Another half timeout later the second element is due as well.
        advance(half).await;
        assert_eq!(queue.next().await, Some(2));
    }

    /// Polling repeatedly before the timeout keeps returning pending and does not lose the
    /// element.
    #[tokio::test(start_paused = true)]
    async fn test_repeated_polling() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);

        for _ in 0..5 {
            assert_pending(
                &mut queue,
                Duration::from_millis(10),
                "Element should not be ready yet",
            )
            .await;
        }

        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));
    }

    /// An element pushed after the queue has been drained gets a full timeout of its own.
    #[tokio::test(start_paused = true)]
    async fn test_insert_after_timeout() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(100);
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(100));

        // The queue is empty again; the next element starts a new timer.
        queue.push(200);
        assert_pending(
            &mut queue,
            Duration::from_millis(10),
            "Element should not be ready immediately",
        )
        .await;

        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(200));
    }

    /// Pushing between polls re-arms the timer correctly and keeps FIFO order.
    #[tokio::test(start_paused = true)]
    async fn test_interleaved_inserts() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(10);
        queue.push(20);

        // Both elements were pushed at t = 0, so both are due once TIMEOUT has passed.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(10));

        // A push in between must not disturb the element that is already due.
        queue.push(30);
        assert_eq!(queue.next().await, Some(20));

        // The late element still has to wait for its own timeout.
        assert_pending(
            &mut queue,
            Duration::from_millis(10),
            "Newly inserted element should not be ready immediately",
        )
        .await;
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(30));
    }

    /// Dropping a pending `next()` future (cancellation) must not lose the queued element.
    #[tokio::test(start_paused = true)]
    async fn test_poll_next_cancellation_safety() {
        let mut queue = TimeQueue::new(TIMEOUT);
        queue.push(42);

        let waker = futures_util::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        {
            // Poll once so the timer gets registered, then let the future go out of scope
            // to simulate cancellation.
            let mut pending_next = Box::pin(queue.next());
            assert!(pending_next.as_mut().poll(&mut cx).is_pending());
        }

        // The element must still be queued ...
        assert_eq!(queue.len(), 1);

        // ... and must be delivered once its timeout has elapsed.
        advance(TIMEOUT).await;
        assert_eq!(queue.next().await, Some(42));
    }
}