// agent_client_protocol_conductor/conductor.rs

//! # Conductor: ACP Proxy Chain Orchestrator
//!
//! This module implements the conductor, which orchestrates a chain of
//! proxy components that sit between an editor and an agent, transforming the
//! Agent-Client Protocol (ACP) stream bidirectionally.
//!
//! ## Architecture Overview
//!
//! The conductor builds and manages a chain of components:
//!
//! ```text
//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
//! ```
//!
//! Each component receives ACP messages, can transform them, and forwards them
//! to the next component in the chain. The conductor:
//!
//! 1. Spawns each component as a subprocess
//! 2. Establishes bidirectional JSON-RPC connections with each component
//! 3. Routes messages between editor, components, and agent
//! 4. Distinguishes proxy vs agent components via distinct request types
//!
//! ## Recursive Chain Building
//!
//! The chain is built recursively through the `_proxy/successor/*` protocol:
//!
//! 1. Editor connects to Component 0 via the conductor
//! 2. When Component 0 wants to communicate with its successor, it sends
//!    requests/notifications with method prefix `_proxy/successor/`
//! 3. The conductor intercepts these messages, strips the prefix, and forwards
//!    to Component 1
//! 4. Component 1 does the same for Component 2, and so on
//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
//!
//! This allows each component to be written as if it's talking to a single successor,
//! without knowing about the full chain.
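//!
//! The interception in step 3 can be sketched as below. This is only an
//! illustration that takes the method-prefix description above literally:
//! `strip_successor_prefix` is a hypothetical helper, not part of this crate,
//! and the conductor itself matches on typed `SuccessorMessage` values rather
//! than on raw method strings.
//!
//! ```rust
//! /// Hypothetical helper: if `method` is addressed to the successor, return
//! /// the method name with the `_proxy/successor/` prefix removed.
//! /// Returns `None` for messages that are not successor messages.
//! fn strip_successor_prefix(method: &str) -> Option<&str> {
//!     method.strip_prefix("_proxy/successor/")
//! }
//! ```
//!
//! Under that assumption, `_proxy/successor/session/new` is forwarded as
//! `session/new`, while a method without the prefix is not sent to the successor.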
//!
//! ## Proxy vs Agent Initialization
//!
//! Components discover whether they're a proxy or agent via the initialization request they receive:
//!
//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
//!
//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
//! and `InitializeRequest` only to the final agent component. This allows proxies to
//! know they should forward messages to a successor, while agents know they are the
//! terminal component.
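//!
//! The choice of initialization method can be sketched as follows, assuming a
//! conductor in agent mode where the last component is the agent.
//! `initialize_method_for` is a hypothetical helper used only for illustration;
//! the conductor works with typed request values rather than method strings.
//!
//! ```rust
//! /// Hypothetical helper: the method sent to the component at `index` in a
//! /// chain of `component_count` components whose last entry is the agent.
//! fn initialize_method_for(index: usize, component_count: usize) -> &'static str {
//!     if index + 1 == component_count {
//!         "initialize" // terminal agent
//!     } else {
//!         "_proxy/initialize" // proxy with a successor
//!     }
//! }
//! ```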
//!
//! ## Message Routing
//!
//! The conductor runs an event loop processing messages from:
//!
//! - **Editor to first component**: Standard ACP messages
//! - **Component to successor**: Via `_proxy/successor/*` prefix
//! - **Component responses**: Via futures channels back to requesters
//!
//! The message flow ensures bidirectional communication while maintaining the
//! abstraction that each component only knows about its immediate successor.
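//!
//! For the right-to-left direction, the routing decision reduces to finding the
//! predecessor of the sending component. A minimal sketch, in which `Destination`
//! and `predecessor_of` are hypothetical names chosen for illustration:
//!
//! ```rust
//! /// Hypothetical routing target for a right-to-left message.
//! #[derive(Debug, PartialEq)]
//! enum Destination {
//!     /// The editor connected to the conductor.
//!     Client,
//!     /// The proxy at this index in the chain.
//!     Proxy(usize),
//! }
//!
//! /// The predecessor of component 0 is the client; the predecessor of any
//! /// other component is the proxy immediately before it.
//! fn predecessor_of(source_component_index: usize) -> Destination {
//!     if source_component_index == 0 {
//!         Destination::Client
//!     } else {
//!         Destination::Proxy(source_component_index - 1)
//!     }
//! }
//! ```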
//!
//! ## Lazy Component Initialization
//!
//! Components are instantiated lazily when the first `initialize` request is received
//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
//!
//! ### Simple Usage
//!
//! Pass a Vec of components that implement `Component`:
//!
//! ```ignore
//! let conductor = Conductor::new(
//!     "my-conductor",
//!     vec![proxy1, proxy2, agent],
//!     None,
//! );
//! ```
//!
//! All components are spawned in order when the editor sends the first `initialize` request.
//!
//! ### Dynamic Component Selection
//!
//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
//!
//! ```ignore
//! let conductor = Conductor::new(
//!     "my-conductor",
//!     |cx, conductor_tx, init_req| async move {
//!         // Examine capabilities
//!         let needs_auth = has_auth_capability(&init_req);
//!
//!         let mut components = Vec::new();
//!         if needs_auth {
//!             components.push(spawn_auth_proxy(&cx, &conductor_tx)?);
//!         }
//!         components.push(spawn_agent(&cx, &conductor_tx)?);
//!
//!         // Return (potentially modified) request and component list
//!         Ok((init_req, components))
//!     },
//!     None,
//! );
//! ```
//!
//! The closure receives:
//! - `cx: &ConnectionTo` - Connection context for spawning components
//! - `conductor_tx: &mpsc::Sender<ConductorMessage>` - Channel for message routing
//! - `init_req: InitializeRequest` - The Initialize request from the editor
//!
//! And returns:
//! - Modified `InitializeRequest` to forward downstream
//! - `Vec<ConnectionTo>` of spawned components

use std::sync::Arc;

use agent_client_protocol::{
    Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
    Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
};
use agent_client_protocol::{
    Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest,
};
use agent_client_protocol::{
    HandleDispatchFrom,
    schema::{InitializeProxyRequest, v1::InitializeRequest},
    util::MatchDispatchFrom,
};
use agent_client_protocol::{Handled, schema::SuccessorMessage};
use futures::{
    SinkExt, StreamExt,
    channel::mpsc::{self},
};
use tracing::{debug, info};

/// The conductor manages the proxy chain lifecycle and message routing.
///
/// It maintains connections to all components in the chain and routes messages
/// bidirectionally between the editor, components, and agent.
#[derive(Debug)]
pub struct ConductorImpl<Host: ConductorHostRole> {
    host: Host,
    name: String,
    instantiator: Host::Instantiator,
    trace_writer: Option<crate::trace::TraceWriter>,
}

impl<Host: ConductorHostRole> ConductorImpl<Host> {
    /// Create a conductor with the given host role, name, and component instantiator.
    ///
    /// Tracing is disabled until one of the `trace_*` builder methods is called.
    pub fn new(host: Host, name: impl ToString, instantiator: Host::Instantiator) -> Self {
        ConductorImpl {
            name: name.to_string(),
            host,
            instantiator,
            trace_writer: None,
        }
    }
}

impl ConductorImpl<Agent> {
    /// Create a conductor in agent mode (the last component is an agent).
    pub fn new_agent(
        name: impl ToString,
        instantiator: impl InstantiateProxiesAndAgent + 'static,
    ) -> Self {
        ConductorImpl::new(Agent, name, Box::new(instantiator))
    }
}

impl ConductorImpl<Proxy> {
    /// Create a conductor in proxy mode (forwards to another conductor).
    pub fn new_proxy(name: impl ToString, instantiator: impl InstantiateProxies + 'static) -> Self {
        ConductorImpl::new(Proxy, name, Box::new(instantiator))
    }
}

impl<Host: ConductorHostRole> ConductorImpl<Host> {
    /// Enable trace logging to a custom destination.
    ///
    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
    #[must_use]
    pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
        self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
        self
    }

    /// Enable trace logging to a file path.
    ///
    /// Events will be written as newline-delimited JSON (`.jsons` format).
    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
    pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
        self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
        Ok(self)
    }

    /// Enable trace logging with an existing `TraceWriter`.
    #[must_use]
    pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
        self.trace_writer = Some(writer);
        self
    }

    /// Run the conductor with a transport.
    pub async fn run(
        self,
        transport: impl ConnectTo<Host>,
    ) -> Result<(), agent_client_protocol::Error> {
        let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);

        // Set up tracing if enabled - spawn writer task and get handle
        let trace_handle;
        let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
        if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
            trace_handle = Some(h);
            trace_future = Box::pin(f);
        } else {
            trace_handle = None;
            trace_future = Box::pin(std::future::ready(Ok(())));
        }

        let responder = ConductorResponder {
            conductor_rx,
            conductor_tx: conductor_tx.clone(),
            instantiator: Some(self.instantiator),
            proxies: Vec::default(),
            successor: Arc::new(agent_client_protocol::util::internal_error(
                "successor not initialized",
            )),
            trace_handle,
            host: self.host.clone(),
        };

        Builder::new_with(
            self.host.clone(),
            ConductorMessageHandler {
                conductor_tx,
                host: self.host.clone(),
            },
        )
        .name(self.name)
        .with_responder(responder)
        .with_spawned(|_cx| trace_future)
        .connect_to(transport)
        .await
    }

    async fn incoming_message_from_client(
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        conductor_tx
            .send(ConductorMessage::LeftToRight {
                target_component_index: 0,
                message,
            })
            .await
            .map_err(agent_client_protocol::util::internal_error)
    }

    async fn incoming_message_from_agent(
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        conductor_tx
            .send(ConductorMessage::RightToLeft {
                source_component_index: SourceComponentIndex::Successor,
                message,
            })
            .await
            .map_err(agent_client_protocol::util::internal_error)
    }
}

impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
    async fn connect_to(
        self,
        client: impl ConnectTo<Host>,
    ) -> Result<(), agent_client_protocol::Error> {
        self.run(client).await
    }
}

struct ConductorMessageHandler<Host: ConductorHostRole> {
    conductor_tx: mpsc::Sender<ConductorMessage>,
    host: Host,
}

impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
    for ConductorMessageHandler<Host>
{
    async fn handle_dispatch_from(
        &mut self,
        message: Dispatch,
        connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
    ) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
        self.host
            .handle_dispatch(message, connection, &mut self.conductor_tx)
            .await
    }

    fn describe_chain(&self) -> impl std::fmt::Debug {
        "ConductorMessageHandler"
    }
}

/// The conductor's central actor: it receives every [`ConductorMessage`] sent over
/// `conductor_tx` and routes it to the right component.
///
/// It owns the connections to all components in the chain and routes messages
/// bidirectionally between the editor, components, and agent.
pub struct ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    /// Receives the messages processed by the central routing loop.
    conductor_rx: mpsc::Receiver<ConductorMessage>,

    /// Sender for the same channel, cloned into each proxy connection.
    conductor_tx: mpsc::Sender<ConductorMessage>,

    /// The instantiator for lazy initialization.
    /// Set to None after components are instantiated.
    instantiator: Option<Host::Instantiator>,

    /// The chain of proxies before the agent (if any).
    ///
    /// Populated lazily when the first Initialize request is received.
    proxies: Vec<ConnectionTo<Proxy>>,

    /// If the conductor is operating in agent mode, this will direct messages to the agent.
    /// If the conductor is operating in proxy mode, this will direct messages to the successor.
    /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
    successor: Arc<dyn ConductorSuccessor<Host>>,

    /// Optional trace handle for sequence diagram visualization.
    trace_handle: Option<crate::trace::TraceHandle>,

    /// The host role, which determines whether the conductor acts as an agent or as a proxy.
    host: Host,
}

impl<Host> std::fmt::Debug for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConductorResponder")
            .field("conductor_rx", &self.conductor_rx)
            .field("conductor_tx", &self.conductor_tx)
            .field("proxies", &self.proxies)
            .field("trace_handle", &self.trace_handle)
            .field("host", &self.host)
            .finish_non_exhaustive()
    }
}

impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    async fn run_with_connection_to(
        mut self,
        connection: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        // Components are now spawned lazily in forward_initialize_request
        // when the first Initialize request is received.

        // This is the "central actor" of the conductor. Most other things forward messages
        // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
        while let Some(message) = self.conductor_rx.next().await {
            self.handle_conductor_message(connection.clone(), message)
                .await?;
        }
        Ok(())
    }
}

impl<Host> ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    /// Central message handling logic for the conductor.
    /// The conductor routes all [`ConductorMessage`] messages through to this function.
    /// Each message corresponds to a request or notification from one component to another.
    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
    /// Note that *responses to requests* are sent *directly* without going through this loop.
    ///
    /// The names we use are:
    ///
    /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
    /// * Then there is a sequence of *components* consisting of:
    ///     * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
    ///     * And finally the *agent*, which is the final component in the chain and handles the actual work.
    ///
    /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
    ///
    /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
    /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
    ///   through a stdio server that runs on localhost and bridges messages.
    async fn handle_conductor_message(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        message: ConductorMessage,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(?message, "handle_conductor_message");

        match message {
            ConductorMessage::LeftToRight {
                target_component_index,
                message,
            } => {
                // Tracing happens inside forward_client_to_agent_message, after initialization,
                // so that component_name() has access to the populated proxies list.
                self.forward_client_to_agent_message(target_component_index, message, client)
                    .await
            }

            ConductorMessage::RightToLeft {
                source_component_index,
                message,
            } => {
                tracing::debug!(
                    ?source_component_index,
                    message_method = ?message.method(),
                    "Conductor: AgentToClient received"
                );
                self.send_message_to_predecessor_of(client, source_component_index, message)
            }
        }
    }

    /// Send a message (request or notification) to the predecessor of the given component.
    ///
    /// This is a bit subtle because the relationship of the conductor
    /// is different depending on who will be receiving the message:
    /// * If the message is going to the conductor's client, then no changes
    ///   are needed, as the conductor is sending an agent-to-client message and
    ///   the conductor is acting as the agent.
    /// * If the message is going to a proxy component, then we have to wrap
    ///   it in a "from successor" wrapper, because the conductor is the
    ///   proxy's client.
    fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        source_component_index: SourceComponentIndex,
        message: Dispatch<Req, N>,
    ) -> Result<(), agent_client_protocol::Error>
    where
        Req::Response: Send,
    {
        let source_component_index = match source_component_index {
            SourceComponentIndex::Successor => self.proxies.len(),
            SourceComponentIndex::Proxy(index) => index,
        };

        match message {
            Dispatch::Request(request, responder) => self
                .send_request_to_predecessor_of(client, source_component_index, request)
                .forward_response_to(responder),
            Dispatch::Notification(notification) => {
                // `$/cancel_request` is connection-scoped: its `requestId` was
                // allocated on the connection the notification arrived over
                // and means nothing on the predecessor's connection. The SDK
                // already propagates the cancellation hop by hop through the
                // `forward_response_to` calls above, so drop the raw
                // notification instead of tunneling a meaningless ID.
                #[cfg(feature = "unstable_cancel_request")]
                if agent_client_protocol::is_cancel_request_notification(&notification) {
                    tracing::debug!(
                        "not forwarding hop-scoped `$/cancel_request` notification to predecessor"
                    );
                    return Ok(());
                }
                self.send_notification_to_predecessor_of(
                    client,
                    source_component_index,
                    notification,
                )
            }
            Dispatch::Response(result, router) => router.respond_with_result(result),
        }
    }

    fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
        &mut self,
        client_connection: ConnectionTo<Host::Counterpart>,
        source_component_index: usize,
        request: Req,
    ) -> SentRequest<Req::Response> {
        if source_component_index == 0 {
            client_connection.send_request_to(Client, request)
        } else {
            self.proxies[source_component_index - 1].send_request(SuccessorMessage {
                message: request,
                meta: None,
            })
        }
    }

    /// Send a notification to the predecessor of the given component.
    ///
    /// This is a bit subtle because the relationship of the conductor
    /// is different depending on who will be receiving the message:
    /// * If the notification is going to the conductor's client, then no changes
    ///   are needed, as the conductor is sending an agent-to-client message and
    ///   the conductor is acting as the agent.
    /// * If the notification is going to a proxy component, then we have to wrap
    ///   it in a "from successor" wrapper, because the conductor is the
    ///   proxy's client.
    fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        source_component_index: usize,
        notification: N,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(
            source_component_index,
            proxies_len = self.proxies.len(),
            "send_notification_to_predecessor_of"
        );
        if source_component_index == 0 {
            tracing::debug!("Sending notification directly to client");
            client.send_notification_to(Client, notification)
        } else {
            tracing::debug!(
                target_proxy = source_component_index - 1,
                "Sending notification wrapped as SuccessorMessage to proxy"
            );
            self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
                message: notification,
                meta: None,
            })
        }
    }

    /// Send a message (request or notification) from 'left to right'.
    /// Left-to-right means from the client or an intermediate proxy to the component
    /// at `target_component_index` (could be a proxy or the agent).
    /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
    async fn forward_client_to_agent_message(
        &mut self,
        target_component_index: usize,
        message: Dispatch,
        client: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::trace!(
            target_component_index,
            ?message,
            "forward_client_to_agent_message"
        );

        // Ensure components are initialized before processing any message.
        let message = self.ensure_initialized(client.clone(), message).await?;

        // If the target is one of our proxies, forward the message to it.
        if target_component_index < self.proxies.len() {
            self.forward_message_from_client_to_proxy(target_component_index, message)
                .await
        } else {
            // Otherwise the target sits just past the end of the proxy chain:
            // the agent in agent mode, or the conductor's own successor
            // (via the client connection) in proxy mode.
            assert_eq!(target_component_index, self.proxies.len());

            debug!(
                target_component_index,
                proxies_count = self.proxies.len(),
                "Proxy mode: forwarding successor message to conductor's successor"
            );
            let successor = self.successor.clone();
            successor.send_message(message, client, self).await
        }
    }

    /// Ensures components are initialized before processing messages.
    ///
    /// If components haven't been initialized yet, this expects the first message
    /// to be an `initialize` request and uses it to spawn the component chain.
    ///
    /// Returns:
    /// - `Ok(message)` - Components are initialized; continue processing this
    ///   (possibly modified) message
    /// - `Err(_)` - Initialization failed
    async fn ensure_initialized(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        message: Dispatch,
    ) -> Result<Dispatch, Error> {
        // Already initialized - pass through
        let Some(instantiator) = self.instantiator.take() else {
            return Ok(message);
        };

        let host = self.host.clone();
        let message = host.initialize(message, client, instantiator, self).await?;
        Ok(message)
    }

    /// Wrap a proxy component with tracing if tracing is enabled.
    ///
    /// Returns the component unchanged if tracing is disabled.
    fn trace_proxy(
        &self,
        proxy_index: ComponentIndex,
        successor_index: ComponentIndex,
        component: impl ConnectTo<Conductor>,
    ) -> DynConnectTo<Conductor> {
        match &self.trace_handle {
            Some(trace_handle) => {
                trace_handle.bridge_component(proxy_index, successor_index, component)
            }
            None => DynConnectTo::new(component),
        }
    }

    /// Spawn proxy components and add them to the proxies list.
    fn spawn_proxies(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        proxy_components: Vec<DynConnectTo<Conductor>>,
    ) -> Result<(), agent_client_protocol::Error> {
        assert!(self.proxies.is_empty());

        let num_proxies = proxy_components.len();
        info!(proxy_count = num_proxies, "spawn_proxies");

        // Special case: if there are no user-defined proxies
        // but tracing is enabled, we make a dummy proxy that just
        // passes through messages but which can trigger the
        // tracing events.
        if self.trace_handle.is_some() && num_proxies == 0 {
            self.connect_to_proxy(
                &client,
                0,
                ComponentIndex::Client,
                ComponentIndex::Agent,
                Proxy.builder(),
            )?;
        } else {
            // Spawn each proxy component
            for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
                debug!(component_index, "spawning proxy");

                self.connect_to_proxy(
                    &client,
                    component_index,
                    ComponentIndex::Proxy(component_index),
                    ComponentIndex::successor_of(component_index, num_proxies),
                    dyn_component,
                )?;
            }
        }

        info!(proxy_count = self.proxies.len(), "Proxies spawned");

        Ok(())
    }

    /// Create a connection to the proxy with index `component_index` implemented in `component`.
    ///
    /// If tracing is enabled, the proxy's index is `trace_proxy_index` and its successor is `trace_successor_index`.
    fn connect_to_proxy(
        &mut self,
        client: &ConnectionTo<Host::Counterpart>,
        component_index: usize,
        trace_proxy_index: ComponentIndex,
        trace_successor_index: ComponentIndex,
        component: impl ConnectTo<Conductor>,
    ) -> Result<(), Error> {
        let connection_builder = self.connection_to_proxy(component_index);
        let connect_component =
            self.trace_proxy(trace_proxy_index, trace_successor_index, component);
        let proxy_connection = client.spawn_connection(connection_builder, connect_component)?;
        self.proxies.push(proxy_connection);
        Ok(())
    }

    /// Create the conductor's connection to the proxy with index `component_index`.
    ///
    /// Outgoing messages received from the proxy are sent to `self.conductor_tx` as either
    /// left-to-right or right-to-left messages depending on whether they are wrapped
    /// in `SuccessorMessage`.
    fn connection_to_proxy(
        &mut self,
        component_index: usize,
    ) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
        type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
        let mut conductor_tx = self.conductor_tx.clone();
        Conductor
            .builder()
            .name(format!("conductor-to-component({component_index})"))
            // Intercept messages sent by the proxy.
            .on_receive_dispatch(
                async move |dispatch: Dispatch, _connection| {
                    MatchDispatch::new(dispatch)
                        .if_message(async |dispatch: SuccessorDispatch| {
                            //                         ------------------
                            // SuccessorMessages sent by the proxy go to its successor.
                            //
                            // Subtle point:
                            //
                            // `ConductorToProxy` has only a single peer, `Agent`. This means that we see
                            // "successor messages" in their "desugared form". So when we intercept an *outgoing*
                            // message that matches `SuccessorMessage`, it could be one of three things:
                            //
                            // - A request being sent by the proxy to its successor (hence going left->right)
                            // - A notification being sent by the proxy to its successor (hence going left->right)
                            // - A response to a request sent to the proxy *by* its successor. Here, the *request*
                            //   was going right->left, but the *response* (the message we are processing now)
                            //   is going left->right.
                            //
                            // So, in all cases, we forward as a left->right message.

                            conductor_tx
                                .send(ConductorMessage::LeftToRight {
                                    target_component_index: component_index + 1,
                                    message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
                                })
                                .await
                                .map_err(agent_client_protocol::util::internal_error)
                        })
                        .await
                        .otherwise(async |dispatch| {
                            // Other messages sent by the proxy go to its predecessor.
                            // As in the previous handler:
                            //
                            // Messages here are seen in their "desugared form", so we are seeing
                            // one of three things:
                            //
                            // - A request being sent by the proxy to its predecessor (hence going right->left)
                            // - A notification being sent by the proxy to its predecessor (hence going right->left)
                            // - A response to a request sent to the proxy *by* its predecessor. Here, the *request*
                            //   was going left->right, but the *response* (the message we are processing now)
                            //   is going right->left.
                            //
                            // So, in all cases, we forward as a right->left message.

                            let message = ConductorMessage::RightToLeft {
                                source_component_index: SourceComponentIndex::Proxy(
                                    component_index,
                                ),
                                message: dispatch,
                            };
                            conductor_tx
                                .send(message)
                                .await
                                .map_err(agent_client_protocol::util::internal_error)
                        })
                        .await
                },
                agent_client_protocol::on_receive_dispatch!(),
            )
    }

758    async fn forward_message_from_client_to_proxy(
759        &mut self,
760        target_component_index: usize,
761        message: Dispatch,
762    ) -> Result<(), agent_client_protocol::Error> {
763        tracing::debug!(?message, "forward_message_to_proxy");
764
765        MatchDispatch::new(message)
766            .if_request(async |_request: InitializeProxyRequest, responder| {
767                responder.respond_with_error(
768                    agent_client_protocol::Error::invalid_request()
769                        .data("initialize/proxy requests are only sent by the conductor"),
770                )
771            })
772            .await
773            .if_request(async |request: InitializeRequest, responder| {
774                // The pattern for `Initialize` messages is a bit subtle.
775                // Proxies receive incoming `Initialize` messages as if they
776                // were a client. The conductor (us) intercepts these and
777                // converts them to an `InitializeProxyRequest`.
778                //
779                // The proxy will then initialize itself and forward an `Initialize`
780                // request to its successor.
781                let sent = self.proxies[target_component_index]
782                    .send_request(InitializeProxyRequest::from(request));
783                // The request is rewritten, so `forward_response_to` cannot be
784                // used here; wire up cancellation forwarding explicitly to
785                // keep `initialize` cancellable like every other forwarded
786                // request.
787                #[cfg(feature = "unstable_cancel_request")]
788                let sent = sent.forward_cancellation_from(responder.cancellation());
789                sent.on_receiving_result(async move |result| {
790                    tracing::debug!(?result, "got initialize_proxy response from proxy");
791                    responder.respond_with_result(result)
792                })
793            })
794            .await
795            .otherwise(async |message| {
796                // Otherwise, just send the message along "as is".
797                self.proxies[target_component_index].send_proxied_message(message)
798            })
799            .await
800    }
801
802    /// Invoked when sending a message from the conductor to the agent that it manages.
803    /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
804    /// method when `Link = ConductorToClient` (i.e., the conductor is not itself
805    /// running as a proxy).
806    async fn forward_message_to_agent(
807        &mut self,
808        _client_connection: ConnectionTo<Host::Counterpart>,
809        message: Dispatch,
810        agent_connection: ConnectionTo<Agent>,
811    ) -> Result<(), Error> {
812        MatchDispatch::new(message)
813            .if_request(async |_request: InitializeProxyRequest, responder| {
814                responder.respond_with_error(
815                    agent_client_protocol::Error::invalid_request()
816                        .data("initialize/proxy requests are only sent by the conductor"),
817                )
818            })
819            .await
820            .otherwise(async |message| {
821                // Forward all other messages to the agent as-is.
822                agent_connection.send_proxied_message_to(Agent, message)
823            })
824            .await
825    }
826}
827
828/// Identifies a component in the conductor's chain for tracing purposes.
829///
830/// Used to track message sources and destinations through the proxy chain.
831#[derive(Debug, Clone, Copy)]
832pub enum ComponentIndex {
833    /// The client (editor) at the start of the chain.
834    Client,
835
836    /// A proxy component at the given index.
837    Proxy(usize),
838
839    /// The successor (agent in agent mode, outer conductor in proxy mode).
840    Agent,
841}
842
843impl ComponentIndex {
844    /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
845    #[must_use]
846    pub fn predecessor_of(proxy_index: usize) -> Self {
847        match proxy_index.checked_sub(1) {
848            Some(p_i) => ComponentIndex::Proxy(p_i),
849            None => ComponentIndex::Client,
850        }
851    }
852
853    /// Return the index for the successor of `proxy_index`, which might be `Agent`.
854    #[must_use]
855    pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
856        if proxy_index + 1 >= num_proxies {
857            ComponentIndex::Agent
858        } else {
859            ComponentIndex::Proxy(proxy_index + 1)
860        }
861    }
862}
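
// A minimal, self-contained sketch of how `predecessor_of` can be applied
// repeatedly to trace a right-to-left path back to the client. `Slot` and
// `path_to_client` are hypothetical names used only for illustration; they
// are not part of this module.

```rust
/// Hypothetical stand-in for `ComponentIndex` (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Slot {
    Client,
    Proxy(usize),
}

/// Same arithmetic as `ComponentIndex::predecessor_of`: proxy 0 is preceded
/// by the client, every other proxy by the proxy one index lower.
fn predecessor_of(proxy_index: usize) -> Slot {
    match proxy_index.checked_sub(1) {
        Some(i) => Slot::Proxy(i),
        None => Slot::Client,
    }
}

/// Walk from the proxy at `proxy_index` leftwards until the client is reached.
fn path_to_client(proxy_index: usize) -> Vec<Slot> {
    let mut path = vec![Slot::Proxy(proxy_index)];
    let mut current = proxy_index;
    loop {
        let previous = predecessor_of(current);
        path.push(previous);
        match previous {
            Slot::Proxy(i) => current = i,
            Slot::Client => return path,
        }
    }
}
```

// Using `checked_sub` keeps the "index 0 has no proxy before it" case explicit
// instead of relying on wrapping or signed arithmetic.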
863
864/// Identifies the source of an agent-to-client message.
865///
866/// This enum handles the fact that the conductor may receive messages from two different sources:
867/// 1. From one of its managed components (identified by index)
868/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
869#[derive(Debug, Clone, Copy)]
870pub enum SourceComponentIndex {
871    /// Message from a specific component at the given index in the managed chain.
872    Proxy(usize),
873
874    /// Message from the conductor's agent or successor.
875    Successor,
876}
877
878/// Trait for lazy proxy instantiation (proxy mode).
879///
880/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
881/// are proxies that forward to an outer conductor.
882pub trait InstantiateProxies: Send {
883    /// Instantiate proxy components based on the Initialize request.
884    ///
885    /// Returns proxy components typed as `DynConnectTo<Conductor>` since proxies
886    /// communicate with the conductor.
887    fn instantiate_proxies(
888        self: Box<Self>,
889        req: InitializeRequest,
890    ) -> futures::future::BoxFuture<
891        'static,
892        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
893    >;
894}
895
896/// Simple implementation: provide all proxy components unconditionally.
897///
898/// Requires `T: ConnectTo<Conductor>`.
899impl<T> InstantiateProxies for Vec<T>
900where
901    T: ConnectTo<Conductor> + 'static,
902{
903    fn instantiate_proxies(
904        self: Box<Self>,
905        req: InitializeRequest,
906    ) -> futures::future::BoxFuture<
907        'static,
908        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
909    > {
910        Box::pin(async move {
911            let components: Vec<DynConnectTo<Conductor>> =
912                (*self).into_iter().map(|c| DynConnectTo::new(c)).collect();
913            Ok((req, components))
914        })
915    }
916}
917
918/// Dynamic implementation: closure receives the Initialize request and returns proxies.
919impl<F, Fut> InstantiateProxies for F
920where
921    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
922    Fut: std::future::Future<
923            Output = Result<
924                (InitializeRequest, Vec<DynConnectTo<Conductor>>),
925                agent_client_protocol::Error,
926            >,
927        > + Send
928        + 'static,
929{
930    fn instantiate_proxies(
931        self: Box<Self>,
932        req: InitializeRequest,
933    ) -> futures::future::BoxFuture<
934        'static,
935        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
936    > {
937        Box::pin(async move { (*self)(req).await })
938    }
939}
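
// A simplified sketch of the pattern used by the two impls above: a trait that
// consumes `self: Box<Self>` can be implemented both for a fixed collection and
// for any suitable closure, so callers can store either behind one boxed trait
// object. Everything here is hypothetical and synchronous: `Instantiate`,
// `Fixed`, and `run` are illustrative names, and plain strings stand in for
// `InitializeRequest` and the proxy components.

```rust
/// Hypothetical, synchronous stand-in for `InstantiateProxies`.
trait Instantiate {
    fn instantiate(self: Box<Self>, request: String) -> Vec<String>;
}

/// Fixed list: ignores the request and hands back every component.
struct Fixed(Vec<String>);

impl Instantiate for Fixed {
    fn instantiate(self: Box<Self>, _request: String) -> Vec<String> {
        self.0
    }
}

/// Closure form: the components are chosen only after seeing the request.
impl<F> Instantiate for F
where
    F: FnOnce(String) -> Vec<String>,
{
    fn instantiate(self: Box<Self>, request: String) -> Vec<String> {
        // Move the closure out of the box and call it exactly once.
        (*self)(request)
    }
}

/// The caller holds a boxed trait object and does not care which form it got.
fn run(instantiator: Box<dyn Instantiate>, request: String) -> Vec<String> {
    instantiator.instantiate(request)
}
```

// Taking `self: Box<Self>` keeps the trait usable as `Box<dyn Instantiate>`
// while still allowing a `FnOnce` closure to be consumed.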
940
941/// Trait for lazy proxy and agent instantiation (agent mode).
942///
943/// Used by conductors in agent mode (`ConductorToClient`) where there are
944/// zero or more proxies followed by an agent component.
945pub trait InstantiateProxiesAndAgent: Send {
946    /// Instantiate proxy and agent components based on the Initialize request.
947    ///
948    /// Returns the (possibly modified) request, a vector of proxy components
949    /// (typed as `DynConnectTo<Conductor>`), and the agent component
950    /// (typed as `DynConnectTo<Client>`).
951    fn instantiate_proxies_and_agent(
952        self: Box<Self>,
953        req: InitializeRequest,
954    ) -> futures::future::BoxFuture<
955        'static,
956        Result<
957            (
958                InitializeRequest,
959                Vec<DynConnectTo<Conductor>>,
960                DynConnectTo<Client>,
961            ),
962            agent_client_protocol::Error,
963        >,
964    >;
965}
966
967/// Wrapper to convert a single agent component (no proxies) into `InstantiateProxiesAndAgent`.
968#[derive(Debug)]
969pub struct AgentOnly<A>(pub A);
970
971impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
972    fn instantiate_proxies_and_agent(
973        self: Box<Self>,
974        req: InitializeRequest,
975    ) -> futures::future::BoxFuture<
976        'static,
977        Result<
978            (
979                InitializeRequest,
980                Vec<DynConnectTo<Conductor>>,
981                DynConnectTo<Client>,
982            ),
983            agent_client_protocol::Error,
984        >,
985    > {
986        Box::pin(async move { Ok((req, Vec::new(), DynConnectTo::new(self.0))) })
987    }
988}
989
990/// Builder for creating proxies and agent components.
991///
992/// # Example
993/// ```ignore
994/// ProxiesAndAgent::new(ElizaAgent::new())
995///     .proxy(LoggingProxy::new())
996///     .proxy(AuthProxy::new())
997/// ```
998#[derive(Debug)]
999pub struct ProxiesAndAgent {
1000    proxies: Vec<DynConnectTo<Conductor>>,
1001    agent: DynConnectTo<Client>,
1002}
1003
1004impl ProxiesAndAgent {
1005    /// Create a new builder with the given agent component.
1006    pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
1007        Self {
1008            proxies: vec![],
1009            agent: DynConnectTo::new(agent),
1010        }
1011    }
1012
1013    /// Add a single proxy component.
1014    #[must_use]
1015    pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
1016        self.proxies.push(DynConnectTo::new(proxy));
1017        self
1018    }
1019
1020    /// Add multiple proxy components.
1021    #[must_use]
1022    pub fn proxies<P, I>(mut self, proxies: I) -> Self
1023    where
1024        P: ConnectTo<Conductor> + 'static,
1025        I: IntoIterator<Item = P>,
1026    {
1027        self.proxies
1028            .extend(proxies.into_iter().map(DynConnectTo::new));
1029        self
1030    }
1031}
1032
1033impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1034    fn instantiate_proxies_and_agent(
1035        self: Box<Self>,
1036        req: InitializeRequest,
1037    ) -> futures::future::BoxFuture<
1038        'static,
1039        Result<
1040            (
1041                InitializeRequest,
1042                Vec<DynConnectTo<Conductor>>,
1043                DynConnectTo<Client>,
1044            ),
1045            agent_client_protocol::Error,
1046        >,
1047    > {
1048        Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1049    }
1050}
1051
1052/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
1053impl<F, Fut> InstantiateProxiesAndAgent for F
1054where
1055    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1056    Fut: std::future::Future<
1057            Output = Result<
1058                (
1059                    InitializeRequest,
1060                    Vec<DynConnectTo<Conductor>>,
1061                    DynConnectTo<Client>,
1062                ),
1063                agent_client_protocol::Error,
1064            >,
1065        > + Send
1066        + 'static,
1067{
1068    fn instantiate_proxies_and_agent(
1069        self: Box<Self>,
1070        req: InitializeRequest,
1071    ) -> futures::future::BoxFuture<
1072        'static,
1073        Result<
1074            (
1075                InitializeRequest,
1076                Vec<DynConnectTo<Conductor>>,
1077                DynConnectTo<Client>,
1078            ),
1079            agent_client_protocol::Error,
1080        >,
1081    > {
1082        Box::pin(async move { (*self)(req).await })
1083    }
1084}
1085
1086/// Messages sent to the conductor's main event loop for routing.
1087///
1088/// These messages enable the conductor to route communication between:
1089/// - The editor and the first component
1090/// - Components and their successors in the chain
1091/// - Components and their clients (editor or predecessor)
1092///
1093/// All spawned tasks send messages via this enum through a shared channel,
1094/// allowing centralized routing logic in the `serve()` loop.
1095#[derive(Debug)]
1096pub enum ConductorMessage {
1097    /// If this message is a request or notification, then it is going "left-to-right"
1098    /// (e.g., a component making a request of its successor).
1099    ///
1100    /// If this message is a response, then it is going right-to-left
1101    /// (i.e., the successor answering a request made by its predecessor).
1102    LeftToRight {
1103        target_component_index: usize,
1104        message: Dispatch,
1105    },
1106
1107    /// If this message is a request or notification, then it is going "right-to-left"
1108    /// (e.g., a component making a request of its predecessor).
1109    ///
1110    /// If this message is a response, then it is going "left-to-right"
1111    /// (i.e., the predecessor answering a request made by its successor).
1112    RightToLeft {
1113        source_component_index: SourceComponentIndex,
1114        message: Dispatch,
1115    },
1116}
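
// A rough sketch of the "many senders, one routing loop" shape described above.
// It swaps in the blocking `std::sync::mpsc` channel for the async channel this
// module uses, and `Routed`, `route_all`, and `demo` are hypothetical names with
// plain strings in place of `Dispatch`; the actual routing lives in `serve()`.

```rust
use std::sync::mpsc;

/// Hypothetical, simplified stand-in for `ConductorMessage`.
#[derive(Debug)]
enum Routed {
    LeftToRight { target_component_index: usize, message: String },
    RightToLeft { source_component_index: usize, message: String },
}

/// Drain the shared channel and describe how each message would be routed.
/// The loop ends once every sender has been dropped.
fn route_all(rx: mpsc::Receiver<Routed>) -> Vec<String> {
    rx.into_iter()
        .map(|routed| match routed {
            Routed::LeftToRight { target_component_index, message } => {
                format!("{message} -> component {target_component_index}")
            }
            Routed::RightToLeft { source_component_index, message } => {
                format!("{message} <- component {source_component_index}")
            }
        })
        .collect()
}

/// Two producers share one channel; a single loop makes every routing decision.
fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let tx_for_proxy = tx.clone();
    tx.send(Routed::LeftToRight {
        target_component_index: 0,
        message: "ping".to_string(),
    })
    .unwrap();
    tx_for_proxy
        .send(Routed::RightToLeft {
            source_component_index: 1,
            message: "pong".to_string(),
        })
        .unwrap();
    // Drop all senders so the receiving iterator terminates.
    drop(tx);
    drop(tx_for_proxy);
    route_all(rx)
}
```

// Funnelling every message through one enum keeps the routing rules in a single
// place, at the cost of one extra channel hop per message.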
1117
1118/// Trait implemented for the two links the conductor can use:
1119///
1120/// * ConductorToClient -- conductor is acting as an agent, so when its last proxy sends to its successor, the conductor sends that message to its agent component
1121/// * ConductorToConductor -- conductor is acting as a proxy, so when its last proxy sends to its successor, the (inner) conductor sends that message to its successor, via the outer conductor
1122pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
1123    /// The type used to instantiate components for this link type.
1124    type Instantiator: Send;
1125
1126    /// Handle initialization: parse the init request, instantiate components, and spawn them.
1127    ///
1128    /// Takes ownership of the instantiator and returns the (possibly modified) init request
1129    /// wrapped in a Dispatch for forwarding.
1130    fn initialize(
1131        &self,
1132        message: Dispatch,
1133        connection: ConnectionTo<Self::Counterpart>,
1134        instantiator: Self::Instantiator,
1135        responder: &mut ConductorResponder<Self>,
1136    ) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
1137
1138    /// Handle an incoming message from the client or conductor, depending on `Self`.
1139    fn handle_dispatch(
1140        &self,
1141        message: Dispatch,
1142        connection: ConnectionTo<Self::Counterpart>,
1143        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1144    ) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
1145}
1146
1147/// Conductor acting as an agent
1148impl ConductorHostRole for Agent {
1149    type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1150
1151    async fn initialize(
1152        &self,
1153        message: Dispatch,
1154        client_connection: ConnectionTo<Client>,
1155        instantiator: Self::Instantiator,
1156        responder: &mut ConductorResponder<Self>,
1157    ) -> Result<Dispatch, agent_client_protocol::Error> {
1158        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1159
1160        // Not yet initialized - expect an initialize request.
1161        // Error if we get anything else.
1162        let Dispatch::Request(request, init_responder) = message else {
1163            message.respond_with_error(invalid_request(), client_connection.clone())?;
1164            return Err(invalid_request());
1165        };
1166        if !InitializeRequest::matches_method(request.method()) {
1167            init_responder.respond_with_error(invalid_request())?;
1168            return Err(invalid_request());
1169        }
1170
1171        let init_request =
1172            match InitializeRequest::parse_message(request.method(), request.params()) {
1173                Ok(r) => r,
1174                Err(error) => {
1175                    init_responder.respond_with_error(error)?;
1176                    return Err(invalid_request());
1177                }
1178            };
1179
1180        // Instantiate proxies and agent
1181        let (modified_req, proxy_components, agent_component) = instantiator
1182            .instantiate_proxies_and_agent(init_request)
1183            .await?;
1184
1185        // Spawn the agent component
1186        debug!(?agent_component, "spawning agent");
1187
1188        let connection_to_agent = client_connection.spawn_connection(
1189            Client
1190                .builder()
1191                .name("conductor-to-agent")
1192                // Intercept agent-to-client messages from the agent.
1193                .on_receive_dispatch(
1194                    {
1195                        let mut conductor_tx = responder.conductor_tx.clone();
1196                        async move |dispatch: Dispatch, _cx| {
1197                            conductor_tx
1198                                .send(ConductorMessage::RightToLeft {
1199                                    source_component_index: SourceComponentIndex::Successor,
1200                                    message: dispatch,
1201                                })
1202                                .await
1203                                .map_err(agent_client_protocol::util::internal_error)
1204                        }
1205                    },
1206                    agent_client_protocol::on_receive_dispatch!(),
1207                ),
1208            agent_component,
1209        )?;
1210        responder.successor = Arc::new(connection_to_agent);
1211
1212        // Spawn the proxy components
1213        responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1214
1215        Ok(Dispatch::Request(
1216            modified_req.to_untyped_message()?,
1217            init_responder,
1218        ))
1219    }
1220
1221    async fn handle_dispatch(
1222        &self,
1223        message: Dispatch,
1224        client_connection: ConnectionTo<Client>,
1225        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1226    ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1227        tracing::debug!(
1228            method = ?message.method(),
1229            "ConductorToClient::handle_dispatch"
1230        );
1231        MatchDispatchFrom::new(message, &client_connection)
1232            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1233            .if_message_from(Client, async move |message: Dispatch| {
1234                tracing::debug!(
1235                    method = ?message.method(),
1236                    "ConductorToClient::handle_dispatch - matched Client"
1237                );
1238                ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
1239            })
1240            .await
1241            .done()
1242    }
1243}
1244
1245/// Conductor acting as a proxy
1246impl ConductorHostRole for Proxy {
1247    type Instantiator = Box<dyn InstantiateProxies>;
1248
1249    async fn initialize(
1250        &self,
1251        message: Dispatch,
1252        client_connection: ConnectionTo<Conductor>,
1253        instantiator: Self::Instantiator,
1254        responder: &mut ConductorResponder<Self>,
1255    ) -> Result<Dispatch, agent_client_protocol::Error> {
1256        let invalid_request = || Error::invalid_request().data("expected `_proxy/initialize` request");
1257
1258        // Not yet initialized - expect an InitializeProxy request.
1259        // Error if we get anything else.
1260        let Dispatch::Request(request, init_responder) = message else {
1261            message.respond_with_error(invalid_request(), client_connection.clone())?;
1262            return Err(invalid_request());
1263        };
1264        if !InitializeProxyRequest::matches_method(request.method()) {
1265            init_responder.respond_with_error(invalid_request())?;
1266            return Err(invalid_request());
1267        }
1268
1269        let InitializeProxyRequest { initialize } =
1270            match InitializeProxyRequest::parse_message(request.method(), request.params()) {
1271                Ok(r) => r,
1272                Err(error) => {
1273                    init_responder.respond_with_error(error)?;
1274                    return Err(invalid_request());
1275                }
1276            };
1277
1278        tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1279
1280        // Instantiate proxies (no agent in proxy mode)
1281        let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1282
1283        // In proxy mode, our successor is the outer conductor (via our client connection)
1284        responder.successor = Arc::new(GrandSuccessor);
1285
1286        // Spawn the proxy components
1287        responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1288
1289        Ok(Dispatch::Request(
1290            modified_req.to_untyped_message()?,
1291            init_responder,
1292        ))
1293    }
1294
1295    async fn handle_dispatch(
1296        &self,
1297        message: Dispatch,
1298        client_connection: ConnectionTo<Conductor>,
1299        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1300    ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1301        tracing::debug!(
1302            method = ?message.method(),
1303            ?message,
1304            "ConductorToConductor::handle_dispatch"
1305        );
1306        MatchDispatchFrom::new(message, &client_connection)
1307            .if_message_from(Agent, {
1308                // Messages from our successor arrive already unwrapped
1309                // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1310                async |message: Dispatch| {
1311                    tracing::debug!(
1312                        method = ?message.method(),
1313                        "ConductorToConductor::handle_dispatch - matched Agent"
1314                    );
1315                    let mut conductor_tx = conductor_tx.clone();
1316                    ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
1317                        .await
1318                }
1319            })
1320            .await
1321            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1322            .if_message_from(Client, async |message: Dispatch| {
1323                tracing::debug!(
1324                    method = ?message.method(),
1325                    "ConductorToConductor::handle_dispatch - matched Client"
1326                );
1327                let mut conductor_tx = conductor_tx.clone();
1328                ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
1329                    .await
1330            })
1331            .await
1332            .done()
1333    }
1334}
1335
1336pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {
1337    /// Send a message to the successor.
1338    fn send_message<'a>(
1339        &self,
1340        message: Dispatch,
1341        connection_to_conductor: ConnectionTo<Host::Counterpart>,
1342        responder: &'a mut ConductorResponder<Host>,
1343    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
1344}
1345
1346impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {
1347    fn send_message<'a>(
1348        &self,
1349        #[expect(unused_variables)] message: Dispatch,
1350        #[expect(unused_variables)] connection_to_conductor: ConnectionTo<Host::Counterpart>,
1351        #[expect(unused_variables)] responder: &'a mut ConductorResponder<Host>,
1352    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1353        let error = self.clone();
1354        Box::pin(std::future::ready(Err(error)))
1355    }
1356}
1357
1358/// A dummy type handling messages sent to the conductor's
1359/// successor when it is acting as a proxy.
1360struct GrandSuccessor;
1361
1362/// When the conductor is acting as a proxy, messages sent by
1363/// the last proxy go to the conductor's successor.
1364///
1365/// ```text
1366/// client --> Conductor -----------------------------> GrandSuccessor
1367///            |                                  |
1368///            +-> Proxy[0] -> ... -> Proxy[n-1] -+
1369/// ```
1370impl ConductorSuccessor<Proxy> for GrandSuccessor {
1371    fn send_message<'a>(
1372        &self,
1373        message: Dispatch,
1374        connection: ConnectionTo<Conductor>,
1375        _responder: &'a mut ConductorResponder<Proxy>,
1376    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1377        Box::pin(async move {
1378            debug!("Proxy mode: forwarding successor message to conductor's successor");
1379            connection.send_proxied_message_to(Agent, message)
1380        })
1381    }
1382}
1383
1384/// When the conductor is acting as an agent, messages sent by
1385/// the last proxy to its successor go to the internal agent
1386/// (`self`).
1387impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
1388    fn send_message<'a>(
1389        &self,
1390        message: Dispatch,
1391        connection: ConnectionTo<Client>,
1392        responder: &'a mut ConductorResponder<Agent>,
1393    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1394        let connection_to_agent = self.clone();
1395        Box::pin(async move {
1396            debug!("Agent mode: forwarding successor message to the conductor's agent");
1397            responder
1398                .forward_message_to_agent(connection, message, connection_to_agent)
1399                .await
1400        })
1401    }
1402}
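
// A small sketch of the successor abstraction above: the responder holds its
// successor behind a shared trait object, an error value can act as a successor
// that rejects every send, and initialization swaps in a working one. The
// sketch is synchronous and every name in it (`Successor`, `Forwarding`,
// `Responder`, `new_responder`) is hypothetical. That the error successor is
// the one installed before initialization is an assumption, not something this
// section confirms.

```rust
use std::sync::Arc;

/// Hypothetical, synchronous stand-in for `ConductorSuccessor`.
trait Successor {
    fn send_message(&self, message: &str) -> Result<String, String>;
}

/// An error value used as a successor: every send fails with a copy of it.
impl Successor for String {
    fn send_message(&self, _message: &str) -> Result<String, String> {
        Err(self.clone())
    }
}

/// Stand-in for a working successor (an agent, or an outer conductor).
struct Forwarding {
    name: &'static str,
}

impl Successor for Forwarding {
    fn send_message(&self, message: &str) -> Result<String, String> {
        Ok(format!("{message} -> {}", self.name))
    }
}

/// Stand-in for `ConductorResponder`, reduced to the one field of interest.
struct Responder {
    successor: Arc<dyn Successor>,
}

/// Assumed starting state: sends fail until a real successor is installed.
fn new_responder() -> Responder {
    Responder {
        successor: Arc::new(String::from("successor not initialized")),
    }
}
```

// Replacing the field, as `initialize` does with `responder.successor = ...`,
// changes where later sends go without touching the code that calls
// `send_message`.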