use async_channel::{bounded, Sender, Receiver};
use futures_core::{FusedStream, Stream};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use std::{
io::{self, BufRead, Cursor, Write},
pin::Pin,
task::{Context, Poll},
};
/// Creates a connected [`Reader`]/[`Writer`] pair that rotates `count`
/// reusable buffers between the two halves.
///
/// Two bounded channels of capacity `count` are created: a *pool* channel
/// that carries empty buffers from the reader to the writer, and a *stream*
/// channel that carries filled buffers from the writer to the reader. The
/// pool is pre-filled with `count` empty buffers before the halves are
/// returned.
///
/// # Panics
///
/// Panics with `"buffer pool overflow"` if a buffer cannot be placed into the
/// freshly created pool channel.
///
/// NOTE(review): `async_channel::bounded` is documented to panic on a zero
/// capacity, so `count` is presumably expected to be non-zero — confirm
/// against callers.
pub(crate) fn new(count: usize) -> (Reader, Writer) {
    let (buf_pool_tx, buf_pool_rx) = bounded::<Cursor<Vec<u8>>>(count);
    let (buf_stream_tx, buf_stream_rx) = bounded::<Cursor<Vec<u8>>>(count);

    // Seed the pool with exactly as many empty buffers as it can hold.
    (0..count).for_each(|_| {
        buf_pool_tx
            .try_send(Cursor::new(Vec::new()))
            .expect("buffer pool overflow");
    });

    (
        Reader {
            buf_pool_tx,
            buf_stream_rx: Box::pin(buf_stream_rx),
            chunk: None,
        },
        Writer {
            buf_pool_rx: Box::pin(buf_pool_rx),
            buf_stream_tx,
        },
    )
}
/// Reading half of the pipe created by `new`.
///
/// Implements `AsyncRead` and `AsyncBufRead` by exposing, one chunk at a
/// time, the buffers received on the stream channel.
pub(crate) struct Reader {
/// Sending end of the buffer pool channel; fully consumed chunks are reset
/// and sent back through it so they can be reused.
buf_pool_tx: Sender<Cursor<Vec<u8>>>,
/// Receiving end of the stream channel, which delivers the chunks sent by
/// the `Writer`. Stored as a pinned box so it can be polled as a stream.
buf_stream_rx: Pin<Box<Receiver<Cursor<Vec<u8>>>>>,
/// Chunk currently being read from, or `None` when no chunk is held. The
/// cursor position tracks how many of its bytes have been consumed.
chunk: Option<Cursor<Vec<u8>>>,
}
impl AsyncRead for Reader {
    /// Copies bytes from the current chunk into `buf`.
    ///
    /// Delegates to `poll_fill_buf` to obtain the unread part of the current
    /// chunk, copies as much of it as fits into `buf`, and marks the copied
    /// bytes as consumed. Returns `Poll::Pending` while no chunk is available
    /// and `Ok(0)` when `poll_fill_buf` yields an empty slice.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Bail out early unless buffered data is ready to be copied.
        let available = match self.as_mut().poll_fill_buf(cx) {
            Poll::Ready(Ok(bytes)) => bytes,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        };

        // Writing into a `&mut [u8]` copies at most `buf.len()` bytes and
        // reports how many were actually copied.
        let copied = buf.write(available)?;

        // Only the bytes that were handed to the caller count as consumed.
        self.consume(copied);

        Poll::Ready(Ok(copied))
    }
}
impl AsyncBufRead for Reader {
    /// Returns the unread remainder of the current chunk, fetching the next
    /// chunk from the stream channel when none is held.
    ///
    /// A chunk whose bytes have all been consumed is reset and sent back to
    /// the buffer pool before a new one is requested.
    ///
    /// Returns `Poll::Pending` while waiting for a chunk, and an empty slice
    /// once the stream channel has terminated.
    ///
    /// # Panics
    ///
    /// Panics with `"buffer pool overflow"` if the buffer pool channel is
    /// full when a chunk is being returned to it.
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        // A chunk is exhausted once its cursor has reached the end of its data.
        let exhausted = self
            .chunk
            .as_ref()
            .map_or(false, |chunk| chunk.position() >= chunk.get_ref().len() as u64);

        if exhausted {
            if let Some(mut chunk) = self.chunk.take() {
                // Reset the buffer so it can be refilled from scratch.
                chunk.set_position(0);
                chunk.get_mut().clear();

                if let Err(e) = self.buf_pool_tx.try_send(chunk) {
                    if e.is_full() {
                        panic!("buffer pool overflow");
                    }
                    // A closed pool channel is tolerated: the buffer is simply
                    // dropped and reading continues with whatever chunks are
                    // still queued. Any other failure is reported as an error.
                    if !e.is_closed() {
                        return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
                    }
                }
            }
        }

        // With no chunk in hand, try to receive the next one.
        if self.chunk.is_none() {
            // Do not poll the stream again once it has reported termination.
            if self.buf_stream_rx.is_terminated() {
                return Poll::Ready(Ok(&[]));
            }

            match self.buf_stream_rx.as_mut().poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                // The stream ended: report end-of-file.
                Poll::Ready(None) => return Poll::Ready(Ok(&[])),
                Poll::Ready(Some(next)) => self.chunk = Some(next),
            }
        }

        // `Reader` is `Unpin` (this function already mutates it through
        // `Pin`'s `DerefMut`), so the pin can be unwrapped safely. Consuming
        // the pin yields a borrow with the full lifetime of `self`, which is
        // what the returned slice needs.
        Poll::Ready(match self.get_mut().chunk.as_mut() {
            Some(chunk) => chunk.fill_buf(),
            None => Ok(&[]),
        })
    }

    /// Marks `amt` bytes of the current chunk as read by advancing its
    /// cursor. Does nothing when no chunk is held.
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
        if let Some(chunk) = self.chunk.as_mut() {
            chunk.consume(amt);
        }
    }
}
impl Drop for Reader {
    /// Closes both channel ends held by the reader when it is dropped: first
    /// the stream channel, then the buffer pool channel.
    fn drop(&mut self) {
        let Self {
            buf_pool_tx,
            buf_stream_rx,
            ..
        } = self;

        buf_stream_rx.close();
        buf_pool_tx.close();
    }
}
/// Writing half of the pipe created by `new`.
///
/// Implements `AsyncWrite` by copying each write into a buffer taken from
/// the pool and sending that buffer over the stream channel.
pub(crate) struct Writer {
/// Receiving end of the buffer pool channel, from which buffers available
/// for writing are taken. Stored as a pinned box so it can be polled as a
/// stream.
buf_pool_rx: Pin<Box<Receiver<Cursor<Vec<u8>>>>>,
/// Sending end of the stream channel, used to deliver filled chunks to the
/// `Reader`.
buf_stream_tx: Sender<Cursor<Vec<u8>>>,
}
impl AsyncWrite for Writer {
    /// Copies all of `buf` into a buffer from the pool and sends it over the
    /// stream channel as one chunk.
    ///
    /// Returns `Ok(buf.len())` on success and `Ok(0)` for an empty `buf`
    /// (nothing is sent in that case). Returns `Poll::Pending` while no pool
    /// buffer is available.
    ///
    /// # Errors
    ///
    /// Fails with `BrokenPipe` when the stream channel is closed, when the
    /// buffer pool channel has ended, or when sending the chunk fails for a
    /// reason other than the channel being full.
    ///
    /// # Panics
    ///
    /// Panics with `"buffer pool overflow"` if the stream channel is full
    /// when the chunk is sent.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Fail fast on a closed pipe instead of copying data that can never
        // be delivered.
        if self.buf_stream_tx.is_closed() {
            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
        }

        // Empty writes never occupy a buffer.
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // Obtain a free buffer from the pool, or stop here.
        let mut chunk = match self.buf_pool_rx.as_mut().poll_next(cx) {
            Poll::Ready(Some(chunk)) => chunk,
            Poll::Ready(None) => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
            Poll::Pending => return Poll::Pending,
        };

        chunk.get_mut().extend_from_slice(buf);

        // Hand the filled chunk over to the stream channel.
        if let Err(e) = self.buf_stream_tx.try_send(chunk) {
            if e.is_full() {
                panic!("buffer pool overflow");
            }
            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
        }

        Poll::Ready(Ok(buf.len()))
    }

    /// Always completes immediately: every write is sent as a chunk right
    /// away, so the writer holds no pending data.
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Closes the stream channel and completes immediately. Subsequent
    /// writes fail with `BrokenPipe`.
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.buf_stream_tx.close();
        Poll::Ready(Ok(()))
    }
}