Skip to main content

agent_client_protocol_conductor/
lib.rs

1//! # agent-client-protocol-conductor
2//!
3//! Binary for orchestrating ACP proxy chains.
4//!
5//! ## What is the conductor?
6//!
7//! The conductor is a tool that manages proxy chains - it spawns proxy components and the base agent,
8//! then routes messages between them. From the editor's perspective, the conductor appears as a single ACP agent.
9//!
10//! ```text
11//! Editor ← stdio → Conductor → Proxy 1 → Proxy 2 → Agent
12//! ```
13//!
14//! ## Usage
15//!
16//! ### Agent Mode
17//!
18//! Orchestrate a chain of proxies in front of an agent:
19//!
20//! ```bash
21//! # Chain format: proxy1 proxy2 ... agent
22//! agent-client-protocol-conductor agent "python proxy1.py" "python proxy2.py" "python base-agent.py"
23//! ```
24//!
25//! The conductor:
26//! 1. Spawns each component as a subprocess
27//! 2. Connects them in a chain
28//! 3. Presents as a single agent on stdin/stdout
29//! 4. Manages the lifecycle of all processes
30//!
31//! ## How It Works
32//!
33//! **Component Communication:**
34//! - Editor talks to conductor via stdio
35//! - Conductor uses `_proxy/successor/*` protocol extensions to route messages
36//! - Each proxy can intercept, transform, or forward messages
37//! - Final agent receives standard ACP messages
38//!
39//! **Process Management:**
40//! - All components are spawned as child processes
41//! - When conductor exits, all children are terminated
42//! - Errors in any component bring down the entire chain
43//!
44//! ## Example Use Case
45//!
46//! Add Sparkle embodiment + custom tools to any agent:
47//!
48//! ```bash
49//! agent-client-protocol-conductor agent \
50//!   "sparkle-acp-proxy" \
51//!   "my-custom-tools-proxy" \
52//!   "claude-agent"
53//! ```
54//!
55//! This creates a stack where:
56//! 1. Sparkle proxy injects MCP servers and prepends embodiment
57//! 2. Custom tools proxy adds domain-specific functionality
58//! 3. Base agent handles the actual AI responses
59//!
60//! ## Related Crates
61//!
62//! - **[agent-client-protocol-proxy](https://crates.io/crates/agent-client-protocol-proxy)** - Framework for building proxy components
63//! - **[agent-client-protocol](https://crates.io/crates/agent-client-protocol)** - Core ACP SDK
64//! - **[agent-client-protocol](https://crates.io/crates/agent-client-protocol)** - Core protocol types, traits, and process spawning
65
66use std::path::PathBuf;
67use std::str::FromStr;
68
69/// Core conductor logic for orchestrating proxy chains
70mod conductor;
71/// Debug logging for conductor
72mod debug_logger;
73mod snoop;
74/// Trace event types for sequence diagram viewer
75pub mod trace;
76
77pub use self::conductor::*;
78
79use clap::{Parser, Subcommand};
80
81use agent_client_protocol::{AcpAgent, Stdio};
82use agent_client_protocol::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
83use tracing::Instrument;
84use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
85
86/// Wrapper for command-line component lists that can serve as either
87/// proxies-only (for proxy mode) or proxies+agent (for agent mode).
88///
89/// This exists because `AcpAgent` implements `Component<L>` for all `L`,
90/// so a `Vec<AcpAgent>` can be used as either a list of proxies or as
91/// proxies + final agent depending on the conductor mode.
92#[derive(Debug)]
93pub struct CommandLineComponents(pub Vec<AcpAgent>);
94
95impl InstantiateProxies for CommandLineComponents {
96    fn instantiate_proxies(
97        self: Box<Self>,
98        req: InitializeRequest,
99    ) -> futures::future::BoxFuture<
100        'static,
101        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), agent_client_protocol::Error>,
102    > {
103        Box::pin(async move {
104            let proxies = self.0.into_iter().map(DynConnectTo::new).collect();
105            Ok((req, proxies))
106        })
107    }
108}
109
110impl InstantiateProxiesAndAgent for CommandLineComponents {
111    fn instantiate_proxies_and_agent(
112        self: Box<Self>,
113        req: InitializeRequest,
114    ) -> futures::future::BoxFuture<
115        'static,
116        Result<
117            (
118                InitializeRequest,
119                Vec<DynConnectTo<Conductor>>,
120                DynConnectTo<Client>,
121            ),
122            agent_client_protocol::Error,
123        >,
124    > {
125        Box::pin(async move {
126            let mut iter = self.0.into_iter().peekable();
127            let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
128
129            // All but the last element are proxies
130            while let Some(component) = iter.next() {
131                if iter.peek().is_some() {
132                    proxies.push(DynConnectTo::new(component));
133                } else {
134                    // Last element is the agent
135                    let agent = DynConnectTo::new(component);
136                    return Ok((req, proxies, agent));
137                }
138            }
139
140            Err(agent_client_protocol::util::internal_error(
141                "no agent component in list",
142            ))
143        })
144    }
145}
146
147/// Wrapper to implement WriteEvent for TraceHandle.
148struct TraceHandleWriter(agent_client_protocol_trace_viewer::TraceHandle);
149
150impl trace::WriteEvent for TraceHandleWriter {
151    fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
152        let value = serde_json::to_value(event).map_err(std::io::Error::other)?;
153        self.0.push(value);
154        Ok(())
155    }
156}
157
158#[derive(Parser, Debug)]
159#[command(author, version, about, long_about = None)]
160pub struct ConductorArgs {
161    /// Enable debug logging of all stdin/stdout/stderr from components
162    #[arg(long)]
163    pub debug: bool,
164
165    /// Directory for debug log files (defaults to current directory)
166    #[arg(long)]
167    pub debug_dir: Option<PathBuf>,
168
169    /// Set log level (e.g., "trace", "debug", "info", "warn", "error", or module-specific like "conductor=debug")
170    /// Only applies when --debug is enabled
171    #[arg(long)]
172    pub log: Option<String>,
173
174    /// Path to write trace events for sequence diagram visualization.
175    /// Events are written as newline-delimited JSON (.jsons format).
176    #[arg(long)]
177    pub trace: Option<PathBuf>,
178
179    /// Serve trace viewer in browser with live updates.
180    /// Can be used alone (in-memory) or with --trace (file-backed).
181    #[arg(long)]
182    pub serve: bool,
183
184    #[command(subcommand)]
185    pub command: ConductorCommand,
186}
187
188#[derive(Subcommand, Debug)]
189pub enum ConductorCommand {
190    /// Run as agent orchestrator managing a proxy chain
191    Agent {
192        /// Name of the agent
193        #[arg(short, long, default_value = "conductor")]
194        name: String,
195
196        /// List of commands to chain together; the final command must be the agent.
197        components: Vec<String>,
198    },
199
200    /// Run as a proxy orchestrating a proxy chain
201    Proxy {
202        /// Name of the proxy
203        #[arg(short, long, default_value = "conductor")]
204        name: String,
205
206        /// List of proxy commands to chain together
207        proxies: Vec<String>,
208    },
209}
210
211impl ConductorArgs {
212    /// Main entry point that sets up tracing and runs the conductor
213    pub async fn main(self) -> anyhow::Result<()> {
214        let pid = std::process::id();
215        let cwd = std::env::current_dir()
216            .map_or_else(|_| "<unknown>".to_string(), |p| p.display().to_string());
217
218        // Only set up tracing if --debug is enabled
219        let debug_logger = if self.debug {
220            // Extract proxy list to create the debug logger
221            let components = match &self.command {
222                ConductorCommand::Agent { components, .. } => components.clone(),
223                ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
224            };
225
226            // Create debug logger
227            Some(
228                debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
229                    .await
230                    .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {e}"))?,
231            )
232        } else {
233            None
234        };
235
236        if let Some(debug_logger) = &debug_logger {
237            // Set up log level from --log flag, defaulting to "info"
238            let log_level = self.log.as_deref().unwrap_or("info");
239
240            // Set up tracing to write to the debug file with "C !" prefix
241            let tracing_writer = debug_logger.create_tracing_writer();
242            tracing_subscriber::registry()
243                .with(EnvFilter::new(log_level))
244                .with(
245                    tracing_subscriber::fmt::layer()
246                        .with_target(true)
247                        .with_writer(move || tracing_writer.clone()),
248                )
249                .init();
250
251            tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
252        }
253
254        // Set up tracing based on --trace and --serve flags
255        let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
256            // --trace only: write to file
257            (Some(trace_path), false) => {
258                let writer = trace::TraceWriter::from_path(trace_path)
259                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
260                (Some(writer), None)
261            }
262            // --serve only: in-memory with viewer
263            (None, true) => {
264                let (handle, server) = agent_client_protocol_trace_viewer::serve_memory(
265                    agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
266                )?;
267                let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
268                (Some(writer), Some(tokio::spawn(server)))
269            }
270            // --trace --serve: write to file and serve it
271            (Some(trace_path), true) => {
272                let writer = trace::TraceWriter::from_path(trace_path)
273                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {e}"))?;
274                let server = agent_client_protocol_trace_viewer::serve_file(
275                    trace_path.clone(),
276                    agent_client_protocol_trace_viewer::TraceViewerConfig::default(),
277                );
278                (Some(writer), Some(tokio::spawn(server)))
279            }
280            // Neither: no tracing
281            (None, false) => (None, None),
282        };
283
284        self.run(debug_logger.as_ref(), trace_writer)
285            .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
286            .await
287            .map_err(|err| anyhow::anyhow!("{err}"))
288    }
289
290    async fn run(
291        self,
292        debug_logger: Option<&debug_logger::DebugLogger>,
293        trace_writer: Option<trace::TraceWriter>,
294    ) -> Result<(), agent_client_protocol::Error> {
295        match self.command {
296            ConductorCommand::Agent { name, components } => {
297                initialize_conductor(
298                    debug_logger,
299                    trace_writer,
300                    name,
301                    components,
302                    ConductorImpl::new_agent,
303                )
304                .await
305            }
306            ConductorCommand::Proxy { name, proxies } => {
307                initialize_conductor(
308                    debug_logger,
309                    trace_writer,
310                    name,
311                    proxies,
312                    ConductorImpl::new_proxy,
313                )
314                .await
315            }
316        }
317    }
318}
319
320async fn initialize_conductor<Host: ConductorHostRole>(
321    debug_logger: Option<&debug_logger::DebugLogger>,
322    trace_writer: Option<trace::TraceWriter>,
323    name: String,
324    components: Vec<String>,
325    new_conductor: impl FnOnce(String, CommandLineComponents) -> ConductorImpl<Host>,
326) -> Result<(), agent_client_protocol::Error> {
327    // Parse agents and optionally wrap with debug callbacks
328    let providers: Vec<AcpAgent> = components
329        .into_iter()
330        .enumerate()
331        .map(|(i, s)| {
332            let mut agent = AcpAgent::from_str(&s)?;
333            if let Some(logger) = debug_logger {
334                agent = agent.with_debug(logger.create_callback(i.to_string()));
335            }
336            Ok(agent)
337        })
338        .collect::<Result<Vec<_>, agent_client_protocol::Error>>()?;
339
340    // Create Stdio component with optional debug logging
341    let stdio = if let Some(logger) = debug_logger {
342        Stdio::new().with_debug(logger.create_callback("C".to_string()))
343    } else {
344        Stdio::new()
345    };
346
347    // Create conductor with optional trace writer
348    let mut conductor = new_conductor(name, CommandLineComponents(providers));
349    if let Some(writer) = trace_writer {
350        conductor = conductor.with_trace_writer(writer);
351    }
352
353    conductor.run(stdio).await
354}