use core::{
pin::Pin,
task::{
Context,
Poll,
Waker,
},
};
use alloc::{
collections::VecDeque,
sync::Arc,
};
use futures::prelude::*;
use lock_api::{
Mutex,
RawMutex,
};
struct QueueInner<T> {
queue: VecDeque<T>,
wakers: VecDeque<Waker>,
senders: usize,
}
struct Queue<R: RawMutex, T> {
inner: Arc<Mutex<R, QueueInner<T>>>,
}
impl<R: RawMutex, T> Clone for Queue<R, T> {
fn clone(&self) -> Self {
Queue {
inner: self.inner.clone(),
}
}
}
impl<R: RawMutex, T> Queue<R, T> {
fn new() -> Self {
Queue {
inner: Arc::new(Mutex::new(QueueInner {
queue: Default::default(),
wakers: Default::default(),
senders: 0,
})),
}
}
fn enqueue(&self, item: T) {
let mut inner = self.inner.lock();
inner.queue.push_back(item);
inner.wakers.drain(..).for_each(Waker::wake);
}
fn poll_dequeue(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let this = self.get_mut();
let mut inner = this.inner.lock();
if let Some(item) = inner.queue.pop_front() {
Poll::Ready(Some(item))
} else {
if inner.senders == 0 {
return Poll::Ready(None);
}
if !inner.wakers.iter().any(|w| cx.waker().will_wake(w)) {
inner.wakers.push_back(cx.waker().clone());
}
Poll::Pending
}
}
}
pub struct Sender<R: RawMutex, T> {
queue: Queue<R, T>,
}
impl<R: RawMutex, T> Sender<R, T> {
fn new(queue: Queue<R, T>) -> Self {
let mut inner = queue.inner.lock();
inner.senders += 1;
drop(inner);
Self { queue }
}
pub fn send(&self, item: T) {
self.queue.enqueue(item);
}
}
impl<R: RawMutex, T> Clone for Sender<R, T> {
fn clone(&self) -> Self {
Self::new(self.queue.clone())
}
}
impl<R: RawMutex, T> Drop for Sender<R, T> {
fn drop(&mut self) {
let mut queue = self.queue.inner.lock();
queue.senders -= 1;
}
}
pub struct Receiver<R: RawMutex, T> {
queue: Queue<R, T>,
}
impl<R: RawMutex, T> Receiver<R, T> {
fn new(queue: Queue<R, T>) -> Self {
Self { queue }
}
}
impl<R: RawMutex, T> Clone for Receiver<R, T> {
fn clone(&self) -> Self {
Self::new(self.queue.clone())
}
}
impl<R: RawMutex, T> Stream for Receiver<R, T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Pin::new(&mut this.queue).poll_dequeue(cx)
}
}
impl<R: RawMutex, T> Sink<T> for Sender<R, T> {
type Error = !;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), !> {
Sender::send(self.get_mut(), item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
Poll::Ready(Ok(()))
}
}
pub fn channel<R: RawMutex, T>() -> (Sender<R, T>, Receiver<R, T>) {
let queue = Queue::new();
(Sender::new(queue.clone()), Receiver::new(queue))
}