use serde_json::{Value, json};
use std::path::PathBuf;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, DuplexStream};
fn echo_mcp_server_path() -> PathBuf {
let mut path = std::env::current_exe().unwrap();
path.pop(); if path.ends_with("deps") {
path.pop();
}
path.push("echo-mcp-server");
path
}
fn test_config(timeout_secs: u64) -> atm_agent_mcp::proxy::ProxyServer {
use atm_agent_mcp::config::AgentMcpConfig;
let config = AgentMcpConfig {
codex_bin: echo_mcp_server_path().to_string_lossy().to_string(),
request_timeout_secs: timeout_secs,
auto_mail: false,
..Default::default()
};
let unique_team = format!("test-{}", uuid::Uuid::new_v4());
atm_agent_mcp::proxy::ProxyServer::new_with_team(config, unique_team)
}
async fn send_content_length(writer: &mut DuplexStream, msg: &Value) {
let json = serde_json::to_string(msg).unwrap();
let header = format!("Content-Length: {}\r\n\r\n", json.len());
writer.write_all(header.as_bytes()).await.unwrap();
writer.write_all(json.as_bytes()).await.unwrap();
writer.flush().await.unwrap();
}
async fn send_newline(writer: &mut DuplexStream, msg: &Value) {
let json = serde_json::to_string(msg).unwrap();
writer.write_all(json.as_bytes()).await.unwrap();
writer.write_all(b"\n").await.unwrap();
writer.flush().await.unwrap();
}
async fn read_response(reader: &mut BufReader<DuplexStream>) -> Option<Value> {
let mut header_line = String::new();
match tokio::time::timeout(Duration::from_secs(10), async {
loop {
header_line.clear();
let n = reader.read_line(&mut header_line).await.ok()?;
if n == 0 {
return None;
}
let trimmed = header_line.trim();
if trimmed.starts_with("Content-Length:") {
break;
}
}
Some(())
})
.await
{
Ok(Some(())) => {}
Ok(None) | Err(_) => return None,
}
let len: usize = header_line
.trim()
.strip_prefix("Content-Length:")
.unwrap()
.trim()
.parse()
.unwrap();
loop {
let mut line = String::new();
reader.read_line(&mut line).await.ok()?;
if line.trim().is_empty() {
break;
}
}
let mut body = vec![0u8; len];
tokio::io::AsyncReadExt::read_exact(reader, &mut body)
.await
.ok()?;
let s = String::from_utf8(body).ok()?;
serde_json::from_str(&s).ok()
}
async fn read_all_responses(
reader: &mut BufReader<DuplexStream>,
timeout_duration: Duration,
) -> Vec<Value> {
let mut results = Vec::new();
let deadline = tokio::time::Instant::now() + timeout_duration;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, read_response(reader)).await {
Ok(Some(v)) => results.push(v),
_ => break,
}
}
results
}
fn spawn_proxy(
timeout_secs: u64,
) -> (
DuplexStream,
BufReader<DuplexStream>,
tokio::task::JoinHandle<anyhow::Result<()>>,
) {
let (client_write, proxy_read) = tokio::io::duplex(16384);
let (proxy_write, client_read) = tokio::io::duplex(16384);
let handle = tokio::spawn(async move {
let mut proxy = test_config(timeout_secs);
proxy.run(proxy_read, proxy_write).await
});
(client_write, BufReader::new(client_read), handle)
}
#[tokio::test]
async fn test_initialize_passes_through() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let init_req = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "initialize",
"params": {
"protocolVersion": "2025-03-26",
"capabilities": {}
}
});
send_content_length(&mut writer, &init_req).await;
let resp = read_response(&mut reader).await.expect("initialize response");
assert_eq!(resp["id"], 2);
assert!(resp.get("result").is_some(), "initialize should succeed");
assert_eq!(
resp["result"]["serverInfo"]["name"],
"atm-agent-mcp",
"proxy must respond with its own serverInfo, not the child's"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_notifications_initialized_passes_through() {
let (mut writer, _reader, handle) = spawn_proxy(300);
let notif = json!({
"jsonrpc": "2.0",
"method": "notifications/initialized"
});
send_newline(&mut writer, ¬if).await;
tokio::time::sleep(Duration::from_millis(100)).await;
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_tools_list_adds_synthetic_tools() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "init"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let list_req = json!({
"jsonrpc": "2.0",
"id": 10,
"method": "tools/list"
});
send_newline(&mut writer, &list_req).await;
let resp = read_response(&mut reader).await.expect("tools/list response");
assert_eq!(resp["id"], 10);
let tools = resp["result"]["tools"].as_array().expect("tools array");
let names: Vec<&str> = tools
.iter()
.filter_map(|t| t.get("name").and_then(|n| n.as_str()))
.collect();
assert!(names.contains(&"atm_send"), "missing atm_send");
assert!(names.contains(&"atm_read"), "missing atm_read");
assert!(names.contains(&"atm_broadcast"), "missing atm_broadcast");
assert!(
names.contains(&"atm_pending_count"),
"missing atm_pending_count"
);
assert!(
names.contains(&"agent_sessions"),
"missing agent_sessions"
);
assert!(names.contains(&"agent_status"), "missing agent_status");
assert!(names.contains(&"agent_close"), "missing agent_close");
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_tools_list_preserves_codex_tools() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "init"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let list_req = json!({
"jsonrpc": "2.0",
"id": 20,
"method": "tools/list"
});
send_newline(&mut writer, &list_req).await;
let resp = read_response(&mut reader).await.expect("tools/list response");
let tools = resp["result"]["tools"].as_array().expect("tools array");
let names: Vec<&str> = tools
.iter()
.filter_map(|t| t.get("name").and_then(|n| n.as_str()))
.collect();
assert!(names.contains(&"codex"), "original codex tool missing");
assert!(
names.contains(&"codex-reply"),
"original codex-reply tool missing"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_multiple_synthetic_tools_count() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "init"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let list_req = json!({
"jsonrpc": "2.0",
"id": 30,
"method": "tools/list"
});
send_newline(&mut writer, &list_req).await;
let resp = read_response(&mut reader).await.expect("tools/list response");
let tools = resp["result"]["tools"].as_array().expect("tools array");
assert_eq!(tools.len(), 9, "expected 2 native + 7 synthetic tools");
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_unknown_method_passes_through() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "init"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let req = json!({
"jsonrpc": "2.0",
"id": 40,
"method": "custom/foobar",
"params": {}
});
send_newline(&mut writer, &req).await;
let resp = read_response(&mut reader).await.expect("should get error response");
assert_eq!(resp["id"], 40);
assert_eq!(resp["error"]["code"], -32601);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_lazy_spawn_on_first_codex_call() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let list_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list"
});
send_newline(&mut writer, &list_req).await;
let resp = read_response(&mut reader).await.expect("tools/list response");
assert_eq!(resp["id"], 1);
assert!(
resp.get("error").is_none(),
"tools/list before child spawn should return synthetic tools, not an error; got: {resp}"
);
assert!(
resp["result"]["tools"].is_array(),
"expected tools array in result"
);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "hello"}}
});
send_newline(&mut writer, &codex_req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
assert!(!responses.is_empty(), "should have received response(s)");
let main_resp = responses.iter().find(|r| r.get("id") == Some(&json!(2)));
assert!(
main_resp.is_some(),
"should have the codex response with id=2"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_child_crash_returns_error() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "init"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let crash_req = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": "crash", "arguments": {}}
});
send_newline(&mut writer, &crash_req).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let codex_req2 = json!({
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "after crash"}}
});
send_newline(&mut writer, &codex_req2).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let error_resp = responses.iter().find(|r| {
r.get("id") == Some(&json!(3))
&& r.pointer("/error/code").and_then(|v| v.as_i64()) == Some(-32005)
});
assert!(
error_resp.is_some(),
"expected -32005 CHILD_PROCESS_DEAD error, got: {responses:?}"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_child_crash_includes_exit_code() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "init"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let crash_req = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": "crash", "arguments": {}}
});
send_newline(&mut writer, &crash_req).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let req = json!({
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "after"}}
});
send_newline(&mut writer, &req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let error_resp = responses.iter().find(|r| r.get("id") == Some(&json!(3)));
assert!(error_resp.is_some(), "expected error response");
let err = error_resp.unwrap();
let exit_code = err.pointer("/error/data/exit_code").and_then(|v| v.as_i64());
assert_eq!(exit_code, Some(42), "expected exit code 42");
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_request_timeout_returns_error() {
let (mut writer, mut reader, handle) = spawn_proxy(1);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "slow", "slow": true}}
});
send_newline(&mut writer, &req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(10)).await;
let timeout_resp = responses.iter().find(|r| {
r.get("id") == Some(&json!(1))
&& r.pointer("/error/code").and_then(|v| v.as_i64()) == Some(-32006)
});
assert!(
timeout_resp.is_some(),
"expected -32006 timeout error, got: {responses:?}"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_timeout_includes_proxy_source() {
let (mut writer, mut reader, handle) = spawn_proxy(1);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "slow", "slow": true}}
});
send_newline(&mut writer, &req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(10)).await;
let timeout_resp = responses.iter().find(|r| {
r.pointer("/error/code").and_then(|v| v.as_i64()) == Some(-32006)
});
assert!(timeout_resp.is_some(), "expected timeout error");
assert_eq!(
timeout_resp.unwrap().pointer("/error/data/error_source"),
Some(&json!("proxy"))
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_codex_event_forwarded_to_upstream() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "hello"}}
});
send_newline(&mut writer, &req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let events: Vec<&Value> = responses
.iter()
.filter(|r| r.get("method") == Some(&json!("codex/event")))
.collect();
assert!(
events.len() >= 2,
"expected at least 2 codex/event notifications, got {}",
events.len()
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_codex_event_has_agent_id() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "hello"}}
});
send_newline(&mut writer, &req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let events: Vec<&Value> = responses
.iter()
.filter(|r| r.get("method") == Some(&json!("codex/event")))
.collect();
for event in &events {
let agent_id = event
.pointer("/params/agent_id")
.and_then(|v| v.as_str());
assert!(
agent_id.is_some(),
"event should have an agent_id field"
);
}
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_event_content_unchanged() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "hello"}}
});
send_newline(&mut writer, &req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let events: Vec<&Value> = responses
.iter()
.filter(|r| r.get("method") == Some(&json!("codex/event")))
.collect();
assert!(!events.is_empty(), "expected events");
let first_event = events[0];
let msg_type = first_event
.pointer("/params/msg/type")
.and_then(|v| v.as_str());
assert!(
msg_type.is_some(),
"event msg.type should be present"
);
assert_eq!(msg_type, Some("session_configured"));
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_proxy_shuts_down_on_stdin_eof() {
let (writer, _reader, handle) = spawn_proxy(300);
drop(writer);
let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
assert!(result.is_ok(), "proxy should exit on stdin EOF");
assert!(result.unwrap().is_ok(), "proxy should exit without panic");
}
#[tokio::test]
async fn test_tools_list_schema_valid() {
let tools = atm_agent_mcp::tools::synthetic_tools();
for tool in &tools {
let name = tool["name"].as_str().unwrap();
let schema = tool
.get("inputSchema")
.unwrap_or_else(|| panic!("{name} missing inputSchema"));
assert_eq!(
schema["type"].as_str(),
Some("object"),
"{name} inputSchema type should be 'object'"
);
assert!(
schema.get("properties").is_some(),
"{name} inputSchema should have properties"
);
}
}
#[tokio::test]
async fn test_codex_reply_passes_through() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let codex_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "codex", "arguments": {"prompt": "start session"}}
});
send_newline(&mut writer, &codex_req).await;
let _ = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let reply_req = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "codex-reply",
"arguments": {"prompt": "continue", "threadId": "test-thread-001"}
}
});
send_newline(&mut writer, &reply_req).await;
let responses = read_all_responses(&mut reader, Duration::from_secs(5)).await;
let main_resp = responses.iter().find(|r| r.get("id") == Some(&json!(2)));
assert!(main_resp.is_some(), "should get codex-reply response");
let resp = main_resp.unwrap();
let content = resp
.pointer("/result/structuredContent/threadId")
.and_then(|v| v.as_str());
assert_eq!(content, Some("test-thread-001"));
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_synthetic_tool_returns_not_implemented() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "atm_send",
"arguments": {"to": "agent1", "message": "hello"}
}
});
send_newline(&mut writer, &req).await;
let resp = read_response(&mut reader).await.expect("synthetic tool response");
assert_eq!(resp["id"], 1);
let code = resp
.pointer("/error/code")
.and_then(|v| v.as_i64())
.expect("error code must be present");
assert_eq!(
code,
atm_agent_mcp::proxy::ERR_IDENTITY_REQUIRED,
"atm_send without identity should return ERR_IDENTITY_REQUIRED (-32009)"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_content_length_upstream_framing() {
let (mut writer, mut reader, handle) = spawn_proxy(300);
let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "agent_status",
"arguments": {}
}
});
send_content_length(&mut writer, &req).await;
let resp = read_response(&mut reader).await.expect("response to CL-framed request");
assert_eq!(resp["id"], 1);
assert!(resp.get("error").is_none(), "should not be a protocol error; got: {resp}");
assert_ne!(
resp["result"]["isError"],
json!(true),
"agent_status should return a success result; got: {resp}"
);
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_dropped_events_counter_accessible() {
use atm_agent_mcp::config::AgentMcpConfig;
use std::sync::atomic::Ordering;
let config = AgentMcpConfig::default();
let proxy = atm_agent_mcp::proxy::ProxyServer::new(config);
assert_eq!(proxy.dropped_events.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn test_initialize_returns_capabilities() {
let (mut writer, mut reader, handle) = spawn_proxy(5);
send_content_length(
&mut writer,
&json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "test", "version": "0"}
}
}),
)
.await;
let response = read_response(&mut reader)
.await
.expect("should get initialize response");
assert!(
response.get("error").is_none(),
"should not have error: {response}"
);
let result = &response["result"];
assert_eq!(result["protocolVersion"], "2024-11-05");
assert_eq!(result["serverInfo"]["name"], "atm-agent-mcp");
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_tools_list_before_child_spawn_returns_synthetic_tools() {
let (mut writer, mut reader, handle) = spawn_proxy(5);
send_content_length(
&mut writer,
&json!({"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}),
)
.await;
let response = read_response(&mut reader)
.await
.expect("should get tools/list response");
assert!(
response.get("error").is_none(),
"should not have error: {response}"
);
let tools = response["result"]["tools"]
.as_array()
.expect("tools should be array");
assert_eq!(
tools.len(),
7,
"expected 7 synthetic tools, got {}",
tools.len()
);
let names: Vec<&str> = tools
.iter()
.filter_map(|t| t["name"].as_str())
.collect();
for expected in &[
"atm_send",
"atm_read",
"atm_broadcast",
"atm_pending_count",
"agent_sessions",
"agent_status",
"agent_close",
] {
assert!(
names.contains(expected),
"missing tool: {expected}, got: {names:?}"
);
}
drop(writer);
let _ = handle.await;
}
#[tokio::test]
async fn test_notifications_initialized_does_not_produce_response() {
let (mut writer, mut reader, handle) = spawn_proxy(5);
send_content_length(
&mut writer,
&json!({"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}),
)
.await;
let result = tokio::time::timeout(
Duration::from_millis(200),
read_response(&mut reader),
)
.await;
assert!(result.is_err(), "notification must not produce a response");
drop(writer);
let _ = handle.await;
}