use std::future::Future;
use crate::{
buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut},
io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent},
BufResult,
};
pub struct BufReader<R> {
inner: R,
buf: Option<Box<[u8]>>,
pos: usize,
cap: usize,
}
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
impl<R> BufReader<R> {
#[inline]
pub fn new(inner: R) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
}
#[inline]
pub fn with_capacity(capacity: usize, inner: R) -> Self {
let buffer = vec![0; capacity];
Self {
inner,
buf: Some(buffer.into_boxed_slice()),
pos: 0,
cap: 0,
}
}
#[inline]
pub const fn get_ref(&self) -> &R {
&self.inner
}
#[inline]
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}
#[inline]
pub fn into_inner(self) -> R {
self.inner
}
#[inline]
pub fn buffer(&self) -> &[u8] {
&self.buf.as_ref().expect("unable to take buffer")[self.pos..self.cap]
}
#[inline]
fn discard_buffer(&mut self) {
self.pos = 0;
self.cap = 0;
}
}
impl<R: AsyncReadRent> AsyncReadRent for BufReader<R> {
async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
let owned_buf = self.buf.as_ref().unwrap();
if self.pos == self.cap && buf.bytes_total() >= owned_buf.len() {
self.discard_buffer();
return self.inner.read(buf).await;
}
let rem = match self.fill_buf().await {
Ok(slice) => slice,
Err(e) => {
return (Err(e), buf);
}
};
let amt = std::cmp::min(rem.len(), buf.bytes_total());
unsafe {
buf.write_ptr().copy_from_nonoverlapping(rem.as_ptr(), amt);
buf.set_init(amt);
}
self.consume(amt);
(Ok(amt), buf)
}
async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
let slice = match IoVecWrapperMut::new(buf) {
Ok(slice) => slice,
Err(buf) => return (Ok(0), buf),
};
let (result, slice) = self.read(slice).await;
buf = slice.into_inner();
if let Ok(n) = result {
unsafe { buf.set_init(n) };
}
(result, buf)
}
}
impl<R: AsyncReadRent> AsyncBufRead for BufReader<R> {
async fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
if self.pos == self.cap {
let buf = self
.buf
.take()
.expect("no buffer available, generated future must be awaited");
let (res, buf_) = self.inner.read(buf).await;
self.buf = Some(buf_);
match res {
Ok(n) => {
self.pos = 0;
self.cap = n;
return Ok(unsafe {
&(self.buf.as_ref().unwrap_unchecked().as_ref())[self.pos..self.cap]
});
}
Err(e) => {
return Err(e);
}
}
}
Ok(&(self
.buf
.as_ref()
.expect("no buffer available, generated future must be awaited")
.as_ref())[self.pos..self.cap])
}
fn consume(&mut self, amt: usize) {
self.pos = self.cap.min(self.pos + amt);
}
}
impl<R: AsyncReadRent + AsyncWriteRent> AsyncWriteRent for BufReader<R> {
#[inline]
fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
self.inner.write(buf)
}
#[inline]
fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
self.inner.writev(buf_vec)
}
#[inline]
fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
self.inner.flush()
}
#[inline]
fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
self.inner.shutdown()
}
}