1use std::path::PathBuf;
78use std::str::FromStr;
79
80mod conductor;
82mod debug_logger;
84mod mcp_bridge;
86pub mod trace;
88
89pub use self::conductor::*;
90
91use clap::{Parser, Subcommand};
92
93use sacp::link::{AgentToClient, ProxyToConductor};
94use sacp::schema::InitializeRequest;
95use sacp_tokio::{AcpAgent, Stdio};
96use tracing::Instrument;
97use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
98
99pub struct CommandLineComponents(pub Vec<AcpAgent>);
106
107impl InstantiateProxies for CommandLineComponents {
108 fn instantiate_proxies(
109 self: Box<Self>,
110 req: InitializeRequest,
111 ) -> futures::future::BoxFuture<
112 'static,
113 Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
114 > {
115 Box::pin(async move {
116 let proxies = self
117 .0
118 .into_iter()
119 .map(|c| sacp::DynComponent::new(c))
120 .collect();
121 Ok((req, proxies))
122 })
123 }
124}
125
126impl InstantiateProxiesAndAgent for CommandLineComponents {
127 fn instantiate_proxies_and_agent(
128 self: Box<Self>,
129 req: InitializeRequest,
130 ) -> futures::future::BoxFuture<
131 'static,
132 Result<
133 (
134 InitializeRequest,
135 Vec<sacp::DynComponent<ProxyToConductor>>,
136 sacp::DynComponent<AgentToClient>,
137 ),
138 sacp::Error,
139 >,
140 > {
141 Box::pin(async move {
142 let mut iter = self.0.into_iter().peekable();
143 let mut proxies: Vec<sacp::DynComponent<ProxyToConductor>> = Vec::new();
144
145 while let Some(component) = iter.next() {
147 if iter.peek().is_some() {
148 proxies.push(sacp::DynComponent::new(component));
149 } else {
150 let agent = sacp::DynComponent::new(component);
152 return Ok((req, proxies, agent));
153 }
154 }
155
156 Err(sacp::util::internal_error("no agent component in list"))
157 })
158 }
159}
160
161struct TraceHandleWriter(sacp_trace_viewer::TraceHandle);
163
164impl trace::WriteEvent for TraceHandleWriter {
165 fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
166 let value = serde_json::to_value(event)
167 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
168 self.0.push(value);
169 Ok(())
170 }
171}
172
/// Selects how the MCP bridge is run.
///
/// The default is [`McpBridgeMode::Http`]; `Default` is derived via the
/// `#[default]` attribute rather than a hand-written impl.
#[derive(Debug, Clone, Default)]
pub enum McpBridgeMode {
    /// Stdio-based bridging.
    Stdio {
        /// Command line (program followed by its arguments).
        // NOTE(review): presumably the command used to launch the conductor
        // as a stdio bridge process; confirm against the conductor module.
        conductor_command: Vec<String>,
    },

    /// HTTP-based bridging. This is the default mode.
    #[default]
    Http,
}
192
193#[derive(Parser, Debug)]
194#[command(author, version, about, long_about = None)]
195pub struct ConductorArgs {
196 #[arg(long)]
198 pub debug: bool,
199
200 #[arg(long)]
202 pub debug_dir: Option<PathBuf>,
203
204 #[arg(long)]
207 pub log: Option<String>,
208
209 #[arg(long)]
212 pub trace: Option<PathBuf>,
213
214 #[arg(long)]
217 pub serve: bool,
218
219 #[command(subcommand)]
220 pub command: ConductorCommand,
221}
222
223#[derive(Subcommand, Debug)]
224pub enum ConductorCommand {
225 Agent {
227 #[arg(short, long, default_value = "conductor")]
229 name: String,
230
231 components: Vec<String>,
233 },
234
235 Proxy {
237 #[arg(short, long, default_value = "conductor")]
239 name: String,
240
241 proxies: Vec<String>,
243 },
244
245 Mcp {
247 port: u16,
249 },
250}
251
252impl ConductorArgs {
253 pub async fn main(self) -> anyhow::Result<()> {
255 let pid = std::process::id();
256 let cwd = std::env::current_dir()
257 .map(|p| p.display().to_string())
258 .unwrap_or_else(|_| "<unknown>".to_string());
259
260 let debug_logger = if self.debug {
262 let components = match &self.command {
264 ConductorCommand::Agent { components, .. } => components.clone(),
265 ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
266 ConductorCommand::Mcp { .. } => Vec::new(),
267 };
268
269 Some(
271 debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
272 .await
273 .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {}", e))?,
274 )
275 } else {
276 None
277 };
278
279 if let Some(debug_logger) = &debug_logger {
280 let log_level = self.log.as_deref().unwrap_or("info");
282
283 let tracing_writer = debug_logger.create_tracing_writer();
285 tracing_subscriber::registry()
286 .with(EnvFilter::new(log_level))
287 .with(
288 tracing_subscriber::fmt::layer()
289 .with_target(true)
290 .with_writer(move || tracing_writer.clone()),
291 )
292 .init();
293
294 tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
295 };
296
297 let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
299 (Some(trace_path), false) => {
301 let writer = trace::TraceWriter::from_path(trace_path)
302 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
303 (Some(writer), None)
304 }
305 (None, true) => {
307 let (handle, server) = sacp_trace_viewer::serve_memory(
308 sacp_trace_viewer::TraceViewerConfig::default(),
309 )
310 .await?;
311 let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
312 (Some(writer), Some(tokio::spawn(server)))
313 }
314 (Some(trace_path), true) => {
316 let writer = trace::TraceWriter::from_path(trace_path)
317 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
318 let server = sacp_trace_viewer::serve_file(
319 trace_path.clone(),
320 sacp_trace_viewer::TraceViewerConfig::default(),
321 );
322 (Some(writer), Some(tokio::spawn(server)))
323 }
324 (None, false) => (None, None),
326 };
327
328 self.run(debug_logger.as_ref(), trace_writer)
329 .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
330 .await
331 .map_err(|err| anyhow::anyhow!("{err}"))
332 }
333
334 async fn run(
335 self,
336 debug_logger: Option<&debug_logger::DebugLogger>,
337 trace_writer: Option<trace::TraceWriter>,
338 ) -> Result<(), sacp::Error> {
339 match self.command {
340 ConductorCommand::Agent { name, components } => {
341 initialize_conductor(
342 debug_logger,
343 trace_writer,
344 name,
345 components,
346 |name, providers, mcp_mode| Conductor::new_agent(name, providers, mcp_mode),
347 )
348 .await
349 }
350 ConductorCommand::Proxy { name, proxies } => {
351 initialize_conductor(
352 debug_logger,
353 trace_writer,
354 name,
355 proxies,
356 |name, providers, mcp_mode| Conductor::new_proxy(name, providers, mcp_mode),
357 )
358 .await
359 }
360 ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
361 }
362 }
363}
364
365async fn initialize_conductor<Link: ConductorLink>(
366 debug_logger: Option<&debug_logger::DebugLogger>,
367 trace_writer: Option<trace::TraceWriter>,
368 name: String,
369 components: Vec<String>,
370 new_conductor: impl FnOnce(String, CommandLineComponents, crate::McpBridgeMode) -> Conductor<Link>,
371) -> Result<(), sacp::Error> {
372 let providers: Vec<AcpAgent> = components
374 .into_iter()
375 .enumerate()
376 .map(|(i, s)| {
377 let mut agent = AcpAgent::from_str(&s)?;
378 if let Some(logger) = debug_logger {
379 agent = agent.with_debug(logger.create_callback(i.to_string()));
380 }
381 Ok(agent)
382 })
383 .collect::<Result<Vec<_>, sacp::Error>>()?;
384
385 let stdio = if let Some(logger) = debug_logger {
387 Stdio::new().with_debug(logger.create_callback("C".to_string()))
388 } else {
389 Stdio::new()
390 };
391
392 let mut conductor = new_conductor(name, CommandLineComponents(providers), Default::default());
394 if let Some(writer) = trace_writer {
395 conductor = conductor.with_trace_writer(writer);
396 }
397
398 conductor
399 .into_connection_builder()
400 .connect_to(stdio)?
401 .serve()
402 .await
403}