use std::collections::HashMap;
use std::fmt;
#[derive(Debug, Clone)]
pub struct Section {
pub txn_id: u64,
pub observations: HashMap<u64, u64>,
}
#[derive(Debug, Clone)]
pub struct SheafObstruction {
pub page: u64,
pub txn_a: u64,
pub version_a: u64,
pub txn_b: u64,
pub version_b: u64,
}
impl fmt::Display for SheafObstruction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"page={} txn_a={} ver_a={} txn_b={} ver_b={}",
self.page, self.txn_a, self.version_a, self.txn_b, self.version_b
)
}
}
#[derive(Debug, Clone)]
pub struct SheafResult {
pub consistent: bool,
pub obstructions: Vec<SheafObstruction>,
}
impl SheafResult {
#[must_use]
pub fn is_consistent(&self) -> bool {
self.consistent
}
}
#[must_use]
pub fn check_sheaf_consistency(
sections: &[Section],
version_order: Option<&dyn Fn(u64, u64) -> bool>,
) -> SheafResult {
let mut obstructions = Vec::new();
for i in 0..sections.len() {
for j in (i + 1)..sections.len() {
let a = §ions[i];
let b = §ions[j];
let mut a_le_b = true;
let mut b_le_a = true;
let mut overlap_pages = Vec::new();
for (&page, &ver_a) in &a.observations {
if let Some(&ver_b) = b.observations.get(&page) {
overlap_pages.push((page, ver_a, ver_b));
if let Some(order_fn) = version_order {
if !order_fn(ver_a, ver_b) {
a_le_b = false;
}
if !order_fn(ver_b, ver_a) {
b_le_a = false;
}
} else if ver_a != ver_b {
a_le_b = false;
b_le_a = false;
}
}
}
if !overlap_pages.is_empty() && !a_le_b && !b_le_a {
for (page, ver_a, ver_b) in overlap_pages {
let disagrees = if let Some(order_fn) = version_order {
!order_fn(ver_a, ver_b) || !order_fn(ver_b, ver_a)
} else {
ver_a != ver_b
};
if disagrees {
obstructions.push(SheafObstruction {
page,
txn_a: a.txn_id,
version_a: ver_a,
txn_b: b.txn_id,
version_b: ver_b,
});
}
}
}
}
}
SheafResult {
consistent: obstructions.is_empty(),
obstructions,
}
}
#[must_use]
pub fn check_sheaf_consistency_with_chains<S: std::hash::BuildHasher>(
sections: &[Section],
global_version_chains: &HashMap<u64, Vec<u64>, S>,
) -> SheafResult {
let mut obstructions = Vec::new();
for i in 0..sections.len() {
for j in (i + 1)..sections.len() {
let a = §ions[i];
let b = §ions[j];
let mut a_le_b = true;
let mut b_le_a = true;
let mut overlap_pages = Vec::new();
for (&page, &ver_a) in &a.observations {
let Some(&ver_b) = b.observations.get(&page) else {
continue;
};
overlap_pages.push((page, ver_a, ver_b));
if ver_a == ver_b {
continue;
}
if let Some(chain) = global_version_chains.get(&page) {
let pos_a = chain.iter().position(|&v| v == ver_a);
let pos_b = chain.iter().position(|&v| v == ver_b);
match (pos_a, pos_b) {
(Some(pa), Some(pb)) => {
if pa > pb {
a_le_b = false;
}
if pb > pa {
b_le_a = false;
}
}
_ => {
a_le_b = false;
b_le_a = false;
}
}
} else {
a_le_b = false;
b_le_a = false;
}
}
if !overlap_pages.is_empty() && !a_le_b && !b_le_a {
for (page, ver_a, ver_b) in overlap_pages {
if ver_a != ver_b {
obstructions.push(SheafObstruction {
page,
txn_a: a.txn_id,
version_a: ver_a,
txn_b: b.txn_id,
version_b: ver_b,
});
}
}
}
}
}
SheafResult {
consistent: obstructions.is_empty(),
obstructions,
}
}
#[derive(Debug, Clone, Copy)]
pub struct ConformalCalibratorConfig {
pub alpha: f64,
pub min_calibration_samples: usize,
}
impl Default for ConformalCalibratorConfig {
fn default() -> Self {
Self {
alpha: 0.05,
min_calibration_samples: 50,
}
}
}
#[derive(Debug, Clone)]
pub struct InvariantScore {
pub invariant: String,
pub score: f64,
}
#[derive(Debug, Clone)]
pub struct OracleReport {
pub scores: Vec<InvariantScore>,
}
#[derive(Debug, Clone)]
pub struct PredictionSetEntry {
pub invariant: String,
pub score: f64,
pub threshold: f64,
pub conforming: bool,
}
#[derive(Debug, Clone)]
pub struct ConformalPrediction {
pub prediction_sets: Vec<PredictionSetEntry>,
}
#[derive(Debug, Clone)]
pub struct ConformalOracleCalibrator {
config: ConformalCalibratorConfig,
calibration_scores: HashMap<String, Vec<f64>>,
sample_count: usize,
}
impl ConformalOracleCalibrator {
#[must_use]
pub fn new(config: ConformalCalibratorConfig) -> Self {
Self {
config,
calibration_scores: HashMap::new(),
sample_count: 0,
}
}
pub fn calibrate(&mut self, report: &OracleReport) {
self.sample_count += 1;
for score in &report.scores {
self.calibration_scores
.entry(score.invariant.clone())
.or_default()
.push(score.score);
}
}
#[must_use]
pub fn is_calibrated(&self) -> bool {
self.sample_count >= self.config.min_calibration_samples
}
#[must_use]
pub fn sample_count(&self) -> usize {
self.sample_count
}
#[must_use]
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
pub fn predict(&self, report: &OracleReport) -> Option<ConformalPrediction> {
if !self.is_calibrated() {
return None;
}
let mut entries = Vec::with_capacity(report.scores.len());
for score in &report.scores {
let threshold = self.compute_threshold(&score.invariant);
let conforming = score.score <= threshold;
entries.push(PredictionSetEntry {
invariant: score.invariant.clone(),
score: score.score,
threshold,
conforming,
});
}
Some(ConformalPrediction {
prediction_sets: entries,
})
}
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
fn compute_threshold(&self, invariant: &str) -> f64 {
let Some(scores) = self.calibration_scores.get(invariant) else {
return f64::INFINITY; };
if scores.is_empty() {
return f64::INFINITY;
}
let mut sorted = scores.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let n = sorted.len();
let q_idx = ((1.0 - self.config.alpha) * (n + 1) as f64).ceil() as usize;
let idx = q_idx.min(n).saturating_sub(1);
sorted[idx]
}
}
#[derive(Debug, Clone, Copy)]
pub struct OpportunityScore {
pub impact: f64,
pub confidence: f64,
pub effort: f64,
}
impl OpportunityScore {
#[must_use]
pub fn score(&self) -> f64 {
let effort = self.effort.max(f64::EPSILON);
self.impact * self.confidence / effort
}
#[must_use]
pub fn passes_gate(&self) -> bool {
self.score() >= 2.0
}
}
impl fmt::Display for OpportunityScore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"impact={:.1} confidence={:.2} effort={:.1} score={:.2} {}",
self.impact,
self.confidence,
self.effort,
self.score(),
if self.passes_gate() { "PASS" } else { "REJECT" }
)
}
}
#[cfg(test)]
mod tests {
use super::*;
const BEAD_ID: &str = "bd-3go.6";
#[test]
fn test_sheaf_consistent_non_overlapping_txns() {
let sections = vec![
Section {
txn_id: 1,
observations: std::iter::once((10, 100)).collect(),
},
Section {
txn_id: 2,
observations: std::iter::once((20, 200)).collect(),
},
];
let result = check_sheaf_consistency(§ions, None);
assert!(
result.is_consistent(),
"bead_id={BEAD_ID} non_overlapping_consistent"
);
assert!(result.obstructions.is_empty());
}
#[test]
fn test_sheaf_consistent_overlapping_agree() {
let sections = vec![
Section {
txn_id: 1,
observations: [(10, 100), (20, 200)].into_iter().collect(),
},
Section {
txn_id: 2,
observations: [(10, 100), (30, 300)].into_iter().collect(),
},
];
let result = check_sheaf_consistency(§ions, None);
assert!(
result.is_consistent(),
"bead_id={BEAD_ID} overlapping_agree"
);
}
#[test]
fn test_sheaf_detects_inconsistency() {
let sections = vec![
Section {
txn_id: 1,
observations: std::iter::once((10, 100)).collect(),
},
Section {
txn_id: 2,
observations: std::iter::once((10, 999)).collect(),
},
];
let result = check_sheaf_consistency(§ions, None);
assert!(
!result.is_consistent(),
"bead_id={BEAD_ID} detects_inconsistency"
);
assert_eq!(result.obstructions.len(), 1);
let obs = &result.obstructions[0];
assert_eq!(obs.page, 10);
assert!(
!obs.to_string().is_empty(),
"bead_id={BEAD_ID} obstruction_display"
);
}
#[test]
fn test_sheaf_ordered_versions_consistent() {
let sections = vec![
Section {
txn_id: 1,
observations: std::iter::once((10, 100)).collect(),
},
Section {
txn_id: 2,
observations: std::iter::once((10, 200)).collect(),
},
];
let order_fn = |a: u64, b: u64| a <= b;
let result = check_sheaf_consistency(§ions, Some(&order_fn));
assert!(
result.is_consistent(),
"bead_id={BEAD_ID} ordered_versions_consistent"
);
}
#[test]
fn test_sheaf_consistency_with_global_version_chain() {
let sections = vec![
Section {
txn_id: 1,
observations: std::iter::once((10, 100)).collect(),
},
Section {
txn_id: 2,
observations: std::iter::once((10, 200)).collect(),
},
];
let chains = HashMap::from([(10_u64, vec![50_u64, 100_u64, 200_u64, 300_u64])]);
let result = check_sheaf_consistency_with_chains(§ions, &chains);
assert!(
result.is_consistent(),
"bead_id={BEAD_ID} explicit_chain_consistent"
);
}
#[test]
fn test_sheaf_consistency_with_chains_flags_unknown_version() {
let sections = vec![
Section {
txn_id: 1,
observations: std::iter::once((10, 100)).collect(),
},
Section {
txn_id: 2,
observations: std::iter::once((10, 9_999)).collect(),
},
];
let chains = HashMap::from([(10_u64, vec![50_u64, 100_u64, 200_u64, 300_u64])]);
let result = check_sheaf_consistency_with_chains(§ions, &chains);
assert!(
!result.is_consistent(),
"bead_id={BEAD_ID} explicit_chain_unknown_version"
);
assert_eq!(result.obstructions.len(), 1);
}
#[test]
fn test_conformal_calibrator_min_samples() {
let cal = ConformalOracleCalibrator::new(ConformalCalibratorConfig {
min_calibration_samples: 50,
..Default::default()
});
let report = OracleReport {
scores: vec![InvariantScore {
invariant: "inv1".into(),
score: 1.0,
}],
};
assert!(
cal.predict(&report).is_none(),
"bead_id={BEAD_ID} min_samples_none"
);
assert!(!cal.is_calibrated());
}
#[test]
fn test_conformal_calibrator_detects_anomaly() {
let mut cal = ConformalOracleCalibrator::new(ConformalCalibratorConfig::default());
for i in 0..100_i32 {
let score = f64::from(i) / 100.0;
cal.calibrate(&OracleReport {
scores: vec![InvariantScore {
invariant: "durability".into(),
score,
}],
});
}
assert!(cal.is_calibrated());
let normal_pred = cal
.predict(&OracleReport {
scores: vec![InvariantScore {
invariant: "durability".into(),
score: 0.5,
}],
})
.expect("calibrated");
assert!(
normal_pred.prediction_sets[0].conforming,
"bead_id={BEAD_ID} normal_conforming"
);
let anomalous_pred = cal
.predict(&OracleReport {
scores: vec![InvariantScore {
invariant: "durability".into(),
score: 100.0,
}],
})
.expect("calibrated");
assert!(
!anomalous_pred.prediction_sets[0].conforming,
"bead_id={BEAD_ID} anomaly_detected: score=100 threshold={}",
anomalous_pred.prediction_sets[0].threshold
);
}
#[test]
fn test_conformal_calibrator_coverage_guarantee() {
let mut cal = ConformalOracleCalibrator::new(ConformalCalibratorConfig {
alpha: 0.05,
min_calibration_samples: 50,
});
for i in 0..100_i32 {
cal.calibrate(&OracleReport {
scores: vec![InvariantScore {
invariant: "inv".into(),
score: f64::from(i) / 100.0,
}],
});
}
let mut conforming_count = 0usize;
for i in 100..300_i32 {
let score = f64::from((i * 37 + 13) % 100) / 100.0;
let pred = cal
.predict(&OracleReport {
scores: vec![InvariantScore {
invariant: "inv".into(),
score,
}],
})
.expect("calibrated");
if pred.prediction_sets[0].conforming {
conforming_count += 1;
}
}
#[allow(clippy::cast_precision_loss)]
let rate = conforming_count as f64 / 200.0;
assert!(
rate >= 0.90, "bead_id={BEAD_ID} coverage_rate={rate}"
);
}
#[test]
fn test_opportunity_score_gate() {
let good = OpportunityScore {
impact: 3.0,
confidence: 0.9,
effort: 1.0,
};
assert!(
good.passes_gate(),
"bead_id={BEAD_ID} good_score={:.2}",
good.score()
);
assert!(good.score() >= 2.0);
let bad = OpportunityScore {
impact: 1.5,
confidence: 0.5,
effort: 1.0,
};
assert!(
!bad.passes_gate(),
"bead_id={BEAD_ID} bad_score={:.2}",
bad.score()
);
assert!(bad.score() < 2.0);
let edge = OpportunityScore {
impact: 2.0,
confidence: 1.0,
effort: 1.0,
};
assert!(
edge.passes_gate(),
"bead_id={BEAD_ID} edge_score={:.2}",
edge.score()
);
assert!(!good.to_string().is_empty(), "bead_id={BEAD_ID} display");
}
}