Skip to main content

nodedb_types/backup_envelope/
read.rs

// SPDX-License-Identifier: Apache-2.0

//! `parse` — decode and fully validate a plaintext backup envelope.
4
5use super::types::{Envelope, EnvelopeError, EnvelopeMeta, Section, read2, read4, read8};
6use super::types::{HEADER_LEN, MAGIC, SECTION_OVERHEAD, TRAILER_LEN, VERSION};
7
8/// Parse and fully validate a plaintext backup envelope.
9///
10/// Rejects bytes that do not carry version 1 in the header.
11/// Use [`crate::backup_envelope::parse_encrypted`] for encrypted envelopes.
12pub fn parse(bytes: &[u8], max_total: u64) -> Result<Envelope, EnvelopeError> {
13    if bytes.len() as u64 > max_total {
14        return Err(EnvelopeError::OverSizeTotal { cap: max_total });
15    }
16    if bytes.len() < HEADER_LEN + TRAILER_LEN {
17        return Err(EnvelopeError::Truncated);
18    }
19
20    // Header.
21    let header_bytes = &bytes[..HEADER_LEN];
22    if &header_bytes[0..4] != MAGIC {
23        return Err(EnvelopeError::BadMagic);
24    }
25    let version = header_bytes[4];
26    if version != VERSION {
27        return Err(EnvelopeError::UnsupportedVersion(version));
28    }
29
30    parse_validated_body(bytes, header_bytes)
31}
32
33/// Inner parse shared by the plain and encrypted paths.
34/// Caller has already verified magic, version, and (for encrypted path)
35/// the crypto header. `bytes` is the full envelope; `header_bytes` is `&bytes[..HEADER_LEN]`.
36pub(super) fn parse_validated_body(
37    bytes: &[u8],
38    header_bytes: &[u8],
39) -> Result<Envelope, EnvelopeError> {
40    // Validate header CRC.
41    let claimed_header_crc = u32::from_le_bytes(read4(&header_bytes[48..52]));
42    let actual_header_crc = crc32c::crc32c(&header_bytes[..48]);
43    if claimed_header_crc != actual_header_crc {
44        return Err(EnvelopeError::HeaderCrcMismatch);
45    }
46
47    let meta = EnvelopeMeta {
48        tenant_id: u64::from_le_bytes(read8(&header_bytes[8..16])),
49        source_vshard_count: u16::from_le_bytes(read2(&header_bytes[16..18])),
50        hash_seed: u64::from_le_bytes(read8(&header_bytes[24..32])),
51        snapshot_watermark: u64::from_le_bytes(read8(&header_bytes[32..40])),
52    };
53    let section_count = u16::from_le_bytes(read2(&header_bytes[40..42]));
54
55    // Trailer position: tail 4 bytes.
56    let trailer_start = bytes.len() - TRAILER_LEN;
57    let claimed_trailer_crc = u32::from_le_bytes(read4(&bytes[trailer_start..]));
58    let actual_trailer_crc = crc32c::crc32c(&bytes[..trailer_start]);
59    if claimed_trailer_crc != actual_trailer_crc {
60        return Err(EnvelopeError::TrailerCrcMismatch);
61    }
62
63    parse_sections(bytes, section_count, HEADER_LEN, trailer_start, meta)
64}
65
66pub(super) fn parse_sections(
67    bytes: &[u8],
68    section_count: u16,
69    body_start: usize,
70    trailer_start: usize,
71    meta: EnvelopeMeta,
72) -> Result<Envelope, EnvelopeError> {
73    let mut cursor = body_start;
74    let mut sections = Vec::with_capacity(section_count as usize);
75    for _ in 0..section_count {
76        if cursor + SECTION_OVERHEAD > trailer_start {
77            return Err(EnvelopeError::Truncated);
78        }
79        let origin_node_id = u64::from_le_bytes(read8(&bytes[cursor..cursor + 8]));
80        let body_len = u32::from_le_bytes(read4(&bytes[cursor + 8..cursor + 12])) as usize;
81        let body_start_inner = cursor + 12;
82        let body_end = body_start_inner + body_len;
83        let crc_end = body_end + 4;
84        if crc_end > trailer_start {
85            return Err(EnvelopeError::Truncated);
86        }
87        let body = bytes[body_start_inner..body_end].to_vec();
88        let claimed_body_crc = u32::from_le_bytes(read4(&bytes[body_end..crc_end]));
89        if crc32c::crc32c(&body) != claimed_body_crc {
90            return Err(EnvelopeError::BodyCrcMismatch);
91        }
92        sections.push(Section {
93            origin_node_id,
94            body,
95        });
96        cursor = crc_end;
97    }
98    if cursor != trailer_start {
99        return Err(EnvelopeError::Truncated);
100    }
101
102    Ok(Envelope { meta, sections })
103}
104
// ── shared tests for plaintext path ─────────────────────────────────────────
106
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backup_envelope::types::{DEFAULT_MAX_SECTION_BYTES, DEFAULT_MAX_TOTAL_BYTES};
    use crate::backup_envelope::write::EnvelopeWriter;

    /// Baseline metadata shared by most tests.
    fn meta() -> EnvelopeMeta {
        EnvelopeMeta {
            tenant_id: 42_u64,
            source_vshard_count: 1024,
            hash_seed: 0,
            snapshot_watermark: 12345,
        }
    }

    /// Rewrite the header CRC at `[48..52]` so it matches header bytes `[0..48]`.
    ///
    /// Lets a test tamper with a header field and still get past the header
    /// CRC check, isolating the check under test.
    fn reseal_header(bytes: &mut [u8]) {
        let crc = crc32c::crc32c(&bytes[..48]);
        bytes[48..52].copy_from_slice(&crc.to_le_bytes());
    }

    /// Rewrite the trailing CRC so it matches everything before it.
    fn reseal_trailer(bytes: &mut [u8]) {
        let trailer_off = bytes.len() - TRAILER_LEN;
        let crc = crc32c::crc32c(&bytes[..trailer_off]);
        bytes[trailer_off..].copy_from_slice(&crc.to_le_bytes());
    }

    /// Overwrite the header's section count (`[40..42]`, little-endian) and
    /// reseal both CRCs so only the section walk can reject the envelope.
    fn force_section_count(bytes: &mut [u8], count: u16) {
        bytes[40..42].copy_from_slice(&count.to_le_bytes());
        reseal_header(bytes);
        reseal_trailer(bytes);
    }

    #[test]
    fn empty_envelope_roundtrips() {
        let bytes = EnvelopeWriter::new(meta()).finalize();
        let env = parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(env.meta, meta());
        assert!(env.sections.is_empty());
    }

    #[test]
    fn multi_section_roundtrips() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(1, b"one".to_vec()).unwrap();
        w.push_section(2, b"two-payload".to_vec()).unwrap();
        w.push_section(3, vec![]).unwrap();
        let bytes = w.finalize();

        let env = parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(env.sections.len(), 3);
        assert_eq!(env.sections[0].origin_node_id, 1);
        assert_eq!(env.sections[0].body, b"one");
        assert_eq!(env.sections[1].origin_node_id, 2);
        assert_eq!(env.sections[1].body, b"two-payload");
        assert_eq!(env.sections[2].body, b"");
    }

    #[test]
    fn rejects_short_input() {
        assert_eq!(
            parse(b"NDBB", DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::Truncated)
        );
    }

    #[test]
    fn rejects_bad_magic() {
        let mut bytes = EnvelopeWriter::new(meta()).finalize();
        bytes[0] = b'X';
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::BadMagic)
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        let mut bytes = EnvelopeWriter::new(meta()).finalize();
        bytes[4] = 99;
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::UnsupportedVersion(99))
        );
    }

    #[test]
    fn u64_tenant_id_roundtrips() {
        // A tenant id above u32::MAX proves the field is carried as a full u64.
        let large_meta = EnvelopeMeta {
            tenant_id: u32::MAX as u64 + 1,
            source_vshard_count: 512,
            hash_seed: 0xDEAD_BEEF_CAFE_1234,
            snapshot_watermark: 9_999_999_999,
        };
        let mut w = EnvelopeWriter::new(large_meta);
        w.push_section(42, b"payload".to_vec()).unwrap();
        let bytes = w.finalize();
        let env = parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(env.meta, large_meta);
        assert_eq!(env.sections.len(), 1);
        assert_eq!(env.sections[0].body, b"payload");
    }

    #[test]
    fn rejects_header_crc_corruption() {
        let mut bytes = EnvelopeWriter::new(meta()).finalize();
        bytes[8] ^= 0xFF;
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::HeaderCrcMismatch)
        );
    }

    #[test]
    fn rejects_body_crc_corruption() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(7, b"hello".to_vec()).unwrap();
        let mut bytes = w.finalize();
        // First body byte: header, then 8-byte node id and 4-byte length.
        let body_off = HEADER_LEN + 12;
        bytes[body_off] ^= 0xFF;
        // Recompute trailer to isolate body-CRC enforcement.
        reseal_trailer(&mut bytes);
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::BodyCrcMismatch)
        );
    }

    #[test]
    fn rejects_trailer_crc_corruption() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(7, b"x".to_vec()).unwrap();
        let mut bytes = w.finalize();
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::TrailerCrcMismatch)
        );
    }

    #[test]
    fn rejects_oversized_total() {
        let mut w = EnvelopeWriter::with_caps(meta(), 64, DEFAULT_MAX_SECTION_BYTES);
        let err = w.push_section(1, vec![0u8; 1024]).unwrap_err();
        assert!(matches!(err, EnvelopeError::OverSizeTotal { .. }));
    }

    #[test]
    fn rejects_oversized_section_at_write() {
        let mut w = EnvelopeWriter::with_caps(meta(), DEFAULT_MAX_TOTAL_BYTES, 8);
        let err = w.push_section(1, vec![0u8; 9]).unwrap_err();
        assert!(matches!(err, EnvelopeError::OverSizeSection { .. }));
    }

    #[test]
    fn rejects_oversized_total_at_parse() {
        let bytes = EnvelopeWriter::new(meta()).finalize();
        assert!(matches!(
            parse(&bytes, 4),
            Err(EnvelopeError::OverSizeTotal { .. })
        ));
    }

    #[test]
    fn truncated_section_body() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(1, b"hello world".to_vec()).unwrap();
        let bytes = w.finalize();

        // A raw cut must be rejected by some integrity check.
        let truncated = &bytes[..bytes.len() - 8];
        assert!(parse(truncated, DEFAULT_MAX_TOTAL_BYTES).is_err());

        // With the trailer resealed the CRC checks pass, so the rejection has
        // to come from the section bounds check itself.
        let mut resealed = truncated.to_vec();
        reseal_trailer(&mut resealed);
        assert_eq!(
            parse(&resealed, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::Truncated)
        );
    }

    #[test]
    fn rejects_body_len_past_trailer() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(1, b"hello".to_vec()).unwrap();
        let mut bytes = w.finalize();
        // Claim one byte more than the section actually carries.
        let len_off = HEADER_LEN + 8;
        bytes[len_off..len_off + 4].copy_from_slice(&6u32.to_le_bytes());
        reseal_trailer(&mut bytes);
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::Truncated)
        );
    }

    #[test]
    fn rejects_section_count_larger_than_payload() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(1, b"x".to_vec()).unwrap();
        let mut bytes = w.finalize();
        force_section_count(&mut bytes, 2);
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::Truncated)
        );
    }

    #[test]
    fn rejects_bytes_left_over_after_sections() {
        let mut w = EnvelopeWriter::new(meta());
        w.push_section(1, b"x".to_vec()).unwrap();
        let mut bytes = w.finalize();
        // Header claims no sections, yet one is present before the trailer.
        force_section_count(&mut bytes, 0);
        assert_eq!(
            parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
            Err(EnvelopeError::Truncated)
        );
    }

    /// Asserts `NDBB` magic at [0..4], VERSION == 1 at [4], and that the
    /// header CRC at [48..52] covers header bytes [0..48].
    #[test]
    fn golden_backup_envelope_format() {
        let bytes = EnvelopeWriter::new(meta()).finalize();

        // Check the length before indexing so a short buffer fails with a clear message.
        assert!(bytes.len() >= HEADER_LEN, "envelope too short for header");

        // Magic at [0..4].
        assert_eq!(&bytes[0..4], MAGIC.as_slice(), "magic mismatch");

        // VERSION == 1 at [4].
        assert_eq!(bytes[4], VERSION, "version mismatch");
        assert_eq!(bytes[4], 1u8, "expected VERSION == 1");

        // Header CRC at [48..52] covers [0..48].
        let stored_crc = u32::from_le_bytes([bytes[48], bytes[49], bytes[50], bytes[51]]);
        let recomputed = crc32c::crc32c(&bytes[..48]);
        assert_eq!(stored_crc, recomputed, "header CRC mismatch");
    }
}