1use thiserror::Error;
36
/// Four-byte signature that opens every envelope; [`parse`] rejects any other
/// prefix with [`EnvelopeError::BadMagic`].
pub const MAGIC: &[u8; 4] = b"NDBB";
/// Format version stored in header byte 4; the only version [`parse`] accepts.
pub const VERSION: u8 = 1;

/// Size of the fixed header: 40 bytes of fields and zero padding followed by a
/// 4-byte CRC32C of those 40 bytes.
pub const HEADER_LEN: usize = 40 + 4;
/// Framing bytes around each section body:
/// origin id (8) + body length (4) + body CRC32C (4).
pub const SECTION_OVERHEAD: usize = 8 + 4 + 4;
/// Size of the trailer, a single little-endian CRC32C over everything before it.
pub const TRAILER_LEN: usize = 4;

/// Default cap on the size of a whole framed envelope: 16 GiB.
pub const DEFAULT_MAX_TOTAL_BYTES: u64 = 16 << 30;
/// Default cap on the size of one section body: 16 GiB.
///
/// NOTE(review): the frame stores the body length as a `u32` (see
/// `write_section`), so a body above `u32::MAX` bytes cannot be represented
/// even though it is below this cap.
pub const DEFAULT_MAX_SECTION_BYTES: u64 = 16 << 30;

/// Reserved section origin id `0xFFFF_FFFF_FFFF_FFF0`.
///
/// NOTE(review): going by the name, this tags a section that carries catalog
/// rows rather than data from a real node — confirm with the writer side.
pub const SECTION_ORIGIN_CATALOG_ROWS: u64 = u64::MAX - 0xF;
/// Reserved section origin id `0xFFFF_FFFF_FFFF_FFF1`.
///
/// NOTE(review): going by the name, this tags a section that carries source
/// tombstones — confirm with the writer side.
pub const SECTION_ORIGIN_SOURCE_TOMBSTONES: u64 = u64::MAX - 0xE;
61
62#[derive(Debug, Clone, PartialEq, Eq, zerompk::ToMessagePack, zerompk::FromMessagePack)]
68pub struct StoredCollectionBlob {
69 pub name: String,
70 pub bytes: Vec<u8>,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, zerompk::ToMessagePack, zerompk::FromMessagePack)]
79pub struct SourceTombstoneEntry {
80 pub collection: String,
81 pub purge_lsn: u64,
82}
83
84#[derive(Debug, Error, PartialEq, Eq)]
85pub enum EnvelopeError {
86 #[error("invalid backup format")]
87 BadMagic,
88 #[error("unsupported backup version: {0}")]
89 UnsupportedVersion(u8),
90 #[error("invalid backup format")]
91 HeaderCrcMismatch,
92 #[error("invalid backup format")]
93 BodyCrcMismatch,
94 #[error("invalid backup format")]
95 TrailerCrcMismatch,
96 #[error("backup truncated")]
97 Truncated,
98 #[error("backup tenant mismatch: expected {expected}, got {actual}")]
99 TenantMismatch { expected: u32, actual: u32 },
100 #[error("backup exceeds size cap of {cap} bytes")]
101 OverSizeTotal { cap: u64 },
102 #[error("backup section exceeds size cap of {cap} bytes")]
103 OverSizeSection { cap: u64 },
104 #[error("too many sections: {0}")]
105 TooManySections(u16),
106}
107
/// Metadata carried in the envelope header.
///
/// All fields are stored little-endian; the offsets below are the ones
/// `write_header` writes and [`parse`] reads.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EnvelopeMeta {
    /// Tenant id, header bytes 8..12.
    pub tenant_id: u32,
    /// Source vshard count, header bytes 12..14.
    pub source_vshard_count: u16,
    /// Hash seed, header bytes 16..24.
    pub hash_seed: u64,
    /// Snapshot watermark, header bytes 24..32.
    pub snapshot_watermark: u64,
}
116
/// One framed payload inside an envelope.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Section {
    /// 64-bit id written in front of the body. The `SECTION_ORIGIN_*`
    /// constants define reserved values for this field.
    pub origin_node_id: u64,
    /// Opaque section payload; the envelope only frames and checksums it.
    pub body: Vec<u8>,
}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
126pub struct Envelope {
127 pub meta: EnvelopeMeta,
128 pub sections: Vec<Section>,
129}
130
131pub struct EnvelopeWriter {
133 meta: EnvelopeMeta,
134 sections: Vec<Section>,
135 max_total: u64,
136 max_section: u64,
137 framed_size: u64,
138}
139
140impl EnvelopeWriter {
141 pub fn new(meta: EnvelopeMeta) -> Self {
142 Self::with_caps(meta, DEFAULT_MAX_TOTAL_BYTES, DEFAULT_MAX_SECTION_BYTES)
143 }
144
145 pub fn with_caps(meta: EnvelopeMeta, max_total: u64, max_section: u64) -> Self {
146 Self {
147 meta,
148 sections: Vec::new(),
149 max_total,
150 max_section,
151 framed_size: HEADER_LEN as u64 + TRAILER_LEN as u64,
152 }
153 }
154
155 pub fn push_section(
156 &mut self,
157 origin_node_id: u64,
158 body: Vec<u8>,
159 ) -> Result<(), EnvelopeError> {
160 if body.len() as u64 > self.max_section {
161 return Err(EnvelopeError::OverSizeSection {
162 cap: self.max_section,
163 });
164 }
165 let added = SECTION_OVERHEAD as u64 + body.len() as u64;
166 if self.framed_size + added > self.max_total {
167 return Err(EnvelopeError::OverSizeTotal {
168 cap: self.max_total,
169 });
170 }
171 if self.sections.len() >= u16::MAX as usize {
172 return Err(EnvelopeError::TooManySections(u16::MAX));
173 }
174 self.framed_size += added;
175 self.sections.push(Section {
176 origin_node_id,
177 body,
178 });
179 Ok(())
180 }
181
182 pub fn finalize(self) -> Vec<u8> {
183 let mut out = Vec::with_capacity(self.framed_size as usize);
184 write_header(&mut out, &self.meta, self.sections.len() as u16);
185 for section in &self.sections {
186 write_section(&mut out, section);
187 }
188 let trailer_crc = crc32c::crc32c(&out);
190 out.extend_from_slice(&trailer_crc.to_le_bytes());
191 out
192 }
193}
194
195fn write_header(out: &mut Vec<u8>, meta: &EnvelopeMeta, section_count: u16) {
196 let start = out.len();
197 out.extend_from_slice(MAGIC);
198 out.push(VERSION);
199 out.extend_from_slice(&[0u8; 3]);
200 out.extend_from_slice(&meta.tenant_id.to_le_bytes());
201 out.extend_from_slice(&meta.source_vshard_count.to_le_bytes());
202 out.extend_from_slice(&[0u8; 2]);
203 out.extend_from_slice(&meta.hash_seed.to_le_bytes());
204 out.extend_from_slice(&meta.snapshot_watermark.to_le_bytes());
205 out.extend_from_slice(§ion_count.to_le_bytes());
206 out.extend_from_slice(&[0u8; 6]);
207 let header_crc = crc32c::crc32c(&out[start..]);
208 out.extend_from_slice(&header_crc.to_le_bytes());
209}
210
211fn write_section(out: &mut Vec<u8>, section: &Section) {
212 out.extend_from_slice(§ion.origin_node_id.to_le_bytes());
213 out.extend_from_slice(&(section.body.len() as u32).to_le_bytes());
214 out.extend_from_slice(§ion.body);
215 let body_crc = crc32c::crc32c(§ion.body);
216 out.extend_from_slice(&body_crc.to_le_bytes());
217}
218
219pub fn parse(bytes: &[u8], max_total: u64) -> Result<Envelope, EnvelopeError> {
221 if bytes.len() as u64 > max_total {
222 return Err(EnvelopeError::OverSizeTotal { cap: max_total });
223 }
224 if bytes.len() < HEADER_LEN + TRAILER_LEN {
225 return Err(EnvelopeError::Truncated);
226 }
227
228 let header_bytes = &bytes[..HEADER_LEN];
230 if &header_bytes[0..4] != MAGIC {
231 return Err(EnvelopeError::BadMagic);
232 }
233 let version = header_bytes[4];
234 if version != VERSION {
235 return Err(EnvelopeError::UnsupportedVersion(version));
236 }
237 let claimed_header_crc = u32::from_le_bytes(read4(&header_bytes[40..44]));
238 let actual_header_crc = crc32c::crc32c(&header_bytes[..40]);
239 if claimed_header_crc != actual_header_crc {
240 return Err(EnvelopeError::HeaderCrcMismatch);
241 }
242
243 let meta = EnvelopeMeta {
244 tenant_id: u32::from_le_bytes(read4(&header_bytes[8..12])),
245 source_vshard_count: u16::from_le_bytes(read2(&header_bytes[12..14])),
246 hash_seed: u64::from_le_bytes(read8(&header_bytes[16..24])),
247 snapshot_watermark: u64::from_le_bytes(read8(&header_bytes[24..32])),
248 };
249 let section_count = u16::from_le_bytes(read2(&header_bytes[32..34]));
250
251 let trailer_start = bytes.len() - TRAILER_LEN;
253 let claimed_trailer_crc = u32::from_le_bytes(read4(&bytes[trailer_start..]));
254 let actual_trailer_crc = crc32c::crc32c(&bytes[..trailer_start]);
255 if claimed_trailer_crc != actual_trailer_crc {
256 return Err(EnvelopeError::TrailerCrcMismatch);
257 }
258
259 let mut cursor = HEADER_LEN;
261 let mut sections = Vec::with_capacity(section_count as usize);
262 for _ in 0..section_count {
263 if cursor + SECTION_OVERHEAD > trailer_start {
264 return Err(EnvelopeError::Truncated);
265 }
266 let origin_node_id = u64::from_le_bytes(read8(&bytes[cursor..cursor + 8]));
267 let body_len = u32::from_le_bytes(read4(&bytes[cursor + 8..cursor + 12])) as usize;
268 let body_start = cursor + 12;
269 let body_end = body_start + body_len;
270 let crc_end = body_end + 4;
271 if crc_end > trailer_start {
272 return Err(EnvelopeError::Truncated);
273 }
274 let body = bytes[body_start..body_end].to_vec();
275 let claimed_body_crc = u32::from_le_bytes(read4(&bytes[body_end..crc_end]));
276 if crc32c::crc32c(&body) != claimed_body_crc {
277 return Err(EnvelopeError::BodyCrcMismatch);
278 }
279 sections.push(Section {
280 origin_node_id,
281 body,
282 });
283 cursor = crc_end;
284 }
285 if cursor != trailer_start {
286 return Err(EnvelopeError::Truncated);
287 }
288
289 Ok(Envelope { meta, sections })
290}
291
/// Copies the first two bytes of `s` into an array.
///
/// Panics if `s` holds fewer than two bytes.
fn read2(s: &[u8]) -> [u8; 2] {
    let mut out = [0u8; 2];
    out.copy_from_slice(&s[..2]);
    out
}
/// Copies the first four bytes of `s` into an array.
///
/// Panics if `s` holds fewer than four bytes.
fn read4(s: &[u8]) -> [u8; 4] {
    let mut out = [0u8; 4];
    out.copy_from_slice(&s[..4]);
    out
}
/// Copies the first eight bytes of `s` into an array.
///
/// Panics if `s` holds fewer than eight bytes.
fn read8(s: &[u8]) -> [u8; 8] {
    let mut out = [0u8; 8];
    out.copy_from_slice(&s[..8]);
    out
}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed metadata used by every test envelope.
    fn meta() -> EnvelopeMeta {
        EnvelopeMeta {
            tenant_id: 42,
            source_vshard_count: 1024,
            hash_seed: 0,
            snapshot_watermark: 12345,
        }
    }

    /// Serialized envelope that carries no sections.
    fn empty_envelope() -> Vec<u8> {
        EnvelopeWriter::new(meta()).finalize()
    }

    /// Serialized envelope that carries exactly one section.
    fn single_section_envelope(origin_node_id: u64, body: &[u8]) -> Vec<u8> {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(origin_node_id, body.to_vec()).unwrap();
        writer.finalize()
    }

    #[test]
    fn empty_envelope_roundtrips() {
        let encoded = empty_envelope();
        let decoded = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(decoded.meta, meta());
        assert!(decoded.sections.is_empty());
    }

    #[test]
    fn multi_section_roundtrips() {
        let mut writer = EnvelopeWriter::new(meta());
        writer.push_section(1, b"one".to_vec()).unwrap();
        writer.push_section(2, b"two-payload".to_vec()).unwrap();
        writer.push_section(3, vec![]).unwrap();
        let encoded = writer.finalize();

        let decoded = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES).unwrap();
        assert_eq!(decoded.sections.len(), 3);
        assert_eq!(decoded.sections[0].origin_node_id, 1);
        assert_eq!(decoded.sections[0].body, b"one");
        assert_eq!(decoded.sections[1].origin_node_id, 2);
        assert_eq!(decoded.sections[1].body, b"two-payload");
        assert_eq!(decoded.sections[2].body, b"");
    }

    #[test]
    fn rejects_short_input() {
        let outcome = parse(b"NDBB", DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::Truncated));
    }

    #[test]
    fn rejects_bad_magic() {
        let mut encoded = empty_envelope();
        encoded[0] = b'X';
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::BadMagic));
    }

    #[test]
    fn rejects_unsupported_version() {
        let mut encoded = empty_envelope();
        encoded[4] = 99;
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::UnsupportedVersion(99)));
    }

    #[test]
    fn rejects_header_crc_corruption() {
        let mut encoded = empty_envelope();
        // Byte 8 is the first byte of the tenant id, which the header CRC covers.
        encoded[8] ^= 0xFF;
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::HeaderCrcMismatch));
    }

    #[test]
    fn rejects_body_crc_corruption() {
        let mut encoded = single_section_envelope(7, b"hello");
        // The body starts after the header and the 12-byte section prefix.
        let first_body_byte = HEADER_LEN + 12;
        encoded[first_body_byte] ^= 0xFF;
        // Re-seal the trailer so the trailer check passes and the body check is reached.
        let trailer_at = encoded.len() - TRAILER_LEN;
        let resealed = crc32c::crc32c(&encoded[..trailer_at]);
        encoded[trailer_at..].copy_from_slice(&resealed.to_le_bytes());
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::BodyCrcMismatch));
    }

    #[test]
    fn rejects_trailer_crc_corruption() {
        let mut encoded = single_section_envelope(7, b"x");
        let final_index = encoded.len() - 1;
        encoded[final_index] ^= 0xFF;
        let outcome = parse(&encoded, DEFAULT_MAX_TOTAL_BYTES);
        assert_eq!(outcome, Err(EnvelopeError::TrailerCrcMismatch));
    }

    #[test]
    fn rejects_oversized_total() {
        let mut writer = EnvelopeWriter::with_caps(meta(), 64, DEFAULT_MAX_SECTION_BYTES);
        match writer.push_section(1, vec![0u8; 1024]).unwrap_err() {
            EnvelopeError::OverSizeTotal { .. } => {}
            other => panic!("expected OverSizeTotal, got {:?}", other),
        }
    }

    #[test]
    fn rejects_oversized_section_at_write() {
        let mut writer = EnvelopeWriter::with_caps(meta(), DEFAULT_MAX_TOTAL_BYTES, 8);
        match writer.push_section(1, vec![0u8; 9]).unwrap_err() {
            EnvelopeError::OverSizeSection { .. } => {}
            other => panic!("expected OverSizeSection, got {:?}", other),
        }
    }

    #[test]
    fn rejects_oversized_total_at_parse() {
        let encoded = empty_envelope();
        match parse(&encoded, 4) {
            Err(EnvelopeError::OverSizeTotal { .. }) => {}
            other => panic!("expected OverSizeTotal, got {:?}", other),
        }
    }

    #[test]
    fn truncated_section_body() {
        let encoded = single_section_envelope(1, b"hello world");
        // Drop the trailer plus the section's CRC; any error is acceptable here.
        let cut = &encoded[..encoded.len() - 8];
        assert!(parse(cut, DEFAULT_MAX_TOTAL_BYTES).is_err());
    }
}
445}