keebrs 0.3.0

Keyboard firmware building blocks
use core::{
    pin::Pin,
    task::{
        Context,
        Poll,
        Waker,
    },
};

use alloc::{
    collections::VecDeque,
    sync::Arc,
};

use futures::prelude::*;

use lock_api::{
    Mutex,
    RawMutex,
};

struct QueueInner<T> {
    queue: VecDeque<T>,
    wakers: VecDeque<Waker>,
    senders: usize,
}

struct Queue<R: RawMutex, T> {
    inner: Arc<Mutex<R, QueueInner<T>>>,
}

impl<R: RawMutex, T> Clone for Queue<R, T> {
    fn clone(&self) -> Self {
        Queue {
            inner: self.inner.clone(),
        }
    }
}

impl<R: RawMutex, T> Queue<R, T> {
    fn new() -> Self {
        Queue {
            inner: Arc::new(Mutex::new(QueueInner {
                queue: Default::default(),
                wakers: Default::default(),
                senders: 0,
            })),
        }
    }

    fn enqueue(&self, item: T) {
        let mut inner = self.inner.lock();
        inner.queue.push_back(item);
        inner.wakers.drain(..).for_each(Waker::wake);
    }

    fn poll_dequeue(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        let mut inner = this.inner.lock();
        if let Some(item) = inner.queue.pop_front() {
            Poll::Ready(Some(item))
        } else {
            if inner.senders == 0 {
                return Poll::Ready(None);
            }
            if !inner.wakers.iter().any(|w| cx.waker().will_wake(w)) {
                inner.wakers.push_back(cx.waker().clone());
            }
            Poll::Pending
        }
    }
}

/// Unbounded channel sender
pub struct Sender<R: RawMutex, T> {
    queue: Queue<R, T>,
}

impl<R: RawMutex, T> Sender<R, T> {
    fn new(queue: Queue<R, T>) -> Self {
        let mut inner = queue.inner.lock();
        inner.senders += 1;
        drop(inner);
        Self { queue }
    }

    pub fn send(&self, item: T) {
        self.queue.enqueue(item);
    }
}

impl<R: RawMutex, T> Clone for Sender<R, T> {
    fn clone(&self) -> Self {
        Self::new(self.queue.clone())
    }
}

impl<R: RawMutex, T> Drop for Sender<R, T> {
    fn drop(&mut self) {
        let mut queue = self.queue.inner.lock();
        queue.senders -= 1;
    }
}

/// Unbounded channel receiver
pub struct Receiver<R: RawMutex, T> {
    queue: Queue<R, T>,
}

impl<R: RawMutex, T> Receiver<R, T> {
    fn new(queue: Queue<R, T>) -> Self {
        Self { queue }
    }
}

impl<R: RawMutex, T> Clone for Receiver<R, T> {
    fn clone(&self) -> Self {
        Self::new(self.queue.clone())
    }
}

impl<R: RawMutex, T> Stream for Receiver<R, T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        Pin::new(&mut this.queue).poll_dequeue(cx)
    }
}

impl<R: RawMutex, T> Sink<T> for Sender<R, T> {
    type Error = !;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), !> {
        Sender::send(self.get_mut(), item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), !>> {
        Poll::Ready(Ok(()))
    }
}

/// Create a new unbounded channel
///
/// This is a stupid-simple (and possibly not very performant) MPMC queue backed
/// by a VecDeque.
pub fn channel<R: RawMutex, T>() -> (Sender<R, T>, Receiver<R, T>) {
    let queue = Queue::new();
    (Sender::new(queue.clone()), Receiver::new(queue))
}