sacp_conductor/
conductor.rs

1//! # Conductor: SACP Proxy Chain Orchestrator
2//!
//! This module implements the conductor, which orchestrates a chain of
4//! proxy components that sit between an editor and an agent, transforming the
5//! Agent-Client Protocol (ACP) stream bidirectionally.
6//!
7//! ## Architecture Overview
8//!
9//! The conductor builds and manages a chain of components:
10//!
11//! ```text
12//! Editor <-ACP-> [Component 0] <-ACP-> [Component 1] <-ACP-> ... <-ACP-> Agent
13//! ```
14//!
15//! Each component receives ACP messages, can transform them, and forwards them
16//! to the next component in the chain. The conductor:
17//!
18//! 1. Spawns each component as a subprocess
19//! 2. Establishes bidirectional JSON-RPC connections with each component
20//! 3. Routes messages between editor, components, and agent
21//! 4. Distinguishes proxy vs agent components via distinct request types
22//!
23//! ## Recursive Chain Building
24//!
25//! The chain is built recursively through the `_proxy/successor/*` protocol:
26//!
27//! 1. Editor connects to Component 0 via the conductor
28//! 2. When Component 0 wants to communicate with its successor, it sends
29//!    requests/notifications with method prefix `_proxy/successor/`
30//! 3. The conductor intercepts these messages, strips the prefix, and forwards
31//!    to Component 1
32//! 4. Component 1 does the same for Component 2, and so on
33//! 5. The last component talks directly to the agent (no `_proxy/successor/` prefix)
34//!
35//! This allows each component to be written as if it's talking to a single successor,
36//! without knowing about the full chain.
37//!
38//! ## Proxy vs Agent Initialization
39//!
40//! Components discover whether they're a proxy or agent via the initialization request they receive:
41//!
42//! - **Proxy components**: Receive `InitializeProxyRequest` (`_proxy/initialize` method)
43//! - **Agent component**: Receives standard `InitializeRequest` (`initialize` method)
44//!
45//! The conductor sends `InitializeProxyRequest` to all proxy components in the chain,
46//! and `InitializeRequest` only to the final agent component. This allows proxies to
47//! know they should forward messages to a successor, while agents know they are the
//! terminal component.
49//!
50//! ## Message Routing
51//!
52//! The conductor runs an event loop processing messages from:
53//!
54//! - **Editor to first component**: Standard ACP messages
55//! - **Component to successor**: Via `_proxy/successor/*` prefix
56//! - **Component responses**: Via futures channels back to requesters
57//!
58//! The message flow ensures bidirectional communication while maintaining the
59//! abstraction that each component only knows about its immediate successor.
60//!
61//! ## Lazy Component Initialization
62//!
63//! Components are instantiated lazily when the first `initialize` request is received
64//! from the editor. This enables dynamic proxy chain construction based on client capabilities.
65//!
66//! ### Simple Usage
67//!
68//! Pass a Vec of components that implement `Component`:
69//!
70//! ```ignore
71//! let conductor = Conductor::new(
72//!     "my-conductor",
73//!     vec![proxy1, proxy2, agent],
74//!     None,
75//! );
76//! ```
77//!
78//! All components are spawned in order when the editor sends the first `initialize` request.
79//!
80//! ### Dynamic Component Selection
81//!
82//! Pass a closure to examine the `InitializeRequest` and dynamically construct the chain:
83//!
84//! ```ignore
85//! let conductor = Conductor::new(
86//!     "my-conductor",
87//!     |cx, conductor_tx, init_req| async move {
88//!         // Examine capabilities
89//!         let needs_auth = has_auth_capability(&init_req);
90//!
91//!         let mut components = Vec::new();
92//!         if needs_auth {
93//!             components.push(spawn_auth_proxy(&cx, &conductor_tx)?);
94//!         }
95//!         components.push(spawn_agent(&cx, &conductor_tx)?);
96//!
97//!         // Return (potentially modified) request and component list
98//!         Ok((init_req, components))
99//!     },
100//!     None,
101//! );
102//! ```
103//!
104//! The closure receives:
105//! - `cx: &JrConnectionCx` - Connection context for spawning components
106//! - `conductor_tx: &mpsc::Sender<ConductorMessage>` - Channel for message routing
107//! - `init_req: InitializeRequest` - The Initialize request from the editor
108//!
109//! And returns:
110//! - Modified `InitializeRequest` to forward downstream
111//! - `Vec<JrConnectionCx>` of spawned components
112
113use std::{collections::HashMap, sync::Arc};
114
115use futures::{
116    SinkExt, StreamExt,
117    channel::mpsc::{self},
118};
119use sacp::{
120    AgentPeer, BoxFuture, ClientPeer, Component, Error, HasPeer, JrMessage,
121    link::{
122        AgentToClient, ConductorToAgent, ConductorToClient, ConductorToConductor, ConductorToProxy,
123        ProxyToConductor,
124    },
125    util::MatchMessage,
126};
127use sacp::{
128    Handled,
129    schema::{
130        McpConnectRequest, McpConnectResponse, McpDisconnectNotification, McpOverAcpMessage,
131        SuccessorMessage,
132    },
133};
134use sacp::{
135    JrConnectionBuilder, JrConnectionCx, JrLink, JrNotification, JrPeer, JrRequest, JrRequestCx,
136    JrResponse, MessageCx, UntypedMessage,
137};
138use sacp::{
139    JrMessageHandler, JrResponder, JrResponsePayload,
140    schema::{InitializeProxyRequest, InitializeRequest, NewSessionRequest},
141    util::MatchMessageFrom,
142};
143use tracing::{debug, info};
144
145use crate::conductor::mcp_bridge::{
146    McpBridgeConnection, McpBridgeConnectionActor, McpBridgeListeners,
147};
148
149mod mcp_bridge;
150
/// The conductor manages the proxy chain lifecycle and message routing.
///
/// It maintains connections to all components in the chain and routes messages
/// bidirectionally between the editor, components, and agent.
///
/// This type only holds configuration. It is turned into a running connection by
/// `into_connection_builder` (or the `run` convenience wrapper).
pub struct Conductor<Link: ConductorLink> {
    /// Name applied to the connection builder produced by `into_connection_builder`.
    name: String,
    /// Produces the chain components; handed to the responder when the connection is built.
    instantiator: Link::Instantiator,
    /// Mode for the MCP bridge, forwarded unchanged to the responder.
    mcp_bridge_mode: crate::McpBridgeMode,
    /// Optional trace destination; `None` until one of the `trace_*` builder methods is called.
    trace_writer: Option<crate::trace::TraceWriter>,
    /// Link value handed to both the message handler and the responder.
    link: Link,
}
163
164impl<Link: ConductorLink> Conductor<Link> {
165    pub fn new(
166        link: Link,
167        name: impl ToString,
168        instantiator: Link::Instantiator,
169        mcp_bridge_mode: crate::McpBridgeMode,
170    ) -> Self {
171        Conductor {
172            name: name.to_string(),
173            instantiator,
174            mcp_bridge_mode,
175            trace_writer: None,
176            link,
177        }
178    }
179}
180
181impl Conductor<ConductorToClient> {
182    /// Create a conductor in agent mode (the last component is an agent).
183    pub fn new_agent(
184        name: impl ToString,
185        instantiator: impl InstantiateProxiesAndAgent + 'static,
186        mcp_bridge_mode: crate::McpBridgeMode,
187    ) -> Self {
188        Conductor::new(
189            ConductorToClient,
190            name,
191            Box::new(instantiator),
192            mcp_bridge_mode,
193        )
194    }
195}
196
197impl Conductor<ConductorToConductor> {
198    /// Create a conductor in proxy mode (forwards to another conductor).
199    pub fn new_proxy(
200        name: impl ToString,
201        instantiator: impl InstantiateProxies + 'static,
202        mcp_bridge_mode: crate::McpBridgeMode,
203    ) -> Self {
204        Conductor::new(
205            ConductorToConductor,
206            name,
207            Box::new(instantiator),
208            mcp_bridge_mode,
209        )
210    }
211}
212
213impl<Link: ConductorLink> Conductor<Link> {
214    /// Enable trace logging to a custom destination.
215    ///
216    /// Use `sacp-trace-viewer` to view the trace as an interactive sequence diagram.
217    pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
218        self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
219        self
220    }
221
222    /// Enable trace logging to a file path.
223    ///
224    /// Events will be written as newline-delimited JSON (`.jsons` format).
225    /// Use `sacp-trace-viewer` to view the trace as an interactive sequence diagram.
226    pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
227        self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
228        Ok(self)
229    }
230
231    /// Enable trace logging with an existing TraceWriter.
232    pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
233        self.trace_writer = Some(writer);
234        self
235    }
236
237    pub fn into_connection_builder(
238        self,
239    ) -> JrConnectionBuilder<impl JrMessageHandler<Link = Link>, impl JrResponder<Link>> {
240        let (conductor_tx, conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);
241
242        let responder = ConductorResponder {
243            conductor_rx,
244            conductor_tx: conductor_tx.clone(),
245            instantiator: Some(self.instantiator),
246            bridge_listeners: Default::default(),
247            bridge_connections: Default::default(),
248            mcp_bridge_mode: self.mcp_bridge_mode,
249            proxies: Default::default(),
250            successor: Arc::new(sacp::util::internal_error("successor not initialized")),
251            trace_writer: self.trace_writer,
252            pending_requests: Default::default(),
253            link: self.link,
254        };
255
256        JrConnectionBuilder::new_with(ConductorMessageHandler {
257            conductor_tx,
258            link: self.link,
259        })
260        .name(self.name)
261        .with_responder(responder)
262    }
263
264    /// Convenience method to run the conductor with a transport.
265    ///
266    /// This is equivalent to:
267    /// ```ignore
268    /// conductor.into_connection_builder()
269    ///     .connect_to(transport)
270    ///     .serve()
271    ///     .await
272    /// ```
273    pub async fn run(
274        self,
275        transport: impl Component<Link::ConnectsTo> + 'static,
276    ) -> Result<(), sacp::Error> {
277        self.into_connection_builder()
278            .connect_to(transport)?
279            .serve()
280            .await
281    }
282
283    async fn incoming_message_from_client(
284        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
285        message: MessageCx,
286    ) -> Result<(), sacp::Error> {
287        conductor_tx
288            .send(ConductorMessage::ClientToAgent {
289                target_component_index: 0,
290                message,
291            })
292            .await
293            .map_err(sacp::util::internal_error)
294    }
295
296    async fn incoming_message_from_agent(
297        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
298        message: MessageCx,
299    ) -> Result<(), sacp::Error> {
300        conductor_tx
301            .send(ConductorMessage::AgentToClient {
302                source_component_index: SourceComponentIndex::Successor,
303                message,
304            })
305            .await
306            .map_err(sacp::util::internal_error)
307    }
308}
309
310impl<Link: ConductorLink> Component<Link::Speaks> for Conductor<Link> {
311    async fn serve(
312        self,
313        client: impl sacp::Component<Link::ConnectsTo>,
314    ) -> Result<(), sacp::Error> {
315        self.run(client).await
316    }
317}
318
/// Message-handler half of the conductor connection.
///
/// It does no routing itself: each incoming message is handed to `link`, together with
/// a sender into the conductor's central queue (see the `JrMessageHandler` impl).
struct ConductorMessageHandler<Link: ConductorLink> {
    /// Sender feeding the conductor's central message queue.
    conductor_tx: mpsc::Sender<ConductorMessage>,
    /// Link value whose `handle_message` is invoked for every incoming message.
    link: Link,
}
323
324impl<Link: ConductorLink> JrMessageHandler for ConductorMessageHandler<Link> {
325    type Link = Link;
326
327    async fn handle_message(
328        &mut self,
329        message: MessageCx,
330        cx: sacp::JrConnectionCx<Link>,
331    ) -> Result<sacp::Handled<MessageCx>, sacp::Error> {
332        self.link
333            .handle_message(message, cx, &mut self.conductor_tx)
334            .await
335    }
336
337    fn describe_chain(&self) -> impl std::fmt::Debug {
338        "ConductorMessageHandler"
339    }
340}
341
/// The "central actor" of the conductor.
///
/// Its `JrResponder::run` loop drains `conductor_rx` and handles one
/// [`ConductorMessage`] at a time, which serializes the conductor's activity.
/// It holds the connections to the proxies and the successor, and routes messages
/// bidirectionally between the editor, components, and agent.
///
pub struct ConductorResponder<Link>
where
    Link: ConductorLink,
{
    /// Receiving end of the central message queue, drained by the run loop.
    conductor_rx: mpsc::Receiver<ConductorMessage>,

    /// Sender into the same queue; cloned whenever follow-up work has to be
    /// routed back through the central loop.
    conductor_tx: mpsc::Sender<ConductorMessage>,

    /// Manages the TCP listeners for MCP connections that will be proxied over ACP.
    bridge_listeners: McpBridgeListeners,

    /// Manages active connections to MCP clients.
    bridge_connections: HashMap<String, McpBridgeConnection>,

    /// The instantiator for lazy initialization.
    /// Set to None after components are instantiated.
    instantiator: Option<Link::Instantiator>,

    /// The chain of proxies before the agent (if any).
    ///
    /// Populated lazily when the first Initialize request is received.
    proxies: Vec<JrConnectionCx<ConductorToProxy>>,

    /// If the conductor is operating in agent mode, this will direct messages to the agent.
    /// If the conductor is operating in proxy mode, this will direct messages to the successor.
    /// Populated lazily when the first Initialize request is received; the initial value just returns errors.
    successor: Arc<dyn ConductorSuccessor<Link>>,

    /// Mode for the MCP bridge (determines how to spawn bridge processes).
    mcp_bridge_mode: crate::McpBridgeMode,

    /// Optional trace writer for sequence diagram visualization.
    trace_writer: Option<crate::trace::TraceWriter>,

    /// Tracks pending requests for response tracing: id -> (from, to)
    pending_requests: HashMap<String, (String, String)>,

    /// Defines what sort of link we have
    link: Link,
}
387
388impl<Link> JrResponder<Link> for ConductorResponder<Link>
389where
390    Link: ConductorLink,
391{
392    async fn run(mut self, cx: JrConnectionCx<Link>) -> Result<(), sacp::Error> {
393        // Components are now spawned lazily in forward_initialize_request
394        // when the first Initialize request is received.
395
396        // This is the "central actor" of the conductor. Most other things forward messages
397        // via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
398        while let Some(message) = self.conductor_rx.next().await {
399            self.handle_conductor_message(cx.clone(), message).await?;
400        }
401        Ok(())
402    }
403}
404
405impl<Link> ConductorResponder<Link>
406where
407    Link: ConductorLink,
408{
409    /// Convert a component index to a trace-friendly name.
410    fn component_name(&self, index: usize) -> String {
411        if index == self.proxies.len() {
412            "agent".to_string()
413        } else {
414            format!("proxy:{}", index)
415        }
416    }
417
418    /// Convert a source component index to a trace-friendly name.
419    fn source_component_name(&self, index: SourceComponentIndex) -> String {
420        match index {
421            SourceComponentIndex::Successor => "agent".to_string(), // In proxy mode, successor is effectively the agent
422            SourceComponentIndex::Proxy(i) => self.component_name(i),
423        }
424    }
425
426    /// Extract the protocol and idealized method/params from a message.
427    ///
428    /// For MCP-over-ACP messages, this extracts the inner MCP message.
429    /// For regular ACP messages, returns the message as-is.
430    fn extract_trace_info<R: sacp::JrRequest, N: sacp::JrNotification>(
431        message: &sacp::MessageCx<R, N>,
432    ) -> Result<(crate::trace::Protocol, String, serde_json::Value), sacp::Error> {
433        match message {
434            sacp::MessageCx::Request(request, _) => {
435                let untyped = request.to_untyped_message()?;
436
437                // Try to parse as MCP-over-ACP request
438                if let Some(Ok(mcp_req)) = <McpOverAcpMessage<UntypedMessage>>::parse_message(
439                    &untyped.method,
440                    &untyped.params,
441                ) {
442                    return Ok((
443                        crate::trace::Protocol::Mcp,
444                        mcp_req.message.method,
445                        mcp_req.message.params,
446                    ));
447                }
448
449                // Regular ACP request
450                Ok((crate::trace::Protocol::Acp, untyped.method, untyped.params))
451            }
452            sacp::MessageCx::Notification(notification) => {
453                let untyped = notification.to_untyped_message()?;
454
455                // Try to parse as MCP-over-ACP notification
456                if let Some(Ok(mcp_notif)) = <McpOverAcpMessage<UntypedMessage>>::parse_message(
457                    &untyped.method,
458                    &untyped.params,
459                ) {
460                    return Ok((
461                        crate::trace::Protocol::Mcp,
462                        mcp_notif.message.method,
463                        mcp_notif.message.params,
464                    ));
465                }
466
467                // Regular ACP notification
468                Ok((crate::trace::Protocol::Acp, untyped.method, untyped.params))
469            }
470        }
471    }
472
473    /// Trace a client-to-agent message (request or notification).
474    fn trace_client_to_agent<R: sacp::JrRequest, N: sacp::JrNotification>(
475        &mut self,
476        target_index: usize,
477        message: &sacp::MessageCx<R, N>,
478    ) -> Result<(), sacp::Error> {
479        if self.trace_writer.is_none() {
480            return Ok(());
481        }
482
483        let from = if target_index == 0 {
484            "client".to_string()
485        } else {
486            self.component_name(target_index - 1)
487        };
488        let to = self.component_name(target_index);
489
490        let (protocol, method, params) = Self::extract_trace_info(message)?;
491
492        let writer = self.trace_writer.as_mut().unwrap();
493        match message.id() {
494            Some(id) => {
495                // Track pending request for response correlation
496                let id_key = id.to_string();
497                self.pending_requests
498                    .insert(id_key, (from.clone(), to.clone()));
499                writer.request(protocol, from, to, id, &method, None, params);
500            }
501            None => {
502                writer.notification(protocol, from, to, &method, None, params);
503            }
504        }
505        Ok(())
506    }
507
508    /// Trace an agent-to-client message (request or notification).
509    fn trace_agent_to_client<R: sacp::JrRequest, N: sacp::JrNotification>(
510        &mut self,
511        source_index: SourceComponentIndex,
512        message: &sacp::MessageCx<R, N>,
513    ) -> Result<(), sacp::Error> {
514        if self.trace_writer.is_none() {
515            return Ok(());
516        }
517
518        let from = self.source_component_name(source_index);
519        let to = match source_index {
520            SourceComponentIndex::Successor => {
521                if self.proxies.is_empty() {
522                    "client".to_string()
523                } else {
524                    self.component_name(self.proxies.len() - 1)
525                }
526            }
527            SourceComponentIndex::Proxy(0) => "client".to_string(),
528            SourceComponentIndex::Proxy(i) => self.component_name(i - 1),
529        };
530
531        let (protocol, method, params) = Self::extract_trace_info(message)?;
532
533        let writer = self.trace_writer.as_mut().unwrap();
534        match message.id() {
535            Some(id) => {
536                // Track pending request for response correlation
537                let id_key = id.to_string();
538                self.pending_requests
539                    .insert(id_key, (from.clone(), to.clone()));
540                writer.request(protocol, from, to, id, &method, None, params);
541            }
542            None => {
543                writer.notification(protocol, from, to, &method, None, params);
544            }
545        }
546        Ok(())
547    }
548
549    /// Trace a response to a previous request.
550    fn trace_response(
551        &mut self,
552        request_cx: &sacp::JrRequestCx<serde_json::Value>,
553        result: &Result<serde_json::Value, sacp::Error>,
554    ) {
555        let Some(writer) = &mut self.trace_writer else {
556            return;
557        };
558
559        let id = request_cx.id();
560        let id_key = id.to_string();
561
562        // Look up the original request's from/to (response goes in reverse)
563        if let Some((original_from, original_to)) = self.pending_requests.remove(&id_key) {
564            let (is_error, payload) = match result {
565                Ok(v) => (false, v.clone()),
566                Err(e) => (true, serde_json::json!({ "error": e.to_string() })),
567            };
568            // Response goes from original_to back to original_from
569            writer.response(&original_to, &original_from, id, is_error, payload);
570        }
571    }
572
573    /// Recursively spawns components and builds the proxy chain.
574    ///
575    /// This function implements the recursive chain building pattern:
576    /// 1. Pop the next component from the `providers` list
577    /// 2. Create the component (either spawn subprocess or use mock)
578    /// 3. Set up JSON-RPC connection and message handlers
579    /// 4. Recursively call itself to spawn the next component
580    /// 5. When no components remain, start the message routing loop via `serve()`
581    ///
582    /// Central message handling logic for the conductor.
583    /// The conductor routes all [`ConductorMessage`] messages through to this function.
584    /// Each message corresponds to a request or notification from one component to another.
585    /// The conductor ferries messages from one place to another, sometimes making modifications along the way.
586    /// Note that *responses to requests* are sent *directly* without going through this loop.
587    ///
588    /// The names we use are
589    ///
590    /// * The *client* is the originator of all ACP traffic, typically an editor or GUI.
591    /// * Then there is a sequence of *components* consisting of:
592    ///     * Zero or more *proxies*, which receive messages and forward them to the next component in the chain.
593    ///     * And finally the *agent*, which is the final component in the chain and handles the actual work.
594    ///
595    /// For the most part, we just pass messages through the chain without modification, but there are a few exceptions:
596    ///
597    /// * We send `InitializeProxyRequest` to proxy components and `InitializeRequest` to the agent component.
598    /// * We modify "session/new" requests that use `acp:...` as the URL for an MCP server to redirect
599    ///   through a stdio server that runs on localhost and bridges messages.
600    async fn handle_conductor_message(
601        &mut self,
602        client: JrConnectionCx<Link>,
603        message: ConductorMessage,
604    ) -> Result<(), sacp::Error> {
605        tracing::debug!(?message, "handle_conductor_message");
606
607        match message {
608            ConductorMessage::ClientToAgent {
609                target_component_index,
610                message,
611            } => {
612                // Tracing happens inside forward_client_to_agent_message, after initialization,
613                // so that component_name() has access to the populated proxies list.
614                self.forward_client_to_agent_message(target_component_index, message, client)
615                    .await
616            }
617
618            ConductorMessage::AgentToClient {
619                source_component_index,
620                message,
621            } => {
622                tracing::debug!(
623                    ?source_component_index,
624                    message_method = ?message.message().method(),
625                    "Conductor: AgentToClient received"
626                );
627                if let Err(e) = self.trace_agent_to_client(source_component_index, &message) {
628                    tracing::warn!("Failed to trace agent-to-client message: {e}");
629                }
630                self.send_message_to_predecessor_of(client, source_component_index, message)
631            }
632
633            // New MCP connection request. Send it back along the chain to get a connection id.
634            // When the connection id arrives, send a message back into this conductor loop with
635            // the connection id and the (as yet unspawned) actor.
636            ConductorMessage::McpConnectionReceived {
637                acp_url,
638                connection,
639                actor,
640            } => {
641                // MCP connection requests always come from the agent
642                // (we must be in agent mode, in fact), so send the MCP request
643                // to the final proxy.
644                self.send_request_to_predecessor_of(
645                    client,
646                    self.proxies.len(),
647                    McpConnectRequest {
648                        acp_url,
649                        meta: None,
650                    },
651                )
652                .on_receiving_result({
653                    let mut conductor_tx = self.conductor_tx.clone();
654                    async move |result| {
655                        match result {
656                            Ok(response) => conductor_tx
657                                .send(ConductorMessage::McpConnectionEstablished {
658                                    response,
659                                    actor,
660                                    connection,
661                                })
662                                .await
663                                .map_err(|_| sacp::Error::internal_error()),
664                            Err(_) => {
665                                // Error occurred, just drop the connection.
666                                Ok(())
667                            }
668                        }
669                    }
670                })
671            }
672
673            // MCP connection successfully established. Spawn the actor
674            // and insert the connection into our map fot future reference.
675            ConductorMessage::McpConnectionEstablished {
676                response: McpConnectResponse { connection_id, .. },
677                actor,
678                connection,
679            } => {
680                self.bridge_connections
681                    .insert(connection_id.clone(), connection);
682                client.spawn(actor.run(connection_id))
683            }
684
685            // Message meant for the MCP client received. Forward it to the appropriate actor's mailbox.
686            ConductorMessage::McpClientToMcpServer {
687                connection_id,
688                message,
689            } => {
690                let wrapped = message.map(
691                    |request, request_cx| {
692                        (
693                            McpOverAcpMessage {
694                                connection_id: connection_id.clone(),
695                                message: request,
696                                meta: None,
697                            },
698                            request_cx,
699                        )
700                    },
701                    |notification| McpOverAcpMessage {
702                        connection_id: connection_id.clone(),
703                        message: notification,
704                        meta: None,
705                    },
706                );
707
708                // We only get MCP-over-ACP requests when we are in bridging MCP for the final agent,
709                // so send them to the final proxy.
710                self.trace_agent_to_client(SourceComponentIndex::Successor, &wrapped)?;
711                self.send_message_to_predecessor_of(
712                    client,
713                    SourceComponentIndex::Successor,
714                    wrapped,
715                )
716            }
717
718            // MCP client disconnected. Remove it from our map and send the
719            // notification backwards along the chain.
720            ConductorMessage::McpConnectionDisconnected { notification } => {
721                // We only get MCP-over-ACP requests when we are in bridging MCP for the final agent.
722
723                self.bridge_connections.remove(&notification.connection_id);
724                self.send_notification_to_predecessor_of(client, self.proxies.len(), notification)
725            }
726
727            // Forward a response back to the original request context.
728            // This ensures responses are processed in order with notifications by
729            // going through the central conductor queue.
730            ConductorMessage::ForwardResponse { request_cx, result } => {
731                self.trace_response(&request_cx, &result);
732                request_cx.respond_with_result(result)
733            }
734        }
735    }
736
737    /// Send a message (request or notification) to the predecessor of the given component.
738    ///
739    /// This is a bit subtle because the relationship of the conductor
740    /// is different depending on who will be receiving the message:
741    /// * If the message is going to the conductor's client, then no changes
742    ///   are needed, as the conductor is sending an agent-to-client message and
743    ///   the conductor is acting as the agent.
744    /// * If the message is going to a proxy component, then we have to wrap
745    ///   it in a "from successor" wrapper, because the conductor is the
746    ///   proxy's client.
747    fn send_message_to_predecessor_of<Req: JrRequest, N: JrNotification>(
748        &mut self,
749        client: JrConnectionCx<Link>,
750        source_component_index: SourceComponentIndex,
751        message: MessageCx<Req, N>,
752    ) -> Result<(), sacp::Error>
753    where
754        Req::Response: Send,
755    {
756        let source_component_index = match source_component_index {
757            SourceComponentIndex::Successor => self.proxies.len(),
758            SourceComponentIndex::Proxy(index) => index,
759        };
760
761        match message {
762            MessageCx::Request(request, request_cx) => self
763                .send_request_to_predecessor_of(client, source_component_index, request)
764                .forward_response_via(&self.conductor_tx, request_cx),
765            MessageCx::Notification(notification) => self.send_notification_to_predecessor_of(
766                client,
767                source_component_index,
768                notification,
769            ),
770        }
771    }
772
773    fn send_request_to_predecessor_of<Req: JrRequest>(
774        &mut self,
775        client: JrConnectionCx<Link>,
776        source_component_index: usize,
777        request: Req,
778    ) -> JrResponse<Req::Response> {
779        if source_component_index == 0 {
780            client.send_request_to(ClientPeer, request)
781        } else {
782            self.proxies[source_component_index - 1].send_request(SuccessorMessage {
783                message: request,
784                meta: None,
785            })
786        }
787    }
788
789    /// Send a notification to the predecessor of the given component.
790    ///
791    /// This is a bit subtle because the relationship of the conductor
792    /// is different depending on who will be receiving the message:
793    /// * If the notification is going to the conductor's client, then no changes
794    ///   are needed, as the conductor is sending an agent-to-client message and
795    ///   the conductor is acting as the agent.
796    /// * If the notification is going to a proxy component, then we have to wrap
797    ///   it in a "from successor" wrapper, because the conductor is the
798    ///   proxy's client.
799    fn send_notification_to_predecessor_of<N: JrNotification>(
800        &mut self,
801        client: JrConnectionCx<Link>,
802        source_component_index: usize,
803        notification: N,
804    ) -> Result<(), sacp::Error> {
805        tracing::debug!(
806            source_component_index,
807            proxies_len = self.proxies.len(),
808            "send_notification_to_predecessor_of"
809        );
810        if source_component_index == 0 {
811            tracing::debug!("Sending notification directly to client");
812            client.send_notification_to(ClientPeer, notification)
813        } else {
814            tracing::debug!(
815                target_proxy = source_component_index - 1,
816                "Sending notification wrapped as SuccessorMessage to proxy"
817            );
818            self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
819                message: notification,
820                meta: None,
821            })
822        }
823    }
824
825    /// Send a message (request or notification) from 'left to right'.
826    /// Left-to-right means from the client or an intermediate proxy to the component
827    /// at `target_component_index` (could be a proxy or the agent).
828    /// Makes changes to select messages along the way (e.g., `initialize` and `session/new`).
829    async fn forward_client_to_agent_message(
830        &mut self,
831        target_component_index: usize,
832        message: MessageCx,
833        conductor_cx: JrConnectionCx<Link>,
834    ) -> Result<(), sacp::Error> {
835        tracing::trace!(
836            target_component_index,
837            ?message,
838            "forward_client_to_agent_message"
839        );
840
841        // Ensure components are initialized before processing any message.
842        let message = self
843            .ensure_initialized(conductor_cx.clone(), message)
844            .await?;
845
846        // Trace after initialization so component_name() has access to the populated proxies list.
847        if let Err(e) = self.trace_client_to_agent(target_component_index, &message) {
848            tracing::warn!("Failed to trace client-to-agent message: {e}");
849        }
850
851        // In proxy mode, if the target is beyond our component chain,
852        // forward to the conductor's own successor (via client connection)
853        if target_component_index < self.proxies.len() {
854            self.forward_message_to_proxy(target_component_index, message)
855                .await
856        } else {
857            assert_eq!(target_component_index, self.proxies.len());
858
859            debug!(
860                target_component_index,
861                proxies_count = self.proxies.len(),
862                "Proxy mode: forwarding successor message to conductor's successor"
863            );
864            let successor = self.successor.clone();
865            successor.send_message(message, conductor_cx, self).await
866        }
867    }
868
869    /// Ensures components are initialized before processing messages.
870    ///
871    /// If components haven't been initialized yet, this expects the first message
872    /// to be an `initialize` request and uses it to spawn the component chain.
873    ///
874    /// Returns:
875    /// - `Ok(Some(message))` - Components are initialized, continue processing this message
876    /// - `Ok(None)` - An error response was sent, caller should return early
877    /// - `Err(_)` - A fatal error occurred
878    async fn ensure_initialized(
879        &mut self,
880        client: JrConnectionCx<Link>,
881        message: MessageCx,
882    ) -> Result<MessageCx, Error> {
883        // Already initialized - pass through
884        let Some(instantiator) = self.instantiator.take() else {
885            return Ok(message);
886        };
887
888        let message = self
889            .link
890            .initialize(message, client, instantiator, self)
891            .await?;
892        Ok(message)
893    }
894
    /// Spawn proxy components and add them to the proxies list.
    ///
    /// Each component is connected over a `ConductorToProxy` link named
    /// `conductor-to-component(<index>)`, and the resulting connection context is
    /// pushed onto `self.proxies` in the same order as `proxy_components`.
    ///
    /// Two message handlers are registered per proxy; both only enqueue a
    /// [`ConductorMessage`] on the conductor's channel:
    /// * `SuccessorMessage`-wrapped messages are unwrapped and enqueued as
    ///   `ClientToAgent`, targeting the next component (`component_index + 1`).
    /// * Messages received as `UntypedMessage` are enqueued as `AgentToClient`,
    ///   tagged with this proxy's index as the source.
    ///
    /// NOTE(review): presumably handlers are tried in registration order, so the
    /// untyped handler only sees messages the first one did not claim — confirm
    /// against the `sacp` builder semantics.
    ///
    /// # Errors
    ///
    /// Returns the error from `connect_to` or `spawn_connection` if either fails.
    /// Proxies spawned before the failure remain in `self.proxies`.
    ///
    /// # Panics
    ///
    /// Panics if `self.proxies` is not empty when called.
    fn spawn_proxies(
        &mut self,
        cx: JrConnectionCx<Link>,
        proxy_components: Vec<sacp::DynComponent<ProxyToConductor>>,
    ) -> Result<(), sacp::Error> {
        // Proxies must only be spawned once.
        assert!(self.proxies.is_empty());

        info!(proxy_count = proxy_components.len(), "spawn_proxies");

        // Spawn each proxy component
        for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
            debug!(component_index, "spawning proxy");

            let proxy_cx = cx.spawn_connection(
                ConductorToProxy::builder()
                    .name(format!("conductor-to-component({})", component_index))
                    // Intercept messages sent by a proxy component to its successor.
                    .on_receive_message(
                        {
                            // Each handler owns its own clone of the channel sender.
                            let mut conductor_tx = self.conductor_tx.clone();
                            async move |message_cx: MessageCx<
                                SuccessorMessage,
                                SuccessorMessage,
                            >,
                                        _cx| {
                                conductor_tx
                                    .send(ConductorMessage::ClientToAgent {
                                        // The successor of proxy `i` is component `i + 1`.
                                        target_component_index: component_index + 1,
                                        // Strip the `SuccessorMessage` envelope from both
                                        // requests and notifications.
                                        message: message_cx
                                            .map(|r, cx| (r.message, cx), |n| n.message),
                                    })
                                    .await
                                    .map_err(sacp::util::internal_error)
                            }
                        },
                        sacp::on_receive_message!(),
                    )
                    // Intercept agent-to-client messages from the proxy.
                    .on_receive_message(
                        {
                            let mut conductor_tx = self.conductor_tx.clone();
                            async move |message_cx: MessageCx<UntypedMessage, UntypedMessage>,
                                        _cx| {
                                conductor_tx
                                    .send(ConductorMessage::AgentToClient {
                                        source_component_index: SourceComponentIndex::Proxy(
                                            component_index,
                                        ),
                                        message: message_cx,
                                    })
                                    .await
                                    .map_err(sacp::util::internal_error)
                            }
                        },
                        sacp::on_receive_message!(),
                    )
                    .connect_to(dyn_component)?,
                |c| Box::pin(c.serve()),
            )?;
            self.proxies.push(proxy_cx);
        }

        info!(proxy_count = self.proxies.len(), "Proxies spawned");

        Ok(())
    }
962
    /// Forward a client-to-agent message to the managed proxy at
    /// `target_component_index`.
    ///
    /// The message is dispatched on its type:
    /// * `InitializeProxyRequest` is rejected with an "invalid request" error
    ///   response; only the conductor itself sends that request.
    /// * `InitializeRequest` is converted into an `InitializeProxyRequest` before
    ///   being sent to the proxy. The proxy's result is relayed back to the
    ///   original requester through the conductor's message channel.
    /// * Anything else is forwarded unchanged.
    ///
    /// Indexes directly into `self.proxies`, so `target_component_index` must be
    /// a valid proxy index.
    async fn forward_message_to_proxy(
        &mut self,
        target_component_index: usize,
        message: MessageCx,
    ) -> Result<(), sacp::Error> {
        tracing::debug!(?message, "forward_message_to_proxy");

        MatchMessage::new(message)
            .if_request(async |_request: InitializeProxyRequest, request_cx| {
                request_cx.respond_with_error(
                    sacp::Error::invalid_request()
                        .data("initialize/proxy requests are only sent by the conductor"),
                )
            })
            .await
            .if_request(async |request: InitializeRequest, request_cx| {
                // The pattern for `Initialize` messages is a bit subtle.
                // Proxies receive incoming `Initialize` messages as if they
                // were a client. The conductor (us) intercepts these and
                // converts them to an `InitializeProxyRequest`.
                //
                // The proxy will then initialize itself and forward an `Initialize`
                // request to its successor.
                self.proxies[target_component_index]
                    .send_request(InitializeProxyRequest::from(request))
                    .on_receiving_result({
                        let conductor_tx = self.conductor_tx.clone();
                        async move |result| {
                            tracing::debug!(?result, "got initialize_proxy response from proxy");
                            // Route the response through the conductor's channel
                            // instead of answering `request_cx` directly.
                            request_cx
                                .respond_with_result_via(&conductor_tx, result)
                                .await
                        }
                    })
            })
            .await
            .otherwise(async |message| {
                // Otherwise, just send the message along "as is".
                self.proxies[target_component_index].send_proxied_message_to_via(
                    AgentPeer,
                    &self.conductor_tx,
                    message,
                )
            })
            .await
    }
1009
    /// Invoked when sending a message from the conductor to the agent that it manages.
    /// This is called by `self.successor`'s [`ConductorSuccessor::send_message`]
    /// method when `Link = ConductorToClient` (i.e., the conductor is not itself
    /// running as a proxy).
    ///
    /// The message is dispatched on its type:
    /// * `InitializeProxyRequest` is rejected with an "invalid request" error response.
    /// * `NewSessionRequest` has each of its MCP servers passed through
    ///   `bridge_listeners.transform_mcp_server` before being sent to the agent.
    /// * `McpOverAcpMessage` requests and notifications are not sent to the agent;
    ///   their inner message is sent to the bridge connection registered under the
    ///   message's `connection_id`.
    /// * Anything else is forwarded to the agent unchanged.
    ///
    /// # Errors
    ///
    /// Returns an internal error if an `McpOverAcpMessage` names a connection id
    /// that is not present in `bridge_connections`, and propagates errors from the
    /// MCP server transformation and from the underlying sends.
    async fn forward_message_to_agent(
        &mut self,
        conductor_cx: JrConnectionCx<ConductorToClient>,
        message: MessageCx,
        agent_cx: JrConnectionCx<ConductorToAgent>,
    ) -> Result<(), Error> {
        MatchMessage::new(message)
            .if_request(async |_request: InitializeProxyRequest, request_cx| {
                request_cx.respond_with_error(
                    sacp::Error::invalid_request()
                        .data("initialize/proxy requests are only sent by the conductor"),
                )
            })
            .await
            .if_request(async |mut request: NewSessionRequest, request_cx| {
                // When forwarding "session/new" to the agent,
                // we adjust MCP servers to manage "acp:" URLs.
                for mcp_server in &mut request.mcp_servers {
                    self.bridge_listeners
                        .transform_mcp_server(
                            conductor_cx.clone(),
                            mcp_server,
                            &self.conductor_tx,
                            &self.mcp_bridge_mode,
                        )
                        .await?;
                }

                // The agent's response is relayed back via the conductor's channel.
                agent_cx
                    .send_request(request)
                    .forward_response_via(&self.conductor_tx, request_cx)
            })
            .await
            .if_request(
                async |request: McpOverAcpMessage<UntypedMessage>, request_cx| {
                    let McpOverAcpMessage {
                        connection_id,
                        message: mcp_request,
                        ..
                    } = request;
                    // NOTE(review): an unknown connection id makes this handler return
                    // `Err` without answering `request_cx` — confirm that is intended.
                    self.bridge_connections
                        .get_mut(&connection_id)
                        .ok_or_else(|| {
                            sacp::util::internal_error(format!(
                                "unknown connection id: {}",
                                connection_id
                            ))
                        })?
                        .send(MessageCx::Request(mcp_request, request_cx))
                        .await
                },
            )
            .await
            .if_notification(async |notification: McpOverAcpMessage<UntypedMessage>| {
                let McpOverAcpMessage {
                    connection_id,
                    message: mcp_notification,
                    ..
                } = notification;
                self.bridge_connections
                    .get_mut(&connection_id)
                    .ok_or_else(|| {
                        sacp::util::internal_error(format!(
                            "unknown connection id: {}",
                            connection_id
                        ))
                    })?
                    .send(MessageCx::Notification(mcp_notification))
                    .await
            })
            .await
            .otherwise(async |message| {
                // Otherwise, just send the message along "as is".
                agent_cx.send_proxied_message_to_via(AgentPeer, &self.conductor_tx, message)
            })
            .await
    }
1091}
1092
/// Identifies the source of an agent-to-client message.
///
/// This enum handles the fact that the conductor may receive messages from two different sources:
/// 1. From one of its managed components (identified by index)
/// 2. From the conductor's own successor in a larger proxy chain (when in proxy mode)
///
/// When routing, `Successor` is treated as sitting one position past the last
/// managed proxy (i.e. at index `proxies.len()`), so its predecessor is the
/// final proxy, or the client when there are no proxies.
#[derive(Debug, Clone, Copy)]
pub enum SourceComponentIndex {
    /// Message from the conductor's agent or successor.
    Successor,

    /// Message from a specific component at the given index in the managed chain.
    /// Index `0` is the component closest to the client.
    Proxy(usize),
}
1106
/// Trait for lazy proxy instantiation (proxy mode).
///
/// Used by conductors in proxy mode (`ConductorToConductor`) where all components
/// are proxies that forward to an outer conductor.
///
/// Implementations are provided for `Vec<T>` (a fixed list of proxies) and for
/// closures that compute the proxies from the request.
pub trait InstantiateProxies: Send {
    /// Instantiate proxy components based on the Initialize request.
    ///
    /// Consumes the boxed instantiator, so it can be invoked at most once.
    ///
    /// Returns an `InitializeRequest` (implementations may hand back `req`
    /// unchanged or a modified version) together with the proxy components,
    /// typed as `DynComponent<ProxyToConductor>` since proxies
    /// communicate with the conductor.
    fn instantiate_proxies(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
    >;
}
1124
1125/// Simple implementation: provide all proxy components unconditionally.
1126///
1127/// Requires `T: Component<ProxyToConductor>`.
1128impl<T> InstantiateProxies for Vec<T>
1129where
1130    T: Component<ProxyToConductor> + 'static,
1131{
1132    fn instantiate_proxies(
1133        self: Box<Self>,
1134        req: InitializeRequest,
1135    ) -> futures::future::BoxFuture<
1136        'static,
1137        Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
1138    > {
1139        Box::pin(async move {
1140            let components: Vec<sacp::DynComponent<ProxyToConductor>> = (*self)
1141                .into_iter()
1142                .map(|c| sacp::DynComponent::new(c))
1143                .collect();
1144            Ok((req, components))
1145        })
1146    }
1147}
1148
1149/// Dynamic implementation: closure receives the Initialize request and returns proxies.
1150impl<F, Fut> InstantiateProxies for F
1151where
1152    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1153    Fut: std::future::Future<
1154            Output = Result<
1155                (InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>),
1156                sacp::Error,
1157            >,
1158        > + Send
1159        + 'static,
1160{
1161    fn instantiate_proxies(
1162        self: Box<Self>,
1163        req: InitializeRequest,
1164    ) -> futures::future::BoxFuture<
1165        'static,
1166        Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
1167    > {
1168        Box::pin(async move { (*self)(req).await })
1169    }
1170}
1171
/// Trait for lazy proxy and agent instantiation (agent mode).
///
/// Used by conductors in agent mode (`ConductorToClient`) where there are
/// zero or more proxies followed by an agent component.
///
/// Implementations are provided for [`AgentOnly`], [`ProxiesAndAgent`], and for
/// closures that compute the components from the request.
pub trait InstantiateProxiesAndAgent: Send {
    /// Instantiate proxy and agent components based on the Initialize request.
    ///
    /// Consumes the boxed instantiator, so it can be invoked at most once.
    ///
    /// Returns the (possibly modified) request, a vector of proxy components
    /// (typed as `DynComponent<ProxyToConductor>`), and the agent component
    /// (typed as `DynComponent<AgentToClient>`).
    fn instantiate_proxies_and_agent(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<
            (
                InitializeRequest,
                Vec<sacp::DynComponent<ProxyToConductor>>,
                sacp::DynComponent<AgentToClient>,
            ),
            sacp::Error,
        >,
    >;
}
1197
1198/// Wrapper to convert a single agent component (no proxies) into InstantiateProxiesAndAgent.
1199pub struct AgentOnly<A>(pub A);
1200
1201impl<A: Component<AgentToClient> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
1202    fn instantiate_proxies_and_agent(
1203        self: Box<Self>,
1204        req: InitializeRequest,
1205    ) -> futures::future::BoxFuture<
1206        'static,
1207        Result<
1208            (
1209                InitializeRequest,
1210                Vec<sacp::DynComponent<ProxyToConductor>>,
1211                sacp::DynComponent<AgentToClient>,
1212            ),
1213            sacp::Error,
1214        >,
1215    > {
1216        Box::pin(async move { Ok((req, Vec::new(), sacp::DynComponent::new(self.0))) })
1217    }
1218}
1219
1220/// Builder for creating proxies and agent components.
1221///
1222/// # Example
1223/// ```ignore
1224/// ProxiesAndAgent::new(ElizaAgent::new())
1225///     .proxy(LoggingProxy::new())
1226///     .proxy(AuthProxy::new())
1227/// ```
1228pub struct ProxiesAndAgent {
1229    proxies: Vec<sacp::DynComponent<ProxyToConductor>>,
1230    agent: sacp::DynComponent<AgentToClient>,
1231}
1232
1233impl ProxiesAndAgent {
1234    /// Create a new builder with the given agent component.
1235    pub fn new(agent: impl Component<AgentToClient> + 'static) -> Self {
1236        Self {
1237            proxies: vec![],
1238            agent: sacp::DynComponent::new(agent),
1239        }
1240    }
1241
1242    /// Add a single proxy component.
1243    pub fn proxy(mut self, proxy: impl Component<ProxyToConductor> + 'static) -> Self {
1244        self.proxies.push(sacp::DynComponent::new(proxy));
1245        self
1246    }
1247
1248    /// Add multiple proxy components.
1249    pub fn proxies<P, I>(mut self, proxies: I) -> Self
1250    where
1251        P: Component<ProxyToConductor> + 'static,
1252        I: IntoIterator<Item = P>,
1253    {
1254        self.proxies
1255            .extend(proxies.into_iter().map(sacp::DynComponent::new));
1256        self
1257    }
1258}
1259
1260impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1261    fn instantiate_proxies_and_agent(
1262        self: Box<Self>,
1263        req: InitializeRequest,
1264    ) -> futures::future::BoxFuture<
1265        'static,
1266        Result<
1267            (
1268                InitializeRequest,
1269                Vec<sacp::DynComponent<ProxyToConductor>>,
1270                sacp::DynComponent<AgentToClient>,
1271            ),
1272            sacp::Error,
1273        >,
1274    > {
1275        Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1276    }
1277}
1278
1279/// Dynamic implementation: closure receives the Initialize request and returns proxies + agent.
1280impl<F, Fut> InstantiateProxiesAndAgent for F
1281where
1282    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1283    Fut: std::future::Future<
1284            Output = Result<
1285                (
1286                    InitializeRequest,
1287                    Vec<sacp::DynComponent<ProxyToConductor>>,
1288                    sacp::DynComponent<AgentToClient>,
1289                ),
1290                sacp::Error,
1291            >,
1292        > + Send
1293        + 'static,
1294{
1295    fn instantiate_proxies_and_agent(
1296        self: Box<Self>,
1297        req: InitializeRequest,
1298    ) -> futures::future::BoxFuture<
1299        'static,
1300        Result<
1301            (
1302                InitializeRequest,
1303                Vec<sacp::DynComponent<ProxyToConductor>>,
1304                sacp::DynComponent<AgentToClient>,
1305            ),
1306            sacp::Error,
1307        >,
1308    > {
1309        Box::pin(async move { (*self)(req).await })
1310    }
1311}
1312
/// Messages sent to the conductor's main event loop for routing.
///
/// These messages enable the conductor to route communication between:
/// - The editor and the first component
/// - Components and their successors in the chain
/// - Components and their clients (editor or predecessor)
///
/// All spawned tasks send messages via this enum through a shared channel,
/// allowing centralized routing logic in the `serve()` loop.
#[derive(Debug)]
pub enum ConductorMessage {
    /// A message (request or notification) targeting a component from its client.
    /// This message will be forwarded "as is" to the component.
    ClientToAgent {
        /// Index of the receiving component. An index equal to the number of
        /// managed proxies designates the conductor's successor.
        target_component_index: usize,
        /// The message to deliver.
        message: MessageCx,
    },

    /// A message (request or notification) sent by a component to its client.
    /// This message will be forwarded "as is" to its client.
    AgentToClient {
        /// Which component (or the successor) sent the message.
        source_component_index: SourceComponentIndex,
        /// The message to deliver.
        message: MessageCx,
    },

    /// A pending MCP bridge connection request.
    /// The request must be sent back over ACP to receive the connection-id.
    /// Once the connection-id is received, the actor must be spawned.
    McpConnectionReceived {
        /// The acp:$UUID URL identifying this bridge
        acp_url: String,

        /// The actor that should be spawned once the connection-id is available.
        actor: McpBridgeConnectionActor,

        /// The connection to the bridge
        connection: McpBridgeConnection,
    },

    /// An MCP bridge connection for which the connect response has arrived.
    ///
    /// NOTE(review): presumably this follows `McpConnectionReceived` once the ACP
    /// connect request has been answered, `response` carries the connection-id,
    /// and the actor is spawned when this is handled — confirm against the
    /// `serve()` loop.
    McpConnectionEstablished {
        /// The response received for the connect request.
        response: McpConnectResponse,

        /// The actor that should be spawned once the connection-id is available.
        actor: McpBridgeConnectionActor,

        /// The connection to the bridge
        connection: McpBridgeConnection,
    },

    /// MCP message (request or notification) received from a bridge that needs to be routed to the final proxy.
    ///
    /// Sent when the bridge receives an MCP tool call from the agent and forwards it
    /// to the conductor via TCP. The conductor routes this to the appropriate proxy component.
    McpClientToMcpServer {
        /// Identifies the bridge connection the message arrived on.
        connection_id: String,
        /// The MCP message to route.
        message: MessageCx,
    },

    /// Message sent when MCP client disconnects
    McpConnectionDisconnected {
        /// The disconnect notification to process.
        notification: McpDisconnectNotification,
    },

    /// Forward a response back to a request context.
    ///
    /// This variant avoids a subtle race condition by preserving the
    /// order of responses vis-a-vis notifications and requests. Whenever a new message
    /// from a component arrives, whether it's a new request or a notification, we route
    /// it through the conductor's central message queue.
    ///
    /// The invariant we must ensure in particular is that any requests or notifications
    /// that arrive BEFORE the response will be processed first.
    ForwardResponse {
        /// The (type-erased) request context awaiting the response.
        request_cx: JrRequestCx<serde_json::Value>,
        /// The JSON-encoded outcome to deliver to `request_cx`.
        result: Result<serde_json::Value, sacp::Error>,
    },
}
1393
1394trait JrConnectionCxExt<Link: JrLink> {
1395    fn send_proxied_message_to_via<Peer: JrPeer>(
1396        &self,
1397        peer: Peer,
1398        conductor_tx: &mpsc::Sender<ConductorMessage>,
1399        message: MessageCx,
1400    ) -> Result<(), sacp::Error>
1401    where
1402        Link: sacp::HasPeer<Peer>;
1403}
1404
1405impl<Link: JrLink> JrConnectionCxExt<Link> for JrConnectionCx<Link> {
1406    fn send_proxied_message_to_via<Peer: JrPeer>(
1407        &self,
1408        peer: Peer,
1409        conductor_tx: &mpsc::Sender<ConductorMessage>,
1410        message: MessageCx,
1411    ) -> Result<(), sacp::Error>
1412    where
1413        Link: sacp::HasPeer<Peer>,
1414    {
1415        match message {
1416            MessageCx::Request(request, request_cx) => self
1417                .send_request_to(peer, request)
1418                .forward_response_via(conductor_tx, request_cx),
1419            MessageCx::Notification(notification) => self.send_notification_to(peer, notification),
1420        }
1421    }
1422}
1423
1424trait JrRequestCxExt<T: JrResponsePayload> {
1425    async fn respond_with_result_via(
1426        self,
1427        conductor_tx: &mpsc::Sender<ConductorMessage>,
1428        result: Result<T, sacp::Error>,
1429    ) -> Result<(), sacp::Error>;
1430}
1431
1432impl<T: JrResponsePayload> JrRequestCxExt<T> for JrRequestCx<T> {
1433    async fn respond_with_result_via(
1434        self,
1435        conductor_tx: &mpsc::Sender<ConductorMessage>,
1436        result: Result<T, sacp::Error>,
1437    ) -> Result<(), sacp::Error> {
1438        let result = result.and_then(|response| response.into_json(self.method()));
1439        conductor_tx
1440            .clone()
1441            .send(ConductorMessage::ForwardResponse {
1442                request_cx: self.erase_to_json(),
1443                result,
1444            })
1445            .await
1446            .map_err(|e| sacp::util::internal_error(format!("Failed to send response: {}", e)))
1447    }
1448}
1449
1450pub trait JrResponseExt<T: JrResponsePayload> {
1451    fn forward_response_via(
1452        self,
1453        conductor_tx: &mpsc::Sender<ConductorMessage>,
1454        request_cx: JrRequestCx<T>,
1455    ) -> Result<(), sacp::Error>;
1456}
1457
1458impl<T: JrResponsePayload> JrResponseExt<T> for JrResponse<T> {
1459    fn forward_response_via(
1460        self,
1461        conductor_tx: &mpsc::Sender<ConductorMessage>,
1462        request_cx: JrRequestCx<T>,
1463    ) -> Result<(), sacp::Error> {
1464        let conductor_tx = conductor_tx.clone();
1465        self.on_receiving_result(async move |result| {
1466            request_cx
1467                .respond_with_result_via(&conductor_tx, result)
1468                .await
1469        })
1470    }
1471}
1472
/// Trait implemented for the two links the conductor can use:
///
/// * ConductorToClient -- conductor is acting as an agent, so when its last proxy sends to its successor, the conductor sends that message to its agent component
/// * ConductorToConductor -- conductor is acting as a proxy, so when its last proxy sends to its successor, the (inner) conductor sends that message to its successor, via the outer conductor
pub trait ConductorLink: JrLink + HasPeer<ClientPeer> {
    /// The link describing the role the conductor plays in this mode
    /// (e.g. `AgentToClient` for `ConductorToClient`). It must connect to the
    /// same counterpart as `Self`.
    type Speaks: JrLink<ConnectsTo = Self::ConnectsTo>;

    /// The type used to instantiate components for this link type.
    type Instantiator: Send;

    /// Handle initialization: parse the init request, instantiate components, and spawn them.
    ///
    /// Takes ownership of the instantiator and returns the (possibly modified) init request
    /// wrapped in a MessageCx for forwarding.
    ///
    /// `cx` is the conductor's connection on this link, and `responder` is the
    /// conductor state that the spawned components are recorded in.
    fn initialize(
        self,
        message: MessageCx,
        cx: JrConnectionCx<Self>,
        instantiator: Self::Instantiator,
        responder: &mut ConductorResponder<Self>,
    ) -> impl Future<Output = Result<MessageCx, sacp::Error>> + Send;

    /// Handle an incoming message from the client or conductor, depending on `Self`
    ///
    /// Implementations enqueue routing work on `conductor_tx`; the returned
    /// `Handled` value reports whether the message was consumed.
    fn handle_message(
        self,
        message: MessageCx,
        cx: JrConnectionCx<Self>,
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
    ) -> impl Future<Output = Result<Handled<MessageCx>, sacp::Error>> + Send;
}
1503
1504impl ConductorLink for ConductorToClient {
1505    /// In this mode, the conductor acts as an agent talking to a client.
1506    type Speaks = AgentToClient;
1507
1508    type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
1509
1510    async fn initialize(
1511        self,
1512        message: MessageCx,
1513        client: JrConnectionCx<Self>,
1514        instantiator: Self::Instantiator,
1515        responder: &mut ConductorResponder<Self>,
1516    ) -> Result<MessageCx, sacp::Error> {
1517        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1518
1519        // Not yet initialized - expect an initialize request.
1520        // Error if we get anything else.
1521        let MessageCx::Request(request, request_cx) = message else {
1522            message.respond_with_error(invalid_request(), client.clone())?;
1523            return Err(invalid_request());
1524        };
1525        let Some(result) = InitializeRequest::parse_message(request.method(), request.params())
1526        else {
1527            request_cx.respond_with_error(invalid_request())?;
1528            return Err(invalid_request());
1529        };
1530
1531        let init_request = match result {
1532            Ok(r) => r,
1533            Err(error) => {
1534                request_cx.respond_with_error(error)?;
1535                return Err(invalid_request());
1536            }
1537        };
1538
1539        // Instantiate proxies and agent
1540        let (modified_req, proxy_components, agent_component) = instantiator
1541            .instantiate_proxies_and_agent(init_request)
1542            .await?;
1543
1544        // Spawn the agent component
1545        debug!(?agent_component, "spawning agent");
1546        let agent_cx = client.spawn_connection(
1547            ConductorToAgent::builder()
1548                .name("conductor-to-agent")
1549                // Intercept agent-to-client messages from the agent.
1550                .on_receive_message(
1551                    {
1552                        let mut conductor_tx = responder.conductor_tx.clone();
1553                        async move |message_cx: MessageCx, _cx| {
1554                            conductor_tx
1555                                .send(ConductorMessage::AgentToClient {
1556                                    source_component_index: SourceComponentIndex::Successor,
1557                                    message: message_cx,
1558                                })
1559                                .await
1560                                .map_err(sacp::util::internal_error)
1561                        }
1562                    },
1563                    sacp::on_receive_message!(),
1564                )
1565                .connect_to(agent_component)?,
1566            |c| Box::pin(c.serve()),
1567        )?;
1568        responder.successor = Arc::new(agent_cx);
1569
1570        // Spawn the proxy components
1571        responder.spawn_proxies(client.clone(), proxy_components)?;
1572
1573        Ok(MessageCx::Request(
1574            modified_req.to_untyped_message()?,
1575            request_cx,
1576        ))
1577    }
1578
1579    async fn handle_message(
1580        self,
1581        message: MessageCx,
1582        cx: JrConnectionCx<Self>,
1583        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1584    ) -> Result<Handled<MessageCx>, sacp::Error> {
1585        tracing::debug!(
1586            method = ?message.message().method(),
1587            "ConductorToClient::handle_message"
1588        );
1589        MatchMessageFrom::new(message, &cx)
1590            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1591            .if_message_from(ClientPeer, async move |message: MessageCx| {
1592                tracing::debug!(
1593                    method = ?message.message().method(),
1594                    "ConductorToClient::handle_message - matched Client"
1595                );
1596                Conductor::<Self>::incoming_message_from_client(conductor_tx, message).await
1597            })
1598            .await
1599            .done()
1600    }
1601}
1602
1603impl ConductorLink for ConductorToConductor {
1604    /// In this mode, the conductor acts as a proxy talking to an (outer) conductor..
1605    type Speaks = ProxyToConductor;
1606
1607    type Instantiator = Box<dyn InstantiateProxies>;
1608
1609    async fn initialize(
1610        self,
1611        message: MessageCx,
1612        client_cx: JrConnectionCx<Self>,
1613        instantiator: Self::Instantiator,
1614        responder: &mut ConductorResponder<Self>,
1615    ) -> Result<MessageCx, sacp::Error> {
1616        let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1617
1618        // Not yet initialized - expect an InitializeProxy request.
1619        // Error if we get anything else.
1620        let MessageCx::Request(request, request_cx) = message else {
1621            message.respond_with_error(invalid_request(), client_cx.clone())?;
1622            return Err(invalid_request());
1623        };
1624        let Some(result) =
1625            InitializeProxyRequest::parse_message(request.method(), request.params())
1626        else {
1627            request_cx.respond_with_error(invalid_request())?;
1628            return Err(invalid_request());
1629        };
1630
1631        let InitializeProxyRequest { initialize } = match result {
1632            Ok(r) => r,
1633            Err(error) => {
1634                request_cx.respond_with_error(error)?;
1635                return Err(invalid_request());
1636            }
1637        };
1638
1639        tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1640
1641        // Instantiate proxies (no agent in proxy mode)
1642        let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1643
1644        // In proxy mode, our successor is the outer conductor (via our client connection)
1645        responder.successor = Arc::new(());
1646
1647        // Spawn the proxy components
1648        responder.spawn_proxies(client_cx.clone(), proxy_components)?;
1649
1650        Ok(MessageCx::Request(
1651            modified_req.to_untyped_message()?,
1652            request_cx,
1653        ))
1654    }
1655
1656    async fn handle_message(
1657        self,
1658        message: MessageCx,
1659        cx: JrConnectionCx<Self>,
1660        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1661    ) -> Result<Handled<MessageCx>, sacp::Error> {
1662        tracing::debug!(
1663            method = ?message.message().method(),
1664            "ConductorToConductor::handle_message"
1665        );
1666        MatchMessageFrom::new(message, &cx)
1667            .if_message_from(AgentPeer, {
1668                // Messages from our successor arrive already unwrapped
1669                // (RemoteRoleStyle::Successor strips the SuccessorMessage envelope).
1670                async |message: MessageCx| {
1671                    tracing::debug!(
1672                        method = ?message.message().method(),
1673                        "ConductorToConductor::handle_message - matched Agent"
1674                    );
1675                    let mut conductor_tx = conductor_tx.clone();
1676                    Conductor::<Self>::incoming_message_from_agent(&mut conductor_tx, message).await
1677                }
1678            })
1679            .await
1680            // Any incoming messages from the client are client-to-agent messages targeting the first component.
1681            .if_message_from(ClientPeer, async |message: MessageCx| {
1682                tracing::debug!(
1683                    method = ?message.message().method(),
1684                    "ConductorToConductor::handle_message - matched Client"
1685                );
1686                let mut conductor_tx = conductor_tx.clone();
1687                Conductor::<Self>::incoming_message_from_client(&mut conductor_tx, message).await
1688            })
1689            .await
1690            .done()
1691    }
1692}
1693
1694pub trait ConductorSuccessor<Link: ConductorLink>: Send + Sync + 'static {
1695    fn send_message<'a>(
1696        &self,
1697        message: MessageCx,
1698        conductor_cx: JrConnectionCx<Link>,
1699        responder: &'a mut ConductorResponder<Link>,
1700    ) -> BoxFuture<'a, Result<(), sacp::Error>>;
1701}
1702
1703impl<Link: ConductorLink> ConductorSuccessor<Link> for sacp::Error {
1704    fn send_message<'a>(
1705        &self,
1706        #[expect(unused_variables)] message: MessageCx,
1707        #[expect(unused_variables)] conductor_cx: JrConnectionCx<Link>,
1708        #[expect(unused_variables)] responder: &'a mut ConductorResponder<Link>,
1709    ) -> BoxFuture<'a, Result<(), sacp::Error>> {
1710        let error = self.clone();
1711        Box::pin(std::future::ready(Err(error)))
1712    }
1713}
1714
1715impl ConductorSuccessor<ConductorToConductor> for () {
1716    fn send_message<'a>(
1717        &self,
1718        message: MessageCx,
1719        conductor_cx: JrConnectionCx<ConductorToConductor>,
1720        responder: &'a mut ConductorResponder<ConductorToConductor>,
1721    ) -> BoxFuture<'a, Result<(), sacp::Error>> {
1722        Box::pin(async move {
1723            debug!("Proxy mode: forwarding successor message to conductor's successor");
1724            conductor_cx.send_proxied_message_to_via(
1725                AgentPeer,
1726                &mut responder.conductor_tx,
1727                message,
1728            )
1729        })
1730    }
1731}
1732
1733impl ConductorSuccessor<ConductorToClient> for JrConnectionCx<ConductorToAgent> {
1734    fn send_message<'a>(
1735        &self,
1736        message: MessageCx,
1737        conductor_cx: JrConnectionCx<ConductorToClient>,
1738        responder: &'a mut ConductorResponder<ConductorToClient>,
1739    ) -> BoxFuture<'a, Result<(), sacp::Error>> {
1740        let agent_cx = self.clone();
1741        Box::pin(async move {
1742            debug!("Proxy mode: forwarding successor message to conductor's successor");
1743            responder
1744                .forward_message_to_agent(conductor_cx, message, agent_cx)
1745                .await
1746        })
1747    }
1748}