use chrono::{DateTime, TimeZone, Utc};
use cortex_core::{
compose_policy_outcomes, KeyLifecycleState, PolicyContribution, PolicyOutcome,
TemporalAuthorityReason, TrustTier,
};
use cortex_store::migrate::apply_pending;
use cortex_store::proof::temporal_authority_proof_report;
use cortex_store::repo::authority::{
key_state_policy_decision_test_allow, principal_state_policy_decision_test_allow,
KEY_STATE_ATTESTED_BY_OPERATOR_RULE_ID, KEY_STATE_TRUST_TIER_GATE_RULE_ID,
PRINCIPAL_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PRINCIPAL_STATE_TIER_PROMOTION_ATTESTATION_RULE_ID, PRINCIPAL_STATE_TRUST_TIER_GATE_RULE_ID,
};
use cortex_store::repo::{
AuthorityRepo, KeyTimelineRecord, PrincipalTimelineRecord, TemporalAuthorityQuery,
};
use cortex_store::Pool;
use rusqlite::Connection;
fn test_pool() -> Pool {
let pool = Connection::open_in_memory().expect("open in-memory sqlite");
apply_pending(&pool).expect("apply migrations");
pool
}
fn at(day: u32) -> DateTime<Utc> {
Utc.with_ymd_and_hms(2026, 1, day, 12, 0, 0).unwrap()
}
fn key(key_id: &str, state: KeyLifecycleState, day: u32) -> KeyTimelineRecord {
KeyTimelineRecord {
key_id: key_id.to_string(),
principal_id: "operator-principal".into(),
state,
effective_at: at(day),
reason: None,
audit_ref: None,
}
}
fn principal(tier: TrustTier, day: u32) -> PrincipalTimelineRecord {
PrincipalTimelineRecord {
principal_id: "operator-principal".into(),
trust_tier: tier,
effective_at: at(day),
trust_review_due_at: None,
removed_at: None,
audit_ref: None,
}
}
#[test]
fn authority_valid_at_t1_invalid_at_t2() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("append operator trust state");
repo.append_key_state(
&key("key-1", KeyLifecycleState::Active, 1),
&key_state_policy_decision_test_allow(),
)
.expect("append active key state");
repo.append_key_state(
&key("key-1", KeyLifecycleState::Revoked, 3),
&key_state_policy_decision_test_allow(),
)
.expect("append revoked key state");
let report = repo
.revalidate(&TemporalAuthorityQuery {
key_id: "key-1".into(),
event_time: at(2),
now: at(4),
minimum_trust_tier: TrustTier::Verified,
})
.expect("revalidate temporal authority");
assert!(report.valid_at_event_time);
assert!(!report.valid_now);
assert_eq!(report.invalidated_after, Some(at(3)));
assert!(report.has_reason(TemporalAuthorityReason::RevokedAfterSigning));
let edge = report
.current_use_failing_edge("doctrine:doc_01ARZ3NDEKTSV4RRFFQ69G5FAV")
.expect("invalid current use has proof edge");
assert!(edge.reason.contains("revoked_after_signing"));
let proof = temporal_authority_proof_report("doctrine:doc_01ARZ3NDEKTSV4RRFFQ69G5FAV", &report);
assert!(proof.is_broken());
assert!(proof.failing_edges()[0]
.reason
.contains("revoked_after_signing"));
}
#[test]
fn authority_signed_after_revocation_is_invalid_at_event_time() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("append operator trust state");
repo.append_key_state(
&key("key-1", KeyLifecycleState::Active, 1),
&key_state_policy_decision_test_allow(),
)
.expect("append active key state");
repo.append_key_state(
&key("key-1", KeyLifecycleState::Revoked, 3),
&key_state_policy_decision_test_allow(),
)
.expect("append revoked key state");
let report = repo
.revalidate(&TemporalAuthorityQuery {
key_id: "key-1".into(),
event_time: at(4),
now: at(5),
minimum_trust_tier: TrustTier::Verified,
})
.expect("revalidate temporal authority");
assert!(!report.valid_at_event_time);
assert!(!report.valid_now);
assert!(report.has_reason(TemporalAuthorityReason::SignedAfterRevocation));
}
#[test]
fn authority_trust_tier_downgrade_invalidates_current_reasoning() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("append operator trust state");
repo.append_key_state(
&key("key-1", KeyLifecycleState::Active, 1),
&key_state_policy_decision_test_allow(),
)
.expect("append active key state");
repo.append_principal_state(
&principal(TrustTier::Observed, 3),
&principal_state_policy_decision_test_allow(),
)
.expect("append downgraded trust state");
let report = repo
.revalidate(&TemporalAuthorityQuery {
key_id: "key-1".into(),
event_time: at(2),
now: at(4),
minimum_trust_tier: TrustTier::Verified,
})
.expect("revalidate temporal authority");
assert!(report.valid_at_event_time);
assert!(!report.valid_now);
assert_eq!(report.invalidated_after, Some(at(3)));
assert!(report.has_reason(TemporalAuthorityReason::TrustTierDowngraded));
}
fn timeline_row_counts(pool: &Pool) -> (i64, i64) {
let key_rows: i64 = pool
.query_row("SELECT COUNT(*) FROM authority_key_timeline;", [], |row| {
row.get(0)
})
.expect("count key timeline rows");
let principal_rows: i64 = pool
.query_row(
"SELECT COUNT(*) FROM authority_principal_timeline;",
[],
|row| row.get(0),
)
.expect("count principal timeline rows");
(key_rows, principal_rows)
}
#[test]
fn append_key_state_rejects_policy_reject_without_mutation() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("seed operator trust state");
let pre = timeline_row_counts(&pool);
let reject_decision = compose_policy_outcomes(
vec![
PolicyContribution::new(
KEY_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PolicyOutcome::Allow,
"operator attestation present",
)
.unwrap(),
PolicyContribution::new(
KEY_STATE_TRUST_TIER_GATE_RULE_ID,
PolicyOutcome::Reject,
"tier below Verified for new key registration",
)
.unwrap(),
],
None,
);
assert_eq!(reject_decision.final_outcome, PolicyOutcome::Reject);
let err = repo
.append_key_state(
&key("key-1", KeyLifecycleState::Active, 1),
&reject_decision,
)
.expect_err("Reject policy must fail closed");
assert!(
err.to_string().contains("Reject"),
"error must name the blocking outcome: {err}"
);
assert_eq!(
timeline_row_counts(&pool),
pre,
"no timeline rows must be written when policy rejects"
);
}
#[test]
fn append_key_state_rejects_missing_contributor_rule() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("seed operator trust state");
let pre = timeline_row_counts(&pool);
let decision_missing_tier_gate = compose_policy_outcomes(
vec![PolicyContribution::new(
KEY_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PolicyOutcome::Allow,
"operator attestation present",
)
.unwrap()],
None,
);
let err = repo
.append_key_state(
&key("key-1", KeyLifecycleState::Active, 1),
&decision_missing_tier_gate,
)
.expect_err("missing trust-tier-gate contributor must fail closed");
assert!(
err.to_string().contains(KEY_STATE_TRUST_TIER_GATE_RULE_ID),
"error must name the missing contributor: {err}"
);
assert_eq!(timeline_row_counts(&pool), pre);
}
#[test]
fn append_key_state_accepts_valid_break_glass_only_when_attestation_passed() {
use cortex_core::{BreakGlassAuthorization, BreakGlassReasonCode, BreakGlassScope};
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("seed operator trust state");
let bg_decision = compose_policy_outcomes(
vec![
PolicyContribution::new(
KEY_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PolicyOutcome::Allow,
"operator attestation present",
)
.unwrap(),
PolicyContribution::new(
KEY_STATE_TRUST_TIER_GATE_RULE_ID,
PolicyOutcome::Quarantine,
"tier review pending",
)
.unwrap()
.allow_break_glass_override(),
PolicyContribution::new(
"authority.key_state.operator_override",
PolicyOutcome::BreakGlass,
"incident response key activation",
)
.unwrap(),
],
Some(BreakGlassAuthorization {
permitted: true,
attested: true,
scope: BreakGlassScope {
operation_type: "authority.key_state.activate".into(),
artifact_refs: vec!["key-1".into()],
not_before: None,
not_after: None,
},
reason_code: BreakGlassReasonCode::IncidentResponse,
}),
);
assert_eq!(bg_decision.final_outcome, PolicyOutcome::BreakGlass);
repo.append_key_state(&key("key-1", KeyLifecycleState::Active, 1), &bg_decision)
.expect("valid break-glass with intact attestation must be accepted");
}
#[test]
fn append_key_state_refuses_break_glassed_attestation_at_authority_root() {
use cortex_core::{BreakGlassAuthorization, BreakGlassReasonCode, BreakGlassScope};
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("seed operator trust state");
let pre = timeline_row_counts(&pool);
let bg_decision = compose_policy_outcomes(
vec![
PolicyContribution::new(
KEY_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PolicyOutcome::Reject,
"operator signature failed to verify",
)
.unwrap()
.allow_break_glass_override(),
PolicyContribution::new(
KEY_STATE_TRUST_TIER_GATE_RULE_ID,
PolicyOutcome::Allow,
"trust tier is Operator",
)
.unwrap(),
PolicyContribution::new(
"authority.key_state.operator_override",
PolicyOutcome::BreakGlass,
"incident response key activation",
)
.unwrap(),
],
Some(BreakGlassAuthorization {
permitted: true,
attested: true,
scope: BreakGlassScope {
operation_type: "authority.key_state.activate".into(),
artifact_refs: vec!["key-1".into()],
not_before: None,
not_after: None,
},
reason_code: BreakGlassReasonCode::IncidentResponse,
}),
);
let err = repo
.append_key_state(&key("key-1", KeyLifecycleState::Active, 1), &bg_decision)
.expect_err("break-glass must not bypass attestation contributor");
assert!(
err.to_string()
.contains(KEY_STATE_ATTESTED_BY_OPERATOR_RULE_ID),
"error must name the gated contributor: {err}"
);
assert_eq!(timeline_row_counts(&pool), pre);
}
#[test]
fn append_key_state_refuses_active_key_for_sub_verified_principal() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Observed, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("seed observed trust state");
let pre = timeline_row_counts(&pool);
let err = repo
.append_key_state(
&key("key-1", KeyLifecycleState::Active, 2),
&key_state_policy_decision_test_allow(),
)
.expect_err("activating a key for an Observed principal must fail closed");
assert!(
err.to_string().contains("Observed"),
"error must name the offending tier: {err}"
);
assert_eq!(timeline_row_counts(&pool), pre);
}
#[test]
fn append_principal_state_refuses_break_glassed_tier_promotion_attestation() {
use cortex_core::{BreakGlassAuthorization, BreakGlassReasonCode, BreakGlassScope};
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
let pre = timeline_row_counts(&pool);
let bg_decision = compose_policy_outcomes(
vec![
PolicyContribution::new(
PRINCIPAL_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PolicyOutcome::Allow,
"operator attestation present",
)
.unwrap(),
PolicyContribution::new(
PRINCIPAL_STATE_TRUST_TIER_GATE_RULE_ID,
PolicyOutcome::Allow,
"current operator-tier authority composes the change",
)
.unwrap(),
PolicyContribution::new(
PRINCIPAL_STATE_TIER_PROMOTION_ATTESTATION_RULE_ID,
PolicyOutcome::Reject,
"no fresh promotion attestation on file",
)
.unwrap()
.allow_break_glass_override(),
PolicyContribution::new(
"authority.principal_state.operator_override",
PolicyOutcome::BreakGlass,
"operator override",
)
.unwrap(),
],
Some(BreakGlassAuthorization {
permitted: true,
attested: true,
scope: BreakGlassScope {
operation_type: "authority.principal_state.promote".into(),
artifact_refs: vec!["operator-principal".into()],
not_before: None,
not_after: None,
},
reason_code: BreakGlassReasonCode::OperatorCorrection,
}),
);
let err = repo
.append_principal_state(&principal(TrustTier::Operator, 2), &bg_decision)
.expect_err("break-glass must not bypass tier promotion attestation");
assert!(
err.to_string()
.contains(PRINCIPAL_STATE_TIER_PROMOTION_ATTESTATION_RULE_ID),
"error must name the gated contributor: {err}"
);
assert_eq!(timeline_row_counts(&pool), pre);
}
#[test]
fn append_principal_state_rejects_missing_tier_promotion_attestation() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
let pre = timeline_row_counts(&pool);
let decision_missing_promotion = compose_policy_outcomes(
vec![
PolicyContribution::new(
PRINCIPAL_STATE_ATTESTED_BY_OPERATOR_RULE_ID,
PolicyOutcome::Allow,
"operator attestation present",
)
.unwrap(),
PolicyContribution::new(
PRINCIPAL_STATE_TRUST_TIER_GATE_RULE_ID,
PolicyOutcome::Allow,
"gate satisfied",
)
.unwrap(),
],
None,
);
let err = repo
.append_principal_state(
&principal(TrustTier::Operator, 1),
&decision_missing_promotion,
)
.expect_err("missing tier_promotion_attestation contributor must fail closed");
assert!(
err.to_string()
.contains(PRINCIPAL_STATE_TIER_PROMOTION_ATTESTATION_RULE_ID),
"error must name the missing contributor: {err}"
);
assert_eq!(timeline_row_counts(&pool), pre);
}
#[test]
fn append_principal_state_accepts_test_allow_decision() {
let pool = test_pool();
let repo = AuthorityRepo::new(&pool);
repo.append_principal_state(
&principal(TrustTier::Operator, 1),
&principal_state_policy_decision_test_allow(),
)
.expect("test_allow decision must accept the principal state mutation");
let (_, principal_rows) = timeline_row_counts(&pool);
assert_eq!(principal_rows, 1);
}