Skip to main content

jamjet_worker/executors/
mcp_tool.rs

1//! Executor for `McpTool` workflow nodes.
2//!
3//! When a workflow node has kind `McpTool`, this executor:
4//! 1. Resolves the MCP server configuration from the workflow IR.
5//! 2. Connects to the MCP server (stdio or HTTP).
6//! 3. Sends the initialize handshake.
7//! 4. Invokes the specified tool with mapped inputs.
8//! 5. Returns the tool's output as the node result.
9//!
10//! Each invocation opens a fresh connection. For Phase 2, connection pooling
11//! and persistent stdio processes can be added.
12
13use crate::executor::{ExecutionResult, NodeExecutor};
14use async_trait::async_trait;
15use jamjet_ir::workflow::{McpServerConfig, McpTransport as IrMcpTransport};
16use jamjet_mcp::{HttpSseTransport, McpClient, StdioTransport};
17use jamjet_state::backend::WorkItem;
18use serde_json::{json, Value};
19use tracing::{debug, instrument};
20
21/// Executor for `mcp_tool` nodes.
22pub struct McpToolExecutor {
23    /// Resolved MCP server configs from the workflow IR, keyed by server alias.
24    servers: std::collections::HashMap<String, McpServerConfig>,
25}
26
27impl McpToolExecutor {
28    pub fn new(servers: std::collections::HashMap<String, McpServerConfig>) -> Self {
29        Self { servers }
30    }
31}
32
33#[async_trait]
34impl NodeExecutor for McpToolExecutor {
35    #[instrument(skip(self, item), fields(node_id = %item.node_id))]
36    async fn execute(&self, item: &WorkItem) -> Result<ExecutionResult, String> {
37        let start = std::time::Instant::now();
38
39        // Extract server alias and tool name from the payload.
40        let server_alias = item
41            .payload
42            .get("server")
43            .and_then(|v| v.as_str())
44            .ok_or("McpTool: missing 'server' in payload")?;
45        let tool_name = item
46            .payload
47            .get("tool")
48            .and_then(|v| v.as_str())
49            .ok_or("McpTool: missing 'tool' in payload")?;
50        let arguments = item.payload.get("arguments").cloned().unwrap_or(json!({}));
51
52        let server_config = self
53            .servers
54            .get(server_alias)
55            .ok_or_else(|| format!("McpTool: no server config for alias '{server_alias}'"))?;
56
57        debug!(server = %server_alias, tool = %tool_name, "Invoking MCP tool");
58
59        // Open a protocol-level span for MCP call latency tracking (H2.4).
60        let mcp_span = tracing::info_span!(
61            "jamjet.mcp_call",
62            "jamjet.tool.protocol" = "mcp",
63            "jamjet.mcp.server" = %server_alias,
64            "jamjet.tool.name" = %tool_name,
65        );
66        let _mcp_guard = mcp_span.enter();
67
68        // Connect to the MCP server based on transport type.
69        let client: McpClient = match &server_config.transport {
70            IrMcpTransport::Stdio => {
71                let command = server_config
72                    .command
73                    .as_deref()
74                    .ok_or("McpTool: stdio transport requires 'command'")?;
75                let arg_strs: Vec<&str> = server_config.args.iter().map(|s| s.as_str()).collect();
76                let transport = StdioTransport::spawn(command, &arg_strs).await?;
77                McpClient::new(server_alias.to_string(), Box::new(transport))
78            }
79            IrMcpTransport::HttpSse | IrMcpTransport::WebSocket => {
80                let url = server_config
81                    .url
82                    .as_deref()
83                    .ok_or("McpTool: HTTP transport requires 'url'")?;
84                let transport = HttpSseTransport::new(url.to_string());
85                McpClient::new(server_alias.to_string(), Box::new(transport))
86            }
87        };
88
89        // Initialize the MCP connection.
90        client
91            .initialize()
92            .await
93            .map_err(|e| format!("MCP initialize failed: {e}"))?;
94
95        // Invoke the tool.
96        let call_result = client
97            .call_tool(tool_name, arguments)
98            .await
99            .map_err(|e| format!("MCP tool call failed: {e}"))?;
100
101        client.close().await;
102
103        // Convert MCP content to a JSON value.
104        let output_value = content_to_json(&call_result.content);
105
106        Ok(ExecutionResult {
107            output: output_value.clone(),
108            state_patch: json!({}), // caller can configure mapping
109            duration_ms: start.elapsed().as_millis() as u64,
110            gen_ai_system: None,
111            gen_ai_model: None,
112            input_tokens: None,
113            output_tokens: None,
114            finish_reason: None,
115        })
116    }
117}
118
119/// Convert a list of `McpContent` items to a single JSON value.
120fn content_to_json(content: &[jamjet_mcp::types::McpContent]) -> Value {
121    use jamjet_mcp::types::McpContent;
122    match content {
123        [] => json!(null),
124        [single] => match single {
125            McpContent::Text { text } => json!(text),
126            McpContent::Image { data, mime_type } => {
127                json!({ "type": "image", "data": data, "mime_type": mime_type })
128            }
129            McpContent::Resource {
130                uri,
131                text,
132                mime_type,
133            } => json!({ "type": "resource", "uri": uri, "text": text, "mime_type": mime_type }),
134        },
135        many => {
136            let items: Vec<Value> = many.iter().map(|c| match c {
137                McpContent::Text { text } => json!(text),
138                McpContent::Image { data, mime_type } => json!({ "type": "image", "data": data, "mime_type": mime_type }),
139                McpContent::Resource { uri, text, mime_type } => json!({ "type": "resource", "uri": uri, "text": text, "mime_type": mime_type }),
140            }).collect();
141            json!(items)
142        }
143    }
144}