1use std::cmp::Reverse;
32use std::collections::BinaryHeap;
33use std::sync::Arc;
34use std::sync::Mutex;
35use std::time::Duration;
36use std::time::Instant;
37
38use tokio::sync::Notify;
39use tokio::time::timeout;
40
/// Heap entry that pairs a payload with its optional expiration instant.
///
/// The derived ordering compares `expiration` first and `inner` second, so the
/// order of the fields below is significant. Because the expiration is wrapped
/// in [`Reverse`], an entry without an expiration compares greatest, followed
/// by entries with earlier instants; ties are broken by the payload's own
/// ordering.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct Item<T: Ord> {
    /// Instant associated with the entry, reversed so that a max-heap yields
    /// `None` first and then the earliest instant.
    expiration: Reverse<Option<Instant>>,
    /// The payload carried by this entry.
    inner: T,
}
49
50struct SharedInner<T>
51where
52 T: Ord,
53{
54 storage: Mutex<BinaryHeap<Item<T>>>,
55 notify: Notify,
56}
57
58#[derive(Clone)]
59pub struct TimedQueue<T>
60where
61 T: Ord,
62{
63 inner: Arc<SharedInner<T>>,
64}
65
66impl<T> TimedQueue<T>
67where
68 T: Ord,
69{
70 pub fn new() -> Self {
71 Self {
72 inner: Arc::new(SharedInner {
73 storage: Mutex::new(BinaryHeap::new()),
74 notify: Notify::new(),
75 }),
76 }
77 }
78 pub fn enqueue(&self, t: T, expiration: Option<Instant>) {
79 self.inner.storage.lock().unwrap().push(Item {
80 expiration: Reverse(expiration),
81 inner: t,
82 });
83 self.inner.notify.notify_one();
84 }
85
86 fn peek_inner(&self) -> Result<(T, Option<Instant>), Option<Duration>> {
87 let now = Instant::now();
88 let mut lock = self.inner.storage.lock().unwrap();
89 let (ready, duration) = match lock.peek() {
90 Some(Item {
91 expiration: Reverse(Some(expiration)),
92 ..
93 }) => {
94 if *expiration < now {
95 (true, None)
96 } else {
97 (false, Some(*expiration - now))
98 }
99 }
100 Some(Item {
101 expiration: Reverse(None),
102 ..
103 }) => (true, None),
104 None => (false, None),
105 };
106 if ready {
107 let Item {
108 expiration: Reverse(expiration),
109 inner: item,
110 } = lock.pop().unwrap();
111 Ok((item, expiration))
112 } else {
113 Err(duration)
114 }
115 }
116
117 pub async fn dequeue(&self) -> (T, Option<Instant>) {
118 loop {
119 match self.peek_inner() {
120 Ok((item, duration)) => {
121 break (item, duration);
122 }
123 Err(Some(duration)) => {
124 let _ = timeout(duration, self.inner.notify.notified()).await;
125 }
126 Err(None) => {
127 self.inner.notify.notified().await;
128 }
129 }
130 }
131 }
132}