ckb_rust_std/io/buffered/bufwriter.rs
1use crate::io::{
2 self, error::core_error as error, error::ErrorKind, IntoInnerError, Seek, SeekFrom, Write,
3 DEFAULT_BUF_SIZE,
4};
5use alloc::fmt;
6use alloc::vec::Vec;
7use core::mem::{self, ManuallyDrop};
8use core::ptr;
9/// Wraps a writer and buffers its output.
10///
11/// It can be excessively inefficient to work directly with something that
12/// implements [`Write`]. For example, every call to
13/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
14/// `BufWriter<W>` keeps an in-memory buffer of data and writes it to an underlying
15/// writer in large, infrequent batches.
16///
17/// `BufWriter<W>` can improve the speed of programs that make *small* and
18/// *repeated* write calls to the same file or network socket. It does not
19/// help when writing very large amounts at once, or writing just one or a few
20/// times. It also provides no advantage when writing to a destination that is
21/// in memory, like a <code>[Vec]\<u8></code>.
22///
23/// It is critical to call [`flush`] before `BufWriter<W>` is dropped. Though
24/// dropping will attempt to flush the contents of the buffer, any errors
25/// that happen in the process of dropping will be ignored. Calling [`flush`]
26/// ensures that the buffer is empty and thus dropping will not even attempt
27/// file operations.
28///
29/// # Examples
30///
31/// Let's write the numbers one through ten to a [`TcpStream`]:
32///
33/// ```no_run
34/// use std::io::prelude::*;
35/// use std::net::TcpStream;
36///
37/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
38///
39/// for i in 0..10 {
40/// stream.write(&[i+1]).unwrap();
41/// }
42/// ```
43///
44/// Because we're not buffering, we write each one in turn, incurring the
45/// overhead of a system call per byte written. We can fix this with a
46/// `BufWriter<W>`:
47///
48/// ```no_run
49/// use std::io::prelude::*;
50/// use std::io::BufWriter;
51/// use std::net::TcpStream;
52///
53/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
54///
55/// for i in 0..10 {
56/// stream.write(&[i+1]).unwrap();
57/// }
58/// stream.flush().unwrap();
59/// ```
60///
61/// By wrapping the stream with a `BufWriter<W>`, these ten writes are all grouped
62/// together by the buffer and will all be written out in one system call when
63/// the `stream` is flushed.
64///
65/// [`TcpStream::write`]: crate::net::TcpStream::write
66/// [`TcpStream`]: crate::net::TcpStream
67/// [`flush`]: BufWriter::flush
68pub struct BufWriter<W: ?Sized + Write> {
69 // The buffer. Avoid using this like a normal `Vec` in common code paths.
70 // That is, don't use `buf.push`, `buf.extend_from_slice`, or any other
71 // methods that require bounds checking or the like. This makes an enormous
72 // difference to performance (we may want to stop using a `Vec` entirely).
73 buf: Vec<u8>,
74 // #30888: If the inner writer panics in a call to write, we don't want to
75 // write the buffered data a second time in BufWriter's destructor. This
76 // flag tells the Drop impl if it should skip the flush.
77 panicked: bool,
78 inner: W,
79}
80
81impl<W: Write> BufWriter<W> {
82 /// Creates a new `BufWriter<W>` with a default buffer capacity. The default is currently 8 KiB,
83 /// but may change in the future.
84 ///
85 /// # Examples
86 ///
87 /// ```no_run
88 /// use std::io::BufWriter;
89 /// use std::net::TcpStream;
90 ///
91 /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
92 /// ```
93 pub fn new(inner: W) -> BufWriter<W> {
94 BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
95 }
96
97 /// Creates a new `BufWriter<W>` with at least the specified buffer capacity.
98 ///
99 /// # Examples
100 ///
101 /// Creating a buffer with a buffer of at least a hundred bytes.
102 ///
103 /// ```no_run
104 /// use std::io::BufWriter;
105 /// use std::net::TcpStream;
106 ///
107 /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap();
108 /// let mut buffer = BufWriter::with_capacity(100, stream);
109 /// ```
110 pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
111 BufWriter {
112 inner,
113 buf: Vec::with_capacity(capacity),
114 panicked: false,
115 }
116 }
117
118 /// Unwraps this `BufWriter<W>`, returning the underlying writer.
119 ///
120 /// The buffer is written out before returning the writer.
121 ///
122 /// # Errors
123 ///
124 /// An [`Err`] will be returned if an error occurs while flushing the buffer.
125 ///
126 /// # Examples
127 ///
128 /// ```no_run
129 /// use std::io::BufWriter;
130 /// use std::net::TcpStream;
131 ///
132 /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
133 ///
134 /// // unwrap the TcpStream and flush the buffer
135 /// let stream = buffer.into_inner().unwrap();
136 /// ```
137 pub fn into_inner(mut self) -> Result<W, IntoInnerError<BufWriter<W>>> {
138 match self.flush_buf() {
139 Err(e) => Err(IntoInnerError::new(self, e)),
140 Ok(()) => Ok(self.into_parts().0),
141 }
142 }
143
144 /// Disassembles this `BufWriter<W>`, returning the underlying writer, and any buffered but
145 /// unwritten data.
146 ///
147 /// If the underlying writer panicked, it is not known what portion of the data was written.
148 /// In this case, we return `WriterPanicked` for the buffered data (from which the buffer
149 /// contents can still be recovered).
150 ///
151 /// `into_parts` makes no attempt to flush data and cannot fail.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use std::io::{BufWriter, Write};
157 ///
158 /// let mut buffer = [0u8; 10];
159 /// let mut stream = BufWriter::new(buffer.as_mut());
160 /// write!(stream, "too much data").unwrap();
161 /// stream.flush().expect_err("it doesn't fit");
162 /// let (recovered_writer, buffered_data) = stream.into_parts();
163 /// assert_eq!(recovered_writer.len(), 0);
164 /// assert_eq!(&buffered_data.unwrap(), b"ata");
165 /// ```
166 pub fn into_parts(self) -> (W, Result<Vec<u8>, WriterPanicked>) {
167 let mut this = ManuallyDrop::new(self);
168 let buf = mem::take(&mut this.buf);
169 let buf = if !this.panicked {
170 Ok(buf)
171 } else {
172 Err(WriterPanicked { buf })
173 };
174
175 // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
176 let inner = unsafe { ptr::read(&this.inner) };
177
178 (inner, buf)
179 }
180}
181
impl<W: ?Sized + Write> BufWriter<W> {
    /// Send data in our local buffer into the inner writer, looping as
    /// necessary until either it's all been sent or an error occurs.
    ///
    /// Because all the data in the buffer has been reported to our owner as
    /// "successfully written" (by returning nonzero success values from
    /// `write`), any 0-length writes from `inner` must be reported as i/o
    /// errors from this method.
    ///
    /// # Errors
    ///
    /// Returns a `ErrorKind::WriteZero` error if the inner writer accepts zero
    /// bytes. Otherwise it returns the first error from the inner writer that
    /// is not an interruption; interrupted writes are retried.
    ///
    /// On every exit path, including an unwinding panic from the inner
    /// writer, `BufGuard`'s `Drop` impl drains the bytes the inner writer did
    /// accept from the front of the buffer. A later call therefore only
    /// resends the unwritten remainder.
    pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> {
        /// Helper struct to ensure the buffer is updated after all the writes
        /// are complete. It tracks the number of written bytes and drains them
        /// all from the front of the buffer when dropped.
        struct BufGuard<'a> {
            buffer: &'a mut Vec<u8>,
            written: usize,
        }

        impl<'a> BufGuard<'a> {
            fn new(buffer: &'a mut Vec<u8>) -> Self {
                Self { buffer, written: 0 }
            }

            /// The unwritten part of the buffer
            fn remaining(&self) -> &[u8] {
                &self.buffer[self.written..]
            }

            /// Flag some bytes as removed from the front of the buffer
            fn consume(&mut self, amt: usize) {
                self.written += amt;
            }

            /// true if all of the bytes have been written
            fn done(&self) -> bool {
                self.written >= self.buffer.len()
            }
        }

        impl Drop for BufGuard<'_> {
            fn drop(&mut self) {
                // A single drain at the end avoids shifting the buffer after
                // every partial write.
                if self.written > 0 {
                    self.buffer.drain(..self.written);
                }
            }
        }

        let mut guard = BufGuard::new(&mut self.buf);
        while !guard.done() {
            // Raise the flag around the call into the inner writer. If
            // `write` unwinds, the reset below never runs, so `panicked` stays
            // `true` and `BufWriter`'s `Drop` skips its flush (#30888).
            self.panicked = true;
            let r = self.inner.write(guard.remaining());
            self.panicked = false;

            match r {
                Ok(0) => {
                    return Err(io::const_io_error!(
                        ErrorKind::WriteZero,
                        "failed to write the buffered data",
                    ));
                }
                Ok(n) => guard.consume(n),
                // Interrupted writes are simply retried.
                Err(ref e) if e.is_interrupted() => {}
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }

    /// Buffer some data without flushing it, regardless of the size of the
    /// data. Writes as much as possible without exceeding capacity. Returns
    /// the number of bytes written.
    ///
    /// The result may be less than `buf.len()`, or `0` when the buffer is
    /// already full. The inner writer is never touched.
    pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
        let available = self.spare_capacity();
        let amt_to_buffer = available.min(buf.len());

        // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
        unsafe {
            self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
        }

        amt_to_buffer
    }

    /// Gets a reference to the underlying writer.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io::BufWriter;
    /// use std::net::TcpStream;
    ///
    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
    ///
    /// // we can use reference just like buffer
    /// let reference = buffer.get_ref();
    /// ```
    pub fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer: any
    /// bytes still held in this `BufWriter`'s buffer have not reached it yet,
    /// so direct writes can be reordered relative to buffered ones.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io::BufWriter;
    /// use std::net::TcpStream;
    ///
    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
    ///
    /// // we can use reference just like buffer
    /// let reference = buffer.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Returns a reference to the internally buffered data.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io::BufWriter;
    /// use std::net::TcpStream;
    ///
    /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
    ///
    /// // See how many bytes are currently buffered
    /// let bytes_buffered = buf_writer.buffer().len();
    /// ```
    pub fn buffer(&self) -> &[u8] {
        &self.buf
    }

    /// Returns a mutable reference to the internal buffer.
    ///
    /// This can be used to write data directly into the buffer without triggering writes
    /// to the underlying writer.
    ///
    /// That the buffer is a `Vec` is an implementation detail.
    /// Callers should not modify the capacity as there currently is no public API to do so
    /// and thus any capacity changes would be unexpected by the user.
    // NOTE(review): presumably `dead_code` is allowed because nothing in this
    // build calls this helper — confirm before removing the attribute.
    #[allow(dead_code)]
    pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> {
        &mut self.buf
    }

    /// Returns the number of bytes the internal buffer can hold without flushing.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io::BufWriter;
    /// use std::net::TcpStream;
    ///
    /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
    ///
    /// // Check the capacity of the inner buffer
    /// let capacity = buf_writer.capacity();
    /// // Calculate how many bytes can be written without flushing
    /// let without_flush = capacity - buf_writer.buffer().len();
    /// ```
    pub fn capacity(&self) -> usize {
        self.buf.capacity()
    }

    /// Slow path of `write`, taken when `buf` does not fit strictly inside
    /// the spare capacity.
    ///
    /// It flushes first if `buf` exceeds the spare capacity. Inputs at least
    /// as large as the whole buffer go straight to the inner writer; anything
    /// smaller is copied into the buffer.
    // Ensure this function does not get inlined into `write`, so that it
    // remains inlineable and its common path remains as short as possible.
    // If this function ends up being called frequently relative to `write`,
    // it's likely a sign that the client is using an improperly sized buffer
    // or their write patterns are somewhat pathological.
    #[cold]
    #[inline(never)]
    fn write_cold(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.len() > self.spare_capacity() {
            self.flush_buf()?;
        }

        // Why not len > capacity? To avoid a needless trip through the buffer when the input
        // exactly fills it. We'd just need to flush it to the underlying writer anyway.
        if buf.len() >= self.buf.capacity() {
            // Same #30888 guard as in `flush_buf`: stays set if `write` unwinds.
            self.panicked = true;
            let r = self.get_mut().write(buf);
            self.panicked = false;
            r
        } else {
            // Write to the buffer. In this case, we write to the buffer even if it fills it
            // exactly. Doing otherwise would mean flushing the buffer, then writing this
            // input to the inner writer, which in many cases would be a worse strategy.

            // SAFETY: There was either enough spare capacity already, or there wasn't and we
            // flushed the buffer to ensure that there is. In the latter case, we know that there
            // is because flushing ensured that our entire buffer is spare capacity, and we entered
            // this block because the input buffer length is less than that capacity. In either
            // case, it's safe to write the input buffer to our buffer.
            unsafe {
                self.write_to_buffer_unchecked(buf);
            }

            Ok(buf.len())
        }
    }

    /// Slow path of `write_all`, taken when `buf` does not fit strictly
    /// inside the spare capacity.
    ///
    /// Mirrors `write_cold`, but hands large inputs to the inner writer's
    /// `write_all`.
    // Ensure this function does not get inlined into `write_all`, so that it
    // remains inlineable and its common path remains as short as possible.
    // If this function ends up being called frequently relative to `write_all`,
    // it's likely a sign that the client is using an improperly sized buffer
    // or their write patterns are somewhat pathological.
    #[cold]
    #[inline(never)]
    fn write_all_cold(&mut self, buf: &[u8]) -> io::Result<()> {
        // Normally, `write_all` just calls `write` in a loop. We can do better
        // by calling `self.get_mut().write_all()` directly, which avoids
        // round trips through the buffer in the event of a series of partial
        // writes in some circumstances.

        if buf.len() > self.spare_capacity() {
            self.flush_buf()?;
        }

        // Why not len > capacity? To avoid a needless trip through the buffer when the input
        // exactly fills it. We'd just need to flush it to the underlying writer anyway.
        if buf.len() >= self.buf.capacity() {
            // Same #30888 guard as in `flush_buf`: stays set if `write_all` unwinds.
            self.panicked = true;
            let r = self.get_mut().write_all(buf);
            self.panicked = false;
            r
        } else {
            // Write to the buffer. In this case, we write to the buffer even if it fills it
            // exactly. Doing otherwise would mean flushing the buffer, then writing this
            // input to the inner writer, which in many cases would be a worse strategy.

            // SAFETY: There was either enough spare capacity already, or there wasn't and we
            // flushed the buffer to ensure that there is. In the latter case, we know that there
            // is because flushing ensured that our entire buffer is spare capacity, and we entered
            // this block because the input buffer length is less than that capacity. In either
            // case, it's safe to write the input buffer to our buffer.
            unsafe {
                self.write_to_buffer_unchecked(buf);
            }

            Ok(())
        }
    }

    /// Appends `buf` to the end of the internal buffer with a raw copy,
    /// bypassing `Vec`'s capacity checks.
    ///
    /// # Safety
    ///
    /// The caller must guarantee `buf.len() <= self.buf.capacity() - self.buf.len()`,
    /// i.e. that the input fits in the buffer's spare capacity. Violating this
    /// writes past the allocation. It is only checked by a `debug_assert!`.
    #[inline]
    unsafe fn write_to_buffer_unchecked(&mut self, buf: &[u8]) {
        debug_assert!(buf.len() <= self.spare_capacity());
        let old_len = self.buf.len();
        let buf_len = buf.len();
        let src = buf.as_ptr();
        unsafe {
            let dst = self.buf.as_mut_ptr().add(old_len);
            ptr::copy_nonoverlapping(src, dst, buf_len);
            self.buf.set_len(old_len + buf_len);
        }
    }

    /// Number of bytes that can still be appended to the buffer without
    /// growing it (`capacity - len`).
    #[inline]
    fn spare_capacity(&self) -> usize {
        self.buf.capacity() - self.buf.len()
    }
}
448
/// The buffered data that `BufWriter::into_parts` hands back, as an error,
/// when an earlier call into the underlying writer panicked.
///
/// Because that panic interrupted a write, some unknown prefix of these bytes
/// may already have reached the writer.
///
/// # Example
///
/// ```
/// use std::io::{self, BufWriter, Write};
/// use std::panic::{catch_unwind, AssertUnwindSafe};
///
/// struct PanickingWriter;
/// impl Write for PanickingWriter {
///   fn write(&mut self, buf: &[u8]) -> io::Result<usize> { panic!() }
///   fn flush(&mut self) -> io::Result<()> { panic!() }
/// }
///
/// let mut stream = BufWriter::new(PanickingWriter);
/// write!(stream, "some data").unwrap();
/// let result = catch_unwind(AssertUnwindSafe(|| {
///     stream.flush().unwrap()
/// }));
/// assert!(result.is_err());
/// let (recovered_writer, buffered_data) = stream.into_parts();
/// assert!(matches!(recovered_writer, PanickingWriter));
/// assert_eq!(buffered_data.unwrap_err().into_inner(), b"some data");
/// ```
pub struct WriterPanicked {
    buf: Vec<u8>,
}

impl WriterPanicked {
    /// Consumes the error and yields the buffered bytes.
    ///
    /// The panicking writer may already have consumed part of them, so
    /// writing them out again verbatim risks duplicating output.
    #[must_use = "`self` will be dropped if the result is not used"]
    pub fn into_inner(self) -> Vec<u8> {
        let WriterPanicked { buf } = self;
        buf
    }

    // Shared message for `Display` and the deprecated `Error::description`.
    // The lifetime stays explicit: elision on associated consts is linted.
    const DESCRIPTION: &'static str =
        "BufWriter inner writer panicked, what data remains unwritten is not known";
}

impl error::Error for WriterPanicked {
    #[allow(deprecated, deprecated_in_future)]
    fn description(&self) -> &str {
        WriterPanicked::DESCRIPTION
    }
}

impl fmt::Display for WriterPanicked {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(WriterPanicked::DESCRIPTION)
    }
}

impl fmt::Debug for WriterPanicked {
    /// Shows only the buffer's fill level as `len/capacity`, not its contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (len, cap) = (self.buf.len(), self.buf.capacity());
        f.debug_struct("WriterPanicked")
            .field("buffer", &format_args!("{}/{}", len, cap))
            .finish()
    }
}
510impl<W: ?Sized + Write> Write for BufWriter<W> {
511 #[inline]
512 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
513 // Use < instead of <= to avoid a needless trip through the buffer in some cases.
514 // See `write_cold` for details.
515 if buf.len() < self.spare_capacity() {
516 // SAFETY: safe by above conditional.
517 unsafe {
518 self.write_to_buffer_unchecked(buf);
519 }
520
521 Ok(buf.len())
522 } else {
523 self.write_cold(buf)
524 }
525 }
526
527 #[inline]
528 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
529 // Use < instead of <= to avoid a needless trip through the buffer in some cases.
530 // See `write_all_cold` for details.
531 if buf.len() < self.spare_capacity() {
532 // SAFETY: safe by above conditional.
533 unsafe {
534 self.write_to_buffer_unchecked(buf);
535 }
536
537 Ok(())
538 } else {
539 self.write_all_cold(buf)
540 }
541 }
542 fn is_write_vectored(&self) -> bool {
543 true
544 }
545
546 fn flush(&mut self) -> io::Result<()> {
547 self.flush_buf().and_then(|()| self.get_mut().flush())
548 }
549}
550impl<W: ?Sized + Write> fmt::Debug for BufWriter<W>
551where
552 W: fmt::Debug,
553{
554 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
555 fmt.debug_struct("BufWriter")
556 .field("writer", &&self.inner)
557 .field(
558 "buffer",
559 &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
560 )
561 .finish()
562 }
563}
564impl<W: ?Sized + Write + Seek> Seek for BufWriter<W> {
565 /// Seek to the offset, in bytes, in the underlying writer.
566 ///
567 /// Seeking always writes out the internal buffer before seeking.
568 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
569 self.flush_buf()?;
570 self.get_mut().seek(pos)
571 }
572}
573impl<W: ?Sized + Write> Drop for BufWriter<W> {
574 fn drop(&mut self) {
575 if !self.panicked {
576 // dtors should not panic, so we ignore a failed flush
577 let _r = self.flush_buf();
578 }
579 }
580}