Skip to main content

tower_mcp/client/
stdio.rs

//! Stdio client transport for subprocess MCP servers.
//!
//! Provides [`StdioClientTransport`], which spawns a child process and
//! communicates with it using line-delimited JSON over stdin/stdout.
//!
//! # Example
//!
//! ```rust,no_run
//! use tower_mcp::client::{McpClient, StdioClientTransport};
//!
//! # async fn example() -> Result<(), tower_mcp::BoxError> {
//! let transport = StdioClientTransport::spawn("my-mcp-server", &["--flag"]).await?;
//! let client = McpClient::connect(transport).await?;
//! # Ok(())
//! # }
//! ```
17
18use std::process::Stdio;
19
20use async_trait::async_trait;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::process::{Child, Command};
23
24use super::transport::ClientTransport;
25use crate::error::{Error, Result};
26
27/// Client transport that communicates with a subprocess via stdio.
28///
29/// Spawns a child process and communicates using line-delimited JSON-RPC
30/// messages over stdin (write) and stdout (read). Stderr is inherited so
31/// server debug output appears in the client's terminal.
32pub struct StdioClientTransport {
33    child: Option<Child>,
34    stdin: Option<tokio::process::ChildStdin>,
35    stdout: BufReader<tokio::process::ChildStdout>,
36}
37
38impl StdioClientTransport {
39    /// Spawn a new subprocess and connect to it.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the process fails to spawn or if stdin/stdout
44    /// handles cannot be acquired.
45    pub async fn spawn(program: &str, args: &[&str]) -> Result<Self> {
46        let mut cmd = Command::new(program);
47        cmd.args(args);
48        Self::spawn_command(&mut cmd).await
49    }
50
51    /// Spawn from a pre-configured [`Command`].
52    ///
53    /// This allows setting environment variables, working directory, and
54    /// other process configuration before spawning.
55    ///
56    /// Stdin and stdout are automatically set to piped. Stderr is set to
57    /// inherited unless already configured.
58    ///
59    /// # Example
60    ///
61    /// ```rust,no_run
62    /// use tokio::process::Command;
63    /// use tower_mcp::client::StdioClientTransport;
64    ///
65    /// # async fn example() -> Result<(), tower_mcp::BoxError> {
66    /// let mut cmd = Command::new("npx");
67    /// cmd.args(["-y", "@modelcontextprotocol/server-github"])
68    ///    .env("GITHUB_TOKEN", "ghp_...");
69    /// let transport = StdioClientTransport::spawn_command(&mut cmd).await?;
70    /// # Ok(())
71    /// # }
72    /// ```
73    pub async fn spawn_command(cmd: &mut Command) -> Result<Self> {
74        cmd.stdin(Stdio::piped())
75            .stdout(Stdio::piped())
76            .stderr(Stdio::inherit());
77
78        let mut child = cmd
79            .spawn()
80            .map_err(|e| Error::Transport(format!("Failed to spawn process: {}", e)))?;
81
82        let stdin = child
83            .stdin
84            .take()
85            .ok_or_else(|| Error::Transport("Failed to get child stdin".to_string()))?;
86        let stdout = child
87            .stdout
88            .take()
89            .ok_or_else(|| Error::Transport("Failed to get child stdout".to_string()))?;
90
91        tracing::info!("Spawned MCP server process");
92
93        Ok(Self {
94            child: Some(child),
95            stdin: Some(stdin),
96            stdout: BufReader::new(stdout),
97        })
98    }
99
100    /// Create from an existing child process.
101    ///
102    /// The child must have piped stdin and stdout.
103    pub fn from_child(mut child: Child) -> Result<Self> {
104        let stdin = child
105            .stdin
106            .take()
107            .ok_or_else(|| Error::Transport("Failed to get child stdin".to_string()))?;
108        let stdout = child
109            .stdout
110            .take()
111            .ok_or_else(|| Error::Transport("Failed to get child stdout".to_string()))?;
112
113        Ok(Self {
114            child: Some(child),
115            stdin: Some(stdin),
116            stdout: BufReader::new(stdout),
117        })
118    }
119}
120
121#[async_trait]
122impl ClientTransport for StdioClientTransport {
123    async fn send(&mut self, message: &str) -> Result<()> {
124        let stdin = self
125            .stdin
126            .as_mut()
127            .ok_or_else(|| Error::Transport("Transport closed".to_string()))?;
128
129        stdin
130            .write_all(message.as_bytes())
131            .await
132            .map_err(|e| Error::Transport(format!("Failed to write: {}", e)))?;
133        stdin
134            .write_all(b"\n")
135            .await
136            .map_err(|e| Error::Transport(format!("Failed to write newline: {}", e)))?;
137        stdin
138            .flush()
139            .await
140            .map_err(|e| Error::Transport(format!("Failed to flush: {}", e)))?;
141        Ok(())
142    }
143
144    async fn recv(&mut self) -> Result<Option<String>> {
145        let mut line = String::new();
146        let bytes = self
147            .stdout
148            .read_line(&mut line)
149            .await
150            .map_err(|e| Error::Transport(format!("Failed to read: {}", e)))?;
151
152        if bytes == 0 {
153            return Ok(None); // EOF
154        }
155
156        Ok(Some(line.trim().to_string()))
157    }
158
159    fn is_connected(&self) -> bool {
160        self.child.is_some() && self.stdin.is_some()
161    }
162
163    async fn close(&mut self) -> Result<()> {
164        // Drop stdin to signal EOF to the child process
165        self.stdin.take();
166
167        if let Some(mut child) = self.child.take() {
168            let result =
169                tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
170
171            match result {
172                Ok(Ok(status)) => {
173                    tracing::info!(status = ?status, "Child process exited");
174                }
175                Ok(Err(e)) => {
176                    tracing::error!(error = %e, "Error waiting for child");
177                }
178                Err(_) => {
179                    tracing::warn!("Timeout waiting for child, killing");
180                    let _ = child.kill().await;
181                }
182            }
183        }
184
185        Ok(())
186    }
187}