plexus_substrate/activations/claudecode/
executor.rs1use super::types::{Model, RawClaudeEvent};
2use async_stream::stream;
3use futures::Stream;
4use serde_json::Value;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::process::Stdio;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command;
10
11#[derive(Debug, Clone)]
13pub struct LaunchConfig {
14 pub query: String,
16 pub session_id: Option<String>,
18 pub fork_session: bool,
20 pub model: Model,
22 pub working_dir: String,
24 pub system_prompt: Option<String>,
26 pub mcp_config: Option<Value>,
28 pub permission_prompt_tool: Option<String>,
30 pub allowed_tools: Vec<String>,
32 pub disallowed_tools: Vec<String>,
34 pub max_turns: Option<i32>,
36 pub loopback_enabled: bool,
38 pub loopback_session_id: Option<String>,
40}
41
42impl Default for LaunchConfig {
43 fn default() -> Self {
44 Self {
45 query: String::new(),
46 session_id: None,
47 fork_session: false,
48 model: Model::Sonnet,
49 working_dir: ".".to_string(),
50 system_prompt: None,
51 mcp_config: None,
52 permission_prompt_tool: None,
53 allowed_tools: Vec::new(),
54 disallowed_tools: Vec::new(),
55 max_turns: None,
56 loopback_enabled: false,
57 loopback_session_id: None,
58 }
59 }
60}
61
62#[derive(Clone)]
64pub struct ClaudeCodeExecutor {
65 claude_path: String,
66}
67
68impl ClaudeCodeExecutor {
69 pub fn new() -> Self {
70 Self {
71 claude_path: Self::find_claude_binary().unwrap_or_else(|| "claude".to_string()),
72 }
73 }
74
75 pub fn with_path(path: String) -> Self {
76 Self { claude_path: path }
77 }
78
79 fn find_claude_binary() -> Option<String> {
81 let home = dirs::home_dir()?;
83
84 let candidates = [
85 home.join(".claude/local/claude"),
86 home.join(".npm/bin/claude"),
87 home.join(".bun/bin/claude"),
88 home.join(".local/bin/claude"),
89 PathBuf::from("/usr/local/bin/claude"),
90 PathBuf::from("/opt/homebrew/bin/claude"),
91 ];
92
93 for candidate in &candidates {
94 if candidate.exists() {
95 return candidate.to_str().map(|s| s.to_string());
96 }
97 }
98
99 which::which("claude")
101 .ok()
102 .and_then(|p| p.to_str().map(|s| s.to_string()))
103 }
104
105 fn build_args(&self, config: &LaunchConfig) -> Vec<String> {
107 let mut args = vec![
108 "--output-format".to_string(),
109 "stream-json".to_string(),
110 "--include-partial-messages".to_string(),
111 "--verbose".to_string(),
112 "--print".to_string(),
113 ];
114
115 if let Some(ref session_id) = config.session_id {
117 args.push("--resume".to_string());
118 args.push(session_id.clone());
119
120 if config.fork_session {
121 args.push("--fork-session".to_string());
122 }
123 }
124
125 args.push("--model".to_string());
127 args.push(config.model.as_str().to_string());
128
129 if let Some(max) = config.max_turns {
131 args.push("--max-turns".to_string());
132 args.push(max.to_string());
133 }
134
135 if let Some(ref prompt) = config.system_prompt {
137 args.push("--system-prompt".to_string());
138 args.push(prompt.clone());
139 }
140
141 if config.loopback_enabled {
143 args.push("--permission-prompt-tool".to_string());
144 args.push("mcp__plexus__loopback_permit".to_string());
145 } else if let Some(ref tool) = config.permission_prompt_tool {
146 args.push("--permission-prompt-tool".to_string());
147 args.push(tool.clone());
148 }
149
150 if !config.allowed_tools.is_empty() {
152 args.push("--allowedTools".to_string());
153 args.push(config.allowed_tools.join(","));
154 }
155
156 if !config.disallowed_tools.is_empty() {
158 args.push("--disallowedTools".to_string());
159 args.push(config.disallowed_tools.join(","));
160 }
161
162 args.push("--".to_string());
164 args.push(config.query.clone());
165
166 args
167 }
168
169 #[allow(dead_code)]
171 async fn write_mcp_config(&self, config: &Value) -> Result<String, String> {
172 let temp_dir = std::env::temp_dir();
173 let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
174
175 let json = serde_json::to_string_pretty(config)
176 .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
177
178 tokio::fs::write(&temp_path, json)
179 .await
180 .map_err(|e| format!("Failed to write MCP config: {}", e))?;
181
182 Ok(temp_path.to_string_lossy().to_string())
183 }
184
185 pub async fn launch(
187 &self,
188 config: LaunchConfig,
189 ) -> Pin<Box<dyn Stream<Item = RawClaudeEvent> + Send + 'static>> {
190 let mut args = self.build_args(&config);
191 let claude_path = self.claude_path.clone();
192 let working_dir = config.working_dir.clone();
193 let loopback_enabled = config.loopback_enabled;
194 let loopback_session_id = config.loopback_session_id.clone();
195
196 let mcp_config = if loopback_enabled {
198 let base_url = std::env::var("PLEXUS_MCP_URL")
199 .unwrap_or_else(|_| "http://127.0.0.1:4445/mcp".to_string());
200
201 let plexus_url = if let Some(ref sid) = loopback_session_id {
203 format!("{}?session_id={}", base_url, sid)
204 } else {
205 base_url
206 };
207
208 let loopback_mcp = serde_json::json!({
209 "mcpServers": {
210 "plexus": {
211 "type": "http",
212 "url": plexus_url
213 }
214 }
215 });
216
217 match config.mcp_config {
219 Some(existing) => {
220 let mut merged = existing.clone();
222 if let (Some(existing_servers), Some(loopback_servers)) = (
223 merged.get_mut("mcpServers"),
224 loopback_mcp.get("mcpServers")
225 ) {
226 if let (Some(existing_obj), Some(loopback_obj)) = (
227 existing_servers.as_object_mut(),
228 loopback_servers.as_object()
229 ) {
230 for (k, v) in loopback_obj {
231 existing_obj.insert(k.clone(), v.clone());
232 }
233 }
234 } else {
235 merged["mcpServers"] = loopback_mcp["mcpServers"].clone();
236 }
237 Some(merged)
238 }
239 None => Some(loopback_mcp)
240 }
241 } else {
242 config.mcp_config.clone()
243 };
244
245 Box::pin(stream! {
246 let mcp_path = if let Some(ref mcp) = mcp_config {
248 match Self::write_mcp_config_sync(mcp) {
249 Ok(path) => {
250 if let Some(pos) = args.iter().position(|a| a == "--") {
252 args.insert(pos, path.clone());
253 args.insert(pos, "--mcp-config".to_string());
254 }
255 Some(path)
256 }
257 Err(e) => {
258 yield RawClaudeEvent::Result {
259 subtype: Some("error".to_string()),
260 session_id: None,
261 cost_usd: None,
262 is_error: Some(true),
263 duration_ms: None,
264 num_turns: None,
265 result: None,
266 error: Some(e),
267 };
268 return;
269 }
270 }
271 } else {
272 None
273 };
274
275 fn shell_escape(s: &str) -> String {
278 format!("'{}'", s.replace("'", "'\\''"))
280 }
281
282 let shell_cmd = format!(
283 "{} {}",
284 shell_escape(&claude_path),
285 args.iter()
286 .map(|a| shell_escape(a))
287 .collect::<Vec<_>>()
288 .join(" ")
289 );
290
291 tracing::debug!(cmd = %shell_cmd, "Launching Claude Code");
293 eprintln!("[DEBUG] Claude command: {}", shell_cmd);
294
295 let mut cmd = Command::new("bash");
296 cmd.args(&["-c", &shell_cmd])
297 .current_dir(&working_dir)
298 .stdout(Stdio::piped())
299 .stderr(Stdio::piped())
300 .stdin(Stdio::null());
301
302 if loopback_enabled {
304 if let Some(ref session_id) = loopback_session_id {
305 cmd.env("LOOPBACK_SESSION_ID", session_id);
306 }
307 }
308
309 let mut child = match cmd.spawn() {
310 Ok(c) => c,
311 Err(e) => {
312 yield RawClaudeEvent::Result {
313 subtype: Some("error".to_string()),
314 session_id: None,
315 cost_usd: None,
316 is_error: Some(true),
317 duration_ms: None,
318 num_turns: None,
319 result: None,
320 error: Some(format!("Failed to spawn claude: {}", e)),
321 };
322 return;
323 }
324 };
325
326 let stdout = child.stdout.take().expect("stdout");
327 let mut reader = BufReader::with_capacity(10 * 1024 * 1024, stdout).lines(); while let Ok(Some(line)) = reader.next_line().await {
331 if line.trim().is_empty() {
332 continue;
333 }
334
335 match serde_json::from_str::<RawClaudeEvent>(&line) {
336 Ok(event) => {
337 let is_result = matches!(event, RawClaudeEvent::Result { .. });
338 yield event;
339 if is_result {
340 break;
341 }
342 }
343 Err(_) => {
344 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
346 let event_type = value.get("type")
347 .and_then(|t| t.as_str())
348 .unwrap_or("unknown_json")
349 .to_string();
350 yield RawClaudeEvent::Unknown {
351 event_type,
352 data: value,
353 };
354 } else {
355 yield RawClaudeEvent::Unknown {
357 event_type: "raw_output".to_string(),
358 data: serde_json::Value::String(line),
359 };
360 }
361 }
362 }
363 }
364
365 let _ = child.wait().await;
367 if let Some(path) = mcp_path {
368 let _ = tokio::fs::remove_file(path).await;
369 }
370 })
371 }
372
373 fn write_mcp_config_sync(config: &Value) -> Result<String, String> {
375 let temp_dir = std::env::temp_dir();
376 let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
377
378 let json = serde_json::to_string_pretty(config)
379 .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
380
381 std::fs::write(&temp_path, json)
382 .map_err(|e| format!("Failed to write MCP config: {}", e))?;
383
384 Ok(temp_path.to_string_lossy().to_string())
385 }
386}
387
388impl Default for ClaudeCodeExecutor {
389 fn default() -> Self {
390 Self::new()
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397
398 #[test]
399 fn test_build_args_basic() {
400 let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
401 let config = LaunchConfig {
402 query: "hello".to_string(),
403 model: Model::Sonnet,
404 working_dir: "/tmp".to_string(),
405 ..Default::default()
406 };
407
408 let args = executor.build_args(&config);
409
410 assert!(args.contains(&"--output-format".to_string()));
411 assert!(args.contains(&"stream-json".to_string()));
412 assert!(args.contains(&"--model".to_string()));
413 assert!(args.contains(&"sonnet".to_string()));
414 assert!(args.contains(&"--".to_string()));
415 assert!(args.contains(&"hello".to_string()));
416 }
417
418 #[test]
419 fn test_build_args_with_resume() {
420 let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
421 let config = LaunchConfig {
422 query: "continue".to_string(),
423 session_id: Some("sess_123".to_string()),
424 model: Model::Haiku,
425 working_dir: "/tmp".to_string(),
426 ..Default::default()
427 };
428
429 let args = executor.build_args(&config);
430
431 assert!(args.contains(&"--resume".to_string()));
432 assert!(args.contains(&"sess_123".to_string()));
433 assert!(args.contains(&"haiku".to_string()));
434 }
435
436 #[test]
437 fn test_build_args_with_fork() {
438 let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
439 let config = LaunchConfig {
440 query: "branch".to_string(),
441 session_id: Some("sess_123".to_string()),
442 fork_session: true,
443 model: Model::Opus,
444 working_dir: "/tmp".to_string(),
445 ..Default::default()
446 };
447
448 let args = executor.build_args(&config);
449
450 assert!(args.contains(&"--resume".to_string()));
451 assert!(args.contains(&"--fork-session".to_string()));
452 assert!(args.contains(&"opus".to_string()));
453 }
454}