// keebrs 0.3.0
//
// Keyboard firmware building blocks
// Documentation
use lock_api::RawMutex;

use log::error;

use futures::{
    future::join,
    prelude::*,
    stream::{
        StreamExt,
        TryStreamExt,
    },
};

use pin_project::pin_project;

use super::*;

use crate::channel::*;

/// A net handle whose real IO is performed by a separate background task.
///
/// Moving net IO into its own task keeps it out of the hot path: this handle
/// only exchanges messages with that task over a pair of channels.
///
/// Use the [background] function to construct one.
///
/// Note: The address reported by this net is captured *once*, when the handle
/// is created. If the backgrounded net changes its address at runtime, the
/// change will not be visible through `Background`'s `Net::addr` method.
#[pin_project]
pub struct Background<R, T>
where
    R: RawMutex,
{
    /// Address copied from the wrapped net at construction time.
    addr: i8,
    /// Outgoing channel end; the `Sink` impl pushes messages into it.
    #[pin]
    send: Sender<R, Msg<T>>,
    /// Incoming channel end; the `Stream` impl pulls messages out of it.
    #[pin]
    recv: Receiver<R, Msg<T>>,
}

// Written by hand rather than derived so that no `T: Clone` bound is imposed:
// only the channel endpoints are cloned, never a message.
impl<R, T> Clone for Background<R, T>
where
    R: RawMutex,
{
    /// Produce another handle with the same address and clones of both
    /// channel endpoints.
    fn clone(&self) -> Self {
        let Self { addr, send, recv } = self;
        Self {
            addr: *addr,
            send: send.clone(),
            recv: recv.clone(),
        }
    }
}

/// Move a net into the background, yielding a driver future and a [Background] handle.
///
/// The returned tuple is `(driver, handle)`:
///
/// * `driver` is a `Future` suitable for spawning. It shuttles messages
///   between the handle's channels and the wrapped `net`.
/// * `handle` is the [Background] net to use in place of the original.
///
/// The handle does nothing on its own: no message moves in either direction
/// unless the driver future is being polled as well.
///
/// Errors from the wrapped net are not surfaced. A failed send is ignored,
/// and an `Err` item read from the net is skipped.
pub fn background<R, T, N, E>(net: N) -> (impl Future<Output = ()>, Background<R, T>)
where
    R: RawMutex,
    N: Net<T> + Unpin,
    N: Stream<Item = Result<Msg<T>, E>>,
{
    // Read the address first: `net` is consumed by the split below.
    let addr = net.addr();

    let (outgoing_tx, mut outgoing_rx) = channel::<R, _>();
    let (incoming_tx, incoming_rx) = channel::<R, _>();
    let (mut sink_half, mut stream_half) = StreamExt::split(net);

    let handle = Background {
        addr,
        send: outgoing_tx,
        recv: incoming_rx,
    };

    // Handle -> net: drain what the handle queued and push it into the net.
    let forward_outgoing = async move {
        while let Some(msg) = outgoing_rx.next().await {
            // Best effort: a failed send is deliberately ignored.
            let _ = sink_half.send(msg).await;
        }
    };

    // Net -> handle: pass successfully received messages on, skip errors.
    let forward_incoming = async move {
        while let Some(received) = stream_half.next().await {
            match received {
                Ok(msg) => {
                    incoming_tx.send(msg);
                }
                Err(_) => {}
            }
        }
        error!("recv task exited");
    };

    // Run both directions concurrently and discard the `((), ())` result.
    let driver = async move {
        join(forward_outgoing, forward_incoming).await;
    };

    (driver, handle)
}

/// Sending on a `Background` forwards every `Sink` operation to the pinned
/// outgoing channel endpoint (`send`).
impl<R: RawMutex, T> Sink<Msg<T>> for Background<R, T> {
    /// The error type is `!`, so these operations cannot report a failure.
    type Error = !;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.send.poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Msg<T>) -> Result<(), Self::Error> {
        let this = self.project();
        this.send.start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.send.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.send.poll_close(cx)
    }
}

/// Receiving on a `Background` polls the pinned incoming channel endpoint
/// (`recv`).
impl<R: RawMutex, T> Stream for Background<R, T> {
    /// Items are always `Ok`; the error type is `!`.
    type Item = Result<Msg<T>, !>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // `Pending` passes through untouched; a ready `Some(msg)` becomes
        // `Some(Ok(msg))` and a ready `None` stays `None`.
        this.recv.poll_next(cx).map(|next| next.map(Ok))
    }
}

impl<R, T> Net<T> for Background<R, T>
where
    R: RawMutex,
{
    /// Returns the address stored in this handle when it was constructed.
    fn addr(&self) -> i8 {
        let Self { addr, .. } = self;
        *addr
    }
}