http_pool/body/
channel_body.rs

1use hyper::body::{Body, Buf, Frame};
2use std::convert::Infallible;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::mpsc::{Receiver, Sender, channel};
6
7#[derive(Debug, Clone)]
8pub struct ChannelSender<T> {
9    tx: Sender<T>,
10}
11
12impl<T> ChannelSender<T> {
13    pub async fn send(&mut self, value: T) -> Result<(), T> {
14        self.tx.send(value).await.map_err(|d| d.0)
15    }
16}
17
18#[derive(Debug)]
19pub struct ChannelBody<T> {
20    rx: Receiver<T>,
21}
22
23impl<T> ChannelBody<T> {
24    pub fn new(buffer: usize) -> (ChannelSender<T>, Self) {
25        let (tx, rx) = channel(buffer);
26        (ChannelSender { tx }, Self { rx })
27    }
28}
29
30impl<T: Buf> Body for ChannelBody<T> {
31    type Data = T;
32    type Error = Infallible;
33
34    fn poll_frame(
35        mut self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
38        match self.as_mut().rx.poll_recv(cx) {
39            Poll::Pending => Poll::Pending,
40            Poll::Ready(None) => Poll::Ready(None),
41            Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(Frame::data(v)))),
42        }
43    }
44}