pub mod actor;
pub mod http;
pub mod stdio;
use std::collections::HashMap;
use std::path::PathBuf;
use agent_client_protocol::schema::{McpServer, McpServerHttp, McpServerStdio};
use agent_client_protocol::{ConnectionTo, Dispatch, Role};
use futures::{SinkExt, channel::mpsc};
use tokio::net::TcpListener;
use tracing::info;
pub use self::actor::McpBridgeConnectionActor;
use crate::conductor::ConductorMessage;
/// Registry of the MCP bridge listeners that have been set up so far.
///
/// `spawn_bridge` consults this map before binding a new socket, so asking for the
/// same ACP URL twice yields the server definition created the first time.
#[derive(Default, Debug)]
pub struct McpBridgeListeners {
/// Bridge entries keyed by the `acp:` URL they were created for.
listeners: HashMap<String, McpBridgeListener>,
}
/// Bookkeeping entry stored in `McpBridgeListeners` for one bridged ACP URL.
#[derive(Clone, Debug)]
pub(super) struct McpBridgeListener {
/// Replacement server definition produced for the URL; it is cloned and returned
/// again when the same URL is requested later.
pub server: McpServer,
}
/// Cloneable handle around the sending half of an `mpsc` channel carrying `Dispatch`
/// messages.
#[derive(Clone, Debug)]
pub struct McpBridgeConnection {
/// Sender used by `send`. NOTE(review): judging by the name, the receiving end
/// forwards to the MCP client — confirm against the connection actor.
to_mcp_client_tx: mpsc::Sender<Dispatch>,
}
impl McpBridgeConnection {
pub fn new(to_mcp_client_tx: mpsc::Sender<Dispatch>) -> Self {
Self { to_mcp_client_tx }
}
pub async fn send(&mut self, message: Dispatch) -> Result<(), agent_client_protocol::Error> {
self.to_mcp_client_tx
.send(message)
.await
.map_err(|_| agent_client_protocol::Error::internal_error())
}
}
impl McpBridgeListeners {
    /// Rewrites an MCP server entry that uses the `acp:` transport so that it points at a
    /// local bridge instead.
    ///
    /// Entries that are not `McpServer::Http`, or whose URL does not start with `acp:`, are
    /// left untouched and `Ok(())` is returned. For a matching entry a bridge is obtained
    /// through `spawn_bridge` and `*mcp_server` is overwritten with the replacement
    /// definition.
    ///
    /// # Errors
    ///
    /// Returns an internal error when an `acp:` entry carries HTTP headers, and propagates
    /// any failure reported while setting up the bridge.
    pub async fn transform_mcp_server(
        &mut self,
        connection: ConnectionTo<impl Role>,
        mcp_server: &mut McpServer,
        conductor_tx: &mpsc::Sender<ConductorMessage>,
        mcp_bridge_mode: &crate::McpBridgeMode,
    ) -> Result<(), agent_client_protocol::Error> {
        let McpServer::Http(http) = mcp_server else {
            return Ok(());
        };
        if !http.url.starts_with("acp:") {
            return Ok(());
        }
        // The replacement definition is built without any headers, so an entry that has
        // some is rejected (presumably to avoid dropping them silently).
        if !http.headers.is_empty() {
            return Err(agent_client_protocol::Error::internal_error());
        }

        let name = &http.name;
        let url = &http.url;
        info!(
            server_name = name,
            acp_url = url,
            "Detected MCP server with ACP transport, spawning TCP bridge"
        );

        let transformed = self
            .spawn_bridge(connection, name, url, conductor_tx, mcp_bridge_mode)
            .await?;
        *mcp_server = transformed;
        Ok(())
    }

    /// Returns the server definition that bridges `acp_url`, creating the bridge on first
    /// use.
    ///
    /// On a cache miss this binds a TCP listener on an ephemeral loopback port, builds the
    /// replacement definition for the selected bridge mode, and spawns a task on
    /// `connection` that hands the listener to `stdio::run_tcp_listener` or
    /// `http::run_http_listener`:
    ///
    /// * `Stdio`: a stdio server that runs the conductor command with `mcp <port>`
    ///   appended to its arguments.
    /// * `Http`: an HTTP server pointing at the bound port.
    ///
    /// The cache is keyed by `acp_url` alone, so on a hit the `server_name` passed in is
    /// ignored and the definition built for the first caller is returned.
    ///
    /// # Errors
    ///
    /// Fails if the listener cannot be bound, if the stdio conductor command is empty, or
    /// if the accept task cannot be spawned. Nothing is cached on failure.
    async fn spawn_bridge(
        &mut self,
        connection: ConnectionTo<impl Role>,
        server_name: &str,
        acp_url: &str,
        conductor_tx: &mpsc::Sender<ConductorMessage>,
        mcp_bridge_mode: &crate::McpBridgeMode,
    ) -> anyhow::Result<McpServer> {
        if let Some(listener) = self.listeners.get(acp_url) {
            return Ok(listener.server.clone());
        }

        // Port 0 lets the OS pick a free port; read back the one actually assigned.
        let tcp_listener = TcpListener::bind("127.0.0.1:0").await?;
        let tcp_port = tcp_listener.local_addr()?.port();
        info!(acp_url = acp_url, tcp_port, "Bound listener for MCP bridge");

        let new_server = match mcp_bridge_mode {
            crate::McpBridgeMode::Stdio { conductor_command } => {
                // An empty command would otherwise panic on indexing; report it instead.
                let Some((program, leading_args)) = conductor_command.split_first() else {
                    anyhow::bail!(
                        "conductor command is empty; cannot build stdio bridge for MCP server `{server_name}`"
                    );
                };
                let args: Vec<String> = leading_args
                    .iter()
                    .cloned()
                    .chain(["mcp".to_string(), tcp_port.to_string()])
                    .collect();
                McpServer::Stdio(
                    McpServerStdio::new(server_name.to_string(), PathBuf::from(program))
                        .args(args),
                )
            }
            // NOTE(review): the socket is bound to 127.0.0.1 while the URL says
            // `localhost`; a client that resolves `localhost` to ::1 only may not reach
            // it — confirm this is acceptable.
            crate::McpBridgeMode::Http => McpServer::Http(McpServerHttp::new(
                server_name.to_string(),
                format!("http://localhost:{tcp_port}"),
            )),
        };

        connection.spawn({
            let acp_url = acp_url.to_string();
            let conductor_tx = conductor_tx.clone();
            let mcp_bridge_mode = mcp_bridge_mode.clone();
            async move {
                info!(
                    acp_url = acp_url,
                    tcp_port, "now accepting bridge connections"
                );
                match mcp_bridge_mode {
                    crate::McpBridgeMode::Stdio { .. } => {
                        stdio::run_tcp_listener(tcp_listener, acp_url, conductor_tx).await
                    }
                    crate::McpBridgeMode::Http => {
                        http::run_http_listener(tcp_listener, acp_url, conductor_tx).await
                    }
                }
            }
        })?;

        // Register only once the accept task exists: if spawning failed, the listener has
        // been dropped and a cached entry would hand out a port nobody is serving.
        self.listeners.insert(
            acp_url.to_string(),
            McpBridgeListener {
                server: new_server.clone(),
            },
        );
        Ok(new_server)
    }
}