qubit_io/buffered/buffered_input.rs
1// =============================================================================
2// Copyright (c) 2026 Haixing Hu.
3//
4// SPDX-License-Identifier: Apache-2.0
5//
6// Licensed under the Apache License, Version 2.0.
7// =============================================================================
8
9use std::io::{
10 BufRead,
11 Error,
12 ErrorKind,
13 Read,
14 Result,
15 Seek,
16 SeekFrom,
17};
18
19use crate::buffered::DEFAULT_BUFFER_CAPACITY;
20use crate::{
21 Buffer,
22 Input,
23};
24
25/// Buffered unit input over a wrapped input source.
26///
27/// This type owns a wrapped input object and an internal unit buffer. It keeps
28/// unread units in `buffer[position..limit]` so callers can inspect or consume
29/// the current unit window before refilling it.
30///
31/// `BufferedInput` is deliberately unit-oriented. It performs no binary
32/// decoding, text decoding, or record parsing; higher-level stream adapters can
33/// build those concerns on top of [`Self::unread_slice`],
34/// [`Self::unread_raw_parts`], [`Self::ensure_available`], and
35/// [`Self::read_into_unchecked`]. The type also implements [`BufRead`] for
36/// callers that want the standard buffered-read interface.
37#[derive(Debug)]
38pub struct BufferedInput<I>
39where
40 I: Input,
41 I::Item: Copy + Default,
42{
43 inner: I,
44 buffer: Buffer<I::Item>,
45}
46
47impl<I> BufferedInput<I>
48where
49 I: Input,
50 I::Item: Copy + Default,
51{
52 /// Creates a buffered unit input with the default capacity.
53 ///
54 /// # Arguments
55 ///
56 /// * `inner` - The input object wrapped by this buffer.
57 ///
58 /// # Returns
59 ///
60 /// A new buffered unit input whose internal buffer has at least
61 /// `DEFAULT_BUFFER_CAPACITY` units.
62 #[inline(always)]
63 #[must_use]
64 pub fn new(inner: I) -> Self {
65 Self::with_capacity(inner, DEFAULT_BUFFER_CAPACITY)
66 }
67
68 /// Creates a buffered unit input with at least the requested capacity.
69 ///
70 /// The actual capacity is raised to `1` when the requested value is `0`.
71 ///
72 /// # Arguments
73 ///
74 /// * `inner` - The input object wrapped by this buffer.
75 /// * `capacity` - The requested internal buffer capacity, in units.
76 ///
77 /// # Returns
78 ///
79 /// A new buffered unit input whose internal buffer capacity is
80 /// `capacity.max(1)`.
81 #[inline]
82 #[must_use]
83 pub fn with_capacity(inner: I, capacity: usize) -> Self {
84 Self {
85 inner,
86 buffer: Buffer::with_capacity(capacity),
87 }
88 }
89
90 /// Returns a shared reference to the wrapped input object.
91 ///
92 /// # Returns
93 ///
94 /// A shared reference to the inner input object.
95 #[inline(always)]
96 pub const fn inner(&self) -> &I {
97 &self.inner
98 }
99
100 /// Returns an exclusive reference to the wrapped input object.
101 ///
102 /// Mutating the wrapped object directly may invalidate assumptions about
103 /// units already buffered by this value.
104 ///
105 /// # Returns
106 ///
107 /// An exclusive reference to the wrapped input object.
108 #[inline(always)]
109 pub fn inner_mut(&mut self) -> &mut I {
110 &mut self.inner
111 }
112
113 /// Consumes this buffered input and returns the wrapped input object plus
114 /// unread bytes.
115 ///
116 /// This method performs no I/O. Units that have already been read from the
117 /// wrapped input but not consumed by this buffered input are returned as
118 /// the second tuple item.
119 ///
120 /// # Returns
121 ///
122 /// The wrapped input object and a vector containing the unread buffered
123 /// units in logical read order.
124 #[inline(always)]
125 #[must_use]
126 pub fn into_parts(self) -> (I, Vec<I::Item>) {
127 let unread = self.unread_slice().to_vec();
128 (self.inner, unread)
129 }
130
131 /// Returns the internal buffer capacity.
132 ///
133 /// # Returns
134 ///
135 /// The total number of units that can be held by the internal buffer.
136 #[inline(always)]
137 #[must_use]
138 pub fn capacity(&self) -> usize {
139 self.buffer.capacity()
140 }
141
142 /// Returns the number of unread units currently buffered.
143 ///
144 /// # Returns
145 ///
146 /// The length of `buffer[position..limit]`, in units.
147 #[inline(always)]
148 #[must_use]
149 pub fn available(&self) -> usize {
150 self.buffer.available()
151 }
152
153 /// Returns the currently buffered unread units.
154 ///
155 /// # Returns
156 ///
157 /// The unread range `buffer[position..limit]`.
158 #[inline(always)]
159 #[must_use]
160 pub fn unread_slice(&self) -> &[I::Item] {
161 &self.buffer.data()[self.buffer.position()..self.buffer.limit()]
162 }
163
164 /// Returns raw unread-buffer parts for hot-path callers.
165 ///
166 /// The returned slice is the full internal backing storage. `index` is the
167 /// start of the unread unit window, and `count` is the number of unread
168 /// units. Callers that need a slice can use `&buffer[index..index +
169 /// count]`; callers that already validated bounds can pass `buffer` and
170 /// `index` directly to indexed unchecked codecs.
171 ///
172 /// # Returns
173 ///
174 /// The backing storage, the unread start index, and the unread unit count.
175 #[inline(always)]
176 #[must_use]
177 pub fn unread_raw_parts(&self) -> (&[I::Item], usize, usize) {
178 (
179 self.buffer.data(),
180 self.buffer.position(),
181 self.buffer.available(),
182 )
183 }
184
185 /// Advances the unread cursor by `count` units.
186 ///
187 /// # Parameters
188 ///
189 /// * `count` - Number of currently unread units to consume.
190 ///
191 /// # Panics
192 ///
193 /// Panics when `count` exceeds [`Self::available`].
194 #[inline(always)]
195 pub fn consume(&mut self, count: usize) {
196 assert!(
197 count <= self.available(),
198 "cannot consume beyond buffered input"
199 );
200 // SAFETY: The assertion proves that `count` is within the readable
201 // input window.
202 unsafe {
203 self.buffer.consume_unchecked(count);
204 }
205 }
206
207 /// Advances the unread cursor without checking bounds.
208 ///
209 /// # Parameters
210 ///
211 /// * `count` - Number of currently unread bytes to consume.
212 ///
213 /// # Safety
214 ///
215 /// The caller must guarantee that `count <= self.available()`.
216 #[inline(always)]
217 pub unsafe fn consume_unchecked(&mut self, count: usize) {
218 // SAFETY: The caller guarantees that `count` is within the readable
219 // input window.
220 unsafe {
221 self.buffer.consume_unchecked(count);
222 }
223 }
224
225 /// Returns the unused capacity at the end of the buffer.
226 ///
227 /// # Returns
228 ///
229 /// The number of writable bytes in `buffer[limit..]`.
230 #[inline(always)]
231 fn tail_capacity(&self) -> usize {
232 self.buffer.spare_capacity()
233 }
234
235 /// Invalidates all buffered units.
236 ///
237 /// After this call, the buffer is considered empty and subsequent reads
238 /// will refill it from the wrapped input.
239 #[inline(always)]
240 fn discard_buffer(&mut self) {
241 self.buffer.clear();
242 }
243
244 /// Moves unread units to the front of the buffer.
245 ///
246 /// This preserves the unread range while reclaiming tail capacity for
247 /// future reads. If there are no unread bytes, the buffer is discarded.
248 #[inline(always)]
249 fn backshift(&mut self) {
250 self.buffer.compact();
251 }
252}
253
254impl<I> BufferedInput<I>
255where
256 I: Input,
257 I::Item: Copy + Default,
258{
259 /// Appends one more chunk from the wrapped reader to the internal buffer.
260 ///
261 /// This method reads into `buffer[limit..]` and advances `limit` by the
262 /// number of bytes read. It retries automatically when the wrapped reader
263 /// returns [`ErrorKind::Interrupted`].
264 ///
265 /// # Returns
266 ///
267 /// `Ok(true)` if at least one byte was appended, or `Ok(false)` if the
268 /// wrapped reader reached EOF.
269 ///
270 /// # Errors
271 ///
272 /// Returns any non-interrupted I/O error produced by the wrapped reader.
273 /// Returns [`ErrorKind::InvalidData`] if the wrapped reader reports more
274 /// bytes than the spare buffer range could hold.
275 fn read_more(&mut self) -> Result<bool> {
276 let count = self.tail_capacity();
277 debug_assert!(count > 0, "buffer has no tail capacity");
278 loop {
279 let limit = self.buffer.limit();
280 // SAFETY: `limit` is always within `buffer`, and `count` is the
281 // remaining capacity from `limit` to the end of `buffer`.
282 match unsafe {
283 self.inner
284 .read_unchecked(self.buffer.data_mut(), limit, count)
285 } {
286 Ok(0) => return Ok(false),
287 Ok(read) => {
288 validate_read_count(read, count)?;
289 // SAFETY: `read_unchecked` returns a count in
290 // `0..=count`, and `count` was the spare capacity.
291 unsafe {
292 self.buffer.advance_unchecked(read);
293 }
294 return Ok(true);
295 }
296 Err(error) if error.kind() == ErrorKind::Interrupted => {
297 continue;
298 }
299 Err(error) => return Err(error),
300 }
301 }
302 }
303
304 /// Refills the internal buffer after preserving unread bytes.
305 ///
306 /// Consumed bytes may be discarded, and unread bytes may be moved to the
307 /// front of the buffer before the wrapped reader is called.
308 ///
309 /// # Returns
310 ///
311 /// `Ok(true)` if at least one byte was appended, or `Ok(false)` at EOF.
312 ///
313 /// # Errors
314 ///
315 /// Returns any non-interrupted I/O error produced by the wrapped reader.
316 pub fn fill_more(&mut self) -> Result<bool> {
317 if self.available() == 0 {
318 self.discard_buffer();
319 } else if self.tail_capacity() == 0 {
320 self.backshift();
321 }
322 self.read_more()
323 }
324
325 /// Refills the buffer until at least `count` unread bytes are available.
326 ///
327 /// This method may discard consumed bytes or move unread bytes to the front
328 /// of the buffer before reading more data. It stops as soon as the unread
329 /// window reaches `count` bytes or the wrapped reader reaches EOF.
330 ///
331 /// # Parameters
332 ///
333 /// * `count` - Minimum number of unread bytes required.
334 ///
335 /// # Returns
336 ///
337 /// `Ok(true)` if at least `count` unread bytes are buffered. `Ok(false)`
338 /// means EOF was reached before the requested byte count became available.
339 ///
340 /// # Errors
341 ///
342 /// Returns [`ErrorKind::InvalidInput`] when `count` exceeds the internal
343 /// buffer capacity. Returns [`ErrorKind::InvalidData`] if the wrapped
344 /// reader reports more bytes than the spare buffer range could hold.
345 /// Returns any non-interrupted I/O error produced by the wrapped reader
346 /// while refilling the buffer.
347 #[inline]
348 pub fn fill_until(&mut self, count: usize) -> Result<bool> {
349 if count > self.capacity() {
350 return Err(Error::new(
351 ErrorKind::InvalidInput,
352 "requested available bytes exceed buffered input capacity",
353 ));
354 }
355 while self.available() < count {
356 let available = self.available();
357 if available == 0 {
358 self.discard_buffer();
359 } else {
360 let missing = count - available;
361 if self.tail_capacity() < missing {
362 self.backshift();
363 }
364 }
365 if !self.read_more()? {
366 return Ok(false);
367 }
368 }
369 Ok(true)
370 }
371
372 /// Ensures that at least `count` unread bytes are available.
373 ///
374 /// Unlike [`Self::fill_until`], this method treats EOF before the requested
375 /// byte count as [`ErrorKind::UnexpectedEof`]. Any partial bytes buffered
376 /// before EOF are consumed so callers observe the same logical position as
377 /// a failed exact read.
378 ///
379 /// # Parameters
380 ///
381 /// * `count` - Minimum number of unread bytes required.
382 ///
383 /// # Errors
384 ///
385 /// Returns [`ErrorKind::UnexpectedEof`] if EOF is reached before `count`
386 /// bytes are available. Returns [`ErrorKind::InvalidInput`] when `count`
387 /// exceeds the internal buffer capacity. Returns [`ErrorKind::InvalidData`]
388 /// if the wrapped reader reports more bytes than the spare buffer range
389 /// could hold. Returns any non-interrupted I/O error produced by the
390 /// wrapped reader while refilling the buffer.
391 #[inline]
392 pub fn ensure_available(&mut self, count: usize) -> Result<()> {
393 if self.fill_until(count)? {
394 return Ok(());
395 }
396 let available = self.available();
397 // SAFETY: `available` is the current readable byte count.
398 unsafe {
399 self.consume_unchecked(available);
400 }
401 Err(Error::new(
402 ErrorKind::UnexpectedEof,
403 "failed to fill whole buffer",
404 ))
405 }
406
407 /// Reads bytes through the internal buffer into an indexed output range.
408 ///
409 /// If the internal buffer is empty and `count` is at least as large as the
410 /// internal buffer capacity, the read is delegated directly to the wrapped
411 /// reader to avoid an unnecessary copy. Otherwise, bytes are served from
412 /// the internal buffer.
413 ///
414 /// # Arguments
415 ///
416 /// * `output` - Destination storage that receives bytes.
417 /// * `output_index` - Start index inside `output`.
418 /// * `count` - Maximum number of bytes to read.
419 ///
420 /// # Returns
421 ///
422 /// The number of bytes written into `output[output_index..output_index +
423 /// count]`. A return value of `0` means that `count` was zero or EOF was
424 /// reached before any bytes were read.
425 ///
426 /// # Errors
427 ///
428 /// Returns any I/O error produced by the wrapped reader. Returns
429 /// [`ErrorKind::InvalidData`] if the wrapped reader reports more bytes
430 /// than the requested destination range could hold. Interrupted reads are
431 /// retried when the method refills the internal buffer through
432 /// `read_more`; direct delegated reads follow the wrapped reader's own
433 /// [`Read::read`] behavior.
434 ///
435 /// # Safety
436 ///
437 /// The caller must guarantee that `output_index..output_index + count` is
438 /// a valid range inside `output` and that the addition does not overflow.
439 #[inline(always)]
440 pub unsafe fn read_into_unchecked(
441 &mut self,
442 output: &mut [I::Item],
443 output_index: usize,
444 count: usize,
445 ) -> Result<usize> {
446 debug_assert!(
447 output_index
448 .checked_add(count)
449 .is_some_and(|end| end <= output.len()),
450 "unchecked read output range exceeds destination buffer"
451 );
452 if count == 0 {
453 return Ok(0);
454 }
455 if self.available() == 0 {
456 self.discard_buffer();
457 if count >= self.buffer.capacity() {
458 // SAFETY: The caller guarantees that the target range is valid.
459 let read = unsafe {
460 self.inner.read_unchecked(output, output_index, count)
461 }?;
462 validate_read_count(read, count)?;
463 return Ok(read);
464 }
465 if !self.read_more()? {
466 return Ok(0);
467 }
468 }
469 let read_count = count.min(self.available());
470 // SAFETY: `read_count` is bounded by the caller-provided output range
471 // and the available input range.
472 unsafe {
473 self.buffer
474 .copy_to_unchecked(output, output_index, read_count);
475 }
476 Ok(read_count)
477 }
478}
479
480impl<I> BufferedInput<I>
481where
482 I: Input<Item = u8>,
483{
484 /// Seeks the wrapped reader and discards buffered bytes after success.
485 ///
486 /// For [`SeekFrom::Current`], the offset is adjusted by the number of
487 /// unread bytes already buffered, so seeking is relative to the logical
488 /// position observed by callers of this buffered input.
489 ///
490 /// # Arguments
491 ///
492 /// * `position` - The target seek position.
493 ///
494 /// # Returns
495 ///
496 /// The new absolute stream position reported by the wrapped reader.
497 ///
498 /// # Errors
499 ///
500 /// Returns [`ErrorKind::InvalidInput`] if a [`SeekFrom::Current`] offset
501 /// cannot be adjusted by the unread buffered byte count. Returns any seek
502 /// error produced by the wrapped reader.
503 fn seek_logical(&mut self, position: SeekFrom) -> Result<u64>
504 where
505 I: Seek,
506 {
507 let position = match position {
508 SeekFrom::Current(offset) => {
509 // `buffer` is a `Vec<u8>`, whose maximum allocation size fits
510 // in `isize`; that always fits in `i64`.
511 let unread = self.available() as i64;
512 let adjusted = offset.checked_sub(unread).ok_or_else(|| {
513 Error::new(
514 ErrorKind::InvalidInput,
515 "current seek offset underflows after buffered adjustment",
516 )
517 })?;
518 self.inner.seek(SeekFrom::Current(adjusted))
519 }
520 other => self.inner.seek(other),
521 }?;
522 self.discard_buffer();
523 Ok(position)
524 }
525}
526
527impl<I> Read for BufferedInput<I>
528where
529 I: Input<Item = u8>,
530{
531 /// Reads bytes through the internal buffer.
532 ///
533 /// # Arguments
534 ///
535 /// * `output` - Destination slice that receives the bytes read.
536 ///
537 /// # Returns
538 ///
539 /// The number of bytes written to `output`.
540 ///
541 /// # Errors
542 ///
543 /// Returns any I/O error produced by the wrapped reader.
544 #[inline(always)]
545 fn read(&mut self, output: &mut [u8]) -> Result<usize> {
546 // SAFETY: The full output slice is a valid writable range.
547 unsafe { self.read_into_unchecked(output, 0, output.len()) }
548 }
549}
550
551impl<I> BufRead for BufferedInput<I>
552where
553 I: Input<Item = u8>,
554{
555 /// Returns the currently buffered unread bytes, refilling when empty.
556 #[inline]
557 fn fill_buf(&mut self) -> Result<&[u8]> {
558 if self.available() == 0 {
559 self.discard_buffer();
560 if !self.read_more()? {
561 return Ok(&[]);
562 }
563 }
564 Ok(self.unread_slice())
565 }
566
567 /// Consumes `amount` bytes from the unread byte window.
568 #[inline(always)]
569 fn consume(&mut self, amount: usize) {
570 BufferedInput::consume(self, amount);
571 }
572}
573
574impl<I> Seek for BufferedInput<I>
575where
576 I: Input<Item = u8> + Seek,
577{
578 /// Seeks the wrapped reader and discards buffered bytes after success.
579 #[inline(always)]
580 fn seek(&mut self, position: SeekFrom) -> Result<u64> {
581 self.seek_logical(position)
582 }
583}
584
/// Checks that a wrapped reader did not claim to read more than was asked.
///
/// # Parameters
///
/// * `read` - Byte count reported by the wrapped reader.
/// * `requested` - Maximum byte count requested from the wrapped reader.
///
/// # Errors
///
/// Returns [`ErrorKind::InvalidData`] when `read` is greater than
/// `requested`, meaning the reader reported more bytes than the destination
/// range could hold.
#[inline(always)]
fn validate_read_count(read: usize, requested: usize) -> Result<()> {
    if read <= requested {
        Ok(())
    } else {
        let message = format!(
            "reader reported {read} bytes for a {requested}-byte buffer"
        );
        Err(Error::new(ErrorKind::InvalidData, message))
    }
}