Skip to main content

nodedb_wal/record/
header.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! WAL record header: fixed 50-byte prefix + constants.
4
5use crate::error::{Result, WalError};
6
7/// Magic number identifying a NodeDB WAL record.
8pub const WAL_MAGIC: u32 = 0x5359_4E57; // "SYNW"
9
10/// Current WAL format version.
11///
12/// v2 introduces bitemporal record layout: `LsnMsAnchor` records (type 102)
13/// provide stable LSN↔wall-clock interpolation, and engine-level writers emit
14/// `system_from_ms` in versioned keys.
15///
16/// v3 introduces the 16-byte segment preamble (`WALP` magic) written at offset
17/// 0 of every WAL segment file.
18///
19/// v4 widens `record_type` u16→u32 and `vshard_id` u16→u32, adds 16 reserved
20/// bytes (covered by CRC32C) before the checksum, and bumps `HEADER_SIZE` to
21/// 50 bytes. Pre-release — no v1/v2/v3 readers supported.
22///
23/// v1 is the initial shipped format with 54-byte headers (u64 tenant_id,
24/// u16 vshard_id, u32 payload_len, u16 reserved, u32 crc32c).
25pub const WAL_FORMAT_VERSION: u16 = 1;
26
27/// Maximum WAL record payload size (64 MiB). Distinct from cluster RPC's limit.
28pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 * 1024 * 1024;
29
30/// Size of the record header in bytes.
31///
32/// Layout (all little-endian):
33///   magic(4) | format_version(2) | record_type(4) | lsn(8) | tenant_id(8)
34///   | vshard_id(4) | payload_len(4) | reserved(16) | crc32c(4)
35pub const HEADER_SIZE: usize = 54;
36
37/// Bit 14 in `record_type` signals the payload is AES-256-GCM encrypted.
38/// Separate from bit 15 (required flag). Both bits keep their positions;
39/// the type is now u32 so the constants are widened accordingly.
40pub const ENCRYPTED_FLAG: u32 = 0x0000_4000;
41
42/// Bit 15: required-flag. Records with this bit set and an unknown type
43/// must not be silently skipped.
44pub const REQUIRED_FLAG: u32 = 0x0000_8000;
45
46/// WAL record header (fixed 54 bytes).
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub struct RecordHeader {
49    pub magic: u32,
50    pub format_version: u16,
51    pub record_type: u32,
52    pub lsn: u64,
53    pub tenant_id: u64,
54    pub vshard_id: u32,
55    pub payload_len: u32,
56    /// Reserved for future use; must be zero on write; ignored on read
57    /// (but covered by CRC32C).
58    pub reserved: [u8; 16],
59    pub crc32c: u32,
60}
61
62impl RecordHeader {
63    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
64        let mut buf = [0u8; HEADER_SIZE];
65        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
66        buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
67        buf[6..10].copy_from_slice(&self.record_type.to_le_bytes());
68        buf[10..18].copy_from_slice(&self.lsn.to_le_bytes());
69        buf[18..26].copy_from_slice(&self.tenant_id.to_le_bytes());
70        buf[26..30].copy_from_slice(&self.vshard_id.to_le_bytes());
71        buf[30..34].copy_from_slice(&self.payload_len.to_le_bytes());
72        buf[34..50].copy_from_slice(&self.reserved);
73        buf[50..54].copy_from_slice(&self.crc32c.to_le_bytes());
74        buf
75    }
76
77    pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
78        let mut reserved = [0u8; 16];
79        reserved.copy_from_slice(&buf[34..50]);
80        Self {
81            magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
82            format_version: u16::from_le_bytes([buf[4], buf[5]]),
83            record_type: u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]),
84            lsn: u64::from_le_bytes([
85                buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17],
86            ]),
87            tenant_id: u64::from_le_bytes([
88                buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], buf[24], buf[25],
89            ]),
90            vshard_id: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
91            payload_len: u32::from_le_bytes([buf[30], buf[31], buf[32], buf[33]]),
92            reserved,
93            crc32c: u32::from_le_bytes([buf[50], buf[51], buf[52], buf[53]]),
94        }
95    }
96
97    /// CRC32C over header (excluding the crc32c field) + payload.
98    ///
99    /// The 16 reserved bytes are included in the CRC so they cannot be
100    /// silently modified without detection.
101    pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
102        let header_bytes = self.to_bytes();
103        let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
104        digest = crc32c::crc32c_append(digest, payload);
105        digest
106    }
107
108    /// Logical record type with the encryption flag stripped.
109    pub fn logical_record_type(&self) -> u32 {
110        self.record_type & !ENCRYPTED_FLAG
111    }
112
113    pub fn validate(&self, offset: u64) -> Result<()> {
114        if self.magic != WAL_MAGIC {
115            return Err(WalError::InvalidMagic {
116                offset,
117                expected: WAL_MAGIC,
118                actual: self.magic,
119            });
120        }
121        if self.format_version != WAL_FORMAT_VERSION {
122            return Err(WalError::UnsupportedVersion {
123                version: self.format_version,
124                supported: WAL_FORMAT_VERSION,
125            });
126        }
127        Ok(())
128    }
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134
135    fn make_header(record_type: u32, vshard_id: u32) -> RecordHeader {
136        RecordHeader {
137            magic: WAL_MAGIC,
138            format_version: WAL_FORMAT_VERSION,
139            record_type,
140            lsn: 42,
141            tenant_id: 7,
142            vshard_id,
143            payload_len: 100,
144            reserved: [0u8; 16],
145            crc32c: 0xDEAD_BEEF,
146        }
147    }
148
149    #[test]
150    fn header_roundtrip() {
151        let header = make_header(1 | REQUIRED_FLAG, 3);
152        let bytes = header.to_bytes();
153        assert_eq!(header, RecordHeader::from_bytes(&bytes));
154    }
155
156    #[test]
157    fn header_golden_54_bytes_exact_offsets() {
158        // magic at 0..4, format_version at 4..6, record_type at 6..10,
159        // lsn at 10..18, tenant_id at 18..26, vshard_id at 26..30,
160        // payload_len at 30..34, reserved at 34..50, crc32c at 50..54.
161        let header = RecordHeader {
162            magic: WAL_MAGIC,
163            format_version: WAL_FORMAT_VERSION,
164            record_type: 1,
165            lsn: 0x0102_0304_0506_0708,
166            tenant_id: 0xDEAD_BEEF_CAFE_1234,
167            vshard_id: 0xCAFE_BABE,
168            payload_len: 256,
169            reserved: [0u8; 16],
170            crc32c: 0x1234_5678,
171        };
172        let b = header.to_bytes();
173        assert_eq!(b.len(), 54);
174        // magic
175        assert_eq!(&b[0..4], &WAL_MAGIC.to_le_bytes());
176        // format_version
177        assert_eq!(&b[4..6], &WAL_FORMAT_VERSION.to_le_bytes());
178        // record_type
179        assert_eq!(&b[6..10], &1u32.to_le_bytes());
180        // lsn
181        assert_eq!(&b[10..18], &0x0102_0304_0506_0708u64.to_le_bytes());
182        // tenant_id (now u64, 8 bytes)
183        assert_eq!(&b[18..26], &0xDEAD_BEEF_CAFE_1234u64.to_le_bytes());
184        // vshard_id
185        assert_eq!(&b[26..30], &0xCAFE_BABEu32.to_le_bytes());
186        // payload_len
187        assert_eq!(&b[30..34], &256u32.to_le_bytes());
188        // reserved — all zero
189        assert_eq!(&b[34..50], &[0u8; 16]);
190        // crc32c
191        assert_eq!(&b[50..54], &0x1234_5678u32.to_le_bytes());
192    }
193
194    #[test]
195    fn tenant_id_above_u32_max_roundtrip() {
196        // Verify u64 tenant_id with a value > u32::MAX is preserved exactly.
197        let tid = u32::MAX as u64 + 1;
198        let header = RecordHeader {
199            magic: WAL_MAGIC,
200            format_version: WAL_FORMAT_VERSION,
201            record_type: 1,
202            lsn: 1,
203            tenant_id: tid,
204            vshard_id: 0,
205            payload_len: 0,
206            reserved: [0u8; 16],
207            crc32c: 0,
208        };
209        let bytes = header.to_bytes();
210        let decoded = RecordHeader::from_bytes(&bytes);
211        assert_eq!(decoded.tenant_id, tid);
212    }
213
214    #[test]
215    fn invalid_magic_detected() {
216        let mut header = make_header(0, 0);
217        header.magic = 0xBAD0_F00D;
218        assert!(matches!(
219            header.validate(0),
220            Err(WalError::InvalidMagic { .. })
221        ));
222    }
223
224    #[test]
225    fn unsupported_version_detected() {
226        let mut header = make_header(0, 0);
227        header.format_version = WAL_FORMAT_VERSION + 1;
228        assert!(matches!(
229            header.validate(0),
230            Err(WalError::UnsupportedVersion { .. })
231        ));
232    }
233
234    #[test]
235    fn version_4_rejected() {
236        // Regression: bumping from v4 to v5 — a v4 header must be rejected.
237        let mut header = make_header(0, 0);
238        header.format_version = 4;
239        assert!(matches!(
240            header.validate(0),
241            Err(WalError::UnsupportedVersion { version: 4, .. })
242        ));
243    }
244
245    #[test]
246    fn large_vshard_id_roundtrip() {
247        // 0x1234_5678 is well above old u16::MAX; ensure no truncation.
248        let header = make_header(1, 0x1234_5678);
249        let bytes = header.to_bytes();
250        let decoded = RecordHeader::from_bytes(&bytes);
251        assert_eq!(decoded.vshard_id, 0x1234_5678u32);
252    }
253
254    #[test]
255    fn encrypted_flag_is_u32() {
256        let header = make_header(1 | ENCRYPTED_FLAG, 0);
257        assert_eq!(header.logical_record_type(), 1);
258        assert!(header.record_type & ENCRYPTED_FLAG != 0);
259    }
260
261    #[test]
262    fn large_record_type_roundtrip() {
263        // 0x0001_0001 has bits above old u16 max set; verify u32 width preserved.
264        let header = make_header(0x0001_0001, 0);
265        let bytes = header.to_bytes();
266        let decoded = RecordHeader::from_bytes(&bytes);
267        assert_eq!(decoded.record_type, 0x0001_0001u32);
268        // Flags in bit-14 and bit-15 positions still work alongside high bits.
269        let with_flags = make_header(0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG, 0);
270        let bytes2 = with_flags.to_bytes();
271        let decoded2 = RecordHeader::from_bytes(&bytes2);
272        assert_eq!(
273            decoded2.record_type,
274            0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG
275        );
276        assert_eq!(decoded2.logical_record_type(), 0x0001_0001 | REQUIRED_FLAG);
277    }
278}