io_streams/buffered/
buf_duplexer.rs

1//! This file is derived from Rust's library/std/src/io/buffered at revision
2//! f7801d6c7cc19ab22bdebcc8efa894a564c53469.
3
4use super::{IntoInnerError, DEFAULT_BUF_SIZE};
5use duplex::HalfDuplex;
6#[cfg(feature = "layered-io")]
7use layered_io::{default_suggested_buffer_size, Bufferable, HalfDuplexLayered};
8#[cfg(read_initializer)]
9use std::io::Initializer;
10use std::io::{self, BufRead, Error, ErrorKind, IoSlice, IoSliceMut, Read, Write};
11use std::{cmp, fmt};
12#[cfg(not(windows))]
13use {
14    io_extras::os::rustix::{AsRawFd, RawFd},
15    io_lifetimes::{AsFd, BorrowedFd},
16};
17#[cfg(windows)]
18use {
19    io_extras::os::windows::{
20        AsHandleOrSocket, AsRawHandleOrSocket, BorrowedHandleOrSocket, RawHandleOrSocket,
21    },
22    io_lifetimes::{AsHandle, AsSocket, BorrowedHandle, BorrowedSocket},
23    std::os::windows::io::{AsRawHandle, AsRawSocket, RawHandle, RawSocket},
24};
25
26/// Wraps a reader and writer and buffers their output.
27///
28/// It can be excessively inefficient to work directly with something that
29/// implements [`Write`]. For example, every call to
30/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
31/// `BufDuplexer<Inner>` keeps an in-memory buffer of data and writes it to an
32/// underlying writer in large, infrequent batches.
33///
34/// It can be excessively inefficient to work directly with a [`Read`]
35/// instance. For example, every call to [`read`][`TcpStream::read`] on
36/// [`TcpStream`] results in a system call. A `BufDuplexer<Inner>` performs
37/// large, infrequent reads on the underlying [`Read`] and maintains an
38/// in-memory buffer of the results.
39///
40/// `BufDuplexer<Inner>` can improve the speed of programs that make *small*
41/// and *repeated* write calls to the same file or network socket. It does not
42/// help when writing very large amounts at once, or writing just one or a few
43/// times. It also provides no advantage when writing to a destination that is
44/// in memory, like a [`Vec`]`<u8>`.
45///
46/// `BufDuplexer<Inner>` can improve the speed of programs that make *small*
47/// and *repeated* read calls to the same file or network socket. It does not
48/// help when reading very large amounts at once, or reading just one or a few
49/// times. It also provides no advantage when reading from a source that is
50/// already in memory, like a [`Vec`]`<u8>`.
51///
52/// It is critical to call [`flush`] before `BufDuplexer<Inner>` is dropped.
53/// Though dropping will attempt to flush the contents of the writer buffer,
54/// any errors that happen in the process of dropping will be ignored. Calling
55/// [`flush`] ensures that the writer buffer is empty and thus dropping will
56/// not even attempt file operations.
57///
58/// When the `BufDuplexer<Inner>` is dropped, the contents of its reader buffer
59/// will be discarded. Creating multiple instances of a `BufDuplexer<Inner>` on
60/// the same stream can cause data loss. Reading from the underlying reader
61/// after unwrapping the `BufDuplexer<Inner>` with [`BufDuplexer::into_inner`]
62/// can also cause data loss.
63///
64/// # Examples
65///
66/// Let's write the numbers one through ten to a [`TcpStream`]:
67///
68/// ```no_run
69/// use std::io::prelude::*;
70/// use std::net::TcpStream;
71///
72/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
73///
74/// for i in 0..10 {
75///     stream.write(&[i + 1]).unwrap();
76/// }
77/// ```
78///
79/// Because we're not buffering, we write each one in turn, incurring the
80/// overhead of a system call per byte written. We can fix this with a
81/// `BufDuplexer<Inner>`:
82///
83/// ```no_run
84/// use io_streams::BufDuplexer;
85/// use std::io::prelude::*;
86/// use std::net::TcpStream;
87///
88/// let mut stream = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
89///
90/// for i in 0..10 {
91///     stream.write(&[i + 1]).unwrap();
92/// }
93/// stream.flush().unwrap();
94/// ```
95///
96/// By wrapping the stream with a `BufDuplexer<Inner>`, these ten writes are
97/// all grouped together by the buffer and will all be written out in one
98/// system call when the `stream` is flushed.
99///
100/// ```no_run
101/// use io_streams::BufDuplexer;
102/// use std::io::prelude::*;
103/// use std::net::TcpStream;
104///
105/// fn main() -> std::io::Result<()> {
106///     let mut stream = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
107///
108///     let mut line = String::new();
109///     let len = stream.read_line(&mut line)?;
110///     println!("First line is {} bytes long", len);
111///     Ok(())
112/// }
113/// ```
114///
115/// [`TcpStream::read`]: std::io::Read::read
116/// [`TcpStream::write`]: std::io::Write::write
117/// [`TcpStream`]: std::net::TcpStream
118/// [`flush`]: std::io::Write::flush
119pub struct BufDuplexer<Inner: HalfDuplex> {
120    inner: BufDuplexerBackend<Inner>,
121}
122
123pub(crate) struct BufDuplexerBackend<Inner: HalfDuplex> {
124    inner: Option<Inner>,
125
126    // writer fields
127    writer_buf: Vec<u8>,
128    // #30888: If the inner writer panics in a call to write, we don't want to
129    // write the buffered data a second time in BufDuplexer's destructor. This
130    // flag tells the Drop impl if it should skip the flush.
131    panicked: bool,
132
133    // reader fields
134    reader_buf: Box<[u8]>,
135    pos: usize,
136    cap: usize,
137}
138
139impl<Inner: HalfDuplex> BufDuplexer<Inner> {
140    /// Creates a new `BufDuplexer<Inner>` with default buffer capacities. The
141    /// default is currently 8 KB, but may change in the future.
142    ///
143    /// # Examples
144    ///
145    /// ```no_run
146    /// use io_streams::BufDuplexer;
147    /// use std::net::TcpStream;
148    ///
149    /// let mut buffer = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
150    /// ```
151    #[inline]
152    pub fn new(inner: Inner) -> Self {
153        Self {
154            inner: BufDuplexerBackend::new(inner),
155        }
156    }
157
158    /// Creates a new `BufDuplexer<Inner>` with the specified buffer
159    /// capacities.
160    ///
161    /// # Examples
162    ///
163    /// Creating a buffer with ten bytes of reader capacity and a writer buffer
164    /// of a hundered bytes:
165    ///
166    /// ```no_run
167    /// use io_streams::BufDuplexer;
168    /// use std::net::TcpStream;
169    ///
170    /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap();
171    /// let mut buffer = BufDuplexer::with_capacities(10, 100, stream);
172    /// ```
173    #[inline]
174    pub fn with_capacities(reader_capacity: usize, writer_capacity: usize, inner: Inner) -> Self {
175        Self {
176            inner: BufDuplexerBackend::with_capacities(reader_capacity, writer_capacity, inner),
177        }
178    }
179
180    /// Gets a reference to the underlying reader/writer.
181    ///
182    /// # Examples
183    ///
184    /// ```no_run
185    /// use io_streams::BufDuplexer;
186    /// use std::net::TcpStream;
187    ///
188    /// let mut buffer = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
189    ///
190    /// // we can use reference just like buffer
191    /// let reference = buffer.get_ref();
192    /// ```
193    #[inline]
194    pub fn get_ref(&self) -> &Inner {
195        self.inner.get_ref()
196    }
197
198    /// Gets a mutable reference to the underlying reader/writer.
199    ///
200    /// It is inadvisable to directly write to the underlying reader/writer.
201    ///
202    /// # Examples
203    ///
204    /// ```no_run
205    /// use io_streams::BufDuplexer;
206    /// use std::net::TcpStream;
207    ///
208    /// let mut buffer = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
209    ///
210    /// // we can use reference just like buffer
211    /// let reference = buffer.get_mut();
212    /// ```
213    #[inline]
214    pub fn get_mut(&mut self) -> &mut Inner {
215        self.inner.get_mut()
216    }
217
218    /// Returns a reference to the internally buffered writer data.
219    ///
220    /// # Examples
221    ///
222    /// ```no_run
223    /// use io_streams::BufDuplexer;
224    /// use std::net::TcpStream;
225    ///
226    /// let buf_writer = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
227    ///
228    /// // See how many bytes are currently buffered
229    /// let bytes_buffered = buf_writer.writer_buffer().len();
230    /// ```
231    #[inline]
232    pub fn writer_buffer(&self) -> &[u8] {
233        self.inner.writer_buffer()
234    }
235
236    /// Returns a reference to the internally buffered reader data.
237    ///
238    /// Unlike [`fill_buf`], this will not attempt to fill the buffer if it is
239    /// empty.
240    ///
241    /// [`fill_buf`]: BufRead::fill_buf
242    ///
243    /// # Examples
244    ///
245    /// ```no_run
246    /// use char_device::CharDevice;
247    /// use io_streams::BufDuplexer;
248    /// use std::fs::File;
249    /// use std::io::BufRead;
250    ///
251    /// fn main() -> std::io::Result<()> {
252    ///     let f = CharDevice::new(File::open("/dev/ttyS0")?)?;
253    ///     let mut reader = BufDuplexer::new(f);
254    ///     assert!(reader.reader_buffer().is_empty());
255    ///
256    ///     if reader.fill_buf()?.len() > 0 {
257    ///         assert!(!reader.reader_buffer().is_empty());
258    ///     }
259    ///     Ok(())
260    /// }
261    /// ```
262    pub fn reader_buffer(&self) -> &[u8] {
263        self.inner.reader_buffer()
264    }
265
266    /// Returns the number of bytes the internal writer buffer can hold without
267    /// flushing.
268    ///
269    /// # Examples
270    ///
271    /// ```no_run
272    /// use io_streams::BufDuplexer;
273    /// use std::net::TcpStream;
274    ///
275    /// let buf_duplexer = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
276    ///
277    /// // Check the capacity of the inner buffer
278    /// let capacity = buf_duplexer.writer_capacity();
279    /// // Calculate how many bytes can be written without flushing
280    /// let without_flush = capacity - buf_duplexer.writer_buffer().len();
281    /// ```
282    #[inline]
283    pub fn writer_capacity(&self) -> usize {
284        self.inner.writer_capacity()
285    }
286
287    /// Returns the number of bytes the internal reader buffer can hold at
288    /// once.
289    ///
290    /// # Examples
291    ///
292    /// ```no_run
293    /// use char_device::CharDevice;
294    /// use io_streams::BufDuplexer;
295    /// use std::fs::File;
296    /// use std::io::BufRead;
297    ///
298    /// fn main() -> std::io::Result<()> {
299    ///     let f = CharDevice::new(File::open("/dev/tty")?)?;
300    ///     let mut reader = BufDuplexer::new(f);
301    ///
302    ///     let capacity = reader.reader_capacity();
303    ///     let buffer = reader.fill_buf()?;
304    ///     assert!(buffer.len() <= capacity);
305    ///     Ok(())
306    /// }
307    /// ```
308    pub fn reader_capacity(&self) -> usize {
309        self.inner.reader_capacity()
310    }
311
312    /// Unwraps this `BufDuplexer<Inner>`, returning the underlying
313    /// reader/writer.
314    ///
315    /// The buffer is written out before returning the reader/writer.
316    ///
317    /// # Errors
318    ///
319    /// An [`Err`] will be returned if an error occurs while flushing the
320    /// buffer.
321    ///
322    /// # Examples
323    ///
324    /// ```no_run
325    /// use io_streams::BufDuplexer;
326    /// use std::net::TcpStream;
327    ///
328    /// let mut buffer = BufDuplexer::new(TcpStream::connect("127.0.0.1:34254").unwrap());
329    ///
330    /// // unwrap the TcpStream and flush the buffer
331    /// let stream = buffer.into_inner().unwrap();
332    /// ```
333    pub fn into_inner(self) -> Result<Inner, IntoInnerError<Self>> {
334        self.inner
335            .into_inner()
336            .map_err(|err| err.new_wrapped(|inner| Self { inner }))
337    }
338}
339
340impl<Inner: HalfDuplex> BufDuplexerBackend<Inner> {
341    pub fn new(inner: Inner) -> Self {
342        Self::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
343    }
344
345    pub fn with_capacities(reader_capacity: usize, writer_capacity: usize, inner: Inner) -> Self {
346        #[cfg(not(read_initializer))]
347        let buffer = vec![0; reader_capacity];
348
349        #[cfg(read_initializer)]
350        let buffer = unsafe {
351            let mut buffer = Vec::with_capacity(reader_capacity);
352            buffer.set_len(reader_capacity);
353            inner.initializer().initialize(&mut buffer);
354            buffer
355        };
356
357        Self {
358            inner: Some(inner),
359            writer_buf: Vec::with_capacity(writer_capacity),
360            panicked: false,
361            reader_buf: buffer.into_boxed_slice(),
362            pos: 0,
363            cap: 0,
364        }
365    }
366
367    /// Send data in our local buffer into the inner writer, looping as
368    /// necessary until either it's all been sent or an error occurs.
369    ///
370    /// Because all the data in the buffer has been reported to our owner as
371    /// "successfully written" (by returning nonzero success values from
372    /// `write`), any 0-length writes from `inner` must be reported as i/o
373    /// errors from this method.
374    pub(super) fn flush_buf(&mut self) -> io::Result<()> {
375        /// Helper struct to ensure the buffer is updated after all the writes
376        /// are complete. It tracks the number of written bytes and drains them
377        /// all from the front of the buffer when dropped.
378        struct BufGuard<'a> {
379            buffer: &'a mut Vec<u8>,
380            written: usize,
381        }
382
383        impl<'a> BufGuard<'a> {
384            fn new(buffer: &'a mut Vec<u8>) -> Self {
385                Self { buffer, written: 0 }
386            }
387
388            /// The unwritten part of the buffer
389            fn remaining(&self) -> &[u8] {
390                &self.buffer[self.written..]
391            }
392
393            /// Flag some bytes as removed from the front of the buffer
394            fn consume(&mut self, amt: usize) {
395                self.written += amt;
396            }
397
398            /// true if all of the bytes have been written
399            fn done(&self) -> bool {
400                self.written >= self.buffer.len()
401            }
402        }
403
404        impl Drop for BufGuard<'_> {
405            fn drop(&mut self) {
406                if self.written > 0 {
407                    self.buffer.drain(..self.written);
408                }
409            }
410        }
411
412        let mut guard = BufGuard::new(&mut self.writer_buf);
413        let inner = self.inner.as_mut().unwrap();
414        while !guard.done() {
415            self.panicked = true;
416            let r = inner.write(guard.remaining());
417            self.panicked = false;
418
419            match r {
420                Ok(0) => {
421                    return Err(Error::new(
422                        ErrorKind::WriteZero,
423                        "failed to write the buffered data",
424                    ));
425                }
426                Ok(n) => guard.consume(n),
427                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
428                Err(e) => return Err(e),
429            }
430        }
431        Ok(())
432    }
433
434    /// Buffer some data without flushing it, regardless of the size of the
435    /// data. Writes as much as possible without exceeding capacity. Returns
436    /// the number of bytes written.
437    pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
438        let available = self.writer_buf.capacity() - self.writer_buf.len();
439        let amt_to_buffer = available.min(buf.len());
440        self.writer_buf.extend_from_slice(&buf[..amt_to_buffer]);
441        amt_to_buffer
442    }
443
444    #[inline]
445    pub fn get_ref(&self) -> &Inner {
446        self.inner.as_ref().unwrap()
447    }
448
449    #[inline]
450    pub fn get_mut(&mut self) -> &mut Inner {
451        self.inner.as_mut().unwrap()
452    }
453
454    #[inline]
455    pub fn writer_buffer(&self) -> &[u8] {
456        &self.writer_buf
457    }
458
459    pub fn reader_buffer(&self) -> &[u8] {
460        &self.reader_buf[self.pos..self.cap]
461    }
462
463    #[inline]
464    pub fn writer_capacity(&self) -> usize {
465        self.writer_buf.capacity()
466    }
467
468    pub fn reader_capacity(&self) -> usize {
469        self.reader_buf.len()
470    }
471
472    pub fn into_inner(mut self) -> Result<Inner, IntoInnerError<Self>> {
473        match self.flush_buf() {
474            Err(e) => Err(IntoInnerError::new(self, e)),
475            Ok(()) => Ok(self.inner.take().unwrap()),
476        }
477    }
478
479    /// Invalidates all data in the internal buffer.
480    #[inline]
481    fn discard_reader_buffer(&mut self) {
482        self.pos = 0;
483        self.cap = 0;
484    }
485}
486
487impl<Inner: HalfDuplex> Write for BufDuplexer<Inner> {
488    #[inline]
489    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
490        self.inner.write(buf)
491    }
492
493    #[inline]
494    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
495        self.inner.write_all(buf)
496    }
497
498    #[inline]
499    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
500        self.inner.write_vectored(bufs)
501    }
502
503    #[cfg(can_vector)]
504    #[inline]
505    fn is_write_vectored(&self) -> bool {
506        self.inner.is_write_vectored()
507    }
508
509    #[inline]
510    fn flush(&mut self) -> io::Result<()> {
511        self.inner.flush()
512    }
513}
514
515impl<Inner: HalfDuplex> Write for BufDuplexerBackend<Inner> {
516    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
517        if self.writer_buf.len() + buf.len() > self.writer_buf.capacity() {
518            self.flush_buf()?;
519        }
520        // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
521        if buf.len() >= self.writer_buf.capacity() {
522            self.panicked = true;
523            let r = self.get_mut().write(buf);
524            self.panicked = false;
525            r
526        } else {
527            self.writer_buf.extend_from_slice(buf);
528            Ok(buf.len())
529        }
530    }
531
532    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
533        // Normally, `write_all` just calls `write` in a loop. We can do better
534        // by calling `self.get_mut().write_all()` directly, which avoids
535        // round trips through the buffer in the event of a series of partial
536        // writes in some circumstances.
537        if self.writer_buf.len() + buf.len() > self.writer_buf.capacity() {
538            self.flush_buf()?;
539        }
540        // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
541        if buf.len() >= self.writer_buf.capacity() {
542            self.panicked = true;
543            let r = self.get_mut().write_all(buf);
544            self.panicked = false;
545            r
546        } else {
547            self.writer_buf.extend_from_slice(buf);
548            Ok(())
549        }
550    }
551
552    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
553        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
554        if self.writer_buf.len() + total_len > self.writer_buf.capacity() {
555            self.flush_buf()?;
556        }
557        // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
558        if total_len >= self.writer_buf.capacity() {
559            self.panicked = true;
560            let r = self.get_mut().write_vectored(bufs);
561            self.panicked = false;
562            r
563        } else {
564            bufs.iter()
565                .for_each(|b| self.writer_buf.extend_from_slice(b));
566            Ok(total_len)
567        }
568    }
569
570    #[cfg(can_vector)]
571    #[inline]
572    fn is_write_vectored(&self) -> bool {
573        self.get_ref().is_write_vectored()
574    }
575
576    #[inline]
577    fn flush(&mut self) -> io::Result<()> {
578        self.flush_buf().and_then(|()| self.get_mut().flush())
579    }
580}
581
582impl<Inner: HalfDuplex> Read for BufDuplexer<Inner> {
583    #[inline]
584    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
585        // Flush the writer half of this `BufDuplexer` before reading.
586        self.inner.flush()?;
587
588        self.inner.read(buf)
589    }
590
591    #[inline]
592    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
593        // Flush the writer half of this `BufDuplexer` before reading.
594        self.inner.flush()?;
595
596        self.inner.read_vectored(bufs)
597    }
598
599    #[cfg(can_vector)]
600    #[inline]
601    fn is_read_vectored(&self) -> bool {
602        self.inner.is_read_vectored()
603    }
604
605    // we can't skip unconditionally because of the large buffer case in read.
606    #[cfg(read_initializer)]
607    #[inline]
608    unsafe fn initializer(&self) -> Initializer {
609        self.inner.initializer()
610    }
611}
612
613impl<Inner: HalfDuplex> Read for BufDuplexerBackend<Inner> {
614    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
615        if self.pos == self.cap && buf.len() >= self.reader_buf.len() {
616            self.discard_reader_buffer();
617            return self.inner.as_mut().unwrap().read(buf);
618        }
619        let size = {
620            let mut rem = self.fill_buf()?;
621            rem.read(buf)?
622        };
623        self.consume(size);
624        Ok(size)
625    }
626
627    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
628        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
629        if self.pos == self.cap && total_len >= self.reader_buf.len() {
630            self.discard_reader_buffer();
631            return self.inner.as_mut().unwrap().read_vectored(bufs);
632        }
633        let size = {
634            let mut rem = self.fill_buf()?;
635            rem.read_vectored(bufs)?
636        };
637        self.consume(size);
638        Ok(size)
639    }
640
641    #[cfg(can_vector)]
642    fn is_read_vectored(&self) -> bool {
643        self.inner.as_ref().unwrap().is_read_vectored()
644    }
645
646    // we can't skip unconditionally because of the large buffer case in read.
647    #[cfg(read_initializer)]
648    unsafe fn initializer(&self) -> Initializer {
649        self.inner.as_ref().unwrap().initializer()
650    }
651}
652
653impl<Inner: HalfDuplex> BufRead for BufDuplexer<Inner> {
654    #[inline]
655    fn fill_buf(&mut self) -> io::Result<&[u8]> {
656        self.inner.fill_buf()
657    }
658
659    #[inline]
660    fn consume(&mut self, amt: usize) {
661        self.inner.consume(amt)
662    }
663
664    #[inline]
665    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
666        // Flush the writer half of this `BufDuplexer` before reading.
667        self.inner.flush()?;
668
669        self.inner.read_until(byte, buf)
670    }
671
672    #[inline]
673    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
674        // Flush the writer half of this `BufDuplexer` before reading.
675        self.inner.flush()?;
676
677        self.inner.read_line(buf)
678    }
679}
680
681// FIXME: impl read_line for BufRead explicitly?
682impl<Inner: HalfDuplex> BufRead for BufDuplexerBackend<Inner> {
683    fn fill_buf(&mut self) -> io::Result<&[u8]> {
684        // If we've reached the end of our internal buffer then we need to fetch
685        // some more data from the underlying reader.
686        // Branch using `>=` instead of the more correct `==`
687        // to tell the compiler that the pos..cap slice is always valid.
688        if self.pos >= self.cap {
689            debug_assert!(self.pos == self.cap);
690            self.cap = self.inner.as_mut().unwrap().read(&mut self.reader_buf)?;
691            self.pos = 0;
692        }
693        Ok(&self.reader_buf[self.pos..self.cap])
694    }
695
696    fn consume(&mut self, amt: usize) {
697        self.pos = cmp::min(self.pos + amt, self.cap);
698    }
699}
700
701impl<Inner: HalfDuplex> fmt::Debug for BufDuplexer<Inner>
702where
703    Inner: fmt::Debug,
704{
705    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
706        self.inner.fmt(fmt)
707    }
708}
709
710impl<Inner: HalfDuplex> fmt::Debug for BufDuplexerBackend<Inner>
711where
712    Inner: fmt::Debug,
713{
714    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
715        fmt.debug_struct("BufDuplexer")
716            .field("inner", &self.inner.as_ref().unwrap())
717            .field(
718                "reader_buffer",
719                &format_args!("{}/{}", self.cap - self.pos, self.reader_buf.len()),
720            )
721            .field(
722                "writer_buffer",
723                &format_args!("{}/{}", self.writer_buf.len(), self.writer_buf.capacity()),
724            )
725            .finish()
726    }
727}
728
729impl<Inner: HalfDuplex> Drop for BufDuplexerBackend<Inner> {
730    fn drop(&mut self) {
731        if self.inner.is_some() && !self.panicked {
732            // dtors should not panic, so we ignore a failed flush
733            let _r = self.flush_buf();
734        }
735    }
736}
737
738#[cfg(not(windows))]
739impl<Inner: HalfDuplex + AsRawFd> AsRawFd for BufDuplexer<Inner> {
740    #[inline]
741    fn as_raw_fd(&self) -> RawFd {
742        self.inner.as_raw_fd()
743    }
744}
745
746#[cfg(windows)]
747impl<Inner: HalfDuplex + AsRawHandle> AsRawHandle for BufDuplexer<Inner> {
748    #[inline]
749    fn as_raw_handle(&self) -> RawHandle {
750        self.inner.as_raw_handle()
751    }
752}
753
754#[cfg(windows)]
755impl<Inner: HalfDuplex + AsRawSocket> AsRawSocket for BufDuplexer<Inner> {
756    #[inline]
757    fn as_raw_socket(&self) -> RawSocket {
758        self.inner.as_raw_socket()
759    }
760}
761
762#[cfg(windows)]
763impl<Inner: HalfDuplex + AsRawHandleOrSocket> AsRawHandleOrSocket for BufDuplexer<Inner> {
764    #[inline]
765    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
766        self.inner.as_raw_handle_or_socket()
767    }
768}
769
770#[cfg(not(windows))]
771impl<Inner: HalfDuplex + AsRawFd> AsRawFd for BufDuplexerBackend<Inner> {
772    #[inline]
773    fn as_raw_fd(&self) -> RawFd {
774        self.inner.as_ref().unwrap().as_raw_fd()
775    }
776}
777
778#[cfg(windows)]
779impl<Inner: HalfDuplex + AsRawHandle> AsRawHandle for BufDuplexerBackend<Inner> {
780    #[inline]
781    fn as_raw_handle(&self) -> RawHandle {
782        self.inner.as_ref().unwrap().as_raw_handle()
783    }
784}
785
786#[cfg(windows)]
787impl<Inner: HalfDuplex + AsRawSocket> AsRawSocket for BufDuplexerBackend<Inner> {
788    #[inline]
789    fn as_raw_socket(&self) -> RawSocket {
790        self.inner.as_ref().unwrap().as_raw_socket()
791    }
792}
793
794#[cfg(windows)]
795impl<Inner: HalfDuplex + AsRawHandleOrSocket> AsRawHandleOrSocket for BufDuplexerBackend<Inner> {
796    #[inline]
797    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
798        self.inner.as_ref().unwrap().as_raw_handle_or_socket()
799    }
800}
801
802#[cfg(not(windows))]
803impl<Inner: HalfDuplex + AsFd> AsFd for BufDuplexer<Inner> {
804    #[inline]
805    fn as_fd(&self) -> BorrowedFd<'_> {
806        self.inner.as_fd()
807    }
808}
809
810#[cfg(windows)]
811impl<Inner: HalfDuplex + AsHandle> AsHandle for BufDuplexer<Inner> {
812    #[inline]
813    fn as_handle(&self) -> BorrowedHandle<'_> {
814        self.inner.as_handle()
815    }
816}
817
818#[cfg(windows)]
819impl<Inner: HalfDuplex + AsSocket> AsSocket for BufDuplexer<Inner> {
820    #[inline]
821    fn as_socket(&self) -> BorrowedSocket<'_> {
822        self.inner.as_socket()
823    }
824}
825
826#[cfg(windows)]
827impl<Inner: HalfDuplex + AsHandleOrSocket> AsHandleOrSocket for BufDuplexer<Inner> {
828    #[inline]
829    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
830        self.inner.as_handle_or_socket()
831    }
832}
833
834#[cfg(not(windows))]
835impl<Inner: HalfDuplex + AsFd> AsFd for BufDuplexerBackend<Inner> {
836    #[inline]
837    fn as_fd(&self) -> BorrowedFd<'_> {
838        self.inner.as_ref().unwrap().as_fd()
839    }
840}
841
842#[cfg(windows)]
843impl<Inner: HalfDuplex + AsHandle> AsHandle for BufDuplexerBackend<Inner> {
844    #[inline]
845    fn as_handle(&self) -> BorrowedHandle<'_> {
846        self.inner.as_ref().unwrap().as_handle()
847    }
848}
849
850#[cfg(windows)]
851impl<Inner: HalfDuplex + AsSocket> AsSocket for BufDuplexerBackend<Inner> {
852    #[inline]
853    fn as_socket(&self) -> BorrowedSocket<'_> {
854        self.inner.as_ref().unwrap().as_socket()
855    }
856}
857
858#[cfg(windows)]
859impl<Inner: HalfDuplex + AsHandleOrSocket> AsHandleOrSocket for BufDuplexerBackend<Inner> {
860    #[inline]
861    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
862        self.inner.as_ref().unwrap().as_handle_or_socket()
863    }
864}
865
866#[cfg(feature = "terminal-io")]
867impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::Terminal for BufDuplexer<Inner> {}
868
869#[cfg(feature = "terminal-io")]
870impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::Terminal
871    for BufDuplexerBackend<Inner>
872{
873}
874
875#[cfg(feature = "terminal-io")]
876impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::WriteTerminal
877    for BufDuplexer<Inner>
878{
879    #[inline]
880    fn color_support(&self) -> terminal_io::TerminalColorSupport {
881        self.inner.color_support()
882    }
883
884    #[inline]
885    fn color_preference(&self) -> bool {
886        self.inner.color_preference()
887    }
888
889    #[inline]
890    fn is_output_terminal(&self) -> bool {
891        self.inner.is_output_terminal()
892    }
893}
894
895#[cfg(feature = "terminal-io")]
896impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::WriteTerminal
897    for BufDuplexerBackend<Inner>
898{
899    #[inline]
900    fn color_support(&self) -> terminal_io::TerminalColorSupport {
901        self.inner.as_ref().unwrap().color_support()
902    }
903
904    #[inline]
905    fn color_preference(&self) -> bool {
906        self.inner.as_ref().unwrap().color_preference()
907    }
908
909    #[inline]
910    fn is_output_terminal(&self) -> bool {
911        match &self.inner {
912            Some(inner) => inner.is_output_terminal(),
913            None => false,
914        }
915    }
916}
917
918#[cfg(feature = "layered-io")]
919impl<Inner: HalfDuplexLayered> Bufferable for BufDuplexer<Inner> {
920    #[inline]
921    fn abandon(&mut self) {
922        self.inner.abandon()
923    }
924
925    #[inline]
926    fn suggested_buffer_size(&self) -> usize {
927        self.inner.suggested_buffer_size()
928    }
929}
930
931#[cfg(feature = "layered-io")]
932impl<Inner: HalfDuplexLayered> Bufferable for BufDuplexerBackend<Inner> {
933    #[inline]
934    fn abandon(&mut self) {
935        match &mut self.inner {
936            Some(inner) => inner.abandon(),
937            None => (),
938        }
939    }
940
941    #[inline]
942    fn suggested_buffer_size(&self) -> usize {
943        match &self.inner {
944            Some(inner) => {
945                std::cmp::max(inner.minimum_buffer_size(), inner.suggested_buffer_size())
946            }
947            None => default_suggested_buffer_size(self),
948        }
949    }
950}