1use crate::error::{Result, WalError};
15
/// Magic number expected in the first four bytes of every record header
/// (serialized little-endian by `RecordHeader::to_bytes`).
pub const WAL_MAGIC: u32 = 0x5359_4E57;

/// Newest record format version understood by this code; headers with a
/// larger version are rejected by `RecordHeader::validate`.
pub const WAL_FORMAT_VERSION: u16 = 1;

/// Maximum size, in bytes, of a record's plaintext payload (64 MiB).
/// `WalRecord::new` enforces this before any encryption happens.
pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 << 20;

/// Size in bytes of a serialized `RecordHeader`. The widths, in field order, are
/// magic (4), format_version (2), record_type (2), lsn (8), tenant_id (4),
/// vshard_id (2), payload_len (4) and crc32c (4).
pub const HEADER_SIZE: usize = 4 + 2 + 2 + 8 + 4 + 2 + 4 + 4;
27
/// Fixed-size header that precedes every WAL record payload.
///
/// All fields are serialized little-endian, back-to-back in declaration
/// order, for a total of `HEADER_SIZE` (30) bytes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecordHeader {
    /// Must equal `WAL_MAGIC` for `validate` to accept the header.
    pub magic: u32,
    /// On-disk format version; must not exceed `WAL_FORMAT_VERSION`.
    pub format_version: u16,
    /// Raw record type. May carry `ENCRYPTED_FLAG` in addition to the
    /// logical type; use `logical_record_type` to strip it.
    pub record_type: u16,
    /// Log sequence number of this record (also passed to the encryption key).
    pub lsn: u64,
    /// Tenant identifier.
    pub tenant_id: u32,
    /// vShard identifier.
    pub vshard_id: u16,
    /// Length in bytes of the payload as stored (ciphertext when encrypted).
    pub payload_len: u32,
    /// CRC32C over the first 26 header bytes followed by the stored payload.
    pub crc32c: u32,
}
55
56impl RecordHeader {
57 pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
59 let mut buf = [0u8; HEADER_SIZE];
60 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
61 buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
62 buf[6..8].copy_from_slice(&self.record_type.to_le_bytes());
63 buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
64 buf[16..20].copy_from_slice(&self.tenant_id.to_le_bytes());
65 buf[20..22].copy_from_slice(&self.vshard_id.to_le_bytes());
66 buf[22..26].copy_from_slice(&self.payload_len.to_le_bytes());
67 buf[26..30].copy_from_slice(&self.crc32c.to_le_bytes());
68 buf
69 }
70
71 pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
73 Self {
74 magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
75 format_version: u16::from_le_bytes([buf[4], buf[5]]),
76 record_type: u16::from_le_bytes([buf[6], buf[7]]),
77 lsn: u64::from_le_bytes([
78 buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
79 ]),
80 tenant_id: u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]),
81 vshard_id: u16::from_le_bytes([buf[20], buf[21]]),
82 payload_len: u32::from_le_bytes([buf[22], buf[23], buf[24], buf[25]]),
83 crc32c: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
84 }
85 }
86
87 pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
89 let header_bytes = self.to_bytes();
90 let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
92 digest = crc32c::crc32c_append(digest, payload);
93 digest
94 }
95
96 pub fn logical_record_type(&self) -> u16 {
98 self.record_type & !ENCRYPTED_FLAG
99 }
100
101 pub fn validate(&self, offset: u64) -> Result<()> {
103 if self.magic != WAL_MAGIC {
104 return Err(WalError::InvalidMagic {
105 offset,
106 expected: WAL_MAGIC,
107 actual: self.magic,
108 });
109 }
110 if self.format_version > WAL_FORMAT_VERSION {
111 return Err(WalError::UnsupportedVersion {
112 version: self.format_version,
113 supported: WAL_FORMAT_VERSION,
114 });
115 }
116 Ok(())
117 }
118}
119
/// Bit in a raw record type marking the type as "required"; tested by
/// `RecordType::is_required`.
const REQUIRED_FLAG: u16 = 0x8000;

/// Known logical WAL record types.
///
/// Each discriminant is the raw `u16` stored in `RecordHeader::record_type`
/// before `ENCRYPTED_FLAG` is OR-ed in for encrypted records. Every type except
/// `Noop`, `TimeseriesBatch` and `LogBatch` carries `REQUIRED_FLAG` (0x8000).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum RecordType {
    Noop = 0,

    Put = 1 | REQUIRED_FLAG,

    Delete = 2 | REQUIRED_FLAG,

    VectorPut = 10 | REQUIRED_FLAG,

    VectorDelete = 11 | REQUIRED_FLAG,

    VectorParams = 12 | REQUIRED_FLAG,

    CrdtDelta = 20 | REQUIRED_FLAG,

    TimeseriesBatch = 30,

    LogBatch = 31,

    Transaction = 50 | REQUIRED_FLAG,

    Checkpoint = 100 | REQUIRED_FLAG,

    CollectionTombstoned = 101 | REQUIRED_FLAG,
}

impl RecordType {
    /// Returns `true` when `raw` has the required bit (0x8000) set.
    ///
    /// Other bits, including `ENCRYPTED_FLAG`, do not affect the result, so
    /// this works on both raw and logical type values.
    pub fn is_required(raw: u16) -> bool {
        raw & REQUIRED_FLAG != 0
    }

    /// Maps a raw logical type value back to its `RecordType`.
    ///
    /// Returns `None` for unknown values. That includes values that still carry
    /// `ENCRYPTED_FLAG`, because no discriminant has that bit, so strip it first
    /// (e.g. via `logical_record_type`).
    pub fn from_raw(raw: u16) -> Option<Self> {
        // The patterns are derived from the enum's own discriminants, so this
        // table cannot disagree with the declarations above. (`Self` is not
        // usable inside nested items, hence the explicit type name.)
        const NOOP: u16 = RecordType::Noop as u16;
        const PUT: u16 = RecordType::Put as u16;
        const DELETE: u16 = RecordType::Delete as u16;
        const VECTOR_PUT: u16 = RecordType::VectorPut as u16;
        const VECTOR_DELETE: u16 = RecordType::VectorDelete as u16;
        const VECTOR_PARAMS: u16 = RecordType::VectorParams as u16;
        const CRDT_DELTA: u16 = RecordType::CrdtDelta as u16;
        const TIMESERIES_BATCH: u16 = RecordType::TimeseriesBatch as u16;
        const LOG_BATCH: u16 = RecordType::LogBatch as u16;
        const TRANSACTION: u16 = RecordType::Transaction as u16;
        const CHECKPOINT: u16 = RecordType::Checkpoint as u16;
        const COLLECTION_TOMBSTONED: u16 = RecordType::CollectionTombstoned as u16;

        let kind = match raw {
            NOOP => Self::Noop,
            PUT => Self::Put,
            DELETE => Self::Delete,
            VECTOR_PUT => Self::VectorPut,
            VECTOR_DELETE => Self::VectorDelete,
            VECTOR_PARAMS => Self::VectorParams,
            CRDT_DELTA => Self::CrdtDelta,
            TIMESERIES_BATCH => Self::TimeseriesBatch,
            LOG_BATCH => Self::LogBatch,
            TRANSACTION => Self::Transaction,
            CHECKPOINT => Self::Checkpoint,
            COLLECTION_TOMBSTONED => Self::CollectionTombstoned,
            _ => return None,
        };
        Some(kind)
    }
}
199
200#[derive(Debug, Clone)]
202pub struct WalRecord {
203 pub header: RecordHeader,
204 pub payload: Vec<u8>,
205}
206
207impl WalRecord {
208 pub fn new(
213 record_type: u16,
214 lsn: u64,
215 tenant_id: u32,
216 vshard_id: u16,
217 payload: Vec<u8>,
218 encryption_key: Option<&crate::crypto::WalEncryptionKey>,
219 ) -> Result<Self> {
220 if payload.len() > MAX_WAL_PAYLOAD_SIZE {
221 return Err(WalError::PayloadTooLarge {
222 size: payload.len(),
223 max: MAX_WAL_PAYLOAD_SIZE,
224 });
225 }
226
227 let (final_payload, encrypted) = if let Some(key) = encryption_key {
229 let temp_header = RecordHeader {
231 magic: WAL_MAGIC,
232 format_version: WAL_FORMAT_VERSION,
233 record_type,
234 lsn,
235 tenant_id,
236 vshard_id,
237 payload_len: 0, crc32c: 0,
239 };
240 let header_bytes = temp_header.to_bytes();
241 let ciphertext = key.encrypt(lsn, &header_bytes, &payload)?;
242 (ciphertext, true)
243 } else {
244 (payload, false)
245 };
246
247 let record_type = if encrypted {
249 record_type | ENCRYPTED_FLAG
250 } else {
251 record_type
252 };
253
254 let mut header = RecordHeader {
255 magic: WAL_MAGIC,
256 format_version: WAL_FORMAT_VERSION,
257 record_type,
258 lsn,
259 tenant_id,
260 vshard_id,
261 payload_len: final_payload.len() as u32,
262 crc32c: 0,
263 };
264
265 header.crc32c = header.compute_checksum(&final_payload);
266
267 Ok(Self {
268 header,
269 payload: final_payload,
270 })
271 }
272
273 pub fn decrypt_payload(
278 &self,
279 epoch: &[u8; 4],
280 encryption_key: Option<&crate::crypto::WalEncryptionKey>,
281 ) -> Result<Vec<u8>> {
282 if !self.is_encrypted() {
283 return Ok(self.payload.clone());
284 }
285
286 let key = encryption_key.ok_or_else(|| WalError::EncryptionError {
287 detail: "record is encrypted but no decryption key provided".into(),
288 })?;
289
290 let mut aad_header = self.header;
293 aad_header.record_type &= !ENCRYPTED_FLAG;
294 aad_header.payload_len = 0;
295 aad_header.crc32c = 0;
296 let header_bytes = aad_header.to_bytes();
297
298 key.decrypt(epoch, self.header.lsn, &header_bytes, &self.payload)
299 }
300
301 pub fn decrypt_payload_ring(
307 &self,
308 epoch: &[u8; 4],
309 ring: Option<&crate::crypto::KeyRing>,
310 ) -> Result<Vec<u8>> {
311 if !self.is_encrypted() {
312 return Ok(self.payload.clone());
313 }
314
315 let ring = ring.ok_or_else(|| WalError::EncryptionError {
316 detail: "record is encrypted but no decryption key ring provided".into(),
317 })?;
318
319 let mut aad_header = self.header;
320 aad_header.record_type &= !ENCRYPTED_FLAG;
321 aad_header.payload_len = 0;
322 aad_header.crc32c = 0;
323 let header_bytes = aad_header.to_bytes();
324
325 ring.decrypt(epoch, self.header.lsn, &header_bytes, &self.payload)
326 }
327
328 pub fn is_encrypted(&self) -> bool {
330 self.header.record_type & ENCRYPTED_FLAG != 0
331 }
332
333 pub fn logical_record_type(&self) -> u16 {
335 self.header.record_type & !ENCRYPTED_FLAG
336 }
337
338 pub fn verify_checksum(&self) -> Result<()> {
340 let expected = self.header.crc32c;
341 let actual = self.header.compute_checksum(&self.payload);
342 if expected != actual {
343 return Err(WalError::ChecksumMismatch {
344 lsn: self.header.lsn,
345 expected,
346 actual,
347 });
348 }
349 Ok(())
350 }
351
352 pub fn wire_size(&self) -> usize {
354 HEADER_SIZE + self.payload.len()
355 }
356}
357
/// Bit OR-ed into `RecordHeader::record_type` by `WalRecord::new` when the
/// payload was encrypted; `logical_record_type` masks it back out.
pub const ENCRYPTED_FLAG: u16 = 1 << 14;
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header that passes `validate`, with every other field zeroed.
    fn valid_header() -> RecordHeader {
        RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type: 0,
            lsn: 0,
            tenant_id: 0,
            vshard_id: 0,
            payload_len: 0,
            crc32c: 0,
        }
    }

    #[test]
    fn header_roundtrip() {
        let header = RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type: RecordType::Put as u16,
            lsn: 42,
            tenant_id: 7,
            vshard_id: 3,
            payload_len: 100,
            crc32c: 0xDEAD_BEEF,
        };

        let bytes = header.to_bytes();
        let decoded = RecordHeader::from_bytes(&bytes);
        assert_eq!(header, decoded);
    }

    #[test]
    fn checksum_roundtrip() {
        let payload = b"hello nodedb";
        let record =
            WalRecord::new(RecordType::Put as u16, 1, 0, 0, payload.to_vec(), None).unwrap();

        record.verify_checksum().unwrap();
    }

    #[test]
    fn checksum_detects_corruption() {
        let payload = b"hello nodedb";
        let mut record =
            WalRecord::new(RecordType::Put as u16, 1, 0, 0, payload.to_vec(), None).unwrap();

        record.payload[0] ^= 0xFF;

        assert!(matches!(
            record.verify_checksum(),
            Err(WalError::ChecksumMismatch { .. })
        ));
    }

    #[test]
    fn checksum_detects_header_corruption() {
        let mut record =
            WalRecord::new(RecordType::Put as u16, 1, 0, 0, b"hello nodedb".to_vec(), None)
                .unwrap();

        // The checksum covers header fields too, not just the payload.
        record.header.tenant_id ^= 1;

        assert!(matches!(
            record.verify_checksum(),
            Err(WalError::ChecksumMismatch { .. })
        ));
    }

    #[test]
    fn serialized_record_reverifies() {
        let record =
            WalRecord::new(RecordType::VectorPut as u16, 77, 5, 9, vec![1, 2, 3, 4], None)
                .unwrap();

        let header = RecordHeader::from_bytes(&record.header.to_bytes());
        header.validate(0).unwrap();

        let reparsed = WalRecord {
            header,
            payload: record.payload.clone(),
        };
        reparsed.verify_checksum().unwrap();
    }

    #[test]
    fn plaintext_record_accessors() {
        let payload = b"hello nodedb".to_vec();
        let record =
            WalRecord::new(RecordType::Delete as u16, 9, 1, 2, payload.clone(), None).unwrap();

        assert!(!record.is_encrypted());
        assert_eq!(record.logical_record_type(), RecordType::Delete as u16);
        assert_eq!(record.header.payload_len as usize, payload.len());
        assert_eq!(record.wire_size(), HEADER_SIZE + payload.len());
        assert_eq!(record.decrypt_payload(&[0; 4], None).unwrap(), payload);
        assert_eq!(record.decrypt_payload_ring(&[0; 4], None).unwrap(), payload);
    }

    #[test]
    fn invalid_magic_detected() {
        let header = RecordHeader {
            magic: 0xBAD0_F00D,
            ..valid_header()
        };

        assert!(matches!(
            header.validate(0),
            Err(WalError::InvalidMagic { .. })
        ));
    }

    #[test]
    fn unsupported_version_detected() {
        let header = RecordHeader {
            format_version: WAL_FORMAT_VERSION + 1,
            ..valid_header()
        };

        assert!(matches!(
            header.validate(0),
            Err(WalError::UnsupportedVersion { .. })
        ));
    }

    #[test]
    fn valid_header_accepted() {
        assert!(valid_header().validate(0).is_ok());
    }

    #[test]
    fn payload_too_large_rejected() {
        let big_payload = vec![0u8; MAX_WAL_PAYLOAD_SIZE + 1];
        assert!(matches!(
            WalRecord::new(RecordType::Put as u16, 1, 0, 0, big_payload, None),
            Err(WalError::PayloadTooLarge { .. })
        ));
    }

    #[test]
    fn record_type_required_flag() {
        assert!(RecordType::is_required(RecordType::Put as u16));
        assert!(RecordType::is_required(RecordType::Delete as u16));
        assert!(RecordType::is_required(RecordType::Checkpoint as u16));
        assert!(!RecordType::is_required(RecordType::Noop as u16));
        assert!(!RecordType::is_required(RecordType::TimeseriesBatch as u16));
        assert!(!RecordType::is_required(RecordType::LogBatch as u16));
    }

    #[test]
    fn record_type_from_raw_roundtrip() {
        let all = [
            RecordType::Noop,
            RecordType::Put,
            RecordType::Delete,
            RecordType::VectorPut,
            RecordType::VectorDelete,
            RecordType::VectorParams,
            RecordType::CrdtDelta,
            RecordType::TimeseriesBatch,
            RecordType::LogBatch,
            RecordType::Transaction,
            RecordType::Checkpoint,
            RecordType::CollectionTombstoned,
        ];
        for t in all.iter() {
            assert_eq!(RecordType::from_raw(*t as u16), Some(*t));
        }

        // Unknown values, a missing required bit, and a leftover ENCRYPTED_FLAG
        // are all rejected.
        assert_eq!(RecordType::from_raw(0xFFFF), None);
        assert_eq!(RecordType::from_raw(1), None);
        assert_eq!(
            RecordType::from_raw(RecordType::Put as u16 | ENCRYPTED_FLAG),
            None
        );
    }
}