async_hal/io/buf_reader.rs
1use super::{AsyncBufRead, AsyncRead};
2use core::{
3 pin::Pin,
4 task::{Context, Poll},
5};
6use futures::ready;
7use pin_project_lite::pin_project;
8
9pin_project! {
10 /// The `BufReader` struct adds buffering to any reader.
11 ///
12 /// It can be excessively inefficient to work directly with a [`AsyncRead`]
13 /// instance. A `BufReader` performs large, infrequent reads on the underlying
14 /// [`AsyncRead`] and maintains an in-memory buffer of the results.
15 ///
16 /// `BufReader` can improve the speed of programs that make *small* and
17 /// *repeated* read calls to the same file or network socket. It does not
18 /// help when reading very large amounts at once, or reading just one or a few
19 /// times. It also provides no advantage when reading from a source that is
20 /// already in memory, like a `Vec<u8>`.
21 ///
22 /// When the `BufReader` is dropped, the contents of its buffer will be
23 /// discarded. Creating multiple instances of a `BufReader` on the same
24 /// stream can cause data loss.
25 pub struct BufReader<'buf, R> {
26 #[pin]
27 inner: R,
28 buf: &'buf mut [u8],
29 pos: usize,
30 cap: usize,
31 }
32}
33
34impl<'buf, R: AsyncRead> BufReader<'buf, R> {
35 /// Creates a new `BufReader` with the specified buffer capacity.
36 pub fn new(buf: &'buf mut [u8], inner: R) -> Self {
37 Self {
38 inner,
39 buf,
40 pos: 0,
41 cap: 0,
42 }
43 }
44
45 /// Gets a reference to the underlying reader.
46 ///
47 /// It is inadvisable to directly read from the underlying reader.
48 pub fn get_ref(&self) -> &R {
49 &self.inner
50 }
51
52 /// Gets a mutable reference to the underlying reader.
53 ///
54 /// It is inadvisable to directly read from the underlying reader.
55 pub fn get_mut(&mut self) -> &mut R {
56 &mut self.inner
57 }
58
59 /// Gets a pinned mutable reference to the underlying reader.
60 ///
61 /// It is inadvisable to directly read from the underlying reader.
62 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
63 self.project().inner
64 }
65
66 /// Consumes this `BufReader`, returning the underlying reader.
67 ///
68 /// Note that any leftover data in the internal buffer is lost.
69 pub fn into_inner(self) -> R {
70 self.inner
71 }
72
73 /// Returns a reference to the internally buffered data.
74 ///
75 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
76 pub fn buffer(&self) -> &[u8] {
77 &self.buf[self.pos..self.cap]
78 }
79
80 /// Invalidates all data in the internal buffer.
81 #[inline]
82 fn discard_buffer(self: Pin<&mut Self>) {
83 let me = self.project();
84 *me.pos = 0;
85 *me.cap = 0;
86 }
87}
88
89impl<R: AsyncRead> AsyncRead for BufReader<'_, R> {
90 type Error = R::Error;
91
92 fn poll_read(
93 mut self: Pin<&mut Self>,
94 cx: &mut Context,
95 buf: &mut [u8],
96 ) -> Poll<Result<usize, R::Error>> {
97 // If we don't have any buffered data and we're doing a massive read
98 // (larger than our internal buffer), bypass our internal buffer
99 // entirely.
100 if self.pos == self.cap && buf.len() >= self.buf.len() {
101 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
102 self.discard_buffer();
103 return Poll::Ready(res);
104 }
105
106 let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
107 let used = core::cmp::min(rem.len(), buf.len());
108 buf[..used].copy_from_slice(&rem[..used]);
109 self.consume(used);
110 Poll::Ready(Ok(used))
111 }
112}
113
114impl<R: AsyncRead> AsyncBufRead for BufReader<'_, R> {
115 fn poll_fill_buf(
116 self: Pin<&mut Self>,
117 cx: &mut Context<'_>,
118 ) -> Poll<Result<&[u8], Self::Error>> {
119 let me = self.project();
120
121 // If we've reached the end of our internal buffer then we need to fetch
122 // some more data from the underlying reader.
123 // Branch using `>=` instead of the more correct `==`
124 // to tell the compiler that the pos..cap slice is always valid.
125 if *me.pos >= *me.cap {
126 debug_assert!(*me.pos == *me.cap);
127
128 ready!(me.inner.poll_read(cx, me.buf))?;
129 *me.cap = me.buf.len();
130 *me.pos = 0;
131 }
132 Poll::Ready(Ok(&me.buf[*me.pos..*me.cap]))
133 }
134
135 fn consume(self: Pin<&mut Self>, amt: usize) {
136 let me = self.project();
137 *me.pos = core::cmp::min(*me.pos + amt, *me.cap);
138 }
139}