async_priority_queue/
lib.rs1use futures::{
7 future::FusedFuture,
8 stream::{FusedStream, Stream},
9};
10
11use std::{
12 iter::FusedIterator,
13 collections::{BinaryHeap, VecDeque},
14 future::Future,
15 pin::Pin,
16 sync::{Mutex, Arc, atomic::{AtomicBool, Ordering}},
17 task::{Context, Poll, Waker},
18};
19
/// A priority queue whose consumers can wait asynchronously for items.
///
/// All state sits behind one [`Mutex`] as a pair:
///
/// * `.0` is the [`BinaryHeap`] of queued items (greatest item pops first);
/// * `.1` is the FIFO of parked consumers. Each entry pairs the consumer's
///   [`Waker`] with a shared flag that is set to `true` at the moment the entry
///   is taken off the FIFO to be woken.
pub struct PriorityQueue<T> {
    inner: Mutex<(BinaryHeap<T>, VecDeque<(Waker, Arc<AtomicBool>)>)>,
}
24
25impl<T: Ord> Default for PriorityQueue<T> {
26 fn default() -> Self {
27 Self {
28 inner: Mutex::new((BinaryHeap::new(), VecDeque::new())),
29 }
30 }
31}
32
33impl<T: Ord> PriorityQueue<T> {
34 pub fn new() -> Self {
35 Self::default()
36 }
37
38 pub fn push(&self, item: T) {
41 let mut inner = self.inner.lock().unwrap();
42
43 inner.0.push(item);
44
45 if let Some((w, woken)) = inner.1.pop_front() {
46 woken.store(true, Ordering::Relaxed);
47 Waker::wake(w)
48 }
49 }
50
51 pub fn try_pop(&self) -> Option<T> {
54 self.inner.lock().unwrap().0.pop()
55 }
56
57 pub fn pop(&self) -> PopFut<T> {
62 PopFut {
63 queue: self,
64 terminated: false,
65 woken: None,
66 }
67 }
68
69 pub fn incoming(&self) -> IncomingStream<T> {
73 IncomingStream {
74 queue: self,
75 woken: None,
76 }
77 }
78
79 pub fn pending(&self) -> impl Iterator<Item = T> + '_ {
82 std::iter::from_fn(move || self.inner.lock().unwrap().0.pop())
83 }
84
85 pub fn drain(&self) -> impl ExactSizeIterator<Item = T> + FusedIterator {
87 std::mem::take(&mut self.inner.lock().unwrap().0).into_iter()
88 }
89
90 pub fn len(&self) -> usize {
95 self.inner.lock().unwrap().0.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
103 self.inner.lock().unwrap().0.is_empty()
104 }
105}
106
107pub struct PopFut<'a, T> {
109 queue: &'a PriorityQueue<T>,
110 terminated: bool,
111 woken: Option<Arc<AtomicBool>>,
112}
113
114impl<'a, T: Ord> Future for PopFut<'a, T> {
115 type Output = T;
116
117 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
118 let mut inner = self.queue.inner.lock().unwrap();
119
120 match inner.0.pop() {
121 _ if self.terminated => Poll::Pending,
122 Some(entry) => {
123 self.terminated = true;
124 self.woken = None;
125 Poll::Ready(entry)
126 }
127 None => {
128 let woken = Arc::new(AtomicBool::new(false));
129 inner.1.push_back((cx.waker().clone(), woken.clone()));
130 self.woken = Some(woken);
131 Poll::Pending
132 }
133 }
134 }
135}
136
137impl<'a, T: Ord> FusedFuture for PopFut<'a, T> {
138 fn is_terminated(&self) -> bool {
139 self.terminated
140 }
141}
142
143impl<'a, T> Drop for PopFut<'a, T> {
144 fn drop(&mut self) {
145 if self.woken.take().map_or(false, |w| w.load(Ordering::Relaxed)) {
147 if let Some((w, woken)) = self.queue.inner.lock().unwrap().1.pop_front() {
148 woken.store(true, Ordering::Relaxed);
149 Waker::wake(w)
150 }
151 }
152 }
153}
154
155pub struct IncomingStream<'a, T> {
157 queue: &'a PriorityQueue<T>,
158 woken: Option<Arc<AtomicBool>>,
159}
160
161impl<'a, T: Ord> Stream for IncomingStream<'a, T> {
162 type Item = T;
163
164 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
165 let mut inner = self.queue.inner.lock().unwrap();
166
167 match inner.0.pop() {
168 Some(entry) => {
169 self.woken = None;
170 Poll::Ready(Some(entry))
171 },
172 None => {
173 let woken = match self.woken.clone() {
175 Some(mut woken) => match Arc::get_mut(&mut woken) {
176 Some(w) => { *w.get_mut() = false; woken },
177 None => Arc::new(AtomicBool::new(false)),
178 },
179 None => Arc::new(AtomicBool::new(false)),
180 };
181 inner.1.push_back((cx.waker().clone(), woken.clone()));
182 self.woken = Some(woken);
183 Poll::Pending
184 }
185 }
186 }
187}
188
189impl<'a, T: Ord> FusedStream for IncomingStream<'a, T> {
190 fn is_terminated(&self) -> bool {
191 false
192 }
193}
194
195impl<'a, T> Drop for IncomingStream<'a, T> {
196 fn drop(&mut self) {
197 if self.woken.take().map_or(false, |w| w.load(Ordering::Relaxed)) {
199 if let Some((w, woken)) = self.queue.inner.lock().unwrap().1.pop_front() {
200 woken.store(true, Ordering::Relaxed);
201 Waker::wake(w)
202 }
203 }
204 }
205}