1use std::collections::{BTreeMap, VecDeque};
10use std::fmt;
11use std::fs;
12use std::io::Write;
13use std::mem::size_of;
14use std::path::{Path, PathBuf};
15use std::str::FromStr;
16use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
18use std::thread;
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use asupersync::channel::mpsc;
22use asupersync::cx::Cx as NativeCx;
23use asupersync::runtime::{
24 JoinHandle as AsyncJoinHandle, RuntimeHandle, spawn_blocking, yield_now,
25};
26use fsqlite_error::{FrankenError, Result};
27use fsqlite_types::{ObjectId, Oti, PageSize, SymbolRecord, SymbolRecordFlags, cx::Cx};
28use tracing::{debug, error, info, warn};
29use xxhash_rust::xxh3::xxh3_64;
30
31use crate::checksum::{
32 WalSalts, Xxh3Checksum128, verify_wal_fec_source_hash, wal_fec_source_hash_xxh3_128,
33};
34
/// Magic bytes that open every serialized wal-fec group metadata record.
pub const WAL_FEC_GROUP_META_MAGIC: [u8; 8] = *b"FSQLWFEC";
/// The only group metadata format version this module writes or accepts.
pub const WAL_FEC_GROUP_META_VERSION: u32 = 1;
/// Default RaptorQ repair symbol count.
/// NOTE(review): where this default is applied is outside this view — confirm.
pub const DEFAULT_RAPTORQ_REPAIR_SYMBOLS: u8 = 2;
/// Upper bound on the repair symbol count (the full `u8` range).
pub const MAX_RAPTORQ_REPAIR_SYMBOLS: u8 = u8::MAX;
/// Magic bytes that open the wal-fec pragma header.
pub const WAL_FEC_PRAGMA_HEADER_MAGIC: [u8; 8] = *b"FSQLWFCP";
/// The only pragma header format version this module writes or accepts.
pub const WAL_FEC_PRAGMA_HEADER_VERSION: u32 = 1;

// NOTE(review): presumably the size of a u32 length prefix framing sidecar
// records; its use is outside this view — confirm.
const LENGTH_PREFIX_BYTES: usize = 4;
// Fixed-size leading part of a group meta record:
// magic (8) + version (4) + eight u32 fields + OTI (22) + object id (16).
const META_FIXED_PREFIX_BYTES: usize = 8 + 4 + (8 * 4) + 22 + 16;
// Size of the trailing u64 checksum of a group meta record.
const META_CHECKSUM_BYTES: usize = 8;
// Pragma header layout:
// magic (8) + version (4) + repair symbol count (1) + reserved (3) + checksum (8).
const WAL_FEC_PRAGMA_HEADER_BYTES: usize = 8 + 4 + 1 + 3 + 8;
// NOTE(review): presumably caps the retained repair event history — confirm at use site.
const RAPTORQ_REPAIR_EVENT_CAPACITY: usize = 512;
// NOTE(review): presumably caps the retained repair evidence cards — confirm at use site.
const RAPTORQ_REPAIR_EVIDENCE_CAPACITY: usize = 2048;
59
60type RaptorqRepairEquation = (Vec<usize>, Vec<asupersync::raptorq::gf256::Gf256>);
61
62trait IntoWalFecRepairEquation {
63 fn into_wal_fec_result(self, esi: u32) -> Result<RaptorqRepairEquation>;
64}
65
66impl IntoWalFecRepairEquation for RaptorqRepairEquation {
67 fn into_wal_fec_result(self, _esi: u32) -> Result<RaptorqRepairEquation> {
68 Ok(self)
69 }
70}
71
72impl IntoWalFecRepairEquation for Option<RaptorqRepairEquation> {
73 fn into_wal_fec_result(self, esi: u32) -> Result<RaptorqRepairEquation> {
74 self.ok_or_else(|| FrankenError::WalCorrupt {
75 detail: format!("invalid RaptorQ repair ESI {esi}: unsupported domain"),
76 })
77 }
78}
79
/// Fixed-size wal-fec pragma header: magic, version, repair symbol count,
/// reserved padding and a trailing checksum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct WalFecPragmaHeader {
    /// Must equal `WAL_FEC_PRAGMA_HEADER_MAGIC` for the header to be recognized.
    magic: [u8; 8],
    /// Format version; parsing rejects anything but `WAL_FEC_PRAGMA_HEADER_VERSION`.
    version: u32,
    /// Repair symbol count carried by the header.
    raptorq_repair_symbols: u8,
    /// Padding bytes; written as zero by `new` and covered by the checksum.
    reserved: [u8; 3],
    /// xxh3-64 over the 16 serialized bytes that precede this field.
    checksum: u64,
}
88
89impl WalFecPragmaHeader {
90 #[must_use]
91 fn new(raptorq_repair_symbols: u8) -> Self {
92 let mut header = Self {
93 magic: WAL_FEC_PRAGMA_HEADER_MAGIC,
94 version: WAL_FEC_PRAGMA_HEADER_VERSION,
95 raptorq_repair_symbols,
96 reserved: [0; 3],
97 checksum: 0,
98 };
99 header.checksum = header.compute_checksum();
100 header
101 }
102
103 fn from_prefix(bytes: &[u8]) -> Result<Option<Self>> {
104 if bytes.len() < WAL_FEC_PRAGMA_HEADER_BYTES {
105 return Ok(None);
106 }
107
108 let mut magic = [0_u8; 8];
109 magic.copy_from_slice(&bytes[..8]);
110 if magic != WAL_FEC_PRAGMA_HEADER_MAGIC {
111 return Ok(None);
112 }
113
114 let version = u32::from_le_bytes(bytes[8..12].try_into().expect("fixed-length slice"));
115 if version != WAL_FEC_PRAGMA_HEADER_VERSION {
116 return Err(FrankenError::WalCorrupt {
117 detail: format!(
118 "unsupported wal-fec pragma header version {version}, expected {WAL_FEC_PRAGMA_HEADER_VERSION}"
119 ),
120 });
121 }
122
123 let raptorq_repair_symbols = bytes[12];
124 let mut reserved = [0_u8; 3];
125 reserved.copy_from_slice(&bytes[13..16]);
126 let checksum = u64::from_le_bytes(bytes[16..24].try_into().expect("fixed-length slice"));
127
128 let header = Self {
129 magic,
130 version,
131 raptorq_repair_symbols,
132 reserved,
133 checksum,
134 };
135
136 let computed = header.compute_checksum();
137 if computed != checksum {
138 return Err(FrankenError::WalCorrupt {
139 detail: format!(
140 "wal-fec pragma header checksum mismatch: stored {checksum:#018x}, computed {computed:#018x}"
141 ),
142 });
143 }
144
145 Ok(Some(header))
146 }
147
148 #[must_use]
149 fn to_bytes(self) -> [u8; WAL_FEC_PRAGMA_HEADER_BYTES] {
150 let mut out = [0_u8; WAL_FEC_PRAGMA_HEADER_BYTES];
151 out[..8].copy_from_slice(&self.magic);
152 out[8..12].copy_from_slice(&self.version.to_le_bytes());
153 out[12] = self.raptorq_repair_symbols;
154 out[13..16].copy_from_slice(&self.reserved);
155 out[16..24].copy_from_slice(&self.checksum.to_le_bytes());
156 out
157 }
158
159 #[must_use]
160 fn compute_checksum(&self) -> u64 {
161 let mut payload = [0_u8; 16];
162 payload[..8].copy_from_slice(&self.magic);
163 payload[8..12].copy_from_slice(&self.version.to_le_bytes());
164 payload[12] = self.raptorq_repair_symbols;
165 payload[13..16].copy_from_slice(&self.reserved);
166 xxh3_64(&payload)
167 }
168}
169
/// Identifies a wal-fec group by the WAL salts it was written under and the
/// number of the last frame it covers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WalFecGroupId {
    /// First WAL salt.
    pub wal_salt1: u32,
    /// Second WAL salt.
    pub wal_salt2: u32,
    /// Number of the last frame in the group.
    pub end_frame_no: u32,
}

impl fmt::Display for WalFecGroupId {
    /// Renders the id as `(salt1, salt2, end_frame_no)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            wal_salt1: salt1,
            wal_salt2: salt2,
            end_frame_no: end_frame,
        } = *self;
        write!(f, "({}, {}, {})", salt1, salt2, end_frame)
    }
}
188
/// Caller-supplied inputs for `WalFecGroupMeta::from_init`; magic, version and
/// checksum are filled in by the constructor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecGroupMetaInit {
    /// First WAL salt the group is bound to.
    pub wal_salt1: u32,
    /// Second WAL salt the group is bound to.
    pub wal_salt2: u32,
    /// First frame of the group (1-based, must be nonzero).
    pub start_frame_no: u32,
    /// Last frame of the group (must be >= `start_frame_no`).
    pub end_frame_no: u32,
    /// Database size in pages; must be nonzero.
    pub db_size_pages: u32,
    /// Page size in bytes; must be a valid `PageSize` and equal `oti.t`.
    pub page_size: u32,
    /// Number of source symbols; must equal the frame span length.
    pub k_source: u32,
    /// Number of repair symbols; must be at least 1.
    pub r_repair: u32,
    /// RaptorQ object transmission information for the group.
    pub oti: Oti,
    /// Object id shared with the group's repair symbols.
    pub object_id: ObjectId,
    /// One page number per source frame (`k_source` entries).
    pub page_numbers: Vec<u32>,
    /// One xxh3-128 digest per source page (`k_source` entries).
    pub source_page_xxh3_128: Vec<Xxh3Checksum128>,
}
205
/// Validated, checksummed metadata describing one wal-fec group.
///
/// Instances are produced by `from_init` or `from_record_bytes`, both of which
/// enforce the invariants noted on the fields below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecGroupMeta {
    /// Always `WAL_FEC_GROUP_META_MAGIC` on a validated value.
    pub magic: [u8; 8],
    /// Always `WAL_FEC_GROUP_META_VERSION` on a validated value.
    pub version: u32,
    /// First WAL salt the group is bound to (see `verify_salt_binding`).
    pub wal_salt1: u32,
    /// Second WAL salt the group is bound to.
    pub wal_salt2: u32,
    /// First frame of the group (1-based, nonzero).
    pub start_frame_no: u32,
    /// Last frame of the group (>= `start_frame_no`).
    pub end_frame_no: u32,
    /// Database size in pages; nonzero.
    pub db_size_pages: u32,
    /// Page size in bytes; a valid `PageSize` equal to `oti.t`.
    pub page_size: u32,
    /// Number of source symbols; equals `end_frame_no - start_frame_no + 1`.
    pub k_source: u32,
    /// Number of repair symbols; at least 1.
    pub r_repair: u32,
    /// RaptorQ object transmission information; `oti.f == k_source * page_size`.
    pub oti: Oti,
    /// Object id shared with the group's repair symbols.
    pub object_id: ObjectId,
    /// One page number per source frame (`k_source` entries).
    pub page_numbers: Vec<u32>,
    /// One xxh3-128 digest per source page (`k_source` entries).
    pub source_page_xxh3_128: Vec<Xxh3Checksum128>,
    /// xxh3-64 over the serialized record excluding this trailing field.
    pub checksum: u64,
}
225
226impl WalFecGroupMeta {
227 pub fn from_init(init: WalFecGroupMetaInit) -> Result<Self> {
229 let mut meta = Self {
230 magic: WAL_FEC_GROUP_META_MAGIC,
231 version: WAL_FEC_GROUP_META_VERSION,
232 wal_salt1: init.wal_salt1,
233 wal_salt2: init.wal_salt2,
234 start_frame_no: init.start_frame_no,
235 end_frame_no: init.end_frame_no,
236 db_size_pages: init.db_size_pages,
237 page_size: init.page_size,
238 k_source: init.k_source,
239 r_repair: init.r_repair,
240 oti: init.oti,
241 object_id: init.object_id,
242 page_numbers: init.page_numbers,
243 source_page_xxh3_128: init.source_page_xxh3_128,
244 checksum: 0,
245 };
246 meta.validate_invariants()?;
247 meta.checksum = meta.compute_checksum();
248 Ok(meta)
249 }
250
251 #[must_use]
253 pub const fn group_id(&self) -> WalFecGroupId {
254 WalFecGroupId {
255 wal_salt1: self.wal_salt1,
256 wal_salt2: self.wal_salt2,
257 end_frame_no: self.end_frame_no,
258 }
259 }
260
261 pub fn verify_salt_binding(&self, salts: WalSalts) -> Result<()> {
263 if self.wal_salt1 != salts.salt1 || self.wal_salt2 != salts.salt2 {
264 return Err(FrankenError::WalCorrupt {
265 detail: format!(
266 "wal-fec salt mismatch for group {}: sidecar=({}, {}), wal=({}, {})",
267 self.group_id(),
268 self.wal_salt1,
269 self.wal_salt2,
270 salts.salt1,
271 salts.salt2
272 ),
273 });
274 }
275 Ok(())
276 }
277
278 #[must_use]
280 pub fn to_record_bytes(&self) -> Vec<u8> {
281 let mut bytes = Vec::with_capacity(self.serialized_len_without_prefix());
282 bytes.extend_from_slice(&self.magic);
283 append_u32_le(&mut bytes, self.version);
284 append_u32_le(&mut bytes, self.wal_salt1);
285 append_u32_le(&mut bytes, self.wal_salt2);
286 append_u32_le(&mut bytes, self.start_frame_no);
287 append_u32_le(&mut bytes, self.end_frame_no);
288 append_u32_le(&mut bytes, self.db_size_pages);
289 append_u32_le(&mut bytes, self.page_size);
290 append_u32_le(&mut bytes, self.k_source);
291 append_u32_le(&mut bytes, self.r_repair);
292 bytes.extend_from_slice(&self.oti.to_bytes());
293 bytes.extend_from_slice(self.object_id.as_bytes());
294 for &page_number in &self.page_numbers {
295 append_u32_le(&mut bytes, page_number);
296 }
297 for &hash in &self.source_page_xxh3_128 {
298 bytes.extend_from_slice(&hash.to_le_bytes());
299 }
300 append_u64_le(&mut bytes, self.checksum);
301 bytes
302 }
303
304 pub fn from_record_bytes(bytes: &[u8]) -> Result<Self> {
306 if bytes.len() < META_FIXED_PREFIX_BYTES + META_CHECKSUM_BYTES {
307 return Err(FrankenError::WalCorrupt {
308 detail: format!(
309 "wal-fec group meta too short: expected at least {}, got {}",
310 META_FIXED_PREFIX_BYTES + META_CHECKSUM_BYTES,
311 bytes.len()
312 ),
313 });
314 }
315
316 let mut cursor = 0usize;
317 let magic = read_array::<8>(bytes, &mut cursor, "magic")?;
318 if magic != WAL_FEC_GROUP_META_MAGIC {
319 return Err(FrankenError::WalCorrupt {
320 detail: format!("invalid wal-fec magic: {magic:02x?}"),
321 });
322 }
323
324 let version = read_u32_le(bytes, &mut cursor, "version")?;
325 if version != WAL_FEC_GROUP_META_VERSION {
326 return Err(FrankenError::WalCorrupt {
327 detail: format!(
328 "unsupported wal-fec version {version}, expected {WAL_FEC_GROUP_META_VERSION}"
329 ),
330 });
331 }
332
333 let wal_salt1 = read_u32_le(bytes, &mut cursor, "wal_salt1")?;
334 let wal_salt2 = read_u32_le(bytes, &mut cursor, "wal_salt2")?;
335 let start_frame_no = read_u32_le(bytes, &mut cursor, "start_frame_no")?;
336 let end_frame_no = read_u32_le(bytes, &mut cursor, "end_frame_no")?;
337 let db_size_pages = read_u32_le(bytes, &mut cursor, "db_size_pages")?;
338 let page_size = read_u32_le(bytes, &mut cursor, "page_size")?;
339 let k_source = read_u32_le(bytes, &mut cursor, "k_source")?;
340 let r_repair = read_u32_le(bytes, &mut cursor, "r_repair")?;
341 let oti_bytes = read_array::<22>(bytes, &mut cursor, "oti")?;
342 let oti = Oti::from_bytes(&oti_bytes).ok_or_else(|| FrankenError::WalCorrupt {
343 detail: "invalid wal-fec OTI encoding".to_owned(),
344 })?;
345 let object_id = ObjectId::from_bytes(read_array::<16>(bytes, &mut cursor, "object_id")?);
346
347 let k_source_usize = usize::try_from(k_source).map_err(|_| FrankenError::WalCorrupt {
348 detail: format!("k_source {k_source} does not fit in usize"),
349 })?;
350
351 let required_array_bytes = k_source_usize.saturating_mul(20);
354 if bytes.len().saturating_sub(cursor) < required_array_bytes {
355 return Err(FrankenError::WalCorrupt {
356 detail: format!("k_source {} exceeds remaining buffer", k_source),
357 });
358 }
359
360 let mut page_numbers = Vec::with_capacity(k_source_usize);
361 for _ in 0..k_source_usize {
362 page_numbers.push(read_u32_le(bytes, &mut cursor, "page_number")?);
363 }
364 let mut source_page_xxh3_128 = Vec::with_capacity(k_source_usize);
365 for _ in 0..k_source_usize {
366 let digest = read_array::<16>(bytes, &mut cursor, "source_page_hash")?;
367 source_page_xxh3_128.push(Xxh3Checksum128 {
368 low: u64::from_le_bytes(digest[..8].try_into().expect("8-byte low hash slice")),
369 high: u64::from_le_bytes(digest[8..].try_into().expect("8-byte high hash slice")),
370 });
371 }
372 let checksum = read_u64_le(bytes, &mut cursor, "checksum")?;
373 if cursor != bytes.len() {
374 return Err(FrankenError::WalCorrupt {
375 detail: format!(
376 "wal-fec group meta trailing bytes: consumed {cursor}, total {}",
377 bytes.len()
378 ),
379 });
380 }
381
382 let meta = Self {
383 magic,
384 version,
385 wal_salt1,
386 wal_salt2,
387 start_frame_no,
388 end_frame_no,
389 db_size_pages,
390 page_size,
391 k_source,
392 r_repair,
393 oti,
394 object_id,
395 page_numbers,
396 source_page_xxh3_128,
397 checksum,
398 };
399 meta.validate_invariants()?;
400 let computed = meta.compute_checksum();
401 if computed != meta.checksum {
402 return Err(FrankenError::WalCorrupt {
403 detail: format!(
404 "wal-fec group checksum mismatch: stored {:#018x}, computed {computed:#018x}",
405 meta.checksum
406 ),
407 });
408 }
409 Ok(meta)
410 }
411
412 fn serialized_len_without_prefix(&self) -> usize {
413 META_FIXED_PREFIX_BYTES
414 + self.page_numbers.len() * size_of::<u32>()
415 + self.source_page_xxh3_128.len() * size_of::<[u8; 16]>()
416 + META_CHECKSUM_BYTES
417 }
418
419 fn compute_checksum(&self) -> u64 {
420 xxh3_64(&self.to_record_bytes_without_checksum())
421 }
422
423 fn to_record_bytes_without_checksum(&self) -> Vec<u8> {
424 let mut bytes =
425 Vec::with_capacity(self.serialized_len_without_prefix() - META_CHECKSUM_BYTES);
426 bytes.extend_from_slice(&self.magic);
427 append_u32_le(&mut bytes, self.version);
428 append_u32_le(&mut bytes, self.wal_salt1);
429 append_u32_le(&mut bytes, self.wal_salt2);
430 append_u32_le(&mut bytes, self.start_frame_no);
431 append_u32_le(&mut bytes, self.end_frame_no);
432 append_u32_le(&mut bytes, self.db_size_pages);
433 append_u32_le(&mut bytes, self.page_size);
434 append_u32_le(&mut bytes, self.k_source);
435 append_u32_le(&mut bytes, self.r_repair);
436 bytes.extend_from_slice(&self.oti.to_bytes());
437 bytes.extend_from_slice(self.object_id.as_bytes());
438 for &page_number in &self.page_numbers {
439 append_u32_le(&mut bytes, page_number);
440 }
441 for &hash in &self.source_page_xxh3_128 {
442 bytes.extend_from_slice(&hash.to_le_bytes());
443 }
444 bytes
445 }
446
447 fn validate_invariants(&self) -> Result<()> {
448 self.validate_meta_header()?;
449 self.validate_frame_span()?;
450 if self.r_repair == 0 {
451 return Err(FrankenError::WalCorrupt {
452 detail: "r_repair must be >= 1 for wal-fec groups".to_owned(),
453 });
454 }
455 let k_source_usize =
456 usize::try_from(self.k_source).map_err(|_| FrankenError::WalCorrupt {
457 detail: format!("k_source {} does not fit in usize", self.k_source),
458 })?;
459 self.validate_array_lengths(k_source_usize)?;
460 self.validate_page_size_and_oti()?;
461 if self.db_size_pages == 0 {
462 return Err(FrankenError::WalCorrupt {
463 detail: "db_size_pages must be non-zero commit frame size".to_owned(),
464 });
465 }
466 Ok(())
467 }
468
469 fn validate_meta_header(&self) -> Result<()> {
470 if self.magic != WAL_FEC_GROUP_META_MAGIC {
471 return Err(FrankenError::WalCorrupt {
472 detail: "invalid wal-fec magic".to_owned(),
473 });
474 }
475 if self.version != WAL_FEC_GROUP_META_VERSION {
476 return Err(FrankenError::WalCorrupt {
477 detail: format!(
478 "unsupported wal-fec meta version {} (expected {WAL_FEC_GROUP_META_VERSION})",
479 self.version
480 ),
481 });
482 }
483 Ok(())
484 }
485
486 fn validate_frame_span(&self) -> Result<()> {
487 if self.start_frame_no == 0 {
488 return Err(FrankenError::WalCorrupt {
489 detail: "start_frame_no must be 1-based and nonzero".to_owned(),
490 });
491 }
492 if self.end_frame_no < self.start_frame_no {
493 return Err(FrankenError::WalCorrupt {
494 detail: format!(
495 "end_frame_no {} must be >= start_frame_no {}",
496 self.end_frame_no, self.start_frame_no
497 ),
498 });
499 }
500 let expected_k = self
501 .end_frame_no
502 .checked_sub(self.start_frame_no)
503 .and_then(|delta| delta.checked_add(1))
504 .ok_or_else(|| FrankenError::WalCorrupt {
505 detail: "frame-range overflow while validating k_source".to_owned(),
506 })?;
507 if self.k_source != expected_k {
508 return Err(FrankenError::WalCorrupt {
509 detail: format!(
510 "k_source {} must equal frame span {} ({}..={})",
511 self.k_source, expected_k, self.start_frame_no, self.end_frame_no
512 ),
513 });
514 }
515 Ok(())
516 }
517
518 fn validate_array_lengths(&self, k_source_usize: usize) -> Result<()> {
519 if self.page_numbers.len() != k_source_usize {
520 return Err(FrankenError::WalCorrupt {
521 detail: format!(
522 "page_numbers length {} must equal k_source {}",
523 self.page_numbers.len(),
524 self.k_source
525 ),
526 });
527 }
528 if self.source_page_xxh3_128.len() != k_source_usize {
529 return Err(FrankenError::WalCorrupt {
530 detail: format!(
531 "source_page_xxh3_128 length {} must equal k_source {}",
532 self.source_page_xxh3_128.len(),
533 self.k_source
534 ),
535 });
536 }
537 Ok(())
538 }
539
540 fn validate_page_size_and_oti(&self) -> Result<()> {
541 if PageSize::new(self.page_size).is_none() {
542 return Err(FrankenError::WalCorrupt {
543 detail: format!("invalid SQLite page_size {}", self.page_size),
544 });
545 }
546 if self.oti.t != self.page_size {
547 return Err(FrankenError::WalCorrupt {
548 detail: format!(
549 "OTI.t {} must equal page_size {} for WAL source pages",
550 self.oti.t, self.page_size
551 ),
552 });
553 }
554 let expected_f = u64::from(self.k_source)
555 .checked_mul(u64::from(self.page_size))
556 .ok_or_else(|| FrankenError::WalCorrupt {
557 detail: "overflow computing expected OTI.f".to_owned(),
558 })?;
559 if self.oti.f != expected_f {
560 return Err(FrankenError::WalCorrupt {
561 detail: format!(
562 "OTI.f {} must equal k_source*page_size ({expected_f})",
563 self.oti.f
564 ),
565 });
566 }
567 Ok(())
568 }
569}
570
/// A wal-fec group: its metadata plus the repair symbols generated for it.
///
/// `WalFecGroupRecord::new` checks that the symbols agree with the metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecGroupRecord {
    /// Validated metadata describing the group.
    pub meta: WalFecGroupMeta,
    /// Repair symbols; `meta.r_repair` entries with consecutive ESIs starting
    /// at `meta.k_source`.
    pub repair_symbols: Vec<SymbolRecord>,
}
577
578impl WalFecGroupRecord {
579 pub fn new(meta: WalFecGroupMeta, repair_symbols: Vec<SymbolRecord>) -> Result<Self> {
580 let group = Self {
581 meta,
582 repair_symbols,
583 };
584 group.validate_layout()?;
585 Ok(group)
586 }
587
588 fn validate_layout(&self) -> Result<()> {
589 let expected_repair =
590 usize::try_from(self.meta.r_repair).map_err(|_| FrankenError::WalCorrupt {
591 detail: format!("r_repair {} does not fit in usize", self.meta.r_repair),
592 })?;
593 if self.repair_symbols.len() != expected_repair {
594 return Err(FrankenError::WalCorrupt {
595 detail: format!(
596 "repair symbol count {} must equal r_repair {}",
597 self.repair_symbols.len(),
598 self.meta.r_repair
599 ),
600 });
601 }
602 for (index, symbol) in self.repair_symbols.iter().enumerate() {
603 if symbol.object_id != self.meta.object_id {
604 return Err(FrankenError::WalCorrupt {
605 detail: format!(
606 "repair symbol {index} object_id mismatch: {} != {}",
607 symbol.object_id, self.meta.object_id
608 ),
609 });
610 }
611 if symbol.oti != self.meta.oti {
612 return Err(FrankenError::WalCorrupt {
613 detail: format!("repair symbol {index} OTI mismatch"),
614 });
615 }
616 let expected_esi = self
617 .meta
618 .k_source
619 .checked_add(u32::try_from(index).map_err(|_| FrankenError::WalCorrupt {
620 detail: format!("repair symbol index {index} does not fit in u32"),
621 })?)
622 .ok_or_else(|| FrankenError::WalCorrupt {
623 detail: "repair ESI overflow".to_owned(),
624 })?;
625 if symbol.esi != expected_esi {
626 return Err(FrankenError::WalCorrupt {
627 detail: format!(
628 "repair symbol {index} has ESI {}, expected {expected_esi}",
629 symbol.esi
630 ),
631 });
632 }
633 }
634 Ok(())
635 }
636}
637
/// Outcome of scanning a wal-fec sidecar.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalFecScanResult {
    /// Group records found by the scan.
    pub groups: Vec<WalFecGroupRecord>,
    /// NOTE(review): presumably set when the scan stopped at an incomplete
    /// trailing record — the scanner is outside this view; confirm.
    pub truncated_tail: bool,
}
644
/// Why recovery of a group fell back instead of repairing it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalFecRecoveryFallbackReason {
    /// Reported as `missing_sidecar_group`.
    MissingSidecarGroup,
    /// Reported as `sidecar_unreadable`.
    SidecarUnreadable,
    /// Reported as `salt_mismatch`.
    SaltMismatch,
    /// Reported as `insufficient_symbols`.
    InsufficientSymbols,
    /// Reported as `decode_failed`.
    DecodeFailed,
    /// Reported as `decoded_payload_mismatch`.
    DecodedPayloadMismatch,
    /// Reported as `recovery_disabled`.
    RecoveryDisabled,
}

impl WalFecRecoveryFallbackReason {
    /// Stable snake_case code for this reason, suitable for logs and metrics.
    #[must_use]
    pub const fn reason_code(self) -> &'static str {
        use WalFecRecoveryFallbackReason as Reason;

        match self {
            Reason::MissingSidecarGroup => "missing_sidecar_group",
            Reason::SidecarUnreadable => "sidecar_unreadable",
            Reason::SaltMismatch => "salt_mismatch",
            Reason::InsufficientSymbols => "insufficient_symbols",
            Reason::DecodeFailed => "decode_failed",
            Reason::DecodedPayloadMismatch => "decoded_payload_mismatch",
            Reason::RecoveryDisabled => "recovery_disabled",
        }
    }
}
672
/// Switches controlling wal-fec recovery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFecRecoveryConfig {
    /// Whether recovery may be attempted at all.
    pub recovery_enabled: bool,
}

impl Default for WalFecRecoveryConfig {
    /// Recovery is enabled unless explicitly turned off.
    fn default() -> Self {
        let recovery_enabled = true;
        Self { recovery_enabled }
    }
}
696
/// Summary of one recovery pass over a wal-fec group; the input from which
/// repair events, reason codes and evidence hashes in this module are derived.
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(clippy::struct_excessive_bools)]
pub struct WalFecRecoveryLog {
    /// Group the pass was run for.
    pub group_id: WalFecGroupId,
    /// Whether recovery was enabled for this pass.
    pub recovery_enabled: bool,
    /// `true` is reported as `recovered`, `false` as `truncate_before_group`.
    pub outcome_is_recovered: bool,
    /// Fallback reason, if the pass fell back.
    pub fallback_reason: Option<WalFecRecoveryFallbackReason>,
    /// Count of source symbols that validated.
    pub validated_source_symbols: u32,
    /// Count of repair symbols that validated.
    pub validated_repair_symbols: u32,
    /// Count of symbols required; the shortfall of validated source symbols
    /// against this value is what the module reports as symbols lost.
    pub required_symbols: u32,
    /// Count of symbols available to the pass.
    pub available_symbols: u32,
    /// Frame numbers recovered by the pass.
    pub recovered_frame_nos: Vec<u32>,
    /// Count of corruption observations made during the pass.
    pub corruption_observations: u32,
    /// Whether a decode was attempted.
    pub decode_attempted: bool,
    /// Whether the attempted decode succeeded.
    pub decode_succeeded: bool,
}
730
/// Coarse severity classes keyed by the number of symbols lost.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalFecRepairSeverityBucket {
    /// Labelled `"1"`.
    One,
    /// Labelled `"2-5"`.
    TwoToFive,
    /// Labelled `"6-10"`.
    SixToTen,
    /// Labelled `"11+"`.
    ElevenPlus,
}

impl WalFecRepairSeverityBucket {
    /// Canonical short label for the bucket.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::ElevenPlus => "11+",
            Self::SixToTen => "6-10",
            Self::TwoToFive => "2-5",
            Self::One => "1",
        }
    }
}

impl FromStr for WalFecRepairSeverityBucket {
    type Err = &'static str;

    /// Parses a bucket from its canonical label or a spelled-out alias.
    ///
    /// Surrounding whitespace is ignored and ASCII case is not significant.
    fn from_str(value: &str) -> std::result::Result<Self, Self::Err> {
        // Every accepted spelling, all lowercase, paired with its bucket.
        const ALIASES: [(&str, WalFecRepairSeverityBucket); 11] = [
            ("1", WalFecRepairSeverityBucket::One),
            ("one", WalFecRepairSeverityBucket::One),
            ("2-5", WalFecRepairSeverityBucket::TwoToFive),
            ("two-to-five", WalFecRepairSeverityBucket::TwoToFive),
            ("two_to_five", WalFecRepairSeverityBucket::TwoToFive),
            ("6-10", WalFecRepairSeverityBucket::SixToTen),
            ("six-to-ten", WalFecRepairSeverityBucket::SixToTen),
            ("six_to_ten", WalFecRepairSeverityBucket::SixToTen),
            ("11+", WalFecRepairSeverityBucket::ElevenPlus),
            ("eleven-plus", WalFecRepairSeverityBucket::ElevenPlus),
            ("eleven_plus", WalFecRepairSeverityBucket::ElevenPlus),
        ];

        let needle = value.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| needle.eq_ignore_ascii_case(alias))
            .map(|&(_, bucket)| bucket)
            .ok_or("unrecognized RaptorQ repair severity bucket")
    }
}
765
/// Which pool of repair symbols a repair drew on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalFecRepairSource {
    /// Labelled `wal_repair_symbols`.
    WalRepairSymbols,
    /// Labelled `snapshot_repair_symbols`.
    SnapshotRepairSymbols,
    /// Labelled `wal_and_snapshot_repair_symbols`.
    WalAndSnapshotRepairSymbols,
}

impl WalFecRepairSource {
    /// Stable snake_case label for the source.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        use WalFecRepairSource as Source;

        match self {
            Source::WalAndSnapshotRepairSymbols => "wal_and_snapshot_repair_symbols",
            Source::SnapshotRepairSymbols => "snapshot_repair_symbols",
            Source::WalRepairSymbols => "wal_repair_symbols",
        }
    }
}
784
/// Three 32-byte BLAKE3 digests attached to a repair evidence card.
/// NOTE(review): how each digest is derived is outside this struct — confirm
/// against the code that fills it in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFecRepairWitnessTriple {
    /// Digest labelled "corrupted".
    pub corrupted_hash_blake3: [u8; 32],
    /// Digest labelled "repaired".
    pub repaired_hash_blake3: [u8; 32],
    /// Digest labelled "expected".
    pub expected_hash_blake3: [u8; 32],
}
792
/// Evidence record for one repair, including hashes that link it into a chain.
/// NOTE(review): the code that fills these fields is outside this view; field
/// notes below restate the names and types only — confirm semantics there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRepairEvidenceCard {
    /// Group the repair belongs to.
    pub group_id: WalFecGroupId,
    /// Frame the card refers to.
    pub frame_id: u32,
    /// Byte offset in the WAL file, when known.
    pub wal_file_offset_bytes: Option<u64>,
    /// Monotonic timestamp in nanoseconds.
    pub monotonic_timestamp_ns: u64,
    /// Wall-clock timestamp in nanoseconds since the Unix epoch.
    pub wall_clock_unix_ns: u64,
    /// BLAKE3 corruption signature (see `corruption_signature_hex`).
    pub corruption_signature_blake3: [u8; 32],
    /// Optional textual description of the bit-error pattern.
    pub bit_error_pattern: Option<String>,
    /// Which repair symbols were drawn on.
    pub repair_source: WalFecRepairSource,
    /// Number of symbols used.
    pub symbols_used: u32,
    /// Count of source symbols that validated.
    pub validated_source_symbols: u32,
    /// Count of repair symbols that validated.
    pub validated_repair_symbols: u32,
    /// Count of symbols required.
    pub required_symbols: u32,
    /// Count of symbols available.
    pub available_symbols: u32,
    /// Witness digests for the repair.
    pub witness: WalFecRepairWitnessTriple,
    /// Repair latency in nanoseconds.
    pub repair_latency_ns: u64,
    /// Confidence expressed in parts per thousand.
    pub confidence_per_mille: u32,
    /// Severity class of the repair.
    pub severity_bucket: WalFecRepairSeverityBucket,
    /// Ledger epoch of the card.
    pub ledger_epoch: u64,
    /// Chain hash of the card (see `chain_hash_hex`).
    pub chain_hash: [u8; 32],
}
816
/// Encodes 32 bytes as a 64-character lowercase hexadecimal string.
fn hex_encode_32(bytes: [u8; 32]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(64);
    for byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
826
impl WalFecRepairEvidenceCard {
    /// Returns `chain_hash` as a 64-character hexadecimal string.
    #[must_use]
    pub fn chain_hash_hex(&self) -> String {
        hex_encode_32(self.chain_hash)
    }

    /// Returns `corruption_signature_blake3` as a 64-character hexadecimal string.
    #[must_use]
    pub fn corruption_signature_hex(&self) -> String {
        hex_encode_32(self.corruption_signature_blake3)
    }
}
838
/// Optional filters for looking up repair evidence cards; `None` leaves the
/// corresponding criterion unset.
/// NOTE(review): the code that applies these filters is outside this view —
/// bound inclusivity and result ordering should be confirmed there.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalFecRepairEvidenceQuery {
    /// Restrict to this frame id.
    pub frame_id: Option<u32>,
    /// Restrict to this severity bucket.
    pub severity_bucket: Option<WalFecRepairSeverityBucket>,
    /// Lower wall-clock bound in nanoseconds.
    pub wall_clock_start_ns: Option<u64>,
    /// Upper wall-clock bound in nanoseconds.
    pub wall_clock_end_ns: Option<u64>,
    /// Maximum number of results.
    pub limit: Option<usize>,
}
848
849#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
851pub struct WalFecRepairSeverityHistogram {
852 pub one: u64,
853 pub two_to_five: u64,
854 pub six_to_ten: u64,
855 pub eleven_plus: u64,
856}
857
858impl WalFecRepairSeverityHistogram {
859 fn bump(&mut self, bucket: WalFecRepairSeverityBucket) {
860 match bucket {
861 WalFecRepairSeverityBucket::One => {
862 self.one = self.one.saturating_add(1);
863 }
864 WalFecRepairSeverityBucket::TwoToFive => {
865 self.two_to_five = self.two_to_five.saturating_add(1);
866 }
867 WalFecRepairSeverityBucket::SixToTen => {
868 self.six_to_ten = self.six_to_ten.saturating_add(1);
869 }
870 WalFecRepairSeverityBucket::ElevenPlus => {
871 self.eleven_plus = self.eleven_plus.saturating_add(1);
872 }
873 }
874 }
875}
876
/// Telemetry record describing one repair activation, as built by
/// `build_repair_event` from a recovery log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRepairEvent {
    /// Group the event belongs to.
    pub group_id: WalFecGroupId,
    /// Frame the event is attributed to (the group's `end_frame_no`).
    pub frame_id: u32,
    /// Required symbols minus validated source symbols (saturating).
    pub symbols_lost: u32,
    /// Available symbols, capped at the required count.
    pub symbols_used: u32,
    /// Whether the group was recovered without a failed decode.
    pub repair_success: bool,
    /// Repair latency in nanoseconds (saturated to `u64::MAX`).
    pub latency_ns: u64,
    /// Symbols lost as a percentage of the repair budget, rounded up and
    /// capped at 100.
    pub budget_utilization_pct: u32,
    /// Severity class derived from `symbols_lost`.
    pub severity_bucket: WalFecRepairSeverityBucket,
}
897
/// Point-in-time copy of aggregate repair metrics.
/// NOTE(review): the code that fills a snapshot is outside this view —
/// confirm field derivations there.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WalFecRepairMetricsSnapshot {
    /// Total number of repairs recorded.
    pub repairs_total: u64,
    /// Number of repairs that failed.
    pub repairs_failed: u64,
    /// Number of symbols reclaimed.
    pub symbols_reclaimed: u64,
    /// Budget utilization as a percentage.
    pub budget_utilization_pct: u32,
    /// WAL health score.
    pub wal_health_score: u32,
    /// Per-severity repair counts.
    pub severity_histogram: WalFecRepairSeverityHistogram,
}
908
/// Mutable repair telemetry held behind the process-wide
/// `RAPTORQ_REPAIR_TELEMETRY` mutex.
#[derive(Debug, Default)]
struct WalFecRepairTelemetryState {
    /// Total number of repairs recorded.
    repairs_total: u64,
    /// Number of repairs that failed.
    repairs_failed: u64,
    /// Number of symbols reclaimed.
    symbols_reclaimed: u64,
    /// Sum of per-event budget utilization; divided by
    /// `budget_utilization_count` to obtain the average.
    budget_utilization_sum: u64,
    /// Number of samples contributing to `budget_utilization_sum`.
    budget_utilization_count: u64,
    /// Per-severity repair counts.
    severity_histogram: WalFecRepairSeverityHistogram,
    /// Retained repair events.
    events: VecDeque<WalFecRepairEvent>,
    /// Retained repair evidence cards.
    evidence_cards: VecDeque<WalFecRepairEvidenceCard>,
    /// NOTE(review): presumably the hash of the most recent evidence card in
    /// the chain — confirm where it is updated.
    evidence_chain_tip: [u8; 32],
    /// NOTE(review): presumably the epoch assigned to the next evidence card —
    /// confirm where it is advanced.
    next_evidence_epoch: u64,
}
922
923static RAPTORQ_REPAIR_TELEMETRY: OnceLock<Mutex<WalFecRepairTelemetryState>> = OnceLock::new();
924
925fn raptorq_repair_telemetry() -> &'static Mutex<WalFecRepairTelemetryState> {
926 RAPTORQ_REPAIR_TELEMETRY.get_or_init(|| Mutex::new(WalFecRepairTelemetryState::default()))
927}
928
929fn lock_raptorq_repair_telemetry() -> MutexGuard<'static, WalFecRepairTelemetryState> {
930 match raptorq_repair_telemetry().lock() {
931 Ok(guard) => guard,
932 Err(poisoned) => {
933 warn!("raptorq repair telemetry lock poisoned; recovering poisoned state");
934 poisoned.into_inner()
935 }
936 }
937}
938
939fn severity_bucket_for_loss(symbols_lost: u32) -> WalFecRepairSeverityBucket {
940 match symbols_lost {
941 0 | 1 => WalFecRepairSeverityBucket::One,
942 2..=5 => WalFecRepairSeverityBucket::TwoToFive,
943 6..=10 => WalFecRepairSeverityBucket::SixToTen,
944 _ => WalFecRepairSeverityBucket::ElevenPlus,
945 }
946}
947
948fn compute_health_score(state: &WalFecRepairTelemetryState) -> u32 {
949 if state.repairs_total == 0 {
950 return 100;
951 }
952
953 let failure_penalty = state.repairs_failed.saturating_mul(20).min(70);
954 let severity_penalty = state
955 .severity_histogram
956 .one
957 .saturating_mul(1)
958 .saturating_add(state.severity_histogram.two_to_five.saturating_mul(4))
959 .saturating_add(state.severity_histogram.six_to_ten.saturating_mul(8))
960 .saturating_add(state.severity_histogram.eleven_plus.saturating_mul(12))
961 .min(30);
962 let avg_budget_utilization = state
963 .budget_utilization_sum
964 .checked_div(state.budget_utilization_count)
965 .unwrap_or(0);
966 let utilization_penalty = if avg_budget_utilization >= 80 {
967 15
968 } else if avg_budget_utilization >= 60 {
969 10
970 } else if avg_budget_utilization >= 40 {
971 5
972 } else {
973 0
974 };
975
976 let total_penalty = failure_penalty
977 .saturating_add(severity_penalty)
978 .saturating_add(utilization_penalty)
979 .min(100);
980 let score = 100_u64.saturating_sub(total_penalty);
981 u32::try_from(score).unwrap_or(0)
982}
983
984fn build_repair_event(log: &WalFecRecoveryLog, latency: Duration) -> Option<WalFecRepairEvent> {
985 let symbols_lost = log
986 .required_symbols
987 .saturating_sub(log.validated_source_symbols);
988 let repair_activated =
989 symbols_lost > 0 || log.decode_attempted || log.fallback_reason.is_some();
990 if !repair_activated {
991 return None;
992 }
993
994 let symbols_used = log.available_symbols.min(log.required_symbols);
995 let repair_budget = log.validated_repair_symbols.max(1);
996 let utilization_num = u64::from(symbols_lost)
997 .saturating_mul(100)
998 .saturating_add(u64::from(repair_budget).saturating_sub(1));
999 let utilization = utilization_num / u64::from(repair_budget);
1000 let budget_utilization_pct = u32::try_from(utilization.min(100)).unwrap_or(100);
1001 let latency_ns = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
1002 let severity_bucket = severity_bucket_for_loss(symbols_lost);
1003 let repair_success =
1004 log.outcome_is_recovered && (log.decode_succeeded || !log.decode_attempted);
1005
1006 Some(WalFecRepairEvent {
1007 group_id: log.group_id,
1008 frame_id: log.group_id.end_frame_no,
1009 symbols_lost,
1010 symbols_used,
1011 repair_success,
1012 latency_ns,
1013 budget_utilization_pct,
1014 severity_bucket,
1015 })
1016}
1017
1018const fn recovery_outcome_code(log: &WalFecRecoveryLog) -> &'static str {
1019 if log.outcome_is_recovered {
1020 "recovered"
1021 } else {
1022 "truncate_before_group"
1023 }
1024}
1025
1026const fn recovery_reason_code_for_log(log: &WalFecRecoveryLog) -> &'static str {
1027 if let Some(reason) = log.fallback_reason {
1028 return reason.reason_code();
1029 }
1030 if log.decode_attempted {
1031 return "decode_recovered";
1032 }
1033 "intact_fast_path"
1034}
1035
1036const fn repair_attempt_for_log(log: &WalFecRecoveryLog) -> bool {
1037 log.recovery_enabled
1038 && (log.decode_attempted
1039 || log.fallback_reason.is_some()
1040 || log.corruption_observations > 0
1041 || log.validated_source_symbols < log.required_symbols)
1042}
1043
1044fn symbol_state_for_log(log: &WalFecRecoveryLog) -> String {
1045 format!(
1046 "source_validated={}/{};repair_validated={};available={};required={};decode_attempted={};decode_succeeded={}",
1047 log.validated_source_symbols,
1048 log.required_symbols,
1049 log.validated_repair_symbols,
1050 log.available_symbols,
1051 log.required_symbols,
1052 log.decode_attempted,
1053 log.decode_succeeded
1054 )
1055}
1056
/// Nanoseconds elapsed since the first call to this function in the process,
/// saturating at `u64::MAX`.
fn monotonic_now_ns() -> u64 {
    static START: OnceLock<Instant> = OnceLock::new();
    let origin = START.get_or_init(Instant::now);
    let nanos = origin.elapsed().as_nanos();
    match u64::try_from(nanos) {
        Ok(value) => value,
        Err(_) => u64::MAX,
    }
}
1062
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch and saturates
/// at `u64::MAX`.
fn wall_clock_unix_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
1069
1070const fn fallback_reason_tag(reason: Option<WalFecRecoveryFallbackReason>) -> u8 {
1071 match reason {
1072 None => 0,
1073 Some(WalFecRecoveryFallbackReason::MissingSidecarGroup) => 1,
1074 Some(WalFecRecoveryFallbackReason::SidecarUnreadable) => 2,
1075 Some(WalFecRecoveryFallbackReason::SaltMismatch) => 3,
1076 Some(WalFecRecoveryFallbackReason::InsufficientSymbols) => 4,
1077 Some(WalFecRecoveryFallbackReason::DecodeFailed) => 5,
1078 Some(WalFecRecoveryFallbackReason::DecodedPayloadMismatch) => 6,
1079 Some(WalFecRecoveryFallbackReason::RecoveryDisabled) => 7,
1080 }
1081}
1082
1083fn repair_source_for_log(log: &WalFecRecoveryLog) -> WalFecRepairSource {
1084 if log.validated_repair_symbols > 0 && log.validated_source_symbols > 0 {
1085 return WalFecRepairSource::WalAndSnapshotRepairSymbols;
1086 }
1087 if log.validated_repair_symbols > 0 {
1088 return WalFecRepairSource::WalRepairSymbols;
1089 }
1090 WalFecRepairSource::SnapshotRepairSymbols
1091}
1092
/// Ratio of available to required symbols in parts per thousand, rounded down.
///
/// Returns 0 when nothing is required. The result is not capped at 1000: a
/// surplus of symbols yields a larger value, saturating at `u32::MAX`.
fn confidence_per_mille(required_symbols: u32, available_symbols: u32) -> u32 {
    let required = u64::from(required_symbols);
    if required == 0 {
        return 0;
    }
    let per_mille = u64::from(available_symbols).saturating_mul(1_000) / required;
    match u32::try_from(per_mille) {
        Ok(value) => value,
        Err(_) => u32::MAX,
    }
}
1103
1104fn blake3_hash_to_array(hasher: &blake3::Hasher) -> [u8; 32] {
1105 let mut output = [0_u8; 32];
1106 output.copy_from_slice(hasher.finalize().as_bytes());
1107 output
1108}
1109
1110fn compute_corruption_signature(log: &WalFecRecoveryLog, event: &WalFecRepairEvent) -> [u8; 32] {
1111 let mut hasher = blake3::Hasher::new();
1112 hasher.update(b"fsqlite:wal_fec:repair_corruption_signature:v1");
1113 hasher.update(&log.group_id.wal_salt1.to_le_bytes());
1114 hasher.update(&log.group_id.wal_salt2.to_le_bytes());
1115 hasher.update(&log.group_id.end_frame_no.to_le_bytes());
1116 hasher.update(&event.frame_id.to_le_bytes());
1117 hasher.update(&event.symbols_lost.to_le_bytes());
1118 hasher.update(&log.validated_source_symbols.to_le_bytes());
1119 hasher.update(&log.validated_repair_symbols.to_le_bytes());
1120 hasher.update(&log.required_symbols.to_le_bytes());
1121 hasher.update(&log.available_symbols.to_le_bytes());
1122 hasher.update(&log.corruption_observations.to_le_bytes());
1123 hasher.update(&[fallback_reason_tag(log.fallback_reason)]);
1124 blake3_hash_to_array(&hasher)
1125}
1126
1127fn compute_witness_hash(
1128 label: &[u8],
1129 log: &WalFecRecoveryLog,
1130 event: &WalFecRepairEvent,
1131 corruption_signature: [u8; 32],
1132) -> [u8; 32] {
1133 let mut hasher = blake3::Hasher::new();
1134 hasher.update(b"fsqlite:wal_fec:repair_witness:v1");
1135 hasher.update(label);
1136 hasher.update(&corruption_signature);
1137 hasher.update(&log.group_id.wal_salt1.to_le_bytes());
1138 hasher.update(&log.group_id.wal_salt2.to_le_bytes());
1139 hasher.update(&log.group_id.end_frame_no.to_le_bytes());
1140 hasher.update(&event.symbols_used.to_le_bytes());
1141 hasher.update(&event.budget_utilization_pct.to_le_bytes());
1142 hasher.update(&log.required_symbols.to_le_bytes());
1143 hasher.update(&log.available_symbols.to_le_bytes());
1144 hasher.update(&[u8::from(log.outcome_is_recovered)]);
1145 hasher.update(&[u8::from(log.decode_succeeded)]);
1146 blake3_hash_to_array(&hasher)
1147}
1148
/// Computes the chain hash that links `card` to the previous ledger tip.
///
/// The digest covers a domain-separation label, `previous_tip`, and then the
/// card's fields in the order written below. The card's own `chain_hash`
/// field is not an input. Integers are hashed in little-endian form.
///
/// The field order is part of the hash definition: reordering, adding or
/// removing an `update` call changes every subsequent chain hash.
fn compute_evidence_chain_hash(
    previous_tip: [u8; 32],
    card: &WalFecRepairEvidenceCard,
) -> [u8; 32] {
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"fsqlite:wal_fec:repair_evidence_chain:v1");
    hasher.update(&previous_tip);
    hasher.update(&card.group_id.wal_salt1.to_le_bytes());
    hasher.update(&card.group_id.wal_salt2.to_le_bytes());
    hasher.update(&card.group_id.end_frame_no.to_le_bytes());
    hasher.update(&card.frame_id.to_le_bytes());
    // A missing offset is hashed as `u64::MAX`, so `None` and
    // `Some(u64::MAX)` produce the same bytes here.
    hasher.update(&card.wal_file_offset_bytes.unwrap_or(u64::MAX).to_le_bytes());
    hasher.update(&card.monotonic_timestamp_ns.to_le_bytes());
    hasher.update(&card.wall_clock_unix_ns.to_le_bytes());
    hasher.update(&card.corruption_signature_blake3);
    // A missing pattern contributes zero bytes.
    // NOTE(review): this and the two `as_str()` fields below are
    // variable-length and hashed without a length prefix — confirm that is
    // acceptable for this ledger format.
    hasher.update(
        card.bit_error_pattern
            .as_deref()
            .unwrap_or_default()
            .as_bytes(),
    );
    hasher.update(card.repair_source.as_str().as_bytes());
    hasher.update(&card.symbols_used.to_le_bytes());
    hasher.update(&card.validated_source_symbols.to_le_bytes());
    hasher.update(&card.validated_repair_symbols.to_le_bytes());
    hasher.update(&card.required_symbols.to_le_bytes());
    hasher.update(&card.available_symbols.to_le_bytes());
    hasher.update(&card.witness.corrupted_hash_blake3);
    hasher.update(&card.witness.repaired_hash_blake3);
    hasher.update(&card.witness.expected_hash_blake3);
    hasher.update(&card.repair_latency_ns.to_le_bytes());
    hasher.update(&card.confidence_per_mille.to_le_bytes());
    hasher.update(card.severity_bucket.as_str().as_bytes());
    hasher.update(&card.ledger_epoch.to_le_bytes());
    blake3_hash_to_array(&hasher)
}
1185
1186fn build_repair_evidence_card(
1187 log: &WalFecRecoveryLog,
1188 event: &WalFecRepairEvent,
1189 latency: Duration,
1190 previous_chain_tip: [u8; 32],
1191 ledger_epoch: u64,
1192) -> WalFecRepairEvidenceCard {
1193 let corruption_signature = compute_corruption_signature(log, event);
1194 let witness = WalFecRepairWitnessTriple {
1195 corrupted_hash_blake3: compute_witness_hash(b"corrupted", log, event, corruption_signature),
1196 repaired_hash_blake3: compute_witness_hash(b"repaired", log, event, corruption_signature),
1197 expected_hash_blake3: compute_witness_hash(b"expected", log, event, corruption_signature),
1198 };
1199 let repair_latency_ns = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
1200 let bit_error_pattern = if log.corruption_observations > 0 {
1201 Some(format!(
1202 "corruption_observations={}",
1203 log.corruption_observations
1204 ))
1205 } else {
1206 None
1207 };
1208
1209 let mut card = WalFecRepairEvidenceCard {
1210 group_id: log.group_id,
1211 frame_id: event.frame_id,
1212 wal_file_offset_bytes: None,
1213 monotonic_timestamp_ns: monotonic_now_ns(),
1214 wall_clock_unix_ns: wall_clock_unix_ns(),
1215 corruption_signature_blake3: corruption_signature,
1216 bit_error_pattern,
1217 repair_source: repair_source_for_log(log),
1218 symbols_used: event.symbols_used,
1219 validated_source_symbols: log.validated_source_symbols,
1220 validated_repair_symbols: log.validated_repair_symbols,
1221 required_symbols: log.required_symbols,
1222 available_symbols: log.available_symbols,
1223 witness,
1224 repair_latency_ns,
1225 confidence_per_mille: confidence_per_mille(log.required_symbols, log.available_symbols),
1226 severity_bucket: event.severity_bucket,
1227 ledger_epoch,
1228 chain_hash: [0_u8; 32],
1229 };
1230 card.chain_hash = compute_evidence_chain_hash(previous_chain_tip, &card);
1231 card
1232}
1233
1234pub fn record_raptorq_recovery_log(log: &WalFecRecoveryLog, latency: Duration) {
1238 let Some(event) = build_repair_event(log, latency) else {
1239 return;
1240 };
1241
1242 let mut state = lock_raptorq_repair_telemetry();
1243 state.repairs_total = state.repairs_total.saturating_add(1);
1244 if event.repair_success {
1245 state.symbols_reclaimed = state
1246 .symbols_reclaimed
1247 .saturating_add(u64::from(event.symbols_lost));
1248 } else {
1249 state.repairs_failed = state.repairs_failed.saturating_add(1);
1250 }
1251 state.budget_utilization_sum = state
1252 .budget_utilization_sum
1253 .saturating_add(u64::from(event.budget_utilization_pct));
1254 state.budget_utilization_count = state.budget_utilization_count.saturating_add(1);
1255 state.severity_histogram.bump(event.severity_bucket);
1256
1257 if state.events.len() == RAPTORQ_REPAIR_EVENT_CAPACITY {
1258 let _ = state.events.pop_front();
1259 }
1260 state.events.push_back(event.clone());
1261
1262 let ledger_epoch = state.next_evidence_epoch.max(1);
1263 let evidence_card =
1264 build_repair_evidence_card(log, &event, latency, state.evidence_chain_tip, ledger_epoch);
1265 state.next_evidence_epoch = ledger_epoch.saturating_add(1);
1266 state.evidence_chain_tip = evidence_card.chain_hash;
1267 if state.evidence_cards.len() == RAPTORQ_REPAIR_EVIDENCE_CAPACITY {
1268 let _ = state.evidence_cards.pop_front();
1269 }
1270 state.evidence_cards.push_back(evidence_card);
1271}
1272
1273#[must_use]
1275pub fn raptorq_repair_metrics_snapshot() -> WalFecRepairMetricsSnapshot {
1276 let state = lock_raptorq_repair_telemetry();
1277 let mean_budget_utilization = state
1278 .budget_utilization_sum
1279 .checked_div(state.budget_utilization_count)
1280 .unwrap_or(0);
1281 let budget_utilization_pct = u32::try_from(mean_budget_utilization).unwrap_or(u32::MAX);
1282 WalFecRepairMetricsSnapshot {
1283 repairs_total: state.repairs_total,
1284 repairs_failed: state.repairs_failed,
1285 symbols_reclaimed: state.symbols_reclaimed,
1286 budget_utilization_pct,
1287 wal_health_score: compute_health_score(&state),
1288 severity_histogram: state.severity_histogram,
1289 }
1290}
1291
1292#[must_use]
1296pub fn raptorq_repair_events_snapshot(limit: usize) -> Vec<WalFecRepairEvent> {
1297 let mut events = {
1298 let state = lock_raptorq_repair_telemetry();
1299 let take = if limit == 0 {
1300 state.events.len()
1301 } else {
1302 limit.min(state.events.len())
1303 };
1304 state
1305 .events
1306 .iter()
1307 .rev()
1308 .take(take)
1309 .cloned()
1310 .collect::<Vec<_>>()
1311 };
1312 events.reverse();
1313 events
1314}
1315
1316#[must_use]
1320pub fn raptorq_repair_evidence_snapshot(limit: usize) -> Vec<WalFecRepairEvidenceCard> {
1321 let mut cards = {
1322 let state = lock_raptorq_repair_telemetry();
1323 let take = if limit == 0 {
1324 state.evidence_cards.len()
1325 } else {
1326 limit.min(state.evidence_cards.len())
1327 };
1328 state
1329 .evidence_cards
1330 .iter()
1331 .rev()
1332 .take(take)
1333 .cloned()
1334 .collect::<Vec<_>>()
1335 };
1336 cards.reverse();
1337 cards
1338}
1339
1340#[must_use]
1342pub fn query_raptorq_repair_evidence(
1343 query: &WalFecRepairEvidenceQuery,
1344) -> Vec<WalFecRepairEvidenceCard> {
1345 let mut cards = {
1346 let state = lock_raptorq_repair_telemetry();
1347 state
1348 .evidence_cards
1349 .iter()
1350 .filter(|card| {
1351 query
1352 .frame_id
1353 .is_none_or(|frame_id| card.frame_id == frame_id)
1354 })
1355 .filter(|card| {
1356 query
1357 .severity_bucket
1358 .is_none_or(|severity| card.severity_bucket == severity)
1359 })
1360 .filter(|card| {
1361 query
1362 .wall_clock_start_ns
1363 .is_none_or(|start| card.wall_clock_unix_ns >= start)
1364 })
1365 .filter(|card| {
1366 query
1367 .wall_clock_end_ns
1368 .is_none_or(|end| card.wall_clock_unix_ns <= end)
1369 })
1370 .cloned()
1371 .collect::<Vec<_>>()
1372 };
1373
1374 if let Some(limit) = query.limit {
1375 if limit > 0 && cards.len() > limit {
1376 let keep_from = cards.len() - limit;
1377 cards.drain(..keep_from);
1378 }
1379 }
1380
1381 cards
1382}
1383
1384pub fn reset_raptorq_repair_telemetry() {
1386 let mut state = lock_raptorq_repair_telemetry();
1387 *state = WalFecRepairTelemetryState::default();
1388}
1389
/// Summary of a single group decode attempt, attached to every
/// [`WalFecRecoveryOutcome`].
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types; this part of the file does not show where they are
/// populated — confirm against the recovery code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecDecodeProof {
    /// Identity of the group this proof refers to.
    pub group_id: WalFecGroupId,
    /// Presumably the number of symbols needed to decode the group.
    pub required_symbols: u32,
    /// Presumably the number of usable symbols that were found.
    pub available_symbols: u32,
    /// Presumably the count of source symbols that passed validation.
    pub validated_source_symbols: u32,
    /// Presumably the count of repair symbols that passed validation.
    pub validated_repair_symbols: u32,
    /// Presumably the number of corruption findings seen while validating.
    pub corruption_observations: u32,
    /// Whether a decode was attempted at all.
    pub decode_attempted: bool,
    /// Whether the attempted decode succeeded.
    pub decode_succeeded: bool,
    /// Frame numbers reported as recovered.
    pub recovered_frame_nos: Vec<u32>,
    /// Why recovery fell back, if it did.
    pub fallback_reason: Option<WalFecRecoveryFallbackReason>,
}
1405
/// Payload of a successful group recovery (see
/// [`WalFecRecoveryOutcome::Recovered`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRecoveredGroup {
    /// Metadata of the recovered group.
    pub meta: WalFecGroupMeta,
    /// Recovered page images, one byte vector per page.
    pub recovered_pages: Vec<Vec<u8>>,
    /// Frame numbers reported as recovered.
    pub recovered_frame_nos: Vec<u32>,
    /// Database size in pages.
    /// NOTE(review): presumably taken from the group's commit frame — confirm.
    pub db_size_pages: u32,
    /// Decode summary for this group.
    pub decode_proof: WalFecDecodeProof,
}
1415
/// Result of attempting to recover one WAL-FEC group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalFecRecoveryOutcome {
    /// The group was recovered; carries the recovered pages and proof.
    Recovered(WalFecRecoveredGroup),
    /// The group could not be recovered.
    /// NOTE(review): the variant name suggests the caller should truncate the
    /// WAL before the given frame — confirm against the recovery caller.
    TruncateBeforeGroup {
        /// Frame number before which the WAL should be cut.
        truncate_before_frame_no: u32,
        /// Decode summary explaining the fallback.
        decode_proof: WalFecDecodeProof,
    },
}
1425
/// A WAL frame paired with its page bytes.
/// NOTE(review): presumably an input candidate for recovery; its consumers
/// are not visible in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFrameCandidate {
    /// Frame number within the WAL.
    pub frame_no: u32,
    /// Raw page payload carried by the frame.
    pub page_data: Vec<u8>,
}
1432
/// Queue capacity used by `WalFecRepairPipelineConfig::default()`.
const DEFAULT_REPAIR_PIPELINE_QUEUE_CAPACITY: usize = 64;
1436
/// Tunables for [`WalFecRepairPipeline`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalFecRepairPipelineConfig {
    /// Capacity of the bounded work queue. `WalFecRepairPipeline::start`
    /// rejects a value of zero.
    pub queue_capacity: usize,
    /// Pause inserted after each generated repair symbol; a zero duration
    /// disables the pause.
    pub per_symbol_delay: Duration,
}
1447
1448impl Default for WalFecRepairPipelineConfig {
1449 fn default() -> Self {
1450 Self {
1451 queue_capacity: DEFAULT_REPAIR_PIPELINE_QUEUE_CAPACITY,
1452 per_symbol_delay: Duration::ZERO,
1453 }
1454 }
1455}
1456
/// One unit of work for the repair pipeline: generate repair symbols for a
/// group and append the resulting record to a sidecar file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFecRepairWorkItem {
    /// Sidecar file the generated group record is appended to.
    pub sidecar_path: PathBuf,
    /// Metadata describing the group to encode.
    pub meta: WalFecGroupMeta,
    /// Source pages of the group, validated against `meta` on construction
    /// via [`WalFecRepairWorkItem::new`].
    pub source_pages: Vec<Vec<u8>>,
}
1464
1465impl WalFecRepairWorkItem {
1466 pub fn new(
1467 sidecar_path: impl Into<PathBuf>,
1468 meta: WalFecGroupMeta,
1469 source_pages: Vec<Vec<u8>>,
1470 ) -> Result<Self> {
1471 validate_source_pages(&meta, &source_pages)?;
1472 Ok(Self {
1473 sidecar_path: sidecar_path.into(),
1474 meta,
1475 source_pages,
1476 })
1477 }
1478}
1479
/// Snapshot of the repair pipeline's job counters, as returned by
/// `WalFecRepairPipeline::stats` and `WalFecRepairPipeline::shutdown`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WalFecRepairPipelineStats {
    /// Jobs accepted but not yet finished (queued or in progress).
    pub pending_jobs: usize,
    /// Jobs whose group record was appended successfully.
    pub completed_jobs: usize,
    /// Jobs that returned an error or panicked.
    pub failed_jobs: usize,
    /// Jobs that were canceled or abandoned in the queue.
    pub canceled_jobs: usize,
    /// High-water mark of `pending_jobs`.
    pub max_pending_jobs: usize,
}
1489
/// Message sent from the pipeline handle to the worker task.
#[derive(Debug)]
enum WalFecPipelineMessage {
    /// A repair job to process.
    Work(WalFecRepairWorkItem),
}
1494
/// Non-error result of processing one work item.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalFecWorkOutcome {
    /// The group record was generated and appended to the sidecar.
    Completed,
    /// Cancellation was observed before the record was appended.
    Canceled,
}
1500
/// Handles to the shared state the worker task reads and updates; each field
/// is a clone of the corresponding `Arc` held by [`WalFecRepairPipeline`].
#[derive(Debug, Clone)]
struct WalFecRepairWorkerState {
    /// Set by `WalFecRepairPipeline::cancel`; checked while processing work.
    cancel_flag: Arc<AtomicBool>,
    /// Jobs accepted but not yet finished.
    pending_jobs: Arc<AtomicUsize>,
    /// Jobs that completed successfully.
    completed_jobs: Arc<AtomicUsize>,
    /// Jobs that failed or panicked.
    failed_jobs: Arc<AtomicUsize>,
    /// Jobs that were canceled or abandoned.
    canceled_jobs: Arc<AtomicUsize>,
    /// First fatal worker failure, if any.
    worker_failure: Arc<Mutex<Option<String>>>,
}
1510
/// Handle to a background task that generates repair symbols for queued
/// groups and appends them to their sidecar files.
///
/// Prefer `shutdown` for an orderly stop; dropping the handle while the
/// worker is still attached cancels it and logs a warning.
pub struct WalFecRepairPipeline {
    /// Producer side of the work queue; `None` once shut down or dropped.
    sender: Option<mpsc::Sender<WalFecPipelineMessage>>,
    /// Cooperative cancellation flag shared with the worker.
    cancel_flag: Arc<AtomicBool>,
    /// Jobs accepted but not yet finished.
    pending_jobs: Arc<AtomicUsize>,
    /// Jobs that completed successfully.
    completed_jobs: Arc<AtomicUsize>,
    /// Jobs that failed or panicked.
    failed_jobs: Arc<AtomicUsize>,
    /// Jobs that were canceled or abandoned.
    canceled_jobs: Arc<AtomicUsize>,
    /// High-water mark of `pending_jobs`.
    max_pending_jobs: Arc<AtomicUsize>,
    /// First fatal worker failure, if any.
    worker_failure: Arc<Mutex<Option<String>>>,
    /// Join handle of the worker task; `None` after `shutdown` has taken it.
    worker: Option<AsyncJoinHandle<()>>,
}
1523
1524impl WalFecRepairPipeline {
1525 pub fn start(
1527 runtime: &RuntimeHandle,
1528 parent_cx: &Cx,
1529 config: WalFecRepairPipelineConfig,
1530 ) -> Result<Self> {
1531 if config.queue_capacity == 0 {
1532 return Err(FrankenError::WalCorrupt {
1533 detail: "wal-fec repair pipeline queue_capacity must be >= 1".to_owned(),
1534 });
1535 }
1536
1537 let (tx, rx) = mpsc::channel(config.queue_capacity);
1538 let cancel_flag = Arc::new(AtomicBool::new(false));
1539 let pending_jobs = Arc::new(AtomicUsize::new(0));
1540 let completed_jobs = Arc::new(AtomicUsize::new(0));
1541 let failed_jobs = Arc::new(AtomicUsize::new(0));
1542 let canceled_jobs = Arc::new(AtomicUsize::new(0));
1543 let max_pending_jobs = Arc::new(AtomicUsize::new(0));
1544 let worker_failure = Arc::new(Mutex::new(None));
1545 let worker_state = WalFecRepairWorkerState {
1546 cancel_flag: Arc::clone(&cancel_flag),
1547 pending_jobs: Arc::clone(&pending_jobs),
1548 completed_jobs: Arc::clone(&completed_jobs),
1549 failed_jobs: Arc::clone(&failed_jobs),
1550 canceled_jobs: Arc::clone(&canceled_jobs),
1551 worker_failure: Arc::clone(&worker_failure),
1552 };
1553
1554 let worker_cx = parent_cx.create_child();
1555 let worker_handle = runtime
1556 .try_spawn(run_repair_pipeline_worker(
1557 rx,
1558 worker_state,
1559 worker_cx,
1560 config.per_symbol_delay,
1561 ))
1562 .map_err(|err| FrankenError::WalCorrupt {
1563 detail: format!("failed to spawn wal-fec repair worker task: {err}"),
1564 })?;
1565
1566 Ok(Self {
1567 sender: Some(tx),
1568 cancel_flag,
1569 pending_jobs,
1570 completed_jobs,
1571 failed_jobs,
1572 canceled_jobs,
1573 max_pending_jobs,
1574 worker_failure,
1575 worker: Some(worker_handle),
1576 })
1577 }
1578
1579 pub fn enqueue(&self, work_item: WalFecRepairWorkItem) -> Result<()> {
1581 if self.cancel_flag.load(Ordering::SeqCst) {
1582 return Err(FrankenError::WalCorrupt {
1583 detail: "wal-fec repair pipeline is canceled".to_owned(),
1584 });
1585 }
1586 let sender = self
1587 .sender
1588 .as_ref()
1589 .ok_or_else(|| FrankenError::WalCorrupt {
1590 detail: "wal-fec repair pipeline is shut down".to_owned(),
1591 })?;
1592
1593 let pending_after = self.pending_jobs.fetch_add(1, Ordering::SeqCst) + 1;
1594 update_max_pending(&self.max_pending_jobs, pending_after);
1595
1596 match sender.try_send(WalFecPipelineMessage::Work(work_item)) {
1597 Ok(()) => Ok(()),
1598 Err(mpsc::SendError::Full(_)) => {
1599 self.pending_jobs.fetch_sub(1, Ordering::SeqCst);
1600 Err(FrankenError::WalCorrupt {
1601 detail: "wal-fec repair pipeline queue full".to_owned(),
1602 })
1603 }
1604 Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
1605 self.pending_jobs.fetch_sub(1, Ordering::SeqCst);
1606 Err(FrankenError::WalCorrupt {
1607 detail: "wal-fec repair pipeline worker is disconnected".to_owned(),
1608 })
1609 }
1610 }
1611 }
1612
1613 pub fn cancel(&self) {
1615 self.cancel_flag.store(true, Ordering::SeqCst);
1616 }
1617
1618 #[must_use]
1620 pub async fn flush(&self, cx: &Cx, timeout: Duration) -> bool {
1621 let deadline = Instant::now() + timeout;
1622 loop {
1623 if lock_unpoisoned(&self.worker_failure).is_some() {
1624 return false;
1625 }
1626 if self.pending_jobs.load(Ordering::SeqCst) == 0 {
1627 return true;
1628 }
1629 if Instant::now() >= deadline {
1630 return false;
1631 }
1632 if cx.checkpoint().is_err() {
1633 return false;
1634 }
1635 yield_now().await;
1636 }
1637 }
1638
1639 #[must_use]
1641 pub fn stats(&self) -> WalFecRepairPipelineStats {
1642 WalFecRepairPipelineStats {
1643 pending_jobs: self.pending_jobs.load(Ordering::SeqCst),
1644 completed_jobs: self.completed_jobs.load(Ordering::SeqCst),
1645 failed_jobs: self.failed_jobs.load(Ordering::SeqCst),
1646 canceled_jobs: self.canceled_jobs.load(Ordering::SeqCst),
1647 max_pending_jobs: self.max_pending_jobs.load(Ordering::SeqCst),
1648 }
1649 }
1650
1651 pub async fn shutdown(&mut self, cx: &Cx) -> Result<WalFecRepairPipelineStats> {
1656 {
1657 let _mask = cx.masked();
1658 self.sender.take();
1659 }
1660 if let Some(worker) = self.worker.take() {
1661 worker.await;
1662 }
1663 let worker_failure_detail = lock_unpoisoned(&self.worker_failure).clone();
1664 if let Some(detail) = worker_failure_detail {
1665 return Err(FrankenError::WalCorrupt { detail });
1666 }
1667 Ok(self.stats())
1668 }
1669}
1670
1671impl Drop for WalFecRepairPipeline {
1672 fn drop(&mut self) {
1673 if self.worker.is_some() {
1674 self.cancel();
1675 self.sender.take();
1676 warn!(
1677 pending_jobs = self.pending_jobs.load(Ordering::SeqCst),
1678 "dropping wal-fec repair pipeline without awaiting shutdown"
1679 );
1680 }
1681 }
1682}
1683
/// Body of the repair pipeline's worker task.
///
/// Receives work items until the queue is disconnected, runs each one via
/// `spawn_blocking`, and updates the shared counters in `state`. On a fatal
/// condition (no native runtime context, receive cancelled, a panic while
/// processing, or `worker_cx` cancelled) it records the failure, counts all
/// still-queued items as canceled, and exits.
async fn run_repair_pipeline_worker(
    mut receiver: mpsc::Receiver<WalFecPipelineMessage>,
    state: WalFecRepairWorkerState,
    worker_cx: Cx,
    per_symbol_delay: Duration,
) {
    // Without a native runtime context the worker cannot receive; fail fast
    // and account for anything already queued.
    let Some(native_worker_cx) = NativeCx::current() else {
        record_worker_failure(
            &state.worker_failure,
            "wal-fec repair worker task missing native runtime Cx".to_owned(),
        );
        drain_abandoned_work(
            &mut receiver,
            state.pending_jobs.as_ref(),
            state.canceled_jobs.as_ref(),
        );
        return;
    };
    worker_cx.set_native_cx(native_worker_cx.clone());

    loop {
        let message = match receiver.recv(&native_worker_cx).await {
            Ok(message) => message,
            Err(mpsc::RecvError::Empty) => {
                // Nothing available right now; give other tasks a turn.
                yield_now().await;
                continue;
            }
            // All senders are gone: normal shutdown.
            Err(mpsc::RecvError::Disconnected) => break,
            Err(mpsc::RecvError::Cancelled) => {
                record_worker_failure(
                    &state.worker_failure,
                    "wal-fec repair worker task cancelled before the queue drained".to_owned(),
                );
                drain_abandoned_work(
                    &mut receiver,
                    state.pending_jobs.as_ref(),
                    state.canceled_jobs.as_ref(),
                );
                break;
            }
        };

        match message {
            WalFecPipelineMessage::Work(work_item) => {
                // Capture the group id before `work_item` moves into the closure.
                let group_id = work_item.meta.group_id();
                let cancel_flag_for_work = Arc::clone(&state.cancel_flag);
                let work_cx = worker_cx.create_child();
                let native_worker_cx_for_work = native_worker_cx.clone();
                // Symbol generation and the sidecar append are blocking work.
                // `catch_unwind` turns a panic into the outer `Err(_)` arm below.
                let outcome = spawn_blocking(move || {
                    work_cx.set_native_cx(native_worker_cx_for_work);
                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        process_repair_work_item(
                            &work_item,
                            &work_cx,
                            cancel_flag_for_work.as_ref(),
                            per_symbol_delay,
                        )
                    }))
                })
                .await;

                // The job is no longer pending regardless of how it ended.
                state.pending_jobs.fetch_sub(1, Ordering::SeqCst);
                match outcome {
                    Ok(Ok(WalFecWorkOutcome::Completed)) => {
                        state.completed_jobs.fetch_add(1, Ordering::SeqCst);
                        info!(
                            group_id = %group_id,
                            "wal-fec repair work item completed"
                        );
                    }
                    Ok(Ok(WalFecWorkOutcome::Canceled)) => {
                        state.canceled_jobs.fetch_add(1, Ordering::SeqCst);
                        warn!(
                            group_id = %group_id,
                            "wal-fec repair work item canceled before append"
                        );
                    }
                    // An ordinary error fails this job only; the worker keeps going.
                    Ok(Err(err)) => {
                        state.failed_jobs.fetch_add(1, Ordering::SeqCst);
                        error!(
                            group_id = %group_id,
                            error = %err,
                            "wal-fec repair work item failed"
                        );
                    }
                    // A panic is fatal for the worker: record it, abandon the
                    // rest of the queue, and stop.
                    Err(_) => {
                        state.failed_jobs.fetch_add(1, Ordering::SeqCst);
                        let detail = format!(
                            "wal-fec repair worker task panicked while processing group {group_id}"
                        );
                        record_worker_failure(&state.worker_failure, detail.clone());
                        error!(group_id = %group_id, "{detail}");
                        drain_abandoned_work(
                            &mut receiver,
                            state.pending_jobs.as_ref(),
                            state.canceled_jobs.as_ref(),
                        );
                        break;
                    }
                }

                // Honor cancellation of the worker context between jobs.
                if worker_cx.checkpoint().is_err() {
                    record_worker_failure(
                        &state.worker_failure,
                        "wal-fec repair worker task cancelled after processing work".to_owned(),
                    );
                    drain_abandoned_work(
                        &mut receiver,
                        state.pending_jobs.as_ref(),
                        state.canceled_jobs.as_ref(),
                    );
                    break;
                }
                yield_now().await;
            }
        }
    }
}
1802
1803fn record_worker_failure(worker_failure: &Mutex<Option<String>>, detail: String) {
1804 let mut slot = lock_unpoisoned(worker_failure);
1805 if slot.is_none() {
1806 *slot = Some(detail);
1807 }
1808}
1809
/// Locks `mutex`, recovering the guard even if a previous holder panicked.
///
/// Poisoning is ignored: the data is handed out as it was left.
fn lock_unpoisoned<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
1815
/// Adapter exposing a non-blocking receive behind a fixed `&mut self`
/// signature.
/// NOTE(review): presumably this shields callers from differences in the
/// channel crate's `try_recv` receiver type across versions — confirm.
trait ReceiverTryRecvCompat<T> {
    /// Attempts to receive one message without waiting.
    fn try_recv_compat(&mut self) -> std::result::Result<T, mpsc::RecvError>;
}
1819
impl<T> ReceiverTryRecvCompat<T> for mpsc::Receiver<T> {
    /// Forwards directly to the receiver's own `try_recv`.
    fn try_recv_compat(&mut self) -> std::result::Result<T, mpsc::RecvError> {
        self.try_recv()
    }
}
1825
1826fn drain_abandoned_work(
1827 receiver: &mut mpsc::Receiver<WalFecPipelineMessage>,
1828 pending_jobs: &AtomicUsize,
1829 canceled_jobs: &AtomicUsize,
1830) {
1831 while let Ok(WalFecPipelineMessage::Work(_)) = receiver.try_recv_compat() {
1832 pending_jobs.fetch_sub(1, Ordering::SeqCst);
1833 canceled_jobs.fetch_add(1, Ordering::SeqCst);
1834 }
1835}
1836
1837pub fn generate_wal_fec_repair_symbols(
1839 meta: &WalFecGroupMeta,
1840 source_pages: &[Vec<u8>],
1841) -> Result<Vec<SymbolRecord>> {
1842 match generate_wal_fec_repair_symbols_inner(meta, source_pages, None, None, Duration::ZERO)? {
1843 Some(symbols) => {
1844 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_encode();
1845 Ok(symbols)
1846 }
1847 None => Err(FrankenError::WalCorrupt {
1848 detail: "unexpected cancellation while generating wal-fec symbols".to_owned(),
1849 }),
1850 }
1851}
1852
1853fn process_repair_work_item(
1854 work_item: &WalFecRepairWorkItem,
1855 cx: &Cx,
1856 cancel_flag: &AtomicBool,
1857 per_symbol_delay: Duration,
1858) -> Result<WalFecWorkOutcome> {
1859 if cancel_flag.load(Ordering::SeqCst) || cx.checkpoint().is_err() {
1860 return Ok(WalFecWorkOutcome::Canceled);
1861 }
1862 let Some(repair_symbols) = generate_wal_fec_repair_symbols_inner(
1863 &work_item.meta,
1864 &work_item.source_pages,
1865 Some(cx),
1866 Some(cancel_flag),
1867 per_symbol_delay,
1868 )?
1869 else {
1870 return Ok(WalFecWorkOutcome::Canceled);
1871 };
1872 if cancel_flag.load(Ordering::SeqCst) || cx.checkpoint().is_err() {
1873 return Ok(WalFecWorkOutcome::Canceled);
1874 }
1875 let group = WalFecGroupRecord::new(work_item.meta.clone(), repair_symbols)?;
1876 append_wal_fec_group(&work_item.sidecar_path, &group)?;
1877 Ok(WalFecWorkOutcome::Completed)
1878}
1879
1880fn generate_wal_fec_repair_symbols_inner(
1881 meta: &WalFecGroupMeta,
1882 source_pages: &[Vec<u8>],
1883 cx: Option<&Cx>,
1884 cancel_flag: Option<&AtomicBool>,
1885 per_symbol_delay: Duration,
1886) -> Result<Option<Vec<SymbolRecord>>> {
1887 validate_source_pages(meta, source_pages)?;
1888 let symbol_len = usize::try_from(meta.oti.t).map_err(|_| FrankenError::WalCorrupt {
1889 detail: format!("OTI symbol size {} does not fit in usize", meta.oti.t),
1890 })?;
1891 let r_repair = usize::try_from(meta.r_repair).map_err(|_| FrankenError::WalCorrupt {
1892 detail: format!("r_repair {} does not fit in usize", meta.r_repair),
1893 })?;
1894
1895 let encoder_seed = derive_repair_seed(meta, 0);
1898
1899 let encoder = asupersync::raptorq::systematic::SystematicEncoder::new(
1900 source_pages,
1901 symbol_len,
1902 encoder_seed,
1903 )
1904 .ok_or_else(|| FrankenError::WalCorrupt {
1905 detail: "RaptorQ constraint matrix singular during encoding".to_owned(),
1906 })?;
1907
1908 let mut symbols = Vec::with_capacity(r_repair);
1909
1910 for repair_index in 0..r_repair {
1911 if let Some(cx) = cx {
1912 if cx.checkpoint().is_err() {
1913 return Ok(None);
1914 }
1915 }
1916 if let Some(flag) = cancel_flag {
1917 if flag.load(Ordering::SeqCst) {
1918 return Ok(None);
1919 }
1920 }
1921
1922 let esi = meta
1923 .k_source
1924 .checked_add(
1925 u32::try_from(repair_index).map_err(|_| FrankenError::WalCorrupt {
1926 detail: format!("repair_index {repair_index} does not fit in u32"),
1927 })?,
1928 )
1929 .ok_or_else(|| FrankenError::WalCorrupt {
1930 detail: "repair symbol ESI overflow".to_owned(),
1931 })?;
1932
1933 let payload = encoder.repair_symbol(esi);
1934
1935 if per_symbol_delay > Duration::ZERO {
1936 thread::sleep(per_symbol_delay);
1937 }
1938
1939 symbols.push(SymbolRecord::new(
1940 meta.object_id,
1941 meta.oti,
1942 esi,
1943 payload,
1944 SymbolRecordFlags::empty(),
1945 ));
1946 }
1947
1948 Ok(Some(symbols))
1949}
1950
1951fn validate_source_pages(meta: &WalFecGroupMeta, source_pages: &[Vec<u8>]) -> Result<()> {
1952 let expected_pages = usize::try_from(meta.k_source).map_err(|_| FrankenError::WalCorrupt {
1953 detail: format!("k_source {} does not fit in usize", meta.k_source),
1954 })?;
1955 if source_pages.len() != expected_pages {
1956 return Err(FrankenError::WalCorrupt {
1957 detail: format!(
1958 "source page count {} must equal k_source {}",
1959 source_pages.len(),
1960 meta.k_source
1961 ),
1962 });
1963 }
1964 let expected_len = usize::try_from(meta.page_size).map_err(|_| FrankenError::WalCorrupt {
1965 detail: format!("page_size {} does not fit in usize", meta.page_size),
1966 })?;
1967
1968 for (index, page) in source_pages.iter().enumerate() {
1969 if page.len() != expected_len {
1970 return Err(FrankenError::WalCorrupt {
1971 detail: format!(
1972 "source page {index} has length {}, expected {expected_len}",
1973 page.len()
1974 ),
1975 });
1976 }
1977 let actual_hash = wal_fec_source_hash_xxh3_128(page);
1978 let expected_hash = meta.source_page_xxh3_128[index];
1979 if actual_hash != expected_hash {
1980 return Err(FrankenError::WalCorrupt {
1981 detail: format!("source page hash mismatch at index {index}"),
1982 });
1983 }
1984 }
1985 Ok(())
1986}
1987
1988fn derive_repair_seed(meta: &WalFecGroupMeta, repair_index: u32) -> u64 {
1989 let mut seed_material = Vec::with_capacity(16 + (7 * size_of::<u32>()));
1990 seed_material.extend_from_slice(meta.object_id.as_bytes());
1991 seed_material.extend_from_slice(&meta.wal_salt1.to_le_bytes());
1992 seed_material.extend_from_slice(&meta.wal_salt2.to_le_bytes());
1993 seed_material.extend_from_slice(&meta.start_frame_no.to_le_bytes());
1994 seed_material.extend_from_slice(&meta.end_frame_no.to_le_bytes());
1995 seed_material.extend_from_slice(&meta.k_source.to_le_bytes());
1996 seed_material.extend_from_slice(&meta.r_repair.to_le_bytes());
1997 seed_material.extend_from_slice(&repair_index.to_le_bytes());
1998 xxh3_64(&seed_material)
1999}
2000
/// Raises the high-water mark in `max_pending` to `candidate` if `candidate`
/// is larger; never lowers it.
///
/// Uses the atomic `fetch_max` primitive instead of a hand-written
/// compare-exchange loop.
fn update_max_pending(max_pending: &AtomicUsize, candidate: usize) {
    max_pending.fetch_max(candidate, Ordering::SeqCst);
}
2011
2012#[must_use]
2014pub fn build_source_page_hashes(page_payloads: &[Vec<u8>]) -> Vec<Xxh3Checksum128> {
2015 page_payloads
2016 .iter()
2017 .map(|page| wal_fec_source_hash_xxh3_128(page))
2018 .collect()
2019}
2020
/// Decodes a group's source pages from a mix of source and repair symbols.
///
/// `symbols` holds `(esi, payload)` pairs. ESIs below `meta.k_source` are
/// treated as source symbols; all others as repair symbols. The decoder is
/// seeded via `derive_repair_seed(meta, 0)`.
///
/// Returns the `k_source` decoded source symbols on success.
///
/// # Errors
///
/// Returns `FrankenError::WalCorrupt` when a metadata value does not fit in
/// `usize`, when the decoder's padding parameters are inconsistent, when a
/// repair ESI plus the padding delta overflows `u32`, when a repair equation
/// cannot be obtained, when decoding fails, or when the decoder returns a
/// number of source symbols other than `k_source`.
pub fn wal_fec_raptorq_decode(
    meta: &WalFecGroupMeta,
    symbols: &[(u32, Vec<u8>)],
) -> Result<Vec<Vec<u8>>> {
    let k = usize::try_from(meta.k_source).map_err(|_| FrankenError::WalCorrupt {
        detail: format!("k_source {} does not fit in usize", meta.k_source),
    })?;
    let symbol_size = usize::try_from(meta.oti.t).map_err(|_| FrankenError::WalCorrupt {
        detail: format!("OTI symbol size {} does not fit in usize", meta.oti.t),
    })?;

    let encoder_seed = derive_repair_seed(meta, 0);
    let decoder =
        asupersync::raptorq::decoder::InactivationDecoder::new(k, symbol_size, encoder_seed);

    // Start from the decoder's own constraint symbols, then add what was received.
    let mut received = decoder.constraint_symbols();
    // K' - K as a u32; used below only to reject repair ESIs that would overflow.
    let repair_padding_delta = {
        let params = decoder.params();
        params
            .k_prime
            .checked_sub(params.k)
            .and_then(|delta| u32::try_from(delta).ok())
            .ok_or_else(|| FrankenError::WalCorrupt {
                detail: format!(
                    "invalid RaptorQ padding domain: K={} K'={}",
                    params.k, params.k_prime
                ),
            })?
    };

    for &(esi, ref data) in symbols {
        // u32 -> usize; lossless wherever usize is at least 32 bits wide.
        let esi_usize = esi as usize;
        if esi_usize < k {
            let (cols, coefs) = decoder.source_equation(esi);
            received.push(asupersync::raptorq::decoder::ReceivedSymbol {
                esi,
                is_source: true,
                columns: cols,
                coefficients: coefs,
                data: data.clone(),
            });
        } else {
            // Overflow guard only: the sum is discarded and the raw ESI is
            // what gets passed to the decoder.
            // NOTE(review): presumably `repair_equation_rfc6330` applies the
            // padding offset internally — confirm against the decoder API.
            esi.checked_add(repair_padding_delta)
                .ok_or_else(|| FrankenError::WalCorrupt {
                    detail: format!("invalid RaptorQ repair ESI {esi}: overflow"),
                })?;
            let (cols, coefs) = decoder
                .repair_equation_rfc6330(esi)
                .into_wal_fec_result(esi)?;
            received.push(asupersync::raptorq::decoder::ReceivedSymbol::repair(
                esi,
                cols,
                coefs,
                data.clone(),
            ));
        }
    }

    let result = decoder
        .decode(&received)
        .map_err(|err| FrankenError::WalCorrupt {
            detail: format!("RaptorQ decode failed: {err:?}"),
        })?;

    // Guard against a decoder that reports success with the wrong symbol count.
    if result.source.len() != k {
        return Err(FrankenError::WalCorrupt {
            detail: format!(
                "RaptorQ decode returned {} source symbols, expected {k}",
                result.source.len()
            ),
        });
    }

    debug!(
        k_source = k,
        peeled = result.stats.peeled,
        inactivated = result.stats.inactivated,
        gauss_ops = result.stats.gauss_ops,
        "wal-fec RaptorQ decode succeeded"
    );

    Ok(result.source)
}
2116
/// Derives the FEC sidecar path that belongs to `wal_path`.
///
/// * A path ending in `-wal` or `.wal` gets `-fec` appended
///   (`db-wal` becomes `db-wal-fec`).
/// * Any other path gets `.wal-fec` appended (`db` becomes `db.wal-fec`).
///
/// The path is handled as an `OsStr`, so bytes that are not valid UTF-8 are
/// carried over unchanged rather than being replaced by a lossy conversion.
#[must_use]
pub fn wal_fec_path_for_wal(wal_path: &Path) -> PathBuf {
    let raw = wal_path.as_os_str();
    // The encoded form is a superset of UTF-8, so matching an ASCII suffix on
    // the raw bytes is exact.
    let encoded = raw.as_encoded_bytes();
    let suffix = if encoded.ends_with(b"-wal") || encoded.ends_with(b".wal") {
        "-fec"
    } else {
        ".wal-fec"
    };
    let mut sidecar = raw.to_os_string();
    sidecar.push(suffix);
    PathBuf::from(sidecar)
}
2127
2128pub fn read_wal_fec_raptorq_repair_symbols(sidecar_path: &Path) -> Result<u8> {
2133 if !sidecar_path.exists() {
2134 return Ok(DEFAULT_RAPTORQ_REPAIR_SYMBOLS);
2135 }
2136
2137 let mut file = match fs::File::open(sidecar_path) {
2138 Ok(f) => f,
2139 Err(_) => return Ok(DEFAULT_RAPTORQ_REPAIR_SYMBOLS),
2140 };
2141 let mut bytes = [0_u8; WAL_FEC_PRAGMA_HEADER_BYTES];
2142 use std::io::Read;
2143 let read_len = file.read(&mut bytes).unwrap_or(0);
2144
2145 let Some(header) = WalFecPragmaHeader::from_prefix(&bytes[..read_len])? else {
2146 return Ok(DEFAULT_RAPTORQ_REPAIR_SYMBOLS);
2147 };
2148
2149 debug!(
2150 sidecar = %sidecar_path.display(),
2151 raptorq_repair_symbols = header.raptorq_repair_symbols,
2152 "loaded wal-fec repair symbol setting from sidecar header"
2153 );
2154 Ok(header.raptorq_repair_symbols)
2155}
2156
2157pub fn persist_wal_fec_raptorq_repair_symbols(sidecar_path: &Path, value: u8) -> Result<()> {
2161 use std::io::{Read, Seek, SeekFrom, Write};
2162
2163 if !sidecar_path.exists() {
2164 if let Some(parent) = sidecar_path.parent() {
2165 if !parent.as_os_str().is_empty() {
2166 fs::create_dir_all(parent)?;
2167 }
2168 }
2169 let header = WalFecPragmaHeader::new(value);
2170 match fs::OpenOptions::new()
2174 .write(true)
2175 .create_new(true)
2176 .open(sidecar_path)
2177 {
2178 Ok(mut file) => {
2179 file.write_all(&header.to_bytes())?;
2180 file.sync_all()?;
2181 info!(
2182 sidecar = %sidecar_path.display(),
2183 raptorq_repair_symbols = value,
2184 "persisted wal-fec repair symbol setting (new file)"
2185 );
2186 return Ok(());
2187 }
2188 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
2189 }
2192 Err(e) => return Err(e.into()),
2193 }
2194 }
2195
2196 let mut file = fs::OpenOptions::new()
2197 .read(true)
2198 .write(true)
2199 .open(sidecar_path)?;
2200
2201 let mut header_buf = [0_u8; WAL_FEC_PRAGMA_HEADER_BYTES];
2202 let read_len = file.read(&mut header_buf).unwrap_or(0);
2203 let has_header = WalFecPragmaHeader::from_prefix(&header_buf[..read_len])?.is_some();
2204 let header = WalFecPragmaHeader::new(value);
2205
2206 file.seek(SeekFrom::Start(0))?;
2207 if has_header {
2208 file.write_all(&header.to_bytes())?;
2209 } else {
2210 let temp_path = sidecar_path.with_extension("wal-fec.tmp");
2213 let result = (|| -> Result<()> {
2214 let mut temp_file = std::io::BufWriter::new(fs::File::create(&temp_path)?);
2215 temp_file.write_all(&header.to_bytes())?;
2216 std::io::copy(&mut file, &mut temp_file)?;
2217 let inner = temp_file.into_inner().map_err(|e| e.into_error())?;
2218 inner.sync_all()?;
2219 fs::rename(&temp_path, sidecar_path)?;
2220 Ok(())
2221 })();
2222 if result.is_err() {
2223 let _ = fs::remove_file(&temp_path);
2224 }
2225 result?;
2226 }
2227
2228 info!(
2229 sidecar = %sidecar_path.display(),
2230 raptorq_repair_symbols = value,
2231 "persisted wal-fec repair symbol setting"
2232 );
2233 Ok(())
2234}
2235
2236pub fn ensure_wal_with_fec_sidecar(wal_path: &Path) -> Result<PathBuf> {
2238 fs::OpenOptions::new()
2239 .create(true)
2240 .append(true)
2241 .open(wal_path)?;
2242 let sidecar_path = wal_fec_path_for_wal(wal_path);
2243 fs::OpenOptions::new()
2244 .create(true)
2245 .append(true)
2246 .open(&sidecar_path)?;
2247 Ok(sidecar_path)
2248}
2249
2250pub fn append_wal_fec_group(sidecar_path: &Path, group: &WalFecGroupRecord) -> Result<()> {
2252 group.validate_layout()?;
2253 let group_id = group.meta.group_id();
2254 debug!(
2255 group_id = %group_id,
2256 k_source = group.meta.k_source,
2257 r_repair = group.meta.r_repair,
2258 "appending wal-fec group"
2259 );
2260
2261 let mut file = fs::OpenOptions::new()
2262 .create(true)
2263 .append(true)
2264 .open(sidecar_path)?;
2265 let meta_bytes = group.meta.to_record_bytes();
2266 write_length_prefixed(&mut file, &meta_bytes, "group metadata")?;
2267 for symbol in &group.repair_symbols {
2268 write_length_prefixed(&mut file, &symbol.to_bytes(), "repair symbol")?;
2269 }
2270 file.sync_data()?;
2271 info!(
2272 group_id = %group_id,
2273 sidecar = %sidecar_path.display(),
2274 repair_symbols = group.repair_symbols.len(),
2275 "wal-fec group appended"
2276 );
2277 Ok(())
2278}
2279
2280pub fn scan_wal_fec(sidecar_path: &Path) -> Result<WalFecScanResult> {
2285 if !sidecar_path.exists() {
2286 return Ok(WalFecScanResult::default());
2287 }
2288 let bytes = fs::read(sidecar_path)?;
2289 let mut cursor = scan_offset_after_optional_pragma_header(&bytes)?;
2290 let mut groups = Vec::new();
2291 let mut truncated_tail = false;
2292
2293 while cursor < bytes.len() {
2294 let Some(meta_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
2295 truncated_tail = true;
2296 warn!(
2297 sidecar = %sidecar_path.display(),
2298 cursor,
2299 "truncated wal-fec metadata tail detected"
2300 );
2301 break;
2302 };
2303 let meta = WalFecGroupMeta::from_record_bytes(meta_bytes)?;
2304 let r_repair_usize =
2305 usize::try_from(meta.r_repair).map_err(|_| FrankenError::WalCorrupt {
2306 detail: format!("r_repair {} does not fit in usize", meta.r_repair),
2307 })?;
2308
2309 if bytes.len().saturating_sub(cursor) < r_repair_usize.saturating_mul(4) {
2312 return Err(FrankenError::WalCorrupt {
2313 detail: format!("r_repair {} exceeds remaining buffer", meta.r_repair),
2314 });
2315 }
2316
2317 let mut repair_symbols = Vec::with_capacity(r_repair_usize);
2318
2319 for _ in 0..meta.r_repair {
2320 let Some(symbol_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
2321 truncated_tail = true;
2322 warn!(
2323 sidecar = %sidecar_path.display(),
2324 group_id = %meta.group_id(),
2325 cursor,
2326 "truncated wal-fec repair-symbol tail detected"
2327 );
2328 break;
2329 };
2330 let symbol = SymbolRecord::from_bytes(symbol_bytes).map_err(|err| {
2331 error!(
2332 sidecar = %sidecar_path.display(),
2333 group_id = %meta.group_id(),
2334 error = %err,
2335 "invalid wal-fec repair symbol"
2336 );
2337 FrankenError::WalCorrupt {
2338 detail: format!("invalid wal-fec repair symbol: {err}"),
2339 }
2340 })?;
2341 repair_symbols.push(symbol);
2342 }
2343
2344 if truncated_tail {
2345 break;
2346 }
2347 groups.push(WalFecGroupRecord::new(meta, repair_symbols)?);
2348 }
2349
2350 Ok(WalFecScanResult {
2351 groups,
2352 truncated_tail,
2353 })
2354}
2355
2356pub fn find_wal_fec_group(
2358 sidecar_path: &Path,
2359 group_id: WalFecGroupId,
2360) -> Result<Option<WalFecGroupRecord>> {
2361 let scan = scan_wal_fec(sidecar_path)?;
2362 Ok(scan
2363 .groups
2364 .into_iter()
2365 .find(|group| group.meta.group_id() == group_id))
2366}
2367
/// Value of the `bead_id` field attached to tracing events emitted by the
/// group recovery code in this file.
const BD_1HI_11_BEAD_ID: &str = "bd-1hi.11";
2369
2370#[derive(Debug, Clone, PartialEq, Eq)]
2371struct WalFecRecoveryGroupRecord {
2372 meta: WalFecGroupMeta,
2373 repair_symbols: Vec<SymbolRecord>,
2374 corruption_observations: u32,
2375}
2376
2377#[must_use]
2379pub fn identify_damaged_commit_group(
2380 groups: &[WalFecGroupRecord],
2381 wal_salts: WalSalts,
2382 damaged_frame_no: u32,
2383) -> Option<WalFecGroupId> {
2384 groups
2385 .iter()
2386 .find(|group| {
2387 let meta = &group.meta;
2388 meta.wal_salt1 == wal_salts.salt1
2389 && meta.wal_salt2 == wal_salts.salt2
2390 && meta.start_frame_no <= damaged_frame_no
2391 && damaged_frame_no <= meta.end_frame_no
2392 })
2393 .map(|group| group.meta.group_id())
2394}
2395
2396pub fn recover_wal_fec_group_with_decoder<F>(
2405 sidecar_path: &Path,
2406 group_id: WalFecGroupId,
2407 wal_salts: WalSalts,
2408 first_checksum_mismatch_frame_no: u32,
2409 wal_frames: &[WalFrameCandidate],
2410 mut decode: F,
2411) -> Result<WalFecRecoveryOutcome>
2412where
2413 F: FnMut(&WalFecGroupMeta, &[(u32, Vec<u8>)]) -> Result<Vec<Vec<u8>>>,
2414{
2415 let groups = match scan_wal_fec_for_recovery(sidecar_path) {
2416 Ok(groups) => groups,
2417 Err(err) => {
2418 warn!(
2419 bead_id = BD_1HI_11_BEAD_ID,
2420 group_id = %group_id,
2421 sidecar = %sidecar_path.display(),
2422 error = %err,
2423 "wal-fec sidecar unreadable; falling back to sqlite-compatible truncation"
2424 );
2425 return Ok(truncate_outcome(
2426 group_id,
2427 first_checksum_mismatch_frame_no,
2428 WalFecRecoveryFallbackReason::SidecarUnreadable,
2429 RecoveryProofStats::new(0),
2430 ));
2431 }
2432 };
2433
2434 let Some(group) = groups
2435 .into_iter()
2436 .find(|group| group.meta.group_id() == group_id)
2437 else {
2438 warn!(
2439 bead_id = BD_1HI_11_BEAD_ID,
2440 group_id = %group_id,
2441 sidecar = %sidecar_path.display(),
2442 "wal-fec group metadata missing; falling back to sqlite-compatible truncation"
2443 );
2444 return Ok(truncate_outcome(
2445 group_id,
2446 first_checksum_mismatch_frame_no,
2447 WalFecRecoveryFallbackReason::MissingSidecarGroup,
2448 RecoveryProofStats::new(0),
2449 ));
2450 };
2451
2452 if group.meta.verify_salt_binding(wal_salts).is_err() {
2453 warn!(
2454 bead_id = BD_1HI_11_BEAD_ID,
2455 group_id = %group_id,
2456 "wal-fec group salt mismatch; rejecting sidecar group and truncating"
2457 );
2458 return Ok(truncate_outcome(
2459 group_id,
2460 group.meta.start_frame_no,
2461 WalFecRecoveryFallbackReason::SaltMismatch,
2462 RecoveryProofStats::new(group.meta.k_source),
2463 ));
2464 }
2465
2466 recover_wal_fec_group_record_with_decoder(
2467 &group,
2468 first_checksum_mismatch_frame_no,
2469 wal_frames,
2470 &mut decode,
2471 )
2472}
2473
2474pub fn recover_wal_fec_group_with_config<F>(
2483 sidecar_path: &Path,
2484 group_id: WalFecGroupId,
2485 wal_salts: WalSalts,
2486 first_checksum_mismatch_frame_no: u32,
2487 wal_frames: &[WalFrameCandidate],
2488 config: &WalFecRecoveryConfig,
2489 decode: F,
2490) -> Result<(WalFecRecoveryOutcome, WalFecRecoveryLog)>
2491where
2492 F: FnMut(&WalFecGroupMeta, &[(u32, Vec<u8>)]) -> Result<Vec<Vec<u8>>>,
2493{
2494 let span = tracing::span!(
2495 tracing::Level::WARN,
2496 "wal_raptorq",
2497 segment_id = group_id.end_frame_no,
2498 corruption_detected = tracing::field::Empty,
2499 symbols_used_for_repair = tracing::field::Empty,
2500 repair_success = tracing::field::Empty,
2501 repair_duration_us = tracing::field::Empty,
2502 );
2503 let _guard = span.enter();
2504
2505 if !config.recovery_enabled {
2506 info!(
2507 bead_id = BD_1W6K_25_BEAD_ID,
2508 group_id = %group_id,
2509 "wal-fec recovery disabled; falling back to sqlite-compatible truncation"
2510 );
2511 span.record("corruption_detected", false);
2512 span.record("symbols_used_for_repair", 0_u32);
2513 span.record("repair_success", false);
2514 span.record("repair_duration_us", 0_u64);
2515 let outcome = truncate_outcome(
2516 group_id,
2517 first_checksum_mismatch_frame_no,
2518 WalFecRecoveryFallbackReason::RecoveryDisabled,
2519 RecoveryProofStats::new(0),
2520 );
2521 let log = WalFecRecoveryLog {
2522 group_id,
2523 recovery_enabled: false,
2524 outcome_is_recovered: false,
2525 fallback_reason: Some(WalFecRecoveryFallbackReason::RecoveryDisabled),
2526 validated_source_symbols: 0,
2527 validated_repair_symbols: 0,
2528 required_symbols: 0,
2529 available_symbols: 0,
2530 recovered_frame_nos: Vec::new(),
2531 corruption_observations: 0,
2532 decode_attempted: false,
2533 decode_succeeded: false,
2534 };
2535 record_raptorq_recovery_log(&log, Duration::ZERO);
2536 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_repair(false, 0);
2537 return Ok((outcome, log));
2538 }
2539
2540 let started = Instant::now();
2541 let outcome = recover_wal_fec_group_with_decoder(
2542 sidecar_path,
2543 group_id,
2544 wal_salts,
2545 first_checksum_mismatch_frame_no,
2546 wal_frames,
2547 decode,
2548 )?;
2549
2550 let log = recovery_log_from_outcome(group_id, &outcome, true);
2551 let elapsed = started.elapsed();
2552 let duration_us = crate::metrics::duration_us_saturating(elapsed);
2553 let repair_attempt = repair_attempt_for_log(&log);
2554 let reason_code = recovery_reason_code_for_log(&log);
2555 let outcome_code = recovery_outcome_code(&log);
2556 let symbol_state = symbol_state_for_log(&log);
2557
2558 span.record("corruption_detected", log.corruption_observations > 0);
2559 span.record(
2560 "symbols_used_for_repair",
2561 log.available_symbols.min(log.required_symbols),
2562 );
2563 span.record("repair_success", log.outcome_is_recovered);
2564 span.record("repair_duration_us", duration_us);
2565 info!(
2566 bead_id = BD_1W6K_25_BEAD_ID,
2567 group_id = %group_id,
2568 repair_attempt,
2569 symbol_state = %symbol_state,
2570 reason_code,
2571 outcome = outcome_code,
2572 "wal-fec recovery decision"
2573 );
2574
2575 record_raptorq_recovery_log(&log, elapsed);
2576 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS
2577 .record_repair(log.outcome_is_recovered, duration_us);
2578 Ok((outcome, log))
2579}
2580
2581#[must_use]
2583pub fn recovery_log_from_outcome(
2584 group_id: WalFecGroupId,
2585 outcome: &WalFecRecoveryOutcome,
2586 recovery_enabled: bool,
2587) -> WalFecRecoveryLog {
2588 match outcome {
2589 WalFecRecoveryOutcome::Recovered(group) => WalFecRecoveryLog {
2590 group_id,
2591 recovery_enabled,
2592 outcome_is_recovered: true,
2593 fallback_reason: None,
2594 validated_source_symbols: group.decode_proof.validated_source_symbols,
2595 validated_repair_symbols: group.decode_proof.validated_repair_symbols,
2596 required_symbols: group.decode_proof.required_symbols,
2597 available_symbols: group.decode_proof.available_symbols,
2598 recovered_frame_nos: group.decode_proof.recovered_frame_nos.clone(),
2599 corruption_observations: group.decode_proof.corruption_observations,
2600 decode_attempted: group.decode_proof.decode_attempted,
2601 decode_succeeded: group.decode_proof.decode_succeeded,
2602 },
2603 WalFecRecoveryOutcome::TruncateBeforeGroup { decode_proof, .. } => WalFecRecoveryLog {
2604 group_id,
2605 recovery_enabled,
2606 outcome_is_recovered: false,
2607 fallback_reason: decode_proof.fallback_reason,
2608 validated_source_symbols: decode_proof.validated_source_symbols,
2609 validated_repair_symbols: decode_proof.validated_repair_symbols,
2610 required_symbols: decode_proof.required_symbols,
2611 available_symbols: decode_proof.available_symbols,
2612 recovered_frame_nos: decode_proof.recovered_frame_nos.clone(),
2613 corruption_observations: decode_proof.corruption_observations,
2614 decode_attempted: decode_proof.decode_attempted,
2615 decode_succeeded: decode_proof.decode_succeeded,
2616 },
2617 }
2618}
2619
/// Value of the `bead_id` field attached to tracing events emitted by
/// `recover_wal_fec_group_with_config`.
const BD_1W6K_25_BEAD_ID: &str = "bd-1w6k.2.5";
2621
2622fn recover_wal_fec_group_record_with_decoder<F>(
2623 group: &WalFecRecoveryGroupRecord,
2624 first_checksum_mismatch_frame_no: u32,
2625 wal_frames: &[WalFrameCandidate],
2626 decode: &mut F,
2627) -> Result<WalFecRecoveryOutcome>
2628where
2629 F: FnMut(&WalFecGroupMeta, &[(u32, Vec<u8>)]) -> Result<Vec<Vec<u8>>>,
2630{
2631 let meta = &group.meta;
2632 let group_id = meta.group_id();
2633 let k_required = usize::try_from(meta.k_source).map_err(|_| FrankenError::WalCorrupt {
2634 detail: format!("k_source {} does not fit in usize", meta.k_source),
2635 })?;
2636 let page_len = usize::try_from(meta.page_size).map_err(|_| FrankenError::WalCorrupt {
2637 detail: format!("page_size {} does not fit in usize", meta.page_size),
2638 })?;
2639
2640 let frame_payload_by_no = build_frame_payload_map(wal_frames);
2641 let mut source_collection = collect_valid_source_symbols(
2642 meta,
2643 group_id,
2644 first_checksum_mismatch_frame_no,
2645 page_len,
2646 &frame_payload_by_no,
2647 k_required,
2648 )?;
2649 let (repair_symbols, validated_repair_symbols, rejected_repair_symbols) =
2650 collect_valid_repair_symbols(meta, group_id, &group.repair_symbols);
2651 source_collection.available_symbols.extend(repair_symbols);
2652 source_collection
2653 .available_symbols
2654 .sort_unstable_by_key(|(esi, _)| *esi);
2655
2656 let mut stats = build_recovery_proof_stats(
2657 meta.k_source,
2658 &source_collection,
2659 validated_repair_symbols,
2660 rejected_repair_symbols,
2661 group.corruption_observations,
2662 );
2663
2664 if source_collection.available_symbols.len() < k_required {
2665 error!(
2666 bead_id = BD_1HI_11_BEAD_ID,
2667 group_id = %group_id,
2668 required_symbols = meta.k_source,
2669 available_symbols = stats.available_symbols,
2670 "insufficient symbols for wal-fec decode; truncating before group"
2671 );
2672 return Ok(insufficient_symbols_outcome(meta, group_id, stats));
2673 }
2674
2675 if stats.validated_source_symbols == meta.k_source {
2676 stats.decode_succeeded = true;
2677 info!(
2678 bead_id = BD_1HI_11_BEAD_ID,
2679 group_id = %group_id,
2680 validated_source_symbols = stats.validated_source_symbols,
2681 "wal-fec recovery fast path: group fully intact"
2682 );
2683 return Ok(fast_path_outcome(
2684 meta,
2685 group_id,
2686 source_collection.source_pages,
2687 stats,
2688 ));
2689 }
2690
2691 stats.decode_attempted = true;
2692 let decoded_pages = match decode(meta, &source_collection.available_symbols) {
2693 Ok(decoded) => decoded,
2694 Err(err) => {
2695 error!(
2696 bead_id = BD_1HI_11_BEAD_ID,
2697 group_id = %group_id,
2698 error = %err,
2699 "wal-fec decode failed; truncating before group"
2700 );
2701 return Ok(decode_failed_outcome(meta, group_id, stats));
2702 }
2703 };
2704
2705 if !decoded_pages_match_expected(meta, &decoded_pages, page_len) {
2706 error!(
2707 bead_id = BD_1HI_11_BEAD_ID,
2708 group_id = %group_id,
2709 "decoded payload failed structural/hash verification; truncating before group"
2710 );
2711 return Ok(decoded_mismatch_outcome(meta, group_id, stats));
2712 }
2713
2714 Ok(finalize_decoded_success_outcome(
2715 meta,
2716 group_id,
2717 &source_collection.source_pages,
2718 decoded_pages,
2719 stats,
2720 ))
2721}
2722
2723fn finalize_decoded_success_outcome(
2724 meta: &WalFecGroupMeta,
2725 group_id: WalFecGroupId,
2726 source_pages: &[Option<Vec<u8>>],
2727 decoded_pages: Vec<Vec<u8>>,
2728 mut stats: RecoveryProofStats,
2729) -> WalFecRecoveryOutcome {
2730 let recovered_frame_nos = recovered_frame_numbers(meta, source_pages);
2731 let recovered_count = usize_to_u32(recovered_frame_nos.len());
2732 if recovered_count >= meta.r_repair.saturating_sub(1) {
2733 warn!(
2734 bead_id = BD_1HI_11_BEAD_ID,
2735 group_id = %group_id,
2736 recovered_frames = recovered_count,
2737 repair_capacity = meta.r_repair,
2738 "wal-fec recovery near repair-capacity limit"
2739 );
2740 }
2741 info!(
2742 bead_id = BD_1HI_11_BEAD_ID,
2743 group_id = %group_id,
2744 recovered_frames = recovered_count,
2745 db_size_pages = meta.db_size_pages,
2746 "wal-fec recovery succeeded"
2747 );
2748 stats.decode_succeeded = true;
2749 stats.recovered_frame_nos.clone_from(&recovered_frame_nos);
2750 decoded_success_outcome(meta, group_id, decoded_pages, recovered_frame_nos, stats)
2751}
2752
2753fn insufficient_symbols_outcome(
2754 meta: &WalFecGroupMeta,
2755 group_id: WalFecGroupId,
2756 stats: RecoveryProofStats,
2757) -> WalFecRecoveryOutcome {
2758 truncate_outcome(
2759 group_id,
2760 meta.start_frame_no,
2761 WalFecRecoveryFallbackReason::InsufficientSymbols,
2762 stats,
2763 )
2764}
2765
2766fn fast_path_outcome(
2767 meta: &WalFecGroupMeta,
2768 group_id: WalFecGroupId,
2769 source_pages: Vec<Option<Vec<u8>>>,
2770 stats: RecoveryProofStats,
2771) -> WalFecRecoveryOutcome {
2772 let recovered_pages = source_pages
2773 .into_iter()
2774 .map(|page| page.expect("source_pages are complete when all sources validated"))
2775 .collect::<Vec<_>>();
2776 WalFecRecoveryOutcome::Recovered(WalFecRecoveredGroup {
2777 meta: meta.clone(),
2778 recovered_pages,
2779 recovered_frame_nos: Vec::new(),
2780 db_size_pages: meta.db_size_pages,
2781 decode_proof: build_decode_proof(group_id, stats, None),
2782 })
2783}
2784
2785fn decode_failed_outcome(
2786 meta: &WalFecGroupMeta,
2787 group_id: WalFecGroupId,
2788 stats: RecoveryProofStats,
2789) -> WalFecRecoveryOutcome {
2790 truncate_outcome(
2791 group_id,
2792 meta.start_frame_no,
2793 WalFecRecoveryFallbackReason::DecodeFailed,
2794 stats,
2795 )
2796}
2797
2798fn decoded_mismatch_outcome(
2799 meta: &WalFecGroupMeta,
2800 group_id: WalFecGroupId,
2801 stats: RecoveryProofStats,
2802) -> WalFecRecoveryOutcome {
2803 truncate_outcome(
2804 group_id,
2805 meta.start_frame_no,
2806 WalFecRecoveryFallbackReason::DecodedPayloadMismatch,
2807 stats,
2808 )
2809}
2810
2811fn decoded_success_outcome(
2812 meta: &WalFecGroupMeta,
2813 group_id: WalFecGroupId,
2814 decoded_pages: Vec<Vec<u8>>,
2815 recovered_frame_nos: Vec<u32>,
2816 stats: RecoveryProofStats,
2817) -> WalFecRecoveryOutcome {
2818 WalFecRecoveryOutcome::Recovered(WalFecRecoveredGroup {
2819 meta: meta.clone(),
2820 recovered_pages: decoded_pages,
2821 recovered_frame_nos,
2822 db_size_pages: meta.db_size_pages,
2823 decode_proof: build_decode_proof(group_id, stats, None),
2824 })
2825}
2826
/// Counters and flags gathered during one recovery attempt; consumed by
/// `build_decode_proof` to fill in the decode proof.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
struct RecoveryProofStats {
    /// Number of symbols needed to decode the group.
    required_symbols: u32,
    /// Number of symbols handed to the decoder (validated source + repair).
    available_symbols: u32,
    /// Source symbols that passed validation.
    validated_source_symbols: u32,
    /// Repair symbols that passed validation.
    validated_repair_symbols: u32,
    /// Count of corruption observations made while loading and validating symbols.
    corruption_observations: u32,
    /// Whether the decoder was invoked.
    decode_attempted: bool,
    /// Whether recovery produced a usable result.
    decode_succeeded: bool,
    /// Frame numbers that had to be reconstructed.
    recovered_frame_nos: Vec<u32>,
}

impl RecoveryProofStats {
    /// Creates stats with `required_symbols` set and every other counter
    /// zeroed, both flags `false`, and no recovered frames.
    const fn new(required_symbols: u32) -> Self {
        Self {
            required_symbols,
            // Counters start at zero.
            available_symbols: 0,
            validated_source_symbols: 0,
            validated_repair_symbols: 0,
            corruption_observations: 0,
            // Nothing attempted yet.
            decode_attempted: false,
            decode_succeeded: false,
            recovered_frame_nos: Vec::new(),
        }
    }
}
2853
2854fn build_recovery_proof_stats(
2855 required_symbols: u32,
2856 source_collection: &SourceSymbolCollection,
2857 validated_repair_symbols: u32,
2858 rejected_repair_symbols: u32,
2859 sidecar_corruption_observations: u32,
2860) -> RecoveryProofStats {
2861 let mut stats = RecoveryProofStats::new(required_symbols);
2862 stats.available_symbols = usize_to_u32(source_collection.available_symbols.len());
2863 stats.validated_source_symbols = source_collection.validated_source_symbols;
2864 stats.validated_repair_symbols = validated_repair_symbols;
2865 stats.corruption_observations =
2866 sidecar_corruption_observations.saturating_add(rejected_repair_symbols);
2867 stats
2868}
2869
/// Result of validating a group's source frames against the WAL candidates.
#[derive(Clone, Debug, Eq, PartialEq)]
struct SourceSymbolCollection {
    /// `(esi, payload)` pairs to feed to the decoder.
    available_symbols: Vec<(u32, Vec<u8>)>,
    /// One slot per source symbol; `None` where the frame was missing or rejected.
    source_pages: Vec<Option<Vec<u8>>>,
    /// Number of source frames that were accepted.
    validated_source_symbols: u32,
}
2876
2877fn build_frame_payload_map(wal_frames: &[WalFrameCandidate]) -> BTreeMap<u32, &[u8]> {
2878 let mut frame_payload_by_no = BTreeMap::new();
2879 for frame in wal_frames {
2880 frame_payload_by_no
2881 .entry(frame.frame_no)
2882 .or_insert(frame.page_data.as_slice());
2883 }
2884 frame_payload_by_no
2885}
2886
2887fn collect_valid_source_symbols(
2888 meta: &WalFecGroupMeta,
2889 group_id: WalFecGroupId,
2890 first_checksum_mismatch_frame_no: u32,
2891 page_len: usize,
2892 frame_payload_by_no: &BTreeMap<u32, &[u8]>,
2893 k_required: usize,
2894) -> Result<SourceSymbolCollection> {
2895 let mut available_symbols: Vec<(u32, Vec<u8>)> = Vec::new();
2896 let mut source_pages = vec![None; k_required];
2897 let mut validated_source_symbols = 0_u32;
2898
2899 for source_esi in 0..meta.k_source {
2900 let index = usize::try_from(source_esi).map_err(|_| FrankenError::WalCorrupt {
2901 detail: format!("source ESI {source_esi} does not fit in usize"),
2902 })?;
2903 let frame_no = meta.start_frame_no.checked_add(source_esi).ok_or_else(|| {
2904 FrankenError::WalCorrupt {
2905 detail: "frame number overflow while collecting source symbols".to_owned(),
2906 }
2907 })?;
2908 let Some(payload) = frame_payload_by_no.get(&frame_no).copied() else {
2909 debug!(
2910 bead_id = BD_1HI_11_BEAD_ID,
2911 group_id = %group_id,
2912 frame_no,
2913 "source frame missing from wal candidates"
2914 );
2915 continue;
2916 };
2917 if payload.len() != page_len {
2918 warn!(
2919 bead_id = BD_1HI_11_BEAD_ID,
2920 group_id = %group_id,
2921 frame_no,
2922 payload_len = payload.len(),
2923 expected_len = page_len,
2924 "source frame payload length mismatch; excluding from decoder input"
2925 );
2926 continue;
2927 }
2928 if frame_no >= first_checksum_mismatch_frame_no
2929 && !verify_wal_fec_source_hash(payload, meta.source_page_xxh3_128[index])
2930 {
2931 warn!(
2932 bead_id = BD_1HI_11_BEAD_ID,
2933 group_id = %group_id,
2934 frame_no,
2935 esi = source_esi,
2936 "source frame hash mismatch at/after wal chain break; excluding from decoder input"
2937 );
2938 continue;
2939 }
2940 if frame_no >= first_checksum_mismatch_frame_no {
2941 debug!(
2942 bead_id = BD_1HI_11_BEAD_ID,
2943 group_id = %group_id,
2944 frame_no,
2945 esi = source_esi,
2946 "source frame validated via independent xxh3 hash"
2947 );
2948 }
2949 let payload_vec = payload.to_vec();
2950 source_pages[index] = Some(payload_vec.clone());
2951 available_symbols.push((source_esi, payload_vec));
2952 validated_source_symbols = validated_source_symbols.saturating_add(1);
2953 }
2954
2955 Ok(SourceSymbolCollection {
2956 available_symbols,
2957 source_pages,
2958 validated_source_symbols,
2959 })
2960}
2961
2962fn collect_valid_repair_symbols(
2963 meta: &WalFecGroupMeta,
2964 group_id: WalFecGroupId,
2965 repair_symbols: &[SymbolRecord],
2966) -> (Vec<(u32, Vec<u8>)>, u32, u32) {
2967 let mut validated_symbols = Vec::new();
2968 let mut validated_repair_symbols = 0_u32;
2969 let mut rejected_repair_symbols = 0_u32;
2970
2971 for symbol in repair_symbols {
2972 if !repair_symbol_matches_meta(meta, symbol) {
2973 warn!(
2974 bead_id = BD_1HI_11_BEAD_ID,
2975 group_id = %group_id,
2976 esi = symbol.esi,
2977 "repair symbol failed metadata binding checks; excluding from decoder input"
2978 );
2979 rejected_repair_symbols = rejected_repair_symbols.saturating_add(1);
2980 continue;
2981 }
2982 validated_repair_symbols = validated_repair_symbols.saturating_add(1);
2983 validated_symbols.push((symbol.esi, symbol.symbol_data.clone()));
2984 }
2985
2986 (
2987 validated_symbols,
2988 validated_repair_symbols,
2989 rejected_repair_symbols,
2990 )
2991}
2992
2993fn recovered_frame_numbers(meta: &WalFecGroupMeta, source_pages: &[Option<Vec<u8>>]) -> Vec<u32> {
2994 source_pages
2995 .iter()
2996 .enumerate()
2997 .filter_map(|(index, page)| {
2998 if page.is_some() {
2999 None
3000 } else {
3001 let idx = u32::try_from(index).ok()?;
3002 meta.start_frame_no.checked_add(idx)
3003 }
3004 })
3005 .collect()
3006}
3007
3008fn decoded_pages_match_expected(
3009 meta: &WalFecGroupMeta,
3010 decoded_pages: &[Vec<u8>],
3011 page_len: usize,
3012) -> bool {
3013 let Ok(k_required) = usize::try_from(meta.k_source) else {
3014 return false;
3015 };
3016 if decoded_pages.len() != k_required {
3017 return false;
3018 }
3019 decoded_pages.iter().enumerate().all(|(index, payload)| {
3020 payload.len() == page_len
3021 && verify_wal_fec_source_hash(payload, meta.source_page_xxh3_128[index])
3022 })
3023}
3024
3025fn repair_symbol_matches_meta(meta: &WalFecGroupMeta, symbol: &SymbolRecord) -> bool {
3026 if symbol.object_id != meta.object_id || symbol.oti != meta.oti {
3027 return false;
3028 }
3029 let repair_start = meta.k_source;
3030 let Some(repair_end) = meta.k_source.checked_add(meta.r_repair) else {
3031 return false;
3032 };
3033 symbol.esi >= repair_start && symbol.esi < repair_end
3034}
3035
3036fn scan_wal_fec_for_recovery(sidecar_path: &Path) -> Result<Vec<WalFecRecoveryGroupRecord>> {
3037 if !sidecar_path.exists() {
3038 return Ok(Vec::new());
3039 }
3040
3041 let bytes = fs::read(sidecar_path)?;
3042 let mut cursor = scan_offset_after_optional_pragma_header(&bytes)?;
3043 let mut groups = Vec::new();
3044
3045 while cursor < bytes.len() {
3046 let Some(meta_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
3047 break;
3048 };
3049 let meta = WalFecGroupMeta::from_record_bytes(meta_bytes)?;
3050 let mut repair_symbols = Vec::new();
3051 let mut corruption_observations = 0_u32;
3052
3053 let mut truncated_tail = false;
3054 for _ in 0..meta.r_repair {
3055 let Some(symbol_bytes) = read_length_prefixed(&bytes, &mut cursor)? else {
3056 truncated_tail = true;
3057 break;
3058 };
3059 match SymbolRecord::from_bytes(symbol_bytes) {
3060 Ok(symbol) => repair_symbols.push(symbol),
3061 Err(err) => {
3062 warn!(
3063 bead_id = BD_1HI_11_BEAD_ID,
3064 group_id = %meta.group_id(),
3065 error = %err,
3066 "invalid wal-fec repair SymbolRecord excluded from recovery set"
3067 );
3068 corruption_observations = corruption_observations.saturating_add(1);
3069 }
3070 }
3071 }
3072 if truncated_tail {
3073 break;
3074 }
3075 groups.push(WalFecRecoveryGroupRecord {
3076 meta,
3077 repair_symbols,
3078 corruption_observations,
3079 });
3080 }
3081
3082 Ok(groups)
3083}
3084
3085fn truncate_outcome(
3086 group_id: WalFecGroupId,
3087 truncate_before_frame_no: u32,
3088 fallback_reason: WalFecRecoveryFallbackReason,
3089 stats: RecoveryProofStats,
3090) -> WalFecRecoveryOutcome {
3091 WalFecRecoveryOutcome::TruncateBeforeGroup {
3092 truncate_before_frame_no,
3093 decode_proof: build_decode_proof(group_id, stats, Some(fallback_reason)),
3094 }
3095}
3096
3097fn build_decode_proof(
3098 group_id: WalFecGroupId,
3099 stats: RecoveryProofStats,
3100 fallback_reason: Option<WalFecRecoveryFallbackReason>,
3101) -> WalFecDecodeProof {
3102 WalFecDecodeProof {
3103 group_id,
3104 required_symbols: stats.required_symbols,
3105 available_symbols: stats.available_symbols,
3106 validated_source_symbols: stats.validated_source_symbols,
3107 validated_repair_symbols: stats.validated_repair_symbols,
3108 corruption_observations: stats.corruption_observations,
3109 decode_attempted: stats.decode_attempted,
3110 decode_succeeded: stats.decode_succeeded,
3111 recovered_frame_nos: stats.recovered_frame_nos,
3112 fallback_reason,
3113 }
3114}
3115
/// Converts `value` to `u32`, clamping to `u32::MAX` when it does not fit.
fn usize_to_u32(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
3119
3120fn scan_offset_after_optional_pragma_header(bytes: &[u8]) -> Result<usize> {
3121 let Some(header) = WalFecPragmaHeader::from_prefix(bytes)? else {
3122 return Ok(0);
3123 };
3124 debug!(
3125 raptorq_repair_symbols = header.raptorq_repair_symbols,
3126 "detected wal-fec pragma header during scan"
3127 );
3128 Ok(WAL_FEC_PRAGMA_HEADER_BYTES)
3129}
3130
3131fn write_length_prefixed(file: &mut fs::File, payload: &[u8], what: &str) -> Result<()> {
3132 let len_u32 = u32::try_from(payload.len()).map_err(|_| FrankenError::WalCorrupt {
3133 detail: format!(
3134 "{what} too large for wal-fec length prefix: {}",
3135 payload.len()
3136 ),
3137 })?;
3138 file.write_all(&len_u32.to_le_bytes())?;
3139 file.write_all(payload)?;
3140 Ok(())
3141}
3142
3143fn read_length_prefixed<'a>(bytes: &'a [u8], cursor: &mut usize) -> Result<Option<&'a [u8]>> {
3144 if *cursor >= bytes.len() {
3145 return Ok(None);
3146 }
3147 if bytes.len() - *cursor < LENGTH_PREFIX_BYTES {
3148 return Ok(None);
3149 }
3150 let mut len_raw = [0u8; LENGTH_PREFIX_BYTES];
3151 len_raw.copy_from_slice(&bytes[*cursor..*cursor + LENGTH_PREFIX_BYTES]);
3152 *cursor += LENGTH_PREFIX_BYTES;
3153 let payload_len =
3154 usize::try_from(u32::from_le_bytes(len_raw)).map_err(|_| FrankenError::WalCorrupt {
3155 detail: "wal-fec length prefix does not fit in usize".to_owned(),
3156 })?;
3157 let end = cursor
3158 .checked_add(payload_len)
3159 .ok_or_else(|| FrankenError::WalCorrupt {
3160 detail: "wal-fec length prefix overflow".to_owned(),
3161 })?;
3162 if end > bytes.len() {
3163 return Ok(None);
3164 }
3165 let payload = &bytes[*cursor..end];
3166 *cursor = end;
3167 Ok(Some(payload))
3168}
3169
/// Appends `value` to `buf` as four little-endian bytes.
fn append_u32_le(buf: &mut Vec<u8>, value: u32) {
    let encoded = value.to_le_bytes();
    buf.extend_from_slice(&encoded[..]);
}
3173
/// Appends `value` to `buf` as eight little-endian bytes.
fn append_u64_le(buf: &mut Vec<u8>, value: u64) {
    let encoded = value.to_le_bytes();
    buf.extend_from_slice(&encoded[..]);
}
3177
3178fn read_u32_le(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u32> {
3179 let raw = read_array::<4>(bytes, cursor, field)?;
3180 Ok(u32::from_le_bytes(raw))
3181}
3182
3183fn read_u64_le(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u64> {
3184 let raw = read_array::<8>(bytes, cursor, field)?;
3185 Ok(u64::from_le_bytes(raw))
3186}
3187
3188fn read_array<const N: usize>(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<[u8; N]> {
3189 let end = cursor
3190 .checked_add(N)
3191 .ok_or_else(|| FrankenError::WalCorrupt {
3192 detail: format!("overflow reading wal-fec field {field}"),
3193 })?;
3194 if end > bytes.len() {
3195 return Err(FrankenError::WalCorrupt {
3196 detail: format!(
3197 "wal-fec field {field} out of bounds: need {N} bytes at offset {}, total {}",
3198 *cursor,
3199 bytes.len()
3200 ),
3201 });
3202 }
3203 let mut out = [0u8; N];
3204 out.copy_from_slice(&bytes[*cursor..end]);
3205 *cursor = end;
3206 Ok(out)
3207}
3208
3209#[cfg(test)]
3210mod tests {
3211 use super::*;
3212 use std::fs;
3213 use std::sync::{Mutex, OnceLock};
3214
3215 use tempfile::tempdir;
3216
3217 fn telemetry_test_lock() -> std::sync::MutexGuard<'static, ()> {
3218 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3219 match LOCK.get_or_init(|| Mutex::new(())).lock() {
3220 Ok(guard) => guard,
3221 Err(poisoned) => poisoned.into_inner(),
3222 }
3223 }
3224
3225 #[test]
3226 fn test_wal_fec_pragma_header_default_without_header() {
3227 let dir = tempdir().expect("tempdir");
3228 let sidecar = dir.path().join("db.wal-fec");
3229
3230 fs::write(&sidecar, b"legacy-groups-without-header").expect("write legacy bytes");
3231
3232 let value = read_wal_fec_raptorq_repair_symbols(&sidecar).expect("read default");
3233 assert_eq!(value, DEFAULT_RAPTORQ_REPAIR_SYMBOLS);
3234 }
3235
3236 #[test]
3237 fn test_wal_fec_pragma_persist_and_reload_across_reopen() {
3238 let dir = tempdir().expect("tempdir");
3239 let sidecar = dir.path().join("db.wal-fec");
3240
3241 persist_wal_fec_raptorq_repair_symbols(&sidecar, 4).expect("persist setting");
3242 let first_read = read_wal_fec_raptorq_repair_symbols(&sidecar).expect("read setting");
3243 let second_read = read_wal_fec_raptorq_repair_symbols(&sidecar).expect("re-read setting");
3244
3245 assert_eq!(first_read, 4);
3246 assert_eq!(second_read, 4);
3247 }
3248
3249 #[test]
3250 fn test_wal_fec_pragma_persist_preserves_payload_bytes() {
3251 let dir = tempdir().expect("tempdir");
3252 let sidecar = dir.path().join("db.wal-fec");
3253 let legacy_payload = vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE];
3254 fs::write(&sidecar, &legacy_payload).expect("write legacy payload");
3255
3256 persist_wal_fec_raptorq_repair_symbols(&sidecar, 9).expect("persist setting");
3257
3258 let rewritten = fs::read(&sidecar).expect("read rewritten sidecar");
3259 assert!(rewritten.len() >= WAL_FEC_PRAGMA_HEADER_BYTES);
3260 assert_eq!(
3261 &rewritten[WAL_FEC_PRAGMA_HEADER_BYTES..],
3262 legacy_payload.as_slice()
3263 );
3264 }
3265
3266 #[test]
3267 fn test_scan_wal_fec_accepts_header_only_file() {
3268 let dir = tempdir().expect("tempdir");
3269 let sidecar = dir.path().join("db.wal-fec");
3270
3271 persist_wal_fec_raptorq_repair_symbols(&sidecar, 3).expect("persist setting");
3272 let scan = scan_wal_fec(&sidecar).expect("scan header-only sidecar");
3273
3274 assert!(scan.groups.is_empty());
3275 assert!(!scan.truncated_tail);
3276 }
3277
    /// Identifier tag interpolated into the assertion messages and
    /// `eprintln!` diagnostics of the metadata tests in this module.
    const BEAD_ID_2HA1: &str = "bd-2ha1";
3281
3282 fn make_test_init(k: u32) -> WalFecGroupMetaInit {
3283 let page_size = 4096_u32;
3284 WalFecGroupMetaInit {
3285 wal_salt1: 0x1234_5678,
3286 wal_salt2: 0xABCD_EF01,
3287 start_frame_no: 1,
3288 end_frame_no: k,
3289 db_size_pages: 100,
3290 page_size,
3291 k_source: k,
3292 r_repair: 2,
3293 oti: Oti {
3294 f: u64::from(k) * u64::from(page_size),
3295 al: 0,
3296 t: page_size,
3297 z: 1,
3298 n: 1,
3299 },
3300 object_id: ObjectId::from_bytes([0xAA; 16]),
3301 page_numbers: (1..=k).collect(),
3302 source_page_xxh3_128: (0..k)
3303 .map(|i| Xxh3Checksum128 {
3304 low: u64::from(i),
3305 high: u64::from(i).wrapping_add(1),
3306 })
3307 .collect(),
3308 }
3309 }
3310
3311 #[test]
3312 fn test_meta_roundtrip() {
3313 let init = make_test_init(3);
3314 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3315 let serialized = meta.to_record_bytes();
3316 let deserialized =
3317 WalFecGroupMeta::from_record_bytes(&serialized).expect("from_record_bytes");
3318
3319 assert_eq!(meta, deserialized, "bead_id={BEAD_ID_2HA1} case=roundtrip");
3320 eprintln!(
3321 "DEBUG bead_id={BEAD_ID_2HA1} case=meta_roundtrip serialized_len={}",
3322 serialized.len()
3323 );
3324 }
3325
3326 #[test]
3327 fn test_meta_magic() {
3328 let init = make_test_init(2);
3329 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3330
3331 assert_eq!(
3332 meta.magic, WAL_FEC_GROUP_META_MAGIC,
3333 "bead_id={BEAD_ID_2HA1} case=magic expected=FSQLWFEC"
3334 );
3335 assert_eq!(
3336 &meta.magic, b"FSQLWFEC",
3337 "bead_id={BEAD_ID_2HA1} case=magic_bytes"
3338 );
3339
3340 let bytes = meta.to_record_bytes();
3342 assert_eq!(
3343 &bytes[..8],
3344 b"FSQLWFEC",
3345 "bead_id={BEAD_ID_2HA1} case=serialized_magic"
3346 );
3347 }
3348
3349 #[test]
3350 fn test_meta_invariant_k_source() {
3351 let mut init = make_test_init(3);
3352 init.k_source = 99;
3354 let result = WalFecGroupMeta::from_init(init);
3355 assert!(
3356 result.is_err(),
3357 "bead_id={BEAD_ID_2HA1} case=k_source_mismatch expected=Err"
3358 );
3359 eprintln!(
3360 "INFO bead_id={BEAD_ID_2HA1} case=invariant_k_source error={}",
3361 result.unwrap_err()
3362 );
3363 }
3364
3365 #[test]
3366 fn test_meta_invariant_page_numbers_len() {
3367 let mut init = make_test_init(3);
3368 init.page_numbers.push(999);
3370 let result = WalFecGroupMeta::from_init(init);
3371 assert!(
3372 result.is_err(),
3373 "bead_id={BEAD_ID_2HA1} case=page_numbers_len_mismatch expected=Err"
3374 );
3375 eprintln!(
3376 "WARN bead_id={BEAD_ID_2HA1} case=invariant_page_numbers_len error={}",
3377 result.unwrap_err()
3378 );
3379 }
3380
3381 #[test]
3382 fn test_meta_invariant_xxh3_len() {
3383 let mut init = make_test_init(3);
3384 init.source_page_xxh3_128.pop();
3386 let result = WalFecGroupMeta::from_init(init);
3387 assert!(
3388 result.is_err(),
3389 "bead_id={BEAD_ID_2HA1} case=xxh3_len_mismatch expected=Err"
3390 );
3391 eprintln!(
3392 "WARN bead_id={BEAD_ID_2HA1} case=invariant_xxh3_len error={}",
3393 result.unwrap_err()
3394 );
3395 }
3396
3397 #[test]
3398 fn test_meta_checksum_valid() {
3399 let init = make_test_init(4);
3400 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3401 let serialized = meta.to_record_bytes();
3402
3403 let result = WalFecGroupMeta::from_record_bytes(&serialized);
3405 assert!(result.is_ok(), "bead_id={BEAD_ID_2HA1} case=checksum_valid");
3406 eprintln!(
3407 "INFO bead_id={BEAD_ID_2HA1} case=checksum_valid checksum={:#018x}",
3408 meta.checksum
3409 );
3410 }
3411
3412 #[test]
3413 fn test_meta_checksum_corrupt() {
3414 let init = make_test_init(4);
3415 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3416 let mut serialized = meta.to_record_bytes();
3417
3418 serialized[12] ^= 0x01;
3421
3422 let result = WalFecGroupMeta::from_record_bytes(&serialized);
3423 let err = result.expect_err("bead_id={BEAD_ID_2HA1} case=checksum_corrupt expected=Err");
3424 let msg = err.to_string();
3425 assert!(
3426 msg.contains("checksum mismatch"),
3427 "bead_id={BEAD_ID_2HA1} case=checksum_corrupt expected checksum error, got: {msg}"
3428 );
3429 eprintln!("ERROR bead_id={BEAD_ID_2HA1} case=checksum_corrupt error={err}");
3430 }
3431
3432 #[test]
3433 fn test_recovery_reason_codes_are_stable() {
3434 let base = WalFecRecoveryLog {
3435 group_id: WalFecGroupId {
3436 wal_salt1: 1,
3437 wal_salt2: 2,
3438 end_frame_no: 3,
3439 },
3440 recovery_enabled: true,
3441 outcome_is_recovered: true,
3442 fallback_reason: None,
3443 validated_source_symbols: 5,
3444 validated_repair_symbols: 1,
3445 required_symbols: 6,
3446 available_symbols: 6,
3447 recovered_frame_nos: vec![2],
3448 corruption_observations: 1,
3449 decode_attempted: true,
3450 decode_succeeded: true,
3451 };
3452
3453 assert_eq!(recovery_outcome_code(&base), "recovered");
3454 assert_eq!(recovery_reason_code_for_log(&base), "decode_recovered");
3455 assert!(repair_attempt_for_log(&base));
3456
3457 let mut fast_path = base.clone();
3458 fast_path.decode_attempted = false;
3459 fast_path.corruption_observations = 0;
3460 assert_eq!(recovery_reason_code_for_log(&fast_path), "intact_fast_path");
3461
3462 let mut truncated = base.clone();
3463 truncated.outcome_is_recovered = false;
3464 truncated.fallback_reason = Some(WalFecRecoveryFallbackReason::InsufficientSymbols);
3465 assert_eq!(recovery_outcome_code(&truncated), "truncate_before_group");
3466 assert_eq!(
3467 recovery_reason_code_for_log(&truncated),
3468 WalFecRecoveryFallbackReason::InsufficientSymbols.reason_code()
3469 );
3470 }
3471
3472 #[test]
3473 fn test_symbol_state_serialization_includes_required_fields() {
3474 let log = WalFecRecoveryLog {
3475 group_id: WalFecGroupId {
3476 wal_salt1: 10,
3477 wal_salt2: 20,
3478 end_frame_no: 30,
3479 },
3480 recovery_enabled: true,
3481 outcome_is_recovered: false,
3482 fallback_reason: Some(WalFecRecoveryFallbackReason::DecodeFailed),
3483 validated_source_symbols: 2,
3484 validated_repair_symbols: 1,
3485 required_symbols: 6,
3486 available_symbols: 3,
3487 recovered_frame_nos: Vec::new(),
3488 corruption_observations: 2,
3489 decode_attempted: true,
3490 decode_succeeded: false,
3491 };
3492
3493 let symbol_state = symbol_state_for_log(&log);
3494 assert!(symbol_state.contains("source_validated=2/6"));
3495 assert!(symbol_state.contains("repair_validated=1"));
3496 assert!(symbol_state.contains("available=3"));
3497 assert!(symbol_state.contains("required=6"));
3498 assert!(symbol_state.contains("decode_attempted=true"));
3499 assert!(symbol_state.contains("decode_succeeded=false"));
3500 }
3501
3502 #[allow(clippy::cast_possible_truncation)]
3504 fn make_source_pages(k: u32, page_size: u32) -> Vec<Vec<u8>> {
3505 let ps = page_size as usize;
3506 (0..k)
3507 .map(|i| {
3508 (0..ps)
3509 .map(|j| ((i as usize * 37 + j * 13 + 7) % 256) as u8)
3510 .collect()
3511 })
3512 .collect()
3513 }
3514
3515 #[allow(clippy::cast_possible_truncation)]
3516 fn make_test_init_with_hashes(k: u32, source_pages: &[Vec<u8>]) -> WalFecGroupMetaInit {
3517 let page_size = source_pages[0].len() as u32;
3518 WalFecGroupMetaInit {
3519 wal_salt1: 0x1234_5678,
3520 wal_salt2: 0xABCD_EF01,
3521 start_frame_no: 1,
3522 end_frame_no: k,
3523 db_size_pages: 100,
3524 page_size,
3525 k_source: k,
3526 r_repair: 2,
3527 oti: Oti {
3528 f: u64::from(k) * u64::from(page_size),
3529 al: 0,
3530 t: page_size,
3531 z: 1,
3532 n: 1,
3533 },
3534 object_id: ObjectId::from_bytes([0xAA; 16]),
3535 page_numbers: (1..=k).collect(),
3536 source_page_xxh3_128: build_source_page_hashes(source_pages),
3537 }
3538 }
3539
3540 #[test]
3541 fn test_raptorq_encode_produces_valid_symbols() {
3542 let k = 4_u32;
3543 let page_size = 4096_u32;
3544 let source_pages = make_source_pages(k, page_size);
3545 let init = make_test_init_with_hashes(k, &source_pages);
3546 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3547
3548 let symbols =
3549 generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3550 .expect("encode should succeed")
3551 .expect("should not be cancelled");
3552
3553 assert_eq!(symbols.len(), 2, "expected r_repair=2 repair symbols");
3554 for (i, sym) in symbols.iter().enumerate() {
3555 assert_eq!(
3556 sym.symbol_data.len(),
3557 page_size as usize,
3558 "repair symbol {i} size"
3559 );
3560 let expected_esi = k + u32::try_from(i).expect("i fits u32");
3561 assert_eq!(sym.esi, expected_esi, "repair symbol {i} ESI");
3562 }
3563 }
3564
3565 #[test]
3566 fn test_raptorq_encode_decode_roundtrip_all_source() {
3567 let k = 4_u32;
3571 let page_size = 512_u32;
3572 let source_pages = make_source_pages(k, page_size);
3573 let mut init = make_test_init_with_hashes(k, &source_pages);
3574 init.r_repair = 8;
3575 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3576
3577 let repair_symbols =
3578 generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3579 .expect("encode")
3580 .expect("not cancelled");
3581
3582 let mut all_symbols: Vec<(u32, Vec<u8>)> = source_pages
3584 .iter()
3585 .enumerate()
3586 .map(|(i, page)| (u32::try_from(i).expect("i fits u32"), page.clone()))
3587 .collect();
3588 for sym in &repair_symbols {
3589 all_symbols.push((sym.esi, sym.symbol_data.clone()));
3590 }
3591
3592 let decoded = wal_fec_raptorq_decode(&meta, &all_symbols)
3593 .expect("decode with all symbols should succeed");
3594
3595 for (i, original) in source_pages.iter().enumerate() {
3596 assert_eq!(&decoded[i], original, "decoded source page {i} mismatch");
3597 }
3598 }
3599
3600 #[test]
3601 fn test_raptorq_encode_decode_roundtrip_with_corruption() {
3602 let k = 4_u32;
3605 let page_size = 512_u32;
3606 let r_repair = 8_u32; let source_pages = make_source_pages(k, page_size);
3608 let mut init = make_test_init_with_hashes(k, &source_pages);
3609 init.r_repair = r_repair;
3610 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3611
3612 let repair_symbols =
3613 generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3614 .expect("encode")
3615 .expect("not cancelled");
3616
3617 let mut available_symbols: Vec<(u32, Vec<u8>)> = Vec::new();
3619 for (i, page) in source_pages.iter().enumerate() {
3620 if i != 1 {
3621 available_symbols.push((u32::try_from(i).expect("i fits u32"), page.clone()));
3622 }
3623 }
3624 for sym in &repair_symbols {
3625 available_symbols.push((sym.esi, sym.symbol_data.clone()));
3626 }
3627
3628 let decoded = wal_fec_raptorq_decode(&meta, &available_symbols)
3629 .expect("decode should recover missing page");
3630
3631 for (i, original) in source_pages.iter().enumerate() {
3632 assert_eq!(
3633 &decoded[i], original,
3634 "decoded source page {i} mismatch (page 1 was lost)"
3635 );
3636 }
3637 }
3638
3639 #[test]
3640 fn test_raptorq_encode_deterministic() {
3641 let k = 3_u32;
3643 let page_size = 512_u32;
3644 let source_pages = make_source_pages(k, page_size);
3645 let init = make_test_init_with_hashes(k, &source_pages);
3646 let meta = WalFecGroupMeta::from_init(init).expect("from_init");
3647
3648 let symbols1 =
3649 generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3650 .expect("encode 1")
3651 .expect("not cancelled");
3652
3653 let symbols2 =
3654 generate_wal_fec_repair_symbols_inner(&meta, &source_pages, None, None, Duration::ZERO)
3655 .expect("encode 2")
3656 .expect("not cancelled");
3657
3658 assert_eq!(symbols1.len(), symbols2.len());
3659 for (i, (s1, s2)) in symbols1.iter().zip(symbols2.iter()).enumerate() {
3660 assert_eq!(
3661 s1.symbol_data, s2.symbol_data,
3662 "repair symbol {i} not deterministic"
3663 );
3664 }
3665 }
3666
    /// Records a single successful recovery log and checks both the
    /// aggregated metrics snapshot and the one resulting repair event.
    #[test]
    fn test_raptorq_telemetry_records_metrics_and_events() {
        // Hold the shared test lock for the whole test so that the
        // reset -> record -> snapshot sequence is not interleaved with other
        // tests taking the same lock.
        let _guard = telemetry_test_lock();
        reset_raptorq_repair_telemetry();

        let group_id = WalFecGroupId {
            wal_salt1: 0xAA11_BB22,
            wal_salt2: 0xCC33_DD44,
            end_frame_no: 42,
        };
        let log = WalFecRecoveryLog {
            group_id,
            recovery_enabled: true,
            outcome_is_recovered: true,
            fallback_reason: None,
            validated_source_symbols: 4,
            validated_repair_symbols: 2,
            required_symbols: 6,
            available_symbols: 6,
            recovered_frame_nos: vec![40, 41],
            corruption_observations: 1,
            decode_attempted: true,
            decode_succeeded: true,
        };
        record_raptorq_recovery_log(&log, Duration::from_micros(75));

        // Aggregated counters after exactly one successful repair.
        // NOTE(review): the expected health score of 81 is a pinned value; its
        // derivation is not visible from this test — confirm against the
        // scoring implementation before changing it.
        let metrics = raptorq_repair_metrics_snapshot();
        assert_eq!(metrics.repairs_total, 1);
        assert_eq!(metrics.repairs_failed, 0);
        assert_eq!(metrics.symbols_reclaimed, 2);
        assert_eq!(metrics.budget_utilization_pct, 100);
        assert_eq!(metrics.severity_histogram.two_to_five, 1);
        assert_eq!(metrics.wal_health_score, 81);

        // The single event mirrors the log: frame_id equals the group's
        // end_frame_no and symbols_used equals available_symbols.
        let events = raptorq_repair_events_snapshot(10);
        assert_eq!(events.len(), 1);
        let event = &events[0];
        assert_eq!(event.frame_id, 42);
        assert_eq!(event.symbols_lost, 2);
        assert_eq!(event.symbols_used, 6);
        assert!(event.repair_success);
        assert_eq!(event.severity_bucket, WalFecRepairSeverityBucket::TwoToFive);
        assert_eq!(event.budget_utilization_pct, 100);
    }
3711
    /// Records one successful and two failed recovery logs and checks that
    /// the failure counters and the health score reflect the failures.
    #[test]
    fn test_raptorq_telemetry_health_penalizes_failures() {
        // Hold the shared test lock so the telemetry reset and the recordings
        // below are not interleaved with other tests taking the same lock.
        let _guard = telemetry_test_lock();
        reset_raptorq_repair_telemetry();

        let success_group = WalFecGroupId {
            wal_salt1: 1,
            wal_salt2: 2,
            end_frame_no: 10,
        };
        let success = WalFecRecoveryLog {
            group_id: success_group,
            recovery_enabled: true,
            outcome_is_recovered: true,
            fallback_reason: None,
            validated_source_symbols: 5,
            validated_repair_symbols: 1,
            required_symbols: 6,
            available_symbols: 6,
            recovered_frame_nos: vec![9],
            corruption_observations: 0,
            decode_attempted: true,
            decode_succeeded: true,
        };
        record_raptorq_recovery_log(&success, Duration::from_micros(50));

        let failed_group = WalFecGroupId {
            wal_salt1: 3,
            wal_salt2: 4,
            end_frame_no: 20,
        };
        let failed = WalFecRecoveryLog {
            group_id: failed_group,
            recovery_enabled: true,
            outcome_is_recovered: false,
            fallback_reason: Some(WalFecRecoveryFallbackReason::InsufficientSymbols),
            validated_source_symbols: 2,
            validated_repair_symbols: 2,
            required_symbols: 6,
            available_symbols: 4,
            recovered_frame_nos: Vec::new(),
            corruption_observations: 2,
            decode_attempted: true,
            decode_succeeded: false,
        };
        // The same failed log is recorded twice, with different latencies.
        record_raptorq_recovery_log(&failed, Duration::from_micros(90));
        record_raptorq_recovery_log(&failed, Duration::from_micros(120));

        // Three recordings in total, two of them failures.
        // NOTE(review): the histogram split (one: 1, two_to_five: 2) matches
        // the logs' validated_repair_symbols of 1 and 2 — presumably that
        // field drives the bucket; confirm against the implementation.
        let metrics = raptorq_repair_metrics_snapshot();
        assert_eq!(metrics.repairs_total, 3);
        assert_eq!(metrics.repairs_failed, 2);
        assert!(metrics.wal_health_score < 100);
        assert_eq!(metrics.severity_histogram.one, 1);
        assert_eq!(metrics.severity_histogram.two_to_five, 2);
    }
3767
    /// Records one log per severity bucket and checks that each recording
    /// lands in the expected bucket, both per event and in the histogram.
    #[test]
    fn test_raptorq_telemetry_histogram_buckets() {
        // Hold the shared test lock so the telemetry reset and the recordings
        // below are not interleaved with other tests taking the same lock.
        let _guard = telemetry_test_lock();
        reset_raptorq_repair_telemetry();

        // (loss, expected bucket) pairs: one sample per bucket.
        let samples = [
            (1_u32, WalFecRepairSeverityBucket::One),
            (3_u32, WalFecRepairSeverityBucket::TwoToFive),
            (8_u32, WalFecRepairSeverityBucket::SixToTen),
            (12_u32, WalFecRepairSeverityBucket::ElevenPlus),
        ];

        for (idx, (loss, expected_bucket)) in samples.iter().enumerate() {
            // Give every sample a distinct group id derived from its index.
            let group_id = WalFecGroupId {
                wal_salt1: 10 + u32::try_from(idx).expect("small index"),
                wal_salt2: 20 + u32::try_from(idx).expect("small index"),
                end_frame_no: 30 + u32::try_from(idx).expect("small index"),
            };
            // `loss` is fed in as the number of validated repair symbols and
            // subtracted from the 20 required symbols for the source count.
            let log = WalFecRecoveryLog {
                group_id,
                recovery_enabled: true,
                outcome_is_recovered: true,
                fallback_reason: None,
                validated_source_symbols: 20_u32.saturating_sub(*loss),
                validated_repair_symbols: *loss,
                required_symbols: 20,
                available_symbols: 20,
                recovered_frame_nos: vec![group_id.end_frame_no],
                corruption_observations: 0,
                decode_attempted: true,
                decode_succeeded: true,
            };
            record_raptorq_recovery_log(&log, Duration::from_micros(30));

            // Inspect the event just recorded (snapshot limited to one entry).
            let events = raptorq_repair_events_snapshot(1);
            let event = events
                .last()
                .expect("latest event must be present after recording");
            assert_eq!(event.severity_bucket, *expected_bucket);
        }

        // Each bucket must have been bumped exactly once.
        let metrics = raptorq_repair_metrics_snapshot();
        assert_eq!(metrics.severity_histogram.one, 1);
        assert_eq!(metrics.severity_histogram.two_to_five, 1);
        assert_eq!(metrics.severity_histogram.six_to_ten, 1);
        assert_eq!(metrics.severity_histogram.eleven_plus, 1);
    }
3815
    /// Records a successful and a failed recovery log and checks the two
    /// resulting evidence cards: epochs, chain hashes, timestamps, frame ids
    /// and confidence values.
    #[test]
    fn test_raptorq_repair_evidence_chain_and_capacity() {
        // Hold the shared test lock so the telemetry reset and the recordings
        // below are not interleaved with other tests taking the same lock.
        let _guard = telemetry_test_lock();
        reset_raptorq_repair_telemetry();

        let first_group = WalFecGroupId {
            wal_salt1: 0x0A0A_0B0B,
            wal_salt2: 0x0C0C_0D0D,
            end_frame_no: 101,
        };
        let first_log = WalFecRecoveryLog {
            group_id: first_group,
            recovery_enabled: true,
            outcome_is_recovered: true,
            fallback_reason: None,
            validated_source_symbols: 5,
            validated_repair_symbols: 1,
            required_symbols: 6,
            available_symbols: 6,
            recovered_frame_nos: vec![100, 101],
            corruption_observations: 1,
            decode_attempted: true,
            decode_succeeded: true,
        };
        record_raptorq_recovery_log(&first_log, Duration::from_micros(10));

        let second_group = WalFecGroupId {
            wal_salt1: 0x1A1A_1B1B,
            wal_salt2: 0x1C1C_1D1D,
            end_frame_no: 202,
        };
        let second_log = WalFecRecoveryLog {
            group_id: second_group,
            recovery_enabled: true,
            outcome_is_recovered: false,
            fallback_reason: Some(WalFecRecoveryFallbackReason::InsufficientSymbols),
            validated_source_symbols: 2,
            validated_repair_symbols: 2,
            required_symbols: 6,
            available_symbols: 4,
            recovered_frame_nos: Vec::new(),
            corruption_observations: 2,
            decode_attempted: true,
            decode_succeeded: false,
        };
        record_raptorq_recovery_log(&second_log, Duration::from_micros(25));

        // Cards come back in recording order with consecutive ledger epochs
        // starting at 1 and distinct chain hashes.
        // NOTE(review): a limit argument of 0 evidently returns all cards here
        // (two are expected) — confirm the "0 means unlimited" convention.
        let cards = raptorq_repair_evidence_snapshot(0);
        assert_eq!(cards.len(), 2);
        assert_eq!(cards[0].ledger_epoch, 1);
        assert_eq!(cards[1].ledger_epoch, 2);
        assert_ne!(cards[0].chain_hash, cards[1].chain_hash);
        assert!(cards[0].monotonic_timestamp_ns > 0);
        assert!(cards[0].wall_clock_unix_ns > 0);
        assert_eq!(cards[0].frame_id, first_group.end_frame_no);
        assert_eq!(cards[1].frame_id, second_group.end_frame_no);
        // NOTE(review): 1_000 and 666 are consistent with
        // available_symbols * 1000 / required_symbols (6/6 and 4/6, integer
        // division) — confirm against the confidence computation.
        assert_eq!(cards[0].confidence_per_mille, 1_000);
        assert_eq!(cards[1].confidence_per_mille, 666);
        assert_ne!(cards[0].witness.corrupted_hash_blake3, [0_u8; 32]);
    }
3876
    /// Records three successful recovery logs (frames 301, 302, 303) and
    /// checks the evidence query filters: by frame id, by severity bucket,
    /// by wall-clock range and by result limit.
    #[test]
    fn test_raptorq_repair_evidence_query_filters() {
        // Hold the shared test lock so the telemetry reset and the recordings
        // below are not interleaved with other tests taking the same lock.
        let _guard = telemetry_test_lock();
        reset_raptorq_repair_telemetry();

        // The three logs differ in group id and in how many of the ten
        // required symbols were repair symbols (1, 3 and 9).
        let logs = [
            WalFecRecoveryLog {
                group_id: WalFecGroupId {
                    wal_salt1: 11,
                    wal_salt2: 12,
                    end_frame_no: 301,
                },
                recovery_enabled: true,
                outcome_is_recovered: true,
                fallback_reason: None,
                validated_source_symbols: 9,
                validated_repair_symbols: 1,
                required_symbols: 10,
                available_symbols: 10,
                recovered_frame_nos: vec![301],
                corruption_observations: 0,
                decode_attempted: true,
                decode_succeeded: true,
            },
            WalFecRecoveryLog {
                group_id: WalFecGroupId {
                    wal_salt1: 21,
                    wal_salt2: 22,
                    end_frame_no: 302,
                },
                recovery_enabled: true,
                outcome_is_recovered: true,
                fallback_reason: None,
                validated_source_symbols: 7,
                validated_repair_symbols: 3,
                required_symbols: 10,
                available_symbols: 10,
                recovered_frame_nos: vec![302],
                corruption_observations: 1,
                decode_attempted: true,
                decode_succeeded: true,
            },
            WalFecRecoveryLog {
                group_id: WalFecGroupId {
                    wal_salt1: 31,
                    wal_salt2: 32,
                    end_frame_no: 303,
                },
                recovery_enabled: true,
                outcome_is_recovered: true,
                fallback_reason: None,
                validated_source_symbols: 1,
                validated_repair_symbols: 9,
                required_symbols: 10,
                available_symbols: 10,
                recovered_frame_nos: vec![303],
                corruption_observations: 3,
                decode_attempted: true,
                decode_succeeded: true,
            },
        ];
        for log in &logs {
            record_raptorq_recovery_log(log, Duration::from_micros(15));
        }

        let cards = raptorq_repair_evidence_snapshot(0);
        assert_eq!(cards.len(), 3);

        // Filter by frame id: only the card for frame 302 matches.
        let by_frame = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
            frame_id: Some(302),
            ..WalFecRepairEvidenceQuery::default()
        });
        assert_eq!(by_frame.len(), 1);
        assert_eq!(by_frame[0].frame_id, 302);

        // Filter by severity: only frame 303 (nine repair symbols) is
        // expected in the six-to-ten bucket.
        let by_severity = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
            severity_bucket: Some(WalFecRepairSeverityBucket::SixToTen),
            ..WalFecRepairEvidenceQuery::default()
        });
        assert_eq!(by_severity.len(), 1);
        assert_eq!(by_severity[0].frame_id, 303);

        // Filter by wall-clock range spanning the first to the last card: the
        // assertion below requires all three cards to match.
        let min_time = cards
            .first()
            .map(|card| card.wall_clock_unix_ns)
            .expect("evidence cards should include timestamps");
        let max_time = cards
            .last()
            .map(|card| card.wall_clock_unix_ns)
            .expect("evidence cards should include timestamps");
        let by_time = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
            wall_clock_start_ns: Some(min_time),
            wall_clock_end_ns: Some(max_time),
            ..WalFecRepairEvidenceQuery::default()
        });
        assert_eq!(by_time.len(), 3);

        // A limit of two keeps the two most recently recorded cards, still
        // ordered oldest first.
        let limited = query_raptorq_repair_evidence(&WalFecRepairEvidenceQuery {
            limit: Some(2),
            ..WalFecRepairEvidenceQuery::default()
        });
        assert_eq!(limited.len(), 2);
        assert_eq!(limited[0].frame_id, 302);
        assert_eq!(limited[1].frame_id, 303);
    }
3982
3983 #[test]
3986 fn test_wal_fec_path_for_wal_dash_suffix() {
3987 let path = wal_fec_path_for_wal(Path::new("/tmp/test.db-wal"));
3988 assert_eq!(path, PathBuf::from("/tmp/test.db-wal-fec"));
3989 }
3990
3991 #[test]
3992 fn test_wal_fec_path_for_wal_dot_suffix() {
3993 let path = wal_fec_path_for_wal(Path::new("/tmp/test.wal"));
3994 assert_eq!(path, PathBuf::from("/tmp/test.wal-fec"));
3995 }
3996
3997 #[test]
3998 fn test_wal_fec_path_for_wal_no_wal_suffix() {
3999 let path = wal_fec_path_for_wal(Path::new("/tmp/test.db"));
4000 assert_eq!(path, PathBuf::from("/tmp/test.db.wal-fec"));
4001 }
4002
4003 #[test]
4004 fn test_build_source_page_hashes_deterministic() {
4005 let pages = vec![vec![0xAA; 4096], vec![0xBB; 4096]];
4006 let hashes1 = build_source_page_hashes(&pages);
4007 let hashes2 = build_source_page_hashes(&pages);
4008 assert_eq!(hashes1, hashes2);
4009 assert_eq!(hashes1.len(), 2);
4010 }
4011
4012 #[test]
4013 fn test_build_source_page_hashes_empty() {
4014 let hashes = build_source_page_hashes(&[]);
4015 assert!(hashes.is_empty());
4016 }
4017
4018 #[test]
4019 fn test_build_source_page_hashes_distinct_pages_produce_distinct_hashes() {
4020 let pages = vec![vec![0x00; 4096], vec![0xFF; 4096]];
4021 let hashes = build_source_page_hashes(&pages);
4022 assert_ne!(hashes[0], hashes[1]);
4023 }
4024
4025 #[test]
4026 fn test_verify_salt_binding_match() {
4027 let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4028 let salts = WalSalts {
4029 salt1: 0x1234_5678,
4030 salt2: 0xABCD_EF01,
4031 };
4032 assert!(meta.verify_salt_binding(salts).is_ok());
4033 }
4034
4035 #[test]
4036 fn test_verify_salt_binding_mismatch_salt1() {
4037 let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4038 let salts = WalSalts {
4039 salt1: 0xDEAD_BEEF,
4040 salt2: 0xABCD_EF01,
4041 };
4042 let err = meta.verify_salt_binding(salts);
4043 assert!(err.is_err());
4044 assert!(err.unwrap_err().to_string().contains("salt mismatch"));
4045 }
4046
4047 #[test]
4048 fn test_verify_salt_binding_mismatch_salt2() {
4049 let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4050 let salts = WalSalts {
4051 salt1: 0x1234_5678,
4052 salt2: 0x0000_0000,
4053 };
4054 assert!(meta.verify_salt_binding(salts).is_err());
4055 }
4056
4057 #[test]
4058 fn test_group_record_new_wrong_repair_count() {
4059 let meta = WalFecGroupMeta::from_init(make_test_init(2)).expect("from_init");
4060 let result = WalFecGroupRecord::new(meta, vec![]);
4061 assert!(result.is_err());
4062 assert!(
4063 result
4064 .unwrap_err()
4065 .to_string()
4066 .contains("repair symbol count")
4067 );
4068 }
4069
4070 #[test]
4071 fn test_group_record_new_wrong_object_id() {
4072 let meta = WalFecGroupMeta::from_init(make_test_init(1)).expect("from_init");
4073 let bad_symbol = SymbolRecord {
4074 esi: meta.k_source,
4075 object_id: ObjectId::from_bytes([0xBB; 16]),
4076 oti: meta.oti,
4077 flags: SymbolRecordFlags::empty(),
4078 symbol_data: vec![0; meta.page_size as usize],
4079 frame_xxh3: 0,
4080 auth_tag: [0; 16],
4081 };
4082 let bad_symbol2 = SymbolRecord {
4083 esi: meta.k_source + 1,
4084 object_id: ObjectId::from_bytes([0xBB; 16]),
4085 oti: meta.oti,
4086 flags: SymbolRecordFlags::empty(),
4087 symbol_data: vec![0; meta.page_size as usize],
4088 frame_xxh3: 0,
4089 auth_tag: [0; 16],
4090 };
4091 let result = WalFecGroupRecord::new(meta, vec![bad_symbol, bad_symbol2]);
4092 assert!(result.is_err());
4093 assert!(
4094 result
4095 .unwrap_err()
4096 .to_string()
4097 .contains("object_id mismatch")
4098 );
4099 }
4100
4101 #[test]
4102 fn test_group_record_new_wrong_esi() {
4103 let meta = WalFecGroupMeta::from_init(make_test_init(1)).expect("from_init");
4104 let sym = SymbolRecord {
4105 esi: 999,
4106 object_id: meta.object_id,
4107 oti: meta.oti,
4108 flags: SymbolRecordFlags::empty(),
4109 symbol_data: vec![0; meta.page_size as usize],
4110 frame_xxh3: 0,
4111 auth_tag: [0; 16],
4112 };
4113 let sym2 = SymbolRecord {
4114 esi: 1000,
4115 object_id: meta.object_id,
4116 oti: meta.oti,
4117 flags: SymbolRecordFlags::empty(),
4118 symbol_data: vec![0; meta.page_size as usize],
4119 frame_xxh3: 0,
4120 auth_tag: [0; 16],
4121 };
4122 let result = WalFecGroupRecord::new(meta, vec![sym, sym2]);
4123 assert!(result.is_err());
4124 assert!(result.unwrap_err().to_string().contains("ESI"));
4125 }
4126
4127 #[test]
4128 fn test_group_id_display() {
4129 let id = WalFecGroupId {
4130 wal_salt1: 1,
4131 wal_salt2: 2,
4132 end_frame_no: 42,
4133 };
4134 assert_eq!(format!("{id}"), "(1, 2, 42)");
4135 }
4136
4137 #[test]
4138 fn test_severity_bucket_as_str_roundtrip() {
4139 let buckets = [
4140 WalFecRepairSeverityBucket::One,
4141 WalFecRepairSeverityBucket::TwoToFive,
4142 WalFecRepairSeverityBucket::SixToTen,
4143 WalFecRepairSeverityBucket::ElevenPlus,
4144 ];
4145 for bucket in buckets {
4146 let s = bucket.as_str();
4147 let parsed: WalFecRepairSeverityBucket = s.parse().expect(s);
4148 assert_eq!(parsed, bucket);
4149 }
4150 }
4151
4152 #[test]
4153 fn test_severity_bucket_from_str_aliases() {
4154 assert_eq!(
4155 "one".parse::<WalFecRepairSeverityBucket>().unwrap(),
4156 WalFecRepairSeverityBucket::One
4157 );
4158 assert_eq!(
4159 "two-to-five".parse::<WalFecRepairSeverityBucket>().unwrap(),
4160 WalFecRepairSeverityBucket::TwoToFive
4161 );
4162 assert_eq!(
4163 "six_to_ten".parse::<WalFecRepairSeverityBucket>().unwrap(),
4164 WalFecRepairSeverityBucket::SixToTen
4165 );
4166 assert_eq!(
4167 "eleven_plus".parse::<WalFecRepairSeverityBucket>().unwrap(),
4168 WalFecRepairSeverityBucket::ElevenPlus
4169 );
4170 assert!("unknown".parse::<WalFecRepairSeverityBucket>().is_err());
4171 }
4172
4173 #[test]
4174 fn test_severity_histogram_bump_all_buckets() {
4175 let mut hist = WalFecRepairSeverityHistogram::default();
4176 assert_eq!(hist.one, 0);
4177 hist.bump(WalFecRepairSeverityBucket::One);
4178 hist.bump(WalFecRepairSeverityBucket::One);
4179 hist.bump(WalFecRepairSeverityBucket::TwoToFive);
4180 hist.bump(WalFecRepairSeverityBucket::SixToTen);
4181 hist.bump(WalFecRepairSeverityBucket::ElevenPlus);
4182 hist.bump(WalFecRepairSeverityBucket::ElevenPlus);
4183 assert_eq!(hist.one, 2);
4184 assert_eq!(hist.two_to_five, 1);
4185 assert_eq!(hist.six_to_ten, 1);
4186 assert_eq!(hist.eleven_plus, 2);
4187 }
4188
4189 #[test]
4190 fn test_evidence_card_hex_methods() {
4191 let card = WalFecRepairEvidenceCard {
4192 group_id: WalFecGroupId {
4193 wal_salt1: 1,
4194 wal_salt2: 2,
4195 end_frame_no: 10,
4196 },
4197 frame_id: 10,
4198 wal_file_offset_bytes: None,
4199 monotonic_timestamp_ns: 0,
4200 wall_clock_unix_ns: 0,
4201 corruption_signature_blake3: [0xAB; 32],
4202 bit_error_pattern: None,
4203 repair_source: WalFecRepairSource::WalRepairSymbols,
4204 symbols_used: 2,
4205 validated_source_symbols: 1,
4206 validated_repair_symbols: 1,
4207 required_symbols: 2,
4208 available_symbols: 2,
4209 witness: WalFecRepairWitnessTriple {
4210 corrupted_hash_blake3: [0; 32],
4211 repaired_hash_blake3: [0; 32],
4212 expected_hash_blake3: [0; 32],
4213 },
4214 repair_latency_ns: 0,
4215 confidence_per_mille: 1000,
4216 severity_bucket: WalFecRepairSeverityBucket::One,
4217 ledger_epoch: 0,
4218 chain_hash: [0xFF; 32],
4219 };
4220 assert_eq!(card.chain_hash_hex().len(), 64);
4221 assert!(card.chain_hash_hex().chars().all(|c| c == 'f'));
4222 assert_eq!(card.corruption_signature_hex().len(), 64);
4223 assert!(
4224 card.corruption_signature_hex()
4225 .chars()
4226 .all(|c| c == 'a' || c == 'b')
4227 );
4228 }
4229
4230 #[test]
4231 fn test_repair_source_as_str() {
4232 assert_eq!(
4233 WalFecRepairSource::WalRepairSymbols.as_str(),
4234 "wal_repair_symbols"
4235 );
4236 assert_eq!(
4237 WalFecRepairSource::SnapshotRepairSymbols.as_str(),
4238 "snapshot_repair_symbols"
4239 );
4240 assert_eq!(
4241 WalFecRepairSource::WalAndSnapshotRepairSymbols.as_str(),
4242 "wal_and_snapshot_repair_symbols"
4243 );
4244 }
4245
/// Feeds a `WalFecRecoveryOutcome::Recovered` value through
/// `recovery_log_from_outcome` and checks that the resulting log mirrors the
/// decode proof (symbol counts, recovered frames, decode flags) and reports a
/// recovered outcome with no fallback reason.
#[test]
fn test_recovery_log_from_recovered_outcome() {
    let gid = WalFecGroupId {
        wal_salt1: 10,
        wal_salt2: 20,
        end_frame_no: 5,
    };
    let decode_proof = WalFecDecodeProof {
        group_id: gid,
        required_symbols: 3,
        available_symbols: 4,
        validated_source_symbols: 2,
        validated_repair_symbols: 2,
        corruption_observations: 1,
        decode_attempted: true,
        decode_succeeded: true,
        recovered_frame_nos: vec![3, 4],
        fallback_reason: None,
    };
    let recovered_group = WalFecRecoveredGroup {
        meta: WalFecGroupMeta::from_init(make_test_init(3)).expect("from_init"),
        recovered_pages: vec![vec![0; 4096]; 3],
        recovered_frame_nos: vec![3, 4],
        db_size_pages: 100,
        decode_proof,
    };
    let outcome = WalFecRecoveryOutcome::Recovered(recovered_group);

    let log = recovery_log_from_outcome(gid, &outcome, true);

    // Outcome-level flags.
    assert!(log.outcome_is_recovered);
    assert!(log.recovery_enabled);
    assert_eq!(log.fallback_reason, None);

    // Values expected to be carried over from the decode proof.
    assert_eq!(log.validated_source_symbols, 2);
    assert_eq!(log.validated_repair_symbols, 2);
    assert_eq!(log.recovered_frame_nos, vec![3, 4]);
    assert!(log.decode_attempted);
    assert!(log.decode_succeeded);
}
4283
/// Feeds a `WalFecRecoveryOutcome::TruncateBeforeGroup` value through
/// `recovery_log_from_outcome` and checks that the log is marked as not
/// recovered and carries the proof's fallback reason, available-symbol count
/// and `decode_attempted` flag.
#[test]
fn test_recovery_log_from_truncate_outcome() {
    let gid = WalFecGroupId {
        wal_salt1: 10,
        wal_salt2: 20,
        end_frame_no: 5,
    };
    let decode_proof = WalFecDecodeProof {
        group_id: gid,
        required_symbols: 3,
        available_symbols: 1,
        validated_source_symbols: 1,
        validated_repair_symbols: 0,
        corruption_observations: 2,
        decode_attempted: false,
        decode_succeeded: false,
        recovered_frame_nos: Vec::new(),
        fallback_reason: Some(WalFecRecoveryFallbackReason::InsufficientSymbols),
    };
    let outcome = WalFecRecoveryOutcome::TruncateBeforeGroup {
        truncate_before_frame_no: 3,
        decode_proof,
    };

    let log = recovery_log_from_outcome(gid, &outcome, true);

    assert!(!log.outcome_is_recovered);
    assert!(!log.decode_attempted);
    assert_eq!(log.available_symbols, 1);
    assert_eq!(
        log.fallback_reason,
        Some(WalFecRecoveryFallbackReason::InsufficientSymbols)
    );
}
4316
/// Pins the `reason_code()` string for each `WalFecRecoveryFallbackReason`
/// variant listed in the table below.
#[test]
fn test_recovery_fallback_reason_codes_all_variants() {
    let code_table = [
        (
            WalFecRecoveryFallbackReason::MissingSidecarGroup,
            "missing_sidecar_group",
        ),
        (
            WalFecRecoveryFallbackReason::SidecarUnreadable,
            "sidecar_unreadable",
        ),
        (WalFecRecoveryFallbackReason::SaltMismatch, "salt_mismatch"),
        (
            WalFecRecoveryFallbackReason::InsufficientSymbols,
            "insufficient_symbols",
        ),
        (WalFecRecoveryFallbackReason::DecodeFailed, "decode_failed"),
        (
            WalFecRecoveryFallbackReason::DecodedPayloadMismatch,
            "decoded_payload_mismatch",
        ),
        (
            WalFecRecoveryFallbackReason::RecoveryDisabled,
            "recovery_disabled",
        ),
    ];
    for (reason, code) in code_table.iter() {
        assert_eq!(reason.reason_code(), *code);
    }
}
4347
/// `WalFecGroupMeta::from_init` must return an error for an init record whose
/// `start_frame_no` and `k_source` are both zero.
///
/// NOTE(review): both fields are zeroed, so this test alone cannot show which
/// of them triggers the rejection — confirm that is intended.
#[test]
fn test_meta_from_init_rejects_zero_start_frame() {
    let mut bad_init = make_test_init(2);
    bad_init.k_source = 0;
    bad_init.start_frame_no = 0;
    let outcome = WalFecGroupMeta::from_init(bad_init);
    assert!(outcome.is_err());
}
4355
/// `WalFecGroupMeta::from_init` must return an error when `end_frame_no` (3)
/// is smaller than `start_frame_no` (5).
#[test]
fn test_meta_from_init_rejects_end_before_start() {
    let mut bad_init = make_test_init(3);
    bad_init.end_frame_no = 3;
    bad_init.start_frame_no = 5;
    let outcome = WalFecGroupMeta::from_init(bad_init);
    assert!(outcome.is_err());
}
4363
/// `WalFecGroupMeta::from_init` must return an error when `r_repair` is zero.
#[test]
fn test_meta_from_init_rejects_zero_r_repair() {
    let mut bad_init = make_test_init(2);
    bad_init.r_repair = 0;
    let outcome = WalFecGroupMeta::from_init(bad_init);
    assert!(outcome.is_err());
}
4370
/// `WalFecGroupMeta::from_init` must return an error when `db_size_pages` is
/// zero.
#[test]
fn test_meta_from_init_rejects_zero_db_size() {
    let mut bad_init = make_test_init(2);
    bad_init.db_size_pages = 0;
    let outcome = WalFecGroupMeta::from_init(bad_init);
    assert!(outcome.is_err());
}
4377
/// The default `WalFecRecoveryConfig` must have `recovery_enabled` set.
#[test]
fn test_recovery_config_default_enabled() {
    let WalFecRecoveryConfig {
        recovery_enabled, ..
    } = WalFecRecoveryConfig::default();
    assert!(recovery_enabled);
}
4383
/// The default `WalFecRepairPipelineConfig` must use a queue capacity of 64
/// and a zero per-symbol delay.
#[test]
fn test_repair_pipeline_config_default() {
    let WalFecRepairPipelineConfig {
        queue_capacity,
        per_symbol_delay,
        ..
    } = WalFecRepairPipelineConfig::default();
    assert_eq!(queue_capacity, 64);
    assert_eq!(per_symbol_delay, Duration::ZERO);
}
4390
/// Exercises the derived/implemented traits of `WalFecGroupId`: inequality of
/// ids that differ in `end_frame_no`, copy semantics, the `Display` rendering
/// `"(1, 2, 10)"`, and use as a `HashSet` element.
#[test]
fn wal_fec_group_id_display_hash_eq() {
    use std::collections::HashSet;

    let first = WalFecGroupId {
        wal_salt1: 1,
        wal_salt2: 2,
        end_frame_no: 10,
    };
    // Same salts, different end frame.
    let second = WalFecGroupId {
        end_frame_no: 11,
        ..first
    };
    assert_ne!(first, second);

    // `first` stays usable after the assignment, so the type is `Copy`.
    let duplicate = first;
    assert_eq!(duplicate, first);

    assert_eq!(format!("{first}"), "(1, 2, 10)");

    let distinct = HashSet::from([first, second]);
    assert_eq!(distinct.len(), 2);
}
4414
/// For each `WalFecRecoveryFallbackReason` variant in the table, checks the
/// `reason_code()` string and that a copy compares equal to the original;
/// finally checks that the `Debug` output names the variant.
#[test]
fn wal_fec_recovery_fallback_reason_all_variants() {
    let code_table = [
        (
            WalFecRecoveryFallbackReason::MissingSidecarGroup,
            "missing_sidecar_group",
        ),
        (
            WalFecRecoveryFallbackReason::SidecarUnreadable,
            "sidecar_unreadable",
        ),
        (WalFecRecoveryFallbackReason::SaltMismatch, "salt_mismatch"),
        (
            WalFecRecoveryFallbackReason::InsufficientSymbols,
            "insufficient_symbols",
        ),
        (WalFecRecoveryFallbackReason::DecodeFailed, "decode_failed"),
        (
            WalFecRecoveryFallbackReason::DecodedPayloadMismatch,
            "decoded_payload_mismatch",
        ),
        (
            WalFecRecoveryFallbackReason::RecoveryDisabled,
            "recovery_disabled",
        ),
    ];
    for &(reason, expected_code) in &code_table {
        assert_eq!(reason.reason_code(), expected_code);
        // Binding by value and reusing `reason` afterwards relies on `Copy`.
        let duplicate = reason;
        assert_eq!(duplicate, reason);
    }

    let rendered = format!("{:?}", WalFecRecoveryFallbackReason::SaltMismatch);
    assert!(rendered.contains("SaltMismatch"));
}
4449
/// Pins the `as_str()` label of every `WalFecRepairSeverityBucket` variant
/// listed below and checks string parsing for two accepted inputs and one
/// rejected input.
///
/// NOTE(review): `"two-to-five"` differs from the `as_str()` label `"2-5"`;
/// presumably the parser accepts it as an alias — confirm against the
/// `FromStr` implementation.
#[test]
fn wal_fec_repair_severity_bucket_as_str_and_from_str() {
    let labels = [
        (WalFecRepairSeverityBucket::One, "1"),
        (WalFecRepairSeverityBucket::TwoToFive, "2-5"),
        (WalFecRepairSeverityBucket::SixToTen, "6-10"),
        (WalFecRepairSeverityBucket::ElevenPlus, "11+"),
    ];
    for (bucket, label) in labels {
        assert_eq!(bucket.as_str(), label);
    }

    let parsed_one: WalFecRepairSeverityBucket = "1".parse().unwrap();
    assert_eq!(parsed_one, WalFecRepairSeverityBucket::One);

    let parsed_two_to_five: WalFecRepairSeverityBucket = "two-to-five".parse().unwrap();
    assert_eq!(parsed_two_to_five, WalFecRepairSeverityBucket::TwoToFive);

    let rejected = "invalid".parse::<WalFecRepairSeverityBucket>();
    assert!(rejected.is_err());
}
4466
/// Checks `Copy`, `PartialEq` and `Debug` on `WalFecRepairSource`: a copy of
/// each listed variant equals its original, two listed variants compare equal
/// exactly when they sit at the same index, and the `Debug` output names the
/// variant.
#[test]
fn wal_fec_repair_source_all_variants_debug_copy_eq() {
    let sources = [
        WalFecRepairSource::WalRepairSymbols,
        WalFecRepairSource::SnapshotRepairSymbols,
        WalFecRepairSource::WalAndSnapshotRepairSymbols,
    ];

    // Copy round-trip for every variant.
    for source in &sources {
        let duplicate = *source;
        assert_eq!(duplicate, *source);
    }

    // Pairwise comparison: equality must coincide with index identity.
    for (lhs_idx, lhs) in sources.iter().enumerate() {
        for (rhs_idx, rhs) in sources.iter().enumerate() {
            assert_eq!(lhs_idx == rhs_idx, lhs == rhs);
        }
    }

    let rendered = format!("{:?}", WalFecRepairSource::SnapshotRepairSymbols);
    assert!(rendered.contains("SnapshotRepairSymbols"));
}
4484
/// Checks the default field values of `WalFecRepairPipelineConfig`, that a
/// copy compares equal to the original, and that the `Debug` output contains
/// the type name.
#[test]
fn wal_fec_repair_pipeline_config_default_and_copy() {
    let defaults = WalFecRepairPipelineConfig::default();
    assert_eq!(defaults.queue_capacity, 64);
    assert_eq!(defaults.per_symbol_delay, Duration::ZERO);

    // `defaults` is still used below, so this assignment relies on `Copy`.
    let duplicate = defaults;
    assert_eq!(duplicate, defaults);

    let rendered = format!("{defaults:?}");
    assert!(rendered.contains("WalFecRepairPipelineConfig"));
}
4495
/// The default `WalFecRepairPipelineStats` must report zero for every counter
/// checked below, and a copy must compare equal to the original.
#[test]
fn wal_fec_repair_pipeline_stats_default_all_zero() {
    let stats = WalFecRepairPipelineStats::default();
    let WalFecRepairPipelineStats {
        pending_jobs,
        completed_jobs,
        failed_jobs,
        canceled_jobs,
        max_pending_jobs,
        ..
    } = stats;
    assert_eq!(pending_jobs, 0);
    assert_eq!(completed_jobs, 0);
    assert_eq!(failed_jobs, 0);
    assert_eq!(canceled_jobs, 0);
    assert_eq!(max_pending_jobs, 0);

    // `stats` is still used below, so this assignment relies on `Copy`.
    let duplicate = stats;
    assert_eq!(duplicate, stats);
}
4507
/// Checks `Clone`, `PartialEq` and `Debug` on `WalFrameCandidate`, along with
/// the field values the candidate was built with.
#[test]
fn wal_frame_candidate_clone_eq_debug() {
    let candidate = WalFrameCandidate {
        frame_no: 7,
        page_data: vec![0xAB; 16],
    };

    let twin = candidate.clone();
    assert_eq!(twin, candidate);

    assert_eq!(candidate.frame_no, 7);
    assert_eq!(candidate.page_data.len(), 16);

    let rendered = format!("{candidate:?}");
    assert!(rendered.contains("WalFrameCandidate"));
}
4521
/// Checks `Clone`, `PartialEq` and `Debug` on `WalFecDecodeProof`, along with
/// two of the field values the proof was built with.
#[test]
fn wal_fec_decode_proof_clone_eq_debug() {
    let group_id = WalFecGroupId {
        wal_salt1: 1,
        wal_salt2: 2,
        end_frame_no: 4,
    };
    let original = WalFecDecodeProof {
        group_id,
        required_symbols: 4,
        available_symbols: 6,
        validated_source_symbols: 4,
        validated_repair_symbols: 2,
        corruption_observations: 0,
        decode_attempted: true,
        decode_succeeded: true,
        recovered_frame_nos: vec![1, 2, 3, 4],
        fallback_reason: None,
    };

    let twin = original.clone();
    assert_eq!(twin, original);

    assert!(original.decode_succeeded);
    assert_eq!(original.recovered_frame_nos.len(), 4);

    let rendered = format!("{original:?}");
    assert!(rendered.contains("WalFecDecodeProof"));
}
4547}