use super::{AsyncBufRead, AsyncRead, ReadBuf};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
const DEFAULT_BUF_CAPACITY: usize = 8192;
#[derive(Debug)]
pub struct BufReader<R> {
inner: R,
buf: Box<[u8]>,
pos: usize,
cap: usize,
}
impl<R> BufReader<R> {
#[must_use]
pub fn new(inner: R) -> Self {
Self::with_capacity(DEFAULT_BUF_CAPACITY, inner)
}
#[must_use]
pub fn with_capacity(capacity: usize, inner: R) -> Self {
let capacity = capacity.max(1);
Self {
inner,
buf: vec![0u8; capacity].into_boxed_slice(),
pos: 0,
cap: 0,
}
}
#[must_use]
pub fn get_ref(&self) -> &R {
&self.inner
}
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}
#[must_use]
pub fn into_inner(self) -> R {
self.inner
}
#[must_use]
pub fn buffer(&self) -> &[u8] {
&self.buf[self.pos..self.cap]
}
#[must_use]
pub fn capacity(&self) -> usize {
self.buf.len()
}
pub fn discard_buffer(&mut self) {
self.pos = 0;
self.cap = 0;
}
}
impl<R: AsyncRead + Unpin> AsyncRead for BufReader<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}
let this = self.get_mut();
if this.pos < this.cap {
let buffered = &this.buf[this.pos..this.cap];
let to_copy = std::cmp::min(buffered.len(), buf.remaining());
buf.put_slice(&buffered[..to_copy]);
this.pos += to_copy;
return Poll::Ready(Ok(()));
}
if buf.remaining() >= this.buf.len() {
return Pin::new(&mut this.inner).poll_read(cx, buf);
}
this.pos = 0;
this.cap = 0;
let mut read_buf = ReadBuf::new(&mut this.buf);
match Pin::new(&mut this.inner).poll_read(cx, &mut read_buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(())) => {
this.cap = read_buf.filled().len();
}
}
let to_copy = std::cmp::min(this.cap, buf.remaining());
buf.put_slice(&this.buf[..to_copy]);
this.pos = to_copy;
Poll::Ready(Ok(()))
}
}
impl<R: AsyncRead + Unpin> AsyncBufRead for BufReader<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let this = self.get_mut();
if this.pos >= this.cap {
this.pos = 0;
this.cap = 0;
let mut read_buf = ReadBuf::new(&mut this.buf);
match Pin::new(&mut this.inner).poll_read(cx, &mut read_buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(())) => {
this.cap = read_buf.filled().len();
}
}
}
Poll::Ready(Ok(&this.buf[this.pos..this.cap]))
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.get_mut();
this.pos = std::cmp::min(this.pos + amt, this.cap);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::task::Waker;
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn buf_reader_new() {
init_test("buf_reader_new");
let data: &[u8] = b"hello world";
let reader = BufReader::new(data);
let capacity = reader.capacity();
crate::assert_with_log!(
capacity == DEFAULT_BUF_CAPACITY,
"capacity",
DEFAULT_BUF_CAPACITY,
capacity
);
let empty = reader.buffer().is_empty();
crate::assert_with_log!(empty, "buffer empty", true, empty);
crate::test_complete!("buf_reader_new");
}
#[test]
fn buf_reader_with_capacity() {
init_test("buf_reader_with_capacity");
let data: &[u8] = b"test";
let reader = BufReader::with_capacity(256, data);
let capacity = reader.capacity();
crate::assert_with_log!(capacity == 256, "capacity", 256, capacity);
crate::test_complete!("buf_reader_with_capacity");
}
#[test]
fn buf_reader_get_ref() {
init_test("buf_reader_get_ref");
let data: &[u8] = b"hello";
let reader = BufReader::new(data);
let inner = *reader.get_ref();
crate::assert_with_log!(inner == b"hello", "get_ref", b"hello", inner);
crate::test_complete!("buf_reader_get_ref");
}
#[test]
fn buf_reader_into_inner() {
init_test("buf_reader_into_inner");
let data: &[u8] = b"hello";
let reader = BufReader::new(data);
let inner = reader.into_inner();
crate::assert_with_log!(inner == b"hello", "into_inner", b"hello", inner);
crate::test_complete!("buf_reader_into_inner");
}
#[test]
fn buf_reader_read_small() {
init_test("buf_reader_read_small");
let data: &[u8] = b"hello world";
let mut reader = BufReader::with_capacity(16, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut buf = [0u8; 5];
let mut read_buf = ReadBuf::new(&mut buf);
let poll = Pin::new(&mut reader).poll_read(&mut cx, &mut read_buf);
let ready = matches!(poll, Poll::Ready(Ok(())));
crate::assert_with_log!(ready, "poll ready", true, ready);
let filled = read_buf.filled();
crate::assert_with_log!(filled == b"hello", "filled", b"hello", filled);
let buffer = reader.buffer();
crate::assert_with_log!(buffer == b" world", "buffer", b" world", buffer);
crate::test_complete!("buf_reader_read_small");
}
#[test]
fn buf_reader_read_exact_buffer_size() {
init_test("buf_reader_read_exact_buffer_size");
let data: &[u8] = b"exactly sixteen!";
let mut reader = BufReader::with_capacity(16, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut buf = [0u8; 16];
let mut read_buf = ReadBuf::new(&mut buf);
let poll = Pin::new(&mut reader).poll_read(&mut cx, &mut read_buf);
let ready = matches!(poll, Poll::Ready(Ok(())));
crate::assert_with_log!(ready, "poll ready", true, ready);
let filled = read_buf.filled();
crate::assert_with_log!(
filled == b"exactly sixteen!",
"filled",
b"exactly sixteen!",
filled
);
crate::test_complete!("buf_reader_read_exact_buffer_size");
}
#[test]
fn buf_reader_large_read_bypasses_buffer() {
init_test("buf_reader_large_read_bypasses_buffer");
let data: &[u8] = b"large data that exceeds buffer capacity easily";
let mut reader = BufReader::with_capacity(8, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut buf = [0u8; 32];
let mut read_buf = ReadBuf::new(&mut buf);
let poll = Pin::new(&mut reader).poll_read(&mut cx, &mut read_buf);
let ready = matches!(poll, Poll::Ready(Ok(())));
crate::assert_with_log!(ready, "poll ready", true, ready);
let len = read_buf.filled().len();
let within = len <= 32;
crate::assert_with_log!(within, "len <= 32", true, within);
crate::test_complete!("buf_reader_large_read_bypasses_buffer");
}
#[test]
fn buf_reader_zero_capacity_is_clamped() {
init_test("buf_reader_zero_capacity_is_clamped");
let data: &[u8] = b"x";
let reader = BufReader::with_capacity(0, data);
let capacity = reader.capacity();
crate::assert_with_log!(capacity == 1, "capacity", 1, capacity);
crate::test_complete!("buf_reader_zero_capacity_is_clamped");
}
#[test]
fn buf_reader_zero_capacity_fill_buf_progresses() {
init_test("buf_reader_zero_capacity_fill_buf_progresses");
let data: &[u8] = b"xyz";
let mut reader = BufReader::with_capacity(0, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let poll = Pin::new(&mut reader).poll_fill_buf(&mut cx);
let filled = match poll {
Poll::Ready(Ok(bytes)) => bytes.to_vec(),
other => panic!("expected Poll::Ready(Ok(_)), got {other:?}"), };
crate::assert_with_log!(filled == b"x", "filled", b"x", filled);
crate::test_complete!("buf_reader_zero_capacity_fill_buf_progresses");
}
#[test]
fn buf_reader_poll_fill_buf() {
init_test("buf_reader_poll_fill_buf");
let data: &[u8] = b"buffered content";
let mut reader = BufReader::with_capacity(32, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let poll = Pin::new(&mut reader).poll_fill_buf(&mut cx);
let ready = matches!(&poll, Poll::Ready(Ok(_)));
crate::assert_with_log!(ready, "poll ready", true, ready);
if let Poll::Ready(Ok(buf)) = poll {
crate::assert_with_log!(
buf == b"buffered content",
"buffer",
b"buffered content",
buf
);
}
crate::test_complete!("buf_reader_poll_fill_buf");
}
#[test]
fn buf_reader_consume() {
init_test("buf_reader_consume");
let data: &[u8] = b"consume me";
let mut reader = BufReader::with_capacity(32, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let _ = Pin::new(&mut reader).poll_fill_buf(&mut cx);
let buffer = reader.buffer();
crate::assert_with_log!(buffer == b"consume me", "buffer", b"consume me", buffer);
Pin::new(&mut reader).consume(8);
let buffer = reader.buffer();
crate::assert_with_log!(buffer == b"me", "buffer after consume", b"me", buffer);
Pin::new(&mut reader).consume(2);
let empty = reader.buffer().is_empty();
crate::assert_with_log!(empty, "buffer empty", true, empty);
crate::test_complete!("buf_reader_consume");
}
#[test]
fn buf_reader_discard_buffer() {
init_test("buf_reader_discard_buffer");
let data: &[u8] = b"discard this";
let mut reader = BufReader::with_capacity(32, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let _ = Pin::new(&mut reader).poll_fill_buf(&mut cx);
let empty = reader.buffer().is_empty();
crate::assert_with_log!(!empty, "buffer not empty", false, empty);
reader.discard_buffer();
let empty = reader.buffer().is_empty();
crate::assert_with_log!(empty, "buffer empty", true, empty);
crate::test_complete!("buf_reader_discard_buffer");
}
#[test]
fn buf_reader_empty_source() {
init_test("buf_reader_empty_source");
let data: &[u8] = b"";
let mut reader = BufReader::new(data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let poll = Pin::new(&mut reader).poll_fill_buf(&mut cx);
let ready = matches!(poll, Poll::Ready(Ok(buf)) if buf.is_empty());
crate::assert_with_log!(ready, "empty buf ready", true, ready);
crate::test_complete!("buf_reader_empty_source");
}
#[test]
fn buf_reader_multiple_reads() {
init_test("buf_reader_multiple_reads");
let data: &[u8] = b"first second third";
let mut reader = BufReader::with_capacity(8, data);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut buf1 = [0u8; 6];
let mut read_buf1 = ReadBuf::new(&mut buf1);
let poll = Pin::new(&mut reader).poll_read(&mut cx, &mut read_buf1);
let ready = matches!(poll, Poll::Ready(Ok(())));
crate::assert_with_log!(ready, "poll ready 1", true, ready);
let filled1 = read_buf1.filled();
crate::assert_with_log!(filled1 == b"first ", "filled1", b"first ", filled1);
let mut buf2 = [0u8; 6];
let mut read_buf2 = ReadBuf::new(&mut buf2);
let poll = Pin::new(&mut reader).poll_read(&mut cx, &mut read_buf2);
let ready = matches!(poll, Poll::Ready(Ok(())));
crate::assert_with_log!(ready, "poll ready 2", true, ready);
let filled2 = read_buf2.filled();
crate::assert_with_log!(filled2 == b"se", "filled2", b"se", filled2);
let mut buf3 = [0u8; 10];
let mut read_buf3 = ReadBuf::new(&mut buf3);
let poll = Pin::new(&mut reader).poll_read(&mut cx, &mut read_buf3);
let ready = matches!(poll, Poll::Ready(Ok(())));
crate::assert_with_log!(ready, "poll ready 3", true, ready);
crate::test_complete!("buf_reader_multiple_reads");
}
}