Skip to main content

ntfs_core/usn/
reader.rs

//! Streaming iterator over USN journal records.
//!
//! For multi-GB `$UsnJrnl:$J` journals where loading everything into memory is
//! impractical, [`UsnJournalReader`] walks a `Read + Seek` source in 64 KiB
//! windows, skipping the zero-filled gaps the change journal leaves behind and
//! decoding each `USN_RECORD_V2`/`V3` it finds via the `crate::usn` parsers.
7
8use std::io::{Read, Seek, SeekFrom};
9
10use crate::error::Result;
11use crate::usn::{parse_usn_record_v2, parse_usn_record_v3, UsnRecord};
12
/// Size in bytes of the sliding read window (64 KiB).
const BUF_SIZE: usize = 64 * 1024;
14
/// Reads a little-endian `u32` at `offset`, yielding 0 if out of bounds.
///
/// "Out of bounds" covers both a range that extends past the end of `data`
/// and an `offset` so large that `offset + 4` would overflow `usize`; neither
/// case panics.
fn read_u32_le(data: &[u8], offset: usize) -> u32 {
    offset
        .checked_add(4)
        .and_then(|end| data.get(offset..end))
        .and_then(|bytes| <[u8; 4]>::try_from(bytes).ok())
        .map_or(0, u32::from_le_bytes)
}
23
/// Reads a little-endian `u16` at `offset`, yielding 0 if out of bounds.
///
/// "Out of bounds" covers both a range that extends past the end of `data`
/// and an `offset` so large that `offset + 2` would overflow `usize`; neither
/// case panics.
fn read_u16_le(data: &[u8], offset: usize) -> u16 {
    offset
        .checked_add(2)
        .and_then(|end| data.get(offset..end))
        .and_then(|bytes| <[u8; 2]>::try_from(bytes).ok())
        .map_or(0, u16::from_le_bytes)
}
32
/// Streaming iterator over USN records pulled from a `Read + Seek` source.
///
/// Intended for multi-GB journals where loading everything into memory is
/// impractical: only one fixed-size window of the source is held at a time.
pub struct UsnJournalReader<R: Read + Seek> {
    /// Underlying byte source.
    reader: R,
    /// Fixed-size window over the source; only `..buf_len` holds valid data.
    buf: Vec<u8>,
    /// Number of valid bytes currently held in `buf`.
    buf_len: usize,
    /// Index into `buf` of the next byte that has not been consumed yet.
    buf_offset: usize,
    /// Total number of bytes read from `reader` so far.
    stream_pos: u64,
    /// Length of the source as measured when the reader was created.
    total_size: u64,
    /// Latched once iteration has finished; `next` then always returns `None`.
    done: bool,
}
45
46impl<R: Read + Seek> UsnJournalReader<R> {
47    /// Creates a streaming reader, recording the source's total length.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`crate::error::NtfsError::Io`] if the initial seeks fail.
52    pub fn new(mut reader: R) -> Result<Self> {
53        let total_size = reader.seek(SeekFrom::End(0))?;
54        reader.seek(SeekFrom::Start(0))?;
55
56        Ok(Self {
57            reader,
58            buf: vec![0u8; BUF_SIZE],
59            buf_len: 0,
60            buf_offset: 0,
61            stream_pos: 0,
62            total_size,
63            done: false,
64        })
65    }
66
67    fn fill_buffer(&mut self) -> Result<bool> {
68        if self.stream_pos >= self.total_size {
69            self.done = true;
70            return Ok(false);
71        }
72
73        // Move unconsumed data to front
74        if self.buf_offset > 0 && self.buf_offset < self.buf_len {
75            let remaining = self.buf_len - self.buf_offset;
76            self.buf.copy_within(self.buf_offset..self.buf_len, 0);
77            self.buf_len = remaining;
78        } else {
79            self.buf_len = 0;
80        }
81        self.buf_offset = 0;
82
83        // Read more data
84        let space = BUF_SIZE - self.buf_len;
85        if space > 0 {
86            let Some(dst) = self.buf.get_mut(self.buf_len..self.buf_len + space) else {
87                self.done = true; // cov:unreachable: buf is a fixed BUF_SIZE vec and space = BUF_SIZE - buf_len ⇒ buf_len + space == BUF_SIZE, an always-valid range, so get_mut is always Some
88                return Ok(self.buf_len > 0); // cov:unreachable: see above — get_mut never returns None
89            };
90            let n = self.reader.read(dst)?;
91            if n == 0 {
92                self.done = true; // cov:unreachable: the stream_pos >= total_size early return dominates ⇒ read is never called past EOF, so a well-formed Read+Seek source never returns n == 0 here
93                return Ok(self.buf_len > 0); // cov:unreachable: see above — n == 0 branch not taken for a source whose seek(End) length matches readable bytes
94            }
95            self.buf_len += n;
96            self.stream_pos += n as u64;
97        } // cov:unreachable: records are capped at 65536 == BUF_SIZE, so a single fill always makes progress and the space == 0 (buffer-full) re-entry that skips the read block is never taken
98
99        Ok(true)
100    }
101
102    fn skip_zeros(&mut self) -> Result<bool> {
103        loop {
104            while self.buf_offset + 8 <= self.buf_len {
105                match self.buf.get(self.buf_offset..self.buf_offset + 8) {
106                    Some([0, 0, 0, 0, 0, 0, 0, 0]) => self.buf_offset += 8,
107                    _ => return Ok(true),
108                }
109            }
110            if !self.fill_buffer()? {
111                return Ok(false);
112            }
113            if self.buf_len == 0 {
114                return Ok(false); // cov:unreachable: fill_buffer returns Ok(true) only after buf_len becomes > 0, so once the `if !fill_buffer()?` check above passes buf_len is never 0
115            }
116        }
117    }
118}
119
120impl<R: Read + Seek> Iterator for UsnJournalReader<R> {
121    type Item = Result<UsnRecord>;
122
123    fn next(&mut self) -> Option<Self::Item> {
124        if self.done {
125            return None;
126        }
127
128        // Ensure we have data
129        if self.buf_offset >= self.buf_len {
130            match self.fill_buffer() {
131                Ok(true) => {}
132                Ok(false) => return None,
133                Err(e) => return Some(Err(e)),
134            }
135        }
136
137        // Skip zero-filled regions
138        match self.skip_zeros() {
139            Ok(true) => {}
140            Ok(false) => return None,
141            Err(e) => return Some(Err(e)),
142        }
143
144        // Need at least 8 bytes for record length + version
145        if self.buf_offset + 8 > self.buf_len {
146            match self.fill_buffer() /* cov:unreachable: skip_zeros returns Ok(true) only from its inner arm where buf_offset + 8 <= buf_len holds, so this header-refill block is never entered */ {
147                Ok(true) if self.buf_offset + 8 <= self.buf_len => {} // cov:unreachable: see above — the enclosing if is never entered
148                _ => return None, // cov:unreachable: see above — the enclosing if is never entered
149            }
150        }
151
152        let record_len = read_u32_le(&self.buf, self.buf_offset) as usize;
153
154        if !(8..=65536).contains(&record_len) {
155            self.buf_offset += 8;
156            return self.next();
157        }
158
159        // Ensure we have the full record in buffer
160        if self.buf_offset + record_len > self.buf_len {
161            match self.fill_buffer() {
162                Ok(true) if self.buf_offset + record_len <= self.buf_len => {}
163                _ => {
164                    self.buf_offset += 8;
165                    return self.next();
166                }
167            }
168        }
169
170        let version = read_u16_le(&self.buf, self.buf_offset + 4);
171
172        let Some(record_data) = self.buf.get(self.buf_offset..self.buf_offset + record_len) else {
173            self.buf_offset += 8; // cov:unreachable: the buf_offset + record_len <= buf_len (<= BUF_SIZE == buf.len()) check above dominates, so this get is always Some
174            return self.next(); // cov:unreachable: see above — the None branch is never taken
175        };
176        let record_data = record_data.to_vec();
177        let aligned = (record_len + 7) & !7;
178        self.buf_offset += aligned;
179
180        match version {
181            2 => match parse_usn_record_v2(&record_data) {
182                Ok(r) => Some(Ok(r)),
183                Err(_) => self.next(),
184            },
185            3 => match parse_usn_record_v3(&record_data) {
186                Ok(r) => Some(Ok(r)),
187                Err(_) => self.next(),
188            },
189            _ => self.next(),
190        }
191    }
192}
193
194#[cfg(test)]
195mod tests {
196    use super::*;
197    use crate::usn::UsnReason;
198    use std::io::Cursor;
199
200    fn build_v2_record_bytes(
201        entry: u64,
202        seq: u16,
203        parent: u64,
204        parent_seq: u16,
205        reason: u32,
206        name: &str,
207    ) -> Vec<u8> {
208        let name_utf16: Vec<u16> = name.encode_utf16().collect();
209        let name_bytes_len = name_utf16.len() * 2;
210        let record_len = 0x3C + name_bytes_len;
211        let aligned_len = (record_len + 7) & !7;
212        let mut buf = vec![0u8; aligned_len];
213        buf[0..4].copy_from_slice(&(record_len as u32).to_le_bytes());
214        buf[4..6].copy_from_slice(&2u16.to_le_bytes());
215        let file_ref = entry | (u64::from(seq) << 48);
216        buf[0x08..0x10].copy_from_slice(&file_ref.to_le_bytes());
217        let parent_ref = parent | (u64::from(parent_seq) << 48);
218        buf[0x10..0x18].copy_from_slice(&parent_ref.to_le_bytes());
219        buf[0x18..0x20].copy_from_slice(&100i64.to_le_bytes());
220        let ts: i64 = 133_500_480_000_000_000;
221        buf[0x20..0x28].copy_from_slice(&ts.to_le_bytes());
222        buf[0x28..0x2C].copy_from_slice(&reason.to_le_bytes());
223        buf[0x34..0x38].copy_from_slice(&0x20u32.to_le_bytes());
224        buf[0x38..0x3A].copy_from_slice(&(name_bytes_len as u16).to_le_bytes());
225        buf[0x3A..0x3C].copy_from_slice(&0x3Cu16.to_le_bytes());
226        for (i, &ch) in name_utf16.iter().enumerate() {
227            let off = 0x3C + i * 2;
228            buf[off..off + 2].copy_from_slice(&ch.to_le_bytes());
229        }
230        buf
231    }
232
233    #[test]
234    fn test_streaming_reader_basic() {
235        let r = build_v2_record_bytes(100, 1, 5, 5, 0x100, "test.txt");
236        let cursor = Cursor::new(r);
237        let reader = UsnJournalReader::new(cursor).unwrap();
238        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
239        assert_eq!(records.len(), 1);
240        assert_eq!(records[0].filename, "test.txt");
241    }
242
243    #[test]
244    fn test_streaming_reader_skips_zeros() {
245        let mut data = vec![0u8; 4096];
246        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "found.txt"));
247        let cursor = Cursor::new(data);
248        let reader = UsnJournalReader::new(cursor).unwrap();
249        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
250        assert_eq!(records.len(), 1);
251        assert_eq!(records[0].filename, "found.txt");
252    }
253
254    #[test]
255    fn test_streaming_reader_multiple() {
256        let mut data = Vec::new();
257        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "a.txt"));
258        data.extend_from_slice(&build_v2_record_bytes(200, 1, 100, 1, 0x200, "b.txt"));
259        data.extend_from_slice(&build_v2_record_bytes(300, 1, 100, 1, 0x100, "c.txt"));
260        let cursor = Cursor::new(data);
261        let reader = UsnJournalReader::new(cursor).unwrap();
262        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
263        assert_eq!(records.len(), 3);
264    }
265
266    #[test]
267    fn test_streaming_reader_empty_data() {
268        let cursor = Cursor::new(Vec::<u8>::new());
269        let reader = UsnJournalReader::new(cursor).unwrap();
270        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
271        assert_eq!(records.len(), 0);
272    }
273
274    #[test]
275    fn test_streaming_reader_all_zeros() {
276        let data = vec![0u8; 4096];
277        let cursor = Cursor::new(data);
278        let reader = UsnJournalReader::new(cursor).unwrap();
279        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
280        assert_eq!(records.len(), 0);
281    }
282
283    #[test]
284    fn test_streaming_reader_includes_close_only() {
285        let data = build_v2_record_bytes(100, 1, 5, 5, 0x8000_0000, "closed.txt");
286        let cursor = Cursor::new(data);
287        let reader = UsnJournalReader::new(cursor).unwrap();
288        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
289        assert_eq!(records.len(), 1);
290        assert_eq!(records[0].reason, UsnReason::CLOSE);
291    }
292
293    #[test]
294    fn test_streaming_reader_invalid_record_length() {
295        // Record with invalid length (too small) should be skipped
296        let mut data = vec![0u8; 64];
297        data[0..4].copy_from_slice(&3u32.to_le_bytes()); // length < 8
298        data[4..6].copy_from_slice(&2u16.to_le_bytes());
299        // Rest is zeros, reader will skip
300
301        let cursor = Cursor::new(data);
302        let reader = UsnJournalReader::new(cursor).unwrap();
303        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
304        assert_eq!(records.len(), 0);
305    }
306
307    #[test]
308    fn test_streaming_reader_invalid_then_valid() {
309        let mut data = vec![0u8; 16]; // some garbage that looks non-zero
310        data[0..4].copy_from_slice(&5u32.to_le_bytes()); // invalid length
311        data[4..6].copy_from_slice(&99u16.to_le_bytes()); // invalid version
312                                                          // Pad to 8-byte boundary for skipping
313        data.resize(16, 0);
314        // Now add zeros then a valid record
315        data.extend_from_slice(&[0u8; 64]);
316        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "valid.txt"));
317
318        let cursor = Cursor::new(data);
319        let reader = UsnJournalReader::new(cursor).unwrap();
320        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
321        assert_eq!(records.len(), 1);
322        assert_eq!(records[0].filename, "valid.txt");
323    }
324
325    #[test]
326    fn test_streaming_reader_unknown_version() {
327        // Record with valid length but unknown version
328        let mut data = vec![0u8; 0x40];
329        data[0..4].copy_from_slice(&(0x40u32).to_le_bytes());
330        data[4..6].copy_from_slice(&99u16.to_le_bytes()); // version 99
331
332        let cursor = Cursor::new(data);
333        let reader = UsnJournalReader::new(cursor).unwrap();
334        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
335        assert_eq!(records.len(), 0);
336    }
337
338    fn build_v3_record_bytes(entry: u64, parent: u64, reason: u32, name: &str) -> Vec<u8> {
339        let name_utf16: Vec<u16> = name.encode_utf16().collect();
340        let name_bytes_len = name_utf16.len() * 2;
341        let record_len = 0x4C + name_bytes_len;
342        let aligned_len = (record_len + 7) & !7;
343        let mut buf = vec![0u8; aligned_len];
344
345        buf[0..4].copy_from_slice(&(record_len as u32).to_le_bytes());
346        buf[4..6].copy_from_slice(&3u16.to_le_bytes());
347        buf[6..8].copy_from_slice(&0u16.to_le_bytes());
348        buf[0x08..0x18].copy_from_slice(&u128::from(entry).to_le_bytes());
349        buf[0x18..0x28].copy_from_slice(&u128::from(parent).to_le_bytes());
350        buf[0x28..0x30].copy_from_slice(&200i64.to_le_bytes());
351        let ts: i64 = 133_500_480_000_000_000;
352        buf[0x30..0x38].copy_from_slice(&ts.to_le_bytes());
353        buf[0x38..0x3C].copy_from_slice(&reason.to_le_bytes());
354        buf[0x44..0x48].copy_from_slice(&0x20u32.to_le_bytes());
355        buf[0x48..0x4A].copy_from_slice(&(name_bytes_len as u16).to_le_bytes());
356        buf[0x4A..0x4C].copy_from_slice(&0x4Cu16.to_le_bytes());
357        for (i, &ch) in name_utf16.iter().enumerate() {
358            let off = 0x4C + i * 2;
359            buf[off..off + 2].copy_from_slice(&ch.to_le_bytes());
360        }
361        buf
362    }
363
364    #[test]
365    fn test_streaming_reader_v3_record() {
366        let data = build_v3_record_bytes(100, 5, 0x100, "v3file.txt");
367        let cursor = Cursor::new(data);
368        let reader = UsnJournalReader::new(cursor).unwrap();
369        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
370        assert_eq!(records.len(), 1);
371        assert_eq!(records[0].filename, "v3file.txt");
372        assert_eq!(records[0].major_version, 3);
373    }
374
375    #[test]
376    fn test_streaming_reader_v3_close_only_included() {
377        let data = build_v3_record_bytes(100, 5, 0x8000_0000, "closed_v3.txt");
378        let cursor = Cursor::new(data);
379        let reader = UsnJournalReader::new(cursor).unwrap();
380        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
381        assert_eq!(records.len(), 1);
382        assert_eq!(records[0].reason, UsnReason::CLOSE);
383    }
384
385    #[test]
386    fn test_streaming_reader_large_zero_gap() {
387        // Large zero region followed by a valid record
388        let mut data = vec![0u8; 128 * 1024]; // 128KB of zeros (larger than buffer)
389        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "deep.txt"));
390
391        let cursor = Cursor::new(data);
392        let reader = UsnJournalReader::new(cursor).unwrap();
393        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
394        assert_eq!(records.len(), 1);
395        assert_eq!(records[0].filename, "deep.txt");
396    }
397
398    #[test]
399    fn test_streaming_reader_record_larger_than_initial_buffer_fill() {
400        // Record at offset 0 where the buffer needs to be filled
401        let record = build_v2_record_bytes(42, 3, 5, 5, 0x100, "buffer_test.txt");
402        let cursor = Cursor::new(record);
403        let reader = UsnJournalReader::new(cursor).unwrap();
404        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
405        assert_eq!(records.len(), 1);
406        assert_eq!(records[0].mft_entry, 42);
407        assert_eq!(records[0].mft_sequence, 3);
408    }
409
410    #[test]
411    fn test_streaming_reader_record_too_large() {
412        // A record that claims to be 65537 bytes (> 65536 max) should be skipped
413        let mut data = vec![0u8; 128];
414        data[0..4].copy_from_slice(&(65537u32).to_le_bytes());
415        data[4..6].copy_from_slice(&2u16.to_le_bytes());
416
417        let cursor = Cursor::new(data);
418        let reader = UsnJournalReader::new(cursor).unwrap();
419        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
420        assert_eq!(records.len(), 0);
421    }
422
423    #[test]
424    fn test_streaming_reader_mixed_v2_v3() {
425        let mut data = Vec::new();
426        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "v2.txt"));
427        data.extend_from_slice(&build_v3_record_bytes(200, 5, 0x200, "v3.txt"));
428        data.extend_from_slice(&build_v2_record_bytes(300, 1, 5, 5, 0x100, "v2b.txt"));
429
430        let cursor = Cursor::new(data);
431        let reader = UsnJournalReader::new(cursor).unwrap();
432        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
433        assert_eq!(records.len(), 3);
434        assert_eq!(records[0].major_version, 2);
435        assert_eq!(records[1].major_version, 3);
436        assert_eq!(records[2].major_version, 2);
437    }
438
439    #[test]
440    fn test_streaming_reader_fill_buffer_with_unconsumed_data() {
441        // Create data that spans multiple buffer fills.
442        // First, fill most of the 64KB buffer with valid records, then add
443        // a record that straddles the buffer boundary.
444        // This triggers the fill_buffer path where buf_offset > 0 && buf_offset < buf_len,
445        // meaning unconsumed data needs to be moved to front of buffer.
446        let mut data = Vec::new();
447        let record_size;
448        {
449            let sample = build_v2_record_bytes(1, 1, 5, 5, 0x100, "sample.txt");
450            record_size = sample.len();
451        }
452
453        // Fill just under 64KB with records, then add zeros, then another record
454        // that will require a buffer refill with leftover data
455        let num_records_to_fill = (BUF_SIZE - record_size) / record_size;
456        for i in 0..num_records_to_fill {
457            data.extend_from_slice(&build_v2_record_bytes(
458                (i + 1) as u64,
459                1,
460                5,
461                5,
462                0x100,
463                &format!("f{i:04}.txt"),
464            ));
465        }
466
467        // Add a few zeros to push the next record across the buffer boundary
468        let remaining = BUF_SIZE - (num_records_to_fill * record_size);
469        if remaining > 0 && remaining < record_size {
470            data.extend_from_slice(&vec![0u8; remaining]); // cov:unreachable: for this fixture's record_size, BUF_SIZE divides evenly so remaining == 0 and the padding branch is never taken
471        }
472
473        // Add more records after the boundary
474        for i in 0..5 {
475            data.extend_from_slice(&build_v2_record_bytes(
476                (num_records_to_fill + i + 1) as u64,
477                1,
478                5,
479                5,
480                0x100,
481                &format!("after{i}.txt"),
482            ));
483        }
484
485        let cursor = Cursor::new(data);
486        let reader = UsnJournalReader::new(cursor).unwrap();
487        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
488        // Should find all records from both sides of the buffer boundary
489        assert!(records.len() >= num_records_to_fill + 5);
490    }
491
492    #[test]
493    fn test_streaming_reader_record_at_exact_buffer_boundary() {
494        // Place records such that one record ends exactly at the buffer boundary
495        // and the next starts exactly at the next fill.
496        let sample = build_v2_record_bytes(1, 1, 5, 5, 0x100, "sample.txt");
497        let record_size = sample.len();
498
499        let mut data = Vec::new();
500        // Calculate how many records fit exactly in the buffer
501        let records_per_buffer = BUF_SIZE / record_size;
502        let exact_fill = records_per_buffer * record_size;
503
504        // Fill exactly to the buffer size
505        for i in 0..records_per_buffer {
506            data.extend_from_slice(&build_v2_record_bytes(
507                (i + 1) as u64,
508                1,
509                5,
510                5,
511                0x100,
512                "exact.txt",
513            ));
514        }
515
516        // Pad to exactly BUF_SIZE if needed
517        if exact_fill < BUF_SIZE {
518            data.extend_from_slice(&vec![0u8; BUF_SIZE - exact_fill]);
519        }
520
521        // Add one more record that starts at the exact boundary
522        data.extend_from_slice(&build_v2_record_bytes(
523            (records_per_buffer + 1) as u64,
524            1,
525            5,
526            5,
527            0x100,
528            "boundary.txt",
529        ));
530
531        let cursor = Cursor::new(data);
532        let reader = UsnJournalReader::new(cursor).unwrap();
533        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
534        // The last record "boundary.txt" should be found
535        assert!(records.iter().any(|r| r.filename == "boundary.txt"));
536    }
537
538    #[test]
539    fn test_streaming_reader_record_straddles_buffer() {
540        // Create data where a record starts in one buffer fill and extends
541        // into the next fill. This tests the refill path where
542        // buf_offset + record_len > buf_len triggers fill_buffer.
543        let sample = build_v2_record_bytes(1, 1, 5, 5, 0x100, "sample.txt");
544        let record_size = sample.len();
545
546        let mut data = Vec::new();
547        // Fill most of the buffer
548        let records_to_fill = (BUF_SIZE / record_size) - 1;
549        for i in 0..records_to_fill {
550            data.extend_from_slice(&build_v2_record_bytes(
551                (i + 1) as u64,
552                1,
553                5,
554                5,
555                0x100,
556                "fill.txt",
557            ));
558        }
559
560        let current_len = data.len();
561        // Add zeros to position us near the end of the buffer
562        // Leave less than record_size bytes before the boundary
563        let padding = BUF_SIZE - current_len - (record_size / 2);
564        if padding > 0 {
565            data.extend_from_slice(&vec![0u8; padding]);
566        }
567
568        // Now add a record that will straddle the buffer boundary
569        data.extend_from_slice(&build_v2_record_bytes(999, 1, 5, 5, 0x100, "straddle.txt"));
570
571        // Add trailing data
572        data.extend_from_slice(&vec![0u8; 256]);
573
574        let cursor = Cursor::new(data);
575        let reader = UsnJournalReader::new(cursor).unwrap();
576        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
577        // The straddling record should be found
578        assert!(records.iter().any(|r| r.filename == "straddle.txt"));
579    }
580
581    #[test]
582    fn test_streaming_reader_data_larger_than_buffer() {
583        // Create data significantly larger than the 64KB buffer to ensure
584        // multiple fill_buffer cycles work correctly
585        let mut data = Vec::new();
586        let total_records = 2000; // Each ~80 bytes = ~160KB > 64KB buffer
587        for i in 0..total_records {
588            data.extend_from_slice(&build_v2_record_bytes(
589                (i + 1) as u64,
590                1,
591                5,
592                5,
593                0x100,
594                &format!("r{i:04}.txt"),
595            ));
596        }
597
598        let cursor = Cursor::new(data);
599        let reader = UsnJournalReader::new(cursor).unwrap();
600        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
601        assert_eq!(records.len(), total_records);
602    }
603
604    // ─── Coverage tests for uncovered lines ────────────────────────────
605
606    /// A reader that yields data from an inner buffer, but returns an IO error
607    /// after a configurable number of successful reads.
608    struct ErrorAfterNReads {
609        data: Cursor<Vec<u8>>,
610        reads_remaining: usize,
611    }
612
613    impl ErrorAfterNReads {
614        fn new(data: Vec<u8>, successful_reads: usize) -> Self {
615            Self {
616                data: Cursor::new(data),
617                reads_remaining: successful_reads,
618            }
619        }
620    }
621
622    impl Read for ErrorAfterNReads {
623        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
624            if self.reads_remaining == 0 {
625                return Err(std::io::Error::other("simulated read error"));
626            }
627            self.reads_remaining -= 1;
628            self.data.read(buf)
629        }
630    }
631
632    impl Seek for ErrorAfterNReads {
633        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
634            self.data.seek(pos)
635        }
636    }
637
638    /// A reader that returns data in tiny chunks, simulating slow/partial reads.
639    /// After all data is consumed, `read()` returns 0 (EOF), which triggers
640    /// lines 61-62 (self.done = true; return `Ok(self.buf_len` > 0)).
641    struct TinyChunkReader {
642        data: Vec<u8>,
643        pos: u64,
644        chunk_size: usize,
645    }
646
647    impl TinyChunkReader {
648        fn new(data: Vec<u8>, chunk_size: usize) -> Self {
649            Self {
650                data,
651                pos: 0,
652                chunk_size,
653            }
654        }
655    }
656
657    impl Read for TinyChunkReader {
658        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
659            let remaining = self.data.len() - self.pos as usize;
660            if remaining == 0 {
661                return Ok(0); // cov:unreachable: UsnJournalReader::fill_buffer stops at stream_pos >= total_size before reading past EOF, so read() is never called with the data exhausted
662            }
663            let to_read = buf.len().min(self.chunk_size).min(remaining);
664            let start = self.pos as usize;
665            buf[..to_read].copy_from_slice(&self.data[start..start + to_read]);
666            self.pos += to_read as u64;
667            Ok(to_read)
668        }
669    }
670
671    impl Seek for TinyChunkReader {
672        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
673            match pos {
674                SeekFrom::Start(n) => self.pos = n,
675                SeekFrom::End(n) => self.pos = (self.data.len() as i64 + n) as u64,
676                SeekFrom::Current(n) => self.pos = (self.pos as i64 + n) as u64, // cov:unreachable: UsnJournalReader only issues SeekFrom::End(0) and SeekFrom::Start(0), never Current, so this arm is never taken
677            }
678            Ok(self.pos)
679        }
680    }
681
682    #[test]
683    fn test_streaming_reader_done_flag_returns_none() {
684        // Covers line 95: if self.done { return None; }
685        // After consuming all records, the reader sets done=true.
686        // The next call to next() should immediately return None.
687        let record = build_v2_record_bytes(100, 1, 5, 5, 0x100, "done.txt");
688        let cursor = Cursor::new(record);
689        let mut reader = UsnJournalReader::new(cursor).unwrap();
690
691        // Consume the one record
692        let first = reader.next();
693        assert!(first.is_some());
694        assert!(first.unwrap().is_ok());
695
696        // Now done should be set, and next() returns None (line 95)
697        let second = reader.next();
698        assert!(second.is_none());
699
700        // Call again to confirm it stays None
701        let third = reader.next();
702        assert!(third.is_none());
703    }
704
705    #[test]
706    fn test_streaming_reader_fill_buffer_error_propagation() {
707        // Covers line 103: Err(e) => return Some(Err(e))
708        // The first fill_buffer call in next() triggers an IO error.
709        // We use ErrorAfterNReads with 1 successful read (for the constructor's
710        // seek operations) and then fail on the actual data read.
711        let record = build_v2_record_bytes(100, 1, 5, 5, 0x100, "err.txt");
712        let err_reader = ErrorAfterNReads::new(record, 0);
713        let mut reader = UsnJournalReader::new(err_reader).unwrap();
714
715        // The first next() call will try fill_buffer, which calls self.reader.read()
716        // That read will fail, propagating the error via line 103
717        let result = reader.next();
718        assert!(result.is_some());
719        let err = result.unwrap();
720        assert!(err.is_err());
721        assert!(err
722            .unwrap_err()
723            .to_string()
724            .contains("simulated read error"));
725    }
726
727    #[test]
728    fn test_streaming_reader_skip_zeros_error_propagation() {
729        // Covers line 111: Err(e) => return Some(Err(e)) in skip_zeros match
730        // We need data that starts with zeros (so skip_zeros is called and
731        // needs to fill_buffer), then the fill_buffer inside skip_zeros fails.
732        let mut data = vec![0u8; BUF_SIZE]; // Full buffer of zeros
733        data.extend_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]); // non-zero to prevent early EOF
734
735        // Allow 1 successful read (fills the zero buffer), then error on the
736        // refill inside skip_zeros
737        let err_reader = ErrorAfterNReads::new(data, 1);
738        let mut reader = UsnJournalReader::new(err_reader).unwrap();
739
740        let result = reader.next();
741        assert!(result.is_some());
742        let err = result.unwrap();
743        assert!(err.is_err());
744    }
745
746    #[test]
747    fn test_streaming_reader_eof_mid_fill_with_remaining_data() {
748        // Covers lines 61-62: self.done = true; return Ok(self.buf_len > 0)
749        // We need a reader where read() returns 0 (EOF) while there's still
750        // unconsumed data in the buffer. Using TinyChunkReader that returns
751        // small chunks and eventually returns 0.
752        let record = build_v2_record_bytes(42, 1, 5, 5, 0x100, "tiny.txt");
753        let data_len = record.len();
754        // Use a chunk size smaller than BUF_SIZE so multiple reads are needed
755        // to fill the buffer. After the data is exhausted, read returns 0.
756        let tiny_reader = TinyChunkReader::new(record, data_len);
757        let mut reader = UsnJournalReader::new(tiny_reader).unwrap();
758
759        let result = reader.next();
760        assert!(result.is_some());
761        let rec = result.unwrap().unwrap();
762        assert_eq!(rec.filename, "tiny.txt");
763    }
764
765    #[test]
766    fn test_streaming_reader_eof_mid_fill_no_remaining_data() {
767        // Also covers lines 61-62 with buf_len == 0 (returns Ok(false))
768        // An empty reader where read() immediately returns 0
769        let tiny_reader = TinyChunkReader::new(Vec::new(), 1);
770        let mut reader = UsnJournalReader::new(tiny_reader).unwrap();
771
772        let result = reader.next();
773        assert!(result.is_none());
774    }
775
776    #[test]
777    fn test_streaming_reader_header_refill_insufficient() {
778        // Covers lines 116-118: fill_buffer for header but still < 8 bytes
779        // We need a situation where after skip_zeros, buf_offset + 8 > buf_len,
780        // and fill_buffer can't provide enough data.
781        // Create data: zeros (to fill most of buffer), then 4 non-zero bytes at the
782        // very end. After skip_zeros consumes the zeros, we have <8 bytes of non-zero
783        // data, and fill_buffer returns Ok(true) but buf_offset + 8 > buf_len.
784        let mut data = vec![0u8; BUF_SIZE - 4]; // zeros filling most of buffer
785        data.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]); // 4 non-zero bytes at end
786
787        let cursor = Cursor::new(data);
788        let mut reader = UsnJournalReader::new(cursor).unwrap();
789
790        // skip_zeros will skip all the zeros and land at the 4 non-zero bytes.
791        // buf_offset + 8 > buf_len, fill_buffer is called but there's no more data.
792        // Lines 116-118 trigger the `_ => return None` path.
793        let result = reader.next();
794        assert!(result.is_none());
795    }
796
797    #[test]
798    fn test_streaming_reader_record_refill_insufficient() {
799        // Covers lines 138-140: fill_buffer for full record fails
800        // We need a record header that claims a large record_len, but the data
801        // is truncated so fill_buffer can't provide the full record.
802        let mut data = vec![0u8; 16];
803        // Write a record header claiming 1024 bytes, version 2
804        data[0..4].copy_from_slice(&(1024u32).to_le_bytes());
805        data[4..6].copy_from_slice(&2u16.to_le_bytes());
806        // But we only have 16 bytes total - fill_buffer can't get 1024 bytes.
807        // After failing, lines 138-140 skip 8 bytes and try next().
808
809        let cursor = Cursor::new(data);
810        let mut reader = UsnJournalReader::new(cursor).unwrap();
811
812        let result = reader.next();
813        assert!(result.is_none());
814    }
815
816    #[test]
817    fn test_streaming_reader_v2_parse_error_skips() {
818        // Covers line 155: Err(_) => self.next() for V2 parse error
819        // Create a record with valid length and version=2 but invalid internal data
820        // that causes parse_usn_record_v2 to fail, followed by a valid record.
821        let mut data = Vec::new();
822
823        // A record with record_len=0x20 (32 bytes) and version=2.
824        // The reader accepts record_len in 8..=65536, so 0x20 passes.
825        // But parse_usn_record_v2 requires record_len >= USN_V2_MIN_SIZE (0x3C=60),
826        // so 0x20 < 0x3C causes the parser to return Err, triggering line 155.
827        let mut bad_v2 = vec![0u8; 0x20]; // 32 bytes
828        bad_v2[0..4].copy_from_slice(&(0x20u32).to_le_bytes()); // record_len = 32
829        bad_v2[4..6].copy_from_slice(&2u16.to_le_bytes()); // version 2
830                                                           // Parser will fail: 0x20 < USN_V2_MIN_SIZE (0x3C)
831        data.extend_from_slice(&bad_v2);
832
833        // Second: a valid record that should be parsed
834        data.extend_from_slice(&build_v2_record_bytes(
835            100,
836            1,
837            5,
838            5,
839            0x100,
840            "after_bad_v2.txt",
841        ));
842
843        let cursor = Cursor::new(data);
844        let reader = UsnJournalReader::new(cursor).unwrap();
845        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
846
847        assert_eq!(records.len(), 1);
848        assert_eq!(records[0].filename, "after_bad_v2.txt");
849    }
850
851    #[test]
852    fn test_streaming_reader_v3_parse_error_skips() {
853        // Covers line 159: Err(_) => self.next() for V3 parse error
854        // Same approach: record_len that passes reader check (8..=65536)
855        // but fails parser check (< USN_V3_MIN_SIZE = 0x4C = 76)
856        let mut data = Vec::new();
857
858        // Bad V3: record_len = 0x20 (32), version 3 -> parser fails (32 < 76)
859        let mut bad_v3 = vec![0u8; 0x20];
860        bad_v3[0..4].copy_from_slice(&(0x20u32).to_le_bytes());
861        bad_v3[4..6].copy_from_slice(&3u16.to_le_bytes()); // version 3
862        data.extend_from_slice(&bad_v3);
863
864        // Valid V2 record after
865        data.extend_from_slice(&build_v2_record_bytes(
866            200,
867            1,
868            5,
869            5,
870            0x200,
871            "after_bad_v3.txt",
872        ));
873
874        let cursor = Cursor::new(data);
875        let reader = UsnJournalReader::new(cursor).unwrap();
876        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
877
878        assert_eq!(records.len(), 1);
879        assert_eq!(records[0].filename, "after_bad_v3.txt");
880    }
881
882    #[test]
883    fn test_streaming_reader_skip_zeros_refill_then_find_data() {
884        // Covers line 72 (outer loop re-entry in skip_zeros) and line 84 (buf_len==0)
885        // Create data with more zeros than one buffer can hold, followed by a valid record.
886        // This forces skip_zeros to call fill_buffer multiple times via the outer loop.
887        let mut data = vec![0u8; BUF_SIZE * 2 + 512]; // >2 buffer fills of zeros
888        data.extend_from_slice(&build_v2_record_bytes(
889            100,
890            1,
891            5,
892            5,
893            0x100,
894            "after_many_zeros.txt",
895        ));
896
897        let cursor = Cursor::new(data);
898        let reader = UsnJournalReader::new(cursor).unwrap();
899        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
900
901        assert_eq!(records.len(), 1);
902        assert_eq!(records[0].filename, "after_many_zeros.txt");
903    }
904
905    #[test]
906    fn test_streaming_reader_skip_zeros_all_zeros_eof() {
907        // Covers line 84: return Ok(false) when buf_len == 0 after fill_buffer
908        // All data is zeros. After fill_buffer returns Ok(false) or buf_len drops to 0,
909        // skip_zeros returns Ok(false) via line 84.
910        let data = vec![0u8; BUF_SIZE + 100]; // slightly more than one buffer of zeros
911
912        let cursor = Cursor::new(data);
913        let reader = UsnJournalReader::new(cursor).unwrap();
914        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
915
916        assert_eq!(records.len(), 0);
917    }
918
919    #[test]
920    fn test_streaming_reader_record_straddles_buffer_refill_fails() {
921        // Covers lines 138-140 more thoroughly: record header is at the end of
922        // a buffer fill, claims a size that needs more data, but the data stream
923        // ends. fill_buffer succeeds (has some data) but record_len still > available.
924        let sample = build_v2_record_bytes(1, 1, 5, 5, 0x100, "fill.txt");
925        let record_size = sample.len();
926
927        let mut data = Vec::new();
928        // Fill most of one buffer with valid records
929        let records_to_fill = (BUF_SIZE / record_size) - 1;
930        for i in 0..records_to_fill {
931            data.extend_from_slice(&build_v2_record_bytes(
932                (i + 1) as u64,
933                1,
934                5,
935                5,
936                0x100,
937                "fill.txt",
938            ));
939        }
940
941        // Now add a truncated record: header claims 4096 bytes but only 16 are available
942        let current_len = data.len();
943        let remaining_in_buffer = BUF_SIZE - current_len;
944        // Pad with zeros to get near end of buffer
945        if remaining_in_buffer > 16 {
946            data.extend_from_slice(&vec![0u8; remaining_in_buffer - 16]);
947        }
948        // Add a non-zero record header that claims a large size
949        let mut truncated_header = vec![0u8; 16];
950        truncated_header[0..4].copy_from_slice(&(4096u32).to_le_bytes()); // claims 4096 bytes
951        truncated_header[4..6].copy_from_slice(&2u16.to_le_bytes()); // version 2
952        truncated_header[8] = 0xFF; // make it non-zero so skip_zeros doesn't skip it
953        data.extend_from_slice(&truncated_header);
954        // No more data after this - fill_buffer will get these 16 bytes but
955        // record_len (4096) > buf available, triggering lines 138-140
956
957        let cursor = Cursor::new(data);
958        let reader = UsnJournalReader::new(cursor).unwrap();
959        let records: Vec<_> = reader.filter_map(std::result::Result::ok).collect();
960
961        // Should have found the fill records but skipped the truncated one
962        assert!(records.len() >= records_to_fill);
963    }
964}