mcp_core/transport/client/
stdio.rs

1use crate::protocol::{Protocol, ProtocolBuilder, RequestOptions};
2use crate::transport::{
3    JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Message, RequestId,
4    Transport,
5};
6use crate::types::ErrorCode;
7use anyhow::Result;
8use async_trait::async_trait;
9use std::future::Future;
10use std::io::{BufRead, BufReader, BufWriter, Write};
11use std::pin::Pin;
12use std::process::Command;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15use tokio::time::timeout;
16use tracing::debug;
17
18/// Client transport that communicates with an MCP server over standard I/O.
19///
20/// The `ClientStdioTransport` launches a child process specified by the provided
21/// program and arguments, then communicates with it using the standard input and output
22/// streams. It implements the `Transport` trait to send requests and receive responses
23/// over these streams.
24///
25/// This transport is useful for:
26/// - Running local MCP servers as child processes
27/// - Command-line tools that need to communicate with MCP servers
28/// - Testing and development scenarios
29///
30/// # Example
31///
32/// ```
33/// use mcp_core::transport::{ClientStdioTransport, Transport};
34/// use anyhow::Result;
35///
36/// async fn example() -> Result<()> {
37///     let transport = ClientStdioTransport::new("my-mcp-server", &["--flag"])?;
38///     transport.open().await?;
39///     // Use transport...
40///     transport.close().await?;
41///     Ok(())
42/// }
43/// ```
44#[derive(Clone)]
45pub struct ClientStdioTransport {
46    protocol: Protocol,
47    stdin: Arc<Mutex<Option<BufWriter<std::process::ChildStdin>>>>,
48    stdout: Arc<Mutex<Option<BufReader<std::process::ChildStdout>>>>,
49    child: Arc<Mutex<Option<std::process::Child>>>,
50    program: String,
51    args: Vec<String>,
52}
53
54impl ClientStdioTransport {
55    /// Creates a new `ClientStdioTransport` instance.
56    ///
57    /// # Arguments
58    ///
59    /// * `program` - The path or name of the program to execute
60    /// * `args` - Command-line arguments to pass to the program
61    ///
62    /// # Returns
63    ///
64    /// A `Result` containing the new transport instance if successful
65    pub fn new(program: &str, args: &[&str]) -> Result<Self> {
66        Ok(ClientStdioTransport {
67            protocol: ProtocolBuilder::new().build(),
68            stdin: Arc::new(Mutex::new(None)),
69            stdout: Arc::new(Mutex::new(None)),
70            child: Arc::new(Mutex::new(None)),
71            program: program.to_string(),
72            args: args.iter().map(|&s| s.to_string()).collect(),
73        })
74    }
75}
76
77#[async_trait()]
78impl Transport for ClientStdioTransport {
79    /// Opens the transport by launching the child process and setting up the communication channels.
80    ///
81    /// This method:
82    /// 1. Spawns the child process with the configured program and arguments
83    /// 2. Sets up pipes for stdin and stdout
84    /// 3. Starts a background task for handling incoming messages
85    ///
86    /// # Returns
87    ///
88    /// A `Result` indicating success or failure
89    async fn open(&self) -> Result<()> {
90        debug!("ClientStdioTransport: Opening transport");
91        let mut child = Command::new(&self.program)
92            .args(&self.args)
93            .stdin(std::process::Stdio::piped())
94            .stdout(std::process::Stdio::piped())
95            .spawn()?;
96
97        let stdin = child
98            .stdin
99            .take()
100            .ok_or_else(|| anyhow::anyhow!("Child process stdin not available"))?;
101        let stdout = child
102            .stdout
103            .take()
104            .ok_or_else(|| anyhow::anyhow!("Child process stdout not available"))?;
105
106        {
107            let mut stdin_lock = self.stdin.lock().await;
108            *stdin_lock = Some(BufWriter::new(stdin));
109        }
110        {
111            let mut stdout_lock = self.stdout.lock().await;
112            *stdout_lock = Some(BufReader::new(stdout));
113        }
114        {
115            let mut child_lock = self.child.lock().await;
116            *child_lock = Some(child);
117        }
118
119        // Spawn a background task to continuously poll messages.
120        let transport_clone = self.clone();
121        tokio::spawn(async move {
122            loop {
123                match transport_clone.poll_message().await {
124                    Ok(Some(message)) => match message {
125                        Message::Request(request) => {
126                            let response = transport_clone.protocol.handle_request(request).await;
127                            let _ = transport_clone
128                                .send_response(response.id, response.result, response.error)
129                                .await;
130                        }
131                        Message::Notification(notification) => {
132                            let _ = transport_clone
133                                .protocol
134                                .handle_notification(notification)
135                                .await;
136                        }
137                        Message::Response(response) => {
138                            transport_clone.protocol.handle_response(response).await;
139                        }
140                    },
141                    Ok(None) => break, // EOF encountered.
142                    Err(e) => {
143                        debug!("ClientStdioTransport: Error polling message: {:?}", e);
144                        break;
145                    }
146                }
147            }
148        });
149        Ok(())
150    }
151
152    /// Closes the transport by terminating the child process and cleaning up resources.
153    ///
154    /// This method:
155    /// 1. Kills the child process
156    /// 2. Clears the stdin and stdout handles
157    ///
158    /// # Returns
159    ///
160    /// A `Result` indicating success or failure
161    async fn close(&self) -> Result<()> {
162        let mut child_lock = self.child.lock().await;
163        if let Some(child) = child_lock.as_mut() {
164            let _ = child.kill();
165        }
166        *child_lock = None;
167
168        // Clear stdin and stdout
169        *self.stdin.lock().await = None;
170        *self.stdout.lock().await = None;
171
172        Ok(())
173    }
174
175    /// Polls for incoming messages from the child process's stdout.
176    ///
177    /// This method reads a line from the child process's stdout and parses it
178    /// as a JSON-RPC message.
179    ///
180    /// # Returns
181    ///
182    /// A `Result` containing an `Option<Message>`. `None` indicates EOF.
183    async fn poll_message(&self) -> Result<Option<Message>> {
184        debug!("ClientStdioTransport: Starting to receive message");
185
186        // Take ownership of stdout temporarily
187        let mut stdout_guard = self.stdout.lock().await;
188        let mut stdout = stdout_guard
189            .take()
190            .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
191
192        // Drop the lock before spawning the blocking task
193        drop(stdout_guard);
194
195        // Use a blocking operation in a spawn_blocking task
196        let (line_result, stdout) = tokio::task::spawn_blocking(move || {
197            let mut line = String::new();
198            let result = match stdout.read_line(&mut line) {
199                Ok(0) => Ok(None), // EOF
200                Ok(_) => Ok(Some(line)),
201                Err(e) => Err(anyhow::anyhow!("Error reading line: {}", e)),
202            };
203            // Return both the result and the stdout so we can put it back
204            (result, stdout)
205        })
206        .await?;
207
208        // Put stdout back
209        let mut stdout_guard = self.stdout.lock().await;
210        *stdout_guard = Some(stdout);
211
212        // Process the result
213        match line_result? {
214            Some(line) => {
215                debug!(
216                    "ClientStdioTransport: Received from process: {}",
217                    line.trim()
218                );
219                let message: Message = serde_json::from_str(&line)?;
220                debug!("ClientStdioTransport: Successfully parsed message");
221                Ok(Some(message))
222            }
223            None => {
224                debug!("ClientStdioTransport: Received EOF from process");
225                Ok(None)
226            }
227        }
228    }
229
230    /// Sends a request to the child process and waits for a response.
231    ///
232    /// This method:
233    /// 1. Creates a new request ID
234    /// 2. Constructs a JSON-RPC request
235    /// 3. Sends it to the child process's stdin
236    /// 4. Waits for a response with the same ID
237    ///
238    /// # Arguments
239    ///
240    /// * `method` - The method name for the request
241    /// * `params` - Optional parameters for the request
242    /// * `options` - Request options (like timeout)
243    ///
244    /// # Returns
245    ///
246    /// A `Future` that resolves to a `Result` containing the response
247    fn request(
248        &self,
249        method: &str,
250        params: Option<serde_json::Value>,
251        options: RequestOptions,
252    ) -> Pin<Box<dyn Future<Output = Result<JsonRpcResponse>> + Send + Sync>> {
253        let protocol = self.protocol.clone();
254        let stdin_arc = self.stdin.clone();
255        let method = method.to_owned();
256        Box::pin(async move {
257            let (id, rx) = protocol.create_request().await;
258            let request = JsonRpcRequest {
259                id,
260                method,
261                jsonrpc: Default::default(),
262                params,
263            };
264            let serialized = serde_json::to_string(&request)?;
265            debug!("ClientStdioTransport: Sending request: {}", serialized);
266
267            // Get the stdin writer
268            let mut stdin_guard = stdin_arc.lock().await;
269            let mut stdin = stdin_guard
270                .take()
271                .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
272
273            // Use a blocking operation in a spawn_blocking task
274            let stdin_result = tokio::task::spawn_blocking(move || {
275                stdin.write_all(serialized.as_bytes())?;
276                stdin.write_all(b"\n")?;
277                stdin.flush()?;
278                Ok::<_, anyhow::Error>(stdin)
279            })
280            .await??;
281
282            // Put the writer back
283            *stdin_guard = Some(stdin_result);
284
285            debug!("ClientStdioTransport: Request sent successfully");
286            let result = timeout(options.timeout, rx).await;
287            match result {
288                Ok(inner_result) => match inner_result {
289                    Ok(response) => Ok(response),
290                    Err(_) => {
291                        protocol.cancel_response(id).await;
292                        Ok(JsonRpcResponse {
293                            id,
294                            result: None,
295                            error: Some(JsonRpcError {
296                                code: ErrorCode::RequestTimeout as i32,
297                                message: "Request cancelled".to_string(),
298                                data: None,
299                            }),
300                            ..Default::default()
301                        })
302                    }
303                },
304                Err(_) => {
305                    protocol.cancel_response(id).await;
306                    Ok(JsonRpcResponse {
307                        id,
308                        result: None,
309                        error: Some(JsonRpcError {
310                            code: ErrorCode::RequestTimeout as i32,
311                            message: "Request timed out".to_string(),
312                            data: None,
313                        }),
314                        ..Default::default()
315                    })
316                }
317            }
318        })
319    }
320
321    /// Sends a response to a request previously received from the child process.
322    ///
323    /// # Arguments
324    ///
325    /// * `id` - The ID of the request being responded to
326    /// * `result` - Optional successful result
327    /// * `error` - Optional error information
328    ///
329    /// # Returns
330    ///
331    /// A `Result` indicating success or failure
332    async fn send_response(
333        &self,
334        id: RequestId,
335        result: Option<serde_json::Value>,
336        error: Option<JsonRpcError>,
337    ) -> Result<()> {
338        let response = JsonRpcResponse {
339            id,
340            result,
341            error,
342            jsonrpc: Default::default(),
343        };
344        let serialized = serde_json::to_string(&response)?;
345        debug!("ClientStdioTransport: Sending response: {}", serialized);
346
347        // Get the stdin writer
348        let mut stdin_guard = self.stdin.lock().await;
349        let mut stdin = stdin_guard
350            .take()
351            .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
352
353        // Use a blocking operation in a spawn_blocking task
354        let stdin_result = tokio::task::spawn_blocking(move || {
355            stdin.write_all(serialized.as_bytes())?;
356            stdin.write_all(b"\n")?;
357            stdin.flush()?;
358            Ok::<_, anyhow::Error>(stdin)
359        })
360        .await??;
361
362        // Put the writer back
363        *stdin_guard = Some(stdin_result);
364
365        Ok(())
366    }
367
368    /// Sends a notification to the child process.
369    ///
370    /// Unlike requests, notifications do not expect a response.
371    ///
372    /// # Arguments
373    ///
374    /// * `method` - The method name for the notification
375    /// * `params` - Optional parameters for the notification
376    ///
377    /// # Returns
378    ///
379    /// A `Result` indicating success or failure
380    async fn send_notification(
381        &self,
382        method: &str,
383        params: Option<serde_json::Value>,
384    ) -> Result<()> {
385        let notification = JsonRpcNotification {
386            jsonrpc: Default::default(),
387            method: method.to_owned(),
388            params,
389        };
390        let serialized = serde_json::to_string(&notification)?;
391        debug!("ClientStdioTransport: Sending notification: {}", serialized);
392
393        // Get the stdin writer
394        let mut stdin_guard = self.stdin.lock().await;
395        let mut stdin = stdin_guard
396            .take()
397            .ok_or_else(|| anyhow::anyhow!("Transport not opened"))?;
398
399        // Use a blocking operation in a spawn_blocking task
400        let stdin_result = tokio::task::spawn_blocking(move || {
401            stdin.write_all(serialized.as_bytes())?;
402            stdin.write_all(b"\n")?;
403            stdin.flush()?;
404            Ok::<_, anyhow::Error>(stdin)
405        })
406        .await??;
407
408        // Put the writer back
409        *stdin_guard = Some(stdin_result);
410
411        Ok(())
412    }
413}