ckb_rust_std/io/buffered/
bufwriter.rs

1use crate::io::{
2    self, error::core_error as error, error::ErrorKind, IntoInnerError, Seek, SeekFrom, Write,
3    DEFAULT_BUF_SIZE,
4};
5use alloc::fmt;
6use alloc::vec::Vec;
7use core::mem::{self, ManuallyDrop};
8use core::ptr;
9/// Wraps a writer and buffers its output.
10///
11/// It can be excessively inefficient to work directly with something that
12/// implements [`Write`]. For example, every call to
13/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
14/// `BufWriter<W>` keeps an in-memory buffer of data and writes it to an underlying
15/// writer in large, infrequent batches.
16///
17/// `BufWriter<W>` can improve the speed of programs that make *small* and
18/// *repeated* write calls to the same file or network socket. It does not
19/// help when writing very large amounts at once, or writing just one or a few
20/// times. It also provides no advantage when writing to a destination that is
21/// in memory, like a <code>[Vec]\<u8></code>.
22///
23/// It is critical to call [`flush`] before `BufWriter<W>` is dropped. Though
24/// dropping will attempt to flush the contents of the buffer, any errors
25/// that happen in the process of dropping will be ignored. Calling [`flush`]
26/// ensures that the buffer is empty and thus dropping will not even attempt
27/// file operations.
28///
29/// # Examples
30///
31/// Let's write the numbers one through ten to a [`TcpStream`]:
32///
33/// ```no_run
34/// use std::io::prelude::*;
35/// use std::net::TcpStream;
36///
37/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
38///
39/// for i in 0..10 {
40///     stream.write(&[i+1]).unwrap();
41/// }
42/// ```
43///
44/// Because we're not buffering, we write each one in turn, incurring the
45/// overhead of a system call per byte written. We can fix this with a
46/// `BufWriter<W>`:
47///
48/// ```no_run
49/// use std::io::prelude::*;
50/// use std::io::BufWriter;
51/// use std::net::TcpStream;
52///
53/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
54///
55/// for i in 0..10 {
56///     stream.write(&[i+1]).unwrap();
57/// }
58/// stream.flush().unwrap();
59/// ```
60///
61/// By wrapping the stream with a `BufWriter<W>`, these ten writes are all grouped
62/// together by the buffer and will all be written out in one system call when
63/// the `stream` is flushed.
64///
65/// [`TcpStream::write`]: crate::net::TcpStream::write
66/// [`TcpStream`]: crate::net::TcpStream
67/// [`flush`]: BufWriter::flush
pub struct BufWriter<W: ?Sized + Write> {
    // The buffer. Avoid using this like a normal `Vec` in common code paths.
    // That is, don't use `buf.push`, `buf.extend_from_slice`, or any other
    // methods that require bounds checking or the like. This makes an enormous
    // difference to performance (we may want to stop using a `Vec` entirely).
    //
    // Its length is the number of bytes accepted from callers but not yet
    // handed to `inner`; its capacity bounds how much can be held before a
    // flush becomes necessary.
    buf: Vec<u8>,
    // #30888: If the inner writer panics in a call to write, we don't want to
    // write the buffered data a second time in BufWriter's destructor. This
    // flag tells the Drop impl if it should skip the flush.
    //
    // It is set to `true` right before each call into `inner`'s write methods
    // and reset to `false` as soon as that call returns normally.
    panicked: bool,
    // The wrapped writer that buffered bytes are eventually written to.
    // It has to stay the last field because `W` may be unsized.
    inner: W,
}
80
81impl<W: Write> BufWriter<W> {
82    /// Creates a new `BufWriter<W>` with a default buffer capacity. The default is currently 8 KiB,
83    /// but may change in the future.
84    ///
85    /// # Examples
86    ///
87    /// ```no_run
88    /// use std::io::BufWriter;
89    /// use std::net::TcpStream;
90    ///
91    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
92    /// ```
93    pub fn new(inner: W) -> BufWriter<W> {
94        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
95    }
96
97    /// Creates a new `BufWriter<W>` with at least the specified buffer capacity.
98    ///
99    /// # Examples
100    ///
101    /// Creating a buffer with a buffer of at least a hundred bytes.
102    ///
103    /// ```no_run
104    /// use std::io::BufWriter;
105    /// use std::net::TcpStream;
106    ///
107    /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap();
108    /// let mut buffer = BufWriter::with_capacity(100, stream);
109    /// ```
110    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
111        BufWriter {
112            inner,
113            buf: Vec::with_capacity(capacity),
114            panicked: false,
115        }
116    }
117
118    /// Unwraps this `BufWriter<W>`, returning the underlying writer.
119    ///
120    /// The buffer is written out before returning the writer.
121    ///
122    /// # Errors
123    ///
124    /// An [`Err`] will be returned if an error occurs while flushing the buffer.
125    ///
126    /// # Examples
127    ///
128    /// ```no_run
129    /// use std::io::BufWriter;
130    /// use std::net::TcpStream;
131    ///
132    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
133    ///
134    /// // unwrap the TcpStream and flush the buffer
135    /// let stream = buffer.into_inner().unwrap();
136    /// ```
137    pub fn into_inner(mut self) -> Result<W, IntoInnerError<BufWriter<W>>> {
138        match self.flush_buf() {
139            Err(e) => Err(IntoInnerError::new(self, e)),
140            Ok(()) => Ok(self.into_parts().0),
141        }
142    }
143
144    /// Disassembles this `BufWriter<W>`, returning the underlying writer, and any buffered but
145    /// unwritten data.
146    ///
147    /// If the underlying writer panicked, it is not known what portion of the data was written.
148    /// In this case, we return `WriterPanicked` for the buffered data (from which the buffer
149    /// contents can still be recovered).
150    ///
151    /// `into_parts` makes no attempt to flush data and cannot fail.
152    ///
153    /// # Examples
154    ///
155    /// ```
156    /// use std::io::{BufWriter, Write};
157    ///
158    /// let mut buffer = [0u8; 10];
159    /// let mut stream = BufWriter::new(buffer.as_mut());
160    /// write!(stream, "too much data").unwrap();
161    /// stream.flush().expect_err("it doesn't fit");
162    /// let (recovered_writer, buffered_data) = stream.into_parts();
163    /// assert_eq!(recovered_writer.len(), 0);
164    /// assert_eq!(&buffered_data.unwrap(), b"ata");
165    /// ```
166    pub fn into_parts(self) -> (W, Result<Vec<u8>, WriterPanicked>) {
167        let mut this = ManuallyDrop::new(self);
168        let buf = mem::take(&mut this.buf);
169        let buf = if !this.panicked {
170            Ok(buf)
171        } else {
172            Err(WriterPanicked { buf })
173        };
174
175        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
176        let inner = unsafe { ptr::read(&this.inner) };
177
178        (inner, buf)
179    }
180}
181
182impl<W: ?Sized + Write> BufWriter<W> {
    /// Send data in our local buffer into the inner writer, looping as
    /// necessary until either it's all been sent or an error occurs.
    ///
    /// Because all the data in the buffer has been reported to our owner as
    /// "successfully written" (by returning nonzero success values from
    /// `write`), any 0-length writes from `inner` must be reported as i/o
    /// errors from this method.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::WriteZero`] error if `inner` accepts zero bytes,
    /// and passes through any error from `inner` except those for which
    /// `is_interrupted()` is true, which are retried. On every exit path the
    /// bytes that `inner` already accepted are removed from the front of the
    /// buffer; the rest stay buffered.
    pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> {
        /// Helper struct to ensure the buffer is updated after all the writes
        /// are complete. It tracks the number of written bytes and drains them
        /// all from the front of the buffer when dropped.
        struct BufGuard<'a> {
            // The `BufWriter`'s buffer being flushed.
            buffer: &'a mut Vec<u8>,
            // Number of bytes at the front of `buffer` already accepted by the writer.
            written: usize,
        }

        impl<'a> BufGuard<'a> {
            /// Starts tracking `buffer` with nothing written yet.
            fn new(buffer: &'a mut Vec<u8>) -> Self {
                Self { buffer, written: 0 }
            }

            /// The unwritten part of the buffer
            fn remaining(&self) -> &[u8] {
                &self.buffer[self.written..]
            }

            /// Flag some bytes as removed from the front of the buffer
            fn consume(&mut self, amt: usize) {
                self.written += amt;
            }

            /// true if all of the bytes have been written
            fn done(&self) -> bool {
                self.written >= self.buffer.len()
            }
        }

        impl Drop for BufGuard<'_> {
            // Runs on every way out of `flush_buf` (success, early `return Err`,
            // or unwinding), so the buffer never keeps bytes that were written.
            fn drop(&mut self) {
                if self.written > 0 {
                    self.buffer.drain(..self.written);
                }
            }
        }

        // The guard borrows only `self.buf`, which leaves `self.inner` and
        // `self.panicked` free to be used inside the loop.
        let mut guard = BufGuard::new(&mut self.buf);
        while !guard.done() {
            // Bracket the call into `inner` with the panic flag: if `write`
            // unwinds, the flag stays `true` and `Drop` will skip flushing.
            self.panicked = true;
            let r = self.inner.write(guard.remaining());
            self.panicked = false;

            match r {
                // No progress is possible; report it instead of spinning forever.
                Ok(0) => {
                    return Err(io::const_io_error!(
                        ErrorKind::WriteZero,
                        "failed to write the buffered data",
                    ));
                }
                Ok(n) => guard.consume(n),
                // Interrupted writes are simply retried on the next iteration.
                Err(ref e) if e.is_interrupted() => {}
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
248
249    /// Buffer some data without flushing it, regardless of the size of the
250    /// data. Writes as much as possible without exceeding capacity. Returns
251    /// the number of bytes written.
252    pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
253        let available = self.spare_capacity();
254        let amt_to_buffer = available.min(buf.len());
255
256        // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
257        unsafe {
258            self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
259        }
260
261        amt_to_buffer
262    }
263
264    /// Gets a reference to the underlying writer.
265    ///
266    /// # Examples
267    ///
268    /// ```no_run
269    /// use std::io::BufWriter;
270    /// use std::net::TcpStream;
271    ///
272    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
273    ///
274    /// // we can use reference just like buffer
275    /// let reference = buffer.get_ref();
276    /// ```
277    pub fn get_ref(&self) -> &W {
278        &self.inner
279    }
280
281    /// Gets a mutable reference to the underlying writer.
282    ///
283    /// It is inadvisable to directly write to the underlying writer.
284    ///
285    /// # Examples
286    ///
287    /// ```no_run
288    /// use std::io::BufWriter;
289    /// use std::net::TcpStream;
290    ///
291    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
292    ///
293    /// // we can use reference just like buffer
294    /// let reference = buffer.get_mut();
295    /// ```
296    pub fn get_mut(&mut self) -> &mut W {
297        &mut self.inner
298    }
299
300    /// Returns a reference to the internally buffered data.
301    ///
302    /// # Examples
303    ///
304    /// ```no_run
305    /// use std::io::BufWriter;
306    /// use std::net::TcpStream;
307    ///
308    /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
309    ///
310    /// // See how many bytes are currently buffered
311    /// let bytes_buffered = buf_writer.buffer().len();
312    /// ```
313    pub fn buffer(&self) -> &[u8] {
314        &self.buf
315    }
316
317    /// Returns a mutable reference to the internal buffer.
318    ///
319    /// This can be used to write data directly into the buffer without triggering writers
320    /// to the underlying writer.
321    ///
322    /// That the buffer is a `Vec` is an implementation detail.
323    /// Callers should not modify the capacity as there currently is no public API to do so
324    /// and thus any capacity changes would be unexpected by the user.
325    #[allow(dead_code)]
326    pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> {
327        &mut self.buf
328    }
329
330    /// Returns the number of bytes the internal buffer can hold without flushing.
331    ///
332    /// # Examples
333    ///
334    /// ```no_run
335    /// use std::io::BufWriter;
336    /// use std::net::TcpStream;
337    ///
338    /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
339    ///
340    /// // Check the capacity of the inner buffer
341    /// let capacity = buf_writer.capacity();
342    /// // Calculate how many bytes can be written without flushing
343    /// let without_flush = capacity - buf_writer.buffer().len();
344    /// ```
345    pub fn capacity(&self) -> usize {
346        self.buf.capacity()
347    }
348
349    // Ensure this function does not get inlined into `write`, so that it
350    // remains inlineable and its common path remains as short as possible.
351    // If this function ends up being called frequently relative to `write`,
352    // it's likely a sign that the client is using an improperly sized buffer
353    // or their write patterns are somewhat pathological.
354    #[cold]
355    #[inline(never)]
356    fn write_cold(&mut self, buf: &[u8]) -> io::Result<usize> {
357        if buf.len() > self.spare_capacity() {
358            self.flush_buf()?;
359        }
360
361        // Why not len > capacity? To avoid a needless trip through the buffer when the input
362        // exactly fills it. We'd just need to flush it to the underlying writer anyway.
363        if buf.len() >= self.buf.capacity() {
364            self.panicked = true;
365            let r = self.get_mut().write(buf);
366            self.panicked = false;
367            r
368        } else {
369            // Write to the buffer. In this case, we write to the buffer even if it fills it
370            // exactly. Doing otherwise would mean flushing the buffer, then writing this
371            // input to the inner writer, which in many cases would be a worse strategy.
372
373            // SAFETY: There was either enough spare capacity already, or there wasn't and we
374            // flushed the buffer to ensure that there is. In the latter case, we know that there
375            // is because flushing ensured that our entire buffer is spare capacity, and we entered
376            // this block because the input buffer length is less than that capacity. In either
377            // case, it's safe to write the input buffer to our buffer.
378            unsafe {
379                self.write_to_buffer_unchecked(buf);
380            }
381
382            Ok(buf.len())
383        }
384    }
385
386    // Ensure this function does not get inlined into `write_all`, so that it
387    // remains inlineable and its common path remains as short as possible.
388    // If this function ends up being called frequently relative to `write_all`,
389    // it's likely a sign that the client is using an improperly sized buffer
390    // or their write patterns are somewhat pathological.
391    #[cold]
392    #[inline(never)]
393    fn write_all_cold(&mut self, buf: &[u8]) -> io::Result<()> {
394        // Normally, `write_all` just calls `write` in a loop. We can do better
395        // by calling `self.get_mut().write_all()` directly, which avoids
396        // round trips through the buffer in the event of a series of partial
397        // writes in some circumstances.
398
399        if buf.len() > self.spare_capacity() {
400            self.flush_buf()?;
401        }
402
403        // Why not len > capacity? To avoid a needless trip through the buffer when the input
404        // exactly fills it. We'd just need to flush it to the underlying writer anyway.
405        if buf.len() >= self.buf.capacity() {
406            self.panicked = true;
407            let r = self.get_mut().write_all(buf);
408            self.panicked = false;
409            r
410        } else {
411            // Write to the buffer. In this case, we write to the buffer even if it fills it
412            // exactly. Doing otherwise would mean flushing the buffer, then writing this
413            // input to the inner writer, which in many cases would be a worse strategy.
414
415            // SAFETY: There was either enough spare capacity already, or there wasn't and we
416            // flushed the buffer to ensure that there is. In the latter case, we know that there
417            // is because flushing ensured that our entire buffer is spare capacity, and we entered
418            // this block because the input buffer length is less than that capacity. In either
419            // case, it's safe to write the input buffer to our buffer.
420            unsafe {
421                self.write_to_buffer_unchecked(buf);
422            }
423
424            Ok(())
425        }
426    }
427
    /// Appends all of `buf` to the internal buffer without checking that it fits
    /// and without growing the buffer.
    ///
    /// # Safety
    ///
    /// Requires `buf.len() <= self.buf.capacity() - self.buf.len()`,
    /// i.e., that input buffer length is less than or equal to spare capacity.
    /// The `debug_assert!` below checks this in debug builds only.
    #[inline]
    unsafe fn write_to_buffer_unchecked(&mut self, buf: &[u8]) {
        debug_assert!(buf.len() <= self.spare_capacity());
        let old_len = self.buf.len();
        let buf_len = buf.len();
        let src = buf.as_ptr();
        unsafe {
            // First byte past the currently initialized part of the buffer.
            let dst = self.buf.as_mut_ptr().add(old_len);
            // `buf` is a shared borrow while `self` is borrowed mutably, so the
            // source and destination ranges cannot overlap.
            ptr::copy_nonoverlapping(src, dst, buf_len);
            // Only now, with the bytes in place, are they exposed through the length.
            self.buf.set_len(old_len + buf_len);
        }
    }
442
443    #[inline]
444    fn spare_capacity(&self) -> usize {
445        self.buf.capacity() - self.buf.len()
446    }
447}
448
/// Error handed back in place of the buffered data by `BufWriter::into_parts` when the
/// underlying writer has previously panicked.  It still carries the buffered data, some of
/// which may already have been written.
///
/// # Example
///
/// ```
/// use std::io::{self, BufWriter, Write};
/// use std::panic::{catch_unwind, AssertUnwindSafe};
///
/// struct PanickingWriter;
/// impl Write for PanickingWriter {
///   fn write(&mut self, buf: &[u8]) -> io::Result<usize> { panic!() }
///   fn flush(&mut self) -> io::Result<()> { panic!() }
/// }
///
/// let mut stream = BufWriter::new(PanickingWriter);
/// write!(stream, "some data").unwrap();
/// let result = catch_unwind(AssertUnwindSafe(|| {
///     stream.flush().unwrap()
/// }));
/// assert!(result.is_err());
/// let (recovered_writer, buffered_data) = stream.into_parts();
/// assert!(matches!(recovered_writer, PanickingWriter));
/// assert_eq!(buffered_data.unwrap_err().into_inner(), b"some data");
/// ```
pub struct WriterPanicked {
    // The data that was buffered when the writer panicked.
    buf: Vec<u8>,
}

impl WriterPanicked {
    // Shared text for both `Error::description` and `Display`.
    const DESCRIPTION: &'static str =
        "BufWriter inner writer panicked, what data remains unwritten is not known";

    /// Gives back the perhaps-unwritten data.  The panicking call(s) to the underlying
    /// writer may already have written part of it, so simply writing it again is not a
    /// good idea.
    #[must_use = "`self` will be dropped if the result is not used"]
    pub fn into_inner(self) -> Vec<u8> {
        let Self { buf } = self;
        buf
    }
}

impl error::Error for WriterPanicked {
    #[allow(deprecated, deprecated_in_future)]
    fn description(&self) -> &str {
        Self::DESCRIPTION
    }
}

impl fmt::Display for WriterPanicked {
    /// Writes the fixed description; formatter flags such as width are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(Self::DESCRIPTION)
    }
}

impl fmt::Debug for WriterPanicked {
    /// Shows the buffer as `len/capacity` rather than dumping its contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, cap) = (self.buf.len(), self.buf.capacity());
        f.debug_struct("WriterPanicked")
            .field("buffer", &format_args!("{}/{}", len, cap))
            .finish()
    }
}
510impl<W: ?Sized + Write> Write for BufWriter<W> {
511    #[inline]
512    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
513        // Use < instead of <= to avoid a needless trip through the buffer in some cases.
514        // See `write_cold` for details.
515        if buf.len() < self.spare_capacity() {
516            // SAFETY: safe by above conditional.
517            unsafe {
518                self.write_to_buffer_unchecked(buf);
519            }
520
521            Ok(buf.len())
522        } else {
523            self.write_cold(buf)
524        }
525    }
526
527    #[inline]
528    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
529        // Use < instead of <= to avoid a needless trip through the buffer in some cases.
530        // See `write_all_cold` for details.
531        if buf.len() < self.spare_capacity() {
532            // SAFETY: safe by above conditional.
533            unsafe {
534                self.write_to_buffer_unchecked(buf);
535            }
536
537            Ok(())
538        } else {
539            self.write_all_cold(buf)
540        }
541    }
542    fn is_write_vectored(&self) -> bool {
543        true
544    }
545
546    fn flush(&mut self) -> io::Result<()> {
547        self.flush_buf().and_then(|()| self.get_mut().flush())
548    }
549}
550impl<W: ?Sized + Write> fmt::Debug for BufWriter<W>
551where
552    W: fmt::Debug,
553{
554    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
555        fmt.debug_struct("BufWriter")
556            .field("writer", &&self.inner)
557            .field(
558                "buffer",
559                &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
560            )
561            .finish()
562    }
563}
564impl<W: ?Sized + Write + Seek> Seek for BufWriter<W> {
565    /// Seek to the offset, in bytes, in the underlying writer.
566    ///
567    /// Seeking always writes out the internal buffer before seeking.
568    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
569        self.flush_buf()?;
570        self.get_mut().seek(pos)
571    }
572}
573impl<W: ?Sized + Write> Drop for BufWriter<W> {
574    fn drop(&mut self) {
575        if !self.panicked {
576            // dtors should not panic, so we ignore a failed flush
577            let _r = self.flush_buf();
578        }
579    }
580}