use agent_client_protocol::mcp_server::McpServer;
use agent_client_protocol::schema::{
AgentCapabilities, InitializeRequest, InitializeResponse, NewSessionRequest,
NewSessionResponse, ProtocolVersion, SessionId,
};
use agent_client_protocol::{Agent, Client, Conductor, ConnectTo, DynConnectTo, Proxy};
use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
// Input payload for the `echo` MCP tool registered by `ProxyWithMcpAndHandler`.
//
// Plain `//` comments are used here on purpose: the `JsonSchema` derive copies
// `///` doc comments into the generated schema as descriptions, which would
// change what the tool advertises.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct EchoParams {
// Text that the tool echoes back, prefixed with "Echo: ".
message: String,
}
// Output payload returned by the `echo` MCP tool.
//
// Plain `//` comments are used here on purpose: the `JsonSchema` derive copies
// `///` doc comments into the generated schema as descriptions, which would
// change what the tool advertises.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct EchoOutput {
// The echoed text; the tool fills this with `"Echo: <message>"`.
result: String,
}
async fn recv<T: agent_client_protocol::JsonRpcResponse + Send>(
response: agent_client_protocol::SentRequest<T>,
) -> Result<T, agent_client_protocol::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.on_receiving_result(async move |result| {
tx.send(result)
.map_err(|_| agent_client_protocol::Error::internal_error())
})?;
rx.await
.map_err(|_| agent_client_protocol::Error::internal_error())?
}
/// Shared state used to observe whether the proxy's `NewSessionRequest`
/// handler ran during a test.
struct HandlerConfig {
    /// Flipped to `true` by the handler; read back via `was_handler_called`.
    new_session_handler_called: AtomicBool,
}

impl HandlerConfig {
    /// Creates a reference-counted config whose flag starts out `false`.
    fn new() -> Arc<Self> {
        let new_session_handler_called = AtomicBool::new(false);
        Arc::new(Self {
            new_session_handler_called,
        })
    }

    /// Returns the current value of the "handler was called" flag.
    fn was_handler_called(&self) -> bool {
        let Self {
            new_session_handler_called: flag,
        } = self;
        flag.load(Ordering::SeqCst)
    }
}
/// Test proxy that carries both an MCP server and a `NewSessionRequest`
/// handler (see its `ConnectTo<Conductor>` impl).
struct ProxyWithMcpAndHandler {
/// Shared flag the handler sets so the test can verify it was invoked.
config: Arc<HandlerConfig>,
}
/// Lets the proxy be placed in a conductor chain.
impl ConnectTo<Conductor> for ProxyWithMcpAndHandler {
    /// Builds a proxy that exposes an `echo` MCP tool and intercepts
    /// `NewSessionRequest`s coming from the client, then connects it to
    /// `client`.
    ///
    /// The handler records that it ran, forwards the request to the agent and
    /// relays the agent's response back to the original requester.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying proxy connection returns.
    async fn connect_to(
        self,
        client: impl ConnectTo<Proxy>,
    ) -> Result<(), agent_client_protocol::Error> {
        let handler_state = Arc::clone(&self.config);

        // MCP server with a single tool that echoes its input back.
        let echo_server = McpServer::builder(String::from("test-server"))
            .instructions("A test MCP server")
            .tool_fn_mut(
                "echo",
                "Echoes back the input",
                async |params: EchoParams, _cx| {
                    let result = format!("Echo: {}", params.message);
                    Ok(EchoOutput { result })
                },
                agent_client_protocol::tool_fn_mut!(),
            )
            .build();

        let proxy = agent_client_protocol::Proxy
            .builder()
            .name("proxy-with-mcp-and-handler")
            .with_mcp_server(echo_server)
            .on_receive_request_from(
                Client,
                async move |request: NewSessionRequest, responder, cx| {
                    // Mark the handler as invoked before doing anything else.
                    handler_state
                        .new_session_handler_called
                        .store(true, Ordering::SeqCst);

                    // Pass the request on and answer once the agent replies.
                    let forwarded = cx.send_request_to(Agent, request);
                    forwarded.on_receiving_result(async move |agent_reply| {
                        let response: NewSessionResponse = agent_reply?;
                        responder.respond(response)
                    })
                },
                agent_client_protocol::on_receive_request!(),
            );

        proxy.connect_to(client).await
    }
}
/// Minimal agent used by the test; its `ConnectTo<Client>` impl answers
/// `InitializeRequest` and `NewSessionRequest`.
struct SimpleAgent;
/// Lets `SimpleAgent` act as the agent end of a connection.
impl ConnectTo<Client> for SimpleAgent {
    /// Builds an agent named `simple-agent` with two request handlers and
    /// connects it to `client`.
    ///
    /// * `InitializeRequest` is answered with the requested protocol version
    ///   and freshly constructed agent capabilities.
    /// * `NewSessionRequest` is answered with a session whose ID is a newly
    ///   generated v4 UUID string.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying agent connection returns.
    async fn connect_to(
        self,
        client: impl ConnectTo<Agent>,
    ) -> Result<(), agent_client_protocol::Error> {
        let agent = Agent
            .builder()
            .name("simple-agent")
            .on_receive_request(
                async |request: InitializeRequest, responder, _cx| {
                    let reply = InitializeResponse::new(request.protocol_version)
                        .agent_capabilities(AgentCapabilities::new());
                    responder.respond(reply)
                },
                agent_client_protocol::on_receive_request!(),
            )
            .on_receive_request(
                async |_request: NewSessionRequest, responder, _cx| {
                    let session_id = SessionId::new(uuid::Uuid::new_v4().to_string());
                    responder.respond(NewSessionResponse::new(session_id))
                },
                agent_client_protocol::on_receive_request!(),
            );

        agent.connect_to(client).await
    }
}
/// Runs `editor_task` as an editor-side client against a conductor that
/// fronts the given `proxies` and `agent`.
///
/// The editor and the conductor talk over two in-memory duplex pipes (one per
/// direction, each created with a 1024-byte buffer). The conductor is
/// registered on the client builder through `with_spawned`, and the editor
/// task is driven by `connect_with`.
///
/// # Errors
///
/// Propagates the error returned by the client connection, which includes
/// errors returned by `editor_task`.
async fn run_test(
    proxies: Vec<DynConnectTo<Conductor>>,
    agent: DynConnectTo<Client>,
    editor_task: impl AsyncFnOnce(
        agent_client_protocol::ConnectionTo<Agent>,
    ) -> Result<(), agent_client_protocol::Error>,
) -> Result<(), agent_client_protocol::Error> {
    // editor -> conductor
    let (to_conductor, from_editor) = duplex(1024);
    // conductor -> editor
    let (to_editor, from_conductor) = duplex(1024);

    let editor_streams = agent_client_protocol::ByteStreams::new(
        to_conductor.compat_write(),
        from_conductor.compat(),
    );

    let editor = agent_client_protocol::Client
        .builder()
        .name("editor-to-conductor")
        .with_spawned(|_cx| async move {
            // The conductor owns the opposite ends of both pipes.
            ConductorImpl::new_agent(
                String::from("conductor"),
                ProxiesAndAgent::new(agent).proxies(proxies),
                McpBridgeMode::default(),
            )
            .run(agent_client_protocol::ByteStreams::new(
                to_editor.compat_write(),
                from_editor.compat(),
            ))
            .await
        });

    editor.connect_with(editor_streams, editor_task).await
}
#[tokio::test]
async fn test_new_session_handler_invoked_with_mcp_server()
-> Result<(), agent_client_protocol::Error> {
let handler_config = HandlerConfig::new();
let handler_config_clone = Arc::clone(&handler_config);
let proxy = DynConnectTo::<Conductor>::new(ProxyWithMcpAndHandler {
config: handler_config,
});
let agent = DynConnectTo::<Client>::new(SimpleAgent);
run_test(vec![proxy], agent, async |connection_to_editor| {
let _init_response = recv(
connection_to_editor.send_request(InitializeRequest::new(ProtocolVersion::LATEST)),
)
.await?;
let session_response =
recv(connection_to_editor.send_request(NewSessionRequest::new(PathBuf::from("/tmp"))))
.await?;
assert!(
!session_response.session_id.0.is_empty(),
"Should receive a valid session ID"
);
Ok::<(), agent_client_protocol::Error>(())
})
.await?;
assert!(
handler_config_clone.was_handler_called(),
"NewSessionRequest handler should be invoked even when MCP server is in the chain. \
This is a regression - the MCP server was incorrectly forwarding the request directly \
to the agent instead of letting it flow through the handler chain."
);
Ok(())
}