Skip to main content

modelvault_core/segments/
reader.rs

1use crate::checksum::crc32c_append;
2use crate::error::{DbError, FormatError};
3use crate::segments::header::SegmentType;
4use crate::segments::header::{decode_segment_header, SegmentHeader, SEGMENT_HEADER_LEN};
5use crate::storage::Store;
6
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct SegmentMeta {
9    pub offset: u64,
10    pub header: SegmentHeader,
11}
12
13pub fn read_segment_header_at(
14    store: &mut impl Store,
15    offset: u64,
16) -> Result<([u8; SEGMENT_HEADER_LEN], SegmentHeader), DbError> {
17    let mut buf = [0u8; SEGMENT_HEADER_LEN];
18    store.read_exact_at(offset, &mut buf)?;
19    let header = decode_segment_header(&buf)?;
20    Ok((buf, header))
21}
22
23pub fn scan_segments(store: &mut impl Store, start: u64) -> Result<Vec<SegmentMeta>, DbError> {
24    let mut out = Vec::new();
25    let mut offset = start;
26    let file_len = store.len()?;
27
28    while offset < file_len {
29        if file_len - offset < SEGMENT_HEADER_LEN as u64 {
30            return Err(DbError::Format(FormatError::TruncatedSegmentHeader {
31                got: (file_len - offset) as usize,
32                expected: SEGMENT_HEADER_LEN,
33            }));
34        }
35
36        let (_, header) = read_segment_header_at(store, offset)?;
37        let payload_start = offset + SEGMENT_HEADER_LEN as u64;
38        let payload_end = payload_start + header.payload_len;
39        if payload_end > file_len {
40            return Err(DbError::Format(FormatError::SegmentPayloadPastEof));
41        }
42
43        // CRC check with bounded reads (no large allocations).
44        let mut remaining = header.payload_len;
45        let mut chunk = [0u8; 8192];
46        let mut cursor = payload_start;
47        let mut crc = 0u32;
48        while remaining > 0 {
49            let to_read = std::cmp::min(remaining as usize, chunk.len());
50            store.read_exact_at(cursor, &mut chunk[..to_read])?;
51            crc = crc32c_append(crc, &chunk[..to_read]);
52            cursor += to_read as u64;
53            remaining -= to_read as u64;
54        }
55        // Checkpoint/Temp payload corruption should not prevent opening and replaying the log.
56        // (Checkpoint is validated when used; Temp is ephemeral and ignored by replay.)
57        if header.segment_type != SegmentType::Checkpoint
58            && header.segment_type != SegmentType::Temp
59            && crc != header.payload_crc32c
60        {
61            return Err(DbError::Format(FormatError::BadSegmentPayloadChecksum));
62        }
63
64        out.push(SegmentMeta { offset, header });
65        offset = payload_end;
66    }
67
68    Ok(out)
69}
70
71/// Read the payload bytes for a segment whose header was validated by [`scan_segments`].
72pub fn read_segment_payload(
73    store: &mut impl Store,
74    meta: &SegmentMeta,
75) -> Result<Vec<u8>, DbError> {
76    let mut payload = vec![0u8; meta.header.payload_len as usize];
77    let start = meta.offset + SEGMENT_HEADER_LEN as u64;
78    store.read_exact_at(start, &mut payload)?;
79    Ok(payload)
80}