Skip to main content

vane_core/
body.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use http_body::{Body as HttpBody, Frame, SizeHint};
6use pin_project_lite::pin_project;
7
8use crate::error::Error;
9
/// An [`http::Request`] whose payload type is this module's [`Body`].
pub type Request = http::Request<Body>;
/// An [`http::Response`] whose payload type is this module's [`Body`].
pub type Response = http::Response<Body>;
12
/// Message payload carried by [`Request`] and [`Response`].
///
/// Implements [`HttpBody`] with `Data = Bytes` and `Error = Error`, so all
/// three variants can be polled through the same interface.
pub enum Body {
	/// A fully buffered payload. When polled, a non-empty buffer is handed
	/// out as a single data frame and the variant is left holding an empty
	/// buffer; an empty buffer reports end-of-stream immediately.
	Static(Bytes),
	/// No payload at all: polling reports end-of-stream right away and the
	/// size hint is exactly zero.
	Empty,
	/// A boxed, type-erased producer; polling, `is_end_stream` and
	/// `size_hint` are all forwarded to it. Build one with
	/// [`Body::from_producer`].
	Stream(Pin<Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + 'static>>),
}
18
19impl Body {
20	#[must_use]
21	pub fn as_static(&self) -> Option<&Bytes> {
22		if let Self::Static(b) = self { Some(b) } else { None }
23	}
24
25	pub fn from_producer<B, E>(producer: B) -> Self
26	where
27		B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
28		E: Into<Error> + Send + Sync + 'static,
29	{
30		// One heap allocation — for the outer `dyn HttpBody` —
31		// instead of two. The inner adapter pin-projects to its
32		// stored producer in place via `pin_project_lite`, so we
33		// drop the prior `inner: Pin<Box<B>>` layer that existed
34		// only to dodge an `unsafe` projection.
35		Self::Stream(Box::pin(BodyStreamAdapter { inner: producer }))
36	}
37}
38
39impl HttpBody for Body {
40	type Data = Bytes;
41	type Error = Error;
42
43	fn poll_frame(
44		self: Pin<&mut Self>,
45		cx: &mut Context<'_>,
46	) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
47		match self.get_mut() {
48			Self::Static(b) => {
49				if b.is_empty() {
50					Poll::Ready(None)
51				} else {
52					let out = std::mem::take(b);
53					Poll::Ready(Some(Ok(Frame::data(out))))
54				}
55			}
56			Self::Empty => Poll::Ready(None),
57			Self::Stream(s) => s.as_mut().poll_frame(cx),
58		}
59	}
60
61	fn is_end_stream(&self) -> bool {
62		match self {
63			Self::Static(b) => b.is_empty(),
64			Self::Empty => true,
65			Self::Stream(s) => s.is_end_stream(),
66		}
67	}
68
69	fn size_hint(&self) -> SizeHint {
70		match self {
71			Self::Static(b) => SizeHint::with_exact(b.len() as u64),
72			Self::Empty => SizeHint::with_exact(0),
73			Self::Stream(s) => s.size_hint(),
74		}
75	}
76}
77
pin_project! {
	/// Erases the producer's error type into [`Error`] via the
	/// caller-supplied `Into` impl, leaving the rest of the
	/// [`HttpBody`] surface untouched. `pin_project_lite` generates a
	/// safe `project()` so the inner producer is stored by value and
	/// projected without an `unsafe` block (the prior shape stored it
	/// behind a `Pin<Box<B>>` purely to dodge `unsafe`).
	///
	/// Implementation detail of [`Body::from_producer`]; not part of
	/// the public API surface. Construct via `Body::from_producer`
	/// instead.
	pub(crate) struct BodyStreamAdapter<B> {
		// The wrapped producer, held inline. `#[pin]` makes `project()`
		// hand it back as `Pin<&mut B>`, which is what `poll_frame` needs.
		#[pin]
		inner: B,
	}
}
94
95impl<B, E> HttpBody for BodyStreamAdapter<B>
96where
97	B: HttpBody<Data = Bytes, Error = E> + Send + 'static,
98	E: Into<Error> + Send + Sync + 'static,
99{
100	type Data = Bytes;
101	type Error = Error;
102
103	fn poll_frame(
104		self: Pin<&mut Self>,
105		cx: &mut Context<'_>,
106	) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
107		match self.project().inner.poll_frame(cx) {
108			Poll::Pending => Poll::Pending,
109			Poll::Ready(None) => Poll::Ready(None),
110			Poll::Ready(Some(Ok(f))) => Poll::Ready(Some(Ok(f))),
111			Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
112		}
113	}
114
115	fn is_end_stream(&self) -> bool {
116		self.inner.is_end_stream()
117	}
118
119	fn size_hint(&self) -> SizeHint {
120		self.inner.size_hint()
121	}
122}
123
124#[cfg(test)]
125mod tests {
126	use std::collections::VecDeque;
127	use std::task::Waker;
128
129	use super::*;
130	use crate::error::{Error, ErrorKind};
131
	/// One scripted outcome for the hand-rolled `http_body::Body` fixture
	/// [`ScriptedBody`].
	///
	/// The fixture consumes its steps front to back, and each `Step` resolves
	/// synchronously under one `poll_frame` call. Polling after the script is
	/// exhausted does not panic: it reports end-of-stream, exactly like an
	/// explicit [`Step::End`]. `E` is the producer's declared error type so
	/// tests can exercise the `E: Into<Error>` conversion path in
	/// `BodyStreamAdapter`.
	enum Step<E> {
		/// Yield a data frame carrying these bytes.
		Data(Bytes),
		/// Yield this error.
		Err(E),
		/// Report end-of-stream.
		End,
	}
143
	/// Shorthand for what `HttpBody::poll_frame` returns for data type `D`
	/// and error type `E`.
	type PollFrame<D, E> = Poll<Option<Result<Frame<D>, E>>>;
145
	/// Test producer that replays a fixed queue of [`Step`]s and reports
	/// whatever `size_hint` / `is_end_stream` values it was configured with.
	struct ScriptedBody<E> {
		/// Remaining script; one entry is popped from the front per poll.
		steps: VecDeque<Step<E>>,
		/// Value returned (cloned) from `size_hint`.
		size_hint: SizeHint,
		/// Value returned from `is_end_stream`, independent of the script.
		is_end_stream: bool,
	}
151
152	impl<E> ScriptedBody<E> {
153		fn new(steps: Vec<Step<E>>) -> Self {
154			Self { steps: steps.into(), size_hint: SizeHint::new(), is_end_stream: false }
155		}
156
157		fn with_size_hint(mut self, hint: SizeHint) -> Self {
158			self.size_hint = hint;
159			self
160		}
161
162		fn with_end_stream(mut self, end: bool) -> Self {
163			self.is_end_stream = end;
164			self
165		}
166	}
167
168	impl<E> HttpBody for ScriptedBody<E>
169	where
170		E: Unpin,
171	{
172		type Data = Bytes;
173		type Error = E;
174
175		fn poll_frame(
176			self: Pin<&mut Self>,
177			_cx: &mut Context<'_>,
178		) -> PollFrame<Self::Data, Self::Error> {
179			let this = self.get_mut();
180			match this.steps.pop_front() {
181				Some(Step::Data(b)) => Poll::Ready(Some(Ok(Frame::data(b)))),
182				Some(Step::Err(e)) => Poll::Ready(Some(Err(e))),
183				Some(Step::End) | None => Poll::Ready(None),
184			}
185		}
186
187		fn is_end_stream(&self) -> bool {
188			self.is_end_stream
189		}
190
191		fn size_hint(&self) -> SizeHint {
192			self.size_hint.clone()
193		}
194	}
195
196	fn poll_once<B: HttpBody + Unpin>(body: &mut B) -> PollFrame<B::Data, B::Error> {
197		let waker = Waker::noop();
198		let mut cx = Context::from_waker(waker);
199		Pin::new(body).poll_frame(&mut cx)
200	}
201
202	#[test]
203	fn as_static_returns_inner_bytes_for_static_variant() {
204		let payload = Bytes::from_static(b"hello");
205		let body = Body::Static(payload.clone());
206		let got = body.as_static().expect("static variant must yield Some");
207		assert_eq!(got, &payload);
208		assert_eq!(got.as_ref(), b"hello");
209	}
210
211	#[test]
212	fn as_static_returns_none_for_empty_variant() {
213		let body = Body::Empty;
214		assert!(body.as_static().is_none());
215	}
216
217	#[test]
218	fn as_static_returns_none_for_stream_variant() {
219		let producer: ScriptedBody<Error> = ScriptedBody::new(vec![Step::End]);
220		let body = Body::from_producer(producer);
221		assert!(body.as_static().is_none());
222	}
223
224	#[test]
225	fn empty_body_is_end_stream_and_zero_size_hint() {
226		let body = Body::Empty;
227		assert!(body.is_end_stream());
228		let hint = body.size_hint();
229		assert_eq!(hint.exact(), Some(0));
230	}
231
232	#[test]
233	fn static_body_reports_exact_size_and_not_end_of_stream() {
234		let body = Body::Static(Bytes::from_static(b"hi"));
235		assert!(!body.is_end_stream());
236		assert_eq!(body.size_hint().exact(), Some(2));
237	}
238
239	#[test]
240	fn static_body_yields_payload_then_end_of_stream() {
241		let mut body = Body::Static(Bytes::from_static(b"hi"));
242		match poll_once(&mut body) {
243			Poll::Ready(Some(Ok(frame))) => {
244				let data = frame.into_data().expect("first frame must be data");
245				assert_eq!(data.as_ref(), b"hi");
246			}
247			other => panic!("expected ready-data frame, got {other:?}"),
248		}
249		match poll_once(&mut body) {
250			Poll::Ready(None) => {}
251			other => panic!("expected end-of-stream after one data frame, got {other:?}"),
252		}
253	}
254
255	#[test]
256	fn empty_body_yields_no_frames() {
257		let mut body = Body::Empty;
258		match poll_once(&mut body) {
259			Poll::Ready(None) => {}
260			other => panic!("Body::Empty must immediately report end-of-stream, got {other:?}"),
261		}
262	}
263
264	#[test]
265	fn stream_body_delegates_is_end_stream_and_size_hint_to_inner() {
266		let hint = SizeHint::with_exact(42);
267		let producer: ScriptedBody<Error> =
268			ScriptedBody::new(vec![Step::End]).with_size_hint(hint).with_end_stream(true);
269		let body = Body::from_producer(producer);
270		assert!(body.is_end_stream(), "Stream variant must forward inner is_end_stream");
271		assert_eq!(body.size_hint().exact(), Some(42));
272	}
273
274	#[test]
275	fn from_producer_passes_data_frames_through_unchanged() {
276		let producer: ScriptedBody<Error> = ScriptedBody::new(vec![
277			Step::Data(Bytes::from_static(b"one")),
278			Step::Data(Bytes::from_static(b"two")),
279			Step::End,
280		]);
281		let mut body = Body::from_producer(producer);
282
283		let Poll::Ready(Some(Ok(f1))) = poll_once(&mut body) else {
284			panic!("first poll must yield a data frame");
285		};
286		assert_eq!(f1.into_data().expect("data frame").as_ref(), b"one");
287
288		let Poll::Ready(Some(Ok(f2))) = poll_once(&mut body) else {
289			panic!("second poll must yield a data frame");
290		};
291		assert_eq!(f2.into_data().expect("data frame").as_ref(), b"two");
292
293		match poll_once(&mut body) {
294			Poll::Ready(None) => {}
295			other => panic!("exhausted stream must be Ready(None), got {other:?}"),
296		}
297	}
298
299	#[test]
300	fn from_producer_converts_inner_error_via_into() {
301		let io_err = std::io::Error::other("scripted-io-failure");
302		let producer: ScriptedBody<std::io::Error> = ScriptedBody::new(vec![Step::Err(io_err)]);
303		let mut body = Body::from_producer(producer);
304		match poll_once(&mut body) {
305			Poll::Ready(Some(Err(e))) => {
306				assert!(matches!(e.kind(), ErrorKind::Io), "io::Error must map to ErrorKind::Io");
307			}
308			other => panic!("expected Ready(Some(Err(..))) from failing producer, got {other:?}"),
309		}
310	}
311
312	#[test]
313	fn from_producer_preserves_end_of_stream_signal() {
314		let producer: ScriptedBody<Error> = ScriptedBody::new(vec![]);
315		let mut body = Body::from_producer(producer);
316		match poll_once(&mut body) {
317			Poll::Ready(None) => {}
318			other => panic!("empty scripted producer must report end-of-stream, got {other:?}"),
319		}
320	}
321
322	#[test]
323	fn from_producer_accepts_serde_json_error_source() {
324		let parse_err: serde_json::Error =
325			serde_json::from_str::<serde_json::Value>("{not json").unwrap_err();
326		let producer: ScriptedBody<serde_json::Error> = ScriptedBody::new(vec![Step::Err(parse_err)]);
327		let mut body = Body::from_producer(producer);
328		match poll_once(&mut body) {
329			Poll::Ready(Some(Err(e))) => {
330				assert!(
331					matches!(e.kind(), ErrorKind::Compile),
332					"serde_json::Error must map to ErrorKind::Compile",
333				);
334			}
335			other => panic!("expected converted Compile error, got {other:?}"),
336		}
337	}
338}