http_pool/body/
channel_body.rs

1use hyper::body::{Body, Buf, Frame};
2use std::convert::Infallible;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::mpsc::{Receiver, Sender, channel};
6
7enum Data<T> {
8    Data(T),
9    Frame(Frame<T>),
10}
11
12#[derive(Debug, Clone)]
13pub struct ChannelSender<T> {
14    tx: Sender<Data<T>>,
15}
16
17impl<T> ChannelSender<T> {
18    pub async fn send(&mut self, value: T) -> Result<(), T> {
19        self.tx
20            .send(Data::Data(value))
21            .await
22            .map_err(|d| match d.0 {
23                Data::Data(value) => value,
24                _ => unreachable!(),
25            })
26    }
27
28    pub async fn send_frame(&mut self, value: Frame<T>) -> Result<(), Frame<T>> {
29        self.tx
30            .send(Data::Frame(value))
31            .await
32            .map_err(|d| match d.0 {
33                Data::Frame(f) => f,
34                _ => unreachable!(),
35            })
36    }
37}
38
39#[derive(Debug)]
40pub struct ChannelBody<T> {
41    rx: Receiver<Data<T>>,
42}
43
44impl<T> ChannelBody<T> {
45    pub fn new(buffer: usize) -> (ChannelSender<T>, Self) {
46        let (tx, rx) = channel(buffer);
47        (ChannelSender { tx }, Self { rx })
48    }
49}
50
51impl<T: Buf> Body for ChannelBody<T> {
52    type Data = T;
53    type Error = Infallible;
54
55    fn poll_frame(
56        mut self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
59        match self.as_mut().rx.poll_recv(cx) {
60            Poll::Pending => Poll::Pending,
61            Poll::Ready(None) => Poll::Ready(None),
62            Poll::Ready(Some(v)) => match v {
63                Data::Frame(f) => Poll::Ready(Some(Ok(f))),
64                Data::Data(v) => Poll::Ready(Some(Ok(Frame::data(v)))),
65            },
66        }
67    }
68}