rex/queue.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
#![allow(dead_code)]
use std::{
collections::VecDeque,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll, Waker},
};
use futures::stream::Stream;
use parking_lot::Mutex;
// One parked stream receiver: the waker captured when its poll returned
// `Poll::Pending`, plus a flag shared with that `StreamReceiver` that records
// whether the waker has since been fired.
#[derive(Debug)]
struct ReceiverNotifier {
// Waker cloned from the task context of the pending poll.
handle: Waker,
// Set to `true` when this notifier is popped and woken; the receiver reads
// it on drop to decide whether to pass the wake-up on to the next receiver.
awake: Arc<AtomicBool>,
}
// Holds the inner value queues as well as the notification buffer that lets
// streams know when polling is ready.
//
// To avoid a "bowtie" effect when consuming objects inserted with
// `.push_front` and `.push_back`, front and back values are separated into
// their own queues so that values of either priority are popped in
// chronological order.
//
//   ┌ timestamp value (bigger is younger)
//   ^
//   │|         |
//   │|||     |||
//   -|||||||||||
//   │|||     |||
//   │|         |
//   └─────|─────>
//       queue position
struct RawDeque<T> {
// Items added through `push_front`; always drained before `back_values`.
front_values: VecDeque<T>,
// Items added through `push_back`; handed out only once `front_values` is empty.
back_values: VecDeque<T>,
// Notifiers registered by polls that found both queues empty, oldest first.
rx_notifiers: VecDeque<ReceiverNotifier>,
}
impl<T> RawDeque<T> {
const fn new() -> Self {
Self {
front_values: VecDeque::new(),
back_values: VecDeque::new(),
rx_notifiers: VecDeque::new(),
}
}
}
impl<T> RawDeque<T> {
    // Wakes the receiver that has been waiting the longest, if there is one.
    //
    // The notifier is removed from the queue, so each call wakes at most one
    // receiver, and that receiver has to poll again to be notified again.
    // Does nothing when no receiver is waiting.
    fn notify_rx(&mut self) {
        if let Some(n) = self.rx_notifiers.pop_front() {
            // Raise the flag *before* firing the waker (the same order the
            // `Drop` impl of `StreamReceiver` uses): code that runs as a
            // result of the wake-up must never find the flag still unset,
            // otherwise a receiver dropped right after being woken would not
            // hand the wake-up on to the next receiver.
            n.awake.store(true, Ordering::Relaxed);
            n.handle.wake();
        }
    }
}
/// A queue, similar in spirit to `std::collections::VecDeque`, whose items
/// can be consumed as an asynchronous stream.
///
/// Pushing (`push_front` / `push_back`) and `pop_front` are synchronous and
/// only take an internal lock. `stream()` returns a `Stream` that registers
/// the polling task's waker while the queue is empty; every push wakes the
/// receiver that has been waiting the longest.
pub struct StreamableDeque<T> {
// Both value queues and the list of waiting receivers, behind a single lock.
inner: Mutex<RawDeque<T>>,
}
impl<T> std::fmt::Debug for StreamableDeque<T> {
    /// Writes a fixed placeholder; the queued items are deliberately not
    /// printed, so `T` does not need to implement `Debug` and the lock is
    /// never taken while formatting.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("StreamableDeque { ... }")
    }
}
impl<T> Default for StreamableDeque<T> {
    /// Returns an empty queue with no waiting receivers.
    fn default() -> Self {
        let inner = Mutex::new(RawDeque::new());
        Self { inner }
    }
}
impl<T> StreamableDeque<T> {
    /// Creates an empty queue with no waiting receivers.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Queues `item` with front (high) priority and wakes the receiver that
    /// has been waiting the longest, if any.
    ///
    /// Front items are handed out before anything added with `push_back`, in
    /// the order in which they were pushed.
    pub fn push_front(&self, item: T) {
        let mut guard = self.inner.lock();
        guard.front_values.push_back(item);
        // One new item, so exactly one waiting receiver gets woken.
        guard.notify_rx();
    }

    /// Queues `item` with back (low) priority and wakes the receiver that has
    /// been waiting the longest, if any.
    ///
    /// Back items are handed out only once no front items are left, in the
    /// order in which they were pushed.
    pub fn push_back(&self, item: T) {
        let mut guard = self.inner.lock();
        guard.back_values.push_back(item);
        // One new item, so exactly one waiting receiver gets woken.
        guard.notify_rx();
    }

    /// Returns a stream that yields items in the same order as repeated
    /// calls to `pop_front()`, waiting whenever the queue is empty.
    ///
    /// Keeping the receiver a separate type leaves room for a
    /// `back_stream()` counterpart.
    pub const fn stream(&self) -> StreamReceiver<T> {
        StreamReceiver {
            awake: None,
            queue: self,
        }
    }

    /// Removes and returns the next item without waiting: the oldest front
    /// item if there is one, otherwise the oldest back item, otherwise `None`.
    pub fn pop_front(&self) -> Option<T> {
        let mut guard = self.inner.lock();
        if let Some(item) = guard.front_values.pop_front() {
            return Some(item);
        }
        guard.back_values.pop_front()
    }

    /// Removes and returns the item that `pop_front` would hand out last: the
    /// newest back item if there is one, otherwise the newest front item.
    #[cfg(test)]
    pub(crate) fn pop_back(&self) -> Option<T> {
        let mut guard = self.inner.lock();
        if let Some(item) = guard.back_values.pop_back() {
            return Some(item);
        }
        guard.front_values.pop_back()
    }
}
/// A stream of items removed from a `StreamableDeque`.
///
/// Each poll pops the oldest `push_front` item, falling back to the oldest
/// `push_back` item. When both queues are empty the poll registers the task's
/// waker with the queue and returns `Poll::Pending`; the stream never yields
/// `None`.
pub struct StreamReceiver<'a, T> {
// Queue this receiver pops from and registers its waker with.
queue: &'a StreamableDeque<T>,
// Flag shared with the notifier registered by the most recent pending poll;
// `None` before the first pending poll and after an item has been yielded.
awake: Option<Arc<AtomicBool>>,
}
impl<'a, T> Stream for StreamReceiver<'a, T> {
    type Item = T;

    /// Pops the next item (front values first, then back values).
    ///
    /// Returns `Poll::Ready(Some(item))` when an item is available. When both
    /// queues are empty, the task's waker is registered with the queue and
    /// `Poll::Pending` is returned. This stream never returns
    /// `Poll::Ready(None)`.
    ///
    /// A receiver keeps at most one notifier in the queue: polling again
    /// while still parked refreshes the stored waker in place (keeping the
    /// receiver's position in the wake-up order), and obtaining an item
    /// withdraws a notifier that is still queued. Without this, repeated or
    /// spurious polls would pile up stale wakers that later absorb wake-ups
    /// meant for other receivers.
    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut inner = self.queue.inner.lock();

        let value = inner
            .front_values
            .pop_front()
            .or_else(|| inner.back_values.pop_front());

        if let Some(v) = value {
            // We are no longer waiting: remove our notifier if it is still
            // queued so a later push does not spend its wake-up on us.
            if let Some(flag) = self.awake.take() {
                inner
                    .rx_notifiers
                    .retain(|n| !Arc::ptr_eq(&n.awake, &flag));
            }
            return Poll::Ready(Some(v));
        }

        let waker = ctx.waker();

        // Already parked from an earlier poll? Then just make sure the stored
        // waker is the one from the most recent poll.
        if let Some(flag) = &self.awake {
            if let Some(slot) = inner
                .rx_notifiers
                .iter_mut()
                .find(|n| Arc::ptr_eq(&n.awake, flag))
            {
                if !slot.handle.will_wake(waker) {
                    slot.handle = waker.clone();
                }
                return Poll::Pending;
            }
        }

        // Not parked (first pending poll, or the previous notifier was
        // already popped and woken): register a fresh notifier at the back.
        let awake = Arc::new(AtomicBool::new(false));
        inner.rx_notifiers.push_back(ReceiverNotifier {
            handle: waker.clone(),
            awake: Arc::clone(&awake),
        });
        self.awake = Some(awake);
        Poll::Pending
    }
}
impl<'a, T> Drop for StreamReceiver<'a, T> {
// if a stream gets dropped, notify next receiver in queue
fn drop(&mut self) {
let awake = self.awake.take().map(|w| w.load(Ordering::Relaxed));
if awake == Some(true) {
let mut queue_wakers = self.queue.inner.lock();
// StreamReceiver was woken by a None, notify another
if let Some(n) = queue_wakers.rx_notifiers.pop_front() {
n.awake.store(true, Ordering::Relaxed);
n.handle.wake();
}
}
}
}
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;

    use super::*;

    // Two producers feed the queue: one pushes 0..=10 to the back, the other
    // pushes -10..=-1 to the front. Whatever order the pushes happen in, the
    // stream must hand out all front values (in push order) before any back
    // value, so the 21 received items come out as -10..=10.
    #[tokio::test]
    async fn streamable_deque() {
        let queue = Arc::new(StreamableDeque::<i32>::new());

        let back_producer = Arc::clone(&queue);
        tokio::spawn(async move {
            (0..=10).for_each(|n| back_producer.push_back(n));
        });

        let front_producer = Arc::clone(&queue);
        tokio::spawn(async move {
            (-10..=-1).for_each(|n| front_producer.push_front(n));
        });

        // Stop after exactly 21 items; the stream itself never terminates.
        let received: Vec<i32> = queue.stream().take(21).collect().await;

        let expected: Vec<i32> = (-10..=10).collect();
        assert_eq!(expected, received);
    }
}