1use crate::error::{Result, WalError};
6
/// Magic number stored in the first four bytes of every record header.
///
/// Numerically `0x5359_4E57`, i.e. the ASCII bytes `S`, `Y`, `N`, `W` read as a
/// big-endian `u32`. `RecordHeader::to_bytes` writes it little-endian, so the
/// bytes appear in the encoded header in the reverse order.
pub const WAL_MAGIC: u32 = u32::from_be_bytes(*b"SYNW");

/// The only header format version that `RecordHeader::validate` accepts.
pub const WAL_FORMAT_VERSION: u16 = 1;

/// Largest payload size in bytes (64 MiB).
///
/// NOTE(review): nothing in this part of the file reads this constant;
/// presumably it bounds `payload_len` in the reader/writer — confirm at the
/// call sites.
pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 << 20;

/// Size in bytes of an encoded `RecordHeader`.
///
/// Written as the sum of the field widths in wire order: magic, format
/// version, record type, LSN, tenant id, vshard id, payload length,
/// database id, reserved bytes, CRC32C.
pub const HEADER_SIZE: usize = 4 + 2 + 4 + 8 + 8 + 4 + 4 + 8 + 8 + 4;

/// Bit 14 of `record_type`. `RecordHeader::logical_record_type` clears it.
///
/// NOTE(review): presumably marks a record whose payload is encrypted —
/// confirm against the writer.
pub const ENCRYPTED_FLAG: u32 = 1 << 14;

/// Bit 15 of `record_type`. `RecordHeader::logical_record_type` leaves it set.
///
/// NOTE(review): presumably marks a record type that readers must understand —
/// confirm against the reader.
pub const REQUIRED_FLAG: u32 = 1 << 15;
50
/// Fixed-size header that precedes a WAL record's payload.
///
/// Fields are declared in the same order in which `to_bytes` writes them; every
/// integer is encoded little-endian. The byte ranges quoted on each field are
/// the positions used by `to_bytes` / `from_bytes`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecordHeader {
    /// Bytes 0..4. `validate` requires this to equal `WAL_MAGIC`.
    pub magic: u32,
    /// Bytes 4..6. `validate` requires this to equal `WAL_FORMAT_VERSION`.
    pub format_version: u16,
    /// Bytes 6..10. Record type, possibly OR-ed with `ENCRYPTED_FLAG` and/or
    /// `REQUIRED_FLAG`; see `logical_record_type`.
    pub record_type: u32,
    /// Bytes 10..18. Presumably the record's log sequence number — confirm.
    pub lsn: u64,
    /// Bytes 18..26. Tenant identifier (full 64-bit range is preserved).
    pub tenant_id: u64,
    /// Bytes 26..30. Virtual-shard identifier (assumed from the name).
    pub vshard_id: u32,
    /// Bytes 30..34. Presumably the length in bytes of the payload that
    /// follows the header — confirm against the reader.
    pub payload_len: u32,
    /// Bytes 34..42. Database identifier; an all-zero field decodes to 0.
    pub database_id: u64,
    /// Bytes 42..50. Copied verbatim in both directions and not interpreted
    /// by any method on this type.
    pub reserved: [u8; 8],
    /// Bytes 50..54. Checksum slot; `compute_checksum` deliberately excludes
    /// these four bytes from its input.
    pub crc32c: u32,
}
72
73impl RecordHeader {
74 pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
75 let mut buf = [0u8; HEADER_SIZE];
76 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
77 buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
78 buf[6..10].copy_from_slice(&self.record_type.to_le_bytes());
79 buf[10..18].copy_from_slice(&self.lsn.to_le_bytes());
80 buf[18..26].copy_from_slice(&self.tenant_id.to_le_bytes());
81 buf[26..30].copy_from_slice(&self.vshard_id.to_le_bytes());
82 buf[30..34].copy_from_slice(&self.payload_len.to_le_bytes());
83 buf[34..42].copy_from_slice(&self.database_id.to_le_bytes());
84 buf[42..50].copy_from_slice(&self.reserved);
85 buf[50..54].copy_from_slice(&self.crc32c.to_le_bytes());
86 buf
87 }
88
89 pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
90 let mut reserved = [0u8; 8];
91 reserved.copy_from_slice(&buf[42..50]);
92 Self {
93 magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
94 format_version: u16::from_le_bytes([buf[4], buf[5]]),
95 record_type: u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]),
96 lsn: u64::from_le_bytes([
97 buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17],
98 ]),
99 tenant_id: u64::from_le_bytes([
100 buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], buf[24], buf[25],
101 ]),
102 vshard_id: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
103 payload_len: u32::from_le_bytes([buf[30], buf[31], buf[32], buf[33]]),
104 database_id: u64::from_le_bytes([
105 buf[34], buf[35], buf[36], buf[37], buf[38], buf[39], buf[40], buf[41],
106 ]),
107 reserved,
108 crc32c: u32::from_le_bytes([buf[50], buf[51], buf[52], buf[53]]),
109 }
110 }
111
112 pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
117 let header_bytes = self.to_bytes();
118 let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
119 digest = crc32c::crc32c_append(digest, payload);
120 digest
121 }
122
123 pub fn logical_record_type(&self) -> u32 {
125 self.record_type & !ENCRYPTED_FLAG
126 }
127
128 pub fn validate(&self, offset: u64) -> Result<()> {
129 if self.magic != WAL_MAGIC {
130 return Err(WalError::InvalidMagic {
131 offset,
132 expected: WAL_MAGIC,
133 actual: self.magic,
134 });
135 }
136 if self.format_version != WAL_FORMAT_VERSION {
137 return Err(WalError::UnsupportedVersion {
138 version: self.format_version,
139 supported: WAL_FORMAT_VERSION,
140 });
141 }
142 Ok(())
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a well-formed header with fixed filler values; only the record
    /// type and vshard id vary between callers.
    fn make_header(record_type: u32, vshard_id: u32) -> RecordHeader {
        RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type,
            lsn: 42,
            tenant_id: 7,
            vshard_id,
            payload_len: 100,
            database_id: 0,
            reserved: [0u8; 8],
            crc32c: 0xDEAD_BEEF,
        }
    }

    /// Encodes `header` and decodes the resulting bytes again.
    fn roundtrip(header: &RecordHeader) -> RecordHeader {
        RecordHeader::from_bytes(&header.to_bytes())
    }

    /// A header survives encode + decode unchanged.
    #[test]
    fn header_roundtrip() {
        let original = make_header(REQUIRED_FLAG | 1, 3);
        assert_eq!(original, roundtrip(&original));
    }

    /// Pins the wire layout: every field at its exact byte range.
    #[test]
    fn header_golden_54_bytes_exact_offsets() {
        let lsn = 0x0102_0304_0506_0708u64;
        let tenant_id = 0xDEAD_BEEF_CAFE_1234u64;
        let vshard_id = 0xCAFE_BABEu32;
        let database_id = 0xABCD_0000_1234_5678u64;
        let crc = 0x1234_5678u32;

        let encoded = RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type: 1,
            lsn,
            tenant_id,
            vshard_id,
            payload_len: 256,
            database_id,
            reserved: [0u8; 8],
            crc32c: crc,
        }
        .to_bytes();
        assert_eq!(encoded.len(), 54);

        let expected: Vec<(std::ops::Range<usize>, Vec<u8>)> = vec![
            (0..4, WAL_MAGIC.to_le_bytes().to_vec()),
            (4..6, WAL_FORMAT_VERSION.to_le_bytes().to_vec()),
            (6..10, 1u32.to_le_bytes().to_vec()),
            (10..18, lsn.to_le_bytes().to_vec()),
            (18..26, tenant_id.to_le_bytes().to_vec()),
            (26..30, vshard_id.to_le_bytes().to_vec()),
            (30..34, 256u32.to_le_bytes().to_vec()),
            (34..42, database_id.to_le_bytes().to_vec()),
            (42..50, vec![0u8; 8]),
            (50..54, crc.to_le_bytes().to_vec()),
        ];
        for (span, want) in &expected {
            assert_eq!(&encoded[span.clone()], want.as_slice());
        }
    }

    /// A non-zero database id is preserved.
    #[test]
    fn database_id_roundtrip() {
        let header = RecordHeader {
            lsn: 1,
            tenant_id: 42,
            payload_len: 0,
            database_id: 7,
            crc32c: 0,
            ..make_header(1, 0)
        };
        assert_eq!(roundtrip(&header).database_id, 7);
    }

    /// A buffer with only magic, version and record type populated decodes
    /// with `database_id == 0`.
    #[test]
    fn pre_tier2_zero_database_id_compat() {
        let mut prefix = Vec::with_capacity(10);
        prefix.extend_from_slice(&WAL_MAGIC.to_le_bytes());
        prefix.extend_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
        prefix.extend_from_slice(&1u32.to_le_bytes());

        let mut raw = [0u8; HEADER_SIZE];
        raw[..prefix.len()].copy_from_slice(&prefix);

        assert_eq!(RecordHeader::from_bytes(&raw).database_id, 0);
    }

    /// Tenant ids that do not fit in 32 bits are not truncated.
    #[test]
    fn tenant_id_above_u32_max_roundtrip() {
        let tid = u64::from(u32::MAX) + 1;
        let header = RecordHeader {
            lsn: 1,
            tenant_id: tid,
            payload_len: 0,
            crc32c: 0,
            ..make_header(1, 0)
        };
        assert_eq!(roundtrip(&header).tenant_id, tid);
    }

    /// A wrong magic number is reported as `InvalidMagic`.
    #[test]
    fn invalid_magic_detected() {
        let header = RecordHeader {
            magic: 0xBAD0_F00D,
            ..make_header(0, 0)
        };
        let outcome = header.validate(0);
        assert!(matches!(outcome, Err(WalError::InvalidMagic { .. })));
    }

    /// Any version other than the current one is rejected.
    #[test]
    fn unsupported_version_detected() {
        let header = RecordHeader {
            format_version: WAL_FORMAT_VERSION + 1,
            ..make_header(0, 0)
        };
        let outcome = header.validate(0);
        assert!(matches!(outcome, Err(WalError::UnsupportedVersion { .. })));
    }

    /// Version 4 specifically is rejected and echoed back in the error.
    #[test]
    fn version_4_rejected() {
        let header = RecordHeader {
            format_version: 4,
            ..make_header(0, 0)
        };
        let outcome = header.validate(0);
        assert!(matches!(
            outcome,
            Err(WalError::UnsupportedVersion { version: 4, .. })
        ));
    }

    /// The vshard id keeps all 32 bits.
    #[test]
    fn large_vshard_id_roundtrip() {
        let decoded = roundtrip(&make_header(1, 0x1234_5678));
        assert_eq!(decoded.vshard_id, 0x1234_5678u32);
    }

    /// `ENCRYPTED_FLAG` is stripped by `logical_record_type` but stays set in
    /// the raw `record_type`.
    #[test]
    fn encrypted_flag_is_u32() {
        let header = make_header(ENCRYPTED_FLAG | 1, 0);
        assert_eq!(header.logical_record_type(), 1);
        assert!(header.record_type & ENCRYPTED_FLAG != 0);
    }

    /// Record types above 16 bits survive, with and without flag bits; only
    /// `ENCRYPTED_FLAG` is removed by `logical_record_type`.
    #[test]
    fn large_record_type_roundtrip() {
        let wide_type = 0x0001_0001u32;

        let plain = roundtrip(&make_header(wide_type, 0));
        assert_eq!(plain.record_type, 0x0001_0001u32);

        let flagged_type = wide_type | ENCRYPTED_FLAG | REQUIRED_FLAG;
        let flagged = roundtrip(&make_header(flagged_type, 0));
        assert_eq!(
            flagged.record_type,
            0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG
        );
        assert_eq!(flagged.logical_record_type(), 0x0001_0001 | REQUIRED_FLAG);
    }
}