http_pool/body/
channel_body.rs1use hyper::body::{Body, Buf, Frame};
2use std::convert::Infallible;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::mpsc::{Receiver, Sender, channel};
6
7enum Data<T> {
8 Data(T),
9 Frame(Frame<T>),
10}
11
12#[derive(Debug, Clone)]
13pub struct ChannelSender<T> {
14 tx: Sender<Data<T>>,
15}
16
17impl<T> ChannelSender<T> {
18 pub async fn send(&mut self, value: T) -> Result<(), T> {
19 self.tx
20 .send(Data::Data(value))
21 .await
22 .map_err(|d| match d.0 {
23 Data::Data(value) => value,
24 _ => unreachable!(),
25 })
26 }
27
28 pub async fn send_frame(&mut self, value: Frame<T>) -> Result<(), Frame<T>> {
29 self.tx
30 .send(Data::Frame(value))
31 .await
32 .map_err(|d| match d.0 {
33 Data::Frame(f) => f,
34 _ => unreachable!(),
35 })
36 }
37}
38
39#[derive(Debug)]
40pub struct ChannelBody<T> {
41 rx: Receiver<Data<T>>,
42}
43
44impl<T> ChannelBody<T> {
45 pub fn new(buffer: usize) -> (ChannelSender<T>, Self) {
46 let (tx, rx) = channel(buffer);
47 (ChannelSender { tx }, Self { rx })
48 }
49}
50
51impl<T: Buf> Body for ChannelBody<T> {
52 type Data = T;
53 type Error = Infallible;
54
55 fn poll_frame(
56 mut self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
59 match self.as_mut().rx.poll_recv(cx) {
60 Poll::Pending => Poll::Pending,
61 Poll::Ready(None) => Poll::Ready(None),
62 Poll::Ready(Some(v)) => match v {
63 Data::Frame(f) => Poll::Ready(Some(Ok(f))),
64 Data::Data(v) => Poll::Ready(Some(Ok(Frame::data(v)))),
65 },
66 }
67 }
68}