1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use http_body::{Body as HttpBody, Frame, SizeHint};
6
7use crate::error::Error;
8
9pub type Request = http::Request<Body>;
10pub type Response = http::Response<Body>;
11
12pub enum Body {
13 Static(Bytes),
14 Empty,
15 Stream(Pin<Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + 'static>>),
16}
17
18impl Body {
19 #[must_use]
20 pub fn as_static(&self) -> Option<&Bytes> {
21 if let Self::Static(b) = self { Some(b) } else { None }
22 }
23
24 pub fn from_producer<B, E>(producer: B) -> Self
25 where
26 B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
27 E: Into<Error> + Send + Sync + 'static,
28 {
29 Self::Stream(Box::pin(BodyStreamAdapter { inner: Box::pin(producer) }))
30 }
31}
32
33impl HttpBody for Body {
34 type Data = Bytes;
35 type Error = Error;
36
37 fn poll_frame(
38 self: Pin<&mut Self>,
39 cx: &mut Context<'_>,
40 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
41 match self.get_mut() {
42 Self::Static(b) => {
43 if b.is_empty() {
44 Poll::Ready(None)
45 } else {
46 let out = std::mem::take(b);
47 Poll::Ready(Some(Ok(Frame::data(out))))
48 }
49 }
50 Self::Empty => Poll::Ready(None),
51 Self::Stream(s) => s.as_mut().poll_frame(cx),
52 }
53 }
54
55 fn is_end_stream(&self) -> bool {
56 match self {
57 Self::Static(b) => b.is_empty(),
58 Self::Empty => true,
59 Self::Stream(s) => s.is_end_stream(),
60 }
61 }
62
63 fn size_hint(&self) -> SizeHint {
64 match self {
65 Self::Static(b) => SizeHint::with_exact(b.len() as u64),
66 Self::Empty => SizeHint::with_exact(0),
67 Self::Stream(s) => s.size_hint(),
68 }
69 }
70}
71
72pub struct BodyStreamAdapter<B> {
75 inner: Pin<Box<B>>,
76}
77
78impl<B, E> HttpBody for BodyStreamAdapter<B>
79where
80 B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
81 E: Into<Error> + Send + Sync + 'static,
82{
83 type Data = Bytes;
84 type Error = Error;
85
86 fn poll_frame(
87 self: Pin<&mut Self>,
88 cx: &mut Context<'_>,
89 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
90 match self.get_mut().inner.as_mut().poll_frame(cx) {
91 Poll::Pending => Poll::Pending,
92 Poll::Ready(None) => Poll::Ready(None),
93 Poll::Ready(Some(Ok(f))) => Poll::Ready(Some(Ok(f))),
94 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
95 }
96 }
97
98 fn is_end_stream(&self) -> bool {
99 self.inner.is_end_stream()
100 }
101
102 fn size_hint(&self) -> SizeHint {
103 self.inner.size_hint()
104 }
105}
106
107#[cfg(test)]
108mod tests {
109 use std::collections::VecDeque;
110 use std::task::Waker;
111
112 use super::*;
113 use crate::error::{Error, ErrorKind};
114
115 enum Step<E> {
122 Data(Bytes),
123 Err(E),
124 End,
125 }
126
127 type PollFrame<D, E> = Poll<Option<Result<Frame<D>, E>>>;
128
129 struct ScriptedBody<E> {
130 steps: VecDeque<Step<E>>,
131 size_hint: SizeHint,
132 is_end_stream: bool,
133 }
134
135 impl<E> ScriptedBody<E> {
136 fn new(steps: Vec<Step<E>>) -> Self {
137 Self { steps: steps.into(), size_hint: SizeHint::new(), is_end_stream: false }
138 }
139
140 fn with_size_hint(mut self, hint: SizeHint) -> Self {
141 self.size_hint = hint;
142 self
143 }
144
145 fn with_end_stream(mut self, end: bool) -> Self {
146 self.is_end_stream = end;
147 self
148 }
149 }
150
151 impl<E> HttpBody for ScriptedBody<E>
152 where
153 E: Unpin,
154 {
155 type Data = Bytes;
156 type Error = E;
157
158 fn poll_frame(
159 self: Pin<&mut Self>,
160 _cx: &mut Context<'_>,
161 ) -> PollFrame<Self::Data, Self::Error> {
162 let this = self.get_mut();
163 match this.steps.pop_front() {
164 Some(Step::Data(b)) => Poll::Ready(Some(Ok(Frame::data(b)))),
165 Some(Step::Err(e)) => Poll::Ready(Some(Err(e))),
166 Some(Step::End) | None => Poll::Ready(None),
167 }
168 }
169
170 fn is_end_stream(&self) -> bool {
171 self.is_end_stream
172 }
173
174 fn size_hint(&self) -> SizeHint {
175 self.size_hint.clone()
176 }
177 }
178
179 fn poll_once<B: HttpBody + Unpin>(body: &mut B) -> PollFrame<B::Data, B::Error> {
180 let waker = Waker::noop();
181 let mut cx = Context::from_waker(waker);
182 Pin::new(body).poll_frame(&mut cx)
183 }
184
185 #[test]
186 fn as_static_returns_inner_bytes_for_static_variant() {
187 let payload = Bytes::from_static(b"hello");
188 let body = Body::Static(payload.clone());
189 let got = body.as_static().expect("static variant must yield Some");
190 assert_eq!(got, &payload);
191 assert_eq!(got.as_ref(), b"hello");
192 }
193
194 #[test]
195 fn as_static_returns_none_for_empty_variant() {
196 let body = Body::Empty;
197 assert!(body.as_static().is_none());
198 }
199
200 #[test]
201 fn as_static_returns_none_for_stream_variant() {
202 let producer: ScriptedBody<Error> = ScriptedBody::new(vec![Step::End]);
203 let body = Body::from_producer(producer);
204 assert!(body.as_static().is_none());
205 }
206
207 #[test]
208 fn empty_body_is_end_stream_and_zero_size_hint() {
209 let body = Body::Empty;
210 assert!(body.is_end_stream());
211 let hint = body.size_hint();
212 assert_eq!(hint.exact(), Some(0));
213 }
214
215 #[test]
216 fn static_body_reports_exact_size_and_not_end_of_stream() {
217 let body = Body::Static(Bytes::from_static(b"hi"));
218 assert!(!body.is_end_stream());
219 assert_eq!(body.size_hint().exact(), Some(2));
220 }
221
222 #[test]
223 fn static_body_yields_payload_then_end_of_stream() {
224 let mut body = Body::Static(Bytes::from_static(b"hi"));
225 match poll_once(&mut body) {
226 Poll::Ready(Some(Ok(frame))) => {
227 let data = frame.into_data().expect("first frame must be data");
228 assert_eq!(data.as_ref(), b"hi");
229 }
230 other => panic!("expected ready-data frame, got {other:?}"),
231 }
232 match poll_once(&mut body) {
233 Poll::Ready(None) => {}
234 other => panic!("expected end-of-stream after one data frame, got {other:?}"),
235 }
236 }
237
238 #[test]
239 fn empty_body_yields_no_frames() {
240 let mut body = Body::Empty;
241 match poll_once(&mut body) {
242 Poll::Ready(None) => {}
243 other => panic!("Body::Empty must immediately report end-of-stream, got {other:?}"),
244 }
245 }
246
247 #[test]
248 fn stream_body_delegates_is_end_stream_and_size_hint_to_inner() {
249 let hint = SizeHint::with_exact(42);
250 let producer: ScriptedBody<Error> =
251 ScriptedBody::new(vec![Step::End]).with_size_hint(hint).with_end_stream(true);
252 let body = Body::from_producer(producer);
253 assert!(body.is_end_stream(), "Stream variant must forward inner is_end_stream");
254 assert_eq!(body.size_hint().exact(), Some(42));
255 }
256
257 #[test]
258 fn from_producer_passes_data_frames_through_unchanged() {
259 let producer: ScriptedBody<Error> = ScriptedBody::new(vec![
260 Step::Data(Bytes::from_static(b"one")),
261 Step::Data(Bytes::from_static(b"two")),
262 Step::End,
263 ]);
264 let mut body = Body::from_producer(producer);
265
266 let Poll::Ready(Some(Ok(f1))) = poll_once(&mut body) else {
267 panic!("first poll must yield a data frame");
268 };
269 assert_eq!(f1.into_data().expect("data frame").as_ref(), b"one");
270
271 let Poll::Ready(Some(Ok(f2))) = poll_once(&mut body) else {
272 panic!("second poll must yield a data frame");
273 };
274 assert_eq!(f2.into_data().expect("data frame").as_ref(), b"two");
275
276 match poll_once(&mut body) {
277 Poll::Ready(None) => {}
278 other => panic!("exhausted stream must be Ready(None), got {other:?}"),
279 }
280 }
281
282 #[test]
283 fn from_producer_converts_inner_error_via_into() {
284 let io_err = std::io::Error::other("scripted-io-failure");
285 let producer: ScriptedBody<std::io::Error> = ScriptedBody::new(vec![Step::Err(io_err)]);
286 let mut body = Body::from_producer(producer);
287 match poll_once(&mut body) {
288 Poll::Ready(Some(Err(e))) => {
289 assert!(matches!(e.kind(), ErrorKind::Io), "io::Error must map to ErrorKind::Io");
290 }
291 other => panic!("expected Ready(Some(Err(..))) from failing producer, got {other:?}"),
292 }
293 }
294
295 #[test]
296 fn from_producer_preserves_end_of_stream_signal() {
297 let producer: ScriptedBody<Error> = ScriptedBody::new(vec![]);
298 let mut body = Body::from_producer(producer);
299 match poll_once(&mut body) {
300 Poll::Ready(None) => {}
301 other => panic!("empty scripted producer must report end-of-stream, got {other:?}"),
302 }
303 }
304
305 #[test]
306 fn from_producer_accepts_serde_json_error_source() {
307 let parse_err: serde_json::Error =
308 serde_json::from_str::<serde_json::Value>("{not json").unwrap_err();
309 let producer: ScriptedBody<serde_json::Error> = ScriptedBody::new(vec![Step::Err(parse_err)]);
310 let mut body = Body::from_producer(producer);
311 match poll_once(&mut body) {
312 Poll::Ready(Some(Err(e))) => {
313 assert!(
314 matches!(e.kind(), ErrorKind::Compile),
315 "serde_json::Error must map to ErrorKind::Compile",
316 );
317 }
318 other => panic!("expected converted Compile error, got {other:?}"),
319 }
320 }
321}