Skip to main content

plexus_substrate/activations/claudecode/
executor.rs

1use super::types::{Model, RawClaudeEvent};
2use async_stream::stream;
3use futures::Stream;
4use serde_json::Value;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::process::Stdio;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command;
10
11/// Configuration for a Claude Code session launch
12#[derive(Debug, Clone)]
13pub struct LaunchConfig {
14    /// The query/prompt to send
15    pub query: String,
16    /// Resume an existing Claude session
17    pub session_id: Option<String>,
18    /// Fork the session instead of resuming
19    pub fork_session: bool,
20    /// Model to use
21    pub model: Model,
22    /// Working directory
23    pub working_dir: String,
24    /// System prompt
25    pub system_prompt: Option<String>,
26    /// MCP configuration (written to temp file)
27    pub mcp_config: Option<Value>,
28    /// Permission prompt tool name
29    pub permission_prompt_tool: Option<String>,
30    /// Allowed tools
31    pub allowed_tools: Vec<String>,
32    /// Disallowed tools
33    pub disallowed_tools: Vec<String>,
34    /// Max turns
35    pub max_turns: Option<i32>,
36    /// Enable loopback mode - routes tool permissions through Plexus for parent approval
37    pub loopback_enabled: bool,
38    /// Session ID for loopback correlation
39    pub loopback_session_id: Option<String>,
40}
41
42impl Default for LaunchConfig {
43    fn default() -> Self {
44        Self {
45            query: String::new(),
46            session_id: None,
47            fork_session: false,
48            model: Model::Sonnet,
49            working_dir: ".".to_string(),
50            system_prompt: None,
51            mcp_config: None,
52            permission_prompt_tool: None,
53            allowed_tools: Vec::new(),
54            disallowed_tools: Vec::new(),
55            max_turns: None,
56            loopback_enabled: false,
57            loopback_session_id: None,
58        }
59    }
60}
61
62/// Executor that wraps the Claude Code CLI
63#[derive(Clone)]
64pub struct ClaudeCodeExecutor {
65    claude_path: String,
66}
67
68impl ClaudeCodeExecutor {
69    pub fn new() -> Self {
70        Self {
71            claude_path: Self::find_claude_binary().unwrap_or_else(|| "claude".to_string()),
72        }
73    }
74
75    pub fn with_path(path: String) -> Self {
76        Self { claude_path: path }
77    }
78
79    /// Discover the Claude binary location
80    fn find_claude_binary() -> Option<String> {
81        // Check common locations
82        let home = dirs::home_dir()?;
83
84        let candidates = [
85            home.join(".claude/local/claude"),
86            home.join(".npm/bin/claude"),
87            home.join(".bun/bin/claude"),
88            home.join(".local/bin/claude"),
89            PathBuf::from("/usr/local/bin/claude"),
90            PathBuf::from("/opt/homebrew/bin/claude"),
91        ];
92
93        for candidate in &candidates {
94            if candidate.exists() {
95                return candidate.to_str().map(|s| s.to_string());
96            }
97        }
98
99        // Try PATH
100        which::which("claude")
101            .ok()
102            .and_then(|p| p.to_str().map(|s| s.to_string()))
103    }
104
105    /// Build command line arguments from config
106    fn build_args(&self, config: &LaunchConfig) -> Vec<String> {
107        let mut args = vec![
108            "--output-format".to_string(),
109            "stream-json".to_string(),
110            "--include-partial-messages".to_string(),
111            "--verbose".to_string(),
112            "--print".to_string(),
113        ];
114
115        // Session resumption
116        if let Some(ref session_id) = config.session_id {
117            args.push("--resume".to_string());
118            args.push(session_id.clone());
119
120            if config.fork_session {
121                args.push("--fork-session".to_string());
122            }
123        }
124
125        // Model
126        args.push("--model".to_string());
127        args.push(config.model.as_str().to_string());
128
129        // Max turns
130        if let Some(max) = config.max_turns {
131            args.push("--max-turns".to_string());
132            args.push(max.to_string());
133        }
134
135        // System prompt
136        if let Some(ref prompt) = config.system_prompt {
137            args.push("--system-prompt".to_string());
138            args.push(prompt.clone());
139        }
140
141        // Permission prompt tool - loopback takes precedence
142        if config.loopback_enabled {
143            args.push("--permission-prompt-tool".to_string());
144            args.push("mcp__plexus__loopback_permit".to_string());
145        } else if let Some(ref tool) = config.permission_prompt_tool {
146            args.push("--permission-prompt-tool".to_string());
147            args.push(tool.clone());
148        }
149
150        // Allowed tools
151        if !config.allowed_tools.is_empty() {
152            args.push("--allowedTools".to_string());
153            args.push(config.allowed_tools.join(","));
154        }
155
156        // Disallowed tools
157        if !config.disallowed_tools.is_empty() {
158            args.push("--disallowedTools".to_string());
159            args.push(config.disallowed_tools.join(","));
160        }
161
162        // Query must be last
163        args.push("--".to_string());
164        args.push(config.query.clone());
165
166        args
167    }
168
169    /// Write MCP config to a temp file and return the path
170    #[allow(dead_code)]
171    async fn write_mcp_config(&self, config: &Value) -> Result<String, String> {
172        let temp_dir = std::env::temp_dir();
173        let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
174
175        let json = serde_json::to_string_pretty(config)
176            .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
177
178        tokio::fs::write(&temp_path, json)
179            .await
180            .map_err(|e| format!("Failed to write MCP config: {}", e))?;
181
182        Ok(temp_path.to_string_lossy().to_string())
183    }
184
185    /// Launch a Claude Code session and stream raw events
186    pub async fn launch(
187        &self,
188        config: LaunchConfig,
189    ) -> Pin<Box<dyn Stream<Item = RawClaudeEvent> + Send + 'static>> {
190        let mut args = self.build_args(&config);
191        let claude_path = self.claude_path.clone();
192        let working_dir = config.working_dir.clone();
193        let loopback_enabled = config.loopback_enabled;
194        let loopback_session_id = config.loopback_session_id.clone();
195
196        // Build MCP config - merge loopback config if enabled
197        let mcp_config = if loopback_enabled {
198            let base_url = std::env::var("PLEXUS_MCP_URL")
199                .unwrap_or_else(|_| "http://127.0.0.1:4445/mcp".to_string());
200
201            // Include session_id in URL for correlation when loopback_permit is called
202            let plexus_url = if let Some(ref sid) = loopback_session_id {
203                format!("{}?session_id={}", base_url, sid)
204            } else {
205                base_url
206            };
207
208            let loopback_mcp = serde_json::json!({
209                "mcpServers": {
210                    "plexus": {
211                        "type": "http",
212                        "url": plexus_url
213                    }
214                }
215            });
216
217            // Merge with existing config if present
218            match config.mcp_config {
219                Some(existing) => {
220                    // Merge mcpServers from both
221                    let mut merged = existing.clone();
222                    if let (Some(existing_servers), Some(loopback_servers)) = (
223                        merged.get_mut("mcpServers"),
224                        loopback_mcp.get("mcpServers")
225                    ) {
226                        if let (Some(existing_obj), Some(loopback_obj)) = (
227                            existing_servers.as_object_mut(),
228                            loopback_servers.as_object()
229                        ) {
230                            for (k, v) in loopback_obj {
231                                existing_obj.insert(k.clone(), v.clone());
232                            }
233                        }
234                    } else {
235                        merged["mcpServers"] = loopback_mcp["mcpServers"].clone();
236                    }
237                    Some(merged)
238                }
239                None => Some(loopback_mcp)
240            }
241        } else {
242            config.mcp_config.clone()
243        };
244
245        Box::pin(stream! {
246            // Handle MCP config if present
247            let mcp_path = if let Some(ref mcp) = mcp_config {
248                match Self::write_mcp_config_sync(mcp) {
249                    Ok(path) => {
250                        // Insert MCP config args before the "--" separator
251                        if let Some(pos) = args.iter().position(|a| a == "--") {
252                            args.insert(pos, path.clone());
253                            args.insert(pos, "--mcp-config".to_string());
254                        }
255                        Some(path)
256                    }
257                    Err(e) => {
258                        yield RawClaudeEvent::Result {
259                            subtype: Some("error".to_string()),
260                            session_id: None,
261                            cost_usd: None,
262                            is_error: Some(true),
263                            duration_ms: None,
264                            num_turns: None,
265                            result: None,
266                            error: Some(e),
267                        };
268                        return;
269                    }
270                }
271            } else {
272                None
273            };
274
275            // Spawn Claude process via shell to ensure clean process context
276            // This avoids any potential issues with nested Claude sessions
277            fn shell_escape(s: &str) -> String {
278                // Escape by wrapping in single quotes and escaping any single quotes
279                format!("'{}'", s.replace("'", "'\\''"))
280            }
281
282            let shell_cmd = format!(
283                "{} {}",
284                shell_escape(&claude_path),
285                args.iter()
286                    .map(|a| shell_escape(a))
287                    .collect::<Vec<_>>()
288                    .join(" ")
289            );
290
291            // Debug: log the command being executed
292            tracing::debug!(cmd = %shell_cmd, "Launching Claude Code");
293            eprintln!("[DEBUG] Claude command: {}", shell_cmd);
294
295            let mut cmd = Command::new("bash");
296            cmd.args(&["-c", &shell_cmd])
297                .current_dir(&working_dir)
298                .stdout(Stdio::piped())
299                .stderr(Stdio::piped())
300                .stdin(Stdio::null());
301
302            // Set loopback session ID env var if loopback is enabled
303            if loopback_enabled {
304                if let Some(ref session_id) = loopback_session_id {
305                    cmd.env("LOOPBACK_SESSION_ID", session_id);
306                }
307            }
308
309            let mut child = match cmd.spawn() {
310                Ok(c) => c,
311                Err(e) => {
312                    yield RawClaudeEvent::Result {
313                        subtype: Some("error".to_string()),
314                        session_id: None,
315                        cost_usd: None,
316                        is_error: Some(true),
317                        duration_ms: None,
318                        num_turns: None,
319                        result: None,
320                        error: Some(format!("Failed to spawn claude: {}", e)),
321                    };
322                    return;
323                }
324            };
325
326            let stdout = child.stdout.take().expect("stdout");
327            let mut reader = BufReader::with_capacity(10 * 1024 * 1024, stdout).lines(); // 10MB buffer
328
329            // Stream events from stdout
330            while let Ok(Some(line)) = reader.next_line().await {
331                if line.trim().is_empty() {
332                    continue;
333                }
334
335                match serde_json::from_str::<RawClaudeEvent>(&line) {
336                    Ok(event) => {
337                        let is_result = matches!(event, RawClaudeEvent::Result { .. });
338                        yield event;
339                        if is_result {
340                            break;
341                        }
342                    }
343                    Err(_) => {
344                        // Try to parse as generic JSON and wrap as Unknown event
345                        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
346                            let event_type = value.get("type")
347                                .and_then(|t| t.as_str())
348                                .unwrap_or("unknown_json")
349                                .to_string();
350                            yield RawClaudeEvent::Unknown {
351                                event_type,
352                                data: value,
353                            };
354                        } else {
355                            // Non-JSON output (raw text, errors, etc.)
356                            yield RawClaudeEvent::Unknown {
357                                event_type: "raw_output".to_string(),
358                                data: serde_json::Value::String(line),
359                            };
360                        }
361                    }
362                }
363            }
364
365            // Cleanup
366            let _ = child.wait().await;
367            if let Some(path) = mcp_path {
368                let _ = tokio::fs::remove_file(path).await;
369            }
370        })
371    }
372
373    /// Sync version of write_mcp_config for use in async stream
374    fn write_mcp_config_sync(config: &Value) -> Result<String, String> {
375        let temp_dir = std::env::temp_dir();
376        let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
377
378        let json = serde_json::to_string_pretty(config)
379            .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
380
381        std::fs::write(&temp_path, json)
382            .map_err(|e| format!("Failed to write MCP config: {}", e))?;
383
384        Ok(temp_path.to_string_lossy().to_string())
385    }
386}
387
388impl Default for ClaudeCodeExecutor {
389    fn default() -> Self {
390        Self::new()
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397
398    #[test]
399    fn test_build_args_basic() {
400        let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
401        let config = LaunchConfig {
402            query: "hello".to_string(),
403            model: Model::Sonnet,
404            working_dir: "/tmp".to_string(),
405            ..Default::default()
406        };
407
408        let args = executor.build_args(&config);
409
410        assert!(args.contains(&"--output-format".to_string()));
411        assert!(args.contains(&"stream-json".to_string()));
412        assert!(args.contains(&"--model".to_string()));
413        assert!(args.contains(&"sonnet".to_string()));
414        assert!(args.contains(&"--".to_string()));
415        assert!(args.contains(&"hello".to_string()));
416    }
417
418    #[test]
419    fn test_build_args_with_resume() {
420        let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
421        let config = LaunchConfig {
422            query: "continue".to_string(),
423            session_id: Some("sess_123".to_string()),
424            model: Model::Haiku,
425            working_dir: "/tmp".to_string(),
426            ..Default::default()
427        };
428
429        let args = executor.build_args(&config);
430
431        assert!(args.contains(&"--resume".to_string()));
432        assert!(args.contains(&"sess_123".to_string()));
433        assert!(args.contains(&"haiku".to_string()));
434    }
435
436    #[test]
437    fn test_build_args_with_fork() {
438        let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
439        let config = LaunchConfig {
440            query: "branch".to_string(),
441            session_id: Some("sess_123".to_string()),
442            fork_session: true,
443            model: Model::Opus,
444            working_dir: "/tmp".to_string(),
445            ..Default::default()
446        };
447
448        let args = executor.build_args(&config);
449
450        assert!(args.contains(&"--resume".to_string()));
451        assert!(args.contains(&"--fork-session".to_string()));
452        assert!(args.contains(&"opus".to_string()));
453    }
454}