use atomic_waker::AtomicWaker;
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
/// A [`ConcurrentQueue`] paired with one [`AtomicWaker`].
///
/// Items are stored in the queue; the waker is written by `register` and
/// triggered by `wake` (or by the `*_wake*` push/pop helpers). There is a
/// single waker field, shared by whoever pushes and whoever pops.
#[derive(Debug)]
pub struct WakerQueue<T> {
// Storage for the queued items.
queue: ConcurrentQueue<T>,
// Waker slot written by `register` and triggered by `wake`.
waker: AtomicWaker,
}
impl<T: 'static + Send> WakerQueue<T> {
    /// Builds a queue whose storage is `ConcurrentQueue::bounded(size)`,
    /// together with a new `AtomicWaker`.
    pub fn bounded(size: usize) -> WakerQueue<T> {
        let queue = ConcurrentQueue::bounded(size);
        let waker = AtomicWaker::new();
        WakerQueue { queue, waker }
    }

    /// Builds a queue whose storage is `ConcurrentQueue::unbounded()`,
    /// together with a new `AtomicWaker`.
    pub fn unbounded() -> WakerQueue<T> {
        let queue = ConcurrentQueue::unbounded();
        let waker = AtomicWaker::new();
        WakerQueue { queue, waker }
    }

    /// Reports whether the underlying queue is currently empty.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Reports whether the underlying queue is currently full.
    pub fn is_full(&self) -> bool {
        self.queue.is_full()
    }

    /// Number of items the underlying queue currently reports.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Capacity reported by the underlying queue (`None` or `Some(n)`).
    pub fn capacity(&self) -> Option<usize> {
        self.queue.capacity()
    }

    /// Closes the underlying queue.
    ///
    /// The value returned by the underlying `close` call is discarded, and
    /// the waker is *not* woken here; call [`wake`](Self::wake) afterwards if
    /// a registered task must notice the closure.
    pub fn close(&self) {
        self.queue.close();
    }

    /// Reports whether the underlying queue has been closed.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// Attempts to push `value` without touching the waker.
    ///
    /// # Errors
    ///
    /// Returns whatever `PushError` the underlying queue produces; the
    /// rejected value is carried inside the error.
    pub fn try_push(&self, value: T) -> Result<(), PushError<T>> {
        self.queue.push(value)
    }

    /// Attempts to push `value`, then wakes the waker when the push succeeded
    /// and `wake` is `true`.
    ///
    /// # Errors
    ///
    /// Same as [`try_push`](Self::try_push); a failed push never wakes.
    pub fn try_push_wake(&self, value: T, wake: bool) -> Result<(), PushError<T>> {
        let outcome = self.queue.push(value);
        if wake && outcome.is_ok() {
            self.waker.wake();
        }
        outcome
    }

    /// Attempts to push `value`, waking on success if the queue reported
    /// empty *before* the push was attempted.
    pub fn try_push_wake_empty(&self, value: T) -> Result<(), PushError<T>> {
        // Sample the state first: the decision is based on the pre-push queue.
        let was_empty = self.queue.is_empty();
        self.try_push_wake(value, was_empty)
    }

    /// Attempts to push `value`, waking on success if the queue reported
    /// full *before* the push was attempted.
    ///
    /// NOTE(review): a queue that is already full would presumably reject the
    /// push, so this likely only wakes when a slot is freed concurrently
    /// between the check and the push. Confirm this is the intended contract.
    pub fn try_push_wake_full(&self, value: T) -> Result<(), PushError<T>> {
        let was_full = self.queue.is_full();
        self.try_push_wake(value, was_full)
    }

    /// Attempts to pop an item without touching the waker.
    ///
    /// # Errors
    ///
    /// Returns whatever `PopError` the underlying queue produces.
    pub fn try_pop(&self) -> Result<T, PopError> {
        self.queue.pop()
    }

    /// Attempts to pop an item, then wakes the waker when the pop succeeded
    /// and `wake` is `true`.
    ///
    /// # Errors
    ///
    /// Same as [`try_pop`](Self::try_pop); a failed pop never wakes.
    pub fn try_pop_wake(&self, wake: bool) -> Result<T, PopError> {
        let outcome = self.queue.pop();
        if wake && outcome.is_ok() {
            self.waker.wake();
        }
        outcome
    }

    /// Attempts to pop an item, waking on success if the queue reported
    /// empty *before* the pop was attempted.
    ///
    /// NOTE(review): a queue that is already empty would presumably yield no
    /// item, so this likely only wakes when an item is pushed concurrently
    /// between the check and the pop. Confirm this is the intended contract.
    pub fn try_pop_wake_empty(&self) -> Result<T, PopError> {
        let was_empty = self.queue.is_empty();
        self.try_pop_wake(was_empty)
    }

    /// Attempts to pop an item, waking on success if the queue reported
    /// full *before* the pop was attempted.
    pub fn try_pop_wake_full(&self) -> Result<T, PopError> {
        // Sample the state first: the decision is based on the pre-pop queue.
        let was_full = self.queue.is_full();
        self.try_pop_wake(was_full)
    }

    /// Returns a [`Push`] future that pushes `value` into this queue.
    ///
    /// `wake_if` receives the queue on every poll, before the push attempt;
    /// when it returned `true` and the push succeeds, the waker is woken.
    pub fn push<'a, F>(&'a self, value: T, wake_if: F) -> Push<'a, T, F>
    where
        F: Fn(&'a WakerQueue<T>) -> bool,
    {
        Push::new(self, value, wake_if)
    }

    /// Returns a [`Pop`] future that pops an item from this queue.
    ///
    /// `wake_if` receives the queue on every poll, before the pop attempt;
    /// when it returned `true` and the pop succeeds, the waker is woken.
    pub fn pop<'a, F>(&'a self, wake_if: F) -> Pop<'a, T, F>
    where
        F: Fn(&'a WakerQueue<T>) -> bool,
    {
        Pop::new(self, wake_if)
    }

    /// Registers `waker` with the internal `AtomicWaker`.
    pub fn register(&self, waker: &Waker) {
        self.waker.register(waker);
    }

    /// Triggers the internal `AtomicWaker`.
    pub fn wake(&self) {
        self.waker.wake();
    }

    /// Triggers the internal `AtomicWaker` only when `wake` is `true`.
    pub fn wake_if(&self, wake: bool) {
        if !wake {
            return;
        }
        self.waker.wake();
    }

    /// Tries to pop; if the queue reports `Empty`, registers the waker from
    /// `ctx` and tries exactly once more.
    ///
    /// Any other first result (an item, or `Closed`) is returned unchanged and
    /// nothing is registered. The retry is presumably there so that an item
    /// pushed between the first attempt and the registration is not missed.
    ///
    /// # Errors
    ///
    /// May still return `PopError::Empty` after the retry, in which case the
    /// waker from `ctx` has been registered.
    pub fn poll_pop(&self, ctx: &Context) -> Result<T, PopError> {
        match self.queue.pop() {
            Err(PopError::Empty) => {}
            settled => return settled,
        }
        self.waker.register(ctx.waker());
        self.queue.pop()
    }

    /// Tries to push `value`; if the queue reports `Full`, registers the waker
    /// from `ctx` and tries exactly once more with the returned value.
    ///
    /// Any other first result (success, or `Closed`) is returned unchanged and
    /// nothing is registered. The retry is presumably there so that a slot
    /// freed between the first attempt and the registration is not missed.
    ///
    /// # Errors
    ///
    /// May still return `PushError::Full` after the retry, in which case the
    /// waker from `ctx` has been registered; the value is inside the error.
    pub fn poll_push(&self, value: T, ctx: &Context) -> Result<(), PushError<T>> {
        let rejected = match self.queue.push(value) {
            Err(PushError::Full(rejected)) => rejected,
            settled => return settled,
        };
        self.waker.register(ctx.waker());
        self.queue.push(rejected)
    }
}
// SAFETY: NOTE(review): `WakerQueue<T>` holds only a `ConcurrentQueue<T>` and an
// `AtomicWaker`. These impls are sound only if both fields may be sent to and
// shared between threads whenever `T: Send`. The `concurrent-queue` and
// `atomic-waker` crates presumably guarantee that (in which case the auto
// traits would already cover this type); confirm against their documentation.
unsafe impl<T: 'static + Send> Send for WakerQueue<T> {}
unsafe impl<T: 'static + Send> Sync for WakerQueue<T> {}
pin_project! {
/// Future returned by `WakerQueue::push`.
///
/// Each poll attempts to push the held value into the queue. It resolves to
/// `Ok(())` once the push succeeds, or to `Err(PushError::Closed(value))` if
/// the queue reports it is closed; while the queue reports `Full` it stays
/// pending and keeps the value for the next poll.
pub struct Push<'a, T, F> {
// Queue the value is pushed into.
queue: &'a WakerQueue<T>,
// Value still waiting to be pushed; taken at the start of each poll and put
// back when the push reports `Full`. `None` once the future has completed.
value: Option<T>,
// Predicate called with the queue on each poll, before the push attempt;
// if it returned `true` and the push succeeds, the queue's waker is woken.
wake_if: F,
}
}
impl<'a, T, F> Push<'a, T, F>
where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
fn new(queue: &'a WakerQueue<T>, value: T, wake_if: F) -> Push<'a, T, F> {
Push { queue, value: Some(value), wake_if }
}
}
impl<'a, T, F> Future for Push<'a, T, F>
where
    T: 'static + Send,
    F: Fn(&'a WakerQueue<T>) -> bool,
{
    type Output = Result<(), PushError<T>>;

    /// Attempts to move the held value into the queue.
    ///
    /// The `wake_if` predicate is evaluated before the push attempt; the
    /// queue's waker is woken only if it returned `true` and the push
    /// succeeded. When the queue reports `Full` the value is stored again and
    /// the future stays pending (`poll_push` has registered the task's waker).
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let me = self.project();
        // Copy the shared reference out of the projection so it carries `'a`.
        let queue: &'a WakerQueue<T> = *me.queue;

        let pending = me
            .value
            .take()
            .expect("Do not poll futures after completion.");
        let should_wake = (me.wake_if)(queue);

        match queue.poll_push(pending, ctx) {
            Err(PushError::Full(rejected)) => {
                // Keep the value around for the next poll.
                *me.value = Some(rejected);
                Poll::Pending
            }
            Err(closed @ PushError::Closed(_)) => Poll::Ready(Err(closed)),
            Ok(()) => {
                if should_wake {
                    queue.wake();
                }
                Poll::Ready(Ok(()))
            }
        }
    }
}
pin_project! {
/// Future returned by `WakerQueue::pop`.
///
/// Each poll attempts to pop an item from the queue. It resolves to
/// `Ok(item)` once a pop succeeds, or to `Err(PopError::Closed)` if the queue
/// reports it is closed; while the queue reports `Empty` it stays pending.
pub struct Pop<'a, T, F> {
// Queue the item is popped from.
queue: &'a WakerQueue<T>,
// Predicate called with the queue on each poll, before the pop attempt;
// if it returned `true` and the pop succeeds, the queue's waker is woken.
wake_if: F,
}
}
impl<'a, T, F> Pop<'a, T, F>
where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
fn new(queue: &'a WakerQueue<T>, wake_if: F) -> Pop<'a, T, F> {
Pop { queue, wake_if }
}
}
impl<'a, T, F> Future for Pop<'a, T, F>
where
    T: 'static + Send,
    F: Fn(&'a WakerQueue<T>) -> bool,
{
    type Output = Result<T, PopError>;

    /// Attempts to take an item from the queue.
    ///
    /// The `wake_if` predicate is evaluated before the pop attempt; the
    /// queue's waker is woken only if it returned `true` and an item was
    /// obtained. When the queue reports `Empty` the future stays pending
    /// (`poll_pop` has registered the task's waker); `Closed` is returned as
    /// an error without waking.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let me = self.project();
        // Copy the shared reference out of the projection so it carries `'a`.
        let queue: &'a WakerQueue<T> = *me.queue;
        let should_wake = (me.wake_if)(queue);

        let settled = match queue.poll_pop(ctx) {
            Err(PopError::Empty) => return Poll::Pending,
            other => other,
        };
        if should_wake && settled.is_ok() {
            queue.wake();
        }
        Poll::Ready(settled)
    }
}