1use std::path::PathBuf;
78use std::str::FromStr;
79
80mod conductor;
82mod debug_logger;
84mod mcp_bridge;
86mod snoop;
87pub mod trace;
89
90pub use self::conductor::*;
91
92use clap::{Parser, Subcommand};
93
94use sacp::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
95use sacp_tokio::{AcpAgent, Stdio};
96use tracing::Instrument;
97use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
98
99pub struct CommandLineComponents(pub Vec<AcpAgent>);
106
107impl InstantiateProxies for CommandLineComponents {
108 fn instantiate_proxies(
109 self: Box<Self>,
110 req: InitializeRequest,
111 ) -> futures::future::BoxFuture<
112 'static,
113 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), sacp::Error>,
114 > {
115 Box::pin(async move {
116 let proxies = self.0.into_iter().map(|c| DynConnectTo::new(c)).collect();
117 Ok((req, proxies))
118 })
119 }
120}
121
122impl InstantiateProxiesAndAgent for CommandLineComponents {
123 fn instantiate_proxies_and_agent(
124 self: Box<Self>,
125 req: InitializeRequest,
126 ) -> futures::future::BoxFuture<
127 'static,
128 Result<
129 (
130 InitializeRequest,
131 Vec<DynConnectTo<Conductor>>,
132 DynConnectTo<Client>,
133 ),
134 sacp::Error,
135 >,
136 > {
137 Box::pin(async move {
138 let mut iter = self.0.into_iter().peekable();
139 let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
140
141 while let Some(component) = iter.next() {
143 if iter.peek().is_some() {
144 proxies.push(DynConnectTo::new(component));
145 } else {
146 let agent = DynConnectTo::new(component);
148 return Ok((req, proxies, agent));
149 }
150 }
151
152 Err(sacp::util::internal_error("no agent component in list"))
153 })
154 }
155}
156
157struct TraceHandleWriter(sacp_trace_viewer::TraceHandle);
159
160impl trace::WriteEvent for TraceHandleWriter {
161 fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
162 let value = serde_json::to_value(event)
163 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
164 self.0.push(value);
165 Ok(())
166 }
167}
168
169#[derive(Debug, Clone)]
171pub enum McpBridgeMode {
172 Stdio {
174 conductor_command: Vec<String>,
177 },
178
179 Http,
181}
182
183impl Default for McpBridgeMode {
184 fn default() -> Self {
185 McpBridgeMode::Http
186 }
187}
188
189#[derive(Parser, Debug)]
190#[command(author, version, about, long_about = None)]
191pub struct ConductorArgs {
192 #[arg(long)]
194 pub debug: bool,
195
196 #[arg(long)]
198 pub debug_dir: Option<PathBuf>,
199
200 #[arg(long)]
203 pub log: Option<String>,
204
205 #[arg(long)]
208 pub trace: Option<PathBuf>,
209
210 #[arg(long)]
213 pub serve: bool,
214
215 #[command(subcommand)]
216 pub command: ConductorCommand,
217}
218
219#[derive(Subcommand, Debug)]
220pub enum ConductorCommand {
221 Agent {
223 #[arg(short, long, default_value = "conductor")]
225 name: String,
226
227 components: Vec<String>,
229 },
230
231 Proxy {
233 #[arg(short, long, default_value = "conductor")]
235 name: String,
236
237 proxies: Vec<String>,
239 },
240
241 Mcp {
243 port: u16,
245 },
246}
247
248impl ConductorArgs {
249 pub async fn main(self) -> anyhow::Result<()> {
251 let pid = std::process::id();
252 let cwd = std::env::current_dir()
253 .map(|p| p.display().to_string())
254 .unwrap_or_else(|_| "<unknown>".to_string());
255
256 let debug_logger = if self.debug {
258 let components = match &self.command {
260 ConductorCommand::Agent { components, .. } => components.clone(),
261 ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
262 ConductorCommand::Mcp { .. } => Vec::new(),
263 };
264
265 Some(
267 debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
268 .await
269 .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {}", e))?,
270 )
271 } else {
272 None
273 };
274
275 if let Some(debug_logger) = &debug_logger {
276 let log_level = self.log.as_deref().unwrap_or("info");
278
279 let tracing_writer = debug_logger.create_tracing_writer();
281 tracing_subscriber::registry()
282 .with(EnvFilter::new(log_level))
283 .with(
284 tracing_subscriber::fmt::layer()
285 .with_target(true)
286 .with_writer(move || tracing_writer.clone()),
287 )
288 .init();
289
290 tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
291 };
292
293 let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
295 (Some(trace_path), false) => {
297 let writer = trace::TraceWriter::from_path(trace_path)
298 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
299 (Some(writer), None)
300 }
301 (None, true) => {
303 let (handle, server) = sacp_trace_viewer::serve_memory(
304 sacp_trace_viewer::TraceViewerConfig::default(),
305 )
306 .await?;
307 let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
308 (Some(writer), Some(tokio::spawn(server)))
309 }
310 (Some(trace_path), true) => {
312 let writer = trace::TraceWriter::from_path(trace_path)
313 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
314 let server = sacp_trace_viewer::serve_file(
315 trace_path.clone(),
316 sacp_trace_viewer::TraceViewerConfig::default(),
317 );
318 (Some(writer), Some(tokio::spawn(server)))
319 }
320 (None, false) => (None, None),
322 };
323
324 self.run(debug_logger.as_ref(), trace_writer)
325 .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
326 .await
327 .map_err(|err| anyhow::anyhow!("{err}"))
328 }
329
330 async fn run(
331 self,
332 debug_logger: Option<&debug_logger::DebugLogger>,
333 trace_writer: Option<trace::TraceWriter>,
334 ) -> Result<(), sacp::Error> {
335 match self.command {
336 ConductorCommand::Agent { name, components } => {
337 initialize_conductor(
338 debug_logger,
339 trace_writer,
340 name,
341 components,
342 |name, providers, mcp_mode| ConductorImpl::new_agent(name, providers, mcp_mode),
343 )
344 .await
345 }
346 ConductorCommand::Proxy { name, proxies } => {
347 initialize_conductor(
348 debug_logger,
349 trace_writer,
350 name,
351 proxies,
352 |name, providers, mcp_mode| ConductorImpl::new_proxy(name, providers, mcp_mode),
353 )
354 .await
355 }
356 ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
357 }
358 }
359}
360
361async fn initialize_conductor<Host: ConductorHostRole>(
362 debug_logger: Option<&debug_logger::DebugLogger>,
363 trace_writer: Option<trace::TraceWriter>,
364 name: String,
365 components: Vec<String>,
366 new_conductor: impl FnOnce(
367 String,
368 CommandLineComponents,
369 crate::McpBridgeMode,
370 ) -> ConductorImpl<Host>,
371) -> Result<(), sacp::Error> {
372 let providers: Vec<AcpAgent> = components
374 .into_iter()
375 .enumerate()
376 .map(|(i, s)| {
377 let mut agent = AcpAgent::from_str(&s)?;
378 if let Some(logger) = debug_logger {
379 agent = agent.with_debug(logger.create_callback(i.to_string()));
380 }
381 Ok(agent)
382 })
383 .collect::<Result<Vec<_>, sacp::Error>>()?;
384
385 let stdio = if let Some(logger) = debug_logger {
387 Stdio::new().with_debug(logger.create_callback("C".to_string()))
388 } else {
389 Stdio::new()
390 };
391
392 let mut conductor = new_conductor(name, CommandLineComponents(providers), Default::default());
394 if let Some(writer) = trace_writer {
395 conductor = conductor.with_trace_writer(writer);
396 }
397
398 conductor.run(stdio).await
399}