Skip to main content

nodedb_types/
backup_envelope.rs

1//! Backup envelope: framing for tenant backup bytes.
2//!
3//! All multi-byte integers are little-endian.
4//!
5//! ```text
6//!   ┌─ HEADER ────────────────────────────────────────────────────┐
7//!   │ magic       : 4 bytes  = b"NDBB"                            │
8//!   │ version     : u8       = 1                                  │
9//!   │ _reserved   : 3 bytes  = 0                                  │
10//!   │ tenant_id   : u32                                           │
11//!   │ src_vshards : u16   (source cluster's VSHARD_COUNT)         │
12//!   │ _reserved   : 2 bytes  = 0                                  │
13//!   │ hash_seed   : u64   (source cluster's hash seed, 0 today)   │
14//!   │ watermark   : u64   (snapshot LSN; 0 if not captured)       │
15//!   │ section_cnt : u16                                           │
16//!   │ _reserved   : 6 bytes  = 0                                  │
17//!   │ header_crc  : u32   (crc32c over the preceding 40 bytes)    │
18//!   └─────────────────────────────────────────────────────────────┘
19//!   ┌─ SECTION × section_cnt ─────────────────────────────────────┐
20//!   │ origin_node : u64                                           │
21//!   │ body_len    : u32   (≤ MAX_SECTION_BYTES)                   │
22//!   │ body        : body_len bytes                                │
23//!   │ body_crc    : u32   (crc32c over body)                      │
24//!   └─────────────────────────────────────────────────────────────┘
25//!   ┌─ TRAILER ───────────────────────────────────────────────────┐
26//!   │ trailer_crc : u32   (crc32c over header bytes + every       │
27//!   │                      section's framed bytes)                │
28//!   └─────────────────────────────────────────────────────────────┘
29//! ```
30//!
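//! Total envelope size is bounded by a caller-supplied cap
//! (`DEFAULT_MAX_TOTAL_BYTES` unless overridden). The decoder rejects
//! oversized input up front and checks every section length against the
//! bytes actually present before allocating a body. A caller-supplied byte
//! stream therefore cannot drive unbounded allocation.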
34
35use thiserror::Error;
36
/// Four-byte magic that opens every envelope.
pub const MAGIC: &[u8; 4] = b"NDBB";
/// Format version emitted by the writer; the parser accepts only this value.
pub const VERSION: u8 = 1;

/// Fixed header size: 40 framed bytes followed by their 4-byte crc32c.
pub const HEADER_LEN: usize = 40 + 4;
/// Framing bytes around each section body:
/// `origin_node` (8) + `body_len` (4) + `body_crc` (4).
pub const SECTION_OVERHEAD: usize = 8 + 4 + 4;
/// Size of the trailing crc32c.
pub const TRAILER_LEN: usize = 4;

/// Default cap on the total encoded envelope size (16 GiB). Callers can
/// pass a different cap per call.
pub const DEFAULT_MAX_TOTAL_BYTES: u64 = 16 << 30;
/// Default cap on a single section body (16 GiB).
///
/// Note: the on-wire `body_len` field is a `u32`. This default is
/// therefore larger than the largest body length the format can encode.
pub const DEFAULT_MAX_SECTION_BYTES: u64 = 16 << 30;
51
/// Reserved `origin_node_id` values. A section tagged with one of these
/// carries metadata rather than a single node's engine data.
///
/// The format stays backward-compatible with V1 readers that do not know
/// these sentinels. Such readers still decode the envelope and validate
/// the section CRCs; they just never apply these bodies at restore time.
/// Restore handlers that do recognize a sentinel route the body to the
/// matching catalog writer.
///
/// Catalog-rows section: the body is a msgpack-encoded
/// `Vec<StoredCollectionBlob>`.
pub const SECTION_ORIGIN_CATALOG_ROWS: u64 = 0xFFFF_FFFF_FFFF_FFF0;
/// Source-tombstones section. The body presumably holds
/// `SourceTombstoneEntry` values (TODO confirm the exact encoding).
pub const SECTION_ORIGIN_SOURCE_TOMBSTONES: u64 = 0xFFFF_FFFF_FFFF_FFF1;
61
62/// Single catalog-row entry in a catalog-rows section. The outer
63/// container is `Vec<StoredCollectionBlob>` msgpack-encoded into the
64/// section body. Bytes are the zerompk-encoded `StoredCollection`
65/// from the `nodedb` crate — `nodedb-types` intentionally doesn't
66/// depend on the `nodedb` catalog types, so the blob is opaque here.
67#[derive(Debug, Clone, PartialEq, Eq, zerompk::ToMessagePack, zerompk::FromMessagePack)]
68pub struct StoredCollectionBlob {
69    pub name: String,
70    /// zerompk-encoded `StoredCollection`.
71    pub bytes: Vec<u8>,
72}
73
74/// Single source-side tombstone entry. `purge_lsn` is the Origin WAL
75/// LSN at which the hard-delete committed — restore uses it as a
76/// per-collection replay barrier so rows older than the purge don't
77/// resurrect.
78#[derive(Debug, Clone, PartialEq, Eq, zerompk::ToMessagePack, zerompk::FromMessagePack)]
79pub struct SourceTombstoneEntry {
80    pub collection: String,
81    pub purge_lsn: u64,
82}
83
84#[derive(Debug, Error, PartialEq, Eq)]
85pub enum EnvelopeError {
86    #[error("invalid backup format")]
87    BadMagic,
88    #[error("unsupported backup version: {0}")]
89    UnsupportedVersion(u8),
90    #[error("invalid backup format")]
91    HeaderCrcMismatch,
92    #[error("invalid backup format")]
93    BodyCrcMismatch,
94    #[error("invalid backup format")]
95    TrailerCrcMismatch,
96    #[error("backup truncated")]
97    Truncated,
98    #[error("backup tenant mismatch: expected {expected}, got {actual}")]
99    TenantMismatch { expected: u32, actual: u32 },
100    #[error("backup exceeds size cap of {cap} bytes")]
101    OverSizeTotal { cap: u64 },
102    #[error("backup section exceeds size cap of {cap} bytes")]
103    OverSizeSection { cap: u64 },
104    #[error("too many sections: {0}")]
105    TooManySections(u16),
106}
107
/// Header metadata recorded when the backup is taken.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EnvelopeMeta {
    /// Tenant whose data the envelope carries.
    pub tenant_id: u32,
    /// `VSHARD_COUNT` of the source cluster.
    pub source_vshard_count: u16,
    /// Hash seed of the source cluster (currently always 0).
    pub hash_seed: u64,
    /// Snapshot LSN; 0 when no watermark was captured.
    pub snapshot_watermark: u64,
}
116
/// A single contiguous body contributed by one origin node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Section {
    /// Producing node id, or a sentinel such as `SECTION_ORIGIN_CATALOG_ROWS`.
    pub origin_node_id: u64,
    /// Opaque payload; crc32c-protected on the wire.
    pub body: Vec<u8>,
}
123
124/// Decoded envelope.
125#[derive(Debug, Clone, PartialEq, Eq)]
126pub struct Envelope {
127    pub meta: EnvelopeMeta,
128    pub sections: Vec<Section>,
129}
130
131/// Build an envelope by pushing sections one at a time, then `finalize()`.
132pub struct EnvelopeWriter {
133    meta: EnvelopeMeta,
134    sections: Vec<Section>,
135    max_total: u64,
136    max_section: u64,
137    framed_size: u64,
138}
139
140impl EnvelopeWriter {
141    pub fn new(meta: EnvelopeMeta) -> Self {
142        Self::with_caps(meta, DEFAULT_MAX_TOTAL_BYTES, DEFAULT_MAX_SECTION_BYTES)
143    }
144
145    pub fn with_caps(meta: EnvelopeMeta, max_total: u64, max_section: u64) -> Self {
146        Self {
147            meta,
148            sections: Vec::new(),
149            max_total,
150            max_section,
151            framed_size: HEADER_LEN as u64 + TRAILER_LEN as u64,
152        }
153    }
154
155    pub fn push_section(
156        &mut self,
157        origin_node_id: u64,
158        body: Vec<u8>,
159    ) -> Result<(), EnvelopeError> {
160        if body.len() as u64 > self.max_section {
161            return Err(EnvelopeError::OverSizeSection {
162                cap: self.max_section,
163            });
164        }
165        let added = SECTION_OVERHEAD as u64 + body.len() as u64;
166        if self.framed_size + added > self.max_total {
167            return Err(EnvelopeError::OverSizeTotal {
168                cap: self.max_total,
169            });
170        }
171        if self.sections.len() >= u16::MAX as usize {
172            return Err(EnvelopeError::TooManySections(u16::MAX));
173        }
174        self.framed_size += added;
175        self.sections.push(Section {
176            origin_node_id,
177            body,
178        });
179        Ok(())
180    }
181
182    pub fn finalize(self) -> Vec<u8> {
183        let mut out = Vec::with_capacity(self.framed_size as usize);
184        write_header(&mut out, &self.meta, self.sections.len() as u16);
185        for section in &self.sections {
186            write_section(&mut out, section);
187        }
188        // Trailer crc covers header bytes + every section's framed bytes.
189        let trailer_crc = crc32c::crc32c(&out);
190        out.extend_from_slice(&trailer_crc.to_le_bytes());
191        out
192    }
193}
194
195fn write_header(out: &mut Vec<u8>, meta: &EnvelopeMeta, section_count: u16) {
196    let start = out.len();
197    out.extend_from_slice(MAGIC);
198    out.push(VERSION);
199    out.extend_from_slice(&[0u8; 3]);
200    out.extend_from_slice(&meta.tenant_id.to_le_bytes());
201    out.extend_from_slice(&meta.source_vshard_count.to_le_bytes());
202    out.extend_from_slice(&[0u8; 2]);
203    out.extend_from_slice(&meta.hash_seed.to_le_bytes());
204    out.extend_from_slice(&meta.snapshot_watermark.to_le_bytes());
205    out.extend_from_slice(&section_count.to_le_bytes());
206    out.extend_from_slice(&[0u8; 6]);
207    let header_crc = crc32c::crc32c(&out[start..]);
208    out.extend_from_slice(&header_crc.to_le_bytes());
209}
210
211fn write_section(out: &mut Vec<u8>, section: &Section) {
212    out.extend_from_slice(&section.origin_node_id.to_le_bytes());
213    out.extend_from_slice(&(section.body.len() as u32).to_le_bytes());
214    out.extend_from_slice(&section.body);
215    let body_crc = crc32c::crc32c(&section.body);
216    out.extend_from_slice(&body_crc.to_le_bytes());
217}
218
219/// Parse and fully validate an envelope.
220pub fn parse(bytes: &[u8], max_total: u64) -> Result<Envelope, EnvelopeError> {
221    if bytes.len() as u64 > max_total {
222        return Err(EnvelopeError::OverSizeTotal { cap: max_total });
223    }
224    if bytes.len() < HEADER_LEN + TRAILER_LEN {
225        return Err(EnvelopeError::Truncated);
226    }
227
228    // Header.
229    let header_bytes = &bytes[..HEADER_LEN];
230    if &header_bytes[0..4] != MAGIC {
231        return Err(EnvelopeError::BadMagic);
232    }
233    let version = header_bytes[4];
234    if version != VERSION {
235        return Err(EnvelopeError::UnsupportedVersion(version));
236    }
237    let claimed_header_crc = u32::from_le_bytes(read4(&header_bytes[40..44]));
238    let actual_header_crc = crc32c::crc32c(&header_bytes[..40]);
239    if claimed_header_crc != actual_header_crc {
240        return Err(EnvelopeError::HeaderCrcMismatch);
241    }
242
243    let meta = EnvelopeMeta {
244        tenant_id: u32::from_le_bytes(read4(&header_bytes[8..12])),
245        source_vshard_count: u16::from_le_bytes(read2(&header_bytes[12..14])),
246        hash_seed: u64::from_le_bytes(read8(&header_bytes[16..24])),
247        snapshot_watermark: u64::from_le_bytes(read8(&header_bytes[24..32])),
248    };
249    let section_count = u16::from_le_bytes(read2(&header_bytes[32..34]));
250
251    // Trailer position: tail 4 bytes.
252    let trailer_start = bytes.len() - TRAILER_LEN;
253    let claimed_trailer_crc = u32::from_le_bytes(read4(&bytes[trailer_start..]));
254    let actual_trailer_crc = crc32c::crc32c(&bytes[..trailer_start]);
255    if claimed_trailer_crc != actual_trailer_crc {
256        return Err(EnvelopeError::TrailerCrcMismatch);
257    }
258
259    // Sections live between header and trailer.
260    let mut cursor = HEADER_LEN;
261    let mut sections = Vec::with_capacity(section_count as usize);
262    for _ in 0..section_count {
263        if cursor + SECTION_OVERHEAD > trailer_start {
264            return Err(EnvelopeError::Truncated);
265        }
266        let origin_node_id = u64::from_le_bytes(read8(&bytes[cursor..cursor + 8]));
267        let body_len = u32::from_le_bytes(read4(&bytes[cursor + 8..cursor + 12])) as usize;
268        let body_start = cursor + 12;
269        let body_end = body_start + body_len;
270        let crc_end = body_end + 4;
271        if crc_end > trailer_start {
272            return Err(EnvelopeError::Truncated);
273        }
274        let body = bytes[body_start..body_end].to_vec();
275        let claimed_body_crc = u32::from_le_bytes(read4(&bytes[body_end..crc_end]));
276        if crc32c::crc32c(&body) != claimed_body_crc {
277            return Err(EnvelopeError::BodyCrcMismatch);
278        }
279        sections.push(Section {
280            origin_node_id,
281            body,
282        });
283        cursor = crc_end;
284    }
285    if cursor != trailer_start {
286        return Err(EnvelopeError::Truncated);
287    }
288
289    Ok(Envelope { meta, sections })
290}
291
/// Copy the first `N` bytes of `s` into a fixed-size array.
///
/// Panics if `s` holds fewer than `N` bytes; any extra bytes are ignored.
fn read_array<const N: usize>(s: &[u8]) -> [u8; N] {
    let mut buf = [0u8; N];
    buf.copy_from_slice(&s[..N]);
    buf
}

/// First 2 bytes of `s` as an array.
fn read2(s: &[u8]) -> [u8; 2] {
    read_array(s)
}

/// First 4 bytes of `s` as an array.
fn read4(s: &[u8]) -> [u8; 4] {
    read_array(s)
}

/// First 8 bytes of `s` as an array.
fn read8(s: &[u8]) -> [u8; 8] {
    read_array(s)
}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// Header metadata shared by every test case.
    fn sample_meta() -> EnvelopeMeta {
        EnvelopeMeta {
            tenant_id: 42,
            source_vshard_count: 1024,
            hash_seed: 0,
            snapshot_watermark: 12345,
        }
    }

    /// Parse with the default total-size cap.
    fn decode(bytes: &[u8]) -> Result<Envelope, EnvelopeError> {
        parse(bytes, DEFAULT_MAX_TOTAL_BYTES)
    }

    /// Encoded envelope carrying no sections.
    fn empty_envelope() -> Vec<u8> {
        EnvelopeWriter::new(sample_meta()).finalize()
    }

    #[test]
    fn empty_envelope_roundtrips() {
        let decoded = decode(&empty_envelope()).unwrap();
        assert_eq!(decoded.meta, sample_meta());
        assert!(decoded.sections.is_empty());
    }

    #[test]
    fn multi_section_roundtrips() {
        let mut writer = EnvelopeWriter::new(sample_meta());
        writer.push_section(1, b"one".to_vec()).unwrap();
        writer.push_section(2, b"two-payload".to_vec()).unwrap();
        writer.push_section(3, Vec::new()).unwrap();

        let decoded = decode(&writer.finalize()).unwrap();
        let sections = &decoded.sections;
        assert_eq!(sections.len(), 3);
        assert_eq!(sections[0].origin_node_id, 1);
        assert_eq!(sections[0].body, b"one");
        assert_eq!(sections[1].origin_node_id, 2);
        assert_eq!(sections[1].body, b"two-payload");
        assert_eq!(sections[2].body, b"");
    }

    #[test]
    fn rejects_short_input() {
        assert_eq!(decode(b"NDBB"), Err(EnvelopeError::Truncated));
    }

    #[test]
    fn rejects_bad_magic() {
        let mut encoded = empty_envelope();
        encoded[0] = b'X';
        // The header crc is stale as well; the magic check must trip first.
        let err = decode(&encoded).unwrap_err();
        assert!(
            matches!(err, EnvelopeError::BadMagic),
            "expected BadMagic, got {err:?}"
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        let mut encoded = empty_envelope();
        encoded[4] = 99;
        // The header crc no longer matches, but version is checked before it.
        let err = decode(&encoded).unwrap_err();
        assert!(
            matches!(err, EnvelopeError::UnsupportedVersion(99)),
            "expected UnsupportedVersion(99), got {err:?}"
        );
    }

    #[test]
    fn rejects_header_crc_corruption() {
        let mut encoded = empty_envelope();
        // Flip tenant_id bits and leave the stored header crc untouched.
        encoded[8] ^= 0xFF;
        assert_eq!(decode(&encoded), Err(EnvelopeError::HeaderCrcMismatch));
    }

    #[test]
    fn rejects_body_crc_corruption() {
        let mut writer = EnvelopeWriter::new(sample_meta());
        writer.push_section(7, b"hello".to_vec()).unwrap();
        let mut encoded = writer.finalize();

        // The first body starts after the header plus origin (8) and len (4).
        encoded[HEADER_LEN + 12] ^= 0xFF;

        // Re-seal the trailer so that only the body crc is wrong; otherwise the
        // trailer check would reject the input first.
        let trailer_at = encoded.len() - TRAILER_LEN;
        let resealed = crc32c::crc32c(&encoded[..trailer_at]).to_le_bytes();
        encoded[trailer_at..].copy_from_slice(&resealed);

        assert_eq!(decode(&encoded), Err(EnvelopeError::BodyCrcMismatch));
    }

    #[test]
    fn rejects_trailer_crc_corruption() {
        let mut writer = EnvelopeWriter::new(sample_meta());
        writer.push_section(7, b"x".to_vec()).unwrap();
        let mut encoded = writer.finalize();
        *encoded.last_mut().unwrap() ^= 0xFF;
        assert_eq!(decode(&encoded), Err(EnvelopeError::TrailerCrcMismatch));
    }

    #[test]
    fn rejects_oversized_total() {
        let mut writer = EnvelopeWriter::with_caps(sample_meta(), 64, DEFAULT_MAX_SECTION_BYTES);
        let err = writer.push_section(1, vec![0u8; 1024]).unwrap_err();
        assert!(matches!(err, EnvelopeError::OverSizeTotal { .. }));
    }

    #[test]
    fn rejects_oversized_section_at_write() {
        let mut writer = EnvelopeWriter::with_caps(sample_meta(), DEFAULT_MAX_TOTAL_BYTES, 8);
        let err = writer.push_section(1, vec![0u8; 9]).unwrap_err();
        assert!(matches!(err, EnvelopeError::OverSizeSection { .. }));
    }

    #[test]
    fn rejects_oversized_total_at_parse() {
        let outcome = parse(&empty_envelope(), 4);
        assert!(matches!(outcome, Err(EnvelopeError::OverSizeTotal { .. })));
    }

    #[test]
    fn truncated_section_body() {
        let mut writer = EnvelopeWriter::new(sample_meta());
        writer.push_section(1, b"hello world".to_vec()).unwrap();
        let encoded = writer.finalize();
        // Dropping the last 8 bytes misaligns the trailer. Either
        // TrailerCrcMismatch or Truncated is a sound rejection.
        let chopped = &encoded[..encoded.len() - 8];
        assert!(decode(chopped).is_err());
    }
}
445}