aldrin-core 0.13.0

Shared core components of Aldrin, a message bus for service-oriented RPC and interprocess communication.
Documentation
use super::AsyncTransport;
use crate::message::Message;
use pin_project_lite::pin_project;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
    #[derive(Debug)]
    pub struct Buffered<T> {
        #[pin]
        inner: T,

        buffer: VecDeque<Message>,
    }
}

impl<T> Buffered<T> {
    pub fn new(inner: T) -> Self {
        Self {
            inner,
            buffer: VecDeque::new(),
        }
    }
}

impl<T: AsyncTransport> AsyncTransport for Buffered<T> {
    type Error = T::Error;

    fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>> {
        self.project().inner.receive_poll(cx)
    }

    fn send_poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
        self.project().buffer.push_back(msg);
        Ok(())
    }

    fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        let mut this = self.project();

        while !this.buffer.is_empty() {
            match this.inner.as_mut().send_poll_ready(cx) {
                Poll::Ready(Ok(())) => {
                    let msg = this.buffer.pop_front().unwrap();
                    this.inner.as_mut().send_start(msg)?;
                }

                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }

        this.inner.as_mut().send_poll_flush(cx)
    }
}