jamjet_worker/executors/
mcp_tool.rs1use crate::executor::{ExecutionResult, NodeExecutor};
14use async_trait::async_trait;
15use jamjet_ir::workflow::{McpServerConfig, McpTransport as IrMcpTransport};
16use jamjet_mcp::{HttpSseTransport, McpClient, StdioTransport};
17use jamjet_state::backend::WorkItem;
18use serde_json::{json, Value};
19use tracing::{debug, instrument};
20
21pub struct McpToolExecutor {
23 servers: std::collections::HashMap<String, McpServerConfig>,
25}
26
27impl McpToolExecutor {
28 pub fn new(servers: std::collections::HashMap<String, McpServerConfig>) -> Self {
29 Self { servers }
30 }
31}
32
33#[async_trait]
34impl NodeExecutor for McpToolExecutor {
35 #[instrument(skip(self, item), fields(node_id = %item.node_id))]
36 async fn execute(&self, item: &WorkItem) -> Result<ExecutionResult, String> {
37 let start = std::time::Instant::now();
38
39 let server_alias = item
41 .payload
42 .get("server")
43 .and_then(|v| v.as_str())
44 .ok_or("McpTool: missing 'server' in payload")?;
45 let tool_name = item
46 .payload
47 .get("tool")
48 .and_then(|v| v.as_str())
49 .ok_or("McpTool: missing 'tool' in payload")?;
50 let arguments = item.payload.get("arguments").cloned().unwrap_or(json!({}));
51
52 let server_config = self
53 .servers
54 .get(server_alias)
55 .ok_or_else(|| format!("McpTool: no server config for alias '{server_alias}'"))?;
56
57 debug!(server = %server_alias, tool = %tool_name, "Invoking MCP tool");
58
59 let mcp_span = tracing::info_span!(
61 "jamjet.mcp_call",
62 "jamjet.tool.protocol" = "mcp",
63 "jamjet.mcp.server" = %server_alias,
64 "jamjet.tool.name" = %tool_name,
65 );
66 let _mcp_guard = mcp_span.enter();
67
68 let client: McpClient = match &server_config.transport {
70 IrMcpTransport::Stdio => {
71 let command = server_config
72 .command
73 .as_deref()
74 .ok_or("McpTool: stdio transport requires 'command'")?;
75 let arg_strs: Vec<&str> = server_config.args.iter().map(|s| s.as_str()).collect();
76 let transport = StdioTransport::spawn(command, &arg_strs).await?;
77 McpClient::new(server_alias.to_string(), Box::new(transport))
78 }
79 IrMcpTransport::HttpSse | IrMcpTransport::WebSocket => {
80 let url = server_config
81 .url
82 .as_deref()
83 .ok_or("McpTool: HTTP transport requires 'url'")?;
84 let transport = HttpSseTransport::new(url.to_string());
85 McpClient::new(server_alias.to_string(), Box::new(transport))
86 }
87 };
88
89 client
91 .initialize()
92 .await
93 .map_err(|e| format!("MCP initialize failed: {e}"))?;
94
95 let call_result = client
97 .call_tool(tool_name, arguments)
98 .await
99 .map_err(|e| format!("MCP tool call failed: {e}"))?;
100
101 client.close().await;
102
103 let output_value = content_to_json(&call_result.content);
105
106 Ok(ExecutionResult {
107 output: output_value.clone(),
108 state_patch: json!({}), duration_ms: start.elapsed().as_millis() as u64,
110 gen_ai_system: None,
111 gen_ai_model: None,
112 input_tokens: None,
113 output_tokens: None,
114 finish_reason: None,
115 })
116 }
117}
118
119fn content_to_json(content: &[jamjet_mcp::types::McpContent]) -> Value {
121 use jamjet_mcp::types::McpContent;
122 match content {
123 [] => json!(null),
124 [single] => match single {
125 McpContent::Text { text } => json!(text),
126 McpContent::Image { data, mime_type } => {
127 json!({ "type": "image", "data": data, "mime_type": mime_type })
128 }
129 McpContent::Resource {
130 uri,
131 text,
132 mime_type,
133 } => json!({ "type": "resource", "uri": uri, "text": text, "mime_type": mime_type }),
134 },
135 many => {
136 let items: Vec<Value> = many.iter().map(|c| match c {
137 McpContent::Text { text } => json!(text),
138 McpContent::Image { data, mime_type } => json!({ "type": "image", "data": data, "mime_type": mime_type }),
139 McpContent::Resource { uri, text, mime_type } => json!({ "type": "resource", "uri": uri, "text": text, "mime_type": mime_type }),
140 }).collect();
141 json!(items)
142 }
143 }
144}