Skip to main content

fsqlite_core/
permeation_map.rs

1//! §3.5.7 RaptorQ permeation map audit (`bd-1hi.27`).
2//!
3//! This module enforces the rule that every subsystem that persists or ships
4//! bytes declares an ECS object type, symbol policy, and repair story.
5
6use std::collections::{BTreeMap, BTreeSet, VecDeque};
7use std::error::Error;
8use std::fmt;
9
10use fsqlite_types::ObjectId;
11use tracing::{debug, error, info, warn};
12
13/// Bead identifier for this module.
14pub const PERMEATION_BEAD_ID: &str = "bd-1hi.27";
15/// Structured logging reference for this module.
16pub const PERMEATION_LOGGING_STANDARD: &str = "bd-1fpm";
17
18const IBLT_HASH_COUNT: usize = 3;
19const IBLT_HASH_SEEDS: [u64; IBLT_HASH_COUNT] = [
20    0x9E37_79B9_7F4A_7C15,
21    0xC2B2_AE3D_27D4_EB4F,
22    0x1656_67B1_9E37_79F9,
23];
24
25/// ECS permeation plane.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
27pub enum Plane {
28    /// Durable storage plane.
29    Durability,
30    /// In-memory concurrency/visibility plane.
31    Concurrency,
32    /// Replication/transport plane.
33    Replication,
34    /// Explainability/observability plane.
35    Observability,
36}
37
38impl fmt::Display for Plane {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            Self::Durability => f.write_str("Durability"),
42            Self::Concurrency => f.write_str("Concurrency"),
43            Self::Replication => f.write_str("Replication"),
44            Self::Observability => f.write_str("Observability"),
45        }
46    }
47}
48
49/// One permeation-map declaration entry (§3.5.7 checklist).
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub struct PermeationEntry {
52    /// Subsystem name.
53    pub subsystem: &'static str,
54    /// ECS object type or transport primitive.
55    pub object_type: &'static str,
56    /// Symbol size/redundancy policy declaration.
57    pub symbol_size_policy: &'static str,
58    /// Repair story declaration.
59    pub repair_story: &'static str,
60    /// Architecture plane.
61    pub plane: Plane,
62}
63
64/// V1 required subsystem names.
65pub static V1_REQUIRED_SUBSYSTEMS: &[&str] = &[
66    "Commits/CapsuleProof",
67    "Commits/MarkerStream",
68    "Checkpoints",
69    "Indices",
70    "Page storage",
71    "MVCC page history",
72    "Conflict reduction",
73    "SSI witness plane",
74    "Symbol streaming",
75    "Anti-entropy",
76    "Bootstrap",
77    "Multipath",
78    "Repair auditing",
79    "Schedule exploration",
80    "Invariant monitoring",
81    "Model checking",
82];
83
84/// Canonical permeation map for V1.
85pub static PERMEATION_MAP: &[PermeationEntry] = &[
86    PermeationEntry {
87        subsystem: "Commits/CapsuleProof",
88        object_type: "CommitCapsule+CommitProof",
89        symbol_size_policy: "T=min(page_size,4096), R=20%",
90        repair_story: "decode from surviving symbols",
91        plane: Plane::Durability,
92    },
93    PermeationEntry {
94        subsystem: "Commits/MarkerStream",
95        object_type: "CommitMarkerRecord",
96        symbol_size_policy: "fixed:88B record stream (no fountain)",
97        repair_story: "torn-tail ignore + record_xxh3 + hash-chain audit",
98        plane: Plane::Durability,
99    },
100    PermeationEntry {
101        subsystem: "Checkpoints",
102        object_type: "CheckpointChunk",
103        symbol_size_policy: "T=1024-4096B, R=policy-driven",
104        repair_story: "chunked snapshot objects; rebuild from marker stream if lost",
105        plane: Plane::Durability,
106    },
107    PermeationEntry {
108        subsystem: "Indices",
109        object_type: "IndexSegment",
110        symbol_size_policy: "T=1280-4096B, R=20%",
111        repair_story: "decode or rebuild-from-marker-scan",
112        plane: Plane::Durability,
113    },
114    PermeationEntry {
115        subsystem: "Page storage",
116        object_type: "PageHistory",
117        symbol_size_policy: "T=page_size, R=per-group",
118        repair_story: "decode from group symbols; on-the-fly repair on read",
119        plane: Plane::Durability,
120    },
121    PermeationEntry {
122        subsystem: "MVCC page history",
123        object_type: "PageHistoryPatchChain",
124        symbol_size_policy: "T=page_size, R=per-group",
125        repair_story: "bounded by GC horizon; repair through patch replay",
126        plane: Plane::Concurrency,
127    },
128    PermeationEntry {
129        subsystem: "Conflict reduction",
130        object_type: "IntentLog",
131        symbol_size_policy: "T=256-1024B, R=policy-driven",
132        repair_story: "replayed deterministically for rebase merge",
133        plane: Plane::Concurrency,
134    },
135    PermeationEntry {
136        subsystem: "SSI witness plane",
137        object_type: "ReadWitness+WriteWitness+WitnessIndexSegment+DependencyEdge+CommitProof",
138        symbol_size_policy: "T=1280-4096B, R=policy-driven",
139        repair_story: "decode witness stream and rebuild serialization graph",
140        plane: Plane::Concurrency,
141    },
142    PermeationEntry {
143        subsystem: "Symbol streaming",
144        object_type: "SymbolSink/SymbolStream",
145        symbol_size_policy: "T=1280-4096B, R=transport-policy",
146        repair_story: "symbol-native transport; recover with any K symbols",
147        plane: Plane::Replication,
148    },
149    PermeationEntry {
150        subsystem: "Anti-entropy",
151        object_type: "ObjectIdSetIBLT",
152        symbol_size_policy: "fixed:16B object-id atoms (IBLT), R=0%",
153        repair_story: "peel IBLT; fallback to segment hash scan on overflow",
154        plane: Plane::Replication,
155    },
156    PermeationEntry {
157        subsystem: "Bootstrap",
158        object_type: "CheckpointChunk",
159        symbol_size_policy: "T=1024-4096B, R=policy-driven",
160        repair_story: "late join by collecting K checkpoint symbols",
161        plane: Plane::Replication,
162    },
163    PermeationEntry {
164        subsystem: "Multipath",
165        object_type: "MultipathAggregator",
166        symbol_size_policy: "T=1280-4096B, R=transport-policy",
167        repair_story: "any K symbols from any path reconstructs object",
168        plane: Plane::Replication,
169    },
170    PermeationEntry {
171        subsystem: "Repair auditing",
172        object_type: "DecodeProof",
173        symbol_size_policy: "T=1024-4096B, R=0%",
174        repair_story: "attach decode proof artifacts to deterministic traces",
175        plane: Plane::Observability,
176    },
177    PermeationEntry {
178        subsystem: "Schedule exploration",
179        object_type: "LabRuntimeTrace",
180        symbol_size_policy: "T=1024-4096B, R=0%",
181        repair_story: "deterministic replay from seed and event stream",
182        plane: Plane::Observability,
183    },
184    PermeationEntry {
185        subsystem: "Invariant monitoring",
186        object_type: "EProcessMonitorEvent",
187        symbol_size_policy: "T=256-1024B, R=0%",
188        repair_story: "stream invariant events and enforce corruption budgets",
189        plane: Plane::Observability,
190    },
191    PermeationEntry {
192        subsystem: "Model checking",
193        object_type: "TlaExportTrace",
194        symbol_size_policy: "T=1024-4096B, R=0%",
195        repair_story: "export traces for bounded TLA+ model checking",
196        plane: Plane::Observability,
197    },
198];
199
200/// Parser output for a symbol policy declaration.
201#[derive(Debug, Clone, Copy, PartialEq, Eq)]
202pub struct ParsedSymbolPolicy {
203    /// Symbol size policy.
204    pub symbol_size: SymbolSizePolicy,
205    /// Redundancy policy.
206    pub redundancy: RedundancyPolicy,
207    /// Whether this policy is fountain-coded.
208    pub fountain_coded: bool,
209}
210
211/// Symbol-size policy grammar.
212#[derive(Debug, Clone, Copy, PartialEq, Eq)]
213pub enum SymbolSizePolicy {
214    /// `T=min(page_size,<cap>)`
215    MinPageSize { cap_bytes: u32 },
216    /// `T=page_size`
217    PageSize,
218    /// `T=<lo>-<hi>B`
219    RangeBytes { min_bytes: u32, max_bytes: u32 },
220    /// `T=<fixed>B`
221    FixedBytes(u32),
222}
223
224/// Redundancy policy grammar.
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub enum RedundancyPolicy {
227    /// Explicit percent, represented as basis points.
228    PercentBps(u16),
229    /// Policy-picked redundancy.
230    PolicyDriven,
231    /// Per-group redundancy.
232    PerGroup,
233    /// Transport policy redundancy.
234    TransportPolicy,
235}
236
237/// Concrete symbol policy resolved for a page size + defaults.
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct ResolvedSymbolPolicy {
240    /// Concrete symbol size in bytes.
241    pub symbol_size_bytes: u32,
242    /// Concrete redundancy in basis points.
243    pub redundancy_bps: u16,
244    /// Whether this remains fountain-coded after resolution.
245    pub fountain_coded: bool,
246}
247
248/// Resolution defaults for non-numeric redundancy declarations.
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
250pub struct PolicyResolutionDefaults {
251    /// Default for `policy-driven`.
252    pub policy_driven_bps: u16,
253    /// Default for `per-group`.
254    pub per_group_bps: u16,
255    /// Default for `transport-policy`.
256    pub transport_policy_bps: u16,
257}
258
259impl Default for PolicyResolutionDefaults {
260    fn default() -> Self {
261        Self {
262            policy_driven_bps: 2_000,
263            per_group_bps: 2_000,
264            transport_policy_bps: 1_500,
265        }
266    }
267}
268
269impl ParsedSymbolPolicy {
270    /// Resolve a parse policy into concrete values.
271    #[must_use]
272    pub fn resolve(
273        self,
274        page_size: u32,
275        defaults: PolicyResolutionDefaults,
276    ) -> ResolvedSymbolPolicy {
277        let symbol_size_bytes = match self.symbol_size {
278            SymbolSizePolicy::MinPageSize { cap_bytes } => page_size.min(cap_bytes),
279            SymbolSizePolicy::PageSize => page_size,
280            SymbolSizePolicy::RangeBytes {
281                min_bytes,
282                max_bytes,
283            } => page_size.clamp(min_bytes, max_bytes),
284            SymbolSizePolicy::FixedBytes(bytes) => bytes,
285        };
286
287        let redundancy_bps = match self.redundancy {
288            RedundancyPolicy::PercentBps(bps) => bps,
289            RedundancyPolicy::PolicyDriven => defaults.policy_driven_bps,
290            RedundancyPolicy::PerGroup => defaults.per_group_bps,
291            RedundancyPolicy::TransportPolicy => defaults.transport_policy_bps,
292        };
293
294        ResolvedSymbolPolicy {
295            symbol_size_bytes,
296            redundancy_bps,
297            fountain_coded: self.fountain_coded,
298        }
299    }
300}
301
302/// Parse error for symbol policy declarations.
303#[derive(Debug, Clone, PartialEq, Eq)]
304pub struct SymbolPolicyParseError {
305    detail: String,
306}
307
308impl SymbolPolicyParseError {
309    fn new(detail: impl Into<String>) -> Self {
310        Self {
311            detail: detail.into(),
312        }
313    }
314}
315
316impl fmt::Display for SymbolPolicyParseError {
317    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
318        f.write_str(&self.detail)
319    }
320}
321
322impl Error for SymbolPolicyParseError {}
323
324/// Audit failure category.
325#[derive(Debug, Clone, Copy, PartialEq, Eq)]
326pub enum AuditFailureKind {
327    /// Entry missing for a required subsystem.
328    MissingEntry,
329    /// Duplicate subsystem declaration in the same plane.
330    DuplicateSubsystemInPlane,
331    /// Empty declaration field.
332    EmptyField,
333    /// Unparseable symbol policy.
334    InvalidSymbolPolicy,
335}
336
337/// One audit failure.
338#[derive(Debug, Clone, PartialEq, Eq)]
339pub struct AuditFailure {
340    /// Failure category.
341    pub kind: AuditFailureKind,
342    /// Subsystem name associated with this failure.
343    pub subsystem: String,
344    /// Plane, when available.
345    pub plane: Option<Plane>,
346    /// Human-readable detail.
347    pub detail: String,
348}
349
350/// Parse one symbol policy declaration.
351///
352/// # Errors
353///
354/// Returns [`SymbolPolicyParseError`] when the string does not match the
355/// expected grammar.
356pub fn parse_symbol_policy(raw: &str) -> Result<ParsedSymbolPolicy, SymbolPolicyParseError> {
357    if let Some((bytes, redundancy)) = parse_fixed_policy(raw) {
358        let redundancy = parse_redundancy_policy(redundancy)?;
359        return Ok(ParsedSymbolPolicy {
360            symbol_size: SymbolSizePolicy::FixedBytes(bytes),
361            redundancy,
362            fountain_coded: false,
363        });
364    }
365
366    let (symbol_raw, redundancy_raw) = raw.split_once(", R=").ok_or_else(|| {
367        SymbolPolicyParseError::new(format!("policy missing ', R=' clause: {raw}"))
368    })?;
369
370    let symbol_size = parse_symbol_size_policy(symbol_raw.trim())?;
371    let redundancy = parse_redundancy_policy(redundancy_raw.trim())?;
372    Ok(ParsedSymbolPolicy {
373        symbol_size,
374        redundancy,
375        fountain_coded: true,
376    })
377}
378
379/// Run the permeation-map audit on the canonical V1 map.
380#[must_use]
381pub fn audit_permeation_map() -> Vec<AuditFailure> {
382    audit_permeation_entries(
383        PERMEATION_MAP,
384        V1_REQUIRED_SUBSYSTEMS,
385        4096,
386        PolicyResolutionDefaults::default(),
387    )
388}
389
390/// Run the permeation-map audit against arbitrary entries.
391///
392/// This helper is used by tests to enforce "new subsystem requires entry".
393#[must_use]
394pub fn audit_permeation_entries(
395    entries: &[PermeationEntry],
396    required_subsystems: &[&str],
397    page_size: u32,
398    defaults: PolicyResolutionDefaults,
399) -> Vec<AuditFailure> {
400    debug!(
401        bead_id = PERMEATION_BEAD_ID,
402        logging_standard = PERMEATION_LOGGING_STANDARD,
403        entry_count = entries.len(),
404        required_count = required_subsystems.len(),
405        page_size = page_size,
406        "starting permeation-map audit"
407    );
408
409    let mut failures = Vec::new();
410    let mut seen = BTreeSet::new();
411    let mut by_subsystem: BTreeMap<&str, usize> = BTreeMap::new();
412
413    for entry in entries {
414        *by_subsystem.entry(entry.subsystem).or_default() += 1;
415        push_empty_field_failures(&mut failures, entry);
416
417        if !seen.insert((entry.plane, entry.subsystem)) {
418            failures.push(AuditFailure {
419                kind: AuditFailureKind::DuplicateSubsystemInPlane,
420                subsystem: entry.subsystem.to_owned(),
421                plane: Some(entry.plane),
422                detail: format!(
423                    "duplicate subsystem '{}' in plane {}",
424                    entry.subsystem, entry.plane
425                ),
426            });
427        }
428
429        validate_symbol_policy_entry(&mut failures, entry, page_size, defaults);
430    }
431
432    for required in required_subsystems {
433        if !by_subsystem.contains_key(required) {
434            failures.push(AuditFailure {
435                kind: AuditFailureKind::MissingEntry,
436                subsystem: (*required).to_owned(),
437                plane: None,
438                detail: "required subsystem missing from permeation map".to_owned(),
439            });
440        }
441    }
442
443    if failures.is_empty() {
444        info!(
445            bead_id = PERMEATION_BEAD_ID,
446            logging_standard = PERMEATION_LOGGING_STANDARD,
447            entry_count = entries.len(),
448            "permeation-map audit complete: no gaps"
449        );
450    } else {
451        error!(
452            bead_id = PERMEATION_BEAD_ID,
453            logging_standard = PERMEATION_LOGGING_STANDARD,
454            entry_count = entries.len(),
455            failure_count = failures.len(),
456            "permeation-map audit detected failures"
457        );
458    }
459
460    failures
461}
462
463fn push_empty_field_failures(failures: &mut Vec<AuditFailure>, entry: &PermeationEntry) {
464    if entry.subsystem.trim().is_empty() {
465        failures.push(AuditFailure {
466            kind: AuditFailureKind::EmptyField,
467            subsystem: entry.subsystem.to_owned(),
468            plane: Some(entry.plane),
469            detail: "subsystem is empty".to_owned(),
470        });
471    }
472    if entry.object_type.trim().is_empty() {
473        failures.push(AuditFailure {
474            kind: AuditFailureKind::EmptyField,
475            subsystem: entry.subsystem.to_owned(),
476            plane: Some(entry.plane),
477            detail: "object_type is empty".to_owned(),
478        });
479    }
480    if entry.symbol_size_policy.trim().is_empty() {
481        failures.push(AuditFailure {
482            kind: AuditFailureKind::EmptyField,
483            subsystem: entry.subsystem.to_owned(),
484            plane: Some(entry.plane),
485            detail: "symbol_size_policy is empty".to_owned(),
486        });
487    }
488    if entry.repair_story.trim().is_empty() {
489        failures.push(AuditFailure {
490            kind: AuditFailureKind::EmptyField,
491            subsystem: entry.subsystem.to_owned(),
492            plane: Some(entry.plane),
493            detail: "repair_story is empty".to_owned(),
494        });
495    }
496}
497
498fn validate_symbol_policy_entry(
499    failures: &mut Vec<AuditFailure>,
500    entry: &PermeationEntry,
501    page_size: u32,
502    defaults: PolicyResolutionDefaults,
503) {
504    match parse_symbol_policy(entry.symbol_size_policy) {
505        Ok(parsed) => {
506            let resolved = parsed.resolve(page_size, defaults);
507            debug!(
508                bead_id = PERMEATION_BEAD_ID,
509                logging_standard = PERMEATION_LOGGING_STANDARD,
510                subsystem = entry.subsystem,
511                plane = %entry.plane,
512                symbol_size_bytes = resolved.symbol_size_bytes,
513                redundancy_bps = resolved.redundancy_bps,
514                fountain_coded = resolved.fountain_coded,
515                "validated symbol policy declaration"
516            );
517        }
518        Err(parse_error) => {
519            error!(
520                bead_id = PERMEATION_BEAD_ID,
521                logging_standard = PERMEATION_LOGGING_STANDARD,
522                subsystem = entry.subsystem,
523                plane = %entry.plane,
524                policy = entry.symbol_size_policy,
525                error = %parse_error,
526                "invalid permeation symbol policy"
527            );
528            failures.push(AuditFailure {
529                kind: AuditFailureKind::InvalidSymbolPolicy,
530                subsystem: entry.subsystem.to_owned(),
531                plane: Some(entry.plane),
532                detail: parse_error.to_string(),
533            });
534        }
535    }
536}
537
538/// Reconciliation delta between two object-id sets.
539#[derive(Debug, Clone, PartialEq, Eq)]
540pub struct ReconciliationDelta {
541    /// Object IDs present in remote but missing locally.
542    pub missing_locally: BTreeSet<ObjectId>,
543    /// Object IDs present in local but missing remotely.
544    pub missing_remotely: BTreeSet<ObjectId>,
545}
546
547impl ReconciliationDelta {
548    /// Whether both sides are converged.
549    #[must_use]
550    pub fn is_empty(&self) -> bool {
551        self.missing_locally.is_empty() && self.missing_remotely.is_empty()
552    }
553}
554
555/// Reconciliation result, including fallback flag.
556#[derive(Debug, Clone, PartialEq, Eq)]
557pub struct ReconciliationResult {
558    /// Symmetric-difference delta.
559    pub delta: ReconciliationDelta,
560    /// True when segment-hash fallback was used.
561    pub used_fallback: bool,
562}
563
564/// Errors emitted by IBLT operations.
565#[derive(Debug, Clone, PartialEq, Eq)]
566pub enum IbltError {
567    /// Invalid number of cells.
568    InvalidCellCount { cell_count: usize },
569    /// Shape mismatch between two IBLTs.
570    ShapeMismatch {
571        left_cell_count: usize,
572        right_cell_count: usize,
573    },
574    /// Peeling failed due to overflow/collision pressure.
575    PeelOverflow { residual_cells: usize },
576}
577
578impl fmt::Display for IbltError {
579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580        match self {
581            Self::InvalidCellCount { cell_count } => {
582                write!(f, "invalid IBLT cell count: {cell_count}")
583            }
584            Self::ShapeMismatch {
585                left_cell_count,
586                right_cell_count,
587            } => write!(
588                f,
589                "IBLT shape mismatch: left={left_cell_count}, right={right_cell_count}"
590            ),
591            Self::PeelOverflow { residual_cells } => {
592                write!(f, "IBLT peel failed with {residual_cells} residual cells")
593            }
594        }
595    }
596}
597
598impl Error for IbltError {}
599
600#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
601struct IbltCell {
602    count: i32,
603    key_xor: [u8; 16],
604    checksum_xor: u32,
605}
606
607impl IbltCell {
608    fn is_zero(self) -> bool {
609        self.count == 0 && self.key_xor == [0_u8; 16] && self.checksum_xor == 0
610    }
611
612    fn is_pure(self) -> bool {
613        if self.count.unsigned_abs() != 1 {
614            return false;
615        }
616        checksum_for_bytes(&self.key_xor) == self.checksum_xor
617    }
618}
619
620/// Simple ObjectId IBLT implementation for anti-entropy reconciliation.
621#[derive(Debug, Clone, PartialEq, Eq)]
622pub struct ObjectIdIblt {
623    cells: Vec<IbltCell>,
624}
625
626impl ObjectIdIblt {
627    /// Construct an empty IBLT.
628    ///
629    /// # Errors
630    ///
631    /// Returns [`IbltError::InvalidCellCount`] for unusable cell counts.
632    pub fn new(cell_count: usize) -> Result<Self, IbltError> {
633        if cell_count < IBLT_HASH_COUNT {
634            return Err(IbltError::InvalidCellCount { cell_count });
635        }
636        Ok(Self {
637            cells: vec![IbltCell::default(); cell_count],
638        })
639    }
640
641    /// Build an IBLT from a set of object IDs.
642    ///
643    /// # Errors
644    ///
645    /// Returns the same errors as [`Self::new`].
646    pub fn from_set(object_ids: &BTreeSet<ObjectId>, cell_count: usize) -> Result<Self, IbltError> {
647        let mut iblt = Self::new(cell_count)?;
648        for object_id in object_ids {
649            iblt.insert(*object_id);
650        }
651        Ok(iblt)
652    }
653
654    fn insert(&mut self, object_id: ObjectId) {
655        self.apply_delta(object_id, 1);
656    }
657
658    fn apply_delta(&mut self, object_id: ObjectId, delta: i32) {
659        let checksum = checksum_for_bytes(object_id.as_bytes());
660        for index in bucket_indices(object_id, self.cells.len()) {
661            let cell = &mut self.cells[index];
662            cell.count += delta;
663            xor_in_place(&mut cell.key_xor, object_id.as_bytes());
664            cell.checksum_xor ^= checksum;
665        }
666    }
667
668    fn subtract_assign(&mut self, rhs: &Self) -> Result<(), IbltError> {
669        if self.cells.len() != rhs.cells.len() {
670            return Err(IbltError::ShapeMismatch {
671                left_cell_count: self.cells.len(),
672                right_cell_count: rhs.cells.len(),
673            });
674        }
675
676        for (left, right) in self.cells.iter_mut().zip(rhs.cells.iter()) {
677            left.count -= right.count;
678            xor_in_place(&mut left.key_xor, &right.key_xor);
679            left.checksum_xor ^= right.checksum_xor;
680        }
681        Ok(())
682    }
683
684    fn peel(self) -> Result<ReconciliationDelta, IbltError> {
685        let mut working = self;
686        let mut queue = VecDeque::new();
687        for (index, cell) in working.cells.iter().enumerate() {
688            if cell.is_pure() {
689                queue.push_back(index);
690            }
691        }
692
693        let mut missing_locally = BTreeSet::new();
694        let mut missing_remotely = BTreeSet::new();
695
696        while let Some(index) = queue.pop_front() {
697            let cell = working.cells[index];
698            if !cell.is_pure() {
699                continue;
700            }
701
702            let sign = cell.count.signum();
703            if sign == 0 {
704                continue;
705            }
706
707            let object_id = ObjectId::from_bytes(cell.key_xor);
708            if sign > 0 {
709                missing_locally.insert(object_id);
710            } else {
711                missing_remotely.insert(object_id);
712            }
713
714            let checksum = checksum_for_bytes(object_id.as_bytes());
715            for bucket in bucket_indices(object_id, working.cells.len()) {
716                let target = &mut working.cells[bucket];
717                target.count -= sign;
718                xor_in_place(&mut target.key_xor, object_id.as_bytes());
719                target.checksum_xor ^= checksum;
720                if target.is_pure() {
721                    queue.push_back(bucket);
722                }
723            }
724        }
725
726        if working.cells.iter().all(|cell| cell.is_zero()) {
727            Ok(ReconciliationDelta {
728                missing_locally,
729                missing_remotely,
730            })
731        } else {
732            let residual_cells = working.cells.iter().filter(|cell| !cell.is_zero()).count();
733            Err(IbltError::PeelOverflow { residual_cells })
734        }
735    }
736}
737
738/// Reconcile object-id sets via IBLT; fall back to segment-hash scan on failure.
739#[must_use]
740pub fn reconcile_object_id_sets(
741    local: &BTreeSet<ObjectId>,
742    remote: &BTreeSet<ObjectId>,
743    iblt_cell_count: usize,
744) -> ReconciliationResult {
745    debug!(
746        bead_id = PERMEATION_BEAD_ID,
747        logging_standard = PERMEATION_LOGGING_STANDARD,
748        local_count = local.len(),
749        remote_count = remote.len(),
750        iblt_cell_count = iblt_cell_count,
751        "starting object-id anti-entropy reconciliation"
752    );
753
754    let mut local_iblt = match ObjectIdIblt::from_set(local, iblt_cell_count) {
755        Ok(iblt) => iblt,
756        Err(new_error) => {
757            warn!(
758                bead_id = PERMEATION_BEAD_ID,
759                logging_standard = PERMEATION_LOGGING_STANDARD,
760                error = %new_error,
761                "invalid IBLT configuration; degrading to segment-hash fallback"
762            );
763            return segment_hash_scan_fallback(local, remote);
764        }
765    };
766    let remote_iblt = match ObjectIdIblt::from_set(remote, iblt_cell_count) {
767        Ok(iblt) => iblt,
768        Err(new_error) => {
769            warn!(
770                bead_id = PERMEATION_BEAD_ID,
771                logging_standard = PERMEATION_LOGGING_STANDARD,
772                error = %new_error,
773                "invalid remote IBLT configuration; degrading to segment-hash fallback"
774            );
775            return segment_hash_scan_fallback(local, remote);
776        }
777    };
778
779    if let Err(subtract_error) = local_iblt.subtract_assign(&remote_iblt) {
780        warn!(
781            bead_id = PERMEATION_BEAD_ID,
782            logging_standard = PERMEATION_LOGGING_STANDARD,
783            error = %subtract_error,
784            "IBLT subtraction failed; degrading to segment-hash fallback"
785        );
786        return segment_hash_scan_fallback(local, remote);
787    }
788
789    match local_iblt.peel() {
790        Ok(delta) => {
791            info!(
792                bead_id = PERMEATION_BEAD_ID,
793                logging_standard = PERMEATION_LOGGING_STANDARD,
794                missing_locally = delta.missing_locally.len(),
795                missing_remotely = delta.missing_remotely.len(),
796                "IBLT reconciliation completed"
797            );
798            ReconciliationResult {
799                delta,
800                used_fallback: false,
801            }
802        }
803        Err(peel_error) => {
804            warn!(
805                bead_id = PERMEATION_BEAD_ID,
806                logging_standard = PERMEATION_LOGGING_STANDARD,
807                error = %peel_error,
808                "IBLT peel overflow; degrading to segment-hash fallback"
809            );
810            segment_hash_scan_fallback(local, remote)
811        }
812    }
813}
814
815/// Fallback reconciliation with deterministic segment-hash scan.
816#[must_use]
817pub fn segment_hash_scan_fallback(
818    local: &BTreeSet<ObjectId>,
819    remote: &BTreeSet<ObjectId>,
820) -> ReconciliationResult {
821    let missing_locally: BTreeSet<ObjectId> = remote.difference(local).copied().collect();
822    let missing_remotely: BTreeSet<ObjectId> = local.difference(remote).copied().collect();
823
824    info!(
825        bead_id = PERMEATION_BEAD_ID,
826        logging_standard = PERMEATION_LOGGING_STANDARD,
827        missing_locally = missing_locally.len(),
828        missing_remotely = missing_remotely.len(),
829        "segment-hash fallback reconciliation completed"
830    );
831
832    ReconciliationResult {
833        delta: ReconciliationDelta {
834            missing_locally,
835            missing_remotely,
836        },
837        used_fallback: true,
838    }
839}
840
841fn parse_symbol_size_policy(raw: &str) -> Result<SymbolSizePolicy, SymbolPolicyParseError> {
842    if raw == "T=page_size" {
843        return Ok(SymbolSizePolicy::PageSize);
844    }
845
846    if let Some(inner) = raw
847        .strip_prefix("T=min(page_size,")
848        .and_then(|value| value.strip_suffix(')'))
849    {
850        let cap = inner
851            .parse::<u32>()
852            .map_err(|_| SymbolPolicyParseError::new(format!("invalid min() cap: {raw}")))?;
853        return Ok(SymbolSizePolicy::MinPageSize { cap_bytes: cap });
854    }
855
856    if let Some(bytes) = raw
857        .strip_prefix("T=")
858        .and_then(|value| value.strip_suffix('B'))
859    {
860        if let Some((lo, hi)) = bytes.split_once('-') {
861            let min_bytes = lo.parse::<u32>().map_err(|_| {
862                SymbolPolicyParseError::new(format!("invalid range lower bound: {raw}"))
863            })?;
864            let max_bytes = hi.parse::<u32>().map_err(|_| {
865                SymbolPolicyParseError::new(format!("invalid range upper bound: {raw}"))
866            })?;
867            if min_bytes > max_bytes {
868                return Err(SymbolPolicyParseError::new(format!(
869                    "range lower bound exceeds upper bound: {raw}"
870                )));
871            }
872            return Ok(SymbolSizePolicy::RangeBytes {
873                min_bytes,
874                max_bytes,
875            });
876        }
877
878        let fixed_bytes = bytes.parse::<u32>().map_err(|_| {
879            SymbolPolicyParseError::new(format!("invalid fixed symbol size: {raw}"))
880        })?;
881        return Ok(SymbolSizePolicy::FixedBytes(fixed_bytes));
882    }
883
884    Err(SymbolPolicyParseError::new(format!(
885        "unsupported symbol-size policy: {raw}"
886    )))
887}
888
889fn parse_redundancy_policy(raw: &str) -> Result<RedundancyPolicy, SymbolPolicyParseError> {
890    let normalized = raw.strip_suffix(" default").map_or(raw, str::trim).trim();
891    match normalized {
892        "policy-driven" => Ok(RedundancyPolicy::PolicyDriven),
893        "per-group" => Ok(RedundancyPolicy::PerGroup),
894        "transport-policy" => Ok(RedundancyPolicy::TransportPolicy),
895        _ => {
896            let bps = parse_percent_bps(normalized).ok_or_else(|| {
897                SymbolPolicyParseError::new(format!("invalid redundancy policy: {raw}"))
898            })?;
899            Ok(RedundancyPolicy::PercentBps(bps))
900        }
901    }
902}
903
904fn parse_percent_bps(raw: &str) -> Option<u16> {
905    let percent = raw.strip_suffix('%')?;
906    let (whole_raw, frac_raw) = percent.split_once('.').unwrap_or((percent, ""));
907    let whole = whole_raw.parse::<u16>().ok()?;
908    let frac_bps = if frac_raw.is_empty() {
909        0_u16
910    } else if frac_raw.len() == 1 {
911        frac_raw
912            .chars()
913            .next()
914            .and_then(|ch| ch.to_digit(10))
915            .and_then(|digit| u16::try_from(digit).ok())
916            .map_or(0, |digit| digit * 10)
917    } else if frac_raw.len() == 2 {
918        frac_raw.parse::<u16>().ok()?
919    } else {
920        return None;
921    };
922
923    let bps = whole.checked_mul(100)?.checked_add(frac_bps)?;
924    if bps > 10_000 { None } else { Some(bps) }
925}
926
927fn parse_fixed_policy(raw: &str) -> Option<(u32, &str)> {
928    let fixed = raw.strip_prefix("fixed:")?;
929    let (bytes_raw, rest) = fixed.split_once('B')?;
930    let bytes = bytes_raw.parse::<u32>().ok()?;
931    let redundancy = rest.split_once(", R=").map_or("0%", |(_, r)| r);
932    Some((bytes, redundancy.trim()))
933}
934
935fn xor_in_place(target: &mut [u8; 16], rhs: &[u8; 16]) {
936    for (left, right) in target.iter_mut().zip(rhs.iter()) {
937        *left ^= *right;
938    }
939}
940
941fn checksum_for_bytes(bytes: &[u8; 16]) -> u32 {
942    let mut state = 0x811C_9DC5_u32;
943    for byte in bytes {
944        state ^= u32::from(*byte);
945        state = state.wrapping_mul(0x0100_0193);
946    }
947    state
948}
949
950fn bucket_indices(object_id: ObjectId, cell_count: usize) -> [usize; IBLT_HASH_COUNT] {
951    let mut out = [0_usize; IBLT_HASH_COUNT];
952    let modulus = match u64::try_from(cell_count) {
953        Ok(value) => value.max(1),
954        Err(_) => u64::MAX,
955    };
956
957    for (slot, seed) in IBLT_HASH_SEEDS.iter().enumerate() {
958        let hash = seeded_object_hash(object_id.as_bytes(), *seed);
959        let index_u64 = hash % modulus;
960        out[slot] = usize::try_from(index_u64).unwrap_or(0);
961    }
962    out
963}
964
965fn seeded_object_hash(object_id: &[u8; 16], seed: u64) -> u64 {
966    let mut first = [0_u8; 8];
967    let mut second = [0_u8; 8];
968    first.copy_from_slice(&object_id[..8]);
969    second.copy_from_slice(&object_id[8..]);
970
971    let a = u64::from_le_bytes(first);
972    let b = u64::from_le_bytes(second);
973
974    let mut x = seed
975        ^ a.wrapping_mul(0x9E37_79B1_85EB_CA87)
976        ^ b.rotate_left(17).wrapping_mul(0xC2B2_AE3D_27D4_EB4F);
977    x ^= x >> 33;
978    x = x.wrapping_mul(0xFF51_AFD7_ED55_8CCD);
979    x ^= x >> 33;
980    x = x.wrapping_mul(0xC4CE_B9FE_1A85_EC53);
981    x ^ (x >> 33)
982}
983
984#[cfg(test)]
985mod tests {
986    use super::*;
987
988    fn oid_from_u64(value: u64) -> ObjectId {
989        let mut bytes = [0_u8; 16];
990        bytes[..8].copy_from_slice(&value.to_le_bytes());
991        bytes[8..].copy_from_slice(&(!value).to_le_bytes());
992        ObjectId::from_bytes(bytes)
993    }
994
995    fn find_entry(subsystem: &str) -> &'static PermeationEntry {
996        PERMEATION_MAP
997            .iter()
998            .find(|entry| entry.subsystem == subsystem)
999            .expect("expected subsystem entry")
1000    }
1001
1002    #[test]
1003    fn test_permeation_map_complete() {
1004        assert!(!PERMEATION_MAP.is_empty());
1005        for required in V1_REQUIRED_SUBSYSTEMS {
1006            assert!(
1007                PERMEATION_MAP
1008                    .iter()
1009                    .any(|entry| entry.subsystem == *required),
1010                "missing required subsystem: {required}"
1011            );
1012        }
1013
1014        for entry in PERMEATION_MAP {
1015            assert!(!entry.subsystem.is_empty());
1016            assert!(!entry.object_type.is_empty());
1017            assert!(!entry.symbol_size_policy.is_empty());
1018            assert!(!entry.repair_story.is_empty());
1019        }
1020    }
1021
1022    #[test]
1023    fn test_permeation_map_no_gaps() {
1024        let failures = audit_permeation_map();
1025        assert!(failures.is_empty(), "unexpected gaps: {failures:#?}");
1026    }
1027
1028    #[test]
1029    fn test_permeation_map_no_duplicates() {
1030        let mut seen = BTreeSet::new();
1031        for entry in PERMEATION_MAP {
1032            assert!(
1033                seen.insert((entry.plane, entry.subsystem)),
1034                "duplicate subsystem '{}' in plane {:?}",
1035                entry.subsystem,
1036                entry.plane
1037            );
1038        }
1039    }
1040
1041    #[test]
1042    fn test_permeation_map_symbol_policy_parseable() {
1043        for page_size in [1024_u32, 4096, 65536] {
1044            for entry in PERMEATION_MAP {
1045                let parsed = parse_symbol_policy(entry.symbol_size_policy)
1046                    .expect("symbol policy must be parseable");
1047                let resolved = parsed.resolve(page_size, PolicyResolutionDefaults::default());
1048                assert!(resolved.symbol_size_bytes >= 16);
1049                assert!(resolved.redundancy_bps <= 10_000);
1050            }
1051        }
1052    }
1053
1054    #[test]
1055    fn test_permeation_map_commit_capsule_policy() {
1056        let entry = find_entry("Commits/CapsuleProof");
1057        let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
1058        let defaults = PolicyResolutionDefaults::default();
1059
1060        let resolved_4096 = parsed.resolve(4096, defaults);
1061        assert_eq!(resolved_4096.symbol_size_bytes, 4096);
1062        assert_eq!(resolved_4096.redundancy_bps, 2_000);
1063
1064        let resolved_65536 = parsed.resolve(65536, defaults);
1065        assert_eq!(resolved_65536.symbol_size_bytes, 4096);
1066        assert_eq!(resolved_65536.redundancy_bps, 2_000);
1067    }
1068
1069    #[test]
1070    fn test_permeation_map_page_history_policy() {
1071        let entry = find_entry("Page storage");
1072        let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
1073        let resolved = parsed.resolve(4096, PolicyResolutionDefaults::default());
1074        assert_eq!(resolved.symbol_size_bytes, 4096);
1075        assert_eq!(resolved.redundancy_bps, 2_000);
1076    }
1077
1078    #[test]
1079    fn test_permeation_map_marker_record_policy() {
1080        let entry = find_entry("Commits/MarkerStream");
1081        let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
1082        let resolved = parsed.resolve(4096, PolicyResolutionDefaults::default());
1083        assert_eq!(resolved.symbol_size_bytes, 88);
1084        assert_eq!(resolved.redundancy_bps, 0);
1085        assert!(!resolved.fountain_coded);
1086    }
1087
1088    #[test]
1089    fn test_iblt_set_reconciliation() {
1090        let local: BTreeSet<ObjectId> = (0_u64..100).map(oid_from_u64).collect();
1091        let remote: BTreeSet<ObjectId> = (5_u64..105).map(oid_from_u64).collect();
1092        let result = reconcile_object_id_sets(&local, &remote, 128);
1093
1094        assert!(!result.used_fallback, "expected IBLT to peel successfully");
1095        assert_eq!(result.delta.missing_locally.len(), 5);
1096        assert_eq!(result.delta.missing_remotely.len(), 5);
1097        assert_eq!(
1098            result.delta.missing_locally.len() + result.delta.missing_remotely.len(),
1099            10
1100        );
1101    }
1102
1103    #[test]
1104    fn test_iblt_fallback_on_overflow() {
1105        let local: BTreeSet<ObjectId> = (0_u64..300).map(oid_from_u64).collect();
1106        let remote: BTreeSet<ObjectId> = (300_u64..600).map(oid_from_u64).collect();
1107        let result = reconcile_object_id_sets(&local, &remote, 8);
1108
1109        assert!(result.used_fallback, "expected overflow fallback");
1110        assert_eq!(result.delta.missing_locally.len(), 300);
1111        assert_eq!(result.delta.missing_remotely.len(), 300);
1112    }
1113
1114    #[test]
1115    fn test_audit_no_gaps() {
1116        let failures = audit_permeation_map();
1117        assert!(
1118            failures.is_empty(),
1119            "expected no audit failures: {failures:#?}"
1120        );
1121    }
1122
1123    #[test]
1124    fn test_audit_new_subsystem_requires_entry() {
1125        let mut required = V1_REQUIRED_SUBSYSTEMS.to_vec();
1126        required.push("Future storage lane");
1127        let failures = audit_permeation_entries(
1128            PERMEATION_MAP,
1129            &required,
1130            4096,
1131            PolicyResolutionDefaults::default(),
1132        );
1133        assert!(
1134            failures
1135                .iter()
1136                .any(|failure| failure.kind == AuditFailureKind::MissingEntry
1137                    && failure.subsystem == "Future storage lane")
1138        );
1139    }
1140
1141    #[test]
1142    fn test_bd_1hi_27_unit_compliance_gate() {
1143        assert_eq!(PERMEATION_BEAD_ID, "bd-1hi.27");
1144        assert_eq!(PERMEATION_LOGGING_STANDARD, "bd-1fpm");
1145        assert!(audit_permeation_map().is_empty());
1146    }
1147
1148    #[test]
1149    fn prop_bd_1hi_27_structure_compliance() {
1150        let required_planes = [
1151            Plane::Durability,
1152            Plane::Concurrency,
1153            Plane::Replication,
1154            Plane::Observability,
1155        ];
1156        for plane in required_planes {
1157            assert!(
1158                PERMEATION_MAP.iter().any(|entry| entry.plane == plane),
1159                "missing plane entry: {plane:?}"
1160            );
1161        }
1162
1163        for page_size in [512_u32, 1024, 2048, 4096, 8192, 16384, 65536] {
1164            for entry in PERMEATION_MAP {
1165                let parsed = parse_symbol_policy(entry.symbol_size_policy).expect("parse");
1166                let resolved = parsed.resolve(page_size, PolicyResolutionDefaults::default());
1167                assert!(resolved.symbol_size_bytes >= 16);
1168                assert!(resolved.redundancy_bps <= 10_000);
1169            }
1170        }
1171    }
1172
1173    #[test]
1174    fn test_e2e_bd_1hi_27_compliance() {
1175        let failures = audit_permeation_map();
1176        assert!(failures.is_empty(), "audit should pass in e2e");
1177
1178        let local: BTreeSet<ObjectId> = (0_u64..64).map(oid_from_u64).collect();
1179        let remote: BTreeSet<ObjectId> = (32_u64..96).map(oid_from_u64).collect();
1180        let iblt_ok = reconcile_object_id_sets(&local, &remote, 192);
1181        assert!(!iblt_ok.used_fallback);
1182        assert_eq!(
1183            iblt_ok.delta.missing_locally.len() + iblt_ok.delta.missing_remotely.len(),
1184            64
1185        );
1186
1187        let overflow = reconcile_object_id_sets(&local, &remote, 4);
1188        assert!(overflow.used_fallback);
1189        let artifact = format!(
1190            "bead={} log={} iblt_ok={} fallback={}",
1191            PERMEATION_BEAD_ID,
1192            PERMEATION_LOGGING_STANDARD,
1193            !iblt_ok.used_fallback,
1194            overflow.used_fallback
1195        );
1196        assert!(artifact.contains("bd-1hi.27"));
1197        assert!(artifact.contains("bd-1fpm"));
1198    }
1199}