#![forbid(unsafe_code)]
#![warn(rustdoc::broken_intra_doc_links)]
use std::{
borrow::Cow,
time::{SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use thiserror::Error;
#[cfg(feature = "asupersync")]
pub mod asupersync;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeMode {
Strict,
Hardened,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DecisionAction {
Allow,
Reject,
Repair,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IssueKind {
UnknownFeature,
MalformedInput,
JoinCardinality,
PolicyOverride,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompatibilityIssue {
pub kind: IssueKind,
pub subject: String,
pub detail: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EvidenceTerm {
pub name: Cow<'static, str>,
pub log_likelihood_if_compatible: f64,
pub log_likelihood_if_incompatible: f64,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct LossMatrix {
pub allow_if_compatible: f64,
pub allow_if_incompatible: f64,
pub reject_if_compatible: f64,
pub reject_if_incompatible: f64,
pub repair_if_compatible: f64,
pub repair_if_incompatible: f64,
}
impl Default for LossMatrix {
fn default() -> Self {
Self {
allow_if_compatible: 0.0,
allow_if_incompatible: 100.0,
reject_if_compatible: 6.0,
reject_if_incompatible: 0.5,
repair_if_compatible: 2.0,
repair_if_incompatible: 3.0,
}
}
}
const UNKNOWN_FEATURE_PRIOR: f64 = 0.25;
const JOIN_ADMISSION_PRIOR: f64 = 0.6;
const PRIOR_COMPATIBLE_EPSILON: f64 = 1e-10;
const UNKNOWN_FEATURE_EVIDENCE: [EvidenceTerm; 2] = [
EvidenceTerm {
name: Cow::Borrowed("compatibility_allowlist_miss"),
log_likelihood_if_compatible: -3.5,
log_likelihood_if_incompatible: -0.2,
},
EvidenceTerm {
name: Cow::Borrowed("unknown_protocol_field"),
log_likelihood_if_compatible: -2.0,
log_likelihood_if_incompatible: -0.1,
},
];
const JOIN_ADMISSION_EVIDENCE_WITHIN_CAP: [EvidenceTerm; 2] = [
EvidenceTerm {
name: Cow::Borrowed("estimator_overflow_risk"),
log_likelihood_if_compatible: -0.3,
log_likelihood_if_incompatible: -1.2,
},
EvidenceTerm {
name: Cow::Borrowed("memory_budget_signal"),
log_likelihood_if_compatible: -0.4,
log_likelihood_if_incompatible: -1.5,
},
];
const JOIN_ADMISSION_EVIDENCE_OVER_CAP: [EvidenceTerm; 2] = [
EvidenceTerm {
name: Cow::Borrowed("estimator_overflow_risk"),
log_likelihood_if_compatible: -2.8,
log_likelihood_if_incompatible: -0.1,
},
EvidenceTerm {
name: Cow::Borrowed("memory_budget_signal"),
log_likelihood_if_compatible: -2.2,
log_likelihood_if_incompatible: -0.2,
},
];
const JOIN_ADMISSION_LOSS: LossMatrix = LossMatrix {
allow_if_compatible: 0.0,
allow_if_incompatible: 130.0,
reject_if_compatible: 5.0,
reject_if_incompatible: 0.5,
repair_if_compatible: 1.5,
repair_if_incompatible: 3.0,
};
const DEFAULT_CONFORMAL_ALPHA: f64 = 0.1;
const MIN_CONFORMAL_ALPHA: f64 = 0.01;
const MAX_CONFORMAL_ALPHA: f64 = 0.5;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecisionMetrics {
pub posterior_compatible: f64,
pub bayes_factor_compatible_over_incompatible: f64,
pub expected_loss_allow: f64,
pub expected_loss_reject: f64,
pub expected_loss_repair: f64,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecisionRecord {
pub ts_unix_ms: u64,
pub mode: RuntimeMode,
pub action: DecisionAction,
pub issue: CompatibilityIssue,
pub prior_compatible: f64,
pub metrics: DecisionMetrics,
pub evidence: Vec<EvidenceTerm>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticIndexIdentity {
pub role: String,
pub len: usize,
pub has_duplicates: bool,
pub fingerprint: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticWitnessRecord {
pub ts_unix_ms: u64,
pub operation: String,
pub materialization_reason: String,
pub alignment_mode: String,
pub input_index_identity: Vec<SemanticIndexIdentity>,
pub output_index_identity: SemanticIndexIdentity,
pub null_nan_policy: String,
pub output_ordering_contract: String,
}
impl SemanticWitnessRecord {
#[must_use]
pub fn new(
operation: impl Into<String>,
materialization_reason: impl Into<String>,
alignment_mode: impl Into<String>,
input_index_identity: Vec<SemanticIndexIdentity>,
output_index_identity: SemanticIndexIdentity,
null_nan_policy: impl Into<String>,
output_ordering_contract: impl Into<String>,
) -> Self {
Self {
ts_unix_ms: now_unix_ms().unwrap_or_default(),
operation: operation.into(),
materialization_reason: materialization_reason.into(),
alignment_mode: alignment_mode.into(),
input_index_identity,
output_index_identity,
null_nan_policy: null_nan_policy.into(),
output_ordering_contract: output_ordering_contract.into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GalaxyBrainCard {
pub title: String,
pub equation: String,
pub substitution: String,
pub intuition: String,
}
impl GalaxyBrainCard {
#[must_use]
pub fn render_plain(&self) -> String {
format!(
"[{}]\n{}\n{}\n{}",
self.title, self.equation, self.substitution, self.intuition
)
}
}
#[must_use]
pub fn decision_to_card(record: &DecisionRecord) -> GalaxyBrainCard {
GalaxyBrainCard {
title: format!("{}::{:?}", record.issue.subject, record.action),
equation: "argmin_a Σ_s L(a,s) P(s|evidence)".to_owned(),
substitution: format!(
"P(compatible|e)={:.4}, E[allow]={:.4}, E[reject]={:.4}, E[repair]={:.4}",
record.metrics.posterior_compatible,
record.metrics.expected_loss_allow,
record.metrics.expected_loss_reject,
record.metrics.expected_loss_repair
),
intuition: "Lower expected loss wins; strict mode may still force fail-closed.".to_owned(),
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EvidenceLedger {
records: Vec<DecisionRecord>,
#[serde(default)]
semantic_witnesses: Vec<SemanticWitnessRecord>,
#[serde(skip)]
record_semantic_witnesses: bool,
}
impl Default for EvidenceLedger {
fn default() -> Self {
Self::new()
}
}
impl EvidenceLedger {
#[must_use]
pub fn new() -> Self {
Self {
records: Vec::new(),
semantic_witnesses: Vec::new(),
record_semantic_witnesses: true,
}
}
#[must_use]
pub fn without_semantic_witnesses(mut self) -> Self {
self.record_semantic_witnesses = false;
self
}
#[must_use]
pub fn records_semantic_witnesses(&self) -> bool {
self.record_semantic_witnesses
}
pub fn push(&mut self, record: DecisionRecord) {
self.records.push(record);
}
pub fn push_semantic_witness(&mut self, record: SemanticWitnessRecord) {
self.semantic_witnesses.push(record);
}
#[must_use]
pub fn records(&self) -> &[DecisionRecord] {
&self.records
}
#[must_use]
pub fn semantic_witnesses(&self) -> &[SemanticWitnessRecord] {
&self.semantic_witnesses
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimePolicy {
pub mode: RuntimeMode,
pub fail_closed_unknown_features: bool,
pub hardened_join_row_cap: Option<usize>,
}
impl RuntimePolicy {
#[must_use]
pub fn strict() -> Self {
Self {
mode: RuntimeMode::Strict,
fail_closed_unknown_features: true,
hardened_join_row_cap: None,
}
}
#[must_use]
pub fn hardened(join_row_cap: Option<usize>) -> Self {
Self {
mode: RuntimeMode::Hardened,
fail_closed_unknown_features: false,
hardened_join_row_cap: join_row_cap,
}
}
pub fn decide_unknown_feature(
&self,
subject: impl Into<String>,
detail: impl Into<String>,
ledger: &mut EvidenceLedger,
) -> DecisionAction {
let issue = CompatibilityIssue {
kind: IssueKind::UnknownFeature,
subject: subject.into(),
detail: detail.into(),
};
let mut record = decide(
self.mode,
issue,
UNKNOWN_FEATURE_PRIOR,
LossMatrix::default(),
UNKNOWN_FEATURE_EVIDENCE.to_vec(),
);
if self.fail_closed_unknown_features {
record.action = DecisionAction::Reject;
}
let action = record.action;
ledger.push(record);
action
}
pub fn decide_join_admission(
&self,
estimated_rows: usize,
ledger: &mut EvidenceLedger,
) -> DecisionAction {
let issue = CompatibilityIssue {
kind: IssueKind::JoinCardinality,
subject: "join_estimator".to_owned(),
detail: format!("estimated_rows={estimated_rows}"),
};
let cap = self.hardened_join_row_cap.unwrap_or(usize::MAX);
let evidence = if estimated_rows <= cap {
JOIN_ADMISSION_EVIDENCE_WITHIN_CAP.to_vec()
} else {
JOIN_ADMISSION_EVIDENCE_OVER_CAP.to_vec()
};
let mut record = decide(
self.mode,
issue,
JOIN_ADMISSION_PRIOR,
JOIN_ADMISSION_LOSS,
evidence,
);
if matches!(self.mode, RuntimeMode::Hardened) && estimated_rows > cap {
record.action = DecisionAction::Repair;
}
let action = record.action;
ledger.push(record);
action
}
}
impl Default for RuntimePolicy {
fn default() -> Self {
Self::strict()
}
}
#[derive(Debug, Error)]
pub enum RuntimeError {
#[error("system clock is before UNIX_EPOCH")]
ClockSkew,
}
fn now_unix_ms() -> Result<u64, RuntimeError> {
let ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|_| RuntimeError::ClockSkew)?
.as_millis();
Ok(ms as u64)
}
fn normalize_prior_compatible(prior_compatible: f64) -> f64 {
if !prior_compatible.is_finite() {
return 0.5;
}
prior_compatible.clamp(PRIOR_COMPATIBLE_EPSILON, 1.0 - PRIOR_COMPATIBLE_EPSILON)
}
fn decide(
mode: RuntimeMode,
issue: CompatibilityIssue,
prior_compatible: f64,
loss: LossMatrix,
evidence: Vec<EvidenceTerm>,
) -> DecisionRecord {
let prior_compatible = normalize_prior_compatible(prior_compatible);
let log_odds_prior = (prior_compatible / (1.0 - prior_compatible)).ln();
let llr_sum: f64 = evidence
.iter()
.map(|term| term.log_likelihood_if_compatible - term.log_likelihood_if_incompatible)
.sum();
let log_odds_post = log_odds_prior + llr_sum;
let posterior_compatible = 1.0 / (1.0 + (-log_odds_post).exp());
let posterior_incompatible = 1.0 - posterior_compatible;
let expected_loss_allow = loss.allow_if_compatible * posterior_compatible
+ loss.allow_if_incompatible * posterior_incompatible;
let expected_loss_reject = loss.reject_if_compatible * posterior_compatible
+ loss.reject_if_incompatible * posterior_incompatible;
let expected_loss_repair = loss.repair_if_compatible * posterior_compatible
+ loss.repair_if_incompatible * posterior_incompatible;
let mut best_action = DecisionAction::Allow;
let mut best_loss = expected_loss_allow;
if expected_loss_repair < best_loss {
best_action = DecisionAction::Repair;
best_loss = expected_loss_repair;
}
if expected_loss_reject < best_loss {
best_action = DecisionAction::Reject;
}
DecisionRecord {
ts_unix_ms: now_unix_ms().unwrap_or_default(),
mode,
action: best_action,
issue,
prior_compatible,
metrics: DecisionMetrics {
posterior_compatible,
bayes_factor_compatible_over_incompatible: llr_sum.exp(),
expected_loss_allow,
expected_loss_reject,
expected_loss_repair,
},
evidence,
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RaptorQEnvelope {
pub artifact_id: String,
pub artifact_type: String,
pub source_hash: String,
pub raptorq: RaptorQMetadata,
pub scrub: ScrubStatus,
pub decode_proofs: Vec<DecodeProof>,
}
pub const MAX_DECODE_PROOFS: usize = 1_000;
pub const DEFAULT_RAPTORQ_SYMBOL_BYTES: usize = 1_024;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RaptorQMetadata {
pub k: u32,
pub repair_symbols: u32,
pub overhead_ratio: f64,
pub symbol_hashes: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ScrubStatus {
pub last_ok_unix_ms: u64,
pub status: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecodeProof {
pub ts_unix_ms: u64,
pub reason: String,
pub recovered_blocks: u32,
pub proof_hash: String,
}
impl RaptorQEnvelope {
#[must_use]
pub fn from_source_bytes(
artifact_id: impl Into<String>,
artifact_type: impl Into<String>,
source_bytes: &[u8],
repair_symbols: u32,
) -> Self {
let symbol_hashes: Vec<String> = source_bytes
.chunks(DEFAULT_RAPTORQ_SYMBOL_BYTES)
.map(|chunk| format!("sha256:{}", sha256_hex(chunk)))
.collect();
let k = u32::try_from(symbol_hashes.len()).unwrap_or(u32::MAX);
let overhead_ratio = if k == 0 {
0.0
} else {
f64::from(repair_symbols) / f64::from(k)
};
Self {
artifact_id: artifact_id.into(),
artifact_type: artifact_type.into(),
source_hash: format!("sha256:{}", sha256_hex(source_bytes)),
raptorq: RaptorQMetadata {
k,
repair_symbols,
overhead_ratio,
symbol_hashes,
},
scrub: ScrubStatus {
last_ok_unix_ms: now_unix_ms().unwrap_or_default(),
status: "ok".to_owned(),
},
decode_proofs: Vec::new(),
}
}
pub fn push_decode_proof_capped(&mut self, proof: DecodeProof) {
if self.decode_proofs.len() >= MAX_DECODE_PROOFS {
let overflow = self.decode_proofs.len() + 1 - MAX_DECODE_PROOFS;
self.decode_proofs.drain(0..overflow);
}
self.decode_proofs.push(proof);
}
}
#[must_use]
pub fn semantic_fingerprint_bytes(bytes: &[u8]) -> String {
format!("sha256:{}", sha256_hex(bytes))
}
#[derive(Debug)]
pub struct SemanticFingerprintBuilder {
hasher: Sha256,
}
impl Default for SemanticFingerprintBuilder {
fn default() -> Self {
Self::new()
}
}
impl SemanticFingerprintBuilder {
#[must_use]
pub fn new() -> Self {
Self {
hasher: Sha256::new(),
}
}
pub fn update(&mut self, bytes: &[u8]) {
self.hasher.update(bytes);
}
#[must_use]
pub fn finish(self) -> String {
format!("sha256:{}", sha256_digest_hex(self.hasher.finalize()))
}
}
fn sha256_hex(bytes: &[u8]) -> String {
let digest = Sha256::digest(bytes);
sha256_digest_hex(digest)
}
fn sha256_digest_hex(digest: impl IntoIterator<Item = u8>) -> String {
const HEX: &[u8; 16] = b"0123456789abcdef";
let mut hex = String::with_capacity(64);
for byte in digest {
hex.push(char::from(HEX[usize::from(byte >> 4)]));
hex.push(char::from(HEX[usize::from(byte & 0x0f)]));
}
hex
}
fn nonconformity_score(record: &DecisionRecord) -> f64 {
let p = record
.metrics
.posterior_compatible
.clamp(1e-15, 1.0 - 1e-15);
(p / (1.0 - p)).ln().abs()
}
fn normalize_conformal_alpha(alpha: f64) -> f64 {
if alpha.is_finite() {
alpha.clamp(MIN_CONFORMAL_ALPHA, MAX_CONFORMAL_ALPHA)
} else {
DEFAULT_CONFORMAL_ALPHA
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConformalPredictionSet {
pub quantile_threshold: f64,
pub current_score: f64,
pub bayesian_action_in_set: bool,
pub admissible_actions: Vec<DecisionAction>,
pub empirical_coverage: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConformalGuard {
scores: Vec<f64>,
window_size: usize,
alpha: f64,
in_set_count: usize,
total_count: usize,
}
impl ConformalGuard {
#[must_use]
pub fn new(window_size: usize, alpha: f64) -> Self {
let window_size = window_size.max(1);
Self {
scores: Vec::with_capacity(window_size),
window_size,
alpha: normalize_conformal_alpha(alpha),
in_set_count: 0,
total_count: 0,
}
}
#[must_use]
pub fn default_config() -> Self {
Self::new(1000, 0.1)
}
#[must_use]
pub fn conformal_quantile(&self) -> Option<f64> {
let mut sorted: Vec<f64> = self
.scores
.iter()
.copied()
.filter(|score| score.is_finite())
.collect();
if sorted.len() < 2 {
return None;
}
sorted.sort_by(f64::total_cmp);
let n = sorted.len() as f64;
let level = (1.0 - normalize_conformal_alpha(self.alpha)) * (1.0 + 1.0 / n);
let idx = (level * n).ceil() as usize;
let idx = idx.min(sorted.len()).saturating_sub(1);
Some(sorted[idx])
}
pub fn evaluate(&mut self, record: &DecisionRecord) -> ConformalPredictionSet {
self.normalize_runtime_config();
let score = nonconformity_score(record);
let quantile = self.conformal_quantile();
if self.scores.len() >= self.window_size {
self.scores.remove(0);
}
self.scores.push(score);
let threshold = match quantile {
Some(q) => q,
None => {
self.total_count += 1;
self.in_set_count += 1;
return ConformalPredictionSet {
quantile_threshold: f64::INFINITY,
current_score: score,
bayesian_action_in_set: true,
admissible_actions: vec![
DecisionAction::Allow,
DecisionAction::Reject,
DecisionAction::Repair,
],
empirical_coverage: 1.0,
};
}
};
let bayesian_in_set = score <= threshold;
let admissible = if bayesian_in_set {
vec![record.action]
} else {
vec![
DecisionAction::Allow,
DecisionAction::Reject,
DecisionAction::Repair,
]
};
self.total_count += 1;
if bayesian_in_set {
self.in_set_count += 1;
}
let empirical_coverage = if self.total_count > 0 {
self.in_set_count as f64 / self.total_count as f64
} else {
1.0
};
ConformalPredictionSet {
quantile_threshold: threshold,
current_score: score,
bayesian_action_in_set: bayesian_in_set,
admissible_actions: admissible,
empirical_coverage,
}
}
#[must_use]
pub fn empirical_coverage(&self) -> f64 {
if self.total_count == 0 {
return 1.0;
}
self.in_set_count.min(self.total_count) as f64 / self.total_count as f64
}
#[must_use]
pub fn calibration_count(&self) -> usize {
self.scores.len()
}
#[must_use]
pub fn is_calibrated(&self) -> bool {
self.scores.iter().filter(|score| score.is_finite()).count() >= 2
}
#[must_use]
pub fn coverage_alert(&self) -> bool {
self.total_count >= 100
&& self.empirical_coverage() < (1.0 - normalize_conformal_alpha(self.alpha))
}
fn normalize_runtime_config(&mut self) {
self.window_size = self.window_size.max(1);
self.alpha = normalize_conformal_alpha(self.alpha);
self.scores.retain(|score| score.is_finite());
if self.scores.len() > self.window_size {
let overflow = self.scores.len() - self.window_size;
self.scores.drain(0..overflow);
}
self.in_set_count = self.in_set_count.min(self.total_count);
}
}
#[cfg(feature = "asupersync")]
#[must_use]
pub fn outcome_to_action<T, E>(outcome: &::asupersync::Outcome<T, E>) -> DecisionAction {
match outcome {
::asupersync::Outcome::Ok(_) => DecisionAction::Allow,
::asupersync::Outcome::Err(_) => DecisionAction::Repair,
::asupersync::Outcome::Cancelled(_) | ::asupersync::Outcome::Panicked(_) => {
DecisionAction::Reject
}
}
}
#[cfg(test)]
mod tests {
use std::{borrow::Cow, hint::black_box, time::Instant};
use serde::Serialize;
use super::{
ConformalGuard, DecisionAction, EvidenceLedger, RaptorQEnvelope, RuntimeMode,
RuntimePolicy, SemanticIndexIdentity, SemanticWitnessRecord, decision_to_card,
};
const ASUPERSYNC_PACKET_ID: &str = "ASUPERSYNC-E";
const REPLAY_PREFIX: &str = "cargo test -p fp-runtime --";
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct StructuredTestLog {
packet_id: String,
case_id: String,
mode: RuntimeMode,
seed: u64,
trace_id: String,
assertion_path: String,
result: String,
replay_cmd: String,
}
fn make_structured_log(
case_id: &str,
mode: RuntimeMode,
seed: u64,
assertion_path: &str,
result: &str,
) -> StructuredTestLog {
StructuredTestLog {
packet_id: ASUPERSYNC_PACKET_ID.to_owned(),
case_id: case_id.to_owned(),
mode,
seed,
trace_id: format!("{ASUPERSYNC_PACKET_ID}:{case_id}:{seed:016x}"),
assertion_path: assertion_path.to_owned(),
result: result.to_owned(),
replay_cmd: format!("{REPLAY_PREFIX} {case_id} --nocapture"),
}
}
fn assert_required_log_fields(log: &serde_json::Value) {
for field in [
"packet_id",
"case_id",
"mode",
"seed",
"trace_id",
"assertion_path",
"result",
"replay_cmd",
] {
assert!(
log.get(field).is_some(),
"structured log missing field: {field}"
);
}
}
#[test]
fn evidence_ledger_records_semantic_witnesses_tn6qb3() {
let mut ledger = EvidenceLedger::new();
let witness = SemanticWitnessRecord::new(
"series.add",
"series_binary_arithmetic_materialization",
"outer",
vec![
SemanticIndexIdentity {
role: "left".to_owned(),
len: 2,
has_duplicates: false,
fingerprint: super::semantic_fingerprint_bytes(b"left"),
},
SemanticIndexIdentity {
role: "right".to_owned(),
len: 2,
has_duplicates: false,
fingerprint: super::semantic_fingerprint_bytes(b"right"),
},
],
SemanticIndexIdentity {
role: "output".to_owned(),
len: 3,
has_duplicates: false,
fingerprint: super::semantic_fingerprint_bytes(b"output"),
},
"missing aligned operands materialize as NaN/null before arithmetic",
"outer union preserves left order then right-only labels",
);
ledger.push_semantic_witness(witness);
let witnesses = ledger.semantic_witnesses();
assert_eq!(witnesses.len(), 1);
assert_eq!(witnesses[0].operation, "series.add");
assert_eq!(witnesses[0].alignment_mode, "outer");
assert_eq!(witnesses[0].output_index_identity.len, 3);
assert_eq!(witnesses[0].input_index_identity[0].role, "left");
assert!(
witnesses[0].input_index_identity[0]
.fingerprint
.starts_with("sha256:")
);
}
fn decide_join_admission_baseline(
policy: &RuntimePolicy,
estimated_rows: usize,
ledger: &mut EvidenceLedger,
) -> DecisionAction {
let issue = super::CompatibilityIssue {
kind: super::IssueKind::JoinCardinality,
subject: "join_estimator".to_owned(),
detail: format!("estimated_rows={estimated_rows}"),
};
let cap = policy.hardened_join_row_cap.unwrap_or(usize::MAX);
let evidence = vec![
super::EvidenceTerm {
name: Cow::Owned("estimator_overflow_risk".to_owned()),
log_likelihood_if_compatible: if estimated_rows <= cap { -0.3 } else { -2.8 },
log_likelihood_if_incompatible: if estimated_rows <= cap { -1.2 } else { -0.1 },
},
super::EvidenceTerm {
name: Cow::Owned("memory_budget_signal".to_owned()),
log_likelihood_if_compatible: if estimated_rows <= cap { -0.4 } else { -2.2 },
log_likelihood_if_incompatible: if estimated_rows <= cap { -1.5 } else { -0.2 },
},
];
let loss = super::LossMatrix {
allow_if_compatible: 0.0,
allow_if_incompatible: 130.0,
reject_if_compatible: 5.0,
reject_if_incompatible: 0.5,
repair_if_compatible: 1.5,
repair_if_incompatible: 3.0,
};
let mut record = super::decide(policy.mode, issue, 0.6, loss, evidence);
if matches!(policy.mode, RuntimeMode::Hardened) && estimated_rows > cap {
record.action = DecisionAction::Repair;
}
let action = record.action;
ledger.push(record);
action
}
fn assert_join_record_equivalent(
optimized: &super::DecisionRecord,
baseline: &super::DecisionRecord,
) {
assert_eq!(optimized.mode, baseline.mode);
assert_eq!(optimized.action, baseline.action);
assert_eq!(optimized.issue.kind, baseline.issue.kind);
assert_eq!(optimized.issue.subject, baseline.issue.subject);
assert_eq!(optimized.issue.detail, baseline.issue.detail);
assert_eq!(optimized.prior_compatible, baseline.prior_compatible);
assert_eq!(optimized.metrics, baseline.metrics);
assert_eq!(optimized.evidence.len(), baseline.evidence.len());
for (left, right) in optimized.evidence.iter().zip(&baseline.evidence) {
assert_eq!(left.name.as_ref(), right.name.as_ref());
assert_eq!(
left.log_likelihood_if_compatible,
right.log_likelihood_if_compatible
);
assert_eq!(
left.log_likelihood_if_incompatible,
right.log_likelihood_if_incompatible
);
}
}
fn quantile_from_sorted(samples: &[u128], pct: usize) -> u128 {
let len = samples.len();
assert!(len > 0);
let idx = (len.saturating_sub(1) * pct) / 100;
samples[idx]
}
fn latency_quantiles(mut samples_ns: Vec<u128>) -> (u128, u128, u128) {
samples_ns.sort_unstable();
(
quantile_from_sorted(&samples_ns, 50),
quantile_from_sorted(&samples_ns, 95),
quantile_from_sorted(&samples_ns, 99),
)
}
#[test]
fn asupersync_join_admission_optimized_path_is_isomorphic_to_baseline() {
let policy = RuntimePolicy::hardened(Some(1024));
let mut optimized = EvidenceLedger::new();
let mut baseline = EvidenceLedger::new();
for seed in 0_usize..256 {
let rows = if seed % 2 == 0 {
512 + seed
} else {
4096 + seed
};
let optimized_action = policy.decide_join_admission(rows, &mut optimized);
let baseline_action = decide_join_admission_baseline(&policy, rows, &mut baseline);
assert_eq!(optimized_action, baseline_action);
let optimized_record = optimized.records().last().expect("optimized record");
let baseline_record = baseline.records().last().expect("baseline record");
assert_join_record_equivalent(optimized_record, baseline_record);
}
}
#[test]
fn asupersync_join_admission_profile_snapshot_reports_allocation_delta() {
const ITERATIONS: usize = 256;
let policy = RuntimePolicy::hardened(Some(2048));
let mut optimized = EvidenceLedger::new();
let mut baseline = EvidenceLedger::new();
let mut optimized_ns = Vec::with_capacity(ITERATIONS);
let mut baseline_ns = Vec::with_capacity(ITERATIONS);
for seed in 0_usize..ITERATIONS {
let rows = if seed % 3 == 0 {
1024 + seed
} else {
8192 + seed
};
let baseline_start = Instant::now();
let baseline_action = decide_join_admission_baseline(&policy, rows, &mut baseline);
baseline_ns.push(baseline_start.elapsed().as_nanos());
black_box(baseline_action);
let optimized_start = Instant::now();
let optimized_action = policy.decide_join_admission(rows, &mut optimized);
optimized_ns.push(optimized_start.elapsed().as_nanos());
black_box(optimized_action);
}
for (optimized_record, baseline_record) in
optimized.records().iter().zip(baseline.records())
{
assert_join_record_equivalent(optimized_record, baseline_record);
}
let (baseline_p50_ns, baseline_p95_ns, baseline_p99_ns) = latency_quantiles(baseline_ns);
let (optimized_p50_ns, optimized_p95_ns, optimized_p99_ns) =
latency_quantiles(optimized_ns);
let baseline_name_bytes_per_call =
"estimator_overflow_risk".len() + "memory_budget_signal".len();
let baseline_name_bytes_total = baseline_name_bytes_per_call * ITERATIONS;
let optimized_name_bytes_total = 0_usize;
assert!(baseline_name_bytes_total > optimized_name_bytes_total);
println!(
"asupersync_join_admission_profile_snapshot baseline_ns[p50={baseline_p50_ns},p95={baseline_p95_ns},p99={baseline_p99_ns}] optimized_ns[p50={optimized_p50_ns},p95={optimized_p95_ns},p99={optimized_p99_ns}] name_alloc_bytes_baseline={baseline_name_bytes_total} name_alloc_bytes_optimized={optimized_name_bytes_total}"
);
}
#[test]
fn asupersync_structured_log_contains_required_fields() {
let log = make_structured_log(
"asupersync_structured_log_contains_required_fields",
RuntimeMode::Strict,
42,
"ASUPERSYNC-E/log_schema",
"pass",
);
let value = serde_json::to_value(log).expect("serialize log");
assert_required_log_fields(&value);
}
#[test]
fn asupersync_structured_log_is_deterministic_for_same_inputs() {
let left = make_structured_log(
"asupersync_structured_log_is_deterministic_for_same_inputs",
RuntimeMode::Hardened,
1337,
"ASUPERSYNC-E/log_determinism",
"pass",
);
let right = make_structured_log(
"asupersync_structured_log_is_deterministic_for_same_inputs",
RuntimeMode::Hardened,
1337,
"ASUPERSYNC-E/log_determinism",
"pass",
);
assert_eq!(left, right);
let left_json = serde_json::to_string(&left).expect("left json");
let right_json = serde_json::to_string(&right).expect("right json");
assert_eq!(left_json, right_json);
}
#[test]
fn asupersync_property_strict_unknown_feature_always_rejects() {
let policy = RuntimePolicy::strict();
let mut ledger = EvidenceLedger::new();
let case_id = "asupersync_property_strict_unknown_feature_always_rejects";
for seed in 0_u64..128 {
let action = policy.decide_unknown_feature(
format!("unknown_subject_{seed}"),
format!("unknown_detail_{:08x}", seed.wrapping_mul(37)),
&mut ledger,
);
let log = make_structured_log(
case_id,
RuntimeMode::Strict,
seed,
"ASUPERSYNC-E/strict_unknown_feature_reject",
if action == DecisionAction::Reject {
"pass"
} else {
"fail"
},
);
let log_json = serde_json::to_value(log).expect("serialize log");
assert_required_log_fields(&log_json);
assert_eq!(
action,
DecisionAction::Reject,
"strict mode must reject unknown feature; log={}",
serde_json::to_string(&log_json).expect("json")
);
}
assert_eq!(ledger.records().len(), 128);
}
#[test]
fn asupersync_property_hardened_over_cap_forces_repair() {
let cap = 1024_usize;
let policy = RuntimePolicy::hardened(Some(cap));
let mut ledger = EvidenceLedger::new();
let case_id = "asupersync_property_hardened_over_cap_forces_repair";
for seed in 0_u64..256 {
let rows = if seed % 2 == 0 {
cap + 1 + (seed as usize % 10_000)
} else {
cap.saturating_sub(seed as usize % cap)
};
let action = policy.decide_join_admission(rows, &mut ledger);
let log = make_structured_log(
case_id,
RuntimeMode::Hardened,
seed,
"ASUPERSYNC-E/hardened_join_cap_boundary",
if rows > cap && action == DecisionAction::Repair {
"pass"
} else {
"check"
},
);
let log_json = serde_json::to_value(log).expect("serialize log");
assert_required_log_fields(&log_json);
if rows > cap {
assert_eq!(
action,
DecisionAction::Repair,
"rows over cap must force repair; rows={rows}; log={}",
serde_json::to_string(&log_json).expect("json")
);
}
}
}
#[test]
fn asupersync_property_decision_metrics_are_finite_and_bounded() {
let policy = RuntimePolicy::hardened(Some(2048));
let mut ledger = EvidenceLedger::new();
let case_id = "asupersync_property_decision_metrics_are_finite_and_bounded";
for seed in 0_u64..128 {
let rows = 1 + (seed as usize * 97 % 500_000);
policy.decide_join_admission(rows, &mut ledger);
let record = ledger.records().last().expect("record");
let metrics = &record.metrics;
let posterior = metrics.posterior_compatible;
let bounded = (0.0..=1.0).contains(&posterior);
let finite = metrics
.bayes_factor_compatible_over_incompatible
.is_finite()
&& metrics.expected_loss_allow.is_finite()
&& metrics.expected_loss_reject.is_finite()
&& metrics.expected_loss_repair.is_finite();
let log = make_structured_log(
case_id,
RuntimeMode::Hardened,
seed,
"ASUPERSYNC-E/decision_metrics_finite",
if bounded && finite { "pass" } else { "fail" },
);
let log_json = serde_json::to_value(log).expect("serialize log");
assert_required_log_fields(&log_json);
assert!(bounded, "posterior out of range; log={log_json}");
assert!(finite, "non-finite metrics; log={log_json}");
}
}
#[test]
fn decide_clamps_boundary_priors_to_finite_range() {
for (input_prior, expected_prior) in [
(0.0, super::PRIOR_COMPATIBLE_EPSILON),
(1.0, 1.0 - super::PRIOR_COMPATIBLE_EPSILON),
] {
let record = super::decide(
RuntimeMode::Strict,
super::CompatibilityIssue {
kind: super::IssueKind::MalformedInput,
subject: "prior_clamp_test".to_owned(),
detail: "boundary prior".to_owned(),
},
input_prior,
super::LossMatrix::default(),
Vec::new(),
);
assert_eq!(
record.prior_compatible, expected_prior,
"prior should be clamped into open interval (0,1)"
);
assert!(
record.metrics.posterior_compatible.is_finite(),
"posterior must remain finite for boundary priors"
);
assert!(
record.metrics.expected_loss_allow.is_finite()
&& record.metrics.expected_loss_reject.is_finite()
&& record.metrics.expected_loss_repair.is_finite(),
"expected-loss metrics must remain finite for boundary priors"
);
}
}
#[test]
fn decide_normalizes_non_finite_priors_to_neutral() {
for input_prior in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
let record = super::decide(
RuntimeMode::Strict,
super::CompatibilityIssue {
kind: super::IssueKind::MalformedInput,
subject: "prior_clamp_test".to_owned(),
detail: "non-finite prior".to_owned(),
},
input_prior,
super::LossMatrix::default(),
Vec::new(),
);
assert_eq!(
record.prior_compatible, 0.5,
"non-finite priors should normalize to neutral prior"
);
assert!(
record.metrics.posterior_compatible.is_finite(),
"posterior must remain finite for non-finite priors"
);
}
}
#[test]
fn asupersync_adversarial_extreme_join_estimate_remains_repair_and_loggable() {
let policy = RuntimePolicy::hardened(Some(8));
let mut ledger = EvidenceLedger::new();
let action = policy.decide_join_admission(usize::MAX, &mut ledger);
assert_eq!(action, DecisionAction::Repair);
let record = ledger.records().last().expect("record");
assert_eq!(record.mode, RuntimeMode::Hardened);
assert!(
record.issue.detail.contains("estimated_rows="),
"issue detail should include estimated_rows"
);
let log = make_structured_log(
"asupersync_adversarial_extreme_join_estimate_remains_repair_and_loggable",
RuntimeMode::Hardened,
u64::MAX,
"ASUPERSYNC-E/adversarial_extreme_rows",
"pass",
);
let log_json = serde_json::to_value(log).expect("serialize log");
assert_required_log_fields(&log_json);
}
#[test]
fn strict_mode_fails_closed_for_unknown_features() {
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::strict();
let action = policy.decide_unknown_feature("csv", "field=experimental", &mut ledger);
assert_eq!(action, DecisionAction::Reject);
assert_eq!(ledger.records()[0].mode, RuntimeMode::Strict);
}
#[test]
fn hardened_mode_repairs_large_join_estimates() {
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(10_000));
let action = policy.decide_join_admission(100_000, &mut ledger);
assert_eq!(action, DecisionAction::Repair);
assert_eq!(ledger.records().len(), 1);
}
#[test]
fn source_backed_raptorq_envelope_records_manifest_fields() {
let mut source = vec![7_u8; super::DEFAULT_RAPTORQ_SYMBOL_BYTES];
source.extend_from_slice(b"tail");
let envelope = RaptorQEnvelope::from_source_bytes("packet-001", "conformance", &source, 3);
assert_eq!(envelope.artifact_id, "packet-001");
assert_eq!(envelope.artifact_type, "conformance");
assert!(envelope.source_hash.starts_with("sha256:"));
assert_eq!(envelope.source_hash.len(), "sha256:".len() + 64);
assert_eq!(envelope.raptorq.k, 2);
assert_eq!(envelope.raptorq.repair_symbols, 3);
assert_eq!(envelope.raptorq.overhead_ratio, 1.5);
assert_eq!(envelope.raptorq.symbol_hashes.len(), 2);
assert!(
envelope
.raptorq
.symbol_hashes
.iter()
.all(|hash| hash.starts_with("sha256:") && hash.len() == "sha256:".len() + 64)
);
assert_eq!(envelope.scrub.status, "ok");
assert!(envelope.scrub.last_ok_unix_ms > 0);
}
#[test]
fn decode_proof_append_is_capped_and_evicts_oldest() {
let mut envelope =
RaptorQEnvelope::from_source_bytes("packet-001", "conformance", b"source", 1);
let total = super::MAX_DECODE_PROOFS + 5;
for idx in 0..total {
envelope.push_decode_proof_capped(super::DecodeProof {
ts_unix_ms: u64::try_from(idx).expect("idx within u64 range"),
reason: format!("proof-{idx}"),
recovered_blocks: u32::try_from(idx).expect("idx within u32 range"),
proof_hash: format!("sha256:{idx:08x}"),
});
}
assert_eq!(envelope.decode_proofs.len(), super::MAX_DECODE_PROOFS);
assert_eq!(
envelope.decode_proofs[0].proof_hash,
format!("sha256:{:08x}", total - super::MAX_DECODE_PROOFS)
);
assert_eq!(
envelope
.decode_proofs
.last()
.expect("decode proof should exist")
.proof_hash,
format!("sha256:{:08x}", total - 1)
);
}
#[test]
fn decision_card_is_renderable_for_ftui_consumers() {
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::strict();
policy.decide_unknown_feature("csv", "field=experimental", &mut ledger);
let card = decision_to_card(&ledger.records()[0]);
let rendered = card.render_plain();
assert!(rendered.contains("argmin_a"));
assert!(rendered.contains("P(compatible|e)"));
}
#[test]
fn conformal_guard_uncalibrated_accepts_all() {
let mut guard = ConformalGuard::new(100, 0.1);
assert!(!guard.is_calibrated());
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::strict();
policy.decide_unknown_feature("test", "detail", &mut ledger);
let ps = guard.evaluate(&ledger.records()[0]);
assert!(ps.bayesian_action_in_set);
assert_eq!(ps.admissible_actions.len(), 3); assert_eq!(ps.quantile_threshold, f64::INFINITY);
}
#[test]
fn conformal_guard_calibrates_after_sufficient_data() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..10 {
policy.decide_join_admission(50_000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
assert!(guard.is_calibrated());
assert!(guard.conformal_quantile().is_some());
assert_eq!(guard.calibration_count(), 10);
}
#[test]
fn conformal_guard_rolling_window_evicts_old_scores() {
let mut guard = ConformalGuard::new(5, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..10 {
policy.decide_join_admission(1000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
assert_eq!(guard.calibration_count(), 5);
}
#[test]
fn conformal_guard_coverage_tracking() {
let mut guard = ConformalGuard::new(50, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..20 {
policy.decide_join_admission(1000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
let coverage = guard.empirical_coverage();
assert!(coverage > 0.5, "coverage should be reasonable: {coverage}");
}
#[test]
fn conformal_guard_no_coverage_alert_under_100_decisions() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..10 {
policy.decide_join_admission(1000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
assert!(!guard.coverage_alert());
}
#[test]
fn conformal_guard_zero_window_size_is_clamped() {
let mut guard = ConformalGuard::new(0, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
policy.decide_join_admission(1000, &mut ledger);
let set = guard.evaluate(&ledger.records()[0]);
assert!(set.bayesian_action_in_set);
assert_eq!(guard.calibration_count(), 1);
}
#[test]
fn conformal_guard_non_finite_alpha_uses_default() {
let guard = ConformalGuard::new(100, f64::NAN);
assert_eq!(guard.alpha, super::DEFAULT_CONFORMAL_ALPHA);
assert!(!guard.coverage_alert());
}
#[test]
fn conformal_guard_repairs_deserialized_zero_window_before_evaluate() {
let mut guard: ConformalGuard = serde_json::from_str(
r#"{"scores":[],"window_size":0,"alpha":0.1,"in_set_count":0,"total_count":0}"#,
)
.expect("deserialize guard");
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
policy.decide_join_admission(1000, &mut ledger);
let set = guard.evaluate(&ledger.records()[0]);
assert!(set.bayesian_action_in_set);
assert_eq!(guard.window_size, 1);
assert_eq!(guard.calibration_count(), 1);
}
#[test]
fn conformal_quantile_ignores_non_finite_persisted_scores() {
let guard = ConformalGuard {
scores: vec![f64::NAN, f64::INFINITY, 1.0, 2.0],
window_size: 10,
alpha: f64::NAN,
in_set_count: 5,
total_count: 3,
};
assert!(guard.is_calibrated());
assert_eq!(guard.conformal_quantile(), Some(2.0));
assert_eq!(guard.empirical_coverage(), 1.0);
}
#[test]
fn conformal_guard_quantile_is_deterministic() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..5 {
policy.decide_join_admission(1000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
let q1 = guard.conformal_quantile();
let q2 = guard.conformal_quantile();
assert_eq!(q1, q2);
}
#[test]
fn conformal_quantile_basic() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..5 {
policy.decide_join_admission(1000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
let q = guard.conformal_quantile();
assert!(q.is_some());
let quantile = q.unwrap();
assert!(quantile.is_finite(), "quantile must be finite: {quantile}");
assert!(quantile >= 0.0, "quantile must be non-negative: {quantile}");
}
#[test]
fn conformal_quantile_trivial() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
policy.decide_join_admission(1000, &mut ledger);
policy.decide_join_admission(1000, &mut ledger);
guard.evaluate(&ledger.records()[0]);
guard.evaluate(&ledger.records()[1]);
let q = guard.conformal_quantile();
assert!(q.is_some());
}
#[test]
fn conformal_quantile_empty() {
let guard = ConformalGuard::new(100, 0.1);
assert!(guard.conformal_quantile().is_none());
assert!(!guard.is_calibrated());
}
#[test]
fn conformal_guard_agrees_with_bayesian() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..20 {
policy.decide_join_admission(1000, &mut ledger);
}
let mut bayesian_agreed = 0;
let mut total = 0;
for record in ledger.records() {
let ps = guard.evaluate(record);
total += 1;
if ps.bayesian_action_in_set && ps.admissible_actions.len() == 1 {
assert_eq!(ps.admissible_actions[0], record.action);
bayesian_agreed += 1;
}
}
assert!(total > 0, "should have evaluated at least one decision");
assert!(
bayesian_agreed > 0 || total < 3,
"at least some decisions should agree with Bayesian"
);
}
#[test]
fn conformal_guard_widens_on_uncertainty() {
let mut guard = ConformalGuard::new(10, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..10 {
policy.decide_join_admission(100, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
let mut outlier_ledger = EvidenceLedger::new();
let extreme_policy = RuntimePolicy::hardened(Some(10));
extreme_policy.decide_join_admission(1_000_000, &mut outlier_ledger);
let ps = guard.evaluate(&outlier_ledger.records()[0]);
if !ps.bayesian_action_in_set {
assert_eq!(
ps.admissible_actions.len(),
3,
"widened set should admit all actions"
);
}
}
#[test]
fn conformal_coverage_guarantee_1000_decisions() {
let mut guard = ConformalGuard::new(1000, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for i in 0..1000 {
let rows = 1000 + (i * 7) % 500;
policy.decide_join_admission(rows, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
let coverage = guard.empirical_coverage();
assert!(
coverage >= 0.7,
"coverage {coverage} should be >= 0.7 (relaxed bound for finite sample)"
);
}
#[test]
fn conformal_rolling_window_exact_eviction() {
let window_size = 5;
let mut guard = ConformalGuard::new(window_size, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
for _ in 0..15 {
policy.decide_join_admission(1000, &mut ledger);
}
for record in ledger.records() {
guard.evaluate(record);
}
assert_eq!(
guard.calibration_count(),
window_size,
"window should be exactly {window_size}"
);
}
#[test]
fn conformal_galaxy_brain_card_content() {
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
policy.decide_join_admission(50_000, &mut ledger);
let card = decision_to_card(&ledger.records()[0]);
assert!(card.equation.contains("argmin_a"));
assert!(card.substitution.contains("P(compatible|e)"));
assert!(card.substitution.contains("E[allow]"));
assert!(card.substitution.contains("E[reject]"));
assert!(card.substitution.contains("E[repair]"));
}
#[test]
fn conformal_prediction_set_serializes() {
let mut guard = ConformalGuard::new(100, 0.1);
let mut ledger = EvidenceLedger::new();
let policy = RuntimePolicy::hardened(Some(100_000));
policy.decide_join_admission(1000, &mut ledger);
let ps = guard.evaluate(&ledger.records()[0]);
let json = serde_json::to_string(&ps).expect("serialize");
let _: serde_json::Value = serde_json::from_str(&json).expect("valid JSON");
assert!(json.contains("quantile_threshold"));
assert!(json.contains("empirical_coverage"));
}
}