claude_code_sdk/transport/
subprocess_cli.rs

1//! Subprocess transport implementation using Claude Code CLI.
2
3use futures::Stream;
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::process::Stdio;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::{Child, Command};
10use tokio_stream::{wrappers::LinesStream, StreamExt};
11use tracing::{debug, error, info, warn, instrument};
12use async_stream;
13
14use crate::{
15    errors::*,
16    types::{ClaudeCodeOptions, PermissionMode},
17    transport::Transport,
18    SafetyLimits, SafetyError,
19};
20
21/// Subprocess transport using Claude Code CLI
22pub struct SubprocessCLITransport {
23    prompt: String,
24    options: ClaudeCodeOptions,
25    cli_path: String,
26    cwd: Option<PathBuf>,
27    process: Option<Child>,
28    safety_limits: SafetyLimits,
29    json_buffer: String,
30}
31
32impl SubprocessCLITransport {
33    /// Create a new subprocess transport
34    #[instrument(level = "debug", skip(prompt, options))]
35    pub fn new(
36        prompt: &str,
37        options: ClaudeCodeOptions,
38        cli_path: Option<&str>,
39    ) -> Result<Self, ClaudeSDKError> {
40        info!("Creating new subprocess CLI transport");
41        debug!(
42            prompt_length = prompt.len(),
43            cli_path = cli_path,
44            cwd = ?options.cwd,
45            "Transport configuration"
46        );
47
48        let cli_path = if let Some(path) = cli_path {
49            debug!(provided_path = path, "Using provided CLI path");
50            path.to_string()
51        } else {
52            debug!("Searching for CLI path");
53            Self::find_cli()?
54        };
55        
56        let cwd = options.cwd.clone();
57        
58        info!(cli_path = %cli_path, "Successfully created subprocess transport");
59        Ok(Self {
60            prompt: prompt.to_string(),
61            options,
62            cli_path,
63            cwd,
64            process: None,
65            safety_limits: SafetyLimits::default(),
66            json_buffer: String::new(),
67        })
68    }
69    
70    /// Set custom safety limits for this transport
71    pub fn with_safety_limits(mut self, limits: SafetyLimits) -> Self {
72        info!(?limits, "Setting custom safety limits");
73        self.safety_limits = limits;
74        self
75    }
76    
77    /// Try to parse accumulated JSON buffer, handling multiline JSON
78    pub fn try_parse_json_buffer(&mut self) -> Option<Result<HashMap<String, serde_json::Value>, ClaudeSDKError>> {
79        if self.json_buffer.is_empty() {
80            return None;
81        }
82        
83        // Safety check: buffer size
84        let buffer_size = self.json_buffer.len();
85        if !self.safety_limits.is_line_size_safe(buffer_size) {
86            error!(
87                buffer_size = buffer_size,
88                limit = self.safety_limits.max_line_size,
89                "JSON buffer exceeds safety limit"
90            );
91            self.json_buffer.clear(); // Clear to prevent memory issues
92            return Some(Err(ClaudeSDKError::Safety(SafetyError::LineTooLarge {
93                actual: buffer_size,
94                limit: self.safety_limits.max_line_size,
95            })));
96        }
97        
98        debug!(
99            buffer_length = buffer_size,
100            buffer_preview = %self.safety_limits.safe_log_preview(&self.json_buffer),
101            "Attempting to parse JSON buffer"
102        );
103        
104        // Safe JSON parsing with timeout monitoring
105        let parse_start = std::time::Instant::now();
106        let parse_result = serde_json::from_str::<HashMap<String, serde_json::Value>>(&self.json_buffer);
107        let parse_duration = parse_start.elapsed();
108        
109        if parse_duration.as_millis() > self.safety_limits.json_parse_timeout_ms as u128 {
110            warn!(
111                duration_ms = parse_duration.as_millis(),
112                timeout_ms = self.safety_limits.json_parse_timeout_ms,
113                "JSON parsing took longer than expected"
114            );
115        }
116        
117        match parse_result {
118            Ok(data) => {
119                debug!(
120                    fields_count = data.len(),
121                    parse_duration_ms = parse_duration.as_millis(),
122                    buffer_length = buffer_size,
123                    "Successfully parsed multiline JSON message"
124                );
125                
126                // Check if this contains large text content
127                if let Some(message_obj) = data.get("message") {
128                    if let Some(content_arr) = message_obj.get("content").and_then(|c| c.as_array()) {
129                        for content_item in content_arr {
130                            if let Some(text) = content_item.get("text").and_then(|t| t.as_str()) {
131                                let text_size = text.len();
132                                if !self.safety_limits.is_text_block_safe(text_size) {
133                                    warn!(
134                                        text_size = text_size,
135                                        limit = self.safety_limits.max_text_block_size,
136                                        text_preview = %self.safety_limits.safe_log_preview(text),
137                                        "Large text block detected in multiline JSON"
138                                    );
139                                }
140                            }
141                        }
142                    }
143                }
144                
145                self.json_buffer.clear(); // Clear buffer after successful parse
146                Some(Ok(data))
147            }
148            Err(e) => {
149                // For incomplete JSON, we don't immediately error - we wait for more data
150                debug!(
151                    error = %e,
152                    buffer_preview = %self.safety_limits.safe_log_preview(&self.json_buffer),
153                    "JSON buffer not yet complete, waiting for more data"
154                );
155                None // Return None to indicate we need more data
156            }
157        }
158    }
159    
160    /// Process a single line and update JSON buffer state
161    pub fn process_line(&mut self, line: String) -> Option<Result<HashMap<String, serde_json::Value>, ClaudeSDKError>> {
162        let line = line.trim();
163        if line.is_empty() {
164            debug!("Skipping empty line");
165            return None;
166        }
167        
168        // Safety check: individual line size
169        let line_size = line.len();
170        if !self.safety_limits.is_line_size_safe(line_size) {
171            error!(
172                line_size = line_size,
173                limit = self.safety_limits.max_line_size,
174                "Single line exceeds safety limit"
175            );
176            return Some(Err(ClaudeSDKError::Safety(SafetyError::LineTooLarge {
177                actual: line_size,
178                limit: self.safety_limits.max_line_size,
179            })));
180        }
181        
182        debug!(line_length = line_size, "Processing line from subprocess");
183        
184        // Check if this line looks like the start of JSON
185        let looks_like_json_start = line.starts_with('{') || line.starts_with('[');
186        let looks_like_json_continuation = !self.json_buffer.is_empty();
187        
188        if looks_like_json_start && self.json_buffer.is_empty() {
189            // Starting a new JSON object/array
190            debug!("Starting new JSON buffer");
191            self.json_buffer = line.to_string();
192        } else if looks_like_json_continuation {
193            // Continuing a JSON object/array
194            debug!("Appending to existing JSON buffer");
195            self.json_buffer.push('\n');
196            self.json_buffer.push_str(line);
197        } else if !looks_like_json_start {
198            // Non-JSON line, log and skip
199            debug!(
200                line_preview = %self.safety_limits.safe_log_preview(line),
201                "Skipping non-JSON line"
202            );
203            return None;
204        }
205        
206        // Try to parse the current buffer
207        if let Some(result) = self.try_parse_json_buffer() {
208            return Some(result);
209        }
210        
211        // Check if buffer is getting too large without successful parse
212        if self.json_buffer.len() > self.safety_limits.max_line_size / 2 {
213            warn!(
214                buffer_size = self.json_buffer.len(),
215                max_size = self.safety_limits.max_line_size,
216                "JSON buffer growing large without successful parse, might be malformed"
217            );
218        }
219        
220        None // No complete JSON yet, continue accumulating
221    }
222
223    /// Find Claude Code CLI binary
224    #[instrument(level = "debug")]
225    fn find_cli() -> Result<String, ClaudeSDKError> {
226        debug!("Searching for Claude Code CLI binary");
227        
228        // Check if claude is in PATH
229        debug!("Checking PATH for 'claude' executable");
230        if let Ok(path) = which::which("claude") {
231            let path_str = path.to_string_lossy().to_string();
232            info!(path = %path_str, "Found Claude CLI in PATH");
233            return Ok(path_str);
234        }
235        debug!("Claude CLI not found in PATH");
236
237        // Check common locations
238        let home_dir = home::home_dir().unwrap_or_else(|| PathBuf::from("/"));
239        debug!(home_dir = %home_dir.display(), "Using home directory");
240        
241        let locations = vec![
242            home_dir.join(".npm-global/bin/claude"),
243            PathBuf::from("/usr/local/bin/claude"),
244            home_dir.join(".local/bin/claude"),
245            home_dir.join("node_modules/.bin/claude"),
246            home_dir.join(".yarn/bin/claude"),
247        ];
248
249        debug!(locations_count = locations.len(), "Checking common installation locations");
250        for path in &locations {
251            debug!(path = %path.display(), "Checking location");
252            if path.exists() && path.is_file() {
253                let path_str = path.to_string_lossy().to_string();
254                info!(path = %path_str, "Found Claude CLI at common location");
255                return Ok(path_str);
256            }
257        }
258        debug!("Claude CLI not found in common locations");
259
260        // Check if Node.js is installed
261        debug!("Checking if Node.js is available");
262        let node_installed = which::which("node").is_ok();
263
264        if !node_installed {
265            error!("Node.js is not installed");
266            let error_msg = "Claude Code requires Node.js, which is not installed.\n\n\
267                           Install Node.js from: https://nodejs.org/\n\
268                           \nAfter installing Node.js, install Claude Code:\n\
269                           npm install -g @anthropic-ai/claude-code";
270            return Err(ClaudeSDKError::CLINotFound(CLINotFoundError::new(error_msg)));
271        }
272        debug!("Node.js is available");
273
274        error!("Claude Code CLI not found in any location");
275        let error_msg = "Claude Code not found. Install with:\n\
276                        npm install -g @anthropic-ai/claude-code\n\
277                        \nIf already installed locally, try:\n\
278                        export PATH=\"$HOME/node_modules/.bin:$PATH\"\n\
279                        \nOr specify the path when creating transport";
280        Err(ClaudeSDKError::CLINotFound(CLINotFoundError::new(error_msg)))
281    }
282
283    /// Build CLI command with arguments
284    #[instrument(level = "trace", skip(self))]
285    fn build_command(&self) -> Vec<String> {
286        debug!("Building CLI command with arguments");
287        let mut cmd = vec![
288            self.cli_path.clone(),
289            "--output-format".to_string(),
290            "stream-json".to_string(),
291            "--verbose".to_string(),
292        ];
293
294        if let Some(system_prompt) = &self.options.system_prompt {
295            debug!(system_prompt_length = system_prompt.len(), "Adding system prompt");
296            cmd.extend(["--system-prompt".to_string(), system_prompt.clone()]);
297        }
298
299        if let Some(append_system_prompt) = &self.options.append_system_prompt {
300            debug!(append_system_prompt_length = append_system_prompt.len(), "Adding append system prompt");
301            cmd.extend(["--append-system-prompt".to_string(), append_system_prompt.clone()]);
302        }
303
304        if !self.options.allowed_tools.is_empty() {
305            debug!(allowed_tools = ?self.options.allowed_tools, "Adding allowed tools");
306            cmd.extend([
307                "--allowedTools".to_string(),
308                self.options.allowed_tools.join(","),
309            ]);
310        }
311
312        if let Some(max_turns) = self.options.max_turns {
313            debug!(max_turns, "Adding max turns limit");
314            cmd.extend(["--max-turns".to_string(), max_turns.to_string()]);
315        }
316
317        if !self.options.disallowed_tools.is_empty() {
318            debug!(disallowed_tools = ?self.options.disallowed_tools, "Adding disallowed tools");
319            cmd.extend([
320                "--disallowedTools".to_string(),
321                self.options.disallowed_tools.join(","),
322            ]);
323        }
324
325        if let Some(model) = &self.options.model {
326            debug!(model = %model, "Adding model specification");
327            cmd.extend(["--model".to_string(), model.clone()]);
328        }
329
330        if let Some(permission_prompt_tool_name) = &self.options.permission_prompt_tool_name {
331            debug!(tool_name = %permission_prompt_tool_name, "Adding permission prompt tool");
332            cmd.extend([
333                "--permission-prompt-tool".to_string(),
334                permission_prompt_tool_name.clone(),
335            ]);
336        }
337
338        if let Some(permission_mode) = &self.options.permission_mode {
339            let mode_str = match permission_mode {
340                PermissionMode::Default => "default",
341                PermissionMode::AcceptEdits => "acceptEdits",
342                PermissionMode::BypassPermissions => "bypassPermissions",
343            };
344            debug!(permission_mode = mode_str, "Adding permission mode");
345            cmd.extend(["--permission-mode".to_string(), mode_str.to_string()]);
346        }
347
348        if self.options.continue_conversation {
349            debug!("Adding continue conversation flag");
350            cmd.push("--continue".to_string());
351        }
352
353        if let Some(resume) = &self.options.resume {
354            debug!(resume = %resume, "Adding resume option");
355            cmd.extend(["--resume".to_string(), resume.clone()]);
356        }
357
358        if !self.options.mcp_servers.is_empty() {
359            debug!(mcp_servers_count = self.options.mcp_servers.len(), "Adding MCP servers configuration");
360            let mcp_config = serde_json::json!({
361                "mcpServers": self.options.mcp_servers
362            });
363            cmd.extend([
364                "--mcp-config".to_string(),
365                mcp_config.to_string(),
366            ]);
367        }
368
369        cmd.extend(["--print".to_string(), self.prompt.clone()]);
370        debug!(total_args = cmd.len(), "Built complete CLI command");
371        cmd
372    }
373}
374
375#[async_trait::async_trait]
376impl Transport for SubprocessCLITransport {
377    /// Start subprocess
378    #[instrument(level = "info", skip(self))]
379    async fn connect(&mut self) -> Result<(), ClaudeSDKError> {
380        if self.process.is_some() {
381            debug!("Process already connected, skipping connection");
382            return Ok(());
383        }
384
385        info!("Starting Claude CLI subprocess");
386        let cmd_args = self.build_command();
387        debug!(args_count = cmd_args.len(), "Built command arguments");
388
389        let mut command = Command::new(&cmd_args[0]);
390        command
391            .args(&cmd_args[1..])
392            .stdin(Stdio::null())
393            .stdout(Stdio::piped())
394            .stderr(Stdio::piped())
395            .env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
396
397        if let Some(cwd) = &self.cwd {
398            debug!(cwd = %cwd.display(), "Setting working directory");
399            command.current_dir(cwd);
400        }
401
402        debug!("Spawning subprocess");
403        let process = command.spawn().map_err(|e| {
404            if e.kind() == std::io::ErrorKind::NotFound {
405                error!(
406                    error = %e,
407                    cli_path = %self.cli_path,
408                    "Claude Code CLI not found"
409                );
410                ClaudeSDKError::CLINotFound(CLINotFoundError::with_path(
411                    "Claude Code not found at",
412                    &self.cli_path,
413                ))
414            } else {
415                error!(error = %e, "Failed to spawn Claude Code subprocess");
416                ClaudeSDKError::CLIConnection(CLIConnectionError::new(format!(
417                    "Failed to start Claude Code: {}",
418                    e
419                )))
420            }
421        })?;
422
423        info!(pid = process.id(), "Successfully started Claude CLI subprocess");
424        self.process = Some(process);
425        Ok(())
426    }
427
428    /// Terminate subprocess
429    #[instrument(level = "info", skip(self))]
430    async fn disconnect(&mut self) -> Result<(), ClaudeSDKError> {
431        if let Some(mut process) = self.process.take() {
432            info!(pid = process.id(), "Disconnecting from Claude CLI subprocess");
433            
434            // Check if process has already finished
435            if let Ok(Some(status)) = process.try_wait() {
436                if status.success() {
437                    info!("Process already finished successfully");
438                } else {
439                    warn!(exit_code = status.code(), "Process already finished with error");
440                }
441                return Ok(());
442            }
443
444            // Try graceful termination
445            debug!("Killing subprocess");
446            if let Err(e) = process.kill().await {
447                warn!(error = %e, "Failed to kill subprocess (might have already exited)");
448            }
449            
450            debug!("Waiting for subprocess to exit");
451            match process.wait().await {
452                Ok(status) => {
453                    if status.success() {
454                        info!("Subprocess terminated successfully");
455                    } else {
456                        warn!(exit_code = status.code(), "Subprocess terminated with error");
457                    }
458                }
459                Err(e) => {
460                    warn!(error = %e, "Error waiting for subprocess to terminate");
461                }
462            }
463        } else {
464            debug!("No active subprocess to disconnect");
465        }
466        Ok(())
467    }
468
469    /// Receive messages from CLI
470    #[instrument(level = "debug", skip(self))]
471    fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<HashMap<String, serde_json::Value>, ClaudeSDKError>> + Send + '_>> {
472        if let Some(process) = &mut self.process {
473            if let Some(stdout) = process.stdout.take() {
474                debug!("Setting up message stream from subprocess stdout");
475                let reader = BufReader::new(stdout);
476                let mut lines_stream = LinesStream::new(reader.lines());
477                
478                // We need to use a different approach for multiline JSON parsing
479                // Since we need mutable access to self for the buffer, we can't use map directly
480                let stream = async_stream::stream! {
481                    while let Some(line_result) = lines_stream.next().await {
482                        match line_result {
483                            Ok(line) => {
484                                if let Some(result) = self.process_line(line) {
485                                    yield result;
486                                }
487                                // If process_line returns None, we're still accumulating JSON
488                            }
489                            Err(e) => {
490                                error!(error = %e, "Error reading line from subprocess stdout");
491                                yield Err(ClaudeSDKError::Io(e));
492                            }
493                        }
494                    }
495                    
496                    // Handle any remaining buffer content when stream ends
497                    if !self.json_buffer.is_empty() {
498                        warn!(
499                            buffer_length = self.json_buffer.len(),
500                            buffer_preview = %self.safety_limits.safe_log_preview(&self.json_buffer),
501                            "Stream ended with incomplete JSON buffer"
502                        );
503                        // Try to parse whatever we have as a final attempt
504                        if let Some(result) = self.try_parse_json_buffer() {
505                            yield result;
506                        } else {
507                            // If it still doesn't parse, it's malformed JSON
508                            let error = ClaudeSDKError::CLIJSONDecode(
509                                CLIJSONDecodeError::new(
510                                    &self.json_buffer,
511                                    serde_json::Error::io(std::io::Error::new(
512                                        std::io::ErrorKind::InvalidData,
513                                        "Incomplete JSON at end of stream"
514                                    ))
515                                )
516                            );
517                            yield Err(error);
518                            self.json_buffer.clear();
519                        }
520                    }
521                };
522                
523                return Box::pin(stream);
524            } else {
525                warn!("No stdout available from subprocess");
526            }
527        } else {
528            warn!("No active subprocess to receive messages from");
529        }
530        
531        // Return empty stream if no process or stdout
532        debug!("Returning empty message stream");
533        Box::pin(tokio_stream::empty())
534    }
535
536    /// Check if subprocess is running
537    #[instrument(level = "trace", skip(self))]
538    fn is_connected(&self) -> bool {
539        let is_connected = if let Some(_process) = &self.process {
540            // We can't call try_wait on an immutable reference
541            // For now, just assume connected if process exists
542            // In a real implementation, we'd need better state tracking
543            true
544        } else {
545            false
546        };
547        debug!(is_connected, "Checked connection status");
548        is_connected
549    }
550}