modelcontextprotocol_server/transport/
stdio.rs1use anyhow::Result;
3use async_trait::async_trait;
4use mcp_protocol::messages::JsonRpcMessage;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
6use tokio::sync::mpsc;
7use tracing::debug;
8
/// JSON-RPC transport that exchanges newline-delimited messages over the
/// process's standard input and standard output.
///
/// The type is a stateless unit struct, so cloning or default-constructing it
/// is free; every value behaves identically.
#[derive(Debug, Clone, Default)]
pub struct StdioTransport;
12
13impl StdioTransport {
14 pub fn new() -> Self {
16 Self
17 }
18}
19
20#[async_trait]
21impl super::Transport for StdioTransport {
22 async fn start(&self, message_tx: mpsc::Sender<JsonRpcMessage>) -> Result<()> {
23 let stdin = tokio::io::stdin();
24 let mut reader = BufReader::new(stdin);
25 let mut line = String::new();
26
27 tokio::spawn(async move {
28 while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
29 debug!("Received line: {}", line);
30 match serde_json::from_str::<JsonRpcMessage>(&line) {
31 Ok(message) => {
32 if message_tx.send(message).await.is_err() {
33 break;
34 }
35 }
36 Err(err) => {
37 tracing::error!("Failed to parse JSON-RPC message: {}", err);
38 }
39 }
40
41 line.clear();
42 }
43 });
44
45 Ok(())
46 }
47
48 async fn send(&self, message: JsonRpcMessage) -> Result<()> {
49 let mut stdout = tokio::io::stdout();
50 let serialized = serde_json::to_string(&message)?;
51
52 debug!("Sending message: {}", serialized);
53 stdout.write_all(serialized.as_bytes()).await?;
54 stdout.write_all(b"\n").await?;
55 stdout.flush().await?;
56
57 Ok(())
58 }
59
60 async fn close(&self) -> Result<()> {
61 Ok(())
63 }
64
65 fn box_clone(&self) -> Box<dyn super::Transport> {
66 Box::new(self.clone())
67 }
68}