Skip to main content

nodedb_types/
backup_envelope.rs

1//! Backup envelope: framing for tenant backup bytes.
2//!
3//! All multi-byte integers are little-endian.
4//!
5//! ```text
6//!   ┌─ HEADER ────────────────────────────────────────────────────┐
7//!   │ magic       : 4 bytes  = b"NDBB"                            │
8//!   │ version     : u8       = 1                                  │
9//!   │ _reserved   : 3 bytes  = 0                                  │
10//!   │ tenant_id   : u32                                           │
11//!   │ src_vshards : u16   (source cluster's VSHARD_COUNT)         │
12//!   │ _reserved   : 2 bytes  = 0                                  │
13//!   │ hash_seed   : u64   (source cluster's hash seed, 0 today)   │
14//!   │ watermark   : u64   (snapshot LSN; 0 if not captured)       │
15//!   │ section_cnt : u16                                           │
16//!   │ _reserved   : 6 bytes  = 0                                  │
17//!   │ header_crc  : u32   (crc32c over the preceding 40 bytes)    │
18//!   └─────────────────────────────────────────────────────────────┘
19//!   ┌─ SECTION × section_cnt ─────────────────────────────────────┐
20//!   │ origin_node : u64                                           │
21//!   │ body_len    : u32   (≤ MAX_SECTION_BYTES)                   │
22//!   │ body        : body_len bytes                                │
23//!   │ body_crc    : u32   (crc32c over body)                      │
24//!   └─────────────────────────────────────────────────────────────┘
25//!   ┌─ TRAILER ───────────────────────────────────────────────────┐
26//!   │ trailer_crc : u32   (crc32c over header bytes + every       │
27//!   │                      section's framed bytes)                │
28//!   └─────────────────────────────────────────────────────────────┘
29//! ```
30//!
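//! Total envelope size is bounded by a caller-supplied cap (`max_total`; the
//! writer defaults to `DEFAULT_MAX_TOTAL_BYTES`). The decoder rejects input
//! longer than that cap up front and bounds-checks each section's `body_len`
//! against the remaining bytes before copying the body, so a caller-supplied
//! byte stream cannot drive unbounded allocation.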
34
35use thiserror::Error;
36
/// Four-byte magic that opens every envelope header.
pub const MAGIC: &[u8; 4] = b"NDBB";
/// Format version written into the header; `parse` rejects any other value.
pub const VERSION: u8 = 1;

/// Header is fixed-size — 44 bytes (40 framed + 4 crc).
pub const HEADER_LEN: usize = 44;
/// Per-section framing overhead: origin(8) + len(4) + crc(4).
pub const SECTION_OVERHEAD: usize = 16;
/// Trailing crc.
pub const TRAILER_LEN: usize = 4;

/// Default cap on total envelope size: 16 GiB. Tunable per call.
pub const DEFAULT_MAX_TOTAL_BYTES: u64 = 16 * 1024 * 1024 * 1024;
/// Default cap on a single section body: 16 GiB.
///
/// NOTE(review): the on-wire `body_len` field is a `u32`, so a body longer
/// than `u32::MAX` bytes (4 GiB − 1) cannot be framed regardless of this cap;
/// the value here is larger than the format can represent.
pub const DEFAULT_MAX_SECTION_BYTES: u64 = 16 * 1024 * 1024 * 1024;
51
/// Errors reported while building or parsing a backup envelope.
///
/// `BadMagic` and the three CRC variants all render as the same message,
/// "invalid backup format"; only the variant itself tells them apart.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum EnvelopeError {
    /// The first four bytes of the input are not [`MAGIC`].
    #[error("invalid backup format")]
    BadMagic,
    /// The header's version byte (carried as the payload) is not [`VERSION`].
    #[error("unsupported backup version: {0}")]
    UnsupportedVersion(u8),
    /// The stored header crc does not match the crc32c of the first 40 header bytes.
    #[error("invalid backup format")]
    HeaderCrcMismatch,
    /// A section's stored crc does not match the crc32c of its body.
    #[error("invalid backup format")]
    BodyCrcMismatch,
    /// The trailing crc does not match the crc32c of every byte before it.
    #[error("invalid backup format")]
    TrailerCrcMismatch,
    /// The input is shorter than header + trailer, a section runs past the
    /// trailer, or the sections do not end exactly where the trailer starts.
    #[error("backup truncated")]
    Truncated,
    /// Not produced by any code in this file.
    /// NOTE(review): presumably raised by restore callers that compare the
    /// decoded `tenant_id` with the tenant they expect — confirm.
    #[error("backup tenant mismatch: expected {expected}, got {actual}")]
    TenantMismatch { expected: u32, actual: u32 },
    /// The envelope would exceed (writer) or the input exceeds (parser) the
    /// total-size cap carried in `cap`.
    #[error("backup exceeds size cap of {cap} bytes")]
    OverSizeTotal { cap: u64 },
    /// A section body handed to the writer is longer than the per-section cap
    /// carried in `cap`.
    #[error("backup section exceeds size cap of {cap} bytes")]
    OverSizeSection { cap: u64 },
    /// The writer already holds `u16::MAX` sections, the most `section_cnt`
    /// can express; the payload is that limit.
    #[error("too many sections: {0}")]
    TooManySections(u16),
}
75
/// Header metadata captured at backup time.
///
/// Every field is stored little-endian in the fixed-size header and is
/// covered by the header crc.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EnvelopeMeta {
    /// Tenant id (header bytes 8..12).
    pub tenant_id: u32,
    /// Source cluster's vshard count (`src_vshards`, header bytes 12..14).
    pub source_vshard_count: u16,
    /// Source cluster's hash seed (header bytes 16..24); the format notes say
    /// this is 0 today.
    pub hash_seed: u64,
    /// Snapshot LSN (`watermark`, header bytes 24..32); 0 if not captured.
    pub snapshot_watermark: u64,
}
84
/// One contiguous body produced by one origin node.
///
/// On the wire a section is framed as `origin_node (u64) | body_len (u32) |
/// body | body_crc (u32)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Section {
    /// Id of the node that produced `body`; stored as the section's
    /// `origin_node` field.
    pub origin_node_id: u64,
    /// Opaque section payload; the envelope code only copies and checksums it.
    pub body: Vec<u8>,
}
91
/// Decoded envelope.
///
/// Returned by `parse` once the header, every section body and the trailer
/// have passed their crc checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Envelope {
    /// Metadata read from the header.
    pub meta: EnvelopeMeta,
    /// Sections in the order they appear in the byte stream.
    pub sections: Vec<Section>,
}
98
/// Build an envelope by pushing sections one at a time, then `finalize()`.
///
/// Sections are buffered in memory; nothing is serialized until `finalize`.
pub struct EnvelopeWriter {
    /// Metadata written into the header by `finalize`.
    meta: EnvelopeMeta,
    /// Sections accepted so far, in push order.
    sections: Vec<Section>,
    /// Cap on the total encoded size, in bytes.
    max_total: u64,
    /// Cap on a single section body, in bytes.
    max_section: u64,
    /// Encoded size of the envelope so far: header + trailer plus the framing
    /// overhead and body of every accepted section.
    framed_size: u64,
}
107
108impl EnvelopeWriter {
109    pub fn new(meta: EnvelopeMeta) -> Self {
110        Self::with_caps(meta, DEFAULT_MAX_TOTAL_BYTES, DEFAULT_MAX_SECTION_BYTES)
111    }
112
113    pub fn with_caps(meta: EnvelopeMeta, max_total: u64, max_section: u64) -> Self {
114        Self {
115            meta,
116            sections: Vec::new(),
117            max_total,
118            max_section,
119            framed_size: HEADER_LEN as u64 + TRAILER_LEN as u64,
120        }
121    }
122
123    pub fn push_section(
124        &mut self,
125        origin_node_id: u64,
126        body: Vec<u8>,
127    ) -> Result<(), EnvelopeError> {
128        if body.len() as u64 > self.max_section {
129            return Err(EnvelopeError::OverSizeSection {
130                cap: self.max_section,
131            });
132        }
133        let added = SECTION_OVERHEAD as u64 + body.len() as u64;
134        if self.framed_size + added > self.max_total {
135            return Err(EnvelopeError::OverSizeTotal {
136                cap: self.max_total,
137            });
138        }
139        if self.sections.len() >= u16::MAX as usize {
140            return Err(EnvelopeError::TooManySections(u16::MAX));
141        }
142        self.framed_size += added;
143        self.sections.push(Section {
144            origin_node_id,
145            body,
146        });
147        Ok(())
148    }
149
150    pub fn finalize(self) -> Vec<u8> {
151        let mut out = Vec::with_capacity(self.framed_size as usize);
152        write_header(&mut out, &self.meta, self.sections.len() as u16);
153        for section in &self.sections {
154            write_section(&mut out, section);
155        }
156        // Trailer crc covers header bytes + every section's framed bytes.
157        let trailer_crc = crc32c::crc32c(&out);
158        out.extend_from_slice(&trailer_crc.to_le_bytes());
159        out
160    }
161}
162
163fn write_header(out: &mut Vec<u8>, meta: &EnvelopeMeta, section_count: u16) {
164    let start = out.len();
165    out.extend_from_slice(MAGIC);
166    out.push(VERSION);
167    out.extend_from_slice(&[0u8; 3]);
168    out.extend_from_slice(&meta.tenant_id.to_le_bytes());
169    out.extend_from_slice(&meta.source_vshard_count.to_le_bytes());
170    out.extend_from_slice(&[0u8; 2]);
171    out.extend_from_slice(&meta.hash_seed.to_le_bytes());
172    out.extend_from_slice(&meta.snapshot_watermark.to_le_bytes());
173    out.extend_from_slice(&section_count.to_le_bytes());
174    out.extend_from_slice(&[0u8; 6]);
175    let header_crc = crc32c::crc32c(&out[start..]);
176    out.extend_from_slice(&header_crc.to_le_bytes());
177}
178
179fn write_section(out: &mut Vec<u8>, section: &Section) {
180    out.extend_from_slice(&section.origin_node_id.to_le_bytes());
181    out.extend_from_slice(&(section.body.len() as u32).to_le_bytes());
182    out.extend_from_slice(&section.body);
183    let body_crc = crc32c::crc32c(&section.body);
184    out.extend_from_slice(&body_crc.to_le_bytes());
185}
186
187/// Parse and fully validate an envelope.
188pub fn parse(bytes: &[u8], max_total: u64) -> Result<Envelope, EnvelopeError> {
189    if bytes.len() as u64 > max_total {
190        return Err(EnvelopeError::OverSizeTotal { cap: max_total });
191    }
192    if bytes.len() < HEADER_LEN + TRAILER_LEN {
193        return Err(EnvelopeError::Truncated);
194    }
195
196    // Header.
197    let header_bytes = &bytes[..HEADER_LEN];
198    if &header_bytes[0..4] != MAGIC {
199        return Err(EnvelopeError::BadMagic);
200    }
201    let version = header_bytes[4];
202    if version != VERSION {
203        return Err(EnvelopeError::UnsupportedVersion(version));
204    }
205    let claimed_header_crc = u32::from_le_bytes(read4(&header_bytes[40..44]));
206    let actual_header_crc = crc32c::crc32c(&header_bytes[..40]);
207    if claimed_header_crc != actual_header_crc {
208        return Err(EnvelopeError::HeaderCrcMismatch);
209    }
210
211    let meta = EnvelopeMeta {
212        tenant_id: u32::from_le_bytes(read4(&header_bytes[8..12])),
213        source_vshard_count: u16::from_le_bytes(read2(&header_bytes[12..14])),
214        hash_seed: u64::from_le_bytes(read8(&header_bytes[16..24])),
215        snapshot_watermark: u64::from_le_bytes(read8(&header_bytes[24..32])),
216    };
217    let section_count = u16::from_le_bytes(read2(&header_bytes[32..34]));
218
219    // Trailer position: tail 4 bytes.
220    let trailer_start = bytes.len() - TRAILER_LEN;
221    let claimed_trailer_crc = u32::from_le_bytes(read4(&bytes[trailer_start..]));
222    let actual_trailer_crc = crc32c::crc32c(&bytes[..trailer_start]);
223    if claimed_trailer_crc != actual_trailer_crc {
224        return Err(EnvelopeError::TrailerCrcMismatch);
225    }
226
227    // Sections live between header and trailer.
228    let mut cursor = HEADER_LEN;
229    let mut sections = Vec::with_capacity(section_count as usize);
230    for _ in 0..section_count {
231        if cursor + SECTION_OVERHEAD > trailer_start {
232            return Err(EnvelopeError::Truncated);
233        }
234        let origin_node_id = u64::from_le_bytes(read8(&bytes[cursor..cursor + 8]));
235        let body_len = u32::from_le_bytes(read4(&bytes[cursor + 8..cursor + 12])) as usize;
236        let body_start = cursor + 12;
237        let body_end = body_start + body_len;
238        let crc_end = body_end + 4;
239        if crc_end > trailer_start {
240            return Err(EnvelopeError::Truncated);
241        }
242        let body = bytes[body_start..body_end].to_vec();
243        let claimed_body_crc = u32::from_le_bytes(read4(&bytes[body_end..crc_end]));
244        if crc32c::crc32c(&body) != claimed_body_crc {
245            return Err(EnvelopeError::BodyCrcMismatch);
246        }
247        sections.push(Section {
248            origin_node_id,
249            body,
250        });
251        cursor = crc_end;
252    }
253    if cursor != trailer_start {
254        return Err(EnvelopeError::Truncated);
255    }
256
257    Ok(Envelope { meta, sections })
258}
259
/// Copy the first two bytes of `s` into an array.
///
/// Panics if `s` holds fewer than two bytes.
fn read2(s: &[u8]) -> [u8; 2] {
    let mut buf = [0u8; 2];
    buf.copy_from_slice(&s[..2]);
    buf
}
/// Copy the first four bytes of `s` into an array.
///
/// Panics if `s` holds fewer than four bytes.
fn read4(s: &[u8]) -> [u8; 4] {
    let mut buf = [0u8; 4];
    buf.copy_from_slice(&s[..4]);
    buf
}
/// Copy the first eight bytes of `s` into an array.
///
/// Panics if `s` holds fewer than eight bytes.
fn read8(s: &[u8]) -> [u8; 8] {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&s[..8]);
    buf
}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed header metadata shared by every test.
    fn meta() -> EnvelopeMeta {
        EnvelopeMeta {
            tenant_id: 42,
            source_vshard_count: 1024,
            hash_seed: 0,
            snapshot_watermark: 12345,
        }
    }

    /// Encode an envelope with no sections under the default caps.
    fn encode_empty() -> Vec<u8> {
        EnvelopeWriter::new(meta()).finalize()
    }

    /// Encode an envelope holding exactly one section under the default caps.
    fn encode_single(origin: u64, body: &[u8]) -> Vec<u8> {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(origin, body.to_vec()).unwrap();
        writer.finalize()
    }

    #[test]
    fn empty_envelope_roundtrips() {
        let decoded = parse(&encode_empty(), DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(decoded.meta, meta());
        assert!(decoded.sections.is_empty());
    }

    #[test]
    fn multi_section_roundtrips() {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(1, b"one".to_vec()).unwrap();
        writer.push_section(2, b"two-payload".to_vec()).unwrap();
        writer.push_section(3, vec![]).unwrap();
        let encoded = writer.finalize();

        let decoded = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap();

        // Bodies of all three sections (this also pins the section count).
        let bodies: Vec<&[u8]> = decoded.sections.iter().map(|s| s.body.as_slice()).collect();
        assert_eq!(bodies, [&b"one"[..], &b"two-payload"[..], &b""[..]]);
        // Origin ids of the first two sections.
        let origins: Vec<u64> = decoded
            .sections
            .iter()
            .take(2)
            .map(|s| s.origin_node_id)
            .collect();
        assert_eq!(origins, [1, 2]);
    }

    #[test]
    fn rejects_short_input() {
        let outcome = parse(b"NDBB", DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome.unwrap_err(), EnvelopeError::Truncated);
    }

    #[test]
    fn rejects_bad_magic() {
        let mut encoded = encode_empty();
        encoded[0] = b'X';
        // The header crc is stale as well; the magic check must win.
        let err = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap_err();
        assert!(
            matches!(err, EnvelopeError::BadMagic),
            "expected BadMagic, got {err:?}"
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        let mut encoded = encode_empty();
        encoded[4] = 99;
        // The header crc is stale as well; the version check must win.
        let err = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap_err();
        assert!(
            matches!(err, EnvelopeError::UnsupportedVersion(99)),
            "expected UnsupportedVersion(99), got {err:?}"
        );
    }

    #[test]
    fn rejects_header_crc_corruption() {
        let mut encoded = encode_empty();
        // Flip a tenant_id byte and leave the stored header crc untouched.
        encoded[8] ^= 0xFF;
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome.unwrap_err(), EnvelopeError::HeaderCrcMismatch);
    }

    #[test]
    fn rejects_body_crc_corruption() {
        let mut encoded = encode_single(7, b"hello");
        // First body byte: header, then origin (8) and len (4).
        let first_body_byte = HEADER_LEN + 8 + 4;
        encoded[first_body_byte] ^= 0xFF;
        // The trailer crc is checked before any body crc, so refresh it to
        // make the body check the one that fires.
        let trailer_at = encoded.len() - TRAILER_LEN;
        let refreshed = crc32c::crc32c(&encoded[..trailer_at]).to_le_bytes();
        encoded[trailer_at..].copy_from_slice(&refreshed);

        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome.unwrap_err(), EnvelopeError::BodyCrcMismatch);
    }

    #[test]
    fn rejects_trailer_crc_corruption() {
        let mut encoded = encode_single(7, b"x");
        if let Some(final_byte) = encoded.last_mut() {
            *final_byte ^= 0xFF;
        }
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome.unwrap_err(), EnvelopeError::TrailerCrcMismatch);
    }

    #[test]
    fn rejects_oversized_total() {
        let mut writer = EnvelopeWriter::with_caps(meta(), 64, DEFAULT_MAX_SECTION_BYTES);
        match writer.push_section(1, vec![0u8; 1024]) {
            Err(EnvelopeError::OverSizeTotal { .. }) => {}
            other => panic!("expected OverSizeTotal, got {other:?}"),
        }
    }

    #[test]
    fn rejects_oversized_section_at_write() {
        let mut writer = EnvelopeWriter::with_caps(meta(), DEFAULT_MAX_TOTAL_BYTES, 8);
        match writer.push_section(1, vec![0u8; 9]) {
            Err(EnvelopeError::OverSizeSection { .. }) => {}
            other => panic!("expected OverSizeSection, got {other:?}"),
        }
    }

    #[test]
    fn rejects_oversized_total_at_parse() {
        match parse(&encode_empty(), 4) {
            Err(EnvelopeError::OverSizeTotal { .. }) => {}
            other => panic!("expected OverSizeTotal, got {other:?}"),
        }
    }

    #[test]
    fn truncated_section_body() {
        let encoded = encode_single(1, b"hello world");
        // Drop the final 8 bytes. The parser may answer TrailerCrcMismatch or
        // Truncated; either is a sound rejection, so only failure is asserted.
        let cut = encoded.len() - 8;
        assert!(parse(&encoded[..cut], DEFAULT_MAX_TOTAL_BYTES).is_err());
    }
}
413}