use std::pin::Pin;
use std::{fmt, fmt::Formatter};
use bytes::Bytes;
use futures_lite::Stream;
use crate::error::{Error, ErrorKind, Result};
pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + 'static>>;
enum BodyInner {
Bytes(Option<Bytes>),
Stream(Option<BodyStream>),
}
pub enum BodyData {
Bytes(Bytes),
Stream(BodyStream),
}
pub struct Body {
inner: BodyInner,
}
impl Body {
pub fn empty() -> Self {
Self::from(Bytes::new())
}
pub fn from_stream(stream: BodyStream) -> Self {
Self {
inner: BodyInner::Stream(Some(stream)),
}
}
pub fn into_data(self) -> Result<BodyData> {
match self.inner {
BodyInner::Bytes(Some(bytes)) => Ok(BodyData::Bytes(bytes)),
BodyInner::Stream(Some(stream)) => Ok(BodyData::Stream(stream)),
BodyInner::Bytes(None) | BodyInner::Stream(None) => Err(Error::new(
ErrorKind::BodyAlreadyConsumed,
"body was already consumed",
)),
}
}
pub fn take_bytes(&mut self) -> Result<Bytes> {
match &mut self.inner {
BodyInner::Bytes(slot) => slot.take().ok_or_else(|| {
Error::new(ErrorKind::BodyAlreadyConsumed, "body was already consumed")
}),
BodyInner::Stream(_) => Err(Error::new(
ErrorKind::BodyAlreadyConsumed,
"stream body cannot be read as aggregated bytes yet",
)),
}
}
pub fn take_stream(&mut self) -> Result<BodyStream> {
match &mut self.inner {
BodyInner::Stream(slot) => slot.take().ok_or_else(|| {
Error::new(ErrorKind::BodyAlreadyConsumed, "body was already consumed")
}),
BodyInner::Bytes(_) => Err(Error::new(
ErrorKind::BodyAlreadyConsumed,
"bytes body cannot be read as stream",
)),
}
}
pub fn is_consumed(&self) -> bool {
match &self.inner {
BodyInner::Bytes(slot) => slot.is_none(),
BodyInner::Stream(slot) => slot.is_none(),
}
}
pub fn try_clone(&self) -> Option<Self> {
match &self.inner {
BodyInner::Bytes(Some(bytes)) => Some(Self::from(bytes.clone())),
BodyInner::Bytes(None) | BodyInner::Stream(_) => None,
}
}
}
impl Default for Body {
fn default() -> Self {
Self::empty()
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self.inner {
BodyInner::Bytes(Some(bytes)) => f
.debug_struct("Body")
.field("kind", &"bytes")
.field("len", &bytes.len())
.finish(),
BodyInner::Bytes(None) => f
.debug_struct("Body")
.field("kind", &"bytes")
.field("consumed", &true)
.finish(),
BodyInner::Stream(Some(_)) => f.debug_struct("Body").field("kind", &"stream").finish(),
BodyInner::Stream(None) => f
.debug_struct("Body")
.field("kind", &"stream")
.field("consumed", &true)
.finish(),
}
}
}
impl From<Bytes> for Body {
fn from(value: Bytes) -> Self {
Self {
inner: BodyInner::Bytes(Some(value)),
}
}
}
impl From<Vec<u8>> for Body {
fn from(value: Vec<u8>) -> Self {
Self::from(Bytes::from(value))
}
}
impl From<String> for Body {
fn from(value: String) -> Self {
Self::from(Bytes::from(value))
}
}
impl From<&'static [u8]> for Body {
fn from(value: &'static [u8]) -> Self {
Self::from(Bytes::from_static(value))
}
}
impl From<&'static str> for Body {
fn from(value: &'static str) -> Self {
Self::from(Bytes::from_static(value.as_bytes()))
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use super::Body;
#[test]
fn bytes_body_is_consumed_once() {
let mut body = Body::from(Bytes::from_static(b"hello"));
assert_eq!(body.take_bytes().unwrap(), Bytes::from_static(b"hello"));
assert!(body.take_bytes().is_err());
}
#[test]
fn bytes_body_can_be_cloned_before_consumption() {
let body = Body::from(Bytes::from_static(b"hello"));
let mut cloned = body.try_clone().expect("bytes body to clone");
assert_eq!(cloned.take_bytes().unwrap(), Bytes::from_static(b"hello"));
}
}