use std::sync::Arc;
use serde_json::{Value, json};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use stygian_browser::{BrowserPool, mcp::McpBrowserServer};
use stygian_graph::mcp::McpGraphServer;
use stygian_proxy::mcp::McpProxyServer;
/// MCP protocol revisions accepted by `handle_initialize`.
///
/// The first entry is the version reported back when the client's
/// `initialize` request carries no string `protocolVersion`.
const SUPPORTED_PROTOCOL_VERSIONS: &[&str] = &["2025-11-25", "2025-06-18", "2024-11-05"];
/// Single MCP endpoint that fans requests out to the graph, browser and proxy
/// sub-servers and adds two composite tools (`scrape_proxied`,
/// `browser_proxied`).
pub struct McpAggregator {
/// Browser sub-server; receives `browser_*` tool calls and `browser://` resource reads.
browser: Arc<McpBrowserServer>,
/// Proxy sub-server; receives `proxy_*` tool calls and `proxy://` resource reads.
proxy: Arc<McpProxyServer>,
/// Token returned by `start_background`; cancelled on shutdown and on drop.
proxy_token: CancellationToken,
/// Handle of the proxy background task. `run` takes and awaits it on clean
/// shutdown; if it is still present at drop time it is aborted instead.
proxy_bg: Option<JoinHandle<()>>,
}
impl McpAggregator {
/// Builds an aggregator with a default-configured browser pool and a fresh
/// proxy server, and starts the proxy server's background task.
///
/// # Errors
///
/// Propagates the error from `BrowserPool::new` or `McpProxyServer::new`.
pub async fn try_new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
    let browser_config = stygian_browser::BrowserConfig::default();
    let pool = BrowserPool::new(browser_config).await?;
    let browser = Arc::new(McpBrowserServer::new(pool));

    let proxy = Arc::new(McpProxyServer::new()?);
    let (proxy_token, background) = proxy.start_background();

    let aggregator = Self {
        browser,
        proxy,
        proxy_token,
        proxy_bg: Some(background),
    };
    Ok(aggregator)
}
pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("stygian-mcp aggregator starting (stdin/stdout mode)");
let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout();
let mut reader = BufReader::new(stdin).lines();
let mut stdout = stdout;
while let Some(line) = reader.next_line().await? {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
debug!(?line, "MCP request");
let response = match serde_json::from_str::<Value>(&line) {
Ok(req) => self.handle(&req).await,
Err(e) => Some(error_response(
&Value::Null,
-32700,
&format!("Parse error: {e}"),
)),
};
if let Some(response) = response {
let id = response.get("id").cloned().unwrap_or(Value::Null);
let mut out = serde_json::to_string(&response).unwrap_or_else(|e| {
tracing::error!("BUG: failed to serialize MCP response: {e}");
serde_json::to_string(&error_response(
&id,
-32603,
"Internal serialization error",
))
.unwrap_or_default()
});
out.push('\n');
stdout.write_all(out.as_bytes()).await?;
stdout.flush().await?;
}
}
info!("stygian-mcp aggregator stopping (stdin closed)");
self.proxy_token.cancel();
if let Some(bg) = self.proxy_bg.take()
&& let Err(e) = bg.await
{
warn!("proxy background task panicked during shutdown: {e:?}");
}
Ok(())
}
/// Validates one JSON-RPC message and dispatches it by method name.
///
/// Returns `None` when the message is a well-formed notification (see
/// `is_jsonrpc_notification`); the method is still executed in that case,
/// only the reply is suppressed. Messages without `jsonrpc == "2.0"` or
/// without a string `method` are answered with `-32600`; unknown methods
/// with `-32601`.
async fn handle(&self, req: &Value) -> Option<Value> {
    let suppress_reply = is_jsonrpc_notification(req);
    let id = req.get("id").unwrap_or(&Value::Null);

    let version_ok = req.get("jsonrpc").and_then(Value::as_str) == Some("2.0");
    if !version_ok {
        return Some(error_response(
            id,
            -32600,
            "Invalid request: expected jsonrpc='2.0'",
        ));
    }

    let method = match req.get("method").and_then(Value::as_str) {
        Some(method) => method,
        None => {
            return Some(error_response(
                id,
                -32600,
                "Invalid request: missing string 'method'",
            ));
        }
    };

    let reply = match method {
        "initialize" => handle_initialize(req),
        // Lifecycle notifications and pings are acknowledged with an empty result.
        "initialized" | "notifications/initialized" | "ping" => ok_response(id, &json!({})),
        "tools/list" => self.handle_tools_list(id).await,
        "tools/call" => self.handle_tools_call(id, req).await,
        "resources/list" => self.handle_resources_list(id).await,
        "resources/read" => self.handle_resources_read(id, req).await,
        unknown => error_response(id, -32601, &format!("Method not found: {unknown}")),
    };

    (!suppress_reply).then_some(reply)
}
async fn handle_tools_list(&self, id: &Value) -> Value {
let list_req = json!({"jsonrpc":"2.0","id":0,"method":"tools/list","params":{}});
let graph_resp = McpGraphServer::handle_request(&list_req).await;
let graph_tools: Vec<Value> = graph_resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(serde_json::Value::as_array)
.cloned()
.unwrap_or_default()
.into_iter()
.map(|mut t| {
if let Some(obj) = t.as_object_mut() {
let name = obj
.get("name")
.and_then(serde_json::Value::as_str)
.map(str::to_owned);
if let Some(name) = name {
let prefixed = format!("graph_{name}");
let desc = obj
.get("description")
.and_then(serde_json::Value::as_str)
.unwrap_or("")
.to_string();
obj.insert("name".to_string(), json!(prefixed));
obj.insert("description".to_string(), json!(format!("[graph] {desc}")));
}
}
t
})
.collect();
let browser_resp = self.browser.dispatch(&list_req).await;
let browser_tools: Vec<Value> = browser_resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(serde_json::Value::as_array)
.cloned()
.unwrap_or_default();
let proxy_resp = self.proxy.handle_request(&list_req).await;
let proxy_tools: Vec<Value> = proxy_resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(serde_json::Value::as_array)
.cloned()
.unwrap_or_default();
let cross_tools = vec![
json!({
"name": "scrape_proxied",
"description": "Fetch a URL through a proxy automatically selected from the pool. Acquires a proxy, performs an HTTP scrape, then releases the proxy. Returns the scraped content.",
"inputSchema": {
"type": "object",
"properties": {
"url": { "type": "string", "description": "Target URL to scrape" },
"timeout_secs": { "type": "integer", "description": "Request timeout in seconds (default: 30)" }
},
"required": ["url"]
}
}),
json!({
"name": "browser_proxied",
"description": "Navigate to a URL in a full headless browser session routed through a proxy automatically selected from the pool. Acquires a proxy and browser session, navigates, captures HTML content, then releases both. Returns navigation metadata and page HTML.",
"inputSchema": {
"type": "object",
"properties": {
"url": { "type": "string", "description": "Target URL to visit" }
},
"required": ["url"]
}
}),
];
let all_tools: Vec<Value> = [graph_tools, browser_tools, proxy_tools, cross_tools]
.into_iter()
.flatten()
.collect();
ok_response(id, &json!({ "tools": all_tools }))
}
/// Routes a `tools/call` request to the sub-server or composite tool that
/// owns the named tool.
///
/// Routing rules, in order:
/// 1. `scrape_proxied` / `browser_proxied` — composite tools implemented here.
/// 2. `graph_<name>` — forwarded to the graph server as `<name>`.
/// 3. `browser_*` — forwarded to the browser server unchanged.
/// 4. `proxy_*` — forwarded to the proxy server unchanged.
///
/// The exact composite names are matched first because `browser_proxied`
/// also starts with `browser_`; checking the prefix first would forward it
/// to the browser server and the composite implementation would never run.
///
/// Returns a `-32602` error when `params.name` is missing or not a string,
/// or when no rule matches. Missing `params.arguments` is forwarded as
/// JSON `null`.
async fn handle_tools_call(&self, id: &Value, req: &Value) -> Value {
    /// Builds the `tools/call` request forwarded to a sub-server.
    fn forwarded_call(id: &Value, tool: &str, args: &Value) -> Value {
        json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": "tools/call",
            "params": { "name": tool, "arguments": args }
        })
    }

    let Some(params) = req.get("params") else {
        return error_response(id, -32602, "Missing tool 'name'");
    };
    let Some(name) = params.get("name").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing tool 'name'");
    };
    let empty = Value::Null;
    let args = params.get("arguments").unwrap_or(&empty);

    // Exact-name composite tools take precedence over prefix routing.
    if name == "scrape_proxied" {
        return self.tool_scrape_proxied(id, args).await;
    }
    if name == "browser_proxied" {
        return self.tool_browser_proxied(id, args).await;
    }

    if let Some(short) = name.strip_prefix("graph_") {
        // The graph server knows its tools by their un-prefixed names.
        let sub = forwarded_call(id, short, args);
        McpGraphServer::handle_request(&sub).await
    } else if name.starts_with("browser_") {
        let sub = forwarded_call(id, name, args);
        self.browser.dispatch(&sub).await
    } else if name.starts_with("proxy_") {
        let sub = forwarded_call(id, name, args);
        self.proxy.handle_request(&sub).await
    } else {
        error_response(id, -32602, &format!("Unknown tool: {name}"))
    }
}
/// Answers `resources/list` with the browser server's resources followed by
/// the proxy server's resources. A sub-server reply without a
/// `result.resources` array contributes nothing.
async fn handle_resources_list(&self, id: &Value) -> Value {
    /// Clones `result.resources` out of a sub-server reply; empty when absent.
    fn listed_resources(resp: &Value) -> Vec<Value> {
        match resp
            .get("result")
            .and_then(|r| r.get("resources"))
            .and_then(Value::as_array)
        {
            Some(resources) => resources.clone(),
            None => Vec::new(),
        }
    }

    let list_req = json!({"jsonrpc":"2.0","id":0,"method":"resources/list","params":{}});

    let browser_resp = self.browser.dispatch(&list_req).await;
    let mut all = listed_resources(&browser_resp);

    let proxy_resp = self.proxy.handle_request(&list_req).await;
    all.extend(listed_resources(&proxy_resp));

    ok_response(id, &json!({ "resources": all }))
}
/// Forwards a `resources/read` request, unmodified, to the sub-server that
/// owns the URI scheme (`browser://` or `proxy://`).
///
/// A missing or non-string `params.uri` is treated as the empty string and
/// therefore answered with the same `-32602` "Unknown resource URI" error
/// as any other unrecognised scheme.
async fn handle_resources_read(&self, id: &Value, req: &Value) -> Value {
    let uri = match req.get("params").and_then(|p| p.get("uri")) {
        Some(Value::String(uri)) => uri.as_str(),
        _ => "",
    };

    match uri {
        browser_uri if browser_uri.starts_with("browser://") => self.browser.dispatch(req).await,
        proxy_uri if proxy_uri.starts_with("proxy://") => self.proxy.handle_request(req).await,
        unknown => error_response(id, -32602, &format!("Unknown resource URI: {unknown}")),
    }
}
async fn tool_scrape_proxied(&self, id: &Value, args: &Value) -> Value {
let Some(url) = args.get("url").and_then(Value::as_str).map(str::to_owned) else {
return error_response(id, -32602, "Missing 'url'");
};
let timeout_secs = args
.get("timeout_secs")
.and_then(Value::as_u64)
.unwrap_or(30);
let (handle_token, proxy_url) = match self.acquire_proxy_and_token(id).await {
Ok(v) => v,
Err(e) => return e,
};
let scrape_resp = McpGraphServer::handle_request(&json!({
"jsonrpc": "2.0", "id": 0, "method": "tools/call",
"params": {
"name": "scrape",
"arguments": { "url": url, "proxy_url": proxy_url, "timeout_secs": timeout_secs }
}
}))
.await;
let success = scrape_resp.get("error").is_none_or(Value::is_null);
release_proxy(&self.proxy, &handle_token, success).await;
if success {
let text = mcp_content_text_raw(&scrape_resp);
ok_response(id, &json!({ "content": [{"type": "text", "text": text}] }))
} else {
let code = scrape_resp
.get("error")
.and_then(|e| e.get("code"))
.and_then(Value::as_i64)
.and_then(|c| i32::try_from(c).ok())
.unwrap_or(-32603);
let message = scrape_resp
.get("error")
.and_then(|e| e.get("message"))
.and_then(Value::as_str)
.unwrap_or("Graph scrape failed");
error_response(id, code, message)
}
}
/// Composite tool: acquires a proxy and a browser session routed through it,
/// navigates to `url`, captures the page content, then releases both.
///
/// Arguments: `url` (required string).
///
/// Failure handling:
/// * No `session_id` in the `browser_acquire` reply — the proxy is released
///   as failed. If the reply is a JSON-RPC error or a tool error, its
///   message is reported; otherwise a generic message is used.
/// * Navigation or content retrieval fails — the browser session is
///   released first, then the proxy (as failed), and the sub-call's message
///   is reported.
///
/// All failures after argument validation and proxy acquisition use code `-32603`.
///
/// On success the result is one text content item holding a JSON document
/// `{"navigation": ..., "html": ...}`. Each part is the sub-call's
/// `content[0].text`, decoded as JSON when it parses and kept raw otherwise.
async fn tool_browser_proxied(&self, id: &Value, args: &Value) -> Value {
    /// Builds a `tools/call` request for the browser sub-server.
    fn browser_call(tool: &str, arguments: Value) -> Value {
        json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": { "name": tool, "arguments": arguments }
        })
    }

    /// Decodes a JSON document embedded in a string value; any other value
    /// (or a string that is not valid JSON) is returned unchanged.
    fn decode_embedded_json(raw: Value) -> Value {
        let decoded = raw
            .as_str()
            .and_then(|s| serde_json::from_str::<Value>(s).ok());
        decoded.unwrap_or(raw)
    }

    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing 'url'");
    };
    let (handle_token, proxy_url) = match self.acquire_proxy_and_token(id).await {
        Ok(v) => v,
        Err(e) => return e,
    };

    let acquire_req = browser_call("browser_acquire", json!({ "proxy": proxy_url }));
    let acquire_browser_resp = self.browser.dispatch(&acquire_req).await;
    let session_info = parse_content_text(&acquire_browser_resp);
    let Some(session_id) = session_info.get("session_id").and_then(Value::as_str) else {
        release_proxy(&self.proxy, &handle_token, false).await;
        // Surface the sub-server's own message for both JSON-RPC errors and
        // tool errors; fall back to a generic one when there is none.
        let err_msg = if is_mcp_error_or_tool_error(&acquire_browser_resp) {
            mcp_error_message(&acquire_browser_resp)
                .unwrap_or("Failed to acquire browser session")
        } else {
            "Failed to acquire browser session"
        };
        return error_response(id, -32603, err_msg);
    };

    let nav_req = browser_call(
        "browser_navigate",
        json!({ "session_id": session_id, "url": url }),
    );
    let nav_resp = self.browser.dispatch(&nav_req).await;
    if is_mcp_error_or_tool_error(&nav_resp) {
        self.browser_release(session_id).await;
        release_proxy(&self.proxy, &handle_token, false).await;
        let nav_err = mcp_error_message(&nav_resp).unwrap_or("Browser navigation failed");
        return error_response(id, -32603, nav_err);
    }

    let content_req = browser_call("browser_content", json!({ "session_id": session_id }));
    let content_resp = self.browser.dispatch(&content_req).await;
    let content_ok = !is_mcp_error_or_tool_error(&content_resp);

    // Both resources are released before the outcome is inspected, so they
    // are returned on the success and the failure path alike.
    self.browser_release(session_id).await;
    release_proxy(&self.proxy, &handle_token, content_ok).await;
    if !content_ok {
        let content_err =
            mcp_error_message(&content_resp).unwrap_or("Browser content retrieval failed");
        return error_response(id, -32603, content_err);
    }

    let nav_json = decode_embedded_json(mcp_content_text_raw(&nav_resp));
    let html_json = decode_embedded_json(mcp_content_text_raw(&content_resp));
    let combined = json!({
        "navigation": nav_json,
        "html": html_json
    });
    ok_response(
        id,
        &json!({
            "content": [{
                "type": "text",
                "text": serde_json::to_string(&combined).unwrap_or_default()
            }]
        }),
    )
}
/// Calls the proxy server's `proxy_acquire` tool and extracts the lease.
///
/// Returns `(handle_token, proxy_url)` on success. On failure returns a
/// ready-made JSON-RPC error response addressed to `id`:
/// * the proxy server's own error (code and message) when it replied with a
///   non-null `error`;
/// * `-32603` when the reply's content text has no string `handle_token`;
/// * `-32603` when `proxy_url` is missing — in that case the acquired
///   handle is released as failed first.
async fn acquire_proxy_and_token(&self, id: &Value) -> Result<(String, String), Value> {
    /// Reads a string field of the decoded lease as an owned `String`.
    fn string_field(info: &Value, key: &str) -> Option<String> {
        info.get(key).and_then(Value::as_str).map(str::to_owned)
    }

    let acquire_req = json!({
        "jsonrpc": "2.0", "id": 0, "method": "tools/call",
        "params": { "name": "proxy_acquire", "arguments": {} }
    });
    let acquire_resp = self.proxy.handle_request(&acquire_req).await;

    match acquire_resp.get("error") {
        Some(err) if !err.is_null() => {
            let code = err
                .get("code")
                .and_then(Value::as_i64)
                .and_then(|c| i32::try_from(c).ok())
                .unwrap_or(-32603);
            let message = err
                .get("message")
                .and_then(Value::as_str)
                .unwrap_or("No proxy available — add proxies via proxy_add first");
            return Err(error_response(id, code, message));
        }
        _ => {}
    }

    let handle_info = parse_content_text(&acquire_resp);

    let handle_token = match string_field(&handle_info, "handle_token") {
        Some(token) => token,
        None => {
            return Err(error_response(
                id,
                -32603,
                "No proxy available — add proxies via proxy_add first",
            ));
        }
    };

    let proxy_url = match string_field(&handle_info, "proxy_url") {
        Some(proxy_url) => proxy_url,
        None => {
            // A handle was issued, so hand it back before reporting the failure.
            release_proxy(&self.proxy, &handle_token, false).await;
            return Err(error_response(
                id,
                -32603,
                "proxy_acquire returned no proxy_url",
            ));
        }
    };

    Ok((handle_token, proxy_url))
}
/// Best-effort release of a browser session: sends `browser_release` to the
/// browser server and discards whatever it answers.
async fn browser_release(&self, session_id: &str) {
    let arguments = json!({ "session_id": session_id });
    let release_req = json!({
        "jsonrpc": "2.0", "id": 0, "method": "tools/call",
        "params": {
            "name": "browser_release",
            "arguments": arguments
        }
    });
    // The reply is intentionally ignored.
    let _reply = self.browser.dispatch(&release_req).await;
}
}
impl Drop for McpAggregator {
    /// Cancels the proxy token and aborts the background task if its handle
    /// is still held (i.e. `run` did not already take and await it).
    fn drop(&mut self) {
        self.proxy_token.cancel();
        let Some(background) = self.proxy_bg.take() else {
            return;
        };
        background.abort();
    }
}
/// Answers the MCP `initialize` request.
///
/// * `params.protocolVersion` is a supported version — it is echoed back.
/// * It is a string that is not supported — a `-32602` error listing the
///   supported versions is returned.
/// * It is absent or not a string — the first entry of
///   `SUPPORTED_PROTOCOL_VERSIONS` is used.
///
/// The result advertises static `tools` and `resources` capabilities and the
/// server name and crate version.
fn handle_initialize(req: &Value) -> Value {
    let id = req.get("id").unwrap_or(&Value::Null);
    let requested = req
        .get("params")
        .and_then(|p| p.get("protocolVersion"))
        .and_then(Value::as_str);

    let protocol_version = if let Some(version) = requested {
        if !SUPPORTED_PROTOCOL_VERSIONS.contains(&version) {
            let supported = SUPPORTED_PROTOCOL_VERSIONS.join(", ");
            return error_response(
                id,
                -32602,
                &format!("Unsupported protocolVersion: {version}. Supported: {supported}"),
            );
        }
        version
    } else {
        SUPPORTED_PROTOCOL_VERSIONS
            .first()
            .copied()
            .unwrap_or("2024-11-05")
    };

    let result = json!({
        "protocolVersion": protocol_version,
        "capabilities": {
            "tools": { "listChanged": false },
            "resources": { "listChanged": false, "subscribe": false }
        },
        "serverInfo": {
            "name": "stygian-mcp",
            "version": env!("CARGO_PKG_VERSION")
        }
    });
    ok_response(id, &result)
}
fn ok_response(id: &Value, result: &Value) -> Value {
json!({ "jsonrpc": "2.0", "id": id, "result": result })
}
fn error_response(id: &Value, code: i32, message: &str) -> Value {
json!({
"jsonrpc": "2.0",
"id": id,
"error": { "code": code, "message": message }
})
}
/// Decodes the JSON document stored as a string in `result.content[0].text`.
///
/// Returns `Value::Null` when any step of that path is missing, the text is
/// not a string, or the string is not valid JSON.
fn parse_content_text(resp: &Value) -> Value {
    /// Walks to `result.content[0].text` and borrows it as a string.
    fn first_text(resp: &Value) -> Option<&str> {
        resp.get("result")?
            .get("content")?
            .get(0)?
            .get("text")?
            .as_str()
    }

    match first_text(resp) {
        Some(text) => serde_json::from_str(text).unwrap_or(Value::Null),
        None => Value::Null,
    }
}
/// Returns a clone of the value at `result.content[0].text` without decoding
/// it, or `Value::Null` when any step of that path is missing.
fn mcp_content_text_raw(resp: &Value) -> Value {
    /// Walks to `result.content[0].text`, whatever its JSON type.
    fn first_text_value(resp: &Value) -> Option<&Value> {
        resp.get("result")?.get("content")?.get(0)?.get("text")
    }

    match first_text_value(resp) {
        Some(text) => text.clone(),
        None => Value::Null,
    }
}
/// Reports whether a sub-server reply signals failure, either as a non-null
/// JSON-RPC `error` member or as a tool result with `isError` set to `true`.
fn is_mcp_error_or_tool_error(resp: &Value) -> bool {
    let rpc_error = resp.get("error").is_some_and(|e| !e.is_null());
    let tool_error = matches!(
        resp.get("result").and_then(|r| r.get("isError")),
        Some(Value::Bool(true))
    );
    rpc_error || tool_error
}
/// Picks the most specific human-readable message out of a reply.
///
/// Prefers the string `error.message` of a non-null JSON-RPC error; if that
/// is unavailable, falls back to the string at `result.content[0].text`.
/// Returns `None` when neither exists.
fn mcp_error_message(resp: &Value) -> Option<&str> {
    let rpc_message = match resp.get("error") {
        Some(err) if !err.is_null() => err.get("message").and_then(Value::as_str),
        _ => None,
    };
    if rpc_message.is_some() {
        return rpc_message;
    }

    resp.get("result")?
        .get("content")?
        .get(0)?
        .get("text")?
        .as_str()
}
/// Reports whether `req` is a well-formed JSON-RPC 2.0 notification: an
/// object with `jsonrpc == "2.0"`, a string `method`, and no `id` member at
/// all (an explicit `"id": null` does not count as a notification).
fn is_jsonrpc_notification(req: &Value) -> bool {
    let Some(message) = req.as_object() else {
        return false;
    };
    let version_ok = message.get("jsonrpc").and_then(Value::as_str) == Some("2.0");
    let has_method = message.get("method").is_some_and(Value::is_string);
    version_ok && !message.contains_key("id") && has_method
}
/// Best-effort release of a proxy lease: sends `proxy_release` with the
/// handle token and the outcome flag, and discards the proxy server's reply.
async fn release_proxy(proxy: &Arc<McpProxyServer>, handle_token: &str, success: bool) {
    let arguments = json!({ "handle_token": handle_token, "success": success });
    let release_req = json!({
        "jsonrpc": "2.0", "id": 0, "method": "tools/call",
        "params": {
            "name": "proxy_release",
            "arguments": arguments
        }
    });
    // The reply is intentionally ignored.
    let _reply = proxy.handle_request(&release_req).await;
}
// Tests for the free JSON-RPC helpers in this file and for the graph/proxy
// sub-servers the aggregator composes. None of them constructs an
// `McpAggregator`; several rebuild the aggregator's logic inline instead
// (noted on each such test).
#[cfg(test)]
mod tests {
use super::*;
use stygian_graph::mcp::McpGraphServer;
use stygian_proxy::mcp::McpProxyServer;
/// The graph server's `tools/list` advertises `scrape`; the proxy server's
/// `tools/list` does not.
#[tokio::test]
async fn test_routes_graph_tool() -> Result<(), Box<dyn std::error::Error>> {
let proxy = Arc::new(McpProxyServer::new()?);
let list_req = json!({
"jsonrpc": "2.0", "id": 1,
"method": "tools/list",
"params": {}
});
let resp = McpGraphServer::handle_request(&list_req).await;
let tools = resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(Value::as_array)
.ok_or("graph tools/list missing result.tools")?;
assert!(
tools
.iter()
.any(|t| t.get("name").and_then(Value::as_str) == Some("scrape"))
);
let proxy_resp = proxy.handle_request(&list_req).await;
let proxy_tools = proxy_resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(Value::as_array)
.ok_or("proxy tools/list missing result.tools")?;
assert!(
!proxy_tools
.iter()
.any(|t| t.get("name").and_then(Value::as_str) == Some("scrape"))
);
drop(proxy);
Ok(())
}
/// `error_response` stores the code under `/error/code` and echoes the id.
#[test]
fn test_error_response_structure() {
let resp = error_response(&json!(42), -32602, "bad param");
assert_eq!(
resp.pointer("/error/code").and_then(Value::as_i64),
Some(-32602)
);
assert_eq!(resp.get("id").and_then(Value::as_i64), Some(42));
}
/// `ok_response` stores the payload under `/result` and sets `jsonrpc` to "2.0".
#[test]
fn test_ok_response_structure() {
let resp = ok_response(&json!(1), &json!({"foo": "bar"}));
assert_eq!(
resp.pointer("/result/foo").and_then(Value::as_str),
Some("bar")
);
assert_eq!(resp.get("jsonrpc").and_then(Value::as_str), Some("2.0"));
}
/// A supported `protocolVersion` is echoed back by `handle_initialize`.
#[test]
fn test_initialize_negotiates_supported_protocol() {
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": { "protocolVersion": "2024-11-05" }
});
let resp = handle_initialize(&req);
assert_eq!(
resp.pointer("/result/protocolVersion")
.and_then(Value::as_str),
Some("2024-11-05")
);
}
/// An unsupported `protocolVersion` yields a `-32602` error.
#[test]
fn test_initialize_rejects_unsupported_protocol() {
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": { "protocolVersion": "1999-01-01" }
});
let resp = handle_initialize(&req);
assert_eq!(
resp.pointer("/error/code").and_then(Value::as_i64),
Some(-32602)
);
}
/// Calls `proxy_acquire` on a freshly constructed proxy server and maps the
/// reply to an error response addressed to id 99.
///
/// NOTE(review): the mapping below is written out inline and mirrors
/// `acquire_proxy_and_token`; the aggregator method itself is not called.
#[tokio::test]
async fn test_scrape_proxied_no_proxy_returns_error() -> Result<(), Box<dyn std::error::Error>>
{
let proxy = Arc::new(McpProxyServer::new()?);
let id = json!(99);
let acquire_req = json!({
"jsonrpc": "2.0", "id": 0, "method": "tools/call",
"params": { "name": "proxy_acquire", "arguments": {} }
});
let acquire_resp = proxy.handle_request(&acquire_req).await;
// No JSON-RPC error: a missing handle_token still has to become an error.
let resp = if acquire_resp.get("error").is_none_or(Value::is_null) {
let handle_info = parse_content_text(&acquire_resp);
if handle_info
.get("handle_token")
.and_then(Value::as_str)
.is_none()
{
error_response(
&id,
-32603,
"No proxy available — add proxies via proxy_add first",
)
} else {
ok_response(&id, &json!({"unexpected": "success"}))
}
} else {
let code = acquire_resp
.get("error")
.and_then(|e| e.get("code"))
.and_then(Value::as_i64)
.and_then(|c| i32::try_from(c).ok())
.unwrap_or(-32603);
let message = acquire_resp
.get("error")
.and_then(|e| e.get("message"))
.and_then(Value::as_str)
.unwrap_or("No proxy available");
error_response(&id, code, message)
};
assert!(
resp.get("error").is_some_and(|e| !e.is_null()),
"expected an error response"
);
assert_eq!(
resp.get("id").and_then(Value::as_i64),
Some(99),
"id must be the caller's id, not 0"
);
drop(proxy);
Ok(())
}
/// Same scenario as the test above with caller id 77: the error built from
/// a failed `proxy_acquire` must carry the caller's id.
///
/// NOTE(review): despite the name, no browser call is made here; the body
/// repeats the inline proxy-acquire mapping of the previous test.
#[tokio::test]
async fn test_browser_proxied_acquire_failure_uses_caller_id()
-> Result<(), Box<dyn std::error::Error>> {
let proxy = Arc::new(McpProxyServer::new()?);
let id = json!(77);
let acquire_req = json!({
"jsonrpc": "2.0", "id": 0, "method": "tools/call",
"params": { "name": "proxy_acquire", "arguments": {} }
});
let acquire_resp = proxy.handle_request(&acquire_req).await;
let resp = if acquire_resp.get("error").is_none_or(Value::is_null) {
let handle_info = parse_content_text(&acquire_resp);
if handle_info
.get("handle_token")
.and_then(Value::as_str)
.is_none()
{
error_response(
&id,
-32603,
"No proxy available — add proxies via proxy_add first",
)
} else {
ok_response(&id, &json!({"unexpected": "success"}))
}
} else {
let code = acquire_resp
.get("error")
.and_then(|e| e.get("code"))
.and_then(Value::as_i64)
.and_then(|c| i32::try_from(c).ok())
.unwrap_or(-32603);
let message = acquire_resp
.get("error")
.and_then(|e| e.get("message"))
.and_then(Value::as_str)
.unwrap_or("No proxy available");
error_response(&id, code, message)
};
assert!(
resp.get("error").is_some_and(|e| !e.is_null()),
"expected an error response"
);
assert_eq!(
resp.get("id").and_then(Value::as_i64),
Some(77),
"caller id must be preserved in error response"
);
Ok(())
}
/// `parse_content_text` returns `Null` for a reply that has no `result`.
#[test]
fn test_parse_content_text_on_error_response_returns_null() {
let error_resp = error_response(&json!(1), -32603, "internal error");
let parsed = parse_content_text(&error_resp);
assert_eq!(parsed, Value::Null);
}
/// A message without `id` is a notification only if it also declares
/// `jsonrpc: "2.0"`.
#[test]
fn test_notification_detection_requires_valid_jsonrpc() {
let valid_notification = json!({
"jsonrpc": "2.0",
"method": "notifications/initialized",
"params": {}
});
assert!(is_jsonrpc_notification(&valid_notification));
let missing_jsonrpc = json!({
"method": "notifications/initialized",
"params": {}
});
assert!(!is_jsonrpc_notification(&missing_jsonrpc));
}
/// A request without `jsonrpc` maps to a `-32600` error with a null id.
///
/// NOTE(review): the version check is re-implemented inline here;
/// `McpAggregator::handle` is not invoked.
#[test]
fn test_invalid_request_missing_jsonrpc_returns_invalid_request_error() {
let req = json!({
"method": "tools/list"
});
let id = req.get("id").unwrap_or(&Value::Null);
let resp = if req.get("jsonrpc").and_then(Value::as_str) == Some("2.0") {
ok_response(id, &json!({}))
} else {
error_response(id, -32600, "Invalid request: expected jsonrpc='2.0'")
};
assert_eq!(
resp.pointer("/error/code").and_then(Value::as_i64),
Some(-32600)
);
assert!(resp.get("id").is_none_or(Value::is_null));
}
/// The graph server exposes un-prefixed `scrape`, the proxy server exposes
/// `proxy_add`, and prefixing graph names with `graph_` yields
/// `graph_scrape` without leaving a bare `scrape`.
///
/// NOTE(review): the prefixing is done inline with `format!`;
/// `handle_tools_list` is not invoked.
#[tokio::test]
async fn test_tools_list_prefixes_graph_tools() -> Result<(), Box<dyn std::error::Error>> {
let proxy = Arc::new(McpProxyServer::new()?);
let list_req = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} });
let graph_resp = McpGraphServer::handle_request(&list_req).await;
let graph_tools = graph_resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(Value::as_array)
.ok_or("graph tools/list missing result.tools")?;
assert!(
graph_tools
.iter()
.any(|t| t.get("name").and_then(Value::as_str) == Some("scrape")),
"graph server exposes un-prefixed 'scrape'"
);
let proxy_resp = proxy.handle_request(&list_req).await;
let proxy_tools = proxy_resp
.get("result")
.and_then(|r| r.get("tools"))
.and_then(Value::as_array)
.ok_or("proxy tools/list missing result.tools")?;
assert!(
proxy_tools
.iter()
.any(|t| t.get("name").and_then(Value::as_str) == Some("proxy_add")),
"proxy server exposes 'proxy_add'"
);
let prefixed: Vec<String> = graph_tools
.iter()
.filter_map(|t| t.get("name").and_then(Value::as_str))
.map(|n| format!("graph_{n}"))
.collect();
assert!(
prefixed.contains(&"graph_scrape".to_string()),
"graph_scrape must appear after prefixing"
);
assert!(
!prefixed.contains(&"scrape".to_string()),
"un-prefixed 'scrape' must not appear after prefixing"
);
Ok(())
}
/// Both sub-servers answer a direct `tools/call`: the graph server for
/// `pipeline_validate`, the proxy server for `proxy_add` with no arguments.
///
/// NOTE(review): the sub-servers are called directly; the aggregator's
/// prefix routing in `handle_tools_call` is not exercised.
#[tokio::test]
async fn test_tools_call_dispatch_by_prefix() -> Result<(), Box<dyn std::error::Error>> {
let proxy = Arc::new(McpProxyServer::new()?);
let graph_call = json!({
"jsonrpc": "2.0", "id": 2, "method": "tools/call",
"params": { "name": "pipeline_validate", "arguments": { "toml": "" } }
});
let graph_resp = McpGraphServer::handle_request(&graph_call).await;
assert!(
graph_resp.get("error").is_none_or(Value::is_null)
|| graph_resp
.pointer("/result/content/0/text")
.and_then(Value::as_str)
.is_some(),
"graph server must respond to pipeline_validate"
);
let proxy_call = json!({
"jsonrpc": "2.0", "id": 3, "method": "tools/call",
"params": { "name": "proxy_add", "arguments": {} }
});
let proxy_resp = proxy.handle_request(&proxy_call).await;
let err_msg = proxy_resp
.get("error")
.and_then(|e| e.get("message"))
.and_then(Value::as_str)
.or_else(|| {
proxy_resp
.pointer("/result/content/0/text")
.and_then(Value::as_str)
})
.unwrap_or("");
// NOTE(review): the first two disjuncts imply the third, so this
// assertion only requires a non-empty message.
assert!(
err_msg.contains("url") || err_msg.contains("required") || !err_msg.is_empty(),
"proxy server must respond to proxy_add (got: {err_msg})"
);
Ok(())
}
/// The proxy server's `resources/list` contains `proxy://pool/stats`.
#[tokio::test]
async fn test_resources_list_includes_proxy() -> Result<(), Box<dyn std::error::Error>> {
let proxy = Arc::new(McpProxyServer::new()?);
let list_req =
json!({ "jsonrpc": "2.0", "id": 4, "method": "resources/list", "params": {} });
let proxy_resp = proxy.handle_request(&list_req).await;
let resources = proxy_resp
.get("result")
.and_then(|r| r.get("resources"))
.and_then(Value::as_array);
assert!(
resources.is_some_and(|r| r
.iter()
.any(|res| res.get("uri").and_then(Value::as_str) == Some("proxy://pool/stats"))),
"proxy resources/list must contain proxy://pool/stats"
);
Ok(())
}
}