agner-utils 0.4.1

An actor toolkit inspired by Erlang/OTP (utils)
Documentation
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

use futures::lock::BiLock;

#[cfg(test)]
mod tests;

pub fn channel<T>(max_len: usize) -> (Sender<T>, Receiver<T>) {
    let inner =
        Inner { queue: Default::default(), max_len, sender_waker: None, receiver_waker: None };

    let (sender, receiver) = BiLock::new(inner);
    (Sender(sender), Receiver(receiver))
}

#[derive(Debug)]
pub struct Receiver<T>(BiLock<Inner<T>>);

#[derive(Debug)]
pub struct Sender<T>(BiLock<Inner<T>>);

impl<T> Receiver<T>
where
    T: Unpin,
{
    pub fn recv(&mut self, should_block: bool) -> impl Future<Output = Option<T>> + '_ {
        Receive { lock: &self.0, should_block }
    }

    pub async fn len(&self) -> (usize, usize) {
        let locked = self.0.lock().await;
        (locked.queue.len(), locked.max_len)
    }
}

impl<T> Sender<T>
where
    T: Unpin,
{
    pub fn send(
        &mut self,
        item: T,
        should_block: bool,
    ) -> impl Future<Output = Result<(), T>> + '_ {
        Send { lock: &self.0, should_block, item: Some(item) }
    }

    pub async fn len(&self) -> (usize, usize) {
        let locked = self.0.lock().await;
        (locked.queue.len(), locked.max_len)
    }
}

#[derive(Debug)]
struct Inner<T> {
    queue: VecDeque<T>,
    max_len: usize,
    sender_waker: Option<Waker>,
    receiver_waker: Option<Waker>,
}

#[pin_project::pin_project]
struct Receive<'a, T> {
    lock: &'a BiLock<Inner<T>>,
    should_block: bool,
}

#[pin_project::pin_project]
struct Send<'a, T> {
    lock: &'a BiLock<Inner<T>>,
    should_block: bool,
    item: Option<T>,
}

impl<'a, T> Future for Receive<'a, T>
where
    T: Unpin,
{
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        let mut locked = futures::ready!(this.lock.poll_lock(cx));
        let _ = locked.receiver_waker.take();

        match (locked.queue.pop_front(), this.should_block) {
            (Some(item), _) => {
                if let Some(waker) = locked.sender_waker.take() {
                    waker.wake();
                }
                Poll::Ready(Some(item))
            },
            (None, false) => Poll::Ready(None),
            (None, true) => {
                let should_be_none = locked.receiver_waker.replace(cx.waker().to_owned());
                assert!(should_be_none.is_none());
                Poll::Pending
            },
        }
    }
}

impl<'a, T> Future for Send<'a, T>
where
    T: Unpin,
{
    type Output = Result<(), T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        let mut locked = futures::ready!(this.lock.poll_lock(cx));
        let _ = locked.sender_waker.take();

        match (locked.queue.len() < locked.max_len, this.should_block) {
            (true, _) => {
                let item = this.item.take().expect("Item empty");
                locked.queue.push_back(item);
                if let Some(waker) = locked.receiver_waker.take() {
                    waker.wake();
                }
                Poll::Ready(Ok(()))
            },
            (false, false) => {
                let item = this.item.take().expect("Item empty");
                Poll::Ready(Err(item))
            },
            (false, true) => {
                let should_be_none = locked.sender_waker.replace(cx.waker().to_owned());
                assert!(should_be_none.is_none());
                Poll::Pending
            },
        }
    }
}