use agent_client_protocol::{
ByteStreams, ConnectTo, RunWithConnectionTo, mcp_server::McpServer, role::mcp, util::run_until,
};
use rmcp::{ClientHandler, ServiceExt, model::ClientInfo};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct EchoInput {
message: String,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct AddInput {
a: i32,
b: i32,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct AddOutput {
result: i32,
}
fn create_test_server() -> McpServer<mcp::Client, impl RunWithConnectionTo<mcp::Client>> {
McpServer::builder("test-server")
.instructions("A test MCP server")
.tool_fn(
"echo",
"Echo a message back",
async |input: EchoInput, _cx| Ok(format!("Echo: {}", input.message)),
agent_client_protocol::tool_fn!(),
)
.tool_fn(
"add",
"Add two numbers",
async |input: AddInput, _cx| {
Ok(AddOutput {
result: input.a + input.b,
})
},
agent_client_protocol::tool_fn!(),
)
.build()
}
#[derive(Debug, Clone, Default)]
struct MinimalClientHandler;
impl ClientHandler for MinimalClientHandler {
fn get_info(&self) -> ClientInfo {
ClientInfo::default()
}
}
#[tokio::test]
async fn test_standalone_server_list_tools() -> Result<(), agent_client_protocol::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);
let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());
run_until(
ConnectTo::<mcp::Client>::connect_to(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(agent_client_protocol::util::internal_error)?;
let tools_result = client
.list_tools(None)
.await
.map_err(agent_client_protocol::util::internal_error)?;
assert_eq!(tools_result.tools.len(), 2);
let tool_names: Vec<&str> =
tools_result.tools.iter().map(|t| t.name.as_ref()).collect();
assert!(tool_names.contains(&"echo"));
assert!(tool_names.contains(&"add"));
client
.cancel()
.await
.map_err(agent_client_protocol::util::internal_error)?;
Ok(())
},
)
.await
}
#[tokio::test]
async fn test_standalone_server_call_echo_tool() -> Result<(), agent_client_protocol::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);
let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());
run_until(
ConnectTo::<mcp::Client>::connect_to(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(agent_client_protocol::util::internal_error)?;
let result = client
.call_tool(
rmcp::model::CallToolRequestParams::new("echo").with_arguments(
serde_json::json!({ "message": "hello world" })
.as_object()
.unwrap()
.clone(),
),
)
.await
.map_err(agent_client_protocol::util::internal_error)?;
let text = result
.content
.first()
.and_then(|c| c.raw.as_text())
.map(|t| t.text.as_str())
.expect("Expected text content");
assert_eq!(text, r#""Echo: hello world""#, "Unexpected echo response");
client
.cancel()
.await
.map_err(agent_client_protocol::util::internal_error)?;
Ok(())
},
)
.await
}
#[tokio::test]
async fn test_standalone_server_call_add_tool() -> Result<(), agent_client_protocol::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);
let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());
run_until(
ConnectTo::<mcp::Client>::connect_to(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(agent_client_protocol::util::internal_error)?;
let result = client
.call_tool(
rmcp::model::CallToolRequestParams::new("add").with_arguments(
serde_json::json!({ "a": 5, "b": 3 })
.as_object()
.unwrap()
.clone(),
),
)
.await
.map_err(agent_client_protocol::util::internal_error)?;
assert!(!result.is_error.unwrap_or(false));
let content = result.content.first().expect("Expected content");
let text = content.raw.as_text().expect("Expected text content");
assert!(
text.text.contains('8') || text.text.contains("result"),
"Expected result to contain 8, got: {}",
text.text
);
client
.cancel()
.await
.map_err(agent_client_protocol::util::internal_error)?;
Ok(())
},
)
.await
}
#[tokio::test]
async fn test_standalone_server_tool_not_found() -> Result<(), agent_client_protocol::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);
let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());
run_until(
ConnectTo::<mcp::Client>::connect_to(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(agent_client_protocol::util::internal_error)?;
let result = client
.call_tool(rmcp::model::CallToolRequestParams::new("nonexistent"))
.await;
assert!(result.is_err(), "Expected error for non-existent tool");
client
.cancel()
.await
.map_err(agent_client_protocol::util::internal_error)?;
Ok(())
},
)
.await
}
#[tokio::test]
async fn test_standalone_server_with_disabled_tools() -> Result<(), agent_client_protocol::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);
let server = McpServer::builder("test-server")
.tool_fn(
"echo",
"Echo a message",
async |input: EchoInput, _cx| Ok(format!("Echo: {}", input.message)),
agent_client_protocol::tool_fn!(),
)
.tool_fn(
"add",
"Add two numbers",
async |input: AddInput, _cx| {
Ok(AddOutput {
result: input.a + input.b,
})
},
agent_client_protocol::tool_fn!(),
)
.disable_tool("echo")?
.build();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());
run_until(
ConnectTo::<mcp::Client>::connect_to(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(agent_client_protocol::util::internal_error)?;
let tools_result = client
.list_tools(None)
.await
.map_err(agent_client_protocol::util::internal_error)?;
assert_eq!(tools_result.tools.len(), 1);
assert_eq!(tools_result.tools[0].name.as_ref(), "add");
let result = client
.call_tool(
rmcp::model::CallToolRequestParams::new("echo").with_arguments(
serde_json::json!({ "message": "test" })
.as_object()
.unwrap()
.clone(),
),
)
.await;
assert!(result.is_err(), "Expected error for disabled tool");
client
.cancel()
.await
.map_err(agent_client_protocol::util::internal_error)?;
Ok(())
},
)
.await
}