cc_sdk/transport/
subprocess.rs

1//! Subprocess-based transport implementation
2//!
3//! This module implements the Transport trait using a subprocess to run the Claude CLI.
4
5use super::{InputMessage, Transport, TransportState};
6use crate::{
7    errors::{Result, SdkError},
8    types::{ClaudeCodeOptions, ControlRequest, ControlResponse, Message, PermissionMode},
9};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use std::path::PathBuf;
13use std::pin::Pin;
14use std::process::Stdio;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::mpsc;
18use tracing::{debug, error, info, warn};
19
20/// Default buffer size for channels
21const CHANNEL_BUFFER_SIZE: usize = 100;
22
23/// Minimum required CLI version
24const MIN_CLI_VERSION: (u32, u32, u32) = (2, 0, 0);
25
26/// Simple semantic version struct
27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
28struct SemVer {
29    major: u32,
30    minor: u32,
31    patch: u32,
32}
33
34impl SemVer {
35    fn new(major: u32, minor: u32, patch: u32) -> Self {
36        Self {
37            major,
38            minor,
39            patch,
40        }
41    }
42
43    /// Parse semantic version from string (e.g., "2.0.0" or "v2.0.0")
44    fn parse(version: &str) -> Option<Self> {
45        let version = version.trim().trim_start_matches('v');
46
47        // Handle versions like "@anthropic-ai/claude-code/2.0.0"
48        let version = if let Some(v) = version.split('/').next_back() {
49            v
50        } else {
51            version
52        };
53
54        let parts: Vec<&str> = version.split('.').collect();
55        if parts.len() < 2 {
56            return None;
57        }
58
59        Some(Self {
60            major: parts[0].parse().ok()?,
61            minor: parts.get(1)?.parse().ok()?,
62            patch: parts.get(2).and_then(|p| p.parse().ok()).unwrap_or(0),
63        })
64    }
65}
66
67/// Subprocess-based transport for Claude CLI
68pub struct SubprocessTransport {
69    /// Configuration options
70    options: ClaudeCodeOptions,
71    /// CLI binary path
72    cli_path: PathBuf,
73    /// Child process
74    child: Option<Child>,
75    /// Sender for stdin
76    stdin_tx: Option<mpsc::Sender<String>>,
77    /// Sender for broadcasting messages to multiple receivers
78    message_broadcast_tx: Option<tokio::sync::broadcast::Sender<Message>>,
79    /// Receiver for control responses
80    control_rx: Option<mpsc::Receiver<ControlResponse>>,
81    /// Receiver for SDK control requests
82    sdk_control_rx: Option<mpsc::Receiver<serde_json::Value>>,
83    /// Transport state
84    state: TransportState,
85    /// Request counter for control requests
86    request_counter: u64,
87    /// Whether to close stdin after initial prompt
88    #[allow(dead_code)]
89    close_stdin_after_prompt: bool,
90}
91
92impl SubprocessTransport {
93    /// Create a new subprocess transport
94    pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
95        let cli_path = find_claude_cli()?;
96        Ok(Self {
97            options,
98            cli_path,
99            child: None,
100            stdin_tx: None,
101            message_broadcast_tx: None,
102            control_rx: None,
103            sdk_control_rx: None,
104            state: TransportState::Disconnected,
105            request_counter: 0,
106            close_stdin_after_prompt: false,
107        })
108    }
109    
110    /// Subscribe to messages without borrowing self (for lock-free consumption)
111    pub fn subscribe_messages(&self) -> Option<Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>> {
112        self.message_broadcast_tx.as_ref().map(|tx| {
113            let rx = tx.subscribe();
114            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
115                |result| async move {
116                    match result {
117                        Ok(msg) => Some(Ok(msg)),
118                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
119                            warn!("Receiver lagged by {} messages", n);
120                            None
121                        }
122                    }
123                },
124            )) as Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>
125        })
126    }
127
128    /// Receive SDK control requests
129    #[allow(dead_code)]
130    pub async fn receive_sdk_control_request(&mut self) -> Option<serde_json::Value> {
131        if let Some(ref mut rx) = self.sdk_control_rx {
132            rx.recv().await
133        } else {
134            None
135        }
136    }
137    
138    /// Take the SDK control receiver (can only be called once)
139    pub fn take_sdk_control_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
140        self.sdk_control_rx.take()
141    }
142
143    /// Create with a specific CLI path
144    pub fn with_cli_path(options: ClaudeCodeOptions, cli_path: impl Into<PathBuf>) -> Self {
145        Self {
146            options,
147            cli_path: cli_path.into(),
148            child: None,
149            stdin_tx: None,
150            message_broadcast_tx: None,
151            control_rx: None,
152            sdk_control_rx: None,
153            state: TransportState::Disconnected,
154            request_counter: 0,
155            close_stdin_after_prompt: false,
156        }
157    }
158
159    /// Set whether to close stdin after sending the initial prompt
160    #[allow(dead_code)]
161    pub fn set_close_stdin_after_prompt(&mut self, close: bool) {
162        self.close_stdin_after_prompt = close;
163    }
164
165    /// Create transport for simple print mode (one-shot query)
166    #[allow(dead_code)]
167    pub fn for_print_mode(options: ClaudeCodeOptions, _prompt: String) -> Result<Self> {
168        let cli_path = find_claude_cli()?;
169        Ok(Self {
170            options,
171            cli_path,
172            child: None,
173            stdin_tx: None,
174            message_broadcast_tx: None,
175            control_rx: None,
176            sdk_control_rx: None,
177            state: TransportState::Disconnected,
178            request_counter: 0,
179            close_stdin_after_prompt: true,
180        })
181    }
182
183    /// Build the command with all necessary arguments
184    fn build_command(&self) -> Command {
185        let mut cmd = Command::new(&self.cli_path);
186
187        // Always use output-format stream-json and verbose (like Python SDK)
188        cmd.arg("--output-format").arg("stream-json");
189        cmd.arg("--verbose");
190
191        // For streaming/interactive mode, also add input-format stream-json
192        cmd.arg("--input-format").arg("stream-json");
193        
194        // Include partial messages if requested
195        if self.options.include_partial_messages {
196            cmd.arg("--include-partial-messages");
197        }
198        
199        // Add debug-to-stderr flag if debug_stderr is set
200        if self.options.debug_stderr.is_some() {
201            cmd.arg("--debug-to-stderr");
202        }
203        
204        // Handle max_output_tokens (priority: option > env var)
205        // Maximum safe value is 32000, values above this may cause issues
206        if let Some(max_tokens) = self.options.max_output_tokens {
207            // Option takes priority - validate and cap at 32000
208            let capped = max_tokens.clamp(1, 32000);
209            cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
210            debug!("Setting max_output_tokens from option: {}", capped);
211        } else {
212            // Fall back to environment variable handling
213            if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
214                if let Ok(tokens) = current_value.parse::<u32>() {
215                    if tokens > 32000 {
216                        warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
217                        cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
218                    }
219                    // If it's <= 32000, leave it as is
220                } else {
221                    // Invalid value, set to safe default
222                    warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
223                    cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
224                }
225            }
226        }
227
228        // System prompts - prioritize v2 API
229        if let Some(ref prompt_v2) = self.options.system_prompt_v2 {
230            match prompt_v2 {
231                crate::types::SystemPrompt::String(s) => {
232                    cmd.arg("--system-prompt").arg(s);
233                }
234                crate::types::SystemPrompt::Preset { preset, append, .. } => {
235                    // Use preset-based prompt
236                    cmd.arg("--system-prompt-preset").arg(preset);
237
238                    // Append if specified
239                    if let Some(append_text) = append {
240                        cmd.arg("--append-system-prompt").arg(append_text);
241                    }
242                }
243            }
244        } else {
245            // Fallback to deprecated fields for backward compatibility
246            #[allow(deprecated)]
247            if let Some(ref prompt) = self.options.system_prompt {
248                cmd.arg("--system-prompt").arg(prompt);
249            }
250            #[allow(deprecated)]
251            if let Some(ref prompt) = self.options.append_system_prompt {
252                cmd.arg("--append-system-prompt").arg(prompt);
253            }
254        }
255
256        // Tool configuration
257        if !self.options.allowed_tools.is_empty() {
258            cmd.arg("--allowedTools")
259                .arg(self.options.allowed_tools.join(","));
260        }
261        if !self.options.disallowed_tools.is_empty() {
262            cmd.arg("--disallowedTools")
263                .arg(self.options.disallowed_tools.join(","));
264        }
265
266        // Permission mode
267        match self.options.permission_mode {
268            PermissionMode::Default => {
269                cmd.arg("--permission-mode").arg("default");
270            }
271            PermissionMode::AcceptEdits => {
272                cmd.arg("--permission-mode").arg("acceptEdits");
273            }
274            PermissionMode::Plan => {
275                cmd.arg("--permission-mode").arg("plan");
276            }
277            PermissionMode::BypassPermissions => {
278                cmd.arg("--permission-mode").arg("bypassPermissions");
279            }
280        }
281
282        // Model
283        if let Some(ref model) = self.options.model {
284            cmd.arg("--model").arg(model);
285        }
286
287        // Permission prompt tool
288        if let Some(ref tool_name) = self.options.permission_prompt_tool_name {
289            cmd.arg("--permission-prompt-tool").arg(tool_name);
290        }
291
292        // Max turns
293        if let Some(max_turns) = self.options.max_turns {
294            cmd.arg("--max-turns").arg(max_turns.to_string());
295        }
296
297        // Note: max_thinking_tokens is not currently supported by Claude CLI
298
299        // Working directory
300        if let Some(ref cwd) = self.options.cwd {
301            cmd.current_dir(cwd);
302        }
303        
304        // Add environment variables
305        for (key, value) in &self.options.env {
306            cmd.env(key, value);
307        }
308
309        // MCP servers - use --mcp-config with JSON format like Python SDK
310        if !self.options.mcp_servers.is_empty() {
311            let mcp_config = serde_json::json!({
312                "mcpServers": self.options.mcp_servers
313            });
314            cmd.arg("--mcp-config").arg(mcp_config.to_string());
315        }
316
317        // Continue/resume
318        if self.options.continue_conversation {
319            cmd.arg("--continue");
320        }
321        if let Some(ref resume_id) = self.options.resume {
322            cmd.arg("--resume").arg(resume_id);
323        }
324
325        // Settings file
326        if let Some(ref settings) = self.options.settings {
327            cmd.arg("--settings").arg(settings);
328        }
329
330        // Additional directories
331        for dir in &self.options.add_dirs {
332            cmd.arg("--add-dir").arg(dir);
333        }
334
335        // Fork session if requested
336        if self.options.fork_session {
337            cmd.arg("--fork-session");
338        }
339
340        // Programmatic agents
341        if let Some(ref agents) = self.options.agents
342            && !agents.is_empty()
343                && let Ok(json_str) = serde_json::to_string(agents) {
344                    cmd.arg("--agents").arg(json_str);
345                }
346
347        // Setting sources (comma-separated)
348        if let Some(ref sources) = self.options.setting_sources
349            && !sources.is_empty() {
350                let value = sources.iter().map(|s| (match s { crate::types::SettingSource::User => "user", crate::types::SettingSource::Project => "project", crate::types::SettingSource::Local => "local" }).to_string()).collect::<Vec<_>>().join(",");
351                cmd.arg("--setting-sources").arg(value);
352            }
353
354        // Extra arguments
355        for (key, value) in &self.options.extra_args {
356            let flag = if key.starts_with("--") || key.starts_with("-") {
357                key.clone()
358            } else {
359                format!("--{key}")
360            };
361            cmd.arg(&flag);
362            if let Some(val) = value {
363                cmd.arg(val);
364            }
365        }
366
367        // Set up process pipes
368        cmd.stdin(Stdio::piped())
369            .stdout(Stdio::piped())
370            .stderr(Stdio::piped());
371
372        // Set environment variables to indicate SDK usage and version
373        cmd.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
374        cmd.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
375
376        cmd
377    }
378
379    /// Check CLI version and warn if below minimum required version
380    async fn check_cli_version(&self) -> Result<()> {
381        // Run the CLI with --version flag (with a timeout to avoid hanging)
382        let output = tokio::time::timeout(
383            std::time::Duration::from_secs(5),
384            tokio::process::Command::new(&self.cli_path)
385                .arg("--version")
386                .output(),
387        )
388        .await;
389
390        let output = match output {
391            Ok(Ok(output)) => output,
392            Ok(Err(e)) => {
393                warn!("Failed to check CLI version: {}", e);
394                return Ok(()); // Don't fail connection, just warn
395            }
396            Err(_) => {
397                warn!("CLI version check timed out after 5 seconds");
398                return Ok(());
399            }
400        };
401
402        let version_str = String::from_utf8_lossy(&output.stdout);
403        let version_str = version_str.trim();
404
405        if let Some(semver) = SemVer::parse(version_str) {
406            let min_version = SemVer::new(MIN_CLI_VERSION.0, MIN_CLI_VERSION.1, MIN_CLI_VERSION.2);
407
408            if semver < min_version {
409                warn!(
410                    "⚠️  Claude CLI version {}.{}.{} is below minimum required version {}.{}.{}",
411                    semver.major,
412                    semver.minor,
413                    semver.patch,
414                    MIN_CLI_VERSION.0,
415                    MIN_CLI_VERSION.1,
416                    MIN_CLI_VERSION.2
417                );
418                warn!(
419                    "   Some features may not work correctly. Please upgrade with: npm install -g @anthropic-ai/claude-code@latest"
420                );
421            } else {
422                info!("Claude CLI version: {}.{}.{}", semver.major, semver.minor, semver.patch);
423            }
424        } else {
425            debug!("Could not parse CLI version: {}", version_str);
426        }
427
428        Ok(())
429    }
430
431    /// Spawn the process and set up communication channels
432    async fn spawn_process(&mut self) -> Result<()> {
433        self.state = TransportState::Connecting;
434
435        let mut cmd = self.build_command();
436        info!("Starting Claude CLI with command: {:?}", cmd);
437
438        let mut child = cmd.spawn().map_err(|e| {
439            error!("Failed to spawn Claude CLI: {}", e);
440            SdkError::ProcessError(e)
441        })?;
442
443        // Get stdio handles
444        let stdin = child
445            .stdin
446            .take()
447            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdin".into()))?;
448        let stdout = child
449            .stdout
450            .take()
451            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdout".into()))?;
452        let stderr = child
453            .stderr
454            .take()
455            .ok_or_else(|| SdkError::ConnectionError("Failed to get stderr".into()))?;
456
457        // Determine buffer size from options or use default
458        let buffer_size = self.options.cli_channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE);
459
460        // Create channels
461        let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(buffer_size);
462        // Use broadcast channel for messages to support multiple receivers
463        let (message_broadcast_tx, _) =
464            tokio::sync::broadcast::channel::<Message>(buffer_size);
465        let (control_tx, control_rx) = mpsc::channel::<ControlResponse>(buffer_size);
466
467        // Spawn stdin handler
468        tokio::spawn(async move {
469            let mut stdin = stdin;
470            debug!("Stdin handler started");
471            while let Some(line) = stdin_rx.recv().await {
472                debug!("Received line from channel: {}", line);
473                if let Err(e) = stdin.write_all(line.as_bytes()).await {
474                    error!("Failed to write to stdin: {}", e);
475                    break;
476                }
477                if let Err(e) = stdin.write_all(b"\n").await {
478                    error!("Failed to write newline: {}", e);
479                    break;
480                }
481                if let Err(e) = stdin.flush().await {
482                    error!("Failed to flush stdin: {}", e);
483                    break;
484                }
485                debug!("Successfully sent to Claude process: {}", line);
486            }
487            debug!("Stdin handler ended");
488        });
489
490        // Create channel for SDK control requests
491        let (sdk_control_tx, sdk_control_rx) = mpsc::channel::<serde_json::Value>(buffer_size);
492        
493        // Spawn stdout handler
494        let message_broadcast_tx_clone = message_broadcast_tx.clone();
495        let control_tx_clone = control_tx.clone();
496        let sdk_control_tx_clone = sdk_control_tx.clone();
497        tokio::spawn(async move {
498            debug!("Stdout handler started");
499            let reader = BufReader::new(stdout);
500            let mut lines = reader.lines();
501
502            while let Ok(Some(line)) = lines.next_line().await {
503                if line.trim().is_empty() {
504                    continue;
505                }
506
507                debug!("Claude output: {}", line);
508
509                // Try to parse as JSON
510                match serde_json::from_str::<serde_json::Value>(&line) {
511                    Ok(json) => {
512                        // Check message type
513                        if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
514                            // Handle control responses - these are responses to OUR control requests
515                            if msg_type == "control_response" {
516                                debug!("Received control response: {:?}", json);
517
518                                // Send to sdk_control channel for control protocol mode
519                                let _ = sdk_control_tx_clone.send(json.clone()).await;
520
521                                // Also parse and send to legacy control_tx for non-control-protocol mode
522                                // (needed for interrupt functionality when query_handler is None)
523                                // CLI returns: {"type":"control_response","response":{"subtype":"success","request_id":"..."}}
524                                // or: {"type":"control_response","response":{"subtype":"error","request_id":"...","error":"..."}}
525                                if let Some(response_obj) = json.get("response")
526                                    && let Some(request_id) = response_obj.get("request_id")
527                                        .or_else(|| response_obj.get("requestId"))
528                                        .and_then(|v| v.as_str())
529                                    {
530                                        // Determine success from subtype
531                                        let subtype = response_obj.get("subtype").and_then(|v| v.as_str());
532                                        let success = subtype == Some("success");
533
534                                        let control_resp = ControlResponse::InterruptAck {
535                                            request_id: request_id.to_string(),
536                                            success,
537                                        };
538                                        let _ = control_tx_clone.send(control_resp).await;
539                                    }
540                                continue;
541                            }
542
543                            // Handle control requests FROM CLI (standard format)
544                            if msg_type == "control_request" {
545                                debug!("Received control request from CLI: {:?}", json);
546                                // Send the FULL message including requestId and request
547                                let _ = sdk_control_tx_clone.send(json.clone()).await;
548                                continue;
549                            }
550
551                            // Handle control messages (new format)
552                            if msg_type == "control"
553                                && let Some(control) = json.get("control") {
554                                    debug!("Received control message: {:?}", control);
555                                    let _ = sdk_control_tx_clone.send(control.clone()).await;
556                                    continue;
557                                }
558
559                            // Handle SDK control requests FROM CLI (legacy format)
560                            if msg_type == "sdk_control_request" {
561                                // Send the FULL message including requestId
562                                debug!("Received SDK control request (legacy): {:?}", json);
563                                let _ = sdk_control_tx_clone.send(json.clone()).await;
564                                continue;
565                            }
566                            
567                            // Check for system messages with SDK control subtypes
568                            if msg_type == "system"
569                                && let Some(subtype) = json.get("subtype").and_then(|v| v.as_str())
570                                    && subtype.starts_with("sdk_control:") {
571                                        // This is an SDK control message
572                                        debug!("Received SDK control message: {}", subtype);
573                                        let _ = sdk_control_tx_clone.send(json.clone()).await;
574                                        // Still parse as regular message for now
575                                    }
576                        }
577
578                        // Try to parse as a regular message
579                        match crate::message_parser::parse_message(json) {
580                            Ok(Some(message)) => {
581                                // Use broadcast send which doesn't fail if no receivers
582                                let _ = message_broadcast_tx_clone.send(message);
583                            }
584                            Ok(None) => {
585                                // Ignore non-message JSON
586                            }
587                            Err(e) => {
588                                warn!("Failed to parse message: {}", e);
589                            }
590                        }
591                    }
592                    Err(e) => {
593                        warn!("Failed to parse JSON: {} - Line: {}", e, line);
594                    }
595                }
596            }
597            info!("Stdout reader ended");
598        });
599
600        // Spawn stderr handler - capture error messages for better diagnostics
601        let message_broadcast_tx_for_error = message_broadcast_tx.clone();
602        let debug_stderr = self.options.debug_stderr.clone();
603        tokio::spawn(async move {
604            let reader = BufReader::new(stderr);
605            let mut lines = reader.lines();
606            let mut error_buffer = Vec::new();
607            
608            while let Ok(Some(line)) = lines.next_line().await {
609                if !line.trim().is_empty() {
610                    // If debug_stderr is set, write to it
611                    if let Some(ref debug_output) = debug_stderr {
612                        let mut output = debug_output.lock().await;
613                        let _ = writeln!(output, "{line}");
614                        let _ = output.flush();
615                    }
616                    
617                    error!("Claude CLI stderr: {}", line);
618                    error_buffer.push(line.clone());
619                    
620                    // Check for common error patterns
621                    if line.contains("command not found") || line.contains("No such file") {
622                        error!("Claude CLI binary not found or not executable");
623                    } else if line.contains("ENOENT") || line.contains("spawn") {
624                        error!("Failed to spawn Claude CLI process - binary may not be installed");
625                    } else if line.contains("authentication") || line.contains("API key") || line.contains("Unauthorized") {
626                        error!("Claude CLI authentication error - please run 'claude-code api login'");
627                    } else if line.contains("model") && (line.contains("not available") || line.contains("not found")) {
628                        error!("Model not available for your account: {}", line);
629                    } else if line.contains("Error:") || line.contains("error:") {
630                        error!("Claude CLI error detected: {}", line);
631                    }
632                }
633            }
634            
635            // If we collected any errors, log them
636            if !error_buffer.is_empty() {
637                let error_msg = error_buffer.join("\n");
638                error!("Claude CLI stderr output collected:\n{}", error_msg);
639                
640                // Try to send an error message through the broadcast channel
641                let _ = message_broadcast_tx_for_error.send(Message::System {
642                    subtype: "error".to_string(),
643                    data: serde_json::json!({
644                        "source": "stderr",
645                        "error": "Claude CLI error output",
646                        "details": error_msg
647                    }),
648                });
649            }
650        });
651
652        // Store handles
653        self.child = Some(child);
654        self.stdin_tx = Some(stdin_tx);
655        self.message_broadcast_tx = Some(message_broadcast_tx);
656        self.control_rx = Some(control_rx);
657        self.sdk_control_rx = Some(sdk_control_rx);
658        self.state = TransportState::Connected;
659
660        Ok(())
661    }
662}
663
664#[async_trait]
665impl Transport for SubprocessTransport {
666    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
667        self
668    }
669    
670    async fn connect(&mut self) -> Result<()> {
671        if self.state == TransportState::Connected {
672            return Ok(());
673        }
674
675        // Check CLI version before connecting
676        if let Err(e) = self.check_cli_version().await {
677            warn!("CLI version check failed: {}", e);
678        }
679
680        self.spawn_process().await?;
681        info!("Connected to Claude CLI");
682        Ok(())
683    }
684
685    async fn send_message(&mut self, message: InputMessage) -> Result<()> {
686        if self.state != TransportState::Connected {
687            return Err(SdkError::InvalidState {
688                message: "Not connected".into(),
689            });
690        }
691
692        let json = serde_json::to_string(&message)?;
693        debug!("Serialized message: {}", json);
694
695        if let Some(ref tx) = self.stdin_tx {
696            debug!("Sending message to stdin channel");
697            tx.send(json).await?;
698            debug!("Message sent to channel");
699            Ok(())
700        } else {
701            Err(SdkError::InvalidState {
702                message: "Stdin channel not available".into(),
703            })
704        }
705    }
706
707    fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>> {
708        if let Some(ref tx) = self.message_broadcast_tx {
709            // Create a new receiver from the broadcast sender
710            let rx = tx.subscribe();
711            // Convert broadcast receiver to stream
712            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
713                |result| async move {
714                    match result {
715                        Ok(msg) => Some(Ok(msg)),
716                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(
717                            n,
718                        )) => {
719                            warn!("Receiver lagged by {} messages", n);
720                            None
721                        }
722                    }
723                },
724            ))
725        } else {
726            Box::pin(futures::stream::empty())
727        }
728    }
729
730    async fn send_control_request(&mut self, request: ControlRequest) -> Result<()> {
731        if self.state != TransportState::Connected {
732            return Err(SdkError::InvalidState {
733                message: "Not connected".into(),
734            });
735        }
736
737        self.request_counter += 1;
738        let control_msg = match request {
739            ControlRequest::Interrupt { request_id } => {
740                serde_json::json!({
741                    "type": "control_request",
742                    "request": {
743                        "type": "interrupt",
744                        "request_id": request_id
745                    }
746                })
747            }
748        };
749
750        let json = serde_json::to_string(&control_msg)?;
751
752        if let Some(ref tx) = self.stdin_tx {
753            tx.send(json).await?;
754            Ok(())
755        } else {
756            Err(SdkError::InvalidState {
757                message: "Stdin channel not available".into(),
758            })
759        }
760    }
761
762    async fn receive_control_response(&mut self) -> Result<Option<ControlResponse>> {
763        if let Some(ref mut rx) = self.control_rx {
764            Ok(rx.recv().await)
765        } else {
766            Ok(None)
767        }
768    }
769    
770    async fn send_sdk_control_request(&mut self, request: serde_json::Value) -> Result<()> {
771        // The request is already properly formatted as {"type": "control_request", ...}
772        // Just send it directly without wrapping
773        let json = serde_json::to_string(&request)?;
774
775        if let Some(ref tx) = self.stdin_tx {
776            tx.send(json).await?;
777            Ok(())
778        } else {
779            Err(SdkError::InvalidState {
780                message: "Stdin channel not available".into(),
781            })
782        }
783    }
784    
785    async fn send_sdk_control_response(&mut self, response: serde_json::Value) -> Result<()> {
786        // Wrap the response in control_response format expected by CLI
787        // The response should have: {"type": "control_response", "response": {...}}
788        let control_response = serde_json::json!({
789            "type": "control_response",
790            "response": response
791        });
792
793        let json = serde_json::to_string(&control_response)?;
794
795        if let Some(ref tx) = self.stdin_tx {
796            tx.send(json).await?;
797            Ok(())
798        } else {
799            Err(SdkError::InvalidState {
800                message: "Stdin channel not available".into(),
801            })
802        }
803    }
804
805    fn is_connected(&self) -> bool {
806        self.state == TransportState::Connected
807    }
808
809    async fn disconnect(&mut self) -> Result<()> {
810        if self.state != TransportState::Connected {
811            return Ok(());
812        }
813
814        self.state = TransportState::Disconnecting;
815
816        // Close stdin channel
817        self.stdin_tx.take();
818
819        // Kill the child process
820        if let Some(mut child) = self.child.take() {
821            match child.kill().await {
822                Ok(()) => info!("Claude CLI process terminated"),
823                Err(e) => warn!("Failed to kill Claude CLI process: {}", e),
824            }
825        }
826
827        self.state = TransportState::Disconnected;
828        Ok(())
829    }
830
831    fn take_sdk_control_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<serde_json::Value>> {
832        self.sdk_control_rx.take()
833    }
834
835    async fn end_input(&mut self) -> Result<()> {
836        // Close stdin channel to signal end of input
837        self.stdin_tx.take();
838        Ok(())
839    }
840}
841
842impl Drop for SubprocessTransport {
843    fn drop(&mut self) {
844        if let Some(mut child) = self.child.take() {
845            // Try to kill the process
846            let _ = child.start_kill();
847        }
848    }
849}
850
851/// Find the Claude CLI binary
852pub(crate) fn find_claude_cli() -> Result<PathBuf> {
853    // First check if it's in PATH - try both 'claude' and 'claude-code'
854    for cmd_name in &["claude", "claude-code"] {
855        if let Ok(path) = which::which(cmd_name) {
856            debug!("Found Claude CLI at: {}", path.display());
857            return Ok(path);
858        }
859    }
860
861    // Check common installation locations
862    let home = dirs::home_dir().ok_or_else(|| SdkError::CliNotFound {
863        searched_paths: "Unable to determine home directory".into(),
864    })?;
865
866    let locations = vec![
867        // npm global installations
868        home.join(".npm-global/bin/claude"),
869        home.join(".npm-global/bin/claude-code"),
870        PathBuf::from("/usr/local/bin/claude"),
871        PathBuf::from("/usr/local/bin/claude-code"),
872        // Local installations
873        home.join(".local/bin/claude"),
874        home.join(".local/bin/claude-code"),
875        home.join("node_modules/.bin/claude"),
876        home.join("node_modules/.bin/claude-code"),
877        // Yarn installations
878        home.join(".yarn/bin/claude"),
879        home.join(".yarn/bin/claude-code"),
880        // macOS specific npm location
881        PathBuf::from("/opt/homebrew/bin/claude"),
882        PathBuf::from("/opt/homebrew/bin/claude-code"),
883    ];
884
885    let mut searched = Vec::new();
886    for path in &locations {
887        searched.push(path.display().to_string());
888        if path.exists() && path.is_file() {
889            debug!("Found Claude CLI at: {}", path.display());
890            return Ok(path.clone());
891        }
892    }
893
894    // Log detailed error information
895    warn!("Claude CLI not found in any standard location");
896    warn!("Searched paths: {:?}", searched);
897
898    // Check if Node.js is installed
899    if which::which("node").is_err() && which::which("npm").is_err() {
900        error!("Node.js/npm not found - Claude CLI requires Node.js");
901        return Err(SdkError::CliNotFound {
902            searched_paths: format!(
903                "Node.js is not installed. Install from https://nodejs.org/\n\nSearched in:\n{}",
904                searched.join("\n")
905            ),
906        });
907    }
908
909    Err(SdkError::CliNotFound {
910        searched_paths: format!(
911            "Claude CLI not found. Install with:\n  npm install -g @anthropic-ai/claude-code\n\nSearched in:\n{}",
912            searched.join("\n")
913        ),
914    })
915}
916
917#[cfg(test)]
918mod tests {
919    use super::*;
920
921    #[test]
922    fn test_find_claude_cli_error_message() {
923        // Test error message format without relying on CLI not being found
924        let error = SdkError::CliNotFound {
925            searched_paths: "test paths".to_string(),
926        };
927        let error_msg = error.to_string();
928        assert!(error_msg.contains("npm install -g @anthropic-ai/claude-code"));
929        assert!(error_msg.contains("test paths"));
930    }
931
932    #[tokio::test]
933    async fn test_transport_lifecycle() {
934        let options = ClaudeCodeOptions::default();
935        let transport = SubprocessTransport::new(options).unwrap_or_else(|_| {
936            // Use a dummy path for testing
937            SubprocessTransport::with_cli_path(ClaudeCodeOptions::default(), "/usr/bin/true")
938        });
939
940        assert!(!transport.is_connected());
941        assert_eq!(transport.state, TransportState::Disconnected);
942    }
943
944    #[test]
945    fn test_semver_parse() {
946        // Test basic version parsing
947        let v = SemVer::parse("2.0.0").unwrap();
948        assert_eq!(v.major, 2);
949        assert_eq!(v.minor, 0);
950        assert_eq!(v.patch, 0);
951
952        // Test with 'v' prefix
953        let v = SemVer::parse("v2.1.3").unwrap();
954        assert_eq!(v.major, 2);
955        assert_eq!(v.minor, 1);
956        assert_eq!(v.patch, 3);
957
958        // Test npm-style version
959        let v = SemVer::parse("@anthropic-ai/claude-code/2.5.1").unwrap();
960        assert_eq!(v.major, 2);
961        assert_eq!(v.minor, 5);
962        assert_eq!(v.patch, 1);
963
964        // Test version without patch
965        let v = SemVer::parse("2.1").unwrap();
966        assert_eq!(v.major, 2);
967        assert_eq!(v.minor, 1);
968        assert_eq!(v.patch, 0);
969    }
970
971    #[test]
972    fn test_semver_compare() {
973        let v1 = SemVer::new(2, 0, 0);
974        let v2 = SemVer::new(2, 0, 1);
975        let v3 = SemVer::new(2, 1, 0);
976        let v4 = SemVer::new(3, 0, 0);
977
978        assert!(v1 < v2);
979        assert!(v2 < v3);
980        assert!(v3 < v4);
981        assert!(v1 < v4);
982
983        let min_version = SemVer::new(2, 0, 0);
984        assert!(SemVer::new(1, 9, 9) < min_version);
985        assert!(SemVer::new(2, 0, 0) >= min_version);
986        assert!(SemVer::new(2, 1, 0) >= min_version);
987    }
988}