agent_client_protocol_conductor/
lib.rs1use std::path::PathBuf;
67use std::str::FromStr;
68
69mod conductor;
71mod debug_logger;
73mod snoop;
74pub mod trace;
76
77pub use self::conductor::*;
78
79use clap::{Parser, Subcommand};
80
81use agent_client_protocol::{AcpAgent, Stdio};
82use agent_client_protocol::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
83use tracing::Instrument;
84use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
85
86#[derive(Debug)]
93pub struct CommandLineComponents(pub Vec<AcpAgent>);
94
95impl InstantiateProxies for CommandLineComponents {
96 fn instantiate_proxies(
97 self: Box<Self>,
98 req: InitializeRequest,
99 ) -> futures::future::BoxFuture<
100 'static,
101 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
102 > {
103 Box::pin(async move {
104 let proxies = self.0.into_iter().map(DynConnectTo::new).collect();
105 Ok((req, proxies))
106 })
107 }
108}
109
110impl InstantiateProxiesAndAgent for CommandLineComponents {
111 fn instantiate_proxies_and_agent(
112 self: Box<Self>,
113 req: InitializeRequest,
114 ) -> futures::future::BoxFuture<
115 'static,
116 Result<
117 (
118 InitializeRequest,
119 Vec<DynConnectTo<Conductor>>,
120 DynConnectTo<Client>,
121 ),
122 agent_client_protocol::Error,
123 >,
124 > {
125 Box::pin(async move {
126 let mut iter = self.0.into_iter().peekable();
127 let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
128
129 while let Some(component) = iter.next() {
131 if iter.peek().is_some() {
132 proxies.push(DynConnectTo::new(component));
133 } else {
134 let agent = DynConnectTo::new(component);
136 return Ok((req, proxies, agent));
137 }
138 }
139
140 Err(agent_client_protocol::util::internal_error(
141 "no agent component in list",
142 ))
143 })
144 }
145}
146
147struct TraceHandleWriter(agent_client_protocol_trace_viewer::TraceHandle);
149
150impl trace::WriteEvent for TraceHandleWriter {
151 fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
152 let value = serde_json::to_value(event).map_err(std::io::Error::other)?;
153 self.0.push(value);
154 Ok(())
155 }
156}
157
158#[derive(Parser, Debug)]
159#[command(author, version, about, long_about = None)]
160pub struct ConductorArgs {
161 #[arg(long)]
163 pub debug: bool,
164
165 #[arg(long)]
167 pub debug_dir: Option<PathBuf>,
168
169 #[arg(long)]
172 pub log: Option<String>,
173
174 #[arg(long)]
177 pub trace: Option<PathBuf>,
178
179 #[arg(long)]
182 pub serve: bool,
183
184 #[command(subcommand)]
185 pub command: ConductorCommand,
186}
187
188#[derive(Subcommand, Debug)]
189pub enum ConductorCommand {
190 Agent {
192 #[arg(short, long, default_value = "conductor")]
194 name: String,
195
196 components: Vec<String>,
198 },
199
200 Proxy {
202 #[arg(short, long, default_value = "conductor")]
204 name: String,
205
206 proxies: Vec<String>,
208 },
209}
210
211impl ConductorArgs {
212 pub async fn main(self) -> anyhow::Result<()> {
214 let pid = std::process::id();
215 let cwd = std::env::current_dir()
216 .map_or_else(|_| "<unknown>".to_string(), |p| p.display().to_string());
217
218 let debug_logger = if self.debug {
220 let components = match &self.command {
222 ConductorCommand::Agent { components, .. } => components.clone(),
223 ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
224 };
225
226 Some(
228 debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
229 .await
230 .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {e}"))?,
231 )
232 } else {
233 None
234 };
235
236 if let Some(debug_logger) = &debug_logger {
237 let log_level = self.log.as_deref().unwrap_or("info");
239
240 let tracing_writer = debug_logger.create_tracing_writer();
242 tracing_subscriber::registry()
243 .with(EnvFilter::new(log_level))
244 .with(
245 tracing_subscriber::fmt::layer()
246 .with_target(true)
247 .with_writer(move || tracing_writer.clone()),
248 )
249 .init();
250
251 tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
252 }
253
254 let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
256 (Some(trace_path), false) => {
258 let writer = trace::TraceWriter::from_path(trace_path)
259 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
260 (Some(writer), None)
261 }
262 (None, true) => {
264 let (handle, server) = agent_client_protocol_trace_viewer::serve_memory(
265 agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
266 )?;
267 let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
268 (Some(writer), Some(tokio::spawn(server)))
269 }
270 (Some(trace_path), true) => {
272 let writer = trace::TraceWriter::from_path(trace_path)
273 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
274 let server = agent_client_protocol_trace_viewer::serve_file(
275 trace_path.clone(),
276 agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
277 );
278 (Some(writer), Some(tokio::spawn(server)))
279 }
280 (None, false) => (None, None),
282 };
283
284 self.run(debug_logger.as_ref(), trace_writer)
285 .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
286 .await
287 .map_err(|err| anyhow::anyhow!("{err}"))
288 }
289
290 async fn run(
291 self,
292 debug_logger: Option<&debug_logger::DebugLogger>,
293 trace_writer: Option<trace::TraceWriter>,
294 ) -> Result<(), agent_client_protocol::Error> {
295 match self.command {
296 ConductorCommand::Agent { name, components } => {
297 initialize_conductor(
298 debug_logger,
299 trace_writer,
300 name,
301 components,
302 ConductorImpl::new_agent,
303 )
304 .await
305 }
306 ConductorCommand::Proxy { name, proxies } => {
307 initialize_conductor(
308 debug_logger,
309 trace_writer,
310 name,
311 proxies,
312 ConductorImpl::new_proxy,
313 )
314 .await
315 }
316 }
317 }
318}
319
320async fn initialize_conductor<Host: ConductorHostRole>(
321 debug_logger: Option<&debug_logger::DebugLogger>,
322 trace_writer: Option<trace::TraceWriter>,
323 name: String,
324 components: Vec<String>,
325 new_conductor: impl FnOnce(String, CommandLineComponents) -> ConductorImpl<Host>,
326) -> Result<(), agent_client_protocol::Error> {
327 let providers: Vec<AcpAgent> = components
329 .into_iter()
330 .enumerate()
331 .map(|(i, s)| {
332 let mut agent = AcpAgent::from_str(&s)?;
333 if let Some(logger) = debug_logger {
334 agent = agent.with_debug(logger.create_callback(i.to_string()));
335 }
336 Ok(agent)
337 })
338 .collect::<Result<Vec<_>, agent_client_protocol::Error>>()?;
339
340 let stdio = if let Some(logger) = debug_logger {
342 Stdio::new().with_debug(logger.create_callback("C".to_string()))
343 } else {
344 Stdio::new()
345 };
346
347 let mut conductor = new_conductor(name, CommandLineComponents(providers));
349 if let Some(writer) = trace_writer {
350 conductor = conductor.with_trace_writer(writer);
351 }
352
353 conductor.run(stdio).await
354}