http_body_io/
body_reader.rs

1use super::BodyIoError;
2
3#[allow(unused_imports)]
4use crate::BodyWriter;
5
6use core::{
7    future::Future,
8    pin::{pin, Pin},
9    task::{Context, Poll},
10};
11
12use bytes::Bytes;
13use tokio::sync::mpsc::Receiver;
14
15/// A reader for the body of an HTTP request or response.
16///
17/// This reader implements the [`http_body::Body`] trait and is used for
18/// web servers to access the data being sent by the [`BodyWriter`].
19pub struct BodyReader {
20    pub(crate) receiver: Receiver<Bytes>,
21}
22
23impl std::fmt::Debug for BodyReader {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("BodyReader").finish()
26    }
27}
28
29impl http_body::Body for BodyReader {
30    type Data = Bytes;
31    type Error = BodyIoError;
32    fn poll_frame(
33        self: Pin<&mut Self>,
34        cx: &mut Context<'_>,
35    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
36        let this = self.get_mut();
37        match this.receiver.poll_recv(cx) {
38            Poll::Ready(Some(bytes)) => {
39                let frame = http_body::Frame::data(bytes);
40                Poll::Ready(Some(Ok(frame)))
41            }
42            Poll::Ready(None) => Poll::Ready(None),
43            Poll::Pending => {
44                cx.waker().wake_by_ref();
45                Poll::Pending
46            }
47        }
48    }
49}
50
51impl tokio::io::AsyncRead for BodyReader {
52    fn poll_read(
53        mut self: Pin<&mut Self>,
54        cx: &mut Context<'_>,
55        buf: &mut tokio::io::ReadBuf<'_>,
56    ) -> Poll<std::io::Result<()>> {
57        let mut this = pin!(self.receiver.recv());
58        match this.as_mut().poll(cx) {
59            Poll::Pending => {
60                cx.waker().wake_by_ref();
61                Poll::Pending
62            }
63            Poll::Ready(Some(bytes)) => {
64                buf.put_slice(&bytes);
65                Poll::Ready(Ok(()))
66            }
67            Poll::Ready(None) => Poll::Ready(Ok(())),
68        }
69    }
70}