http_pool/body/
channel_body.rs1use hyper::body::{Body, Buf, Frame};
2use std::convert::Infallible;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::mpsc::{Receiver, Sender, channel};
6
7#[derive(Debug, Clone)]
8pub struct ChannelSender<T> {
9 tx: Sender<T>,
10}
11
12impl<T> ChannelSender<T> {
13 pub async fn send(&mut self, value: T) -> Result<(), T> {
14 self.tx.send(value).await.map_err(|d| d.0)
15 }
16}
17
18#[derive(Debug)]
19pub struct ChannelBody<T> {
20 rx: Receiver<T>,
21}
22
23impl<T> ChannelBody<T> {
24 pub fn new(buffer: usize) -> (ChannelSender<T>, Self) {
25 let (tx, rx) = channel(buffer);
26 (ChannelSender { tx }, Self { rx })
27 }
28}
29
30impl<T: Buf> Body for ChannelBody<T> {
31 type Data = T;
32 type Error = Infallible;
33
34 fn poll_frame(
35 mut self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
38 match self.as_mut().rx.poll_recv(cx) {
39 Poll::Pending => Poll::Pending,
40 Poll::Ready(None) => Poll::Ready(None),
41 Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(Frame::data(v)))),
42 }
43 }
44}