use core::{
convert::Infallible,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
use std::{borrow::Cow, error};
use futures_core::stream::{LocalBoxStream, Stream};
use pin_project_lite::pin_project;
use super::{
bytes::{Buf, Bytes, BytesMut},
error::BodyError,
};
pub const fn none_body_hint() -> (usize, Option<usize>) {
NONE_BODY_HINT
}
pub const NONE_BODY_HINT: (usize, Option<usize>) = (usize::MAX, Some(0));
pub const fn exact_body_hint(size: usize) -> (usize, Option<usize>) {
(size, Some(size))
}
#[derive(Default)]
pub enum RequestBody {
#[cfg(feature = "http1")]
H1(super::h1::RequestBody),
#[cfg(feature = "http2")]
H2(super::h2::RequestBody),
#[cfg(feature = "http3")]
H3(super::h3::RequestBody),
Unknown(BoxBody),
#[default]
None,
}
impl Stream for RequestBody {
type Item = Result<Bytes, BodyError>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.get_mut() {
#[cfg(feature = "http1")]
Self::H1(body) => Pin::new(body).poll_next(cx).map_err(Into::into),
#[cfg(feature = "http2")]
Self::H2(body) => Pin::new(body).poll_next(cx),
#[cfg(feature = "http3")]
Self::H3(body) => Pin::new(body).poll_next(cx),
Self::Unknown(body) => Pin::new(body).poll_next(cx),
Self::None => Poll::Ready(None),
}
}
}
impl<B> From<NoneBody<B>> for RequestBody {
fn from(_: NoneBody<B>) -> Self {
Self::None
}
}
impl From<Bytes> for RequestBody {
fn from(bytes: Bytes) -> Self {
Self::from(Once::new(bytes))
}
}
impl From<Once<Bytes>> for RequestBody {
fn from(once: Once<Bytes>) -> Self {
Self::from(BoxBody::new(once))
}
}
impl From<BoxBody> for RequestBody {
fn from(body: BoxBody) -> Self {
Self::Unknown(body)
}
}
macro_rules! req_bytes_impl {
($ty: ty) => {
impl From<$ty> for RequestBody {
fn from(item: $ty) -> Self {
Self::from(Bytes::from(item))
}
}
};
}
req_bytes_impl!(&'static [u8]);
req_bytes_impl!(Box<[u8]>);
req_bytes_impl!(Vec<u8>);
req_bytes_impl!(String);
pub struct NoneBody<B>(PhantomData<fn(B)>);
impl<B> Default for NoneBody<B> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<B> Stream for NoneBody<B> {
type Item = Result<B, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unreachable!("NoneBody must not be polled. See NoneBody for detail")
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
none_body_hint()
}
}
#[derive(Default)]
pub struct Once<B>(Option<B>);
impl<B> Once<B>
where
B: Buf + Unpin,
{
#[inline]
pub const fn new(body: B) -> Self {
Self(Some(body))
}
}
impl<B> From<B> for Once<B>
where
B: Buf + Unpin,
{
fn from(b: B) -> Self {
Self::new(b)
}
}
impl<B> Stream for Once<B>
where
B: Buf + Unpin,
{
type Item = Result<B, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(mem::replace(self.get_mut(), Self(None)).0.map(Ok))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0
.as_ref()
.map(|b| exact_body_hint(b.remaining()))
.expect("Once must check size_hint before it got polled")
}
}
pin_project! {
pub struct Either<L, R> {
#[pin]
inner: EitherInner<L, R>
}
}
pin_project! {
#[project = EitherProj]
enum EitherInner<L, R> {
L {
#[pin]
inner: L
},
R {
#[pin]
inner: R
}
}
}
impl<L, R> Either<L, R> {
#[inline]
pub const fn left(inner: L) -> Self {
Self {
inner: EitherInner::L { inner },
}
}
#[inline]
pub const fn right(inner: R) -> Self {
Self {
inner: EitherInner::R { inner },
}
}
}
impl<L, R, T, E, E2> Stream for Either<L, R>
where
L: Stream<Item = Result<T, E>>,
R: Stream<Item = Result<T, E2>>,
E2: From<E>,
{
type Item = Result<T, E2>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().inner.project() {
EitherProj::L { inner } => inner.poll_next(cx).map(|res| res.map(|res| res.map_err(Into::into))),
EitherProj::R { inner } => inner.poll_next(cx),
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
match self.inner {
EitherInner::L { ref inner } => inner.size_hint(),
EitherInner::R { ref inner } => inner.size_hint(),
}
}
}
pub struct BoxBody(LocalBoxStream<'static, Result<Bytes, BodyError>>);
impl Default for BoxBody {
fn default() -> Self {
Self::new(NoneBody::<Bytes>::default())
}
}
impl BoxBody {
#[inline]
pub fn new<B, T, E>(body: B) -> Self
where
B: Stream<Item = Result<T, E>> + 'static,
T: Into<Bytes>,
E: Into<BodyError>,
{
pin_project! {
struct MapStream<B> {
#[pin]
body: B
}
}
impl<B, T, E> Stream for MapStream<B>
where
B: Stream<Item = Result<T, E>>,
T: Into<Bytes>,
E: Into<BodyError>,
{
type Item = Result<Bytes, BodyError>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().body.poll_next(cx).map_ok(Into::into).map_err(Into::into)
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.body.size_hint()
}
}
Self(Box::pin(MapStream { body }))
}
}
impl Stream for BoxBody {
type Item = Result<Bytes, BodyError>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().0.as_mut().poll_next(cx)
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}
pin_project! {
pub struct ResponseBody<B = BoxBody> {
#[pin]
inner: ResponseBodyInner<B>
}
}
pin_project! {
#[project = ResponseBodyProj]
#[project_replace = ResponseBodyProjReplace]
enum ResponseBodyInner<B> {
None,
Bytes {
bytes: Bytes,
},
Stream {
#[pin]
stream: B,
},
}
}
impl<B> Default for ResponseBody<B> {
fn default() -> Self {
Self::none()
}
}
impl ResponseBody {
#[inline]
pub fn box_stream<B, T, E>(stream: B) -> Self
where
B: Stream<Item = Result<T, E>> + 'static,
T: Into<Bytes>,
E: Into<BodyError>,
{
Self::stream(BoxBody::new(stream))
}
}
impl<B> ResponseBody<B> {
#[inline]
pub const fn none() -> Self {
Self {
inner: ResponseBodyInner::None,
}
}
#[inline]
pub const fn empty() -> Self {
Self {
inner: ResponseBodyInner::Bytes { bytes: Bytes::new() },
}
}
#[inline]
pub const fn stream(stream: B) -> Self {
Self {
inner: ResponseBodyInner::Stream { stream },
}
}
#[inline]
pub fn bytes<B2>(bytes: B2) -> Self
where
Bytes: From<B2>,
{
Self {
inner: ResponseBodyInner::Bytes {
bytes: Bytes::from(bytes),
},
}
}
#[inline]
pub fn into_boxed<T, E>(self) -> ResponseBody
where
B: Stream<Item = Result<T, E>> + 'static,
T: Into<Bytes>,
E: error::Error + Send + Sync + 'static,
{
match self.inner {
ResponseBodyInner::None => ResponseBody::none(),
ResponseBodyInner::Bytes { bytes } => ResponseBody::bytes(bytes),
ResponseBodyInner::Stream { stream } => ResponseBody::box_stream(stream),
}
}
}
impl<B, E> Stream for ResponseBody<B>
where
B: Stream<Item = Result<Bytes, E>>,
{
type Item = Result<Bytes, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut inner = self.project().inner;
match inner.as_mut().project() {
ResponseBodyProj::None => Poll::Ready(None),
ResponseBodyProj::Bytes { .. } => match inner.project_replace(ResponseBodyInner::None) {
ResponseBodyProjReplace::Bytes { bytes } => Poll::Ready(Some(Ok(bytes))),
_ => unreachable!(),
},
ResponseBodyProj::Stream { stream } => stream.poll_next(cx),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.inner {
ResponseBodyInner::None => none_body_hint(),
ResponseBodyInner::Bytes { ref bytes } => exact_body_hint(bytes.len()),
ResponseBodyInner::Stream { ref stream } => stream.size_hint(),
}
}
}
impl<B> From<NoneBody<B>> for ResponseBody {
fn from(_: NoneBody<B>) -> Self {
ResponseBody::none()
}
}
impl<B> From<Once<B>> for ResponseBody
where
B: Into<Bytes>,
{
fn from(once: Once<B>) -> Self {
ResponseBody::bytes(once.0.map(Into::into).unwrap_or_default())
}
}
impl From<BoxBody> for ResponseBody {
fn from(stream: BoxBody) -> Self {
Self::stream(stream)
}
}
macro_rules! res_bytes_impl {
($ty: ty) => {
impl<B> From<$ty> for ResponseBody<B> {
fn from(item: $ty) -> Self {
Self::bytes(item)
}
}
};
}
res_bytes_impl!(Bytes);
res_bytes_impl!(BytesMut);
res_bytes_impl!(&'static [u8]);
res_bytes_impl!(&'static str);
res_bytes_impl!(Box<[u8]>);
res_bytes_impl!(Vec<u8>);
res_bytes_impl!(String);
impl<B> From<Box<str>> for ResponseBody<B> {
fn from(str: Box<str>) -> Self {
Self::from(Box::<[u8]>::from(str))
}
}
impl<B> From<Cow<'static, str>> for ResponseBody<B> {
fn from(str: Cow<'static, str>) -> Self {
match str {
Cow::Owned(str) => Self::from(str),
Cow::Borrowed(str) => Self::from(str),
}
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum BodySize {
None,
Sized(usize),
Stream,
}
impl BodySize {
#[inline]
pub fn from_stream<S>(stream: &S) -> Self
where
S: Stream,
{
match stream.size_hint() {
NONE_BODY_HINT => Self::None,
(_, Some(size)) => Self::Sized(size),
(_, None) => Self::Stream,
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn stream_body_size_hint() {
let body = BoxBody::new(Once::new(Bytes::new()));
assert_eq!(BodySize::from_stream(&body), BodySize::Sized(0));
let body = BoxBody::new(NoneBody::<Bytes>::default());
assert_eq!(BodySize::from_stream(&body), BodySize::None);
}
}