use crate::{Error, Status};
use bytes::{Buf, Bytes};
use http_body::Body as HttpBody;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
pub trait Body: sealed::Sealed + Send + Sync {
type Data: Buf;
type Error: Into<Error>;
fn is_end_stream(&self) -> bool;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>>;
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>>;
}
impl<T> Body for T
where
T: HttpBody + Send + Sync + 'static,
T::Error: Into<Error>,
{
type Data = T::Data;
type Error = T::Error;
fn is_end_stream(&self) -> bool {
HttpBody::is_end_stream(self)
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
HttpBody::poll_data(self, cx)
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
HttpBody::poll_trailers(self, cx)
}
}
impl<T> sealed::Sealed for T
where
T: HttpBody,
T::Error: Into<Error>,
{
}
mod sealed {
pub trait Sealed {}
}
pub struct BoxBody {
inner: Pin<Box<dyn Body<Data = Bytes, Error = Status> + Send + Sync + 'static>>,
}
struct MapBody<B>(B);
impl BoxBody {
pub fn new<B>(inner: B) -> Self
where
B: Body<Data = Bytes, Error = Status> + Send + Sync + 'static,
{
BoxBody {
inner: Box::pin(inner),
}
}
pub fn map_from<B>(inner: B) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
{
BoxBody {
inner: Box::pin(MapBody(inner)),
}
}
pub fn empty() -> Self {
BoxBody {
inner: Box::pin(EmptyBody::default()),
}
}
}
impl HttpBody for BoxBody {
type Data = Bytes;
type Error = Status;
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Body::poll_data(self.inner.as_mut(), cx)
}
fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Body::poll_trailers(self.inner.as_mut(), cx)
}
}
impl<B> HttpBody for MapBody<B>
where
B: Body,
B::Error: Into<crate::Error>,
{
type Data = Bytes;
type Error = Status;
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let v = unsafe {
let me = self.get_unchecked_mut();
Pin::new_unchecked(&mut me.0).poll_data(cx)
};
match futures_util::ready!(v) {
Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.to_bytes()))),
Some(Err(e)) => {
let err = Status::map_error(e.into());
Poll::Ready(Some(Err(err)))
}
None => Poll::Ready(None),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let v = unsafe {
let me = self.get_unchecked_mut();
Pin::new_unchecked(&mut me.0).poll_trailers(cx)
};
let v = futures_util::ready!(v).map_err(|e| Status::from_error(&*e.into()));
Poll::Ready(v)
}
}
impl fmt::Debug for BoxBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BoxBody").finish()
}
}
#[derive(Debug, Default)]
struct EmptyBody {
_p: (),
}
impl HttpBody for EmptyBody {
type Data = Bytes;
type Error = Status;
fn is_end_stream(&self) -> bool {
true
}
fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(None)
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}