Skip to main content

nodedb_raft/
snapshot_framing.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Snapshot chunk framing for `InstallSnapshot` RPCs.
4//!
5//! # Wire layout (per chunk)
6//!
7//! ```text
8//! [magic 4B][version u16 BE][engine_id u16 BE][crc32c u32 BE][payload]
9//! ```
10//!
11//! - **magic**: `NDSN` (`0x4E 0x44 0x53 0x4E`) — identifies NodeDB snapshot frames.
//! - **version**: `SNAPSHOT_FORMAT_VERSION` (currently `1`). Any other version —
//!   older or newer — is rejected. There is no v0-fallback path for snapshot
//!   data — the format is always framed.
15//! - **engine_id**: discriminant from [`SnapshotEngineId`] (u16 big-endian).
16//! - **crc32c**: CRC-32C computed over `engine_id bytes (2) ++ payload bytes`.
17//!   Validates both the engine tag and the payload together.
18//! - **payload**: engine-specific snapshot bytes.
19//!
20//! # Empty-payload bootstrap stub
21//!
22//! The current `tick.rs` sends `InstallSnapshot` with `data: vec![]` as a
23//! positional marker (no real engine data yet). The receive side in
24//! `handle_rpc.rs` therefore skips framing validation when `data` is
25//! empty. When a real engine starts shipping snapshot data it must call
26//! [`encode_snapshot_chunk`] on the sender side; the receiver enforces
27//! framing for all non-empty payloads automatically.
28//!
29//! # Adding a new engine
30//!
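//! 1. Add a variant to [`SnapshotEngineId`] with a unique, never-before-used
//!    discriminant, and add the matching arm to [`SnapshotEngineId::from_u16`] —
//!    otherwise receivers reject the new id as an unknown engine.
//! 2. Call [`encode_snapshot_chunk`] when building the chunk.
//! 3. [`decode_snapshot_chunk`] itself requires no changes.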
34
35use thiserror::Error;
36
/// Magic bytes opening every NodeDB snapshot frame (ASCII `NDSN`).
pub const SNAPSHOT_MAGIC: [u8; 4] = [b'N', b'D', b'S', b'N'];

/// Frame-header format version written by the encoder and required by the decoder.
///
/// One version is shared by every engine. If an engine ever needs its own
/// versioning, that belongs inside its payload rather than in this header.
pub const SNAPSHOT_FORMAT_VERSION: u16 = 1;

/// Size in bytes of the fixed frame header that precedes every payload.
const HEADER_LEN: usize = std::mem::size_of::<[u8; 4]>() // magic
    + std::mem::size_of::<u16>() // format version
    + std::mem::size_of::<u16>() // engine id
    + std::mem::size_of::<u32>(); // crc32c
48
/// Engine identifier embedded in every snapshot frame.
///
/// The discriminant is written to the wire as a big-endian `u16`, so the
/// numeric values are part of the wire format. Once assigned, a value must
/// never be changed or reused; a new engine takes a fresh, previously unused
/// value. `0` is unassigned and is rejected when decoding.
///
/// Every variant must also have a matching arm in
/// [`SnapshotEngineId::from_u16`]. Otherwise frames tagged with it are
/// rejected as an unknown engine on the receive side.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u16)]
pub enum SnapshotEngineId {
    Vector = 1,
    Graph = 2,
    DocumentSchemaless = 3,
    DocumentStrict = 4,
    Columnar = 5,
    KeyValue = 6,
    Fts = 7,
    Spatial = 8,
    Crdt = 9,
}
67
68impl SnapshotEngineId {
69    /// Parse a raw `u16` discriminant from the wire.
70    pub fn from_u16(v: u16) -> Result<Self, SnapshotFramingError> {
71        match v {
72            1 => Ok(Self::Vector),
73            2 => Ok(Self::Graph),
74            3 => Ok(Self::DocumentSchemaless),
75            4 => Ok(Self::DocumentStrict),
76            5 => Ok(Self::Columnar),
77            6 => Ok(Self::KeyValue),
78            7 => Ok(Self::Fts),
79            8 => Ok(Self::Spatial),
80            9 => Ok(Self::Crdt),
81            other => Err(SnapshotFramingError::UnknownEngineId(other)),
82        }
83    }
84}
85
86/// Errors produced by snapshot frame encoding and decoding.
87#[derive(Debug, Error, Clone)]
88pub enum SnapshotFramingError {
89    #[error("snapshot frame magic mismatch: expected {SNAPSHOT_MAGIC:?}, got {0:?}")]
90    MagicMismatch([u8; 4]),
91
92    #[error("snapshot frame version mismatch: expected {SNAPSHOT_FORMAT_VERSION}, got {0}")]
93    VersionMismatch(u16),
94
95    #[error("snapshot frame CRC mismatch: stored {stored:#010x}, computed {computed:#010x}")]
96    CrcMismatch { stored: u32, computed: u32 },
97
98    #[error("unknown snapshot engine id: {0}")]
99    UnknownEngineId(u16),
100
101    #[error("snapshot frame truncated: need at least {HEADER_LEN} bytes, got {0}")]
102    Truncated(usize),
103}
104
105impl From<SnapshotFramingError> for crate::error::RaftError {
106    fn from(e: SnapshotFramingError) -> Self {
107        crate::error::RaftError::SnapshotFormat {
108            detail: e.to_string(),
109        }
110    }
111}
112
113/// Encode a single snapshot chunk into the framed wire format.
114///
115/// The CRC-32C is computed over the two `engine_id` bytes followed by
116/// all `payload` bytes, then the result is prefixed with the fixed header.
117pub fn encode_snapshot_chunk(engine_id: SnapshotEngineId, payload: &[u8]) -> Vec<u8> {
118    let engine_bytes = (engine_id as u16).to_be_bytes();
119    let crc = {
120        let mut h = crc32c::crc32c(&engine_bytes);
121        h = crc32c::crc32c_append(h, payload);
122        h
123    };
124
125    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
126    out.extend_from_slice(&SNAPSHOT_MAGIC);
127    out.extend_from_slice(&SNAPSHOT_FORMAT_VERSION.to_be_bytes());
128    out.extend_from_slice(&engine_bytes);
129    out.extend_from_slice(&crc.to_be_bytes());
130    out.extend_from_slice(payload);
131    out
132}
133
134/// Decode a framed snapshot chunk, returning the engine id and a slice
135/// into the original buffer pointing at the payload (zero-copy).
136///
137/// Validates magic, version, and CRC in that order; returns the first
138/// error encountered.
139pub fn decode_snapshot_chunk(
140    data: &[u8],
141) -> Result<(SnapshotEngineId, &[u8]), SnapshotFramingError> {
142    if data.len() < HEADER_LEN {
143        return Err(SnapshotFramingError::Truncated(data.len()));
144    }
145
146    // Magic — safe: we verified data.len() >= HEADER_LEN (12) above.
147    let magic: [u8; 4] = [data[0], data[1], data[2], data[3]];
148    if magic != SNAPSHOT_MAGIC {
149        return Err(SnapshotFramingError::MagicMismatch(magic));
150    }
151
152    // Version.
153    let version = u16::from_be_bytes([data[4], data[5]]);
154    if version != SNAPSHOT_FORMAT_VERSION {
155        return Err(SnapshotFramingError::VersionMismatch(version));
156    }
157
158    // Engine id.
159    let engine_id_raw = u16::from_be_bytes([data[6], data[7]]);
160    let engine_id = SnapshotEngineId::from_u16(engine_id_raw)?;
161
162    // CRC (over engine_id bytes + payload).
163    let stored_crc = u32::from_be_bytes([data[8], data[9], data[10], data[11]]);
164    let payload = &data[HEADER_LEN..];
165    let computed_crc = {
166        let mut h = crc32c::crc32c(&data[6..8]); // engine_id bytes
167        h = crc32c::crc32c_append(h, payload);
168        h
169    };
170    if stored_crc != computed_crc {
171        return Err(SnapshotFramingError::CrcMismatch {
172            stored: stored_crc,
173            computed: computed_crc,
174        });
175    }
176
177    Ok((engine_id, payload))
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    const ALL_ENGINES: &[SnapshotEngineId] = &[
        SnapshotEngineId::Vector,
        SnapshotEngineId::Graph,
        SnapshotEngineId::DocumentSchemaless,
        SnapshotEngineId::DocumentStrict,
        SnapshotEngineId::Columnar,
        SnapshotEngineId::KeyValue,
        SnapshotEngineId::Fts,
        SnapshotEngineId::Spatial,
        SnapshotEngineId::Crdt,
    ];

    #[test]
    fn roundtrip_all_engine_ids() {
        for &engine_id in ALL_ENGINES {
            let payload = b"test snapshot payload";
            let framed = encode_snapshot_chunk(engine_id, payload);
            let (decoded_id, decoded_payload) = decode_snapshot_chunk(&framed).unwrap();
            assert_eq!(decoded_id, engine_id);
            assert_eq!(decoded_payload, payload);
        }
    }

    #[test]
    fn roundtrip_empty_payload() {
        let framed = encode_snapshot_chunk(SnapshotEngineId::KeyValue, &[]);
        let (id, payload) = decode_snapshot_chunk(&framed).unwrap();
        assert_eq!(id, SnapshotEngineId::KeyValue);
        assert!(payload.is_empty());
    }

    #[test]
    fn tamper_magic_returns_magic_mismatch() {
        let mut framed = encode_snapshot_chunk(SnapshotEngineId::Vector, b"data");
        framed[0] ^= 0xFF;
        let err = decode_snapshot_chunk(&framed).unwrap_err();
        assert!(
            matches!(err, SnapshotFramingError::MagicMismatch(_)),
            "{err}"
        );
    }

    #[test]
    fn tamper_version_returns_version_mismatch() {
        let mut framed = encode_snapshot_chunk(SnapshotEngineId::Graph, b"data");
        // Flip the version bytes to something != SNAPSHOT_FORMAT_VERSION.
        let bad_version = SNAPSHOT_FORMAT_VERSION.wrapping_add(1).to_be_bytes();
        framed[4] = bad_version[0];
        framed[5] = bad_version[1];
        let err = decode_snapshot_chunk(&framed).unwrap_err();
        assert!(
            matches!(err, SnapshotFramingError::VersionMismatch(_)),
            "{err}"
        );
    }

    #[test]
    fn tamper_crc_returns_crc_mismatch() {
        let mut framed = encode_snapshot_chunk(SnapshotEngineId::Fts, b"important data");
        // Flip a bit in the CRC field (bytes 8..12).
        framed[9] ^= 0x01;
        let err = decode_snapshot_chunk(&framed).unwrap_err();
        assert!(
            matches!(err, SnapshotFramingError::CrcMismatch { .. }),
            "{err}"
        );
    }

    #[test]
    fn tamper_payload_returns_crc_mismatch() {
        let mut framed = encode_snapshot_chunk(SnapshotEngineId::Columnar, b"important data");
        // Flip a bit in the last payload byte; the header itself stays intact.
        let last = framed.len() - 1;
        framed[last] ^= 0x01;
        let err = decode_snapshot_chunk(&framed).unwrap_err();
        assert!(
            matches!(err, SnapshotFramingError::CrcMismatch { .. }),
            "{err}"
        );
    }

    #[test]
    fn swapped_engine_tag_returns_crc_mismatch() {
        // Re-tag a Vector frame as Graph. Both ids are known, so only the CRC
        // (which covers the engine id bytes) can catch the substitution.
        let mut framed = encode_snapshot_chunk(SnapshotEngineId::Vector, b"data");
        let graph = (SnapshotEngineId::Graph as u16).to_be_bytes();
        framed[6] = graph[0];
        framed[7] = graph[1];
        let err = decode_snapshot_chunk(&framed).unwrap_err();
        assert!(
            matches!(err, SnapshotFramingError::CrcMismatch { .. }),
            "{err}"
        );
    }

    #[test]
    fn reject_unknown_engine_id() {
        // Manually construct a frame with a valid header but unknown engine_id = 99.
        let engine_id_raw: u16 = 99;
        let engine_bytes = engine_id_raw.to_be_bytes();
        let payload = b"payload";
        let crc = {
            let mut h = crc32c::crc32c(&engine_bytes);
            h = crc32c::crc32c_append(h, payload);
            h
        };
        let mut frame = Vec::new();
        frame.extend_from_slice(&SNAPSHOT_MAGIC);
        frame.extend_from_slice(&SNAPSHOT_FORMAT_VERSION.to_be_bytes());
        frame.extend_from_slice(&engine_bytes);
        frame.extend_from_slice(&crc.to_be_bytes());
        frame.extend_from_slice(payload);

        let err = decode_snapshot_chunk(&frame).unwrap_err();
        assert!(
            matches!(err, SnapshotFramingError::UnknownEngineId(99)),
            "{err}"
        );
    }

    #[test]
    fn truncated_frame_returns_truncated_error() {
        let framed = encode_snapshot_chunk(SnapshotEngineId::Crdt, b"data");
        // Feed only 5 bytes — not enough for the full header.
        let err = decode_snapshot_chunk(&framed[..5]).unwrap_err();
        assert!(matches!(err, SnapshotFramingError::Truncated(5)), "{err}");
    }

    #[test]
    fn every_short_length_returns_truncated() {
        let framed = encode_snapshot_chunk(SnapshotEngineId::Crdt, b"data");
        // Every prefix shorter than the header, including the empty slice.
        for len in 0..HEADER_LEN {
            let err = decode_snapshot_chunk(&framed[..len]).unwrap_err();
            assert!(
                matches!(err, SnapshotFramingError::Truncated(n) if n == len),
                "{err}"
            );
        }
    }

    #[test]
    fn from_u16_roundtrip_all_discriminants() {
        for &engine_id in ALL_ENGINES {
            let raw = engine_id as u16;
            let decoded = SnapshotEngineId::from_u16(raw).unwrap();
            assert_eq!(decoded, engine_id);
        }
    }

    #[test]
    fn from_u16_unknown_returns_error() {
        let err = SnapshotEngineId::from_u16(0).unwrap_err();
        assert!(matches!(err, SnapshotFramingError::UnknownEngineId(0)));

        let err = SnapshotEngineId::from_u16(255).unwrap_err();
        assert!(matches!(err, SnapshotFramingError::UnknownEngineId(255)));
    }

    /// Asserts `NDSN` magic at [0..4], SNAPSHOT_FORMAT_VERSION == 1 BE at [4..6],
    /// and that the CRC at [8..12] covers the engine_id bytes plus payload.
    #[test]
    fn golden_raft_snapshot_frame_format() {
        let payload = b"golden-payload";
        let framed = encode_snapshot_chunk(SnapshotEngineId::KeyValue, payload);

        // Magic at [0..4].
        assert_eq!(&framed[0..4], b"NDSN", "magic mismatch");

        // Format version at [4..6] BE.
        let version = u16::from_be_bytes([framed[4], framed[5]]);
        assert_eq!(version, SNAPSHOT_FORMAT_VERSION, "version mismatch");
        assert_eq!(version, 1u16, "expected SNAPSHOT_FORMAT_VERSION == 1");

        // Engine ID at [6..8].
        let engine_id_raw = u16::from_be_bytes([framed[6], framed[7]]);
        assert_eq!(engine_id_raw, SnapshotEngineId::KeyValue as u16);

        // CRC at [8..12] BE covers engine_id bytes + payload.
        let stored_crc = u32::from_be_bytes([framed[8], framed[9], framed[10], framed[11]]);
        let engine_bytes = (SnapshotEngineId::KeyValue as u16).to_be_bytes();
        let mut h = crc32c::crc32c(&engine_bytes);
        h = crc32c::crc32c_append(h, payload);
        assert_eq!(stored_crc, h, "CRC mismatch");

        // Payload follows immediately after the 12-byte header (magic+version+engine_id+crc).
        assert_eq!(&framed[12..], payload);
    }
}