use std::collections::HashMap;
use serde_json::{Value, json};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tracing::{debug, info, warn};
use crate::{
adapters::{
graphql::{GraphQlConfig, GraphQlService},
http::{HttpAdapter, HttpConfig},
rest_api::RestApiAdapter,
rss_feed::RssFeedAdapter,
sitemap::SitemapAdapter,
},
application::pipeline_parser::{NodeDecl, PipelineParser, ServiceDecl},
ports::{ScrapingService, ServiceInput},
};
fn error_response(id: &Value, code: i64, message: &str) -> Value {
json!({
"jsonrpc": "2.0",
"id": id,
"error": { "code": code, "message": message }
})
}
fn ok_response(id: &Value, result: Value) -> Value {
let mut map = serde_json::Map::new();
map.insert("jsonrpc".to_owned(), json!("2.0"));
map.insert("id".to_owned(), id.clone());
map.insert("result".to_owned(), result);
Value::Object(map)
}
/// Stdio-based MCP (Model Context Protocol) server exposing the scraping and
/// pipeline-graph tools over JSON-RPC 2.0. Carries no state of its own; every
/// request is handled from scratch.
pub struct McpGraphServer;
impl McpGraphServer {
/// Create a new (stateless) server instance.
#[must_use]
pub const fn new() -> Self {
    Self
}
/// Serve JSON-RPC over stdio until stdin reaches EOF.
///
/// Reads one newline-delimited JSON-RPC message per line, dispatches it via
/// the internal handler, and writes exactly one JSON response line back to
/// stdout. Well-formed notifications (a "2.0" message with a method but no
/// "id") are still dispatched for their side effects but produce no
/// response, per the JSON-RPC 2.0 spec.
///
/// # Errors
/// Returns any stdin/stdout I/O error or response-serialization failure.
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
    info!("stygian-graph MCP server starting");
    let stdin = tokio::io::stdin();
    let mut reader = BufReader::new(stdin);
    let mut stdout = tokio::io::stdout();
    // One line buffer reused across iterations to avoid a per-line allocation.
    let mut line = String::new();
    loop {
        line.clear();
        let bytes = reader.read_line(&mut line).await?;
        if bytes == 0 {
            // EOF: the client closed our stdin; shut down cleanly.
            break; }
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        debug!(request = trimmed, "received");
        let response = match serde_json::from_str::<Value>(trimmed) {
            Ok(req) => {
                // Decide *before* dispatching whether this is a notification:
                // it must still be handled (side effects), but its response
                // must be suppressed.
                let is_well_formed_notification = req.is_object()
                    && req.get("jsonrpc").and_then(Value::as_str) == Some("2.0")
                    && req.get("id").is_none()
                    && req.get("method").and_then(Value::as_str).is_some();
                let response = Self::handle(&req).await;
                if is_well_formed_notification {
                    continue;
                }
                response
            }
            // Unparseable input gets the spec-mandated -32700 with a null id.
            Err(e) => json!({
                "jsonrpc": "2.0",
                "id": null,
                "error": { "code": -32700, "message": format!("Parse error: {e}") }
            }),
        };
        // Line-delimited framing: one JSON document per line, flushed so the
        // client sees the response immediately.
        let mut out = serde_json::to_string(&response)?;
        out.push('\n');
        stdout.write_all(out.as_bytes()).await?;
        stdout.flush().await?;
    }
    info!("stygian-graph MCP server stopped");
    Ok(())
}
/// Public entry point for dispatching a single already-parsed JSON-RPC
/// request. Thin wrapper over the private dispatcher, useful for embedding
/// or testing without the stdio loop.
pub async fn handle_request(req: &Value) -> Value {
    Self::handle(req).await
}
async fn handle(req: &Value) -> Value {
let null = Value::Null;
let id = req.get("id").unwrap_or(&null);
let method = req.get("method").and_then(Value::as_str).unwrap_or("");
match method {
"initialize" => Self::handle_initialize(id),
"initialized" | "notifications/initialized" | "ping" => {
json!({"jsonrpc":"2.0","id":id,"result":{}})
}
"tools/list" => Self::handle_tools_list(id),
"tools/call" => Self::handle_tools_call(id, req).await,
_ => error_response(id, -32601, &format!("Method not found: {method}")),
}
}
/// Answer the MCP `initialize` handshake with the protocol version, the
/// capabilities this server advertises, and its identity.
fn handle_initialize(id: &Value) -> Value {
    let capabilities = json!({
        "tools": { "listChanged": false },
        "resources": { "listChanged": false }
    });
    let server_info = json!({
        "name": "stygian-graph",
        "version": env!("CARGO_PKG_VERSION")
    });
    ok_response(
        id,
        json!({
            "protocolVersion": "2025-11-25",
            "capabilities": capabilities,
            "serverInfo": server_info
        }),
    )
}
/// Static MCP tool definitions for the five scraping tools.
///
/// Each entry carries the tool name, a human-readable description, and a
/// JSON-Schema `inputSchema` describing its arguments. The schemas mirror
/// the argument handling in the corresponding `tool_*` methods below.
fn scraping_tool_defs() -> Vec<Value> {
    vec![
        // Plain HTTP fetch with anti-bot features (handled by tool_scrape).
        json!({
            "name": "scrape",
            "description": "Fetch a URL with anti-bot UA rotation and retry logic. Returns raw HTML/JSON content and response metadata.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "url": { "type": "string", "description": "Target URL" },
                    "timeout_secs": { "type": "integer", "description": "Request timeout in seconds (default: 30)" },
                    "proxy_url": { "type": "string", "description": "HTTP/SOCKS5 proxy URL (e.g. socks5://user:pass@host:1080)" },
                    "rotate_ua": { "type": "boolean", "description": "Rotate User-Agent on each request (default: true)" }
                },
                "required": ["url"]
            }
        }),
        // REST/JSON API client (handled by tool_scrape_rest).
        json!({
            "name": "scrape_rest",
            "description": "Call a REST/JSON API. Supports bearer/API-key auth, arbitrary HTTP methods, query parameters, request bodies, pagination, and response path extraction.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "url": { "type": "string", "description": "API endpoint URL" },
                    "method": { "type": "string", "description": "HTTP method (GET, POST, PUT, PATCH, DELETE — default: GET)" },
                    "auth": {
                        "type": "object",
                        "description": "Authentication config",
                        "properties": {
                            "type": { "type": "string", "description": "bearer | api_key | basic | header" },
                            "token": { "type": "string", "description": "Token or credential value" },
                            "header":{ "type": "string", "description": "Custom header name (for type=header)" }
                        }
                    },
                    "query": { "type": "object", "description": "URL query parameters as key-value pairs" },
                    "body": { "type": "object", "description": "Request body (JSON)" },
                    "headers": { "type": "object", "description": "Custom request headers" },
                    "pagination": {
                        "type": "object",
                        "description": "Pagination config",
                        "properties": {
                            "strategy": { "type": "string", "description": "link_header | offset | cursor" },
                            "max_pages": { "type": "integer", "description": "Maximum pages to fetch (default: 1)" }
                        }
                    },
                    "data_path": { "type": "string", "description": "Dot-separated JSON path to extract (e.g. data.items)" }
                },
                "required": ["url"]
            }
        }),
        // GraphQL client (handled by tool_scrape_graphql).
        json!({
            "name": "scrape_graphql",
            "description": "Execute a GraphQL query against any spec-compliant endpoint. Supports bearer/API-key auth, variables, and dot-path data extraction.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "url": { "type": "string", "description": "GraphQL endpoint URL" },
                    "query": { "type": "string", "description": "GraphQL query or mutation string" },
                    "variables": { "type": "object", "description": "Query variables (JSON object)" },
                    "auth": {
                        "type": "object",
                        "description": "Auth config",
                        "properties": {
                            "kind": { "type": "string", "description": "bearer | api_key | header | none" },
                            "token": { "type": "string", "description": "Auth token or key" },
                            "header_name": { "type": "string", "description": "Custom header name (default: X-Api-Key)" }
                        }
                    },
                    "data_path": { "type": "string", "description": "Dot-separated path to extract from response (e.g. data.countries)" },
                    "timeout_secs": { "type": "integer", "description": "Request timeout in seconds (default: 30)" }
                },
                "required": ["url", "query"]
            }
        }),
        // Sitemap / sitemap-index walker (handled by tool_scrape_sitemap).
        json!({
            "name": "scrape_sitemap",
            "description": "Parse a sitemap.xml or sitemap index and return all discovered URLs with their priorities and change frequencies.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "url": { "type": "string", "description": "Sitemap URL (sitemap.xml or sitemap index)" },
                    "max_depth": { "type": "integer", "description": "Maximum sitemap index recursion depth (default: 5)" }
                },
                "required": ["url"]
            }
        }),
        // RSS/Atom feed parser (handled by tool_scrape_rss).
        json!({
            "name": "scrape_rss",
            "description": "Parse an RSS or Atom feed and return all entries as structured JSON.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "url": { "type": "string", "description": "RSS/Atom feed URL" }
                },
                "required": ["url"]
            }
        }),
    ]
}
/// Static MCP tool definitions for the pipeline/graph introspection tools.
///
/// All of these take a TOML pipeline definition string; the graph-query
/// tools additionally take node selectors. Schemas mirror the argument
/// handling in the corresponding `tool_*` methods below.
fn graph_tool_defs() -> Vec<Value> {
    vec![
        // Parse + validate only, no execution (tool_pipeline_validate).
        json!({
            "name": "pipeline_validate",
            "description": "Parse and validate a TOML pipeline definition without executing it. Returns the node list, service declarations, and computed execution order.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "toml": { "type": "string", "description": "TOML pipeline definition string" }
                },
                "required": ["toml"]
            }
        }),
        // Full DAG execution (tool_pipeline_run).
        json!({
            "name": "pipeline_run",
            "description": "Parse, validate, and execute a TOML pipeline DAG. HTTP, REST, GraphQL, sitemap, and RSS nodes are executed. AI/browser nodes are recorded in the skipped list.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "toml": { "type": "string", "description": "TOML pipeline definition string" },
                    "timeout_secs": { "type": "integer", "description": "Per-node timeout in seconds (default: 30)" }
                },
                "required": ["toml"]
            }
        }),
        // Whole-graph snapshot (tool_graph_inspect).
        json!({
            "name": "inspect",
            "description": "Get a complete snapshot of a pipeline's graph structure including nodes, edges, execution waves, critical path, and connectivity metrics.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "toml": { "type": "string", "description": "TOML pipeline definition string" }
                },
                "required": ["toml"]
            }
        }),
        // Single-node detail (tool_graph_node_info).
        json!({
            "name": "node_info",
            "description": "Get detailed information about a specific node in the pipeline graph, including its service type, depth, predecessors, and successors.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "toml": { "type": "string", "description": "TOML pipeline definition string" },
                    "node_id": { "type": "string", "description": "Node ID to inspect" }
                },
                "required": ["toml", "node_id"]
            }
        }),
        // Upstream/downstream impact analysis (tool_graph_impact).
        json!({
            "name": "impact",
            "description": "Analyze what would be affected by changing a node. Returns all upstream dependencies and downstream dependents.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "toml": { "type": "string", "description": "TOML pipeline definition string" },
                    "node_id": { "type": "string", "description": "Node ID to analyze impact for" }
                },
                "required": ["toml", "node_id"]
            }
        }),
        // Criteria-based node search (tool_graph_query).
        json!({
            "name": "query_nodes",
            "description": "Query nodes in the pipeline graph by various criteria: service type, root/leaf status, depth range, or ID pattern.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "toml": { "type": "string", "description": "TOML pipeline definition string" },
                    "service": { "type": "string", "description": "Filter by service type (http, ai, browser, etc.)" },
                    "id_pattern": { "type": "string", "description": "Filter by node ID substring match" },
                    "is_root": { "type": "boolean", "description": "Only return root nodes (no predecessors)" },
                    "is_leaf": { "type": "boolean", "description": "Only return leaf nodes (no successors)" },
                    "min_depth": { "type": "integer", "description": "Minimum depth from root nodes" },
                    "max_depth": { "type": "integer", "description": "Maximum depth from root nodes" }
                },
                "required": ["toml"]
            }
        }),
    ]
}
/// Answer `tools/list` with the combined scraping + graph tool catalogue.
fn handle_tools_list(id: &Value) -> Value {
    let tools: Vec<Value> = Self::scraping_tool_defs()
        .into_iter()
        .chain(Self::graph_tool_defs())
        .collect();
    ok_response(id, json!({ "tools": tools }))
}
/// Route a `tools/call` request to the named tool implementation.
/// Unknown tool names yield a -32602 invalid-params error.
async fn handle_tools_call(id: &Value, req: &Value) -> Value {
    let default = Value::Null;
    let params = req.get("params").unwrap_or(&default);
    let tool = params.get("name").and_then(Value::as_str).unwrap_or_default();
    let args = params.get("arguments").cloned().unwrap_or(Value::Null);
    match tool {
        "scrape" => Self::tool_scrape(id, &args).await,
        "scrape_rest" => Self::tool_scrape_rest(id, &args).await,
        "scrape_graphql" => Self::tool_scrape_graphql(id, &args).await,
        "scrape_sitemap" => Self::tool_scrape_sitemap(id, &args).await,
        "scrape_rss" => Self::tool_scrape_rss(id, &args).await,
        "pipeline_validate" => Self::tool_pipeline_validate(id, &args),
        "pipeline_run" => Self::tool_pipeline_run(id, &args).await,
        "inspect" => Self::tool_graph_inspect(id, &args),
        "node_info" => Self::tool_graph_node_info(id, &args),
        "impact" => Self::tool_graph_impact(id, &args),
        "query_nodes" => Self::tool_graph_query(id, &args),
        other => error_response(id, -32602, &format!("Unknown tool: {other}")),
    }
}
/// `scrape` tool: fetch a URL through the anti-bot HTTP adapter and return
/// the raw body plus response metadata as MCP text content.
async fn tool_scrape(id: &Value, args: &Value) -> Value {
    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: url");
    };
    // Defaults: 30 s timeout, no proxy, UA rotation on.
    let timeout = std::time::Duration::from_secs(
        args.get("timeout_secs").and_then(Value::as_u64).unwrap_or(30),
    );
    let config = HttpConfig {
        timeout,
        proxy_url: args
            .get("proxy_url")
            .and_then(Value::as_str)
            .map(str::to_string),
        rotate_user_agent: args
            .get("rotate_ua")
            .and_then(Value::as_bool)
            .unwrap_or(true),
        ..HttpConfig::default()
    };
    let adapter = HttpAdapter::with_config(config);
    let input = ServiceInput {
        url: url.to_string(),
        params: json!({}),
    };
    match adapter.execute(input).await {
        Ok(output) => {
            // MCP tool results carry their payload as serialized text content.
            let payload = json!({ "data": output.data, "metadata": output.metadata });
            ok_response(
                id,
                json!({
                    "content": [{
                        "type": "text",
                        "text": serde_json::to_string(&payload).unwrap_or_default()
                    }]
                }),
            )
        }
        Err(e) => error_response(id, -32603, &format!("Scrape failed: {e}")),
    }
}
/// `scrape_rest` tool: call a REST/JSON endpoint via the REST adapter,
/// forwarding auth, query, body, headers, pagination, and data-path options.
async fn tool_scrape_rest(id: &Value, args: &Value) -> Value {
    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: url");
    };
    let mut map = serde_json::Map::new();
    if let Some(method) = args.get("method").and_then(Value::as_str) {
        map.insert("method".to_owned(), json!(method));
    }
    // Structured options are copied through verbatim when present and
    // non-null; the adapter interprets them.
    for key in ["auth", "query", "body", "headers", "pagination"] {
        if let Some(v) = args.get(key).filter(|v| !v.is_null()) {
            map.insert(key.to_owned(), v.clone());
        }
    }
    // The adapter expects the extraction path nested under "response".
    if let Some(dp) = args.get("data_path").and_then(Value::as_str) {
        map.insert("response".to_owned(), json!({ "data_path": dp }));
    }
    let adapter = RestApiAdapter::new();
    let input = ServiceInput {
        url: url.to_string(),
        params: Value::Object(map),
    };
    match adapter.execute(input).await {
        Ok(output) => {
            let payload = json!({ "data": output.data, "metadata": output.metadata });
            ok_response(
                id,
                json!({
                    "content": [{
                        "type": "text",
                        "text": serde_json::to_string(&payload).unwrap_or_default()
                    }]
                }),
            )
        }
        Err(e) => error_response(id, -32603, &format!("REST scrape failed: {e}")),
    }
}
/// `scrape_graphql` tool: execute a GraphQL query or mutation through the
/// GraphQL service adapter. Requires `url` and `query`; `variables`, `auth`,
/// and `data_path` are forwarded when present.
async fn tool_scrape_graphql(id: &Value, args: &Value) -> Value {
    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: url");
    };
    let Some(query) = args.get("query").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: query");
    };
    // Per-request timeout; defaults to 30 seconds.
    let timeout_secs = args
        .get("timeout_secs")
        .and_then(Value::as_u64)
        .unwrap_or(30);
    let config = GraphQlConfig {
        timeout_secs,
        ..GraphQlConfig::default()
    };
    let service = GraphQlService::new(config, None);
    // Assemble the service's params object; optional fields are forwarded
    // only when present and non-null.
    let mut gql_map = serde_json::Map::new();
    gql_map.insert("query".to_owned(), json!(query));
    if let Some(variables) = args.get("variables").filter(|v| !v.is_null()) {
        gql_map.insert("variables".to_owned(), variables.clone());
    }
    if let Some(auth) = args.get("auth").filter(|v| !v.is_null()) {
        gql_map.insert("auth".to_owned(), auth.clone());
    }
    if let Some(dp) = args.get("data_path").and_then(Value::as_str) {
        gql_map.insert("data_path".to_owned(), json!(dp));
    }
    let params = Value::Object(gql_map);
    let input = ServiceInput {
        url: url.to_string(),
        params,
    };
    match service.execute(input).await {
        // Payload is serialized into MCP text content, as in the other tools.
        Ok(output) => ok_response(
            id,
            json!({
                "content": [{
                    "type": "text",
                    "text": serde_json::to_string(&json!({
                        "data": output.data,
                        "metadata": output.metadata
                    })).unwrap_or_default()
                }]
            }),
        ),
        Err(e) => error_response(id, -32603, &format!("GraphQL scrape failed: {e}")),
    }
}
/// `scrape_sitemap` tool: walk a sitemap (or sitemap index) and return the
/// discovered URL set.
async fn tool_scrape_sitemap(id: &Value, args: &Value) -> Value {
    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: url");
    };
    // Recursion guard for sitemap indexes; falls back to 5 when the argument
    // is absent or does not fit in usize.
    let max_depth = args
        .get("max_depth")
        .and_then(Value::as_u64)
        .and_then(|v| usize::try_from(v).ok())
        .unwrap_or(5);
    let adapter = SitemapAdapter::new(reqwest::Client::new(), max_depth);
    let input = ServiceInput {
        url: url.to_string(),
        params: json!({}),
    };
    match adapter.execute(input).await {
        Ok(output) => {
            let payload = json!({ "data": output.data, "metadata": output.metadata });
            ok_response(
                id,
                json!({
                    "content": [{
                        "type": "text",
                        "text": serde_json::to_string(&payload).unwrap_or_default()
                    }]
                }),
            )
        }
        Err(e) => error_response(id, -32603, &format!("Sitemap scrape failed: {e}")),
    }
}
/// `scrape_rss` tool: parse an RSS or Atom feed into structured entries.
async fn tool_scrape_rss(id: &Value, args: &Value) -> Value {
    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: url");
    };
    let adapter = RssFeedAdapter::new(reqwest::Client::new());
    let input = ServiceInput {
        url: url.to_string(),
        params: json!({}),
    };
    match adapter.execute(input).await {
        Ok(output) => {
            let payload = json!({ "data": output.data, "metadata": output.metadata });
            ok_response(
                id,
                json!({
                    "content": [{
                        "type": "text",
                        "text": serde_json::to_string(&payload).unwrap_or_default()
                    }]
                }),
            )
        }
        Err(e) => error_response(id, -32603, &format!("RSS scrape failed: {e}")),
    }
}
/// `pipeline_validate` tool: parse and validate a TOML pipeline definition
/// without executing it.
///
/// Failure modes differ deliberately: TOML parse errors and topology errors
/// are JSON-RPC errors, but a semantic validation failure is reported as a
/// *successful* tool result with `"valid": false`, so clients can surface it
/// as tool output rather than a protocol-level failure.
fn tool_pipeline_validate(id: &Value, args: &Value) -> Value {
    let Some(toml) = args.get("toml").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: toml");
    };
    let def = match PipelineParser::from_str(toml) {
        Ok(d) => d,
        Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
    };
    // Validation failure is still a successful tool call (valid=false).
    if let Err(e) = def.validate() {
        return ok_response(
            id,
            json!({
                "content": [{
                    "type": "text",
                    "text": serde_json::to_string(&json!({
                        "valid": false,
                        "error": e.to_string(),
                        "nodes": def.nodes.len(),
                        "services": def.services.len()
                    })).unwrap_or_default()
                }]
            }),
        );
    }
    let order = match def.topological_order() {
        Ok(o) => o,
        Err(e) => return error_response(id, -32603, &format!("Topology error: {e}")),
    };
    // Summaries of the declared nodes and services for the response payload.
    let node_info: Vec<Value> = def
        .nodes
        .iter()
        .map(|n| {
            json!({
                "name": n.name,
                "service": n.service,
                "url": n.url,
                "depends_on": n.depends_on
            })
        })
        .collect();
    let svc_info: Vec<Value> = def
        .services
        .iter()
        .map(|s| {
            json!({
                "name": s.name,
                "kind": s.kind,
                "model": s.model
            })
        })
        .collect();
    ok_response(
        id,
        json!({
            "content": [{
                "type": "text",
                "text": serde_json::to_string(&json!({
                    "valid": true,
                    "node_count": def.nodes.len(),
                    "service_count": def.services.len(),
                    "execution_order": order,
                    "nodes": node_info,
                    "services": svc_info
                })).unwrap_or_default()
            }]
        }),
    )
}
/// `pipeline_run` tool: parse, validate, and execute a TOML pipeline DAG in
/// topological order.
///
/// Nodes execute sequentially with a shared per-node timeout (default 30 s).
/// Nodes without a URL and nodes of unsupported service kinds (e.g.
/// ai/browser) are recorded in `skipped`; node failures are collected in
/// `errors` rather than aborting the run.
async fn tool_pipeline_run(id: &Value, args: &Value) -> Value {
    let Some(toml) = args.get("toml").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: toml");
    };
    let timeout_secs = args
        .get("timeout_secs")
        .and_then(Value::as_u64)
        .unwrap_or(30);
    let def = match PipelineParser::from_str(toml) {
        Ok(d) => d,
        Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
    };
    if let Err(e) = def.validate() {
        return error_response(id, -32603, &format!("Validation error: {e}"));
    }
    let order = match def.topological_order() {
        Ok(o) => o,
        Err(e) => return error_response(id, -32603, &format!("Topology error: {e}")),
    };
    // Service name -> declaration, to resolve each node's effective kind.
    let svc_kinds: HashMap<String, ServiceDecl> = def
        .services
        .iter()
        .map(|s| (s.name.clone(), s.clone()))
        .collect();
    let mut outputs: HashMap<String, Value> = HashMap::new();
    let mut skipped: Vec<String> = Vec::new();
    let mut errors: HashMap<String, String> = HashMap::new();
    for node_name in &order {
        let Some(node) = def.nodes.iter().find(|n| n.name == *node_name) else {
            continue;
        };
        // A node may reference a declared service by name; otherwise its
        // `service` field is treated as the kind directly.
        let kind = svc_kinds
            .get(&node.service)
            .map_or(node.service.as_str(), |s| s.kind.as_str());
        let Some(url) = node.url.as_deref() else {
            skipped.push(node_name.clone());
            continue;
        };
        // None = unsupported kind (skipped); Some(Err) = execution failure.
        match execute_pipeline_node(kind, url, node_name, node, timeout_secs).await {
            Some(Ok(out)) => {
                outputs.insert(node_name.clone(), out);
            }
            Some(Err(e)) => {
                errors.insert(node_name.clone(), e);
            }
            None => {
                skipped.push(node_name.clone());
            }
        }
    }
    ok_response(
        id,
        json!({
            "content": [{
                "type": "text",
                "text": serde_json::to_string(&json!({
                    "execution_order": order,
                    "outputs": outputs,
                    "skipped": skipped,
                    "errors": errors
                })).unwrap_or_default()
            }]
        }),
    )
}
/// `inspect` tool: build the domain graph from the TOML definition and
/// return the executor's full structural snapshot (nodes, edges, waves,
/// critical path, metrics).
///
/// NOTE(review): the TOML -> Pipeline construction below is duplicated in
/// `tool_graph_node_info`, `tool_graph_impact`, and `tool_graph_query` —
/// a candidate for a shared helper.
fn tool_graph_inspect(id: &Value, args: &Value) -> Value {
    let Some(toml) = args.get("toml").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: toml");
    };
    let def = match PipelineParser::from_str(toml) {
        Ok(d) => d,
        Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
    };
    if let Err(e) = def.validate() {
        return error_response(id, -32603, &format!("Validation error: {e}"));
    }
    // Mirror the parsed TOML into a domain Pipeline: one node per decl
    // (url + params stashed as metadata), one edge per dependency.
    let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
    for node in &def.nodes {
        pipeline.add_node(crate::domain::graph::Node::with_metadata(
            &node.name,
            &node.service,
            serde_json::json!({
                "url": node.url,
                "params": toml_to_json(&toml::Value::Table(
                    node.params.iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect()
                ))
            }),
            serde_json::Value::Null,
        ));
        for dep in &node.depends_on {
            pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
        }
    }
    let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
        Ok(e) => e,
        Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
    };
    let snapshot = executor.snapshot();
    ok_response(
        id,
        json!({
            "content": [{
                "type": "text",
                "text": serde_json::to_string(&snapshot).unwrap_or_default()
            }]
        }),
    )
}
/// `node_info` tool: build the domain graph from the TOML definition and
/// return the executor's detail record for one node (service type, depth,
/// predecessors, successors). Unknown node ids yield -32602.
fn tool_graph_node_info(id: &Value, args: &Value) -> Value {
    let Some(toml) = args.get("toml").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: toml");
    };
    let Some(node_id) = args.get("node_id").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: node_id");
    };
    let def = match PipelineParser::from_str(toml) {
        Ok(d) => d,
        Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
    };
    if let Err(e) = def.validate() {
        return error_response(id, -32603, &format!("Validation error: {e}"));
    }
    // Mirror the parsed TOML into a domain Pipeline (same construction as
    // tool_graph_inspect).
    let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
    for node in &def.nodes {
        pipeline.add_node(crate::domain::graph::Node::with_metadata(
            &node.name,
            &node.service,
            serde_json::json!({
                "url": node.url,
                "params": toml_to_json(&toml::Value::Table(
                    node.params.iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect()
                ))
            }),
            serde_json::Value::Null,
        ));
        for dep in &node.depends_on {
            pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
        }
    }
    let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
        Ok(e) => e,
        Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
    };
    // Missing node -> invalid-params error; found -> serialized detail record.
    executor.node_info(node_id).map_or_else(
        || error_response(id, -32602, &format!("Node not found: {node_id}")),
        |info| {
            ok_response(
                id,
                json!({
                    "content": [{
                        "type": "text",
                        "text": serde_json::to_string(&info).unwrap_or_default()
                    }]
                }),
            )
        },
    )
}
/// `impact` tool: build the domain graph from the TOML definition and return
/// the executor's impact analysis for one node (upstream dependencies and
/// downstream dependents).
fn tool_graph_impact(id: &Value, args: &Value) -> Value {
    let Some(toml) = args.get("toml").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: toml");
    };
    let Some(node_id) = args.get("node_id").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: node_id");
    };
    let def = match PipelineParser::from_str(toml) {
        Ok(d) => d,
        Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
    };
    if let Err(e) = def.validate() {
        return error_response(id, -32603, &format!("Validation error: {e}"));
    }
    // Mirror the parsed TOML into a domain Pipeline (same construction as
    // tool_graph_inspect).
    let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
    for node in &def.nodes {
        pipeline.add_node(crate::domain::graph::Node::with_metadata(
            &node.name,
            &node.service,
            serde_json::json!({
                "url": node.url,
                "params": toml_to_json(&toml::Value::Table(
                    node.params.iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect()
                ))
            }),
            serde_json::Value::Null,
        ));
        for dep in &node.depends_on {
            pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
        }
    }
    let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
        Ok(e) => e,
        Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
    };
    let impact = executor.impact_analysis(node_id);
    ok_response(
        id,
        json!({
            "content": [{
                "type": "text",
                "text": serde_json::to_string(&impact).unwrap_or_default()
            }]
        }),
    )
}
/// `query_nodes` tool: filter the pipeline graph's nodes by service kind,
/// id substring, root/leaf status, and depth bounds.
fn tool_graph_query(id: &Value, args: &Value) -> Value {
    let Some(toml) = args.get("toml").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: toml");
    };
    let def = match PipelineParser::from_str(toml) {
        Ok(d) => d,
        Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
    };
    if let Err(e) = def.validate() {
        return error_response(id, -32603, &format!("Validation error: {e}"));
    }
    // Mirror the parsed TOML into a domain Pipeline (same construction as
    // tool_graph_inspect).
    let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
    for node in &def.nodes {
        pipeline.add_node(crate::domain::graph::Node::with_metadata(
            &node.name,
            &node.service,
            serde_json::json!({
                "url": node.url,
                "params": toml_to_json(&toml::Value::Table(
                    node.params.iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect()
                ))
            }),
            serde_json::Value::Null,
        ));
        for dep in &node.depends_on {
            pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
        }
    }
    let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
        Ok(e) => e,
        Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
    };
    let query = crate::domain::introspection::NodeQuery {
        service: args
            .get("service")
            .and_then(Value::as_str)
            .map(String::from),
        id: None,
        id_pattern: args
            .get("id_pattern")
            .and_then(Value::as_str)
            .map(String::from),
        is_root: args.get("is_root").and_then(Value::as_bool),
        is_leaf: args.get("is_leaf").and_then(Value::as_bool),
        min_depth: args
            .get("min_depth")
            .and_then(Value::as_u64)
            .map(|v| usize::try_from(v).unwrap_or(0)),
        // FIX: an upper depth bound too large for usize must saturate to
        // usize::MAX — the previous fallback of 0 would have excluded every
        // node. (Only reachable on targets where usize is narrower than u64.)
        max_depth: args
            .get("max_depth")
            .and_then(Value::as_u64)
            .map(|v| usize::try_from(v).unwrap_or(usize::MAX)),
    };
    let results = executor.query_nodes(&query);
    ok_response(
        id,
        json!({
            "content": [{
                "type": "text",
                "text": serde_json::to_string(&results).unwrap_or_default()
            }]
        }),
    )
}
}
impl Default for McpGraphServer {
    /// Equivalent to [`McpGraphServer::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Build the GraphQL service and request input for a pipeline node.
///
/// The node's `query` param defaults to the empty string when missing —
/// the service rejects that downstream rather than here. Optional
/// `variables` and `auth` params are converted TOML -> JSON and forwarded;
/// `data_path` is forwarded as a plain string.
fn build_graphql_node_request(
    node: &NodeDecl,
    url: &str,
    timeout_secs: u64,
) -> (GraphQlService, ServiceInput) {
    let query = node
        .params
        .get("query")
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string();
    let config = GraphQlConfig {
        timeout_secs,
        ..GraphQlConfig::default()
    };
    let service = GraphQlService::new(config, None);
    let mut gql_map = serde_json::Map::new();
    gql_map.insert("query".to_owned(), json!(query));
    if let Some(variables) = node.params.get("variables") {
        gql_map.insert("variables".to_owned(), toml_to_json(variables));
    }
    if let Some(auth) = node.params.get("auth") {
        gql_map.insert("auth".to_owned(), toml_to_json(auth));
    }
    if let Some(dp) = node.params.get("data_path").and_then(|v| v.as_str()) {
        gql_map.insert("data_path".to_owned(), json!(dp));
    }
    (
        service,
        ServiceInput {
            url: url.to_string(),
            params: Value::Object(gql_map),
        },
    )
}
/// Execute a single pipeline node of the given service `kind`.
///
/// Return protocol (consumed by `tool_pipeline_run`):
/// - `Some(Ok(json))`  — node executed; payload holds `data` + `metadata`.
/// - `Some(Err(msg))`  — node executed and failed; `msg` is the error text.
/// - `None`            — unsupported kind (e.g. ai/browser); caller records
///   the node as skipped.
async fn execute_pipeline_node(
    kind: &str,
    url: &str,
    node_name: &str,
    node: &NodeDecl,
    timeout_secs: u64,
) -> Option<Result<Value, String>> {
    match kind {
        // Plain HTTP fetch; node params other than url are ignored.
        "http" => {
            let config = HttpConfig {
                timeout: std::time::Duration::from_secs(timeout_secs),
                ..HttpConfig::default()
            };
            let adapter = HttpAdapter::with_config(config);
            let input = ServiceInput {
                url: url.to_string(),
                params: json!({}),
            };
            Some(
                adapter
                    .execute(input)
                    .await
                    .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
                    .map_err(|e| e.to_string()),
            )
        }
        // REST call; node params are translated by build_rest_params_from_node.
        "rest" => {
            let params = build_rest_params_from_node(node);
            let adapter = RestApiAdapter::new();
            let input = ServiceInput {
                url: url.to_string(),
                params,
            };
            Some(
                adapter
                    .execute(input)
                    .await
                    .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
                    .map_err(|e| e.to_string()),
            )
        }
        // GraphQL call; service + input assembled from the node's params.
        "graphql" => {
            let (service, input) = build_graphql_node_request(node, url, timeout_secs);
            Some(
                service
                    .execute(input)
                    .await
                    .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
                    .map_err(|e| e.to_string()),
            )
        }
        // Sitemap walk; max_depth param bounds index recursion (default 5).
        "sitemap" => {
            let max_depth = node
                .params
                .get("max_depth")
                .and_then(toml::Value::as_integer)
                .map_or(5, |v| usize::try_from(v).unwrap_or(5));
            let client = reqwest::Client::new();
            let adapter = SitemapAdapter::new(client, max_depth);
            let input = ServiceInput {
                url: url.to_string(),
                params: json!({}),
            };
            Some(
                adapter
                    .execute(input)
                    .await
                    .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
                    .map_err(|e| e.to_string()),
            )
        }
        // RSS/Atom feed parse; no extra params.
        "rss" => {
            let client = reqwest::Client::new();
            let adapter = RssFeedAdapter::new(client);
            let input = ServiceInput {
                url: url.to_string(),
                params: json!({}),
            };
            Some(
                adapter
                    .execute(input)
                    .await
                    .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
                    .map_err(|e| e.to_string()),
            )
        }
        // Anything else (ai, browser, ...) is not executable here.
        other => {
            warn!(
                kind = other,
                node = node_name,
                "skipping unsupported service kind in pipeline_run"
            );
            None
        }
    }
}
/// Convert a TOML value into the equivalent `serde_json::Value`.
///
/// Floats JSON cannot represent (NaN/infinity) become `null`; datetimes are
/// rendered in their string form; arrays and tables convert recursively.
fn toml_to_json(v: &toml::Value) -> Value {
    match v {
        toml::Value::Boolean(b) => Value::Bool(*b),
        toml::Value::Integer(i) => Value::Number((*i).into()),
        toml::Value::Float(f) => {
            serde_json::Number::from_f64(*f).map_or(Value::Null, Value::Number)
        }
        toml::Value::String(s) => Value::String(s.clone()),
        toml::Value::Datetime(dt) => Value::String(dt.to_string()),
        toml::Value::Array(items) => {
            let converted = items.iter().map(toml_to_json).collect();
            Value::Array(converted)
        }
        toml::Value::Table(entries) => {
            let converted = entries
                .iter()
                .map(|(key, val)| (key.clone(), toml_to_json(val)))
                .collect();
            Value::Object(converted)
        }
    }
}
/// Translate a pipeline node's TOML `params` table into the JSON params
/// object expected by `RestApiAdapter`.
fn build_rest_params_from_node(node: &NodeDecl) -> Value {
    let mut map = serde_json::Map::new();
    if let Some(method) = node.params.get("method").and_then(|v| v.as_str()) {
        map.insert("method".to_owned(), json!(method));
    }
    // Structured options are converted TOML -> JSON verbatim when present.
    for key in ["auth", "headers", "query", "body", "pagination"] {
        if let Some(value) = node.params.get(key) {
            map.insert(key.to_owned(), toml_to_json(value));
        }
    }
    // The adapter reads the extraction path from a nested "response" object.
    if let Some(dp) = node.params.get("data_path").and_then(|v| v.as_str()) {
        map.insert("response".to_owned(), json!({ "data_path": dp }));
    }
    Value::Object(map)
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    // Smoke test: the unit struct constructs.
    #[test]
    fn server_builds() {
        let _ = McpGraphServer::new();
    }
    // initialize must advertise the protocol version this server speaks.
    #[test]
    fn initialize_response_contains_version() {
        let id = json!(1);
        let resp = McpGraphServer::handle_initialize(&id);
        assert_eq!(
            resp.pointer("/result/protocolVersion")
                .and_then(Value::as_str),
            Some("2025-11-25")
        );
    }
    // tools/list must expose every registered scraping + pipeline tool.
    #[test]
    fn tools_list_contains_all_tools() {
        let id = json!(1);
        let resp = McpGraphServer::handle_tools_list(&id);
        let tools = resp
            .pointer("/result/tools")
            .and_then(Value::as_array)
            .unwrap();
        let names: Vec<&str> = tools
            .iter()
            .map(|t| t.get("name").and_then(Value::as_str).unwrap())
            .collect();
        assert!(names.contains(&"scrape"));
        assert!(names.contains(&"scrape_rest"));
        assert!(names.contains(&"scrape_graphql"));
        assert!(names.contains(&"scrape_sitemap"));
        assert!(names.contains(&"scrape_rss"));
        assert!(names.contains(&"pipeline_validate"));
        assert!(names.contains(&"pipeline_run"));
    }
    // Malformed TOML may surface either as a JSON-RPC error or as a
    // successful result with valid=false — accept both shapes.
    #[test]
    fn pipeline_validate_rejects_bad_toml() {
        let id = json!(1);
        let args = json!({ "toml": "this is not valid toml [[[[" });
        let resp = McpGraphServer::tool_pipeline_validate(&id, &args);
        assert!(
            resp.get("error").is_some_and(Value::is_object)
                || resp
                    .pointer("/result/content/0/text")
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .contains("false")
        );
    }
    // Happy path: a two-node chain validates and reports both nodes.
    #[test]
    fn pipeline_validate_accepts_valid_pipeline() {
        let id = json!(1);
        let toml = r#"
[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
[[nodes]]
name = "process"
service = "http"
url = "https://example.com/api"
depends_on = ["fetch"]
"#;
        let args = json!({ "toml": toml });
        let resp = McpGraphServer::tool_pipeline_validate(&id, &args);
        // The tool result payload is itself JSON serialized into text content.
        let text = resp
            .pointer("/result/content/0/text")
            .and_then(Value::as_str)
            .unwrap();
        let parsed: Value = serde_json::from_str(text).unwrap();
        assert_eq!(parsed.get("valid"), Some(&json!(true)));
        assert_eq!(parsed.get("node_count"), Some(&json!(2)));
    }
    // Omitting the required "toml" argument is an invalid-params error.
    #[test]
    fn pipeline_validate_missing_toml_returns_error() {
        let id = json!(1);
        let args = json!({});
        let resp = McpGraphServer::tool_pipeline_validate(&id, &args);
        assert!(resp.get("error").is_some_and(Value::is_object));
    }
}