agent_client_protocol_conductor/conductor.rs
//! # Conductor: ACP Proxy Chain Orchestrator
//!
//! This module implements the conductor, which orchestrates a chain of
//! proxy components that sit between an editor and an agent, transforming the
//! Agent-Client Protocol (ACP) stream bidirectionally.
//!
//! ## Architecture Overview
//!
//! The conductor builds and manages a chain of components:
//!
//! ```text
//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
//! ```
//!
//! Each component receives ACP messages, can transform them, and forwards them
//! to the next component in the chain. The conductor:
//!
//! 1. Spawns each component as a subprocess
//! 2. Establishes bidirectional JSON-RPC connections with each component
//! 3. Routes messages between editor, components, and agent
//! 4. Distinguishes proxy vs agent components via distinct request types
//!
//! ## Recursive Chain Building
//!
//! The chain is built recursively through the `_proxy/successor/*` protocol:
//!
//! 1. Editor connects to Component 0 via the conductor
//! 2. When Component 0 wants to communicate with its successor, it sends
//!    requests/notifications with method prefix `_proxy/successor/`
//! 3. The conductor intercepts these messages, strips the prefix, and forwards
//!    to Component 1
//! 4. Component 1 does the same for Component 2, and so on
//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
//!
//! This allows each component to be written as if it's talking to a single successor,
//! without knowing about the full chain.
//!
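//! The prefix handling in steps 2 and 3 can be sketched as follows. This is a
//! minimal illustration, not the conductor's actual code: `SUCCESSOR_PREFIX` and
//! `strip_successor_prefix` are hypothetical names, and the real implementation
//! matches typed `SuccessorMessage` values rather than raw method strings.
//!
//! ```rust
//! // Method prefix a proxy uses to address its successor (taken from the text above).
//! const SUCCESSOR_PREFIX: &str = "_proxy/successor/";
//!
//! // Hypothetical helper: returns the inner method name if `method` is addressed
//! // to the successor, or `None` if it carries no successor prefix.
//! fn strip_successor_prefix(method: &str) -> Option<&str> {
//!     method.strip_prefix(SUCCESSOR_PREFIX)
//! }
//! ```
//!
//! Under these assumptions, `_proxy/successor/session/new` would be forwarded to the
//! next component as `session/new`, while an unprefixed method yields `None`.
//!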
//! ## Proxy vs Agent Initialization
//!
//! Components discover whether they're a proxy or agent via the initialization request they receive:
//!
//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
//!
//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
//! and `InitializeRequest` only to the final agent component. This allows proxies to
//! know they should forward messages to a successor, while agents know they are the
//! terminal component.
//!
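//! As a rough sketch of this rule, assuming a conductor whose last component is
//! the agent (the helper `initialize_method` is hypothetical and only illustrates
//! the method names listed above):
//!
//! ```rust
//! // Hypothetical helper: which initialization method the component at `index`
//! // receives in a chain of `chain_len` components that ends in the agent.
//! fn initialize_method(index: usize, chain_len: usize) -> &'static str {
//!     assert!(index < chain_len, "index must refer to a component in the chain");
//!     if index + 1 == chain_len {
//!         "initialize" // terminal agent
//!     } else {
//!         "_proxy/initialize" // proxy with a successor
//!     }
//! }
//! ```
//!
//! For a chain of two proxies and an agent, indices 0 and 1 would map to
//! `_proxy/initialize` and index 2 to `initialize`.
//!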
//! ## Message Routing
//!
//! The conductor runs an event loop processing messages from:
//!
//! - **Editor to first component**: Standard ACP messages
//! - **Component to successor**: Via `_proxy/successor/*` prefix
//! - **Component responses**: Via futures channels back to requesters
//!
//! The message flow ensures bidirectional communication while maintaining the
//! abstraction that each component only knows about its immediate successor.
//!
//! ## Lazy Component Initialization
//!
//! Components are instantiated lazily when the first `initialize` request is received
//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
//!
//! ### Simple Usage
//!
//! Pass a Vec of components that implement `Component`:
//!
//! ```ignore
//! let conductor = Conductor::new(
//!     "my-conductor",
//!     vec![proxy1, proxy2, agent],
//!     None,
//! );
//! ```
//!
//! All components are spawned in order when the editor sends the first `initialize` request.
//!
//! ### Dynamic Component Selection
//!
//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
//!
//! ```ignore
//! let conductor = Conductor::new(
//!     "my-conductor",
//!     |cx, conductor_tx, init_req| async move {
//!         // Examine capabilities
//!         let needs_auth = has_auth_capability(&init_req);
//!
//!         let mut components = Vec::new();
//!         if needs_auth {
//!             components.push(spawn_auth_proxy(&cx, &conductor_tx)?);
//!         }
//!         components.push(spawn_agent(&cx, &conductor_tx)?);
//!
//!         // Return (potentially modified) request and component list
//!         Ok((init_req, components))
//!     },
//!     None,
//! );
//! ```
//!
//! The closure receives:
//! - `cx: &ConnectionTo` - Connection context for spawning components
//! - `conductor_tx: &mpsc::Sender<ConductorMessage>` - Channel for message routing
//! - `init_req: InitializeRequest` - The Initialize request from the editor
//!
//! And returns:
//! - Modified `InitializeRequest` to forward downstream
//! - `Vec<ConnectionTo>` of spawned components

use std::{collections::HashMap, sync::Arc};

use agent_client_protocol::{
    Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
    Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
};
use agent_client_protocol::{
    Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest, UntypedMessage,
};
use agent_client_protocol::{
    HandleDispatchFrom,
    schema::{InitializeProxyRequest, InitializeRequest, NewSessionRequest},
    util::MatchDispatchFrom,
};
use agent_client_protocol::{
    Handled,
    schema::{
        McpConnectRequest, McpConnectResponse, McpDisconnectNotification, McpOverAcpMessage,
        SuccessorMessage,
    },
};
use futures::{
    SinkExt, StreamExt,
    channel::mpsc::{self},
};
use tracing::{debug, info};

use crate::conductor::mcp_bridge::{
    McpBridgeConnection, McpBridgeConnectionActor, McpBridgeListeners,
};

mod mcp_bridge;

/// The conductor manages the proxy chain lifecycle and message routing.
///
/// It maintains connections to all components in the chain and routes messages
/// bidirectionally between the editor, components, and agent.
#[derive(Debug)]
pub struct ConductorImpl<Host: ConductorHostRole> {
    host: Host,
    name: String,
    instantiator: Host::Instantiator,
    mcp_bridge_mode: crate::McpBridgeMode,
    trace_writer: Option<crate::trace::TraceWriter>,
}

impl<Host: ConductorHostRole> ConductorImpl<Host> {
    /// Create a conductor for the given host role, with tracing disabled.
    pub fn new(
        host: Host,
        name: impl ToString,
        instantiator: Host::Instantiator,
        mcp_bridge_mode: crate::McpBridgeMode,
    ) -> Self {
        ConductorImpl {
            name: name.to_string(),
            host,
            instantiator,
            mcp_bridge_mode,
            trace_writer: None,
        }
    }
}

impl ConductorImpl<Agent> {
    /// Create a conductor in agent mode (the last component is an agent).
    pub fn new_agent(
        name: impl ToString,
        instantiator: impl InstantiateProxiesAndAgent + 'static,
        mcp_bridge_mode: crate::McpBridgeMode,
    ) -> Self {
        ConductorImpl::new(Agent, name, Box::new(instantiator), mcp_bridge_mode)
    }
}

impl ConductorImpl<Proxy> {
    /// Create a conductor in proxy mode (forwards to another conductor).
    pub fn new_proxy(
        name: impl ToString,
        instantiator: impl InstantiateProxies + 'static,
        mcp_bridge_mode: crate::McpBridgeMode,
    ) -> Self {
        ConductorImpl::new(Proxy, name, Box::new(instantiator), mcp_bridge_mode)
    }
}

impl<Host: ConductorHostRole> ConductorImpl<Host> {
    /// Enable trace logging to a custom destination.
    ///
    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
    #[must_use]
    pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
        self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
        self
    }

    /// Enable trace logging to a file path.
    ///
    /// Events will be written as newline-delimited JSON (`.jsons` format).
    /// Use `agent-client-protocol-trace-viewer` to view the trace as an interactive sequence diagram.
    pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
        self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
        Ok(self)
    }

    /// Enable trace logging with an existing TraceWriter.
    #[must_use]
    pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
        self.trace_writer = Some(writer);
        self
    }

    /// Run the conductor with a transport.
    pub async fn run(
        self,
        transport: impl ConnectTo<Host>,
    ) -> Result<(), agent_client_protocol::Error> {
        let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);

        // Set up tracing if enabled - spawn writer task and get handle
        let trace_handle;
        let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
        if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
            trace_handle = Some(h);
            trace_future = Box::pin(f);
        } else {
            trace_handle = None;
            trace_future = Box::pin(std::future::ready(Ok(())));
        }

        let responder = ConductorResponder {
            conductor_rx,
            conductor_tx: conductor_tx.clone(),
            instantiator: Some(self.instantiator),
            bridge_listeners: McpBridgeListeners::default(),
            bridge_connections: HashMap::default(),
            mcp_bridge_mode: self.mcp_bridge_mode,
            proxies: Vec::default(),
            successor: Arc::new(agent_client_protocol::util::internal_error(
                "successor not initialized",
            )),
            trace_handle,
            host: self.host.clone(),
        };

        Builder::new_with(
            self.host.clone(),
            ConductorMessageHandler {
                conductor_tx,
                host: self.host.clone(),
            },
        )
        .name(self.name)
        .with_responder(responder)
        .with_spawned(|_cx| trace_future)
        .connect_to(transport)
        .await
    }

    async fn incoming_message_from_client(
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        conductor_tx
            .send(ConductorMessage::LeftToRight {
                target_component_index: 0,
                message,
            })
            .await
            .map_err(agent_client_protocol::util::internal_error)
    }

    async fn incoming_message_from_agent(
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        conductor_tx
            .send(ConductorMessage::RightToLeft {
                source_component_index: SourceComponentIndex::Successor,
                message,
            })
            .await
            .map_err(agent_client_protocol::util::internal_error)
    }
}

impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
    async fn connect_to(
        self,
        client: impl ConnectTo<Host>,
    ) -> Result<(), agent_client_protocol::Error> {
        self.run(client).await
    }
}

struct ConductorMessageHandler<Host: ConductorHostRole> {
    conductor_tx: mpsc::Sender<ConductorMessage>,
    host: Host,
}

impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
    for ConductorMessageHandler<Host>
{
    async fn handle_dispatch_from(
        &mut self,
        message: Dispatch,
        connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
    ) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
        self.host
            .handle_dispatch(message, connection, &mut self.conductor_tx)
            .await
    }

    fn describe_chain(&self) -> impl std::fmt::Debug {
        "ConductorMessageHandler"
    }
}

/// The conductor's central actor.
///
/// It owns the connections to all components in the chain and serially processes
/// every [`ConductorMessage`] sent through `conductor_tx`, routing messages
/// bidirectionally between the editor, components, and agent.
pub struct ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    conductor_rx: mpsc::Receiver<ConductorMessage>,

    conductor_tx: mpsc::Sender<ConductorMessage>,

    /// Manages the TCP listeners for MCP connections that will be proxied over ACP.
    bridge_listeners: McpBridgeListeners,

    /// Manages active connections to MCP clients.
    bridge_connections: HashMap<String, McpBridgeConnection>,

    /// The instantiator for lazy initialization.
    /// Set to None after components are instantiated.
    instantiator: Option<Host::Instantiator>,

    /// The chain of proxies before the agent (if any).
    ///
    /// Populated lazily when the first Initialize request is received.
    proxies: Vec<ConnectionTo<Proxy>>,

    /// If the conductor is operating in agent mode, this will direct messages to the agent.
    /// If the conductor is operating in proxy mode, this will direct messages to the successor.
    /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
    successor: Arc<dyn ConductorSuccessor<Host>>,

    /// Mode for the MCP bridge (determines how to spawn bridge processes).
    mcp_bridge_mode: crate::McpBridgeMode,

    /// Optional trace handle for sequence diagram visualization.
    trace_handle: Option<crate::trace::TraceHandle>,

    /// The host role (agent or proxy), which defines what sort of link we have.
    host: Host,
}

impl<Host> std::fmt::Debug for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConductorResponder")
            .field("conductor_rx", &self.conductor_rx)
            .field("conductor_tx", &self.conductor_tx)
            .field("bridge_listeners", &self.bridge_listeners)
            .field("bridge_connections", &self.bridge_connections)
            .field("proxies", &self.proxies)
            .field("mcp_bridge_mode", &self.mcp_bridge_mode)
            .field("trace_handle", &self.trace_handle)
            .field("host", &self.host)
            .finish_non_exhaustive()
    }
}

impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    async fn run_with_connection_to(
        mut self,
        connection: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        // Components are spawned lazily (see `ensure_initialized`)
        // when the first Initialize request is received.

        // This is the "central actor" of the conductor. Most other things forward messages
        // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
        while let Some(message) = self.conductor_rx.next().await {
            self.handle_conductor_message(connection.clone(), message)
                .await?;
        }
        Ok(())
    }
}

impl<Host> ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    /// Central message handling logic for the conductor.
    /// The conductor routes all [`ConductorMessage`] messages through to this function.
    /// Each message corresponds to a request or notification from one component to another.
    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
    /// Note that *responses to requests* are sent *directly* without going through this loop.
    ///
    /// The names we use are:
    ///
    /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
    /// * Then there is a sequence of *components* consisting of:
    ///   * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
    ///   * And finally the *agent*, which is the final component in the chain and handles the actual work.
    ///
    /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
    ///
    /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
    /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
    ///   through a stdio server that runs on localhost and bridges messages.
    async fn handle_conductor_message(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        message: ConductorMessage,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(?message, "handle_conductor_message");

        match message {
            ConductorMessage::LeftToRight {
                target_component_index,
                message,
            } => {
                // Tracing happens inside forward_client_to_agent_message, after initialization,
                // so that component_name() has access to the populated proxies list.
                self.forward_client_to_agent_message(target_component_index, message, client)
                    .await
            }

            ConductorMessage::RightToLeft {
                source_component_index,
                message,
            } => {
                tracing::debug!(
                    ?source_component_index,
                    message_method = ?message.method(),
                    "Conductor: AgentToClient received"
                );
                self.send_message_to_predecessor_of(client, source_component_index, message)
            }

            // New MCP connection request. Send it back along the chain to get a connection id.
            // When the connection id arrives, send a message back into this conductor loop with
            // the connection id and the (as yet unspawned) actor.
            ConductorMessage::McpConnectionReceived {
                acp_url,
                connection,
                actor,
            } => {
                // MCP connection requests always come from the agent
                // (we must be in agent mode, in fact), so send the MCP request
                // to the final proxy.
                self.send_request_to_predecessor_of(
                    client,
                    self.proxies.len(),
                    McpConnectRequest {
                        acp_url,
                        meta: None,
                    },
                )
                .on_receiving_result({
                    let mut conductor_tx = self.conductor_tx.clone();
                    async move |result| {
                        match result {
                            Ok(response) => conductor_tx
                                .send(ConductorMessage::McpConnectionEstablished {
                                    response,
                                    actor,
                                    connection,
                                })
                                .await
                                .map_err(|_| agent_client_protocol::Error::internal_error()),
                            Err(_) => {
                                // Error occurred, just drop the connection.
                                Ok(())
                            }
                        }
                    }
                })
            }

            // MCP connection successfully established. Spawn the actor
            // and insert the connection into our map for future reference.
            ConductorMessage::McpConnectionEstablished {
                response: McpConnectResponse { connection_id, .. },
                actor,
                connection,
            } => {
                self.bridge_connections
                    .insert(connection_id.clone(), connection);
                client.spawn(actor.run(connection_id))
            }

            // Message from an MCP client destined for an MCP server. Wrap it in
            // `McpOverAcpMessage` and send it backwards along the chain.
            ConductorMessage::McpClientToMcpServer {
                connection_id,
                message,
            } => {
                let wrapped = message.map(
                    |request, responder| {
                        (
                            McpOverAcpMessage {
                                connection_id: connection_id.clone(),
                                message: request,
                                meta: None,
                            },
                            responder,
                        )
                    },
                    |notification| McpOverAcpMessage {
                        connection_id: connection_id.clone(),
                        message: notification,
                        meta: None,
                    },
                );

                // We only get MCP-over-ACP requests when we are bridging MCP for the final agent,
                // so send them to the final proxy.
                self.send_message_to_predecessor_of(
                    client,
                    SourceComponentIndex::Successor,
                    wrapped,
                )
            }

            // MCP client disconnected. Remove it from our map and send the
            // notification backwards along the chain.
            ConductorMessage::McpConnectionDisconnected { notification } => {
                // We only get MCP-over-ACP requests when we are bridging MCP for the final agent.

                self.bridge_connections.remove(&notification.connection_id);
                self.send_notification_to_predecessor_of(client, self.proxies.len(), notification)
            }
        }
    }

    /// Send a message (request or notification) to the predecessor of the given component.
    ///
    /// This is a bit subtle because the relationship of the conductor
    /// is different depending on who will be receiving the message:
    /// * If the message is going to the conductor's client, then no changes
    ///   are needed, as the conductor is sending an agent-to-client message and
    ///   the conductor is acting as the agent.
    /// * If the message is going to a proxy component, then we have to wrap
    ///   it in a "from successor" wrapper, because the conductor is the
    ///   proxy's client.
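    ///
    /// The index arithmetic behind this rule can be sketched as below. `Predecessor`
    /// and `predecessor_of` are hypothetical names used only for illustration; the
    /// conductor itself indexes into `self.proxies` directly.
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq)]
    /// enum Predecessor {
    ///     // The conductor's own client (e.g., the editor).
    ///     Client,
    ///     // The proxy at this position in the `proxies` list.
    ///     Proxy(usize),
    /// }
    ///
    /// // Component 0 answers to the client; component `i > 0` answers to proxy `i - 1`.
    /// fn predecessor_of(source_component_index: usize) -> Predecessor {
    ///     match source_component_index {
    ///         0 => Predecessor::Client,
    ///         i => Predecessor::Proxy(i - 1),
    ///     }
    /// }
    /// ```
    ///
    /// With two proxies, the agent has index 2, so its predecessor is proxy 1.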
    fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        source_component_index: SourceComponentIndex,
        message: Dispatch<Req, N>,
    ) -> Result<(), agent_client_protocol::Error>
    where
        Req::Response: Send,
    {
        let source_component_index = match source_component_index {
            SourceComponentIndex::Successor => self.proxies.len(),
            SourceComponentIndex::Proxy(index) => index,
        };

        match message {
            Dispatch::Request(request, responder) => self
                .send_request_to_predecessor_of(client, source_component_index, request)
                .forward_response_to(responder),
            Dispatch::Notification(notification) => self.send_notification_to_predecessor_of(
                client,
                source_component_index,
                notification,
            ),
            Dispatch::Response(result, router) => router.respond_with_result(result),
        }
    }

    /// Send a request to the predecessor of the given component.
    ///
    /// Component 0's predecessor is the conductor's client; any other component's
    /// predecessor is the preceding proxy, which receives the request wrapped in
    /// a `SuccessorMessage`.
    fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
        &mut self,
        client_connection: ConnectionTo<Host::Counterpart>,
        source_component_index: usize,
        request: Req,
    ) -> SentRequest<Req::Response> {
        if source_component_index == 0 {
            client_connection.send_request_to(Client, request)
        } else {
            self.proxies[source_component_index - 1].send_request(SuccessorMessage {
                message: request,
                meta: None,
            })
        }
    }

    /// Send a notification to the predecessor of the given component.
    ///
    /// This is a bit subtle because the relationship of the conductor
    /// is different depending on who will be receiving the message:
    /// * If the notification is going to the conductor's client, then no changes
    ///   are needed, as the conductor is sending an agent-to-client message and
    ///   the conductor is acting as the agent.
    /// * If the notification is going to a proxy component, then we have to wrap
    ///   it in a "from successor" wrapper, because the conductor is the
    ///   proxy's client.
    fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        source_component_index: usize,
        notification: N,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(
            source_component_index,
            proxies_len = self.proxies.len(),
            "send_notification_to_predecessor_of"
        );
        if source_component_index == 0 {
            tracing::debug!("Sending notification directly to client");
            client.send_notification_to(Client, notification)
        } else {
            tracing::debug!(
                target_proxy = source_component_index - 1,
                "Sending notification wrapped as SuccessorMessage to proxy"
            );
            self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
                message: notification,
                meta: None,
            })
        }
    }

    /// Send a message (request or notification) from 'left to right'.
    /// Left-to-right means from the client or an intermediate proxy to the component
    /// at `target_component_index` (could be a proxy or the agent).
    /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
    async fn forward_client_to_agent_message(
        &mut self,
        target_component_index: usize,
        message: Dispatch,
        client: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::trace!(
            target_component_index,
            ?message,
            "forward_client_to_agent_message"
        );

        // Ensure components are initialized before processing any message.
        let message = self.ensure_initialized(client.clone(), message).await?;

        // If the target is one of our proxies, forward to it. Otherwise the target
        // is just beyond our proxy chain, so forward to the successor: the agent in
        // agent mode, or the conductor's own successor in proxy mode.
        if target_component_index < self.proxies.len() {
            self.forward_message_from_client_to_proxy(target_component_index, message)
                .await
        } else {
            assert_eq!(target_component_index, self.proxies.len());

            debug!(
                target_component_index,
                proxies_count = self.proxies.len(),
                "Proxy mode: forwarding successor message to conductor's successor"
            );
            let successor = self.successor.clone();
            successor.send_message(message, client, self).await
        }
    }

    /// Ensures components are initialized before processing messages.
    ///
    /// If components haven't been initialized yet, this expects the first message
    /// to be an `initialize` request and uses it to spawn the component chain.
    ///
    /// Returns:
    /// - `Ok(message)` - Components are initialized, continue processing the returned message
    /// - `Err(_)` - Initialization failed
    async fn ensure_initialized(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        message: Dispatch,
    ) -> Result<Dispatch, Error> {
        // Already initialized - pass through
        let Some(instantiator) = self.instantiator.take() else {
            return Ok(message);
        };

        let host = self.host.clone();
        let message = host.initialize(message, client, instantiator, self).await?;
        Ok(message)
    }

    /// Wrap a proxy component with tracing if tracing is enabled.
    ///
    /// Returns the component unchanged if tracing is disabled.
    fn trace_proxy(
        &self,
        proxy_index: ComponentIndex,
        successor_index: ComponentIndex,
        component: impl ConnectTo<Conductor>,
    ) -> DynConnectTo<Conductor> {
        match &self.trace_handle {
            Some(trace_handle) => {
                trace_handle.bridge_component(proxy_index, successor_index, component)
            }
            None => DynConnectTo::new(component),
        }
    }

    /// Spawn proxy components and add them to the proxies list.
    fn spawn_proxies(
        &mut self,
        client: ConnectionTo<Host::Counterpart>,
        proxy_components: Vec<DynConnectTo<Conductor>>,
    ) -> Result<(), agent_client_protocol::Error> {
        assert!(self.proxies.is_empty());

        let num_proxies = proxy_components.len();
        info!(proxy_count = num_proxies, "spawn_proxies");

        // Special case: if there are no user-defined proxies
        // but tracing is enabled, we make a dummy proxy that just
        // passes through messages but which can trigger the
        // tracing events.
        if self.trace_handle.is_some() && num_proxies == 0 {
            self.connect_to_proxy(
                &client,
                0,
                ComponentIndex::Client,
                ComponentIndex::Agent,
                Proxy.builder(),
            )?;
        } else {
            // Spawn each proxy component
            for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
                debug!(component_index, "spawning proxy");

                self.connect_to_proxy(
                    &client,
                    component_index,
                    ComponentIndex::Proxy(component_index),
                    ComponentIndex::successor_of(component_index, num_proxies),
                    dyn_component,
                )?;
            }
        }

        info!(proxy_count = self.proxies.len(), "Proxies spawned");

        Ok(())
    }

    /// Create a connection to the proxy with index `component_index` implemented in `component`.
    ///
    /// If tracing is enabled, the proxy's index is `trace_proxy_index` and its successor is `trace_successor_index`.
    fn connect_to_proxy(
        &mut self,
        client: &ConnectionTo<Host::Counterpart>,
        component_index: usize,
        trace_proxy_index: ComponentIndex,
        trace_successor_index: ComponentIndex,
        component: impl ConnectTo<Conductor>,
    ) -> Result<(), Error> {
        let connection_builder = self.connection_to_proxy(component_index);
        let connect_component =
            self.trace_proxy(trace_proxy_index, trace_successor_index, component);
        let proxy_connection = client.spawn_connection(connection_builder, connect_component)?;
        self.proxies.push(proxy_connection);
        Ok(())
    }

    /// Create the conductor's connection to the proxy with index `component_index`.
    ///
    /// Outgoing messages received from the proxy are sent to `self.conductor_tx` as either
    /// left-to-right or right-to-left messages depending on whether they are wrapped
    /// in `SuccessorMessage`.
    fn connection_to_proxy(
        &mut self,
        component_index: usize,
    ) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
        type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
        let mut conductor_tx = self.conductor_tx.clone();
        Conductor
            .builder()
            .name(format!("conductor-to-component({component_index})"))
            // Intercept messages sent by the proxy.
            .on_receive_dispatch(
                async move |dispatch: Dispatch, _connection| {
                    MatchDispatch::new(dispatch)
                        .if_message(async |dispatch: SuccessorDispatch| {
                            // ------------------
                            // SuccessorMessages sent by the proxy go to its successor.
                            //
                            // Subtle point:
                            //
                            // `ConductorToProxy` has only a single peer, `Agent`. This means that we see
                            // "successor messages" in their "desugared form". So when we intercept an *outgoing*
                            // message that matches `SuccessorMessage`, it could be one of three things:
                            //
                            // - A request being sent by the proxy to its successor (hence going left->right)
                            // - A notification being sent by the proxy to its successor (hence going left->right)
                            // - A response to a request sent to the proxy *by* its successor. Here, the *request*
                            //   was going right->left, but the *response* (the message we are processing now)
                            //   is going left->right.
                            //
                            // So, in all cases, we forward as a left->right message.

                            conductor_tx
                                .send(ConductorMessage::LeftToRight {
                                    target_component_index: component_index + 1,
                                    message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
                                })
                                .await
                                .map_err(agent_client_protocol::util::internal_error)
                        })
                        .await
                        .otherwise(async |dispatch| {
                            // Other messages sent by the proxy go to its predecessor.
                            // As in the previous handler:
                            //
                            // Messages here are seen in their "desugared form", so we are seeing
                            // one of three things:
                            //
                            // - A request being sent by the proxy to its predecessor (hence going right->left)
                            // - A notification being sent by the proxy to its predecessor (hence going right->left)
                            // - A response to a request sent to the proxy *by* its predecessor. Here, the *request*
                            //   was going left->right, but the *response* (the message we are processing now)
                            //   is going right->left.
                            //
                            // So, in all cases, we forward as a right->left message.

                            let message = ConductorMessage::RightToLeft {
                                source_component_index: SourceComponentIndex::Proxy(
                                    component_index,
                                ),
                                message: dispatch,
                            };
                            conductor_tx
                                .send(message)
                                .await
                                .map_err(agent_client_protocol::util::internal_error)
                        })
                        .await
                },
                agent_client_protocol::on_receive_dispatch!(),
            )
    }

    async fn forward_message_from_client_to_proxy(
        &mut self,
        target_component_index: usize,
        message: Dispatch,
    ) -> Result<(), agent_client_protocol::Error> {
        tracing::debug!(?message, "forward_message_to_proxy");

        MatchDispatch::new(message)
            .if_request(async |_request: InitializeProxyRequest, responder| {
                responder.respond_with_error(
                    agent_client_protocol::Error::invalid_request()
                        .data("initialize/proxy requests are only sent by the conductor"),
                )
            })
            .await
            .if_request(async |request: InitializeRequest, responder| {
                // The pattern for `Initialize` messages is a bit subtle.
                // Proxies receive incoming `Initialize` messages as if they
                // were a client. The conductor (us) intercepts these and
                // converts them to an `InitializeProxyRequest`.
                //
                // The proxy will then initialize itself and forward an `Initialize`
                // request to its successor.
                self.proxies[target_component_index]
                    .send_request(InitializeProxyRequest::from(request))
                    .on_receiving_result(async move |result| {
                        tracing::debug!(?result, "got initialize_proxy response from proxy");
                        responder.respond_with_result(result)
                    })
            })
            .await
            .otherwise(async |message| {
                // Otherwise, just send the message along "as is".
                self.proxies[target_component_index].send_proxied_message(message)
            })
            .await
    }

913 /// Invoked when sending a message from the conductor to the agent that it manages.
914 /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
915 /// method when `Link = ConductorToClient` (i.e., the conductor is not itself
916 /// running as a proxy).
917 async fn forward_message_to_agent(
918 &mut self,
919 client_connection: ConnectionTo<Host::Counterpart>,
920 message: Dispatch,
921 agent_connection: ConnectionTo<Agent>,
922 ) -> Result<(), Error> {
923 MatchDispatch::new(message)
924 .if_request(async |_request: InitializeProxyRequest, responder| {
925 responder.respond_with_error(
926 agent_client_protocol::Error::invalid_request()
927 .data("initialize/proxy requests are only sent by the conductor"),
928 )
929 })
930 .await
931 .if_request(async |mut request: NewSessionRequest, responder| {
932 // When forwarding "session/new" to the agent,
933 // we adjust MCP servers to manage "acp:" URLs.
934 for mcp_server in &mut request.mcp_servers {
935 self.bridge_listeners
936 .transform_mcp_server(
937 client_connection.clone(),
938 mcp_server,
939 &self.conductor_tx,
940 &self.mcp_bridge_mode,
941 )
942 .await?;
943 }
944
945 agent_connection
946 .send_request(request)
947 .forward_response_to(responder)
948 })
949 .await
950 .if_request(
951 async |request: McpOverAcpMessage<UntypedMessage>, responder| {
952 let McpOverAcpMessage {
953 connection_id,
954 message: mcp_request,
955 ..
956 } = request;
957 self.bridge_connections
958 .get_mut(&connection_id)
959 .ok_or_else(|| {
960 agent_client_protocol::util::internal_error(format!(
961 "unknown connection id: {connection_id}"
962 ))
963 })?
964 .send(Dispatch::Request(mcp_request, responder))
965 .await
966 },
967 )
968 .await
969 .if_notification(async |notification: McpOverAcpMessage<UntypedMessage>| {
970 let McpOverAcpMessage {
971 connection_id,
972 message: mcp_notification,
973 ..
974 } = notification;
975 self.bridge_connections
976 .get_mut(&connection_id)
977 .ok_or_else(|| {
978 agent_client_protocol::util::internal_error(format!(
979 "unknown connection id: {connection_id}"
980 ))
981 })?
982 .send(Dispatch::Notification(mcp_notification))
983 .await
984 })
985 .await
986 .otherwise(async |message| {
987 // Otherwise, just send the message along "as is".
988 agent_connection.send_proxied_message_to(Agent, message)
989 })
990 .await
991 }
992}
993
994/// Identifies a component in the conductor's chain for tracing purposes.
995///
996/// Used to track message sources and destinations through the proxy chain.
997#[derive(Debug, Clone, Copy)]
998pub enum ComponentIndex {
999 /// The client (editor) at the start of the chain.
1000 Client,
1001
1002 /// A proxy component at the given index.
1003 Proxy(usize),
1004
1005 /// The successor (agent in agent mode, outer conductor in proxy mode).
1006 Agent,
1007}
1008
1009impl ComponentIndex {
1010 /// Return the index for the predecessor of `proxy_index`, which might be `Client`.
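    ///
    /// # Example
    ///
    /// A small sketch of the expected results, marked `ignore` so it is not
    /// compiled as a doctest:
    ///
    /// ```ignore
    /// // The first proxy has no proxy before it, so its predecessor is the client.
    /// assert!(matches!(ComponentIndex::predecessor_of(0), ComponentIndex::Client));
    /// // Any later proxy is preceded by the proxy one index lower.
    /// assert!(matches!(ComponentIndex::predecessor_of(2), ComponentIndex::Proxy(1)));
    /// ```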
1011 #[must_use]
1012 pub fn predecessor_of(proxy_index: usize) -> Self {
1013 match proxy_index.checked_sub(1) {
1014 Some(p_i) => ComponentIndex::Proxy(p_i),
1015 None => ComponentIndex::Client,
1016 }
1017 }
1018
    /// Return the index for the successor of `proxy_index`, which might be `Agent`.
1020 #[must_use]
1021 pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
        // Proxies are indexed `0..num_proxies`, so the last proxy's successor is the agent.
        if proxy_index + 1 >= num_proxies {
1023 ComponentIndex::Agent
1024 } else {
1025 ComponentIndex::Proxy(proxy_index + 1)
1026 }
1027 }
1028}
1029
1030/// Identifies the source of an agent-to-client message.
1031///
1032/// This enum handles the fact that the conductor may receive messages from two different sources:
1033/// 1. From one of its managed components (identified by index)
1034/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
1035#[derive(Debug, Clone, Copy)]
1036pub enum SourceComponentIndex {
1037 /// Message from a specific component at the given index in the managed chain.
1038 Proxy(usize),
1039
1040 /// Message from the conductor's agent or successor.
1041 Successor,
1042}
1043
1044/// Trait for lazy proxy instantiation (proxy mode).
1045///
1046/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
1047/// are proxies that forward to an outer conductor.
1048pub trait InstantiateProxies: Send {
1049 /// Instantiate proxy components based on the Initialize request.
1050 ///
1051 /// Returns proxy components typed as `DynConnectTo<Conductor>` since proxies
1052 /// communicate with the conductor.
1053 fn instantiate_proxies(
1054 self: Box<Self>,
1055 req: InitializeRequest,
1056 ) -> futures::future::BoxFuture<
1057 'static,
1058 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
1059 >;
1060}
1061
1062/// Simple implementation: provide all proxy components unconditionally.
1063///
1064/// Requires `T: ConnectTo<Conductor>`.
1065impl<T> InstantiateProxies for Vec<T>
1066where
1067 T: ConnectTo<Conductor> + 'static,
1068{
1069 fn instantiate_proxies(
1070 self: Box<Self>,
1071 req: InitializeRequest,
1072 ) -> futures::future::BoxFuture<
1073 'static,
1074 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
1075 > {
1076 Box::pin(async move {
1077 let components: Vec<DynConnectTo<Conductor>> =
1078 (*self).into_iter().map(|c| DynConnectTo::new(c)).collect();
1079 Ok((req, components))
1080 })
1081 }
1082}
1083
1084/// Dynamic implementation: closure receives the Initialize request and returns proxies.
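///
/// # Example
///
/// A hedged sketch of the closure form. `LoggingProxy` is a hypothetical type
/// assumed to implement `ConnectTo<Conductor>`, and the block is marked
/// `ignore` so it is not compiled as a doctest:
///
/// ```ignore
/// let instantiator = |req: InitializeRequest| async move {
///     // Decide which proxies to create after inspecting the request.
///     let proxies: Vec<DynConnectTo<Conductor>> =
///         vec![DynConnectTo::new(LoggingProxy::new())];
///     Ok::<_, agent_client_protocol::Error>((req, proxies))
/// };
/// ```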
1085impl<F, Fut> InstantiateProxies for F
1086where
1087 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1088 Fut: std::future::Future<
1089 Output = Result<
1090 (InitializeRequest, Vec<DynConnectTo<Conductor>>),
1091 agent_client_protocol::Error,
1092 >,
1093 > + Send
1094 + 'static,
1095{
1096 fn instantiate_proxies(
1097 self: Box<Self>,
1098 req: InitializeRequest,
1099 ) -> futures::future::BoxFuture<
1100 'static,
1101 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
1102 > {
1103 Box::pin(async move { (*self)(req).await })
1104 }
1105}
1106
1107/// Trait for lazy proxy and agent instantiation (agent mode).
1108///
1109/// Used by conductors in agent mode (`ConductorToClient`) where there are
1110/// zero or more proxies followed by an agent component.
1111pub trait InstantiateProxiesAndAgent: Send {
1112 /// Instantiate proxy and agent components based on the Initialize request.
1113 ///
1114 /// Returns the (possibly modified) request, a vector of proxy components
1115 /// (typed as `DynConnectTo<Conductor>`), and the agent component
1116 /// (typed as `DynConnectTo<Client>`).
1117 fn instantiate_proxies_and_agent(
1118 self: Box<Self>,
1119 req: InitializeRequest,
1120 ) -> futures::future::BoxFuture<
1121 'static,
1122 Result<
1123 (
1124 InitializeRequest,
1125 Vec<DynConnectTo<Conductor>>,
1126 DynConnectTo<Client>,
1127 ),
1128 agent_client_protocol::Error,
1129 >,
1130 >;
1131}
1132
/// Wrapper to convert a single agent component (no proxies) into an `InstantiateProxiesAndAgent`.
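///
/// # Example
///
/// A minimal sketch; `ElizaAgent` is a hypothetical type assumed to implement
/// `ConnectTo<Client>`, and the block is marked `ignore` so it is not compiled:
///
/// ```ignore
/// // Yields the unmodified request, no proxies, and the wrapped agent.
/// let instantiator = AgentOnly(ElizaAgent::new());
/// ```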
1134#[derive(Debug)]
1135pub struct AgentOnly<A>(pub A);
1136
1137impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
1138 fn instantiate_proxies_and_agent(
1139 self: Box<Self>,
1140 req: InitializeRequest,
1141 ) -> futures::future::BoxFuture<
1142 'static,
1143 Result<
1144 (
1145 InitializeRequest,
1146 Vec<DynConnectTo<Conductor>>,
1147 DynConnectTo<Client>,
1148 ),
1149 agent_client_protocol::Error,
1150 >,
1151 > {
1152 Box::pin(async move { Ok((req, Vec::new(), DynConnectTo::new(self.0))) })
1153 }
1154}
1155
1156/// Builder for creating proxies and agent components.
1157///
1158/// # Example
1159/// ```ignore
1160/// ProxiesAndAgent::new(ElizaAgent::new())
1161/// .proxy(LoggingProxy::new())
1162/// .proxy(AuthProxy::new())
1163/// ```
1164#[derive(Debug)]
1165pub struct ProxiesAndAgent {
1166 proxies: Vec<DynConnectTo<Conductor>>,
1167 agent: DynConnectTo<Client>,
1168}
1169
1170impl ProxiesAndAgent {
1171 /// Create a new builder with the given agent component.
1172 pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
1173 Self {
1174 proxies: vec![],
1175 agent: DynConnectTo::new(agent),
1176 }
1177 }
1178
1179 /// Add a single proxy component.
1180 #[must_use]
1181 pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
1182 self.proxies.push(DynConnectTo::new(proxy));
1183 self
1184 }
1185
1186 /// Add multiple proxy components.
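    ///
    /// # Example
    ///
    /// A sketch assuming `ElizaAgent` and `LoggingProxy` are hypothetical
    /// component types. Every item yielded by the iterator must have the same
    /// type `P`; use [`Self::proxy`] repeatedly to mix different proxy types.
    ///
    /// ```ignore
    /// let chain = ProxiesAndAgent::new(ElizaAgent::new())
    ///     .proxies(vec![LoggingProxy::new(), LoggingProxy::new()]);
    /// ```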
1187 #[must_use]
1188 pub fn proxies<P, I>(mut self, proxies: I) -> Self
1189 where
1190 P: ConnectTo<Conductor> + 'static,
1191 I: IntoIterator<Item = P>,
1192 {
1193 self.proxies
1194 .extend(proxies.into_iter().map(DynConnectTo::new));
1195 self
1196 }
1197}
1198
1199impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1200 fn instantiate_proxies_and_agent(
1201 self: Box<Self>,
1202 req: InitializeRequest,
1203 ) -> futures::future::BoxFuture<
1204 'static,
1205 Result<
1206 (
1207 InitializeRequest,
1208 Vec<DynConnectTo<Conductor>>,
1209 DynConnectTo<Client>,
1210 ),
1211 agent_client_protocol::Error,
1212 >,
1213 > {
1214 Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1215 }
1216}
1217
1218/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
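///
/// # Example
///
/// A hedged sketch of the closure form. `LoggingProxy` and `ElizaAgent` are
/// hypothetical component types, and the block is marked `ignore` so it is
/// not compiled as a doctest:
///
/// ```ignore
/// let instantiator = |req: InitializeRequest| async move {
///     // Build the chain after inspecting the request.
///     let proxies: Vec<DynConnectTo<Conductor>> =
///         vec![DynConnectTo::new(LoggingProxy::new())];
///     let agent: DynConnectTo<Client> = DynConnectTo::new(ElizaAgent::new());
///     Ok::<_, agent_client_protocol::Error>((req, proxies, agent))
/// };
/// ```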
1219impl<F, Fut> InstantiateProxiesAndAgent for F
1220where
1221 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1222 Fut: std::future::Future<
1223 Output = Result<
1224 (
1225 InitializeRequest,
1226 Vec<DynConnectTo<Conductor>>,
1227 DynConnectTo<Client>,
1228 ),
1229 agent_client_protocol::Error,
1230 >,
1231 > + Send
1232 + 'static,
1233{
1234 fn instantiate_proxies_and_agent(
1235 self: Box<Self>,
1236 req: InitializeRequest,
1237 ) -> futures::future::BoxFuture<
1238 'static,
1239 Result<
1240 (
1241 InitializeRequest,
1242 Vec<DynConnectTo<Conductor>>,
1243 DynConnectTo<Client>,
1244 ),
1245 agent_client_protocol::Error,
1246 >,
1247 > {
1248 Box::pin(async move { (*self)(req).await })
1249 }
1250}
1251
1252/// Messages sent to the conductor's main event loop for routing.
1253///
1254/// These messages enable the conductor to route communication between:
1255/// - The editor and the first component
1256/// - Components and their successors in the chain
1257/// - Components and their clients (editor or predecessor)
1258///
1259/// All spawned tasks send messages via this enum through a shared channel,
1260/// allowing centralized routing logic in the `serve()` loop.
1261#[derive(Debug)]
1262pub enum ConductorMessage {
1263 /// If this message is a request or notification, then it is going "left-to-right"
1264 /// (e.g., a component making a request of its successor).
1265 ///
    /// If this message is a response, then it is also going "left-to-right"
    /// (i.e., a component answering a request made by its successor).
1268 LeftToRight {
1269 target_component_index: usize,
1270 message: Dispatch,
1271 },
1272
1273 /// If this message is a request or notification, then it is going "right-to-left"
1274 /// (e.g., a component making a request of its predecessor).
1275 ///
    /// If this message is a response, then it is also going "right-to-left"
    /// (i.e., a component answering a request made by its predecessor).
1278 RightToLeft {
1279 source_component_index: SourceComponentIndex,
1280 message: Dispatch,
1281 },
1282
    /// A pending MCP bridge connection request.
1284 /// The request must be sent back over ACP to receive the connection-id.
1285 /// Once the connection-id is received, the actor must be spawned.
1286 McpConnectionReceived {
1287 /// The acp:$UUID URL identifying this bridge
1288 acp_url: String,
1289
1290 /// The actor that should be spawned once the connection-id is available.
1291 actor: McpBridgeConnectionActor,
1292
1293 /// The connection to the bridge
1294 connection: McpBridgeConnection,
1295 },
1296
    /// An MCP bridge connection whose connect request has been answered.
    /// The response carries the connection-id, so the actor can now be spawned.
1300 McpConnectionEstablished {
1301 response: McpConnectResponse,
1302
1303 /// The actor that should be spawned once the connection-id is available.
1304 actor: McpBridgeConnectionActor,
1305
1306 /// The connection to the bridge
1307 connection: McpBridgeConnection,
1308 },
1309
1310 /// MCP message (request or notification) received from a bridge that needs to be routed to the final proxy.
1311 ///
1312 /// Sent when the bridge receives an MCP tool call from the agent and forwards it
1313 /// to the conductor via TCP. The conductor routes this to the appropriate proxy component.
1314 McpClientToMcpServer {
1315 connection_id: String,
1316 message: Dispatch,
1317 },
1318
    /// Sent when the MCP client disconnects.
1320 McpConnectionDisconnected {
1321 notification: McpDisconnectNotification,
1322 },
1323}
1324
1325/// Trait implemented for the two links the conductor can use:
1326///
1327/// * ConductorToClient -- conductor is acting as an agent, so when its last proxy sends to its successor, the conductor sends that message to its agent component
1328/// * ConductorToConductor -- conductor is acting as a proxy, so when its last proxy sends to its successor, the (inner) conductor sends that message to its successor, via the outer conductor
1329pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
1330 /// The type used to instantiate components for this link type.
1331 type Instantiator: Send;
1332
1333 /// Handle initialization: parse the init request, instantiate components, and spawn them.
1334 ///
1335 /// Takes ownership of the instantiator and returns the (possibly modified) init request
1336 /// wrapped in a Dispatch for forwarding.
1337 fn initialize(
1338 &self,
1339 message: Dispatch,
1340 connection: ConnectionTo<Self::Counterpart>,
1341 instantiator: Self::Instantiator,
1342 responder: &mut ConductorResponder<Self>,
1343 ) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
1344
1345 /// Handle an incoming message from the client or conductor, depending on `Self`
1346 fn handle_dispatch(
1347 &self,
1348 message: Dispatch,
1349 connection: ConnectionTo<Self::Counterpart>,
1350 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1351 ) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
1352}
1353
1354/// Conductor acting as an agent
1355impl ConductorHostRole for Agent {
1356 type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1357
1358 async fn initialize(
1359 &self,
1360 message: Dispatch,
1361 client_connection: ConnectionTo<Client>,
1362 instantiator: Self::Instantiator,
1363 responder: &mut ConductorResponder<Self>,
1364 ) -> Result<Dispatch, agent_client_protocol::Error> {
1365 let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1366
1367 // Not yet initialized - expect an initialize request.
1368 // Error if we get anything else.
1369 let Dispatch::Request(request, init_responder) = message else {
1370 message.respond_with_error(invalid_request(), client_connection.clone())?;
1371 return Err(invalid_request());
1372 };
1373 if !InitializeRequest::matches_method(request.method()) {
1374 init_responder.respond_with_error(invalid_request())?;
1375 return Err(invalid_request());
1376 }
1377
1378 let init_request =
1379 match InitializeRequest::parse_message(request.method(), request.params()) {
1380 Ok(r) => r,
1381 Err(error) => {
1382 init_responder.respond_with_error(error)?;
1383 return Err(invalid_request());
1384 }
1385 };
1386
1387 // Instantiate proxies and agent
1388 let (modified_req, proxy_components, agent_component) = instantiator
1389 .instantiate_proxies_and_agent(init_request)
1390 .await?;
1391
1392 // Spawn the agent component
1393 debug!(?agent_component, "spawning agent");
1394
1395 let connection_to_agent = client_connection.spawn_connection(
1396 Client
1397 .builder()
1398 .name("conductor-to-agent")
1399 // Intercept agent-to-client messages from the agent.
1400 .on_receive_dispatch(
1401 {
1402 let mut conductor_tx = responder.conductor_tx.clone();
1403 async move |dispatch: Dispatch, _cx| {
1404 conductor_tx
1405 .send(ConductorMessage::RightToLeft {
1406 source_component_index: SourceComponentIndex::Successor,
1407 message: dispatch,
1408 })
1409 .await
1410 .map_err(agent_client_protocol::util::internal_error)
1411 }
1412 },
1413 agent_client_protocol::on_receive_dispatch!(),
1414 ),
1415 agent_component,
1416 )?;
1417 responder.successor = Arc::new(connection_to_agent);
1418
1419 // Spawn the proxy components
1420 responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1421
1422 Ok(Dispatch::Request(
1423 modified_req.to_untyped_message()?,
1424 init_responder,
1425 ))
1426 }
1427
1428 async fn handle_dispatch(
1429 &self,
1430 message: Dispatch,
1431 client_connection: ConnectionTo<Client>,
1432 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1433 ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1434 tracing::debug!(
1435 method = ?message.method(),
1436 "ConductorToClient::handle_dispatch"
1437 );
1438 MatchDispatchFrom::new(message, &client_connection)
1439 // Any incoming messages from the client are client-to-agent messages targeting the first component.
1440 .if_message_from(Client, async move |message: Dispatch| {
1441 tracing::debug!(
1442 method = ?message.method(),
1443 "ConductorToClient::handle_dispatch - matched Client"
1444 );
1445 ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
1446 })
1447 .await
1448 .done()
1449 }
1450}
1451
1452/// Conductor acting as a proxy
1453impl ConductorHostRole for Proxy {
1454 type Instantiator = Box<dyn InstantiateProxies>;
1455
1456 async fn initialize(
1457 &self,
1458 message: Dispatch,
1459 client_connection: ConnectionTo<Conductor>,
1460 instantiator: Self::Instantiator,
1461 responder: &mut ConductorResponder<Self>,
1462 ) -> Result<Dispatch, agent_client_protocol::Error> {
        let invalid_request =
            || Error::invalid_request().data("expected `_proxy/initialize` request");
1464
1465 // Not yet initialized - expect an InitializeProxy request.
1466 // Error if we get anything else.
1467 let Dispatch::Request(request, init_responder) = message else {
1468 message.respond_with_error(invalid_request(), client_connection.clone())?;
1469 return Err(invalid_request());
1470 };
1471 if !InitializeProxyRequest::matches_method(request.method()) {
1472 init_responder.respond_with_error(invalid_request())?;
1473 return Err(invalid_request());
1474 }
1475
1476 let InitializeProxyRequest { initialize } =
1477 match InitializeProxyRequest::parse_message(request.method(), request.params()) {
1478 Ok(r) => r,
1479 Err(error) => {
1480 init_responder.respond_with_error(error)?;
1481 return Err(invalid_request());
1482 }
1483 };
1484
1485 tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1486
1487 // Instantiate proxies (no agent in proxy mode)
1488 let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1489
1490 // In proxy mode, our successor is the outer conductor (via our client connection)
1491 responder.successor = Arc::new(GrandSuccessor);
1492
1493 // Spawn the proxy components
1494 responder.spawn_proxies(client_connection.clone(), proxy_components)?;
1495
1496 Ok(Dispatch::Request(
1497 modified_req.to_untyped_message()?,
1498 init_responder,
1499 ))
1500 }
1501
1502 async fn handle_dispatch(
1503 &self,
1504 message: Dispatch,
1505 client_connection: ConnectionTo<Conductor>,
1506 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1507 ) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
1508 tracing::debug!(
1509 method = ?message.method(),
1510 ?message,
1511 "ConductorToConductor::handle_dispatch"
1512 );
1513 MatchDispatchFrom::new(message, &client_connection)
1514 .if_message_from(Agent, {
1515 // Messages from our successor arrive already unwrapped
1516 // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1517 async |message: Dispatch| {
1518 tracing::debug!(
1519 method = ?message.method(),
1520 "ConductorToConductor::handle_dispatch - matched Agent"
1521 );
1522 let mut conductor_tx = conductor_tx.clone();
1523 ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
1524 .await
1525 }
1526 })
1527 .await
1528 // Any incoming messages from the client are client-to-agent messages targeting the first component.
1529 .if_message_from(Client, async |message: Dispatch| {
1530 tracing::debug!(
1531 method = ?message.method(),
1532 "ConductorToConductor::handle_dispatch - matched Client"
1533 );
1534 let mut conductor_tx = conductor_tx.clone();
1535 ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
1536 .await
1537 })
1538 .await
1539 .done()
1540 }
1541}
1542
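/// The destination for messages that the last proxy in the chain sends to its successor.
///
/// What that destination is depends on the host role: a conductor acting as an
/// agent forwards to its agent component, while a conductor acting as a proxy
/// forwards to its own successor.
pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {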
1544 /// Send a message to the successor.
1545 fn send_message<'a>(
1546 &self,
1547 message: Dispatch,
1548 connection_to_conductor: ConnectionTo<Host::Counterpart>,
1549 responder: &'a mut ConductorResponder<Host>,
1550 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
1551}
1552
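/// An error can stand in for a successor: every message sent to it
/// fails with a clone of that error.
impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {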
1554 fn send_message<'a>(
1555 &self,
1556 #[expect(unused_variables)] message: Dispatch,
1557 #[expect(unused_variables)] connection_to_conductor: ConnectionTo<Host::Counterpart>,
1558 #[expect(unused_variables)] responder: &'a mut ConductorResponder<Host>,
1559 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1560 let error = self.clone();
1561 Box::pin(std::future::ready(Err(error)))
1562 }
1563}
1564
1565/// A dummy type handling messages sent to the conductor's
1566/// successor when it is acting as a proxy.
1567struct GrandSuccessor;
1568
/// When the conductor is acting as a proxy, messages sent by
1570/// the last proxy go to the conductor's successor.
1571///
1572/// ```text
1573/// client --> Conductor -----------------------------> GrandSuccessor
1574/// | |
1575/// +-> Proxy[0] -> ... -> Proxy[n-1] -+
1576/// ```
1577impl ConductorSuccessor<Proxy> for GrandSuccessor {
1578 fn send_message<'a>(
1579 &self,
1580 message: Dispatch,
1581 connection: ConnectionTo<Conductor>,
1582 _responder: &'a mut ConductorResponder<Proxy>,
1583 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1584 Box::pin(async move {
1585 debug!("Proxy mode: forwarding successor message to conductor's successor");
1586 connection.send_proxied_message_to(Agent, message)
1587 })
1588 }
1589}
1590
1591/// When the conductor is acting as an agent, messages sent by
1592/// the last proxy to its successor go to the internal agent
1593/// (`self`).
1594impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
1595 fn send_message<'a>(
1596 &self,
1597 message: Dispatch,
1598 connection: ConnectionTo<Client>,
1599 responder: &'a mut ConductorResponder<Agent>,
1600 ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
1601 let connection_to_agent = self.clone();
1602 Box::pin(async move {
            debug!("Agent mode: forwarding successor message to the agent");
1604 responder
1605 .forward_message_to_agent(connection, message, connection_to_agent)
1606 .await
1607 })
1608 }
1609}