Documentation
use hyper::body::{Body, Buf, Frame};
use std::convert::Infallible;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{Receiver, Sender, channel};

#[derive(Debug, Clone)]
pub struct ChannelSender<T> {
    tx: Sender<T>,
}

impl<T> ChannelSender<T> {
    pub async fn send(&mut self, value: T) -> Result<(), T> {
        self.tx.send(value).await.map_err(|d| d.0)
    }
}

#[derive(Debug)]
pub struct ChannelBody<T> {
    rx: Receiver<T>,
}

impl<T> ChannelBody<T> {
    pub fn new(buffer: usize) -> (ChannelSender<T>, Self) {
        let (tx, rx) = channel(buffer);
        (ChannelSender { tx }, Self { rx })
    }
}

impl<T: Buf> Body for ChannelBody<T> {
    type Data = T;
    type Error = Infallible;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.as_mut().rx.poll_recv(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(Frame::data(v)))),
        }
    }
}