//! cortex-m-async 0.1.0
//!
//! Cortex-M helpers for async IO abstractions.
use core::{
    cell::RefCell,
    pin::Pin,
    task::{
        Context,
        Poll,
        Waker,
    },
};

use alloc::{
    collections::VecDeque,
    sync::Arc,
};

use futures::prelude::*;

use cortex_m::interrupt::{
    self,
    Mutex,
};

/// Mutable state shared by every handle to one channel.
struct QueueInner<T> {
    /// Items that have been enqueued but not yet dequeued, in FIFO order
    /// (pushed at the back, popped from the front).
    queue: VecDeque<T>,
    /// Wakers registered by `poll_dequeue` calls that found the queue empty;
    /// all of them are drained and woken on the next `enqueue`.
    wakers: VecDeque<Waker>,
    /// Number of live `Send` handles. `poll_dequeue` reports end-of-stream
    /// when this is zero and the queue is empty.
    senders: usize,
}

/// Reference-counted handle to the shared channel state.
///
/// The state sits behind a `cortex_m::interrupt::Mutex`, so every access in
/// this file goes through an `interrupt::free` critical section; the
/// `RefCell` supplies the mutable borrow inside that section.
struct Queue<T> {
    inner: Arc<Mutex<RefCell<QueueInner<T>>>>,
}

impl<T> Clone for Queue<T> {
    fn clone(&self) -> Self {
        Queue {
            inner: self.inner.clone(),
        }
    }
}

impl<T> Queue<T> {
    /// Creates an empty queue with no pending wakers and a sender count of zero.
    fn new() -> Self {
        let state = QueueInner {
            queue: Default::default(),
            wakers: Default::default(),
            senders: 0,
        };
        Self {
            inner: Arc::new(Mutex::new(RefCell::new(state))),
        }
    }

    /// Appends `item` to the back of the queue and wakes every task that is
    /// currently registered as waiting for an item.
    ///
    /// The whole operation runs inside one critical section.
    fn enqueue(&self, item: T) {
        interrupt::free(move |cs| {
            let mut state = self.inner.borrow(cs).borrow_mut();
            state.queue.push_back(item);
            // Every registered waker is consumed; a task that still finds the
            // queue empty re-registers itself on its next poll.
            for waker in state.wakers.drain(..) {
                waker.wake();
            }
        })
    }

    /// Removes and returns the item at the front of the queue, or `None` if
    /// the queue is currently empty. Never registers a waker.
    fn try_dequeue(&self) -> Option<T> {
        interrupt::free(|cs| {
            let mut state = self.inner.borrow(cs).borrow_mut();
            state.queue.pop_front()
        })
    }

    /// Polls for the next item.
    ///
    /// * `Poll::Ready(Some(item))` – an item was available and has been removed.
    /// * `Poll::Ready(None)` – the queue is empty and no senders remain.
    /// * `Poll::Pending` – the queue is empty but senders exist; the waker
    ///   from `cx` has been registered to be woken on the next `enqueue`.
    fn poll_dequeue(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let handle = self.get_mut();
        interrupt::free(|cs| {
            let mut state = handle.inner.borrow(cs).borrow_mut();

            if let Some(item) = state.queue.pop_front() {
                return Poll::Ready(Some(item));
            }
            if state.senders == 0 {
                return Poll::Ready(None);
            }

            // Skip registration when an equivalent waker is already queued, so
            // repeated polls from the same task do not grow the list.
            let waker = cx.waker();
            let already_registered = state.wakers.iter().any(|w| waker.will_wake(w));
            if !already_registered {
                state.wakers.push_back(waker.clone());
            }
            Poll::Pending
        })
    }
}

/// Unbounded channel sender
///
/// Each live `Send` handle is counted in the shared queue state: the count is
/// incremented when a handle is created (including via `clone`) and
/// decremented when it is dropped.
pub struct Send<T> {
    /// Handle to the queue shared with the receivers and any other senders.
    queue: Queue<T>,
}

impl<T> Send<T> {
    fn new(queue: Queue<T>) -> Self {
        interrupt::free(|cs| {
            let mut queue = queue.inner.borrow(cs).borrow_mut();
            queue.senders += 1;
        });
        Self { queue }
    }

    pub fn send(&self, item: T) {
        self.queue.enqueue(item);
    }
}

impl<T> Clone for Send<T> {
    /// Creates another sender for the same channel.
    ///
    /// Construction goes through `Send::new` so that the shared sender count
    /// is incremented for the new handle.
    fn clone(&self) -> Self {
        let shared = self.queue.clone();
        Self::new(shared)
    }
}

impl<T> Drop for Send<T> {
    /// Unregisters this sender from the shared sender count.
    ///
    /// When the last sender goes away, every receiver that is parked waiting
    /// for an item is woken so that its next poll can observe the closed
    /// channel and finish with `None`. Without this wake-up a pending receiver
    /// would never be polled again and its stream would hang forever.
    fn drop(&mut self) {
        interrupt::free(|cs| {
            let mut inner = self.queue.inner.borrow(cs).borrow_mut();
            inner.senders -= 1;
            if inner.senders == 0 {
                // No further `enqueue` can happen through a sender, so this is
                // the last chance to notify the registered wakers.
                for waker in inner.wakers.drain(..) {
                    waker.wake();
                }
            }
        });
    }
}

/// Unbounded channel receiver
///
/// Items can be taken either without waiting via `try_recv` or asynchronously
/// through the `Stream` implementation. Clones share the same underlying
/// queue, so each item is delivered to exactly one receiver.
pub struct Recv<T> {
    /// Handle to the queue shared with the senders and any other receivers.
    queue: Queue<T>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound, although cloning a receiver only duplicates the shared
// queue handle and never copies an item.
impl<T> Clone for Recv<T> {
    /// Returns another receiver attached to the same channel.
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

impl<T> Recv<T> {
    fn new(queue: Queue<T>) -> Self {
        Self { queue }
    }
}

impl<T> Recv<T> {
    pub fn try_recv(&self) -> Option<T> {
        self.queue.try_dequeue()
    }
}

impl<T> Stream for Recv<T> {
    type Item = T;

    /// Delegates to the shared queue: yields the next item, `None` once the
    /// queue is empty with no senders left, or `Pending` after registering the
    /// task's waker.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let queue = &mut self.get_mut().queue;
        Pin::new(queue).poll_dequeue(cx)
    }
}

impl<T> Sink<T> for Send<T> {
    /// Sending into the unbounded queue cannot fail.
    type Error = !;

    /// Always ready: the queue is unbounded, so there is no back-pressure.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Enqueues `item` immediately.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // Call the inherent method by path: method-call syntax on a
        // `&mut Send<T>` could otherwise pick up `SinkExt::send` from the
        // futures prelude.
        let sender: &mut Self = self.get_mut();
        Send::send(sender, item);
        Ok(())
    }

    /// Nothing is buffered in the sink itself, so flushing is a no-op.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Completes immediately without touching the sender count; the count only
    /// changes when the `Send` handle is dropped.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

/// Create a new unbounded channel
///
/// Returns the sending and receiving halves of a deliberately simple (and
/// possibly not very performant) MPMC queue backed by a `VecDeque`. Both
/// halves can be cloned to obtain additional senders or receivers.
pub fn channel<T>() -> (Send<T>, Recv<T>) {
    let receiver = Recv::new(Queue::new());
    // The sender shares the receiver's queue; `Send::new` registers it in the
    // sender count.
    let sender = Send::new(receiver.queue.clone());
    (sender, receiver)
}