use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo};
use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent};
use agent_client_protocol_test::arrow_proxy::run_arrow_proxy;
use agent_client_protocol_test::test_binaries::{arrow_proxy_example, conductor_binary, testy};
use agent_client_protocol_test::testy::{Testy, TestyCommand};
use agent_client_protocol_tokio::AcpAgent;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
struct MockArrowProxy;
impl ConnectTo<Conductor> for MockArrowProxy {
async fn connect_to(
self,
client: impl ConnectTo<agent_client_protocol::Proxy>,
) -> Result<(), agent_client_protocol::Error> {
run_arrow_proxy(client).await
}
}
struct MockInnerConductor {
num_arrow_proxies: usize,
}
impl MockInnerConductor {
fn new(num_arrow_proxies: usize) -> Self {
Self { num_arrow_proxies }
}
}
impl ConnectTo<Conductor> for MockInnerConductor {
async fn connect_to(
self,
client: impl ConnectTo<agent_client_protocol::Proxy>,
) -> Result<(), agent_client_protocol::Error> {
let mut components: Vec<DynConnectTo<Conductor>> = Vec::new();
for _ in 0..self.num_arrow_proxies {
components.push(DynConnectTo::new(MockArrowProxy));
}
ConnectTo::<Conductor>::connect_to(
agent_client_protocol_conductor::ConductorImpl::new_proxy(
"inner-conductor".to_string(),
components,
McpBridgeMode::default(),
),
client,
)
.await
}
}
#[tokio::test]
async fn test_nested_conductor_with_arrow_proxies() -> Result<(), agent_client_protocol::Error> {
let (editor_write, conductor_read) = duplex(8192);
let (conductor_write, editor_read) = duplex(8192);
let conductor_handle = tokio::spawn(async move {
ConductorImpl::new_agent(
"outer-conductor".to_string(),
ProxiesAndAgent::new(Testy::new()).proxy(MockInnerConductor::new(2)),
McpBridgeMode::default(),
)
.run(agent_client_protocol::ByteStreams::new(
conductor_write.compat_write(),
conductor_read.compat(),
))
.await
});
let result = tokio::time::timeout(std::time::Duration::from_secs(30), async move {
let result = yopo::prompt(
agent_client_protocol::ByteStreams::new(
editor_write.compat_write(),
editor_read.compat(),
),
TestyCommand::Greet.to_prompt(),
)
.await?;
tracing::debug!(?result, "Received response from nested conductor chain");
expect_test::expect![[r#"
">>Hello, world!"
"#]]
.assert_debug_eq(&result);
Ok::<String, agent_client_protocol::Error>(result)
})
.await
.expect("Test timed out")
.expect("Editor failed");
tracing::info!(
?result,
"Test completed successfully with double-arrow-prefixed response from nested conductor"
);
conductor_handle.abort();
Ok(())
}
#[tokio::test]
async fn test_nested_conductor_with_external_arrow_proxies()
-> Result<(), agent_client_protocol::Error> {
let conductor_path = conductor_binary().to_string_lossy().to_string();
let arrow_proxy_path = arrow_proxy_example().to_string_lossy().to_string();
let inner_conductor = AcpAgent::from_args([
&conductor_path,
"proxy",
&arrow_proxy_path,
&arrow_proxy_path,
])?;
let agent = testy();
let (editor_write, conductor_read) = duplex(8192);
let (conductor_write, editor_read) = duplex(8192);
let conductor_handle = tokio::spawn(async move {
ConductorImpl::new_agent(
"outer-conductor".to_string(),
ProxiesAndAgent::new(agent).proxy(inner_conductor),
McpBridgeMode::default(),
)
.run(agent_client_protocol::ByteStreams::new(
conductor_write.compat_write(),
conductor_read.compat(),
))
.await
});
let result = tokio::time::timeout(std::time::Duration::from_secs(30), async move {
let result = yopo::prompt(
agent_client_protocol::ByteStreams::new(
editor_write.compat_write(),
editor_read.compat(),
),
TestyCommand::Greet.to_prompt(),
)
.await?;
tracing::debug!(?result, "Received response from nested conductor chain");
expect_test::expect![[r#"
">>Hello, world!"
"#]]
.assert_debug_eq(&result);
Ok::<String, agent_client_protocol::Error>(result)
})
.await
.expect("Test timed out")
.expect("Editor failed");
tracing::info!(
?result,
"Test completed successfully with double-arrow-prefixed response from nested conductor"
);
conductor_handle.abort();
Ok(())
}