1use thiserror::Error;
36
37pub const MAGIC: &[u8; 4] = b"NDBB";
38pub const VERSION: u8 = 1;
39
40pub const HEADER_LEN: usize = 44;
42pub const SECTION_OVERHEAD: usize = 16;
44pub const TRAILER_LEN: usize = 4;
46
47pub const DEFAULT_MAX_TOTAL_BYTES: u64 = 16 * 1024 * 1024 * 1024;
49pub const DEFAULT_MAX_SECTION_BYTES: u64 = 16 * 1024 * 1024 * 1024;
51
52#[derive(Debug, Error, PartialEq, Eq)]
53pub enum EnvelopeError {
54 #[error("invalid backup format")]
55 BadMagic,
56 #[error("unsupported backup version: {0}")]
57 UnsupportedVersion(u8),
58 #[error("invalid backup format")]
59 HeaderCrcMismatch,
60 #[error("invalid backup format")]
61 BodyCrcMismatch,
62 #[error("invalid backup format")]
63 TrailerCrcMismatch,
64 #[error("backup truncated")]
65 Truncated,
66 #[error("backup tenant mismatch: expected {expected}, got {actual}")]
67 TenantMismatch { expected: u32, actual: u32 },
68 #[error("backup exceeds size cap of {cap} bytes")]
69 OverSizeTotal { cap: u64 },
70 #[error("backup section exceeds size cap of {cap} bytes")]
71 OverSizeSection { cap: u64 },
72 #[error("too many sections: {0}")]
73 TooManySections(u16),
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub struct EnvelopeMeta {
79 pub tenant_id: u32,
80 pub source_vshard_count: u16,
81 pub hash_seed: u64,
82 pub snapshot_watermark: u64,
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct Section {
88 pub origin_node_id: u64,
89 pub body: Vec<u8>,
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct Envelope {
95 pub meta: EnvelopeMeta,
96 pub sections: Vec<Section>,
97}
98
99pub struct EnvelopeWriter {
101 meta: EnvelopeMeta,
102 sections: Vec<Section>,
103 max_total: u64,
104 max_section: u64,
105 framed_size: u64,
106}
107
108impl EnvelopeWriter {
109 pub fn new(meta: EnvelopeMeta) -> Self {
110 Self::with_caps(meta, DEFAULT_MAX_TOTAL_BYTES, DEFAULT_MAX_SECTION_BYTES)
111 }
112
113 pub fn with_caps(meta: EnvelopeMeta, max_total: u64, max_section: u64) -> Self {
114 Self {
115 meta,
116 sections: Vec::new(),
117 max_total,
118 max_section,
119 framed_size: HEADER_LEN as u64 + TRAILER_LEN as u64,
120 }
121 }
122
123 pub fn push_section(
124 &mut self,
125 origin_node_id: u64,
126 body: Vec<u8>,
127 ) -> Result<(), EnvelopeError> {
128 if body.len() as u64 > self.max_section {
129 return Err(EnvelopeError::OverSizeSection {
130 cap: self.max_section,
131 });
132 }
133 let added = SECTION_OVERHEAD as u64 + body.len() as u64;
134 if self.framed_size + added > self.max_total {
135 return Err(EnvelopeError::OverSizeTotal {
136 cap: self.max_total,
137 });
138 }
139 if self.sections.len() >= u16::MAX as usize {
140 return Err(EnvelopeError::TooManySections(u16::MAX));
141 }
142 self.framed_size += added;
143 self.sections.push(Section {
144 origin_node_id,
145 body,
146 });
147 Ok(())
148 }
149
150 pub fn finalize(self) -> Vec<u8> {
151 let mut out = Vec::with_capacity(self.framed_size as usize);
152 write_header(&mut out, &self.meta, self.sections.len() as u16);
153 for section in &self.sections {
154 write_section(&mut out, section);
155 }
156 let trailer_crc = crc32c::crc32c(&out);
158 out.extend_from_slice(&trailer_crc.to_le_bytes());
159 out
160 }
161}
162
163fn write_header(out: &mut Vec<u8>, meta: &EnvelopeMeta, section_count: u16) {
164 let start = out.len();
165 out.extend_from_slice(MAGIC);
166 out.push(VERSION);
167 out.extend_from_slice(&[0u8; 3]);
168 out.extend_from_slice(&meta.tenant_id.to_le_bytes());
169 out.extend_from_slice(&meta.source_vshard_count.to_le_bytes());
170 out.extend_from_slice(&[0u8; 2]);
171 out.extend_from_slice(&meta.hash_seed.to_le_bytes());
172 out.extend_from_slice(&meta.snapshot_watermark.to_le_bytes());
173 out.extend_from_slice(§ion_count.to_le_bytes());
174 out.extend_from_slice(&[0u8; 6]);
175 let header_crc = crc32c::crc32c(&out[start..]);
176 out.extend_from_slice(&header_crc.to_le_bytes());
177}
178
179fn write_section(out: &mut Vec<u8>, section: &Section) {
180 out.extend_from_slice(§ion.origin_node_id.to_le_bytes());
181 out.extend_from_slice(&(section.body.len() as u32).to_le_bytes());
182 out.extend_from_slice(§ion.body);
183 let body_crc = crc32c::crc32c(§ion.body);
184 out.extend_from_slice(&body_crc.to_le_bytes());
185}
186
187pub fn parse(bytes: &[u8], max_total: u64) -> Result<Envelope, EnvelopeError> {
189 if bytes.len() as u64 > max_total {
190 return Err(EnvelopeError::OverSizeTotal { cap: max_total });
191 }
192 if bytes.len() < HEADER_LEN + TRAILER_LEN {
193 return Err(EnvelopeError::Truncated);
194 }
195
196 let header_bytes = &bytes[..HEADER_LEN];
198 if &header_bytes[0..4] != MAGIC {
199 return Err(EnvelopeError::BadMagic);
200 }
201 let version = header_bytes[4];
202 if version != VERSION {
203 return Err(EnvelopeError::UnsupportedVersion(version));
204 }
205 let claimed_header_crc = u32::from_le_bytes(read4(&header_bytes[40..44]));
206 let actual_header_crc = crc32c::crc32c(&header_bytes[..40]);
207 if claimed_header_crc != actual_header_crc {
208 return Err(EnvelopeError::HeaderCrcMismatch);
209 }
210
211 let meta = EnvelopeMeta {
212 tenant_id: u32::from_le_bytes(read4(&header_bytes[8..12])),
213 source_vshard_count: u16::from_le_bytes(read2(&header_bytes[12..14])),
214 hash_seed: u64::from_le_bytes(read8(&header_bytes[16..24])),
215 snapshot_watermark: u64::from_le_bytes(read8(&header_bytes[24..32])),
216 };
217 let section_count = u16::from_le_bytes(read2(&header_bytes[32..34]));
218
219 let trailer_start = bytes.len() - TRAILER_LEN;
221 let claimed_trailer_crc = u32::from_le_bytes(read4(&bytes[trailer_start..]));
222 let actual_trailer_crc = crc32c::crc32c(&bytes[..trailer_start]);
223 if claimed_trailer_crc != actual_trailer_crc {
224 return Err(EnvelopeError::TrailerCrcMismatch);
225 }
226
227 let mut cursor = HEADER_LEN;
229 let mut sections = Vec::with_capacity(section_count as usize);
230 for _ in 0..section_count {
231 if cursor + SECTION_OVERHEAD > trailer_start {
232 return Err(EnvelopeError::Truncated);
233 }
234 let origin_node_id = u64::from_le_bytes(read8(&bytes[cursor..cursor + 8]));
235 let body_len = u32::from_le_bytes(read4(&bytes[cursor + 8..cursor + 12])) as usize;
236 let body_start = cursor + 12;
237 let body_end = body_start + body_len;
238 let crc_end = body_end + 4;
239 if crc_end > trailer_start {
240 return Err(EnvelopeError::Truncated);
241 }
242 let body = bytes[body_start..body_end].to_vec();
243 let claimed_body_crc = u32::from_le_bytes(read4(&bytes[body_end..crc_end]));
244 if crc32c::crc32c(&body) != claimed_body_crc {
245 return Err(EnvelopeError::BodyCrcMismatch);
246 }
247 sections.push(Section {
248 origin_node_id,
249 body,
250 });
251 cursor = crc_end;
252 }
253 if cursor != trailer_start {
254 return Err(EnvelopeError::Truncated);
255 }
256
257 Ok(Envelope { meta, sections })
258}
259
260fn read2(s: &[u8]) -> [u8; 2] {
261 [s[0], s[1]]
262}
263fn read4(s: &[u8]) -> [u8; 4] {
264 [s[0], s[1], s[2], s[3]]
265}
266fn read8(s: &[u8]) -> [u8; 8] {
267 [s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7]]
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273
274 fn meta() -> EnvelopeMeta {
275 EnvelopeMeta {
276 tenant_id: 42,
277 source_vshard_count: 1024,
278 hash_seed: 0,
279 snapshot_watermark: 12345,
280 }
281 }
282
283 #[test]
284 fn empty_envelope_roundtrips() {
285 let bytes = EnvelopeWriter::new(meta()).finalize();
286 let env = parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap();
287 assert_eq!(env.meta, meta());
288 assert!(env.sections.is_empty());
289 }
290
291 #[test]
292 fn multi_section_roundtrips() {
293 let mut w = EnvelopeWriter::new(meta());
294 w.push_section(1, b"one".to_vec()).unwrap();
295 w.push_section(2, b"two-payload".to_vec()).unwrap();
296 w.push_section(3, vec![]).unwrap();
297 let bytes = w.finalize();
298
299 let env = parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap();
300 assert_eq!(env.sections.len(), 3);
301 assert_eq!(env.sections[0].origin_node_id, 1);
302 assert_eq!(env.sections[0].body, b"one");
303 assert_eq!(env.sections[1].origin_node_id, 2);
304 assert_eq!(env.sections[1].body, b"two-payload");
305 assert_eq!(env.sections[2].body, b"");
306 }
307
308 #[test]
309 fn rejects_short_input() {
310 assert_eq!(
311 parse(b"NDBB", DEFAULT_MAX_TOTAL_BYTES),
312 Err(EnvelopeError::Truncated)
313 );
314 }
315
316 #[test]
317 fn rejects_bad_magic() {
318 let mut bytes = EnvelopeWriter::new(meta()).finalize();
319 bytes[0] = b'X';
320 match parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap_err() {
322 EnvelopeError::BadMagic => {}
323 other => panic!("expected BadMagic, got {other:?}"),
324 }
325 }
326
327 #[test]
328 fn rejects_unsupported_version() {
329 let mut bytes = EnvelopeWriter::new(meta()).finalize();
330 bytes[4] = 99;
331 match parse(&bytes, DEFAULT_MAX_TOTAL_BYTES).unwrap_err() {
333 EnvelopeError::UnsupportedVersion(99) => {}
334 other => panic!("expected UnsupportedVersion(99), got {other:?}"),
335 }
336 }
337
338 #[test]
339 fn rejects_header_crc_corruption() {
340 let mut bytes = EnvelopeWriter::new(meta()).finalize();
341 bytes[8] ^= 0xFF; assert_eq!(
343 parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
344 Err(EnvelopeError::HeaderCrcMismatch)
345 );
346 }
347
348 #[test]
349 fn rejects_body_crc_corruption() {
350 let mut w = EnvelopeWriter::new(meta());
351 w.push_section(7, b"hello".to_vec()).unwrap();
352 let mut bytes = w.finalize();
353 let body_off = HEADER_LEN + 12;
355 bytes[body_off] ^= 0xFF;
356 let trailer_off = bytes.len() - TRAILER_LEN;
359 let new_trailer = crc32c::crc32c(&bytes[..trailer_off]);
360 bytes[trailer_off..].copy_from_slice(&new_trailer.to_le_bytes());
361 assert_eq!(
362 parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
363 Err(EnvelopeError::BodyCrcMismatch)
364 );
365 }
366
367 #[test]
368 fn rejects_trailer_crc_corruption() {
369 let mut w = EnvelopeWriter::new(meta());
370 w.push_section(7, b"x".to_vec()).unwrap();
371 let mut bytes = w.finalize();
372 let last = bytes.len() - 1;
373 bytes[last] ^= 0xFF;
374 assert_eq!(
375 parse(&bytes, DEFAULT_MAX_TOTAL_BYTES),
376 Err(EnvelopeError::TrailerCrcMismatch)
377 );
378 }
379
380 #[test]
381 fn rejects_oversized_total() {
382 let mut w = EnvelopeWriter::with_caps(meta(), 64, DEFAULT_MAX_SECTION_BYTES);
383 let err = w.push_section(1, vec![0u8; 1024]).unwrap_err();
384 assert!(matches!(err, EnvelopeError::OverSizeTotal { .. }));
385 }
386
387 #[test]
388 fn rejects_oversized_section_at_write() {
389 let mut w = EnvelopeWriter::with_caps(meta(), DEFAULT_MAX_TOTAL_BYTES, 8);
390 let err = w.push_section(1, vec![0u8; 9]).unwrap_err();
391 assert!(matches!(err, EnvelopeError::OverSizeSection { .. }));
392 }
393
394 #[test]
395 fn rejects_oversized_total_at_parse() {
396 let bytes = EnvelopeWriter::new(meta()).finalize();
397 assert!(matches!(
398 parse(&bytes, 4),
399 Err(EnvelopeError::OverSizeTotal { .. })
400 ));
401 }
402
403 #[test]
404 fn truncated_section_body() {
405 let mut w = EnvelopeWriter::new(meta());
406 w.push_section(1, b"hello world".to_vec()).unwrap();
407 let bytes = w.finalize();
408 let truncated = &bytes[..bytes.len() - 8];
411 assert!(parse(truncated, DEFAULT_MAX_TOTAL_BYTES).is_err());
412 }
413}