// mcp_core/transport/client/stdio.rs
use crate::protocol::{Protocol, ProtocolBuilder, RequestOptions};
use crate::transport::{
    JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Message, RequestId,
    Transport,
};
use crate::types::ErrorCode;
use anyhow::Result;
use async_trait::async_trait;
use std::future::Future;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::pin::Pin;
use std::process::Command;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::debug;

18#[derive(Clone)]
45pub struct ClientStdioTransport {
46 protocol: Protocol,
47 stdin: Arc<Mutex<Option<BufWriter<std::process::ChildStdin>>>>,
48 stdout: Arc<Mutex<Option<BufReader<std::process::ChildStdout>>>>,
49 child: Arc<Mutex<Option<std::process::Child>>>,
50 program: String,
51 args: Vec<String>,
52}
53
54impl ClientStdioTransport {
55 pub fn new(program: &str, args: &[&str]) -> Result<Self> {
66 Ok(ClientStdioTransport {
67 protocol: ProtocolBuilder::new().build(),
68 stdin: Arc::new(Mutex::new(None)),
69 stdout: Arc::new(Mutex::new(None)),
70 child: Arc::new(Mutex::new(None)),
71 program: program.to_string(),
72 args: args.iter().map(|&s| s.to_string()).collect(),
73 })
74 }
75}
76
77#[async_trait()]
78impl Transport for ClientStdioTransport {
79 async fn open(&self) -> Result<()> {
90 debug!("ClientStdioTransport: Opening transport");
91 let mut child = Command::new(&self.program)
92 .args(&self.args)
93 .stdin(std::process::Stdio::piped())
94 .stdout(std::process::Stdio::piped())
95 .spawn()?;
96
97 let stdin = child
98 .stdin
99 .take()
100 .ok_or_else(|| anyhow::anyhow!("Child process stdin not available"))?;
101 let stdout = child
102 .stdout
103 .take()
104 .ok_or_else(|| anyhow::anyhow!("Child process stdout not available"))?;
105
106 {
107 let mut stdin_lock = self.stdin.lock().await;
108 *stdin_lock = Some(BufWriter::new(stdin));
109 }
110 {
111 let mut stdout_lock = self.stdout.lock().await;
112 *stdout_lock = Some(BufReader::new(stdout));
113 }
114 {
115 let mut child_lock = self.child.lock().await;
116 *child_lock = Some(child);
117 }
118
119 let transport_clone = self.clone();
121 tokio::spawn(async move {
122 loop {
123 match transport_clone.poll_message().await {
124 Ok(Some(message)) => match message {
125 Message::Request(request) => {
126 let response = transport_clone.protocol.handle_request(request).await;
127 let _ = transport_clone
128 .send_response(response.id, response.result, response.error)
129 .await;
130 }
131 Message::Notification(notification) => {
132 let _ = transport_clone
133 .protocol
134 .handle_notification(notification)
135 .await;
136 }
137 Message::Response(response) => {
138 transport_clone.protocol.handle_response(response).await;
139 }
140 },
141 Ok(None) => break, Err(e) => {
143 debug!("ClientStdioTransport: Error polling message: {:?}", e);
144 break;
145 }
146 }
147 }
148 });
149 Ok(())
150 }
151
152 async fn close(&self) -> Result<()> {
162 let mut child_lock = self.child.lock().await;
163 if let Some(child) = child_lock.as_mut() {
164 let _ = child.kill();
165 }
166 *child_lock = None;
167
168 *self.stdin.lock().await = None;
170 *self.stdout.lock().await = None;
171
172 Ok(())
173 }
174
175 async fn poll_message(&self) -> Result<Option<Message>> {
184 debug!("ClientStdioTransport: Starting to receive message");
185
186 let mut stdout_guard = self.stdout.lock().await;
188 let mut stdout = stdout_guard
189 .take()
190 .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
191
192 drop(stdout_guard);
194
195 let (line_result, stdout) = tokio::task::spawn_blocking(move || {
197 let mut line = String::new();
198 let result = match stdout.read_line(&mut line) {
199 Ok(0) => Ok(None), Ok(_) => Ok(Some(line)),
201 Err(e) => Err(anyhow::anyhow!("Error reading line: {}", e)),
202 };
203 (result, stdout)
205 })
206 .await?;
207
208 let mut stdout_guard = self.stdout.lock().await;
210 *stdout_guard = Some(stdout);
211
212 match line_result? {
214 Some(line) => {
215 debug!(
216 "ClientStdioTransport: Received from process: {}",
217 line.trim()
218 );
219 let message: Message = serde_json::from_str(&line)?;
220 debug!("ClientStdioTransport: Successfully parsed message");
221 Ok(Some(message))
222 }
223 None => {
224 debug!("ClientStdioTransport: Received EOF from process");
225 Ok(None)
226 }
227 }
228 }
229
230 fn request(
248 &self,
249 method: &str,
250 params: Option<serde_json::Value>,
251 options: RequestOptions,
252 ) -> Pin<Box<dyn Future<Output = Result<JsonRpcResponse>> + Send + Sync>> {
253 let protocol = self.protocol.clone();
254 let stdin_arc = self.stdin.clone();
255 let method = method.to_owned();
256 Box::pin(async move {
257 let (id, rx) = protocol.create_request().await;
258 let request = JsonRpcRequest {
259 id,
260 method,
261 jsonrpc: Default::default(),
262 params,
263 };
264 let serialized = serde_json::to_string(&request)?;
265 debug!("ClientStdioTransport: Sending request: {}", serialized);
266
267 let mut stdin_guard = stdin_arc.lock().await;
269 let mut stdin = stdin_guard
270 .take()
271 .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
272
273 let stdin_result = tokio::task::spawn_blocking(move || {
275 stdin.write_all(serialized.as_bytes())?;
276 stdin.write_all(b"\n")?;
277 stdin.flush()?;
278 Ok::<_, anyhow::Error>(stdin)
279 })
280 .await??;
281
282 *stdin_guard = Some(stdin_result);
284
285 debug!("ClientStdioTransport: Request sent successfully");
286 let result = timeout(options.timeout, rx).await;
287 match result {
288 Ok(inner_result) => match inner_result {
289 Ok(response) => Ok(response),
290 Err(_) => {
291 protocol.cancel_response(id).await;
292 Ok(JsonRpcResponse {
293 id,
294 result: None,
295 error: Some(JsonRpcError {
296 code: ErrorCode::RequestTimeout as i32,
297 message: "Request cancelled".to_string(),
298 data: None,
299 }),
300 ..Default::default()
301 })
302 }
303 },
304 Err(_) => {
305 protocol.cancel_response(id).await;
306 Ok(JsonRpcResponse {
307 id,
308 result: None,
309 error: Some(JsonRpcError {
310 code: ErrorCode::RequestTimeout as i32,
311 message: "Request timed out".to_string(),
312 data: None,
313 }),
314 ..Default::default()
315 })
316 }
317 }
318 })
319 }
320
321 async fn send_response(
333 &self,
334 id: RequestId,
335 result: Option<serde_json::Value>,
336 error: Option<JsonRpcError>,
337 ) -> Result<()> {
338 let response = JsonRpcResponse {
339 id,
340 result,
341 error,
342 jsonrpc: Default::default(),
343 };
344 let serialized = serde_json::to_string(&response)?;
345 debug!("ClientStdioTransport: Sending response: {}", serialized);
346
347 let mut stdin_guard = self.stdin.lock().await;
349 let mut stdin = stdin_guard
350 .take()
351 .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
352
353 let stdin_result = tokio::task::spawn_blocking(move || {
355 stdin.write_all(serialized.as_bytes())?;
356 stdin.write_all(b"\n")?;
357 stdin.flush()?;
358 Ok::<_, anyhow::Error>(stdin)
359 })
360 .await??;
361
362 *stdin_guard = Some(stdin_result);
364
365 Ok(())
366 }
367
368 async fn send_notification(
381 &self,
382 method: &str,
383 params: Option<serde_json::Value>,
384 ) -> Result<()> {
385 let notification = JsonRpcNotification {
386 jsonrpc: Default::default(),
387 method: method.to_owned(),
388 params,
389 };
390 let serialized = serde_json::to_string(¬ification)?;
391 debug!("ClientStdioTransport: Sending notification: {}", serialized);
392
393 let mut stdin_guard = self.stdin.lock().await;
395 let mut stdin = stdin_guard
396 .take()
397 .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
398
399 let stdin_result = tokio::task::spawn_blocking(move || {
401 stdin.write_all(serialized.as_bytes())?;
402 stdin.write_all(b"\n")?;
403 stdin.flush()?;
404 Ok::<_, anyhow::Error>(stdin)
405 })
406 .await??;
407
408 *stdin_guard = Some(stdin_result);
410
411 Ok(())
412 }
413}