qubit_io/buffered/buffered_byte_output.rs
1// =============================================================================
2// Copyright (c) 2026 Haixing Hu.
3//
4// SPDX-License-Identifier: Apache-2.0
5//
6// Licensed under the Apache License, Version 2.0.
7// =============================================================================
8
9use std::io::{
10 Error,
11 ErrorKind,
12 Result,
13 Seek,
14 SeekFrom,
15 Write,
16};
17
18use crate::Buffer;
19use crate::WriteExt;
20use crate::buffered::DEFAULT_BUFFER_CAPACITY;
21
22/// Buffered byte output over a wrapped writer.
23///
24/// This type keeps a fixed-size byte buffer in front of an underlying writer so
25/// small byte writes can be accumulated before they are written to the I/O
26/// target. Large writes may bypass the buffer after pending buffered bytes
27/// have been flushed.
28///
29/// `BufferedByteOutput` is deliberately byte-oriented. It performs no binary
30/// encoding, text encoding, or record framing. Higher-level writers can either
31/// use the standard [`Write`] implementation or write directly into
32/// [`Self::spare_buffer_mut`] or [`Self::spare_raw_parts_mut`] and then call
33/// [`Self::advance`] or [`Self::advance_unchecked`] after validating the range
34/// they initialized. Callers that need to recover the wrapped writer should
35/// call [`Write::flush`] first, then use [`Self::into_parts`].
36#[derive(Debug)]
37pub struct BufferedByteOutput<W> {
38 inner: W,
39 buffer: Buffer<u8>,
40}
41
42impl<W> BufferedByteOutput<W> {
43 /// Creates a buffered byte output with the default capacity.
44 ///
45 /// # Parameters
46 ///
47 /// * `inner` - The writer that receives bytes when the internal buffer is
48 /// flushed.
49 ///
50 /// # Returns
51 ///
52 /// A new buffered byte output using `DEFAULT_BUFFER_CAPACITY`.
53 #[inline(always)]
54 #[must_use]
55 pub fn new(inner: W) -> Self {
56 Self::with_capacity(inner, DEFAULT_BUFFER_CAPACITY)
57 }
58
59 /// Creates a buffered byte output with at least the requested capacity.
60 ///
61 /// # Parameters
62 ///
63 /// * `inner` - The writer that receives bytes when the internal buffer is
64 /// flushed.
65 /// * `capacity` - The requested internal buffer capacity in bytes.
66 ///
67 /// # Returns
68 ///
69 /// A new buffered byte output whose actual buffer capacity is
70 /// `capacity.max(1)`.
71 #[inline(always)]
72 #[must_use]
73 pub fn with_capacity(inner: W, capacity: usize) -> Self {
74 Self {
75 inner,
76 buffer: Buffer::with_capacity(capacity),
77 }
78 }
79
80 /// Returns a shared reference to the wrapped writer.
81 ///
82 /// # Returns
83 ///
84 /// An immutable reference to the underlying writer. Pending bytes may
85 /// still be present in the internal buffer and are not flushed by this
86 /// method.
87 #[inline(always)]
88 pub const fn inner(&self) -> &W {
89 &self.inner
90 }
91
92 /// Returns an exclusive reference to the wrapped writer.
93 ///
94 /// Pending bytes may still be present in the internal buffer and are not
95 /// flushed by this method.
96 ///
97 /// # Returns
98 ///
99 /// A mutable reference to the underlying writer.
100 #[inline(always)]
101 pub fn inner_mut(&mut self) -> &mut W {
102 &mut self.inner
103 }
104
105 /// Returns the internal buffer capacity.
106 ///
107 /// # Returns
108 ///
109 /// The total number of bytes that can be held by the internal buffer.
110 #[inline(always)]
111 #[must_use]
112 pub fn capacity(&self) -> usize {
113 self.buffer.capacity()
114 }
115
116 /// Returns the unused capacity in the internal buffer.
117 ///
118 /// # Returns
119 ///
120 /// The number of bytes that can still be appended to the internal buffer
121 /// before it must be flushed.
122 #[inline(always)]
123 #[must_use]
124 pub fn spare_capacity(&self) -> usize {
125 self.buffer.spare_capacity()
126 }
127
128 /// Returns the unused portion of the internal buffer.
129 ///
130 /// Callers may write initialized bytes into the returned slice and then
131 /// call [`Self::advance`] with the number of bytes written.
132 ///
133 /// # Returns
134 ///
135 /// A mutable slice over the spare buffer capacity.
136 #[inline(always)]
137 #[must_use]
138 pub fn spare_buffer_mut(&mut self) -> &mut [u8] {
139 let limit = self.buffer.limit();
140 &mut self.buffer.data_mut()[limit..]
141 }
142
143 /// Returns raw spare-buffer parts for hot-path callers.
144 ///
145 /// The returned slice is the full internal backing storage. `index` is the
146 /// start of the spare byte window, and `count` is the number of spare
147 /// bytes. Callers that need a slice can use `&mut buffer[index..index +
148 /// count]`; callers that already validated bounds can pass `buffer` and
149 /// `index` directly to indexed unchecked codecs.
150 ///
151 /// Mutating bytes outside `index..index + count` changes pending output
152 /// bytes and may corrupt the logical stream.
153 ///
154 /// # Returns
155 ///
156 /// The backing storage, the spare start index, and the spare byte count.
157 #[inline(always)]
158 #[must_use]
159 pub fn spare_raw_parts_mut(&mut self) -> (&mut [u8], usize, usize) {
160 let index = self.buffer.limit();
161 let count = self.buffer.spare_capacity();
162 (self.buffer.data_mut(), index, count)
163 }
164
165 /// Marks `count` bytes from [`Self::spare_buffer_mut`] as written.
166 ///
167 /// # Parameters
168 ///
169 /// * `count` - Number of bytes initialized by the caller.
170 ///
171 /// # Panics
172 ///
173 /// Panics when `count` exceeds [`Self::spare_capacity`].
174 #[inline(always)]
175 pub fn advance(&mut self, count: usize) {
176 assert!(
177 count <= self.spare_capacity(),
178 "cannot advance beyond spare output buffer"
179 );
180 // SAFETY: The assertion proves that `count` is within spare capacity.
181 unsafe {
182 self.buffer.advance_unchecked(count);
183 }
184 }
185
186 /// Marks spare bytes as written without checking bounds.
187 ///
188 /// # Parameters
189 ///
190 /// * `count` - Number of initialized spare bytes to make pending for
191 /// output.
192 ///
193 /// # Safety
194 ///
195 /// The caller must guarantee that `count <= self.spare_capacity()` and
196 /// that the corresponding bytes returned by [`Self::spare_buffer_mut`]
197 /// have been initialized.
198 #[inline(always)]
199 pub unsafe fn advance_unchecked(&mut self, count: usize) {
200 // SAFETY: The caller guarantees that `count` is within spare capacity.
201 unsafe {
202 self.buffer.advance_unchecked(count);
203 }
204 }
205
206 /// Writes bytes into the internal buffer without checking spare capacity.
207 ///
208 /// # Parameters
209 ///
210 /// * `input` - The source bytes.
211 /// * `input_index` - The starting index in `input`.
212 /// * `count` - The number of bytes to copy.
213 ///
214 /// # Safety
215 ///
216 /// The caller must ensure that `input_index..input_index + count` is valid
217 /// in `input`, that `count <= self.spare_capacity()`, and that the copied
218 /// source range does not overlap with the destination range in the internal
219 /// buffer.
220 #[inline(always)]
221 unsafe fn write_to_buffer_unchecked(
222 &mut self,
223 input: &[u8],
224 input_index: usize,
225 count: usize,
226 ) {
227 // SAFETY: The caller upholds `Buffer::copy_from_unchecked` range and
228 // non-overlap requirements.
229 unsafe {
230 self.buffer.copy_from_unchecked(input, input_index, count);
231 }
232 }
233}
234
235impl<W> BufferedByteOutput<W>
236where
237 W: Write,
238{
239 /// Consumes this buffered output without flushing pending bytes.
240 ///
241 /// This method performs no I/O. Pending bytes that have been accepted into
242 /// the internal buffer but not written to the wrapped writer are returned
243 /// as the second tuple item.
244 ///
245 /// # Returns
246 ///
247 /// The wrapped writer and pending bytes in logical write order.
248 #[inline(always)]
249 #[must_use]
250 pub fn into_parts(self) -> (W, Vec<u8>) {
251 let pending = self.pending_slice().to_vec();
252 (self.inner, pending)
253 }
254
255 /// Ensures that at least `count` bytes are available in the spare buffer.
256 ///
257 /// # Parameters
258 ///
259 /// * `count` - Number of spare bytes required.
260 ///
261 /// # Errors
262 ///
263 /// Returns any non-interrupted I/O error produced while flushing buffered
264 /// bytes. Returns [`ErrorKind::InvalidInput`] if `count` exceeds the buffer
265 /// capacity. Returns [`ErrorKind::InvalidData`] if the wrapped writer
266 /// reports more bytes than the pending buffer range contained.
267 pub fn ensure_spare_capacity(&mut self, count: usize) -> Result<()> {
268 if count > self.buffer.capacity() {
269 return Err(Error::new(
270 ErrorKind::InvalidInput,
271 "requested spare capacity exceeds buffered output capacity",
272 ));
273 }
274 if self.spare_capacity() < count {
275 self.flush_buffer()?;
276 }
277 Ok(())
278 }
279
280 /// Writes all bytes through the internal buffer.
281 ///
282 /// Small inputs are appended to the internal buffer. Inputs that do not
283 /// fit may flush the buffer first, and inputs at least as large as the
284 /// buffer may be written directly to the wrapped writer.
285 ///
286 /// # Parameters
287 ///
288 /// * `input` - The bytes to write.
289 ///
290 /// # Returns
291 ///
292 /// `Ok(())` after all bytes from `input` have been accepted.
293 ///
294 /// # Errors
295 ///
296 /// Returns any I/O error produced while flushing pending bytes or writing a
297 /// large input directly to the wrapped writer. Flush failures include
298 /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
299 /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
300 /// it reports more bytes than the requested range contained.
301 #[inline]
302 fn write_all_buffered(&mut self, input: &[u8]) -> Result<()> {
303 if input.len() < self.spare_capacity() {
304 // SAFETY: The branch proves that the input fits in spare capacity.
305 unsafe {
306 self.write_to_buffer_unchecked(input, 0, input.len());
307 }
308 Ok(())
309 } else {
310 self.write_all_cold(input)
311 }
312 }
313
314 /// Handles slow-path raw writes that must flush or bypass the buffer.
315 ///
316 /// # Parameters
317 ///
318 /// * `input` - The bytes to write after the fast path determined that they
319 /// do not fit comfortably in the current spare buffer capacity.
320 ///
321 /// # Returns
322 ///
323 /// `Ok(())` after all bytes from `input` have been accepted either by the
324 /// buffer or by the wrapped writer.
325 ///
326 /// # Errors
327 ///
328 /// Returns any I/O error produced while flushing pending bytes or writing a
329 /// large input directly to the wrapped writer. Flush failures include
330 /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
331 /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
332 /// it reports more bytes than the requested range contained.
333 #[cold]
334 #[inline(never)]
335 fn write_all_cold(&mut self, input: &[u8]) -> Result<()> {
336 if input.len() > self.spare_capacity() {
337 self.flush_buffer()?;
338 }
339 if input.len() >= self.buffer.capacity() {
340 // SAFETY: The range covers the full source slice.
341 unsafe { self.write_all_inner_unchecked(input, 0, input.len()) }
342 } else {
343 // SAFETY: After the optional flush, any input smaller than the
344 // buffer capacity fits in the empty or sufficiently spare buffer.
345 unsafe {
346 self.write_to_buffer_unchecked(input, 0, input.len());
347 }
348 Ok(())
349 }
350 }
351
352 /// Handles slow-path raw writes for [`Write::write`] semantics.
353 ///
354 /// The method preserves `Write::write` behavior: it may accept fewer bytes
355 /// than the input length when the write is delegated directly to the
356 /// wrapped writer.
357 ///
358 /// # Parameters
359 ///
360 /// * `input` - The bytes to write after the fast path determined that they
361 /// do not fit comfortably in the current spare buffer capacity.
362 ///
363 /// # Returns
364 ///
365 /// The number of bytes accepted. Buffered writes return `input.len()`;
366 /// direct writes return the byte count reported by the wrapped writer.
367 ///
368 /// # Errors
369 ///
370 /// Returns any I/O error produced while flushing pending bytes or writing a
371 /// large input directly to the wrapped writer. Flush failures include
372 /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
373 /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
374 /// it reports more bytes than the requested range contained.
375 #[cold]
376 #[inline(never)]
377 fn write_cold(&mut self, input: &[u8]) -> Result<usize> {
378 if input.len() > self.spare_capacity() {
379 self.flush_buffer()?;
380 }
381 if input.len() >= self.buffer.capacity() {
382 // SAFETY: The range covers the full source slice.
383 unsafe { self.write_inner_unchecked(input, 0, input.len()) }
384 } else {
385 // SAFETY: After the optional flush, any input smaller than the
386 // buffer capacity fits in the empty or sufficiently spare buffer.
387 unsafe {
388 self.write_to_buffer_unchecked(input, 0, input.len());
389 }
390 Ok(input.len())
391 }
392 }
393
394 /// Flushes buffered bytes to the wrapped writer.
395 ///
396 /// The method retries interrupted writes. If an error occurs after some
397 /// bytes have been written, the already-written bytes are removed from the
398 /// front of the buffer and the unwritten suffix is kept for a later retry.
399 ///
400 /// # Returns
401 ///
402 /// `Ok(())` once all currently buffered bytes have been written to the
403 /// wrapped writer.
404 ///
405 /// # Errors
406 ///
407 /// Returns any non-interrupted I/O error produced by the wrapped writer.
408 /// Returns [`ErrorKind::WriteZero`] if the writer reports a zero-length
409 /// write before all buffered bytes are drained. Returns
410 /// [`ErrorKind::InvalidData`] if the writer reports more bytes than the
411 /// pending buffer range contained.
412 pub fn flush_buffer(&mut self) -> Result<()> {
413 while !self.buffer.is_empty() {
414 let position = self.buffer.position();
415 let available = self.buffer.available();
416 // SAFETY: `position..position + available` is the current readable
417 // range maintained by `Buffer`.
418 match unsafe {
419 self.inner.write_unchecked(
420 self.buffer.data(),
421 position,
422 available,
423 )
424 } {
425 Ok(0) => {
426 self.buffer.compact();
427 return Err(Error::new(
428 ErrorKind::WriteZero,
429 "failed to write buffered data",
430 ));
431 }
432 Ok(written) => {
433 if let Err(error) = validate_write_count(written, available)
434 {
435 self.buffer.compact();
436 return Err(error);
437 }
438 // SAFETY: The validated count is in `0..=available`.
439 unsafe {
440 self.buffer.consume_unchecked(written);
441 }
442 }
443 Err(error) if error.kind() == ErrorKind::Interrupted => {}
444 Err(error) => {
445 self.buffer.compact();
446 return Err(error);
447 }
448 }
449 }
450 self.buffer.clear();
451 Ok(())
452 }
453
454 /// Flushes buffered bytes and then flushes the wrapped writer.
455 ///
456 /// # Returns
457 ///
458 /// `Ok(())` once pending buffered bytes have been written and the wrapped
459 /// writer's own flush operation succeeds.
460 ///
461 /// # Errors
462 ///
463 /// Returns any non-interrupted I/O error produced while flushing buffered
464 /// bytes, [`ErrorKind::WriteZero`] if the wrapped writer cannot make
465 /// progress while draining the buffer, [`ErrorKind::InvalidData`] if the
466 /// writer reports an impossible byte count, or any error returned by
467 /// [`Write::flush`] on the wrapped writer.
468 #[inline(always)]
469 fn flush_all(&mut self) -> Result<()> {
470 self.flush_buffer().and_then(|()| self.inner.flush())
471 }
472
473 /// Writes bytes from the input slice and reports the accepted byte count.
474 ///
475 /// This is the buffered implementation for [`Write::write`]-style callers.
476 /// Small inputs are appended to the buffer and reported as fully accepted;
477 /// large inputs may be delegated to the wrapped writer after pending bytes
478 /// are flushed.
479 ///
480 /// # Parameters
481 ///
482 /// * `input` - The bytes to write.
483 ///
484 /// # Returns
485 ///
486 /// The number of bytes accepted. Buffered writes return `input.len()`;
487 /// direct writes return the byte count reported by the wrapped writer.
488 ///
489 /// # Errors
490 ///
491 /// Returns any I/O error produced while flushing pending bytes or writing a
492 /// large input directly to the wrapped writer. Flush failures include
493 /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
494 /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
495 /// it reports more bytes than the requested range contained.
496 #[inline]
497 fn write_from(&mut self, input: &[u8]) -> Result<usize> {
498 if input.len() < self.spare_capacity() {
499 // SAFETY: The branch proves that the input fits in spare capacity.
500 unsafe {
501 self.write_to_buffer_unchecked(input, 0, input.len());
502 }
503 Ok(input.len())
504 } else {
505 self.write_cold(input)
506 }
507 }
508
509 /// Flushes pending bytes before seeking the wrapped writer.
510 ///
511 /// # Parameters
512 ///
513 /// * `position` - The target seek position passed to the wrapped writer.
514 ///
515 /// # Returns
516 ///
517 /// The new stream position reported by the wrapped writer.
518 ///
519 /// # Errors
520 ///
521 /// Returns any non-interrupted I/O error produced while flushing buffered
522 /// bytes, [`ErrorKind::WriteZero`] if the wrapped writer cannot make
523 /// progress while draining the buffer, [`ErrorKind::InvalidData`] if the
524 /// writer reports an impossible byte count, or any error returned by
525 /// [`Seek::seek`] on the wrapped writer.
526 #[inline(always)]
527 fn flush_then_seek(&mut self, position: SeekFrom) -> Result<u64>
528 where
529 W: Seek,
530 {
531 self.flush_buffer().and_then(|()| self.inner.seek(position))
532 }
533
534 /// Returns pending bytes currently stored in the internal buffer.
535 ///
536 /// # Returns
537 ///
538 /// A slice over bytes accepted by this output but not yet written to the
539 /// wrapped writer.
540 #[inline(always)]
541 fn pending_slice(&self) -> &[u8] {
542 &self.buffer.data()[self.buffer.position()..self.buffer.limit()]
543 }
544
545 /// Writes bytes to the wrapped writer and validates the reported count.
546 ///
547 /// # Parameters
548 ///
549 /// * `input` - Source storage.
550 /// * `input_index` - Start index inside `input`.
551 /// * `count` - Maximum number of bytes to write.
552 ///
553 /// # Returns
554 ///
555 /// The number of bytes accepted by the wrapped writer.
556 ///
557 /// # Errors
558 ///
559 /// Returns the wrapped writer's I/O error, or [`ErrorKind::InvalidData`]
560 /// if it reports a byte count larger than `count`.
561 ///
562 /// # Safety
563 ///
564 /// The caller must guarantee that `input_index..input_index + count` is a
565 /// valid range inside `input` and that the addition does not overflow.
566 #[inline(always)]
567 unsafe fn write_inner_unchecked(
568 &mut self,
569 input: &[u8],
570 input_index: usize,
571 count: usize,
572 ) -> Result<usize> {
573 // SAFETY: The caller guarantees the source range is valid.
574 let written =
575 unsafe { self.inner.write_unchecked(input, input_index, count) }?;
576 validate_write_count(written, count)?;
577 Ok(written)
578 }
579
580 /// Writes all bytes in an indexed source range to the wrapped writer.
581 ///
582 /// # Parameters
583 ///
584 /// * `input` - Source storage.
585 /// * `input_index` - Start index inside `input`.
586 /// * `count` - Number of bytes to write.
587 ///
588 /// # Errors
589 ///
590 /// Returns the wrapped writer's I/O error, [`ErrorKind::WriteZero`] if the
591 /// writer cannot make progress, or [`ErrorKind::InvalidData`] if it
592 /// reports an impossible byte count.
593 ///
594 /// # Safety
595 ///
596 /// The caller must guarantee that `input_index..input_index + count` is a
597 /// valid range inside `input` and that the addition does not overflow.
598 unsafe fn write_all_inner_unchecked(
599 &mut self,
600 input: &[u8],
601 input_index: usize,
602 count: usize,
603 ) -> Result<()> {
604 let mut written = 0;
605 while written < count {
606 let remaining = count - written;
607 // SAFETY: `written < count`, so this suffix remains inside the
608 // caller-validated source range.
609 match unsafe {
610 self.write_inner_unchecked(
611 input,
612 input_index + written,
613 remaining,
614 )
615 } {
616 Ok(0) => {
617 return Err(Error::new(
618 ErrorKind::WriteZero,
619 "failed to write whole buffer",
620 ));
621 }
622 Ok(count) => written += count,
623 Err(error) if error.kind() == ErrorKind::Interrupted => {}
624 Err(error) => return Err(error),
625 }
626 }
627 Ok(())
628 }
629}
630
631impl<W> Write for BufferedByteOutput<W>
632where
633 W: Write,
634{
635 /// Writes bytes through the internal buffer.
636 #[inline(always)]
637 fn write(&mut self, buffer: &[u8]) -> Result<usize> {
638 self.write_from(buffer)
639 }
640
641 /// Writes all bytes through the internal buffer.
642 #[inline(always)]
643 fn write_all(&mut self, buffer: &[u8]) -> Result<()> {
644 self.write_all_buffered(buffer)
645 }
646
647 /// Flushes the internal buffer and then the wrapped writer.
648 #[inline(always)]
649 fn flush(&mut self) -> Result<()> {
650 self.flush_all()
651 }
652}
653
654impl<W> Seek for BufferedByteOutput<W>
655where
656 W: Write + Seek,
657{
658 /// Flushes pending bytes before seeking the wrapped writer.
659 #[inline(always)]
660 fn seek(&mut self, position: SeekFrom) -> Result<u64> {
661 self.flush_then_seek(position)
662 }
663}
664
/// Checks that a writer did not claim to write more than it was given.
///
/// # Parameters
///
/// * `written` - Byte count reported by the wrapped writer.
/// * `requested` - Maximum byte count requested from the wrapped writer.
///
/// # Returns
///
/// `Ok(())` when `written` is at most `requested`.
///
/// # Errors
///
/// Returns [`ErrorKind::InvalidData`], with a message naming both counts,
/// when `written` is greater than `requested`.
#[inline(always)]
fn validate_write_count(written: usize, requested: usize) -> Result<()> {
    if written <= requested {
        Ok(())
    } else {
        Err(Error::new(
            ErrorKind::InvalidData,
            format!(
                "writer reported {written} bytes for a {requested}-byte buffer"
            ),
        ))
    }
}