Skip to main content

plexus_substrate/activations/claudecode/
executor.rs

1use super::types::{Model, RawClaudeEvent};
2use async_stream::stream;
3use futures::Stream;
4use serde_json::Value;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::process::Stdio;
8use std::sync::Arc;
9use thiserror::Error;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::net::TcpStream;
12use tokio::process::Command;
13use tokio::sync::Mutex;
14
15/// Errors from the Claude Code executor
16#[derive(Debug, Error)]
17pub enum ExecutorError {
18    #[error("failed to spawn claude process (binary='{binary}', cwd='{cwd}'): {source}")]
19    SpawnFailed {
20        binary: String,
21        cwd: String,
22        source: std::io::Error,
23    },
24
25    #[error("failed to write MCP config to '{path}': {reason}")]
26    McpConfigWrite {
27        path: String,
28        reason: String,
29    },
30}
31
32// ─── MCP Reachability Check ───────────────────────────────────────────────────
33
/// Extract a connectable `host:port` from a URL like `http://127.0.0.1:4444/mcp`.
///
/// The function is deliberately lenient (it never fails) because its result is
/// only used for a TCP reachability probe:
///
/// * an optional `http://` / `https://` scheme is removed (once);
/// * everything from the first `/`, `?` or `#` onwards is discarded, so a URL
///   without a path but with a query string still yields a clean address;
/// * `user:password@` userinfo is dropped;
/// * bracketed IPv6 literals (`[::1]`) are recognised, so the colons inside the
///   brackets are not mistaken for a port separator;
/// * a missing or empty port falls back to `4444`, and a missing host falls
///   back to `127.0.0.1` (the Plexus defaults).
fn mcp_host_port_from_url(url: &str) -> String {
    const DEFAULT_HOST: &str = "127.0.0.1";
    const DEFAULT_PORT: &str = "4444";

    let trimmed = url.trim();
    let rest = trimmed
        .strip_prefix("https://")
        .or_else(|| trimmed.strip_prefix("http://"))
        .unwrap_or(trimmed);

    // The authority component ends at the first path, query or fragment delimiter.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");
    // Credentials are irrelevant for a TCP connect and would break address resolution.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);

    let (host, port) = if host_port.starts_with('[') {
        // Bracketed IPv6 literal: a port can only follow the closing bracket.
        match host_port.find(']') {
            Some(close) => {
                let (bracketed, tail) = host_port.split_at(close + 1);
                (bracketed, tail.strip_prefix(':'))
            }
            None => (host_port, None),
        }
    } else {
        match host_port.rsplit_once(':') {
            Some((h, p)) => (h, Some(p)),
            None => (host_port, None),
        }
    };

    let host = if host.is_empty() { DEFAULT_HOST } else { host };
    let port = port.filter(|p| !p.is_empty()).unwrap_or(DEFAULT_PORT);
    format!("{}:{}", host, port)
}
46
47/// Check that the Plexus MCP server is reachable via TCP.
48///
49/// Reads `PLEXUS_MCP_URL` (default `http://127.0.0.1:4444/mcp`) to determine
50/// the host:port.  Attempts a TCP connect with a 2-second timeout.
51///
52/// Returns an actionable error message if the server is not reachable, so
53/// callers can fail fast before spawning Claude with a broken MCP config.
54pub async fn check_mcp_reachable() -> Result<(), String> {
55    let url = std::env::var("PLEXUS_MCP_URL")
56        .unwrap_or_else(|_| "http://127.0.0.1:4444/mcp".to_string());
57    let addr = mcp_host_port_from_url(&url);
58
59    match tokio::time::timeout(
60        std::time::Duration::from_secs(2),
61        TcpStream::connect(&addr),
62    )
63    .await
64    {
65        Ok(Ok(_)) => Ok(()),
66        Ok(Err(e)) => Err(format!(
67            "MCP server not reachable at {} ({}). \
68             Start the substrate without --no-mcp so the permission-prompt tool is available.",
69            url, e
70        )),
71        Err(_) => Err(format!(
72            "MCP server connection timed out at {}. \
73             Start the substrate without --no-mcp so the permission-prompt tool is available.",
74            url
75        )),
76    }
77}
78
79/// Configuration for a Claude Code session launch
80#[derive(Debug, Clone)]
81pub struct LaunchConfig {
82    /// The query/prompt to send
83    pub query: String,
84    /// Resume an existing Claude session
85    pub session_id: Option<String>,
86    /// Fork the session instead of resuming
87    pub fork_session: bool,
88    /// Model to use
89    pub model: Model,
90    /// Working directory
91    pub working_dir: String,
92    /// System prompt
93    pub system_prompt: Option<String>,
94    /// MCP configuration (written to temp file)
95    pub mcp_config: Option<Value>,
96    /// Permission prompt tool name
97    pub permission_prompt_tool: Option<String>,
98    /// Allowed tools
99    pub allowed_tools: Vec<String>,
100    /// Disallowed tools
101    pub disallowed_tools: Vec<String>,
102    /// Max turns
103    pub max_turns: Option<i32>,
104    /// Enable loopback mode - routes tool permissions through Plexus for parent approval
105    pub loopback_enabled: bool,
106    /// Session ID for loopback correlation
107    pub loopback_session_id: Option<String>,
108}
109
110impl Default for LaunchConfig {
111    fn default() -> Self {
112        Self {
113            query: String::new(),
114            session_id: None,
115            fork_session: false,
116            model: Model::Sonnet,
117            working_dir: ".".to_string(),
118            system_prompt: None,
119            mcp_config: None,
120            permission_prompt_tool: None,
121            allowed_tools: Vec::new(),
122            disallowed_tools: Vec::new(),
123            max_turns: None,
124            loopback_enabled: false,
125            loopback_session_id: None,
126        }
127    }
128}
129
/// Thin wrapper around the Claude Code command-line binary.
///
/// Cloning is cheap: the only state is the path of the binary to run.
#[derive(Clone)]
pub struct ClaudeCodeExecutor {
    /// Path (or bare command name) used to invoke Claude.
    claude_path: String,
}
135
136impl ClaudeCodeExecutor {
137    pub fn new() -> Self {
138        Self {
139            claude_path: Self::find_claude_binary().unwrap_or_else(|| "claude".to_string()),
140        }
141    }
142
143    pub fn with_path(path: String) -> Self {
144        Self { claude_path: path }
145    }
146
147    /// Discover the Claude binary location
148    fn find_claude_binary() -> Option<String> {
149        // Check common locations
150        let home = dirs::home_dir()?;
151
152        let candidates = [
153            home.join(".claude/local/claude"),
154            home.join(".npm/bin/claude"),
155            home.join(".bun/bin/claude"),
156            home.join(".local/bin/claude"),
157            PathBuf::from("/usr/local/bin/claude"),
158            PathBuf::from("/opt/homebrew/bin/claude"),
159        ];
160
161        for candidate in &candidates {
162            if candidate.exists() {
163                return candidate.to_str().map(|s| s.to_string());
164            }
165        }
166
167        // Try PATH
168        which::which("claude")
169            .ok()
170            .and_then(|p| p.to_str().map(|s| s.to_string()))
171    }
172
173    /// Build command line arguments from config
174    fn build_args(&self, config: &LaunchConfig) -> Vec<String> {
175        let mut args = vec![
176            "--output-format".to_string(),
177            "stream-json".to_string(),
178            "--include-partial-messages".to_string(),
179            "--verbose".to_string(),
180            "--print".to_string(),
181        ];
182
183        // Session resumption
184        if let Some(ref session_id) = config.session_id {
185            args.push("--resume".to_string());
186            args.push(session_id.clone());
187
188            if config.fork_session {
189                args.push("--fork-session".to_string());
190            }
191        }
192
193        // Model
194        args.push("--model".to_string());
195        args.push(config.model.as_str().to_string());
196
197        // Max turns
198        if let Some(max) = config.max_turns {
199            args.push("--max-turns".to_string());
200            args.push(max.to_string());
201        }
202
203        // System prompt
204        if let Some(ref prompt) = config.system_prompt {
205            args.push("--system-prompt".to_string());
206            args.push(prompt.clone());
207        }
208
209        // Permission prompt tool - loopback takes precedence
210        if config.loopback_enabled {
211            args.push("--permission-prompt-tool".to_string());
212            args.push("mcp__plexus__loopback_permit".to_string());
213        } else if let Some(ref tool) = config.permission_prompt_tool {
214            args.push("--permission-prompt-tool".to_string());
215            args.push(tool.clone());
216        }
217
218        // Allowed tools
219        if !config.allowed_tools.is_empty() {
220            args.push("--allowedTools".to_string());
221            args.push(config.allowed_tools.join(","));
222        }
223
224        // Disallowed tools
225        if !config.disallowed_tools.is_empty() {
226            args.push("--disallowedTools".to_string());
227            args.push(config.disallowed_tools.join(","));
228        }
229
230        // Query must be last
231        args.push("--".to_string());
232        args.push(config.query.clone());
233
234        args
235    }
236
237    /// Write MCP config to a temp file and return the path
238    #[allow(dead_code)]
239    async fn write_mcp_config(&self, config: &Value) -> Result<String, String> {
240        let temp_dir = std::env::temp_dir();
241        let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
242
243        let json = serde_json::to_string_pretty(config)
244            .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
245
246        tokio::fs::write(&temp_path, json)
247            .await
248            .map_err(|e| format!("Failed to write MCP config: {}", e))?;
249
250        Ok(temp_path.to_string_lossy().to_string())
251    }
252
253    /// Launch a Claude Code session and stream raw events
254    pub async fn launch(
255        &self,
256        config: LaunchConfig,
257    ) -> Pin<Box<dyn Stream<Item = RawClaudeEvent> + Send + 'static>> {
258        let mut args = self.build_args(&config);
259        let claude_path = self.claude_path.clone();
260        let working_dir = config.working_dir.clone();
261        let loopback_enabled = config.loopback_enabled;
262        let loopback_session_id = config.loopback_session_id.clone();
263
264        // Build MCP config - merge loopback config if enabled
265        let mcp_config = if loopback_enabled {
266            let base_url = std::env::var("PLEXUS_MCP_URL")
267                .unwrap_or_else(|_| "http://127.0.0.1:4444/mcp".to_string());
268
269            // Include session_id in URL for correlation when loopback_permit is called
270            let plexus_url = if let Some(ref sid) = loopback_session_id {
271                format!("{}?session_id={}", base_url, sid)
272            } else {
273                base_url
274            };
275
276            let loopback_mcp = if let Some(ref sid) = loopback_session_id {
277                serde_json::json!({
278                    "mcpServers": {
279                        "plexus": {
280                            "type": "http",
281                            "url": plexus_url
282                        }
283                    },
284                    "env": {
285                        "PLEXUS_SESSION_ID": sid
286                    }
287                })
288            } else {
289                serde_json::json!({
290                    "mcpServers": {
291                        "plexus": {
292                            "type": "http",
293                            "url": plexus_url
294                        }
295                    }
296                })
297            };
298
299            // Merge with existing config if present
300            match config.mcp_config {
301                Some(existing) => {
302                    // Merge mcpServers from both
303                    let mut merged = existing.clone();
304                    if let (Some(existing_servers), Some(loopback_servers)) = (
305                        merged.get_mut("mcpServers"),
306                        loopback_mcp.get("mcpServers")
307                    ) {
308                        if let (Some(existing_obj), Some(loopback_obj)) = (
309                            existing_servers.as_object_mut(),
310                            loopback_servers.as_object()
311                        ) {
312                            for (k, v) in loopback_obj {
313                                existing_obj.insert(k.clone(), v.clone());
314                            }
315                        }
316                    } else {
317                        merged["mcpServers"] = loopback_mcp["mcpServers"].clone();
318                    }
319                    Some(merged)
320                }
321                None => Some(loopback_mcp)
322            }
323        } else {
324            config.mcp_config.clone()
325        };
326
327        Box::pin(stream! {
328            macro_rules! yield_error {
329                ($err:expr) => {{
330                    let err: ExecutorError = $err;
331                    tracing::error!(error = %err, "Claude executor error");
332                    yield RawClaudeEvent::Result {
333                        subtype: Some("error".to_string()),
334                        session_id: None,
335                        cost_usd: None,
336                        is_error: Some(true),
337                        duration_ms: None,
338                        num_turns: None,
339                        result: None,
340                        error: Some(err.to_string()),
341                    };
342                }};
343            }
344
345            // Fail fast if loopback is enabled but the MCP server is not reachable.
346            // Without a live MCP server Claude cannot call the permission-prompt tool
347            // and will return empty output (silent failure).
348            if loopback_enabled {
349                if let Err(e) = check_mcp_reachable().await {
350                    yield RawClaudeEvent::Result {
351                        subtype: Some("error".to_string()),
352                        session_id: None,
353                        cost_usd: None,
354                        is_error: Some(true),
355                        duration_ms: None,
356                        num_turns: None,
357                        result: None,
358                        error: Some(e),
359                    };
360                    return;
361                }
362            }
363
364            // Handle MCP config if present
365            let mcp_path = if let Some(ref mcp) = mcp_config {
366                match Self::write_mcp_config_sync(mcp) {
367                    Ok(path) => {
368                        // Insert MCP config args before the "--" separator
369                        if let Some(pos) = args.iter().position(|a| a == "--") {
370                            args.insert(pos, path.clone());
371                            args.insert(pos, "--mcp-config".to_string());
372                        }
373                        Some(path)
374                    }
375                    Err(e) => {
376                        yield_error!(ExecutorError::McpConfigWrite {
377                            path: std::env::temp_dir().to_string_lossy().to_string(),
378                            reason: e,
379                        });
380                        return;
381                    }
382                }
383            } else {
384                None
385            };
386
387            // Spawn Claude process via shell to ensure clean process context
388            // This avoids any potential issues with nested Claude sessions
389            fn shell_escape(s: &str) -> String {
390                // Escape by wrapping in single quotes and escaping any single quotes
391                format!("'{}'", s.replace("'", "'\\''"))
392            }
393
394            let shell_cmd = format!(
395                "{} {}",
396                shell_escape(&claude_path),
397                args.iter()
398                    .map(|a| shell_escape(a))
399                    .collect::<Vec<_>>()
400                    .join(" ")
401            );
402
403            tracing::debug!(cmd = %shell_cmd, "Launching Claude Code");
404
405            // Emit the launch command as an event (captured in arbor for debugging)
406            yield RawClaudeEvent::LaunchCommand { command: shell_cmd.clone() };
407
408            let mut cmd = Command::new("bash");
409            cmd.args(&["-c", &shell_cmd])
410                .current_dir(&working_dir)
411                .stdout(Stdio::piped())
412                .stderr(Stdio::piped())
413                .stdin(Stdio::null())
414                // Unset CLAUDECODE so nested Claude sessions are allowed
415                .env_remove("CLAUDECODE");
416
417            // Set loopback session ID env var if loopback is enabled
418            if loopback_enabled {
419                if let Some(ref session_id) = loopback_session_id {
420                    cmd.env("PLEXUS_SESSION_ID", session_id);
421                }
422            }
423
424            let mut child = match cmd.spawn() {
425                Ok(c) => c,
426                Err(e) => {
427                    yield_error!(ExecutorError::SpawnFailed {
428                        binary: claude_path.clone(),
429                        cwd: working_dir.clone(),
430                        source: e,
431                    });
432                    return;
433                }
434            };
435
436            let stdout = child.stdout.take().expect("stdout");
437            let mut reader = BufReader::with_capacity(10 * 1024 * 1024, stdout).lines(); // 10MB buffer
438
439            // Capture stderr in a background task to prevent pipe buffer blocking
440            let stderr_buffer: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
441            let stderr = child.stderr.take().expect("stderr");
442            let stderr_buf = stderr_buffer.clone();
443            tokio::spawn(async move {
444                let mut stderr_reader = BufReader::new(stderr).lines();
445                while let Ok(Some(line)) = stderr_reader.next_line().await {
446                    let mut buf = stderr_buf.lock().await;
447                    if buf.len() < 100 {
448                        buf.push(line);
449                    }
450                }
451            });
452
453            // Stream events from stdout
454            while let Ok(Some(line)) = reader.next_line().await {
455                if line.trim().is_empty() {
456                    continue;
457                }
458
459                match serde_json::from_str::<RawClaudeEvent>(&line) {
460                    Ok(event) => {
461                        let is_result = matches!(event, RawClaudeEvent::Result { .. });
462                        yield event;
463                        if is_result {
464                            break;
465                        }
466                    }
467                    Err(_) => {
468                        // Try to parse as generic JSON and wrap as Unknown event
469                        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
470                            let event_type = value.get("type")
471                                .and_then(|t| t.as_str())
472                                .unwrap_or("unknown_json")
473                                .to_string();
474                            yield RawClaudeEvent::Unknown {
475                                event_type,
476                                data: value,
477                            };
478                        } else {
479                            // Non-JSON output (raw text, errors, etc.)
480                            yield RawClaudeEvent::Unknown {
481                                event_type: "raw_output".to_string(),
482                                data: serde_json::Value::String(line),
483                            };
484                        }
485                    }
486                }
487            }
488
489            // Drain stderr and emit as events (captures error messages from Claude)
490            if let Some(stderr) = child.stderr.take() {
491                let mut stderr_reader = BufReader::new(stderr).lines();
492                while let Ok(Some(line)) = stderr_reader.next_line().await {
493                    if !line.trim().is_empty() {
494                        yield RawClaudeEvent::Stderr { text: line };
495                    }
496                }
497            }
498
499            // Cleanup
500            let _ = child.wait().await;
501
502            if let Some(path) = mcp_path {
503                let _ = tokio::fs::remove_file(path).await;
504            }
505        })
506    }
507
508    /// Sync version of write_mcp_config for use in async stream
509    fn write_mcp_config_sync(config: &Value) -> Result<String, String> {
510        let temp_dir = std::env::temp_dir();
511        let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
512
513        let json = serde_json::to_string_pretty(config)
514            .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
515
516        std::fs::write(&temp_path, json)
517            .map_err(|e| format!("Failed to write MCP config: {}", e))?;
518
519        Ok(temp_path.to_string_lossy().to_string())
520    }
521}
522
523impl Default for ClaudeCodeExecutor {
524    fn default() -> Self {
525        Self::new()
526    }
527}
528
#[cfg(test)]
mod tests {
    use super::*;

    /// Run `build_args` for `config` on an executor with a fixed binary path.
    fn args_for(config: LaunchConfig) -> Vec<String> {
        let executor = ClaudeCodeExecutor::with_path(String::from("/usr/bin/claude"));
        executor.build_args(&config)
    }

    /// Assert that every string in `expected` occurs somewhere in `args`.
    fn assert_contains_all(args: &[String], expected: &[&str]) {
        for wanted in expected {
            assert!(args.contains(&wanted.to_string()));
        }
    }

    #[test]
    fn test_build_args_basic() {
        let args = args_for(LaunchConfig {
            query: String::from("hello"),
            model: Model::Sonnet,
            working_dir: String::from("/tmp"),
            ..LaunchConfig::default()
        });

        assert_contains_all(
            &args,
            &["--output-format", "stream-json", "--model", "sonnet", "--", "hello"],
        );
    }

    #[test]
    fn test_build_args_with_resume() {
        let args = args_for(LaunchConfig {
            query: String::from("continue"),
            session_id: Some(String::from("sess_123")),
            model: Model::Haiku,
            working_dir: String::from("/tmp"),
            ..LaunchConfig::default()
        });

        assert_contains_all(&args, &["--resume", "sess_123", "haiku"]);
    }

    #[test]
    fn test_build_args_with_fork() {
        let args = args_for(LaunchConfig {
            query: String::from("branch"),
            session_id: Some(String::from("sess_123")),
            fork_session: true,
            model: Model::Opus,
            working_dir: String::from("/tmp"),
            ..LaunchConfig::default()
        });

        assert_contains_all(&args, &["--resume", "--fork-session", "opus"]);
    }
}
589}