1use crate::error::{Result, WalError};
6
7pub const WAL_MAGIC: u32 = 0x5359_4E57; pub const WAL_FORMAT_VERSION: u16 = 1;
26
27pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 * 1024 * 1024;
29
30pub const HEADER_SIZE: usize = 54;
36
37pub const ENCRYPTED_FLAG: u32 = 0x0000_4000;
41
42pub const REQUIRED_FLAG: u32 = 0x0000_8000;
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub struct RecordHeader {
49 pub magic: u32,
50 pub format_version: u16,
51 pub record_type: u32,
52 pub lsn: u64,
53 pub tenant_id: u64,
54 pub vshard_id: u32,
55 pub payload_len: u32,
56 pub reserved: [u8; 16],
59 pub crc32c: u32,
60}
61
62impl RecordHeader {
63 pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
64 let mut buf = [0u8; HEADER_SIZE];
65 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
66 buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
67 buf[6..10].copy_from_slice(&self.record_type.to_le_bytes());
68 buf[10..18].copy_from_slice(&self.lsn.to_le_bytes());
69 buf[18..26].copy_from_slice(&self.tenant_id.to_le_bytes());
70 buf[26..30].copy_from_slice(&self.vshard_id.to_le_bytes());
71 buf[30..34].copy_from_slice(&self.payload_len.to_le_bytes());
72 buf[34..50].copy_from_slice(&self.reserved);
73 buf[50..54].copy_from_slice(&self.crc32c.to_le_bytes());
74 buf
75 }
76
77 pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
78 let mut reserved = [0u8; 16];
79 reserved.copy_from_slice(&buf[34..50]);
80 Self {
81 magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
82 format_version: u16::from_le_bytes([buf[4], buf[5]]),
83 record_type: u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]),
84 lsn: u64::from_le_bytes([
85 buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17],
86 ]),
87 tenant_id: u64::from_le_bytes([
88 buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], buf[24], buf[25],
89 ]),
90 vshard_id: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
91 payload_len: u32::from_le_bytes([buf[30], buf[31], buf[32], buf[33]]),
92 reserved,
93 crc32c: u32::from_le_bytes([buf[50], buf[51], buf[52], buf[53]]),
94 }
95 }
96
97 pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
102 let header_bytes = self.to_bytes();
103 let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
104 digest = crc32c::crc32c_append(digest, payload);
105 digest
106 }
107
108 pub fn logical_record_type(&self) -> u32 {
110 self.record_type & !ENCRYPTED_FLAG
111 }
112
113 pub fn validate(&self, offset: u64) -> Result<()> {
114 if self.magic != WAL_MAGIC {
115 return Err(WalError::InvalidMagic {
116 offset,
117 expected: WAL_MAGIC,
118 actual: self.magic,
119 });
120 }
121 if self.format_version != WAL_FORMAT_VERSION {
122 return Err(WalError::UnsupportedVersion {
123 version: self.format_version,
124 supported: WAL_FORMAT_VERSION,
125 });
126 }
127 Ok(())
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134
135 fn make_header(record_type: u32, vshard_id: u32) -> RecordHeader {
136 RecordHeader {
137 magic: WAL_MAGIC,
138 format_version: WAL_FORMAT_VERSION,
139 record_type,
140 lsn: 42,
141 tenant_id: 7,
142 vshard_id,
143 payload_len: 100,
144 reserved: [0u8; 16],
145 crc32c: 0xDEAD_BEEF,
146 }
147 }
148
149 #[test]
150 fn header_roundtrip() {
151 let header = make_header(1 | REQUIRED_FLAG, 3);
152 let bytes = header.to_bytes();
153 assert_eq!(header, RecordHeader::from_bytes(&bytes));
154 }
155
156 #[test]
157 fn header_golden_54_bytes_exact_offsets() {
158 let header = RecordHeader {
162 magic: WAL_MAGIC,
163 format_version: WAL_FORMAT_VERSION,
164 record_type: 1,
165 lsn: 0x0102_0304_0506_0708,
166 tenant_id: 0xDEAD_BEEF_CAFE_1234,
167 vshard_id: 0xCAFE_BABE,
168 payload_len: 256,
169 reserved: [0u8; 16],
170 crc32c: 0x1234_5678,
171 };
172 let b = header.to_bytes();
173 assert_eq!(b.len(), 54);
174 assert_eq!(&b[0..4], &WAL_MAGIC.to_le_bytes());
176 assert_eq!(&b[4..6], &WAL_FORMAT_VERSION.to_le_bytes());
178 assert_eq!(&b[6..10], &1u32.to_le_bytes());
180 assert_eq!(&b[10..18], &0x0102_0304_0506_0708u64.to_le_bytes());
182 assert_eq!(&b[18..26], &0xDEAD_BEEF_CAFE_1234u64.to_le_bytes());
184 assert_eq!(&b[26..30], &0xCAFE_BABEu32.to_le_bytes());
186 assert_eq!(&b[30..34], &256u32.to_le_bytes());
188 assert_eq!(&b[34..50], &[0u8; 16]);
190 assert_eq!(&b[50..54], &0x1234_5678u32.to_le_bytes());
192 }
193
194 #[test]
195 fn tenant_id_above_u32_max_roundtrip() {
196 let tid = u32::MAX as u64 + 1;
198 let header = RecordHeader {
199 magic: WAL_MAGIC,
200 format_version: WAL_FORMAT_VERSION,
201 record_type: 1,
202 lsn: 1,
203 tenant_id: tid,
204 vshard_id: 0,
205 payload_len: 0,
206 reserved: [0u8; 16],
207 crc32c: 0,
208 };
209 let bytes = header.to_bytes();
210 let decoded = RecordHeader::from_bytes(&bytes);
211 assert_eq!(decoded.tenant_id, tid);
212 }
213
214 #[test]
215 fn invalid_magic_detected() {
216 let mut header = make_header(0, 0);
217 header.magic = 0xBAD0_F00D;
218 assert!(matches!(
219 header.validate(0),
220 Err(WalError::InvalidMagic { .. })
221 ));
222 }
223
224 #[test]
225 fn unsupported_version_detected() {
226 let mut header = make_header(0, 0);
227 header.format_version = WAL_FORMAT_VERSION + 1;
228 assert!(matches!(
229 header.validate(0),
230 Err(WalError::UnsupportedVersion { .. })
231 ));
232 }
233
234 #[test]
235 fn version_4_rejected() {
236 let mut header = make_header(0, 0);
238 header.format_version = 4;
239 assert!(matches!(
240 header.validate(0),
241 Err(WalError::UnsupportedVersion { version: 4, .. })
242 ));
243 }
244
245 #[test]
246 fn large_vshard_id_roundtrip() {
247 let header = make_header(1, 0x1234_5678);
249 let bytes = header.to_bytes();
250 let decoded = RecordHeader::from_bytes(&bytes);
251 assert_eq!(decoded.vshard_id, 0x1234_5678u32);
252 }
253
254 #[test]
255 fn encrypted_flag_is_u32() {
256 let header = make_header(1 | ENCRYPTED_FLAG, 0);
257 assert_eq!(header.logical_record_type(), 1);
258 assert!(header.record_type & ENCRYPTED_FLAG != 0);
259 }
260
261 #[test]
262 fn large_record_type_roundtrip() {
263 let header = make_header(0x0001_0001, 0);
265 let bytes = header.to_bytes();
266 let decoded = RecordHeader::from_bytes(&bytes);
267 assert_eq!(decoded.record_type, 0x0001_0001u32);
268 let with_flags = make_header(0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG, 0);
270 let bytes2 = with_flags.to_bytes();
271 let decoded2 = RecordHeader::from_bytes(&bytes2);
272 assert_eq!(
273 decoded2.record_type,
274 0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG
275 );
276 assert_eq!(decoded2.logical_record_type(), 0x0001_0001 | REQUIRED_FLAG);
277 }
278}