agent_client_protocol_conductor/conductor.rs
1//! # Conductor: ACP Proxy Chain Orchestrator
2//!
//! This module implements the conductor, which orchestrates a chain of
4//! proxy components that sit between an editor and an agent, transforming the
5//! Agent-Client Protocol (ACP) stream bidirectionally.
6//!
7//! ## Architecture Overview
8//!
9//! The conductor builds and manages a chain of components:
10//!
11//! ```text
12//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
13//! ```
14//!
15//! Each component receives ACP messages, can transform them, and forwards them
16//! to the next component in the chain. The conductor:
17//!
18//! 1. Spawns each component as a subprocess
19//! 2. Establishes bidirectional JSON-RPC connections with each component
20//! 3. Routes messages between editor, components, and agent
21//! 4. Distinguishes proxy vs agent components via distinct request types
22//!
23//! ## Recursive Chain Building
24//!
25//! The chain is built recursively through the `_proxy/successor/*` protocol:
26//!
27//! 1. Editor connects to Component 0 via the conductor
28//! 2. When Component 0 wants to communicate with its successor, it sends
29//! requests/notifications with method prefix `_proxy/successor/`
30//! 3. The conductor intercepts these messages, strips the prefix, and forwards
31//! to Component 1
32//! 4. Component 1 does the same for Component 2, and so on
33//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
34//!
35//! This allows each component to be written as if it's talking to a single successor,
36//! without knowing about the full chain.
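/*!
The prefix handling in steps 2 and 3 can be sketched as below. This is an
illustration only, not the conductor's actual code path: `SUCCESSOR_PREFIX` and
`strip_successor_prefix` are hypothetical names, and the sketch assumes the
method name is available as a plain string.

```rust
/// Method prefix a proxy uses to address its successor (as described above).
const SUCCESSOR_PREFIX: &str = "_proxy/successor/";

/// Returns the method to forward to the next component, or `None` when the
/// message is not addressed to the successor.
fn strip_successor_prefix(method: &str) -> Option<&str> {
    method.strip_prefix(SUCCESSOR_PREFIX)
}
```

For example, `strip_successor_prefix("_proxy/successor/session/new")` yields
`Some("session/new")`, while an unprefixed method such as `session/update`
yields `None` and would travel toward the predecessor instead.
*/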
37//!
38//! ## Proxy vs Agent Initialization
39//!
40//! Components discover whether they're a proxy or agent via the initialization request they receive:
41//!
42//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
43//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
44//!
45//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
46//! and `InitializeRequest` only to the final agent component. This allows proxies to
47//! know they should forward messages to a successor, while agents know they are the
//! terminal component.
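/*!
As a rough sketch of that rule, assuming agent mode where the chain holds
`chain_len` components and the last one is the agent (`initialize_method` is a
hypothetical helper, not part of this crate):

```rust
/// Picks the initialization method for the component at `component_index`
/// in a chain of `chain_len` components whose last element is the agent.
fn initialize_method(component_index: usize, chain_len: usize) -> &'static str {
    if component_index + 1 == chain_len {
        // Terminal component: standard ACP initialization.
        "initialize"
    } else {
        // Any earlier component is a proxy with a successor.
        "_proxy/initialize"
    }
}
```

With a chain of two proxies and an agent, indices 0 and 1 receive
`_proxy/initialize` and index 2 receives `initialize`.
*/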
49//!
50//! ## Message Routing
51//!
52//! The conductor runs an event loop processing messages from:
53//!
54//! - **Editor to first component**: Standard ACP messages
55//! - **Component to successor**: Via `_proxy/successor/*` prefix
56//! - **Component responses**: Via futures channels back to requesters
57//!
58//! The message flow ensures bidirectional communication while maintaining the
59//! abstraction that each component only knows about its immediate successor.
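/*!
The routing decision for a message emitted by a proxy can be sketched as
follows. The types and the `route_from_proxy` function are hypothetical
stand-ins for illustration; the real conductor works with `ConductorMessage`
and live connections rather than a pure function.

```rust
/// Where a message emitted by a proxy should be delivered.
#[derive(Debug, PartialEq, Eq)]
enum Destination {
    /// The editor at the left end of the chain.
    Client,
    /// Another proxy managed by the conductor.
    Proxy(usize),
    /// Whatever follows the last proxy (the agent, in agent mode).
    Successor,
}

/// Routes a message sent by proxy `index` in a chain of `num_proxies` proxies.
/// `to_successor` is true when the message used the `_proxy/successor/` prefix.
fn route_from_proxy(index: usize, num_proxies: usize, to_successor: bool) -> Destination {
    if to_successor {
        // Left to right: the next proxy, or the successor past the last proxy.
        if index + 1 < num_proxies {
            Destination::Proxy(index + 1)
        } else {
            Destination::Successor
        }
    } else if index == 0 {
        // Right to left from the first proxy reaches the editor.
        Destination::Client
    } else {
        // Right to left otherwise reaches the previous proxy.
        Destination::Proxy(index - 1)
    }
}
```

With two proxies, a successor message from proxy 1 goes to the successor,
while an ordinary message from proxy 1 goes back to proxy 0.
*/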
60//!
61//! ## Lazy Component Initialization
62//!
63//! Components are instantiated lazily when the first `initialize` request is received
64//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
65//!
//! ### Simple Usage
//!
//! Pass a `Vec` of components that implement `ConnectTo<Conductor>`. In proxy
//! mode, every element is a proxy:
//!
//! ```ignore
//! let conductor = ConductorImpl::new_proxy(
//!     "my-conductor",
//!     vec![proxy1, proxy2],
//! );
//! ```
//!
//! `ConductorImpl::new_agent` is used the same way but takes an
//! `InstantiateProxiesAndAgent`, so the chain ends in an agent.
//!
//! All components are spawned in order when the editor sends the first `initialize` request.
//!
//! ### Dynamic Component Selection
//!
//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
//!
//! ```ignore
//! let conductor = ConductorImpl::new_proxy(
//!     "my-conductor",
//!     |init_req: InitializeRequest| async move {
//!         // Examine capabilities
//!         let needs_auth = has_auth_capability(&init_req);
//!
//!         // `auth_proxy` stands for any value implementing `ConnectTo<Conductor>`.
//!         let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
//!         if needs_auth {
//!             proxies.push(DynConnectTo::new(auth_proxy));
//!         }
//!
//!         // Return the (potentially modified) request and the proxy list
//!         Ok((init_req, proxies))
//!     },
//! );
//! ```
//!
//! In proxy mode, the closure receives:
//! - `init_req: InitializeRequest` - The `initialize` request from the editor
//!
//! And returns:
//! - The (possibly modified) `InitializeRequest` to forward downstream
//! - A `Vec<DynConnectTo<Conductor>>` of proxy components
112
113use std::sync::Arc;
114
115use agent_client_protocol::{
116 Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
117 Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
118};
119use agent_client_protocol::{
120 Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest,
121};
122use agent_client_protocol::{
123 HandleDispatchFrom,
124 schema::{InitializeProxyRequest, v1::InitializeRequest},
125 util::MatchDispatchFrom,
126};
127use agent_client_protocol::{Handled, schema::SuccessorMessage};
128use futures::{
129 SinkExt, StreamExt,
130 channel::mpsc::{self},
131};
132use tracing::{debug, info};
133
134/// The conductor manages the proxy chain lifecycle and message routing.
135///
136/// It maintains connections to all components in the chain and routes messages
137/// bidirectionally between the editor, components, and agent.
138///
139#[derive(Debug)]
140pub struct ConductorImpl<Host: ConductorHostRole> {
141 host: Host,
142 name: String,
143 instantiator: Host::Instantiator,
144 trace_writer: Option<crate::trace::TraceWriter>,
145}
146
147impl<Host: ConductorHostRole> ConductorImpl<Host> {
148 pub fn new(host: Host, name: impl ToString, instantiator: Host::Instantiator) -> Self {
149 ConductorImpl {
150 name: name.to_string(),
151 host,
152 instantiator,
153 trace_writer: None,
154 }
155 }
156}
157
158impl ConductorImpl<Agent> {
159 /// Create a conductor in agent mode (the last component is an agent).
160 pub fn new_agent(
161 name: impl ToString,
162 instantiator: impl InstantiateProxiesAndAgent + 'static,
163 ) -> Self {
164 ConductorImpl::new(Agent, name, Box::new(instantiator))
165 }
166}
167
168impl ConductorImpl<Proxy> {
169 /// Create a conductor in proxy mode (forwards to another conductor).
170 pub fn new_proxy(name: impl ToString, instantiator: impl InstantiateProxies + 'static) -> Self {
171 ConductorImpl::new(Proxy, name, Box::new(instantiator))
172 }
173}
174
175impl<Host: ConductorHostRole> ConductorImpl<Host> {
176 /// Enable trace logging to a custom destination.
177 ///
178 /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
179 #[must_use]
180 pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
181 self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
182 self
183 }
184
185 /// Enable trace logging to a file path.
186 ///
187 /// Events will be written as newline-delimited JSON (`.jsons` format).
188 /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
189 pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
190 self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
191 Ok(self)
192 }
193
194 /// Enable trace logging with an existing TraceWriter.
195 #[must_use]
196 pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
197 self.trace_writer = Some(writer);
198 self
199 }
200
201 /// Run the conductor with a transport.
202 pub async fn run(
203 self,
204 transport: impl ConnectTo<Host>,
205 ) -> Result<(), agent_client_protocol::Error> {
206 let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);
207
208 // Set up tracing if enabled - spawn writer task and get handle
209 let trace_handle;
210 let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
211 if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
212 trace_handle = Some(h);
213 trace_future = Box::pin(f);
214 } else {
215 trace_handle = None;
216 trace_future = Box::pin(std::future::ready(Ok(())));
217 }
218
219 let responder = ConductorResponder {
220 conductor_rx,
221 conductor_tx: conductor_tx.clone(),
222 instantiator: Some(self.instantiator),
223 proxies: Vec::default(),
224 successor: Arc::new(agent_client_protocol::util::internal_error(
225 "successor not initialized",
226 )),
227 trace_handle,
228 host: self.host.clone(),
229 };
230
231 Builder::new_with(
232 self.host.clone(),
233 ConductorMessageHandler {
234 conductor_tx,
235 host: self.host.clone(),
236 },
237 )
238 .name(self.name)
239 .with_responder(responder)
240 .with_spawned(|_cx| trace_future)
241 .connect_to(transport)
242 .await
243 }
244
245 async fn incoming_message_from_client(
246 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
247 message: Dispatch,
248 ) -> Result<(), agent_client_protocol::Error> {
249 conductor_tx
250 .send(ConductorMessage::LeftToRight {
251 target_component_index: 0,
252 message,
253 })
254 .await
255 .map_err(agent_client_protocol::util::internal_error)
256 }
257
258 async fn incoming_message_from_agent(
259 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
260 message: Dispatch,
261 ) -> Result<(), agent_client_protocol::Error> {
262 conductor_tx
263 .send(ConductorMessage::RightToLeft {
264 source_component_index: SourceComponentIndex::Successor,
265 message,
266 })
267 .await
268 .map_err(agent_client_protocol::util::internal_error)
269 }
270}
271
272impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
273 async fn connect_to(
274 self,
275 client: impl ConnectTo<Host>,
276 ) -> Result<(), agent_client_protocol::Error> {
277 self.run(client).await
278 }
279}
280
281struct ConductorMessageHandler<Host: ConductorHostRole> {
282 conductor_tx: mpsc::Sender<ConductorMessage>,
283 host: Host,
284}
285
286impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
287 for ConductorMessageHandler<Host>
288{
289 async fn handle_dispatch_from(
290 &mut self,
291 message: Dispatch,
292 connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
293 ) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
294 self.host
295 .handle_dispatch(message, connection, &mut self.conductor_tx)
296 .await
297 }
298
299 fn describe_chain(&self) -> impl std::fmt::Debug {
300 "ConductorMessageHandler"
301 }
302}
303
/// The conductor's central actor.
///
/// It receives every `ConductorMessage` sent over `conductor_tx`, owns the
/// connections to the proxies and the successor, and routes messages
/// bidirectionally between the editor, components, and agent.
309pub struct ConductorResponder<Host>
310where
311 Host: ConductorHostRole,
312{
313 conductor_rx: mpsc::Receiver<ConductorMessage>,
314
315 conductor_tx: mpsc::Sender<ConductorMessage>,
316
317 /// The instantiator for lazy initialization.
318 /// Set to None after components are instantiated.
319 instantiator: Option<Host::Instantiator>,
320
321 /// The chain of proxies before the agent (if any).
322 ///
323 /// Populated lazily when the first Initialize request is received.
324 proxies: Vec<ConnectionTo<Proxy>>,
325
326 /// If the conductor is operating in agent mode, this will direct messages to the agent.
327 /// If the conductor is operating in proxy mode, this will direct messages to the successor.
328 /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
329 successor: Arc<dyn ConductorSuccessor<Host>>,
330
331 /// Optional trace handle for sequence diagram visualization.
332 trace_handle: Option<crate::trace::TraceHandle>,
333
334 /// Defines what sort of link we have
335 host: Host,
336}
337
338impl<Host> std::fmt::Debug for ConductorResponder<Host>
339where
340 Host: ConductorHostRole,
341{
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 f.debug_struct("ConductorResponder")
344 .field("conductor_rx", &self.conductor_rx)
345 .field("conductor_tx", &self.conductor_tx)
346 .field("proxies", &self.proxies)
347 .field("trace_handle", &self.trace_handle)
348 .field("host", &self.host)
349 .finish_non_exhaustive()
350 }
351}
352
353impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
354where
355 Host: ConductorHostRole,
356{
357 async fn run_with_connection_to(
358 mut self,
359 connection: ConnectionTo<Host::Counterpart>,
360 ) -> Result<(), agent_client_protocol::Error> {
        // Components are spawned lazily via `ensure_initialized`
        // when the first Initialize request is received.
363
364 // This is the "central actor" of the conductor. Most other things forward messages
365 // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
366 while let Some(message) = self.conductor_rx.next().await {
367 self.handle_conductor_message(connection.clone(), message)
368 .await?;
369 }
370 Ok(())
371 }
372}
373
374impl<Host> ConductorResponder<Host>
375where
376 Host: ConductorHostRole,
377{
    /// Central message handling logic for the conductor.
    ///
    /// The conductor routes all [`ConductorMessage`] messages through this function.
    /// Each message corresponds to a request or notification from one component to another.
    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
    /// Note that *responses to requests* are sent *directly* without going through this loop.
    ///
    /// The names we use are:
394 ///
395 /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
396 /// * Then there is a sequence of *components* consisting of:
397 /// * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
398 /// * And finally the *agent*, which is the final component in the chain and handles the actual work.
399 ///
400 /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
401 ///
402 /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
403 /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
404 /// through a stdio server that runs on localhost and bridges messages.
405 async fn handle_conductor_message(
406 &mut self,
407 client: ConnectionTo<Host::Counterpart>,
408 message: ConductorMessage,
409 ) -> Result<(), agent_client_protocol::Error> {
410 tracing::debug!(?message, "handle_conductor_message");
411
412 match message {
413 ConductorMessage::LeftToRight {
414 target_component_index,
415 message,
416 } => {
417 // Tracing happens inside forward_client_to_agent_message, after initialization,
418 // so that component_name() has access to the populated proxies list.
419 self.forward_client_to_agent_message(target_component_index, message, client)
420 .await
421 }
422
423 ConductorMessage::RightToLeft {
424 source_component_index,
425 message,
426 } => {
427 tracing::debug!(
428 ?source_component_index,
429 message_method = ?message.method(),
430 "Conductor: AgentToClient received"
431 );
432 self.send_message_to_predecessor_of(client, source_component_index, message)
433 }
434 }
435 }
436
437 /// Send a message (request or notification) to the predecessor of the given component.
438 ///
439 /// This is a bit subtle because the relationship of the conductor
440 /// is different depending on who will be receiving the message:
441 /// * If the message is going to the conductor's client, then no changes
442 /// are needed, as the conductor is sending an agent-to-client message and
443 /// the conductor is acting as the agent.
444 /// * If the message is going to a proxy component, then we have to wrap
445 /// it in a "from successor" wrapper, because the conductor is the
446 /// proxy's client.
447 fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
448 &mut self,
449 client: ConnectionTo<Host::Counterpart>,
450 source_component_index: SourceComponentIndex,
451 message: Dispatch<Req, N>,
452 ) -> Result<(), agent_client_protocol::Error>
453 where
454 Req::Response: Send,
455 {
456 let source_component_index = match source_component_index {
457 SourceComponentIndex::Successor => self.proxies.len(),
458 SourceComponentIndex::Proxy(index) => index,
459 };
460
461 match message {
462 Dispatch::Request(request, responder) => self
463 .send_request_to_predecessor_of(client, source_component_index, request)
464 .forward_response_to(responder),
465 Dispatch::Notification(notification) => {
466 // `$/cancel_request` is connection-scoped: its `requestId` was
467 // allocated on the connection the notification arrived over
468 // and means nothing on the predecessor's connection. The SDK
469 // already propagates the cancellation hop by hop through the
470 // `forward_response_to` calls above, so drop the raw
471 // notification instead of tunneling a meaningless ID.
472 #[cfg(feature = "unstable_cancel_request")]
                if agent_client_protocol::is_cancel_request_notification(&notification) {
474 tracing::debug!(
475 "not forwarding hop-scoped `$/cancel_request` notification to predecessor"
476 );
477 return Ok(());
478 }
479 self.send_notification_to_predecessor_of(
480 client,
481 source_component_index,
482 notification,
483 )
484 }
485 Dispatch::Response(result, router) => router.respond_with_result(result),
486 }
487 }
488
489 fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
490 &mut self,
491 client_connection: ConnectionTo<Host::Counterpart>,
492 source_component_index: usize,
493 request: Req,
494 ) -> SentRequest<Req::Response> {
495 if source_component_index == 0 {
496 client_connection.send_request_to(Client, request)
497 } else {
498 self.proxies[source_component_index - 1].send_request(SuccessorMessage {
499 message: request,
500 meta: None,
501 })
502 }
503 }
504
505 /// Send a notification to the predecessor of the given component.
506 ///
507 /// This is a bit subtle because the relationship of the conductor
508 /// is different depending on who will be receiving the message:
509 /// * If the notification is going to the conductor's client, then no changes
510 /// are needed, as the conductor is sending an agent-to-client message and
511 /// the conductor is acting as the agent.
512 /// * If the notification is going to a proxy component, then we have to wrap
513 /// it in a "from successor" wrapper, because the conductor is the
514 /// proxy's client.
515 fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
516 &mut self,
517 client: ConnectionTo<Host::Counterpart>,
518 source_component_index: usize,
519 notification: N,
520 ) -> Result<(), agent_client_protocol::Error> {
521 tracing::debug!(
522 source_component_index,
523 proxies_len = self.proxies.len(),
524 "send_notification_to_predecessor_of"
525 );
526 if source_component_index == 0 {
527 tracing::debug!("Sending notification directly to client");
528 client.send_notification_to(Client, notification)
529 } else {
530 tracing::debug!(
531 target_proxy = source_component_index - 1,
532 "Sending notification wrapped as SuccessorMessage to proxy"
533 );
534 self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
535 message: notification,
536 meta: None,
537 })
538 }
539 }
540
541 /// Send a message (request or notification) from 'left to right'.
542 /// Left-to-right means from the client or an intermediate proxy to the component
543 /// at `target_component_index` (could be a proxy or the agent).
544 /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
545 async fn forward_client_to_agent_message(
546 &mut self,
547 target_component_index: usize,
548 message: Dispatch,
549 client: ConnectionTo<Host::Counterpart>,
550 ) -> Result<(), agent_client_protocol::Error> {
551 tracing::trace!(
552 target_component_index,
553 ?message,
554 "forward_client_to_agent_message"
555 );
556
557 // Ensure components are initialized before processing any message.
558 let message = self.ensure_initialized(client.clone(), message).await?;
559
        // If the target is one of our proxies, forward to it. Otherwise the
        // target sits just past the end of the proxy chain, so hand the message
        // to the successor (the agent in agent mode, the conductor's own
        // successor in proxy mode).
562 if target_component_index < self.proxies.len() {
563 self.forward_message_from_client_to_proxy(target_component_index, message)
564 .await
565 } else {
566 assert_eq!(target_component_index, self.proxies.len());
567
568 debug!(
569 target_component_index,
570 proxies_count = self.proxies.len(),
571 "Proxy mode: forwarding successor message to conductor's successor"
572 );
573 let successor = self.successor.clone();
574 successor.send_message(message, client, self).await
575 }
576 }
577
578 /// Ensures components are initialized before processing messages.
579 ///
580 /// If components haven't been initialized yet, this expects the first message
581 /// to be an `initialize` request and uses it to spawn the component chain.
582 ///
    /// Returns:
    /// - `Ok(message)` - Components are initialized; continue processing the returned message
    /// - `Err(_)` - Initialization failed
587 async fn ensure_initialized(
588 &mut self,
589 client: ConnectionTo<Host::Counterpart>,
590 message: Dispatch,
591 ) -> Result<Dispatch, Error> {
592 // Already initialized - pass through
593 let Some(instantiator) = self.instantiator.take() else {
594 return Ok(message);
595 };
596
597 let host = self.host.clone();
598 let message = host.initialize(message, client, instantiator, self).await?;
599 Ok(message)
600 }
601
602 /// Wrap a proxy component with tracing if tracing is enabled.
603 ///
604 /// Returns the component unchanged if tracing is disabled.
605 fn trace_proxy(
606 &self,
607 proxy_index: ComponentIndex,
608 successor_index: ComponentIndex,
609 component: impl ConnectTo<Conductor>,
610 ) -> DynConnectTo<Conductor> {
611 match &self.trace_handle {
612 Some(trace_handle) => {
613 trace_handle.bridge_component(proxy_index, successor_index, component)
614 }
615 None => DynConnectTo::new(component),
616 }
617 }
618
619 /// Spawn proxy components and add them to the proxies list.
620 fn spawn_proxies(
621 &mut self,
622 client: ConnectionTo<Host::Counterpart>,
623 proxy_components: Vec<DynConnectTo<Conductor>>,
624 ) -> Result<(), agent_client_protocol::Error> {
625 assert!(self.proxies.is_empty());
626
627 let num_proxies = proxy_components.len();
628 info!(proxy_count = num_proxies, "spawn_proxies");
629
630 // Special case: if there are no user-defined proxies
631 // but tracing is enabled, we make a dummy proxy that just
632 // passes through messages but which can trigger the
633 // tracing events.
634 if self.trace_handle.is_some() && num_proxies == 0 {
635 self.connect_to_proxy(
636 &client,
637 0,
638 ComponentIndex::Client,
639 ComponentIndex::Agent,
640 Proxy.builder(),
641 )?;
642 } else {
643 // Spawn each proxy component
644 for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
645 debug!(component_index, "spawning proxy");
646
647 self.connect_to_proxy(
648 &client,
649 component_index,
650 ComponentIndex::Proxy(component_index),
651 ComponentIndex::successor_of(component_index, num_proxies),
652 dyn_component,
653 )?;
654 }
655 }
656
657 info!(proxy_count = self.proxies.len(), "Proxies spawned");
658
659 Ok(())
660 }
661
662 /// Create a connection to the proxy with index `component_index` implemented in `component`.
663 ///
664 /// If tracing is enabled, the proxy's index is `trace_proxy_index` and its successor is `trace_successor_index`.
665 fn connect_to_proxy(
666 &mut self,
667 client: &ConnectionTo<Host::Counterpart>,
668 component_index: usize,
669 trace_proxy_index: ComponentIndex,
670 trace_successor_index: ComponentIndex,
671 component: impl ConnectTo<Conductor>,
672 ) -> Result<(), Error> {
673 let connection_builder = self.connection_to_proxy(component_index);
674 let connect_component =
675 self.trace_proxy(trace_proxy_index, trace_successor_index, component);
676 let proxy_connection = client.spawn_connection(connection_builder, connect_component)?;
677 self.proxies.push(proxy_connection);
678 Ok(())
679 }
680
681 /// Create the conductor's connection to the proxy with index `component_index`.
682 ///
683 /// Outgoing messages received from the proxy are sent to `self.conductor_tx` as either
684 /// left-to-right or right-to-left messages depending on whether they are wrapped
685 /// in `SuccessorMessage`.
686 fn connection_to_proxy(
687 &mut self,
688 component_index: usize,
689 ) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
690 type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
691 let mut conductor_tx = self.conductor_tx.clone();
692 Conductor
693 .builder()
694 .name(format!("conductor-to-component({component_index})"))
695 // Intercept messages sent by the proxy.
696 .on_receive_dispatch(
697 async move |dispatch: Dispatch, _connection| {
698 MatchDispatch::new(dispatch)
699 .if_message(async |dispatch: SuccessorDispatch| {
700 // ------------------
701 // SuccessorMessages sent by the proxy go to its successor.
702 //
703 // Subtle point:
704 //
705 // `ConductorToProxy` has only a single peer, `Agent`. This means that we see
706 // "successor messages" in their "desugared form". So when we intercept an *outgoing*
707 // message that matches `SuccessorMessage`, it could be one of three things
708 //
709 // - A request being sent by the proxy to its successor (hence going left->right)
710 // - A notification being sent by the proxy to its successor (hence going left->right)
711 // - A response to a request sent to the proxy *by* its successor. Here, the *request*
712 // was going right->left, but the *response* (the message we are processing now)
713 // is going left->right.
714 //
715 // So, in all cases, we forward as a left->right message.
716
717 conductor_tx
718 .send(ConductorMessage::LeftToRight {
719 target_component_index: component_index + 1,
720 message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
721 })
722 .await
723 .map_err(agent_client_protocol::util::internal_error)
724 })
725 .await
726 .otherwise(async |dispatch| {
                            // Other messages sent by the proxy go to its predecessor.
728 // As in the previous handler:
729 //
730 // Messages here are seen in their "desugared form", so we are seeing
731 // one of three things
732 //
733 // - A request being sent by the proxy to its predecessor (hence going right->left)
734 // - A notification being sent by the proxy to its predecessor (hence going right->left)
735 // - A response to a request sent to the proxy *by* its predecessor. Here, the *request*
736 // was going left->right, but the *response* (the message we are processing now)
737 // is going right->left.
738 //
739 // So, in all cases, we forward as a right->left message.
740
741 let message = ConductorMessage::RightToLeft {
742 source_component_index: SourceComponentIndex::Proxy(
743 component_index,
744 ),
745 message: dispatch,
746 };
747 conductor_tx
748 .send(message)
749 .await
750 .map_err(agent_client_protocol::util::internal_error)
751 })
752 .await
753 },
754 agent_client_protocol::on_receive_dispatch!(),
755 )
756 }
757
758 async fn forward_message_from_client_to_proxy(
759 &mut self,
760 target_component_index: usize,
761 message: Dispatch,
762 ) -> Result<(), agent_client_protocol::Error> {
763 tracing::debug!(?message, "forward_message_to_proxy");
764
765 MatchDispatch::new(message)
766 .if_request(async |_request: InitializeProxyRequest, responder| {
767 responder.respond_with_error(
768 agent_client_protocol::Error::invalid_request()
769 .data("initialize/proxy requests are only sent by the conductor"),
770 )
771 })
772 .await
773 .if_request(async |request: InitializeRequest, responder| {
774 // The pattern for `Initialize` messages is a bit subtle.
                // Proxies receive incoming `Initialize` messages as if they
776 // were a client. The conductor (us) intercepts these and
777 // converts them to an `InitializeProxyRequest`.
778 //
779 // The proxy will then initialize itself and forward an `Initialize`
780 // request to its successor.
781 let sent = self.proxies[target_component_index]
782 .send_request(InitializeProxyRequest::from(request));
783 // The request is rewritten, so `forward_response_to` cannot be
784 // used here; wire up cancellation forwarding explicitly to
785 // keep `initialize` cancellable like every other forwarded
786 // request.
787 #[cfg(feature = "unstable_cancel_request")]
788 let sent = sent.forward_cancellation_from(responder.cancellation());
789 sent.on_receiving_result(async move |result| {
790 tracing::debug!(?result, "got initialize_proxy response from proxy");
791 responder.respond_with_result(result)
792 })
793 })
794 .await
795 .otherwise(async |message| {
796 // Otherwise, just send the message along "as is".
797 self.proxies[target_component_index].send_proxied_message(message)
798 })
799 .await
800 }
801
    /// Invoked when sending a message from the conductor to the agent that it manages.
    /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
    /// method when the conductor runs in agent mode (`Host = Agent`), i.e., the
    /// conductor is not itself running as a proxy.
806 async fn forward_message_to_agent(
807 &mut self,
808 _client_connection: ConnectionTo<Host::Counterpart>,
809 message: Dispatch,
810 agent_connection: ConnectionTo<Agent>,
811 ) -> Result<(), Error> {
812 MatchDispatch::new(message)
813 .if_request(async |_request: InitializeProxyRequest, responder| {
814 responder.respond_with_error(
815 agent_client_protocol::Error::invalid_request()
816 .data("initialize/proxy requests are only sent by the conductor"),
817 )
818 })
819 .await
820 .otherwise(async |message| {
821 // Forward all other messages to the agent as-is.
822 agent_connection.send_proxied_message_to(Agent, message)
823 })
824 .await
825 }
826}
827
828/// Identifies a component in the conductor's chain for tracing purposes.
829///
830/// Used to track message sources and destinations through the proxy chain.
831#[derive(Debug, Clone, Copy)]
832pub enum ComponentIndex {
833 /// The client (editor) at the start of the chain.
834 Client,
835
836 /// A proxy component at the given index.
837 Proxy(usize),
838
839 /// The successor (agent in agent mode, outer conductor in proxy mode).
840 Agent,
841}
842
843impl ComponentIndex {
844 /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
845 #[must_use]
846 pub fn predecessor_of(proxy_index: usize) -> Self {
847 match proxy_index.checked_sub(1) {
848 Some(p_i) => ComponentIndex::Proxy(p_i),
849 None => ComponentIndex::Client,
850 }
851 }
852
    /// Return the index for the successor of `proxy_index`, which might be `Agent`.
854 #[must_use]
855 pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
        // The last proxy has index `num_proxies - 1`; its successor is the agent.
        if proxy_index + 1 >= num_proxies {
857 ComponentIndex::Agent
858 } else {
859 ComponentIndex::Proxy(proxy_index + 1)
860 }
861 }
862}
863
864/// Identifies the source of an agent-to-client message.
865///
866/// This enum handles the fact that the conductor may receive messages from two different sources:
867/// 1. From one of its managed components (identified by index)
868/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
869#[derive(Debug, Clone, Copy)]
870pub enum SourceComponentIndex {
871 /// Message from a specific component at the given index in the managed chain.
872 Proxy(usize),
873
874 /// Message from the conductor's agent or successor.
875 Successor,
876}
877
878/// Trait for lazy proxy instantiation (proxy mode).
879///
880/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
881/// are proxies that forward to an outer conductor.
882pub trait InstantiateProxies: Send {
883 /// Instantiate proxy components based on the Initialize request.
884 ///
885 /// Returns proxy components typed as `DynConnectTo<Conductor>` since proxies
886 /// communicate with the conductor.
887 fn instantiate_proxies(
888 self: Box<Self>,
889 req: InitializeRequest,
890 ) -> futures::future::BoxFuture<
891 'static,
892 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
893 >;
894}
895
896/// Simple implementation: provide all proxy components unconditionally.
897///
898/// Requires `T: ConnectTo<Conductor>`.
899impl<T> InstantiateProxies for Vec<T>
900where
901 T: ConnectTo<Conductor> + 'static,
902{
903 fn instantiate_proxies(
904 self: Box<Self>,
905 req: InitializeRequest,
906 ) -> futures::future::BoxFuture<
907 'static,
908 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
909 > {
910 Box::pin(async move {
911 let components: Vec<DynConnectTo<Conductor>> =
912 (*self).into_iter().map(|c| DynConnectTo::new(c)).collect();
913 Ok((req, components))
914 })
915 }
916}
917
918/// Dynamic implementation: closure receives the Initialize request and returns proxies.
919impl<F, Fut> InstantiateProxies for F
920where
921 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
922 Fut: std::future::Future<
923 Output = Result<
924 (InitializeRequest, Vec<DynConnectTo<Conductor>>),
925 agent_client_protocol::Error,
926 >,
927 > + Send
928 + 'static,
929{
930 fn instantiate_proxies(
931 self: Box<Self>,
932 req: InitializeRequest,
933 ) -> futures::future::BoxFuture<
934 'static,
935 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
936 > {
937 Box::pin(async move { (*self)(req).await })
938 }
939}
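/*
The two `InstantiateProxies` impls above (a fixed `Vec` and a closure) can be
illustrated with a synchronous simplification. This is only a sketch:
`Request`, `Instantiated`, `Instantiate`, `demo`, and the component names are
hypothetical stand-ins, and the real trait returns a `BoxFuture` rather than
a plain `Result`.

```rust
/// Hypothetical stand-in for `InitializeRequest`.
#[derive(Debug, Clone, PartialEq)]
pub struct Request {
    pub protocol_version: u32,
}

/// Simplified result: the (possibly modified) request plus component names.
pub type Instantiated = Result<(Request, Vec<String>), String>;

/// Synchronous analogue of `InstantiateProxies`. Taking `self: Box<Self>`
/// lets a boxed trait object be consumed exactly once.
pub trait Instantiate {
    fn instantiate(self: Box<Self>, req: Request) -> Instantiated;
}

/// Static case: a fixed list is returned unconditionally, request untouched.
impl<T: Into<String>> Instantiate for Vec<T> {
    fn instantiate(self: Box<Self>, req: Request) -> Instantiated {
        Ok((req, (*self).into_iter().map(Into::<String>::into).collect()))
    }
}

/// Dynamic case: a closure inspects the request before choosing components.
impl<F> Instantiate for F
where
    F: FnOnce(Request) -> Instantiated,
{
    fn instantiate(self: Box<Self>, req: Request) -> Instantiated {
        (*self)(req)
    }
}

/// Build both kinds of instantiator behind the same trait object type.
pub fn demo() -> (Instantiated, Instantiated) {
    let fixed: Box<dyn Instantiate> = Box::new(vec!["logging", "auth"]);
    let dynamic: Box<dyn Instantiate> = Box::new(|mut req: Request| -> Instantiated {
        // Choose components from the request, then rewrite it on the way through.
        let names = if req.protocol_version < 2 {
            vec!["compat".to_string()]
        } else {
            Vec::new()
        };
        req.protocol_version = 2;
        Ok((req, names))
    });
    let req = Request { protocol_version: 1 };
    (fixed.instantiate(req.clone()), dynamic.instantiate(req))
}
```

The closure form is the one to reach for when the set of proxies depends on
what the client sent; the `Vec` form covers the common fixed-chain case.
*/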
940
941/// Trait for lazy proxy and agent instantiation (agent mode).
942///
943/// Used by conductors in agent mode (`ConductorToClient`) where there are
944/// zero or more proxies followed by an agent component.
945pub trait InstantiateProxiesAndAgent: Send {
946 /// Instantiate proxy and agent components based on the Initialize request.
947 ///
948 /// Returns the (possibly modified) request, a vector of proxy components
949 /// (typed as `DynConnectTo<Conductor>`), and the agent component
950 /// (typed as `DynConnectTo<Client>`).
951 fn instantiate_proxies_and_agent(
952 self: Box<Self>,
953 req: InitializeRequest,
954 ) -> futures::future::BoxFuture<
955 'static,
956 Result<
957 (
958 InitializeRequest,
959 Vec<DynConnectTo<Conductor>>,
960 DynConnectTo<Client>,
961 ),
962 agent_client_protocol::Error,
963 >,
964 >;
965}
966
967 /// Wrapper that turns a single agent component (no proxies) into an `InstantiateProxiesAndAgent`.
968#[derive(Debug)]
969pub struct AgentOnly<A>(pub A);
970
971impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
972 fn instantiate_proxies_and_agent(
973 self: Box<Self>,
974 req: InitializeRequest,
975 ) -> futures::future::BoxFuture<
976 'static,
977 Result<
978 (
979 InitializeRequest,
980 Vec<DynConnectTo<Conductor>>,
981 DynConnectTo<Client>,
982 ),
983 agent_client_protocol::Error,
984 >,
985 > {
986 Box::pin(async move { Ok((req, Vec::new(), DynConnectTo::new(self.0))) })
987 }
988}
989
990/// Builder for creating proxies and agent components.
991///
992/// # Example
993/// ```ignore
994/// ProxiesAndAgent::new(ElizaAgent::new())
995/// .proxy(LoggingProxy::new())
996/// .proxy(AuthProxy::new())
997/// ```
998#[derive(Debug)]
999pub struct ProxiesAndAgent {
1000 proxies: Vec<DynConnectTo<Conductor>>,
1001 agent: DynConnectTo<Client>,
1002}
1003
1004impl ProxiesAndAgent {
1005 /// Create a new builder with the given agent component.
1006 pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
1007 Self {
1008 proxies: vec![],
1009 agent: DynConnectTo::new(agent),
1010 }
1011 }
1012
1013 /// Add a single proxy component.
1014 #[must_use]
1015 pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
1016 self.proxies.push(DynConnectTo::new(proxy));
1017 self
1018 }
1019
1020 /// Add multiple proxy components.
1021 #[must_use]
1022 pub fn proxies<P, I>(mut self, proxies: I) -> Self
1023 where
1024 P: ConnectTo<Conductor> + 'static,
1025 I: IntoIterator<Item = P>,
1026 {
1027 self.proxies
1028 .extend(proxies.into_iter().map(DynConnectTo::new));
1029 self
1030 }
1031}
1032
1033impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1034 fn instantiate_proxies_and_agent(
1035 self: Box<Self>,
1036 req: InitializeRequest,
1037 ) -> futures::future::BoxFuture<
1038 'static,
1039 Result<
1040 (
1041 InitializeRequest,
1042 Vec<DynConnectTo<Conductor>>,
1043 DynConnectTo<Client>,
1044 ),
1045 agent_client_protocol::Error,
1046 >,
1047 > {
1048 Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1049 }
1050}
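/*
A rough standalone sketch of the builder pattern used by `ProxiesAndAgent`:
the agent is mandatory, proxies are type-erased and appended one by one.
`Component`, `Named`, `Chain`, and `order` are hypothetical names, and the
sketch assumes proxies are chained in insertion order with the agent last.

```rust
/// Hypothetical stand-in for a connectable component.
pub trait Component {
    fn name(&self) -> String;
}

/// Hypothetical named component, used here for both proxies and the agent.
pub struct Named(pub &'static str);

impl Component for Named {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

/// Sketch of the builder: concrete types are erased into boxed trait objects.
pub struct Chain {
    proxies: Vec<Box<dyn Component>>,
    agent: Box<dyn Component>,
}

impl Chain {
    /// Start from the agent, with no proxies yet.
    pub fn new(agent: impl Component + 'static) -> Self {
        Self {
            proxies: Vec::new(),
            agent: Box::new(agent),
        }
    }

    /// Append a single proxy.
    #[must_use]
    pub fn proxy(mut self, proxy: impl Component + 'static) -> Self {
        self.proxies.push(Box::new(proxy));
        self
    }

    /// Append several proxies of the same concrete type.
    #[must_use]
    pub fn proxies<P, I>(mut self, proxies: I) -> Self
    where
        P: Component + 'static,
        I: IntoIterator<Item = P>,
    {
        self.proxies
            .extend(proxies.into_iter().map(|p| Box::new(p) as Box<dyn Component>));
        self
    }

    /// Component names in the assumed chain order: proxies first, agent last.
    pub fn order(&self) -> Vec<String> {
        self.proxies
            .iter()
            .map(|p| p.name())
            .chain(std::iter::once(self.agent.name()))
            .collect()
    }
}
```

Because every method takes and returns `self`, the calls compose into a single
expression, as in the `ProxiesAndAgent` doc example.
*/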
1051
1052/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
1053impl<F, Fut> InstantiateProxiesAndAgent for F
1054where
1055 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1056 Fut: std::future::Future<
1057 Output = Result<
1058 (
1059 InitializeRequest,
1060 Vec<DynConnectTo<Conductor>>,
1061 DynConnectTo<Client>,
1062 ),
1063 agent_client_protocol::Error,
1064 >,
1065 > + Send
1066 + 'static,
1067{
1068 fn instantiate_proxies_and_agent(
1069 self: Box<Self>,
1070 req: InitializeRequest,
1071 ) -> futures::future::BoxFuture<
1072 'static,
1073 Result<
1074 (
1075 InitializeRequest,
1076 Vec<DynConnectTo<Conductor>>,
1077 DynConnectTo<Client>,
1078 ),
1079 agent_client_protocol::Error,
1080 >,
1081 > {
1082 Box::pin(async move { (*self)(req).await })
1083 }
1084}
1085
1086/// Messages sent to the conductor's main event loop for routing.
1087///
1088/// These messages enable the conductor to route communication between:
1089/// - The editor and the first component
1090/// - Components and their successors in the chain
1091/// - Components and their clients (editor or predecessor)
1092///
1093/// All spawned tasks send messages via this enum through a shared channel,
1094/// allowing centralized routing logic in the `serve()` loop.
1095#[derive(Debug)]
1096pub enum ConductorMessage {
1097 /// If this message is a request or notification, then it is going "left-to-right"
1098 /// (e.g., a component making a request of its successor).
1099 ///
1100 /// If this message is a response, then it is going "right-to-left"
1101 /// (i.e., the successor answering a request made by its predecessor).
1102 LeftToRight {
1103 target_component_index: usize,
1104 message: Dispatch,
1105 },
1106
1107 /// If this message is a request or notification, then it is going "right-to-left"
1108 /// (e.g., a component making a request of its predecessor).
1109 ///
1110 /// If this message is a response, then it is going "left-to-right"
1111 /// (i.e., the predecessor answering a request made by its successor).
1112 RightToLeft {
1113 source_component_index: SourceComponentIndex,
1114 message: Dispatch,
1115 },
1116}
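/*
To make the centralized routing idea concrete, here is a small sketch built on
`std::sync::mpsc`. It is not the conductor's actual loop: `Source`, `Message`,
`Destination`, `route`, and `run_loop` are hypothetical, payloads are plain
strings, and the routing rule is an assumption (a left-to-right message goes
to its target index, or to the successor once the index runs past the last
proxy; a right-to-left message goes to the predecessor of its source).

```rust
use std::sync::mpsc;

/// Hypothetical mirror of `SourceComponentIndex`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Source {
    Proxy(usize),
    Successor,
}

/// Hypothetical mirror of `ConductorMessage`, with a string payload.
#[derive(Debug)]
pub enum Message {
    LeftToRight { target_component_index: usize, payload: String },
    RightToLeft { source_component_index: Source, payload: String },
}

/// Where the loop decided to deliver a message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Destination {
    Client,
    Proxy(usize),
    Successor,
}

/// Assumed routing rule for a chain of `num_proxies` proxies.
pub fn route(message: &Message, num_proxies: usize) -> Destination {
    match message {
        Message::LeftToRight { target_component_index, .. } => {
            if *target_component_index < num_proxies {
                Destination::Proxy(*target_component_index)
            } else {
                Destination::Successor
            }
        }
        Message::RightToLeft { source_component_index, .. } => {
            // Treat the successor as sitting one position past the last proxy.
            let position = match source_component_index {
                Source::Proxy(i) => *i,
                Source::Successor => num_proxies,
            };
            match position.checked_sub(1) {
                Some(p) => Destination::Proxy(p),
                None => Destination::Client,
            }
        }
    }
}

/// Drain a shared channel the way a central loop would, recording each decision.
pub fn run_loop(num_proxies: usize, messages: Vec<Message>) -> Vec<Destination> {
    let (tx, rx) = mpsc::channel();
    for message in messages {
        // Each producer would hold its own clone of the sender.
        tx.clone().send(message).expect("receiver is alive");
    }
    drop(tx); // Dropping every sender ends the iteration below.
    rx.iter().map(|message| route(&message, num_proxies)).collect()
}
```

Funnelling every message through one channel keeps the routing decisions in a
single place, which is the property the `ConductorMessage` docs describe.
*/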
1117
1118/// Trait implemented for the two links the conductor can use:
1119///
1120 /// * `ConductorToClient` -- the conductor is acting as an agent, so when its last proxy sends to its successor, the conductor sends that message to its agent component
1121 /// * `ConductorToConductor` -- the conductor is acting as a proxy, so when its last proxy sends to its successor, the (inner) conductor sends that message to its own successor, via the outer conductor
1122pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
1123 /// The type used to instantiate components for this link type.
1124 type Instantiator: Send;
1125
1126 /// Handle initialization: parse the init request, instantiate components, and spawn them.
1127 ///
1128 /// Takes ownership of the instantiator and returns the (possibly modified) init request
1129 /// wrapped in a Dispatch for forwarding.
1130 fn initialize(
1131 &self,
1132 message: Dispatch,
1133 connection: ConnectionTo<Self::Counterpart>,
1134 instantiator: Self::Instantiator,
1135 responder: &mut ConductorResponder<Self>,
1136 ) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
1137
1138 /// Handle an incoming message from the client or conductor, depending on `Self`.
1139 fn handle_dispatch(
1140 &self,
1141 message: Dispatch,
1142 connection: ConnectionTo<Self::Counterpart>,
1143 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1144 ) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
1145}
1146
1147/// Conductor acting as an agent
1148impl ConductorHostRole for Agent {
1149 type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1150
1151 async fn initialize(
1152 &self,
1153 message: Dispatch,
1154 client_connection: ConnectionTo<Client>,
1155 instantiator: Self::Instantiator,
1156 responder: &mut ConductorResponder<Self>,
1157 ) -> Result<Dispatch, agent_client_protocol::Error> {
1158 let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1159
1160 // Not yet initialized - expect an initialize request.
1161 // Error if we get anything else.
1162 let Dispatch::Request(request, init_responder) = message else {
1163 message.respond_with_error(invalid_request(), client_connection.clone())?;
1164 return Err(invalid_request());
1165 };
1166 if !InitializeRequest::matches_method(request.method()) {
1167 init_responder.respond_with_error(invalid_request())?;
1168 return Err(invalid_request());
1169 }
1170
1171 let init_request =
1172 match InitializeRequest::parse_message(request.method(), request.params()) {
1173 Ok(r) => r,
1174 Err(error) => {
1175 init_responder.respond_with_error(error)?;
1176 return Err(invalid_request());
1177 }
1178 };
1179
1180 // Instantiate proxies and agent
1181 let (modified_req, proxy_components, agent_component) = instantiator
1182 .instantiate_proxies_and_agent(init_request)
1183 .await?;
1184
1185 // Spawn the agent component
1186 debug!(?agent_component, "spawning agent");
1187
1188 let connection_to_agent = client_connection.spawn_connection(
1189 Client
1190 .builder()
1191 .name("conductor-to-agent")
1192 // Intercept agent-to-client messages from the agent.
1193 .on_receive_dispatch(
1194 {
1195 let mut conductor_tx = responder.conductor_tx.clone();
1196 async move |dispatch: Dispatch, _cx| {
1197 conductor_tx
1198 .send(ConductorMessage::RightToLeft {
1199 source_component_index: SourceComponentIndex::Successor,
1200 message: dispatch,
1201 })
1202 .await
1203 .map_err(agent_client_protocol::util::internal_error)
1204 }
1205 },
1206 agent_client_protocol::on_receive_dispatch!(),
1207 ),
1208 agent_component,
1209 )?;
1210 responder.successor = Arc::new(connection_to_agent);
1211
1212 // Spawn the proxy components
1213 responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1214
1215 Ok(Dispatch::Request(
1216 modified_req.to_untyped_message()?,
1217 init_responder,
1218 ))
1219 }
1220
1221 async fn handle_dispatch(
1222 &self,
1223 message: Dispatch,
1224 client_connection: ConnectionTo<Client>,
1225 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1226 ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1227 tracing::debug!(
1228 method = ?message.method(),
1229 "ConductorToClient::handle_dispatch"
1230 );
1231 MatchDispatchFrom::new(message, &client_connection)
1232 // Any incoming messages from the client are client-to-agent messages targeting the first component.
1233 .if_message_from(Client, async move |message: Dispatch| {
1234 tracing::debug!(
1235 method = ?message.method(),
1236 "ConductorToClient::handle_dispatch - matched Client"
1237 );
1238 ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
1239 })
1240 .await
1241 .done()
1242 }
1243}
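/*
The validation at the top of `initialize` follows a three-step shape: the
message must be a request, its method must match, and its params must parse.
The sketch below shows that shape only. `Incoming` and `expect_initialize`
are hypothetical, params are reduced to a version number in a string, and the
real code additionally replies to the sender with an error before returning.

```rust
/// Hypothetical, simplified stand-in for `Dispatch`.
#[derive(Debug, PartialEq)]
pub enum Incoming {
    Request { method: String, params: String },
    Notification { method: String },
}

/// Accept only a well-formed `initialize` request and return its parsed params.
pub fn expect_initialize(message: Incoming) -> Result<u32, String> {
    let invalid_request = || "expected `initialize` request".to_string();

    // Step 1: anything that is not a request is rejected.
    let Incoming::Request { method, params } = message else {
        return Err(invalid_request());
    };

    // Step 2: the method name must match.
    if method != "initialize" {
        return Err(invalid_request());
    }

    // Step 3: the params must parse (here: a protocol version number).
    params.trim().parse::<u32>().map_err(|_| invalid_request())
}
```

Using `let ... else` for the first step keeps the happy path unindented, the
same way the conductor's `initialize` functions are written.
*/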
1244
1245/// Conductor acting as a proxy
1246impl ConductorHostRole for Proxy {
1247 type Instantiator = Box<dyn InstantiateProxies>;
1248
1249 async fn initialize(
1250 &self,
1251 message: Dispatch,
1252 client_connection: ConnectionTo<Conductor>,
1253 instantiator: Self::Instantiator,
1254 responder: &mut ConductorResponder<Self>,
1255 ) -> Result<Dispatch, agent_client_protocol::Error> {
1256 let invalid_request = || Error::invalid_request().data("expected `_proxy/initialize` request");
1257
1258 // Not yet initialized - expect an InitializeProxy request.
1259 // Error if we get anything else.
1260 let Dispatch::Request(request, init_responder) = message else {
1261 message.respond_with_error(invalid_request(), client_connection.clone())?;
1262 return Err(invalid_request());
1263 };
1264 if !InitializeProxyRequest::matches_method(request.method()) {
1265 init_responder.respond_with_error(invalid_request())?;
1266 return Err(invalid_request());
1267 }
1268
1269 let InitializeProxyRequest { initialize } =
1270 match InitializeProxyRequest::parse_message(request.method(), request.params()) {
1271 Ok(r) => r,
1272 Err(error) => {
1273 init_responder.respond_with_error(error)?;
1274 return Err(invalid_request());
1275 }
1276 };
1277
1278 tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1279
1280 // Instantiate proxies (no agent in proxy mode)
1281 let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1282
1283 // In proxy mode, our successor is the outer conductor (via our client connection)
1284 responder.successor = Arc::new(GrandSuccessor);
1285
1286 // Spawn the proxy components
1287 responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1288
1289 Ok(Dispatch::Request(
1290 modified_req.to_untyped_message()?,
1291 init_responder,
1292 ))
1293 }
1294
1295 async fn handle_dispatch(
1296 &self,
1297 message: Dispatch,
1298 client_connection: ConnectionTo<Conductor>,
1299 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1300 ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1301 tracing::debug!(
1302 method = ?message.method(),
1303 ?message,
1304 "ConductorToConductor::handle_dispatch"
1305 );
1306 MatchDispatchFrom::new(message, &client_connection)
1307 .if_message_from(Agent, {
1308 // Messages from our successor arrive already unwrapped
1309 // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1310 async |message: Dispatch| {
1311 tracing::debug!(
1312 method = ?message.method(),
1313 "ConductorToConductor::handle_dispatch - matched Agent"
1314 );
1315 let mut conductor_tx = conductor_tx.clone();
1316 ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
1317 .await
1318 }
1319 })
1320 .await
1321 // Any incoming messages from the client are client-to-agent messages targeting the first component.
1322 .if_message_from(Client, async |message: Dispatch| {
1323 tracing::debug!(
1324 method = ?message.method(),
1325 "ConductorToConductor::handle_dispatch - matched Client"
1326 );
1327 let mut conductor_tx = conductor_tx.clone();
1328 ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
1329 .await
1330 })
1331 .await
1332 .done()
1333 }
1334}
1335
/// Destination for messages that the conductor's last proxy sends to its successor.
1336pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {
1337 /// Send a message to the successor.
1338 fn send_message<'a>(
1339 &self,
1340 message: Dispatch,
1341 connection_to_conductor: ConnectionTo<Host::Counterpart>,
1342 responder: &'a mut ConductorResponder<Host>,
1343 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
1344}
1345
/// An error used as a successor: every send fails with a clone of that error.
1346impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {
1347 fn send_message<'a>(
1348 &self,
1349 #[expect(unused_variables)] message: Dispatch,
1350 #[expect(unused_variables)] connection_to_conductor: ConnectionTo<Host::Counterpart>,
1351 #[expect(unused_variables)] responder: &'a mut ConductorResponder<Host>,
1352 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1353 let error = self.clone();
1354 Box::pin(std::future::ready(Err(error)))
1355 }
1356}
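/*
One way to read the impl above is as a placeholder that can sit in the
successor slot and reject every message until a real successor is installed.
That reading is an assumption, not something this file states. The sketch
below is synchronous and uses hypothetical names (`Successor`, `AgentLink`,
`Responder`), with `String` standing in for the protocol's error type.

```rust
use std::sync::Arc;

/// Hypothetical, synchronous analogue of `ConductorSuccessor`.
pub trait Successor: Send + Sync {
    fn send_message(&self, message: &str) -> Result<String, String>;
}

/// Placeholder: an error value acts as a successor that rejects every message.
impl Successor for String {
    fn send_message(&self, _message: &str) -> Result<String, String> {
        Err(self.clone())
    }
}

/// Hypothetical real successor that acknowledges what it receives.
pub struct AgentLink;

impl Successor for AgentLink {
    fn send_message(&self, message: &str) -> Result<String, String> {
        Ok(format!("agent received {}", message))
    }
}

/// Holds the current successor behind a shared trait object.
pub struct Responder {
    pub successor: Arc<dyn Successor>,
}

impl Responder {
    /// Start with the failing placeholder in the successor slot.
    pub fn uninitialized() -> Self {
        Self {
            successor: Arc::new("not initialized".to_string()),
        }
    }

    /// Mirrors the `responder.successor = Arc::new(...)` assignment in `initialize`.
    pub fn initialize(&mut self) {
        self.successor = Arc::new(AgentLink);
    }
}
```

Before `initialize` is called, `send_message` returns the stored error; after
it, the same call reaches the real successor without the caller changing.
*/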
1357
1358/// A dummy type handling messages sent to the conductor's
1359/// successor when it is acting as a proxy.
1360struct GrandSuccessor;
1361
1362 /// When the conductor is acting as a proxy, messages sent by
1363/// the last proxy go to the conductor's successor.
1364///
1365/// ```text
1366/// client --> Conductor -----------------------------> GrandSuccessor
1367/// | |
1368/// +-> Proxy[0] -> ... -> Proxy[n-1] -+
1369/// ```
1370impl ConductorSuccessor<Proxy> for GrandSuccessor {
1371 fn send_message<'a>(
1372 &self,
1373 message: Dispatch,
1374 connection: ConnectionTo<Conductor>,
1375 _responder: &'a mut ConductorResponder<Proxy>,
1376 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1377 Box::pin(async move {
1378 debug!("Proxy mode: forwarding successor message to conductor's successor");
1379 connection.send_proxied_message_to(Agent, message)
1380 })
1381 }
1382}
1383
1384/// When the conductor is acting as an agent, messages sent by
1385/// the last proxy to its successor go to the internal agent
1386/// (`self`).
1387impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
1388 fn send_message<'a>(
1389 &self,
1390 message: Dispatch,
1391 connection: ConnectionTo<Client>,
1392 responder: &'a mut ConductorResponder<Agent>,
1393 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1394 let connection_to_agent = self.clone();
1395 Box::pin(async move {
1396 debug!("Agent mode: forwarding successor message to the conductor's agent");
1397 responder
1398 .forward_message_to_agent(connection, message, connection_to_agent)
1399 .await
1400 })
1401 }
1402}