use std::io::{IoSliceMut, Read as _};
use std::pin::Pin;
use std::{cmp, fmt};
use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer};
use crate::io::{self, SeekFrom};
use crate::task::{Context, Poll};
/// Size in bytes (8 KiB) of the internal buffer allocated by `BufReader::new`
/// when the caller does not pick a capacity via `BufReader::with_capacity`.
const DEFAULT_CAPACITY: usize = 8 * 1024;
/// Wraps a reader `R` and serves reads out of an in-memory buffer.
///
/// The unread portion of the buffer is always `buf[pos..cap]`; when
/// `pos == cap` the buffer is empty and the next fill reads from `inner`.
pub struct BufReader<R> {
// The wrapped reader that the buffer is refilled from.
inner: R,
// Fixed-size backing storage; its length is the capacity chosen at construction.
buf: Box<[u8]>,
// Index into `buf` of the next byte that has not been consumed yet.
pos: usize,
// Number of bytes at the start of `buf` that hold data read from `inner`.
cap: usize,
}
impl<R: AsyncRead> BufReader<R> {
    /// Creates a `BufReader` around `inner` with the default buffer capacity
    /// (`DEFAULT_CAPACITY` bytes).
    pub fn new(inner: R) -> BufReader<R> {
        BufReader::with_capacity(DEFAULT_CAPACITY, inner)
    }

    /// Creates a `BufReader` around `inner` whose internal buffer holds
    /// exactly `capacity` bytes.
    ///
    /// The buffer starts out empty (`pos == cap == 0`), so nothing is read
    /// from `inner` until the first read or fill request.
    pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
        // The backing storage is zero-filled up front. Handing out a
        // `&mut [u8]` over memory that was only reserved (via `set_len` on a
        // fresh `Vec`) is undefined behaviour whenever the reader's
        // initializer is a no-op, so the buffer is always fully initialised
        // here and no `unsafe` is required.
        BufReader {
            inner,
            buf: vec![0; capacity].into_boxed_slice(),
            pos: 0,
            cap: 0,
        }
    }
}
impl<R> BufReader<R> {
    // Pin projections generated by `pin_utils`: `inner()` yields a pinned
    // reference to the wrapped reader, while `pos()` and `cap()` yield plain
    // mutable references to the cursor fields.
    pin_utils::unsafe_pinned!(inner: R);
    pin_utils::unsafe_unpinned!(pos: usize);
    pin_utils::unsafe_unpinned!(cap: usize);

    /// Returns a shared reference to the wrapped reader.
    pub fn get_ref(&self) -> &R {
        let Self { inner, .. } = self;
        inner
    }

    /// Returns a mutable reference to the wrapped reader.
    ///
    /// Bytes already held in the internal buffer are not affected by
    /// operations performed through this reference.
    pub fn get_mut(&mut self) -> &mut R {
        let Self { inner, .. } = self;
        inner
    }

    /// Returns the bytes that are currently buffered and not yet consumed.
    ///
    /// This never triggers a read from the wrapped reader; the slice is
    /// empty when the buffer has been fully consumed.
    pub fn buffer(&self) -> &[u8] {
        let unread = self.pos..self.cap;
        &self.buf[unread]
    }

    /// Consumes the `BufReader` and returns the wrapped reader.
    ///
    /// The internal buffer is dropped, so any bytes still buffered are lost.
    pub fn into_inner(self) -> R {
        let Self { inner, .. } = self;
        inner
    }

    /// Marks the buffer as empty by resetting both cursor fields to zero.
    #[inline]
    fn discard_buffer(mut self: Pin<&mut Self>) {
        *self.as_mut().cap() = 0;
        *self.pos() = 0;
    }
}
impl<R: AsyncRead> AsyncRead for BufReader<R> {
    /// Reads into `buf`, going through the internal buffer unless it is
    /// empty and `buf` is at least as large as the internal buffer, in which
    /// case the read is forwarded straight to the wrapped reader.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let buffer_is_empty = self.pos == self.cap;
        let caller_buf_is_large = buf.len() >= self.buf.len();
        if buffer_is_empty && caller_buf_is_large {
            // Bypass path: copying through our buffer would gain nothing.
            return match self.as_mut().inner().poll_read(cx, buf) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(outcome) => {
                    self.discard_buffer();
                    Poll::Ready(outcome)
                }
            };
        }

        // Buffered path: make sure there is data, copy out as much as fits,
        // then advance the cursor by the number of bytes handed over.
        let mut available: &[u8] = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
        let copied = available.read(buf)?;
        self.consume(copied);
        Poll::Ready(Ok(copied))
    }

    /// Vectored counterpart of `poll_read`; the bypass decision is based on
    /// the combined length of all slices in `bufs`.
    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        let requested: usize = bufs.iter().map(|slice| slice.len()).sum();
        let buffer_is_empty = self.pos == self.cap;
        if buffer_is_empty && requested >= self.buf.len() {
            // Bypass path: forward the vectored read to the wrapped reader.
            return match self.as_mut().inner().poll_read_vectored(cx, bufs) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(outcome) => {
                    self.discard_buffer();
                    Poll::Ready(outcome)
                }
            };
        }

        let mut available: &[u8] = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
        let copied = available.read_vectored(bufs)?;
        self.consume(copied);
        Poll::Ready(Ok(copied))
    }

    /// Delegates to the wrapped reader's initializer.
    unsafe fn initializer(&self) -> Initializer {
        let reader = &self.inner;
        reader.initializer()
    }
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
    /// Returns the unread contents of the internal buffer, refilling it from
    /// the wrapped reader first if it is empty.
    ///
    /// An empty slice is returned when the wrapped reader reports a read of
    /// zero bytes. Errors and `Poll::Pending` from the wrapped reader are
    /// propagated without modifying the buffer state.
    fn poll_fill_buf<'a>(
        self: Pin<&'a mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<&'a [u8]>> {
        // SAFETY: the fields are only borrowed here, never moved out of the
        // pinned struct, and `inner` is re-pinned immediately below before
        // it is used.
        let Self {
            inner,
            buf,
            cap,
            pos,
        } = unsafe { self.get_unchecked_mut() };
        // SAFETY: `inner` lives inside a pinned `Self` and is not moved by
        // this function, so projecting the pin onto it upholds the pinning
        // guarantee.
        let mut inner = unsafe { Pin::new_unchecked(inner) };
        // Only touch the wrapped reader once everything buffered so far has
        // been consumed.
        if *pos >= *cap {
            debug_assert!(*pos == *cap);
            *cap = futures::ready!(inner.as_mut().poll_read(cx, buf))?;
            *pos = 0;
        }
        Poll::Ready(Ok(&buf[*pos..*cap]))
    }

    /// Marks `amt` bytes of the buffer as consumed.
    ///
    /// The cursor is clamped to the end of the buffered data, so an `amt`
    /// larger than what is buffered simply empties the buffer.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        // `saturating_add` keeps an oversized `amt` from overflowing `usize`
        // (a panic in debug builds, a backwards-moving cursor in release).
        let advanced = cmp::min(self.pos.saturating_add(amt), self.cap);
        *self.pos() = advanced;
    }
}
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")
.field("reader", &self.inner)
.field(
"buffer",
&format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
)
.finish()
}
}
impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
    /// Seeks the wrapped reader and empties the internal buffer.
    ///
    /// For `SeekFrom::Current(n)` the offset is corrected by the number of
    /// bytes that are buffered but not yet consumed, because the wrapped
    /// reader's position is that far ahead of the position callers observe.
    /// If that correction would overflow `i64`, the seek is split in two:
    /// first back by the buffered amount, then forward by `n`.
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        let new_position = match pos {
            SeekFrom::Current(delta) => {
                let buffered = (self.cap - self.pos) as i64;
                match delta.checked_sub(buffered) {
                    Some(adjusted) => futures::ready!(
                        self.as_mut()
                            .inner()
                            .poll_seek(cx, SeekFrom::Current(adjusted))
                    )?,
                    None => {
                        // Step 1: rewind the wrapped reader to the logical
                        // position seen by the caller.
                        futures::ready!(
                            self.as_mut()
                                .inner()
                                .poll_seek(cx, SeekFrom::Current(-buffered))
                        )?;
                        // The buffer no longer matches the reader position;
                        // clearing it also makes a re-poll after `Pending`
                        // below take the single-seek path.
                        self.as_mut().discard_buffer();
                        // Step 2: apply the caller's offset unchanged.
                        futures::ready!(
                            self.as_mut()
                                .inner()
                                .poll_seek(cx, SeekFrom::Current(delta))
                        )?
                    }
                }
            }
            absolute => futures::ready!(self.as_mut().inner().poll_seek(cx, absolute))?,
        };
        self.discard_buffer();
        Poll::Ready(Ok(new_position))
    }
}