agent_client_protocol_polyfill/mcp_over_acp/
mod.rs1mod actor;
24pub(crate) mod http;
25pub(crate) mod stdio;
26
27use std::collections::HashMap;
28use std::path::PathBuf;
29
30use agent_client_protocol::schema::{
31 InitializeProxyRequest, McpConnectRequest, McpConnectResponse, McpDisconnectNotification,
32 McpOverAcpMessage, McpServer, McpServerHttp, McpServerStdio, NewSessionRequest,
33};
34use agent_client_protocol::{
35 Agent, Client, Conductor, ConnectTo, ConnectionTo, Dispatch, Proxy, Role,
36};
37use futures::{SinkExt, channel::mpsc};
38use tokio::net::TcpListener;
39use tracing::info;
40
41use self::actor::BridgeConnectionActor;
42
43#[derive(Debug)]
45pub(crate) enum BridgeMessage {
46 ConnectionReceived {
48 acp_id: String,
49 actor: BridgeConnectionActor,
50 connection: BridgeConnection,
51 },
52
53 ConnectionEstablished {
55 response: McpConnectResponse,
56 actor: BridgeConnectionActor,
57 connection: BridgeConnection,
58 },
59
60 ClientToServer {
62 connection_id: String,
63 message: Dispatch,
64 },
65
66 Disconnected {
68 notification: McpDisconnectNotification,
69 },
70}
71
72#[derive(Clone, Debug)]
74#[allow(dead_code)]
75pub(crate) struct BridgeConnection {
76 to_mcp_client_tx: mpsc::Sender<Dispatch>,
77}
78
79impl BridgeConnection {
80 pub fn new(to_mcp_client_tx: mpsc::Sender<Dispatch>) -> Self {
81 Self { to_mcp_client_tx }
82 }
83
84 #[allow(dead_code)]
85 pub async fn send(&mut self, message: Dispatch) -> Result<(), agent_client_protocol::Error> {
86 self.to_mcp_client_tx
87 .send(message)
88 .await
89 .map_err(|_| agent_client_protocol::Error::internal_error())
90 }
91}
92
/// Selects how a bridged MCP server is exposed to the agent.
///
/// The default is [`BridgeMode::Http`].
#[derive(Debug, Clone, Default)]
pub enum BridgeMode {
    /// Expose the bridge as a stdio MCP server that launches a helper process.
    Stdio {
        /// Program (first element) and leading arguments used to launch the
        /// helper; `mcp <port>` is appended when the server entry is built.
        conductor_command: Vec<String>,
    },

    /// Expose the bridge as an HTTP MCP server on a local port.
    #[default]
    Http,
}
106
107#[derive(Debug)]
111pub struct McpOverAcpPolyfill {
112 mode: BridgeMode,
113}
114
115impl McpOverAcpPolyfill {
116 #[must_use]
118 pub fn http() -> Self {
119 Self {
120 mode: BridgeMode::Http,
121 }
122 }
123
124 #[must_use]
126 pub fn stdio(conductor_command: Vec<String>) -> Self {
127 Self {
128 mode: BridgeMode::Stdio { conductor_command },
129 }
130 }
131}
132
133impl ConnectTo<Conductor> for McpOverAcpPolyfill {
134 async fn connect_to(
135 self,
136 client: impl ConnectTo<Proxy>,
137 ) -> Result<(), agent_client_protocol::Error> {
138 let (bridge_tx, bridge_rx) = mpsc::channel(128);
139 let mode = self.mode;
140
141 Proxy
142 .builder()
143 .name("mcp-over-acp-polyfill")
144 .with_responder(BridgeResponder {
145 bridge_tx: bridge_tx.clone(),
146 bridge_rx,
147 bridge_connections: HashMap::new(),
148 })
149 .on_receive_request_from(
150 Client,
151 async move |request: InitializeProxyRequest,
152 responder,
153 cx: ConnectionTo<Conductor>| {
154 cx.send_request_to(Agent, request.initialize)
157 .on_receiving_result(async move |result| {
158 responder.respond_with_result(result.map(|mut response| {
159 response.agent_capabilities.mcp_capabilities.acp = true;
160 response
161 }))
162 })
163 },
164 agent_client_protocol::on_receive_request!(),
165 )
166 .on_receive_request_from(
167 Client,
168 {
169 let bridge_tx = bridge_tx.clone();
170 async move |mut request: NewSessionRequest,
171 responder,
172 cx: ConnectionTo<Conductor>| {
173 let mut listeners = BridgeListeners::default();
175 for mcp_server in &mut request.mcp_servers {
176 listeners
177 .transform_mcp_server(cx.clone(), mcp_server, &bridge_tx, &mode)
178 .await?;
179 }
180 cx.send_request_to(Agent, request)
182 .forward_response_to(responder)
183 }
184 },
185 agent_client_protocol::on_receive_request!(),
186 )
187 .connect_to(client)
188 .await
189 }
190}
191
192#[derive(Default, Debug)]
194struct BridgeListeners {
195 listeners: HashMap<String, BridgeListener>,
196}
197
198#[derive(Clone, Debug)]
199struct BridgeListener {
200 server: McpServer,
201}
202
203impl BridgeListeners {
204 async fn transform_mcp_server(
206 &mut self,
207 connection: ConnectionTo<impl Role>,
208 mcp_server: &mut McpServer,
209 bridge_tx: &mpsc::Sender<BridgeMessage>,
210 mode: &BridgeMode,
211 ) -> Result<(), agent_client_protocol::Error> {
212 let McpServer::Http(http) = mcp_server else {
213 return Ok(());
214 };
215
216 if !http.url.starts_with("acp:") {
217 return Ok(());
218 }
219
220 if !http.headers.is_empty() {
221 return Err(agent_client_protocol::Error::internal_error());
222 }
223
224 let name = http.name.clone();
225 let url = http.url.clone();
226
227 info!(
228 server_name = %name,
229 acp_id = %url,
230 "Detected MCP server with ACP transport, spawning TCP bridge"
231 );
232
233 let transformed = self
234 .spawn_bridge(connection, &name, &url, bridge_tx, mode)
235 .await?;
236 *mcp_server = transformed;
237 Ok(())
238 }
239
240 async fn spawn_bridge(
241 &mut self,
242 connection: ConnectionTo<impl Role>,
243 server_name: &str,
244 acp_id: &str,
245 bridge_tx: &mpsc::Sender<BridgeMessage>,
246 mode: &BridgeMode,
247 ) -> anyhow::Result<McpServer> {
248 if let Some(listener) = self.listeners.get(acp_id) {
249 return Ok(listener.server.clone());
250 }
251
252 let tcp_listener = TcpListener::bind("127.0.0.1:0").await?;
253 let tcp_port = tcp_listener.local_addr()?.port();
254
255 info!(acp_id = acp_id, tcp_port, "Bound listener for MCP bridge");
256
257 let new_server = match mode {
258 BridgeMode::Stdio { conductor_command } => McpServer::Stdio(
259 McpServerStdio::new(
260 server_name.to_string(),
261 PathBuf::from(&conductor_command[0]),
262 )
263 .args(
264 conductor_command[1..]
265 .iter()
266 .cloned()
267 .chain(vec!["mcp".to_string(), format!("{tcp_port}")])
268 .collect::<Vec<_>>(),
269 ),
270 ),
271
272 BridgeMode::Http => McpServer::Http(McpServerHttp::new(
273 server_name.to_string(),
274 format!("http://localhost:{tcp_port}"),
275 )),
276 };
277
278 self.listeners.insert(
279 acp_id.to_string(),
280 BridgeListener {
281 server: new_server.clone(),
282 },
283 );
284
285 connection.spawn({
286 let acp_id = acp_id.to_string();
287 let bridge_tx = bridge_tx.clone();
288 let mode = mode.clone();
289 async move {
290 info!(
291 acp_id = acp_id,
292 tcp_port, "now accepting bridge connections"
293 );
294 match mode {
295 BridgeMode::Stdio {
296 conductor_command: _,
297 } => stdio::run_tcp_listener(tcp_listener, acp_id, bridge_tx).await,
298 BridgeMode::Http => {
299 http::run_http_listener(tcp_listener, acp_id, bridge_tx).await
300 }
301 }
302 }
303 })?;
304
305 Ok(new_server)
306 }
307}
308
309struct BridgeResponder {
311 bridge_tx: mpsc::Sender<BridgeMessage>,
312 bridge_rx: mpsc::Receiver<BridgeMessage>,
313 bridge_connections: HashMap<String, BridgeConnection>,
314}
315
316impl std::fmt::Debug for BridgeResponder {
317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 f.debug_struct("BridgeResponder")
319 .field("bridge_connections", &self.bridge_connections.len())
320 .finish_non_exhaustive()
321 }
322}
323
324impl agent_client_protocol::RunWithConnectionTo<Conductor> for BridgeResponder {
325 async fn run_with_connection_to(
326 mut self,
327 connection: ConnectionTo<Conductor>,
328 ) -> Result<(), agent_client_protocol::Error> {
329 use futures::StreamExt;
330
331 while let Some(message) = self.bridge_rx.next().await {
332 match message {
333 BridgeMessage::ConnectionReceived {
334 acp_id,
335 actor,
336 connection: bridge_conn,
337 } => {
338 connection
341 .send_request_to(Client, McpConnectRequest { acp_id, meta: None })
342 .on_receiving_result({
343 let mut bridge_tx = self.bridge_tx.clone();
344 async move |result| match result {
345 Ok(response) => bridge_tx
346 .send(BridgeMessage::ConnectionEstablished {
347 response,
348 actor,
349 connection: bridge_conn,
350 })
351 .await
352 .map_err(|_| agent_client_protocol::Error::internal_error()),
353 Err(_) => Ok(()),
354 }
355 })?;
356 }
357
358 BridgeMessage::ConnectionEstablished {
359 response: McpConnectResponse { connection_id, .. },
360 actor,
361 connection: bridge_conn,
362 } => {
363 self.bridge_connections
364 .insert(connection_id.clone(), bridge_conn);
365 connection.spawn(actor.run(connection_id))?;
366 }
367
368 BridgeMessage::ClientToServer {
369 connection_id,
370 message,
371 } => {
372 let wrapped = message.map(
373 |request, responder| {
374 (
375 McpOverAcpMessage {
376 connection_id: connection_id.clone(),
377 message: request,
378 meta: None,
379 },
380 responder,
381 )
382 },
383 |notification| McpOverAcpMessage {
384 connection_id: connection_id.clone(),
385 message: notification,
386 meta: None,
387 },
388 );
389 connection.send_proxied_message_to(Client, wrapped)?;
390 }
391
392 BridgeMessage::Disconnected { notification } => {
393 self.bridge_connections.remove(¬ification.connection_id);
394 connection.send_notification_to(Client, notification)?;
395 }
396 }
397 }
398 Ok(())
399 }
400}