1use std::collections::{BTreeMap, BTreeSet, VecDeque};
7use std::error::Error;
8use std::fmt;
9
10use fsqlite_types::ObjectId;
11use tracing::{debug, error, info, warn};
12
/// Identifier attached as the `bead_id` field to every log event emitted by this module.
pub const PERMEATION_BEAD_ID: &str = "bd-1hi.27";
/// Identifier attached as the `logging_standard` field to every log event emitted by this module.
pub const PERMEATION_LOGGING_STANDARD: &str = "bd-1fpm";
17
/// Number of buckets every object id is hashed into; also the smallest cell
/// count `ObjectIdIblt::new` accepts.
const IBLT_HASH_COUNT: usize = 3;
/// One seed per bucket hash, fed to `seeded_object_hash` by `bucket_indices`.
const IBLT_HASH_SEEDS: [u64; IBLT_HASH_COUNT] = [
    0x9E37_79B9_7F4A_7C15,
    0xC2B2_AE3D_27D4_EB4F,
    0x1656_67B1_9E37_79F9,
];
24
/// Architectural plane that a permeation-map entry is filed under.
///
/// The derived ordering follows declaration order, which lets
/// `(Plane, subsystem)` pairs be used as ordered-set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Plane {
    /// Commits, checkpoints, indices and page storage.
    Durability,
    /// MVCC page history, conflict reduction and the SSI witness plane.
    Concurrency,
    /// Symbol streaming, anti-entropy, bootstrap and multipath.
    Replication,
    /// Repair auditing, schedule exploration, invariant monitoring and model checking.
    Observability,
}

impl fmt::Display for Plane {
    /// Writes the variant name (for example `Durability`).
    ///
    /// Formatting flags such as width and alignment are honoured, so
    /// `format!("{:>13}", plane)` pads as expected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Durability => "Durability",
            Self::Concurrency => "Concurrency",
            Self::Replication => "Replication",
            Self::Observability => "Observability",
        };
        // `pad` applies the caller's width/fill/alignment; `write_str` would ignore them.
        f.pad(name)
    }
}
48
/// One row of the permeation map: how a subsystem's objects are sized,
/// protected and repaired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PermeationEntry {
    /// Subsystem name; matched against the required-subsystem list during audits.
    pub subsystem: &'static str,
    /// Name(s) of the object type(s) the subsystem stores or streams.
    pub object_type: &'static str,
    /// Textual policy declaration in the grammar accepted by `parse_symbol_policy`.
    pub symbol_size_policy: &'static str,
    /// Free-form description of how the subsystem recovers from loss or corruption.
    pub repair_story: &'static str,
    /// Plane the subsystem is filed under.
    pub plane: Plane,
}
63
/// Subsystem names that must each appear at least once in the permeation map;
/// `audit_permeation_map` reports a `MissingEntry` failure for any that do not.
pub static V1_REQUIRED_SUBSYSTEMS: &[&str] = &[
    "Commits/CapsuleProof",
    "Commits/MarkerStream",
    "Checkpoints",
    "Indices",
    "Page storage",
    "MVCC page history",
    "Conflict reduction",
    "SSI witness plane",
    "Symbol streaming",
    "Anti-entropy",
    "Bootstrap",
    "Multipath",
    "Repair auditing",
    "Schedule exploration",
    "Invariant monitoring",
    "Model checking",
];
83
/// The permeation map itself: one entry per subsystem, grouped by plane.
///
/// Every `symbol_size_policy` string must stay parseable by
/// `parse_symbol_policy`; the audit functions in this file check that.
pub static PERMEATION_MAP: &[PermeationEntry] = &[
    // --- Durability plane ---
    PermeationEntry {
        subsystem: "Commits/CapsuleProof",
        object_type: "CommitCapsule+CommitProof",
        symbol_size_policy: "T=min(page_size,4096), R=20%",
        repair_story: "decode from surviving symbols",
        plane: Plane::Durability,
    },
    PermeationEntry {
        subsystem: "Commits/MarkerStream",
        object_type: "CommitMarkerRecord",
        // `fixed:` policies without an `R=` clause parse as 0% redundancy, not fountain coded.
        symbol_size_policy: "fixed:88B record stream (no fountain)",
        repair_story: "torn-tail ignore + record_xxh3 + hash-chain audit",
        plane: Plane::Durability,
    },
    PermeationEntry {
        subsystem: "Checkpoints",
        object_type: "CheckpointChunk",
        symbol_size_policy: "T=1024-4096B, R=policy-driven",
        repair_story: "chunked snapshot objects; rebuild from marker stream if lost",
        plane: Plane::Durability,
    },
    PermeationEntry {
        subsystem: "Indices",
        object_type: "IndexSegment",
        symbol_size_policy: "T=1280-4096B, R=20%",
        repair_story: "decode or rebuild-from-marker-scan",
        plane: Plane::Durability,
    },
    PermeationEntry {
        subsystem: "Page storage",
        object_type: "PageHistory",
        symbol_size_policy: "T=page_size, R=per-group",
        repair_story: "decode from group symbols; on-the-fly repair on read",
        plane: Plane::Durability,
    },
    // --- Concurrency plane ---
    PermeationEntry {
        subsystem: "MVCC page history",
        object_type: "PageHistoryPatchChain",
        symbol_size_policy: "T=page_size, R=per-group",
        repair_story: "bounded by GC horizon; repair through patch replay",
        plane: Plane::Concurrency,
    },
    PermeationEntry {
        subsystem: "Conflict reduction",
        object_type: "IntentLog",
        symbol_size_policy: "T=256-1024B, R=policy-driven",
        repair_story: "replayed deterministically for rebase merge",
        plane: Plane::Concurrency,
    },
    PermeationEntry {
        subsystem: "SSI witness plane",
        object_type: "ReadWitness+WriteWitness+WitnessIndexSegment+DependencyEdge+CommitProof",
        symbol_size_policy: "T=1280-4096B, R=policy-driven",
        repair_story: "decode witness stream and rebuild serialization graph",
        plane: Plane::Concurrency,
    },
    // --- Replication plane ---
    PermeationEntry {
        subsystem: "Symbol streaming",
        object_type: "SymbolSink/SymbolStream",
        symbol_size_policy: "T=1280-4096B, R=transport-policy",
        repair_story: "symbol-native transport; recover with any K symbols",
        plane: Plane::Replication,
    },
    PermeationEntry {
        subsystem: "Anti-entropy",
        object_type: "ObjectIdSetIBLT",
        symbol_size_policy: "fixed:16B object-id atoms (IBLT), R=0%",
        repair_story: "peel IBLT; fallback to segment hash scan on overflow",
        plane: Plane::Replication,
    },
    PermeationEntry {
        subsystem: "Bootstrap",
        object_type: "CheckpointChunk",
        symbol_size_policy: "T=1024-4096B, R=policy-driven",
        repair_story: "late join by collecting K checkpoint symbols",
        plane: Plane::Replication,
    },
    PermeationEntry {
        subsystem: "Multipath",
        object_type: "MultipathAggregator",
        symbol_size_policy: "T=1280-4096B, R=transport-policy",
        repair_story: "any K symbols from any path reconstructs object",
        plane: Plane::Replication,
    },
    // --- Observability plane ---
    PermeationEntry {
        subsystem: "Repair auditing",
        object_type: "DecodeProof",
        symbol_size_policy: "T=1024-4096B, R=0%",
        repair_story: "attach decode proof artifacts to deterministic traces",
        plane: Plane::Observability,
    },
    PermeationEntry {
        subsystem: "Schedule exploration",
        object_type: "LabRuntimeTrace",
        symbol_size_policy: "T=1024-4096B, R=0%",
        repair_story: "deterministic replay from seed and event stream",
        plane: Plane::Observability,
    },
    PermeationEntry {
        subsystem: "Invariant monitoring",
        object_type: "EProcessMonitorEvent",
        symbol_size_policy: "T=256-1024B, R=0%",
        repair_story: "stream invariant events and enforce corruption budgets",
        plane: Plane::Observability,
    },
    PermeationEntry {
        subsystem: "Model checking",
        object_type: "TlaExportTrace",
        symbol_size_policy: "T=1024-4096B, R=0%",
        repair_story: "export traces for bounded TLA+ model checking",
        plane: Plane::Observability,
    },
];
199
/// Structured form of a `symbol_size_policy` string, as produced by
/// `parse_symbol_policy` and before page-size-dependent resolution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParsedSymbolPolicy {
    /// How the symbol size is derived.
    pub symbol_size: SymbolSizePolicy,
    /// How the redundancy level is derived.
    pub redundancy: RedundancyPolicy,
    /// `false` for `fixed:` policies, `true` for every other policy the parser accepts.
    pub fountain_coded: bool,
}
210
/// Rule for turning a page size into a concrete symbol size in bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SymbolSizePolicy {
    /// Resolves to the smaller of the page size and `cap_bytes`.
    MinPageSize { cap_bytes: u32 },
    /// Resolves to the page size itself.
    PageSize,
    /// Resolves to the page size limited to `min_bytes..=max_bytes`.
    RangeBytes { min_bytes: u32, max_bytes: u32 },
    /// Resolves to the given byte count regardless of page size.
    FixedBytes(u32),
}
223
/// Rule for deriving the redundancy level of a policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedundancyPolicy {
    /// Explicit redundancy in basis points (the parser maps `20%` to `2_000`).
    PercentBps(u16),
    /// Resolved from `PolicyResolutionDefaults::policy_driven_bps`.
    PolicyDriven,
    /// Resolved from `PolicyResolutionDefaults::per_group_bps`.
    PerGroup,
    /// Resolved from `PolicyResolutionDefaults::transport_policy_bps`.
    TransportPolicy,
}
236
/// Concrete policy values obtained from `ParsedSymbolPolicy::resolve`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ResolvedSymbolPolicy {
    /// Symbol size in bytes for the page size that was supplied.
    pub symbol_size_bytes: u32,
    /// Redundancy in basis points.
    pub redundancy_bps: u16,
    /// Copied unchanged from the parsed policy.
    pub fountain_coded: bool,
}
247
/// Basis-point values substituted for the symbolic redundancy policies
/// (`policy-driven`, `per-group`, `transport-policy`) during resolution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PolicyResolutionDefaults {
    /// Used for `RedundancyPolicy::PolicyDriven`.
    pub policy_driven_bps: u16,
    /// Used for `RedundancyPolicy::PerGroup`.
    pub per_group_bps: u16,
    /// Used for `RedundancyPolicy::TransportPolicy`.
    pub transport_policy_bps: u16,
}

impl Default for PolicyResolutionDefaults {
    /// 20% for policy-driven and per-group redundancy, 15% for transport policy.
    fn default() -> Self {
        const TWENTY_PERCENT_BPS: u16 = 2_000;
        const FIFTEEN_PERCENT_BPS: u16 = 1_500;

        Self {
            policy_driven_bps: TWENTY_PERCENT_BPS,
            per_group_bps: TWENTY_PERCENT_BPS,
            transport_policy_bps: FIFTEEN_PERCENT_BPS,
        }
    }
}
268
269impl ParsedSymbolPolicy {
270 #[must_use]
272 pub fn resolve(
273 self,
274 page_size: u32,
275 defaults: PolicyResolutionDefaults,
276 ) -> ResolvedSymbolPolicy {
277 let symbol_size_bytes = match self.symbol_size {
278 SymbolSizePolicy::MinPageSize { cap_bytes } => page_size.min(cap_bytes),
279 SymbolSizePolicy::PageSize => page_size,
280 SymbolSizePolicy::RangeBytes {
281 min_bytes,
282 max_bytes,
283 } => page_size.clamp(min_bytes, max_bytes),
284 SymbolSizePolicy::FixedBytes(bytes) => bytes,
285 };
286
287 let redundancy_bps = match self.redundancy {
288 RedundancyPolicy::PercentBps(bps) => bps,
289 RedundancyPolicy::PolicyDriven => defaults.policy_driven_bps,
290 RedundancyPolicy::PerGroup => defaults.per_group_bps,
291 RedundancyPolicy::TransportPolicy => defaults.transport_policy_bps,
292 };
293
294 ResolvedSymbolPolicy {
295 symbol_size_bytes,
296 redundancy_bps,
297 fountain_coded: self.fountain_coded,
298 }
299 }
300}
301
/// Error returned when a symbol-policy string cannot be parsed.
///
/// Its `Display` output is the human-readable detail message verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SymbolPolicyParseError {
    detail: String,
}

impl SymbolPolicyParseError {
    /// Builds an error from anything convertible into a `String`.
    fn new(detail: impl Into<String>) -> Self {
        let detail = detail.into();
        Self { detail }
    }
}

impl fmt::Display for SymbolPolicyParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.detail.as_str())
    }
}

impl Error for SymbolPolicyParseError {}
323
/// Category of a problem found while auditing permeation entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditFailureKind {
    /// A required subsystem has no entry at all.
    MissingEntry,
    /// The same `(plane, subsystem)` pair appears more than once.
    DuplicateSubsystemInPlane,
    /// One of the entry's text fields is empty or whitespace-only.
    EmptyField,
    /// The entry's `symbol_size_policy` string failed to parse.
    InvalidSymbolPolicy,
}
336
/// A single finding produced by the permeation-map audit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditFailure {
    /// What category of problem was found.
    pub kind: AuditFailureKind,
    /// Subsystem name the finding refers to.
    pub subsystem: String,
    /// Plane of the offending entry; `None` for `MissingEntry`, where no entry exists.
    pub plane: Option<Plane>,
    /// Human-readable explanation.
    pub detail: String,
}
349
350pub fn parse_symbol_policy(raw: &str) -> Result<ParsedSymbolPolicy, SymbolPolicyParseError> {
357 if let Some((bytes, redundancy)) = parse_fixed_policy(raw) {
358 let redundancy = parse_redundancy_policy(redundancy)?;
359 return Ok(ParsedSymbolPolicy {
360 symbol_size: SymbolSizePolicy::FixedBytes(bytes),
361 redundancy,
362 fountain_coded: false,
363 });
364 }
365
366 let (symbol_raw, redundancy_raw) = raw.split_once(", R=").ok_or_else(|| {
367 SymbolPolicyParseError::new(format!("policy missing ', R=' clause: {raw}"))
368 })?;
369
370 let symbol_size = parse_symbol_size_policy(symbol_raw.trim())?;
371 let redundancy = parse_redundancy_policy(redundancy_raw.trim())?;
372 Ok(ParsedSymbolPolicy {
373 symbol_size,
374 redundancy,
375 fountain_coded: true,
376 })
377}
378
379#[must_use]
381pub fn audit_permeation_map() -> Vec<AuditFailure> {
382 audit_permeation_entries(
383 PERMEATION_MAP,
384 V1_REQUIRED_SUBSYSTEMS,
385 4096,
386 PolicyResolutionDefaults::default(),
387 )
388}
389
390#[must_use]
394pub fn audit_permeation_entries(
395 entries: &[PermeationEntry],
396 required_subsystems: &[&str],
397 page_size: u32,
398 defaults: PolicyResolutionDefaults,
399) -> Vec<AuditFailure> {
400 debug!(
401 bead_id = PERMEATION_BEAD_ID,
402 logging_standard = PERMEATION_LOGGING_STANDARD,
403 entry_count = entries.len(),
404 required_count = required_subsystems.len(),
405 page_size = page_size,
406 "starting permeation-map audit"
407 );
408
409 let mut failures = Vec::new();
410 let mut seen = BTreeSet::new();
411 let mut by_subsystem: BTreeMap<&str, usize> = BTreeMap::new();
412
413 for entry in entries {
414 *by_subsystem.entry(entry.subsystem).or_default() += 1;
415 push_empty_field_failures(&mut failures, entry);
416
417 if !seen.insert((entry.plane, entry.subsystem)) {
418 failures.push(AuditFailure {
419 kind: AuditFailureKind::DuplicateSubsystemInPlane,
420 subsystem: entry.subsystem.to_owned(),
421 plane: Some(entry.plane),
422 detail: format!(
423 "duplicate subsystem '{}' in plane {}",
424 entry.subsystem, entry.plane
425 ),
426 });
427 }
428
429 validate_symbol_policy_entry(&mut failures, entry, page_size, defaults);
430 }
431
432 for required in required_subsystems {
433 if !by_subsystem.contains_key(required) {
434 failures.push(AuditFailure {
435 kind: AuditFailureKind::MissingEntry,
436 subsystem: (*required).to_owned(),
437 plane: None,
438 detail: "required subsystem missing from permeation map".to_owned(),
439 });
440 }
441 }
442
443 if failures.is_empty() {
444 info!(
445 bead_id = PERMEATION_BEAD_ID,
446 logging_standard = PERMEATION_LOGGING_STANDARD,
447 entry_count = entries.len(),
448 "permeation-map audit complete: no gaps"
449 );
450 } else {
451 error!(
452 bead_id = PERMEATION_BEAD_ID,
453 logging_standard = PERMEATION_LOGGING_STANDARD,
454 entry_count = entries.len(),
455 failure_count = failures.len(),
456 "permeation-map audit detected failures"
457 );
458 }
459
460 failures
461}
462
463fn push_empty_field_failures(failures: &mut Vec<AuditFailure>, entry: &PermeationEntry) {
464 if entry.subsystem.trim().is_empty() {
465 failures.push(AuditFailure {
466 kind: AuditFailureKind::EmptyField,
467 subsystem: entry.subsystem.to_owned(),
468 plane: Some(entry.plane),
469 detail: "subsystem is empty".to_owned(),
470 });
471 }
472 if entry.object_type.trim().is_empty() {
473 failures.push(AuditFailure {
474 kind: AuditFailureKind::EmptyField,
475 subsystem: entry.subsystem.to_owned(),
476 plane: Some(entry.plane),
477 detail: "object_type is empty".to_owned(),
478 });
479 }
480 if entry.symbol_size_policy.trim().is_empty() {
481 failures.push(AuditFailure {
482 kind: AuditFailureKind::EmptyField,
483 subsystem: entry.subsystem.to_owned(),
484 plane: Some(entry.plane),
485 detail: "symbol_size_policy is empty".to_owned(),
486 });
487 }
488 if entry.repair_story.trim().is_empty() {
489 failures.push(AuditFailure {
490 kind: AuditFailureKind::EmptyField,
491 subsystem: entry.subsystem.to_owned(),
492 plane: Some(entry.plane),
493 detail: "repair_story is empty".to_owned(),
494 });
495 }
496}
497
498fn validate_symbol_policy_entry(
499 failures: &mut Vec<AuditFailure>,
500 entry: &PermeationEntry,
501 page_size: u32,
502 defaults: PolicyResolutionDefaults,
503) {
504 match parse_symbol_policy(entry.symbol_size_policy) {
505 Ok(parsed) => {
506 let resolved = parsed.resolve(page_size, defaults);
507 debug!(
508 bead_id = PERMEATION_BEAD_ID,
509 logging_standard = PERMEATION_LOGGING_STANDARD,
510 subsystem = entry.subsystem,
511 plane = %entry.plane,
512 symbol_size_bytes = resolved.symbol_size_bytes,
513 redundancy_bps = resolved.redundancy_bps,
514 fountain_coded = resolved.fountain_coded,
515 "validated symbol policy declaration"
516 );
517 }
518 Err(parse_error) => {
519 error!(
520 bead_id = PERMEATION_BEAD_ID,
521 logging_standard = PERMEATION_LOGGING_STANDARD,
522 subsystem = entry.subsystem,
523 plane = %entry.plane,
524 policy = entry.symbol_size_policy,
525 error = %parse_error,
526 "invalid permeation symbol policy"
527 );
528 failures.push(AuditFailure {
529 kind: AuditFailureKind::InvalidSymbolPolicy,
530 subsystem: entry.subsystem.to_owned(),
531 plane: Some(entry.plane),
532 detail: parse_error.to_string(),
533 });
534 }
535 }
536}
537
538#[derive(Debug, Clone, PartialEq, Eq)]
540pub struct ReconciliationDelta {
541 pub missing_locally: BTreeSet<ObjectId>,
543 pub missing_remotely: BTreeSet<ObjectId>,
545}
546
547impl ReconciliationDelta {
548 #[must_use]
550 pub fn is_empty(&self) -> bool {
551 self.missing_locally.is_empty() && self.missing_remotely.is_empty()
552 }
553}
554
/// Delta plus a flag telling which reconciliation path produced it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReconciliationResult {
    /// The computed set difference.
    pub delta: ReconciliationDelta,
    /// `true` when the result came from `segment_hash_scan_fallback` rather than IBLT peeling.
    pub used_fallback: bool,
}
563
/// Failures that can occur while building, subtracting or peeling an IBLT.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IbltError {
    /// The requested cell count is below the number of bucket hashes.
    InvalidCellCount { cell_count: usize },
    /// Two tables with different cell counts were subtracted.
    ShapeMismatch {
        left_cell_count: usize,
        right_cell_count: usize,
    },
    /// Peeling stopped with non-zero cells left over.
    PeelOverflow { residual_cells: usize },
}

impl fmt::Display for IbltError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All payloads are `usize`, so matching on `*self` copies them out directly.
        match *self {
            Self::InvalidCellCount { cell_count } => {
                write!(f, "invalid IBLT cell count: {cell_count}")
            }
            Self::ShapeMismatch {
                left_cell_count,
                right_cell_count,
            } => {
                write!(
                    f,
                    "IBLT shape mismatch: left={left_cell_count}, right={right_cell_count}"
                )
            }
            Self::PeelOverflow { residual_cells } => {
                write!(f, "IBLT peel failed with {residual_cells} residual cells")
            }
        }
    }
}

impl Error for IbltError {}
599
600#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
601struct IbltCell {
602 count: i32,
603 key_xor: [u8; 16],
604 checksum_xor: u32,
605}
606
607impl IbltCell {
608 fn is_zero(self) -> bool {
609 self.count == 0 && self.key_xor == [0_u8; 16] && self.checksum_xor == 0
610 }
611
612 fn is_pure(self) -> bool {
613 if self.count.unsigned_abs() != 1 {
614 return false;
615 }
616 checksum_for_bytes(&self.key_xor) == self.checksum_xor
617 }
618}
619
620#[derive(Debug, Clone, PartialEq, Eq)]
622pub struct ObjectIdIblt {
623 cells: Vec<IbltCell>,
624}
625
626impl ObjectIdIblt {
627 pub fn new(cell_count: usize) -> Result<Self, IbltError> {
633 if cell_count < IBLT_HASH_COUNT {
634 return Err(IbltError::InvalidCellCount { cell_count });
635 }
636 Ok(Self {
637 cells: vec![IbltCell::default(); cell_count],
638 })
639 }
640
641 pub fn from_set(object_ids: &BTreeSet<ObjectId>, cell_count: usize) -> Result<Self, IbltError> {
647 let mut iblt = Self::new(cell_count)?;
648 for object_id in object_ids {
649 iblt.insert(*object_id);
650 }
651 Ok(iblt)
652 }
653
654 fn insert(&mut self, object_id: ObjectId) {
655 self.apply_delta(object_id, 1);
656 }
657
658 fn apply_delta(&mut self, object_id: ObjectId, delta: i32) {
659 let checksum = checksum_for_bytes(object_id.as_bytes());
660 for index in bucket_indices(object_id, self.cells.len()) {
661 let cell = &mut self.cells[index];
662 cell.count += delta;
663 xor_in_place(&mut cell.key_xor, object_id.as_bytes());
664 cell.checksum_xor ^= checksum;
665 }
666 }
667
668 fn subtract_assign(&mut self, rhs: &Self) -> Result<(), IbltError> {
669 if self.cells.len() != rhs.cells.len() {
670 return Err(IbltError::ShapeMismatch {
671 left_cell_count: self.cells.len(),
672 right_cell_count: rhs.cells.len(),
673 });
674 }
675
676 for (left, right) in self.cells.iter_mut().zip(rhs.cells.iter()) {
677 left.count -= right.count;
678 xor_in_place(&mut left.key_xor, &right.key_xor);
679 left.checksum_xor ^= right.checksum_xor;
680 }
681 Ok(())
682 }
683
684 fn peel(self) -> Result<ReconciliationDelta, IbltError> {
685 let mut working = self;
686 let mut queue = VecDeque::new();
687 for (index, cell) in working.cells.iter().enumerate() {
688 if cell.is_pure() {
689 queue.push_back(index);
690 }
691 }
692
693 let mut missing_locally = BTreeSet::new();
694 let mut missing_remotely = BTreeSet::new();
695
696 while let Some(index) = queue.pop_front() {
697 let cell = working.cells[index];
698 if !cell.is_pure() {
699 continue;
700 }
701
702 let sign = cell.count.signum();
703 if sign == 0 {
704 continue;
705 }
706
707 let object_id = ObjectId::from_bytes(cell.key_xor);
708 if sign > 0 {
709 missing_locally.insert(object_id);
710 } else {
711 missing_remotely.insert(object_id);
712 }
713
714 let checksum = checksum_for_bytes(object_id.as_bytes());
715 for bucket in bucket_indices(object_id, working.cells.len()) {
716 let target = &mut working.cells[bucket];
717 target.count -= sign;
718 xor_in_place(&mut target.key_xor, object_id.as_bytes());
719 target.checksum_xor ^= checksum;
720 if target.is_pure() {
721 queue.push_back(bucket);
722 }
723 }
724 }
725
726 if working.cells.iter().all(|cell| cell.is_zero()) {
727 Ok(ReconciliationDelta {
728 missing_locally,
729 missing_remotely,
730 })
731 } else {
732 let residual_cells = working.cells.iter().filter(|cell| !cell.is_zero()).count();
733 Err(IbltError::PeelOverflow { residual_cells })
734 }
735 }
736}
737
738#[must_use]
740pub fn reconcile_object_id_sets(
741 local: &BTreeSet<ObjectId>,
742 remote: &BTreeSet<ObjectId>,
743 iblt_cell_count: usize,
744) -> ReconciliationResult {
745 debug!(
746 bead_id = PERMEATION_BEAD_ID,
747 logging_standard = PERMEATION_LOGGING_STANDARD,
748 local_count = local.len(),
749 remote_count = remote.len(),
750 iblt_cell_count = iblt_cell_count,
751 "starting object-id anti-entropy reconciliation"
752 );
753
754 let mut local_iblt = match ObjectIdIblt::from_set(local, iblt_cell_count) {
755 Ok(iblt) => iblt,
756 Err(new_error) => {
757 warn!(
758 bead_id = PERMEATION_BEAD_ID,
759 logging_standard = PERMEATION_LOGGING_STANDARD,
760 error = %new_error,
761 "invalid IBLT configuration; degrading to segment-hash fallback"
762 );
763 return segment_hash_scan_fallback(local, remote);
764 }
765 };
766 let remote_iblt = match ObjectIdIblt::from_set(remote, iblt_cell_count) {
767 Ok(iblt) => iblt,
768 Err(new_error) => {
769 warn!(
770 bead_id = PERMEATION_BEAD_ID,
771 logging_standard = PERMEATION_LOGGING_STANDARD,
772 error = %new_error,
773 "invalid remote IBLT configuration; degrading to segment-hash fallback"
774 );
775 return segment_hash_scan_fallback(local, remote);
776 }
777 };
778
779 if let Err(subtract_error) = local_iblt.subtract_assign(&remote_iblt) {
780 warn!(
781 bead_id = PERMEATION_BEAD_ID,
782 logging_standard = PERMEATION_LOGGING_STANDARD,
783 error = %subtract_error,
784 "IBLT subtraction failed; degrading to segment-hash fallback"
785 );
786 return segment_hash_scan_fallback(local, remote);
787 }
788
789 match local_iblt.peel() {
790 Ok(delta) => {
791 info!(
792 bead_id = PERMEATION_BEAD_ID,
793 logging_standard = PERMEATION_LOGGING_STANDARD,
794 missing_locally = delta.missing_locally.len(),
795 missing_remotely = delta.missing_remotely.len(),
796 "IBLT reconciliation completed"
797 );
798 ReconciliationResult {
799 delta,
800 used_fallback: false,
801 }
802 }
803 Err(peel_error) => {
804 warn!(
805 bead_id = PERMEATION_BEAD_ID,
806 logging_standard = PERMEATION_LOGGING_STANDARD,
807 error = %peel_error,
808 "IBLT peel overflow; degrading to segment-hash fallback"
809 );
810 segment_hash_scan_fallback(local, remote)
811 }
812 }
813}
814
815#[must_use]
817pub fn segment_hash_scan_fallback(
818 local: &BTreeSet<ObjectId>,
819 remote: &BTreeSet<ObjectId>,
820) -> ReconciliationResult {
821 let missing_locally: BTreeSet<ObjectId> = remote.difference(local).copied().collect();
822 let missing_remotely: BTreeSet<ObjectId> = local.difference(remote).copied().collect();
823
824 info!(
825 bead_id = PERMEATION_BEAD_ID,
826 logging_standard = PERMEATION_LOGGING_STANDARD,
827 missing_locally = missing_locally.len(),
828 missing_remotely = missing_remotely.len(),
829 "segment-hash fallback reconciliation completed"
830 );
831
832 ReconciliationResult {
833 delta: ReconciliationDelta {
834 missing_locally,
835 missing_remotely,
836 },
837 used_fallback: true,
838 }
839}
840
841fn parse_symbol_size_policy(raw: &str) -> Result<SymbolSizePolicy, SymbolPolicyParseError> {
842 if raw == "T=page_size" {
843 return Ok(SymbolSizePolicy::PageSize);
844 }
845
846 if let Some(inner) = raw
847 .strip_prefix("T=min(page_size,")
848 .and_then(|value| value.strip_suffix(')'))
849 {
850 let cap = inner
851 .parse::<u32>()
852 .map_err(|_| SymbolPolicyParseError::new(format!("invalid min() cap: {raw}")))?;
853 return Ok(SymbolSizePolicy::MinPageSize { cap_bytes: cap });
854 }
855
856 if let Some(bytes) = raw
857 .strip_prefix("T=")
858 .and_then(|value| value.strip_suffix('B'))
859 {
860 if let Some((lo, hi)) = bytes.split_once('-') {
861 let min_bytes = lo.parse::<u32>().map_err(|_| {
862 SymbolPolicyParseError::new(format!("invalid range lower bound: {raw}"))
863 })?;
864 let max_bytes = hi.parse::<u32>().map_err(|_| {
865 SymbolPolicyParseError::new(format!("invalid range upper bound: {raw}"))
866 })?;
867 if min_bytes > max_bytes {
868 return Err(SymbolPolicyParseError::new(format!(
869 "range lower bound exceeds upper bound: {raw}"
870 )));
871 }
872 return Ok(SymbolSizePolicy::RangeBytes {
873 min_bytes,
874 max_bytes,
875 });
876 }
877
878 let fixed_bytes = bytes.parse::<u32>().map_err(|_| {
879 SymbolPolicyParseError::new(format!("invalid fixed symbol size: {raw}"))
880 })?;
881 return Ok(SymbolSizePolicy::FixedBytes(fixed_bytes));
882 }
883
884 Err(SymbolPolicyParseError::new(format!(
885 "unsupported symbol-size policy: {raw}"
886 )))
887}
888
889fn parse_redundancy_policy(raw: &str) -> Result<RedundancyPolicy, SymbolPolicyParseError> {
890 let normalized = raw.strip_suffix(" default").map_or(raw, str::trim).trim();
891 match normalized {
892 "policy-driven" => Ok(RedundancyPolicy::PolicyDriven),
893 "per-group" => Ok(RedundancyPolicy::PerGroup),
894 "transport-policy" => Ok(RedundancyPolicy::TransportPolicy),
895 _ => {
896 let bps = parse_percent_bps(normalized).ok_or_else(|| {
897 SymbolPolicyParseError::new(format!("invalid redundancy policy: {raw}"))
898 })?;
899 Ok(RedundancyPolicy::PercentBps(bps))
900 }
901 }
902}
903
/// Parses a percentage such as `20%`, `12.5%` or `0.05%` into basis points
/// (1% = 100 bps).
///
/// The whole part must be one or more ASCII digits. The optional fractional
/// part may have at most two ASCII digits; a bare trailing dot (`5.%`) is
/// tolerated and treated as no fraction. Signs, whitespace and any other
/// characters are rejected.
///
/// Returns `None` for malformed input or for values above 100% (10 000 bps).
fn parse_percent_bps(raw: &str) -> Option<u16> {
    let percent = raw.strip_suffix('%')?;
    let (whole_raw, frac_raw) = percent.split_once('.').unwrap_or((percent, ""));

    // `u16::from_str` accepts a leading `+`, and a non-digit fraction must not
    // silently count as zero, so both parts are validated as plain digits first.
    let all_digits = |text: &str| text.bytes().all(|byte| byte.is_ascii_digit());
    if whole_raw.is_empty() || !all_digits(whole_raw) || !all_digits(frac_raw) {
        return None;
    }

    let whole = whole_raw.parse::<u16>().ok()?;
    let frac_bps = match frac_raw.len() {
        0 => 0_u16,
        // One fractional digit is tenths of a percent: 10 bps each.
        1 => frac_raw.parse::<u16>().ok()? * 10,
        2 => frac_raw.parse::<u16>().ok()?,
        _ => return None,
    };

    let bps = whole.checked_mul(100)?.checked_add(frac_bps)?;
    if bps > 10_000 {
        None
    } else {
        Some(bps)
    }
}
926
/// Recognises a `fixed:<N>B ...` policy.
///
/// Returns the byte count and the trimmed redundancy text that follows the
/// first `, R=` after the size. When there is no `, R=` clause the redundancy
/// text is `"0%"`. Returns `None` when the input does not start with
/// `fixed:`, has no `B` terminator, or the size is not a valid `u32`.
fn parse_fixed_policy(raw: &str) -> Option<(u32, &str)> {
    let body = raw.strip_prefix("fixed:")?;
    // `B` is a single byte, so slicing on either side of it stays on char boundaries.
    let marker = body.find('B')?;
    let size: u32 = body[..marker].parse().ok()?;
    let tail = &body[marker + 1..];
    let redundancy = match tail.split_once(", R=") {
        Some((_, clause)) => clause,
        None => "0%",
    };
    Some((size, redundancy.trim()))
}
934
/// XORs `rhs` into `target`, byte by byte.
fn xor_in_place(target: &mut [u8; 16], rhs: &[u8; 16]) {
    target
        .iter_mut()
        .zip(rhs)
        .for_each(|(dst, src)| *dst ^= *src);
}
940
/// 32-bit FNV-1a hash of a 16-byte key, used as the per-key IBLT checksum.
fn checksum_for_bytes(bytes: &[u8; 16]) -> u32 {
    const FNV_OFFSET_BASIS: u32 = 0x811C_9DC5;
    const FNV_PRIME: u32 = 0x0100_0193;

    bytes.iter().fold(FNV_OFFSET_BASIS, |state, &byte| {
        (state ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
949
950fn bucket_indices(object_id: ObjectId, cell_count: usize) -> [usize; IBLT_HASH_COUNT] {
951 let mut out = [0_usize; IBLT_HASH_COUNT];
952 let modulus = match u64::try_from(cell_count) {
953 Ok(value) => value.max(1),
954 Err(_) => u64::MAX,
955 };
956
957 for (slot, seed) in IBLT_HASH_SEEDS.iter().enumerate() {
958 let hash = seeded_object_hash(object_id.as_bytes(), *seed);
959 let index_u64 = hash % modulus;
960 out[slot] = usize::try_from(index_u64).unwrap_or(0);
961 }
962 out
963}
964
/// Hashes a 16-byte object id under `seed` into 64 bits.
///
/// The id is read as two little-endian `u64` halves, combined with the seed,
/// then passed through two xor-shift/multiply rounds and a final xor-shift.
fn seeded_object_hash(object_id: &[u8; 16], seed: u64) -> u64 {
    let mut halves = [[0_u8; 8]; 2];
    for (half, chunk) in halves.iter_mut().zip(object_id.chunks_exact(8)) {
        half.copy_from_slice(chunk);
    }
    let [low_bytes, high_bytes] = halves;
    let low = u64::from_le_bytes(low_bytes);
    let high = u64::from_le_bytes(high_bytes);

    let mut mixed = seed
        ^ low.wrapping_mul(0x9E37_79B1_85EB_CA87)
        ^ high.rotate_left(17).wrapping_mul(0xC2B2_AE3D_27D4_EB4F);

    for multiplier in &[0xFF51_AFD7_ED55_8CCD_u64, 0xC4CE_B9FE_1A85_EC53_u64] {
        mixed ^= mixed >> 33;
        mixed = mixed.wrapping_mul(*multiplier);
    }
    mixed ^ (mixed >> 33)
}
983
// Unit tests for the permeation-map audit and the IBLT reconciliation path.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a deterministic object id: low half is `value`, high half is
    /// its bitwise complement, both little-endian.
    fn oid_from_u64(value: u64) -> ObjectId {
        let mut bytes = [0_u8; 16];
        bytes[..8].copy_from_slice(&value.to_le_bytes());
        bytes[8..].copy_from_slice(&(!value).to_le_bytes());
        ObjectId::from_bytes(bytes)
    }

    /// Looks up the first map entry with the given subsystem name; panics if absent.
    fn find_entry(subsystem: &str) -> &'static PermeationEntry {
        PERMEATION_MAP
            .iter()
            .find(|entry| entry.subsystem == subsystem)
            .expect("expected subsystem entry")
    }

    // Every required subsystem is present and no entry has an empty field.
    #[test]
    fn test_permeation_map_complete() {
        assert!(!PERMEATION_MAP.is_empty());
        for required in V1_REQUIRED_SUBSYSTEMS {
            assert!(
                PERMEATION_MAP
                    .iter()
                    .any(|entry| entry.subsystem == *required),
                "missing required subsystem: {required}"
            );
        }

        for entry in PERMEATION_MAP {
            assert!(!entry.subsystem.is_empty());
            assert!(!entry.object_type.is_empty());
            assert!(!entry.symbol_size_policy.is_empty());
            assert!(!entry.repair_story.is_empty());
        }
    }

    // The built-in map passes the full audit.
    #[test]
    fn test_permeation_map_no_gaps() {
        let failures = audit_permeation_map();
        assert!(failures.is_empty(), "unexpected gaps: {failures:#?}");
    }

    // No `(plane, subsystem)` pair occurs twice.
    #[test]
    fn test_permeation_map_no_duplicates() {
        let mut seen = BTreeSet::new();
        for entry in PERMEATION_MAP {
            assert!(
                seen.insert((entry.plane, entry.subsystem)),
                "duplicate subsystem '{}' in plane {:?}",
                entry.subsystem,
                entry.plane
            );
        }
    }

    // Every policy string parses and resolves to sane values at several page sizes.
    #[test]
    fn test_permeation_map_symbol_policy_parseable() {
        for page_size in [1024_u32, 4096, 65536] {
            for entry in PERMEATION_MAP {
                let parsed = parse_symbol_policy(entry.symbol_size_policy)
                    .expect("symbol policy must be parseable");
                let resolved = parsed.resolve(page_size, PolicyResolutionDefaults::default());
                assert!(resolved.symbol_size_bytes >= 16);
                assert!(resolved.redundancy_bps <= 10_000);
            }
        }
    }

    // `T=min(page_size,4096), R=20%` caps the symbol size at 4096 bytes.
    #[test]
    fn test_permeation_map_commit_capsule_policy() {
        let entry = find_entry("Commits/CapsuleProof");
        let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
        let defaults = PolicyResolutionDefaults::default();

        let resolved_4096 = parsed.resolve(4096, defaults);
        assert_eq!(resolved_4096.symbol_size_bytes, 4096);
        assert_eq!(resolved_4096.redundancy_bps, 2_000);

        let resolved_65536 = parsed.resolve(65536, defaults);
        assert_eq!(resolved_65536.symbol_size_bytes, 4096);
        assert_eq!(resolved_65536.redundancy_bps, 2_000);
    }

    // `T=page_size, R=per-group` follows the page size and the per-group default.
    #[test]
    fn test_permeation_map_page_history_policy() {
        let entry = find_entry("Page storage");
        let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
        let resolved = parsed.resolve(4096, PolicyResolutionDefaults::default());
        assert_eq!(resolved.symbol_size_bytes, 4096);
        assert_eq!(resolved.redundancy_bps, 2_000);
    }

    // A `fixed:` policy without `R=` resolves to 0 bps and is not fountain coded.
    #[test]
    fn test_permeation_map_marker_record_policy() {
        let entry = find_entry("Commits/MarkerStream");
        let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
        let resolved = parsed.resolve(4096, PolicyResolutionDefaults::default());
        assert_eq!(resolved.symbol_size_bytes, 88);
        assert_eq!(resolved.redundancy_bps, 0);
        assert!(!resolved.fountain_coded);
    }

    // A small symmetric difference is decoded by the IBLT without fallback.
    // NOTE(review): only the sizes of the two result sets are checked, and they
    // are equal here, so this test cannot tell `missing_locally` and
    // `missing_remotely` apart; consider asserting membership as well.
    #[test]
    fn test_iblt_set_reconciliation() {
        let local: BTreeSet<ObjectId> = (0_u64..100).map(oid_from_u64).collect();
        let remote: BTreeSet<ObjectId> = (5_u64..105).map(oid_from_u64).collect();
        let result = reconcile_object_id_sets(&local, &remote, 128);

        assert!(!result.used_fallback, "expected IBLT to peel successfully");
        assert_eq!(result.delta.missing_locally.len(), 5);
        assert_eq!(result.delta.missing_remotely.len(), 5);
        assert_eq!(
            result.delta.missing_locally.len() + result.delta.missing_remotely.len(),
            10
        );
    }

    // A difference far larger than the table forces the fallback scan.
    #[test]
    fn test_iblt_fallback_on_overflow() {
        let local: BTreeSet<ObjectId> = (0_u64..300).map(oid_from_u64).collect();
        let remote: BTreeSet<ObjectId> = (300_u64..600).map(oid_from_u64).collect();
        let result = reconcile_object_id_sets(&local, &remote, 8);

        assert!(result.used_fallback, "expected overflow fallback");
        assert_eq!(result.delta.missing_locally.len(), 300);
        assert_eq!(result.delta.missing_remotely.len(), 300);
    }

    // Same check as `test_permeation_map_no_gaps`, kept under the audit naming scheme.
    #[test]
    fn test_audit_no_gaps() {
        let failures = audit_permeation_map();
        assert!(
            failures.is_empty(),
            "expected no audit failures: {failures:#?}"
        );
    }

    // Adding a required subsystem without a map entry yields a `MissingEntry` failure.
    #[test]
    fn test_audit_new_subsystem_requires_entry() {
        let mut required = V1_REQUIRED_SUBSYSTEMS.to_vec();
        required.push("Future storage lane");
        let failures = audit_permeation_entries(
            PERMEATION_MAP,
            &required,
            4096,
            PolicyResolutionDefaults::default(),
        );
        assert!(
            failures
                .iter()
                .any(|failure| failure.kind == AuditFailureKind::MissingEntry
                    && failure.subsystem == "Future storage lane")
        );
    }

    // Pins the identifier constants and the clean audit result.
    #[test]
    fn test_bd_1hi_27_unit_compliance_gate() {
        assert_eq!(PERMEATION_BEAD_ID, "bd-1hi.27");
        assert_eq!(PERMEATION_LOGGING_STANDARD, "bd-1fpm");
        assert!(audit_permeation_map().is_empty());
    }

    // Every plane has at least one entry, and all policies resolve sanely
    // across a sweep of page sizes.
    #[test]
    fn prop_bd_1hi_27_structure_compliance() {
        let required_planes = [
            Plane::Durability,
            Plane::Concurrency,
            Plane::Replication,
            Plane::Observability,
        ];
        for plane in required_planes {
            assert!(
                PERMEATION_MAP.iter().any(|entry| entry.plane == plane),
                "missing plane entry: {plane:?}"
            );
        }

        for page_size in [512_u32, 1024, 2048, 4096, 8192, 16384, 65536] {
            for entry in PERMEATION_MAP {
                let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
                let resolved = parsed.resolve(page_size, PolicyResolutionDefaults::default());
                assert!(resolved.symbol_size_bytes >= 16);
                assert!(resolved.redundancy_bps <= 10_000);
            }
        }
    }

    // End-to-end: clean audit, one successful IBLT reconciliation, one forced
    // fallback, and an artifact string that carries both identifiers.
    #[test]
    fn test_e2e_bd_1hi_27_compliance() {
        let failures = audit_permeation_map();
        assert!(failures.is_empty(), "audit should pass in e2e");

        let local: BTreeSet<ObjectId> = (0_u64..64).map(oid_from_u64).collect();
        let remote: BTreeSet<ObjectId> = (32_u64..96).map(oid_from_u64).collect();
        let iblt_ok = reconcile_object_id_sets(&local, &remote, 192);
        assert!(!iblt_ok.used_fallback);
        assert_eq!(
            iblt_ok.delta.missing_locally.len() + iblt_ok.delta.missing_remotely.len(),
            64
        );

        // Four cells cannot hold a 64-element difference, so peeling must overflow.
        let overflow = reconcile_object_id_sets(&local, &remote, 4);
        assert!(overflow.used_fallback);
        let artifact = format!(
            "bead={} log={} iblt_ok={} fallback={}",
            PERMEATION_BEAD_ID,
            PERMEATION_LOGGING_STANDARD,
            !iblt_ok.used_fallback,
            overflow.used_fallback
        );
        assert!(artifact.contains("bd-1hi.27"));
        assert!(artifact.contains("bd-1fpm"));
    }
}
1199}