use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tower_mcp::transport::stdio::BidirectionalStdioTransport;
use tower_mcp::{McpRouter, StdioTransport};
use tower_mcp_types::testing::assert_jsonrpc_error_response;
fn router() -> McpRouter {
McpRouter::new().server_info("stdio-loop-test", "0.0.0")
}
async fn read_n_frames<R>(mut reader: BufReader<R>, expected: usize) -> Vec<serde_json::Value>
where
R: tokio::io::AsyncRead + Unpin,
{
let mut out = Vec::with_capacity(expected);
while out.len() < expected {
let mut line = String::new();
let n = reader
.read_line(&mut line)
.await
.expect("read from server output");
if n == 0 {
break; }
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let v: serde_json::Value = serde_json::from_str(trimmed)
.unwrap_or_else(|e| panic!("invalid JSON on output: {e}: {trimmed}"));
out.push(v);
}
out
}
#[tokio::test]
async fn stdio_transport_parse_error_wire_shape() {
let mut transport = StdioTransport::new(router());
let (server_stdin_writer, server_stdin) = tokio::io::duplex(4096);
let (server_stdout, server_stdout_reader) = tokio::io::duplex(4096);
let handle = tokio::spawn(async move {
transport
.run_with_streams(server_stdin, server_stdout)
.await
});
let mut stdin_writer = server_stdin_writer;
stdin_writer
.write_all(b"not valid json{{{\n")
.await
.unwrap();
stdin_writer.flush().await.unwrap();
drop(stdin_writer);
let reader = BufReader::new(server_stdout_reader);
let frames = read_n_frames(reader, 1).await;
handle
.await
.expect("transport task join")
.expect("run_with_streams ok");
assert_eq!(
frames.len(),
1,
"expected one parse-error frame, got: {frames:?}"
);
let frame = &frames[0];
assert_jsonrpc_error_response(frame);
assert!(
frame["id"].is_null(),
"parse error id must be null, got: {frame}"
);
assert_eq!(frame["error"]["code"].as_i64().unwrap(), -32700);
}
#[tokio::test]
async fn stdio_transport_loop_continues_after_parse_error() {
let mut transport = StdioTransport::new(router());
let (server_stdin_writer, server_stdin) = tokio::io::duplex(4096);
let (server_stdout, server_stdout_reader) = tokio::io::duplex(4096);
let handle = tokio::spawn(async move {
transport
.run_with_streams(server_stdin, server_stdout)
.await
});
let mut stdin_writer = server_stdin_writer;
stdin_writer.write_all(b"this is not json\n").await.unwrap();
stdin_writer
.write_all(b"{\"jsonrpc\":\"2.0\",\"id\":42,\"method\":\"ping\"}\n")
.await
.unwrap();
stdin_writer.flush().await.unwrap();
drop(stdin_writer);
let reader = BufReader::new(server_stdout_reader);
let frames = read_n_frames(reader, 2).await;
handle
.await
.expect("transport task join")
.expect("run_with_streams ok");
assert_eq!(
frames.len(),
2,
"expected parse-error frame + ping response, got: {frames:?}"
);
assert_jsonrpc_error_response(&frames[0]);
assert!(frames[0]["id"].is_null());
assert_eq!(frames[0]["error"]["code"].as_i64().unwrap(), -32700);
assert_eq!(frames[1]["jsonrpc"], "2.0");
assert_eq!(frames[1]["id"], 42);
assert!(
frames[1].get("result").is_some(),
"ping must return a successful result frame, got: {}",
frames[1]
);
}
#[tokio::test]
async fn stdio_transport_eof_returns_ok() {
let mut transport = StdioTransport::new(router());
let (server_stdin_writer, server_stdin) = tokio::io::duplex(4096);
let (server_stdout, _server_stdout_reader) = tokio::io::duplex(4096);
let handle = tokio::spawn(async move {
transport
.run_with_streams(server_stdin, server_stdout)
.await
});
drop(server_stdin_writer);
let result = handle.await.expect("transport task join");
assert!(
result.is_ok(),
"run_with_streams must return Ok on EOF, got: {result:?}"
);
}
#[tokio::test]
async fn bidi_transport_parse_error_wire_shape() {
let mut transport = BidirectionalStdioTransport::new(router());
let (server_stdin_writer, server_stdin) = tokio::io::duplex(4096);
let (server_stdout, server_stdout_reader) = tokio::io::duplex(4096);
let handle = tokio::spawn(async move {
transport
.run_with_streams(server_stdin, server_stdout)
.await
});
let mut stdin_writer = server_stdin_writer;
stdin_writer
.write_all(b"not valid json{{{\n")
.await
.unwrap();
stdin_writer.flush().await.unwrap();
drop(stdin_writer);
let reader = BufReader::new(server_stdout_reader);
let frames = read_n_frames(reader, 1).await;
handle
.await
.expect("transport task join")
.expect("run_with_streams ok");
assert_eq!(
frames.len(),
1,
"expected one parse-error frame, got: {frames:?}"
);
assert_jsonrpc_error_response(&frames[0]);
assert!(frames[0]["id"].is_null());
assert_eq!(frames[0]["error"]["code"].as_i64().unwrap(), -32700);
}
#[tokio::test]
async fn bidi_transport_loop_continues_after_parse_error() {
let mut transport = BidirectionalStdioTransport::new(router());
let (server_stdin_writer, server_stdin) = tokio::io::duplex(4096);
let (server_stdout, server_stdout_reader) = tokio::io::duplex(4096);
let handle = tokio::spawn(async move {
transport
.run_with_streams(server_stdin, server_stdout)
.await
});
let mut stdin_writer = server_stdin_writer;
stdin_writer.write_all(b"this is not json\n").await.unwrap();
stdin_writer
.write_all(b"{\"jsonrpc\":\"2.0\",\"id\":7,\"method\":\"ping\"}\n")
.await
.unwrap();
stdin_writer.flush().await.unwrap();
drop(stdin_writer);
let reader = BufReader::new(server_stdout_reader);
let frames = read_n_frames(reader, 2).await;
handle
.await
.expect("transport task join")
.expect("run_with_streams ok");
assert_eq!(
frames.len(),
2,
"expected parse-error frame + ping response, got: {frames:?}"
);
assert_jsonrpc_error_response(&frames[0]);
assert!(frames[0]["id"].is_null());
assert_eq!(frames[0]["error"]["code"].as_i64().unwrap(), -32700);
assert_eq!(frames[1]["jsonrpc"], "2.0");
assert_eq!(frames[1]["id"], 7);
assert!(
frames[1].get("result").is_some(),
"ping must return a successful result frame, got: {}",
frames[1]
);
}