use std::sync::Arc;
use serde_json::{Value, json};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use stygian_browser::{BrowserPool, mcp::McpBrowserServer};
use stygian_graph::mcp::McpGraphServer;
use stygian_proxy::mcp::McpProxyServer;
/// Aggregates the graph, browser, and proxy MCP servers behind a single
/// stdin/stdout JSON-RPC endpoint, namespacing and routing their tools.
pub struct McpAggregator {
    /// Graph sub-server; its tools are re-exported with a `graph_` prefix.
    graph: Arc<McpGraphServer>,
    /// Browser sub-server; tools forwarded by their own `browser_*` names.
    browser: Arc<McpBrowserServer>,
    /// Proxy sub-server; tools forwarded by their own `proxy_*` names.
    proxy: Arc<McpProxyServer>,
    /// Cancels the proxy's background task on shutdown.
    proxy_token: CancellationToken,
    /// Handle to the proxy background task; `Some` until shutdown takes it
    /// (awaited in `run`, aborted in `Drop`).
    proxy_bg: Option<JoinHandle<()>>,
}
impl McpAggregator {
    /// Constructs the aggregator: one graph server, one browser server
    /// (backed by a freshly created `BrowserPool`), and one proxy server
    /// whose background task is started immediately.
    ///
    /// # Errors
    /// Fails if the browser pool or the proxy server cannot be initialized.
    pub async fn try_new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let pool = BrowserPool::new(stygian_browser::BrowserConfig::default()).await?;
        let graph = Arc::new(McpGraphServer::new());
        let browser = Arc::new(McpBrowserServer::new(pool));
        let proxy = Arc::new(McpProxyServer::new()?);
        // Keep both the cancellation token and the join handle so shutdown
        // (graceful in `run`, forced in `Drop`) can stop the task.
        let (proxy_token, proxy_bg) = proxy.start_background();
        Ok(Self {
            graph,
            browser,
            proxy,
            proxy_token,
            proxy_bg: Some(proxy_bg),
        })
    }

    /// Serves newline-delimited JSON-RPC over stdin/stdout until stdin
    /// closes, then shuts the proxy background task down gracefully.
    ///
    /// # Errors
    /// Propagates I/O errors from reading stdin or writing stdout.
    pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        info!("stygian-mcp aggregator starting (stdin/stdout mode)");
        let stdin = tokio::io::stdin();
        let stdout = tokio::io::stdout();
        let mut reader = BufReader::new(stdin).lines();
        let mut stdout = stdout;
        while let Some(line) = reader.next_line().await? {
            let line = line.trim().to_string();
            if line.is_empty() {
                continue;
            }
            debug!(?line, "MCP request");
            // Unparseable input gets a JSON-RPC parse error (-32700) with a
            // null id, since no request id can be recovered from it.
            let response = match serde_json::from_str::<Value>(&line) {
                Ok(req) => self.handle(&req).await,
                Err(e) => error_response(&Value::Null, -32700, &format!("Parse error: {e}")),
            };
            // One response per line; flush so the client sees it immediately.
            let mut out = serde_json::to_string(&response).unwrap_or_default();
            out.push('\n');
            stdout.write_all(out.as_bytes()).await?;
            stdout.flush().await?;
        }
        info!("stygian-mcp aggregator stopping (stdin closed)");
        // Graceful shutdown: signal cancellation, then await the background
        // task so its teardown finishes before we return.
        self.proxy_token.cancel();
        if let Some(bg) = self.proxy_bg.take()
            && let Err(e) = bg.await
        {
            warn!("proxy background task panicked during shutdown: {e:?}");
        }
        Ok(())
    }

    /// Dispatches one parsed JSON-RPC request to the matching handler.
    async fn handle(&self, req: &Value) -> Value {
        let id = &req["id"];
        let method = req["method"].as_str().unwrap_or("");
        match method {
            "initialize" => handle_initialize(id),
            // NOTE(review): JSON-RPC notifications normally receive no
            // response; this replies with an empty result — confirm clients
            // tolerate that on the stdio transport.
            "initialized" | "ping" | "notifications/initialized" => ok_response(id, json!({})),
            "tools/list" => self.handle_tools_list(id).await,
            "tools/call" => self.handle_tools_call(id, req).await,
            "resources/list" => self.handle_resources_list(id).await,
            "resources/read" => self.handle_resources_read(id, req).await,
            other => error_response(id, -32601, &format!("Method not found: {other}")),
        }
    }

    /// Merges the tool catalogs of all sub-servers into one list.
    ///
    /// Graph tools are renamed with a `graph_` prefix (and a `[graph]` tag
    /// in the description) so `handle_tools_call` can route them back.
    /// Browser and proxy tools are forwarded unchanged — routing assumes
    /// their names already start with `browser_` / `proxy_` (TODO confirm
    /// against those servers). Two aggregator-only composite tools are
    /// appended at the end.
    async fn handle_tools_list(&self, id: &Value) -> Value {
        let list_req = json!({"jsonrpc":"2.0","id":0,"method":"tools/list","params":{}});
        let graph_resp = self.graph.handle_request(&list_req).await;
        let graph_tools: Vec<Value> = graph_resp["result"]["tools"]
            .as_array()
            .cloned()
            .unwrap_or_default()
            .into_iter()
            .map(|mut t| {
                // Rewrite name and description in place; tools without a
                // string "name" are passed through untouched.
                if let Some(name) = t["name"].as_str() {
                    let prefixed = format!("graph_{name}");
                    t["name"] = json!(prefixed);
                    let desc = t["description"].as_str().unwrap_or("").to_string();
                    t["description"] = json!(format!("[graph] {desc}"));
                }
                t
            })
            .collect();
        let browser_resp = self.browser.dispatch(&list_req).await;
        let browser_tools: Vec<Value> = browser_resp["result"]["tools"]
            .as_array()
            .cloned()
            .unwrap_or_default();
        let proxy_resp = self.proxy.handle_request(&list_req).await;
        let proxy_tools: Vec<Value> = proxy_resp["result"]["tools"]
            .as_array()
            .cloned()
            .unwrap_or_default();
        // Composite tools implemented by the aggregator itself (see
        // tool_scrape_proxied / tool_browser_proxied below).
        let cross_tools = vec![
            json!({
                "name": "scrape_proxied",
                "description": "Fetch a URL through a proxy automatically selected from the pool. Acquires a proxy, performs an HTTP scrape, then releases the proxy. Returns the scraped content.",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "url": { "type": "string", "description": "Target URL to scrape" },
                        "timeout_secs": { "type": "integer", "description": "Request timeout in seconds (default: 30)" }
                    },
                    "required": ["url"]
                }
            }),
            json!({
                "name": "browser_proxied",
                "description": "Navigate to a URL in a full headless browser session routed through a proxy automatically selected from the pool. Acquires a proxy and browser session, navigates, captures HTML content, then releases both. Returns navigation metadata and page HTML.",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "url": { "type": "string", "description": "Target URL to visit" }
                    },
                    "required": ["url"]
                }
            }),
        ];
        let all_tools: Vec<Value> = [graph_tools, browser_tools, proxy_tools, cross_tools]
            .into_iter()
            .flatten()
            .collect();
        ok_response(id, json!({ "tools": all_tools }))
    }

    /// Routes a `tools/call` by name prefix: `graph_*` is stripped and sent
    /// to the graph server; `browser_*` / `proxy_*` are forwarded verbatim;
    /// the two composite tools are handled locally. The caller's request id
    /// is carried into each sub-request so responses keep the right id.
    async fn handle_tools_call(&self, id: &Value, req: &Value) -> Value {
        let params = &req["params"];
        let name = match params["name"].as_str() {
            Some(n) => n,
            None => return error_response(id, -32602, "Missing tool 'name'"),
        };
        let args = &params["arguments"];
        if let Some(short) = name.strip_prefix("graph_") {
            // Graph tools were prefixed by handle_tools_list; strip the
            // prefix back off before forwarding.
            let sub = json!({
                "jsonrpc": "2.0",
                "id": id,
                "method": "tools/call",
                "params": { "name": short, "arguments": args }
            });
            self.graph.handle_request(&sub).await
        } else if name.starts_with("browser_") {
            let sub = json!({
                "jsonrpc": "2.0",
                "id": id,
                "method": "tools/call",
                "params": { "name": name, "arguments": args }
            });
            self.browser.dispatch(&sub).await
        } else if name.starts_with("proxy_") {
            let sub = json!({
                "jsonrpc": "2.0",
                "id": id,
                "method": "tools/call",
                "params": { "name": name, "arguments": args }
            });
            self.proxy.handle_request(&sub).await
        } else if name == "scrape_proxied" {
            self.tool_scrape_proxied(id, args).await
        } else if name == "browser_proxied" {
            self.tool_browser_proxied(id, args).await
        } else {
            error_response(id, -32602, &format!("Unknown tool: {name}"))
        }
    }

    /// Concatenates the resource lists of the browser and proxy servers.
    /// (The graph server is not queried for resources here.)
    async fn handle_resources_list(&self, id: &Value) -> Value {
        let list_req = json!({"jsonrpc":"2.0","id":0,"method":"resources/list","params":{}});
        let browser_resp = self.browser.dispatch(&list_req).await;
        let browser_resources: Vec<Value> = browser_resp["result"]["resources"]
            .as_array()
            .cloned()
            .unwrap_or_default();
        let proxy_resp = self.proxy.handle_request(&list_req).await;
        let proxy_resources: Vec<Value> = proxy_resp["result"]["resources"]
            .as_array()
            .cloned()
            .unwrap_or_default();
        let all: Vec<Value> = [browser_resources, proxy_resources]
            .into_iter()
            .flatten()
            .collect();
        ok_response(id, json!({ "resources": all }))
    }

    /// Routes a `resources/read` by URI scheme (`browser://` vs `proxy://`),
    /// forwarding the original request untouched.
    async fn handle_resources_read(&self, id: &Value, req: &Value) -> Value {
        let uri = req["params"]["uri"].as_str().unwrap_or("");
        if uri.starts_with("browser://") {
            self.browser.dispatch(req).await
        } else if uri.starts_with("proxy://") {
            self.proxy.handle_request(req).await
        } else {
            error_response(id, -32602, &format!("Unknown resource URI: {uri}"))
        }
    }

    /// Composite tool: acquire a proxy, scrape `url` through it via the
    /// graph server, then release the proxy (reporting success/failure).
    ///
    /// Invariant: once a handle_token is obtained, every exit path calls
    /// `release_proxy` so the proxy is never leaked.
    async fn tool_scrape_proxied(&self, id: &Value, args: &Value) -> Value {
        let url = match args["url"].as_str() {
            Some(u) => u.to_string(),
            None => return error_response(id, -32602, "Missing 'url'"),
        };
        let timeout_secs = args["timeout_secs"].as_u64().unwrap_or(30);
        // Step 1: acquire a proxy from the pool.
        let acquire_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": { "name": "proxy_acquire", "arguments": {} }
        });
        let acquire_resp = self.proxy.handle_request(&acquire_req).await;
        if !acquire_resp["error"].is_null() {
            // Propagate the sub-server's error code/message under the
            // caller's id (the sub-request used id 0).
            let code = acquire_resp["error"]["code"]
                .as_i64()
                .and_then(|c| i32::try_from(c).ok())
                .unwrap_or(-32603);
            return error_response(
                id,
                code,
                acquire_resp["error"]["message"]
                    .as_str()
                    .unwrap_or("No proxy available — add proxies via proxy_add first"),
            );
        }
        // The acquire result's content text is itself JSON with
        // handle_token / proxy_url fields.
        let handle_info = parse_content_text(&acquire_resp);
        let handle_token = match handle_info["handle_token"].as_str() {
            Some(t) => t.to_string(),
            None => {
                // No token means nothing was acquired; nothing to release.
                return error_response(
                    id,
                    -32603,
                    "No proxy available — add proxies via proxy_add first",
                );
            }
        };
        let proxy_url = match handle_info["proxy_url"].as_str() {
            Some(u) => u.to_string(),
            None => {
                release_proxy(&self.proxy, &handle_token, false).await;
                return error_response(id, -32603, "proxy_acquire returned no proxy_url");
            }
        };
        // Step 2: scrape through the acquired proxy via the graph server.
        let scrape_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": {
                "name": "scrape",
                "arguments": { "url": url, "proxy_url": proxy_url, "timeout_secs": timeout_secs }
            }
        });
        let scrape_resp = self.graph.handle_request(&scrape_req).await;
        let success = scrape_resp["error"].is_null();
        // Step 3: always release, reporting whether the scrape succeeded.
        release_proxy(&self.proxy, &handle_token, success).await;
        if success {
            // Re-wrap the first content item's text under the caller's id.
            // NOTE(review): if the graph response had no text this clones
            // Value::Null into the content — confirm that's acceptable.
            let text = scrape_resp["result"]["content"][0]["text"].clone();
            ok_response(
                id,
                json!({
                    "content": [{"type": "text", "text": text}]
                }),
            )
        } else {
            let code = scrape_resp["error"]["code"]
                .as_i64()
                .and_then(|c| i32::try_from(c).ok())
                .unwrap_or(-32603);
            let message = scrape_resp["error"]["message"]
                .as_str()
                .unwrap_or("Graph scrape failed");
            error_response(id, code, message)
        }
    }

    /// Composite tool: acquire a proxy, acquire a browser session routed
    /// through it, navigate to `url`, capture page HTML, then release the
    /// session and the proxy.
    ///
    /// Invariant: every exit path after acquisition releases whatever was
    /// acquired (session before proxy). Browser tool failures are signalled
    /// via `result.isError` rather than a JSON-RPC `error` (see checks on
    /// both below).
    async fn tool_browser_proxied(&self, id: &Value, args: &Value) -> Value {
        let url = match args["url"].as_str() {
            Some(u) => u.to_string(),
            None => return error_response(id, -32602, "Missing 'url'"),
        };
        // Step 1: acquire a proxy (same flow as tool_scrape_proxied).
        let acquire_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": { "name": "proxy_acquire", "arguments": {} }
        });
        let acquire_resp = self.proxy.handle_request(&acquire_req).await;
        if !acquire_resp["error"].is_null() {
            let code = acquire_resp["error"]["code"]
                .as_i64()
                .and_then(|c| i32::try_from(c).ok())
                .unwrap_or(-32603);
            return error_response(
                id,
                code,
                acquire_resp["error"]["message"]
                    .as_str()
                    .unwrap_or("No proxy available — add proxies via proxy_add first"),
            );
        }
        let handle_info = parse_content_text(&acquire_resp);
        let handle_token = match handle_info["handle_token"].as_str() {
            Some(t) => t.to_string(),
            None => {
                return error_response(
                    id,
                    -32603,
                    "No proxy available — add proxies via proxy_add first",
                );
            }
        };
        let proxy_url = match handle_info["proxy_url"].as_str() {
            Some(u) => u.to_string(),
            None => {
                release_proxy(&self.proxy, &handle_token, false).await;
                return error_response(id, -32603, "proxy_acquire returned no proxy_url");
            }
        };
        // Step 2: acquire a browser session bound to the proxy.
        let acquire_browser_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": { "name": "browser_acquire", "arguments": { "proxy": proxy_url } }
        });
        let acquire_browser_resp = self.browser.dispatch(&acquire_browser_req).await;
        let acquire_is_error = acquire_browser_resp["result"]["isError"].as_bool() == Some(true);
        let session_info = parse_content_text(&acquire_browser_resp);
        let session_id = match session_info["session_id"].as_str() {
            Some(s) => s.to_string(),
            None => {
                // Browser acquisition failed: give the proxy back before
                // reporting, preferring the tool's own error text if any.
                release_proxy(&self.proxy, &handle_token, false).await;
                let err_msg = if acquire_is_error {
                    acquire_browser_resp["result"]["content"]
                        .get(0)
                        .and_then(|c| c["text"].as_str())
                        .unwrap_or("Failed to acquire browser session")
                } else {
                    "Failed to acquire browser session"
                };
                return error_response(id, -32603, err_msg);
            }
        };
        // Step 3: navigate.
        let nav_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": {
                "name": "browser_navigate",
                "arguments": { "session_id": session_id, "url": url }
            }
        });
        let nav_resp = self.browser.dispatch(&nav_req).await;
        let nav_ok =
            nav_resp["error"].is_null() && nav_resp["result"]["isError"].as_bool() != Some(true);
        if !nav_ok {
            // Navigation failed: release session then proxy (best-effort;
            // release errors are deliberately ignored).
            let _ = self
                .browser
                .dispatch(&json!({
                    "jsonrpc": "2.0", "id": 0, "method": "tools/call",
                    "params": {
                        "name": "browser_release",
                        "arguments": { "session_id": session_id }
                    }
                }))
                .await;
            release_proxy(&self.proxy, &handle_token, false).await;
            let nav_err = nav_resp["error"]["message"]
                .as_str()
                .or_else(|| {
                    nav_resp["result"]["content"]
                        .get(0)
                        .and_then(|c| c["text"].as_str())
                })
                .unwrap_or("Browser navigation failed");
            return error_response(id, -32603, nav_err);
        }
        // Step 4: capture the page HTML.
        let content_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": {
                "name": "browser_content",
                "arguments": { "session_id": session_id }
            }
        });
        let content_resp = self.browser.dispatch(&content_req).await;
        let content_ok = content_resp["error"].is_null()
            && content_resp["result"]["isError"].as_bool() != Some(true);
        // Step 5: release session and proxy regardless of the content
        // outcome; the proxy's success flag reflects the content fetch.
        let _ = self
            .browser
            .dispatch(&json!({
                "jsonrpc": "2.0", "id": 0, "method": "tools/call",
                "params": {
                    "name": "browser_release",
                    "arguments": { "session_id": session_id }
                }
            }))
            .await;
        release_proxy(&self.proxy, &handle_token, content_ok).await;
        if !content_ok {
            let content_err = content_resp["error"]["message"]
                .as_str()
                .or_else(|| {
                    content_resp["result"]["content"]
                        .get(0)
                        .and_then(|c| c["text"].as_str())
                })
                .unwrap_or("Browser content retrieval failed");
            return error_response(id, -32603, content_err);
        }
        // The sub-tools return their payloads as text; if the text parses
        // as JSON embed it structurally, otherwise keep the raw value.
        let nav_text_raw = &nav_resp["result"]["content"][0]["text"];
        let html_text_raw = &content_resp["result"]["content"][0]["text"];
        let nav_json = nav_text_raw
            .as_str()
            .and_then(|s| serde_json::from_str::<Value>(s).ok())
            .unwrap_or_else(|| nav_text_raw.clone());
        let html_json = html_text_raw
            .as_str()
            .and_then(|s| serde_json::from_str::<Value>(s).ok())
            .unwrap_or_else(|| html_text_raw.clone());
        ok_response(
            id,
            json!({
                "content": [{
                    "type": "text",
                    "text": serde_json::to_string(&json!({
                        "navigation": nav_json,
                        "html": html_json
                    }))
                    .unwrap_or_default()
                }]
            }),
        )
    }
}
impl Drop for McpAggregator {
    fn drop(&mut self) {
        // Ask the background task to stop first.
        self.proxy_token.cancel();
        // Drop is synchronous, so we cannot await the JoinHandle here —
        // abort it outright. `run` performs the graceful variant; this is
        // only the last-resort path when the aggregator is dropped early.
        let Some(task) = self.proxy_bg.take() else {
            return;
        };
        task.abort();
    }
}
/// Builds the JSON-RPC response for the MCP `initialize` handshake:
/// advertised protocol version, static capabilities (no list-changed
/// notifications, no resource subscriptions), and server identity.
fn handle_initialize(id: &Value) -> Value {
    let capabilities = json!({
        "tools": { "listChanged": false },
        "resources": { "listChanged": false, "subscribe": false }
    });
    let server_info = json!({
        "name": "stygian-mcp",
        "version": env!("CARGO_PKG_VERSION")
    });
    ok_response(
        id,
        json!({
            "protocolVersion": "2024-11-05",
            "capabilities": capabilities,
            "serverInfo": server_info
        }),
    )
}
fn ok_response(id: &Value, result: Value) -> Value {
json!({ "jsonrpc": "2.0", "id": id, "result": result })
}
fn error_response(id: &Value, code: i32, message: &str) -> Value {
json!({
"jsonrpc": "2.0",
"id": id,
"error": { "code": code, "message": message }
})
}
/// Extracts the first content item's text from a tool-call response and
/// parses it as JSON. Returns `Value::Null` when the text is missing,
/// not a string, or not valid JSON.
fn parse_content_text(resp: &Value) -> Value {
    match resp["result"]["content"][0]["text"].as_str() {
        Some(text) => serde_json::from_str(text).unwrap_or(Value::Null),
        None => Value::Null,
    }
}
/// Returns a previously acquired proxy handle to the pool via the
/// `proxy_release` tool, forwarding `success` so the proxy server knows
/// whether the operation performed through the proxy worked (exact
/// semantics are defined by the proxy server). Best-effort: the release
/// response is deliberately ignored.
///
/// Takes `&McpProxyServer` rather than `&Arc<McpProxyServer>` — borrowing
/// through the `Arc` is the idiomatic form and existing `&self.proxy`
/// call sites still compile unchanged via deref coercion.
async fn release_proxy(proxy: &McpProxyServer, handle_token: &str, success: bool) {
    let _ = proxy
        .handle_request(&json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": {
                "name": "proxy_release",
                "arguments": { "handle_token": handle_token, "success": success }
            }
        }))
        .await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use stygian_graph::mcp::McpGraphServer;
    use stygian_proxy::mcp::McpProxyServer;

    // The graph server exposes `scrape` un-prefixed while the proxy server
    // does not expose it at all — the aggregator's `graph_` prefixing and
    // routing rely on this separation.
    #[tokio::test]
    async fn test_routes_graph_tool() {
        let graph = Arc::new(McpGraphServer::new());
        let proxy = Arc::new(McpProxyServer::new().expect("proxy server init"));
        let list_req = json!({
            "jsonrpc": "2.0", "id": 1,
            "method": "tools/list",
            "params": {}
        });
        let resp = graph.handle_request(&list_req).await;
        let tools = resp["result"]["tools"].as_array().expect("tools array");
        assert!(tools.iter().any(|t| t["name"] == "scrape"));
        let proxy_resp = proxy.handle_request(&list_req).await;
        let proxy_tools = proxy_resp["result"]["tools"]
            .as_array()
            .expect("tools array");
        assert!(!proxy_tools.iter().any(|t| t["name"] == "scrape"));
        drop(proxy);
    }

    // The error envelope must echo the caller's id and code verbatim.
    #[test]
    fn test_error_response_structure() {
        let resp = error_response(&json!(42), -32602, "bad param");
        assert_eq!(resp["error"]["code"], -32602);
        assert_eq!(resp["id"], 42);
    }

    // The success envelope must nest the payload under "result" and carry
    // the protocol tag.
    #[test]
    fn test_ok_response_structure() {
        let resp = ok_response(&json!(1), json!({"foo": "bar"}));
        assert_eq!(resp["result"]["foo"], "bar");
        assert_eq!(resp["jsonrpc"], "2.0");
    }

    // Re-creates tool_scrape_proxied's acquire/error path against an empty
    // proxy pool: whichever failure shape proxy_acquire produces, the final
    // response must be an error carrying the *caller's* id (99), not the
    // internal sub-request id (0).
    #[tokio::test]
    async fn test_scrape_proxied_no_proxy_returns_error() {
        let graph = Arc::new(McpGraphServer::new());
        let proxy = Arc::new(McpProxyServer::new().expect("proxy server init"));
        let id = json!(99);
        let acquire_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": { "name": "proxy_acquire", "arguments": {} }
        });
        let acquire_resp = proxy.handle_request(&acquire_req).await;
        let resp = if !acquire_resp["error"].is_null() {
            let code = acquire_resp["error"]["code"]
                .as_i64()
                .and_then(|c| i32::try_from(c).ok())
                .unwrap_or(-32603);
            let message = acquire_resp["error"]["message"]
                .as_str()
                .unwrap_or("No proxy available");
            error_response(&id, code, message)
        } else {
            let handle_info = parse_content_text(&acquire_resp);
            if handle_info["handle_token"].as_str().is_none() {
                error_response(
                    &id,
                    -32603,
                    "No proxy available — add proxies via proxy_add first",
                )
            } else {
                ok_response(&id, json!({"unexpected": "success"}))
            }
        };
        assert!(!resp["error"].is_null(), "expected an error response");
        assert_eq!(resp["id"], 99, "id must be the caller's id, not 0");
        drop(graph);
        drop(proxy);
    }

    // Same id-preservation check for tool_browser_proxied's acquire path.
    #[tokio::test]
    async fn test_browser_proxied_acquire_failure_uses_caller_id() {
        let proxy = Arc::new(McpProxyServer::new().expect("proxy server init"));
        let id = json!(77);
        let acquire_req = json!({
            "jsonrpc": "2.0", "id": 0, "method": "tools/call",
            "params": { "name": "proxy_acquire", "arguments": {} }
        });
        let acquire_resp = proxy.handle_request(&acquire_req).await;
        let resp = if !acquire_resp["error"].is_null() {
            let code = acquire_resp["error"]["code"]
                .as_i64()
                .and_then(|c| i32::try_from(c).ok())
                .unwrap_or(-32603);
            let message = acquire_resp["error"]["message"]
                .as_str()
                .unwrap_or("No proxy available");
            error_response(&id, code, message)
        } else {
            let handle_info = parse_content_text(&acquire_resp);
            if handle_info["handle_token"].as_str().is_none() {
                error_response(
                    &id,
                    -32603,
                    "No proxy available — add proxies via proxy_add first",
                )
            } else {
                ok_response(&id, json!({"unexpected": "success"}))
            }
        };
        assert!(!resp["error"].is_null(), "expected an error response");
        assert_eq!(
            resp["id"], 77,
            "caller id must be preserved in error response"
        );
    }

    // Error responses have no result.content, so extraction must yield Null
    // rather than panic.
    #[test]
    fn test_parse_content_text_on_error_response_returns_null() {
        let error_resp = error_response(&json!(1), -32603, "internal error");
        let parsed = parse_content_text(&error_resp);
        assert_eq!(parsed, Value::Null);
    }

    // Mirrors handle_tools_list's prefixing: graph tool names gain `graph_`
    // and the un-prefixed name must disappear from the aggregated list.
    #[tokio::test]
    async fn test_tools_list_prefixes_graph_tools() {
        let graph = Arc::new(McpGraphServer::new());
        let proxy = Arc::new(McpProxyServer::new().expect("proxy server init"));
        let list_req = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} });
        let graph_resp = graph.handle_request(&list_req).await;
        let graph_tools = graph_resp["result"]["tools"].as_array().expect("tools");
        assert!(
            graph_tools.iter().any(|t| t["name"] == "scrape"),
            "graph server exposes un-prefixed 'scrape'"
        );
        let proxy_resp = proxy.handle_request(&list_req).await;
        let proxy_tools = proxy_resp["result"]["tools"].as_array().expect("tools");
        assert!(
            proxy_tools.iter().any(|t| t["name"] == "proxy_add"),
            "proxy server exposes 'proxy_add'"
        );
        let prefixed: Vec<String> = graph_tools
            .iter()
            .filter_map(|t| t["name"].as_str())
            .map(|n| format!("graph_{n}"))
            .collect();
        assert!(
            prefixed.contains(&"graph_scrape".to_string()),
            "graph_scrape must appear after prefixing"
        );
        assert!(
            !prefixed.contains(&"scrape".to_string()),
            "un-prefixed 'scrape' must not appear after prefixing"
        );
    }

    // Sanity-checks that both sub-servers answer tools/call at all — the
    // aggregator's prefix routing depends on each responding.
    #[tokio::test]
    async fn test_tools_call_dispatch_by_prefix() {
        let graph = Arc::new(McpGraphServer::new());
        let proxy = Arc::new(McpProxyServer::new().expect("proxy server init"));
        let graph_call = json!({
            "jsonrpc": "2.0", "id": 2, "method": "tools/call",
            "params": { "name": "pipeline_validate", "arguments": { "toml": "" } }
        });
        let graph_resp = graph.handle_request(&graph_call).await;
        assert!(
            graph_resp["error"].is_null()
                || graph_resp["result"]["content"][0]["text"]
                    .as_str()
                    .is_some(),
            "graph server must respond to pipeline_validate"
        );
        let proxy_call = json!({
            "jsonrpc": "2.0", "id": 3, "method": "tools/call",
            "params": { "name": "proxy_add", "arguments": {} }
        });
        let proxy_resp = proxy.handle_request(&proxy_call).await;
        let err_msg = proxy_resp["error"]["message"]
            .as_str()
            .or_else(|| proxy_resp["result"]["content"][0]["text"].as_str())
            .unwrap_or("");
        // NOTE(review): the final `!err_msg.is_empty()` subsumes the two
        // `contains` checks — the assertion effectively only requires a
        // non-empty message.
        assert!(
            err_msg.contains("url") || err_msg.contains("required") || !err_msg.is_empty(),
            "proxy server must respond to proxy_add (got: {err_msg})"
        );
    }

    // handle_resources_read routes `proxy://` URIs by prefix; the proxy
    // server must actually list such a resource.
    #[tokio::test]
    async fn test_resources_list_includes_proxy() {
        let proxy = Arc::new(McpProxyServer::new().expect("proxy server init"));
        let list_req =
            json!({ "jsonrpc": "2.0", "id": 4, "method": "resources/list", "params": {} });
        let proxy_resp = proxy.handle_request(&list_req).await;
        let resources = proxy_resp["result"]["resources"].as_array();
        assert!(
            resources.map_or(false, |r| r
                .iter()
                .any(|res| { res["uri"].as_str() == Some("proxy://pool/stats") })),
            "proxy resources/list must contain proxy://pool/stats"
        );
    }
}