1use futures::{StreamExt, TryStreamExt};
2use mmcp_protocol::{mcp::JSONRPCMessage, port::RPCPort};
3use mmcp_rpc::RPCRuntime;
4use std::io::Write;
5use tokio::io::AsyncBufReadExt;
6use tokio_stream::wrappers::LinesStream;
7
8pub fn stdio_server_rpc() -> impl RPCPort {
9 let (tx, mut rx) = futures::channel::mpsc::channel::<JSONRPCMessage>(100);
10 let stream = LinesStream::new(tokio::io::BufReader::new(tokio::io::stdin()).lines())
11 .map_err(anyhow::Error::from)
12 .map_ok(|line| {
13 Box::pin(async move {
14 match serde_json::from_str::<JSONRPCMessage>(&line) {
15 Ok(message) => Some(message),
16 Err(e) => {
17 eprintln!("Error parsing JSON: {}: {}", e, line);
18 None
19 }
20 }
21 })
22 })
23 .try_filter_map(|message| Box::pin(async move { Ok(message.await) }));
24 let mut writer = std::io::BufWriter::new(std::io::stdout());
25
26 let rpc = RPCRuntime::new(tx, stream);
27
28 tokio::spawn(async move {
30 while let Some(message) = rx.next().await {
31 let json = serde_json::to_string(&message).unwrap();
32 writeln!(&mut writer, "{}", json).unwrap();
33 writer.flush().unwrap();
34 }
35 });
36
37 rpc
38}