Skip to main content

cc_sdk/transport/
subprocess.rs

1//! Subprocess-based transport implementation
2//!
3//! This module implements the Transport trait using a subprocess to run the Claude CLI.
4
5use super::{InputMessage, Transport, TransportState};
6use crate::{
7    errors::{Result, SdkError},
8    types::{ClaudeCodeOptions, ControlRequest, ControlResponse, Message, PermissionMode},
9};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::process::Stdio;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::mpsc;
18use tracing::{debug, error, info, warn};
19
20/// Default buffer size for channels
21const CHANNEL_BUFFER_SIZE: usize = 100;
22
23/// Minimum required CLI version
24const MIN_CLI_VERSION: (u32, u32, u32) = (2, 0, 0);
25
26/// Simple semantic version struct
27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
28struct SemVer {
29    major: u32,
30    minor: u32,
31    patch: u32,
32}
33
34impl SemVer {
35    fn new(major: u32, minor: u32, patch: u32) -> Self {
36        Self {
37            major,
38            minor,
39            patch,
40        }
41    }
42
43    /// Parse semantic version from string (e.g., "2.0.0" or "v2.0.0")
44    fn parse(version: &str) -> Option<Self> {
45        let version = version.trim().trim_start_matches('v');
46
47        // Handle versions like "@anthropic-ai/claude-code/2.0.0"
48        let version = if let Some(v) = version.split('/').next_back() {
49            v
50        } else {
51            version
52        };
53
54        let parts: Vec<&str> = version.split('.').collect();
55        if parts.len() < 2 {
56            return None;
57        }
58
59        Some(Self {
60            major: parts[0].parse().ok()?,
61            minor: parts.get(1)?.parse().ok()?,
62            patch: parts.get(2).and_then(|p| p.parse().ok()).unwrap_or(0),
63        })
64    }
65}
66
67/// Subprocess-based transport for Claude CLI
68pub struct SubprocessTransport {
69    /// Configuration options
70    options: ClaudeCodeOptions,
71    /// CLI binary path
72    cli_path: PathBuf,
73    /// Child process
74    child: Option<Child>,
75    /// Sender for stdin
76    stdin_tx: Option<mpsc::Sender<String>>,
77    /// Sender for broadcasting messages to multiple receivers
78    message_broadcast_tx: Option<tokio::sync::broadcast::Sender<Message>>,
79    /// Receiver for control responses
80    control_rx: Option<mpsc::Receiver<ControlResponse>>,
81    /// Receiver for SDK control requests
82    sdk_control_rx: Option<mpsc::Receiver<serde_json::Value>>,
83    /// Transport state
84    state: TransportState,
85    /// Request counter for control requests
86    request_counter: u64,
87    /// Whether to close stdin after initial prompt
88    #[allow(dead_code)]
89    close_stdin_after_prompt: bool,
90}
91
92impl SubprocessTransport {
93    /// Create a new subprocess transport
94    pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
95        let cli_path = find_claude_cli()?;
96        Ok(Self {
97            options,
98            cli_path,
99            child: None,
100            stdin_tx: None,
101            message_broadcast_tx: None,
102            control_rx: None,
103            sdk_control_rx: None,
104            state: TransportState::Disconnected,
105            request_counter: 0,
106            close_stdin_after_prompt: false,
107        })
108    }
109
110    /// Create a new subprocess transport with async initialization
111    ///
112    /// This version supports auto-downloading the CLI if `auto_download_cli` is enabled
113    /// in the options and the CLI is not found.
114    pub async fn new_async(options: ClaudeCodeOptions) -> Result<Self> {
115        let cli_path = match find_claude_cli() {
116            Ok(path) => path,
117            Err(_) if options.auto_download_cli => {
118                info!("Claude CLI not found, attempting automatic download...");
119                crate::cli_download::download_cli(None, None).await?
120            }
121            Err(e) => return Err(e),
122        };
123
124        Ok(Self {
125            options,
126            cli_path,
127            child: None,
128            stdin_tx: None,
129            message_broadcast_tx: None,
130            control_rx: None,
131            sdk_control_rx: None,
132            state: TransportState::Disconnected,
133            request_counter: 0,
134            close_stdin_after_prompt: false,
135        })
136    }
137
138    fn build_settings_value(&self) -> Option<String> {
139        let has_settings = self.options.settings.is_some();
140        let has_sandbox = self.options.sandbox.is_some();
141
142        if !has_settings && !has_sandbox {
143            return None;
144        }
145
146        // If only settings path and no sandbox, pass through as-is
147        if has_settings && !has_sandbox {
148            return self.options.settings.clone();
149        }
150
151        // If we have sandbox settings, merge into a JSON object (Python parity)
152        let mut settings_obj = serde_json::Map::new();
153
154        if let Some(ref settings) = self.options.settings {
155            let settings_str = settings.trim();
156
157            let load_as_json_string =
158                |s: &str| -> Option<serde_json::Map<String, serde_json::Value>> {
159                match serde_json::from_str::<serde_json::Value>(s) {
160                    Ok(serde_json::Value::Object(map)) => Some(map),
161                    Ok(_) => {
162                        warn!("Settings JSON must be an object; ignoring provided JSON settings");
163                        None
164                    }
165                    Err(_) => None,
166                }
167            };
168
169            let load_from_file =
170                |path: &Path| -> Option<serde_json::Map<String, serde_json::Value>> {
171                let content = std::fs::read_to_string(path).ok()?;
172                match serde_json::from_str::<serde_json::Value>(&content) {
173                    Ok(serde_json::Value::Object(map)) => Some(map),
174                    Ok(_) => {
175                        warn!("Settings file JSON must be an object: {}", path.display());
176                        None
177                    }
178                    Err(e) => {
179                        warn!("Failed to parse settings file {}: {}", path.display(), e);
180                        None
181                    }
182                }
183            };
184
185            if settings_str.starts_with('{') && settings_str.ends_with('}') {
186                if let Some(map) = load_as_json_string(settings_str) {
187                    settings_obj = map;
188                } else {
189                    warn!(
190                        "Failed to parse settings as JSON, treating as file path: {}",
191                        settings_str
192                    );
193                    let settings_path = Path::new(settings_str);
194                    if settings_path.exists() {
195                        if let Some(map) = load_from_file(settings_path) {
196                            settings_obj = map;
197                        }
198                    } else {
199                        warn!("Settings file not found: {}", settings_path.display());
200                    }
201                }
202            } else {
203                let settings_path = Path::new(settings_str);
204                if settings_path.exists() {
205                    if let Some(map) = load_from_file(settings_path) {
206                        settings_obj = map;
207                    }
208                } else {
209                    warn!("Settings file not found: {}", settings_path.display());
210                }
211            }
212        }
213
214        if let Some(ref sandbox) = self.options.sandbox {
215            match serde_json::to_value(sandbox) {
216                Ok(value) => {
217                    settings_obj.insert("sandbox".to_string(), value);
218                }
219                Err(e) => {
220                    warn!("Failed to serialize sandbox settings: {}", e);
221                }
222            }
223        }
224
225        Some(serde_json::Value::Object(settings_obj).to_string())
226    }
227    
228    /// Subscribe to messages without borrowing self (for lock-free consumption)
229    pub fn subscribe_messages(&self) -> Option<Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>> {
230        self.message_broadcast_tx.as_ref().map(|tx| {
231            let rx = tx.subscribe();
232            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
233                |result| async move {
234                    match result {
235                        Ok(msg) => Some(Ok(msg)),
236                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
237                            warn!("Receiver lagged by {} messages", n);
238                            None
239                        }
240                    }
241                },
242            )) as Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>
243        })
244    }
245
246    /// Receive SDK control requests
247    #[allow(dead_code)]
248    pub async fn receive_sdk_control_request(&mut self) -> Option<serde_json::Value> {
249        if let Some(ref mut rx) = self.sdk_control_rx {
250            rx.recv().await
251        } else {
252            None
253        }
254    }
255    
256    /// Take the SDK control receiver (can only be called once)
257    pub fn take_sdk_control_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
258        self.sdk_control_rx.take()
259    }
260
261    /// Create with a specific CLI path
262    pub fn with_cli_path(options: ClaudeCodeOptions, cli_path: impl Into<PathBuf>) -> Self {
263        Self {
264            options,
265            cli_path: cli_path.into(),
266            child: None,
267            stdin_tx: None,
268            message_broadcast_tx: None,
269            control_rx: None,
270            sdk_control_rx: None,
271            state: TransportState::Disconnected,
272            request_counter: 0,
273            close_stdin_after_prompt: false,
274        }
275    }
276
277    /// Set whether to close stdin after sending the initial prompt
278    #[allow(dead_code)]
279    pub fn set_close_stdin_after_prompt(&mut self, close: bool) {
280        self.close_stdin_after_prompt = close;
281    }
282
283    /// Create transport for simple print mode (one-shot query)
284    #[allow(dead_code)]
285    pub fn for_print_mode(options: ClaudeCodeOptions, _prompt: String) -> Result<Self> {
286        let cli_path = find_claude_cli()?;
287        Ok(Self {
288            options,
289            cli_path,
290            child: None,
291            stdin_tx: None,
292            message_broadcast_tx: None,
293            control_rx: None,
294            sdk_control_rx: None,
295            state: TransportState::Disconnected,
296            request_counter: 0,
297            close_stdin_after_prompt: true,
298        })
299    }
300
301    /// Build the command with all necessary arguments
302    fn build_command(&self) -> Command {
303        let mut cmd = Command::new(&self.cli_path);
304
305        // Always use output-format stream-json and verbose (like Python SDK)
306        cmd.arg("--output-format").arg("stream-json");
307        cmd.arg("--verbose");
308
309        // For streaming/interactive mode, also add input-format stream-json
310        cmd.arg("--input-format").arg("stream-json");
311        
312        // Include partial messages if requested
313        if self.options.include_partial_messages {
314            cmd.arg("--include-partial-messages");
315        }
316        
317        // Add debug-to-stderr flag if debug_stderr is set
318        if self.options.debug_stderr.is_some() {
319            cmd.arg("--debug-to-stderr");
320        }
321        
322        // Handle max_output_tokens (priority: option > env var)
323        // Maximum safe value is 32000, values above this may cause issues
324        if let Some(max_tokens) = self.options.max_output_tokens {
325            // Option takes priority - validate and cap at 32000
326            let capped = max_tokens.clamp(1, 32000);
327            cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
328            debug!("Setting max_output_tokens from option: {}", capped);
329        } else {
330            // Fall back to environment variable handling
331            if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
332                if let Ok(tokens) = current_value.parse::<u32>() {
333                    if tokens > 32000 {
334                        warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
335                        cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
336                    }
337                    // If it's <= 32000, leave it as is
338                } else {
339                    // Invalid value, set to safe default
340                    warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
341                    cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
342                }
343            }
344        }
345
346        // System prompts (match Python SDK behavior)
347        //
348        // Python always passes `--system-prompt ""` when `system_prompt` is None.
349        if let Some(ref prompt_v2) = self.options.system_prompt_v2 {
350            match prompt_v2 {
351                crate::types::SystemPrompt::String(s) => {
352                    cmd.arg("--system-prompt").arg(s);
353                }
354                crate::types::SystemPrompt::Preset { append, .. } => {
355                    // Python only uses preset prompts to optionally append to the default preset.
356                    // It does not pass a preset selector flag to the CLI.
357                    if let Some(append_text) = append {
358                        cmd.arg("--append-system-prompt").arg(append_text);
359                    }
360                }
361            }
362        } else {
363            // Fallback to deprecated fields for backward compatibility
364            #[allow(deprecated)]
365            match self.options.system_prompt.as_deref() {
366                Some(prompt) => {
367                    cmd.arg("--system-prompt").arg(prompt);
368                }
369                None => {
370                    cmd.arg("--system-prompt").arg("");
371                }
372            }
373            #[allow(deprecated)]
374            if let Some(ref prompt) = self.options.append_system_prompt {
375                cmd.arg("--append-system-prompt").arg(prompt);
376            }
377        }
378
379        // Tool configuration
380        if !self.options.allowed_tools.is_empty() {
381            cmd.arg("--allowedTools")
382                .arg(self.options.allowed_tools.join(","));
383        }
384        if !self.options.disallowed_tools.is_empty() {
385            cmd.arg("--disallowedTools")
386                .arg(self.options.disallowed_tools.join(","));
387        }
388
389        // Permission mode
390        match self.options.permission_mode {
391            PermissionMode::Default => {
392                cmd.arg("--permission-mode").arg("default");
393            }
394            PermissionMode::AcceptEdits => {
395                cmd.arg("--permission-mode").arg("acceptEdits");
396            }
397            PermissionMode::Plan => {
398                cmd.arg("--permission-mode").arg("plan");
399            }
400            PermissionMode::BypassPermissions => {
401                cmd.arg("--permission-mode").arg("bypassPermissions");
402            }
403            PermissionMode::DontAsk => {
404                cmd.arg("--permission-mode").arg("dontAsk");
405            }
406        }
407
408        // Model
409        if let Some(ref model) = self.options.model {
410            cmd.arg("--model").arg(model);
411        }
412
413        // Permission prompt tool
414        if let Some(ref tool_name) = self.options.permission_prompt_tool_name {
415            cmd.arg("--permission-prompt-tool").arg(tool_name);
416        }
417
418        // Max turns
419        if let Some(max_turns) = self.options.max_turns {
420            cmd.arg("--max-turns").arg(max_turns.to_string());
421        }
422
423        // Thinking configuration (thinking takes priority over max_thinking_tokens)
424        if let Some(ref thinking) = self.options.thinking {
425            match thinking {
426                crate::types::ThinkingConfig::Enabled { budget_tokens } => {
427                    cmd.arg("--max-thinking-tokens")
428                        .arg(budget_tokens.to_string());
429                }
430                crate::types::ThinkingConfig::Disabled => {
431                    // Don't pass thinking tokens flag
432                }
433                crate::types::ThinkingConfig::Adaptive => {
434                    // Adaptive is the default, no flag needed
435                }
436            }
437        } else if let Some(max_thinking_tokens) = self.options.max_thinking_tokens {
438            if max_thinking_tokens > 0 {
439                cmd.arg("--max-thinking-tokens")
440                    .arg(max_thinking_tokens.to_string());
441            }
442        }
443
444        // Working directory
445        if let Some(ref cwd) = self.options.cwd {
446            cmd.current_dir(cwd);
447        }
448        
449        // Add environment variables
450        for (key, value) in &self.options.env {
451            cmd.env(key, value);
452        }
453
454        // MCP servers - use --mcp-config with JSON format like Python SDK
455        if !self.options.mcp_servers.is_empty() {
456            let mcp_config = serde_json::json!({
457                "mcpServers": self.options.mcp_servers
458            });
459            cmd.arg("--mcp-config").arg(mcp_config.to_string());
460        }
461
462        // Continue/resume
463        if self.options.continue_conversation {
464            cmd.arg("--continue");
465        }
466        if let Some(ref resume_id) = self.options.resume {
467            cmd.arg("--resume").arg(resume_id);
468        }
469
470        // Settings value (merge sandbox into settings if provided)
471        if let Some(settings_value) = self.build_settings_value() {
472            cmd.arg("--settings").arg(settings_value);
473        }
474
475        // Additional directories
476        for dir in &self.options.add_dirs {
477            cmd.arg("--add-dir").arg(dir);
478        }
479
480        // Fork session if requested
481        if self.options.fork_session {
482            cmd.arg("--fork-session");
483        }
484
485        // ========== Phase 3 CLI args (Python SDK v0.1.12+ sync) ==========
486
487        // Tools configuration (base set of tools)
488        if let Some(ref tools) = self.options.tools {
489            match tools {
490                crate::types::ToolsConfig::List(list) => {
491                    if list.is_empty() {
492                        cmd.arg("--tools").arg("");
493                    } else {
494                        cmd.arg("--tools").arg(list.join(","));
495                    }
496                }
497                crate::types::ToolsConfig::Preset(_preset) => {
498                    // Preset object - 'claude_code' preset maps to 'default'
499                    cmd.arg("--tools").arg("default");
500                }
501            }
502        }
503
504        // SDK betas
505        if !self.options.betas.is_empty() {
506            let betas: Vec<String> = self.options.betas.iter().map(|b| b.to_string()).collect();
507            cmd.arg("--betas").arg(betas.join(","));
508        }
509
510        // Max budget USD
511        if let Some(budget) = self.options.max_budget_usd {
512            cmd.arg("--max-budget-usd").arg(budget.to_string());
513        }
514
515        // Fallback model
516        if let Some(ref fallback) = self.options.fallback_model {
517            cmd.arg("--fallback-model").arg(fallback);
518        }
519
520        // File checkpointing
521        if self.options.enable_file_checkpointing {
522            cmd.env("CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", "true");
523        }
524
525        // Output format for structured outputs (json_schema only)
526        if let Some(ref format) = self.options.output_format
527            && format.get("type").and_then(|v| v.as_str()) == Some("json_schema")
528            && let Some(schema) = format.get("schema")
529            && let Ok(schema_json) = serde_json::to_string(schema)
530        {
531            cmd.arg("--json-schema").arg(schema_json);
532        }
533
534        // Plugin directories
535        for plugin in &self.options.plugins {
536            match plugin {
537                crate::types::SdkPluginConfig::Local { path } => {
538                    cmd.arg("--plugin-dir").arg(path);
539                }
540            }
541        }
542
543        // Programmatic agents
544        if let Some(ref agents) = self.options.agents
545            && !agents.is_empty()
546                && let Ok(json_str) = serde_json::to_string(agents) {
547                    cmd.arg("--agents").arg(json_str);
548                }
549
550        // Setting sources (comma-separated). Always pass a value for SDK parity with Python.
551        let sources_value = self
552            .options
553            .setting_sources
554            .as_ref()
555            .map(|sources| {
556                sources
557                    .iter()
558                    .map(|s| match s {
559                        crate::types::SettingSource::User => "user",
560                        crate::types::SettingSource::Project => "project",
561                        crate::types::SettingSource::Local => "local",
562                    })
563                    .collect::<Vec<_>>()
564                    .join(",")
565            })
566            .unwrap_or_default();
567        cmd.arg("--setting-sources").arg(sources_value);
568
569        // Effort level
570        if let Some(ref effort) = self.options.effort {
571            cmd.arg("--effort").arg(effort.to_string());
572        }
573
574        // Extra arguments
575        for (key, value) in &self.options.extra_args {
576            let flag = if key.starts_with("--") || key.starts_with("-") {
577                key.clone()
578            } else {
579                format!("--{key}")
580            };
581            cmd.arg(&flag);
582            if let Some(val) = value {
583                cmd.arg(val);
584            }
585        }
586
587        // Set up process pipes
588        cmd.stdin(Stdio::piped())
589            .stdout(Stdio::piped())
590            .stderr(Stdio::piped());
591
592        // Set environment variables to indicate SDK usage and version
593        cmd.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
594        cmd.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
595
596        // Debug log the full command being executed
597        debug!(
598            "Executing Claude CLI command: {} {:?}",
599            self.cli_path.display(),
600            cmd.as_std().get_args().collect::<Vec<_>>()
601        );
602
603        cmd
604    }
605
606    /// Check CLI version and warn if below minimum required version
607    async fn check_cli_version(&self) -> Result<()> {
608        // Run the CLI with --version flag (with a timeout to avoid hanging)
609        let output = tokio::time::timeout(
610            std::time::Duration::from_secs(5),
611            tokio::process::Command::new(&self.cli_path)
612                .arg("--version")
613                .output(),
614        )
615        .await;
616
617        let output = match output {
618            Ok(Ok(output)) => output,
619            Ok(Err(e)) => {
620                warn!("Failed to check CLI version: {}", e);
621                return Ok(()); // Don't fail connection, just warn
622            }
623            Err(_) => {
624                warn!("CLI version check timed out after 5 seconds");
625                return Ok(());
626            }
627        };
628
629        let version_str = String::from_utf8_lossy(&output.stdout);
630        let version_str = version_str.trim();
631
632        if let Some(semver) = SemVer::parse(version_str) {
633            let min_version = SemVer::new(MIN_CLI_VERSION.0, MIN_CLI_VERSION.1, MIN_CLI_VERSION.2);
634
635            if semver < min_version {
636                warn!(
637                    "⚠️  Claude CLI version {}.{}.{} is below minimum required version {}.{}.{}",
638                    semver.major,
639                    semver.minor,
640                    semver.patch,
641                    MIN_CLI_VERSION.0,
642                    MIN_CLI_VERSION.1,
643                    MIN_CLI_VERSION.2
644                );
645                warn!(
646                    "   Some features may not work correctly. Please upgrade with: npm install -g @anthropic-ai/claude-code@latest"
647                );
648            } else {
649                info!("Claude CLI version: {}.{}.{}", semver.major, semver.minor, semver.patch);
650            }
651        } else {
652            debug!("Could not parse CLI version: {}", version_str);
653        }
654
655        Ok(())
656    }
657
658    /// Spawn the process and set up communication channels
659    async fn spawn_process(&mut self) -> Result<()> {
660        self.state = TransportState::Connecting;
661
662        let mut cmd = self.build_command();
663        info!("Starting Claude CLI with command: {:?}", cmd);
664
665        if let Some(user) = self.options.user.as_deref() {
666            apply_process_user(&mut cmd, user)?;
667        }
668
669        let mut child = cmd.spawn().map_err(|e| {
670            error!("Failed to spawn Claude CLI: {}", e);
671            SdkError::ProcessError(e)
672        })?;
673
674        // Get stdio handles
675        let stdin = child
676            .stdin
677            .take()
678            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdin".into()))?;
679        let stdout = child
680            .stdout
681            .take()
682            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdout".into()))?;
683        let stderr = child
684            .stderr
685            .take()
686            .ok_or_else(|| SdkError::ConnectionError("Failed to get stderr".into()))?;
687
688        // Determine buffer size from options or use default
689        let buffer_size = self.options.cli_channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE);
690
691        // Create channels
692        let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(buffer_size);
693        // Use broadcast channel for messages to support multiple receivers
694        let (message_broadcast_tx, _) =
695            tokio::sync::broadcast::channel::<Message>(buffer_size);
696        let (control_tx, control_rx) = mpsc::channel::<ControlResponse>(buffer_size);
697
698        // Spawn stdin handler
699        tokio::spawn(async move {
700            let mut stdin = stdin;
701            debug!("Stdin handler started");
702            while let Some(line) = stdin_rx.recv().await {
703                debug!("Received line from channel: {}", line);
704                if let Err(e) = stdin.write_all(line.as_bytes()).await {
705                    error!("Failed to write to stdin: {}", e);
706                    break;
707                }
708                if let Err(e) = stdin.write_all(b"\n").await {
709                    error!("Failed to write newline: {}", e);
710                    break;
711                }
712                if let Err(e) = stdin.flush().await {
713                    error!("Failed to flush stdin: {}", e);
714                    break;
715                }
716                debug!("Successfully sent to Claude process: {}", line);
717            }
718            debug!("Stdin handler ended");
719        });
720
721        // Create channel for SDK control requests
722        let (sdk_control_tx, sdk_control_rx) = mpsc::channel::<serde_json::Value>(buffer_size);
723        
724        // Spawn stdout handler
725        let message_broadcast_tx_clone = message_broadcast_tx.clone();
726        let control_tx_clone = control_tx.clone();
727        let sdk_control_tx_clone = sdk_control_tx.clone();
728        tokio::spawn(async move {
729            debug!("Stdout handler started");
730            let reader = BufReader::new(stdout);
731            let mut lines = reader.lines();
732
733            while let Ok(Some(line)) = lines.next_line().await {
734                if line.trim().is_empty() {
735                    continue;
736                }
737
738                debug!("Claude output: {}", line);
739
740                // Try to parse as JSON
741                match serde_json::from_str::<serde_json::Value>(&line) {
742                    Ok(json) => {
743                        // Check message type
744                        if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
745                            // Handle control responses - these are responses to OUR control requests
746                            if msg_type == "control_response" {
747                                debug!("Received control response: {:?}", json);
748
749                                // Send to sdk_control channel for control protocol mode
750                                let _ = sdk_control_tx_clone.send(json.clone()).await;
751
752                                // Also parse and send to legacy control_tx for non-control-protocol mode
753                                // (needed for interrupt functionality when query_handler is None)
754                                // CLI returns: {"type":"control_response","response":{"subtype":"success","request_id":"..."}}
755                                // or: {"type":"control_response","response":{"subtype":"error","request_id":"...","error":"..."}}
756                                if let Some(response_obj) = json.get("response")
757                                    && let Some(request_id) = response_obj.get("request_id")
758                                        .or_else(|| response_obj.get("requestId"))
759                                        .and_then(|v| v.as_str())
760                                    {
761                                        // Determine success from subtype
762                                        let subtype = response_obj.get("subtype").and_then(|v| v.as_str());
763                                        let success = subtype == Some("success");
764
765                                        let control_resp = ControlResponse::InterruptAck {
766                                            request_id: request_id.to_string(),
767                                            success,
768                                        };
769                                        let _ = control_tx_clone.send(control_resp).await;
770                                    }
771                                continue;
772                            }
773
774                            // Handle control requests FROM CLI (standard format)
775                            if msg_type == "control_request" {
776                                debug!("Received control request from CLI: {:?}", json);
777                                // Send the FULL message including requestId and request
778                                let _ = sdk_control_tx_clone.send(json.clone()).await;
779                                continue;
780                            }
781
782                            // Handle control messages (new format)
783                            if msg_type == "control"
784                                && let Some(control) = json.get("control") {
785                                    debug!("Received control message: {:?}", control);
786                                    let _ = sdk_control_tx_clone.send(control.clone()).await;
787                                    continue;
788                                }
789
790                            // Handle SDK control requests FROM CLI (legacy format)
791                            if msg_type == "sdk_control_request" {
792                                // Send the FULL message including requestId
793                                debug!("Received SDK control request (legacy): {:?}", json);
794                                let _ = sdk_control_tx_clone.send(json.clone()).await;
795                                continue;
796                            }
797                            
798                            // Check for system messages with SDK control subtypes
799                            if msg_type == "system"
800                                && let Some(subtype) = json.get("subtype").and_then(|v| v.as_str())
801                                    && subtype.starts_with("sdk_control:") {
802                                        // This is an SDK control message
803                                        debug!("Received SDK control message: {}", subtype);
804                                        let _ = sdk_control_tx_clone.send(json.clone()).await;
805                                        // Still parse as regular message for now
806                                    }
807                        }
808
809                        // Try to parse as a regular message
810                        match crate::message_parser::parse_message(json) {
811                            Ok(Some(message)) => {
812                                // Use broadcast send which doesn't fail if no receivers
813                                let _ = message_broadcast_tx_clone.send(message);
814                            }
815                            Ok(None) => {
816                                // Ignore non-message JSON
817                            }
818                            Err(e) => {
819                                warn!("Failed to parse message: {}", e);
820                            }
821                        }
822                    }
823                    Err(e) => {
824                        warn!("Failed to parse JSON: {} - Line: {}", e, line);
825                    }
826                }
827            }
828            info!("Stdout reader ended");
829        });
830
831        // Spawn stderr handler - capture error messages for better diagnostics
832        let message_broadcast_tx_for_error = message_broadcast_tx.clone();
833        let debug_stderr = self.options.debug_stderr.clone();
834        let stderr_callback = self.options.stderr_callback.clone();
835        tokio::spawn(async move {
836            let reader = BufReader::new(stderr);
837            let mut lines = reader.lines();
838            let mut error_buffer = Vec::new();
839            
840            while let Ok(Some(line)) = lines.next_line().await {
841                if !line.trim().is_empty() {
842                    // If debug_stderr is set, write to it
843                    if let Some(ref debug_output) = debug_stderr {
844                        let mut output = debug_output.lock().await;
845                        let _ = writeln!(output, "{line}");
846                        let _ = output.flush();
847                    }
848
849                    if let Some(ref callback) = stderr_callback {
850                        callback.as_ref()(line.as_str());
851                    }
852                    
853                    error!("Claude CLI stderr: {}", line);
854                    error_buffer.push(line.clone());
855                    
856                    // Check for common error patterns
857                    if line.contains("command not found") || line.contains("No such file") {
858                        error!("Claude CLI binary not found or not executable");
859                    } else if line.contains("ENOENT") || line.contains("spawn") {
860                        error!("Failed to spawn Claude CLI process - binary may not be installed");
861                    } else if line.contains("authentication") || line.contains("API key") || line.contains("Unauthorized") {
862                        error!("Claude CLI authentication error - please run 'claude-code api login'");
863                    } else if line.contains("model") && (line.contains("not available") || line.contains("not found")) {
864                        error!("Model not available for your account: {}", line);
865                    } else if line.contains("Error:") || line.contains("error:") {
866                        error!("Claude CLI error detected: {}", line);
867                    }
868                }
869            }
870            
871            // If we collected any errors, log them
872            if !error_buffer.is_empty() {
873                let error_msg = error_buffer.join("\n");
874                error!("Claude CLI stderr output collected:\n{}", error_msg);
875                
876                // Try to send an error message through the broadcast channel
877                let _ = message_broadcast_tx_for_error.send(Message::System {
878                    subtype: "error".to_string(),
879                    data: serde_json::json!({
880                        "source": "stderr",
881                        "error": "Claude CLI error output",
882                        "details": error_msg
883                    }),
884                });
885            }
886        });
887
888        // Store handles
889        self.child = Some(child);
890        self.stdin_tx = Some(stdin_tx);
891        self.message_broadcast_tx = Some(message_broadcast_tx);
892        self.control_rx = Some(control_rx);
893        self.sdk_control_rx = Some(sdk_control_rx);
894        self.state = TransportState::Connected;
895
896        Ok(())
897    }
898}
899
900#[async_trait]
901impl Transport for SubprocessTransport {
902    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
903        self
904    }
905    
906    async fn connect(&mut self) -> Result<()> {
907        if self.state == TransportState::Connected {
908            return Ok(());
909        }
910
911        // Check CLI version before connecting
912        if let Err(e) = self.check_cli_version().await {
913            warn!("CLI version check failed: {}", e);
914        }
915
916        self.spawn_process().await?;
917        info!("Connected to Claude CLI");
918        Ok(())
919    }
920
921    async fn send_message(&mut self, message: InputMessage) -> Result<()> {
922        if self.state != TransportState::Connected {
923            return Err(SdkError::InvalidState {
924                message: "Not connected".into(),
925            });
926        }
927
928        let json = serde_json::to_string(&message)?;
929        debug!("Serialized message: {}", json);
930
931        if let Some(ref tx) = self.stdin_tx {
932            debug!("Sending message to stdin channel");
933            tx.send(json).await?;
934            debug!("Message sent to channel");
935            Ok(())
936        } else {
937            Err(SdkError::InvalidState {
938                message: "Stdin channel not available".into(),
939            })
940        }
941    }
942
943    fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>> {
944        if let Some(ref tx) = self.message_broadcast_tx {
945            // Create a new receiver from the broadcast sender
946            let rx = tx.subscribe();
947            // Convert broadcast receiver to stream
948            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
949                |result| async move {
950                    match result {
951                        Ok(msg) => Some(Ok(msg)),
952                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(
953                            n,
954                        )) => {
955                            warn!("Receiver lagged by {} messages", n);
956                            None
957                        }
958                    }
959                },
960            ))
961        } else {
962            Box::pin(futures::stream::empty())
963        }
964    }
965
966    async fn send_control_request(&mut self, request: ControlRequest) -> Result<()> {
967        if self.state != TransportState::Connected {
968            return Err(SdkError::InvalidState {
969                message: "Not connected".into(),
970            });
971        }
972
973        self.request_counter += 1;
974        let control_msg = match request {
975            ControlRequest::Interrupt { request_id } => {
976                serde_json::json!({
977                    "type": "control_request",
978                    "request": {
979                        "type": "interrupt",
980                        "request_id": request_id
981                    }
982                })
983            }
984        };
985
986        let json = serde_json::to_string(&control_msg)?;
987
988        if let Some(ref tx) = self.stdin_tx {
989            tx.send(json).await?;
990            Ok(())
991        } else {
992            Err(SdkError::InvalidState {
993                message: "Stdin channel not available".into(),
994            })
995        }
996    }
997
998    async fn receive_control_response(&mut self) -> Result<Option<ControlResponse>> {
999        if let Some(ref mut rx) = self.control_rx {
1000            Ok(rx.recv().await)
1001        } else {
1002            Ok(None)
1003        }
1004    }
1005    
1006    async fn send_sdk_control_request(&mut self, request: serde_json::Value) -> Result<()> {
1007        // The request is already properly formatted as {"type": "control_request", ...}
1008        // Just send it directly without wrapping
1009        let json = serde_json::to_string(&request)?;
1010
1011        if let Some(ref tx) = self.stdin_tx {
1012            tx.send(json).await?;
1013            Ok(())
1014        } else {
1015            Err(SdkError::InvalidState {
1016                message: "Stdin channel not available".into(),
1017            })
1018        }
1019    }
1020    
1021    async fn send_sdk_control_response(&mut self, response: serde_json::Value) -> Result<()> {
1022        // Wrap the response in control_response format expected by CLI
1023        // The response should have: {"type": "control_response", "response": {...}}
1024        let control_response = serde_json::json!({
1025            "type": "control_response",
1026            "response": response
1027        });
1028
1029        let json = serde_json::to_string(&control_response)?;
1030
1031        if let Some(ref tx) = self.stdin_tx {
1032            tx.send(json).await?;
1033            Ok(())
1034        } else {
1035            Err(SdkError::InvalidState {
1036                message: "Stdin channel not available".into(),
1037            })
1038        }
1039    }
1040
1041    fn is_connected(&self) -> bool {
1042        self.state == TransportState::Connected
1043    }
1044
1045    async fn disconnect(&mut self) -> Result<()> {
1046        if self.state != TransportState::Connected {
1047            return Ok(());
1048        }
1049
1050        self.state = TransportState::Disconnecting;
1051
1052        // Close stdin channel
1053        self.stdin_tx.take();
1054
1055        // Kill the child process
1056        if let Some(mut child) = self.child.take() {
1057            match child.kill().await {
1058                Ok(()) => info!("Claude CLI process terminated"),
1059                Err(e) => warn!("Failed to kill Claude CLI process: {}", e),
1060            }
1061        }
1062
1063        self.state = TransportState::Disconnected;
1064        Ok(())
1065    }
1066
1067    fn take_sdk_control_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<serde_json::Value>> {
1068        self.sdk_control_rx.take()
1069    }
1070
1071    async fn end_input(&mut self) -> Result<()> {
1072        // Close stdin channel to signal end of input
1073        self.stdin_tx.take();
1074        Ok(())
1075    }
1076}
1077
1078impl Drop for SubprocessTransport {
1079    fn drop(&mut self) {
1080        if let Some(mut child) = self.child.take() {
1081            // Try to kill the process
1082            let _ = child.start_kill();
1083        }
1084    }
1085}
1086
1087/// Find the Claude CLI binary
1088///
1089/// Search order:
1090/// 1. System PATH (`claude`, `claude-code`)
1091/// 2. SDK cache directory (auto-downloaded CLI)
1092/// 3. Common installation locations
1093pub fn find_claude_cli() -> Result<PathBuf> {
1094    // First check if it's in PATH - try both 'claude' and 'claude-code'
1095    for cmd_name in &["claude", "claude-code"] {
1096        if let Ok(path) = which::which(cmd_name) {
1097            debug!("Found Claude CLI in PATH at: {}", path.display());
1098            return Ok(path);
1099        }
1100    }
1101
1102    // Check SDK cache directory (for auto-downloaded CLI)
1103    if let Some(cached_path) = crate::cli_download::get_cached_cli_path() {
1104        if cached_path.exists() && cached_path.is_file() {
1105            debug!("Found cached Claude CLI at: {}", cached_path.display());
1106            return Ok(cached_path);
1107        }
1108    }
1109
1110    // Check common installation locations
1111    let home = dirs::home_dir().ok_or_else(|| SdkError::CliNotFound {
1112        searched_paths: "Unable to determine home directory".into(),
1113    })?;
1114
1115    let locations = vec![
1116        // npm global installations
1117        home.join(".npm-global/bin/claude"),
1118        home.join(".npm-global/bin/claude-code"),
1119        PathBuf::from("/usr/local/bin/claude"),
1120        PathBuf::from("/usr/local/bin/claude-code"),
1121        // Local installations
1122        home.join(".local/bin/claude"),
1123        home.join(".local/bin/claude-code"),
1124        home.join("node_modules/.bin/claude"),
1125        home.join("node_modules/.bin/claude-code"),
1126        // Yarn installations
1127        home.join(".yarn/bin/claude"),
1128        home.join(".yarn/bin/claude-code"),
1129        // macOS specific npm location
1130        PathBuf::from("/opt/homebrew/bin/claude"),
1131        PathBuf::from("/opt/homebrew/bin/claude-code"),
1132        // Claude local directory
1133        home.join(".claude/local/claude"),
1134    ];
1135
1136    let mut searched = Vec::new();
1137    for path in &locations {
1138        searched.push(path.display().to_string());
1139        if path.exists() && path.is_file() {
1140            debug!("Found Claude CLI at: {}", path.display());
1141            return Ok(path.clone());
1142        }
1143    }
1144
1145    // Log detailed error information
1146    warn!("Claude CLI not found in any standard location");
1147    warn!("Searched paths: {:?}", searched);
1148
1149    // Check if Node.js is installed
1150    if which::which("node").is_err() && which::which("npm").is_err() {
1151        error!("Node.js/npm not found - Claude CLI requires Node.js");
1152        return Err(SdkError::CliNotFound {
1153            searched_paths: format!(
1154                "Node.js is not installed. Install from https://nodejs.org/\n\n\
1155                Alternatively, enable auto_download_cli to automatically download the CLI:\n\
1156                ```rust\n\
1157                let options = ClaudeCodeOptions::builder()\n\
1158                    .auto_download_cli(true)\n\
1159                    .build();\n\
1160                ```\n\n\
1161                Searched in:\n{}",
1162                searched.join("\n")
1163            ),
1164        });
1165    }
1166
1167    Err(SdkError::CliNotFound {
1168        searched_paths: format!(
1169            "Claude CLI not found.\n\n\
1170            Option 1 - Auto-download (recommended):\n\
1171            ```rust\n\
1172            let options = ClaudeCodeOptions::builder()\n\
1173                .auto_download_cli(true)\n\
1174                .build();\n\
1175            ```\n\n\
1176            Option 2 - Manual installation:\n\
1177            npm install -g @anthropic-ai/claude-code\n\n\
1178            Searched in:\n{}",
1179            searched.join("\n")
1180        ),
1181    })
1182}
1183
1184pub(crate) fn apply_process_user(cmd: &mut Command, user: &str) -> Result<()> {
1185    let user = user.trim();
1186    if user.is_empty() {
1187        return Err(SdkError::ConfigError(
1188            "options.user must be a non-empty username or uid".into(),
1189        ));
1190    }
1191
1192    apply_process_user_inner(cmd, user)
1193}
1194
1195#[cfg(unix)]
1196fn apply_process_user_inner(cmd: &mut Command, user: &str) -> Result<()> {
1197    use std::ffi::CString;
1198    use std::mem::MaybeUninit;
1199    use std::os::unix::process::CommandExt;
1200    use std::ptr;
1201
1202    fn passwd_buf_len() -> usize {
1203        let buf_len = unsafe { libc::sysconf(libc::_SC_GETPW_R_SIZE_MAX) };
1204        if buf_len <= 0 {
1205            16 * 1024
1206        } else {
1207            buf_len as usize
1208        }
1209    }
1210
1211    fn lookup_by_name(name: &str) -> Result<(u32, u32)> {
1212        let name = CString::new(name).map_err(|_| {
1213            SdkError::ConfigError("options.user must not contain NUL bytes".into())
1214        })?;
1215
1216        let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1217        let mut result: *mut libc::passwd = ptr::null_mut();
1218        let mut buf = vec![0u8; passwd_buf_len()];
1219
1220        let rc = unsafe {
1221            libc::getpwnam_r(
1222                name.as_ptr(),
1223                pwd.as_mut_ptr(),
1224                buf.as_mut_ptr() as *mut libc::c_char,
1225                buf.len(),
1226                &mut result,
1227            )
1228        };
1229        if rc != 0 {
1230            return Err(SdkError::ConfigError(format!(
1231                "Failed to resolve options.user={}: getpwnam_r returned {}",
1232                name.to_string_lossy(),
1233                rc
1234            )));
1235        }
1236        if result.is_null() {
1237            return Err(SdkError::ConfigError(format!(
1238                "User not found: {}",
1239                name.to_string_lossy()
1240            )));
1241        }
1242
1243        let pwd = unsafe { pwd.assume_init() };
1244        Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1245    }
1246
1247    fn lookup_by_uid(uid: u32) -> Result<(u32, u32)> {
1248        let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1249        let mut result: *mut libc::passwd = ptr::null_mut();
1250        let mut buf = vec![0u8; passwd_buf_len()];
1251
1252        let rc = unsafe {
1253            libc::getpwuid_r(
1254                uid as libc::uid_t,
1255                pwd.as_mut_ptr(),
1256                buf.as_mut_ptr() as *mut libc::c_char,
1257                buf.len(),
1258                &mut result,
1259            )
1260        };
1261        if rc != 0 {
1262            return Err(SdkError::ConfigError(format!(
1263                "Failed to resolve options.user={}: getpwuid_r returned {}",
1264                uid, rc
1265            )));
1266        }
1267        if result.is_null() {
1268            return Err(SdkError::ConfigError(format!("User not found for uid: {}", uid)));
1269        }
1270
1271        let pwd = unsafe { pwd.assume_init() };
1272        Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1273    }
1274
1275    let (uid, gid) = match user.parse::<u32>() {
1276        Ok(uid) => lookup_by_uid(uid)?,
1277        Err(_) => lookup_by_name(user)?,
1278    };
1279
1280    cmd.as_std_mut().uid(uid).gid(gid);
1281    Ok(())
1282}
1283
1284#[cfg(not(unix))]
1285fn apply_process_user_inner(_cmd: &mut Command, _user: &str) -> Result<()> {
1286    Err(SdkError::NotSupported {
1287        feature: "options.user is only supported on Unix platforms".into(),
1288    })
1289}
1290
1291#[cfg(test)]
1292mod tests {
1293    use super::*;
1294
1295    #[test]
1296    fn test_find_claude_cli_error_message() {
1297        // Test error message format without relying on CLI not being found
1298        let error = SdkError::CliNotFound {
1299            searched_paths: "test paths".to_string(),
1300        };
1301        let error_msg = error.to_string();
1302        assert!(error_msg.contains("npm install -g @anthropic-ai/claude-code"));
1303        assert!(error_msg.contains("test paths"));
1304    }
1305
1306    #[tokio::test]
1307    async fn test_transport_lifecycle() {
1308        let options = ClaudeCodeOptions::default();
1309        let transport = SubprocessTransport::new(options).unwrap_or_else(|_| {
1310            // Use a dummy path for testing
1311            SubprocessTransport::with_cli_path(ClaudeCodeOptions::default(), "/usr/bin/true")
1312        });
1313
1314        assert!(!transport.is_connected());
1315        assert_eq!(transport.state, TransportState::Disconnected);
1316    }
1317
1318    #[test]
1319    fn test_semver_parse() {
1320        // Test basic version parsing
1321        let v = SemVer::parse("2.0.0").unwrap();
1322        assert_eq!(v.major, 2);
1323        assert_eq!(v.minor, 0);
1324        assert_eq!(v.patch, 0);
1325
1326        // Test with 'v' prefix
1327        let v = SemVer::parse("v2.1.3").unwrap();
1328        assert_eq!(v.major, 2);
1329        assert_eq!(v.minor, 1);
1330        assert_eq!(v.patch, 3);
1331
1332        // Test npm-style version
1333        let v = SemVer::parse("@anthropic-ai/claude-code/2.5.1").unwrap();
1334        assert_eq!(v.major, 2);
1335        assert_eq!(v.minor, 5);
1336        assert_eq!(v.patch, 1);
1337
1338        // Test version without patch
1339        let v = SemVer::parse("2.1").unwrap();
1340        assert_eq!(v.major, 2);
1341        assert_eq!(v.minor, 1);
1342        assert_eq!(v.patch, 0);
1343    }
1344
1345    #[test]
1346    fn test_semver_compare() {
1347        let v1 = SemVer::new(2, 0, 0);
1348        let v2 = SemVer::new(2, 0, 1);
1349        let v3 = SemVer::new(2, 1, 0);
1350        let v4 = SemVer::new(3, 0, 0);
1351
1352        assert!(v1 < v2);
1353        assert!(v2 < v3);
1354        assert!(v3 < v4);
1355        assert!(v1 < v4);
1356
1357        let min_version = SemVer::new(2, 0, 0);
1358        assert!(SemVer::new(1, 9, 9) < min_version);
1359        assert!(SemVer::new(2, 0, 0) >= min_version);
1360        assert!(SemVer::new(2, 1, 0) >= min_version);
1361    }
1362}