use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
use crate::enums::BufferChunkSize;
/// Adapter over an [`AsyncRead`] source that can be consumed either as a
/// [`Stream`] of byte chunks or as a plain [`AsyncRead`].
pub struct AsyncReadByteStream<R>
where
    R: AsyncRead + Unpin,
{
    /// Buffered wrapper around the underlying source; all reads go through it.
    reader: BufReader<R>,
    /// Set once the stream has observed end-of-input or an I/O error; after
    /// that `poll_next` returns `None` without touching the reader.
    eof: bool,
    /// Upper bound, in bytes, on the length of each chunk yielded by the
    /// stream; also used as the capacity of `reader`.
    chunk_size: usize,
}
impl<R> AsyncReadByteStream<R>
where
    R: AsyncRead + Unpin,
{
    /// Wraps `source` in a buffered reader whose capacity is the byte count
    /// reported by `size.chunk_size()`.
    ///
    /// The same byte count is stored as the stream's chunk size. The stream
    /// starts in the "not finished" state.
    pub fn new(source: R, size: BufferChunkSize) -> Self {
        let capacity = size.chunk_size();
        let reader = BufReader::with_capacity(capacity, source);
        Self {
            reader,
            eof: false,
            chunk_size: capacity,
        }
    }
}
impl<R: AsyncRead + Unpin> Stream for AsyncReadByteStream<R> {
    /// One chunk of bytes read from the source, or the I/O error that ended
    /// the stream.
    type Item = Result<Vec<u8>, io::Error>;

    /// Yields the next chunk of at most `chunk_size` bytes.
    ///
    /// Returns `Poll::Ready(None)` once the source reports end-of-input, and
    /// on every later poll. An I/O error is yielded once as `Some(Err(_))`,
    /// after which the stream is finished. `Poll::Pending` is forwarded from
    /// the reader unchanged.
    ///
    /// The chunk is copied out of the reader's internal buffer, so nothing is
    /// allocated when the reader is not ready, and each yielded `Vec` is sized
    /// to the bytes actually read rather than to `chunk_size`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.get_mut();
        if me.eof {
            return Poll::Ready(None);
        }

        let chunk = match Pin::new(&mut me.reader).poll_fill_buf(cx) {
            Poll::Ready(Ok(available)) => {
                // Never hand out more than `chunk_size` bytes per item, whatever
                // the reader happens to have buffered.
                let len = available.len().min(me.chunk_size);
                available[..len].to_vec()
            }
            Poll::Ready(Err(e)) => {
                // Treat the first error as terminal so later polls return `None`.
                me.eof = true;
                return Poll::Ready(Some(Err(e)));
            }
            Poll::Pending => return Poll::Pending,
        };

        // An empty buffer from `poll_fill_buf` means the source is exhausted.
        if chunk.is_empty() {
            me.eof = true;
            return Poll::Ready(None);
        }

        // Mark the copied bytes as read so the next poll sees fresh data.
        Pin::new(&mut me.reader).consume(chunk.len());
        Poll::Ready(Some(Ok(chunk)))
    }
}
impl<R> AsyncRead for AsyncReadByteStream<R>
where
    R: AsyncRead + Unpin,
{
    /// Reads into `buf` by delegating directly to the inner buffered reader.
    ///
    /// This path neither checks nor updates the `eof` flag used by the
    /// `Stream` implementation; the reader's result is returned as-is.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let inner = &mut self.get_mut().reader;
        Pin::new(inner).poll_read(cx, buf)
    }
}