async_priority_queue/
lib.rs

1// Copyright 2020 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! An async-aware priority queue.
5
6use futures::{
7    future::FusedFuture,
8    stream::{FusedStream, Stream},
9};
10
11use std::{
12    iter::FusedIterator,
13    collections::{BinaryHeap, VecDeque},
14    future::Future,
15    pin::Pin,
16    sync::{Mutex, Arc, atomic::{AtomicBool, Ordering}},
17    task::{Context, Poll, Waker},
18};
19
20/// An async-aware priority queue.
21pub struct PriorityQueue<T> {
22    inner: Mutex<(BinaryHeap<T>, VecDeque<(Waker, Arc<AtomicBool>)>)>,
23}
24
25impl<T: Ord> Default for PriorityQueue<T> {
26    fn default() -> Self {
27        Self {
28            inner: Mutex::new((BinaryHeap::new(), VecDeque::new())),
29        }
30    }
31}
32
33impl<T: Ord> PriorityQueue<T> {
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Pushes an item into the queue. It will be removed in an order consistent with the ordering
39    /// of itself relative to other items in the queue at the time of removal.
40    pub fn push(&self, item: T) {
41        let mut inner = self.inner.lock().unwrap();
42
43        inner.0.push(item);
44
45        if let Some((w, woken)) = inner.1.pop_front() {
46            woken.store(true, Ordering::Relaxed);
47            Waker::wake(w)
48        }
49    }
50
51    /// Attempts to remove the item with the highest priority from the queue, returning [`None`] if
52    /// there are no available items.
53    pub fn try_pop(&self) -> Option<T> {
54        self.inner.lock().unwrap().0.pop()
55    }
56
57    /// Removes the item with the highest priority from the queue, waiting for an item should there
58    /// not be one immediately available.
59    ///
60    /// Items are removed from the queue on a 'first come, first served' basis.
61    pub fn pop(&self) -> PopFut<T> {
62        PopFut {
63            queue: self,
64            terminated: false,
65            woken: None,
66        }
67    }
68
69    /// Returns a stream of highest-priority items from this queue.
70    ///
71    /// Items are removed from the queue on a 'first come, first served' basis.
72    pub fn incoming(&self) -> IncomingStream<T> {
73        IncomingStream {
74            queue: self,
75            woken: None,
76        }
77    }
78
79    /// Returns an iterator of pending items from the queue (i.e: those that have already been inserted). Items will
80    /// only be removed from the queue as the iterator is advanced.
81    pub fn pending(&self) -> impl Iterator<Item = T> + '_ {
82        std::iter::from_fn(move || self.inner.lock().unwrap().0.pop())
83    }
84
85    /// Returns an iterator of items currently occupying the queue, immediately draining the queue.
86    pub fn drain(&self) -> impl ExactSizeIterator<Item = T> + FusedIterator {
87        std::mem::take(&mut self.inner.lock().unwrap().0).into_iter()
88    }
89
90    /// Return the number of items currently occupying the priority queue.
91    ///
92    /// Because the queue is asynchronous, this information should be considered out of date immediately and, as such,
93    /// should only be used for the purpose of logging, heuristics, etc.
94    pub fn len(&self) -> usize {
95        self.inner.lock().unwrap().0.len()
96    }
97
98    /// Return `true` if the priority queue is currently empty.
99    ///
100    /// Because the queue is asynchronous, this information should be considered out of date immediately and, as such,
101    /// should only be used for the purpose of logging, heuristics, etc.
102    pub fn is_empty(&self) -> bool {
103        self.inner.lock().unwrap().0.is_empty()
104    }
105}
106
107/// A future representing an item to be removed from the priority queue.
108pub struct PopFut<'a, T> {
109    queue: &'a PriorityQueue<T>,
110    terminated: bool,
111    woken: Option<Arc<AtomicBool>>,
112}
113
114impl<'a, T: Ord> Future for PopFut<'a, T> {
115    type Output = T;
116
117    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
118        let mut inner = self.queue.inner.lock().unwrap();
119
120        match inner.0.pop() {
121            _ if self.terminated => Poll::Pending,
122            Some(entry) => {
123                self.terminated = true;
124                self.woken = None;
125                Poll::Ready(entry)
126            }
127            None => {
128                let woken = Arc::new(AtomicBool::new(false));
129                inner.1.push_back((cx.waker().clone(), woken.clone()));
130                self.woken = Some(woken);
131                Poll::Pending
132            }
133        }
134    }
135}
136
137impl<'a, T: Ord> FusedFuture for PopFut<'a, T> {
138    fn is_terminated(&self) -> bool {
139        self.terminated
140    }
141}
142
143impl<'a, T> Drop for PopFut<'a, T> {
144    fn drop(&mut self) {
145        // We were woken but didn't receive anything, wake up another
146        if self.woken.take().map_or(false, |w| w.load(Ordering::Relaxed)) {
147            if let Some((w, woken)) = self.queue.inner.lock().unwrap().1.pop_front() {
148                woken.store(true, Ordering::Relaxed);
149                Waker::wake(w)
150            }
151        }
152    }
153}
154
155/// A stream of items removed from the priority queue.
156pub struct IncomingStream<'a, T> {
157    queue: &'a PriorityQueue<T>,
158    woken: Option<Arc<AtomicBool>>,
159}
160
161impl<'a, T: Ord> Stream for IncomingStream<'a, T> {
162    type Item = T;
163
164    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
165        let mut inner = self.queue.inner.lock().unwrap();
166
167        match inner.0.pop() {
168            Some(entry) => {
169                self.woken = None;
170                Poll::Ready(Some(entry))
171            },
172            None => {
173                // Attempt to reuse the `Arc` to avoid an allocation, but don't do so at the risk of missing a wakup
174                let woken = match self.woken.clone() {
175                    Some(mut woken) => match Arc::get_mut(&mut woken) {
176                        Some(w) => { *w.get_mut() = false; woken },
177                        None => Arc::new(AtomicBool::new(false)),
178                    },
179                    None => Arc::new(AtomicBool::new(false)),
180                };
181                inner.1.push_back((cx.waker().clone(), woken.clone()));
182                self.woken = Some(woken);
183                Poll::Pending
184            }
185        }
186    }
187}
188
189impl<'a, T: Ord> FusedStream for IncomingStream<'a, T> {
190    fn is_terminated(&self) -> bool {
191        false
192    }
193}
194
195impl<'a, T> Drop for IncomingStream<'a, T> {
196    fn drop(&mut self) {
197        // We were woken but didn't receive anything, wake up another
198        if self.woken.take().map_or(false, |w| w.load(Ordering::Relaxed)) {
199            if let Some((w, woken)) = self.queue.inner.lock().unwrap().1.pop_front() {
200                woken.store(true, Ordering::Relaxed);
201                Waker::wake(w)
202            }
203        }
204    }
205}