http-request-body 0.1.0

HTTP Request Body
Documentation
#[cfg(feature = "hyper-request-body")]
pub use hyper_request_body::{self, Body as HyperRequestBody};
#[cfg(feature = "warp-request-body")]
pub use warp_request_body::{self, Body as WarpRequestBody};

use core::{
    pin::Pin,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures_util::Stream;
use pin_project_lite::pin_project;

pub mod error;
mod utils;

use error::Error;

//
pin_project! {
    #[project = BodyProj]
    pub enum Body {
        Bytes { inner: Bytes },
        Stream { #[pin] inner: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>> },
    }
}

impl core::fmt::Debug for Body {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Bytes { inner } => f.debug_tuple("Bytes").field(&inner).finish(),
            Self::Stream { inner: _ } => write!(f, "Stream"),
        }
    }
}

impl core::fmt::Display for Body {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "{self:?}")
    }
}

impl Default for Body {
    fn default() -> Self {
        Self::Bytes {
            inner: Bytes::default(),
        }
    }
}

//
impl Body {
    pub fn with_bytes_from<T>(v: T) -> Self
    where
        T: Into<Bytes>,
    {
        Self::Bytes { inner: v.into() }
    }

    pub fn with_bytes(bytes: Bytes) -> Self {
        Self::Bytes { inner: bytes }
    }

    pub fn with_stream(stream: impl Stream<Item = Result<Bytes, Error>> + Send + 'static) -> Self {
        Self::Stream {
            inner: Box::pin(stream),
        }
    }

    #[cfg(feature = "hyper-request-body")]
    pub fn from_hyper_body(body: hyper_request_body::HyperBody) -> Self {
        use futures_util::TryStreamExt as _;

        Self::with_stream(body.map_err(|err| Error::Other(err.into())))
    }

    #[cfg(feature = "hyper-request-body")]
    pub fn from_hyper_request_body(body: hyper_request_body::Body) -> Self {
        use futures_util::TryStreamExt as _;
        use hyper_request_body::error::Error as HyperRequestBodyError;

        match body {
            hyper_request_body::Body::HyperBody { inner } => {
                Self::with_stream(inner.map_err(|err| Error::Other(err.into())))
            }
            body => Self::with_stream(body.map_err(|err| match err {
                HyperRequestBodyError::HyperError(err) => Error::Other(err.into()),
                HyperRequestBodyError::Other(err) => Error::Other(err),
            })),
        }
    }

    #[cfg(feature = "warp-request-body")]
    pub fn from_warp_request_body(body: warp_request_body::Body) -> Self {
        use futures_util::TryStreamExt as _;
        use warp_request_body::error::Error as WarpRequestBodyError;

        match body {
            warp_request_body::Body::HyperBody { inner } => {
                Self::with_stream(inner.map_err(|err| Error::Other(err.into())))
            }
            body => Self::with_stream(body.map_err(|err| match err {
                WarpRequestBodyError::HyperError(err) => Error::Other(err.into()),
                WarpRequestBodyError::WarpError(err) => Error::Other(err.into()),
            })),
        }
    }
}

#[cfg(feature = "hyper-request-body")]
impl From<hyper_request_body::HyperBody> for Body {
    fn from(body: hyper_request_body::HyperBody) -> Self {
        Self::from_hyper_body(body)
    }
}

#[cfg(feature = "hyper-request-body")]
impl From<hyper_request_body::Body> for Body {
    fn from(body: hyper_request_body::Body) -> Self {
        Self::from_hyper_request_body(body)
    }
}

#[cfg(feature = "warp-request-body")]
impl From<warp_request_body::Body> for Body {
    fn from(body: warp_request_body::Body) -> Self {
        Self::from_warp_request_body(body)
    }
}

impl Body {
    pub fn require_to_bytes_async(&self) -> bool {
        matches!(self, Self::Stream { inner: _ })
    }

    pub fn to_bytes(self) -> Bytes {
        match self {
            Self::Bytes { inner } => inner,
            Self::Stream { inner: _ } => panic!("Please call require_to_bytes_async first"),
        }
    }

    pub async fn to_bytes_async(self) -> Result<Bytes, Error> {
        match self {
            Self::Bytes { inner } => Ok(inner),
            Self::Stream { inner } => utils::bytes_stream_to_bytes(inner)
                .await
                .map_err(Into::into),
        }
    }
}

//
impl Stream for Body {
    type Item = Result<Bytes, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.project() {
            BodyProj::Bytes { inner } => {
                if !inner.is_empty() {
                    let bytes = inner.clone();
                    inner.clear();
                    Poll::Ready(Some(Ok(bytes)))
                } else {
                    Poll::Ready(None)
                }
            }
            BodyProj::Stream { inner } => inner.poll_next(cx).map_err(Into::into),
        }
    }
}

#[cfg(test)]
mod tests {
    use futures_util::StreamExt as _;

    use super::*;

    #[tokio::test]
    async fn test_with_bytes() {
        //
        let bytes = Bytes::from_static(b"foo");
        let body = Body::with_bytes(bytes);
        assert!(matches!(body, Body::Bytes { inner: _ }));
        assert!(!body.require_to_bytes_async());
        assert_eq!(body.to_bytes(), Bytes::copy_from_slice(b"foo"));
    }

    #[tokio::test]
    async fn test_with_stream() {
        //
        let stream =
            futures_util::stream::once(async { Ok(Bytes::copy_from_slice(b"foo")) }).boxed();
        let body = Body::with_stream(stream);
        assert!(matches!(body, Body::Stream { inner: _ }));
        assert!(body.require_to_bytes_async());
        assert_eq!(
            body.to_bytes_async().await.unwrap(),
            Bytes::copy_from_slice(b"foo")
        );
    }

    #[cfg(feature = "hyper-request-body")]
    #[tokio::test]
    async fn test_from_hyper_body() {
        //
        let body = hyper_request_body::HyperBody::from("foo");
        let body = Body::from_hyper_body(body);
        assert!(matches!(body, Body::Stream { inner: _ }));
        assert!(body.require_to_bytes_async());
        assert_eq!(
            body.to_bytes_async().await.unwrap(),
            Bytes::copy_from_slice(b"foo")
        );
    }

    #[cfg(feature = "hyper-request-body")]
    #[tokio::test]
    async fn test_from_hyper_request_body() {
        //
        let body = hyper_request_body::Body::HyperBody {
            inner: hyper_request_body::HyperBody::from("foo"),
        };
        let body = Body::from_hyper_request_body(body);
        assert!(matches!(body, Body::Stream { inner: _ }));
        assert!(body.require_to_bytes_async());
        assert_eq!(
            body.to_bytes_async().await.unwrap(),
            Bytes::copy_from_slice(b"foo")
        );
    }

    #[cfg(feature = "warp-request-body")]
    #[tokio::test]
    async fn test_from_warp_request_body() {
        //
        let body = warp_request_body::Body::Bytes {
            inner: Bytes::from_static(b"foo"),
        };
        let body = Body::from_warp_request_body(body);
        assert!(matches!(body, Body::Stream { inner: _ }));
        assert!(body.require_to_bytes_async());
        assert_eq!(
            body.to_bytes_async().await.unwrap(),
            Bytes::copy_from_slice(b"foo")
        );
    }
}