use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
use futures::future::ok;
use futures::stream::{once, Stream, StreamExt};
use tokio::io::{self, AsyncRead, ReadBuf};
/// Chunk size, in bytes, that `Body::write_reader` passes to `Body::write_chunk`.
const DEFAULT_CHUNK_SIZE: usize = 4096;
/// A response/request payload that is either absent, a single in-memory
/// chunk, or a stream of chunks.
pub enum Body {
/// No content.
Empty,
/// Exactly one chunk of bytes held in memory.
Once(Bytes),
/// A boxed stream of `io::Result<Bytes>` chunks.
Stream(Segment),
}
/// A type-erased, heap-pinned stream of `io::Result<Bytes>` chunks.
///
/// The inner `Option` lets the type derive `Default` (yielding `None`), which
/// is what allows a `Segment` to be moved out of a `&mut` with `mem::take`.
#[derive(Default)]
pub struct Segment(Option<Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Sync + Send + 'static>>>);
/// Constructors and append-style writers for [`Body`].
impl Body {
    /// Creates a body with no content.
    #[inline]
    pub fn empty() -> Self {
        Body::Empty
    }

    /// Creates a body consisting of a single chunk converted from `bytes`.
    #[inline]
    pub fn once(bytes: impl Into<Bytes>) -> Self {
        let chunk: Bytes = bytes.into();
        Body::Once(chunk)
    }

    /// Creates a body whose content is produced by `stream`.
    #[inline]
    pub fn stream<S>(stream: S) -> Self
    where
        S: Stream<Item = io::Result<Bytes>> + Sync + Send + 'static,
    {
        let segment = Segment::new(stream);
        Body::Stream(segment)
    }

    /// Appends `stream` after whatever the body currently holds.
    ///
    /// The body always ends up as the `Stream` variant: an empty body becomes
    /// just `stream`, a single chunk is emitted first and then `stream`, and an
    /// existing stream is chained in front of `stream`.
    ///
    /// Returns `self` so writes can be chained.
    #[inline]
    pub fn write_stream(
        &mut self,
        stream: impl Stream<Item = io::Result<Bytes>> + Sync + Send + 'static,
    ) -> &mut Self {
        // Move the current content out so it can be consumed by value; the
        // placeholder is overwritten unconditionally below.
        let combined = match mem::replace(self, Body::Empty) {
            Body::Empty => Segment::new(stream),
            Body::Once(head) => Segment::new(once(ok(head)).chain(stream)),
            Body::Stream(head) => Segment::new(head.chain(stream)),
        };
        *self = Body::Stream(combined);
        self
    }

    /// Appends the content of `reader`, read in chunks of up to
    /// `DEFAULT_CHUNK_SIZE` bytes.
    ///
    /// Returns `self` so writes can be chained.
    #[inline]
    pub fn write_reader(
        &mut self,
        reader: impl AsyncRead + Sync + Send + Unpin + 'static,
    ) -> &mut Self {
        self.write_chunk(reader, DEFAULT_CHUNK_SIZE)
    }

    /// Appends the content of `reader`, read in chunks of up to `chunk_size`
    /// bytes.
    ///
    /// Returns `self` so writes can be chained.
    #[inline]
    pub fn write_chunk(
        &mut self,
        reader: impl AsyncRead + Sync + Send + Unpin + 'static,
        chunk_size: usize,
    ) -> &mut Self {
        let chunks = ReaderStream::new(reader, chunk_size);
        self.write_stream(chunks)
    }

    /// Appends a single in-memory chunk converted from `data`.
    ///
    /// An empty body becomes the `Once` variant; any other body has the chunk
    /// appended as a one-item stream via [`Body::write_stream`].
    ///
    /// Returns `self` so writes can be chained.
    #[inline]
    pub fn write(&mut self, data: impl Into<Bytes>) -> &mut Self {
        let chunk: Bytes = data.into();
        if matches!(*self, Body::Empty) {
            *self = Body::Once(chunk);
            self
        } else {
            self.write_stream(once(ok(chunk)))
        }
    }
}
impl Segment {
#[inline]
fn new(stream: impl Stream<Item = io::Result<Bytes>> + Sync + Send + 'static) -> Self {
Self(Some(Box::pin(stream)))
}
}
/// Converts a [`Body`] into the equivalent `hyper::Body`.
impl From<Body> for hyper::Body {
    /// Maps each variant onto the matching `hyper::Body` constructor: no
    /// content, a single buffer, or a wrapped stream.
    #[inline]
    fn from(body: Body) -> Self {
        match body {
            Body::Stream(segment) => Self::wrap_stream(segment),
            Body::Once(chunk) => chunk.into(),
            Body::Empty => Self::empty(),
        }
    }
}
impl Default for Body {
#[inline]
fn default() -> Self {
Self::empty()
}
}
/// Adapter that exposes a reader `R` as a stream of `Bytes` chunks.
pub struct ReaderStream<R> {
/// Size, in bytes, of the buffer allocated for each read.
chunk_size: usize,
/// The underlying source that chunks are read from.
reader: R,
}
impl<R> ReaderStream<R> {
#[inline]
fn new(reader: R, chunk_size: usize) -> Self {
Self { reader, chunk_size }
}
}
/// Yields the reader's content as a sequence of `Bytes` chunks.
impl<R> Stream for ReaderStream<R>
where
    R: AsyncRead + Unpin,
{
    type Item = io::Result<Bytes>;

    /// Performs one read of at most `chunk_size` bytes from the reader.
    ///
    /// * `Poll::Pending` — the reader is not ready yet.
    /// * `Poll::Ready(Some(Err(_)))` — the read failed.
    /// * `Poll::Ready(Some(Ok(chunk)))` — `chunk` holds exactly the bytes read.
    /// * `Poll::Ready(None)` — the read completed without filling any bytes,
    ///   which is treated as end of input. Note that a `chunk_size` of zero
    ///   leaves the reader no room to fill, so a successful read ends the
    ///   stream immediately.
    #[inline]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let chunk_size = self.chunk_size;

        // Zero-fill the buffer instead of calling `set_len` on uninitialized
        // capacity: `ReadBuf::new` treats the whole slice as initialized, so
        // handing it uninitialized memory would be unsound.
        let mut chunk = BytesMut::with_capacity(chunk_size);
        chunk.resize(chunk_size, 0);

        let mut buf = ReadBuf::new(&mut *chunk);
        futures::ready!(Pin::new(&mut self.reader).poll_read(cx, &mut buf))?;
        let filled_len = buf.filled().len();

        if filled_len == 0 {
            return Poll::Ready(None);
        }

        // Drop the unused zeroed tail so only the bytes actually read are emitted.
        chunk.truncate(filled_len);
        Poll::Ready(Some(Ok(chunk.freeze())))
    }
}
/// Streams the body's content as `io::Result<Bytes>` chunks.
impl Stream for Body {
    type Item = io::Result<Bytes>;

    /// Produces the next chunk.
    ///
    /// An empty body is finished immediately. A single-chunk body yields its
    /// chunk once and then becomes empty. A streaming body delegates to its
    /// inner segment.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        let next = match this {
            Body::Stream(segment) => return Pin::new(segment).poll_next(cx),
            Body::Once(chunk) => Some(Ok(mem::take(chunk))),
            Body::Empty => None,
        };

        // Only the non-streaming variants reach this point; both end up empty.
        *this = Body::Empty;
        Poll::Ready(next)
    }
}
/// Forwards polling to the boxed inner stream, if there is one.
impl Stream for Segment {
    type Item = io::Result<Bytes>;

    /// Polls the inner stream; a segment without an inner stream is already
    /// finished.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(inner) = self.get_mut().0.as_mut() {
            inner.as_mut().poll_next(cx)
        } else {
            Poll::Ready(None)
        }
    }
}
#[cfg(test)]
mod tests {
    use std::io;

    use futures::{AsyncReadExt, TryStreamExt};
    use tokio::fs::File;

    use super::Body;

    /// Drains `body` through its async-read adapter and returns the content
    /// as a string.
    async fn read_body(body: Body) -> io::Result<String> {
        let mut reader = body.into_async_read();
        let mut text = String::new();
        reader.read_to_string(&mut text).await?;
        Ok(text)
    }

    /// A default body produces no content.
    #[tokio::test]
    async fn body_empty() -> io::Result<()> {
        let text = read_body(Body::default()).await?;
        assert_eq!("", text);
        Ok(())
    }

    /// A single write is read back unchanged.
    #[tokio::test]
    async fn body_single() -> io::Result<()> {
        let mut body = Body::default();
        body.write("Hello, World");

        let text = read_body(body).await?;
        assert_eq!("Hello, World", text);
        Ok(())
    }

    /// Several writes are concatenated in order.
    #[tokio::test]
    async fn body_multiple() -> io::Result<()> {
        let mut body = Body::default();
        body.write("He");
        body.write("llo, ");
        body.write("World");

        let text = read_body(body).await?;
        assert_eq!("Hello, World", text);
        Ok(())
    }

    /// In-memory writes and reader-backed writes can be interleaved.
    #[tokio::test]
    async fn body_composed() -> io::Result<()> {
        let mut body = Body::empty();
        body.write("He");
        body.write("llo, ");
        body.write_reader(File::open("../assets/author.txt").await?);
        body.write_reader(File::open("../assets/author.txt").await?);
        body.write(".");

        let text = read_body(body).await?;
        assert_eq!("Hello, HexileeHexilee.", text);
        Ok(())
    }
}