qubit_io/util/streams.rs
1// =============================================================================
2// Copyright (c) 2026 Haixing Hu.
3//
4// SPDX-License-Identifier: Apache-2.0
5//
6// Licensed under the Apache License, Version 2.0.
7// =============================================================================
8use std::cmp::Ordering;
9use std::io::{
10 Error,
11 ErrorKind,
12 Read,
13 Result,
14 Write,
15 copy,
16};
17
18use crate::ReadExt;
19
/// Size in bytes (16 KiB) of the local buffer used by stream copy operations.
///
/// A bounded copy never requests more than this many bytes from the reader in
/// a single read call.
const COPY_BUFFER_SIZE: usize = 16 * 1024;

/// Size in bytes (16 KiB) of each buffer used by stream comparison operations.
///
/// Comparison keeps one buffer of this size per stream and compares the two
/// streams one chunk at a time.
const COMPARE_BUFFER_SIZE: usize = 16 * 1024;
25
/// Stream utility namespace.
///
/// `Streams` is an enum without variants, so no value of it can ever be
/// constructed; it exists only to group associated functions that operate on
/// one or more [`Read`] or [`Write`] values. The methods do not close or flush
/// the supplied streams unless the underlying standard-library operation
/// documents otherwise.
///
/// # Examples
/// ```
/// use qubit_io::Streams;
/// use std::io::Cursor;
///
/// let mut input = Cursor::new(b"abcdef".to_vec());
/// let mut output = Vec::new();
///
/// let copied = Streams::copy_at_most(&mut input, &mut output, 4)?;
///
/// assert_eq!(4, copied);
/// assert_eq!(b"abcd", output.as_slice());
/// # Ok::<(), std::io::Error>(())
/// ```
pub enum Streams {}
48
49impl Streams {
50 /// Copies all remaining bytes from `reader` to `writer`.
51 ///
52 /// This is a namespace-style wrapper around [`std::io::copy`]. It preserves
53 /// the standard-library behavior, including platform-specific optimized
54 /// copy paths when available.
55 ///
56 /// # Parameters
57 /// - `reader`: Source reader.
58 /// - `writer`: Destination writer.
59 ///
60 /// # Returns
61 /// The number of bytes copied.
62 ///
63 /// # Errors
64 /// Returns the first read or write error reported by the underlying
65 /// streams, using the same error behavior as [`std::io::copy`].
66 #[inline]
67 pub fn copy<R, W>(reader: &mut R, writer: &mut W) -> Result<u64>
68 where
69 R: Read + ?Sized,
70 W: Write + ?Sized,
71 {
72 copy(reader, writer)
73 }
74
75 /// Copies at most `max_bytes` bytes from `reader` to `writer`.
76 ///
77 /// This method stops successfully when either EOF is reached or
78 /// `max_bytes` bytes have been copied. It does not close or flush either
79 /// stream.
80 ///
81 /// # Parameters
82 /// - `reader`: Source reader.
83 /// - `writer`: Destination writer.
84 /// - `max_bytes`: Maximum number of bytes to copy.
85 ///
86 /// # Returns
87 /// The number of bytes copied.
88 ///
89 /// # Errors
90 /// Returns the first non-interrupted read error or write error reported by
91 /// the underlying streams. Interrupted reads are retried.
92 #[inline]
93 pub fn copy_at_most<R, W>(
94 reader: &mut R,
95 writer: &mut W,
96 max_bytes: u64,
97 ) -> Result<u64>
98 where
99 R: Read + ?Sized,
100 W: Write + ?Sized,
101 {
102 let mut reader = reader;
103 let mut writer = writer;
104 copy_at_most_impl(&mut reader, &mut writer, max_bytes)
105 }
106
107 /// Copies the remaining input if its total length is at most `max_bytes`.
108 ///
109 /// This method copies from the current reader position until EOF. If EOF is
110 /// not reached within `max_bytes` bytes, it returns
111 /// [`std::io::ErrorKind::InvalidData`]. Detecting oversized input consumes
112 /// one excess byte from `reader`; that excess byte is not written to
113 /// `writer`.
114 ///
115 /// # Parameters
116 /// - `reader`: Source reader.
117 /// - `writer`: Destination writer.
118 /// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
119 ///
120 /// # Returns
121 /// The number of bytes copied when EOF is reached within the limit.
122 ///
123 /// # Errors
124 /// Returns [`std::io::ErrorKind::InvalidData`] when the remaining input is
125 /// longer than `max_bytes`. Returns the first non-interrupted read error or
126 /// write error reported by the underlying streams. Interrupted reads are
127 /// retried.
128 #[inline]
129 pub fn copy_to_end_limited<R, W>(
130 reader: &mut R,
131 writer: &mut W,
132 max_bytes: u64,
133 ) -> Result<u64>
134 where
135 R: Read + ?Sized,
136 W: Write + ?Sized,
137 {
138 let mut reader = reader;
139 let mut writer = writer;
140 copy_to_end_limited_impl(&mut reader, &mut writer, max_bytes)
141 }
142
143 /// Tests whether two readable streams have equal remaining contents.
144 ///
145 /// The comparison starts at each reader's current position and reads both
146 /// streams in fixed-size chunks. A mismatch stops comparison immediately
147 /// after the differing chunks are read, so each reader may have advanced
148 /// past the first differing byte within that chunk.
149 ///
150 /// # Parameters
151 /// - `left`: First stream.
152 /// - `right`: Second stream.
153 ///
154 /// # Returns
155 /// `true` when both streams produce the same bytes until EOF.
156 ///
157 /// # Errors
158 /// Returns the first read error reported by either stream.
159 #[inline]
160 pub fn content_eq(
161 left: &mut dyn Read,
162 right: &mut dyn Read,
163 ) -> Result<bool> {
164 Ok(Self::compare_content(left, right)? == Ordering::Equal)
165 }
166
167 /// Lexicographically compares the remaining contents of two readable
168 /// streams.
169 ///
170 /// The comparison starts at each reader's current position and reads both
171 /// streams in fixed-size chunks. A mismatch stops comparison immediately
172 /// after the differing chunks are read, so each reader may have advanced
173 /// past the first differing byte within that chunk.
174 ///
175 /// # Parameters
176 /// - `left`: First stream.
177 /// - `right`: Second stream.
178 ///
179 /// # Returns
180 /// The lexicographic ordering of the remaining bytes.
181 ///
182 /// # Errors
183 /// Returns the first read error reported by either stream.
184 pub fn compare_content(
185 left: &mut dyn Read,
186 right: &mut dyn Read,
187 ) -> Result<Ordering> {
188 let mut left_buffer = [0; COMPARE_BUFFER_SIZE];
189 let mut right_buffer = [0; COMPARE_BUFFER_SIZE];
190 loop {
191 let left_count = left.read_exact_or_eof(&mut left_buffer)?;
192 let right_count = right.read_exact_or_eof(&mut right_buffer)?;
193 let n = left_count.min(right_count);
194 for index in 0..n {
195 match left_buffer[index].cmp(&right_buffer[index]) {
196 Ordering::Equal => {}
197 ordering => return Ok(ordering),
198 }
199 }
200 match left_count.cmp(&right_count) {
201 Ordering::Equal if left_count == 0 => {
202 return Ok(Ordering::Equal);
203 }
204 Ordering::Equal => {}
205 ordering => return Ok(ordering),
206 }
207 }
208 }
209}
210
211/// Copies at most `max_bytes` bytes using trait-object I/O endpoints.
212///
213/// # Parameters
214/// - `reader`: Source reader.
215/// - `writer`: Destination writer.
216/// - `max_bytes`: Maximum number of bytes to copy.
217///
218/// # Returns
219/// The number of bytes copied.
220///
221/// # Errors
222/// Returns the first non-interrupted read error or write error reported by the
223/// underlying streams. Interrupted reads are retried.
224fn copy_at_most_impl(
225 reader: &mut dyn Read,
226 writer: &mut dyn Write,
227 max_bytes: u64,
228) -> Result<u64> {
229 let mut buffer = [0; COPY_BUFFER_SIZE];
230 let mut remaining = max_bytes;
231 let mut copied = 0;
232 while remaining > 0 {
233 let requested = remaining.min(COPY_BUFFER_SIZE as u64) as usize;
234 match reader.read(&mut buffer[..requested]) {
235 Ok(0) => break,
236 Ok(count) => {
237 writer.write_all(&buffer[..count])?;
238 let count = count as u64;
239 remaining -= count;
240 copied += count;
241 }
242 Err(error) => {
243 if error.kind() == ErrorKind::Interrupted {
244 continue;
245 }
246 return Err(error);
247 }
248 }
249 }
250 Ok(copied)
251}
252
253/// Copies the remaining input through trait-object endpoints when it fits.
254///
255/// # Parameters
256/// - `reader`: Source reader.
257/// - `writer`: Destination writer.
258/// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
259///
260/// # Returns
261/// The number of bytes copied when EOF is reached within the limit.
262///
263/// # Errors
264/// Returns [`ErrorKind::InvalidData`] when the remaining input is longer than
265/// `max_bytes`. Returns the first non-interrupted read error or write error
266/// reported by the underlying streams. Interrupted reads are retried.
267fn copy_to_end_limited_impl(
268 reader: &mut dyn Read,
269 writer: &mut dyn Write,
270 max_bytes: u64,
271) -> Result<u64> {
272 let copied = copy_at_most_impl(reader, writer, max_bytes)?;
273 if copied < max_bytes {
274 return Ok(copied);
275 }
276 if has_more_input(reader)? {
277 return Err(Error::new(
278 ErrorKind::InvalidData,
279 format!("input exceeds maximum length of {max_bytes} bytes"),
280 ));
281 }
282 Ok(copied)
283}
284
/// Probes `reader` for at least one more byte.
///
/// The probe is a one-byte read, so a successful probe consumes that byte
/// from the reader.
///
/// # Parameters
/// - `reader`: Stream to probe.
///
/// # Returns
/// `true` when the read produced a byte, `false` when it reported EOF.
///
/// # Errors
/// Returns the first read error whose kind is not
/// [`ErrorKind::Interrupted`]; interrupted reads are retried.
fn has_more_input(reader: &mut dyn Read) -> Result<bool> {
    let mut probe = [0u8; 1];
    loop {
        match reader.read(&mut probe) {
            // Zero bytes means EOF; anything else means data was available.
            Ok(received) => return Ok(received != 0),
            // Transient interruption: fall through and read again.
            Err(error) if error.kind() == ErrorKind::Interrupted => {}
            Err(error) => return Err(error),
        }
    }
}