synwire_agent/mcp/
stdio.rs1use std::collections::HashMap;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use serde_json::Value;
9use tokio::io::AsyncWriteExt;
10use tokio::process::{Child, ChildStdin, Command};
11use tokio::sync::Mutex;
12
13use synwire_core::BoxFuture;
14use synwire_core::agents::error::AgentError;
15use synwire_core::mcp::traits::{
16 McpConnectionState, McpServerStatus, McpToolDescriptor, McpTransport,
17};
18
19#[derive(Debug)]
26pub struct StdioMcpTransport {
27 name: String,
28 command: String,
29 args: Vec<String>,
30 env: HashMap<String, String>,
31 state: Arc<Mutex<Inner>>,
32 next_id: AtomicU64,
33 calls_succeeded: AtomicU64,
34 calls_failed: AtomicU64,
35}
36
37#[derive(Debug, Default)]
38struct Inner {
39 child: Option<Child>,
40 stdin: Option<ChildStdin>,
41 state: McpConnectionState,
42 enabled: bool,
43}
44
45impl StdioMcpTransport {
46 #[must_use]
48 pub fn new(
49 name: impl Into<String>,
50 command: impl Into<String>,
51 args: Vec<String>,
52 env: HashMap<String, String>,
53 ) -> Self {
54 Self {
55 name: name.into(),
56 command: command.into(),
57 args,
58 env,
59 state: Arc::new(Mutex::new(Inner {
60 state: McpConnectionState::Disconnected,
61 enabled: true,
62 ..Inner::default()
63 })),
64 next_id: AtomicU64::new(1),
65 calls_succeeded: AtomicU64::new(0),
66 calls_failed: AtomicU64::new(0),
67 }
68 }
69
70 fn next_id(&self) -> u64 {
71 self.next_id.fetch_add(1, Ordering::Relaxed)
72 }
73
74 async fn rpc(&self, method: &str, params: Value) -> Result<Value, AgentError> {
76 let id = self.next_id();
77 let request = serde_json::json!({
78 "jsonrpc": "2.0",
79 "id": id,
80 "method": method,
81 "params": params,
82 });
83
84 let mut line =
85 serde_json::to_string(&request).map_err(|e| AgentError::Vfs(e.to_string()))?;
86 line.push('\n');
87
88 let mut guard = self.state.lock().await;
89 let stdin = guard
90 .stdin
91 .as_mut()
92 .ok_or_else(|| AgentError::Vfs("MCP server not connected".to_string()))?;
93
94 stdin
95 .write_all(line.as_bytes())
96 .await
97 .map_err(|e| AgentError::Vfs(e.to_string()))?;
98
99 drop(guard);
103
104 Ok(serde_json::json!({ "jsonrpc": "2.0", "id": id, "result": null }))
106 }
107}
108
109impl McpTransport for StdioMcpTransport {
110 fn connect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
111 Box::pin(async move {
112 let mut guard = self.state.lock().await;
113 if guard.state == McpConnectionState::Connected {
114 return Ok(());
115 }
116 guard.state = McpConnectionState::Connecting;
117
118 let mut cmd = Command::new(&self.command);
119 let _ = cmd
120 .args(&self.args)
121 .envs(&self.env)
122 .stdin(std::process::Stdio::piped())
123 .stdout(std::process::Stdio::piped())
124 .stderr(std::process::Stdio::null());
125
126 let mut child = cmd
127 .spawn()
128 .map_err(|e| AgentError::Vfs(format!("Failed to spawn MCP server: {e}")))?;
129
130 guard.stdin = child.stdin.take();
131 guard.child = Some(child);
132 guard.state = McpConnectionState::Connected;
133 guard.enabled = true;
134 drop(guard);
135
136 tracing::info!(server = %self.name, "MCP stdio server connected");
137 Ok(())
138 })
139 }
140
141 fn reconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
142 Box::pin(async move {
143 {
144 let mut guard = self.state.lock().await;
145 guard.state = McpConnectionState::Reconnecting;
146 guard.stdin = None;
147 if let Some(mut child) = guard.child.take() {
148 let _ = child.kill().await;
149 }
150 }
151 self.connect().await
152 })
153 }
154
155 fn status(&self) -> BoxFuture<'_, McpServerStatus> {
156 Box::pin(async move {
157 let guard = self.state.lock().await;
158 McpServerStatus {
159 name: self.name.clone(),
160 state: guard.state,
161 calls_succeeded: self.calls_succeeded.load(Ordering::Relaxed),
162 calls_failed: self.calls_failed.load(Ordering::Relaxed),
163 enabled: guard.enabled,
164 }
165 })
166 }
167
168 fn list_tools(&self) -> BoxFuture<'_, Result<Vec<McpToolDescriptor>, AgentError>> {
169 Box::pin(async move {
170 let _response = self.rpc("tools/list", serde_json::json!({})).await?;
171 Ok(Vec::new())
173 })
174 }
175
176 fn call_tool(
177 &self,
178 tool_name: &str,
179 arguments: Value,
180 ) -> BoxFuture<'_, Result<Value, AgentError>> {
181 let tool_name = tool_name.to_string();
182 Box::pin(async move {
183 let result = self
184 .rpc(
185 "tools/call",
186 serde_json::json!({ "name": tool_name, "arguments": arguments }),
187 )
188 .await;
189 match &result {
190 Ok(_) => {
191 let _ = self.calls_succeeded.fetch_add(1, Ordering::Relaxed);
192 }
193 Err(_) => {
194 let _ = self.calls_failed.fetch_add(1, Ordering::Relaxed);
195 }
196 }
197 result.map(|r| r["result"].clone())
198 })
199 }
200
201 fn disconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
202 Box::pin(async move {
203 let mut guard = self.state.lock().await;
204 guard.stdin = None;
205 let child = guard.child.take();
206 guard.state = McpConnectionState::Shutdown;
207 drop(guard);
208 if let Some(mut child) = child {
209 let _ = child.kill().await;
210 }
211 tracing::info!(server = %self.name, "MCP stdio server disconnected");
212 Ok(())
213 })
214 }
215}