Skip to main content

cc_sdk/transport/
subprocess.rs

1//! Subprocess-based transport implementation
2//!
3//! This module implements the Transport trait using a subprocess to run the Claude CLI.
4
5use super::{InputMessage, Transport, TransportState};
6use crate::{
7    errors::{Result, SdkError},
8    types::{ClaudeCodeOptions, ControlRequest, ControlResponse, Message, PermissionMode},
9};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::process::Stdio;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::mpsc;
18use tracing::{debug, error, info, warn};
19
20/// Default buffer size for channels
21const CHANNEL_BUFFER_SIZE: usize = 100;
22
23/// Minimum required CLI version
24const MIN_CLI_VERSION: (u32, u32, u32) = (2, 0, 0);
25
26/// Simple semantic version struct
27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
28struct SemVer {
29    major: u32,
30    minor: u32,
31    patch: u32,
32}
33
34impl SemVer {
35    fn new(major: u32, minor: u32, patch: u32) -> Self {
36        Self {
37            major,
38            minor,
39            patch,
40        }
41    }
42
43    /// Parse semantic version from string (e.g., "2.0.0" or "v2.0.0")
44    fn parse(version: &str) -> Option<Self> {
45        let version = version.trim().trim_start_matches('v');
46
47        // Handle versions like "@anthropic-ai/claude-code/2.0.0"
48        let version = if let Some(v) = version.split('/').next_back() {
49            v
50        } else {
51            version
52        };
53
54        let parts: Vec<&str> = version.split('.').collect();
55        if parts.len() < 2 {
56            return None;
57        }
58
59        Some(Self {
60            major: parts[0].parse().ok()?,
61            minor: parts.get(1)?.parse().ok()?,
62            patch: parts.get(2).and_then(|p| p.parse().ok()).unwrap_or(0),
63        })
64    }
65}
66
67/// Subprocess-based transport for Claude CLI
68pub struct SubprocessTransport {
69    /// Configuration options
70    options: ClaudeCodeOptions,
71    /// CLI binary path
72    cli_path: PathBuf,
73    /// Child process
74    child: Option<Child>,
75    /// Sender for stdin
76    stdin_tx: Option<mpsc::Sender<String>>,
77    /// Sender for broadcasting messages to multiple receivers
78    message_broadcast_tx: Option<tokio::sync::broadcast::Sender<Message>>,
79    /// Receiver for control responses
80    control_rx: Option<mpsc::Receiver<ControlResponse>>,
81    /// Receiver for SDK control requests
82    sdk_control_rx: Option<mpsc::Receiver<serde_json::Value>>,
83    /// Transport state
84    state: TransportState,
85    /// Request counter for control requests
86    request_counter: u64,
87    /// Whether to close stdin after initial prompt
88    #[allow(dead_code)]
89    close_stdin_after_prompt: bool,
90}
91
92impl SubprocessTransport {
93    /// Create a new subprocess transport
94    pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
95        let cli_path = find_claude_cli()?;
96        Ok(Self {
97            options,
98            cli_path,
99            child: None,
100            stdin_tx: None,
101            message_broadcast_tx: None,
102            control_rx: None,
103            sdk_control_rx: None,
104            state: TransportState::Disconnected,
105            request_counter: 0,
106            close_stdin_after_prompt: false,
107        })
108    }
109
110    /// Create a new subprocess transport with async initialization
111    ///
112    /// This version supports auto-downloading the CLI if `auto_download_cli` is enabled
113    /// in the options and the CLI is not found.
114    pub async fn new_async(options: ClaudeCodeOptions) -> Result<Self> {
115        let cli_path = match find_claude_cli() {
116            Ok(path) => path,
117            Err(_) if options.auto_download_cli => {
118                info!("Claude CLI not found, attempting automatic download...");
119                crate::cli_download::download_cli(None, None).await?
120            }
121            Err(e) => return Err(e),
122        };
123
124        Ok(Self {
125            options,
126            cli_path,
127            child: None,
128            stdin_tx: None,
129            message_broadcast_tx: None,
130            control_rx: None,
131            sdk_control_rx: None,
132            state: TransportState::Disconnected,
133            request_counter: 0,
134            close_stdin_after_prompt: false,
135        })
136    }
137
138    fn build_settings_value(&self) -> Option<String> {
139        let has_settings = self.options.settings.is_some();
140        let has_sandbox = self.options.sandbox.is_some();
141
142        if !has_settings && !has_sandbox {
143            return None;
144        }
145
146        // If only settings path and no sandbox, pass through as-is
147        if has_settings && !has_sandbox {
148            return self.options.settings.clone();
149        }
150
151        // If we have sandbox settings, merge into a JSON object (Python parity)
152        let mut settings_obj = serde_json::Map::new();
153
154        if let Some(ref settings) = self.options.settings {
155            let settings_str = settings.trim();
156
157            let load_as_json_string =
158                |s: &str| -> Option<serde_json::Map<String, serde_json::Value>> {
159                match serde_json::from_str::<serde_json::Value>(s) {
160                    Ok(serde_json::Value::Object(map)) => Some(map),
161                    Ok(_) => {
162                        warn!("Settings JSON must be an object; ignoring provided JSON settings");
163                        None
164                    }
165                    Err(_) => None,
166                }
167            };
168
169            let load_from_file =
170                |path: &Path| -> Option<serde_json::Map<String, serde_json::Value>> {
171                let content = std::fs::read_to_string(path).ok()?;
172                match serde_json::from_str::<serde_json::Value>(&content) {
173                    Ok(serde_json::Value::Object(map)) => Some(map),
174                    Ok(_) => {
175                        warn!("Settings file JSON must be an object: {}", path.display());
176                        None
177                    }
178                    Err(e) => {
179                        warn!("Failed to parse settings file {}: {}", path.display(), e);
180                        None
181                    }
182                }
183            };
184
185            if settings_str.starts_with('{') && settings_str.ends_with('}') {
186                if let Some(map) = load_as_json_string(settings_str) {
187                    settings_obj = map;
188                } else {
189                    warn!(
190                        "Failed to parse settings as JSON, treating as file path: {}",
191                        settings_str
192                    );
193                    let settings_path = Path::new(settings_str);
194                    if settings_path.exists() {
195                        if let Some(map) = load_from_file(settings_path) {
196                            settings_obj = map;
197                        }
198                    } else {
199                        warn!("Settings file not found: {}", settings_path.display());
200                    }
201                }
202            } else {
203                let settings_path = Path::new(settings_str);
204                if settings_path.exists() {
205                    if let Some(map) = load_from_file(settings_path) {
206                        settings_obj = map;
207                    }
208                } else {
209                    warn!("Settings file not found: {}", settings_path.display());
210                }
211            }
212        }
213
214        if let Some(ref sandbox) = self.options.sandbox {
215            match serde_json::to_value(sandbox) {
216                Ok(value) => {
217                    settings_obj.insert("sandbox".to_string(), value);
218                }
219                Err(e) => {
220                    warn!("Failed to serialize sandbox settings: {}", e);
221                }
222            }
223        }
224
225        Some(serde_json::Value::Object(settings_obj).to_string())
226    }
227    
228    /// Subscribe to messages without borrowing self (for lock-free consumption)
229    pub fn subscribe_messages(&self) -> Option<Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>> {
230        self.message_broadcast_tx.as_ref().map(|tx| {
231            let rx = tx.subscribe();
232            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
233                |result| async move {
234                    match result {
235                        Ok(msg) => Some(Ok(msg)),
236                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
237                            warn!("Receiver lagged by {} messages", n);
238                            None
239                        }
240                    }
241                },
242            )) as Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>
243        })
244    }
245
246    /// Receive SDK control requests
247    #[allow(dead_code)]
248    pub async fn receive_sdk_control_request(&mut self) -> Option<serde_json::Value> {
249        if let Some(ref mut rx) = self.sdk_control_rx {
250            rx.recv().await
251        } else {
252            None
253        }
254    }
255    
256    /// Take the SDK control receiver (can only be called once)
257    pub fn take_sdk_control_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
258        self.sdk_control_rx.take()
259    }
260
261    /// Create with a specific CLI path
262    pub fn with_cli_path(options: ClaudeCodeOptions, cli_path: impl Into<PathBuf>) -> Self {
263        Self {
264            options,
265            cli_path: cli_path.into(),
266            child: None,
267            stdin_tx: None,
268            message_broadcast_tx: None,
269            control_rx: None,
270            sdk_control_rx: None,
271            state: TransportState::Disconnected,
272            request_counter: 0,
273            close_stdin_after_prompt: false,
274        }
275    }
276
277    /// Set whether to close stdin after sending the initial prompt
278    #[allow(dead_code)]
279    pub fn set_close_stdin_after_prompt(&mut self, close: bool) {
280        self.close_stdin_after_prompt = close;
281    }
282
283    /// Create transport for simple print mode (one-shot query)
284    #[allow(dead_code)]
285    pub fn for_print_mode(options: ClaudeCodeOptions, _prompt: String) -> Result<Self> {
286        let cli_path = find_claude_cli()?;
287        Ok(Self {
288            options,
289            cli_path,
290            child: None,
291            stdin_tx: None,
292            message_broadcast_tx: None,
293            control_rx: None,
294            sdk_control_rx: None,
295            state: TransportState::Disconnected,
296            request_counter: 0,
297            close_stdin_after_prompt: true,
298        })
299    }
300
301    /// Build the command with all necessary arguments
302    fn build_command(&self) -> Command {
303        let mut cmd = Command::new(&self.cli_path);
304
305        // Always use output-format stream-json and verbose (like Python SDK)
306        cmd.arg("--output-format").arg("stream-json");
307        cmd.arg("--verbose");
308
309        // For streaming/interactive mode, also add input-format stream-json
310        cmd.arg("--input-format").arg("stream-json");
311        
312        // Include partial messages if requested
313        if self.options.include_partial_messages {
314            cmd.arg("--include-partial-messages");
315        }
316        
317        // Add debug-to-stderr flag if debug_stderr is set
318        if self.options.debug_stderr.is_some() {
319            cmd.arg("--debug-to-stderr");
320        }
321        
322        // Handle max_output_tokens (priority: option > env var)
323        // Maximum safe value is 32000, values above this may cause issues
324        if let Some(max_tokens) = self.options.max_output_tokens {
325            // Option takes priority - validate and cap at 32000
326            let capped = max_tokens.clamp(1, 32000);
327            cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
328            debug!("Setting max_output_tokens from option: {}", capped);
329        } else {
330            // Fall back to environment variable handling
331            if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
332                if let Ok(tokens) = current_value.parse::<u32>() {
333                    if tokens > 32000 {
334                        warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
335                        cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
336                    }
337                    // If it's <= 32000, leave it as is
338                } else {
339                    // Invalid value, set to safe default
340                    warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
341                    cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
342                }
343            }
344        }
345
346        // System prompts (match Python SDK behavior)
347        //
348        // Python always passes `--system-prompt ""` when `system_prompt` is None.
349        if let Some(ref prompt_v2) = self.options.system_prompt_v2 {
350            match prompt_v2 {
351                crate::types::SystemPrompt::String(s) => {
352                    cmd.arg("--system-prompt").arg(s);
353                }
354                crate::types::SystemPrompt::Preset { append, .. } => {
355                    // Python only uses preset prompts to optionally append to the default preset.
356                    // It does not pass a preset selector flag to the CLI.
357                    if let Some(append_text) = append {
358                        cmd.arg("--append-system-prompt").arg(append_text);
359                    }
360                }
361            }
362        } else {
363            // Fallback to deprecated fields for backward compatibility
364            #[allow(deprecated)]
365            match self.options.system_prompt.as_deref() {
366                Some(prompt) => {
367                    cmd.arg("--system-prompt").arg(prompt);
368                }
369                None => {
370                    cmd.arg("--system-prompt").arg("");
371                }
372            }
373            #[allow(deprecated)]
374            if let Some(ref prompt) = self.options.append_system_prompt {
375                cmd.arg("--append-system-prompt").arg(prompt);
376            }
377        }
378
379        // Tool configuration
380        if !self.options.allowed_tools.is_empty() {
381            cmd.arg("--allowedTools")
382                .arg(self.options.allowed_tools.join(","));
383        }
384        if !self.options.disallowed_tools.is_empty() {
385            cmd.arg("--disallowedTools")
386                .arg(self.options.disallowed_tools.join(","));
387        }
388
389        // Permission mode
390        match self.options.permission_mode {
391            PermissionMode::Default => {
392                cmd.arg("--permission-mode").arg("default");
393            }
394            PermissionMode::AcceptEdits => {
395                cmd.arg("--permission-mode").arg("acceptEdits");
396            }
397            PermissionMode::Plan => {
398                cmd.arg("--permission-mode").arg("plan");
399            }
400            PermissionMode::BypassPermissions => {
401                cmd.arg("--permission-mode").arg("bypassPermissions");
402            }
403        }
404
405        // Model
406        if let Some(ref model) = self.options.model {
407            cmd.arg("--model").arg(model);
408        }
409
410        // Permission prompt tool
411        if let Some(ref tool_name) = self.options.permission_prompt_tool_name {
412            cmd.arg("--permission-prompt-tool").arg(tool_name);
413        }
414
415        // Max turns
416        if let Some(max_turns) = self.options.max_turns {
417            cmd.arg("--max-turns").arg(max_turns.to_string());
418        }
419
420        // Max thinking tokens (extended thinking budget)
421        if let Some(max_thinking_tokens) = self.options.max_thinking_tokens {
422            if max_thinking_tokens > 0 {
423                cmd.arg("--max-thinking-tokens")
424                    .arg(max_thinking_tokens.to_string());
425            }
426        }
427
428        // Working directory
429        if let Some(ref cwd) = self.options.cwd {
430            cmd.current_dir(cwd);
431        }
432        
433        // Add environment variables
434        for (key, value) in &self.options.env {
435            cmd.env(key, value);
436        }
437
438        // MCP servers - use --mcp-config with JSON format like Python SDK
439        if !self.options.mcp_servers.is_empty() {
440            let mcp_config = serde_json::json!({
441                "mcpServers": self.options.mcp_servers
442            });
443            cmd.arg("--mcp-config").arg(mcp_config.to_string());
444        }
445
446        // Continue/resume
447        if self.options.continue_conversation {
448            cmd.arg("--continue");
449        }
450        if let Some(ref resume_id) = self.options.resume {
451            cmd.arg("--resume").arg(resume_id);
452        }
453
454        // Settings value (merge sandbox into settings if provided)
455        if let Some(settings_value) = self.build_settings_value() {
456            cmd.arg("--settings").arg(settings_value);
457        }
458
459        // Additional directories
460        for dir in &self.options.add_dirs {
461            cmd.arg("--add-dir").arg(dir);
462        }
463
464        // Fork session if requested
465        if self.options.fork_session {
466            cmd.arg("--fork-session");
467        }
468
469        // ========== Phase 3 CLI args (Python SDK v0.1.12+ sync) ==========
470
471        // Tools configuration (base set of tools)
472        if let Some(ref tools) = self.options.tools {
473            match tools {
474                crate::types::ToolsConfig::List(list) => {
475                    if list.is_empty() {
476                        cmd.arg("--tools").arg("");
477                    } else {
478                        cmd.arg("--tools").arg(list.join(","));
479                    }
480                }
481                crate::types::ToolsConfig::Preset(_preset) => {
482                    // Preset object - 'claude_code' preset maps to 'default'
483                    cmd.arg("--tools").arg("default");
484                }
485            }
486        }
487
488        // SDK betas
489        if !self.options.betas.is_empty() {
490            let betas: Vec<String> = self.options.betas.iter().map(|b| b.to_string()).collect();
491            cmd.arg("--betas").arg(betas.join(","));
492        }
493
494        // Max budget USD
495        if let Some(budget) = self.options.max_budget_usd {
496            cmd.arg("--max-budget-usd").arg(budget.to_string());
497        }
498
499        // Fallback model
500        if let Some(ref fallback) = self.options.fallback_model {
501            cmd.arg("--fallback-model").arg(fallback);
502        }
503
504        // File checkpointing
505        if self.options.enable_file_checkpointing {
506            cmd.env("CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", "true");
507        }
508
509        // Output format for structured outputs (json_schema only)
510        if let Some(ref format) = self.options.output_format
511            && format.get("type").and_then(|v| v.as_str()) == Some("json_schema")
512            && let Some(schema) = format.get("schema")
513            && let Ok(schema_json) = serde_json::to_string(schema)
514        {
515            cmd.arg("--json-schema").arg(schema_json);
516        }
517
518        // Plugin directories
519        for plugin in &self.options.plugins {
520            match plugin {
521                crate::types::SdkPluginConfig::Local { path } => {
522                    cmd.arg("--plugin-dir").arg(path);
523                }
524            }
525        }
526
527        // Programmatic agents
528        if let Some(ref agents) = self.options.agents
529            && !agents.is_empty()
530                && let Ok(json_str) = serde_json::to_string(agents) {
531                    cmd.arg("--agents").arg(json_str);
532                }
533
534        // Setting sources (comma-separated). Always pass a value for SDK parity with Python.
535        let sources_value = self
536            .options
537            .setting_sources
538            .as_ref()
539            .map(|sources| {
540                sources
541                    .iter()
542                    .map(|s| match s {
543                        crate::types::SettingSource::User => "user",
544                        crate::types::SettingSource::Project => "project",
545                        crate::types::SettingSource::Local => "local",
546                    })
547                    .collect::<Vec<_>>()
548                    .join(",")
549            })
550            .unwrap_or_default();
551        cmd.arg("--setting-sources").arg(sources_value);
552
553        // Extra arguments
554        for (key, value) in &self.options.extra_args {
555            let flag = if key.starts_with("--") || key.starts_with("-") {
556                key.clone()
557            } else {
558                format!("--{key}")
559            };
560            cmd.arg(&flag);
561            if let Some(val) = value {
562                cmd.arg(val);
563            }
564        }
565
566        // Set up process pipes
567        cmd.stdin(Stdio::piped())
568            .stdout(Stdio::piped())
569            .stderr(Stdio::piped());
570
571        // Set environment variables to indicate SDK usage and version
572        cmd.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
573        cmd.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
574
575        // Debug log the full command being executed
576        debug!(
577            "Executing Claude CLI command: {} {:?}",
578            self.cli_path.display(),
579            cmd.as_std().get_args().collect::<Vec<_>>()
580        );
581
582        cmd
583    }
584
585    /// Check CLI version and warn if below minimum required version
586    async fn check_cli_version(&self) -> Result<()> {
587        // Run the CLI with --version flag (with a timeout to avoid hanging)
588        let output = tokio::time::timeout(
589            std::time::Duration::from_secs(5),
590            tokio::process::Command::new(&self.cli_path)
591                .arg("--version")
592                .output(),
593        )
594        .await;
595
596        let output = match output {
597            Ok(Ok(output)) => output,
598            Ok(Err(e)) => {
599                warn!("Failed to check CLI version: {}", e);
600                return Ok(()); // Don't fail connection, just warn
601            }
602            Err(_) => {
603                warn!("CLI version check timed out after 5 seconds");
604                return Ok(());
605            }
606        };
607
608        let version_str = String::from_utf8_lossy(&output.stdout);
609        let version_str = version_str.trim();
610
611        if let Some(semver) = SemVer::parse(version_str) {
612            let min_version = SemVer::new(MIN_CLI_VERSION.0, MIN_CLI_VERSION.1, MIN_CLI_VERSION.2);
613
614            if semver < min_version {
615                warn!(
616                    "⚠️  Claude CLI version {}.{}.{} is below minimum required version {}.{}.{}",
617                    semver.major,
618                    semver.minor,
619                    semver.patch,
620                    MIN_CLI_VERSION.0,
621                    MIN_CLI_VERSION.1,
622                    MIN_CLI_VERSION.2
623                );
624                warn!(
625                    "   Some features may not work correctly. Please upgrade with: npm install -g @anthropic-ai/claude-code@latest"
626                );
627            } else {
628                info!("Claude CLI version: {}.{}.{}", semver.major, semver.minor, semver.patch);
629            }
630        } else {
631            debug!("Could not parse CLI version: {}", version_str);
632        }
633
634        Ok(())
635    }
636
637    /// Spawn the process and set up communication channels
638    async fn spawn_process(&mut self) -> Result<()> {
639        self.state = TransportState::Connecting;
640
641        let mut cmd = self.build_command();
642        info!("Starting Claude CLI with command: {:?}", cmd);
643
644        if let Some(user) = self.options.user.as_deref() {
645            apply_process_user(&mut cmd, user)?;
646        }
647
648        let mut child = cmd.spawn().map_err(|e| {
649            error!("Failed to spawn Claude CLI: {}", e);
650            SdkError::ProcessError(e)
651        })?;
652
653        // Get stdio handles
654        let stdin = child
655            .stdin
656            .take()
657            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdin".into()))?;
658        let stdout = child
659            .stdout
660            .take()
661            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdout".into()))?;
662        let stderr = child
663            .stderr
664            .take()
665            .ok_or_else(|| SdkError::ConnectionError("Failed to get stderr".into()))?;
666
667        // Determine buffer size from options or use default
668        let buffer_size = self.options.cli_channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE);
669
670        // Create channels
671        let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(buffer_size);
672        // Use broadcast channel for messages to support multiple receivers
673        let (message_broadcast_tx, _) =
674            tokio::sync::broadcast::channel::<Message>(buffer_size);
675        let (control_tx, control_rx) = mpsc::channel::<ControlResponse>(buffer_size);
676
677        // Spawn stdin handler
678        tokio::spawn(async move {
679            let mut stdin = stdin;
680            debug!("Stdin handler started");
681            while let Some(line) = stdin_rx.recv().await {
682                debug!("Received line from channel: {}", line);
683                if let Err(e) = stdin.write_all(line.as_bytes()).await {
684                    error!("Failed to write to stdin: {}", e);
685                    break;
686                }
687                if let Err(e) = stdin.write_all(b"\n").await {
688                    error!("Failed to write newline: {}", e);
689                    break;
690                }
691                if let Err(e) = stdin.flush().await {
692                    error!("Failed to flush stdin: {}", e);
693                    break;
694                }
695                debug!("Successfully sent to Claude process: {}", line);
696            }
697            debug!("Stdin handler ended");
698        });
699
700        // Create channel for SDK control requests
701        let (sdk_control_tx, sdk_control_rx) = mpsc::channel::<serde_json::Value>(buffer_size);
702        
703        // Spawn stdout handler
704        let message_broadcast_tx_clone = message_broadcast_tx.clone();
705        let control_tx_clone = control_tx.clone();
706        let sdk_control_tx_clone = sdk_control_tx.clone();
707        tokio::spawn(async move {
708            debug!("Stdout handler started");
709            let reader = BufReader::new(stdout);
710            let mut lines = reader.lines();
711
712            while let Ok(Some(line)) = lines.next_line().await {
713                if line.trim().is_empty() {
714                    continue;
715                }
716
717                debug!("Claude output: {}", line);
718
719                // Try to parse as JSON
720                match serde_json::from_str::<serde_json::Value>(&line) {
721                    Ok(json) => {
722                        // Check message type
723                        if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
724                            // Handle control responses - these are responses to OUR control requests
725                            if msg_type == "control_response" {
726                                debug!("Received control response: {:?}", json);
727
728                                // Send to sdk_control channel for control protocol mode
729                                let _ = sdk_control_tx_clone.send(json.clone()).await;
730
731                                // Also parse and send to legacy control_tx for non-control-protocol mode
732                                // (needed for interrupt functionality when query_handler is None)
733                                // CLI returns: {"type":"control_response","response":{"subtype":"success","request_id":"..."}}
734                                // or: {"type":"control_response","response":{"subtype":"error","request_id":"...","error":"..."}}
735                                if let Some(response_obj) = json.get("response")
736                                    && let Some(request_id) = response_obj.get("request_id")
737                                        .or_else(|| response_obj.get("requestId"))
738                                        .and_then(|v| v.as_str())
739                                    {
740                                        // Determine success from subtype
741                                        let subtype = response_obj.get("subtype").and_then(|v| v.as_str());
742                                        let success = subtype == Some("success");
743
744                                        let control_resp = ControlResponse::InterruptAck {
745                                            request_id: request_id.to_string(),
746                                            success,
747                                        };
748                                        let _ = control_tx_clone.send(control_resp).await;
749                                    }
750                                continue;
751                            }
752
753                            // Handle control requests FROM CLI (standard format)
754                            if msg_type == "control_request" {
755                                debug!("Received control request from CLI: {:?}", json);
756                                // Send the FULL message including requestId and request
757                                let _ = sdk_control_tx_clone.send(json.clone()).await;
758                                continue;
759                            }
760
761                            // Handle control messages (new format)
762                            if msg_type == "control"
763                                && let Some(control) = json.get("control") {
764                                    debug!("Received control message: {:?}", control);
765                                    let _ = sdk_control_tx_clone.send(control.clone()).await;
766                                    continue;
767                                }
768
769                            // Handle SDK control requests FROM CLI (legacy format)
770                            if msg_type == "sdk_control_request" {
771                                // Send the FULL message including requestId
772                                debug!("Received SDK control request (legacy): {:?}", json);
773                                let _ = sdk_control_tx_clone.send(json.clone()).await;
774                                continue;
775                            }
776                            
777                            // Check for system messages with SDK control subtypes
778                            if msg_type == "system"
779                                && let Some(subtype) = json.get("subtype").and_then(|v| v.as_str())
780                                    && subtype.starts_with("sdk_control:") {
781                                        // This is an SDK control message
782                                        debug!("Received SDK control message: {}", subtype);
783                                        let _ = sdk_control_tx_clone.send(json.clone()).await;
784                                        // Still parse as regular message for now
785                                    }
786                        }
787
788                        // Try to parse as a regular message
789                        match crate::message_parser::parse_message(json) {
790                            Ok(Some(message)) => {
791                                // Use broadcast send which doesn't fail if no receivers
792                                let _ = message_broadcast_tx_clone.send(message);
793                            }
794                            Ok(None) => {
795                                // Ignore non-message JSON
796                            }
797                            Err(e) => {
798                                warn!("Failed to parse message: {}", e);
799                            }
800                        }
801                    }
802                    Err(e) => {
803                        warn!("Failed to parse JSON: {} - Line: {}", e, line);
804                    }
805                }
806            }
807            info!("Stdout reader ended");
808        });
809
810        // Spawn stderr handler - capture error messages for better diagnostics
811        let message_broadcast_tx_for_error = message_broadcast_tx.clone();
812        let debug_stderr = self.options.debug_stderr.clone();
813        let stderr_callback = self.options.stderr_callback.clone();
814        tokio::spawn(async move {
815            let reader = BufReader::new(stderr);
816            let mut lines = reader.lines();
817            let mut error_buffer = Vec::new();
818            
819            while let Ok(Some(line)) = lines.next_line().await {
820                if !line.trim().is_empty() {
821                    // If debug_stderr is set, write to it
822                    if let Some(ref debug_output) = debug_stderr {
823                        let mut output = debug_output.lock().await;
824                        let _ = writeln!(output, "{line}");
825                        let _ = output.flush();
826                    }
827
828                    if let Some(ref callback) = stderr_callback {
829                        callback.as_ref()(line.as_str());
830                    }
831                    
832                    error!("Claude CLI stderr: {}", line);
833                    error_buffer.push(line.clone());
834                    
835                    // Check for common error patterns
836                    if line.contains("command not found") || line.contains("No such file") {
837                        error!("Claude CLI binary not found or not executable");
838                    } else if line.contains("ENOENT") || line.contains("spawn") {
839                        error!("Failed to spawn Claude CLI process - binary may not be installed");
840                    } else if line.contains("authentication") || line.contains("API key") || line.contains("Unauthorized") {
841                        error!("Claude CLI authentication error - please run 'claude-code api login'");
842                    } else if line.contains("model") && (line.contains("not available") || line.contains("not found")) {
843                        error!("Model not available for your account: {}", line);
844                    } else if line.contains("Error:") || line.contains("error:") {
845                        error!("Claude CLI error detected: {}", line);
846                    }
847                }
848            }
849            
850            // If we collected any errors, log them
851            if !error_buffer.is_empty() {
852                let error_msg = error_buffer.join("\n");
853                error!("Claude CLI stderr output collected:\n{}", error_msg);
854                
855                // Try to send an error message through the broadcast channel
856                let _ = message_broadcast_tx_for_error.send(Message::System {
857                    subtype: "error".to_string(),
858                    data: serde_json::json!({
859                        "source": "stderr",
860                        "error": "Claude CLI error output",
861                        "details": error_msg
862                    }),
863                });
864            }
865        });
866
867        // Store handles
868        self.child = Some(child);
869        self.stdin_tx = Some(stdin_tx);
870        self.message_broadcast_tx = Some(message_broadcast_tx);
871        self.control_rx = Some(control_rx);
872        self.sdk_control_rx = Some(sdk_control_rx);
873        self.state = TransportState::Connected;
874
875        Ok(())
876    }
877}
878
879#[async_trait]
880impl Transport for SubprocessTransport {
881    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
882        self
883    }
884    
885    async fn connect(&mut self) -> Result<()> {
886        if self.state == TransportState::Connected {
887            return Ok(());
888        }
889
890        // Check CLI version before connecting
891        if let Err(e) = self.check_cli_version().await {
892            warn!("CLI version check failed: {}", e);
893        }
894
895        self.spawn_process().await?;
896        info!("Connected to Claude CLI");
897        Ok(())
898    }
899
900    async fn send_message(&mut self, message: InputMessage) -> Result<()> {
901        if self.state != TransportState::Connected {
902            return Err(SdkError::InvalidState {
903                message: "Not connected".into(),
904            });
905        }
906
907        let json = serde_json::to_string(&message)?;
908        debug!("Serialized message: {}", json);
909
910        if let Some(ref tx) = self.stdin_tx {
911            debug!("Sending message to stdin channel");
912            tx.send(json).await?;
913            debug!("Message sent to channel");
914            Ok(())
915        } else {
916            Err(SdkError::InvalidState {
917                message: "Stdin channel not available".into(),
918            })
919        }
920    }
921
922    fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>> {
923        if let Some(ref tx) = self.message_broadcast_tx {
924            // Create a new receiver from the broadcast sender
925            let rx = tx.subscribe();
926            // Convert broadcast receiver to stream
927            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
928                |result| async move {
929                    match result {
930                        Ok(msg) => Some(Ok(msg)),
931                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(
932                            n,
933                        )) => {
934                            warn!("Receiver lagged by {} messages", n);
935                            None
936                        }
937                    }
938                },
939            ))
940        } else {
941            Box::pin(futures::stream::empty())
942        }
943    }
944
945    async fn send_control_request(&mut self, request: ControlRequest) -> Result<()> {
946        if self.state != TransportState::Connected {
947            return Err(SdkError::InvalidState {
948                message: "Not connected".into(),
949            });
950        }
951
952        self.request_counter += 1;
953        let control_msg = match request {
954            ControlRequest::Interrupt { request_id } => {
955                serde_json::json!({
956                    "type": "control_request",
957                    "request": {
958                        "type": "interrupt",
959                        "request_id": request_id
960                    }
961                })
962            }
963        };
964
965        let json = serde_json::to_string(&control_msg)?;
966
967        if let Some(ref tx) = self.stdin_tx {
968            tx.send(json).await?;
969            Ok(())
970        } else {
971            Err(SdkError::InvalidState {
972                message: "Stdin channel not available".into(),
973            })
974        }
975    }
976
977    async fn receive_control_response(&mut self) -> Result<Option<ControlResponse>> {
978        if let Some(ref mut rx) = self.control_rx {
979            Ok(rx.recv().await)
980        } else {
981            Ok(None)
982        }
983    }
984    
985    async fn send_sdk_control_request(&mut self, request: serde_json::Value) -> Result<()> {
986        // The request is already properly formatted as {"type": "control_request", ...}
987        // Just send it directly without wrapping
988        let json = serde_json::to_string(&request)?;
989
990        if let Some(ref tx) = self.stdin_tx {
991            tx.send(json).await?;
992            Ok(())
993        } else {
994            Err(SdkError::InvalidState {
995                message: "Stdin channel not available".into(),
996            })
997        }
998    }
999    
1000    async fn send_sdk_control_response(&mut self, response: serde_json::Value) -> Result<()> {
1001        // Wrap the response in control_response format expected by CLI
1002        // The response should have: {"type": "control_response", "response": {...}}
1003        let control_response = serde_json::json!({
1004            "type": "control_response",
1005            "response": response
1006        });
1007
1008        let json = serde_json::to_string(&control_response)?;
1009
1010        if let Some(ref tx) = self.stdin_tx {
1011            tx.send(json).await?;
1012            Ok(())
1013        } else {
1014            Err(SdkError::InvalidState {
1015                message: "Stdin channel not available".into(),
1016            })
1017        }
1018    }
1019
1020    fn is_connected(&self) -> bool {
1021        self.state == TransportState::Connected
1022    }
1023
1024    async fn disconnect(&mut self) -> Result<()> {
1025        if self.state != TransportState::Connected {
1026            return Ok(());
1027        }
1028
1029        self.state = TransportState::Disconnecting;
1030
1031        // Close stdin channel
1032        self.stdin_tx.take();
1033
1034        // Kill the child process
1035        if let Some(mut child) = self.child.take() {
1036            match child.kill().await {
1037                Ok(()) => info!("Claude CLI process terminated"),
1038                Err(e) => warn!("Failed to kill Claude CLI process: {}", e),
1039            }
1040        }
1041
1042        self.state = TransportState::Disconnected;
1043        Ok(())
1044    }
1045
1046    fn take_sdk_control_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<serde_json::Value>> {
1047        self.sdk_control_rx.take()
1048    }
1049
1050    async fn end_input(&mut self) -> Result<()> {
1051        // Close stdin channel to signal end of input
1052        self.stdin_tx.take();
1053        Ok(())
1054    }
1055}
1056
1057impl Drop for SubprocessTransport {
1058    fn drop(&mut self) {
1059        if let Some(mut child) = self.child.take() {
1060            // Try to kill the process
1061            let _ = child.start_kill();
1062        }
1063    }
1064}
1065
1066/// Find the Claude CLI binary
1067///
1068/// Search order:
1069/// 1. System PATH (`claude`, `claude-code`)
1070/// 2. SDK cache directory (auto-downloaded CLI)
1071/// 3. Common installation locations
1072pub fn find_claude_cli() -> Result<PathBuf> {
1073    // First check if it's in PATH - try both 'claude' and 'claude-code'
1074    for cmd_name in &["claude", "claude-code"] {
1075        if let Ok(path) = which::which(cmd_name) {
1076            debug!("Found Claude CLI in PATH at: {}", path.display());
1077            return Ok(path);
1078        }
1079    }
1080
1081    // Check SDK cache directory (for auto-downloaded CLI)
1082    if let Some(cached_path) = crate::cli_download::get_cached_cli_path() {
1083        if cached_path.exists() && cached_path.is_file() {
1084            debug!("Found cached Claude CLI at: {}", cached_path.display());
1085            return Ok(cached_path);
1086        }
1087    }
1088
1089    // Check common installation locations
1090    let home = dirs::home_dir().ok_or_else(|| SdkError::CliNotFound {
1091        searched_paths: "Unable to determine home directory".into(),
1092    })?;
1093
1094    let locations = vec![
1095        // npm global installations
1096        home.join(".npm-global/bin/claude"),
1097        home.join(".npm-global/bin/claude-code"),
1098        PathBuf::from("/usr/local/bin/claude"),
1099        PathBuf::from("/usr/local/bin/claude-code"),
1100        // Local installations
1101        home.join(".local/bin/claude"),
1102        home.join(".local/bin/claude-code"),
1103        home.join("node_modules/.bin/claude"),
1104        home.join("node_modules/.bin/claude-code"),
1105        // Yarn installations
1106        home.join(".yarn/bin/claude"),
1107        home.join(".yarn/bin/claude-code"),
1108        // macOS specific npm location
1109        PathBuf::from("/opt/homebrew/bin/claude"),
1110        PathBuf::from("/opt/homebrew/bin/claude-code"),
1111        // Claude local directory
1112        home.join(".claude/local/claude"),
1113    ];
1114
1115    let mut searched = Vec::new();
1116    for path in &locations {
1117        searched.push(path.display().to_string());
1118        if path.exists() && path.is_file() {
1119            debug!("Found Claude CLI at: {}", path.display());
1120            return Ok(path.clone());
1121        }
1122    }
1123
1124    // Log detailed error information
1125    warn!("Claude CLI not found in any standard location");
1126    warn!("Searched paths: {:?}", searched);
1127
1128    // Check if Node.js is installed
1129    if which::which("node").is_err() && which::which("npm").is_err() {
1130        error!("Node.js/npm not found - Claude CLI requires Node.js");
1131        return Err(SdkError::CliNotFound {
1132            searched_paths: format!(
1133                "Node.js is not installed. Install from https://nodejs.org/\n\n\
1134                Alternatively, enable auto_download_cli to automatically download the CLI:\n\
1135                ```rust\n\
1136                let options = ClaudeCodeOptions::builder()\n\
1137                    .auto_download_cli(true)\n\
1138                    .build();\n\
1139                ```\n\n\
1140                Searched in:\n{}",
1141                searched.join("\n")
1142            ),
1143        });
1144    }
1145
1146    Err(SdkError::CliNotFound {
1147        searched_paths: format!(
1148            "Claude CLI not found.\n\n\
1149            Option 1 - Auto-download (recommended):\n\
1150            ```rust\n\
1151            let options = ClaudeCodeOptions::builder()\n\
1152                .auto_download_cli(true)\n\
1153                .build();\n\
1154            ```\n\n\
1155            Option 2 - Manual installation:\n\
1156            npm install -g @anthropic-ai/claude-code\n\n\
1157            Searched in:\n{}",
1158            searched.join("\n")
1159        ),
1160    })
1161}
1162
1163pub(crate) fn apply_process_user(cmd: &mut Command, user: &str) -> Result<()> {
1164    let user = user.trim();
1165    if user.is_empty() {
1166        return Err(SdkError::ConfigError(
1167            "options.user must be a non-empty username or uid".into(),
1168        ));
1169    }
1170
1171    apply_process_user_inner(cmd, user)
1172}
1173
1174#[cfg(unix)]
1175fn apply_process_user_inner(cmd: &mut Command, user: &str) -> Result<()> {
1176    use std::ffi::CString;
1177    use std::mem::MaybeUninit;
1178    use std::os::unix::process::CommandExt;
1179    use std::ptr;
1180
1181    fn passwd_buf_len() -> usize {
1182        let buf_len = unsafe { libc::sysconf(libc::_SC_GETPW_R_SIZE_MAX) };
1183        if buf_len <= 0 {
1184            16 * 1024
1185        } else {
1186            buf_len as usize
1187        }
1188    }
1189
1190    fn lookup_by_name(name: &str) -> Result<(u32, u32)> {
1191        let name = CString::new(name).map_err(|_| {
1192            SdkError::ConfigError("options.user must not contain NUL bytes".into())
1193        })?;
1194
1195        let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1196        let mut result: *mut libc::passwd = ptr::null_mut();
1197        let mut buf = vec![0u8; passwd_buf_len()];
1198
1199        let rc = unsafe {
1200            libc::getpwnam_r(
1201                name.as_ptr(),
1202                pwd.as_mut_ptr(),
1203                buf.as_mut_ptr() as *mut libc::c_char,
1204                buf.len(),
1205                &mut result,
1206            )
1207        };
1208        if rc != 0 {
1209            return Err(SdkError::ConfigError(format!(
1210                "Failed to resolve options.user={}: getpwnam_r returned {}",
1211                name.to_string_lossy(),
1212                rc
1213            )));
1214        }
1215        if result.is_null() {
1216            return Err(SdkError::ConfigError(format!(
1217                "User not found: {}",
1218                name.to_string_lossy()
1219            )));
1220        }
1221
1222        let pwd = unsafe { pwd.assume_init() };
1223        Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1224    }
1225
1226    fn lookup_by_uid(uid: u32) -> Result<(u32, u32)> {
1227        let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1228        let mut result: *mut libc::passwd = ptr::null_mut();
1229        let mut buf = vec![0u8; passwd_buf_len()];
1230
1231        let rc = unsafe {
1232            libc::getpwuid_r(
1233                uid as libc::uid_t,
1234                pwd.as_mut_ptr(),
1235                buf.as_mut_ptr() as *mut libc::c_char,
1236                buf.len(),
1237                &mut result,
1238            )
1239        };
1240        if rc != 0 {
1241            return Err(SdkError::ConfigError(format!(
1242                "Failed to resolve options.user={}: getpwuid_r returned {}",
1243                uid, rc
1244            )));
1245        }
1246        if result.is_null() {
1247            return Err(SdkError::ConfigError(format!("User not found for uid: {}", uid)));
1248        }
1249
1250        let pwd = unsafe { pwd.assume_init() };
1251        Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1252    }
1253
1254    let (uid, gid) = match user.parse::<u32>() {
1255        Ok(uid) => lookup_by_uid(uid)?,
1256        Err(_) => lookup_by_name(user)?,
1257    };
1258
1259    cmd.as_std_mut().uid(uid).gid(gid);
1260    Ok(())
1261}
1262
1263#[cfg(not(unix))]
1264fn apply_process_user_inner(_cmd: &mut Command, _user: &str) -> Result<()> {
1265    Err(SdkError::NotSupported {
1266        feature: "options.user is only supported on Unix platforms".into(),
1267    })
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272    use super::*;
1273
1274    #[test]
1275    fn test_find_claude_cli_error_message() {
1276        // Test error message format without relying on CLI not being found
1277        let error = SdkError::CliNotFound {
1278            searched_paths: "test paths".to_string(),
1279        };
1280        let error_msg = error.to_string();
1281        assert!(error_msg.contains("npm install -g @anthropic-ai/claude-code"));
1282        assert!(error_msg.contains("test paths"));
1283    }
1284
1285    #[tokio::test]
1286    async fn test_transport_lifecycle() {
1287        let options = ClaudeCodeOptions::default();
1288        let transport = SubprocessTransport::new(options).unwrap_or_else(|_| {
1289            // Use a dummy path for testing
1290            SubprocessTransport::with_cli_path(ClaudeCodeOptions::default(), "/usr/bin/true")
1291        });
1292
1293        assert!(!transport.is_connected());
1294        assert_eq!(transport.state, TransportState::Disconnected);
1295    }
1296
1297    #[test]
1298    fn test_semver_parse() {
1299        // Test basic version parsing
1300        let v = SemVer::parse("2.0.0").unwrap();
1301        assert_eq!(v.major, 2);
1302        assert_eq!(v.minor, 0);
1303        assert_eq!(v.patch, 0);
1304
1305        // Test with 'v' prefix
1306        let v = SemVer::parse("v2.1.3").unwrap();
1307        assert_eq!(v.major, 2);
1308        assert_eq!(v.minor, 1);
1309        assert_eq!(v.patch, 3);
1310
1311        // Test npm-style version
1312        let v = SemVer::parse("@anthropic-ai/claude-code/2.5.1").unwrap();
1313        assert_eq!(v.major, 2);
1314        assert_eq!(v.minor, 5);
1315        assert_eq!(v.patch, 1);
1316
1317        // Test version without patch
1318        let v = SemVer::parse("2.1").unwrap();
1319        assert_eq!(v.major, 2);
1320        assert_eq!(v.minor, 1);
1321        assert_eq!(v.patch, 0);
1322    }
1323
1324    #[test]
1325    fn test_semver_compare() {
1326        let v1 = SemVer::new(2, 0, 0);
1327        let v2 = SemVer::new(2, 0, 1);
1328        let v3 = SemVer::new(2, 1, 0);
1329        let v4 = SemVer::new(3, 0, 0);
1330
1331        assert!(v1 < v2);
1332        assert!(v2 < v3);
1333        assert!(v3 < v4);
1334        assert!(v1 < v4);
1335
1336        let min_version = SemVer::new(2, 0, 0);
1337        assert!(SemVer::new(1, 9, 9) < min_version);
1338        assert!(SemVer::new(2, 0, 0) >= min_version);
1339        assert!(SemVer::new(2, 1, 0) >= min_version);
1340    }
1341}