mcp_sdk_rs/transport/stdio.rs
1//! Standard I/O Transport Implementation
2//!
3//! This module provides a transport implementation that uses standard input/output (stdio)
4//! for communication. This is particularly useful for:
5//! - Command-line tools that need to communicate with an MCP server
6//! - Local development and testing
7//! - Situations where network transport is not desired or available
8//!
9//! The implementation uses Tokio for asynchronous I/O operations and provides thread-safe
10//! access to stdin/stdout through Arc and Mutex.
11
12use async_trait::async_trait;
13use futures::Stream;
14use std::{pin::Pin, sync::Arc};
15use tokio::sync::{
16 mpsc::{Receiver, Sender},
17 Mutex,
18};
19
20use crate::{
21 error::Error,
22 transport::{Message, Transport},
23};
24
25pub struct StdioTransport {
26 read_connection: Arc<Mutex<Receiver<String>>>,
27 write_connection: Sender<String>,
28}
29
30impl StdioTransport {
31 /// Creates a new stdio transport instance using a child's stdin/stdout.
32 pub fn new(read: Receiver<String>, write: Sender<String>) -> Self {
33 let transport = Self {
34 read_connection: Arc::new(Mutex::new(read)),
35 write_connection: write,
36 };
37 transport
38 }
39}
40
41#[async_trait]
42impl Transport for StdioTransport {
43 /// Sends a message by writing it to the child process' stdin
44 async fn send(&self, message: Message) -> Result<(), Error> {
45 let json = serde_json::to_string(&message)?;
46 let _ =
47 self.write_connection.send(json).await.map_err(|_| {
48 Error::Transport("failed to send message to child process".to_string())
49 })?;
50 Ok(())
51 }
52
53 /// Creates a stream of messages received from the child process' stdout
54 ///
55 /// # Returns
56 ///
57 /// Returns a pinned box containing a stream that yields Result<Message, Error>.
58 /// The stream will continue until stdin is closed or an error occurs.
59 ///
60 /// # Message Flow
61 ///
62 /// 1. Messages are read from stdin in the background task created in `new()`
63 /// 2. Each message is sent through the broadcast channel
64 /// 3. This stream receives messages from the broadcast channel
65 fn receive(&self) -> Pin<Box<dyn Stream<Item = Result<Message, Error>> + Send>> {
66 let read_connection = self.read_connection.clone();
67 Box::pin(futures::stream::unfold(
68 read_connection,
69 async move |read_connection| {
70 let mut guard = read_connection.lock().await;
71 loop {
72 match guard.recv().await {
73 Some(s) => {
74 let message: Message = serde_json::from_str(s.as_str()).unwrap();
75 return Some((Ok(message), read_connection.clone()));
76 }
77 None => return None,
78 }
79 }
80 },
81 ))
82 }
83
84 /// Closes the transport.
85 ///
86 /// For stdio transport, this is a no-op as we don't own stdin/stdout.
87 ///
88 /// # Returns
89 ///
90 /// Always returns `Ok(())`.
91 async fn close(&self) -> Result<(), Error> {
92 // Nothing to do for stdio transport
93 Ok(())
94 }
95}