1use std::collections::{HashMap, VecDeque};
10use std::panic::{self, AssertUnwindSafe};
11use std::path::{Path, PathBuf};
12use std::process::Stdio;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use semver::Version;
18use serde_json::{Value, json};
19use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
20use tokio::process::{Child, ChildStdin, ChildStdout, Command};
21use tokio::sync::Mutex;
22use tracing::warn;
23
24use crate::errors::{
25 CLIConnectionError, CLIJSONDecodeError, CLINotFoundError, Error, ProcessError, Result,
26};
27use crate::transport::{Transport, TransportCloseHandle, TransportReader, TransportWriter};
28use crate::types::{
29 ClaudeAgentOptions, McpServersOption, PermissionMode, SettingSource, StderrCallback,
30 SystemPrompt, ThinkingConfig, ToolsOption,
31};
32
33pub const DEFAULT_MAX_BUFFER_SIZE: usize = 1024 * 1024;
35const MINIMUM_CLAUDE_CODE_VERSION: &str = "2.0.0";
36
37#[derive(Debug, Clone, PartialEq, Eq)]
42pub enum Prompt {
43 Text(String),
45 Messages,
47}
48
49#[derive(Debug, Clone)]
59pub struct JsonStreamBuffer {
60 buffer: String,
61 max_buffer_size: usize,
62}
63
64impl JsonStreamBuffer {
65 pub fn new(max_buffer_size: usize) -> Self {
75 Self {
76 buffer: String::new(),
77 max_buffer_size,
78 }
79 }
80
81 pub fn push_chunk(
105 &mut self,
106 chunk: &str,
107 ) -> std::result::Result<Vec<Value>, CLIJSONDecodeError> {
108 let mut messages = Vec::new();
109
110 for line in chunk.split('\n') {
111 let line = line.trim();
112 if line.is_empty() {
113 continue;
114 }
115
116 self.buffer.push_str(line);
117 if self.buffer.len() > self.max_buffer_size {
118 let current_size = self.buffer.len();
119 self.buffer.clear();
120 return Err(CLIJSONDecodeError::new(
121 format!(
122 "JSON message exceeded maximum buffer size of {} bytes",
123 self.max_buffer_size
124 ),
125 format!(
126 "Buffer size {current_size} exceeds limit {}",
127 self.max_buffer_size
128 ),
129 ));
130 }
131
132 match serde_json::from_str::<Value>(&self.buffer) {
133 Ok(value) => {
134 messages.push(value);
135 self.buffer.clear();
136 }
137 Err(_) => {
138 }
140 }
141 }
142
143 Ok(messages)
144 }
145}
146
147pub struct SubprocessCliTransport {
160 pub prompt: Prompt,
162 pub options: ClaudeAgentOptions,
164 pub cli_path: String,
166 cwd: Option<PathBuf>,
167 child: Option<Child>,
168 stdout: Option<BufReader<ChildStdout>>,
169 stdin: Option<ChildStdin>,
170 ready: bool,
171 write_lock: Arc<Mutex<()>>,
172 parser: JsonStreamBuffer,
173 pending_messages: VecDeque<Value>,
174 stderr_task: Option<tokio::task::JoinHandle<()>>,
176 stderr_callback: Option<StderrCallback>,
178}
179
180impl SubprocessCliTransport {
181 pub fn new(prompt: Prompt, options: ClaudeAgentOptions) -> Result<Self> {
198 let cli_path = match &options.cli_path {
199 Some(path) => path.to_string_lossy().to_string(),
200 None => Self::find_cli()?,
201 };
202
203 let cwd = options.cwd.clone();
204 let max_buffer_size = options.max_buffer_size.unwrap_or(DEFAULT_MAX_BUFFER_SIZE);
205 let stderr_callback = options.stderr.clone();
206
207 Ok(Self {
208 prompt,
209 options,
210 cli_path,
211 cwd,
212 child: None,
213 stdout: None,
214 stdin: None,
215 ready: false,
216 write_lock: Arc::new(Mutex::new(())),
217 parser: JsonStreamBuffer::new(max_buffer_size),
218 pending_messages: VecDeque::new(),
219 stderr_task: None,
220 stderr_callback,
221 })
222 }
223
224 fn find_cli() -> std::result::Result<String, CLINotFoundError> {
226 if let Some(path) = Self::find_bundled_cli() {
227 return Ok(path);
228 }
229
230 if let Ok(path) = which::which("claude") {
231 return Ok(path.to_string_lossy().to_string());
232 }
233
234 let locations = vec![
235 PathBuf::from(format!(
236 "{}/.npm-global/bin/claude",
237 std::env::var("HOME").unwrap_or_default()
238 )),
239 PathBuf::from("/usr/local/bin/claude"),
240 PathBuf::from(format!(
241 "{}/.local/bin/claude",
242 std::env::var("HOME").unwrap_or_default()
243 )),
244 PathBuf::from(format!(
245 "{}/node_modules/.bin/claude",
246 std::env::var("HOME").unwrap_or_default()
247 )),
248 PathBuf::from(format!(
249 "{}/.yarn/bin/claude",
250 std::env::var("HOME").unwrap_or_default()
251 )),
252 PathBuf::from(format!(
253 "{}/.claude/local/claude",
254 std::env::var("HOME").unwrap_or_default()
255 )),
256 ];
257
258 for path in locations {
259 if path.exists() && path.is_file() {
260 return Ok(path.to_string_lossy().to_string());
261 }
262 }
263
264 Err(CLINotFoundError::new(
265 "Claude Code not found. Install with:\n npm install -g @anthropic-ai/claude-code\n\nIf already installed locally, try:\n export PATH=\"$HOME/node_modules/.bin:$PATH\"\n\nOr provide the path via ClaudeAgentOptions",
266 None,
267 ))
268 }
269
270 fn find_bundled_cli() -> Option<String> {
272 if let Ok(path) = std::env::var("CLAUDE_CODE_BUNDLED_CLI") {
273 let candidate = PathBuf::from(path);
274 if candidate.is_file() {
275 return Some(candidate.to_string_lossy().to_string());
276 }
277 }
278
279 let cli_name = if cfg!(windows) {
280 "claude.exe"
281 } else {
282 "claude"
283 };
284 let mut candidates = Vec::new();
285 if let Ok(current_exe) = std::env::current_exe()
286 && let Some(exe_dir) = current_exe.parent()
287 {
288 candidates.push(exe_dir.join("_bundled").join(cli_name));
289 candidates.push(exe_dir.join("..").join("_bundled").join(cli_name));
290 }
291
292 for candidate in candidates {
293 if candidate.is_file() {
294 return Some(candidate.to_string_lossy().to_string());
295 }
296 }
297 None
298 }
299
300 #[cfg(unix)]
304 fn resolve_user_to_uid(user: &str) -> Result<u32> {
305 if let Ok(uid) = user.parse::<u32>() {
307 return Ok(uid);
308 }
309
310 if user.as_bytes().contains(&0) {
311 return Err(Error::Other(format!(
312 "Invalid user name (contains null byte): {user}"
313 )));
314 }
315
316 let found = nix::unistd::User::from_name(user)
318 .map_err(|err| Error::Other(format!("Failed to resolve user '{user}': {err}")))?;
319 let entry = found.ok_or_else(|| Error::Other(format!("User not found: {user}")))?;
320 Ok(entry.uid.as_raw())
321 }
322
323 fn parse_semver_prefix(version: &str) -> Option<[u32; 3]> {
324 let token = version.split_whitespace().next().unwrap_or_default();
325 let parsed = Version::parse(token).ok()?;
326 Some([
327 u32::try_from(parsed.major).ok()?,
328 u32::try_from(parsed.minor).ok()?,
329 u32::try_from(parsed.patch).ok()?,
330 ])
331 }
332
333 async fn check_claude_version(&self) {
334 if std::env::var("CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK").is_ok() {
335 return;
336 }
337
338 let mut command = Command::new(&self.cli_path);
339 command.arg("-v");
340 command.stdout(Stdio::piped());
341 command.stderr(Stdio::null());
342
343 let output = tokio::time::timeout(Duration::from_secs(2), command.output()).await;
344 let Ok(Ok(output)) = output else {
345 return;
346 };
347 if !output.status.success() {
348 return;
349 }
350
351 let version_output = String::from_utf8_lossy(&output.stdout).trim().to_string();
352 let Some(version) = Self::parse_semver_prefix(&version_output) else {
353 return;
354 };
355 let Some(minimum) = Self::parse_semver_prefix(MINIMUM_CLAUDE_CODE_VERSION) else {
356 return;
357 };
358
359 if version < minimum {
360 eprintln!(
361 "Warning: Claude Code version {} is unsupported in the Agent SDK. Minimum required version is {}. Some features may not work correctly.",
362 version_output, MINIMUM_CLAUDE_CODE_VERSION
363 );
364 }
365 }
366
367 fn permission_mode_to_string(mode: &PermissionMode) -> &'static str {
369 match mode {
370 PermissionMode::Default => "default",
371 PermissionMode::AcceptEdits => "acceptEdits",
372 PermissionMode::Plan => "plan",
373 PermissionMode::BypassPermissions => "bypassPermissions",
374 }
375 }
376
377 fn setting_source_to_string(source: &SettingSource) -> &'static str {
379 match source {
380 SettingSource::User => "user",
381 SettingSource::Project => "project",
382 SettingSource::Local => "local",
383 }
384 }
385
386 fn parse_settings_object(
387 settings: &str,
388 ) -> std::result::Result<serde_json::Map<String, Value>, String> {
389 let settings_str = settings.trim();
390
391 if settings_str.starts_with('{') && settings_str.ends_with('}') {
392 let parsed: Value = serde_json::from_str(settings_str)
393 .map_err(|err| format!("Invalid settings JSON: {err}"))?;
394 return match parsed {
395 Value::Object(obj) => Ok(obj),
396 _ => Err("Settings JSON must be an object".to_string()),
397 };
398 }
399
400 let path = Path::new(settings_str);
401 if !path.exists() {
402 return Err(format!("Settings file does not exist: {settings_str}"));
403 }
404
405 let content = std::fs::read_to_string(path)
406 .map_err(|err| format!("Failed to read settings file '{settings_str}': {err}"))?;
407 let parsed: Value = serde_json::from_str(&content)
408 .map_err(|err| format!("Invalid JSON in settings file '{settings_str}': {err}"))?;
409 match parsed {
410 Value::Object(obj) => Ok(obj),
411 _ => Err(format!(
412 "Settings file '{settings_str}' must contain a JSON object"
413 )),
414 }
415 }
416
417 fn build_settings_value(&self) -> Result<Option<String>> {
419 let has_settings = self.options.settings.is_some();
420 let has_sandbox = self.options.sandbox.is_some();
421
422 if !has_settings && !has_sandbox {
423 return Ok(None);
424 }
425
426 if has_settings && !has_sandbox {
427 return Ok(self.options.settings.clone());
428 }
429
430 let mut settings_obj = serde_json::Map::new();
431
432 if let Some(settings) = &self.options.settings {
433 match Self::parse_settings_object(settings) {
434 Ok(obj) => {
435 settings_obj = obj;
436 }
437 Err(err) => {
438 tracing::warn!(
439 "Failed to merge settings into sandbox config: {err}. Falling back to sandbox-only settings."
440 );
441 if self.options.strict_settings_merge {
442 return Err(Error::Other(format!(
443 "Failed to merge settings into sandbox config: {err}"
444 )));
445 }
446 }
447 }
448 }
449
450 if let Some(sandbox) = &self.options.sandbox {
451 settings_obj.insert(
452 "sandbox".to_string(),
453 serde_json::to_value(sandbox).unwrap_or(Value::Null),
454 );
455 }
456
457 Ok(Some(Value::Object(settings_obj).to_string()))
458 }
459
460 pub fn build_command(&self) -> Result<Vec<String>> {
478 let mut cmd = vec![
479 self.cli_path.clone(),
480 "--output-format".to_string(),
481 "stream-json".to_string(),
482 "--verbose".to_string(),
483 ];
484
485 match &self.options.system_prompt {
486 None => {
487 cmd.push("--system-prompt".to_string());
488 cmd.push(String::new());
489 }
490 Some(SystemPrompt::Text(prompt)) => {
491 cmd.push("--system-prompt".to_string());
492 cmd.push(prompt.clone());
493 }
494 Some(SystemPrompt::Preset(preset)) => {
495 if let Some(append) = &preset.append {
496 cmd.push("--append-system-prompt".to_string());
497 cmd.push(append.clone());
498 }
499 }
500 }
501
502 if let Some(tools) = &self.options.tools {
503 match tools {
504 ToolsOption::List(list) => {
505 cmd.push("--tools".to_string());
506 if list.is_empty() {
507 cmd.push(String::new());
508 } else {
509 cmd.push(list.join(","));
510 }
511 }
512 ToolsOption::Preset(_) => {
513 cmd.push("--tools".to_string());
514 cmd.push("default".to_string());
515 }
516 }
517 }
518
519 if !self.options.allowed_tools.is_empty() {
520 cmd.push("--allowedTools".to_string());
521 cmd.push(self.options.allowed_tools.join(","));
522 }
523
524 if let Some(max_turns) = self.options.max_turns {
525 cmd.push("--max-turns".to_string());
526 cmd.push(max_turns.to_string());
527 }
528
529 if let Some(max_budget) = self.options.max_budget_usd {
530 cmd.push("--max-budget-usd".to_string());
531 cmd.push(max_budget.to_string());
532 }
533
534 if !self.options.disallowed_tools.is_empty() {
535 cmd.push("--disallowedTools".to_string());
536 cmd.push(self.options.disallowed_tools.join(","));
537 }
538
539 if let Some(model) = &self.options.model {
540 cmd.push("--model".to_string());
541 cmd.push(model.clone());
542 }
543
544 if let Some(model) = &self.options.fallback_model {
545 cmd.push("--fallback-model".to_string());
546 cmd.push(model.clone());
547 }
548
549 if !self.options.betas.is_empty() {
550 cmd.push("--betas".to_string());
551 cmd.push(self.options.betas.join(","));
552 }
553
554 if let Some(tool_name) = &self.options.permission_prompt_tool_name {
555 cmd.push("--permission-prompt-tool".to_string());
556 cmd.push(tool_name.clone());
557 }
558
559 if let Some(mode) = &self.options.permission_mode {
560 cmd.push("--permission-mode".to_string());
561 cmd.push(Self::permission_mode_to_string(mode).to_string());
562 }
563
564 if self.options.continue_conversation {
565 cmd.push("--continue".to_string());
566 }
567
568 if let Some(resume) = &self.options.resume {
569 cmd.push("--resume".to_string());
570 cmd.push(resume.clone());
571 }
572
573 if let Some(settings) = self.build_settings_value()? {
574 cmd.push("--settings".to_string());
575 cmd.push(settings);
576 }
577
578 for directory in &self.options.add_dirs {
579 cmd.push("--add-dir".to_string());
580 cmd.push(directory.to_string_lossy().to_string());
581 }
582
583 match &self.options.mcp_servers {
584 McpServersOption::Servers(servers) => {
585 let mut cli_servers = HashMap::new();
586 for (name, config) in servers {
587 cli_servers.insert(name.clone(), config.to_cli_json());
588 }
589 if !cli_servers.is_empty() {
590 cmd.push("--mcp-config".to_string());
591 cmd.push(json!({ "mcpServers": cli_servers }).to_string());
592 }
593 }
594 McpServersOption::Raw(raw) => {
595 cmd.push("--mcp-config".to_string());
596 cmd.push(raw.clone());
597 }
598 McpServersOption::None => {}
599 }
600
601 if self.options.include_partial_messages {
602 cmd.push("--include-partial-messages".to_string());
603 }
604
605 if self.options.fork_session {
606 cmd.push("--fork-session".to_string());
607 }
608
609 let setting_sources = self
610 .options
611 .setting_sources
612 .as_ref()
613 .map(|sources| {
614 sources
615 .iter()
616 .map(Self::setting_source_to_string)
617 .collect::<Vec<_>>()
618 .join(",")
619 })
620 .unwrap_or_default();
621 cmd.push("--setting-sources".to_string());
622 cmd.push(setting_sources);
623
624 for plugin in &self.options.plugins {
625 if plugin.type_ != "local" {
626 return Err(Error::Other(format!(
627 "Unsupported plugin type: {}",
628 plugin.type_
629 )));
630 }
631 cmd.push("--plugin-dir".to_string());
632 cmd.push(plugin.path.clone());
633 }
634
635 for (flag, value) in &self.options.extra_args {
636 if let Some(v) = value {
637 cmd.push(format!("--{flag}"));
638 cmd.push(v.clone());
639 } else {
640 cmd.push(format!("--{flag}"));
641 }
642 }
643
644 let mut resolved_max_thinking_tokens = self.options.max_thinking_tokens;
645 if let Some(thinking) = &self.options.thinking {
646 match thinking {
647 ThinkingConfig::Adaptive => {
648 if resolved_max_thinking_tokens.is_none() {
649 resolved_max_thinking_tokens = Some(32_000);
650 }
651 }
652 ThinkingConfig::Enabled { budget_tokens } => {
653 resolved_max_thinking_tokens = Some(*budget_tokens);
654 }
655 ThinkingConfig::Disabled => {
656 resolved_max_thinking_tokens = Some(0);
657 }
658 }
659 }
660
661 if let Some(tokens) = resolved_max_thinking_tokens {
662 cmd.push("--max-thinking-tokens".to_string());
663 cmd.push(tokens.to_string());
664 }
665
666 if let Some(effort) = &self.options.effort {
667 cmd.push("--effort".to_string());
668 cmd.push(effort.clone());
669 }
670
671 if let Some(Value::Object(output_format)) = &self.options.output_format
672 && output_format.get("type").and_then(Value::as_str) == Some("json_schema")
673 && let Some(schema) = output_format.get("schema")
674 {
675 cmd.push("--json-schema".to_string());
676 cmd.push(schema.to_string());
677 }
678
679 cmd.push("--input-format".to_string());
680 cmd.push("stream-json".to_string());
681
682 Ok(cmd)
683 }
684}
685
686#[async_trait]
687impl Transport for SubprocessCliTransport {
688 async fn connect(&mut self) -> Result<()> {
689 if self.child.is_some() {
690 return Ok(());
691 }
692
693 self.check_claude_version().await;
694
695 if let Some(cwd) = &self.cwd
696 && !cwd.exists()
697 {
698 return Err(CLIConnectionError::new(format!(
699 "Working directory does not exist: {}",
700 cwd.to_string_lossy()
701 ))
702 .into());
703 }
704
705 let cmd = self.build_command()?;
706 let mut command = Command::new(&cmd[0]);
707 command.args(&cmd[1..]);
708 command.stdin(Stdio::piped());
709 command.stdout(Stdio::piped());
710 command.stderr(Stdio::piped());
711 if let Some(cwd) = &self.cwd {
712 command.current_dir(cwd);
713 command.env("PWD", cwd.to_string_lossy().to_string());
714 }
715
716 command.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
717 command.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
718 if self.options.enable_file_checkpointing {
719 command.env("CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", "true");
720 }
721 for (key, value) in &self.options.env {
722 command.env(key, value);
723 }
724
725 #[cfg(unix)]
727 if let Some(user) = &self.options.user {
728 let uid = Self::resolve_user_to_uid(user)?;
729 command.uid(uid);
730 }
731
732 let mut child = command.spawn().map_err(|e| {
733 if e.kind() == std::io::ErrorKind::NotFound {
734 Error::CLINotFound(CLINotFoundError::new(
735 "Claude Code not found",
736 Some(self.cli_path.clone()),
737 ))
738 } else {
739 Error::CLIConnection(CLIConnectionError::new(format!(
740 "Failed to start Claude Code: {e}"
741 )))
742 }
743 })?;
744
745 let stdout = child.stdout.take().ok_or_else(|| {
746 Error::CLIConnection(CLIConnectionError::new(
747 "Failed to open stdout for Claude process",
748 ))
749 })?;
750
751 self.stdin = child.stdin.take();
752 self.stdout = Some(BufReader::new(stdout));
753
754 if let Some(stderr) = child.stderr.take() {
757 let callback = self.stderr_callback.clone();
758 self.stderr_task = Some(tokio::spawn(async move {
759 let mut reader = BufReader::new(stderr);
760 let mut line = String::new();
761 loop {
762 line.clear();
763 match reader.read_line(&mut line).await {
764 Ok(0) => break, Ok(_) => {
766 let trimmed = line.trim_end().to_string();
767 if !trimmed.is_empty() {
768 if let Some(cb) = &callback {
769 let callback_result =
770 panic::catch_unwind(AssertUnwindSafe(|| cb(trimmed)));
771 if callback_result.is_err() {
772 warn!("stderr callback panicked; continuing stderr drain");
773 }
774 }
775 }
776 }
777 Err(_) => break,
778 }
779 }
780 }));
781 }
782
783 self.child = Some(child);
784 self.ready = true;
785 Ok(())
786 }
787
788 async fn write(&mut self, data: &str) -> Result<()> {
789 let _guard = self.write_lock.lock().await;
790
791 if !self.ready {
792 return Err(
793 CLIConnectionError::new("ProcessTransport is not ready for writing").into(),
794 );
795 }
796
797 if let Some(child) = &mut self.child
798 && let Ok(Some(status)) = child.try_wait()
799 {
800 return Err(CLIConnectionError::new(format!(
801 "Cannot write to terminated process (exit code: {:?})",
802 status.code()
803 ))
804 .into());
805 }
806
807 let stdin = self.stdin.as_mut().ok_or_else(|| {
808 Error::CLIConnection(CLIConnectionError::new(
809 "ProcessTransport is not ready for writing",
810 ))
811 })?;
812
813 stdin.write_all(data.as_bytes()).await.map_err(|e| {
814 Error::CLIConnection(CLIConnectionError::new(format!(
815 "Failed to write to process stdin: {e}"
816 )))
817 })?;
818 stdin.flush().await.map_err(|e| {
819 Error::CLIConnection(CLIConnectionError::new(format!(
820 "Failed to flush process stdin: {e}"
821 )))
822 })?;
823
824 Ok(())
825 }
826
827 async fn end_input(&mut self) -> Result<()> {
828 let _guard = self.write_lock.lock().await;
829 self.stdin.take();
830 Ok(())
831 }
832
833 async fn read_next_message(&mut self) -> Result<Option<Value>> {
834 if let Some(message) = self.pending_messages.pop_front() {
835 return Ok(Some(message));
836 }
837
838 if self.child.is_none() || self.stdout.is_none() {
839 return Err(CLIConnectionError::new("Not connected").into());
840 }
841
842 let stdout = self.stdout.as_mut().expect("checked is_some");
843
844 loop {
845 let mut line = String::new();
846 let bytes_read = stdout.read_line(&mut line).await?;
847 if bytes_read == 0 {
848 break;
849 }
850
851 let parsed = self.parser.push_chunk(&line)?;
852 for message in parsed {
853 self.pending_messages.push_back(message);
854 }
855 if let Some(message) = self.pending_messages.pop_front() {
856 return Ok(Some(message));
857 }
858 }
859
860 self.ready = false;
861 if let Some(child) = &mut self.child {
862 let status = child.wait().await.map_err(|e| {
863 Error::Process(ProcessError::new(
864 format!("Failed to wait for process completion: {e}"),
865 None,
866 None,
867 ))
868 })?;
869 if !status.success() {
870 return Err(ProcessError::new(
871 "Command failed",
872 status.code(),
873 Some("Check stderr output for details".to_string()),
874 )
875 .into());
876 }
877 }
878 Ok(None)
879 }
880
881 async fn close(&mut self) -> Result<()> {
882 self.ready = false;
883 self.stdin.take();
884 self.stdout.take();
885 if let Some(child) = &mut self.child
886 && child.try_wait()?.is_none()
887 {
888 let _ = child.kill().await;
889 let _ = child.wait().await;
890 }
891 self.child = None;
892 if let Some(task) = self.stderr_task.take() {
894 task.abort();
895 }
896 Ok(())
897 }
898
899 fn is_ready(&self) -> bool {
900 self.ready
901 }
902
903 fn into_split(mut self: Box<Self>) -> super::TransportSplitResult {
904 if !self.ready {
905 return Err(
906 CLIConnectionError::new("Cannot split a transport that is not connected").into(),
907 );
908 }
909
910 let stdout = self.stdout.take().ok_or_else(|| {
911 Error::CLIConnection(CLIConnectionError::new(
912 "Cannot split: stdout not available",
913 ))
914 })?;
915
916 let stdin = self.stdin.take();
917
918 let close_state = Arc::new(SubprocessCloseState {
919 child: Mutex::new(self.child.take()),
920 stderr_task: Mutex::new(self.stderr_task.take()),
921 });
922
923 let reader = SubprocessReader {
924 stdout,
925 parser: self.parser.clone(),
926 pending_messages: std::mem::take(&mut self.pending_messages),
927 close_state: close_state.clone(),
928 };
929
930 let writer = SubprocessWriter {
931 stdin: Mutex::new(stdin),
932 write_lock: self.write_lock.clone(),
933 };
934
935 Ok((
936 Box::new(reader),
937 Box::new(writer),
938 Box::new(SubprocessCloseHandle { state: close_state }),
939 ))
940 }
941}
942
943struct SubprocessCloseState {
945 child: Mutex<Option<Child>>,
946 stderr_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
947}
948
949pub struct SubprocessReader {
953 stdout: BufReader<ChildStdout>,
954 parser: JsonStreamBuffer,
955 pending_messages: VecDeque<Value>,
956 close_state: Arc<SubprocessCloseState>,
957}
958
959#[async_trait]
960impl TransportReader for SubprocessReader {
961 async fn read_next_message(&mut self) -> Result<Option<Value>> {
962 if let Some(message) = self.pending_messages.pop_front() {
963 return Ok(Some(message));
964 }
965
966 loop {
967 let mut line = String::new();
968 let bytes_read = self.stdout.read_line(&mut line).await?;
969 if bytes_read == 0 {
970 break;
971 }
972
973 let parsed = self.parser.push_chunk(&line)?;
974 for message in parsed {
975 self.pending_messages.push_back(message);
976 }
977 if let Some(message) = self.pending_messages.pop_front() {
978 return Ok(Some(message));
979 }
980 }
981
982 if let Some(child) = &mut *self.close_state.child.lock().await {
984 let status = child.wait().await.map_err(|e| {
985 Error::Process(ProcessError::new(
986 format!("Failed to wait for process completion: {e}"),
987 None,
988 None,
989 ))
990 })?;
991 if !status.success() {
992 return Err(ProcessError::new(
993 "Command failed",
994 status.code(),
995 Some("Check stderr output for details".to_string()),
996 )
997 .into());
998 }
999 }
1000 Ok(None)
1001 }
1002}
1003
1004pub struct SubprocessWriter {
1008 stdin: Mutex<Option<ChildStdin>>,
1009 write_lock: Arc<Mutex<()>>,
1010}
1011
1012#[async_trait]
1013impl TransportWriter for SubprocessWriter {
1014 async fn write(&mut self, data: &str) -> Result<()> {
1015 let _guard = self.write_lock.lock().await;
1016
1017 let mut stdin_guard = self.stdin.lock().await;
1018 let stdin = stdin_guard
1019 .as_mut()
1020 .ok_or_else(|| Error::CLIConnection(CLIConnectionError::new("stdin is closed")))?;
1021
1022 stdin.write_all(data.as_bytes()).await.map_err(|e| {
1023 Error::CLIConnection(CLIConnectionError::new(format!(
1024 "Failed to write to process stdin: {e}"
1025 )))
1026 })?;
1027 stdin.flush().await.map_err(|e| {
1028 Error::CLIConnection(CLIConnectionError::new(format!(
1029 "Failed to flush process stdin: {e}"
1030 )))
1031 })?;
1032
1033 Ok(())
1034 }
1035
1036 async fn end_input(&mut self) -> Result<()> {
1037 let _guard = self.write_lock.lock().await;
1038 self.stdin.lock().await.take();
1039 Ok(())
1040 }
1041}
1042
1043struct SubprocessCloseHandle {
1045 state: Arc<SubprocessCloseState>,
1046}
1047
1048#[async_trait]
1049impl TransportCloseHandle for SubprocessCloseHandle {
1050 async fn close(&self) -> Result<()> {
1051 if let Some(child) = &mut *self.state.child.lock().await {
1053 if child.try_wait()?.is_none() {
1054 let _ = child.kill().await;
1055 let _ = child.wait().await;
1056 }
1057 }
1058 *self.state.child.lock().await = None;
1059
1060 if let Some(task) = self.state.stderr_task.lock().await.take() {
1061 task.abort();
1062 }
1063 Ok(())
1064 }
1065}
1066
1067impl Drop for SubprocessCloseHandle {
1068 fn drop(&mut self) {
1069 if let Ok(mut child_guard) = self.state.child.try_lock() {
1073 if let Some(child) = child_guard.as_mut() {
1074 if child.try_wait().ok().flatten().is_none() {
1075 let _ = child.start_kill();
1076 }
1077 }
1078 }
1079 if let Ok(mut task_guard) = self.state.stderr_task.try_lock() {
1080 if let Some(task) = task_guard.take() {
1081 task.abort();
1082 }
1083 }
1084 }
1085}
1086
1087impl Drop for SubprocessCliTransport {
1088 fn drop(&mut self) {
1089 self.ready = false;
1090 self.stdin.take();
1091 self.stdout.take();
1092
1093 if let Some(child) = &mut self.child
1094 && child.try_wait().ok().flatten().is_none()
1095 {
1096 let _ = child.start_kill();
1097 }
1098 self.child = None;
1099
1100 if let Some(task) = self.stderr_task.take() {
1101 task.abort();
1102 }
1103 }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108 use super::SubprocessCliTransport;
1109
1110 #[test]
1111 fn parse_semver_prefix_supports_plain_version() {
1112 assert_eq!(
1113 SubprocessCliTransport::parse_semver_prefix("2.4.1"),
1114 Some([2, 4, 1])
1115 );
1116 }
1117
1118 #[test]
1119 fn parse_semver_prefix_supports_prefixed_version() {
1120 assert_eq!(
1121 SubprocessCliTransport::parse_semver_prefix("2.4.1-beta.1"),
1122 Some([2, 4, 1])
1123 );
1124 }
1125
1126 #[test]
1127 fn parse_semver_prefix_supports_trailing_text_after_whitespace() {
1128 assert_eq!(
1129 SubprocessCliTransport::parse_semver_prefix("2.4.1 (stable channel)"),
1130 Some([2, 4, 1])
1131 );
1132 }
1133
1134 #[test]
1135 fn parse_semver_prefix_rejects_invalid_version() {
1136 assert_eq!(SubprocessCliTransport::parse_semver_prefix("invalid"), None);
1137 }
1138
1139 #[cfg(unix)]
1140 #[test]
1141 fn resolve_user_to_uid_accepts_numeric_uid() {
1142 let uid = nix::unistd::Uid::current().as_raw();
1143 let resolved = SubprocessCliTransport::resolve_user_to_uid(&uid.to_string())
1144 .expect("resolve numeric uid");
1145 assert_eq!(resolved, uid);
1146 }
1147
1148 #[cfg(unix)]
1149 #[test]
1150 fn resolve_user_to_uid_accepts_current_username() {
1151 let current_uid = nix::unistd::Uid::current();
1152 let user = nix::unistd::User::from_uid(current_uid)
1153 .expect("lookup current uid")
1154 .expect("current uid should map to a user");
1155 let resolved = SubprocessCliTransport::resolve_user_to_uid(&user.name)
1156 .expect("resolve current username");
1157 assert_eq!(resolved, current_uid.as_raw());
1158 }
1159
1160 #[cfg(unix)]
1161 #[test]
1162 fn resolve_user_to_uid_rejects_unknown_user() {
1163 let user = format!("__claude_code_sdk_nonexistent_{}__", std::process::id());
1164 let err = SubprocessCliTransport::resolve_user_to_uid(&user).expect_err("must fail");
1165 assert!(err.to_string().contains("User not found"));
1166 }
1167
1168 #[cfg(unix)]
1169 #[test]
1170 fn resolve_user_to_uid_rejects_null_byte_in_username() {
1171 let err =
1172 SubprocessCliTransport::resolve_user_to_uid("name\0with-null").expect_err("must fail");
1173 assert!(err.to_string().contains("Invalid user name"));
1174 }
1175}