Skip to main content

fsqlite_wal/
wal_fec.rs

1//! WAL-FEC sidecar format (`.wal-fec`) for self-healing WAL durability (§3.4.1).
2//!
3//! The sidecar is append-only. Each group is encoded as:
4//! 1. length-prefixed [`WalFecGroupMeta`]
5//! 2. `R` length-prefixed ECS [`SymbolRecord`] repair symbols (`esi = K..K+R-1`)
6//!
7//! Source symbols remain in `.wal` frames and are never duplicated in sidecar.
8
9use std::collections::{BTreeMap, VecDeque};
10use std::fmt;
11use std::fs;
12use std::io::Write;
13use std::mem::size_of;
14use std::path::{Path, PathBuf};
15use std::str::FromStr;
16use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
18use std::thread;
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use asupersync::channel::mpsc;
22use asupersync::cx::Cx as NativeCx;
23use asupersync::runtime::{
24    JoinHandle as AsyncJoinHandle, RuntimeHandle, spawn_blocking, yield_now,
25};
26use fsqlite_error::{FrankenError, Result};
27use fsqlite_types::{ObjectId, Oti, PageSize, SymbolRecord, SymbolRecordFlags, cx::Cx};
28use tracing::{debug, error, info, warn};
29use xxhash_rust::xxh3::xxh3_64;
30
31use crate::checksum::{
32    WalSalts, Xxh3Checksum128, verify_wal_fec_source_hash, wal_fec_source_hash_xxh3_128,
33};
34
/// Magic bytes for [`WalFecGroupMeta`].
pub const WAL_FEC_GROUP_META_MAGIC: [u8; 8] = *b"FSQLWFEC";
/// Current [`WalFecGroupMeta`] wire version.
pub const WAL_FEC_GROUP_META_VERSION: u32 = 1;
/// Default `PRAGMA raptorq_repair_symbols` value for fresh databases.
pub const DEFAULT_RAPTORQ_REPAIR_SYMBOLS: u8 = 2;
/// Maximum accepted `PRAGMA raptorq_repair_symbols` value (`u8` range).
pub const MAX_RAPTORQ_REPAIR_SYMBOLS: u8 = u8::MAX;
/// Magic bytes for the optional `.wal-fec` configuration header.
pub const WAL_FEC_PRAGMA_HEADER_MAGIC: [u8; 8] = *b"FSQLWFCP";
/// Current `.wal-fec` configuration header version.
pub const WAL_FEC_PRAGMA_HEADER_VERSION: u32 = 1;

/// Width in bytes of a record length prefix.
// NOTE(review): the code that reads/writes the prefix is not in this part of
// the file; presumably a little-endian `u32` like the other fields — confirm.
const LENGTH_PREFIX_BYTES: usize = 4;
/// Serialized size of the fixed-width leading fields of [`WalFecGroupMeta`]:
/// magic (8) + version (4) + eight `u32` fields (8 * 4) + OTI (22) + object id (16).
const META_FIXED_PREFIX_BYTES: usize = 8 + 4 + (8 * 4) + 22 + 16;
/// Size of the `u64` checksum that terminates a [`WalFecGroupMeta`] record.
const META_CHECKSUM_BYTES: usize = 8;
/// Serialized size of the pragma header:
/// magic (8) + version (4) + repair-symbol count (1) + reserved (3) + checksum (8).
const WAL_FEC_PRAGMA_HEADER_BYTES: usize = 8 + 4 + 1 + 3 + 8;
/// Maximum queued repair events. 512 is sufficient for databases with up to
/// ~500 concurrent corrupted frames. Exhaustion means repair events are dropped
/// (logged, not silently lost). Monitor `wal_fec_repair_events_dropped` counter.
const RAPTORQ_REPAIR_EVENT_CAPACITY: usize = 512;
/// Maximum queued repair evidence records. 2048 covers 4× the event capacity
/// to allow multiple evidence records per event (typical for multi-symbol repairs).
const RAPTORQ_REPAIR_EVIDENCE_CAPACITY: usize = 2048;
59
60type RaptorqRepairEquation = (Vec<usize>, Vec<asupersync::raptorq::gf256::Gf256>);
61
62trait IntoWalFecRepairEquation {
63    fn into_wal_fec_result(self, esi: u32) -> Result<RaptorqRepairEquation>;
64}
65
66impl IntoWalFecRepairEquation for RaptorqRepairEquation {
67    fn into_wal_fec_result(self, _esi: u32) -> Result<RaptorqRepairEquation> {
68        Ok(self)
69    }
70}
71
72impl IntoWalFecRepairEquation for Option<RaptorqRepairEquation> {
73    fn into_wal_fec_result(self, esi: u32) -> Result<RaptorqRepairEquation> {
74        self.ok_or_else(|| FrankenError::WalCorrupt {
75            detail: format!("invalid RaptorQ repair ESI {esi}: unsupported domain"),
76        })
77    }
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81struct WalFecPragmaHeader {
82    magic: [u8; 8],
83    version: u32,
84    raptorq_repair_symbols: u8,
85    reserved: [u8; 3],
86    checksum: u64,
87}
88
89impl WalFecPragmaHeader {
90    #[must_use]
91    fn new(raptorq_repair_symbols: u8) -> Self {
92        let mut header = Self {
93            magic: WAL_FEC_PRAGMA_HEADER_MAGIC,
94            version: WAL_FEC_PRAGMA_HEADER_VERSION,
95            raptorq_repair_symbols,
96            reserved: [0; 3],
97            checksum: 0,
98        };
99        header.checksum = header.compute_checksum();
100        header
101    }
102
103    fn from_prefix(bytes: &[u8]) -> Result<Option<Self>> {
104        if bytes.len() < WAL_FEC_PRAGMA_HEADER_BYTES {
105            return Ok(None);
106        }
107
108        let mut magic = [0_u8; 8];
109        magic.copy_from_slice(&bytes[..8]);
110        if magic != WAL_FEC_PRAGMA_HEADER_MAGIC {
111            return Ok(None);
112        }
113
114        let version = u32::from_le_bytes(bytes[8..12].try_into().expect("fixed-length slice"));
115        if version != WAL_FEC_PRAGMA_HEADER_VERSION {
116            return Err(FrankenError::WalCorrupt {
117                detail: format!(
118                    "unsupported wal-fec pragma header version {version}, expected {WAL_FEC_PRAGMA_HEADER_VERSION}"
119                ),
120            });
121        }
122
123        let raptorq_repair_symbols = bytes[12];
124        let mut reserved = [0_u8; 3];
125        reserved.copy_from_slice(&bytes[13..16]);
126        let checksum = u64::from_le_bytes(bytes[16..24].try_into().expect("fixed-length slice"));
127
128        let header = Self {
129            magic,
130            version,
131            raptorq_repair_symbols,
132            reserved,
133            checksum,
134        };
135
136        let computed = header.compute_checksum();
137        if computed != checksum {
138            return Err(FrankenError::WalCorrupt {
139                detail: format!(
140                    "wal-fec pragma header checksum mismatch: stored {checksum:#018x}, computed {computed:#018x}"
141                ),
142            });
143        }
144
145        Ok(Some(header))
146    }
147
148    #[must_use]
149    fn to_bytes(self) -> [u8; WAL_FEC_PRAGMA_HEADER_BYTES] {
150        let mut out = [0_u8; WAL_FEC_PRAGMA_HEADER_BYTES];
151        out[..8].copy_from_slice(&self.magic);
152        out[8..12].copy_from_slice(&self.version.to_le_bytes());
153        out[12] = self.raptorq_repair_symbols;
154        out[13..16].copy_from_slice(&self.reserved);
155        out[16..24].copy_from_slice(&self.checksum.to_le_bytes());
156        out
157    }
158
159    #[must_use]
160    fn compute_checksum(&self) -> u64 {
161        let mut payload = [0_u8; 16];
162        payload[..8].copy_from_slice(&self.magic);
163        payload[8..12].copy_from_slice(&self.version.to_le_bytes());
164        payload[12] = self.raptorq_repair_symbols;
165        payload[13..16].copy_from_slice(&self.reserved);
166        xxh3_64(&payload)
167    }
168}
169
/// Unique commit-group identifier:
/// `group_id := (wal_salt1, wal_salt2, end_frame_no)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WalFecGroupId {
    pub wal_salt1: u32,
    pub wal_salt2: u32,
    pub end_frame_no: u32,
}

/// Renders the identifier as the tuple `(wal_salt1, wal_salt2, end_frame_no)`.
impl fmt::Display for WalFecGroupId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            wal_salt1,
            wal_salt2,
            end_frame_no,
        } = *self;
        write!(f, "({}, {}, {})", wal_salt1, wal_salt2, end_frame_no)
    }
}
188
/// Builder fields for [`WalFecGroupMeta`].
///
/// Consumed by [`WalFecGroupMeta::from_init`], which supplies the magic,
/// version and checksum and rejects inconsistent values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecGroupMetaInit {
    /// First WAL salt the group is bound to.
    pub wal_salt1: u32,
    /// Second WAL salt the group is bound to.
    pub wal_salt2: u32,
    /// First frame of the group; must be 1-based and nonzero.
    pub start_frame_no: u32,
    /// Last frame of the group; must be `>= start_frame_no`.
    pub end_frame_no: u32,
    /// Must be nonzero.
    pub db_size_pages: u32,
    /// Must be a valid `PageSize` and equal `oti.t`.
    pub page_size: u32,
    /// Number of source symbols; must equal `end_frame_no - start_frame_no + 1`.
    pub k_source: u32,
    /// Number of repair symbols; must be at least 1.
    pub r_repair: u32,
    /// Object transmission information; `oti.f` must equal `k_source * page_size`.
    pub oti: Oti,
    /// Object id that every repair symbol of the group must carry.
    pub object_id: ObjectId,
    /// One page number per source frame (`k_source` entries).
    pub page_numbers: Vec<u32>,
    /// One xxh3-128 digest per source frame (`k_source` entries).
    pub source_page_xxh3_128: Vec<Xxh3Checksum128>,
}
205
/// Length-prefixed metadata record preceding repair symbols.
///
/// Fields are serialized in declaration order, integers little-endian. The
/// invariants noted on each field are enforced both when building from a
/// [`WalFecGroupMetaInit`] and when parsing from bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecGroupMeta {
    /// Always [`WAL_FEC_GROUP_META_MAGIC`].
    pub magic: [u8; 8],
    /// Always [`WAL_FEC_GROUP_META_VERSION`].
    pub version: u32,
    /// First WAL salt; checked against the WAL by `verify_salt_binding`.
    pub wal_salt1: u32,
    /// Second WAL salt; checked against the WAL by `verify_salt_binding`.
    pub wal_salt2: u32,
    /// First frame of the group; 1-based and nonzero.
    pub start_frame_no: u32,
    /// Last frame of the group; `>= start_frame_no`.
    pub end_frame_no: u32,
    /// Nonzero.
    pub db_size_pages: u32,
    /// A valid `PageSize`, equal to `oti.t`.
    pub page_size: u32,
    /// Number of source symbols; equals `end_frame_no - start_frame_no + 1`.
    pub k_source: u32,
    /// Number of repair symbols; at least 1.
    pub r_repair: u32,
    /// Object transmission information; `oti.f == k_source * page_size`.
    pub oti: Oti,
    /// Object id shared with the group's repair symbols.
    pub object_id: ObjectId,
    /// One page number per source frame (`k_source` entries).
    pub page_numbers: Vec<u32>,
    /// One xxh3-128 digest per source frame (`k_source` entries).
    pub source_page_xxh3_128: Vec<Xxh3Checksum128>,
    /// xxh3-64 over every serialized field that precedes it.
    pub checksum: u64,
}
225
226impl WalFecGroupMeta {
227    /// Create and validate metadata, computing checksum automatically.
228    pub fn from_init(init: WalFecGroupMetaInit) -> Result<Self> {
229        let mut meta = Self {
230            magic: WAL_FEC_GROUP_META_MAGIC,
231            version: WAL_FEC_GROUP_META_VERSION,
232            wal_salt1: init.wal_salt1,
233            wal_salt2: init.wal_salt2,
234            start_frame_no: init.start_frame_no,
235            end_frame_no: init.end_frame_no,
236            db_size_pages: init.db_size_pages,
237            page_size: init.page_size,
238            k_source: init.k_source,
239            r_repair: init.r_repair,
240            oti: init.oti,
241            object_id: init.object_id,
242            page_numbers: init.page_numbers,
243            source_page_xxh3_128: init.source_page_xxh3_128,
244            checksum: 0,
245        };
246        meta.validate_invariants()?;
247        meta.checksum = meta.compute_checksum();
248        Ok(meta)
249    }
250
251    /// Return `(wal_salt1, wal_salt2, end_frame_no)`.
252    #[must_use]
253    pub const fn group_id(&self) -> WalFecGroupId {
254        WalFecGroupId {
255            wal_salt1: self.wal_salt1,
256            wal_salt2: self.wal_salt2,
257            end_frame_no: self.end_frame_no,
258        }
259    }
260
261    /// Verify metadata is bound to the WAL salts.
262    pub fn verify_salt_binding(&self, salts: WalSalts) -> Result<()> {
263        if self.wal_salt1 != salts.salt1 || self.wal_salt2 != salts.salt2 {
264            return Err(FrankenError::WalCorrupt {
265                detail: format!(
266                    "wal-fec salt mismatch for group {}: sidecar=({}, {}), wal=({}, {})",
267                    self.group_id(),
268                    self.wal_salt1,
269                    self.wal_salt2,
270                    salts.salt1,
271                    salts.salt2
272                ),
273            });
274        }
275        Ok(())
276    }
277
278    /// Serialize as on-disk record payload (without outer length prefix).
279    #[must_use]
280    pub fn to_record_bytes(&self) -> Vec<u8> {
281        let mut bytes = Vec::with_capacity(self.serialized_len_without_prefix());
282        bytes.extend_from_slice(&self.magic);
283        append_u32_le(&mut bytes, self.version);
284        append_u32_le(&mut bytes, self.wal_salt1);
285        append_u32_le(&mut bytes, self.wal_salt2);
286        append_u32_le(&mut bytes, self.start_frame_no);
287        append_u32_le(&mut bytes, self.end_frame_no);
288        append_u32_le(&mut bytes, self.db_size_pages);
289        append_u32_le(&mut bytes, self.page_size);
290        append_u32_le(&mut bytes, self.k_source);
291        append_u32_le(&mut bytes, self.r_repair);
292        bytes.extend_from_slice(&self.oti.to_bytes());
293        bytes.extend_from_slice(self.object_id.as_bytes());
294        for &page_number in &self.page_numbers {
295            append_u32_le(&mut bytes, page_number);
296        }
297        for &hash in &self.source_page_xxh3_128 {
298            bytes.extend_from_slice(&hash.to_le_bytes());
299        }
300        append_u64_le(&mut bytes, self.checksum);
301        bytes
302    }
303
304    /// Deserialize and validate metadata from an on-disk payload.
305    pub fn from_record_bytes(bytes: &[u8]) -> Result<Self> {
306        if bytes.len() < META_FIXED_PREFIX_BYTES + META_CHECKSUM_BYTES {
307            return Err(FrankenError::WalCorrupt {
308                detail: format!(
309                    "wal-fec group meta too short: expected at least {}, got {}",
310                    META_FIXED_PREFIX_BYTES + META_CHECKSUM_BYTES,
311                    bytes.len()
312                ),
313            });
314        }
315
316        let mut cursor = 0usize;
317        let magic = read_array::<8>(bytes, &mut cursor, "magic")?;
318        if magic != WAL_FEC_GROUP_META_MAGIC {
319            return Err(FrankenError::WalCorrupt {
320                detail: format!("invalid wal-fec magic: {magic:02x?}"),
321            });
322        }
323
324        let version = read_u32_le(bytes, &mut cursor, "version")?;
325        if version != WAL_FEC_GROUP_META_VERSION {
326            return Err(FrankenError::WalCorrupt {
327                detail: format!(
328                    "unsupported wal-fec version {version}, expected {WAL_FEC_GROUP_META_VERSION}"
329                ),
330            });
331        }
332
333        let wal_salt1 = read_u32_le(bytes, &mut cursor, "wal_salt1")?;
334        let wal_salt2 = read_u32_le(bytes, &mut cursor, "wal_salt2")?;
335        let start_frame_no = read_u32_le(bytes, &mut cursor, "start_frame_no")?;
336        let end_frame_no = read_u32_le(bytes, &mut cursor, "end_frame_no")?;
337        let db_size_pages = read_u32_le(bytes, &mut cursor, "db_size_pages")?;
338        let page_size = read_u32_le(bytes, &mut cursor, "page_size")?;
339        let k_source = read_u32_le(bytes, &mut cursor, "k_source")?;
340        let r_repair = read_u32_le(bytes, &mut cursor, "r_repair")?;
341        let oti_bytes = read_array::<22>(bytes, &mut cursor, "oti")?;
342        let oti = Oti::from_bytes(&oti_bytes).ok_or_else(|| FrankenError::WalCorrupt {
343            detail: "invalid wal-fec OTI encoding".to_owned(),
344        })?;
345        let object_id = ObjectId::from_bytes(read_array::<16>(bytes, &mut cursor, "object_id")?);
346
347        let k_source_usize = usize::try_from(k_source).map_err(|_| FrankenError::WalCorrupt {
348            detail: format!("k_source {k_source} does not fit in usize"),
349        })?;
350
351        // Prevent OOM panics from maliciously large k_source values by ensuring
352        // the buffer actually contains enough bytes for the arrays (4 bytes for pgno, 16 for hash).
353        let required_array_bytes = k_source_usize.saturating_mul(20);
354        if bytes.len().saturating_sub(cursor) < required_array_bytes {
355            return Err(FrankenError::WalCorrupt {
356                detail: format!("k_source {} exceeds remaining buffer", k_source),
357            });
358        }
359
360        let mut page_numbers = Vec::with_capacity(k_source_usize);
361        for _ in 0..k_source_usize {
362            page_numbers.push(read_u32_le(bytes, &mut cursor, "page_number")?);
363        }
364        let mut source_page_xxh3_128 = Vec::with_capacity(k_source_usize);
365        for _ in 0..k_source_usize {
366            let digest = read_array::<16>(bytes, &mut cursor, "source_page_hash")?;
367            source_page_xxh3_128.push(Xxh3Checksum128 {
368                low: u64::from_le_bytes(digest[..8].try_into().expect("8-byte low hash slice")),
369                high: u64::from_le_bytes(digest[8..].try_into().expect("8-byte high hash slice")),
370            });
371        }
372        let checksum = read_u64_le(bytes, &mut cursor, "checksum")?;
373        if cursor != bytes.len() {
374            return Err(FrankenError::WalCorrupt {
375                detail: format!(
376                    "wal-fec group meta trailing bytes: consumed {cursor}, total {}",
377                    bytes.len()
378                ),
379            });
380        }
381
382        let meta = Self {
383            magic,
384            version,
385            wal_salt1,
386            wal_salt2,
387            start_frame_no,
388            end_frame_no,
389            db_size_pages,
390            page_size,
391            k_source,
392            r_repair,
393            oti,
394            object_id,
395            page_numbers,
396            source_page_xxh3_128,
397            checksum,
398        };
399        meta.validate_invariants()?;
400        let computed = meta.compute_checksum();
401        if computed != meta.checksum {
402            return Err(FrankenError::WalCorrupt {
403                detail: format!(
404                    "wal-fec group checksum mismatch: stored {:#018x}, computed {computed:#018x}",
405                    meta.checksum
406                ),
407            });
408        }
409        Ok(meta)
410    }
411
412    fn serialized_len_without_prefix(&self) -> usize {
413        META_FIXED_PREFIX_BYTES
414            + self.page_numbers.len() * size_of::<u32>()
415            + self.source_page_xxh3_128.len() * size_of::<[u8; 16]>()
416            + META_CHECKSUM_BYTES
417    }
418
419    fn compute_checksum(&self) -> u64 {
420        xxh3_64(&self.to_record_bytes_without_checksum())
421    }
422
423    fn to_record_bytes_without_checksum(&self) -> Vec<u8> {
424        let mut bytes =
425            Vec::with_capacity(self.serialized_len_without_prefix() - META_CHECKSUM_BYTES);
426        bytes.extend_from_slice(&self.magic);
427        append_u32_le(&mut bytes, self.version);
428        append_u32_le(&mut bytes, self.wal_salt1);
429        append_u32_le(&mut bytes, self.wal_salt2);
430        append_u32_le(&mut bytes, self.start_frame_no);
431        append_u32_le(&mut bytes, self.end_frame_no);
432        append_u32_le(&mut bytes, self.db_size_pages);
433        append_u32_le(&mut bytes, self.page_size);
434        append_u32_le(&mut bytes, self.k_source);
435        append_u32_le(&mut bytes, self.r_repair);
436        bytes.extend_from_slice(&self.oti.to_bytes());
437        bytes.extend_from_slice(self.object_id.as_bytes());
438        for &page_number in &self.page_numbers {
439            append_u32_le(&mut bytes, page_number);
440        }
441        for &hash in &self.source_page_xxh3_128 {
442            bytes.extend_from_slice(&hash.to_le_bytes());
443        }
444        bytes
445    }
446
447    fn validate_invariants(&self) -> Result<()> {
448        self.validate_meta_header()?;
449        self.validate_frame_span()?;
450        if self.r_repair == 0 {
451            return Err(FrankenError::WalCorrupt {
452                detail: "r_repair must be >= 1 for wal-fec groups".to_owned(),
453            });
454        }
455        let k_source_usize =
456            usize::try_from(self.k_source).map_err(|_| FrankenError::WalCorrupt {
457                detail: format!("k_source {} does not fit in usize", self.k_source),
458            })?;
459        self.validate_array_lengths(k_source_usize)?;
460        self.validate_page_size_and_oti()?;
461        if self.db_size_pages == 0 {
462            return Err(FrankenError::WalCorrupt {
463                detail: "db_size_pages must be non-zero commit frame size".to_owned(),
464            });
465        }
466        Ok(())
467    }
468
469    fn validate_meta_header(&self) -> Result<()> {
470        if self.magic != WAL_FEC_GROUP_META_MAGIC {
471            return Err(FrankenError::WalCorrupt {
472                detail: "invalid wal-fec magic".to_owned(),
473            });
474        }
475        if self.version != WAL_FEC_GROUP_META_VERSION {
476            return Err(FrankenError::WalCorrupt {
477                detail: format!(
478                    "unsupported wal-fec meta version {} (expected {WAL_FEC_GROUP_META_VERSION})",
479                    self.version
480                ),
481            });
482        }
483        Ok(())
484    }
485
486    fn validate_frame_span(&self) -> Result<()> {
487        if self.start_frame_no == 0 {
488            return Err(FrankenError::WalCorrupt {
489                detail: "start_frame_no must be 1-based and nonzero".to_owned(),
490            });
491        }
492        if self.end_frame_no < self.start_frame_no {
493            return Err(FrankenError::WalCorrupt {
494                detail: format!(
495                    "end_frame_no {} must be >= start_frame_no {}",
496                    self.end_frame_no, self.start_frame_no
497                ),
498            });
499        }
500        let expected_k = self
501            .end_frame_no
502            .checked_sub(self.start_frame_no)
503            .and_then(|delta| delta.checked_add(1))
504            .ok_or_else(|| FrankenError::WalCorrupt {
505                detail: "frame-range overflow while validating k_source".to_owned(),
506            })?;
507        if self.k_source != expected_k {
508            return Err(FrankenError::WalCorrupt {
509                detail: format!(
510                    "k_source {} must equal frame span {} ({}..={})",
511                    self.k_source, expected_k, self.start_frame_no, self.end_frame_no
512                ),
513            });
514        }
515        Ok(())
516    }
517
518    fn validate_array_lengths(&self, k_source_usize: usize) -> Result<()> {
519        if self.page_numbers.len() != k_source_usize {
520            return Err(FrankenError::WalCorrupt {
521                detail: format!(
522                    "page_numbers length {} must equal k_source {}",
523                    self.page_numbers.len(),
524                    self.k_source
525                ),
526            });
527        }
528        if self.source_page_xxh3_128.len() != k_source_usize {
529            return Err(FrankenError::WalCorrupt {
530                detail: format!(
531                    "source_page_xxh3_128 length {} must equal k_source {}",
532                    self.source_page_xxh3_128.len(),
533                    self.k_source
534                ),
535            });
536        }
537        Ok(())
538    }
539
540    fn validate_page_size_and_oti(&self) -> Result<()> {
541        if PageSize::new(self.page_size).is_none() {
542            return Err(FrankenError::WalCorrupt {
543                detail: format!("invalid SQLite page_size {}", self.page_size),
544            });
545        }
546        if self.oti.t != self.page_size {
547            return Err(FrankenError::WalCorrupt {
548                detail: format!(
549                    "OTI.t {} must equal page_size {} for WAL source pages",
550                    self.oti.t, self.page_size
551                ),
552            });
553        }
554        let expected_f = u64::from(self.k_source)
555            .checked_mul(u64::from(self.page_size))
556            .ok_or_else(|| FrankenError::WalCorrupt {
557                detail: "overflow computing expected OTI.f".to_owned(),
558            })?;
559        if self.oti.f != expected_f {
560            return Err(FrankenError::WalCorrupt {
561                detail: format!(
562                    "OTI.f {} must equal k_source*page_size ({expected_f})",
563                    self.oti.f
564                ),
565            });
566        }
567        Ok(())
568    }
569}
570
571/// One complete append-only sidecar group.
572#[derive(Debug, Clone, PartialEq, Eq)]
573pub struct WalFecGroupRecord {
574    pub meta: WalFecGroupMeta,
575    pub repair_symbols: Vec<SymbolRecord>,
576}
577
578impl WalFecGroupRecord {
579    pub fn new(meta: WalFecGroupMeta, repair_symbols: Vec<SymbolRecord>) -> Result<Self> {
580        let group = Self {
581            meta,
582            repair_symbols,
583        };
584        group.validate_layout()?;
585        Ok(group)
586    }
587
588    fn validate_layout(&self) -> Result<()> {
589        let expected_repair =
590            usize::try_from(self.meta.r_repair).map_err(|_| FrankenError::WalCorrupt {
591                detail: format!("r_repair {} does not fit in usize", self.meta.r_repair),
592            })?;
593        if self.repair_symbols.len() != expected_repair {
594            return Err(FrankenError::WalCorrupt {
595                detail: format!(
596                    "repair symbol count {} must equal r_repair {}",
597                    self.repair_symbols.len(),
598                    self.meta.r_repair
599                ),
600            });
601        }
602        for (index, symbol) in self.repair_symbols.iter().enumerate() {
603            if symbol.object_id != self.meta.object_id {
604                return Err(FrankenError::WalCorrupt {
605                    detail: format!(
606                        "repair symbol {index} object_id mismatch: {} != {}",
607                        symbol.object_id, self.meta.object_id
608                    ),
609                });
610            }
611            if symbol.oti != self.meta.oti {
612                return Err(FrankenError::WalCorrupt {
613                    detail: format!("repair symbol {index} OTI mismatch"),
614                });
615            }
616            let expected_esi = self
617                .meta
618                .k_source
619                .checked_add(u32::try_from(index).map_err(|_| FrankenError::WalCorrupt {
620                    detail: format!("repair symbol index {index} does not fit in u32"),
621                })?)
622                .ok_or_else(|| FrankenError::WalCorrupt {
623                    detail: "repair ESI overflow".to_owned(),
624                })?;
625            if symbol.esi != expected_esi {
626                return Err(FrankenError::WalCorrupt {
627                    detail: format!(
628                        "repair symbol {index} has ESI {}, expected {expected_esi}",
629                        symbol.esi
630                    ),
631                });
632            }
633        }
634        Ok(())
635    }
636}
637
/// Scan result for `.wal-fec` sidecar files.
///
/// The default value is an empty group list with `truncated_tail == false`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalFecScanResult {
    /// Group records collected by the scan.
    pub groups: Vec<WalFecGroupRecord>,
    // NOTE(review): presumably set when the scan stopped at an incomplete
    // trailing record — the scanner is not in this part of the file; confirm.
    pub truncated_tail: bool,
}
644
/// Why WAL-FEC recovery fell back to SQLite-compatible truncation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalFecRecoveryFallbackReason {
    MissingSidecarGroup,
    SidecarUnreadable,
    SaltMismatch,
    InsufficientSymbols,
    DecodeFailed,
    DecodedPayloadMismatch,
    /// Recovery was explicitly disabled via [`WalFecRecoveryConfig`].
    RecoveryDisabled,
}

impl WalFecRecoveryFallbackReason {
    /// Stable snake_case identifier for the reason.
    #[must_use]
    pub const fn reason_code(self) -> &'static str {
        let code: &'static str = match self {
            WalFecRecoveryFallbackReason::MissingSidecarGroup => "missing_sidecar_group",
            WalFecRecoveryFallbackReason::SidecarUnreadable => "sidecar_unreadable",
            WalFecRecoveryFallbackReason::SaltMismatch => "salt_mismatch",
            WalFecRecoveryFallbackReason::InsufficientSymbols => "insufficient_symbols",
            WalFecRecoveryFallbackReason::DecodeFailed => "decode_failed",
            WalFecRecoveryFallbackReason::DecodedPayloadMismatch => "decoded_payload_mismatch",
            WalFecRecoveryFallbackReason::RecoveryDisabled => "recovery_disabled",
        };
        code
    }
}
672
/// Configuration for WAL-FEC recovery behaviour.
///
/// When `recovery_enabled` is `false`, the recovery path immediately returns
/// a [`WalFecRecoveryOutcome::TruncateBeforeGroup`] with
/// [`WalFecRecoveryFallbackReason::RecoveryDisabled`], emulating what C SQLite
/// does on WAL corruption (discard from the first checksum mismatch onward).
///
/// This allows the corruption demo harness to contrast:
/// - Recovery OFF → expect data loss / truncation (C SQLite behaviour).
/// - Recovery ON  → expect self-healing when repair symbols are sufficient.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFecRecoveryConfig {
    /// Whether WAL-FEC recovery is attempted.  Default: `true`.
    pub recovery_enabled: bool,
}

/// Recovery is on unless a caller opts out.
impl Default for WalFecRecoveryConfig {
    fn default() -> Self {
        let recovery_enabled = true;
        Self { recovery_enabled }
    }
}
696
/// Structured log entry for a single WAL-FEC recovery attempt (bd-1w6k.2.5).
///
/// Captures machine-readable statistics for the corruption demo harness.
/// The harness can validate expected outcomes and render human-readable
/// recovery reports from these entries.
#[derive(Debug, Clone, PartialEq, Eq)]
// The four booleans are independent facts about one attempt that the log keeps
// as separate flat fields, hence the lint exemption.
#[allow(clippy::struct_excessive_bools)]
pub struct WalFecRecoveryLog {
    /// Identity of the commit group targeted for recovery.
    pub group_id: WalFecGroupId,
    /// Whether recovery was enabled for this attempt.
    pub recovery_enabled: bool,
    /// The final outcome: `Recovered` or `TruncateBeforeGroup`.
    pub outcome_is_recovered: bool,
    /// Why recovery fell back, if it did.
    pub fallback_reason: Option<WalFecRecoveryFallbackReason>,
    /// Source symbols that passed xxh3 verification.
    pub validated_source_symbols: u32,
    /// Repair symbols that passed metadata binding.
    pub validated_repair_symbols: u32,
    /// Symbols required for decode (= K).
    pub required_symbols: u32,
    /// Total usable symbols (source + repair).
    pub available_symbols: u32,
    /// Frame numbers that were recovered from repair symbols.
    pub recovered_frame_nos: Vec<u32>,
    /// Count of corrupt observations during validation.
    pub corruption_observations: u32,
    /// Whether the RaptorQ decoder was invoked.
    pub decode_attempted: bool,
    /// Whether decode succeeded.
    pub decode_succeeded: bool,
}
730
/// Severity bucket for symbol-loss events in WAL-FEC recovery telemetry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalFecRepairSeverityBucket {
    One,
    TwoToFive,
    SixToTen,
    ElevenPlus,
}

impl WalFecRepairSeverityBucket {
    /// Canonical label for the bucket: `"1"`, `"2-5"`, `"6-10"` or `"11+"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            WalFecRepairSeverityBucket::One => "1",
            WalFecRepairSeverityBucket::TwoToFive => "2-5",
            WalFecRepairSeverityBucket::SixToTen => "6-10",
            WalFecRepairSeverityBucket::ElevenPlus => "11+",
        };
        label
    }
}

/// Parses the canonical label or a spelled-out alias, ignoring surrounding
/// whitespace and ASCII case.
impl FromStr for WalFecRepairSeverityBucket {
    type Err = &'static str;

    fn from_str(value: &str) -> std::result::Result<Self, Self::Err> {
        // Every accepted spelling, already in lowercase.
        const ALIASES: [(&str, WalFecRepairSeverityBucket); 11] = [
            ("1", WalFecRepairSeverityBucket::One),
            ("one", WalFecRepairSeverityBucket::One),
            ("2-5", WalFecRepairSeverityBucket::TwoToFive),
            ("two-to-five", WalFecRepairSeverityBucket::TwoToFive),
            ("two_to_five", WalFecRepairSeverityBucket::TwoToFive),
            ("6-10", WalFecRepairSeverityBucket::SixToTen),
            ("six-to-ten", WalFecRepairSeverityBucket::SixToTen),
            ("six_to_ten", WalFecRepairSeverityBucket::SixToTen),
            ("11+", WalFecRepairSeverityBucket::ElevenPlus),
            ("eleven-plus", WalFecRepairSeverityBucket::ElevenPlus),
            ("eleven_plus", WalFecRepairSeverityBucket::ElevenPlus),
        ];

        let needle = value.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| alias.eq_ignore_ascii_case(needle))
            .map(|&(_, bucket)| bucket)
            .ok_or("unrecognized RaptorQ repair severity bucket")
    }
}
765
/// Source class used to repair a WAL commit group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalFecRepairSource {
    WalRepairSymbols,
    SnapshotRepairSymbols,
    WalAndSnapshotRepairSymbols,
}

impl WalFecRepairSource {
    /// Stable snake_case identifier for the repair source.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            WalFecRepairSource::WalRepairSymbols => "wal_repair_symbols",
            WalFecRepairSource::SnapshotRepairSymbols => "snapshot_repair_symbols",
            WalFecRepairSource::WalAndSnapshotRepairSymbols => "wal_and_snapshot_repair_symbols",
        };
        label
    }
}
784
/// Witness triple proving repair integrity.
///
/// Three 32-byte digests; per the field names these are BLAKE3 hashes of the
/// data as found, as repaired, and as expected.
// NOTE(review): the code that fills these in is not in this part of the file —
// confirm what exactly each digest covers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFecRepairWitnessTriple {
    pub corrupted_hash_blake3: [u8; 32],
    pub repaired_hash_blake3: [u8; 32],
    pub expected_hash_blake3: [u8; 32],
}
792
/// One append-only evidence card for a RaptorQ repair action.
// NOTE(review): the code that builds cards is not in this part of the file; the
// field notes below are limited to what the names and types show.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRepairEvidenceCard {
    /// Commit group the repair belongs to.
    pub group_id: WalFecGroupId,
    pub frame_id: u32,
    /// `None` when no WAL file offset is recorded.
    pub wal_file_offset_bytes: Option<u64>,
    // Both timestamps are in nanoseconds per the `_ns` suffix.
    pub monotonic_timestamp_ns: u64,
    pub wall_clock_unix_ns: u64,
    /// 32-byte digest; hex-rendered by `corruption_signature_hex`.
    pub corruption_signature_blake3: [u8; 32],
    /// `None` when no bit-error pattern is recorded.
    pub bit_error_pattern: Option<String>,
    pub repair_source: WalFecRepairSource,
    pub symbols_used: u32,
    pub validated_source_symbols: u32,
    pub validated_repair_symbols: u32,
    pub required_symbols: u32,
    pub available_symbols: u32,
    pub witness: WalFecRepairWitnessTriple,
    pub repair_latency_ns: u64,
    // NOTE(review): presumably a 0..=1000 scale given the name — confirm.
    pub confidence_per_mille: u32,
    pub severity_bucket: WalFecRepairSeverityBucket,
    pub ledger_epoch: u64,
    /// 32-byte digest; hex-rendered by `chain_hash_hex`.
    pub chain_hash: [u8; 32],
}
816
/// Render a 32-byte digest as 64 lowercase hexadecimal characters.
fn hex_encode_32(bytes: [u8; 32]) -> String {
    const HEX_DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in &bytes {
        // High nibble first, then low nibble, matching `{:02x}` formatting.
        hex.push(HEX_DIGITS[usize::from(byte >> 4)]);
        hex.push(HEX_DIGITS[usize::from(byte & 0x0f)]);
    }
    hex
}
826
827impl WalFecRepairEvidenceCard {
828    #[must_use]
829    pub fn chain_hash_hex(&self) -> String {
830        hex_encode_32(self.chain_hash)
831    }
832
833    #[must_use]
834    pub fn corruption_signature_hex(&self) -> String {
835        hex_encode_32(self.corruption_signature_blake3)
836    }
837}
838
/// Query filters for repair evidence cards.
///
/// Every field is optional; a `None` field places no restriction on the
/// result. The `Default` value therefore matches every retained card.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalFecRepairEvidenceQuery {
    /// Keep only cards whose `frame_id` equals this value.
    pub frame_id: Option<u32>,
    /// Keep only cards in this severity bucket.
    pub severity_bucket: Option<WalFecRepairSeverityBucket>,
    /// Inclusive lower bound on the card's `wall_clock_unix_ns`.
    pub wall_clock_start_ns: Option<u64>,
    /// Inclusive upper bound on the card's `wall_clock_unix_ns`.
    pub wall_clock_end_ns: Option<u64>,
    /// Maximum number of cards to return; `None` or `Some(0)` means no cap.
    /// When the cap applies, the most recently recorded matches are kept.
    pub limit: Option<usize>,
}
848
/// Severity histogram used by [`WalFecRepairMetricsSnapshot`].
///
/// One saturating counter per [`WalFecRepairSeverityBucket`] variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WalFecRepairSeverityHistogram {
    /// Events recorded in the `One` bucket.
    pub one: u64,
    /// Events recorded in the `TwoToFive` bucket.
    pub two_to_five: u64,
    /// Events recorded in the `SixToTen` bucket.
    pub six_to_ten: u64,
    /// Events recorded in the `ElevenPlus` bucket.
    pub eleven_plus: u64,
}
857
858impl WalFecRepairSeverityHistogram {
859    fn bump(&mut self, bucket: WalFecRepairSeverityBucket) {
860        match bucket {
861            WalFecRepairSeverityBucket::One => {
862                self.one = self.one.saturating_add(1);
863            }
864            WalFecRepairSeverityBucket::TwoToFive => {
865                self.two_to_five = self.two_to_five.saturating_add(1);
866            }
867            WalFecRepairSeverityBucket::SixToTen => {
868                self.six_to_ten = self.six_to_ten.saturating_add(1);
869            }
870            WalFecRepairSeverityBucket::ElevenPlus => {
871                self.eleven_plus = self.eleven_plus.saturating_add(1);
872            }
873        }
874    }
875}
876
/// One structured RaptorQ repair event.
///
/// Events are derived from a recovery log and retained in the in-process
/// telemetry ledger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRepairEvent {
    /// Full WAL-FEC group identity.
    pub group_id: WalFecGroupId,
    /// Convenience key for dashboards (same as `group_id.end_frame_no`).
    pub frame_id: u32,
    /// Number of source symbols lost (required minus validated source symbols).
    pub symbols_lost: u32,
    /// Number of symbols considered during decode (bounded by `K`).
    pub symbols_used: u32,
    /// Whether the recovery attempt produced a repaired group.
    pub repair_success: bool,
    /// Recovery latency in nanoseconds (saturates at `u64::MAX`).
    pub latency_ns: u64,
    /// Estimated budget utilization percentage (0-100).
    pub budget_utilization_pct: u32,
    /// Severity bucket derived from `symbols_lost`.
    pub severity_bucket: WalFecRepairSeverityBucket,
}
897
/// Snapshot of RaptorQ repair telemetry.
///
/// A point-in-time copy of the aggregate counters; it does not update after
/// it has been taken.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WalFecRepairMetricsSnapshot {
    /// Number of repair events recorded (successful and failed).
    pub repairs_total: u64,
    /// Number of recorded repair events that did not succeed.
    pub repairs_failed: u64,
    /// Sum of `symbols_lost` over successful repair events.
    pub symbols_reclaimed: u64,
    /// Mean budget utilization percentage over all recorded events.
    pub budget_utilization_pct: u32,
    /// Health score in `0..=100`; `100` when no repairs have been recorded.
    pub wal_health_score: u32,
    /// Per-severity-bucket event counts.
    pub severity_histogram: WalFecRepairSeverityHistogram,
}
908
/// Mutable process-wide repair telemetry, guarded by a global mutex.
#[derive(Debug, Default)]
struct WalFecRepairTelemetryState {
    /// Number of repair events recorded.
    repairs_total: u64,
    /// Number of recorded events whose repair did not succeed.
    repairs_failed: u64,
    /// Sum of `symbols_lost` over successful events.
    symbols_reclaimed: u64,
    /// Running sum of per-event budget utilization percentages.
    budget_utilization_sum: u64,
    /// Number of samples contributing to `budget_utilization_sum`.
    budget_utilization_count: u64,
    /// Per-severity-bucket event counts.
    severity_histogram: WalFecRepairSeverityHistogram,
    /// Bounded ring of recent events (oldest at the front).
    events: VecDeque<WalFecRepairEvent>,
    /// Bounded ring of recent evidence cards (oldest at the front).
    evidence_cards: VecDeque<WalFecRepairEvidenceCard>,
    /// Chain hash of the most recently recorded evidence card
    /// (all zeroes before the first card).
    evidence_chain_tip: [u8; 32],
    /// Ledger epoch to assign to the next card; a stored `0` is treated as `1`.
    next_evidence_epoch: u64,
}
922
923static RAPTORQ_REPAIR_TELEMETRY: OnceLock<Mutex<WalFecRepairTelemetryState>> = OnceLock::new();
924
925fn raptorq_repair_telemetry() -> &'static Mutex<WalFecRepairTelemetryState> {
926    RAPTORQ_REPAIR_TELEMETRY.get_or_init(|| Mutex::new(WalFecRepairTelemetryState::default()))
927}
928
929fn lock_raptorq_repair_telemetry() -> MutexGuard<'static, WalFecRepairTelemetryState> {
930    match raptorq_repair_telemetry().lock() {
931        Ok(guard) => guard,
932        Err(poisoned) => {
933            warn!("raptorq repair telemetry lock poisoned; recovering poisoned state");
934            poisoned.into_inner()
935        }
936    }
937}
938
939fn severity_bucket_for_loss(symbols_lost: u32) -> WalFecRepairSeverityBucket {
940    match symbols_lost {
941        0 | 1 => WalFecRepairSeverityBucket::One,
942        2..=5 => WalFecRepairSeverityBucket::TwoToFive,
943        6..=10 => WalFecRepairSeverityBucket::SixToTen,
944        _ => WalFecRepairSeverityBucket::ElevenPlus,
945    }
946}
947
948fn compute_health_score(state: &WalFecRepairTelemetryState) -> u32 {
949    if state.repairs_total == 0 {
950        return 100;
951    }
952
953    let failure_penalty = state.repairs_failed.saturating_mul(20).min(70);
954    let severity_penalty = state
955        .severity_histogram
956        .one
957        .saturating_mul(1)
958        .saturating_add(state.severity_histogram.two_to_five.saturating_mul(4))
959        .saturating_add(state.severity_histogram.six_to_ten.saturating_mul(8))
960        .saturating_add(state.severity_histogram.eleven_plus.saturating_mul(12))
961        .min(30);
962    let avg_budget_utilization = state
963        .budget_utilization_sum
964        .checked_div(state.budget_utilization_count)
965        .unwrap_or(0);
966    let utilization_penalty = if avg_budget_utilization >= 80 {
967        15
968    } else if avg_budget_utilization >= 60 {
969        10
970    } else if avg_budget_utilization >= 40 {
971        5
972    } else {
973        0
974    };
975
976    let total_penalty = failure_penalty
977        .saturating_add(severity_penalty)
978        .saturating_add(utilization_penalty)
979        .min(100);
980    let score = 100_u64.saturating_sub(total_penalty);
981    u32::try_from(score).unwrap_or(0)
982}
983
984fn build_repair_event(log: &WalFecRecoveryLog, latency: Duration) -> Option<WalFecRepairEvent> {
985    let symbols_lost = log
986        .required_symbols
987        .saturating_sub(log.validated_source_symbols);
988    let repair_activated =
989        symbols_lost > 0 || log.decode_attempted || log.fallback_reason.is_some();
990    if !repair_activated {
991        return None;
992    }
993
994    let symbols_used = log.available_symbols.min(log.required_symbols);
995    let repair_budget = log.validated_repair_symbols.max(1);
996    let utilization_num = u64::from(symbols_lost)
997        .saturating_mul(100)
998        .saturating_add(u64::from(repair_budget).saturating_sub(1));
999    let utilization = utilization_num / u64::from(repair_budget);
1000    let budget_utilization_pct = u32::try_from(utilization.min(100)).unwrap_or(100);
1001    let latency_ns = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
1002    let severity_bucket = severity_bucket_for_loss(symbols_lost);
1003    let repair_success =
1004        log.outcome_is_recovered && (log.decode_succeeded || !log.decode_attempted);
1005
1006    Some(WalFecRepairEvent {
1007        group_id: log.group_id,
1008        frame_id: log.group_id.end_frame_no,
1009        symbols_lost,
1010        symbols_used,
1011        repair_success,
1012        latency_ns,
1013        budget_utilization_pct,
1014        severity_bucket,
1015    })
1016}
1017
1018const fn recovery_outcome_code(log: &WalFecRecoveryLog) -> &'static str {
1019    if log.outcome_is_recovered {
1020        "recovered"
1021    } else {
1022        "truncate_before_group"
1023    }
1024}
1025
1026const fn recovery_reason_code_for_log(log: &WalFecRecoveryLog) -> &'static str {
1027    if let Some(reason) = log.fallback_reason {
1028        return reason.reason_code();
1029    }
1030    if log.decode_attempted {
1031        return "decode_recovered";
1032    }
1033    "intact_fast_path"
1034}
1035
1036const fn repair_attempt_for_log(log: &WalFecRecoveryLog) -> bool {
1037    log.recovery_enabled
1038        && (log.decode_attempted
1039            || log.fallback_reason.is_some()
1040            || log.corruption_observations > 0
1041            || log.validated_source_symbols < log.required_symbols)
1042}
1043
1044fn symbol_state_for_log(log: &WalFecRecoveryLog) -> String {
1045    format!(
1046        "source_validated={}/{};repair_validated={};available={};required={};decode_attempted={};decode_succeeded={}",
1047        log.validated_source_symbols,
1048        log.required_symbols,
1049        log.validated_repair_symbols,
1050        log.available_symbols,
1051        log.required_symbols,
1052        log.decode_attempted,
1053        log.decode_succeeded
1054    )
1055}
1056
/// Nanoseconds elapsed since the first call to this function in the process.
///
/// The reference instant is captured lazily on the first call; the result
/// saturates at `u64::MAX`.
fn monotonic_now_ns() -> u64 {
    static PROCESS_EPOCH: OnceLock<Instant> = OnceLock::new();

    let epoch = *PROCESS_EPOCH.get_or_init(Instant::now);
    let nanos = epoch.elapsed().as_nanos();
    u64::try_from(nanos).unwrap_or(u64::MAX)
}
1062
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch and
/// saturates at `u64::MAX`.
fn wall_clock_unix_ns() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
        })
}
1069
1070const fn fallback_reason_tag(reason: Option<WalFecRecoveryFallbackReason>) -> u8 {
1071    match reason {
1072        None => 0,
1073        Some(WalFecRecoveryFallbackReason::MissingSidecarGroup) => 1,
1074        Some(WalFecRecoveryFallbackReason::SidecarUnreadable) => 2,
1075        Some(WalFecRecoveryFallbackReason::SaltMismatch) => 3,
1076        Some(WalFecRecoveryFallbackReason::InsufficientSymbols) => 4,
1077        Some(WalFecRecoveryFallbackReason::DecodeFailed) => 5,
1078        Some(WalFecRecoveryFallbackReason::DecodedPayloadMismatch) => 6,
1079        Some(WalFecRecoveryFallbackReason::RecoveryDisabled) => 7,
1080    }
1081}
1082
1083fn repair_source_for_log(log: &WalFecRecoveryLog) -> WalFecRepairSource {
1084    if log.validated_repair_symbols > 0 && log.validated_source_symbols > 0 {
1085        return WalFecRepairSource::WalAndSnapshotRepairSymbols;
1086    }
1087    if log.validated_repair_symbols > 0 {
1088        return WalFecRepairSource::WalRepairSymbols;
1089    }
1090    WalFecRepairSource::SnapshotRepairSymbols
1091}
1092
/// Ratio of available to required symbols, in thousandths (rounded down).
///
/// Returns `0` when `required_symbols` is zero. The value is not capped at
/// 1000: it exceeds 1000 when more symbols are available than required, and
/// saturates at `u32::MAX`.
fn confidence_per_mille(required_symbols: u32, available_symbols: u32) -> u32 {
    if required_symbols == 0 {
        return 0;
    }
    // `u32::MAX * 1000` fits comfortably in a `u64`, so this cannot overflow.
    let per_mille = u64::from(available_symbols) * 1_000 / u64::from(required_symbols);
    u32::try_from(per_mille).unwrap_or(u32::MAX)
}
1103
1104fn blake3_hash_to_array(hasher: &blake3::Hasher) -> [u8; 32] {
1105    let mut output = [0_u8; 32];
1106    output.copy_from_slice(hasher.finalize().as_bytes());
1107    output
1108}
1109
1110fn compute_corruption_signature(log: &WalFecRecoveryLog, event: &WalFecRepairEvent) -> [u8; 32] {
1111    let mut hasher = blake3::Hasher::new();
1112    hasher.update(b"fsqlite:wal_fec:repair_corruption_signature:v1");
1113    hasher.update(&log.group_id.wal_salt1.to_le_bytes());
1114    hasher.update(&log.group_id.wal_salt2.to_le_bytes());
1115    hasher.update(&log.group_id.end_frame_no.to_le_bytes());
1116    hasher.update(&event.frame_id.to_le_bytes());
1117    hasher.update(&event.symbols_lost.to_le_bytes());
1118    hasher.update(&log.validated_source_symbols.to_le_bytes());
1119    hasher.update(&log.validated_repair_symbols.to_le_bytes());
1120    hasher.update(&log.required_symbols.to_le_bytes());
1121    hasher.update(&log.available_symbols.to_le_bytes());
1122    hasher.update(&log.corruption_observations.to_le_bytes());
1123    hasher.update(&[fallback_reason_tag(log.fallback_reason)]);
1124    blake3_hash_to_array(&hasher)
1125}
1126
1127fn compute_witness_hash(
1128    label: &[u8],
1129    log: &WalFecRecoveryLog,
1130    event: &WalFecRepairEvent,
1131    corruption_signature: [u8; 32],
1132) -> [u8; 32] {
1133    let mut hasher = blake3::Hasher::new();
1134    hasher.update(b"fsqlite:wal_fec:repair_witness:v1");
1135    hasher.update(label);
1136    hasher.update(&corruption_signature);
1137    hasher.update(&log.group_id.wal_salt1.to_le_bytes());
1138    hasher.update(&log.group_id.wal_salt2.to_le_bytes());
1139    hasher.update(&log.group_id.end_frame_no.to_le_bytes());
1140    hasher.update(&event.symbols_used.to_le_bytes());
1141    hasher.update(&event.budget_utilization_pct.to_le_bytes());
1142    hasher.update(&log.required_symbols.to_le_bytes());
1143    hasher.update(&log.available_symbols.to_le_bytes());
1144    hasher.update(&[u8::from(log.outcome_is_recovered)]);
1145    hasher.update(&[u8::from(log.decode_succeeded)]);
1146    blake3_hash_to_array(&hasher)
1147}
1148
1149fn compute_evidence_chain_hash(
1150    previous_tip: [u8; 32],
1151    card: &WalFecRepairEvidenceCard,
1152) -> [u8; 32] {
1153    let mut hasher = blake3::Hasher::new();
1154    hasher.update(b"fsqlite:wal_fec:repair_evidence_chain:v1");
1155    hasher.update(&previous_tip);
1156    hasher.update(&card.group_id.wal_salt1.to_le_bytes());
1157    hasher.update(&card.group_id.wal_salt2.to_le_bytes());
1158    hasher.update(&card.group_id.end_frame_no.to_le_bytes());
1159    hasher.update(&card.frame_id.to_le_bytes());
1160    hasher.update(&card.wal_file_offset_bytes.unwrap_or(u64::MAX).to_le_bytes());
1161    hasher.update(&card.monotonic_timestamp_ns.to_le_bytes());
1162    hasher.update(&card.wall_clock_unix_ns.to_le_bytes());
1163    hasher.update(&card.corruption_signature_blake3);
1164    hasher.update(
1165        card.bit_error_pattern
1166            .as_deref()
1167            .unwrap_or_default()
1168            .as_bytes(),
1169    );
1170    hasher.update(card.repair_source.as_str().as_bytes());
1171    hasher.update(&card.symbols_used.to_le_bytes());
1172    hasher.update(&card.validated_source_symbols.to_le_bytes());
1173    hasher.update(&card.validated_repair_symbols.to_le_bytes());
1174    hasher.update(&card.required_symbols.to_le_bytes());
1175    hasher.update(&card.available_symbols.to_le_bytes());
1176    hasher.update(&card.witness.corrupted_hash_blake3);
1177    hasher.update(&card.witness.repaired_hash_blake3);
1178    hasher.update(&card.witness.expected_hash_blake3);
1179    hasher.update(&card.repair_latency_ns.to_le_bytes());
1180    hasher.update(&card.confidence_per_mille.to_le_bytes());
1181    hasher.update(card.severity_bucket.as_str().as_bytes());
1182    hasher.update(&card.ledger_epoch.to_le_bytes());
1183    blake3_hash_to_array(&hasher)
1184}
1185
1186fn build_repair_evidence_card(
1187    log: &WalFecRecoveryLog,
1188    event: &WalFecRepairEvent,
1189    latency: Duration,
1190    previous_chain_tip: [u8; 32],
1191    ledger_epoch: u64,
1192) -> WalFecRepairEvidenceCard {
1193    let corruption_signature = compute_corruption_signature(log, event);
1194    let witness = WalFecRepairWitnessTriple {
1195        corrupted_hash_blake3: compute_witness_hash(b"corrupted", log, event, corruption_signature),
1196        repaired_hash_blake3: compute_witness_hash(b"repaired", log, event, corruption_signature),
1197        expected_hash_blake3: compute_witness_hash(b"expected", log, event, corruption_signature),
1198    };
1199    let repair_latency_ns = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
1200    let bit_error_pattern = if log.corruption_observations > 0 {
1201        Some(format!(
1202            "corruption_observations={}",
1203            log.corruption_observations
1204        ))
1205    } else {
1206        None
1207    };
1208
1209    let mut card = WalFecRepairEvidenceCard {
1210        group_id: log.group_id,
1211        frame_id: event.frame_id,
1212        wal_file_offset_bytes: None,
1213        monotonic_timestamp_ns: monotonic_now_ns(),
1214        wall_clock_unix_ns: wall_clock_unix_ns(),
1215        corruption_signature_blake3: corruption_signature,
1216        bit_error_pattern,
1217        repair_source: repair_source_for_log(log),
1218        symbols_used: event.symbols_used,
1219        validated_source_symbols: log.validated_source_symbols,
1220        validated_repair_symbols: log.validated_repair_symbols,
1221        required_symbols: log.required_symbols,
1222        available_symbols: log.available_symbols,
1223        witness,
1224        repair_latency_ns,
1225        confidence_per_mille: confidence_per_mille(log.required_symbols, log.available_symbols),
1226        severity_bucket: event.severity_bucket,
1227        ledger_epoch,
1228        chain_hash: [0_u8; 32],
1229    };
1230    card.chain_hash = compute_evidence_chain_hash(previous_chain_tip, &card);
1231    card
1232}
1233
1234/// Record one recovery log into the global telemetry ledger.
1235///
1236/// This is non-blocking aside from a short in-process mutex critical section.
1237pub fn record_raptorq_recovery_log(log: &WalFecRecoveryLog, latency: Duration) {
1238    let Some(event) = build_repair_event(log, latency) else {
1239        return;
1240    };
1241
1242    let mut state = lock_raptorq_repair_telemetry();
1243    state.repairs_total = state.repairs_total.saturating_add(1);
1244    if event.repair_success {
1245        state.symbols_reclaimed = state
1246            .symbols_reclaimed
1247            .saturating_add(u64::from(event.symbols_lost));
1248    } else {
1249        state.repairs_failed = state.repairs_failed.saturating_add(1);
1250    }
1251    state.budget_utilization_sum = state
1252        .budget_utilization_sum
1253        .saturating_add(u64::from(event.budget_utilization_pct));
1254    state.budget_utilization_count = state.budget_utilization_count.saturating_add(1);
1255    state.severity_histogram.bump(event.severity_bucket);
1256
1257    if state.events.len() == RAPTORQ_REPAIR_EVENT_CAPACITY {
1258        let _ = state.events.pop_front();
1259    }
1260    state.events.push_back(event.clone());
1261
1262    let ledger_epoch = state.next_evidence_epoch.max(1);
1263    let evidence_card =
1264        build_repair_evidence_card(log, &event, latency, state.evidence_chain_tip, ledger_epoch);
1265    state.next_evidence_epoch = ledger_epoch.saturating_add(1);
1266    state.evidence_chain_tip = evidence_card.chain_hash;
1267    if state.evidence_cards.len() == RAPTORQ_REPAIR_EVIDENCE_CAPACITY {
1268        let _ = state.evidence_cards.pop_front();
1269    }
1270    state.evidence_cards.push_back(evidence_card);
1271}
1272
1273/// Snapshot aggregate RaptorQ repair telemetry for dashboard/PRAGMA surfaces.
1274#[must_use]
1275pub fn raptorq_repair_metrics_snapshot() -> WalFecRepairMetricsSnapshot {
1276    let state = lock_raptorq_repair_telemetry();
1277    let mean_budget_utilization = state
1278        .budget_utilization_sum
1279        .checked_div(state.budget_utilization_count)
1280        .unwrap_or(0);
1281    let budget_utilization_pct = u32::try_from(mean_budget_utilization).unwrap_or(u32::MAX);
1282    WalFecRepairMetricsSnapshot {
1283        repairs_total: state.repairs_total,
1284        repairs_failed: state.repairs_failed,
1285        symbols_reclaimed: state.symbols_reclaimed,
1286        budget_utilization_pct,
1287        wal_health_score: compute_health_score(&state),
1288        severity_histogram: state.severity_histogram,
1289    }
1290}
1291
1292/// Snapshot recent RaptorQ repair events.
1293///
1294/// `limit = 0` returns the full retained ledger.
1295#[must_use]
1296pub fn raptorq_repair_events_snapshot(limit: usize) -> Vec<WalFecRepairEvent> {
1297    let mut events = {
1298        let state = lock_raptorq_repair_telemetry();
1299        let take = if limit == 0 {
1300            state.events.len()
1301        } else {
1302            limit.min(state.events.len())
1303        };
1304        state
1305            .events
1306            .iter()
1307            .rev()
1308            .take(take)
1309            .cloned()
1310            .collect::<Vec<_>>()
1311    };
1312    events.reverse();
1313    events
1314}
1315
1316/// Snapshot recent RaptorQ repair evidence cards.
1317///
1318/// `limit = 0` returns all retained cards.
1319#[must_use]
1320pub fn raptorq_repair_evidence_snapshot(limit: usize) -> Vec<WalFecRepairEvidenceCard> {
1321    let mut cards = {
1322        let state = lock_raptorq_repair_telemetry();
1323        let take = if limit == 0 {
1324            state.evidence_cards.len()
1325        } else {
1326            limit.min(state.evidence_cards.len())
1327        };
1328        state
1329            .evidence_cards
1330            .iter()
1331            .rev()
1332            .take(take)
1333            .cloned()
1334            .collect::<Vec<_>>()
1335    };
1336    cards.reverse();
1337    cards
1338}
1339
1340/// Query RaptorQ repair evidence cards by page/time/severity.
1341#[must_use]
1342pub fn query_raptorq_repair_evidence(
1343    query: &WalFecRepairEvidenceQuery,
1344) -> Vec<WalFecRepairEvidenceCard> {
1345    let mut cards = {
1346        let state = lock_raptorq_repair_telemetry();
1347        state
1348            .evidence_cards
1349            .iter()
1350            .filter(|card| {
1351                query
1352                    .frame_id
1353                    .is_none_or(|frame_id| card.frame_id == frame_id)
1354            })
1355            .filter(|card| {
1356                query
1357                    .severity_bucket
1358                    .is_none_or(|severity| card.severity_bucket == severity)
1359            })
1360            .filter(|card| {
1361                query
1362                    .wall_clock_start_ns
1363                    .is_none_or(|start| card.wall_clock_unix_ns >= start)
1364            })
1365            .filter(|card| {
1366                query
1367                    .wall_clock_end_ns
1368                    .is_none_or(|end| card.wall_clock_unix_ns <= end)
1369            })
1370            .cloned()
1371            .collect::<Vec<_>>()
1372    };
1373
1374    if let Some(limit) = query.limit {
1375        if limit > 0 && cards.len() > limit {
1376            let keep_from = cards.len() - limit;
1377            cards.drain(..keep_from);
1378        }
1379    }
1380
1381    cards
1382}
1383
1384/// Reset all global RaptorQ repair telemetry.
1385pub fn reset_raptorq_repair_telemetry() {
1386    let mut state = lock_raptorq_repair_telemetry();
1387    *state = WalFecRepairTelemetryState::default();
1388}
1389
/// Recovery audit artifact for a single WAL-FEC group attempt (§3.4.1).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecDecodeProof {
    /// Identity of the group the attempt concerned.
    pub group_id: WalFecGroupId,
    /// Number of symbols required to decode the group.
    pub required_symbols: u32,
    /// Number of symbols available to the decoder.
    pub available_symbols: u32,
    /// Number of source symbols that passed validation.
    pub validated_source_symbols: u32,
    /// Number of repair symbols that passed validation.
    pub validated_repair_symbols: u32,
    /// Count of repair symbols rejected as corrupt/mismatched during verification.
    pub corruption_observations: u32,
    /// Whether a decode was attempted.
    pub decode_attempted: bool,
    /// Whether the attempted decode succeeded.
    pub decode_succeeded: bool,
    /// Frame numbers recovered by the attempt.
    pub recovered_frame_nos: Vec<u32>,
    /// Reason recovery fell back, if it did.
    pub fallback_reason: Option<WalFecRecoveryFallbackReason>,
}
1405
/// Successful recovery payload for one commit group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRecoveredGroup {
    /// Sidecar metadata of the recovered group.
    pub meta: WalFecGroupMeta,
    /// Recovered page payloads, one byte buffer per page.
    pub recovered_pages: Vec<Vec<u8>>,
    /// Frame numbers that were recovered.
    // NOTE(review): presumably index-aligned with `recovered_pages` — confirm.
    pub recovered_frame_nos: Vec<u32>,
    /// Database size in pages associated with the group.
    pub db_size_pages: u32,
    /// Audit record describing how the recovery went.
    pub decode_proof: WalFecDecodeProof,
}
1415
/// Final action for a WAL-FEC recovery attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalFecRecoveryOutcome {
    /// The group was recovered; carries the recovered payload.
    Recovered(WalFecRecoveredGroup),
    /// The group was not recovered; the WAL is to be truncated before it.
    TruncateBeforeGroup {
        /// Frame number before which to truncate.
        truncate_before_frame_no: u32,
        /// Audit record describing the failed attempt.
        decode_proof: WalFecDecodeProof,
    },
}
1425
/// Candidate WAL source frame payload read from `.wal`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFrameCandidate {
    /// Frame number of the candidate within the WAL.
    pub frame_no: u32,
    /// Page payload bytes carried by the frame.
    pub page_data: Vec<u8>,
}
1432
/// Default repair pipeline queue capacity, used by
/// `WalFecRepairPipelineConfig::default()`. Callers can override it via
/// `WalFecRepairPipelineConfig::queue_capacity` at construction time.
const DEFAULT_REPAIR_PIPELINE_QUEUE_CAPACITY: usize = 64;
1436
/// Pipeline configuration for asynchronous WAL-FEC repair generation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFecRepairPipelineConfig {
    /// Maximum queued work items before backpressure.
    ///
    /// This is the bounded async repair-latency window in commit-count units.
    /// Must be at least 1; starting a pipeline with `0` is rejected.
    pub queue_capacity: usize,
    /// Optional deterministic delay per generated repair symbol (test hook).
    /// Defaults to zero.
    pub per_symbol_delay: Duration,
}
1447
1448impl Default for WalFecRepairPipelineConfig {
1449    fn default() -> Self {
1450        Self {
1451            queue_capacity: DEFAULT_REPAIR_PIPELINE_QUEUE_CAPACITY,
1452            per_symbol_delay: Duration::ZERO,
1453        }
1454    }
1455}
1456
/// A single asynchronous WAL-FEC repair-generation work item.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRepairWorkItem {
    /// Path of the sidecar file this work item targets.
    pub sidecar_path: PathBuf,
    /// Group metadata for the commit group.
    pub meta: WalFecGroupMeta,
    /// Source page payloads of the commit group, one byte buffer per page.
    pub source_pages: Vec<Vec<u8>>,
}
1464
1465impl WalFecRepairWorkItem {
1466    pub fn new(
1467        sidecar_path: impl Into<PathBuf>,
1468        meta: WalFecGroupMeta,
1469        source_pages: Vec<Vec<u8>>,
1470    ) -> Result<Self> {
1471        validate_source_pages(&meta, &source_pages)?;
1472        Ok(Self {
1473            sidecar_path: sidecar_path.into(),
1474            meta,
1475            source_pages,
1476        })
1477    }
1478}
1479
/// Snapshot of asynchronous WAL-FEC pipeline counters.
///
/// Each field is read independently from its own atomic counter, so a
/// snapshot taken while the pipeline is active is not a single consistent cut.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WalFecRepairPipelineStats {
    /// Work items currently counted as pending.
    pub pending_jobs: usize,
    /// Value of the completed-jobs counter.
    pub completed_jobs: usize,
    /// Value of the failed-jobs counter.
    pub failed_jobs: usize,
    /// Value of the canceled-jobs counter.
    pub canceled_jobs: usize,
    /// Highest pending count recorded so far.
    pub max_pending_jobs: usize,
}
1489
/// Message sent from the pipeline handle to the background worker.
#[derive(Debug)]
enum WalFecPipelineMessage {
    /// One repair-generation work item to process.
    Work(WalFecRepairWorkItem),
}
1494
/// Result of handling one work item.
// NOTE(review): the code that produces and consumes these values is outside
// this section; the variant meanings below are taken from their names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalFecWorkOutcome {
    /// The work item ran to completion.
    Completed,
    /// The work item was abandoned because of cancellation.
    Canceled,
}
1500
/// Shared handles given to the background worker.
///
/// Each field is an `Arc` clone of the corresponding field on
/// [`WalFecRepairPipeline`], so both sides observe the same values.
#[derive(Debug, Clone)]
struct WalFecRepairWorkerState {
    /// Set once cancellation has been requested.
    cancel_flag: Arc<AtomicBool>,
    /// Work items currently counted as pending.
    pending_jobs: Arc<AtomicUsize>,
    /// Counter of completed jobs.
    completed_jobs: Arc<AtomicUsize>,
    /// Counter of failed jobs.
    failed_jobs: Arc<AtomicUsize>,
    /// Counter of canceled jobs.
    canceled_jobs: Arc<AtomicUsize>,
    /// Failure detail recorded by the worker, if any.
    worker_failure: Arc<Mutex<Option<String>>>,
}
1510
/// Background worker that computes and appends WAL-FEC repair symbols.
///
/// The handle owns the sending side of a bounded queue and the join handle
/// of the worker task; counters are shared with the worker through `Arc`s.
pub struct WalFecRepairPipeline {
    /// Sending side of the work queue; `None` once shutdown has begun.
    sender: Option<mpsc::Sender<WalFecPipelineMessage>>,
    /// Set once cancellation has been requested.
    cancel_flag: Arc<AtomicBool>,
    /// Work items currently counted as pending.
    pending_jobs: Arc<AtomicUsize>,
    /// Counter of completed jobs.
    completed_jobs: Arc<AtomicUsize>,
    /// Counter of failed jobs.
    failed_jobs: Arc<AtomicUsize>,
    /// Counter of canceled jobs.
    canceled_jobs: Arc<AtomicUsize>,
    /// Highest pending count recorded so far.
    max_pending_jobs: Arc<AtomicUsize>,
    /// Failure detail recorded by the worker, if any.
    worker_failure: Arc<Mutex<Option<String>>>,
    /// Join handle of the worker task; `None` once it has been awaited.
    worker: Option<AsyncJoinHandle<()>>,
}
1523
1524impl WalFecRepairPipeline {
1525    /// Start the pipeline worker on an existing asupersync runtime.
1526    pub fn start(
1527        runtime: &RuntimeHandle,
1528        parent_cx: &Cx,
1529        config: WalFecRepairPipelineConfig,
1530    ) -> Result<Self> {
1531        if config.queue_capacity == 0 {
1532            return Err(FrankenError::WalCorrupt {
1533                detail: "wal-fec repair pipeline queue_capacity must be >= 1".to_owned(),
1534            });
1535        }
1536
1537        let (tx, rx) = mpsc::channel(config.queue_capacity);
1538        let cancel_flag = Arc::new(AtomicBool::new(false));
1539        let pending_jobs = Arc::new(AtomicUsize::new(0));
1540        let completed_jobs = Arc::new(AtomicUsize::new(0));
1541        let failed_jobs = Arc::new(AtomicUsize::new(0));
1542        let canceled_jobs = Arc::new(AtomicUsize::new(0));
1543        let max_pending_jobs = Arc::new(AtomicUsize::new(0));
1544        let worker_failure = Arc::new(Mutex::new(None));
1545        let worker_state = WalFecRepairWorkerState {
1546            cancel_flag: Arc::clone(&cancel_flag),
1547            pending_jobs: Arc::clone(&pending_jobs),
1548            completed_jobs: Arc::clone(&completed_jobs),
1549            failed_jobs: Arc::clone(&failed_jobs),
1550            canceled_jobs: Arc::clone(&canceled_jobs),
1551            worker_failure: Arc::clone(&worker_failure),
1552        };
1553
1554        let worker_cx = parent_cx.create_child();
1555        let worker_handle = runtime
1556            .try_spawn(run_repair_pipeline_worker(
1557                rx,
1558                worker_state,
1559                worker_cx,
1560                config.per_symbol_delay,
1561            ))
1562            .map_err(|err| FrankenError::WalCorrupt {
1563                detail: format!("failed to spawn wal-fec repair worker task: {err}"),
1564            })?;
1565
1566        Ok(Self {
1567            sender: Some(tx),
1568            cancel_flag,
1569            pending_jobs,
1570            completed_jobs,
1571            failed_jobs,
1572            canceled_jobs,
1573            max_pending_jobs,
1574            worker_failure,
1575            worker: Some(worker_handle),
1576        })
1577    }
1578
1579    /// Queue a new repair-generation work item without blocking commit path.
1580    pub fn enqueue(&self, work_item: WalFecRepairWorkItem) -> Result<()> {
1581        if self.cancel_flag.load(Ordering::SeqCst) {
1582            return Err(FrankenError::WalCorrupt {
1583                detail: "wal-fec repair pipeline is canceled".to_owned(),
1584            });
1585        }
1586        let sender = self
1587            .sender
1588            .as_ref()
1589            .ok_or_else(|| FrankenError::WalCorrupt {
1590                detail: "wal-fec repair pipeline is shut down".to_owned(),
1591            })?;
1592
1593        let pending_after = self.pending_jobs.fetch_add(1, Ordering::SeqCst) + 1;
1594        update_max_pending(&self.max_pending_jobs, pending_after);
1595
1596        match sender.try_send(WalFecPipelineMessage::Work(work_item)) {
1597            Ok(()) => Ok(()),
1598            Err(mpsc::SendError::Full(_)) => {
1599                self.pending_jobs.fetch_sub(1, Ordering::SeqCst);
1600                Err(FrankenError::WalCorrupt {
1601                    detail: "wal-fec repair pipeline queue full".to_owned(),
1602                })
1603            }
1604            Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
1605                self.pending_jobs.fetch_sub(1, Ordering::SeqCst);
1606                Err(FrankenError::WalCorrupt {
1607                    detail: "wal-fec repair pipeline worker is disconnected".to_owned(),
1608                })
1609            }
1610        }
1611    }
1612
1613    /// Request cancellation for queued/in-flight work.
1614    pub fn cancel(&self) {
1615        self.cancel_flag.store(true, Ordering::SeqCst);
1616    }
1617
1618    /// Wait until queue drains or timeout expires.
1619    #[must_use]
1620    pub async fn flush(&self, cx: &Cx, timeout: Duration) -> bool {
1621        let deadline = Instant::now() + timeout;
1622        loop {
1623            if lock_unpoisoned(&self.worker_failure).is_some() {
1624                return false;
1625            }
1626            if self.pending_jobs.load(Ordering::SeqCst) == 0 {
1627                return true;
1628            }
1629            if Instant::now() >= deadline {
1630                return false;
1631            }
1632            if cx.checkpoint().is_err() {
1633                return false;
1634            }
1635            yield_now().await;
1636        }
1637    }
1638
1639    /// Read current counters.
1640    #[must_use]
1641    pub fn stats(&self) -> WalFecRepairPipelineStats {
1642        WalFecRepairPipelineStats {
1643            pending_jobs: self.pending_jobs.load(Ordering::SeqCst),
1644            completed_jobs: self.completed_jobs.load(Ordering::SeqCst),
1645            failed_jobs: self.failed_jobs.load(Ordering::SeqCst),
1646            canceled_jobs: self.canceled_jobs.load(Ordering::SeqCst),
1647            max_pending_jobs: self.max_pending_jobs.load(Ordering::SeqCst),
1648        }
1649    }
1650
1651    /// Stop the worker and await task completion.
1652    ///
1653    /// This is a graceful shutdown: queued work is drained before the worker
1654    /// exits. To force immediate cancellation, call [`Self::cancel`] first.
1655    pub async fn shutdown(&mut self, cx: &Cx) -> Result<WalFecRepairPipelineStats> {
1656        {
1657            let _mask = cx.masked();
1658            self.sender.take();
1659        }
1660        if let Some(worker) = self.worker.take() {
1661            worker.await;
1662        }
1663        let worker_failure_detail = lock_unpoisoned(&self.worker_failure).clone();
1664        if let Some(detail) = worker_failure_detail {
1665            return Err(FrankenError::WalCorrupt { detail });
1666        }
1667        Ok(self.stats())
1668    }
1669}
1670
1671impl Drop for WalFecRepairPipeline {
1672    fn drop(&mut self) {
1673        if self.worker.is_some() {
1674            self.cancel();
1675            self.sender.take();
1676            warn!(
1677                pending_jobs = self.pending_jobs.load(Ordering::SeqCst),
1678                "dropping wal-fec repair pipeline without awaiting shutdown"
1679            );
1680        }
1681    }
1682}
1683
1684async fn run_repair_pipeline_worker(
1685    mut receiver: mpsc::Receiver<WalFecPipelineMessage>,
1686    state: WalFecRepairWorkerState,
1687    worker_cx: Cx,
1688    per_symbol_delay: Duration,
1689) {
1690    let Some(native_worker_cx) = NativeCx::current() else {
1691        record_worker_failure(
1692            &state.worker_failure,
1693            "wal-fec repair worker task missing native runtime Cx".to_owned(),
1694        );
1695        drain_abandoned_work(
1696            &mut receiver,
1697            state.pending_jobs.as_ref(),
1698            state.canceled_jobs.as_ref(),
1699        );
1700        return;
1701    };
1702    worker_cx.set_native_cx(native_worker_cx.clone());
1703
1704    loop {
1705        let message = match receiver.recv(&native_worker_cx).await {
1706            Ok(message) => message,
1707            Err(mpsc::RecvError::Empty) => {
1708                yield_now().await;
1709                continue;
1710            }
1711            Err(mpsc::RecvError::Disconnected) => break,
1712            Err(mpsc::RecvError::Cancelled) => {
1713                record_worker_failure(
1714                    &state.worker_failure,
1715                    "wal-fec repair worker task cancelled before the queue drained".to_owned(),
1716                );
1717                drain_abandoned_work(
1718                    &mut receiver,
1719                    state.pending_jobs.as_ref(),
1720                    state.canceled_jobs.as_ref(),
1721                );
1722                break;
1723            }
1724        };
1725
1726        match message {
1727            WalFecPipelineMessage::Work(work_item) => {
1728                let group_id = work_item.meta.group_id();
1729                let cancel_flag_for_work = Arc::clone(&state.cancel_flag);
1730                let work_cx = worker_cx.create_child();
1731                let native_worker_cx_for_work = native_worker_cx.clone();
1732                let outcome = spawn_blocking(move || {
1733                    work_cx.set_native_cx(native_worker_cx_for_work);
1734                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1735                        process_repair_work_item(
1736                            &work_item,
1737                            &work_cx,
1738                            cancel_flag_for_work.as_ref(),
1739                            per_symbol_delay,
1740                        )
1741                    }))
1742                })
1743                .await;
1744
1745                state.pending_jobs.fetch_sub(1, Ordering::SeqCst);
1746                match outcome {
1747                    Ok(Ok(WalFecWorkOutcome::Completed)) => {
1748                        state.completed_jobs.fetch_add(1, Ordering::SeqCst);
1749                        info!(
1750                            group_id = %group_id,
1751                            "wal-fec repair work item completed"
1752                        );
1753                    }
1754                    Ok(Ok(WalFecWorkOutcome::Canceled)) => {
1755                        state.canceled_jobs.fetch_add(1, Ordering::SeqCst);
1756                        warn!(
1757                            group_id = %group_id,
1758                            "wal-fec repair work item canceled before append"
1759                        );
1760                    }
1761                    Ok(Err(err)) => {
1762                        state.failed_jobs.fetch_add(1, Ordering::SeqCst);
1763                        error!(
1764                            group_id = %group_id,
1765                            error = %err,
1766                            "wal-fec repair work item failed"
1767                        );
1768                    }
1769                    Err(_) => {
1770                        state.failed_jobs.fetch_add(1, Ordering::SeqCst);
1771                        let detail = format!(
1772                            "wal-fec repair worker task panicked while processing group {group_id}"
1773                        );
1774                        record_worker_failure(&state.worker_failure, detail.clone());
1775                        error!(group_id = %group_id, "{detail}");
1776                        drain_abandoned_work(
1777                            &mut receiver,
1778                            state.pending_jobs.as_ref(),
1779                            state.canceled_jobs.as_ref(),
1780                        );
1781                        break;
1782                    }
1783                }
1784
1785                if worker_cx.checkpoint().is_err() {
1786                    record_worker_failure(
1787                        &state.worker_failure,
1788                        "wal-fec repair worker task cancelled after processing work".to_owned(),
1789                    );
1790                    drain_abandoned_work(
1791                        &mut receiver,
1792                        state.pending_jobs.as_ref(),
1793                        state.canceled_jobs.as_ref(),
1794                    );
1795                    break;
1796                }
1797                yield_now().await;
1798            }
1799        }
1800    }
1801}
1802
1803fn record_worker_failure(worker_failure: &Mutex<Option<String>>, detail: String) {
1804    let mut slot = lock_unpoisoned(worker_failure);
1805    if slot.is_none() {
1806        *slot = Some(detail);
1807    }
1808}
1809
/// Lock `mutex`, recovering the guard even if a previous holder panicked.
///
/// Poisoning is ignored: the inner guard is returned as-is.
fn lock_unpoisoned<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
1815
/// Small adapter exposing a non-blocking receive with a `&mut self` receiver.
///
/// NOTE(review): the "compat" name suggests this papers over differences in
/// the channel's `try_recv` signature — confirm against the channel crate.
trait ReceiverTryRecvCompat<T> {
    /// Attempt to take one queued message without waiting.
    fn try_recv_compat(&mut self) -> std::result::Result<T, mpsc::RecvError>;
}
1819
/// Forwards [`ReceiverTryRecvCompat::try_recv_compat`] to the receiver's own
/// `try_recv`.
impl<T> ReceiverTryRecvCompat<T> for mpsc::Receiver<T> {
    fn try_recv_compat(&mut self) -> std::result::Result<T, mpsc::RecvError> {
        // Plain delegation; no retry or error mapping happens here.
        self.try_recv()
    }
}
1825
1826fn drain_abandoned_work(
1827    receiver: &mut mpsc::Receiver<WalFecPipelineMessage>,
1828    pending_jobs: &AtomicUsize,
1829    canceled_jobs: &AtomicUsize,
1830) {
1831    while let Ok(WalFecPipelineMessage::Work(_)) = receiver.try_recv_compat() {
1832        pending_jobs.fetch_sub(1, Ordering::SeqCst);
1833        canceled_jobs.fetch_add(1, Ordering::SeqCst);
1834    }
1835}
1836
1837/// Deterministically generate repair symbols from source pages.
1838pub fn generate_wal_fec_repair_symbols(
1839    meta: &WalFecGroupMeta,
1840    source_pages: &[Vec<u8>],
1841) -> Result<Vec<SymbolRecord>> {
1842    match generate_wal_fec_repair_symbols_inner(meta, source_pages, None, None, Duration::ZERO)? {
1843        Some(symbols) => {
1844            crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_encode();
1845            Ok(symbols)
1846        }
1847        None => Err(FrankenError::WalCorrupt {
1848            detail: "unexpected cancellation while generating wal-fec symbols".to_owned(),
1849        }),
1850    }
1851}
1852
1853fn process_repair_work_item(
1854    work_item: &WalFecRepairWorkItem,
1855    cx: &Cx,
1856    cancel_flag: &AtomicBool,
1857    per_symbol_delay: Duration,
1858) -> Result<WalFecWorkOutcome> {
1859    if cancel_flag.load(Ordering::SeqCst) || cx.checkpoint().is_err() {
1860        return Ok(WalFecWorkOutcome::Canceled);
1861    }
1862    let Some(repair_symbols) = generate_wal_fec_repair_symbols_inner(
1863        &work_item.meta,
1864        &work_item.source_pages,
1865        Some(cx),
1866        Some(cancel_flag),
1867        per_symbol_delay,
1868    )?
1869    else {
1870        return Ok(WalFecWorkOutcome::Canceled);
1871    };
1872    if cancel_flag.load(Ordering::SeqCst) || cx.checkpoint().is_err() {
1873        return Ok(WalFecWorkOutcome::Canceled);
1874    }
1875    let group = WalFecGroupRecord::new(work_item.meta.clone(), repair_symbols)?;
1876    append_wal_fec_group(&work_item.sidecar_path, &group)?;
1877    Ok(WalFecWorkOutcome::Completed)
1878}
1879
1880fn generate_wal_fec_repair_symbols_inner(
1881    meta: &WalFecGroupMeta,
1882    source_pages: &[Vec<u8>],
1883    cx: Option<&Cx>,
1884    cancel_flag: Option<&AtomicBool>,
1885    per_symbol_delay: Duration,
1886) -> Result<Option<Vec<SymbolRecord>>> {
1887    validate_source_pages(meta, source_pages)?;
1888    let symbol_len = usize::try_from(meta.oti.t).map_err(|_| FrankenError::WalCorrupt {
1889        detail: format!("OTI symbol size {} does not fit in usize", meta.oti.t),
1890    })?;
1891    let r_repair = usize::try_from(meta.r_repair).map_err(|_| FrankenError::WalCorrupt {
1892        detail: format!("r_repair {} does not fit in usize", meta.r_repair),
1893    })?;
1894
1895    // Derive a deterministic group-level seed for the SystematicEncoder from
1896    // the group metadata (object_id, salts, frame range, k, r).
1897    let encoder_seed = derive_repair_seed(meta, 0);
1898
1899    let encoder = asupersync::raptorq::systematic::SystematicEncoder::new(
1900        source_pages,
1901        symbol_len,
1902        encoder_seed,
1903    )
1904    .ok_or_else(|| FrankenError::WalCorrupt {
1905        detail: "RaptorQ constraint matrix singular during encoding".to_owned(),
1906    })?;
1907
1908    let mut symbols = Vec::with_capacity(r_repair);
1909
1910    for repair_index in 0..r_repair {
1911        if let Some(cx) = cx {
1912            if cx.checkpoint().is_err() {
1913                return Ok(None);
1914            }
1915        }
1916        if let Some(flag) = cancel_flag {
1917            if flag.load(Ordering::SeqCst) {
1918                return Ok(None);
1919            }
1920        }
1921
1922        let esi = meta
1923            .k_source
1924            .checked_add(
1925                u32::try_from(repair_index).map_err(|_| FrankenError::WalCorrupt {
1926                    detail: format!("repair_index {repair_index} does not fit in u32"),
1927                })?,
1928            )
1929            .ok_or_else(|| FrankenError::WalCorrupt {
1930                detail: "repair symbol ESI overflow".to_owned(),
1931            })?;
1932
1933        let payload = encoder.repair_symbol(esi);
1934
1935        if per_symbol_delay > Duration::ZERO {
1936            thread::sleep(per_symbol_delay);
1937        }
1938
1939        symbols.push(SymbolRecord::new(
1940            meta.object_id,
1941            meta.oti,
1942            esi,
1943            payload,
1944            SymbolRecordFlags::empty(),
1945        ));
1946    }
1947
1948    Ok(Some(symbols))
1949}
1950
1951fn validate_source_pages(meta: &WalFecGroupMeta, source_pages: &[Vec<u8>]) -> Result<()> {
1952    let expected_pages = usize::try_from(meta.k_source).map_err(|_| FrankenError::WalCorrupt {
1953        detail: format!("k_source {} does not fit in usize", meta.k_source),
1954    })?;
1955    if source_pages.len() != expected_pages {
1956        return Err(FrankenError::WalCorrupt {
1957            detail: format!(
1958                "source page count {} must equal k_source {}",
1959                source_pages.len(),
1960                meta.k_source
1961            ),
1962        });
1963    }
1964    let expected_len = usize::try_from(meta.page_size).map_err(|_| FrankenError::WalCorrupt {
1965        detail: format!("page_size {} does not fit in usize", meta.page_size),
1966    })?;
1967
1968    for (index, page) in source_pages.iter().enumerate() {
1969        if page.len() != expected_len {
1970            return Err(FrankenError::WalCorrupt {
1971                detail: format!(
1972                    "source page {index} has length {}, expected {expected_len}",
1973                    page.len()
1974                ),
1975            });
1976        }
1977        let actual_hash = wal_fec_source_hash_xxh3_128(page);
1978        let expected_hash = meta.source_page_xxh3_128[index];
1979        if actual_hash != expected_hash {
1980            return Err(FrankenError::WalCorrupt {
1981                detail: format!("source page hash mismatch at index {index}"),
1982            });
1983        }
1984    }
1985    Ok(())
1986}
1987
1988fn derive_repair_seed(meta: &WalFecGroupMeta, repair_index: u32) -> u64 {
1989    let mut seed_material = Vec::with_capacity(16 + (7 * size_of::<u32>()));
1990    seed_material.extend_from_slice(meta.object_id.as_bytes());
1991    seed_material.extend_from_slice(&meta.wal_salt1.to_le_bytes());
1992    seed_material.extend_from_slice(&meta.wal_salt2.to_le_bytes());
1993    seed_material.extend_from_slice(&meta.start_frame_no.to_le_bytes());
1994    seed_material.extend_from_slice(&meta.end_frame_no.to_le_bytes());
1995    seed_material.extend_from_slice(&meta.k_source.to_le_bytes());
1996    seed_material.extend_from_slice(&meta.r_repair.to_le_bytes());
1997    seed_material.extend_from_slice(&repair_index.to_le_bytes());
1998    xxh3_64(&seed_material)
1999}
2000
/// Raise the high-water mark `max_pending` to `candidate` if it is larger.
///
/// The stored value never decreases. Uses the standard atomic max operation
/// instead of a hand-written compare-exchange loop.
fn update_max_pending(max_pending: &AtomicUsize, candidate: usize) {
    max_pending.fetch_max(candidate, Ordering::SeqCst);
}
2011
2012/// Build source hashes for `K` WAL payload pages.
2013#[must_use]
2014pub fn build_source_page_hashes(page_payloads: &[Vec<u8>]) -> Vec<Xxh3Checksum128> {
2015    page_payloads
2016        .iter()
2017        .map(|page| wal_fec_source_hash_xxh3_128(page))
2018        .collect()
2019}
2020
2021/// RFC 6330 RaptorQ decode function for WAL-FEC recovery.
2022///
2023/// Accepts the group metadata and a slice of `(esi, symbol_data)` pairs
2024/// (source symbols with ESI < K and repair symbols with ESI >= K).
2025/// Returns `K` recovered source pages on success.
2026///
2027/// This function is the companion decoder for the `SystematicEncoder`-based
2028/// encoding in [`generate_wal_fec_repair_symbols_inner`] and is intended as
2029/// the `decode` closure for [`recover_wal_fec_group_with_decoder`].
2030pub fn wal_fec_raptorq_decode(
2031    meta: &WalFecGroupMeta,
2032    symbols: &[(u32, Vec<u8>)],
2033) -> Result<Vec<Vec<u8>>> {
2034    let k = usize::try_from(meta.k_source).map_err(|_| FrankenError::WalCorrupt {
2035        detail: format!("k_source {} does not fit in usize", meta.k_source),
2036    })?;
2037    let symbol_size = usize::try_from(meta.oti.t).map_err(|_| FrankenError::WalCorrupt {
2038        detail: format!("OTI symbol size {} does not fit in usize", meta.oti.t),
2039    })?;
2040
2041    // Must use the same seed as the encoder.
2042    let encoder_seed = derive_repair_seed(meta, 0);
2043    let decoder =
2044        asupersync::raptorq::decoder::InactivationDecoder::new(k, symbol_size, encoder_seed);
2045
2046    // Start with constraint symbols (LDPC + HDPC with zero data).
2047    let mut received = decoder.constraint_symbols();
2048    let repair_padding_delta = {
2049        let params = decoder.params();
2050        params
2051            .k_prime
2052            .checked_sub(params.k)
2053            .and_then(|delta| u32::try_from(delta).ok())
2054            .ok_or_else(|| FrankenError::WalCorrupt {
2055                detail: format!(
2056                    "invalid RaptorQ padding domain: K={} K'={}",
2057                    params.k, params.k_prime
2058                ),
2059            })?
2060    };
2061
2062    // Convert caller-provided (esi, data) pairs into ReceivedSymbol entries.
2063    for &(esi, ref data) in symbols {
2064        let esi_usize = esi as usize;
2065        if esi_usize < k {
2066            let (cols, coefs) = decoder.source_equation(esi);
2067            received.push(asupersync::raptorq::decoder::ReceivedSymbol {
2068                esi,
2069                is_source: true,
2070                columns: cols,
2071                coefficients: coefs,
2072                data: data.clone(),
2073            });
2074        } else {
2075            esi.checked_add(repair_padding_delta)
2076                .ok_or_else(|| FrankenError::WalCorrupt {
2077                    detail: format!("invalid RaptorQ repair ESI {esi}: overflow"),
2078                })?;
2079            let (cols, coefs) = decoder
2080                .repair_equation_rfc6330(esi)
2081                .into_wal_fec_result(esi)?;
2082            received.push(asupersync::raptorq::decoder::ReceivedSymbol::repair(
2083                esi,
2084                cols,
2085                coefs,
2086                data.clone(),
2087            ));
2088        }
2089    }
2090
2091    let result = decoder
2092        .decode(&received)
2093        .map_err(|err| FrankenError::WalCorrupt {
2094            detail: format!("RaptorQ decode failed: {err:?}"),
2095        })?;
2096
2097    if result.source.len() != k {
2098        return Err(FrankenError::WalCorrupt {
2099            detail: format!(
2100                "RaptorQ decode returned {} source symbols, expected {k}",
2101                result.source.len()
2102            ),
2103        });
2104    }
2105
2106    debug!(
2107        k_source = k,
2108        peeled = result.stats.peeled,
2109        inactivated = result.stats.inactivated,
2110        gauss_ops = result.stats.gauss_ops,
2111        "wal-fec RaptorQ decode succeeded"
2112    );
2113
2114    Ok(result.source)
2115}
2116
/// Resolve the sidecar path that belongs to a WAL path.
///
/// A path ending in `-wal` or `.wal` gets `-fec` appended; any other path gets
/// `.wal-fec` appended.
///
/// The sidecar path is built from the original OS string, so paths that are
/// not valid UTF-8 keep their exact bytes instead of being rewritten with
/// replacement characters.
#[must_use]
pub fn wal_fec_path_for_wal(wal_path: &Path) -> PathBuf {
    // The lossy view is used only for the ASCII suffix test. Lossy conversion
    // never alters ASCII bytes, so the test is exact for these suffixes.
    let has_wal_suffix = {
        let lossy_name = wal_path.to_string_lossy();
        lossy_name.ends_with("-wal") || lossy_name.ends_with(".wal")
    };
    let suffix = if has_wal_suffix { "-fec" } else { ".wal-fec" };

    let mut sidecar: std::ffi::OsString = wal_path.as_os_str().to_os_string();
    sidecar.push(suffix);
    PathBuf::from(sidecar)
}
2127
2128/// Read persistent `PRAGMA raptorq_repair_symbols` from `.wal-fec` header.
2129///
2130/// Returns [`DEFAULT_RAPTORQ_REPAIR_SYMBOLS`] when the sidecar is missing or
2131/// still in legacy format without a config header.
2132pub fn read_wal_fec_raptorq_repair_symbols(sidecar_path: &Path) -> Result<u8> {
2133    if !sidecar_path.exists() {
2134        return Ok(DEFAULT_RAPTORQ_REPAIR_SYMBOLS);
2135    }
2136
2137    let mut file = match fs::File::open(sidecar_path) {
2138        Ok(f) => f,
2139        Err(_) => return Ok(DEFAULT_RAPTORQ_REPAIR_SYMBOLS),
2140    };
2141    let mut bytes = [0_u8; WAL_FEC_PRAGMA_HEADER_BYTES];
2142    use std::io::Read;
2143    let read_len = file.read(&mut bytes).unwrap_or(0);
2144
2145    let Some(header) = WalFecPragmaHeader::from_prefix(&bytes[..read_len])? else {
2146        return Ok(DEFAULT_RAPTORQ_REPAIR_SYMBOLS);
2147    };
2148
2149    debug!(
2150        sidecar = %sidecar_path.display(),
2151        raptorq_repair_symbols = header.raptorq_repair_symbols,
2152        "loaded wal-fec repair symbol setting from sidecar header"
2153    );
2154    Ok(header.raptorq_repair_symbols)
2155}
2156
2157/// Persist `PRAGMA raptorq_repair_symbols` in a checksummed `.wal-fec` header.
2158///
2159/// Existing sidecar group data is preserved exactly after the header region.
2160pub fn persist_wal_fec_raptorq_repair_symbols(sidecar_path: &Path, value: u8) -> Result<()> {
2161    use std::io::{Read, Seek, SeekFrom, Write};
2162
2163    if !sidecar_path.exists() {
2164        if let Some(parent) = sidecar_path.parent() {
2165            if !parent.as_os_str().is_empty() {
2166                fs::create_dir_all(parent)?;
2167            }
2168        }
2169        let header = WalFecPragmaHeader::new(value);
2170        // Use create_new (O_EXCL) to atomically create the file, avoiding a
2171        // TOCTOU race where another process could create the sidecar between
2172        // our exists() check and the write.
2173        match fs::OpenOptions::new()
2174            .write(true)
2175            .create_new(true)
2176            .open(sidecar_path)
2177        {
2178            Ok(mut file) => {
2179                file.write_all(&header.to_bytes())?;
2180                file.sync_all()?;
2181                info!(
2182                    sidecar = %sidecar_path.display(),
2183                    raptorq_repair_symbols = value,
2184                    "persisted wal-fec repair symbol setting (new file)"
2185                );
2186                return Ok(());
2187            }
2188            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
2189                // Another process created the sidecar first — fall through
2190                // to the read-modify-write path below.
2191            }
2192            Err(e) => return Err(e.into()),
2193        }
2194    }
2195
2196    let mut file = fs::OpenOptions::new()
2197        .read(true)
2198        .write(true)
2199        .open(sidecar_path)?;
2200
2201    let mut header_buf = [0_u8; WAL_FEC_PRAGMA_HEADER_BYTES];
2202    let read_len = file.read(&mut header_buf).unwrap_or(0);
2203    let has_header = WalFecPragmaHeader::from_prefix(&header_buf[..read_len])?.is_some();
2204    let header = WalFecPragmaHeader::new(value);
2205
2206    file.seek(SeekFrom::Start(0))?;
2207    if has_header {
2208        file.write_all(&header.to_bytes())?;
2209    } else {
2210        // Rewrite the file with the header prepended using a buffered stream to avoid OOM.
2211        // Wrap in a closure so the temp file is cleaned up on any I/O error.
2212        let temp_path = sidecar_path.with_extension("wal-fec.tmp");
2213        let result = (|| -> Result<()> {
2214            let mut temp_file = std::io::BufWriter::new(fs::File::create(&temp_path)?);
2215            temp_file.write_all(&header.to_bytes())?;
2216            std::io::copy(&mut file, &mut temp_file)?;
2217            let inner = temp_file.into_inner().map_err(|e| e.into_error())?;
2218            inner.sync_all()?;
2219            fs::rename(&temp_path, sidecar_path)?;
2220            Ok(())
2221        })();
2222        if result.is_err() {
2223            let _ = fs::remove_file(&temp_path);
2224        }
2225        result?;
2226    }
2227
2228    info!(
2229        sidecar = %sidecar_path.display(),
2230        raptorq_repair_symbols = value,
2231        "persisted wal-fec repair symbol setting"
2232    );
2233    Ok(())
2234}
2235
2236/// Ensure WAL file and `.wal-fec` sidecar both exist.
2237pub fn ensure_wal_with_fec_sidecar(wal_path: &Path) -> Result<PathBuf> {
2238    fs::OpenOptions::new()
2239        .create(true)
2240        .append(true)
2241        .open(wal_path)?;
2242    let sidecar_path = wal_fec_path_for_wal(wal_path);
2243    fs::OpenOptions::new()
2244        .create(true)
2245        .append(true)
2246        .open(&sidecar_path)?;
2247    Ok(sidecar_path)
2248}
2249
2250/// Append a complete group (meta + repair symbols) to a sidecar file.
2251pub fn append_wal_fec_group(sidecar_path: &Path, group: &WalFecGroupRecord) -> Result<()> {
2252    group.validate_layout()?;
2253    let group_id = group.meta.group_id();
2254    debug!(
2255        group_id = %group_id,
2256        k_source = group.meta.k_source,
2257        r_repair = group.meta.r_repair,
2258        "appending wal-fec group"
2259    );
2260
2261    let mut file = fs::OpenOptions::new()
2262        .create(true)
2263        .append(true)
2264        .open(sidecar_path)?;
2265    let meta_bytes = group.meta.to_record_bytes();
2266    write_length_prefixed(&mut file, &meta_bytes, "group metadata")?;
2267    for symbol in &group.repair_symbols {
2268        write_length_prefixed(&mut file, &symbol.to_bytes(), "repair symbol")?;
2269    }
2270    file.sync_data()?;
2271    info!(
2272        group_id = %group_id,
2273        sidecar = %sidecar_path.display(),
2274        repair_symbols = group.repair_symbols.len(),
2275        "wal-fec group appended"
2276    );
2277    Ok(())
2278}
2279
2280/// Scan a sidecar file and parse all fully-written groups.
2281///
2282/// On truncated tail (e.g. crash during append), returns `truncated_tail=true`
2283/// and only fully-validated preceding groups.
2284pub fn scan_wal_fec(sidecar_path: &Path) -> Result<WalFecScanResult> {
2285    if !sidecar_path.exists() {
2286        return Ok(WalFecScanResult::default());
2287    }
2288    let bytes = fs::read(sidecar_path)?;
2289    let mut cursor = scan_offset_after_optional_pragma_header(&bytes)?;
2290    let mut groups = Vec::new();
2291    let mut truncated_tail = false;
2292
2293    while cursor < bytes.len() {
2294        let Some(meta_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
2295            truncated_tail = true;
2296            warn!(
2297                sidecar = %sidecar_path.display(),
2298                cursor,
2299                "truncated wal-fec metadata tail detected"
2300            );
2301            break;
2302        };
2303        let meta = WalFecGroupMeta::from_record_bytes(meta_bytes)?;
2304        let r_repair_usize =
2305            usize::try_from(meta.r_repair).map_err(|_| FrankenError::WalCorrupt {
2306                detail: format!("r_repair {} does not fit in usize", meta.r_repair),
2307            })?;
2308
2309        // Prevent OOM panics from maliciously large r_repair values.
2310        // Each repair symbol requires at least 4 bytes for its length prefix.
2311        if bytes.len().saturating_sub(cursor) < r_repair_usize.saturating_mul(4) {
2312            return Err(FrankenError::WalCorrupt {
2313                detail: format!("r_repair {} exceeds remaining buffer", meta.r_repair),
2314            });
2315        }
2316
2317        let mut repair_symbols = Vec::with_capacity(r_repair_usize);
2318
2319        for _ in 0..meta.r_repair {
2320            let Some(symbol_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
2321                truncated_tail = true;
2322                warn!(
2323                    sidecar = %sidecar_path.display(),
2324                    group_id = %meta.group_id(),
2325                    cursor,
2326                    "truncated wal-fec repair-symbol tail detected"
2327                );
2328                break;
2329            };
2330            let symbol = SymbolRecord::from_bytes(symbol_bytes).map_err(|err| {
2331                error!(
2332                    sidecar = %sidecar_path.display(),
2333                    group_id = %meta.group_id(),
2334                    error = %err,
2335                    "invalid wal-fec repair symbol"
2336                );
2337                FrankenError::WalCorrupt {
2338                    detail: format!("invalid wal-fec repair symbol: {err}"),
2339                }
2340            })?;
2341            repair_symbols.push(symbol);
2342        }
2343
2344        if truncated_tail {
2345            break;
2346        }
2347        groups.push(WalFecGroupRecord::new(meta, repair_symbols)?);
2348    }
2349
2350    Ok(WalFecScanResult {
2351        groups,
2352        truncated_tail,
2353    })
2354}
2355
2356/// Find one group by `(wal_salt1, wal_salt2, end_frame_no)`.
2357pub fn find_wal_fec_group(
2358    sidecar_path: &Path,
2359    group_id: WalFecGroupId,
2360) -> Result<Option<WalFecGroupRecord>> {
2361    let scan = scan_wal_fec(sidecar_path)?;
2362    Ok(scan
2363        .groups
2364        .into_iter()
2365        .find(|group| group.meta.group_id() == group_id))
2366}
2367
/// Identifier attached as the `bead_id` field on recovery log events in this
/// module.
const BD_1HI_11_BEAD_ID: &str = "bd-1hi.11";
2369
/// Sidecar group representation private to the recovery path.
///
/// NOTE(review): presumably produced by `scan_wal_fec_for_recovery`; its
/// construction is outside this section — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WalFecRecoveryGroupRecord {
    /// Metadata record of the group.
    meta: WalFecGroupMeta,
    /// Repair symbols held for the group.
    repair_symbols: Vec<SymbolRecord>,
    /// NOTE(review): looks like a count of corruption events seen while
    /// scanning this group — confirm against the scanner.
    corruption_observations: u32,
}
2376
2377/// Locate the commit group containing the first checksum-mismatching frame.
2378#[must_use]
2379pub fn identify_damaged_commit_group(
2380    groups: &[WalFecGroupRecord],
2381    wal_salts: WalSalts,
2382    damaged_frame_no: u32,
2383) -> Option<WalFecGroupId> {
2384    groups
2385        .iter()
2386        .find(|group| {
2387            let meta = &group.meta;
2388            meta.wal_salt1 == wal_salts.salt1
2389                && meta.wal_salt2 == wal_salts.salt2
2390                && meta.start_frame_no <= damaged_frame_no
2391                && damaged_frame_no <= meta.end_frame_no
2392        })
2393        .map(|group| group.meta.group_id())
2394}
2395
2396/// Recover one WAL-FEC commit group with caller-provided decode logic.
2397///
2398/// This implements the §3.4.1 compatibility-mode recovery flow:
2399/// 1. locate group metadata by `(wal_salt1, wal_salt2, end_frame_no)`;
2400/// 2. validate source payloads from `.wal` (independent xxh3 at/after chain break);
2401/// 3. collect valid repair symbols from `.wal-fec`;
2402/// 4. decode if at least `K` symbols are available;
2403/// 5. otherwise fall back to SQLite-compatible truncation.
2404pub fn recover_wal_fec_group_with_decoder<F>(
2405    sidecar_path: &Path,
2406    group_id: WalFecGroupId,
2407    wal_salts: WalSalts,
2408    first_checksum_mismatch_frame_no: u32,
2409    wal_frames: &[WalFrameCandidate],
2410    mut decode: F,
2411) -> Result<WalFecRecoveryOutcome>
2412where
2413    F: FnMut(&WalFecGroupMeta, &[(u32, Vec<u8>)]) -> Result<Vec<Vec<u8>>>,
2414{
2415    let groups = match scan_wal_fec_for_recovery(sidecar_path) {
2416        Ok(groups) => groups,
2417        Err(err) => {
2418            warn!(
2419                bead_id = BD_1HI_11_BEAD_ID,
2420                group_id = %group_id,
2421                sidecar = %sidecar_path.display(),
2422                error = %err,
2423                "wal-fec sidecar unreadable; falling back to sqlite-compatible truncation"
2424            );
2425            return Ok(truncate_outcome(
2426                group_id,
2427                first_checksum_mismatch_frame_no,
2428                WalFecRecoveryFallbackReason::SidecarUnreadable,
2429                RecoveryProofStats::new(0),
2430            ));
2431        }
2432    };
2433
2434    let Some(group) = groups
2435        .into_iter()
2436        .find(|group| group.meta.group_id() == group_id)
2437    else {
2438        warn!(
2439            bead_id = BD_1HI_11_BEAD_ID,
2440            group_id = %group_id,
2441            sidecar = %sidecar_path.display(),
2442            "wal-fec group metadata missing; falling back to sqlite-compatible truncation"
2443        );
2444        return Ok(truncate_outcome(
2445            group_id,
2446            first_checksum_mismatch_frame_no,
2447            WalFecRecoveryFallbackReason::MissingSidecarGroup,
2448            RecoveryProofStats::new(0),
2449        ));
2450    };
2451
2452    if group.meta.verify_salt_binding(wal_salts).is_err() {
2453        warn!(
2454            bead_id = BD_1HI_11_BEAD_ID,
2455            group_id = %group_id,
2456            "wal-fec group salt mismatch; rejecting sidecar group and truncating"
2457        );
2458        return Ok(truncate_outcome(
2459            group_id,
2460            group.meta.start_frame_no,
2461            WalFecRecoveryFallbackReason::SaltMismatch,
2462            RecoveryProofStats::new(group.meta.k_source),
2463        ));
2464    }
2465
2466    recover_wal_fec_group_record_with_decoder(
2467        &group,
2468        first_checksum_mismatch_frame_no,
2469        wal_frames,
2470        &mut decode,
2471    )
2472}
2473
2474/// Config-aware WAL-FEC recovery that produces a [`WalFecRecoveryLog`].
2475///
2476/// When `config.recovery_enabled` is `false`, immediately returns
2477/// `TruncateBeforeGroup` (simulating C SQLite behaviour) and a log entry
2478/// recording the skip.
2479///
2480/// When enabled, delegates to [`recover_wal_fec_group_with_decoder`] and
2481/// converts the resulting [`WalFecDecodeProof`] into a structured log.
2482pub fn recover_wal_fec_group_with_config<F>(
2483    sidecar_path: &Path,
2484    group_id: WalFecGroupId,
2485    wal_salts: WalSalts,
2486    first_checksum_mismatch_frame_no: u32,
2487    wal_frames: &[WalFrameCandidate],
2488    config: &WalFecRecoveryConfig,
2489    decode: F,
2490) -> Result<(WalFecRecoveryOutcome, WalFecRecoveryLog)>
2491where
2492    F: FnMut(&WalFecGroupMeta, &[(u32, Vec<u8>)]) -> Result<Vec<Vec<u8>>>,
2493{
2494    let span = tracing::span!(
2495        tracing::Level::WARN,
2496        "wal_raptorq",
2497        segment_id = group_id.end_frame_no,
2498        corruption_detected = tracing::field::Empty,
2499        symbols_used_for_repair = tracing::field::Empty,
2500        repair_success = tracing::field::Empty,
2501        repair_duration_us = tracing::field::Empty,
2502    );
2503    let _guard = span.enter();
2504
2505    if !config.recovery_enabled {
2506        info!(
2507            bead_id = BD_1W6K_25_BEAD_ID,
2508            group_id = %group_id,
2509            "wal-fec recovery disabled; falling back to sqlite-compatible truncation"
2510        );
2511        span.record("corruption_detected", false);
2512        span.record("symbols_used_for_repair", 0_u32);
2513        span.record("repair_success", false);
2514        span.record("repair_duration_us", 0_u64);
2515        let outcome = truncate_outcome(
2516            group_id,
2517            first_checksum_mismatch_frame_no,
2518            WalFecRecoveryFallbackReason::RecoveryDisabled,
2519            RecoveryProofStats::new(0),
2520        );
2521        let log = WalFecRecoveryLog {
2522            group_id,
2523            recovery_enabled: false,
2524            outcome_is_recovered: false,
2525            fallback_reason: Some(WalFecRecoveryFallbackReason::RecoveryDisabled),
2526            validated_source_symbols: 0,
2527            validated_repair_symbols: 0,
2528            required_symbols: 0,
2529            available_symbols: 0,
2530            recovered_frame_nos: Vec::new(),
2531            corruption_observations: 0,
2532            decode_attempted: false,
2533            decode_succeeded: false,
2534        };
2535        record_raptorq_recovery_log(&log, Duration::ZERO);
2536        crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_repair(false, 0);
2537        return Ok((outcome, log));
2538    }
2539
2540    let started = Instant::now();
2541    let outcome = recover_wal_fec_group_with_decoder(
2542        sidecar_path,
2543        group_id,
2544        wal_salts,
2545        first_checksum_mismatch_frame_no,
2546        wal_frames,
2547        decode,
2548    )?;
2549
2550    let log = recovery_log_from_outcome(group_id, &outcome, true);
2551    let elapsed = started.elapsed();
2552    let duration_us = crate::metrics::duration_us_saturating(elapsed);
2553    let repair_attempt = repair_attempt_for_log(&log);
2554    let reason_code = recovery_reason_code_for_log(&log);
2555    let outcome_code = recovery_outcome_code(&log);
2556    let symbol_state = symbol_state_for_log(&log);
2557
2558    span.record("corruption_detected", log.corruption_observations > 0);
2559    span.record(
2560        "symbols_used_for_repair",
2561        log.available_symbols.min(log.required_symbols),
2562    );
2563    span.record("repair_success", log.outcome_is_recovered);
2564    span.record("repair_duration_us", duration_us);
2565    info!(
2566        bead_id = BD_1W6K_25_BEAD_ID,
2567        group_id = %group_id,
2568        repair_attempt,
2569        symbol_state = %symbol_state,
2570        reason_code,
2571        outcome = outcome_code,
2572        "wal-fec recovery decision"
2573    );
2574
2575    record_raptorq_recovery_log(&log, elapsed);
2576    crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS
2577        .record_repair(log.outcome_is_recovered, duration_us);
2578    Ok((outcome, log))
2579}
2580
2581/// Extract a [`WalFecRecoveryLog`] from a completed recovery outcome.
2582#[must_use]
2583pub fn recovery_log_from_outcome(
2584    group_id: WalFecGroupId,
2585    outcome: &WalFecRecoveryOutcome,
2586    recovery_enabled: bool,
2587) -> WalFecRecoveryLog {
2588    match outcome {
2589        WalFecRecoveryOutcome::Recovered(group) => WalFecRecoveryLog {
2590            group_id,
2591            recovery_enabled,
2592            outcome_is_recovered: true,
2593            fallback_reason: None,
2594            validated_source_symbols: group.decode_proof.validated_source_symbols,
2595            validated_repair_symbols: group.decode_proof.validated_repair_symbols,
2596            required_symbols: group.decode_proof.required_symbols,
2597            available_symbols: group.decode_proof.available_symbols,
2598            recovered_frame_nos: group.decode_proof.recovered_frame_nos.clone(),
2599            corruption_observations: group.decode_proof.corruption_observations,
2600            decode_attempted: group.decode_proof.decode_attempted,
2601            decode_succeeded: group.decode_proof.decode_succeeded,
2602        },
2603        WalFecRecoveryOutcome::TruncateBeforeGroup { decode_proof, .. } => WalFecRecoveryLog {
2604            group_id,
2605            recovery_enabled,
2606            outcome_is_recovered: false,
2607            fallback_reason: decode_proof.fallback_reason,
2608            validated_source_symbols: decode_proof.validated_source_symbols,
2609            validated_repair_symbols: decode_proof.validated_repair_symbols,
2610            required_symbols: decode_proof.required_symbols,
2611            available_symbols: decode_proof.available_symbols,
2612            recovered_frame_nos: decode_proof.recovered_frame_nos.clone(),
2613            corruption_observations: decode_proof.corruption_observations,
2614            decode_attempted: decode_proof.decode_attempted,
2615            decode_succeeded: decode_proof.decode_succeeded,
2616        },
2617    }
2618}
2619
/// Value of the `bead_id` field on the tracing events emitted by
/// [`recover_wal_fec_group_with_config`].
const BD_1W6K_25_BEAD_ID: &str = "bd-1w6k.2.5";
2621
2622fn recover_wal_fec_group_record_with_decoder<F>(
2623    group: &WalFecRecoveryGroupRecord,
2624    first_checksum_mismatch_frame_no: u32,
2625    wal_frames: &[WalFrameCandidate],
2626    decode: &mut F,
2627) -> Result<WalFecRecoveryOutcome>
2628where
2629    F: FnMut(&WalFecGroupMeta, &[(u32, Vec<u8>)]) -> Result<Vec<Vec<u8>>>,
2630{
2631    let meta = &group.meta;
2632    let group_id = meta.group_id();
2633    let k_required = usize::try_from(meta.k_source).map_err(|_| FrankenError::WalCorrupt {
2634        detail: format!("k_source {} does not fit in usize", meta.k_source),
2635    })?;
2636    let page_len = usize::try_from(meta.page_size).map_err(|_| FrankenError::WalCorrupt {
2637        detail: format!("page_size {} does not fit in usize", meta.page_size),
2638    })?;
2639
2640    let frame_payload_by_no = build_frame_payload_map(wal_frames);
2641    let mut source_collection = collect_valid_source_symbols(
2642        meta,
2643        group_id,
2644        first_checksum_mismatch_frame_no,
2645        page_len,
2646        &frame_payload_by_no,
2647        k_required,
2648    )?;
2649    let (repair_symbols, validated_repair_symbols, rejected_repair_symbols) =
2650        collect_valid_repair_symbols(meta, group_id, &group.repair_symbols);
2651    source_collection.available_symbols.extend(repair_symbols);
2652    source_collection
2653        .available_symbols
2654        .sort_unstable_by_key(|(esi, _)| *esi);
2655
2656    let mut stats = build_recovery_proof_stats(
2657        meta.k_source,
2658        &source_collection,
2659        validated_repair_symbols,
2660        rejected_repair_symbols,
2661        group.corruption_observations,
2662    );
2663
2664    if source_collection.available_symbols.len() < k_required {
2665        error!(
2666            bead_id = BD_1HI_11_BEAD_ID,
2667            group_id = %group_id,
2668            required_symbols = meta.k_source,
2669            available_symbols = stats.available_symbols,
2670            "insufficient symbols for wal-fec decode; truncating before group"
2671        );
2672        return Ok(insufficient_symbols_outcome(meta, group_id, stats));
2673    }
2674
2675    if stats.validated_source_symbols == meta.k_source {
2676        stats.decode_succeeded = true;
2677        info!(
2678            bead_id = BD_1HI_11_BEAD_ID,
2679            group_id = %group_id,
2680            validated_source_symbols = stats.validated_source_symbols,
2681            "wal-fec recovery fast path: group fully intact"
2682        );
2683        return Ok(fast_path_outcome(
2684            meta,
2685            group_id,
2686            source_collection.source_pages,
2687            stats,
2688        ));
2689    }
2690
2691    stats.decode_attempted = true;
2692    let decoded_pages = match decode(meta, &source_collection.available_symbols) {
2693        Ok(decoded) => decoded,
2694        Err(err) => {
2695            error!(
2696                bead_id = BD_1HI_11_BEAD_ID,
2697                group_id = %group_id,
2698                error = %err,
2699                "wal-fec decode failed; truncating before group"
2700            );
2701            return Ok(decode_failed_outcome(meta, group_id, stats));
2702        }
2703    };
2704
2705    if !decoded_pages_match_expected(meta, &decoded_pages, page_len) {
2706        error!(
2707            bead_id = BD_1HI_11_BEAD_ID,
2708            group_id = %group_id,
2709            "decoded payload failed structural/hash verification; truncating before group"
2710        );
2711        return Ok(decoded_mismatch_outcome(meta, group_id, stats));
2712    }
2713
2714    Ok(finalize_decoded_success_outcome(
2715        meta,
2716        group_id,
2717        &source_collection.source_pages,
2718        decoded_pages,
2719        stats,
2720    ))
2721}
2722
2723fn finalize_decoded_success_outcome(
2724    meta: &WalFecGroupMeta,
2725    group_id: WalFecGroupId,
2726    source_pages: &[Option<Vec<u8>>],
2727    decoded_pages: Vec<Vec<u8>>,
2728    mut stats: RecoveryProofStats,
2729) -> WalFecRecoveryOutcome {
2730    let recovered_frame_nos = recovered_frame_numbers(meta, source_pages);
2731    let recovered_count = usize_to_u32(recovered_frame_nos.len());
2732    if recovered_count >= meta.r_repair.saturating_sub(1) {
2733        warn!(
2734            bead_id = BD_1HI_11_BEAD_ID,
2735            group_id = %group_id,
2736            recovered_frames = recovered_count,
2737            repair_capacity = meta.r_repair,
2738            "wal-fec recovery near repair-capacity limit"
2739        );
2740    }
2741    info!(
2742        bead_id = BD_1HI_11_BEAD_ID,
2743        group_id = %group_id,
2744        recovered_frames = recovered_count,
2745        db_size_pages = meta.db_size_pages,
2746        "wal-fec recovery succeeded"
2747    );
2748    stats.decode_succeeded = true;
2749    stats.recovered_frame_nos.clone_from(&recovered_frame_nos);
2750    decoded_success_outcome(meta, group_id, decoded_pages, recovered_frame_nos, stats)
2751}
2752
2753fn insufficient_symbols_outcome(
2754    meta: &WalFecGroupMeta,
2755    group_id: WalFecGroupId,
2756    stats: RecoveryProofStats,
2757) -> WalFecRecoveryOutcome {
2758    truncate_outcome(
2759        group_id,
2760        meta.start_frame_no,
2761        WalFecRecoveryFallbackReason::InsufficientSymbols,
2762        stats,
2763    )
2764}
2765
2766fn fast_path_outcome(
2767    meta: &WalFecGroupMeta,
2768    group_id: WalFecGroupId,
2769    source_pages: Vec<Option<Vec<u8>>>,
2770    stats: RecoveryProofStats,
2771) -> WalFecRecoveryOutcome {
2772    let recovered_pages = source_pages
2773        .into_iter()
2774        .map(|page| page.expect("source_pages are complete when all sources validated"))
2775        .collect::<Vec<_>>();
2776    WalFecRecoveryOutcome::Recovered(WalFecRecoveredGroup {
2777        meta: meta.clone(),
2778        recovered_pages,
2779        recovered_frame_nos: Vec::new(),
2780        db_size_pages: meta.db_size_pages,
2781        decode_proof: build_decode_proof(group_id, stats, None),
2782    })
2783}
2784
2785fn decode_failed_outcome(
2786    meta: &WalFecGroupMeta,
2787    group_id: WalFecGroupId,
2788    stats: RecoveryProofStats,
2789) -> WalFecRecoveryOutcome {
2790    truncate_outcome(
2791        group_id,
2792        meta.start_frame_no,
2793        WalFecRecoveryFallbackReason::DecodeFailed,
2794        stats,
2795    )
2796}
2797
2798fn decoded_mismatch_outcome(
2799    meta: &WalFecGroupMeta,
2800    group_id: WalFecGroupId,
2801    stats: RecoveryProofStats,
2802) -> WalFecRecoveryOutcome {
2803    truncate_outcome(
2804        group_id,
2805        meta.start_frame_no,
2806        WalFecRecoveryFallbackReason::DecodedPayloadMismatch,
2807        stats,
2808    )
2809}
2810
2811fn decoded_success_outcome(
2812    meta: &WalFecGroupMeta,
2813    group_id: WalFecGroupId,
2814    decoded_pages: Vec<Vec<u8>>,
2815    recovered_frame_nos: Vec<u32>,
2816    stats: RecoveryProofStats,
2817) -> WalFecRecoveryOutcome {
2818    WalFecRecoveryOutcome::Recovered(WalFecRecoveredGroup {
2819        meta: meta.clone(),
2820        recovered_pages: decoded_pages,
2821        recovered_frame_nos,
2822        db_size_pages: meta.db_size_pages,
2823        decode_proof: build_decode_proof(group_id, stats, None),
2824    })
2825}
2826
/// Running tally of the evidence gathered while recovering one WAL-FEC group.
///
/// The values are copied field-for-field into a `WalFecDecodeProof` once the
/// attempt reaches an outcome.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct RecoveryProofStats {
    /// Symbols needed for a decode; `0` on paths that bail out before any
    /// group metadata is available.
    required_symbols: u32,
    /// Validated source plus repair symbols offered to the decoder.
    available_symbols: u32,
    /// Source frames accepted as decoder input.
    validated_source_symbols: u32,
    /// Repair symbols accepted as decoder input.
    validated_repair_symbols: u32,
    /// Corrupt or rejected sidecar symbols seen along the way.
    corruption_observations: u32,
    /// Whether the decoder callback was invoked.
    decode_attempted: bool,
    /// Whether the group ended up recovered.
    decode_succeeded: bool,
    /// Frame numbers that had to be reconstructed.
    recovered_frame_nos: Vec<u32>,
}

impl RecoveryProofStats {
    /// Creates a tally with `required_symbols` set and every other counter,
    /// flag and list empty.
    const fn new(required_symbols: u32) -> Self {
        Self {
            decode_attempted: false,
            decode_succeeded: false,
            recovered_frame_nos: Vec::new(),
            corruption_observations: 0,
            validated_repair_symbols: 0,
            validated_source_symbols: 0,
            available_symbols: 0,
            required_symbols,
        }
    }
}
2853
2854fn build_recovery_proof_stats(
2855    required_symbols: u32,
2856    source_collection: &SourceSymbolCollection,
2857    validated_repair_symbols: u32,
2858    rejected_repair_symbols: u32,
2859    sidecar_corruption_observations: u32,
2860) -> RecoveryProofStats {
2861    let mut stats = RecoveryProofStats::new(required_symbols);
2862    stats.available_symbols = usize_to_u32(source_collection.available_symbols.len());
2863    stats.validated_source_symbols = source_collection.validated_source_symbols;
2864    stats.validated_repair_symbols = validated_repair_symbols;
2865    stats.corruption_observations =
2866        sidecar_corruption_observations.saturating_add(rejected_repair_symbols);
2867    stats
2868}
2869
/// Source-side decoder inputs gathered for one WAL-FEC group.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SourceSymbolCollection {
    /// `(esi, payload)` pairs accepted as decoder input. Holds source symbols
    /// when first built; the recovery routine later appends the validated
    /// repair symbols and sorts the list by ESI.
    available_symbols: Vec<(u32, Vec<u8>)>,
    /// One slot per source ESI: `Some(page)` when that source frame was
    /// accepted, `None` when it was missing or rejected.
    source_pages: Vec<Option<Vec<u8>>>,
    /// Number of source frames accepted as decoder input.
    validated_source_symbols: u32,
}
2876
2877fn build_frame_payload_map(wal_frames: &[WalFrameCandidate]) -> BTreeMap<u32, &[u8]> {
2878    let mut frame_payload_by_no = BTreeMap::new();
2879    for frame in wal_frames {
2880        frame_payload_by_no
2881            .entry(frame.frame_no)
2882            .or_insert(frame.page_data.as_slice());
2883    }
2884    frame_payload_by_no
2885}
2886
2887fn collect_valid_source_symbols(
2888    meta: &WalFecGroupMeta,
2889    group_id: WalFecGroupId,
2890    first_checksum_mismatch_frame_no: u32,
2891    page_len: usize,
2892    frame_payload_by_no: &BTreeMap<u32, &[u8]>,
2893    k_required: usize,
2894) -> Result<SourceSymbolCollection> {
2895    let mut available_symbols: Vec<(u32, Vec<u8>)> = Vec::new();
2896    let mut source_pages = vec![None; k_required];
2897    let mut validated_source_symbols = 0_u32;
2898
2899    for source_esi in 0..meta.k_source {
2900        let index = usize::try_from(source_esi).map_err(|_| FrankenError::WalCorrupt {
2901            detail: format!("source ESI {source_esi} does not fit in usize"),
2902        })?;
2903        let frame_no = meta.start_frame_no.checked_add(source_esi).ok_or_else(|| {
2904            FrankenError::WalCorrupt {
2905                detail: "frame number overflow while collecting source symbols".to_owned(),
2906            }
2907        })?;
2908        let Some(payload) = frame_payload_by_no.get(&frame_no).copied() else {
2909            debug!(
2910                bead_id = BD_1HI_11_BEAD_ID,
2911                group_id = %group_id,
2912                frame_no,
2913                "source frame missing from wal candidates"
2914            );
2915            continue;
2916        };
2917        if payload.len() != page_len {
2918            warn!(
2919                bead_id = BD_1HI_11_BEAD_ID,
2920                group_id = %group_id,
2921                frame_no,
2922                payload_len = payload.len(),
2923                expected_len = page_len,
2924                "source frame payload length mismatch; excluding from decoder input"
2925            );
2926            continue;
2927        }
2928        if frame_no >= first_checksum_mismatch_frame_no
2929            && !verify_wal_fec_source_hash(payload, meta.source_page_xxh3_128[index])
2930        {
2931            warn!(
2932                bead_id = BD_1HI_11_BEAD_ID,
2933                group_id = %group_id,
2934                frame_no,
2935                esi = source_esi,
2936                "source frame hash mismatch at/after wal chain break; excluding from decoder input"
2937            );
2938            continue;
2939        }
2940        if frame_no >= first_checksum_mismatch_frame_no {
2941            debug!(
2942                bead_id = BD_1HI_11_BEAD_ID,
2943                group_id = %group_id,
2944                frame_no,
2945                esi = source_esi,
2946                "source frame validated via independent xxh3 hash"
2947            );
2948        }
2949        let payload_vec = payload.to_vec();
2950        source_pages[index] = Some(payload_vec.clone());
2951        available_symbols.push((source_esi, payload_vec));
2952        validated_source_symbols = validated_source_symbols.saturating_add(1);
2953    }
2954
2955    Ok(SourceSymbolCollection {
2956        available_symbols,
2957        source_pages,
2958        validated_source_symbols,
2959    })
2960}
2961
2962fn collect_valid_repair_symbols(
2963    meta: &WalFecGroupMeta,
2964    group_id: WalFecGroupId,
2965    repair_symbols: &[SymbolRecord],
2966) -> (Vec<(u32, Vec<u8>)>, u32, u32) {
2967    let mut validated_symbols = Vec::new();
2968    let mut validated_repair_symbols = 0_u32;
2969    let mut rejected_repair_symbols = 0_u32;
2970
2971    for symbol in repair_symbols {
2972        if !repair_symbol_matches_meta(meta, symbol) {
2973            warn!(
2974                bead_id = BD_1HI_11_BEAD_ID,
2975                group_id = %group_id,
2976                esi = symbol.esi,
2977                "repair symbol failed metadata binding checks; excluding from decoder input"
2978            );
2979            rejected_repair_symbols = rejected_repair_symbols.saturating_add(1);
2980            continue;
2981        }
2982        validated_repair_symbols = validated_repair_symbols.saturating_add(1);
2983        validated_symbols.push((symbol.esi, symbol.symbol_data.clone()));
2984    }
2985
2986    (
2987        validated_symbols,
2988        validated_repair_symbols,
2989        rejected_repair_symbols,
2990    )
2991}
2992
2993fn recovered_frame_numbers(meta: &WalFecGroupMeta, source_pages: &[Option<Vec<u8>>]) -> Vec<u32> {
2994    source_pages
2995        .iter()
2996        .enumerate()
2997        .filter_map(|(index, page)| {
2998            if page.is_some() {
2999                None
3000            } else {
3001                let idx = u32::try_from(index).ok()?;
3002                meta.start_frame_no.checked_add(idx)
3003            }
3004        })
3005        .collect()
3006}
3007
3008fn decoded_pages_match_expected(
3009    meta: &WalFecGroupMeta,
3010    decoded_pages: &[Vec<u8>],
3011    page_len: usize,
3012) -> bool {
3013    let Ok(k_required) = usize::try_from(meta.k_source) else {
3014        return false;
3015    };
3016    if decoded_pages.len() != k_required {
3017        return false;
3018    }
3019    decoded_pages.iter().enumerate().all(|(index, payload)| {
3020        payload.len() == page_len
3021            && verify_wal_fec_source_hash(payload, meta.source_page_xxh3_128[index])
3022    })
3023}
3024
3025fn repair_symbol_matches_meta(meta: &WalFecGroupMeta, symbol: &SymbolRecord) -> bool {
3026    if symbol.object_id != meta.object_id || symbol.oti != meta.oti {
3027        return false;
3028    }
3029    let repair_start = meta.k_source;
3030    let Some(repair_end) = meta.k_source.checked_add(meta.r_repair) else {
3031        return false;
3032    };
3033    symbol.esi >= repair_start && symbol.esi < repair_end
3034}
3035
3036fn scan_wal_fec_for_recovery(sidecar_path: &Path) -> Result<Vec<WalFecRecoveryGroupRecord>> {
3037    if !sidecar_path.exists() {
3038        return Ok(Vec::new());
3039    }
3040
3041    let bytes = fs::read(sidecar_path)?;
3042    let mut cursor = scan_offset_after_optional_pragma_header(&bytes)?;
3043    let mut groups = Vec::new();
3044
3045    while cursor < bytes.len() {
3046        let Some(meta_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
3047            break;
3048        };
3049        let meta = WalFecGroupMeta::from_record_bytes(meta_bytes)?;
3050        let mut repair_symbols = Vec::new();
3051        let mut corruption_observations = 0_u32;
3052
3053        let mut truncated_tail = false;
3054        for _ in 0..meta.r_repair {
3055            let Some(symbol_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
3056                truncated_tail = true;
3057                break;
3058            };
3059            match SymbolRecord::from_bytes(symbol_bytes) {
3060                Ok(symbol) => repair_symbols.push(symbol),
3061                Err(err) => {
3062                    warn!(
3063                        bead_id = BD_1HI_11_BEAD_ID,
3064                        group_id = %meta.group_id(),
3065                        error = %err,
3066                        "invalid wal-fec repair SymbolRecord excluded from recovery set"
3067                    );
3068                    corruption_observations = corruption_observations.saturating_add(1);
3069                }
3070            }
3071        }
3072        if truncated_tail {
3073            break;
3074        }
3075        groups.push(WalFecRecoveryGroupRecord {
3076            meta,
3077            repair_symbols,
3078            corruption_observations,
3079        });
3080    }
3081
3082    Ok(groups)
3083}
3084
3085fn truncate_outcome(
3086    group_id: WalFecGroupId,
3087    truncate_before_frame_no: u32,
3088    fallback_reason: WalFecRecoveryFallbackReason,
3089    stats: RecoveryProofStats,
3090) -> WalFecRecoveryOutcome {
3091    WalFecRecoveryOutcome::TruncateBeforeGroup {
3092        truncate_before_frame_no,
3093        decode_proof: build_decode_proof(group_id, stats, Some(fallback_reason)),
3094    }
3095}
3096
3097fn build_decode_proof(
3098    group_id: WalFecGroupId,
3099    stats: RecoveryProofStats,
3100    fallback_reason: Option<WalFecRecoveryFallbackReason>,
3101) -> WalFecDecodeProof {
3102    WalFecDecodeProof {
3103        group_id,
3104        required_symbols: stats.required_symbols,
3105        available_symbols: stats.available_symbols,
3106        validated_source_symbols: stats.validated_source_symbols,
3107        validated_repair_symbols: stats.validated_repair_symbols,
3108        corruption_observations: stats.corruption_observations,
3109        decode_attempted: stats.decode_attempted,
3110        decode_succeeded: stats.decode_succeeded,
3111        recovered_frame_nos: stats.recovered_frame_nos,
3112        fallback_reason,
3113    }
3114}
3115
/// Narrows a `usize` to `u32`, clamping values that do not fit to `u32::MAX`.
fn usize_to_u32(value: usize) -> u32 {
    let narrowed: Result<u32, _> = value.try_into();
    narrowed.unwrap_or(u32::MAX)
}
3119
3120fn scan_offset_after_optional_pragma_header(bytes: &[u8]) -> Result<usize> {
3121    let Some(header) = WalFecPragmaHeader::from_prefix(bytes)? else {
3122        return Ok(0);
3123    };
3124    debug!(
3125        raptorq_repair_symbols = header.raptorq_repair_symbols,
3126        "detected wal-fec pragma header during scan"
3127    );
3128    Ok(WAL_FEC_PRAGMA_HEADER_BYTES)
3129}
3130
3131fn write_length_prefixed(file: &mut fs::File, payload: &[u8], what: &str) -> Result<()> {
3132    let len_u32 = u32::try_from(payload.len()).map_err(|_| FrankenError::WalCorrupt {
3133        detail: format!(
3134            "{what} too large for wal-fec length prefix: {}",
3135            payload.len()
3136        ),
3137    })?;
3138    file.write_all(&len_u32.to_le_bytes())?;
3139    file.write_all(payload)?;
3140    Ok(())
3141}
3142
3143fn read_length_prefixed<'a>(bytes: &'a [u8], cursor: &mut usize) -> Result<Option<&'a [u8]>> {
3144    if *cursor >= bytes.len() {
3145        return Ok(None);
3146    }
3147    if bytes.len() - *cursor < LENGTH_PREFIX_BYTES {
3148        return Ok(None);
3149    }
3150    let mut len_raw = [0u8; LENGTH_PREFIX_BYTES];
3151    len_raw.copy_from_slice(&bytes[*cursor..*cursor + LENGTH_PREFIX_BYTES]);
3152    *cursor += LENGTH_PREFIX_BYTES;
3153    let payload_len =
3154        usize::try_from(u32::from_le_bytes(len_raw)).map_err(|_| FrankenError::WalCorrupt {
3155            detail: "wal-fec length prefix does not fit in usize".to_owned(),
3156        })?;
3157    let end = cursor
3158        .checked_add(payload_len)
3159        .ok_or_else(|| FrankenError::WalCorrupt {
3160            detail: "wal-fec length prefix overflow".to_owned(),
3161        })?;
3162    if end > bytes.len() {
3163        return Ok(None);
3164    }
3165    let payload = &bytes[*cursor..end];
3166    *cursor = end;
3167    Ok(Some(payload))
3168}
3169
/// Appends `value` to `buf` as four little-endian bytes.
fn append_u32_le(buf: &mut Vec<u8>, value: u32) {
    buf.extend(value.to_le_bytes());
}
3173
/// Appends `value` to `buf` as eight little-endian bytes.
fn append_u64_le(buf: &mut Vec<u8>, value: u64) {
    buf.extend(value.to_le_bytes());
}
3177
3178fn read_u32_le(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u32> {
3179    let raw = read_array::<4>(bytes, cursor, field)?;
3180    Ok(u32::from_le_bytes(raw))
3181}
3182
3183fn read_u64_le(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u64> {
3184    let raw = read_array::<8>(bytes, cursor, field)?;
3185    Ok(u64::from_le_bytes(raw))
3186}
3187
3188fn read_array<const N: usize>(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<[u8; N]> {
3189    let end = cursor
3190        .checked_add(N)
3191        .ok_or_else(|| FrankenError::WalCorrupt {
3192            detail: format!("overflow reading wal-fec field {field}"),
3193        })?;
3194    if end > bytes.len() {
3195        return Err(FrankenError::WalCorrupt {
3196            detail: format!(
3197                "wal-fec field {field} out of bounds: need {N} bytes at offset {}, total {}",
3198                *cursor,
3199                bytes.len()
3200            ),
3201        });
3202    }
3203    let mut out = [0u8; N];
3204    out.copy_from_slice(&bytes[*cursor..end]);
3205    *cursor = end;
3206    Ok(out)
3207}
3208
3209#[cfg(test)]
3210mod tests {
3211    use super::*;
3212    use std::fs;
3213    use std::sync::{Mutex, OnceLock};
3214
3215    use tempfile::tempdir;
3216
3217    fn telemetry_test_lock() -> std::sync::MutexGuard<'static, ()> {
3218        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3219        match LOCK.get_or_init(|| Mutex::new(())).lock() {
3220            Ok(guard) => guard,
3221            Err(poisoned) => poisoned.into_inner(),
3222        }
3223    }
3224
3225    #[test]
3226    fn test_wal_fec_pragma_header_default_without_header() {
3227        let dir = tempdir().expect("tempdir");
3228        let sidecar = dir.path().join("db.wal-fec");
3229
3230        fs::write(&sidecar, b"legacy-groups-without-header").expect("write legacy bytes");
3231
3232        let value = read_wal_fec_raptorq_repair_symbols(&sidecar).expect("read default");
3233        assert_eq!(value, DEFAULT_RAPTORQ_REPAIR_SYMBOLS);
3234    }
3235
3236    #[test]
3237    fn test_wal_fec_pragma_persist_and_reload_across_reopen() {
3238        let dir = tempdir().expect("tempdir");
3239        let sidecar = dir.path().join("db.wal-fec");
3240
3241        persist_wal_fec_raptorq_repair_symbols(&sidecar, 4).expect("persist setting");
3242        let first_read = read_wal_fec_raptorq_repair_symbols(&sidecar).expect("read setting");
3243        let second_read = read_wal_fec_raptorq_repair_symbols(&sidecar).expect("re-read setting");
3244
3245        assert_eq!(first_read, 4);
3246        assert_eq!(second_read, 4);
3247    }
3248
3249    #[test]
3250    fn test_wal_fec_pragma_persist_preserves_payload_bytes() {
3251        let dir = tempdir().expect("tempdir");
3252        let sidecar = dir.path().join("db.wal-fec");
3253        let legacy_payload = vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE];
3254        fs::write(&sidecar, &legacy_payload).expect("write legacy payload");
3255
3256        persist_wal_fec_raptorq_repair_symbols(&sidecar, 9).expect("persist setting");
3257
3258        let rewritten = fs::read(&sidecar).expect("read rewritten sidecar");
3259        assert!(rewritten.len() >= WAL_FEC_PRAGMA_HEADER_BYTES);
3260        assert_eq!(
3261            &rewritten[WAL_FEC_PRAGMA_HEADER_BYTES..],
3262            legacy_payload.as_slice()
3263        );
3264    }
3265
3266    #[test]
3267    fn test_scan_wal_fec_accepts_header_only_file() {
3268        let dir = tempdir().expect("tempdir");
3269        let sidecar = dir.path().join("db.wal-fec");
3270
3271        persist_wal_fec_raptorq_repair_symbols(&sidecar, 3).expect("persist setting");
3272        let scan = scan_wal_fec(&sidecar).expect("scan header-only sidecar");
3273
3274        assert!(scan.groups.is_empty());
3275        assert!(!scan.truncated_tail);
3276    }
3277
3278    // -- bd-2ha1: WalFecGroupMeta unit tests --
3279
    /// Bead identifier embedded in the assertion and debug messages of the
    /// `WalFecGroupMeta` unit tests.
    const BEAD_ID_2HA1: &str = "bd-2ha1";
3281
3282    fn make_test_init(k: u32) -> WalFecGroupMetaInit {
3283        let page_size = 4096_u32;
3284        WalFecGroupMetaInit {
3285            wal_salt1: 0x1234_5678,
3286            wal_salt2: 0xABCD_EF01,
3287            start_frame_no: 1,
3288            end_frame_no: k,
3289            db_size_pages: 100,
3290            page_size,
3291            k_source: k,
3292            r_repair: 2,
3293            oti: Oti {
3294                f: u64::from(k) * u64::from(page_size),
3295                al: 0,
3296                t: page_size,
3297                z: 1,
3298                n: 1,
3299            },
3300            object_id: ObjectId::from_bytes([0xAA; 16]),
3301            page_numbers: (1..=k).collect(),
3302            source_page_xxh3_128: (0..k)
3303                .map(|i| Xxh3Checksum128 {
3304                    low: u64::from(i),
3305                    high: u64::from(i).wrapping_add(1),
3306                })
3307                .collect(),
3308        }
3309    }
3310
3311    #[test]
3312    fn test_meta_roundtrip() {
3313        let init = make_test_init(3);
3314        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3315        let serialized = meta.to_record_bytes();
3316        let deserialized =
3317            WalFecGroupMeta::from_record_bytes(&serialized).expect("from_record_bytes");
3318
3319        assert_eq!(meta, deserialized, "bead_id={BEAD_ID_2HA1} case=roundtrip");
3320        eprintln!(
3321            "DEBUG bead_id={BEAD_ID_2HA1} case=meta_roundtrip serialized_len={}",
3322            serialized.len()
3323        );
3324    }
3325
3326    #[test]
3327    fn test_meta_magic() {
3328        let init = make_test_init(2);
3329        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3330
3331        assert_eq!(
3332            meta.magic, WAL_FEC_GROUP_META_MAGIC,
3333            "bead_id={BEAD_ID_2HA1} case=magic expected=FSQLWFEC"
3334        );
3335        assert_eq!(
3336            &meta.magic, b"FSQLWFEC",
3337            "bead_id={BEAD_ID_2HA1} case=magic_bytes"
3338        );
3339
3340        // Serialized form also starts with magic.
3341        let bytes = meta.to_record_bytes();
3342        assert_eq!(
3343            &bytes[..8],
3344            b"FSQLWFEC",
3345            "bead_id={BEAD_ID_2HA1} case=serialized_magic"
3346        );
3347    }
3348
3349    #[test]
3350    fn test_meta_invariant_k_source() {
3351        let mut init = make_test_init(3);
3352        // Break invariant: k_source != frame span.
3353        init.k_source = 99;
3354        let result = WalFecGroupMeta::from_init(init);
3355        assert!(
3356            result.is_err(),
3357            "bead_id={BEAD_ID_2HA1} case=k_source_mismatch expected=Err"
3358        );
3359        eprintln!(
3360            "INFO bead_id={BEAD_ID_2HA1} case=invariant_k_source error={}",
3361            result.unwrap_err()
3362        );
3363    }
3364
3365    #[test]
3366    fn test_meta_invariant_page_numbers_len() {
3367        let mut init = make_test_init(3);
3368        // Break invariant: page_numbers.len != k_source.
3369        init.page_numbers.push(999);
3370        let result = WalFecGroupMeta::from_init(init);
3371        assert!(
3372            result.is_err(),
3373            "bead_id={BEAD_ID_2HA1} case=page_numbers_len_mismatch expected=Err"
3374        );
3375        eprintln!(
3376            "WARN bead_id={BEAD_ID_2HA1} case=invariant_page_numbers_len error={}",
3377            result.unwrap_err()
3378        );
3379    }
3380
3381    #[test]
3382    fn test_meta_invariant_xxh3_len() {
3383        let mut init = make_test_init(3);
3384        // Break invariant: source_page_xxh3_128.len != k_source.
3385        init.source_page_xxh3_128.pop();
3386        let result = WalFecGroupMeta::from_init(init);
3387        assert!(
3388            result.is_err(),
3389            "bead_id={BEAD_ID_2HA1} case=xxh3_len_mismatch expected=Err"
3390        );
3391        eprintln!(
3392            "WARN bead_id={BEAD_ID_2HA1} case=invariant_xxh3_len error={}",
3393            result.unwrap_err()
3394        );
3395    }
3396
3397    #[test]
3398    fn test_meta_checksum_valid() {
3399        let init = make_test_init(4);
3400        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3401        let serialized = meta.to_record_bytes();
3402
3403        // Deserialization validates checksum internally.
3404        let result = WalFecGroupMeta::from_record_bytes(&serialized);
3405        assert!(result.is_ok(), "bead_id={BEAD_ID_2HA1} case=checksum_valid");
3406        eprintln!(
3407            "INFO bead_id={BEAD_ID_2HA1} case=checksum_valid checksum={:#018x}",
3408            meta.checksum
3409        );
3410    }
3411
3412    #[test]
3413    fn test_meta_checksum_corrupt() {
3414        let init = make_test_init(4);
3415        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3416        let mut serialized = meta.to_record_bytes();
3417
3418        // Flip one bit in wal_salt1 (bytes 12..16) — no invariant checks on salt
3419        // fields, so this corruption is detected only by checksum mismatch.
3420        serialized[12] ^= 0x01;
3421
3422        let result = WalFecGroupMeta::from_record_bytes(&serialized);
3423        let err = result.expect_err("bead_id={BEAD_ID_2HA1} case=checksum_corrupt expected=Err");
3424        let msg = err.to_string();
3425        assert!(
3426            msg.contains("checksum mismatch"),
3427            "bead_id={BEAD_ID_2HA1} case=checksum_corrupt expected checksum error, got: {msg}"
3428        );
3429        eprintln!("ERROR bead_id={BEAD_ID_2HA1} case=checksum_corrupt error={err}");
3430    }
3431
3432    #[test]
3433    fn test_recovery_reason_codes_are_stable() {
3434        let base = WalFecRecoveryLog {
3435            group_id: WalFecGroupId {
3436                wal_salt1: 1,
3437                wal_salt2: 2,
3438                end_frame_no: 3,
3439            },
3440            recovery_enabled: true,
3441            outcome_is_recovered: true,
3442            fallback_reason: None,
3443            validated_source_symbols: 5,
3444            validated_repair_symbols: 1,
3445            required_symbols: 6,
3446            available_symbols: 6,
3447            recovered_frame_nos: vec![2],
3448            corruption_observations: 1,
3449            decode_attempted: true,
3450            decode_succeeded: true,
3451        };
3452
3453        assert_eq!(recovery_outcome_code(&base), "recovered");
3454        assert_eq!(recovery_reason_code_for_log(&base), "decode_recovered");
3455        assert!(repair_attempt_for_log(&base));
3456
3457        let mut fast_path = base.clone();
3458        fast_path.decode_attempted = false;
3459        fast_path.corruption_observations = 0;
3460        assert_eq!(recovery_reason_code_for_log(&fast_path), "intact_fast_path");
3461
3462        let mut truncated = base.clone();
3463        truncated.outcome_is_recovered = false;
3464        truncated.fallback_reason = Some(WalFecRecoveryFallbackReason::InsufficientSymbols);
3465        assert_eq!(recovery_outcome_code(&truncated), "truncate_before_group");
3466        assert_eq!(
3467            recovery_reason_code_for_log(&truncated),
3468            WalFecRecoveryFallbackReason::InsufficientSymbols.reason_code()
3469        );
3470    }
3471
3472    #[test]
3473    fn test_symbol_state_serialization_includes_required_fields() {
3474        let log = WalFecRecoveryLog {
3475            group_id: WalFecGroupId {
3476                wal_salt1: 10,
3477                wal_salt2: 20,
3478                end_frame_no: 30,
3479            },
3480            recovery_enabled: true,
3481            outcome_is_recovered: false,
3482            fallback_reason: Some(WalFecRecoveryFallbackReason::DecodeFailed),
3483            validated_source_symbols: 2,
3484            validated_repair_symbols: 1,
3485            required_symbols: 6,
3486            available_symbols: 3,
3487            recovered_frame_nos: Vec::new(),
3488            corruption_observations: 2,
3489            decode_attempted: true,
3490            decode_succeeded: false,
3491        };
3492
3493        let symbol_state = symbol_state_for_log(&log);
3494        assert!(symbol_state.contains("source_validated=2/6"));
3495        assert!(symbol_state.contains("repair_validated=1"));
3496        assert!(symbol_state.contains("available=3"));
3497        assert!(symbol_state.contains("required=6"));
3498        assert!(symbol_state.contains("decode_attempted=true"));
3499        assert!(symbol_state.contains("decode_succeeded=false"));
3500    }
3501
3502    /// Build deterministic source pages of `page_size` bytes each.
3503    #[allow(clippy::cast_possible_truncation)]
3504    fn make_source_pages(k: u32, page_size: u32) -> Vec<Vec<u8>> {
3505        let ps = page_size as usize;
3506        (0..k)
3507            .map(|i| {
3508                (0..ps)
3509                    .map(|j| ((i as usize * 37 + j * 13 + 7) % 256) as u8)
3510                    .collect()
3511            })
3512            .collect()
3513    }
3514
3515    #[allow(clippy::cast_possible_truncation)]
3516    fn make_test_init_with_hashes(k: u32, source_pages: &[Vec<u8>]) -> WalFecGroupMetaInit {
3517        let page_size = source_pages[0].len() as u32;
3518        WalFecGroupMetaInit {
3519            wal_salt1: 0x1234_5678,
3520            wal_salt2: 0xABCD_EF01,
3521            start_frame_no: 1,
3522            end_frame_no: k,
3523            db_size_pages: 100,
3524            page_size,
3525            k_source: k,
3526            r_repair: 2,
3527            oti: Oti {
3528                f: u64::from(k) * u64::from(page_size),
3529                al: 0,
3530                t: page_size,
3531                z: 1,
3532                n: 1,
3533            },
3534            object_id: ObjectId::from_bytes([0xAA; 16]),
3535            page_numbers: (1..=k).collect(),
3536            source_page_xxh3_128: build_source_page_hashes(source_pages),
3537        }
3538    }
3539
3540    #[test]
3541    fn test_raptorq_encode_produces_valid_symbols() {
3542        let k = 4_u32;
3543        let page_size = 4096_u32;
3544        let source_pages = make_source_pages(k, page_size);
3545        let init = make_test_init_with_hashes(k, &source_pages);
3546        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3547
3548        let symbols =
3549            generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3550                .expect("encode should succeed")
3551                .expect("should not be cancelled");
3552
3553        assert_eq!(symbols.len(), 2, "expected r_repair=2 repair symbols");
3554        for (i, sym) in symbols.iter().enumerate() {
3555            assert_eq!(
3556                sym.symbol_data.len(),
3557                page_size as usize,
3558                "repair symbol {i} size"
3559            );
3560            let expected_esi = k + u32::try_from(i).expect("i fits u32");
3561            assert_eq!(sym.esi, expected_esi, "repair symbol {i} ESI");
3562        }
3563    }
3564
3565    #[test]
3566    fn test_raptorq_encode_decode_roundtrip_all_source() {
3567        // When all source symbols are available, decode should still succeed.
3568        // Use enough repair symbols to satisfy the decoder's internal constraint
3569        // matrix requirements (LDPC + HDPC parity checks).
3570        let k = 4_u32;
3571        let page_size = 512_u32;
3572        let source_pages = make_source_pages(k, page_size);
3573        let mut init = make_test_init_with_hashes(k, &source_pages);
3574        init.r_repair = 8;
3575        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3576
3577        let repair_symbols =
3578            generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3579                .expect("encode")
3580                .expect("not cancelled");
3581
3582        // Build (esi, data) pairs: all source + all repair.
3583        let mut all_symbols: Vec<(u32, Vec<u8>)> = source_pages
3584            .iter()
3585            .enumerate()
3586            .map(|(i, page)| (u32::try_from(i).expect("i fits u32"), page.clone()))
3587            .collect();
3588        for sym in &repair_symbols {
3589            all_symbols.push((sym.esi, sym.symbol_data.clone()));
3590        }
3591
3592        let decoded = wal_fec_raptorq_decode(&meta, &all_symbols)
3593            .expect("decode with all symbols should succeed");
3594
3595        for (i, original) in source_pages.iter().enumerate() {
3596            assert_eq!(&decoded[i], original, "decoded source page {i} mismatch");
3597        }
3598    }
3599
3600    #[test]
3601    fn test_raptorq_encode_decode_roundtrip_with_corruption() {
3602        // Lose one source page, recover from remaining source + repair symbols.
3603        // Use generous repair count to satisfy decoder constraint matrix.
3604        let k = 4_u32;
3605        let page_size = 512_u32;
3606        let r_repair = 8_u32; // Need enough repair symbols for recovery
3607        let source_pages = make_source_pages(k, page_size);
3608        let mut init = make_test_init_with_hashes(k, &source_pages);
3609        init.r_repair = r_repair;
3610        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3611
3612        let repair_symbols =
3613            generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3614                .expect("encode")
3615                .expect("not cancelled");
3616
3617        // Simulate losing source page 1: only provide pages 0, 2, 3 + all repair.
3618        let mut available_symbols: Vec<(u32, Vec<u8>)> = Vec::new();
3619        for (i, page) in source_pages.iter().enumerate() {
3620            if i != 1 {
3621                available_symbols.push((u32::try_from(i).expect("i fits u32"), page.clone()));
3622            }
3623        }
3624        for sym in &repair_symbols {
3625            available_symbols.push((sym.esi, sym.symbol_data.clone()));
3626        }
3627
3628        let decoded = wal_fec_raptorq_decode(&meta, &available_symbols)
3629            .expect("decode should recover missing page");
3630
3631        for (i, original) in source_pages.iter().enumerate() {
3632            assert_eq!(
3633                &decoded[i], original,
3634                "decoded source page {i} mismatch (page 1 was lost)"
3635            );
3636        }
3637    }
3638
3639    #[test]
3640    fn test_raptorq_encode_deterministic() {
3641        // Same inputs produce identical repair symbols.
3642        let k = 3_u32;
3643        let page_size = 512_u32;
3644        let source_pages = make_source_pages(k, page_size);
3645        let init = make_test_init_with_hashes(k, &source_pages);
3646        let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3647
3648        let symbols1 =
3649            generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3650                .expect("encode 1")
3651                .expect("not cancelled");
3652
3653        let symbols2 =
3654            generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3655                .expect("encode 2")
3656                .expect("not cancelled");
3657
3658        assert_eq!(symbols1.len(), symbols2.len());
3659        for (i, (s1, s2)) in symbols1.iter().zip(symbols2.iter()).enumerate() {
3660            assert_eq!(
3661                s1.symbol_data, s2.symbol_data,
3662                "repair symbol {i} not deterministic"
3663            );
3664        }
3665    }
3666
3667    #[test]
3668    fn test_raptorq_telemetry_records_metrics_and_events() {
3669        let _guard = telemetry_test_lock();
3670        reset_raptorq_repair_telemetry();
3671
3672        let group_id = WalFecGroupId {
3673            wal_salt1: 0xAA11_BB22,
3674            wal_salt2: 0xCC33_DD44,
3675            end_frame_no: 42,
3676        };
3677        let log = WalFecRecoveryLog {
3678            group_id,
3679            recovery_enabled: true,
3680            outcome_is_recovered: true,
3681            fallback_reason: None,
3682            validated_source_symbols: 4,
3683            validated_repair_symbols: 2,
3684            required_symbols: 6,
3685            available_symbols: 6,
3686            recovered_frame_nos: vec![40, 41],
3687            corruption_observations: 1,
3688            decode_attempted: true,
3689            decode_succeeded: true,
3690        };
3691        record_raptorq_recovery_log(&log, Duration::from_micros(75));
3692
3693        let metrics = raptorq_repair_metrics_snapshot();
3694        assert_eq!(metrics.repairs_total, 1);
3695        assert_eq!(metrics.repairs_failed, 0);
3696        assert_eq!(metrics.symbols_reclaimed, 2);
3697        assert_eq!(metrics.budget_utilization_pct, 100);
3698        assert_eq!(metrics.severity_histogram.two_to_five, 1);
3699        assert_eq!(metrics.wal_health_score, 81);
3700
3701        let events = raptorq_repair_events_snapshot(10);
3702        assert_eq!(events.len(), 1);
3703        let event = &events[0];
3704        assert_eq!(event.frame_id, 42);
3705        assert_eq!(event.symbols_lost, 2);
3706        assert_eq!(event.symbols_used, 6);
3707        assert!(event.repair_success);
3708        assert_eq!(event.severity_bucket, WalFecRepairSeverityBucket::TwoToFive);
3709        assert_eq!(event.budget_utilization_pct, 100);
3710    }
3711
3712    #[test]
3713    fn test_raptorq_telemetry_health_penalizes_failures() {
3714        let _guard = telemetry_test_lock();
3715        reset_raptorq_repair_telemetry();
3716
3717        let success_group = WalFecGroupId {
3718            wal_salt1: 1,
3719            wal_salt2: 2,
3720            end_frame_no: 10,
3721        };
3722        let success = WalFecRecoveryLog {
3723            group_id: success_group,
3724            recovery_enabled: true,
3725            outcome_is_recovered: true,
3726            fallback_reason: None,
3727            validated_source_symbols: 5,
3728            validated_repair_symbols: 1,
3729            required_symbols: 6,
3730            available_symbols: 6,
3731            recovered_frame_nos: vec![9],
3732            corruption_observations: 0,
3733            decode_attempted: true,
3734            decode_succeeded: true,
3735        };
3736        record_raptorq_recovery_log(&success, Duration::from_micros(50));
3737
3738        let failed_group = WalFecGroupId {
3739            wal_salt1: 3,
3740            wal_salt2: 4,
3741            end_frame_no: 20,
3742        };
3743        let failed = WalFecRecoveryLog {
3744            group_id: failed_group,
3745            recovery_enabled: true,
3746            outcome_is_recovered: false,
3747            fallback_reason: Some(WalFecRecoveryFallbackReason::InsufficientSymbols),
3748            validated_source_symbols: 2,
3749            validated_repair_symbols: 2,
3750            required_symbols: 6,
3751            available_symbols: 4,
3752            recovered_frame_nos: Vec::new(),
3753            corruption_observations: 2,
3754            decode_attempted: true,
3755            decode_succeeded: false,
3756        };
3757        record_raptorq_recovery_log(&failed, Duration::from_micros(90));
3758        record_raptorq_recovery_log(&failed, Duration::from_micros(120));
3759
3760        let metrics = raptorq_repair_metrics_snapshot();
3761        assert_eq!(metrics.repairs_total, 3);
3762        assert_eq!(metrics.repairs_failed, 2);
3763        assert!(metrics.wal_health_score < 100);
3764        assert_eq!(metrics.severity_histogram.one, 1);
3765        assert_eq!(metrics.severity_histogram.two_to_five, 2);
3766    }
3767
3768    #[test]
3769    fn test_raptorq_telemetry_histogram_buckets() {
3770        let _guard = telemetry_test_lock();
3771        reset_raptorq_repair_telemetry();
3772
3773        let samples = [
3774            (1_u32, WalFecRepairSeverityBucket::One),
3775            (3_u32, WalFecRepairSeverityBucket::TwoToFive),
3776            (8_u32, WalFecRepairSeverityBucket::SixToTen),
3777            (12_u32, WalFecRepairSeverityBucket::ElevenPlus),
3778        ];
3779
3780        for (idx, (loss, expected_bucket)) in samples.iter().enumerate() {
3781            let group_id = WalFecGroupId {
3782                wal_salt1: 10 + u32::try_from(idx).expect("small index"),
3783                wal_salt2: 20 + u32::try_from(idx).expect("small index"),
3784                end_frame_no: 30 + u32::try_from(idx).expect("small index"),
3785            };
3786            let log = WalFecRecoveryLog {
3787                group_id,
3788                recovery_enabled: true,
3789                outcome_is_recovered: true,
3790                fallback_reason: None,
3791                validated_source_symbols: 20_u32.saturating_sub(*loss),
3792                validated_repair_symbols: *loss,
3793                required_symbols: 20,
3794                available_symbols: 20,
3795                recovered_frame_nos: vec![group_id.end_frame_no],
3796                corruption_observations: 0,
3797                decode_attempted: true,
3798                decode_succeeded: true,
3799            };
3800            record_raptorq_recovery_log(&log, Duration::from_micros(30));
3801
3802            let events = raptorq_repair_events_snapshot(1);
3803            let event = events
3804                .last()
3805                .expect("latest event must be present after recording");
3806            assert_eq!(event.severity_bucket, *expected_bucket);
3807        }
3808
3809        let metrics = raptorq_repair_metrics_snapshot();
3810        assert_eq!(metrics.severity_histogram.one, 1);
3811        assert_eq!(metrics.severity_histogram.two_to_five, 1);
3812        assert_eq!(metrics.severity_histogram.six_to_ten, 1);
3813        assert_eq!(metrics.severity_histogram.eleven_plus, 1);
3814    }
3815
3816    #[test]
3817    fn test_raptorq_repair_evidence_chain_and_capacity() {
3818        let _guard = telemetry_test_lock();
3819        reset_raptorq_repair_telemetry();
3820
3821        let first_group = WalFecGroupId {
3822            wal_salt1: 0x0A0A_0B0B,
3823            wal_salt2: 0x0C0C_0D0D,
3824            end_frame_no: 101,
3825        };
3826        let first_log = WalFecRecoveryLog {
3827            group_id: first_group,
3828            recovery_enabled: true,
3829            outcome_is_recovered: true,
3830            fallback_reason: None,
3831            validated_source_symbols: 5,
3832            validated_repair_symbols: 1,
3833            required_symbols: 6,
3834            available_symbols: 6,
3835            recovered_frame_nos: vec![100, 101],
3836            corruption_observations: 1,
3837            decode_attempted: true,
3838            decode_succeeded: true,
3839        };
3840        record_raptorq_recovery_log(&first_log, Duration::from_micros(10));
3841
3842        let second_group = WalFecGroupId {
3843            wal_salt1: 0x1A1A_1B1B,
3844            wal_salt2: 0x1C1C_1D1D,
3845            end_frame_no: 202,
3846        };
3847        let second_log = WalFecRecoveryLog {
3848            group_id: second_group,
3849            recovery_enabled: true,
3850            outcome_is_recovered: false,
3851            fallback_reason: Some(WalFecRecoveryFallbackReason::InsufficientSymbols),
3852            validated_source_symbols: 2,
3853            validated_repair_symbols: 2,
3854            required_symbols: 6,
3855            available_symbols: 4,
3856            recovered_frame_nos: Vec::new(),
3857            corruption_observations: 2,
3858            decode_attempted: true,
3859            decode_succeeded: false,
3860        };
3861        record_raptorq_recovery_log(&second_log, Duration::from_micros(25));
3862
3863        let cards = raptorq_repair_evidence_snapshot(0);
3864        assert_eq!(cards.len(), 2);
3865        assert_eq!(cards[0].ledger_epoch, 1);
3866        assert_eq!(cards[1].ledger_epoch, 2);
3867        assert_ne!(cards[0].chain_hash, cards[1].chain_hash);
3868        assert!(cards[0].monotonic_timestamp_ns > 0);
3869        assert!(cards[0].wall_clock_unix_ns > 0);
3870        assert_eq!(cards[0].frame_id, first_group.end_frame_no);
3871        assert_eq!(cards[1].frame_id, second_group.end_frame_no);
3872        assert_eq!(cards[0].confidence_per_mille, 1_000);
3873        assert_eq!(cards[1].confidence_per_mille, 666);
3874        assert_ne!(cards[0].witness.corrupted_hash_blake3, [0_u8; 32]);
3875    }
3876
3877    #[test]
3878    fn test_raptorq_repair_evidence_query_filters() {
3879        let _guard = telemetry_test_lock();
3880        reset_raptorq_repair_telemetry();
3881
3882        let logs = [
3883            WalFecRecoveryLog {
3884                group_id: WalFecGroupId {
3885                    wal_salt1: 11,
3886                    wal_salt2: 12,
3887                    end_frame_no: 301,
3888                },
3889                recovery_enabled: true,
3890                outcome_is_recovered: true,
3891                fallback_reason: None,
3892                validated_source_symbols: 9,
3893                validated_repair_symbols: 1,
3894                required_symbols: 10,
3895                available_symbols: 10,
3896                recovered_frame_nos: vec![301],
3897                corruption_observations: 0,
3898                decode_attempted: true,
3899                decode_succeeded: true,
3900            },
3901            WalFecRecoveryLog {
3902                group_id: WalFecGroupId {
3903                    wal_salt1: 21,
3904                    wal_salt2: 22,
3905                    end_frame_no: 302,
3906                },
3907                recovery_enabled: true,
3908                outcome_is_recovered: true,
3909                fallback_reason: None,
3910                validated_source_symbols: 7,
3911                validated_repair_symbols: 3,
3912                required_symbols: 10,
3913                available_symbols: 10,
3914                recovered_frame_nos: vec![302],
3915                corruption_observations: 1,
3916                decode_attempted: true,
3917                decode_succeeded: true,
3918            },
3919            WalFecRecoveryLog {
3920                group_id: WalFecGroupId {
3921                    wal_salt1: 31,
3922                    wal_salt2: 32,
3923                    end_frame_no: 303,
3924                },
3925                recovery_enabled: true,
3926                outcome_is_recovered: true,
3927                fallback_reason: None,
3928                validated_source_symbols: 1,
3929                validated_repair_symbols: 9,
3930                required_symbols: 10,
3931                available_symbols: 10,
3932                recovered_frame_nos: vec![303],
3933                corruption_observations: 3,
3934                decode_attempted: true,
3935                decode_succeeded: true,
3936            },
3937        ];
3938        for log in &logs {
3939            record_raptorq_recovery_log(log, Duration::from_micros(15));
3940        }
3941
3942        let cards = raptorq_repair_evidence_snapshot(0);
3943        assert_eq!(cards.len(), 3);
3944
3945        let by_frame = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
3946            frame_id: Some(302),
3947            ..WalFecRepairEvidenceQuery::default()
3948        });
3949        assert_eq!(by_frame.len(), 1);
3950        assert_eq!(by_frame[0].frame_id, 302);
3951
3952        let by_severity = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
3953            severity_bucket: Some(WalFecRepairSeverityBucket::SixToTen),
3954            ..WalFecRepairEvidenceQuery::default()
3955        });
3956        assert_eq!(by_severity.len(), 1);
3957        assert_eq!(by_severity[0].frame_id, 303);
3958
3959        let min_time = cards
3960            .first()
3961            .map(|card| card.wall_clock_unix_ns)
3962            .expect("evidence cards should include timestamps");
3963        let max_time = cards
3964            .last()
3965            .map(|card| card.wall_clock_unix_ns)
3966            .expect("evidence cards should include timestamps");
3967        let by_time = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
3968            wall_clock_start_ns: Some(min_time),
3969            wall_clock_end_ns: Some(max_time),
3970            ..WalFecRepairEvidenceQuery::default()
3971        });
3972        assert_eq!(by_time.len(), 3);
3973
3974        let limited = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
3975            limit: Some(2),
3976            ..WalFecRepairEvidenceQuery::default()
3977        });
3978        assert_eq!(limited.len(), 2);
3979        assert_eq!(limited[0].frame_id, 302);
3980        assert_eq!(limited[1].frame_id, 303);
3981    }
3982
3983    // -- Pure-logic tests for untested public API surface --
3984
3985    #[test]
3986    fn test_wal_fec_path_for_wal_dash_suffix() {
3987        let path = wal_fec_path_for_wal(Path::new("/tmp/test.db-wal"));
3988        assert_eq!(path, PathBuf::from("/tmp/test.db-wal-fec"));
3989    }
3990
3991    #[test]
3992    fn test_wal_fec_path_for_wal_dot_suffix() {
3993        let path = wal_fec_path_for_wal(Path::new("/tmp/test.wal"));
3994        assert_eq!(path, PathBuf::from("/tmp/test.wal-fec"));
3995    }
3996
3997    #[test]
3998    fn test_wal_fec_path_for_wal_no_wal_suffix() {
3999        let path = wal_fec_path_for_wal(Path::new("/tmp/test.db"));
4000        assert_eq!(path, PathBuf::from("/tmp/test.db.wal-fec"));
4001    }
4002
4003    #[test]
4004    fn test_build_source_page_hashes_deterministic() {
4005        let pages = vec![vec![0xAA; 4096], vec![0xBB; 4096]];
4006        let hashes1 = build_source_page_hashes(&pages);
4007        let hashes2 = build_source_page_hashes(&pages);
4008        assert_eq!(hashes1, hashes2);
4009        assert_eq!(hashes1.len(), 2);
4010    }
4011
4012    #[test]
4013    fn test_build_source_page_hashes_empty() {
4014        let hashes = build_source_page_hashes(&[]);
4015        assert!(hashes.is_empty());
4016    }
4017
4018    #[test]
4019    fn test_build_source_page_hashes_distinct_pages_produce_distinct_hashes() {
4020        let pages = vec![vec![0x00; 4096], vec![0xFF; 4096]];
4021        let hashes = build_source_page_hashes(&pages);
4022        assert_ne!(hashes[0], hashes[1]);
4023    }
4024
4025    #[test]
4026    fn test_verify_salt_binding_match() {
4027        let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4028        let salts = WalSalts {
4029            salt1: 0x1234_5678,
4030            salt2: 0xABCD_EF01,
4031        };
4032        assert!(meta.verify_salt_binding(salts).is_ok());
4033    }
4034
4035    #[test]
4036    fn test_verify_salt_binding_mismatch_salt1() {
4037        let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4038        let salts = WalSalts {
4039            salt1: 0xDEAD_BEEF,
4040            salt2: 0xABCD_EF01,
4041        };
4042        let err = meta.verify_salt_binding(salts);
4043        assert!(err.is_err());
4044        assert!(err.unwrap_err().to_string().contains("salt mismatch"));
4045    }
4046
4047    #[test]
4048    fn test_verify_salt_binding_mismatch_salt2() {
4049        let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4050        let salts = WalSalts {
4051            salt1: 0x1234_5678,
4052            salt2: 0x0000_0000,
4053        };
4054        assert!(meta.verify_salt_binding(salts).is_err());
4055    }
4056
4057    #[test]
4058    fn test_group_record_new_wrong_repair_count() {
4059        let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4060        let result = WalFecGroupRecord::new(meta, vec![]);
4061        assert!(result.is_err());
4062        assert!(
4063            result
4064                .unwrap_err()
4065                .to_string()
4066                .contains("repair symbol count")
4067        );
4068    }
4069
4070    #[test]
4071    fn test_group_record_new_wrong_object_id() {
4072        let meta = WalFecGroupMeta::from_init(make_test_init(1)).expect("from_init");
4073        let bad_symbol = SymbolRecord {
4074            esi: meta.k_source,
4075            object_id: ObjectId::from_bytes([0xBB; 16]),
4076            oti: meta.oti,
4077            flags: SymbolRecordFlags::empty(),
4078            symbol_data: vec![0; meta.page_size as usize],
4079            frame_xxh3: 0,
4080            auth_tag: [0; 16],
4081        };
4082        let bad_symbol2 = SymbolRecord {
4083            esi: meta.k_source + 1,
4084            object_id: ObjectId::from_bytes([0xBB; 16]),
4085            oti: meta.oti,
4086            flags: SymbolRecordFlags::empty(),
4087            symbol_data: vec![0; meta.page_size as usize],
4088            frame_xxh3: 0,
4089            auth_tag: [0; 16],
4090        };
4091        let result = WalFecGroupRecord::new(meta, vec![bad_symbol, bad_symbol2]);
4092        assert!(result.is_err());
4093        assert!(
4094            result
4095                .unwrap_err()
4096                .to_string()
4097                .contains("object_id mismatch")
4098        );
4099    }
4100
4101    #[test]
4102    fn test_group_record_new_wrong_esi() {
4103        let meta = WalFecGroupMeta::from_init(make_test_init(1)).expect("from_init");
4104        let sym = SymbolRecord {
4105            esi: 999,
4106            object_id: meta.object_id,
4107            oti: meta.oti,
4108            flags: SymbolRecordFlags::empty(),
4109            symbol_data: vec![0; meta.page_size as usize],
4110            frame_xxh3: 0,
4111            auth_tag: [0; 16],
4112        };
4113        let sym2 = SymbolRecord {
4114            esi: 1000,
4115            object_id: meta.object_id,
4116            oti: meta.oti,
4117            flags: SymbolRecordFlags::empty(),
4118            symbol_data: vec![0; meta.page_size as usize],
4119            frame_xxh3: 0,
4120            auth_tag: [0; 16],
4121        };
4122        let result = WalFecGroupRecord::new(meta, vec![sym, sym2]);
4123        assert!(result.is_err());
4124        assert!(result.unwrap_err().to_string().contains("ESI"));
4125    }
4126
4127    #[test]
4128    fn test_group_id_display() {
4129        let id = WalFecGroupId {
4130            wal_salt1: 1,
4131            wal_salt2: 2,
4132            end_frame_no: 42,
4133        };
4134        assert_eq!(format!("{id}"), "(1, 2, 42)");
4135    }
4136
4137    #[test]
4138    fn test_severity_bucket_as_str_roundtrip() {
4139        let buckets = [
4140            WalFecRepairSeverityBucket::One,
4141            WalFecRepairSeverityBucket::TwoToFive,
4142            WalFecRepairSeverityBucket::SixToTen,
4143            WalFecRepairSeverityBucket::ElevenPlus,
4144        ];
4145        for bucket in buckets {
4146            let s = bucket.as_str();
4147            let parsed: WalFecRepairSeverityBucket = s.parse().expect(s);
4148            assert_eq!(parsed, bucket);
4149        }
4150    }
4151
4152    #[test]
4153    fn test_severity_bucket_from_str_aliases() {
4154        assert_eq!(
4155            "one".parse::<WalFecRepairSeverityBucket>().unwrap(),
4156            WalFecRepairSeverityBucket::One
4157        );
4158        assert_eq!(
4159            "two-to-five".parse::<WalFecRepairSeverityBucket>().unwrap(),
4160            WalFecRepairSeverityBucket::TwoToFive
4161        );
4162        assert_eq!(
4163            "six_to_ten".parse::<WalFecRepairSeverityBucket>().unwrap(),
4164            WalFecRepairSeverityBucket::SixToTen
4165        );
4166        assert_eq!(
4167            "eleven_plus".parse::<WalFecRepairSeverityBucket>().unwrap(),
4168            WalFecRepairSeverityBucket::ElevenPlus
4169        );
4170        assert!("unknown".parse::<WalFecRepairSeverityBucket>().is_err());
4171    }
4172
4173    #[test]
4174    fn test_severity_histogram_bump_all_buckets() {
4175        let mut hist = WalFecRepairSeverityHistogram::default();
4176        assert_eq!(hist.one, 0);
4177        hist.bump(WalFecRepairSeverityBucket::One);
4178        hist.bump(WalFecRepairSeverityBucket::One);
4179        hist.bump(WalFecRepairSeverityBucket::TwoToFive);
4180        hist.bump(WalFecRepairSeverityBucket::SixToTen);
4181        hist.bump(WalFecRepairSeverityBucket::ElevenPlus);
4182        hist.bump(WalFecRepairSeverityBucket::ElevenPlus);
4183        assert_eq!(hist.one, 2);
4184        assert_eq!(hist.two_to_five, 1);
4185        assert_eq!(hist.six_to_ten, 1);
4186        assert_eq!(hist.eleven_plus, 2);
4187    }
4188
4189    #[test]
4190    fn test_evidence_card_hex_methods() {
4191        let card = WalFecRepairEvidenceCard {
4192            group_id: WalFecGroupId {
4193                wal_salt1: 1,
4194                wal_salt2: 2,
4195                end_frame_no: 10,
4196            },
4197            frame_id: 10,
4198            wal_file_offset_bytes: None,
4199            monotonic_timestamp_ns: 0,
4200            wall_clock_unix_ns: 0,
4201            corruption_signature_blake3: [0xAB; 32],
4202            bit_error_pattern: None,
4203            repair_source: WalFecRepairSource::WalRepairSymbols,
4204            symbols_used: 2,
4205            validated_source_symbols: 1,
4206            validated_repair_symbols: 1,
4207            required_symbols: 2,
4208            available_symbols: 2,
4209            witness: WalFecRepairWitnessTriple {
4210                corrupted_hash_blake3: [0; 32],
4211                repaired_hash_blake3: [0; 32],
4212                expected_hash_blake3: [0; 32],
4213            },
4214            repair_latency_ns: 0,
4215            confidence_per_mille: 1000,
4216            severity_bucket: WalFecRepairSeverityBucket::One,
4217            ledger_epoch: 0,
4218            chain_hash: [0xFF; 32],
4219        };
4220        assert_eq!(card.chain_hash_hex().len(), 64);
4221        assert!(card.chain_hash_hex().chars().all(|c| c == 'f'));
4222        assert_eq!(card.corruption_signature_hex().len(), 64);
4223        assert!(
4224            card.corruption_signature_hex()
4225                .chars()
4226                .all(|c| c == 'a' || c == 'b')
4227        );
4228    }
4229
4230    #[test]
4231    fn test_repair_source_as_str() {
4232        assert_eq!(
4233            WalFecRepairSource::WalRepairSymbols.as_str(),
4234            "wal_repair_symbols"
4235        );
4236        assert_eq!(
4237            WalFecRepairSource::SnapshotRepairSymbols.as_str(),
4238            "snapshot_repair_symbols"
4239        );
4240        assert_eq!(
4241            WalFecRepairSource::WalAndSnapshotRepairSymbols.as_str(),
4242            "wal_and_snapshot_repair_symbols"
4243        );
4244    }
4245
4246    #[test]
4247    fn test_recovery_log_from_recovered_outcome() {
4248        let group_id = WalFecGroupId {
4249            wal_salt1: 10,
4250            wal_salt2: 20,
4251            end_frame_no: 5,
4252        };
4253        let proof = WalFecDecodeProof {
4254            group_id,
4255            required_symbols: 3,
4256            available_symbols: 4,
4257            validated_source_symbols: 2,
4258            validated_repair_symbols: 2,
4259            corruption_observations: 1,
4260            decode_attempted: true,
4261            decode_succeeded: true,
4262            recovered_frame_nos: vec![3, 4],
4263            fallback_reason: None,
4264        };
4265        let meta = WalFecGroupMeta::from_init(make_test_init(3)).expect("from_init");
4266        let outcome = WalFecRecoveryOutcome::Recovered(WalFecRecoveredGroup {
4267            meta,
4268            recovered_pages: vec![vec![0; 4096]; 3],
4269            recovered_frame_nos: vec![3, 4],
4270            db_size_pages: 100,
4271            decode_proof: proof,
4272        });
4273        let log = recovery_log_from_outcome(group_id, &outcome, true);
4274        assert!(log.outcome_is_recovered);
4275        assert!(log.recovery_enabled);
4276        assert!(log.fallback_reason.is_none());
4277        assert_eq!(log.validated_source_symbols, 2);
4278        assert_eq!(log.validated_repair_symbols, 2);
4279        assert_eq!(log.recovered_frame_nos, vec![3, 4]);
4280        assert!(log.decode_attempted);
4281        assert!(log.decode_succeeded);
4282    }
4283
4284    #[test]
4285    fn test_recovery_log_from_truncate_outcome() {
4286        let group_id = WalFecGroupId {
4287            wal_salt1: 10,
4288            wal_salt2: 20,
4289            end_frame_no: 5,
4290        };
4291        let proof = WalFecDecodeProof {
4292            group_id,
4293            required_symbols: 3,
4294            available_symbols: 1,
4295            validated_source_symbols: 1,
4296            validated_repair_symbols: 0,
4297            corruption_observations: 2,
4298            decode_attempted: false,
4299            decode_succeeded: false,
4300            recovered_frame_nos: vec![],
4301            fallback_reason: Some(WalFecRecoveryFallbackReason::InsufficientSymbols),
4302        };
4303        let outcome = WalFecRecoveryOutcome::TruncateBeforeGroup {
4304            truncate_before_frame_no: 3,
4305            decode_proof: proof,
4306        };
4307        let log = recovery_log_from_outcome(group_id, &outcome, true);
4308        assert!(!log.outcome_is_recovered);
4309        assert_eq!(
4310            log.fallback_reason,
4311            Some(WalFecRecoveryFallbackReason::InsufficientSymbols)
4312        );
4313        assert_eq!(log.available_symbols, 1);
4314        assert!(!log.decode_attempted);
4315    }
4316
4317    #[test]
4318    fn test_recovery_fallback_reason_codes_all_variants() {
4319        let variants = [
4320            (
4321                WalFecRecoveryFallbackReason::MissingSidecarGroup,
4322                "missing_sidecar_group",
4323            ),
4324            (
4325                WalFecRecoveryFallbackReason::SidecarUnreadable,
4326                "sidecar_unreadable",
4327            ),
4328            (WalFecRecoveryFallbackReason::SaltMismatch, "salt_mismatch"),
4329            (
4330                WalFecRecoveryFallbackReason::InsufficientSymbols,
4331                "insufficient_symbols",
4332            ),
4333            (WalFecRecoveryFallbackReason::DecodeFailed, "decode_failed"),
4334            (
4335                WalFecRecoveryFallbackReason::DecodedPayloadMismatch,
4336                "decoded_payload_mismatch",
4337            ),
4338            (
4339                WalFecRecoveryFallbackReason::RecoveryDisabled,
4340                "recovery_disabled",
4341            ),
4342        ];
4343        for (variant, expected_code) in variants {
4344            assert_eq!(variant.reason_code(), expected_code);
4345        }
4346    }
4347
4348    #[test]
4349    fn test_meta_from_init_rejects_zero_start_frame() {
4350        let mut init = make_test_init(2);
4351        init.start_frame_no = 0;
4352        init.k_source = 0;
4353        assert!(WalFecGroupMeta::from_init(init).is_err());
4354    }
4355
4356    #[test]
4357    fn test_meta_from_init_rejects_end_before_start() {
4358        let mut init = make_test_init(3);
4359        init.start_frame_no = 5;
4360        init.end_frame_no = 3;
4361        assert!(WalFecGroupMeta::from_init(init).is_err());
4362    }
4363
4364    #[test]
4365    fn test_meta_from_init_rejects_zero_r_repair() {
4366        let mut init = make_test_init(2);
4367        init.r_repair = 0;
4368        assert!(WalFecGroupMeta::from_init(init).is_err());
4369    }
4370
4371    #[test]
4372    fn test_meta_from_init_rejects_zero_db_size() {
4373        let mut init = make_test_init(2);
4374        init.db_size_pages = 0;
4375        assert!(WalFecGroupMeta::from_init(init).is_err());
4376    }
4377
4378    #[test]
4379    fn test_recovery_config_default_enabled() {
4380        let config = WalFecRecoveryConfig::default();
4381        assert!(config.recovery_enabled);
4382    }
4383
4384    #[test]
4385    fn test_repair_pipeline_config_default() {
4386        let config = WalFecRepairPipelineConfig::default();
4387        assert_eq!(config.queue_capacity, 64);
4388        assert_eq!(config.per_symbol_delay, Duration::ZERO);
4389    }
4390
4391    #[test]
4392    fn wal_fec_group_id_display_hash_eq() {
4393        use std::collections::HashSet;
4394        let a = WalFecGroupId {
4395            wal_salt1: 1,
4396            wal_salt2: 2,
4397            end_frame_no: 10,
4398        };
4399        let b = WalFecGroupId {
4400            wal_salt1: 1,
4401            wal_salt2: 2,
4402            end_frame_no: 11,
4403        };
4404        assert_ne!(a, b);
4405        let copied = a;
4406        assert_eq!(copied, a);
4407        let display = format!("{a}");
4408        assert_eq!(display, "(1, 2, 10)");
4409        let mut set = HashSet::new();
4410        set.insert(a);
4411        set.insert(b);
4412        assert_eq!(set.len(), 2);
4413    }
4414
4415    #[test]
4416    fn wal_fec_recovery_fallback_reason_all_variants() {
4417        let variants = [
4418            (
4419                WalFecRecoveryFallbackReason::MissingSidecarGroup,
4420                "missing_sidecar_group",
4421            ),
4422            (
4423                WalFecRecoveryFallbackReason::SidecarUnreadable,
4424                "sidecar_unreadable",
4425            ),
4426            (WalFecRecoveryFallbackReason::SaltMismatch, "salt_mismatch"),
4427            (
4428                WalFecRecoveryFallbackReason::InsufficientSymbols,
4429                "insufficient_symbols",
4430            ),
4431            (WalFecRecoveryFallbackReason::DecodeFailed, "decode_failed"),
4432            (
4433                WalFecRecoveryFallbackReason::DecodedPayloadMismatch,
4434                "decoded_payload_mismatch",
4435            ),
4436            (
4437                WalFecRecoveryFallbackReason::RecoveryDisabled,
4438                "recovery_disabled",
4439            ),
4440        ];
4441        for (v, code) in &variants {
4442            assert_eq!(v.reason_code(), *code);
4443            let copied = *v;
4444            assert_eq!(copied, *v);
4445        }
4446        let dbg = format!("{:?}", WalFecRecoveryFallbackReason::SaltMismatch);
4447        assert!(dbg.contains("SaltMismatch"));
4448    }
4449
4450    #[test]
4451    fn wal_fec_repair_severity_bucket_as_str_and_from_str() {
4452        assert_eq!(WalFecRepairSeverityBucket::One.as_str(), "1");
4453        assert_eq!(WalFecRepairSeverityBucket::TwoToFive.as_str(), "2-5");
4454        assert_eq!(WalFecRepairSeverityBucket::SixToTen.as_str(), "6-10");
4455        assert_eq!(WalFecRepairSeverityBucket::ElevenPlus.as_str(), "11+");
4456        assert_eq!(
4457            "1".parse::<WalFecRepairSeverityBucket>().unwrap(),
4458            WalFecRepairSeverityBucket::One
4459        );
4460        assert_eq!(
4461            "two-to-five".parse::<WalFecRepairSeverityBucket>().unwrap(),
4462            WalFecRepairSeverityBucket::TwoToFive
4463        );
4464        assert!("invalid".parse::<WalFecRepairSeverityBucket>().is_err());
4465    }
4466
4467    #[test]
4468    fn wal_fec_repair_source_all_variants_debug_copy_eq() {
4469        let variants = [
4470            WalFecRepairSource::WalRepairSymbols,
4471            WalFecRepairSource::SnapshotRepairSymbols,
4472            WalFecRepairSource::WalAndSnapshotRepairSymbols,
4473        ];
4474        for (i, v) in variants.iter().enumerate() {
4475            let copied = *v;
4476            assert_eq!(copied, *v);
4477            for (j, w) in variants.iter().enumerate() {
4478                assert_eq!(i == j, v == w);
4479            }
4480        }
4481        let dbg = format!("{:?}", WalFecRepairSource::SnapshotRepairSymbols);
4482        assert!(dbg.contains("SnapshotRepairSymbols"));
4483    }
4484
4485    #[test]
4486    fn wal_fec_repair_pipeline_config_default_and_copy() {
4487        let cfg = WalFecRepairPipelineConfig::default();
4488        assert_eq!(cfg.queue_capacity, 64);
4489        assert_eq!(cfg.per_symbol_delay, Duration::ZERO);
4490        let copied = cfg;
4491        assert_eq!(copied, cfg);
4492        let dbg = format!("{cfg:?}");
4493        assert!(dbg.contains("WalFecRepairPipelineConfig"));
4494    }
4495
4496    #[test]
4497    fn wal_fec_repair_pipeline_stats_default_all_zero() {
4498        let stats = WalFecRepairPipelineStats::default();
4499        assert_eq!(stats.pending_jobs, 0);
4500        assert_eq!(stats.completed_jobs, 0);
4501        assert_eq!(stats.failed_jobs, 0);
4502        assert_eq!(stats.canceled_jobs, 0);
4503        assert_eq!(stats.max_pending_jobs, 0);
4504        let copied = stats;
4505        assert_eq!(copied, stats);
4506    }
4507
4508    #[test]
4509    fn wal_frame_candidate_clone_eq_debug() {
4510        let c = WalFrameCandidate {
4511            frame_no: 7,
4512            page_data: vec![0xAB; 16],
4513        };
4514        let cloned = c.clone();
4515        assert_eq!(cloned, c);
4516        assert_eq!(c.frame_no, 7);
4517        assert_eq!(c.page_data.len(), 16);
4518        let dbg = format!("{c:?}");
4519        assert!(dbg.contains("WalFrameCandidate"));
4520    }
4521
4522    #[test]
4523    fn wal_fec_decode_proof_clone_eq_debug() {
4524        let proof = WalFecDecodeProof {
4525            group_id: WalFecGroupId {
4526                wal_salt1: 1,
4527                wal_salt2: 2,
4528                end_frame_no: 4,
4529            },
4530            required_symbols: 4,
4531            available_symbols: 6,
4532            validated_source_symbols: 4,
4533            validated_repair_symbols: 2,
4534            corruption_observations: 0,
4535            decode_attempted: true,
4536            decode_succeeded: true,
4537            recovered_frame_nos: vec![1, 2, 3, 4],
4538            fallback_reason: None,
4539        };
4540        let cloned = proof.clone();
4541        assert_eq!(cloned, proof);
4542        assert!(proof.decode_succeeded);
4543        assert_eq!(proof.recovered_frame_nos.len(), 4);
4544        let dbg = format!("{proof:?}");
4545        assert!(dbg.contains("WalFecDecodeProof"));
4546    }
4547}