use bytes::Bytes;
use futures_core::Stream;
use http_body::Frame;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::OxiHttpError;
/// A body payload in one of three forms: empty, fully buffered, or streamed.
///
/// The default value is [`Body::Empty`].
#[derive(Debug, Default)]
pub enum Body {
/// A body with no content.
#[default]
Empty,
/// A body whose bytes are held in memory as a single buffer.
Full(FullBody),
/// A body whose chunks are produced by a boxed stream.
Stream(StreamBody),
}
impl Body {
pub fn empty() -> Self {
Self::Empty
}
pub fn full(data: impl Into<Bytes>) -> Self {
Self::Full(FullBody {
data: Some(data.into()),
})
}
pub fn stream(inner: Pin<Box<dyn Stream<Item = Result<Bytes, OxiHttpError>> + Send>>) -> Self {
Self::Stream(StreamBody { inner })
}
pub fn content_length(&self) -> Option<u64> {
match self {
Self::Empty => Some(0),
Self::Full(full) => full.data.as_ref().map(|d| d.len() as u64),
Self::Stream(_) => None,
}
}
pub fn into_pinned(self) -> PinnedBody {
PinnedBody::from(self)
}
}
impl From<()> for Body {
fn from(_: ()) -> Self {
Self::Empty
}
}
impl From<Bytes> for Body {
fn from(b: Bytes) -> Self {
if b.is_empty() {
Self::Empty
} else {
Self::full(b)
}
}
}
impl From<Vec<u8>> for Body {
fn from(v: Vec<u8>) -> Self {
Self::from(Bytes::from(v))
}
}
impl From<String> for Body {
fn from(s: String) -> Self {
Self::from(Bytes::from(s))
}
}
impl From<&'static str> for Body {
fn from(s: &'static str) -> Self {
Self::from(Bytes::from_static(s.as_bytes()))
}
}
impl From<&'static [u8]> for Body {
fn from(s: &'static [u8]) -> Self {
Self::from(Bytes::from_static(s))
}
}
/// Buffered contents backing [`Body::Full`].
#[derive(Debug)]
pub struct FullBody {
/// The buffered bytes. Stored as an `Option`; `Body::full` always fills it with `Some`.
data: Option<Bytes>,
}
/// Streaming contents backing [`Body::Stream`].
pub struct StreamBody {
/// Pinned, boxed, `Send` stream yielding either a chunk of bytes or an `OxiHttpError`.
inner: Pin<Box<dyn Stream<Item = Result<Bytes, OxiHttpError>> + Send>>,
}
impl std::fmt::Debug for StreamBody {
    /// Formats the value as the bare struct name; the inner stream is a trait
    /// object without a `Debug` bound, so no fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("StreamBody");
        builder.finish()
    }
}
// `PinnedBody` mirrors the three `Body` variants in a form that can be
// projected through a `Pin<&mut Self>`; the generated projection type is
// named `PinnedBodyProj`.
pin_project! {
#[project = PinnedBodyProj]
pub enum PinnedBody {
// No content.
Empty,
// Buffered content; holds `Some` until the bytes are taken out.
Full { data: Option<Bytes> },
// Streamed content; the stream field is structurally pinned.
Stream { #[pin] inner: Pin<Box<dyn Stream<Item = Result<Bytes, OxiHttpError>> + Send>> },
}
}
impl From<Body> for PinnedBody {
fn from(body: Body) -> Self {
match body {
Body::Empty => PinnedBody::Empty,
Body::Full(f) => PinnedBody::Full { data: f.data },
Body::Stream(s) => PinnedBody::Stream { inner: s.inner },
}
}
}
impl http_body::Body for PinnedBody {
    type Data = Bytes;
    type Error = OxiHttpError;

    /// Polls for the next frame of the body.
    ///
    /// * `Empty` finishes immediately with `None`.
    /// * `Full` yields its buffer once as a single data frame (skipping a
    ///   zero-length buffer) and returns `None` on every later poll.
    /// * `Stream` forwards the inner stream's items, wrapping each chunk in a
    ///   data frame and passing errors through unchanged.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.project() {
            PinnedBodyProj::Empty => Poll::Ready(None),
            PinnedBodyProj::Full { data } => {
                // `take` leaves `None` behind so the buffer is emitted at most once.
                let frame = data
                    .take()
                    .filter(|chunk| !chunk.is_empty())
                    .map(|chunk| Ok(Frame::data(chunk)));
                Poll::Ready(frame)
            }
            PinnedBodyProj::Stream { mut inner } => inner
                .as_mut()
                .poll_next(cx)
                .map(|item| item.map(|chunk| chunk.map(Frame::data))),
        }
    }

    /// Reports whether `poll_frame` is known to have nothing more to yield.
    ///
    /// A `Full` body counts as finished both when its buffer has already been
    /// taken and when the buffer is zero-length, because `poll_frame` never
    /// emits a frame for an empty buffer. A `Stream` body always returns
    /// `false`, since its end can only be observed by polling.
    fn is_end_stream(&self) -> bool {
        match self {
            PinnedBody::Empty => true,
            PinnedBody::Full { data } => data.as_ref().map_or(true, |chunk| chunk.is_empty()),
            PinnedBody::Stream { .. } => false,
        }
    }

    /// Returns the number of bytes still to be produced: exactly 0 for
    /// `Empty` or a drained `Full`, the buffer length for a pending `Full`,
    /// and the default (unbounded) hint for `Stream`.
    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            PinnedBody::Empty => http_body::SizeHint::with_exact(0),
            PinnedBody::Full { data } => {
                let remaining = data.as_ref().map_or(0, |chunk| chunk.len() as u64);
                http_body::SizeHint::with_exact(remaining)
            }
            PinnedBody::Stream { .. } => http_body::SizeHint::default(),
        }
    }
}