use cortex_core::{
compose_policy_outcomes, evaluate_semantic_trust, BoundaryQuarantineState,
CapabilityTokenDecision, FailingEdge, PaiAxiomExecutionReceiptV1, PolicyContribution,
PolicyDecision, PolicyOutcome, ProofClosureReport, ProofEdge, ProofEdgeFailure, ProofEdgeKind,
ProofState as CoreProofState, ProvenanceClass, RuntimeIntegrityState, SemanticTrustInput,
SemanticTrustReport, SemanticUse,
};
use serde::{Deserialize, Serialize};
/// Invariant identifier that prefixes the `Display` output of `DurableAdmissionRefusal`.
pub const AXIOM_ADMISSION_PROOF_CLOSURE_INVARIANT: &str =
"cortex_memory.admission.axiom.proof_closure";
/// Policy rule id used for the contribution built by
/// `AxiomMemoryAdmissionRequest::proof_closure_contribution`.
pub const AXIOM_ADMISSION_PROOF_CLOSURE_RULE_ID: &str = "memory.admission.axiom.proof_closure";
/// Result of running the admission gates; the error side lists every gate that failed.
pub type AdmissionValidationResult<T> = Result<T, Vec<AdmissionRejectionReason>>;
/// Result of parsing an admission envelope.
pub type AdmissionEnvelopeResult<T> = Result<T, AdmissionEnvelopeError>;
/// Admission request for an AXIOM-derived memory submission.
///
/// The type is (de)serializable with serde; each field feeds exactly one gate in
/// `AxiomMemoryAdmissionRequest::validate`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AxiomMemoryAdmissionRequest {
/// Requested lifecycle state; only `CandidateState::Candidate` passes validation.
pub candidate_state: CandidateState,
/// Evidence classification; `EvidenceClass::Unknown` fails validation.
pub evidence_class: EvidenceClass,
/// AXIOM phase the submission belongs to; `PhaseContext::Unknown` fails validation.
pub phase_context: PhaseContext,
/// Tool identity and import class of the submission.
pub tool_provenance: ToolProvenance,
/// Source references; validation requires at least one and no blank reference.
pub source_anchors: Vec<SourceAnchor>,
/// Redaction level of the submitted content.
pub redaction_status: RedactionStatus,
/// Proof state declared by the envelope.
pub proof_state: ProofState,
/// Outcome of the contradiction scan; only `ScannedClean` passes validation.
pub contradiction_scan: ContradictionScan,
/// Must be `true` for the submission to pass validation.
pub explicit_non_promotion: bool,
}
impl AxiomMemoryAdmissionRequest {
/// Parses a request from a JSON envelope string.
///
/// Parsing only checks the shape of the envelope; the admission gates are
/// not run here.
///
/// # Errors
/// Returns `AdmissionEnvelopeError::InvalidEnvelope` carrying the
/// deserializer's message when `input` does not deserialize into `Self`.
pub fn from_json_envelope(input: &str) -> AdmissionEnvelopeResult<Self> {
    match serde_json::from_str::<Self>(input) {
        Ok(request) => Ok(request),
        Err(err) => Err(AdmissionEnvelopeError::InvalidEnvelope {
            message: err.to_string(),
        }),
    }
}
/// Runs every admission gate and reports all failures together.
///
/// Gates run in a fixed order (candidate state, evidence class, tool
/// provenance / import class, source anchors, redaction status, proof
/// state, contradiction scan, explicit non-promotion, phase context) and
/// the returned reasons keep that order. No gate short-circuits another.
///
/// # Errors
/// Returns the reason of every gate that failed.
pub fn validate(&self) -> AdmissionValidationResult<()> {
    let gate_results = [
        require_candidate_state(self.candidate_state),
        require_admissible_evidence(self.evidence_class),
        require_axiom_origin_is_not_product_spec(&self.tool_provenance),
        require_source_anchors(&self.source_anchors),
        require_redaction_status(self.redaction_status),
        require_usable_proof_state(self.proof_state),
        require_contradiction_scan(&self.contradiction_scan),
        require_explicit_non_promotion(self.explicit_non_promotion),
        require_phase_context(self.phase_context),
    ];

    let mut failures = Vec::new();
    for gate_result in gate_results {
        push_err(&mut failures, gate_result);
    }

    if !failures.is_empty() {
        return Err(failures);
    }
    Ok(())
}
/// Turns the validation result into an admission decision.
///
/// A request with no failed gate is admitted as a candidate. Otherwise the
/// request is rejected when at least one reason requires rejection, and
/// quarantined when none does. The full reason list is carried either way.
#[must_use]
pub fn admission_decision(&self) -> AdmissionDecision {
    let reasons = match self.validate() {
        Ok(()) => return AdmissionDecision::AdmitCandidate,
        Err(reasons) => reasons,
    };

    let must_reject = reasons
        .iter()
        .any(|reason| reason.requires_rejection());
    if must_reject {
        AdmissionDecision::Reject { reasons }
    } else {
        AdmissionDecision::Quarantine { reasons }
    }
}
#[must_use]
pub fn policy_decision(&self) -> PolicyDecision {
match self.admission_decision() {
AdmissionDecision::AdmitCandidate => compose_policy_outcomes(
vec![PolicyContribution::new(
"memory.admission.allow",
PolicyOutcome::Allow,
"AXIOM memory submission passed admission gates",
)
.expect("static policy contribution is valid")],
None,
),
AdmissionDecision::Reject { reasons } | AdmissionDecision::Quarantine { reasons } => {
let contributions = reasons
.into_iter()
.map(|reason| {
PolicyContribution::new(
reason.policy_rule_id(),
reason.policy_outcome(),
reason.policy_reason(),
)
.expect("static policy contribution is valid")
})
.collect();
compose_policy_outcomes(contributions, None)
}
}
}
#[must_use]
pub fn proof_closure_report(&self) -> ProofClosureReport {
let axis_anchor = "axiom.admission";
let axis_target = "proof_closure";
match self.proof_state {
ProofState::FullChainVerified => {
ProofClosureReport::full_chain_verified(vec![ProofEdge::new(
ProofEdgeKind::LineageClosure,
axis_anchor,
axis_target,
)
.with_evidence_ref("envelope.proof_state=full_chain_verified")])
}
ProofState::Partial => ProofClosureReport::from_edges(
Vec::new(),
vec![FailingEdge::missing(
ProofEdgeKind::LineageClosure,
axis_anchor,
"envelope proof_state is Partial; ADR 0036 forbids durable promotion",
)],
),
ProofState::Broken => ProofClosureReport::from_edges(
Vec::new(),
vec![FailingEdge::broken(
ProofEdgeKind::HashChain,
axis_anchor,
axis_target,
ProofEdgeFailure::Mismatch,
"envelope proof_state is Broken; ADR 0036 fails closed",
)],
),
ProofState::Unknown => ProofClosureReport::from_edges(
Vec::new(),
vec![FailingEdge::unresolved(
ProofEdgeKind::LineageClosure,
axis_anchor,
"envelope proof_state is Unknown; ADR 0036 forbids durable promotion",
)],
),
}
}
/// Maps the proof closure report's state onto a policy contribution.
///
/// `FullChainVerified` allows, `Partial` quarantines and `Broken` rejects;
/// the contribution is filed under `AXIOM_ADMISSION_PROOF_CLOSURE_RULE_ID`.
///
/// # Panics
/// Panics if `PolicyContribution::new` refuses the static inputs.
#[must_use]
pub fn proof_closure_contribution(&self) -> PolicyContribution {
    let closure_state = self.proof_closure_report().state();

    let outcome = match closure_state {
        CoreProofState::FullChainVerified => PolicyOutcome::Allow,
        CoreProofState::Partial => PolicyOutcome::Quarantine,
        CoreProofState::Broken => PolicyOutcome::Reject,
    };
    let reason: &'static str = match closure_state {
        CoreProofState::FullChainVerified => {
            "AXIOM admission envelope proof closure is fully verified"
        }
        CoreProofState::Partial => {
            "AXIOM admission envelope proof closure is partial; ADR 0036 forbids durable promotion"
        }
        CoreProofState::Broken => {
            "AXIOM admission envelope proof closure is broken; ADR 0036 fails closed"
        }
    };

    PolicyContribution::new(AXIOM_ADMISSION_PROOF_CLOSURE_RULE_ID, outcome, reason)
        .expect("static admission proof closure contribution is valid")
}
/// Gate for durable admission: passes only when the proof closure report
/// is fully verified.
///
/// # Errors
/// Returns a `DurableAdmissionRefusal` carrying the report's observed
/// state when the report is not fully verified.
pub fn require_durable_admission_allowed(&self) -> Result<(), DurableAdmissionRefusal> {
    let closure = self.proof_closure_report();
    if !closure.is_full_chain_verified() {
        return Err(DurableAdmissionRefusal {
            proof_state: closure.state(),
        });
    }
    Ok(())
}
#[must_use]
pub fn semantic_trust_report(
&self,
input: AdmissionSemanticTrustInput,
) -> AdmissionSemanticTrustReport {
let provenance_class = self.semantic_provenance_class();
let unresolved_unknowns =
input.unresolved_semantic_unknowns || self.has_semantic_unknowns();
let semantic_input = SemanticTrustInput::new(input.intended_use)
.with_provenance([provenance_class])
.with_independent_source_families(input.independent_source_families)
.with_falsification_evidence(input.falsification_evidence)
.with_unresolved_unknowns(unresolved_unknowns);
AdmissionSemanticTrustReport {
intended_use: input.intended_use,
provenance_class,
semantic_trust: evaluate_semantic_trust(&semantic_input),
admission_decision: self.admission_decision(),
explicit_non_promotion: self.explicit_non_promotion,
}
}
/// Maps the request's evidence class onto a semantic `ProvenanceClass`.
///
/// `Observed` evidence is refined further from the phase context and tool
/// provenance; every other class maps to one fixed provenance class.
#[must_use]
pub fn semantic_provenance_class(&self) -> ProvenanceClass {
    match self.evidence_class {
        EvidenceClass::Observed => self.observed_semantic_provenance_class(),
        EvidenceClass::Inferred => ProvenanceClass::SummaryDerived,
        EvidenceClass::Claimed => ProvenanceClass::ExternalClaimed,
        EvidenceClass::Simulated => ProvenanceClass::SimulatedOrHypothetical,
        EvidenceClass::Unknown => ProvenanceClass::UnknownProvenance,
    }
}
/// Picks the provenance class for `Observed` evidence.
///
/// * A `WorkRecord` phase imported as `AgentProcedure` is always
///   `RuntimeDerived`, whatever the tool identifiers say.
/// * Otherwise the evidence is `ToolObserved` when both the tool name and
///   the invocation id are present. This covers `OperatorProtocol` imports
///   too, which get no separate class.
/// * Anything else is `RuntimeDerived`.
///
/// Identifiers are trimmed before the emptiness check, so a whitespace-only
/// value counts as missing. That matches how
/// `require_axiom_origin_is_not_product_spec` treats blank provenance;
/// without the trim a request could fail the provenance gate and still be
/// classified as tool-observed.
fn observed_semantic_provenance_class(&self) -> ProvenanceClass {
    let provenance = &self.tool_provenance;
    let tool_identified = !provenance.tool_name.trim().is_empty()
        && !provenance.invocation_id.trim().is_empty();

    match (self.phase_context, provenance.import_class, tool_identified) {
        (PhaseContext::WorkRecord, AxiomImportClass::AgentProcedure, _) => {
            ProvenanceClass::RuntimeDerived
        }
        (_, _, true) => ProvenanceClass::ToolObserved,
        (_, _, false) => ProvenanceClass::RuntimeDerived,
    }
}
/// Reports whether the request itself leaves something semantically
/// unresolved: unknown evidence class or phase, unknown or broken proof
/// state, an unfinished contradiction scan, or missing / blank source
/// anchors.
fn has_semantic_unknowns(&self) -> bool {
    let unclassified = self.evidence_class == EvidenceClass::Unknown
        || self.phase_context == PhaseContext::Unknown;

    let proof_unusable = match self.proof_state {
        ProofState::Unknown | ProofState::Broken => true,
        ProofState::FullChainVerified | ProofState::Partial => false,
    };

    let scan_unfinished = match self.contradiction_scan {
        ContradictionScan::Incomplete | ContradictionScan::NotScanned => true,
        ContradictionScan::ScannedClean | ContradictionScan::OpenContradictions(_) => false,
    };

    let anchors = &self.source_anchors;
    let anchors_unusable = anchors.is_empty()
        || anchors
            .iter()
            .any(|anchor| anchor.reference.trim().is_empty());

    unclassified || proof_unusable || scan_unfinished || anchors_unusable
}
}
/// Caller-supplied inputs for `AxiomMemoryAdmissionRequest::semantic_trust_report`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AdmissionSemanticTrustInput {
/// Semantic use the memory is being evaluated for.
pub intended_use: SemanticUse,
/// Count of independent source families, forwarded to the semantic trust input.
pub independent_source_families: u16,
/// Whether falsification evidence is present, forwarded to the semantic trust input.
pub falsification_evidence: bool,
/// Caller-declared unresolved unknowns; OR-ed with the request's own unknowns.
pub unresolved_semantic_unknowns: bool,
}
impl AdmissionSemanticTrustInput {
    /// Starts an input for `intended_use` with conservative defaults: no
    /// independent source families, no falsification evidence, and
    /// unresolved unknowns assumed present.
    #[must_use]
    pub const fn new(intended_use: SemanticUse) -> Self {
        Self {
            intended_use,
            independent_source_families: 0,
            falsification_evidence: false,
            unresolved_semantic_unknowns: true,
        }
    }

    /// Returns a copy with the independent source family count set to `count`.
    #[must_use]
    pub const fn with_independent_source_families(self, count: u16) -> Self {
        Self {
            independent_source_families: count,
            ..self
        }
    }

    /// Returns a copy with the falsification evidence flag set to `present`.
    #[must_use]
    pub const fn with_falsification_evidence(self, present: bool) -> Self {
        Self {
            falsification_evidence: present,
            ..self
        }
    }

    /// Returns a copy with the unresolved unknowns flag set to `present`.
    #[must_use]
    pub const fn with_unresolved_semantic_unknowns(self, present: bool) -> Self {
        Self {
            unresolved_semantic_unknowns: present,
            ..self
        }
    }
}
/// Output of `AxiomMemoryAdmissionRequest::semantic_trust_report`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdmissionSemanticTrustReport {
/// Semantic use that was evaluated, copied from the input.
pub intended_use: SemanticUse,
/// Provenance class derived from the request.
pub provenance_class: ProvenanceClass,
/// Result of `evaluate_semantic_trust` for the assembled input.
pub semantic_trust: SemanticTrustReport,
/// Admission decision of the same request, computed independently of `semantic_trust`.
pub admission_decision: AdmissionDecision,
/// Copy of the request's `explicit_non_promotion` flag.
pub explicit_non_promotion: bool,
}
/// Error returned when an admission envelope cannot be parsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmissionEnvelopeError {
/// The envelope did not deserialize into an admission request.
InvalidEnvelope {
/// Text of the underlying deserialization error.
message: String,
},
}
/// Error returned by `AxiomMemoryAdmissionRequest::require_durable_admission_allowed`
/// when the proof closure report is not fully verified.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurableAdmissionRefusal {
/// Proof closure state observed on the report at refusal time.
pub proof_state: CoreProofState,
}
/// Formats the refusal as a single line: the invariant id first, then the
/// refusal text and the observed proof state in `Debug` form.
impl std::fmt::Display for DurableAdmissionRefusal {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { proof_state } = self;
        f.write_fmt(format_args!(
            "invariant={AXIOM_ADMISSION_PROOF_CLOSURE_INVARIANT} AXIOM admission durable gate refused: proof closure must be FullChainVerified; observed {proof_state:?}"
        ))
    }
}

impl std::error::Error for DurableAdmissionRefusal {}
/// Converts an execution receipt into a candidate-only admission request.
///
/// * Tool identity comes from the first recorded tool invocation. With no
///   invocation the tool name falls back to `"pai-axiom"` and the
///   invocation id to the receipt's runtime id.
/// * Evidence class and proof state are derived from the receipt by
///   `receipt_evidence_class` and `receipt_proof_state`.
/// * `explicit_non_promotion` is kept only when the receipt is not
///   contaminated.
/// * Every source anchor is imported with kind `Artifact`.
/// * The contradiction scan always starts as `Incomplete`, so a converted
///   receipt does not pass validation until a scan result is supplied.
impl From<PaiAxiomExecutionReceiptV1> for AxiomMemoryAdmissionRequest {
    fn from(receipt: PaiAxiomExecutionReceiptV1) -> Self {
        let (tool_name, invocation_id) = match receipt.tool_provenance.first() {
            Some(tool) => (tool.tool_name.clone(), tool.invocation_id.clone()),
            None => ("pai-axiom".to_string(), receipt.runtime_id.clone()),
        };
        let tool_provenance =
            ToolProvenance::new(tool_name, invocation_id, AxiomImportClass::AgentProcedure);

        let evidence_class = receipt_evidence_class(&receipt);
        let proof_state = receipt_proof_state(&receipt);

        let contaminated = receipt.quarantine_state == BoundaryQuarantineState::Contaminated;
        let explicit_non_promotion = receipt.explicit_non_promotion && !contaminated;

        // Consumes the receipt's anchors; all borrows of `receipt` end above.
        let mut source_anchors = Vec::new();
        for anchor in receipt.source_anchors {
            source_anchors.push(SourceAnchor::new(
                anchor.reference,
                SourceAnchorKind::Artifact,
            ));
        }

        Self {
            candidate_state: CandidateState::Candidate,
            evidence_class,
            phase_context: PhaseContext::WorkRecord,
            tool_provenance,
            source_anchors,
            redaction_status: RedactionStatus::Abstracted,
            proof_state,
            contradiction_scan: ContradictionScan::Incomplete,
            explicit_non_promotion,
        }
    }
}
/// Derives the evidence class of a receipt.
///
/// A receipt with no tool invocation, or whose capability token was
/// rejected, expired or revoked, is only `Claimed`; otherwise `Observed`.
fn receipt_evidence_class(receipt: &PaiAxiomExecutionReceiptV1) -> EvidenceClass {
    if receipt.tool_provenance.is_empty() {
        return EvidenceClass::Claimed;
    }

    let token_refused = matches!(
        receipt.capability_token_state.decision,
        CapabilityTokenDecision::Rejected
            | CapabilityTokenDecision::Expired
            | CapabilityTokenDecision::Revoked
    );
    if token_refused {
        EvidenceClass::Claimed
    } else {
        EvidenceClass::Observed
    }
}
/// Derives the proof state of a receipt. Never yields `FullChainVerified`.
///
/// * `Broken` when the receipt is contaminated, the capability token was
///   rejected, expired or revoked, or the runtime is compromised.
/// * `Partial` when the runtime is a verified release or has verified
///   provenance and the token decision is allowed or warned.
/// * `Unknown` otherwise.
fn receipt_proof_state(receipt: &PaiAxiomExecutionReceiptV1) -> ProofState {
    let decision = &receipt.capability_token_state.decision;
    let integrity = &receipt.execution_trust_state.runtime_integrity;

    let contaminated = matches!(
        receipt.quarantine_state,
        BoundaryQuarantineState::Contaminated
    );
    let token_refused = matches!(
        decision,
        CapabilityTokenDecision::Rejected
            | CapabilityTokenDecision::Expired
            | CapabilityTokenDecision::Revoked
    );
    let runtime_compromised = matches!(integrity, RuntimeIntegrityState::Compromised);
    if contaminated || token_refused || runtime_compromised {
        return ProofState::Broken;
    }

    let runtime_verified = matches!(
        integrity,
        RuntimeIntegrityState::VerifiedRelease | RuntimeIntegrityState::VerifiedProvenance
    );
    let token_usable = matches!(
        decision,
        CapabilityTokenDecision::Allowed | CapabilityTokenDecision::Warned
    );
    if runtime_verified && token_usable {
        ProofState::Partial
    } else {
        ProofState::Unknown
    }
}
/// Lifecycle state requested for the submitted memory.
///
/// Serialized in `snake_case`. Only `Candidate` passes `require_candidate_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CandidateState {
/// The only state accepted by the admission gate.
Candidate,
Active,
Principle,
Doctrine,
Unknown,
}
/// Classification of the evidence behind a submission.
///
/// Serialized in `snake_case`. `Unknown` fails `require_admissible_evidence`; every
/// class is mapped to a `ProvenanceClass` by
/// `AxiomMemoryAdmissionRequest::semantic_provenance_class`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EvidenceClass {
Observed,
Inferred,
Claimed,
Simulated,
/// Not classified; fails the evidence gate.
Unknown,
}
/// AXIOM phase a submission is attributed to.
///
/// Serialized in `snake_case` (for example `WorkRecord` becomes `work_record`).
/// `Unknown` fails `require_phase_context`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PhaseContext {
Model,
Act,
Check,
/// Phase assigned to requests converted from an execution receipt.
WorkRecord,
/// Phase not known; fails the phase gate.
Unknown,
}
/// Class under which AXIOM material is imported.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AxiomImportClass {
OperatorProtocol,
/// Class assigned to requests converted from an execution receipt.
AgentProcedure,
/// Refused by `require_axiom_origin_is_not_product_spec`.
ProductSpecification,
TestEvalInput,
HistoricalReference,
}
/// Identity of the tool invocation behind a submission, plus its import class.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToolProvenance {
/// Tool name; a blank (empty or whitespace-only) value fails the provenance gate.
pub tool_name: String,
/// Invocation identifier; a blank value fails the provenance gate.
pub invocation_id: String,
/// Import class of the material.
pub import_class: AxiomImportClass,
}
impl ToolProvenance {
    /// Builds a provenance record, converting both identifiers into owned
    /// strings. No validation is performed here.
    #[must_use]
    pub fn new(
        tool_name: impl Into<String>,
        invocation_id: impl Into<String>,
        import_class: AxiomImportClass,
    ) -> Self {
        let tool_name: String = tool_name.into();
        let invocation_id: String = invocation_id.into();
        Self {
            tool_name,
            invocation_id,
            import_class,
        }
    }
}
/// Reference to a source that backs a submission.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceAnchor {
/// Reference text; a blank (empty or whitespace-only) value fails the anchor gate.
pub reference: String,
/// Kind of source the reference points at.
pub kind: SourceAnchorKind,
}
impl SourceAnchor {
    /// Builds an anchor, converting `reference` into an owned string. No
    /// validation is performed here.
    #[must_use]
    pub fn new(reference: impl Into<String>, kind: SourceAnchorKind) -> Self {
        let reference: String = reference.into();
        Self { reference, kind }
    }
}
/// Kind of source a `SourceAnchor` points at. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceAnchorKind {
Event,
Trace,
Episode,
Audit,
/// Kind assigned to every anchor imported from an execution receipt.
Artifact,
}
/// Redaction level of submitted content. Serialized in `snake_case`.
///
/// `Redacted`, `Abstracted` and `OperatorRawOptIn` pass `require_redaction_status`;
/// `RawUnredacted` and `Unknown` fail it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RedactionStatus {
Redacted,
Abstracted,
OperatorRawOptIn,
RawUnredacted,
Unknown,
}
/// Proof state declared by an admission envelope. Serialized in `snake_case`.
///
/// `FullChainVerified` and `Partial` pass `require_usable_proof_state`; `Broken` and
/// `Unknown` fail it. `FullChainVerified` is the only state for which
/// `AxiomMemoryAdmissionRequest::proof_closure_report` builds a fully verified report.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProofState {
FullChainVerified,
Partial,
Broken,
Unknown,
}
/// Outcome of the contradiction scan. Serialized in `snake_case`.
///
/// Only `ScannedClean` passes `require_contradiction_scan`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContradictionScan {
/// Scan finished with nothing open.
ScannedClean,
/// Scan found open contradictions; the payload presumably holds their identifiers
/// (it is not inspected in this module — TODO confirm with the producer).
OpenContradictions(Vec<String>),
/// Scan did not finish; initial value for requests converted from a receipt.
Incomplete,
/// No scan was run.
NotScanned,
}
/// Decision produced by `AxiomMemoryAdmissionRequest::admission_decision`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmissionDecision {
/// Every admission gate passed.
AdmitCandidate,
/// At least one failed gate requires rejection; `reasons` lists all failed gates.
Reject {
reasons: Vec<AdmissionRejectionReason>,
},
/// Some gates failed but none requires rejection; `reasons` lists all failed gates.
Quarantine {
reasons: Vec<AdmissionRejectionReason>,
},
}
/// Reason a single admission gate failed.
///
/// `CandidateStateRequired`, `ProductSpecificationImportRejected`,
/// `RedactionStatusRequired`, `ProofStateRequired` and `ExplicitNonPromotionRequired`
/// force a rejection; every other reason leads to quarantine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionRejectionReason {
/// Requested state is not `CandidateState::Candidate`.
CandidateStateRequired,
/// Evidence class is `EvidenceClass::Unknown`.
EvidenceClassRequired,
/// Tool name or invocation id is blank.
ToolProvenanceRequired,
/// Import class is `AxiomImportClass::ProductSpecification`.
ProductSpecificationImportRejected,
/// No source anchor was supplied.
SourceAnchorRequired,
/// At least one source anchor has a blank reference.
SourceAnchorBlank,
/// Redaction status is `RawUnredacted` or `Unknown`.
RedactionStatusRequired,
/// Proof state is `Broken` or `Unknown`.
ProofStateRequired,
/// Contradiction scan is `Incomplete` or `NotScanned`.
ContradictionScanRequired,
/// Contradiction scan reported open contradictions.
OpenContradiction,
/// `explicit_non_promotion` is `false`.
ExplicitNonPromotionRequired,
/// Phase context is `PhaseContext::Unknown`.
PhaseContextRequired,
}
impl AdmissionRejectionReason {
    /// Whether this reason forces a rejection rather than a quarantine.
    const fn requires_rejection(&self) -> bool {
        match self {
            Self::CandidateStateRequired
            | Self::ProductSpecificationImportRejected
            | Self::RedactionStatusRequired
            | Self::ProofStateRequired
            | Self::ExplicitNonPromotionRequired => true,
            Self::EvidenceClassRequired
            | Self::ToolProvenanceRequired
            | Self::SourceAnchorRequired
            | Self::SourceAnchorBlank
            | Self::ContradictionScanRequired
            | Self::OpenContradiction
            | Self::PhaseContextRequired => false,
        }
    }

    /// Policy outcome for this reason: `Reject` when it forces a rejection,
    /// `Quarantine` otherwise.
    const fn policy_outcome(self) -> PolicyOutcome {
        if !self.requires_rejection() {
            return PolicyOutcome::Quarantine;
        }
        PolicyOutcome::Reject
    }

    /// Stable policy rule id for this reason.
    const fn policy_rule_id(self) -> &'static str {
        self.policy_rule_and_reason().0
    }

    /// Human-readable policy reason text for this reason.
    const fn policy_reason(self) -> &'static str {
        self.policy_rule_and_reason().1
    }

    /// Single table pairing each reason with its `(rule id, reason text)`.
    const fn policy_rule_and_reason(self) -> (&'static str, &'static str) {
        match self {
            Self::CandidateStateRequired => (
                "memory.admission.candidate_state_required",
                "AXIOM admission target must remain Candidate",
            ),
            Self::EvidenceClassRequired => (
                "memory.admission.evidence_class_required",
                "AXIOM admission requires classified evidence",
            ),
            Self::ToolProvenanceRequired => (
                "memory.admission.tool_provenance_required",
                "AXIOM admission requires tool provenance",
            ),
            Self::ProductSpecificationImportRejected => (
                "memory.admission.product_specification_import_rejected",
                "AXIOM import cannot become Cortex product specification through memory admission",
            ),
            Self::SourceAnchorRequired => (
                "memory.admission.source_anchor_required",
                "AXIOM admission requires at least one source anchor",
            ),
            Self::SourceAnchorBlank => (
                "memory.admission.source_anchor_blank",
                "AXIOM admission source anchors must not be blank",
            ),
            Self::RedactionStatusRequired => (
                "memory.admission.redaction_status_required",
                "AXIOM admission requires redacted, abstracted, or operator-opted raw content",
            ),
            Self::ProofStateRequired => (
                "memory.admission.proof_state_required",
                "AXIOM admission requires supplied non-broken proof state",
            ),
            Self::ContradictionScanRequired => (
                "memory.admission.contradiction_scan_required",
                "AXIOM admission requires completed contradiction scan",
            ),
            Self::OpenContradiction => (
                "memory.admission.open_contradiction",
                "AXIOM admission found open contradiction and must not enter clean candidate path",
            ),
            Self::ExplicitNonPromotionRequired => (
                "memory.admission.explicit_non_promotion_required",
                "AXIOM admission requires explicit non-promotion",
            ),
            Self::PhaseContextRequired => (
                "memory.admission.phase_context_required",
                "AXIOM admission requires known AXIOM phase context",
            ),
        }
    }
}
/// Gate: the requested state must be `CandidateState::Candidate`.
///
/// # Errors
/// Returns `AdmissionRejectionReason::CandidateStateRequired` for any other state.
pub fn require_candidate_state(state: CandidateState) -> Result<(), AdmissionRejectionReason> {
    match state {
        CandidateState::Candidate => Ok(()),
        CandidateState::Active
        | CandidateState::Principle
        | CandidateState::Doctrine
        | CandidateState::Unknown => Err(AdmissionRejectionReason::CandidateStateRequired),
    }
}
/// Gate: the evidence must carry a classification other than `Unknown`.
///
/// # Errors
/// Returns `AdmissionRejectionReason::EvidenceClassRequired` for
/// `EvidenceClass::Unknown`.
pub fn require_admissible_evidence(
    evidence_class: EvidenceClass,
) -> Result<(), AdmissionRejectionReason> {
    match evidence_class {
        EvidenceClass::Unknown => Err(AdmissionRejectionReason::EvidenceClassRequired),
        EvidenceClass::Observed
        | EvidenceClass::Inferred
        | EvidenceClass::Claimed
        | EvidenceClass::Simulated => Ok(()),
    }
}
/// Gate: the submission must not be a product-specification import and
/// must identify the tool invocation behind it.
///
/// Only one reason can be returned, so the import class is checked first.
/// A `ProductSpecification` import forces a rejection, while blank
/// provenance only quarantines; checking provenance first would let a
/// blank identifier hide the import class and downgrade the decision.
///
/// # Errors
/// * `AdmissionRejectionReason::ProductSpecificationImportRejected` when
///   the import class is `ProductSpecification`, whatever the identifiers.
/// * `AdmissionRejectionReason::ToolProvenanceRequired` when the tool name
///   or invocation id is empty or whitespace-only.
pub fn require_axiom_origin_is_not_product_spec(
    provenance: &ToolProvenance,
) -> Result<(), AdmissionRejectionReason> {
    if provenance.import_class == AxiomImportClass::ProductSpecification {
        return Err(AdmissionRejectionReason::ProductSpecificationImportRejected);
    }

    let is_blank = |value: &str| value.trim().is_empty();
    if is_blank(&provenance.tool_name) || is_blank(&provenance.invocation_id) {
        return Err(AdmissionRejectionReason::ToolProvenanceRequired);
    }

    Ok(())
}
/// Gate: at least one source anchor must be supplied and none may have a
/// blank reference.
///
/// # Errors
/// * `AdmissionRejectionReason::SourceAnchorRequired` when `anchors` is empty.
/// * `AdmissionRejectionReason::SourceAnchorBlank` when any reference is
///   empty or whitespace-only.
pub fn require_source_anchors(anchors: &[SourceAnchor]) -> Result<(), AdmissionRejectionReason> {
    match anchors {
        [] => Err(AdmissionRejectionReason::SourceAnchorRequired),
        supplied
            if supplied
                .iter()
                .any(|anchor| anchor.reference.trim().is_empty()) =>
        {
            Err(AdmissionRejectionReason::SourceAnchorBlank)
        }
        _ => Ok(()),
    }
}
/// Gate: content must be redacted, abstracted, or raw by operator opt-in.
///
/// # Errors
/// Returns `AdmissionRejectionReason::RedactionStatusRequired` for
/// `RawUnredacted` and `Unknown`.
pub fn require_redaction_status(
    redaction_status: RedactionStatus,
) -> Result<(), AdmissionRejectionReason> {
    let acceptable = match redaction_status {
        RedactionStatus::RawUnredacted | RedactionStatus::Unknown => false,
        RedactionStatus::Redacted
        | RedactionStatus::Abstracted
        | RedactionStatus::OperatorRawOptIn => true,
    };
    if acceptable {
        Ok(())
    } else {
        Err(AdmissionRejectionReason::RedactionStatusRequired)
    }
}
/// Gate: the declared proof state must be fully verified or partial.
///
/// # Errors
/// Returns `AdmissionRejectionReason::ProofStateRequired` for `Broken` and
/// `Unknown`.
pub fn require_usable_proof_state(proof_state: ProofState) -> Result<(), AdmissionRejectionReason> {
    let usable = match proof_state {
        ProofState::Broken | ProofState::Unknown => false,
        ProofState::FullChainVerified | ProofState::Partial => true,
    };
    if !usable {
        return Err(AdmissionRejectionReason::ProofStateRequired);
    }
    Ok(())
}
/// Gate: the contradiction scan must have completed with nothing open.
///
/// # Errors
/// * `AdmissionRejectionReason::OpenContradiction` when the scan reported
///   open contradictions (the payload is not inspected).
/// * `AdmissionRejectionReason::ContradictionScanRequired` when the scan
///   is incomplete or was not run.
pub fn require_contradiction_scan(
    contradiction_scan: &ContradictionScan,
) -> Result<(), AdmissionRejectionReason> {
    let failure = match contradiction_scan {
        ContradictionScan::ScannedClean => None,
        ContradictionScan::OpenContradictions(_) => {
            Some(AdmissionRejectionReason::OpenContradiction)
        }
        ContradictionScan::Incomplete | ContradictionScan::NotScanned => {
            Some(AdmissionRejectionReason::ContradictionScanRequired)
        }
    };
    match failure {
        Some(reason) => Err(reason),
        None => Ok(()),
    }
}
/// Gate: the submission must explicitly declare non-promotion.
///
/// # Errors
/// Returns `AdmissionRejectionReason::ExplicitNonPromotionRequired` when
/// the flag is `false`.
pub fn require_explicit_non_promotion(
    explicit_non_promotion: bool,
) -> Result<(), AdmissionRejectionReason> {
    if !explicit_non_promotion {
        return Err(AdmissionRejectionReason::ExplicitNonPromotionRequired);
    }
    Ok(())
}
/// Gate: the AXIOM phase of the submission must be known.
///
/// # Errors
/// Returns `AdmissionRejectionReason::PhaseContextRequired` for
/// `PhaseContext::Unknown`.
pub fn require_phase_context(phase_context: PhaseContext) -> Result<(), AdmissionRejectionReason> {
    match phase_context {
        PhaseContext::Unknown => Err(AdmissionRejectionReason::PhaseContextRequired),
        PhaseContext::Model
        | PhaseContext::Act
        | PhaseContext::Check
        | PhaseContext::WorkRecord => Ok(()),
    }
}
/// Appends the failure reason of `result` to `reasons`; a passing result
/// leaves `reasons` untouched.
fn push_err(
    reasons: &mut Vec<AdmissionRejectionReason>,
    result: Result<(), AdmissionRejectionReason>,
) {
    // `Result::err` yields `Some(reason)` only on failure, and extending
    // with an `Option` pushes zero or one element.
    reasons.extend(result.err());
}
#[cfg(test)]
mod tests {
use cortex_core::{
BoundaryQuarantineState, BoundarySourceAnchor, BoundaryToolInvocation, BoundaryToolOutcome,
CapabilityTokenDecision, CapabilityTokenState, ClaimCeiling, ExecutionTrustState,
OperatorApprovalState, PaiAxiomExecutionReceiptV1, ProvenanceClass, RuntimeIntegrityState,
SemanticTrustClass, SemanticUse,
};
use super::*;
// Test fixture: a capability token state with the given decision and every boolean
// check set to `true`.
fn token_state(decision: CapabilityTokenDecision) -> CapabilityTokenState {
CapabilityTokenState {
decision,
valid_structure: true,
audience_bound: true,
scope_bound: true,
operation_bound: true,
not_expired: true,
not_revoked: true,
policy_allowed: true,
attestation_linked: true,
}
}
// Test fixture: an execution receipt on a verified-release runtime with one successful
// `cargo` tool invocation and one artifact source anchor; only the token decision varies.
fn receipt(decision: CapabilityTokenDecision) -> PaiAxiomExecutionReceiptV1 {
let mut receipt = PaiAxiomExecutionReceiptV1::new(
"axiom-runtime:v3.1/run_01",
token_state(decision),
ExecutionTrustState {
runtime_integrity: RuntimeIntegrityState::VerifiedRelease,
evidence_ref: Some("release:verified".to_string()),
},
OperatorApprovalState::ApprovedBound,
);
receipt.tool_provenance.push(BoundaryToolInvocation {
tool_name: "cargo".to_string(),
invocation_id: "tool_01".to_string(),
input_ref: Some("cmd:cargo test".to_string()),
output_ref: Some("log:passed".to_string()),
outcome: BoundaryToolOutcome::Succeeded,
});
receipt.source_anchors.push(BoundarySourceAnchor {
reference: "artifact:log:passed".to_string(),
kind: "artifact".to_string(),
});
receipt
}
// Test fixture: a request that passes every admission gate; tests mutate single fields
// of it to trigger one gate at a time.
fn valid_request() -> AxiomMemoryAdmissionRequest {
AxiomMemoryAdmissionRequest {
candidate_state: CandidateState::Candidate,
evidence_class: EvidenceClass::Observed,
phase_context: PhaseContext::Check,
tool_provenance: ToolProvenance::new(
"axiom-runtime",
"run_01",
AxiomImportClass::AgentProcedure,
),
source_anchors: vec![SourceAnchor::new(
"evt_01ARZ3NDEKTSV4RRFFQ69G5FAV",
SourceAnchorKind::Event,
)],
redaction_status: RedactionStatus::Abstracted,
proof_state: ProofState::Partial,
contradiction_scan: ContradictionScan::ScannedClean,
explicit_non_promotion: true,
}
}
// Validation accumulates reasons: a non-candidate state and a whitespace-only
// invocation id are both reported by the same `validate` call.
#[test]
fn axiom_submission_requires_candidate_state_and_tool_provenance() {
let mut request = valid_request();
request.candidate_state = CandidateState::Active;
request.tool_provenance.invocation_id = " ".into();
let reasons = request
.validate()
.expect_err("request must fail validation");
assert!(reasons.contains(&AdmissionRejectionReason::CandidateStateRequired));
assert!(reasons.contains(&AdmissionRejectionReason::ToolProvenanceRequired));
}
// Asking for `Active` directly is a hard reject, not a quarantine.
#[test]
fn attempted_direct_active_admission_rejects() {
let mut request = valid_request();
request.candidate_state = CandidateState::Active;
assert_eq!(
request.admission_decision(),
AdmissionDecision::Reject {
reasons: vec![AdmissionRejectionReason::CandidateStateRequired],
}
);
}
// Blank tool name (whitespace) and empty invocation id yield a single
// `ToolProvenanceRequired` reason, which quarantines.
#[test]
fn missing_tool_provenance_is_quarantined() {
let mut request = valid_request();
request.tool_provenance.tool_name = " ".into();
request.tool_provenance.invocation_id.clear();
assert_eq!(
request.admission_decision(),
AdmissionDecision::Quarantine {
reasons: vec![AdmissionRejectionReason::ToolProvenanceRequired],
}
);
}
// A request without any source anchor is quarantined.
#[test]
fn narrative_without_source_anchors_is_quarantined() {
let mut request = valid_request();
request.source_anchors.clear();
assert_eq!(
request.admission_decision(),
AdmissionDecision::Quarantine {
reasons: vec![AdmissionRejectionReason::SourceAnchorRequired],
}
);
}
// The fully valid fixture is admitted, composes to policy `Allow`, and is still a
// candidate with explicit non-promotion set.
#[test]
fn admitted_axiom_derived_memory_remains_candidate_only() {
let request = valid_request();
assert_eq!(
request.admission_decision(),
AdmissionDecision::AdmitCandidate
);
assert_eq!(
request.policy_decision().final_outcome,
PolicyOutcome::Allow
);
assert_eq!(request.candidate_state, CandidateState::Candidate);
assert!(request.explicit_non_promotion);
}
// Dropping the explicit non-promotion flag is a hard reject.
#[test]
fn missing_explicit_non_promotion_rejects() {
let mut request = valid_request();
request.explicit_non_promotion = false;
assert_eq!(
request.admission_decision(),
AdmissionDecision::Reject {
reasons: vec![AdmissionRejectionReason::ExplicitNonPromotionRequired],
}
);
}
// A product-specification import with raw unredacted content is rejected, and both
// reasons are reported.
#[test]
fn product_spec_import_and_raw_unredacted_content_reject() {
let mut request = valid_request();
request.tool_provenance.import_class = AxiomImportClass::ProductSpecification;
request.redaction_status = RedactionStatus::RawUnredacted;
let decision = request.admission_decision();
match decision {
AdmissionDecision::Reject { reasons } => {
assert!(
reasons.contains(&AdmissionRejectionReason::ProductSpecificationImportRejected)
);
assert!(reasons.contains(&AdmissionRejectionReason::RedactionStatusRequired));
}
other => panic!("expected reject, got {other:?}"),
}
}
// Open contradictions quarantine the request, and the policy decision carries the
// matching rule id as its first contribution.
#[test]
fn open_contradiction_cannot_be_cleanly_admitted() {
let mut request = valid_request();
request.contradiction_scan = ContradictionScan::OpenContradictions(vec!["ctr_01".into()]);
assert_eq!(
request.admission_decision(),
AdmissionDecision::Quarantine {
reasons: vec![AdmissionRejectionReason::OpenContradiction],
}
);
let policy = request.policy_decision();
assert_eq!(policy.final_outcome, PolicyOutcome::Quarantine);
assert_eq!(
policy.contributing[0].rule_id.as_str(),
"memory.admission.open_contradiction"
);
}
// A request whose contradiction scan never ran is quarantined.
#[test]
fn unscanned_contradictions_are_quarantined() {
let mut request = valid_request();
request.contradiction_scan = ContradictionScan::NotScanned;
assert_eq!(
request.admission_decision(),
AdmissionDecision::Quarantine {
reasons: vec![AdmissionRejectionReason::ContradictionScanRequired],
}
);
}
// Two rejecting reasons produce two policy contributions that compose to `Reject`.
#[test]
fn rejection_reasons_compose_to_policy_reject() {
let mut request = valid_request();
request.candidate_state = CandidateState::Doctrine;
request.explicit_non_promotion = false;
let policy = request.policy_decision();
assert_eq!(policy.final_outcome, PolicyOutcome::Reject);
assert_eq!(policy.contributing.len(), 2);
assert!(policy.contributing.iter().any(|contribution| {
contribution.rule_id.as_str() == "memory.admission.explicit_non_promotion_required"
}));
}
// A clean receipt converts to an observed, partial-proof candidate in the work-record
// phase, and is quarantined because the converted scan starts as `Incomplete`.
#[test]
fn execution_receipt_enters_candidate_admission_only() {
let request = AxiomMemoryAdmissionRequest::from(receipt(CapabilityTokenDecision::Allowed));
assert_eq!(request.candidate_state, CandidateState::Candidate);
assert_eq!(request.evidence_class, EvidenceClass::Observed);
assert_eq!(request.phase_context, PhaseContext::WorkRecord);
assert_eq!(request.proof_state, ProofState::Partial);
assert!(request.explicit_non_promotion);
assert_eq!(
request.admission_decision(),
AdmissionDecision::Quarantine {
reasons: vec![AdmissionRejectionReason::ContradictionScanRequired],
}
);
}
// A receipt-derived request is runtime-derived provenance: even with two source
// families and falsification evidence, high-force use is rejected semantically while
// candidate admission itself succeeds.
#[test]
fn runtime_only_axiom_receipt_cannot_pass_high_force_semantic_use() {
let mut request =
AxiomMemoryAdmissionRequest::from(receipt(CapabilityTokenDecision::Allowed));
request.contradiction_scan = ContradictionScan::ScannedClean;
let report = request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::HighForceDoctrine)
.with_independent_source_families(2)
.with_falsification_evidence(true)
.with_unresolved_semantic_unknowns(false),
);
assert_eq!(report.provenance_class, ProvenanceClass::RuntimeDerived);
assert_eq!(
report.semantic_trust.semantic_trust,
SemanticTrustClass::SingleFamily
);
assert_eq!(report.semantic_trust.policy_outcome, PolicyOutcome::Reject);
assert_eq!(report.admission_decision, AdmissionDecision::AdmitCandidate);
assert!(report.explicit_non_promotion);
}
// Unknown evidence: candidate-memory use only warns, default-context use is quarantined
// with a dev-only claim ceiling; admission quarantines in both cases.
#[test]
fn admission_unknowns_warn_for_candidate_and_quarantine_for_default_context() {
let mut request = valid_request();
request.evidence_class = EvidenceClass::Unknown;
let candidate_report = request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::CandidateMemory)
.with_unresolved_semantic_unknowns(true),
);
assert_eq!(
candidate_report.provenance_class,
ProvenanceClass::UnknownProvenance
);
assert_eq!(
candidate_report.semantic_trust.semantic_trust,
SemanticTrustClass::Unknown
);
assert_eq!(
candidate_report.semantic_trust.policy_outcome,
PolicyOutcome::Warn
);
assert_eq!(
candidate_report.admission_decision,
AdmissionDecision::Quarantine {
reasons: vec![AdmissionRejectionReason::EvidenceClassRequired],
}
);
let context_report = request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::DefaultContext)
.with_unresolved_semantic_unknowns(true),
);
assert_eq!(
context_report.semantic_trust.policy_outcome,
PolicyOutcome::Quarantine
);
assert_eq!(
context_report.semantic_trust.claim_ceiling,
ClaimCeiling::DevOnly
);
}
// Tool-observed evidence reaches high-force use only with falsification evidence on
// top of two independent source families; an operator-protocol import is classified
// the same way (`ToolObserved`).
#[test]
fn observed_admission_passes_high_force_only_with_corroboration_and_falsification() {
let tool_request = valid_request();
let missing_falsification = tool_request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::HighForceDoctrine)
.with_independent_source_families(2)
.with_falsification_evidence(false)
.with_unresolved_semantic_unknowns(false),
);
assert_eq!(
missing_falsification.provenance_class,
ProvenanceClass::ToolObserved
);
assert_eq!(
missing_falsification.semantic_trust.policy_outcome,
PolicyOutcome::Reject
);
let corroborated = tool_request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::HighForceDoctrine)
.with_independent_source_families(2)
.with_falsification_evidence(true)
.with_unresolved_semantic_unknowns(false),
);
assert_eq!(
corroborated.semantic_trust.semantic_trust,
SemanticTrustClass::FalsificationTested
);
assert_eq!(
corroborated.semantic_trust.policy_outcome,
PolicyOutcome::Allow
);
assert_eq!(
corroborated.semantic_trust.claim_ceiling,
ClaimCeiling::AuthorityGrade
);
let mut operator_protocol_request = valid_request();
operator_protocol_request.tool_provenance.import_class = AxiomImportClass::OperatorProtocol;
let operator_protocol_report = operator_protocol_request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::HighForceDoctrine)
.with_independent_source_families(2)
.with_falsification_evidence(true)
.with_unresolved_semantic_unknowns(false),
);
assert_eq!(
operator_protocol_report.provenance_class,
ProvenanceClass::ToolObserved
);
assert_eq!(
operator_protocol_report.semantic_trust.policy_outcome,
PolicyOutcome::Allow
);
}
// Semantic `Allow` and admission are independent: without explicit non-promotion the
// request is still rejected at admission.
#[test]
fn semantic_allowance_does_not_replace_explicit_non_promotion() {
let mut request = valid_request();
request.tool_provenance.import_class = AxiomImportClass::OperatorProtocol;
request.explicit_non_promotion = false;
let report = request.semantic_trust_report(
AdmissionSemanticTrustInput::new(SemanticUse::HighForceDoctrine)
.with_independent_source_families(2)
.with_falsification_evidence(true)
.with_unresolved_semantic_unknowns(false),
);
assert_eq!(report.semantic_trust.policy_outcome, PolicyOutcome::Allow);
assert_eq!(
report.admission_decision,
AdmissionDecision::Reject {
reasons: vec![AdmissionRejectionReason::ExplicitNonPromotionRequired],
}
);
assert!(!report.explicit_non_promotion);
}
// A revoked-token, contaminated receipt converts to a broken proof state with
// non-promotion cleared, and is rejected for both reasons.
#[test]
fn rejected_or_contaminated_receipt_fails_closed() {
let mut receipt = receipt(CapabilityTokenDecision::Revoked);
receipt.quarantine_state = BoundaryQuarantineState::Contaminated;
let request = AxiomMemoryAdmissionRequest::from(receipt);
assert_eq!(request.candidate_state, CandidateState::Candidate);
assert_eq!(request.proof_state, ProofState::Broken);
assert!(!request.explicit_non_promotion);
match request.admission_decision() {
AdmissionDecision::Reject { reasons } => {
assert!(reasons.contains(&AdmissionRejectionReason::ProofStateRequired));
assert!(reasons.contains(&AdmissionRejectionReason::ExplicitNonPromotionRequired));
}
other => panic!("expected reject, got {other:?}"),
}
}
// With no tool invocation and an empty runtime id, the fallback invocation id is
// empty: evidence is only `Claimed` and the provenance gate fails.
#[test]
fn receipt_without_tool_provenance_is_not_clean_admission() {
let mut receipt = receipt(CapabilityTokenDecision::Allowed);
receipt.tool_provenance.clear();
receipt.runtime_id.clear();
let request = AxiomMemoryAdmissionRequest::from(receipt);
assert_eq!(request.evidence_class, EvidenceClass::Claimed);
match request.admission_decision() {
AdmissionDecision::Reject { reasons } | AdmissionDecision::Quarantine { reasons } => {
assert!(reasons.contains(&AdmissionRejectionReason::ToolProvenanceRequired));
}
AdmissionDecision::AdmitCandidate => {
panic!("missing provenance must not admit cleanly")
}
}
}
// A complete snake_case JSON envelope parses and is admitted as a candidate.
#[test]
fn generic_axiom_import_envelope_parses_to_candidate_request() {
let json = serde_json::json!({
"candidate_state": "candidate",
"evidence_class": "observed",
"phase_context": "check",
"tool_provenance": {
"tool_name": "codex",
"invocation_id": "run_01",
"import_class": "agent_procedure"
},
"source_anchors": [{
"reference": "cmd:cargo test -p cortex-memory admission --lib",
"kind": "artifact"
}],
"redaction_status": "abstracted",
"proof_state": "partial",
"contradiction_scan": "scanned_clean",
"explicit_non_promotion": true
})
.to_string();
let request = AxiomMemoryAdmissionRequest::from_json_envelope(&json)
.expect("valid generic import envelope parses");
assert_eq!(request.candidate_state, CandidateState::Candidate);
assert_eq!(request.evidence_class, EvidenceClass::Observed);
assert_eq!(
request.admission_decision(),
AdmissionDecision::AdmitCandidate
);
}
/// An envelope whose `evidence_class` is the unknown value `operator_truth` must
/// fail to parse, and the resulting error message must name the offending value.
#[test]
fn generic_axiom_import_envelope_rejects_unsupported_evidence_class() {
    let envelope = serde_json::json!({
        "candidate_state": "candidate",
        "evidence_class": "operator_truth",
        "phase_context": "check",
        "tool_provenance": {
            "tool_name": "codex",
            "invocation_id": "run_01",
            "import_class": "agent_procedure"
        },
        "source_anchors": [{
            "reference": "cmd:cargo test",
            "kind": "artifact"
        }],
        "redaction_status": "abstracted",
        "proof_state": "partial",
        "contradiction_scan": "scanned_clean",
        "explicit_non_promotion": true
    });
    let serialized = envelope.to_string();

    // The envelope error is destructured directly to reach its message.
    let AdmissionEnvelopeError::InvalidEnvelope { message } =
        AxiomMemoryAdmissionRequest::from_json_envelope(&serialized)
            .expect_err("unsupported evidence class must not parse");
    assert!(message.contains("operator_truth"), "message: {message}");
}
/// An envelope that omits `source_anchors` must fail to parse, and the resulting
/// error message must name the missing field.
#[test]
fn generic_axiom_import_envelope_without_required_fields_cannot_admit() {
    let envelope = serde_json::json!({
        "candidate_state": "candidate",
        "evidence_class": "observed",
        "phase_context": "check",
        "tool_provenance": {
            "tool_name": "codex",
            "invocation_id": "run_01",
            "import_class": "agent_procedure"
        },
        "redaction_status": "abstracted",
        "proof_state": "partial",
        "contradiction_scan": "scanned_clean",
        "explicit_non_promotion": true
    });
    let serialized = envelope.to_string();

    // The envelope error is destructured directly to reach its message.
    let AdmissionEnvelopeError::InvalidEnvelope { message } =
        AxiomMemoryAdmissionRequest::from_json_envelope(&serialized)
            .expect_err("missing source_anchors must not parse");
    assert!(message.contains("source_anchors"), "message: {message}");
}
/// An otherwise complete envelope whose `contradiction_scan` is `not_scanned` must
/// still parse, and its admission decision must be a quarantine whose only reason
/// is `ContradictionScanRequired`.
#[test]
fn generic_axiom_import_envelope_with_unscanned_contradictions_quarantines() {
    let envelope = serde_json::json!({
        "candidate_state": "candidate",
        "evidence_class": "observed",
        "phase_context": "check",
        "tool_provenance": {
            "tool_name": "codex",
            "invocation_id": "run_01",
            "import_class": "agent_procedure"
        },
        "source_anchors": [{
            "reference": "cmd:cargo test",
            "kind": "artifact"
        }],
        "redaction_status": "abstracted",
        "proof_state": "partial",
        "contradiction_scan": "not_scanned",
        "explicit_non_promotion": true
    });
    let serialized = envelope.to_string();

    let parsed = AxiomMemoryAdmissionRequest::from_json_envelope(&serialized)
        .expect("known not_scanned posture parses");

    let decision = parsed.admission_decision();
    assert_eq!(
        decision,
        AdmissionDecision::Quarantine {
            reasons: vec![AdmissionRejectionReason::ContradictionScanRequired],
        }
    );
}
/// With the request's proof state set to `FullChainVerified`, the proof closure
/// report must be in the core `FullChainVerified` state with no failing edges.
#[test]
fn admission_proof_closure_report_maps_full_to_full_chain_verified() {
    let mut request = valid_request();
    request.proof_state = ProofState::FullChainVerified;

    let report = request.proof_closure_report();
    let mapped = report.state();

    assert_eq!(mapped, CoreProofState::FullChainVerified);
    assert!(report.failing_edges().is_empty());
}
/// Both `Partial` and `Unknown` admission proof states must produce a proof closure
/// report in the core `Partial` state that is neither fully verified nor broken.
#[test]
fn admission_proof_closure_report_maps_partial_and_unknown_to_partial() {
    let partial_like = [ProofState::Partial, ProofState::Unknown];

    for state in partial_like {
        let mut request = valid_request();
        request.proof_state = state;

        let report = request.proof_closure_report();
        let mapped = report.state();

        assert_eq!(
            mapped,
            CoreProofState::Partial,
            "{state:?} must map to typed Partial"
        );
        assert!(!report.is_full_chain_verified());
        assert!(!report.is_broken());
    }
}
/// With the request's proof state set to `Broken`, the proof closure report must be
/// in the core `Broken` state and report itself as broken.
#[test]
fn admission_proof_closure_report_maps_broken_to_broken() {
    let mut request = valid_request();
    request.proof_state = ProofState::Broken;

    let report = request.proof_closure_report();
    let mapped = report.state();

    assert_eq!(mapped, CoreProofState::Broken);
    assert!(report.is_broken());
}
/// For each admission proof state, the proof closure contribution must carry the
/// expected policy outcome (allow for full chain, quarantine for partial/unknown,
/// reject for broken) and the stable proof-closure rule id.
#[test]
fn admission_proof_closure_contribution_outcomes_track_proof_state() {
    // Proof state paired with the policy outcome its contribution must report.
    let expectations = [
        (ProofState::FullChainVerified, PolicyOutcome::Allow),
        (ProofState::Partial, PolicyOutcome::Quarantine),
        (ProofState::Unknown, PolicyOutcome::Quarantine),
        (ProofState::Broken, PolicyOutcome::Reject),
    ];

    for (state, expected) in expectations {
        let mut request = valid_request();
        request.proof_state = state;

        let contribution = request.proof_closure_contribution();

        assert_eq!(contribution.outcome, expected, "for {state:?}");
        let rule_id = contribution.rule_id.as_str();
        assert_eq!(rule_id, AXIOM_ADMISSION_PROOF_CLOSURE_RULE_ID);
    }
}
/// A request whose proof state is `FullChainVerified` must pass the durable
/// admission gate.
#[test]
fn admission_durable_gate_allows_full_chain_verified_only() {
    let mut request = valid_request();
    request.proof_state = ProofState::FullChainVerified;

    let gate = request.require_durable_admission_allowed();
    gate.expect("FullChainVerified must pass the durable gate");
}
/// Every proof state other than full-chain-verified (`Partial`, `Unknown`, `Broken`)
/// must be refused by the durable admission gate, and the refusal's rendered text
/// must contain the stable proof-closure invariant key.
#[test]
fn admission_durable_gate_refuses_partial_with_stable_invariant() {
    let refused_states = [ProofState::Partial, ProofState::Unknown, ProofState::Broken];

    for state in refused_states {
        let mut request = valid_request();
        request.proof_state = state;

        let err = request
            .require_durable_admission_allowed()
            .expect_err("non-full-chain proof state must refuse");

        // Render once for the containment check; the failure message prints `err` itself.
        let rendered = err.to_string();
        assert!(
            rendered.contains(AXIOM_ADMISSION_PROOF_CLOSURE_INVARIANT),
            "refusal must carry stable invariant for {state:?}: {err}"
        );
    }
}
/// Pins the literal values of the proof-closure invariant key and rule id so that an
/// accidental rename of either constant fails this test.
#[test]
fn admission_proof_closure_invariant_keys_are_stable() {
    // Each constant paired with the exact string it is expected to hold.
    let pinned = [
        (
            AXIOM_ADMISSION_PROOF_CLOSURE_INVARIANT,
            "cortex_memory.admission.axiom.proof_closure",
        ),
        (
            AXIOM_ADMISSION_PROOF_CLOSURE_RULE_ID,
            "memory.admission.axiom.proof_closure",
        ),
    ];

    for (actual, expected) in pinned {
        assert_eq!(actual, expected);
    }
}
}