Skip to main content

lex_runtime/
mcp_client.rs

1//! Minimal stdio MCP (Model Context Protocol) client for the
2//! `agent.call_mcp` builtin (#185). Spawns the named MCP server
3//! as a subprocess, completes the `initialize` handshake, then
4//! forwards a `tools/call` request and returns the result.
5//!
6//! Scope:
7//!
8//! - One client per call (spawn-per-call). MCP servers are cheap
9//!   to start; per-call spawning keeps the implementation
10//!   stateless. A connection cache is a clear v2 optimization
11//!   once benchmarks show it matters.
12//! - stdio transport only. Future transports (TCP / SSE / HTTP)
13//!   can branch on a URL prefix in `command` later.
14//! - No auth; the issue body explicitly defers credential
15//!   handling to a follow-up. Don't expose this builtin to
16//!   untrusted code paths until that's done.
17
18use serde_json::{json, Value};
19use std::io::{BufRead, BufReader, Write};
20use std::process::{ChildStdin, ChildStdout, Command, Stdio};
21use std::time::Duration;
22
/// A live connection to one MCP server subprocess, speaking
/// newline-delimited JSON-RPC 2.0 over the child's stdin/stdout.
///
/// Obtain one with `McpClient::spawn`; dropping the client kills
/// and reaps the subprocess.
#[derive(Debug)]
pub struct McpClient {
    /// Handle to the server subprocess; used by `Drop` to kill and reap it.
    child: std::process::Child,
    /// Write end of the server's stdin: requests go out here, one JSON
    /// document per line.
    stdin: ChildStdin,
    /// Buffered read end of the server's stdout: responses and
    /// notifications are read from here line by line.
    stdout: BufReader<ChildStdout>,
    /// Most recently issued JSON-RPC request id. Incremented before each
    /// request, so the first request on a fresh client uses id 1.
    next_id: i64,
}
29
30impl McpClient {
31    /// Spawn the MCP server described by `command_line` (whitespace-
32    /// separated argv), perform the JSON-RPC `initialize` handshake,
33    /// and return a ready client. Caller is expected to drop the
34    /// client when finished — `Drop` reaps the subprocess.
35    pub fn spawn(command_line: &str) -> Result<Self, String> {
36        let parts: Vec<&str> = command_line.split_whitespace().collect();
37        let cmd = parts.first()
38            .ok_or_else(|| "mcp.spawn: empty command".to_string())?;
39        let mut child = Command::new(cmd)
40            .args(&parts[1..])
41            .stdin(Stdio::piped())
42            .stdout(Stdio::piped())
43            .stderr(Stdio::piped())
44            .spawn()
45            .map_err(|e| format!("mcp.spawn `{cmd}`: {e}"))?;
46        let stdin = child.stdin.take()
47            .ok_or_else(|| "mcp.spawn: no stdin".to_string())?;
48        let stdout = BufReader::new(child.stdout.take()
49            .ok_or_else(|| "mcp.spawn: no stdout".to_string())?);
50        let mut client = Self { child, stdin, stdout, next_id: 0 };
51        client.initialize()?;
52        Ok(client)
53    }
54
55    fn next_id(&mut self) -> i64 {
56        self.next_id += 1;
57        self.next_id
58    }
59
60    fn rpc(&mut self, method: &str, params: Value) -> Result<Value, String> {
61        let id = self.next_id();
62        let req = json!({
63            "jsonrpc": "2.0",
64            "id": id,
65            "method": method,
66            "params": params,
67        });
68        let line = serde_json::to_string(&req)
69            .map_err(|e| format!("mcp.rpc: serialize: {e}"))?;
70        self.stdin.write_all(line.as_bytes())
71            .map_err(|e| format!("mcp.rpc: write: {e}"))?;
72        self.stdin.write_all(b"\n")
73            .map_err(|e| format!("mcp.rpc: write: {e}"))?;
74        self.stdin.flush()
75            .map_err(|e| format!("mcp.rpc: flush: {e}"))?;
76        // The server may emit notifications before the response;
77        // skip lines whose id doesn't match.
78        loop {
79            let mut buf = String::new();
80            let n = self.stdout.read_line(&mut buf)
81                .map_err(|e| format!("mcp.rpc: read: {e}"))?;
82            if n == 0 {
83                return Err("mcp.rpc: server closed stdout".into());
84            }
85            let resp: Value = serde_json::from_str(buf.trim())
86                .map_err(|e| format!("mcp.rpc: parse `{}`: {e}",
87                    buf.trim().chars().take(120).collect::<String>()))?;
88            // Skip notifications (no `id` field).
89            if resp.get("id").is_none() { continue; }
90            if resp["id"] != json!(id) { continue; }
91            if let Some(err) = resp.get("error") {
92                return Err(format!("mcp.rpc {method}: {err}"));
93            }
94            return Ok(resp.get("result").cloned().unwrap_or(Value::Null));
95        }
96    }
97
98    fn initialize(&mut self) -> Result<(), String> {
99        // Minimal client capabilities — Phase 1 only needs
100        // synchronous tool calls. `protocolVersion` matches the
101        // version `lex-api`'s server reports.
102        self.rpc("initialize", json!({
103            "protocolVersion": "2024-11-05",
104            "capabilities": {},
105            "clientInfo": { "name": "lex-runtime", "version": env!("CARGO_PKG_VERSION") },
106        }))?;
107        Ok(())
108    }
109
110    /// Send `tools/call` for the named tool with the supplied
111    /// JSON arguments. Returns the server's `result` field as
112    /// JSON; tool-side errors come back as `Err`.
113    pub fn call_tool(&mut self, name: &str, args: Value) -> Result<Value, String> {
114        self.rpc("tools/call", json!({
115            "name": name,
116            "arguments": args,
117        }))
118    }
119}
120
121impl Drop for McpClient {
122    fn drop(&mut self) {
123        // Best-effort reap: if the server hasn't already exited
124        // on stdin EOF, kill it. Don't propagate errors — Drop
125        // can't fail meaningfully and the process either exits
126        // or gets reaped by the OS.
127        let _ = self.child.kill();
128        // Avoid zombies: wait briefly. If the server is misbehaving
129        // we don't want Drop to block forever.
130        let _ = self.child.wait();
131        let _ = Duration::from_millis(0);
132    }
133}