mod mcp_integration;
use agent_client_protocol::Agent;
use agent_client_protocol::schema::{
ContentBlock, InitializeRequest, NewSessionRequest, PromptRequest, ProtocolVersion,
SessionNotification, TextContent,
};
use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent};
use agent_client_protocol_test::test_binaries;
use agent_client_protocol_test::testy::{Testy, TestyCommand};
use futures::{SinkExt, StreamExt, channel::mpsc};
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
async fn recv<T: agent_client_protocol::JsonRpcResponse + Send>(
response: agent_client_protocol::SentRequest<T>,
) -> Result<T, agent_client_protocol::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.on_receiving_result(async move |result| {
tx.send(result)
.map_err(|_| agent_client_protocol::Error::internal_error())
})?;
rx.await
.map_err(|_| agent_client_protocol::Error::internal_error())?
}
/// Builds the command line handed to `McpBridgeMode::Stdio`: a single-element
/// vector holding the path of the conductor test binary (converted lossily to
/// a `String`).
fn conductor_command() -> Vec<String> {
    let conductor_path = test_binaries::conductor_binary();
    let program = conductor_path.to_string_lossy().into_owned();
    vec![program]
}
/// Runs `editor_task` as an editor client connected to an in-process conductor.
///
/// Two in-memory duplex pipes (1024-byte buffers) are created, each used in a
/// single direction: one carries editor -> conductor traffic, the other
/// conductor -> editor traffic. The conductor is built from `components` with
/// the given MCP bridge `mode` and runs as a task spawned by the client
/// builder.
///
/// # Errors
///
/// Returns whatever error the client connection (and therefore `editor_task`)
/// yields.
async fn run_test_with_mode(
    mode: McpBridgeMode,
    components: ProxiesAndAgent,
    editor_task: impl AsyncFnOnce(
        agent_client_protocol::ConnectionTo<Agent>,
    ) -> Result<(), agent_client_protocol::Error>,
) -> Result<(), agent_client_protocol::Error> {
    // Tracing setup is best-effort; the outcome of `try_init` is deliberately
    // discarded (presumably because another test may already have installed a
    // subscriber).
    let tracing_setup = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .with_test_writer()
        .try_init();
    drop(tracing_setup);

    // Pipe 1: the editor writes, the conductor reads.
    let (editor_writer, conductor_reader) = duplex(1024);
    // Pipe 2: the conductor writes, the editor reads.
    let (conductor_writer, editor_reader) = duplex(1024);

    let transport = agent_client_protocol::ByteStreams::new(
        editor_writer.compat_write(),
        editor_reader.compat(),
    );

    agent_client_protocol::Client
        .builder()
        .name("editor-to-connector")
        .with_spawned(|_cx| async move {
            let conductor = ConductorImpl::new_agent("conductor".to_string(), components, mode);
            let conductor_transport = agent_client_protocol::ByteStreams::new(
                conductor_writer.compat_write(),
                conductor_reader.compat(),
            );
            conductor.run(conductor_transport).await
        })
        .connect_with(transport, editor_task)
        .await
}
#[tokio::test]
async fn test_proxy_provides_mcp_tools_stdio() -> Result<(), agent_client_protocol::Error> {
run_test_with_mode(
McpBridgeMode::Stdio {
conductor_command: conductor_command(),
},
ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent),
async |connection_to_editor| {
let init_response = recv(
connection_to_editor.send_request(InitializeRequest::new(ProtocolVersion::LATEST)),
)
.await;
assert!(
init_response.is_ok(),
"Initialize should succeed: {init_response:?}"
);
let session_response = recv(
connection_to_editor
.send_request(NewSessionRequest::new(std::path::PathBuf::from("/"))),
)
.await;
assert!(
session_response.is_ok(),
"Session/new should succeed: {session_response:?}"
);
let session = session_response.unwrap();
assert!(!session.session_id.0.is_empty());
Ok(())
},
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_proxy_provides_mcp_tools_http() -> Result<(), agent_client_protocol::Error> {
run_test_with_mode(
McpBridgeMode::Http,
ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent),
async |connection_to_editor| {
let init_response = recv(
connection_to_editor.send_request(InitializeRequest::new(ProtocolVersion::LATEST)),
)
.await;
assert!(
init_response.is_ok(),
"Initialize should succeed: {init_response:?}"
);
let session_response = recv(
connection_to_editor
.send_request(NewSessionRequest::new(std::path::PathBuf::from("/"))),
)
.await;
assert!(
session_response.is_ok(),
"Session/new should succeed: {session_response:?}"
);
let session = session_response.unwrap();
assert!(!session.session_id.0.is_empty());
Ok(())
},
)
.await?;
Ok(())
}
/// End-to-end prompt round trip through the conductor and proxy.
///
/// The editor initializes, opens a session, and sends a prompt instructing the
/// test agent to call the `echo` tool on the `test` server. Every session
/// notification the editor receives, and finally the prompt response, is
/// recorded as a `Debug` string on an unbounded channel. Once the connection
/// has finished, the test expects exactly two records: a notification holding
/// the successful echo result, followed by the prompt response.
#[tokio::test]
async fn test_agent_handles_prompt() -> Result<(), agent_client_protocol::Error> {
    // Tracing setup is best-effort; the outcome of `try_init` is deliberately
    // discarded (presumably because another test may already have installed a
    // subscriber).
    let tracing_setup = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .with_test_writer()
        .try_init();
    drop(tracing_setup);

    // Ordered record of everything the editor observes.
    let (mut events_tx, events_rx) = mpsc::unbounded();

    // Two in-memory pipes, each used in one direction only.
    let (client_write, conductor_read) = duplex(8192);
    let (conductor_write, client_read) = duplex(8192);

    let conductor_handle = tokio::spawn(async move {
        let conductor = ConductorImpl::new_agent(
            "mcp-integration-conductor".to_string(),
            ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent),
            McpBridgeMode::default(),
        );
        let conductor_transport = agent_client_protocol::ByteStreams::new(
            conductor_write.compat_write(),
            conductor_read.compat(),
        );
        conductor.run(conductor_transport).await
    });

    let result = agent_client_protocol::Client
        .builder()
        .name("editor-to-connector")
        .on_receive_notification(
            {
                // The handler owns its own sender; the original stays with the
                // editor task below.
                let mut notification_tx = events_tx.clone();
                async move |notification: SessionNotification,
                            _cx: agent_client_protocol::ConnectionTo<Agent>| {
                    notification_tx
                        .send(format!("{notification:?}"))
                        .await
                        .map_err(|_| agent_client_protocol::Error::internal_error())
                }
            },
            agent_client_protocol::on_receive_notification!(),
        )
        .connect_with(
            agent_client_protocol::ByteStreams::new(
                client_write.compat_write(),
                client_read.compat(),
            ),
            async |agent_connection| {
                let initialize = InitializeRequest::new(ProtocolVersion::LATEST);
                recv(agent_connection.send_request(initialize)).await?;

                let new_session = NewSessionRequest::new(std::path::PathBuf::from("/"));
                let session = recv(agent_connection.send_request(new_session)).await?;
                tracing::debug!(session_id = %session.session_id.0, "Session created");

                // Ask the test agent to invoke the `echo` tool on the `test` server.
                let echo_command = TestyCommand::CallTool {
                    server: "test".to_string(),
                    tool: "echo".to_string(),
                    params: serde_json::json!({"message": "Hello from the test!"}),
                };
                let prompt = PromptRequest::new(
                    session.session_id.clone(),
                    vec![ContentBlock::Text(TextContent::new(
                        echo_command.to_prompt(),
                    ))],
                );
                let prompt_response = recv(agent_connection.send_request(prompt)).await?;

                events_tx
                    .send(format!("{prompt_response:?}"))
                    .await
                    .map_err(|_| agent_client_protocol::Error::internal_error())?;
                Ok(())
            },
        )
        .await;

    // Stop the conductor before surfacing any client-side error.
    conductor_handle.abort();
    result?;

    // Dropping the last sender held here lets the receiver stream terminate,
    // so collecting it cannot wait forever.
    drop(events_tx);
    let recorded: Vec<String> = events_rx.collect().await;

    assert_eq!(recorded.len(), 2, "Expected notification + response");

    let notification_entry = &recorded[0];
    let response_entry = &recorded[1];

    assert!(
        notification_entry.contains("OK: CallToolResult"),
        "Expected successful tool call, got: {}",
        notification_entry
    );
    assert!(
        notification_entry.contains("Echo: Hello from the test!"),
        "Expected echo result, got: {}",
        notification_entry
    );
    assert!(
        response_entry.contains("PromptResponse"),
        "Expected prompt response, got: {}",
        response_entry
    );

    Ok(())
}