use core::{
cell::RefCell,
pin::Pin,
task::{
Context,
Poll,
Waker,
},
};
use alloc::{
collections::VecDeque,
sync::Arc,
};
use futures::prelude::*;
use cortex_m::interrupt::{
self,
Mutex,
};
struct QueueInner<T> {
queue: VecDeque<T>,
wakers: VecDeque<Waker>,
senders: usize,
}
struct Queue<T> {
inner: Arc<Mutex<RefCell<QueueInner<T>>>>,
}
impl<T> Clone for Queue<T> {
fn clone(&self) -> Self {
Queue {
inner: self.inner.clone(),
}
}
}
impl<T> Queue<T> {
fn new() -> Self {
Queue {
inner: Arc::new(Mutex::new(RefCell::new(QueueInner {
queue: Default::default(),
wakers: Default::default(),
senders: 0,
}))),
}
}
fn enqueue(&self, item: T) {
interrupt::free(move |cs| {
let mut inner = self.inner.borrow(cs).borrow_mut();
inner.queue.push_back(item);
inner.wakers.drain(..).for_each(Waker::wake);
})
}
fn try_dequeue(&self) -> Option<T> {
interrupt::free(|cs| self.inner.borrow(cs).borrow_mut().queue.pop_front())
}
fn poll_dequeue(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let this = self.get_mut();
interrupt::free(|cs| {
let mut inner = this.inner.borrow(cs).borrow_mut();
if let Some(item) = inner.queue.pop_front() {
Poll::Ready(Some(item))
} else {
if inner.senders == 0 {
return Poll::Ready(None);
}
if !inner.wakers.iter().any(|w| cx.waker().will_wake(w)) {
inner.wakers.push_back(cx.waker().clone());
}
Poll::Pending
}
})
}
}
pub struct Send<T> {
queue: Queue<T>,
}
impl<T> Send<T> {
fn new(queue: Queue<T>) -> Self {
interrupt::free(|cs| {
let mut queue = queue.inner.borrow(cs).borrow_mut();
queue.senders += 1;
});
Self { queue }
}
pub fn send(&self, item: T) {
self.queue.enqueue(item);
}
}
impl<T> Clone for Send<T> {
fn clone(&self) -> Self {
Self::new(self.queue.clone())
}
}
impl<T> Drop for Send<T> {
fn drop(&mut self) {
interrupt::free(|cs| {
let mut queue = self.queue.inner.borrow(cs).borrow_mut();
queue.senders -= 1;
});
}
}
#[derive(Clone)]
pub struct Recv<T> {
queue: Queue<T>,
}
impl<T> Recv<T> {
fn new(queue: Queue<T>) -> Self {
Self { queue }
}
}
impl<T> Recv<T> {
pub fn try_recv(&self) -> Option<T> {
self.queue.try_dequeue()
}
}
impl<T> Stream for Recv<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Pin::new(&mut this.queue).poll_dequeue(cx)
}
}
impl<T> Sink<T> for Send<T> {
type Error = !;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), !> {
Send::send(self.get_mut(), item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
Poll::Ready(Ok(()))
}
}
pub fn channel<T>() -> (Send<T>, Recv<T>) {
let queue = Queue::new();
(Send::new(queue.clone()), Recv::new(queue))
}