http_body_io/
body_reader.rs1use super::BodyIoError;
2
3#[allow(unused_imports)]
4use crate::BodyWriter;
5
6use core::{
7 future::Future,
8 pin::{pin, Pin},
9 task::{Context, Poll},
10};
11
12use bytes::Bytes;
13use tokio::sync::mpsc::Receiver;
14
15pub struct BodyReader {
20 pub(crate) receiver: Receiver<Bytes>,
21}
22
23impl std::fmt::Debug for BodyReader {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("BodyReader").finish()
26 }
27}
28
29impl http_body::Body for BodyReader {
30 type Data = Bytes;
31 type Error = BodyIoError;
32 fn poll_frame(
33 self: Pin<&mut Self>,
34 cx: &mut Context<'_>,
35 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
36 let this = self.get_mut();
37 match this.receiver.poll_recv(cx) {
38 Poll::Ready(Some(bytes)) => {
39 let frame = http_body::Frame::data(bytes);
40 Poll::Ready(Some(Ok(frame)))
41 }
42 Poll::Ready(None) => Poll::Ready(None),
43 Poll::Pending => {
44 cx.waker().wake_by_ref();
45 Poll::Pending
46 }
47 }
48 }
49}
50
51impl tokio::io::AsyncRead for BodyReader {
52 fn poll_read(
53 mut self: Pin<&mut Self>,
54 cx: &mut Context<'_>,
55 buf: &mut tokio::io::ReadBuf<'_>,
56 ) -> Poll<std::io::Result<()>> {
57 let mut this = pin!(self.receiver.recv());
58 match this.as_mut().poll(cx) {
59 Poll::Pending => {
60 cx.waker().wake_by_ref();
61 Poll::Pending
62 }
63 Poll::Ready(Some(bytes)) => {
64 buf.put_slice(&bytes);
65 Poll::Ready(Ok(()))
66 }
67 Poll::Ready(None) => Poll::Ready(Ok(())),
68 }
69 }
70}