tower_mcp/client/
stdio.rs1use std::process::Stdio;
19
20use async_trait::async_trait;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::process::{Child, Command};
23
24use super::transport::ClientTransport;
25use crate::error::{Error, Result};
26
27pub struct StdioClientTransport {
33 child: Option<Child>,
34 stdin: Option<tokio::process::ChildStdin>,
35 stdout: BufReader<tokio::process::ChildStdout>,
36}
37
38impl StdioClientTransport {
39 pub async fn spawn(program: &str, args: &[&str]) -> Result<Self> {
46 let mut cmd = Command::new(program);
47 cmd.args(args);
48 Self::spawn_command(&mut cmd).await
49 }
50
51 pub async fn spawn_command(cmd: &mut Command) -> Result<Self> {
74 cmd.stdin(Stdio::piped())
75 .stdout(Stdio::piped())
76 .stderr(Stdio::inherit());
77
78 let mut child = cmd
79 .spawn()
80 .map_err(|e| Error::Transport(format!("Failed to spawn process: {}", e)))?;
81
82 let stdin = child
83 .stdin
84 .take()
85 .ok_or_else(|| Error::Transport("Failed to get child stdin".to_string()))?;
86 let stdout = child
87 .stdout
88 .take()
89 .ok_or_else(|| Error::Transport("Failed to get child stdout".to_string()))?;
90
91 tracing::info!("Spawned MCP server process");
92
93 Ok(Self {
94 child: Some(child),
95 stdin: Some(stdin),
96 stdout: BufReader::new(stdout),
97 })
98 }
99
100 pub fn from_child(mut child: Child) -> Result<Self> {
104 let stdin = child
105 .stdin
106 .take()
107 .ok_or_else(|| Error::Transport("Failed to get child stdin".to_string()))?;
108 let stdout = child
109 .stdout
110 .take()
111 .ok_or_else(|| Error::Transport("Failed to get child stdout".to_string()))?;
112
113 Ok(Self {
114 child: Some(child),
115 stdin: Some(stdin),
116 stdout: BufReader::new(stdout),
117 })
118 }
119}
120
121#[async_trait]
122impl ClientTransport for StdioClientTransport {
123 async fn send(&mut self, message: &str) -> Result<()> {
124 let stdin = self
125 .stdin
126 .as_mut()
127 .ok_or_else(|| Error::Transport("Transport closed".to_string()))?;
128
129 stdin
130 .write_all(message.as_bytes())
131 .await
132 .map_err(|e| Error::Transport(format!("Failed to write: {}", e)))?;
133 stdin
134 .write_all(b"\n")
135 .await
136 .map_err(|e| Error::Transport(format!("Failed to write newline: {}", e)))?;
137 stdin
138 .flush()
139 .await
140 .map_err(|e| Error::Transport(format!("Failed to flush: {}", e)))?;
141 Ok(())
142 }
143
144 async fn recv(&mut self) -> Result<Option<String>> {
145 let mut line = String::new();
146 let bytes = self
147 .stdout
148 .read_line(&mut line)
149 .await
150 .map_err(|e| Error::Transport(format!("Failed to read: {}", e)))?;
151
152 if bytes == 0 {
153 return Ok(None); }
155
156 Ok(Some(line.trim().to_string()))
157 }
158
159 fn is_connected(&self) -> bool {
160 self.child.is_some() && self.stdin.is_some()
161 }
162
163 async fn close(&mut self) -> Result<()> {
164 self.stdin.take();
166
167 if let Some(mut child) = self.child.take() {
168 let result =
169 tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
170
171 match result {
172 Ok(Ok(status)) => {
173 tracing::info!(status = ?status, "Child process exited");
174 }
175 Ok(Err(e)) => {
176 tracing::error!(error = %e, "Error waiting for child");
177 }
178 Err(_) => {
179 tracing::warn!("Timeout waiting for child, killing");
180 let _ = child.kill().await;
181 }
182 }
183 }
184
185 Ok(())
186 }
187}