chuchi_core/body/
body_http.rs

1use super::{BodyAsyncBytesStreamer, Constraints};
2
3use std::io;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use hyper::body::{Body as HyperBody, Frame, Incoming};
8
9use futures_core::Stream;
10
11use pin_project_lite::pin_project;
12
13use bytes::Bytes;
14
15pin_project! {
16	pub struct BodyHttp {
17		#[pin]
18		inner: BodyAsyncBytesStreamer
19	}
20}
21
22impl BodyHttp {
23	pub(super) fn new(inner: super::Inner, constraints: Constraints) -> Self {
24		Self {
25			inner: BodyAsyncBytesStreamer::new(inner, constraints),
26		}
27	}
28}
29
30impl HyperBody for BodyHttp {
31	type Data = Bytes;
32	type Error = io::Error;
33
34	fn poll_frame(
35		self: Pin<&mut Self>,
36		cx: &mut Context,
37	) -> Poll<Option<io::Result<Frame<Bytes>>>> {
38		let me = self.project();
39		match me.inner.poll_next(cx) {
40			Poll::Ready(Some(Ok(b))) => Poll::Ready(Some(Ok(Frame::data(b)))),
41			Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
42			Poll::Ready(None) => Poll::Ready(None),
43			Poll::Pending => Poll::Pending,
44		}
45	}
46}
47
48pub(super) struct HyperBodyAsAsyncBytesStream {
49	inner: Incoming,
50}
51
52impl HyperBodyAsAsyncBytesStream {
53	pub fn new(inner: Incoming) -> Self {
54		Self { inner }
55	}
56}
57
58impl Stream for HyperBodyAsAsyncBytesStream {
59	type Item = io::Result<Bytes>;
60
61	fn poll_next(
62		self: Pin<&mut Self>,
63		cx: &mut Context,
64	) -> Poll<Option<io::Result<Bytes>>> {
65		let me = self.get_mut();
66		// loop to retry to get data
67		loop {
68			let r = match Pin::new(&mut me.inner).poll_frame(cx) {
69				Poll::Ready(Some(Ok(frame))) => {
70					let Ok(data) = frame.into_data() else {
71						continue;
72					};
73
74					Poll::Ready(Some(Ok(data)))
75				}
76				Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
77					io::Error::new(io::ErrorKind::Other, e),
78				))),
79				Poll::Ready(None) => Poll::Ready(None),
80				Poll::Pending => Poll::Pending,
81			};
82
83			break r;
84		}
85	}
86}