ugi 0.2.1

Runtime-agnostic Rust request client with HTTP/1.1, HTTP/2, HTTP/3, H2C, WebSocket, SSE, and gRPC support
Documentation
use std::pin::Pin;
use std::{fmt, fmt::Formatter};

use bytes::Bytes;
use futures_lite::Stream;

use crate::error::{Error, ErrorKind, Result};

/// A pinned, boxed async byte stream used as a streaming request or response body.
pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + 'static>>;

enum BodyInner {
    Bytes(Option<Bytes>),
    Stream(Option<BodyStream>),
}

/// The resolved form of a [`Body`] after it has been consumed.
///
/// Returned by [`Body::into_data`].
pub enum BodyData {
    /// The body was an in-memory buffer.
    Bytes(Bytes),
    /// The body is a streaming source.
    Stream(BodyStream),
}

/// An HTTP request or response body.
///
/// A `Body` can be backed by either an in-memory [`Bytes`] buffer or an async
/// [`BodyStream`].  Each variant can be consumed only once; subsequent reads
/// return [`ErrorKind::BodyAlreadyConsumed`](crate::ErrorKind::BodyAlreadyConsumed).
pub struct Body {
    inner: BodyInner,
}

impl Body {
    /// Create an empty body (zero bytes, no stream).
    pub fn empty() -> Self {
        Self::from(Bytes::new())
    }

    /// Wrap an async stream as a body.
    pub fn from_stream(stream: BodyStream) -> Self {
        Self {
            inner: BodyInner::Stream(Some(stream)),
        }
    }

    /// Consume the body and return its data.
    ///
    /// Returns an error if the body has already been consumed.
    pub fn into_data(self) -> Result<BodyData> {
        match self.inner {
            BodyInner::Bytes(Some(bytes)) => Ok(BodyData::Bytes(bytes)),
            BodyInner::Stream(Some(stream)) => Ok(BodyData::Stream(stream)),
            BodyInner::Bytes(None) | BodyInner::Stream(None) => Err(Error::new(
                ErrorKind::BodyAlreadyConsumed,
                "body was already consumed",
            )),
        }
    }

    /// Take the inner bytes if this is a buffered body.
    ///
    /// Returns an error if the body is streaming or has already been consumed.
    pub fn take_bytes(&mut self) -> Result<Bytes> {
        match &mut self.inner {
            BodyInner::Bytes(slot) => slot.take().ok_or_else(|| {
                Error::new(ErrorKind::BodyAlreadyConsumed, "body was already consumed")
            }),
            BodyInner::Stream(_) => Err(Error::new(
                ErrorKind::BodyAlreadyConsumed,
                "stream body cannot be read as aggregated bytes yet",
            )),
        }
    }

    /// Take the inner stream if this is a streaming body.
    ///
    /// Returns an error if the body is buffered bytes or has already been consumed.
    pub fn take_stream(&mut self) -> Result<BodyStream> {
        match &mut self.inner {
            BodyInner::Stream(slot) => slot.take().ok_or_else(|| {
                Error::new(ErrorKind::BodyAlreadyConsumed, "body was already consumed")
            }),
            BodyInner::Bytes(_) => Err(Error::new(
                ErrorKind::BodyAlreadyConsumed,
                "bytes body cannot be read as stream",
            )),
        }
    }

    /// Returns `true` if the body has already been consumed.
    pub fn is_consumed(&self) -> bool {
        match &self.inner {
            BodyInner::Bytes(slot) => slot.is_none(),
            BodyInner::Stream(slot) => slot.is_none(),
        }
    }

    /// Clone this body if it is backed by in-memory bytes.
    ///
    /// Returns `None` for streaming bodies or already-consumed bodies, since
    /// streams cannot be cheaply cloned.
    pub fn try_clone(&self) -> Option<Self> {
        match &self.inner {
            BodyInner::Bytes(Some(bytes)) => Some(Self::from(bytes.clone())),
            BodyInner::Bytes(None) | BodyInner::Stream(_) => None,
        }
    }
}

impl Default for Body {
    fn default() -> Self {
        Self::empty()
    }
}

impl fmt::Debug for Body {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match &self.inner {
            BodyInner::Bytes(Some(bytes)) => f
                .debug_struct("Body")
                .field("kind", &"bytes")
                .field("len", &bytes.len())
                .finish(),
            BodyInner::Bytes(None) => f
                .debug_struct("Body")
                .field("kind", &"bytes")
                .field("consumed", &true)
                .finish(),
            BodyInner::Stream(Some(_)) => f.debug_struct("Body").field("kind", &"stream").finish(),
            BodyInner::Stream(None) => f
                .debug_struct("Body")
                .field("kind", &"stream")
                .field("consumed", &true)
                .finish(),
        }
    }
}

impl From<Bytes> for Body {
    fn from(value: Bytes) -> Self {
        Self {
            inner: BodyInner::Bytes(Some(value)),
        }
    }
}

impl From<Vec<u8>> for Body {
    fn from(value: Vec<u8>) -> Self {
        Self::from(Bytes::from(value))
    }
}

impl From<String> for Body {
    fn from(value: String) -> Self {
        Self::from(Bytes::from(value))
    }
}

impl From<&'static [u8]> for Body {
    fn from(value: &'static [u8]) -> Self {
        Self::from(Bytes::from_static(value))
    }
}

impl From<&'static str> for Body {
    fn from(value: &'static str) -> Self {
        Self::from(Bytes::from_static(value.as_bytes()))
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::Body;

    #[test]
    fn bytes_body_is_consumed_once() {
        let mut body = Body::from(Bytes::from_static(b"hello"));
        assert_eq!(body.take_bytes().unwrap(), Bytes::from_static(b"hello"));
        assert!(body.take_bytes().is_err());
    }

    #[test]
    fn bytes_body_can_be_cloned_before_consumption() {
        let body = Body::from(Bytes::from_static(b"hello"));
        let mut cloned = body.try_clone().expect("bytes body to clone");
        assert_eq!(cloned.take_bytes().unwrap(), Bytes::from_static(b"hello"));
    }
}