1use std::process::Stdio;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use deck_core::traits::ToolDescriptor;
11use deck_core::{DeckError, McpClient, Result, ToolCall, ToolResult};
12use serde_json::json;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::{Child, ChildStdin, ChildStdout, Command};
15use tokio::sync::Mutex;
16use tracing::{debug, warn};
17
18use crate::wire::{make_request, JsonRpcResponse};
19
20const RPC_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub struct StdioMcpClient {
25 name: String,
26 next_id: AtomicU64,
27 poisoned: AtomicBool,
32 inner: Mutex<Inner>,
33}
34
35struct Inner {
36 _child: Child,
39 stdin: ChildStdin,
40 stdout: BufReader<ChildStdout>,
41}
42
43impl std::fmt::Debug for StdioMcpClient {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("StdioMcpClient")
46 .field("name", &self.name)
47 .finish_non_exhaustive()
48 }
49}
50
51impl StdioMcpClient {
52 pub async fn spawn(name: impl Into<String>, command: &str, args: &[String]) -> Result<Self> {
53 let mut child = Command::new(command)
54 .args(args)
55 .stdin(Stdio::piped())
56 .stdout(Stdio::piped())
57 .stderr(Stdio::inherit())
58 .spawn()
59 .map_err(|e| DeckError::Mcp(format!("spawn {command}: {e}")))?;
60 let stdin = child
61 .stdin
62 .take()
63 .ok_or_else(|| DeckError::Mcp("no stdin pipe".into()))?;
64 let stdout = child
65 .stdout
66 .take()
67 .ok_or_else(|| DeckError::Mcp("no stdout pipe".into()))?;
68 let me = Self {
69 name: name.into(),
70 next_id: AtomicU64::new(1),
71 poisoned: AtomicBool::new(false),
72 inner: Mutex::new(Inner {
73 _child: child,
74 stdin,
75 stdout: BufReader::new(stdout),
76 }),
77 };
78 me.initialize().await?;
79 Ok(me)
80 }
81
82 async fn initialize(&self) -> Result<()> {
83 let resp = self
84 .request(
85 "initialize",
86 Some(json!({
87 "protocolVersion": "2024-11-05",
88 "capabilities": {},
89 "clientInfo": {"name": "ono-sendai", "version": env!("CARGO_PKG_VERSION")}
90 })),
91 )
92 .await?;
93 debug!(server = %self.name, ?resp, "mcp initialized");
94 Ok(())
95 }
96
97 async fn request(
98 &self,
99 method: &str,
100 params: Option<serde_json::Value>,
101 ) -> Result<JsonRpcResponse> {
102 if self.poisoned.load(Ordering::Acquire) {
103 return Err(DeckError::Mcp(format!(
104 "client `{}` is poisoned after a prior timeout — drop it and respawn",
105 self.name
106 )));
107 }
108 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
109 let line = make_request(id, method, params);
110 let mut inner = self.inner.lock().await;
111 inner
112 .stdin
113 .write_all(line.as_bytes())
114 .await
115 .map_err(|e| DeckError::Mcp(format!("write: {e}")))?;
116 inner
117 .stdin
118 .write_all(b"\n")
119 .await
120 .map_err(|e| DeckError::Mcp(format!("write newline: {e}")))?;
121 inner
122 .stdin
123 .flush()
124 .await
125 .map_err(|e| DeckError::Mcp(format!("flush: {e}")))?;
126 let mut buf = String::new();
127 let Ok(read) = tokio::time::timeout(RPC_TIMEOUT, inner.stdout.read_line(&mut buf)).await
128 else {
129 self.poisoned.store(true, Ordering::Release);
133 return Err(DeckError::Mcp(format!(
134 "rpc timeout after {}s; client poisoned",
135 RPC_TIMEOUT.as_secs()
136 )));
137 };
138 let n = read.map_err(|e| DeckError::Mcp(format!("read: {e}")))?;
139 if n == 0 {
140 return Err(DeckError::Mcp("server closed pipe".into()));
141 }
142 let resp: JsonRpcResponse = serde_json::from_str(buf.trim())?;
143 if resp.id != Some(id) {
147 self.poisoned.store(true, Ordering::Release);
148 return Err(DeckError::Mcp(format!(
149 "rpc id mismatch (sent {id}, got {:?}); client poisoned",
150 resp.id
151 )));
152 }
153 if let Some(err) = &resp.error {
154 warn!(code = err.code, msg = %err.message, "mcp jsonrpc error");
155 return Err(DeckError::Mcp(format!(
156 "rpc error {}: {}",
157 err.code, err.message
158 )));
159 }
160 Ok(resp)
161 }
162}
163
164#[async_trait]
165impl McpClient for StdioMcpClient {
166 fn server_name(&self) -> &str {
167 &self.name
168 }
169
170 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>> {
171 let resp = self.request("tools/list", None).await?;
172 let result = resp
173 .result
174 .ok_or_else(|| DeckError::Mcp("missing result".into()))?;
175 let tools = result
176 .get("tools")
177 .and_then(|v| v.as_array())
178 .cloned()
179 .unwrap_or_default();
180 Ok(tools
181 .into_iter()
182 .filter_map(|t| {
183 let name = t.get("name")?.as_str()?.to_owned();
184 let description = t
185 .get("description")
186 .and_then(|v| v.as_str())
187 .unwrap_or("")
188 .to_owned();
189 let schema = t.get("inputSchema").cloned().unwrap_or(json!({}));
190 Some(ToolDescriptor {
191 name,
192 description,
193 json_schema: schema,
194 })
195 })
196 .collect())
197 }
198
199 async fn call(&self, call: &ToolCall) -> Result<ToolResult> {
200 let resp = self
201 .request(
202 "tools/call",
203 Some(json!({
204 "name": call.tool,
205 "arguments": call.arguments,
206 })),
207 )
208 .await?;
209 let result = resp.result.unwrap_or(json!({}));
210 let is_error = result
211 .get("isError")
212 .and_then(serde_json::Value::as_bool)
213 .unwrap_or(false);
214 Ok(ToolResult {
215 call_id: call.id.clone(),
216 content: result,
217 is_error,
218 })
219 }
220}