1use crate::error::{NoxuLogError, Result};
8use crate::file_reader::{FileReader, LogFileAccess};
9use hashbrown::{HashMap, HashSet};
10use noxu_util::lsn::{Lsn, NULL_LSN};
11
/// Scans the last non-empty log file entry by entry and remembers where the
/// valid portion of the log ends.
///
/// While scanning, the reader records the offset of the last entry that was
/// read successfully, the offset immediately after it, and (for entry types
/// registered through `set_target_type`) the offset of the most recent entry
/// of each registered type.
pub struct LastFileReader<F: LogFileAccess> {
    /// Underlying entry reader that performs the actual file scanning.
    reader: FileReader<F>,

    /// Entry type codes registered through `set_target_type`; only these
    /// types are recorded in `last_offset_seen`.
    trackable_entries: HashSet<u8>,

    /// For each tracked entry type, the file offset of the most recent
    /// successfully read entry of that type.
    last_offset_seen: HashMap<u8, u64>,

    /// Offset just past the last successfully read entry (its offset plus
    /// its size); reported by `get_end_of_log`.
    next_unproven_offset: u64,

    /// Offset of the last entry that was read successfully.
    last_valid_offset: u64,

    /// Type code of the last successfully read entry for which a header was
    /// available.
    last_entry_type: u8,

    /// Number of the log file being scanned (chosen by
    /// `find_last_good_file`).
    file_num: u32,

    /// When `true`, a checksum failure triggers a forward scan for a commit
    /// entry; if one is found `read_next_entry` returns
    /// `NoxuLogError::FoundCommittedTxn` instead of reporting end of log.
    halt_on_commit_after_checksum: bool,
}
46
47impl<F: LogFileAccess> LastFileReader<F> {
48 pub fn new(file_access: F, read_buffer_size: usize) -> Result<Self> {
56 let (file_num, file_len) = Self::find_last_good_file(&file_access)?;
58
59 let start_lsn = Lsn::new(file_num, 0);
60 let end_of_file_lsn = Lsn::new(file_num, file_len as u32);
61
62 let reader = FileReader::new(
63 file_access,
64 true, start_lsn,
66 end_of_file_lsn,
67 NULL_LSN, read_buffer_size,
69 true, )?;
71
72 Ok(LastFileReader {
73 reader,
74 trackable_entries: HashSet::new(),
75 last_offset_seen: HashMap::new(),
76 next_unproven_offset: 0,
77 last_valid_offset: 0,
78 last_entry_type: 0,
79 file_num,
80 halt_on_commit_after_checksum: false,
81 })
82 }
83
84 fn find_last_good_file(file_access: &F) -> Result<(u32, u64)> {
88 let first_file = file_access.get_first_file_num().unwrap_or(0);
90
91 let mut current_file = first_file;
92 let mut last_good_file = None;
93
94 #[expect(clippy::while_let_loop)]
96 loop {
97 match file_access.get_file_length(current_file) {
98 Ok(len) => {
99 if len > 0 {
101 last_good_file = Some((current_file, len));
102 }
103
104 if let Some(next) =
106 file_access.get_following_file_num(current_file, true)
107 {
108 current_file = next;
109 } else {
110 break;
111 }
112 }
113 Err(_) => {
114 break;
116 }
117 }
118 }
119
120 last_good_file.ok_or_else(|| NoxuLogError::UnexpectedEof {
121 lsn: NULL_LSN,
122 message: "No valid log files found".to_string(),
123 })
124 }
125
126 pub fn set_target_type(&mut self, entry_type: u8) {
130 self.trackable_entries.insert(entry_type);
131 }
132
    /// Enables or disables the "halt on commit after checksum" behaviour.
    ///
    /// When enabled, a checksum failure in `read_next_entry` triggers a
    /// forward scan for a commit entry, and finding one is reported as
    /// `NoxuLogError::FoundCommittedTxn`. The flag is `false` on a newly
    /// created reader.
    pub fn set_halt_on_commit_after_checksum(&mut self, halt: bool) {
        self.halt_on_commit_after_checksum = halt;
    }
149
150 pub fn get_last_seen(&self, entry_type: u8) -> Lsn {
154 self.last_offset_seen
155 .get(&entry_type)
156 .map(|&offset| Lsn::new(self.file_num, offset as u32))
157 .unwrap_or(NULL_LSN)
158 }
159
160 pub fn get_end_of_log(&self) -> Lsn {
164 Lsn::new(self.file_num, self.next_unproven_offset as u32)
165 }
166
167 pub fn get_last_valid_lsn(&self) -> Lsn {
171 Lsn::new(self.file_num, self.last_valid_offset as u32)
172 }
173
    /// Returns the file offset of the last entry that was read successfully
    /// (0 before any entry has been read).
    pub fn get_prev_offset(&self) -> u64 {
        self.last_valid_offset
    }
178
    /// Returns the type code of the last successfully read entry whose
    /// header was available (0 before any entry has been read).
    pub fn get_entry_type(&self) -> u8 {
        self.last_entry_type
    }
183
184 pub fn read_next_entry(&mut self) -> Result<bool> {
191 let current_offset =
193 self.reader.get_current_entry_lsn().file_offset() as u64;
194 let _next_offset = current_offset; match self.reader.read_next_entry() {
198 Ok(found) => {
199 if found {
200 let lsn = self.reader.get_current_entry_lsn();
202 self.last_valid_offset = lsn.file_offset() as u64;
203 self.next_unproven_offset = self.last_valid_offset
204 + self.reader.get_last_entry_size() as u64;
205
206 if let Some(header) = self.reader.get_current_entry_header()
208 {
209 self.last_entry_type = header.entry_type;
210
211 if self.trackable_entries.contains(&header.entry_type) {
212 self.last_offset_seen.insert(
213 header.entry_type,
214 self.last_valid_offset,
215 );
216 }
217 }
218
219 Ok(true)
220 } else {
221 Ok(false)
223 }
224 }
225 Err(NoxuLogError::Checksum { lsn, .. }) => {
226 let corrupt_lsn = lsn;
246 if self.halt_on_commit_after_checksum {
247 log::warn!(
248 "LastFileReader: checksum failure at LSN {corrupt_lsn} \
249 during end-of-log scan; haltOnCommit enabled, \
250 scanning forward for committed txn"
251 );
252 if let Some(commit_lsn) = self.find_committed_txn()? {
253 return Err(NoxuLogError::FoundCommittedTxn {
254 corrupt_lsn,
255 commit_lsn,
256 });
257 }
258 }
259 Ok(false)
262 }
263 Err(e) => {
264 Err(e)
266 }
267 }
268 }
269
270 fn find_committed_txn(&mut self) -> Result<Option<Lsn>> {
284 let resume_offset = self.reader.next_entry_offset();
290 if self.reader.resume_forward_at(resume_offset).is_err() {
293 return Ok(None);
294 }
295
296 const LOG_TXN_COMMIT: u8 = 30;
298
299 loop {
300 match self.reader.read_next_entry() {
301 Ok(true) => {
302 if let Some(header) = self.reader.get_current_entry_header()
304 && header.entry_type == LOG_TXN_COMMIT
305 {
306 return Ok(Some(self.reader.get_current_entry_lsn()));
307 }
308 }
309 Ok(false) => return Ok(None),
311 Err(NoxuLogError::Checksum { .. })
314 | Err(NoxuLogError::UnexpectedEof { .. }) => {
315 return Ok(None);
316 }
317 Err(e) => return Err(e),
318 }
319 }
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326 use crate::checksum::ChecksumValidator;
327 use crate::entry_header::CHECKSUM_BYTES;
328 use crate::file_reader::LogEntryHeader as FrHeader;
329 use crate::file_reader::LogFileAccess;
330 use std::collections::HashMap;
331 use std::io;
332
333 fn build_valid_entry(entry_type: u8, payload: &[u8]) -> Vec<u8> {
338 let header_size = FrHeader::MIN_HEADER_SIZE;
339 let total = header_size + payload.len();
340 let mut buf = vec![0u8; total];
341 buf[4] = entry_type;
342 buf[5] = 0; buf[10..14].copy_from_slice(&(payload.len() as u32).to_le_bytes());
344 buf[header_size..].copy_from_slice(payload);
345 let crc = ChecksumValidator::compute_range(
346 &buf,
347 CHECKSUM_BYTES,
348 total - CHECKSUM_BYTES,
349 );
350 buf[0..4].copy_from_slice(&crc.to_le_bytes());
351 buf
352 }
353
354 fn build_corrupt_entry(entry_type: u8, payload: &[u8]) -> Vec<u8> {
358 let mut buf = build_valid_entry(entry_type, payload);
359 let last = buf.len() - 1;
361 buf[last] ^= 0xFF;
362 buf
363 }
364
    /// In-memory `LogFileAccess` implementation used by the tests.
    struct MockFileAccess {
        /// File contents keyed by file number.
        files: HashMap<u32, Vec<u8>>,
    }
369
370 impl MockFileAccess {
371 fn new() -> Self {
372 MockFileAccess { files: HashMap::new() }
373 }
374
375 fn add_file(&mut self, file_num: u32, data: Vec<u8>) {
376 self.files.insert(file_num, data);
377 }
378 }
379
380 impl LogFileAccess for MockFileAccess {
381 fn read_from_file(
382 &self,
383 file_num: u32,
384 offset: u64,
385 buf: &mut [u8],
386 ) -> Result<usize> {
387 if let Some(data) = self.files.get(&file_num) {
388 let start = offset as usize;
389 if start >= data.len() {
390 return Ok(0);
391 }
392 let end = (start + buf.len()).min(data.len());
393 let bytes_to_copy = end - start;
394 buf[..bytes_to_copy].copy_from_slice(&data[start..end]);
395 Ok(bytes_to_copy)
396 } else {
397 Err(io::Error::new(io::ErrorKind::NotFound, "File not found")
398 .into())
399 }
400 }
401
402 fn get_file_length(&self, file_num: u32) -> Result<u64> {
403 self.files.get(&file_num).map(|data| data.len() as u64).ok_or_else(
404 || {
405 io::Error::new(io::ErrorKind::NotFound, "File not found")
406 .into()
407 },
408 )
409 }
410
411 fn get_first_file_num(&self) -> Option<u32> {
412 self.files.keys().min().copied()
413 }
414
415 fn get_following_file_num(
416 &self,
417 file_num: u32,
418 forward: bool,
419 ) -> Option<u32> {
420 let mut file_nums: Vec<u32> = self.files.keys().copied().collect();
421 file_nums.sort();
422
423 if forward {
424 file_nums.iter().find(|&&n| n > file_num).copied()
425 } else {
426 file_nums.iter().rev().find(|&&n| n < file_num).copied()
427 }
428 }
429
430 fn get_file_header_prev_offset(&self, _file_num: u32) -> Result<u64> {
431 Ok(0)
432 }
433 }
434
435 #[test]
436 fn test_last_file_reader_creation() {
437 let mut mock = MockFileAccess::new();
438 mock.add_file(0, vec![0u8; 100]);
439
440 let result = LastFileReader::new(mock, 1024);
441 assert!(result.is_ok());
442 }
443
444 #[test]
445 fn test_find_last_good_file() {
446 let mut mock = MockFileAccess::new();
447 mock.add_file(0, vec![0u8; 100]);
448 mock.add_file(1, vec![0u8; 200]);
449 mock.add_file(2, vec![0u8; 50]);
450
451 let (file_num, len) =
452 LastFileReader::find_last_good_file(&mock).unwrap();
453 assert_eq!(file_num, 2);
454 assert_eq!(len, 50);
455 }
456
457 #[test]
458 fn test_last_file_reader_no_files() {
459 let mock = MockFileAccess::new();
460 let result = LastFileReader::new(mock, 1024);
461 assert!(result.is_err());
463 }
464
465 #[test]
466 fn test_last_file_reader_single_file() {
467 let mut mock = MockFileAccess::new();
468 mock.add_file(0, vec![0u8; 100]);
469 let result = LastFileReader::new(mock, 1024);
470 assert!(result.is_ok());
471 }
472
473 #[test]
474 fn test_find_last_good_file_single_file() {
475 let mut mock = MockFileAccess::new();
476 mock.add_file(0, vec![0u8; 42]);
477 let (file_num, len) =
478 LastFileReader::find_last_good_file(&mock).unwrap();
479 assert_eq!(file_num, 0);
480 assert_eq!(len, 42);
481 }
482
483 #[test]
484 fn test_find_last_good_file_empty_file_skipped() {
485 let mut mock = MockFileAccess::new();
486 mock.add_file(0, vec![0u8; 100]);
487 mock.add_file(1, vec![]);
489 let (file_num, len) =
490 LastFileReader::find_last_good_file(&mock).unwrap();
491 assert_eq!(file_num, 0);
492 assert_eq!(len, 100);
493 }
494
495 #[test]
496 fn test_last_file_reader_set_target_type() {
497 let mut mock = MockFileAccess::new();
498 mock.add_file(0, vec![0u8; 64]);
499 let mut reader = LastFileReader::new(mock, 64).unwrap();
500
501 reader.set_target_type(1);
502 reader.set_target_type(2);
503 assert!(reader.get_last_seen(1).is_null());
505 assert!(reader.get_last_seen(255).is_null());
506 }
507
508 #[test]
509 fn test_last_file_reader_initial_offsets() {
510 let mut mock = MockFileAccess::new();
511 mock.add_file(0, vec![0u8; 128]);
512 let reader = LastFileReader::new(mock, 64).unwrap();
513
514 assert_eq!(reader.get_prev_offset(), 0);
515 assert_eq!(reader.get_entry_type(), 0);
516 let eol = reader.get_end_of_log();
518 assert_eq!(eol.file_number(), 0);
519 }
520
521 #[test]
522 fn test_last_file_reader_read_entry() {
523 let mut mock = MockFileAccess::new();
525 mock.add_file(0, vec![0u8; 14]);
526 let mut reader = LastFileReader::new(mock, 64).unwrap();
527
528 let result = reader.read_next_entry();
529 assert!(matches!(result, Ok(true)));
530 assert_eq!(reader.get_entry_type(), 0);
531 }
532
533 #[test]
534 fn test_last_file_reader_read_entry_updates_valid_lsn() {
535 let mut mock = MockFileAccess::new();
536 mock.add_file(0, vec![0u8; 14]);
537 let mut reader = LastFileReader::new(mock, 64).unwrap();
538
539 reader.read_next_entry().unwrap();
540 let valid_lsn = reader.get_last_valid_lsn();
541 assert_eq!(valid_lsn.file_number(), 0);
542 }
543
544 #[test]
545 fn test_last_file_reader_read_entry_updates_end_of_log() {
546 let mut mock = MockFileAccess::new();
547 mock.add_file(0, vec![0u8; 14]);
548 let mut reader = LastFileReader::new(mock, 64).unwrap();
549
550 reader.read_next_entry().unwrap();
551 let eol = reader.get_end_of_log();
552 assert_eq!(eol.file_number(), 0);
553 let _ = eol.file_offset();
555 }
556
557 #[test]
558 fn test_last_file_reader_tracks_target_type() {
559 let mut mock = MockFileAccess::new();
562 let data = vec![0u8; 28]; mock.add_file(0, data);
564 let mut reader = LastFileReader::new(mock, 64).unwrap();
565 reader.set_target_type(0); reader.read_next_entry().unwrap();
568 reader.read_next_entry().unwrap();
569
570 let lsn = reader.get_last_seen(0);
572 assert!(!lsn.is_null());
573 }
574
575 #[test]
576 fn test_last_file_reader_untracked_type_returns_null() {
577 let mut mock = MockFileAccess::new();
578 mock.add_file(0, vec![0u8; 14]); let mut reader = LastFileReader::new(mock, 64).unwrap();
580 reader.read_next_entry().unwrap();
583 assert!(reader.get_last_seen(5).is_null());
585 }
586
587 #[test]
588 fn test_last_file_reader_read_until_eof() {
589 let mut mock = MockFileAccess::new();
590 mock.add_file(0, vec![0u8; 14]);
591 let mut reader = LastFileReader::new(mock, 64).unwrap();
592
593 assert!(matches!(reader.read_next_entry(), Ok(true)));
594 let result = reader.read_next_entry();
595 assert!(matches!(result, Ok(false)) || result.is_err());
597 }
598
599 #[test]
613 fn test_l14_found_committed_txn_after_corruption_halts() {
614 let mut data = Vec::new();
615 data.extend_from_slice(&build_valid_entry(0, b"good"));
616 data.extend_from_slice(&build_corrupt_entry(0, b"bad!"));
617 data.extend_from_slice(&build_valid_entry(30, b"commit"));
619
620 let mut mock = MockFileAccess::new();
621 mock.add_file(0, data);
622 let mut reader = LastFileReader::new(mock, 256).unwrap();
623 reader.set_halt_on_commit_after_checksum(true);
624
625 assert!(matches!(reader.read_next_entry(), Ok(true)));
627 let result = reader.read_next_entry();
630 assert!(
631 matches!(result, Err(NoxuLogError::FoundCommittedTxn { .. })),
632 "expected FoundCommittedTxn, got {result:?}"
633 );
634 }
635
636 #[test]
640 fn test_l14_torn_tail_no_commit_truncates_when_enabled() {
641 let mut data = Vec::new();
642 data.extend_from_slice(&build_valid_entry(0, b"good"));
643 data.extend_from_slice(&build_corrupt_entry(0, b"bad!"));
644 data.extend_from_slice(&build_valid_entry(0, b"more"));
646
647 let mut mock = MockFileAccess::new();
648 mock.add_file(0, data);
649 let mut reader = LastFileReader::new(mock, 256).unwrap();
650 reader.set_halt_on_commit_after_checksum(true);
651
652 assert!(matches!(reader.read_next_entry(), Ok(true)));
653 assert!(matches!(reader.read_next_entry(), Ok(false)));
655 }
656
657 #[test]
660 fn test_l14_disabled_keeps_truncate_even_with_commit_after() {
661 let mut data = Vec::new();
662 data.extend_from_slice(&build_valid_entry(0, b"good"));
663 data.extend_from_slice(&build_corrupt_entry(0, b"bad!"));
664 data.extend_from_slice(&build_valid_entry(30, b"commit"));
665
666 let mut mock = MockFileAccess::new();
667 mock.add_file(0, data);
668 let mut reader = LastFileReader::new(mock, 256).unwrap();
669 assert!(matches!(reader.read_next_entry(), Ok(true)));
672 assert!(matches!(reader.read_next_entry(), Ok(false)));
674 }
675}