mcp_sdk_rs/transport/
stdio.rs

1//! Standard I/O Transport Implementation
2//!
3//! This module provides a transport implementation that uses standard input/output (stdio)
4//! for communication. This is particularly useful for:
5//! - Command-line tools that need to communicate with an MCP server
6//! - Local development and testing
7//! - Situations where network transport is not desired or available
8//!
9//! The implementation uses Tokio for asynchronous I/O operations and provides thread-safe
10//! access to stdin/stdout through Arc and Mutex.
11
12use async_trait::async_trait;
13use futures::Stream;
14use std::{pin::Pin, sync::Arc};
15use tokio::sync::{
16    mpsc::{Receiver, Sender},
17    Mutex,
18};
19
20use crate::{
21    error::Error,
22    transport::{Message, Transport},
23};
24
25pub struct StdioTransport {
26    read_connection: Arc<Mutex<Receiver<String>>>,
27    write_connection: Sender<String>,
28}
29
30impl StdioTransport {
31    /// Creates a new stdio transport instance using a child's stdin/stdout.
32    pub fn new(read: Receiver<String>, write: Sender<String>) -> Self {
33        let transport = Self {
34            read_connection: Arc::new(Mutex::new(read)),
35            write_connection: write,
36        };
37        transport
38    }
39}
40
41#[async_trait]
42impl Transport for StdioTransport {
43    /// Sends a message by writing it to the child process' stdin
44    async fn send(&self, message: Message) -> Result<(), Error> {
45        let json = serde_json::to_string(&message)?;
46        let _ =
47            self.write_connection.send(json).await.map_err(|_| {
48                Error::Transport("failed to send message to child process".to_string())
49            })?;
50        Ok(())
51    }
52
53    /// Creates a stream of messages received from the child process' stdout
54    ///
55    /// # Returns
56    ///
57    /// Returns a pinned box containing a stream that yields Result<Message, Error>.
58    /// The stream will continue until stdin is closed or an error occurs.
59    ///
60    /// # Message Flow
61    ///
62    /// 1. Messages are read from stdin in the background task created in `new()`
63    /// 2. Each message is sent through the broadcast channel
64    /// 3. This stream receives messages from the broadcast channel
65    fn receive(&self) -> Pin<Box<dyn Stream<Item = Result<Message, Error>> + Send>> {
66        let read_connection = self.read_connection.clone();
67        Box::pin(futures::stream::unfold(
68            read_connection,
69            async move |read_connection| {
70                let mut guard = read_connection.lock().await;
71                loop {
72                    match guard.recv().await {
73                        Some(s) => {
74                            let message: Message = serde_json::from_str(s.as_str()).unwrap();
75                            return Some((Ok(message), read_connection.clone()));
76                        }
77                        None => return None,
78                    }
79                }
80            },
81        ))
82    }
83
84    /// Closes the transport.
85    ///
86    /// For stdio transport, this is a no-op as we don't own stdin/stdout.
87    ///
88    /// # Returns
89    ///
90    /// Always returns `Ok(())`.
91    async fn close(&self) -> Result<(), Error> {
92        // Nothing to do for stdio transport
93        Ok(())
94    }
95}