1use crate::checksum::ChecksumValidator;
9use crate::entry_header::CHECKSUM_BYTES;
10use crate::error::{NoxuLogError, Result};
11use noxu_util::lsn::{Lsn, NULL_LSN};
12
13pub trait LogFileAccess {
18 fn read_from_file(
23 &self,
24 file_num: u32,
25 offset: u64,
26 buf: &mut [u8],
27 ) -> Result<usize>;
28
29 fn get_file_length(&self, file_num: u32) -> Result<u64>;
31
32 fn get_first_file_num(&self) -> Option<u32>;
34
35 fn get_following_file_num(
39 &self,
40 file_num: u32,
41 forward: bool,
42 ) -> Option<u32>;
43
44 fn get_file_header_prev_offset(&self, file_num: u32) -> Result<u64>;
46}
47
/// Flag bit (header byte 5) announcing that an 8-byte VLSN follows the fixed
/// header portion.
const VLSN_PRESENT_MASK: u8 = 0b0000_1000;

/// Flag bit (header byte 5) marking the entry as replicated. The header
/// parser also treats it as implying that the VLSN extension is present.
const REPLICATED_MASK: u8 = 0b0010_0000;

/// Largest header the parser understands: the 14 fixed bytes plus the 8-byte
/// VLSN extension.
const MAX_HEADER_SIZE: usize = 22;
56
/// Decoded header of a single log entry.
///
/// Produced by `LogEntryHeader::from_bytes`, which reads the on-disk layout
/// (all integers little-endian):
///
/// | bytes  | field                                   |
/// |--------|-----------------------------------------|
/// | 0..4   | checksum                                |
/// | 4      | entry type                              |
/// | 5      | flags                                   |
/// | 6..10  | previous-entry offset                   |
/// | 10..14 | item (payload) size                     |
/// | 14..22 | VLSN, only when the flags announce one  |
#[derive(Debug, Clone)]
pub struct LogEntryHeader {
    /// Raw entry-type byte (header byte 4).
    pub entry_type: u8,
    /// Entry version. The parser in this file always stores `0` here.
    pub version: u8,
    /// Offset of the preceding entry, widened from the 32-bit on-disk field.
    pub prev_offset: u64,
    /// Number of header bytes: 14, or 22 when the VLSN extension is present.
    pub header_size: usize,
    /// Number of payload bytes that follow the header.
    pub item_size: usize,
    /// Checksum stored in the first four header bytes.
    pub checksum: u32,
    /// Whether the replicated flag bit was set.
    pub replicated: bool,
}
80
81impl LogEntryHeader {
82 pub const MIN_HEADER_SIZE: usize = 14;
84
85 pub fn entry_size(&self) -> usize {
87 self.header_size + self.item_size
88 }
89
90 pub fn is_variable_length(&self) -> bool {
92 self.header_size > Self::MIN_HEADER_SIZE
93 }
94
95 pub fn variable_portion_size(&self) -> usize {
97 self.header_size - Self::MIN_HEADER_SIZE
98 }
99
100 pub fn from_bytes(buf: &[u8]) -> Result<Self> {
115 use crate::error::NoxuLogError;
116 use noxu_util::lsn::NULL_LSN;
117
118 if buf.len() < Self::MIN_HEADER_SIZE {
119 return Err(NoxuLogError::UnexpectedEof {
120 lsn: NULL_LSN,
121 message: format!(
122 "header buffer too short: {} < {}",
123 buf.len(),
124 Self::MIN_HEADER_SIZE
125 ),
126 });
127 }
128
129 let checksum = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
130 let entry_type = buf[4];
131 let flags = buf[5];
132 let prev_offset =
133 u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]) as u64;
134 let item_size =
135 u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]) as usize;
136
137 if item_size > crate::MAX_ITEM_SIZE {
141 return Err(NoxuLogError::InvalidEntrySize {
142 lsn: NULL_LSN,
143 size: item_size as i32,
144 });
145 }
146
147 let vlsn_present =
148 (flags & VLSN_PRESENT_MASK) != 0 || (flags & REPLICATED_MASK) != 0;
149 let replicated = (flags & REPLICATED_MASK) != 0;
150
151 if vlsn_present && buf.len() < MAX_HEADER_SIZE {
152 return Err(NoxuLogError::UnexpectedEof {
153 lsn: NULL_LSN,
154 message: format!(
155 "VLSN flag set but header buffer only {} bytes (need {})",
156 buf.len(),
157 MAX_HEADER_SIZE
158 ),
159 });
160 }
161
162 if vlsn_present {
170 let raw_vlsn =
171 i64::from_le_bytes(buf[14..22].try_into().unwrap_or([0u8; 8]));
172 if raw_vlsn == 0 || raw_vlsn == i64::MAX || raw_vlsn == -1 {
173 log::error!(
174 "FileReader::LogEntryHeader::from_bytes: implausible \
175 VLSN bytes {:#018x} with vlsn_present flag set; \
176 treating as corruption / end of log",
177 raw_vlsn,
178 );
179 return Err(NoxuLogError::UnexpectedEof {
180 lsn: NULL_LSN,
181 message: format!(
182 "implausible VLSN value {:#018x}",
183 raw_vlsn
184 ),
185 });
186 }
187 }
188
189 let header_size =
190 if vlsn_present { MAX_HEADER_SIZE } else { Self::MIN_HEADER_SIZE };
191
192 Ok(LogEntryHeader {
193 entry_type,
194 version: 0, prev_offset,
196 header_size,
197 item_size,
198 checksum,
199 replicated,
200 })
201 }
202}
203
204pub struct FileReader<F: LogFileAccess> {
209 file_access: F,
211
212 forward: bool,
214
215 current_entry_lsn: Lsn,
217
218 next_entry_lsn: Lsn,
220
221 start_lsn: Lsn,
223
224 finish_lsn: Lsn,
226
227 read_buffer: Vec<u8>,
229
230 read_buffer_size: usize,
232
233 buffer_offset: usize,
235
236 buffer_length: usize,
238
239 current_file_num: u32,
241
242 current_file_offset: u64,
244
245 validate_checksum: bool,
247
248 current_entry_header: Option<LogEntryHeader>,
250
251 current_entry_prev_offset: u64,
253
254 current_entry_offset: u64,
256
257 next_entry_offset: u64,
259
260 entries_read: u64,
262
263 eof: bool,
265
266 save_buffer: Vec<u8>,
268}
269
270impl<F: LogFileAccess> FileReader<F> {
271 pub fn new(
282 file_access: F,
283 forward: bool,
284 start_lsn: Lsn,
285 end_of_file_lsn: Lsn,
286 finish_lsn: Lsn,
287 read_buffer_size: usize,
288 validate_checksum: bool,
289 ) -> Result<Self> {
290 let mut reader = FileReader {
291 file_access,
292 forward,
293 current_entry_lsn: NULL_LSN,
294 next_entry_lsn: NULL_LSN,
295 start_lsn,
296 finish_lsn,
297 read_buffer: vec![0u8; read_buffer_size],
298 read_buffer_size,
299 buffer_offset: 0,
300 buffer_length: 0,
301 current_file_num: 0,
302 current_file_offset: 0,
303 validate_checksum,
304 current_entry_header: None,
305 current_entry_prev_offset: 0,
306 current_entry_offset: 0,
307 next_entry_offset: 0,
308 entries_read: 0,
309 eof: false,
310 save_buffer: Vec::with_capacity(read_buffer_size),
311 };
312
313 reader.init_starting_position(start_lsn, end_of_file_lsn)?;
314 Ok(reader)
315 }
316
317 fn init_starting_position(
319 &mut self,
320 start_lsn: Lsn,
321 end_of_file_lsn: Lsn,
322 ) -> Result<()> {
323 self.eof = false;
324
325 if self.forward {
326 if !start_lsn.is_null() {
328 self.current_file_num = start_lsn.file_number();
329 self.current_file_offset = start_lsn.file_offset() as u64;
330 self.next_entry_offset = start_lsn.file_offset() as u64;
331 } else {
332 if let Some(first_file) = self.file_access.get_first_file_num()
334 {
335 self.current_file_num = first_file;
336 self.current_file_offset = 0;
337 self.next_entry_offset = 0;
338 } else {
339 self.eof = true;
340 }
341 }
342 } else {
343 assert!(
345 !start_lsn.is_null(),
346 "start_lsn must be valid for backward reading"
347 );
348 assert!(
349 !end_of_file_lsn.is_null(),
350 "end_of_file_lsn must be valid for backward reading"
351 );
352
353 self.current_file_num = end_of_file_lsn.file_number();
354 self.current_file_offset = end_of_file_lsn.file_offset() as u64;
355 self.current_entry_offset = end_of_file_lsn.file_offset() as u64;
356
357 if start_lsn.file_number() == end_of_file_lsn.file_number() {
359 self.current_entry_prev_offset = start_lsn.file_offset() as u64;
360 } else {
361 self.current_entry_prev_offset = 0;
362 }
363 }
364
365 Ok(())
366 }
367
368 pub fn read_next_entry(&mut self) -> Result<bool> {
372 while !self.eof {
373 self.get_log_entry_in_buffer()?;
375
376 let header_buf =
378 self.read_data(LogEntryHeader::MIN_HEADER_SIZE, true)?;
379 let header = LogEntryHeader::from_bytes(header_buf)?;
380
381 if self.forward {
383 self.current_entry_offset = self.next_entry_offset;
384 self.next_entry_offset += header.entry_size() as u64;
385 }
386
387 self.current_entry_header = Some(header.clone());
388 self.current_entry_prev_offset = header.prev_offset;
389
390 if !self.is_target_entry()? {
392 continue;
394 }
395
396 let item_size = header.item_size;
399 let entry_data: Vec<u8> = self.read_data(item_size, true)?.to_vec();
400
401 if self.validate_checksum && header.checksum != 0 {
411 let header_size = header.header_size;
412 let total_size = header_size + item_size;
413
414 let mut full_entry = vec![0u8; total_size];
423 let n = self.file_access.read_from_file(
424 self.current_file_num,
425 self.current_entry_offset,
426 &mut full_entry,
427 )?;
428 let _ = &entry_data;
431 let _ = header_size;
432 let computed = if n >= total_size {
433 full_entry[5] &= !0x10u8;
437 ChecksumValidator::compute_range(
438 &full_entry,
439 CHECKSUM_BYTES,
440 total_size - CHECKSUM_BYTES,
441 )
442 } else {
443 header.checksum.wrapping_add(1)
446 };
447 if computed != header.checksum {
448 let lsn = Lsn::new(
449 self.current_file_num,
450 self.current_entry_offset as u32,
451 );
452 self.eof = true;
453 return Err(NoxuLogError::Checksum {
454 lsn,
455 message: format!(
456 "expected {:#010x}, computed {:#010x}",
457 header.checksum, computed
458 ),
459 });
460 }
461 }
462
463 if self.process_entry()? {
465 self.entries_read += 1;
466 return Ok(true);
467 }
468 }
469
470 Ok(false)
471 }
472
473 fn get_log_entry_in_buffer(&mut self) -> Result<()> {
475 if self.forward {
476 self.set_forward_position()?;
477 } else {
478 self.set_backward_position()?;
479 }
480 Ok(())
481 }
482
483 fn set_forward_position(&mut self) -> Result<()> {
485 if !self.finish_lsn.is_null() {
487 let next_lsn =
488 Lsn::new(self.current_file_num, self.next_entry_offset as u32);
489 if next_lsn >= self.finish_lsn {
490 self.eof = true;
491 return Err(NoxuLogError::UnexpectedEof {
492 lsn: next_lsn,
493 message: "Reached finish LSN".to_string(),
494 });
495 }
496 }
497 Ok(())
498 }
499
500 fn set_backward_position(&mut self) -> Result<()> {
502 if self.current_entry_prev_offset != 0
504 && self.buffer_contains_offset(self.current_entry_prev_offset)
505 {
506 self.position_buffer(self.current_entry_prev_offset);
508 } else {
509 if self.current_entry_prev_offset == 0 {
511 let prev_offset = self
513 .file_access
514 .get_file_header_prev_offset(self.current_file_num)?;
515 let prev_file = self
516 .file_access
517 .get_following_file_num(self.current_file_num, false)
518 .ok_or_else(|| NoxuLogError::UnexpectedEof {
519 lsn: Lsn::new(self.current_file_num, 0),
520 message: "No previous file".to_string(),
521 })?;
522
523 self.current_entry_prev_offset = prev_offset;
524 self.current_file_num = prev_file;
525 }
526
527 self.fill_buffer_at(self.current_entry_prev_offset)?;
529 }
530
531 self.current_entry_offset = self.current_entry_prev_offset;
532
533 if !self.finish_lsn.is_null() {
535 let next_lsn = Lsn::new(
536 self.current_file_num,
537 self.current_entry_prev_offset as u32,
538 );
539 if next_lsn < self.finish_lsn {
540 self.eof = true;
541 return Err(NoxuLogError::UnexpectedEof {
542 lsn: next_lsn,
543 message: "Reached finish LSN (backward)".to_string(),
544 });
545 }
546 }
547
548 Ok(())
549 }
550
551 fn read_data(
555 &mut self,
556 amount: usize,
557 _collect_data: bool,
558 ) -> Result<&[u8]> {
559 let mut already_read = 0;
560
561 while already_read < amount && !self.eof {
562 let bytes_available = self.buffer_length - self.buffer_offset;
563
564 if bytes_available > 0 {
565 let bytes_needed = amount - already_read;
566 let bytes_to_copy = bytes_available.min(bytes_needed);
567
568 if already_read > 0 {
569 let start = self.buffer_offset;
571 let end = self.buffer_offset + bytes_to_copy;
572 self.save_buffer
573 .extend_from_slice(&self.read_buffer[start..end]);
574 self.buffer_offset = end;
575 already_read += bytes_to_copy;
576 } else {
577 if bytes_available >= bytes_needed {
579 let start = self.buffer_offset;
580 let end = start + bytes_needed;
581 self.buffer_offset = end;
582 return Ok(&self.read_buffer[start..end]);
583 } else {
584 let start = self.buffer_offset;
586 let end = self.buffer_offset + bytes_available;
587 self.save_buffer.clear();
588 self.save_buffer
589 .extend_from_slice(&self.read_buffer[start..end]);
590 self.buffer_offset = end;
591 already_read += bytes_available;
592 }
593 }
594 } else {
595 self.fill_next_buffer()?;
597 }
598 }
599
600 if already_read < amount {
601 let lsn = Lsn::new(
602 self.current_file_num,
603 self.current_entry_offset as u32,
604 );
605 return Err(NoxuLogError::UnexpectedEof {
606 lsn,
607 message: format!("Need {} bytes, got {}", amount, already_read),
608 });
609 }
610
611 Ok(&self.save_buffer[..amount])
612 }
613
614 fn fill_next_buffer(&mut self) -> Result<()> {
616 self.current_file_offset += self.buffer_length as u64;
618
619 let file_len =
621 self.file_access.get_file_length(self.current_file_num)?;
622 if self.current_file_offset >= file_len {
623 if let Some(next_file) = self
625 .file_access
626 .get_following_file_num(self.current_file_num, true)
627 {
628 self.current_file_num = next_file;
629 self.current_file_offset = 0;
630 self.next_entry_offset = 0;
631 } else {
632 self.eof = true;
633 let lsn = Lsn::new(
634 self.current_file_num,
635 self.current_file_offset as u32,
636 );
637 return Err(NoxuLogError::UnexpectedEof {
638 lsn,
639 message: "No next file".to_string(),
640 });
641 }
642 }
643
644 let bytes_read = self.file_access.read_from_file(
646 self.current_file_num,
647 self.current_file_offset,
648 &mut self.read_buffer,
649 )?;
650
651 if bytes_read == 0 {
652 self.eof = true;
653 let lsn = Lsn::new(
654 self.current_file_num,
655 self.current_file_offset as u32,
656 );
657 return Err(NoxuLogError::UnexpectedEof {
658 lsn,
659 message: "File read returned 0 bytes".to_string(),
660 });
661 }
662
663 self.buffer_offset = 0;
664 self.buffer_length = bytes_read;
665
666 Ok(())
667 }
668
669 fn fill_buffer_at(&mut self, offset: u64) -> Result<()> {
671 self.current_file_offset = offset;
672 self.buffer_offset = 0;
673 self.buffer_length = 0;
674 self.fill_next_buffer()
675 }
676
677 fn buffer_contains_offset(&self, offset: u64) -> bool {
679 offset >= self.current_file_offset
680 && offset < self.current_file_offset + self.buffer_length as u64
681 }
682
683 fn position_buffer(&mut self, offset: u64) {
685 assert!(self.buffer_contains_offset(offset));
686 self.buffer_offset = (offset - self.current_file_offset) as usize;
687 }
688
689 fn is_target_entry(&self) -> Result<bool> {
694 Ok(true)
695 }
696
697 fn process_entry(&mut self) -> Result<bool> {
702 Ok(true)
703 }
704
705 pub fn get_current_entry_lsn(&self) -> Lsn {
707 Lsn::new(self.current_file_num, self.current_entry_offset as u32)
708 }
709
710 pub fn get_current_entry_header(&self) -> Option<&LogEntryHeader> {
712 self.current_entry_header.as_ref()
713 }
714
715 pub fn get_num_read(&self) -> u64 {
717 self.entries_read
718 }
719
720 pub fn get_last_entry_size(&self) -> usize {
722 self.current_entry_header.as_ref().map(|h| h.entry_size()).unwrap_or(0)
723 }
724
725 pub fn entry_is_replicated(&self) -> bool {
727 self.current_entry_header
728 .as_ref()
729 .map(|h| h.replicated)
730 .unwrap_or(false)
731 }
732
733 pub fn next_entry_offset(&self) -> u64 {
740 self.next_entry_offset
741 }
742
743 pub fn current_item_size(&self) -> usize {
747 self.current_entry_header.as_ref().map(|h| h.item_size).unwrap_or(0)
748 }
749
750 pub fn resume_forward_at(&mut self, offset: u64) -> Result<()> {
760 assert!(self.forward, "resume_forward_at is forward-mode only");
761 self.eof = false;
762 self.next_entry_offset = offset;
763 self.current_entry_offset = offset;
764 self.fill_buffer_at(offset)
766 }
767}
768
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::io;

    /// In-memory [`LogFileAccess`]: a map from file number to file bytes.
    struct MockFileAccess {
        files: HashMap<u32, Vec<u8>>,
    }

    impl MockFileAccess {
        fn new() -> Self {
            MockFileAccess { files: HashMap::new() }
        }

        fn add_file(&mut self, file_num: u32, data: Vec<u8>) {
            self.files.insert(file_num, data);
        }
    }

    /// Error the mock reports for an unknown file number.
    fn not_found() -> io::Error {
        io::Error::new(io::ErrorKind::NotFound, "File not found")
    }

    impl LogFileAccess for MockFileAccess {
        fn read_from_file(
            &self,
            file_num: u32,
            offset: u64,
            buf: &mut [u8],
        ) -> Result<usize> {
            let data = match self.files.get(&file_num) {
                Some(data) => data,
                None => return Err(not_found().into()),
            };
            let start = offset as usize;
            if start >= data.len() {
                return Ok(0);
            }
            let end = (start + buf.len()).min(data.len());
            let chunk = &data[start..end];
            buf[..chunk.len()].copy_from_slice(chunk);
            Ok(chunk.len())
        }

        fn get_file_length(&self, file_num: u32) -> Result<u64> {
            match self.files.get(&file_num) {
                Some(data) => Ok(data.len() as u64),
                None => Err(not_found().into()),
            }
        }

        fn get_first_file_num(&self) -> Option<u32> {
            self.files.keys().copied().min()
        }

        fn get_following_file_num(
            &self,
            file_num: u32,
            forward: bool,
        ) -> Option<u32> {
            let file_nums = self.files.keys().copied();
            if forward {
                file_nums.filter(|&n| n > file_num).min()
            } else {
                file_nums.filter(|&n| n < file_num).max()
            }
        }

        fn get_file_header_prev_offset(&self, _file_num: u32) -> Result<u64> {
            Ok(0)
        }
    }

    /// Mock containing only file 0 with the given bytes.
    fn single_file(data: Vec<u8>) -> MockFileAccess {
        let mut mock = MockFileAccess::new();
        mock.add_file(0, data);
        mock
    }

    /// Forward reader starting at LSN (0, 0) with no finish LSN.
    fn forward_reader(
        mock: MockFileAccess,
        buf_size: usize,
        validate: bool,
    ) -> FileReader<MockFileAccess> {
        FileReader::new(
            mock,
            true,
            Lsn::new(0, 0),
            NULL_LSN,
            NULL_LSN,
            buf_size,
            validate,
        )
        .unwrap()
    }

    /// Inverts the last byte of `data` to simulate payload corruption.
    fn corrupt_last_byte(data: &mut [u8]) {
        let last = data.len() - 1;
        data[last] ^= 0xFF;
    }

    #[test]
    fn test_mock_file_access() {
        let mock = single_file(vec![1, 2, 3, 4, 5]);

        let mut buf = [0u8; 3];
        let n = mock.read_from_file(0, 1, &mut buf).unwrap();
        assert_eq!(n, 3);
        assert_eq!(buf, [2, 3, 4]);
    }

    #[test]
    fn test_file_reader_creation() {
        let result = FileReader::new(
            MockFileAccess::new(),
            true,
            Lsn::new(0, 0),
            NULL_LSN,
            NULL_LSN,
            1024,
            true,
        );
        assert!(result.is_ok());
    }

    #[test]
    fn test_file_reader_creation_with_null_lsn() {
        let result = FileReader::new(
            MockFileAccess::new(),
            true,
            NULL_LSN,
            NULL_LSN,
            NULL_LSN,
            512,
            false,
        );
        assert!(result.is_ok());
    }

    #[test]
    fn test_file_reader_creation_with_files_forward() {
        let result = FileReader::new(
            single_file(vec![0u8; 128]),
            true,
            Lsn::new(0, 0),
            NULL_LSN,
            NULL_LSN,
            64,
            false,
        );
        assert!(result.is_ok());
    }

    #[test]
    fn test_file_reader_get_current_entry_lsn_initial() {
        let reader = forward_reader(single_file(vec![0u8; 128]), 64, false);
        assert_eq!(reader.get_current_entry_lsn().file_number(), 0);
    }

    #[test]
    fn test_file_reader_get_current_entry_header_none_initially() {
        let reader = forward_reader(single_file(vec![0u8; 64]), 32, false);
        assert!(reader.get_current_entry_header().is_none());
    }

    #[test]
    fn test_file_reader_get_num_read_initial() {
        let reader = forward_reader(single_file(vec![0u8; 64]), 32, false);
        assert_eq!(reader.get_num_read(), 0);
    }

    #[test]
    fn test_file_reader_get_last_entry_size_no_header() {
        let reader = forward_reader(single_file(vec![0u8; 64]), 32, false);
        assert_eq!(reader.get_last_entry_size(), 0);
    }

    #[test]
    fn test_file_reader_entry_is_replicated_initial() {
        let reader = forward_reader(single_file(vec![0u8; 64]), 32, false);
        assert!(!reader.entry_is_replicated());
    }

    #[test]
    fn test_file_reader_read_next_entry_eof_no_files() {
        // No files and a null start LSN: the reader starts out at EOF.
        let mut reader = FileReader::new(
            MockFileAccess::new(),
            true,
            NULL_LSN,
            NULL_LSN,
            NULL_LSN,
            64,
            false,
        )
        .unwrap();

        assert!(matches!(reader.read_next_entry(), Ok(false)));
    }

    #[test]
    fn test_file_reader_read_next_entry_small_file() {
        // The file is shorter than a header, so no entry can be produced.
        let mut reader = forward_reader(single_file(vec![0u8; 8]), 64, false);

        let result = reader.read_next_entry();
        assert!(result.is_err() || matches!(result, Ok(false)));
    }

    #[test]
    fn test_file_reader_read_next_entry_exactly_header_size() {
        // An all-zero header decodes as an entry with an empty payload.
        let data = vec![0u8; LogEntryHeader::MIN_HEADER_SIZE];
        let mut reader = forward_reader(single_file(data), 64, false);

        assert!(matches!(reader.read_next_entry(), Ok(true)));
        assert_eq!(reader.get_num_read(), 1);
    }

    #[test]
    fn test_file_reader_read_next_entry_multiple() {
        let data = vec![0u8; LogEntryHeader::MIN_HEADER_SIZE * 2];
        let mut reader = forward_reader(single_file(data), 64, false);

        assert!(matches!(reader.read_next_entry(), Ok(true)));
        assert!(matches!(reader.read_next_entry(), Ok(true)));
        assert_eq!(reader.get_num_read(), 2);
    }

    #[test]
    fn test_file_reader_finish_lsn_stops_reading() {
        let data = vec![0u8; LogEntryHeader::MIN_HEADER_SIZE * 4];
        // Finish right after the first entry.
        let finish_lsn = Lsn::new(0, LogEntryHeader::MIN_HEADER_SIZE as u32);
        let mut reader = FileReader::new(
            single_file(data),
            true,
            Lsn::new(0, 0),
            NULL_LSN,
            finish_lsn,
            64,
            false,
        )
        .unwrap();

        assert!(matches!(reader.read_next_entry(), Ok(true)));
        let result = reader.read_next_entry();
        assert!(result.is_err() || matches!(result, Ok(false)));
    }

    #[test]
    fn test_file_reader_header_methods() {
        let hdr = LogEntryHeader::from_bytes(&[0u8; 14]).unwrap();
        assert_eq!(hdr.header_size, LogEntryHeader::MIN_HEADER_SIZE);
        assert_eq!(hdr.item_size, 0);
        assert_eq!(hdr.entry_size(), LogEntryHeader::MIN_HEADER_SIZE);
        assert!(!hdr.is_variable_length());
        assert_eq!(hdr.variable_portion_size(), 0);
        assert!(!hdr.replicated);
    }

    #[test]
    fn test_file_reader_spans_two_files() {
        let mut mock = MockFileAccess::new();
        mock.add_file(0, vec![0u8; LogEntryHeader::MIN_HEADER_SIZE]);
        mock.add_file(1, vec![0u8; LogEntryHeader::MIN_HEADER_SIZE]);
        // The 8-byte buffer is smaller than a header, so every header read
        // straddles two buffer fills.
        let mut reader = forward_reader(mock, 8, false);

        assert!(matches!(reader.read_next_entry(), Ok(true)));
        // The second entry lives in file 1; the reader must roll over.
        assert!(
            matches!(reader.read_next_entry(), Ok(true)),
            "reader should continue into the next file"
        );
        assert_eq!(reader.get_num_read(), 2);
        assert_eq!(reader.get_current_entry_lsn().file_number(), 1);
    }

    #[test]
    fn test_mock_file_access_read_out_of_bounds() {
        let mock = single_file(vec![1, 2, 3]);
        let mut buf = [0u8; 5];
        let n = mock.read_from_file(0, 1, &mut buf).unwrap();
        // Only the two bytes that exist past offset 1 are copied.
        assert_eq!(n, 2);
        assert_eq!(&buf[..2], &[2, 3]);
    }

    #[test]
    fn test_mock_file_access_read_at_end() {
        let mock = single_file(vec![1, 2, 3]);
        let mut buf = [0u8; 5];
        assert_eq!(mock.read_from_file(0, 3, &mut buf).unwrap(), 0);
    }

    #[test]
    fn test_mock_file_access_missing_file() {
        let mock = MockFileAccess::new();
        let mut buf = [0u8; 4];
        assert!(mock.read_from_file(99, 0, &mut buf).is_err());
        assert!(mock.get_file_length(99).is_err());
    }

    #[test]
    fn test_mock_file_access_following_file_backward() {
        let mut mock = MockFileAccess::new();
        for file_num in 0..3 {
            mock.add_file(file_num, vec![0u8; 1]);
        }
        assert_eq!(mock.get_following_file_num(2, false), Some(1));
        assert_eq!(mock.get_following_file_num(0, false), None);
    }

    #[test]
    fn test_mock_file_access_file_header_prev_offset() {
        let mock = MockFileAccess::new();
        assert_eq!(mock.get_file_header_prev_offset(0).unwrap(), 0);
    }

    #[test]
    fn test_from_bytes_parses_fields() {
        let checksum: u32 = 0x1234_5678;
        let entry_type: u8 = 5;
        let flags: u8 = 0x00;
        let prev_offset: u32 = 0xAABB_CCDD;
        let item_size: u32 = 42;

        let mut buf = [0u8; 14];
        buf[0..4].copy_from_slice(&checksum.to_le_bytes());
        buf[4] = entry_type;
        buf[5] = flags;
        buf[6..10].copy_from_slice(&prev_offset.to_le_bytes());
        buf[10..14].copy_from_slice(&item_size.to_le_bytes());

        let hdr = LogEntryHeader::from_bytes(&buf).unwrap();
        assert_eq!(hdr.checksum, checksum);
        assert_eq!(hdr.entry_type, entry_type);
        assert_eq!(hdr.prev_offset, prev_offset as u64);
        assert_eq!(hdr.item_size, item_size as usize);
        assert_eq!(hdr.header_size, LogEntryHeader::MIN_HEADER_SIZE);
        assert!(!hdr.replicated);
        assert!(!hdr.is_variable_length());
        assert_eq!(hdr.variable_portion_size(), 0);
        assert_eq!(hdr.entry_size(), 14 + 42);
    }

    #[test]
    fn test_from_bytes_with_vlsn_present_flag() {
        let mut buf = [0u8; 22];
        buf[5] = VLSN_PRESENT_MASK;
        buf[10..14].copy_from_slice(&(10u32).to_le_bytes());
        buf[14..22].copy_from_slice(&(7i64).to_le_bytes());

        let hdr = LogEntryHeader::from_bytes(&buf).unwrap();
        assert_eq!(hdr.header_size, 22);
        assert!(hdr.is_variable_length());
        assert_eq!(hdr.variable_portion_size(), 8);
        assert!(!hdr.replicated);
    }

    #[test]
    fn test_from_bytes_with_replicated_flag() {
        let mut buf = [0u8; 22];
        buf[5] = REPLICATED_MASK;
        buf[10..14].copy_from_slice(&(0u32).to_le_bytes());
        buf[14..22].copy_from_slice(&(11i64).to_le_bytes());

        let hdr = LogEntryHeader::from_bytes(&buf).unwrap();
        assert_eq!(hdr.header_size, 22);
        assert!(hdr.replicated);
    }

    #[test]
    fn test_from_bytes_rejects_implausible_vlsn_sentinel() {
        for sentinel in [0i64, i64::MAX, -1i64] {
            let mut buf = [0u8; 22];
            buf[5] = VLSN_PRESENT_MASK;
            buf[10..14].copy_from_slice(&(0u32).to_le_bytes());
            buf[14..22].copy_from_slice(&sentinel.to_le_bytes());

            let result = LogEntryHeader::from_bytes(&buf);
            assert!(
                result.is_err(),
                "expected error for sentinel VLSN {sentinel:#018x}"
            );
        }
    }

    #[test]
    fn test_from_bytes_rejects_oversized_item_size() {
        let mut buf = [0u8; 14];
        let oversize: u32 = (crate::MAX_ITEM_SIZE as u32) + 1;
        buf[10..14].copy_from_slice(&oversize.to_le_bytes());

        let result = LogEntryHeader::from_bytes(&buf);
        assert!(
            matches!(
                result,
                Err(crate::error::NoxuLogError::InvalidEntrySize { .. })
            ),
            "expected InvalidEntrySize, got {result:?}",
        );
    }

    #[test]
    fn test_from_bytes_buffer_too_short() {
        for len in 0..14usize {
            let buf = vec![0u8; len];
            assert!(
                LogEntryHeader::from_bytes(&buf).is_err(),
                "expected error for {}-byte buffer",
                len
            );
        }
    }

    #[test]
    fn test_from_bytes_vlsn_flag_but_buffer_too_short() {
        let mut buf = [0u8; 14];
        buf[5] = VLSN_PRESENT_MASK;
        assert!(LogEntryHeader::from_bytes(&buf).is_err());
    }

    /// Serialises one entry with a correct checksum. With `vlsn` set the
    /// header is 22 bytes and carries the VLSN-present flag; otherwise it is
    /// the plain 14-byte header.
    fn build_entry(entry_type: u8, vlsn: Option<i64>, payload: &[u8]) -> Vec<u8> {
        use crate::entry_header::CHECKSUM_BYTES;

        let header_size = if vlsn.is_some() {
            MAX_HEADER_SIZE
        } else {
            LogEntryHeader::MIN_HEADER_SIZE
        };
        let total = header_size + payload.len();

        let mut buf = vec![0u8; total];
        buf[4] = entry_type;
        buf[10..14].copy_from_slice(&(payload.len() as u32).to_le_bytes());
        if let Some(vlsn) = vlsn {
            buf[5] = VLSN_PRESENT_MASK;
            buf[14..22].copy_from_slice(&vlsn.to_le_bytes());
        }
        buf[header_size..].copy_from_slice(payload);

        // The checksum covers everything after the checksum field itself.
        let crc = ChecksumValidator::compute_range(
            &buf,
            CHECKSUM_BYTES,
            total - CHECKSUM_BYTES,
        );
        buf[0..4].copy_from_slice(&crc.to_le_bytes());
        buf
    }

    /// Entry with the fixed 14-byte header and a valid checksum.
    fn build_valid_entry(entry_type: u8, payload: &[u8]) -> Vec<u8> {
        build_entry(entry_type, None, payload)
    }

    /// Entry with the 22-byte VLSN header and a valid checksum.
    fn build_valid_vlsn_entry(
        entry_type: u8,
        vlsn: i64,
        payload: &[u8],
    ) -> Vec<u8> {
        build_entry(entry_type, Some(vlsn), payload)
    }

    #[test]
    #[ignore = "F-2: the FileReader/LastFileReader path is dead code that also \
                reads only a 14-byte header and cannot decode VLSN entries; \
                fixing it fully to replace the production scanner (and enable \
                the bounded backward CheckpointFileReader for recovery speed) \
                is a tracked follow-on. The F-4 CRC-on-real-bytes fix is in \
                place so it will be correct once the header-read is fixed."]
    fn test_checksum_validation_passes_on_vlsn_entry_f4() {
        let payload = b"replicated payload";
        let file_data = build_valid_vlsn_entry(13, 42, payload);
        let mut reader = forward_reader(single_file(file_data), 256, true);

        let result = reader.read_next_entry();
        assert!(
            matches!(result, Ok(true)),
            "F-4: VLSN entry must pass CRC validation, got {:?}",
            result
        );
    }

    #[test]
    fn test_checksum_validation_passes_on_valid_entry() {
        let file_data = build_valid_entry(7, b"hello noxu");
        let mut reader = forward_reader(single_file(file_data), 256, true);

        let result = reader.read_next_entry();
        assert!(
            matches!(result, Ok(true)),
            "expected Ok(true) but got {:?}",
            result
        );
        assert_eq!(reader.get_num_read(), 1);
    }

    #[test]
    fn test_checksum_validation_fails_on_corrupted_entry() {
        let mut file_data = build_valid_entry(7, b"hello noxu");
        corrupt_last_byte(&mut file_data);
        let mut reader = forward_reader(single_file(file_data), 256, true);

        let result = reader.read_next_entry();
        assert!(
            matches!(result, Err(NoxuLogError::Checksum { .. })),
            "expected Checksum error but got {:?}",
            result
        );
    }

    #[test]
    fn test_checksum_skipped_when_disabled() {
        let mut file_data = build_valid_entry(7, b"hello noxu");
        corrupt_last_byte(&mut file_data);
        // Validation is off, so the corrupted payload goes unnoticed.
        let mut reader = forward_reader(single_file(file_data), 256, false);

        assert!(matches!(reader.read_next_entry(), Ok(true)));
    }
}