use agent_client_protocol::{
ConnectTo, Dispatch, DynConnectTo, role::mcp, schema::McpDisconnectNotification,
};
use futures::{SinkExt as _, StreamExt as _, channel::mpsc};
use tracing::info;
use crate::conductor::ConductorMessage;
#[derive(Debug)]
pub struct McpBridgeConnectionActor {
transport: DynConnectTo<mcp::Client>,
conductor_tx: mpsc::Sender<ConductorMessage>,
to_mcp_client_rx: mpsc::Receiver<Dispatch>,
}
impl McpBridgeConnectionActor {
pub fn new(
component: impl ConnectTo<mcp::Client>,
conductor_tx: mpsc::Sender<ConductorMessage>,
to_mcp_client_rx: mpsc::Receiver<Dispatch>,
) -> Self {
Self {
transport: DynConnectTo::new(component),
conductor_tx,
to_mcp_client_rx,
}
}
pub async fn run(self, connection_id: String) -> Result<(), agent_client_protocol::Error> {
info!(connection_id, "MCP bridge connected");
let McpBridgeConnectionActor {
transport,
mut conductor_tx,
to_mcp_client_rx,
} = self;
let result = mcp::Client
.builder()
.name(format!("mpc-client-to-conductor({connection_id})"))
.on_receive_dispatch(
{
let mut conductor_tx = conductor_tx.clone();
let connection_id = connection_id.clone();
async move |message: agent_client_protocol::Dispatch, _cx| {
conductor_tx
.send(ConductorMessage::McpClientToMcpServer {
connection_id: connection_id.clone(),
message,
})
.await
.map_err(|_| agent_client_protocol::Error::internal_error())
}
},
agent_client_protocol::on_receive_dispatch!(),
)
.connect_with(transport, async move |mcp_connection_to_client| {
let mut to_mcp_client_rx = to_mcp_client_rx;
while let Some(message) = to_mcp_client_rx.next().await {
mcp_connection_to_client.send_proxied_message(message)?;
}
Ok(())
})
.await;
conductor_tx
.send(ConductorMessage::McpConnectionDisconnected {
notification: McpDisconnectNotification {
connection_id,
meta: None,
},
})
.await
.map_err(|_| agent_client_protocol::Error::internal_error())?;
result
}
}