cc_sdk/transport/
subprocess.rs

1//! Subprocess-based transport implementation
2//!
3//! This module implements the Transport trait using a subprocess to run the Claude CLI.
4
5use super::{InputMessage, Transport, TransportState};
6use crate::{
7    errors::{Result, SdkError},
8    types::{ClaudeCodeOptions, ControlRequest, ControlResponse, Message, PermissionMode},
9};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use std::path::PathBuf;
13use std::pin::Pin;
14use std::process::Stdio;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::mpsc;
18use tracing::{debug, error, info, warn};
19
/// Capacity passed to every channel created in `spawn_process`: the stdin,
/// control-response and SDK-control mpsc channels and the message broadcast channel.
const CHANNEL_BUFFER_SIZE: usize = 100;
22
/// Transport that talks to the Claude CLI through a spawned child process.
///
/// The constructors leave every handle/channel field as `None`; `spawn_process`
/// fills them in once the child process has been started.
pub struct SubprocessTransport {
    /// Options from which `build_command` assembles the CLI invocation
    options: ClaudeCodeOptions,
    /// Path of the CLI binary to execute
    cli_path: PathBuf,
    /// Handle of the spawned CLI process, if one is running
    child: Option<Child>,
    /// Sender feeding the task that writes lines to the child's stdin
    stdin_tx: Option<mpsc::Sender<String>>,
    /// Broadcast sender from which each message subscriber is created
    message_broadcast_tx: Option<tokio::sync::broadcast::Sender<Message>>,
    /// Receiver for control responses parsed from the child's stdout
    control_rx: Option<mpsc::Receiver<ControlResponse>>,
    /// Receiver for raw SDK control JSON forwarded from the child's stdout
    sdk_control_rx: Option<mpsc::Receiver<serde_json::Value>>,
    /// Current connection state
    state: TransportState,
    /// Incremented by `send_control_request`; not read in the code visible here
    request_counter: u64,
    /// Whether to close stdin after the initial prompt
    /// (stored by the constructors/setter, but not read in the code visible here)
    #[allow(dead_code)]
    close_stdin_after_prompt: bool,
}
47
48impl SubprocessTransport {
49    /// Create a new subprocess transport
50    pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
51        let cli_path = find_claude_cli()?;
52        Ok(Self {
53            options,
54            cli_path,
55            child: None,
56            stdin_tx: None,
57            message_broadcast_tx: None,
58            control_rx: None,
59            sdk_control_rx: None,
60            state: TransportState::Disconnected,
61            request_counter: 0,
62            close_stdin_after_prompt: false,
63        })
64    }
65    
66    /// Subscribe to messages without borrowing self (for lock-free consumption)
67    pub fn subscribe_messages(&self) -> Option<Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>> {
68        self.message_broadcast_tx.as_ref().map(|tx| {
69            let rx = tx.subscribe();
70            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
71                |result| async move {
72                    match result {
73                        Ok(msg) => Some(Ok(msg)),
74                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
75                            warn!("Receiver lagged by {} messages", n);
76                            None
77                        }
78                    }
79                },
80            )) as Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>
81        })
82    }
83
84    /// Receive SDK control requests
85    #[allow(dead_code)]
86    pub async fn receive_sdk_control_request(&mut self) -> Option<serde_json::Value> {
87        if let Some(ref mut rx) = self.sdk_control_rx {
88            rx.recv().await
89        } else {
90            None
91        }
92    }
93    
94    /// Take the SDK control receiver (can only be called once)
95    pub fn take_sdk_control_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
96        self.sdk_control_rx.take()
97    }
98
99    /// Create with a specific CLI path
100    pub fn with_cli_path(options: ClaudeCodeOptions, cli_path: impl Into<PathBuf>) -> Self {
101        Self {
102            options,
103            cli_path: cli_path.into(),
104            child: None,
105            stdin_tx: None,
106            message_broadcast_tx: None,
107            control_rx: None,
108            sdk_control_rx: None,
109            state: TransportState::Disconnected,
110            request_counter: 0,
111            close_stdin_after_prompt: false,
112        }
113    }
114
    /// Store whether stdin should be closed after the initial prompt is sent.
    ///
    /// Only the flag is updated here; nothing in the code visible in this file
    /// reads it.
    #[allow(dead_code)]
    pub fn set_close_stdin_after_prompt(&mut self, close: bool) {
        self.close_stdin_after_prompt = close;
    }
120
121    /// Create transport for simple print mode (one-shot query)
122    #[allow(dead_code)]
123    pub fn for_print_mode(options: ClaudeCodeOptions, _prompt: String) -> Result<Self> {
124        let cli_path = find_claude_cli()?;
125        Ok(Self {
126            options,
127            cli_path,
128            child: None,
129            stdin_tx: None,
130            message_broadcast_tx: None,
131            control_rx: None,
132            sdk_control_rx: None,
133            state: TransportState::Disconnected,
134            request_counter: 0,
135            close_stdin_after_prompt: true,
136        })
137    }
138
139    /// Build the command with all necessary arguments
140    fn build_command(&self) -> Command {
141        let mut cmd = Command::new(&self.cli_path);
142
143        // Always use output-format stream-json and verbose (like Python SDK)
144        cmd.arg("--output-format").arg("stream-json");
145        cmd.arg("--verbose");
146
147        // For streaming/interactive mode, also add input-format stream-json
148        cmd.arg("--input-format").arg("stream-json");
149        
150        // Include partial messages if requested
151        if self.options.include_partial_messages {
152            cmd.arg("--include-partial-messages");
153        }
154        
155        // Add debug-to-stderr flag if debug_stderr is set
156        if self.options.debug_stderr.is_some() {
157            cmd.arg("--debug-to-stderr");
158        }
159        
160        // Handle max_output_tokens (priority: option > env var)
161        // Maximum safe value is 32000, values above this may cause issues
162        if let Some(max_tokens) = self.options.max_output_tokens {
163            // Option takes priority - validate and cap at 32000
164            let capped = max_tokens.min(32000).max(1);
165            cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
166            debug!("Setting max_output_tokens from option: {}", capped);
167        } else {
168            // Fall back to environment variable handling
169            if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
170                if let Ok(tokens) = current_value.parse::<u32>() {
171                    if tokens > 32000 {
172                        warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
173                        cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
174                    }
175                    // If it's <= 32000, leave it as is
176                } else {
177                    // Invalid value, set to safe default
178                    warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
179                    cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
180                }
181            }
182        }
183
184        // System prompts - prioritize v2 API
185        if let Some(ref prompt_v2) = self.options.system_prompt_v2 {
186            match prompt_v2 {
187                crate::types::SystemPrompt::String(s) => {
188                    cmd.arg("--system-prompt").arg(s);
189                }
190                crate::types::SystemPrompt::Preset { preset, append, .. } => {
191                    // Use preset-based prompt
192                    cmd.arg("--system-prompt-preset").arg(preset);
193
194                    // Append if specified
195                    if let Some(append_text) = append {
196                        cmd.arg("--append-system-prompt").arg(append_text);
197                    }
198                }
199            }
200        } else {
201            // Fallback to deprecated fields for backward compatibility
202            #[allow(deprecated)]
203            if let Some(ref prompt) = self.options.system_prompt {
204                cmd.arg("--system-prompt").arg(prompt);
205            }
206            #[allow(deprecated)]
207            if let Some(ref prompt) = self.options.append_system_prompt {
208                cmd.arg("--append-system-prompt").arg(prompt);
209            }
210        }
211
212        // Tool configuration
213        if !self.options.allowed_tools.is_empty() {
214            cmd.arg("--allowedTools")
215                .arg(self.options.allowed_tools.join(","));
216        }
217        if !self.options.disallowed_tools.is_empty() {
218            cmd.arg("--disallowedTools")
219                .arg(self.options.disallowed_tools.join(","));
220        }
221
222        // Permission mode
223        match self.options.permission_mode {
224            PermissionMode::Default => {
225                cmd.arg("--permission-mode").arg("default");
226            }
227            PermissionMode::AcceptEdits => {
228                cmd.arg("--permission-mode").arg("acceptEdits");
229            }
230            PermissionMode::Plan => {
231                cmd.arg("--permission-mode").arg("plan");
232            }
233            PermissionMode::BypassPermissions => {
234                cmd.arg("--permission-mode").arg("bypassPermissions");
235            }
236        }
237
238        // Model
239        if let Some(ref model) = self.options.model {
240            cmd.arg("--model").arg(model);
241        }
242
243        // Permission prompt tool
244        if let Some(ref tool_name) = self.options.permission_prompt_tool_name {
245            cmd.arg("--permission-prompt-tool").arg(tool_name);
246        }
247
248        // Max turns
249        if let Some(max_turns) = self.options.max_turns {
250            cmd.arg("--max-turns").arg(max_turns.to_string());
251        }
252
253        // Note: max_thinking_tokens is not currently supported by Claude CLI
254
255        // Working directory
256        if let Some(ref cwd) = self.options.cwd {
257            cmd.current_dir(cwd);
258        }
259        
260        // Add environment variables
261        for (key, value) in &self.options.env {
262            cmd.env(key, value);
263        }
264
265        // MCP servers - use --mcp-config with JSON format like Python SDK
266        if !self.options.mcp_servers.is_empty() {
267            let mcp_config = serde_json::json!({
268                "mcpServers": self.options.mcp_servers
269            });
270            cmd.arg("--mcp-config").arg(mcp_config.to_string());
271        }
272
273        // Continue/resume
274        if self.options.continue_conversation {
275            cmd.arg("--continue");
276        }
277        if let Some(ref resume_id) = self.options.resume {
278            cmd.arg("--resume").arg(resume_id);
279        }
280
281        // Settings file
282        if let Some(ref settings) = self.options.settings {
283            cmd.arg("--settings").arg(settings);
284        }
285
286        // Additional directories
287        for dir in &self.options.add_dirs {
288            cmd.arg("--add-dir").arg(dir);
289        }
290
291        // Fork session if requested
292        if self.options.fork_session {
293            cmd.arg("--fork-session");
294        }
295
296        // Programmatic agents
297        if let Some(ref agents) = self.options.agents {
298            if !agents.is_empty() {
299                if let Ok(json_str) = serde_json::to_string(agents) {
300                    cmd.arg("--agents").arg(json_str);
301                }
302            }
303        }
304
305        // Setting sources (comma-separated)
306        if let Some(ref sources) = self.options.setting_sources {
307            if !sources.is_empty() {
308                let value = sources.iter().map(|s| format!("{}", match s { crate::types::SettingSource::User => "user", crate::types::SettingSource::Project => "project", crate::types::SettingSource::Local => "local" })).collect::<Vec<_>>().join(",");
309                cmd.arg("--setting-sources").arg(value);
310            }
311        }
312
313        // Extra arguments
314        for (key, value) in &self.options.extra_args {
315            let flag = if key.starts_with("--") || key.starts_with("-") {
316                key.clone()
317            } else {
318                format!("--{key}")
319            };
320            cmd.arg(&flag);
321            if let Some(val) = value {
322                cmd.arg(val);
323            }
324        }
325
326        // Set up process pipes
327        cmd.stdin(Stdio::piped())
328            .stdout(Stdio::piped())
329            .stderr(Stdio::piped());
330
331        // Set environment variables to indicate SDK usage and version
332        cmd.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
333        cmd.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
334
335        cmd
336    }
337
338    /// Spawn the process and set up communication channels
339    async fn spawn_process(&mut self) -> Result<()> {
340        self.state = TransportState::Connecting;
341
342        let mut cmd = self.build_command();
343        info!("Starting Claude CLI with command: {:?}", cmd);
344
345        let mut child = cmd.spawn().map_err(|e| {
346            error!("Failed to spawn Claude CLI: {}", e);
347            SdkError::ProcessError(e)
348        })?;
349
350        // Get stdio handles
351        let stdin = child
352            .stdin
353            .take()
354            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdin".into()))?;
355        let stdout = child
356            .stdout
357            .take()
358            .ok_or_else(|| SdkError::ConnectionError("Failed to get stdout".into()))?;
359        let stderr = child
360            .stderr
361            .take()
362            .ok_or_else(|| SdkError::ConnectionError("Failed to get stderr".into()))?;
363
364        // Create channels
365        let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(CHANNEL_BUFFER_SIZE);
366        // Use broadcast channel for messages to support multiple receivers
367        let (message_broadcast_tx, _) =
368            tokio::sync::broadcast::channel::<Message>(CHANNEL_BUFFER_SIZE);
369        let (control_tx, control_rx) = mpsc::channel::<ControlResponse>(CHANNEL_BUFFER_SIZE);
370
371        // Spawn stdin handler
372        tokio::spawn(async move {
373            let mut stdin = stdin;
374            debug!("Stdin handler started");
375            while let Some(line) = stdin_rx.recv().await {
376                debug!("Received line from channel: {}", line);
377                if let Err(e) = stdin.write_all(line.as_bytes()).await {
378                    error!("Failed to write to stdin: {}", e);
379                    break;
380                }
381                if let Err(e) = stdin.write_all(b"\n").await {
382                    error!("Failed to write newline: {}", e);
383                    break;
384                }
385                if let Err(e) = stdin.flush().await {
386                    error!("Failed to flush stdin: {}", e);
387                    break;
388                }
389                debug!("Successfully sent to Claude process: {}", line);
390            }
391            debug!("Stdin handler ended");
392        });
393
394        // Create channel for SDK control requests
395        let (sdk_control_tx, sdk_control_rx) = mpsc::channel::<serde_json::Value>(CHANNEL_BUFFER_SIZE);
396        
397        // Spawn stdout handler
398        let message_broadcast_tx_clone = message_broadcast_tx.clone();
399        let control_tx_clone = control_tx.clone();
400        let sdk_control_tx_clone = sdk_control_tx.clone();
401        tokio::spawn(async move {
402            debug!("Stdout handler started");
403            let reader = BufReader::new(stdout);
404            let mut lines = reader.lines();
405
406            while let Ok(Some(line)) = lines.next_line().await {
407                if line.trim().is_empty() {
408                    continue;
409                }
410
411                debug!("Claude output: {}", line);
412
413                // Try to parse as JSON
414                match serde_json::from_str::<serde_json::Value>(&line) {
415                    Ok(json) => {
416                        // Check message type
417                        if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
418                            // Handle control responses - these are responses to OUR control requests
419                            if msg_type == "control_response" {
420                                debug!("Received control response: {:?}", json);
421
422                                // Send to sdk_control channel for control protocol mode
423                                let _ = sdk_control_tx_clone.send(json.clone()).await;
424
425                                // Also parse and send to legacy control_tx for non-control-protocol mode
426                                // (needed for interrupt functionality when query_handler is None)
427                                // CLI returns: {"type":"control_response","response":{"subtype":"success","request_id":"..."}}
428                                // or: {"type":"control_response","response":{"subtype":"error","request_id":"...","error":"..."}}
429                                if let Some(response_obj) = json.get("response") {
430                                    if let Some(request_id) = response_obj.get("request_id")
431                                        .or_else(|| response_obj.get("requestId"))
432                                        .and_then(|v| v.as_str())
433                                    {
434                                        // Determine success from subtype
435                                        let subtype = response_obj.get("subtype").and_then(|v| v.as_str());
436                                        let success = subtype == Some("success");
437
438                                        let control_resp = ControlResponse::InterruptAck {
439                                            request_id: request_id.to_string(),
440                                            success,
441                                        };
442                                        let _ = control_tx_clone.send(control_resp).await;
443                                    }
444                                }
445                                continue;
446                            }
447
448                            // Handle control requests FROM CLI (standard format)
449                            if msg_type == "control_request" {
450                                debug!("Received control request from CLI: {:?}", json);
451                                // Send the FULL message including requestId and request
452                                let _ = sdk_control_tx_clone.send(json.clone()).await;
453                                continue;
454                            }
455
456                            // Handle control messages (new format)
457                            if msg_type == "control" {
458                                if let Some(control) = json.get("control") {
459                                    debug!("Received control message: {:?}", control);
460                                    let _ = sdk_control_tx_clone.send(control.clone()).await;
461                                    continue;
462                                }
463                            }
464
465                            // Handle SDK control requests FROM CLI (legacy format)
466                            if msg_type == "sdk_control_request" {
467                                // Send the FULL message including requestId
468                                debug!("Received SDK control request (legacy): {:?}", json);
469                                let _ = sdk_control_tx_clone.send(json.clone()).await;
470                                continue;
471                            }
472                            
473                            // Check for system messages with SDK control subtypes
474                            if msg_type == "system" {
475                                if let Some(subtype) = json.get("subtype").and_then(|v| v.as_str()) {
476                                    if subtype.starts_with("sdk_control:") {
477                                        // This is an SDK control message
478                                        debug!("Received SDK control message: {}", subtype);
479                                        let _ = sdk_control_tx_clone.send(json.clone()).await;
480                                        // Still parse as regular message for now
481                                    }
482                                }
483                            }
484                        }
485
486                        // Try to parse as a regular message
487                        match crate::message_parser::parse_message(json) {
488                            Ok(Some(message)) => {
489                                // Use broadcast send which doesn't fail if no receivers
490                                let _ = message_broadcast_tx_clone.send(message);
491                            }
492                            Ok(None) => {
493                                // Ignore non-message JSON
494                            }
495                            Err(e) => {
496                                warn!("Failed to parse message: {}", e);
497                            }
498                        }
499                    }
500                    Err(e) => {
501                        warn!("Failed to parse JSON: {} - Line: {}", e, line);
502                    }
503                }
504            }
505            info!("Stdout reader ended");
506        });
507
508        // Spawn stderr handler - capture error messages for better diagnostics
509        let message_broadcast_tx_for_error = message_broadcast_tx.clone();
510        let debug_stderr = self.options.debug_stderr.clone();
511        tokio::spawn(async move {
512            let reader = BufReader::new(stderr);
513            let mut lines = reader.lines();
514            let mut error_buffer = Vec::new();
515            
516            while let Ok(Some(line)) = lines.next_line().await {
517                if !line.trim().is_empty() {
518                    // If debug_stderr is set, write to it
519                    if let Some(ref debug_output) = debug_stderr {
520                        let mut output = debug_output.lock().await;
521                        let _ = writeln!(output, "{}", line);
522                        let _ = output.flush();
523                    }
524                    
525                    error!("Claude CLI stderr: {}", line);
526                    error_buffer.push(line.clone());
527                    
528                    // Check for common error patterns
529                    if line.contains("command not found") || line.contains("No such file") {
530                        error!("Claude CLI binary not found or not executable");
531                    } else if line.contains("ENOENT") || line.contains("spawn") {
532                        error!("Failed to spawn Claude CLI process - binary may not be installed");
533                    } else if line.contains("authentication") || line.contains("API key") || line.contains("Unauthorized") {
534                        error!("Claude CLI authentication error - please run 'claude-code api login'");
535                    } else if line.contains("model") && (line.contains("not available") || line.contains("not found")) {
536                        error!("Model not available for your account: {}", line);
537                    } else if line.contains("Error:") || line.contains("error:") {
538                        error!("Claude CLI error detected: {}", line);
539                    }
540                }
541            }
542            
543            // If we collected any errors, log them
544            if !error_buffer.is_empty() {
545                let error_msg = error_buffer.join("\n");
546                error!("Claude CLI stderr output collected:\n{}", error_msg);
547                
548                // Try to send an error message through the broadcast channel
549                let _ = message_broadcast_tx_for_error.send(Message::System {
550                    subtype: "error".to_string(),
551                    data: serde_json::json!({
552                        "source": "stderr",
553                        "error": "Claude CLI error output",
554                        "details": error_msg
555                    }),
556                });
557            }
558        });
559
560        // Store handles
561        self.child = Some(child);
562        self.stdin_tx = Some(stdin_tx);
563        self.message_broadcast_tx = Some(message_broadcast_tx);
564        self.control_rx = Some(control_rx);
565        self.sdk_control_rx = Some(sdk_control_rx);
566        self.state = TransportState::Connected;
567
568        Ok(())
569    }
570}
571
572#[async_trait]
573impl Transport for SubprocessTransport {
    /// Expose the transport as `&mut dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }
577    
578    async fn connect(&mut self) -> Result<()> {
579        if self.state == TransportState::Connected {
580            return Ok(());
581        }
582
583        self.spawn_process().await?;
584        info!("Connected to Claude CLI");
585        Ok(())
586    }
587
588    async fn send_message(&mut self, message: InputMessage) -> Result<()> {
589        if self.state != TransportState::Connected {
590            return Err(SdkError::InvalidState {
591                message: "Not connected".into(),
592            });
593        }
594
595        let json = serde_json::to_string(&message)?;
596        debug!("Serialized message: {}", json);
597
598        if let Some(ref tx) = self.stdin_tx {
599            debug!("Sending message to stdin channel");
600            tx.send(json).await?;
601            debug!("Message sent to channel");
602            Ok(())
603        } else {
604            Err(SdkError::InvalidState {
605                message: "Stdin channel not available".into(),
606            })
607        }
608    }
609
610    fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>> {
611        if let Some(ref tx) = self.message_broadcast_tx {
612            // Create a new receiver from the broadcast sender
613            let rx = tx.subscribe();
614            // Convert broadcast receiver to stream
615            Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
616                |result| async move {
617                    match result {
618                        Ok(msg) => Some(Ok(msg)),
619                        Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(
620                            n,
621                        )) => {
622                            warn!("Receiver lagged by {} messages", n);
623                            None
624                        }
625                    }
626                },
627            ))
628        } else {
629            Box::pin(futures::stream::empty())
630        }
631    }
632
633    async fn send_control_request(&mut self, request: ControlRequest) -> Result<()> {
634        if self.state != TransportState::Connected {
635            return Err(SdkError::InvalidState {
636                message: "Not connected".into(),
637            });
638        }
639
640        self.request_counter += 1;
641        let control_msg = match request {
642            ControlRequest::Interrupt { request_id } => {
643                serde_json::json!({
644                    "type": "control_request",
645                    "request": {
646                        "type": "interrupt",
647                        "request_id": request_id
648                    }
649                })
650            }
651        };
652
653        let json = serde_json::to_string(&control_msg)?;
654
655        if let Some(ref tx) = self.stdin_tx {
656            tx.send(json).await?;
657            Ok(())
658        } else {
659            Err(SdkError::InvalidState {
660                message: "Stdin channel not available".into(),
661            })
662        }
663    }
664
665    async fn receive_control_response(&mut self) -> Result<Option<ControlResponse>> {
666        if let Some(ref mut rx) = self.control_rx {
667            Ok(rx.recv().await)
668        } else {
669            Ok(None)
670        }
671    }
672    
673    async fn send_sdk_control_request(&mut self, request: serde_json::Value) -> Result<()> {
674        // The request is already properly formatted as {"type": "control_request", ...}
675        // Just send it directly without wrapping
676        let json = serde_json::to_string(&request)?;
677
678        if let Some(ref tx) = self.stdin_tx {
679            tx.send(json).await?;
680            Ok(())
681        } else {
682            Err(SdkError::InvalidState {
683                message: "Stdin channel not available".into(),
684            })
685        }
686    }
687    
688    async fn send_sdk_control_response(&mut self, response: serde_json::Value) -> Result<()> {
689        // Wrap the response in control_response format expected by CLI
690        // The response should have: {"type": "control_response", "response": {...}}
691        let control_response = serde_json::json!({
692            "type": "control_response",
693            "response": response
694        });
695
696        let json = serde_json::to_string(&control_response)?;
697
698        if let Some(ref tx) = self.stdin_tx {
699            tx.send(json).await?;
700            Ok(())
701        } else {
702            Err(SdkError::InvalidState {
703                message: "Stdin channel not available".into(),
704            })
705        }
706    }
707
708    fn is_connected(&self) -> bool {
709        self.state == TransportState::Connected
710    }
711
712    async fn disconnect(&mut self) -> Result<()> {
713        if self.state != TransportState::Connected {
714            return Ok(());
715        }
716
717        self.state = TransportState::Disconnecting;
718
719        // Close stdin channel
720        self.stdin_tx.take();
721
722        // Kill the child process
723        if let Some(mut child) = self.child.take() {
724            match child.kill().await {
725                Ok(()) => info!("Claude CLI process terminated"),
726                Err(e) => warn!("Failed to kill Claude CLI process: {}", e),
727            }
728        }
729
730        self.state = TransportState::Disconnected;
731        Ok(())
732    }
733
734    fn take_sdk_control_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<serde_json::Value>> {
735        self.sdk_control_rx.take()
736    }
737
738    async fn end_input(&mut self) -> Result<()> {
739        // Close stdin channel to signal end of input
740        self.stdin_tx.take();
741        Ok(())
742    }
743}
744
745impl Drop for SubprocessTransport {
746    fn drop(&mut self) {
747        if let Some(mut child) = self.child.take() {
748            // Try to kill the process
749            let _ = child.start_kill();
750        }
751    }
752}
753
754/// Find the Claude CLI binary
755pub(crate) fn find_claude_cli() -> Result<PathBuf> {
756    // First check if it's in PATH - try both 'claude' and 'claude-code'
757    for cmd_name in &["claude", "claude-code"] {
758        if let Ok(path) = which::which(cmd_name) {
759            debug!("Found Claude CLI at: {}", path.display());
760            return Ok(path);
761        }
762    }
763
764    // Check common installation locations
765    let home = dirs::home_dir().ok_or_else(|| SdkError::CliNotFound {
766        searched_paths: "Unable to determine home directory".into(),
767    })?;
768
769    let locations = vec![
770        // npm global installations
771        home.join(".npm-global/bin/claude"),
772        home.join(".npm-global/bin/claude-code"),
773        PathBuf::from("/usr/local/bin/claude"),
774        PathBuf::from("/usr/local/bin/claude-code"),
775        // Local installations
776        home.join(".local/bin/claude"),
777        home.join(".local/bin/claude-code"),
778        home.join("node_modules/.bin/claude"),
779        home.join("node_modules/.bin/claude-code"),
780        // Yarn installations
781        home.join(".yarn/bin/claude"),
782        home.join(".yarn/bin/claude-code"),
783        // macOS specific npm location
784        PathBuf::from("/opt/homebrew/bin/claude"),
785        PathBuf::from("/opt/homebrew/bin/claude-code"),
786    ];
787
788    let mut searched = Vec::new();
789    for path in &locations {
790        searched.push(path.display().to_string());
791        if path.exists() && path.is_file() {
792            debug!("Found Claude CLI at: {}", path.display());
793            return Ok(path.clone());
794        }
795    }
796
797    // Log detailed error information
798    warn!("Claude CLI not found in any standard location");
799    warn!("Searched paths: {:?}", searched);
800
801    // Check if Node.js is installed
802    if which::which("node").is_err() && which::which("npm").is_err() {
803        error!("Node.js/npm not found - Claude CLI requires Node.js");
804        return Err(SdkError::CliNotFound {
805            searched_paths: format!(
806                "Node.js is not installed. Install from https://nodejs.org/\n\nSearched in:\n{}",
807                searched.join("\n")
808            ),
809        });
810    }
811
812    Err(SdkError::CliNotFound {
813        searched_paths: format!(
814            "Claude CLI not found. Install with:\n  npm install -g @anthropic-ai/claude-code\n\nSearched in:\n{}",
815            searched.join("\n")
816        ),
817    })
818}
819
#[cfg(test)]
mod tests {
    use super::*;

    /// The rendered `CliNotFound` error must contain the install hint and the
    /// searched paths (checked without depending on the CLI being absent).
    #[test]
    fn test_find_claude_cli_error_message() {
        let rendered = SdkError::CliNotFound {
            searched_paths: "test paths".to_string(),
        }
        .to_string();

        for expected in ["npm install -g @anthropic-ai/claude-code", "test paths"] {
            assert!(rendered.contains(expected));
        }
    }

    /// A freshly constructed transport starts out disconnected.
    #[tokio::test]
    async fn test_transport_lifecycle() {
        let transport = match SubprocessTransport::new(ClaudeCodeOptions::default()) {
            Ok(found) => found,
            // Use a dummy path for testing when no CLI is installed
            Err(_) => {
                SubprocessTransport::with_cli_path(ClaudeCodeOptions::default(), "/usr/bin/true")
            }
        };

        assert!(!transport.is_connected());
        assert_eq!(transport.state, TransportState::Disconnected);
    }
}