flussab/deferred_reader.rs
1use std::io::{self, BufReader, Cursor, Read};
2
/// A buffered reader optimized for efficient parsing.
///
/// Like `std`'s [`BufReader`], this provides buffering to coalesce many small reads into fewer
/// larger reads of the underlying data source. The difference is that `DeferredReader` is optimized
/// for efficient parsing. This includes asynchronous handling of IO errors, position tracking, and
/// dynamic contiguous look-ahead.
///
/// IO errors are not returned by the reading methods. Instead they are stored and the input is
/// treated as complete; use [`check_io_error`][Self::check_io_error] or
/// [`io_error`][Self::io_error] to inspect them.
pub struct DeferredReader<'a> {
    /// Underlying data source.
    read: Box<dyn Read + 'a>,
    /// Backing storage; only `buf[pos_in_buf..pos_in_buf + valid_len]` holds unconsumed data.
    buf: Vec<u8>,
    // SAFETY `buf[pos_in_buf..pos_in_buf + valid_len]` must _always_ be in bounds, the unchecked
    // accesses below rely on it. Every update of these two fields has to preserve this, even on
    // code paths that panic.
    /// Offset of the cursor within `buf`.
    pos_in_buf: usize,
    /// Number of buffered bytes in front of the cursor.
    valid_len: usize,
    /// Set once a read returned zero bytes or failed; no further reads are issued afterwards.
    complete: bool,
    /// IO error encountered while reading, until taken by `check_io_error`.
    io_error: Option<io::Error>,
    /// Stream position corresponding to `buf[0]` (wrapping).
    pos_of_buf: usize,
    /// Marked position relative to `pos_of_buf` (wrapping, so it may denote a position before
    /// `buf[0]`).
    mark_in_buf: usize,
    /// Upper bound for the size of a single read request, always at least 1.
    chunk_size: usize,
}

impl<'a> DeferredReader<'a> {
    const DEFAULT_CHUNK_SIZE: usize = 16 << 10;

    /// Creates a [`DeferredReader`] for the data of a [`BufReader`].
    pub fn from_buf_reader(buf_reader: BufReader<impl Read + 'a>) -> Self {
        // Avoid double buffering without discarding any already buffered contents.
        let buf_data = buf_reader.buffer().to_vec();
        if buf_data.is_empty() {
            Self::from_read(buf_reader.into_inner())
        } else {
            // Replay the bytes the `BufReader` had already buffered before continuing with the
            // underlying reader.
            Self::from_read(Cursor::new(buf_data).chain(buf_reader.into_inner()))
        }
    }

    /// Creates a [`DeferredReader`] for the data of a [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    pub fn from_read(read: impl Read + 'a) -> Self {
        Self::from_boxed_dyn_read(Box::new(read))
    }

    /// Creates a [`DeferredReader`] for the data of a boxed [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    #[inline(never)]
    pub fn from_boxed_dyn_read(read: Box<dyn Read + 'a>) -> Self {
        DeferredReader {
            read,
            buf: vec![],
            pos_in_buf: 0,
            valid_len: 0,
            complete: false,
            io_error: None,
            pos_of_buf: 0,
            mark_in_buf: 0,
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
        }
    }

    /// Sets the number of bytes that are read at once.
    ///
    /// This sets the size of the [`read`][Read::read] requests made. Note that this is just an
    /// upper bound. Depending on the [`Read`] implementation, smaller amounts may be read at once.
    /// To enable interactive line based input, `DeferredReader` on its own will not issue more read
    /// requests than necessary.
    ///
    /// A `size` of `0` is treated as `1`: a zero-length read request always yields zero bytes,
    /// which would be indistinguishable from the end of the input.
    pub fn set_chunk_size(&mut self, size: usize) {
        self.chunk_size = size.max(1);
    }

    /// Returns the currently buffered data in front of the cursor.
    ///
    /// You can call [`is_complete`][Self::is_complete] to check whether the returned data contains
    /// all remaining input data.
    #[inline]
    pub fn buf(&self) -> &[u8] {
        let range = self.pos_in_buf..self.pos_in_buf + self.valid_len;
        debug_assert!(self.buf.get(range.clone()).is_some());
        unsafe {
            // SAFETY `self.pos_in_buf..self.pos_in_buf + self.valid_len` are always kept within
            // range
            self.buf.get_unchecked(range)
        }
    }

    /// Returns the length of the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().len()` but unlike [`reader.buf()`][Self::buf]
    /// this does not create an intermediate reference to the buffered data. This can make a
    /// difference in safety when raw pointers are used to access the buffered data.
    #[inline]
    pub fn buf_len(&self) -> usize {
        self.valid_len
    }

    /// Returns a pointer to the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().as_ptr()` but unlike
    /// [`reader.buf()`][Self::buf] this does not create an intermediate reference to the buffered
    /// data. You can use [`reader.buf_len()`][Self::buf_len] to obtain the length of the buffered
    /// data.
    #[inline]
    pub fn buf_ptr(&self) -> *const u8 {
        unsafe {
            // SAFETY `self.pos_in_buf` is always kept in range
            self.buf.as_ptr().add(self.pos_in_buf)
        }
    }

    /// Advances the cursor by a given number of already buffered bytes.
    ///
    /// # Panics
    ///
    /// This will panic if the number of bytes exceeds the amount of buffered data. The reader is
    /// left unchanged in that case.
    #[inline]
    pub fn advance(&mut self, n: usize) {
        // Check before mutating anything: if the panic below is caught by the caller, the reader
        // must still uphold `pos_in_buf + valid_len <= buf.len()` for the unchecked accesses.
        match self.valid_len.checked_sub(n) {
            Some(remaining) => {
                self.valid_len = remaining;
                // SAFETY `n` did not exceed the old `valid_len`, so `pos_in_buf + valid_len` is
                // unchanged and we cannot move past the end of the buffer here.
                self.pos_in_buf += n;
            }
            None => self.advance_cold(),
        }
    }

    /// Out-of-line panic for [`advance`][Self::advance], keeping the hot path small.
    #[inline(never)]
    #[cold]
    fn advance_cold(&self) -> ! {
        panic!("advanced past the current buffer size");
    }

    /// Advances the cursor by a given number of already buffered bytes, returning a reference to
    /// those bytes.
    ///
    /// # Panics
    ///
    /// This will panic if the number of bytes exceeds the amount of buffered data.
    #[inline]
    pub fn advance_with_buf(&mut self, n: usize) -> &[u8] {
        self.advance(n);
        let skipped = self.pos_in_buf - n..self.pos_in_buf;
        debug_assert!(self.buf.get(skipped.clone()).is_some());
        unsafe {
            // SAFETY since we just called advance which did not panic, we know that
            // `self.pos_in_buf` was just advanced by `n` bytes, pointing into a valid buffer before
            // and after.
            self.buf.get_unchecked(skipped)
        }
    }

    /// Advances the cursor by a given number of already buffered bytes without checking if
    /// sufficient bytes are buffered.
    ///
    /// # Safety
    ///
    /// The passed value for `n` may not exceed the value returned by [`buf_len()`][Self::buf_len].
    #[inline]
    pub unsafe fn advance_unchecked(&mut self, n: usize) {
        debug_assert!(self.valid_len >= n);
        self.valid_len -= n;
        self.pos_in_buf += n;
    }

    /// Total number of bytes the cursor was advanced so far.
    ///
    /// This wraps around every `usize::MAX` bytes.
    #[inline]
    pub fn position(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.pos_in_buf)
    }

    /// Returns currently marked position.
    ///
    /// Initially this is position `0`, but can be changed using [`set_mark`][Self::set_mark] and
    /// [`set_mark_to_position`][Self::set_mark_to_position].
    ///
    /// Setting the mark to the start of a token before advancing over it can be useful for error
    /// reporting.
    #[inline]
    pub fn mark(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.mark_in_buf)
    }

    /// Marks the current position.
    ///
    /// Calling this will make [`mark`](Self::mark) return the current position.
    #[inline]
    pub fn set_mark(&mut self) {
        self.mark_in_buf = self.pos_in_buf
    }

    /// Sets the position returned by [`mark`](Self::mark).
    #[inline]
    pub fn set_mark_to_position(&mut self, position: usize) {
        self.mark_in_buf = position.wrapping_sub(self.pos_of_buf)
    }

    /// Returns whether all remaining data is buffered.
    ///
    /// If this returns `true` [`buf`][Self::buf] will contain all the remaining data. This can
    /// happen when the end was reached or when an IO error was encountered. You can use
    /// [`check_io_error`][Self::check_io_error] to determine whether an IO error occurred.
    #[inline]
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns whether the cursor is at the end of the available data.
    ///
    /// This can be the end of the input or all data before an IO error was encountered. You can
    /// use [`check_io_error`][Self::check_io_error] to determine whether an IO error occurred.
    #[inline]
    pub fn is_at_end(&self) -> bool {
        self.complete && (self.valid_len == 0)
    }

    /// Returns an encountered IO error as `Err(io_err)`.
    ///
    /// This resets the stored IO error and returns `Ok(())` if no IO error is stored.
    #[inline]
    pub fn check_io_error(&mut self) -> io::Result<()> {
        match self.io_error.take() {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Returns a reference to an encountered IO error.
    ///
    /// This does not reset the stored IO error and returns `None` if no IO error is stored.
    #[inline]
    pub fn io_error(&self) -> Option<&io::Error> {
        self.io_error.as_ref()
    }

    /// Tries to extend the buffer by reading more data until it reaches the requested length.
    ///
    /// Returns a slice to _all_ of the buffered data, not only the requested amount.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough
    /// data was read, in which case a smaller buffer than requested is returned.
    #[inline]
    pub fn request(&mut self, len: usize) -> &[u8] {
        if self.valid_len < len {
            self.request_cold(len);
        }
        self.buf()
    }

    /// Out-of-line slow path of [`request`][Self::request]: keeps reading until `len` bytes are
    /// buffered or no more data can be read.
    #[cold]
    #[inline(never)]
    fn request_cold(&mut self, len: usize) {
        while self.valid_len < len && self.request_more() {}
    }

    /// Tries to extend the buffer by reading more data until it contains at least one byte.
    ///
    /// Returns the next byte.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough
    /// data was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte(&mut self) -> Option<u8> {
        self.request_byte_at_offset(0)
    }

    /// Tries to extend the buffer by reading more data until it contains at least the byte at the
    /// given offset from the current position.
    ///
    /// Returns that byte.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough data
    /// was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte_at_offset(&mut self, offset: usize) -> Option<u8> {
        if offset < self.valid_len {
            unsafe {
                // SAFETY within `pos_in_buf..pos_in_buf + valid_len`
                Some(*self.buf.get_unchecked(self.pos_in_buf + offset))
            }
        } else {
            self.request_byte_at_offset_cold(offset)
        }
    }

    /// Out-of-line slow path of [`request_byte_at_offset`][Self::request_byte_at_offset]: reads
    /// until the byte at `offset` is buffered or no more data can be read.
    #[cold]
    #[inline(never)]
    fn request_byte_at_offset_cold(&mut self, offset: usize) -> Option<u8> {
        while self.valid_len <= offset {
            if !self.request_more() {
                return None;
            }
        }
        Some(self.buf[self.pos_in_buf + offset])
    }

    /// Tries to extend the buffer by reading more data.
    ///
    /// Issues at most one successful [`read`][Read::read] request of up to the configured chunk
    /// size, retrying only when the read was interrupted.
    ///
    /// Returns `false` without reading if the input is already known to be complete. Otherwise a
    /// read is attempted and `true` is returned, even if that read hit the end of the input or
    /// failed; in that case the buffer did not grow and [`is_complete`][Self::is_complete] returns
    /// `true` afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the underlying [`Read`] implementation reports more bytes than fit into the
    /// buffer it was given.
    #[cold]
    #[inline(never)]
    pub fn request_more(&mut self) -> bool {
        if self.complete {
            return false;
        }

        // Only realign once the cursor has advanced past more than two chunks' worth of data, so
        // that the buffered bytes are not moved on every single read.
        let realign = self.pos_in_buf > self.chunk_size * 2;

        if realign {
            let consumed = self.pos_in_buf;
            self.buf.copy_within(consumed..consumed + self.valid_len, 0);
            // `buf[0]` now corresponds to the old cursor position. Both the buffer origin and the
            // buffer-relative mark have to be rebased by the same amount _before_ the cursor
            // offset is reset, otherwise `mark()` would drift by `consumed`.
            self.pos_of_buf = self.pos_of_buf.wrapping_add(consumed);
            self.mark_in_buf = self.mark_in_buf.wrapping_sub(consumed);
            self.pos_in_buf = 0;

            // If our buffer is four times as large as it needs to be for the current data and an
            // additional chunk, shrink it.
            if self.buf.len() > 4 * (self.valid_len + self.chunk_size) {
                self.buf.truncate(self.buf.len() / 2);
                self.buf.shrink_to_fit();
            }
        }

        let read_start = self.pos_in_buf + self.valid_len;
        let target_end = read_start + self.chunk_size;

        // Make sure we have enough buffer space for another chunk
        if self.buf.len() < target_end {
            self.buf.resize(target_end, 0);
        }

        // Do only a single successful read (to make line buffered repls usable), but do retry on
        // `Interrupted`.
        loop {
            match self.read.read(&mut self.buf[read_start..target_end]) {
                // A zero-length result for a non-empty request marks the end of the input.
                Ok(0) => self.complete = true,
                Ok(n) => {
                    // SAFETY this assert is load bearing, as `self.valid_len` is trusted but Read
                    // implementations aren't
                    assert!(
                        n <= self.chunk_size,
                        "invariant of std::io::Read trait violated"
                    );
                    self.valid_len += n
                }
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => {
                    // Defer the error: store it and treat the data read so far as everything
                    // that is available.
                    self.io_error = Some(err);
                    self.complete = true;
                }
            }
            break;
        }

        true
    }
}