use exo_core::{Did, Hash256, hash::hash_structured};
use serde::{Deserialize, Serialize};
use crate::types::{
AuthorityChain, AuthorityLink, BailmentState, ConsentRecord, GovernmentBranch,
IndependenceClaim, PermissionSet, Provenance, QuorumEvidence, QuorumVote, ReviewOrder, Role,
TrustedAuthorityKeys, TrustedProvenanceKeys, VoiceKind,
};
/// The constitutional invariants this module knows how to check.
///
/// Each variant is dispatched to its own check function by `check_invariant`
/// and has a stable kebab-case identifier exposed by [`ConstitutionalInvariant::id`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ConstitutionalInvariant {
/// Every actor role must be valid and all roles must sit in a single government branch.
SeparationOfPowers,
/// An active bailment granted to the actor, plus a matching active consent
/// record whose scope covers the requested permissions, is required.
ConsentRequired,
/// The actor must not be expanding its own permissions.
NoSelfGrant,
/// Human override capability must be preserved.
HumanOverride,
/// Immutable kernel configuration must not be modified.
KernelImmutability,
/// The delegation chain must be non-empty, contiguous, end at the actor,
/// be signed by trusted grantor keys, and cover the requested permissions.
AuthorityChainValid,
/// When quorum evidence is supplied, verified human approvals must reach its threshold.
QuorumLegitimate,
/// Provenance must be present, name the actor, and carry a valid signature from a trusted key.
ProvenanceVerifiable,
}
impl ConstitutionalInvariant {
#[must_use]
pub fn id(&self) -> &'static str {
match self {
ConstitutionalInvariant::SeparationOfPowers => "separation-of-powers",
ConstitutionalInvariant::ConsentRequired => "consent-required",
ConstitutionalInvariant::NoSelfGrant => "no-self-grant",
ConstitutionalInvariant::HumanOverride => "human-override",
ConstitutionalInvariant::KernelImmutability => "kernel-immutability",
ConstitutionalInvariant::AuthorityChainValid => "authority-chain-valid",
ConstitutionalInvariant::QuorumLegitimate => "quorum-legitimate",
ConstitutionalInvariant::ProvenanceVerifiable => "provenance-verifiable",
}
}
}
/// An ordered list of invariants to enforce.
///
/// `enforce_all` walks `invariants` in order, so the order here is the order
/// in which violations are reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InvariantSet {
/// Invariants to check, in evaluation order. Duplicates are not removed.
pub invariants: Vec<ConstitutionalInvariant>,
}
impl InvariantSet {
#[must_use]
pub fn all() -> Self {
Self {
invariants: vec![
ConstitutionalInvariant::SeparationOfPowers,
ConstitutionalInvariant::ConsentRequired,
ConstitutionalInvariant::NoSelfGrant,
ConstitutionalInvariant::HumanOverride,
ConstitutionalInvariant::KernelImmutability,
ConstitutionalInvariant::AuthorityChainValid,
ConstitutionalInvariant::QuorumLegitimate,
ConstitutionalInvariant::ProvenanceVerifiable,
],
}
}
#[must_use]
pub fn with(invariants: Vec<ConstitutionalInvariant>) -> Self {
Self { invariants }
}
}
/// Everything the invariant checks need to know about a single request.
///
/// The checks in this module only read from the context; they never mutate it.
#[derive(Debug, Clone)]
pub struct InvariantContext {
/// DID of the actor the request is evaluated for.
pub actor: Did,
/// Roles held by the actor; inspected by the separation-of-powers check.
pub actor_roles: Vec<Role>,
/// Bailment state; the consent check requires it to be `Active` with the actor as bailee.
pub bailment_state: BailmentState,
/// Consent records searched for an active record matching bailor, actor and scope.
pub consent_records: Vec<ConsentRecord>,
/// Delegation links validated by the authority-chain check.
pub authority_chain: AuthorityChain,
/// When `true`, the no-self-grant check fails.
pub is_self_grant: bool,
/// When `false`, the human-override check fails.
pub human_override_preserved: bool,
/// When `true`, the kernel-immutability check fails.
pub kernel_modification_attempted: bool,
/// Optional quorum evidence; when `None` the quorum check passes without further work.
pub quorum_evidence: Option<QuorumEvidence>,
/// Provenance of the request; when `None` the provenance check fails.
pub provenance: Option<Provenance>,
/// Permissions the actor holds; every requested permission must be contained here.
pub actor_permissions: PermissionSet,
/// Permissions being requested; checked against consent scope, actor permissions and every chain link.
pub requested_permissions: PermissionSet,
/// Public keys accepted per grantor DID when verifying authority links.
pub trusted_authority_keys: TrustedAuthorityKeys,
/// Public keys accepted per actor/voter DID when verifying provenance and quorum votes.
pub trusted_provenance_keys: TrustedProvenanceKeys,
}
/// A single failed invariant together with a human-readable explanation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InvariantViolation {
/// The invariant that was violated.
pub invariant: ConstitutionalInvariant,
/// Short description of why the invariant failed.
pub description: String,
/// Supporting `key: value` style entries describing the failing inputs.
pub evidence: Vec<String>,
}
// The constants below are embedded in the hashed signature payloads, so
// changing any of them changes the bytes that signers must have signed.

/// Value of the `domain` field in authority-link signature payloads.
const AUTHORITY_LINK_SIGNATURE_DOMAIN: &str = "exo.gatekeeper.authority-link-signature";
/// Value of the `domain` field in provenance signature payloads.
const PROVENANCE_SIGNATURE_DOMAIN: &str = "exo.gatekeeper.provenance-signature";
/// Value of the `domain` field in quorum-vote signature payloads.
// NOTE(review): only this domain carries a `.v1` suffix; the other two rely on
// `schema_version` alone — confirm the difference is intentional.
const QUORUM_VOTE_SIGNATURE_DOMAIN: &str = "exo.gatekeeper.quorum-vote-signature.v1";
/// Value of the `schema_version` field shared by all three signature payloads.
const SIGNATURE_PAYLOAD_SCHEMA_VERSION: u16 = 1;
/// Payload hashed by `authority_link_signature_message` to produce the message
/// an authority link's grantor signs.
// NOTE(review): field names and order presumably feed the structured hash —
// confirm against `hash_structured` before renaming or reordering.
#[derive(Serialize)]
struct AuthorityLinkSignaturePayload<'a> {
/// Set to `AUTHORITY_LINK_SIGNATURE_DOMAIN`.
domain: &'static str,
/// Set to `SIGNATURE_PAYLOAD_SCHEMA_VERSION`.
schema_version: u16,
/// DID granting the permissions.
grantor: &'a Did,
/// DID receiving the permissions.
grantee: &'a Did,
/// Permission names of the link, sorted by the builder before hashing.
permissions: Vec<&'a str>,
}
/// Payload hashed by `provenance_signature_message` to produce the message a
/// provenance signature covers.
// NOTE(review): field names and order presumably feed the structured hash —
// confirm against `hash_structured` before renaming or reordering.
#[derive(Serialize)]
struct ProvenanceSignaturePayload<'a> {
/// Set to `PROVENANCE_SIGNATURE_DOMAIN`.
domain: &'static str,
/// Set to `SIGNATURE_PAYLOAD_SCHEMA_VERSION`.
schema_version: u16,
/// Actor recorded in the provenance.
actor: &'a Did,
/// Action hash recorded in the provenance.
action_hash: &'a [u8],
/// Timestamp recorded in the provenance, as stored (string form).
timestamp: &'a str,
/// Voice kind claimed by the provenance, if any.
voice_kind: Option<VoiceKind>,
/// Independence claim of the provenance, if any.
independence: Option<IndependenceClaim>,
/// Review order claimed by the provenance, if any.
review_order: Option<ReviewOrder>,
}
/// Payload hashed by `quorum_vote_signature_message` to produce the message a
/// quorum vote signature covers. It binds the vote to its provenance fields.
// NOTE(review): field names and order presumably feed the structured hash —
// confirm against `hash_structured` before renaming or reordering.
#[derive(Serialize)]
struct QuorumVoteSignaturePayload<'a> {
/// Set to `QUORUM_VOTE_SIGNATURE_DOMAIN`.
domain: &'static str,
/// Set to `SIGNATURE_PAYLOAD_SCHEMA_VERSION`.
schema_version: u16,
/// DID of the voter.
voter: &'a Did,
/// Whether the vote approves.
approved: bool,
/// Actor recorded in the vote's provenance.
provenance_actor: &'a Did,
/// Action hash recorded in the vote's provenance.
provenance_action_hash: &'a [u8],
/// Timestamp recorded in the vote's provenance.
provenance_timestamp: &'a str,
/// Voice kind claimed by the vote's provenance, if any.
voice_kind: Option<VoiceKind>,
/// Independence claim of the vote's provenance, if any.
independence: Option<IndependenceClaim>,
/// Review order claimed by the vote's provenance, if any.
review_order: Option<ReviewOrder>,
}
/// Holds the set of invariants that `enforce_all` evaluates.
#[derive(Debug, Clone)]
pub struct InvariantEngine {
/// Invariants this engine enforces, in evaluation order.
pub invariant_set: InvariantSet,
}
impl InvariantEngine {
#[must_use]
pub fn new(invariant_set: InvariantSet) -> Self {
Self { invariant_set }
}
#[must_use]
pub fn all() -> Self {
Self::new(InvariantSet::all())
}
}
/// Evaluates every invariant configured on `engine` against `context`.
///
/// Checking does not stop at the first failure: each configured invariant is
/// evaluated and contributes at most one violation.
///
/// # Errors
///
/// Returns the collected violations, in the engine's invariant order, when at
/// least one invariant fails.
pub fn enforce_all(
    engine: &InvariantEngine,
    context: &InvariantContext,
) -> Result<(), Vec<InvariantViolation>> {
    let violations: Vec<InvariantViolation> = engine
        .invariant_set
        .invariants
        .iter()
        .filter_map(|&invariant| check_invariant(invariant, context).err())
        .collect();

    if violations.is_empty() {
        Ok(())
    } else {
        Err(violations)
    }
}
fn check_invariant(
invariant: ConstitutionalInvariant,
context: &InvariantContext,
) -> Result<(), InvariantViolation> {
match invariant {
ConstitutionalInvariant::SeparationOfPowers => check_separation_of_powers(context),
ConstitutionalInvariant::ConsentRequired => check_consent_required(context),
ConstitutionalInvariant::NoSelfGrant => check_no_self_grant(context),
ConstitutionalInvariant::HumanOverride => check_human_override(context),
ConstitutionalInvariant::KernelImmutability => check_kernel_immutability(context),
ConstitutionalInvariant::AuthorityChainValid => check_authority_chain_valid(context),
ConstitutionalInvariant::QuorumLegitimate => check_quorum_legitimate(context),
ConstitutionalInvariant::ProvenanceVerifiable => check_provenance_verifiable(context),
}
}
/// Returns the label used for `branch` in violation evidence by delegating to
/// the branch's own `as_str`.
fn government_branch_label(branch: GovernmentBranch) -> &'static str {
branch.as_str()
}
/// Formats `roles` as a comma-separated list of `name@branch` entries,
/// preserving the input order. An empty slice yields an empty string.
fn role_evidence_label(roles: &[Role]) -> String {
    let mut labels = Vec::with_capacity(roles.len());
    for role in roles {
        labels.push(format!(
            "{}@{}",
            role.name,
            government_branch_label(role.branch)
        ));
    }
    labels.join(",")
}
fn branch_evidence_label(branches: &std::collections::BTreeSet<GovernmentBranch>) -> String {
branches
.iter()
.map(|branch| government_branch_label(*branch))
.collect::<Vec<_>>()
.join(",")
}
/// Describes `state` as evidence entries: a `bailment_state: …` line followed
/// by one line per field the variant carries.
fn bailment_state_evidence(state: &BailmentState) -> Vec<String> {
    let mut entries = Vec::new();
    match state {
        BailmentState::None => entries.push(String::from("bailment_state: none")),
        BailmentState::Active {
            bailor,
            bailee,
            scope,
        } => {
            entries.push(String::from("bailment_state: active"));
            entries.push(format!("bailor: {bailor}"));
            entries.push(format!("bailee: {bailee}"));
            entries.push(format!("scope: {scope}"));
        }
        BailmentState::Suspended { reason } => {
            entries.push(String::from("bailment_state: suspended"));
            entries.push(format!("reason: {reason}"));
        }
        BailmentState::Terminated => entries.push(String::from("bailment_state: terminated")),
    }
    entries
}
/// Returns the evidence label for an optional voice kind; `None` maps to `"missing"`.
fn voice_kind_evidence_label(value: Option<VoiceKind>) -> &'static str {
    let Some(kind) = value else {
        return "missing";
    };
    match kind {
        VoiceKind::Human => "human",
        VoiceKind::Synthetic => "synthetic",
        VoiceKind::System => "system",
    }
}
/// Returns the evidence label for an optional independence claim; `None` maps to `"missing"`.
fn independence_evidence_label(value: Option<IndependenceClaim>) -> &'static str {
    let Some(claim) = value else {
        return "missing";
    };
    match claim {
        IndependenceClaim::Independent => "independent",
        IndependenceClaim::Coordinated => "coordinated",
    }
}
/// Returns the evidence label for an optional review order; `None` maps to `"missing"`.
fn review_order_evidence_label(value: Option<ReviewOrder>) -> &'static str {
    let Some(order) = value else {
        return "missing";
    };
    match order {
        ReviewOrder::FirstOrder => "first-order",
        ReviewOrder::Derivative => "derivative",
    }
}
/// Checks that the actor's roles are individually valid and confined to one
/// government branch.
///
/// Roles are validated in order with `Role::validate_governed`; the first
/// invalid role is reported immediately. Afterwards, holding all three
/// branches and holding more than one branch are reported with distinct
/// descriptions (the three-branch case is tested first).
fn check_separation_of_powers(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    let violation = |description: String, evidence: Vec<String>| InvariantViolation {
        invariant: ConstitutionalInvariant::SeparationOfPowers,
        description,
        evidence,
    };

    let mut held_branches = std::collections::BTreeSet::new();
    for (index, role) in ctx.actor_roles.iter().enumerate() {
        match role.validate_governed() {
            Ok(_) => {
                held_branches.insert(role.branch);
            }
            Err(err) => {
                return Err(violation(
                    err.to_string(),
                    vec![
                        format!("actor: {}", ctx.actor),
                        format!("role_index: {index}"),
                        format!("actual_branch: {}", government_branch_label(role.branch)),
                    ],
                ));
            }
        }
    }

    let holds_every_branch = [
        GovernmentBranch::Legislative,
        GovernmentBranch::Executive,
        GovernmentBranch::Judicial,
    ]
    .iter()
    .all(|branch| held_branches.contains(branch));

    if holds_every_branch {
        return Err(violation(
            String::from("Actor holds roles in all three branches of government"),
            vec![
                format!("actor: {}", ctx.actor),
                format!("roles: {}", role_evidence_label(&ctx.actor_roles)),
            ],
        ));
    }

    if held_branches.len() > 1 {
        return Err(violation(
            String::from("Actor holds roles in multiple branches of government"),
            vec![
                format!("actor: {}", ctx.actor),
                format!("branches: {}", branch_evidence_label(&held_branches)),
            ],
        ));
    }

    Ok(())
}
/// Checks that the request is backed by consent.
///
/// The checks run in this order, and the first failure is returned:
/// 1. the bailment state is `Active`;
/// 2. the bailment's bailee is the actor;
/// 3. an active consent record exists whose subject is the bailor, whose
///    grantee is the actor and whose scope equals the bailment scope;
/// 4. the bailment scope covers every requested permission.
fn check_consent_required(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    let violation = |description: &str, evidence: Vec<String>| InvariantViolation {
        invariant: ConstitutionalInvariant::ConsentRequired,
        description: description.to_string(),
        evidence,
    };

    let BailmentState::Active {
        bailor,
        bailee,
        scope,
    } = &ctx.bailment_state
    else {
        return Err(violation(
            "No active bailment for this action",
            bailment_state_evidence(&ctx.bailment_state),
        ));
    };

    if *bailee != ctx.actor {
        return Err(violation(
            "Active bailment is not granted to actor",
            vec![
                format!("actor: {}", ctx.actor),
                format!("bailment_bailee: {bailee}"),
            ],
        ));
    }

    let matching_consent_exists = ctx.consent_records.iter().any(|record| {
        record.subject == *bailor
            && record.granted_to == ctx.actor
            && record.scope == *scope
            && record.active
    });
    if !matching_consent_exists {
        return Err(violation(
            "No active consent record matching actor, bailor, and scope",
            vec![
                format!("actor: {}", ctx.actor),
                format!("bailor: {bailor}"),
                format!("scope: {scope}"),
                format!("records: {}", ctx.consent_records.len()),
            ],
        ));
    }

    if consent_scope_covers_permissions(scope, &ctx.requested_permissions) {
        return Ok(());
    }
    Err(violation(
        "Active consent scope does not cover requested permission",
        vec![
            format!("actor: {}", ctx.actor),
            format!("scope: {scope}"),
            format!(
                "requested_permissions: {}",
                permission_set_evidence_label(&ctx.requested_permissions)
            ),
        ],
    ))
}
/// Returns `true` when `scope` covers every permission in `requested_permissions`.
///
/// An empty permission set is trivially covered.
#[must_use]
pub fn consent_scope_covers_permissions(
    scope: &str,
    requested_permissions: &PermissionSet,
) -> bool {
    for permission in &requested_permissions.permissions {
        if !consent_scope_covers_permission(scope, &permission.0) {
            return false;
        }
    }
    true
}
/// Returns `true` when `scope` covers the single `permission`.
///
/// Both inputs are trimmed first; an empty scope or permission never matches.
/// The whole scope is compared case-insensitively before it is split into
/// clauses, each of which is then tested on its own.
fn consent_scope_covers_permission(scope: &str, permission: &str) -> bool {
    let (scope, permission) = (scope.trim(), permission.trim());
    if scope.is_empty() || permission.is_empty() {
        return false;
    }

    scope.eq_ignore_ascii_case(permission)
        || consent_scope_clauses(scope)
            .into_iter()
            .any(|clause| consent_scope_clause_covers_permission(clause, permission))
}
/// Splits `scope` on `,` and `;` into trimmed clauses, dropping empty ones.
fn consent_scope_clauses(scope: &str) -> Vec<&str> {
    let mut clauses = Vec::new();
    for raw_clause in scope.split(|separator: char| separator == ',' || separator == ';') {
        let clause = raw_clause.trim();
        if !clause.is_empty() {
            clauses.push(clause);
        }
    }
    clauses
}
/// Returns `true` when one scope clause covers `permission`.
///
/// After a case-insensitive equality shortcut, both sides are tokenized with
/// `scope_permission_tokens` and the clause matches when any of these holds:
/// - the permission is a single token equal to the clause's last token;
/// - the clause has several tokens and they are a prefix of the permission's;
/// - both have several tokens, share the same last token, and that token is
///   not a generic action word;
/// - both sides have the same set of tokens, ignoring order and repeats.
fn consent_scope_clause_covers_permission(scope_clause: &str, permission: &str) -> bool {
    if scope_clause.eq_ignore_ascii_case(permission) {
        return true;
    }

    let scope_tokens = scope_permission_tokens(scope_clause);
    let permission_tokens = scope_permission_tokens(permission);

    // Either side tokenizing to nothing means there is nothing to match on.
    let (Some(scope_last), Some(permission_last)) =
        (scope_tokens.last(), permission_tokens.last())
    else {
        return false;
    };

    let scope_is_compound = scope_tokens.len() > 1;
    let permission_is_compound = permission_tokens.len() > 1;
    let last_tokens_agree = scope_last == permission_last;

    let bare_permission_matches_tail = permission_tokens.len() == 1 && last_tokens_agree;
    // `starts_with` already implies the permission is at least as long as the scope.
    let scope_is_prefix = scope_is_compound && permission_tokens.starts_with(&scope_tokens);
    let shared_specific_tail = scope_is_compound
        && permission_is_compound
        && last_tokens_agree
        && !is_generic_permission_action_token(scope_last);

    if bare_permission_matches_tail || scope_is_prefix || shared_specific_tail {
        return true;
    }

    canonical_scope_tokens(scope_tokens) == canonical_scope_tokens(permission_tokens)
}
/// Returns `true` when `token` is one of the generic action words (such as
/// `read` or `write`). The comparison is exact and case-sensitive.
fn is_generic_permission_action_token(token: &str) -> bool {
    const GENERIC_ACTIONS: [&str; 15] = [
        "attest",
        "create",
        "delete",
        "execute",
        "grant",
        "propose",
        "publish",
        "read",
        "recommend",
        "revoke",
        "sign",
        "update",
        "validate",
        "verify",
        "write",
    ];
    GENERIC_ACTIONS.contains(&token)
}
/// Splits `value` on `:`, `_`, `.`, `/`, `-` and spaces, trims each piece,
/// drops empty pieces and lowercases the rest (ASCII only).
fn scope_permission_tokens(value: &str) -> Vec<String> {
    let is_separator = |c: char| matches!(c, ':' | '_' | '.' | '/' | '-' | ' ');
    value
        .split(is_separator)
        .map(str::trim)
        .filter(|piece| !piece.is_empty())
        .map(str::to_ascii_lowercase)
        .collect()
}
/// Returns the tokens sorted ascending with duplicates removed, so two token
/// lists can be compared regardless of order or repetition.
fn canonical_scope_tokens(tokens: Vec<String>) -> Vec<String> {
    // An ordered set gives the same result as sorting and then deduplicating.
    let unique: std::collections::BTreeSet<String> = tokens.into_iter().collect();
    unique.into_iter().collect()
}
/// Formats the permission names as a sorted, de-duplicated, comma-separated
/// list, or `"none"` when the set is empty.
fn permission_set_evidence_label(permissions: &PermissionSet) -> String {
    let unique_names: std::collections::BTreeSet<&str> = permissions
        .permissions
        .iter()
        .map(|permission| permission.0.as_str())
        .collect();

    if unique_names.is_empty() {
        return "none".to_string();
    }
    unique_names.into_iter().collect::<Vec<_>>().join(",")
}
/// Fails when the context flags the request as a self-grant.
fn check_no_self_grant(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    if !ctx.is_self_grant {
        return Ok(());
    }
    Err(InvariantViolation {
        invariant: ConstitutionalInvariant::NoSelfGrant,
        description: String::from("Actor attempted to expand own permissions"),
        evidence: vec![format!("actor: {}", ctx.actor)],
    })
}
/// Fails when the context reports that human override is not preserved.
fn check_human_override(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    if ctx.human_override_preserved {
        return Ok(());
    }
    Err(InvariantViolation {
        invariant: ConstitutionalInvariant::HumanOverride,
        description: String::from("Human override capability is not preserved"),
        evidence: vec![String::from("human_override_preserved: false")],
    })
}
/// Fails when the context reports an attempted kernel modification.
fn check_kernel_immutability(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    if !ctx.kernel_modification_attempted {
        return Ok(());
    }
    Err(InvariantViolation {
        invariant: ConstitutionalInvariant::KernelImmutability,
        description: String::from("Attempted to modify immutable kernel configuration"),
        evidence: vec![String::from("kernel_modification_attempted: true")],
    })
}
fn check_authority_chain_valid(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
if ctx.authority_chain.is_empty() {
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: "Authority chain is empty — no delegation path".into(),
evidence: vec!["authority_chain: empty".into()],
});
}
let links = &ctx.authority_chain.links;
for i in 0..links.len().saturating_sub(1) {
if links[i].grantee != links[i + 1].grantor {
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: "Authority chain is broken — delegation gap".into(),
evidence: vec![
format!("link[{}].grantee: {}", i, links[i].grantee),
format!("link[{}].grantor: {}", i + 1, links[i + 1].grantor),
],
});
}
}
if let Some(last) = links.last() {
if last.grantee != ctx.actor {
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: "Authority chain does not terminate at actor".into(),
evidence: vec![
format!("terminal: {}", last.grantee),
format!("actor: {}", ctx.actor),
],
});
}
}
for (idx, link) in links.iter().enumerate() {
let pk_bytes = link
.grantor_public_key
.as_ref()
.ok_or_else(|| InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!("link[{idx}] missing grantor_public_key"),
evidence: vec![format!("grantor: {}", link.grantor)],
})?;
let pk_arr: [u8; 32] = pk_bytes
.as_slice()
.try_into()
.map_err(|_| InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!("link[{idx}] grantor_public_key is not 32 bytes"),
evidence: vec![format!("key_len: {}", pk_bytes.len())],
})?;
let trusted_keys = ctx
.trusted_authority_keys
.get(&link.grantor)
.ok_or_else(|| InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!(
"link[{idx}] grantor_public_key is unresolved for grantor DID"
),
evidence: vec![format!("grantor: {}", link.grantor)],
})?;
if !trusted_keys
.iter()
.any(|trusted_key| trusted_key.as_slice() == pk_bytes.as_slice())
{
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!("link[{idx}] grantor_public_key is not bound to grantor DID"),
evidence: vec![format!("grantor: {}", link.grantor)],
});
}
let sig_arr: [u8; 64] =
link.signature
.as_slice()
.try_into()
.map_err(|_| InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!(
"link[{idx}] signature is not 64 bytes (required for Ed25519)"
),
evidence: vec![format!("sig_len: {}", link.signature.len())],
})?;
let message = authority_link_signature_message(link).map_err(|err| InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!("link[{idx}] canonical signature payload could not be encoded"),
evidence: vec![
format!("grantor: {}", link.grantor),
format!("grantee: {}", link.grantee),
format!("error: {err}"),
],
})?;
let pubkey = exo_core::PublicKey::from_bytes(pk_arr);
let sig = exo_core::Signature::from_bytes(sig_arr);
if !exo_core::crypto::verify(message.as_bytes(), &sig, &pubkey) {
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: format!("link[{idx}] Ed25519 signature is cryptographically invalid"),
evidence: vec![
format!("grantor: {}", link.grantor),
format!("grantee: {}", link.grantee),
],
});
}
}
for permission in &ctx.requested_permissions.permissions {
if !ctx.actor_permissions.contains(permission) {
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: "Requested permission is absent from actor authority context".into(),
evidence: vec![
format!("actor: {}", ctx.actor),
format!("requested_permission: {}", permission.0),
],
});
}
for (idx, link) in links.iter().enumerate() {
if !link.permissions.contains(permission) {
return Err(InvariantViolation {
invariant: ConstitutionalInvariant::AuthorityChainValid,
description: "Requested permission is absent from authority chain scope".into(),
evidence: vec![
format!("link_index: {idx}"),
format!("grantor: {}", link.grantor),
format!("grantee: {}", link.grantee),
format!("requested_permission: {}", permission.0),
],
});
}
}
}
Ok(())
}
/// Checks quorum evidence, when present.
///
/// With no quorum evidence the invariant passes. Otherwise the threshold must
/// fit in `usize`, the evidence must not list any voter twice, and the number
/// of approvals accepted by `verified_human_quorum_approvals` must reach the
/// threshold. On a shortfall the violation lists the counts followed by the
/// reason each approving vote was rejected.
fn check_quorum_legitimate(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    let violation = |description: &str, details: Vec<String>| InvariantViolation {
        invariant: ConstitutionalInvariant::QuorumLegitimate,
        description: description.to_string(),
        evidence: details,
    };

    let Some(evidence) = ctx.quorum_evidence.as_ref() else {
        return Ok(());
    };

    let Some(threshold) = usize::try_from(evidence.threshold).ok() else {
        return Err(violation(
            "Quorum threshold cannot be represented on this platform",
            vec![format!("threshold: {}", evidence.threshold)],
        ));
    };

    let repeated = evidence.duplicate_voters();
    if !repeated.is_empty() {
        let duplicate_voters = repeated
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>()
            .join(", ");
        return Err(violation(
            "Quorum evidence contains duplicate voter entries",
            vec![
                format!("threshold: {}", evidence.threshold),
                format!("duplicate_voters: {duplicate_voters}"),
            ],
        ));
    }

    let (verified_human_approvals, rejected_vote_evidence) =
        verified_human_quorum_approvals(evidence, ctx);
    if verified_human_approvals >= threshold {
        return Ok(());
    }

    let synthetic_excluded = evidence.synthetic_vote_count();
    let mut details = vec![
        format!("threshold: {}", evidence.threshold),
        format!("verified_human_approvals: {verified_human_approvals}"),
        format!("synthetic_votes_excluded: {synthetic_excluded}"),
        format!("rejected_approved_votes: {}", rejected_vote_evidence.len()),
    ];
    details.extend(rejected_vote_evidence);
    Err(violation(
        "Quorum threshold not met by verified human vote provenance",
        details,
    ))
}
fn verified_human_quorum_approvals(
evidence: &QuorumEvidence,
ctx: &InvariantContext,
) -> (usize, Vec<String>) {
let mut approved_voters = std::collections::BTreeSet::new();
let mut rejected_vote_evidence = Vec::new();
for (index, vote) in evidence.votes.iter().enumerate() {
if !vote.approved {
continue;
}
match verify_human_quorum_vote(vote, index, ctx) {
Ok(()) => {
approved_voters.insert(vote.voter.clone());
}
Err(reason) => rejected_vote_evidence.push(reason),
}
}
(approved_voters.len(), rejected_vote_evidence)
}
/// Verifies that one approving quorum vote is backed by valid human provenance.
///
/// The checks run in this order and the first failure is returned as a
/// one-line reason prefixed with the vote index and voter:
/// 1. provenance is present and its actor is the voter;
/// 2. the provenance is a human voice, independent, and a first-order review;
/// 3. a 32-byte public key is present and is one of the trusted provenance
///    keys registered for the voter;
/// 4. the 64-byte provenance signature verifies over
///    `provenance_signature_message`;
/// 5. the 64-byte vote signature verifies, with the same key, over
///    `quorum_vote_signature_message`.
fn verify_human_quorum_vote(
    vote: &QuorumVote,
    index: usize,
    ctx: &InvariantContext,
) -> Result<(), String> {
    let prov = vote.provenance.as_ref().ok_or_else(|| {
        format!(
            "vote_index: {index}; voter: {}; provenance: missing",
            vote.voter
        )
    })?;

    if prov.actor != vote.voter {
        return Err(format!(
            "vote_index: {index}; voter: {}; provenance_actor: {}",
            vote.voter, prov.actor
        ));
    }

    // Provenance claims: human, independent, first-order.
    if !prov.is_human_voice() {
        return Err(format!(
            "vote_index: {index}; voter: {}; voice_kind: {}",
            vote.voter,
            voice_kind_evidence_label(prov.voice_kind)
        ));
    }
    if !prov.is_independent() {
        return Err(format!(
            "vote_index: {index}; voter: {}; independence: {}",
            vote.voter,
            independence_evidence_label(prov.independence)
        ));
    }
    if !prov.is_first_order_review() {
        return Err(format!(
            "vote_index: {index}; voter: {}; review_order: {}",
            vote.voter,
            review_order_evidence_label(prov.review_order)
        ));
    }

    // Key material: present, well-formed, and registered for this voter.
    let pk_bytes = prov.public_key.as_ref().ok_or_else(|| {
        format!(
            "vote_index: {index}; voter: {}; public_key: missing",
            vote.voter
        )
    })?;
    let pk_arr: [u8; 32] = pk_bytes.as_slice().try_into().map_err(|_| {
        format!(
            "vote_index: {index}; voter: {}; public_key_len: {}",
            vote.voter,
            pk_bytes.len()
        )
    })?;
    let trusted_keys = ctx
        .trusted_provenance_keys
        .get(&vote.voter)
        .ok_or_else(|| {
            format!(
                "vote_index: {index}; voter: {}; public_key: unresolved",
                vote.voter
            )
        })?;
    let key_is_bound = trusted_keys
        .iter()
        .any(|trusted_key| trusted_key.as_slice() == pk_bytes.as_slice());
    if !key_is_bound {
        return Err(format!(
            "vote_index: {index}; voter: {}; public_key: not-bound",
            vote.voter
        ));
    }

    // Provenance signature.
    let sig_arr: [u8; 64] = prov.signature.as_slice().try_into().map_err(|_| {
        format!(
            "vote_index: {index}; voter: {}; signature_len: {}",
            vote.voter,
            prov.signature.len()
        )
    })?;
    let message = match provenance_signature_message(prov) {
        Ok(message) => message,
        Err(err) => {
            return Err(format!(
                "vote_index: {index}; voter: {}; signature_payload_error: {err}",
                vote.voter
            ));
        }
    };
    let pubkey = exo_core::PublicKey::from_bytes(pk_arr);
    let provenance_sig = exo_core::Signature::from_bytes(sig_arr);
    if !exo_core::crypto::verify(message.as_bytes(), &provenance_sig, &pubkey) {
        return Err(format!(
            "vote_index: {index}; voter: {}; signature: invalid",
            vote.voter
        ));
    }

    // Vote signature, checked against the same public key.
    let vote_sig_arr: [u8; 64] = vote.signature.as_slice().try_into().map_err(|_| {
        format!(
            "vote_index: {index}; voter: {}; vote_signature_len: {}",
            vote.voter,
            vote.signature.len()
        )
    })?;
    let vote_message = match quorum_vote_signature_message(vote, prov) {
        Ok(vote_message) => vote_message,
        Err(err) => {
            return Err(format!(
                "vote_index: {index}; voter: {}; vote_signature_payload_error: {err}",
                vote.voter
            ));
        }
    };
    let vote_sig = exo_core::Signature::from_bytes(vote_sig_arr);
    if exo_core::crypto::verify(vote_message.as_bytes(), &vote_sig, &pubkey) {
        Ok(())
    } else {
        Err(format!(
            "vote_index: {index}; voter: {}; vote_signature: invalid",
            vote.voter
        ))
    }
}
/// Checks that the request carries verifiable provenance.
///
/// The checks run in this order and the first failure is returned:
/// 1. provenance is present;
/// 2. its actor is the request actor;
/// 3. a 32-byte public key is present and is one of the trusted provenance
///    keys registered for that actor;
/// 4. the signature is 64 bytes and verifies over
///    `provenance_signature_message`.
fn check_provenance_verifiable(ctx: &InvariantContext) -> Result<(), InvariantViolation> {
    let violation = |description: &str, details: Vec<String>| InvariantViolation {
        invariant: ConstitutionalInvariant::ProvenanceVerifiable,
        description: description.to_string(),
        evidence: details,
    };

    let Some(prov) = ctx.provenance.as_ref() else {
        return Err(violation(
            "No provenance metadata provided",
            vec![String::from("provenance: None")],
        ));
    };

    if prov.actor != ctx.actor {
        return Err(violation(
            "Provenance actor does not match request actor",
            vec![
                format!("provenance.actor: {}", prov.actor),
                format!("context.actor: {}", ctx.actor),
            ],
        ));
    }

    let Some(pk_bytes) = prov.public_key.as_ref() else {
        return Err(violation(
            "Provenance public_key is required for Ed25519 verification",
            vec![format!("actor: {}", prov.actor)],
        ));
    };
    let Ok(pk_arr) = <[u8; 32]>::try_from(pk_bytes.as_slice()) else {
        return Err(violation(
            "Provenance public_key is not 32 bytes",
            vec![format!("key_len: {}", pk_bytes.len())],
        ));
    };

    // The presented key only counts if it is registered for this actor.
    let Some(trusted_keys) = ctx.trusted_provenance_keys.get(&prov.actor) else {
        return Err(violation(
            "Provenance public_key is unresolved for actor DID",
            vec![format!("actor: {}", prov.actor)],
        ));
    };
    let key_is_bound = trusted_keys
        .iter()
        .any(|trusted_key| trusted_key.as_slice() == pk_bytes.as_slice());
    if !key_is_bound {
        return Err(violation(
            "Provenance public_key is not bound to actor DID",
            vec![format!("actor: {}", prov.actor)],
        ));
    }

    let Ok(sig_arr) = <[u8; 64]>::try_from(prov.signature.as_slice()) else {
        return Err(violation(
            "Provenance signature is not 64 bytes (required for Ed25519)",
            vec![format!("sig_len: {}", prov.signature.len())],
        ));
    };

    let message = match provenance_signature_message(prov) {
        Ok(message) => message,
        Err(err) => {
            return Err(violation(
                "Provenance canonical signature payload could not be encoded",
                vec![format!("actor: {}", prov.actor), format!("error: {err}")],
            ));
        }
    };

    let pubkey = exo_core::PublicKey::from_bytes(pk_arr);
    let sig = exo_core::Signature::from_bytes(sig_arr);
    if exo_core::crypto::verify(message.as_bytes(), &sig, &pubkey) {
        Ok(())
    } else {
        Err(violation(
            "Provenance Ed25519 signature is cryptographically invalid",
            vec![format!("actor: {}", prov.actor)],
        ))
    }
}
/// Computes the hash a grantor signs for an authority link.
///
/// The payload carries the authority-link domain, the schema version, the
/// grantor, the grantee and the link's permission names. The names are sorted
/// first so the result does not depend on the order in which they are stored.
///
/// # Errors
///
/// Returns whatever error `hash_structured` reports for the payload.
pub fn authority_link_signature_message(link: &AuthorityLink) -> exo_core::Result<Hash256> {
    let mut sorted_permissions: Vec<&str> = Vec::new();
    for permission in &link.permissions.permissions {
        sorted_permissions.push(permission.0.as_str());
    }
    sorted_permissions.sort_unstable();

    let payload = AuthorityLinkSignaturePayload {
        domain: AUTHORITY_LINK_SIGNATURE_DOMAIN,
        schema_version: SIGNATURE_PAYLOAD_SCHEMA_VERSION,
        grantor: &link.grantor,
        grantee: &link.grantee,
        permissions: sorted_permissions,
    };
    hash_structured(&payload)
}
/// Computes the hash that a provenance signature covers.
///
/// The payload carries the provenance domain, the schema version, and the
/// provenance's actor, action hash, timestamp, voice kind, independence claim
/// and review order.
///
/// # Errors
///
/// Returns whatever error `hash_structured` reports for the payload.
pub fn provenance_signature_message(prov: &Provenance) -> exo_core::Result<Hash256> {
    let payload = ProvenanceSignaturePayload {
        domain: PROVENANCE_SIGNATURE_DOMAIN,
        schema_version: SIGNATURE_PAYLOAD_SCHEMA_VERSION,
        actor: &prov.actor,
        action_hash: &prov.action_hash,
        timestamp: &prov.timestamp,
        voice_kind: prov.voice_kind,
        independence: prov.independence,
        review_order: prov.review_order,
    };
    hash_structured(&payload)
}
/// Computes the hash that a quorum vote signature covers.
///
/// The payload carries the quorum-vote domain, the schema version, the voter
/// and the approval flag from `vote`, plus the actor, action hash, timestamp,
/// voice kind, independence claim and review order from `prov`.
///
/// # Errors
///
/// Returns whatever error `hash_structured` reports for the payload.
fn quorum_vote_signature_message(
    vote: &QuorumVote,
    prov: &Provenance,
) -> exo_core::Result<Hash256> {
    let payload = QuorumVoteSignaturePayload {
        domain: QUORUM_VOTE_SIGNATURE_DOMAIN,
        schema_version: SIGNATURE_PAYLOAD_SCHEMA_VERSION,
        voter: &vote.voter,
        approved: vote.approved,
        provenance_actor: &prov.actor,
        provenance_action_hash: &prov.action_hash,
        provenance_timestamp: &prov.timestamp,
        voice_kind: prov.voice_kind,
        independence: prov.independence,
        review_order: prov.review_order,
    };
    hash_structured(&payload)
}
#[cfg(test)]
mod tests {
use exo_core::Hash256;
use super::*;
use crate::types::{
AuthorityLink, IndependenceClaim, Permission, QuorumVote, ReviewOrder, VoiceKind,
};
fn did(s: &str) -> Did {
Did::new(s).expect("valid DID")
}
fn production_source() -> &'static str {
let source = include_str!("invariants.rs");
let end = source
.find("// ===========================================================================")
.expect("tests section marker must exist");
&source[..end]
}
fn passing_context() -> InvariantContext {
let actor = did("did:exo:actor1");
let (authority_link, authority_public_key) = signed_link("did:exo:root", actor.as_str());
let (provenance, provenance_public_key, _) = signed_provenance(actor.as_str());
let mut trusted_authority_keys = TrustedAuthorityKeys::default();
trusted_authority_keys.insert(
authority_link.grantor.clone(),
vec![authority_public_key.as_bytes().to_vec()],
);
let mut trusted_provenance_keys = TrustedProvenanceKeys::default();
trusted_provenance_keys.insert(
actor.clone(),
vec![provenance_public_key.as_bytes().to_vec()],
);
InvariantContext {
actor: actor.clone(),
actor_roles: vec![Role {
name: "judge".into(),
branch: GovernmentBranch::Judicial,
}],
bailment_state: BailmentState::Active {
bailor: did("did:exo:bailor"),
bailee: actor.clone(),
scope: "data:medical".into(),
},
consent_records: vec![ConsentRecord {
subject: did("did:exo:bailor"),
granted_to: actor.clone(),
scope: "data:medical".into(),
active: true,
}],
authority_chain: AuthorityChain {
links: vec![authority_link],
},
is_self_grant: false,
human_override_preserved: true,
kernel_modification_attempted: false,
quorum_evidence: None,
provenance: Some(provenance),
actor_permissions: PermissionSet::new(vec![Permission::new("read")]),
requested_permissions: PermissionSet::default(),
trusted_authority_keys,
trusted_provenance_keys,
}
}
#[test]
fn all_invariants_pass() {
let engine = InvariantEngine::all();
assert!(enforce_all(&engine, &passing_context()).is_ok());
}
#[test]
fn separation_fails_multi_branch() {
let engine = InvariantEngine::new(InvariantSet::with(vec![
ConstitutionalInvariant::SeparationOfPowers,
]));
let mut ctx = passing_context();
ctx.actor_roles = vec![
Role {
name: "senator".into(),
branch: GovernmentBranch::Legislative,
},
Role {
name: "judge".into(),
branch: GovernmentBranch::Judicial,
},
];
assert!(enforce_all(&engine, &ctx).is_err());
}
#[test]
fn separation_evidence_uses_stable_role_labels() {
let engine = InvariantEngine::new(InvariantSet::with(vec![
ConstitutionalInvariant::SeparationOfPowers,
]));
let mut ctx = passing_context();
ctx.actor_roles = vec![
Role {
name: "senator".into(),
branch: GovernmentBranch::Legislative,
},
Role {
name: "judge".into(),
branch: GovernmentBranch::Judicial,
},
];
let err = enforce_all(&engine, &ctx).expect_err("multi-branch role must fail");
assert_eq!(
err[0].evidence,
vec![
"actor: did:exo:actor1".to_string(),
"branches: legislative,judicial".to_string(),
]
);
assert!(
err[0]
.evidence
.iter()
.all(|entry| !entry.contains("Role {") && !entry.contains("Legislative")),
"public evidence must not expose Rust Debug labels"
);
}
#[test]
fn separation_fails_all_three() {
let engine = InvariantEngine::new(InvariantSet::with(vec![
ConstitutionalInvariant::SeparationOfPowers,
]));
let mut ctx = passing_context();
ctx.actor_roles = vec![
Role {
name: "senator".into(),
branch: GovernmentBranch::Legislative,
},
Role {
name: "worker".into(),
branch: GovernmentBranch::Executive,
},
Role {
name: "judge".into(),
branch: GovernmentBranch::Judicial,
},
];
assert!(enforce_all(&engine, &ctx).is_err());
}
#[test]
fn separation_passes_single_branch() {
let engine = InvariantEngine::new(InvariantSet::with(vec![
ConstitutionalInvariant::SeparationOfPowers,
]));
assert!(enforce_all(&engine, &passing_context()).is_ok());
}
#[test]
fn separation_rejects_unknown_role_name() {
let engine = InvariantEngine::new(InvariantSet::with(vec![
ConstitutionalInvariant::SeparationOfPowers,
]));
let mut ctx = passing_context();
ctx.actor_roles = vec![Role {
name: "root".into(),
branch: GovernmentBranch::Judicial,
}];
let err = enforce_all(&engine, &ctx).expect_err("unknown role name must fail closed");
assert_eq!(
err[0].invariant,
ConstitutionalInvariant::SeparationOfPowers
);
assert!(
err[0].description.contains("unknown governed role name"),
"unexpected description: {}",
err[0].description
);
}
#[test]
fn separation_rejects_role_name_branch_mismatch() {
let engine = InvariantEngine::new(InvariantSet::with(vec![
ConstitutionalInvariant::SeparationOfPowers,
]));
let mut ctx = passing_context();
ctx.actor_roles = vec![Role {
name: "judge".into(),
branch: GovernmentBranch::Executive,
}];
let err = enforce_all(&engine, &ctx).expect_err("role name branch mismatch must fail");
assert_eq!(
err[0].invariant,
ConstitutionalInvariant::SeparationOfPowers
);
assert!(
err[0]
.description
.contains("does not match governed branch"),
"unexpected description: {}",
err[0].description
);
}
/// With `BailmentState::None`, the first reported violation must be `ConsentRequired`.
#[test]
fn consent_fails_no_bailment() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.bailment_state = BailmentState::None;

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::ConsentRequired);
}
/// Evidence for a suspended bailment must use the stable `suspended` label and
/// the suspension reason, never the Rust `Debug` rendering of the enum variant.
#[test]
fn consent_evidence_uses_stable_bailment_state_labels() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.bailment_state = BailmentState::Suspended {
        reason: "audit hold".into(),
    };

    let violations = enforce_all(&checker, &context).expect_err("suspended bailment must fail");
    let first = &violations[0];
    let expected_evidence = vec![
        String::from("bailment_state: suspended"),
        String::from("reason: audit hold"),
    ];
    assert_eq!(first.evidence, expected_evidence);

    let leaks_debug_label = first
        .evidence
        .iter()
        .any(|entry| entry.contains("Suspended {"));
    assert!(
        !leaks_debug_label,
        "public evidence must not expose Rust Debug labels"
    );
}
/// Deactivating the first consent record must make `ConsentRequired` fail.
#[test]
fn consent_fails_inactive_record() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    let record = &mut context.consent_records[0];
    record.active = false;

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// Re-pointing the first consent record at `did:exo:other` must make
/// `ConsentRequired` fail.
#[test]
fn consent_fails_wrong_grantee() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    let record = &mut context.consent_records[0];
    record.granted_to = did("did:exo:other");

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// An active bailment whose bailee is `did:exo:other-bailee` must be rejected.
#[test]
fn consent_fails_when_bailment_bailee_is_not_actor() {
    let checker = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ConsentRequired,
    ]));
    let mut context = passing_context();
    let bailor = did("did:exo:bailor");
    let foreign_bailee = did("did:exo:other-bailee");
    context.bailment_state = BailmentState::Active {
        bailor,
        bailee: foreign_bailee,
        scope: "data:medical".into(),
    };

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_err(),
        "ConsentRequired must bind the active bailment bailee to the actor"
    );
}
/// Changing the consent record's subject to an unrelated DID must be rejected.
#[test]
fn consent_fails_when_record_subject_does_not_match_bailor() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    let record = &mut context.consent_records[0];
    record.subject = did("did:exo:unrelated-subject");

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_err(),
        "ConsentRequired must bind the consent subject to the active bailment bailor"
    );
}
/// Changing only the consent record's scope (to `data:genomics`) must be rejected.
#[test]
fn consent_fails_when_record_scope_does_not_match_bailment_scope() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    let record = &mut context.consent_records[0];
    record.scope = "data:genomics".into();

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_err(),
        "ConsentRequired must bind the consent record scope to the active bailment scope"
    );
}
/// Consent and bailment scoped to `data:profile` must not authorize the
/// `advance_pace` permission.
#[test]
fn consent_fails_when_scope_does_not_cover_requested_permission() {
    let checker = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ConsentRequired,
    ]));
    let granted_scope = "data:profile";
    let mut context = passing_context();
    let bailor = did("did:exo:bailor");
    let bailee = context.actor.clone();
    context.bailment_state = BailmentState::Active {
        bailor,
        bailee,
        scope: granted_scope.into(),
    };
    context.consent_records[0].scope = granted_scope.into();
    context.requested_permissions = PermissionSet::new(vec![Permission::new("advance_pace")]);

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_err(),
        "ConsentRequired must reject active consent whose scope does not cover the requested permission"
    );
}
/// Consent and bailment scoped to `pace:advance` must authorize `advance_pace`.
#[test]
fn consent_passes_when_scope_covers_requested_permission() {
    let checker = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ConsentRequired,
    ]));
    let granted_scope = "pace:advance";
    let mut context = passing_context();
    let bailor = did("did:exo:bailor");
    let bailee = context.actor.clone();
    context.bailment_state = BailmentState::Active {
        bailor,
        bailee,
        scope: granted_scope.into(),
    };
    context.consent_records[0].scope = granted_scope.into();
    context.requested_permissions = PermissionSet::new(vec![Permission::new("advance_pace")]);

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_ok(),
        "ConsentRequired must accept the established pace:advance scope for advance_pace"
    );
}
/// Resource consent scoped to `governance:decision` must authorize the
/// `enact:decision` permission.
#[test]
fn consent_passes_when_resource_scope_covers_requested_permission_object() {
    let checker = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ConsentRequired,
    ]));
    let granted_scope = "governance:decision";
    let mut context = passing_context();
    let bailor = did("did:exo:bailor");
    let bailee = context.actor.clone();
    context.bailment_state = BailmentState::Active {
        bailor,
        bailee,
        scope: granted_scope.into(),
    };
    context.consent_records[0].scope = granted_scope.into();
    context.requested_permissions = PermissionSet::new(vec![Permission::new("enact:decision")]);

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_ok(),
        "ConsentRequired must accept resource consent for a permission on the same object class"
    );
}
/// A `;`-separated scope naming both requested permissions must be accepted.
#[test]
fn consent_passes_when_explicit_scope_list_covers_all_requested_permissions() {
    let checker = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ConsentRequired,
    ]));
    let granted_scope = "network.peers.read;network.topology.recommend";
    let mut context = passing_context();
    let bailor = did("did:exo:bailor");
    let bailee = context.actor.clone();
    context.bailment_state = BailmentState::Active {
        bailor,
        bailee,
        scope: granted_scope.into(),
    };
    context.consent_records[0].scope = granted_scope.into();
    let requested = vec![
        Permission::new("network.peers.read"),
        Permission::new("network.topology.recommend"),
    ];
    context.requested_permissions = PermissionSet::new(requested);

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_ok(),
        "ConsentRequired must accept explicit multi-permission consent scopes"
    );
}
/// Scope `data:profile` must not authorize `profile:delete`, even though both
/// contain the token `profile`.
#[test]
fn consent_fails_when_scope_only_shares_non_object_token_with_permission() {
    let checker = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ConsentRequired,
    ]));
    let granted_scope = "data:profile";
    let mut context = passing_context();
    let bailor = did("did:exo:bailor");
    let bailee = context.actor.clone();
    context.bailment_state = BailmentState::Active {
        bailor,
        bailee,
        scope: granted_scope.into(),
    };
    context.consent_records[0].scope = granted_scope.into();
    context.requested_permissions = PermissionSet::new(vec![Permission::new("profile:delete")]);

    let outcome = enforce_all(&checker, &context);
    assert!(
        outcome.is_err(),
        "ConsentRequired must not treat a shared verb or middle token as object consent"
    );
}
/// `ConsentRequired` on its own must accept the unmodified baseline context.
#[test]
fn consent_passes() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// A suspended bailment must make `ConsentRequired` fail.
#[test]
fn consent_fails_suspended() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.bailment_state = BailmentState::Suspended {
        reason: "audit".into(),
    };

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// A terminated bailment must make `ConsentRequired` fail.
#[test]
fn consent_fails_terminated() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ConsentRequired]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.bailment_state = BailmentState::Terminated;

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// Flagging the action as a self-grant must make `NoSelfGrant` fail.
#[test]
fn no_self_grant_fails() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::NoSelfGrant]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.is_self_grant = true;

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// `NoSelfGrant` on its own must accept the unmodified baseline context.
#[test]
fn no_self_grant_passes() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::NoSelfGrant]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// Clearing `human_override_preserved` must make `HumanOverride` fail.
#[test]
fn human_override_fails() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::HumanOverride]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.human_override_preserved = false;

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// `HumanOverride` on its own must accept the unmodified baseline context.
#[test]
fn human_override_passes() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::HumanOverride]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// Flagging an attempted kernel modification must make `KernelImmutability` fail.
#[test]
fn kernel_immutability_fails() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::KernelImmutability]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.kernel_modification_attempted = true;

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// `KernelImmutability` on its own must accept the unmodified baseline context.
#[test]
fn kernel_immutability_passes() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::KernelImmutability]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// A default-constructed authority chain must make `AuthorityChainValid` fail.
#[test]
fn authority_chain_fails_empty() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.authority_chain = AuthorityChain::default();

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// A two-link chain whose second grantor (`did:exo:WRONG`) is not the first
/// link's grantee (`did:exo:mid`) must be reported as "broken".
#[test]
fn authority_chain_fails_broken() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let root_to_mid = AuthorityLink {
        grantor: did("did:exo:root"),
        grantee: did("did:exo:mid"),
        permissions: PermissionSet::default(),
        signature: vec![1],
        grantor_public_key: None,
    };
    let wrong_to_actor = AuthorityLink {
        grantor: did("did:exo:WRONG"),
        grantee: context.actor.clone(),
        permissions: PermissionSet::default(),
        signature: vec![2],
        grantor_public_key: None,
    };
    context.authority_chain = AuthorityChain {
        links: vec![root_to_mid, wrong_to_actor],
    };

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("broken"));
}
/// A single-link chain ending at `did:exo:other` must be reported with a
/// description mentioning "terminate".
#[test]
fn authority_chain_fails_wrong_terminal() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let misdirected = AuthorityLink {
        grantor: did("did:exo:root"),
        grantee: did("did:exo:other"),
        permissions: PermissionSet::default(),
        signature: vec![1],
        grantor_public_key: None,
    };
    context.authority_chain = AuthorityChain {
        links: vec![misdirected],
    };

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("terminate"));
}
/// `AuthorityChainValid` on its own must accept the unmodified baseline context.
#[test]
fn authority_chain_passes_valid() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// A two-hop chain (root -> mid -> actor1) built from signed links, with each
/// grantor's key registered as trusted, must pass `AuthorityChainValid`.
#[test]
fn authority_chain_passes_multi_link() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let (first_hop, first_key) = signed_link("did:exo:root", "did:exo:mid");
    let (second_hop, second_key) = signed_link("did:exo:mid", "did:exo:actor1");

    // Register each grantor's public key before installing the chain.
    let first_grantor = first_hop.grantor.clone();
    context
        .trusted_authority_keys
        .insert(first_grantor, vec![first_key.as_bytes().to_vec()]);
    let second_grantor = second_hop.grantor.clone();
    context
        .trusted_authority_keys
        .insert(second_grantor, vec![second_key.as_bytes().to_vec()]);

    context.authority_chain = AuthorityChain {
        links: vec![first_hop, second_hop],
    };

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_ok());
}
/// `QuorumLegitimate` on its own must accept the unmodified baseline context.
#[test]
fn quorum_passes_none() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// Two provenance-less votes (one approving, one rejecting) against a
/// threshold of three must make `QuorumLegitimate` fail.
#[test]
fn quorum_fails_threshold() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    // `make_vote` with no voice kind yields a vote without provenance.
    let approving = make_vote("did:exo:v1", true, 1, None);
    let rejecting = make_vote("did:exo:v2", false, 2, None);
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 3,
        votes: vec![approving, rejecting],
    });

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// Two approving votes without provenance must not satisfy a threshold of two;
/// the violation must mention "verified human".
#[test]
fn quorum_rejects_raw_votes_without_verified_vote_provenance() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    // `make_vote` with no voice kind yields a vote without provenance.
    let raw_votes = vec![
        make_vote("did:exo:v1", true, 1, None),
        make_vote("did:exo:v2", true, 2, None),
    ];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 2,
        votes: raw_votes,
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    assert!(first.description.contains("verified human"));
}
/// Removing the provenance record must make `ProvenanceVerifiable` fail.
#[test]
fn provenance_fails_missing() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    context.provenance = None;

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// A provenance record carrying a public key but an empty signature must fail.
#[test]
fn provenance_fails_unsigned() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    let (public_key, _) = exo_core::crypto::generate_keypair();

    let unsigned = Provenance {
        actor: context.actor.clone(),
        timestamp: String::from("t"),
        action_hash: vec![1],
        signature: Vec::new(),
        public_key: Some(public_key.as_bytes().to_vec()),
        voice_kind: None,
        independence: None,
        review_order: None,
    };
    context.provenance = Some(unsigned);

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// A 64-byte signature without any public key must fail, and the violation
/// description must mention `public_key`.
#[test]
fn provenance_fails_without_public_key_even_with_non_empty_signature() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let keyless = Provenance {
        actor: context.actor.clone(),
        timestamp: String::from("2026-01-01T00:00:00Z"),
        action_hash: vec![1],
        signature: vec![7u8; 64],
        public_key: None,
        voice_kind: None,
        independence: None,
        review_order: None,
    };
    context.provenance = Some(keyless);

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("public_key"));
}
/// A provenance record attributed to a DID other than the acting DID must be
/// rejected by `ProvenanceVerifiable`.
///
/// The record is validly signed and its key is registered as trusted for its
/// own DID, so the actor mismatch is the only defect. A keyless record with a
/// junk signature would be rejected for those reasons alone and would not
/// exercise the actor binding.
#[test]
fn provenance_fails_actor_mismatch() {
    let engine = InvariantEngine::new(InvariantSet::with(vec![
        ConstitutionalInvariant::ProvenanceVerifiable,
    ]));
    let mut ctx = passing_context();
    // NOTE(review): assumes the baseline context's actor is not `did:exo:wrong`.
    let (prov, pk, _sk) = signed_provenance("did:exo:wrong");
    trust_provenance_key(&mut ctx, prov.actor.clone(), &pk);
    ctx.provenance = Some(prov);
    assert!(
        enforce_all(&engine, &ctx).is_err(),
        "provenance attributed to a different actor must be rejected"
    );
}
/// `ProvenanceVerifiable` on its own must accept the unmodified baseline context.
#[test]
fn provenance_passes() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let baseline = passing_context();
    let outcome = enforce_all(&checker, &baseline);
    assert!(outcome.is_ok());
}
/// With three independent flags tripped, the full engine must report at least
/// three violations.
#[test]
fn multiple_violations_collected() {
    let checker = InvariantEngine::all();
    let mut context = passing_context();
    context.is_self_grant = true;
    context.human_override_preserved = false;
    context.kernel_modification_attempted = true;

    let violations = enforce_all(&checker, &context).unwrap_err();
    let reported = violations.len();
    assert!(reported >= 3);
}
/// `InvariantSet::all` must list exactly eight invariants.
#[test]
fn invariant_set_all_count() {
    let full_set = InvariantSet::all();
    let count = full_set.invariants.len();
    assert_eq!(count, 8);
}
/// The identifiers of `InvariantSet::all`, in order, must equal the expected
/// kebab-case strings and must not contain `Debug`-style fragments.
#[test]
fn invariant_ids_are_stable_and_non_debug() {
    let full_set = InvariantSet::all();
    let ids: Vec<&str> = full_set
        .invariants
        .iter()
        .map(|invariant| invariant.id())
        .collect();

    let expected = vec![
        "separation-of-powers",
        "consent-required",
        "no-self-grant",
        "human-override",
        "kernel-immutability",
        "authority-chain-valid",
        "quorum-legitimate",
        "provenance-verifiable",
    ];
    assert_eq!(ids, expected);

    let mirrors_debug_name = ids
        .iter()
        .any(|id| id.contains("Of") || id.contains("Required"));
    assert!(
        !mirrors_debug_name,
        "stable invariant IDs must not mirror Rust Debug variant names"
    );
}
/// The text returned by `production_source` must not contain any of the
/// `{:?}`-based evidence format calls listed below.
#[test]
fn invariant_production_source_has_no_debug_evidence_formatting() {
    let production = production_source();
    let forbidden_snippets = [
        "format!(\"roles: {:?}\"",
        "format!(\"branches: {:?}\"",
        "format!(\"bailment_state: {:?}\"",
    ];
    for forbidden in forbidden_snippets {
        let found = production.contains(forbidden);
        assert!(
            !found,
            "invariant evidence must use stable labels, not Debug formatting: {forbidden}"
        );
    }
}
/// `InvariantSet::with` must keep exactly the invariants it was given.
#[test]
fn invariant_set_with_custom() {
    let custom = InvariantSet::with(vec![ConstitutionalInvariant::NoSelfGrant]);
    let count = custom.invariants.len();
    assert_eq!(count, 1);
}
/// `InvariantEngine::all` must be configured with eight invariants.
#[test]
fn engine_all_constructor() {
    let full_engine = InvariantEngine::all();
    let registered = full_engine.invariant_set.invariants.len();
    assert_eq!(registered, 8);
}
/// Builds a quorum vote with placeholder (non-cryptographic) signature bytes.
///
/// When `voice` is `Some`, the vote carries a provenance record for the same
/// voter with that voice kind and no public key. Only `VoiceKind::Human`
/// provenance is marked independent and first-order. When `voice` is `None`,
/// the vote has no provenance at all.
fn make_vote(voter: &str, approved: bool, sig: u8, voice: Option<VoiceKind>) -> QuorumVote {
    let voter_did = did(voter);
    let provenance = match voice {
        None => None,
        Some(kind) => {
            let is_human = kind == VoiceKind::Human;
            Some(Provenance {
                actor: did(voter),
                timestamp: String::from("t"),
                action_hash: vec![1],
                signature: vec![sig],
                public_key: None,
                voice_kind: Some(kind),
                independence: if is_human {
                    Some(IndependenceClaim::Independent)
                } else {
                    None
                },
                review_order: if is_human {
                    Some(ReviewOrder::FirstOrder)
                } else {
                    None
                },
            })
        }
    };
    QuorumVote {
        voter: voter_did,
        approved,
        signature: vec![sig],
        provenance,
    }
}
/// Convenience wrapper: a signed human quorum vote that approves.
fn signed_human_quorum_vote(ctx: &mut InvariantContext, voter: &str, sig: u8) -> QuorumVote {
    let approved = true;
    signed_human_quorum_vote_with_approval(ctx, voter, sig, approved)
}
/// Builds a signed human quorum vote and registers the voter's key as trusted.
///
/// A fresh keypair is generated for `voter`. The provenance record (human,
/// independent, first-order, with `sig` repeated 32 times as action hash) is
/// signed first, then the voter's public key is inserted into
/// `ctx.trusted_provenance_keys`, and finally the vote itself is signed with
/// the same secret key over the canonical vote message.
fn signed_human_quorum_vote_with_approval(
    ctx: &mut InvariantContext,
    voter: &str,
    sig: u8,
    approved: bool,
) -> QuorumVote {
    let voter_did = did(voter);
    let (public_key, secret_key) = exo_core::crypto::generate_keypair();

    // Sign the provenance over its canonical message; the signature field is
    // still empty while that message is derived.
    let mut provenance = Provenance {
        actor: voter_did.clone(),
        timestamp: String::from("2026-05-16T00:00:00Z"),
        action_hash: vec![sig; 32],
        signature: Vec::new(),
        public_key: Some(public_key.as_bytes().to_vec()),
        voice_kind: Some(VoiceKind::Human),
        independence: Some(IndependenceClaim::Independent),
        review_order: Some(ReviewOrder::FirstOrder),
    };
    let provenance_message =
        provenance_signature_message(&provenance).expect("canonical provenance");
    let provenance_signature =
        exo_core::crypto::sign(provenance_message.as_bytes(), &secret_key);
    provenance.signature = provenance_signature.to_bytes().to_vec();

    ctx.trusted_provenance_keys
        .insert(voter_did.clone(), vec![public_key.as_bytes().to_vec()]);

    // Sign the vote decision itself with the same key.
    let mut vote = QuorumVote {
        voter: voter_did,
        approved,
        signature: Vec::new(),
        provenance: Some(provenance),
    };
    let vote_message = {
        let signed_provenance = vote.provenance.as_ref().expect("signed provenance");
        quorum_vote_signature_message(&vote, signed_provenance)
            .expect("canonical vote signature")
    };
    let vote_signature = exo_core::crypto::sign(vote_message.as_bytes(), &secret_key);
    vote.signature = vote_signature.to_bytes().to_vec();
    vote
}
/// One human plus two synthetic votes must not satisfy a threshold of three;
/// the evidence must report that two synthetic votes were excluded.
#[test]
fn quorum_fails_when_synthetic_makes_up_threshold() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let ballots = vec![
        make_vote("did:exo:human1", true, 1, Some(VoiceKind::Human)),
        make_vote("did:exo:ai1", true, 2, Some(VoiceKind::Synthetic)),
        make_vote("did:exo:ai2", true, 3, Some(VoiceKind::Synthetic)),
    ];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 3,
        votes: ballots,
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    assert!(first.description.contains("verified human"));
    assert!(
        first
            .evidence
            .iter()
            .any(|entry| entry.contains("synthetic_votes_excluded: 2"))
    );
}
/// Three signed human votes must satisfy a threshold of three even when two
/// synthetic votes are also present.
#[test]
fn quorum_passes_when_humans_meet_threshold_despite_synthetics() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    // Human votes come first, in voter order, followed by the synthetic ones.
    let human_voters: [(&str, u8); 3] =
        [("did:exo:h1", 1), ("did:exo:h2", 2), ("did:exo:h3", 3)];
    let mut ballots: Vec<QuorumVote> = Vec::with_capacity(5);
    for (voter, sig) in human_voters {
        ballots.push(signed_human_quorum_vote(&mut context, voter, sig));
    }
    ballots.push(make_vote("did:exo:ai1", true, 4, Some(VoiceKind::Synthetic)));
    ballots.push(make_vote("did:exo:ai2", true, 5, Some(VoiceKind::Synthetic)));

    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 3,
        votes: ballots,
    });

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_ok());
}
/// Two votes from `did:exo:h1` must be rejected as a duplicate voter, and the
/// evidence must name that voter.
#[test]
fn quorum_legitimate_rejects_duplicate_voter_evidence() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let ballots = vec![
        make_vote("did:exo:h1", true, 1, Some(VoiceKind::Human)),
        make_vote("did:exo:h1", true, 2, Some(VoiceKind::Human)),
        make_vote("did:exo:h2", true, 3, Some(VoiceKind::Human)),
    ];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 2,
        votes: ballots,
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    assert!(
        first.description.contains("duplicate voter"),
        "duplicate voter evidence must be rejected explicitly"
    );
    let names_duplicate = first
        .evidence
        .iter()
        .any(|entry| entry.contains("did:exo:h1"));
    assert!(
        names_duplicate,
        "violation evidence must identify the duplicated voter"
    );
}
/// Two approving votes without provenance must not satisfy a threshold of two.
#[test]
fn quorum_rejects_legacy_votes_no_provenance() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    // `make_vote` with no voice kind yields a vote without provenance.
    let legacy_votes = vec![
        make_vote("did:exo:v1", true, 1, None),
        make_vote("did:exo:v2", true, 2, None),
    ];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 2,
        votes: legacy_votes,
    });

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// Votes lacking provenance must fail `QuorumLegitimate` with a description
/// that mentions "verified human".
#[test]
fn quorum_rejects_votes_without_signed_human_provenance() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    // `make_vote` with no voice kind yields a vote without provenance.
    let unattested_votes = vec![
        make_vote("did:exo:v1", true, 1, None),
        make_vote("did:exo:v2", true, 2, None),
    ];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 2,
        votes: unattested_votes,
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    let explains_human_check = first.description.contains("verified human");
    assert!(
        explains_human_check,
        "missing vote provenance must fail with a human-verification explanation"
    );
}
/// A system vote, a coordinated human vote and a derivative human vote must
/// not satisfy a threshold of three; the evidence must mention "system".
#[test]
fn quorum_rejects_non_human_or_non_independent_vote_provenance() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    // Human vote that declares coordination with others.
    let mut coordinated = make_vote("did:exo:h1", true, 1, Some(VoiceKind::Human));
    {
        let provenance = coordinated.provenance.as_mut().expect("human provenance");
        provenance.independence = Some(IndependenceClaim::Coordinated);
        provenance.review_order = Some(ReviewOrder::FirstOrder);
    }

    // Human vote that is independent but a derivative review.
    let mut derivative = make_vote("did:exo:h2", true, 2, Some(VoiceKind::Human));
    {
        let provenance = derivative.provenance.as_mut().expect("human provenance");
        provenance.independence = Some(IndependenceClaim::Independent);
        provenance.review_order = Some(ReviewOrder::Derivative);
    }

    let system_vote = make_vote("did:exo:system", true, 3, Some(VoiceKind::System));
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 3,
        votes: vec![system_vote, coordinated, derivative],
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    let names_system_voice = first.evidence.iter().any(|entry| entry.contains("system"));
    assert!(
        names_system_voice,
        "violation evidence must identify non-human vote provenance"
    );
}
/// A vote whose provenance actor differs from its voter must be rejected, with
/// evidence mentioning `provenance_actor`.
#[test]
fn quorum_rejects_vote_provenance_actor_mismatch() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let mut mismatched = make_vote("did:exo:voter", true, 1, Some(VoiceKind::Human));
    {
        let record = mismatched.provenance.as_mut().expect("human provenance");
        record.actor = did("did:exo:different-actor");
        record.independence = Some(IndependenceClaim::Independent);
        record.review_order = Some(ReviewOrder::FirstOrder);
    }
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 1,
        votes: vec![mismatched],
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    let names_actor = first
        .evidence
        .iter()
        .any(|entry| entry.contains("provenance_actor"));
    assert!(
        names_actor,
        "violation evidence must identify actor/voter mismatch"
    );
}
/// Provenance signed as synthetic/coordinated/derivative and then relabelled
/// as human/independent/first-order must be rejected, with evidence that
/// mentions the signature.
#[test]
fn quorum_rejects_tampered_human_voice_metadata_after_signing() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();
    let voter_did = did("did:exo:voter");
    let (public_key, secret_key) = exo_core::crypto::generate_keypair();

    // Sign while the record still declares its original synthetic metadata.
    let mut signed = Provenance {
        actor: voter_did.clone(),
        timestamp: String::from("2026-05-16T00:00:00Z"),
        action_hash: vec![0x41; 32],
        signature: Vec::new(),
        public_key: Some(public_key.as_bytes().to_vec()),
        voice_kind: Some(VoiceKind::Synthetic),
        independence: Some(IndependenceClaim::Coordinated),
        review_order: Some(ReviewOrder::Derivative),
    };
    let signed_message = provenance_signature_message(&signed).expect("canonical provenance");
    let signature = exo_core::crypto::sign(signed_message.as_bytes(), &secret_key);
    signed.signature = signature.to_bytes().to_vec();

    // Relabel the metadata after signing; the signature no longer covers it.
    let tampered = Provenance {
        voice_kind: Some(VoiceKind::Human),
        independence: Some(IndependenceClaim::Independent),
        review_order: Some(ReviewOrder::FirstOrder),
        ..signed
    };

    context
        .trusted_provenance_keys
        .insert(voter_did.clone(), vec![public_key.as_bytes().to_vec()]);
    let tampered_vote = QuorumVote {
        voter: voter_did,
        approved: true,
        signature: vec![1],
        provenance: Some(tampered),
    };
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 1,
        votes: vec![tampered_vote],
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    let mentions_signature = first
        .evidence
        .iter()
        .any(|entry| entry.contains("signature"));
    assert!(
        mentions_signature,
        "tampered voice metadata must invalidate the signed vote provenance"
    );
}
/// Replacing the vote signature with a placeholder byte must be rejected even
/// though the vote's provenance is validly signed.
#[test]
fn quorum_rejects_unsigned_vote_decision_with_valid_human_provenance() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let mut ballot = signed_human_quorum_vote(&mut context, "did:exo:voter", 1);
    ballot.signature = vec![1];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 1,
        votes: vec![ballot],
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    let flags_vote_signature = first
        .evidence
        .iter()
        .any(|entry| entry.contains("vote_signature"));
    assert!(
        flags_vote_signature,
        "a valid human provenance signature must not authorize an unsigned approval decision"
    );
}
/// A vote signed as a rejection and then flipped to an approval must be
/// reported with `vote_signature: invalid` evidence.
#[test]
fn quorum_rejects_approval_flag_tampering_after_vote_signing() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let mut ballot =
        signed_human_quorum_vote_with_approval(&mut context, "did:exo:voter", 1, false);
    ballot.approved = true;
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 1,
        votes: vec![ballot],
    });

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert_eq!(first.invariant, ConstitutionalInvariant::QuorumLegitimate);
    let flags_invalid_signature = first
        .evidence
        .iter()
        .any(|entry| entry.contains("vote_signature: invalid"));
    assert!(
        flags_invalid_signature,
        "approval decisions must be bound to the vote signature"
    );
}
/// Two synthetic approvals must not satisfy even a threshold of one.
#[test]
fn quorum_fails_all_synthetic_votes() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::QuorumLegitimate]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let synthetic_only = vec![
        make_vote("did:exo:ai1", true, 1, Some(VoiceKind::Synthetic)),
        make_vote("did:exo:ai2", true, 2, Some(VoiceKind::Synthetic)),
    ];
    context.quorum_evidence = Some(QuorumEvidence {
        threshold: 1,
        votes: synthetic_only,
    });

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_err());
}
/// The text returned by `production_source` must contain neither a
/// null-separator push nor a direct digest of a raw payload buffer.
#[test]
fn invariant_signature_payloads_do_not_use_raw_delimited_hashing() {
    let production = production_source();

    let uses_null_separator = production.contains("payload.push(0x00)");
    assert!(
        !uses_null_separator,
        "signature payloads must use structured canonical CBOR, not ad hoc null separators"
    );

    let digests_raw_payload = production.contains("Hash256::digest(&payload)");
    assert!(
        !digests_raw_payload,
        "signature payloads must be hashed through hash_structured"
    );
}
/// Source-level guard for the `AuthorityChainValid` checker.
///
/// Extracts the checker's text from `production_source` (between the
/// authority-chain checker and the quorum checker) and requires that it:
/// consults `ctx.trusted_authority_keys`, carries both fail-closed messages for
/// unresolved and unbound grantor keys, and references the trusted-key lookup
/// before the Ed25519 verify call.
#[test]
fn authority_chain_validation_requires_trusted_grantor_key_resolution() {
    let source = production_source();
    let checker = source
        .split("fn check_authority_chain_valid")
        .nth(1)
        .and_then(|rest| rest.split("fn check_quorum_legitimate").next())
        .expect("AuthorityChainValid checker source must be present");
    // Strip all whitespace so the ordering checks are immune to formatting.
    let compact_checker: String = checker.split_whitespace().collect();
    assert!(
        compact_checker.contains("ctx.trusted_authority_keys"),
        "AuthorityChainValid must consult runtime-resolved trusted grantor keys"
    );
    assert!(
        checker.contains("grantor_public_key is unresolved for grantor DID"),
        "AuthorityChainValid must fail closed when the grantor DID has no resolved key"
    );
    assert!(
        checker.contains("grantor_public_key is not bound to grantor DID"),
        "AuthorityChainValid must reject self-attested keys not bound to the grantor DID"
    );
    // Use `expect` so a missing call site fails with an actionable message
    // rather than a bare `None` unwrap panic.
    let key_lookup_at = compact_checker
        .find("ctx.trusted_authority_keys")
        .expect("AuthorityChainValid checker must reference ctx.trusted_authority_keys");
    let verify_at = compact_checker
        .find("exo_core::crypto::verify")
        .expect("AuthorityChainValid checker must call exo_core::crypto::verify");
    assert!(
        key_lookup_at < verify_at,
        "grantor key binding must be checked before accepting the Ed25519 signature"
    );
}
/// Source-level guard for the `ProvenanceVerifiable` checker.
///
/// Extracts the checker's text from `production_source` (from the provenance
/// checker up to `authority_link_signature_message`) and requires that it:
/// consults `ctx.trusted_provenance_keys`, carries both fail-closed messages
/// for unresolved and unbound actor keys, and references the trusted-key
/// lookup before the Ed25519 verify call.
#[test]
fn provenance_validation_requires_trusted_actor_key_resolution() {
    let source = production_source();
    let checker = source
        .split("fn check_provenance_verifiable")
        .nth(1)
        .and_then(|section| {
            section
                .split("pub fn authority_link_signature_message")
                .next()
        })
        .expect("provenance invariant checker must exist");
    // Strip all whitespace so the ordering checks are immune to formatting.
    let compact_checker: String = checker.split_whitespace().collect();
    assert!(
        compact_checker.contains("ctx.trusted_provenance_keys"),
        "ProvenanceVerifiable must consult runtime-resolved trusted actor keys"
    );
    assert!(
        checker.contains("public_key is unresolved for actor DID"),
        "ProvenanceVerifiable must fail closed when the actor DID has no resolved key"
    );
    assert!(
        checker.contains("public_key is not bound to actor DID"),
        "ProvenanceVerifiable must reject self-attested keys not bound to the actor DID"
    );
    // Use `expect` so a missing call site fails with an actionable message
    // rather than a bare `None` unwrap panic.
    let key_lookup_at = compact_checker
        .find("ctx.trusted_provenance_keys")
        .expect("ProvenanceVerifiable checker must reference ctx.trusted_provenance_keys");
    let verify_at = compact_checker
        .find("exo_core::crypto::verify")
        .expect("ProvenanceVerifiable checker must call exo_core::crypto::verify");
    assert!(
        key_lookup_at < verify_at,
        "actor key binding must be checked before accepting the Ed25519 signature"
    );
}
/// Generates a fresh keypair and returns a human, independent, first-order
/// provenance record for `actor_str`, signed over its canonical message, along
/// with the public and secret keys used.
fn signed_provenance(
    actor_str: &str,
) -> (Provenance, exo_core::PublicKey, exo_core::SecretKey) {
    let (public_key, secret_key) = exo_core::crypto::generate_keypair();

    // The signature stays empty until the canonical message has been derived.
    let mut record = Provenance {
        actor: did(actor_str),
        timestamp: String::from("2026-01-01T00:00:00Z"),
        action_hash: vec![0xde, 0xad, 0xbe, 0xef],
        signature: Vec::new(),
        public_key: Some(public_key.as_bytes().to_vec()),
        voice_kind: Some(VoiceKind::Human),
        independence: Some(IndependenceClaim::Independent),
        review_order: Some(ReviewOrder::FirstOrder),
    };
    let canonical = provenance_signature_message(&record).expect("canonical provenance payload");
    let signature = exo_core::crypto::sign(canonical.as_bytes(), &secret_key);
    record.signature = signature.to_bytes().to_vec();

    (record, public_key, secret_key)
}
/// Replaces the context's trusted provenance keys with a fresh set containing
/// only `public_key` for `actor`.
fn trust_provenance_key(
    ctx: &mut InvariantContext,
    actor: Did,
    public_key: &exo_core::PublicKey,
) {
    let key_bytes = public_key.as_bytes().to_vec();
    let mut only_this_key = TrustedProvenanceKeys::default();
    only_this_key.insert(actor, vec![key_bytes]);
    ctx.trusted_provenance_keys = only_this_key;
}
/// A validly signed provenance record whose key is trusted for its actor must
/// pass `ProvenanceVerifiable`.
#[test]
fn provenance_passes_valid_ed25519_signature() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let (record, public_key, _secret_key) = signed_provenance("did:exo:actor1");
    let signer = record.actor.clone();
    trust_provenance_key(&mut context, signer, &public_key);
    context.provenance = Some(record);

    let outcome = enforce_all(&checker, &context);
    assert!(outcome.is_ok());
}
/// With no trusted provenance keys at all, a signed record must be rejected
/// with a description mentioning the unresolved actor key.
#[test]
fn provenance_rejects_unresolved_actor_public_key() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let (record, _public_key, _secret_key) = signed_provenance("did:exo:actor1");
    context.provenance = Some(record);
    context.trusted_provenance_keys = TrustedProvenanceKeys::default();

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("unresolved for actor DID"));
}
/// When the only trusted key for the actor is a different key than the one in
/// the record, the violation must say the key is not bound to the actor DID.
#[test]
fn provenance_rejects_actor_public_key_not_bound_to_actor_did() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let (record, _public_key, _secret_key) = signed_provenance("did:exo:actor1");
    let (unrelated_key, _) = exo_core::crypto::generate_keypair();
    context.provenance = Some(record);

    // Trust only an unrelated key for the acting DID.
    let mut unrelated_keys = TrustedProvenanceKeys::default();
    unrelated_keys.insert(
        context.actor.clone(),
        vec![unrelated_key.as_bytes().to_vec()],
    );
    context.trusted_provenance_keys = unrelated_keys;

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("not bound to actor DID"));
}
/// Inverting the first signature byte of an otherwise valid, trusted record
/// must be reported as cryptographically invalid.
#[test]
fn provenance_fails_tampered_ed25519_signature() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let (mut record, public_key, _secret_key) = signed_provenance("did:exo:actor1");
    let signer = record.actor.clone();
    trust_provenance_key(&mut context, signer, &public_key);

    // Flip every bit of the first signature byte.
    record.signature[0] = !record.signature[0];
    context.provenance = Some(record);

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("cryptographically invalid"));
}
/// Swapping the record's public key for a different (but trusted) key must
/// make the existing signature cryptographically invalid.
#[test]
fn provenance_fails_wrong_public_key() {
    let invariants = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let checker = InvariantEngine::new(invariants);
    let mut context = passing_context();

    let (mut record, _signing_key, _secret_key) = signed_provenance("did:exo:actor1");
    let (substitute_key, _) = exo_core::crypto::generate_keypair();
    record.public_key = Some(substitute_key.as_bytes().to_vec());
    let signer = record.actor.clone();
    trust_provenance_key(&mut context, signer, &substitute_key);
    context.provenance = Some(record);

    let violations = enforce_all(&checker, &context).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("cryptographically invalid"));
}
/// A provenance record carrying a 16-byte `public_key` must be rejected.
///
/// The expected first violation mentions "not 32 bytes".
#[test]
fn provenance_fails_malformed_public_key() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();

    // The key is deliberately half the length the expected message refers to.
    let short_key = vec![0u8; 16];
    let malformed = Provenance {
        actor: ctx.actor.clone(),
        timestamp: "t".into(),
        action_hash: vec![1],
        signature: vec![0u8; 64],
        public_key: Some(short_key),
        voice_kind: None,
        independence: None,
        review_order: None,
    };
    ctx.provenance = Some(malformed);

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("not 32 bytes"));
}
/// A provenance record with a trusted public key but a 32-byte signature must
/// be rejected.
///
/// The expected first violation mentions "not 64 bytes".
#[test]
fn provenance_fails_malformed_signature_with_key() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (pk, _) = exo_core::crypto::generate_keypair();
    let actor = ctx.actor.clone();
    trust_provenance_key(&mut ctx, actor.clone(), &pk);

    // The signature is deliberately half the length the expected message refers to.
    let short_signature = vec![0u8; 32];
    let malformed = Provenance {
        actor,
        timestamp: "t".into(),
        action_hash: vec![1],
        signature: short_signature,
        public_key: Some(pk.as_bytes().to_vec()),
        voice_kind: None,
        independence: None,
        review_order: None,
    };
    ctx.provenance = Some(malformed);

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("not 64 bytes"));
}
/// A signature computed over a hand-built, NUL-delimited payload
/// (`actor 0x00 action_hash 0x00 timestamp`) must not verify, even though the
/// signing key is trusted for the actor.
///
/// The expected first violation mentions "cryptographically invalid".
#[test]
fn provenance_rejects_legacy_delimited_signature_payload() {
    const DELIMITER: u8 = 0x00;

    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::ProvenanceVerifiable]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (pk, sk) = exo_core::crypto::generate_keypair();
    let actor = ctx.actor.clone();
    trust_provenance_key(&mut ctx, actor.clone(), &pk);

    let action_hash = vec![0xde, 0xad, 0xbe, 0xef];
    let timestamp = "2026-01-01T00:00:00Z".to_string();

    // Concatenate the three fields with a single delimiter byte between them.
    let payload: Vec<u8> = actor
        .as_str()
        .as_bytes()
        .iter()
        .copied()
        .chain(std::iter::once(DELIMITER))
        .chain(action_hash.iter().copied())
        .chain(std::iter::once(DELIMITER))
        .chain(timestamp.as_bytes().iter().copied())
        .collect();
    let digest = Hash256::digest(&payload);
    let signature = exo_core::crypto::sign(digest.as_bytes(), &sk)
        .to_bytes()
        .to_vec();

    let legacy = Provenance {
        actor,
        timestamp,
        action_hash,
        signature,
        public_key: Some(pk.as_bytes().to_vec()),
        voice_kind: Some(VoiceKind::Human),
        independence: Some(IndependenceClaim::Independent),
        review_order: Some(ReviewOrder::FirstOrder),
    };
    ctx.provenance = Some(legacy);

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("cryptographically invalid"));
}
/// Test helper: builds an [`AuthorityLink`] from `grantor_str` to
/// `grantee_str` granting the single permission `"read"`, signed with a
/// freshly generated key pair.
///
/// The link is first assembled with an empty signature and the new public key
/// in `grantor_public_key`; the message from
/// `authority_link_signature_message` is then signed and the signature bytes
/// stored on the link.
///
/// Returns the signed link together with the public key of the signing pair.
///
/// # Panics
///
/// Panics with "canonical link payload" if the signature message cannot be
/// produced.
fn signed_link(grantor_str: &str, grantee_str: &str) -> (AuthorityLink, exo_core::PublicKey) {
    let (pk, sk) = exo_core::crypto::generate_keypair();
    let mut link = AuthorityLink {
        grantor: did(grantor_str),
        grantee: did(grantee_str),
        permissions: PermissionSet::new(vec![Permission::new("read")]),
        signature: Vec::new(),
        grantor_public_key: Some(pk.as_bytes().to_vec()),
    };
    let message = authority_link_signature_message(&link).expect("canonical link payload");
    link.signature = exo_core::crypto::sign(message.as_bytes(), &sk)
        .to_bytes()
        .to_vec();
    (link, pk)
}
/// A single-link chain produced by `signed_link`, with the signing key
/// registered as trusted for the link's grantor, must pass enforcement.
#[test]
fn authority_chain_passes_with_valid_ed25519_signature() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (link, pk) = signed_link("did:exo:root", "did:exo:actor1");

    let grantor = link.grantor.clone();
    let trusted_key = pk.as_bytes().to_vec();
    ctx.trusted_authority_keys.insert(grantor, vec![trusted_key]);
    ctx.authority_chain = AuthorityChain { links: vec![link] };

    let outcome = enforce_all(&engine, &ctx);
    assert!(outcome.is_ok());
}
/// A link whose grantor (`did:exo:unresolved-root`) gets no trusted key
/// registered by this test must be rejected, even though the link carries its
/// own `grantor_public_key`.
///
/// The first violation must mention both "grantor_public_key" and
/// "unresolved".
// NOTE(review): relies on `passing_context()` not already trusting a key for
// `did:exo:unresolved-root` — confirm against that helper.
#[test]
fn authority_chain_rejects_unresolved_grantor_public_key() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (link, _) = signed_link("did:exo:unresolved-root", "did:exo:actor1");
    ctx.authority_chain = AuthorityChain { links: vec![link] };

    let err = enforce_all(&engine, &ctx).unwrap_err();
    let description = &err[0].description;
    let names_field = description.contains("grantor_public_key");
    assert!(
        names_field && description.contains("unresolved"),
        "self-attested grantor public keys must not satisfy AuthorityChainValid: {err:?}"
    );
}
/// Requesting `"advance_pace"` while leaving the rest of the passing context
/// untouched must fail, with the first violation mentioning
/// "Requested permission".
#[test]
fn authority_chain_fails_when_requested_permission_absent_from_actor_permissions() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();

    let requested = vec![Permission::new("advance_pace")];
    ctx.requested_permissions = PermissionSet::new(requested);

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(
        first.description.contains("Requested permission"),
        "requested permission missing from actor permissions must fail AuthorityChainValid"
    );
}
/// Setting both the actor's and the requested permissions to
/// `"advance_pace"`, while leaving the authority chain from the passing
/// context as is, must fail with the first violation mentioning
/// "authority chain scope".
#[test]
fn authority_chain_fails_when_requested_permission_absent_from_chain_scope() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();

    // Each field gets its own freshly built, identical permission set.
    let advance_pace_only = || PermissionSet::new(vec![Permission::new("advance_pace")]);
    ctx.actor_permissions = advance_pace_only();
    ctx.requested_permissions = advance_pace_only();

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(
        first.description.contains("authority chain scope"),
        "requested permission missing from signed chain scope must fail AuthorityChainValid"
    );
}
/// A signed link whose first signature byte has been bit-flipped must fail
/// even though the signing key is trusted for the grantor.
///
/// The expected first violation mentions "cryptographically invalid".
#[test]
fn authority_chain_fails_with_tampered_signature() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (mut tampered, pk) = signed_link("did:exo:root", "did:exo:actor1");

    let grantor = tampered.grantor.clone();
    let trusted_key = pk.as_bytes().to_vec();
    ctx.trusted_authority_keys.insert(grantor, vec![trusted_key]);

    // Invert every bit of the leading signature byte (same as XOR with 0xFF).
    tampered.signature[0] = !tampered.signature[0];
    ctx.authority_chain = AuthorityChain {
        links: vec![tampered],
    };

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("cryptographically invalid"));
}
/// When the link's `grantor_public_key` is replaced by a key from another key
/// pair while the trust store still holds the original signing key, the chain
/// must be rejected.
///
/// The expected first violation mentions "not bound".
#[test]
fn authority_chain_fails_with_wrong_public_key() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (signed, pk) = signed_link("did:exo:root", "did:exo:actor1");

    let grantor = signed.grantor.clone();
    let trusted_key = pk.as_bytes().to_vec();
    ctx.trusted_authority_keys.insert(grantor, vec![trusted_key]);

    // Rebuild the link with only the embedded public key swapped out.
    let (substitute_pk, _) = exo_core::crypto::generate_keypair();
    let swapped = AuthorityLink {
        grantor_public_key: Some(substitute_pk.as_bytes().to_vec()),
        ..signed
    };
    ctx.authority_chain = AuthorityChain {
        links: vec![swapped],
    };

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("not bound"));
}
/// A link carrying a 16-byte `grantor_public_key` must be rejected.
///
/// The expected first violation mentions "not 32 bytes".
#[test]
fn authority_chain_fails_with_malformed_key() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();

    // The key is deliberately half the length the expected message refers to.
    let short_key = vec![0u8; 16];
    let malformed = AuthorityLink {
        grantor: did("did:exo:root"),
        grantee: did("did:exo:actor1"),
        permissions: PermissionSet::default(),
        signature: vec![0u8; 64],
        grantor_public_key: Some(short_key),
    };
    ctx.authority_chain = AuthorityChain {
        links: vec![malformed],
    };

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("not 32 bytes"));
}
/// A link that keeps its trusted public key but has an empty signature must
/// be rejected.
///
/// The expected first violation mentions "not 64 bytes".
#[test]
fn authority_chain_fails_empty_signature_with_public_key() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (signed, pk) = signed_link("did:exo:root", "did:exo:actor1");

    let grantor = signed.grantor.clone();
    let trusted_key = pk.as_bytes().to_vec();
    ctx.trusted_authority_keys.insert(grantor, vec![trusted_key]);

    // Rebuild the link with the signature bytes dropped.
    let unsigned = AuthorityLink {
        signature: Vec::new(),
        ..signed
    };
    ctx.authority_chain = AuthorityChain {
        links: vec![unsigned],
    };

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("not 64 bytes"));
}
/// A link with a 64-byte signature but `grantor_public_key: None` must be
/// rejected.
///
/// The expected first violation mentions "grantor_public_key".
#[test]
fn authority_chain_fails_without_grantor_public_key_even_with_non_empty_signature() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();

    // Arbitrary filler signature bytes; the link has no key to check them against.
    let keyless = AuthorityLink {
        grantor: did("did:exo:root"),
        grantee: did("did:exo:actor1"),
        permissions: PermissionSet::new(vec![Permission::new("read")]),
        signature: vec![7u8; 64],
        grantor_public_key: None,
    };
    ctx.authority_chain = AuthorityChain {
        links: vec![keyless],
    };

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("grantor_public_key"));
}
/// A link signed over a hand-built payload in which the grantor, the grantee
/// and each permission string are each followed by a `0x00` byte must not
/// verify, even though the signing key is trusted for the grantor.
///
/// The expected first violation mentions "cryptographically invalid".
#[test]
fn authority_chain_rejects_legacy_delimited_signature_payload() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();
    let (pk, sk) = exo_core::crypto::generate_keypair();
    let grantor = did("did:exo:root");
    let grantee = ctx.actor.clone();
    let permissions = PermissionSet::new(vec![Permission::new("read")]);

    // Append each field followed by a single NUL terminator.
    let mut payload: Vec<u8> = Vec::new();
    let mut push_terminated = |field: &[u8]| {
        payload.extend_from_slice(field);
        payload.push(0x00);
    };
    push_terminated(grantor.as_str().as_bytes());
    push_terminated(grantee.as_str().as_bytes());
    for permission in &permissions.permissions {
        push_terminated(permission.0.as_bytes());
    }

    let digest = Hash256::digest(&payload);
    let signature = exo_core::crypto::sign(digest.as_bytes(), &sk)
        .to_bytes()
        .to_vec();

    let key_bytes = pk.as_bytes().to_vec();
    ctx.trusted_authority_keys
        .insert(grantor.clone(), vec![key_bytes.clone()]);
    let legacy = AuthorityLink {
        grantor,
        grantee,
        permissions,
        signature,
        grantor_public_key: Some(key_bytes),
    };
    ctx.authority_chain = AuthorityChain {
        links: vec![legacy],
    };

    let violations = enforce_all(&engine, &ctx).unwrap_err();
    let first = &violations[0];
    assert!(first.description.contains("cryptographically invalid"));
}
/// A two-link chain (`did:exo:root` → `did:exo:mid` → `did:exo:actor1`), each
/// link produced by `signed_link` with its signing key trusted for that
/// link's grantor, must pass enforcement.
#[test]
fn authority_chain_passes_multi_link_with_ed25519() {
    let enabled = InvariantSet::with(vec![ConstitutionalInvariant::AuthorityChainValid]);
    let engine = InvariantEngine::new(enabled);
    let mut ctx = passing_context();

    // (grantor, grantee) pairs in chain order.
    let hops = [
        ("did:exo:root", "did:exo:mid"),
        ("did:exo:mid", "did:exo:actor1"),
    ];
    let mut links = Vec::with_capacity(hops.len());
    for (grantor, grantee) in hops {
        let (link, pk) = signed_link(grantor, grantee);
        ctx.trusted_authority_keys
            .insert(link.grantor.clone(), vec![pk.as_bytes().to_vec()]);
        links.push(link);
    }
    ctx.authority_chain = AuthorityChain { links };

    let outcome = enforce_all(&engine, &ctx);
    assert!(outcome.is_ok());
}
}