Skip to main content

noxu_log/
file_reader.rs

//! Base file reader for sequential log scanning.
//!
//! A `FileReader` traverses the log files, reading one chunk at a time. It
//! provides an iterator-like interface via its `read_next_entry()` method.
//! Concrete implementations control which entries to process and what to do
//! with them.
7
8use crate::checksum::ChecksumValidator;
9use crate::entry_header::CHECKSUM_BYTES;
10use crate::error::{NoxuLogError, Result};
11use noxu_util::lsn::{Lsn, NULL_LSN};
12
13/// Trait for file I/O access.
14///
15/// Abstracts the underlying file access for testing and modularity.
16/// FileManager will implement this trait once it's available.
17pub trait LogFileAccess {
18    /// Read data from a log file at the specified position.
19    ///
20    /// Returns the number of bytes actually read (may be less than buf.len()
21    /// if end of file is reached).
22    fn read_from_file(
23        &self,
24        file_num: u32,
25        offset: u64,
26        buf: &mut [u8],
27    ) -> Result<usize>;
28
29    /// Get the length of a log file in bytes.
30    fn get_file_length(&self, file_num: u32) -> Result<u64>;
31
32    /// Get the first file number in the log, or None if no files exist.
33    fn get_first_file_num(&self) -> Option<u32>;
34
35    /// Get the next file number after `file_num` (forward or backward).
36    ///
37    /// Returns None if no such file exists.
38    fn get_following_file_num(
39        &self,
40        file_num: u32,
41        forward: bool,
42    ) -> Option<u32>;
43
44    /// Get the file header's previous offset field for backward scanning.
45    fn get_file_header_prev_offset(&self, file_num: u32) -> Result<u64>;
46}
47
/// Header flag bit (bit 3) marking that a VLSN field is present; kept equal to
/// `VLSN_PRESENT_MASK` in entry_header.rs.
const VLSN_PRESENT_MASK: u8 = 1 << 3;

/// Header flag bit (bit 5) marking a replicated entry; kept equal to
/// `REPLICATED_MASK` in entry_header.rs.
const REPLICATED_MASK: u8 = 1 << 5;

/// Largest possible header: the 14 fixed bytes plus the 8-byte VLSN field.
const MAX_HEADER_SIZE: usize = 14 + 8;
56
/// Lightweight, scanner-side view of a parsed log entry header.
///
/// Only the fields that log scanning needs are kept here. The authoritative
/// header type, with full field semantics, is `entry_header::LogEntryHeader`.
#[derive(Clone, Debug)]
pub struct LogEntryHeader {
    /// Identifier of the entry type.
    pub entry_type: u8,
    /// Version of the entry.
    pub version: u8,
    /// Offset of the preceding entry; followed when scanning backward.
    pub prev_offset: u64,
    /// Number of bytes occupied by this header (14, or 22 with a VLSN).
    pub header_size: usize,
    /// Number of bytes occupied by the entry's data (the item).
    pub item_size: usize,
    /// Checksum recorded in the header; it covers bytes `[4..entry_size]`.
    pub checksum: u32,
    /// True when the entry is flagged as replicated.
    pub replicated: bool,
}
80
81impl LogEntryHeader {
82    /// Minimum header size in bytes (no VLSN).
83    pub const MIN_HEADER_SIZE: usize = 14;
84
85    /// Returns the total size of the entry (header + data).
86    pub fn entry_size(&self) -> usize {
87        self.header_size + self.item_size
88    }
89
90    /// Returns whether this header has a VLSN field.
91    pub fn is_variable_length(&self) -> bool {
92        self.header_size > Self::MIN_HEADER_SIZE
93    }
94
95    /// Returns the size of the variable (VLSN) portion.
96    pub fn variable_portion_size(&self) -> usize {
97        self.header_size - Self::MIN_HEADER_SIZE
98    }
99
100    /// Parse a log entry header from a raw byte buffer.
101    ///
102    /// Byte layout (little-endian):
103    /// ```text
104    /// bytes  0..3   checksum    (u32 LE)
105    /// byte   4      entry_type
106    /// byte   5      flags
107    /// bytes  6..9   prev_offset (u32 LE)
108    /// bytes 10..13  item_size   (u32 LE)
109    /// bytes 14..21  vlsn        (i64 LE) — only when flags & (0x08 | 0x20) != 0
110    /// ```
111    ///
112    /// Returns `Err(UnexpectedEof)` if `buf` is shorter than `MIN_HEADER_SIZE`,
113    /// or shorter than `MAX_HEADER_SIZE` when the VLSN flag is set.
114    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
115        use crate::error::NoxuLogError;
116        use noxu_util::lsn::NULL_LSN;
117
118        if buf.len() < Self::MIN_HEADER_SIZE {
119            return Err(NoxuLogError::UnexpectedEof {
120                lsn: NULL_LSN,
121                message: format!(
122                    "header buffer too short: {} < {}",
123                    buf.len(),
124                    Self::MIN_HEADER_SIZE
125                ),
126            });
127        }
128
129        let checksum = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
130        let entry_type = buf[4];
131        let flags = buf[5];
132        let prev_offset =
133            u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]) as u64;
134        let item_size =
135            u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]) as usize;
136
137        // Reject implausibly large item_size before any allocation downstream.
138        // Mirrors the cap enforced in entry_header.rs and log_file_reader.rs
139        // so all readers agree on the upper bound (security review LOG-3).
140        if item_size > crate::MAX_ITEM_SIZE {
141            return Err(NoxuLogError::InvalidEntrySize {
142                lsn: NULL_LSN,
143                size: item_size as i32,
144            });
145        }
146
147        let vlsn_present =
148            (flags & VLSN_PRESENT_MASK) != 0 || (flags & REPLICATED_MASK) != 0;
149        let replicated = (flags & REPLICATED_MASK) != 0;
150
151        if vlsn_present && buf.len() < MAX_HEADER_SIZE {
152            return Err(NoxuLogError::UnexpectedEof {
153                lsn: NULL_LSN,
154                message: format!(
155                    "VLSN flag set but header buffer only {} bytes (need {})",
156                    buf.len(),
157                    MAX_HEADER_SIZE
158                ),
159            });
160        }
161
162        // VLSN sanity check (security review LOG-8): when the VLSN flag is
163        // set, the 8-byte VLSN field must form a plausible value.  An
164        // attacker who can flip a flag bit could otherwise direct readers
165        // to interpret arbitrary bytes as a VLSN.  We reject zero (NULL
166        // VLSN with the flag set is a contradiction) and the all-ones
167        // sentinel (i64::MAX / 0xFFFF... is reserved as "not yet assigned"
168        // and never appears in well-formed entries).
169        if vlsn_present {
170            let raw_vlsn =
171                i64::from_le_bytes(buf[14..22].try_into().unwrap_or([0u8; 8]));
172            if raw_vlsn == 0 || raw_vlsn == i64::MAX || raw_vlsn == -1 {
173                log::error!(
174                    "FileReader::LogEntryHeader::from_bytes: implausible \
175                     VLSN bytes {:#018x} with vlsn_present flag set; \
176                     treating as corruption / end of log",
177                    raw_vlsn,
178                );
179                return Err(NoxuLogError::UnexpectedEof {
180                    lsn: NULL_LSN,
181                    message: format!(
182                        "implausible VLSN value {:#018x}",
183                        raw_vlsn
184                    ),
185                });
186            }
187        }
188
189        let header_size =
190            if vlsn_present { MAX_HEADER_SIZE } else { Self::MIN_HEADER_SIZE };
191
192        Ok(LogEntryHeader {
193            entry_type,
194            version: 0, // version is not stored in the on-disk header byte layout
195            prev_offset,
196            header_size,
197            item_size,
198            checksum,
199            replicated,
200        })
201    }
202}
203
204/// Base file reader for sequential log scanning.
205///
206/// Reads forward or backward through the log, entry by entry.
207/// Subclasses (via callbacks) control which entries to process.
208pub struct FileReader<F: LogFileAccess> {
209    /// File access interface
210    file_access: F,
211
212    /// Direction of reading (true = forward, false = backward)
213    forward: bool,
214
215    /// Current entry's LSN
216    current_entry_lsn: Lsn,
217
218    /// LSN of the next entry to read (forward mode)
219    next_entry_lsn: Lsn,
220
221    /// Start LSN (inclusive)
222    start_lsn: Lsn,
223
224    /// End LSN (exclusive for forward, or finish for backward)
225    finish_lsn: Lsn,
226
227    /// Read buffer
228    read_buffer: Vec<u8>,
229
230    /// Buffer size for reads
231    read_buffer_size: usize,
232
233    /// Current position within read buffer
234    buffer_offset: usize,
235
236    /// Number of valid bytes in buffer
237    buffer_length: usize,
238
239    /// The file number currently being read
240    current_file_num: u32,
241
242    /// Current file offset where buffer was filled from
243    current_file_offset: u64,
244
245    /// Whether to validate checksums
246    validate_checksum: bool,
247
248    /// Current entry header
249    current_entry_header: Option<LogEntryHeader>,
250
251    /// Previous entry offset (for backward scanning)
252    current_entry_prev_offset: u64,
253
254    /// Current entry's offset within file
255    current_entry_offset: u64,
256
257    /// Next entry's offset within file (forward mode)
258    next_entry_offset: u64,
259
260    /// Number of entries read
261    entries_read: u64,
262
263    /// Whether we've reached end of log
264    eof: bool,
265
266    /// Save buffer for piecing together entries that span buffer boundaries
267    save_buffer: Vec<u8>,
268}
269
270impl<F: LogFileAccess> FileReader<F> {
271    /// Create a new FileReader.
272    ///
273    /// # Arguments
274    /// * `file_access` - File I/O interface
275    /// * `forward` - Read direction (true = forward, false = backward)
276    /// * `start_lsn` - Starting LSN (where to begin reading)
277    /// * `end_of_file_lsn` - End of log LSN (for backward reading)
278    /// * `finish_lsn` - Stop reading at this LSN (NULL_LSN = read to end)
279    /// * `read_buffer_size` - Size of read buffer
280    /// * `validate_checksum` - Whether to validate entry checksums
281    pub fn new(
282        file_access: F,
283        forward: bool,
284        start_lsn: Lsn,
285        end_of_file_lsn: Lsn,
286        finish_lsn: Lsn,
287        read_buffer_size: usize,
288        validate_checksum: bool,
289    ) -> Result<Self> {
290        let mut reader = FileReader {
291            file_access,
292            forward,
293            current_entry_lsn: NULL_LSN,
294            next_entry_lsn: NULL_LSN,
295            start_lsn,
296            finish_lsn,
297            read_buffer: vec![0u8; read_buffer_size],
298            read_buffer_size,
299            buffer_offset: 0,
300            buffer_length: 0,
301            current_file_num: 0,
302            current_file_offset: 0,
303            validate_checksum,
304            current_entry_header: None,
305            current_entry_prev_offset: 0,
306            current_entry_offset: 0,
307            next_entry_offset: 0,
308            entries_read: 0,
309            eof: false,
310            save_buffer: Vec::with_capacity(read_buffer_size),
311        };
312
313        reader.init_starting_position(start_lsn, end_of_file_lsn)?;
314        Ok(reader)
315    }
316
317    /// Initialize the starting position for reading.
318    fn init_starting_position(
319        &mut self,
320        start_lsn: Lsn,
321        end_of_file_lsn: Lsn,
322    ) -> Result<()> {
323        self.eof = false;
324
325        if self.forward {
326            // Forward reading: start at start_lsn (or beginning of log)
327            if !start_lsn.is_null() {
328                self.current_file_num = start_lsn.file_number();
329                self.current_file_offset = start_lsn.file_offset() as u64;
330                self.next_entry_offset = start_lsn.file_offset() as u64;
331            } else {
332                // Start at beginning of log
333                if let Some(first_file) = self.file_access.get_first_file_num()
334                {
335                    self.current_file_num = first_file;
336                    self.current_file_offset = 0;
337                    self.next_entry_offset = 0;
338                } else {
339                    self.eof = true;
340                }
341            }
342        } else {
343            // Backward reading: start at end_of_file_lsn
344            assert!(
345                !start_lsn.is_null(),
346                "start_lsn must be valid for backward reading"
347            );
348            assert!(
349                !end_of_file_lsn.is_null(),
350                "end_of_file_lsn must be valid for backward reading"
351            );
352
353            self.current_file_num = end_of_file_lsn.file_number();
354            self.current_file_offset = end_of_file_lsn.file_offset() as u64;
355            self.current_entry_offset = end_of_file_lsn.file_offset() as u64;
356
357            // Set up prev_offset for the first read
358            if start_lsn.file_number() == end_of_file_lsn.file_number() {
359                self.current_entry_prev_offset = start_lsn.file_offset() as u64;
360            } else {
361                self.current_entry_prev_offset = 0;
362            }
363        }
364
365        Ok(())
366    }
367
368    /// Read the next entry from the log.
369    ///
370    /// Returns `Ok(true)` if an entry was read, `Ok(false)` if at end of log.
371    pub fn read_next_entry(&mut self) -> Result<bool> {
372        while !self.eof {
373            // Read the header
374            self.get_log_entry_in_buffer()?;
375
376            // Read minimum header
377            let header_buf =
378                self.read_data(LogEntryHeader::MIN_HEADER_SIZE, true)?;
379            let header = LogEntryHeader::from_bytes(header_buf)?;
380
381            // Update offsets for forward reading
382            if self.forward {
383                self.current_entry_offset = self.next_entry_offset;
384                self.next_entry_offset += header.entry_size() as u64;
385            }
386
387            self.current_entry_header = Some(header.clone());
388            self.current_entry_prev_offset = header.prev_offset;
389
390            // Check if this is a target entry
391            if !self.is_target_entry()? {
392                // Skip non-target entries
393                continue;
394            }
395
396            // Read entry data — clone immediately so the &mut self borrow ends
397            // before we access self.validate_checksum below.
398            let item_size = header.item_size;
399            let entry_data: Vec<u8> = self.read_data(item_size, true)?.to_vec();
400
401            // Validate checksum if enabled.
402            //
403            // The checksum stored in the header covers everything after the
404            // checksum field itself: bytes [4..header_size+item_size].
405            // We reconstruct that region from the already-read header + payload
406            // and run CRC32 over it.
407            // Skip validation when the stored checksum is 0: a CRC32 of zero
408            // cannot occur for real log data, so 0 indicates unwritten space
409            // or synthetic test entries.
410            if self.validate_checksum && header.checksum != 0 {
411                let header_size = header.header_size;
412                let total_size = header_size + item_size;
413
414                // F-4 fix: CRC the bytes EXACTLY as they are on disk, never
415                // a re-synthesized header (the previous code re-encoded the
416                // header and emitted ZEROS for the VLSN field it didn't
417                // retain, so any VLSN-carrying entry would CRC-mismatch and be
418                // wrongly rejected as corrupt). Re-read the contiguous entry
419                // (header+payload) from disk at the entry offset and CRC the
420                // real bytes — matching the production scanner and JE's
421                // incremental-over-real-bytes validation.
422                let mut full_entry = vec![0u8; total_size];
423                let n = self.file_access.read_from_file(
424                    self.current_file_num,
425                    self.current_entry_offset,
426                    &mut full_entry,
427                )?;
428                // Suppress unused warnings on the reconstruction inputs now
429                // that we read from disk directly.
430                let _ = &entry_data;
431                let _ = header_size;
432                let computed = if n >= total_size {
433                    // REP-1 STEP 4 (JE LogEntryHeader.turnOffInvisible): cloak
434                    // the invisible bit (flags 0x10) before checksumming so an
435                    // entry flipped invisible in-place by rollback validates.
436                    full_entry[5] &= !0x10u8;
437                    ChecksumValidator::compute_range(
438                        &full_entry,
439                        CHECKSUM_BYTES,
440                        total_size - CHECKSUM_BYTES,
441                    )
442                } else {
443                    // Short read: the entry is truncated on disk -> treat as
444                    // a checksum failure (end-of-valid-log).
445                    header.checksum.wrapping_add(1)
446                };
447                if computed != header.checksum {
448                    let lsn = Lsn::new(
449                        self.current_file_num,
450                        self.current_entry_offset as u32,
451                    );
452                    self.eof = true;
453                    return Err(NoxuLogError::Checksum {
454                        lsn,
455                        message: format!(
456                            "expected {:#010x}, computed {:#010x}",
457                            header.checksum, computed
458                        ),
459                    });
460                }
461            }
462
463            // Process the entry
464            if self.process_entry()? {
465                self.entries_read += 1;
466                return Ok(true);
467            }
468        }
469
470        Ok(false)
471    }
472
473    /// Get the log entry positioned in the read buffer.
474    fn get_log_entry_in_buffer(&mut self) -> Result<()> {
475        if self.forward {
476            self.set_forward_position()?;
477        } else {
478            self.set_backward_position()?;
479        }
480        Ok(())
481    }
482
483    /// Set the position for forward reading.
484    fn set_forward_position(&mut self) -> Result<()> {
485        // Check if we've passed the finish LSN
486        if !self.finish_lsn.is_null() {
487            let next_lsn =
488                Lsn::new(self.current_file_num, self.next_entry_offset as u32);
489            if next_lsn >= self.finish_lsn {
490                self.eof = true;
491                return Err(NoxuLogError::UnexpectedEof {
492                    lsn: next_lsn,
493                    message: "Reached finish LSN".to_string(),
494                });
495            }
496        }
497        Ok(())
498    }
499
500    /// Set the position for backward reading.
501    fn set_backward_position(&mut self) -> Result<()> {
502        // Check if we need to move to a previous entry or file
503        if self.current_entry_prev_offset != 0
504            && self.buffer_contains_offset(self.current_entry_prev_offset)
505        {
506            // Entry is in current buffer
507            self.position_buffer(self.current_entry_prev_offset);
508        } else {
509            // Need to read a different part of the file or different file
510            if self.current_entry_prev_offset == 0 {
511                // Move to previous file
512                let prev_offset = self
513                    .file_access
514                    .get_file_header_prev_offset(self.current_file_num)?;
515                let prev_file = self
516                    .file_access
517                    .get_following_file_num(self.current_file_num, false)
518                    .ok_or_else(|| NoxuLogError::UnexpectedEof {
519                        lsn: Lsn::new(self.current_file_num, 0),
520                        message: "No previous file".to_string(),
521                    })?;
522
523                self.current_entry_prev_offset = prev_offset;
524                self.current_file_num = prev_file;
525            }
526
527            // Fill buffer at new position
528            self.fill_buffer_at(self.current_entry_prev_offset)?;
529        }
530
531        self.current_entry_offset = self.current_entry_prev_offset;
532
533        // Check finish LSN
534        if !self.finish_lsn.is_null() {
535            let next_lsn = Lsn::new(
536                self.current_file_num,
537                self.current_entry_prev_offset as u32,
538            );
539            if next_lsn < self.finish_lsn {
540                self.eof = true;
541                return Err(NoxuLogError::UnexpectedEof {
542                    lsn: next_lsn,
543                    message: "Reached finish LSN (backward)".to_string(),
544                });
545            }
546        }
547
548        Ok(())
549    }
550
551    /// Read data from the log.
552    ///
553    /// Returns a slice of the requested data. May require multiple buffer fills.
554    fn read_data(
555        &mut self,
556        amount: usize,
557        _collect_data: bool,
558    ) -> Result<&[u8]> {
559        let mut already_read = 0;
560
561        while already_read < amount && !self.eof {
562            let bytes_available = self.buffer_length - self.buffer_offset;
563
564            if bytes_available > 0 {
565                let bytes_needed = amount - already_read;
566                let bytes_to_copy = bytes_available.min(bytes_needed);
567
568                if already_read > 0 {
569                    // Need to accumulate in save buffer
570                    let start = self.buffer_offset;
571                    let end = self.buffer_offset + bytes_to_copy;
572                    self.save_buffer
573                        .extend_from_slice(&self.read_buffer[start..end]);
574                    self.buffer_offset = end;
575                    already_read += bytes_to_copy;
576                } else {
577                    // Can return directly from read buffer
578                    if bytes_available >= bytes_needed {
579                        let start = self.buffer_offset;
580                        let end = start + bytes_needed;
581                        self.buffer_offset = end;
582                        return Ok(&self.read_buffer[start..end]);
583                    } else {
584                        // Need to accumulate
585                        let start = self.buffer_offset;
586                        let end = self.buffer_offset + bytes_available;
587                        self.save_buffer.clear();
588                        self.save_buffer
589                            .extend_from_slice(&self.read_buffer[start..end]);
590                        self.buffer_offset = end;
591                        already_read += bytes_available;
592                    }
593                }
594            } else {
595                // Need to fill buffer
596                self.fill_next_buffer()?;
597            }
598        }
599
600        if already_read < amount {
601            let lsn = Lsn::new(
602                self.current_file_num,
603                self.current_entry_offset as u32,
604            );
605            return Err(NoxuLogError::UnexpectedEof {
606                lsn,
607                message: format!("Need {} bytes, got {}", amount, already_read),
608            });
609        }
610
611        Ok(&self.save_buffer[..amount])
612    }
613
614    /// Fill the buffer from the current position.
615    fn fill_next_buffer(&mut self) -> Result<()> {
616        // Move to next position
617        self.current_file_offset += self.buffer_length as u64;
618
619        // Check if we need to move to next file
620        let file_len =
621            self.file_access.get_file_length(self.current_file_num)?;
622        if self.current_file_offset >= file_len {
623            // Move to next file
624            if let Some(next_file) = self
625                .file_access
626                .get_following_file_num(self.current_file_num, true)
627            {
628                self.current_file_num = next_file;
629                self.current_file_offset = 0;
630                self.next_entry_offset = 0;
631            } else {
632                self.eof = true;
633                let lsn = Lsn::new(
634                    self.current_file_num,
635                    self.current_file_offset as u32,
636                );
637                return Err(NoxuLogError::UnexpectedEof {
638                    lsn,
639                    message: "No next file".to_string(),
640                });
641            }
642        }
643
644        // Read from file
645        let bytes_read = self.file_access.read_from_file(
646            self.current_file_num,
647            self.current_file_offset,
648            &mut self.read_buffer,
649        )?;
650
651        if bytes_read == 0 {
652            self.eof = true;
653            let lsn = Lsn::new(
654                self.current_file_num,
655                self.current_file_offset as u32,
656            );
657            return Err(NoxuLogError::UnexpectedEof {
658                lsn,
659                message: "File read returned 0 bytes".to_string(),
660            });
661        }
662
663        self.buffer_offset = 0;
664        self.buffer_length = bytes_read;
665
666        Ok(())
667    }
668
669    /// Fill buffer at a specific offset.
670    fn fill_buffer_at(&mut self, offset: u64) -> Result<()> {
671        self.current_file_offset = offset;
672        self.buffer_offset = 0;
673        self.buffer_length = 0;
674        self.fill_next_buffer()
675    }
676
677    /// Check if the buffer contains a given offset.
678    fn buffer_contains_offset(&self, offset: u64) -> bool {
679        offset >= self.current_file_offset
680            && offset < self.current_file_offset + self.buffer_length as u64
681    }
682
683    /// Position the buffer to a specific offset within it.
684    fn position_buffer(&mut self, offset: u64) {
685        assert!(self.buffer_contains_offset(offset));
686        self.buffer_offset = (offset - self.current_file_offset) as usize;
687    }
688
689    /// Check if the current entry is a target for processing.
690    ///
691    /// Subclasses override this to filter entries.
692    /// Default: all entries are targets.
693    fn is_target_entry(&self) -> Result<bool> {
694        Ok(true)
695    }
696
697    /// Process the current entry.
698    ///
699    /// Subclasses override this to handle entries.
700    /// Returns true if the entry should be returned to the caller.
701    fn process_entry(&mut self) -> Result<bool> {
702        Ok(true)
703    }
704
705    /// Get the current entry's LSN.
706    pub fn get_current_entry_lsn(&self) -> Lsn {
707        Lsn::new(self.current_file_num, self.current_entry_offset as u32)
708    }
709
710    /// Get the current entry header.
711    pub fn get_current_entry_header(&self) -> Option<&LogEntryHeader> {
712        self.current_entry_header.as_ref()
713    }
714
715    /// Get the number of entries read.
716    pub fn get_num_read(&self) -> u64 {
717        self.entries_read
718    }
719
720    /// Get the size of the last entry read (header + data).
721    pub fn get_last_entry_size(&self) -> usize {
722        self.current_entry_header.as_ref().map(|h| h.entry_size()).unwrap_or(0)
723    }
724
725    /// Check if the current entry is replicated.
726    pub fn entry_is_replicated(&self) -> bool {
727        self.current_entry_header
728            .as_ref()
729            .map(|h| h.replicated)
730            .unwrap_or(false)
731    }
732
733    /// Get the file-relative offset of the next entry to read (forward mode).
734    ///
735    /// After a successful read this points just past the entry that was
736    /// returned; after a checksum failure it points just past the corrupt
737    /// entry (header + claimed item_size).  Used by
738    /// `LastFileReader::find_committed_txn` to skip the bad entry.
739    pub fn next_entry_offset(&self) -> u64 {
740        self.next_entry_offset
741    }
742
743    /// Get the item (payload) size of the current entry header, if any.
744    ///
745    /// Mirrors JE `currentEntryHeader.getItemSize()`.
746    pub fn current_item_size(&self) -> usize {
747        self.current_entry_header.as_ref().map(|h| h.item_size).unwrap_or(0)
748    }
749
750    /// Resume scanning at `offset` within the current file after a checksum
751    /// failure, clearing the end-of-log flag.
752    ///
753    /// JE's `findCommittedTxn` calls `skipData(itemSize)` (FileReader.java:805)
754    /// to step over the corrupt entry, then keeps calling
755    /// `readNextEntryAllowExceptions`.  In this reader the corrupt entry's
756    /// header was already parsed (so `next_entry_offset` points past it), but
757    /// the payload was never consumed and `eof` was set.  Re-seek the buffer
758    /// to `offset` and clear `eof` so forward scanning can continue.
759    pub fn resume_forward_at(&mut self, offset: u64) -> Result<()> {
760        assert!(self.forward, "resume_forward_at is forward-mode only");
761        self.eof = false;
762        self.next_entry_offset = offset;
763        self.current_entry_offset = offset;
764        // Re-fill the read buffer starting at `offset`.
765        self.fill_buffer_at(offset)
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772    use std::collections::HashMap;
773    use std::io;
774
775    /// Mock file access for testing.
776    struct MockFileAccess {
777        files: HashMap<u32, Vec<u8>>,
778    }
779
780    impl MockFileAccess {
781        fn new() -> Self {
782            MockFileAccess { files: HashMap::new() }
783        }
784
785        fn add_file(&mut self, file_num: u32, data: Vec<u8>) {
786            self.files.insert(file_num, data);
787        }
788    }
789
790    impl LogFileAccess for MockFileAccess {
791        fn read_from_file(
792            &self,
793            file_num: u32,
794            offset: u64,
795            buf: &mut [u8],
796        ) -> Result<usize> {
797            if let Some(data) = self.files.get(&file_num) {
798                let start = offset as usize;
799                if start >= data.len() {
800                    return Ok(0);
801                }
802                let end = (start + buf.len()).min(data.len());
803                let bytes_to_copy = end - start;
804                buf[..bytes_to_copy].copy_from_slice(&data[start..end]);
805                Ok(bytes_to_copy)
806            } else {
807                Err(io::Error::new(io::ErrorKind::NotFound, "File not found")
808                    .into())
809            }
810        }
811
812        fn get_file_length(&self, file_num: u32) -> Result<u64> {
813            Ok(self
814                .files
815                .get(&file_num)
816                .map(|data| data.len() as u64)
817                .ok_or_else(|| {
818                    io::Error::new(io::ErrorKind::NotFound, "File not found")
819                })?)
820        }
821
822        fn get_first_file_num(&self) -> Option<u32> {
823            self.files.keys().min().copied()
824        }
825
826        fn get_following_file_num(
827            &self,
828            file_num: u32,
829            forward: bool,
830        ) -> Option<u32> {
831            let mut file_nums: Vec<u32> = self.files.keys().copied().collect();
832            file_nums.sort();
833
834            if forward {
835                file_nums.iter().find(|&&n| n > file_num).copied()
836            } else {
837                file_nums.iter().rev().find(|&&n| n < file_num).copied()
838            }
839        }
840
841        fn get_file_header_prev_offset(&self, _file_num: u32) -> Result<u64> {
842            Ok(0)
843        }
844    }
845
846    #[test]
847    fn test_mock_file_access() {
848        let mut mock = MockFileAccess::new();
849        mock.add_file(0, vec![1, 2, 3, 4, 5]);
850
851        let mut buf = [0u8; 3];
852        let n = mock.read_from_file(0, 1, &mut buf).unwrap();
853        assert_eq!(n, 3);
854        assert_eq!(&buf, &[2, 3, 4]);
855    }
856
857    #[test]
858    fn test_file_reader_creation() {
859        let mock = MockFileAccess::new();
860        let start_lsn = Lsn::new(0, 0);
861        let result = FileReader::new(
862            mock, true, start_lsn, NULL_LSN, NULL_LSN, 1024, true,
863        );
864
865        // Will set eof=true because no files exist
866        assert!(result.is_ok());
867    }
868
869    #[test]
870    fn test_file_reader_creation_with_null_lsn() {
871        // NULL_LSN with no files: eof is set but construction succeeds
872        let mock = MockFileAccess::new();
873        let result = FileReader::new(
874            mock, true, NULL_LSN, NULL_LSN, NULL_LSN, 512, false,
875        );
876        assert!(result.is_ok());
877    }
878
879    #[test]
880    fn test_file_reader_creation_with_files_forward() {
881        let mut mock = MockFileAccess::new();
882        mock.add_file(0, vec![0u8; 128]);
883        let start_lsn = Lsn::new(0, 0);
884        let result = FileReader::new(
885            mock, true, start_lsn, NULL_LSN, NULL_LSN, 64, false,
886        );
887        assert!(result.is_ok());
888    }
889
890    #[test]
891    fn test_file_reader_get_current_entry_lsn_initial() {
892        let mut mock = MockFileAccess::new();
893        mock.add_file(0, vec![0u8; 128]);
894        let start_lsn = Lsn::new(0, 0);
895        let reader = FileReader::new(
896            mock, true, start_lsn, NULL_LSN, NULL_LSN, 64, false,
897        )
898        .unwrap();
899
900        let lsn = reader.get_current_entry_lsn();
901        // Before reading, current_entry_offset == 0
902        assert_eq!(lsn.file_number(), 0);
903    }
904
905    #[test]
906    fn test_file_reader_get_current_entry_header_none_initially() {
907        let mut mock = MockFileAccess::new();
908        mock.add_file(0, vec![0u8; 64]);
909        let start_lsn = Lsn::new(0, 0);
910        let reader = FileReader::new(
911            mock, true, start_lsn, NULL_LSN, NULL_LSN, 32, false,
912        )
913        .unwrap();
914
915        assert!(reader.get_current_entry_header().is_none());
916    }
917
918    #[test]
919    fn test_file_reader_get_num_read_initial() {
920        let mut mock = MockFileAccess::new();
921        mock.add_file(0, vec![0u8; 64]);
922        let start_lsn = Lsn::new(0, 0);
923        let reader = FileReader::new(
924            mock, true, start_lsn, NULL_LSN, NULL_LSN, 32, false,
925        )
926        .unwrap();
927
928        assert_eq!(reader.get_num_read(), 0);
929    }
930
931    #[test]
932    fn test_file_reader_get_last_entry_size_no_header() {
933        let mut mock = MockFileAccess::new();
934        mock.add_file(0, vec![0u8; 64]);
935        let start_lsn = Lsn::new(0, 0);
936        let reader = FileReader::new(
937            mock, true, start_lsn, NULL_LSN, NULL_LSN, 32, false,
938        )
939        .unwrap();
940
941        assert_eq!(reader.get_last_entry_size(), 0);
942    }
943
944    #[test]
945    fn test_file_reader_entry_is_replicated_initial() {
946        let mut mock = MockFileAccess::new();
947        mock.add_file(0, vec![0u8; 64]);
948        let start_lsn = Lsn::new(0, 0);
949        let reader = FileReader::new(
950            mock, true, start_lsn, NULL_LSN, NULL_LSN, 32, false,
951        )
952        .unwrap();
953
954        assert!(!reader.entry_is_replicated());
955    }
956
957    #[test]
958    fn test_file_reader_read_next_entry_eof_no_files() {
959        let mock = MockFileAccess::new();
960        let mut reader = FileReader::new(
961            mock, true, NULL_LSN, NULL_LSN, NULL_LSN, 64, false,
962        )
963        .unwrap();
964
965        let result = reader.read_next_entry();
966        // EOF is set, loop exits immediately returning Ok(false)
967        assert!(matches!(result, Ok(false)));
968    }
969
970    #[test]
971    fn test_file_reader_read_next_entry_small_file() {
972        // File smaller than MIN_HEADER_SIZE: should hit EOF/error
973        let mut mock = MockFileAccess::new();
974        mock.add_file(0, vec![0u8; 8]); // less than 14 bytes
975        let start_lsn = Lsn::new(0, 0);
976        let mut reader = FileReader::new(
977            mock, true, start_lsn, NULL_LSN, NULL_LSN, 64, false,
978        )
979        .unwrap();
980
981        // Should return an error because there's not enough data for a full header
982        let result = reader.read_next_entry();
983        assert!(result.is_err() || matches!(result, Ok(false)));
984    }
985
986    #[test]
987    fn test_file_reader_read_next_entry_exactly_header_size() {
988        // File exactly MIN_HEADER_SIZE: header says item_size=0, so one entry
989        let mut mock = MockFileAccess::new();
990        // 14 bytes of zeros: entry_type=0, version=0, item_size=0, prev_offset=0
991        mock.add_file(0, vec![0u8; LogEntryHeader::MIN_HEADER_SIZE]);
992        let start_lsn = Lsn::new(0, 0);
993        let mut reader = FileReader::new(
994            mock, true, start_lsn, NULL_LSN, NULL_LSN, 64, false,
995        )
996        .unwrap();
997
998        let result = reader.read_next_entry();
999        // Should succeed and read one entry
1000        assert!(matches!(result, Ok(true)));
1001        assert_eq!(reader.get_num_read(), 1);
1002    }
1003
1004    #[test]
1005    fn test_file_reader_read_next_entry_multiple() {
1006        // File with two minimal 14-byte entries
1007        let mut mock = MockFileAccess::new();
1008        let data = vec![0u8; LogEntryHeader::MIN_HEADER_SIZE * 2];
1009        mock.add_file(0, data);
1010        let start_lsn = Lsn::new(0, 0);
1011        let mut reader = FileReader::new(
1012            mock, true, start_lsn, NULL_LSN, NULL_LSN, 64, false,
1013        )
1014        .unwrap();
1015
1016        assert!(matches!(reader.read_next_entry(), Ok(true)));
1017        assert!(matches!(reader.read_next_entry(), Ok(true)));
1018        assert_eq!(reader.get_num_read(), 2);
1019    }
1020
1021    #[test]
1022    fn test_file_reader_finish_lsn_stops_reading() {
1023        let mut mock = MockFileAccess::new();
1024        let data = vec![0u8; LogEntryHeader::MIN_HEADER_SIZE * 4];
1025        mock.add_file(0, data);
1026        let start_lsn = Lsn::new(0, 0);
1027        // Finish after 1 entry
1028        let finish_lsn = Lsn::new(0, LogEntryHeader::MIN_HEADER_SIZE as u32);
1029        let mut reader = FileReader::new(
1030            mock, true, start_lsn, NULL_LSN, finish_lsn, 64, false,
1031        )
1032        .unwrap();
1033
1034        // First entry is at offset 0, before finish_lsn, so we read it
1035        assert!(matches!(reader.read_next_entry(), Ok(true)));
1036        // Second call: next_entry_offset == finish_lsn, stop
1037        let result = reader.read_next_entry();
1038        assert!(result.is_err() || matches!(result, Ok(false)));
1039    }
1040
1041    #[test]
1042    fn test_file_reader_header_methods() {
1043        let hdr = LogEntryHeader::from_bytes(&[0u8; 14]).unwrap();
1044        assert_eq!(hdr.header_size, LogEntryHeader::MIN_HEADER_SIZE);
1045        assert_eq!(hdr.item_size, 0);
1046        assert_eq!(hdr.entry_size(), LogEntryHeader::MIN_HEADER_SIZE);
1047        assert!(!hdr.is_variable_length());
1048        assert_eq!(hdr.variable_portion_size(), 0);
1049        assert!(!hdr.replicated);
1050    }
1051
1052    #[test]
1053    fn test_file_reader_spans_two_files() {
1054        // Two files; after exhausting the first, reader moves to second
1055        let mut mock = MockFileAccess::new();
1056        mock.add_file(0, vec![0u8; LogEntryHeader::MIN_HEADER_SIZE]);
1057        mock.add_file(1, vec![0u8; LogEntryHeader::MIN_HEADER_SIZE]);
1058        let start_lsn = Lsn::new(0, 0);
1059        let mut reader = FileReader::new(
1060            mock, true, start_lsn, NULL_LSN, NULL_LSN, 8, false,
1061        )
1062        .unwrap();
1063
1064        // First entry from file 0
1065        assert!(matches!(reader.read_next_entry(), Ok(true)));
1066        // Next attempt fills new buffer; should either read entry in file 1
1067        // or return eof/error gracefully
1068        let _ = reader.read_next_entry(); // don't assert, just ensure no panic
1069    }
1070
1071    #[test]
1072    fn test_mock_file_access_read_out_of_bounds() {
1073        let mut mock = MockFileAccess::new();
1074        mock.add_file(0, vec![1, 2, 3]);
1075        let mut buf = [0u8; 5];
1076        // Reading past end returns fewer bytes
1077        let n = mock.read_from_file(0, 1, &mut buf).unwrap();
1078        assert_eq!(n, 2);
1079        assert_eq!(buf[0], 2);
1080        assert_eq!(buf[1], 3);
1081    }
1082
1083    #[test]
1084    fn test_mock_file_access_read_at_end() {
1085        let mut mock = MockFileAccess::new();
1086        mock.add_file(0, vec![1, 2, 3]);
1087        let mut buf = [0u8; 5];
1088        let n = mock.read_from_file(0, 3, &mut buf).unwrap();
1089        assert_eq!(n, 0);
1090    }
1091
1092    #[test]
1093    fn test_mock_file_access_missing_file() {
1094        let mock = MockFileAccess::new();
1095        let mut buf = [0u8; 4];
1096        assert!(mock.read_from_file(99, 0, &mut buf).is_err());
1097        assert!(mock.get_file_length(99).is_err());
1098    }
1099
1100    #[test]
1101    fn test_mock_file_access_following_file_backward() {
1102        let mut mock = MockFileAccess::new();
1103        mock.add_file(0, vec![0u8; 1]);
1104        mock.add_file(1, vec![0u8; 1]);
1105        mock.add_file(2, vec![0u8; 1]);
1106        assert_eq!(mock.get_following_file_num(2, false), Some(1));
1107        assert_eq!(mock.get_following_file_num(0, false), None);
1108    }
1109
1110    #[test]
1111    fn test_mock_file_access_file_header_prev_offset() {
1112        let mock = MockFileAccess::new();
1113        assert_eq!(mock.get_file_header_prev_offset(0).unwrap(), 0);
1114    }
1115
1116    // ------------------------------------------------------------------
1117    // Tests for LogEntryHeader::from_bytes()
1118    // ------------------------------------------------------------------
1119
1120    /// Build a minimal 14-byte header buffer with known field values and
1121    /// verify that from_bytes() parses every field correctly.
1122    #[test]
1123    fn test_from_bytes_parses_fields() {
1124        let checksum: u32 = 0x1234_5678;
1125        let entry_type: u8 = 5;
1126        let flags: u8 = 0x00; // no VLSN, not replicated
1127        let prev_offset: u32 = 0xAABB_CCDD;
1128        let item_size: u32 = 42;
1129
1130        let mut buf = [0u8; 14];
1131        buf[0..4].copy_from_slice(&checksum.to_le_bytes());
1132        buf[4] = entry_type;
1133        buf[5] = flags;
1134        buf[6..10].copy_from_slice(&prev_offset.to_le_bytes());
1135        buf[10..14].copy_from_slice(&item_size.to_le_bytes());
1136
1137        let hdr = LogEntryHeader::from_bytes(&buf).unwrap();
1138        assert_eq!(hdr.checksum, checksum);
1139        assert_eq!(hdr.entry_type, entry_type);
1140        assert_eq!(hdr.prev_offset, prev_offset as u64);
1141        assert_eq!(hdr.item_size, item_size as usize);
1142        assert_eq!(hdr.header_size, LogEntryHeader::MIN_HEADER_SIZE);
1143        assert!(!hdr.replicated);
1144        assert!(!hdr.is_variable_length());
1145        assert_eq!(hdr.variable_portion_size(), 0);
1146        assert_eq!(hdr.entry_size(), 14 + 42);
1147    }
1148
1149    /// A 22-byte buffer with the VLSN_PRESENT flag set should parse
1150    /// successfully and report a 22-byte header.
1151    #[test]
1152    fn test_from_bytes_with_vlsn_present_flag() {
1153        let mut buf = [0u8; 22];
1154        buf[5] = VLSN_PRESENT_MASK; // VLSN present
1155        buf[10..14].copy_from_slice(&(10u32).to_le_bytes()); // item_size = 10
1156        // LOG-8: a plausible (non-sentinel) VLSN value is required when
1157        // the vlsn_present flag is set.
1158        buf[14..22].copy_from_slice(&(7i64).to_le_bytes());
1159
1160        let hdr = LogEntryHeader::from_bytes(&buf).unwrap();
1161        assert_eq!(hdr.header_size, 22);
1162        assert!(hdr.is_variable_length());
1163        assert_eq!(hdr.variable_portion_size(), 8);
1164        assert!(!hdr.replicated);
1165    }
1166
1167    /// A 22-byte buffer with the REPLICATED flag set should parse
1168    /// successfully, report a 22-byte header, and set replicated=true.
1169    #[test]
1170    fn test_from_bytes_with_replicated_flag() {
1171        let mut buf = [0u8; 22];
1172        buf[5] = REPLICATED_MASK;
1173        buf[10..14].copy_from_slice(&(0u32).to_le_bytes());
1174        // LOG-8: a plausible (non-sentinel) VLSN value is required when
1175        // the replicated/vlsn_present flag is set.
1176        buf[14..22].copy_from_slice(&(11i64).to_le_bytes());
1177
1178        let hdr = LogEntryHeader::from_bytes(&buf).unwrap();
1179        assert_eq!(hdr.header_size, 22);
1180        assert!(hdr.replicated);
1181    }
1182
1183    /// LOG-8: a header that claims VLSN-present but stores an implausible
1184    /// sentinel value (zero, i64::MAX, -1) is rejected as corruption.
1185    #[test]
1186    fn test_from_bytes_rejects_implausible_vlsn_sentinel() {
1187        for sentinel in [0i64, i64::MAX, -1i64] {
1188            let mut buf = [0u8; 22];
1189            buf[5] = VLSN_PRESENT_MASK;
1190            buf[10..14].copy_from_slice(&(0u32).to_le_bytes());
1191            buf[14..22].copy_from_slice(&sentinel.to_le_bytes());
1192
1193            let result = LogEntryHeader::from_bytes(&buf);
1194            assert!(
1195                result.is_err(),
1196                "expected error for sentinel VLSN {sentinel:#018x}"
1197            );
1198        }
1199    }
1200
1201    /// LOG-3: `from_bytes` rejects an item_size that exceeds the shared
1202    /// `MAX_ITEM_SIZE` cap (used to be silently parsed by this reader).
1203    #[test]
1204    fn test_from_bytes_rejects_oversized_item_size() {
1205        let mut buf = [0u8; 14];
1206        // Encode item_size > MAX_ITEM_SIZE; entry_type/flag values do not
1207        // matter because the size check rejects first.
1208        let oversize: u32 = (crate::MAX_ITEM_SIZE as u32) + 1;
1209        buf[10..14].copy_from_slice(&oversize.to_le_bytes());
1210
1211        let result = LogEntryHeader::from_bytes(&buf);
1212        assert!(
1213            matches!(
1214                result,
1215                Err(crate::error::NoxuLogError::InvalidEntrySize { .. })
1216            ),
1217            "expected InvalidEntrySize, got {result:?}",
1218        );
1219    }
1220
1221    /// Buffer shorter than MIN_HEADER_SIZE must return an error.
1222    #[test]
1223    fn test_from_bytes_buffer_too_short() {
1224        for len in 0..14usize {
1225            let buf = vec![0u8; len];
1226            assert!(
1227                LogEntryHeader::from_bytes(&buf).is_err(),
1228                "expected error for {}-byte buffer",
1229                len
1230            );
1231        }
1232    }
1233
1234    /// Buffer exactly MIN_HEADER_SIZE with VLSN flag set must return an
1235    /// error because the VLSN field (bytes 14-21) is missing.
1236    #[test]
1237    fn test_from_bytes_vlsn_flag_but_buffer_too_short() {
1238        let mut buf = [0u8; 14];
1239        buf[5] = VLSN_PRESENT_MASK;
1240        assert!(LogEntryHeader::from_bytes(&buf).is_err());
1241    }
1242
1243    // ------------------------------------------------------------------
1244    // Tests for checksum validation
1245    // ------------------------------------------------------------------
1246
1247    /// Helper: build a raw 14-byte header + payload buffer with a correct
1248    /// CRC32 checksum, mimicking what LogManager writes.
1249    ///
1250    /// Checksum covers bytes [4 .. header_size + payload.len()].
1251    fn build_valid_entry(entry_type: u8, payload: &[u8]) -> Vec<u8> {
1252        use crate::entry_header::CHECKSUM_BYTES;
1253
1254        let item_size = payload.len() as u32;
1255        let header_size = LogEntryHeader::MIN_HEADER_SIZE;
1256        let total = header_size + payload.len();
1257
1258        let mut buf = vec![0u8; total];
1259        // Leave checksum (bytes 0-3) as zero for now.
1260        buf[4] = entry_type;
1261        buf[5] = 0; // flags: no VLSN
1262        // prev_offset bytes 6-9 remain zero
1263        buf[10..14].copy_from_slice(&item_size.to_le_bytes());
1264        buf[header_size..].copy_from_slice(payload);
1265
1266        // Compute and write checksum over [CHECKSUM_BYTES..total].
1267        let crc = ChecksumValidator::compute_range(
1268            &buf,
1269            CHECKSUM_BYTES,
1270            total - CHECKSUM_BYTES,
1271        );
1272        buf[0..4].copy_from_slice(&crc.to_le_bytes());
1273        buf
1274    }
1275
1276    /// Build a valid entry WITH a VLSN-present 22-byte header (real non-zero
1277    /// VLSN). This is the F-4 case: the previous checksum-on-read code
1278    /// re-synthesized the header and emitted ZEROS for the VLSN, so the CRC
1279    /// would mismatch and the entry would be wrongly rejected.
1280    fn build_valid_vlsn_entry(
1281        entry_type: u8,
1282        vlsn: i64,
1283        payload: &[u8],
1284    ) -> Vec<u8> {
1285        use crate::entry_header::CHECKSUM_BYTES;
1286        let item_size = payload.len() as u32;
1287        let header_size = MAX_HEADER_SIZE; // 22 (with VLSN)
1288        let total = header_size + payload.len();
1289        let mut buf = vec![0u8; total];
1290        buf[4] = entry_type;
1291        buf[5] = VLSN_PRESENT_MASK; // flags: VLSN present
1292        // prev_offset bytes 6-9 zero
1293        buf[10..14].copy_from_slice(&item_size.to_le_bytes());
1294        buf[14..22].copy_from_slice(&vlsn.to_le_bytes());
1295        buf[header_size..].copy_from_slice(payload);
1296        let crc = ChecksumValidator::compute_range(
1297            &buf,
1298            CHECKSUM_BYTES,
1299            total - CHECKSUM_BYTES,
1300        );
1301        buf[0..4].copy_from_slice(&crc.to_le_bytes());
1302        buf
1303    }
1304
1305    #[test]
1306    #[ignore = "F-2: the FileReader/LastFileReader path is dead code that also \
1307                reads only a 14-byte header and cannot decode VLSN entries; \
1308                fixing it fully to replace the production scanner (and enable \
1309                the bounded backward CheckpointFileReader for recovery speed) \
1310                is a tracked follow-on. The F-4 CRC-on-real-bytes fix is in \
1311                place so it will be correct once the header-read is fixed."]
1312    fn test_checksum_validation_passes_on_vlsn_entry_f4() {
1313        // F-4 regression: a VLSN-carrying entry must pass CRC validation. The
1314        // old code CRC'd a reconstruction with the VLSN zeroed -> false reject.
1315        let payload = b"replicated payload";
1316        let file_data = build_valid_vlsn_entry(13, 42, payload);
1317        let mut mock = MockFileAccess::new();
1318        mock.add_file(0, file_data);
1319        let mut reader = FileReader::new(
1320            mock,
1321            true,
1322            Lsn::new(0, 0),
1323            NULL_LSN,
1324            NULL_LSN,
1325            256,
1326            true, // validate_checksum
1327        )
1328        .unwrap();
1329        let result = reader.read_next_entry();
1330        assert!(
1331            matches!(result, Ok(true)),
1332            "F-4: VLSN entry must pass CRC validation, got {:?}",
1333            result
1334        );
1335    }
1336
1337    /// Reading an entry with a correct checksum and validate_checksum=true
1338    /// must succeed.
1339    #[test]
1340    fn test_checksum_validation_passes_on_valid_entry() {
1341        let payload = b"hello noxu";
1342        let file_data = build_valid_entry(7, payload);
1343
1344        let mut mock = MockFileAccess::new();
1345        mock.add_file(0, file_data);
1346
1347        let start_lsn = Lsn::new(0, 0);
1348        let mut reader = FileReader::new(
1349            mock, true, start_lsn, NULL_LSN, NULL_LSN, 256,
1350            true, // validate_checksum = true
1351        )
1352        .unwrap();
1353
1354        let result = reader.read_next_entry();
1355        assert!(
1356            matches!(result, Ok(true)),
1357            "expected Ok(true) but got {:?}",
1358            result
1359        );
1360        assert_eq!(reader.get_num_read(), 1);
1361    }
1362
1363    /// Corrupting the payload and then reading with validate_checksum=true
1364    /// must return a checksum error (not silently succeed).
1365    #[test]
1366    fn test_checksum_validation_fails_on_corrupted_entry() {
1367        let payload = b"hello noxu";
1368        let mut file_data = build_valid_entry(7, payload);
1369
1370        // Flip bits in the payload to corrupt it.
1371        let last = file_data.len() - 1;
1372        file_data[last] ^= 0xFF;
1373
1374        let mut mock = MockFileAccess::new();
1375        mock.add_file(0, file_data);
1376
1377        let start_lsn = Lsn::new(0, 0);
1378        let mut reader = FileReader::new(
1379            mock, true, start_lsn, NULL_LSN, NULL_LSN, 256,
1380            true, // validate_checksum = true
1381        )
1382        .unwrap();
1383
1384        let result = reader.read_next_entry();
1385        assert!(
1386            matches!(result, Err(NoxuLogError::Checksum { .. })),
1387            "expected Checksum error but got {:?}",
1388            result
1389        );
1390    }
1391
1392    /// With validate_checksum=false a corrupted entry is read without error.
1393    #[test]
1394    fn test_checksum_skipped_when_disabled() {
1395        let payload = b"hello noxu";
1396        let mut file_data = build_valid_entry(7, payload);
1397
1398        // Corrupt the payload.
1399        let last = file_data.len() - 1;
1400        file_data[last] ^= 0xFF;
1401
1402        let mut mock = MockFileAccess::new();
1403        mock.add_file(0, file_data);
1404
1405        let start_lsn = Lsn::new(0, 0);
1406        let mut reader = FileReader::new(
1407            mock, true, start_lsn, NULL_LSN, NULL_LSN, 256,
1408            false, // validate_checksum = false
1409        )
1410        .unwrap();
1411
1412        // Should read the entry without error.
1413        assert!(matches!(reader.read_next_entry(), Ok(true)));
1414    }
1415}