Skip to main content

agent_client_protocol_conductor/
lib.rs

1//! # agent-client-protocol-conductor
2//!
3//! Binary for orchestrating ACP proxy chains.
4//!
5//! ## What is the conductor?
6//!
7//! The conductor is a tool that manages proxy chains - it spawns proxy components and the base agent,
8//! then routes messages between them. From the editor's perspective, the conductor appears as a single ACP agent.
9//!
10//! ```text
11//! Editor ← stdio → Conductor → Proxy 1 → Proxy 2 → Agent
12//! ```
13//!
14//! ## Usage
15//!
16//! ### Agent Mode
17//!
18//! Orchestrate a chain of proxies in front of an agent:
19//!
20//! ```bash
21//! # Chain format: proxy1 proxy2 ... agent
22//! agent-client-protocol-conductor agent "python proxy1.py" "python proxy2.py" "python base-agent.py"
23//! ```
24//!
25//! The conductor:
26//! 1. Spawns each component as a subprocess
27//! 2. Connects them in a chain
28//! 3. Presents as a single agent on stdin/stdout
29//! 4. Manages the lifecycle of all processes
30//!
31//! ### MCP Bridge Mode
32//!
33//! Connect stdio to a TCP-based MCP server:
34//!
35//! ```bash
36//! # Bridge stdio to MCP server on localhost:8080
37//! agent-client-protocol-conductor mcp 8080
38//! ```
39//!
40//! This allows stdio-based tools to communicate with TCP MCP servers.
41//!
42//! ## How It Works
43//!
44//! **Component Communication:**
//! - The editor talks to the conductor via stdio
//! - The conductor uses `_proxy/successor/*` protocol extensions to route messages
47//! - Each proxy can intercept, transform, or forward messages
48//! - Final agent receives standard ACP messages
49//!
50//! **Process Management:**
51//! - All components are spawned as child processes
//! - When the conductor exits, all children are terminated
53//! - Errors in any component bring down the entire chain
54//!
55//! ## Example Use Case
56//!
57//! Add Sparkle embodiment + custom tools to any agent:
58//!
59//! ```bash
60//! agent-client-protocol-conductor agent \
61//!   "sparkle-acp-proxy" \
62//!   "my-custom-tools-proxy" \
63//!   "claude-agent"
64//! ```
65//!
66//! This creates a stack where:
67//! 1. Sparkle proxy injects MCP servers and prepends embodiment
68//! 2. Custom tools proxy adds domain-specific functionality
69//! 3. Base agent handles the actual AI responses
70//!
71//! ## Related Crates
72//!
73//! - **[agent-client-protocol-proxy](https://crates.io/crates/agent-client-protocol-proxy)** - Framework for building proxy components
74//! - **[agent-client-protocol](https://crates.io/crates/agent-client-protocol)** - Core ACP SDK
75//! - **[agent-client-protocol-tokio](https://crates.io/crates/agent-client-protocol-tokio)** - Tokio utilities for process spawning
76
77use std::path::PathBuf;
78use std::str::FromStr;
79
80/// Core conductor logic for orchestrating proxy chains
81mod conductor;
82/// Debug logging for conductor
83mod debug_logger;
84/// MCP bridge functionality for TCP-based MCP servers
85mod mcp_bridge;
86mod snoop;
87/// Trace event types for sequence diagram viewer
88pub mod trace;
89
90pub use self::conductor::*;
91
92use clap::{Parser, Subcommand};
93
94use agent_client_protocol::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
95use agent_client_protocol_tokio::{AcpAgent, Stdio};
96use tracing::Instrument;
97use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
98
/// Wrapper for command-line component lists that can serve as either
/// proxies-only (for proxy mode) or proxies+agent (for agent mode).
///
/// This exists because `AcpAgent` implements `Component<L>` for all `L`,
/// so a `Vec<AcpAgent>` can be used as either a list of proxies or as
/// proxies + final agent depending on the conductor mode.
///
/// The same list is interpreted in two ways by the trait impls in this file:
///
/// - `InstantiateProxies`: every entry becomes a proxy.
/// - `InstantiateProxiesAndAgent`: every entry except the last becomes a
///   proxy and the last entry becomes the agent; an empty list is an error.
///
/// The wrapped vector is expected to be in chain order (first proxy first).
#[derive(Debug)]
pub struct CommandLineComponents(pub Vec<AcpAgent>);
107
108impl InstantiateProxies for CommandLineComponents {
109    fn instantiate_proxies(
110        self: Box<Self>,
111        req: InitializeRequest,
112    ) -> futures::future::BoxFuture<
113        'static,
114        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
115    > {
116        Box::pin(async move {
117            let proxies = self.0.into_iter().map(DynConnectTo::new).collect();
118            Ok((req, proxies))
119        })
120    }
121}
122
123impl InstantiateProxiesAndAgent for CommandLineComponents {
124    fn instantiate_proxies_and_agent(
125        self: Box<Self>,
126        req: InitializeRequest,
127    ) -> futures::future::BoxFuture<
128        'static,
129        Result<
130            (
131                InitializeRequest,
132                Vec<DynConnectTo<Conductor>>,
133                DynConnectTo<Client>,
134            ),
135            agent_client_protocol::Error,
136        >,
137    > {
138        Box::pin(async move {
139            let mut iter = self.0.into_iter().peekable();
140            let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
141
142            // All but the last element are proxies
143            while let Some(component) = iter.next() {
144                if iter.peek().is_some() {
145                    proxies.push(DynConnectTo::new(component));
146                } else {
147                    // Last element is the agent
148                    let agent = DynConnectTo::new(component);
149                    return Ok((req, proxies, agent));
150                }
151            }
152
153            Err(agent_client_protocol::util::internal_error(
154                "no agent component in list",
155            ))
156        })
157    }
158}
159
/// Wrapper to implement WriteEvent for TraceHandle.
///
/// Used by `ConductorArgs::main` when `--serve` is given without `--trace`:
/// the handle returned by `serve_memory` is wrapped in this type so that a
/// `trace::TraceWriter` can push events to the viewer instead of a file.
struct TraceHandleWriter(agent_client_protocol_trace_viewer::TraceHandle);
162
163impl trace::WriteEvent for TraceHandleWriter {
164    fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
165        let value = serde_json::to_value(event).map_err(std::io::Error::other)?;
166        self.0.push(value);
167        Ok(())
168    }
169}
170
/// Mode for the MCP bridge.
///
/// `Http` is the default variant. The command-line path in this file
/// (`initialize_conductor`) always passes `McpBridgeMode::default()`, so
/// `Stdio` is only selected by callers that construct it explicitly.
#[derive(Debug, Clone, Default)]
pub enum McpBridgeMode {
    /// Use stdio-based MCP bridge with a conductor subprocess.
    Stdio {
        /// Command and args to spawn conductor MCP bridge processes.
        /// E.g., vec!["conductor"] or vec!["cargo", "run", "-p", "conductor", "--"]
        conductor_command: Vec<String>,
    },

    /// Use HTTP-based MCP bridge
    #[default]
    Http,
}
185
// Top-level command-line arguments for the conductor, parsed by clap.
//
// NOTE: the `///` comments on the fields below are picked up by clap's derive
// macro as `--help` text, so they are user-facing strings; maintainer notes
// in this block are therefore written as plain `//` comments.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct ConductorArgs {
    // When set, `main` creates a `DebugLogger` and installs a tracing
    // subscriber that writes to it; otherwise no subscriber is installed here.
    /// Enable debug logging of all stdin/stdout/stderr from components
    #[arg(long)]
    pub debug: bool,

    // Passed as-is (possibly `None`) to `DebugLogger::new`.
    /// Directory for debug log files (defaults to current directory)
    #[arg(long)]
    pub debug_dir: Option<PathBuf>,

    // Used as the `EnvFilter` directive string in `main`; falls back to
    // "info" when absent. Ignored entirely unless `debug` is set.
    /// Set log level (e.g., "trace", "debug", "info", "warn", "error", or module-specific like "conductor=debug")
    /// Only applies when --debug is enabled
    #[arg(long)]
    pub log: Option<String>,

    // NOTE(review): ".jsons" below may be intended to read ".jsonl" — confirm
    // before changing, since this text is shown in `--help`.
    /// Path to write trace events for sequence diagram visualization.
    /// Events are written as newline-delimited JSON (.jsons format).
    #[arg(long)]
    pub trace: Option<PathBuf>,

    // Combined with `trace` in `main`: alone it serves an in-memory trace,
    // together with `trace` it serves the trace file being written.
    /// Serve trace viewer in browser with live updates.
    /// Can be used alone (in-memory) or with --trace (file-backed).
    #[arg(long)]
    pub serve: bool,

    // The selected mode; dispatched on in `run`.
    #[command(subcommand)]
    pub command: ConductorCommand,
}
215
// Subcommands of the conductor binary; `ConductorArgs::run` dispatches on
// these. The `///` comments are used by clap as help text, so maintainer
// notes in this block are plain `//` comments.
#[derive(Subcommand, Debug)]
pub enum ConductorCommand {
    // Dispatched to `initialize_conductor` with `ConductorImpl::new_agent`.
    /// Run as agent orchestrator managing a proxy chain
    Agent {
        /// Name of the agent
        #[arg(short, long, default_value = "conductor")]
        name: String,

        // Each entry is parsed with `AcpAgent::from_str`. An empty list is
        // accepted by this declaration; `CommandLineComponents` reports
        // "no agent component in list" when asked for an agent.
        /// List of commands to chain together; the final command must be the agent.
        components: Vec<String>,
    },

    // Dispatched to `initialize_conductor` with `ConductorImpl::new_proxy`.
    /// Run as a proxy orchestrating a proxy chain
    Proxy {
        /// Name of the proxy
        #[arg(short, long, default_value = "conductor")]
        name: String,

        // Each entry is parsed with `AcpAgent::from_str`; all are proxies.
        /// List of proxy commands to chain together
        proxies: Vec<String>,
    },

    // Dispatched to `mcp_bridge::run_mcp_bridge(port)`.
    /// Run as MCP bridge connecting stdio to TCP
    Mcp {
        /// TCP port to connect to on localhost
        port: u16,
    },
}
244
245impl ConductorArgs {
246    /// Main entry point that sets up tracing and runs the conductor
247    pub async fn main(self) -> anyhow::Result<()> {
248        let pid = std::process::id();
249        let cwd = std::env::current_dir()
250            .map_or_else(|_| "<unknown>".to_string(), |p| p.display().to_string());
251
252        // Only set up tracing if --debug is enabled
253        let debug_logger = if self.debug {
254            // Extract proxy list to create the debug logger
255            let components = match &self.command {
256                ConductorCommand::Agent { components, .. } => components.clone(),
257                ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
258                ConductorCommand::Mcp { .. } => Vec::new(),
259            };
260
261            // Create debug logger
262            Some(
263                debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
264                    .await
265                    .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {e}"))?,
266            )
267        } else {
268            None
269        };
270
271        if let Some(debug_logger) = &debug_logger {
272            // Set up log level from --log flag, defaulting to "info"
273            let log_level = self.log.as_deref().unwrap_or("info");
274
275            // Set up tracing to write to the debug file with "C !" prefix
276            let tracing_writer = debug_logger.create_tracing_writer();
277            tracing_subscriber::registry()
278                .with(EnvFilter::new(log_level))
279                .with(
280                    tracing_subscriber::fmt::layer()
281                        .with_target(true)
282                        .with_writer(move || tracing_writer.clone()),
283                )
284                .init();
285
286            tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
287        }
288
289        // Set up tracing based on --trace and --serve flags
290        let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
291            // --trace only: write to file
292            (Some(trace_path), false) => {
293                let writer = trace::TraceWriter::from_path(trace_path)
294                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
295                (Some(writer), None)
296            }
297            // --serve only: in-memory with viewer
298            (None, true) => {
299                let (handle, server) = agent_client_protocol_trace_viewer::serve_memory(
300                    agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
301                )?;
302                let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
303                (Some(writer), Some(tokio::spawn(server)))
304            }
305            // --trace --serve: write to file and serve it
306            (Some(trace_path), true) => {
307                let writer = trace::TraceWriter::from_path(trace_path)
308                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
309                let server = agent_client_protocol_trace_viewer::serve_file(
310                    trace_path.clone(),
311                    agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
312                );
313                (Some(writer), Some(tokio::spawn(server)))
314            }
315            // Neither: no tracing
316            (None, false) => (None, None),
317        };
318
319        self.run(debug_logger.as_ref(), trace_writer)
320            .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
321            .await
322            .map_err(|err| anyhow::anyhow!("{err}"))
323    }
324
325    async fn run(
326        self,
327        debug_logger: Option<&debug_logger::DebugLogger>,
328        trace_writer: Option<trace::TraceWriter>,
329    ) -> Result<(), agent_client_protocol::Error> {
330        match self.command {
331            ConductorCommand::Agent { name, components } => {
332                initialize_conductor(
333                    debug_logger,
334                    trace_writer,
335                    name,
336                    components,
337                    ConductorImpl::new_agent,
338                )
339                .await
340            }
341            ConductorCommand::Proxy { name, proxies } => {
342                initialize_conductor(
343                    debug_logger,
344                    trace_writer,
345                    name,
346                    proxies,
347                    ConductorImpl::new_proxy,
348                )
349                .await
350            }
351            ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
352        }
353    }
354}
355
356async fn initialize_conductor<Host: ConductorHostRole>(
357    debug_logger: Option<&debug_logger::DebugLogger>,
358    trace_writer: Option<trace::TraceWriter>,
359    name: String,
360    components: Vec<String>,
361    new_conductor: impl FnOnce(
362        String,
363        CommandLineComponents,
364        crate::McpBridgeMode,
365    ) -> ConductorImpl<Host>,
366) -> Result<(), agent_client_protocol::Error> {
367    // Parse agents and optionally wrap with debug callbacks
368    let providers: Vec<AcpAgent> = components
369        .into_iter()
370        .enumerate()
371        .map(|(i, s)| {
372            let mut agent = AcpAgent::from_str(&s)?;
373            if let Some(logger) = debug_logger {
374                agent = agent.with_debug(logger.create_callback(i.to_string()));
375            }
376            Ok(agent)
377        })
378        .collect::<Result<Vec<_>, agent_client_protocol::Error>>()?;
379
380    // Create Stdio component with optional debug logging
381    let stdio = if let Some(logger) = debug_logger {
382        Stdio::new().with_debug(logger.create_callback("C".to_string()))
383    } else {
384        Stdio::new()
385    };
386
387    // Create conductor with optional trace writer
388    let mut conductor = new_conductor(
389        name,
390        CommandLineComponents(providers),
391        McpBridgeMode::default(),
392    );
393    if let Some(writer) = trace_writer {
394        conductor = conductor.with_trace_writer(writer);
395    }
396
397    conductor.run(stdio).await
398}