// Source: modelvault_core/segments/header.rs

1use crate::checksum::{crc32c, CHECKSUM_KIND_CRC32C};
2use crate::error::{DbError, FormatError};
3
/// Four-byte magic (`b"TSG0"`) written at bytes 0..4 of every encoded segment header.
pub const SEGMENT_MAGIC: [u8; 4] = *b"TSG0";
/// Header format version stored at bytes 4..6; decoding rejects any other value.
pub const SEGMENT_VERSION: u16 = 0;
/// Size in bytes of an encoded segment header, including its trailing 4-byte header CRC.
pub const SEGMENT_HEADER_LEN: usize = 32;
7
/// Kind of a segment, stored in the segment header as a little-endian `u16` tag.
///
/// The explicit discriminants are the values written to and read from the header, so they
/// must not be renumbered.
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentType {
    Schema = 1,
    Record = 2,
    Manifest = 3,
    Checkpoint = 4,
    Index = 5,
    /// Start of a transaction batch (format minor 6+).
    TxnBegin = 6,
    /// Commit all segments since matching [`SegmentType::TxnBegin`].
    TxnCommit = 7,
    /// Explicitly discard staged segments since matching begin (optional on disk).
    TxnAbort = 8,
    /// Ephemeral spill segments written during bounded-memory query execution (0.12.0+).
    ///
    /// These segments are never published via the superblock/manifest and must be ignored by
    /// replay/recovery scans.
    Temp = 9,
}

impl SegmentType {
    /// Maps a raw header tag back to its variant.
    ///
    /// Returns `None` when `v` is not the discriminant of any variant.
    fn from_u16(v: u16) -> Option<Self> {
        // Every variant, listed once; the lookup compares against each discriminant so the
        // numeric values live only in the enum definition. Keep in sync when adding variants.
        const KNOWN: [SegmentType; 9] = [
            SegmentType::Schema,
            SegmentType::Record,
            SegmentType::Manifest,
            SegmentType::Checkpoint,
            SegmentType::Index,
            SegmentType::TxnBegin,
            SegmentType::TxnCommit,
            SegmentType::TxnAbort,
            SegmentType::Temp,
        ];

        KNOWN.iter().copied().find(|kind| *kind as u16 == v)
    }
}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub struct SegmentHeader {
48    pub segment_type: SegmentType,
49    pub payload_len: u64,
50    pub payload_crc32c: u32,
51}
52
53impl SegmentHeader {
54    pub fn encode(self) -> [u8; SEGMENT_HEADER_LEN] {
55        let mut buf = [0u8; SEGMENT_HEADER_LEN];
56        buf[0..4].copy_from_slice(&SEGMENT_MAGIC);
57        buf[4..6].copy_from_slice(&SEGMENT_VERSION.to_le_bytes());
58        buf[6..8].copy_from_slice(&(self.segment_type as u16).to_le_bytes());
59        buf[8..12].copy_from_slice(&(SEGMENT_HEADER_LEN as u32).to_le_bytes());
60        buf[12..20].copy_from_slice(&self.payload_len.to_le_bytes());
61        buf[20..24].copy_from_slice(&self.payload_crc32c.to_le_bytes());
62        buf[24] = CHECKSUM_KIND_CRC32C;
63
64        let crc = crc32c(&buf[0..28]);
65        buf[28..32].copy_from_slice(&crc.to_le_bytes());
66        buf
67    }
68}
69
70pub fn decode_segment_header(bytes: &[u8]) -> Result<SegmentHeader, DbError> {
71    if bytes.len() < SEGMENT_HEADER_LEN {
72        return Err(DbError::Format(FormatError::TruncatedSegmentHeader {
73            got: bytes.len(),
74            expected: SEGMENT_HEADER_LEN,
75        }));
76    }
77
78    if bytes[0..4] != SEGMENT_MAGIC {
79        let mut got = [0u8; 4];
80        got.copy_from_slice(&bytes[0..4]);
81        return Err(DbError::Format(FormatError::BadSegmentMagic { got }));
82    }
83
84    let version = u16::from_le_bytes([bytes[4], bytes[5]]);
85    if version != SEGMENT_VERSION {
86        return Err(DbError::Format(FormatError::UnsupportedVersion {
87            major: 0,
88            minor: version,
89        }));
90    }
91
92    let segment_type_raw = u16::from_le_bytes([bytes[6], bytes[7]]);
93    let Some(segment_type) = SegmentType::from_u16(segment_type_raw) else {
94        return Err(DbError::Format(FormatError::UnsupportedVersion {
95            major: 0,
96            minor: segment_type_raw,
97        }));
98    };
99
100    let header_len = u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
101    if header_len as usize != SEGMENT_HEADER_LEN {
102        return Err(DbError::Format(FormatError::UnsupportedVersion {
103            major: 0,
104            minor: header_len as u16,
105        }));
106    }
107
108    let payload_len = u64::from_le_bytes(bytes[12..20].try_into().unwrap());
109    let payload_crc32c = u32::from_le_bytes([bytes[20], bytes[21], bytes[22], bytes[23]]);
110
111    let checksum_kind = bytes[24];
112    if checksum_kind != CHECKSUM_KIND_CRC32C {
113        return Err(DbError::Format(FormatError::UnsupportedVersion {
114            major: 0,
115            minor: checksum_kind as u16,
116        }));
117    }
118
119    let expected_crc = u32::from_le_bytes([bytes[28], bytes[29], bytes[30], bytes[31]]);
120    let actual_crc = crc32c(&bytes[0..28]);
121    if expected_crc != actual_crc {
122        return Err(DbError::Format(FormatError::BadSegmentHeaderChecksum));
123    }
124
125    Ok(SegmentHeader {
126        segment_type,
127        payload_len,
128        payload_crc32c,
129    })
130}