1use crate::error::FrameworkError;
8use bytes::Bytes;
9use http_body_util::{BodyExt, Full};
10use hyper::body::Incoming;
11use hyper::body::{Body, Frame, SizeHint};
12use serde::de::DeserializeOwned;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pub async fn collect_body(body: Incoming) -> Result<Bytes, FrameworkError> {
18 body.collect()
19 .await
20 .map(|collected| collected.to_bytes())
21 .map_err(|e| FrameworkError::internal(format!("Failed to read request body: {e}")))
22}
23
24pub fn parse_json<T: DeserializeOwned>(bytes: &Bytes) -> Result<T, FrameworkError> {
26 serde_json::from_slice(bytes)
27 .map_err(|e| FrameworkError::internal(format!("Failed to parse JSON body: {e}")))
28}
29
30pub fn parse_form<T: DeserializeOwned>(bytes: &Bytes) -> Result<T, FrameworkError> {
32 serde_urlencoded::from_bytes(bytes)
33 .map_err(|e| FrameworkError::internal(format!("Failed to parse form body: {e}")))
34}
35
/// HTTP body that is either a fully buffered payload or an SSE stream.
///
/// Both variants are driven through the single `Body` implementation on this
/// type, which forwards each call to the wrapped value.
// NOTE(review): presumably this is the framework's outgoing response body type;
// confirm against the callers that construct it.
pub enum FerroBody {
    /// A complete in-memory payload.
    Full(Full<Bytes>),
    /// A body backed by an [`SseStream`](crate::http::sse::SseStream).
    Stream(crate::http::sse::SseStream),
}
63
64impl std::fmt::Debug for FerroBody {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 match self {
67 FerroBody::Full(b) => write!(f, "FerroBody::Full({b:?})"),
68 FerroBody::Stream(_) => write!(f, "FerroBody::Stream(..)"),
69 }
70 }
71}
72
73impl Body for FerroBody {
74 type Data = Bytes;
75 type Error = std::convert::Infallible;
76
77 fn poll_frame(
78 mut self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 ) -> Poll<Option<Result<Frame<Bytes>, Self::Error>>> {
81 match &mut *self {
82 FerroBody::Full(b) => {
83 Pin::new(b).poll_frame(cx).map_err(|e| match e {})
85 }
86 FerroBody::Stream(s) => Pin::new(s).poll_frame(cx),
87 }
88 }
89
90 fn is_end_stream(&self) -> bool {
91 match self {
92 FerroBody::Full(b) => b.is_end_stream(),
93 FerroBody::Stream(s) => s.is_end_stream(),
94 }
95 }
96
97 fn size_hint(&self) -> SizeHint {
98 match self {
99 FerroBody::Full(b) => b.size_hint(),
100 FerroBody::Stream(s) => s.size_hint(),
101 }
102 }
103}
104
105impl FerroBody {
106 pub fn is_streaming(&self) -> bool {
114 matches!(self, FerroBody::Stream(_))
115 }
116}
117
118impl From<Full<Bytes>> for FerroBody {
119 fn from(b: Full<Bytes>) -> Self {
120 FerroBody::Full(b)
121 }
122}
123
#[cfg(test)]
mod ferro_body_tests {
    use super::*;
    use crate::http::sse::{SseEvent, SseStream};
    use futures_util::task::noop_waker;
    use http_body_util::BodyExt;

    /// Collecting a `Full` body yields exactly the bytes it was built from.
    #[tokio::test]
    async fn ferro_body_full_variant() {
        let payload = Bytes::from("hi");
        let body = FerroBody::Full(Full::new(payload.clone()));

        let collected = body.collect().await.unwrap().to_bytes();

        assert_eq!(collected, payload);
    }

    /// A `Stream` body returns a ready frame once an event has been sent.
    #[tokio::test]
    async fn ferro_body_stream_variant() {
        let (tx, stream) = SseStream::channel(4);
        // The event is sent before polling so that a single poll can observe it.
        tx.send(SseEvent::data("test")).await.unwrap();

        let mut body = FerroBody::Stream(stream);

        // Poll once by hand with a waker that does nothing when woken.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let frame = Body::poll_frame(Pin::new(&mut body), &mut cx);

        assert!(
            matches!(frame, Poll::Ready(Some(Ok(_)))),
            "expected Poll::Ready(Some(Ok(_))), got {frame:?}"
        );
    }
}