lex_runtime/mcp_client.rs
1//! Minimal stdio MCP (Model Context Protocol) client for the
2//! `agent.call_mcp` builtin (#185). Spawns the named MCP server
3//! as a subprocess, completes the `initialize` handshake, then
4//! forwards a `tools/call` request and returns the result.
5//!
6//! Scope:
7//!
//! - `McpClient` itself is spawn-per-call and stateless; MCP
//!   servers are cheap to start. The `McpClientCache` below
//!   (#197) layers a bounded LRU of live clients on top for
//!   callers that want to amortize server startup.
12//! - stdio transport only. Future transports (TCP / SSE / HTTP)
13//! can branch on a URL prefix in `command` later.
14//! - No auth; the issue body explicitly defers credential
15//! handling to a follow-up. Don't expose this builtin to
16//! untrusted code paths until that's done.
17
18use serde_json::{json, Value};
19use std::io::{BufRead, BufReader, Write};
20use std::process::{ChildStdin, ChildStdout, Command, Stdio};
21use std::time::Duration;
22
pub struct McpClient {
    /// Handle to the spawned MCP server subprocess; reaped in `Drop`.
    child: std::process::Child,
    /// Write side of the JSON-RPC channel — one request per line.
    stdin: ChildStdin,
    /// Buffered read side; responses are consumed line-by-line.
    stdout: BufReader<ChildStdout>,
    /// Last JSON-RPC request id issued; incremented by `next_id`.
    next_id: i64,
}
29
30impl McpClient {
31 /// Spawn the MCP server described by `command_line` (whitespace-
32 /// separated argv), perform the JSON-RPC `initialize` handshake,
33 /// and return a ready client. Caller is expected to drop the
34 /// client when finished — `Drop` reaps the subprocess.
35 pub fn spawn(command_line: &str) -> Result<Self, String> {
36 let parts: Vec<&str> = command_line.split_whitespace().collect();
37 let cmd = parts.first()
38 .ok_or_else(|| "mcp.spawn: empty command".to_string())?;
39 let mut child = Command::new(cmd)
40 .args(&parts[1..])
41 .stdin(Stdio::piped())
42 .stdout(Stdio::piped())
43 .stderr(Stdio::piped())
44 .spawn()
45 .map_err(|e| format!("mcp.spawn `{cmd}`: {e}"))?;
46 let stdin = child.stdin.take()
47 .ok_or_else(|| "mcp.spawn: no stdin".to_string())?;
48 let stdout = BufReader::new(child.stdout.take()
49 .ok_or_else(|| "mcp.spawn: no stdout".to_string())?);
50 let mut client = Self { child, stdin, stdout, next_id: 0 };
51 client.initialize()?;
52 Ok(client)
53 }
54
55 fn next_id(&mut self) -> i64 {
56 self.next_id += 1;
57 self.next_id
58 }
59
60 fn rpc(&mut self, method: &str, params: Value) -> Result<Value, String> {
61 let id = self.next_id();
62 let req = json!({
63 "jsonrpc": "2.0",
64 "id": id,
65 "method": method,
66 "params": params,
67 });
68 let line = serde_json::to_string(&req)
69 .map_err(|e| format!("mcp.rpc: serialize: {e}"))?;
70 self.stdin.write_all(line.as_bytes())
71 .map_err(|e| format!("mcp.rpc: write: {e}"))?;
72 self.stdin.write_all(b"\n")
73 .map_err(|e| format!("mcp.rpc: write: {e}"))?;
74 self.stdin.flush()
75 .map_err(|e| format!("mcp.rpc: flush: {e}"))?;
76 // The server may emit notifications before the response;
77 // skip lines whose id doesn't match.
78 loop {
79 let mut buf = String::new();
80 let n = self.stdout.read_line(&mut buf)
81 .map_err(|e| format!("mcp.rpc: read: {e}"))?;
82 if n == 0 {
83 return Err("mcp.rpc: server closed stdout".into());
84 }
85 let resp: Value = serde_json::from_str(buf.trim())
86 .map_err(|e| format!("mcp.rpc: parse `{}`: {e}",
87 buf.trim().chars().take(120).collect::<String>()))?;
88 // Skip notifications (no `id` field).
89 if resp.get("id").is_none() { continue; }
90 if resp["id"] != json!(id) { continue; }
91 if let Some(err) = resp.get("error") {
92 return Err(format!("mcp.rpc {method}: {err}"));
93 }
94 return Ok(resp.get("result").cloned().unwrap_or(Value::Null));
95 }
96 }
97
98 fn initialize(&mut self) -> Result<(), String> {
99 // Minimal client capabilities — Phase 1 only needs
100 // synchronous tool calls. `protocolVersion` matches the
101 // version `lex-api`'s server reports.
102 self.rpc("initialize", json!({
103 "protocolVersion": "2024-11-05",
104 "capabilities": {},
105 "clientInfo": { "name": "lex-runtime", "version": env!("CARGO_PKG_VERSION") },
106 }))?;
107 Ok(())
108 }
109
110 /// Send `tools/call` for the named tool with the supplied
111 /// JSON arguments. Returns the server's `result` field as
112 /// JSON; tool-side errors come back as `Err`.
113 pub fn call_tool(&mut self, name: &str, args: Value) -> Result<Value, String> {
114 self.rpc("tools/call", json!({
115 "name": name,
116 "arguments": args,
117 }))
118 }
119}
120
121impl Drop for McpClient {
122 fn drop(&mut self) {
123 // Best-effort reap: if the server hasn't already exited
124 // on stdin EOF, kill it. Don't propagate errors — Drop
125 // can't fail meaningfully and the process either exits
126 // or gets reaped by the OS.
127 let _ = self.child.kill();
128 // Avoid zombies: wait briefly. If the server is misbehaving
129 // we don't want Drop to block forever.
130 let _ = self.child.wait();
131 let _ = Duration::from_millis(0);
132 }
133}
134
135// ---- LRU connection cache (#197) --------------------------------
136
137/// Bounded cache of `McpClient` instances keyed by the
138/// command-line string. Keeps a Vec of `(key, client)` pairs
139/// in usage order — most-recently-used at the back. On cache
140/// miss past `cap`, the front (oldest) entry is dropped.
141///
142/// Why a Vec rather than a `HashMap` + linked list: cap is
143/// small (16 by default) so linear scan is cheaper than the
144/// pointer chase, and a Vec lets us own the Clients directly
145/// instead of through `RefCell`/`Arc`.
146///
147/// Subprocess death is detected lazily: when `call_tool` fails
148/// the offending entry is dropped. Next call to the same
149/// server respawns. A handler that sits idle long enough for
150/// upstream MCP servers to be killed by ops will see one
151/// `Err` per server before recovering.
pub struct McpClientCache {
    /// `(command_line, client)` pairs in usage order — least-
    /// recently-used at the front, most-recently-used at the back.
    entries: Vec<(String, McpClient)>,
    /// Maximum number of live clients retained; the front (oldest)
    /// entry is evicted when a miss would exceed this.
    cap: usize,
}
156
157impl McpClientCache {
158 pub fn with_capacity(cap: usize) -> Self {
159 Self { entries: Vec::with_capacity(cap), cap }
160 }
161
162 /// Send a `tools/call` to the named server, spawning the
163 /// subprocess on cache miss and reusing it on hit. Returns
164 /// the server's `result` JSON or an error message; on error,
165 /// the offending client is dropped so the next call respawns.
166 pub fn call(
167 &mut self,
168 server: &str,
169 tool: &str,
170 args: serde_json::Value,
171 ) -> Result<serde_json::Value, String> {
172 // Hit: move the entry to the back (mark MRU) and call.
173 if let Some(idx) = self.entries.iter().position(|(k, _)| k == server) {
174 let (key, mut client) = self.entries.remove(idx);
175 match client.call_tool(tool, args) {
176 Ok(v) => {
177 self.entries.push((key, client));
178 Ok(v)
179 }
180 Err(e) => {
181 // Dropping `client` reaps the subprocess.
182 Err(e)
183 }
184 }
185 } else {
186 // Miss: spawn, evict if at capacity, push.
187 let mut client = McpClient::spawn(server)?;
188 let result = client.call_tool(tool, args);
189 if result.is_ok() {
190 if self.entries.len() >= self.cap && !self.entries.is_empty() {
191 self.entries.remove(0);
192 }
193 self.entries.push((server.to_string(), client));
194 }
195 result
196 }
197 }
198
199 /// Number of cached subprocesses. Useful for tests and
200 /// observability; not on the hot path.
201 pub fn len(&self) -> usize { self.entries.len() }
202
203 pub fn is_empty(&self) -> bool { self.entries.is_empty() }
204}
205
206impl Default for McpClientCache {
207 fn default() -> Self { Self::with_capacity(16) }
208}
209
#[cfg(test)]
mod cache_tests {
    use super::*;

    /// Smoke test that the cache structure works without
    /// actually spawning subprocesses (which would require a
    /// real MCP server). Tests against the real `lex serve --mcp`
    /// fixture live in `tests/std_agent_mcp_client.rs`.
    #[test]
    fn empty_cache_starts_at_zero_with_configured_cap() {
        let cache = McpClientCache::with_capacity(4);
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.cap, 4);
    }
}
225}