Skip to main content

agent_client_protocol_conductor/
conductor.rs

1//! # Conductor: ACP Proxy Chain Orchestrator
2//!
3//! This module implements the Conductor conductor, which orchestrates a chain of
4//! proxy components that sit between an editor and an agent, transforming the
5//! Agent-Client Protocol (ACP) stream bidirectionally.
6//!
7//! ## Architecture Overview
8//!
9//! The conductor builds and manages a chain of components:
10//!
11//! ```text
12//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
13//! ```
14//!
15//! Each component receives ACP messages, can transform them, and forwards them
16//! to the next component in the chain. The conductor:
17//!
18//! 1. Spawns each component as a subprocess
19//! 2. Establishes bidirectional JSON-RPC connections with each component
20//! 3. Routes messages between editor, components, and agent
21//! 4. Distinguishes proxy vs agent components via distinct request types
22//!
23//! ## Recursive Chain Building
24//!
25//! The chain is built recursively through the `_proxy/successor/*` protocol:
26//!
27//! 1. Editor connects to Component 0 via the conductor
28//! 2. When Component 0 wants to communicate with its successor, it sends
29//!    requests/notifications with method prefix `_proxy/successor/`
30//! 3. The conductor intercepts these messages, strips the prefix, and forwards
31//!    to Component 1
32//! 4. Component 1 does the same for Component 2, and so on
33//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
34//!
35//! This allows each component to be written as if it's talking to a single successor,
36//! without knowing about the full chain.
37//!
38//! ## Proxy vs Agent Initialization
39//!
40//! Components discover whether they're a proxy or agent via the initialization request they receive:
41//!
42//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
43//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
44//!
45//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
46//! and `InitializeRequest` only to the final agent component. This allows proxies to
47//! know they should forward messages to a successor, while agents know they are the
48//! terminal component
49//!
50//! ## Message Routing
51//!
52//! The conductor runs an event loop processing messages from:
53//!
54//! - **Editor to first component**: Standard ACP messages
55//! - **Component to successor**: Via `_proxy/successor/*` prefix
56//! - **Component responses**: Via futures channels back to requesters
57//!
58//! The message flow ensures bidirectional communication while maintaining the
59//! abstraction that each component only knows about its immediate successor.
60//!
61//! ## Lazy Component Initialization
62//!
63//! Components are instantiated lazily when the first `initialize` request is received
64//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
65//!
66//! ### Simple Usage
67//!
68//! Pass a Vec of components that implement `Component`:
69//!
70//! ```ignore
71//! let conductor = Conductor::new(
72//!     "my-conductor",
73//!     vec![proxy1, proxy2, agent],
74//!     None,
75//! );
76//! ```
77//!
78//! All components are spawned in order when the editor sends the first `initialize` request.
79//!
80//! ### Dynamic Component Selection
81//!
82//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
83//!
84//! ```ignore
85//! let conductor = Conductor::new(
86//!     "my-conductor",
87//!     |cx, conductor_tx, init_req| async move {
88//!         // Examine capabilities
89//!         let needs_auth = has_auth_capability(&init_req);
90//!
91//!         let mut components = Vec::new();
92//!         if needs_auth {
93//!             components.push(spawn_auth_proxy(&cx, &conductor_tx)?);
94//!         }
95//!         components.push(spawn_agent(&cx, &conductor_tx)?);
96//!
97//!         // Return (potentially modified) request and component list
98//!         Ok((init_req, components))
99//!     },
100//!     None,
101//! );
102//! ```
103//!
104//! The closure receives:
105//! - `cx: &ConnectionTo` - Connection context for spawning components
106//! - `conductor_tx: &mpsc::Sender<ConductorMessage>` - Channel for message routing
107//! - `init_req: InitializeRequest` - The Initialize request from the editor
108//!
109//! And returns:
110//! - Modified `InitializeRequest` to forward downstream
111//! - `Vec<ConnectionTo>` of spawned components
112
113use std::sync::Arc;
114
115use agent_client_protocol::{
116    Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
117    Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
118};
119use agent_client_protocol::{
120    Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest,
121};
122use agent_client_protocol::{
123    HandleDispatchFrom,
124    schema::{InitializeProxyRequest, InitializeRequest},
125    util::MatchDispatchFrom,
126};
127use agent_client_protocol::{Handled, schema::SuccessorMessage};
128use futures::{
129    SinkExt, StreamExt,
130    channel::mpsc::{self},
131};
132use tracing::{debug, info};
133
134/// The conductor manages the proxy chain lifecycle and message routing.
135///
136/// It maintains connections to all components in the chain and routes messages
137/// bidirectionally between the editor, components, and agent.
138///
139#[derive(Debug)]
140pub struct ConductorImpl<Host: ConductorHostRole> {
141    host: Host,
142    name: String,
143    instantiator: Host::Instantiator,
144    trace_writer: Option<crate::trace::TraceWriter>,
145}
146
147impl<Host: ConductorHostRole> ConductorImpl<Host> {
148    pub fn new(host: Host, name: impl ToString, instantiator: Host::Instantiator) -> Self {
149        ConductorImpl {
150            name: name.to_string(),
151            host,
152            instantiator,
153            trace_writer: None,
154        }
155    }
156}
157
158impl ConductorImpl<Agent> {
159    /// Create a conductor in agent mode (the last component is an agent).
160    pub fn new_agent(
161        name: impl ToString,
162        instantiator: impl InstantiateProxiesAndAgent + 'static,
163    ) -> Self {
164        ConductorImpl::new(Agent, name, Box::new(instantiator))
165    }
166}
167
168impl ConductorImpl<Proxy> {
169    /// Create a conductor in proxy mode (forwards to another conductor).
170    pub fn new_proxy(name: impl ToString, instantiator: impl InstantiateProxies + 'static) -> Self {
171        ConductorImpl::new(Proxy, name, Box::new(instantiator))
172    }
173}
174
175impl<Host: ConductorHostRole> ConductorImpl<Host> {
176    /// Enable trace logging to a custom destination.
177    ///
178    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
179    #[must_use]
180    pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
181        self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
182        self
183    }
184
185    /// Enable trace logging to a file path.
186    ///
187    /// Events will be written as newline-delimited JSON (`.jsons` format).
188    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
189    pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
190        self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
191        Ok(self)
192    }
193
194    /// Enable trace logging with an existing TraceWriter.
195    #[must_use]
196    pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
197        self.trace_writer = Some(writer);
198        self
199    }
200
201    /// Run the conductor with a transport.
202    pub async fn run(
203        self,
204        transport: impl ConnectTo<Host>,
205    ) -> Result<(), agent_client_protocol::Error> {
206        let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);
207
208        // Set up tracing if enabled - spawn writer task and get handle
209        let trace_handle;
210        let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
211        if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
212            trace_handle = Some(h);
213            trace_future = Box::pin(f);
214        } else {
215            trace_handle = None;
216            trace_future = Box::pin(std::future::ready(Ok(())));
217        }
218
219        let responder = ConductorResponder {
220            conductor_rx,
221            conductor_tx: conductor_tx.clone(),
222            instantiator: Some(self.instantiator),
223            proxies: Vec::default(),
224            successor: Arc::new(agent_client_protocol::util::internal_error(
225                "successor not initialized",
226            )),
227            trace_handle,
228            host: self.host.clone(),
229        };
230
231        Builder::new_with(
232            self.host.clone(),
233            ConductorMessageHandler {
234                conductor_tx,
235                host: self.host.clone(),
236            },
237        )
238        .name(self.name)
239        .with_responder(responder)
240        .with_spawned(|_cx| trace_future)
241        .connect_to(transport)
242        .await
243    }
244
245    async fn incoming_message_from_client(
246        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
247        message: Dispatch,
248    ) -> Result<(), agent_client_protocol::Error> {
249        conductor_tx
250            .send(ConductorMessage::LeftToRight {
251                target_component_index: 0,
252                message,
253            })
254            .await
255            .map_err(agent_client_protocol::util::internal_error)
256    }
257
258    async fn incoming_message_from_agent(
259        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
260        message: Dispatch,
261    ) -> Result<(), agent_client_protocol::Error> {
262        conductor_tx
263            .send(ConductorMessage::RightToLeft {
264                source_component_index: SourceComponentIndex::Successor,
265                message,
266            })
267            .await
268            .map_err(agent_client_protocol::util::internal_error)
269    }
270}
271
272impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
273    async fn connect_to(
274        self,
275        client: impl ConnectTo<Host>,
276    ) -> Result<(), agent_client_protocol::Error> {
277        self.run(client).await
278    }
279}
280
281struct ConductorMessageHandler<Host: ConductorHostRole> {
282    conductor_tx: mpsc::Sender<ConductorMessage>,
283    host: Host,
284}
285
286impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
287    for ConductorMessageHandler<Host>
288{
289    async fn handle_dispatch_from(
290        &mut self,
291        message: Dispatch,
292        connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
293    ) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
294        self.host
295            .handle_dispatch(message, connection, &mut self.conductor_tx)
296            .await
297    }
298
299    fn describe_chain(&self) -> impl std::fmt::Debug {
300        "ConductorMessageHandler"
301    }
302}
303
304/// The conductor manages the proxy chain lifecycle and message routing.
305///
306/// It maintains connections to all components in the chain and routes messages
307/// bidirectionally between the editor, components, and agent.
308///
309pub struct ConductorResponder<Host>
310where
311    Host: ConductorHostRole,
312{
313    conductor_rx: mpsc::Receiver<ConductorMessage>,
314
315    conductor_tx: mpsc::Sender<ConductorMessage>,
316
317    /// The instantiator for lazy initialization.
318    /// Set to None after components are instantiated.
319    instantiator: Option<Host::Instantiator>,
320
321    /// The chain of proxies before the agent (if any).
322    ///
323    /// Populated lazily when the first Initialize request is received.
324    proxies: Vec<ConnectionTo<Proxy>>,
325
326    /// If the conductor is operating in agent mode, this will direct messages to the agent.
327    /// If the conductor is operating in proxy mode, this will direct messages to the successor.
328    /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
329    successor: Arc<dyn ConductorSuccessor<Host>>,
330
331    /// Optional trace handle for sequence diagram visualization.
332    trace_handle: Option<crate::trace::TraceHandle>,
333
334    /// Defines what sort of link we have
335    host: Host,
336}
337
338impl<Host> std::fmt::Debug for ConductorResponder<Host>
339where
340    Host: ConductorHostRole,
341{
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        f.debug_struct("ConductorResponder")
344            .field("conductor_rx", &self.conductor_rx)
345            .field("conductor_tx", &self.conductor_tx)
346            .field("proxies", &self.proxies)
347            .field("trace_handle", &self.trace_handle)
348            .field("host", &self.host)
349            .finish_non_exhaustive()
350    }
351}
352
353impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
354where
355    Host: ConductorHostRole,
356{
357    async fn run_with_connection_to(
358        mut self,
359        connection: ConnectionTo<Host::Counterpart>,
360    ) -> Result<(), agent_client_protocol::Error> {
361        // Components are now spawned lazily in forward_initialize_request
362        // when the first Initialize request is received.
363
364        // This is the "central actor" of the conductor. Most other things forward messages
365        // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
366        while let Some(message) = self.conductor_rx.next().await {
367            self.handle_conductor_message(connection.clone(), message)
368                .await?;
369        }
370        Ok(())
371    }
372}
373
374impl<Host> ConductorResponder<Host>
375where
376    Host: ConductorHostRole,
377{
378    /// Recursively spawns components and builds the proxy chain.
379    ///
380    /// This function implements the recursive chain building pattern:
381    /// 1. Pop the next component from the `providers` list
382    /// 2. Create the component (either spawn subprocess or use mock)
383    /// 3. Set up JSON-RPC connection and message handlers
384    /// 4. Recursively call itself to spawn the next component
385    /// 5. When no components remain, start the message routing loop via `serve()`
386    ///
387    /// Central message handling logic for the conductor.
388    /// The conductor routes all [`ConductorMessage`] messages through to this function.
389    /// Each message corresponds to a request or notification from one component to another.
390    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
391    /// Note that *responses to requests* are sent *directly* without going through this loop.
392    ///
393    /// The names we use are
394    ///
395    /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
396    /// * Then there is a sequence of *components* consisting of:
397    ///     * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
398    ///     * And finally the *agent*, which is the final component in the chain and handles the actual work.
399    ///
400    /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
401    ///
402    /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
403    /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
404    ///   through a stdio server that runs on localhost and bridges messages.
405    async fn handle_conductor_message(
406        &mut self,
407        client: ConnectionTo<Host::Counterpart>,
408        message: ConductorMessage,
409    ) -> Result<(), agent_client_protocol::Error> {
410        tracing::debug!(?message, "handle_conductor_message");
411
412        match message {
413            ConductorMessage::LeftToRight {
414                target_component_index,
415                message,
416            } => {
417                // Tracing happens inside forward_client_to_agent_message, after initialization,
418                // so that component_name() has access to the populated proxies list.
419                self.forward_client_to_agent_message(target_component_index, message, client)
420                    .await
421            }
422
423            ConductorMessage::RightToLeft {
424                source_component_index,
425                message,
426            } => {
427                tracing::debug!(
428                    ?source_component_index,
429                    message_method = ?message.method(),
430                    "Conductor: AgentToClient received"
431                );
432                self.send_message_to_predecessor_of(client, source_component_index, message)
433            }
434        }
435    }
436
437    /// Send a message (request or notification) to the predecessor of the given component.
438    ///
439    /// This is a bit subtle because the relationship of the conductor
440    /// is different depending on who will be receiving the message:
441    /// * If the message is going to the conductor's client, then no changes
442    ///   are needed, as the conductor is sending an agent-to-client message and
443    ///   the conductor is acting as the agent.
444    /// * If the message is going to a proxy component, then we have to wrap
445    ///   it in a "from successor" wrapper, because the conductor is the
446    ///   proxy's client.
447    fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
448        &mut self,
449        client: ConnectionTo<Host::Counterpart>,
450        source_component_index: SourceComponentIndex,
451        message: Dispatch<Req, N>,
452    ) -> Result<(), agent_client_protocol::Error>
453    where
454        Req::Response: Send,
455    {
456        let source_component_index = match source_component_index {
457            SourceComponentIndex::Successor => self.proxies.len(),
458            SourceComponentIndex::Proxy(index) => index,
459        };
460
461        match message {
462            Dispatch::Request(request, responder) => self
463                .send_request_to_predecessor_of(client, source_component_index, request)
464                .forward_response_to(responder),
465            Dispatch::Notification(notification) => self.send_notification_to_predecessor_of(
466                client,
467                source_component_index,
468                notification,
469            ),
470            Dispatch::Response(result, router) => router.respond_with_result(result),
471        }
472    }
473
474    fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
475        &mut self,
476        client_connection: ConnectionTo<Host::Counterpart>,
477        source_component_index: usize,
478        request: Req,
479    ) -> SentRequest<Req::Response> {
480        if source_component_index == 0 {
481            client_connection.send_request_to(Client, request)
482        } else {
483            self.proxies[source_component_index - 1].send_request(SuccessorMessage {
484                message: request,
485                meta: None,
486            })
487        }
488    }
489
490    /// Send a notification to the predecessor of the given component.
491    ///
492    /// This is a bit subtle because the relationship of the conductor
493    /// is different depending on who will be receiving the message:
494    /// * If the notification is going to the conductor's client, then no changes
495    ///   are needed, as the conductor is sending an agent-to-client message and
496    ///   the conductor is acting as the agent.
497    /// * If the notification is going to a proxy component, then we have to wrap
498    ///   it in a "from successor" wrapper, because the conductor is the
499    ///   proxy's client.
500    fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
501        &mut self,
502        client: ConnectionTo<Host::Counterpart>,
503        source_component_index: usize,
504        notification: N,
505    ) -> Result<(), agent_client_protocol::Error> {
506        tracing::debug!(
507            source_component_index,
508            proxies_len = self.proxies.len(),
509            "send_notification_to_predecessor_of"
510        );
511        if source_component_index == 0 {
512            tracing::debug!("Sending notification directly to client");
513            client.send_notification_to(Client, notification)
514        } else {
515            tracing::debug!(
516                target_proxy = source_component_index - 1,
517                "Sending notification wrapped as SuccessorMessage to proxy"
518            );
519            self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
520                message: notification,
521                meta: None,
522            })
523        }
524    }
525
526    /// Send a message (request or notification) from 'left to right'.
527    /// Left-to-right means from the client or an intermediate proxy to the component
528    /// at `target_component_index` (could be a proxy or the agent).
529    /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
530    async fn forward_client_to_agent_message(
531        &mut self,
532        target_component_index: usize,
533        message: Dispatch,
534        client: ConnectionTo<Host::Counterpart>,
535    ) -> Result<(), agent_client_protocol::Error> {
536        tracing::trace!(
537            target_component_index,
538            ?message,
539            "forward_client_to_agent_message"
540        );
541
542        // Ensure components are initialized before processing any message.
543        let message = self.ensure_initialized(client.clone(), message).await?;
544
545        // In proxy mode, if the target is beyond our component chain,
546        // forward to the conductor's own successor (via client connection)
547        if target_component_index < self.proxies.len() {
548            self.forward_message_from_client_to_proxy(target_component_index, message)
549                .await
550        } else {
551            assert_eq!(target_component_index, self.proxies.len());
552
553            debug!(
554                target_component_index,
555                proxies_count = self.proxies.len(),
556                "Proxy mode: forwarding successor message to conductor's successor"
557            );
558            let successor = self.successor.clone();
559            successor.send_message(message, client, self).await
560        }
561    }
562
563    /// Ensures components are initialized before processing messages.
564    ///
565    /// If components haven't been initialized yet, this expects the first message
566    /// to be an `initialize` request and uses it to spawn the component chain.
567    ///
568    /// Returns:
569    /// - `Ok(Some(message))` - Components are initialized, continue processing this message
570    /// - `Ok(None)` - An error response was sent, caller should return early
571    /// - `Err(_)` - A fatal error occurred
572    async fn ensure_initialized(
573        &mut self,
574        client: ConnectionTo<Host::Counterpart>,
575        message: Dispatch,
576    ) -> Result<Dispatch, Error> {
577        // Already initialized - pass through
578        let Some(instantiator) = self.instantiator.take() else {
579            return Ok(message);
580        };
581
582        let host = self.host.clone();
583        let message = host.initialize(message, client, instantiator, self).await?;
584        Ok(message)
585    }
586
587    /// Wrap a proxy component with tracing if tracing is enabled.
588    ///
589    /// Returns the component unchanged if tracing is disabled.
590    fn trace_proxy(
591        &self,
592        proxy_index: ComponentIndex,
593        successor_index: ComponentIndex,
594        component: impl ConnectTo<Conductor>,
595    ) -> DynConnectTo<Conductor> {
596        match &self.trace_handle {
597            Some(trace_handle) => {
598                trace_handle.bridge_component(proxy_index, successor_index, component)
599            }
600            None => DynConnectTo::new(component),
601        }
602    }
603
604    /// Spawn proxy components and add them to the proxies list.
605    fn spawn_proxies(
606        &mut self,
607        client: ConnectionTo<Host::Counterpart>,
608        proxy_components: Vec<DynConnectTo<Conductor>>,
609    ) -> Result<(), agent_client_protocol::Error> {
610        assert!(self.proxies.is_empty());
611
612        let num_proxies = proxy_components.len();
613        info!(proxy_count = num_proxies, "spawn_proxies");
614
615        // Special case: if there are no user-defined proxies
616        // but tracing is enabled, we make a dummy proxy that just
617        // passes through messages but which can trigger the
618        // tracing events.
619        if self.trace_handle.is_some() && num_proxies == 0 {
620            self.connect_to_proxy(
621                &client,
622                0,
623                ComponentIndex::Client,
624                ComponentIndex::Agent,
625                Proxy.builder(),
626            )?;
627        } else {
628            // Spawn each proxy component
629            for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
630                debug!(component_index, "spawning proxy");
631
632                self.connect_to_proxy(
633                    &client,
634                    component_index,
635                    ComponentIndex::Proxy(component_index),
636                    ComponentIndex::successor_of(component_index, num_proxies),
637                    dyn_component,
638                )?;
639            }
640        }
641
642        info!(proxy_count = self.proxies.len(), "Proxies spawned");
643
644        Ok(())
645    }
646
647    /// Create a connection to the proxy with index `component_index` implemented in `component`.
648    ///
649    /// If tracing is enabled, the proxy's index is `trace_proxy_index` and its successor is `trace_successor_index`.
650    fn connect_to_proxy(
651        &mut self,
652        client: &ConnectionTo<Host::Counterpart>,
653        component_index: usize,
654        trace_proxy_index: ComponentIndex,
655        trace_successor_index: ComponentIndex,
656        component: impl ConnectTo<Conductor>,
657    ) -> Result<(), Error> {
658        let connection_builder = self.connection_to_proxy(component_index);
659        let connect_component =
660            self.trace_proxy(trace_proxy_index, trace_successor_index, component);
661        let proxy_connection = client.spawn_connection(connection_builder, connect_component)?;
662        self.proxies.push(proxy_connection);
663        Ok(())
664    }
665
666    /// Create the conductor's connection to the proxy with index `component_index`.
667    ///
668    /// Outgoing messages received from the proxy are sent to `self.conductor_tx` as either
669    /// left-to-right or right-to-left messages depending on whether they are wrapped
670    /// in `SuccessorMessage`.
671    fn connection_to_proxy(
672        &mut self,
673        component_index: usize,
674    ) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
675        type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
676        let mut conductor_tx = self.conductor_tx.clone();
677        Conductor
678            .builder()
679            .name(format!("conductor-to-component({component_index})"))
680            // Intercept messages sent by the proxy.
681            .on_receive_dispatch(
682                async move |dispatch: Dispatch, _connection| {
683                    MatchDispatch::new(dispatch)
684                        .if_message(async |dispatch: SuccessorDispatch| {
685                            //                         ------------------
686                            // SuccessorMessages sent by the proxy go to its successor.
687                            //
688                            // Subtle point:
689                            //
690                            // `ConductorToProxy` has only a single peer, `Agent`. This means that we see
691                            // "successor messages" in their "desugared form". So when we intercept an *outgoing*
692                            // message that matches `SuccessorMessage`, it could be one of three things
693                            //
694                            // - A request being sent by the proxy to its successor (hence going left->right)
695                            // - A notification being sent by the proxy to its successor (hence going left->right)
696                            // - A response to a request sent to the proxy *by* its successor. Here, the *request*
697                            //   was going right->left, but the *response* (the message we are processing now)
698                            //   is going left->right.
699                            //
700                            // So, in all cases, we forward as a left->right message.
701
702                            conductor_tx
703                                .send(ConductorMessage::LeftToRight {
704                                    target_component_index: component_index + 1,
705                                    message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
706                                })
707                                .await
708                                .map_err(agent_client_protocol::util::internal_error)
709                        })
710                        .await
711                        .otherwise(async |dispatch| {
712                            // Other messagrs send by the proxy go its predecessor.
713                            // As in the previous handler:
714                            //
715                            // Messages here are seen in their "desugared form", so we are seeing
716                            // one of three things
717                            //
718                            // - A request being sent by the proxy to its predecessor (hence going right->left)
719                            // - A notification being sent by the proxy to its predecessor (hence going right->left)
720                            // - A response to a request sent to the proxy *by* its predecessor. Here, the *request*
721                            //   was going left->right, but the *response* (the message we are processing now)
722                            //   is going right->left.
723                            //
724                            // So, in all cases, we forward as a right->left message.
725
726                            let message = ConductorMessage::RightToLeft {
727                                source_component_index: SourceComponentIndex::Proxy(
728                                    component_index,
729                                ),
730                                message: dispatch,
731                            };
732                            conductor_tx
733                                .send(message)
734                                .await
735                                .map_err(agent_client_protocol::util::internal_error)
736                        })
737                        .await
738                },
739                agent_client_protocol::on_receive_dispatch!(),
740            )
741    }
742
743    async fn forward_message_from_client_to_proxy(
744        &mut self,
745        target_component_index: usize,
746        message: Dispatch,
747    ) -> Result<(), agent_client_protocol::Error> {
748        tracing::debug!(?message, "forward_message_to_proxy");
749
750        MatchDispatch::new(message)
751            .if_request(async |_request: InitializeProxyRequest, responder| {
752                responder.respond_with_error(
753                    agent_client_protocol::Error::invalid_request()
754                        .data("initialize/proxy requests are only sent by the conductor"),
755                )
756            })
757            .await
758            .if_request(async |request: InitializeRequest, responder| {
759                // The pattern for `Initialize` messages is a bit subtle.
760                // Proxy receive incoming `Initialize` messages as if they
761                // were a client. The conductor (us) intercepts these and
762                // converts them to an `InitializeProxyRequest`.
763                //
764                // The proxy will then initialize itself and forward an `Initialize`
765                // request to its successor.
766                self.proxies[target_component_index]
767                    .send_request(InitializeProxyRequest::from(request))
768                    .on_receiving_result(async move |result| {
769                        tracing::debug!(?result, "got initialize_proxy response from proxy");
770                        responder.respond_with_result(result)
771                    })
772            })
773            .await
774            .otherwise(async |message| {
775                // Otherwise, just send the message along "as is".
776                self.proxies[target_component_index].send_proxied_message(message)
777            })
778            .await
779    }
780
781    /// Invoked when sending a message from the conductor to the agent that it manages.
782    /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
783    /// method when `Link = ConductorToClient` (i.e., the conductor is not itself
784    /// running as a proxy).
785    async fn forward_message_to_agent(
786        &mut self,
787        _client_connection: ConnectionTo<Host::Counterpart>,
788        message: Dispatch,
789        agent_connection: ConnectionTo<Agent>,
790    ) -> Result<(), Error> {
791        MatchDispatch::new(message)
792            .if_request(async |_request: InitializeProxyRequest, responder| {
793                responder.respond_with_error(
794                    agent_client_protocol::Error::invalid_request()
795                        .data("initialize/proxy requests are only sent by the conductor"),
796                )
797            })
798            .await
799            .otherwise(async |message| {
800                // Forward all other messages to the agent as-is.
801                agent_connection.send_proxied_message_to(Agent, message)
802            })
803            .await
804    }
805}
806
807/// Identifies a component in the conductor's chain for tracing purposes.
808///
809/// Used to track message sources and destinations through the proxy chain.
810#[derive(Debug, Clone, Copy)]
811pub enum ComponentIndex {
812    /// The client (editor) at the start of the chain.
813    Client,
814
815    /// A proxy component at the given index.
816    Proxy(usize),
817
818    /// The successor (agent in agent mode, outer conductor in proxy mode).
819    Agent,
820}
821
822impl ComponentIndex {
823    /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
824    #[must_use]
825    pub fn predecessor_of(proxy_index: usize) -> Self {
826        match proxy_index.checked_sub(1) {
827            Some(p_i) => ComponentIndex::Proxy(p_i),
828            None => ComponentIndex::Client,
829        }
830    }
831
832    /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
833    #[must_use]
834    pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
835        if proxy_index == num_proxies {
836            ComponentIndex::Agent
837        } else {
838            ComponentIndex::Proxy(proxy_index + 1)
839        }
840    }
841}
842
843/// Identifies the source of an agent-to-client message.
844///
845/// This enum handles the fact that the conductor may receive messages from two different sources:
846/// 1. From one of its managed components (identified by index)
847/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
848#[derive(Debug, Clone, Copy)]
849pub enum SourceComponentIndex {
850    /// Message from a specific component at the given index in the managed chain.
851    Proxy(usize),
852
853    /// Message from the conductor's agent or successor.
854    Successor,
855}
856
857/// Trait for lazy proxy instantiation (proxy mode).
858///
859/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
860/// are proxies that forward to an outer conductor.
861pub trait InstantiateProxies: Send {
862    /// Instantiate proxy components based on the Initialize request.
863    ///
864    /// Returns proxy components typed as `DynConnectTo<Conductor>` since proxies
865    /// communicate with the conductor.
866    fn instantiate_proxies(
867        self: Box<Self>,
868        req: InitializeRequest,
869    ) -> futures::future::BoxFuture<
870        'static,
871        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
872    >;
873}
874
875/// Simple implementation: provide all proxy components unconditionally.
876///
877/// Requires `T: ConnectTo<Conductor>`.
878impl<T> InstantiateProxies for Vec<T>
879where
880    T: ConnectTo<Conductor> + 'static,
881{
882    fn instantiate_proxies(
883        self: Box<Self>,
884        req: InitializeRequest,
885    ) -> futures::future::BoxFuture<
886        'static,
887        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
888    > {
889        Box::pin(async move {
890            let components: Vec<DynConnectTo<Conductor>> =
891                (*self).into_iter().map(|c| DynConnectTo::new(c)).collect();
892            Ok((req, components))
893        })
894    }
895}
896
897/// Dynamic implementation: closure receives the Initialize request and returns proxies.
898impl<F, Fut> InstantiateProxies for F
899where
900    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
901    Fut: std::future::Future<
902            Output = Result<
903                (InitializeRequest, Vec<DynConnectTo<Conductor>>),
904                agent_client_protocol::Error,
905            >,
906        > + Send
907        + 'static,
908{
909    fn instantiate_proxies(
910        self: Box<Self>,
911        req: InitializeRequest,
912    ) -> futures::future::BoxFuture<
913        'static,
914        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
915    > {
916        Box::pin(async move { (*self)(req).await })
917    }
918}
919
920/// Trait for lazy proxy and agent instantiation (agent mode).
921///
922/// Used by conductors in agent mode (`ConductorToClient`) where there are
923/// zero or more proxies followed by an agent component.
924pub trait InstantiateProxiesAndAgent: Send {
925    /// Instantiate proxy and agent components based on the Initialize request.
926    ///
927    /// Returns the (possibly modified) request, a vector of proxy components
928    /// (typed as `DynConnectTo<Conductor>`), and the agent component
929    /// (typed as `DynConnectTo<Client>`).
930    fn instantiate_proxies_and_agent(
931        self: Box<Self>,
932        req: InitializeRequest,
933    ) -> futures::future::BoxFuture<
934        'static,
935        Result<
936            (
937                InitializeRequest,
938                Vec<DynConnectTo<Conductor>>,
939                DynConnectTo<Client>,
940            ),
941            agent_client_protocol::Error,
942        >,
943    >;
944}
945
946/// Wrapper to convert a single agent component (no proxies) into InstantiateProxiesAndAgent.
947#[derive(Debug)]
948pub struct AgentOnly<A>(pub A);
949
950impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
951    fn instantiate_proxies_and_agent(
952        self: Box<Self>,
953        req: InitializeRequest,
954    ) -> futures::future::BoxFuture<
955        'static,
956        Result<
957            (
958                InitializeRequest,
959                Vec<DynConnectTo<Conductor>>,
960                DynConnectTo<Client>,
961            ),
962            agent_client_protocol::Error,
963        >,
964    > {
965        Box::pin(async move { Ok((req, Vec::new(), DynConnectTo::new(self.0))) })
966    }
967}
968
969/// Builder for creating proxies and agent components.
970///
971/// # Example
972/// ```ignore
973/// ProxiesAndAgent::new(ElizaAgent::new())
974///     .proxy(LoggingProxy::new())
975///     .proxy(AuthProxy::new())
976/// ```
977#[derive(Debug)]
978pub struct ProxiesAndAgent {
979    proxies: Vec<DynConnectTo<Conductor>>,
980    agent: DynConnectTo<Client>,
981}
982
983impl ProxiesAndAgent {
984    /// Create a new builder with the given agent component.
985    pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
986        Self {
987            proxies: vec![],
988            agent: DynConnectTo::new(agent),
989        }
990    }
991
992    /// Add a single proxy component.
993    #[must_use]
994    pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
995        self.proxies.push(DynConnectTo::new(proxy));
996        self
997    }
998
999    /// Add multiple proxy components.
1000    #[must_use]
1001    pub fn proxies<P, I>(mut self, proxies: I) -> Self
1002    where
1003        P: ConnectTo<Conductor> + 'static,
1004        I: IntoIterator<Item = P>,
1005    {
1006        self.proxies
1007            .extend(proxies.into_iter().map(DynConnectTo::new));
1008        self
1009    }
1010}
1011
1012impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1013    fn instantiate_proxies_and_agent(
1014        self: Box<Self>,
1015        req: InitializeRequest,
1016    ) -> futures::future::BoxFuture<
1017        'static,
1018        Result<
1019            (
1020                InitializeRequest,
1021                Vec<DynConnectTo<Conductor>>,
1022                DynConnectTo<Client>,
1023            ),
1024            agent_client_protocol::Error,
1025        >,
1026    > {
1027        Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1028    }
1029}
1030
1031/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
1032impl<F, Fut> InstantiateProxiesAndAgent for F
1033where
1034    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1035    Fut: std::future::Future<
1036            Output = Result<
1037                (
1038                    InitializeRequest,
1039                    Vec<DynConnectTo<Conductor>>,
1040                    DynConnectTo<Client>,
1041                ),
1042                agent_client_protocol::Error,
1043            >,
1044        > + Send
1045        + 'static,
1046{
1047    fn instantiate_proxies_and_agent(
1048        self: Box<Self>,
1049        req: InitializeRequest,
1050    ) -> futures::future::BoxFuture<
1051        'static,
1052        Result<
1053            (
1054                InitializeRequest,
1055                Vec<DynConnectTo<Conductor>>,
1056                DynConnectTo<Client>,
1057            ),
1058            agent_client_protocol::Error,
1059        >,
1060    > {
1061        Box::pin(async move { (*self)(req).await })
1062    }
1063}
1064
1065/// Messages sent to the conductor's main event loop for routing.
1066///
1067/// These messages enable the conductor to route communication between:
1068/// - The editor and the first component
1069/// - Components and their successors in the chain
1070/// - Components and their clients (editor or predecessor)
1071///
1072/// All spawned tasks send messages via this enum through a shared channel,
1073/// allowing centralized routing logic in the `serve()` loop.
1074#[derive(Debug)]
1075pub enum ConductorMessage {
1076    /// If this message is a request or notification, then it is going "left-to-right"
1077    /// (e.g., a component making a request of its successor).
1078    ///
1079    /// If this message is a response, then it is going right-to-left
1080    /// (i.e., the successor answering a request made by its predecessor).
1081    LeftToRight {
1082        target_component_index: usize,
1083        message: Dispatch,
1084    },
1085
1086    /// If this message is a request or notification, then it is going "right-to-left"
1087    /// (e.g., a component making a request of its predecessor).
1088    ///
1089    /// If this message is a response, then it is going "left-to-right"
1090    /// (i.e., the predecessor answering a request made by its successor).
1091    RightToLeft {
1092        source_component_index: SourceComponentIndex,
1093        message: Dispatch,
1094    },
1095}
1096
1097/// Trait implemented for the two links the conductor can use:
1098///
1099/// * ConductorToClient -- conductor is acting as an agent, so when its last proxy sends to its successor, the conductor sends that message to its agent component
1100/// * ConductorToConductor -- conductor is acting as a proxy, so when its last proxy sends to its successor, the (inner) conductor sends that message to its successor, via the outer conductor
1101pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
1102    /// The type used to instantiate components for this link type.
1103    type Instantiator: Send;
1104
1105    /// Handle initialization: parse the init request, instantiate components, and spawn them.
1106    ///
1107    /// Takes ownership of the instantiator and returns the (possibly modified) init request
1108    /// wrapped in a Dispatch for forwarding.
1109    fn initialize(
1110        &self,
1111        message: Dispatch,
1112        connection: ConnectionTo<Self::Counterpart>,
1113        instantiator: Self::Instantiator,
1114        responder: &mut ConductorResponder<Self>,
1115    ) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
1116
1117    /// Handle an incoming message from the client or conductor, depending on `Self`
1118    fn handle_dispatch(
1119        &self,
1120        message: Dispatch,
1121        connection: ConnectionTo<Self::Counterpart>,
1122        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1123    ) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
1124}
1125
1126/// Conductor acting as an agent
1127impl ConductorHostRole for Agent {
1128    type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1129
1130    async fn initialize(
1131        &self,
1132        message: Dispatch,
1133        client_connection: ConnectionTo<Client>,
1134        instantiator: Self::Instantiator,
1135        responder: &mut ConductorResponder<Self>,
1136    ) -> Result<Dispatch, agent_client_protocol::Error> {
1137        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1138
1139        // Not yet initialized - expect an initialize request.
1140        // Error if we get anything else.
1141        let Dispatch::Request(request, init_responder) = message else {
1142            message.respond_with_error(invalid_request(), client_connection.clone())?;
1143            return Err(invalid_request());
1144        };
1145        if !InitializeRequest::matches_method(request.method()) {
1146            init_responder.respond_with_error(invalid_request())?;
1147            return Err(invalid_request());
1148        }
1149
1150        let init_request =
1151            match InitializeRequest::parse_message(request.method(), request.params()) {
1152                Ok(r) => r,
1153                Err(error) => {
1154                    init_responder.respond_with_error(error)?;
1155                    return Err(invalid_request());
1156                }
1157            };
1158
1159        // Instantiate proxies and agent
1160        let (modified_req, proxy_components, agent_component) = instantiator
1161            .instantiate_proxies_and_agent(init_request)
1162            .await?;
1163
1164        // Spawn the agent component
1165        debug!(?agent_component, "spawning agent");
1166
1167        let connection_to_agent = client_connection.spawn_connection(
1168            Client
1169                .builder()
1170                .name("conductor-to-agent")
1171                // Intercept agent-to-client messages from the agent.
1172                .on_receive_dispatch(
1173                    {
1174                        let mut conductor_tx = responder.conductor_tx.clone();
1175                        async move |dispatch: Dispatch, _cx| {
1176                            conductor_tx
1177                                .send(ConductorMessage::RightToLeft {
1178                                    source_component_index: SourceComponentIndex::Successor,
1179                                    message: dispatch,
1180                                })
1181                                .await
1182                                .map_err(agent_client_protocol::util::internal_error)
1183                        }
1184                    },
1185                    agent_client_protocol::on_receive_dispatch!(),
1186                ),
1187            agent_component,
1188        )?;
1189        responder.successor = Arc::new(connection_to_agent);
1190
1191        // Spawn the proxy components
1192        responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1193
1194        Ok(Dispatch::Request(
1195            modified_req.to_untyped_message()?,
1196            init_responder,
1197        ))
1198    }
1199
1200    async fn handle_dispatch(
1201        &self,
1202        message: Dispatch,
1203        client_connection: ConnectionTo<Client>,
1204        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1205    ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1206        tracing::debug!(
1207            method = ?message.method(),
1208            "ConductorToClient::handle_dispatch"
1209        );
1210        MatchDispatchFrom::new(message, &client_connection)
1211            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1212            .if_message_from(Client, async move |message: Dispatch| {
1213                tracing::debug!(
1214                    method = ?message.method(),
1215                    "ConductorToClient::handle_dispatch - matched Client"
1216                );
1217                ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
1218            })
1219            .await
1220            .done()
1221    }
1222}
1223
1224/// Conductor acting as a proxy
1225impl ConductorHostRole for Proxy {
1226    type Instantiator = Box<dyn InstantiateProxies>;
1227
1228    async fn initialize(
1229        &self,
1230        message: Dispatch,
1231        client_connection: ConnectionTo<Conductor>,
1232        instantiator: Self::Instantiator,
1233        responder: &mut ConductorResponder<Self>,
1234    ) -> Result<Dispatch, agent_client_protocol::Error> {
1235        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1236
1237        // Not yet initialized - expect an InitializeProxy request.
1238        // Error if we get anything else.
1239        let Dispatch::Request(request, init_responder) = message else {
1240            message.respond_with_error(invalid_request(), client_connection.clone())?;
1241            return Err(invalid_request());
1242        };
1243        if !InitializeProxyRequest::matches_method(request.method()) {
1244            init_responder.respond_with_error(invalid_request())?;
1245            return Err(invalid_request());
1246        }
1247
1248        let InitializeProxyRequest { initialize } =
1249            match InitializeProxyRequest::parse_message(request.method(), request.params()) {
1250                Ok(r) => r,
1251                Err(error) => {
1252                    init_responder.respond_with_error(error)?;
1253                    return Err(invalid_request());
1254                }
1255            };
1256
1257        tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1258
1259        // Instantiate proxies (no agent in proxy mode)
1260        let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1261
1262        // In proxy mode, our successor is the outer conductor (via our client connection)
1263        responder.successor = Arc::new(GrandSuccessor);
1264
1265        // Spawn the proxy components
1266        responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1267
1268        Ok(Dispatch::Request(
1269            modified_req.to_untyped_message()?,
1270            init_responder,
1271        ))
1272    }
1273
1274    async fn handle_dispatch(
1275        &self,
1276        message: Dispatch,
1277        client_connection: ConnectionTo<Conductor>,
1278        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1279    ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1280        tracing::debug!(
1281            method = ?message.method(),
1282            ?message,
1283            "ConductorToConductor::handle_dispatch"
1284        );
1285        MatchDispatchFrom::new(message, &client_connection)
1286            .if_message_from(Agent, {
1287                // Messages from our successor arrive already unwrapped
1288                // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1289                async |message: Dispatch| {
1290                    tracing::debug!(
1291                        method = ?message.method(),
1292                        "ConductorToConductor::handle_dispatch - matched Agent"
1293                    );
1294                    let mut conductor_tx = conductor_tx.clone();
1295                    ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
1296                        .await
1297                }
1298            })
1299            .await
1300            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1301            .if_message_from(Client, async |message: Dispatch| {
1302                tracing::debug!(
1303                    method = ?message.method(),
1304                    "ConductorToConductor::handle_dispatch - matched Client"
1305                );
1306                let mut conductor_tx = conductor_tx.clone();
1307                ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
1308                    .await
1309            })
1310            .await
1311            .done()
1312    }
1313}
1314
1315pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {
1316    /// Send a message to the successor.
1317    fn send_message<'a>(
1318        &self,
1319        message: Dispatch,
1320        connection_to_conductor: ConnectionTo<Host::Counterpart>,
1321        responder: &'a mut ConductorResponder<Host>,
1322    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
1323}
1324
1325impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {
1326    fn send_message<'a>(
1327        &self,
1328        #[expect(unused_variables)] message: Dispatch,
1329        #[expect(unused_variables)] connection_to_conductor: ConnectionTo<Host::Counterpart>,
1330        #[expect(unused_variables)] responder: &'a mut ConductorResponder<Host>,
1331    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1332        let error = self.clone();
1333        Box::pin(std::future::ready(Err(error)))
1334    }
1335}
1336
1337/// A dummy type handling messages sent to the conductor's
1338/// successor when it is acting as a proxy.
1339struct GrandSuccessor;
1340
1341/// When the conductor is acting as an proxy, messages sent by
1342/// the last proxy go to the conductor's successor.
1343///
1344/// ```text
1345/// client --> Conductor -----------------------------> GrandSuccessor
1346///            |                                  |
1347///            +-> Proxy[0] -> ... -> Proxy[n-1] -+
1348/// ```
1349impl ConductorSuccessor<Proxy> for GrandSuccessor {
1350    fn send_message<'a>(
1351        &self,
1352        message: Dispatch,
1353        connection: ConnectionTo<Conductor>,
1354        _responder: &'a mut ConductorResponder<Proxy>,
1355    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1356        Box::pin(async move {
1357            debug!("Proxy mode: forwarding successor message to conductor's successor");
1358            connection.send_proxied_message_to(Agent, message)
1359        })
1360    }
1361}
1362
1363/// When the conductor is acting as an agent, messages sent by
1364/// the last proxy to its successor go to the internal agent
1365/// (`self`).
1366impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
1367    fn send_message<'a>(
1368        &self,
1369        message: Dispatch,
1370        connection: ConnectionTo<Client>,
1371        responder: &'a mut ConductorResponder<Agent>,
1372    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1373        let connection_to_agent = self.clone();
1374        Box::pin(async move {
1375            debug!("Proxy mode: forwarding successor message to conductor's successor");
1376            responder
1377                .forward_message_to_agent(connection, message, connection_to_agent)
1378                .await
1379        })
1380    }
1381}