use std::io::{self, BufReader, Cursor, Read};

/// A buffered reader optimized for efficient parsing.
///
/// Like `std`'s [`BufReader`], this provides buffering to coalesce many small reads into fewer
/// larger reads of the underlying data source. The difference is that `ByteReader` is optimized for
/// efficient parsing. This includes deferred handling of IO errors, position tracking, and
/// dynamic contiguous look-ahead.
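///
/// # Example
///
/// A minimal sketch of the intended use, assuming [`ByteReader`] is in scope:
///
/// ```ignore
/// // Any `Read` implementation works; a byte slice keeps the example self-contained.
/// let mut reader = ByteReader::from_read(&b"hello world"[..]);
///
/// // Look ahead at the next bytes without consuming them.
/// assert_eq!(&reader.request(5)[..5], b"hello");
///
/// // Consume the bytes we just inspected.
/// reader.advance(5);
/// assert_eq!(reader.position(), 5);
/// ```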
pub struct ByteReader<'a> {
    read: Box<dyn Read + 'a>,
    buf: Vec<u8>,
    // SAFETY `buf[pos_in_buf..pos_in_buf+valid_len]` must _always_ be valid
    pos_in_buf: usize,
    valid_len: usize,
    complete: bool,
    io_error: Option<io::Error>,
    pos_of_buf: usize,
    mark_in_buf: usize,
    chunk_size: usize,
}

impl<'a> ByteReader<'a> {
    const DEFAULT_CHUNK_SIZE: usize = 16 << 10;

    /// Creates a [`ByteReader`] for the data of a [`BufReader`].
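    ///
    /// # Example
    ///
    /// A small sketch, assuming [`ByteReader`] is in scope; the in-memory reader stands in for
    /// any buffered source such as a file:
    ///
    /// ```ignore
    /// use std::io::BufReader;
    ///
    /// let buf_reader = BufReader::new(&b"some input"[..]);
    /// let mut reader = ByteReader::from_buf_reader(buf_reader);
    /// assert_eq!(reader.request(10), b"some input");
    /// ```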
    pub fn from_buf_reader(buf_reader: BufReader<impl Read + 'a>) -> Self {
        // Avoid double buffering without discarding any already buffered contents.
        let buf_data = buf_reader.buffer().to_vec();
        if buf_data.is_empty() {
            Self::from_read(buf_reader.into_inner())
        } else {
            Self::from_read(Cursor::new(buf_data).chain(buf_reader.into_inner()))
        }
    }

    /// Creates a [`ByteReader`] for the data of a [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    pub fn from_read(read: impl Read + 'a) -> Self {
        Self::from_boxed_dyn_read(Box::new(read))
    }

    /// Creates a [`ByteReader`] for the data of a boxed [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    #[inline(never)]
    pub fn from_boxed_dyn_read(read: Box<dyn Read + 'a>) -> Self {
        ByteReader {
            read,
            buf: vec![],
            pos_in_buf: 0,
            valid_len: 0,
            complete: false,
            io_error: None,
            pos_of_buf: 0,
            mark_in_buf: 0,
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
        }
    }

    /// Sets the number of bytes that are read at once.
    ///
    /// This sets the size of the [`read`][Read::read] requests made. Note that this is just an
    /// upper bound. Depending on the [`Read`] implementation, smaller amounts may be read at once.
    /// To keep interactive, line-based input usable, `ByteReader` will not issue more read
    /// requests than necessary on its own.
    pub fn set_chunk_size(&mut self, size: usize) {
        self.chunk_size = size;
    }

    /// Returns the currently buffered data in front of the cursor.
    ///
    /// You can call [`is_complete`][Self::is_complete] to check whether the returned data contains
    /// all remaining input data.
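    ///
    /// # Example
    ///
    /// A minimal sketch, assuming [`ByteReader`] is in scope. Nothing is buffered until data is
    /// requested:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"abc"[..]);
    /// assert!(reader.buf().is_empty());
    ///
    /// // Requesting data fills the buffer; `buf()` then exposes it without consuming it.
    /// reader.request(1);
    /// assert_eq!(reader.buf(), b"abc");
    /// ```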
    #[inline]
    pub fn buf(&self) -> &[u8] {
        unsafe {
            // SAFETY `self.pos_in_buf..self.pos_in_buf + self.valid_len` are always kept within
            // range
            debug_assert!(self
                .buf
                .get(self.pos_in_buf..self.pos_in_buf + self.valid_len)
                .is_some());
            self.buf
                .get_unchecked(self.pos_in_buf..self.pos_in_buf + self.valid_len)
        }
    }

    /// Returns the length of the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().len()` but unlike [`reader.buf()`][Self::buf]
    /// this does not create an intermediate reference to the buffered data. This can make a
    /// difference in safety when raw pointers are used to access the buffered data.
    #[inline]
    pub fn buf_len(&self) -> usize {
        self.valid_len
    }

    /// Returns a pointer to the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().as_ptr()` but unlike
    /// [`reader.buf()`][Self::buf] this does not create an intermediate reference to the buffered
    /// data. You can use [`reader.buf_len()`][Self::buf_len] to obtain the length of the buffered
    /// data.
    #[inline]
    pub fn buf_ptr(&self) -> *const u8 {
        unsafe {
            // SAFETY `self.pos_in_buf` is always kept in range
            self.buf.as_ptr().add(self.pos_in_buf)
        }
    }

    /// Advances the cursor by a given number of already buffered bytes.
    ///
    /// This will panic if the number of bytes exceeds the amount of buffered data.
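    ///
    /// # Example
    ///
    /// A minimal sketch, assuming [`ByteReader`] is in scope. Bytes have to be requested (and
    /// thereby buffered) before the cursor can be advanced over them:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"key=value"[..]);
    /// let buffered = reader.request(4);
    /// assert!(buffered.starts_with(b"key="));
    ///
    /// // Consume the `key=` prefix; the cursor now points at the value.
    /// reader.advance(4);
    /// assert_eq!(reader.request_byte(), Some(b'v'));
    /// ```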
    #[inline]
    pub fn advance(&mut self, n: usize) {
        let (next_len, overflow) = self.valid_len.overflowing_sub(n);
        self.valid_len = next_len;
        if overflow {
            self.advance_cold();
        }
        self.pos_in_buf += n;
        // SAFETY ^ we already subtracted n from len and checked for overflow so we cannot overflow
        // the buffer here.
    }

    #[inline(never)]
    #[cold]
    fn advance_cold(&mut self) -> ! {
        panic!("advanced past the current buffer size");
    }

    /// Advances the cursor by a given number of already buffered bytes without checking if
    /// sufficient bytes are buffered.
    ///
    /// # Safety
    ///
    /// The passed value for `n` must not exceed the value returned by [`buf_len()`][Self::buf_len].
    #[inline]
    pub unsafe fn advance_unchecked(&mut self, n: usize) {
        debug_assert!(self.valid_len >= n);
        self.valid_len -= n;
        self.pos_in_buf += n;
    }

    /// Returns the total number of bytes the cursor has been advanced so far.
    ///
    /// This wraps around every `usize::MAX` bytes.
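    ///
    /// # Example
    ///
    /// A minimal sketch, assuming [`ByteReader`] is in scope:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"abcdef"[..]);
    /// assert_eq!(reader.position(), 0);
    ///
    /// // Requesting data does not move the cursor, only advancing does.
    /// reader.request(6);
    /// assert_eq!(reader.position(), 0);
    /// reader.advance(3);
    /// assert_eq!(reader.position(), 3);
    /// ```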
    #[inline]
    pub fn position(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.pos_in_buf)
    }

    /// Returns the currently marked position.
    ///
    /// Initially this is position `0`, but can be changed using [`set_mark`][Self::set_mark] and
    /// [`set_mark_to_position`][Self::set_mark_to_position].
    ///
    /// Setting the mark to the start of a token before advancing over it can be useful for error
    /// reporting.
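    ///
    /// # Example
    ///
    /// An illustrative sketch, assuming [`ByteReader`] is in scope. The mark records where a
    /// token started while the cursor moves past it:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"let x = 1;"[..]);
    /// reader.request(5);
    /// reader.advance(4); // skip `let `
    /// reader.set_mark(); // remember where the identifier starts
    /// reader.advance(1); // consume `x`
    /// assert_eq!(reader.mark(), 4);
    /// assert_eq!(reader.position(), 5);
    /// ```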
    #[inline]
    pub fn mark(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.mark_in_buf)
    }

    /// Marks the current position.
    ///
    /// Calling this will make [`mark`](Self::mark) return the current position.
    #[inline]
    pub fn set_mark(&mut self) {
        self.mark_in_buf = self.pos_in_buf
    }

    /// Sets the position returned by [`mark`](Self::mark).
    #[inline]
    pub fn set_mark_to_position(&mut self, position: usize) {
        self.mark_in_buf = position.wrapping_sub(self.pos_of_buf)
    }

    /// Returns whether all remaining data is buffered.
    ///
    /// If this returns `true`, [`buf`][Self::buf] will contain all the remaining data. This can
    /// happen when the end was reached or when an IO error was encountered. You can use
    /// [`check_io_error`][Self::check_io_error] to determine whether an IO error occurred.
    #[inline]
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns whether the cursor is at the end of the available data.
    ///
    /// This can be the end of the input or all data before an IO error was encountered. You can
    /// use [`check_io_error`][Self::check_io_error] to determine whether an IO error occurred.
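    ///
    /// # Example
    ///
    /// A minimal sketch, assuming [`ByteReader`] is in scope, that drains the input chunk by
    /// chunk and counts the bytes:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"some longer input"[..]);
    /// let mut total = 0;
    /// while !reader.is_at_end() {
    ///     let n = reader.request(4096).len();
    ///     total += n;
    ///     reader.advance(n);
    /// }
    /// assert_eq!(total, 17);
    /// ```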
    #[inline]
    pub fn is_at_end(&self) -> bool {
        self.complete && (self.valid_len == 0)
    }

    /// Returns an encountered IO error as `Err(io_err)`.
    ///
    /// This resets the stored IO error and returns `Ok(())` if no IO error is stored.
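    ///
    /// # Example
    ///
    /// A small sketch, assuming [`ByteReader`] is in scope. A typical pattern is to parse until
    /// the buffered input runs out and only then check whether that was caused by an IO error:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"all of the input"[..]);
    /// while !reader.is_at_end() {
    ///     let n = reader.request(4096).len();
    ///     reader.advance(n);
    /// }
    /// // Reading from an in-memory slice cannot fail, so no error is stored.
    /// assert!(reader.check_io_error().is_ok());
    /// ```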
    #[inline]
    pub fn check_io_error(&mut self) -> io::Result<()> {
        if let Some(err) = self.io_error.take() {
            Err(err)
        } else {
            Ok(())
        }
    }

    /// Returns a reference to an encountered IO error.
    ///
    /// This does not reset the stored IO error and returns `None` if no IO error is stored.
    #[inline]
    pub fn io_error(&self) -> Option<&io::Error> {
        self.io_error.as_ref()
    }

    /// Tries to extend the buffer by reading more data until it reaches the requested length.
    ///
    /// Returns a slice to _all_ of the buffered data, not only the requested amount.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough
    /// data was read, in which case a smaller buffer than requested is returned.
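    ///
    /// # Example
    ///
    /// A minimal sketch, assuming [`ByteReader`] is in scope. Checking a fixed-size header
    /// before committing to parse it is a typical use:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"MAGIrest of the file"[..]);
    /// let buffered = reader.request(4);
    /// if buffered.len() >= 4 && &buffered[..4] == b"MAGI" {
    ///     reader.advance(4);
    /// }
    /// assert_eq!(reader.position(), 4);
    /// ```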
    #[inline]
    pub fn request(&mut self, len: usize) -> &[u8] {
        if self.valid_len < len {
            self.request_cold(len);
        }
        self.buf()
    }

    #[cold]
    #[inline(never)]
    fn request_cold(&mut self, len: usize) {
        while self.valid_len < len && self.request_more() {}
    }

    /// Tries to extend the buffer by reading more data until it contains at least one byte.
    ///
    /// Returns the next byte.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough
    /// data was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte(&mut self) -> Option<u8> {
        self.request_byte_at_offset(0)
    }

    /// Tries to extend the buffer by reading more data until it contains at least the byte at the
    /// given offset from the current position.
    ///
    /// Returns that byte.
    ///
    /// This fails when the end of the input is reached or an IO error occurred before enough data
    /// was read, in which case `None` is returned.
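    ///
    /// # Example
    ///
    /// A minimal sketch, assuming [`ByteReader`] is in scope. Peeking one byte past the cursor
    /// is useful for two-character tokens:
    ///
    /// ```ignore
    /// let mut reader = ByteReader::from_read(&b"// comment"[..]);
    /// if reader.request_byte() == Some(b'/') && reader.request_byte_at_offset(1) == Some(b'/') {
    ///     reader.advance(2); // consume the `//` token
    /// }
    /// assert_eq!(reader.position(), 2);
    /// ```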
    #[inline]
    pub fn request_byte_at_offset(&mut self, offset: usize) -> Option<u8> {
        if offset < self.valid_len {
            unsafe {
                // SAFETY within `pos_in_buf..pos_in_buf + valid_len`
                Some(*self.buf.get_unchecked(self.pos_in_buf + offset))
            }
        } else {
            self.request_byte_at_offset_cold(offset)
        }
    }

    #[cold]
    #[inline(never)]
    fn request_byte_at_offset_cold(&mut self, offset: usize) -> Option<u8> {
        while self.valid_len <= offset {
            if !self.request_more() {
                return None;
            }
        }
        Some(self.buf[self.pos_in_buf + offset])
    }

    /// Tries to extend the buffer by reading more data.
    #[cold]
    #[inline(never)]
    pub fn request_more(&mut self) -> bool {
        if self.complete {
            return false;
        }

        // Only realign if we have advanced far enough that moving the remaining data back to the
        // front of the buffer is worth the copy.
        let realign = self.pos_in_buf > self.chunk_size * 2;

        if realign {
            self.buf
                .copy_within(self.pos_in_buf..self.pos_in_buf + self.valid_len, 0);
            self.pos_of_buf = self.pos_of_buf.wrapping_add(self.pos_in_buf);
            // Adjust the mark before resetting `pos_in_buf` so that `mark()` stays unchanged.
            self.mark_in_buf = self.mark_in_buf.wrapping_sub(self.pos_in_buf);
            self.pos_in_buf = 0;

            // If our buffer is four times as large as it needs to be for the current data and an
            // additional chunk, shrink it.
            if self.buf.len() > 4 * (self.pos_in_buf + self.valid_len + self.chunk_size) {
                self.buf.truncate(self.buf.len() / 2);
                self.buf.shrink_to_fit();
            }
        }

        let target_end = self.pos_in_buf + self.valid_len + self.chunk_size;

        // Make sure we have enough buffer space for another chunk
        if self.buf.len() < target_end {
            self.buf.resize(target_end, 0);
        }

        // Do only a single successful read (to keep line-buffered REPLs usable), but do retry on
        // `Interrupted`.
        loop {
            match self
                .read
                .read(&mut self.buf[self.pos_in_buf + self.valid_len..target_end])
            {
                Ok(0) => self.complete = true,
                Ok(n) => {
                    // SAFETY this assert is load bearing, as `self.valid_len` is trusted but Read
                    // implementations aren't
                    assert!(
                        n <= self.chunk_size,
                        "invariant of std::io::Read trait violated"
                    );
                    self.valid_len += n
                }
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => {
                    self.io_error = Some(err);
                    self.complete = true;
                }
            }
            break;
        }

        true
    }
}