Skip to main content

agent_client_protocol_polyfill/mcp_over_acp/
mod.rs

1//! MCP-over-ACP polyfill proxy.
2//!
3//! This proxy bridges MCP-over-ACP transport for agents that don't support
4//! `mcpCapabilities.acp` natively. It sits in the proxy chain and:
5//!
6//! - Intercepts `NewSessionRequest` to transform `McpServer::Http` entries with `acp:` URLs
7//!   into localhost TCP bridges
8//! - Handles `_mcp/connect`, `_mcp/message`, `_mcp/disconnect` by routing through those bridges
9//!
10//! # Usage
11//!
12//! ```rust,ignore
13//! use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill;
14//!
15//! // Add to a conductor proxy chain
16//! let conductor = ConductorImpl::new_agent(
17//!     "conductor",
18//!     ProxiesAndAgent::new(my_agent).proxy(McpOverAcpPolyfill::http()),
19//!     McpBridgeMode::default(),
20//! );
21//! ```
22
23mod actor;
24pub(crate) mod http;
25pub(crate) mod stdio;
26
27use std::collections::HashMap;
28use std::path::PathBuf;
29
30use agent_client_protocol::schema::{
31    InitializeProxyRequest, McpConnectRequest, McpConnectResponse, McpDisconnectNotification,
32    McpOverAcpMessage, McpServer, McpServerHttp, McpServerStdio, NewSessionRequest,
33};
34use agent_client_protocol::{
35    Agent, Client, Conductor, ConnectTo, ConnectionTo, Dispatch, Proxy, Role,
36};
37use futures::{SinkExt, channel::mpsc};
38use tokio::net::TcpListener;
39use tracing::info;
40
41use self::actor::BridgeConnectionActor;
42
43/// Internal messages for the polyfill's bridge management.
44#[derive(Debug)]
45pub(crate) enum BridgeMessage {
46    /// A new TCP connection was accepted and needs an ACP connection ID.
47    ConnectionReceived {
48        acp_id: String,
49        actor: BridgeConnectionActor,
50        connection: BridgeConnection,
51    },
52
53    /// ACP connection ID received — spawn the actor and store the connection.
54    ConnectionEstablished {
55        response: McpConnectResponse,
56        actor: BridgeConnectionActor,
57        connection: BridgeConnection,
58    },
59
60    /// MCP message from a bridge client that needs to be forwarded over ACP.
61    ClientToServer {
62        connection_id: String,
63        message: Dispatch,
64    },
65
66    /// Bridge client disconnected.
67    Disconnected {
68        notification: McpDisconnectNotification,
69    },
70}
71
72/// Connection handle for sending messages to an MCP client via a bridge.
73#[derive(Clone, Debug)]
74#[allow(dead_code)]
75pub(crate) struct BridgeConnection {
76    to_mcp_client_tx: mpsc::Sender<Dispatch>,
77}
78
79impl BridgeConnection {
80    pub fn new(to_mcp_client_tx: mpsc::Sender<Dispatch>) -> Self {
81        Self { to_mcp_client_tx }
82    }
83
84    #[allow(dead_code)]
85    pub async fn send(&mut self, message: Dispatch) -> Result<(), agent_client_protocol::Error> {
86        self.to_mcp_client_tx
87            .send(message)
88            .await
89            .map_err(|_| agent_client_protocol::Error::internal_error())
90    }
91}
92
/// Selects which transport the polyfill advertises to the agent in place of
/// an `acp:` MCP server.
#[derive(Debug, Clone, Default)]
pub enum BridgeMode {
    /// Expose the bridge as a stdio MCP server backed by a spawned subprocess.
    Stdio {
        /// Program (first element) followed by its arguments, used to launch
        /// each bridge process.
        conductor_command: Vec<String>,
    },

    /// Expose the bridge as an HTTP MCP server. This is the default mode.
    #[default]
    Http,
}
106
107/// MCP-over-ACP polyfill proxy.
108///
109/// Bridges MCP-over-ACP transport for agents that don't support `mcpCapabilities.acp`.
110#[derive(Debug)]
111pub struct McpOverAcpPolyfill {
112    mode: BridgeMode,
113}
114
115impl McpOverAcpPolyfill {
116    /// Create a polyfill using HTTP bridge mode.
117    #[must_use]
118    pub fn http() -> Self {
119        Self {
120            mode: BridgeMode::Http,
121        }
122    }
123
124    /// Create a polyfill using stdio bridge mode.
125    #[must_use]
126    pub fn stdio(conductor_command: Vec<String>) -> Self {
127        Self {
128            mode: BridgeMode::Stdio { conductor_command },
129        }
130    }
131}
132
133impl ConnectTo<Conductor> for McpOverAcpPolyfill {
134    async fn connect_to(
135        self,
136        client: impl ConnectTo<Proxy>,
137    ) -> Result<(), agent_client_protocol::Error> {
138        let (bridge_tx, bridge_rx) = mpsc::channel(128);
139        let mode = self.mode;
140
141        Proxy
142            .builder()
143            .name("mcp-over-acp-polyfill")
144            .with_responder(BridgeResponder {
145                bridge_tx: bridge_tx.clone(),
146                bridge_rx,
147                bridge_connections: HashMap::new(),
148            })
149            .on_receive_request_from(
150                Client,
151                async move |request: InitializeProxyRequest,
152                            responder,
153                            cx: ConnectionTo<Conductor>| {
154                    // Forward initialize to successor, then set mcpCapabilities.acp = true
155                    // in the response to advertise that we handle MCP-over-ACP.
156                    cx.send_request_to(Agent, request.initialize)
157                        .on_receiving_result(async move |result| {
158                            responder.respond_with_result(result.map(|mut response| {
159                                response.agent_capabilities.mcp_capabilities.acp = true;
160                                response
161                            }))
162                        })
163                },
164                agent_client_protocol::on_receive_request!(),
165            )
166            .on_receive_request_from(
167                Client,
168                {
169                    let bridge_tx = bridge_tx.clone();
170                    async move |mut request: NewSessionRequest,
171                                responder,
172                                cx: ConnectionTo<Conductor>| {
173                        // Transform acp: URLs in MCP servers
174                        let mut listeners = BridgeListeners::default();
175                        for mcp_server in &mut request.mcp_servers {
176                            listeners
177                                .transform_mcp_server(cx.clone(), mcp_server, &bridge_tx, &mode)
178                                .await?;
179                        }
180                        // Forward modified request to successor
181                        cx.send_request_to(Agent, request)
182                            .forward_response_to(responder)
183                    }
184                },
185                agent_client_protocol::on_receive_request!(),
186            )
187            .connect_to(client)
188            .await
189    }
190}
191
192/// Manages active bridge listeners (TCP listeners for acp: URLs).
193#[derive(Default, Debug)]
194struct BridgeListeners {
195    listeners: HashMap<String, BridgeListener>,
196}
197
198#[derive(Clone, Debug)]
199struct BridgeListener {
200    server: McpServer,
201}
202
203impl BridgeListeners {
204    /// Transform an MCP server with `acp:` URL into a bridged localhost server.
205    async fn transform_mcp_server(
206        &mut self,
207        connection: ConnectionTo<impl Role>,
208        mcp_server: &mut McpServer,
209        bridge_tx: &mpsc::Sender<BridgeMessage>,
210        mode: &BridgeMode,
211    ) -> Result<(), agent_client_protocol::Error> {
212        let McpServer::Http(http) = mcp_server else {
213            return Ok(());
214        };
215
216        if !http.url.starts_with("acp:") {
217            return Ok(());
218        }
219
220        if !http.headers.is_empty() {
221            return Err(agent_client_protocol::Error::internal_error());
222        }
223
224        let name = http.name.clone();
225        let url = http.url.clone();
226
227        info!(
228            server_name = %name,
229            acp_id = %url,
230            "Detected MCP server with ACP transport, spawning TCP bridge"
231        );
232
233        let transformed = self
234            .spawn_bridge(connection, &name, &url, bridge_tx, mode)
235            .await?;
236        *mcp_server = transformed;
237        Ok(())
238    }
239
240    async fn spawn_bridge(
241        &mut self,
242        connection: ConnectionTo<impl Role>,
243        server_name: &str,
244        acp_id: &str,
245        bridge_tx: &mpsc::Sender<BridgeMessage>,
246        mode: &BridgeMode,
247    ) -> anyhow::Result<McpServer> {
248        if let Some(listener) = self.listeners.get(acp_id) {
249            return Ok(listener.server.clone());
250        }
251
252        let tcp_listener = TcpListener::bind("127.0.0.1:0").await?;
253        let tcp_port = tcp_listener.local_addr()?.port();
254
255        info!(acp_id = acp_id, tcp_port, "Bound listener for MCP bridge");
256
257        let new_server = match mode {
258            BridgeMode::Stdio { conductor_command } => McpServer::Stdio(
259                McpServerStdio::new(
260                    server_name.to_string(),
261                    PathBuf::from(&conductor_command[0]),
262                )
263                .args(
264                    conductor_command[1..]
265                        .iter()
266                        .cloned()
267                        .chain(vec!["mcp".to_string(), format!("{tcp_port}")])
268                        .collect::<Vec<_>>(),
269                ),
270            ),
271
272            BridgeMode::Http => McpServer::Http(McpServerHttp::new(
273                server_name.to_string(),
274                format!("http://localhost:{tcp_port}"),
275            )),
276        };
277
278        self.listeners.insert(
279            acp_id.to_string(),
280            BridgeListener {
281                server: new_server.clone(),
282            },
283        );
284
285        connection.spawn({
286            let acp_id = acp_id.to_string();
287            let bridge_tx = bridge_tx.clone();
288            let mode = mode.clone();
289            async move {
290                info!(
291                    acp_id = acp_id,
292                    tcp_port, "now accepting bridge connections"
293                );
294                match mode {
295                    BridgeMode::Stdio {
296                        conductor_command: _,
297                    } => stdio::run_tcp_listener(tcp_listener, acp_id, bridge_tx).await,
298                    BridgeMode::Http => {
299                        http::run_http_listener(tcp_listener, acp_id, bridge_tx).await
300                    }
301                }
302            }
303        })?;
304
305        Ok(new_server)
306    }
307}
308
309/// Responder that runs alongside the proxy, managing bridge state.
310struct BridgeResponder {
311    bridge_tx: mpsc::Sender<BridgeMessage>,
312    bridge_rx: mpsc::Receiver<BridgeMessage>,
313    bridge_connections: HashMap<String, BridgeConnection>,
314}
315
316impl std::fmt::Debug for BridgeResponder {
317    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318        f.debug_struct("BridgeResponder")
319            .field("bridge_connections", &self.bridge_connections.len())
320            .finish_non_exhaustive()
321    }
322}
323
324impl agent_client_protocol::RunWithConnectionTo<Conductor> for BridgeResponder {
325    async fn run_with_connection_to(
326        mut self,
327        connection: ConnectionTo<Conductor>,
328    ) -> Result<(), agent_client_protocol::Error> {
329        use futures::StreamExt;
330
331        while let Some(message) = self.bridge_rx.next().await {
332            match message {
333                BridgeMessage::ConnectionReceived {
334                    acp_id,
335                    actor,
336                    connection: bridge_conn,
337                } => {
338                    // Send _mcp/connect request back through the chain.
339                    // When the response arrives, send ConnectionEstablished back to ourselves.
340                    connection
341                        .send_request_to(Client, McpConnectRequest { acp_id, meta: None })
342                        .on_receiving_result({
343                            let mut bridge_tx = self.bridge_tx.clone();
344                            async move |result| match result {
345                                Ok(response) => bridge_tx
346                                    .send(BridgeMessage::ConnectionEstablished {
347                                        response,
348                                        actor,
349                                        connection: bridge_conn,
350                                    })
351                                    .await
352                                    .map_err(|_| agent_client_protocol::Error::internal_error()),
353                                Err(_) => Ok(()),
354                            }
355                        })?;
356                }
357
358                BridgeMessage::ConnectionEstablished {
359                    response: McpConnectResponse { connection_id, .. },
360                    actor,
361                    connection: bridge_conn,
362                } => {
363                    self.bridge_connections
364                        .insert(connection_id.clone(), bridge_conn);
365                    connection.spawn(actor.run(connection_id))?;
366                }
367
368                BridgeMessage::ClientToServer {
369                    connection_id,
370                    message,
371                } => {
372                    let wrapped = message.map(
373                        |request, responder| {
374                            (
375                                McpOverAcpMessage {
376                                    connection_id: connection_id.clone(),
377                                    message: request,
378                                    meta: None,
379                                },
380                                responder,
381                            )
382                        },
383                        |notification| McpOverAcpMessage {
384                            connection_id: connection_id.clone(),
385                            message: notification,
386                            meta: None,
387                        },
388                    );
389                    connection.send_proxied_message_to(Client, wrapped)?;
390                }
391
392                BridgeMessage::Disconnected { notification } => {
393                    self.bridge_connections.remove(&notification.connection_id);
394                    connection.send_notification_to(Client, notification)?;
395                }
396            }
397        }
398        Ok(())
399    }
400}