Skip to main content

sacp_conductor/
lib.rs

1//! # sacp-conductor
2//!
3//! Binary for orchestrating ACP proxy chains.
4//!
5//! ## What is the conductor?
6//!
7//! The conductor is a tool that manages proxy chains - it spawns proxy components and the base agent,
8//! then routes messages between them. From the editor's perspective, the conductor appears as a single ACP agent.
9//!
10//! ```text
11//! Editor ← stdio → Conductor → Proxy 1 → Proxy 2 → Agent
12//! ```
13//!
14//! ## Usage
15//!
16//! ### Agent Mode
17//!
18//! Orchestrate a chain of proxies in front of an agent:
19//!
20//! ```bash
21//! # Chain format: proxy1 proxy2 ... agent
22//! sacp-conductor agent "python proxy1.py" "python proxy2.py" "python base-agent.py"
23//! ```
24//!
25//! The conductor:
26//! 1. Spawns each component as a subprocess
27//! 2. Connects them in a chain
28//! 3. Presents as a single agent on stdin/stdout
29//! 4. Manages the lifecycle of all processes
30//!
31//! ### MCP Bridge Mode
32//!
33//! Connect stdio to a TCP-based MCP server:
34//!
35//! ```bash
36//! # Bridge stdio to MCP server on localhost:8080
37//! sacp-conductor mcp 8080
38//! ```
39//!
40//! This allows stdio-based tools to communicate with TCP MCP servers.
41//!
42//! ## How It Works
43//!
44//! **Component Communication:**
45//! - Editor talks to conductor via stdio
46//! - Conductor uses `_proxy/successor/*` protocol extensions to route messages
47//! - Each proxy can intercept, transform, or forward messages
48//! - Final agent receives standard ACP messages
49//!
50//! **Process Management:**
51//! - All components are spawned as child processes
52//! - When conductor exits, all children are terminated
53//! - Errors in any component bring down the entire chain
54//!
55//! ## Example Use Case
56//!
57//! Add Sparkle embodiment + custom tools to any agent:
58//!
59//! ```bash
60//! sacp-conductor agent \
61//!   "sparkle-acp-proxy" \
62//!   "my-custom-tools-proxy" \
63//!   "claude-agent"
64//! ```
65//!
66//! This creates a stack where:
67//! 1. Sparkle proxy injects MCP servers and prepends embodiment
68//! 2. Custom tools proxy adds domain-specific functionality
69//! 3. Base agent handles the actual AI responses
70//!
71//! ## Related Crates
72//!
73//! - **[sacp-proxy](https://crates.io/crates/sacp-proxy)** - Framework for building proxy components
74//! - **[sacp](https://crates.io/crates/sacp)** - Core ACP SDK
75//! - **[sacp-tokio](https://crates.io/crates/sacp-tokio)** - Tokio utilities for process spawning
76
77use std::path::PathBuf;
78use std::str::FromStr;
79
80/// Core conductor logic for orchestrating proxy chains
81mod conductor;
82/// Debug logging for conductor
83mod debug_logger;
84/// MCP bridge functionality for TCP-based MCP servers
85mod mcp_bridge;
86mod snoop;
87/// Trace event types for sequence diagram viewer
88pub mod trace;
89
90pub use self::conductor::*;
91
92use clap::{Parser, Subcommand};
93
94use sacp::{Client, Conductor, DynConnectTo, schema::InitializeRequest};
95use sacp_tokio::{AcpAgent, Stdio};
96use tracing::Instrument;
97use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
98
99/// Wrapper for command-line component lists that can serve as either
100/// proxies-only (for proxy mode) or proxies+agent (for agent mode).
101///
102/// This exists because `AcpAgent` implements `Component<L>` for all `L`,
103/// so a `Vec<AcpAgent>` can be used as either a list of proxies or as
104/// proxies + final agent depending on the conductor mode.
105pub struct CommandLineComponents(pub Vec<AcpAgent>);
106
107impl InstantiateProxies for CommandLineComponents {
108    fn instantiate_proxies(
109        self: Box<Self>,
110        req: InitializeRequest,
111    ) -> futures::future::BoxFuture<
112        'static,
113        Result<(InitializeRequest, Vec<DynConnectTo<Conductor>>), sacp::Error>,
114    > {
115        Box::pin(async move {
116            let proxies = self.0.into_iter().map(|c| DynConnectTo::new(c)).collect();
117            Ok((req, proxies))
118        })
119    }
120}
121
122impl InstantiateProxiesAndAgent for CommandLineComponents {
123    fn instantiate_proxies_and_agent(
124        self: Box<Self>,
125        req: InitializeRequest,
126    ) -> futures::future::BoxFuture<
127        'static,
128        Result<
129            (
130                InitializeRequest,
131                Vec<DynConnectTo<Conductor>>,
132                DynConnectTo<Client>,
133            ),
134            sacp::Error,
135        >,
136    > {
137        Box::pin(async move {
138            let mut iter = self.0.into_iter().peekable();
139            let mut proxies: Vec<DynConnectTo<Conductor>> = Vec::new();
140
141            // All but the last element are proxies
142            while let Some(component) = iter.next() {
143                if iter.peek().is_some() {
144                    proxies.push(DynConnectTo::new(component));
145                } else {
146                    // Last element is the agent
147                    let agent = DynConnectTo::new(component);
148                    return Ok((req, proxies, agent));
149                }
150            }
151
152            Err(sacp::util::internal_error("no agent component in list"))
153        })
154    }
155}
156
157/// Wrapper to implement WriteEvent for TraceHandle.
158struct TraceHandleWriter(sacp_trace_viewer::TraceHandle);
159
160impl trace::WriteEvent for TraceHandleWriter {
161    fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
162        let value = serde_json::to_value(event)
163            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
164        self.0.push(value);
165        Ok(())
166    }
167}
168
169/// Mode for the MCP bridge.
170#[derive(Debug, Clone)]
171pub enum McpBridgeMode {
172    /// Use stdio-based MCP bridge with a conductor subprocess.
173    Stdio {
174        /// Command and args to spawn conductor MCP bridge processes.
175        /// E.g., vec!["conductor"] or vec!["cargo", "run", "-p", "conductor", "--"]
176        conductor_command: Vec<String>,
177    },
178
179    /// Use HTTP-based MCP bridge
180    Http,
181}
182
183impl Default for McpBridgeMode {
184    fn default() -> Self {
185        McpBridgeMode::Http
186    }
187}
188
189#[derive(Parser, Debug)]
190#[command(author, version, about, long_about = None)]
191pub struct ConductorArgs {
192    /// Enable debug logging of all stdin/stdout/stderr from components
193    #[arg(long)]
194    pub debug: bool,
195
196    /// Directory for debug log files (defaults to current directory)
197    #[arg(long)]
198    pub debug_dir: Option<PathBuf>,
199
200    /// Set log level (e.g., "trace", "debug", "info", "warn", "error", or module-specific like "conductor=debug")
201    /// Only applies when --debug is enabled
202    #[arg(long)]
203    pub log: Option<String>,
204
205    /// Path to write trace events for sequence diagram visualization.
206    /// Events are written as newline-delimited JSON (.jsons format).
207    #[arg(long)]
208    pub trace: Option<PathBuf>,
209
210    /// Serve trace viewer in browser with live updates.
211    /// Can be used alone (in-memory) or with --trace (file-backed).
212    #[arg(long)]
213    pub serve: bool,
214
215    #[command(subcommand)]
216    pub command: ConductorCommand,
217}
218
219#[derive(Subcommand, Debug)]
220pub enum ConductorCommand {
221    /// Run as agent orchestrator managing a proxy chain
222    Agent {
223        /// Name of the agent
224        #[arg(short, long, default_value = "conductor")]
225        name: String,
226
227        /// List of commands to chain together; the final command must be the agent.
228        components: Vec<String>,
229    },
230
231    /// Run as a proxy orchestrating a proxy chain
232    Proxy {
233        /// Name of the proxy
234        #[arg(short, long, default_value = "conductor")]
235        name: String,
236
237        /// List of proxy commands to chain together
238        proxies: Vec<String>,
239    },
240
241    /// Run as MCP bridge connecting stdio to TCP
242    Mcp {
243        /// TCP port to connect to on localhost
244        port: u16,
245    },
246}
247
248impl ConductorArgs {
249    /// Main entry point that sets up tracing and runs the conductor
250    pub async fn main(self) -> anyhow::Result<()> {
251        let pid = std::process::id();
252        let cwd = std::env::current_dir()
253            .map(|p| p.display().to_string())
254            .unwrap_or_else(|_| "<unknown>".to_string());
255
256        // Only set up tracing if --debug is enabled
257        let debug_logger = if self.debug {
258            // Extract proxy list to create the debug logger
259            let components = match &self.command {
260                ConductorCommand::Agent { components, .. } => components.clone(),
261                ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
262                ConductorCommand::Mcp { .. } => Vec::new(),
263            };
264
265            // Create debug logger
266            Some(
267                debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
268                    .await
269                    .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {}", e))?,
270            )
271        } else {
272            None
273        };
274
275        if let Some(debug_logger) = &debug_logger {
276            // Set up log level from --log flag, defaulting to "info"
277            let log_level = self.log.as_deref().unwrap_or("info");
278
279            // Set up tracing to write to the debug file with "C !" prefix
280            let tracing_writer = debug_logger.create_tracing_writer();
281            tracing_subscriber::registry()
282                .with(EnvFilter::new(log_level))
283                .with(
284                    tracing_subscriber::fmt::layer()
285                        .with_target(true)
286                        .with_writer(move || tracing_writer.clone()),
287                )
288                .init();
289
290            tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
291        };
292
293        // Set up tracing based on --trace and --serve flags
294        let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
295            // --trace only: write to file
296            (Some(trace_path), false) => {
297                let writer = trace::TraceWriter::from_path(trace_path)
298                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
299                (Some(writer), None)
300            }
301            // --serve only: in-memory with viewer
302            (None, true) => {
303                let (handle, server) = sacp_trace_viewer::serve_memory(
304                    sacp_trace_viewer::TraceViewerConfig::default(),
305                )
306                .await?;
307                let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
308                (Some(writer), Some(tokio::spawn(server)))
309            }
310            // --trace --serve: write to file and serve it
311            (Some(trace_path), true) => {
312                let writer = trace::TraceWriter::from_path(trace_path)
313                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
314                let server = sacp_trace_viewer::serve_file(
315                    trace_path.clone(),
316                    sacp_trace_viewer::TraceViewerConfig::default(),
317                );
318                (Some(writer), Some(tokio::spawn(server)))
319            }
320            // Neither: no tracing
321            (None, false) => (None, None),
322        };
323
324        self.run(debug_logger.as_ref(), trace_writer)
325            .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
326            .await
327            .map_err(|err| anyhow::anyhow!("{err}"))
328    }
329
330    async fn run(
331        self,
332        debug_logger: Option<&debug_logger::DebugLogger>,
333        trace_writer: Option<trace::TraceWriter>,
334    ) -> Result<(), sacp::Error> {
335        match self.command {
336            ConductorCommand::Agent { name, components } => {
337                initialize_conductor(
338                    debug_logger,
339                    trace_writer,
340                    name,
341                    components,
342                    |name, providers, mcp_mode| ConductorImpl::new_agent(name, providers, mcp_mode),
343                )
344                .await
345            }
346            ConductorCommand::Proxy { name, proxies } => {
347                initialize_conductor(
348                    debug_logger,
349                    trace_writer,
350                    name,
351                    proxies,
352                    |name, providers, mcp_mode| ConductorImpl::new_proxy(name, providers, mcp_mode),
353                )
354                .await
355            }
356            ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
357        }
358    }
359}
360
361async fn initialize_conductor<Host: ConductorHostRole>(
362    debug_logger: Option<&debug_logger::DebugLogger>,
363    trace_writer: Option<trace::TraceWriter>,
364    name: String,
365    components: Vec<String>,
366    new_conductor: impl FnOnce(
367        String,
368        CommandLineComponents,
369        crate::McpBridgeMode,
370    ) -> ConductorImpl<Host>,
371) -> Result<(), sacp::Error> {
372    // Parse agents and optionally wrap with debug callbacks
373    let providers: Vec<AcpAgent> = components
374        .into_iter()
375        .enumerate()
376        .map(|(i, s)| {
377            let mut agent = AcpAgent::from_str(&s)?;
378            if let Some(logger) = debug_logger {
379                agent = agent.with_debug(logger.create_callback(i.to_string()));
380            }
381            Ok(agent)
382        })
383        .collect::<Result<Vec<_>, sacp::Error>>()?;
384
385    // Create Stdio component with optional debug logging
386    let stdio = if let Some(logger) = debug_logger {
387        Stdio::new().with_debug(logger.create_callback("C".to_string()))
388    } else {
389        Stdio::new()
390    };
391
392    // Create conductor with optional trace writer
393    let mut conductor = new_conductor(name, CommandLineComponents(providers), Default::default());
394    if let Some(writer) = trace_writer {
395        conductor = conductor.with_trace_writer(writer);
396    }
397
398    conductor.run(stdio).await
399}