Skip to main content

vane_core/
body.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use http_body::{Body as HttpBody, Frame, SizeHint};
6
7use crate::error::Error;
8
9pub type Request = http::Request<Body>;
10pub type Response = http::Response<Body>;
11
12pub enum Body {
13	Static(Bytes),
14	Empty,
15	Stream(Pin<Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + 'static>>),
16}
17
18impl Body {
19	#[must_use]
20	pub fn as_static(&self) -> Option<&Bytes> {
21		if let Self::Static(b) = self { Some(b) } else { None }
22	}
23
24	pub fn from_producer<B, E>(producer: B) -> Self
25	where
26		B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
27		E: Into<Error> + Send + Sync + 'static,
28	{
29		Self::Stream(Box::pin(BodyStreamAdapter { inner: Box::pin(producer) }))
30	}
31}
32
33impl HttpBody for Body {
34	type Data = Bytes;
35	type Error = Error;
36
37	fn poll_frame(
38		self: Pin<&mut Self>,
39		cx: &mut Context<'_>,
40	) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
41		match self.get_mut() {
42			Self::Static(b) => {
43				if b.is_empty() {
44					Poll::Ready(None)
45				} else {
46					let out = std::mem::take(b);
47					Poll::Ready(Some(Ok(Frame::data(out))))
48				}
49			}
50			Self::Empty => Poll::Ready(None),
51			Self::Stream(s) => s.as_mut().poll_frame(cx),
52		}
53	}
54
55	fn is_end_stream(&self) -> bool {
56		match self {
57			Self::Static(b) => b.is_empty(),
58			Self::Empty => true,
59			Self::Stream(s) => s.is_end_stream(),
60		}
61	}
62
63	fn size_hint(&self) -> SizeHint {
64		match self {
65			Self::Static(b) => SizeHint::with_exact(b.len() as u64),
66			Self::Empty => SizeHint::with_exact(0),
67			Self::Stream(s) => s.size_hint(),
68		}
69	}
70}
71
/// Adapter that exposes a frame producer `B` as a body whose error type is the
/// crate [`Error`].
pub struct BodyStreamAdapter<B> {
	// Stored as `Pin<Box<B>>` instead of a bare `B` so polling needs no unsafe
	// pin projection; the extra heap indirection is what `unsafe_code = deny`
	// costs here.
	inner: Pin<Box<B>>,
}
77
78impl<B, E> HttpBody for BodyStreamAdapter<B>
79where
80	B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
81	E: Into<Error> + Send + Sync + 'static,
82{
83	type Data = Bytes;
84	type Error = Error;
85
86	fn poll_frame(
87		self: Pin<&mut Self>,
88		cx: &mut Context<'_>,
89	) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
90		match self.get_mut().inner.as_mut().poll_frame(cx) {
91			Poll::Pending => Poll::Pending,
92			Poll::Ready(None) => Poll::Ready(None),
93			Poll::Ready(Some(Ok(f))) => Poll::Ready(Some(Ok(f))),
94			Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
95		}
96	}
97
98	fn is_end_stream(&self) -> bool {
99		self.inner.is_end_stream()
100	}
101
102	fn size_hint(&self) -> SizeHint {
103		self.inner.size_hint()
104	}
105}
106
#[cfg(test)]
mod tests {
	use std::collections::VecDeque;
	use std::task::Waker;

	use super::*;
	use crate::error::{Error, ErrorKind};

	/// One scripted outcome of a single `poll_frame` call on [`ScriptedBody`].
	enum Step<E> {
		/// Yield a data frame carrying these bytes.
		Data(Bytes),
		/// Yield this error.
		Err(E),
		/// Report end-of-stream.
		End,
	}

	/// Shorthand for the return type of `poll_frame`.
	type PollFrame<D, E> = Poll<Option<Result<Frame<D>, E>>>;

	/// A hand-rolled `http_body::Body` fixture driven from a scripted frame queue.
	///
	/// Each `Step` resolves synchronously under one `poll_frame` call. Once the
	/// script is exhausted, further polls keep returning `Poll::Ready(None)`;
	/// the fixture never panics and never returns `Poll::Pending`. `E` is the
	/// producer's declared error type so tests can exercise the
	/// `E: Into<Error>` conversion path in `BodyStreamAdapter`.
	///
	/// `size_hint` and `is_end_stream` are plain configured values and are not
	/// derived from the script.
	struct ScriptedBody<E> {
		steps: VecDeque<Step<E>>,
		size_hint: SizeHint,
		is_end_stream: bool,
	}

	impl<E> ScriptedBody<E> {
		/// Builds a fixture that plays `steps` in order, with a default size
		/// hint and `is_end_stream() == false`.
		fn new(steps: Vec<Step<E>>) -> Self {
			Self { steps: steps.into(), size_hint: SizeHint::new(), is_end_stream: false }
		}

		/// Overrides the value returned by `size_hint`.
		fn with_size_hint(mut self, hint: SizeHint) -> Self {
			self.size_hint = hint;
			self
		}

		/// Overrides the value returned by `is_end_stream`.
		fn with_end_stream(mut self, end: bool) -> Self {
			self.is_end_stream = end;
			self
		}
	}

	impl<E> HttpBody for ScriptedBody<E>
	where
		E: Unpin,
	{
		type Data = Bytes;
		type Error = E;

		fn poll_frame(
			self: Pin<&mut Self>,
			_cx: &mut Context<'_>,
		) -> PollFrame<Self::Data, Self::Error> {
			let this = self.get_mut();
			match this.steps.pop_front() {
				Some(Step::Data(b)) => Poll::Ready(Some(Ok(Frame::data(b)))),
				Some(Step::Err(e)) => Poll::Ready(Some(Err(e))),
				// An explicit `End` and an exhausted script are indistinguishable
				// to the caller: both report end-of-stream.
				Some(Step::End) | None => Poll::Ready(None),
			}
		}

		fn is_end_stream(&self) -> bool {
			self.is_end_stream
		}

		fn size_hint(&self) -> SizeHint {
			self.size_hint.clone()
		}
	}

	/// Polls `body` exactly once with a no-op waker and returns the raw result.
	fn poll_once<B: HttpBody + Unpin>(body: &mut B) -> PollFrame<B::Data, B::Error> {
		let waker = Waker::noop();
		let mut cx = Context::from_waker(waker);
		Pin::new(body).poll_frame(&mut cx)
	}

	#[test]
	fn scripted_body_keeps_reporting_end_of_stream_past_script() {
		let mut producer: ScriptedBody<Error> = ScriptedBody::new(vec![Step::End]);
		// First poll consumes `End`; the second runs past the script.
		for _ in 0..2 {
			match poll_once(&mut producer) {
				Poll::Ready(None) => {}
				other => panic!("exhausted script must yield Ready(None), got {other:?}"),
			}
		}
	}

	#[test]
	fn as_static_returns_inner_bytes_for_static_variant() {
		let payload = Bytes::from_static(b"hello");
		let body = Body::Static(payload.clone());
		let got = body.as_static().expect("static variant must yield Some");
		assert_eq!(got, &payload);
		assert_eq!(got.as_ref(), b"hello");
	}

	#[test]
	fn as_static_returns_none_for_empty_variant() {
		let body = Body::Empty;
		assert!(body.as_static().is_none());
	}

	#[test]
	fn as_static_returns_none_for_stream_variant() {
		let producer: ScriptedBody<Error> = ScriptedBody::new(vec![Step::End]);
		let body = Body::from_producer(producer);
		assert!(body.as_static().is_none());
	}

	#[test]
	fn empty_body_is_end_stream_and_zero_size_hint() {
		let body = Body::Empty;
		assert!(body.is_end_stream());
		let hint = body.size_hint();
		assert_eq!(hint.exact(), Some(0));
	}

	#[test]
	fn static_body_reports_exact_size_and_not_end_of_stream() {
		let body = Body::Static(Bytes::from_static(b"hi"));
		assert!(!body.is_end_stream());
		assert_eq!(body.size_hint().exact(), Some(2));
	}

	#[test]
	fn static_body_yields_payload_then_end_of_stream() {
		let mut body = Body::Static(Bytes::from_static(b"hi"));
		match poll_once(&mut body) {
			Poll::Ready(Some(Ok(frame))) => {
				let data = frame.into_data().expect("first frame must be data");
				assert_eq!(data.as_ref(), b"hi");
			}
			other => panic!("expected ready-data frame, got {other:?}"),
		}
		match poll_once(&mut body) {
			Poll::Ready(None) => {}
			other => panic!("expected end-of-stream after one data frame, got {other:?}"),
		}
	}

	#[test]
	fn empty_body_yields_no_frames() {
		let mut body = Body::Empty;
		match poll_once(&mut body) {
			Poll::Ready(None) => {}
			other => panic!("Body::Empty must immediately report end-of-stream, got {other:?}"),
		}
	}

	#[test]
	fn stream_body_delegates_is_end_stream_and_size_hint_to_inner() {
		let hint = SizeHint::with_exact(42);
		let producer: ScriptedBody<Error> =
			ScriptedBody::new(vec![Step::End]).with_size_hint(hint).with_end_stream(true);
		let body = Body::from_producer(producer);
		assert!(body.is_end_stream(), "Stream variant must forward inner is_end_stream");
		assert_eq!(body.size_hint().exact(), Some(42));
	}

	#[test]
	fn from_producer_passes_data_frames_through_unchanged() {
		let producer: ScriptedBody<Error> = ScriptedBody::new(vec![
			Step::Data(Bytes::from_static(b"one")),
			Step::Data(Bytes::from_static(b"two")),
			Step::End,
		]);
		let mut body = Body::from_producer(producer);

		let Poll::Ready(Some(Ok(f1))) = poll_once(&mut body) else {
			panic!("first poll must yield a data frame");
		};
		assert_eq!(f1.into_data().expect("data frame").as_ref(), b"one");

		let Poll::Ready(Some(Ok(f2))) = poll_once(&mut body) else {
			panic!("second poll must yield a data frame");
		};
		assert_eq!(f2.into_data().expect("data frame").as_ref(), b"two");

		match poll_once(&mut body) {
			Poll::Ready(None) => {}
			other => panic!("exhausted stream must be Ready(None), got {other:?}"),
		}
	}

	#[test]
	fn from_producer_converts_inner_error_via_into() {
		let io_err = std::io::Error::other("scripted-io-failure");
		let producer: ScriptedBody<std::io::Error> = ScriptedBody::new(vec![Step::Err(io_err)]);
		let mut body = Body::from_producer(producer);
		match poll_once(&mut body) {
			Poll::Ready(Some(Err(e))) => {
				assert!(matches!(e.kind(), ErrorKind::Io), "io::Error must map to ErrorKind::Io");
			}
			other => panic!("expected Ready(Some(Err(..))) from failing producer, got {other:?}"),
		}
	}

	#[test]
	fn from_producer_preserves_end_of_stream_signal() {
		let producer: ScriptedBody<Error> = ScriptedBody::new(vec![]);
		let mut body = Body::from_producer(producer);
		match poll_once(&mut body) {
			Poll::Ready(None) => {}
			other => panic!("empty scripted producer must report end-of-stream, got {other:?}"),
		}
	}

	#[test]
	fn from_producer_accepts_serde_json_error_source() {
		let parse_err: serde_json::Error =
			serde_json::from_str::<serde_json::Value>("{not json").unwrap_err();
		let producer: ScriptedBody<serde_json::Error> = ScriptedBody::new(vec![Step::Err(parse_err)]);
		let mut body = Body::from_producer(producer);
		match poll_once(&mut body) {
			Poll::Ready(Some(Err(e))) => {
				assert!(
					matches!(e.kind(), ErrorKind::Compile),
					"serde_json::Error must map to ErrorKind::Compile",
				);
			}
			other => panic!("expected converted Compile error, got {other:?}"),
		}
	}
}