io-streams 0.16.3

Unbuffered and unlocked I/O streams
Documentation
//! This file is derived from Rust's library/std/src/io/buffered at revision
//! f7801d6c7cc19ab22bdebcc8efa894a564c53469.

use super::buf_duplexer::BufDuplexerBackend;
use duplex::HalfDuplex;
use std::io::{self, IoSlice, Write};

/// Private helper struct for implementing the line-buffered writing logic.
///
/// This shim temporarily wraps a `BufDuplexer` backend and uses its internals
/// to implement a line-buffered writer (specifically by using internal
/// methods like `write_to_buf` and `flush_buf`). In this way, a more efficient
/// abstraction can be created than one that only had access to `write` and
/// `flush`, without needlessly duplicating a lot of the implementation details
/// of `BufDuplexer`. This also allows existing `BufDuplexer`s to be
/// temporarily given line-buffering logic; this is what enables Stdout to be
/// alternately in line-buffered or block-buffered mode.
///
/// The shim only borrows the backend for `'a`; it owns no buffer of its own.
#[derive(Debug)]
pub(super) struct BufReaderLineWriterShim<'a, Inner: HalfDuplex> {
    /// The borrowed backend whose write buffer and inner stream this shim
    /// drives.
    buffer: &'a mut BufDuplexerBackend<Inner>,
}

impl<'a, Inner: HalfDuplex> BufReaderLineWriterShim<'a, Inner> {
    #[inline]
    pub fn new(buffer: &'a mut BufDuplexerBackend<Inner>) -> Self {
        Self { buffer }
    }

    /// Get a mutable reference to the inner writer (that is, the writer
    /// wrapped by the `BufDuplexer`). Be careful with this writer, as writes
    /// to it will bypass the buffer.
    #[inline]
    fn inner_mut(&mut self) -> &mut Inner {
        self.buffer.get_mut()
    }

    /// Get the content currently buffered in self.buffer's writer buffer
    #[inline]
    fn writer_buffered(&self) -> &[u8] {
        self.buffer.writer_buffer()
    }

    /// Flush the buffer iff the last byte is a newline (indicating that an
    /// earlier write only succeeded partially, and we want to retry flushing
    /// the buffered line before continuing with a subsequent write)
    fn flush_if_completed_line(&mut self) -> io::Result<()> {
        match self.writer_buffered().last().copied() {
            Some(b'\n') => self.buffer.flush_buf(),
            _ => Ok(()),
        }
    }
}

impl<'a, Inner: HalfDuplex> Write for BufReaderLineWriterShim<'a, Inner> {
    /// Write `buf` with line buffering and return how many bytes were
    /// accepted.
    ///
    /// When `buf` contains at least one newline, everything up to and
    /// including the last newline is handed directly to the inner writer, and
    /// whatever follows is placed in the write buffer.
    ///
    /// This is a "best effort" operation: following the convention of
    /// `std::io::Write::write`, at most one attempt is made to push new data
    /// to the inner writer. If that attempt is only partially successful, as
    /// much of the remainder as is appropriate is buffered instead.
    ///
    /// Since completed lines should reach the inner writer promptly, a write
    /// buffer that already ends with a newline is flushed first, even when
    /// `buf` itself contains no newline.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `lines_end` is the index just past the last newline in `buf`.
        let lines_end = if let Some(last_newline) = memchr::memrchr(b'\n', buf) {
            last_newline + 1
        } else {
            // Less than one full line is incoming: perform an ordinary
            // buffered write (which may itself flush if the data does not
            // fit), after retrying any completed line left over from earlier.
            self.flush_if_completed_line()?;
            return self.buffer.write(buf);
        };

        // Drain what is already buffered before touching `buf`. Doing it the
        // other way around (appending `buf`, then flushing everything) would
        // oblige us to report Ok even if the flush failed, hiding the error.
        self.buffer.flush_buf()?;

        // Hand the complete lines straight to the inner writer, exactly once.
        // The write buffer is known to be empty at this point and this call
        // does not go through the backend's own state, so no additional
        // bookkeeping is required around it.
        let written = self.inner_mut().write(&buf[..lines_end])?;

        // A zero-length write is reported as-is. Buffering more data now
        // would only postpone an `ErrorKind::WriteZero`.
        if written == 0 {
            return Ok(0);
        }

        // Decide what to buffer from the unwritten remainder. The cases are
        // ordered from most to least likely, assuming that writes usually
        // succeed completely and are usually smaller than the buffer:
        //
        // 1. All lines were written; only a partial line (no newline) is left.
        // 2. Some lines are unwritten but fit in the buffer up to their last
        //    newline.
        // 3. They do not fit; keep up to the last newline that does fit, or a
        //    full buffer's worth if there is none.
        //
        // Stopping at a newline where possible avoids flushing partial lines
        // on later calls.
        let rest = &buf[written..];
        let tail = if written >= lines_end {
            rest
        } else if lines_end - written <= self.buffer.writer_capacity() {
            &rest[..lines_end - written]
        } else {
            let window = &rest[..self.buffer.writer_capacity()];
            match memchr::memrchr(b'\n', window) {
                Some(pos) => &window[..pos + 1],
                None => window,
            }
        };

        let buffered = self.buffer.write_to_buf(tail);
        Ok(written + buffered)
    }

    /// Flush by delegating to the wrapped backend's `flush`; the shim adds no
    /// line-specific behavior of its own here.
    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        self.buffer.flush()
    }

    /// Write vectored data with line buffering and return how many bytes
    /// were accepted.
    ///
    /// When any of `bufs` contains a newline, every slice up to and including
    /// the last slice that contains one is handed directly to the inner
    /// writer, and the slices after it are buffered.
    ///
    /// This is a "best effort" operation: following the convention of
    /// `std::io::Write::write`, at most one attempt is made to push new data
    /// to the inner writer.
    ///
    /// Existing buffered data is flushed before any direct write. When the
    /// incoming data has no newline at all, the existing buffer is flushed
    /// first only if it ends with a newline.
    ///
    /// Because picking apart an array of `IoSlice` is convoluted, this method
    /// is deliberately coarser than `write`:
    ///
    /// - It tries to write the whole of every slice up to and including the
    ///   one holding the last newline, so it may send a partial line if that
    ///   slice has data after the newline.
    /// - If the inner write is only partially successful, it does not work
    ///   out where the written bytes ended in order to buffer the remainder;
    ///   it just reports the partial count.
    ///
    /// If the underlying writer does not support vectored writing, the first
    /// non-empty slice is passed to `write` instead. That keeps the more
    /// granular partial-line handling without costing anything in efficiency.
    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        // Without specialized vectored support, fall back to `write` on the
        // first slice that actually holds data.
        #[cfg(can_vector)]
        if !self.is_write_vectored() {
            return bufs
                .iter()
                .find(|buf| !buf.is_empty())
                .map_or(Ok(0), |buf| self.write(buf));
        }

        // Locate the last slice that contains a newline; the direct write
        // covers every slice up to and including it.
        let direct_count = match bufs
            .iter()
            .rposition(|buf| memchr::memchr(b'\n', buf).is_some())
        {
            Some(idx) => idx + 1,
            None => {
                // Less than one full line is incoming: do a normal buffered
                // write, after retrying any completed line left from earlier.
                self.flush_if_completed_line()?;
                return self.buffer.write_vectored(bufs);
            }
        };

        // Drain what is already buffered so output stays in order.
        self.buffer.flush_buf()?;

        // `lines` goes directly to the inner writer; `tail` is buffered
        // afterwards if nothing goes wrong.
        let (lines, tail) = bufs.split_at(direct_count);

        // Exactly one attempt at writing new, unbuffered data. The write
        // buffer is known to be empty and this call does not go through the
        // backend's own state, so no additional bookkeeping is required.
        let written = self.inner_mut().write_vectored(lines)?;

        // A zero-length write is reported as-is. Buffering more data now
        // would only postpone an `ErrorKind::WriteZero`.
        if written == 0 {
            return Ok(0);
        }

        // On a partial write, report the count and stop rather than trying to
        // reconstruct which slices were consumed.
        let lines_len: usize = lines.iter().map(|buf| buf.len()).sum();
        if written < lines_len {
            return Ok(written);
        }

        // The direct write fully succeeded: buffer as much of the remaining
        // slices as fits, stopping at the first one of which nothing could be
        // buffered.
        let mut buffered = 0;
        for buf in tail.iter().filter(|buf| !buf.is_empty()) {
            let accepted = self.buffer.write_to_buf(buf);
            if accepted == 0 {
                break;
            }
            buffered += accepted;
        }

        Ok(written + buffered)
    }

    /// Report whether vectored writes are supported, as answered by the
    /// wrapped backend. Only compiled when the `can_vector` cfg is set.
    #[cfg(can_vector)]
    #[inline]
    fn is_write_vectored(&self) -> bool {
        self.buffer.is_write_vectored()
    }

    /// Write all of `buf` with line buffering.
    ///
    /// When `buf` contains at least one newline, everything up to and
    /// including the last newline is sent on to the inner writer, and
    /// whatever follows is given to the buffered writer.
    ///
    /// Since completed lines should reach the inner writer promptly, a write
    /// buffer that already ends with a newline is flushed first, even when
    /// `buf` itself contains no newline.
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        let lines_end = match memchr::memrchr(b'\n', buf) {
            Some(last_newline) => last_newline + 1,
            None => {
                // Less than one full line is incoming: do an ordinary
                // buffered write (which may flush if the data does not fit),
                // after retrying any completed line left from earlier.
                self.flush_if_completed_line()?;
                return self.buffer.write_all(buf);
            }
        };

        let (lines, tail) = buf.split_at(lines_end);

        if !self.writer_buffered().is_empty() {
            // Something is already buffered: append the new lines to it and
            // flush once, saving at least one write call. `write` cannot take
            // this shortcut, because it has to surface errors *and* report a
            // consistent byte count; `write_all` returns no count, so it can.
            self.buffer.write_all(lines)?;
            self.buffer.flush_buf()?;
        } else {
            // Nothing buffered: send the lines straight to the inner writer.
            self.inner_mut().write_all(lines)?;
        }

        // Whatever follows the last newline goes through the buffered path.
        self.buffer.write_all(tail)
    }
}