Skip to main content

fgumi_lib/
prefetch_reader.rs

1#![deny(unsafe_code)]
2//! Async userspace read-ahead adapter.
3//!
4//! [`PrefetchReader`] is a drop-in replacement for [`std::io::BufReader<File>`]
5//! that performs asynchronous read-ahead on a dedicated OS thread. It exists to
6//! decouple the blocking `read()` wait on a file descriptor from the pipeline
7//! worker threads that consume the bytes.
8//!
9//! ## Motivation
10//!
11//! On Linux, the kernel's per-device read-ahead window (`read_ahead_kb`) is
12//! 128 KB by default. A plain `BufReader<File>` is synchronous: when its
13//! internal buffer drains, the next refill blocks in the kernel until pages
14//! arrive from disk. During that stall the calling thread is parked and
15//! cannot make progress on other work. In the fgumi unified pipeline, the
16//! reader thread is also a pipeline worker, so a blocked read translates
17//! directly into lost downstream throughput.
18//!
19//! `PrefetchReader` moves the blocking `read()` onto a dedicated producer
20//! thread that pushes fixed-size chunks through a bounded
21//! [`crossbeam_channel`]. Consumers see a normal [`std::io::Read`] interface;
22//! internally, [`Read::read`] serves bytes out of the currently-held chunk and
23//! only blocks when the producer has not yet delivered the next one. The
24//! upshot is that stalls on the disk become independent of stalls in the
25//! pipeline: the producer overlaps its disk wait with the consumer's CPU
26//! work.
27//!
28//! This is essentially what the kernel's block-layer read-ahead does, but in
29//! userspace — so it works without root, on any OS, and without having to
30//! tune `/sys/block/*/queue/read_ahead_kb`.
31//!
32//! ## Lifecycle
33//!
34//! Constructing a `PrefetchReader` spawns exactly one OS thread, named
35//! `fgumi-prefetch`. The thread owns the inner reader and exits when any of
36//! the following happen:
37//!
38//! - The inner reader signals EOF (`Ok(0)` from `read`).
39//! - The inner reader returns an error (the error is sent through the
40//!   channel, then the thread exits).
41//! - The [`PrefetchReader`] is dropped (the consumer-side receiver is
42//!   destroyed, the producer's next `send` returns `Disconnected`, and the
43//!   loop exits).
44//!
45//! `Drop` joins the producer thread, so leaks are impossible on well-behaved
46//! inner readers. If the inner reader is currently parked in a long `read`
47//! syscall, `Drop` will wait for it to return.
48
49use std::fs::File;
50use std::io::{self, Read};
51use std::thread::{self, JoinHandle};
52
53use crossbeam_channel::{Receiver, Sender, TryRecvError, bounded};
54
/// Default chunk size used by [`PrefetchReader::new`]: 4 MiB per chunk.
pub const DEFAULT_CHUNK_SIZE: usize = 4 * 1024 * 1024;

/// Default channel depth used by [`PrefetchReader::new`]. The producer will
/// keep up to this many filled chunks buffered ahead of the consumer, so
/// default steady-state buffering is `DEFAULT_CHUNK_SIZE * DEFAULT_PREFETCH_DEPTH`
/// (16 MiB) plus the chunk in flight on each side.
pub const DEFAULT_PREFETCH_DEPTH: usize = 4;

/// How far ahead (in bytes) the producer thread asks the kernel to page in via
/// `posix_fadvise(POSIX_FADV_WILLNEED)` after each chunk fill. Only used when
/// the reader was constructed via [`PrefetchReader::from_file`], which captures
/// the raw fd needed for the syscall. 128 MiB is generous enough to cover
/// ~1 second of EBS gp3 baseline throughput (125 MiB/s), ensuring pages are
/// warm by the time the producer's `read()` reaches them.
///
/// Stored as `i64` to match `posix_fadvise`'s `off_t` signature directly.
const WILLNEED_LOOKAHEAD: i64 = 128 * 1024 * 1024;

/// An item handed from the producer thread to the consumer. Carries either a
/// filled chunk of bytes (possibly shorter than the configured chunk size)
/// or a terminal I/O error; the producer exits after sending an `Err`.
type Item = io::Result<Vec<u8>>;
75
/// A `Read` adapter that performs asynchronous userspace prefetch on a
/// dedicated background thread.
///
/// See the [module docs](self) for the rationale and lifecycle model.
///
/// # Example
///
/// ```
/// use std::io::{Cursor, Read};
/// use fgumi_lib::prefetch_reader::PrefetchReader;
///
/// let data: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect();
/// let mut reader = PrefetchReader::new(Cursor::new(data.clone()));
/// let mut out = Vec::new();
/// reader.read_to_end(&mut out).unwrap();
/// assert_eq!(out, data);
/// ```
#[derive(Debug)]
pub struct PrefetchReader {
    /// The chunk currently being served out to callers of `read`.
    ///
    /// Stored as `(data, position)` with the invariant `position <= data.len()`.
    /// `None` means we need to pull the next chunk from the channel on the
    /// next `read` call.
    current: Option<(Vec<u8>, usize)>,

    /// Receiving half of the prefetch channel. Held in `Option` so we can
    /// drop it explicitly in `Drop::drop` before joining the producer thread;
    /// dropping the receiver is what causes the producer's `send` to fail
    /// with `Disconnected`, which in turn terminates the producer loop.
    /// `None` only after `Drop` has begun tearing the reader down.
    rx: Option<Receiver<Item>>,

    /// Join handle for the producer thread. `None` after join in `Drop`.
    handle: Option<JoinHandle<()>>,

    /// Number of times a consumer `read` call had to block on the channel
    /// because the producer had not yet delivered the next chunk. A high
    /// value relative to total reads suggests the prefetch depth is too
    /// shallow or the consumer is faster than the producer.
    consumer_stalls: u64,

    /// Total bytes handed back to callers of `read` so far.
    bytes_consumed: u64,
}
119
120impl PrefetchReader {
121    /// Construct a `PrefetchReader` from an arbitrary reader with default
122    /// chunk size and prefetch depth.
123    ///
124    /// No `posix_fadvise(WILLNEED)` hints are issued because the inner reader
125    /// may not be backed by a file descriptor. Use [`from_file`](Self::from_file)
126    /// when wrapping a [`File`] to get kernel page-cache warming for free.
127    #[must_use]
128    pub fn new<R: Read + Send + 'static>(inner: R) -> Self {
129        Self::with_config(inner, DEFAULT_CHUNK_SIZE, DEFAULT_PREFETCH_DEPTH)
130    }
131
132    /// Construct a `PrefetchReader` from an arbitrary reader with explicit
133    /// `chunk_size` and `prefetch_depth`.
134    ///
135    /// Steady-state memory usage is bounded by `chunk_size * prefetch_depth`
136    /// (plus one chunk being filled by the producer and one being consumed).
137    ///
138    /// # Panics
139    ///
140    /// Panics if `chunk_size == 0` or `prefetch_depth == 0`.
141    #[must_use]
142    pub fn with_config<R: Read + Send + 'static>(
143        inner: R,
144        chunk_size: usize,
145        prefetch_depth: usize,
146    ) -> Self {
147        Self::build(inner, chunk_size, prefetch_depth, None)
148    }
149
150    /// Construct a `PrefetchReader` from a [`File`] with default chunk size
151    /// and prefetch depth.
152    ///
153    /// On Linux the producer thread will call
154    /// `posix_fadvise(POSIX_FADV_WILLNEED)` after each chunk fill to
155    /// proactively page in the next [`WILLNEED_LOOKAHEAD`] bytes,
156    /// making the reader independent of the kernel's default read-ahead
157    /// window (`read_ahead_kb`). On non-Linux platforms this behaves
158    /// identically to [`new`](Self::new).
159    ///
160    /// The file should be positioned at offset 0 when passed in — the
161    /// WILLNEED hints assume reading starts from the beginning of the file.
162    #[must_use]
163    pub fn from_file(file: File) -> Self {
164        Self::from_file_with_config(file, DEFAULT_CHUNK_SIZE, DEFAULT_PREFETCH_DEPTH)
165    }
166
167    /// Construct a `PrefetchReader` from a [`File`] with explicit `chunk_size`
168    /// and `prefetch_depth`, plus kernel WILLNEED hints on Linux.
169    ///
170    /// # Panics
171    ///
172    /// Panics if `chunk_size == 0` or `prefetch_depth == 0`.
173    #[must_use]
174    pub fn from_file_with_config(file: File, chunk_size: usize, prefetch_depth: usize) -> Self {
175        let hint_fd = crate::os_hints::hint_fd(&file);
176        Self::build(file, chunk_size, prefetch_depth, hint_fd)
177    }
178
179    /// Shared construction logic.
180    fn build<R: Read + Send + 'static>(
181        inner: R,
182        chunk_size: usize,
183        prefetch_depth: usize,
184        hint_fd: Option<i32>,
185    ) -> Self {
186        assert!(chunk_size > 0, "PrefetchReader chunk_size must be > 0");
187        assert!(prefetch_depth > 0, "PrefetchReader prefetch_depth must be > 0");
188
189        let (tx, rx) = bounded::<Item>(prefetch_depth);
190        let handle = thread::Builder::new()
191            .name("fgumi-prefetch".to_string())
192            .spawn(move || producer_main(inner, chunk_size, hint_fd, &tx))
193            .expect("failed to spawn fgumi-prefetch thread");
194
195        Self {
196            current: None,
197            rx: Some(rx),
198            handle: Some(handle),
199            consumer_stalls: 0,
200            bytes_consumed: 0,
201        }
202    }
203
204    /// Total bytes served to callers of [`Read::read`] so far.
205    #[must_use]
206    pub fn bytes_consumed(&self) -> u64 {
207        self.bytes_consumed
208    }
209
210    /// Number of times a `read` call had to block waiting for the producer
211    /// to deliver the next chunk. Useful as a prototype-phase signal for
212    /// whether [`DEFAULT_PREFETCH_DEPTH`] is large enough.
213    #[must_use]
214    pub fn consumer_stalls(&self) -> u64 {
215        self.consumer_stalls
216    }
217}
218
219/// Entry point for the producer thread. Wraps [`producer_loop`] so that a
220/// panic inside the inner reader surfaces on the consumer side as an
221/// [`io::Error`] instead of a silently-truncated stream.
222fn producer_main<R: Read>(inner: R, chunk_size: usize, hint_fd: Option<i32>, tx: &Sender<Item>) {
223    let tx_for_panic = tx.clone();
224    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
225        producer_loop(inner, chunk_size, hint_fd, tx);
226    }));
227    if let Err(payload) = result {
228        let msg = match payload.downcast_ref::<&'static str>() {
229            Some(s) => (*s).to_string(),
230            None => match payload.downcast_ref::<String>() {
231                Some(s) => s.clone(),
232                None => "fgumi-prefetch producer thread panicked".to_string(),
233            },
234        };
235        let _ = tx_for_panic.send(Err(io::Error::other(msg)));
236    }
237}
238
239/// The actual producer loop. Allocates a `chunk_size`-byte buffer, issues a
240/// single `read()` call, and sends whatever bytes were returned through the
241/// channel. Emitting after the first successful read (rather than looping to
242/// fill the full chunk) ensures that short reads from pipes or throttled
243/// readers are delivered promptly. For file-backed readers, the OS typically
244/// returns a full page-aligned read in one call, so this rarely increases
245/// channel traffic. Tolerates `Interrupted` errors. Exits on EOF, I/O error,
246/// or when the consumer drops the receiver.
247///
248/// When `hint_fd` is `Some`, the loop calls
249/// `posix_fadvise(POSIX_FADV_WILLNEED)` after each chunk to proactively page
250/// in the next [`WILLNEED_LOOKAHEAD`] bytes. This removes the
251/// dependence on the kernel's default `read_ahead_kb` setting.
252fn producer_loop<R: Read>(
253    mut inner: R,
254    chunk_size: usize,
255    hint_fd: Option<i32>,
256    tx: &Sender<Item>,
257) {
258    let mut position: i64 = 0;
259
260    loop {
261        let mut buf = vec![0u8; chunk_size];
262        let mut filled: usize = 0;
263        let mut eof = false;
264
265        loop {
266            match inner.read(&mut buf[filled..]) {
267                Ok(0) => {
268                    eof = true;
269                    break;
270                }
271                Ok(n) => {
272                    filled += n;
273                    break;
274                }
275                Err(e) if e.kind() == io::ErrorKind::Interrupted => (),
276                Err(e) => {
277                    // Flush any bytes we already managed to read *before*
278                    // surfacing the error. Otherwise a mid-chunk failure
279                    // would silently discard data that was successfully
280                    // returned by the inner reader.
281                    if filled > 0 {
282                        buf.truncate(filled);
283                        let _ = tx.send(Ok(buf));
284                    }
285                    let _ = tx.send(Err(e));
286                    return;
287                }
288            }
289        }
290
291        if filled == 0 && eof {
292            return;
293        }
294
295        position = position.saturating_add(i64::try_from(filled).unwrap_or(i64::MAX));
296
297        // Ask the kernel to start paging in bytes ahead of our current
298        // position. The call is non-blocking — the kernel initiates the I/O
299        // and returns immediately.
300        if let Some(fd) = hint_fd {
301            crate::os_hints::advise_willneed_raw(fd, position, WILLNEED_LOOKAHEAD);
302        }
303
304        buf.truncate(filled);
305        if tx.send(Ok(buf)).is_err() {
306            // Consumer dropped the receiver; shutdown cleanly.
307            return;
308        }
309
310        if eof {
311            return;
312        }
313    }
314}
315
316impl Read for PrefetchReader {
317    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
318        if buf.is_empty() {
319            return Ok(0);
320        }
321
322        loop {
323            // Serve from the current chunk if any bytes remain.
324            if let Some((data, pos)) = self.current.as_mut() {
325                if *pos < data.len() {
326                    let n = std::cmp::min(buf.len(), data.len() - *pos);
327                    buf[..n].copy_from_slice(&data[*pos..*pos + n]);
328                    *pos += n;
329                    self.bytes_consumed += n as u64;
330                    return Ok(n);
331                }
332                // Current chunk exhausted; drop it and pull the next one.
333                self.current = None;
334            }
335
336            // No current chunk. Pull the next one from the producer.
337            let Some(rx) = self.rx.as_ref() else {
338                return Ok(0);
339            };
340
341            // Fast path: a chunk is already waiting.
342            let item = match rx.try_recv() {
343                Ok(item) => item,
344                Err(TryRecvError::Disconnected) => return Ok(0),
345                Err(TryRecvError::Empty) => {
346                    // Slow path: producer hasn't delivered yet; we block.
347                    self.consumer_stalls += 1;
348                    match rx.recv() {
349                        Ok(item) => item,
350                        Err(_) => return Ok(0),
351                    }
352                }
353            };
354
355            match item {
356                Ok(data) if !data.is_empty() => self.current = Some((data, 0)),
357                Ok(_) => {} // defensive: skip empty chunks
358                Err(e) => return Err(e),
359            }
360        }
361    }
362}
363
364impl Drop for PrefetchReader {
365    fn drop(&mut self) {
366        // Drop the receiver first so the producer's next `send` returns
367        // `Disconnected` and the loop exits. Then join the thread to
368        // guarantee no leak.
369        self.rx = None;
370        self.current = None;
371        if let Some(handle) = self.handle.take() {
372            if handle.join().is_err() {
373                log::debug!("fgumi-prefetch producer thread panicked during shutdown");
374            }
375        }
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::io::Cursor;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Build a deterministic byte stream of the given length.
    ///
    /// Uses `i % 251` (a prime below 256) so the pattern never aligns with
    /// power-of-two chunk sizes, making misplaced-byte bugs visible.
    fn sample_bytes(len: usize) -> Vec<u8> {
        (0..len).map(|i| u8::try_from(i % 251).expect("mod 251 fits in u8")).collect()
    }

    #[test]
    fn empty_input_returns_zero_immediately() {
        let mut reader = PrefetchReader::new(Cursor::new(Vec::<u8>::new()));
        let mut buf = [0u8; 16];
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
        // Bytes counter unchanged.
        assert_eq!(reader.bytes_consumed(), 0);
    }

    #[test]
    fn from_file_reads_correctly() {
        use std::io::Write;
        let data = sample_bytes(50_000);
        let mut tmp = tempfile::NamedTempFile::new().expect("create temp file");
        tmp.write_all(&data).expect("write temp file");
        // Reopen so the reader gets its own handle at offset 0, as the
        // `from_file` docs require for the WILLNEED hints.
        let file = File::open(tmp.path()).expect("reopen temp file");
        let mut reader = PrefetchReader::from_file(file);
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(reader.bytes_consumed(), data.len() as u64);
    }

    #[test]
    fn read_to_end_small_matches_input() {
        let data = b"hello, fgumi prefetch".to_vec();
        let mut reader = PrefetchReader::new(Cursor::new(data.clone()));
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(reader.bytes_consumed(), data.len() as u64);
    }

    #[test]
    fn read_to_end_large_matches_input() {
        // ~1 MiB — exercises multiple chunks through the channel.
        let data = sample_bytes(1_000_003);
        let mut reader = PrefetchReader::with_config(Cursor::new(data.clone()), 8 * 1024, 2);
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(reader.bytes_consumed(), data.len() as u64);
    }

    #[test]
    fn tiny_chunk_size_and_many_small_reads() {
        // Consumer read size (7) and chunk size (17) are coprime, so every
        // combination of partial-chunk boundary gets exercised.
        let data = sample_bytes(5_000);
        let mut reader = PrefetchReader::with_config(Cursor::new(data.clone()), 17, 2);
        let mut out = Vec::new();
        let mut tmp = [0u8; 7];
        loop {
            let n = reader.read(&mut tmp).unwrap();
            if n == 0 {
                break;
            }
            out.extend_from_slice(&tmp[..n]);
        }
        assert_eq!(out, data);
    }

    #[test]
    fn repeated_read_after_eof_returns_zero_forever() {
        let mut reader = PrefetchReader::new(Cursor::new(b"abc".to_vec()));
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, b"abc");

        // Post-EOF reads must keep returning Ok(0) (disconnected channel),
        // never block or error.
        let mut tmp = [0u8; 8];
        for _ in 0..10 {
            assert_eq!(reader.read(&mut tmp).unwrap(), 0);
        }
    }

    #[test]
    fn drop_before_consuming_does_not_hang() {
        // 1 MB of data, tiny chunks, shallow channel: the producer will fill
        // the channel quickly and then block on send. When we drop the
        // reader, the send must unblock so `Drop::join` returns.
        let data = vec![0u8; 1_000_000];
        let _reader = PrefetchReader::with_config(Cursor::new(data), 4 * 1024, 2);
        // Drop happens at end of scope.
    }

    #[test]
    fn partial_read_then_drop_does_not_hang() {
        let data = sample_bytes(500_000);
        let mut reader = PrefetchReader::with_config(Cursor::new(data), 4 * 1024, 2);
        let mut tmp = [0u8; 32];
        // Read a bit, then drop without finishing.
        let n = reader.read(&mut tmp).unwrap();
        assert!(n > 0);
    }

    #[test]
    fn error_from_inner_reader_propagates_once() {
        struct AlwaysErr;
        impl Read for AlwaysErr {
            fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
                Err(io::Error::new(io::ErrorKind::PermissionDenied, "nope"))
            }
        }
        let mut reader = PrefetchReader::new(AlwaysErr);
        let mut buf = [0u8; 16];
        let err = reader.read(&mut buf).expect_err("first read should error");
        assert_eq!(err.kind(), io::ErrorKind::PermissionDenied);
        // After the error is propagated, subsequent reads see the producer
        // exit and return EOF.
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
    }

    #[test]
    fn error_after_some_data_delivers_data_then_error() {
        // Reader that returns one chunk of data, then an error forever.
        struct DataThenErr {
            sent: bool,
        }
        impl Read for DataThenErr {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                if self.sent {
                    Err(io::Error::other("subsequent error"))
                } else {
                    self.sent = true;
                    let n = buf.len().min(128);
                    for (i, b) in buf.iter_mut().take(n).enumerate() {
                        *b = u8::try_from(i).expect("n <= 128 so i fits in u8");
                    }
                    Ok(n)
                }
            }
        }
        // The producer emits after the first successful read, so the
        // 128-byte short read is shipped as its own chunk; the error then
        // surfaces when the *next* chunk's first read fails.
        let mut reader = PrefetchReader::with_config(DataThenErr { sent: false }, 1024, 2);
        let mut out = Vec::new();
        let mut tmp = [0u8; 256];

        // First read should deliver the 128 bytes of data.
        let n = reader.read(&mut tmp).unwrap();
        assert_eq!(n, 128);
        out.extend_from_slice(&tmp[..n]);
        assert_eq!(out.len(), 128);

        // Next read should surface the error.
        let err = reader.read(&mut tmp).expect_err("second read should error");
        assert!(matches!(err.kind(), io::ErrorKind::Other | io::ErrorKind::UnexpectedEof));
    }

    #[test]
    fn interrupted_errors_are_retried_transparently() {
        // Three EINTR-style failures, then 10 bytes, then EOF; the consumer
        // must see only the 10 bytes.
        struct FlakyThenEof {
            call: usize,
        }
        impl Read for FlakyThenEof {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                self.call += 1;
                if self.call <= 3 {
                    return Err(io::Error::new(io::ErrorKind::Interrupted, "try again"));
                }
                if self.call == 4 {
                    let n = buf.len().min(10);
                    for (i, b) in buf.iter_mut().take(n).enumerate() {
                        *b = u8::try_from(i + 1).expect("n <= 10 so i+1 fits in u8");
                    }
                    return Ok(n);
                }
                Ok(0)
            }
        }
        let mut reader = PrefetchReader::with_config(FlakyThenEof { call: 0 }, 64, 2);
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, (1..=10).collect::<Vec<u8>>());
    }

    #[test]
    fn drop_joins_producer_thread() {
        /// A reader that sets a flag inside its `Drop` impl so we can
        /// observe that the producer thread has actually been torn down.
        struct Tracked {
            flag: Arc<AtomicBool>,
            data: Vec<u8>,
            pos: usize,
        }
        impl Read for Tracked {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                if self.pos >= self.data.len() {
                    return Ok(0);
                }
                let n = buf.len().min(self.data.len() - self.pos);
                buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
                self.pos += n;
                Ok(n)
            }
        }
        impl Drop for Tracked {
            fn drop(&mut self) {
                self.flag.store(true, Ordering::SeqCst);
            }
        }

        let flag = Arc::new(AtomicBool::new(false));
        let inner = Tracked { flag: Arc::clone(&flag), data: sample_bytes(1024), pos: 0 };
        {
            let mut reader = PrefetchReader::with_config(inner, 64, 2);
            let mut out = Vec::new();
            reader.read_to_end(&mut out).unwrap();
            assert_eq!(out.len(), 1024);
            // reader is dropped here; `Tracked::drop` must fire on the
            // producer thread before `Drop::drop` returns.
        }
        assert!(
            flag.load(Ordering::SeqCst),
            "producer thread should have dropped the inner reader"
        );
    }

    proptest! {
        /// Property test: for any input bytes, any chunk size, any prefetch
        /// depth, and any consumer read size, `PrefetchReader` yields exactly
        /// the same byte sequence as a plain in-memory cursor.
        #[test]
        fn prop_byte_identical_to_cursor(
            data in prop::collection::vec(any::<u8>(), 0..8_192),
            chunk_size in 1usize..2_048,
            depth in 1usize..6,
            read_size in 1usize..256,
        ) {
            let expected = data.clone();
            let mut reader = PrefetchReader::with_config(
                Cursor::new(data),
                chunk_size,
                depth,
            );
            let mut out = Vec::with_capacity(expected.len());
            let mut tmp = vec![0u8; read_size];
            loop {
                let n = reader.read(&mut tmp).expect("read should not fail on Cursor");
                if n == 0 {
                    break;
                }
                out.extend_from_slice(&tmp[..n]);
            }
            prop_assert_eq!(out, expected);
        }
    }
}