1use super::{InputMessage, Transport, TransportState};
6use crate::{
7 errors::{Result, SdkError},
8 types::{ClaudeCodeOptions, ControlRequest, ControlResponse, Message, PermissionMode},
9};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::process::Stdio;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::mpsc;
18use tracing::{debug, error, info, warn};
19
/// Default capacity for the stdin, control and message channels, used when
/// `cli_channel_buffer_size` is not set in the options.
const CHANNEL_BUFFER_SIZE: usize = 100;

/// Minimum Claude CLI version as `(major, minor, patch)`; the version check
/// only logs a warning when the installed CLI is older than this.
const MIN_CLI_VERSION: (u32, u32, u32) = (2, 0, 0);
25
/// Minimal `major.minor.patch` version used to compare CLI releases.
///
/// The derived ordering compares fields in declaration order, i.e. by major,
/// then minor, then patch.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SemVer {
    major: u32,
    minor: u32,
    patch: u32,
}

impl SemVer {
    /// Builds a version from its three numeric components.
    fn new(major: u32, minor: u32, patch: u32) -> Self {
        Self {
            major,
            minor,
            patch,
        }
    }

    /// Parses a version out of `--version`-style output.
    ///
    /// Accepted shapes include `2.0.0`, `v2.1.3`, `2.1` (patch defaults to 0),
    /// `@scope/pkg/2.5.1`, `pkg/v2.1.4`, `2.0.5 (Claude Code)` and
    /// `2.3.7-beta.1`. Returns `None` when the major or minor component is
    /// missing or not a plain number.
    fn parse(version: &str) -> Option<Self> {
        // Only the first whitespace-separated token carries the version; a
        // trailing label such as "(Claude Code)" must not leak into the patch.
        let token = version.split_whitespace().next()?;

        // Drop a package-style prefix ("@scope/name/") before stripping `v`,
        // so that "name/v2.1.4" is handled as well.
        let token = token.rsplit('/').next().unwrap_or(token);
        let token = token.trim_start_matches('v');

        let mut parts = token.split('.');
        let major = parts.next()?.parse().ok()?;
        let minor = parts.next()?.parse().ok()?;

        // Keep only the leading digits of the patch component so pre-release
        // or build suffixes ("0-beta", "3+build") do not reset it to zero.
        let patch = parts
            .next()
            .map(|raw| {
                let digits_end = raw
                    .find(|c: char| !c.is_ascii_digit())
                    .unwrap_or(raw.len());
                &raw[..digits_end]
            })
            .and_then(|digits| digits.parse().ok())
            .unwrap_or(0);

        Some(Self {
            major,
            minor,
            patch,
        })
    }
}
66
/// Transport that talks to the Claude CLI through a spawned child process,
/// exchanging newline-delimited JSON over its stdin/stdout.
pub struct SubprocessTransport {
    /// Options used to build the CLI command line and environment.
    options: ClaudeCodeOptions,
    /// Path of the CLI executable that gets spawned.
    cli_path: PathBuf,
    /// Handle of the spawned CLI process; `None` until a process is spawned.
    child: Option<Child>,
    /// Sender feeding lines to the task that writes to the child's stdin.
    stdin_tx: Option<mpsc::Sender<String>>,
    /// Broadcast sender on which parsed CLI messages are published.
    message_broadcast_tx: Option<tokio::sync::broadcast::Sender<Message>>,
    /// Receiver for control responses extracted from the child's stdout.
    control_rx: Option<mpsc::Receiver<ControlResponse>>,
    /// Receiver for raw control JSON forwarded from the child's stdout.
    sdk_control_rx: Option<mpsc::Receiver<serde_json::Value>>,
    /// Current connection state.
    state: TransportState,
    /// Incremented each time a control request is sent.
    request_counter: u64,
    // The flag is stored by the constructors and the setter but is not read in
    // this part of the file, hence the lint allowance.
    #[allow(dead_code)]
    close_stdin_after_prompt: bool,
}
91
92impl SubprocessTransport {
93 pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
95 let cli_path = find_claude_cli()?;
96 Ok(Self {
97 options,
98 cli_path,
99 child: None,
100 stdin_tx: None,
101 message_broadcast_tx: None,
102 control_rx: None,
103 sdk_control_rx: None,
104 state: TransportState::Disconnected,
105 request_counter: 0,
106 close_stdin_after_prompt: false,
107 })
108 }
109
110 pub async fn new_async(options: ClaudeCodeOptions) -> Result<Self> {
115 let cli_path = match find_claude_cli() {
116 Ok(path) => path,
117 Err(_) if options.auto_download_cli => {
118 info!("Claude CLI not found, attempting automatic download...");
119 crate::cli_download::download_cli(None, None).await?
120 }
121 Err(e) => return Err(e),
122 };
123
124 Ok(Self {
125 options,
126 cli_path,
127 child: None,
128 stdin_tx: None,
129 message_broadcast_tx: None,
130 control_rx: None,
131 sdk_control_rx: None,
132 state: TransportState::Disconnected,
133 request_counter: 0,
134 close_stdin_after_prompt: false,
135 })
136 }
137
138 fn build_settings_value(&self) -> Option<String> {
139 let has_settings = self.options.settings.is_some();
140 let has_sandbox = self.options.sandbox.is_some();
141
142 if !has_settings && !has_sandbox {
143 return None;
144 }
145
146 if has_settings && !has_sandbox {
148 return self.options.settings.clone();
149 }
150
151 let mut settings_obj = serde_json::Map::new();
153
154 if let Some(ref settings) = self.options.settings {
155 let settings_str = settings.trim();
156
157 let load_as_json_string =
158 |s: &str| -> Option<serde_json::Map<String, serde_json::Value>> {
159 match serde_json::from_str::<serde_json::Value>(s) {
160 Ok(serde_json::Value::Object(map)) => Some(map),
161 Ok(_) => {
162 warn!("Settings JSON must be an object; ignoring provided JSON settings");
163 None
164 }
165 Err(_) => None,
166 }
167 };
168
169 let load_from_file =
170 |path: &Path| -> Option<serde_json::Map<String, serde_json::Value>> {
171 let content = std::fs::read_to_string(path).ok()?;
172 match serde_json::from_str::<serde_json::Value>(&content) {
173 Ok(serde_json::Value::Object(map)) => Some(map),
174 Ok(_) => {
175 warn!("Settings file JSON must be an object: {}", path.display());
176 None
177 }
178 Err(e) => {
179 warn!("Failed to parse settings file {}: {}", path.display(), e);
180 None
181 }
182 }
183 };
184
185 if settings_str.starts_with('{') && settings_str.ends_with('}') {
186 if let Some(map) = load_as_json_string(settings_str) {
187 settings_obj = map;
188 } else {
189 warn!(
190 "Failed to parse settings as JSON, treating as file path: {}",
191 settings_str
192 );
193 let settings_path = Path::new(settings_str);
194 if settings_path.exists() {
195 if let Some(map) = load_from_file(settings_path) {
196 settings_obj = map;
197 }
198 } else {
199 warn!("Settings file not found: {}", settings_path.display());
200 }
201 }
202 } else {
203 let settings_path = Path::new(settings_str);
204 if settings_path.exists() {
205 if let Some(map) = load_from_file(settings_path) {
206 settings_obj = map;
207 }
208 } else {
209 warn!("Settings file not found: {}", settings_path.display());
210 }
211 }
212 }
213
214 if let Some(ref sandbox) = self.options.sandbox {
215 match serde_json::to_value(sandbox) {
216 Ok(value) => {
217 settings_obj.insert("sandbox".to_string(), value);
218 }
219 Err(e) => {
220 warn!("Failed to serialize sandbox settings: {}", e);
221 }
222 }
223 }
224
225 Some(serde_json::Value::Object(settings_obj).to_string())
226 }
227
228 pub fn subscribe_messages(&self) -> Option<Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>> {
230 self.message_broadcast_tx.as_ref().map(|tx| {
231 let rx = tx.subscribe();
232 Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
233 |result| async move {
234 match result {
235 Ok(msg) => Some(Ok(msg)),
236 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
237 warn!("Receiver lagged by {} messages", n);
238 None
239 }
240 }
241 },
242 )) as Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>
243 })
244 }
245
246 #[allow(dead_code)]
248 pub async fn receive_sdk_control_request(&mut self) -> Option<serde_json::Value> {
249 if let Some(ref mut rx) = self.sdk_control_rx {
250 rx.recv().await
251 } else {
252 None
253 }
254 }
255
/// Moves the SDK control receiver out of the transport, leaving `None`
/// behind; subsequent calls return `None`.
pub fn take_sdk_control_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
    self.sdk_control_rx.take()
}
260
261 pub fn with_cli_path(options: ClaudeCodeOptions, cli_path: impl Into<PathBuf>) -> Self {
263 Self {
264 options,
265 cli_path: cli_path.into(),
266 child: None,
267 stdin_tx: None,
268 message_broadcast_tx: None,
269 control_rx: None,
270 sdk_control_rx: None,
271 state: TransportState::Disconnected,
272 request_counter: 0,
273 close_stdin_after_prompt: false,
274 }
275 }
276
/// Stores the `close_stdin_after_prompt` flag on the transport.
// Not called from this file, hence the lint allowance.
#[allow(dead_code)]
pub fn set_close_stdin_after_prompt(&mut self, close: bool) {
    self.close_stdin_after_prompt = close;
}
282
283 #[allow(dead_code)]
285 pub fn for_print_mode(options: ClaudeCodeOptions, _prompt: String) -> Result<Self> {
286 let cli_path = find_claude_cli()?;
287 Ok(Self {
288 options,
289 cli_path,
290 child: None,
291 stdin_tx: None,
292 message_broadcast_tx: None,
293 control_rx: None,
294 sdk_control_rx: None,
295 state: TransportState::Disconnected,
296 request_counter: 0,
297 close_stdin_after_prompt: true,
298 })
299 }
300
301 fn build_command(&self) -> Command {
303 let mut cmd = Command::new(&self.cli_path);
304
305 cmd.arg("--output-format").arg("stream-json");
307 cmd.arg("--verbose");
308
309 cmd.arg("--input-format").arg("stream-json");
311
312 if self.options.include_partial_messages {
314 cmd.arg("--include-partial-messages");
315 }
316
317 if self.options.debug_stderr.is_some() {
319 cmd.arg("--debug-to-stderr");
320 }
321
322 if let Some(max_tokens) = self.options.max_output_tokens {
325 let capped = max_tokens.clamp(1, 32000);
327 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
328 debug!("Setting max_output_tokens from option: {}", capped);
329 } else {
330 if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
332 if let Ok(tokens) = current_value.parse::<u32>() {
333 if tokens > 32000 {
334 warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
335 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
336 }
337 } else {
339 warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
341 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
342 }
343 }
344 }
345
346 if let Some(ref prompt_v2) = self.options.system_prompt_v2 {
350 match prompt_v2 {
351 crate::types::SystemPrompt::String(s) => {
352 cmd.arg("--system-prompt").arg(s);
353 }
354 crate::types::SystemPrompt::Preset { append, .. } => {
355 if let Some(append_text) = append {
358 cmd.arg("--append-system-prompt").arg(append_text);
359 }
360 }
361 }
362 } else {
363 #[allow(deprecated)]
365 match self.options.system_prompt.as_deref() {
366 Some(prompt) => {
367 cmd.arg("--system-prompt").arg(prompt);
368 }
369 None => {
370 cmd.arg("--system-prompt").arg("");
371 }
372 }
373 #[allow(deprecated)]
374 if let Some(ref prompt) = self.options.append_system_prompt {
375 cmd.arg("--append-system-prompt").arg(prompt);
376 }
377 }
378
379 if !self.options.allowed_tools.is_empty() {
381 cmd.arg("--allowedTools")
382 .arg(self.options.allowed_tools.join(","));
383 }
384 if !self.options.disallowed_tools.is_empty() {
385 cmd.arg("--disallowedTools")
386 .arg(self.options.disallowed_tools.join(","));
387 }
388
389 match self.options.permission_mode {
391 PermissionMode::Default => {
392 cmd.arg("--permission-mode").arg("default");
393 }
394 PermissionMode::AcceptEdits => {
395 cmd.arg("--permission-mode").arg("acceptEdits");
396 }
397 PermissionMode::Plan => {
398 cmd.arg("--permission-mode").arg("plan");
399 }
400 PermissionMode::BypassPermissions => {
401 cmd.arg("--permission-mode").arg("bypassPermissions");
402 }
403 PermissionMode::DontAsk => {
404 cmd.arg("--permission-mode").arg("dontAsk");
405 }
406 }
407
408 if let Some(ref model) = self.options.model {
410 cmd.arg("--model").arg(model);
411 }
412
413 if let Some(ref tool_name) = self.options.permission_prompt_tool_name {
415 cmd.arg("--permission-prompt-tool").arg(tool_name);
416 }
417
418 if let Some(max_turns) = self.options.max_turns {
420 cmd.arg("--max-turns").arg(max_turns.to_string());
421 }
422
423 if let Some(ref thinking) = self.options.thinking {
425 match thinking {
426 crate::types::ThinkingConfig::Enabled { budget_tokens } => {
427 cmd.arg("--max-thinking-tokens")
428 .arg(budget_tokens.to_string());
429 }
430 crate::types::ThinkingConfig::Disabled => {
431 }
433 crate::types::ThinkingConfig::Adaptive => {
434 }
436 }
437 } else if let Some(max_thinking_tokens) = self.options.max_thinking_tokens {
438 if max_thinking_tokens > 0 {
439 cmd.arg("--max-thinking-tokens")
440 .arg(max_thinking_tokens.to_string());
441 }
442 }
443
444 if let Some(ref cwd) = self.options.cwd {
446 cmd.current_dir(cwd);
447 }
448
449 for (key, value) in &self.options.env {
451 cmd.env(key, value);
452 }
453
454 if !self.options.mcp_servers.is_empty() {
456 let mcp_config = serde_json::json!({
457 "mcpServers": self.options.mcp_servers
458 });
459 cmd.arg("--mcp-config").arg(mcp_config.to_string());
460 }
461
462 if self.options.continue_conversation {
464 cmd.arg("--continue");
465 }
466 if let Some(ref resume_id) = self.options.resume {
467 cmd.arg("--resume").arg(resume_id);
468 }
469
470 if let Some(settings_value) = self.build_settings_value() {
472 cmd.arg("--settings").arg(settings_value);
473 }
474
475 for dir in &self.options.add_dirs {
477 cmd.arg("--add-dir").arg(dir);
478 }
479
480 if self.options.fork_session {
482 cmd.arg("--fork-session");
483 }
484
485 if let Some(ref tools) = self.options.tools {
489 match tools {
490 crate::types::ToolsConfig::List(list) => {
491 if list.is_empty() {
492 cmd.arg("--tools").arg("");
493 } else {
494 cmd.arg("--tools").arg(list.join(","));
495 }
496 }
497 crate::types::ToolsConfig::Preset(_preset) => {
498 cmd.arg("--tools").arg("default");
500 }
501 }
502 }
503
504 if !self.options.betas.is_empty() {
506 let betas: Vec<String> = self.options.betas.iter().map(|b| b.to_string()).collect();
507 cmd.arg("--betas").arg(betas.join(","));
508 }
509
510 if let Some(budget) = self.options.max_budget_usd {
512 cmd.arg("--max-budget-usd").arg(budget.to_string());
513 }
514
515 if let Some(ref fallback) = self.options.fallback_model {
517 cmd.arg("--fallback-model").arg(fallback);
518 }
519
520 if self.options.enable_file_checkpointing {
522 cmd.env("CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", "true");
523 }
524
525 if let Some(ref format) = self.options.output_format
527 && format.get("type").and_then(|v| v.as_str()) == Some("json_schema")
528 && let Some(schema) = format.get("schema")
529 && let Ok(schema_json) = serde_json::to_string(schema)
530 {
531 cmd.arg("--json-schema").arg(schema_json);
532 }
533
534 for plugin in &self.options.plugins {
536 match plugin {
537 crate::types::SdkPluginConfig::Local { path } => {
538 cmd.arg("--plugin-dir").arg(path);
539 }
540 }
541 }
542
543 if let Some(ref agents) = self.options.agents
545 && !agents.is_empty()
546 && let Ok(json_str) = serde_json::to_string(agents) {
547 cmd.arg("--agents").arg(json_str);
548 }
549
550 let sources_value = self
552 .options
553 .setting_sources
554 .as_ref()
555 .map(|sources| {
556 sources
557 .iter()
558 .map(|s| match s {
559 crate::types::SettingSource::User => "user",
560 crate::types::SettingSource::Project => "project",
561 crate::types::SettingSource::Local => "local",
562 })
563 .collect::<Vec<_>>()
564 .join(",")
565 })
566 .unwrap_or_default();
567 cmd.arg("--setting-sources").arg(sources_value);
568
569 if let Some(ref effort) = self.options.effort {
571 cmd.arg("--effort").arg(effort.to_string());
572 }
573
574 for (key, value) in &self.options.extra_args {
576 let flag = if key.starts_with("--") || key.starts_with("-") {
577 key.clone()
578 } else {
579 format!("--{key}")
580 };
581 cmd.arg(&flag);
582 if let Some(val) = value {
583 cmd.arg(val);
584 }
585 }
586
587 cmd.stdin(Stdio::piped())
589 .stdout(Stdio::piped())
590 .stderr(Stdio::piped());
591
592 cmd.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
594 cmd.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
595
596 debug!(
598 "Executing Claude CLI command: {} {:?}",
599 self.cli_path.display(),
600 cmd.as_std().get_args().collect::<Vec<_>>()
601 );
602
603 cmd
604 }
605
606 async fn check_cli_version(&self) -> Result<()> {
608 let output = tokio::time::timeout(
610 std::time::Duration::from_secs(5),
611 tokio::process::Command::new(&self.cli_path)
612 .arg("--version")
613 .output(),
614 )
615 .await;
616
617 let output = match output {
618 Ok(Ok(output)) => output,
619 Ok(Err(e)) => {
620 warn!("Failed to check CLI version: {}", e);
621 return Ok(()); }
623 Err(_) => {
624 warn!("CLI version check timed out after 5 seconds");
625 return Ok(());
626 }
627 };
628
629 let version_str = String::from_utf8_lossy(&output.stdout);
630 let version_str = version_str.trim();
631
632 if let Some(semver) = SemVer::parse(version_str) {
633 let min_version = SemVer::new(MIN_CLI_VERSION.0, MIN_CLI_VERSION.1, MIN_CLI_VERSION.2);
634
635 if semver < min_version {
636 warn!(
637 "⚠️ Claude CLI version {}.{}.{} is below minimum required version {}.{}.{}",
638 semver.major,
639 semver.minor,
640 semver.patch,
641 MIN_CLI_VERSION.0,
642 MIN_CLI_VERSION.1,
643 MIN_CLI_VERSION.2
644 );
645 warn!(
646 " Some features may not work correctly. Please upgrade with: npm install -g @anthropic-ai/claude-code@latest"
647 );
648 } else {
649 info!("Claude CLI version: {}.{}.{}", semver.major, semver.minor, semver.patch);
650 }
651 } else {
652 debug!("Could not parse CLI version: {}", version_str);
653 }
654
655 Ok(())
656 }
657
658 async fn spawn_process(&mut self) -> Result<()> {
660 self.state = TransportState::Connecting;
661
662 let mut cmd = self.build_command();
663 info!("Starting Claude CLI with command: {:?}", cmd);
664
665 if let Some(user) = self.options.user.as_deref() {
666 apply_process_user(&mut cmd, user)?;
667 }
668
669 let mut child = cmd.spawn().map_err(|e| {
670 error!("Failed to spawn Claude CLI: {}", e);
671 SdkError::ProcessError(e)
672 })?;
673
674 let stdin = child
676 .stdin
677 .take()
678 .ok_or_else(|| SdkError::ConnectionError("Failed to get stdin".into()))?;
679 let stdout = child
680 .stdout
681 .take()
682 .ok_or_else(|| SdkError::ConnectionError("Failed to get stdout".into()))?;
683 let stderr = child
684 .stderr
685 .take()
686 .ok_or_else(|| SdkError::ConnectionError("Failed to get stderr".into()))?;
687
688 let buffer_size = self.options.cli_channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE);
690
691 let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(buffer_size);
693 let (message_broadcast_tx, _) =
695 tokio::sync::broadcast::channel::<Message>(buffer_size);
696 let (control_tx, control_rx) = mpsc::channel::<ControlResponse>(buffer_size);
697
698 tokio::spawn(async move {
700 let mut stdin = stdin;
701 debug!("Stdin handler started");
702 while let Some(line) = stdin_rx.recv().await {
703 debug!("Received line from channel: {}", line);
704 if let Err(e) = stdin.write_all(line.as_bytes()).await {
705 error!("Failed to write to stdin: {}", e);
706 break;
707 }
708 if let Err(e) = stdin.write_all(b"\n").await {
709 error!("Failed to write newline: {}", e);
710 break;
711 }
712 if let Err(e) = stdin.flush().await {
713 error!("Failed to flush stdin: {}", e);
714 break;
715 }
716 debug!("Successfully sent to Claude process: {}", line);
717 }
718 debug!("Stdin handler ended");
719 });
720
721 let (sdk_control_tx, sdk_control_rx) = mpsc::channel::<serde_json::Value>(buffer_size);
723
724 let message_broadcast_tx_clone = message_broadcast_tx.clone();
726 let control_tx_clone = control_tx.clone();
727 let sdk_control_tx_clone = sdk_control_tx.clone();
728 tokio::spawn(async move {
729 debug!("Stdout handler started");
730 let reader = BufReader::new(stdout);
731 let mut lines = reader.lines();
732
733 while let Ok(Some(line)) = lines.next_line().await {
734 if line.trim().is_empty() {
735 continue;
736 }
737
738 debug!("Claude output: {}", line);
739
740 match serde_json::from_str::<serde_json::Value>(&line) {
742 Ok(json) => {
743 if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
745 if msg_type == "control_response" {
747 debug!("Received control response: {:?}", json);
748
749 let _ = sdk_control_tx_clone.send(json.clone()).await;
751
752 if let Some(response_obj) = json.get("response")
757 && let Some(request_id) = response_obj.get("request_id")
758 .or_else(|| response_obj.get("requestId"))
759 .and_then(|v| v.as_str())
760 {
761 let subtype = response_obj.get("subtype").and_then(|v| v.as_str());
763 let success = subtype == Some("success");
764
765 let control_resp = ControlResponse::InterruptAck {
766 request_id: request_id.to_string(),
767 success,
768 };
769 let _ = control_tx_clone.send(control_resp).await;
770 }
771 continue;
772 }
773
774 if msg_type == "control_request" {
776 debug!("Received control request from CLI: {:?}", json);
777 let _ = sdk_control_tx_clone.send(json.clone()).await;
779 continue;
780 }
781
782 if msg_type == "control"
784 && let Some(control) = json.get("control") {
785 debug!("Received control message: {:?}", control);
786 let _ = sdk_control_tx_clone.send(control.clone()).await;
787 continue;
788 }
789
790 if msg_type == "sdk_control_request" {
792 debug!("Received SDK control request (legacy): {:?}", json);
794 let _ = sdk_control_tx_clone.send(json.clone()).await;
795 continue;
796 }
797
798 if msg_type == "system"
800 && let Some(subtype) = json.get("subtype").and_then(|v| v.as_str())
801 && subtype.starts_with("sdk_control:") {
802 debug!("Received SDK control message: {}", subtype);
804 let _ = sdk_control_tx_clone.send(json.clone()).await;
805 }
807 }
808
809 match crate::message_parser::parse_message(json) {
811 Ok(Some(message)) => {
812 let _ = message_broadcast_tx_clone.send(message);
814 }
815 Ok(None) => {
816 }
818 Err(e) => {
819 warn!("Failed to parse message: {}", e);
820 }
821 }
822 }
823 Err(e) => {
824 warn!("Failed to parse JSON: {} - Line: {}", e, line);
825 }
826 }
827 }
828 info!("Stdout reader ended");
829 });
830
831 let message_broadcast_tx_for_error = message_broadcast_tx.clone();
833 let debug_stderr = self.options.debug_stderr.clone();
834 let stderr_callback = self.options.stderr_callback.clone();
835 tokio::spawn(async move {
836 let reader = BufReader::new(stderr);
837 let mut lines = reader.lines();
838 let mut error_buffer = Vec::new();
839
840 while let Ok(Some(line)) = lines.next_line().await {
841 if !line.trim().is_empty() {
842 if let Some(ref debug_output) = debug_stderr {
844 let mut output = debug_output.lock().await;
845 let _ = writeln!(output, "{line}");
846 let _ = output.flush();
847 }
848
849 if let Some(ref callback) = stderr_callback {
850 callback.as_ref()(line.as_str());
851 }
852
853 error!("Claude CLI stderr: {}", line);
854 error_buffer.push(line.clone());
855
856 if line.contains("command not found") || line.contains("No such file") {
858 error!("Claude CLI binary not found or not executable");
859 } else if line.contains("ENOENT") || line.contains("spawn") {
860 error!("Failed to spawn Claude CLI process - binary may not be installed");
861 } else if line.contains("authentication") || line.contains("API key") || line.contains("Unauthorized") {
862 error!("Claude CLI authentication error - please run 'claude-code api login'");
863 } else if line.contains("model") && (line.contains("not available") || line.contains("not found")) {
864 error!("Model not available for your account: {}", line);
865 } else if line.contains("Error:") || line.contains("error:") {
866 error!("Claude CLI error detected: {}", line);
867 }
868 }
869 }
870
871 if !error_buffer.is_empty() {
873 let error_msg = error_buffer.join("\n");
874 error!("Claude CLI stderr output collected:\n{}", error_msg);
875
876 let _ = message_broadcast_tx_for_error.send(Message::System {
878 subtype: "error".to_string(),
879 data: serde_json::json!({
880 "source": "stderr",
881 "error": "Claude CLI error output",
882 "details": error_msg
883 }),
884 });
885 }
886 });
887
888 self.child = Some(child);
890 self.stdin_tx = Some(stdin_tx);
891 self.message_broadcast_tx = Some(message_broadcast_tx);
892 self.control_rx = Some(control_rx);
893 self.sdk_control_rx = Some(sdk_control_rx);
894 self.state = TransportState::Connected;
895
896 Ok(())
897 }
898}
899
900#[async_trait]
901impl Transport for SubprocessTransport {
/// Returns the transport as `&mut dyn Any`.
// NOTE(review): presumably used by callers to downcast to the concrete
// transport type; confirm against the trait's users.
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
    self
}
905
906 async fn connect(&mut self) -> Result<()> {
907 if self.state == TransportState::Connected {
908 return Ok(());
909 }
910
911 if let Err(e) = self.check_cli_version().await {
913 warn!("CLI version check failed: {}", e);
914 }
915
916 self.spawn_process().await?;
917 info!("Connected to Claude CLI");
918 Ok(())
919 }
920
921 async fn send_message(&mut self, message: InputMessage) -> Result<()> {
922 if self.state != TransportState::Connected {
923 return Err(SdkError::InvalidState {
924 message: "Not connected".into(),
925 });
926 }
927
928 let json = serde_json::to_string(&message)?;
929 debug!("Serialized message: {}", json);
930
931 if let Some(ref tx) = self.stdin_tx {
932 debug!("Sending message to stdin channel");
933 tx.send(json).await?;
934 debug!("Message sent to channel");
935 Ok(())
936 } else {
937 Err(SdkError::InvalidState {
938 message: "Stdin channel not available".into(),
939 })
940 }
941 }
942
943 fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>> {
944 if let Some(ref tx) = self.message_broadcast_tx {
945 let rx = tx.subscribe();
947 Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
949 |result| async move {
950 match result {
951 Ok(msg) => Some(Ok(msg)),
952 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(
953 n,
954 )) => {
955 warn!("Receiver lagged by {} messages", n);
956 None
957 }
958 }
959 },
960 ))
961 } else {
962 Box::pin(futures::stream::empty())
963 }
964 }
965
966 async fn send_control_request(&mut self, request: ControlRequest) -> Result<()> {
967 if self.state != TransportState::Connected {
968 return Err(SdkError::InvalidState {
969 message: "Not connected".into(),
970 });
971 }
972
973 self.request_counter += 1;
974 let control_msg = match request {
975 ControlRequest::Interrupt { request_id } => {
976 serde_json::json!({
977 "type": "control_request",
978 "request": {
979 "type": "interrupt",
980 "request_id": request_id
981 }
982 })
983 }
984 };
985
986 let json = serde_json::to_string(&control_msg)?;
987
988 if let Some(ref tx) = self.stdin_tx {
989 tx.send(json).await?;
990 Ok(())
991 } else {
992 Err(SdkError::InvalidState {
993 message: "Stdin channel not available".into(),
994 })
995 }
996 }
997
998 async fn receive_control_response(&mut self) -> Result<Option<ControlResponse>> {
999 if let Some(ref mut rx) = self.control_rx {
1000 Ok(rx.recv().await)
1001 } else {
1002 Ok(None)
1003 }
1004 }
1005
1006 async fn send_sdk_control_request(&mut self, request: serde_json::Value) -> Result<()> {
1007 let json = serde_json::to_string(&request)?;
1010
1011 if let Some(ref tx) = self.stdin_tx {
1012 tx.send(json).await?;
1013 Ok(())
1014 } else {
1015 Err(SdkError::InvalidState {
1016 message: "Stdin channel not available".into(),
1017 })
1018 }
1019 }
1020
1021 async fn send_sdk_control_response(&mut self, response: serde_json::Value) -> Result<()> {
1022 let control_response = serde_json::json!({
1025 "type": "control_response",
1026 "response": response
1027 });
1028
1029 let json = serde_json::to_string(&control_response)?;
1030
1031 if let Some(ref tx) = self.stdin_tx {
1032 tx.send(json).await?;
1033 Ok(())
1034 } else {
1035 Err(SdkError::InvalidState {
1036 message: "Stdin channel not available".into(),
1037 })
1038 }
1039 }
1040
/// Reports whether the transport state is `Connected`.
fn is_connected(&self) -> bool {
    self.state == TransportState::Connected
}
1044
1045 async fn disconnect(&mut self) -> Result<()> {
1046 if self.state != TransportState::Connected {
1047 return Ok(());
1048 }
1049
1050 self.state = TransportState::Disconnecting;
1051
1052 self.stdin_tx.take();
1054
1055 if let Some(mut child) = self.child.take() {
1057 match child.kill().await {
1058 Ok(()) => info!("Claude CLI process terminated"),
1059 Err(e) => warn!("Failed to kill Claude CLI process: {}", e),
1060 }
1061 }
1062
1063 self.state = TransportState::Disconnected;
1064 Ok(())
1065 }
1066
/// Moves the SDK control receiver out of the transport, leaving `None`
/// behind; subsequent calls return `None`.
fn take_sdk_control_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<serde_json::Value>> {
    self.sdk_control_rx.take()
}
1070
/// Signals end of input by dropping the transport's stdin sender.
///
/// Always returns `Ok(())`, including when the sender was already gone.
async fn end_input(&mut self) -> Result<()> {
    // With no sender left, the stdin writer task's receive loop ends.
    self.stdin_tx.take();
    Ok(())
}
1076}
1077
1078impl Drop for SubprocessTransport {
1079 fn drop(&mut self) {
1080 if let Some(mut child) = self.child.take() {
1081 let _ = child.start_kill();
1083 }
1084 }
1085}
1086
1087pub fn find_claude_cli() -> Result<PathBuf> {
1094 for cmd_name in &["claude", "claude-code"] {
1096 if let Ok(path) = which::which(cmd_name) {
1097 debug!("Found Claude CLI in PATH at: {}", path.display());
1098 return Ok(path);
1099 }
1100 }
1101
1102 if let Some(cached_path) = crate::cli_download::get_cached_cli_path() {
1104 if cached_path.exists() && cached_path.is_file() {
1105 debug!("Found cached Claude CLI at: {}", cached_path.display());
1106 return Ok(cached_path);
1107 }
1108 }
1109
1110 let home = dirs::home_dir().ok_or_else(|| SdkError::CliNotFound {
1112 searched_paths: "Unable to determine home directory".into(),
1113 })?;
1114
1115 let locations = vec![
1116 home.join(".npm-global/bin/claude"),
1118 home.join(".npm-global/bin/claude-code"),
1119 PathBuf::from("/usr/local/bin/claude"),
1120 PathBuf::from("/usr/local/bin/claude-code"),
1121 home.join(".local/bin/claude"),
1123 home.join(".local/bin/claude-code"),
1124 home.join("node_modules/.bin/claude"),
1125 home.join("node_modules/.bin/claude-code"),
1126 home.join(".yarn/bin/claude"),
1128 home.join(".yarn/bin/claude-code"),
1129 PathBuf::from("/opt/homebrew/bin/claude"),
1131 PathBuf::from("/opt/homebrew/bin/claude-code"),
1132 home.join(".claude/local/claude"),
1134 ];
1135
1136 let mut searched = Vec::new();
1137 for path in &locations {
1138 searched.push(path.display().to_string());
1139 if path.exists() && path.is_file() {
1140 debug!("Found Claude CLI at: {}", path.display());
1141 return Ok(path.clone());
1142 }
1143 }
1144
1145 warn!("Claude CLI not found in any standard location");
1147 warn!("Searched paths: {:?}", searched);
1148
1149 if which::which("node").is_err() && which::which("npm").is_err() {
1151 error!("Node.js/npm not found - Claude CLI requires Node.js");
1152 return Err(SdkError::CliNotFound {
1153 searched_paths: format!(
1154 "Node.js is not installed. Install from https://nodejs.org/\n\n\
1155 Alternatively, enable auto_download_cli to automatically download the CLI:\n\
1156 ```rust\n\
1157 let options = ClaudeCodeOptions::builder()\n\
1158 .auto_download_cli(true)\n\
1159 .build();\n\
1160 ```\n\n\
1161 Searched in:\n{}",
1162 searched.join("\n")
1163 ),
1164 });
1165 }
1166
1167 Err(SdkError::CliNotFound {
1168 searched_paths: format!(
1169 "Claude CLI not found.\n\n\
1170 Option 1 - Auto-download (recommended):\n\
1171 ```rust\n\
1172 let options = ClaudeCodeOptions::builder()\n\
1173 .auto_download_cli(true)\n\
1174 .build();\n\
1175 ```\n\n\
1176 Option 2 - Manual installation:\n\
1177 npm install -g @anthropic-ai/claude-code\n\n\
1178 Searched in:\n{}",
1179 searched.join("\n")
1180 ),
1181 })
1182}
1183
1184pub(crate) fn apply_process_user(cmd: &mut Command, user: &str) -> Result<()> {
1185 let user = user.trim();
1186 if user.is_empty() {
1187 return Err(SdkError::ConfigError(
1188 "options.user must be a non-empty username or uid".into(),
1189 ));
1190 }
1191
1192 apply_process_user_inner(cmd, user)
1193}
1194
1195#[cfg(unix)]
1196fn apply_process_user_inner(cmd: &mut Command, user: &str) -> Result<()> {
1197 use std::ffi::CString;
1198 use std::mem::MaybeUninit;
1199 use std::os::unix::process::CommandExt;
1200 use std::ptr;
1201
1202 fn passwd_buf_len() -> usize {
1203 let buf_len = unsafe { libc::sysconf(libc::_SC_GETPW_R_SIZE_MAX) };
1204 if buf_len <= 0 {
1205 16 * 1024
1206 } else {
1207 buf_len as usize
1208 }
1209 }
1210
1211 fn lookup_by_name(name: &str) -> Result<(u32, u32)> {
1212 let name = CString::new(name).map_err(|_| {
1213 SdkError::ConfigError("options.user must not contain NUL bytes".into())
1214 })?;
1215
1216 let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1217 let mut result: *mut libc::passwd = ptr::null_mut();
1218 let mut buf = vec![0u8; passwd_buf_len()];
1219
1220 let rc = unsafe {
1221 libc::getpwnam_r(
1222 name.as_ptr(),
1223 pwd.as_mut_ptr(),
1224 buf.as_mut_ptr() as *mut libc::c_char,
1225 buf.len(),
1226 &mut result,
1227 )
1228 };
1229 if rc != 0 {
1230 return Err(SdkError::ConfigError(format!(
1231 "Failed to resolve options.user={}: getpwnam_r returned {}",
1232 name.to_string_lossy(),
1233 rc
1234 )));
1235 }
1236 if result.is_null() {
1237 return Err(SdkError::ConfigError(format!(
1238 "User not found: {}",
1239 name.to_string_lossy()
1240 )));
1241 }
1242
1243 let pwd = unsafe { pwd.assume_init() };
1244 Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1245 }
1246
1247 fn lookup_by_uid(uid: u32) -> Result<(u32, u32)> {
1248 let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1249 let mut result: *mut libc::passwd = ptr::null_mut();
1250 let mut buf = vec![0u8; passwd_buf_len()];
1251
1252 let rc = unsafe {
1253 libc::getpwuid_r(
1254 uid as libc::uid_t,
1255 pwd.as_mut_ptr(),
1256 buf.as_mut_ptr() as *mut libc::c_char,
1257 buf.len(),
1258 &mut result,
1259 )
1260 };
1261 if rc != 0 {
1262 return Err(SdkError::ConfigError(format!(
1263 "Failed to resolve options.user={}: getpwuid_r returned {}",
1264 uid, rc
1265 )));
1266 }
1267 if result.is_null() {
1268 return Err(SdkError::ConfigError(format!("User not found for uid: {}", uid)));
1269 }
1270
1271 let pwd = unsafe { pwd.assume_init() };
1272 Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1273 }
1274
1275 let (uid, gid) = match user.parse::<u32>() {
1276 Ok(uid) => lookup_by_uid(uid)?,
1277 Err(_) => lookup_by_name(user)?,
1278 };
1279
1280 cmd.as_std_mut().uid(uid).gid(gid);
1281 Ok(())
1282}
1283
/// Non-Unix implementation: running the CLI as another user is not
/// supported, so this always returns `SdkError::NotSupported`.
#[cfg(not(unix))]
fn apply_process_user_inner(_cmd: &mut Command, _user: &str) -> Result<()> {
    Err(SdkError::NotSupported {
        feature: "options.user is only supported on Unix platforms".into(),
    })
}
1290
#[cfg(test)]
mod tests {
    use super::*;

    /// The `CliNotFound` message contains the install hint and the paths.
    #[test]
    fn test_find_claude_cli_error_message() {
        let rendered = SdkError::CliNotFound {
            searched_paths: "test paths".to_string(),
        }
        .to_string();

        for expected in ["npm install -g @anthropic-ai/claude-code", "test paths"] {
            assert!(rendered.contains(expected));
        }
    }

    /// A freshly constructed transport is disconnected.
    #[tokio::test]
    async fn test_transport_lifecycle() {
        // Fall back to a fixed path when no CLI is installed on this machine.
        let transport = match SubprocessTransport::new(ClaudeCodeOptions::default()) {
            Ok(transport) => transport,
            Err(_) => {
                SubprocessTransport::with_cli_path(ClaudeCodeOptions::default(), "/usr/bin/true")
            }
        };

        assert!(!transport.is_connected());
        assert_eq!(transport.state, TransportState::Disconnected);
    }

    /// Supported version formats parse to the expected components.
    #[test]
    fn test_semver_parse() {
        let cases = [
            ("2.0.0", (2, 0, 0)),
            ("v2.1.3", (2, 1, 3)),
            ("@anthropic-ai/claude-code/2.5.1", (2, 5, 1)),
            ("2.1", (2, 1, 0)),
        ];

        for (input, (major, minor, patch)) in cases {
            let parsed = SemVer::parse(input).unwrap();
            assert_eq!(parsed.major, major);
            assert_eq!(parsed.minor, minor);
            assert_eq!(parsed.patch, patch);
        }
    }

    /// Versions order by major, then minor, then patch.
    #[test]
    fn test_semver_compare() {
        let ascending = [
            SemVer::new(2, 0, 0),
            SemVer::new(2, 0, 1),
            SemVer::new(2, 1, 0),
            SemVer::new(3, 0, 0),
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
        assert!(ascending[0] < ascending[3]);

        let min_version = SemVer::new(2, 0, 0);
        assert!(SemVer::new(1, 9, 9) < min_version);
        assert!(SemVer::new(2, 0, 0) >= min_version);
        assert!(SemVer::new(2, 1, 0) >= min_version);
    }
}