use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use crate::dispatch::dispatch_request;
use crate::initialize::ServerInfo;
use crate::jsonrpc::{parse_envelope, RequestEnvelope, Response};
use crate::router::McpState;
use crate::tools::ToolDispatch;
/// Runs the line-oriented server loop over the process's standard streams.
///
/// Borrows the tool dispatcher and server info out of `state` and forwards
/// them to [`serve`], reading requests from stdin and writing replies to
/// stdout. `state` is owned by this call and stays alive until `serve`
/// returns.
///
/// # Errors
///
/// Returns whatever I/O error [`serve`] reports.
pub async fn serve_stdio(state: McpState) -> std::io::Result<()> {
    let tools = &state.dispatch;
    let info = &state.server_info;
    let input = tokio::io::stdin();
    let output = tokio::io::stdout();
    serve(tools, info, input, output).await
}
/// Serves newline-delimited JSON-RPC messages from `reader`, replying on `writer`.
///
/// The loop reads one line at a time until `reader` reaches end of input:
///
/// * Lines that are empty after trimming whitespace are skipped.
/// * A line that [`parse_envelope`] accepts as a single request is passed to
///   [`dispatch_request`]; if that yields a response it is serialized and
///   written, otherwise nothing is written for that line.
/// * A batch is dispatched one request at a time, in order. The collected
///   responses are written as one JSON array; if no request produced a
///   response, nothing is written.
/// * A line that fails to parse is answered with an object carrying
///   `"jsonrpc": "2.0"`, a `null` id, and the parse error under `"error"`.
///
/// Every reply is emitted as a single line terminated by `\n`, and the writer
/// is flushed after each reply.
///
/// # Errors
///
/// Returns the underlying error if reading a line from `reader`, or writing
/// to / flushing `writer`, fails. A response that cannot be serialized to
/// JSON is reported as an [`std::io::Error`] as well (via serde_json's
/// conversion) instead of panicking the server loop.
pub async fn serve<R, W>(
    dispatch: &ToolDispatch,
    server_info: &ServerInfo,
    reader: R,
    mut writer: W,
) -> std::io::Result<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    let mut lines = BufReader::new(reader).lines();
    while let Some(line) = lines.next_line().await? {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let out: Option<String> = match parse_envelope(trimmed.as_bytes()) {
            // `transpose` turns Option<Result<_>> into Result<Option<_>> so a
            // serialization failure can be propagated with `?`.
            Ok(RequestEnvelope::Single(req)) => dispatch_request(dispatch, server_info, req)
                .await
                .map(|r| serde_json::to_string(&r))
                .transpose()?,
            Ok(RequestEnvelope::Batch(reqs)) => {
                let mut responses: Vec<Response> = Vec::with_capacity(reqs.len());
                for req in reqs {
                    if let Some(r) = dispatch_request(dispatch, server_info, req).await {
                        responses.push(r);
                    }
                }
                if responses.is_empty() {
                    // No request in the batch produced a response: stay silent.
                    None
                } else {
                    Some(serde_json::to_string(&responses)?)
                }
            }
            Err(err) => Some(
                serde_json::json!({ "jsonrpc": "2.0", "id": serde_json::Value::Null, "error": err })
                    .to_string(),
            ),
        };
        if let Some(mut msg) = out {
            // Append the line terminator to the payload so the whole frame is
            // handed to the writer in one `write_all` call.
            msg.push('\n');
            writer.write_all(msg.as_bytes()).await?;
            writer.flush().await?;
        }
    }
    Ok(())
}
// Unit tests are kept in a separate `tests` module file and are compiled only
// for test builds.
#[cfg(test)]
mod tests;