use chrono::{DateTime, Utc};
use cortex_core::{
compose_policy_outcomes, ArtifactLifecycleState, AuthorityFeedbackLoop, AxiomExecutionTrust,
ContextProofStateValue, ContextRedactionStatus, CortexContextTrust, ExecutionPolicyResult,
NamedQuarantineOutputs, PolicyContribution, PolicyDecision, PolicyOutcome, QuarantineOutput,
RepoTrustResult, TargetDomainValidationResult, TokenRevocationResult, TrustExchangeFieldError,
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AdmissionLifecycle {
CandidateOnly,
Validated,
Promoted,
Stale,
Quarantined,
Unknown,
}
impl AdmissionLifecycle {
#[must_use]
pub const fn is_candidate_only(self) -> bool {
matches!(self, Self::CandidateOnly)
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct AxiomTrustExchangeAdmissionRequest {
pub cortex_context_trust: Option<CortexContextTrust>,
pub axiom_execution_trust: AxiomExecutionTrust,
pub authority_feedback_loop: Option<AuthorityFeedbackLoop>,
pub lifecycle: AdmissionLifecycle,
pub now: DateTime<Utc>,
pub derived_from_quarantined: bool,
}
impl AxiomTrustExchangeAdmissionRequest {
#[must_use]
pub fn new(axiom_execution_trust: AxiomExecutionTrust, lifecycle: AdmissionLifecycle) -> Self {
Self {
cortex_context_trust: None,
axiom_execution_trust,
authority_feedback_loop: None,
lifecycle,
now: Utc::now(),
derived_from_quarantined: false,
}
}
#[must_use]
pub fn with_cortex_context_trust(mut self, ctx: CortexContextTrust) -> Self {
self.cortex_context_trust = Some(ctx);
self
}
#[must_use]
pub fn with_authority_feedback_loop(mut self, loop_record: AuthorityFeedbackLoop) -> Self {
self.authority_feedback_loop = Some(loop_record);
self
}
#[must_use]
pub const fn with_now(mut self, now: DateTime<Utc>) -> Self {
self.now = now;
self
}
#[must_use]
pub const fn with_derived_from_quarantined(mut self, derived: bool) -> Self {
self.derived_from_quarantined = derived;
self
}
#[must_use]
pub fn decide(&self) -> TrustExchangeAdmission {
let mut rejects: Vec<TrustExchangeFieldError> = Vec::new();
let mut quarantines: Vec<TrustExchangeFieldError> = Vec::new();
let mut named_quarantine_outputs = NamedQuarantineOutputs::default();
if !self.lifecycle.is_candidate_only() {
rejects.push(TrustExchangeFieldError::new(
"axiom.admission.lifecycle.must_be_candidate_only",
"Cortex admits pai-axiom trust exchange only as candidate_only",
));
}
if let Some(loop_record) = &self.authority_feedback_loop {
if loop_record.violates_same_loop_invariant() {
rejects.push(TrustExchangeFieldError::new(
"authority_feedback_loop.same_loop_promotion_must_be_false",
"same_loop_promotion_allowed must be false at the Cortex receiver",
));
}
if loop_record.claims_durable_authority() {
rejects.push(TrustExchangeFieldError::new(
"authority_feedback_loop.authority_claims.over_authorized",
"Cortex refuses durable_truth_promotion or full_execution_authority claims",
));
}
}
if let Err(errors) = self.axiom_execution_trust.validate() {
rejects.extend(errors);
}
if let Some(ctx) = &self.cortex_context_trust {
if let Err(errors) = ctx.validate() {
rejects.extend(errors);
}
}
if let Some(loop_record) = &self.authority_feedback_loop {
if let Err(errors) = loop_record.validate() {
rejects.extend(errors);
}
}
if let Some(ctx) = &self.cortex_context_trust {
if ctx.quarantine_state.propagates_quarantine() {
let invariant = "axiom.admission.quarantine.propagated".to_string();
let reason = format!(
"cortex_context_trust.quarantine_state == {:?}",
ctx.quarantine_state
);
quarantines.push(TrustExchangeFieldError::new(
invariant.clone(),
reason.clone(),
));
named_quarantine_outputs.source_context = Some(
QuarantineOutput::new(invariant, reason)
.with_source_ref("cortex_context_trust"),
);
}
if matches!(ctx.redaction_state.status, ContextRedactionStatus::Redacted)
&& ctx.redaction_state.blocks_critical_premise.unwrap_or(false)
{
quarantines.push(TrustExchangeFieldError::new(
"cortex_context_trust.redaction_state.blocks_critical_premise",
"redaction removed critical premise; treated as quarantine",
));
}
}
if self
.axiom_execution_trust
.token_scope
.revocation_result
.must_reject()
{
let invariant = match self.axiom_execution_trust.token_scope.revocation_result {
TokenRevocationResult::Revoked => "axiom_execution_trust.token_scope.revoked",
TokenRevocationResult::Inactive => "axiom_execution_trust.token_scope.inactive",
_ => "axiom_execution_trust.token_scope.invalid_state",
};
rejects.push(TrustExchangeFieldError::new(
invariant,
"capability token must be active for Cortex admission",
));
named_quarantine_outputs.token_revocation = Some(QuarantineOutput::new(
invariant,
"capability token must be active for Cortex admission",
));
}
if self.axiom_execution_trust.token_expired_at(self.now) {
rejects.push(TrustExchangeFieldError::new(
"axiom_execution_trust.token_scope.expired",
"capability token expires_at is in the past for the supplied now",
));
named_quarantine_outputs.token_revocation = Some(QuarantineOutput::new(
"axiom_execution_trust.token_scope.expired",
"capability token is expired",
));
}
if matches!(
self.axiom_execution_trust.repo_trust.result,
RepoTrustResult::Untrusted
) {
quarantines.push(TrustExchangeFieldError::new(
"axiom_execution_trust.repo_trust.untrusted",
"repo_trust.result == untrusted is propagated as quarantine",
));
named_quarantine_outputs.repo_trust = Some(QuarantineOutput::new(
"axiom_execution_trust.repo_trust.untrusted",
"repo trust untrusted",
));
}
if matches!(
self.axiom_execution_trust.policy_decision.result,
ExecutionPolicyResult::Deny
) {
rejects.push(TrustExchangeFieldError::new(
"axiom_execution_trust.policy_decision.deny",
"policy_decision.result == deny is a hard receiver refusal",
));
named_quarantine_outputs.policy_denial = Some(QuarantineOutput::new(
"axiom_execution_trust.policy_decision.deny",
"policy denied",
));
}
if let Some(loop_record) = &self.authority_feedback_loop {
if loop_record.target_domain_validation.required
&& !matches!(
loop_record.target_domain_validation.result,
TargetDomainValidationResult::Pass
)
{
quarantines.push(TrustExchangeFieldError::new(
"authority_feedback_loop.target_domain_validation.not_pass",
"target_domain_validation.result must be pass for clean admission",
));
named_quarantine_outputs.target_validation = Some(QuarantineOutput::new(
"authority_feedback_loop.target_domain_validation.not_pass",
"target domain validation not pass",
));
}
if loop_record
.returned_artifacts
.iter()
.any(|a| matches!(a.lifecycle_state, ArtifactLifecycleState::Quarantined))
{
quarantines.push(TrustExchangeFieldError::new(
"authority_feedback_loop.returned_artifacts.quarantined",
"at least one returned artifact lifecycle_state == quarantined",
));
named_quarantine_outputs.derived_artifact = Some(QuarantineOutput::new(
"authority_feedback_loop.returned_artifacts.quarantined",
"derived artifact quarantined",
));
}
if loop_record.quarantine_state.propagates_quarantine() {
quarantines.push(TrustExchangeFieldError::new(
"axiom.admission.quarantine.propagated",
"authority_feedback_loop.quarantine_state propagates",
));
named_quarantine_outputs.contradiction = Some(QuarantineOutput::new(
"axiom.admission.quarantine.propagated",
"feedback loop quarantine state",
));
}
}
if self.derived_from_quarantined {
quarantines.push(TrustExchangeFieldError::new(
"axiom.admission.quarantine.derived_from_quarantined",
"operator marked admission lineage as derived_from_quarantined",
));
named_quarantine_outputs.source_context = Some(QuarantineOutput::new(
"axiom.admission.quarantine.derived_from_quarantined",
"lineage trace indicates quarantined ancestor",
));
}
if let Some(ctx) = &self.cortex_context_trust {
if matches!(
ctx.proof_state.state,
ContextProofStateValue::Failed | ContextProofStateValue::Missing
) {
rejects.push(TrustExchangeFieldError::new(
"cortex_context_trust.proof_state.state.unusable",
"proof_state.state failed or missing is a hard refusal",
));
}
}
let forbidden_uses = forbidden_uses_for_candidate();
let policy_decision = compose_decision_outcomes(&rejects, &quarantines);
if !rejects.is_empty() {
TrustExchangeAdmission::Reject {
rejects,
quarantines,
named_quarantine_outputs,
policy_decision,
}
} else if !quarantines.is_empty() {
TrustExchangeAdmission::Quarantine {
quarantines,
named_quarantine_outputs,
policy_decision,
forbidden_uses,
}
} else {
TrustExchangeAdmission::AdmitCandidate {
forbidden_uses,
policy_decision,
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrustExchangeAdmission {
AdmitCandidate {
forbidden_uses: Vec<ForbiddenUse>,
policy_decision: PolicyDecision,
},
Quarantine {
quarantines: Vec<TrustExchangeFieldError>,
named_quarantine_outputs: NamedQuarantineOutputs,
policy_decision: PolicyDecision,
forbidden_uses: Vec<ForbiddenUse>,
},
Reject {
rejects: Vec<TrustExchangeFieldError>,
quarantines: Vec<TrustExchangeFieldError>,
named_quarantine_outputs: NamedQuarantineOutputs,
policy_decision: PolicyDecision,
},
}
impl TrustExchangeAdmission {
#[must_use]
pub const fn decision_name(&self) -> &'static str {
match self {
Self::AdmitCandidate { .. } => "admit_candidate",
Self::Quarantine { .. } => "quarantine",
Self::Reject { .. } => "reject",
}
}
#[must_use]
pub fn named_quarantine_outputs(&self) -> Option<&NamedQuarantineOutputs> {
match self {
Self::AdmitCandidate { .. } => None,
Self::Quarantine {
named_quarantine_outputs,
..
}
| Self::Reject {
named_quarantine_outputs,
..
} => Some(named_quarantine_outputs),
}
}
#[must_use]
pub fn policy_decision(&self) -> &PolicyDecision {
match self {
Self::AdmitCandidate {
policy_decision, ..
}
| Self::Quarantine {
policy_decision, ..
}
| Self::Reject {
policy_decision, ..
} => policy_decision,
}
}
#[must_use]
pub fn invariants(&self) -> Vec<&str> {
match self {
Self::AdmitCandidate { .. } => Vec::new(),
Self::Quarantine { quarantines, .. } => {
quarantines.iter().map(|e| e.invariant.as_str()).collect()
}
Self::Reject {
rejects,
quarantines,
..
} => rejects
.iter()
.chain(quarantines.iter())
.map(|e| e.invariant.as_str())
.collect(),
}
}
#[must_use]
pub fn forbidden_uses(&self) -> Option<&[ForbiddenUse]> {
match self {
Self::AdmitCandidate { forbidden_uses, .. }
| Self::Quarantine { forbidden_uses, .. } => Some(forbidden_uses),
Self::Reject { .. } => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ForbiddenUse {
DurablePromotion,
ReleaseAcceptance,
RuntimeAuthority,
CortexTruth,
TrustedHistory,
}
#[must_use]
pub fn forbidden_uses_for_candidate() -> Vec<ForbiddenUse> {
vec![
ForbiddenUse::DurablePromotion,
ForbiddenUse::ReleaseAcceptance,
ForbiddenUse::RuntimeAuthority,
ForbiddenUse::CortexTruth,
ForbiddenUse::TrustedHistory,
]
}
fn compose_decision_outcomes(
rejects: &[TrustExchangeFieldError],
quarantines: &[TrustExchangeFieldError],
) -> PolicyDecision {
let mut contributions: Vec<PolicyContribution> = Vec::new();
for err in rejects {
contributions.push(
PolicyContribution::new(
stable_policy_rule_id(&err.invariant),
PolicyOutcome::Reject,
err.reason.clone(),
)
.expect("stable invariant policy contribution is well-formed"),
);
}
for err in quarantines {
contributions.push(
PolicyContribution::new(
stable_policy_rule_id(&err.invariant),
PolicyOutcome::Quarantine,
err.reason.clone(),
)
.expect("stable invariant policy contribution is well-formed"),
);
}
if contributions.is_empty() {
contributions.push(
PolicyContribution::new(
"axiom.admission.trust_exchange.allow_candidate",
PolicyOutcome::Allow,
"pai-axiom trust exchange admitted as Cortex candidate only",
)
.expect("static policy contribution shape is valid"),
);
}
compose_policy_outcomes(contributions, None)
}
fn stable_policy_rule_id(invariant: &str) -> String {
format!("axiom.admission.trust_exchange.{invariant}")
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use cortex_core::{
parse_axiom_execution_trust, parse_cortex_context_trust, AmplificationRisk,
ArtifactLifecycleState, AuthorityClaimStatus, ConfidenceCeiling, ContextQuarantineState,
ExecutionPolicyResult, FeedbackAuthorityClaims, FeedbackAxiomAction,
FeedbackInitiatingContext, FeedbackReturnedArtifact, RepoTrustResult, ReproducibilityLevel,
TargetDomainValidation, TargetDomainValidationResult, AUTHORITY_FEEDBACK_LOOP_SCHEMA,
};
const VALID_CTX: &str =
include_str!("../../cortex-core/tests/fixtures/pai-axiom/valid-cortex-context-trust.json");
const VALID_EXEC: &str =
include_str!("../../cortex-core/tests/fixtures/pai-axiom/valid-axiom-execution-trust.json");
fn valid_loop_record() -> AuthorityFeedbackLoop {
AuthorityFeedbackLoop {
schema: AUTHORITY_FEEDBACK_LOOP_SCHEMA.to_string(),
version: 1,
authority_feedback_loop_ref: Some("loop_ref".to_string()),
loop_id: "loop_valid".to_string(),
started_at: Utc.with_ymd_and_hms(2026, 5, 4, 18, 0, 0).unwrap(),
initiating_context: FeedbackInitiatingContext {
context_id: "ctx_valid".to_string(),
cortex_context_trust_ref: "ref://ctx".to_string(),
},
axiom_action: FeedbackAxiomAction {
action_id: "action_valid".to_string(),
axiom_execution_trust_ref: "ref://exec".to_string(),
},
returned_artifacts: vec![FeedbackReturnedArtifact {
artifact_id: "art_valid".to_string(),
lineage_ref: "lin_valid".to_string(),
lifecycle_state: ArtifactLifecycleState::Candidate,
reproducibility_level: ReproducibilityLevel::Observational,
}],
amplification_risk: AmplificationRisk::Low,
independent_evidence_refs: vec!["evi://ind".to_string()],
external_grounding_refs: vec!["gnd://ext".to_string()],
contradiction_scan_ref: "scan_ref".to_string(),
quarantine_state: ContextQuarantineState::Clear,
confidence_ceiling: ConfidenceCeiling::Advisory,
same_loop_promotion_allowed: false,
authority_claims: FeedbackAuthorityClaims {
durable_truth_promotion: AuthorityClaimStatus::Denied,
full_execution_authority: AuthorityClaimStatus::Denied,
review_required: true,
},
target_domain_validation: TargetDomainValidation {
required: true,
independent_validation_ref: Some("validation_ref".to_string()),
result: TargetDomainValidationResult::Pass,
},
residual_risk: vec![],
}
}
fn fixed_now() -> DateTime<Utc> {
Utc.with_ymd_and_hms(2026, 5, 12, 0, 0, 0).unwrap()
}
fn valid_request() -> AxiomTrustExchangeAdmissionRequest {
let exec = parse_axiom_execution_trust(VALID_EXEC).unwrap();
let ctx = parse_cortex_context_trust(VALID_CTX).unwrap();
AxiomTrustExchangeAdmissionRequest::new(exec, AdmissionLifecycle::CandidateOnly)
.with_cortex_context_trust(ctx)
.with_authority_feedback_loop(valid_loop_record())
.with_now(fixed_now())
}
#[test]
fn valid_request_admits_candidate_with_forbidden_uses() {
let decision = valid_request().decide();
assert_eq!(decision.decision_name(), "admit_candidate");
let forbidden = decision
.forbidden_uses()
.expect("candidate has forbidden_uses");
assert!(forbidden.contains(&ForbiddenUse::DurablePromotion));
assert!(forbidden.contains(&ForbiddenUse::ReleaseAcceptance));
assert!(forbidden.contains(&ForbiddenUse::RuntimeAuthority));
assert!(forbidden.contains(&ForbiddenUse::CortexTruth));
assert!(forbidden.contains(&ForbiddenUse::TrustedHistory));
assert_eq!(
decision.policy_decision().final_outcome,
PolicyOutcome::Allow
);
}
#[test]
fn lifecycle_not_candidate_only_rejects() {
let exec = parse_axiom_execution_trust(VALID_EXEC).unwrap();
let req = AxiomTrustExchangeAdmissionRequest::new(exec, AdmissionLifecycle::Validated)
.with_now(fixed_now());
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"axiom.admission.lifecycle.must_be_candidate_only"));
}
#[test]
fn same_loop_promotion_true_rejects_structurally() {
let mut req = valid_request();
if let Some(loop_record) = req.authority_feedback_loop.as_mut() {
loop_record.same_loop_promotion_allowed = true;
}
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"authority_feedback_loop.same_loop_promotion_must_be_false"));
}
#[test]
fn over_authorized_durable_truth_rejects() {
let mut req = valid_request();
if let Some(loop_record) = req.authority_feedback_loop.as_mut() {
loop_record.authority_claims.durable_truth_promotion =
AuthorityClaimStatus::EligibleAfterIndependentValidation;
}
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"authority_feedback_loop.authority_claims.over_authorized"));
}
#[test]
fn over_authorized_full_execution_authority_rejects() {
let mut req = valid_request();
if let Some(loop_record) = req.authority_feedback_loop.as_mut() {
loop_record.authority_claims.full_execution_authority =
AuthorityClaimStatus::EligibleAfterIndependentValidation;
}
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"authority_feedback_loop.authority_claims.over_authorized"));
}
#[test]
fn quarantine_state_propagated_quarantines() {
let mut req = valid_request();
req.cortex_context_trust
.as_mut()
.expect("ctx present")
.quarantine_state = ContextQuarantineState::Quarantined;
let decision = req.decide();
assert_eq!(decision.decision_name(), "quarantine");
assert!(decision
.invariants()
.contains(&"axiom.admission.quarantine.propagated"));
let outputs = decision.named_quarantine_outputs().unwrap();
assert!(outputs.source_context.is_some());
assert!(decision
.forbidden_uses()
.unwrap()
.contains(&ForbiddenUse::DurablePromotion));
}
#[test]
fn derived_from_quarantined_quarantines() {
let req = valid_request().with_derived_from_quarantined(true);
let decision = req.decide();
assert_eq!(decision.decision_name(), "quarantine");
assert!(decision
.invariants()
.contains(&"axiom.admission.quarantine.derived_from_quarantined"));
}
#[test]
fn target_domain_validation_not_pass_quarantines() {
let mut req = valid_request();
if let Some(loop_record) = req.authority_feedback_loop.as_mut() {
loop_record.target_domain_validation.result = TargetDomainValidationResult::Fail;
}
let decision = req.decide();
assert_eq!(decision.decision_name(), "quarantine");
assert!(decision
.invariants()
.contains(&"authority_feedback_loop.target_domain_validation.not_pass"));
}
#[test]
fn expired_token_rejects() {
let mut req = valid_request();
req.axiom_execution_trust.token_scope.expires_at =
Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"axiom_execution_trust.token_scope.expired"));
}
#[test]
fn revoked_token_rejects() {
let mut req = valid_request();
req.axiom_execution_trust.token_scope.revocation_result = TokenRevocationResult::Revoked;
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"axiom_execution_trust.token_scope.revoked"));
}
#[test]
fn inactive_token_rejects() {
let mut req = valid_request();
req.axiom_execution_trust.token_scope.revocation_result = TokenRevocationResult::Inactive;
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"axiom_execution_trust.token_scope.inactive"));
}
#[test]
fn untrusted_repo_quarantines() {
let mut req = valid_request();
req.axiom_execution_trust.repo_trust.result = RepoTrustResult::Untrusted;
let decision = req.decide();
assert_eq!(decision.decision_name(), "quarantine");
assert!(decision
.invariants()
.contains(&"axiom_execution_trust.repo_trust.untrusted"));
}
#[test]
fn deny_policy_rejects() {
let mut req = valid_request();
req.axiom_execution_trust.policy_decision.result = ExecutionPolicyResult::Deny;
let decision = req.decide();
assert_eq!(decision.decision_name(), "reject");
assert!(decision
.invariants()
.contains(&"axiom_execution_trust.policy_decision.deny"));
}
}