1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use http_body::{Body as HttpBody, Frame, SizeHint};
6use pin_project_lite::pin_project;
7
8use crate::error::Error;
9
/// An `http::Request` whose payload type is this module's [`Body`].
pub type Request = http::Request<Body>;
/// An `http::Response` whose payload type is this module's [`Body`].
pub type Response = http::Response<Body>;
12
/// Message payload used by [`Request`] and [`Response`].
///
/// The `HttpBody` implementation for this type yields frames of [`Bytes`] and reports failures
/// as the crate [`Error`].
pub enum Body {
    /// Payload held entirely in memory. Polling hands the buffer out as one data frame and
    /// leaves an empty buffer behind, so the following poll reports end-of-stream.
    Static(Bytes),
    /// No payload: polling reports end-of-stream immediately and the exact size hint is zero.
    Empty,
    /// Boxed, pinned frame producer. Polling, `is_end_stream` and `size_hint` are all forwarded
    /// to it.
    Stream(Pin<Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + 'static>>),
}
18
19impl Body {
20 #[must_use]
21 pub fn as_static(&self) -> Option<&Bytes> {
22 if let Self::Static(b) = self { Some(b) } else { None }
23 }
24
25 pub fn from_producer<B, E>(producer: B) -> Self
26 where
27 B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
28 E: Into<Error> + Send + Sync + 'static,
29 {
30 Self::Stream(Box::pin(BodyStreamAdapter { inner: producer }))
36 }
37}
38
39impl HttpBody for Body {
40 type Data = Bytes;
41 type Error = Error;
42
43 fn poll_frame(
44 self: Pin<&mut Self>,
45 cx: &mut Context<'_>,
46 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
47 match self.get_mut() {
48 Self::Static(b) => {
49 if b.is_empty() {
50 Poll::Ready(None)
51 } else {
52 let out = std::mem::take(b);
53 Poll::Ready(Some(Ok(Frame::data(out))))
54 }
55 }
56 Self::Empty => Poll::Ready(None),
57 Self::Stream(s) => s.as_mut().poll_frame(cx),
58 }
59 }
60
61 fn is_end_stream(&self) -> bool {
62 match self {
63 Self::Static(b) => b.is_empty(),
64 Self::Empty => true,
65 Self::Stream(s) => s.is_end_stream(),
66 }
67 }
68
69 fn size_hint(&self) -> SizeHint {
70 match self {
71 Self::Static(b) => SizeHint::with_exact(b.len() as u64),
72 Self::Empty => SizeHint::with_exact(0),
73 Self::Stream(s) => s.size_hint(),
74 }
75 }
76}
77
pin_project! {
    /// Wrapper around a producer body `B`, constructed by `Body::from_producer`.
    ///
    /// Its `HttpBody` implementation forwards to `inner` and converts the producer's error
    /// into the crate `Error`.
    pub(crate) struct BodyStreamAdapter<B> {
        // Pinned field: `poll_frame` reaches the wrapped body through `project()`.
        #[pin]
        inner: B,
    }
}
94
95impl<B, E> HttpBody for BodyStreamAdapter<B>
96where
97 B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
98 E: Into<Error> + Send + Sync + 'static,
99{
100 type Data = Bytes;
101 type Error = Error;
102
103 fn poll_frame(
104 self: Pin<&mut Self>,
105 cx: &mut Context<'_>,
106 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
107 match self.project().inner.poll_frame(cx) {
108 Poll::Pending => Poll::Pending,
109 Poll::Ready(None) => Poll::Ready(None),
110 Poll::Ready(Some(Ok(f))) => Poll::Ready(Some(Ok(f))),
111 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
112 }
113 }
114
115 fn is_end_stream(&self) -> bool {
116 self.inner.is_end_stream()
117 }
118
119 fn size_hint(&self) -> SizeHint {
120 self.inner.size_hint()
121 }
122}
123
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::task::Waker;

    use super::*;
    use crate::error::{Error, ErrorKind};

    /// One scripted outcome of a single `poll_frame` call on [`ScriptedBody`].
    enum Step<E> {
        Data(Bytes),
        Err(E),
        End,
    }

    /// Shorthand for the return type of `poll_frame`.
    type PollFrame<D, E> = Poll<Option<Result<Frame<D>, E>>>;

    /// Test double that replays a fixed queue of [`Step`]s and reports configurable
    /// `size_hint` / `is_end_stream` values.
    struct ScriptedBody<E> {
        steps: VecDeque<Step<E>>,
        size_hint: SizeHint,
        is_end_stream: bool,
    }

    impl<E> ScriptedBody<E> {
        /// Creates a body that replays `steps` in order, with a default size hint and
        /// `is_end_stream == false`.
        fn new(steps: Vec<Step<E>>) -> Self {
            Self {
                steps: VecDeque::from(steps),
                size_hint: SizeHint::new(),
                is_end_stream: false,
            }
        }

        /// Overrides the size hint this body reports.
        fn with_size_hint(self, hint: SizeHint) -> Self {
            Self { size_hint: hint, ..self }
        }

        /// Overrides the end-of-stream flag this body reports.
        fn with_end_stream(self, end: bool) -> Self {
            Self { is_end_stream: end, ..self }
        }
    }

    impl<E> HttpBody for ScriptedBody<E>
    where
        E: Unpin,
    {
        type Data = Bytes;
        type Error = E;

        /// Pops the next scripted step; an exhausted script behaves like `Step::End`.
        fn poll_frame(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> PollFrame<Self::Data, Self::Error> {
            let next = self.get_mut().steps.pop_front();
            Poll::Ready(match next {
                Some(Step::Data(chunk)) => Some(Ok(Frame::data(chunk))),
                Some(Step::Err(err)) => Some(Err(err)),
                Some(Step::End) | None => None,
            })
        }

        fn is_end_stream(&self) -> bool {
            self.is_end_stream
        }

        fn size_hint(&self) -> SizeHint {
            self.size_hint.clone()
        }
    }

    /// Polls `body` exactly once using a no-op waker.
    fn poll_once<B: HttpBody + Unpin>(body: &mut B) -> PollFrame<B::Data, B::Error> {
        let mut cx = Context::from_waker(Waker::noop());
        Pin::new(body).poll_frame(&mut cx)
    }

    #[test]
    fn as_static_returns_inner_bytes_for_static_variant() {
        let payload = Bytes::from_static(b"hello");
        let body = Body::Static(payload.clone());

        let got = body.as_static().expect("static variant must yield Some");

        assert_eq!(got, &payload);
        assert_eq!(got.as_ref(), b"hello");
    }

    #[test]
    fn as_static_returns_none_for_empty_variant() {
        assert!(Body::Empty.as_static().is_none());
    }

    #[test]
    fn as_static_returns_none_for_stream_variant() {
        let producer: ScriptedBody<Error> = ScriptedBody::new(vec![Step::End]);
        assert!(Body::from_producer(producer).as_static().is_none());
    }

    #[test]
    fn empty_body_is_end_stream_and_zero_size_hint() {
        let body = Body::Empty;
        assert!(body.is_end_stream());
        assert_eq!(body.size_hint().exact(), Some(0));
    }

    #[test]
    fn static_body_reports_exact_size_and_not_end_of_stream() {
        let body = Body::Static(Bytes::from_static(b"hi"));
        assert!(!body.is_end_stream());
        assert_eq!(body.size_hint().exact(), Some(2));
    }

    #[test]
    fn static_body_yields_payload_then_end_of_stream() {
        let mut body = Body::Static(Bytes::from_static(b"hi"));

        let frame = match poll_once(&mut body) {
            Poll::Ready(Some(Ok(frame))) => frame,
            other => panic!("expected ready-data frame, got {other:?}"),
        };
        let data = frame.into_data().expect("first frame must be data");
        assert_eq!(data.as_ref(), b"hi");

        let other = poll_once(&mut body);
        assert!(
            matches!(other, Poll::Ready(None)),
            "expected end-of-stream after one data frame, got {other:?}",
        );
    }

    #[test]
    fn empty_body_yields_no_frames() {
        let mut body = Body::Empty;
        let other = poll_once(&mut body);
        assert!(
            matches!(other, Poll::Ready(None)),
            "Body::Empty must immediately report end-of-stream, got {other:?}",
        );
    }

    #[test]
    fn stream_body_delegates_is_end_stream_and_size_hint_to_inner() {
        let producer: ScriptedBody<Error> = ScriptedBody::new(vec![Step::End])
            .with_size_hint(SizeHint::with_exact(42))
            .with_end_stream(true);

        let body = Body::from_producer(producer);

        assert!(body.is_end_stream(), "Stream variant must forward inner is_end_stream");
        assert_eq!(body.size_hint().exact(), Some(42));
    }

    #[test]
    fn from_producer_passes_data_frames_through_unchanged() {
        let producer: ScriptedBody<Error> = ScriptedBody::new(vec![
            Step::Data(Bytes::from_static(b"one")),
            Step::Data(Bytes::from_static(b"two")),
            Step::End,
        ]);
        let mut body = Body::from_producer(producer);

        // Expected payload of each poll, paired with the failure message for that poll.
        let expectations: [(&[u8], &str); 2] = [
            (b"one", "first poll must yield a data frame"),
            (b"two", "second poll must yield a data frame"),
        ];
        for (want, failure) in expectations {
            let Poll::Ready(Some(Ok(frame))) = poll_once(&mut body) else {
                panic!("{failure}");
            };
            assert_eq!(frame.into_data().expect("data frame").as_ref(), want);
        }

        let other = poll_once(&mut body);
        assert!(
            matches!(other, Poll::Ready(None)),
            "exhausted stream must be Ready(None), got {other:?}",
        );
    }

    #[test]
    fn from_producer_converts_inner_error_via_into() {
        let io_err = std::io::Error::other("scripted-io-failure");
        let producer: ScriptedBody<std::io::Error> = ScriptedBody::new(vec![Step::Err(io_err)]);
        let mut body = Body::from_producer(producer);

        let err = match poll_once(&mut body) {
            Poll::Ready(Some(Err(err))) => err,
            other => panic!("expected Ready(Some(Err(..))) from failing producer, got {other:?}"),
        };
        assert!(matches!(err.kind(), ErrorKind::Io), "io::Error must map to ErrorKind::Io");
    }

    #[test]
    fn from_producer_preserves_end_of_stream_signal() {
        let producer: ScriptedBody<Error> = ScriptedBody::new(vec![]);
        let mut body = Body::from_producer(producer);

        let other = poll_once(&mut body);
        assert!(
            matches!(other, Poll::Ready(None)),
            "empty scripted producer must report end-of-stream, got {other:?}",
        );
    }

    #[test]
    fn from_producer_accepts_serde_json_error_source() {
        let parse_err: serde_json::Error =
            serde_json::from_str::<serde_json::Value>("{not json").unwrap_err();
        let producer: ScriptedBody<serde_json::Error> =
            ScriptedBody::new(vec![Step::Err(parse_err)]);
        let mut body = Body::from_producer(producer);

        let err = match poll_once(&mut body) {
            Poll::Ready(Some(Err(err))) => err,
            other => panic!("expected converted Compile error, got {other:?}"),
        };
        assert!(
            matches!(err.kind(), ErrorKind::Compile),
            "serde_json::Error must map to ErrorKind::Compile",
        );
    }
}