mcp_core/transport/server/
stdio.rs

1use crate::protocol::{Protocol, RequestOptions};
2use crate::transport::{
3    JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Message, RequestId,
4    Transport,
5};
6use crate::types::ErrorCode;
7use anyhow::Result;
8use async_trait::async_trait;
9use std::future::Future;
10use std::io::{self, BufRead, Write};
11use std::pin::Pin;
12use tokio::time::timeout;
13use tracing::debug;
14
15/// Server transport that communicates with MCP clients over standard I/O.
16///
17/// The `ServerStdioTransport` uses standard input and output streams (stdin/stdout)
18/// to send and receive MCP messages. This transport is ideal for command-line
19/// applications, where the server needs to communicate with a client that launched
20/// it as a child process.
21///
22/// Use cases include:
23/// - CLI tools that implement MCP
24/// - Embedding MCP in existing command-line applications
25/// - Testing and development scenarios
26///
27/// # Example
28///
29/// ```
30/// use mcp_core::{protocol::Protocol, transport::{ServerStdioTransport, Transport}};
31///
32/// async fn example() {
33///     let protocol = Protocol::builder().build();
34///     let transport = ServerStdioTransport::new(protocol);
35///     // Start handling messages
36///     transport.open().await.expect("Failed to start stdio server");
37/// }
38/// ```
39#[derive(Clone)]
40pub struct ServerStdioTransport {
41    protocol: Protocol,
42}
43
44impl ServerStdioTransport {
45    /// Creates a new `ServerStdioTransport` instance.
46    ///
47    /// # Arguments
48    ///
49    /// * `protocol` - The MCP protocol instance to use for handling messages
50    ///
51    /// # Returns
52    ///
53    /// A new `ServerStdioTransport` instance
54    pub fn new(protocol: Protocol) -> Self {
55        Self { protocol }
56    }
57}
58
59#[async_trait()]
60impl Transport for ServerStdioTransport {
61    /// Opens the transport and starts processing messages.
62    ///
63    /// This method enters a loop that:
64    /// 1. Polls for incoming messages from stdin
65    /// 2. Processes each message according to its type (request, notification, response)
66    /// 3. Sends responses as needed
67    /// 4. Continues until EOF is received on stdin
68    ///
69    /// # Returns
70    ///
71    /// A `Result` indicating success or failure
72    async fn open(&self) -> Result<()> {
73        loop {
74            match self.poll_message().await {
75                Ok(Some(message)) => match message {
76                    Message::Request(request) => {
77                        let response = self.protocol.handle_request(request).await;
78                        self.send_response(response.id, response.result, response.error)
79                            .await?;
80                    }
81                    Message::Notification(notification) => {
82                        self.protocol.handle_notification(notification).await;
83                    }
84                    Message::Response(response) => {
85                        self.protocol.handle_response(response).await;
86                    }
87                },
88                Ok(None) => {
89                    break;
90                }
91                Err(e) => {
92                    tracing::error!("Error receiving message: {:?}", e);
93                }
94            }
95        }
96        Ok(())
97    }
98
99    /// Closes the transport.
100    ///
101    /// This is a no-op for the stdio transport as standard I/O streams are managed by the OS.
102    ///
103    /// # Returns
104    ///
105    /// A `Result` indicating success
106    async fn close(&self) -> Result<()> {
107        Ok(())
108    }
109
110    /// Polls for incoming messages from stdin.
111    ///
112    /// This method reads a line from stdin and parses it as a JSON-RPC message.
113    ///
114    /// # Returns
115    ///
116    /// A `Result` containing an `Option<Message>`. `None` indicates EOF.
117    async fn poll_message(&self) -> Result<Option<Message>> {
118        let stdin = io::stdin();
119        let mut reader = stdin.lock();
120        let mut line = String::new();
121        reader.read_line(&mut line)?;
122        if line.is_empty() {
123            return Ok(None);
124        }
125
126        debug!("Received: {line}");
127        let message: Message = serde_json::from_str(&line)?;
128        Ok(Some(message))
129    }
130
131    /// Sends a request to the client and waits for a response.
132    ///
133    /// This method:
134    /// 1. Creates a new request ID
135    /// 2. Constructs a JSON-RPC request
136    /// 3. Sends it to stdout
137    /// 4. Waits for a response with the same ID, with a timeout
138    ///
139    /// # Arguments
140    ///
141    /// * `method` - The method name for the request
142    /// * `params` - Optional parameters for the request
143    /// * `options` - Request options (like timeout)
144    ///
145    /// # Returns
146    ///
147    /// A `Future` that resolves to a `Result` containing the response
148    fn request(
149        &self,
150        method: &str,
151        params: Option<serde_json::Value>,
152        options: RequestOptions,
153    ) -> Pin<Box<dyn Future<Output = Result<JsonRpcResponse>> + Send + Sync>> {
154        let protocol = self.protocol.clone();
155        let method = method.to_owned();
156        Box::pin(async move {
157            let (id, rx) = protocol.create_request().await;
158            let request = JsonRpcRequest {
159                id,
160                method,
161                jsonrpc: Default::default(),
162                params,
163            };
164            let serialized = serde_json::to_string(&request).unwrap_or_default();
165            debug!("Sending: {serialized}");
166
167            // Use Tokio's async stdout to perform thread-safe, nonblocking writes.
168            let mut stdout = io::stdout();
169            stdout.write_all(serialized.as_bytes())?;
170            stdout.write_all(b"\n")?;
171            stdout.flush()?;
172
173            let result = timeout(options.timeout, rx).await;
174            match result {
175                // The request future completed before the timeout.
176                Ok(inner_result) => match inner_result {
177                    Ok(response) => Ok(response),
178                    Err(_) => {
179                        protocol.cancel_response(id).await;
180                        Ok(JsonRpcResponse {
181                            id,
182                            result: None,
183                            error: Some(JsonRpcError {
184                                code: ErrorCode::RequestTimeout as i32,
185                                message: "Request cancelled".to_string(),
186                                data: None,
187                            }),
188                            ..Default::default()
189                        })
190                    }
191                },
192                // The timeout expired.
193                Err(_) => {
194                    protocol.cancel_response(id).await;
195                    Ok(JsonRpcResponse {
196                        id,
197                        result: None,
198                        error: Some(JsonRpcError {
199                            code: ErrorCode::RequestTimeout as i32,
200                            message: "Request cancelled".to_string(),
201                            data: None,
202                        }),
203                        ..Default::default()
204                    })
205                }
206            }
207        })
208    }
209
210    /// Sends a notification to the client.
211    ///
212    /// This method constructs a JSON-RPC notification and writes it to stdout.
213    /// Unlike requests, notifications do not expect a response.
214    ///
215    /// # Arguments
216    ///
217    /// * `method` - The method name for the notification
218    /// * `params` - Optional parameters for the notification
219    ///
220    /// # Returns
221    ///
222    /// A `Result` indicating success or failure
223    async fn send_notification(
224        &self,
225        method: &str,
226        params: Option<serde_json::Value>,
227    ) -> Result<()> {
228        let notification = JsonRpcNotification {
229            jsonrpc: Default::default(),
230            method: method.to_owned(),
231            params,
232        };
233        let serialized = serde_json::to_string(&notification).unwrap_or_default();
234        let stdout = io::stdout();
235        let mut writer = stdout.lock();
236        debug!("Sending: {serialized}");
237        writer.write_all(serialized.as_bytes())?;
238        writer.write_all(b"\n")?;
239        writer.flush()?;
240        Ok(())
241    }
242
243    /// Sends a response to the client.
244    ///
245    /// This method constructs a JSON-RPC response and writes it to stdout.
246    ///
247    /// # Arguments
248    ///
249    /// * `id` - The ID of the request being responded to
250    /// * `result` - Optional successful result
251    /// * `error` - Optional error information
252    ///
253    /// # Returns
254    ///
255    /// A `Result` indicating success or failure
256    async fn send_response(
257        &self,
258        id: RequestId,
259        result: Option<serde_json::Value>,
260        error: Option<JsonRpcError>,
261    ) -> Result<()> {
262        let response = JsonRpcResponse {
263            id,
264            result,
265            error,
266            jsonrpc: Default::default(),
267        };
268        let serialized = serde_json::to_string(&response).unwrap_or_default();
269        let stdout = io::stdout();
270        let mut writer = stdout.lock();
271        debug!("Sending: {serialized}");
272        writer.write_all(serialized.as_bytes())?;
273        writer.write_all(b"\n")?;
274        writer.flush()?;
275        Ok(())
276    }
277}