use exo_core::{Did, SignerType};
use serde::{Deserialize, Serialize};
use crate::{error::GatekeeperError, types::PermissionSet};
const MCP_TYPED_SIGNATURE_DOMAIN: &str = "exo.gatekeeper.mcp.typed-signature.v1";
const MCP_TYPED_SIGNATURE_SCHEMA_VERSION: u16 = 1;
#[derive(Serialize)]
struct McpTypedSignaturePayload<'a> {
domain: &'static str,
schema_version: u16,
signer_type: &'a SignerType,
message: &'a [u8],
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum McpRule {
Mcp001BctsScope,
Mcp002NoSelfEscalation,
Mcp003ProvenanceRequired,
Mcp004NoIdentityForge,
Mcp005Distinguishable,
Mcp006ConsentBoundaries,
}
impl McpRule {
#[must_use]
pub fn all() -> Vec<McpRule> {
vec![
McpRule::Mcp001BctsScope,
McpRule::Mcp002NoSelfEscalation,
McpRule::Mcp003ProvenanceRequired,
McpRule::Mcp004NoIdentityForge,
McpRule::Mcp005Distinguishable,
McpRule::Mcp006ConsentBoundaries,
]
}
#[must_use]
pub fn id(&self) -> &'static str {
match self {
McpRule::Mcp001BctsScope => "mcp-001-bcts-scope",
McpRule::Mcp002NoSelfEscalation => "mcp-002-no-self-escalation",
McpRule::Mcp003ProvenanceRequired => "mcp-003-provenance-required",
McpRule::Mcp004NoIdentityForge => "mcp-004-no-identity-forge",
McpRule::Mcp005Distinguishable => "mcp-005-distinguishable",
McpRule::Mcp006ConsentBoundaries => "mcp-006-consent-boundaries",
}
}
#[must_use]
pub fn description(&self) -> &'static str {
match self {
McpRule::Mcp001BctsScope => "AI must operate within BCTS scope",
McpRule::Mcp002NoSelfEscalation => "AI cannot self-escalate capabilities",
McpRule::Mcp003ProvenanceRequired => "AI actions require provenance metadata",
McpRule::Mcp004NoIdentityForge => "AI cannot forge identity or signatures",
McpRule::Mcp005Distinguishable => "AI outputs must be distinguishable from human",
McpRule::Mcp006ConsentBoundaries => "AI must respect consent boundaries",
}
}
}
#[derive(Debug, Clone)]
pub struct McpContext {
pub actor_did: Did,
pub signer_type: SignerType,
pub bcts_scope: Option<String>,
pub capabilities: PermissionSet,
pub action: String,
pub has_provenance: bool,
pub forging_identity: bool,
pub output_marked_ai: bool,
pub consent_active: bool,
pub self_escalation: bool,
}
impl McpContext {
#[must_use]
pub fn is_ai(&self) -> bool {
self.signer_type.is_ai()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpViolation {
pub rule: McpRule,
pub description: String,
pub evidence: Vec<String>,
pub severity: u8,
}
pub fn enforce(rules: &[McpRule], context: &McpContext) -> Result<(), McpViolation> {
if !context.is_ai() {
return Ok(());
}
for rule in rules {
check_rule(*rule, context)?;
}
Ok(())
}
fn check_rule(rule: McpRule, ctx: &McpContext) -> Result<(), McpViolation> {
match rule {
McpRule::Mcp001BctsScope => {
if ctx.bcts_scope.is_none() {
return Err(McpViolation {
rule,
description: "AI operating outside BCTS scope".into(),
evidence: vec![format!("actor: {}", ctx.actor_did)],
severity: 5,
});
}
Ok(())
}
McpRule::Mcp002NoSelfEscalation => {
if ctx.self_escalation {
return Err(McpViolation {
rule,
description: "AI attempted self-escalation".into(),
evidence: vec![
format!("actor: {}", ctx.actor_did),
format!("action: {}", ctx.action),
],
severity: 5,
});
}
Ok(())
}
McpRule::Mcp003ProvenanceRequired => {
if !ctx.has_provenance {
return Err(McpViolation {
rule,
description: "AI action lacks provenance".into(),
evidence: vec![format!("action: {}", ctx.action)],
severity: 4,
});
}
Ok(())
}
McpRule::Mcp004NoIdentityForge => {
if ctx.forging_identity {
return Err(McpViolation {
rule,
description: "AI attempted identity forge".into(),
evidence: vec![format!("actor: {}", ctx.actor_did)],
severity: 5,
});
}
Ok(())
}
McpRule::Mcp005Distinguishable => {
if !ctx.output_marked_ai {
return Err(McpViolation {
rule,
description: "AI output not marked".into(),
evidence: vec![format!("action: {}", ctx.action)],
severity: 3,
});
}
Ok(())
}
McpRule::Mcp006ConsentBoundaries => {
if !ctx.consent_active {
return Err(McpViolation {
rule,
description: "AI operating without consent".into(),
evidence: vec![format!("actor: {}", ctx.actor_did)],
severity: 5,
});
}
Ok(())
}
}
}
pub fn build_signed_payload(
signer_type: &SignerType,
message: &[u8],
) -> Result<Vec<u8>, GatekeeperError> {
let payload = McpTypedSignaturePayload {
domain: MCP_TYPED_SIGNATURE_DOMAIN,
schema_version: MCP_TYPED_SIGNATURE_SCHEMA_VERSION,
signer_type,
message,
};
let mut encoded = Vec::new();
ciborium::ser::into_writer(&payload, &mut encoded).map_err(|error| {
GatekeeperError::McpTypedSignatureEncodingFailed {
reason: error.to_string(),
}
})?;
Ok(encoded)
}
#[must_use]
pub fn verify_typed_signature(
signer_type: &SignerType,
message: &[u8],
signature: &exo_core::Signature,
public_key: &exo_core::PublicKey,
) -> bool {
match build_signed_payload(signer_type, message) {
Ok(payload) => exo_core::crypto::verify(&payload, signature, public_key),
Err(_) => false,
}
}
#[cfg(test)]
mod tests {
use exo_core::{Hash256, crypto::KeyPair};
use serde::Deserialize;
use super::*;
use crate::types::Permission;
fn production_source() -> &'static str {
let source = include_str!("mcp.rs");
let end = source
.find("// ===========================================================================")
.expect("tests section marker must exist");
&source[..end]
}
fn did(s: &str) -> Did {
Did::new(s).expect("valid DID")
}
fn valid_ai() -> McpContext {
McpContext {
actor_did: did("did:exo:ai-agent-1"),
signer_type: SignerType::Ai {
delegation_id: Hash256::digest(b"delegation-1"),
},
bcts_scope: Some("data:medical".into()),
capabilities: PermissionSet::new(vec![Permission::new("read")]),
action: "summarize".into(),
has_provenance: true,
forging_identity: false,
output_marked_ai: true,
consent_active: true,
self_escalation: false,
}
}
fn human() -> McpContext {
McpContext {
actor_did: did("did:exo:human-1"),
signer_type: SignerType::Human,
bcts_scope: None,
capabilities: PermissionSet::default(),
action: "anything".into(),
has_provenance: false,
forging_identity: false,
output_marked_ai: false,
consent_active: false,
self_escalation: false,
}
}
#[test]
fn all_pass_valid_ai() {
assert!(enforce(&McpRule::all(), &valid_ai()).is_ok());
}
#[test]
fn human_exempt() {
assert!(enforce(&McpRule::all(), &human()).is_ok());
}
#[test]
fn mcp001_fail() {
let mut c = valid_ai();
c.bcts_scope = None;
let e = enforce(&[McpRule::Mcp001BctsScope], &c).unwrap_err();
assert_eq!(e.rule, McpRule::Mcp001BctsScope);
assert_eq!(e.severity, 5);
}
#[test]
fn mcp001_pass() {
assert!(enforce(&[McpRule::Mcp001BctsScope], &valid_ai()).is_ok());
}
#[test]
fn mcp002_fail() {
let mut c = valid_ai();
c.self_escalation = true;
assert_eq!(
enforce(&[McpRule::Mcp002NoSelfEscalation], &c)
.unwrap_err()
.rule,
McpRule::Mcp002NoSelfEscalation
);
}
#[test]
fn mcp002_pass() {
assert!(enforce(&[McpRule::Mcp002NoSelfEscalation], &valid_ai()).is_ok());
}
#[test]
fn mcp003_fail() {
let mut c = valid_ai();
c.has_provenance = false;
let e = enforce(&[McpRule::Mcp003ProvenanceRequired], &c).unwrap_err();
assert_eq!(e.rule, McpRule::Mcp003ProvenanceRequired);
assert_eq!(e.severity, 4);
}
#[test]
fn mcp003_pass() {
assert!(enforce(&[McpRule::Mcp003ProvenanceRequired], &valid_ai()).is_ok());
}
#[test]
fn mcp004_fail() {
let mut c = valid_ai();
c.forging_identity = true;
assert_eq!(
enforce(&[McpRule::Mcp004NoIdentityForge], &c)
.unwrap_err()
.rule,
McpRule::Mcp004NoIdentityForge
);
}
#[test]
fn mcp004_pass() {
assert!(enforce(&[McpRule::Mcp004NoIdentityForge], &valid_ai()).is_ok());
}
#[test]
fn mcp005_fail() {
let mut c = valid_ai();
c.output_marked_ai = false;
let e = enforce(&[McpRule::Mcp005Distinguishable], &c).unwrap_err();
assert_eq!(e.rule, McpRule::Mcp005Distinguishable);
assert_eq!(e.severity, 3);
}
#[test]
fn mcp005_pass() {
assert!(enforce(&[McpRule::Mcp005Distinguishable], &valid_ai()).is_ok());
}
#[test]
fn mcp006_fail() {
let mut c = valid_ai();
c.consent_active = false;
assert_eq!(
enforce(&[McpRule::Mcp006ConsentBoundaries], &c)
.unwrap_err()
.rule,
McpRule::Mcp006ConsentBoundaries
);
}
#[test]
fn mcp006_pass() {
assert!(enforce(&[McpRule::Mcp006ConsentBoundaries], &valid_ai()).is_ok());
}
#[test]
fn first_violation_returned() {
let mut c = valid_ai();
c.bcts_scope = None;
c.has_provenance = false;
assert_eq!(
enforce(
&[McpRule::Mcp001BctsScope, McpRule::Mcp003ProvenanceRequired],
&c
)
.unwrap_err()
.rule,
McpRule::Mcp001BctsScope
);
}
#[test]
fn empty_rules_pass() {
assert!(enforce(&[], &valid_ai()).is_ok());
}
#[test]
fn all_six_rules() {
assert_eq!(McpRule::all().len(), 6);
}
#[test]
fn descriptions_non_empty() {
for r in McpRule::all() {
assert!(!r.description().is_empty());
}
}
#[test]
fn rule_ids_are_stable_and_non_debug() {
assert_eq!(McpRule::Mcp001BctsScope.id(), "mcp-001-bcts-scope");
for r in McpRule::all() {
assert!(r.id().starts_with("mcp-00"));
assert!(
!r.id().contains("Mcp"),
"stable MCP rule IDs must not mirror Rust Debug variant names"
);
}
}
#[test]
fn ai_cannot_impersonate_human() {
let kp = KeyPair::generate();
let message = b"important governance vote";
let ai_type = SignerType::Ai {
delegation_id: Hash256::digest(b"session-1"),
};
let ai_payload = build_signed_payload(&ai_type, message).expect("AI payload encodes");
let ai_sig = kp.sign(&ai_payload);
assert!(verify_typed_signature(
&ai_type,
message,
&ai_sig,
kp.public_key()
));
assert!(!verify_typed_signature(
&SignerType::Human,
message,
&ai_sig,
kp.public_key()
));
}
#[test]
fn human_signature_cannot_be_replayed_as_ai() {
let kp = KeyPair::generate();
let message = b"budget approval";
let human_payload =
build_signed_payload(&SignerType::Human, message).expect("human payload encodes");
let human_sig = kp.sign(&human_payload);
assert!(verify_typed_signature(
&SignerType::Human,
message,
&human_sig,
kp.public_key()
));
let ai_type = SignerType::Ai {
delegation_id: Hash256::digest(b"d"),
};
assert!(!verify_typed_signature(
&ai_type,
message,
&human_sig,
kp.public_key()
));
}
#[test]
fn different_delegation_ids_produce_different_signatures() {
let kp = KeyPair::generate();
let message = b"action";
let ai1 = SignerType::Ai {
delegation_id: Hash256::digest(b"delegation-A"),
};
let ai2 = SignerType::Ai {
delegation_id: Hash256::digest(b"delegation-B"),
};
let payload1 = build_signed_payload(&ai1, message).expect("AI payload encodes");
let payload2 = build_signed_payload(&ai2, message).expect("AI payload encodes");
assert_ne!(payload1, payload2);
let sig1 = kp.sign(&payload1);
assert!(verify_typed_signature(
&ai1,
message,
&sig1,
kp.public_key()
));
assert!(!verify_typed_signature(
&ai2,
message,
&sig1,
kp.public_key()
));
}
#[test]
fn typed_signature_payload_is_domain_separated_versioned_cbor() {
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct TypedSignaturePayload {
domain: String,
schema_version: u16,
signer_type: SignerType,
message: Vec<u8>,
}
let signer_type = SignerType::Ai {
delegation_id: Hash256::digest(b"typed-signature-session"),
};
let message = b"constitutional MCP action";
let payload = build_signed_payload(&signer_type, message).expect("payload encodes");
let decoded: TypedSignaturePayload =
ciborium::from_reader(payload.as_slice()).expect("typed signature payload is CBOR");
assert_eq!(decoded.domain, "exo.gatekeeper.mcp.typed-signature.v1");
assert_eq!(decoded.schema_version, 1);
assert_eq!(decoded.signer_type, signer_type);
assert_eq!(decoded.message, message);
}
#[test]
fn typed_signature_payload_source_uses_cbor_not_raw_concatenation() {
let production = production_source();
let start = production
.find("pub fn build_signed_payload")
.expect("build_signed_payload exists");
let end = production
.find("/// Verify that a signature was produced with the claimed signer type.")
.expect("verify_typed_signature marker exists");
let body = &production[start..end];
assert!(
body.contains("ciborium::"),
"typed signature payloads must use canonical CBOR"
);
assert!(
!body.contains("extend_from_slice"),
"typed signature payloads must not be raw byte concatenations"
);
assert!(
!body.contains("to_payload_prefix"),
"typed signature payloads must bind the structured signer type, not an ad hoc prefix"
);
assert!(
!body.contains("return Vec::new()"),
"typed signature serialization must not silently fall back to an empty payload"
);
}
#[test]
fn signer_type_prefix_bytes() {
assert_eq!(SignerType::Human.prefix_byte(), 0x01);
let ai = SignerType::Ai {
delegation_id: Hash256::ZERO,
};
assert_eq!(ai.prefix_byte(), 0x02);
assert_eq!(ai.to_payload_prefix().len(), 33); }
#[test]
fn signer_type_is_checks() {
assert!(SignerType::Human.is_human());
assert!(!SignerType::Human.is_ai());
let ai = SignerType::Ai {
delegation_id: Hash256::ZERO,
};
assert!(ai.is_ai());
assert!(!ai.is_human());
}
#[test]
fn context_is_ai_derived() {
assert!(valid_ai().is_ai());
assert!(!human().is_ai());
}
}