use crate::error::Error;
use futures::stream::{Stream, TryStreamExt};
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
use hyper::body::{Body, Bytes, Frame, Incoming, SizeHint};
use std::{
pin::Pin,
task::{Context, Poll},
};
#[derive(Debug, Default)]
pub enum HttpBody<D = Bytes> {
#[default]
Empty,
Incoming(Incoming),
Full(Full<D>),
Boxed(BoxBody<D, Error>),
}
impl HttpBody {
pub fn stream<S, D, E>(stream: S) -> Self
where
S: Stream<Item = Result<D, E>> + Send + Sync + 'static,
D: Into<Bytes>,
E: Into<Error> + 'static,
{
Self::Boxed(BodyExt::boxed(StreamBody::new(
stream
.map_ok(|data| Frame::<Bytes>::data(data.into()))
.map_err(Into::into),
)))
}
}
impl hyper::body::Body for HttpBody {
type Data = Bytes;
type Error = Error;
#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.get_mut() {
Self::Empty => Poll::Ready(None),
Self::Incoming(body) => Pin::new(body).poll_frame(cx).map_err(Error::from),
Self::Full(f) => Pin::new(f).poll_frame(cx).map_err(Error::from),
Self::Boxed(b) => Pin::new(b).poll_frame(cx),
}
}
fn is_end_stream(&self) -> bool {
match self {
Self::Empty => true,
Self::Full(f) => f.is_end_stream(),
Self::Boxed(b) => b.is_end_stream(),
Self::Incoming(body) => body.is_end_stream(),
}
}
fn size_hint(&self) -> SizeHint {
match self {
Self::Empty => SizeHint::with_exact(0),
Self::Full(f) => f.size_hint(),
Self::Boxed(b) => b.size_hint(),
Self::Incoming(body) => body.size_hint(),
}
}
}
impl Stream for HttpBody {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match match self.get_mut() {
Self::Empty => return Poll::Ready(None),
Self::Incoming(body) => Pin::new(body)
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
Self::Full(f) => Pin::new(f)
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
Self::Boxed(b) => Pin::new(b)
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
} {
Poll::Ready(Some(f)) => Poll::Ready(f.into_data().map(Ok).ok()),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let sh = match self {
Self::Empty => return (0, Some(0)),
Self::Incoming(b) => b.size_hint(),
Self::Full(f) => f.size_hint(),
Self::Boxed(b) => b.size_hint(),
};
(
usize::try_from(sh.lower()).unwrap_or(usize::MAX),
sh.upper().map(|v| usize::try_from(v).unwrap_or(usize::MAX)),
)
}
}
impl<D> From<()> for HttpBody<D> {
fn from(_: ()) -> Self {
Self::Empty
}
}
impl<D> From<Full<D>> for HttpBody<D> {
fn from(value: Full<D>) -> Self {
Self::Full(value)
}
}
impl<D> From<BoxBody<D, Error>> for HttpBody<D> {
fn from(value: BoxBody<D, Error>) -> Self {
Self::Boxed(value)
}
}
impl From<Bytes> for HttpBody {
fn from(value: Bytes) -> HttpBody {
Self::Full(Full::from(value))
}
}
impl From<Incoming> for HttpBody {
fn from(value: Incoming) -> HttpBody {
Self::Incoming(value)
}
}
impl From<String> for HttpBody {
#[inline]
fn from(value: String) -> HttpBody {
Self::Full(Full::from(value))
}
}
impl From<&'static [u8]> for HttpBody {
fn from(value: &'static [u8]) -> HttpBody {
Self::Full(Full::from(value))
}
}
impl From<&'static str> for HttpBody {
fn from(value: &'static str) -> HttpBody {
Self::Full(Full::from(value))
}
}
impl From<Vec<u8>> for HttpBody {
fn from(value: Vec<u8>) -> HttpBody {
Self::Full(Full::from(value))
}
}