Skip to main content

agent_client_protocol_conductor/
conductor.rs

1//! # Conductor: ACP Proxy Chain Orchestrator
2//!
3//! This module implements the Conductor conductor, which orchestrates a chain of
4//! proxy components that sit between an editor and an agent, transforming the
5//! Agent-Client Protocol (ACP) stream bidirectionally.
6//!
7//! ## Architecture Overview
8//!
9//! The conductor builds and manages a chain of components:
10//!
11//! ```text
12//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
13//! ```
14//!
15//! Each component receives ACP messages, can transform them, and forwards them
16//! to the next component in the chain. The conductor:
17//!
18//! 1. Spawns each component as a subprocess
19//! 2. Establishes bidirectional JSON-RPC connections with each component
20//! 3. Routes messages between editor, components, and agent
21//! 4. Distinguishes proxy vs agent components via distinct request types
22//!
23//! ## Recursive Chain Building
24//!
25//! The chain is built recursively through the `_proxy/successor/*` protocol:
26//!
27//! 1. Editor connects to Component 0 via the conductor
28//! 2. When Component 0 wants to communicate with its successor, it sends
29//!    requests/notifications with method prefix `_proxy/successor/`
30//! 3. The conductor intercepts these messages, strips the prefix, and forwards
31//!    to Component 1
32//! 4. Component 1 does the same for Component 2, and so on
33//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
34//!
35//! This allows each component to be written as if it's talking to a single successor,
36//! without knowing about the full chain.
37//!
38//! ## Proxy vs Agent Initialization
39//!
40//! Components discover whether they're a proxy or agent via the initialization request they receive:
41//!
42//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
43//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
44//!
45//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
46//! and `InitializeRequest` only to the final agent component. This allows proxies to
47//! know they should forward messages to a successor, while agents know they are the
48//! terminal component
49//!
50//! ## Message Routing
51//!
52//! The conductor runs an event loop processing messages from:
53//!
54//! - **Editor to first component**: Standard ACP messages
55//! - **Component to successor**: Via `_proxy/successor/*` prefix
56//! - **Component responses**: Via futures channels back to requesters
57//!
58//! The message flow ensures bidirectional communication while maintaining the
59//! abstraction that each component only knows about its immediate successor.
60//!
61//! ## Lazy Component Initialization
62//!
63//! Components are instantiated lazily when the first `initialize` request is received
64//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
65//!
66//! ### Simple Usage
67//!
68//! Pass a Vec of components that implement `Component`:
69//!
70//! ```ignore
71//! let conductor = Conductor::new(
72//!     "my-conductor",
73//!     vec![proxy1, proxy2, agent],
74//!     None,
75//! );
76//! ```
77//!
78//! All components are spawned in order when the editor sends the first `initialize` request.
79//!
80//! ### Dynamic Component Selection
81//!
82//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
83//!
84//! ```ignore
85//! let conductor = Conductor::new(
86//!     "my-conductor",
87//!     |cx, conductor_tx, init_req| async move {
88//!         // Examine capabilities
89//!         let needs_auth = has_auth_capability(&init_req);
90//!
91//!         let mut components = Vec::new();
92//!         if needs_auth {
93//!             components.push(spawn_auth_proxy(&cx, &conductor_tx)?);
94//!         }
95//!         components.push(spawn_agent(&cx, &conductor_tx)?);
96//!
97//!         // Return (potentially modified) request and component list
98//!         Ok((init_req, components))
99//!     },
100//!     None,
101//! );
102//! ```
103//!
104//! The closure receives:
105//! - `cx: &ConnectionTo` - Connection context for spawning components
106//! - `conductor_tx: &mpsc::Sender<ConductorMessage>` - Channel for message routing
107//! - `init_req: InitializeRequest` - The Initialize request from the editor
108//!
109//! And returns:
110//! - Modified `InitializeRequest` to forward downstream
111//! - `Vec<ConnectionTo>` of spawned components
112
113use std::{collections::HashMap, sync::Arc};
114
115use agent_client_protocol::{
116    Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
117    Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
118};
119use agent_client_protocol::{
120    Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest, UntypedMessage,
121};
122use agent_client_protocol::{
123    HandleDispatchFrom,
124    schema::{InitializeProxyRequest, InitializeRequest, NewSessionRequest},
125    util::MatchDispatchFrom,
126};
127use agent_client_protocol::{
128    Handled,
129    schema::{
130        McpConnectRequest, McpConnectResponse, McpDisconnectNotification, McpOverAcpMessage,
131        SuccessorMessage,
132    },
133};
134use futures::{
135    SinkExt, StreamExt,
136    channel::mpsc::{self},
137};
138use tracing::{debug, info};
139
140use crate::conductor::mcp_bridge::{
141    McpBridgeConnection, McpBridgeConnectionActor, McpBridgeListeners,
142};
143
144mod mcp_bridge;
145
146/// The conductor manages the proxy chain lifecycle and message routing.
147///
148/// It maintains connections to all components in the chain and routes messages
149/// bidirectionally between the editor, components, and agent.
150///
151#[derive(Debug)]
152pub struct ConductorImpl<Host: ConductorHostRole> {
153    host: Host,
154    name: String,
155    instantiator: Host::Instantiator,
156    mcp_bridge_mode: crate::McpBridgeMode,
157    trace_writer: Option<crate::trace::TraceWriter>,
158}
159
160impl<Host: ConductorHostRole> ConductorImpl<Host> {
161    pub fn new(
162        host: Host,
163        name: impl ToString,
164        instantiator: Host::Instantiator,
165        mcp_bridge_mode: crate::McpBridgeMode,
166    ) -> Self {
167        ConductorImpl {
168            name: name.to_string(),
169            host,
170            instantiator,
171            mcp_bridge_mode,
172            trace_writer: None,
173        }
174    }
175}
176
177impl ConductorImpl<Agent> {
178    /// Create a conductor in agent mode (the last component is an agent).
179    pub fn new_agent(
180        name: impl ToString,
181        instantiator: impl InstantiateProxiesAndAgent + 'static,
182        mcp_bridge_mode: crate::McpBridgeMode,
183    ) -> Self {
184        ConductorImpl::new(Agent, name, Box::new(instantiator), mcp_bridge_mode)
185    }
186}
187
188impl ConductorImpl<Proxy> {
189    /// Create a conductor in proxy mode (forwards to another conductor).
190    pub fn new_proxy(
191        name: impl ToString,
192        instantiator: impl InstantiateProxies + 'static,
193        mcp_bridge_mode: crate::McpBridgeMode,
194    ) -> Self {
195        ConductorImpl::new(Proxy, name, Box::new(instantiator), mcp_bridge_mode)
196    }
197}
198
199impl<Host: ConductorHostRole> ConductorImpl<Host> {
200    /// Enable trace logging to a custom destination.
201    ///
202    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
203    #[must_use]
204    pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
205        self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
206        self
207    }
208
209    /// Enable trace logging to a file path.
210    ///
211    /// Events will be written as newline-delimited JSON (`.jsons` format).
212    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
213    pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
214        self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
215        Ok(self)
216    }
217
218    /// Enable trace logging with an existing TraceWriter.
219    #[must_use]
220    pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
221        self.trace_writer = Some(writer);
222        self
223    }
224
225    /// Run the conductor with a transport.
226    pub async fn run(
227        self,
228        transport: impl ConnectTo<Host>,
229    ) -> Result<(), agent_client_protocol::Error> {
230        let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);
231
232        // Set up tracing if enabled - spawn writer task and get handle
233        let trace_handle;
234        let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
235        if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
236            trace_handle = Some(h);
237            trace_future = Box::pin(f);
238        } else {
239            trace_handle = None;
240            trace_future = Box::pin(std::future::ready(Ok(())));
241        }
242
243        let responder = ConductorResponder {
244            conductor_rx,
245            conductor_tx: conductor_tx.clone(),
246            instantiator: Some(self.instantiator),
247            bridge_listeners: McpBridgeListeners::default(),
248            bridge_connections: HashMap::default(),
249            mcp_bridge_mode: self.mcp_bridge_mode,
250            proxies: Vec::default(),
251            successor: Arc::new(agent_client_protocol::util::internal_error(
252                "successor not initialized",
253            )),
254            trace_handle,
255            host: self.host.clone(),
256        };
257
258        Builder::new_with(
259            self.host.clone(),
260            ConductorMessageHandler {
261                conductor_tx,
262                host: self.host.clone(),
263            },
264        )
265        .name(self.name)
266        .with_responder(responder)
267        .with_spawned(|_cx| trace_future)
268        .connect_to(transport)
269        .await
270    }
271
272    async fn incoming_message_from_client(
273        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
274        message: Dispatch,
275    ) -> Result<(), agent_client_protocol::Error> {
276        conductor_tx
277            .send(ConductorMessage::LeftToRight {
278                target_component_index: 0,
279                message,
280            })
281            .await
282            .map_err(agent_client_protocol::util::internal_error)
283    }
284
285    async fn incoming_message_from_agent(
286        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
287        message: Dispatch,
288    ) -> Result<(), agent_client_protocol::Error> {
289        conductor_tx
290            .send(ConductorMessage::RightToLeft {
291                source_component_index: SourceComponentIndex::Successor,
292                message,
293            })
294            .await
295            .map_err(agent_client_protocol::util::internal_error)
296    }
297}
298
299impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
300    async fn connect_to(
301        self,
302        client: impl ConnectTo<Host>,
303    ) -> Result<(), agent_client_protocol::Error> {
304        self.run(client).await
305    }
306}
307
308struct ConductorMessageHandler<Host: ConductorHostRole> {
309    conductor_tx: mpsc::Sender<ConductorMessage>,
310    host: Host,
311}
312
313impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
314    for ConductorMessageHandler<Host>
315{
316    async fn handle_dispatch_from(
317        &mut self,
318        message: Dispatch,
319        connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
320    ) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
321        self.host
322            .handle_dispatch(message, connection, &mut self.conductor_tx)
323            .await
324    }
325
326    fn describe_chain(&self) -> impl std::fmt::Debug {
327        "ConductorMessageHandler"
328    }
329}
330
331/// The conductor manages the proxy chain lifecycle and message routing.
332///
333/// It maintains connections to all components in the chain and routes messages
334/// bidirectionally between the editor, components, and agent.
335///
336pub struct ConductorResponder<Host>
337where
338    Host: ConductorHostRole,
339{
340    conductor_rx: mpsc::Receiver<ConductorMessage>,
341
342    conductor_tx: mpsc::Sender<ConductorMessage>,
343
344    /// Manages the TCP listeners for MCP connections that will be proxied over ACP.
345    bridge_listeners: McpBridgeListeners,
346
347    /// Manages active connections to MCP clients.
348    bridge_connections: HashMap<String, McpBridgeConnection>,
349
350    /// The instantiator for lazy initialization.
351    /// Set to None after components are instantiated.
352    instantiator: Option<Host::Instantiator>,
353
354    /// The chain of proxies before the agent (if any).
355    ///
356    /// Populated lazily when the first Initialize request is received.
357    proxies: Vec<ConnectionTo<Proxy>>,
358
359    /// If the conductor is operating in agent mode, this will direct messages to the agent.
360    /// If the conductor is operating in proxy mode, this will direct messages to the successor.
361    /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
362    successor: Arc<dyn ConductorSuccessor<Host>>,
363
364    /// Mode for the MCP bridge (determines how to spawn bridge processes).
365    mcp_bridge_mode: crate::McpBridgeMode,
366
367    /// Optional trace handle for sequence diagram visualization.
368    trace_handle: Option<crate::trace::TraceHandle>,
369
370    /// Defines what sort of link we have
371    host: Host,
372}
373
374impl<Host> std::fmt::Debug for ConductorResponder<Host>
375where
376    Host: ConductorHostRole,
377{
378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379        f.debug_struct("ConductorResponder")
380            .field("conductor_rx", &self.conductor_rx)
381            .field("conductor_tx", &self.conductor_tx)
382            .field("bridge_listeners", &self.bridge_listeners)
383            .field("bridge_connections", &self.bridge_connections)
384            .field("proxies", &self.proxies)
385            .field("mcp_bridge_mode", &self.mcp_bridge_mode)
386            .field("trace_handle", &self.trace_handle)
387            .field("host", &self.host)
388            .finish_non_exhaustive()
389    }
390}
391
392impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
393where
394    Host: ConductorHostRole,
395{
396    async fn run_with_connection_to(
397        mut self,
398        connection: ConnectionTo<Host::Counterpart>,
399    ) -> Result<(), agent_client_protocol::Error> {
400        // Components are now spawned lazily in forward_initialize_request
401        // when the first Initialize request is received.
402
403        // This is the "central actor" of the conductor. Most other things forward messages
404        // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
405        while let Some(message) = self.conductor_rx.next().await {
406            self.handle_conductor_message(connection.clone(), message)
407                .await?;
408        }
409        Ok(())
410    }
411}
412
413impl<Host> ConductorResponder<Host>
414where
415    Host: ConductorHostRole,
416{
417    /// Recursively spawns components and builds the proxy chain.
418    ///
419    /// This function implements the recursive chain building pattern:
420    /// 1. Pop the next component from the `providers` list
421    /// 2. Create the component (either spawn subprocess or use mock)
422    /// 3. Set up JSON-RPC connection and message handlers
423    /// 4. Recursively call itself to spawn the next component
424    /// 5. When no components remain, start the message routing loop via `serve()`
425    ///
426    /// Central message handling logic for the conductor.
427    /// The conductor routes all [`ConductorMessage`] messages through to this function.
428    /// Each message corresponds to a request or notification from one component to another.
429    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
430    /// Note that *responses to requests* are sent *directly* without going through this loop.
431    ///
432    /// The names we use are
433    ///
434    /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
435    /// * Then there is a sequence of *components* consisting of:
436    ///     * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
437    ///     * And finally the *agent*, which is the final component in the chain and handles the actual work.
438    ///
439    /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
440    ///
441    /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
442    /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
443    ///   through a stdio server that runs on localhost and bridges messages.
444    async fn handle_conductor_message(
445        &mut self,
446        client: ConnectionTo<Host::Counterpart>,
447        message: ConductorMessage,
448    ) -> Result<(), agent_client_protocol::Error> {
449        tracing::debug!(?message, "handle_conductor_message");
450
451        match message {
452            ConductorMessage::LeftToRight {
453                target_component_index,
454                message,
455            } => {
456                // Tracing happens inside forward_client_to_agent_message, after initialization,
457                // so that component_name() has access to the populated proxies list.
458                self.forward_client_to_agent_message(target_component_index, message, client)
459                    .await
460            }
461
462            ConductorMessage::RightToLeft {
463                source_component_index,
464                message,
465            } => {
466                tracing::debug!(
467                    ?source_component_index,
468                    message_method = ?message.method(),
469                    "Conductor: AgentToClient received"
470                );
471                self.send_message_to_predecessor_of(client, source_component_index, message)
472            }
473
474            // New MCP connection request. Send it back along the chain to get a connection id.
475            // When the connection id arrives, send a message back into this conductor loop with
476            // the connection id and the (as yet unspawned) actor.
477            ConductorMessage::McpConnectionReceived {
478                acp_url,
479                connection,
480                actor,
481            } => {
482                // MCP connection requests always come from the agent
483                // (we must be in agent mode, in fact), so send the MCP request
484                // to the final proxy.
485                self.send_request_to_predecessor_of(
486                    client,
487                    self.proxies.len(),
488                    McpConnectRequest {
489                        acp_url,
490                        meta: None,
491                    },
492                )
493                .on_receiving_result({
494                    let mut conductor_tx = self.conductor_tx.clone();
495                    async move |result| {
496                        match result {
497                            Ok(response) => conductor_tx
498                                .send(ConductorMessage::McpConnectionEstablished {
499                                    response,
500                                    actor,
501                                    connection,
502                                })
503                                .await
504                                .map_err(|_| agent_client_protocol::Error::internal_error()),
505                            Err(_) => {
506                                // Error occurred, just drop the connection.
507                                Ok(())
508                            }
509                        }
510                    }
511                })
512            }
513
514            // MCP connection successfully established. Spawn the actor
515            // and insert the connection into our map for future reference.
516            ConductorMessage::McpConnectionEstablished {
517                response: McpConnectResponse { connection_id, .. },
518                actor,
519                connection,
520            } => {
521                self.bridge_connections
522                    .insert(connection_id.clone(), connection);
523                client.spawn(actor.run(connection_id))
524            }
525
526            // Message meant for the MCP client received. Forward it to the appropriate actor's mailbox.
527            ConductorMessage::McpClientToMcpServer {
528                connection_id,
529                message,
530            } => {
531                let wrapped = message.map(
532                    |request, responder| {
533                        (
534                            McpOverAcpMessage {
535                                connection_id: connection_id.clone(),
536                                message: request,
537                                meta: None,
538                            },
539                            responder,
540                        )
541                    },
542                    |notification| McpOverAcpMessage {
543                        connection_id: connection_id.clone(),
544                        message: notification,
545                        meta: None,
546                    },
547                );
548
549                // We only get MCP-over-ACP requests when we are in bridging MCP for the final agent,
550                // so send them to the final proxy.
551                self.send_message_to_predecessor_of(
552                    client,
553                    SourceComponentIndex::Successor,
554                    wrapped,
555                )
556            }
557
558            // MCP client disconnected. Remove it from our map and send the
559            // notification backwards along the chain.
560            ConductorMessage::McpConnectionDisconnected { notification } => {
561                // We only get MCP-over-ACP requests when we are in bridging MCP for the final agent.
562
563                self.bridge_connections.remove(&notification.connection_id);
564                self.send_notification_to_predecessor_of(client, self.proxies.len(), notification)
565            }
566        }
567    }
568
569    /// Send a message (request or notification) to the predecessor of the given component.
570    ///
571    /// This is a bit subtle because the relationship of the conductor
572    /// is different depending on who will be receiving the message:
573    /// * If the message is going to the conductor's client, then no changes
574    ///   are needed, as the conductor is sending an agent-to-client message and
575    ///   the conductor is acting as the agent.
576    /// * If the message is going to a proxy component, then we have to wrap
577    ///   it in a "from successor" wrapper, because the conductor is the
578    ///   proxy's client.
579    fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
580        &mut self,
581        client: ConnectionTo<Host::Counterpart>,
582        source_component_index: SourceComponentIndex,
583        message: Dispatch<Req, N>,
584    ) -> Result<(), agent_client_protocol::Error>
585    where
586        Req::Response: Send,
587    {
588        let source_component_index = match source_component_index {
589            SourceComponentIndex::Successor => self.proxies.len(),
590            SourceComponentIndex::Proxy(index) => index,
591        };
592
593        match message {
594            Dispatch::Request(request, responder) => self
595                .send_request_to_predecessor_of(client, source_component_index, request)
596                .forward_response_to(responder),
597            Dispatch::Notification(notification) => self.send_notification_to_predecessor_of(
598                client,
599                source_component_index,
600                notification,
601            ),
602            Dispatch::Response(result, router) => router.respond_with_result(result),
603        }
604    }
605
606    fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
607        &mut self,
608        client_connection: ConnectionTo<Host::Counterpart>,
609        source_component_index: usize,
610        request: Req,
611    ) -> SentRequest<Req::Response> {
612        if source_component_index == 0 {
613            client_connection.send_request_to(Client, request)
614        } else {
615            self.proxies[source_component_index - 1].send_request(SuccessorMessage {
616                message: request,
617                meta: None,
618            })
619        }
620    }
621
622    /// Send a notification to the predecessor of the given component.
623    ///
624    /// This is a bit subtle because the relationship of the conductor
625    /// is different depending on who will be receiving the message:
626    /// * If the notification is going to the conductor's client, then no changes
627    ///   are needed, as the conductor is sending an agent-to-client message and
628    ///   the conductor is acting as the agent.
629    /// * If the notification is going to a proxy component, then we have to wrap
630    ///   it in a "from successor" wrapper, because the conductor is the
631    ///   proxy's client.
632    fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
633        &mut self,
634        client: ConnectionTo<Host::Counterpart>,
635        source_component_index: usize,
636        notification: N,
637    ) -> Result<(), agent_client_protocol::Error> {
638        tracing::debug!(
639            source_component_index,
640            proxies_len = self.proxies.len(),
641            "send_notification_to_predecessor_of"
642        );
643        if source_component_index == 0 {
644            tracing::debug!("Sending notification directly to client");
645            client.send_notification_to(Client, notification)
646        } else {
647            tracing::debug!(
648                target_proxy = source_component_index - 1,
649                "Sending notification wrapped as SuccessorMessage to proxy"
650            );
651            self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
652                message: notification,
653                meta: None,
654            })
655        }
656    }
657
658    /// Send a message (request or notification) from 'left to right'.
659    /// Left-to-right means from the client or an intermediate proxy to the component
660    /// at `target_component_index` (could be a proxy or the agent).
661    /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
662    async fn forward_client_to_agent_message(
663        &mut self,
664        target_component_index: usize,
665        message: Dispatch,
666        client: ConnectionTo<Host::Counterpart>,
667    ) -> Result<(), agent_client_protocol::Error> {
668        tracing::trace!(
669            target_component_index,
670            ?message,
671            "forward_client_to_agent_message"
672        );
673
674        // Ensure components are initialized before processing any message.
675        let message = self.ensure_initialized(client.clone(), message).await?;
676
677        // In proxy mode, if the target is beyond our component chain,
678        // forward to the conductor's own successor (via client connection)
679        if target_component_index < self.proxies.len() {
680            self.forward_message_from_client_to_proxy(target_component_index, message)
681                .await
682        } else {
683            assert_eq!(target_component_index, self.proxies.len());
684
685            debug!(
686                target_component_index,
687                proxies_count = self.proxies.len(),
688                "Proxy mode: forwarding successor message to conductor's successor"
689            );
690            let successor = self.successor.clone();
691            successor.send_message(message, client, self).await
692        }
693    }
694
695    /// Ensures components are initialized before processing messages.
696    ///
697    /// If components haven't been initialized yet, this expects the first message
698    /// to be an `initialize` request and uses it to spawn the component chain.
699    ///
700    /// Returns:
701    /// - `Ok(Some(message))` - Components are initialized, continue processing this message
702    /// - `Ok(None)` - An error response was sent, caller should return early
703    /// - `Err(_)` - A fatal error occurred
704    async fn ensure_initialized(
705        &mut self,
706        client: ConnectionTo<Host::Counterpart>,
707        message: Dispatch,
708    ) -> Result<Dispatch, Error> {
709        // Already initialized - pass through
710        let Some(instantiator) = self.instantiator.take() else {
711            return Ok(message);
712        };
713
714        let host = self.host.clone();
715        let message = host.initialize(message, client, instantiator, self).await?;
716        Ok(message)
717    }
718
719    /// Wrap a proxy component with tracing if tracing is enabled.
720    ///
721    /// Returns the component unchanged if tracing is disabled.
722    fn trace_proxy(
723        &self,
724        proxy_index: ComponentIndex,
725        successor_index: ComponentIndex,
726        component: impl ConnectTo<Conductor>,
727    ) -> DynConnectTo<Conductor> {
728        match &self.trace_handle {
729            Some(trace_handle) => {
730                trace_handle.bridge_component(proxy_index, successor_index, component)
731            }
732            None => DynConnectTo::new(component),
733        }
734    }
735
736    /// Spawn proxy components and add them to the proxies list.
737    fn spawn_proxies(
738        &mut self,
739        client: ConnectionTo<Host::Counterpart>,
740        proxy_components: Vec<DynConnectTo<Conductor>>,
741    ) -> Result<(), agent_client_protocol::Error> {
742        assert!(self.proxies.is_empty());
743
744        let num_proxies = proxy_components.len();
745        info!(proxy_count = num_proxies, "spawn_proxies");
746
747        // Special case: if there are no user-defined proxies
748        // but tracing is enabled, we make a dummy proxy that just
749        // passes through messages but which can trigger the
750        // tracing events.
751        if self.trace_handle.is_some() && num_proxies == 0 {
752            self.connect_to_proxy(
753                &client,
754                0,
755                ComponentIndex::Client,
756                ComponentIndex::Agent,
757                Proxy.builder(),
758            )?;
759        } else {
760            // Spawn each proxy component
761            for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
762                debug!(component_index, "spawning proxy");
763
764                self.connect_to_proxy(
765                    &client,
766                    component_index,
767                    ComponentIndex::Proxy(component_index),
768                    ComponentIndex::successor_of(component_index, num_proxies),
769                    dyn_component,
770                )?;
771            }
772        }
773
774        info!(proxy_count = self.proxies.len(), "Proxies spawned");
775
776        Ok(())
777    }
778
779    /// Create a connection to the proxy with index `component_index` implemented in `component`.
780    ///
781    /// If tracing is enabled, the proxy's index is `trace_proxy_index` and its successor is `trace_successor_index`.
782    fn connect_to_proxy(
783        &mut self,
784        client: &ConnectionTo<Host::Counterpart>,
785        component_index: usize,
786        trace_proxy_index: ComponentIndex,
787        trace_successor_index: ComponentIndex,
788        component: impl ConnectTo<Conductor>,
789    ) -> Result<(), Error> {
790        let connection_builder = self.connection_to_proxy(component_index);
791        let connect_component =
792            self.trace_proxy(trace_proxy_index, trace_successor_index, component);
793        let proxy_connection = client.spawn_connection(connection_builder, connect_component)?;
794        self.proxies.push(proxy_connection);
795        Ok(())
796    }
797
798    /// Create the conductor's connection to the proxy with index `component_index`.
799    ///
800    /// Outgoing messages received from the proxy are sent to `self.conductor_tx` as either
801    /// left-to-right or right-to-left messages depending on whether they are wrapped
802    /// in `SuccessorMessage`.
803    fn connection_to_proxy(
804        &mut self,
805        component_index: usize,
806    ) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
807        type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
808        let mut conductor_tx = self.conductor_tx.clone();
809        Conductor
810            .builder()
811            .name(format!("conductor-to-component({component_index})"))
812            // Intercept messages sent by the proxy.
813            .on_receive_dispatch(
814                async move |dispatch: Dispatch, _connection| {
815                    MatchDispatch::new(dispatch)
816                        .if_message(async |dispatch: SuccessorDispatch| {
817                            //                         ------------------
818                            // SuccessorMessages sent by the proxy go to its successor.
819                            //
820                            // Subtle point:
821                            //
822                            // `ConductorToProxy` has only a single peer, `Agent`. This means that we see
823                            // "successor messages" in their "desugared form". So when we intercept an *outgoing*
824                            // message that matches `SuccessorMessage`, it could be one of three things
825                            //
826                            // - A request being sent by the proxy to its successor (hence going left->right)
827                            // - A notification being sent by the proxy to its successor (hence going left->right)
828                            // - A response to a request sent to the proxy *by* its successor. Here, the *request*
829                            //   was going right->left, but the *response* (the message we are processing now)
830                            //   is going left->right.
831                            //
832                            // So, in all cases, we forward as a left->right message.
833
834                            conductor_tx
835                                .send(ConductorMessage::LeftToRight {
836                                    target_component_index: component_index + 1,
837                                    message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
838                                })
839                                .await
840                                .map_err(agent_client_protocol::util::internal_error)
841                        })
842                        .await
843                        .otherwise(async |dispatch| {
844                            // Other messagrs send by the proxy go its predecessor.
845                            // As in the previous handler:
846                            //
847                            // Messages here are seen in their "desugared form", so we are seeing
848                            // one of three things
849                            //
850                            // - A request being sent by the proxy to its predecessor (hence going right->left)
851                            // - A notification being sent by the proxy to its predecessor (hence going right->left)
852                            // - A response to a request sent to the proxy *by* its predecessor. Here, the *request*
853                            //   was going left->right, but the *response* (the message we are processing now)
854                            //   is going right->left.
855                            //
856                            // So, in all cases, we forward as a right->left message.
857
858                            let message = ConductorMessage::RightToLeft {
859                                source_component_index: SourceComponentIndex::Proxy(
860                                    component_index,
861                                ),
862                                message: dispatch,
863                            };
864                            conductor_tx
865                                .send(message)
866                                .await
867                                .map_err(agent_client_protocol::util::internal_error)
868                        })
869                        .await
870                },
871                agent_client_protocol::on_receive_dispatch!(),
872            )
873    }
874
875    async fn forward_message_from_client_to_proxy(
876        &mut self,
877        target_component_index: usize,
878        message: Dispatch,
879    ) -> Result<(), agent_client_protocol::Error> {
880        tracing::debug!(?message, "forward_message_to_proxy");
881
882        MatchDispatch::new(message)
883            .if_request(async |_request: InitializeProxyRequest, responder| {
884                responder.respond_with_error(
885                    agent_client_protocol::Error::invalid_request()
886                        .data("initialize/proxy requests are only sent by the conductor"),
887                )
888            })
889            .await
890            .if_request(async |request: InitializeRequest, responder| {
891                // The pattern for `Initialize` messages is a bit subtle.
892                // Proxy receive incoming `Initialize` messages as if they
893                // were a client. The conductor (us) intercepts these and
894                // converts them to an `InitializeProxyRequest`.
895                //
896                // The proxy will then initialize itself and forward an `Initialize`
897                // request to its successor.
898                self.proxies[target_component_index]
899                    .send_request(InitializeProxyRequest::from(request))
900                    .on_receiving_result(async move |result| {
901                        tracing::debug!(?result, "got initialize_proxy response from proxy");
902                        responder.respond_with_result(result)
903                    })
904            })
905            .await
906            .otherwise(async |message| {
907                // Otherwise, just send the message along "as is".
908                self.proxies[target_component_index].send_proxied_message(message)
909            })
910            .await
911    }
912
913    /// Invoked when sending a message from the conductor to the agent that it manages.
914    /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
915    /// method when `Link = ConductorToClient` (i.e., the conductor is not itself
916    /// running as a proxy).
917    async fn forward_message_to_agent(
918        &mut self,
919        client_connection: ConnectionTo<Host::Counterpart>,
920        message: Dispatch,
921        agent_connection: ConnectionTo<Agent>,
922    ) -> Result<(), Error> {
923        MatchDispatch::new(message)
924            .if_request(async |_request: InitializeProxyRequest, responder| {
925                responder.respond_with_error(
926                    agent_client_protocol::Error::invalid_request()
927                        .data("initialize/proxy requests are only sent by the conductor"),
928                )
929            })
930            .await
931            .if_request(async |mut request: NewSessionRequest, responder| {
932                // When forwarding "session/new" to the agent,
933                // we adjust MCP servers to manage "acp:" URLs.
934                for mcp_server in &mut request.mcp_servers {
935                    self.bridge_listeners
936                        .transform_mcp_server(
937                            client_connection.clone(),
938                            mcp_server,
939                            &self.conductor_tx,
940                            &self.mcp_bridge_mode,
941                        )
942                        .await?;
943                }
944
945                agent_connection
946                    .send_request(request)
947                    .forward_response_to(responder)
948            })
949            .await
950            .if_request(
951                async |request: McpOverAcpMessage<UntypedMessage>, responder| {
952                    let McpOverAcpMessage {
953                        connection_id,
954                        message: mcp_request,
955                        ..
956                    } = request;
957                    self.bridge_connections
958                        .get_mut(&connection_id)
959                        .ok_or_else(|| {
960                            agent_client_protocol::util::internal_error(format!(
961                                "unknown connection id: {connection_id}"
962                            ))
963                        })?
964                        .send(Dispatch::Request(mcp_request, responder))
965                        .await
966                },
967            )
968            .await
969            .if_notification(async |notification: McpOverAcpMessage<UntypedMessage>| {
970                let McpOverAcpMessage {
971                    connection_id,
972                    message: mcp_notification,
973                    ..
974                } = notification;
975                self.bridge_connections
976                    .get_mut(&connection_id)
977                    .ok_or_else(|| {
978                        agent_client_protocol::util::internal_error(format!(
979                            "unknown connection id: {connection_id}"
980                        ))
981                    })?
982                    .send(Dispatch::Notification(mcp_notification))
983                    .await
984            })
985            .await
986            .otherwise(async |message| {
987                // Otherwise, just send the message along "as is".
988                agent_connection.send_proxied_message_to(Agent, message)
989            })
990            .await
991    }
992}
993
994/// Identifies a component in the conductor's chain for tracing purposes.
995///
996/// Used to track message sources and destinations through the proxy chain.
997#[derive(Debug, Clone, Copy)]
998pub enum ComponentIndex {
999    /// The client (editor) at the start of the chain.
1000    Client,
1001
1002    /// A proxy component at the given index.
1003    Proxy(usize),
1004
1005    /// The successor (agent in agent mode, outer conductor in proxy mode).
1006    Agent,
1007}
1008
1009impl ComponentIndex {
1010    /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
1011    #[must_use]
1012    pub fn predecessor_of(proxy_index: usize) -> Self {
1013        match proxy_index.checked_sub(1) {
1014            Some(p_i) => ComponentIndex::Proxy(p_i),
1015            None => ComponentIndex::Client,
1016        }
1017    }
1018
1019    /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
1020    #[must_use]
1021    pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
1022        if proxy_index == num_proxies {
1023            ComponentIndex::Agent
1024        } else {
1025            ComponentIndex::Proxy(proxy_index + 1)
1026        }
1027    }
1028}
1029
1030/// Identifies the source of an agent-to-client message.
1031///
1032/// This enum handles the fact that the conductor may receive messages from two different sources:
1033/// 1. From one of its managed components (identified by index)
1034/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
1035#[derive(Debug, Clone, Copy)]
1036pub enum SourceComponentIndex {
1037    /// Message from a specific component at the given index in the managed chain.
1038    Proxy(usize),
1039
1040    /// Message from the conductor's agent or successor.
1041    Successor,
1042}
1043
1044/// Trait for lazy proxy instantiation (proxy mode).
1045///
1046/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
1047/// are proxies that forward to an outer conductor.
1048pub trait InstantiateProxies: Send {
1049    /// Instantiate proxy components based on the Initialize request.
1050    ///
1051    /// Returns proxy components typed as `DynConnectTo<Conductor>` since proxies
1052    /// communicate with the conductor.
1053    fn instantiate_proxies(
1054        self: Box<Self>,
1055        req: InitializeRequest,
1056    ) -> futures::future::BoxFuture<
1057        'static,
1058        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
1059    >;
1060}
1061
1062/// Simple implementation: provide all proxy components unconditionally.
1063///
1064/// Requires `T: ConnectTo<Conductor>`.
1065impl<T> InstantiateProxies for Vec<T>
1066where
1067    T: ConnectTo<Conductor> + 'static,
1068{
1069    fn instantiate_proxies(
1070        self: Box<Self>,
1071        req: InitializeRequest,
1072    ) -> futures::future::BoxFuture<
1073        'static,
1074        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
1075    > {
1076        Box::pin(async move {
1077            let components: Vec<DynConnectTo<Conductor>> =
1078                (*self).into_iter().map(|c| DynConnectTo::new(c)).collect();
1079            Ok((req, components))
1080        })
1081    }
1082}
1083
1084/// Dynamic implementation: closure receives the Initialize request and returns proxies.
1085impl<F, Fut> InstantiateProxies for F
1086where
1087    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1088    Fut: std::future::Future<
1089            Output = Result<
1090                (InitializeRequest, Vec<DynConnectTo<Conductor>>),
1091                agent_client_protocol::Error,
1092            >,
1093        > + Send
1094        + 'static,
1095{
1096    fn instantiate_proxies(
1097        self: Box<Self>,
1098        req: InitializeRequest,
1099    ) -> futures::future::BoxFuture<
1100        'static,
1101        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
1102    > {
1103        Box::pin(async move { (*self)(req).await })
1104    }
1105}
1106
1107/// Trait for lazy proxy and agent instantiation (agent mode).
1108///
1109/// Used by conductors in agent mode (`ConductorToClient`) where there are
1110/// zero or more proxies followed by an agent component.
1111pub trait InstantiateProxiesAndAgent: Send {
1112    /// Instantiate proxy and agent components based on the Initialize request.
1113    ///
1114    /// Returns the (possibly modified) request, a vector of proxy components
1115    /// (typed as `DynConnectTo<Conductor>`), and the agent component
1116    /// (typed as `DynConnectTo<Client>`).
1117    fn instantiate_proxies_and_agent(
1118        self: Box<Self>,
1119        req: InitializeRequest,
1120    ) -> futures::future::BoxFuture<
1121        'static,
1122        Result<
1123            (
1124                InitializeRequest,
1125                Vec<DynConnectTo<Conductor>>,
1126                DynConnectTo<Client>,
1127            ),
1128            agent_client_protocol::Error,
1129        >,
1130    >;
1131}
1132
1133/// Wrapper to convert a single agent component (no proxies) into InstantiateProxiesAndAgent.
1134#[derive(Debug)]
1135pub struct AgentOnly<A>(pub A);
1136
1137impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
1138    fn instantiate_proxies_and_agent(
1139        self: Box<Self>,
1140        req: InitializeRequest,
1141    ) -> futures::future::BoxFuture<
1142        'static,
1143        Result<
1144            (
1145                InitializeRequest,
1146                Vec<DynConnectTo<Conductor>>,
1147                DynConnectTo<Client>,
1148            ),
1149            agent_client_protocol::Error,
1150        >,
1151    > {
1152        Box::pin(async move { Ok((req, Vec::new(), DynConnectTo::new(self.0))) })
1153    }
1154}
1155
1156/// Builder for creating proxies and agent components.
1157///
1158/// # Example
1159/// ```ignore
1160/// ProxiesAndAgent::new(ElizaAgent::new())
1161///     .proxy(LoggingProxy::new())
1162///     .proxy(AuthProxy::new())
1163/// ```
1164#[derive(Debug)]
1165pub struct ProxiesAndAgent {
1166    proxies: Vec<DynConnectTo<Conductor>>,
1167    agent: DynConnectTo<Client>,
1168}
1169
1170impl ProxiesAndAgent {
1171    /// Create a new builder with the given agent component.
1172    pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
1173        Self {
1174            proxies: vec![],
1175            agent: DynConnectTo::new(agent),
1176        }
1177    }
1178
1179    /// Add a single proxy component.
1180    #[must_use]
1181    pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
1182        self.proxies.push(DynConnectTo::new(proxy));
1183        self
1184    }
1185
1186    /// Add multiple proxy components.
1187    #[must_use]
1188    pub fn proxies<P, I>(mut self, proxies: I) -> Self
1189    where
1190        P: ConnectTo<Conductor> + 'static,
1191        I: IntoIterator<Item = P>,
1192    {
1193        self.proxies
1194            .extend(proxies.into_iter().map(DynConnectTo::new));
1195        self
1196    }
1197}
1198
1199impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1200    fn instantiate_proxies_and_agent(
1201        self: Box<Self>,
1202        req: InitializeRequest,
1203    ) -> futures::future::BoxFuture<
1204        'static,
1205        Result<
1206            (
1207                InitializeRequest,
1208                Vec<DynConnectTo<Conductor>>,
1209                DynConnectTo<Client>,
1210            ),
1211            agent_client_protocol::Error,
1212        >,
1213    > {
1214        Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1215    }
1216}
1217
1218/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
1219impl<F, Fut> InstantiateProxiesAndAgent for F
1220where
1221    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1222    Fut: std::future::Future<
1223            Output = Result<
1224                (
1225                    InitializeRequest,
1226                    Vec<DynConnectTo<Conductor>>,
1227                    DynConnectTo<Client>,
1228                ),
1229                agent_client_protocol::Error,
1230            >,
1231        > + Send
1232        + 'static,
1233{
1234    fn instantiate_proxies_and_agent(
1235        self: Box<Self>,
1236        req: InitializeRequest,
1237    ) -> futures::future::BoxFuture<
1238        'static,
1239        Result<
1240            (
1241                InitializeRequest,
1242                Vec<DynConnectTo<Conductor>>,
1243                DynConnectTo<Client>,
1244            ),
1245            agent_client_protocol::Error,
1246        >,
1247    > {
1248        Box::pin(async move { (*self)(req).await })
1249    }
1250}
1251
1252/// Messages sent to the conductor's main event loop for routing.
1253///
1254/// These messages enable the conductor to route communication between:
1255/// - The editor and the first component
1256/// - Components and their successors in the chain
1257/// - Components and their clients (editor or predecessor)
1258///
1259/// All spawned tasks send messages via this enum through a shared channel,
1260/// allowing centralized routing logic in the `serve()` loop.
1261#[derive(Debug)]
1262pub enum ConductorMessage {
1263    /// If this message is a request or notification, then it is going "left-to-right"
1264    /// (e.g., a component making a request of its successor).
1265    ///
1266    /// If this message is a response, then it is going right-to-left
1267    /// (i.e., the successor answering a request made by its predecessor).
1268    LeftToRight {
1269        target_component_index: usize,
1270        message: Dispatch,
1271    },
1272
1273    /// If this message is a request or notification, then it is going "right-to-left"
1274    /// (e.g., a component making a request of its predecessor).
1275    ///
1276    /// If this message is a response, then it is going "left-to-right"
1277    /// (i.e., the predecessor answering a request made by its successor).
1278    RightToLeft {
1279        source_component_index: SourceComponentIndex,
1280        message: Dispatch,
1281    },
1282
1283    /// A pending MCP bridge connection request request.
1284    /// The request must be sent back over ACP to receive the connection-id.
1285    /// Once the connection-id is received, the actor must be spawned.
1286    McpConnectionReceived {
1287        /// The acp:$UUID URL identifying this bridge
1288        acp_url: String,
1289
1290        /// The actor that should be spawned once the connection-id is available.
1291        actor: McpBridgeConnectionActor,
1292
1293        /// The connection to the bridge
1294        connection: McpBridgeConnection,
1295    },
1296
1297    /// A pending MCP bridge connection request request.
1298    /// The request must be sent back over ACP to receive the connection-id.
1299    /// Once the connection-id is received, the actor must be spawned.
1300    McpConnectionEstablished {
1301        response: McpConnectResponse,
1302
1303        /// The actor that should be spawned once the connection-id is available.
1304        actor: McpBridgeConnectionActor,
1305
1306        /// The connection to the bridge
1307        connection: McpBridgeConnection,
1308    },
1309
1310    /// MCP message (request or notification) received from a bridge that needs to be routed to the final proxy.
1311    ///
1312    /// Sent when the bridge receives an MCP tool call from the agent and forwards it
1313    /// to the conductor via TCP. The conductor routes this to the appropriate proxy component.
1314    McpClientToMcpServer {
1315        connection_id: String,
1316        message: Dispatch,
1317    },
1318
1319    /// Message sent when MCP client disconnects
1320    McpConnectionDisconnected {
1321        notification: McpDisconnectNotification,
1322    },
1323}
1324
1325/// Trait implemented for the two links the conductor can use:
1326///
1327/// * ConductorToClient -- conductor is acting as an agent, so when its last proxy sends to its successor, the conductor sends that message to its agent component
1328/// * ConductorToConductor -- conductor is acting as a proxy, so when its last proxy sends to its successor, the (inner) conductor sends that message to its successor, via the outer conductor
1329pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
1330    /// The type used to instantiate components for this link type.
1331    type Instantiator: Send;
1332
1333    /// Handle initialization: parse the init request, instantiate components, and spawn them.
1334    ///
1335    /// Takes ownership of the instantiator and returns the (possibly modified) init request
1336    /// wrapped in a Dispatch for forwarding.
1337    fn initialize(
1338        &self,
1339        message: Dispatch,
1340        connection: ConnectionTo<Self::Counterpart>,
1341        instantiator: Self::Instantiator,
1342        responder: &mut ConductorResponder<Self>,
1343    ) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
1344
1345    /// Handle an incoming message from the client or conductor, depending on `Self`
1346    fn handle_dispatch(
1347        &self,
1348        message: Dispatch,
1349        connection: ConnectionTo<Self::Counterpart>,
1350        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1351    ) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
1352}
1353
1354/// Conductor acting as an agent
1355impl ConductorHostRole for Agent {
1356    type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1357
1358    async fn initialize(
1359        &self,
1360        message: Dispatch,
1361        client_connection: ConnectionTo<Client>,
1362        instantiator: Self::Instantiator,
1363        responder: &mut ConductorResponder<Self>,
1364    ) -> Result<Dispatch, agent_client_protocol::Error> {
1365        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1366
1367        // Not yet initialized - expect an initialize request.
1368        // Error if we get anything else.
1369        let Dispatch::Request(request, init_responder) = message else {
1370            message.respond_with_error(invalid_request(), client_connection.clone())?;
1371            return Err(invalid_request());
1372        };
1373        if !InitializeRequest::matches_method(request.method()) {
1374            init_responder.respond_with_error(invalid_request())?;
1375            return Err(invalid_request());
1376        }
1377
1378        let init_request =
1379            match InitializeRequest::parse_message(request.method(), request.params()) {
1380                Ok(r) => r,
1381                Err(error) => {
1382                    init_responder.respond_with_error(error)?;
1383                    return Err(invalid_request());
1384                }
1385            };
1386
1387        // Instantiate proxies and agent
1388        let (modified_req, proxy_components, agent_component) = instantiator
1389            .instantiate_proxies_and_agent(init_request)
1390            .await?;
1391
1392        // Spawn the agent component
1393        debug!(?agent_component, "spawning agent");
1394
1395        let connection_to_agent = client_connection.spawn_connection(
1396            Client
1397                .builder()
1398                .name("conductor-to-agent")
1399                // Intercept agent-to-client messages from the agent.
1400                .on_receive_dispatch(
1401                    {
1402                        let mut conductor_tx = responder.conductor_tx.clone();
1403                        async move |dispatch: Dispatch, _cx| {
1404                            conductor_tx
1405                                .send(ConductorMessage::RightToLeft {
1406                                    source_component_index: SourceComponentIndex::Successor,
1407                                    message: dispatch,
1408                                })
1409                                .await
1410                                .map_err(agent_client_protocol::util::internal_error)
1411                        }
1412                    },
1413                    agent_client_protocol::on_receive_dispatch!(),
1414                ),
1415            agent_component,
1416        )?;
1417        responder.successor = Arc::new(connection_to_agent);
1418
1419        // Spawn the proxy components
1420        responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1421
1422        Ok(Dispatch::Request(
1423            modified_req.to_untyped_message()?,
1424            init_responder,
1425        ))
1426    }
1427
1428    async fn handle_dispatch(
1429        &self,
1430        message: Dispatch,
1431        client_connection: ConnectionTo<Client>,
1432        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1433    ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1434        tracing::debug!(
1435            method = ?message.method(),
1436            "ConductorToClient::handle_dispatch"
1437        );
1438        MatchDispatchFrom::new(message, &client_connection)
1439            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1440            .if_message_from(Client, async move |message: Dispatch| {
1441                tracing::debug!(
1442                    method = ?message.method(),
1443                    "ConductorToClient::handle_dispatch - matched Client"
1444                );
1445                ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
1446            })
1447            .await
1448            .done()
1449    }
1450}
1451
1452/// Conductor acting as a proxy
1453impl ConductorHostRole for Proxy {
1454    type Instantiator = Box<dyn InstantiateProxies>;
1455
1456    async fn initialize(
1457        &self,
1458        message: Dispatch,
1459        client_connection: ConnectionTo<Conductor>,
1460        instantiator: Self::Instantiator,
1461        responder: &mut ConductorResponder<Self>,
1462    ) -> Result<Dispatch, agent_client_protocol::Error> {
1463        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1464
1465        // Not yet initialized - expect an InitializeProxy request.
1466        // Error if we get anything else.
1467        let Dispatch::Request(request, init_responder) = message else {
1468            message.respond_with_error(invalid_request(), client_connection.clone())?;
1469            return Err(invalid_request());
1470        };
1471        if !InitializeProxyRequest::matches_method(request.method()) {
1472            init_responder.respond_with_error(invalid_request())?;
1473            return Err(invalid_request());
1474        }
1475
1476        let InitializeProxyRequest { initialize } =
1477            match InitializeProxyRequest::parse_message(request.method(), request.params()) {
1478                Ok(r) => r,
1479                Err(error) => {
1480                    init_responder.respond_with_error(error)?;
1481                    return Err(invalid_request());
1482                }
1483            };
1484
1485        tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1486
1487        // Instantiate proxies (no agent in proxy mode)
1488        let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1489
1490        // In proxy mode, our successor is the outer conductor (via our client connection)
1491        responder.successor = Arc::new(GrandSuccessor);
1492
1493        // Spawn the proxy components
1494        responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1495
1496        Ok(Dispatch::Request(
1497            modified_req.to_untyped_message()?,
1498            init_responder,
1499        ))
1500    }
1501
1502    async fn handle_dispatch(
1503        &self,
1504        message: Dispatch,
1505        client_connection: ConnectionTo<Conductor>,
1506        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1507    ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1508        tracing::debug!(
1509            method = ?message.method(),
1510            ?message,
1511            "ConductorToConductor::handle_dispatch"
1512        );
1513        MatchDispatchFrom::new(message, &client_connection)
1514            .if_message_from(Agent, {
1515                // Messages from our successor arrive already unwrapped
1516                // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1517                async |message: Dispatch| {
1518                    tracing::debug!(
1519                        method = ?message.method(),
1520                        "ConductorToConductor::handle_dispatch - matched Agent"
1521                    );
1522                    let mut conductor_tx = conductor_tx.clone();
1523                    ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
1524                        .await
1525                }
1526            })
1527            .await
1528            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1529            .if_message_from(Client, async |message: Dispatch| {
1530                tracing::debug!(
1531                    method = ?message.method(),
1532                    "ConductorToConductor::handle_dispatch - matched Client"
1533                );
1534                let mut conductor_tx = conductor_tx.clone();
1535                ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
1536                    .await
1537            })
1538            .await
1539            .done()
1540    }
1541}
1542
1543pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {
1544    /// Send a message to the successor.
1545    fn send_message<'a>(
1546        &self,
1547        message: Dispatch,
1548        connection_to_conductor: ConnectionTo<Host::Counterpart>,
1549        responder: &'a mut ConductorResponder<Host>,
1550    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
1551}
1552
1553impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {
1554    fn send_message<'a>(
1555        &self,
1556        #[expect(unused_variables)] message: Dispatch,
1557        #[expect(unused_variables)] connection_to_conductor: ConnectionTo<Host::Counterpart>,
1558        #[expect(unused_variables)] responder: &'a mut ConductorResponder<Host>,
1559    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1560        let error = self.clone();
1561        Box::pin(std::future::ready(Err(error)))
1562    }
1563}
1564
1565/// A dummy type handling messages sent to the conductor's
1566/// successor when it is acting as a proxy.
1567struct GrandSuccessor;
1568
1569/// When the conductor is acting as an proxy, messages sent by
1570/// the last proxy go to the conductor's successor.
1571///
1572/// ```text
1573/// client --> Conductor -----------------------------> GrandSuccessor
1574///            |                                  |
1575///            +-> Proxy[0] -> ... -> Proxy[n-1] -+
1576/// ```
1577impl ConductorSuccessor<Proxy> for GrandSuccessor {
1578    fn send_message<'a>(
1579        &self,
1580        message: Dispatch,
1581        connection: ConnectionTo<Conductor>,
1582        _responder: &'a mut ConductorResponder<Proxy>,
1583    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1584        Box::pin(async move {
1585            debug!("Proxy mode: forwarding successor message to conductor's successor");
1586            connection.send_proxied_message_to(Agent, message)
1587        })
1588    }
1589}
1590
1591/// When the conductor is acting as an agent, messages sent by
1592/// the last proxy to its successor go to the internal agent
1593/// (`self`).
1594impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
1595    fn send_message<'a>(
1596        &self,
1597        message: Dispatch,
1598        connection: ConnectionTo<Client>,
1599        responder: &'a mut ConductorResponder<Agent>,
1600    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1601        let connection_to_agent = self.clone();
1602        Box::pin(async move {
1603            debug!("Proxy mode: forwarding successor message to conductor's successor");
1604            responder
1605                .forward_message_to_agent(connection, message, connection_to_agent)
1606                .await
1607        })
1608    }
1609}