use crate::error::FrameworkError;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::body::{Body, Frame, SizeHint};
use serde::de::DeserializeOwned;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Reads an incoming request body to completion and returns its data as a
/// single [`Bytes`] value.
///
/// # Errors
///
/// If collecting the body fails, the underlying error is formatted into the
/// message of a [`FrameworkError`] built with `FrameworkError::internal`.
pub async fn collect_body(body: Incoming) -> Result<Bytes, FrameworkError> {
    match body.collect().await {
        Ok(collected) => Ok(collected.to_bytes()),
        Err(e) => Err(FrameworkError::internal(format!(
            "Failed to read request body: {e}"
        ))),
    }
}
/// Deserializes `bytes` as JSON into any type implementing
/// [`DeserializeOwned`].
///
/// # Errors
///
/// A deserialization failure is formatted into the message of a
/// [`FrameworkError`] built with `FrameworkError::internal`.
pub fn parse_json<T: DeserializeOwned>(bytes: &Bytes) -> Result<T, FrameworkError> {
    // NOTE(review): malformed input is reported through `internal`; confirm
    // that this maps to the intended status for client-side errors.
    match serde_json::from_slice::<T>(bytes) {
        Ok(value) => Ok(value),
        Err(e) => Err(FrameworkError::internal(format!(
            "Failed to parse JSON body: {e}"
        ))),
    }
}
/// Deserializes `bytes` as URL-encoded form data into any type implementing
/// [`DeserializeOwned`].
///
/// # Errors
///
/// A deserialization failure is formatted into the message of a
/// [`FrameworkError`] built with `FrameworkError::internal`.
pub fn parse_form<T: DeserializeOwned>(bytes: &Bytes) -> Result<T, FrameworkError> {
    // NOTE(review): malformed input is reported through `internal`; confirm
    // that this maps to the intended status for client-side errors.
    match serde_urlencoded::from_bytes::<T>(bytes) {
        Ok(value) => Ok(value),
        Err(e) => Err(FrameworkError::internal(format!(
            "Failed to parse form body: {e}"
        ))),
    }
}
/// Body type that is either a fully buffered payload or an SSE stream.
///
/// Implements [`Body`] by delegating every call to the active variant.
pub enum FerroBody {
/// A complete in-memory payload wrapped in [`Full`].
Full(Full<Bytes>),
/// A body backed by [`crate::http::sse::SseStream`].
Stream(crate::http::sse::SseStream),
}
impl std::fmt::Debug for FerroBody {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FerroBody::Full(b) => write!(f, "FerroBody::Full({b:?})"),
FerroBody::Stream(_) => write!(f, "FerroBody::Stream(..)"),
}
}
}
/// [`Body`] implementation that forwards each call to whichever variant is
/// active. Frames carry [`Bytes`] and the error type is
/// [`std::convert::Infallible`].
impl Body for FerroBody {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    /// Polls the active variant for its next frame.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Bytes>, Self::Error>>> {
        match self.get_mut() {
            Self::Full(full) => {
                let polled = Pin::new(full).poll_frame(cx);
                // The inner error type has no values, so this conversion can
                // never actually run; it only reconciles the types.
                polled.map_err(|never| match never {})
            }
            Self::Stream(stream) => Pin::new(stream).poll_frame(cx),
        }
    }

    /// Reports whether the active variant has reached the end of its data.
    fn is_end_stream(&self) -> bool {
        match self {
            Self::Full(full) => full.is_end_stream(),
            Self::Stream(stream) => stream.is_end_stream(),
        }
    }

    /// Returns the size hint of the active variant.
    fn size_hint(&self) -> SizeHint {
        match self {
            Self::Full(full) => full.size_hint(),
            Self::Stream(stream) => stream.size_hint(),
        }
    }
}
impl FerroBody {
pub fn is_streaming(&self) -> bool {
matches!(self, FerroBody::Stream(_))
}
}
impl From<Full<Bytes>> for FerroBody {
fn from(b: Full<Bytes>) -> Self {
FerroBody::Full(b)
}
}
#[cfg(test)]
mod ferro_body_tests {
    use super::*;
    use crate::http::sse::{SseEvent, SseStream};
    use futures_util::task::noop_waker;
    use http_body_util::BodyExt;

    /// Collecting a `Full` body yields exactly the bytes it was built from.
    #[tokio::test]
    async fn ferro_body_full_variant() {
        let payload = Full::new(Bytes::from("hi"));
        let body = FerroBody::Full(payload);

        let aggregated = body.collect().await.unwrap();

        assert_eq!(aggregated.to_bytes(), Bytes::from("hi"));
    }

    /// After one event is sent on the channel, a single poll of a `Stream`
    /// body is expected to produce a ready frame.
    #[tokio::test]
    async fn ferro_body_stream_variant() {
        let (sender, sse_stream) = SseStream::channel(4);
        sender.send(SseEvent::data("test")).await.unwrap();
        let mut body = FerroBody::Stream(sse_stream);

        // Poll by hand with a waker that does nothing.
        let noop = noop_waker();
        let mut cx = Context::from_waker(&noop);
        let frame = Pin::new(&mut body).poll_frame(&mut cx);

        let produced_frame = matches!(frame, Poll::Ready(Some(Ok(_))));
        assert!(
            produced_frame,
            "expected Poll::Ready(Some(Ok(_))), got {frame:?}"
        );
    }
}