sacp_conductor/
lib.rs

1//! # sacp-conductor
2//!
3//! Binary for orchestrating ACP proxy chains.
4//!
5//! ## What is the conductor?
6//!
7//! The conductor is a tool that manages proxy chains - it spawns proxy components and the base agent,
8//! then routes messages between them. From the editor's perspective, the conductor appears as a single ACP agent.
9//!
10//! ```text
11//! Editor ← stdio → Conductor → Proxy 1 → Proxy 2 → Agent
12//! ```
13//!
14//! ## Usage
15//!
16//! ### Agent Mode
17//!
18//! Orchestrate a chain of proxies in front of an agent:
19//!
20//! ```bash
21//! # Chain format: proxy1 proxy2 ... agent
22//! sacp-conductor agent "python proxy1.py" "python proxy2.py" "python base-agent.py"
23//! ```
24//!
25//! The conductor:
26//! 1. Spawns each component as a subprocess
27//! 2. Connects them in a chain
28//! 3. Presents as a single agent on stdin/stdout
29//! 4. Manages the lifecycle of all processes
30//!
31//! ### MCP Bridge Mode
32//!
33//! Connect stdio to a TCP-based MCP server:
34//!
35//! ```bash
36//! # Bridge stdio to MCP server on localhost:8080
37//! sacp-conductor mcp 8080
38//! ```
39//!
40//! This allows stdio-based tools to communicate with TCP MCP servers.
41//!
42//! ## How It Works
43//!
44//! **Component Communication:**
45//! - Editor talks to conductor via stdio
46//! - Conductor uses `_proxy/successor/*` protocol extensions to route messages
47//! - Each proxy can intercept, transform, or forward messages
48//! - Final agent receives standard ACP messages
49//!
50//! **Process Management:**
51//! - All components are spawned as child processes
52//! - When conductor exits, all children are terminated
53//! - Errors in any component bring down the entire chain
54//!
55//! ## Example Use Case
56//!
57//! Add Sparkle embodiment + custom tools to any agent:
58//!
59//! ```bash
60//! sacp-conductor agent \
61//!   "sparkle-acp-proxy" \
62//!   "my-custom-tools-proxy" \
63//!   "claude-agent"
64//! ```
65//!
66//! This creates a stack where:
67//! 1. Sparkle proxy injects MCP servers and prepends embodiment
68//! 2. Custom tools proxy adds domain-specific functionality
69//! 3. Base agent handles the actual AI responses
70//!
71//! ## Related Crates
72//!
73//! - **[sacp-proxy](https://crates.io/crates/sacp-proxy)** - Framework for building proxy components
74//! - **[sacp](https://crates.io/crates/sacp)** - Core ACP SDK
75//! - **[sacp-tokio](https://crates.io/crates/sacp-tokio)** - Tokio utilities for process spawning
76
77use std::path::PathBuf;
78use std::str::FromStr;
79
80/// Core conductor logic for orchestrating proxy chains
81mod conductor;
82/// Debug logging for conductor
83mod debug_logger;
84/// MCP bridge functionality for TCP-based MCP servers
85mod mcp_bridge;
86/// Trace event types for sequence diagram viewer
87pub mod trace;
88
89pub use self::conductor::*;
90
91use clap::{Parser, Subcommand};
92
93use sacp::link::{AgentToClient, ProxyToConductor};
94use sacp::schema::InitializeRequest;
95use sacp_tokio::{AcpAgent, Stdio};
96use tracing::Instrument;
97use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
98
99/// Wrapper for command-line component lists that can serve as either
100/// proxies-only (for proxy mode) or proxies+agent (for agent mode).
101///
102/// This exists because `AcpAgent` implements `Component<L>` for all `L`,
103/// so a `Vec<AcpAgent>` can be used as either a list of proxies or as
104/// proxies + final agent depending on the conductor mode.
105pub struct CommandLineComponents(pub Vec<AcpAgent>);
106
107impl InstantiateProxies for CommandLineComponents {
108    fn instantiate_proxies(
109        self: Box<Self>,
110        req: InitializeRequest,
111    ) -> futures::future::BoxFuture<
112        'static,
113        Result<(InitializeRequest, Vec<sacp::DynComponent<ProxyToConductor>>), sacp::Error>,
114    > {
115        Box::pin(async move {
116            let proxies = self
117                .0
118                .into_iter()
119                .map(|c| sacp::DynComponent::new(c))
120                .collect();
121            Ok((req, proxies))
122        })
123    }
124}
125
126impl InstantiateProxiesAndAgent for CommandLineComponents {
127    fn instantiate_proxies_and_agent(
128        self: Box<Self>,
129        req: InitializeRequest,
130    ) -> futures::future::BoxFuture<
131        'static,
132        Result<
133            (
134                InitializeRequest,
135                Vec<sacp::DynComponent<ProxyToConductor>>,
136                sacp::DynComponent<AgentToClient>,
137            ),
138            sacp::Error,
139        >,
140    > {
141        Box::pin(async move {
142            let mut iter = self.0.into_iter().peekable();
143            let mut proxies: Vec<sacp::DynComponent<ProxyToConductor>> = Vec::new();
144
145            // All but the last element are proxies
146            while let Some(component) = iter.next() {
147                if iter.peek().is_some() {
148                    proxies.push(sacp::DynComponent::new(component));
149                } else {
150                    // Last element is the agent
151                    let agent = sacp::DynComponent::new(component);
152                    return Ok((req, proxies, agent));
153                }
154            }
155
156            Err(sacp::util::internal_error("no agent component in list"))
157        })
158    }
159}
160
161/// Wrapper to implement WriteEvent for TraceHandle.
162struct TraceHandleWriter(sacp_trace_viewer::TraceHandle);
163
164impl trace::WriteEvent for TraceHandleWriter {
165    fn write_event(&mut self, event: &trace::TraceEvent) -> std::io::Result<()> {
166        let value = serde_json::to_value(event)
167            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
168        self.0.push(value);
169        Ok(())
170    }
171}
172
173/// Mode for the MCP bridge.
174#[derive(Debug, Clone)]
175pub enum McpBridgeMode {
176    /// Use stdio-based MCP bridge with a conductor subprocess.
177    Stdio {
178        /// Command and args to spawn conductor MCP bridge processes.
179        /// E.g., vec!["conductor"] or vec!["cargo", "run", "-p", "conductor", "--"]
180        conductor_command: Vec<String>,
181    },
182
183    /// Use HTTP-based MCP bridge
184    Http,
185}
186
187impl Default for McpBridgeMode {
188    fn default() -> Self {
189        McpBridgeMode::Http
190    }
191}
192
193#[derive(Parser, Debug)]
194#[command(author, version, about, long_about = None)]
195pub struct ConductorArgs {
196    /// Enable debug logging of all stdin/stdout/stderr from components
197    #[arg(long)]
198    pub debug: bool,
199
200    /// Directory for debug log files (defaults to current directory)
201    #[arg(long)]
202    pub debug_dir: Option<PathBuf>,
203
204    /// Set log level (e.g., "trace", "debug", "info", "warn", "error", or module-specific like "conductor=debug")
205    /// Only applies when --debug is enabled
206    #[arg(long)]
207    pub log: Option<String>,
208
209    /// Path to write trace events for sequence diagram visualization.
210    /// Events are written as newline-delimited JSON (.jsons format).
211    #[arg(long)]
212    pub trace: Option<PathBuf>,
213
214    /// Serve trace viewer in browser with live updates.
215    /// Can be used alone (in-memory) or with --trace (file-backed).
216    #[arg(long)]
217    pub serve: bool,
218
219    #[command(subcommand)]
220    pub command: ConductorCommand,
221}
222
223#[derive(Subcommand, Debug)]
224pub enum ConductorCommand {
225    /// Run as agent orchestrator managing a proxy chain
226    Agent {
227        /// Name of the agent
228        #[arg(short, long, default_value = "conductor")]
229        name: String,
230
231        /// List of commands to chain together; the final command must be the agent.
232        components: Vec<String>,
233    },
234
235    /// Run as a proxy orchestrating a proxy chain
236    Proxy {
237        /// Name of the proxy
238        #[arg(short, long, default_value = "conductor")]
239        name: String,
240
241        /// List of proxy commands to chain together
242        proxies: Vec<String>,
243    },
244
245    /// Run as MCP bridge connecting stdio to TCP
246    Mcp {
247        /// TCP port to connect to on localhost
248        port: u16,
249    },
250}
251
252impl ConductorArgs {
253    /// Main entry point that sets up tracing and runs the conductor
254    pub async fn main(self) -> anyhow::Result<()> {
255        let pid = std::process::id();
256        let cwd = std::env::current_dir()
257            .map(|p| p.display().to_string())
258            .unwrap_or_else(|_| "<unknown>".to_string());
259
260        // Only set up tracing if --debug is enabled
261        let debug_logger = if self.debug {
262            // Extract proxy list to create the debug logger
263            let components = match &self.command {
264                ConductorCommand::Agent { components, .. } => components.clone(),
265                ConductorCommand::Proxy { proxies, .. } => proxies.clone(),
266                ConductorCommand::Mcp { .. } => Vec::new(),
267            };
268
269            // Create debug logger
270            Some(
271                debug_logger::DebugLogger::new(self.debug_dir.clone(), &components)
272                    .await
273                    .map_err(|e| anyhow::anyhow!("Failed to create debug logger: {}", e))?,
274            )
275        } else {
276            None
277        };
278
279        if let Some(debug_logger) = &debug_logger {
280            // Set up log level from --log flag, defaulting to "info"
281            let log_level = self.log.as_deref().unwrap_or("info");
282
283            // Set up tracing to write to the debug file with "C !" prefix
284            let tracing_writer = debug_logger.create_tracing_writer();
285            tracing_subscriber::registry()
286                .with(EnvFilter::new(log_level))
287                .with(
288                    tracing_subscriber::fmt::layer()
289                        .with_target(true)
290                        .with_writer(move || tracing_writer.clone()),
291                )
292                .init();
293
294            tracing::info!(pid = %pid, cwd = %cwd, level = %log_level, "Conductor starting with debug logging");
295        };
296
297        // Set up tracing based on --trace and --serve flags
298        let (trace_writer, _viewer_server) = match (&self.trace, self.serve) {
299            // --trace only: write to file
300            (Some(trace_path), false) => {
301                let writer = trace::TraceWriter::from_path(trace_path)
302                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
303                (Some(writer), None)
304            }
305            // --serve only: in-memory with viewer
306            (None, true) => {
307                let (handle, server) = sacp_trace_viewer::serve_memory(
308                    sacp_trace_viewer::TraceViewerConfig::default(),
309                )
310                .await?;
311                let writer = trace::TraceWriter::new(TraceHandleWriter(handle));
312                (Some(writer), Some(tokio::spawn(server)))
313            }
314            // --trace --serve: write to file and serve it
315            (Some(trace_path), true) => {
316                let writer = trace::TraceWriter::from_path(trace_path)
317                    .map_err(|e| anyhow::anyhow!("Failed to create trace writer: {}", e))?;
318                let server = sacp_trace_viewer::serve_file(
319                    trace_path.clone(),
320                    sacp_trace_viewer::TraceViewerConfig::default(),
321                );
322                (Some(writer), Some(tokio::spawn(server)))
323            }
324            // Neither: no tracing
325            (None, false) => (None, None),
326        };
327
328        self.run(debug_logger.as_ref(), trace_writer)
329            .instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
330            .await
331            .map_err(|err| anyhow::anyhow!("{err}"))
332    }
333
334    async fn run(
335        self,
336        debug_logger: Option<&debug_logger::DebugLogger>,
337        trace_writer: Option<trace::TraceWriter>,
338    ) -> Result<(), sacp::Error> {
339        match self.command {
340            ConductorCommand::Agent { name, components } => {
341                initialize_conductor(
342                    debug_logger,
343                    trace_writer,
344                    name,
345                    components,
346                    |name, providers, mcp_mode| Conductor::new_agent(name, providers, mcp_mode),
347                )
348                .await
349            }
350            ConductorCommand::Proxy { name, proxies } => {
351                initialize_conductor(
352                    debug_logger,
353                    trace_writer,
354                    name,
355                    proxies,
356                    |name, providers, mcp_mode| Conductor::new_proxy(name, providers, mcp_mode),
357                )
358                .await
359            }
360            ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
361        }
362    }
363}
364
365async fn initialize_conductor<Link: ConductorLink>(
366    debug_logger: Option<&debug_logger::DebugLogger>,
367    trace_writer: Option<trace::TraceWriter>,
368    name: String,
369    components: Vec<String>,
370    new_conductor: impl FnOnce(String, CommandLineComponents, crate::McpBridgeMode) -> Conductor<Link>,
371) -> Result<(), sacp::Error> {
372    // Parse agents and optionally wrap with debug callbacks
373    let providers: Vec<AcpAgent> = components
374        .into_iter()
375        .enumerate()
376        .map(|(i, s)| {
377            let mut agent = AcpAgent::from_str(&s)?;
378            if let Some(logger) = debug_logger {
379                agent = agent.with_debug(logger.create_callback(i.to_string()));
380            }
381            Ok(agent)
382        })
383        .collect::<Result<Vec<_>, sacp::Error>>()?;
384
385    // Create Stdio component with optional debug logging
386    let stdio = if let Some(logger) = debug_logger {
387        Stdio::new().with_debug(logger.create_callback("C".to_string()))
388    } else {
389        Stdio::new()
390    };
391
392    // Create conductor with optional trace writer
393    let mut conductor = new_conductor(name, CommandLineComponents(providers), Default::default());
394    if let Some(writer) = trace_writer {
395        conductor = conductor.with_trace_writer(writer);
396    }
397
398    conductor
399        .into_connection_builder()
400        .connect_to(stdio)?
401        .serve()
402        .await
403}