Skip to main content

nodedb_wal/record/
header.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! WAL record header: fixed 54-byte prefix + constants.
4
5use crate::error::{Result, WalError};
6
/// Magic number that opens every NodeDB WAL record header.
///
/// The value is the ASCII tag `"SYNW"` read as a big-endian `u32`
/// (`0x5359_4E57`). Headers are serialized little-endian, so the bytes appear
/// on disk in the order `57 4E 59 53`.
pub const WAL_MAGIC: u32 = u32::from_be_bytes(*b"SYNW");
9
10/// Current WAL format version.
11///
12/// v2 introduces bitemporal record layout: `LsnMsAnchor` records (type 102)
13/// provide stable LSN↔wall-clock interpolation, and engine-level writers emit
14/// `system_from_ms` in versioned keys.
15///
16/// v3 introduces the 16-byte segment preamble (`WALP` magic) written at offset
17/// 0 of every WAL segment file.
18///
19/// v4 widens `record_type` u16→u32 and `vshard_id` u16→u32, adds 16 reserved
20/// bytes (covered by CRC32C) before the checksum, and bumps `HEADER_SIZE` to
21/// 50 bytes. Pre-release — no v1/v2/v3 readers supported.
22///
/// v1 is the initial shipped format with 54-byte headers (u32 record_type,
/// u64 tenant_id, u32 vshard_id, u32 payload_len, u64 database_id,
/// 8 reserved bytes, u32 crc32c); see `HEADER_SIZE` for the full layout.
25pub const WAL_FORMAT_VERSION: u16 = 1;
26
/// Upper bound on a single WAL record payload: 64 MiB (`64 << 20` bytes).
///
/// This limit is specific to the WAL and is distinct from the cluster RPC
/// limit.
pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 << 20;
29
/// Size of the record header in bytes (54).
///
/// The value is spelled as the sum of the field widths, in on-disk order
/// (all fields little-endian):
///
/// | bytes  | field            | width |
/// |--------|------------------|-------|
/// | 0–3    | `magic`          | 4     |
/// | 4–5    | `format_version` | 2     |
/// | 6–9    | `record_type`    | 4     |
/// | 10–17  | `lsn`            | 8     |
/// | 18–25  | `tenant_id`      | 8     |
/// | 26–29  | `vshard_id`      | 4     |
/// | 30–33  | `payload_len`    | 4     |
/// | 34–41  | `database_id`    | 8     |
/// | 42–49  | `reserved`       | 8     |
/// | 50–53  | `crc32c`         | 4     |
///
/// `database_id` (bytes 34–41) was carved out of what used to be a 16-byte
/// reserved field; `reserved` now covers bytes 42–49 only. Bytes 34–41 were
/// zero-filled in prior records, so `database_id == 0` maps to `DatabaseId(0)`
/// (the default database), preserving backward compatibility without a
/// format-version bump.
pub const HEADER_SIZE: usize = 4 + 2 + 4 + 8 + 8 + 4 + 4 + 8 + 8 + 4;
41
/// Bit 14 of `record_type`: when set, the payload is AES-256-GCM encrypted.
///
/// Independent of bit 15 (the required flag). Both flags keep their bit
/// positions; `record_type` is a `u32`, so the constant is typed as `u32`.
pub const ENCRYPTED_FLAG: u32 = 1 << 14;
46
/// Bit 15 of `record_type`: the required flag.
///
/// A record that carries this bit together with an unknown type must not be
/// silently skipped.
pub const REQUIRED_FLAG: u32 = 1 << 15;
50
/// In-memory form of the fixed 54-byte header that prefixes a WAL record.
///
/// Fields are declared in the same order in which they are laid out on disk;
/// every multi-byte integer is stored little-endian.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecordHeader {
    /// Record marker (bytes 0–3).
    pub magic: u32,
    /// WAL format version the record was written with (bytes 4–5).
    pub format_version: u16,
    /// Record type, including any flag bits (bytes 6–9).
    pub record_type: u32,
    /// LSN of the record (bytes 10–17).
    pub lsn: u64,
    /// Tenant identifier (bytes 18–25).
    pub tenant_id: u64,
    /// vShard identifier (bytes 26–29).
    pub vshard_id: u32,
    /// Declared payload length (bytes 30–33).
    pub payload_len: u32,
    /// Database scope of the record, kept as a raw `u64`; callers convert
    /// to and from `DatabaseId`.
    ///
    /// Lives in bytes 34–41 of the on-disk header, a range that used to be
    /// part of the reserved area. Pre-Tier-2 records had zeros there, so `0`
    /// maps to `DatabaseId(0)` (the default database) and older records stay
    /// fully backward compatible.
    pub database_id: u64,
    /// Reserved for future use (bytes 42–49). Must be zero on write and is
    /// ignored on read, but it is covered by the CRC32C.
    pub reserved: [u8; 8],
    /// CRC32C stored with the record (bytes 50–53).
    pub crc32c: u32,
}
72
73impl RecordHeader {
74    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
75        let mut buf = [0u8; HEADER_SIZE];
76        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
77        buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
78        buf[6..10].copy_from_slice(&self.record_type.to_le_bytes());
79        buf[10..18].copy_from_slice(&self.lsn.to_le_bytes());
80        buf[18..26].copy_from_slice(&self.tenant_id.to_le_bytes());
81        buf[26..30].copy_from_slice(&self.vshard_id.to_le_bytes());
82        buf[30..34].copy_from_slice(&self.payload_len.to_le_bytes());
83        buf[34..42].copy_from_slice(&self.database_id.to_le_bytes());
84        buf[42..50].copy_from_slice(&self.reserved);
85        buf[50..54].copy_from_slice(&self.crc32c.to_le_bytes());
86        buf
87    }
88
89    pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
90        let mut reserved = [0u8; 8];
91        reserved.copy_from_slice(&buf[42..50]);
92        Self {
93            magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
94            format_version: u16::from_le_bytes([buf[4], buf[5]]),
95            record_type: u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]),
96            lsn: u64::from_le_bytes([
97                buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17],
98            ]),
99            tenant_id: u64::from_le_bytes([
100                buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], buf[24], buf[25],
101            ]),
102            vshard_id: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
103            payload_len: u32::from_le_bytes([buf[30], buf[31], buf[32], buf[33]]),
104            database_id: u64::from_le_bytes([
105                buf[34], buf[35], buf[36], buf[37], buf[38], buf[39], buf[40], buf[41],
106            ]),
107            reserved,
108            crc32c: u32::from_le_bytes([buf[50], buf[51], buf[52], buf[53]]),
109        }
110    }
111
112    /// CRC32C over header (excluding the crc32c field) + payload.
113    ///
114    /// The 16 reserved bytes are included in the CRC so they cannot be
115    /// silently modified without detection.
116    pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
117        let header_bytes = self.to_bytes();
118        let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
119        digest = crc32c::crc32c_append(digest, payload);
120        digest
121    }
122
123    /// Logical record type with the encryption flag stripped.
124    pub fn logical_record_type(&self) -> u32 {
125        self.record_type & !ENCRYPTED_FLAG
126    }
127
128    pub fn validate(&self, offset: u64) -> Result<()> {
129        if self.magic != WAL_MAGIC {
130            return Err(WalError::InvalidMagic {
131                offset,
132                expected: WAL_MAGIC,
133                actual: self.magic,
134            });
135        }
136        if self.format_version != WAL_FORMAT_VERSION {
137            return Err(WalError::UnsupportedVersion {
138                version: self.format_version,
139                supported: WAL_FORMAT_VERSION,
140            });
141        }
142        Ok(())
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header with valid magic and version plus fixed filler values,
    /// varying only `record_type` and `vshard_id`. Tests that care about
    /// another field override it with struct-update syntax.
    fn make_header(record_type: u32, vshard_id: u32) -> RecordHeader {
        RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type,
            lsn: 42,
            tenant_id: 7,
            vshard_id,
            payload_len: 100,
            database_id: 0,
            reserved: [0u8; 8],
            crc32c: 0xDEAD_BEEF,
        }
    }

    #[test]
    fn header_roundtrip() {
        let header = make_header(1 | REQUIRED_FLAG, 3);
        let bytes = header.to_bytes();
        assert_eq!(header, RecordHeader::from_bytes(&bytes));
    }

    #[test]
    fn header_golden_54_bytes_exact_offsets() {
        // magic at 0..4, format_version at 4..6, record_type at 6..10,
        // lsn at 10..18, tenant_id at 18..26, vshard_id at 26..30,
        // payload_len at 30..34, database_id at 34..42, reserved at 42..50,
        // crc32c at 50..54.
        let header = RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type: 1,
            lsn: 0x0102_0304_0506_0708,
            tenant_id: 0xDEAD_BEEF_CAFE_1234,
            vshard_id: 0xCAFE_BABE,
            payload_len: 256,
            database_id: 0xABCD_0000_1234_5678,
            reserved: [0u8; 8],
            crc32c: 0x1234_5678,
        };
        let b = header.to_bytes();
        assert_eq!(b.len(), 54);
        // magic
        assert_eq!(&b[0..4], &WAL_MAGIC.to_le_bytes());
        // format_version
        assert_eq!(&b[4..6], &WAL_FORMAT_VERSION.to_le_bytes());
        // record_type
        assert_eq!(&b[6..10], &1u32.to_le_bytes());
        // lsn
        assert_eq!(&b[10..18], &0x0102_0304_0506_0708u64.to_le_bytes());
        // tenant_id (u64, 8 bytes)
        assert_eq!(&b[18..26], &0xDEAD_BEEF_CAFE_1234u64.to_le_bytes());
        // vshard_id
        assert_eq!(&b[26..30], &0xCAFE_BABEu32.to_le_bytes());
        // payload_len
        assert_eq!(&b[30..34], &256u32.to_le_bytes());
        // database_id
        assert_eq!(&b[34..42], &0xABCD_0000_1234_5678u64.to_le_bytes());
        // reserved — all zero
        assert_eq!(&b[42..50], &[0u8; 8]);
        // crc32c
        assert_eq!(&b[50..54], &0x1234_5678u32.to_le_bytes());
    }

    #[test]
    fn database_id_roundtrip() {
        // Non-zero database_id survives to_bytes → from_bytes.
        let header = RecordHeader {
            database_id: 7,
            ..make_header(1, 0)
        };
        let decoded = RecordHeader::from_bytes(&header.to_bytes());
        assert_eq!(decoded.database_id, 7);
    }

    #[test]
    fn nonzero_reserved_roundtrip() {
        // `reserved` is copied verbatim in both directions, even when a
        // writer did not zero it.
        let header = RecordHeader {
            reserved: [1, 2, 3, 4, 5, 6, 7, 8],
            ..make_header(1, 0)
        };
        let bytes = header.to_bytes();
        assert_eq!(&bytes[42..50], &[1u8, 2, 3, 4, 5, 6, 7, 8]);
        assert_eq!(RecordHeader::from_bytes(&bytes), header);
    }

    #[test]
    fn pre_tier2_zero_database_id_compat() {
        // A record written before Tier 2 has zeros at bytes 34..42.
        // from_bytes must decode that as database_id == 0 (the default database).
        let mut raw = [0u8; HEADER_SIZE];
        raw[0..4].copy_from_slice(&WAL_MAGIC.to_le_bytes());
        raw[4..6].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
        raw[6..10].copy_from_slice(&1u32.to_le_bytes()); // record_type
        // bytes 34..50 stay zero (pre-Tier-2 reserved field)
        let decoded = RecordHeader::from_bytes(&raw);
        assert_eq!(decoded.database_id, 0);
    }

    #[test]
    fn tenant_id_above_u32_max_roundtrip() {
        // Verify u64 tenant_id with a value > u32::MAX is preserved exactly.
        let tid = u32::MAX as u64 + 1;
        let header = RecordHeader {
            tenant_id: tid,
            ..make_header(1, 0)
        };
        let decoded = RecordHeader::from_bytes(&header.to_bytes());
        assert_eq!(decoded.tenant_id, tid);
    }

    #[test]
    fn valid_header_accepted() {
        // Positive path: current magic + current version validates cleanly.
        assert!(make_header(0, 0).validate(0).is_ok());
    }

    #[test]
    fn invalid_magic_detected() {
        let header = RecordHeader {
            magic: 0xBAD0_F00D,
            ..make_header(0, 0)
        };
        assert!(matches!(
            header.validate(0),
            Err(WalError::InvalidMagic { .. })
        ));
    }

    #[test]
    fn unsupported_version_detected() {
        let header = RecordHeader {
            format_version: WAL_FORMAT_VERSION + 1,
            ..make_header(0, 0)
        };
        assert!(matches!(
            header.validate(0),
            Err(WalError::UnsupportedVersion { .. })
        ));
    }

    #[test]
    fn version_4_rejected() {
        // Version 4 is one of the pre-release layouts described on
        // `WAL_FORMAT_VERSION`; only the current version may validate, so a
        // header stamped with 4 must be rejected.
        let header = RecordHeader {
            format_version: 4,
            ..make_header(0, 0)
        };
        assert!(matches!(
            header.validate(0),
            Err(WalError::UnsupportedVersion { version: 4, .. })
        ));
    }

    #[test]
    fn large_vshard_id_roundtrip() {
        // 0x1234_5678 is well above old u16::MAX; ensure no truncation.
        let header = make_header(1, 0x1234_5678);
        let decoded = RecordHeader::from_bytes(&header.to_bytes());
        assert_eq!(decoded.vshard_id, 0x1234_5678u32);
    }

    #[test]
    fn encrypted_flag_is_u32() {
        let header = make_header(1 | ENCRYPTED_FLAG, 0);
        assert_eq!(header.logical_record_type(), 1);
        assert!(header.record_type & ENCRYPTED_FLAG != 0);
    }

    #[test]
    fn large_record_type_roundtrip() {
        // 0x0001_0001 has bits above old u16 max set; verify u32 width preserved.
        let header = make_header(0x0001_0001, 0);
        let decoded = RecordHeader::from_bytes(&header.to_bytes());
        assert_eq!(decoded.record_type, 0x0001_0001u32);
        // Flags in bit-14 and bit-15 positions still work alongside high bits.
        let with_flags = make_header(0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG, 0);
        let decoded2 = RecordHeader::from_bytes(&with_flags.to_bytes());
        assert_eq!(
            decoded2.record_type,
            0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG
        );
        assert_eq!(decoded2.logical_record_type(), 0x0001_0001 | REQUIRED_FLAG);
    }

    #[test]
    fn checksum_ignores_stored_crc_field() {
        // The trailing crc32c field is excluded from the digest, so the
        // value stored there must not influence the computed checksum.
        let a = make_header(1, 0);
        let b = RecordHeader { crc32c: 0, ..a };
        assert_eq!(a.compute_checksum(b"payload"), b.compute_checksum(b"payload"));
    }

    #[test]
    fn checksum_covers_header_fields_and_payload() {
        let base = make_header(1, 0);
        let sum = base.compute_checksum(b"abc");

        // database_id and reserved sit in front of the checksum and are
        // covered by it.
        let other_db = RecordHeader {
            database_id: 7,
            ..base
        };
        assert_ne!(sum, other_db.compute_checksum(b"abc"));

        let mut other_reserved = base;
        other_reserved.reserved[0] = 1;
        assert_ne!(sum, other_reserved.compute_checksum(b"abc"));

        // A one-byte payload change is detected as well.
        assert_ne!(sum, base.compute_checksum(b"abd"));
    }
}
332}