mcp_core/transport/server/stdio.rs
1use crate::protocol::{Protocol, RequestOptions};
2use crate::transport::{
3 JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Message, RequestId,
4 Transport,
5};
6use crate::types::ErrorCode;
7use anyhow::Result;
8use async_trait::async_trait;
9use std::future::Future;
10use std::io::{self, BufRead, Write};
11use std::pin::Pin;
12use tokio::time::timeout;
13use tracing::debug;
14
15/// Server transport that communicates with MCP clients over standard I/O.
16///
17/// The `ServerStdioTransport` uses standard input and output streams (stdin/stdout)
18/// to send and receive MCP messages. This transport is ideal for command-line
19/// applications, where the server needs to communicate with a client that launched
20/// it as a child process.
21///
22/// Use cases include:
23/// - CLI tools that implement MCP
24/// - Embedding MCP in existing command-line applications
25/// - Testing and development scenarios
26///
27/// # Example
28///
29/// ```
30/// use mcp_core::{protocol::Protocol, transport::{ServerStdioTransport, Transport}};
31///
32/// async fn example() {
33/// let protocol = Protocol::builder().build();
34/// let transport = ServerStdioTransport::new(protocol);
35/// // Start handling messages
36/// transport.open().await.expect("Failed to start stdio server");
37/// }
38/// ```
39#[derive(Clone)]
40pub struct ServerStdioTransport {
41 protocol: Protocol,
42}
43
44impl ServerStdioTransport {
45 /// Creates a new `ServerStdioTransport` instance.
46 ///
47 /// # Arguments
48 ///
49 /// * `protocol` - The MCP protocol instance to use for handling messages
50 ///
51 /// # Returns
52 ///
53 /// A new `ServerStdioTransport` instance
54 pub fn new(protocol: Protocol) -> Self {
55 Self { protocol }
56 }
57}
58
59#[async_trait()]
60impl Transport for ServerStdioTransport {
61 /// Opens the transport and starts processing messages.
62 ///
63 /// This method enters a loop that:
64 /// 1. Polls for incoming messages from stdin
65 /// 2. Processes each message according to its type (request, notification, response)
66 /// 3. Sends responses as needed
67 /// 4. Continues until EOF is received on stdin
68 ///
69 /// # Returns
70 ///
71 /// A `Result` indicating success or failure
72 async fn open(&self) -> Result<()> {
73 loop {
74 match self.poll_message().await {
75 Ok(Some(message)) => match message {
76 Message::Request(request) => {
77 let response = self.protocol.handle_request(request).await;
78 self.send_response(response.id, response.result, response.error)
79 .await?;
80 }
81 Message::Notification(notification) => {
82 self.protocol.handle_notification(notification).await;
83 }
84 Message::Response(response) => {
85 self.protocol.handle_response(response).await;
86 }
87 },
88 Ok(None) => {
89 break;
90 }
91 Err(e) => {
92 tracing::error!("Error receiving message: {:?}", e);
93 }
94 }
95 }
96 Ok(())
97 }
98
99 /// Closes the transport.
100 ///
101 /// This is a no-op for the stdio transport as standard I/O streams are managed by the OS.
102 ///
103 /// # Returns
104 ///
105 /// A `Result` indicating success
106 async fn close(&self) -> Result<()> {
107 Ok(())
108 }
109
110 /// Polls for incoming messages from stdin.
111 ///
112 /// This method reads a line from stdin and parses it as a JSON-RPC message.
113 ///
114 /// # Returns
115 ///
116 /// A `Result` containing an `Option<Message>`. `None` indicates EOF.
117 async fn poll_message(&self) -> Result<Option<Message>> {
118 let stdin = io::stdin();
119 let mut reader = stdin.lock();
120 let mut line = String::new();
121 reader.read_line(&mut line)?;
122 if line.is_empty() {
123 return Ok(None);
124 }
125
126 debug!("Received: {line}");
127 let message: Message = serde_json::from_str(&line)?;
128 Ok(Some(message))
129 }
130
131 /// Sends a request to the client and waits for a response.
132 ///
133 /// This method:
134 /// 1. Creates a new request ID
135 /// 2. Constructs a JSON-RPC request
136 /// 3. Sends it to stdout
137 /// 4. Waits for a response with the same ID, with a timeout
138 ///
139 /// # Arguments
140 ///
141 /// * `method` - The method name for the request
142 /// * `params` - Optional parameters for the request
143 /// * `options` - Request options (like timeout)
144 ///
145 /// # Returns
146 ///
147 /// A `Future` that resolves to a `Result` containing the response
148 fn request(
149 &self,
150 method: &str,
151 params: Option<serde_json::Value>,
152 options: RequestOptions,
153 ) -> Pin<Box<dyn Future<Output = Result<JsonRpcResponse>> + Send + Sync>> {
154 let protocol = self.protocol.clone();
155 let method = method.to_owned();
156 Box::pin(async move {
157 let (id, rx) = protocol.create_request().await;
158 let request = JsonRpcRequest {
159 id,
160 method,
161 jsonrpc: Default::default(),
162 params,
163 };
164 let serialized = serde_json::to_string(&request).unwrap_or_default();
165 debug!("Sending: {serialized}");
166
167 // Use Tokio's async stdout to perform thread-safe, nonblocking writes.
168 let mut stdout = io::stdout();
169 stdout.write_all(serialized.as_bytes())?;
170 stdout.write_all(b"\n")?;
171 stdout.flush()?;
172
173 let result = timeout(options.timeout, rx).await;
174 match result {
175 // The request future completed before the timeout.
176 Ok(inner_result) => match inner_result {
177 Ok(response) => Ok(response),
178 Err(_) => {
179 protocol.cancel_response(id).await;
180 Ok(JsonRpcResponse {
181 id,
182 result: None,
183 error: Some(JsonRpcError {
184 code: ErrorCode::RequestTimeout as i32,
185 message: "Request cancelled".to_string(),
186 data: None,
187 }),
188 ..Default::default()
189 })
190 }
191 },
192 // The timeout expired.
193 Err(_) => {
194 protocol.cancel_response(id).await;
195 Ok(JsonRpcResponse {
196 id,
197 result: None,
198 error: Some(JsonRpcError {
199 code: ErrorCode::RequestTimeout as i32,
200 message: "Request cancelled".to_string(),
201 data: None,
202 }),
203 ..Default::default()
204 })
205 }
206 }
207 })
208 }
209
210 /// Sends a notification to the client.
211 ///
212 /// This method constructs a JSON-RPC notification and writes it to stdout.
213 /// Unlike requests, notifications do not expect a response.
214 ///
215 /// # Arguments
216 ///
217 /// * `method` - The method name for the notification
218 /// * `params` - Optional parameters for the notification
219 ///
220 /// # Returns
221 ///
222 /// A `Result` indicating success or failure
223 async fn send_notification(
224 &self,
225 method: &str,
226 params: Option<serde_json::Value>,
227 ) -> Result<()> {
228 let notification = JsonRpcNotification {
229 jsonrpc: Default::default(),
230 method: method.to_owned(),
231 params,
232 };
233 let serialized = serde_json::to_string(¬ification).unwrap_or_default();
234 let stdout = io::stdout();
235 let mut writer = stdout.lock();
236 debug!("Sending: {serialized}");
237 writer.write_all(serialized.as_bytes())?;
238 writer.write_all(b"\n")?;
239 writer.flush()?;
240 Ok(())
241 }
242
243 /// Sends a response to the client.
244 ///
245 /// This method constructs a JSON-RPC response and writes it to stdout.
246 ///
247 /// # Arguments
248 ///
249 /// * `id` - The ID of the request being responded to
250 /// * `result` - Optional successful result
251 /// * `error` - Optional error information
252 ///
253 /// # Returns
254 ///
255 /// A `Result` indicating success or failure
256 async fn send_response(
257 &self,
258 id: RequestId,
259 result: Option<serde_json::Value>,
260 error: Option<JsonRpcError>,
261 ) -> Result<()> {
262 let response = JsonRpcResponse {
263 id,
264 result,
265 error,
266 jsonrpc: Default::default(),
267 };
268 let serialized = serde_json::to_string(&response).unwrap_or_default();
269 let stdout = io::stdout();
270 let mut writer = stdout.lock();
271 debug!("Sending: {serialized}");
272 writer.write_all(serialized.as_bytes())?;
273 writer.write_all(b"\n")?;
274 writer.flush()?;
275 Ok(())
276 }
277}