nodedb_types/backup_envelope/
read.rs1use super::types::{Envelope, EnvelopeError, EnvelopeMeta, Section, read2, read4, read8};
6use super::types::{HEADER_LEN, MAGIC, SECTION_OVERHEAD, TRAILER_LEN, VERSION};
7
8pub fn parse(bytes: &[u8], max_total: u64) -> Result<Envelope, EnvelopeError> {
13 if bytes.len() as u64 > max_total {
14 return Err(EnvelopeError::OverSizeTotal { cap: max_total });
15 }
16 if bytes.len() < HEADER_LEN + TRAILER_LEN {
17 return Err(EnvelopeError::Truncated);
18 }
19
20 let header_bytes = &bytes[..HEADER_LEN];
22 if &header_bytes[0..4] != MAGIC {
23 return Err(EnvelopeError::BadMagic);
24 }
25 let version = header_bytes[4];
26 if version != VERSION {
27 return Err(EnvelopeError::UnsupportedVersion(version));
28 }
29
30 parse_validated_body(bytes, header_bytes)
31}
32
33pub(super) fn parse_validated_body(
37 bytes: &[u8],
38 header_bytes: &[u8],
39) -> Result<Envelope, EnvelopeError> {
40 let claimed_header_crc = u32::from_le_bytes(read4(&header_bytes[48..52]));
42 let actual_header_crc = crc32c::crc32c(&header_bytes[..48]);
43 if claimed_header_crc != actual_header_crc {
44 return Err(EnvelopeError::HeaderCrcMismatch);
45 }
46
47 let meta = EnvelopeMeta {
48 tenant_id: u64::from_le_bytes(read8(&header_bytes[8..16])),
49 source_vshard_count: u16::from_le_bytes(read2(&header_bytes[16..18])),
50 hash_seed: u64::from_le_bytes(read8(&header_bytes[24..32])),
51 snapshot_watermark: u64::from_le_bytes(read8(&header_bytes[32..40])),
52 };
53 let section_count = u16::from_le_bytes(read2(&header_bytes[40..42]));
54
55 let trailer_start = bytes.len() - TRAILER_LEN;
57 let claimed_trailer_crc = u32::from_le_bytes(read4(&bytes[trailer_start..]));
58 let actual_trailer_crc = crc32c::crc32c(&bytes[..trailer_start]);
59 if claimed_trailer_crc != actual_trailer_crc {
60 return Err(EnvelopeError::TrailerCrcMismatch);
61 }
62
63 parse_sections(bytes, section_count, HEADER_LEN, trailer_start, meta)
64}
65
66pub(super) fn parse_sections(
67 bytes: &[u8],
68 section_count: u16,
69 body_start: usize,
70 trailer_start: usize,
71 meta: EnvelopeMeta,
72) -> Result<Envelope, EnvelopeError> {
73 let mut cursor = body_start;
74 let mut sections = Vec::with_capacity(section_count as usize);
75 for _ in 0..section_count {
76 if cursor + SECTION_OVERHEAD > trailer_start {
77 return Err(EnvelopeError::Truncated);
78 }
79 let origin_node_id = u64::from_le_bytes(read8(&bytes[cursor..cursor + 8]));
80 let body_len = u32::from_le_bytes(read4(&bytes[cursor + 8..cursor + 12])) as usize;
81 let body_start_inner = cursor + 12;
82 let body_end = body_start_inner + body_len;
83 let crc_end = body_end + 4;
84 if crc_end > trailer_start {
85 return Err(EnvelopeError::Truncated);
86 }
87 let body = bytes[body_start_inner..body_end].to_vec();
88 let claimed_body_crc = u32::from_le_bytes(read4(&bytes[body_end..crc_end]));
89 if crc32c::crc32c(&body) != claimed_body_crc {
90 return Err(EnvelopeError::BodyCrcMismatch);
91 }
92 sections.push(Section {
93 origin_node_id,
94 body,
95 });
96 cursor = crc_end;
97 }
98 if cursor != trailer_start {
99 return Err(EnvelopeError::Truncated);
100 }
101
102 Ok(Envelope { meta, sections })
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backup_envelope::types::{DEFAULT_MAX_SECTION_BYTES, DEFAULT_MAX_TOTAL_BYTES};
    use crate::backup_envelope::write::EnvelopeWriter;

    /// Baseline metadata shared by most tests.
    fn meta() -> EnvelopeMeta {
        EnvelopeMeta {
            tenant_id: 42_u64,
            source_vshard_count: 1024,
            hash_seed: 0,
            snapshot_watermark: 12345,
        }
    }

    /// An envelope without sections parses back to the same metadata.
    #[test]
    fn empty_envelope_roundtrips() {
        let encoded = EnvelopeWriter::new(meta()).finalize();
        let decoded = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(decoded.meta, meta());
        assert!(decoded.sections.is_empty());
    }

    /// Several sections, including an empty one, come back in order.
    #[test]
    fn multi_section_roundtrips() {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(1, b"one".to_vec()).unwrap();
        writer.push_section(2, b"two-payload".to_vec()).unwrap();
        writer.push_section(3, Vec::new()).unwrap();
        let encoded = writer.finalize();

        let decoded = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        let sections = &decoded.sections;
        assert_eq!(sections.len(), 3);
        assert_eq!(sections[0].origin_node_id, 1);
        assert_eq!(sections[0].body, b"one");
        assert_eq!(sections[1].origin_node_id, 2);
        assert_eq!(sections[1].body, b"two-payload");
        assert_eq!(sections[2].body, b"");
    }

    /// Input shorter than header plus trailer is reported as truncated.
    #[test]
    fn rejects_short_input() {
        let outcome = parse(b"NDBB", DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::Truncated));
    }

    /// Overwriting the first magic byte yields `BadMagic`.
    #[test]
    fn rejects_bad_magic() {
        let mut encoded = EnvelopeWriter::new(meta()).finalize();
        encoded[0] = b'X';
        let other = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap_err();
        assert!(
            matches!(other, EnvelopeError::BadMagic),
            "expected BadMagic, got {other:?}"
        );
    }

    /// Overwriting the version byte yields `UnsupportedVersion` with that byte.
    #[test]
    fn rejects_unsupported_version() {
        let mut encoded = EnvelopeWriter::new(meta()).finalize();
        encoded[4] = 99;
        let other = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap_err();
        assert!(
            matches!(other, EnvelopeError::UnsupportedVersion(99)),
            "expected UnsupportedVersion(99), got {other:?}"
        );
    }

    /// Metadata values beyond the 32-bit range survive a round trip.
    #[test]
    fn u64_tenant_id_roundtrips() {
        let large_meta = EnvelopeMeta {
            tenant_id: u32::MAX as u64 + 1,
            source_vshard_count: 512,
            hash_seed: 0xDEAD_BEEF_CAFE_1234,
            snapshot_watermark: 9_999_999_999,
        };
        let mut writer = EnvelopeWriter::new(large_meta);
        writer.push_section(42, b"payload".to_vec()).unwrap();
        let encoded = writer.finalize();

        let decoded = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(decoded.meta, large_meta);
        assert_eq!(decoded.sections.len(), 1);
        assert_eq!(decoded.sections[0].body, b"payload");
    }

    /// Flipping a header byte covered by the header CRC is detected.
    #[test]
    fn rejects_header_crc_corruption() {
        let mut encoded = EnvelopeWriter::new(meta()).finalize();
        encoded[8] = !encoded[8];
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::HeaderCrcMismatch));
    }

    /// A corrupted body is caught by its own CRC even when the trailer CRC
    /// has been recomputed to match the corrupted bytes.
    #[test]
    fn rejects_body_crc_corruption() {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(7, b"hello".to_vec()).unwrap();
        let mut encoded = writer.finalize();

        // The first body byte follows the 12-byte section prefix.
        let first_body_byte = HEADER_LEN + 12;
        encoded[first_body_byte] = !encoded[first_body_byte];

        // Re-seal the trailer so only the body checksum is wrong.
        let trailer_off = encoded.len() - TRAILER_LEN;
        let (covered, trailer) = encoded.split_at_mut(trailer_off);
        let resealed = crc32c::crc32c(covered);
        trailer.copy_from_slice(&resealed.to_le_bytes());

        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::BodyCrcMismatch));
    }

    /// Flipping the last byte of the envelope breaks the trailer CRC.
    #[test]
    fn rejects_trailer_crc_corruption() {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(7, b"x".to_vec()).unwrap();
        let mut encoded = writer.finalize();

        let final_index = encoded.len() - 1;
        encoded[final_index] = !encoded[final_index];

        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::TrailerCrcMismatch));
    }

    /// The writer refuses a section that would exceed its total-size cap.
    #[test]
    fn rejects_oversized_total() {
        let mut writer = EnvelopeWriter::with_caps(meta(), 64, DEFAULT_MAX_SECTION_BYTES);
        let outcome = writer.push_section(1, vec![0u8; 1024]);
        assert!(matches!(
            outcome.unwrap_err(),
            EnvelopeError::OverSizeTotal { .. }
        ));
    }

    /// The writer refuses a section larger than its per-section cap.
    #[test]
    fn rejects_oversized_section_at_write() {
        let mut writer = EnvelopeWriter::with_caps(meta(), DEFAULT_MAX_TOTAL_BYTES, 8);
        let outcome = writer.push_section(1, vec![0u8; 9]);
        assert!(matches!(
            outcome.unwrap_err(),
            EnvelopeError::OverSizeSection { .. }
        ));
    }

    /// The parser refuses input longer than the caller's `max_total`.
    #[test]
    fn rejects_oversized_total_at_parse() {
        let encoded = EnvelopeWriter::new(meta()).finalize();
        let outcome = parse(&encoded, 4);
        assert!(matches!(outcome, Err(EnvelopeError::OverSizeTotal { .. })));
    }

    /// Dropping the last eight bytes of an envelope makes parsing fail.
    #[test]
    fn truncated_section_body() {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(1, b"hello world".to_vec()).unwrap();
        let encoded = writer.finalize();

        let cut = encoded.len() - 8;
        assert!(parse(&encoded[..cut], DEFAULT_MAX_TOTAL_BYTES).is_err());
    }

    /// Pins the on-disk header layout: magic, version byte, and header CRC.
    #[test]
    fn golden_backup_envelope_format() {
        use crate::backup_envelope::types::{HEADER_LEN, MAGIC, VERSION};

        let encoded = EnvelopeWriter::new(meta()).finalize();

        assert_eq!(&encoded[0..4], MAGIC.as_slice(), "magic mismatch");

        let version_byte = encoded[4];
        assert_eq!(version_byte, VERSION, "version mismatch");
        assert_eq!(version_byte, 1u8, "expected VERSION == 1");

        assert!(encoded.len() >= HEADER_LEN, "envelope too short for header");
        let mut crc_field = [0u8; 4];
        crc_field.copy_from_slice(&encoded[48..52]);
        let stored_crc = u32::from_le_bytes(crc_field);
        let recomputed = crc32c::crc32c(&encoded[..48]);
        assert_eq!(stored_crc, recomputed, "header CRC mismatch");
    }
}