Skip to main content

nodedb_wal/
record.rs

//! WAL record format.
//!
//! On-disk layout (all fields little-endian):
//!
//! ```text
//! ┌──────────┬─────────────────┬────────────┬─────┬───────────┬───────────┬─────────────┬─────────┐
//! │  magic   │ format_version  │ record_type│ lsn │ tenant_id │ vshard_id │ payload_len │ crc32c  │
//! │  4 bytes │    2 bytes      │   2 bytes  │ 8B  │   4 bytes │  2 bytes  │   4 bytes   │ 4 bytes │
//! └──────────┴─────────────────┴────────────┴─────┴───────────┴───────────┴─────────────┴─────────┘
//! Total header: 30 bytes
//! Followed by: [payload_len bytes of payload]
//! crc32c covers: header bytes 0..26 (everything before the crc32c field) + payload
//! ```
13
14use crate::error::{Result, WalError};
15
/// Magic number that opens every NodeDB WAL record header.
///
/// The value is the ASCII tag `SYNW` read as a big-endian word
/// (`0x5359_4E57`). Header fields are serialized little-endian, so on disk
/// the four bytes appear in the order `W`, `N`, `Y`, `S`.
pub const WAL_MAGIC: u32 = u32::from_be_bytes(*b"SYNW");

/// WAL format version stamped into newly created records.
pub const WAL_FORMAT_VERSION: u16 = 1;

/// Upper bound on a WAL record payload (64 MiB). Distinct from cluster RPC's limit.
pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 << 20;

/// Size of the fixed record header in bytes: the sum of its field widths.
pub const HEADER_SIZE: usize = 4 // magic
    + 2 // format_version
    + 2 // record_type
    + 8 // lsn
    + 4 // tenant_id
    + 2 // vshard_id
    + 4 // payload_len
    + 4; // crc32c
27
/// Fixed-size (30-byte) header that precedes every WAL record payload.
///
/// Fields are listed in the same order in which they are laid out on disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordHeader {
    /// Marker identifying a WAL record; expected to equal `WAL_MAGIC`.
    pub magic: u32,

    /// Version of the record format, used for forward/backward compatibility.
    pub format_version: u16,

    /// Raw record type discriminant, including any flag bits.
    pub record_type: u16,

    /// Log Sequence Number: monotonically increasing and globally unique.
    pub lsn: u64,

    /// Tenant the record belongs to (multi-tenant isolation).
    pub tenant_id: u32,

    /// Virtual shard the record is routed to.
    pub vshard_id: u16,

    /// Number of payload bytes that follow this header.
    pub payload_len: u32,

    /// CRC32C over the header (without this field) followed by the payload.
    pub crc32c: u32,
}
55
56impl RecordHeader {
57    /// Serialize the header to a byte buffer.
58    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
59        let mut buf = [0u8; HEADER_SIZE];
60        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
61        buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
62        buf[6..8].copy_from_slice(&self.record_type.to_le_bytes());
63        buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
64        buf[16..20].copy_from_slice(&self.tenant_id.to_le_bytes());
65        buf[20..22].copy_from_slice(&self.vshard_id.to_le_bytes());
66        buf[22..26].copy_from_slice(&self.payload_len.to_le_bytes());
67        buf[26..30].copy_from_slice(&self.crc32c.to_le_bytes());
68        buf
69    }
70
71    /// Deserialize a header from a byte buffer.
72    pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
73        Self {
74            magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
75            format_version: u16::from_le_bytes([buf[4], buf[5]]),
76            record_type: u16::from_le_bytes([buf[6], buf[7]]),
77            lsn: u64::from_le_bytes([
78                buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
79            ]),
80            tenant_id: u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]),
81            vshard_id: u16::from_le_bytes([buf[20], buf[21]]),
82            payload_len: u32::from_le_bytes([buf[22], buf[23], buf[24], buf[25]]),
83            crc32c: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
84        }
85    }
86
87    /// Compute the CRC32C over the header (excluding the crc32c field) + payload.
88    pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
89        let header_bytes = self.to_bytes();
90        // Hash everything except the last 4 bytes (the crc32c field itself).
91        let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
92        digest = crc32c::crc32c_append(digest, payload);
93        digest
94    }
95
96    /// Get the logical record type (with encryption flag stripped).
97    pub fn logical_record_type(&self) -> u16 {
98        self.record_type & !ENCRYPTED_FLAG
99    }
100
101    /// Validate this header's magic and version.
102    pub fn validate(&self, offset: u64) -> Result<()> {
103        if self.magic != WAL_MAGIC {
104            return Err(WalError::InvalidMagic {
105                offset,
106                expected: WAL_MAGIC,
107                actual: self.magic,
108            });
109        }
110        if self.format_version > WAL_FORMAT_VERSION {
111            return Err(WalError::UnsupportedVersion {
112                version: self.format_version,
113                supported: WAL_FORMAT_VERSION,
114            });
115        }
116        Ok(())
117    }
118}
119
/// Record type discriminants.
///
/// Types 0-255 are reserved for NodeDB core.
/// Types 256+ are available for NodeDB specific records.
///
/// Bit 15 (`0x8000`) flags a record as **required**: replay must fail when it
/// meets a required record it does not know. Unknown records without bit 15
/// can be skipped safely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum RecordType {
    /// Padding / no-op record; replay skips it.
    Noop = 0,

    /// Generic key-value write.
    Put = 0x8000 | 1,

    /// Generic key deletion.
    Delete = 0x8000 | 2,

    /// Vector engine: insert or update an embedding.
    VectorPut = 0x8000 | 10,

    /// Vector engine: soft-delete a vector by internal ID.
    VectorDelete = 0x8000 | 11,

    /// Vector engine: set HNSW index parameters for a collection.
    VectorParams = 0x8000 | 12,

    /// CRDT engine: delta application.
    CrdtDelta = 0x8000 | 20,

    /// Timeseries engine: batch of metric samples (not required).
    TimeseriesBatch = 30,

    /// Timeseries engine: batch of log entries (not required).
    LogBatch = 31,

    /// Atomic transaction: groups several sub-records into one WAL entry.
    /// On replay either every sub-record applies or none does.
    /// Payload: MessagePack-encoded `Vec<(record_type: u16, payload: Vec<u8>)>`.
    Transaction = 0x8000 | 50,

    /// Checkpoint marker: a consistent snapshot point.
    Checkpoint = 0x8000 | 100,

    /// Collection hard-delete tombstone: any earlier record for
    /// `(tenant_id, collection)` with `lsn < purge_lsn` must be skipped on
    /// replay. Payload: [`CollectionTombstonePayload`] (MessagePack).
    ///
    /// Required, because a node that ignored this record during replay
    /// would resurrect purged rows.
    CollectionTombstoned = 0x8000 | 101,
}

impl RecordType {
    /// Returns `true` when `raw` carries the required bit (bit 15), i.e. the
    /// record must be understood for replay to be correct.
    pub fn is_required(raw: u16) -> bool {
        const REQUIRED_BIT: u16 = 0x8000;
        raw & REQUIRED_BIT == REQUIRED_BIT
    }

    /// Map a raw `u16` to the matching known variant, or `None` if no
    /// variant has exactly that discriminant.
    pub fn from_raw(raw: u16) -> Option<Self> {
        // Every variant, so the lookup compares against the enum's own
        // discriminants instead of repeating the numeric literals.
        const KNOWN: [RecordType; 12] = [
            RecordType::Noop,
            RecordType::Put,
            RecordType::Delete,
            RecordType::VectorPut,
            RecordType::VectorDelete,
            RecordType::VectorParams,
            RecordType::CrdtDelta,
            RecordType::TimeseriesBatch,
            RecordType::LogBatch,
            RecordType::Transaction,
            RecordType::Checkpoint,
            RecordType::CollectionTombstoned,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|candidate| *candidate as u16 == raw)
    }
}
199
200/// A complete WAL record: header + payload.
201#[derive(Debug, Clone)]
202pub struct WalRecord {
203    pub header: RecordHeader,
204    pub payload: Vec<u8>,
205}
206
207impl WalRecord {
208    /// Create a new WAL record with computed CRC32C.
209    ///
210    /// If `encryption_key` is provided, the payload is encrypted before
211    /// CRC computation. The ciphertext includes a 16-byte auth tag.
212    pub fn new(
213        record_type: u16,
214        lsn: u64,
215        tenant_id: u32,
216        vshard_id: u16,
217        payload: Vec<u8>,
218        encryption_key: Option<&crate::crypto::WalEncryptionKey>,
219    ) -> Result<Self> {
220        if payload.len() > MAX_WAL_PAYLOAD_SIZE {
221            return Err(WalError::PayloadTooLarge {
222                size: payload.len(),
223                max: MAX_WAL_PAYLOAD_SIZE,
224            });
225        }
226
227        // Encrypt if key provided.
228        let (final_payload, encrypted) = if let Some(key) = encryption_key {
229            // Build a temporary header for AAD (crc32c is 0 during encryption).
230            let temp_header = RecordHeader {
231                magic: WAL_MAGIC,
232                format_version: WAL_FORMAT_VERSION,
233                record_type,
234                lsn,
235                tenant_id,
236                vshard_id,
237                payload_len: 0, // Will be updated after encryption.
238                crc32c: 0,
239            };
240            let header_bytes = temp_header.to_bytes();
241            let ciphertext = key.encrypt(lsn, &header_bytes, &payload)?;
242            (ciphertext, true)
243        } else {
244            (payload, false)
245        };
246
247        // Set bit 14 in record_type to indicate encryption.
248        let record_type = if encrypted {
249            record_type | ENCRYPTED_FLAG
250        } else {
251            record_type
252        };
253
254        let mut header = RecordHeader {
255            magic: WAL_MAGIC,
256            format_version: WAL_FORMAT_VERSION,
257            record_type,
258            lsn,
259            tenant_id,
260            vshard_id,
261            payload_len: final_payload.len() as u32,
262            crc32c: 0,
263        };
264
265        header.crc32c = header.compute_checksum(&final_payload);
266
267        Ok(Self {
268            header,
269            payload: final_payload,
270        })
271    }
272
273    /// Decrypt the payload if the record is encrypted.
274    ///
275    /// `epoch` is the encryption epoch from the WAL segment header.
276    /// Returns the plaintext payload. If not encrypted, returns the payload as-is.
277    pub fn decrypt_payload(
278        &self,
279        epoch: &[u8; 4],
280        encryption_key: Option<&crate::crypto::WalEncryptionKey>,
281    ) -> Result<Vec<u8>> {
282        if !self.is_encrypted() {
283            return Ok(self.payload.clone());
284        }
285
286        let key = encryption_key.ok_or_else(|| WalError::EncryptionError {
287            detail: "record is encrypted but no decryption key provided".into(),
288        })?;
289
290        // Reconstruct the header bytes used as AAD (with the encrypted flag stripped
291        // from record_type, and payload_len=0, crc32c=0 — same as during encryption).
292        let mut aad_header = self.header;
293        aad_header.record_type &= !ENCRYPTED_FLAG;
294        aad_header.payload_len = 0;
295        aad_header.crc32c = 0;
296        let header_bytes = aad_header.to_bytes();
297
298        key.decrypt(epoch, self.header.lsn, &header_bytes, &self.payload)
299    }
300
301    /// Decrypt the payload using a key ring (supports dual-key rotation).
302    ///
303    /// `epoch` is the encryption epoch from the WAL segment header.
304    /// Tries the current key first, then falls back to the previous key.
305    /// Returns the plaintext payload. If not encrypted, returns the payload as-is.
306    pub fn decrypt_payload_ring(
307        &self,
308        epoch: &[u8; 4],
309        ring: Option<&crate::crypto::KeyRing>,
310    ) -> Result<Vec<u8>> {
311        if !self.is_encrypted() {
312            return Ok(self.payload.clone());
313        }
314
315        let ring = ring.ok_or_else(|| WalError::EncryptionError {
316            detail: "record is encrypted but no decryption key ring provided".into(),
317        })?;
318
319        let mut aad_header = self.header;
320        aad_header.record_type &= !ENCRYPTED_FLAG;
321        aad_header.payload_len = 0;
322        aad_header.crc32c = 0;
323        let header_bytes = aad_header.to_bytes();
324
325        ring.decrypt(epoch, self.header.lsn, &header_bytes, &self.payload)
326    }
327
328    /// Whether this record's payload is encrypted.
329    pub fn is_encrypted(&self) -> bool {
330        self.header.record_type & ENCRYPTED_FLAG != 0
331    }
332
333    /// Get the logical record type (with encryption flag stripped).
334    pub fn logical_record_type(&self) -> u16 {
335        self.header.record_type & !ENCRYPTED_FLAG
336    }
337
338    /// Verify the CRC32C checksum.
339    pub fn verify_checksum(&self) -> Result<()> {
340        let expected = self.header.crc32c;
341        let actual = self.header.compute_checksum(&self.payload);
342        if expected != actual {
343            return Err(WalError::ChecksumMismatch {
344                lsn: self.header.lsn,
345                expected,
346                actual,
347            });
348        }
349        Ok(())
350    }
351
352    /// Total size on disk: header + payload.
353    pub fn wire_size(&self) -> usize {
354        HEADER_SIZE + self.payload.len()
355    }
356}
357
/// Bit 14 of `record_type`: set when the payload is AES-256-GCM encrypted.
///
/// Independent of bit 15 (`0x8000`), which is the required flag.
pub const ENCRYPTED_FLAG: u16 = 1 << 14;
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload shared by the checksum tests.
    const SAMPLE_PAYLOAD: &[u8] = b"hello nodedb";

    /// Unencrypted `Put` record (lsn 1, tenant 0, vshard 0) carrying `payload`.
    fn plain_put(payload: &[u8]) -> WalRecord {
        WalRecord::new(RecordType::Put as u16, 1, 0, 0, payload.to_vec(), None).unwrap()
    }

    /// Encoding a header and decoding it again yields the same header.
    #[test]
    fn header_roundtrip() {
        let original = RecordHeader {
            magic: WAL_MAGIC,
            format_version: WAL_FORMAT_VERSION,
            record_type: RecordType::Put as u16,
            lsn: 42,
            tenant_id: 7,
            vshard_id: 3,
            payload_len: 100,
            crc32c: 0xDEAD_BEEF,
        };

        let decoded = RecordHeader::from_bytes(&original.to_bytes());
        assert_eq!(original, decoded);
    }

    /// A freshly built record passes its own checksum verification.
    #[test]
    fn checksum_roundtrip() {
        plain_put(SAMPLE_PAYLOAD).verify_checksum().unwrap();
    }

    /// Flipping payload bits after construction is caught by the CRC.
    #[test]
    fn checksum_detects_corruption() {
        let mut record = plain_put(SAMPLE_PAYLOAD);

        // Corrupt one byte.
        record.payload[0] ^= 0xFF;

        let outcome = record.verify_checksum();
        assert!(matches!(outcome, Err(WalError::ChecksumMismatch { .. })));
    }

    /// A header with the wrong magic fails validation.
    #[test]
    fn invalid_magic_detected() {
        let bogus = RecordHeader {
            magic: 0xBAD0_F00D,
            format_version: WAL_FORMAT_VERSION,
            record_type: 0,
            lsn: 0,
            tenant_id: 0,
            vshard_id: 0,
            payload_len: 0,
            crc32c: 0,
        };

        let outcome = bogus.validate(0);
        assert!(matches!(outcome, Err(WalError::InvalidMagic { .. })));
    }

    /// One byte over the payload limit is refused at construction time.
    #[test]
    fn payload_too_large_rejected() {
        let oversized = vec![0u8; MAX_WAL_PAYLOAD_SIZE + 1];
        let outcome = WalRecord::new(RecordType::Put as u16, 1, 0, 0, oversized, None);
        assert!(matches!(outcome, Err(WalError::PayloadTooLarge { .. })));
    }

    /// Bit 15 is set for required types and clear for skippable ones.
    #[test]
    fn record_type_required_flag() {
        for required in [RecordType::Put, RecordType::Delete, RecordType::Checkpoint] {
            assert!(RecordType::is_required(required as u16));
        }
        for skippable in [
            RecordType::Noop,
            RecordType::TimeseriesBatch,
            RecordType::LogBatch,
        ] {
            assert!(!RecordType::is_required(skippable as u16));
        }
    }
}