use std::path::PathBuf;
use std::str::FromStr;
mod conductor;
mod debug_logger;
mod mcp_bridge;
mod snoop;
pub mod trace;
pub use self::conductor::*;
use clap::{Parser, Subcommand};
use agent_client_protocol::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
use agent_client_protocol_tokio::{AcpAgent, Stdio};
use tracing::Instrument;
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
/// Ordered list of ACP components supplied on the command line.
///
/// This type implements both `InstantiateProxies` (every entry is turned into a
/// proxy) and `InstantiateProxiesAndAgent` (the final entry is turned into the
/// agent and all preceding entries into proxies).
#[derive(Debug)]
pub struct CommandLineComponents(pub Vec<AcpAgent>);
impl InstantiateProxies for CommandLineComponents {
fn instantiate_proxies(
self: Box<Self>,
req: InitializeRequest,
) -> futures::future::BoxFuture<
'static,
Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
> {
Box::pin(async move {
let proxies = self.0.into_iter().map(DynConnectTo::new).collect();
Ok((req, proxies))
})
}
}
impl InstantiateProxiesAndAgent for CommandLineComponents {
fn instantiate_proxies_and_agent(
self: Box<Self>,
req: InitializeRequest,
) -> futures::future::BoxFuture<
'static,
Result<
(
InitializeRequest,
Vec<DynConnectTo<Conductor>>,
DynConnectTo<Client>,
),
agent_client_protocol::Error,
>,
> {
Box::pin(async move {
let mut iter = self.0.into_iter().peekable();
let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
while let Some(component) = iter.next() {
if iter.peek().is_some() {
proxies.push(DynConnectTo::new(component));
} else {
let agent = DynConnectTo::new(component);
return Ok((req, proxies, agent));
}
}
Err(agent_client_protocol::util::internal_error(
"no agent component in list",
))
})
}
}
/// Adapter wrapping a trace-viewer `TraceHandle` so it can be used as a
/// `trace::WriteEvent` sink (see the trait impl that follows).
struct TraceHandleWriter(agent_client_protocol_trace_viewer::TraceHandle);
impl trace::WriteEvent for TraceHandleWriter {
    /// Serializes `event` to a JSON value and pushes it onto the wrapped handle.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` wrapping the serde error if the event cannot
    /// be converted to JSON; nothing is pushed in that case.
    fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
        match serde_json::to_value(event) {
            Ok(json) => {
                self.0.push(json);
                Ok(())
            }
            Err(err) => Err(std::io::Error::other(err)),
        }
    }
}
/// Selects the mechanism the conductor uses for MCP bridging.
///
/// The default is [`McpBridgeMode::Http`].
// NOTE(review): the precise transport behavior of each mode is defined where the
// mode is consumed, not in this file — confirm before relying on the variant notes.
#[derive(Debug, Clone, Default)]
pub enum McpBridgeMode {
    /// Stdio-based bridging.
    Stdio {
        /// Command line associated with the conductor for this mode.
        // NOTE(review): presumably the argv used to re-invoke the conductor
        // (cf. the `mcp` subcommand) — TODO confirm against the consumer.
        conductor_command: Vec<String>,
    },
    /// HTTP-based bridging (the default mode).
    #[default]
    Http,
}
/// Command-line arguments for the conductor.
///
/// The global flags configure debug logging and trace capture; the subcommand
/// selects what the conductor process actually runs. The `///` comments on the
/// fields double as the `--help` text that clap shows for each flag.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct ConductorArgs {
    /// Enable debug logging: create a debug logger and route tracing output to it
    #[arg(long)]
    pub debug: bool,
    /// Directory handed to the debug logger for its output; only consulted with `--debug`
    #[arg(long)]
    pub debug_dir: Option<PathBuf>,
    /// Tracing filter directives, e.g. `info` or `debug`; defaults to `info` when `--debug` is set
    #[arg(long)]
    pub log: Option<String>,
    /// Write trace events to this file
    #[arg(long)]
    pub trace: Option<PathBuf>,
    /// Serve the trace viewer: from the `--trace` file if given, otherwise from memory
    #[arg(long)]
    pub serve: bool,
    // The subcommand selecting what to run (see `ConductorCommand`). Kept as a
    // plain comment so the derive output for this field is unchanged.
    #[command(subcommand)]
    pub command: ConductorCommand,
}
/// Subcommands selecting what the conductor process does.
///
/// The `///` comments on variants and fields double as the `--help` text that
/// clap shows for each subcommand and argument.
#[derive(Subcommand, Debug)]
pub enum ConductorCommand {
    /// Run the conductor as an agent assembled from the given components
    Agent {
        /// Name given to the conductor
        #[arg(short, long, default_value = "conductor")]
        name: String,
        /// Components to run, in order; each value is parsed into an ACP agent specification
        components: Vec<String>,
    },
    /// Run the conductor as a proxy assembled from the given proxy components
    Proxy {
        /// Name given to the conductor
        #[arg(short, long, default_value = "conductor")]
        name: String,
        /// Proxies to run, in order; each value is parsed into an ACP agent specification
        proxies: Vec<String>,
    },
    /// Run the MCP bridge for the given port
    Mcp {
        /// Port number handed to the MCP bridge
        port: u16,
    },
}
impl ConductorArgs {
    /// Entry point for the conductor CLI.
    ///
    /// Sets up logging, sets up trace capture / the trace viewer as requested by
    /// `--trace` and `--serve`, and then runs the selected subcommand inside a
    /// `conductor` tracing span carrying the process id and working directory.
    ///
    /// Logging is configured as follows:
    /// * with `--debug`, tracing output goes to the debug logger, filtered by
    ///   `--log` (default `info`);
    /// * with only `--log`, tracing output goes to stderr with that filter;
    /// * with neither, no tracing subscriber is installed.
    ///
    /// # Errors
    ///
    /// Returns an error if the debug logger or trace writer cannot be created,
    /// if the in-memory trace viewer cannot be started, or if the subcommand
    /// itself fails.
    ///
    /// # Panics
    ///
    /// Installing the subscriber uses `SubscriberInitExt::init`, which panics if
    /// a global tracing subscriber has already been set.
    pub async fn main(self) -> anyhow::Result<()> {
        let pid = std::process::id();
        // Best effort only: an unreadable working directory must not abort startup.
        let cwd = std::env::current_dir()
            .map_or_else(|_| "<unknown>".to_string(), |p| p.display().to_string());

        let debug_logger = if self.debug {
            // The debug logger is told which components this invocation involves.
            let components = match &self.command {
                ConductorCommand::Agent { components, .. } => components.clone(),
                ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
                ConductorCommand::Mcp { .. } => Vec::new(),
            };
            Some(
                debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {e}"))?,
            )
        } else {
            None
        };

        if let Some(debug_logger) = &debug_logger {
            // `--debug`: tracing output is written through the debug logger.
            let log_level = self.log.as_deref().unwrap_or("info");
            let tracing_writer = debug_logger.create_tracing_writer();
            tracing_subscriber::registry()
                .with(EnvFilter::new(log_level))
                .with(
                    tracing_subscriber::fmt::layer()
                        .with_target(true)
                        .with_writer(move || tracing_writer.clone()),
                )
                .init();
            tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
        } else if let Some(log_level) = self.log.as_deref() {
            // `--log` without `--debug` used to be silently ignored. Honor it by
            // logging to stderr; stdout is deliberately avoided because the
            // conductor is driven over `Stdio` (presumably the process's
            // stdin/stdout), which log lines must not corrupt.
            tracing_subscriber::registry()
                .with(EnvFilter::new(log_level))
                .with(
                    tracing_subscriber::fmt::layer()
                        .with_target(true)
                        .with_writer(std::io::stderr),
                )
                .init();
            tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting");
        }

        // `_viewer_server` keeps the spawned viewer task's join handle alive for
        // the duration of the run; it is never awaited.
        let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
            // `--trace` only: write events to the file.
            (Some(trace_path), false) => {
                let writer = trace::TraceWriter::from_path(trace_path)
                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
                (Some(writer), None)
            }
            // `--serve` only: keep events in memory and serve them from there.
            (None, true) => {
                let (handle, server) = agent_client_protocol_trace_viewer::serve_memory(
                    agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
                )?;
                let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
                (Some(writer), Some(tokio::spawn(server)))
            }
            // Both: write events to the file and serve the viewer from that file.
            (Some(trace_path), true) => {
                let writer = trace::TraceWriter::from_path(trace_path)
                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
                let server = agent_client_protocol_trace_viewer::serve_file(
                    trace_path.clone(),
                    agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
                );
                (Some(writer), Some(tokio::spawn(server)))
            }
            // Neither: tracing of events is disabled.
            (None, false) => (None, None),
        };

        self.run(debug_logger.as_ref(), trace_writer)
            .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
            .await
            .map_err(|err| anyhow::anyhow!("{err}"))
    }

    /// Dispatches to the selected subcommand.
    ///
    /// `Agent` and `Proxy` both go through `initialize_conductor`, differing only
    /// in the conductor constructor used; `Mcp` runs the MCP bridge with the
    /// given port. The debug logger and trace writer are only used by the
    /// `Agent` and `Proxy` paths.
    async fn run(
        self,
        debug_logger: Option<&debug_logger::DebugLogger>,
        trace_writer: Option<trace::TraceWriter>,
    ) -> Result<(), agent_client_protocol::Error> {
        match self.command {
            ConductorCommand::Agent { name, components } => {
                initialize_conductor(
                    debug_logger,
                    trace_writer,
                    name,
                    components,
                    ConductorImpl::new_agent,
                )
                .await
            }
            ConductorCommand::Proxy { name, proxies } => {
                initialize_conductor(
                    debug_logger,
                    trace_writer,
                    name,
                    proxies,
                    ConductorImpl::new_proxy,
                )
                .await
            }
            ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
        }
    }
}
/// Builds a conductor from command-line component strings and runs it over stdio.
///
/// Each entry of `components` is parsed into an `AcpAgent`. When a debug logger
/// is supplied, every component gets a debug callback labelled with its
/// zero-based index, and the stdio transport gets one labelled `"C"`. The
/// conductor is created through `new_conductor` with the default
/// `McpBridgeMode`, optionally given the trace writer, and then run.
///
/// # Errors
///
/// Fails on the first component string that cannot be parsed, or with whatever
/// error running the conductor produces.
async fn initialize_conductor<Host: ConductorHostRole>(
    debug_logger: Option<&debug_logger::DebugLogger>,
    trace_writer: Option<trace::TraceWriter>,
    name: String,
    components: Vec<String>,
    new_conductor: impl FnOnce(
        String,
        CommandLineComponents,
        crate::McpBridgeMode,
    ) -> ConductorImpl<Host>,
) -> Result<(), agent_client_protocol::Error> {
    // Parse the components one by one, stopping at the first failure.
    let mut providers: Vec<AcpAgent> = Vec::with_capacity(components.len());
    for (index, spec) in components.into_iter().enumerate() {
        let parsed = AcpAgent::from_str(&spec)?;
        let provider = match debug_logger {
            Some(logger) => parsed.with_debug(logger.create_callback(index.to_string())),
            None => parsed,
        };
        providers.push(provider);
    }

    // The conductor's own stdio side is labelled "C" in the debug log.
    let stdio = match debug_logger {
        Some(logger) => Stdio::new().with_debug(logger.create_callback("C".to_string())),
        None => Stdio::new(),
    };

    let base = new_conductor(
        name,
        CommandLineComponents(providers),
        McpBridgeMode::default(),
    );
    let conductor = match trace_writer {
        Some(writer) => base.with_trace_writer(writer),
        None => base,
    };

    conductor.run(stdio).await
}