use serde::{Deserialize, Serialize};
use std::time::Duration;
use crate::tool_executor::ToolResult;
use crate::tool_registry::ToolEntry;
/// Attribution of responsibility for the outcome of an MCP call.
///
/// Derives `Serialize` without a rename attribute, so variants serialize
/// under their Rust names (e.g. `"Server"`); use [`Blame::as_str`] for the
/// lowercase form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum Blame {
/// No fault; set on successful results.
None,
/// The server is held responsible: JSON-RPC error codes outside
/// `-32603..=-32600`, or a response carrying neither result nor error.
Server,
/// The caller is held responsible: JSON-RPC error codes in `-32603..=-32600`.
Caller,
/// `send_rpc` returned an error, so no JSON-RPC response was obtained.
Network,
}
impl Blame {
pub fn as_str(&self) -> &'static str {
match self {
Blame::None => "none",
Blame::Server => "server",
Blame::Caller => "caller",
Blame::Network => "network",
}
}
}
/// Trust level attached to data obtained from an MCP server.
///
/// Every result constructed in this file is tagged `Untrusted`; the other
/// levels are never assigned here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum EpistemicTaint {
/// The level given to all MCP results built in this file.
Untrusted,
/// NOTE(review): presumably data that passed output-schema validation;
/// not assigned in this file — confirm against callers.
SchemaValidated,
/// NOTE(review): presumably data explicitly promoted to a higher trust
/// level; not assigned in this file — confirm against callers.
Elevated,
}
impl EpistemicTaint {
pub fn as_str(&self) -> &'static str {
match self {
EpistemicTaint::Untrusted => "untrusted",
EpistemicTaint::SchemaValidated => "schema_validated",
EpistemicTaint::Elevated => "elevated",
}
}
}
/// Outcome of a single MCP request, including blame and taint metadata.
#[derive(Debug, Clone, Serialize)]
pub struct McpCallResult {
/// Tool name, or `resource:<uri>` for results built by `read_resource`.
pub tool_name: String,
/// Extracted text or serialized JSON on success; an error description on failure.
pub output: String,
/// `true` when the server returned a JSON-RPC `result`.
pub success: bool,
/// Who is held responsible for a failure (`Blame::None` on success).
pub blame: Blame,
/// Trust level of `output`.
pub taint: EpistemicTaint,
/// URL of the MCP server the request was sent to.
pub server: String,
/// Effect labels attached to the call (e.g. `"network"`, `"io"`,
/// `"epistemic:speculate"`).
pub effects: Vec<String>,
}
impl McpCallResult {
    /// Projects this result onto a [`ToolResult`], copying `success`,
    /// `output` and `tool_name`; blame, taint, server and effects are dropped.
    pub fn to_tool_result(&self) -> ToolResult {
        let Self {
            tool_name,
            output,
            success,
            ..
        } = self;
        ToolResult {
            success: *success,
            output: output.clone(),
            tool_name: tool_name.clone(),
        }
    }
}
/// JSON-RPC request envelope serialized into the HTTP body by `send_rpc`.
#[derive(Debug, Serialize)]
struct JsonRpcRequest {
/// Protocol version string; every request built in this file uses `"2.0"`.
jsonrpc: &'static str,
/// Method name, e.g. `tools/call`, `tools/list` or `resources/read`.
method: String,
/// Method-specific parameters.
params: serde_json::Value,
/// Request identifier taken from the client's `next_id` counter.
id: u64,
}
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
#[allow(dead_code)]
jsonrpc: String,
result: Option<serde_json::Value>,
error: Option<JsonRpcError>,
#[allow(dead_code)]
id: u64,
}
/// The `error` member of a JSON-RPC response.
#[derive(Debug, Deserialize)]
struct JsonRpcError {
/// Numeric error code; `process_response` uses it to assign blame.
code: i64,
/// Error description supplied by the server.
message: String,
}
/// Blocking JSON-RPC client for a single MCP server endpoint.
pub struct McpClient {
/// URL every request is POSTed to.
server_url: String,
/// Timeout applied to the HTTP client built for each request.
timeout: Duration,
/// Id for the next request; starts at 1 and is incremented per call.
next_id: u64,
}
impl McpClient {
pub fn new(server_url: &str, timeout: Duration) -> Self {
McpClient {
server_url: server_url.to_string(),
timeout,
next_id: 1,
}
}
/// Invokes `tool_name` on the server through the `tools/call` method.
///
/// If `argument` starts with `{` (ignoring leading whitespace) and parses as
/// JSON, the parsed value is sent verbatim as the request `params`.
/// Otherwise the raw text is wrapped as
/// `{"name": tool_name, "arguments": {"input": argument}}`.
///
/// Never returns an error: a `send_rpc` failure becomes an unsuccessful
/// result with `Blame::Network`.
pub fn call_tool(&mut self, tool_name: &str, argument: &str) -> McpCallResult {
    let id = self.next_id;
    self.next_id += 1;

    // Default envelope, used when the argument is not a parseable JSON object.
    let wrap_plain = || {
        serde_json::json!({
            "name": tool_name,
            "arguments": { "input": argument }
        })
    };
    let params = argument
        .trim_start()
        .starts_with('{')
        .then(|| serde_json::from_str::<serde_json::Value>(argument).ok())
        .flatten()
        .unwrap_or_else(wrap_plain);

    let request = JsonRpcRequest {
        jsonrpc: "2.0",
        method: String::from("tools/call"),
        params,
        id,
    };

    let response = match self.send_rpc(&request) {
        Ok(response) => response,
        Err(e) => {
            return McpCallResult {
                tool_name: tool_name.to_string(),
                output: format!("ℰMCP error: {e}"),
                success: false,
                blame: Blame::Network,
                taint: EpistemicTaint::Untrusted,
                server: self.server_url.clone(),
                effects: vec!["network".to_string()],
            };
        }
    };
    self.process_response(tool_name, response)
}
pub fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
let request_id = self.next_id;
self.next_id += 1;
let rpc_request = JsonRpcRequest {
jsonrpc: "2.0",
method: "tools/list".to_string(),
params: serde_json::json!({}),
id: request_id,
};
let response = self.send_rpc(&rpc_request)?;
match response.result {
Some(val) => {
if let Some(tools) = val.get("tools").and_then(|t| t.as_array()) {
let infos = tools
.iter()
.filter_map(|t| {
Some(McpToolInfo {
name: t.get("name")?.as_str()?.to_string(),
description: t
.get("description")
.and_then(|d| d.as_str())
.unwrap_or("")
.to_string(),
})
})
.collect();
Ok(infos)
} else {
Ok(Vec::new())
}
}
None => Err(response
.error
.map(|e| format!("JSON-RPC error {}: {}", e.code, e.message))
.unwrap_or_else(|| "empty response".to_string())),
}
}
/// Reads the resource at `uri` through the `resources/read` method.
///
/// On success the output is the `text` of the first `contents` item; when
/// that text is missing or empty, the whole result is serialized as JSON
/// instead. The result's `tool_name` is `resource:<uri>`.
///
/// Never returns an error: a JSON-RPC failure is reported with
/// `Blame::Server`, a `send_rpc` failure with `Blame::Network`.
pub fn read_resource(&mut self, uri: &str) -> McpCallResult {
    let id = self.next_id;
    self.next_id += 1;

    let request = JsonRpcRequest {
        jsonrpc: "2.0",
        method: String::from("resources/read"),
        params: serde_json::json!({ "uri": uri }),
        id,
    };
    let tool_name = format!("resource:{uri}");
    let server = self.server_url.clone();

    let response = match self.send_rpc(&request) {
        Ok(response) => response,
        Err(e) => {
            return McpCallResult {
                tool_name,
                output: format!("ℰMCP error: {e}"),
                success: false,
                blame: Blame::Network,
                taint: EpistemicTaint::Untrusted,
                server,
                effects: vec!["network".to_string()],
            };
        }
    };

    match response.result {
        Some(val) => {
            let first_text = val
                .get("contents")
                .and_then(|c| c.as_array())
                .and_then(|arr| arr.first())
                .and_then(|item| item.get("text"))
                .and_then(|t| t.as_str())
                .filter(|t| !t.is_empty());
            let output = match first_text {
                Some(text) => text.to_string(),
                // No usable text: fall back to the raw JSON of the result.
                None => serde_json::to_string(&val).unwrap_or_default(),
            };
            McpCallResult {
                tool_name,
                output,
                success: true,
                blame: Blame::None,
                taint: EpistemicTaint::Untrusted,
                server,
                effects: vec!["network".to_string(), "io".to_string()],
            }
        }
        None => {
            let output = match response.error {
                Some(e) => format!("JSON-RPC error {}: {}", e.code, e.message),
                None => "empty response".to_string(),
            };
            McpCallResult {
                tool_name,
                output,
                success: false,
                blame: Blame::Server,
                taint: EpistemicTaint::Untrusted,
                server,
                effects: vec!["network".to_string()],
            }
        }
    }
}
fn send_rpc(&self, request: &JsonRpcRequest) -> Result<JsonRpcResponse, String> {
let client = reqwest::blocking::Client::builder()
.timeout(self.timeout)
.build()
.map_err(|e| format!("failed to create HTTP client: {e}"))?;
let body = serde_json::to_string(request)
.map_err(|e| format!("failed to serialize request: {e}"))?;
let response = client
.post(&self.server_url)
.header("Content-Type", "application/json")
.header("X-Axon-EMCP", "1.0")
.body(body)
.send()
.map_err(|e| {
if e.is_timeout() {
format!("MCP server timed out after {}s", self.timeout.as_secs())
} else if e.is_connect() {
format!("cannot connect to MCP server at {}", self.server_url)
} else {
format!("MCP request failed: {e}")
}
})?;
let status = response.status();
let text = response
.text()
.map_err(|e| format!("failed to read MCP response: {e}"))?;
if !status.is_success() {
return Err(format!("MCP server returned HTTP {}: {}", status.as_u16(), text));
}
serde_json::from_str(&text)
.map_err(|e| format!("invalid JSON-RPC response: {e}"))
}
fn process_response(&self, tool_name: &str, response: JsonRpcResponse) -> McpCallResult {
match response.result {
Some(val) => {
let output = val
.get("content")
.and_then(|c| c.as_array())
.and_then(|arr| arr.first())
.and_then(|item| item.get("text"))
.and_then(|t| t.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| serde_json::to_string(&val).unwrap_or_default());
McpCallResult {
tool_name: tool_name.to_string(),
output,
success: true,
blame: Blame::None,
taint: EpistemicTaint::Untrusted, server: self.server_url.clone(),
effects: vec![
"network".to_string(),
"epistemic:speculate".to_string(),
],
}
}
None => {
let (blame, msg) = match response.error {
Some(e) => {
let b = if (-32603..=-32600).contains(&e.code) {
Blame::Caller
} else {
Blame::Server
};
(b, format!("JSON-RPC error {}: {}", e.code, e.message))
}
None => (Blame::Server, "empty response from MCP server".to_string()),
};
McpCallResult {
tool_name: tool_name.to_string(),
output: msg,
success: false,
blame,
taint: EpistemicTaint::Untrusted,
server: self.server_url.clone(),
effects: vec!["network".to_string()],
}
}
}
}
}
/// Name and description of a tool as reported by the server's `tools/list`.
#[derive(Debug, Clone)]
pub struct McpToolInfo {
/// Tool name (entries without a string name are skipped by `list_tools`).
pub name: String,
/// Tool description; empty when the server supplied none.
pub description: String,
}
/// Runs one blocking MCP `tools/call` for the tool described by `entry`.
///
/// The server URL is taken from `entry.runtime` (trimmed) and must start
/// with `http://` or `https://`. The timeout comes from
/// `crate::http_tool::parse_timeout_pub(&entry.timeout)` and falls back to
/// 30 seconds when that yields no value.
///
/// Returns an unsuccessful [`ToolResult`] for an empty or invalid URL. When
/// the call itself fails, the output has ` [blame=<who>]` appended.
pub fn dispatch_mcp(entry: &ToolEntry, argument: &str) -> ToolResult {
    // Every failure path produces the same result shape; only the text differs.
    let failure = |output: String| ToolResult {
        success: false,
        output,
        tool_name: entry.name.clone(),
    };

    let server_url = entry.runtime.trim();
    if server_url.is_empty() {
        return failure(format!(
            "ℰMCP tool '{}': no server URL. Set runtime: \"http://...\" in tool definition.",
            entry.name
        ));
    }
    let has_http_scheme = ["http://", "https://"]
        .iter()
        .any(|scheme| server_url.starts_with(*scheme));
    if !has_http_scheme {
        return failure(format!(
            "ℰMCP tool '{}': invalid server URL '{}'. Must start with http:// or https://.",
            entry.name, server_url
        ));
    }

    let timeout = crate::http_tool::parse_timeout_pub(&entry.timeout)
        .unwrap_or(Duration::from_secs(30));
    let mcp_result = McpClient::new(server_url, timeout).call_tool(&entry.name, argument);

    if mcp_result.success {
        mcp_result.to_tool_result()
    } else {
        failure(format!(
            "{} [blame={}]",
            mcp_result.output,
            mcp_result.blame.as_str()
        ))
    }
}
/// Append-only log of MCP call outcomes, with per-blame counters.
#[derive(Debug, Default)]
pub struct BlameTracker {
/// Records in the order they were added.
records: Vec<BlameRecord>,
}
/// One entry of a [`BlameTracker`], copied from an [`McpCallResult`].
#[derive(Debug, Clone, Serialize)]
pub struct BlameRecord {
/// Name of the tool (or resource) that was called.
pub tool_name: String,
/// Server the call was sent to.
pub server: String,
/// Blame assigned to the call.
pub blame: Blame,
/// Taint level of the call's output.
pub taint: EpistemicTaint,
/// `"ok"` for successful calls, otherwise the call's output text.
pub message: String,
}
impl BlameTracker {
pub fn new() -> Self {
BlameTracker { records: Vec::new() }
}
pub fn record(&mut self, result: &McpCallResult) {
self.records.push(BlameRecord {
tool_name: result.tool_name.clone(),
server: result.server.clone(),
blame: result.blame,
taint: result.taint,
message: if result.success {
"ok".to_string()
} else {
result.output.clone()
},
});
}
pub fn total(&self) -> usize {
self.records.len()
}
pub fn server_faults(&self) -> usize {
self.records.iter().filter(|r| r.blame == Blame::Server).count()
}
pub fn caller_faults(&self) -> usize {
self.records.iter().filter(|r| r.blame == Blame::Caller).count()
}
pub fn network_faults(&self) -> usize {
self.records.iter().filter(|r| r.blame == Blame::Network).count()
}
pub fn records(&self) -> &[BlameRecord] {
&self.records
}
}
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use crate::backends::sse_streaming::LineBuffer;
use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
/// [`Tool`] implementation that talks to an MCP server over HTTP, with
/// blocking and streaming execution paths.
pub struct McpStreamingTool {
/// Tool name sent in `tools/call` requests and used in error messages.
name: String,
/// URL of the MCP server endpoint.
server_url: String,
/// Timeout applied to HTTP requests.
timeout: Duration,
}
impl McpStreamingTool {
    /// Builds a tool from a registry entry.
    ///
    /// The server URL is `entry.runtime` trimmed; the timeout comes from
    /// `crate::http_tool::parse_timeout_pub(&entry.timeout)` and falls back
    /// to 30 seconds when that yields no value.
    ///
    /// # Errors
    /// Returns a message when the URL is empty or does not start with
    /// `http://` or `https://`.
    pub fn from_entry(entry: &ToolEntry) -> Result<Self, String> {
        let server_url = match entry.runtime.trim() {
            "" => {
                return Err(format!(
                    "ℰMCP tool '{}': no server URL. Set runtime: \"http://...\" in tool definition.",
                    entry.name
                ));
            }
            url if !(url.starts_with("http://") || url.starts_with("https://")) => {
                return Err(format!(
                    "ℰMCP tool '{}': invalid server URL '{}'. Must start with http:// or https://.",
                    entry.name, url
                ));
            }
            url => url.to_string(),
        };
        let timeout = crate::http_tool::parse_timeout_pub(&entry.timeout)
            .unwrap_or(Duration::from_secs(30));
        Ok(Self::new(entry.name.clone(), server_url, timeout))
    }

    /// Builds a tool directly from its parts, without validating the URL.
    pub fn new(name: String, server_url: String, timeout: Duration) -> Self {
        Self {
            timeout,
            server_url,
            name,
        }
    }
}
/// Serializes a `tools/call` JSON-RPC request to a string.
///
/// If `args` starts with `{` (ignoring leading whitespace) and parses as
/// JSON, the parsed value is used verbatim as `params`. Otherwise the raw
/// text is wrapped as `{"name": tool_name, "arguments": {"input": args}}`.
fn build_mcp_request_body(tool_name: &str, args: &str, request_id: u64) -> String {
    // Default envelope for plain-text or unparseable arguments.
    let wrapped = || {
        serde_json::json!({
            "name": tool_name,
            "arguments": { "input": args }
        })
    };
    let params = match args.trim_start().starts_with('{') {
        true => serde_json::from_str::<serde_json::Value>(args).unwrap_or_else(|_| wrapped()),
        false => wrapped(),
    };
    let envelope = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": params,
        "id": request_id,
    });
    envelope.to_string()
}
/// How the body of an MCP HTTP response is framed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum McpFramingMode {
    /// Newline-delimited JSON: one JSON-RPC envelope per line.
    NdjsonStream,
    /// A single JSON-RPC response occupying the whole body.
    SingleResponse,
}

/// Picks the framing mode from a `Content-Type` header value.
///
/// Matching is case-insensitive and substring-based, so parameters such as
/// `; charset=utf-8` are tolerated. Anything that is not an NDJSON/JSONL
/// media type, including the empty string, is a single response.
fn classify_mcp_framing(content_type: &str) -> McpFramingMode {
    const NDJSON_MEDIA_TYPES: [&str; 2] = ["application/x-ndjson", "application/jsonl"];

    let normalized = content_type.to_ascii_lowercase();
    let is_ndjson = NDJSON_MEDIA_TYPES
        .iter()
        .any(|media_type| normalized.contains(*media_type));
    match is_ndjson {
        true => McpFramingMode::NdjsonStream,
        false => McpFramingMode::SingleResponse,
    }
}
/// Classification of one parsed JSON-RPC envelope from an MCP stream.
enum McpEnvelope {
/// A `notifications/message` or `notifications/progress` notification;
/// `delta` is the text extracted from its params.
Notification { delta: String },
/// A response carrying `result`; `delta` is the text extracted from it.
Result { delta: String },
/// A response carrying `error`; `message` is the formatted description
/// including the blame suffix.
Error { message: String },
}
/// Parses one line of an MCP stream into an [`McpEnvelope`].
///
/// Returns `None` for blank lines, invalid JSON, notifications with an
/// unrecognised method, and values that carry none of `error`, `method` or
/// `result`. An `error` member takes precedence over everything else.
// NOTE(review): -32603 is JSON-RPC "Internal error", yet the range below
// blames it on the caller — confirm this is intended.
fn parse_mcp_envelope(line: &str) -> Option<McpEnvelope> {
    if line.trim().is_empty() {
        return None;
    }
    let envelope: serde_json::Value = serde_json::from_str(line).ok()?;

    if let Some(err) = envelope.get("error") {
        let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
        let message = err
            .get("message")
            .and_then(|m| m.as_str())
            .unwrap_or("unknown JSON-RPC error");
        let blame = match code {
            -32603..=-32600 => "caller",
            _ => "server",
        };
        return Some(McpEnvelope::Error {
            message: format!("JSON-RPC error {code}: {message} [blame={blame}]"),
        });
    }

    match envelope.get("method").and_then(|m| m.as_str()) {
        Some("notifications/message" | "notifications/progress") => {
            Some(McpEnvelope::Notification {
                delta: extract_notification_text(envelope.get("params")),
            })
        }
        // Any other method is ignored.
        Some(_) => None,
        None => envelope.get("result").map(|result| McpEnvelope::Result {
            delta: extract_result_text(result),
        }),
    }
}
fn extract_notification_text(params: Option<&serde_json::Value>) -> String {
let Some(p) = params else { return String::new() };
for key in ["data", "text", "message"] {
if let Some(val) = p.get(key).and_then(|v| v.as_str()) {
if !val.is_empty() {
return val.to_string();
}
}
}
serde_json::to_string(p).unwrap_or_default()
}
/// Extracts display text from a `tools/call` result.
///
/// Returns the `text` of the first item of the `content` array. When that
/// path is missing or not a string, the whole result is serialized as JSON.
fn extract_result_text(result: &serde_json::Value) -> String {
    let first_text = result
        .get("content")
        .and_then(serde_json::Value::as_array)
        .and_then(|items| items.first())
        .and_then(|item| item.get("text"))
        .and_then(serde_json::Value::as_str);
    match first_text {
        Some(text) => text.to_string(),
        None => serde_json::to_string(result).unwrap_or_default(),
    }
}
/// Spawns a detached task that POSTs a `notifications/cancelled` message for
/// `request_id` to the server.
///
/// Best-effort: the request uses a 500 ms timeout, and both client-build and
/// send failures are silently ignored.
fn fire_cancel_notification(server_url: String, request_id: u64, name: String) {
    tokio::spawn(async move {
        let payload = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "notifications/cancelled",
            "params": { "requestId": request_id },
        })
        .to_string();

        let built = reqwest::Client::builder()
            .timeout(Duration::from_millis(500))
            .build();
        let Ok(client) = built else {
            return;
        };

        // The outcome is deliberately discarded.
        let _ = client
            .post(&server_url)
            .header("Content-Type", "application/json")
            .header("X-Axon-EMCP", "1.0")
            .header("X-Axon-Tool", name)
            .body(payload)
            .send()
            .await;
    });
}
#[async_trait]
impl Tool for McpStreamingTool {
/// Non-streaming execution: runs the blocking [`dispatch_mcp`] via
/// `tokio::task::spawn_blocking`.
///
/// A temporary `ToolEntry` is built from this tool's name, URL and timeout
/// (rendered as whole seconds). A join failure of the blocking task is
/// reported as an unsuccessful result.
async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
    let entry = ToolEntry {
        name: self.name.clone(),
        provider: "mcp".to_string(),
        timeout: format!("{}s", self.timeout.as_secs()),
        runtime: self.server_url.clone(),
        sandbox: None,
        max_results: None,
        output_schema: String::new(),
        effect_row: Vec::new(),
        source: crate::tool_registry::ToolSource::Program,
        is_streaming: false,
    };

    let joined = tokio::task::spawn_blocking(move || dispatch_mcp(&entry, &args)).await;
    joined.unwrap_or_else(|e| ToolResult {
        success: false,
        output: format!("ℰMCP tool '{}': blocking task join failed: {e}", self.name),
        tool_name: self.name.clone(),
    })
}
async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
let server_url = self.server_url.clone();
let name = self.name.clone();
let timeout = self.timeout;
let cancel = ctx.cancel.clone();
let request_id = ctx.trace_id.max(1);
let body = build_mcp_request_body(&name, &args, request_id);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ToolChunk>();
tokio::spawn(async move {
let send_terminator = |reason: ToolFinishReason| {
let _ = tx.send(ToolChunk::terminator("", reason));
};
if cancel.is_cancelled() {
fire_cancel_notification(server_url.clone(), request_id, name.clone());
send_terminator(ToolFinishReason::Cancelled);
return;
}
let client = match reqwest::Client::builder().timeout(timeout).build() {
Ok(c) => c,
Err(e) => {
send_terminator(ToolFinishReason::Error {
message: format!(
"ℰMCP tool '{name}': failed to build async client: {e}"
),
});
return;
}
};
let response = match client
.post(&server_url)
.header("Content-Type", "application/json")
.header("X-Axon-EMCP", "1.0")
.header("Accept", "application/x-ndjson, application/json")
.body(body)
.send()
.await
{
Ok(r) => r,
Err(e) => {
let message = if e.is_timeout() {
format!(
"ℰMCP tool '{name}': server timed out after {}s",
timeout.as_secs()
)
} else if e.is_connect() {
format!("ℰMCP tool '{name}': cannot connect to MCP server at {server_url}")
} else {
format!("ℰMCP tool '{name}': MCP request failed: {e}")
};
send_terminator(ToolFinishReason::Error { message });
return;
}
};
let status = response.status();
if !status.is_success() {
let body_text = response.text().await.unwrap_or_default();
let truncated = if body_text.len() > 200 {
format!("{}...", &body_text[..200])
} else {
body_text
};
send_terminator(ToolFinishReason::Error {
message: format!(
"ℰMCP server '{name}' returned HTTP {}: {}",
status.as_u16(),
truncated
),
});
return;
}
let content_type = response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
let framing = classify_mcp_framing(&content_type);
let mut byte_stream = response.bytes_stream();
let drain_result = match framing {
McpFramingMode::NdjsonStream => {
drain_mcp_ndjson(&mut byte_stream, &cancel, &tx).await
}
McpFramingMode::SingleResponse => {
drain_mcp_single(&mut byte_stream, &cancel, &tx).await
}
};
match drain_result {
McpDrainOutcome::Completed => send_terminator(ToolFinishReason::Stop),
McpDrainOutcome::CompletedWith(reason) => send_terminator(reason),
McpDrainOutcome::Cancelled => {
fire_cancel_notification(server_url.clone(), request_id, name.clone());
send_terminator(ToolFinishReason::Cancelled);
}
McpDrainOutcome::Error(message) => {
send_terminator(ToolFinishReason::Error { message })
}
}
});
Box::pin(futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|chunk| (chunk, rx))
}))
}
/// Always `true`: this tool reports itself as streaming-capable.
fn is_streaming(&self) -> bool {
true
}
}
/// How draining an MCP response body ended; `stream` maps each variant to
/// the terminator chunk it sends.
enum McpDrainOutcome {
/// The body was consumed normally; terminated with `ToolFinishReason::Stop`.
Completed,
/// The body was consumed and dictates a specific finish reason (used for
/// JSON-RPC errors).
CompletedWith(ToolFinishReason),
/// The cancel flag was set, or the chunk receiver was dropped mid-stream.
Cancelled,
/// Reading or parsing the body failed; carries the error message.
Error(String),
}
async fn drain_mcp_ndjson<S>(
byte_stream: &mut S,
cancel: &crate::cancel_token::CancellationFlag,
tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
) -> McpDrainOutcome
where
S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
{
let mut line_buf = LineBuffer::new();
loop {
if cancel.is_cancelled() {
return McpDrainOutcome::Cancelled;
}
match byte_stream.next().await {
None => break,
Some(Err(e)) => {
return McpDrainOutcome::Error(format!("MCP stream chunk error: {e}"))
}
Some(Ok(bytes)) => {
let lines = line_buf.push(&bytes);
for line in lines {
if let Some(env) = parse_mcp_envelope(&line) {
match env {
McpEnvelope::Notification { delta } => {
if !delta.is_empty()
&& tx.send(ToolChunk::intermediate(delta)).is_err()
{
return McpDrainOutcome::Cancelled;
}
}
McpEnvelope::Result { delta } => {
if !delta.is_empty() {
let _ = tx.send(ToolChunk::intermediate(delta));
}
return McpDrainOutcome::Completed;
}
McpEnvelope::Error { message } => {
return McpDrainOutcome::CompletedWith(
ToolFinishReason::Error { message },
);
}
}
}
}
}
}
}
if let Some(line) = line_buf.flush() {
if let Some(env) = parse_mcp_envelope(&line) {
match env {
McpEnvelope::Notification { delta } => {
if !delta.is_empty() {
let _ = tx.send(ToolChunk::intermediate(delta));
}
}
McpEnvelope::Result { delta } => {
if !delta.is_empty() {
let _ = tx.send(ToolChunk::intermediate(delta));
}
return McpDrainOutcome::Completed;
}
McpEnvelope::Error { message } => {
return McpDrainOutcome::CompletedWith(ToolFinishReason::Error {
message,
});
}
}
}
}
McpDrainOutcome::Completed
}
async fn drain_mcp_single<S>(
byte_stream: &mut S,
cancel: &crate::cancel_token::CancellationFlag,
tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
) -> McpDrainOutcome
where
S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
{
let mut acc: Vec<u8> = Vec::new();
loop {
if cancel.is_cancelled() {
return McpDrainOutcome::Cancelled;
}
match byte_stream.next().await {
None => break,
Some(Err(e)) => {
return McpDrainOutcome::Error(format!("MCP body chunk error: {e}"))
}
Some(Ok(bytes)) => acc.extend_from_slice(&bytes),
}
}
let body_text = String::from_utf8_lossy(&acc).into_owned();
if body_text.trim().is_empty() {
return McpDrainOutcome::Completed;
}
let value: serde_json::Value = match serde_json::from_str(&body_text) {
Ok(v) => v,
Err(e) => {
return McpDrainOutcome::Error(format!(
"ℰMCP server returned unparseable JSON-RPC response: {e}"
));
}
};
if let Some(err) = value.get("error") {
let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
let message = err
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("unknown JSON-RPC error");
let blame = if (-32603..=-32600).contains(&code) {
"caller"
} else {
"server"
};
return McpDrainOutcome::CompletedWith(ToolFinishReason::Error {
message: format!("JSON-RPC error {code}: {message} [blame={blame}]"),
});
}
if let Some(result) = value.get("result") {
let delta = extract_result_text(result);
if !delta.is_empty() {
let _ = tx.send(ToolChunk::intermediate(delta));
}
return McpDrainOutcome::Completed;
}
McpDrainOutcome::Error("ℰMCP response has neither result nor error".to_string())
}
// Unit tests for the MCP client, blame tracking, the streaming tool
// constructor, framing classification, envelope parsing and request-body
// construction.
#[cfg(test)]
mod tests {
use super::*;
use crate::tool_registry::{ToolEntry, ToolSource};
// Builds a `ToolEntry` with provider "mcp" and the given name, server URL
// (`runtime`) and timeout string.
fn make_mcp_entry(name: &str, url: &str, timeout: &str) -> ToolEntry {
ToolEntry {
name: name.to_string(),
provider: "mcp".to_string(),
timeout: timeout.to_string(),
runtime: url.to_string(),
sandbox: None,
max_results: None,
output_schema: "JSON".to_string(),
effect_row: vec!["network".to_string(), "epistemic:speculate".to_string()],
source: ToolSource::Program,
is_streaming: false,
}
}
// `Blame::as_str` yields the lowercase labels.
#[test]
fn blame_variants() {
assert_eq!(Blame::None.as_str(), "none");
assert_eq!(Blame::Server.as_str(), "server");
assert_eq!(Blame::Caller.as_str(), "caller");
assert_eq!(Blame::Network.as_str(), "network");
}
// `EpistemicTaint::as_str` yields the snake_case labels.
#[test]
fn taint_levels() {
assert_eq!(EpistemicTaint::Untrusted.as_str(), "untrusted");
assert_eq!(EpistemicTaint::SchemaValidated.as_str(), "schema_validated");
assert_eq!(EpistemicTaint::Elevated.as_str(), "elevated");
}
// A hand-built successful result keeps its `Untrusted` taint.
#[test]
fn mcp_data_born_untrusted() {
let result = McpCallResult {
tool_name: "DataTool".into(),
output: "some data".into(),
success: true,
blame: Blame::None,
taint: EpistemicTaint::Untrusted,
server: "http://localhost:3000".into(),
effects: vec!["network".into()],
};
assert_eq!(result.taint, EpistemicTaint::Untrusted);
}
// `to_tool_result` copies success, output and tool name.
#[test]
fn mcp_result_to_tool_result() {
let mcp = McpCallResult {
tool_name: "TestTool".into(),
output: "hello".into(),
success: true,
blame: Blame::None,
taint: EpistemicTaint::Untrusted,
server: "http://localhost".into(),
effects: vec![],
};
let tr = mcp.to_tool_result();
assert!(tr.success);
assert_eq!(tr.output, "hello");
assert_eq!(tr.tool_name, "TestTool");
}
// An empty server URL is rejected before any request is made.
#[test]
fn dispatch_empty_url_fails() {
let entry = make_mcp_entry("McpTool", "", "5s");
let result = dispatch_mcp(&entry, "arg");
assert!(!result.success);
assert!(result.output.contains("no server URL"));
}
// A non-http(s) scheme is rejected.
#[test]
fn dispatch_invalid_scheme_fails() {
let entry = make_mcp_entry("McpTool", "ws://localhost:3000", "5s");
let result = dispatch_mcp(&entry, "arg");
assert!(!result.success);
assert!(result.output.contains("invalid server URL"));
}
// Performs a real request to 127.0.0.1:1 and expects it to fail with
// network blame.
#[test]
fn dispatch_connection_refused() {
let entry = make_mcp_entry("McpTool", "http://127.0.0.1:1/mcp", "2s");
let result = dispatch_mcp(&entry, "test");
assert!(!result.success);
assert!(result.output.contains("blame=network"));
}
// Records one result per blame kind and checks the counters.
#[test]
fn blame_tracker_accumulates() {
let mut tracker = BlameTracker::new();
tracker.record(&McpCallResult {
tool_name: "A".into(),
output: "ok".into(),
success: true,
blame: Blame::None,
taint: EpistemicTaint::Untrusted,
server: "s1".into(),
effects: vec![],
});
tracker.record(&McpCallResult {
tool_name: "B".into(),
output: "server error".into(),
success: false,
blame: Blame::Server,
taint: EpistemicTaint::Untrusted,
server: "s1".into(),
effects: vec![],
});
tracker.record(&McpCallResult {
tool_name: "C".into(),
output: "bad params".into(),
success: false,
blame: Blame::Caller,
taint: EpistemicTaint::Untrusted,
server: "s2".into(),
effects: vec![],
});
tracker.record(&McpCallResult {
tool_name: "D".into(),
output: "timeout".into(),
success: false,
blame: Blame::Network,
taint: EpistemicTaint::Untrusted,
server: "s1".into(),
effects: vec![],
});
assert_eq!(tracker.total(), 4);
assert_eq!(tracker.server_faults(), 1);
assert_eq!(tracker.caller_faults(), 1);
assert_eq!(tracker.network_faults(), 1);
}
// `McpClient::new` stores its arguments and starts ids at 1.
#[test]
fn mcp_client_creates() {
let client = McpClient::new("http://localhost:3000", Duration::from_secs(10));
assert_eq!(client.server_url, "http://localhost:3000");
assert_eq!(client.timeout, Duration::from_secs(10));
assert_eq!(client.next_id, 1);
}
// A result with text content is a success carrying that text.
#[test]
fn process_success_response() {
let client = McpClient::new("http://localhost", Duration::from_secs(5));
let response = JsonRpcResponse {
jsonrpc: "2.0".into(),
result: Some(serde_json::json!({
"content": [{"type": "text", "text": "result data"}]
})),
error: None,
id: 1,
};
let result = client.process_response("TestTool", response);
assert!(result.success);
assert_eq!(result.output, "result data");
assert_eq!(result.blame, Blame::None);
assert_eq!(result.taint, EpistemicTaint::Untrusted); }
// Error code -32000 lies outside the caller range, so the server is blamed.
#[test]
fn process_server_error_response() {
let client = McpClient::new("http://localhost", Duration::from_secs(5));
let response = JsonRpcResponse {
jsonrpc: "2.0".into(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "internal server error".into(),
}),
id: 1,
};
let result = client.process_response("TestTool", response);
assert!(!result.success);
assert_eq!(result.blame, Blame::Server);
assert!(result.output.contains("-32000"));
}
// Error code -32602 lies inside the caller range, so the caller is blamed.
#[test]
fn process_caller_error_response() {
let client = McpClient::new("http://localhost", Duration::from_secs(5));
let response = JsonRpcResponse {
jsonrpc: "2.0".into(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "invalid params".into(),
}),
id: 1,
};
let result = client.process_response("TestTool", response);
assert!(!result.success);
assert_eq!(result.blame, Blame::Caller);
}
// Successful tool calls carry the "network" and "epistemic:speculate" effects.
#[test]
fn mcp_calls_carry_epistemic_effects() {
let client = McpClient::new("http://localhost", Duration::from_secs(5));
let response = JsonRpcResponse {
jsonrpc: "2.0".into(),
result: Some(serde_json::json!({"content": [{"text": "data"}]})),
error: None,
id: 1,
};
let result = client.process_response("Tool", response);
assert!(result.effects.contains(&"network".to_string()));
assert!(result.effects.contains(&"epistemic:speculate".to_string()));
}
// `Blame` serializes under its Rust variant name, not the `as_str` label.
#[test]
fn blame_serializes() {
let json = serde_json::to_string(&Blame::Server).unwrap();
assert_eq!(json, "\"Server\"");
}
// Serialized results embed blame and taint under their variant names.
#[test]
fn mcp_call_result_serializes() {
let result = McpCallResult {
tool_name: "T".into(),
output: "out".into(),
success: true,
blame: Blame::None,
taint: EpistemicTaint::Untrusted,
server: "http://s".into(),
effects: vec!["network".into()],
};
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("\"blame\":\"None\""));
assert!(json.contains("\"taint\":\"Untrusted\""));
}
// `from_entry` accepts an http URL and parses the "10s" timeout.
#[test]
fn mcp_streaming_tool_from_entry_accepts_valid_http_url() {
let entry = make_mcp_entry("McpTool", "http://localhost:3000/mcp", "10s");
let t = McpStreamingTool::from_entry(&entry).expect("ok");
assert_eq!(t.name, "McpTool");
assert_eq!(t.server_url, "http://localhost:3000/mcp");
assert_eq!(t.timeout, Duration::from_secs(10));
assert!(t.is_streaming());
}
// `from_entry` accepts an https URL.
#[test]
fn mcp_streaming_tool_from_entry_accepts_https_url() {
let entry = make_mcp_entry("McpTool", "https://api.example.com/mcp", "5s");
let t = McpStreamingTool::from_entry(&entry).expect("ok");
assert!(t.server_url.starts_with("https://"));
}
// `from_entry` rejects an empty URL.
#[test]
fn mcp_streaming_tool_from_entry_rejects_empty_url() {
let entry = make_mcp_entry("McpTool", "", "10s");
let err = McpStreamingTool::from_entry(&entry).err().unwrap();
assert!(err.contains("no server URL"));
}
// `from_entry` rejects a non-http(s) scheme.
#[test]
fn mcp_streaming_tool_from_entry_rejects_invalid_scheme() {
let entry = make_mcp_entry("McpTool", "ws://localhost:3000", "10s");
let err = McpStreamingTool::from_entry(&entry).err().unwrap();
assert!(err.contains("invalid server URL"));
}
// An empty timeout string falls back to the 30-second default.
#[test]
fn mcp_streaming_tool_default_timeout_when_empty() {
let entry = make_mcp_entry("McpTool", "http://localhost", "");
let t = McpStreamingTool::from_entry(&entry).expect("ok");
assert_eq!(t.timeout, Duration::from_secs(30));
}
// NDJSON/JSONL content types, with or without parameters, select streaming framing.
#[test]
fn classify_mcp_framing_ndjson() {
assert_eq!(
classify_mcp_framing("application/x-ndjson"),
McpFramingMode::NdjsonStream
);
assert_eq!(
classify_mcp_framing("application/x-ndjson; charset=utf-8"),
McpFramingMode::NdjsonStream
);
assert_eq!(
classify_mcp_framing("application/jsonl"),
McpFramingMode::NdjsonStream
);
}
// Any other content type, including the empty string, is a single response.
#[test]
fn classify_mcp_framing_single_default() {
assert_eq!(
classify_mcp_framing("application/json"),
McpFramingMode::SingleResponse
);
assert_eq!(
classify_mcp_framing("text/plain"),
McpFramingMode::SingleResponse
);
assert_eq!(classify_mcp_framing(""), McpFramingMode::SingleResponse);
}
// Notification text is taken from `params.data`.
#[test]
fn parse_envelope_notification_message_data() {
let line = r#"{"jsonrpc":"2.0","method":"notifications/message","params":{"data":"partial-1"}}"#;
match parse_mcp_envelope(line) {
Some(McpEnvelope::Notification { delta }) => assert_eq!(delta, "partial-1"),
other => panic!("expected Notification, got {}", envelope_label(&other)),
}
}
// Progress notification text is taken from `params.text`.
#[test]
fn parse_envelope_notification_progress_text() {
let line = r#"{"jsonrpc":"2.0","method":"notifications/progress","params":{"text":"50% done"}}"#;
match parse_mcp_envelope(line) {
Some(McpEnvelope::Notification { delta }) => assert_eq!(delta, "50% done"),
other => panic!("expected Notification, got {}", envelope_label(&other)),
}
}
// With no `data` or `text`, `params.message` is used.
#[test]
fn parse_envelope_notification_message_field_fallback() {
let line = r#"{"jsonrpc":"2.0","method":"notifications/message","params":{"message":"hi"}}"#;
match parse_mcp_envelope(line) {
Some(McpEnvelope::Notification { delta }) => assert_eq!(delta, "hi"),
other => panic!("expected Notification, got {}", envelope_label(&other)),
}
}
// A result envelope yields the text of the first content item.
#[test]
fn parse_envelope_response_with_content() {
let line = r#"{"jsonrpc":"2.0","result":{"content":[{"type":"text","text":"final answer"}]},"id":1}"#;
match parse_mcp_envelope(line) {
Some(McpEnvelope::Result { delta }) => assert_eq!(delta, "final answer"),
other => panic!("expected Result, got {}", envelope_label(&other)),
}
}
// Error code -32000 is reported with server blame.
#[test]
fn parse_envelope_error_server_blame() {
let line = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"internal"},"id":1}"#;
match parse_mcp_envelope(line) {
Some(McpEnvelope::Error { message }) => {
assert!(message.contains("-32000"));
assert!(message.contains("blame=server"));
}
other => panic!("expected Error, got {}", envelope_label(&other)),
}
}
// Error code -32602 is reported with caller blame.
#[test]
fn parse_envelope_error_caller_blame_invalid_params() {
let line = r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"bad params"},"id":1}"#;
match parse_mcp_envelope(line) {
Some(McpEnvelope::Error { message }) => {
assert!(message.contains("blame=caller"));
}
other => panic!("expected Error, got {}", envelope_label(&other)),
}
}
// Empty and whitespace-only lines are ignored.
#[test]
fn parse_envelope_blank_line_returns_none() {
assert!(parse_mcp_envelope("").is_none());
assert!(parse_mcp_envelope(" ").is_none());
}
// Notifications with an unrecognised method are ignored.
#[test]
fn parse_envelope_unknown_method_returns_none() {
let line = r#"{"jsonrpc":"2.0","method":"notifications/heartbeat","params":{}}"#;
assert!(parse_mcp_envelope(line).is_none());
}
// Invalid JSON is ignored rather than reported as an error.
#[test]
fn parse_envelope_malformed_json_returns_none() {
assert!(parse_mcp_envelope("not json").is_none());
assert!(parse_mcp_envelope("{").is_none());
}
// JSON-object arguments are passed through verbatim as `params`.
#[test]
fn build_mcp_request_body_json_args_passthrough() {
let body = build_mcp_request_body("Tool", r#"{"name":"Tool","arguments":{"x":1}}"#, 42);
let v: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(v["jsonrpc"], "2.0");
assert_eq!(v["method"], "tools/call");
assert_eq!(v["id"], 42);
assert_eq!(v["params"]["arguments"]["x"], 1);
}
// Plain-text arguments are wrapped under `arguments.input`.
#[test]
fn build_mcp_request_body_plain_args_wrapped() {
let body = build_mcp_request_body("Tool", "search query", 1);
let v: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(v["params"]["name"], "Tool");
assert_eq!(v["params"]["arguments"]["input"], "search query");
}
// Names the variant of a parsed envelope for panic messages
// (`McpEnvelope` does not derive `Debug`).
fn envelope_label(env: &Option<McpEnvelope>) -> &'static str {
match env {
None => "None",
Some(McpEnvelope::Notification { .. }) => "Notification",
Some(McpEnvelope::Result { .. }) => "Result",
Some(McpEnvelope::Error { .. }) => "Error",
}
}
}