use futures_core::Stream;
use std::{
collections::VecDeque,
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use tokio::time::{sleep_until, Instant, Sleep};
pub struct TimeQueue<T> {
timeout: Duration,
queue: VecDeque<(Instant, T)>,
timer: Pin<Box<Sleep>>,
}
impl<T> TimeQueue<T> {
pub fn new(timeout: Duration) -> Self {
Self::with_capacity(timeout, 0)
}
pub fn with_capacity(timeout: Duration, capacity: usize) -> Self {
Self {
timeout,
queue: VecDeque::with_capacity(capacity),
timer: Box::pin(sleep_until(Instant::now() + timeout)),
}
}
pub fn push(&mut self, element: T) {
let expire_time = Instant::now() + self.timeout;
self.queue.push_back((expire_time, element));
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
#[inline(always)]
pub fn len(&self) -> usize {
self.queue.len()
}
}
impl<T> Stream for TimeQueue<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Some(&(expiration, _)) = self.queue.front() else {
return Poll::Pending;
};
if expiration <= Instant::now() {
return Poll::Ready(self.queue.pop_front().map(|(_, elem)| elem));
}
if self.timer.deadline() < expiration {
self.timer.as_mut().reset(expiration);
}
let _ = ready!(self.timer.as_mut().poll(cx));
Poll::Ready(self.queue.pop_front().map(|(_, elem)| elem))
}
}
impl<T> Unpin for TimeQueue<T> {}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::StreamExt;
use std::time::Duration;
use tokio::time::advance;
#[tokio::test(start_paused = true)]
async fn test_empty_queue() {
let timeout = Duration::from_secs(600);
let mut queue: TimeQueue<u64> = TimeQueue::new(timeout);
tokio::select! {
biased;
_ = queue.next() => panic!("Queue should be empty and pending"),
_ = tokio::time::sleep(Duration::from_millis(50)) => {},
}
}
#[tokio::test(start_paused = true)]
async fn test_element_not_ready_immediately() {
let timeout = Duration::from_secs(600);
let mut queue = TimeQueue::new(timeout);
queue.push(42);
tokio::select! {
biased;
_ = queue.next() => panic!("Element should not be ready immediately"),
_ = tokio::time::sleep(Duration::from_millis(50)) => {},
}
}
#[tokio::test(start_paused = true)]
async fn test_time_queue_order_with_insertion_gap() {
let timeout = Duration::from_secs(600);
let mut queue = TimeQueue::new(timeout);
queue.push(1);
advance(timeout / 2).await;
queue.push(2);
advance(timeout / 2).await;
assert_eq!(queue.next().await, Some(1));
tokio::select! {
biased;
_ = queue.next() => panic!("Second element should not be ready yet"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
}
advance(timeout / 2).await;
assert_eq!(queue.next().await, Some(2));
}
#[tokio::test(start_paused = true)]
async fn test_repeated_polling() {
let timeout = Duration::from_secs(600);
let mut queue = TimeQueue::new(timeout);
queue.push(100);
for _ in 0..5 {
tokio::select! {
biased;
_ = queue.next() => panic!("Element should not be ready yet"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
}
}
advance(timeout).await;
assert_eq!(queue.next().await, Some(100));
}
#[tokio::test(start_paused = true)]
async fn test_insert_after_timeout() {
let timeout = Duration::from_secs(600);
let mut queue = TimeQueue::new(timeout);
queue.push(100);
advance(timeout).await;
assert_eq!(queue.next().await, Some(100));
queue.push(200);
tokio::select! {
biased;
_ = queue.next() => panic!("Element should not be ready immediately"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
}
advance(timeout).await;
assert_eq!(queue.next().await, Some(200));
}
#[tokio::test(start_paused = true)]
async fn test_interleaved_inserts() {
let timeout = Duration::from_secs(600);
let mut queue = TimeQueue::new(timeout);
queue.push(10);
queue.push(20);
advance(timeout).await;
assert_eq!(queue.next().await, Some(10));
queue.push(30);
assert_eq!(queue.next().await, Some(20));
tokio::select! {
biased;
_ = queue.next() => panic!("Newly inserted element should not be ready immediately"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
}
advance(timeout).await;
assert_eq!(queue.next().await, Some(30));
}
#[tokio::test(start_paused = true)]
async fn test_poll_next_cancellation_safety() {
let timeout = Duration::from_secs(600);
let mut queue = TimeQueue::new(timeout);
queue.push(42);
let mut poll_future = Box::pin(queue.next());
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(poll_future.as_mut().poll(&mut cx).is_pending());
drop(poll_future);
assert_eq!(queue.len(), 1);
advance(timeout).await;
assert_eq!(queue.next().await, Some(42));
}
}