chuchi_core/body/
body_http.rs1use super::{BodyAsyncBytesStreamer, Constraints};
2
3use std::io;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use hyper::body::{Body as HyperBody, Frame, Incoming};
8
9use futures_core::Stream;
10
11use pin_project_lite::pin_project;
12
13use bytes::Bytes;
14
15pin_project! {
16 pub struct BodyHttp {
17 #[pin]
18 inner: BodyAsyncBytesStreamer
19 }
20}
21
22impl BodyHttp {
23 pub(super) fn new(inner: super::Inner, constraints: Constraints) -> Self {
24 Self {
25 inner: BodyAsyncBytesStreamer::new(inner, constraints),
26 }
27 }
28}
29
30impl HyperBody for BodyHttp {
31 type Data = Bytes;
32 type Error = io::Error;
33
34 fn poll_frame(
35 self: Pin<&mut Self>,
36 cx: &mut Context,
37 ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
38 let me = self.project();
39 match me.inner.poll_next(cx) {
40 Poll::Ready(Some(Ok(b))) => Poll::Ready(Some(Ok(Frame::data(b)))),
41 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
42 Poll::Ready(None) => Poll::Ready(None),
43 Poll::Pending => Poll::Pending,
44 }
45 }
46}
47
48pub(super) struct HyperBodyAsAsyncBytesStream {
49 inner: Incoming,
50}
51
52impl HyperBodyAsAsyncBytesStream {
53 pub fn new(inner: Incoming) -> Self {
54 Self { inner }
55 }
56}
57
58impl Stream for HyperBodyAsAsyncBytesStream {
59 type Item = io::Result<Bytes>;
60
61 fn poll_next(
62 self: Pin<&mut Self>,
63 cx: &mut Context,
64 ) -> Poll<Option<io::Result<Bytes>>> {
65 let me = self.get_mut();
66 loop {
68 let r = match Pin::new(&mut me.inner).poll_frame(cx) {
69 Poll::Ready(Some(Ok(frame))) => {
70 let Ok(data) = frame.into_data() else {
71 continue;
72 };
73
74 Poll::Ready(Some(Ok(data)))
75 }
76 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
77 io::Error::new(io::ErrorKind::Other, e),
78 ))),
79 Poll::Ready(None) => Poll::Ready(None),
80 Poll::Pending => Poll::Pending,
81 };
82
83 break r;
84 }
85 }
86}