Skip to main content

fsqlite_core/
db_fec.rs

1//! `.db-fec` sidecar — erasure-coded page storage for on-the-fly repair (§3.4.6, bd-1hi.18).
2//!
3//! Provides `DbFecHeader`, `DbFecGroupMeta`, page group partitioning (G=64, R=4),
4//! O(1) segment offset computation, stale-sidecar guard via `db_gen_digest`, and
5//! the read-path repair algorithm.
6//!
7//! Note: the sidecar generation and group-read helpers are intentionally public so
8//! the `fsqlite-e2e` recovery demos can validate end-to-end repair flows.
9
10use std::fmt;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13
14use fsqlite_error::{FrankenError, Result};
15use fsqlite_vfs::host_fs;
16use tracing::{Level, debug, error, info, span, warn};
17
18// ---------------------------------------------------------------------------
19// Constants
20// ---------------------------------------------------------------------------
21
/// Bead (work-item) identifier stamped on this module's tracing events.
const BEAD_ID: &str = "bd-1hi.18";

/// Magic bytes for `.db-fec` header.
pub const DB_FEC_MAGIC: [u8; 8] = *b"FSQLDFEC";

/// Magic bytes for group metadata.
pub const GROUP_META_MAGIC: [u8; 8] = *b"FSQLDGRP";

/// Current format version.
pub const DB_FEC_VERSION: u32 = 1;

/// Default pages per group (256 KiB blast radius at 4 KiB pages).
pub const DEFAULT_GROUP_SIZE: u32 = 64;

/// Default repair symbols per group (tolerates 4 corrupted pages per group).
pub const DEFAULT_R_REPAIR: u32 = 4;

/// Header page (page 1) gets special 400% redundancy: G=1, R=4.
pub const HEADER_PAGE_R_REPAIR: u32 = 4;

/// Domain separation string for `db_gen_digest`.
pub const DB_GEN_DIGEST_DOMAIN: &str = "fsqlite:compat:dbgen:v1";

/// Domain separation string for group `object_id`.
pub const GROUP_OBJECT_ID_DOMAIN: &str = "fsqlite:compat:db-fec-group:v1";

/// `DbFecHeader` serialized size: 8 (magic) + 4 (version) + 4 (page_size)
/// + 4 (default_group_size) + 4 (default_r_repair) + 4 (header_page_r_repair)
/// + 16 (db_gen_digest) + 8 (checksum) = 52 bytes.
pub const DB_FEC_HEADER_SIZE: usize = 52;
52
53// ---------------------------------------------------------------------------
54// Snapshot FEC metrics
55// ---------------------------------------------------------------------------
56
/// Global snapshot FEC metrics singleton.
pub static GLOBAL_SNAPSHOT_FEC_METRICS: SnapshotFecMetrics = SnapshotFecMetrics::new();

/// Atomic counters for snapshot page FEC encoding.
///
/// All counters are monotonic and use relaxed ordering: they are statistics,
/// with no cross-counter consistency requirement.
#[derive(Debug)]
pub struct SnapshotFecMetrics {
    /// Total pages encoded into FEC repair symbols.
    pub encoded_pages_total: AtomicU64,
    /// Total bytes of sidecar data generated.
    pub sidecar_bytes_total: AtomicU64,
    /// Total encoding operations.
    pub encode_ops: AtomicU64,
}

impl SnapshotFecMetrics {
    /// Create a zeroed metrics instance.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            encoded_pages_total: AtomicU64::new(0),
            sidecar_bytes_total: AtomicU64::new(0),
            encode_ops: AtomicU64::new(0),
        }
    }

    /// Record a snapshot FEC encoding operation: one op, `pages_encoded`
    /// pages, and `sidecar_bytes` bytes of generated sidecar data.
    pub fn record_encode(&self, pages_encoded: u64, sidecar_bytes: u64) {
        self.encode_ops.fetch_add(1, Ordering::Relaxed);
        self.encoded_pages_total
            .fetch_add(pages_encoded, Ordering::Relaxed);
        self.sidecar_bytes_total
            .fetch_add(sidecar_bytes, Ordering::Relaxed);
    }

    /// Take a point-in-time snapshot of all counters.
    ///
    /// Each counter is loaded independently (relaxed), so the snapshot is not
    /// guaranteed to be atomic across counters under concurrent recording.
    #[must_use]
    pub fn snapshot(&self) -> SnapshotFecMetricsSnapshot {
        SnapshotFecMetricsSnapshot {
            encoded_pages_total: self.encoded_pages_total.load(Ordering::Relaxed),
            sidecar_bytes_total: self.sidecar_bytes_total.load(Ordering::Relaxed),
            encode_ops: self.encode_ops.load(Ordering::Relaxed),
        }
    }

    /// Reset all counters to zero.
    pub fn reset(&self) {
        self.encoded_pages_total.store(0, Ordering::Relaxed);
        self.sidecar_bytes_total.store(0, Ordering::Relaxed);
        self.encode_ops.store(0, Ordering::Relaxed);
    }
}

impl Default for SnapshotFecMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// Point-in-time snapshot of snapshot FEC metrics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotFecMetricsSnapshot {
    /// Total pages encoded into FEC repair symbols at snapshot time.
    pub encoded_pages_total: u64,
    /// Total bytes of sidecar data generated at snapshot time.
    pub sidecar_bytes_total: u64,
    /// Total encoding operations at snapshot time.
    pub encode_ops: u64,
}

impl fmt::Display for SnapshotFecMetricsSnapshot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "snapshot_fec_pages_encoded={} sidecar_bytes={} encode_ops={}",
            self.encoded_pages_total, self.sidecar_bytes_total, self.encode_ops,
        )
    }
}
131
132// ---------------------------------------------------------------------------
133// PageGroup — partition of database pages into repair groups
134// ---------------------------------------------------------------------------
135
/// A contiguous group of database pages sharing a single repair-symbol set.
///
/// Covers the page range `start_pgno ..= start_pgno + group_size - 1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageGroup {
    /// 1-based first page number.
    pub start_pgno: u32,
    /// Number of source pages in this group (K).
    pub group_size: u32,
    /// Number of repair symbols (R).
    pub repair: u32,
}
146
147/// Partition pages into groups per spec pseudocode (§3.4.6).
148///
149/// Page 1 gets its own group with `HEADER_PAGE_R_REPAIR` repair symbols.
150/// Remaining pages are grouped in chunks of `DEFAULT_GROUP_SIZE`.
151#[must_use]
152pub fn partition_page_groups(db_size_pages: u32) -> Vec<PageGroup> {
153    if db_size_pages == 0 {
154        return Vec::new();
155    }
156
157    let mut groups = Vec::new();
158
159    // Special group for database header page.
160    groups.push(PageGroup {
161        start_pgno: 1,
162        group_size: 1,
163        repair: HEADER_PAGE_R_REPAIR,
164    });
165
166    let mut pgno: u32 = 2;
167    while pgno <= db_size_pages {
168        let remaining = db_size_pages - pgno + 1;
169        let group_size = remaining.min(DEFAULT_GROUP_SIZE);
170        groups.push(PageGroup {
171            start_pgno: pgno,
172            group_size,
173            repair: DEFAULT_R_REPAIR,
174        });
175        if let Some(next) = pgno.checked_add(group_size) {
176            pgno = next;
177        } else {
178            break;
179        }
180    }
181
182    debug!(
183        bead_id = BEAD_ID,
184        db_size_pages,
185        group_count = groups.len(),
186        "partitioned pages into .db-fec groups"
187    );
188
189    groups
190}
191
192// ---------------------------------------------------------------------------
193// db_gen_digest — staleness guard
194// ---------------------------------------------------------------------------
195
196/// Compute `db_gen_digest` from `.db` header fields.
197///
198/// Uses offsets 24, 28, 36, 40 (big-endian u32):
199/// `Trunc128(BLAKE3(domain || change_counter || page_count || freelist_count || schema_cookie))`.
200#[must_use]
201pub fn compute_db_gen_digest(
202    change_counter: u32,
203    page_count: u32,
204    freelist_count: u32,
205    schema_cookie: u32,
206) -> [u8; 16] {
207    let mut hasher = blake3::Hasher::new();
208    hasher.update(DB_GEN_DIGEST_DOMAIN.as_bytes());
209    hasher.update(&change_counter.to_be_bytes());
210    hasher.update(&page_count.to_be_bytes());
211    hasher.update(&freelist_count.to_be_bytes());
212    hasher.update(&schema_cookie.to_be_bytes());
213    let hash = hasher.finalize();
214    let mut digest = [0u8; 16];
215    digest.copy_from_slice(&hash.as_bytes()[..16]);
216    digest
217}
218
219// ---------------------------------------------------------------------------
220// DbFecHeader
221// ---------------------------------------------------------------------------
222
/// Header of the `.db-fec` sidecar file (byte offset 0).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbFecHeader {
    /// Must equal `DB_FEC_MAGIC`.
    pub magic: [u8; 8],
    /// Format version; must equal `DB_FEC_VERSION`.
    pub version: u32,
    /// Database page size in bytes.
    pub page_size: u32,
    /// Pages per full group (G).
    pub default_group_size: u32,
    /// Repair symbols per full group (R).
    pub default_r_repair: u32,
    /// Repair symbols for the dedicated page-1 group.
    pub header_page_r_repair: u32,
    /// Staleness guard derived from `.db` header fields; see `compute_db_gen_digest`.
    pub db_gen_digest: [u8; 16],
    /// xxh3_64 over the serialized header bytes preceding this field (0..44).
    pub checksum: u64,
}
235
236impl DbFecHeader {
237    /// Create a new header for the given page size and db generation fields.
238    #[must_use]
239    pub fn new(
240        page_size: u32,
241        change_counter: u32,
242        page_count: u32,
243        freelist_count: u32,
244        schema_cookie: u32,
245    ) -> Self {
246        let digest =
247            compute_db_gen_digest(change_counter, page_count, freelist_count, schema_cookie);
248        let mut hdr = Self {
249            magic: DB_FEC_MAGIC,
250            version: DB_FEC_VERSION,
251            page_size,
252            default_group_size: DEFAULT_GROUP_SIZE,
253            default_r_repair: DEFAULT_R_REPAIR,
254            header_page_r_repair: HEADER_PAGE_R_REPAIR,
255            db_gen_digest: digest,
256            checksum: 0,
257        };
258        hdr.checksum = hdr.compute_checksum();
259        hdr
260    }
261
262    /// Serialize to bytes.
263    #[must_use]
264    pub fn to_bytes(&self) -> [u8; DB_FEC_HEADER_SIZE] {
265        let mut buf = [0u8; DB_FEC_HEADER_SIZE];
266        buf[0..8].copy_from_slice(&self.magic);
267        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
268        buf[12..16].copy_from_slice(&self.page_size.to_le_bytes());
269        buf[16..20].copy_from_slice(&self.default_group_size.to_le_bytes());
270        buf[20..24].copy_from_slice(&self.default_r_repair.to_le_bytes());
271        buf[24..28].copy_from_slice(&self.header_page_r_repair.to_le_bytes());
272        buf[28..44].copy_from_slice(&self.db_gen_digest);
273        buf[44..52].copy_from_slice(&self.checksum.to_le_bytes());
274        buf
275    }
276
277    /// Deserialize from bytes.
278    pub fn from_bytes(buf: &[u8; DB_FEC_HEADER_SIZE]) -> Result<Self> {
279        let magic: [u8; 8] = buf[0..8].try_into().expect("slice len");
280        if magic != DB_FEC_MAGIC {
281            return Err(FrankenError::DatabaseCorrupt {
282                detail: format!("bad .db-fec magic: {magic:?}"),
283            });
284        }
285        let version = u32::from_le_bytes(buf[8..12].try_into().expect("slice len"));
286        if version != DB_FEC_VERSION {
287            return Err(FrankenError::DatabaseCorrupt {
288                detail: format!("unsupported .db-fec version: {version}"),
289            });
290        }
291        let page_size = u32::from_le_bytes(buf[12..16].try_into().expect("slice len"));
292        let default_group_size = u32::from_le_bytes(buf[16..20].try_into().expect("slice len"));
293        let default_r_repair = u32::from_le_bytes(buf[20..24].try_into().expect("slice len"));
294        let header_page_r_repair = u32::from_le_bytes(buf[24..28].try_into().expect("slice len"));
295        let mut db_gen_digest = [0u8; 16];
296        db_gen_digest.copy_from_slice(&buf[28..44]);
297        let checksum = u64::from_le_bytes(buf[44..52].try_into().expect("slice len"));
298
299        let hdr = Self {
300            magic,
301            version,
302            page_size,
303            default_group_size,
304            default_r_repair,
305            header_page_r_repair,
306            db_gen_digest,
307            checksum,
308        };
309
310        let expected = hdr.compute_checksum();
311        if hdr.checksum != expected {
312            return Err(FrankenError::DatabaseCorrupt {
313                detail: format!(
314                    ".db-fec header checksum mismatch: stored={:#x}, computed={expected:#x}",
315                    hdr.checksum
316                ),
317            });
318        }
319
320        info!(
321            bead_id = BEAD_ID,
322            page_size,
323            G_pages_per_group = default_group_size,
324            R_repair_pages = default_r_repair,
325            header_group_policy = header_page_r_repair,
326            format_version = version,
327            ".db-fec config on open"
328        );
329
330        Ok(hdr)
331    }
332
333    /// Compute xxh3_64 checksum of all fields preceding the checksum field.
334    #[must_use]
335    fn compute_checksum(&self) -> u64 {
336        let buf = self.to_bytes();
337        // Checksum covers bytes 0..44 (everything except the checksum field itself).
338        xxhash_rust::xxh3::xxh3_64(&buf[..44])
339    }
340
341    /// Verify that this header's `db_gen_digest` matches the current `.db` generation.
342    #[must_use]
343    pub fn is_current(
344        &self,
345        change_counter: u32,
346        page_count: u32,
347        freelist_count: u32,
348        schema_cookie: u32,
349    ) -> bool {
350        let current =
351            compute_db_gen_digest(change_counter, page_count, freelist_count, schema_cookie);
352        self.db_gen_digest == current
353    }
354}
355
356// ---------------------------------------------------------------------------
357// DbFecGroupMeta
358// ---------------------------------------------------------------------------
359
/// Per-group metadata stored in the `.db-fec` sidecar.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbFecGroupMeta {
    /// Must equal `GROUP_META_MAGIC`.
    pub magic: [u8; 8],
    /// Format version; must equal `DB_FEC_VERSION`.
    pub version: u32,
    /// Database page size in bytes.
    pub page_size: u32,
    /// 1-based first page number of the group.
    pub start_pgno: u32,
    /// Number of source pages in the group (K).
    pub group_size: u32,
    /// Number of repair symbols stored for the group (R).
    pub r_repair: u32,
    /// Content-addressed: `Trunc128(BLAKE3(domain || canonical))`.
    pub object_id: [u8; 16],
    /// Per-source-page xxh3_128 hashes; length == group_size.
    pub source_page_xxh3_128: Vec<[u8; 16]>,
    /// Must match `DbFecHeader.db_gen_digest`.
    pub db_gen_digest: [u8; 16],
    /// xxh3_64 over all serialized bytes preceding this field.
    pub checksum: u64,
}
377
impl DbFecGroupMeta {
    /// Create a new group meta. Computes `object_id` and `checksum` automatically.
    ///
    /// # Panics
    ///
    /// Panics if `source_page_xxh3_128.len() != group_size` — the per-page
    /// hash array and the declared group size must agree.
    #[must_use]
    pub fn new(
        page_size: u32,
        start_pgno: u32,
        group_size: u32,
        r_repair: u32,
        source_page_xxh3_128: Vec<[u8; 16]>,
        db_gen_digest: [u8; 16],
    ) -> Self {
        assert!(
            source_page_xxh3_128.len() == group_size as usize,
            "source_page_xxh3_128.len() must equal group_size"
        );
        let mut meta = Self {
            magic: GROUP_META_MAGIC,
            version: DB_FEC_VERSION,
            page_size,
            start_pgno,
            group_size,
            r_repair,
            object_id: [0u8; 16],
            source_page_xxh3_128,
            db_gen_digest,
            checksum: 0,
        };
        // Order matters: the checksum covers the serialized object_id bytes,
        // so object_id must be filled in before the checksum is computed.
        meta.object_id = meta.compute_object_id();
        meta.checksum = meta.compute_checksum();
        meta
    }

    /// Fixed-size portion of the serialized meta (excluding variable-length hash array).
    /// 8 (magic) + 4 (version) + 4 (page_size) + 4 (start_pgno) + 4 (group_size)
    /// + 4 (r_repair) + 16 (object_id) + 16 (db_gen_digest) + 8 (checksum) = 68.
    const FIXED_SIZE: usize = 68;

    /// Total serialized size.
    #[must_use]
    pub fn serialized_size(&self) -> usize {
        Self::FIXED_SIZE + self.source_page_xxh3_128.len() * 16
    }

    /// Serialized size for a group with the given `group_size`.
    #[must_use]
    pub fn serialized_size_for(group_size: u32) -> usize {
        // Saturating: an absurd group_size (from a corrupt meta) clamps to
        // usize::MAX instead of overflowing, and from_bytes then rejects it.
        (group_size as usize)
            .saturating_mul(16)
            .saturating_add(Self::FIXED_SIZE)
    }

    /// Serialize to bytes.
    ///
    /// Layout: magic[8] | version[4] | page_size[4] | start_pgno[4]
    /// | group_size[4] | r_repair[4] | object_id[16]
    /// | source_page_xxh3_128[group_size * 16] | db_gen_digest[16] | checksum[8].
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        let total = self.serialized_size();
        let mut buf = vec![0u8; total];
        buf[0..8].copy_from_slice(&self.magic);
        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
        buf[12..16].copy_from_slice(&self.page_size.to_le_bytes());
        buf[16..20].copy_from_slice(&self.start_pgno.to_le_bytes());
        buf[20..24].copy_from_slice(&self.group_size.to_le_bytes());
        buf[24..28].copy_from_slice(&self.r_repair.to_le_bytes());
        buf[28..44].copy_from_slice(&self.object_id);
        // Variable-length tail: one 16-byte hash per source page, then the
        // generation digest and trailing checksum.
        let hash_start = 44;
        for (i, h) in self.source_page_xxh3_128.iter().enumerate() {
            let off = hash_start + i * 16;
            buf[off..off + 16].copy_from_slice(h);
        }
        let digest_off = hash_start + self.source_page_xxh3_128.len() * 16;
        buf[digest_off..digest_off + 16].copy_from_slice(&self.db_gen_digest);
        buf[digest_off + 16..digest_off + 24].copy_from_slice(&self.checksum.to_le_bytes());
        buf
    }

    /// Deserialize from bytes.
    ///
    /// `buf` may extend past the meta (callers can hand in a whole segment);
    /// only the `serialized_size_for(group_size)` prefix is read. Validates
    /// magic, version, length, checksum, and the content-addressed object_id.
    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        // Stage 1: enough bytes for the fixed prelude?
        if buf.len() < Self::FIXED_SIZE {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("group meta too short: {} < {}", buf.len(), Self::FIXED_SIZE),
            });
        }
        let magic: [u8; 8] = buf[0..8].try_into().expect("slice len");
        if magic != GROUP_META_MAGIC {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("bad group meta magic: {magic:?}"),
            });
        }
        let version = u32::from_le_bytes(buf[8..12].try_into().expect("slice len"));
        if version != DB_FEC_VERSION {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("unsupported group meta version: {version}"),
            });
        }
        let page_size = u32::from_le_bytes(buf[12..16].try_into().expect("slice len"));
        let start_pgno = u32::from_le_bytes(buf[16..20].try_into().expect("slice len"));
        let group_size = u32::from_le_bytes(buf[20..24].try_into().expect("slice len"));
        let r_repair = u32::from_le_bytes(buf[24..28].try_into().expect("slice len"));
        let mut object_id = [0u8; 16];
        object_id.copy_from_slice(&buf[28..44]);

        // Stage 2: enough bytes for the declared group_size's hash array?
        let expected_total = Self::serialized_size_for(group_size);
        if buf.len() < expected_total {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "group meta truncated: {} < {expected_total} for group_size={group_size}",
                    buf.len()
                ),
            });
        }

        let hash_start = 44;
        let mut source_page_xxh3_128 = Vec::with_capacity(group_size as usize);
        for i in 0..group_size as usize {
            let off = hash_start + i * 16;
            let mut h = [0u8; 16];
            h.copy_from_slice(&buf[off..off + 16]);
            source_page_xxh3_128.push(h);
        }

        let digest_off = hash_start + group_size as usize * 16;
        let mut db_gen_digest = [0u8; 16];
        db_gen_digest.copy_from_slice(&buf[digest_off..digest_off + 16]);
        let checksum = u64::from_le_bytes(
            buf[digest_off + 16..digest_off + 24]
                .try_into()
                .expect("slice len"),
        );

        let meta = Self {
            magic,
            version,
            page_size,
            start_pgno,
            group_size,
            r_repair,
            object_id,
            source_page_xxh3_128,
            db_gen_digest,
            checksum,
        };

        // Stage 3: integrity — checksum first (covers object_id), then the
        // content address itself.
        let expected_cksum = meta.compute_checksum();
        if meta.checksum != expected_cksum {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "group meta checksum mismatch: stored={:#x}, computed={expected_cksum:#x}",
                    meta.checksum
                ),
            });
        }

        let expected_oid = meta.compute_object_id();
        if meta.object_id != expected_oid {
            return Err(FrankenError::DatabaseCorrupt {
                detail: "group meta object_id mismatch".into(),
            });
        }

        debug!(
            bead_id = BEAD_ID,
            group_idx = meta.start_pgno,
            pgno_start = meta.start_pgno,
            K = meta.group_size,
            R = meta.r_repair,
            "group meta validated"
        );

        Ok(meta)
    }

    /// Compute the content-addressed `object_id`:
    /// `Trunc128(BLAKE3(domain || canonical fields))`.
    #[must_use]
    fn compute_object_id(&self) -> [u8; 16] {
        let mut hasher = blake3::Hasher::new();
        hasher.update(GROUP_OBJECT_ID_DOMAIN.as_bytes());
        // Canonical representation: all fields except object_id and checksum.
        hasher.update(&self.magic);
        hasher.update(&self.version.to_le_bytes());
        hasher.update(&self.page_size.to_le_bytes());
        hasher.update(&self.start_pgno.to_le_bytes());
        hasher.update(&self.group_size.to_le_bytes());
        hasher.update(&self.r_repair.to_le_bytes());
        for h in &self.source_page_xxh3_128 {
            hasher.update(h);
        }
        hasher.update(&self.db_gen_digest);
        let hash = hasher.finalize();
        let mut oid = [0u8; 16];
        oid.copy_from_slice(&hash.as_bytes()[..16]);
        oid
    }

    /// Compute xxh3_64 checksum of all fields except the checksum field itself.
    #[must_use]
    fn compute_checksum(&self) -> u64 {
        let bytes = self.to_bytes();
        // Everything except the last 8 bytes (checksum field).
        xxhash_rust::xxh3::xxh3_64(&bytes[..bytes.len() - 8])
    }
}
578
579// ---------------------------------------------------------------------------
580// Segment layout — O(1) random access
581// ---------------------------------------------------------------------------
582
583/// Compute the byte offset in the `.db-fec` file for the segment belonging to
584/// the full-group at 0-based index `g` (groups starting at page 2).
585///
586/// Layout:
587///   \[DbFecHeader\]\[Seg1 (page 1)\]\[SegG\_0\]\[SegG\_1\]...
588///
589/// `segment_1_len`: The total byte size of the page-1 segment (meta + R repair symbols).
590/// `full_segment_len`: The total byte size of a full-group segment (meta + R repair symbols).
591#[must_use]
592pub fn segment_offset(g: u32, segment_1_len: usize, full_segment_len: usize) -> usize {
593    DB_FEC_HEADER_SIZE + segment_1_len + g as usize * full_segment_len
594}
595
596/// Compute the total size of a group segment.
597///
598/// Each segment stores its `DbFecGroupMeta` plus R repair symbols of `page_size` bytes each.
599#[must_use]
600pub fn group_segment_size(group_size: u32, r_repair: u32, page_size: u32) -> usize {
601    DbFecGroupMeta::serialized_size_for(group_size) + r_repair as usize * page_size as usize
602}
603
604/// Find which 0-based full-group index a page number belongs to.
605/// Returns `None` for page 1 (header group) or invalid pgno.
606#[must_use]
607pub fn find_full_group_index(pgno: u32) -> Option<u32> {
608    if pgno < 2 {
609        return None;
610    }
611    Some((pgno - 2) / DEFAULT_GROUP_SIZE)
612}
613
614// ---------------------------------------------------------------------------
615// Read path — on-the-fly repair
616// ---------------------------------------------------------------------------
617
/// Result of an on-the-fly repair attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairResult {
    /// Page was intact, no repair needed.
    Intact,
    /// Page was repaired from group erasure coding.
    /// `symbols_used` is the total symbol count fed to the decoder.
    Repaired { pgno: u32, symbols_used: u32 },
    /// Repair failed — insufficient symbols.
    Unrecoverable {
        /// The page that could not be recovered.
        pgno: u32,
        /// Number of missing or corrupt source pages in the group.
        missing_pages: u32,
        /// The group's repair-symbol budget (R).
        r_budget: u32,
    },
}
632
633/// Simulated page integrity check. In production this would check AEAD tag,
634/// page checksum, or structural integrity. Here we check xxh3_128 against
635/// the expected hash from group metadata.
636#[must_use]
637pub fn verify_page_xxh3_128(page_data: &[u8], expected_xxh3_128: &[u8; 16]) -> bool {
638    let hash = xxhash_rust::xxh3::xxh3_128(page_data);
639    hash.to_le_bytes() == *expected_xxh3_128
640}
641
642/// Compute xxh3_128 of page data, returned as a 16-byte LE array.
643#[must_use]
644pub fn page_xxh3_128(page_data: &[u8]) -> [u8; 16] {
645    let hash = xxhash_rust::xxh3::xxh3_128(page_data);
646    hash.to_le_bytes()
647}
648
649/// Attempt on-the-fly repair of a corrupted page using `.db-fec` group data.
650///
651/// `target_pgno` — the 1-based page to repair.
652/// `group_meta` — metadata for the group containing the page.
653/// `all_page_data` — function to read raw page data by pgno.
654/// `repair_symbols` — the R repair symbol data blocks for this group.
655///
656/// Uses the RFC 6330 RaptorQ `InactivationDecoder` to reconstruct missing
657/// source pages from any combination of intact sources and repair symbols,
658/// provided at least K total symbols are available.
659///
660/// Returns the repaired page bytes or an error.
661#[allow(clippy::too_many_lines)]
662pub fn attempt_page_repair(
663    target_pgno: u32,
664    group_meta: &DbFecGroupMeta,
665    all_page_data: &dyn Fn(u32) -> Vec<u8>,
666    repair_symbols: &[(u32, Vec<u8>)],
667) -> Result<(Vec<u8>, RepairResult)> {
668    let local_idx = target_pgno - group_meta.start_pgno;
669    let k = group_meta.group_size;
670
671    debug!(
672        bead_id = BEAD_ID,
673        target_pgno,
674        group_start = group_meta.start_pgno,
675        K = k,
676        R = group_meta.r_repair,
677        "attempting on-the-fly page repair"
678    );
679
680    // Collect available source symbols (intact pages in the group, excluding target).
681    let mut available: Vec<(u32, Vec<u8>)> = Vec::new();
682    let mut corrupt_count: u32 = 0;
683
684    for i in 0..k {
685        let pgno = group_meta.start_pgno + i;
686        if pgno == target_pgno {
687            corrupt_count += 1;
688            continue;
689        }
690        let data = all_page_data(pgno);
691        if verify_page_xxh3_128(&data, &group_meta.source_page_xxh3_128[i as usize]) {
692            available.push((i, data));
693        } else {
694            corrupt_count += 1;
695        }
696    }
697
698    // Add repair symbols.
699    for (esi, sym_data) in repair_symbols {
700        available.push((*esi, sym_data.clone()));
701    }
702
703    debug!(
704        bead_id = BEAD_ID,
705        target_pgno,
706        available_symbols = available.len(),
707        corrupt_count,
708        K = k,
709        "collected symbols for repair"
710    );
711
712    #[allow(clippy::cast_possible_truncation)]
713    let available_count = available.len() as u32;
714    if available_count < k {
715        error!(
716            bead_id = BEAD_ID,
717            target_pgno,
718            missing_or_corrupt_pages = corrupt_count,
719            R_budget = group_meta.r_repair,
720            action = "fail",
721            "unrecoverable group loss"
722        );
723        return Err(FrankenError::DatabaseCorrupt {
724            detail: format!(
725                "page {target_pgno}: insufficient symbols for repair ({} available, {k} needed, {corrupt_count} corrupt)",
726                available.len()
727            ),
728        });
729    }
730
731    // Decode via RFC 6330 RaptorQ InactivationDecoder.
732    let page_size = group_meta.page_size as usize;
733    let k_usize = k as usize;
734    let seed = derive_db_fec_repair_seed(group_meta);
735    let decoder = asupersync::raptorq::decoder::InactivationDecoder::new(k_usize, page_size, seed);
736    let params = decoder.params();
737    let base_rows = params.s + params.h;
738    let constraints = asupersync::raptorq::systematic::ConstraintMatrix::build(params, seed);
739
740    let mut received = decoder.constraint_symbols();
741
742    for (esi, data) in &available {
743        if (*esi as usize) < k_usize {
744            let (cols, coefs) = decoder.source_equation(*esi);
745            received.push(asupersync::raptorq::decoder::ReceivedSymbol {
746                esi: *esi,
747                is_source: true,
748                columns: cols,
749                coefficients: coefs,
750                data: data.clone(),
751            });
752        } else {
753            let (cols, coefs) = decoder.repair_equation(*esi);
754            received.push(asupersync::raptorq::decoder::ReceivedSymbol::repair(
755                *esi,
756                cols,
757                coefs,
758                data.clone(),
759            ));
760        }
761    }
762
763    // RFC 6330 decode requires K' source-domain rows; PI rows (K'−K) are
764    // zero-padded source symbols that must be represented explicitly.
765    for source_index in k_usize..params.k_prime {
766        let row = base_rows + source_index;
767        let mut columns = Vec::new();
768        let mut coefficients = Vec::new();
769        for col in 0..constraints.cols {
770            let coeff = constraints.get(row, col);
771            if !coeff.is_zero() {
772                columns.push(col);
773                coefficients.push(coeff);
774            }
775        }
776        received.push(asupersync::raptorq::decoder::ReceivedSymbol {
777            esi: u32::try_from(source_index).expect("source index fits u32"),
778            is_source: true,
779            columns,
780            coefficients,
781            data: vec![0_u8; page_size],
782        });
783    }
784
785    let result = decoder
786        .decode(&received)
787        .map_err(|err| FrankenError::DatabaseCorrupt {
788            detail: format!("page {target_pgno}: RaptorQ decode failed: {err:?}"),
789        })?;
790
791    if result.source.len() != k_usize {
792        return Err(FrankenError::DatabaseCorrupt {
793            detail: format!(
794                "page {target_pgno}: RaptorQ decode returned {} source symbols, expected {k}",
795                result.source.len()
796            ),
797        });
798    }
799
800    let recovered = result.source[local_idx as usize].clone();
801
802    // Validate recovered page.
803    if verify_page_xxh3_128(
804        &recovered,
805        &group_meta.source_page_xxh3_128[local_idx as usize],
806    ) {
807        info!(
808            bead_id = BEAD_ID,
809            target_pgno,
810            group_start = group_meta.start_pgno,
811            pages_repaired = 1,
812            symbols_used = available.len(),
813            "successful on-the-fly page repair"
814        );
815        Ok((
816            recovered,
817            RepairResult::Repaired {
818                pgno: target_pgno,
819                symbols_used: available_count,
820            },
821        ))
822    } else {
823        warn!(
824            bead_id = BEAD_ID,
825            target_pgno,
826            missing_or_corrupt_pages = corrupt_count,
827            R_budget = group_meta.r_repair,
828            "near-capacity repair: recovered page xxh3 mismatch"
829        );
830        Err(FrankenError::DatabaseCorrupt {
831            detail: format!("page {target_pgno}: recovered page failed xxh3_128 validation"),
832        })
833    }
834}
835
836// ---------------------------------------------------------------------------
837// Sidecar generation utility (bd-2r4z)
838// ---------------------------------------------------------------------------
839
/// Compute the `.db-fec` sidecar path from a database path.
///
/// The sidecar lives next to the database: `foo.db` maps to `foo.db-fec`.
#[must_use]
pub fn db_fec_path_for_db(db_path: &Path) -> PathBuf {
    let mut sidecar = db_path.as_os_str().to_owned();
    sidecar.push("-fec");
    sidecar.into()
}
847
/// SQLite header field offsets (big-endian u32/u16).
const SQLITE_HEADER_MIN_BYTES: usize = 100; // SQLite file header is exactly 100 bytes.
const PAGE_SIZE_OFFSET: usize = 16; // u16 page size; stored value 1 encodes 65536.
const CHANGE_COUNTER_OFFSET: usize = 24; // u32 file change counter.
const PAGE_COUNT_OFFSET: usize = 28; // u32 database size in pages.
const FREELIST_COUNT_OFFSET: usize = 36; // u32 number of freelist pages.
const SCHEMA_COOKIE_OFFSET: usize = 40; // u32 schema cookie.
855
/// Fields extracted from a SQLite database header for FEC generation.
#[derive(Debug, Clone, Copy)]
pub struct DbHeaderFields {
    /// Page size in bytes; the raw on-disk u16 value 1 is decoded as 65536.
    pub page_size: u32,
    /// File change counter (header offset 24).
    pub change_counter: u32,
    /// Database size in pages (header offset 28).
    pub page_count: u32,
    /// Number of freelist pages (header offset 36).
    pub freelist_count: u32,
    /// Schema cookie (header offset 40).
    pub schema_cookie: u32,
}
865
866/// Read the header fields from a SQLite database file.
867pub fn read_db_header_fields(db_path: &Path) -> Result<DbHeaderFields> {
868    let data = host_fs::read(db_path)?;
869    parse_db_header_fields(&data)
870}
871
872/// Parse header fields from raw database bytes.
873pub fn parse_db_header_fields(data: &[u8]) -> Result<DbHeaderFields> {
874    if data.len() < SQLITE_HEADER_MIN_BYTES {
875        return Err(FrankenError::DatabaseCorrupt {
876            detail: format!(
877                "database too short for header: {} < {SQLITE_HEADER_MIN_BYTES}",
878                data.len()
879            ),
880        });
881    }
882
883    let page_size_raw = u16::from_be_bytes(
884        data[PAGE_SIZE_OFFSET..PAGE_SIZE_OFFSET + 2]
885            .try_into()
886            .expect("fixed-length slice"),
887    );
888    // SQLite encoding: 1 means 65536.
889    let page_size = if page_size_raw == 1 {
890        65536
891    } else {
892        u32::from(page_size_raw)
893    };
894
895    let change_counter = u32::from_be_bytes(
896        data[CHANGE_COUNTER_OFFSET..CHANGE_COUNTER_OFFSET + 4]
897            .try_into()
898            .expect("fixed-length slice"),
899    );
900    let page_count = u32::from_be_bytes(
901        data[PAGE_COUNT_OFFSET..PAGE_COUNT_OFFSET + 4]
902            .try_into()
903            .expect("fixed-length slice"),
904    );
905    let freelist_count = u32::from_be_bytes(
906        data[FREELIST_COUNT_OFFSET..FREELIST_COUNT_OFFSET + 4]
907            .try_into()
908            .expect("fixed-length slice"),
909    );
910    let schema_cookie = u32::from_be_bytes(
911        data[SCHEMA_COOKIE_OFFSET..SCHEMA_COOKIE_OFFSET + 4]
912            .try_into()
913            .expect("fixed-length slice"),
914    );
915
916    Ok(DbHeaderFields {
917        page_size,
918        change_counter,
919        page_count,
920        freelist_count,
921        schema_cookie,
922    })
923}
924
925/// Derive a deterministic RaptorQ encoder seed from group metadata.
926///
927/// Uses xxh3_64 over the group's content-addressed fields to produce a
928/// seed that is unique per group and deterministic across encode/decode.
929fn derive_db_fec_repair_seed(meta: &DbFecGroupMeta) -> u64 {
930    let mut seed_material = Vec::with_capacity(16 + 4 * 4 + 16);
931    seed_material.extend_from_slice(&meta.object_id);
932    seed_material.extend_from_slice(&meta.page_size.to_le_bytes());
933    seed_material.extend_from_slice(&meta.start_pgno.to_le_bytes());
934    seed_material.extend_from_slice(&meta.group_size.to_le_bytes());
935    seed_material.extend_from_slice(&meta.r_repair.to_le_bytes());
936    seed_material.extend_from_slice(&meta.db_gen_digest);
937    xxhash_rust::xxh3::xxh3_64(&seed_material)
938}
939
940/// Compute RFC 6330 RaptorQ repair symbols for a group of source pages.
941///
942/// Uses `asupersync::raptorq::systematic::SystematicEncoder` to produce
943/// `r_repair` repair symbols with ESIs `[K, K+R)`.
944pub fn compute_raptorq_repair_symbols(
945    meta: &DbFecGroupMeta,
946    source_pages: &[&[u8]],
947    page_size: usize,
948) -> Result<Vec<Vec<u8>>> {
949    if source_pages.len() != meta.group_size as usize {
950        return Err(FrankenError::DatabaseCorrupt {
951            detail: format!(
952                "source_pages.len()={} != meta.group_size={}; encoder/decoder seed mismatch would corrupt data",
953                source_pages.len(),
954                meta.group_size,
955            ),
956        });
957    }
958    let seed = derive_db_fec_repair_seed(meta);
959    let source_vecs: Vec<Vec<u8>> = source_pages.iter().map(|s| s.to_vec()).collect();
960    let encoder =
961        asupersync::raptorq::systematic::SystematicEncoder::new(&source_vecs, page_size, seed)
962            .ok_or_else(|| FrankenError::DatabaseCorrupt {
963                detail: "RaptorQ constraint matrix singular during encoding".to_owned(),
964            })?;
965
966    let k = u32::try_from(source_pages.len()).map_err(|_| FrankenError::DatabaseCorrupt {
967        detail: "source page count does not fit in u32".to_owned(),
968    })?;
969
970    let mut symbols = Vec::with_capacity(meta.r_repair as usize);
971    for r_idx in 0..meta.r_repair {
972        let esi = k + r_idx;
973        symbols.push(encoder.repair_symbol(esi));
974    }
975    Ok(symbols)
976}
977
/// Read a single page from raw database bytes, zero-padding if file is short.
///
/// `pgno` is 1-based. `pgno == 0` is invalid input; it is treated like page 1
/// via `saturating_sub` instead of underflowing (the previous `pgno - 1` would
/// panic in debug builds). A page wholly or partially past the end of
/// `db_data` is zero-padded up to `page_size` bytes.
fn read_page_from_bytes(db_data: &[u8], pgno: u32, page_size: usize) -> Vec<u8> {
    let offset_u64 = u64::from(pgno).saturating_sub(1) * (page_size as u64);
    // An offset too large for usize saturates to usize::MAX, which safely
    // falls past the buffer and yields an all-zero page below.
    let offset = usize::try_from(offset_u64).unwrap_or(usize::MAX);
    let mut page = vec![0u8; page_size];
    if let Some(tail) = db_data.get(offset..) {
        let available = tail.len().min(page_size);
        page[..available].copy_from_slice(&tail[..available]);
    }
    page
}
993
/// Generate a complete `.db-fec` sidecar from raw database bytes.
///
/// Returns the sidecar file content as a byte vector. The layout is:
/// `[DbFecHeader][Seg_page1][Seg_group0][Seg_group1]...`
///
/// Each general segment is padded to `full_segment_len` for O(1) random access.
///
/// # Errors
///
/// Fails when the SQLite header cannot be parsed from `db_data` or when
/// RaptorQ encoding of any group fails.
#[allow(clippy::too_many_lines)]
pub fn generate_db_fec_from_bytes(db_data: &[u8]) -> Result<Vec<u8>> {
    let fields = parse_db_header_fields(db_data)?;
    let ps = fields.page_size as usize;

    let header = DbFecHeader::new(
        fields.page_size,
        fields.change_counter,
        fields.page_count,
        fields.freelist_count,
        fields.schema_cookie,
    );
    // Stale-sidecar guard: every group embeds the same generation digest.
    let digest = header.db_gen_digest;
    let groups = partition_page_groups(fields.page_count);

    // Pre-compute segment sizes for O(1) layout.
    let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, fields.page_size);
    let full_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, fields.page_size);

    // Total sidecar size: header + seg1 + (num_general_groups * full_seg_len).
    let num_general_groups = groups.len().saturating_sub(1);
    let total_size = if groups.is_empty() {
        DB_FEC_HEADER_SIZE
    } else {
        DB_FEC_HEADER_SIZE + seg1_len + num_general_groups * full_seg_len
    };
    // Zero-initialized so segment padding needs no explicit writes later.
    let mut sidecar = vec![0u8; total_size];

    // Write header.
    sidecar[..DB_FEC_HEADER_SIZE].copy_from_slice(&header.to_bytes());

    // `cursor` tracks the next write position; segments are laid out in group order.
    let mut cursor = DB_FEC_HEADER_SIZE;

    for (gi, group) in groups.iter().enumerate() {
        // Read source pages.
        let source_refs: Vec<Vec<u8>> = (0..group.group_size)
            .map(|i| read_page_from_bytes(db_data, group.start_pgno + i, ps))
            .collect();
        let source_slices: Vec<&[u8]> = source_refs.iter().map(Vec::as_slice).collect();

        // Compute per-page hashes.
        let hashes: Vec<[u8; 16]> = source_slices.iter().map(|p| page_xxh3_128(p)).collect();

        // Build group metadata.
        let meta = DbFecGroupMeta::new(
            fields.page_size,
            group.start_pgno,
            group.group_size,
            group.repair,
            hashes,
            digest,
        );

        // Compute repair symbols.
        let repair_symbols = compute_raptorq_repair_symbols(&meta, &source_slices, ps)?;

        // Write metadata.
        let meta_bytes = meta.to_bytes();
        sidecar[cursor..cursor + meta_bytes.len()].copy_from_slice(&meta_bytes);
        cursor += meta_bytes.len();

        // Write repair symbols.
        for sym in &repair_symbols {
            sidecar[cursor..cursor + ps].copy_from_slice(sym);
            cursor += ps;
        }

        // Pad general segments to full_seg_len for O(1) access.
        // gi == 0 is the page-1 segment, which has its own fixed size and is
        // never padded to the general-segment length.
        if gi > 0 {
            let actual_seg_size = meta_bytes.len() + group.repair as usize * ps;
            let padding = full_seg_len - actual_seg_size;
            cursor += padding; // Already zeroed by vec![0u8; total_size].
        }
    }

    let sidecar_len = sidecar.len() as u64;
    let page_count_u64 = u64::from(fields.page_count);

    // Structured tracing span for snapshot FEC encoding.
    // NOTE(review): the span is entered after the encode loop, so it only
    // covers the metrics/log emission below, not the encoding itself.
    let _span = span!(
        Level::INFO,
        "snapshot_raptorq",
        pages_encoded = page_count_u64,
        total_bytes = sidecar_len,
        groups = groups.len(),
    )
    .entered();

    GLOBAL_SNAPSHOT_FEC_METRICS.record_encode(page_count_u64, sidecar_len);

    info!(
        bead_id = "bd-2r4z",
        page_count = fields.page_count,
        page_size = fields.page_size,
        groups = groups.len(),
        sidecar_bytes = sidecar.len(),
        "generated .db-fec sidecar"
    );

    Ok(sidecar)
}
1101
1102/// Generate a `.db-fec` sidecar for a database file path.
1103pub fn generate_db_fec_sidecar(db_path: &Path) -> Result<Vec<u8>> {
1104    let db_data = host_fs::read(db_path)?;
1105    generate_db_fec_from_bytes(&db_data)
1106}
1107
1108/// Generate and write a `.db-fec` sidecar file, returning the sidecar path.
1109pub fn write_db_fec_sidecar(db_path: &Path) -> Result<PathBuf> {
1110    let sidecar_data = generate_db_fec_sidecar(db_path)?;
1111    let sidecar_path = db_fec_path_for_db(db_path);
1112    host_fs::write(&sidecar_path, &sidecar_data)?;
1113
1114    info!(
1115        bead_id = "bd-2r4z",
1116        db_path = %db_path.display(),
1117        sidecar_path = %sidecar_path.display(),
1118        sidecar_bytes = sidecar_data.len(),
1119        "wrote .db-fec sidecar"
1120    );
1121
1122    Ok(sidecar_path)
1123}
1124
1125/// Read the [`DbFecHeader`] from a `.db-fec` sidecar file.
1126pub fn read_db_fec_header(sidecar_path: &Path) -> Result<DbFecHeader> {
1127    let data = host_fs::read(sidecar_path)?;
1128    if data.len() < DB_FEC_HEADER_SIZE {
1129        return Err(FrankenError::DatabaseCorrupt {
1130            detail: format!(
1131                "sidecar too short for header: {} < {DB_FEC_HEADER_SIZE}",
1132                data.len()
1133            ),
1134        });
1135    }
1136    let buf: [u8; DB_FEC_HEADER_SIZE] = data[..DB_FEC_HEADER_SIZE]
1137        .try_into()
1138        .expect("fixed-length slice");
1139    DbFecHeader::from_bytes(&buf)
1140}
1141
/// Read group metadata and repair symbols for a target page from sidecar bytes.
///
/// Returns `(group_meta, repair_symbols)` where repair symbols are `(esi, data)` pairs
/// compatible with [`attempt_page_repair`].
///
/// # Errors
///
/// Returns `DatabaseCorrupt` when `target_pgno` is invalid or when the sidecar
/// is too short for the segment, its metadata, or its repair symbols.
#[allow(clippy::type_complexity)]
pub fn read_db_fec_group_for_page(
    sidecar_data: &[u8],
    header: &DbFecHeader,
    target_pgno: u32,
) -> Result<(DbFecGroupMeta, Vec<(u32, Vec<u8>)>)> {
    let ps = header.page_size as usize;

    // Determine which segment to read.
    // Page 1 has its own fixed segment directly after the file header; all
    // other pages map to a general segment via the O(1) offset formula.
    let (seg_offset, group_size_hint) = if target_pgno == 1 {
        (DB_FEC_HEADER_SIZE, 1_u32)
    } else {
        let gi =
            find_full_group_index(target_pgno).ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: format!("invalid target page number: {target_pgno}"),
            })?;
        let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, header.page_size);
        let full_seg_len =
            group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, header.page_size);
        let offset = segment_offset(gi, seg1_len, full_seg_len);
        (offset, DEFAULT_GROUP_SIZE)
    };

    if seg_offset >= sidecar_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "sidecar too short for segment at offset {seg_offset}: len={}",
                sidecar_data.len()
            ),
        });
    }

    // Read group metadata (variable-length due to hash array).
    // `meta_size` is sized from the full-group hint; the decoded meta reports
    // its actual (possibly smaller, for a partial last group) size below.
    let meta_size = DbFecGroupMeta::serialized_size_for(group_size_hint);
    let meta_end = seg_offset + meta_size;
    if meta_end > sidecar_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "sidecar truncated reading group meta at {seg_offset}: need {meta_size}, have {}",
                sidecar_data.len() - seg_offset
            ),
        });
    }
    let meta = DbFecGroupMeta::from_bytes(&sidecar_data[seg_offset..meta_end])?;

    let actual_r = meta.r_repair;
    let actual_meta_size = meta.serialized_size();
    let mut sym_cursor = seg_offset + actual_meta_size;

    // Defend against OOM from malicious r_repair values.
    let needed_repair_bytes = (actual_r as usize).saturating_mul(ps);
    if sym_cursor.saturating_add(needed_repair_bytes) > sidecar_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!("sidecar too short for {} repair symbols", actual_r),
        });
    }

    // Read repair symbols.
    let mut symbols = Vec::with_capacity(actual_r as usize);
    for r_idx in 0..actual_r {
        // Redundant with the aggregate bound check above; kept as a defensive guard.
        if sym_cursor + ps > sidecar_data.len() {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!("sidecar truncated reading repair symbol {r_idx} at {sym_cursor}"),
            });
        }
        // Repair ESIs start at K (= group_size), matching the encoder's ESIs [K, K+R).
        let esi = meta.group_size + r_idx;
        symbols.push((esi, sidecar_data[sym_cursor..sym_cursor + ps].to_vec()));
        sym_cursor += ps;
    }

    debug!(
        bead_id = "bd-2r4z",
        target_pgno,
        group_start = meta.start_pgno,
        K = meta.group_size,
        R = actual_r,
        "read .db-fec group for repair"
    );

    Ok((meta, symbols))
}
1227
1228// ---------------------------------------------------------------------------
1229// Tests
1230// ---------------------------------------------------------------------------
1231
1232#[cfg(test)]
1233mod tests {
1234    use super::*;
1235
1236    // -- DbFecHeader tests --
1237
    #[test]
    fn test_db_fec_header_roundtrip() {
        // to_bytes/from_bytes must be a lossless round-trip at the fixed size.
        let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
        let bytes = hdr.to_bytes();
        assert_eq!(bytes.len(), DB_FEC_HEADER_SIZE);
        let decoded = DbFecHeader::from_bytes(&bytes).expect("decode");
        assert_eq!(hdr, decoded);
    }

    #[test]
    fn test_db_gen_digest_computation() {
        // The digest must be deterministic and sensitive to each input field.
        // Known inputs.
        let d1 = compute_db_gen_digest(42, 100, 5, 99);
        let d2 = compute_db_gen_digest(42, 100, 5, 99);
        assert_eq!(d1, d2, "deterministic");

        // Changing any field changes the digest.
        let d3 = compute_db_gen_digest(43, 100, 5, 99);
        assert_ne!(d1, d3);
        let d4 = compute_db_gen_digest(42, 101, 5, 99);
        assert_ne!(d1, d4);
        let d5 = compute_db_gen_digest(42, 100, 6, 99);
        assert_ne!(d1, d5);
        let d6 = compute_db_gen_digest(42, 100, 5, 100);
        assert_ne!(d1, d6);
    }

    #[test]
    fn test_stale_sidecar_detection() {
        // is_current compares the stored digest against fresh header fields.
        let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
        assert!(hdr.is_current(42, 100, 5, 99));
        // Mismatched db_gen_digest -> sidecar ignored.
        assert!(!hdr.is_current(43, 100, 5, 99));
        assert!(!hdr.is_current(42, 101, 5, 99));
    }

    #[test]
    fn test_db_fec_header_bad_checksum() {
        // Flipping a byte in the trailing checksum must make decode fail.
        let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
        let mut bytes = hdr.to_bytes();
        // Corrupt checksum (checksum occupies bytes 44..52 of the 52-byte header).
        bytes[44] ^= 0xFF;
        let result = DbFecHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    #[test]
    fn test_db_fec_header_bad_magic() {
        // A corrupted magic byte must be rejected on decode.
        let hdr = DbFecHeader::new(4096, 42, 100, 5, 99);
        let mut bytes = hdr.to_bytes();
        bytes[0] = b'X';
        let result = DbFecHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }
1292
    // -- Page group partitioning tests --

    #[test]
    fn test_page_group_partitioning_single_page() {
        // A 1-page database is just the special header-page group.
        let groups = partition_page_groups(1);
        assert_eq!(groups.len(), 1);
        assert_eq!(
            groups[0],
            PageGroup {
                start_pgno: 1,
                group_size: 1,
                repair: HEADER_PAGE_R_REPAIR
            }
        );
    }

    #[test]
    fn test_page_group_partitioning_64_pages() {
        // 64 pages: header group plus one partial general group (pages 2-64).
        let groups = partition_page_groups(64);
        assert_eq!(groups.len(), 2);
        // Page 1 special.
        assert_eq!(groups[0].start_pgno, 1);
        assert_eq!(groups[0].group_size, 1);
        assert_eq!(groups[0].repair, HEADER_PAGE_R_REPAIR);
        // Pages 2-64.
        assert_eq!(groups[1].start_pgno, 2);
        assert_eq!(groups[1].group_size, 63);
        assert_eq!(groups[1].repair, DEFAULT_R_REPAIR);
    }

    #[test]
    fn test_page_group_partitioning_65_pages() {
        // 65 pages fills the first general group exactly (pages 2-65).
        let groups = partition_page_groups(65);
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[1].start_pgno, 2);
        assert_eq!(groups[1].group_size, 64);
        assert_eq!(groups[1].repair, DEFAULT_R_REPAIR);
    }

    #[test]
    fn test_page_group_partitioning_128_pages() {
        // 128 pages: header group, one full general group, one partial tail.
        let groups = partition_page_groups(128);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups[0].start_pgno, 1);
        assert_eq!(groups[0].group_size, 1);
        assert_eq!(groups[1].start_pgno, 2);
        assert_eq!(groups[1].group_size, 64);
        assert_eq!(groups[2].start_pgno, 66);
        assert_eq!(groups[2].group_size, 63);
    }

    #[test]
    fn test_page_group_partitioning_1000_pages() {
        // Larger database: group count formula and full page coverage.
        let groups = partition_page_groups(1000);
        // Page 1 + ceil((1000-1)/64) = 1 + 16 = 17 groups.
        assert_eq!(groups.len(), 17);
        assert_eq!(groups[0].group_size, 1);
        // Verify all pages covered.
        let total_pages: u32 = groups.iter().map(|g| g.group_size).sum();
        assert_eq!(total_pages, 1000);
    }

    #[test]
    fn test_page_group_partitioning_zero() {
        // An empty database produces no groups.
        let groups = partition_page_groups(0);
        assert!(groups.is_empty());
    }

    #[test]
    fn test_header_page_400pct_redundancy() {
        // The header page always gets its own group with maximal redundancy.
        let groups = partition_page_groups(100);
        // Page 1 group: G=1, R=4 -> 400% redundancy.
        assert_eq!(groups[0].group_size, 1);
        assert_eq!(groups[0].repair, 4);
    }
1368
    // -- Segment offset tests --

    #[test]
    fn test_segment_offset_o1() {
        // Offsets must follow the closed-form layout:
        // header + seg1 + g * general_seg_len, with no per-group scanning.
        let page_size: u32 = 4096;
        let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, page_size);
        let general_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, page_size);

        // Sequential layout check.
        for g in 0..10_u32 {
            let off = segment_offset(g, seg1_len, general_seg_len);
            let expected = DB_FEC_HEADER_SIZE + seg1_len + g as usize * general_seg_len;
            assert_eq!(off, expected, "segment offset mismatch for g={g}");
        }
    }
1384
    // -- DbFecGroupMeta tests --

    #[test]
    fn test_group_meta_roundtrip() {
        // Group metadata must survive to_bytes/from_bytes unchanged.
        let hashes: Vec<[u8; 16]> = (0..4)
            .map(|i| {
                let mut h = [0u8; 16];
                h[0] = i;
                h
            })
            .collect();
        let digest = compute_db_gen_digest(1, 100, 0, 42);
        let meta = DbFecGroupMeta::new(4096, 2, 4, 4, hashes, digest);
        let bytes = meta.to_bytes();
        let decoded = DbFecGroupMeta::from_bytes(&bytes).expect("decode");
        assert_eq!(meta, decoded);
    }

    #[test]
    fn test_group_meta_object_id() {
        let hashes: Vec<[u8; 16]> = (0..2)
            .map(|i| {
                let mut h = [0u8; 16];
                h[0] = i;
                h
            })
            .collect();
        let digest = compute_db_gen_digest(1, 100, 0, 42);
        let meta = DbFecGroupMeta::new(4096, 2, 2, 4, hashes, digest);

        // object_id must be deterministic and content-addressed.
        let oid = meta.object_id;
        assert_ne!(oid, [0u8; 16], "object_id should be non-zero");

        // Changing a hash changes the object_id.
        let mut hashes2: Vec<[u8; 16]> = (0..2)
            .map(|i| {
                let mut h = [0u8; 16];
                h[0] = i;
                h
            })
            .collect();
        hashes2[0][1] = 0xFF;
        let meta2 = DbFecGroupMeta::new(4096, 2, 2, 4, hashes2, digest);
        assert_ne!(meta.object_id, meta2.object_id);
    }

    #[test]
    fn test_group_meta_stale_guard() {
        // A digest computed from different header fields must differ from the
        // one embedded in the group metadata.
        let hashes = vec![[0u8; 16]; 1];
        let digest = compute_db_gen_digest(1, 100, 0, 42);
        let meta = DbFecGroupMeta::new(4096, 1, 1, 4, hashes, digest);

        let stale_digest = compute_db_gen_digest(2, 100, 0, 42);
        // Group meta with mismatched db_gen_digest should be ignored.
        assert_ne!(meta.db_gen_digest, stale_digest);
    }

    #[test]
    fn test_group_meta_bad_checksum() {
        // Flipping the trailing checksum byte must make decoding fail.
        let hashes = vec![[1u8; 16]; 2];
        let digest = compute_db_gen_digest(1, 100, 0, 42);
        let meta = DbFecGroupMeta::new(4096, 2, 2, 4, hashes, digest);
        let mut bytes = meta.to_bytes();
        // Corrupt last byte (checksum).
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;
        let result = DbFecGroupMeta::from_bytes(&bytes);
        assert!(result.is_err());
    }
1455
    // -- Read path repair tests --

    #[test]
    fn test_read_path_intact() {
        // With no corruption, every page matches its stored xxh3_128 hash.
        let page_size = 64_u32;
        let page_data: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i; page_size as usize]).collect();
        let hashes: Vec<[u8; 16]> = page_data.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, 5, 0, 1);
        let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);

        // All pages intact — verify_page_xxh3_128 succeeds.
        for (i, d) in page_data.iter().enumerate() {
            assert!(verify_page_xxh3_128(d, &meta.source_page_xxh3_128[i]));
        }
    }

    #[test]
    fn test_read_path_single_corruption() {
        // One corrupted page within the R=4 budget must be fully recoverable.
        let page_size = 64_u32;
        let page_data: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i + 1; page_size as usize]).collect();
        let hashes: Vec<[u8; 16]> = page_data.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, 5, 0, 1);
        let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);

        // Generate RaptorQ repair symbols.
        let source_slices: Vec<&[u8]> = page_data.iter().map(Vec::as_slice).collect();
        let repair_data = compute_raptorq_repair_symbols(&meta, &source_slices, page_size as usize)
            .expect("encode");

        // Corrupt page 3 (pgno=4, index=2 in group).
        let target_pgno = 4;
        let corrupted = vec![0xFF_u8; page_size as usize];

        let read_fn = |pgno: u32| -> Vec<u8> {
            if pgno == target_pgno {
                corrupted.clone()
            } else {
                page_data[(pgno - 2) as usize].clone()
            }
        };

        // Pair ESIs with repair data: ESI = K + r_idx.
        let repair_symbols: Vec<(u32, Vec<u8>)> = repair_data
            .into_iter()
            .enumerate()
            .map(|(i, d)| (4 + u32::try_from(i).expect("i fits u32"), d))
            .collect();
        let result = attempt_page_repair(target_pgno, &meta, &read_fn, &repair_symbols);
        let (recovered, status) = result.expect("repair should succeed");
        assert_eq!(
            recovered, page_data[2],
            "recovered page must match original"
        );
        assert!(matches!(status, RepairResult::Repaired { pgno: 4, .. }));
    }

    #[test]
    fn test_read_path_exceed_corruption() {
        // Every page corrupted and no repair symbols available: repair must fail.
        let page_size = 64_u32;
        let page_data: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i + 1; page_size as usize]).collect();
        let hashes: Vec<[u8; 16]> = page_data.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, 5, 0, 1);
        let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);

        // All pages corrupted — no repair possible.
        let corrupted = vec![0xFF_u8; page_size as usize];
        let read_fn = |_pgno: u32| -> Vec<u8> { corrupted.clone() };
        let repair_symbols: Vec<(u32, Vec<u8>)> = Vec::new();

        let result = attempt_page_repair(3, &meta, &read_fn, &repair_symbols);
        assert!(result.is_err());
    }
1528
    #[test]
    fn test_e2e_bitrot_recovery() {
        // Insert data, corrupt one page, read back with repair.
        let page_size = 128_u32;
        let num_pages = 4_u32;
        let pages: Vec<Vec<u8>> = (0..num_pages)
            .map(|i| {
                let mut data = vec![0u8; page_size as usize];
                // Write unique pattern.
                for (j, b) in data.iter_mut().enumerate() {
                    #[allow(clippy::cast_possible_truncation)]
                    {
                        *b = ((i as usize * 37 + j * 13) & 0xFF) as u8;
                    }
                }
                data
            })
            .collect();

        let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
        let digest = compute_db_gen_digest(1, num_pages + 1, 0, 1);
        let meta = DbFecGroupMeta::new(page_size, 2, num_pages, 4, hashes, digest);

        // Generate RaptorQ repair symbols.
        let source_slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
        let repair_data = compute_raptorq_repair_symbols(&meta, &source_slices, page_size as usize)
            .expect("encode");

        // Corrupt page 2 (index 0 in group, pgno=2).
        let target = 2_u32;
        let corrupted = vec![0xAA_u8; page_size as usize];

        let read_fn = |pgno: u32| -> Vec<u8> {
            if pgno == target {
                corrupted.clone()
            } else {
                pages[(pgno - 2) as usize].clone()
            }
        };

        // Repair symbols carry ESIs [K, K+R), matching the encoder.
        let repair_symbols: Vec<(u32, Vec<u8>)> = repair_data
            .into_iter()
            .enumerate()
            .map(|(i, d)| (num_pages + u32::try_from(i).expect("i fits u32"), d))
            .collect();
        let (recovered, _) =
            attempt_page_repair(target, &meta, &read_fn, &repair_symbols).expect("repair");
        assert_eq!(recovered, pages[0]);
    }

    #[test]
    fn test_e2e_stale_sidecar_rejected() {
        // Headers differing only in change_counter must have distinct digests,
        // so a stale sidecar is detected and ignored.
        let hdr1 = DbFecHeader::new(4096, 1, 100, 0, 1);
        let hdr2 = DbFecHeader::new(4096, 2, 100, 0, 1); // Different change_counter.
        assert_ne!(hdr1.db_gen_digest, hdr2.db_gen_digest);
        assert!(!hdr1.is_current(2, 100, 0, 1));
    }

    #[test]
    fn test_overflow_threshold_g64_r4() {
        // Overhead = R/G = 4/64 = 6.25%.
        let overhead = f64::from(DEFAULT_R_REPAIR) / f64::from(DEFAULT_GROUP_SIZE);
        assert!((overhead - 0.0625).abs() < f64::EPSILON);
    }
1593
    #[test]
    fn test_last_group_partial() {
        // 100 pages: page 1 special, pages 2-65 (64 pages), pages 66-100 (35 pages).
        let groups = partition_page_groups(100);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups[2].start_pgno, 66);
        assert_eq!(groups[2].group_size, 35);

        // Segment offset formula still applies (last group has smaller K but offset
        // is computed from the full-group formula for stable seekability).
        let page_size = 4096_u32;
        let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, page_size);
        let general_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, page_size);
        let off = segment_offset(1, seg1_len, general_seg_len);
        assert_eq!(
            off,
            DB_FEC_HEADER_SIZE + seg1_len + general_seg_len,
            "second full-group offset"
        );
    }

    #[test]
    fn test_find_full_group_index() {
        // General groups index pages starting at pgno 2 in runs of 64.
        assert_eq!(find_full_group_index(1), None); // Header page.
        assert_eq!(find_full_group_index(2), Some(0));
        assert_eq!(find_full_group_index(65), Some(0));
        assert_eq!(find_full_group_index(66), Some(1));
        assert_eq!(find_full_group_index(130), Some(2));
    }
1623
    // -- Compliance gates --

    #[test]
    fn test_bd_1hi_18_unit_compliance_gate() {
        // Verify bead identifiers and mandatory test presence.
        assert_eq!(BEAD_ID, "bd-1hi.18");
        assert_eq!(DB_FEC_MAGIC, *b"FSQLDFEC");
        assert_eq!(GROUP_META_MAGIC, *b"FSQLDGRP");
        assert_eq!(DB_FEC_VERSION, 1);
        assert_eq!(DEFAULT_GROUP_SIZE, 64);
        assert_eq!(DEFAULT_R_REPAIR, 4);
        assert_eq!(HEADER_PAGE_R_REPAIR, 4);
    }

    #[test]
    fn prop_bd_1hi_18_structure_compliance() {
        // Property: partition_page_groups covers all pages exactly once.
        for n in [1_u32, 2, 63, 64, 65, 128, 129, 500, 1000] {
            let groups = partition_page_groups(n);
            let total: u32 = groups.iter().map(|g| g.group_size).sum();
            assert_eq!(total, n, "total pages mismatch for n={n}");

            // No overlaps: each group starts right after the previous one ends.
            let mut covered = 0_u32;
            for g in &groups {
                assert!(g.start_pgno > covered, "overlap at pgno {}", g.start_pgno);
                covered = g.start_pgno + g.group_size - 1;
            }
            assert_eq!(covered, n);
        }
    }

    #[test]
    fn test_e2e_bd_1hi_18_compliance() {
        // End-to-end: create header, create groups, verify sidecar coherence.
        let page_size = 4096_u32;
        let db_pages = 200_u32;
        let hdr = DbFecHeader::new(page_size, 10, db_pages, 3, 42);

        // Verify round-trip.
        let hdr2 = DbFecHeader::from_bytes(&hdr.to_bytes()).expect("roundtrip");
        assert_eq!(hdr, hdr2);
        assert!(hdr.is_current(10, db_pages, 3, 42));

        // Verify groups.
        let groups = partition_page_groups(db_pages);
        assert!(!groups.is_empty());
        let total: u32 = groups.iter().map(|g| g.group_size).sum();
        assert_eq!(total, db_pages);

        // Page 1 special group.
        assert_eq!(groups[0].group_size, 1);
        assert_eq!(groups[0].repair, HEADER_PAGE_R_REPAIR);

        // Verify segment offset monotonicity.
        let seg1_len = group_segment_size(1, HEADER_PAGE_R_REPAIR, page_size);
        let general_seg_len = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, page_size);
        let mut prev_off = 0;
        #[allow(clippy::cast_possible_truncation)]
        let group_count = groups.len().saturating_sub(1) as u32;
        for g in 0..group_count {
            let off = segment_offset(g, seg1_len, general_seg_len);
            assert!(
                off > prev_off || g == 0,
                "offsets must be monotonically increasing"
            );
            prev_off = off;
        }
    }
1693
1694    // -- Property: db_gen_digest deterministic --
1695
1696    #[test]
1697    fn prop_db_gen_digest_deterministic() {
1698        for i in 0..50_u32 {
1699            let d1 = compute_db_gen_digest(i, i * 10, i * 2, i * 3);
1700            let d2 = compute_db_gen_digest(i, i * 10, i * 2, i * 3);
1701            assert_eq!(d1, d2, "digest must be deterministic for i={i}");
1702        }
1703    }
1704
1705    // -- Property: group_segment_size consistent --
1706
1707    #[test]
1708    fn prop_group_segment_sizes_consistent() {
1709        for ps in [512_u32, 1024, 4096, 8192, 16384, 32768, 65536] {
1710            let seg1 = group_segment_size(1, HEADER_PAGE_R_REPAIR, ps);
1711            let general_seg = group_segment_size(DEFAULT_GROUP_SIZE, DEFAULT_R_REPAIR, ps);
1712
1713            // seg1 should be smaller (fewer source pages = fewer hashes).
1714            assert!(seg1 < general_seg, "page-1 segment should be smaller");
1715
1716            // Verify formula: meta_size + R * page_size.
1717            let expected_seg1 = DbFecGroupMeta::serialized_size_for(1)
1718                + HEADER_PAGE_R_REPAIR as usize * ps as usize;
1719            assert_eq!(seg1, expected_seg1);
1720
1721            let expected_general_seg = DbFecGroupMeta::serialized_size_for(DEFAULT_GROUP_SIZE)
1722                + DEFAULT_R_REPAIR as usize * ps as usize;
1723            assert_eq!(general_seg, expected_general_seg);
1724        }
1725    }
1726
1727    // -- Sidecar generation utility tests (bd-2r4z) --
1728
1729    fn make_synthetic_db(page_size: u32, page_count: u32) -> Vec<u8> {
1730        let ps = page_size as usize;
1731        let mut db = vec![0u8; ps * page_count as usize];
1732        db[..16].copy_from_slice(b"SQLite format 3\0");
1733        #[allow(clippy::cast_possible_truncation)]
1734        let ps_enc: u16 = if page_size == 65536 {
1735            1
1736        } else {
1737            page_size as u16
1738        };
1739        db[PAGE_SIZE_OFFSET..PAGE_SIZE_OFFSET + 2].copy_from_slice(&ps_enc.to_be_bytes());
1740        db[CHANGE_COUNTER_OFFSET..CHANGE_COUNTER_OFFSET + 4].copy_from_slice(&1_u32.to_be_bytes());
1741        db[PAGE_COUNT_OFFSET..PAGE_COUNT_OFFSET + 4].copy_from_slice(&page_count.to_be_bytes());
1742        db[FREELIST_COUNT_OFFSET..FREELIST_COUNT_OFFSET + 4].copy_from_slice(&0_u32.to_be_bytes());
1743        db[SCHEMA_COOKIE_OFFSET..SCHEMA_COOKIE_OFFSET + 4].copy_from_slice(&42_u32.to_be_bytes());
1744        for pgno in 1..=page_count {
1745            let offset = (pgno as usize - 1) * ps;
1746            let start = if pgno == 1 { 100 } else { 0 };
1747            for j in start..ps {
1748                #[allow(clippy::cast_possible_truncation)]
1749                {
1750                    db[offset + j] = ((pgno as usize * 37 + j * 13) & 0xFF) as u8;
1751                }
1752            }
1753        }
1754        db
1755    }
1756
1757    #[test]
1758    fn test_parse_db_header_fields() {
1759        let db = make_synthetic_db(4096, 10);
1760        let fields = parse_db_header_fields(&db).expect("parse");
1761        assert_eq!(fields.page_size, 4096);
1762        assert_eq!(fields.change_counter, 1);
1763        assert_eq!(fields.page_count, 10);
1764        assert_eq!(fields.freelist_count, 0);
1765        assert_eq!(fields.schema_cookie, 42);
1766    }
1767
1768    #[test]
1769    fn test_parse_db_header_too_short() {
1770        assert!(parse_db_header_fields(&[0u8; 50]).is_err());
1771    }
1772
1773    #[test]
1774    fn test_db_fec_path_for_db() {
1775        let p = db_fec_path_for_db(Path::new("/tmp/test.db"));
1776        assert_eq!(p, PathBuf::from("/tmp/test.db-fec"));
1777    }
1778
1779    #[test]
1780    fn test_generate_db_fec_sidecar_header_valid() {
1781        let db = make_synthetic_db(512, 5);
1782        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1783        assert!(sidecar.len() >= DB_FEC_HEADER_SIZE);
1784        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1785        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1786        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1787        assert_eq!(hdr.page_size, 512);
1788        assert!(hdr.is_current(1, 5, 0, 42));
1789    }
1790
1791    #[test]
1792    fn test_generate_and_read_group_roundtrip() {
1793        let db = make_synthetic_db(512, 5);
1794        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1795        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1796        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1797        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1798        let (meta1, syms1) = read_db_fec_group_for_page(&sidecar, &hdr, 1).expect("page 1 group");
1799        assert_eq!(meta1.start_pgno, 1);
1800        assert_eq!(meta1.group_size, 1);
1801        assert_eq!(meta1.r_repair, HEADER_PAGE_R_REPAIR);
1802        assert_eq!(syms1.len(), HEADER_PAGE_R_REPAIR as usize);
1803        let (meta2, syms2) = read_db_fec_group_for_page(&sidecar, &hdr, 2).expect("page 2 group");
1804        assert_eq!(meta2.start_pgno, 2);
1805        assert_eq!(meta2.group_size, 4);
1806        assert_eq!(syms2.len(), DEFAULT_R_REPAIR as usize);
1807        for i in 0..meta2.group_size {
1808            let page = read_page_from_bytes(&db, meta2.start_pgno + i, 512);
1809            assert!(verify_page_xxh3_128(
1810                &page,
1811                &meta2.source_page_xxh3_128[i as usize]
1812            ));
1813        }
1814    }
1815
1816    #[test]
1817    fn test_sidecar_encode_corrupt_decode_cycle() {
1818        let ps = 512_usize;
1819        let mut db = make_synthetic_db(512, 5);
1820        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1821        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1822        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1823        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1824        let target_pgno = 3_u32;
1825        let original_page = read_page_from_bytes(&db, target_pgno, ps);
1826        let corrupt_offset = (target_pgno as usize - 1) * ps;
1827        for b in &mut db[corrupt_offset..corrupt_offset + ps] {
1828            *b = 0xDE;
1829        }
1830        let (meta, repair_symbols) =
1831            read_db_fec_group_for_page(&sidecar, &hdr, target_pgno).expect("read group");
1832        let corrupted_data = read_page_from_bytes(&db, target_pgno, ps);
1833        let idx = (target_pgno - meta.start_pgno) as usize;
1834        assert!(!verify_page_xxh3_128(
1835            &corrupted_data,
1836            &meta.source_page_xxh3_128[idx]
1837        ));
1838        let read_fn = |pgno: u32| -> Vec<u8> { read_page_from_bytes(&db, pgno, ps) };
1839        let (recovered, result) =
1840            attempt_page_repair(target_pgno, &meta, &read_fn, &repair_symbols)
1841                .expect("repair should succeed");
1842        assert_eq!(recovered, original_page);
1843        assert!(matches!(result, RepairResult::Repaired { pgno: 3, .. }));
1844    }
1845
1846    #[test]
1847    fn test_sidecar_header_page_repair() {
1848        let ps = 256_usize;
1849        let mut db = make_synthetic_db(256, 3);
1850        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1851        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1852        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1853        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1854        let original_page1 = read_page_from_bytes(&db, 1, ps);
1855        for b in &mut db[..ps] {
1856            *b = 0xCC;
1857        }
1858        let (meta, repair_symbols) =
1859            read_db_fec_group_for_page(&sidecar, &hdr, 1).expect("read group");
1860        assert_eq!(meta.group_size, 1);
1861        assert_eq!(meta.r_repair, 4);
1862        let read_fn = |_pgno: u32| -> Vec<u8> { read_page_from_bytes(&db, 1, ps) };
1863        let (recovered, _) =
1864            attempt_page_repair(1, &meta, &read_fn, &repair_symbols).expect("repair page 1");
1865        assert_eq!(recovered, original_page1);
1866    }
1867
1868    #[test]
1869    fn test_sidecar_stale_digest_detection() {
1870        let db = make_synthetic_db(512, 5);
1871        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1872        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1873        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1874        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1875        assert!(hdr.is_current(1, 5, 0, 42));
1876        assert!(!hdr.is_current(2, 5, 0, 42));
1877        assert!(!hdr.is_current(1, 6, 0, 42));
1878    }
1879
1880    #[test]
1881    fn test_sidecar_xxh3_validates_corruption() {
1882        let db = make_synthetic_db(512, 5);
1883        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1884        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1885        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1886        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1887        let (meta, _) = read_db_fec_group_for_page(&sidecar, &hdr, 3).expect("read");
1888        let page = read_page_from_bytes(&db, 3, 512);
1889        let idx = (3 - meta.start_pgno) as usize;
1890        assert!(verify_page_xxh3_128(&page, &meta.source_page_xxh3_128[idx]));
1891        let corrupt = vec![0xFF_u8; 512];
1892        assert!(!verify_page_xxh3_128(
1893            &corrupt,
1894            &meta.source_page_xxh3_128[idx]
1895        ));
1896    }
1897
1898    #[test]
1899    fn test_sidecar_large_db_128_pages() {
1900        let mut db = make_synthetic_db(512, 128);
1901        let sidecar = generate_db_fec_from_bytes(&db).expect("generate");
1902        let mut hdr_buf = [0u8; DB_FEC_HEADER_SIZE];
1903        hdr_buf.copy_from_slice(&sidecar[..DB_FEC_HEADER_SIZE]);
1904        let hdr = DbFecHeader::from_bytes(&hdr_buf).expect("header");
1905        let (m1, _) = read_db_fec_group_for_page(&sidecar, &hdr, 1).expect("page 1");
1906        assert_eq!(m1.group_size, 1);
1907        let (m2, _) = read_db_fec_group_for_page(&sidecar, &hdr, 30).expect("page 30");
1908        assert_eq!(m2.start_pgno, 2);
1909        assert_eq!(m2.group_size, 64);
1910        let (m3, _) = read_db_fec_group_for_page(&sidecar, &hdr, 100).expect("page 100");
1911        assert_eq!(m3.start_pgno, 66);
1912        assert_eq!(m3.group_size, 63);
1913        let original = read_page_from_bytes(&db, 100, 512);
1914        let off = (100 - 1) * 512;
1915        for b in &mut db[off..off + 512] {
1916            *b = 0xBB;
1917        }
1918        let (meta, syms) = read_db_fec_group_for_page(&sidecar, &hdr, 100).expect("read");
1919        let read_fn = |pgno: u32| -> Vec<u8> { read_page_from_bytes(&db, pgno, 512) };
1920        let (recovered, _) =
1921            attempt_page_repair(100, &meta, &read_fn, &syms).expect("repair page 100");
1922        assert_eq!(recovered, original);
1923    }
1924
1925    #[test]
1926    fn test_sidecar_file_write_read_roundtrip() {
1927        let dir = tempfile::tempdir().expect("tempdir");
1928        let db_path = dir.path().join("test.db");
1929        let db = make_synthetic_db(512, 5);
1930        std::fs::write(&db_path, &db).expect("write db");
1931        let sidecar_path = write_db_fec_sidecar(&db_path).expect("write sidecar");
1932        assert_eq!(sidecar_path, db_fec_path_for_db(&db_path));
1933        assert!(sidecar_path.exists());
1934        let hdr = read_db_fec_header(&sidecar_path).expect("read header");
1935        assert_eq!(hdr.page_size, 512);
1936        assert!(hdr.is_current(1, 5, 0, 42));
1937    }
1938
1939    // -- RaptorQ-specific tests (bd-n0g4q.2) --
1940
1941    #[test]
1942    fn test_raptorq_encode_deterministic() {
1943        let page_size = 128_u32;
1944        let pages: Vec<Vec<u8>> = (0..4_u8).map(|i| vec![i + 1; page_size as usize]).collect();
1945        let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
1946        let digest = compute_db_gen_digest(1, 5, 0, 1);
1947        let meta = DbFecGroupMeta::new(page_size, 2, 4, 4, hashes, digest);
1948        let slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
1949        let r1 = compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("e1");
1950        let r2 = compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("e2");
1951        assert_eq!(r1, r2, "RaptorQ encoding must be deterministic");
1952    }
1953
1954    #[test]
1955    fn test_raptorq_encode_produces_correct_count() {
1956        let page_size = 64_u32;
1957        let pages: Vec<Vec<u8>> = (0..8_u8).map(|i| vec![i; page_size as usize]).collect();
1958        let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
1959        let digest = compute_db_gen_digest(1, 9, 0, 1);
1960        let meta = DbFecGroupMeta::new(page_size, 2, 8, 4, hashes, digest);
1961        let slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
1962        let syms =
1963            compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("encode");
1964        assert_eq!(syms.len(), 4, "should produce R=4 repair symbols");
1965        for sym in &syms {
1966            assert_eq!(sym.len(), page_size as usize, "symbol size = page_size");
1967        }
1968    }
1969
1970    #[test]
1971    fn test_raptorq_multi_corruption_recovery() {
1972        // Verify that RaptorQ can recover from multiple corrupted pages
1973        // (up to R) — something the old XOR parity could not do.
1974        let page_size = 128_u32;
1975        let k = 8_u32;
1976        let r = 4_u32;
1977        let pages: Vec<Vec<u8>> = (0..k)
1978            .map(|i| {
1979                let mut data = vec![0u8; page_size as usize];
1980                for (j, b) in data.iter_mut().enumerate() {
1981                    #[allow(clippy::cast_possible_truncation)]
1982                    {
1983                        *b = ((i as usize * 41 + j * 7) & 0xFF) as u8;
1984                    }
1985                }
1986                data
1987            })
1988            .collect();
1989
1990        let hashes: Vec<[u8; 16]> = pages.iter().map(|d| page_xxh3_128(d)).collect();
1991        let digest = compute_db_gen_digest(1, k + 1, 0, 1);
1992        let meta = DbFecGroupMeta::new(page_size, 2, k, r, hashes, digest);
1993
1994        let slices: Vec<&[u8]> = pages.iter().map(Vec::as_slice).collect();
1995        let repair_data =
1996            compute_raptorq_repair_symbols(&meta, &slices, page_size as usize).expect("encode");
1997        let repair_symbols: Vec<(u32, Vec<u8>)> = repair_data
1998            .into_iter()
1999            .enumerate()
2000            .map(|(i, d)| (k + u32::try_from(i).expect("i fits u32"), d))
2001            .collect();
2002
2003        // Corrupt pages 2 and 3 (indices 0 and 1 in the group).
2004        let corrupt_pgnos = [2_u32, 3_u32];
2005        let corrupted = vec![0xDD_u8; page_size as usize];
2006
2007        let read_fn = |pgno: u32| -> Vec<u8> {
2008            if corrupt_pgnos.contains(&pgno) {
2009                corrupted.clone()
2010            } else {
2011                pages[(pgno - 2) as usize].clone()
2012            }
2013        };
2014
2015        // Repair page 2.
2016        let (recovered_p2, status) =
2017            attempt_page_repair(2, &meta, &read_fn, &repair_symbols).expect("repair page 2");
2018        assert_eq!(recovered_p2, pages[0]);
2019        assert!(matches!(status, RepairResult::Repaired { pgno: 2, .. }));
2020
2021        // Repair page 3.
2022        let (recovered_p3, status) =
2023            attempt_page_repair(3, &meta, &read_fn, &repair_symbols).expect("repair page 3");
2024        assert_eq!(recovered_p3, pages[1]);
2025        assert!(matches!(status, RepairResult::Repaired { pgno: 3, .. }));
2026    }
2027
2028    #[test]
2029    fn test_raptorq_seed_differs_per_group() {
2030        let digest = compute_db_gen_digest(1, 200, 0, 42);
2031        let meta_a = DbFecGroupMeta::new(4096, 1, 1, 4, vec![[0u8; 16]], digest);
2032        let meta_b = DbFecGroupMeta::new(4096, 2, 64, 4, vec![[0u8; 16]; 64], digest);
2033        let seed_a = derive_db_fec_repair_seed(&meta_a);
2034        let seed_b = derive_db_fec_repair_seed(&meta_b);
2035        assert_ne!(
2036            seed_a, seed_b,
2037            "different groups must produce different seeds"
2038        );
2039    }
2040
2041    // -------------------------------------------------------------------
2042    // Snapshot FEC metrics tests
2043    // -------------------------------------------------------------------
2044
2045    #[test]
2046    fn test_snapshot_fec_metrics_record_and_snapshot() {
2047        let m = SnapshotFecMetrics::new();
2048        m.record_encode(100, 4096);
2049        m.record_encode(64, 2048);
2050        let s = m.snapshot();
2051        assert_eq!(s.encoded_pages_total, 164);
2052        assert_eq!(s.sidecar_bytes_total, 6144);
2053        assert_eq!(s.encode_ops, 2);
2054    }
2055
2056    #[test]
2057    fn test_snapshot_fec_metrics_reset() {
2058        let m = SnapshotFecMetrics::new();
2059        m.record_encode(10, 500);
2060        m.reset();
2061        let s = m.snapshot();
2062        assert_eq!(s.encoded_pages_total, 0);
2063        assert_eq!(s.sidecar_bytes_total, 0);
2064        assert_eq!(s.encode_ops, 0);
2065    }
2066
2067    #[test]
2068    fn test_snapshot_fec_metrics_display() {
2069        let m = SnapshotFecMetrics::new();
2070        m.record_encode(42, 1024);
2071        let s = m.snapshot();
2072        let text = format!("{s}");
2073        assert!(text.contains("snapshot_fec_pages_encoded=42"));
2074        assert!(text.contains("sidecar_bytes=1024"));
2075        assert!(text.contains("encode_ops=1"));
2076    }
2077
    #[test]
    fn test_snapshot_fec_metrics_global_delta() {
        // Delta-based test safe for parallel execution.
        //
        // Other tests may bump GLOBAL_SNAPSHOT_FEC_METRICS too, so assert
        // on before/after differences, never on absolute counter values.
        let before = GLOBAL_SNAPSHOT_FEC_METRICS.snapshot();
        GLOBAL_SNAPSHOT_FEC_METRICS.record_encode(7, 256);
        let after = GLOBAL_SNAPSHOT_FEC_METRICS.snapshot();
        assert_eq!(after.encoded_pages_total - before.encoded_pages_total, 7);
        assert_eq!(after.sidecar_bytes_total - before.sidecar_bytes_total, 256);
        assert_eq!(after.encode_ops - before.encode_ops, 1);
    }
2088}