flussab 0.3.0

Utilities for writing parsers
Documentation
use std::io::{self, BufReader, Cursor, Read};

/// A buffered reader optimized for efficient parsing.
///
/// Like `std`'s [`BufReader`], this provides buffering to coalesce many small reads into fewer
/// larger reads of the underlying data source. The difference is that `DeferredReader` is optimized
/// for efficient parsing. This includes asynchronous handling of IO errors, position tracking, and
/// dynamic contiguous look-ahead.
pub struct DeferredReader<'a> {
    read: Box<dyn Read + 'a>,
    buf: Vec<u8>,
    // SAFETY `buf[pos_in_buf..pos_in_buf+valid_len]` must _always_ be valid
    pos_in_buf: usize,
    valid_len: usize,
    complete: bool,
    io_error: Option<io::Error>,
    pos_of_buf: usize,
    mark_in_buf: usize,
    chunk_size: usize,
}

impl<'a> DeferredReader<'a> {
    const DEFAULT_CHUNK_SIZE: usize = 16 << 10;

    /// Creates a [`DeferredReader`] for the data of a [`BufReader`].
    pub fn from_buf_reader(buf_reader: BufReader<impl Read + 'a>) -> Self {
        // Avoid double buffering without discarding any already buffered contents.
        let buf_data = buf_reader.buffer().to_vec();
        if buf_data.is_empty() {
            Self::from_read(buf_reader.into_inner())
        } else {
            Self::from_read(Cursor::new(buf_data).chain(buf_reader.into_inner()))
        }
    }

    /// Creates a [`DeferredReader`] for the data of a [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    pub fn from_read(read: impl Read + 'a) -> Self {
        Self::from_boxed_dyn_read(Box::new(read))
    }

    /// Creates a [`DeferredReader`] for the data of a boxed [`Read`] instance.
    ///
    /// If the [`Read`] instance is a [`BufReader`], it is better to use
    /// [`from_buf_reader`][Self::from_buf_reader] to avoid unnecessary double buffering of the
    /// data.
    #[inline(never)]
    pub fn from_boxed_dyn_read(read: Box<dyn Read + 'a>) -> Self {
        DeferredReader {
            read,
            buf: vec![],
            pos_in_buf: 0,
            valid_len: 0,
            complete: false,
            io_error: None,
            pos_of_buf: 0,
            mark_in_buf: 0,
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
        }
    }

    /// Sets the number of bytes that are read at once.
    ///
    /// This sets the size of the [`read`][Read::read] requests made. Note that this is just an
    /// upper bound. Depending on the [`Read`] implementation, smaller amounts may be read at once.
    /// To enable interactive line based input, `DeferredReader` on its own will not issue more read
    /// requests than necessary.
    pub fn set_chunk_size(&mut self, size: usize) {
        self.chunk_size = size;
    }

    /// Returns the currently buffered data in front of the cursor.
    ///
    /// You can call [`is_complete`][Self::is_complete] to check whether the returned data contains
    /// all remaining input data.
    #[inline]
    pub fn buf(&self) -> &[u8] {
        unsafe {
            // SAFETY `self.pos_in_buf..self.pos_in_buf + self.valid_len` are always kept within
            // range
            debug_assert!(self
                .buf
                .get(self.pos_in_buf..self.pos_in_buf + self.valid_len)
                .is_some());
            self.buf
                .get_unchecked(self.pos_in_buf..self.pos_in_buf + self.valid_len)
        }
    }

    /// Returns the length of the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().len()` but unlike [`reader.buf()`][Self::buf]
    /// this does not create an intermediate reference to the buffered data. This can make a
    /// difference in safety when raw pointers are used to access the buffered data.
    #[inline]
    pub fn buf_len(&self) -> usize {
        self.valid_len
    }

    /// Returns a pointer to the currently buffered data.
    ///
    /// This returns the same value as `reader.buf().as_ptr()` but unlike
    /// [`reader.buf()`][Self::buf] this does not create an intermediate reference to the buffered
    /// data. You can use [`reader.buf_len()`][Self::buf_len] to obtain the length of the buffered
    /// data.
    #[inline]
    pub fn buf_ptr(&self) -> *const u8 {
        unsafe {
            // SAFETY `self.pos_in_buf` is always kept in range
            self.buf.as_ptr().add(self.pos_in_buf)
        }
    }

    /// Advances the cursor by a given number of already buffered bytes.
    ///
    /// This will panic if the number of bytes exceeds the amount of buffered data.
    #[inline]
    pub fn advance(&mut self, n: usize) {
        let (next_len, overflow) = self.valid_len.overflowing_sub(n);
        self.valid_len = next_len;
        if overflow {
            self.advance_cold();
        }
        self.pos_in_buf += n;
        // SAFETY ^ we already subtracted n from len and checked for overflow so we cannot overflow
        // the buffer here.
    }

    #[inline(never)]
    #[cold]
    fn advance_cold(&mut self) -> ! {
        panic!("advanced past the current buffer size");
    }

    /// Advances the cursor by a given number of already buffered bytes without checking if
    /// sufficient bytes are buffered.
    ///
    /// # Safety
    ///
    /// The passed value for `n` may not exceed the value returned by [`buf_len()`][Self::buf_len].
    #[inline]
    pub unsafe fn advance_unchecked(&mut self, n: usize) {
        debug_assert!(self.valid_len >= n);
        self.valid_len -= n;
        self.pos_in_buf += n;
    }

    /// Total number of bytes the cursor was advanced so far.
    ///
    /// This wraps around every `usize::MAX` bytes.
    #[inline]
    pub fn position(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.pos_in_buf)
    }

    /// Returns currently marked position.
    ///
    /// Initially this is position `0`, but can be changed using [`set_mark`][Self::set_mark] and
    /// [`set_mark_to_position`][Self::set_mark_to_position].
    ///
    /// Setting the mark to the start of a token before advancing over it can be useful for error
    /// reporting.
    #[inline]
    pub fn mark(&self) -> usize {
        self.pos_of_buf.wrapping_add(self.mark_in_buf)
    }

    /// Marks the current position.
    ///
    /// Calling this will make [`mark`](Self::mark) return the current position.
    #[inline]
    pub fn set_mark(&mut self) {
        self.mark_in_buf = self.pos_in_buf
    }

    /// Sets the position returned by [`mark`](Self::mark).
    #[inline]
    pub fn set_mark_to_position(&mut self, position: usize) {
        self.mark_in_buf = position.wrapping_sub(self.pos_of_buf)
    }

    /// Returns whether all remaining data is buffered.
    ///
    /// If this returns `true` [`buf`][Self::buf] will contain all the remaining data. This can
    /// happen when the end was reached or when an IO error was encountered. You can use
    /// [`check_io_error`][Self::check_io_error] to determine whether an IO error occured.
    #[inline]
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns whether the cursor is at the end of the available data.
    ///
    /// This can be the end of the input or all data before an IO error was encountered. You can
    /// use [`check_io_error`][Self::check_io_error] to determine whether an IO error occured.
    #[inline]
    pub fn is_at_end(&self) -> bool {
        self.complete && (self.valid_len == 0)
    }

    /// Returns an encountered IO errors as `Err(io_err)`.
    ///
    /// This resets the stored IO error and returns `Ok(())` if no IO error is stored.
    #[inline]
    pub fn check_io_error(&mut self) -> io::Result<()> {
        if let Some(err) = self.io_error.take() {
            Err(err)
        } else {
            Ok(())
        }
    }

    /// Returns a reference to an encountered IO error.
    ///
    /// This does not reset the stored IO error and erturns `None` if no IO error is stored.
    #[inline]
    pub fn io_error(&self) -> Option<&io::Error> {
        self.io_error.as_ref()
    }

    /// Tries to extend the buffer by reading more data until it reaches the requested length.
    ///
    /// Returns a slice to _all_ of the buffered data, not only the requested amount.
    ///
    /// This fails when the end of the input is reached or an IO error occured before enough
    /// data was read, in which case a smaller buffer than requested is returned.
    #[inline]
    pub fn request(&mut self, len: usize) -> &[u8] {
        if self.valid_len < len {
            self.request_cold(len);
        }
        self.buf()
    }

    #[cold]
    #[inline(never)]
    fn request_cold(&mut self, len: usize) {
        while self.valid_len < len && self.request_more() {}
    }

    /// Tries to extend the buffer by reading more data until it contains at least one byte.
    ///
    /// Returns the next byte.
    ///
    /// This fails when the end of the input is reached or an IO error occured before enough
    /// data was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte(&mut self) -> Option<u8> {
        self.request_byte_at_offset(0)
    }

    /// Tries to extend the buffer by reading more data until it contains at least the byte at the
    /// given offset from the current position.
    ///
    /// Returns that byte.
    ///
    /// This fails when the end of the input is reached or an IO error occured before enough data
    /// was read, in which case `None` is returned.
    #[inline]
    pub fn request_byte_at_offset(&mut self, offset: usize) -> Option<u8> {
        if offset < self.valid_len {
            unsafe {
                // SAFETY within `pos_in_buf..pos_in_buf + valid_len`
                Some(*self.buf.get_unchecked(self.pos_in_buf + offset))
            }
        } else {
            self.request_byte_at_offset_cold(offset)
        }
    }

    #[cold]
    #[inline(never)]
    fn request_byte_at_offset_cold(&mut self, offset: usize) -> Option<u8> {
        while self.valid_len <= offset {
            if !self.request_more() {
                return None;
            }
        }
        Some(self.buf[self.pos_in_buf + offset])
    }

    /// Tries to extend the buffer by reading more data.
    #[cold]
    #[inline(never)]
    pub fn request_more(&mut self) -> bool {
        if self.complete {
            return false;
        }

        // Only realign if we have advanced over sufficiently more data to
        let realign = self.pos_in_buf > self.chunk_size * 2;

        if realign {
            self.buf
                .copy_within(self.pos_in_buf..self.pos_in_buf + self.valid_len, 0);
            self.pos_of_buf = self.pos_of_buf.wrapping_add(self.pos_in_buf);
            self.pos_in_buf = 0;
            self.mark_in_buf = self.mark_in_buf.wrapping_sub(self.pos_in_buf);

            // If our buffer is four times as large as it needs to be for the current data and an
            // additional chunk, shrink it.
            if self.buf.len() > 4 * (self.pos_in_buf + self.valid_len + self.chunk_size) {
                self.buf.truncate(self.buf.len() / 2);
                self.buf.shrink_to_fit();
            }
        }

        let target_end = self.pos_in_buf + self.valid_len + self.chunk_size;

        // Make sure we have enough buffer space for another chunk
        if self.buf.len() < target_end {
            self.buf.resize(target_end, 0);
        }

        // Do only a single successful read (to make line buffered repls usable), but do retry on
        // `Interrupted`.
        loop {
            match self
                .read
                .read(&mut self.buf[self.pos_in_buf + self.valid_len..target_end])
            {
                Ok(0) => self.complete = true,
                Ok(n) => {
                    // SAFETY this assert is load bearing, as `self.valid_len` is trusted but Read
                    // implementations aren't
                    assert!(
                        n <= self.chunk_size,
                        "invariant of std::io::Read trait violated"
                    );
                    self.valid_len += n
                }
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => {
                    self.io_error = Some(err);
                    self.complete = true;
                }
            }
            break;
        }

        true
    }
}