use hyper::body::{Body, Buf, Frame};
use std::convert::Infallible;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{Receiver, Sender, channel};
enum Data<T> {
Data(T),
Frame(Frame<T>),
}
#[derive(Debug, Clone)]
pub struct ChannelSender<T> {
tx: Sender<Data<T>>,
}
impl<T> ChannelSender<T> {
pub async fn send(&mut self, value: T) -> Result<(), T> {
self.tx
.send(Data::Data(value))
.await
.map_err(|d| match d.0 {
Data::Data(value) => value,
_ => unreachable!(),
})
}
pub async fn send_frame(&mut self, value: Frame<T>) -> Result<(), Frame<T>> {
self.tx
.send(Data::Frame(value))
.await
.map_err(|d| match d.0 {
Data::Frame(f) => f,
_ => unreachable!(),
})
}
}
#[derive(Debug)]
pub struct ChannelBody<T> {
rx: Receiver<Data<T>>,
}
impl<T> ChannelBody<T> {
pub fn new(buffer: usize) -> (ChannelSender<T>, Self) {
let (tx, rx) = channel(buffer);
(ChannelSender { tx }, Self { rx })
}
}
impl<T: Buf> Body for ChannelBody<T> {
type Data = T;
type Error = Infallible;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.as_mut().rx.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(v)) => match v {
Data::Frame(f) => Poll::Ready(Some(Ok(f))),
Data::Data(v) => Poll::Ready(Some(Ok(Frame::data(v)))),
},
}
}
}