qubit_io/buffered/buffered_byte_input.rs
1// =============================================================================
2// Copyright (c) 2026 Haixing Hu.
3//
4// SPDX-License-Identifier: Apache-2.0
5//
6// Licensed under the Apache License, Version 2.0.
7// =============================================================================
8
9use std::io::{
10 BufRead,
11 Error,
12 ErrorKind,
13 Read,
14 Result,
15 Seek,
16 SeekFrom,
17};
18
19use crate::Buffer;
20use crate::ReadExt;
21use crate::buffered::DEFAULT_BUFFER_CAPACITY;
22
23/// Buffered byte input over a wrapped reader.
24///
25/// This type owns a wrapped input object and an internal byte buffer. It keeps
26/// unread bytes in `buffer[position..limit]` so callers can inspect or consume
27/// the current byte window before refilling it.
28///
29/// `BufferedByteInput` is deliberately byte-oriented. It performs no binary
30/// decoding, text decoding, or record parsing; higher-level stream adapters can
31/// build those concerns on top of [`Self::unread_slice`],
32/// [`Self::unread_raw_parts`], [`Self::ensure_available`], and
33/// [`Self::read_into_unchecked`]. The type also implements [`BufRead`] for
34/// callers that want the standard buffered-read interface.
35#[derive(Debug)]
36pub struct BufferedByteInput<R> {
37 inner: R,
38 buffer: Buffer<u8>,
39}
40
41impl<R> BufferedByteInput<R> {
42 /// Creates a buffered byte input with the default capacity.
43 ///
44 /// # Arguments
45 ///
46 /// * `inner` - The input object wrapped by this buffer.
47 ///
48 /// # Returns
49 ///
50 /// A new buffered byte input whose internal buffer has at least
51 /// `DEFAULT_BUFFER_CAPACITY` bytes.
52 #[inline(always)]
53 #[must_use]
54 pub fn new(inner: R) -> Self {
55 Self::with_capacity(inner, DEFAULT_BUFFER_CAPACITY)
56 }
57
58 /// Creates a buffered byte input with at least the requested capacity.
59 ///
60 /// The actual capacity is raised to `1` when the requested value is `0`.
61 ///
62 /// # Arguments
63 ///
64 /// * `inner` - The input object wrapped by this buffer.
65 /// * `capacity` - The requested internal buffer capacity, in bytes.
66 ///
67 /// # Returns
68 ///
69 /// A new buffered byte input whose internal buffer capacity is
70 /// `capacity.max(1)`.
71 #[inline]
72 #[must_use]
73 pub fn with_capacity(inner: R, capacity: usize) -> Self {
74 Self {
75 inner,
76 buffer: Buffer::with_capacity(capacity),
77 }
78 }
79
80 /// Returns a shared reference to the wrapped input object.
81 ///
82 /// # Returns
83 ///
84 /// A shared reference to the inner input object.
85 #[inline(always)]
86 pub const fn inner(&self) -> &R {
87 &self.inner
88 }
89
90 /// Returns an exclusive reference to the wrapped input object.
91 ///
92 /// Mutating the wrapped object directly may invalidate assumptions about
93 /// bytes already buffered by this value.
94 ///
95 /// # Returns
96 ///
97 /// An exclusive reference to the wrapped input object.
98 #[inline(always)]
99 pub fn inner_mut(&mut self) -> &mut R {
100 &mut self.inner
101 }
102
103 /// Consumes this buffered input and returns the wrapped input object plus
104 /// unread bytes.
105 ///
106 /// This method performs no I/O. Bytes that have already been read from the
107 /// wrapped input but not consumed by this buffered input are returned as
108 /// the second tuple item.
109 ///
110 /// # Returns
111 ///
112 /// The wrapped input object and a vector containing the unread buffered
113 /// bytes in logical read order.
114 #[inline(always)]
115 #[must_use]
116 pub fn into_parts(self) -> (R, Vec<u8>) {
117 let unread = self.unread_slice().to_vec();
118 (self.inner, unread)
119 }
120
121 /// Returns the internal buffer capacity.
122 ///
123 /// # Returns
124 ///
125 /// The total number of bytes that can be held by the internal buffer.
126 #[inline(always)]
127 #[must_use]
128 pub fn capacity(&self) -> usize {
129 self.buffer.capacity()
130 }
131
132 /// Returns the number of unread bytes currently buffered.
133 ///
134 /// # Returns
135 ///
136 /// The length of `buffer[position..limit]`, in bytes.
137 #[inline(always)]
138 #[must_use]
139 pub fn available(&self) -> usize {
140 self.buffer.available()
141 }
142
143 /// Returns the currently buffered unread bytes.
144 ///
145 /// # Returns
146 ///
147 /// The unread range `buffer[position..limit]`.
148 #[inline(always)]
149 #[must_use]
150 pub fn unread_slice(&self) -> &[u8] {
151 &self.buffer.data()[self.buffer.position()..self.buffer.limit()]
152 }
153
154 /// Returns raw unread-buffer parts for hot-path callers.
155 ///
156 /// The returned slice is the full internal backing storage. `index` is the
157 /// start of the unread byte window, and `count` is the number of unread
158 /// bytes. Callers that need a slice can use `&buffer[index..index +
159 /// count]`; callers that already validated bounds can pass `buffer` and
160 /// `index` directly to indexed unchecked codecs.
161 ///
162 /// # Returns
163 ///
164 /// The backing storage, the unread start index, and the unread byte count.
165 #[inline(always)]
166 #[must_use]
167 pub fn unread_raw_parts(&self) -> (&[u8], usize, usize) {
168 (
169 self.buffer.data(),
170 self.buffer.position(),
171 self.buffer.available(),
172 )
173 }
174
175 /// Advances the unread cursor by `count` bytes.
176 ///
177 /// # Parameters
178 ///
179 /// * `count` - Number of currently unread bytes to consume.
180 ///
181 /// # Panics
182 ///
183 /// Panics when `count` exceeds [`Self::available`].
184 #[inline(always)]
185 pub fn consume(&mut self, count: usize) {
186 assert!(
187 count <= self.available(),
188 "cannot consume beyond buffered input"
189 );
190 // SAFETY: The assertion proves that `count` is within the readable
191 // input window.
192 unsafe {
193 self.buffer.consume_unchecked(count);
194 }
195 }
196
197 /// Advances the unread cursor without checking bounds.
198 ///
199 /// # Parameters
200 ///
201 /// * `count` - Number of currently unread bytes to consume.
202 ///
203 /// # Safety
204 ///
205 /// The caller must guarantee that `count <= self.available()`.
206 #[inline(always)]
207 pub unsafe fn consume_unchecked(&mut self, count: usize) {
208 // SAFETY: The caller guarantees that `count` is within the readable
209 // input window.
210 unsafe {
211 self.buffer.consume_unchecked(count);
212 }
213 }
214
215 /// Returns the unused capacity at the end of the buffer.
216 ///
217 /// # Returns
218 ///
219 /// The number of writable bytes in `buffer[limit..]`.
220 #[inline(always)]
221 fn tail_capacity(&self) -> usize {
222 self.buffer.spare_capacity()
223 }
224
225 /// Invalidates all buffered bytes.
226 ///
227 /// After this call, the buffer is considered empty and subsequent reads
228 /// will refill it from the wrapped reader.
229 #[inline(always)]
230 fn discard_buffer(&mut self) {
231 self.buffer.clear();
232 }
233
234 /// Moves unread bytes to the front of the buffer.
235 ///
236 /// This preserves the unread range while reclaiming tail capacity for
237 /// future reads. If there are no unread bytes, the buffer is discarded.
238 #[inline(always)]
239 fn backshift(&mut self) {
240 self.buffer.compact();
241 }
242}
243
244impl<R> BufferedByteInput<R>
245where
246 R: Read,
247{
248 /// Appends one more chunk from the wrapped reader to the internal buffer.
249 ///
250 /// This method reads into `buffer[limit..]` and advances `limit` by the
251 /// number of bytes read. It retries automatically when the wrapped reader
252 /// returns [`ErrorKind::Interrupted`].
253 ///
254 /// # Returns
255 ///
256 /// `Ok(true)` if at least one byte was appended, or `Ok(false)` if the
257 /// wrapped reader reached EOF.
258 ///
259 /// # Errors
260 ///
261 /// Returns any non-interrupted I/O error produced by the wrapped reader.
262 /// Returns [`ErrorKind::InvalidData`] if the wrapped reader reports more
263 /// bytes than the spare buffer range could hold.
264 fn read_more(&mut self) -> Result<bool> {
265 let count = self.tail_capacity();
266 debug_assert!(count > 0, "buffer has no tail capacity");
267 loop {
268 let limit = self.buffer.limit();
269 // SAFETY: `limit` is always within `buffer`, and `count` is the
270 // remaining capacity from `limit` to the end of `buffer`.
271 match unsafe {
272 self.inner
273 .read_unchecked(self.buffer.data_mut(), limit, count)
274 } {
275 Ok(0) => return Ok(false),
276 Ok(read) => {
277 validate_read_count(read, count)?;
278 // SAFETY: `read_unchecked` returns a count in
279 // `0..=count`, and `count` was the spare capacity.
280 unsafe {
281 self.buffer.advance_unchecked(read);
282 }
283 return Ok(true);
284 }
285 Err(error) if error.kind() == ErrorKind::Interrupted => {
286 continue;
287 }
288 Err(error) => return Err(error),
289 }
290 }
291 }
292
293 /// Refills the internal buffer after preserving unread bytes.
294 ///
295 /// Consumed bytes may be discarded, and unread bytes may be moved to the
296 /// front of the buffer before the wrapped reader is called.
297 ///
298 /// # Returns
299 ///
300 /// `Ok(true)` if at least one byte was appended, or `Ok(false)` at EOF.
301 ///
302 /// # Errors
303 ///
304 /// Returns any non-interrupted I/O error produced by the wrapped reader.
305 pub fn fill_more(&mut self) -> Result<bool> {
306 if self.available() == 0 {
307 self.discard_buffer();
308 } else if self.tail_capacity() == 0 {
309 self.backshift();
310 }
311 self.read_more()
312 }
313
314 /// Refills the buffer until at least `count` unread bytes are available.
315 ///
316 /// This method may discard consumed bytes or move unread bytes to the front
317 /// of the buffer before reading more data. It stops as soon as the unread
318 /// window reaches `count` bytes or the wrapped reader reaches EOF.
319 ///
320 /// # Parameters
321 ///
322 /// * `count` - Minimum number of unread bytes required.
323 ///
324 /// # Returns
325 ///
326 /// `Ok(true)` if at least `count` unread bytes are buffered. `Ok(false)`
327 /// means EOF was reached before the requested byte count became available.
328 ///
329 /// # Errors
330 ///
331 /// Returns [`ErrorKind::InvalidInput`] when `count` exceeds the internal
332 /// buffer capacity. Returns [`ErrorKind::InvalidData`] if the wrapped
333 /// reader reports more bytes than the spare buffer range could hold.
334 /// Returns any non-interrupted I/O error produced by the wrapped reader
335 /// while refilling the buffer.
336 #[inline]
337 pub fn fill_until(&mut self, count: usize) -> Result<bool> {
338 if count > self.capacity() {
339 return Err(Error::new(
340 ErrorKind::InvalidInput,
341 "requested available bytes exceed buffered input capacity",
342 ));
343 }
344 while self.available() < count {
345 let available = self.available();
346 if available == 0 {
347 self.discard_buffer();
348 } else {
349 let missing = count - available;
350 if self.tail_capacity() < missing {
351 self.backshift();
352 }
353 }
354 if !self.read_more()? {
355 return Ok(false);
356 }
357 }
358 Ok(true)
359 }
360
361 /// Ensures that at least `count` unread bytes are available.
362 ///
363 /// Unlike [`Self::fill_until`], this method treats EOF before the requested
364 /// byte count as [`ErrorKind::UnexpectedEof`]. Any partial bytes buffered
365 /// before EOF are consumed so callers observe the same logical position as
366 /// a failed exact read.
367 ///
368 /// # Parameters
369 ///
370 /// * `count` - Minimum number of unread bytes required.
371 ///
372 /// # Errors
373 ///
374 /// Returns [`ErrorKind::UnexpectedEof`] if EOF is reached before `count`
375 /// bytes are available. Returns [`ErrorKind::InvalidInput`] when `count`
376 /// exceeds the internal buffer capacity. Returns [`ErrorKind::InvalidData`]
377 /// if the wrapped reader reports more bytes than the spare buffer range
378 /// could hold. Returns any non-interrupted I/O error produced by the
379 /// wrapped reader while refilling the buffer.
380 #[inline]
381 pub fn ensure_available(&mut self, count: usize) -> Result<()> {
382 if self.fill_until(count)? {
383 return Ok(());
384 }
385 let available = self.available();
386 // SAFETY: `available` is the current readable byte count.
387 unsafe {
388 self.consume_unchecked(available);
389 }
390 Err(Error::new(
391 ErrorKind::UnexpectedEof,
392 "failed to fill whole buffer",
393 ))
394 }
395
396 /// Reads bytes through the internal buffer into an indexed output range.
397 ///
398 /// If the internal buffer is empty and `count` is at least as large as the
399 /// internal buffer capacity, the read is delegated directly to the wrapped
400 /// reader to avoid an unnecessary copy. Otherwise, bytes are served from
401 /// the internal buffer.
402 ///
403 /// # Arguments
404 ///
405 /// * `output` - Destination storage that receives bytes.
406 /// * `output_index` - Start index inside `output`.
407 /// * `count` - Maximum number of bytes to read.
408 ///
409 /// # Returns
410 ///
411 /// The number of bytes written into `output[output_index..output_index +
412 /// count]`. A return value of `0` means that `count` was zero or EOF was
413 /// reached before any bytes were read.
414 ///
415 /// # Errors
416 ///
417 /// Returns any I/O error produced by the wrapped reader. Returns
418 /// [`ErrorKind::InvalidData`] if the wrapped reader reports more bytes
419 /// than the requested destination range could hold. Interrupted reads are
420 /// retried when the method refills the internal buffer through
421 /// `read_more`; direct delegated reads follow the wrapped reader's own
422 /// [`Read::read`] behavior.
423 ///
424 /// # Safety
425 ///
426 /// The caller must guarantee that `output_index..output_index + count` is
427 /// a valid range inside `output` and that the addition does not overflow.
428 #[inline(always)]
429 pub unsafe fn read_into_unchecked(
430 &mut self,
431 output: &mut [u8],
432 output_index: usize,
433 count: usize,
434 ) -> Result<usize> {
435 debug_assert!(
436 output_index
437 .checked_add(count)
438 .is_some_and(|end| end <= output.len()),
439 "unchecked read output range exceeds destination buffer"
440 );
441 if count == 0 {
442 return Ok(0);
443 }
444 if self.available() == 0 {
445 self.discard_buffer();
446 if count >= self.buffer.capacity() {
447 // SAFETY: The caller guarantees that the target range is valid.
448 let read = unsafe {
449 self.inner.read_unchecked(output, output_index, count)
450 }?;
451 validate_read_count(read, count)?;
452 return Ok(read);
453 }
454 if !self.read_more()? {
455 return Ok(0);
456 }
457 }
458 let read_count = count.min(self.available());
459 // SAFETY: `read_count` is bounded by the caller-provided output range
460 // and the available input range.
461 unsafe {
462 self.buffer
463 .copy_to_unchecked(output, output_index, read_count);
464 }
465 Ok(read_count)
466 }
467
468 /// Seeks the wrapped reader and discards buffered bytes after success.
469 ///
470 /// For [`SeekFrom::Current`], the offset is adjusted by the number of
471 /// unread bytes already buffered, so seeking is relative to the logical
472 /// position observed by callers of this buffered input.
473 ///
474 /// # Arguments
475 ///
476 /// * `position` - The target seek position.
477 ///
478 /// # Returns
479 ///
480 /// The new absolute stream position reported by the wrapped reader.
481 ///
482 /// # Errors
483 ///
484 /// Returns [`ErrorKind::InvalidInput`] if a [`SeekFrom::Current`] offset
485 /// cannot be adjusted by the unread buffered byte count. Returns any seek
486 /// error produced by the wrapped reader.
487 fn seek_logical(&mut self, position: SeekFrom) -> Result<u64>
488 where
489 R: Seek,
490 {
491 let position = match position {
492 SeekFrom::Current(offset) => {
493 // `buffer` is a `Vec<u8>`, whose maximum allocation size fits
494 // in `isize`; that always fits in `i64`.
495 let unread = self.available() as i64;
496 let adjusted = offset.checked_sub(unread).ok_or_else(|| {
497 Error::new(
498 ErrorKind::InvalidInput,
499 "current seek offset underflows after buffered adjustment",
500 )
501 })?;
502 self.inner.seek(SeekFrom::Current(adjusted))
503 }
504 other => self.inner.seek(other),
505 }?;
506 self.discard_buffer();
507 Ok(position)
508 }
509}
510
511impl<R> Read for BufferedByteInput<R>
512where
513 R: Read,
514{
515 /// Reads bytes through the internal buffer.
516 ///
517 /// # Arguments
518 ///
519 /// * `output` - Destination slice that receives the bytes read.
520 ///
521 /// # Returns
522 ///
523 /// The number of bytes written to `output`.
524 ///
525 /// # Errors
526 ///
527 /// Returns any I/O error produced by the wrapped reader.
528 #[inline(always)]
529 fn read(&mut self, output: &mut [u8]) -> Result<usize> {
530 // SAFETY: The full output slice is a valid writable range.
531 unsafe { self.read_into_unchecked(output, 0, output.len()) }
532 }
533}
534
535impl<R> BufRead for BufferedByteInput<R>
536where
537 R: Read,
538{
539 /// Returns the currently buffered unread bytes, refilling when empty.
540 #[inline]
541 fn fill_buf(&mut self) -> Result<&[u8]> {
542 if self.available() == 0 {
543 self.discard_buffer();
544 if !self.read_more()? {
545 return Ok(&[]);
546 }
547 }
548 Ok(self.unread_slice())
549 }
550
551 /// Consumes `amount` bytes from the unread byte window.
552 #[inline(always)]
553 fn consume(&mut self, amount: usize) {
554 BufferedByteInput::consume(self, amount);
555 }
556}
557
558impl<R> Seek for BufferedByteInput<R>
559where
560 R: Read + Seek,
561{
562 /// Seeks the wrapped reader and discards buffered bytes after success.
563 #[inline(always)]
564 fn seek(&mut self, position: SeekFrom) -> Result<u64> {
565 self.seek_logical(position)
566 }
567}
568
/// Checks that a wrapped reader did not claim more bytes than requested.
///
/// # Arguments
///
/// * `read` - Byte count reported by the wrapped reader.
/// * `requested` - Maximum byte count requested from the wrapped reader.
///
/// # Errors
///
/// Yields [`ErrorKind::InvalidData`] when `read` is greater than `requested`,
/// i.e. when the reader reports more bytes than the destination range could
/// hold. Equal counts are accepted.
#[inline(always)]
fn validate_read_count(read: usize, requested: usize) -> Result<()> {
    if read <= requested {
        Ok(())
    } else {
        let message =
            format!("reader reported {read} bytes for a {requested}-byte buffer");
        Err(Error::new(ErrorKind::InvalidData, message))
    }
}