1use std::{collections::HashMap, sync::Arc};
114
115use futures::{
116 SinkExt, StreamExt,
117 channel::mpsc::{self},
118};
119use sacp::{
120 AgentPeer, BoxFuture, ClientPeer, Component, Error, HasPeer, JrMessage,
121 link::{
122 AgentToClient, ConductorToAgent, ConductorToClient, ConductorToConductor, ConductorToProxy,
123 ProxyToConductor,
124 },
125 util::MatchMessage,
126};
127use sacp::{
128 Handled,
129 schema::{
130 McpConnectRequest, McpConnectResponse, McpDisconnectNotification, McpOverAcpMessage,
131 SuccessorMessage,
132 },
133};
134use sacp::{
135 JrConnectionBuilder, JrConnectionCx, JrLink, JrNotification, JrPeer, JrRequest, JrRequestCx,
136 JrResponse, MessageCx, UntypedMessage,
137};
138use sacp::{
139 JrMessageHandler, JrResponder, JrResponsePayload,
140 schema::{InitializeProxyRequest, InitializeRequest, NewSessionRequest},
141 util::MatchMessageFrom,
142};
143use tracing::{debug, info};
144
145use crate::conductor::mcp_bridge::{
146 McpBridgeConnection, McpBridgeConnectionActor, McpBridgeListeners,
147};
148
149mod mcp_bridge;
150
/// Configuration for a conductor, generic over the [`ConductorLink`] it serves.
///
/// The value is consumed by `into_connection_builder`, which splits it into a
/// message handler and a responder that share one internal channel.
pub struct Conductor<Link: ConductorLink> {
    /// Name given to the connection builder.
    name: String,
    /// Link-specific instantiator; handed to the responder when the connection is built.
    instantiator: Link::Instantiator,
    /// MCP bridge mode, passed through to the responder.
    mcp_bridge_mode: crate::McpBridgeMode,
    /// Optional trace writer; `None` unless one of the tracing builder methods was called.
    trace_writer: Option<crate::trace::TraceWriter>,
    /// The link value, given to both the message handler and the responder.
    link: Link,
}
163
164impl<Link: ConductorLink> Conductor<Link> {
165 pub fn new(
166 link: Link,
167 name: impl ToString,
168 instantiator: Link::Instantiator,
169 mcp_bridge_mode: crate::McpBridgeMode,
170 ) -> Self {
171 Conductor {
172 name: name.to_string(),
173 instantiator,
174 mcp_bridge_mode,
175 trace_writer: None,
176 link,
177 }
178 }
179}
180
181impl Conductor<ConductorToClient> {
182 pub fn new_agent(
184 name: impl ToString,
185 instantiator: impl InstantiateProxiesAndAgent + 'static,
186 mcp_bridge_mode: crate::McpBridgeMode,
187 ) -> Self {
188 Conductor::new(
189 ConductorToClient,
190 name,
191 Box::new(instantiator),
192 mcp_bridge_mode,
193 )
194 }
195}
196
197impl Conductor<ConductorToConductor> {
198 pub fn new_proxy(
200 name: impl ToString,
201 instantiator: impl InstantiateProxies + 'static,
202 mcp_bridge_mode: crate::McpBridgeMode,
203 ) -> Self {
204 Conductor::new(
205 ConductorToConductor,
206 name,
207 Box::new(instantiator),
208 mcp_bridge_mode,
209 )
210 }
211}
212
213impl<Link: ConductorLink> Conductor<Link> {
214 pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
218 self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
219 self
220 }
221
222 pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
227 self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
228 Ok(self)
229 }
230
231 pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
233 self.trace_writer = Some(writer);
234 self
235 }
236
237 pub fn into_connection_builder(
238 self,
239 ) -> JrConnectionBuilder<impl JrMessageHandler<Link = Link>, impl JrResponder<Link>> {
240 let (conductor_tx, conductor_rx) = mpsc::channel(128 );
241
242 let responder = ConductorResponder {
243 conductor_rx,
244 conductor_tx: conductor_tx.clone(),
245 instantiator: Some(self.instantiator),
246 bridge_listeners: Default::default(),
247 bridge_connections: Default::default(),
248 mcp_bridge_mode: self.mcp_bridge_mode,
249 proxies: Default::default(),
250 successor: Arc::new(sacp::util::internal_error("successor not initialized")),
251 trace_writer: self.trace_writer,
252 pending_requests: Default::default(),
253 link: self.link,
254 };
255
256 JrConnectionBuilder::new_with(ConductorMessageHandler {
257 conductor_tx,
258 link: self.link,
259 })
260 .name(self.name)
261 .with_responder(responder)
262 }
263
264 pub async fn run(
274 self,
275 transport: impl Component<Link::ConnectsTo> + 'static,
276 ) -> Result<(), sacp::Error> {
277 self.into_connection_builder()
278 .connect_to(transport)?
279 .serve()
280 .await
281 }
282
283 async fn incoming_message_from_client(
284 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
285 message: MessageCx,
286 ) -> Result<(), sacp::Error> {
287 conductor_tx
288 .send(ConductorMessage::ClientToAgent {
289 target_component_index: 0,
290 message,
291 })
292 .await
293 .map_err(sacp::util::internal_error)
294 }
295
296 async fn incoming_message_from_agent(
297 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
298 message: MessageCx,
299 ) -> Result<(), sacp::Error> {
300 conductor_tx
301 .send(ConductorMessage::AgentToClient {
302 source_component_index: SourceComponentIndex::Successor,
303 message,
304 })
305 .await
306 .map_err(sacp::util::internal_error)
307 }
308}
309
310impl<Link: ConductorLink> Component<Link::Speaks> for Conductor<Link> {
311 async fn serve(
312 self,
313 client: impl sacp::Component<Link::ConnectsTo>,
314 ) -> Result<(), sacp::Error> {
315 self.run(client).await
316 }
317}
318
/// Message handler installed on the conductor's connection.
///
/// It holds no routing state of its own: each incoming message is passed to
/// the link's `handle_message` together with the channel sender.
struct ConductorMessageHandler<Link: ConductorLink> {
    /// Sending half of the conductor's internal channel.
    conductor_tx: mpsc::Sender<ConductorMessage>,
    /// Link whose `handle_message` decides what to do with each message.
    link: Link,
}
323
324impl<Link: ConductorLink> JrMessageHandler for ConductorMessageHandler<Link> {
325 type Link = Link;
326
327 async fn handle_message(
328 &mut self,
329 message: MessageCx,
330 cx: sacp::JrConnectionCx<Link>,
331 ) -> Result<sacp::Handled<MessageCx>, sacp::Error> {
332 self.link
333 .handle_message(message, cx, &mut self.conductor_tx)
334 .await
335 }
336
337 fn describe_chain(&self) -> impl std::fmt::Debug {
338 "ConductorMessageHandler"
339 }
340}
341
/// Owns the conductor's mutable state and processes every [`ConductorMessage`]
/// taken from the internal channel.
pub struct ConductorResponder<Link>
where
    Link: ConductorLink,
{
    /// Receiving half of the internal channel; drained by `run`.
    conductor_rx: mpsc::Receiver<ConductorMessage>,

    /// Sending half of the internal channel; cloned into callbacks that need
    /// to post messages back to the responder.
    conductor_tx: mpsc::Sender<ConductorMessage>,

    /// MCP bridge listeners, used to transform MCP servers in new-session requests.
    bridge_listeners: McpBridgeListeners,

    /// Established MCP bridge connections, keyed by connection id.
    bridge_connections: HashMap<String, McpBridgeConnection>,

    /// Instantiator for the chain; taken (leaving `None`) on first use.
    instantiator: Option<Link::Instantiator>,

    /// Connection contexts of the spawned proxies, in spawn order.
    proxies: Vec<JrConnectionCx<ConductorToProxy>>,

    /// Destination for messages addressed past the last proxy. Starts as an
    /// error placeholder until initialization installs the real successor.
    successor: Arc<dyn ConductorSuccessor<Link>>,

    /// MCP bridge mode handed to the bridge listeners.
    mcp_bridge_mode: crate::McpBridgeMode,

    /// Optional trace writer; tracing is skipped when `None`.
    trace_writer: Option<crate::trace::TraceWriter>,

    /// Traced requests awaiting a response: request id (as string) mapped to
    /// the `(from, to)` component names recorded for the request.
    pending_requests: HashMap<String, (String, String)>,

    /// The link value, used to run link-specific initialization.
    link: Link,
}
387
388impl<Link> JrResponder<Link> for ConductorResponder<Link>
389where
390 Link: ConductorLink,
391{
392 async fn run(mut self, cx: JrConnectionCx<Link>) -> Result<(), sacp::Error> {
393 while let Some(message) = self.conductor_rx.next().await {
399 self.handle_conductor_message(cx.clone(), message).await?;
400 }
401 Ok(())
402 }
403}
404
405impl<Link> ConductorResponder<Link>
406where
407 Link: ConductorLink,
408{
409 fn component_name(&self, index: usize) -> String {
411 if index == self.proxies.len() {
412 "agent".to_string()
413 } else {
414 format!("proxy:{}", index)
415 }
416 }
417
418 fn source_component_name(&self, index: SourceComponentIndex) -> String {
420 match index {
421 SourceComponentIndex::Successor => "agent".to_string(), SourceComponentIndex::Proxy(i) => self.component_name(i),
423 }
424 }
425
426 fn extract_trace_info<R: sacp::JrRequest, N: sacp::JrNotification>(
431 message: &sacp::MessageCx<R, N>,
432 ) -> Result<(crate::trace::Protocol, String, serde_json::Value), sacp::Error> {
433 match message {
434 sacp::MessageCx::Request(request, _) => {
435 let untyped = request.to_untyped_message()?;
436
437 if let Some(Ok(mcp_req)) = <McpOverAcpMessage<UntypedMessage>>::parse_message(
439 &untyped.method,
440 &untyped.params,
441 ) {
442 return Ok((
443 crate::trace::Protocol::Mcp,
444 mcp_req.message.method,
445 mcp_req.message.params,
446 ));
447 }
448
449 Ok((crate::trace::Protocol::Acp, untyped.method, untyped.params))
451 }
452 sacp::MessageCx::Notification(notification) => {
453 let untyped = notification.to_untyped_message()?;
454
455 if let Some(Ok(mcp_notif)) = <McpOverAcpMessage<UntypedMessage>>::parse_message(
457 &untyped.method,
458 &untyped.params,
459 ) {
460 return Ok((
461 crate::trace::Protocol::Mcp,
462 mcp_notif.message.method,
463 mcp_notif.message.params,
464 ));
465 }
466
467 Ok((crate::trace::Protocol::Acp, untyped.method, untyped.params))
469 }
470 }
471 }
472
473 fn trace_client_to_agent<R: sacp::JrRequest, N: sacp::JrNotification>(
475 &mut self,
476 target_index: usize,
477 message: &sacp::MessageCx<R, N>,
478 ) -> Result<(), sacp::Error> {
479 if self.trace_writer.is_none() {
480 return Ok(());
481 }
482
483 let from = if target_index == 0 {
484 "client".to_string()
485 } else {
486 self.component_name(target_index - 1)
487 };
488 let to = self.component_name(target_index);
489
490 let (protocol, method, params) = Self::extract_trace_info(message)?;
491
492 let writer = self.trace_writer.as_mut().unwrap();
493 match message.id() {
494 Some(id) => {
495 let id_key = id.to_string();
497 self.pending_requests
498 .insert(id_key, (from.clone(), to.clone()));
499 writer.request(protocol, from, to, id, &method, None, params);
500 }
501 None => {
502 writer.notification(protocol, from, to, &method, None, params);
503 }
504 }
505 Ok(())
506 }
507
508 fn trace_agent_to_client<R: sacp::JrRequest, N: sacp::JrNotification>(
510 &mut self,
511 source_index: SourceComponentIndex,
512 message: &sacp::MessageCx<R, N>,
513 ) -> Result<(), sacp::Error> {
514 if self.trace_writer.is_none() {
515 return Ok(());
516 }
517
518 let from = self.source_component_name(source_index);
519 let to = match source_index {
520 SourceComponentIndex::Successor => {
521 if self.proxies.is_empty() {
522 "client".to_string()
523 } else {
524 self.component_name(self.proxies.len() - 1)
525 }
526 }
527 SourceComponentIndex::Proxy(0) => "client".to_string(),
528 SourceComponentIndex::Proxy(i) => self.component_name(i - 1),
529 };
530
531 let (protocol, method, params) = Self::extract_trace_info(message)?;
532
533 let writer = self.trace_writer.as_mut().unwrap();
534 match message.id() {
535 Some(id) => {
536 let id_key = id.to_string();
538 self.pending_requests
539 .insert(id_key, (from.clone(), to.clone()));
540 writer.request(protocol, from, to, id, &method, None, params);
541 }
542 None => {
543 writer.notification(protocol, from, to, &method, None, params);
544 }
545 }
546 Ok(())
547 }
548
549 fn trace_response(
551 &mut self,
552 request_cx: &sacp::JrRequestCx<serde_json::Value>,
553 result: &Result<serde_json::Value, sacp::Error>,
554 ) {
555 let Some(writer) = &mut self.trace_writer else {
556 return;
557 };
558
559 let id = request_cx.id();
560 let id_key = id.to_string();
561
562 if let Some((original_from, original_to)) = self.pending_requests.remove(&id_key) {
564 let (is_error, payload) = match result {
565 Ok(v) => (false, v.clone()),
566 Err(e) => (true, serde_json::json!({ "error": e.to_string() })),
567 };
568 writer.response(&original_to, &original_from, id, is_error, payload);
570 }
571 }
572
573 async fn handle_conductor_message(
601 &mut self,
602 client: JrConnectionCx<Link>,
603 message: ConductorMessage,
604 ) -> Result<(), sacp::Error> {
605 tracing::debug!(?message, "handle_conductor_message");
606
607 match message {
608 ConductorMessage::ClientToAgent {
609 target_component_index,
610 message,
611 } => {
612 self.forward_client_to_agent_message(target_component_index, message, client)
615 .await
616 }
617
618 ConductorMessage::AgentToClient {
619 source_component_index,
620 message,
621 } => {
622 tracing::debug!(
623 ?source_component_index,
624 message_method = ?message.message().method(),
625 "Conductor: AgentToClient received"
626 );
627 if let Err(e) = self.trace_agent_to_client(source_component_index, &message) {
628 tracing::warn!("Failed to trace agent-to-client message: {e}");
629 }
630 self.send_message_to_predecessor_of(client, source_component_index, message)
631 }
632
633 ConductorMessage::McpConnectionReceived {
637 acp_url,
638 connection,
639 actor,
640 } => {
641 self.send_request_to_predecessor_of(
645 client,
646 self.proxies.len(),
647 McpConnectRequest {
648 acp_url,
649 meta: None,
650 },
651 )
652 .on_receiving_result({
653 let mut conductor_tx = self.conductor_tx.clone();
654 async move |result| {
655 match result {
656 Ok(response) => conductor_tx
657 .send(ConductorMessage::McpConnectionEstablished {
658 response,
659 actor,
660 connection,
661 })
662 .await
663 .map_err(|_| sacp::Error::internal_error()),
664 Err(_) => {
665 Ok(())
667 }
668 }
669 }
670 })
671 }
672
673 ConductorMessage::McpConnectionEstablished {
676 response: McpConnectResponse { connection_id, .. },
677 actor,
678 connection,
679 } => {
680 self.bridge_connections
681 .insert(connection_id.clone(), connection);
682 client.spawn(actor.run(connection_id))
683 }
684
685 ConductorMessage::McpClientToMcpServer {
687 connection_id,
688 message,
689 } => {
690 let wrapped = message.map(
691 |request, request_cx| {
692 (
693 McpOverAcpMessage {
694 connection_id: connection_id.clone(),
695 message: request,
696 meta: None,
697 },
698 request_cx,
699 )
700 },
701 |notification| McpOverAcpMessage {
702 connection_id: connection_id.clone(),
703 message: notification,
704 meta: None,
705 },
706 );
707
708 self.trace_agent_to_client(SourceComponentIndex::Successor, &wrapped)?;
711 self.send_message_to_predecessor_of(
712 client,
713 SourceComponentIndex::Successor,
714 wrapped,
715 )
716 }
717
718 ConductorMessage::McpConnectionDisconnected { notification } => {
721 self.bridge_connections.remove(¬ification.connection_id);
724 self.send_notification_to_predecessor_of(client, self.proxies.len(), notification)
725 }
726
727 ConductorMessage::ForwardResponse { request_cx, result } => {
731 self.trace_response(&request_cx, &result);
732 request_cx.respond_with_result(result)
733 }
734 }
735 }
736
737 fn send_message_to_predecessor_of<Req: JrRequest, N: JrNotification>(
748 &mut self,
749 client: JrConnectionCx<Link>,
750 source_component_index: SourceComponentIndex,
751 message: MessageCx<Req, N>,
752 ) -> Result<(), sacp::Error>
753 where
754 Req::Response: Send,
755 {
756 let source_component_index = match source_component_index {
757 SourceComponentIndex::Successor => self.proxies.len(),
758 SourceComponentIndex::Proxy(index) => index,
759 };
760
761 match message {
762 MessageCx::Request(request, request_cx) => self
763 .send_request_to_predecessor_of(client, source_component_index, request)
764 .forward_response_via(&self.conductor_tx, request_cx),
765 MessageCx::Notification(notification) => self.send_notification_to_predecessor_of(
766 client,
767 source_component_index,
768 notification,
769 ),
770 }
771 }
772
773 fn send_request_to_predecessor_of<Req: JrRequest>(
774 &mut self,
775 client: JrConnectionCx<Link>,
776 source_component_index: usize,
777 request: Req,
778 ) -> JrResponse<Req::Response> {
779 if source_component_index == 0 {
780 client.send_request_to(ClientPeer, request)
781 } else {
782 self.proxies[source_component_index - 1].send_request(SuccessorMessage {
783 message: request,
784 meta: None,
785 })
786 }
787 }
788
789 fn send_notification_to_predecessor_of<N: JrNotification>(
800 &mut self,
801 client: JrConnectionCx<Link>,
802 source_component_index: usize,
803 notification: N,
804 ) -> Result<(), sacp::Error> {
805 tracing::debug!(
806 source_component_index,
807 proxies_len = self.proxies.len(),
808 "send_notification_to_predecessor_of"
809 );
810 if source_component_index == 0 {
811 tracing::debug!("Sending notification directly to client");
812 client.send_notification_to(ClientPeer, notification)
813 } else {
814 tracing::debug!(
815 target_proxy = source_component_index - 1,
816 "Sending notification wrapped as SuccessorMessage to proxy"
817 );
818 self.proxies[source_component_index - 1].send_notification(SuccessorMessage {
819 message: notification,
820 meta: None,
821 })
822 }
823 }
824
825 async fn forward_client_to_agent_message(
830 &mut self,
831 target_component_index: usize,
832 message: MessageCx,
833 conductor_cx: JrConnectionCx<Link>,
834 ) -> Result<(), sacp::Error> {
835 tracing::trace!(
836 target_component_index,
837 ?message,
838 "forward_client_to_agent_message"
839 );
840
841 let message = self
843 .ensure_initialized(conductor_cx.clone(), message)
844 .await?;
845
846 if let Err(e) = self.trace_client_to_agent(target_component_index, &message) {
848 tracing::warn!("Failed to trace client-to-agent message: {e}");
849 }
850
851 if target_component_index < self.proxies.len() {
854 self.forward_message_to_proxy(target_component_index, message)
855 .await
856 } else {
857 assert_eq!(target_component_index, self.proxies.len());
858
859 debug!(
860 target_component_index,
861 proxies_count = self.proxies.len(),
862 "Proxy mode: forwarding successor message to conductor's successor"
863 );
864 let successor = self.successor.clone();
865 successor.send_message(message, conductor_cx, self).await
866 }
867 }
868
869 async fn ensure_initialized(
879 &mut self,
880 client: JrConnectionCx<Link>,
881 message: MessageCx,
882 ) -> Result<MessageCx, Error> {
883 let Some(instantiator) = self.instantiator.take() else {
885 return Ok(message);
886 };
887
888 let message = self
889 .link
890 .initialize(message, client, instantiator, self)
891 .await?;
892 Ok(message)
893 }
894
895 fn spawn_proxies(
897 &mut self,
898 cx: JrConnectionCx<Link>,
899 proxy_components: Vec<sacp::DynComponent<ProxyToConductor>>,
900 ) -> Result<(), sacp::Error> {
901 assert!(self.proxies.is_empty());
902
903 info!(proxy_count = proxy_components.len(), "spawn_proxies");
904
905 for (component_index, dyn_component) in proxy_components.into_iter().enumerate() {
907 debug!(component_index, "spawning proxy");
908
909 let proxy_cx = cx.spawn_connection(
910 ConductorToProxy::builder()
911 .name(format!("conductor-to-component({})", component_index))
912 .on_receive_message(
914 {
915 let mut conductor_tx = self.conductor_tx.clone();
916 async move |message_cx: MessageCx<
917 SuccessorMessage,
918 SuccessorMessage,
919 >,
920 _cx| {
921 conductor_tx
922 .send(ConductorMessage::ClientToAgent {
923 target_component_index: component_index + 1,
924 message: message_cx
925 .map(|r, cx| (r.message, cx), |n| n.message),
926 })
927 .await
928 .map_err(sacp::util::internal_error)
929 }
930 },
931 sacp::on_receive_message!(),
932 )
933 .on_receive_message(
935 {
936 let mut conductor_tx = self.conductor_tx.clone();
937 async move |message_cx: MessageCx<UntypedMessage, UntypedMessage>,
938 _cx| {
939 conductor_tx
940 .send(ConductorMessage::AgentToClient {
941 source_component_index: SourceComponentIndex::Proxy(
942 component_index,
943 ),
944 message: message_cx,
945 })
946 .await
947 .map_err(sacp::util::internal_error)
948 }
949 },
950 sacp::on_receive_message!(),
951 )
952 .connect_to(dyn_component)?,
953 |c| Box::pin(c.serve()),
954 )?;
955 self.proxies.push(proxy_cx);
956 }
957
958 info!(proxy_count = self.proxies.len(), "Proxies spawned");
959
960 Ok(())
961 }
962
/// Delivers an agent-bound message to the proxy at `target_component_index`.
///
/// * `InitializeProxyRequest` from outside is rejected as an invalid request.
/// * `InitializeRequest` is converted into an `InitializeProxyRequest` before
///   being sent; the proxy's reply is routed back through the conductor channel.
/// * Anything else is proxied unchanged, with responses also routed through
///   the conductor channel.
///
/// # Panics
/// Panics if `target_component_index` is not a valid proxy index.
async fn forward_message_to_proxy(
    &mut self,
    target_component_index: usize,
    message: MessageCx,
) -> Result<(), sacp::Error> {
    tracing::debug!(?message, "forward_message_to_proxy");

    MatchMessage::new(message)
        .if_request(async |_request: InitializeProxyRequest, request_cx| {
            // Only the conductor itself may originate this request type.
            request_cx.respond_with_error(
                sacp::Error::invalid_request()
                    .data("initialize/proxy requests are only sent by the conductor"),
            )
        })
        .await
        .if_request(async |request: InitializeRequest, request_cx| {
            // Proxies are initialized with the proxy flavour of the request.
            self.proxies[target_component_index]
                .send_request(InitializeProxyRequest::from(request))
                .on_receiving_result({
                    let conductor_tx = self.conductor_tx.clone();
                    async move |result| {
                        tracing::debug!(?result, "got initialize_proxy response from proxy");
                        request_cx
                            .respond_with_result_via(&conductor_tx, result)
                            .await
                    }
                })
        })
        .await
        .otherwise(async |message| {
            self.proxies[target_component_index].send_proxied_message_to_via(
                AgentPeer,
                &self.conductor_tx,
                message,
            )
        })
        .await
}
1009
/// Delivers an agent-bound message to the agent connection `agent_cx`.
///
/// * `InitializeProxyRequest` is rejected as an invalid request.
/// * `NewSessionRequest` has each of its MCP servers transformed by the bridge
///   listeners before being sent; the response is routed through the conductor channel.
/// * MCP-over-ACP requests and notifications are handed to the bridge
///   connection with the matching connection id.
/// * Anything else is proxied unchanged to the agent.
///
/// # Errors
/// Returns an internal error when an MCP-over-ACP message names an unknown
/// connection id, and propagates errors from the bridge or from sending.
async fn forward_message_to_agent(
    &mut self,
    conductor_cx: JrConnectionCx<ConductorToClient>,
    message: MessageCx,
    agent_cx: JrConnectionCx<ConductorToAgent>,
) -> Result<(), Error> {
    MatchMessage::new(message)
        .if_request(async |_request: InitializeProxyRequest, request_cx| {
            // Only the conductor itself may originate this request type.
            request_cx.respond_with_error(
                sacp::Error::invalid_request()
                    .data("initialize/proxy requests are only sent by the conductor"),
            )
        })
        .await
        .if_request(async |mut request: NewSessionRequest, request_cx| {
            // Let the bridge rewrite each MCP server entry in place.
            for mcp_server in &mut request.mcp_servers {
                self.bridge_listeners
                    .transform_mcp_server(
                        conductor_cx.clone(),
                        mcp_server,
                        &self.conductor_tx,
                        &self.mcp_bridge_mode,
                    )
                    .await?;
            }

            agent_cx
                .send_request(request)
                .forward_response_via(&self.conductor_tx, request_cx)
        })
        .await
        .if_request(
            async |request: McpOverAcpMessage<UntypedMessage>, request_cx| {
                let McpOverAcpMessage {
                    connection_id,
                    message: mcp_request,
                    ..
                } = request;
                // Route the unwrapped MCP request to its bridge connection.
                self.bridge_connections
                    .get_mut(&connection_id)
                    .ok_or_else(|| {
                        sacp::util::internal_error(format!(
                            "unknown connection id: {}",
                            connection_id
                        ))
                    })?
                    .send(MessageCx::Request(mcp_request, request_cx))
                    .await
            },
        )
        .await
        .if_notification(async |notification: McpOverAcpMessage<UntypedMessage>| {
            let McpOverAcpMessage {
                connection_id,
                message: mcp_notification,
                ..
            } = notification;
            // Route the unwrapped MCP notification to its bridge connection.
            self.bridge_connections
                .get_mut(&connection_id)
                .ok_or_else(|| {
                    sacp::util::internal_error(format!(
                        "unknown connection id: {}",
                        connection_id
                    ))
                })?
                .send(MessageCx::Notification(mcp_notification))
                .await
        })
        .await
        .otherwise(async |message| {
            agent_cx.send_proxied_message_to_via(AgentPeer, &self.conductor_tx, message)
        })
        .await
}
1091}
1092
/// Identifies which component a client-bound message came from.
#[derive(Debug, Clone, Copy)]
pub enum SourceComponentIndex {
    /// The conductor's successor; treated as the slot just past the last proxy.
    Successor,

    /// The proxy at the given index in the conductor's proxy list.
    Proxy(usize),
}
1106
/// Produces the proxy components for a conductor on the
/// `ConductorToConductor` link.
pub trait InstantiateProxies: Send {
    /// Consumes the instantiator and asynchronously yields the (possibly
    /// modified) initialize request together with the proxy components.
    fn instantiate_proxies(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
    >;
}
1124
1125impl<T> InstantiateProxies for Vec<T>
1129where
1130 T: Component<ProxyToConductor> + 'static,
1131{
1132 fn instantiate_proxies(
1133 self: Box<Self>,
1134 req: InitializeRequest,
1135 ) -> futures::future::BoxFuture<
1136 'static,
1137 Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
1138 > {
1139 Box::pin(async move {
1140 let components: Vec<sacp::DynComponent<ProxyToConductor>> = (*self)
1141 .into_iter()
1142 .map(|c| sacp::DynComponent::new(c))
1143 .collect();
1144 Ok((req, components))
1145 })
1146 }
1147}
1148
1149impl<F, Fut> InstantiateProxies for F
1151where
1152 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1153 Fut: std::future::Future<
1154 Output = Result<
1155 (InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>),
1156 sacp::Error,
1157 >,
1158 > + Send
1159 + 'static,
1160{
1161 fn instantiate_proxies(
1162 self: Box<Self>,
1163 req: InitializeRequest,
1164 ) -> futures::future::BoxFuture<
1165 'static,
1166 Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
1167 > {
1168 Box::pin(async move { (*self)(req).await })
1169 }
1170}
1171
/// Produces the proxy components and the agent component for a conductor on
/// the `ConductorToClient` link.
pub trait InstantiateProxiesAndAgent: Send {
    /// Consumes the instantiator and asynchronously yields the (possibly
    /// modified) initialize request, the proxy components, and the agent component.
    fn instantiate_proxies_and_agent(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<
            (
                InitializeRequest,
                Vec<sacp::DynComponent<ProxyToConductor>>,
                sacp::DynComponent<AgentToClient>,
            ),
            sacp::Error,
        >,
    >;
}
1197
/// Wrapper for an agent component used without any proxies.
pub struct AgentOnly<A>(pub A);
1200
1201impl<A: Component<AgentToClient> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
1202 fn instantiate_proxies_and_agent(
1203 self: Box<Self>,
1204 req: InitializeRequest,
1205 ) -> futures::future::BoxFuture<
1206 'static,
1207 Result<
1208 (
1209 InitializeRequest,
1210 Vec<sacp::DynComponent<ProxyToConductor>>,
1211 sacp::DynComponent<AgentToClient>,
1212 ),
1213 sacp::Error,
1214 >,
1215 > {
1216 Box::pin(async move { Ok((req, Vec::new(), sacp::DynComponent::new(self.0))) })
1217 }
1218}
1219
/// Builder-style instantiator holding a fixed agent and an ordered list of proxies.
pub struct ProxiesAndAgent {
    /// Type-erased proxies, in the order they were added.
    proxies: Vec<sacp::DynComponent<ProxyToConductor>>,
    /// Type-erased agent component.
    agent: sacp::DynComponent<AgentToClient>,
}
1232
1233impl ProxiesAndAgent {
1234 pub fn new(agent: impl Component<AgentToClient> + 'static) -> Self {
1236 Self {
1237 proxies: vec![],
1238 agent: sacp::DynComponent::new(agent),
1239 }
1240 }
1241
1242 pub fn proxy(mut self, proxy: impl Component<ProxyToConductor> + 'static) -> Self {
1244 self.proxies.push(sacp::DynComponent::new(proxy));
1245 self
1246 }
1247
1248 pub fn proxies<P, I>(mut self, proxies: I) -> Self
1250 where
1251 P: Component<ProxyToConductor> + 'static,
1252 I: IntoIterator<Item = P>,
1253 {
1254 self.proxies
1255 .extend(proxies.into_iter().map(sacp::DynComponent::new));
1256 self
1257 }
1258}
1259
1260impl InstantiateProxiesAndAgent for ProxiesAndAgent {
1261 fn instantiate_proxies_and_agent(
1262 self: Box<Self>,
1263 req: InitializeRequest,
1264 ) -> futures::future::BoxFuture<
1265 'static,
1266 Result<
1267 (
1268 InitializeRequest,
1269 Vec<sacp::DynComponent<ProxyToConductor>>,
1270 sacp::DynComponent<AgentToClient>,
1271 ),
1272 sacp::Error,
1273 >,
1274 > {
1275 Box::pin(async move { Ok((req, self.proxies, self.agent)) })
1276 }
1277}
1278
1279impl<F, Fut> InstantiateProxiesAndAgent for F
1281where
1282 F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
1283 Fut: std::future::Future<
1284 Output = Result<
1285 (
1286 InitializeRequest,
1287 Vec<sacp::DynComponent<ProxyToConductor>>,
1288 sacp::DynComponent<AgentToClient>,
1289 ),
1290 sacp::Error,
1291 >,
1292 > + Send
1293 + 'static,
1294{
1295 fn instantiate_proxies_and_agent(
1296 self: Box<Self>,
1297 req: InitializeRequest,
1298 ) -> futures::future::BoxFuture<
1299 'static,
1300 Result<
1301 (
1302 InitializeRequest,
1303 Vec<sacp::DynComponent<ProxyToConductor>>,
1304 sacp::DynComponent<AgentToClient>,
1305 ),
1306 sacp::Error,
1307 >,
1308 > {
1309 Box::pin(async move { (*self)(req).await })
1310 }
1311}
1312
/// Messages processed by the conductor's responder loop.
///
/// All state changes are funnelled through this enum and the internal
/// channel, so the responder handles them one at a time.
#[derive(Debug)]
pub enum ConductorMessage {
    /// A message travelling toward the agent, to be delivered to the
    /// component at `target_component_index` (the proxy count addresses the
    /// conductor's successor).
    ClientToAgent {
        target_component_index: usize,
        message: MessageCx,
    },

    /// A message travelling toward the client, to be delivered to the
    /// predecessor of `source_component_index`.
    AgentToClient {
        source_component_index: SourceComponentIndex,
        message: MessageCx,
    },

    /// A new MCP bridge connection was accepted; the conductor responds by
    /// sending an `McpConnectRequest` carrying `acp_url`.
    McpConnectionReceived {
        /// URL placed in the outgoing `McpConnectRequest`.
        acp_url: String,

        /// Actor to run once the connection has an id.
        actor: McpBridgeConnectionActor,

        /// Handle used to send messages to the bridge connection.
        connection: McpBridgeConnection,
    },

    /// The connect request succeeded; the connection is registered under the
    /// id in `response` and the actor is spawned.
    McpConnectionEstablished {
        /// Response carrying the connection id.
        response: McpConnectResponse,

        /// Actor to spawn for this connection.
        actor: McpBridgeConnectionActor,

        /// Handle stored in the responder's connection map.
        connection: McpBridgeConnection,
    },

    /// An MCP message for `connection_id`, to be wrapped in
    /// `McpOverAcpMessage` and sent toward the client.
    McpClientToMcpServer {
        connection_id: String,
        message: MessageCx,
    },

    /// An MCP bridge connection closed; the entry is removed and the
    /// notification forwarded.
    McpConnectionDisconnected {
        notification: McpDisconnectNotification,
    },

    /// A response routed through the responder so it can be traced before
    /// `request_cx` is answered.
    ForwardResponse {
        request_cx: JrRequestCx<serde_json::Value>,
        result: Result<serde_json::Value, sacp::Error>,
    },
}
1393
/// Extension for connection contexts: proxy a message to a peer while routing
/// any response back through the conductor's channel.
trait JrConnectionCxExt<Link: JrLink> {
    /// Sends `message` to `peer`. For requests, the eventual response is
    /// forwarded via `conductor_tx` to the original request context.
    fn send_proxied_message_to_via<Peer: JrPeer>(
        &self,
        peer: Peer,
        conductor_tx: &mpsc::Sender<ConductorMessage>,
        message: MessageCx,
    ) -> Result<(), sacp::Error>
    where
        Link: sacp::HasPeer<Peer>;
}
1404
1405impl<Link: JrLink> JrConnectionCxExt<Link> for JrConnectionCx<Link> {
1406 fn send_proxied_message_to_via<Peer: JrPeer>(
1407 &self,
1408 peer: Peer,
1409 conductor_tx: &mpsc::Sender<ConductorMessage>,
1410 message: MessageCx,
1411 ) -> Result<(), sacp::Error>
1412 where
1413 Link: sacp::HasPeer<Peer>,
1414 {
1415 match message {
1416 MessageCx::Request(request, request_cx) => self
1417 .send_request_to(peer, request)
1418 .forward_response_via(conductor_tx, request_cx),
1419 MessageCx::Notification(notification) => self.send_notification_to(peer, notification),
1420 }
1421 }
1422}
1423
/// Extension for request contexts: answer a request by queueing the result on
/// the conductor's channel instead of responding directly.
trait JrRequestCxExt<T: JrResponsePayload> {
    /// Queues `result` as a `ConductorMessage::ForwardResponse` for this request.
    async fn respond_with_result_via(
        self,
        conductor_tx: &mpsc::Sender<ConductorMessage>,
        result: Result<T, sacp::Error>,
    ) -> Result<(), sacp::Error>;
}
1431
1432impl<T: JrResponsePayload> JrRequestCxExt<T> for JrRequestCx<T> {
1433 async fn respond_with_result_via(
1434 self,
1435 conductor_tx: &mpsc::Sender<ConductorMessage>,
1436 result: Result<T, sacp::Error>,
1437 ) -> Result<(), sacp::Error> {
1438 let result = result.and_then(|response| response.into_json(self.method()));
1439 conductor_tx
1440 .clone()
1441 .send(ConductorMessage::ForwardResponse {
1442 request_cx: self.erase_to_json(),
1443 result,
1444 })
1445 .await
1446 .map_err(|e| sacp::util::internal_error(format!("Failed to send response: {}", e)))
1447 }
1448}
1449
/// Extension for pending responses: forward the eventual result to another
/// request context by way of the conductor's channel.
pub trait JrResponseExt<T: JrResponsePayload> {
    /// When the result arrives, queues it on `conductor_tx` as the answer to `request_cx`.
    fn forward_response_via(
        self,
        conductor_tx: &mpsc::Sender<ConductorMessage>,
        request_cx: JrRequestCx<T>,
    ) -> Result<(), sacp::Error>;
}
1457
1458impl<T: JrResponsePayload> JrResponseExt<T> for JrResponse<T> {
1459 fn forward_response_via(
1460 self,
1461 conductor_tx: &mpsc::Sender<ConductorMessage>,
1462 request_cx: JrRequestCx<T>,
1463 ) -> Result<(), sacp::Error> {
1464 let conductor_tx = conductor_tx.clone();
1465 self.on_receiving_result(async move |result| {
1466 request_cx
1467 .respond_with_result_via(&conductor_tx, result)
1468 .await
1469 })
1470 }
1471}
1472
/// Link-specific behaviour of a conductor.
///
/// Implemented by the link types a conductor can serve; it supplies the
/// initialization step and the handling of incoming messages.
pub trait ConductorLink: JrLink + HasPeer<ClientPeer> {
    /// The link a `Conductor<Self>` speaks when used as a component; it
    /// connects to the same counterpart as `Self`.
    type Speaks: JrLink<ConnectsTo = Self::ConnectsTo>;

    /// Type of the instantiator stored by the conductor for this link.
    type Instantiator: Send;

    /// Performs one-time setup, consuming `instantiator`.
    ///
    /// Called by the responder with the first agent-bound message; returns the
    /// message that should then be forwarded.
    fn initialize(
        self,
        message: MessageCx,
        cx: JrConnectionCx<Self>,
        instantiator: Self::Instantiator,
        responder: &mut ConductorResponder<Self>,
    ) -> impl Future<Output = Result<MessageCx, sacp::Error>> + Send;

    /// Handles one message arriving on the conductor's connection, typically
    /// by queueing a [`ConductorMessage`] on `conductor_tx`.
    fn handle_message(
        self,
        message: MessageCx,
        cx: JrConnectionCx<Self>,
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
    ) -> impl Future<Output = Result<Handled<MessageCx>, sacp::Error>> + Send;
}
1503
/// Conductor behaviour on the `ConductorToClient` link: the conductor appears
/// to the client as an agent and owns both the proxies and the real agent.
impl ConductorLink for ConductorToClient {
    type Speaks = AgentToClient;

    type Instantiator = Box<dyn InstantiateProxiesAndAgent>;

    /// Validates that the first message is an `initialize` request, asks the
    /// instantiator for the components, spawns the agent and the proxies, and
    /// returns the (possibly modified) initialize request for forwarding.
    ///
    /// # Errors
    /// Any message that is not a well-formed `initialize` request is answered
    /// with an error and also reported as an invalid request to the caller.
    async fn initialize(
        self,
        message: MessageCx,
        client: JrConnectionCx<Self>,
        instantiator: Self::Instantiator,
        responder: &mut ConductorResponder<Self>,
    ) -> Result<MessageCx, sacp::Error> {
        let invalid_request = || Error::invalid_request().data("expected `initialize` request");

        // A notification cannot be an initialize request.
        let MessageCx::Request(request, request_cx) = message else {
            message.respond_with_error(invalid_request(), client.clone())?;
            return Err(invalid_request());
        };
        // `None` means the method is not `initialize` at all.
        let Some(result) = InitializeRequest::parse_message(request.method(), request.params())
        else {
            request_cx.respond_with_error(invalid_request())?;
            return Err(invalid_request());
        };

        // Right method, but the params failed to parse.
        let init_request = match result {
            Ok(r) => r,
            Err(error) => {
                request_cx.respond_with_error(error)?;
                return Err(invalid_request());
            }
        };

        let (modified_req, proxy_components, agent_component) = instantiator
            .instantiate_proxies_and_agent(init_request)
            .await?;

        debug!(?agent_component, "spawning agent");
        // Everything the agent sends is queued as coming from the successor.
        let agent_cx = client.spawn_connection(
            ConductorToAgent::builder()
                .name("conductor-to-agent")
                .on_receive_message(
                    {
                        let mut conductor_tx = responder.conductor_tx.clone();
                        async move |message_cx: MessageCx, _cx| {
                            conductor_tx
                                .send(ConductorMessage::AgentToClient {
                                    source_component_index: SourceComponentIndex::Successor,
                                    message: message_cx,
                                })
                                .await
                                .map_err(sacp::util::internal_error)
                        }
                    },
                    sacp::on_receive_message!(),
                )
                .connect_to(agent_component)?,
            |c| Box::pin(c.serve()),
        )?;
        // Replace the responder's placeholder successor with the agent connection.
        responder.successor = Arc::new(agent_cx);

        responder.spawn_proxies(client.clone(), proxy_components)?;

        Ok(MessageCx::Request(
            modified_req.to_untyped_message()?,
            request_cx,
        ))
    }

    /// Queues every message that comes from the client for the first
    /// component in the chain.
    async fn handle_message(
        self,
        message: MessageCx,
        cx: JrConnectionCx<Self>,
        conductor_tx: &mut mpsc::Sender<ConductorMessage>,
    ) -> Result<Handled<MessageCx>, sacp::Error> {
        tracing::debug!(
            method = ?message.message().method(),
            "ConductorToClient::handle_message"
        );
        MatchMessageFrom::new(message, &cx)
            .if_message_from(ClientPeer, async move |message: MessageCx| {
                tracing::debug!(
                    method = ?message.message().method(),
                    "ConductorToClient::handle_message - matched Client"
                );
                Conductor::<Self>::incoming_message_from_client(conductor_tx, message).await
            })
            .await
            .done()
    }
}
1602
1603impl ConductorLink for ConductorToConductor {
1604 type Speaks = ProxyToConductor;
1606
1607 type Instantiator = Box<dyn InstantiateProxies>;
1608
1609 async fn initialize(
1610 self,
1611 message: MessageCx,
1612 client_cx: JrConnectionCx<Self>,
1613 instantiator: Self::Instantiator,
1614 responder: &mut ConductorResponder<Self>,
1615 ) -> Result<MessageCx, sacp::Error> {
1616 let invalid_request = || Error::invalid_request().data("expected `initialize` request");
1617
1618 let MessageCx::Request(request, request_cx) = message else {
1621 message.respond_with_error(invalid_request(), client_cx.clone())?;
1622 return Err(invalid_request());
1623 };
1624 let Some(result) =
1625 InitializeProxyRequest::parse_message(request.method(), request.params())
1626 else {
1627 request_cx.respond_with_error(invalid_request())?;
1628 return Err(invalid_request());
1629 };
1630
1631 let InitializeProxyRequest { initialize } = match result {
1632 Ok(r) => r,
1633 Err(error) => {
1634 request_cx.respond_with_error(error)?;
1635 return Err(invalid_request());
1636 }
1637 };
1638
1639 tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
1640
1641 let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
1643
1644 responder.successor = Arc::new(());
1646
1647 responder.spawn_proxies(client_cx.clone(), proxy_components)?;
1649
1650 Ok(MessageCx::Request(
1651 modified_req.to_untyped_message()?,
1652 request_cx,
1653 ))
1654 }
1655
1656 async fn handle_message(
1657 self,
1658 message: MessageCx,
1659 cx: JrConnectionCx<Self>,
1660 conductor_tx: &mut mpsc::Sender<ConductorMessage>,
1661 ) -> Result<Handled<MessageCx>, sacp::Error> {
1662 tracing::debug!(
1663 method = ?message.message().method(),
1664 "ConductorToConductor::handle_message"
1665 );
1666 MatchMessageFrom::new(message, &cx)
1667 .if_message_from(AgentPeer, {
1668 async |message: MessageCx| {
1671 tracing::debug!(
1672 method = ?message.message().method(),
1673 "ConductorToConductor::handle_message - matched Agent"
1674 );
1675 let mut conductor_tx = conductor_tx.clone();
1676 Conductor::<Self>::incoming_message_from_agent(&mut conductor_tx, message).await
1677 }
1678 })
1679 .await
1680 .if_message_from(ClientPeer, async |message: MessageCx| {
1682 tracing::debug!(
1683 method = ?message.message().method(),
1684 "ConductorToConductor::handle_message - matched Client"
1685 );
1686 let mut conductor_tx = conductor_tx.clone();
1687 Conductor::<Self>::incoming_message_from_client(&mut conductor_tx, message).await
1688 })
1689 .await
1690 .done()
1691 }
1692}
1693
1694pub trait ConductorSuccessor<Link: ConductorLink>: Send + Sync + 'static {
1695 fn send_message<'a>(
1696 &self,
1697 message: MessageCx,
1698 conductor_cx: JrConnectionCx<Link>,
1699 responder: &'a mut ConductorResponder<Link>,
1700 ) -> BoxFuture<'a, Result<(), sacp::Error>>;
1701}
1702
1703impl<Link: ConductorLink> ConductorSuccessor<Link> for sacp::Error {
1704 fn send_message<'a>(
1705 &self,
1706 #[expect(unused_variables)] message: MessageCx,
1707 #[expect(unused_variables)] conductor_cx: JrConnectionCx<Link>,
1708 #[expect(unused_variables)] responder: &'a mut ConductorResponder<Link>,
1709 ) -> BoxFuture<'a, Result<(), sacp::Error>> {
1710 let error = self.clone();
1711 Box::pin(std::future::ready(Err(error)))
1712 }
1713}
1714
1715impl ConductorSuccessor<ConductorToConductor> for () {
1716 fn send_message<'a>(
1717 &self,
1718 message: MessageCx,
1719 conductor_cx: JrConnectionCx<ConductorToConductor>,
1720 responder: &'a mut ConductorResponder<ConductorToConductor>,
1721 ) -> BoxFuture<'a, Result<(), sacp::Error>> {
1722 Box::pin(async move {
1723 debug!("Proxy mode: forwarding successor message to conductor's successor");
1724 conductor_cx.send_proxied_message_to_via(
1725 AgentPeer,
1726 &mut responder.conductor_tx,
1727 message,
1728 )
1729 })
1730 }
1731}
1732
1733impl ConductorSuccessor<ConductorToClient> for JrConnectionCx<ConductorToAgent> {
1734 fn send_message<'a>(
1735 &self,
1736 message: MessageCx,
1737 conductor_cx: JrConnectionCx<ConductorToClient>,
1738 responder: &'a mut ConductorResponder<ConductorToClient>,
1739 ) -> BoxFuture<'a, Result<(), sacp::Error>> {
1740 let agent_cx = self.clone();
1741 Box::pin(async move {
1742 debug!("Proxy mode: forwarding successor message to conductor's successor");
1743 responder
1744 .forward_message_to_agent(conductor_cx, message, agent_cx)
1745 .await
1746 })
1747 }
1748}