agent_client_protocol_conductor/lib.rs
1//! # agent-client-protocol-conductor
2//!
3//! Binary for orchestrating ACP proxy chains.
4//!
5//! ## What is the conductor?
6//!
7//! The conductor is a tool that manages proxy chains - it spawns proxy components and the base agent,
8//! then routes messages between them. From the editor's perspective, the conductor appears as a single ACP agent.
9//!
10//! ```text
11//! Editor ← stdio → Conductor → Proxy 1 → Proxy 2 → Agent
12//! ```
13//!
14//! ## Usage
15//!
16//! ### Agent Mode
17//!
18//! Orchestrate a chain of proxies in front of an agent:
19//!
20//! ```bash
21//! # Chain format: proxy1 proxy2 ... agent
22//! agent-client-protocol-conductor agent "python proxy1.py" "python proxy2.py" "python base-agent.py"
23//! ```
24//!
25//! The conductor:
26//! 1. Spawns each component as a subprocess
27//! 2. Connects them in a chain
28//! 3. Presents as a single agent on stdin/stdout
29//! 4. Manages the lifecycle of all processes
30//!
31//! ### MCP Bridge Mode
32//!
33//! Connect stdio to a TCP-based MCP server:
34//!
35//! ```bash
36//! # Bridge stdio to MCP server on localhost:8080
37//! agent-client-protocol-conductor mcp 8080
38//! ```
39//!
40//! This allows stdio-based tools to communicate with TCP MCP servers.
41//!
42//! ## How It Works
43//!
44//! **Component Communication:**
45//! - Editor talks to conductor via stdio
46//! - Conductor uses `_proxy/successor/*` protocol extensions to route messages
47//! - Each proxy can intercept, transform, or forward messages
48//! - Final agent receives standard ACP messages
49//!
50//! **Process Management:**
51//! - All components are spawned as child processes
52//! - When conductor exits, all children are terminated
53//! - Errors in any component bring down the entire chain
54//!
55//! ## Example Use Case
56//!
57//! Add Sparkle embodiment + custom tools to any agent:
58//!
59//! ```bash
60//! agent-client-protocol-conductor agent \
61//! "sparkle-acp-proxy" \
62//! "my-custom-tools-proxy" \
63//! "claude-agent"
64//! ```
65//!
66//! This creates a stack where:
67//! 1. Sparkle proxy injects MCP servers and prepends embodiment
68//! 2. Custom tools proxy adds domain-specific functionality
69//! 3. Base agent handles the actual AI responses
70//!
71//! ## Related Crates
72//!
73//! - **[agent-client-protocol-proxy](https://crates.io/crates/agent-client-protocol-proxy)** - Framework for building proxy components
74//! - **[agent-client-protocol](https://crates.io/crates/agent-client-protocol)** - Core ACP SDK
75//! - **[agent-client-protocol-tokio](https://crates.io/crates/agent-client-protocol-tokio)** - Tokio utilities for process spawning
76
77use std::path::PathBuf;
78use std::str::FromStr;
79
80/// Core conductor logic for orchestrating proxy chains
81mod conductor;
82/// Debug logging for conductor
83mod debug_logger;
84/// MCP bridge functionality for TCP-based MCP servers
85mod mcp_bridge;
86mod snoop;
87/// Trace event types for sequence diagram viewer
88pub mod trace;
89
90pub use self::conductor::*;
91
92use clap::{Parser, Subcommand};
93
94use agent_client_protocol::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
95use agent_client_protocol_tokio::{AcpAgent, Stdio};
96use tracing::Instrument;
97use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
98
99/// Wrapper for command-line component lists that can serve as either
100/// proxies-only (for proxy mode) or proxies+agent (for agent mode).
101///
102/// This exists because `AcpAgent` implements `Component<L>` for all `L`,
103/// so a `Vec<AcpAgent>` can be used as either a list of proxies or as
104/// proxies + final agent depending on the conductor mode.
105#[derive(Debug)]
106pub struct CommandLineComponents(pub Vec<AcpAgent>);
107
108impl InstantiateProxies for CommandLineComponents {
109 fn instantiate_proxies(
110 self: Box<Self>,
111 req: InitializeRequest,
112 ) -> futures::future::BoxFuture<
113 'static,
114 Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
115 > {
116 Box::pin(async move {
117 let proxies = self.0.into_iter().map(DynConnectTo::new).collect();
118 Ok((req, proxies))
119 })
120 }
121}
122
123impl InstantiateProxiesAndAgent for CommandLineComponents {
124 fn instantiate_proxies_and_agent(
125 self: Box<Self>,
126 req: InitializeRequest,
127 ) -> futures::future::BoxFuture<
128 'static,
129 Result<
130 (
131 InitializeRequest,
132 Vec<DynConnectTo<Conductor>>,
133 DynConnectTo<Client>,
134 ),
135 agent_client_protocol::Error,
136 >,
137 > {
138 Box::pin(async move {
139 let mut iter = self.0.into_iter().peekable();
140 let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
141
142 // All but the last element are proxies
143 while let Some(component) = iter.next() {
144 if iter.peek().is_some() {
145 proxies.push(DynConnectTo::new(component));
146 } else {
147 // Last element is the agent
148 let agent = DynConnectTo::new(component);
149 return Ok((req, proxies, agent));
150 }
151 }
152
153 Err(agent_client_protocol::util::internal_error(
154 "no agent component in list",
155 ))
156 })
157 }
158}
159
160/// Wrapper to implement WriteEvent for TraceHandle.
161struct TraceHandleWriter(agent_client_protocol_trace_viewer::TraceHandle);
162
163impl trace::WriteEvent for TraceHandleWriter {
164 fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
165 let value = serde_json::to_value(event).map_err(std::io::Error::other)?;
166 self.0.push(value);
167 Ok(())
168 }
169}
170
171/// Mode for the MCP bridge.
172#[derive(Debug, Clone, Default)]
173pub enum McpBridgeMode {
174 /// Use stdio-based MCP bridge with a conductor subprocess.
175 Stdio {
176 /// Command and args to spawn conductor MCP bridge processes.
177 /// E.g., vec!["conductor"] or vec!["cargo", "run", "-p", "conductor", "--"]
178 conductor_command: Vec<String>,
179 },
180
181 /// Use HTTP-based MCP bridge
182 #[default]
183 Http,
184}
185
186#[derive(Parser, Debug)]
187#[command(author, version, about, long_about = None)]
188pub struct ConductorArgs {
189 /// Enable debug logging of all stdin/stdout/stderr from components
190 #[arg(long)]
191 pub debug: bool,
192
193 /// Directory for debug log files (defaults to current directory)
194 #[arg(long)]
195 pub debug_dir: Option<PathBuf>,
196
197 /// Set log level (e.g., "trace", "debug", "info", "warn", "error", or module-specific like "conductor=debug")
198 /// Only applies when --debug is enabled
199 #[arg(long)]
200 pub log: Option<String>,
201
202 /// Path to write trace events for sequence diagram visualization.
203 /// Events are written as newline-delimited JSON (.jsons format).
204 #[arg(long)]
205 pub trace: Option<PathBuf>,
206
207 /// Serve trace viewer in browser with live updates.
208 /// Can be used alone (in-memory) or with --trace (file-backed).
209 #[arg(long)]
210 pub serve: bool,
211
212 #[command(subcommand)]
213 pub command: ConductorCommand,
214}
215
216#[derive(Subcommand, Debug)]
217pub enum ConductorCommand {
218 /// Run as agent orchestrator managing a proxy chain
219 Agent {
220 /// Name of the agent
221 #[arg(short, long, default_value = "conductor")]
222 name: String,
223
224 /// List of commands to chain together; the final command must be the agent.
225 components: Vec<String>,
226 },
227
228 /// Run as a proxy orchestrating a proxy chain
229 Proxy {
230 /// Name of the proxy
231 #[arg(short, long, default_value = "conductor")]
232 name: String,
233
234 /// List of proxy commands to chain together
235 proxies: Vec<String>,
236 },
237
238 /// Run as MCP bridge connecting stdio to TCP
239 Mcp {
240 /// TCP port to connect to on localhost
241 port: u16,
242 },
243}
244
245impl ConductorArgs {
246 /// Main entry point that sets up tracing and runs the conductor
247 pub async fn main(self) -> anyhow::Result<()> {
248 let pid = std::process::id();
249 let cwd = std::env::current_dir()
250 .map_or_else(|_| "<unknown>".to_string(), |p| p.display().to_string());
251
252 // Only set up tracing if --debug is enabled
253 let debug_logger = if self.debug {
254 // Extract proxy list to create the debug logger
255 let components = match &self.command {
256 ConductorCommand::Agent { components, .. } => components.clone(),
257 ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
258 ConductorCommand::Mcp { .. } => Vec::new(),
259 };
260
261 // Create debug logger
262 Some(
263 debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
264 .await
265 .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {e}"))?,
266 )
267 } else {
268 None
269 };
270
271 if let Some(debug_logger) = &debug_logger {
272 // Set up log level from --log flag, defaulting to "info"
273 let log_level = self.log.as_deref().unwrap_or("info");
274
275 // Set up tracing to write to the debug file with "C !" prefix
276 let tracing_writer = debug_logger.create_tracing_writer();
277 tracing_subscriber::registry()
278 .with(EnvFilter::new(log_level))
279 .with(
280 tracing_subscriber::fmt::layer()
281 .with_target(true)
282 .with_writer(move || tracing_writer.clone()),
283 )
284 .init();
285
286 tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
287 }
288
289 // Set up tracing based on --trace and --serve flags
290 let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
291 // --trace only: write to file
292 (Some(trace_path), false) => {
293 let writer = trace::TraceWriter::from_path(trace_path)
294 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
295 (Some(writer), None)
296 }
297 // --serve only: in-memory with viewer
298 (None, true) => {
299 let (handle, server) = agent_client_protocol_trace_viewer::serve_memory(
300 agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
301 )?;
302 let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
303 (Some(writer), Some(tokio::spawn(server)))
304 }
305 // --trace --serve: write to file and serve it
306 (Some(trace_path), true) => {
307 let writer = trace::TraceWriter::from_path(trace_path)
308 .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
309 let server = agent_client_protocol_trace_viewer::serve_file(
310 trace_path.clone(),
311 agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
312 );
313 (Some(writer), Some(tokio::spawn(server)))
314 }
315 // Neither: no tracing
316 (None, false) => (None, None),
317 };
318
319 self.run(debug_logger.as_ref(), trace_writer)
320 .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
321 .await
322 .map_err(|err| anyhow::anyhow!("{err}"))
323 }
324
325 async fn run(
326 self,
327 debug_logger: Option<&debug_logger::DebugLogger>,
328 trace_writer: Option<trace::TraceWriter>,
329 ) -> Result<(), agent_client_protocol::Error> {
330 match self.command {
331 ConductorCommand::Agent { name, components } => {
332 initialize_conductor(
333 debug_logger,
334 trace_writer,
335 name,
336 components,
337 ConductorImpl::new_agent,
338 )
339 .await
340 }
341 ConductorCommand::Proxy { name, proxies } => {
342 initialize_conductor(
343 debug_logger,
344 trace_writer,
345 name,
346 proxies,
347 ConductorImpl::new_proxy,
348 )
349 .await
350 }
351 ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
352 }
353 }
354}
355
356async fn initialize_conductor<Host: ConductorHostRole>(
357 debug_logger: Option<&debug_logger::DebugLogger>,
358 trace_writer: Option<trace::TraceWriter>,
359 name: String,
360 components: Vec<String>,
361 new_conductor: impl FnOnce(
362 String,
363 CommandLineComponents,
364 crate::McpBridgeMode,
365 ) -> ConductorImpl<Host>,
366) -> Result<(), agent_client_protocol::Error> {
367 // Parse agents and optionally wrap with debug callbacks
368 let providers: Vec<AcpAgent> = components
369 .into_iter()
370 .enumerate()
371 .map(|(i, s)| {
372 let mut agent = AcpAgent::from_str(&s)?;
373 if let Some(logger) = debug_logger {
374 agent = agent.with_debug(logger.create_callback(i.to_string()));
375 }
376 Ok(agent)
377 })
378 .collect::<Result<Vec<_>, agent_client_protocol::Error>>()?;
379
380 // Create Stdio component with optional debug logging
381 let stdio = if let Some(logger) = debug_logger {
382 Stdio::new().with_debug(logger.create_callback("C".to_string()))
383 } else {
384 Stdio::new()
385 };
386
387 // Create conductor with optional trace writer
388 let mut conductor = new_conductor(
389 name,
390 CommandLineComponents(providers),
391 McpBridgeMode::default(),
392 );
393 if let Some(writer) = trace_writer {
394 conductor = conductor.with_trace_writer(writer);
395 }
396
397 conductor.run(stdio).await
398}