Skip to main content

fs_core/
stream.rs

1//! `std::io::Read` + `std::io::Seek` adapter over any [`BlockRead`].
2//!
3//! Every consumer that wants to stream a positioned-read device
4//! top-to-bottom (hashing, copying, burning, feeding into a parser that
5//! expects `Read`) otherwise reinvents the same cursor wrapper: track the
6//! current offset, call `read_at`, advance, clamp at `size_bytes`, signal
7//! EOF. [`BlockReadStreamer`] is that wrapper, once.
8//!
9//! Generic over `T: BlockRead` so the parent can be owned, `Arc`-held, or
10//! borrowed:
11//!
12//! ```ignore
13//! use fs_core::{BlockReadStreamer, FileDevice};
14//! use std::io::Read;
15//!
16//! let dev = FileDevice::open("disk.img")?;
17//! let mut stream = BlockReadStreamer::new(dev);
18//! let mut hasher = sha2::Sha256::new();
19//! std::io::copy(&mut stream, &mut hasher)?;
20//! ```
21//!
22//! Short-read semantics match `std::io::Read`: a read past `size_bytes()`
23//! is clamped, not an error; a read at or past the end returns `Ok(0)`.
24
25use crate::block::BlockRead;
26use crate::error::Error;
27use std::io::{self, Read, Seek, SeekFrom};
28
29/// Sequential `Read` + `Seek` adapter over any `BlockRead`.
30///
31/// Holds the parent by value. Use:
32/// - `BlockReadStreamer<MyDevice>` to own a concrete device.
33/// - `BlockReadStreamer<std::sync::Arc<dyn BlockRead>>` for shared ownership.
34/// - `BlockReadStreamer<&dyn BlockRead>` (or `&MyDevice`) to borrow.
35pub struct BlockReadStreamer<T: BlockRead> {
36    inner: T,
37    pos: u64,
38}
39
40impl<T: BlockRead> BlockReadStreamer<T> {
41    /// New streamer starting at offset 0.
42    pub fn new(inner: T) -> Self {
43        Self { inner, pos: 0 }
44    }
45
46    /// New streamer starting at the given byte offset. The position is
47    /// not bounded against `size_bytes()` — a position past the end is
48    /// legal (just reads will return `Ok(0)`), matching `std::io::Seek`.
49    pub fn with_position(inner: T, pos: u64) -> Self {
50        Self { inner, pos }
51    }
52
53    /// Current byte offset within the device.
54    pub fn position(&self) -> u64 {
55        self.pos
56    }
57
58    /// Borrow the wrapped device.
59    pub fn get_ref(&self) -> &T {
60        &self.inner
61    }
62
63    /// Consume the streamer and return the wrapped device.
64    pub fn into_inner(self) -> T {
65        self.inner
66    }
67}
68
69impl<T: BlockRead> Read for BlockReadStreamer<T> {
70    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
71        let size = self.inner.size_bytes();
72        if self.pos >= size {
73            return Ok(0);
74        }
75        let remaining = size - self.pos;
76        let n = std::cmp::min(buf.len() as u64, remaining) as usize;
77        if n == 0 {
78            return Ok(0);
79        }
80        self.inner
81            .read_at(self.pos, &mut buf[..n])
82            .map_err(fs_core_error_to_io)?;
83        self.pos += n as u64;
84        Ok(n)
85    }
86}
87
88impl<T: BlockRead> Seek for BlockReadStreamer<T> {
89    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
90        // `SeekFrom::End`/`Current` with a negative offset that would
91        // place the cursor before byte 0 returns an InvalidInput error,
92        // mirroring `std::io::Cursor`. Seeking *past* the end is allowed
93        // — the next read returns `Ok(0)`.
94        let new_pos = match pos {
95            SeekFrom::Start(n) => n,
96            SeekFrom::End(n) => offset_from(self.inner.size_bytes(), n)?,
97            SeekFrom::Current(n) => offset_from(self.pos, n)?,
98        };
99        self.pos = new_pos;
100        Ok(new_pos)
101    }
102}
103
/// Applies the signed displacement `delta` to `base`.
///
/// Returns the resulting offset, or an `InvalidInput` error when the sum
/// does not fit in a `u64`: the message distinguishes running off the top
/// (non-negative `delta`) from dropping below byte 0 (negative `delta`).
fn offset_from(base: u64, delta: i64) -> io::Result<u64> {
    base.checked_add_signed(delta).ok_or_else(|| {
        // The sign of `delta` tells us which side of the range was exceeded.
        let reason = if delta < 0 {
            "seek would place cursor before byte 0"
        } else {
            "seek offset overflows u64"
        };
        io::Error::new(io::ErrorKind::InvalidInput, reason)
    })
}
118
119fn fs_core_error_to_io(e: Error) -> io::Error {
120    match e {
121        Error::Io(io) => io,
122        Error::ShortRead { offset, want, got } => io::Error::new(
123            io::ErrorKind::UnexpectedEof,
124            format!("short read at {offset}: wanted {want} got {got}"),
125        ),
126        Error::OutOfBounds { offset, len, size } => io::Error::new(
127            io::ErrorKind::UnexpectedEof,
128            format!("{offset}+{len} past device size {size}"),
129        ),
130        Error::ReadOnly => io::Error::new(io::ErrorKind::PermissionDenied, "device is read-only"),
131        Error::Custom(s) => io::Error::other(s),
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result as FsResult;
    use std::sync::{Arc, Mutex};

    /// Mutex-guarded byte vector acting as a tiny in-memory device.
    /// A request that runs off the end is rejected with `ShortRead`.
    struct Bytes(Mutex<Vec<u8>>);

    impl BlockRead for Bytes {
        fn read_at(&self, offset: u64, buf: &mut [u8]) -> FsResult<()> {
            let data = self.0.lock().unwrap();
            let start = offset as usize;
            let end = start + buf.len();
            // `get` is `None` exactly when the requested window leaves the vector.
            match data.get(start..end) {
                Some(window) => {
                    buf.copy_from_slice(window);
                    Ok(())
                }
                None => Err(Error::ShortRead {
                    offset,
                    want: buf.len(),
                    got: data.len().saturating_sub(start),
                }),
            }
        }

        fn size_bytes(&self) -> u64 {
            let data = self.0.lock().unwrap();
            data.len() as u64
        }
    }

    /// Device whose every read fails with `Custom`; lets the tests observe
    /// how the streamer surfaces errors from the wrapped device.
    struct AlwaysFails;

    impl BlockRead for AlwaysFails {
        fn read_at(&self, _offset: u64, _buf: &mut [u8]) -> FsResult<()> {
            Err(Error::Custom("simulated failure".into()))
        }

        fn size_bytes(&self) -> u64 {
            1024
        }
    }

    /// 32-byte device in which each byte's value equals its own offset.
    fn fixture() -> Bytes {
        Bytes(Mutex::new((0u8..32).collect()))
    }

    #[test]
    fn read_to_end_returns_full_contents() {
        let mut stream = BlockReadStreamer::new(fixture());
        let mut collected = Vec::new();
        let total = stream.read_to_end(&mut collected).unwrap();
        assert_eq!(total, 32);
        assert_eq!(collected.len(), 32);
        assert_eq!(collected.first(), Some(&0));
        assert_eq!(collected.last(), Some(&31));
    }

    #[test]
    fn partial_end_read_is_clamped_not_errored() {
        let mut stream = BlockReadStreamer::with_position(fixture(), 30);
        let mut scratch = [0u8; 16];
        let got = stream.read(&mut scratch).unwrap();
        assert_eq!(got, 2);
        assert_eq!(&scratch[..2], &[30, 31]);
        assert_eq!(stream.position(), 32);
    }

    #[test]
    fn read_at_eof_returns_zero() {
        let mut stream = BlockReadStreamer::with_position(fixture(), 32);
        let mut scratch = [0u8; 8];
        // EOF is sticky: the second attempt must report zero as well.
        for _ in 0..2 {
            assert_eq!(stream.read(&mut scratch).unwrap(), 0);
        }
    }

    #[test]
    fn read_past_eof_position_returns_zero() {
        let mut stream = BlockReadStreamer::with_position(fixture(), 9_999);
        let mut scratch = [0u8; 8];
        let got = stream.read(&mut scratch).unwrap();
        assert_eq!(got, 0);
    }

    #[test]
    fn zero_length_buf_returns_zero() {
        let mut stream = BlockReadStreamer::new(fixture());
        let mut empty: [u8; 0] = [];
        let got = stream.read(&mut empty).unwrap();
        assert_eq!(got, 0);
        assert_eq!(stream.position(), 0);
    }

    #[test]
    fn position_advances_after_read() {
        let mut stream = BlockReadStreamer::new(fixture());
        let mut chunk = [0u8; 4];
        // Two back-to-back 4-byte reads: (position afterwards, bytes seen).
        for (pos_after, bytes) in [(4u64, [0u8, 1, 2, 3]), (8, [4, 5, 6, 7])] {
            stream.read_exact(&mut chunk).unwrap();
            assert_eq!(stream.position(), pos_after);
            assert_eq!(chunk, bytes);
        }
    }

    #[test]
    fn seek_start_jumps_absolute() {
        let mut stream = BlockReadStreamer::new(fixture());
        let landed = stream.seek(SeekFrom::Start(10)).unwrap();
        assert_eq!(landed, 10);
        assert_eq!(stream.position(), 10);

        let mut pair = [0u8; 2];
        stream.read_exact(&mut pair).unwrap();
        assert_eq!(pair, [10, 11]);
    }

    #[test]
    fn seek_end_jumps_relative_to_size() {
        let mut stream = BlockReadStreamer::new(fixture());
        let landed = stream.seek(SeekFrom::End(-4)).unwrap();
        assert_eq!(landed, 28);

        let mut tail = [0u8; 4];
        stream.read_exact(&mut tail).unwrap();
        assert_eq!(tail, [28, 29, 30, 31]);
    }

    #[test]
    fn seek_current_jumps_relative_to_cursor() {
        let mut stream = BlockReadStreamer::with_position(fixture(), 10);
        // Forward five, then back three, each measured from the moving cursor.
        for (delta, landed) in [(5i64, 15u64), (-3, 12)] {
            assert_eq!(stream.seek(SeekFrom::Current(delta)).unwrap(), landed);
        }
    }

    #[test]
    fn seek_past_end_is_allowed_then_read_returns_zero() {
        let mut stream = BlockReadStreamer::new(fixture());
        let landed = stream.seek(SeekFrom::Start(1_000_000)).unwrap();
        assert_eq!(landed, 1_000_000);

        let mut scratch = [0u8; 4];
        assert_eq!(stream.read(&mut scratch).unwrap(), 0);
    }

    #[test]
    fn seek_before_zero_is_invalid_input() {
        let mut stream = BlockReadStreamer::new(fixture());
        for target in [SeekFrom::Current(-1), SeekFrom::End(-99_999)] {
            let err = stream.seek(target).unwrap_err();
            assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
        }
    }

    #[test]
    fn works_through_arc_dyn_blockread() {
        let shared: Arc<dyn BlockRead> = Arc::new(fixture());
        let mut stream = BlockReadStreamer::new(shared);
        let mut collected = Vec::new();
        stream.read_to_end(&mut collected).unwrap();
        assert_eq!(collected.len(), 32);
    }

    #[test]
    fn works_through_borrowed_reference() {
        let device = fixture();
        {
            let borrowed: &dyn BlockRead = &device;
            let mut stream = BlockReadStreamer::new(borrowed);
            let mut head = [0u8; 8];
            stream.read_exact(&mut head).unwrap();
            assert_eq!(head, [0, 1, 2, 3, 4, 5, 6, 7]);
        }
        // The borrow ended with the inner scope, so `device` is usable again.
        assert_eq!(device.size_bytes(), 32);
    }

    #[test]
    fn into_inner_returns_wrapped_device() {
        let device = BlockReadStreamer::new(fixture()).into_inner();
        assert_eq!(device.size_bytes(), 32);
    }

    #[test]
    fn get_ref_exposes_inner_without_consuming() {
        let stream = BlockReadStreamer::new(fixture());
        let device = stream.get_ref();
        assert_eq!(device.size_bytes(), 32);
        // The streamer was only borrowed, so it can still be queried.
        assert_eq!(stream.position(), 0);
    }

    #[test]
    fn error_from_inner_propagates_as_io_error() {
        let mut stream = BlockReadStreamer::new(AlwaysFails);
        let mut scratch = [0u8; 8];
        let err = stream.read(&mut scratch).unwrap_err();
        // `Custom` travels through `io::Error::other`, hence kind `Other`.
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert!(err.to_string().contains("simulated failure"));
    }
}
335}