Skip to main content

deck_mcp/
stdio.rs

//! A stdio-transport MCP client. Each instance owns one child process and
//! its stdin/stdout pipes. Every request/response exchange runs under a
//! single async `Mutex`, so exchanges never interleave and request/response
//! ordering stays deterministic.
4
5use std::process::Stdio;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use deck_core::traits::ToolDescriptor;
11use deck_core::{DeckError, McpClient, Result, ToolCall, ToolResult};
12use serde_json::json;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::{Child, ChildStdin, ChildStdout, Command};
15use tokio::sync::Mutex;
16use tracing::{debug, warn};
17
18use crate::wire::{make_request, JsonRpcResponse};
19
/// How long a single RPC may wait on the server before the call is abandoned
/// with a timeout error. Keeps an unresponsive server from blocking the
/// orchestrator indefinitely.
const RPC_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub struct StdioMcpClient {
25    name: String,
26    next_id: AtomicU64,
27    /// Set when an RPC times out — the wire is desynced (the server may
28    /// still deliver the late response, which the next caller would read
29    /// in place of its own). We refuse all subsequent requests on a
30    /// poisoned client so id/response pairing cannot drift.
31    poisoned: AtomicBool,
32    inner: Mutex<Inner>,
33}
34
35struct Inner {
36    /// Held only to keep the child alive — `tokio::process::Child` kills
37    /// the subprocess on drop. The field is read by `Drop`, not by us.
38    _child: Child,
39    stdin: ChildStdin,
40    stdout: BufReader<ChildStdout>,
41}
42
43impl std::fmt::Debug for StdioMcpClient {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("StdioMcpClient")
46            .field("name", &self.name)
47            .finish_non_exhaustive()
48    }
49}
50
51impl StdioMcpClient {
52    pub async fn spawn(name: impl Into<String>, command: &str, args: &[String]) -> Result<Self> {
53        let mut child = Command::new(command)
54            .args(args)
55            .stdin(Stdio::piped())
56            .stdout(Stdio::piped())
57            .stderr(Stdio::inherit())
58            .spawn()
59            .map_err(|e| DeckError::Mcp(format!("spawn {command}: {e}")))?;
60        let stdin = child
61            .stdin
62            .take()
63            .ok_or_else(|| DeckError::Mcp("no stdin pipe".into()))?;
64        let stdout = child
65            .stdout
66            .take()
67            .ok_or_else(|| DeckError::Mcp("no stdout pipe".into()))?;
68        let me = Self {
69            name: name.into(),
70            next_id: AtomicU64::new(1),
71            poisoned: AtomicBool::new(false),
72            inner: Mutex::new(Inner {
73                _child: child,
74                stdin,
75                stdout: BufReader::new(stdout),
76            }),
77        };
78        me.initialize().await?;
79        Ok(me)
80    }
81
82    async fn initialize(&self) -> Result<()> {
83        let resp = self
84            .request(
85                "initialize",
86                Some(json!({
87                    "protocolVersion": "2024-11-05",
88                    "capabilities": {},
89                    "clientInfo": {"name": "ono-sendai", "version": env!("CARGO_PKG_VERSION")}
90                })),
91            )
92            .await?;
93        debug!(server = %self.name, ?resp, "mcp initialized");
94        Ok(())
95    }
96
97    async fn request(
98        &self,
99        method: &str,
100        params: Option<serde_json::Value>,
101    ) -> Result<JsonRpcResponse> {
102        if self.poisoned.load(Ordering::Acquire) {
103            return Err(DeckError::Mcp(format!(
104                "client `{}` is poisoned after a prior timeout — drop it and respawn",
105                self.name
106            )));
107        }
108        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
109        let line = make_request(id, method, params);
110        let mut inner = self.inner.lock().await;
111        inner
112            .stdin
113            .write_all(line.as_bytes())
114            .await
115            .map_err(|e| DeckError::Mcp(format!("write: {e}")))?;
116        inner
117            .stdin
118            .write_all(b"\n")
119            .await
120            .map_err(|e| DeckError::Mcp(format!("write newline: {e}")))?;
121        inner
122            .stdin
123            .flush()
124            .await
125            .map_err(|e| DeckError::Mcp(format!("flush: {e}")))?;
126        let mut buf = String::new();
127        let Ok(read) = tokio::time::timeout(RPC_TIMEOUT, inner.stdout.read_line(&mut buf)).await
128        else {
129            // Wire is now desynced: the server may still deliver the
130            // late response. Poison the client so subsequent calls
131            // refuse rather than read out-of-order.
132            self.poisoned.store(true, Ordering::Release);
133            return Err(DeckError::Mcp(format!(
134                "rpc timeout after {}s; client poisoned",
135                RPC_TIMEOUT.as_secs()
136            )));
137        };
138        let n = read.map_err(|e| DeckError::Mcp(format!("read: {e}")))?;
139        if n == 0 {
140            return Err(DeckError::Mcp("server closed pipe".into()));
141        }
142        let resp: JsonRpcResponse = serde_json::from_str(buf.trim())?;
143        // Pair response to request. An unsolicited server notification or
144        // out-of-order reply would otherwise be returned to the wrong
145        // caller — desync the wire to be safe.
146        if resp.id != Some(id) {
147            self.poisoned.store(true, Ordering::Release);
148            return Err(DeckError::Mcp(format!(
149                "rpc id mismatch (sent {id}, got {:?}); client poisoned",
150                resp.id
151            )));
152        }
153        if let Some(err) = &resp.error {
154            warn!(code = err.code, msg = %err.message, "mcp jsonrpc error");
155            return Err(DeckError::Mcp(format!(
156                "rpc error {}: {}",
157                err.code, err.message
158            )));
159        }
160        Ok(resp)
161    }
162}
163
164#[async_trait]
165impl McpClient for StdioMcpClient {
166    fn server_name(&self) -> &str {
167        &self.name
168    }
169
170    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>> {
171        let resp = self.request("tools/list", None).await?;
172        let result = resp
173            .result
174            .ok_or_else(|| DeckError::Mcp("missing result".into()))?;
175        let tools = result
176            .get("tools")
177            .and_then(|v| v.as_array())
178            .cloned()
179            .unwrap_or_default();
180        Ok(tools
181            .into_iter()
182            .filter_map(|t| {
183                let name = t.get("name")?.as_str()?.to_owned();
184                let description = t
185                    .get("description")
186                    .and_then(|v| v.as_str())
187                    .unwrap_or("")
188                    .to_owned();
189                let schema = t.get("inputSchema").cloned().unwrap_or(json!({}));
190                Some(ToolDescriptor {
191                    name,
192                    description,
193                    json_schema: schema,
194                })
195            })
196            .collect())
197    }
198
199    async fn call(&self, call: &ToolCall) -> Result<ToolResult> {
200        let resp = self
201            .request(
202                "tools/call",
203                Some(json!({
204                    "name": call.tool,
205                    "arguments": call.arguments,
206                })),
207            )
208            .await?;
209        let result = resp.result.unwrap_or(json!({}));
210        let is_error = result
211            .get("isError")
212            .and_then(serde_json::Value::as_bool)
213            .unwrap_or(false);
214        Ok(ToolResult {
215            call_id: call.id.clone(),
216            content: result,
217            is_error,
218        })
219    }
220}