use crate::executor;
use core::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, io, mem};
use bytes::BytesMut;
use http_body_util::{BodyExt, Full};
use hyper::body::Frame;
pub use hyper::body::Bytes;
pub type Request = hyper::Request<Body>;
pub type Response = hyper::Response<Body>;
pub type ResponseBuilder = hyper::http::response::Builder;
pub struct Body(pub(crate) BoxBody);
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, io::Error>;
impl Body {
pub fn new(data: impl Into<Bytes>) -> Body {
Body(BoxBody::new(
Full::new(data.into()).map_err(|err| match err {}),
))
}
pub fn empty() -> Body {
Body(BoxBody::default())
}
pub fn wrap_reader<R>(reader: R) -> Body
where
R: io::Read + Send + 'static,
{
Body(BoxBody::new(ReaderBody::new(reader)))
}
pub fn reader(&mut self) -> BodyReader<'_> {
BodyReader {
body: self,
prev_bytes: Bytes::new(),
}
}
}
impl<T> From<T> for Body
where
Bytes: From<T>,
{
fn from(data: T) -> Body {
Body::new(data)
}
}
impl Iterator for Body {
type Item = io::Result<Bytes>;
fn next(&mut self) -> Option<Self::Item> {
struct FrameFuture<'body>(Pin<&'body mut BoxBody>);
impl Future for FrameFuture<'_> {
type Output = Option<Result<Frame<Bytes>, io::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
hyper::body::Body::poll_frame(self.0.as_mut(), cx)
}
}
loop {
let result = executor::Parker::new().block_on(FrameFuture(Pin::new(&mut self.0)))?;
return match result {
Ok(frame) => match frame.into_data() {
Ok(bytes) => Some(Ok(bytes)),
Err(_) => continue,
},
Err(err) => Some(Err(err)),
};
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let size_hint = hyper::body::Body::size_hint(&self.0);
(
size_hint.lower() as _,
size_hint.upper().map(|size| size as _),
)
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Default for Body {
fn default() -> Self {
Self::empty()
}
}
impl hyper::body::Body for Body {
type Data = Bytes;
type Error = io::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}
}
pub struct BodyReader<'body> {
body: &'body mut Body,
prev_bytes: Bytes,
}
impl std::io::Read for BodyReader<'_> {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
let mut written = 0;
loop {
if buf.is_empty() {
return Ok(written);
}
if !self.prev_bytes.is_empty() {
let chunk_size = cmp::min(buf.len(), self.prev_bytes.len());
let prev_bytes_start = self.prev_bytes.split_to(chunk_size);
buf[..chunk_size].copy_from_slice(&prev_bytes_start[..]);
buf = &mut buf[chunk_size..];
written += chunk_size;
continue;
}
if written != 0 {
return Ok(written);
}
debug_assert!(self.prev_bytes.is_empty());
debug_assert!(written == 0);
self.prev_bytes = if let Some(next) = self.body.next() {
next?
} else {
return Ok(written);
}
}
}
}
struct ReaderBody<R> {
reader: Option<R>,
buf: BytesMut,
}
impl<R> ReaderBody<R> {
fn new(reader: R) -> Self {
Self {
reader: Some(reader),
buf: BytesMut::zeroed(CHUNK),
}
}
}
const CHUNK: usize = 4096;
impl<R> Unpin for ReaderBody<R> {}
impl<R> hyper::body::Body for ReaderBody<R>
where
R: io::Read,
{
type Data = Bytes;
type Error = io::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let ReaderBody { reader, buf } = &mut *self;
let reader = match reader {
Some(reader) => reader,
None => return Poll::Ready(None),
};
if buf.capacity() == 0 {
buf.extend_from_slice(&[0; CHUNK]);
}
match reader.read(buf) {
Err(err) => Poll::Ready(Some(Err(err))),
Ok(0) => {
self.reader.take();
Poll::Ready(None)
}
Ok(n) => {
let remaining = buf.split_off(n);
let chunk = mem::replace(buf, remaining);
Poll::Ready(Some(Ok(Frame::data(Bytes::from(chunk)))))
}
}
}
}