1use crate::block::BlockRead;
26use crate::error::Error;
27use std::io::{self, Read, Seek, SeekFrom};
28
29pub struct BlockReadStreamer<T: BlockRead> {
36 inner: T,
37 pos: u64,
38}
39
40impl<T: BlockRead> BlockReadStreamer<T> {
41 pub fn new(inner: T) -> Self {
43 Self { inner, pos: 0 }
44 }
45
46 pub fn with_position(inner: T, pos: u64) -> Self {
50 Self { inner, pos }
51 }
52
53 pub fn position(&self) -> u64 {
55 self.pos
56 }
57
58 pub fn get_ref(&self) -> &T {
60 &self.inner
61 }
62
63 pub fn into_inner(self) -> T {
65 self.inner
66 }
67}
68
69impl<T: BlockRead> Read for BlockReadStreamer<T> {
70 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
71 let size = self.inner.size_bytes();
72 if self.pos >= size {
73 return Ok(0);
74 }
75 let remaining = size - self.pos;
76 let n = std::cmp::min(buf.len() as u64, remaining) as usize;
77 if n == 0 {
78 return Ok(0);
79 }
80 self.inner
81 .read_at(self.pos, &mut buf[..n])
82 .map_err(fs_core_error_to_io)?;
83 self.pos += n as u64;
84 Ok(n)
85 }
86}
87
88impl<T: BlockRead> Seek for BlockReadStreamer<T> {
89 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
90 let new_pos = match pos {
95 SeekFrom::Start(n) => n,
96 SeekFrom::End(n) => offset_from(self.inner.size_bytes(), n)?,
97 SeekFrom::Current(n) => offset_from(self.pos, n)?,
98 };
99 self.pos = new_pos;
100 Ok(new_pos)
101 }
102}
103
/// Applies the signed displacement `delta` to `base` with overflow checking.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidInput`] error when the result would not
/// fit in a `u64`: either it exceeds `u64::MAX` (non-negative `delta`) or it
/// would fall below zero (negative `delta`).
fn offset_from(base: u64, delta: i64) -> io::Result<u64> {
    base.checked_add_signed(delta).ok_or_else(|| {
        // The sign of `delta` tells us which bound was crossed.
        let reason = if delta < 0 {
            "seek would place cursor before byte 0"
        } else {
            "seek offset overflows u64"
        };
        io::Error::new(io::ErrorKind::InvalidInput, reason)
    })
}
118
119fn fs_core_error_to_io(e: Error) -> io::Error {
120 match e {
121 Error::Io(io) => io,
122 Error::ShortRead { offset, want, got } => io::Error::new(
123 io::ErrorKind::UnexpectedEof,
124 format!("short read at {offset}: wanted {want} got {got}"),
125 ),
126 Error::OutOfBounds { offset, len, size } => io::Error::new(
127 io::ErrorKind::UnexpectedEof,
128 format!("{offset}+{len} past device size {size}"),
129 ),
130 Error::ReadOnly => io::Error::new(io::ErrorKind::PermissionDenied, "device is read-only"),
131 Error::Custom(s) => io::Error::other(s),
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result as FsResult;
    use std::sync::{Arc, Mutex};

    /// In-memory test device: a byte vector behind a mutex.
    struct Bytes(Mutex<Vec<u8>>);

    impl BlockRead for Bytes {
        /// Fills `buf` from the stored bytes starting at `offset`, or reports
        /// `Error::ShortRead` when the requested range runs past the data.
        fn read_at(&self, offset: u64, buf: &mut [u8]) -> FsResult<()> {
            let data = self.0.lock().unwrap();
            let first = offset as usize;
            let last = first + buf.len();
            match data.get(first..last) {
                Some(chunk) => {
                    buf.copy_from_slice(chunk);
                    Ok(())
                }
                None => Err(Error::ShortRead {
                    offset,
                    want: buf.len(),
                    got: data.len().saturating_sub(first),
                }),
            }
        }

        fn size_bytes(&self) -> u64 {
            let data = self.0.lock().unwrap();
            data.len() as u64
        }
    }

    /// Test device that reports 1024 bytes but fails every read with
    /// `Error::Custom`.
    struct AlwaysFails;

    impl BlockRead for AlwaysFails {
        fn read_at(&self, _offset: u64, _buf: &mut [u8]) -> FsResult<()> {
            Err(Error::Custom("simulated failure".into()))
        }

        fn size_bytes(&self) -> u64 {
            1024
        }
    }

    /// Builds a 32-byte device whose byte at index `i` has the value `i`.
    fn fixture() -> Bytes {
        let ramp: Vec<u8> = (0u8..32).collect();
        Bytes(Mutex::new(ramp))
    }

    #[test]
    fn read_to_end_returns_full_contents() {
        let mut streamer = BlockReadStreamer::new(fixture());
        let mut collected = Vec::new();
        let count = streamer.read_to_end(&mut collected).unwrap();
        assert_eq!(count, 32);
        assert_eq!(collected.len(), 32);
        assert_eq!(collected[0], 0);
        assert_eq!(collected[31], 31);
    }

    #[test]
    fn partial_end_read_is_clamped_not_errored() {
        let mut streamer = BlockReadStreamer::with_position(fixture(), 30);
        let mut scratch = [0u8; 16];
        let count = streamer.read(&mut scratch).unwrap();
        assert_eq!(count, 2);
        assert_eq!(&scratch[..2], &[30, 31]);
        assert_eq!(streamer.position(), 32);
    }

    #[test]
    fn read_at_eof_returns_zero() {
        let mut streamer = BlockReadStreamer::with_position(fixture(), 32);
        let mut scratch = [0u8; 8];
        // Two consecutive reads at end-of-stream must both report zero bytes.
        for _ in 0..2 {
            assert_eq!(streamer.read(&mut scratch).unwrap(), 0);
        }
    }

    #[test]
    fn read_past_eof_position_returns_zero() {
        let mut streamer = BlockReadStreamer::with_position(fixture(), 9_999);
        let mut scratch = [0u8; 8];
        assert_eq!(streamer.read(&mut scratch).unwrap(), 0);
    }

    #[test]
    fn zero_length_buf_returns_zero() {
        let mut streamer = BlockReadStreamer::new(fixture());
        let mut empty: [u8; 0] = [];
        assert_eq!(streamer.read(&mut empty).unwrap(), 0);
        assert_eq!(streamer.position(), 0);
    }

    #[test]
    fn position_advances_after_read() {
        let mut streamer = BlockReadStreamer::new(fixture());
        let mut quad = [0u8; 4];

        streamer.read_exact(&mut quad).unwrap();
        assert_eq!(streamer.position(), 4);
        assert_eq!(quad, [0, 1, 2, 3]);

        streamer.read_exact(&mut quad).unwrap();
        assert_eq!(streamer.position(), 8);
        assert_eq!(quad, [4, 5, 6, 7]);
    }

    #[test]
    fn seek_start_jumps_absolute() {
        let mut streamer = BlockReadStreamer::new(fixture());
        let landed = streamer.seek(SeekFrom::Start(10)).unwrap();
        assert_eq!(landed, 10);
        assert_eq!(streamer.position(), 10);

        let mut pair = [0u8; 2];
        streamer.read_exact(&mut pair).unwrap();
        assert_eq!(pair, [10, 11]);
    }

    #[test]
    fn seek_end_jumps_relative_to_size() {
        let mut streamer = BlockReadStreamer::new(fixture());
        let landed = streamer.seek(SeekFrom::End(-4)).unwrap();
        assert_eq!(landed, 28);

        let mut tail = [0u8; 4];
        streamer.read_exact(&mut tail).unwrap();
        assert_eq!(tail, [28, 29, 30, 31]);
    }

    #[test]
    fn seek_current_jumps_relative_to_cursor() {
        let mut streamer = BlockReadStreamer::with_position(fixture(), 10);
        let forward = streamer.seek(SeekFrom::Current(5)).unwrap();
        assert_eq!(forward, 15);
        let backward = streamer.seek(SeekFrom::Current(-3)).unwrap();
        assert_eq!(backward, 12);
    }

    #[test]
    fn seek_past_end_is_allowed_then_read_returns_zero() {
        let mut streamer = BlockReadStreamer::new(fixture());
        let landed = streamer.seek(SeekFrom::Start(1_000_000)).unwrap();
        assert_eq!(landed, 1_000_000);

        let mut scratch = [0u8; 4];
        assert_eq!(streamer.read(&mut scratch).unwrap(), 0);
    }

    #[test]
    fn seek_before_zero_is_invalid_input() {
        let mut streamer = BlockReadStreamer::new(fixture());

        let from_cursor = streamer.seek(SeekFrom::Current(-1)).unwrap_err();
        assert_eq!(from_cursor.kind(), io::ErrorKind::InvalidInput);

        let from_end = streamer.seek(SeekFrom::End(-99_999)).unwrap_err();
        assert_eq!(from_end.kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn works_through_arc_dyn_blockread() {
        let shared: Arc<dyn BlockRead> = Arc::new(fixture());
        let mut streamer = BlockReadStreamer::new(shared);
        let mut collected = Vec::new();
        streamer.read_to_end(&mut collected).unwrap();
        assert_eq!(collected.len(), 32);
    }

    #[test]
    fn works_through_borrowed_reference() {
        let device = fixture();
        {
            let mut streamer = BlockReadStreamer::new(&device as &dyn BlockRead);
            let mut head = [0u8; 8];
            streamer.read_exact(&mut head).unwrap();
            assert_eq!(head, [0, 1, 2, 3, 4, 5, 6, 7]);
        }
        // The device is still usable once the borrowing streamer is gone.
        assert_eq!(device.size_bytes(), 32);
    }

    #[test]
    fn into_inner_returns_wrapped_device() {
        let device = BlockReadStreamer::new(fixture()).into_inner();
        assert_eq!(device.size_bytes(), 32);
    }

    #[test]
    fn get_ref_exposes_inner_without_consuming() {
        let streamer = BlockReadStreamer::new(fixture());
        assert_eq!(streamer.get_ref().size_bytes(), 32);
        assert_eq!(streamer.position(), 0);
    }

    #[test]
    fn error_from_inner_propagates_as_io_error() {
        let mut streamer = BlockReadStreamer::new(AlwaysFails);
        let mut scratch = [0u8; 8];
        let failure = streamer.read(&mut scratch).unwrap_err();
        assert_eq!(failure.kind(), io::ErrorKind::Other);
        assert!(failure.to_string().contains("simulated failure"));
    }

    #[test]
    fn offset_from_handles_add_sub_and_overflow() {
        assert_eq!(offset_from(10, 5).unwrap(), 15);
        assert_eq!(offset_from(10, -4).unwrap(), 6);
        assert_eq!(offset_from(10, 0).unwrap(), 10);

        let too_high = offset_from(u64::MAX, 1).unwrap_err();
        assert_eq!(too_high.kind(), io::ErrorKind::InvalidInput);

        let too_low = offset_from(3, -4).unwrap_err();
        assert_eq!(too_low.kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    fn error_mapping_covers_every_variant() {
        let wrapped = Error::Io(io::Error::new(io::ErrorKind::NotFound, "missing"));
        let passthrough = fs_core_error_to_io(wrapped);
        assert_eq!(passthrough.kind(), io::ErrorKind::NotFound);

        let short = fs_core_error_to_io(Error::ShortRead {
            offset: 4,
            want: 8,
            got: 2,
        });
        assert_eq!(short.kind(), io::ErrorKind::UnexpectedEof);
        assert!(short.to_string().contains("short read"));

        let out_of_bounds = fs_core_error_to_io(Error::OutOfBounds {
            offset: 16,
            len: 4,
            size: 8,
        });
        assert_eq!(out_of_bounds.kind(), io::ErrorKind::UnexpectedEof);

        let read_only = fs_core_error_to_io(Error::ReadOnly);
        assert_eq!(read_only.kind(), io::ErrorKind::PermissionDenied);

        let custom = fs_core_error_to_io(Error::Custom("boom".into()));
        assert_eq!(custom.kind(), io::ErrorKind::Other);
        assert!(custom.to_string().contains("boom"));
    }
}