Skip to main content

fp_runtime/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(rustdoc::broken_intra_doc_links)]
3
4//! Runtime policy + decision-recording layer for **frankenpandas**.
5//!
6//! Pandas operations frequently hit "do we accept this input or fail
7//! closed?" decisions: a dtype that doesn't quite match, a frequency
8//! that's almost-but-not-quite regular, an alignment that produces
9//! NaNs the user maybe didn't expect. fp-runtime gives the rest of
10//! the workspace a single place to record those decisions, score
11//! their compatibility, and persist a verifiable evidence trail so
12//! pipelines can audit "why did the IO layer / groupby / merge make
13//! this choice on this input?" after the fact.
14//!
15//! ## Decision recording
16//!
//! - [`RuntimePolicy`]: the active policy bundle — the [`RuntimeMode`],
//!   the fail-closed flag for unknown features, and the optional
//!   hardened-mode join row cap. Constructed once per pipeline and
//!   threaded through hot-path code.
//! - [`RuntimeMode`]: the top-level mode (Strict / Hardened)
//!   controlling how aggressively the policy fails on ambiguity.
//! - [`EvidenceLedger`]: append-only log of [`DecisionRecord`]
//!   entries (plus any [`SemanticWitnessRecord`] alignment witnesses).
//!   Thread it through long-running pipelines to capture every
//!   decision made; serialize at the end for audit.
26//! - [`DecisionRecord`]: one decision's structured trail —
27//!   [`DecisionAction`] taken, the [`DecisionMetrics`] /
28//!   [`LossMatrix`] / [`EvidenceTerm`] inputs, any
29//!   [`CompatibilityIssue`] entries surfaced.
30//! - [`GalaxyBrainCard`]: human-readable summary of one decision
31//!   suitable for surfacing in IDE plugins or CI logs.
32//! - [`decision_to_card`]: convert a [`DecisionRecord`] to a card.
33//!
34//! ## Conformal prediction guards
35//!
36//! - [`ConformalGuard`]: rolling-window nonconformity calibration
37//!   used to gate uncertain decisions inside hot-path code (e.g.
38//!   "this dtype inference is too unsure — fail closed").
//! - [`ConformalPredictionSet`]: the calibrated prediction set
//!   (which [`DecisionAction`]s are admissible) that the guard
//!   produces.
42//!
43//! ## RaptorQ envelopes
44//!
45//! - [`RaptorQEnvelope`] / [`RaptorQMetadata`] / [`ScrubStatus`] /
46//!   [`DecodeProof`]: forward-error-correction envelope types used
47//!   for verifying / scrubbing on-disk artifacts that the runtime
48//!   policy needs to trust.
49//!
50//! ## Error reporting
51//!
//! - [`RuntimeError`]: runtime errors; currently only clock skew
//!   (the system clock reporting a time before the UNIX epoch).
54//! - [`IssueKind`]: enum tagging the category of a
55//!   [`CompatibilityIssue`].
56//!
57//! ## Cargo features
58//!
59//! - `asupersync` (off by default): enables the `asupersync`
60//!   submodule and the `outcome_to_action` helper for converting
61//!   an `asupersync::Outcome` into a [`DecisionAction`]. Pulls in
62//!   the `asupersync` crate as an optional dep. (Items are gated
63//!   behind the feature so they don't appear in the default
64//!   docs.rs render.)
65
66use std::{
67    borrow::Cow,
68    time::{SystemTime, UNIX_EPOCH},
69};
70
71use serde::{Deserialize, Serialize};
72use sha2::{Digest, Sha256};
73use thiserror::Error;
74
/// Integration with the `asupersync` crate. Compiled only when the
/// `asupersync` Cargo feature is enabled.
#[cfg(feature = "asupersync")]
pub mod asupersync;
77
/// Top-level runtime mode, recorded on every [`DecisionRecord`].
///
/// Serialized in `snake_case` (`"strict"` / `"hardened"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeMode {
    /// Mode used by [`RuntimePolicy::strict`], which also forces unknown
    /// features to be rejected.
    Strict,
    /// Mode used by [`RuntimePolicy::hardened`]. In this mode, join admissions
    /// estimated above the configured row cap are forced to
    /// [`DecisionAction::Repair`].
    Hardened,
}

/// Action chosen for a single compatibility decision.
///
/// The decision engine in this module picks the action with the lowest
/// expected loss under a [`LossMatrix`]. [`RuntimePolicy`] may then override
/// it with `Reject` or `Repair`. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DecisionAction {
    /// Let the operation proceed.
    Allow,
    /// Fail closed.
    Reject,
    /// Proceed via a repair path. The concrete repair is up to the caller;
    /// it is not defined in this module.
    Repair,
}

/// Category tag for a [`CompatibilityIssue`]. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IssueKind {
    /// Produced by [`RuntimePolicy::decide_unknown_feature`].
    UnknownFeature,
    /// Not constructed by [`RuntimePolicy`] in this module; available to callers.
    MalformedInput,
    /// Produced by [`RuntimePolicy::decide_join_admission`].
    JoinCardinality,
    /// Not constructed by [`RuntimePolicy`] in this module; available to callers.
    PolicyOverride,
}

/// The issue that a decision was made about.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompatibilityIssue {
    /// Issue category.
    pub kind: IssueKind,
    /// What the decision concerns (e.g. `"join_estimator"` for join admission).
    /// [`decision_to_card`] uses it as the card title prefix.
    pub subject: String,
    /// Free-form detail (e.g. `"estimated_rows=…"` for join admission).
    pub detail: String,
}
108
/// One piece of evidence, expressed as log-likelihoods under the two hypotheses.
///
/// The decision engine adds `log_likelihood_if_compatible -
/// log_likelihood_if_incompatible` of every term to the prior (natural-log)
/// odds of compatibility.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EvidenceTerm {
    /// Identifier of the signal (e.g. `"unknown_protocol_field"`).
    pub name: Cow<'static, str>,
    /// Log-likelihood of this evidence if the input is compatible.
    pub log_likelihood_if_compatible: f64,
    /// Log-likelihood of this evidence if the input is incompatible.
    pub log_likelihood_if_incompatible: f64,
}

/// Loss incurred by each action under each hypothesis (lower is better).
///
/// The expected loss of action `a` is
/// `a_if_compatible * P(compatible) + a_if_incompatible * P(incompatible)`.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct LossMatrix {
    pub allow_if_compatible: f64,
    pub allow_if_incompatible: f64,
    pub reject_if_compatible: f64,
    pub reject_if_incompatible: f64,
    pub repair_if_compatible: f64,
    pub repair_if_incompatible: f64,
}
125
126impl Default for LossMatrix {
127    fn default() -> Self {
128        Self {
129            allow_if_compatible: 0.0,
130            allow_if_incompatible: 100.0,
131            reject_if_compatible: 6.0,
132            reject_if_incompatible: 0.5,
133            repair_if_compatible: 2.0,
134            repair_if_incompatible: 3.0,
135        }
136    }
137}
138
/// Prior probability that an unknown feature is compatible.
const UNKNOWN_FEATURE_PRIOR: f64 = 0.25;
/// Prior probability that a join admission is compatible.
const JOIN_ADMISSION_PRIOR: f64 = 0.6;
/// Priors are clamped to `[ε, 1 - ε]` with this ε so the prior log-odds stay finite.
const PRIOR_COMPATIBLE_EPSILON: f64 = 1e-10;

/// Evidence attached to every unknown-feature decision.
///
/// In each term the compatible log-likelihood is lower than the incompatible
/// one, so both terms push the posterior toward "incompatible".
const UNKNOWN_FEATURE_EVIDENCE: [EvidenceTerm; 2] = [
    EvidenceTerm {
        name: Cow::Borrowed("compatibility_allowlist_miss"),
        log_likelihood_if_compatible: -3.5,
        log_likelihood_if_incompatible: -0.2,
    },
    EvidenceTerm {
        name: Cow::Borrowed("unknown_protocol_field"),
        log_likelihood_if_compatible: -2.0,
        log_likelihood_if_incompatible: -0.1,
    },
];

/// Join-admission evidence used when the estimated row count is within the cap
/// (or no cap is configured). Both terms favour "compatible".
const JOIN_ADMISSION_EVIDENCE_WITHIN_CAP: [EvidenceTerm; 2] = [
    EvidenceTerm {
        name: Cow::Borrowed("estimator_overflow_risk"),
        log_likelihood_if_compatible: -0.3,
        log_likelihood_if_incompatible: -1.2,
    },
    EvidenceTerm {
        name: Cow::Borrowed("memory_budget_signal"),
        log_likelihood_if_compatible: -0.4,
        log_likelihood_if_incompatible: -1.5,
    },
];

/// Join-admission evidence used when the estimated row count exceeds the cap.
/// Both terms favour "incompatible".
const JOIN_ADMISSION_EVIDENCE_OVER_CAP: [EvidenceTerm; 2] = [
    EvidenceTerm {
        name: Cow::Borrowed("estimator_overflow_risk"),
        log_likelihood_if_compatible: -2.8,
        log_likelihood_if_incompatible: -0.1,
    },
    EvidenceTerm {
        name: Cow::Borrowed("memory_budget_signal"),
        log_likelihood_if_compatible: -2.2,
        log_likelihood_if_incompatible: -0.2,
    },
];

/// Loss matrix for join admission.
///
/// Compared with `LossMatrix::default()`, allowing an incompatible join is
/// penalised more (130 vs 100).
const JOIN_ADMISSION_LOSS: LossMatrix = LossMatrix {
    allow_if_compatible: 0.0,
    allow_if_incompatible: 130.0,
    reject_if_compatible: 5.0,
    reject_if_incompatible: 0.5,
    repair_if_compatible: 1.5,
    repair_if_incompatible: 3.0,
};

/// Conformal significance level substituted for a non-finite alpha.
const DEFAULT_CONFORMAL_ALPHA: f64 = 0.1;
/// Lower bound that conformal alpha is clamped to.
const MIN_CONFORMAL_ALPHA: f64 = 0.01;
/// Upper bound that conformal alpha is clamped to.
const MAX_CONFORMAL_ALPHA: f64 = 0.5;
194
/// Numeric outputs of the Bayesian decision engine for one decision.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecisionMetrics {
    /// Posterior probability that the input is compatible, given the evidence.
    pub posterior_compatible: f64,
    /// `exp` of the summed evidence log-likelihood ratios.
    pub bayes_factor_compatible_over_incompatible: f64,
    /// Expected loss of `Allow` under the posterior.
    pub expected_loss_allow: f64,
    /// Expected loss of `Reject` under the posterior.
    pub expected_loss_reject: f64,
    /// Expected loss of `Repair` under the posterior.
    pub expected_loss_repair: f64,
}

/// Complete audit entry for one decision, as pushed onto an [`EvidenceLedger`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecisionRecord {
    /// Wall-clock time of the decision in ms since the UNIX epoch.
    /// It is 0 if the clock reads earlier than the epoch.
    pub ts_unix_ms: u64,
    /// Mode of the policy that made the decision.
    pub mode: RuntimeMode,
    /// Final action, after any [`RuntimePolicy`] override.
    pub action: DecisionAction,
    /// The issue being decided.
    pub issue: CompatibilityIssue,
    /// Prior actually used: non-finite input becomes 0.5, otherwise it is
    /// clamped away from 0 and 1.
    pub prior_compatible: f64,
    /// Posterior and expected-loss figures.
    pub metrics: DecisionMetrics,
    /// Evidence terms that fed the posterior.
    pub evidence: Vec<EvidenceTerm>,
}

/// Identity summary of one index participating in an operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticIndexIdentity {
    /// Role of the index in the operation (e.g. `"left"`, `"right"`, `"output"`).
    pub role: String,
    /// Number of labels.
    pub len: usize,
    /// Whether the index contains duplicate labels.
    pub has_duplicates: bool,
    /// Content fingerprint, e.g. a `"sha256:<hex>"` string from
    /// [`semantic_fingerprint_bytes`] or [`SemanticFingerprintBuilder`].
    pub fingerprint: String,
}

/// Witness describing how an operation aligned and materialized its result.
///
/// All descriptive fields are free-form strings supplied by the caller.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticWitnessRecord {
    /// Creation time in ms since the UNIX epoch (0 if the clock is before the epoch).
    pub ts_unix_ms: u64,
    /// Operation name (e.g. `"series.add"`).
    pub operation: String,
    /// Why the result had to be materialized.
    pub materialization_reason: String,
    /// Alignment strategy (e.g. `"outer"`).
    pub alignment_mode: String,
    /// Identities of the input indexes.
    pub input_index_identity: Vec<SemanticIndexIdentity>,
    /// Identity of the output index.
    pub output_index_identity: SemanticIndexIdentity,
    /// How missing values are treated.
    pub null_nan_policy: String,
    /// Ordering guarantee of the output labels.
    pub output_ordering_contract: String,
}
234
235impl SemanticWitnessRecord {
236    #[must_use]
237    pub fn new(
238        operation: impl Into<String>,
239        materialization_reason: impl Into<String>,
240        alignment_mode: impl Into<String>,
241        input_index_identity: Vec<SemanticIndexIdentity>,
242        output_index_identity: SemanticIndexIdentity,
243        null_nan_policy: impl Into<String>,
244        output_ordering_contract: impl Into<String>,
245    ) -> Self {
246        Self {
247            ts_unix_ms: now_unix_ms().unwrap_or_default(),
248            operation: operation.into(),
249            materialization_reason: materialization_reason.into(),
250            alignment_mode: alignment_mode.into(),
251            input_index_identity,
252            output_index_identity,
253            null_nan_policy: null_nan_policy.into(),
254            output_ordering_contract: output_ordering_contract.into(),
255        }
256    }
257}
258
/// Human-readable explanation of one decision; see [`decision_to_card`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GalaxyBrainCard {
    /// `"<subject>::<action>"`.
    pub title: String,
    /// The decision rule, written symbolically.
    pub equation: String,
    /// The rule with this decision's posterior and expected losses plugged in.
    pub substitution: String,
    /// One-line plain-language gloss.
    pub intuition: String,
}
266
267impl GalaxyBrainCard {
268    #[must_use]
269    pub fn render_plain(&self) -> String {
270        format!(
271            "[{}]\n{}\n{}\n{}",
272            self.title, self.equation, self.substitution, self.intuition
273        )
274    }
275}
276
277#[must_use]
278pub fn decision_to_card(record: &DecisionRecord) -> GalaxyBrainCard {
279    GalaxyBrainCard {
280        title: format!("{}::{:?}", record.issue.subject, record.action),
281        equation: "argmin_a Σ_s L(a,s) P(s|evidence)".to_owned(),
282        substitution: format!(
283            "P(compatible|e)={:.4}, E[allow]={:.4}, E[reject]={:.4}, E[repair]={:.4}",
284            record.metrics.posterior_compatible,
285            record.metrics.expected_loss_allow,
286            record.metrics.expected_loss_reject,
287            record.metrics.expected_loss_repair
288        ),
289        intuition: "Lower expected loss wins; strict mode may still force fail-closed.".to_owned(),
290    }
291}
292
293#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
294pub struct EvidenceLedger {
295    records: Vec<DecisionRecord>,
296    #[serde(default)]
297    semantic_witnesses: Vec<SemanticWitnessRecord>,
298    // Runtime-only switch: when false, callers that build a throwaway ledger
299    // (e.g. the public `Series::add` convenience wrappers that discard the
300    // ledger) can skip the expensive semantic-witness fingerprinting. Not part
301    // of the serialized audit artifact, and `#[serde(skip)]` keeps the on-disk
302    // format unchanged. Per br-frankenpandas-b75cc.
303    #[serde(skip)]
304    record_semantic_witnesses: bool,
305}
306
307impl Default for EvidenceLedger {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313impl EvidenceLedger {
314    #[must_use]
315    pub fn new() -> Self {
316        Self {
317            records: Vec::new(),
318            semantic_witnesses: Vec::new(),
319            record_semantic_witnesses: true,
320        }
321    }
322
323    /// Build a ledger that does not record semantic witnesses. Used by the
324    /// public arithmetic convenience methods that discard their ledger, so the
325    /// AACE witness fingerprint (a sha256 over the index) is not computed when
326    /// nothing will read it. Observable operation output is unaffected.
327    #[must_use]
328    pub fn without_semantic_witnesses(mut self) -> Self {
329        self.record_semantic_witnesses = false;
330        self
331    }
332
333    /// Whether this ledger records AACE semantic witnesses (default true).
334    #[must_use]
335    pub fn records_semantic_witnesses(&self) -> bool {
336        self.record_semantic_witnesses
337    }
338
339    pub fn push(&mut self, record: DecisionRecord) {
340        self.records.push(record);
341    }
342
343    pub fn push_semantic_witness(&mut self, record: SemanticWitnessRecord) {
344        self.semantic_witnesses.push(record);
345    }
346
347    #[must_use]
348    pub fn records(&self) -> &[DecisionRecord] {
349        &self.records
350    }
351
352    #[must_use]
353    pub fn semantic_witnesses(&self) -> &[SemanticWitnessRecord] {
354        &self.semantic_witnesses
355    }
356}
357
/// Policy bundle consulted by the `decide_*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimePolicy {
    /// Mode recorded on every decision. In `Hardened` mode, over-cap joins
    /// are forced to `Repair`.
    pub mode: RuntimeMode,
    /// When true, [`RuntimePolicy::decide_unknown_feature`] always returns
    /// `Reject`, regardless of expected loss.
    pub fail_closed_unknown_features: bool,
    /// Maximum estimated join rows before a join counts as over the cap.
    /// `None` means no cap (treated as `usize::MAX`).
    pub hardened_join_row_cap: Option<usize>,
}
364
365impl RuntimePolicy {
366    #[must_use]
367    pub fn strict() -> Self {
368        Self {
369            mode: RuntimeMode::Strict,
370            fail_closed_unknown_features: true,
371            hardened_join_row_cap: None,
372        }
373    }
374
375    #[must_use]
376    pub fn hardened(join_row_cap: Option<usize>) -> Self {
377        Self {
378            mode: RuntimeMode::Hardened,
379            fail_closed_unknown_features: false,
380            hardened_join_row_cap: join_row_cap,
381        }
382    }
383
384    pub fn decide_unknown_feature(
385        &self,
386        subject: impl Into<String>,
387        detail: impl Into<String>,
388        ledger: &mut EvidenceLedger,
389    ) -> DecisionAction {
390        let issue = CompatibilityIssue {
391            kind: IssueKind::UnknownFeature,
392            subject: subject.into(),
393            detail: detail.into(),
394        };
395
396        let mut record = decide(
397            self.mode,
398            issue,
399            UNKNOWN_FEATURE_PRIOR,
400            LossMatrix::default(),
401            UNKNOWN_FEATURE_EVIDENCE.to_vec(),
402        );
403        if self.fail_closed_unknown_features {
404            record.action = DecisionAction::Reject;
405        }
406        let action = record.action;
407        ledger.push(record);
408        action
409    }
410
411    pub fn decide_join_admission(
412        &self,
413        estimated_rows: usize,
414        ledger: &mut EvidenceLedger,
415    ) -> DecisionAction {
416        let issue = CompatibilityIssue {
417            kind: IssueKind::JoinCardinality,
418            subject: "join_estimator".to_owned(),
419            detail: format!("estimated_rows={estimated_rows}"),
420        };
421
422        let cap = self.hardened_join_row_cap.unwrap_or(usize::MAX);
423        let evidence = if estimated_rows <= cap {
424            JOIN_ADMISSION_EVIDENCE_WITHIN_CAP.to_vec()
425        } else {
426            JOIN_ADMISSION_EVIDENCE_OVER_CAP.to_vec()
427        };
428        let mut record = decide(
429            self.mode,
430            issue,
431            JOIN_ADMISSION_PRIOR,
432            JOIN_ADMISSION_LOSS,
433            evidence,
434        );
435
436        if matches!(self.mode, RuntimeMode::Hardened) && estimated_rows > cap {
437            record.action = DecisionAction::Repair;
438        }
439
440        let action = record.action;
441        ledger.push(record);
442        action
443    }
444}
445
446impl Default for RuntimePolicy {
447    fn default() -> Self {
448        Self::strict()
449    }
450}
451
/// Errors raised by the runtime layer.
#[derive(Debug, Error)]
pub enum RuntimeError {
    /// The system clock reported a time before the UNIX epoch.
    #[error("system clock is before UNIX_EPOCH")]
    ClockSkew,
}
457
458fn now_unix_ms() -> Result<u64, RuntimeError> {
459    let ms = SystemTime::now()
460        .duration_since(UNIX_EPOCH)
461        .map_err(|_| RuntimeError::ClockSkew)?
462        .as_millis();
463    Ok(ms as u64)
464}
465
466fn normalize_prior_compatible(prior_compatible: f64) -> f64 {
467    if !prior_compatible.is_finite() {
468        return 0.5;
469    }
470    prior_compatible.clamp(PRIOR_COMPATIBLE_EPSILON, 1.0 - PRIOR_COMPATIBLE_EPSILON)
471}
472
473fn decide(
474    mode: RuntimeMode,
475    issue: CompatibilityIssue,
476    prior_compatible: f64,
477    loss: LossMatrix,
478    evidence: Vec<EvidenceTerm>,
479) -> DecisionRecord {
480    let prior_compatible = normalize_prior_compatible(prior_compatible);
481    let log_odds_prior = (prior_compatible / (1.0 - prior_compatible)).ln();
482    let llr_sum: f64 = evidence
483        .iter()
484        .map(|term| term.log_likelihood_if_compatible - term.log_likelihood_if_incompatible)
485        .sum();
486    let log_odds_post = log_odds_prior + llr_sum;
487
488    let posterior_compatible = 1.0 / (1.0 + (-log_odds_post).exp());
489    let posterior_incompatible = 1.0 - posterior_compatible;
490
491    let expected_loss_allow = loss.allow_if_compatible * posterior_compatible
492        + loss.allow_if_incompatible * posterior_incompatible;
493    let expected_loss_reject = loss.reject_if_compatible * posterior_compatible
494        + loss.reject_if_incompatible * posterior_incompatible;
495    let expected_loss_repair = loss.repair_if_compatible * posterior_compatible
496        + loss.repair_if_incompatible * posterior_incompatible;
497
498    let mut best_action = DecisionAction::Allow;
499    let mut best_loss = expected_loss_allow;
500
501    if expected_loss_repair < best_loss {
502        best_action = DecisionAction::Repair;
503        best_loss = expected_loss_repair;
504    }
505    if expected_loss_reject < best_loss {
506        best_action = DecisionAction::Reject;
507    }
508
509    DecisionRecord {
510        ts_unix_ms: now_unix_ms().unwrap_or_default(),
511        mode,
512        action: best_action,
513        issue,
514        prior_compatible,
515        metrics: DecisionMetrics {
516            posterior_compatible,
517            bayes_factor_compatible_over_incompatible: llr_sum.exp(),
518            expected_loss_allow,
519            expected_loss_reject,
520            expected_loss_repair,
521        },
522        evidence,
523    }
524}
525
/// Integrity envelope for a stored artifact.
///
/// In this file only hashes and bookkeeping are produced
/// ([`RaptorQEnvelope::from_source_bytes`]). No RaptorQ encoding or decoding
/// happens here.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RaptorQEnvelope {
    /// Caller-supplied artifact identifier.
    pub artifact_id: String,
    /// Caller-supplied artifact type label.
    pub artifact_type: String,
    /// `"sha256:<hex>"` of the complete source bytes.
    pub source_hash: String,
    /// Symbol-level metadata.
    pub raptorq: RaptorQMetadata,
    /// Last scrub result.
    pub scrub: ScrubStatus,
    /// Decode-proof history, capped by [`RaptorQEnvelope::push_decode_proof_capped`].
    pub decode_proofs: Vec<DecodeProof>,
}

/// Maximum number of decode proofs kept by `push_decode_proof_capped`.
pub const MAX_DECODE_PROOFS: usize = 1_000;
/// Chunk size (bytes) used to split source bytes into hashed symbols.
pub const DEFAULT_RAPTORQ_SYMBOL_BYTES: usize = 1_024;

/// Symbol-level metadata of a [`RaptorQEnvelope`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RaptorQMetadata {
    /// Number of source symbols (chunks); saturates at `u32::MAX`.
    pub k: u32,
    /// Number of repair symbols, as supplied by the caller.
    pub repair_symbols: u32,
    /// `repair_symbols / k`, or `0.0` when `k == 0`.
    pub overhead_ratio: f64,
    /// `"sha256:<hex>"` of each source symbol, in order.
    pub symbol_hashes: Vec<String>,
}

/// Result of the most recent integrity scrub.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ScrubStatus {
    /// Time of the last successful scrub, in ms since the UNIX epoch.
    pub last_ok_unix_ms: u64,
    /// Free-form status (`from_source_bytes` sets `"ok"`).
    pub status: String,
}

/// Record of one decode/recovery event.
/// NOTE(review): field semantics are set by callers outside this file.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecodeProof {
    /// Event time in ms since the UNIX epoch.
    pub ts_unix_ms: u64,
    /// Why the decode happened.
    pub reason: String,
    /// Number of blocks recovered.
    pub recovered_blocks: u32,
    /// Hash attesting to the decode (format defined by the caller).
    pub proof_hash: String,
}
560
561impl RaptorQEnvelope {
562    #[must_use]
563    pub fn from_source_bytes(
564        artifact_id: impl Into<String>,
565        artifact_type: impl Into<String>,
566        source_bytes: &[u8],
567        repair_symbols: u32,
568    ) -> Self {
569        let symbol_hashes: Vec<String> = source_bytes
570            .chunks(DEFAULT_RAPTORQ_SYMBOL_BYTES)
571            .map(|chunk| format!("sha256:{}", sha256_hex(chunk)))
572            .collect();
573        let k = u32::try_from(symbol_hashes.len()).unwrap_or(u32::MAX);
574        let overhead_ratio = if k == 0 {
575            0.0
576        } else {
577            f64::from(repair_symbols) / f64::from(k)
578        };
579
580        Self {
581            artifact_id: artifact_id.into(),
582            artifact_type: artifact_type.into(),
583            source_hash: format!("sha256:{}", sha256_hex(source_bytes)),
584            raptorq: RaptorQMetadata {
585                k,
586                repair_symbols,
587                overhead_ratio,
588                symbol_hashes,
589            },
590            scrub: ScrubStatus {
591                last_ok_unix_ms: now_unix_ms().unwrap_or_default(),
592                status: "ok".to_owned(),
593            },
594            decode_proofs: Vec::new(),
595        }
596    }
597
598    /// Append a decode proof while enforcing a bounded history size.
599    ///
600    /// When the cap is exceeded, oldest proofs are evicted first.
601    pub fn push_decode_proof_capped(&mut self, proof: DecodeProof) {
602        if self.decode_proofs.len() >= MAX_DECODE_PROOFS {
603            let overflow = self.decode_proofs.len() + 1 - MAX_DECODE_PROOFS;
604            self.decode_proofs.drain(0..overflow);
605        }
606        self.decode_proofs.push(proof);
607    }
608}
609
610#[must_use]
611pub fn semantic_fingerprint_bytes(bytes: &[u8]) -> String {
612    format!("sha256:{}", sha256_hex(bytes))
613}
614
615#[derive(Debug)]
616pub struct SemanticFingerprintBuilder {
617    hasher: Sha256,
618}
619
620impl Default for SemanticFingerprintBuilder {
621    fn default() -> Self {
622        Self::new()
623    }
624}
625
626impl SemanticFingerprintBuilder {
627    #[must_use]
628    pub fn new() -> Self {
629        Self {
630            hasher: Sha256::new(),
631        }
632    }
633
634    pub fn update(&mut self, bytes: &[u8]) {
635        self.hasher.update(bytes);
636    }
637
638    #[must_use]
639    pub fn finish(self) -> String {
640        format!("sha256:{}", sha256_digest_hex(self.hasher.finalize()))
641    }
642}
643
644fn sha256_hex(bytes: &[u8]) -> String {
645    let digest = Sha256::digest(bytes);
646    sha256_digest_hex(digest)
647}
648
/// Encode a byte stream as lowercase hex, two characters per byte (high
/// nibble first).
fn sha256_digest_hex(digest: impl IntoIterator<Item = u8>) -> String {
    const NIBBLE_CHARS: &[u8; 16] = b"0123456789abcdef";
    digest
        .into_iter()
        .flat_map(|byte| [byte >> 4, byte & 0x0f])
        .map(|nibble| char::from(NIBBLE_CHARS[usize::from(nibble)]))
        .collect()
}
658
659// === Conformal Calibration for Decision Engine (bd-2t5e.9, AG-09) ===
660
661/// Nonconformity score computed from a single decision record.
662/// Higher score = more "strange" relative to calibration window.
663fn nonconformity_score(record: &DecisionRecord) -> f64 {
664    // Score is the absolute log-posterior-odds: high when decision is extreme
665    let p = record
666        .metrics
667        .posterior_compatible
668        .clamp(1e-15, 1.0 - 1e-15);
669    (p / (1.0 - p)).ln().abs()
670}
671
672fn normalize_conformal_alpha(alpha: f64) -> f64 {
673    if alpha.is_finite() {
674        alpha.clamp(MIN_CONFORMAL_ALPHA, MAX_CONFORMAL_ALPHA)
675    } else {
676        DEFAULT_CONFORMAL_ALPHA
677    }
678}
679
/// Conformal prediction set: which actions are admissible at significance level alpha.
///
/// Produced by [`ConformalGuard::evaluate`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConformalPredictionSet {
    /// The conformal quantile threshold at significance level alpha
    /// (`f64::INFINITY` while the guard lacks calibration data).
    pub quantile_threshold: f64,
    /// The nonconformity score of the current decision.
    pub current_score: f64,
    /// Whether the Bayesian argmin action is inside the conformal set
    /// (`current_score <= quantile_threshold`).
    pub bayesian_action_in_set: bool,
    /// Admissible actions: just the record's action when it conforms, every
    /// action otherwise.
    pub admissible_actions: Vec<DecisionAction>,
    /// Empirical coverage rate, cumulative over every decision the guard
    /// has evaluated (not only those still in the rolling window).
    pub empirical_coverage: f64,
}
694
/// Calibration window for conformal guard.
///
/// The guard keeps a rolling window of nonconformity scores and cumulative
/// coverage counters. Configuration fields are re-sanitized on every
/// `evaluate` call, so deserialized state with out-of-range values is repaired
/// before use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConformalGuard {
    /// Rolling window of nonconformity scores, oldest first.
    scores: Vec<f64>,
    /// Maximum window size (at least 1).
    window_size: usize,
    /// Significance level (e.g., 0.1 for 90% coverage).
    alpha: f64,
    /// Count of decisions where Bayesian action was in the conformal set.
    in_set_count: usize,
    /// Total decisions evaluated (never reset by window eviction).
    total_count: usize,
}
709
710impl ConformalGuard {
711    /// Create a new conformal guard with the given window size and significance level.
712    #[must_use]
713    pub fn new(window_size: usize, alpha: f64) -> Self {
714        let window_size = window_size.max(1);
715        Self {
716            scores: Vec::with_capacity(window_size),
717            window_size,
718            alpha: normalize_conformal_alpha(alpha),
719            in_set_count: 0,
720            total_count: 0,
721        }
722    }
723
724    /// Default: 1000-element window, alpha=0.1 (90% coverage guarantee).
725    #[must_use]
726    pub fn default_config() -> Self {
727        Self::new(1000, 0.1)
728    }
729
730    /// Compute the conformal quantile from the calibration window.
731    /// Returns None if the window has fewer than 2 scores.
732    #[must_use]
733    pub fn conformal_quantile(&self) -> Option<f64> {
734        let mut sorted: Vec<f64> = self
735            .scores
736            .iter()
737            .copied()
738            .filter(|score| score.is_finite())
739            .collect();
740        if sorted.len() < 2 {
741            return None;
742        }
743        sorted.sort_by(f64::total_cmp);
744        // Quantile at level (1 - alpha)(1 + 1/n) per split conformal prediction
745        let n = sorted.len() as f64;
746        let level = (1.0 - normalize_conformal_alpha(self.alpha)) * (1.0 + 1.0 / n);
747        let idx = (level * n).ceil() as usize;
748        let idx = idx.min(sorted.len()).saturating_sub(1);
749        Some(sorted[idx])
750    }
751
752    /// Evaluate a decision record against the conformal guard.
753    /// Returns the prediction set and whether the Bayesian action is admissible.
754    pub fn evaluate(&mut self, record: &DecisionRecord) -> ConformalPredictionSet {
755        self.normalize_runtime_config();
756        let score = nonconformity_score(record);
757
758        let quantile = self.conformal_quantile();
759
760        // Add score to calibration window (rolling)
761        if self.scores.len() >= self.window_size {
762            self.scores.remove(0);
763        }
764        self.scores.push(score);
765
766        let threshold = match quantile {
767            Some(q) => q,
768            None => {
769                // Insufficient calibration data: accept all actions
770                self.total_count += 1;
771                self.in_set_count += 1;
772                return ConformalPredictionSet {
773                    quantile_threshold: f64::INFINITY,
774                    current_score: score,
775                    bayesian_action_in_set: true,
776                    admissible_actions: vec![
777                        DecisionAction::Allow,
778                        DecisionAction::Reject,
779                        DecisionAction::Repair,
780                    ],
781                    empirical_coverage: 1.0,
782                };
783            }
784        };
785
786        let bayesian_in_set = score <= threshold;
787
788        // Determine which actions would have scores <= threshold
789        // For now, if the Bayesian action is in set, it's the only admissible one.
790        // If not, we admit all actions (conformal guard widens the set).
791        let admissible = if bayesian_in_set {
792            vec![record.action]
793        } else {
794            vec![
795                DecisionAction::Allow,
796                DecisionAction::Reject,
797                DecisionAction::Repair,
798            ]
799        };
800
801        self.total_count += 1;
802        if bayesian_in_set {
803            self.in_set_count += 1;
804        }
805
806        let empirical_coverage = if self.total_count > 0 {
807            self.in_set_count as f64 / self.total_count as f64
808        } else {
809            1.0
810        };
811
812        ConformalPredictionSet {
813            quantile_threshold: threshold,
814            current_score: score,
815            bayesian_action_in_set: bayesian_in_set,
816            admissible_actions: admissible,
817            empirical_coverage,
818        }
819    }
820
821    /// Current empirical coverage rate.
822    #[must_use]
823    pub fn empirical_coverage(&self) -> f64 {
824        if self.total_count == 0 {
825            return 1.0;
826        }
827        self.in_set_count.min(self.total_count) as f64 / self.total_count as f64
828    }
829
830    /// Number of scores in the calibration window.
831    #[must_use]
832    pub fn calibration_count(&self) -> usize {
833        self.scores.len()
834    }
835
836    /// Whether the calibration window has sufficient data.
837    #[must_use]
838    pub fn is_calibrated(&self) -> bool {
839        self.scores.iter().filter(|score| score.is_finite()).count() >= 2
840    }
841
842    /// Whether coverage has dropped below target for the alert threshold.
843    #[must_use]
844    pub fn coverage_alert(&self) -> bool {
845        self.total_count >= 100
846            && self.empirical_coverage() < (1.0 - normalize_conformal_alpha(self.alpha))
847    }
848
849    fn normalize_runtime_config(&mut self) {
850        self.window_size = self.window_size.max(1);
851        self.alpha = normalize_conformal_alpha(self.alpha);
852        self.scores.retain(|score| score.is_finite());
853        if self.scores.len() > self.window_size {
854            let overflow = self.scores.len() - self.window_size;
855            self.scores.drain(0..overflow);
856        }
857        self.in_set_count = self.in_set_count.min(self.total_count);
858    }
859}
860
/// Map an `asupersync` outcome onto a [`DecisionAction`].
///
/// - `Ok` maps to `Allow`.
/// - `Err` maps to `Repair`.
/// - `Cancelled` and `Panicked` map to `Reject`.
#[cfg(feature = "asupersync")]
#[must_use]
pub fn outcome_to_action<T, E>(outcome: &::asupersync::Outcome<T, E>) -> DecisionAction {
    use ::asupersync::Outcome;

    match outcome {
        Outcome::Ok(_) => DecisionAction::Allow,
        Outcome::Err(_) => DecisionAction::Repair,
        Outcome::Cancelled(_) | Outcome::Panicked(_) => DecisionAction::Reject,
    }
}
872
873#[cfg(test)]
874mod tests {
875    use std::{borrow::Cow, hint::black_box, time::Instant};
876
877    use serde::Serialize;
878
879    use super::{
880        ConformalGuard, DecisionAction, EvidenceLedger, RaptorQEnvelope, RuntimeMode,
881        RuntimePolicy, SemanticIndexIdentity, SemanticWitnessRecord, decision_to_card,
882    };
883
    // Packet id stamped into every structured test log (and its trace id).
    const ASUPERSYNC_PACKET_ID: &str = "ASUPERSYNC-E";
    // Prefix of the replay command recorded in structured test logs.
    const REPLAY_PREFIX: &str = "cargo test -p fp-runtime --";
886
    /// One structured test log entry. Its field names match the key list
    /// checked by `assert_required_log_fields`.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
    struct StructuredTestLog {
        packet_id: String,
        case_id: String,
        mode: RuntimeMode,
        seed: u64,
        // "<packet>:<case>:<seed as 16 hex digits>"
        trace_id: String,
        assertion_path: String,
        result: String,
        // Command that re-runs just this case.
        replay_cmd: String,
    }
898
899    fn make_structured_log(
900        case_id: &str,
901        mode: RuntimeMode,
902        seed: u64,
903        assertion_path: &str,
904        result: &str,
905    ) -> StructuredTestLog {
906        StructuredTestLog {
907            packet_id: ASUPERSYNC_PACKET_ID.to_owned(),
908            case_id: case_id.to_owned(),
909            mode,
910            seed,
911            trace_id: format!("{ASUPERSYNC_PACKET_ID}:{case_id}:{seed:016x}"),
912            assertion_path: assertion_path.to_owned(),
913            result: result.to_owned(),
914            replay_cmd: format!("{REPLAY_PREFIX} {case_id} --nocapture"),
915        }
916    }
917
918    fn assert_required_log_fields(log: &serde_json::Value) {
919        for field in [
920            "packet_id",
921            "case_id",
922            "mode",
923            "seed",
924            "trace_id",
925            "assertion_path",
926            "result",
927            "replay_cmd",
928        ] {
929            assert!(
930                log.get(field).is_some(),
931                "structured log missing field: {field}"
932            );
933        }
934    }
935
    /// A witness pushed onto a fresh ledger is returned by `semantic_witnesses()`
    /// with its fields intact and `sha256:`-prefixed fingerprints.
    #[test]
    fn evidence_ledger_records_semantic_witnesses_tn6qb3() {
        let mut ledger = EvidenceLedger::new();
        // Outer alignment of two 2-label inputs into a 3-label output.
        let witness = SemanticWitnessRecord::new(
            "series.add",
            "series_binary_arithmetic_materialization",
            "outer",
            vec![
                SemanticIndexIdentity {
                    role: "left".to_owned(),
                    len: 2,
                    has_duplicates: false,
                    fingerprint: super::semantic_fingerprint_bytes(b"left"),
                },
                SemanticIndexIdentity {
                    role: "right".to_owned(),
                    len: 2,
                    has_duplicates: false,
                    fingerprint: super::semantic_fingerprint_bytes(b"right"),
                },
            ],
            SemanticIndexIdentity {
                role: "output".to_owned(),
                len: 3,
                has_duplicates: false,
                fingerprint: super::semantic_fingerprint_bytes(b"output"),
            },
            "missing aligned operands materialize as NaN/null before arithmetic",
            "outer union preserves left order then right-only labels",
        );

        ledger.push_semantic_witness(witness);

        let witnesses = ledger.semantic_witnesses();
        assert_eq!(witnesses.len(), 1);
        assert_eq!(witnesses[0].operation, "series.add");
        assert_eq!(witnesses[0].alignment_mode, "outer");
        assert_eq!(witnesses[0].output_index_identity.len, 3);
        assert_eq!(witnesses[0].input_index_identity[0].role, "left");
        assert!(
            witnesses[0].input_index_identity[0]
                .fingerprint
                .starts_with("sha256:")
        );
    }
981
982    fn decide_join_admission_baseline(
983        policy: &RuntimePolicy,
984        estimated_rows: usize,
985        ledger: &mut EvidenceLedger,
986    ) -> DecisionAction {
987        let issue = super::CompatibilityIssue {
988            kind: super::IssueKind::JoinCardinality,
989            subject: "join_estimator".to_owned(),
990            detail: format!("estimated_rows={estimated_rows}"),
991        };
992        let cap = policy.hardened_join_row_cap.unwrap_or(usize::MAX);
993        let evidence = vec![
994            super::EvidenceTerm {
995                name: Cow::Owned("estimator_overflow_risk".to_owned()),
996                log_likelihood_if_compatible: if estimated_rows <= cap { -0.3 } else { -2.8 },
997                log_likelihood_if_incompatible: if estimated_rows <= cap { -1.2 } else { -0.1 },
998            },
999            super::EvidenceTerm {
1000                name: Cow::Owned("memory_budget_signal".to_owned()),
1001                log_likelihood_if_compatible: if estimated_rows <= cap { -0.4 } else { -2.2 },
1002                log_likelihood_if_incompatible: if estimated_rows <= cap { -1.5 } else { -0.2 },
1003            },
1004        ];
1005        let loss = super::LossMatrix {
1006            allow_if_compatible: 0.0,
1007            allow_if_incompatible: 130.0,
1008            reject_if_compatible: 5.0,
1009            reject_if_incompatible: 0.5,
1010            repair_if_compatible: 1.5,
1011            repair_if_incompatible: 3.0,
1012        };
1013        let mut record = super::decide(policy.mode, issue, 0.6, loss, evidence);
1014        if matches!(policy.mode, RuntimeMode::Hardened) && estimated_rows > cap {
1015            record.action = DecisionAction::Repair;
1016        }
1017        let action = record.action;
1018        ledger.push(record);
1019        action
1020    }
1021
1022    fn assert_join_record_equivalent(
1023        optimized: &super::DecisionRecord,
1024        baseline: &super::DecisionRecord,
1025    ) {
1026        assert_eq!(optimized.mode, baseline.mode);
1027        assert_eq!(optimized.action, baseline.action);
1028        assert_eq!(optimized.issue.kind, baseline.issue.kind);
1029        assert_eq!(optimized.issue.subject, baseline.issue.subject);
1030        assert_eq!(optimized.issue.detail, baseline.issue.detail);
1031        assert_eq!(optimized.prior_compatible, baseline.prior_compatible);
1032        assert_eq!(optimized.metrics, baseline.metrics);
1033        assert_eq!(optimized.evidence.len(), baseline.evidence.len());
1034        for (left, right) in optimized.evidence.iter().zip(&baseline.evidence) {
1035            assert_eq!(left.name.as_ref(), right.name.as_ref());
1036            assert_eq!(
1037                left.log_likelihood_if_compatible,
1038                right.log_likelihood_if_compatible
1039            );
1040            assert_eq!(
1041                left.log_likelihood_if_incompatible,
1042                right.log_likelihood_if_incompatible
1043            );
1044        }
1045    }
1046
1047    fn quantile_from_sorted(samples: &[u128], pct: usize) -> u128 {
1048        let len = samples.len();
1049        assert!(len > 0);
1050        let idx = (len.saturating_sub(1) * pct) / 100;
1051        samples[idx]
1052    }
1053
1054    fn latency_quantiles(mut samples_ns: Vec<u128>) -> (u128, u128, u128) {
1055        samples_ns.sort_unstable();
1056        (
1057            quantile_from_sorted(&samples_ns, 50),
1058            quantile_from_sorted(&samples_ns, 95),
1059            quantile_from_sorted(&samples_ns, 99),
1060        )
1061    }
1062
1063    #[test]
1064    fn asupersync_join_admission_optimized_path_is_isomorphic_to_baseline() {
1065        let policy = RuntimePolicy::hardened(Some(1024));
1066        let mut optimized = EvidenceLedger::new();
1067        let mut baseline = EvidenceLedger::new();
1068
1069        for seed in 0_usize..256 {
1070            let rows = if seed % 2 == 0 {
1071                512 + seed
1072            } else {
1073                4096 + seed
1074            };
1075            let optimized_action = policy.decide_join_admission(rows, &mut optimized);
1076            let baseline_action = decide_join_admission_baseline(&policy, rows, &mut baseline);
1077            assert_eq!(optimized_action, baseline_action);
1078
1079            let optimized_record = optimized.records().last().expect("optimized record");
1080            let baseline_record = baseline.records().last().expect("baseline record");
1081            assert_join_record_equivalent(optimized_record, baseline_record);
1082        }
1083    }
1084
1085    #[test]
1086    fn asupersync_join_admission_profile_snapshot_reports_allocation_delta() {
1087        const ITERATIONS: usize = 256;
1088        let policy = RuntimePolicy::hardened(Some(2048));
1089        let mut optimized = EvidenceLedger::new();
1090        let mut baseline = EvidenceLedger::new();
1091        let mut optimized_ns = Vec::with_capacity(ITERATIONS);
1092        let mut baseline_ns = Vec::with_capacity(ITERATIONS);
1093
1094        for seed in 0_usize..ITERATIONS {
1095            let rows = if seed % 3 == 0 {
1096                1024 + seed
1097            } else {
1098                8192 + seed
1099            };
1100
1101            let baseline_start = Instant::now();
1102            let baseline_action = decide_join_admission_baseline(&policy, rows, &mut baseline);
1103            baseline_ns.push(baseline_start.elapsed().as_nanos());
1104            black_box(baseline_action);
1105
1106            let optimized_start = Instant::now();
1107            let optimized_action = policy.decide_join_admission(rows, &mut optimized);
1108            optimized_ns.push(optimized_start.elapsed().as_nanos());
1109            black_box(optimized_action);
1110        }
1111
1112        for (optimized_record, baseline_record) in
1113            optimized.records().iter().zip(baseline.records())
1114        {
1115            assert_join_record_equivalent(optimized_record, baseline_record);
1116        }
1117
1118        let (baseline_p50_ns, baseline_p95_ns, baseline_p99_ns) = latency_quantiles(baseline_ns);
1119        let (optimized_p50_ns, optimized_p95_ns, optimized_p99_ns) =
1120            latency_quantiles(optimized_ns);
1121        let baseline_name_bytes_per_call =
1122            "estimator_overflow_risk".len() + "memory_budget_signal".len();
1123        let baseline_name_bytes_total = baseline_name_bytes_per_call * ITERATIONS;
1124        let optimized_name_bytes_total = 0_usize;
1125        assert!(baseline_name_bytes_total > optimized_name_bytes_total);
1126
1127        println!(
1128            "asupersync_join_admission_profile_snapshot baseline_ns[p50={baseline_p50_ns},p95={baseline_p95_ns},p99={baseline_p99_ns}] optimized_ns[p50={optimized_p50_ns},p95={optimized_p95_ns},p99={optimized_p99_ns}] name_alloc_bytes_baseline={baseline_name_bytes_total} name_alloc_bytes_optimized={optimized_name_bytes_total}"
1129        );
1130    }
1131
1132    #[test]
1133    fn asupersync_structured_log_contains_required_fields() {
1134        let log = make_structured_log(
1135            "asupersync_structured_log_contains_required_fields",
1136            RuntimeMode::Strict,
1137            42,
1138            "ASUPERSYNC-E/log_schema",
1139            "pass",
1140        );
1141        let value = serde_json::to_value(log).expect("serialize log");
1142        assert_required_log_fields(&value);
1143    }
1144
1145    #[test]
1146    fn asupersync_structured_log_is_deterministic_for_same_inputs() {
1147        let left = make_structured_log(
1148            "asupersync_structured_log_is_deterministic_for_same_inputs",
1149            RuntimeMode::Hardened,
1150            1337,
1151            "ASUPERSYNC-E/log_determinism",
1152            "pass",
1153        );
1154        let right = make_structured_log(
1155            "asupersync_structured_log_is_deterministic_for_same_inputs",
1156            RuntimeMode::Hardened,
1157            1337,
1158            "ASUPERSYNC-E/log_determinism",
1159            "pass",
1160        );
1161        assert_eq!(left, right);
1162        let left_json = serde_json::to_string(&left).expect("left json");
1163        let right_json = serde_json::to_string(&right).expect("right json");
1164        assert_eq!(left_json, right_json);
1165    }
1166
1167    #[test]
1168    fn asupersync_property_strict_unknown_feature_always_rejects() {
1169        let policy = RuntimePolicy::strict();
1170        let mut ledger = EvidenceLedger::new();
1171        let case_id = "asupersync_property_strict_unknown_feature_always_rejects";
1172
1173        for seed in 0_u64..128 {
1174            let action = policy.decide_unknown_feature(
1175                format!("unknown_subject_{seed}"),
1176                format!("unknown_detail_{:08x}", seed.wrapping_mul(37)),
1177                &mut ledger,
1178            );
1179            let log = make_structured_log(
1180                case_id,
1181                RuntimeMode::Strict,
1182                seed,
1183                "ASUPERSYNC-E/strict_unknown_feature_reject",
1184                if action == DecisionAction::Reject {
1185                    "pass"
1186                } else {
1187                    "fail"
1188                },
1189            );
1190            let log_json = serde_json::to_value(log).expect("serialize log");
1191            assert_required_log_fields(&log_json);
1192            assert_eq!(
1193                action,
1194                DecisionAction::Reject,
1195                "strict mode must reject unknown feature; log={}",
1196                serde_json::to_string(&log_json).expect("json")
1197            );
1198        }
1199
1200        assert_eq!(ledger.records().len(), 128);
1201    }
1202
1203    #[test]
1204    fn asupersync_property_hardened_over_cap_forces_repair() {
1205        let cap = 1024_usize;
1206        let policy = RuntimePolicy::hardened(Some(cap));
1207        let mut ledger = EvidenceLedger::new();
1208        let case_id = "asupersync_property_hardened_over_cap_forces_repair";
1209
1210        for seed in 0_u64..256 {
1211            let rows = if seed % 2 == 0 {
1212                cap + 1 + (seed as usize % 10_000)
1213            } else {
1214                cap.saturating_sub(seed as usize % cap)
1215            };
1216            let action = policy.decide_join_admission(rows, &mut ledger);
1217            let log = make_structured_log(
1218                case_id,
1219                RuntimeMode::Hardened,
1220                seed,
1221                "ASUPERSYNC-E/hardened_join_cap_boundary",
1222                if rows > cap && action == DecisionAction::Repair {
1223                    "pass"
1224                } else {
1225                    "check"
1226                },
1227            );
1228            let log_json = serde_json::to_value(log).expect("serialize log");
1229            assert_required_log_fields(&log_json);
1230            if rows > cap {
1231                assert_eq!(
1232                    action,
1233                    DecisionAction::Repair,
1234                    "rows over cap must force repair; rows={rows}; log={}",
1235                    serde_json::to_string(&log_json).expect("json")
1236                );
1237            }
1238        }
1239    }
1240
1241    #[test]
1242    fn asupersync_property_decision_metrics_are_finite_and_bounded() {
1243        let policy = RuntimePolicy::hardened(Some(2048));
1244        let mut ledger = EvidenceLedger::new();
1245        let case_id = "asupersync_property_decision_metrics_are_finite_and_bounded";
1246
1247        for seed in 0_u64..128 {
1248            let rows = 1 + (seed as usize * 97 % 500_000);
1249            policy.decide_join_admission(rows, &mut ledger);
1250            let record = ledger.records().last().expect("record");
1251            let metrics = &record.metrics;
1252            let posterior = metrics.posterior_compatible;
1253            let bounded = (0.0..=1.0).contains(&posterior);
1254            let finite = metrics
1255                .bayes_factor_compatible_over_incompatible
1256                .is_finite()
1257                && metrics.expected_loss_allow.is_finite()
1258                && metrics.expected_loss_reject.is_finite()
1259                && metrics.expected_loss_repair.is_finite();
1260
1261            let log = make_structured_log(
1262                case_id,
1263                RuntimeMode::Hardened,
1264                seed,
1265                "ASUPERSYNC-E/decision_metrics_finite",
1266                if bounded && finite { "pass" } else { "fail" },
1267            );
1268            let log_json = serde_json::to_value(log).expect("serialize log");
1269            assert_required_log_fields(&log_json);
1270            assert!(bounded, "posterior out of range; log={log_json}");
1271            assert!(finite, "non-finite metrics; log={log_json}");
1272        }
1273    }
1274
1275    #[test]
1276    fn decide_clamps_boundary_priors_to_finite_range() {
1277        for (input_prior, expected_prior) in [
1278            (0.0, super::PRIOR_COMPATIBLE_EPSILON),
1279            (1.0, 1.0 - super::PRIOR_COMPATIBLE_EPSILON),
1280        ] {
1281            let record = super::decide(
1282                RuntimeMode::Strict,
1283                super::CompatibilityIssue {
1284                    kind: super::IssueKind::MalformedInput,
1285                    subject: "prior_clamp_test".to_owned(),
1286                    detail: "boundary prior".to_owned(),
1287                },
1288                input_prior,
1289                super::LossMatrix::default(),
1290                Vec::new(),
1291            );
1292
1293            assert_eq!(
1294                record.prior_compatible, expected_prior,
1295                "prior should be clamped into open interval (0,1)"
1296            );
1297            assert!(
1298                record.metrics.posterior_compatible.is_finite(),
1299                "posterior must remain finite for boundary priors"
1300            );
1301            assert!(
1302                record.metrics.expected_loss_allow.is_finite()
1303                    && record.metrics.expected_loss_reject.is_finite()
1304                    && record.metrics.expected_loss_repair.is_finite(),
1305                "expected-loss metrics must remain finite for boundary priors"
1306            );
1307        }
1308    }
1309
1310    #[test]
1311    fn decide_normalizes_non_finite_priors_to_neutral() {
1312        for input_prior in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
1313            let record = super::decide(
1314                RuntimeMode::Strict,
1315                super::CompatibilityIssue {
1316                    kind: super::IssueKind::MalformedInput,
1317                    subject: "prior_clamp_test".to_owned(),
1318                    detail: "non-finite prior".to_owned(),
1319                },
1320                input_prior,
1321                super::LossMatrix::default(),
1322                Vec::new(),
1323            );
1324
1325            assert_eq!(
1326                record.prior_compatible, 0.5,
1327                "non-finite priors should normalize to neutral prior"
1328            );
1329            assert!(
1330                record.metrics.posterior_compatible.is_finite(),
1331                "posterior must remain finite for non-finite priors"
1332            );
1333        }
1334    }
1335
1336    #[test]
1337    fn asupersync_adversarial_extreme_join_estimate_remains_repair_and_loggable() {
1338        let policy = RuntimePolicy::hardened(Some(8));
1339        let mut ledger = EvidenceLedger::new();
1340        let action = policy.decide_join_admission(usize::MAX, &mut ledger);
1341        assert_eq!(action, DecisionAction::Repair);
1342        let record = ledger.records().last().expect("record");
1343        assert_eq!(record.mode, RuntimeMode::Hardened);
1344        assert!(
1345            record.issue.detail.contains("estimated_rows="),
1346            "issue detail should include estimated_rows"
1347        );
1348
1349        let log = make_structured_log(
1350            "asupersync_adversarial_extreme_join_estimate_remains_repair_and_loggable",
1351            RuntimeMode::Hardened,
1352            u64::MAX,
1353            "ASUPERSYNC-E/adversarial_extreme_rows",
1354            "pass",
1355        );
1356        let log_json = serde_json::to_value(log).expect("serialize log");
1357        assert_required_log_fields(&log_json);
1358    }
1359
1360    #[test]
1361    fn strict_mode_fails_closed_for_unknown_features() {
1362        let mut ledger = EvidenceLedger::new();
1363        let policy = RuntimePolicy::strict();
1364
1365        let action = policy.decide_unknown_feature("csv", "field=experimental", &mut ledger);
1366        assert_eq!(action, DecisionAction::Reject);
1367        assert_eq!(ledger.records()[0].mode, RuntimeMode::Strict);
1368    }
1369
1370    #[test]
1371    fn hardened_mode_repairs_large_join_estimates() {
1372        let mut ledger = EvidenceLedger::new();
1373        let policy = RuntimePolicy::hardened(Some(10_000));
1374
1375        let action = policy.decide_join_admission(100_000, &mut ledger);
1376        assert_eq!(action, DecisionAction::Repair);
1377        assert_eq!(ledger.records().len(), 1);
1378    }
1379
1380    #[test]
1381    fn source_backed_raptorq_envelope_records_manifest_fields() {
1382        let mut source = vec![7_u8; super::DEFAULT_RAPTORQ_SYMBOL_BYTES];
1383        source.extend_from_slice(b"tail");
1384
1385        let envelope = RaptorQEnvelope::from_source_bytes("packet-001", "conformance", &source, 3);
1386
1387        assert_eq!(envelope.artifact_id, "packet-001");
1388        assert_eq!(envelope.artifact_type, "conformance");
1389        assert!(envelope.source_hash.starts_with("sha256:"));
1390        assert_eq!(envelope.source_hash.len(), "sha256:".len() + 64);
1391        assert_eq!(envelope.raptorq.k, 2);
1392        assert_eq!(envelope.raptorq.repair_symbols, 3);
1393        assert_eq!(envelope.raptorq.overhead_ratio, 1.5);
1394        assert_eq!(envelope.raptorq.symbol_hashes.len(), 2);
1395        assert!(
1396            envelope
1397                .raptorq
1398                .symbol_hashes
1399                .iter()
1400                .all(|hash| hash.starts_with("sha256:") && hash.len() == "sha256:".len() + 64)
1401        );
1402        assert_eq!(envelope.scrub.status, "ok");
1403        assert!(envelope.scrub.last_ok_unix_ms > 0);
1404    }
1405
1406    #[test]
1407    fn decode_proof_append_is_capped_and_evicts_oldest() {
1408        let mut envelope =
1409            RaptorQEnvelope::from_source_bytes("packet-001", "conformance", b"source", 1);
1410        let total = super::MAX_DECODE_PROOFS + 5;
1411
1412        for idx in 0..total {
1413            envelope.push_decode_proof_capped(super::DecodeProof {
1414                ts_unix_ms: u64::try_from(idx).expect("idx within u64 range"),
1415                reason: format!("proof-{idx}"),
1416                recovered_blocks: u32::try_from(idx).expect("idx within u32 range"),
1417                proof_hash: format!("sha256:{idx:08x}"),
1418            });
1419        }
1420
1421        assert_eq!(envelope.decode_proofs.len(), super::MAX_DECODE_PROOFS);
1422        assert_eq!(
1423            envelope.decode_proofs[0].proof_hash,
1424            format!("sha256:{:08x}", total - super::MAX_DECODE_PROOFS)
1425        );
1426        assert_eq!(
1427            envelope
1428                .decode_proofs
1429                .last()
1430                .expect("decode proof should exist")
1431                .proof_hash,
1432            format!("sha256:{:08x}", total - 1)
1433        );
1434    }
1435
1436    #[test]
1437    fn decision_card_is_renderable_for_ftui_consumers() {
1438        let mut ledger = EvidenceLedger::new();
1439        let policy = RuntimePolicy::strict();
1440        policy.decide_unknown_feature("csv", "field=experimental", &mut ledger);
1441
1442        let card = decision_to_card(&ledger.records()[0]);
1443        let rendered = card.render_plain();
1444        assert!(rendered.contains("argmin_a"));
1445        assert!(rendered.contains("P(compatible|e)"));
1446    }
1447
1448    // === Conformal Calibration Tests (bd-2t5e.9) ===
1449
1450    #[test]
1451    fn conformal_guard_uncalibrated_accepts_all() {
1452        let mut guard = ConformalGuard::new(100, 0.1);
1453        assert!(!guard.is_calibrated());
1454
1455        let mut ledger = EvidenceLedger::new();
1456        let policy = RuntimePolicy::strict();
1457        policy.decide_unknown_feature("test", "detail", &mut ledger);
1458
1459        let ps = guard.evaluate(&ledger.records()[0]);
1460        assert!(ps.bayesian_action_in_set);
1461        assert_eq!(ps.admissible_actions.len(), 3); // all actions admissible
1462        assert_eq!(ps.quantile_threshold, f64::INFINITY);
1463    }
1464
1465    #[test]
1466    fn conformal_guard_calibrates_after_sufficient_data() {
1467        let mut guard = ConformalGuard::new(100, 0.1);
1468        let mut ledger = EvidenceLedger::new();
1469        let policy = RuntimePolicy::hardened(Some(100_000));
1470
1471        // Feed 10 decisions to build calibration window
1472        for _ in 0..10 {
1473            policy.decide_join_admission(50_000, &mut ledger);
1474        }
1475
1476        for record in ledger.records() {
1477            guard.evaluate(record);
1478        }
1479
1480        assert!(guard.is_calibrated());
1481        assert!(guard.conformal_quantile().is_some());
1482        assert_eq!(guard.calibration_count(), 10);
1483    }
1484
1485    #[test]
1486    fn conformal_guard_rolling_window_evicts_old_scores() {
1487        let mut guard = ConformalGuard::new(5, 0.1);
1488        let mut ledger = EvidenceLedger::new();
1489        let policy = RuntimePolicy::hardened(Some(100_000));
1490
1491        for _ in 0..10 {
1492            policy.decide_join_admission(1000, &mut ledger);
1493        }
1494
1495        for record in ledger.records() {
1496            guard.evaluate(record);
1497        }
1498
1499        // Window should be capped at 5
1500        assert_eq!(guard.calibration_count(), 5);
1501    }
1502
1503    #[test]
1504    fn conformal_guard_coverage_tracking() {
1505        let mut guard = ConformalGuard::new(50, 0.1);
1506        let mut ledger = EvidenceLedger::new();
1507        let policy = RuntimePolicy::hardened(Some(100_000));
1508
1509        // Generate consistent decisions
1510        for _ in 0..20 {
1511            policy.decide_join_admission(1000, &mut ledger);
1512        }
1513
1514        for record in ledger.records() {
1515            guard.evaluate(record);
1516        }
1517
1518        // With consistent decisions, most should be in the conformal set
1519        let coverage = guard.empirical_coverage();
1520        assert!(coverage > 0.5, "coverage should be reasonable: {coverage}");
1521    }
1522
1523    #[test]
1524    fn conformal_guard_no_coverage_alert_under_100_decisions() {
1525        let mut guard = ConformalGuard::new(100, 0.1);
1526        let mut ledger = EvidenceLedger::new();
1527        let policy = RuntimePolicy::hardened(Some(100_000));
1528
1529        for _ in 0..10 {
1530            policy.decide_join_admission(1000, &mut ledger);
1531        }
1532        for record in ledger.records() {
1533            guard.evaluate(record);
1534        }
1535
1536        // Under 100 decisions, no alert regardless of coverage
1537        assert!(!guard.coverage_alert());
1538    }
1539
1540    #[test]
1541    fn conformal_guard_zero_window_size_is_clamped() {
1542        let mut guard = ConformalGuard::new(0, 0.1);
1543        let mut ledger = EvidenceLedger::new();
1544        let policy = RuntimePolicy::hardened(Some(100_000));
1545
1546        policy.decide_join_admission(1000, &mut ledger);
1547        let set = guard.evaluate(&ledger.records()[0]);
1548
1549        assert!(set.bayesian_action_in_set);
1550        assert_eq!(guard.calibration_count(), 1);
1551    }
1552
1553    #[test]
1554    fn conformal_guard_non_finite_alpha_uses_default() {
1555        let guard = ConformalGuard::new(100, f64::NAN);
1556        assert_eq!(guard.alpha, super::DEFAULT_CONFORMAL_ALPHA);
1557        assert!(!guard.coverage_alert());
1558    }
1559
1560    #[test]
1561    fn conformal_guard_repairs_deserialized_zero_window_before_evaluate() {
1562        let mut guard: ConformalGuard = serde_json::from_str(
1563            r#"{"scores":[],"window_size":0,"alpha":0.1,"in_set_count":0,"total_count":0}"#,
1564        )
1565        .expect("deserialize guard");
1566        let mut ledger = EvidenceLedger::new();
1567        let policy = RuntimePolicy::hardened(Some(100_000));
1568
1569        policy.decide_join_admission(1000, &mut ledger);
1570        let set = guard.evaluate(&ledger.records()[0]);
1571
1572        assert!(set.bayesian_action_in_set);
1573        assert_eq!(guard.window_size, 1);
1574        assert_eq!(guard.calibration_count(), 1);
1575    }
1576
1577    #[test]
1578    fn conformal_quantile_ignores_non_finite_persisted_scores() {
1579        let guard = ConformalGuard {
1580            scores: vec![f64::NAN, f64::INFINITY, 1.0, 2.0],
1581            window_size: 10,
1582            alpha: f64::NAN,
1583            in_set_count: 5,
1584            total_count: 3,
1585        };
1586
1587        assert!(guard.is_calibrated());
1588        assert_eq!(guard.conformal_quantile(), Some(2.0));
1589        assert_eq!(guard.empirical_coverage(), 1.0);
1590    }
1591
1592    #[test]
1593    fn conformal_guard_quantile_is_deterministic() {
1594        let mut guard = ConformalGuard::new(100, 0.1);
1595        let mut ledger = EvidenceLedger::new();
1596        let policy = RuntimePolicy::hardened(Some(100_000));
1597
1598        for _ in 0..5 {
1599            policy.decide_join_admission(1000, &mut ledger);
1600        }
1601        for record in ledger.records() {
1602            guard.evaluate(record);
1603        }
1604
1605        let q1 = guard.conformal_quantile();
1606        let q2 = guard.conformal_quantile();
1607        assert_eq!(q1, q2);
1608    }
1609
1610    // --- AG-09-T: Conformal Calibration Tests ---
1611
1612    /// AG-09-T #1: conformal_quantile with known scores returns correct quantile.
1613    #[test]
1614    fn conformal_quantile_basic() {
1615        let mut guard = ConformalGuard::new(100, 0.1);
1616        // Manually feed scores into the window
1617        let mut ledger = EvidenceLedger::new();
1618        let policy = RuntimePolicy::hardened(Some(100_000));
1619
1620        // Generate 5 decisions to fill window
1621        for _ in 0..5 {
1622            policy.decide_join_admission(1000, &mut ledger);
1623        }
1624        for record in ledger.records() {
1625            guard.evaluate(record);
1626        }
1627
1628        let q = guard.conformal_quantile();
1629        assert!(q.is_some());
1630        let quantile = q.unwrap();
1631        assert!(quantile.is_finite(), "quantile must be finite: {quantile}");
1632        assert!(quantile >= 0.0, "quantile must be non-negative: {quantile}");
1633    }
1634
1635    /// AG-09-T #2: Single-element score (after 2 evals) -> returns finite quantile.
1636    #[test]
1637    fn conformal_quantile_trivial() {
1638        let mut guard = ConformalGuard::new(100, 0.1);
1639        let mut ledger = EvidenceLedger::new();
1640        let policy = RuntimePolicy::hardened(Some(100_000));
1641
1642        // Need at least 2 scores
1643        policy.decide_join_admission(1000, &mut ledger);
1644        policy.decide_join_admission(1000, &mut ledger);
1645        guard.evaluate(&ledger.records()[0]);
1646        guard.evaluate(&ledger.records()[1]);
1647
1648        let q = guard.conformal_quantile();
1649        assert!(q.is_some());
1650    }
1651
1652    /// AG-09-T #3: Empty calibration window -> returns None.
1653    #[test]
1654    fn conformal_quantile_empty() {
1655        let guard = ConformalGuard::new(100, 0.1);
1656        assert!(guard.conformal_quantile().is_none());
1657        assert!(!guard.is_calibrated());
1658    }
1659
1660    /// AG-09-T #4: When conformal set is singleton (Bayesian in set),
1661    /// guard agrees with Bayesian argmin.
1662    #[test]
1663    fn conformal_guard_agrees_with_bayesian() {
1664        let mut guard = ConformalGuard::new(100, 0.1);
1665        let mut ledger = EvidenceLedger::new();
1666        let policy = RuntimePolicy::hardened(Some(100_000));
1667
1668        // Build calibration window with consistent decisions
1669        for _ in 0..20 {
1670            policy.decide_join_admission(1000, &mut ledger);
1671        }
1672
1673        let mut bayesian_agreed = 0;
1674        let mut total = 0;
1675
1676        for record in ledger.records() {
1677            let ps = guard.evaluate(record);
1678            total += 1;
1679            if ps.bayesian_action_in_set && ps.admissible_actions.len() == 1 {
1680                // Singleton set: only the Bayesian action is admissible
1681                assert_eq!(ps.admissible_actions[0], record.action);
1682                bayesian_agreed += 1;
1683            }
1684        }
1685
1686        // Most decisions with consistent data should agree
1687        assert!(total > 0, "should have evaluated at least one decision");
1688        // With uniform decisions, some will be in set
1689        assert!(
1690            bayesian_agreed > 0 || total < 3,
1691            "at least some decisions should agree with Bayesian"
1692        );
1693    }
1694
1695    /// AG-09-T #5: When conformal set widens (score > threshold),
1696    /// guard admits multiple actions.
1697    #[test]
1698    fn conformal_guard_widens_on_uncertainty() {
1699        let mut guard = ConformalGuard::new(10, 0.1);
1700        let mut ledger = EvidenceLedger::new();
1701        let policy = RuntimePolicy::hardened(Some(100_000));
1702
1703        // Build a tight calibration window with small-row decisions
1704        for _ in 0..10 {
1705            policy.decide_join_admission(100, &mut ledger);
1706        }
1707        for record in ledger.records() {
1708            guard.evaluate(record);
1709        }
1710
1711        // Now make a very different decision (large row estimate -> different posterior)
1712        let mut outlier_ledger = EvidenceLedger::new();
1713        let extreme_policy = RuntimePolicy::hardened(Some(10));
1714        extreme_policy.decide_join_admission(1_000_000, &mut outlier_ledger);
1715
1716        let ps = guard.evaluate(&outlier_ledger.records()[0]);
1717        // If the score exceeds threshold, the set should widen to 3 actions
1718        if !ps.bayesian_action_in_set {
1719            assert_eq!(
1720                ps.admissible_actions.len(),
1721                3,
1722                "widened set should admit all actions"
1723            );
1724        }
1725    }
1726
1727    /// AG-09-T #8: Coverage guarantee over 1000 exchangeable decisions.
1728    #[test]
1729    fn conformal_coverage_guarantee_1000_decisions() {
1730        let mut guard = ConformalGuard::new(1000, 0.1);
1731        let mut ledger = EvidenceLedger::new();
1732        let policy = RuntimePolicy::hardened(Some(100_000));
1733
1734        // Generate 1000 similar decisions (exchangeable)
1735        for i in 0..1000 {
1736            // Vary the row estimate slightly to create some variance
1737            let rows = 1000 + (i * 7) % 500;
1738            policy.decide_join_admission(rows, &mut ledger);
1739        }
1740
1741        for record in ledger.records() {
1742            guard.evaluate(record);
1743        }
1744
1745        // With exchangeable data, coverage should be >= 1 - alpha = 0.9
1746        // Allow some slack for finite sample effects
1747        let coverage = guard.empirical_coverage();
1748        assert!(
1749            coverage >= 0.7,
1750            "coverage {coverage} should be >= 0.7 (relaxed bound for finite sample)"
1751        );
1752    }
1753
1754    /// AG-09-T #10: Rolling window correctly drops oldest entries.
1755    #[test]
1756    fn conformal_rolling_window_exact_eviction() {
1757        let window_size = 5;
1758        let mut guard = ConformalGuard::new(window_size, 0.1);
1759        let mut ledger = EvidenceLedger::new();
1760        let policy = RuntimePolicy::hardened(Some(100_000));
1761
1762        // Generate more decisions than window size
1763        for _ in 0..15 {
1764            policy.decide_join_admission(1000, &mut ledger);
1765        }
1766
1767        for record in ledger.records() {
1768            guard.evaluate(record);
1769        }
1770
1771        assert_eq!(
1772            guard.calibration_count(),
1773            window_size,
1774            "window should be exactly {window_size}"
1775        );
1776    }
1777
1778    /// AG-09-T #12: decision_to_card produces a valid galaxy brain card
1779    /// with conformal-relevant information.
1780    #[test]
1781    fn conformal_galaxy_brain_card_content() {
1782        let mut ledger = EvidenceLedger::new();
1783        let policy = RuntimePolicy::hardened(Some(100_000));
1784        policy.decide_join_admission(50_000, &mut ledger);
1785
1786        let card = decision_to_card(&ledger.records()[0]);
1787        assert!(card.equation.contains("argmin_a"));
1788        assert!(card.substitution.contains("P(compatible|e)"));
1789        assert!(card.substitution.contains("E[allow]"));
1790        assert!(card.substitution.contains("E[reject]"));
1791        assert!(card.substitution.contains("E[repair]"));
1792    }
1793
1794    #[test]
1795    fn conformal_prediction_set_serializes() {
1796        let mut guard = ConformalGuard::new(100, 0.1);
1797        let mut ledger = EvidenceLedger::new();
1798        let policy = RuntimePolicy::hardened(Some(100_000));
1799        policy.decide_join_admission(1000, &mut ledger);
1800
1801        let ps = guard.evaluate(&ledger.records()[0]);
1802        let json = serde_json::to_string(&ps).expect("serialize");
1803        let _: serde_json::Value = serde_json::from_str(&json).expect("valid JSON");
1804        assert!(json.contains("quantile_threshold"));
1805        assert!(json.contains("empirical_coverage"));
1806    }
1807}