1use std::io::{Read, Seek, SeekFrom};
9
10use crate::error::Result;
11use crate::usn::{parse_usn_record_v2, parse_usn_record_v3, UsnRecord};
12
/// Size in bytes of the buffer used to stream data from the reader (64 KiB).
const BUF_SIZE: usize = 64 * 1024;

/// Reads a little-endian `u32` from `data` starting at byte `offset`.
///
/// Returns `0` when the four bytes are not entirely inside `data`. This
/// includes offsets so large that `offset + 4` would overflow `usize`; such
/// offsets are treated as out of range rather than panicking.
fn read_u32_le(data: &[u8], offset: usize) -> u32 {
    // `checked_add` keeps a huge offset from overflowing the range end.
    let bytes = offset
        .checked_add(4)
        .and_then(|end| data.get(offset..end));
    match bytes {
        Some(&[a, b, c, d]) => u32::from_le_bytes([a, b, c, d]),
        _ => 0,
    }
}
23
/// Reads a little-endian `u16` from `data` starting at byte `offset`.
///
/// Returns `0` when the two bytes are not entirely inside `data`. This
/// includes offsets so large that `offset + 2` would overflow `usize`; such
/// offsets are treated as out of range rather than panicking.
fn read_u16_le(data: &[u8], offset: usize) -> u16 {
    // `checked_add` keeps a huge offset from overflowing the range end.
    let bytes = offset
        .checked_add(2)
        .and_then(|end| data.get(offset..end));
    match bytes {
        Some(&[lo, hi]) => u16::from_le_bytes([lo, hi]),
        _ => 0,
    }
}
32
/// Buffered, forward-only reader over a seekable byte stream; iterating it
/// yields the USN records parsed from that stream.
pub struct UsnJournalReader<R>
where
    R: Read + Seek,
{
    /// Source stream the raw bytes are pulled from.
    reader: R,
    /// Scratch buffer holding the part of the stream currently being scanned.
    buf: Vec<u8>,
    /// Number of bytes at the front of `buf` that hold valid stream data.
    buf_len: usize,
    /// Index in `buf` of the next byte that has not been consumed yet.
    buf_offset: usize,
    /// Number of bytes pulled from `reader` so far.
    stream_pos: u64,
    /// Length of the stream in bytes, measured once at construction.
    total_size: u64,
    /// Set once the stream is exhausted; iteration yields nothing afterwards.
    done: bool,
}
45
46impl<R: Read + Seek> UsnJournalReader<R> {
47 pub fn new(mut reader: R) -> Result<Self> {
53 let total_size = reader.seek(SeekFrom::End(0))?;
54 reader.seek(SeekFrom::Start(0))?;
55
56 Ok(Self {
57 reader,
58 buf: vec![0u8; BUF_SIZE],
59 buf_len: 0,
60 buf_offset: 0,
61 stream_pos: 0,
62 total_size,
63 done: false,
64 })
65 }
66
67 fn fill_buffer(&mut self) -> Result<bool> {
68 if self.stream_pos >= self.total_size {
69 self.done = true;
70 return Ok(false);
71 }
72
73 if self.buf_offset > 0 && self.buf_offset < self.buf_len {
75 let remaining = self.buf_len - self.buf_offset;
76 self.buf.copy_within(self.buf_offset..self.buf_len, 0);
77 self.buf_len = remaining;
78 } else {
79 self.buf_len = 0;
80 }
81 self.buf_offset = 0;
82
83 let space = BUF_SIZE - self.buf_len;
85 if space > 0 {
86 let Some(dst) = self.buf.get_mut(self.buf_len..self.buf_len + space) else {
87 self.done = true; return Ok(self.buf_len > 0); };
90 let n = self.reader.read(dst)?;
91 if n == 0 {
92 self.done = true; return Ok(self.buf_len > 0); }
95 self.buf_len += n;
96 self.stream_pos += n as u64;
97 } Ok(true)
100 }
101
102 fn skip_zeros(&mut self) -> Result<bool> {
103 loop {
104 while self.buf_offset + 8 <= self.buf_len {
105 match self.buf.get(self.buf_offset..self.buf_offset + 8) {
106 Some([0, 0, 0, 0, 0, 0, 0, 0]) => self.buf_offset += 8,
107 _ => return Ok(true),
108 }
109 }
110 if !self.fill_buffer()? {
111 return Ok(false);
112 }
113 if self.buf_len == 0 {
114 return Ok(false); }
116 }
117 }
118}
119
120impl<R: Read + Seek> Iterator for UsnJournalReader<R> {
121 type Item = Result<UsnRecord>;
122
123 fn next(&mut self) -> Option<Self::Item> {
124 if self.done {
125 return None;
126 }
127
128 if self.buf_offset >= self.buf_len {
130 match self.fill_buffer() {
131 Ok(true) => {}
132 Ok(false) => return None,
133 Err(e) => return Some(Err(e)),
134 }
135 }
136
137 match self.skip_zeros() {
139 Ok(true) => {}
140 Ok(false) => return None,
141 Err(e) => return Some(Err(e)),
142 }
143
144 if self.buf_offset + 8 > self.buf_len {
146 match self.fill_buffer() {
147 Ok(true) if self.buf_offset + 8 <= self.buf_len => {} _ => return None, }
150 }
151
152 let record_len = read_u32_le(&self.buf, self.buf_offset) as usize;
153
154 if !(8..=65536).contains(&record_len) {
155 self.buf_offset += 8;
156 return self.next();
157 }
158
159 if self.buf_offset + record_len > self.buf_len {
161 match self.fill_buffer() {
162 Ok(true) if self.buf_offset + record_len <= self.buf_len => {}
163 _ => {
164 self.buf_offset += 8;
165 return self.next();
166 }
167 }
168 }
169
170 let version = read_u16_le(&self.buf, self.buf_offset + 4);
171
172 let Some(record_data) = self.buf.get(self.buf_offset..self.buf_offset + record_len) else {
173 self.buf_offset += 8; return self.next(); };
176 let record_data = record_data.to_vec();
177 let aligned = (record_len + 7) & !7;
178 self.buf_offset += aligned;
179
180 match version {
181 2 => match parse_usn_record_v2(&record_data) {
182 Ok(r) => Some(Ok(r)),
183 Err(_) => self.next(),
184 },
185 3 => match parse_usn_record_v3(&record_data) {
186 Ok(r) => Some(Ok(r)),
187 Err(_) => self.next(),
188 },
189 _ => self.next(),
190 }
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::usn::UsnReason;
    use std::io::Cursor;

    /// Timestamp value written into every synthetic record.
    const TEST_TIMESTAMP: i64 = 133_500_480_000_000_000;

    /// Copies `bytes` into `buf` starting at `offset`.
    fn put(buf: &mut [u8], offset: usize, bytes: &[u8]) {
        buf[offset..offset + bytes.len()].copy_from_slice(bytes);
    }

    /// Encodes `name` as UTF-16 little-endian bytes.
    fn utf16_le_bytes(name: &str) -> Vec<u8> {
        name.encode_utf16().flat_map(u16::to_le_bytes).collect()
    }

    /// Runs the reader over `data` and returns every successfully parsed record.
    fn read_all(data: Vec<u8>) -> Vec<UsnRecord> {
        UsnJournalReader::new(Cursor::new(data))
            .unwrap()
            .filter_map(std::result::Result::ok)
            .collect()
    }

    /// Returns `total_len` zero bytes with only a record length (bytes 0..4)
    /// and a version (bytes 4..6) filled in.
    fn stub_record(total_len: usize, record_len: u32, version: u16) -> Vec<u8> {
        let mut data = vec![0u8; total_len];
        put(&mut data, 0, &record_len.to_le_bytes());
        put(&mut data, 4, &version.to_le_bytes());
        data
    }

    /// Builds a version-2 record, zero-padded to a multiple of eight bytes.
    fn build_v2_record_bytes(
        entry: u64,
        seq: u16,
        parent: u64,
        parent_seq: u16,
        reason: u32,
        name: &str,
    ) -> Vec<u8> {
        const HEADER_LEN: usize = 0x3C;

        let name_bytes = utf16_le_bytes(name);
        let record_len = HEADER_LEN + name_bytes.len();
        let mut buf = vec![0u8; (record_len + 7) & !7];

        put(&mut buf, 0x00, &(record_len as u32).to_le_bytes());
        put(&mut buf, 0x04, &2u16.to_le_bytes());
        // References carry the entry in the low 48 bits and the sequence above.
        put(&mut buf, 0x08, &(entry | (u64::from(seq) << 48)).to_le_bytes());
        put(
            &mut buf,
            0x10,
            &(parent | (u64::from(parent_seq) << 48)).to_le_bytes(),
        );
        put(&mut buf, 0x18, &100i64.to_le_bytes());
        put(&mut buf, 0x20, &TEST_TIMESTAMP.to_le_bytes());
        put(&mut buf, 0x28, &reason.to_le_bytes());
        put(&mut buf, 0x34, &0x20u32.to_le_bytes());
        // Name length in bytes, then the offset at which the name starts.
        put(&mut buf, 0x38, &(name_bytes.len() as u16).to_le_bytes());
        put(&mut buf, 0x3A, &0x3Cu16.to_le_bytes());
        put(&mut buf, HEADER_LEN, &name_bytes);
        buf
    }

    /// Builds a version-3 record, zero-padded to a multiple of eight bytes.
    fn build_v3_record_bytes(entry: u64, parent: u64, reason: u32, name: &str) -> Vec<u8> {
        const HEADER_LEN: usize = 0x4C;

        let name_bytes = utf16_le_bytes(name);
        let record_len = HEADER_LEN + name_bytes.len();
        let mut buf = vec![0u8; (record_len + 7) & !7];

        put(&mut buf, 0x00, &(record_len as u32).to_le_bytes());
        put(&mut buf, 0x04, &3u16.to_le_bytes());
        put(&mut buf, 0x06, &0u16.to_le_bytes());
        // Version 3 stores both references as 128-bit values.
        put(&mut buf, 0x08, &u128::from(entry).to_le_bytes());
        put(&mut buf, 0x18, &u128::from(parent).to_le_bytes());
        put(&mut buf, 0x28, &200i64.to_le_bytes());
        put(&mut buf, 0x30, &TEST_TIMESTAMP.to_le_bytes());
        put(&mut buf, 0x38, &reason.to_le_bytes());
        put(&mut buf, 0x44, &0x20u32.to_le_bytes());
        // Name length in bytes, then the offset at which the name starts.
        put(&mut buf, 0x48, &(name_bytes.len() as u16).to_le_bytes());
        put(&mut buf, 0x4A, &0x4Cu16.to_le_bytes());
        put(&mut buf, HEADER_LEN, &name_bytes);
        buf
    }

    #[test]
    fn test_streaming_reader_basic() {
        let records = read_all(build_v2_record_bytes(100, 1, 5, 5, 0x100, "test.txt"));
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "test.txt");
    }

    #[test]
    fn test_streaming_reader_skips_zeros() {
        let mut data = vec![0u8; 4096];
        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "found.txt"));

        let records = read_all(data);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "found.txt");
    }

    #[test]
    fn test_streaming_reader_multiple() {
        let data = [
            build_v2_record_bytes(100, 1, 5, 5, 0x100, "a.txt"),
            build_v2_record_bytes(200, 1, 100, 1, 0x200, "b.txt"),
            build_v2_record_bytes(300, 1, 100, 1, 0x100, "c.txt"),
        ]
        .concat();

        assert_eq!(read_all(data).len(), 3);
    }

    #[test]
    fn test_streaming_reader_empty_data() {
        assert_eq!(read_all(Vec::new()).len(), 0);
    }

    #[test]
    fn test_streaming_reader_all_zeros() {
        assert_eq!(read_all(vec![0u8; 4096]).len(), 0);
    }

    #[test]
    fn test_streaming_reader_includes_close_only() {
        let records = read_all(build_v2_record_bytes(
            100,
            1,
            5,
            5,
            0x8000_0000,
            "closed.txt",
        ));
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].reason, UsnReason::CLOSE);
    }

    #[test]
    fn test_streaming_reader_invalid_record_length() {
        // A record length of 3 is below the 8-byte minimum.
        assert_eq!(read_all(stub_record(64, 3, 2)).len(), 0);
    }

    #[test]
    fn test_streaming_reader_invalid_then_valid() {
        // Bogus 16-byte header, a run of zeros, then a well-formed record.
        let mut data = stub_record(16, 5, 99);
        data.extend_from_slice(&[0u8; 64]);
        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "valid.txt"));

        let records = read_all(data);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "valid.txt");
    }

    #[test]
    fn test_streaming_reader_unknown_version() {
        assert_eq!(read_all(stub_record(0x40, 0x40, 99)).len(), 0);
    }

    #[test]
    fn test_streaming_reader_v3_record() {
        let records = read_all(build_v3_record_bytes(100, 5, 0x100, "v3file.txt"));
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "v3file.txt");
        assert_eq!(records[0].major_version, 3);
    }

    #[test]
    fn test_streaming_reader_v3_close_only_included() {
        let records = read_all(build_v3_record_bytes(100, 5, 0x8000_0000, "closed_v3.txt"));
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].reason, UsnReason::CLOSE);
    }

    #[test]
    fn test_streaming_reader_large_zero_gap() {
        let mut data = vec![0u8; 128 * 1024];
        data.extend_from_slice(&build_v2_record_bytes(100, 1, 5, 5, 0x100, "deep.txt"));

        let records = read_all(data);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "deep.txt");
    }

    #[test]
    fn test_streaming_reader_record_larger_than_initial_buffer_fill() {
        let records = read_all(build_v2_record_bytes(
            42,
            3,
            5,
            5,
            0x100,
            "buffer_test.txt",
        ));
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].mft_entry, 42);
        assert_eq!(records[0].mft_sequence, 3);
    }

    #[test]
    fn test_streaming_reader_record_too_large() {
        // 65537 is one byte above the largest accepted record length.
        assert_eq!(read_all(stub_record(128, 65537, 2)).len(), 0);
    }

    #[test]
    fn test_streaming_reader_mixed_v2_v3() {
        let data = [
            build_v2_record_bytes(100, 1, 5, 5, 0x100, "v2.txt"),
            build_v3_record_bytes(200, 5, 0x200, "v3.txt"),
            build_v2_record_bytes(300, 1, 5, 5, 0x100, "v2b.txt"),
        ]
        .concat();

        let records = read_all(data);
        assert_eq!(records.len(), 3);
        let versions: Vec<_> = records.iter().map(|r| r.major_version).collect();
        assert_eq!(versions[0], 2);
        assert_eq!(versions[1], 3);
        assert_eq!(versions[2], 2);
    }

    #[test]
    fn test_streaming_reader_fill_buffer_with_unconsumed_data() {
        let record_size = build_v2_record_bytes(1, 1, 5, 5, 0x100, "sample.txt").len();
        let num_records_to_fill = (BUF_SIZE - record_size) / record_size;

        let mut data = Vec::new();
        for i in 0..num_records_to_fill {
            let name = format!("f{i:04}.txt");
            data.extend_from_slice(&build_v2_record_bytes(
                (i + 1) as u64,
                1,
                5,
                5,
                0x100,
                &name,
            ));
        }

        // Pad with zeros only when the gap is too small for one more record.
        let remaining = BUF_SIZE - (num_records_to_fill * record_size);
        if (1..record_size).contains(&remaining) {
            data.resize(data.len() + remaining, 0);
        }

        for i in 0..5 {
            let name = format!("after{i}.txt");
            data.extend_from_slice(&build_v2_record_bytes(
                (num_records_to_fill + i + 1) as u64,
                1,
                5,
                5,
                0x100,
                &name,
            ));
        }

        let records = read_all(data);
        assert!(records.len() >= num_records_to_fill + 5);
    }

    #[test]
    fn test_streaming_reader_record_at_exact_buffer_boundary() {
        let record_size = build_v2_record_bytes(1, 1, 5, 5, 0x100, "sample.txt").len();
        let records_per_buffer = BUF_SIZE / record_size;
        let exact_fill = records_per_buffer * record_size;

        let mut data = Vec::new();
        for i in 0..records_per_buffer {
            data.extend_from_slice(&build_v2_record_bytes(
                (i + 1) as u64,
                1,
                5,
                5,
                0x100,
                "exact.txt",
            ));
        }

        // Zero-pad up to the buffer size so the next record starts right
        // after the first buffer's worth of data.
        if exact_fill < BUF_SIZE {
            data.resize(data.len() + (BUF_SIZE - exact_fill), 0);
        }

        data.extend_from_slice(&build_v2_record_bytes(
            (records_per_buffer + 1) as u64,
            1,
            5,
            5,
            0x100,
            "boundary.txt",
        ));

        let records = read_all(data);
        assert!(records.iter().any(|r| r.filename == "boundary.txt"));
    }

    #[test]
    fn test_streaming_reader_record_straddles_buffer() {
        let record_size = build_v2_record_bytes(1, 1, 5, 5, 0x100, "sample.txt").len();
        let records_to_fill = (BUF_SIZE / record_size) - 1;

        let mut data = Vec::new();
        for i in 0..records_to_fill {
            data.extend_from_slice(&build_v2_record_bytes(
                (i + 1) as u64,
                1,
                5,
                5,
                0x100,
                "fill.txt",
            ));
        }

        // Position the next record so it begins before the buffer-size mark
        // and ends after it.
        let current_len = data.len();
        let padding = BUF_SIZE - current_len - (record_size / 2);
        if padding > 0 {
            data.resize(current_len + padding, 0);
        }

        data.extend_from_slice(&build_v2_record_bytes(999, 1, 5, 5, 0x100, "straddle.txt"));
        data.extend_from_slice(&[0u8; 256]);

        let records = read_all(data);
        assert!(records.iter().any(|r| r.filename == "straddle.txt"));
    }

    #[test]
    fn test_streaming_reader_data_larger_than_buffer() {
        let total_records = 2000;

        let mut data = Vec::new();
        for i in 0..total_records {
            let name = format!("r{i:04}.txt");
            data.extend_from_slice(&build_v2_record_bytes(
                (i + 1) as u64,
                1,
                5,
                5,
                0x100,
                &name,
            ));
        }

        assert_eq!(read_all(data).len(), total_records);
    }

    /// Reader that serves a fixed number of reads from its data and fails
    /// every read after that.
    struct ErrorAfterNReads {
        data: Cursor<Vec<u8>>,
        reads_remaining: usize,
    }

    impl ErrorAfterNReads {
        fn new(data: Vec<u8>, successful_reads: usize) -> Self {
            Self {
                data: Cursor::new(data),
                reads_remaining: successful_reads,
            }
        }
    }

    impl Read for ErrorAfterNReads {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            if self.reads_remaining == 0 {
                return Err(std::io::Error::other("simulated read error"));
            }
            self.reads_remaining -= 1;
            self.data.read(buf)
        }
    }

    impl Seek for ErrorAfterNReads {
        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
            self.data.seek(pos)
        }
    }

    /// Reader that hands out at most `chunk_size` bytes per `read` call.
    struct TinyChunkReader {
        data: Vec<u8>,
        pos: u64,
        chunk_size: usize,
    }

    impl TinyChunkReader {
        fn new(data: Vec<u8>, chunk_size: usize) -> Self {
            Self {
                data,
                pos: 0,
                chunk_size,
            }
        }
    }

    impl Read for TinyChunkReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let start = self.pos as usize;
            let available = self.data.len() - start;
            // With nothing left `count` is 0, which signals end of stream.
            let count = buf.len().min(self.chunk_size).min(available);
            buf[..count].copy_from_slice(&self.data[start..start + count]);
            self.pos += count as u64;
            Ok(count)
        }
    }

    impl Seek for TinyChunkReader {
        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
            self.pos = match pos {
                SeekFrom::Start(n) => n,
                SeekFrom::End(n) => (self.data.len() as i64 + n) as u64,
                SeekFrom::Current(n) => (self.pos as i64 + n) as u64,
            };
            Ok(self.pos)
        }
    }

    #[test]
    fn test_streaming_reader_done_flag_returns_none() {
        let record = build_v2_record_bytes(100, 1, 5, 5, 0x100, "done.txt");
        let mut reader = UsnJournalReader::new(Cursor::new(record)).unwrap();

        let first = reader.next();
        assert!(first.is_some());
        assert!(first.unwrap().is_ok());

        // Once exhausted, the reader keeps returning `None`.
        assert!(reader.next().is_none());
        assert!(reader.next().is_none());
    }

    #[test]
    fn test_streaming_reader_fill_buffer_error_propagation() {
        let record = build_v2_record_bytes(100, 1, 5, 5, 0x100, "err.txt");
        // Zero successful reads: the very first read fails.
        let mut reader = UsnJournalReader::new(ErrorAfterNReads::new(record, 0)).unwrap();

        let result = reader.next();
        assert!(result.is_some());
        let err = result.unwrap();
        assert!(err.is_err());
        assert!(err
            .unwrap_err()
            .to_string()
            .contains("simulated read error"));
    }

    #[test]
    fn test_streaming_reader_skip_zeros_error_propagation() {
        // One buffer of zeros is read successfully; the read after it fails.
        let mut data = vec![0u8; BUF_SIZE];
        data.extend_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
        let mut reader = UsnJournalReader::new(ErrorAfterNReads::new(data, 1)).unwrap();

        let result = reader.next();
        assert!(result.is_some());
        let err = result.unwrap();
        assert!(err.is_err());
    }

    #[test]
    fn test_streaming_reader_eof_mid_fill_with_remaining_data() {
        let record = build_v2_record_bytes(42, 1, 5, 5, 0x100, "tiny.txt");
        let data_len = record.len();
        let mut reader = UsnJournalReader::new(TinyChunkReader::new(record, data_len)).unwrap();

        let result = reader.next();
        assert!(result.is_some());
        let rec = result.unwrap().unwrap();
        assert_eq!(rec.filename, "tiny.txt");
    }

    #[test]
    fn test_streaming_reader_eof_mid_fill_no_remaining_data() {
        let mut reader = UsnJournalReader::new(TinyChunkReader::new(Vec::new(), 1)).unwrap();
        assert!(reader.next().is_none());
    }

    #[test]
    fn test_streaming_reader_header_refill_insufficient() {
        // Exactly one buffer of data whose last four bytes are non-zero.
        let mut data = vec![0u8; BUF_SIZE - 4];
        data.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]);
        let mut reader = UsnJournalReader::new(Cursor::new(data)).unwrap();

        assert!(reader.next().is_none());
    }

    #[test]
    fn test_streaming_reader_record_refill_insufficient() {
        // The header claims 1024 bytes but only 16 exist.
        let mut reader = UsnJournalReader::new(Cursor::new(stub_record(16, 1024, 2))).unwrap();
        assert!(reader.next().is_none());
    }

    #[test]
    fn test_streaming_reader_v2_parse_error_skips() {
        let mut data = stub_record(0x20, 0x20, 2);
        data.extend_from_slice(&build_v2_record_bytes(
            100,
            1,
            5,
            5,
            0x100,
            "after_bad_v2.txt",
        ));

        let records = read_all(data);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "after_bad_v2.txt");
    }

    #[test]
    fn test_streaming_reader_v3_parse_error_skips() {
        let mut data = stub_record(0x20, 0x20, 3);
        data.extend_from_slice(&build_v2_record_bytes(
            200,
            1,
            5,
            5,
            0x200,
            "after_bad_v3.txt",
        ));

        let records = read_all(data);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "after_bad_v3.txt");
    }

    #[test]
    fn test_streaming_reader_skip_zeros_refill_then_find_data() {
        let mut data = vec![0u8; BUF_SIZE * 2 + 512];
        data.extend_from_slice(&build_v2_record_bytes(
            100,
            1,
            5,
            5,
            0x100,
            "after_many_zeros.txt",
        ));

        let records = read_all(data);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].filename, "after_many_zeros.txt");
    }

    #[test]
    fn test_streaming_reader_skip_zeros_all_zeros_eof() {
        assert_eq!(read_all(vec![0u8; BUF_SIZE + 100]).len(), 0);
    }

    #[test]
    fn test_streaming_reader_record_straddles_buffer_refill_fails() {
        let record_size = build_v2_record_bytes(1, 1, 5, 5, 0x100, "fill.txt").len();
        let records_to_fill = (BUF_SIZE / record_size) - 1;

        let mut data = Vec::new();
        for i in 0..records_to_fill {
            data.extend_from_slice(&build_v2_record_bytes(
                (i + 1) as u64,
                1,
                5,
                5,
                0x100,
                "fill.txt",
            ));
        }

        // Zero-pad so that the 16-byte truncated header ends at `BUF_SIZE`.
        let remaining_in_buffer = BUF_SIZE - data.len();
        if remaining_in_buffer > 16 {
            data.resize(data.len() + (remaining_in_buffer - 16), 0);
        }

        // The header claims 4096 bytes, but the data ends after 16.
        let mut truncated_header = stub_record(16, 4096, 2);
        truncated_header[8] = 0xFF;
        data.extend_from_slice(&truncated_header);

        let records = read_all(data);
        assert!(records.len() >= records_to_fill);
    }
}