mmcp_server_stdio/
lib.rs

1use futures::{StreamExt, TryStreamExt};
2use mmcp_protocol::{mcp::JSONRPCMessage, port::RPCPort};
3use mmcp_rpc::RPCRuntime;
4use std::io::Write;
5use tokio::io::AsyncBufReadExt;
6use tokio_stream::wrappers::LinesStream;
7
8pub fn stdio_server_rpc() -> impl RPCPort {
9    let (tx, mut rx) = futures::channel::mpsc::channel::<JSONRPCMessage>(100);
10    let stream = LinesStream::new(tokio::io::BufReader::new(tokio::io::stdin()).lines())
11        .map_err(anyhow::Error::from)
12        .map_ok(|line| {
13            Box::pin(async move {
14                match serde_json::from_str::<JSONRPCMessage>(&line) {
15                    Ok(message) => Some(message),
16                    Err(e) => {
17                        eprintln!("Error parsing JSON: {}: {}", e, line);
18                        None
19                    }
20                }
21            })
22        })
23        .try_filter_map(|message| Box::pin(async move { Ok(message.await) }));
24    let mut writer = std::io::BufWriter::new(std::io::stdout());
25
26    let rpc = RPCRuntime::new(tx, stream);
27
28    // forward the stream to the channel and enforce tx close on stream close.
29    tokio::spawn(async move {
30        while let Some(message) = rx.next().await {
31            let json = serde_json::to_string(&message).unwrap();
32            writeln!(&mut writer, "{}", json).unwrap();
33            writer.flush().unwrap();
34        }
35    });
36
37    rpc
38}