use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::Value;
use tokio::io::AsyncWriteExt;
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::Mutex;
use synwire_core::BoxFuture;
use synwire_core::agents::error::AgentError;
use synwire_core::mcp::traits::{
McpConnectionState, McpServerStatus, McpToolDescriptor, McpTransport,
};
#[derive(Debug)]
pub struct StdioMcpTransport {
name: String,
command: String,
args: Vec<String>,
env: HashMap<String, String>,
state: Arc<Mutex<Inner>>,
next_id: AtomicU64,
calls_succeeded: AtomicU64,
calls_failed: AtomicU64,
}
#[derive(Debug, Default)]
struct Inner {
child: Option<Child>,
stdin: Option<ChildStdin>,
state: McpConnectionState,
enabled: bool,
}
impl StdioMcpTransport {
#[must_use]
pub fn new(
name: impl Into<String>,
command: impl Into<String>,
args: Vec<String>,
env: HashMap<String, String>,
) -> Self {
Self {
name: name.into(),
command: command.into(),
args,
env,
state: Arc::new(Mutex::new(Inner {
state: McpConnectionState::Disconnected,
enabled: true,
..Inner::default()
})),
next_id: AtomicU64::new(1),
calls_succeeded: AtomicU64::new(0),
calls_failed: AtomicU64::new(0),
}
}
fn next_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
async fn rpc(&self, method: &str, params: Value) -> Result<Value, AgentError> {
let id = self.next_id();
let request = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
});
let mut line =
serde_json::to_string(&request).map_err(|e| AgentError::Vfs(e.to_string()))?;
line.push('\n');
let mut guard = self.state.lock().await;
let stdin = guard
.stdin
.as_mut()
.ok_or_else(|| AgentError::Vfs("MCP server not connected".to_string()))?;
stdin
.write_all(line.as_bytes())
.await
.map_err(|e| AgentError::Vfs(e.to_string()))?;
drop(guard);
Ok(serde_json::json!({ "jsonrpc": "2.0", "id": id, "result": null }))
}
}
impl McpTransport for StdioMcpTransport {
fn connect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
Box::pin(async move {
let mut guard = self.state.lock().await;
if guard.state == McpConnectionState::Connected {
return Ok(());
}
guard.state = McpConnectionState::Connecting;
let mut cmd = Command::new(&self.command);
let _ = cmd
.args(&self.args)
.envs(&self.env)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null());
let mut child = cmd
.spawn()
.map_err(|e| AgentError::Vfs(format!("Failed to spawn MCP server: {e}")))?;
guard.stdin = child.stdin.take();
guard.child = Some(child);
guard.state = McpConnectionState::Connected;
guard.enabled = true;
drop(guard);
tracing::info!(server = %self.name, "MCP stdio server connected");
Ok(())
})
}
fn reconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
Box::pin(async move {
{
let mut guard = self.state.lock().await;
guard.state = McpConnectionState::Reconnecting;
guard.stdin = None;
if let Some(mut child) = guard.child.take() {
let _ = child.kill().await;
}
}
self.connect().await
})
}
fn status(&self) -> BoxFuture<'_, McpServerStatus> {
Box::pin(async move {
let guard = self.state.lock().await;
McpServerStatus {
name: self.name.clone(),
state: guard.state,
calls_succeeded: self.calls_succeeded.load(Ordering::Relaxed),
calls_failed: self.calls_failed.load(Ordering::Relaxed),
enabled: guard.enabled,
}
})
}
fn list_tools(&self) -> BoxFuture<'_, Result<Vec<McpToolDescriptor>, AgentError>> {
Box::pin(async move {
let _response = self.rpc("tools/list", serde_json::json!({})).await?;
Ok(Vec::new())
})
}
fn call_tool(
&self,
tool_name: &str,
arguments: Value,
) -> BoxFuture<'_, Result<Value, AgentError>> {
let tool_name = tool_name.to_string();
Box::pin(async move {
let result = self
.rpc(
"tools/call",
serde_json::json!({ "name": tool_name, "arguments": arguments }),
)
.await;
match &result {
Ok(_) => {
let _ = self.calls_succeeded.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {
let _ = self.calls_failed.fetch_add(1, Ordering::Relaxed);
}
}
result.map(|r| r["result"].clone())
})
}
fn disconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
Box::pin(async move {
let mut guard = self.state.lock().await;
guard.stdin = None;
let child = guard.child.take();
guard.state = McpConnectionState::Shutdown;
drop(guard);
if let Some(mut child) = child {
let _ = child.kill().await;
}
tracing::info!(server = %self.name, "MCP stdio server disconnected");
Ok(())
})
}
}