use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use pin_project_lite::pin_project;
use tokio::io::AsyncRead;
pin_project! {
#[derive(Debug)]
pub struct ReaderStream<R> {
#[pin]
reader: R,
buf: BytesMut,
capacity: usize,
done: bool,
}
}
impl<R: AsyncRead> ReaderStream<R> {
pub fn with_capacity(reader: R, capacity: usize) -> Self {
ReaderStream {
reader,
buf: BytesMut::with_capacity(capacity),
capacity,
done: false,
}
}
}
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use tokio_util::io::poll_read_buf;
let mut this = self.as_mut().project();
if *this.done {
return Poll::Ready(None);
}
if this.buf.capacity() == 0 {
this.buf.reserve(*this.capacity);
}
let reader = this.reader;
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
*this.done = true;
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
*this.done = true;
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}
impl<R: AsyncRead> hyper::body::Body for ReaderStream<R> {
type Data = bytes::Bytes;
type Error = std::io::Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
self.poll_next(cx).map_ok(hyper::body::Frame::data)
}
}