use super::audit::{AuditEvent, AuditLog};
use super::decision::{
reason_codes, refresh_contract_projections, Decision, DecisionEmitter, DecisionEvent,
FileDecisionEmitter, NullDecisionEmitter,
};
use super::jsonrpc::JsonRpcRequest;
use super::policy::{
make_deny_response, McpPolicy, PolicyDecision, PolicyMatchMetadata, PolicyState,
};
use super::tool_definition::{binding_from_tools_list_tool, ToolDefinitionBinding};
use std::{
collections::HashMap,
io::{self, BufRead, BufReader, Write},
process::{Child, Command, Stdio},
sync::{Arc, Mutex},
thread,
};
#[derive(Clone, Debug)]
pub struct ProxyConfig {
pub dry_run: bool,
pub verbose: bool,
pub audit_log_path: Option<std::path::PathBuf>,
pub server_id: String,
pub decision_log_path: Option<std::path::PathBuf>,
pub event_source: Option<String>,
}
#[derive(Clone, Debug, Default)]
pub struct ProxyConfigRaw {
pub dry_run: bool,
pub verbose: bool,
pub audit_log_path: Option<std::path::PathBuf>,
pub server_id: String,
pub decision_log_path: Option<std::path::PathBuf>,
pub event_source: Option<String>,
}
impl ProxyConfig {
pub fn try_from_raw(raw: ProxyConfigRaw) -> anyhow::Result<Self> {
let logging_enabled = raw.audit_log_path.is_some() || raw.decision_log_path.is_some();
let event_source = raw
.event_source
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty());
if logging_enabled && event_source.is_none() {
anyhow::bail!(
"event_source is required when logging is enabled (e.g. --event-source assay://org/app)"
);
}
if let Some(ref src) = event_source {
validate_event_source(src)?;
}
Ok(ProxyConfig {
dry_run: raw.dry_run,
verbose: raw.verbose,
audit_log_path: raw.audit_log_path,
server_id: raw.server_id,
decision_log_path: raw.decision_log_path,
event_source,
})
}
}
fn validate_event_source(s: &str) -> anyhow::Result<()> {
let s = s.trim();
if s.is_empty() {
anyhow::bail!("event_source must be absolute URI with scheme (e.g. assay://org/app)");
}
if s.chars().any(|c| c.is_whitespace()) {
anyhow::bail!("event_source must not contain whitespace");
}
let Some(pos) = s.find("://") else {
anyhow::bail!("event_source must be absolute URI with scheme (e.g. assay://org/app)");
};
if pos == 0 {
anyhow::bail!("event_source must have scheme before :// (e.g. assay://org/app)");
}
let scheme = &s[..pos];
let mut chars = scheme.chars();
match chars.next() {
Some(c) if c.is_ascii_alphabetic() => {}
_ => anyhow::bail!("event_source URI scheme must start with a letter"),
}
if !chars.all(|c| c.is_ascii_alphanumeric() || c == '+' || c == '-' || c == '.') {
anyhow::bail!("event_source URI scheme contains invalid characters");
}
Ok(())
}
pub struct McpProxy {
child: Child,
policy: McpPolicy,
config: ProxyConfig,
identity_cache: Arc<Mutex<HashMap<String, super::identity::ToolIdentity>>>,
tool_definition_cache: Arc<Mutex<HashMap<String, ToolDefinitionBinding>>>,
}
impl Drop for McpProxy {
fn drop(&mut self) {
let _ = self.child.kill();
}
}
impl McpProxy {
pub fn spawn(
command: &str,
args: &[String],
policy: McpPolicy,
config: ProxyConfig,
) -> io::Result<Self> {
let child = Command::new(command)
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit()) .spawn()?;
Ok(Self {
child,
policy,
config,
identity_cache: Arc::new(Mutex::new(HashMap::new())),
tool_definition_cache: Arc::new(Mutex::new(HashMap::new())),
})
}
pub fn run(mut self) -> io::Result<i32> {
let mut child_stdin = self.child.stdin.take().expect("child stdin");
let child_stdout = self.child.stdout.take().expect("child stdout");
let stdout = Arc::new(Mutex::new(io::stdout()));
let policy = self.policy.clone();
let config = self.config.clone();
let identity_cache_a = self.identity_cache.clone();
let identity_cache_b = self.identity_cache.clone();
let tool_definition_cache_a = self.tool_definition_cache.clone();
let tool_definition_cache_b = self.tool_definition_cache.clone();
let decision_emitter: Arc<dyn DecisionEmitter> =
if let Some(path) = &config.decision_log_path {
Arc::new(FileDecisionEmitter::new(path)?)
} else {
Arc::new(NullDecisionEmitter)
};
let event_source = config
.event_source
.clone()
.unwrap_or_else(|| format!("assay://{}", config.server_id));
let stdout_a = stdout.clone();
let t_server_to_client = thread::spawn(move || -> io::Result<()> {
let mut reader = BufReader::new(child_stdout);
let mut line = String::new();
while reader.read_line(&mut line)? > 0 {
let mut processed_line = line.clone();
if let Ok(mut v) = serde_json::from_str::<serde_json::Value>(&line) {
if let Some(result) = v.get_mut("result") {
if let Some(tools) = result.get_mut("tools").and_then(|t| t.as_array_mut())
{
for tool in tools {
if let Some(observation) =
Self::observe_tool_definition(tool, &config.server_id)
{
let mut identity_cache = identity_cache_a.lock().unwrap();
identity_cache.insert(
observation.name.clone(),
observation.identity.clone(),
);
drop(identity_cache);
if let Some(binding) = observation.binding {
let mut binding_cache =
tool_definition_cache_a.lock().unwrap();
binding_cache.insert(observation.name, binding);
}
}
}
processed_line =
serde_json::to_string(&v).unwrap_or(line.clone()) + "\n";
}
}
}
let mut out = stdout_a
.lock()
.map_err(|e| io::Error::other(e.to_string()))?;
out.write_all(processed_line.as_bytes())?;
out.flush()?;
line.clear();
}
Ok(())
});
let stdout_b = stdout.clone();
let emitter_b = decision_emitter.clone();
let event_source_b = event_source.clone();
let t_client_to_server = thread::spawn(move || -> io::Result<()> {
let stdin = io::stdin();
let mut reader = stdin.lock();
let mut line = String::new();
let mut state = PolicyState::default();
let mut audit_log = AuditLog::new(config.audit_log_path.as_deref());
while reader.read_line(&mut line)? > 0 {
match serde_json::from_str::<JsonRpcRequest>(&line) {
Ok(req) => {
let (runtime_id, tool_definition_binding) = if req.is_tool_call() {
let name = req.tool_params().map(|p| p.name).unwrap_or_default();
let runtime_id = {
let cache = identity_cache_b.lock().unwrap();
cache.get(&name).cloned()
};
let tool_definition_binding = {
let cache = tool_definition_cache_b.lock().unwrap();
cache.get(&name).cloned()
};
(runtime_id, tool_definition_binding)
} else {
(None, None)
};
let tool_name = req.tool_params().map(|p| p.name).unwrap_or_default();
let tool_call_id = Self::extract_tool_call_id(&req);
let policy_eval = policy.evaluate_with_metadata(
&tool_name,
&req.tool_params()
.map(|p| p.arguments)
.unwrap_or(serde_json::Value::Null),
&mut state,
runtime_id.as_ref(),
);
match policy_eval.decision {
PolicyDecision::Allow => {
Self::handle_allow(&req, &mut audit_log, config.verbose);
if req.is_tool_call() {
Self::emit_decision(
&emitter_b,
&event_source_b,
&tool_call_id,
&tool_name,
Decision::Allow,
reason_codes::P_POLICY_ALLOW,
None,
req.id.clone(),
&policy_eval.metadata,
tool_definition_binding.as_ref(),
);
}
}
PolicyDecision::AllowWithWarning { tool, code, reason } => {
if config.verbose {
eprintln!(
"[assay] WARNING: Allowing tool '{}' with warning (code: {}, reason: {}).",
tool,
code,
reason
);
}
audit_log.log(&AuditEvent {
timestamp: chrono::Utc::now().to_rfc3339(),
decision: "allow_with_warning".to_string(),
tool: Some(tool.clone()),
reason: Some(reason.clone()),
request_id: req.id.clone(),
agentic: None,
});
Self::emit_decision(
&emitter_b,
&event_source_b,
&tool_call_id,
&tool,
Decision::Allow,
&code,
Some(reason),
req.id.clone(),
&policy_eval.metadata,
tool_definition_binding.as_ref(),
);
Self::handle_allow(&req, &mut audit_log, false);
}
PolicyDecision::Deny {
tool,
code,
reason,
contract,
} => {
let decision_str =
if config.dry_run { "would_deny" } else { "deny" };
if config.verbose {
eprintln!(
"[assay] {} {} (reason: {})",
decision_str.to_uppercase(),
tool,
reason
);
}
audit_log.log(&AuditEvent {
timestamp: chrono::Utc::now().to_rfc3339(),
decision: decision_str.to_string(),
tool: Some(tool.clone()),
reason: Some(reason.clone()),
request_id: req.id.clone(),
agentic: Some(contract.clone()),
});
let reason_code = Self::map_policy_code(&code);
Self::emit_decision(
&emitter_b,
&event_source_b,
&tool_call_id,
&tool,
if config.dry_run {
Decision::Allow
} else {
Decision::Deny
},
&reason_code,
Some(reason),
req.id.clone(),
&policy_eval.metadata,
tool_definition_binding.as_ref(),
);
if config.dry_run {
} else {
let id = req.id.unwrap_or(serde_json::Value::Null);
let response_json = make_deny_response(
id,
"Content blocked by policy",
contract,
);
let mut out = stdout_b
.lock()
.map_err(|e| io::Error::other(e.to_string()))?;
out.write_all(response_json.as_bytes())?;
out.flush()?;
line.clear();
continue; }
}
}
}
Err(_) => {
let trimmed = line.trim();
if trimmed.starts_with('{')
&& (trimmed.contains("\"method\"")
|| trimmed.contains("\"params\"")
|| trimmed.contains("\"tool\""))
{
eprintln!("[assay] WARNING: Suspicious unparsable JSON, forwarding anyway (potential bypass attempt?): {:.60}...", trimmed);
}
}
}
child_stdin.write_all(line.as_bytes())?;
child_stdin.flush()?;
line.clear();
}
Ok(())
});
t_client_to_server
.join()
.map_err(|_| io::Error::other("client->server thread panicked"))??;
let _ = t_server_to_client.join();
let status = self.child.wait()?;
Ok(status.code().unwrap_or(1))
}
fn handle_allow(req: &JsonRpcRequest, audit_log: &mut AuditLog, verbose: bool) {
if verbose && req.is_tool_call() {
let tool = req
.tool_params()
.map(|p| p.name)
.unwrap_or_else(|| "unknown".to_string());
eprintln!("[assay] ALLOW {}", tool);
}
if req.is_tool_call() {
let tool = req.tool_params().map(|p| p.name);
audit_log.log(&AuditEvent {
timestamp: chrono::Utc::now().to_rfc3339(),
decision: "allow".to_string(),
tool,
reason: None,
request_id: req.id.clone(),
agentic: None,
});
}
}
fn extract_tool_call_id(request: &JsonRpcRequest) -> String {
if let Some(params) = request.tool_params() {
if let Some(meta) = params.arguments.get("_meta") {
if let Some(id) = meta.get("tool_call_id").and_then(|v| v.as_str()) {
return id.to_string();
}
}
}
if let Some(id) = &request.id {
if let Some(s) = id.as_str() {
return format!("req_{}", s);
}
if let Some(n) = id.as_i64() {
return format!("req_{}", n);
}
}
format!("gen_{}", uuid::Uuid::new_v4())
}
fn map_policy_code(code: &str) -> String {
match code {
"E_TOOL_DENIED" => reason_codes::P_TOOL_DENIED.to_string(),
"E_TOOL_NOT_ALLOWED" => reason_codes::P_TOOL_NOT_ALLOWED.to_string(),
"E_ARG_SCHEMA" => reason_codes::P_ARG_SCHEMA.to_string(),
"E_RATE_LIMIT" => reason_codes::P_RATE_LIMIT.to_string(),
"E_TOOL_DRIFT" => reason_codes::P_TOOL_DRIFT.to_string(),
_ => reason_codes::P_POLICY_DENY.to_string(),
}
}
#[allow(clippy::too_many_arguments)]
fn emit_decision(
emitter: &Arc<dyn DecisionEmitter>,
source: &str,
tool_call_id: &str,
tool: &str,
decision: Decision,
reason_code: &str,
reason: Option<String>,
request_id: Option<serde_json::Value>,
metadata: &PolicyMatchMetadata,
tool_definition_binding: Option<&ToolDefinitionBinding>,
) {
let mut event = DecisionEvent::new(
source.to_string(),
tool_call_id.to_string(),
tool.to_string(),
);
event.data.decision = decision;
event.data.reason_code = reason_code.to_string();
event.data.reason = reason;
event.data.request_id = request_id;
event.data.tool_classes = metadata.tool_classes.clone();
event.data.matched_tool_classes = metadata.matched_tool_classes.clone();
event.data.match_basis = metadata.match_basis.as_str().map(ToString::to_string);
event.data.matched_rule = metadata.matched_rule.clone();
event.data.typed_decision = metadata.typed_decision;
event.data.policy_version = metadata.policy_version.clone();
event.data.policy_digest = metadata.policy_digest.clone();
event.data.apply_policy_snapshot_projection();
event
.data
.apply_tool_definition_binding(tool_definition_binding);
event.data.obligations = metadata.obligations.clone();
event.data.obligation_outcomes =
super::obligations::execute_log_only(&metadata.obligations, tool);
event.data.approval_state = metadata.approval_state.clone();
if let Some(artifact) = &metadata.approval_artifact {
event.data.approval_id = Some(artifact.approval_id.clone());
event.data.approver = Some(artifact.approver.clone());
event.data.issued_at = Some(artifact.issued_at.clone());
event.data.expires_at = Some(artifact.expires_at.clone());
event.data.scope = Some(artifact.scope.clone());
event.data.approval_bound_tool = Some(artifact.bound_tool.clone());
event.data.approval_bound_resource = Some(artifact.bound_resource.clone());
}
event.data.approval_freshness = metadata.approval_freshness;
event.data.approval_failure_reason = metadata.approval_failure_reason.clone();
event.data.scope_type = metadata.scope_type.clone();
event.data.scope_value = metadata.scope_value.clone();
event.data.scope_match_mode = metadata.scope_match_mode.clone();
event.data.scope_evaluation_state = metadata.scope_evaluation_state.clone();
event.data.scope_failure_reason = metadata.scope_failure_reason.clone();
event.data.restrict_scope_present = metadata.restrict_scope_present;
event.data.restrict_scope_target = metadata.restrict_scope_target.clone();
event.data.restrict_scope_match = metadata.restrict_scope_match;
event.data.restrict_scope_reason = metadata.restrict_scope_reason.clone();
event.data.redaction_target = metadata.redaction_target.clone();
event.data.redaction_mode = metadata.redaction_mode.clone();
event.data.redaction_scope = metadata.redaction_scope.clone();
event.data.redaction_applied_state = metadata.redaction_applied_state.clone();
event.data.redaction_reason = metadata.redaction_reason.clone();
event.data.redaction_failure_reason = metadata.redaction_failure_reason.clone();
event.data.redact_args_present = metadata.redact_args_present;
event.data.redact_args_target = metadata.redact_args_target.clone();
event.data.redact_args_mode = metadata.redact_args_mode.clone();
event.data.redact_args_result = metadata.redact_args_result.clone();
event.data.redact_args_reason = metadata.redact_args_reason.clone();
event.data.fail_closed = metadata.fail_closed.clone();
event.data.lane = metadata.lane.clone();
event.data.principal = metadata.principal.clone();
event.data.auth_context_summary = metadata.auth_context_summary.clone();
event.data.auth_scheme = metadata.auth_scheme.clone();
event.data.auth_issuer = metadata.auth_issuer.clone();
event.data.delegated_from = metadata.delegated_from.clone();
event.data.delegation_depth = metadata.delegation_depth;
refresh_contract_projections(&mut event.data);
emitter.emit(&event);
}
fn observe_tool_definition(
tool: &mut serde_json::Value,
server_id: &str,
) -> Option<ToolDefinitionObservation> {
let name = tool.get("name").and_then(|n| n.as_str())?;
if name.trim().is_empty() {
return None;
}
let name = name.to_string();
let description = tool
.get("description")
.and_then(|d| d.as_str())
.map(|s| s.to_string());
let input_schema = tool
.get("inputSchema")
.or_else(|| tool.get("input_schema"))
.cloned();
let identity =
super::identity::ToolIdentity::new(server_id, &name, &input_schema, &description);
let binding = binding_from_tools_list_tool(tool, Some(server_id))
.ok()
.flatten();
tool.as_object_mut().and_then(|m| {
m.insert(
"tool_identity".to_string(),
serde_json::to_value(&identity).unwrap(),
)
});
Some(ToolDefinitionObservation {
name,
identity,
binding,
})
}
}
struct ToolDefinitionObservation {
name: String,
identity: super::identity::ToolIdentity,
binding: Option<ToolDefinitionBinding>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mcp::tool_definition::{
TOOL_DEFINITION_CANONICALIZATION_JCS_MCP_TOOL_DEFINITION_V1,
TOOL_DEFINITION_DIGEST_ALG_SHA256, TOOL_DEFINITION_SCHEMA_V1,
TOOL_DEFINITION_SOURCE_MCP_TOOLS_LIST,
};
use std::sync::Mutex as StdMutex;
struct CapturingEmitter {
events: StdMutex<Vec<DecisionEvent>>,
}
impl CapturingEmitter {
fn new() -> Self {
Self {
events: StdMutex::new(Vec::new()),
}
}
}
impl DecisionEmitter for CapturingEmitter {
fn emit(&self, event: &DecisionEvent) {
self.events.lock().unwrap().push(event.clone());
}
}
#[test]
fn event_source_accepts_assay_uri() {
validate_event_source("assay://myorg/myapp").unwrap();
}
#[test]
fn event_source_accepts_https_uri() {
validate_event_source("https://example.com/agent").unwrap();
}
#[test]
fn event_source_rejects_empty() {
assert!(validate_event_source("").is_err());
assert!(validate_event_source(" ").is_err());
}
#[test]
fn event_source_rejects_whitespace() {
assert!(validate_event_source("assay://myorg/my app").is_err());
assert!(validate_event_source("assay://myorg/\tmyapp").is_err());
}
#[test]
fn event_source_rejects_missing_scheme() {
assert!(validate_event_source("myorg/myapp").is_err());
assert!(validate_event_source("://myorg/myapp").is_err());
}
#[test]
fn event_source_rejects_did_and_urn() {
assert!(validate_event_source("did:example:123").is_err());
assert!(validate_event_source("urn:example:foo").is_err());
}
#[test]
fn event_source_rejects_scheme_starting_with_non_letter() {
assert!(validate_event_source("1assay://myorg/myapp").is_err());
assert!(validate_event_source("-assay://myorg/myapp").is_err());
}
#[test]
fn event_source_rejects_scheme_with_invalid_chars() {
assert!(validate_event_source("as_say://myorg/myapp").is_err());
assert!(validate_event_source("as@say://myorg/myapp").is_err());
}
#[test]
fn config_requires_event_source_when_logging_enabled() {
let raw = ProxyConfigRaw {
dry_run: false,
verbose: false,
audit_log_path: None,
decision_log_path: Some(std::path::PathBuf::from("decisions.ndjson")),
event_source: None,
server_id: "srv".to_string(),
};
let err = ProxyConfig::try_from_raw(raw).unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("event_source is required"));
}
#[test]
fn config_allows_no_event_source_when_logging_disabled() {
let raw = ProxyConfigRaw {
dry_run: false,
verbose: false,
audit_log_path: None,
decision_log_path: None,
event_source: None,
server_id: "srv".to_string(),
};
ProxyConfig::try_from_raw(raw).unwrap();
}
#[test]
fn config_accepts_valid_event_source() {
let raw = ProxyConfigRaw {
dry_run: false,
verbose: false,
audit_log_path: None,
decision_log_path: Some(std::path::PathBuf::from("decisions.ndjson")),
event_source: Some("assay://myorg/myapp".to_string()),
server_id: "srv".to_string(),
};
let cfg = ProxyConfig::try_from_raw(raw).unwrap();
assert_eq!(cfg.event_source.as_deref(), Some("assay://myorg/myapp"));
}
#[test]
fn config_rejects_invalid_event_source_uri() {
let raw = ProxyConfigRaw {
dry_run: false,
verbose: false,
audit_log_path: None,
decision_log_path: Some(std::path::PathBuf::from("decisions.ndjson")),
event_source: Some("not a uri".to_string()),
server_id: "srv".to_string(),
};
assert!(ProxyConfig::try_from_raw(raw).is_err());
}
#[test]
fn observe_tool_definition_computes_identity_and_binding() {
let mut tool = serde_json::json!({
"name": "read_file",
"description": " Read files ",
"inputSchema": {"type": "object"},
"annotations": {"title": "Read"},
"x-assay-sig": {"signature": "opaque"}
});
let observation = McpProxy::observe_tool_definition(&mut tool, "server-a")
.expect("supported tool definition should be observed");
assert_eq!(observation.name, "read_file");
assert_eq!(observation.identity.server_id, "server-a");
assert!(observation.binding.is_some());
assert!(tool.get("tool_identity").is_some());
}
#[test]
fn emit_decision_projects_tool_definition_binding_atomically() {
let mut tool = serde_json::json!({
"name": "read_file",
"description": "Read files",
"inputSchema": {"type": "object"}
});
let observation = McpProxy::observe_tool_definition(&mut tool, "server-a")
.expect("supported tool definition should be observed");
let binding = observation.binding.expect("binding should be visible");
let emitter = Arc::new(CapturingEmitter::new());
let emitter_trait: Arc<dyn DecisionEmitter> = emitter.clone();
McpProxy::emit_decision(
&emitter_trait,
"assay://test",
"tc_tool_definition",
"read_file",
Decision::Allow,
reason_codes::P_POLICY_ALLOW,
None,
None,
&PolicyMatchMetadata::default(),
Some(&binding),
);
let events = emitter.events.lock().unwrap();
let data = &events[0].data;
assert!(data.tool_definition_digest.is_some());
assert_eq!(
data.tool_definition_digest_alg.as_deref(),
Some(TOOL_DEFINITION_DIGEST_ALG_SHA256)
);
assert_eq!(
data.tool_definition_canonicalization.as_deref(),
Some(TOOL_DEFINITION_CANONICALIZATION_JCS_MCP_TOOL_DEFINITION_V1)
);
assert_eq!(
data.tool_definition_schema.as_deref(),
Some(TOOL_DEFINITION_SCHEMA_V1)
);
assert_eq!(
data.tool_definition_source.as_deref(),
Some(TOOL_DEFINITION_SOURCE_MCP_TOOLS_LIST)
);
}
}