plexus_substrate/activations/claudecode/
executor.rs1use super::types::{Model, RawClaudeEvent};
2use async_stream::stream;
3use futures::Stream;
4use serde_json::Value;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::process::Stdio;
8use std::sync::Arc;
9use thiserror::Error;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::net::TcpStream;
12use tokio::process::Command;
13use tokio::sync::Mutex;
14
15#[derive(Debug, Error)]
17pub enum ExecutorError {
18 #[error("failed to spawn claude process (binary='{binary}', cwd='{cwd}'): {source}")]
19 SpawnFailed {
20 binary: String,
21 cwd: String,
22 source: std::io::Error,
23 },
24
25 #[error("failed to write MCP config to '{path}': {reason}")]
26 McpConfigWrite {
27 path: String,
28 reason: String,
29 },
30}
31
32fn mcp_host_port_from_url(url: &str) -> String {
36 let without_scheme = url
37 .trim_start_matches("https://")
38 .trim_start_matches("http://");
39 let host_port = without_scheme.split('/').next().unwrap_or("127.0.0.1:4444");
40 if host_port.contains(':') {
41 host_port.to_string()
42 } else {
43 format!("{}:4444", host_port)
44 }
45}
46
47pub async fn check_mcp_reachable() -> Result<(), String> {
55 let url = std::env::var("PLEXUS_MCP_URL")
56 .unwrap_or_else(|_| "http://127.0.0.1:4444/mcp".to_string());
57 let addr = mcp_host_port_from_url(&url);
58
59 match tokio::time::timeout(
60 std::time::Duration::from_secs(2),
61 TcpStream::connect(&addr),
62 )
63 .await
64 {
65 Ok(Ok(_)) => Ok(()),
66 Ok(Err(e)) => Err(format!(
67 "MCP server not reachable at {} ({}). \
68 Start the substrate without --no-mcp so the permission-prompt tool is available.",
69 url, e
70 )),
71 Err(_) => Err(format!(
72 "MCP server connection timed out at {}. \
73 Start the substrate without --no-mcp so the permission-prompt tool is available.",
74 url
75 )),
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct LaunchConfig {
82 pub query: String,
84 pub session_id: Option<String>,
86 pub fork_session: bool,
88 pub model: Model,
90 pub working_dir: String,
92 pub system_prompt: Option<String>,
94 pub mcp_config: Option<Value>,
96 pub permission_prompt_tool: Option<String>,
98 pub allowed_tools: Vec<String>,
100 pub disallowed_tools: Vec<String>,
102 pub max_turns: Option<i32>,
104 pub loopback_enabled: bool,
106 pub loopback_session_id: Option<String>,
108}
109
110impl Default for LaunchConfig {
111 fn default() -> Self {
112 Self {
113 query: String::new(),
114 session_id: None,
115 fork_session: false,
116 model: Model::Sonnet,
117 working_dir: ".".to_string(),
118 system_prompt: None,
119 mcp_config: None,
120 permission_prompt_tool: None,
121 allowed_tools: Vec::new(),
122 disallowed_tools: Vec::new(),
123 max_turns: None,
124 loopback_enabled: false,
125 loopback_session_id: None,
126 }
127 }
128}
129
130#[derive(Clone)]
132pub struct ClaudeCodeExecutor {
133 claude_path: String,
134}
135
136impl ClaudeCodeExecutor {
137 pub fn new() -> Self {
138 Self {
139 claude_path: Self::find_claude_binary().unwrap_or_else(|| "claude".to_string()),
140 }
141 }
142
143 pub fn with_path(path: String) -> Self {
144 Self { claude_path: path }
145 }
146
147 fn find_claude_binary() -> Option<String> {
149 let home = dirs::home_dir()?;
151
152 let candidates = [
153 home.join(".claude/local/claude"),
154 home.join(".npm/bin/claude"),
155 home.join(".bun/bin/claude"),
156 home.join(".local/bin/claude"),
157 PathBuf::from("/usr/local/bin/claude"),
158 PathBuf::from("/opt/homebrew/bin/claude"),
159 ];
160
161 for candidate in &candidates {
162 if candidate.exists() {
163 return candidate.to_str().map(|s| s.to_string());
164 }
165 }
166
167 which::which("claude")
169 .ok()
170 .and_then(|p| p.to_str().map(|s| s.to_string()))
171 }
172
173 fn build_args(&self, config: &LaunchConfig) -> Vec<String> {
175 let mut args = vec![
176 "--output-format".to_string(),
177 "stream-json".to_string(),
178 "--include-partial-messages".to_string(),
179 "--verbose".to_string(),
180 "--print".to_string(),
181 ];
182
183 if let Some(ref session_id) = config.session_id {
185 args.push("--resume".to_string());
186 args.push(session_id.clone());
187
188 if config.fork_session {
189 args.push("--fork-session".to_string());
190 }
191 }
192
193 args.push("--model".to_string());
195 args.push(config.model.as_str().to_string());
196
197 if let Some(max) = config.max_turns {
199 args.push("--max-turns".to_string());
200 args.push(max.to_string());
201 }
202
203 if let Some(ref prompt) = config.system_prompt {
205 args.push("--system-prompt".to_string());
206 args.push(prompt.clone());
207 }
208
209 if config.loopback_enabled {
211 args.push("--permission-prompt-tool".to_string());
212 args.push("mcp__plexus__loopback_permit".to_string());
213 } else if let Some(ref tool) = config.permission_prompt_tool {
214 args.push("--permission-prompt-tool".to_string());
215 args.push(tool.clone());
216 }
217
218 if !config.allowed_tools.is_empty() {
220 args.push("--allowedTools".to_string());
221 args.push(config.allowed_tools.join(","));
222 }
223
224 if !config.disallowed_tools.is_empty() {
226 args.push("--disallowedTools".to_string());
227 args.push(config.disallowed_tools.join(","));
228 }
229
230 args.push("--".to_string());
232 args.push(config.query.clone());
233
234 args
235 }
236
237 #[allow(dead_code)]
239 async fn write_mcp_config(&self, config: &Value) -> Result<String, String> {
240 let temp_dir = std::env::temp_dir();
241 let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
242
243 let json = serde_json::to_string_pretty(config)
244 .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
245
246 tokio::fs::write(&temp_path, json)
247 .await
248 .map_err(|e| format!("Failed to write MCP config: {}", e))?;
249
250 Ok(temp_path.to_string_lossy().to_string())
251 }
252
253 pub async fn launch(
255 &self,
256 config: LaunchConfig,
257 ) -> Pin<Box<dyn Stream<Item = RawClaudeEvent> + Send + 'static>> {
258 let mut args = self.build_args(&config);
259 let claude_path = self.claude_path.clone();
260 let working_dir = config.working_dir.clone();
261 let loopback_enabled = config.loopback_enabled;
262 let loopback_session_id = config.loopback_session_id.clone();
263
264 let mcp_config = if loopback_enabled {
266 let base_url = std::env::var("PLEXUS_MCP_URL")
267 .unwrap_or_else(|_| "http://127.0.0.1:4444/mcp".to_string());
268
269 let plexus_url = if let Some(ref sid) = loopback_session_id {
271 format!("{}?session_id={}", base_url, sid)
272 } else {
273 base_url
274 };
275
276 let loopback_mcp = if let Some(ref sid) = loopback_session_id {
277 serde_json::json!({
278 "mcpServers": {
279 "plexus": {
280 "type": "http",
281 "url": plexus_url
282 }
283 },
284 "env": {
285 "PLEXUS_SESSION_ID": sid
286 }
287 })
288 } else {
289 serde_json::json!({
290 "mcpServers": {
291 "plexus": {
292 "type": "http",
293 "url": plexus_url
294 }
295 }
296 })
297 };
298
299 match config.mcp_config {
301 Some(existing) => {
302 let mut merged = existing.clone();
304 if let (Some(existing_servers), Some(loopback_servers)) = (
305 merged.get_mut("mcpServers"),
306 loopback_mcp.get("mcpServers")
307 ) {
308 if let (Some(existing_obj), Some(loopback_obj)) = (
309 existing_servers.as_object_mut(),
310 loopback_servers.as_object()
311 ) {
312 for (k, v) in loopback_obj {
313 existing_obj.insert(k.clone(), v.clone());
314 }
315 }
316 } else {
317 merged["mcpServers"] = loopback_mcp["mcpServers"].clone();
318 }
319 Some(merged)
320 }
321 None => Some(loopback_mcp)
322 }
323 } else {
324 config.mcp_config.clone()
325 };
326
327 Box::pin(stream! {
328 macro_rules! yield_error {
329 ($err:expr) => {{
330 let err: ExecutorError = $err;
331 tracing::error!(error = %err, "Claude executor error");
332 yield RawClaudeEvent::Result {
333 subtype: Some("error".to_string()),
334 session_id: None,
335 cost_usd: None,
336 is_error: Some(true),
337 duration_ms: None,
338 num_turns: None,
339 result: None,
340 error: Some(err.to_string()),
341 };
342 }};
343 }
344
345 if loopback_enabled {
349 if let Err(e) = check_mcp_reachable().await {
350 yield RawClaudeEvent::Result {
351 subtype: Some("error".to_string()),
352 session_id: None,
353 cost_usd: None,
354 is_error: Some(true),
355 duration_ms: None,
356 num_turns: None,
357 result: None,
358 error: Some(e),
359 };
360 return;
361 }
362 }
363
364 let mcp_path = if let Some(ref mcp) = mcp_config {
366 match Self::write_mcp_config_sync(mcp) {
367 Ok(path) => {
368 if let Some(pos) = args.iter().position(|a| a == "--") {
370 args.insert(pos, path.clone());
371 args.insert(pos, "--mcp-config".to_string());
372 }
373 Some(path)
374 }
375 Err(e) => {
376 yield_error!(ExecutorError::McpConfigWrite {
377 path: std::env::temp_dir().to_string_lossy().to_string(),
378 reason: e,
379 });
380 return;
381 }
382 }
383 } else {
384 None
385 };
386
387 fn shell_escape(s: &str) -> String {
390 format!("'{}'", s.replace("'", "'\\''"))
392 }
393
394 let shell_cmd = format!(
395 "{} {}",
396 shell_escape(&claude_path),
397 args.iter()
398 .map(|a| shell_escape(a))
399 .collect::<Vec<_>>()
400 .join(" ")
401 );
402
403 tracing::debug!(cmd = %shell_cmd, "Launching Claude Code");
404
405 yield RawClaudeEvent::LaunchCommand { command: shell_cmd.clone() };
407
408 let mut cmd = Command::new("bash");
409 cmd.args(&["-c", &shell_cmd])
410 .current_dir(&working_dir)
411 .stdout(Stdio::piped())
412 .stderr(Stdio::piped())
413 .stdin(Stdio::null())
414 .env_remove("CLAUDECODE");
416
417 if loopback_enabled {
419 if let Some(ref session_id) = loopback_session_id {
420 cmd.env("PLEXUS_SESSION_ID", session_id);
421 }
422 }
423
424 let mut child = match cmd.spawn() {
425 Ok(c) => c,
426 Err(e) => {
427 yield_error!(ExecutorError::SpawnFailed {
428 binary: claude_path.clone(),
429 cwd: working_dir.clone(),
430 source: e,
431 });
432 return;
433 }
434 };
435
436 let stdout = child.stdout.take().expect("stdout");
437 let mut reader = BufReader::with_capacity(10 * 1024 * 1024, stdout).lines(); let stderr_buffer: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
441 let stderr = child.stderr.take().expect("stderr");
442 let stderr_buf = stderr_buffer.clone();
443 tokio::spawn(async move {
444 let mut stderr_reader = BufReader::new(stderr).lines();
445 while let Ok(Some(line)) = stderr_reader.next_line().await {
446 let mut buf = stderr_buf.lock().await;
447 if buf.len() < 100 {
448 buf.push(line);
449 }
450 }
451 });
452
453 while let Ok(Some(line)) = reader.next_line().await {
455 if line.trim().is_empty() {
456 continue;
457 }
458
459 match serde_json::from_str::<RawClaudeEvent>(&line) {
460 Ok(event) => {
461 let is_result = matches!(event, RawClaudeEvent::Result { .. });
462 yield event;
463 if is_result {
464 break;
465 }
466 }
467 Err(_) => {
468 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) {
470 let event_type = value.get("type")
471 .and_then(|t| t.as_str())
472 .unwrap_or("unknown_json")
473 .to_string();
474 yield RawClaudeEvent::Unknown {
475 event_type,
476 data: value,
477 };
478 } else {
479 yield RawClaudeEvent::Unknown {
481 event_type: "raw_output".to_string(),
482 data: serde_json::Value::String(line),
483 };
484 }
485 }
486 }
487 }
488
489 if let Some(stderr) = child.stderr.take() {
491 let mut stderr_reader = BufReader::new(stderr).lines();
492 while let Ok(Some(line)) = stderr_reader.next_line().await {
493 if !line.trim().is_empty() {
494 yield RawClaudeEvent::Stderr { text: line };
495 }
496 }
497 }
498
499 let _ = child.wait().await;
501
502 if let Some(path) = mcp_path {
503 let _ = tokio::fs::remove_file(path).await;
504 }
505 })
506 }
507
508 fn write_mcp_config_sync(config: &Value) -> Result<String, String> {
510 let temp_dir = std::env::temp_dir();
511 let temp_path = temp_dir.join(format!("mcp-config-{}.json", uuid::Uuid::new_v4()));
512
513 let json = serde_json::to_string_pretty(config)
514 .map_err(|e| format!("Failed to serialize MCP config: {}", e))?;
515
516 std::fs::write(&temp_path, json)
517 .map_err(|e| format!("Failed to write MCP config: {}", e))?;
518
519 Ok(temp_path.to_string_lossy().to_string())
520 }
521}
522
523impl Default for ClaudeCodeExecutor {
524 fn default() -> Self {
525 Self::new()
526 }
527}
528
529#[cfg(test)]
530mod tests {
531 use super::*;
532
533 #[test]
534 fn test_build_args_basic() {
535 let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
536 let config = LaunchConfig {
537 query: "hello".to_string(),
538 model: Model::Sonnet,
539 working_dir: "/tmp".to_string(),
540 ..Default::default()
541 };
542
543 let args = executor.build_args(&config);
544
545 assert!(args.contains(&"--output-format".to_string()));
546 assert!(args.contains(&"stream-json".to_string()));
547 assert!(args.contains(&"--model".to_string()));
548 assert!(args.contains(&"sonnet".to_string()));
549 assert!(args.contains(&"--".to_string()));
550 assert!(args.contains(&"hello".to_string()));
551 }
552
553 #[test]
554 fn test_build_args_with_resume() {
555 let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
556 let config = LaunchConfig {
557 query: "continue".to_string(),
558 session_id: Some("sess_123".to_string()),
559 model: Model::Haiku,
560 working_dir: "/tmp".to_string(),
561 ..Default::default()
562 };
563
564 let args = executor.build_args(&config);
565
566 assert!(args.contains(&"--resume".to_string()));
567 assert!(args.contains(&"sess_123".to_string()));
568 assert!(args.contains(&"haiku".to_string()));
569 }
570
571 #[test]
572 fn test_build_args_with_fork() {
573 let executor = ClaudeCodeExecutor::with_path("/usr/bin/claude".to_string());
574 let config = LaunchConfig {
575 query: "branch".to_string(),
576 session_id: Some("sess_123".to_string()),
577 fork_session: true,
578 model: Model::Opus,
579 working_dir: "/tmp".to_string(),
580 ..Default::default()
581 };
582
583 let args = executor.build_args(&config);
584
585 assert!(args.contains(&"--resume".to_string()));
586 assert!(args.contains(&"--fork-session".to_string()));
587 assert!(args.contains(&"opus".to_string()));
588 }
589}