rex/
queue.rs

1#![allow(dead_code)]
2
3use std::{
4    collections::VecDeque,
5    pin::Pin,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10    task::{Context, Poll, Waker},
11};
12
13use futures::stream::Stream;
14use parking_lot::Mutex;
15
16// Contains a waker for a given stream
17// as well as a boolean determining whether
18// stream has been woken
19#[derive(Debug)]
20struct ReceiverNotifier {
21    handle: Waker,
22    awake: Arc<AtomicBool>,
23}
24
25// holds inner VecDeque as well as the notification buffer
26// letting streams know when polling is ready
27// To avoid a "bowtie" effect when consuming
28// objects inserted with .push_front and .push_back
29// front and back values are separated into their own queue
30// so that values are popped in chronological order
31// irrespective of priority
32// ┌ timestamp value (bigger is younger)
33// ^
34// │|         |
35// │|||     |||
36// -|||||||||||
37// │|||     |||
38// │|         |
39// └─────|─────>
40//  queue position
41struct RawDeque<T> {
42    front_values: VecDeque<T>,
43    back_values: VecDeque<T>,
44    rx_notifiers: VecDeque<ReceiverNotifier>,
45}
46
47impl<T> RawDeque<T> {
48    const fn new() -> Self {
49        Self {
50            front_values: VecDeque::new(),
51            back_values: VecDeque::new(),
52            rx_notifiers: VecDeque::new(),
53        }
54    }
55}
56
57impl<T> RawDeque<T> {
58    // waker first receiver to poll for values
59    fn notify_rx(&mut self) {
60        if let Some(n) = self.rx_notifiers.pop_front() {
61            n.handle.wake();
62            n.awake.store(true, Ordering::Relaxed);
63        }
64    }
65}
66
67/// This type acts similarly to `std::collections::VecDeque` but
68/// modifying queue is async
69pub struct StreamableDeque<T> {
70    inner: Mutex<RawDeque<T>>,
71}
72
73impl<T> std::fmt::Debug for StreamableDeque<T> {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("StreamableDeque { ... }").finish()
76    }
77}
78
79impl<T> Default for StreamableDeque<T> {
80    fn default() -> Self {
81        Self {
82            inner: Mutex::new(RawDeque::new()),
83        }
84    }
85}
86
87impl<T> StreamableDeque<T> {
88    #[must_use]
89    pub fn new() -> Self {
90        Self::default()
91    }
92
93    /// Push an item into the queue and notify first receiver
94    pub fn push_front(&self, item: T) {
95        let mut inner = self.inner.lock();
96        inner.front_values.push_back(item);
97        // Notify first receiver in queue
98        inner.notify_rx();
99    }
100
101    /// Push an item into the back of the queue and notify first receiver
102    pub fn push_back(&self, item: T) {
103        let mut inner = self.inner.lock();
104        inner.back_values.push_back(item);
105        // Notify first receiver in queue
106        inner.notify_rx();
107    }
108
109    /// Returns a stream of items using `pop_front()`
110    /// This opens us up to handle a `back_stream()` as well
111    pub const fn stream(&self) -> StreamReceiver<T> {
112        StreamReceiver {
113            queue: self,
114            awake: None,
115        }
116    }
117
118    pub fn pop_front(&self) -> Option<T> {
119        let mut inner = self.inner.lock();
120        inner
121            .front_values
122            .pop_front()
123            .or_else(|| inner.back_values.pop_front())
124    }
125
126    #[cfg(test)]
127    pub(crate) fn pop_back(&self) -> Option<T> {
128        let mut inner = self.inner.lock();
129        inner
130            .back_values
131            .pop_back()
132            .or_else(|| inner.front_values.pop_back())
133    }
134}
135
136/// A stream of items removed from the priority queue.
137pub struct StreamReceiver<'a, T> {
138    queue: &'a StreamableDeque<T>,
139    awake: Option<Arc<AtomicBool>>,
140}
141
142impl<T> Stream for StreamReceiver<'_, T> {
143    type Item = T;
144
145    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
146        let mut inner = self.queue.inner.lock();
147
148        let value = inner
149            .front_values
150            .pop_front()
151            .or_else(|| inner.back_values.pop_front());
152
153        if let Some(v) = value {
154            self.awake = None;
155            Poll::Ready(Some(v))
156        } else {
157            // TODO avoid allocation of a new AtomicBool if possible
158            let awake = Arc::new(AtomicBool::new(false));
159            // push stream's waker onto buffer
160            inner.rx_notifiers.push_back(ReceiverNotifier {
161                handle: ctx.waker().clone(),
162                awake: awake.clone(),
163            });
164            self.awake = Some(awake);
165            drop(inner);
166            Poll::Pending
167        }
168    }
169}
170
171impl<T> Drop for StreamReceiver<'_, T> {
172    // if a stream gets dropped, notify next receiver in queue
173    fn drop(&mut self) {
174        let awake = self.awake.take().map(|w| w.load(Ordering::Relaxed));
175
176        if awake == Some(true) {
177            let mut queue_wakers = self.queue.inner.lock();
178            // StreamReceiver was woken by a None, notify another
179            if let Some(n) = queue_wakers.rx_notifiers.pop_front() {
180                n.awake.store(true, Ordering::Relaxed);
181                n.handle.wake();
182            }
183        }
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use futures::stream::StreamExt;
190
191    use super::*;
192
193    #[tokio::test]
194    async fn streamable_deque() {
195        let queue = Arc::new(StreamableDeque::<i32>::new());
196
197        let pos_queue = queue.clone();
198        tokio::spawn(async move {
199            for i in 0..=10 {
200                pos_queue.push_back(i);
201            }
202        });
203
204        let neg_queue = queue.clone();
205        tokio::spawn(async move {
206            for i in -10..=-1 {
207                neg_queue.push_front(i);
208            }
209        });
210
211        let mut rx_vec = vec![];
212
213        let mut stream = queue.stream().enumerate();
214        while let Some((i, v)) = stream.next().await {
215            rx_vec.push(v);
216            if i >= 20 {
217                break;
218            }
219        }
220
221        // we should guarantee that positive and negative numbers have been pushed out of order
222        // but push_front and push_back should guarantee that they are sorted
223        let expected_vec: Vec<i32> = (-10..=10).collect();
224        assert_eq!(expected_vec, rx_vec);
225    }
226}