use futures::{
future::FusedFuture,
stream::{FusedStream, Stream},
};
use std::{
iter::FusedIterator,
collections::{BinaryHeap, VecDeque},
future::Future,
pin::Pin,
sync::{Mutex, Arc, atomic::{AtomicBool, Ordering}},
task::{Context, Poll, Waker},
};
pub struct PriorityQueue<T> {
inner: Mutex<(BinaryHeap<T>, VecDeque<(Waker, Arc<AtomicBool>)>)>,
}
impl<T: Ord> Default for PriorityQueue<T> {
fn default() -> Self {
Self {
inner: Mutex::new((BinaryHeap::new(), VecDeque::new())),
}
}
}
impl<T: Ord> PriorityQueue<T> {
pub fn new() -> Self {
Self::default()
}
pub fn push(&self, item: T) {
let mut inner = self.inner.lock().unwrap();
inner.0.push(item);
if let Some((w, woken)) = inner.1.pop_front() {
woken.store(true, Ordering::Relaxed);
Waker::wake(w)
}
}
pub fn try_pop(&self) -> Option<T> {
self.inner.lock().unwrap().0.pop()
}
pub fn pop(&self) -> PopFut<T> {
PopFut {
queue: self,
terminated: false,
woken: None,
}
}
pub fn incoming(&self) -> IncomingStream<T> {
IncomingStream {
queue: self,
woken: None,
}
}
pub fn pending(&self) -> impl Iterator<Item = T> + '_ {
std::iter::from_fn(move || self.inner.lock().unwrap().0.pop())
}
pub fn drain(&self) -> impl ExactSizeIterator<Item = T> + FusedIterator {
std::mem::take(&mut self.inner.lock().unwrap().0).into_iter()
}
pub fn len(&self) -> usize {
self.inner.lock().unwrap().0.len()
}
pub fn is_empty(&self) -> bool {
self.inner.lock().unwrap().0.is_empty()
}
}
pub struct PopFut<'a, T> {
queue: &'a PriorityQueue<T>,
terminated: bool,
woken: Option<Arc<AtomicBool>>,
}
impl<'a, T: Ord> Future for PopFut<'a, T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.queue.inner.lock().unwrap();
match inner.0.pop() {
_ if self.terminated => Poll::Pending,
Some(entry) => {
self.terminated = true;
self.woken = None;
Poll::Ready(entry)
}
None => {
let woken = Arc::new(AtomicBool::new(false));
inner.1.push_back((cx.waker().clone(), woken.clone()));
self.woken = Some(woken);
Poll::Pending
}
}
}
}
impl<'a, T: Ord> FusedFuture for PopFut<'a, T> {
fn is_terminated(&self) -> bool {
self.terminated
}
}
impl<'a, T> Drop for PopFut<'a, T> {
fn drop(&mut self) {
if self.woken.take().map_or(false, |w| w.load(Ordering::Relaxed)) {
if let Some((w, woken)) = self.queue.inner.lock().unwrap().1.pop_front() {
woken.store(true, Ordering::Relaxed);
Waker::wake(w)
}
}
}
}
pub struct IncomingStream<'a, T> {
queue: &'a PriorityQueue<T>,
woken: Option<Arc<AtomicBool>>,
}
impl<'a, T: Ord> Stream for IncomingStream<'a, T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut inner = self.queue.inner.lock().unwrap();
match inner.0.pop() {
Some(entry) => {
self.woken = None;
Poll::Ready(Some(entry))
},
None => {
let woken = match self.woken.clone() {
Some(mut woken) => match Arc::get_mut(&mut woken) {
Some(w) => { *w.get_mut() = false; woken },
None => Arc::new(AtomicBool::new(false)),
},
None => Arc::new(AtomicBool::new(false)),
};
inner.1.push_back((cx.waker().clone(), woken.clone()));
self.woken = Some(woken);
Poll::Pending
}
}
}
}
impl<'a, T: Ord> FusedStream for IncomingStream<'a, T> {
fn is_terminated(&self) -> bool {
false
}
}
impl<'a, T> Drop for IncomingStream<'a, T> {
fn drop(&mut self) {
if self.woken.take().map_or(false, |w| w.load(Ordering::Relaxed)) {
if let Some((w, woken)) = self.queue.inner.lock().unwrap().1.pop_front() {
woken.store(true, Ordering::Relaxed);
Waker::wake(w)
}
}
}
}