1use std::fmt;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13
14use fsqlite_error::{FrankenError, Result};
15use fsqlite_vfs::host_fs;
16use tracing::{Level, debug, error, info, span, warn};
17
/// Bead identifier attached to tracing events emitted from this module.
const BEAD_ID: &str = "bd-1hi.18";

/// Magic bytes at offset 0 of a `.db-fec` sidecar file.
pub const DB_FEC_MAGIC: [u8; 8] = *b"FSQLDFEC";

/// Magic bytes at the start of each serialized group-metadata record.
pub const GROUP_META_MAGIC: [u8; 8] = *b"FSQLDGRP";

/// Current on-disk format version shared by the sidecar header and group metadata.
pub const DB_FEC_VERSION: u32 = 1;

/// Default number of source pages (K) per general FEC group.
pub const DEFAULT_GROUP_SIZE: u32 = 64;

/// Default number of repair symbols (R) per general FEC group.
pub const DEFAULT_R_REPAIR: u32 = 4;

/// Repair-symbol budget for the dedicated single-page group covering page 1
/// (the database header page).
pub const HEADER_PAGE_R_REPAIR: u32 = 4;

/// Domain-separation prefix hashed into the database-generation digest.
pub const DB_GEN_DIGEST_DOMAIN: &str = "fsqlite:compat:dbgen:v1";

/// Domain-separation prefix hashed into each group's object id.
pub const GROUP_OBJECT_ID_DOMAIN: &str = "fsqlite:compat:db-fec-group:v1";

/// Serialized size of [`DbFecHeader`] in bytes (8 magic + 5×4 u32 fields
/// = wait for layout; see `DbFecHeader::to_bytes`: 8 + 4 + 4 + 4 + 4 + 4 + 16 + 8 = 52).
pub const DB_FEC_HEADER_SIZE: usize = 52;

/// Process-wide counters for snapshot FEC encode activity.
pub static GLOBAL_SNAPSHOT_FEC_METRICS: SnapshotFecMetrics = SnapshotFecMetrics::new();
59
/// Atomic counters describing `.db-fec` encoding work performed by this
/// process; see [`GLOBAL_SNAPSHOT_FEC_METRICS`] for the shared instance.
pub struct SnapshotFecMetrics {
    /// Total source pages encoded across all encode operations.
    pub encoded_pages_total: AtomicU64,
    /// Total sidecar bytes produced across all encode operations.
    pub sidecar_bytes_total: AtomicU64,
    /// Number of encode operations recorded.
    pub encode_ops: AtomicU64,
}
69
70impl SnapshotFecMetrics {
71 #[must_use]
73 pub const fn new() -> Self {
74 Self {
75 encoded_pages_total: AtomicU64::new(0),
76 sidecar_bytes_total: AtomicU64::new(0),
77 encode_ops: AtomicU64::new(0),
78 }
79 }
80
81 pub fn record_encode(&self, pages_encoded: u64, sidecar_bytes: u64) {
83 self.encode_ops.fetch_add(1, Ordering::Relaxed);
84 self.encoded_pages_total
85 .fetch_add(pages_encoded, Ordering::Relaxed);
86 self.sidecar_bytes_total
87 .fetch_add(sidecar_bytes, Ordering::Relaxed);
88 }
89
90 #[must_use]
92 pub fn snapshot(&self) -> SnapshotFecMetricsSnapshot {
93 SnapshotFecMetricsSnapshot {
94 encoded_pages_total: self.encoded_pages_total.load(Ordering::Relaxed),
95 sidecar_bytes_total: self.sidecar_bytes_total.load(Ordering::Relaxed),
96 encode_ops: self.encode_ops.load(Ordering::Relaxed),
97 }
98 }
99
100 pub fn reset(&self) {
102 self.encoded_pages_total.store(0, Ordering::Relaxed);
103 self.sidecar_bytes_total.store(0, Ordering::Relaxed);
104 self.encode_ops.store(0, Ordering::Relaxed);
105 }
106}
107
impl Default for SnapshotFecMetrics {
    /// Same as [`SnapshotFecMetrics::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}
113
/// Plain-integer copy of [`SnapshotFecMetrics`] taken at a point in time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotFecMetricsSnapshot {
    /// Total source pages encoded at snapshot time.
    pub encoded_pages_total: u64,
    /// Total sidecar bytes produced at snapshot time.
    pub sidecar_bytes_total: u64,
    /// Encode operations recorded at snapshot time.
    pub encode_ops: u64,
}
121
122impl fmt::Display for SnapshotFecMetricsSnapshot {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 write!(
125 f,
126 "snapshot_fec_pages_encoded={} sidecar_bytes={} encode_ops={}",
127 self.encoded_pages_total, self.sidecar_bytes_total, self.encode_ops,
128 )
129 }
130}
131
/// One FEC group: a contiguous run of database pages protected together.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageGroup {
    /// First page number in the group (1-based, SQLite convention).
    pub start_pgno: u32,
    /// Number of source pages (K) in the group.
    pub group_size: u32,
    /// Number of repair symbols (R) budgeted for the group.
    pub repair: u32,
}
146
147#[must_use]
152pub fn partition_page_groups(db_size_pages: u32) -> Vec<PageGroup> {
153 if db_size_pages == 0 {
154 return Vec::new();
155 }
156
157 let mut groups = Vec::new();
158
159 groups.push(PageGroup {
161 start_pgno: 1,
162 group_size: 1,
163 repair: HEADER_PAGE_R_REPAIR,
164 });
165
166 let mut pgno: u32 = 2;
167 while pgno <= db_size_pages {
168 let remaining = db_size_pages - pgno + 1;
169 let group_size = remaining.min(DEFAULT_GROUP_SIZE);
170 groups.push(PageGroup {
171 start_pgno: pgno,
172 group_size,
173 repair: DEFAULT_R_REPAIR,
174 });
175 if let Some(next) = pgno.checked_add(group_size) {
176 pgno = next;
177 } else {
178 break;
179 }
180 }
181
182 debug!(
183 bead_id = BEAD_ID,
184 db_size_pages,
185 group_count = groups.len(),
186 "partitioned pages into .db-fec groups"
187 );
188
189 groups
190}
191
192#[must_use]
201pub fn compute_db_gen_digest(
202 change_counter: u32,
203 page_count: u32,
204 freelist_count: u32,
205 schema_cookie: u32,
206) -> [u8; 16] {
207 let mut hasher = blake3::Hasher::new();
208 hasher.update(DB_GEN_DIGEST_DOMAIN.as_bytes());
209 hasher.update(&change_counter.to_be_bytes());
210 hasher.update(&page_count.to_be_bytes());
211 hasher.update(&freelist_count.to_be_bytes());
212 hasher.update(&schema_cookie.to_be_bytes());
213 let hash = hasher.finalize();
214 let mut digest = [0u8; 16];
215 digest.copy_from_slice(&hash.as_bytes()[..16]);
216 digest
217}
218
/// Fixed 52-byte header at the start of a `.db-fec` sidecar file.
///
/// Serialized little-endian by [`DbFecHeader::to_bytes`]; layout is
/// magic(8) | version(4) | page_size(4) | default_group_size(4) |
/// default_r_repair(4) | header_page_r_repair(4) | db_gen_digest(16) |
/// checksum(8).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbFecHeader {
    /// Must equal [`DB_FEC_MAGIC`].
    pub magic: [u8; 8],
    /// Must equal [`DB_FEC_VERSION`].
    pub version: u32,
    /// Database page size in bytes.
    pub page_size: u32,
    /// K (pages per general group) used when the sidecar was written.
    pub default_group_size: u32,
    /// R (repair symbols per general group) used when the sidecar was written.
    pub default_r_repair: u32,
    /// R for the dedicated page-1 group.
    pub header_page_r_repair: u32,
    /// Generation digest binding the sidecar to a database state; see
    /// [`compute_db_gen_digest`].
    pub db_gen_digest: [u8; 16],
    /// xxh3_64 over the first 44 serialized bytes.
    pub checksum: u64,
}
235
impl DbFecHeader {
    /// Build a header for `page_size`, deriving the generation digest from the
    /// four SQLite header fields and sealing the result with its checksum.
    #[must_use]
    pub fn new(
        page_size: u32,
        change_counter: u32,
        page_count: u32,
        freelist_count: u32,
        schema_cookie: u32,
    ) -> Self {
        let digest =
            compute_db_gen_digest(change_counter, page_count, freelist_count, schema_cookie);
        let mut hdr = Self {
            magic: DB_FEC_MAGIC,
            version: DB_FEC_VERSION,
            page_size,
            default_group_size: DEFAULT_GROUP_SIZE,
            default_r_repair: DEFAULT_R_REPAIR,
            header_page_r_repair: HEADER_PAGE_R_REPAIR,
            db_gen_digest: digest,
            checksum: 0,
        };
        // Checksum is computed over the serialized form with this field zeroed
        // out of the hashed range (bytes 0..44 only).
        hdr.checksum = hdr.compute_checksum();
        hdr
    }

    /// Serialize to the fixed 52-byte on-disk layout (all integers little-endian).
    #[must_use]
    pub fn to_bytes(&self) -> [u8; DB_FEC_HEADER_SIZE] {
        let mut buf = [0u8; DB_FEC_HEADER_SIZE];
        buf[0..8].copy_from_slice(&self.magic);
        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
        buf[12..16].copy_from_slice(&self.page_size.to_le_bytes());
        buf[16..20].copy_from_slice(&self.default_group_size.to_le_bytes());
        buf[20..24].copy_from_slice(&self.default_r_repair.to_le_bytes());
        buf[24..28].copy_from_slice(&self.header_page_r_repair.to_le_bytes());
        buf[28..44].copy_from_slice(&self.db_gen_digest);
        buf[44..52].copy_from_slice(&self.checksum.to_le_bytes());
        buf
    }

    /// Parse and validate a serialized header: magic, then version, then checksum.
    ///
    /// # Errors
    /// Returns `FrankenError::DatabaseCorrupt` on bad magic, an unsupported
    /// format version, or a checksum mismatch.
    pub fn from_bytes(buf: &[u8; DB_FEC_HEADER_SIZE]) -> Result<Self> {
        // Fixed-size input: every slice conversion below is infallible.
        let magic: [u8; 8] = buf[0..8].try_into().expect("slice len");
        if magic != DB_FEC_MAGIC {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("bad .db-fec magic: {magic:?}"),
            });
        }
        let version = u32::from_le_bytes(buf[8..12].try_into().expect("slice len"));
        if version != DB_FEC_VERSION {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("unsupported .db-fec version: {version}"),
            });
        }
        let page_size = u32::from_le_bytes(buf[12..16].try_into().expect("slice len"));
        let default_group_size = u32::from_le_bytes(buf[16..20].try_into().expect("slice len"));
        let default_r_repair = u32::from_le_bytes(buf[20..24].try_into().expect("slice len"));
        let header_page_r_repair = u32::from_le_bytes(buf[24..28].try_into().expect("slice len"));
        let mut db_gen_digest = [0u8; 16];
        db_gen_digest.copy_from_slice(&buf[28..44]);
        let checksum = u64::from_le_bytes(buf[44..52].try_into().expect("slice len"));

        let hdr = Self {
            magic,
            version,
            page_size,
            default_group_size,
            default_r_repair,
            header_page_r_repair,
            db_gen_digest,
            checksum,
        };

        // Recompute over bytes 0..44 and compare against the stored value.
        let expected = hdr.compute_checksum();
        if hdr.checksum != expected {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    ".db-fec header checksum mismatch: stored={:#x}, computed={expected:#x}",
                    hdr.checksum
                ),
            });
        }

        info!(
            bead_id = BEAD_ID,
            page_size,
            G_pages_per_group = default_group_size,
            R_repair_pages = default_r_repair,
            header_group_policy = header_page_r_repair,
            format_version = version,
            ".db-fec config on open"
        );

        Ok(hdr)
    }

    /// xxh3_64 over the serialized header excluding the trailing checksum field.
    #[must_use]
    fn compute_checksum(&self) -> u64 {
        let buf = self.to_bytes();
        // Bytes 44..52 hold the checksum itself and must not feed the hash.
        xxhash_rust::xxh3::xxh3_64(&buf[..44])
    }

    /// True when the stored generation digest matches a digest computed from
    /// the live SQLite header fields — i.e. the sidecar is not stale.
    #[must_use]
    pub fn is_current(
        &self,
        change_counter: u32,
        page_count: u32,
        freelist_count: u32,
        schema_cookie: u32,
    ) -> bool {
        let current =
            compute_db_gen_digest(change_counter, page_count, freelist_count, schema_cookie);
        self.db_gen_digest == current
    }
}
355
/// Per-group metadata record stored in the sidecar ahead of that group's
/// repair symbols.
///
/// Variable-length: the fixed prefix plus one 16-byte xxh3_128 hash per
/// source page (see `DbFecGroupMeta::serialized_size_for`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbFecGroupMeta {
    /// Must equal [`GROUP_META_MAGIC`].
    pub magic: [u8; 8],
    /// Must equal [`DB_FEC_VERSION`].
    pub version: u32,
    /// Database page size in bytes.
    pub page_size: u32,
    /// First page number covered by this group.
    pub start_pgno: u32,
    /// Number of source pages (K) in this group.
    pub group_size: u32,
    /// Number of repair symbols (R) stored after this record.
    pub r_repair: u32,
    /// Content-derived identifier; recomputed and verified on read.
    pub object_id: [u8; 16],
    /// xxh3_128 of each source page, indexed by position within the group.
    pub source_page_xxh3_128: Vec<[u8; 16]>,
    /// Generation digest binding this group to a database state.
    pub db_gen_digest: [u8; 16],
    /// xxh3_64 over the serialized record excluding this trailing field.
    pub checksum: u64,
}
377
impl DbFecGroupMeta {
    /// Build a metadata record, deriving `object_id` and `checksum` from the
    /// other fields.
    ///
    /// # Panics
    /// Panics if `source_page_xxh3_128.len() != group_size` — that invariant
    /// is required for the serialized layout to be self-describing.
    #[must_use]
    pub fn new(
        page_size: u32,
        start_pgno: u32,
        group_size: u32,
        r_repair: u32,
        source_page_xxh3_128: Vec<[u8; 16]>,
        db_gen_digest: [u8; 16],
    ) -> Self {
        assert!(
            source_page_xxh3_128.len() == group_size as usize,
            "source_page_xxh3_128.len() must equal group_size"
        );
        let mut meta = Self {
            magic: GROUP_META_MAGIC,
            version: DB_FEC_VERSION,
            page_size,
            start_pgno,
            group_size,
            r_repair,
            object_id: [0u8; 16],
            source_page_xxh3_128,
            db_gen_digest,
            checksum: 0,
        };
        // object_id must be fixed before the checksum, since the checksum
        // covers the serialized form (which embeds object_id).
        meta.object_id = meta.compute_object_id();
        meta.checksum = meta.compute_checksum();
        meta
    }

    /// Size of everything except the per-page hash array:
    /// magic(8) + version(4) + page_size(4) + start_pgno(4) + group_size(4)
    /// + r_repair(4) + object_id(16) + db_gen_digest(16) + checksum(8) = 68.
    const FIXED_SIZE: usize = 68;

    /// Serialized size of this record (fixed prefix + 16 bytes per source page).
    #[must_use]
    pub fn serialized_size(&self) -> usize {
        Self::FIXED_SIZE + self.source_page_xxh3_128.len() * 16
    }

    /// Serialized size for a record with `group_size` source pages.
    /// Saturating: callers may feed untrusted sizes before validation.
    #[must_use]
    pub fn serialized_size_for(group_size: u32) -> usize {
        (group_size as usize)
            .saturating_mul(16)
            .saturating_add(Self::FIXED_SIZE)
    }

    /// Serialize to the on-disk layout: fixed prefix, per-page hashes,
    /// generation digest, then checksum (all integers little-endian).
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        let total = self.serialized_size();
        let mut buf = vec![0u8; total];
        buf[0..8].copy_from_slice(&self.magic);
        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
        buf[12..16].copy_from_slice(&self.page_size.to_le_bytes());
        buf[16..20].copy_from_slice(&self.start_pgno.to_le_bytes());
        buf[20..24].copy_from_slice(&self.group_size.to_le_bytes());
        buf[24..28].copy_from_slice(&self.r_repair.to_le_bytes());
        buf[28..44].copy_from_slice(&self.object_id);
        let hash_start = 44;
        for (i, h) in self.source_page_xxh3_128.iter().enumerate() {
            let off = hash_start + i * 16;
            buf[off..off + 16].copy_from_slice(h);
        }
        let digest_off = hash_start + self.source_page_xxh3_128.len() * 16;
        buf[digest_off..digest_off + 16].copy_from_slice(&self.db_gen_digest);
        buf[digest_off + 16..digest_off + 24].copy_from_slice(&self.checksum.to_le_bytes());
        buf
    }

    /// Parse and validate a serialized record: magic, version, length,
    /// checksum, then object id.
    ///
    /// # Errors
    /// Returns `FrankenError::DatabaseCorrupt` on bad magic, unsupported
    /// version, truncated input, checksum mismatch, or object-id mismatch.
    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        // First ensure the fixed prefix is present before reading any field.
        if buf.len() < Self::FIXED_SIZE {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("group meta too short: {} < {}", buf.len(), Self::FIXED_SIZE),
            });
        }
        let magic: [u8; 8] = buf[0..8].try_into().expect("slice len");
        if magic != GROUP_META_MAGIC {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("bad group meta magic: {magic:?}"),
            });
        }
        let version = u32::from_le_bytes(buf[8..12].try_into().expect("slice len"));
        if version != DB_FEC_VERSION {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("unsupported group meta version: {version}"),
            });
        }
        let page_size = u32::from_le_bytes(buf[12..16].try_into().expect("slice len"));
        let start_pgno = u32::from_le_bytes(buf[16..20].try_into().expect("slice len"));
        let group_size = u32::from_le_bytes(buf[20..24].try_into().expect("slice len"));
        let r_repair = u32::from_le_bytes(buf[24..28].try_into().expect("slice len"));
        let mut object_id = [0u8; 16];
        object_id.copy_from_slice(&buf[28..44]);

        // group_size came from the wire; re-check the total length it implies
        // before trusting it to size the hash array below.
        let expected_total = Self::serialized_size_for(group_size);
        if buf.len() < expected_total {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "group meta truncated: {} < {expected_total} for group_size={group_size}",
                    buf.len()
                ),
            });
        }

        let hash_start = 44;
        let mut source_page_xxh3_128 = Vec::with_capacity(group_size as usize);
        for i in 0..group_size as usize {
            let off = hash_start + i * 16;
            let mut h = [0u8; 16];
            h.copy_from_slice(&buf[off..off + 16]);
            source_page_xxh3_128.push(h);
        }

        let digest_off = hash_start + group_size as usize * 16;
        let mut db_gen_digest = [0u8; 16];
        db_gen_digest.copy_from_slice(&buf[digest_off..digest_off + 16]);
        let checksum = u64::from_le_bytes(
            buf[digest_off + 16..digest_off + 24]
                .try_into()
                .expect("slice len"),
        );

        let meta = Self {
            magic,
            version,
            page_size,
            start_pgno,
            group_size,
            r_repair,
            object_id,
            source_page_xxh3_128,
            db_gen_digest,
            checksum,
        };

        // Whole-record integrity first (cheap xxh3), identity second (blake3).
        let expected_cksum = meta.compute_checksum();
        if meta.checksum != expected_cksum {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "group meta checksum mismatch: stored={:#x}, computed={expected_cksum:#x}",
                    meta.checksum
                ),
            });
        }

        let expected_oid = meta.compute_object_id();
        if meta.object_id != expected_oid {
            return Err(FrankenError::DatabaseCorrupt {
                detail: "group meta object_id mismatch".into(),
            });
        }

        debug!(
            bead_id = BEAD_ID,
            group_idx = meta.start_pgno,
            pgno_start = meta.start_pgno,
            K = meta.group_size,
            R = meta.r_repair,
            "group meta validated"
        );

        Ok(meta)
    }

    /// Content-derived identifier: blake3 over the domain string and every
    /// field except `object_id` and `checksum` themselves, truncated to 16 bytes.
    #[must_use]
    fn compute_object_id(&self) -> [u8; 16] {
        let mut hasher = blake3::Hasher::new();
        hasher.update(GROUP_OBJECT_ID_DOMAIN.as_bytes());
        hasher.update(&self.magic);
        hasher.update(&self.version.to_le_bytes());
        hasher.update(&self.page_size.to_le_bytes());
        hasher.update(&self.start_pgno.to_le_bytes());
        hasher.update(&self.group_size.to_le_bytes());
        hasher.update(&self.r_repair.to_le_bytes());
        for h in &self.source_page_xxh3_128 {
            hasher.update(h);
        }
        hasher.update(&self.db_gen_digest);
        let hash = hasher.finalize();
        let mut oid = [0u8; 16];
        oid.copy_from_slice(&hash.as_bytes()[..16]);
        oid
    }

    /// xxh3_64 over the serialized record minus its trailing 8-byte checksum.
    #[must_use]
    fn compute_checksum(&self) -> u64 {
        let bytes = self.to_bytes();
        xxhash_rust::xxh3::xxh3_64(&bytes[..bytes.len() - 8])
    }
}
578
579#[must_use]
592pub fn segment_offset(g: u32, segment_1_len: usize, full_segment_len: usize) -> usize {
593 DB_FEC_HEADER_SIZE + segment_1_len + g as usize * full_segment_len
594}
595
596#[must_use]
600pub fn group_segment_size(group_size: u32, r_repair: u32, page_size: u32) -> usize {
601 DbFecGroupMeta::serialized_size_for(group_size) + r_repair as usize * page_size as usize
602}
603
604#[must_use]
607pub fn find_full_group_index(pgno: u32) -> Option<u32> {
608 if pgno < 2 {
609 return None;
610 }
611 Some((pgno - 2) / DEFAULT_GROUP_SIZE)
612}
613
/// Outcome of a page-repair attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairResult {
    /// Page verified clean; no repair was necessary.
    Intact,
    /// `pgno` was reconstructed using `symbols_used` source/repair symbols.
    Repaired { pgno: u32, symbols_used: u32 },
    /// Repair of `pgno` is impossible: `missing_pages` exceeded what the
    /// group's `r_budget` repair symbols can recover.
    Unrecoverable {
        pgno: u32,
        missing_pages: u32,
        r_budget: u32,
    },
}
632
633#[must_use]
637pub fn verify_page_xxh3_128(page_data: &[u8], expected_xxh3_128: &[u8; 16]) -> bool {
638 let hash = xxhash_rust::xxh3::xxh3_128(page_data);
639 hash.to_le_bytes() == *expected_xxh3_128
640}
641
642#[must_use]
644pub fn page_xxh3_128(page_data: &[u8]) -> [u8; 16] {
645 let hash = xxhash_rust::xxh3::xxh3_128(page_data);
646 hash.to_le_bytes()
647}
648
/// Attempt to reconstruct `target_pgno` from the surviving pages of its group
/// plus the sidecar's repair symbols, via RaptorQ inactivation decoding.
///
/// `all_page_data` fetches raw page bytes by page number; `repair_symbols`
/// are `(esi, data)` pairs whose ESIs are `>= group_size` (as produced by
/// `read_db_fec_group_for_page`).
///
/// Note: the target page itself is unconditionally treated as corrupt —
/// callers invoke this only after the target has already failed validation.
/// Assumes `target_pgno` lies within the group's page range; an out-of-range
/// pgno would underflow `local_idx` or index past the hash array (TODO
/// confirm callers guarantee this).
///
/// # Errors
/// Returns `FrankenError::DatabaseCorrupt` when fewer than K symbols survive,
/// when RaptorQ decoding fails, or when the recovered page fails its
/// xxh3_128 check.
#[allow(clippy::too_many_lines)]
pub fn attempt_page_repair(
    target_pgno: u32,
    group_meta: &DbFecGroupMeta,
    all_page_data: &dyn Fn(u32) -> Vec<u8>,
    repair_symbols: &[(u32, Vec<u8>)],
) -> Result<(Vec<u8>, RepairResult)> {
    let local_idx = target_pgno - group_meta.start_pgno;
    let k = group_meta.group_size;

    debug!(
        bead_id = BEAD_ID,
        target_pgno,
        group_start = group_meta.start_pgno,
        K = k,
        R = group_meta.r_repair,
        "attempting on-the-fly page repair"
    );

    // Gather every symbol the decoder can use: verified sibling source pages
    // (ESI = position within group) plus the caller-supplied repair symbols.
    let mut available: Vec<(u32, Vec<u8>)> = Vec::new();
    let mut corrupt_count: u32 = 0;

    for i in 0..k {
        let pgno = group_meta.start_pgno + i;
        if pgno == target_pgno {
            // The page being repaired never counts as a usable symbol.
            corrupt_count += 1;
            continue;
        }
        let data = all_page_data(pgno);
        if verify_page_xxh3_128(&data, &group_meta.source_page_xxh3_128[i as usize]) {
            available.push((i, data));
        } else {
            corrupt_count += 1;
        }
    }

    for (esi, sym_data) in repair_symbols {
        available.push((*esi, sym_data.clone()));
    }

    debug!(
        bead_id = BEAD_ID,
        target_pgno,
        available_symbols = available.len(),
        corrupt_count,
        K = k,
        "collected symbols for repair"
    );

    // RaptorQ needs at least K symbols to have any chance of decoding.
    #[allow(clippy::cast_possible_truncation)]
    let available_count = available.len() as u32;
    if available_count < k {
        error!(
            bead_id = BEAD_ID,
            target_pgno,
            missing_or_corrupt_pages = corrupt_count,
            R_budget = group_meta.r_repair,
            action = "fail",
            "unrecoverable group loss"
        );
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "page {target_pgno}: insufficient symbols for repair ({} available, {k} needed, {corrupt_count} corrupt)",
                available.len()
            ),
        });
    }

    // Seed must match the one used at encode time (derived from group meta)
    // or the decoded output would be garbage.
    let page_size = group_meta.page_size as usize;
    let k_usize = k as usize;
    let seed = derive_db_fec_repair_seed(group_meta);
    let decoder = asupersync::raptorq::decoder::InactivationDecoder::new(k_usize, page_size, seed);
    let params = decoder.params();
    let base_rows = params.s + params.h;
    let constraints = asupersync::raptorq::systematic::ConstraintMatrix::build(params, seed);

    let mut received = decoder.constraint_symbols();

    // Feed each available symbol with its decoding equation: source symbols
    // (ESI < K) and repair symbols (ESI >= K) use different equation forms.
    for (esi, data) in &available {
        if (*esi as usize) < k_usize {
            let (cols, coefs) = decoder.source_equation(*esi);
            received.push(asupersync::raptorq::decoder::ReceivedSymbol {
                esi: *esi,
                is_source: true,
                columns: cols,
                coefficients: coefs,
                data: data.clone(),
            });
        } else {
            let (cols, coefs) = decoder.repair_equation(*esi);
            received.push(asupersync::raptorq::decoder::ReceivedSymbol::repair(
                *esi,
                cols,
                coefs,
                data.clone(),
            ));
        }
    }

    // Padding symbols: RaptorQ extends K to K' internally; the symbols in
    // K..K' are known-zero and supplied as such.
    for source_index in k_usize..params.k_prime {
        let row = base_rows + source_index;
        let mut columns = Vec::new();
        let mut coefficients = Vec::new();
        for col in 0..constraints.cols {
            let coeff = constraints.get(row, col);
            if !coeff.is_zero() {
                columns.push(col);
                coefficients.push(coeff);
            }
        }
        received.push(asupersync::raptorq::decoder::ReceivedSymbol {
            esi: u32::try_from(source_index).expect("source index fits u32"),
            is_source: true,
            columns,
            coefficients,
            data: vec![0_u8; page_size],
        });
    }

    let result = decoder
        .decode(&received)
        .map_err(|err| FrankenError::DatabaseCorrupt {
            detail: format!("page {target_pgno}: RaptorQ decode failed: {err:?}"),
        })?;

    if result.source.len() != k_usize {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "page {target_pgno}: RaptorQ decode returned {} source symbols, expected {k}",
                result.source.len()
            ),
        });
    }

    let recovered = result.source[local_idx as usize].clone();

    // Decoding can "succeed" with wrong data; the stored per-page hash is the
    // final arbiter of whether the reconstruction is trustworthy.
    if verify_page_xxh3_128(
        &recovered,
        &group_meta.source_page_xxh3_128[local_idx as usize],
    ) {
        info!(
            bead_id = BEAD_ID,
            target_pgno,
            group_start = group_meta.start_pgno,
            pages_repaired = 1,
            symbols_used = available.len(),
            "successful on-the-fly page repair"
        );
        Ok((
            recovered,
            RepairResult::Repaired {
                pgno: target_pgno,
                symbols_used: available_count,
            },
        ))
    } else {
        warn!(
            bead_id = BEAD_ID,
            target_pgno,
            missing_or_corrupt_pages = corrupt_count,
            R_budget = group_meta.r_repair,
            "near-capacity repair: recovered page xxh3 mismatch"
        );
        Err(FrankenError::DatabaseCorrupt {
            detail: format!("page {target_pgno}: recovered page failed xxh3_128 validation"),
        })
    }
}
835
/// Sidecar path for a database file: the database path with `-fec` appended
/// to the full file name (so `foo.db` becomes `foo.db-fec`).
#[must_use]
pub fn db_fec_path_for_db(db_path: &Path) -> PathBuf {
    // Append to the raw OsStr rather than Path::join, which would treat
    // the suffix as a new path component.
    let mut raw = db_path.as_os_str().to_os_string();
    raw.push("-fec");
    raw.into()
}
847
/// The SQLite file header occupies the first 100 bytes of page 1; all fields
/// read below live within it.
const SQLITE_HEADER_MIN_BYTES: usize = 100;
/// Offset of the big-endian u16 page-size field.
const PAGE_SIZE_OFFSET: usize = 16;
/// Offset of the big-endian u32 file change counter.
const CHANGE_COUNTER_OFFSET: usize = 24;
/// Offset of the big-endian u32 database size in pages.
const PAGE_COUNT_OFFSET: usize = 28;
/// Offset of the big-endian u32 freelist page count.
const FREELIST_COUNT_OFFSET: usize = 36;
/// Offset of the big-endian u32 schema cookie.
const SCHEMA_COOKIE_OFFSET: usize = 40;
855
/// The SQLite header fields needed to build and validate a `.db-fec` sidecar.
#[derive(Debug, Clone, Copy)]
pub struct DbHeaderFields {
    /// Page size in bytes (the on-disk sentinel 1 is decoded to 65536).
    pub page_size: u32,
    /// File change counter (header offset 24).
    pub change_counter: u32,
    /// Database size in pages (header offset 28).
    pub page_count: u32,
    /// Freelist page count (header offset 36).
    pub freelist_count: u32,
    /// Schema cookie (header offset 40).
    pub schema_cookie: u32,
}
865
/// Read and parse the SQLite header fields from the database at `db_path`.
///
/// Note: loads the whole file via `host_fs::read` even though only the first
/// 100 bytes are needed.
///
/// # Errors
/// Propagates I/O errors from `host_fs::read` and parse errors from
/// [`parse_db_header_fields`].
pub fn read_db_header_fields(db_path: &Path) -> Result<DbHeaderFields> {
    let data = host_fs::read(db_path)?;
    parse_db_header_fields(&data)
}
871
872pub fn parse_db_header_fields(data: &[u8]) -> Result<DbHeaderFields> {
874 if data.len() < SQLITE_HEADER_MIN_BYTES {
875 return Err(FrankenError::DatabaseCorrupt {
876 detail: format!(
877 "database too short for header: {} < {SQLITE_HEADER_MIN_BYTES}",
878 data.len()
879 ),
880 });
881 }
882
883 let page_size_raw = u16::from_be_bytes(
884 data[PAGE_SIZE_OFFSET..PAGE_SIZE_OFFSET + 2]
885 .try_into()
886 .expect("fixed-length slice"),
887 );
888 let page_size = if page_size_raw == 1 {
890 65536
891 } else {
892 u32::from(page_size_raw)
893 };
894
895 let change_counter = u32::from_be_bytes(
896 data[CHANGE_COUNTER_OFFSET..CHANGE_COUNTER_OFFSET + 4]
897 .try_into()
898 .expect("fixed-length slice"),
899 );
900 let page_count = u32::from_be_bytes(
901 data[PAGE_COUNT_OFFSET..PAGE_COUNT_OFFSET + 4]
902 .try_into()
903 .expect("fixed-length slice"),
904 );
905 let freelist_count = u32::from_be_bytes(
906 data[FREELIST_COUNT_OFFSET..FREELIST_COUNT_OFFSET + 4]
907 .try_into()
908 .expect("fixed-length slice"),
909 );
910 let schema_cookie = u32::from_be_bytes(
911 data[SCHEMA_COOKIE_OFFSET..SCHEMA_COOKIE_OFFSET + 4]
912 .try_into()
913 .expect("fixed-length slice"),
914 );
915
916 Ok(DbHeaderFields {
917 page_size,
918 change_counter,
919 page_count,
920 freelist_count,
921 schema_cookie,
922 })
923}
924
925fn derive_db_fec_repair_seed(meta: &DbFecGroupMeta) -> u64 {
930 let mut seed_material = Vec::with_capacity(16 + 4 * 4 + 16);
931 seed_material.extend_from_slice(&meta.object_id);
932 seed_material.extend_from_slice(&meta.page_size.to_le_bytes());
933 seed_material.extend_from_slice(&meta.start_pgno.to_le_bytes());
934 seed_material.extend_from_slice(&meta.group_size.to_le_bytes());
935 seed_material.extend_from_slice(&meta.r_repair.to_le_bytes());
936 seed_material.extend_from_slice(&meta.db_gen_digest);
937 xxhash_rust::xxh3::xxh3_64(&seed_material)
938}
939
/// Encode the `r_repair` RaptorQ repair symbols for one group.
///
/// Repair ESIs start at K (the group size), matching what
/// `attempt_page_repair` expects on the decode side.
///
/// # Errors
/// Returns `FrankenError::DatabaseCorrupt` when the page count disagrees
/// with the metadata (a seed-consistency hazard), when the constraint matrix
/// is singular, or when the page count cannot be represented as `u32`.
pub fn compute_raptorq_repair_symbols(
    meta: &DbFecGroupMeta,
    source_pages: &[&[u8]],
    page_size: usize,
) -> Result<Vec<Vec<u8>>> {
    // The seed is derived from meta; encoding a different page count than the
    // meta declares would produce symbols the decoder cannot use.
    if source_pages.len() != meta.group_size as usize {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "source_pages.len()={} != meta.group_size={}; encoder/decoder seed mismatch would corrupt data",
                source_pages.len(),
                meta.group_size,
            ),
        });
    }
    let seed = derive_db_fec_repair_seed(meta);
    let source_vecs: Vec<Vec<u8>> = source_pages.iter().map(|s| s.to_vec()).collect();
    let encoder =
        asupersync::raptorq::systematic::SystematicEncoder::new(&source_vecs, page_size, seed)
            .ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: "RaptorQ constraint matrix singular during encoding".to_owned(),
            })?;

    let k = u32::try_from(source_pages.len()).map_err(|_| FrankenError::DatabaseCorrupt {
        detail: "source page count does not fit in u32".to_owned(),
    })?;

    // Repair symbols occupy ESIs K, K+1, ..., K+R-1.
    let mut symbols = Vec::with_capacity(meta.r_repair as usize);
    for r_idx in 0..meta.r_repair {
        let esi = k + r_idx;
        symbols.push(encoder.repair_symbol(esi));
    }
    Ok(symbols)
}
977
/// Extract page `pgno` (1-based) from a raw database image, zero-padding any
/// portion that lies beyond the end of the image.
fn read_page_from_bytes(db_data: &[u8], pgno: u32, page_size: usize) -> Vec<u8> {
    // Compute the byte offset in u64 first so huge pgno/page_size pairs
    // cannot overflow usize arithmetic on 32-bit targets.
    let start_u64 = (u64::from(pgno) - 1) * (page_size as u64);
    let start = usize::try_from(start_u64).unwrap_or(usize::MAX);
    match db_data.get(start..start.saturating_add(page_size)) {
        // Fast path: the page is entirely inside the image.
        Some(full) => full.to_vec(),
        // Slow path: zero-fill, then copy whatever tail bytes exist.
        None => {
            let mut page = vec![0u8; page_size];
            if let Some(tail) = db_data.get(start..) {
                page[..tail.len()].copy_from_slice(tail);
            }
            page
        }
    }
}
993
/// Build a complete `.db-fec` sidecar image for a raw database image.
///
/// Layout: [`DbFecHeader`], then one segment per group — the page-1 group's
/// segment first (exactly sized), followed by fixed-size general segments
/// (the final, possibly-short group is zero-padded up to the full segment
/// length so `segment_offset` arithmetic stays uniform).
///
/// # Errors
/// Returns `FrankenError::DatabaseCorrupt` if the database header cannot be
/// parsed or repair-symbol encoding fails.
#[allow(clippy::too_many_lines)]
pub fn generate_db_fec_from_bytes(db_data: &[u8]) -> Result<Vec<u8>> {
    let fields = parse_db_header_fields(db_data)?;
    let ps = fields.page_size as usize;

    let header = DbFecHeader::new(
        fields.page_size,
        fields.change_counter,
        fields.page_count,
        fields.freelist_count,
        fields.schema_cookie,
    );
    let digest = header.db_gen_digest;
    let groups = partition_page_groups(fields.page_count);

    // Segment sizes: the page-1 segment is special-cased; every general
    // segment is sized for a full DEFAULT_GROUP_SIZE group.
    let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, fields.page_size);
    let full_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, fields.page_size);

    let num_general_groups = groups.len().saturating_sub(1);
    let total_size = if groups.is_empty() {
        // Empty database: header only.
        DB_FEC_HEADER_SIZE
    } else {
        DB_FEC_HEADER_SIZE + seg1_len + num_general_groups * full_seg_len
    };
    let mut sidecar = vec![0u8; total_size];

    sidecar[..DB_FEC_HEADER_SIZE].copy_from_slice(&header.to_bytes());

    let mut cursor = DB_FEC_HEADER_SIZE;

    for (gi, group) in groups.iter().enumerate() {
        // Materialize the group's source pages (zero-padded past EOF).
        let source_refs: Vec<Vec<u8>> = (0..group.group_size)
            .map(|i| read_page_from_bytes(db_data, group.start_pgno + i, ps))
            .collect();
        let source_slices: Vec<&[u8]> = source_refs.iter().map(Vec::as_slice).collect();

        let hashes: Vec<[u8; 16]> = source_slices.iter().map(|p| page_xxh3_128(p)).collect();

        let meta = DbFecGroupMeta::new(
            fields.page_size,
            group.start_pgno,
            group.group_size,
            group.repair,
            hashes,
            digest,
        );

        let repair_symbols = compute_raptorq_repair_symbols(&meta, &source_slices, ps)?;

        // Segment body: metadata record, then the R repair symbols.
        let meta_bytes = meta.to_bytes();
        sidecar[cursor..cursor + meta_bytes.len()].copy_from_slice(&meta_bytes);
        cursor += meta_bytes.len();

        for sym in &repair_symbols {
            sidecar[cursor..cursor + ps].copy_from_slice(sym);
            cursor += ps;
        }

        // General segments (gi > 0) are padded to full_seg_len so every
        // group g sits at the offset segment_offset(g, ...) predicts; the
        // page-1 segment (gi == 0) is exactly seg1_len and needs no padding.
        if gi > 0 {
            let actual_seg_size = meta_bytes.len() + group.repair as usize * ps;
            let padding = full_seg_len - actual_seg_size;
            cursor += padding;
        }
    }

    let sidecar_len = sidecar.len() as u64;
    let page_count_u64 = u64::from(fields.page_count);

    let _span = span!(
        Level::INFO,
        "snapshot_raptorq",
        pages_encoded = page_count_u64,
        total_bytes = sidecar_len,
        groups = groups.len(),
    )
    .entered();

    GLOBAL_SNAPSHOT_FEC_METRICS.record_encode(page_count_u64, sidecar_len);

    info!(
        bead_id = "bd-2r4z",
        page_count = fields.page_count,
        page_size = fields.page_size,
        groups = groups.len(),
        sidecar_bytes = sidecar.len(),
        "generated .db-fec sidecar"
    );

    Ok(sidecar)
}
1101
/// Read the database at `db_path` and build its `.db-fec` sidecar image.
///
/// # Errors
/// Propagates I/O errors from `host_fs::read` and encode errors from
/// [`generate_db_fec_from_bytes`].
pub fn generate_db_fec_sidecar(db_path: &Path) -> Result<Vec<u8>> {
    let db_data = host_fs::read(db_path)?;
    generate_db_fec_from_bytes(&db_data)
}
1107
/// Generate the sidecar for `db_path` and write it next to the database
/// (path per [`db_fec_path_for_db`]). Returns the sidecar path.
///
/// # Errors
/// Propagates generation errors and `host_fs::write` I/O errors.
pub fn write_db_fec_sidecar(db_path: &Path) -> Result<PathBuf> {
    let sidecar_data = generate_db_fec_sidecar(db_path)?;
    let sidecar_path = db_fec_path_for_db(db_path);
    host_fs::write(&sidecar_path, &sidecar_data)?;

    info!(
        bead_id = "bd-2r4z",
        db_path = %db_path.display(),
        sidecar_path = %sidecar_path.display(),
        sidecar_bytes = sidecar_data.len(),
        "wrote .db-fec sidecar"
    );

    Ok(sidecar_path)
}
1124
1125pub fn read_db_fec_header(sidecar_path: &Path) -> Result<DbFecHeader> {
1127 let data = host_fs::read(sidecar_path)?;
1128 if data.len() < DB_FEC_HEADER_SIZE {
1129 return Err(FrankenError::DatabaseCorrupt {
1130 detail: format!(
1131 "sidecar too short for header: {} < {DB_FEC_HEADER_SIZE}",
1132 data.len()
1133 ),
1134 });
1135 }
1136 let buf: [u8; DB_FEC_HEADER_SIZE] = data[..DB_FEC_HEADER_SIZE]
1137 .try_into()
1138 .expect("fixed-length slice");
1139 DbFecHeader::from_bytes(&buf)
1140}
1141
/// Locate and load the group segment protecting `target_pgno` from an
/// in-memory sidecar image.
///
/// Returns the validated group metadata plus its repair symbols as
/// `(esi, data)` pairs with ESIs starting at the group size — the exact
/// shape `attempt_page_repair` consumes.
///
/// # Errors
/// Returns `FrankenError::DatabaseCorrupt` for an invalid page number, a
/// sidecar too short for the computed segment offset, or truncated/invalid
/// group metadata.
#[allow(clippy::type_complexity)]
pub fn read_db_fec_group_for_page(
    sidecar_data: &[u8],
    header: &DbFecHeader,
    target_pgno: u32,
) -> Result<(DbFecGroupMeta, Vec<(u32, Vec<u8>)>)> {
    let ps = header.page_size as usize;

    // Page 1 lives in the dedicated segment right after the header; every
    // other page maps to a fixed-stride general segment. The group-size hint
    // only bounds the metadata read — the true size comes from the record.
    let (seg_offset, group_size_hint) = if target_pgno == 1 {
        (DB_FEC_HEADER_SIZE, 1_u32)
    } else {
        let gi =
            find_full_group_index(target_pgno).ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: format!("invalid target page number: {target_pgno}"),
            })?;
        let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, header.page_size);
        let full_seg_len =
            group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, header.page_size);
        let offset = segment_offset(gi, seg1_len, full_seg_len);
        (offset, DEFAULT_GROUP_SIZE)
    };

    if seg_offset >= sidecar_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "sidecar too short for segment at offset {seg_offset}: len={}",
                sidecar_data.len()
            ),
        });
    }

    // Read a hint-sized window; a short final group's record is smaller and
    // from_bytes ignores the window's trailing bytes (padding guarantees the
    // window itself is in bounds).
    let meta_size = DbFecGroupMeta::serialized_size_for(group_size_hint);
    let meta_end = seg_offset + meta_size;
    if meta_end > sidecar_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "sidecar truncated reading group meta at {seg_offset}: need {meta_size}, have {}",
                sidecar_data.len() - seg_offset
            ),
        });
    }
    let meta = DbFecGroupMeta::from_bytes(&sidecar_data[seg_offset..meta_end])?;

    // Repair symbols follow immediately after the record's *actual* size,
    // which may be less than the hinted window for a short final group.
    let actual_r = meta.r_repair;
    let actual_meta_size = meta.serialized_size();
    let mut sym_cursor = seg_offset + actual_meta_size;

    let needed_repair_bytes = (actual_r as usize).saturating_mul(ps);
    if sym_cursor.saturating_add(needed_repair_bytes) > sidecar_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!("sidecar too short for {} repair symbols", actual_r),
        });
    }

    // ESIs K, K+1, ..., K+R-1 — mirrors compute_raptorq_repair_symbols.
    let mut symbols = Vec::with_capacity(actual_r as usize);
    for r_idx in 0..actual_r {
        if sym_cursor + ps > sidecar_data.len() {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("sidecar truncated reading repair symbol {r_idx} at {sym_cursor}"),
            });
        }
        let esi = meta.group_size + r_idx;
        symbols.push((esi, sidecar_data[sym_cursor..sym_cursor + ps].to_vec()));
        sym_cursor += ps;
    }

    debug!(
        bead_id = "bd-2r4z",
        target_pgno,
        group_start = meta.start_pgno,
        K = meta.group_size,
        R = actual_r,
        "read .db-fec group for repair"
    );

    Ok((meta, symbols))
}
1227
1228#[cfg(test)]
1233mod tests {
1234 use super::*;
1235
1236 #[test]
1239 fn test_db_fec_header_roundtrip() {
1240 let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
1241 let bytes = hdr.to_bytes();
1242 assert_eq!(bytes.len(), DB_FEC_HEADER_SIZE);
1243 let decoded = DbFecHeader::from_bytes(&bytes).expect("decode");
1244 assert_eq!(hdr, decoded);
1245 }
1246
1247 #[test]
1248 fn test_db_gen_digest_computation() {
1249 let d1 = compute_db_gen_digest(42, 100, 5, 99);
1251 let d2 = compute_db_gen_digest(42, 100, 5, 99);
1252 assert_eq!(d1, d2, "deterministic");
1253
1254 let d3 = compute_db_gen_digest(43, 100, 5, 99);
1256 assert_ne!(d1, d3);
1257 let d4 = compute_db_gen_digest(42, 101, 5, 99);
1258 assert_ne!(d1, d4);
1259 let d5 = compute_db_gen_digest(42, 100, 6, 99);
1260 assert_ne!(d1, d5);
1261 let d6 = compute_db_gen_digest(42, 100, 5, 100);
1262 assert_ne!(d1, d6);
1263 }
1264
1265 #[test]
1266 fn test_stale_sidecar_detection() {
1267 let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
1268 assert!(hdr.is_current(42, 100, 5, 99));
1269 assert!(!hdr.is_current(43, 100, 5, 99));
1271 assert!(!hdr.is_current(42, 101, 5, 99));
1272 }
1273
1274 #[test]
1275 fn test_db_fec_header_bad_checksum() {
1276 let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
1277 let mut bytes = hdr.to_bytes();
1278 bytes[44] ^= 0xFF;
1280 let result = DbFecHeader::from_bytes(&bytes);
1281 assert!(result.is_err());
1282 }
1283
1284 #[test]
1285 fn test_db_fec_header_bad_magic() {
1286 let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
1287 let mut bytes = hdr.to_bytes();
1288 bytes[0] = b'X';
1289 let result = DbFecHeader::from_bytes(&bytes);
1290 assert!(result.is_err());
1291 }
1292
1293 #[test]
1296 fn test_page_group_partitioning_single_page() {
1297 let groups = partition_page_groups(1);
1298 assert_eq!(groups.len(), 1);
1299 assert_eq!(
1300 groups[0],
1301 PageGroup {
1302 start_pgno: 1,
1303 group_size: 1,
1304 repair: HEADER_PAGE_R_REPAIR
1305 }
1306 );
1307 }
1308
1309 #[test]
1310 fn test_page_group_partitioning_64_pages() {
1311 let groups = partition_page_groups(64);
1312 assert_eq!(groups.len(), 2);
1313 assert_eq!(groups[0].start_pgno, 1);
1315 assert_eq!(groups[0].group_size, 1);
1316 assert_eq!(groups[0].repair, HEADER_PAGE_R_REPAIR);
1317 assert_eq!(groups[1].start_pgno, 2);
1319 assert_eq!(groups[1].group_size, 63);
1320 assert_eq!(groups[1].repair, DEFAULT_R_REPAIR);
1321 }
1322
1323 #[test]
1324 fn test_page_group_partitioning_65_pages() {
1325 let groups = partition_page_groups(65);
1326 assert_eq!(groups.len(), 2);
1327 assert_eq!(groups[1].start_pgno, 2);
1328 assert_eq!(groups[1].group_size, 64);
1329 assert_eq!(groups[1].repair, DEFAULT_R_REPAIR);
1330 }
1331
1332 #[test]
1333 fn test_page_group_partitioning_128_pages() {
1334 let groups = partition_page_groups(128);
1335 assert_eq!(groups.len(), 3);
1336 assert_eq!(groups[0].start_pgno, 1);
1337 assert_eq!(groups[0].group_size, 1);
1338 assert_eq!(groups[1].start_pgno, 2);
1339 assert_eq!(groups[1].group_size, 64);
1340 assert_eq!(groups[2].start_pgno, 66);
1341 assert_eq!(groups[2].group_size, 63);
1342 }
1343
1344 #[test]
1345 fn test_page_group_partitioning_1000_pages() {
1346 let groups = partition_page_groups(1000);
1347 assert_eq!(groups.len(), 17);
1349 assert_eq!(groups[0].group_size, 1);
1350 let total_pages: u32 = groups.iter().map(|g| g.group_size).sum();
1352 assert_eq!(total_pages, 1000);
1353 }
1354
1355 #[test]
1356 fn test_page_group_partitioning_zero() {
1357 let groups = partition_page_groups(0);
1358 assert!(groups.is_empty());
1359 }
1360
1361 #[test]
1362 fn test_header_page_400pct_redundancy() {
1363 let groups = partition_page_groups(100);
1364 assert_eq!(groups[0].group_size, 1);
1366 assert_eq!(groups[0].repair, 4);
1367 }
1368
1369 #[test]
1372 fn test_segment_offset_o1() {
1373 let page_size: u32 = 4096;
1374 let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, page_size);
1375 let general_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, page_size);
1376
1377 for g in 0..10_u32 {
1379 let off = segment_offset(g, seg1_len, general_seg_len);
1380 let expected = DB_FEC_HEADER_SIZE + seg1_len + g as usize * general_seg_len;
1381 assert_eq!(off, expected, "segment offset mismatch for g={g}");
1382 }
1383 }
1384
1385 #[test]
1388 fn test_group_meta_roundtrip() {
1389 let hashes: Vec<[u8; 16]> = (0..4)
1390 .map(|i| {
1391 let mut h = [0u8; 16];
1392 h[0] = i;
1393 h
1394 })
1395 .collect();
1396 let digest = compute_db_gen_digest(1, 100, 0, 42);
1397 let meta = DbFecGroupMeta::new(4096, 2, 4, 4, hashes, digest);
1398 let bytes = meta.to_bytes();
1399 let decoded = DbFecGroupMeta::from_bytes(&bytes).expect("decode");
1400 assert_eq!(meta, decoded);
1401 }
1402
1403 #[test]
1404 fn test_group_meta_object_id() {
1405 let hashes: Vec<[u8; 16]> = (0..2)
1406 .map(|i| {
1407 let mut h = [0u8; 16];
1408 h[0] = i;
1409 h
1410 })
1411 .collect();
1412 let digest = compute_db_gen_digest(1, 100, 0, 42);
1413 let meta = DbFecGroupMeta::new(4096, 2, 2, 4, hashes, digest);
1414
1415 let oid = meta.object_id;
1417 assert_ne!(oid, [0u8; 16], "object_id should be non-zero");
1418
1419 let mut hashes2: Vec<[u8; 16]> = (0..2)
1421 .map(|i| {
1422 let mut h = [0u8; 16];
1423 h[0] = i;
1424 h
1425 })
1426 .collect();
1427 hashes2[0][1] = 0xFF;
1428 let meta2 = DbFecGroupMeta::new(4096, 2, 2, 4, hashes2, digest);
1429 assert_ne!(meta.object_id, meta2.object_id);
1430 }
1431
1432 #[test]
1433 fn test_group_meta_stale_guard() {
1434 let hashes = vec![[0u8; 16]; 1];
1435 let digest = compute_db_gen_digest(1, 100, 0, 42);
1436 let meta = DbFecGroupMeta::new(4096, 1, 1, 4, hashes, digest);
1437
1438 let stale_digest = compute_db_gen_digest(2, 100, 0, 42);
1439 assert_ne!(meta.db_gen_digest, stale_digest);
1441 }
1442
1443 #[test]
1444 fn test_group_meta_bad_checksum() {
1445 let hashes = vec![[1u8; 16]; 2];
1446 let digest = compute_db_gen_digest(1, 100, 0, 42);
1447 let meta = DbFecGroupMeta::new(4096, 2, 2, 4, hashes, digest);
1448 let mut bytes = meta.to_bytes();
1449 let last = bytes.len() - 1;
1451 bytes[last] ^= 0xFF;
1452 let result = DbFecGroupMeta::from_bytes(&bytes);
1453 assert!(result.is_err());
1454 }
1455
1456 #[test]
1459 fn test_read_path_intact() {
1460 let page_size = 64_u32;
1461 let page_data: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i; page_size as usize]).collect();
1462 let hashes: Vec<[u8; 16]> = page_data.iter().map(|d| page_xxh3_128(d)).collect();
1463 let digest = compute_db_gen_digest(1, 5, 0, 1);
1464 let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);
1465
1466 for (i, d) in page_data.iter().enumerate() {
1468 assert!(verify_page_xxh3_128(d, &meta.source_page_xxh3_128[i]));
1469 }
1470 }
1471
    #[test]
    fn test_read_path_single_corruption() {
        // One corrupted page inside a K=4 group must be recoverable from the
        // group's repair symbols.
        let page_size = 64_u32;
        // Pages hold constant fill values 1..=4.
        let page_data: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i + 1; page_size as usize]).collect();
        let hashes: Vec<[u8; 16]> = page_data.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, 5, 0, 1);
        // Group covers pgnos 2..=5 (start_pgno=2, group_size=4).
        let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);

        let source_slices: Vec<&[u8]> = page_data.iter().map(Vec::as_slice).collect();
        let repair_data = compute_raptorq_repair_symbols(&meta, &source_slices, page_size as usize)
            .expect("encode");

        // Simulate bitrot on pgno 4 (index 2 within the group).
        let target_pgno = 4;
        let corrupted = vec![0xFF_u8; page_size as usize];

        // Page reader: returns the corrupted buffer for the target, the
        // original data for everything else (pgno - 2 maps pgno -> index).
        let read_fn = |pgno: u32| -> Vec<u8> {
            if pgno == target_pgno {
                corrupted.clone()
            } else {
                page_data[(pgno - 2) as usize].clone()
            }
        };

        // Repair-symbol ESIs start at group_size (= 4 here).
        let repair_symbols: Vec<(u32, Vec<u8>)> = repair_data
            .into_iter()
            .enumerate()
            .map(|(i, d)| (4 + u32::try_from(i).expect("i fits u32"), d))
            .collect();
        let result = attempt_page_repair(target_pgno, &meta, &read_fn, &repair_symbols);
        let (recovered, status) = result.expect("repair should succeed");
        assert_eq!(
            recovered, page_data[2],
            "recovered page must match original"
        );
        assert!(matches!(status, RepairResult::Repaired { pgno: 4, .. }));
    }
1511
1512 #[test]
1513 fn test_read_path_exceed_corruption() {
1514 let page_size = 64_u32;
1515 let page_data: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i + 1; page_size as usize]).collect();
1516 let hashes: Vec<[u8; 16]> = page_data.iter().map(|d| page_xxh3_128(d)).collect();
1517 let digest = compute_db_gen_digest(1, 5, 0, 1);
1518 let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);
1519
1520 let corrupted = vec![0xFF_u8; page_size as usize];
1522 let read_fn = |_pgno: u32| -> Vec<u8> { corrupted.clone() };
1523 let repair_symbols: Vec<(u32, Vec<u8>)> = Vec::new();
1524
1525 let result = attempt_page_repair(3, &meta, &read_fn, &repair_symbols);
1526 assert!(result.is_err());
1527 }
1528
    #[test]
    fn test_e2e_bitrot_recovery() {
        // End-to-end: encode a 4-page group, flip one page to garbage, and
        // recover it from the repair symbols.
        let page_size = 128_u32;
        let num_pages = 4_u32;
        // Deterministic per-page content so recovery can be checked exactly.
        let pages: Vec<Vec<u8>> = (0..num_pages)
            .map(|i| {
                let mut data = vec![0u8; page_size as usize];
                for (j, b) in data.iter_mut().enumerate() {
                    #[allow(clippy::cast_possible_truncation)]
                    {
                        *b = ((i as usize * 37 + j * 13) & 0xFF) as u8;
                    }
                }
                data
            })
            .collect();

        let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, num_pages + 1, 0, 1);
        // Group spans pgnos 2..=5.
        let meta = DbFecGroupMeta::new(page_size, 2, num_pages, 4, hashes, digest);

        let source_slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
        let repair_data = compute_raptorq_repair_symbols(&meta, &source_slices, page_size as usize)
            .expect("encode");

        // Corrupt pgno 2 (group index 0).
        let target = 2_u32;
        let corrupted = vec![0xAA_u8; page_size as usize];

        let read_fn = |pgno: u32| -> Vec<u8> {
            if pgno == target {
                corrupted.clone()
            } else {
                pages[(pgno - 2) as usize].clone()
            }
        };

        // Repair-symbol ESIs start at K (= num_pages).
        let repair_symbols: Vec<(u32, Vec<u8>)> = repair_data
            .into_iter()
            .enumerate()
            .map(|(i, d)| (num_pages + u32::try_from(i).expect("i fits u32"), d))
            .collect();
        let (recovered, _) =
            attempt_page_repair(target, &meta, &read_fn, &repair_symbols).expect("repair");
        assert_eq!(recovered, pages[0]);
    }
1578
1579 #[test]
1580 fn test_e2e_stale_sidecar_rejected() {
1581 let hdr1 = DbFecHeader::new(4096, 1, 100, 0, 1);
1582 let hdr2 = DbFecHeader::new(4096, 2, 100, 0, 1); assert_ne!(hdr1.db_gen_digest, hdr2.db_gen_digest);
1584 assert!(!hdr1.is_current(2, 100, 0, 1));
1585 }
1586
1587 #[test]
1588 fn test_overflow_threshold_g64_r4() {
1589 let overhead = f64::from(DEFAULT_R_REPAIR) / f64::from(DEFAULT_GROUP_SIZE);
1591 assert!((overhead - 0.0625).abs() < f64::EPSILON);
1592 }
1593
1594 #[test]
1595 fn test_last_group_partial() {
1596 let groups = partition_page_groups(100);
1598 assert_eq!(groups.len(), 3);
1599 assert_eq!(groups[2].start_pgno, 66);
1600 assert_eq!(groups[2].group_size, 35);
1601
1602 let page_size = 4096_u32;
1605 let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, page_size);
1606 let general_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, page_size);
1607 let off = segment_offset(1, seg1_len, general_seg_len);
1608 assert_eq!(
1609 off,
1610 DB_FEC_HEADER_SIZE + seg1_len + general_seg_len,
1611 "second full-group offset"
1612 );
1613 }
1614
1615 #[test]
1616 fn test_find_full_group_index() {
1617 assert_eq!(find_full_group_index(1), None); assert_eq!(find_full_group_index(2), Some(0));
1619 assert_eq!(find_full_group_index(65), Some(0));
1620 assert_eq!(find_full_group_index(66), Some(1));
1621 assert_eq!(find_full_group_index(130), Some(2));
1622 }
1623
1624 #[test]
1627 fn test_bd_1hi_18_unit_compliance_gate() {
1628 assert_eq!(BEAD_ID, "bd-1hi.18");
1630 assert_eq!(DB_FEC_MAGIC, *b"FSQLDFEC");
1631 assert_eq!(GROUP_META_MAGIC, *b"FSQLDGRP");
1632 assert_eq!(DB_FEC_VERSION, 1);
1633 assert_eq!(DEFAULT_GROUP_SIZE, 64);
1634 assert_eq!(DEFAULT_R_REPAIR, 4);
1635 assert_eq!(HEADER_PAGE_R_REPAIR, 4);
1636 }
1637
1638 #[test]
1639 fn prop_bd_1hi_18_structure_compliance() {
1640 for n in [1_u32, 2, 63, 64, 65, 128, 129, 500, 1000] {
1642 let groups = partition_page_groups(n);
1643 let total: u32 = groups.iter().map(|g| g.group_size).sum();
1644 assert_eq!(total, n, "total pages mismatch for n={n}");
1645
1646 let mut covered = 0_u32;
1648 for g in &groups {
1649 assert!(g.start_pgno > covered, "overlap at pgno {}", g.start_pgno);
1650 covered = g.start_pgno + g.group_size - 1;
1651 }
1652 assert_eq!(covered, n);
1653 }
1654 }
1655
    #[test]
    fn test_e2e_bd_1hi_18_compliance() {
        // Compliance sweep over header roundtrip, partitioning, and
        // segment-offset monotonicity for a 200-page database.
        let page_size = 4096_u32;
        let db_pages = 200_u32;
        let hdr = DbFecHeader::new(page_size, 10, db_pages, 3, 42);

        // Header must roundtrip and validate its own generation identity.
        let hdr2 = DbFecHeader::from_bytes(&hdr.to_bytes()).expect("roundtrip");
        assert_eq!(hdr, hdr2);
        assert!(hdr.is_current(10, db_pages, 3, 42));

        // Partition covers every page exactly once.
        let groups = partition_page_groups(db_pages);
        assert!(!groups.is_empty());
        let total: u32 = groups.iter().map(|g| g.group_size).sum();
        assert_eq!(total, db_pages);

        // Group 0 is the dedicated header-page group.
        assert_eq!(groups[0].group_size, 1);
        assert_eq!(groups[0].repair, HEADER_PAGE_R_REPAIR);

        // General-group segment offsets must strictly increase.
        let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, page_size);
        let general_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, page_size);
        let mut prev_off = 0;
        #[allow(clippy::cast_possible_truncation)]
        let group_count = groups.len().saturating_sub(1) as u32;
        for g in 0..group_count {
            let off = segment_offset(g, seg1_len, general_seg_len);
            assert!(
                off > prev_off || g == 0,
                "offsets must be monotonically increasing"
            );
            prev_off = off;
        }
    }
1693
1694 #[test]
1697 fn prop_db_gen_digest_deterministic() {
1698 for i in 0..50_u32 {
1699 let d1 = compute_db_gen_digest(i, i * 10, i * 2, i * 3);
1700 let d2 = compute_db_gen_digest(i, i * 10, i * 2, i * 3);
1701 assert_eq!(d1, d2, "digest must be deterministic for i={i}");
1702 }
1703 }
1704
1705 #[test]
1708 fn prop_group_segment_sizes_consistent() {
1709 for ps in [512_u32, 1024, 4096, 8192, 16384, 32768, 65536] {
1710 let seg1 = group_segment_size(1, HEADER_PAGE_R_REPAIR, ps);
1711 let general_seg = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, ps);
1712
1713 assert!(seg1 < general_seg, "page-1 segment should be smaller");
1715
1716 let expected_seg1 = DbFecGroupMeta::serialized_size_for(1)
1718 + HEADER_PAGE_R_REPAIR as usize * ps as usize;
1719 assert_eq!(seg1, expected_seg1);
1720
1721 let expected_general_seg = DbFecGroupMeta::serialized_size_for(DEFAULT_GROUP_SIZE)
1722 + DEFAULT_R_REPAIR as usize * ps as usize;
1723 assert_eq!(general_seg, expected_general_seg);
1724 }
1725 }
1726
    /// Builds an in-memory database image that looks enough like a SQLite
    /// file for the sidecar generator: magic string, encoded page size,
    /// change counter, page count, freelist count, and schema cookie in
    /// the header, then deterministic filler bytes in every page body.
    fn make_synthetic_db(page_size: u32, page_count: u32) -> Vec<u8> {
        let ps = page_size as usize;
        let mut db = vec![0u8; ps * page_count as usize];
        db[..16].copy_from_slice(b"SQLite format 3\0");
        // SQLite encodes a 65536-byte page size as the value 1.
        #[allow(clippy::cast_possible_truncation)]
        let ps_enc: u16 = if page_size == 65536 {
            1
        } else {
            page_size as u16
        };
        db[PAGE_SIZE_OFFSET..PAGE_SIZE_OFFSET + 2].copy_from_slice(&ps_enc.to_be_bytes());
        db[CHANGE_COUNTER_OFFSET..CHANGE_COUNTER_OFFSET + 4].copy_from_slice(&1_u32.to_be_bytes());
        db[PAGE_COUNT_OFFSET..PAGE_COUNT_OFFSET + 4].copy_from_slice(&page_count.to_be_bytes());
        db[FREELIST_COUNT_OFFSET..FREELIST_COUNT_OFFSET + 4].copy_from_slice(&0_u32.to_be_bytes());
        db[SCHEMA_COOKIE_OFFSET..SCHEMA_COOKIE_OFFSET + 4].copy_from_slice(&42_u32.to_be_bytes());
        for pgno in 1..=page_count {
            let offset = (pgno as usize - 1) * ps;
            // Skip the 100-byte header region on page 1 so the fields
            // written above are preserved.
            let start = if pgno == 1 { 100 } else { 0 };
            for j in start..ps {
                #[allow(clippy::cast_possible_truncation)]
                {
                    db[offset + j] = ((pgno as usize * 37 + j * 13) & 0xFF) as u8;
                }
            }
        }
        db
    }
1756
1757 #[test]
1758 fn test_parse_db_header_fields() {
1759 let db = make_synthetic_db(4096, 10);
1760 let fields = parse_db_header_fields(&db).expect("parse");
1761 assert_eq!(fields.page_size, 4096);
1762 assert_eq!(fields.change_counter, 1);
1763 assert_eq!(fields.page_count, 10);
1764 assert_eq!(fields.freelist_count, 0);
1765 assert_eq!(fields.schema_cookie, 42);
1766 }
1767
1768 #[test]
1769 fn test_parse_db_header_too_short() {
1770 assert!(parse_db_header_fields(&[0u8; 50]).is_err());
1771 }
1772
1773 #[test]
1774 fn test_db_fec_path_for_db() {
1775 let p = db_fec_path_for_db(Path::new("/tmp/test.db"));
1776 assert_eq!(p, PathBuf::from("/tmp/test.db-fec"));
1777 }
1778
1779 #[test]
1780 fn test_generate_db_fec_sidecar_header_valid() {
1781 let db = make_synthetic_db(512, 5);
1782 let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1783 assert!(sidecar.len() >= DB_FEC_HEADER_SIZE);
1784 let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1785 hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1786 let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1787 assert_eq!(hdr.page_size, 512);
1788 assert!(hdr.is_current(1, 5, 0, 42));
1789 }
1790
    #[test]
    fn test_generate_and_read_group_roundtrip() {
        // Generate a sidecar for a 5-page db, then read both groups back
        // and verify their metadata and stored page hashes.
        let db = make_synthetic_db(512, 5);
        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
        // Page 1 resolves to the dedicated header-page group.
        let (meta1, syms1) = read_db_fec_group_for_page(&sidecar, &hdr, 1).expect("page 1 group");
        assert_eq!(meta1.start_pgno, 1);
        assert_eq!(meta1.group_size, 1);
        assert_eq!(meta1.r_repair, HEADER_PAGE_R_REPAIR);
        assert_eq!(syms1.len(), HEADER_PAGE_R_REPAIR as usize);
        // Page 2 resolves to the general group covering pages 2..=5.
        let (meta2, syms2) = read_db_fec_group_for_page(&sidecar, &hdr, 2).expect("page 2 group");
        assert_eq!(meta2.start_pgno, 2);
        assert_eq!(meta2.group_size, 4);
        assert_eq!(syms2.len(), DEFAULT_R_REPAIR as usize);
        // Every stored page hash must validate against the actual page.
        for i in 0..meta2.group_size {
            let page = read_page_from_bytes(&db, meta2.start_pgno + i, 512);
            assert!(verify_page_xxh3_128(
                &page,
                &meta2.source_page_xxh3_128[i as usize]
            ));
        }
    }
1815
    #[test]
    fn test_sidecar_encode_corrupt_decode_cycle() {
        // Full cycle: generate sidecar, overwrite one page on "disk",
        // detect the mismatch via the stored hash, and repair it.
        let ps = 512_usize;
        let mut db = make_synthetic_db(512, 5);
        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
        let target_pgno = 3_u32;
        // Snapshot the page before corrupting it in place.
        let original_page = read_page_from_bytes(&db, target_pgno, ps);
        let corrupt_offset = (target_pgno as usize - 1) * ps;
        for b in &mut db[corrupt_offset..corrupt_offset + ps] {
            *b = 0xDE;
        }
        let (meta, repair_symbols) =
            read_db_fec_group_for_page(&sidecar, &hdr, target_pgno).expect("read group");
        // The stored hash must now reject the corrupted page.
        let corrupted_data = read_page_from_bytes(&db, target_pgno, ps);
        let idx = (target_pgno - meta.start_pgno) as usize;
        assert!(!verify_page_xxh3_128(
            &corrupted_data,
            &meta.source_page_xxh3_128[idx]
        ));
        // Repair reads surviving pages straight from the mutated db image.
        let read_fn = |pgno: u32| -> Vec<u8> { read_page_from_bytes(&db, pgno, ps) };
        let (recovered, result) =
            attempt_page_repair(target_pgno, &meta, &read_fn, &repair_symbols)
                .expect("repair should succeed");
        assert_eq!(recovered, original_page);
        assert!(matches!(result, RepairResult::Repaired { pgno: 3, .. }));
    }
1845
    #[test]
    fn test_sidecar_header_page_repair() {
        // Page 1 sits alone in its own group; clobbering it entirely must
        // still be repairable from the sidecar's header-page symbols.
        let ps = 256_usize;
        let mut db = make_synthetic_db(256, 3);
        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
        // Snapshot page 1 before overwriting it in place.
        let original_page1 = read_page_from_bytes(&db, 1, ps);
        for b in &mut db[..ps] {
            *b = 0xCC;
        }
        let (meta, repair_symbols) =
            read_db_fec_group_for_page(&sidecar, &hdr, 1).expect("read group");
        assert_eq!(meta.group_size, 1);
        assert_eq!(meta.r_repair, 4);
        // K=1, so the only source page the repairer can read is corrupt.
        let read_fn = |_pgno: u32| -> Vec<u8> { read_page_from_bytes(&db, 1, ps) };
        let (recovered, _) =
            attempt_page_repair(1, &meta, &read_fn, &repair_symbols).expect("repair page 1");
        assert_eq!(recovered, original_page1);
    }
1867
1868 #[test]
1869 fn test_sidecar_stale_digest_detection() {
1870 let db = make_synthetic_db(512, 5);
1871 let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1872 let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1873 hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1874 let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1875 assert!(hdr.is_current(1, 5, 0, 42));
1876 assert!(!hdr.is_current(2, 5, 0, 42));
1877 assert!(!hdr.is_current(1, 6, 0, 42));
1878 }
1879
1880 #[test]
1881 fn test_sidecar_xxh3_validates_corruption() {
1882 let db = make_synthetic_db(512, 5);
1883 let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1884 let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1885 hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1886 let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1887 let (meta, _) = read_db_fec_group_for_page(&sidecar, &hdr, 3).expect("read");
1888 let page = read_page_from_bytes(&db, 3, 512);
1889 let idx = (3 - meta.start_pgno) as usize;
1890 assert!(verify_page_xxh3_128(&page, &meta.source_page_xxh3_128[idx]));
1891 let corrupt = vec![0xFF_u8; 512];
1892 assert!(!verify_page_xxh3_128(
1893 &corrupt,
1894 &meta.source_page_xxh3_128[idx]
1895 ));
1896 }
1897
    #[test]
    fn test_sidecar_large_db_128_pages() {
        // A 128-page db has three groups (1 / 2..=65 / 66..=128); verify
        // group lookup across all three and repair a page in the tail.
        let mut db = make_synthetic_db(512, 128);
        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
        let (m1, _) = read_db_fec_group_for_page(&sidecar, &hdr, 1).expect("page 1");
        assert_eq!(m1.group_size, 1);
        let (m2, _) = read_db_fec_group_for_page(&sidecar, &hdr, 30).expect("page 30");
        assert_eq!(m2.start_pgno, 2);
        assert_eq!(m2.group_size, 64);
        let (m3, _) = read_db_fec_group_for_page(&sidecar, &hdr, 100).expect("page 100");
        assert_eq!(m3.start_pgno, 66);
        assert_eq!(m3.group_size, 63);
        // Snapshot page 100, then overwrite it in place.
        let original = read_page_from_bytes(&db, 100, 512);
        let off = (100 - 1) * 512;
        for b in &mut db[off..off + 512] {
            *b = 0xBB;
        }
        let (meta, syms) = read_db_fec_group_for_page(&sidecar, &hdr, 100).expect("read");
        let read_fn = |pgno: u32| -> Vec<u8> { read_page_from_bytes(&db, pgno, 512) };
        let (recovered, _) =
            attempt_page_repair(100, &meta, &read_fn, &syms).expect("repair page 100");
        assert_eq!(recovered, original);
    }
1924
1925 #[test]
1926 fn test_sidecar_file_write_read_roundtrip() {
1927 let dir = tempfile::tempdir().expect("tempdir");
1928 let db_path = dir.path().join("test.db");
1929 let db = make_synthetic_db(512, 5);
1930 std::fs::write(&db_path, &db).expect("write db");
1931 let sidecar_path = write_db_fec_sidecar(&db_path).expect("write sidecar");
1932 assert_eq!(sidecar_path, db_fec_path_for_db(&db_path));
1933 assert!(sidecar_path.exists());
1934 let hdr = read_db_fec_header(&sidecar_path).expect("read header");
1935 assert_eq!(hdr.page_size, 512);
1936 assert!(hdr.is_current(1, 5, 0, 42));
1937 }
1938
1939 #[test]
1942 fn test_raptorq_encode_deterministic() {
1943 let page_size = 128_u32;
1944 let pages: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i + 1; page_size as usize]).collect();
1945 let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
1946 let digest = compute_db_gen_digest(1, 5, 0, 1);
1947 let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);
1948 let slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
1949 let r1 = compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("e1");
1950 let r2 = compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("e2");
1951 assert_eq!(r1, r2, "RaptorQ encoding must be deterministic");
1952 }
1953
1954 #[test]
1955 fn test_raptorq_encode_produces_correct_count() {
1956 let page_size = 64_u32;
1957 let pages: Vec<Vec<u8>> = (0..8_u8).map(|i| vec![i; page_size as usize]).collect();
1958 let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
1959 let digest = compute_db_gen_digest(1, 9, 0, 1);
1960 let meta = DbFecGroupMeta::new(page_size, 2, 8, 4, hashes, digest);
1961 let slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
1962 let syms =
1963 compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("encode");
1964 assert_eq!(syms.len(), 4, "should produce R=4 repair symbols");
1965 for sym in &syms {
1966 assert_eq!(sym.len(), page_size as usize, "symbol size = page_size");
1967 }
1968 }
1969
    #[test]
    fn test_raptorq_multi_corruption_recovery() {
        // Two corrupted pages inside a K=8/R=4 group; both must be
        // independently recoverable from the same repair symbols.
        let page_size = 128_u32;
        let k = 8_u32;
        let r = 4_u32;
        // Deterministic per-page content so recovery can be checked exactly.
        let pages: Vec<Vec<u8>> = (0..k)
            .map(|i| {
                let mut data = vec![0u8; page_size as usize];
                for (j, b) in data.iter_mut().enumerate() {
                    #[allow(clippy::cast_possible_truncation)]
                    {
                        *b = ((i as usize * 41 + j * 7) & 0xFF) as u8;
                    }
                }
                data
            })
            .collect();

        let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, k + 1, 0, 1);
        // Group spans pgnos 2..=9.
        let meta = DbFecGroupMeta::new(page_size, 2, k, r, hashes, digest);

        let slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
        let repair_data =
            compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("encode");
        // Repair-symbol ESIs start at K.
        let repair_symbols: Vec<(u32, Vec<u8>)> = repair_data
            .into_iter()
            .enumerate()
            .map(|(i, d)| (k + u32::try_from(i).expect("i fits u32"), d))
            .collect();

        // Corrupt pgnos 2 and 3 (group indices 0 and 1).
        let corrupt_pgnos = [2_u32, 3_u32];
        let corrupted = vec![0xDD_u8; page_size as usize];

        let read_fn = |pgno: u32| -> Vec<u8> {
            if corrupt_pgnos.contains(&pgno) {
                corrupted.clone()
            } else {
                pages[(pgno - 2) as usize].clone()
            }
        };

        let (recovered_p2, status) =
            attempt_page_repair(2, &meta, &read_fn, &repair_symbols).expect("repair page 2");
        assert_eq!(recovered_p2, pages[0]);
        assert!(matches!(status, RepairResult::Repaired { pgno: 2, .. }));

        let (recovered_p3, status) =
            attempt_page_repair(3, &meta, &read_fn, &repair_symbols).expect("repair page 3");
        assert_eq!(recovered_p3, pages[1]);
        assert!(matches!(status, RepairResult::Repaired { pgno: 3, .. }));
    }
2027
2028 #[test]
2029 fn test_raptorq_seed_differs_per_group() {
2030 let digest = compute_db_gen_digest(1, 200, 0, 42);
2031 let meta_a = DbFecGroupMeta::new(4096, 1, 1, 4, vec![[0u8; 16]], digest);
2032 let meta_b = DbFecGroupMeta::new(4096, 2, 64, 4, vec![[0u8; 16]; 64], digest);
2033 let seed_a = derive_db_fec_repair_seed(&meta_a);
2034 let seed_b = derive_db_fec_repair_seed(&meta_b);
2035 assert_ne!(
2036 seed_a, seed_b,
2037 "different groups must produce different seeds"
2038 );
2039 }
2040
2041 #[test]
2046 fn test_snapshot_fec_metrics_record_and_snapshot() {
2047 let m = SnapshotFecMetrics::new();
2048 m.record_encode(100, 4096);
2049 m.record_encode(64, 2048);
2050 let s = m.snapshot();
2051 assert_eq!(s.encoded_pages_total, 164);
2052 assert_eq!(s.sidecar_bytes_total, 6144);
2053 assert_eq!(s.encode_ops, 2);
2054 }
2055
2056 #[test]
2057 fn test_snapshot_fec_metrics_reset() {
2058 let m = SnapshotFecMetrics::new();
2059 m.record_encode(10, 500);
2060 m.reset();
2061 let s = m.snapshot();
2062 assert_eq!(s.encoded_pages_total, 0);
2063 assert_eq!(s.sidecar_bytes_total, 0);
2064 assert_eq!(s.encode_ops, 0);
2065 }
2066
2067 #[test]
2068 fn test_snapshot_fec_metrics_display() {
2069 let m = SnapshotFecMetrics::new();
2070 m.record_encode(42, 1024);
2071 let s = m.snapshot();
2072 let text = format!("{s}");
2073 assert!(text.contains("snapshot_fec_pages_encoded=42"));
2074 assert!(text.contains("sidecar_bytes=1024"));
2075 assert!(text.contains("encode_ops=1"));
2076 }
2077
2078 #[test]
2079 fn test_snapshot_fec_metrics_global_delta() {
2080 let before = GLOBAL_SNAPSHOT_FEC_METRICS.snapshot();
2082 GLOBAL_SNAPSHOT_FEC_METRICS.record_encode(7, 256);
2083 let after = GLOBAL_SNAPSHOT_FEC_METRICS.snapshot();
2084 assert_eq!(after.encoded_pages_total - before.encoded_pages_total, 7);
2085 assert_eq!(after.sidecar_bytes_total - before.sidecar_bytes_total, 256);
2086 assert_eq!(after.encode_ops - before.encode_ops, 1);
2087 }
2088}