use std::{collections::HashMap, sync::Arc};
use agent_client_protocol::{
Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage,
Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch,
};
use agent_client_protocol::{
Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest, UntypedMessage,
};
use agent_client_protocol::{
HandleDispatchFrom,
schema::{InitializeProxyRequest, InitializeRequest, NewSessionRequest},
util::MatchDispatchFrom,
};
use agent_client_protocol::{
Handled,
schema::{
McpConnectRequest, McpConnectResponse, McpDisconnectNotification, McpOverAcpMessage,
SuccessorMessage,
},
};
use futures::{
SinkExt, StreamExt,
channel::mpsc::{self},
};
use tracing::{debug, info};
use crate::conductor::mcp_bridge::{
McpBridgeConnection, McpBridgeConnectionActor, McpBridgeListeners,
};
mod mcp_bridge;
/// Configuration for a conductor playing the `Host` role.
///
/// A value is assembled with `new` (or `new_agent` / `new_proxy`), optionally
/// given a trace writer, and then consumed by `run`.
#[derive(Debug)]
pub struct ConductorImpl<Host: ConductorHostRole> {
/// Role value used for the conductor's outer connection; cloned into the
/// responder and the message handler by `run`.
host: Host,
/// Name passed to the connection builder in `run`.
name: String,
/// Produces the components of the chain; handed to the responder by `run`.
instantiator: Host::Instantiator,
/// MCP bridge mode copied into the responder by `run`.
mcp_bridge_mode: crate::McpBridgeMode,
/// Trace writer, if tracing was enabled; `new` leaves this as `None`.
trace_writer: Option<crate::trace::TraceWriter>,
}
impl<Host: ConductorHostRole> ConductorImpl<Host> {
    /// Creates a conductor for `host` with tracing disabled.
    ///
    /// `name` is converted to an owned `String`; `instantiator` and
    /// `mcp_bridge_mode` are stored unchanged.
    pub fn new(
        host: Host,
        name: impl ToString,
        instantiator: Host::Instantiator,
        mcp_bridge_mode: crate::McpBridgeMode,
    ) -> Self {
        let name = name.to_string();
        // Tracing is opt-in through the `trace_*` / `with_trace_writer` builders.
        let trace_writer = None;
        Self {
            host,
            name,
            instantiator,
            mcp_bridge_mode,
            trace_writer,
        }
    }
}
impl ConductorImpl<Agent> {
    /// Creates a conductor in the `Agent` role.
    ///
    /// The instantiator is boxed as a `dyn InstantiateProxiesAndAgent` before
    /// being handed to `ConductorImpl::new`.
    pub fn new_agent(
        name: impl ToString,
        instantiator: impl InstantiateProxiesAndAgent + 'static,
        mcp_bridge_mode: crate::McpBridgeMode,
    ) -> Self {
        let boxed: Box<dyn InstantiateProxiesAndAgent> = Box::new(instantiator);
        Self::new(Agent, name, boxed, mcp_bridge_mode)
    }
}
impl ConductorImpl<Proxy> {
    /// Creates a conductor in the `Proxy` role.
    ///
    /// The instantiator is boxed as a `dyn InstantiateProxies` before being
    /// handed to `ConductorImpl::new`.
    pub fn new_proxy(
        name: impl ToString,
        instantiator: impl InstantiateProxies + 'static,
        mcp_bridge_mode: crate::McpBridgeMode,
    ) -> Self {
        let boxed: Box<dyn InstantiateProxies> = Box::new(instantiator);
        Self::new(Proxy, name, boxed, mcp_bridge_mode)
    }
}
impl<Host: ConductorHostRole> ConductorImpl<Host> {
#[must_use]
pub fn trace_to(mut self, dest: impl crate::trace::WriteEvent) -> Self {
self.trace_writer = Some(crate::trace::TraceWriter::new(dest));
self
}
pub fn trace_to_path(mut self, path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
self.trace_writer = Some(crate::trace::TraceWriter::from_path(path)?);
Ok(self)
}
#[must_use]
pub fn with_trace_writer(mut self, writer: crate::trace::TraceWriter) -> Self {
self.trace_writer = Some(writer);
self
}
pub async fn run(
self,
transport: impl ConnectTo<Host>,
) -> Result<(), agent_client_protocol::Error> {
let (conductor_tx, conductor_rx) = mpsc::channel(128 );
let trace_handle;
let trace_future: BoxFuture<'static, Result<(), agent_client_protocol::Error>>;
if let Some((h, f)) = self.trace_writer.map(super::trace::TraceWriter::spawn) {
trace_handle = Some(h);
trace_future = Box::pin(f);
} else {
trace_handle = None;
trace_future = Box::pin(std::future::ready(Ok(())));
}
let responder = ConductorResponder {
conductor_rx,
conductor_tx: conductor_tx.clone(),
instantiator: Some(self.instantiator),
bridge_listeners: McpBridgeListeners::default(),
bridge_connections: HashMap::default(),
mcp_bridge_mode: self.mcp_bridge_mode,
proxies: Vec::default(),
successor: Arc::new(agent_client_protocol::util::internal_error(
"successor not initialized",
)),
trace_handle,
host: self.host.clone(),
};
Builder::new_with(
self.host.clone(),
ConductorMessageHandler {
conductor_tx,
host: self.host.clone(),
},
)
.name(self.name)
.with_responder(responder)
.with_spawned(|_cx| trace_future)
.connect_to(transport)
.await
}
async fn incoming_message_from_client(
conductor_tx: &mut mpsc::Sender<ConductorMessage>,
message: Dispatch,
) -> Result<(), agent_client_protocol::Error> {
conductor_tx
.send(ConductorMessage::LeftToRight {
target_component_index: 0,
message,
})
.await
.map_err(agent_client_protocol::util::internal_error)
}
async fn incoming_message_from_agent(
conductor_tx: &mut mpsc::Sender<ConductorMessage>,
message: Dispatch,
) -> Result<(), agent_client_protocol::Error> {
conductor_tx
.send(ConductorMessage::RightToLeft {
source_component_index: SourceComponentIndex::Successor,
message,
})
.await
.map_err(agent_client_protocol::util::internal_error)
}
}
impl<Host: ConductorHostRole> ConnectTo<Host::Counterpart> for ConductorImpl<Host> {
async fn connect_to(
self,
client: impl ConnectTo<Host>,
) -> Result<(), agent_client_protocol::Error> {
self.run(client).await
}
}
/// Message handler installed on the conductor's outer connection by
/// `ConductorImpl::run`.
///
/// Each incoming dispatch is delegated to `Host::handle_dispatch` together
/// with the sender side of the conductor's internal queue.
struct ConductorMessageHandler<Host: ConductorHostRole> {
/// Sender for the queue that `ConductorResponder` drains.
conductor_tx: mpsc::Sender<ConductorMessage>,
/// Role value whose `handle_dispatch` processes each dispatch.
host: Host,
}
impl<Host: ConductorHostRole> HandleDispatchFrom<Host::Counterpart>
for ConductorMessageHandler<Host>
{
async fn handle_dispatch_from(
&mut self,
message: Dispatch,
connection: agent_client_protocol::ConnectionTo<Host::Counterpart>,
) -> Result<agent_client_protocol::Handled<Dispatch>, agent_client_protocol::Error> {
self.host
.handle_dispatch(message, connection, &mut self.conductor_tx)
.await
}
fn describe_chain(&self) -> impl std::fmt::Debug {
"ConductorMessageHandler"
}
}
/// State owned by the conductor's message loop.
///
/// `run_with_connection_to` drains `conductor_rx` and routes each
/// `ConductorMessage` between the client, the proxies, and the successor.
pub struct ConductorResponder<Host>
where
Host: ConductorHostRole,
{
/// Receiving end of the conductor's internal queue.
conductor_rx: mpsc::Receiver<ConductorMessage>,
/// Sending end of the same queue; cloned into the per-component handlers.
conductor_tx: mpsc::Sender<ConductorMessage>,
/// Used by `forward_message_to_agent` to transform the MCP servers listed
/// in a new-session request.
bridge_listeners: McpBridgeListeners,
/// Established MCP bridge connections, keyed by connection id.
bridge_connections: HashMap<String, McpBridgeConnection>,
/// Instantiator for the chain; taken (left as `None`) by
/// `ensure_initialized` on the first message.
instantiator: Option<Host::Instantiator>,
/// Connections to the spawned proxies, in chain order.
proxies: Vec<ConnectionTo<Proxy>>,
/// Receives messages addressed past the last proxy. Starts out as an
/// error placeholder and is replaced by `Host::initialize`.
successor: Arc<dyn ConductorSuccessor<Host>>,
/// MCP bridge mode passed to `transform_mcp_server`.
mcp_bridge_mode: crate::McpBridgeMode,
/// Present when tracing is enabled; used to wrap components in `trace_proxy`.
trace_handle: Option<crate::trace::TraceHandle>,
/// Role value; cloned to call `Host::initialize`.
host: Host,
}
impl<Host> std::fmt::Debug for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    /// Formats the responder, leaving out `instantiator` and `successor` and
    /// marking the output as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ConductorResponder");
        out.field("conductor_rx", &self.conductor_rx);
        out.field("conductor_tx", &self.conductor_tx);
        out.field("bridge_listeners", &self.bridge_listeners);
        out.field("bridge_connections", &self.bridge_connections);
        out.field("proxies", &self.proxies);
        out.field("mcp_bridge_mode", &self.mcp_bridge_mode);
        out.field("trace_handle", &self.trace_handle);
        out.field("host", &self.host);
        out.finish_non_exhaustive()
    }
}
impl<Host> RunWithConnectionTo<Host::Counterpart> for ConductorResponder<Host>
where
    Host: ConductorHostRole,
{
    /// Processes queued conductor messages one at a time.
    ///
    /// Returns `Ok(())` once the queue yields `None`, or the first error
    /// produced while handling a message.
    async fn run_with_connection_to(
        mut self,
        connection: ConnectionTo<Host::Counterpart>,
    ) -> Result<(), agent_client_protocol::Error> {
        loop {
            let Some(message) = self.conductor_rx.next().await else {
                return Ok(());
            };
            self.handle_conductor_message(connection.clone(), message)
                .await?;
        }
    }
}
impl<Host> ConductorResponder<Host>
where
Host: ConductorHostRole,
{
async fn handle_conductor_message(
&mut self,
client: ConnectionTo<Host::Counterpart>,
message: ConductorMessage,
) -> Result<(), agent_client_protocol::Error> {
tracing::debug!(?message, "handle_conductor_message");
match message {
ConductorMessage::LeftToRight {
target_component_index,
message,
} => {
self.forward_client_to_agent_message(target_component_index, message, client)
.await
}
ConductorMessage::RightToLeft {
source_component_index,
message,
} => {
tracing::debug!(
?source_component_index,
message_method = ?message.method(),
"Conductor: AgentToClient received"
);
self.send_message_to_predecessor_of(client, source_component_index, message)
}
ConductorMessage::McpConnectionReceived {
acp_url,
connection,
actor,
} => {
self.send_request_to_predecessor_of(
client,
self.proxies.len(),
McpConnectRequest {
acp_url,
meta: None,
},
)
.on_receiving_result({
let mut conductor_tx = self.conductor_tx.clone();
async move |result| {
match result {
Ok(response) => conductor_tx
.send(ConductorMessage::McpConnectionEstablished {
response,
actor,
connection,
})
.await
.map_err(|_| agent_client_protocol::Error::internal_error()),
Err(_) => {
Ok(())
}
}
}
})
}
ConductorMessage::McpConnectionEstablished {
response: McpConnectResponse { connection_id, .. },
actor,
connection,
} => {
self.bridge_connections
.insert(connection_id.clone(), connection);
client.spawn(actor.run(connection_id))
}
ConductorMessage::McpClientToMcpServer {
connection_id,
message,
} => {
let wrapped = message.map(
|request, responder| {
(
McpOverAcpMessage {
connection_id: connection_id.clone(),
message: request,
meta: None,
},
responder,
)
},
|notification| McpOverAcpMessage {
connection_id: connection_id.clone(),
message: notification,
meta: None,
},
);
self.send_message_to_predecessor_of(
client,
SourceComponentIndex::Successor,
wrapped,
)
}
ConductorMessage::McpConnectionDisconnected { notification } => {
self.bridge_connections.remove(¬ification.connection_id);
self.send_notification_to_predecessor_of(client, self.proxies.len(), notification)
}
}
}
/// Sends `message` one step toward the client from the given source.
///
/// Requests and notifications go to the component preceding the source;
/// responses are delivered through the router they carry.
fn send_message_to_predecessor_of<Req: JsonRpcRequest, N: JsonRpcNotification>(
    &mut self,
    client: ConnectionTo<Host::Counterpart>,
    source_component_index: SourceComponentIndex,
    message: Dispatch<Req, N>,
) -> Result<(), agent_client_protocol::Error>
where
    Req::Response: Send,
{
    // The successor is treated as sitting one position past the last proxy.
    let sender_position = match source_component_index {
        SourceComponentIndex::Proxy(index) => index,
        SourceComponentIndex::Successor => self.proxies.len(),
    };
    match message {
        Dispatch::Response(result, router) => router.respond_with_result(result),
        Dispatch::Notification(notification) => {
            self.send_notification_to_predecessor_of(client, sender_position, notification)
        }
        Dispatch::Request(request, responder) => {
            let pending = self.send_request_to_predecessor_of(client, sender_position, request);
            pending.forward_response_to(responder)
        }
    }
}
/// Sends `request` to whatever precedes position `source_component_index`.
///
/// Position 0 is preceded by the client, so the request goes out on the
/// client connection unchanged. Any other position is preceded by proxy
/// `source_component_index - 1`, which receives the request wrapped in a
/// `SuccessorMessage`.
///
/// # Panics
///
/// Panics if the preceding proxy index is out of bounds for `self.proxies`.
fn send_request_to_predecessor_of<Req: JsonRpcRequest>(
    &mut self,
    client_connection: ConnectionTo<Host::Counterpart>,
    source_component_index: usize,
    request: Req,
) -> SentRequest<Req::Response> {
    match source_component_index.checked_sub(1) {
        None => client_connection.send_request_to(Client, request),
        Some(predecessor) => {
            let wrapped = SuccessorMessage {
                message: request,
                meta: None,
            };
            self.proxies[predecessor].send_request(wrapped)
        }
    }
}
/// Sends `notification` to whatever precedes position
/// `source_component_index`.
///
/// Position 0 delivers straight to the client; any other position wraps the
/// notification in a `SuccessorMessage` for proxy
/// `source_component_index - 1`.
///
/// # Panics
///
/// Panics if the preceding proxy index is out of bounds for `self.proxies`.
fn send_notification_to_predecessor_of<N: JsonRpcNotification>(
    &mut self,
    client: ConnectionTo<Host::Counterpart>,
    source_component_index: usize,
    notification: N,
) -> Result<(), agent_client_protocol::Error> {
    tracing::debug!(
        source_component_index,
        proxies_len = self.proxies.len(),
        "send_notification_to_predecessor_of"
    );
    let Some(target_proxy) = source_component_index.checked_sub(1) else {
        tracing::debug!("Sending notification directly to client");
        return client.send_notification_to(Client, notification);
    };
    tracing::debug!(
        target_proxy,
        "Sending notification wrapped as SuccessorMessage to proxy"
    );
    let wrapped = SuccessorMessage {
        message: notification,
        meta: None,
    };
    self.proxies[target_proxy].send_notification(wrapped)
}
async fn forward_client_to_agent_message(
&mut self,
target_component_index: usize,
message: Dispatch,
client: ConnectionTo<Host::Counterpart>,
) -> Result<(), agent_client_protocol::Error> {
tracing::trace!(
target_component_index,
?message,
"forward_client_to_agent_message"
);
let message = self.ensure_initialized(client.clone(), message).await?;
if target_component_index < self.proxies.len() {
self.forward_message_from_client_to_proxy(target_component_index, message)
.await
} else {
assert_eq!(target_component_index, self.proxies.len());
debug!(
target_component_index,
proxies_count = self.proxies.len(),
"Proxy mode: forwarding successor message to conductor's successor"
);
let successor = self.successor.clone();
successor.send_message(message, client, self).await
}
}
/// Runs `Host::initialize` on the first message and passes later messages
/// through untouched.
///
/// The instantiator is taken out of `self`, so initialization is attempted at
/// most once. The returned dispatch is the one to forward onward.
async fn ensure_initialized(
    &mut self,
    client: ConnectionTo<Host::Counterpart>,
    message: Dispatch,
) -> Result<Dispatch, Error> {
    match self.instantiator.take() {
        None => Ok(message),
        Some(instantiator) => {
            // Clone the role so `self` can be lent mutably to `initialize`.
            let host = self.host.clone();
            host.initialize(message, client, instantiator, self).await
        }
    }
}
/// Wraps `component` for tracing when a trace handle is present.
///
/// With tracing enabled the component is passed through
/// `TraceHandle::bridge_component` along with the two indices; otherwise it is
/// only type-erased.
fn trace_proxy(
    &self,
    proxy_index: ComponentIndex,
    successor_index: ComponentIndex,
    component: impl ConnectTo<Conductor>,
) -> DynConnectTo<Conductor> {
    if let Some(trace_handle) = &self.trace_handle {
        trace_handle.bridge_component(proxy_index, successor_index, component)
    } else {
        DynConnectTo::new(component)
    }
}
/// Spawns a connection for every proxy component, in order.
///
/// When tracing is enabled and no proxies were supplied, a single
/// `Proxy.builder()` component is connected instead, labelled as sitting
/// between `Client` and `Agent`.
///
/// # Panics
///
/// Panics if proxies have already been spawned.
fn spawn_proxies(
    &mut self,
    client: ConnectionTo<Host::Counterpart>,
    proxy_components: Vec<DynConnectTo<Conductor>>,
) -> Result<(), agent_client_protocol::Error> {
    assert!(self.proxies.is_empty());
    let num_proxies = proxy_components.len();
    info!(proxy_count = num_proxies, "spawn_proxies");

    let tracing_without_proxies = num_proxies == 0 && self.trace_handle.is_some();
    if tracing_without_proxies {
        self.connect_to_proxy(
            &client,
            0,
            ComponentIndex::Client,
            ComponentIndex::Agent,
            Proxy.builder(),
        )?;
    } else {
        let indexed_components = proxy_components.into_iter().enumerate();
        for (component_index, dyn_component) in indexed_components {
            debug!(component_index, "spawning proxy");
            let this_proxy = ComponentIndex::Proxy(component_index);
            let next_component = ComponentIndex::successor_of(component_index, num_proxies);
            self.connect_to_proxy(
                &client,
                component_index,
                this_proxy,
                next_component,
                dyn_component,
            )?;
        }
    }

    info!(proxy_count = self.proxies.len(), "Proxies spawned");
    Ok(())
}
/// Spawns the connection to one proxy and appends it to `self.proxies`.
///
/// `component_index` selects the handler built by `connection_to_proxy`; the
/// two trace indices are only used to label the component in `trace_proxy`.
fn connect_to_proxy(
    &mut self,
    client: &ConnectionTo<Host::Counterpart>,
    component_index: usize,
    trace_proxy_index: ComponentIndex,
    trace_successor_index: ComponentIndex,
    component: impl ConnectTo<Conductor>,
) -> Result<(), Error> {
    let builder = self.connection_to_proxy(component_index);
    let traced = self.trace_proxy(trace_proxy_index, trace_successor_index, component);
    let spawned = client.spawn_connection(builder, traced)?;
    self.proxies.push(spawned);
    Ok(())
}
/// Builds the conductor-side connection for the proxy at `component_index`.
///
/// The builder is named `conductor-to-component({component_index})` and turns
/// every dispatch received from that proxy into a `ConductorMessage`:
///
/// * a `SuccessorMessage` is unwrapped and queued as `LeftToRight`, addressed
///   to `component_index + 1`;
/// * anything else is queued as `RightToLeft` with this proxy as the source.
///
/// A failed queue send is converted with `util::internal_error`.
fn connection_to_proxy(
&mut self,
component_index: usize,
) -> Builder<Conductor, impl HandleDispatchFrom<Proxy> + 'static> {
type SuccessorDispatch = Dispatch<SuccessorMessage, SuccessorMessage>;
// The handler owns its own sender so it does not borrow from `self`.
let mut conductor_tx = self.conductor_tx.clone();
Conductor
.builder()
.name(format!("conductor-to-component({component_index})"))
.on_receive_dispatch(
async move |dispatch: Dispatch, _connection| {
MatchDispatch::new(dispatch)
.if_message(async |dispatch: SuccessorDispatch| {
// The proxy is talking to its successor: strip the wrapper and
// address the next component in the chain.
conductor_tx
.send(ConductorMessage::LeftToRight {
target_component_index: component_index + 1,
message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
})
.await
.map_err(agent_client_protocol::util::internal_error)
})
.await
.otherwise(async |dispatch| {
// Not a successor message: queue it as travelling back from
// this proxy.
let message = ConductorMessage::RightToLeft {
source_component_index: SourceComponentIndex::Proxy(
component_index,
),
message: dispatch,
};
conductor_tx
.send(message)
.await
.map_err(agent_client_protocol::util::internal_error)
})
.await
},
agent_client_protocol::on_receive_dispatch!(),
)
}
/// Delivers `message` to the proxy at `target_component_index`.
///
/// * An `InitializeProxyRequest` is answered with an invalid-request error.
/// * An `InitializeRequest` is converted into an `InitializeProxyRequest`,
///   sent to the proxy, and the proxy's result is relayed to the original
///   responder.
/// * Anything else is passed on with `send_proxied_message`.
///
/// # Panics
///
/// Indexes `self.proxies` directly, so an out-of-range
/// `target_component_index` panics.
async fn forward_message_from_client_to_proxy(
&mut self,
target_component_index: usize,
message: Dispatch,
) -> Result<(), agent_client_protocol::Error> {
tracing::debug!(?message, "forward_message_to_proxy");
MatchDispatch::new(message)
.if_request(async |_request: InitializeProxyRequest, responder| {
responder.respond_with_error(
agent_client_protocol::Error::invalid_request()
.data("initialize/proxy requests are only sent by the conductor"),
)
})
.await
.if_request(async |request: InitializeRequest, responder| {
// Proxies are initialized with the proxy flavour of the request.
self.proxies[target_component_index]
.send_request(InitializeProxyRequest::from(request))
.on_receiving_result(async move |result| {
tracing::debug!(?result, "got initialize_proxy response from proxy");
responder.respond_with_result(result)
})
})
.await
.otherwise(async |message| {
self.proxies[target_component_index].send_proxied_message(message)
})
.await
}
/// Delivers `message` to the agent over `agent_connection`, intercepting the
/// message kinds the conductor handles itself.
///
/// * An `InitializeProxyRequest` is answered with an invalid-request error.
/// * A `NewSessionRequest` has each of its `mcp_servers` passed through
///   `bridge_listeners.transform_mcp_server` before being sent on; the agent's
///   response is forwarded to the original responder.
/// * An `McpOverAcpMessage` request or notification is unwrapped and sent to
///   the bridge connection registered under its `connection_id`.
/// * Anything else is proxied to the agent unchanged.
///
/// # Errors
///
/// Returns an internal error when an `McpOverAcpMessage` names a connection id
/// that is not in `bridge_connections`, and otherwise propagates errors from
/// the calls above.
async fn forward_message_to_agent(
&mut self,
client_connection: ConnectionTo<Host::Counterpart>,
message: Dispatch,
agent_connection: ConnectionTo<Agent>,
) -> Result<(), Error> {
MatchDispatch::new(message)
.if_request(async |_request: InitializeProxyRequest, responder| {
responder.respond_with_error(
agent_client_protocol::Error::invalid_request()
.data("initialize/proxy requests are only sent by the conductor"),
)
})
.await
.if_request(async |mut request: NewSessionRequest, responder| {
// Each MCP server entry is transformed in place before the agent sees
// the request.
for mcp_server in &mut request.mcp_servers {
self.bridge_listeners
.transform_mcp_server(
client_connection.clone(),
mcp_server,
&self.conductor_tx,
&self.mcp_bridge_mode,
)
.await?;
}
agent_connection
.send_request(request)
.forward_response_to(responder)
})
.await
.if_request(
async |request: McpOverAcpMessage<UntypedMessage>, responder| {
let McpOverAcpMessage {
connection_id,
message: mcp_request,
..
} = request;
// Route the inner request to the matching bridge connection.
self.bridge_connections
.get_mut(&connection_id)
.ok_or_else(|| {
agent_client_protocol::util::internal_error(format!(
"unknown connection id: {connection_id}"
))
})?
.send(Dispatch::Request(mcp_request, responder))
.await
},
)
.await
.if_notification(async |notification: McpOverAcpMessage<UntypedMessage>| {
let McpOverAcpMessage {
connection_id,
message: mcp_notification,
..
} = notification;
// Same routing as for requests, without a responder.
self.bridge_connections
.get_mut(&connection_id)
.ok_or_else(|| {
agent_client_protocol::util::internal_error(format!(
"unknown connection id: {connection_id}"
))
})?
.send(Dispatch::Notification(mcp_notification))
.await
})
.await
.otherwise(async |message| {
agent_connection.send_proxied_message_to(Agent, message)
})
.await
}
}
/// Identifies a component in the conductor's chain when labelling traced
/// connections: the client, one of the proxies (by zero-based index), or the
/// agent.
#[derive(Debug, Clone, Copy)]
pub enum ComponentIndex {
    /// The component before the first proxy.
    Client,
    /// The proxy at the given zero-based index.
    Proxy(usize),
    /// The component after the last proxy.
    Agent,
}

impl ComponentIndex {
    /// Returns the component that precedes proxy `proxy_index`.
    ///
    /// Proxy 0 is preceded by `Client`; every other proxy is preceded by
    /// `Proxy(proxy_index - 1)`.
    #[must_use]
    pub fn predecessor_of(proxy_index: usize) -> Self {
        match proxy_index.checked_sub(1) {
            Some(p_i) => ComponentIndex::Proxy(p_i),
            None => ComponentIndex::Client,
        }
    }

    /// Returns the component that follows proxy `proxy_index` in a chain of
    /// `num_proxies` proxies.
    ///
    /// Valid proxy indices are `0..num_proxies`, so the last proxy
    /// (`num_proxies - 1`) is followed by `Agent`. Any index at or beyond the
    /// end of the chain also maps to `Agent`, which keeps the previous result
    /// for `proxy_index == num_proxies`.
    #[must_use]
    pub fn successor_of(proxy_index: usize, num_proxies: usize) -> Self {
        // `checked_add` guards the `usize::MAX` corner case instead of
        // overflowing.
        match proxy_index.checked_add(1) {
            Some(next) if next < num_proxies => ComponentIndex::Proxy(next),
            _ => ComponentIndex::Agent,
        }
    }
}
/// Identifies which component a message travelling toward the client came
/// from.
#[derive(Debug, Clone, Copy)]
pub enum SourceComponentIndex {
/// The proxy at the given index in the responder's `proxies` list.
Proxy(usize),
/// The conductor's successor; `send_message_to_predecessor_of` maps this to
/// position `proxies.len()`.
Successor,
}
/// Produces the proxy components for a conductor running in the `Proxy` role.
///
/// Implemented in this file for `Vec<T>` of components and for closures that
/// take the `InitializeRequest`.
pub trait InstantiateProxies: Send {
/// Consumes the instantiator and resolves to the (possibly modified)
/// initialize request together with the proxy components, in chain order.
fn instantiate_proxies(
self: Box<Self>,
req: InitializeRequest,
) -> futures::future::BoxFuture<
'static,
Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
>;
}
impl<T> InstantiateProxies for Vec<T>
where
    T: ConnectTo<Conductor> + 'static,
{
    /// Uses the vector's elements as the proxies, in order, and returns the
    /// request unchanged.
    fn instantiate_proxies(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
    > {
        Box::pin(async move {
            let proxies: Vec<T> = *self;
            let mut components: Vec<DynConnectTo<Conductor>> = Vec::new();
            // Type-erase each component while keeping the original order.
            components.extend(proxies.into_iter().map(DynConnectTo::new));
            Ok((req, components))
        })
    }
}
impl<F, Fut> InstantiateProxies for F
where
    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
    Fut: std::future::Future<
            Output = Result<
                (InitializeRequest, Vec<DynConnectTo<Conductor>>),
                agent_client_protocol::Error,
            >,
        > + Send
        + 'static,
{
    /// Calls the closure with the request and awaits its result. The closure
    /// is not invoked until the returned future is polled.
    fn instantiate_proxies(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
    > {
        Box::pin(async move {
            let instantiate: F = *self;
            instantiate(req).await
        })
    }
}
/// Produces the proxy components and the agent component for a conductor
/// running in the `Agent` role.
///
/// Implemented in this file for `AgentOnly`, `ProxiesAndAgent`, and closures
/// that take the `InitializeRequest`.
pub trait InstantiateProxiesAndAgent: Send {
/// Consumes the instantiator and resolves to the (possibly modified)
/// initialize request, the proxy components in chain order, and the agent
/// component.
fn instantiate_proxies_and_agent(
self: Box<Self>,
req: InitializeRequest,
) -> futures::future::BoxFuture<
'static,
Result<
(
InitializeRequest,
Vec<DynConnectTo<Conductor>>,
DynConnectTo<Client>,
),
agent_client_protocol::Error,
>,
>;
}
/// Instantiator for a chain that consists of an agent and no proxies.
#[derive(Debug)]
pub struct AgentOnly<A>(pub A);

impl<A: ConnectTo<Client> + 'static> InstantiateProxiesAndAgent for AgentOnly<A> {
    /// Returns the request unchanged, an empty proxy list, and the wrapped
    /// agent.
    fn instantiate_proxies_and_agent(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<
            (
                InitializeRequest,
                Vec<DynConnectTo<Conductor>>,
                DynConnectTo<Client>,
            ),
            agent_client_protocol::Error,
        >,
    > {
        Box::pin(async move {
            let AgentOnly(agent) = *self;
            let no_proxies = Vec::new();
            Ok((req, no_proxies, DynConnectTo::new(agent)))
        })
    }
}
/// Instantiator built from a fixed agent and an ordered list of proxies.
#[derive(Debug)]
pub struct ProxiesAndAgent {
    /// Proxies in the order they were added.
    proxies: Vec<DynConnectTo<Conductor>>,
    /// The agent component.
    agent: DynConnectTo<Client>,
}

impl ProxiesAndAgent {
    /// Starts a chain that contains only `agent`.
    pub fn new(agent: impl ConnectTo<Client> + 'static) -> Self {
        let agent = DynConnectTo::new(agent);
        Self {
            proxies: Vec::new(),
            agent,
        }
    }

    /// Appends one proxy to the end of the proxy list.
    #[must_use]
    pub fn proxy(mut self, proxy: impl ConnectTo<Conductor> + 'static) -> Self {
        let erased = DynConnectTo::new(proxy);
        self.proxies.push(erased);
        self
    }

    /// Appends every proxy yielded by `proxies`, preserving iteration order.
    #[must_use]
    pub fn proxies<P, I>(mut self, proxies: I) -> Self
    where
        P: ConnectTo<Conductor> + 'static,
        I: IntoIterator<Item = P>,
    {
        for proxy in proxies {
            self.proxies.push(DynConnectTo::new(proxy));
        }
        self
    }
}
impl InstantiateProxiesAndAgent for ProxiesAndAgent {
fn instantiate_proxies_and_agent(
self: Box<Self>,
req: InitializeRequest,
) -> futures::future::BoxFuture<
'static,
Result<
(
InitializeRequest,
Vec<DynConnectTo<Conductor>>,
DynConnectTo<Client>,
),
agent_client_protocol::Error,
>,
> {
Box::pin(async move { Ok((req, self.proxies, self.agent)) })
}
}
impl<F, Fut> InstantiateProxiesAndAgent for F
where
    F: FnOnce(InitializeRequest) -> Fut + Send + 'static,
    Fut: std::future::Future<
            Output = Result<
                (
                    InitializeRequest,
                    Vec<DynConnectTo<Conductor>>,
                    DynConnectTo<Client>,
                ),
                agent_client_protocol::Error,
            >,
        > + Send
        + 'static,
{
    /// Calls the closure with the request and awaits its result. The closure
    /// is not invoked until the returned future is polled.
    fn instantiate_proxies_and_agent(
        self: Box<Self>,
        req: InitializeRequest,
    ) -> futures::future::BoxFuture<
        'static,
        Result<
            (
                InitializeRequest,
                Vec<DynConnectTo<Conductor>>,
                DynConnectTo<Client>,
            ),
            agent_client_protocol::Error,
        >,
    > {
        Box::pin(async move {
            let instantiate: F = *self;
            instantiate(req).await
        })
    }
}
/// Messages placed on the conductor's internal queue and processed one at a
/// time by `ConductorResponder::handle_conductor_message`.
#[derive(Debug)]
pub enum ConductorMessage {
/// A message travelling toward the agent, addressed to the component at
/// `target_component_index` (a proxy index, or `proxies.len()` for the
/// conductor's successor).
LeftToRight {
target_component_index: usize,
message: Dispatch,
},
/// A message travelling toward the client; it is sent to the predecessor of
/// the component named by `source_component_index`.
RightToLeft {
source_component_index: SourceComponentIndex,
message: Dispatch,
},
/// An MCP bridge connection was received for `acp_url`; the conductor
/// responds by sending an `McpConnectRequest` for that URL.
McpConnectionReceived {
acp_url: String,
actor: McpBridgeConnectionActor,
connection: McpBridgeConnection,
},
/// The `McpConnectRequest` succeeded; the connection is stored under the
/// response's connection id and the actor is spawned.
McpConnectionEstablished {
response: McpConnectResponse,
actor: McpBridgeConnectionActor,
connection: McpBridgeConnection,
},
/// A message for the bridge connection `connection_id`; it is wrapped in
/// an `McpOverAcpMessage` before being sent on.
McpClientToMcpServer {
connection_id: String,
message: Dispatch,
},
/// A bridge connection closed; its entry is removed and the notification
/// is passed on.
McpConnectionDisconnected {
notification: McpDisconnectNotification,
},
}
/// Role-specific behaviour of a conductor.
///
/// Implemented in this file for `Agent` and `Proxy`.
pub trait ConductorHostRole: Role<Counterpart: HasPeer<Client>> {
/// Type that produces the components of the chain for this role.
type Instantiator: Send;
/// Handles the first message seen by the responder: instantiates the
/// chain, installs the successor on `responder`, and returns the dispatch
/// that should be forwarded into the chain.
fn initialize(
&self,
message: Dispatch,
connection: ConnectionTo<Self::Counterpart>,
instantiator: Self::Instantiator,
responder: &mut ConductorResponder<Self>,
) -> impl Future<Output = Result<Dispatch, agent_client_protocol::Error>> + Send;
/// Handles a dispatch arriving on the conductor's outer connection,
/// typically by queueing a `ConductorMessage` on `conductor_tx`.
fn handle_dispatch(
&self,
message: Dispatch,
connection: ConnectionTo<Self::Counterpart>,
conductor_tx: &mut mpsc::Sender<ConductorMessage>,
) -> impl Future<Output = Result<Handled<Dispatch>, agent_client_protocol::Error>> + Send;
}
/// Conductor behaviour when it presents itself as an agent: it owns the real
/// agent component and talks to a client on its outer connection.
impl ConductorHostRole for Agent {
type Instantiator = Box<dyn InstantiateProxiesAndAgent>;
/// Requires the first message to be an `initialize` request, instantiates
/// the proxies and agent from it, spawns the agent connection, installs it
/// as the responder's successor, and spawns the proxies.
///
/// Returns the (possibly modified) initialize request, still paired with
/// the original responder, so it can be forwarded into the chain.
///
/// # Errors
///
/// Anything other than a parseable `initialize` request is answered with an
/// error and also returned as an invalid-request error. Failures from the
/// instantiator or from spawning connections are propagated.
async fn initialize(
&self,
message: Dispatch,
client_connection: ConnectionTo<Client>,
instantiator: Self::Instantiator,
responder: &mut ConductorResponder<Self>,
) -> Result<Dispatch, agent_client_protocol::Error> {
let invalid_request = || Error::invalid_request().data("expected `initialize` request");
// Only a request can start the session.
let Dispatch::Request(request, init_responder) = message else {
message.respond_with_error(invalid_request(), client_connection.clone())?;
return Err(invalid_request());
};
if !InitializeRequest::matches_method(request.method()) {
init_responder.respond_with_error(invalid_request())?;
return Err(invalid_request());
}
let init_request =
match InitializeRequest::parse_message(request.method(), request.params()) {
Ok(r) => r,
Err(error) => {
// The caller receives the parse error; this function reports
// the generic invalid-request error.
init_responder.respond_with_error(error)?;
return Err(invalid_request());
}
};
let (modified_req, proxy_components, agent_component) = instantiator
.instantiate_proxies_and_agent(init_request)
.await?;
debug!(?agent_component, "spawning agent");
// Everything the agent sends back is queued as coming from the successor.
let connection_to_agent = client_connection.spawn_connection(
Client
.builder()
.name("conductor-to-agent")
.on_receive_dispatch(
{
let mut conductor_tx = responder.conductor_tx.clone();
async move |dispatch: Dispatch, _cx| {
conductor_tx
.send(ConductorMessage::RightToLeft {
source_component_index: SourceComponentIndex::Successor,
message: dispatch,
})
.await
.map_err(agent_client_protocol::util::internal_error)
}
},
agent_client_protocol::on_receive_dispatch!(),
),
agent_component,
)?;
responder.successor = Arc::new(connection_to_agent);
responder.spawn_proxies(client_connection.clone(), proxy_components)?;
Ok(Dispatch::Request(
modified_req.to_untyped_message()?,
init_responder,
))
}
/// Queues messages from the client as `LeftToRight` messages for component
/// 0; `done()` produces the result for anything that did not match.
async fn handle_dispatch(
&self,
message: Dispatch,
client_connection: ConnectionTo<Client>,
conductor_tx: &mut mpsc::Sender<ConductorMessage>,
) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
tracing::debug!(
method = ?message.method(),
"ConductorToClient::handle_dispatch"
);
MatchDispatchFrom::new(message, &client_connection)
.if_message_from(Client, async move |message: Dispatch| {
tracing::debug!(
method = ?message.method(),
"ConductorToClient::handle_dispatch - matched Client"
);
ConductorImpl::<Self>::incoming_message_from_client(conductor_tx, message).await
})
.await
.done()
}
}
/// Conductor behaviour when it presents itself as a proxy: its outer
/// connection is to another conductor, and messages past the last proxy are
/// proxied onward to the `Agent` peer of that connection.
impl ConductorHostRole for Proxy {
type Instantiator = Box<dyn InstantiateProxies>;
/// Requires the first message to be an `InitializeProxyRequest`,
/// instantiates the proxies from the inner initialize request, installs
/// `GrandSuccessor` as the responder's successor, and spawns the proxies.
///
/// Returns the (possibly modified) initialize request, still paired with
/// the original responder, so it can be forwarded into the chain.
///
/// # Errors
///
/// Anything other than a parseable `InitializeProxyRequest` is answered
/// with an error and also returned as an invalid-request error. Failures
/// from the instantiator or from spawning connections are propagated.
async fn initialize(
&self,
message: Dispatch,
client_connection: ConnectionTo<Conductor>,
instantiator: Self::Instantiator,
responder: &mut ConductorResponder<Self>,
) -> Result<Dispatch, agent_client_protocol::Error> {
let invalid_request = || Error::invalid_request().data("expected `initialize` request");
// Only a request can start the session.
let Dispatch::Request(request, init_responder) = message else {
message.respond_with_error(invalid_request(), client_connection.clone())?;
return Err(invalid_request());
};
if !InitializeProxyRequest::matches_method(request.method()) {
init_responder.respond_with_error(invalid_request())?;
return Err(invalid_request());
}
let InitializeProxyRequest { initialize } =
match InitializeProxyRequest::parse_message(request.method(), request.params()) {
Ok(r) => r,
Err(error) => {
// The caller receives the parse error; this function reports
// the generic invalid-request error.
init_responder.respond_with_error(error)?;
return Err(invalid_request());
}
};
tracing::debug!("ensure_initialized: InitializeProxyRequest (proxy mode)");
let (modified_req, proxy_components) = instantiator.instantiate_proxies(initialize).await?;
responder.successor = Arc::new(GrandSuccessor);
responder.spawn_proxies(client_connection.clone(), proxy_components)?;
Ok(Dispatch::Request(
modified_req.to_untyped_message()?,
init_responder,
))
}
/// Queues messages from the `Agent` peer as `RightToLeft` messages from the
/// successor and messages from the `Client` peer as `LeftToRight` messages
/// for component 0; `done()` produces the result for anything that did not
/// match.
async fn handle_dispatch(
&self,
message: Dispatch,
client_connection: ConnectionTo<Conductor>,
conductor_tx: &mut mpsc::Sender<ConductorMessage>,
) -> Result<Handled<Dispatch>, agent_client_protocol::Error> {
tracing::debug!(
method = ?message.method(),
?message,
"ConductorToConductor::handle_dispatch"
);
MatchDispatchFrom::new(message, &client_connection)
.if_message_from(Agent, {
async |message: Dispatch| {
tracing::debug!(
method = ?message.method(),
"ConductorToConductor::handle_dispatch - matched Agent"
);
// Each branch sends through its own clone of the sender.
let mut conductor_tx = conductor_tx.clone();
ConductorImpl::<Self>::incoming_message_from_agent(&mut conductor_tx, message)
.await
}
})
.await
.if_message_from(Client, async |message: Dispatch| {
tracing::debug!(
method = ?message.method(),
"ConductorToConductor::handle_dispatch - matched Client"
);
let mut conductor_tx = conductor_tx.clone();
ConductorImpl::<Self>::incoming_message_from_client(&mut conductor_tx, message)
.await
})
.await
.done()
}
}
/// Destination for messages addressed past the last proxy in the chain.
///
/// Implemented in this file by an error placeholder (used before
/// initialization), by `GrandSuccessor` for the `Proxy` role, and by
/// `ConnectionTo<Agent>` for the `Agent` role.
pub trait ConductorSuccessor<Host: ConductorHostRole>: Send + Sync + 'static {
/// Sends `message` to the successor. The returned future may borrow
/// `responder` for `'a`.
fn send_message<'a>(
&self,
message: Dispatch,
connection_to_conductor: ConnectionTo<Host::Counterpart>,
responder: &'a mut ConductorResponder<Host>,
) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>>;
}
impl<Host: ConductorHostRole> ConductorSuccessor<Host> for agent_client_protocol::Error {
    /// Placeholder successor: every send fails immediately with a clone of
    /// this error and the arguments are ignored.
    fn send_message<'a>(
        &self,
        _message: Dispatch,
        _connection_to_conductor: ConnectionTo<Host::Counterpart>,
        _responder: &'a mut ConductorResponder<Host>,
    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
        let failure = Err(self.clone());
        Box::pin(std::future::ready(failure))
    }
}
/// Successor used in the `Proxy` role: messages are proxied to the `Agent`
/// peer of the conductor's own outer connection.
struct GrandSuccessor;

impl ConductorSuccessor<Proxy> for GrandSuccessor {
    /// Proxies `message` onward over `connection`; the responder is not used.
    fn send_message<'a>(
        &self,
        message: Dispatch,
        connection: ConnectionTo<Conductor>,
        _responder: &'a mut ConductorResponder<Proxy>,
    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
        let forward = async move {
            debug!("Proxy mode: forwarding successor message to conductor's successor");
            connection.send_proxied_message_to(Agent, message)
        };
        Box::pin(forward)
    }
}
impl ConductorSuccessor<Agent> for ConnectionTo<Agent> {
    /// Successor used in the `Agent` role: hands the message to
    /// `ConductorResponder::forward_message_to_agent` along with a clone of
    /// this agent connection.
    fn send_message<'a>(
        &self,
        message: Dispatch,
        connection: ConnectionTo<Client>,
        responder: &'a mut ConductorResponder<Agent>,
    ) -> BoxFuture<'a, Result<(), agent_client_protocol::Error>> {
        // Cloned up front so the future does not borrow `self`.
        let agent = self.clone();
        let forward = async move {
            debug!("Proxy mode: forwarding successor message to conductor's successor");
            responder
                .forward_message_to_agent(connection, message, agent)
                .await
        };
        Box::pin(forward)
    }
}