fgumi_lib/prefetch_reader.rs
1#![deny(unsafe_code)]
2//! Async userspace read-ahead adapter.
3//!
//! [`PrefetchReader`] is a replacement for [`std::io::BufReader<File>`], for
//! callers that only need the [`Read`] interface, that performs asynchronous
//! read-ahead on a dedicated OS thread. It exists to decouple the blocking
//! `read()` wait on a file descriptor from the pipeline worker threads that
//! consume the bytes.
8//!
9//! ## Motivation
10//!
11//! On Linux, the kernel's per-device read-ahead window (`read_ahead_kb`) is
12//! 128 KB by default. A plain `BufReader<File>` is synchronous: when its
13//! internal buffer drains, the next refill blocks in the kernel until pages
14//! arrive from disk. During that stall the calling thread is parked and
15//! cannot make progress on other work. In the fgumi unified pipeline, the
16//! reader thread is also a pipeline worker, so a blocked read translates
17//! directly into lost downstream throughput.
18//!
19//! `PrefetchReader` moves the blocking `read()` onto a dedicated producer
//! thread that pushes chunks of up to `chunk_size` bytes through a bounded
21//! [`crossbeam_channel`]. Consumers see a normal [`std::io::Read`] interface;
22//! internally, [`Read::read`] serves bytes out of the currently-held chunk and
23//! only blocks when the producer has not yet delivered the next one. The
24//! upshot is that stalls on the disk become independent of stalls in the
25//! pipeline: the producer overlaps its disk wait with the consumer's CPU
26//! work.
27//!
28//! This is essentially what the kernel's block-layer read-ahead does, but in
29//! userspace — so it works without root, on any OS, and without having to
30//! tune `/sys/block/*/queue/read_ahead_kb`.
31//!
32//! ## Lifecycle
33//!
34//! Constructing a `PrefetchReader` spawns exactly one OS thread, named
35//! `fgumi-prefetch`. The thread owns the inner reader and exits when any of
36//! the following happen:
37//!
38//! - The inner reader signals EOF (`Ok(0)` from `read`).
39//! - The inner reader returns an error (the error is sent through the
40//! channel, then the thread exits).
41//! - The [`PrefetchReader`] is dropped (the consumer-side receiver is
42//! destroyed, the producer's next `send` returns `Disconnected`, and the
43//! loop exits).
44//!
45//! `Drop` joins the producer thread, so leaks are impossible on well-behaved
46//! inner readers. If the inner reader is currently parked in a long `read`
47//! syscall, `Drop` will wait for it to return.
48
49use std::fs::File;
50use std::io::{self, Read};
51use std::thread::{self, JoinHandle};
52
53use crossbeam_channel::{Receiver, Sender, TryRecvError, bounded};
54
/// Chunk size, in bytes, that [`PrefetchReader::new`] passes to the producer
/// thread (4 MiB).
pub const DEFAULT_CHUNK_SIZE: usize = 4 << 20;

/// Capacity, in chunks, of the prefetch channel created by
/// [`PrefetchReader::new`]. The producer may queue up to this many filled
/// chunks ahead of the consumer before its `send` blocks.
pub const DEFAULT_PREFETCH_DEPTH: usize = 4;

/// Length, in bytes, of the window the producer thread asks the kernel to
/// page in via `posix_fadvise(POSIX_FADV_WILLNEED)` after each chunk fill
/// (128 MiB). Only used when the reader was built through
/// [`PrefetchReader::from_file`], which captures the raw fd the syscall
/// needs. 128 MiB covers roughly one second of EBS gp3 baseline throughput
/// (125 MiB/s), so pages should already be warm when the producer's `read()`
/// reaches them.
///
/// Typed as `i64` so it can be passed as `posix_fadvise`'s `off_t` argument
/// without conversion.
const WILLNEED_LOOKAHEAD: i64 = 128 << 20;

/// Message type of the prefetch channel: either a filled chunk of bytes or a
/// terminal I/O error from the producer thread.
type Item = io::Result<Vec<u8>>;
75
76/// A `Read` adapter that performs asynchronous userspace prefetch on a
77/// dedicated background thread.
78///
79/// See the [module docs](self) for the rationale and lifecycle model.
80///
81/// # Example
82///
83/// ```
84/// use std::io::{Cursor, Read};
85/// use fgumi_lib::prefetch_reader::PrefetchReader;
86///
87/// let data: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect();
88/// let mut reader = PrefetchReader::new(Cursor::new(data.clone()));
89/// let mut out = Vec::new();
90/// reader.read_to_end(&mut out).unwrap();
91/// assert_eq!(out, data);
92/// ```
93#[derive(Debug)]
94pub struct PrefetchReader {
95 /// The chunk currently being served out to callers of `read`.
96 ///
97 /// Stored as `(data, position)`. `None` means we need to pull the next
98 /// chunk from the channel on the next `read` call.
99 current: Option<(Vec<u8>, usize)>,
100
101 /// Receiving half of the prefetch channel. Held in `Option` so we can
102 /// drop it explicitly in `Drop::drop` before joining the producer thread;
103 /// dropping the receiver is what causes the producer's `send` to fail
104 /// with `Disconnected`, which in turn terminates the producer loop.
105 rx: Option<Receiver<Item>>,
106
107 /// Join handle for the producer thread. `None` after join in `Drop`.
108 handle: Option<JoinHandle<()>>,
109
110 /// Number of times a consumer `read` call had to block on the channel
111 /// because the producer had not yet delivered the next chunk. A high
112 /// value relative to total reads suggests the prefetch depth is too
113 /// shallow or the consumer is faster than the producer.
114 consumer_stalls: u64,
115
116 /// Total bytes handed back to callers of `read` so far.
117 bytes_consumed: u64,
118}
119
120impl PrefetchReader {
121 /// Construct a `PrefetchReader` from an arbitrary reader with default
122 /// chunk size and prefetch depth.
123 ///
124 /// No `posix_fadvise(WILLNEED)` hints are issued because the inner reader
125 /// may not be backed by a file descriptor. Use [`from_file`](Self::from_file)
126 /// when wrapping a [`File`] to get kernel page-cache warming for free.
127 #[must_use]
128 pub fn new<R: Read + Send + 'static>(inner: R) -> Self {
129 Self::with_config(inner, DEFAULT_CHUNK_SIZE, DEFAULT_PREFETCH_DEPTH)
130 }
131
132 /// Construct a `PrefetchReader` from an arbitrary reader with explicit
133 /// `chunk_size` and `prefetch_depth`.
134 ///
135 /// Steady-state memory usage is bounded by `chunk_size * prefetch_depth`
136 /// (plus one chunk being filled by the producer and one being consumed).
137 ///
138 /// # Panics
139 ///
140 /// Panics if `chunk_size == 0` or `prefetch_depth == 0`.
141 #[must_use]
142 pub fn with_config<R: Read + Send + 'static>(
143 inner: R,
144 chunk_size: usize,
145 prefetch_depth: usize,
146 ) -> Self {
147 Self::build(inner, chunk_size, prefetch_depth, None)
148 }
149
150 /// Construct a `PrefetchReader` from a [`File`] with default chunk size
151 /// and prefetch depth.
152 ///
153 /// On Linux the producer thread will call
154 /// `posix_fadvise(POSIX_FADV_WILLNEED)` after each chunk fill to
155 /// proactively page in the next [`WILLNEED_LOOKAHEAD`] bytes,
156 /// making the reader independent of the kernel's default read-ahead
157 /// window (`read_ahead_kb`). On non-Linux platforms this behaves
158 /// identically to [`new`](Self::new).
159 ///
160 /// The file should be positioned at offset 0 when passed in — the
161 /// WILLNEED hints assume reading starts from the beginning of the file.
162 #[must_use]
163 pub fn from_file(file: File) -> Self {
164 Self::from_file_with_config(file, DEFAULT_CHUNK_SIZE, DEFAULT_PREFETCH_DEPTH)
165 }
166
167 /// Construct a `PrefetchReader` from a [`File`] with explicit `chunk_size`
168 /// and `prefetch_depth`, plus kernel WILLNEED hints on Linux.
169 ///
170 /// # Panics
171 ///
172 /// Panics if `chunk_size == 0` or `prefetch_depth == 0`.
173 #[must_use]
174 pub fn from_file_with_config(file: File, chunk_size: usize, prefetch_depth: usize) -> Self {
175 let hint_fd = crate::os_hints::hint_fd(&file);
176 Self::build(file, chunk_size, prefetch_depth, hint_fd)
177 }
178
179 /// Shared construction logic.
180 fn build<R: Read + Send + 'static>(
181 inner: R,
182 chunk_size: usize,
183 prefetch_depth: usize,
184 hint_fd: Option<i32>,
185 ) -> Self {
186 assert!(chunk_size > 0, "PrefetchReader chunk_size must be > 0");
187 assert!(prefetch_depth > 0, "PrefetchReader prefetch_depth must be > 0");
188
189 let (tx, rx) = bounded::<Item>(prefetch_depth);
190 let handle = thread::Builder::new()
191 .name("fgumi-prefetch".to_string())
192 .spawn(move || producer_main(inner, chunk_size, hint_fd, &tx))
193 .expect("failed to spawn fgumi-prefetch thread");
194
195 Self {
196 current: None,
197 rx: Some(rx),
198 handle: Some(handle),
199 consumer_stalls: 0,
200 bytes_consumed: 0,
201 }
202 }
203
204 /// Total bytes served to callers of [`Read::read`] so far.
205 #[must_use]
206 pub fn bytes_consumed(&self) -> u64 {
207 self.bytes_consumed
208 }
209
210 /// Number of times a `read` call had to block waiting for the producer
211 /// to deliver the next chunk. Useful as a prototype-phase signal for
212 /// whether [`DEFAULT_PREFETCH_DEPTH`] is large enough.
213 #[must_use]
214 pub fn consumer_stalls(&self) -> u64 {
215 self.consumer_stalls
216 }
217}
218
219/// Entry point for the producer thread. Wraps [`producer_loop`] so that a
220/// panic inside the inner reader surfaces on the consumer side as an
221/// [`io::Error`] instead of a silently-truncated stream.
222fn producer_main<R: Read>(inner: R, chunk_size: usize, hint_fd: Option<i32>, tx: &Sender<Item>) {
223 let tx_for_panic = tx.clone();
224 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
225 producer_loop(inner, chunk_size, hint_fd, tx);
226 }));
227 if let Err(payload) = result {
228 let msg = match payload.downcast_ref::<&'static str>() {
229 Some(s) => (*s).to_string(),
230 None => match payload.downcast_ref::<String>() {
231 Some(s) => s.clone(),
232 None => "fgumi-prefetch producer thread panicked".to_string(),
233 },
234 };
235 let _ = tx_for_panic.send(Err(io::Error::other(msg)));
236 }
237}
238
239/// The actual producer loop. Allocates a `chunk_size`-byte buffer, issues a
240/// single `read()` call, and sends whatever bytes were returned through the
241/// channel. Emitting after the first successful read (rather than looping to
242/// fill the full chunk) ensures that short reads from pipes or throttled
243/// readers are delivered promptly. For file-backed readers, the OS typically
244/// returns a full page-aligned read in one call, so this rarely increases
245/// channel traffic. Tolerates `Interrupted` errors. Exits on EOF, I/O error,
246/// or when the consumer drops the receiver.
247///
248/// When `hint_fd` is `Some`, the loop calls
249/// `posix_fadvise(POSIX_FADV_WILLNEED)` after each chunk to proactively page
250/// in the next [`WILLNEED_LOOKAHEAD`] bytes. This removes the
251/// dependence on the kernel's default `read_ahead_kb` setting.
252fn producer_loop<R: Read>(
253 mut inner: R,
254 chunk_size: usize,
255 hint_fd: Option<i32>,
256 tx: &Sender<Item>,
257) {
258 let mut position: i64 = 0;
259
260 loop {
261 let mut buf = vec![0u8; chunk_size];
262 let mut filled: usize = 0;
263 let mut eof = false;
264
265 loop {
266 match inner.read(&mut buf[filled..]) {
267 Ok(0) => {
268 eof = true;
269 break;
270 }
271 Ok(n) => {
272 filled += n;
273 break;
274 }
275 Err(e) if e.kind() == io::ErrorKind::Interrupted => (),
276 Err(e) => {
277 // Flush any bytes we already managed to read *before*
278 // surfacing the error. Otherwise a mid-chunk failure
279 // would silently discard data that was successfully
280 // returned by the inner reader.
281 if filled > 0 {
282 buf.truncate(filled);
283 let _ = tx.send(Ok(buf));
284 }
285 let _ = tx.send(Err(e));
286 return;
287 }
288 }
289 }
290
291 if filled == 0 && eof {
292 return;
293 }
294
295 position = position.saturating_add(i64::try_from(filled).unwrap_or(i64::MAX));
296
297 // Ask the kernel to start paging in bytes ahead of our current
298 // position. The call is non-blocking — the kernel initiates the I/O
299 // and returns immediately.
300 if let Some(fd) = hint_fd {
301 crate::os_hints::advise_willneed_raw(fd, position, WILLNEED_LOOKAHEAD);
302 }
303
304 buf.truncate(filled);
305 if tx.send(Ok(buf)).is_err() {
306 // Consumer dropped the receiver; shutdown cleanly.
307 return;
308 }
309
310 if eof {
311 return;
312 }
313 }
314}
315
316impl Read for PrefetchReader {
317 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
318 if buf.is_empty() {
319 return Ok(0);
320 }
321
322 loop {
323 // Serve from the current chunk if any bytes remain.
324 if let Some((data, pos)) = self.current.as_mut() {
325 if *pos < data.len() {
326 let n = std::cmp::min(buf.len(), data.len() - *pos);
327 buf[..n].copy_from_slice(&data[*pos..*pos + n]);
328 *pos += n;
329 self.bytes_consumed += n as u64;
330 return Ok(n);
331 }
332 // Current chunk exhausted; drop it and pull the next one.
333 self.current = None;
334 }
335
336 // No current chunk. Pull the next one from the producer.
337 let Some(rx) = self.rx.as_ref() else {
338 return Ok(0);
339 };
340
341 // Fast path: a chunk is already waiting.
342 let item = match rx.try_recv() {
343 Ok(item) => item,
344 Err(TryRecvError::Disconnected) => return Ok(0),
345 Err(TryRecvError::Empty) => {
346 // Slow path: producer hasn't delivered yet; we block.
347 self.consumer_stalls += 1;
348 match rx.recv() {
349 Ok(item) => item,
350 Err(_) => return Ok(0),
351 }
352 }
353 };
354
355 match item {
356 Ok(data) if !data.is_empty() => self.current = Some((data, 0)),
357 Ok(_) => {} // defensive: skip empty chunks
358 Err(e) => return Err(e),
359 }
360 }
361 }
362}
363
364impl Drop for PrefetchReader {
365 fn drop(&mut self) {
366 // Drop the receiver first so the producer's next `send` returns
367 // `Disconnected` and the loop exits. Then join the thread to
368 // guarantee no leak.
369 self.rx = None;
370 self.current = None;
371 if let Some(handle) = self.handle.take() {
372 if handle.join().is_err() {
373 log::debug!("fgumi-prefetch producer thread panicked during shutdown");
374 }
375 }
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::io::Cursor;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Build a deterministic byte stream of the given length.
    fn sample_bytes(len: usize) -> Vec<u8> {
        (0..len).map(|i| u8::try_from(i % 251).expect("mod 251 fits in u8")).collect()
    }

    #[test]
    fn empty_input_returns_zero_immediately() {
        let mut reader = PrefetchReader::new(Cursor::new(Vec::<u8>::new()));
        let mut buf = [0u8; 16];
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
        // Bytes counter unchanged.
        assert_eq!(reader.bytes_consumed(), 0);
    }

    #[test]
    fn from_file_reads_correctly() {
        use std::io::Write;
        let data = sample_bytes(50_000);
        let mut tmp = tempfile::NamedTempFile::new().expect("create temp file");
        tmp.write_all(&data).expect("write temp file");
        let file = File::open(tmp.path()).expect("reopen temp file");
        let mut reader = PrefetchReader::from_file(file);
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(reader.bytes_consumed(), data.len() as u64);
    }

    #[test]
    fn read_to_end_small_matches_input() {
        let data = b"hello, fgumi prefetch".to_vec();
        let mut reader = PrefetchReader::new(Cursor::new(data.clone()));
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(reader.bytes_consumed(), data.len() as u64);
    }

    #[test]
    fn read_to_end_large_matches_input() {
        // ~1 MiB — exercises multiple chunks through the channel.
        let data = sample_bytes(1_000_003);
        let mut reader = PrefetchReader::with_config(Cursor::new(data.clone()), 8 * 1024, 2);
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, data);
        assert_eq!(reader.bytes_consumed(), data.len() as u64);
    }

    #[test]
    fn tiny_chunk_size_and_many_small_reads() {
        let data = sample_bytes(5_000);
        let mut reader = PrefetchReader::with_config(Cursor::new(data.clone()), 17, 2);
        let mut out = Vec::new();
        let mut tmp = [0u8; 7];
        loop {
            let n = reader.read(&mut tmp).unwrap();
            if n == 0 {
                break;
            }
            out.extend_from_slice(&tmp[..n]);
        }
        assert_eq!(out, data);
    }

    #[test]
    fn repeated_read_after_eof_returns_zero_forever() {
        let mut reader = PrefetchReader::new(Cursor::new(b"abc".to_vec()));
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, b"abc");

        let mut tmp = [0u8; 8];
        for _ in 0..10 {
            assert_eq!(reader.read(&mut tmp).unwrap(), 0);
        }
    }

    #[test]
    fn drop_before_consuming_does_not_hang() {
        // 1 MB of data, tiny chunks, shallow channel: the producer will fill
        // the channel quickly and then block on send. When we drop the
        // reader, the send must unblock so `Drop::join` returns.
        let data = vec![0u8; 1_000_000];
        let _reader = PrefetchReader::with_config(Cursor::new(data), 4 * 1024, 2);
        // Drop happens at end of scope.
    }

    #[test]
    fn partial_read_then_drop_does_not_hang() {
        let data = sample_bytes(500_000);
        let mut reader = PrefetchReader::with_config(Cursor::new(data), 4 * 1024, 2);
        let mut tmp = [0u8; 32];
        // Read a bit, then drop without finishing.
        let n = reader.read(&mut tmp).unwrap();
        assert!(n > 0);
    }

    #[test]
    fn error_from_inner_reader_propagates_once() {
        struct AlwaysErr;
        impl Read for AlwaysErr {
            fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
                Err(io::Error::new(io::ErrorKind::PermissionDenied, "nope"))
            }
        }
        let mut reader = PrefetchReader::new(AlwaysErr);
        let mut buf = [0u8; 16];
        let err = reader.read(&mut buf).expect_err("first read should error");
        assert_eq!(err.kind(), io::ErrorKind::PermissionDenied);
        // After the error is propagated, subsequent reads see the producer
        // exit and return EOF.
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
    }

    #[test]
    fn error_after_some_data_delivers_data_then_error() {
        // Reader that returns one chunk of data, then an error forever.
        struct DataThenErr {
            sent: bool,
        }
        impl Read for DataThenErr {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                if self.sent {
                    Err(io::Error::other("subsequent error"))
                } else {
                    self.sent = true;
                    let n = buf.len().min(128);
                    for (i, b) in buf.iter_mut().take(n).enumerate() {
                        *b = u8::try_from(i).expect("n <= 128 so i fits in u8");
                    }
                    Ok(n)
                }
            }
        }
        // chunk_size >= 128 lets the first inner read hand over all 128
        // bytes as a single chunk. The producer sends that chunk right away
        // and only then calls read() again, which yields the error.
        let mut reader = PrefetchReader::with_config(DataThenErr { sent: false }, 1024, 2);
        let mut out = Vec::new();
        let mut tmp = [0u8; 256];

        // First read should deliver the 128 bytes of data.
        let n = reader.read(&mut tmp).unwrap();
        assert_eq!(n, 128);
        out.extend_from_slice(&tmp[..n]);
        assert_eq!(out.len(), 128);

        // Next read should surface the inner reader's error unchanged.
        let err = reader.read(&mut tmp).expect_err("second read should error");
        assert_eq!(err.kind(), io::ErrorKind::Other);
    }

    #[test]
    fn interrupted_errors_are_retried_transparently() {
        struct FlakyThenEof {
            call: usize,
        }
        impl Read for FlakyThenEof {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                self.call += 1;
                if self.call <= 3 {
                    return Err(io::Error::new(io::ErrorKind::Interrupted, "try again"));
                }
                if self.call == 4 {
                    let n = buf.len().min(10);
                    for (i, b) in buf.iter_mut().take(n).enumerate() {
                        *b = u8::try_from(i + 1).expect("n <= 10 so i+1 fits in u8");
                    }
                    return Ok(n);
                }
                Ok(0)
            }
        }
        let mut reader = PrefetchReader::with_config(FlakyThenEof { call: 0 }, 64, 2);
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        assert_eq!(out, (1..=10).collect::<Vec<u8>>());
    }

    #[test]
    fn panic_in_inner_reader_surfaces_as_error() {
        // The producer thread catches the panic and forwards its message as
        // an `io::Error`, so the consumer does not mistake it for EOF.
        struct Panics;
        impl Read for Panics {
            fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
                panic!("inner reader exploded")
            }
        }
        let mut reader = PrefetchReader::new(Panics);
        let mut buf = [0u8; 16];
        let err = reader.read(&mut buf).expect_err("panic should surface as an error");
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert!(err.to_string().contains("inner reader exploded"));
        // The producer has exited, so later reads report EOF.
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
    }

    #[test]
    #[should_panic(expected = "chunk_size must be > 0")]
    fn zero_chunk_size_panics() {
        let _ = PrefetchReader::with_config(Cursor::new(Vec::<u8>::new()), 0, 1);
    }

    #[test]
    #[should_panic(expected = "prefetch_depth must be > 0")]
    fn zero_prefetch_depth_panics() {
        let _ = PrefetchReader::with_config(Cursor::new(Vec::<u8>::new()), 1, 0);
    }

    #[test]
    fn drop_joins_producer_thread() {
        /// A reader that sets a flag inside its `Drop` impl so we can
        /// observe that the producer thread has actually been torn down.
        struct Tracked {
            flag: Arc<AtomicBool>,
            data: Vec<u8>,
            pos: usize,
        }
        impl Read for Tracked {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                if self.pos >= self.data.len() {
                    return Ok(0);
                }
                let n = buf.len().min(self.data.len() - self.pos);
                buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
                self.pos += n;
                Ok(n)
            }
        }
        impl Drop for Tracked {
            fn drop(&mut self) {
                self.flag.store(true, Ordering::SeqCst);
            }
        }

        let flag = Arc::new(AtomicBool::new(false));
        let inner = Tracked { flag: Arc::clone(&flag), data: sample_bytes(1024), pos: 0 };
        {
            let mut reader = PrefetchReader::with_config(inner, 64, 2);
            let mut out = Vec::new();
            reader.read_to_end(&mut out).unwrap();
            assert_eq!(out.len(), 1024);
            // reader is dropped here; `Tracked::drop` must fire on the
            // producer thread before `Drop::drop` returns.
        }
        assert!(
            flag.load(Ordering::SeqCst),
            "producer thread should have dropped the inner reader"
        );
    }

    proptest! {
        /// Property test: for any input bytes, any chunk size, any prefetch
        /// depth, and any consumer read size, `PrefetchReader` yields exactly
        /// the same byte sequence as a plain in-memory cursor.
        #[test]
        fn prop_byte_identical_to_cursor(
            data in prop::collection::vec(any::<u8>(), 0..8_192),
            chunk_size in 1usize..2_048,
            depth in 1usize..6,
            read_size in 1usize..256,
        ) {
            let expected = data.clone();
            let mut reader = PrefetchReader::with_config(
                Cursor::new(data),
                chunk_size,
                depth,
            );
            let mut out = Vec::with_capacity(expected.len());
            let mut tmp = vec![0u8; read_size];
            loop {
                let n = reader.read(&mut tmp).expect("read should not fail on Cursor");
                if n == 0 {
                    break;
                }
                out.extend_from_slice(&tmp[..n]);
            }
            prop_assert_eq!(out, expected);
        }
    }
}