Skip to main content

claude_code/transport/
subprocess_cli.rs

1//! Subprocess-based transport for the Claude Code CLI.
2//!
3//! This module provides [`SubprocessCliTransport`], which spawns the Claude Code CLI
4//! as a child process and communicates via stdin/stdout using newline-delimited JSON.
5//!
6//! It also provides [`JsonStreamBuffer`] for incrementally parsing JSON messages
7//! from a byte stream.
8
9use std::collections::{HashMap, VecDeque};
10use std::panic::{self, AssertUnwindSafe};
11use std::path::{Path, PathBuf};
12use std::process::Stdio;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use semver::Version;
18use serde_json::{Value, json};
19use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
20use tokio::process::{Child, ChildStdin, ChildStdout, Command};
21use tokio::sync::Mutex;
22use tracing::warn;
23
24use crate::errors::{
25    CLIConnectionError, CLIJSONDecodeError, CLINotFoundError, Error, ProcessError, Result,
26};
27use crate::transport::{Transport, TransportCloseHandle, TransportReader, TransportWriter};
28use crate::types::{
29    ClaudeAgentOptions, McpServersOption, PermissionMode, SettingSource, StderrCallback,
30    SystemPrompt, ThinkingConfig, ToolsOption,
31};
32
/// Upper bound, in bytes, applied to the JSON stream buffer when the caller does not
/// configure one (1 MiB).
pub const DEFAULT_MAX_BUFFER_SIZE: usize = 1 << 20;
/// Oldest Claude Code CLI version the SDK treats as supported.
const MINIMUM_CLAUDE_CODE_VERSION: &str = "2.0.0";
36
/// Kind of prompt a transport session was created with.
///
/// [`Prompt::Text`] carries a one-shot prompt string, while [`Prompt::Messages`]
/// marks a session whose input is streamed to the CLI over stdin as JSON messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Prompt {
    /// A single text prompt.
    Text(String),
    /// Streaming message mode: input is written to stdin as JSON messages.
    Messages,
}
48
/// Accumulating parser for newline-delimited JSON arriving in arbitrary chunks.
///
/// Text is appended to an internal buffer until the buffer parses as one complete
/// JSON value, so a single value may be spread over several lines or chunks.
///
/// # Buffer overflow protection
///
/// Once the buffered text grows past `max_buffer_size` bytes the buffer is emptied
/// and a [`CLIJSONDecodeError`] is reported to the caller.
#[derive(Debug, Clone)]
pub struct JsonStreamBuffer {
    // Text collected so far that has not yet parsed as a complete JSON value.
    buffer: String,
    // Maximum number of bytes `buffer` may hold before an error is raised.
    max_buffer_size: usize,
}
63
64impl JsonStreamBuffer {
65    /// Creates a new `JsonStreamBuffer` with the given maximum buffer size.
66    ///
67    /// # Example
68    ///
69    /// ```rust
70    /// use claude_code::JsonStreamBuffer;
71    ///
72    /// let _buffer = JsonStreamBuffer::new(1024 * 1024);
73    /// ```
74    pub fn new(max_buffer_size: usize) -> Self {
75        Self {
76            buffer: String::new(),
77            max_buffer_size,
78        }
79    }
80
81    /// Pushes a chunk of data into the buffer and returns any complete JSON values.
82    ///
83    /// The chunk is split by newlines, and each line is appended to the internal buffer.
84    /// After each line, the buffer is tested for valid JSON. If it parses successfully,
85    /// the value is collected and the buffer is cleared for the next message.
86    ///
87    /// # Returns
88    ///
89    /// A `Vec<Value>` of all complete JSON values parsed from this chunk.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`CLIJSONDecodeError`] if the buffer exceeds the maximum size.
94    ///
95    /// # Example
96    ///
97    /// ```rust
98    /// use claude_code::JsonStreamBuffer;
99    ///
100    /// let mut buffer = JsonStreamBuffer::new(1024);
101    /// let parsed = buffer.push_chunk("{\"type\":\"system\"}\n").unwrap();
102    /// assert_eq!(parsed.len(), 1);
103    /// ```
104    pub fn push_chunk(
105        &mut self,
106        chunk: &str,
107    ) -> std::result::Result<Vec<Value>, CLIJSONDecodeError> {
108        let mut messages = Vec::new();
109
110        for line in chunk.split('\n') {
111            let line = line.trim();
112            if line.is_empty() {
113                continue;
114            }
115
116            self.buffer.push_str(line);
117            if self.buffer.len() > self.max_buffer_size {
118                let current_size = self.buffer.len();
119                self.buffer.clear();
120                return Err(CLIJSONDecodeError::new(
121                    format!(
122                        "JSON message exceeded maximum buffer size of {} bytes",
123                        self.max_buffer_size
124                    ),
125                    format!(
126                        "Buffer size {current_size} exceeds limit {}",
127                        self.max_buffer_size
128                    ),
129                ));
130            }
131
132            match serde_json::from_str::<Value>(&self.buffer) {
133                Ok(value) => {
134                    messages.push(value);
135                    self.buffer.clear();
136                }
137                Err(_) => {
138                    // Continue buffering partial JSON.
139                }
140            }
141        }
142
143        Ok(messages)
144    }
145}
146
147/// Transport implementation that communicates with the Claude Code CLI via a subprocess.
148///
149/// Spawns the `claude` CLI as a child process, passing configuration via command-line
150/// arguments and environment variables. Communication uses newline-delimited JSON
151/// over stdin (input) and stdout (output).
152///
153/// # CLI discovery
154///
155/// The CLI binary is located by:
156/// 1. Using `cli_path` from [`ClaudeAgentOptions`] if provided
157/// 2. Searching `PATH` for `claude`
158/// 3. Checking common installation locations (`~/.npm-global/bin/`, `/usr/local/bin/`, etc.)
159pub struct SubprocessCliTransport {
160    /// The prompt type for this transport session.
161    pub prompt: Prompt,
162    /// The agent options used to configure the CLI.
163    pub options: ClaudeAgentOptions,
164    /// The resolved path to the CLI executable.
165    pub cli_path: String,
166    cwd: Option<PathBuf>,
167    child: Option<Child>,
168    stdout: Option<BufReader<ChildStdout>>,
169    stdin: Option<ChildStdin>,
170    ready: bool,
171    write_lock: Arc<Mutex<()>>,
172    parser: JsonStreamBuffer,
173    pending_messages: VecDeque<Value>,
174    /// Handle for the background task that drains stderr to prevent pipe blocking.
175    stderr_task: Option<tokio::task::JoinHandle<()>>,
176    /// Optional stderr callback to receive line output.
177    stderr_callback: Option<StderrCallback>,
178}
179
180impl SubprocessCliTransport {
181    /// Creates a new `SubprocessCliTransport` with the given prompt and options.
182    ///
183    /// Resolves the CLI path immediately but does not start the subprocess.
184    /// Call [`connect()`](Transport::connect) to spawn the process.
185    ///
186    /// # Errors
187    ///
188    /// Returns [`CLINotFoundError`] if the CLI executable cannot be located.
189    ///
190    /// # Example
191    ///
192    /// ```rust
193    /// use claude_code::transport::subprocess_cli::{Prompt, SubprocessCliTransport};
194    ///
195    /// let _transport = SubprocessCliTransport::new(Prompt::Messages, Default::default()).unwrap();
196    /// ```
197    pub fn new(prompt: Prompt, options: ClaudeAgentOptions) -> Result<Self> {
198        let cli_path = match &options.cli_path {
199            Some(path) => path.to_string_lossy().to_string(),
200            None => Self::find_cli()?,
201        };
202
203        let cwd = options.cwd.clone();
204        let max_buffer_size = options.max_buffer_size.unwrap_or(DEFAULT_MAX_BUFFER_SIZE);
205        let stderr_callback = options.stderr.clone();
206
207        Ok(Self {
208            prompt,
209            options,
210            cli_path,
211            cwd,
212            child: None,
213            stdout: None,
214            stdin: None,
215            ready: false,
216            write_lock: Arc::new(Mutex::new(())),
217            parser: JsonStreamBuffer::new(max_buffer_size),
218            pending_messages: VecDeque::new(),
219            stderr_task: None,
220            stderr_callback,
221        })
222    }
223
224    /// Locates the Claude Code CLI binary by searching PATH and common locations.
225    fn find_cli() -> std::result::Result<String, CLINotFoundError> {
226        if let Some(path) = Self::find_bundled_cli() {
227            return Ok(path);
228        }
229
230        if let Ok(path) = which::which("claude") {
231            return Ok(path.to_string_lossy().to_string());
232        }
233
234        let locations = vec![
235            PathBuf::from(format!(
236                "{}/.npm-global/bin/claude",
237                std::env::var("HOME").unwrap_or_default()
238            )),
239            PathBuf::from("/usr/local/bin/claude"),
240            PathBuf::from(format!(
241                "{}/.local/bin/claude",
242                std::env::var("HOME").unwrap_or_default()
243            )),
244            PathBuf::from(format!(
245                "{}/node_modules/.bin/claude",
246                std::env::var("HOME").unwrap_or_default()
247            )),
248            PathBuf::from(format!(
249                "{}/.yarn/bin/claude",
250                std::env::var("HOME").unwrap_or_default()
251            )),
252            PathBuf::from(format!(
253                "{}/.claude/local/claude",
254                std::env::var("HOME").unwrap_or_default()
255            )),
256        ];
257
258        for path in locations {
259            if path.exists() && path.is_file() {
260                return Ok(path.to_string_lossy().to_string());
261            }
262        }
263
264        Err(CLINotFoundError::new(
265            "Claude Code not found. Install with:\n  npm install -g @anthropic-ai/claude-code\n\nIf already installed locally, try:\n  export PATH=\"$HOME/node_modules/.bin:$PATH\"\n\nOr provide the path via ClaudeAgentOptions",
266            None,
267        ))
268    }
269
270    /// Attempts to locate a bundled Claude Code CLI binary.
271    fn find_bundled_cli() -> Option<String> {
272        if let Ok(path) = std::env::var("CLAUDE_CODE_BUNDLED_CLI") {
273            let candidate = PathBuf::from(path);
274            if candidate.is_file() {
275                return Some(candidate.to_string_lossy().to_string());
276            }
277        }
278
279        let cli_name = if cfg!(windows) {
280            "claude.exe"
281        } else {
282            "claude"
283        };
284        let mut candidates = Vec::new();
285        if let Ok(current_exe) = std::env::current_exe()
286            && let Some(exe_dir) = current_exe.parent()
287        {
288            candidates.push(exe_dir.join("_bundled").join(cli_name));
289            candidates.push(exe_dir.join("..").join("_bundled").join(cli_name));
290        }
291
292        for candidate in candidates {
293            if candidate.is_file() {
294                return Some(candidate.to_string_lossy().to_string());
295            }
296        }
297        None
298    }
299
300    /// Resolves a user identifier (username string or numeric UID) to a Unix UID.
301    ///
302    /// Supports both numeric UIDs (e.g., `"1000"`) and username strings (e.g., `"nobody"`).
303    #[cfg(unix)]
304    fn resolve_user_to_uid(user: &str) -> Result<u32> {
305        // Try parsing as numeric UID first.
306        if let Ok(uid) = user.parse::<u32>() {
307            return Ok(uid);
308        }
309
310        if user.as_bytes().contains(&0) {
311            return Err(Error::Other(format!(
312                "Invalid user name (contains null byte): {user}"
313            )));
314        }
315
316        // Look up username via nix's safe Unix user APIs.
317        let found = nix::unistd::User::from_name(user)
318            .map_err(|err| Error::Other(format!("Failed to resolve user '{user}': {err}")))?;
319        let entry = found.ok_or_else(|| Error::Other(format!("User not found: {user}")))?;
320        Ok(entry.uid.as_raw())
321    }
322
323    fn parse_semver_prefix(version: &str) -> Option<[u32; 3]> {
324        let token = version.split_whitespace().next().unwrap_or_default();
325        let parsed = Version::parse(token).ok()?;
326        Some([
327            u32::try_from(parsed.major).ok()?,
328            u32::try_from(parsed.minor).ok()?,
329            u32::try_from(parsed.patch).ok()?,
330        ])
331    }
332
333    async fn check_claude_version(&self) {
334        if std::env::var("CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK").is_ok() {
335            return;
336        }
337
338        let mut command = Command::new(&self.cli_path);
339        command.arg("-v");
340        command.stdout(Stdio::piped());
341        command.stderr(Stdio::null());
342
343        let output = tokio::time::timeout(Duration::from_secs(2), command.output()).await;
344        let Ok(Ok(output)) = output else {
345            return;
346        };
347        if !output.status.success() {
348            return;
349        }
350
351        let version_output = String::from_utf8_lossy(&output.stdout).trim().to_string();
352        let Some(version) = Self::parse_semver_prefix(&version_output) else {
353            return;
354        };
355        let Some(minimum) = Self::parse_semver_prefix(MINIMUM_CLAUDE_CODE_VERSION) else {
356            return;
357        };
358
359        if version < minimum {
360            eprintln!(
361                "Warning: Claude Code version {} is unsupported in the Agent SDK. Minimum required version is {}. Some features may not work correctly.",
362                version_output, MINIMUM_CLAUDE_CODE_VERSION
363            );
364        }
365    }
366
367    /// Converts a `PermissionMode` enum variant to its CLI string representation.
368    fn permission_mode_to_string(mode: &PermissionMode) -> &'static str {
369        match mode {
370            PermissionMode::Default => "default",
371            PermissionMode::AcceptEdits => "acceptEdits",
372            PermissionMode::Plan => "plan",
373            PermissionMode::BypassPermissions => "bypassPermissions",
374        }
375    }
376
377    /// Converts a `SettingSource` enum variant to its CLI string representation.
378    fn setting_source_to_string(source: &SettingSource) -> &'static str {
379        match source {
380            SettingSource::User => "user",
381            SettingSource::Project => "project",
382            SettingSource::Local => "local",
383        }
384    }
385
386    fn parse_settings_object(
387        settings: &str,
388    ) -> std::result::Result<serde_json::Map<String, Value>, String> {
389        let settings_str = settings.trim();
390
391        if settings_str.starts_with('{') && settings_str.ends_with('}') {
392            let parsed: Value = serde_json::from_str(settings_str)
393                .map_err(|err| format!("Invalid settings JSON: {err}"))?;
394            return match parsed {
395                Value::Object(obj) => Ok(obj),
396                _ => Err("Settings JSON must be an object".to_string()),
397            };
398        }
399
400        let path = Path::new(settings_str);
401        if !path.exists() {
402            return Err(format!("Settings file does not exist: {settings_str}"));
403        }
404
405        let content = std::fs::read_to_string(path)
406            .map_err(|err| format!("Failed to read settings file '{settings_str}': {err}"))?;
407        let parsed: Value = serde_json::from_str(&content)
408            .map_err(|err| format!("Invalid JSON in settings file '{settings_str}': {err}"))?;
409        match parsed {
410            Value::Object(obj) => Ok(obj),
411            _ => Err(format!(
412                "Settings file '{settings_str}' must contain a JSON object"
413            )),
414        }
415    }
416
417    /// Builds the combined settings value from `options.settings` and `options.sandbox`.
418    fn build_settings_value(&self) -> Result<Option<String>> {
419        let has_settings = self.options.settings.is_some();
420        let has_sandbox = self.options.sandbox.is_some();
421
422        if !has_settings && !has_sandbox {
423            return Ok(None);
424        }
425
426        if has_settings && !has_sandbox {
427            return Ok(self.options.settings.clone());
428        }
429
430        let mut settings_obj = serde_json::Map::new();
431
432        if let Some(settings) = &self.options.settings {
433            match Self::parse_settings_object(settings) {
434                Ok(obj) => {
435                    settings_obj = obj;
436                }
437                Err(err) => {
438                    tracing::warn!(
439                        "Failed to merge settings into sandbox config: {err}. Falling back to sandbox-only settings."
440                    );
441                    if self.options.strict_settings_merge {
442                        return Err(Error::Other(format!(
443                            "Failed to merge settings into sandbox config: {err}"
444                        )));
445                    }
446                }
447            }
448        }
449
450        if let Some(sandbox) = &self.options.sandbox {
451            settings_obj.insert(
452                "sandbox".to_string(),
453                serde_json::to_value(sandbox).unwrap_or(Value::Null),
454            );
455        }
456
457        Ok(Some(Value::Object(settings_obj).to_string()))
458    }
459
460    /// Builds the complete command-line arguments for spawning the CLI process.
461    ///
462    /// Translates all [`ClaudeAgentOptions`] fields into their corresponding CLI flags.
463    ///
464    /// # Returns
465    ///
466    /// A `Vec<String>` where the first element is the CLI path and the rest are arguments.
467    ///
468    /// # Example
469    ///
470    /// ```rust
471    /// use claude_code::transport::subprocess_cli::{Prompt, SubprocessCliTransport};
472    ///
473    /// let transport = SubprocessCliTransport::new(Prompt::Messages, Default::default()).unwrap();
474    /// let args = transport.build_command().unwrap();
475    /// assert!(args.iter().any(|arg| arg == "--input-format"));
476    /// ```
477    pub fn build_command(&self) -> Result<Vec<String>> {
478        let mut cmd = vec![
479            self.cli_path.clone(),
480            "--output-format".to_string(),
481            "stream-json".to_string(),
482            "--verbose".to_string(),
483        ];
484
485        match &self.options.system_prompt {
486            None => {
487                cmd.push("--system-prompt".to_string());
488                cmd.push(String::new());
489            }
490            Some(SystemPrompt::Text(prompt)) => {
491                cmd.push("--system-prompt".to_string());
492                cmd.push(prompt.clone());
493            }
494            Some(SystemPrompt::Preset(preset)) => {
495                if let Some(append) = &preset.append {
496                    cmd.push("--append-system-prompt".to_string());
497                    cmd.push(append.clone());
498                }
499            }
500        }
501
502        if let Some(tools) = &self.options.tools {
503            match tools {
504                ToolsOption::List(list) => {
505                    cmd.push("--tools".to_string());
506                    if list.is_empty() {
507                        cmd.push(String::new());
508                    } else {
509                        cmd.push(list.join(","));
510                    }
511                }
512                ToolsOption::Preset(_) => {
513                    cmd.push("--tools".to_string());
514                    cmd.push("default".to_string());
515                }
516            }
517        }
518
519        if !self.options.allowed_tools.is_empty() {
520            cmd.push("--allowedTools".to_string());
521            cmd.push(self.options.allowed_tools.join(","));
522        }
523
524        if let Some(max_turns) = self.options.max_turns {
525            cmd.push("--max-turns".to_string());
526            cmd.push(max_turns.to_string());
527        }
528
529        if let Some(max_budget) = self.options.max_budget_usd {
530            cmd.push("--max-budget-usd".to_string());
531            cmd.push(max_budget.to_string());
532        }
533
534        if !self.options.disallowed_tools.is_empty() {
535            cmd.push("--disallowedTools".to_string());
536            cmd.push(self.options.disallowed_tools.join(","));
537        }
538
539        if let Some(model) = &self.options.model {
540            cmd.push("--model".to_string());
541            cmd.push(model.clone());
542        }
543
544        if let Some(model) = &self.options.fallback_model {
545            cmd.push("--fallback-model".to_string());
546            cmd.push(model.clone());
547        }
548
549        if !self.options.betas.is_empty() {
550            cmd.push("--betas".to_string());
551            cmd.push(self.options.betas.join(","));
552        }
553
554        if let Some(tool_name) = &self.options.permission_prompt_tool_name {
555            cmd.push("--permission-prompt-tool".to_string());
556            cmd.push(tool_name.clone());
557        }
558
559        if let Some(mode) = &self.options.permission_mode {
560            cmd.push("--permission-mode".to_string());
561            cmd.push(Self::permission_mode_to_string(mode).to_string());
562        }
563
564        if self.options.continue_conversation {
565            cmd.push("--continue".to_string());
566        }
567
568        if let Some(resume) = &self.options.resume {
569            cmd.push("--resume".to_string());
570            cmd.push(resume.clone());
571        }
572
573        if let Some(settings) = self.build_settings_value()? {
574            cmd.push("--settings".to_string());
575            cmd.push(settings);
576        }
577
578        for directory in &self.options.add_dirs {
579            cmd.push("--add-dir".to_string());
580            cmd.push(directory.to_string_lossy().to_string());
581        }
582
583        match &self.options.mcp_servers {
584            McpServersOption::Servers(servers) => {
585                let mut cli_servers = HashMap::new();
586                for (name, config) in servers {
587                    cli_servers.insert(name.clone(), config.to_cli_json());
588                }
589                if !cli_servers.is_empty() {
590                    cmd.push("--mcp-config".to_string());
591                    cmd.push(json!({ "mcpServers": cli_servers }).to_string());
592                }
593            }
594            McpServersOption::Raw(raw) => {
595                cmd.push("--mcp-config".to_string());
596                cmd.push(raw.clone());
597            }
598            McpServersOption::None => {}
599        }
600
601        if self.options.include_partial_messages {
602            cmd.push("--include-partial-messages".to_string());
603        }
604
605        if self.options.fork_session {
606            cmd.push("--fork-session".to_string());
607        }
608
609        let setting_sources = self
610            .options
611            .setting_sources
612            .as_ref()
613            .map(|sources| {
614                sources
615                    .iter()
616                    .map(Self::setting_source_to_string)
617                    .collect::<Vec<_>>()
618                    .join(",")
619            })
620            .unwrap_or_default();
621        cmd.push("--setting-sources".to_string());
622        cmd.push(setting_sources);
623
624        for plugin in &self.options.plugins {
625            if plugin.type_ != "local" {
626                return Err(Error::Other(format!(
627                    "Unsupported plugin type: {}",
628                    plugin.type_
629                )));
630            }
631            cmd.push("--plugin-dir".to_string());
632            cmd.push(plugin.path.clone());
633        }
634
635        for (flag, value) in &self.options.extra_args {
636            if let Some(v) = value {
637                cmd.push(format!("--{flag}"));
638                cmd.push(v.clone());
639            } else {
640                cmd.push(format!("--{flag}"));
641            }
642        }
643
644        let mut resolved_max_thinking_tokens = self.options.max_thinking_tokens;
645        if let Some(thinking) = &self.options.thinking {
646            match thinking {
647                ThinkingConfig::Adaptive => {
648                    if resolved_max_thinking_tokens.is_none() {
649                        resolved_max_thinking_tokens = Some(32_000);
650                    }
651                }
652                ThinkingConfig::Enabled { budget_tokens } => {
653                    resolved_max_thinking_tokens = Some(*budget_tokens);
654                }
655                ThinkingConfig::Disabled => {
656                    resolved_max_thinking_tokens = Some(0);
657                }
658            }
659        }
660
661        if let Some(tokens) = resolved_max_thinking_tokens {
662            cmd.push("--max-thinking-tokens".to_string());
663            cmd.push(tokens.to_string());
664        }
665
666        if let Some(effort) = &self.options.effort {
667            cmd.push("--effort".to_string());
668            cmd.push(effort.clone());
669        }
670
671        if let Some(Value::Object(output_format)) = &self.options.output_format
672            && output_format.get("type").and_then(Value::as_str) == Some("json_schema")
673            && let Some(schema) = output_format.get("schema")
674        {
675            cmd.push("--json-schema".to_string());
676            cmd.push(schema.to_string());
677        }
678
679        cmd.push("--input-format".to_string());
680        cmd.push("stream-json".to_string());
681
682        Ok(cmd)
683    }
684}
685
686#[async_trait]
687impl Transport for SubprocessCliTransport {
688    async fn connect(&mut self) -> Result<()> {
689        if self.child.is_some() {
690            return Ok(());
691        }
692
693        self.check_claude_version().await;
694
695        if let Some(cwd) = &self.cwd
696            && !cwd.exists()
697        {
698            return Err(CLIConnectionError::new(format!(
699                "Working directory does not exist: {}",
700                cwd.to_string_lossy()
701            ))
702            .into());
703        }
704
705        let cmd = self.build_command()?;
706        let mut command = Command::new(&cmd[0]);
707        command.args(&cmd[1..]);
708        command.stdin(Stdio::piped());
709        command.stdout(Stdio::piped());
710        command.stderr(Stdio::piped());
711        if let Some(cwd) = &self.cwd {
712            command.current_dir(cwd);
713            command.env("PWD", cwd.to_string_lossy().to_string());
714        }
715
716        command.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
717        command.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
718        if self.options.enable_file_checkpointing {
719            command.env("CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", "true");
720        }
721        for (key, value) in &self.options.env {
722            command.env(key, value);
723        }
724
725        // Set subprocess user identity on Unix systems.
726        #[cfg(unix)]
727        if let Some(user) = &self.options.user {
728            let uid = Self::resolve_user_to_uid(user)?;
729            command.uid(uid);
730        }
731
732        let mut child = command.spawn().map_err(|e| {
733            if e.kind() == std::io::ErrorKind::NotFound {
734                Error::CLINotFound(CLINotFoundError::new(
735                    "Claude Code not found",
736                    Some(self.cli_path.clone()),
737                ))
738            } else {
739                Error::CLIConnection(CLIConnectionError::new(format!(
740                    "Failed to start Claude Code: {e}"
741                )))
742            }
743        })?;
744
745        let stdout = child.stdout.take().ok_or_else(|| {
746            Error::CLIConnection(CLIConnectionError::new(
747                "Failed to open stdout for Claude process",
748            ))
749        })?;
750
751        self.stdin = child.stdin.take();
752        self.stdout = Some(BufReader::new(stdout));
753
754        // Spawn a background task to drain stderr and prevent pipe buffer blocking.
755        // The task reads all stderr output and optionally invokes the user's callback.
756        if let Some(stderr) = child.stderr.take() {
757            let callback = self.stderr_callback.clone();
758            self.stderr_task = Some(tokio::spawn(async move {
759                let mut reader = BufReader::new(stderr);
760                let mut line = String::new();
761                loop {
762                    line.clear();
763                    match reader.read_line(&mut line).await {
764                        Ok(0) => break, // EOF
765                        Ok(_) => {
766                            let trimmed = line.trim_end().to_string();
767                            if !trimmed.is_empty() {
768                                if let Some(cb) = &callback {
769                                    let callback_result =
770                                        panic::catch_unwind(AssertUnwindSafe(|| cb(trimmed)));
771                                    if callback_result.is_err() {
772                                        warn!("stderr callback panicked; continuing stderr drain");
773                                    }
774                                }
775                            }
776                        }
777                        Err(_) => break,
778                    }
779                }
780            }));
781        }
782
783        self.child = Some(child);
784        self.ready = true;
785        Ok(())
786    }
787
788    async fn write(&mut self, data: &str) -> Result<()> {
789        let _guard = self.write_lock.lock().await;
790
791        if !self.ready {
792            return Err(
793                CLIConnectionError::new("ProcessTransport is not ready for writing").into(),
794            );
795        }
796
797        if let Some(child) = &mut self.child
798            && let Ok(Some(status)) = child.try_wait()
799        {
800            return Err(CLIConnectionError::new(format!(
801                "Cannot write to terminated process (exit code: {:?})",
802                status.code()
803            ))
804            .into());
805        }
806
807        let stdin = self.stdin.as_mut().ok_or_else(|| {
808            Error::CLIConnection(CLIConnectionError::new(
809                "ProcessTransport is not ready for writing",
810            ))
811        })?;
812
813        stdin.write_all(data.as_bytes()).await.map_err(|e| {
814            Error::CLIConnection(CLIConnectionError::new(format!(
815                "Failed to write to process stdin: {e}"
816            )))
817        })?;
818        stdin.flush().await.map_err(|e| {
819            Error::CLIConnection(CLIConnectionError::new(format!(
820                "Failed to flush process stdin: {e}"
821            )))
822        })?;
823
824        Ok(())
825    }
826
827    async fn end_input(&mut self) -> Result<()> {
828        let _guard = self.write_lock.lock().await;
829        self.stdin.take();
830        Ok(())
831    }
832
833    async fn read_next_message(&mut self) -> Result<Option<Value>> {
834        if let Some(message) = self.pending_messages.pop_front() {
835            return Ok(Some(message));
836        }
837
838        if self.child.is_none() || self.stdout.is_none() {
839            return Err(CLIConnectionError::new("Not connected").into());
840        }
841
842        let stdout = self.stdout.as_mut().expect("checked is_some");
843
844        loop {
845            let mut line = String::new();
846            let bytes_read = stdout.read_line(&mut line).await?;
847            if bytes_read == 0 {
848                break;
849            }
850
851            let parsed = self.parser.push_chunk(&line)?;
852            for message in parsed {
853                self.pending_messages.push_back(message);
854            }
855            if let Some(message) = self.pending_messages.pop_front() {
856                return Ok(Some(message));
857            }
858        }
859
860        self.ready = false;
861        if let Some(child) = &mut self.child {
862            let status = child.wait().await.map_err(|e| {
863                Error::Process(ProcessError::new(
864                    format!("Failed to wait for process completion: {e}"),
865                    None,
866                    None,
867                ))
868            })?;
869            if !status.success() {
870                return Err(ProcessError::new(
871                    "Command failed",
872                    status.code(),
873                    Some("Check stderr output for details".to_string()),
874                )
875                .into());
876            }
877        }
878        Ok(None)
879    }
880
881    async fn close(&mut self) -> Result<()> {
882        self.ready = false;
883        self.stdin.take();
884        self.stdout.take();
885        if let Some(child) = &mut self.child
886            && child.try_wait()?.is_none()
887        {
888            let _ = child.kill().await;
889            let _ = child.wait().await;
890        }
891        self.child = None;
892        // Abort the stderr drain task if still running.
893        if let Some(task) = self.stderr_task.take() {
894            task.abort();
895        }
896        Ok(())
897    }
898
    /// Reports the transport's `ready` flag.
    ///
    /// The flag is cleared by `close`, by `Drop`, and when `read_next_message`
    /// reaches EOF on stdout; `into_split` refuses to proceed while it is false.
    fn is_ready(&self) -> bool {
        self.ready
    }
902
903    fn into_split(mut self: Box<Self>) -> super::TransportSplitResult {
904        if !self.ready {
905            return Err(
906                CLIConnectionError::new("Cannot split a transport that is not connected").into(),
907            );
908        }
909
910        let stdout = self.stdout.take().ok_or_else(|| {
911            Error::CLIConnection(CLIConnectionError::new(
912                "Cannot split: stdout not available",
913            ))
914        })?;
915
916        let stdin = self.stdin.take();
917
918        let close_state = Arc::new(SubprocessCloseState {
919            child: Mutex::new(self.child.take()),
920            stderr_task: Mutex::new(self.stderr_task.take()),
921        });
922
923        let reader = SubprocessReader {
924            stdout,
925            parser: self.parser.clone(),
926            pending_messages: std::mem::take(&mut self.pending_messages),
927            close_state: close_state.clone(),
928        };
929
930        let writer = SubprocessWriter {
931            stdin: Mutex::new(stdin),
932            write_lock: self.write_lock.clone(),
933        };
934
935        Ok((
936            Box::new(reader),
937            Box::new(writer),
938            Box::new(SubprocessCloseHandle { state: close_state }),
939        ))
940    }
941}
942
/// Shared state for subprocess cleanup after splitting.
///
/// Created by `into_split` and shared through an `Arc` between the reader
/// half (which waits on the child at EOF) and the close handle (which kills
/// the child and aborts the task). Both slots are `Option`s so that cleanup
/// can leave `None` behind.
struct SubprocessCloseState {
    /// The spawned CLI process, or `None` once it has been cleared by `close`.
    child: Mutex<Option<Child>>,
    /// Handle of the background task that is aborted on close/drop (referred
    /// to elsewhere in this file as the stderr drain task).
    stderr_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
948
/// Reader half of a split [`SubprocessCliTransport`].
///
/// Owns the stdout stream and JSON parser.
pub struct SubprocessReader {
    /// Buffered stdout of the child process, read one line at a time.
    stdout: BufReader<ChildStdout>,
    /// Incremental parser that each stdout line is pushed into.
    parser: JsonStreamBuffer,
    /// Parsed messages not yet handed to the caller, in arrival order.
    pending_messages: VecDeque<Value>,
    /// Cleanup state shared with the close handle; used to wait on the child
    /// once stdout reaches EOF.
    close_state: Arc<SubprocessCloseState>,
}
958
959#[async_trait]
960impl TransportReader for SubprocessReader {
961    async fn read_next_message(&mut self) -> Result<Option<Value>> {
962        if let Some(message) = self.pending_messages.pop_front() {
963            return Ok(Some(message));
964        }
965
966        loop {
967            let mut line = String::new();
968            let bytes_read = self.stdout.read_line(&mut line).await?;
969            if bytes_read == 0 {
970                break;
971            }
972
973            let parsed = self.parser.push_chunk(&line)?;
974            for message in parsed {
975                self.pending_messages.push_back(message);
976            }
977            if let Some(message) = self.pending_messages.pop_front() {
978                return Ok(Some(message));
979            }
980        }
981
982        // EOF — wait for child process
983        if let Some(child) = &mut *self.close_state.child.lock().await {
984            let status = child.wait().await.map_err(|e| {
985                Error::Process(ProcessError::new(
986                    format!("Failed to wait for process completion: {e}"),
987                    None,
988                    None,
989                ))
990            })?;
991            if !status.success() {
992                return Err(ProcessError::new(
993                    "Command failed",
994                    status.code(),
995                    Some("Check stderr output for details".to_string()),
996                )
997                .into());
998            }
999        }
1000        Ok(None)
1001    }
1002}
1003
/// Writer half of a split [`SubprocessCliTransport`].
///
/// Owns the stdin handle.
pub struct SubprocessWriter {
    /// The child's stdin, or `None` once input has been ended (or if it was
    /// already closed when the transport was split).
    stdin: Mutex<Option<ChildStdin>>,
    /// Lock taken around every write and around `end_input`; it is the same
    /// `Arc` the original transport used, cloned in `into_split`.
    write_lock: Arc<Mutex<()>>,
}
1011
1012#[async_trait]
1013impl TransportWriter for SubprocessWriter {
1014    async fn write(&mut self, data: &str) -> Result<()> {
1015        let _guard = self.write_lock.lock().await;
1016
1017        let mut stdin_guard = self.stdin.lock().await;
1018        let stdin = stdin_guard
1019            .as_mut()
1020            .ok_or_else(|| Error::CLIConnection(CLIConnectionError::new("stdin is closed")))?;
1021
1022        stdin.write_all(data.as_bytes()).await.map_err(|e| {
1023            Error::CLIConnection(CLIConnectionError::new(format!(
1024                "Failed to write to process stdin: {e}"
1025            )))
1026        })?;
1027        stdin.flush().await.map_err(|e| {
1028            Error::CLIConnection(CLIConnectionError::new(format!(
1029                "Failed to flush process stdin: {e}"
1030            )))
1031        })?;
1032
1033        Ok(())
1034    }
1035
1036    async fn end_input(&mut self) -> Result<()> {
1037        let _guard = self.write_lock.lock().await;
1038        self.stdin.lock().await.take();
1039        Ok(())
1040    }
1041}
1042
/// Close handle for a split [`SubprocessCliTransport`].
///
/// Holds the cleanup state shared with the reader half. `close` kills and
/// reaps the child asynchronously; dropping the handle performs a
/// best-effort, non-blocking kill instead.
struct SubprocessCloseHandle {
    /// Child process and stderr task shared with [`SubprocessReader`].
    state: Arc<SubprocessCloseState>,
}
1047
1048#[async_trait]
1049impl TransportCloseHandle for SubprocessCloseHandle {
1050    async fn close(&self) -> Result<()> {
1051        // Drop stdin is already handled by writer
1052        if let Some(child) = &mut *self.state.child.lock().await {
1053            if child.try_wait()?.is_none() {
1054                let _ = child.kill().await;
1055                let _ = child.wait().await;
1056            }
1057        }
1058        *self.state.child.lock().await = None;
1059
1060        if let Some(task) = self.state.stderr_task.lock().await.take() {
1061            task.abort();
1062        }
1063        Ok(())
1064    }
1065}
1066
1067impl Drop for SubprocessCloseHandle {
1068    fn drop(&mut self) {
1069        // Best-effort sync cleanup: kill the child process without waiting.
1070        // This is a last-resort safety net for cases where async close() was
1071        // not called (e.g., early stream drop without explicit shutdown).
1072        if let Ok(mut child_guard) = self.state.child.try_lock() {
1073            if let Some(child) = child_guard.as_mut() {
1074                if child.try_wait().ok().flatten().is_none() {
1075                    let _ = child.start_kill();
1076                }
1077            }
1078        }
1079        if let Ok(mut task_guard) = self.state.stderr_task.try_lock() {
1080            if let Some(task) = task_guard.take() {
1081                task.abort();
1082            }
1083        }
1084    }
1085}
1086
1087impl Drop for SubprocessCliTransport {
1088    fn drop(&mut self) {
1089        self.ready = false;
1090        self.stdin.take();
1091        self.stdout.take();
1092
1093        if let Some(child) = &mut self.child
1094            && child.try_wait().ok().flatten().is_none()
1095        {
1096            let _ = child.start_kill();
1097        }
1098        self.child = None;
1099
1100        if let Some(task) = self.stderr_task.take() {
1101            task.abort();
1102        }
1103    }
1104}
1105
#[cfg(test)]
mod tests {
    use super::SubprocessCliTransport;

    /// A bare `MAJOR.MINOR.PATCH` string yields its three components.
    #[test]
    fn parse_semver_prefix_supports_plain_version() {
        let parsed = SubprocessCliTransport::parse_semver_prefix("2.4.1");
        assert_eq!(parsed, Some([2, 4, 1]));
    }

    /// Text following the numeric triple (here `-beta.1`) is ignored.
    #[test]
    fn parse_semver_prefix_supports_prefixed_version() {
        let parsed = SubprocessCliTransport::parse_semver_prefix("2.4.1-beta.1");
        assert_eq!(parsed, Some([2, 4, 1]));
    }

    /// Whitespace-separated trailing text is ignored as well.
    #[test]
    fn parse_semver_prefix_supports_trailing_text_after_whitespace() {
        let parsed = SubprocessCliTransport::parse_semver_prefix("2.4.1 (stable channel)");
        assert_eq!(parsed, Some([2, 4, 1]));
    }

    /// Input with no leading version yields `None`.
    #[test]
    fn parse_semver_prefix_rejects_invalid_version() {
        let parsed = SubprocessCliTransport::parse_semver_prefix("invalid");
        assert!(parsed.is_none());
    }

    /// A decimal uid string resolves to that same uid.
    #[cfg(unix)]
    #[test]
    fn resolve_user_to_uid_accepts_numeric_uid() {
        let expected = nix::unistd::Uid::current().as_raw();
        let numeric = expected.to_string();
        let resolved =
            SubprocessCliTransport::resolve_user_to_uid(&numeric).expect("resolve numeric uid");
        assert_eq!(resolved, expected);
    }

    /// The current user's login name resolves to the current uid.
    #[cfg(unix)]
    #[test]
    fn resolve_user_to_uid_accepts_current_username() {
        let me = nix::unistd::Uid::current();
        let account = nix::unistd::User::from_uid(me)
            .expect("lookup current uid")
            .expect("current uid should map to a user");
        let resolved = SubprocessCliTransport::resolve_user_to_uid(&account.name)
            .expect("resolve current username");
        assert_eq!(resolved, me.as_raw());
    }

    /// A name that cannot exist is reported as "User not found".
    #[cfg(unix)]
    #[test]
    fn resolve_user_to_uid_rejects_unknown_user() {
        // The process id keeps the name unique per test run.
        let bogus = format!("__claude_code_sdk_nonexistent_{}__", std::process::id());
        let message = SubprocessCliTransport::resolve_user_to_uid(&bogus)
            .expect_err("must fail")
            .to_string();
        assert!(message.contains("User not found"));
    }

    /// An embedded NUL byte is rejected as "Invalid user name".
    #[cfg(unix)]
    #[test]
    fn resolve_user_to_uid_rejects_null_byte_in_username() {
        let message = SubprocessCliTransport::resolve_user_to_uid("name\0with-null")
            .expect_err("must fail")
            .to_string();
        assert!(message.contains("Invalid user name"));
    }
}