agent_client_protocol_conductor/conductor.rs
//! # Conductor: ACP Proxy Chain Orchestrator
//!
//! This module implements the conductor, which orchestrates a chain of
//! proxy components that sit between an editor and an agent, transforming the
//! Agent-Client Protocol (ACP) stream bidirectionally.
//!
//! ## Architecture Overview
//!
//! The conductor builds and manages a chain of components:
//!
//! ```text
//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
//! ```
//!
//! Each component receives ACP messages, can transform them, and forwards them
//! to the next component in the chain. The conductor:
//!
//! 1. Spawns each component as a subprocess
//! 2. Establishes bidirectional JSON-RPC connections with each component
//! 3. Routes messages between editor, components, and agent
//! 4. Distinguishes proxy vs agent components via distinct request types
//!
//! ## Recursive Chain Building
//!
//! The chain is built recursively through the `_proxy/successor/*` protocol:
//!
//! 1. Editor connects to Component 0 via the conductor
//! 2. When Component 0 wants to communicate with its successor, it sends
//!    requests/notifications with method prefix `_proxy/successor/`
//! 3. The conductor intercepts these messages, strips the prefix, and forwards
//!    to Component 1
//! 4. Component 1 does the same for Component 2, and so on
//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
//!
//! This allows each component to be written as if it's talking to a single successor,
//! without knowing about the full chain.
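//!
//! The prefix handling in steps 2 and 3 can be sketched with plain strings. This is
//! only an illustration under simplifying assumptions: `Route` and `route_method`
//! are hypothetical names, and the conductor itself works on typed
//! `SuccessorMessage` values rather than raw method strings.
//!
//! ```rust
//! /// Where a message sent by the component at `index` goes next (hypothetical type).
//! #[derive(Debug, PartialEq)]
//! enum Route<'a> {
//!     /// Prefixed: forward to the successor with the prefix stripped.
//!     Successor { target: usize, method: &'a str },
//!     /// Unprefixed: the message travels back toward the editor.
//!     Predecessor { source: usize, method: &'a str },
//! }
//!
//! fn route_method(index: usize, method: &str) -> Route<'_> {
//!     match method.strip_prefix("_proxy/successor/") {
//!         Some(inner) => Route::Successor { target: index + 1, method: inner },
//!         None => Route::Predecessor { source: index, method },
//!     }
//! }
//!
//! assert_eq!(
//!     route_method(0, "_proxy/successor/session/new"),
//!     Route::Successor { target: 1, method: "session/new" },
//! );
//! assert_eq!(
//!     route_method(1, "session/new"),
//!     Route::Predecessor { source: 1, method: "session/new" },
//! );
//! ```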
//!
//! ## Proxy vs Agent Initialization
//!
//! Components discover whether they're a proxy or agent via the initialization request they receive:
//!
//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
//!
//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
//! and `InitializeRequest` only to the final agent component. This allows proxies to
//! know they should forward messages to a successor, while agents know they are the
//! terminal component.
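//!
//! As a small sketch of that rule, the choice depends only on whether a component
//! is last in the chain. The helper `initialize_method` is hypothetical; only the
//! two method names come from the description above.
//!
//! ```rust
//! /// Pick the initialization method for the component at `index` in a chain of
//! /// `num_proxies` proxies followed by one agent.
//! fn initialize_method(index: usize, num_proxies: usize) -> &'static str {
//!     if index < num_proxies {
//!         "_proxy/initialize"
//!     } else {
//!         "initialize"
//!     }
//! }
//!
//! // Two proxies, then the agent at index 2.
//! assert_eq!(initialize_method(0, 2), "_proxy/initialize");
//! assert_eq!(initialize_method(1, 2), "_proxy/initialize");
//! assert_eq!(initialize_method(2, 2), "initialize");
//! ```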
//!
//! ## Message Routing
//!
//! The conductor runs an event loop processing messages from:
//!
//! - **Editor to first component**: Standard ACP messages
//! - **Component to successor**: Via `_proxy/successor/*` prefix
//! - **Component responses**: Via futures channels back to requesters
//!
//! The message flow ensures bidirectional communication while maintaining the
//! abstraction that each component only knows about its immediate successor.
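//!
//! A rough model of the right-to-left half of that routing, assuming components
//! are identified by their position in the chain. `Destination` and
//! `predecessor_destination` are illustrative names, not items from this crate.
//!
//! ```rust
//! #[derive(Debug, PartialEq)]
//! enum Destination {
//!     /// Deliver unchanged to the editor.
//!     Client,
//!     /// Deliver to this proxy, wrapped as a message from its successor.
//!     WrappedForProxy(usize),
//! }
//!
//! /// The component at index 0 reports to the editor; every other component
//! /// reports to the proxy just before it.
//! fn predecessor_destination(source_index: usize) -> Destination {
//!     match source_index.checked_sub(1) {
//!         Some(proxy) => Destination::WrappedForProxy(proxy),
//!         None => Destination::Client,
//!     }
//! }
//!
//! assert_eq!(predecessor_destination(0), Destination::Client);
//! assert_eq!(predecessor_destination(2), Destination::WrappedForProxy(1));
//! ```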
//!
//! ## Lazy Component Initialization
//!
//! Components are instantiated lazily when the first `initialize` request is received
//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
//!
//! ### Simple Usage
//!
//! Pass a Vec of components that implement `Component`:
//!
//! ```ignore
//! let conductor = Conductor::new(
//!     "my-conductor",
//!     vec![proxy1, proxy2, agent],
//!     None,
//! );
//! ```
//!
//! All components are spawned in order when the editor sends the first `initialize` request.
//!
//! ### Dynamic Component Selection
//!
//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
//!
//! ```ignore
//! let conductor = Conductor::new(
//!     "my-conductor",
//!     |cx, conductor_tx, init_req| async move {
//!         // Examine capabilities
//!         let needs_auth = has_auth_capability(&init_req);
//!
//!         let mut components = Vec::new();
//!         if needs_auth {
//!             components.push(spawn_auth_proxy(&cx, &conductor_tx)?);
//!         }
//!         components.push(spawn_agent(&cx, &conductor_tx)?);
//!
//!         // Return (potentially modified) request and component list
//!         Ok((init_req, components))
//!     },
//!     None,
//! );
//! ```
//!
//! The closure receives:
//! - `cx: &ConnectionTo` - Connection context for spawning components
//! - `conductor_tx: &mpsc::Sender<ConductorMessage>` - Channel for message routing
//! - `init_req: InitializeRequest` - The Initialize request from the editor
//!
//! And returns:
//! - Modified `InitializeRequest` to forward downstream
//! - `Vec<ConnectionTo>` of spawned components

use std::sync::Arc;

use agent_client_protocol::{
    Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
    Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
};
use agent_client_protocol::{
    Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest,
};
use agent_client_protocol::{
    HandleDispatchFrom,
    schema::{InitializeProxyRequest, InitializeRequest},
    util::MatchDispatchFrom,
};
use agent_client_protocol::{Handled, schema::SuccessorMessage};
use futures::{
    SinkExt, StreamExt,
    channel::mpsc::{self},
};
use tracing::{debug, info};

/// The conductor manages the proxy chain lifecycle and message routing.
///
/// It maintains connections to all components in the chain and routes messages
/// bidirectionally between the editor, components, and agent.
///
#[derive(Debug)]
pub struct ConductorImpl<Host: ConductorHostRole> {
    host: Host,
    name: String,
    instantiator: Host::Instantiator,
    trace_writer: Option<crate::trace::TraceWriter>,
}

impl<Host: ConductorHostRole> ConductorImpl<Host> {
    pub fn new(host: Host, name: impl ToString, instantiator: Host::Instantiator) -> Self {
        ConductorImpl {
            name: name.to_string(),
            host,
            instantiator,
            trace_writer: None,
        }
    }
}

impl ConductorImpl<Agent> {
    /// Create a conductor in agent mode (the last component is an agent).
    pub fn new_agent(
        name: impl ToString,
        instantiator: impl InstantiateProxiesAndAgent + 'static,
    ) -> Self {
        ConductorImpl::new(Agent, name, Box::new(instantiator))
    }
}

impl ConductorImpl<Proxy> {
    /// Create a conductor in proxy mode (forwards to another conductor).
    pub fn new_proxy(name: impl ToString, instantiator: impl InstantiateProxies + 'static) -> Self {
        ConductorImpl::new(Proxy, name, Box::new(instantiator))
    }
}

impl<Host: ConductorHostRole> ConductorImpl<Host> {
    /// Enable trace logging to a custom destination.
    ///
    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
    #[must_use]
    pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
        self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
        self
    }

    /// Enable trace logging to a file path.
    ///
    /// Events will be written as newline-delimited JSON (`.jsons` format).
    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
    pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
        self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
        Ok(self)
    }

    /// Enable trace logging with an existing TraceWriter.
    #[must_use]
    pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
        self.trace_writer = Some(writer);
        self
    }

    /// Run the conductor with a transport.
    pub async fn run(
        self,
        transport: impl ConnectTo<Host>,
    ) -> Result<(), agent_client_protocol::Error> {
        let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);

        // Set up tracing if enabled - spawn writer task and get handle
        let trace_handle;
        let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
        if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
            trace_handle = Some(h);
            trace_future = Box::pin(f);
        } else {
            trace_handle = None;
            trace_future = Box::pin(std::future::ready(Ok(())));
        }

        let responder = ConductorResponder {
            conductor_rx,
            conductor_tx: conductor_tx.clone(),
            instantiator: Some(self.instantiator),
            proxies: Vec::default(),
            successor: Arc::new(agent_client_protocol::util::internal_error(
                "successor not initialized",
            )),
            trace_handle,
            host: self.host.clone(),
        };

        Builder::new_with(
            self.host.clone(),
            ConductorMessageHandler {
                conductor_tx,
                host: self.host.clone(),
            },
        )
        .name(self.name)
        .with_responder(responder)
        .with_spawned(|_cx| trace_future)
        .connect_to(transport)
        .await
    }

    async fn incoming_message_from_client(
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        conductor_tx
            .send(ConductorMessage::LeftToRight {
                target_component_index: 0,
                message,
            })
            .await
            .map_err(agent_client_protocol::util::internal_error)
    }

    async fn incoming_message_from_agent(
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        conductor_tx
            .send(ConductorMessage::RightToLeft {
                source_component_index: SourceComponentIndex::Successor,
                message,
            })
            .await
            .map_err(agent_client_protocol::util::internal_error)
    }
}

impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
    async fn connect_to(
        self,
        client: impl ConnectTo<Host>,
    ) -> Result<(), agent_client_protocol::Error> {
        self.run(client).await
    }
}

struct ConductorMessageHandler<Host: ConductorHostRole> {
    conductor_tx: mpsc::Sender<ConductorMessage>,
    host: Host,
}

impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
    for ConductorMessageHandler<Host>
{
    async fn handle_dispatch_from(
        &mut self,
        message: Dispatch,
        connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
    ) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
        self.host
            .handle_dispatch(message, connection, &mut self.conductor_tx)
            .await
    }

    fn describe_chain(&self) -> impl std::fmt::Debug {
        "ConductorMessageHandler"
    }
}

/// The conductor manages the proxy chain lifecycle and message routing.
///
/// It maintains connections to all components in the chain and routes messages
/// bidirectionally between the editor, components, and agent.
///
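/// The responder acts as the conductor's single "central actor": other tasks send
/// into a channel and one loop drains it, which serializes all routing decisions.
/// A minimal sketch of that shape, assuming `std::sync::mpsc` in place of the
/// `futures::channel::mpsc` used here; `Msg` and `run_actor` are hypothetical names.
///
/// ```rust
/// use std::sync::mpsc;
/// use std::thread;
///
/// enum Msg {
///     LeftToRight(String),
///     RightToLeft(String),
/// }
///
/// /// Drain the channel until every sender is dropped, recording the order seen.
/// fn run_actor(rx: mpsc::Receiver<Msg>) -> Vec<String> {
///     let mut log = Vec::new();
///     for msg in rx {
///         match msg {
///             Msg::LeftToRight(m) => log.push(format!("-> {m}")),
///             Msg::RightToLeft(m) => log.push(format!("<- {m}")),
///         }
///     }
///     log
/// }
///
/// let (tx, rx) = mpsc::channel();
/// let producer = thread::spawn(move || {
///     tx.send(Msg::LeftToRight("request".into())).unwrap();
///     tx.send(Msg::RightToLeft("notification".into())).unwrap();
/// });
/// producer.join().unwrap();
/// assert_eq!(run_actor(rx), vec!["-> request", "<- notification"]);
/// ```
///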
pub struct ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    conductor_rx: mpsc::Receiver<ConductorMessage>,

    conductor_tx: mpsc::Sender<ConductorMessage>,

    /// The instantiator for lazy initialization.
    /// Set to None after components are instantiated.
    instantiator: Option<Host::Instantiator>,

    /// The chain of proxies before the agent (if any).
    ///
    /// Populated lazily when the first Initialize request is received.
    proxies: Vec<ConnectionTo<Proxy>>,

    /// If the conductor is operating in agent mode, this will direct messages to the agent.
    /// If the conductor is operating in proxy mode, this will direct messages to the successor.
    /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
    successor: Arc<dyn ConductorSuccessor<Host>>,

    /// Optional trace handle for sequence diagram visualization.
    trace_handle: Option<crate::trace::TraceHandle>,

    /// Defines what sort of link we have
    host: Host,
}

impl<Host> std::fmt::Debug for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConductorResponder")
            .field("conductor_rx", &self.conductor_rx)
            .field("conductor_tx", &self.conductor_tx)
            .field("proxies", &self.proxies)
            .field("trace_handle", &self.trace_handle)
            .field("host", &self.host)
            .finish_non_exhaustive()
    }
}

impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    async fn run_with_connection_to(
        mut self,
        connection: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        // Components are now spawned lazily in forward_initialize_request
        // when the first Initialize request is received.

        // This is the "central actor" of the conductor. Most other things forward messages
        // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
        while let Some(message) = self.conductor_rx.next().await {
            self.handle_conductor_message(connection.clone(), message)
                .await?;
        }
        Ok(())
    }
}

impl<Host> ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    /// Central message handling logic for the conductor.
    /// The conductor routes all [`ConductorMessage`] messages through to this function.
    /// Each message corresponds to a request or notification from one component to another.
    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
    /// Note that *responses to requests* are sent *directly* without going through this loop.
    ///
    /// The names we use are:
    ///
    /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
    /// * Then there is a sequence of *components* consisting of:
    ///   * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
    ///   * And finally the *agent*, which is the final component in the chain and handles the actual work.
    ///
    /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
    ///
    /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
    /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
    ///   through a stdio server that runs on localhost and bridges messages.
    async fn handle_conductor_message(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        message: ConductorMessage,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(?message, "handle_conductor_message");

        match message {
            ConductorMessage::LeftToRight {
                target_component_index,
                message,
            } => {
                // Tracing happens inside forward_client_to_agent_message, after initialization,
                // so that component_name() has access to the populated proxies list.
                self.forward_client_to_agent_message(target_component_index, message, client)
                    .await
            }

            ConductorMessage::RightToLeft {
                source_component_index,
                message,
            } => {
                tracing::debug!(
                    ?source_component_index,
                    message_method = ?message.method(),
                    "Conductor: AgentToClient received"
                );
                self.send_message_to_predecessor_of(client, source_component_index, message)
            }
        }
    }

    /// Send a message (request or notification) to the predecessor of the given component.
    ///
    /// This is a bit subtle because the relationship of the conductor
    /// is different depending on who will be receiving the message:
    /// * If the message is going to the conductor's client, then no changes
    ///   are needed, as the conductor is sending an agent-to-client message and
    ///   the conductor is acting as the agent.
    /// * If the message is going to a proxy component, then we have to wrap
    ///   it in a "from successor" wrapper, because the conductor is the
    ///   proxy's client.
    fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        source_component_index: SourceComponentIndex,
        message: Dispatch<Req, N>,
    ) -> Result<(), agent_client_protocol::Error>
    where
        Req::Response: Send,
    {
        let source_component_index = match source_component_index {
            SourceComponentIndex::Successor => self.proxies.len(),
            SourceComponentIndex::Proxy(index) => index,
        };

        match message {
            Dispatch::Request(request, responder) => self
                .send_request_to_predecessor_of(client, source_component_index, request)
                .forward_response_to(responder),
            Dispatch::Notification(notification) => self.send_notification_to_predecessor_of(
                client,
                source_component_index,
                notification,
            ),
            Dispatch::Response(result, router) => router.respond_with_result(result),
        }
    }

    fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
        &mut self,
        client_connection: ConnectionTo<Host::Counterpart>,
        source_component_index: usize,
        request: Req,
    ) -> SentRequest<Req::Response> {
        if source_component_index == 0 {
            client_connection.send_request_to(Client, request)
        } else {
            self.proxies[source_component_index - 1].send_request(SuccessorMessage {
                message: request,
                meta: None,
            })
        }
    }

    /// Send a notification to the predecessor of the given component.
    ///
    /// This is a bit subtle because the relationship of the conductor
    /// is different depending on who will be receiving the message:
    /// * If the notification is going to the conductor's client, then no changes
    ///   are needed, as the conductor is sending an agent-to-client message and
    ///   the conductor is acting as the agent.
    /// * If the notification is going to a proxy component, then we have to wrap
    ///   it in a "from successor" wrapper, because the conductor is the
    ///   proxy's client.
    fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        source_component_index: usize,
        notification: N,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(
            source_component_index,
            proxies_len = self.proxies.len(),
            "send_notification_to_predecessor_of"
        );
        if source_component_index == 0 {
            tracing::debug!("Sending notification directly to client");
            client.send_notification_to(Client, notification)
        } else {
            tracing::debug!(
                target_proxy = source_component_index - 1,
                "Sending notification wrapped as SuccessorMessage to proxy"
            );
            self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
                message: notification,
                meta: None,
            })
        }
    }

    /// Send a message (request or notification) from 'left to right'.
    /// Left-to-right means from the client or an intermediate proxy to the component
    /// at `target_component_index` (could be a proxy or the agent).
    /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
    async fn forward_client_to_agent_message(
        &mut self,
        target_component_index: usize,
        message: Dispatch,
        client: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::trace!(
            target_component_index,
            ?message,
            "forward_client_to_agent_message"
        );

        // Ensure components are initialized before processing any message.
        let message = self.ensure_initialized(client.clone(), message).await?;

        // Targets inside our chain go to the matching proxy. A target one past
        // the last proxy goes to the successor: the agent in agent mode, or the
        // conductor's own successor (via the client connection) in proxy mode.
        if target_component_index < self.proxies.len() {
            self.forward_message_from_client_to_proxy(target_component_index, message)
                .await
        } else {
            assert_eq!(target_component_index, self.proxies.len());

            debug!(
                target_component_index,
                proxies_count = self.proxies.len(),
                "Proxy mode: forwarding successor message to conductor's successor"
            );
            let successor = self.successor.clone();
            successor.send_message(message, client, self).await
        }
    }

    /// Ensures components are initialized before processing messages.
    ///
    /// If components haven't been initialized yet, this expects the first message
    /// to be an `initialize` request and uses it to spawn the component chain.
    ///
    /// Returns:
    /// - `Ok(message)` - Components are initialized, continue processing the returned message
    /// - `Err(_)` - A fatal error occurred
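    ///
    /// The one-shot behaviour relies on `Option::take`. A standalone sketch of the
    /// same pattern follows; `LazyChain` is a hypothetical stand-in for the
    /// responder, and the closure stands in for the instantiator.
    ///
    /// ```rust
    /// struct LazyChain {
    ///     /// Consumed on first use, so later calls see `None`.
    ///     instantiator: Option<Box<dyn FnOnce() -> Vec<String>>>,
    ///     components: Vec<String>,
    /// }
    ///
    /// impl LazyChain {
    ///     /// Returns `true` only on the call that actually builds the chain.
    ///     fn ensure_initialized(&mut self) -> bool {
    ///         let Some(instantiate) = self.instantiator.take() else {
    ///             return false; // already initialized
    ///         };
    ///         self.components = instantiate();
    ///         true
    ///     }
    /// }
    ///
    /// let mut chain = LazyChain {
    ///     instantiator: Some(Box::new(|| vec!["proxy".to_string(), "agent".to_string()])),
    ///     components: Vec::new(),
    /// };
    /// assert!(chain.ensure_initialized());
    /// assert!(!chain.ensure_initialized());
    /// assert_eq!(chain.components.len(), 2);
    /// ```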
    async fn ensure_initialized(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        message: Dispatch,
    ) -> Result<Dispatch, Error> {
        // Already initialized - pass through
        let Some(instantiator) = self.instantiator.take() else {
            return Ok(message);
        };

        let host = self.host.clone();
        let message = host.initialize(message, client, instantiator, self).await?;
        Ok(message)
    }

    /// Wrap a proxy component with tracing if tracing is enabled.
    ///
    /// Returns the component unchanged if tracing is disabled.
    fn trace_proxy(
        &self,
        proxy_index: ComponentIndex,
        successor_index: ComponentIndex,
        component: impl ConnectTo<Conductor>,
    ) -> DynConnectTo<Conductor> {
        match &self.trace_handle {
            Some(trace_handle) => {
                trace_handle.bridge_component(proxy_index, successor_index, component)
            }
            None => DynConnectTo::new(component),
        }
    }

    /// Spawn proxy components and add them to the proxies list.
    fn spawn_proxies(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        proxy_components: Vec<DynConnectTo<Conductor>>,
    ) -> Result<(), agent_client_protocol::Error> {
        assert!(self.proxies.is_empty());

        let num_proxies = proxy_components.len();
        info!(proxy_count = num_proxies, "spawn_proxies");

        // Special case: if there are no user-defined proxies
        // but tracing is enabled, we make a dummy proxy that just
        // passes through messages but which can trigger the
        // tracing events.
        if self.trace_handle.is_some() && num_proxies == 0 {
            self.connect_to_proxy(
                &client,
                0,
                ComponentIndex::Client,
                ComponentIndex::Agent,
                Proxy.builder(),
            )?;
        } else {
            // Spawn each proxy component
            for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
                debug!(component_index, "spawning proxy");

                self.connect_to_proxy(
                    &client,
                    component_index,
                    ComponentIndex::Proxy(component_index),
                    ComponentIndex::successor_of(component_index, num_proxies),
                    dyn_component,
                )?;
            }
        }

        info!(proxy_count = self.proxies.len(), "Proxies spawned");

        Ok(())
    }

    /// Create a connection to the proxy with index `component_index` implemented in `component`.
    ///
    /// If tracing is enabled, the proxy's index is `trace_proxy_index` and its successor is `trace_successor_index`.
    fn connect_to_proxy(
        &mut self,
        client: &ConnectionTo<Host::Counterpart>,
        component_index: usize,
        trace_proxy_index: ComponentIndex,
        trace_successor_index: ComponentIndex,
        component: impl ConnectTo<Conductor>,
    ) -> Result<(), Error> {
        let connection_builder = self.connection_to_proxy(component_index);
        let connect_component =
            self.trace_proxy(trace_proxy_index, trace_successor_index, component);
        let proxy_connection = client.spawn_connection(connection_builder, connect_component)?;
        self.proxies.push(proxy_connection);
        Ok(())
    }

    /// Create the conductor's connection to the proxy with index `component_index`.
    ///
    /// Outgoing messages received from the proxy are sent to `self.conductor_tx` as either
    /// left-to-right or right-to-left messages depending on whether they are wrapped
    /// in `SuccessorMessage`.
    fn connection_to_proxy(
        &mut self,
        component_index: usize,
    ) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
        type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
        let mut conductor_tx = self.conductor_tx.clone();
        Conductor
            .builder()
            .name(format!("conductor-to-component({component_index})"))
            // Intercept messages sent by the proxy.
            .on_receive_dispatch(
                async move |dispatch: Dispatch, _connection| {
                    MatchDispatch::new(dispatch)
                        .if_message(async |dispatch: SuccessorDispatch| {
                            // ------------------
                            // SuccessorMessages sent by the proxy go to its successor.
                            //
                            // Subtle point:
                            //
                            // `ConductorToProxy` has only a single peer, `Agent`. This means that we see
                            // "successor messages" in their "desugared form". So when we intercept an *outgoing*
                            // message that matches `SuccessorMessage`, it could be one of three things:
                            //
                            // - A request being sent by the proxy to its successor (hence going left->right)
                            // - A notification being sent by the proxy to its successor (hence going left->right)
                            // - A response to a request sent to the proxy *by* its successor. Here, the *request*
                            //   was going right->left, but the *response* (the message we are processing now)
                            //   is going left->right.
                            //
                            // So, in all cases, we forward as a left->right message.

                            conductor_tx
                                .send(ConductorMessage::LeftToRight {
                                    target_component_index: component_index + 1,
                                    message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
                                })
                                .await
                                .map_err(agent_client_protocol::util::internal_error)
                        })
                        .await
                        .otherwise(async |dispatch| {
                            // Other messages sent by the proxy go to its predecessor.
                            // As in the previous handler:
                            //
                            // Messages here are seen in their "desugared form", so we are seeing
                            // one of three things:
                            //
                            // - A request being sent by the proxy to its predecessor (hence going right->left)
                            // - A notification being sent by the proxy to its predecessor (hence going right->left)
                            // - A response to a request sent to the proxy *by* its predecessor. Here, the *request*
                            //   was going left->right, but the *response* (the message we are processing now)
                            //   is going right->left.
                            //
                            // So, in all cases, we forward as a right->left message.

                            let message = ConductorMessage::RightToLeft {
                                source_component_index: SourceComponentIndex::Proxy(
                                    component_index,
                                ),
                                message: dispatch,
                            };
                            conductor_tx
                                .send(message)
                                .await
                                .map_err(agent_client_protocol::util::internal_error)
                        })
                        .await
                },
                agent_client_protocol::on_receive_dispatch!(),
            )
    }

    async fn forward_message_from_client_to_proxy(
        &mut self,
        target_component_index: usize,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(?message, "forward_message_to_proxy");

        MatchDispatch::new(message)
            .if_request(async |_request: InitializeProxyRequest, responder| {
                responder.respond_with_error(
                    agent_client_protocol::Error::invalid_request()
                        .data("initialize/proxy requests are only sent by the conductor"),
                )
            })
            .await
            .if_request(async |request: InitializeRequest, responder| {
                // The pattern for `Initialize` messages is a bit subtle.
                // Proxies receive incoming `Initialize` messages as if they
                // were a client. The conductor (us) intercepts these and
                // converts them to an `InitializeProxyRequest`.
                //
                // The proxy will then initialize itself and forward an `Initialize`
                // request to its successor.
                self.proxies[target_component_index]
                    .send_request(InitializeProxyRequest::from(request))
                    .on_receiving_result(async move |result| {
                        tracing::debug!(?result, "got initialize_proxy response from proxy");
                        responder.respond_with_result(result)
                    })
            })
            .await
            .otherwise(async |message| {
                // Otherwise, just send the message along "as is".
                self.proxies[target_component_index].send_proxied_message(message)
            })
            .await
    }

    /// Invoked when sending a message from the conductor to the agent that it manages.
    /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
    /// method when `Link = ConductorToClient` (i.e., the conductor is not itself
    /// running as a proxy).
    async fn forward_message_to_agent(
        &mut self,
        _client_connection: ConnectionTo<Host::Counterpart>,
        message: Dispatch,
        agent_connection: ConnectionTo<Agent>,
    ) -> Result<(), Error> {
        MatchDispatch::new(message)
            .if_request(async |_request: InitializeProxyRequest, responder| {
                responder.respond_with_error(
                    agent_client_protocol::Error::invalid_request()
                        .data("initialize/proxy requests are only sent by the conductor"),
                )
            })
            .await
            .otherwise(async |message| {
                // Forward all other messages to the agent as-is.
                agent_connection.send_proxied_message_to(Agent, message)
            })
            .await
    }
}

/// Identifies a component in the conductor's chain for tracing purposes.
///
/// Used to track message sources and destinations through the proxy chain.
#[derive(Debug, Clone, Copy)]
pub enum ComponentIndex {
    /// The client (editor) at the start of the chain.
    Client,

    /// A proxy component at the given index.
    Proxy(usize),

    /// The successor (agent in agent mode, outer conductor in proxy mode).
    Agent,
}

impl ComponentIndex {
    /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
    #[must_use]
    pub fn predecessor_of(proxy_index: usize) -> Self {
        match proxy_index.checked_sub(1) {
            Some(p_i) => ComponentIndex::Proxy(p_i),
            None => ComponentIndex::Client,
        }
    }

    /// Return the index for the successor of `proxy_index`, which might be `Agent`.
    #[must_use]
    pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
        // The last proxy (index `num_proxies - 1`) is followed by the agent.
        if proxy_index + 1 == num_proxies {
            ComponentIndex::Agent
        } else {
            ComponentIndex::Proxy(proxy_index + 1)
        }
    }
}

/// Identifies the source of an agent-to-client message.
///
/// This enum handles the fact that the conductor may receive messages from two different sources:
/// 1. From one of its managed components (identified by index)
/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
#[derive(Debug, Clone, Copy)]
pub enum SourceComponentIndex {
    /// Message from a specific component at the given index in the managed chain.
    Proxy(usize),

    /// Message from the conductor's agent or successor.
    Successor,
}

/// Trait for lazy proxy instantiation (proxy mode).
///
/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
/// are proxies that forward to an outer conductor.
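///
/// Both a fixed `Vec` of components and a closure over the request can act as an
/// instantiator. The sketch below reduces that pair of implementations to plain
/// strings so it stands alone; the `Instantiate` trait is a hypothetical stand-in
/// for this trait, with no futures or connection types involved.
///
/// ```rust
/// trait Instantiate {
///     /// Consume the instantiator and return the request plus the chain.
///     fn instantiate(self: Box<Self>, request: String) -> (String, Vec<String>);
/// }
///
/// // Fixed chain: hand back the components unconditionally.
/// impl Instantiate for Vec<String> {
///     fn instantiate(self: Box<Self>, request: String) -> (String, Vec<String>) {
///         (request, *self)
///     }
/// }
///
/// // Dynamic chain: let a closure inspect the request first.
/// impl<F> Instantiate for F
/// where
///     F: FnOnce(String) -> (String, Vec<String>),
/// {
///     fn instantiate(self: Box<Self>, request: String) -> (String, Vec<String>) {
///         (*self)(request)
///     }
/// }
///
/// let fixed: Box<dyn Instantiate> = Box::new(vec!["proxy".to_string()]);
/// let dynamic: Box<dyn Instantiate> = Box::new(|req: String| {
///     let chain = if req.contains("auth") {
///         vec!["auth-proxy".to_string()]
///     } else {
///         Vec::new()
///     };
///     (req, chain)
/// });
/// assert_eq!(fixed.instantiate("init".into()).1, vec!["proxy"]);
/// assert_eq!(dynamic.instantiate("init auth".into()).1, vec!["auth-proxy"]);
/// ```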
pub trait InstantiateProxies: Send {
    /// Instantiate proxy components based on the Initialize request.
    ///
    /// Returns proxy components typed as `DynConnectTo<Conductor>` since proxies
    /// communicate with the conductor.
    fn instantiate_proxies(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
    >;
}

/// Simple implementation: provide all proxy components unconditionally.
///
/// Requires `T: ConnectTo<Conductor>`.
impl<T> InstantiateProxies for Vec<T>
where
    T: ConnectTo<Conductor> + 'static,
{
    fn instantiate_proxies(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
    > {
        Box::pin(async move {
            let components: Vec<DynConnectTo<Conductor>> =
                (*self).into_iter().map(|c| DynConnectTo::new(c)).collect();
            Ok((req, components))
        })
    }
}

897/// Dynamic implementation: closure receives the Initialize request and returns proxies.
898impl<F, Fut> InstantiateProxies for F
899where
900 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
901 Fut: std::future::Future<
902 Output = Result<
903 (InitializeRequest, Vec<DynConnectTo<Conductor>>),
904 agent_client_protocol::Error,
905 >,
906 > + Send
907 + 'static,
908{
909 fn instantiate_proxies(
910 self: Box<Self>,
911 req: InitializeRequest,
912 ) -> futures::future::BoxFuture<
913 'static,
914 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
915 > {
916 Box::pin(async move { (*self)(req).await })
917 }
918}
919
920/// Trait for lazy proxy and agent instantiation (agent mode).
921///
922/// Used by conductors in agent mode (`ConductorToClient`) where there are
923/// zero or more proxies followed by an agent component.
924pub trait InstantiateProxiesAndAgent: Send {
925 /// Instantiate proxy and agent components based on the Initialize request.
926 ///
927 /// Returns the (possibly modified) request, a vector of proxy components
928 /// (typed as `DynConnectTo<Conductor>`), and the agent component
929 /// (typed as `DynConnectTo<Client>`).
930 fn instantiate_proxies_and_agent(
931 self: Box<Self>,
932 req: InitializeRequest,
933 ) -> futures::future::BoxFuture<
934 'static,
935 Result<
936 (
937 InitializeRequest,
938 Vec<DynConnectTo<Conductor>>,
939 DynConnectTo<Client>,
940 ),
941 agent_client_protocol::Error,
942 >,
943 >;
944}
945
/// Wrapper that turns a single agent component (no proxies) into an `InstantiateProxiesAndAgent`.
947#[derive(Debug)]
948pub struct AgentOnly<A>(pub A);
949
950impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
951 fn instantiate_proxies_and_agent(
952 self: Box<Self>,
953 req: InitializeRequest,
954 ) -> futures::future::BoxFuture<
955 'static,
956 Result<
957 (
958 InitializeRequest,
959 Vec<DynConnectTo<Conductor>>,
960 DynConnectTo<Client>,
961 ),
962 agent_client_protocol::Error,
963 >,
964 > {
965 Box::pin(async move { Ok((req, Vec::new(), DynConnectTo::new(self.0))) })
966 }
967}
968
969/// Builder for creating proxies and agent components.
970///
971/// # Example
972/// ```ignore
973/// ProxiesAndAgent::new(ElizaAgent::new())
974/// .proxy(LoggingProxy::new())
975/// .proxy(AuthProxy::new())
976/// ```
977#[derive(Debug)]
978pub struct ProxiesAndAgent {
979 proxies: Vec<DynConnectTo<Conductor>>,
980 agent: DynConnectTo<Client>,
981}
982
983impl ProxiesAndAgent {
984 /// Create a new builder with the given agent component.
985 pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
986 Self {
987 proxies: vec![],
988 agent: DynConnectTo::new(agent),
989 }
990 }
991
992 /// Add a single proxy component.
993 #[must_use]
994 pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
995 self.proxies.push(DynConnectTo::new(proxy));
996 self
997 }
998
999 /// Add multiple proxy components.
1000 #[must_use]
1001 pub fn proxies<P, I>(mut self, proxies: I) -> Self
1002 where
1003 P: ConnectTo<Conductor> + 'static,
1004 I: IntoIterator<Item = P>,
1005 {
1006 self.proxies
1007 .extend(proxies.into_iter().map(DynConnectTo::new));
1008 self
1009 }
1010}
1011
1012impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1013 fn instantiate_proxies_and_agent(
1014 self: Box<Self>,
1015 req: InitializeRequest,
1016 ) -> futures::future::BoxFuture<
1017 'static,
1018 Result<
1019 (
1020 InitializeRequest,
1021 Vec<DynConnectTo<Conductor>>,
1022 DynConnectTo<Client>,
1023 ),
1024 agent_client_protocol::Error,
1025 >,
1026 > {
1027 Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1028 }
1029}
1030
1031/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
1032impl<F, Fut> InstantiateProxiesAndAgent for F
1033where
1034 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1035 Fut: std::future::Future<
1036 Output = Result<
1037 (
1038 InitializeRequest,
1039 Vec<DynConnectTo<Conductor>>,
1040 DynConnectTo<Client>,
1041 ),
1042 agent_client_protocol::Error,
1043 >,
1044 > + Send
1045 + 'static,
1046{
1047 fn instantiate_proxies_and_agent(
1048 self: Box<Self>,
1049 req: InitializeRequest,
1050 ) -> futures::future::BoxFuture<
1051 'static,
1052 Result<
1053 (
1054 InitializeRequest,
1055 Vec<DynConnectTo<Conductor>>,
1056 DynConnectTo<Client>,
1057 ),
1058 agent_client_protocol::Error,
1059 >,
1060 > {
1061 Box::pin(async move { (*self)(req).await })
1062 }
1063}
1064
1065/// Messages sent to the conductor's main event loop for routing.
1066///
1067/// These messages enable the conductor to route communication between:
1068/// - The editor and the first component
1069/// - Components and their successors in the chain
1070/// - Components and their clients (editor or predecessor)
1071///
1072/// All spawned tasks send messages via this enum through a shared channel,
1073/// allowing centralized routing logic in the `serve()` loop.
1074#[derive(Debug)]
1075pub enum ConductorMessage {
1076 /// If this message is a request or notification, then it is going "left-to-right"
1077 /// (e.g., a component making a request of its successor).
1078 ///
1079 /// If this message is a response, then it is going right-to-left
1080 /// (i.e., the successor answering a request made by its predecessor).
1081 LeftToRight {
1082 target_component_index: usize,
1083 message: Dispatch,
1084 },
1085
1086 /// If this message is a request or notification, then it is going "right-to-left"
1087 /// (e.g., a component making a request of its predecessor).
1088 ///
1089 /// If this message is a response, then it is going "left-to-right"
1090 /// (i.e., the predecessor answering a request made by its successor).
1091 RightToLeft {
1092 source_component_index: SourceComponentIndex,
1093 message: Dispatch,
1094 },
1095}
1096
/// Trait implemented for the two links the conductor can use:
///
/// * `ConductorToClient`: the conductor is acting as an agent, so when its last proxy sends to
///   its successor, the conductor delivers that message to its agent component.
/// * `ConductorToConductor`: the conductor is acting as a proxy, so when its last proxy sends to
///   its successor, the (inner) conductor forwards that message to its own successor via the
///   outer conductor.
1101pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
1102 /// The type used to instantiate components for this link type.
1103 type Instantiator: Send;
1104
1105 /// Handle initialization: parse the init request, instantiate components, and spawn them.
1106 ///
1107 /// Takes ownership of the instantiator and returns the (possibly modified) init request
1108 /// wrapped in a Dispatch for forwarding.
1109 fn initialize(
1110 &self,
1111 message: Dispatch,
1112 connection: ConnectionTo<Self::Counterpart>,
1113 instantiator: Self::Instantiator,
1114 responder: &mut ConductorResponder<Self>,
1115 ) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
1116
    /// Handle an incoming message from the client or the outer conductor, depending on `Self`.
1118 fn handle_dispatch(
1119 &self,
1120 message: Dispatch,
1121 connection: ConnectionTo<Self::Counterpart>,
1122 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1123 ) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
1124}
1125
1126/// Conductor acting as an agent
1127impl ConductorHostRole for Agent {
1128 type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1129
1130 async fn initialize(
1131 &self,
1132 message: Dispatch,
1133 client_connection: ConnectionTo<Client>,
1134 instantiator: Self::Instantiator,
1135 responder: &mut ConductorResponder<Self>,
1136 ) -> Result<Dispatch, agent_client_protocol::Error> {
1137 let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1138
1139 // Not yet initialized - expect an initialize request.
1140 // Error if we get anything else.
1141 let Dispatch::Request(request, init_responder) = message else {
1142 message.respond_with_error(invalid_request(), client_connection.clone())?;
1143 return Err(invalid_request());
1144 };
1145 if !InitializeRequest::matches_method(request.method()) {
1146 init_responder.respond_with_error(invalid_request())?;
1147 return Err(invalid_request());
1148 }
1149
1150 let init_request =
1151 match InitializeRequest::parse_message(request.method(), request.params()) {
1152 Ok(r) => r,
1153 Err(error) => {
1154 init_responder.respond_with_error(error)?;
1155 return Err(invalid_request());
1156 }
1157 };
1158
1159 // Instantiate proxies and agent
1160 let (modified_req, proxy_components, agent_component) = instantiator
1161 .instantiate_proxies_and_agent(init_request)
1162 .await?;
1163
1164 // Spawn the agent component
1165 debug!(?agent_component, "spawning agent");
1166
1167 let connection_to_agent = client_connection.spawn_connection(
1168 Client
1169 .builder()
1170 .name("conductor-to-agent")
1171 // Intercept agent-to-client messages from the agent.
1172 .on_receive_dispatch(
1173 {
1174 let mut conductor_tx = responder.conductor_tx.clone();
1175 async move |dispatch: Dispatch, _cx| {
1176 conductor_tx
1177 .send(ConductorMessage::RightToLeft {
1178 source_component_index: SourceComponentIndex::Successor,
1179 message: dispatch,
1180 })
1181 .await
1182 .map_err(agent_client_protocol::util::internal_error)
1183 }
1184 },
1185 agent_client_protocol::on_receive_dispatch!(),
1186 ),
1187 agent_component,
1188 )?;
1189 responder.successor = Arc::new(connection_to_agent);
1190
1191 // Spawn the proxy components
1192 responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1193
1194 Ok(Dispatch::Request(
1195 modified_req.to_untyped_message()?,
1196 init_responder,
1197 ))
1198 }
1199
1200 async fn handle_dispatch(
1201 &self,
1202 message: Dispatch,
1203 client_connection: ConnectionTo<Client>,
1204 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1205 ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1206 tracing::debug!(
1207 method = ?message.method(),
1208 "ConductorToClient::handle_dispatch"
1209 );
1210 MatchDispatchFrom::new(message, &client_connection)
1211 // Any incoming messages from the client are client-to-agent messages targeting the first component.
1212 .if_message_from(Client, async move |message: Dispatch| {
1213 tracing::debug!(
1214 method = ?message.method(),
1215 "ConductorToClient::handle_dispatch - matched Client"
1216 );
1217 ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
1218 })
1219 .await
1220 .done()
1221 }
1222}
1223
1224/// Conductor acting as a proxy
1225impl ConductorHostRole for Proxy {
1226 type Instantiator = Box<dyn InstantiateProxies>;
1227
1228 async fn initialize(
1229 &self,
1230 message: Dispatch,
1231 client_connection: ConnectionTo<Conductor>,
1232 instantiator: Self::Instantiator,
1233 responder: &mut ConductorResponder<Self>,
1234 ) -> Result<Dispatch, agent_client_protocol::Error> {
        // Proxy mode expects `InitializeProxyRequest`, whose method is `_proxy/initialize`.
        let invalid_request =
            || Error::invalid_request().data("expected `_proxy/initialize` request");
1236
1237 // Not yet initialized - expect an InitializeProxy request.
1238 // Error if we get anything else.
1239 let Dispatch::Request(request, init_responder) = message else {
1240 message.respond_with_error(invalid_request(), client_connection.clone())?;
1241 return Err(invalid_request());
1242 };
1243 if !InitializeProxyRequest::matches_method(request.method()) {
1244 init_responder.respond_with_error(invalid_request())?;
1245 return Err(invalid_request());
1246 }
1247
1248 let InitializeProxyRequest { initialize } =
1249 match InitializeProxyRequest::parse_message(request.method(), request.params()) {
1250 Ok(r) => r,
1251 Err(error) => {
1252 init_responder.respond_with_error(error)?;
1253 return Err(invalid_request());
1254 }
1255 };
1256
        tracing::debug!("initialize: received InitializeProxyRequest (proxy mode)");
1258
1259 // Instantiate proxies (no agent in proxy mode)
1260 let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1261
1262 // In proxy mode, our successor is the outer conductor (via our client connection)
1263 responder.successor = Arc::new(GrandSuccessor);
1264
1265 // Spawn the proxy components
1266 responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1267
1268 Ok(Dispatch::Request(
1269 modified_req.to_untyped_message()?,
1270 init_responder,
1271 ))
1272 }
1273
1274 async fn handle_dispatch(
1275 &self,
1276 message: Dispatch,
1277 client_connection: ConnectionTo<Conductor>,
1278 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1279 ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1280 tracing::debug!(
1281 method = ?message.method(),
1282 ?message,
1283 "ConductorToConductor::handle_dispatch"
1284 );
1285 MatchDispatchFrom::new(message, &client_connection)
1286 .if_message_from(Agent, {
1287 // Messages from our successor arrive already unwrapped
1288 // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1289 async |message: Dispatch| {
1290 tracing::debug!(
1291 method = ?message.method(),
1292 "ConductorToConductor::handle_dispatch - matched Agent"
1293 );
1294 let mut conductor_tx = conductor_tx.clone();
1295 ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
1296 .await
1297 }
1298 })
1299 .await
1300 // Any incoming messages from the client are client-to-agent messages targeting the first component.
1301 .if_message_from(Client, async |message: Dispatch| {
1302 tracing::debug!(
1303 method = ?message.method(),
1304 "ConductorToConductor::handle_dispatch - matched Client"
1305 );
1306 let mut conductor_tx = conductor_tx.clone();
1307 ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
1308 .await
1309 })
1310 .await
1311 .done()
1312 }
1313}
1314
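/// Destination for messages that the last proxy in the chain sends to its successor.
///
/// Implementations in this module: `ConnectionTo<Agent>` (agent mode), `GrandSuccessor`
/// (proxy mode), and `agent_client_protocol::Error`, which rejects every message.
pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {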
1316 /// Send a message to the successor.
1317 fn send_message<'a>(
1318 &self,
1319 message: Dispatch,
1320 connection_to_conductor: ConnectionTo<Host::Counterpart>,
1321 responder: &'a mut ConductorResponder<Host>,
1322 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
1323}
1324
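/// An error used as a successor rejects every message: `send_message` always
/// fails with a clone of that error.
impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {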
1326 fn send_message<'a>(
1327 &self,
1328 #[expect(unused_variables)] message: Dispatch,
1329 #[expect(unused_variables)] connection_to_conductor: ConnectionTo<Host::Counterpart>,
1330 #[expect(unused_variables)] responder: &'a mut ConductorResponder<Host>,
1331 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1332 let error = self.clone();
1333 Box::pin(std::future::ready(Err(error)))
1334 }
1335}
1336
1337/// A dummy type handling messages sent to the conductor's
1338/// successor when it is acting as a proxy.
1339struct GrandSuccessor;
1340
/// When the conductor is acting as a proxy, messages sent by
1342/// the last proxy go to the conductor's successor.
1343///
1344/// ```text
1345/// client --> Conductor -----------------------------> GrandSuccessor
1346/// | |
1347/// +-> Proxy[0] -> ... -> Proxy[n-1] -+
1348/// ```
1349impl ConductorSuccessor<Proxy> for GrandSuccessor {
1350 fn send_message<'a>(
1351 &self,
1352 message: Dispatch,
1353 connection: ConnectionTo<Conductor>,
1354 _responder: &'a mut ConductorResponder<Proxy>,
1355 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1356 Box::pin(async move {
1357 debug!("Proxy mode: forwarding successor message to conductor's successor");
1358 connection.send_proxied_message_to(Agent, message)
1359 })
1360 }
1361}
1362
1363/// When the conductor is acting as an agent, messages sent by
1364/// the last proxy to its successor go to the internal agent
1365/// (`self`).
1366impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
1367 fn send_message<'a>(
1368 &self,
1369 message: Dispatch,
1370 connection: ConnectionTo<Client>,
1371 responder: &'a mut ConductorResponder<Agent>,
1372 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1373 let connection_to_agent = self.clone();
1374 Box::pin(async move {
            debug!("Agent mode: forwarding successor message to the agent component");
1376 responder
1377 .forward_message_to_agent(connection, message, connection_to_agent)
1378 .await
1379 })
1380 }
1381}