flussab/
deferred_reader.rs

use std::io::{self, BufReader, Cursor, Read};

/// A buffered reader optimized for efficient parsing.
///
/// Like `std`'s [`BufReader`], this provides buffering to coalesce many small reads into fewer
/// larger reads of the underlying data source. The difference is that `DeferredReader` is optimized
/// for efficient parsing. This includes asynchronous handling of IO errors, position tracking, and
/// dynamic contiguous look-ahead.
pub struct DeferredReader<'a> {
    /// Underlying data source.
    read: Box<dyn Read + 'a>,
    /// Backing storage for buffered data.
    buf: Vec<u8>,
    // SAFETY `buf[pos_in_buf..pos_in_buf+valid_len]` must _always_ be valid
    /// Offset of the cursor within `buf`.
    pos_in_buf: usize,
    /// Number of buffered bytes in front of the cursor.
    valid_len: usize,
    /// Set once the end of the input or an IO error was reached; no further reads are issued.
    complete: bool,
    /// A deferred IO error, reported via [`check_io_error`][Self::check_io_error].
    io_error: Option<io::Error>,
    /// Input position corresponding to `buf[0]`; wraps around every `usize::MAX` bytes.
    pos_of_buf: usize,
    /// Marked position, stored relative to `pos_of_buf` using wrapping arithmetic.
    mark_in_buf: usize,
    /// Upper bound for the size of a single `read` request.
    chunk_size: usize,
}

impl<'a> DeferredReader<'a> {
    const DEFAULT_CHUNK_SIZE: usize = 16 << 10;

    /// Creates a [`DeferredReader`] for the data of a [`BufReader`].
    pub fn from_buf_reader(buf_reader: BufReader<impl Read + 'a>) -> Self {
        // Avoid double buffering without discarding any already buffered contents.
        let buf_data = buf_reader.buffer().to_vec();
        if buf_data.is_empty() {
            Self::from_read(buf_reader.into_inner())
        } else {
            Self::from_read(Cursor::new(buf_data).chain(buf_reader.into_inner()))
        }
    }

    /// Creates a [`DeferredReader`] for the data of a [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    pub fn from_read(read: impl Read + 'a) -> Self {
        Self::from_boxed_dyn_read(Box::new(read))
    }

    /// Creates a [`DeferredReader`] for the data of a boxed [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    #[inline(never)]
    pub fn from_boxed_dyn_read(read: Box<dyn Read + 'a>) -> Self {
        DeferredReader {
            read,
            buf: vec![],
            pos_in_buf: 0,
            valid_len: 0,
            complete: false,
            io_error: None,
            pos_of_buf: 0,
            mark_in_buf: 0,
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
        }
    }

    /// Sets the number of bytes that are read at once.
    ///
    /// This sets the size of the [`read`][Read::read] requests made. Note that this is just an
    /// upper bound. Depending on the [`Read`] implementation, smaller amounts may be read at once.
    /// To enable interactive line based input, `DeferredReader` on its own will not issue more read
    /// requests than necessary.
    pub fn set_chunk_size(&mut self, size: usize) {
        self.chunk_size = size;
    }

    /// Returns the currently buffered data in front of the cursor.
    ///
    /// You can call [`is_complete`][Self::is_complete] to check whether the returned data contains
    /// all remaining input data.
    #[inline]
    pub fn buf(&self) -> &[u8] {
        unsafe {
            // SAFETY `self.pos_in_buf..self.pos_in_buf + self.valid_len` are always kept within
            // range
            debug_assert!(self
                .buf
                .get(self.pos_in_buf..self.pos_in_buf + self.valid_len)
                .is_some());
            self.buf
                .get_unchecked(self.pos_in_buf..self.pos_in_buf + self.valid_len)
        }
    }

    /// Returns the length of the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().len()` but unlike [`reader.buf()`][Self::buf]
    /// this does not create an intermediate reference to the buffered data. This can make a
    /// difference in safety when raw pointers are used to access the buffered data.
    #[inline]
    pub fn buf_len(&self) -> usize {
        self.valid_len
    }

    /// Returns a pointer to the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().as_ptr()` but unlike
    /// [`reader.buf()`][Self::buf] this does not create an intermediate reference to the buffered
    /// data. You can use [`reader.buf_len()`][Self::buf_len] to obtain the length of the buffered
    /// data.
    #[inline]
    pub fn buf_ptr(&self) -> *const u8 {
        unsafe {
            // SAFETY `self.pos_in_buf` is always kept in range
            self.buf.as_ptr().add(self.pos_in_buf)
        }
    }

    /// Advances the cursor by a given number of already buffered bytes.
    ///
    /// This will panic if the number of bytes exceeds the amount of buffered data.
    #[inline]
    pub fn advance(&mut self, n: usize) {
        let (next_len, overflow) = self.valid_len.overflowing_sub(n);
        self.valid_len = next_len;
        if overflow {
            self.advance_cold();
        }
        self.pos_in_buf += n;
        // SAFETY ^ we already subtracted n from len and checked for overflow so we cannot overflow
        // the buffer here.
    }

    #[inline(never)]
    #[cold]
    fn advance_cold(&self) -> ! {
        panic!("advanced past the current buffer size");
    }

    /// Advances the cursor by a given number of already buffered bytes, returning a reference to
    /// those bytes.
    ///
    /// This will panic if the number of bytes exceeds the amount of buffered data.
    #[inline]
    pub fn advance_with_buf(&mut self, n: usize) -> &[u8] {
        self.advance(n);
        unsafe {
            // SAFETY since we just called advance which did not panic, we know that
            // `self.pos_in_buf` was just advanced by `n` bytes, pointing into a valid buffer before
            // and after.
            debug_assert!(self.buf.get(self.pos_in_buf - n..self.pos_in_buf).is_some());
            self.buf.get_unchecked(self.pos_in_buf - n..self.pos_in_buf)
        }
    }

    /// Advances the cursor by a given number of already buffered bytes without checking if
    /// sufficient bytes are buffered.
    ///
    /// # Safety
    ///
    /// The passed value for `n` may not exceed the value returned by [`buf_len()`][Self::buf_len].
    #[inline]
    pub unsafe fn advance_unchecked(&mut self, n: usize) {
        debug_assert!(self.valid_len >= n);
        self.valid_len -= n;
        self.pos_in_buf += n;
    }

    /// Total number of bytes the cursor was advanced so far.
    ///
    /// This wraps around every `usize::MAX` bytes.
    #[inline]
    pub fn position(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.pos_in_buf)
    }

    /// Returns currently marked position.
    ///
    /// Initially this is position `0`, but can be changed using [`set_mark`][Self::set_mark] and
    /// [`set_mark_to_position`][Self::set_mark_to_position].
    ///
    /// Setting the mark to the start of a token before advancing over it can be useful for error
    /// reporting.
    #[inline]
    pub fn mark(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.mark_in_buf)
    }

    /// Marks the current position.
    ///
    /// Calling this will make [`mark`](Self::mark) return the current position.
    #[inline]
    pub fn set_mark(&mut self) {
        self.mark_in_buf = self.pos_in_buf
    }

    /// Sets the position returned by [`mark`](Self::mark).
    #[inline]
    pub fn set_mark_to_position(&mut self, position: usize) {
        self.mark_in_buf = position.wrapping_sub(self.pos_of_buf)
    }

    /// Returns whether all remaining data is buffered.
    ///
    /// If this returns `true` [`buf`][Self::buf] will contain all the remaining data. This can
    /// happen when the end was reached or when an IO error was encountered. You can use
    /// [`check_io_error`][Self::check_io_error] to determine whether an IO error occurred.
    #[inline]
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns whether the cursor is at the end of the available data.
    ///
    /// This can be the end of the input or all data before an IO error was encountered. You can
    /// use [`check_io_error`][Self::check_io_error] to determine whether an IO error occurred.
    #[inline]
    pub fn is_at_end(&self) -> bool {
        self.complete && (self.valid_len == 0)
    }

    /// Returns an encountered IO errors as `Err(io_err)`.
    ///
    /// This resets the stored IO error and returns `Ok(())` if no IO error is stored.
    #[inline]
    pub fn check_io_error(&mut self) -> io::Result<()> {
        if let Some(err) = self.io_error.take() {
            Err(err)
        } else {
            Ok(())
        }
    }

    /// Returns a reference to an encountered IO error.
    ///
    /// This does not reset the stored IO error and returns `None` if no IO error is stored.
    #[inline]
    pub fn io_error(&self) -> Option<&io::Error> {
        self.io_error.as_ref()
    }

    /// Tries to extend the buffer by reading more data until it reaches the requested length.
    ///
    /// Returns a slice to _all_ of the buffered data, not only the requested amount.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough
    /// data was read, in which case a smaller buffer than requested is returned.
    #[inline]
    pub fn request(&mut self, len: usize) -> &[u8] {
        if self.valid_len < len {
            self.request_cold(len);
        }
        self.buf()
    }

    #[cold]
    #[inline(never)]
    fn request_cold(&mut self, len: usize) {
        while self.valid_len < len && self.request_more() {}
    }

    /// Tries to extend the buffer by reading more data until it contains at least one byte.
    ///
    /// Returns the next byte.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough
    /// data was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte(&mut self) -> Option<u8> {
        self.request_byte_at_offset(0)
    }

    /// Tries to extend the buffer by reading more data until it contains at least the byte at the
    /// given offset from the current position.
    ///
    /// Returns that byte.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough data
    /// was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte_at_offset(&mut self, offset: usize) -> Option<u8> {
        if offset < self.valid_len {
            unsafe {
                // SAFETY within `pos_in_buf..pos_in_buf + valid_len`
                Some(*self.buf.get_unchecked(self.pos_in_buf + offset))
            }
        } else {
            self.request_byte_at_offset_cold(offset)
        }
    }

    #[cold]
    #[inline(never)]
    fn request_byte_at_offset_cold(&mut self, offset: usize) -> Option<u8> {
        while self.valid_len <= offset {
            if !self.request_more() {
                return None;
            }
        }
        Some(self.buf[self.pos_in_buf + offset])
    }

    /// Tries to extend the buffer by reading more data.
    #[cold]
    #[inline(never)]
    pub fn request_more(&mut self) -> bool {
        if self.complete {
            return false;
        }

        // Only realign if we have advanced over sufficiently more data to
        let realign = self.pos_in_buf > self.chunk_size * 2;

        if realign {
            self.buf
                .copy_within(self.pos_in_buf..self.pos_in_buf + self.valid_len, 0);
            self.pos_of_buf = self.pos_of_buf.wrapping_add(self.pos_in_buf);
            // Adjust the mark _before_ resetting `pos_in_buf`, so that `mark()` keeps reporting
            // the same absolute position after the buffered data is shifted to the front.
            // (Subtracting after the reset would subtract zero and silently shift the mark.)
            self.mark_in_buf = self.mark_in_buf.wrapping_sub(self.pos_in_buf);
            self.pos_in_buf = 0;

            // If our buffer is four times as large as it needs to be for the current data and an
            // additional chunk, shrink it.
            if self.buf.len() > 4 * (self.pos_in_buf + self.valid_len + self.chunk_size) {
                self.buf.truncate(self.buf.len() / 2);
                self.buf.shrink_to_fit();
            }
        }

        let target_end = self.pos_in_buf + self.valid_len + self.chunk_size;

        // Make sure we have enough buffer space for another chunk
        if self.buf.len() < target_end {
            self.buf.resize(target_end, 0);
        }

        // Do only a single successful read (to make line buffered repls usable), but do retry on
        // `Interrupted`.
        loop {
            match self
                .read
                .read(&mut self.buf[self.pos_in_buf + self.valid_len..target_end])
            {
                Ok(0) => self.complete = true,
                Ok(n) => {
                    // SAFETY this assert is load bearing, as `self.valid_len` is trusted but Read
                    // implementations aren't
                    assert!(
                        n <= self.chunk_size,
                        "invariant of std::io::Read trait violated"
                    );
                    self.valid_len += n
                }
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => {
                    self.io_error = Some(err);
                    self.complete = true;
                }
            }
            break;
        }

        true
    }
}