Skip to main content

qubit_io/util/
streams.rs

// =============================================================================
//    Copyright (c) 2026 Haixing Hu.
//
//    SPDX-License-Identifier: Apache-2.0
//
//    Licensed under the Apache License, Version 2.0.
// =============================================================================
8use std::cmp::Ordering;
9use std::io::{
10    Error,
11    ErrorKind,
12    Read,
13    Result,
14    Write,
15    copy,
16};
17
18use crate::ReadExt;
19
/// Size in bytes (16 KiB) of the stack buffer that the bounded copy helper
/// moves data through, one chunk at a time.
const COPY_BUFFER_SIZE: usize = 1 << 14;

/// Size in bytes (16 KiB) of each of the two stack buffers used when
/// comparing the contents of two streams chunk by chunk.
const COMPARE_BUFFER_SIZE: usize = 1 << 14;

26/// Stream utility namespace.
27///
28/// This type is an uninstantiable namespace for operations involving one or
29/// more [`Read`] or [`Write`] values. The methods do not close or flush the
30/// supplied streams unless the underlying standard-library operation documents
31/// otherwise.
32///
33/// # Examples
34/// ```
35/// use qubit_io::Streams;
36/// use std::io::Cursor;
37///
38/// let mut input = Cursor::new(b"abcdef".to_vec());
39/// let mut output = Vec::new();
40///
41/// let copied = Streams::copy_at_most(&mut input, &mut output, 4)?;
42///
43/// assert_eq!(4, copied);
44/// assert_eq!(b"abcd", output.as_slice());
45/// # Ok::<(), std::io::Error>(())
46/// ```
47pub enum Streams {}
48
49impl Streams {
50    /// Copies all remaining bytes from `reader` to `writer`.
51    ///
52    /// This is a namespace-style wrapper around [`std::io::copy`]. It preserves
53    /// the standard-library behavior, including platform-specific optimized
54    /// copy paths when available.
55    ///
56    /// # Parameters
57    /// - `reader`: Source reader.
58    /// - `writer`: Destination writer.
59    ///
60    /// # Returns
61    /// The number of bytes copied.
62    ///
63    /// # Errors
64    /// Returns the first read or write error reported by the underlying
65    /// streams, using the same error behavior as [`std::io::copy`].
66    #[inline]
67    pub fn copy<R, W>(reader: &mut R, writer: &mut W) -> Result<u64>
68    where
69        R: Read + ?Sized,
70        W: Write + ?Sized,
71    {
72        copy(reader, writer)
73    }
74
75    /// Copies at most `max_bytes` bytes from `reader` to `writer`.
76    ///
77    /// This method stops successfully when either EOF is reached or
78    /// `max_bytes` bytes have been copied. It does not close or flush either
79    /// stream.
80    ///
81    /// # Parameters
82    /// - `reader`: Source reader.
83    /// - `writer`: Destination writer.
84    /// - `max_bytes`: Maximum number of bytes to copy.
85    ///
86    /// # Returns
87    /// The number of bytes copied.
88    ///
89    /// # Errors
90    /// Returns the first non-interrupted read error or write error reported by
91    /// the underlying streams. Interrupted reads are retried.
92    #[inline]
93    pub fn copy_at_most<R, W>(
94        reader: &mut R,
95        writer: &mut W,
96        max_bytes: u64,
97    ) -> Result<u64>
98    where
99        R: Read + ?Sized,
100        W: Write + ?Sized,
101    {
102        let mut reader = reader;
103        let mut writer = writer;
104        copy_at_most_impl(&mut reader, &mut writer, max_bytes)
105    }
106
107    /// Copies the remaining input if its total length is at most `max_bytes`.
108    ///
109    /// This method copies from the current reader position until EOF. If EOF is
110    /// not reached within `max_bytes` bytes, it returns
111    /// [`std::io::ErrorKind::InvalidData`]. Detecting oversized input consumes
112    /// one excess byte from `reader`; that excess byte is not written to
113    /// `writer`.
114    ///
115    /// # Parameters
116    /// - `reader`: Source reader.
117    /// - `writer`: Destination writer.
118    /// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
119    ///
120    /// # Returns
121    /// The number of bytes copied when EOF is reached within the limit.
122    ///
123    /// # Errors
124    /// Returns [`std::io::ErrorKind::InvalidData`] when the remaining input is
125    /// longer than `max_bytes`. Returns the first non-interrupted read error or
126    /// write error reported by the underlying streams. Interrupted reads are
127    /// retried.
128    #[inline]
129    pub fn copy_to_end_limited<R, W>(
130        reader: &mut R,
131        writer: &mut W,
132        max_bytes: u64,
133    ) -> Result<u64>
134    where
135        R: Read + ?Sized,
136        W: Write + ?Sized,
137    {
138        let mut reader = reader;
139        let mut writer = writer;
140        copy_to_end_limited_impl(&mut reader, &mut writer, max_bytes)
141    }
142
143    /// Tests whether two readable streams have equal remaining contents.
144    ///
145    /// The comparison starts at each reader's current position and reads both
146    /// streams in fixed-size chunks. A mismatch stops comparison immediately
147    /// after the differing chunks are read, so each reader may have advanced
148    /// past the first differing byte within that chunk.
149    ///
150    /// # Parameters
151    /// - `left`: First stream.
152    /// - `right`: Second stream.
153    ///
154    /// # Returns
155    /// `true` when both streams produce the same bytes until EOF.
156    ///
157    /// # Errors
158    /// Returns the first read error reported by either stream.
159    #[inline]
160    pub fn content_eq(
161        left: &mut dyn Read,
162        right: &mut dyn Read,
163    ) -> Result<bool> {
164        Ok(Self::compare_content(left, right)? == Ordering::Equal)
165    }
166
167    /// Lexicographically compares the remaining contents of two readable
168    /// streams.
169    ///
170    /// The comparison starts at each reader's current position and reads both
171    /// streams in fixed-size chunks. A mismatch stops comparison immediately
172    /// after the differing chunks are read, so each reader may have advanced
173    /// past the first differing byte within that chunk.
174    ///
175    /// # Parameters
176    /// - `left`: First stream.
177    /// - `right`: Second stream.
178    ///
179    /// # Returns
180    /// The lexicographic ordering of the remaining bytes.
181    ///
182    /// # Errors
183    /// Returns the first read error reported by either stream.
184    pub fn compare_content(
185        left: &mut dyn Read,
186        right: &mut dyn Read,
187    ) -> Result<Ordering> {
188        let mut left_buffer = [0; COMPARE_BUFFER_SIZE];
189        let mut right_buffer = [0; COMPARE_BUFFER_SIZE];
190        loop {
191            let left_count = left.read_exact_or_eof(&mut left_buffer)?;
192            let right_count = right.read_exact_or_eof(&mut right_buffer)?;
193            let n = left_count.min(right_count);
194            for index in 0..n {
195                match left_buffer[index].cmp(&right_buffer[index]) {
196                    Ordering::Equal => {}
197                    ordering => return Ok(ordering),
198                }
199            }
200            match left_count.cmp(&right_count) {
201                Ordering::Equal if left_count == 0 => {
202                    return Ok(Ordering::Equal);
203                }
204                Ordering::Equal => {}
205                ordering => return Ok(ordering),
206            }
207        }
208    }
209}
210
211/// Copies at most `max_bytes` bytes using trait-object I/O endpoints.
212///
213/// # Parameters
214/// - `reader`: Source reader.
215/// - `writer`: Destination writer.
216/// - `max_bytes`: Maximum number of bytes to copy.
217///
218/// # Returns
219/// The number of bytes copied.
220///
221/// # Errors
222/// Returns the first non-interrupted read error or write error reported by the
223/// underlying streams. Interrupted reads are retried.
224fn copy_at_most_impl(
225    reader: &mut dyn Read,
226    writer: &mut dyn Write,
227    max_bytes: u64,
228) -> Result<u64> {
229    let mut buffer = [0; COPY_BUFFER_SIZE];
230    let mut remaining = max_bytes;
231    let mut copied = 0;
232    while remaining > 0 {
233        let requested = remaining.min(COPY_BUFFER_SIZE as u64) as usize;
234        match reader.read(&mut buffer[..requested]) {
235            Ok(0) => break,
236            Ok(count) => {
237                writer.write_all(&buffer[..count])?;
238                let count = count as u64;
239                remaining -= count;
240                copied += count;
241            }
242            Err(error) => {
243                if error.kind() == ErrorKind::Interrupted {
244                    continue;
245                }
246                return Err(error);
247            }
248        }
249    }
250    Ok(copied)
251}
252
253/// Copies the remaining input through trait-object endpoints when it fits.
254///
255/// # Parameters
256/// - `reader`: Source reader.
257/// - `writer`: Destination writer.
258/// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
259///
260/// # Returns
261/// The number of bytes copied when EOF is reached within the limit.
262///
263/// # Errors
264/// Returns [`ErrorKind::InvalidData`] when the remaining input is longer than
265/// `max_bytes`. Returns the first non-interrupted read error or write error
266/// reported by the underlying streams. Interrupted reads are retried.
267fn copy_to_end_limited_impl(
268    reader: &mut dyn Read,
269    writer: &mut dyn Write,
270    max_bytes: u64,
271) -> Result<u64> {
272    let copied = copy_at_most_impl(reader, writer, max_bytes)?;
273    if copied < max_bytes {
274        return Ok(copied);
275    }
276    if has_more_input(reader)? {
277        return Err(Error::new(
278            ErrorKind::InvalidData,
279            format!("input exceeds maximum length of {max_bytes} bytes"),
280        ));
281    }
282    Ok(copied)
283}
284
/// Probes `reader` for at least one more byte of input.
///
/// A single byte is read into a scratch buffer and then discarded, so after a
/// `true` result the reader has advanced past that byte.
///
/// # Parameters
/// - `reader`: Source reader to probe.
///
/// # Returns
/// `false` when the reader reports EOF (a zero-length read), `true` when it
/// yields data.
///
/// # Errors
/// Any read error other than [`ErrorKind::Interrupted`] is returned unchanged;
/// interrupted reads are simply retried.
fn has_more_input(reader: &mut dyn Read) -> Result<bool> {
    let mut probe = [0u8; 1];
    let count = loop {
        match reader.read(&mut probe) {
            Err(error) if error.kind() == ErrorKind::Interrupted => {}
            outcome => break outcome?,
        }
    };
    Ok(count != 0)
}