Skip to main content

phi_core/mcp/
transport.rs

//! MCP transport implementations: stdio (child process) and HTTP POST (JSON-RPC request/response; no SSE streaming).
2/*
3ARCHITECTURE: transport.rs — how messages travel between client and MCP server
4
5The `McpTransport` trait abstracts the communication channel. Two implementations:
6
7`StdioTransport` — subprocess communication via stdin/stdout
8  - Spawns the MCP server as a child process
9  - Sends JSON-RPC requests as newline-delimited JSON to the child's stdin
10  - Reads JSON-RPC responses from the child's stdout (one line = one response)
11  - Used for local servers: filesystem, git, shell, custom scripts
12
13`HttpTransport` — HTTP POST for remote MCP servers
14  - Sends requests as HTTP POST with JSON body
15  - Used for remote or cloud-hosted MCP servers
16
17Why a trait?
18  The `McpClient` is generic over transport — tests can use a mock transport,
19  production uses stdio or HTTP. Same pattern as StreamProvider for LLMs.
20
21RUST QUIRK: `Arc<Mutex<tokio::process::ChildStdin>>` — async-safe shared mutable I/O
22
23`ChildStdin` is an async write handle to the child's stdin.
24It's not `Copy` or `Clone` — it's an exclusive resource.
25
26Why `Arc<Mutex<...>>`?
27  `McpTransport::send(&self, ...)` takes `&self` (shared reference).
28  But we need to WRITE to stdin (mutate it). This requires interior mutability.
29  `tokio::sync::Mutex` (async-aware mutex) guards `ChildStdin`:
30    - Multiple concurrent `send()` calls wait for the lock (serialized)
31    - No blocking — `.lock().await` yields to the tokio runtime while waiting
32
33`Arc` wraps the mutex so `StdioTransport` can implement `Clone` cheaply (just bump
34reference count), and so the struct can be shared across tasks.
35
36RUST QUIRK: `BufReader<ChildStdout>` — buffered async reading
37  `tokio::io::BufReader` wraps `ChildStdout` (raw byte stream) with line-buffering.
38  `.read_line(&mut String)` reads until `\n` — used to receive one JSON-RPC response.
39  Without buffering, we'd have to implement line-splitting manually.
40  Python analogy: wrapping a socket with io.BufferedReader or using readline().
41*/
42
43use super::types::*;
44use async_trait::async_trait;
45use std::collections::HashMap;
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
49use tokio::process::{Child, Command};
50use tokio::sync::Mutex;
51
/// Per-request time budget used by both transports unless the caller overrides it.
///
/// Without a bound, a hung MCP subprocess or an unresponsive HTTP server would
/// leave `send()` pending forever and stall whoever is awaiting it. 30 seconds
/// leaves ample room for ordinary tool-call latency while still failing fast
/// when the peer is stuck.
///
/// Override with `StdioTransport::with_timeout` or
/// `HttpTransport::new_with_timeout`.
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
59
60/// Transport trait for MCP communication.
61/*
62ARCHITECTURE: McpTransport — pluggable communication channel
63
64Any struct that implements `McpTransport` can be used as the communication
65channel for `McpClient`. The trait has two methods:
66  `send(request) → response` — request/response round-trip
67  `close()` — clean shutdown (kill process, close connections)
68
69`Send + Sync` bounds are required because `McpClient` may be used from multiple
70async tasks (e.g., when the agent executes tool calls in parallel).
71*/
72#[async_trait]
73pub trait McpTransport: Send + Sync {
74    /// Send a JSON-RPC request and receive the response.
75    async fn send(&self, request: JsonRpcRequest) -> Result<JsonRpcResponse, McpError>;
76    /// Close the transport (kill child process, close HTTP connections, etc.).
77    async fn close(&self) -> Result<(), McpError>;
78}
79
80// ---------------------------------------------------------------------------
81// Stdio Transport
82// ---------------------------------------------------------------------------
83
84/// Communicates with an MCP server via stdin/stdout of a child process.
85/// One JSON-RPC message per line (newline-delimited JSON, i.e. NDJSON protocol).
86pub struct StdioTransport {
87    stdin: Arc<Mutex<tokio::process::ChildStdin>>, // write requests here
88    stdout: Arc<Mutex<BufReader<tokio::process::ChildStdout>>>, // read responses here
89    child: Arc<Mutex<Child>>,                      // keep handle to kill on close()
90    request_timeout: Duration,                     // per-request timeout for send()
91}
92
93impl StdioTransport {
94    /// Spawn a child process and create a stdio transport.
95    pub async fn new(
96        command: &str, // EXECUTABLE — binary to spawn as the MCP server subprocess
97        args: &[&str], // ARGV — command-line arguments passed to the subprocess
98        env: Option<HashMap<String, String>>, // ENV OVERRIDES — extra env vars injected into the child; None = inherit parent env
99    ) -> Result<Self, McpError> {
100        let mut cmd = Command::new(command);
101        cmd.args(args)
102            .stdin(std::process::Stdio::piped())
103            .stdout(std::process::Stdio::piped())
104            .stderr(std::process::Stdio::piped());
105
106        if let Some(env_vars) = env {
107            for (k, v) in env_vars {
108                cmd.env(k, v);
109            }
110        }
111
112        let mut child = cmd
113            .spawn()
114            .map_err(|e| McpError::Transport(format!("Failed to spawn '{}': {}", command, e)))?;
115
116        /*
117        RUST QUIRK: `child.stdin.take()` — transferring ownership of I/O handles
118
119        `Child.stdin` is `Option<ChildStdin>`. After `spawn()`, it holds `Some(stdin)`.
120        `.take()` moves the `ChildStdin` OUT of the `Option`, leaving `None` behind.
121        We must `.take()` because we can't hold a `&mut` to it while also keeping `child`.
122        Rust's borrow checker prevents two mutable references to overlapping data.
123
124        `.ok_or_else(|| McpError::Transport("...".into()))` converts `Option<T>` → `Result<T, McpError>`:
125          `Some(stdin)` → `Ok(stdin)`
126          `None`        → `Err(McpError::Transport("Failed to capture stdin"))`
127        The `?` propagates the error out if `None`.
128        Python analogy: stdin = child.stdin or raise McpError("Failed to capture stdin")
129        */
130        let stdin = child
131            .stdin
132            .take()
133            .ok_or_else(|| McpError::Transport("Failed to capture stdin".into()))?;
134        let stdout = child
135            .stdout
136            .take()
137            .ok_or_else(|| McpError::Transport("Failed to capture stdout".into()))?;
138
139        Ok(Self {
140            stdin: Arc::new(Mutex::new(stdin)),
141            stdout: Arc::new(Mutex::new(BufReader::new(stdout))), // wrap for line-buffered reads
142            child: Arc::new(Mutex::new(child)),
143            request_timeout: DEFAULT_REQUEST_TIMEOUT,
144        })
145    }
146
147    /// Override the per-request timeout. Default is `DEFAULT_REQUEST_TIMEOUT` (30 s).
148    ///
149    /// Applies to each `send()` call independently — write + read + parse share the
150    /// same budget. On timeout, `send()` returns `McpError::Timeout` and the next
151    /// caller gets a fresh budget.
152    pub fn with_timeout(mut self, request_timeout: Duration) -> Self {
153        self.request_timeout = request_timeout;
154        self
155    }
156}
157
158#[async_trait]
159impl McpTransport for StdioTransport {
160    async fn send(
161        &self,
162        request: JsonRpcRequest, // OUTGOING — serialized to newline-terminated JSON, written to the child's stdin
163    ) -> Result<JsonRpcResponse, McpError> {
164        let timeout = self.request_timeout;
165        let work = async {
166            let mut line = serde_json::to_string(&request)?;
167            line.push('\n');
168
169            // Write request
170            {
171                let mut stdin = self.stdin.lock().await;
172                stdin
173                    .write_all(line.as_bytes())
174                    .await
175                    .map_err(|e| McpError::Transport(format!("Write error: {}", e)))?;
176                stdin
177                    .flush()
178                    .await
179                    .map_err(|e| McpError::Transport(format!("Flush error: {}", e)))?;
180            }
181
182            // Read response
183            let mut response_line = String::new();
184            {
185                let mut stdout = self.stdout.lock().await;
186                let bytes_read = stdout
187                    .read_line(&mut response_line)
188                    .await
189                    .map_err(|e| McpError::Transport(format!("Read error: {}", e)))?;
190                if bytes_read == 0 {
191                    return Err(McpError::ConnectionClosed);
192                }
193            }
194
195            let response: JsonRpcResponse = serde_json::from_str(response_line.trim())?;
196            Ok::<_, McpError>(response)
197        };
198
199        match tokio::time::timeout(timeout, work).await {
200            Ok(result) => result,
201            Err(_) => Err(McpError::Timeout { duration: timeout }),
202        }
203    }
204
205    async fn close(&self) -> Result<(), McpError> {
206        // Drop stdin to signal EOF, then kill the child
207        let mut child = self.child.lock().await;
208        let _ = child.kill().await;
209        Ok(())
210    }
211}
212
213// ---------------------------------------------------------------------------
214// HTTP Transport
215// ---------------------------------------------------------------------------
216
217/// Communicates with an MCP server via HTTP POST (JSON-RPC over HTTP).
218pub struct HttpTransport {
219    client: reqwest::Client,
220    base_url: String,
221    request_timeout: Duration,
222}
223
224impl HttpTransport {
225    /// Create a new HTTP transport with the default request timeout.
226    pub fn new(url: &str) -> Result<Self, McpError> {
227        Self::new_with_timeout(url, DEFAULT_REQUEST_TIMEOUT)
228    }
229
230    /// Create a new HTTP transport with a custom request timeout.
231    ///
232    /// The timeout is enforced by the outer `tokio::time::timeout` in `send()`,
233    /// which hard-cancels the inner future (connect + read + JSON parse) on expiry.
234    /// We deliberately do not set `reqwest::Client::builder().timeout(d)` to avoid
235    /// a race between reqwest's internal timer and the outer tokio timer — having
236    /// both fire at the same duration produces a non-deterministic error
237    /// classification.
238    pub fn new_with_timeout(url: &str, request_timeout: Duration) -> Result<Self, McpError> {
239        Ok(Self {
240            client: reqwest::Client::new(),
241            base_url: url.trim_end_matches('/').to_string(),
242            request_timeout,
243        })
244    }
245}
246
247#[async_trait]
248impl McpTransport for HttpTransport {
249    async fn send(
250        &self,
251        request: JsonRpcRequest, // OUTGOING — sent as HTTP POST body to base_url; response parsed from JSON reply
252    ) -> Result<JsonRpcResponse, McpError> {
253        let timeout = self.request_timeout;
254        let work = async {
255            let resp = self
256                .client
257                .post(&self.base_url)
258                .json(&request)
259                .send()
260                .await
261                .map_err(|e| McpError::Transport(format!("HTTP error: {}", e)))?;
262
263            if !resp.status().is_success() {
264                return Err(McpError::Transport(format!(
265                    "HTTP {} from server",
266                    resp.status()
267                )));
268            }
269
270            let response: JsonRpcResponse = resp
271                .json()
272                .await
273                .map_err(|e| McpError::Transport(format!("Response parse error: {}", e)))?;
274            Ok::<_, McpError>(response)
275        };
276
277        match tokio::time::timeout(timeout, work).await {
278            Ok(result) => result,
279            Err(_) => Err(McpError::Timeout { duration: timeout }),
280        }
281    }
282
283    async fn close(&self) -> Result<(), McpError> {
284        // HTTP is stateless; nothing to close.
285        Ok(())
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    // The subprocess tests below spawn `cat`, `sleep` and `bash`, which are only
    // reliably present on Unix-like systems, so they are gated on `cfg(unix)`
    // instead of failing at spawn time elsewhere.

    /// Exercises the raw pipe plumbing using `cat` as an echo server.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_stdio_transport_with_cat() {
        // `cat` reflects stdin to stdout unchanged.
        let transport = StdioTransport::new("cat", &[], None).await.unwrap();

        let request = JsonRpcRequest::new("test/echo", Some(serde_json::json!({"hello": "world"})));
        let request_id = request.id;

        // Because `cat` echoes the request verbatim, the "response" is really
        // the request itself. This checks transport-level I/O only, not
        // protocol correctness, so the pipes are driven directly.
        let mut line = serde_json::to_string(&request).unwrap();
        line.push('\n');

        {
            let mut stdin = transport.stdin.lock().await;
            stdin.write_all(line.as_bytes()).await.unwrap();
            stdin.flush().await.unwrap();
        }

        let mut response_line = String::new();
        {
            let mut stdout = transport.stdout.lock().await;
            stdout.read_line(&mut response_line).await.unwrap();
        }

        // The echoed line must parse back into the same request.
        let echoed: JsonRpcRequest = serde_json::from_str(response_line.trim()).unwrap();
        assert_eq!(echoed.id, request_id);
        assert_eq!(echoed.method, "test/echo");

        transport.close().await.unwrap();
    }

    /// The constructor stores the URL with any trailing slash removed.
    #[test]
    fn test_http_transport_creation() {
        let transport = HttpTransport::new("http://localhost:8080/mcp").unwrap();
        assert_eq!(transport.base_url, "http://localhost:8080/mcp");

        // Trailing slash stripped
        let transport = HttpTransport::new("http://localhost:8080/mcp/").unwrap();
        assert_eq!(transport.base_url, "http://localhost:8080/mcp");
    }

    /// A child that never answers must surface as `McpError::Timeout`, promptly.
    #[cfg(unix)]
    #[tokio::test]
    async fn stdio_send_times_out_on_silent_child() {
        // `sleep 60` never reads its stdin and never writes — a stand-in for a stuck child.
        let transport = StdioTransport::new("sleep", &["60"], None)
            .await
            .unwrap()
            .with_timeout(Duration::from_millis(150));

        let request = JsonRpcRequest::new("test/timeout", None);
        let start = std::time::Instant::now();
        let result = transport.send(request).await;
        let elapsed = start.elapsed();

        match result {
            Err(McpError::Timeout { duration }) => {
                assert_eq!(duration, Duration::from_millis(150));
            }
            other => panic!("expected McpError::Timeout, got {:?}", other),
        }
        // Wall-clock must reflect the timeout, not block on the 60s sleep.
        assert!(
            elapsed < Duration::from_secs(2),
            "send() should have returned promptly after timeout, took {:?}",
            elapsed
        );
        transport.close().await.unwrap();
    }

    /// A responsive child yields a parsed response well inside the budget.
    #[cfg(unix)]
    #[tokio::test]
    async fn stdio_send_succeeds_within_timeout() {
        // A tiny bash echo-server: for every input line, print one fixed, valid
        // JSON-RPC response. It loops so the transport could issue several sends.
        let script = r#"while IFS= read -r _line; do printf '{"jsonrpc":"2.0","id":1,"result":{"ok":true}}\n'; done"#;
        let transport = StdioTransport::new("bash", &["-c", script], None)
            .await
            .unwrap()
            .with_timeout(Duration::from_secs(5));

        let request = JsonRpcRequest::new("test/ok", None);
        let response = transport.send(request).await.expect("send should succeed");
        assert!(response.result.is_some());
        assert!(response.error.is_none());
        transport.close().await.unwrap();
    }

    /// A server that accepts the connection but never replies must time out.
    #[tokio::test]
    async fn http_send_times_out_on_silent_server() {
        // Bind a listener that accepts connections but never writes — reqwest
        // hangs waiting for the response and the outer tokio timeout must fire.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        // Background task: accept connections forever, hold them open without replying.
        tokio::spawn(async move {
            loop {
                if let Ok((stream, _)) = listener.accept().await {
                    // Keep the connection open; never write.
                    tokio::spawn(async move {
                        let _stream = stream;
                        tokio::time::sleep(Duration::from_secs(60)).await;
                    });
                }
            }
        });

        let url = format!("http://{}/", addr);
        let transport = HttpTransport::new_with_timeout(&url, Duration::from_millis(200)).unwrap();
        let request = JsonRpcRequest::new("test/timeout", None);
        let start = std::time::Instant::now();
        let result = transport.send(request).await;
        let elapsed = start.elapsed();

        match result {
            Err(McpError::Timeout { duration }) => {
                assert_eq!(duration, Duration::from_millis(200));
            }
            other => panic!("expected McpError::Timeout, got {:?}", other),
        }
        assert!(
            elapsed < Duration::from_secs(2),
            "send() should have returned promptly after timeout, took {:?}",
            elapsed
        );
    }

    /// Pins the documented default so a change to the constant is deliberate.
    #[test]
    fn stdio_default_timeout_is_30s() {
        assert_eq!(DEFAULT_REQUEST_TIMEOUT, Duration::from_secs(30));
    }
}