phi_core/mcp/transport.rs
//! MCP transport implementations: stdio (child process) and HTTP POST.
2/*
3ARCHITECTURE: transport.rs — how messages travel between client and MCP server
4
5The `McpTransport` trait abstracts the communication channel. Two implementations:
6
7`StdioTransport` — subprocess communication via stdin/stdout
8 - Spawns the MCP server as a child process
9 - Sends JSON-RPC requests as newline-delimited JSON to the child's stdin
10 - Reads JSON-RPC responses from the child's stdout (one line = one response)
11 - Used for local servers: filesystem, git, shell, custom scripts
12
13`HttpTransport` — HTTP POST for remote MCP servers
14 - Sends requests as HTTP POST with JSON body
15 - Used for remote or cloud-hosted MCP servers
16
17Why a trait?
18 The `McpClient` is generic over transport — tests can use a mock transport,
19 production uses stdio or HTTP. Same pattern as StreamProvider for LLMs.
20
21RUST QUIRK: `Arc<Mutex<tokio::process::ChildStdin>>` — async-safe shared mutable I/O
22
23`ChildStdin` is an async write handle to the child's stdin.
24It's not `Copy` or `Clone` — it's an exclusive resource.
25
26Why `Arc<Mutex<...>>`?
27 `McpTransport::send(&self, ...)` takes `&self` (shared reference).
28 But we need to WRITE to stdin (mutate it). This requires interior mutability.
29 `tokio::sync::Mutex` (async-aware mutex) guards `ChildStdin`:
30 - Multiple concurrent `send()` calls wait for the lock (serialized)
31 - No blocking — `.lock().await` yields to the tokio runtime while waiting
32
`Arc` wraps each mutex so the handles can be shared across tasks and cloned cheaply
(a clone just bumps a reference count). Note: `StdioTransport` does not currently
derive `Clone`; the `Arc`s are what would make such an impl cheap.
35
36RUST QUIRK: `BufReader<ChildStdout>` — buffered async reading
37 `tokio::io::BufReader` wraps `ChildStdout` (raw byte stream) with line-buffering.
38 `.read_line(&mut String)` reads until `\n` — used to receive one JSON-RPC response.
39 Without buffering, we'd have to implement line-splitting manually.
40 Python analogy: wrapping a socket with io.BufferedReader or using readline().
41*/
42
43use super::types::*;
44use async_trait::async_trait;
45use std::collections::HashMap;
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
49use tokio::process::{Child, Command};
50use tokio::sync::Mutex;
51
/// Default time budget for one request/response round trip on an MCP transport.
///
/// Without a limit, a hung MCP subprocess or an HTTP server that never answers
/// would stall the caller indefinitely. 30 s leaves ample room for ordinary
/// tool-call latency while still failing reasonably fast when the peer hangs.
///
/// Override it per transport with `StdioTransport::with_timeout` or
/// `HttpTransport::new_with_timeout`.
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
59
60/// Transport trait for MCP communication.
61/*
62ARCHITECTURE: McpTransport — pluggable communication channel
63
64Any struct that implements `McpTransport` can be used as the communication
65channel for `McpClient`. The trait has two methods:
66 `send(request) → response` — request/response round-trip
67 `close()` — clean shutdown (kill process, close connections)
68
69`Send + Sync` bounds are required because `McpClient` may be used from multiple
70async tasks (e.g., when the agent executes tool calls in parallel).
71*/
72#[async_trait]
73pub trait McpTransport: Send + Sync {
74 /// Send a JSON-RPC request and receive the response.
75 async fn send(&self, request: JsonRpcRequest) -> Result<JsonRpcResponse, McpError>;
76 /// Close the transport (kill child process, close HTTP connections, etc.).
77 async fn close(&self) -> Result<(), McpError>;
78}
79
80// ---------------------------------------------------------------------------
81// Stdio Transport
82// ---------------------------------------------------------------------------
83
84/// Communicates with an MCP server via stdin/stdout of a child process.
85/// One JSON-RPC message per line (newline-delimited JSON, i.e. NDJSON protocol).
86pub struct StdioTransport {
87 stdin: Arc<Mutex<tokio::process::ChildStdin>>, // write requests here
88 stdout: Arc<Mutex<BufReader<tokio::process::ChildStdout>>>, // read responses here
89 child: Arc<Mutex<Child>>, // keep handle to kill on close()
90 request_timeout: Duration, // per-request timeout for send()
91}
92
93impl StdioTransport {
94 /// Spawn a child process and create a stdio transport.
95 pub async fn new(
96 command: &str, // EXECUTABLE — binary to spawn as the MCP server subprocess
97 args: &[&str], // ARGV — command-line arguments passed to the subprocess
98 env: Option<HashMap<String, String>>, // ENV OVERRIDES — extra env vars injected into the child; None = inherit parent env
99 ) -> Result<Self, McpError> {
100 let mut cmd = Command::new(command);
101 cmd.args(args)
102 .stdin(std::process::Stdio::piped())
103 .stdout(std::process::Stdio::piped())
104 .stderr(std::process::Stdio::piped());
105
106 if let Some(env_vars) = env {
107 for (k, v) in env_vars {
108 cmd.env(k, v);
109 }
110 }
111
112 let mut child = cmd
113 .spawn()
114 .map_err(|e| McpError::Transport(format!("Failed to spawn '{}': {}", command, e)))?;
115
116 /*
117 RUST QUIRK: `child.stdin.take()` — transferring ownership of I/O handles
118
119 `Child.stdin` is `Option<ChildStdin>`. After `spawn()`, it holds `Some(stdin)`.
120 `.take()` moves the `ChildStdin` OUT of the `Option`, leaving `None` behind.
121 We must `.take()` because we can't hold a `&mut` to it while also keeping `child`.
122 Rust's borrow checker prevents two mutable references to overlapping data.
123
124 `.ok_or_else(|| McpError::Transport("...".into()))` converts `Option<T>` → `Result<T, McpError>`:
125 `Some(stdin)` → `Ok(stdin)`
126 `None` → `Err(McpError::Transport("Failed to capture stdin"))`
127 The `?` propagates the error out if `None`.
128 Python analogy: stdin = child.stdin or raise McpError("Failed to capture stdin")
129 */
130 let stdin = child
131 .stdin
132 .take()
133 .ok_or_else(|| McpError::Transport("Failed to capture stdin".into()))?;
134 let stdout = child
135 .stdout
136 .take()
137 .ok_or_else(|| McpError::Transport("Failed to capture stdout".into()))?;
138
139 Ok(Self {
140 stdin: Arc::new(Mutex::new(stdin)),
141 stdout: Arc::new(Mutex::new(BufReader::new(stdout))), // wrap for line-buffered reads
142 child: Arc::new(Mutex::new(child)),
143 request_timeout: DEFAULT_REQUEST_TIMEOUT,
144 })
145 }
146
147 /// Override the per-request timeout. Default is `DEFAULT_REQUEST_TIMEOUT` (30 s).
148 ///
149 /// Applies to each `send()` call independently — write + read + parse share the
150 /// same budget. On timeout, `send()` returns `McpError::Timeout` and the next
151 /// caller gets a fresh budget.
152 pub fn with_timeout(mut self, request_timeout: Duration) -> Self {
153 self.request_timeout = request_timeout;
154 self
155 }
156}
157
158#[async_trait]
159impl McpTransport for StdioTransport {
160 async fn send(
161 &self,
162 request: JsonRpcRequest, // OUTGOING — serialized to newline-terminated JSON, written to the child's stdin
163 ) -> Result<JsonRpcResponse, McpError> {
164 let timeout = self.request_timeout;
165 let work = async {
166 let mut line = serde_json::to_string(&request)?;
167 line.push('\n');
168
169 // Write request
170 {
171 let mut stdin = self.stdin.lock().await;
172 stdin
173 .write_all(line.as_bytes())
174 .await
175 .map_err(|e| McpError::Transport(format!("Write error: {}", e)))?;
176 stdin
177 .flush()
178 .await
179 .map_err(|e| McpError::Transport(format!("Flush error: {}", e)))?;
180 }
181
182 // Read response
183 let mut response_line = String::new();
184 {
185 let mut stdout = self.stdout.lock().await;
186 let bytes_read = stdout
187 .read_line(&mut response_line)
188 .await
189 .map_err(|e| McpError::Transport(format!("Read error: {}", e)))?;
190 if bytes_read == 0 {
191 return Err(McpError::ConnectionClosed);
192 }
193 }
194
195 let response: JsonRpcResponse = serde_json::from_str(response_line.trim())?;
196 Ok::<_, McpError>(response)
197 };
198
199 match tokio::time::timeout(timeout, work).await {
200 Ok(result) => result,
201 Err(_) => Err(McpError::Timeout { duration: timeout }),
202 }
203 }
204
205 async fn close(&self) -> Result<(), McpError> {
206 // Drop stdin to signal EOF, then kill the child
207 let mut child = self.child.lock().await;
208 let _ = child.kill().await;
209 Ok(())
210 }
211}
212
213// ---------------------------------------------------------------------------
214// HTTP Transport
215// ---------------------------------------------------------------------------
216
217/// Communicates with an MCP server via HTTP POST (JSON-RPC over HTTP).
218pub struct HttpTransport {
219 client: reqwest::Client,
220 base_url: String,
221 request_timeout: Duration,
222}
223
224impl HttpTransport {
225 /// Create a new HTTP transport with the default request timeout.
226 pub fn new(url: &str) -> Result<Self, McpError> {
227 Self::new_with_timeout(url, DEFAULT_REQUEST_TIMEOUT)
228 }
229
230 /// Create a new HTTP transport with a custom request timeout.
231 ///
232 /// The timeout is enforced by the outer `tokio::time::timeout` in `send()`,
233 /// which hard-cancels the inner future (connect + read + JSON parse) on expiry.
234 /// We deliberately do not set `reqwest::Client::builder().timeout(d)` to avoid
235 /// a race between reqwest's internal timer and the outer tokio timer — having
236 /// both fire at the same duration produces a non-deterministic error
237 /// classification.
238 pub fn new_with_timeout(url: &str, request_timeout: Duration) -> Result<Self, McpError> {
239 Ok(Self {
240 client: reqwest::Client::new(),
241 base_url: url.trim_end_matches('/').to_string(),
242 request_timeout,
243 })
244 }
245}
246
247#[async_trait]
248impl McpTransport for HttpTransport {
249 async fn send(
250 &self,
251 request: JsonRpcRequest, // OUTGOING — sent as HTTP POST body to base_url; response parsed from JSON reply
252 ) -> Result<JsonRpcResponse, McpError> {
253 let timeout = self.request_timeout;
254 let work = async {
255 let resp = self
256 .client
257 .post(&self.base_url)
258 .json(&request)
259 .send()
260 .await
261 .map_err(|e| McpError::Transport(format!("HTTP error: {}", e)))?;
262
263 if !resp.status().is_success() {
264 return Err(McpError::Transport(format!(
265 "HTTP {} from server",
266 resp.status()
267 )));
268 }
269
270 let response: JsonRpcResponse = resp
271 .json()
272 .await
273 .map_err(|e| McpError::Transport(format!("Response parse error: {}", e)))?;
274 Ok::<_, McpError>(response)
275 };
276
277 match tokio::time::timeout(timeout, work).await {
278 Ok(result) => result,
279 Err(_) => Err(McpError::Timeout { duration: timeout }),
280 }
281 }
282
283 async fn close(&self) -> Result<(), McpError> {
284 // HTTP is stateless; nothing to close.
285 Ok(())
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared checks for the timeout tests: `outcome` must be `McpError::Timeout`
    /// carrying the configured duration, and the call must have returned well
    /// before the peer would ever have answered.
    fn assert_prompt_timeout(
        outcome: Result<JsonRpcResponse, McpError>,
        configured: Duration,
        elapsed: Duration,
    ) {
        match outcome {
            Err(McpError::Timeout { duration }) => {
                assert_eq!(duration, configured);
            }
            other => panic!("expected McpError::Timeout, got {:?}", other),
        }
        assert!(
            elapsed < Duration::from_secs(2),
            "send() should have returned promptly after timeout, took {:?}",
            elapsed
        );
    }

    /// Exercises the raw pipe plumbing using `cat`, which copies stdin to stdout.
    /// The line read back is therefore the request itself, so this checks
    /// transport I/O only, not protocol correctness.
    #[tokio::test]
    async fn test_stdio_transport_with_cat() {
        let transport = StdioTransport::new("cat", &[], None).await.unwrap();

        let request = JsonRpcRequest::new("test/echo", Some(serde_json::json!({"hello": "world"})));
        let request_id = request.id;

        let mut outgoing = serde_json::to_string(&request).unwrap();
        outgoing.push('\n');

        // Write the request line directly to the child's stdin.
        {
            let mut writer = transport.stdin.lock().await;
            writer.write_all(outgoing.as_bytes()).await.unwrap();
            writer.flush().await.unwrap();
        }

        // Read back the echoed line from the child's stdout.
        let mut incoming = String::new();
        {
            let mut reader = transport.stdout.lock().await;
            reader.read_line(&mut incoming).await.unwrap();
        }

        // The echo is a request, so parse it as one and compare the key fields.
        let echoed: JsonRpcRequest = serde_json::from_str(incoming.trim()).unwrap();
        assert_eq!(echoed.id, request_id);
        assert_eq!(echoed.method, "test/echo");

        transport.close().await.unwrap();
    }

    /// The constructor stores the URL as given, minus any trailing slash.
    #[test]
    fn test_http_transport_creation() {
        let plain = HttpTransport::new("http://localhost:8080/mcp").unwrap();
        assert_eq!(plain.base_url, "http://localhost:8080/mcp");

        let with_slash = HttpTransport::new("http://localhost:8080/mcp/").unwrap();
        assert_eq!(with_slash.base_url, "http://localhost:8080/mcp");
    }

    /// `sleep 60` never writes to stdout, standing in for a stuck server.
    #[tokio::test]
    async fn stdio_send_times_out_on_silent_child() {
        let budget = Duration::from_millis(150);
        let transport = StdioTransport::new("sleep", &["60"], None)
            .await
            .unwrap()
            .with_timeout(budget);

        let request = JsonRpcRequest::new("test/timeout", None);
        let started = std::time::Instant::now();
        let outcome = transport.send(request).await;
        let elapsed = started.elapsed();

        // Wall-clock time must reflect the timeout, not the 60 s sleep.
        assert_prompt_timeout(outcome, budget, elapsed);
        transport.close().await.unwrap();
    }

    /// A tiny bash responder: for every input line it prints one fixed, valid
    /// JSON-RPC response, so `send()` completes well inside its budget.
    #[tokio::test]
    async fn stdio_send_succeeds_within_timeout() {
        let script = r#"while IFS= read -r _line; do printf '{"jsonrpc":"2.0","id":1,"result":{"ok":true}}\n'; done"#;
        let transport = StdioTransport::new("bash", &["-c", script], None)
            .await
            .unwrap()
            .with_timeout(Duration::from_secs(5));

        let request = JsonRpcRequest::new("test/ok", None);
        let response = transport.send(request).await.expect("send should succeed");
        assert!(response.result.is_some());
        assert!(response.error.is_none());
        transport.close().await.unwrap();
    }

    /// A listener that accepts connections but never replies: the HTTP client
    /// waits for a response and the outer `tokio::time::timeout` has to fire.
    #[tokio::test]
    async fn http_send_times_out_on_silent_server() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Accept forever; park every accepted socket in its own task so the
        // connection stays open without a single byte being written.
        tokio::spawn(async move {
            loop {
                if let Ok((stream, _)) = listener.accept().await {
                    tokio::spawn(async move {
                        let _held_open = stream;
                        tokio::time::sleep(Duration::from_secs(60)).await;
                    });
                }
            }
        });

        let budget = Duration::from_millis(200);
        let url = format!("http://{}/", addr);
        let transport = HttpTransport::new_with_timeout(&url, budget).unwrap();

        let request = JsonRpcRequest::new("test/timeout", None);
        let started = std::time::Instant::now();
        let outcome = transport.send(request).await;
        let elapsed = started.elapsed();

        assert_prompt_timeout(outcome, budget, elapsed);
    }

    /// Pins the documented default constant.
    #[test]
    fn stdio_default_timeout_is_30s() {
        assert_eq!(DEFAULT_REQUEST_TIMEOUT, Duration::from_secs(30));
    }
}