Skip to main content

ferro_rs/http/
body.rs

1//! HTTP body types and parsing utilities.
2//!
3//! This module provides:
4//! - [`FerroBody`] — the unified response body type for the framework's hyper serve loop.
5//! - Request-body collection and parsing helpers.
6
7use crate::error::FrameworkError;
8use bytes::Bytes;
9use http_body_util::{BodyExt, Full};
10use hyper::body::Incoming;
11use hyper::body::{Body, Frame, SizeHint};
12use serde::de::DeserializeOwned;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Collect the full body from an Incoming stream
17pub async fn collect_body(body: Incoming) -> Result<Bytes, FrameworkError> {
18    body.collect()
19        .await
20        .map(|collected| collected.to_bytes())
21        .map_err(|e| FrameworkError::internal(format!("Failed to read request body: {e}")))
22}
23
24/// Parse bytes as JSON into the target type
25pub fn parse_json<T: DeserializeOwned>(bytes: &Bytes) -> Result<T, FrameworkError> {
26    serde_json::from_slice(bytes)
27        .map_err(|e| FrameworkError::internal(format!("Failed to parse JSON body: {e}")))
28}
29
30/// Parse bytes as form-urlencoded into the target type
31pub fn parse_form<T: DeserializeOwned>(bytes: &Bytes) -> Result<T, FrameworkError> {
32    serde_urlencoded::from_bytes(bytes)
33        .map_err(|e| FrameworkError::internal(format!("Failed to parse form body: {e}")))
34}
35
36// ──────────────────────────────────────────────────────────────────────────────
37// FerroBody — unified response body for the hyper serve loop
38// ──────────────────────────────────────────────────────────────────────────────
39
/// Unified HTTP response body for the framework's hyper serve loop.
///
/// A `FerroBody` is exactly one of two concrete body types, stored inline in the enum:
///
/// - [`FerroBody::Full`] — a fully-buffered body ([`Full<Bytes>`]).
/// - [`FerroBody::Stream`] — a streaming SSE body
///   ([`SseStream`](crate::http::sse::SseStream)).
///
/// This lets the same `hyper::Response<FerroBody>` type flow through all response-path code
/// without dynamic dispatch or heap allocation on the buffered hot path.
///
/// # Structural compression rule (D-06)
///
/// If a compression layer is added in a future phase it MUST match on `FerroBody::Full` only
/// and pass `FerroBody::Stream` through untouched. A streaming body cannot be whole-body
/// buffered without breaking the SSE protocol.
///
/// # Error type
///
/// The [`Body`] implementation for `FerroBody` uses `Error = Infallible` — both variants are
/// infallible, so the serve loop's `Ok::<_, Infallible>(...)` wrapper is unchanged.
pub enum FerroBody {
    /// Fully-buffered response body (the common, allocation-free path).
    Full(Full<Bytes>),
    /// Streaming SSE body (held-open connection; yields frames as events arrive).
    Stream(crate::http::sse::SseStream),
}
63
64impl std::fmt::Debug for FerroBody {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        match self {
67            FerroBody::Full(b) => write!(f, "FerroBody::Full({b:?})"),
68            FerroBody::Stream(_) => write!(f, "FerroBody::Stream(..)"),
69        }
70    }
71}
72
73impl Body for FerroBody {
74    type Data = Bytes;
75    type Error = std::convert::Infallible;
76
77    fn poll_frame(
78        mut self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80    ) -> Poll<Option<Result<Frame<Bytes>, Self::Error>>> {
81        match &mut *self {
82            FerroBody::Full(b) => {
83                // Full<Bytes>::Error = Infallible — map_err is a no-op but satisfies the type.
84                Pin::new(b).poll_frame(cx).map_err(|e| match e {})
85            }
86            FerroBody::Stream(s) => Pin::new(s).poll_frame(cx),
87        }
88    }
89
90    fn is_end_stream(&self) -> bool {
91        match self {
92            FerroBody::Full(b) => b.is_end_stream(),
93            FerroBody::Stream(s) => s.is_end_stream(),
94        }
95    }
96
97    fn size_hint(&self) -> SizeHint {
98        match self {
99            FerroBody::Full(b) => b.size_hint(),
100            FerroBody::Stream(s) => s.size_hint(),
101        }
102    }
103}
104
105impl FerroBody {
106    /// Returns `true` if this body is the streaming `Stream` variant.
107    ///
108    /// # Structural compression rule (D-06)
109    ///
110    /// If a compression layer is added in a future phase, it must check `is_streaming()` and
111    /// pass `FerroBody::Stream` bodies through untouched — only `FerroBody::Full` can be
112    /// whole-body buffered without breaking the SSE protocol.
113    pub fn is_streaming(&self) -> bool {
114        matches!(self, FerroBody::Stream(_))
115    }
116}
117
118impl From<Full<Bytes>> for FerroBody {
119    fn from(b: Full<Bytes>) -> Self {
120        FerroBody::Full(b)
121    }
122}
123
#[cfg(test)]
mod ferro_body_tests {
    use super::*;
    use crate::http::sse::{SseEvent, SseStream};
    use futures_util::task::noop_waker;
    use http_body_util::BodyExt;

    /// T-168-05: collecting a `FerroBody::Full` yields exactly the bytes it was built from.
    #[tokio::test]
    async fn ferro_body_full_variant() {
        let buffered = Full::new(Bytes::from("hi"));
        let body = FerroBody::Full(buffered);

        let collected = body.collect().await.unwrap().to_bytes();

        assert_eq!(collected, Bytes::from("hi"));
    }

    /// T-168-06: polling a `FerroBody::Stream` forwards to the wrapped `SseStream`.
    #[tokio::test]
    async fn ferro_body_stream_variant() {
        // Queue one event before polling so a frame is expected on the first poll.
        let (tx, stream) = SseStream::channel(4);
        tx.send(SseEvent::data("test")).await.unwrap();
        let mut body = FerroBody::Stream(stream);

        // Poll once by hand with a waker that does nothing.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let frame = Pin::new(&mut body).poll_frame(&mut cx);

        assert!(
            matches!(frame, Poll::Ready(Some(Ok(_)))),
            "expected Poll::Ready(Some(Ok(_))), got {frame:?}"
        );
    }
}
155}