1use super::{InputMessage, Transport, TransportState};
6use crate::{
7 errors::{Result, SdkError},
8 types::{ClaudeCodeOptions, ControlRequest, ControlResponse, Message, PermissionMode},
9};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::process::Stdio;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::mpsc;
18use tracing::{debug, error, info, warn};
19
20const CHANNEL_BUFFER_SIZE: usize = 100;
22
23const MIN_CLI_VERSION: (u32, u32, u32) = (2, 0, 0);
25
26#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
28struct SemVer {
29 major: u32,
30 minor: u32,
31 patch: u32,
32}
33
34impl SemVer {
35 fn new(major: u32, minor: u32, patch: u32) -> Self {
36 Self {
37 major,
38 minor,
39 patch,
40 }
41 }
42
43 fn parse(version: &str) -> Option<Self> {
45 let version = version.trim().trim_start_matches('v');
46
47 let version = if let Some(v) = version.split('/').next_back() {
49 v
50 } else {
51 version
52 };
53
54 let parts: Vec<&str> = version.split('.').collect();
55 if parts.len() < 2 {
56 return None;
57 }
58
59 Some(Self {
60 major: parts[0].parse().ok()?,
61 minor: parts.get(1)?.parse().ok()?,
62 patch: parts.get(2).and_then(|p| p.parse().ok()).unwrap_or(0),
63 })
64 }
65}
66
67pub struct SubprocessTransport {
69 options: ClaudeCodeOptions,
71 cli_path: PathBuf,
73 child: Option<Child>,
75 stdin_tx: Option<mpsc::Sender<String>>,
77 message_broadcast_tx: Option<tokio::sync::broadcast::Sender<Message>>,
79 control_rx: Option<mpsc::Receiver<ControlResponse>>,
81 sdk_control_rx: Option<mpsc::Receiver<serde_json::Value>>,
83 state: TransportState,
85 request_counter: u64,
87 #[allow(dead_code)]
89 close_stdin_after_prompt: bool,
90}
91
92impl SubprocessTransport {
93 pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
95 let cli_path = find_claude_cli()?;
96 Ok(Self {
97 options,
98 cli_path,
99 child: None,
100 stdin_tx: None,
101 message_broadcast_tx: None,
102 control_rx: None,
103 sdk_control_rx: None,
104 state: TransportState::Disconnected,
105 request_counter: 0,
106 close_stdin_after_prompt: false,
107 })
108 }
109
110 pub async fn new_async(options: ClaudeCodeOptions) -> Result<Self> {
115 let cli_path = match find_claude_cli() {
116 Ok(path) => path,
117 Err(_) if options.auto_download_cli => {
118 info!("Claude CLI not found, attempting automatic download...");
119 crate::cli_download::download_cli(None, None).await?
120 }
121 Err(e) => return Err(e),
122 };
123
124 Ok(Self {
125 options,
126 cli_path,
127 child: None,
128 stdin_tx: None,
129 message_broadcast_tx: None,
130 control_rx: None,
131 sdk_control_rx: None,
132 state: TransportState::Disconnected,
133 request_counter: 0,
134 close_stdin_after_prompt: false,
135 })
136 }
137
138 fn build_settings_value(&self) -> Option<String> {
139 let has_settings = self.options.settings.is_some();
140 let has_sandbox = self.options.sandbox.is_some();
141
142 if !has_settings && !has_sandbox {
143 return None;
144 }
145
146 if has_settings && !has_sandbox {
148 return self.options.settings.clone();
149 }
150
151 let mut settings_obj = serde_json::Map::new();
153
154 if let Some(ref settings) = self.options.settings {
155 let settings_str = settings.trim();
156
157 let load_as_json_string =
158 |s: &str| -> Option<serde_json::Map<String, serde_json::Value>> {
159 match serde_json::from_str::<serde_json::Value>(s) {
160 Ok(serde_json::Value::Object(map)) => Some(map),
161 Ok(_) => {
162 warn!("Settings JSON must be an object; ignoring provided JSON settings");
163 None
164 }
165 Err(_) => None,
166 }
167 };
168
169 let load_from_file =
170 |path: &Path| -> Option<serde_json::Map<String, serde_json::Value>> {
171 let content = std::fs::read_to_string(path).ok()?;
172 match serde_json::from_str::<serde_json::Value>(&content) {
173 Ok(serde_json::Value::Object(map)) => Some(map),
174 Ok(_) => {
175 warn!("Settings file JSON must be an object: {}", path.display());
176 None
177 }
178 Err(e) => {
179 warn!("Failed to parse settings file {}: {}", path.display(), e);
180 None
181 }
182 }
183 };
184
185 if settings_str.starts_with('{') && settings_str.ends_with('}') {
186 if let Some(map) = load_as_json_string(settings_str) {
187 settings_obj = map;
188 } else {
189 warn!(
190 "Failed to parse settings as JSON, treating as file path: {}",
191 settings_str
192 );
193 let settings_path = Path::new(settings_str);
194 if settings_path.exists() {
195 if let Some(map) = load_from_file(settings_path) {
196 settings_obj = map;
197 }
198 } else {
199 warn!("Settings file not found: {}", settings_path.display());
200 }
201 }
202 } else {
203 let settings_path = Path::new(settings_str);
204 if settings_path.exists() {
205 if let Some(map) = load_from_file(settings_path) {
206 settings_obj = map;
207 }
208 } else {
209 warn!("Settings file not found: {}", settings_path.display());
210 }
211 }
212 }
213
214 if let Some(ref sandbox) = self.options.sandbox {
215 match serde_json::to_value(sandbox) {
216 Ok(value) => {
217 settings_obj.insert("sandbox".to_string(), value);
218 }
219 Err(e) => {
220 warn!("Failed to serialize sandbox settings: {}", e);
221 }
222 }
223 }
224
225 Some(serde_json::Value::Object(settings_obj).to_string())
226 }
227
228 pub fn subscribe_messages(&self) -> Option<Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>> {
230 self.message_broadcast_tx.as_ref().map(|tx| {
231 let rx = tx.subscribe();
232 Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
233 |result| async move {
234 match result {
235 Ok(msg) => Some(Ok(msg)),
236 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
237 warn!("Receiver lagged by {} messages", n);
238 None
239 }
240 }
241 },
242 )) as Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>>
243 })
244 }
245
246 #[allow(dead_code)]
248 pub async fn receive_sdk_control_request(&mut self) -> Option<serde_json::Value> {
249 if let Some(ref mut rx) = self.sdk_control_rx {
250 rx.recv().await
251 } else {
252 None
253 }
254 }
255
256 pub fn take_sdk_control_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
258 self.sdk_control_rx.take()
259 }
260
261 pub fn with_cli_path(options: ClaudeCodeOptions, cli_path: impl Into<PathBuf>) -> Self {
263 Self {
264 options,
265 cli_path: cli_path.into(),
266 child: None,
267 stdin_tx: None,
268 message_broadcast_tx: None,
269 control_rx: None,
270 sdk_control_rx: None,
271 state: TransportState::Disconnected,
272 request_counter: 0,
273 close_stdin_after_prompt: false,
274 }
275 }
276
277 #[allow(dead_code)]
279 pub fn set_close_stdin_after_prompt(&mut self, close: bool) {
280 self.close_stdin_after_prompt = close;
281 }
282
283 #[allow(dead_code)]
285 pub fn for_print_mode(options: ClaudeCodeOptions, _prompt: String) -> Result<Self> {
286 let cli_path = find_claude_cli()?;
287 Ok(Self {
288 options,
289 cli_path,
290 child: None,
291 stdin_tx: None,
292 message_broadcast_tx: None,
293 control_rx: None,
294 sdk_control_rx: None,
295 state: TransportState::Disconnected,
296 request_counter: 0,
297 close_stdin_after_prompt: true,
298 })
299 }
300
301 fn build_command(&self) -> Command {
303 let mut cmd = Command::new(&self.cli_path);
304
305 cmd.arg("--output-format").arg("stream-json");
307 cmd.arg("--verbose");
308
309 cmd.arg("--input-format").arg("stream-json");
311
312 if self.options.include_partial_messages {
314 cmd.arg("--include-partial-messages");
315 }
316
317 if self.options.debug_stderr.is_some() {
319 cmd.arg("--debug-to-stderr");
320 }
321
322 if let Some(max_tokens) = self.options.max_output_tokens {
325 let capped = max_tokens.clamp(1, 32000);
327 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
328 debug!("Setting max_output_tokens from option: {}", capped);
329 } else {
330 if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
332 if let Ok(tokens) = current_value.parse::<u32>() {
333 if tokens > 32000 {
334 warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
335 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
336 }
337 } else {
339 warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
341 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
342 }
343 }
344 }
345
346 if let Some(ref prompt_v2) = self.options.system_prompt_v2 {
350 match prompt_v2 {
351 crate::types::SystemPrompt::String(s) => {
352 cmd.arg("--system-prompt").arg(s);
353 }
354 crate::types::SystemPrompt::Preset { append, .. } => {
355 if let Some(append_text) = append {
358 cmd.arg("--append-system-prompt").arg(append_text);
359 }
360 }
361 }
362 } else {
363 #[allow(deprecated)]
365 match self.options.system_prompt.as_deref() {
366 Some(prompt) => {
367 cmd.arg("--system-prompt").arg(prompt);
368 }
369 None => {
370 cmd.arg("--system-prompt").arg("");
371 }
372 }
373 #[allow(deprecated)]
374 if let Some(ref prompt) = self.options.append_system_prompt {
375 cmd.arg("--append-system-prompt").arg(prompt);
376 }
377 }
378
379 if !self.options.allowed_tools.is_empty() {
381 cmd.arg("--allowedTools")
382 .arg(self.options.allowed_tools.join(","));
383 }
384 if !self.options.disallowed_tools.is_empty() {
385 cmd.arg("--disallowedTools")
386 .arg(self.options.disallowed_tools.join(","));
387 }
388
389 match self.options.permission_mode {
391 PermissionMode::Default => {
392 cmd.arg("--permission-mode").arg("default");
393 }
394 PermissionMode::AcceptEdits => {
395 cmd.arg("--permission-mode").arg("acceptEdits");
396 }
397 PermissionMode::Plan => {
398 cmd.arg("--permission-mode").arg("plan");
399 }
400 PermissionMode::BypassPermissions => {
401 cmd.arg("--permission-mode").arg("bypassPermissions");
402 }
403 }
404
405 if let Some(ref model) = self.options.model {
407 cmd.arg("--model").arg(model);
408 }
409
410 if let Some(ref tool_name) = self.options.permission_prompt_tool_name {
412 cmd.arg("--permission-prompt-tool").arg(tool_name);
413 }
414
415 if let Some(max_turns) = self.options.max_turns {
417 cmd.arg("--max-turns").arg(max_turns.to_string());
418 }
419
420 if let Some(max_thinking_tokens) = self.options.max_thinking_tokens {
422 if max_thinking_tokens > 0 {
423 cmd.arg("--max-thinking-tokens")
424 .arg(max_thinking_tokens.to_string());
425 }
426 }
427
428 if let Some(ref cwd) = self.options.cwd {
430 cmd.current_dir(cwd);
431 }
432
433 for (key, value) in &self.options.env {
435 cmd.env(key, value);
436 }
437
438 if !self.options.mcp_servers.is_empty() {
440 let mcp_config = serde_json::json!({
441 "mcpServers": self.options.mcp_servers
442 });
443 cmd.arg("--mcp-config").arg(mcp_config.to_string());
444 }
445
446 if self.options.continue_conversation {
448 cmd.arg("--continue");
449 }
450 if let Some(ref resume_id) = self.options.resume {
451 cmd.arg("--resume").arg(resume_id);
452 }
453
454 if let Some(settings_value) = self.build_settings_value() {
456 cmd.arg("--settings").arg(settings_value);
457 }
458
459 for dir in &self.options.add_dirs {
461 cmd.arg("--add-dir").arg(dir);
462 }
463
464 if self.options.fork_session {
466 cmd.arg("--fork-session");
467 }
468
469 if let Some(ref tools) = self.options.tools {
473 match tools {
474 crate::types::ToolsConfig::List(list) => {
475 if list.is_empty() {
476 cmd.arg("--tools").arg("");
477 } else {
478 cmd.arg("--tools").arg(list.join(","));
479 }
480 }
481 crate::types::ToolsConfig::Preset(_preset) => {
482 cmd.arg("--tools").arg("default");
484 }
485 }
486 }
487
488 if !self.options.betas.is_empty() {
490 let betas: Vec<String> = self.options.betas.iter().map(|b| b.to_string()).collect();
491 cmd.arg("--betas").arg(betas.join(","));
492 }
493
494 if let Some(budget) = self.options.max_budget_usd {
496 cmd.arg("--max-budget-usd").arg(budget.to_string());
497 }
498
499 if let Some(ref fallback) = self.options.fallback_model {
501 cmd.arg("--fallback-model").arg(fallback);
502 }
503
504 if self.options.enable_file_checkpointing {
506 cmd.env("CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", "true");
507 }
508
509 if let Some(ref format) = self.options.output_format
511 && format.get("type").and_then(|v| v.as_str()) == Some("json_schema")
512 && let Some(schema) = format.get("schema")
513 && let Ok(schema_json) = serde_json::to_string(schema)
514 {
515 cmd.arg("--json-schema").arg(schema_json);
516 }
517
518 for plugin in &self.options.plugins {
520 match plugin {
521 crate::types::SdkPluginConfig::Local { path } => {
522 cmd.arg("--plugin-dir").arg(path);
523 }
524 }
525 }
526
527 if let Some(ref agents) = self.options.agents
529 && !agents.is_empty()
530 && let Ok(json_str) = serde_json::to_string(agents) {
531 cmd.arg("--agents").arg(json_str);
532 }
533
534 let sources_value = self
536 .options
537 .setting_sources
538 .as_ref()
539 .map(|sources| {
540 sources
541 .iter()
542 .map(|s| match s {
543 crate::types::SettingSource::User => "user",
544 crate::types::SettingSource::Project => "project",
545 crate::types::SettingSource::Local => "local",
546 })
547 .collect::<Vec<_>>()
548 .join(",")
549 })
550 .unwrap_or_default();
551 cmd.arg("--setting-sources").arg(sources_value);
552
553 for (key, value) in &self.options.extra_args {
555 let flag = if key.starts_with("--") || key.starts_with("-") {
556 key.clone()
557 } else {
558 format!("--{key}")
559 };
560 cmd.arg(&flag);
561 if let Some(val) = value {
562 cmd.arg(val);
563 }
564 }
565
566 cmd.stdin(Stdio::piped())
568 .stdout(Stdio::piped())
569 .stderr(Stdio::piped());
570
571 cmd.env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
573 cmd.env("CLAUDE_AGENT_SDK_VERSION", env!("CARGO_PKG_VERSION"));
574
575 debug!(
577 "Executing Claude CLI command: {} {:?}",
578 self.cli_path.display(),
579 cmd.as_std().get_args().collect::<Vec<_>>()
580 );
581
582 cmd
583 }
584
585 async fn check_cli_version(&self) -> Result<()> {
587 let output = tokio::time::timeout(
589 std::time::Duration::from_secs(5),
590 tokio::process::Command::new(&self.cli_path)
591 .arg("--version")
592 .output(),
593 )
594 .await;
595
596 let output = match output {
597 Ok(Ok(output)) => output,
598 Ok(Err(e)) => {
599 warn!("Failed to check CLI version: {}", e);
600 return Ok(()); }
602 Err(_) => {
603 warn!("CLI version check timed out after 5 seconds");
604 return Ok(());
605 }
606 };
607
608 let version_str = String::from_utf8_lossy(&output.stdout);
609 let version_str = version_str.trim();
610
611 if let Some(semver) = SemVer::parse(version_str) {
612 let min_version = SemVer::new(MIN_CLI_VERSION.0, MIN_CLI_VERSION.1, MIN_CLI_VERSION.2);
613
614 if semver < min_version {
615 warn!(
616 "⚠️ Claude CLI version {}.{}.{} is below minimum required version {}.{}.{}",
617 semver.major,
618 semver.minor,
619 semver.patch,
620 MIN_CLI_VERSION.0,
621 MIN_CLI_VERSION.1,
622 MIN_CLI_VERSION.2
623 );
624 warn!(
625 " Some features may not work correctly. Please upgrade with: npm install -g @anthropic-ai/claude-code@latest"
626 );
627 } else {
628 info!("Claude CLI version: {}.{}.{}", semver.major, semver.minor, semver.patch);
629 }
630 } else {
631 debug!("Could not parse CLI version: {}", version_str);
632 }
633
634 Ok(())
635 }
636
637 async fn spawn_process(&mut self) -> Result<()> {
639 self.state = TransportState::Connecting;
640
641 let mut cmd = self.build_command();
642 info!("Starting Claude CLI with command: {:?}", cmd);
643
644 if let Some(user) = self.options.user.as_deref() {
645 apply_process_user(&mut cmd, user)?;
646 }
647
648 let mut child = cmd.spawn().map_err(|e| {
649 error!("Failed to spawn Claude CLI: {}", e);
650 SdkError::ProcessError(e)
651 })?;
652
653 let stdin = child
655 .stdin
656 .take()
657 .ok_or_else(|| SdkError::ConnectionError("Failed to get stdin".into()))?;
658 let stdout = child
659 .stdout
660 .take()
661 .ok_or_else(|| SdkError::ConnectionError("Failed to get stdout".into()))?;
662 let stderr = child
663 .stderr
664 .take()
665 .ok_or_else(|| SdkError::ConnectionError("Failed to get stderr".into()))?;
666
667 let buffer_size = self.options.cli_channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE);
669
670 let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(buffer_size);
672 let (message_broadcast_tx, _) =
674 tokio::sync::broadcast::channel::<Message>(buffer_size);
675 let (control_tx, control_rx) = mpsc::channel::<ControlResponse>(buffer_size);
676
677 tokio::spawn(async move {
679 let mut stdin = stdin;
680 debug!("Stdin handler started");
681 while let Some(line) = stdin_rx.recv().await {
682 debug!("Received line from channel: {}", line);
683 if let Err(e) = stdin.write_all(line.as_bytes()).await {
684 error!("Failed to write to stdin: {}", e);
685 break;
686 }
687 if let Err(e) = stdin.write_all(b"\n").await {
688 error!("Failed to write newline: {}", e);
689 break;
690 }
691 if let Err(e) = stdin.flush().await {
692 error!("Failed to flush stdin: {}", e);
693 break;
694 }
695 debug!("Successfully sent to Claude process: {}", line);
696 }
697 debug!("Stdin handler ended");
698 });
699
700 let (sdk_control_tx, sdk_control_rx) = mpsc::channel::<serde_json::Value>(buffer_size);
702
703 let message_broadcast_tx_clone = message_broadcast_tx.clone();
705 let control_tx_clone = control_tx.clone();
706 let sdk_control_tx_clone = sdk_control_tx.clone();
707 tokio::spawn(async move {
708 debug!("Stdout handler started");
709 let reader = BufReader::new(stdout);
710 let mut lines = reader.lines();
711
712 while let Ok(Some(line)) = lines.next_line().await {
713 if line.trim().is_empty() {
714 continue;
715 }
716
717 debug!("Claude output: {}", line);
718
719 match serde_json::from_str::<serde_json::Value>(&line) {
721 Ok(json) => {
722 if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
724 if msg_type == "control_response" {
726 debug!("Received control response: {:?}", json);
727
728 let _ = sdk_control_tx_clone.send(json.clone()).await;
730
731 if let Some(response_obj) = json.get("response")
736 && let Some(request_id) = response_obj.get("request_id")
737 .or_else(|| response_obj.get("requestId"))
738 .and_then(|v| v.as_str())
739 {
740 let subtype = response_obj.get("subtype").and_then(|v| v.as_str());
742 let success = subtype == Some("success");
743
744 let control_resp = ControlResponse::InterruptAck {
745 request_id: request_id.to_string(),
746 success,
747 };
748 let _ = control_tx_clone.send(control_resp).await;
749 }
750 continue;
751 }
752
753 if msg_type == "control_request" {
755 debug!("Received control request from CLI: {:?}", json);
756 let _ = sdk_control_tx_clone.send(json.clone()).await;
758 continue;
759 }
760
761 if msg_type == "control"
763 && let Some(control) = json.get("control") {
764 debug!("Received control message: {:?}", control);
765 let _ = sdk_control_tx_clone.send(control.clone()).await;
766 continue;
767 }
768
769 if msg_type == "sdk_control_request" {
771 debug!("Received SDK control request (legacy): {:?}", json);
773 let _ = sdk_control_tx_clone.send(json.clone()).await;
774 continue;
775 }
776
777 if msg_type == "system"
779 && let Some(subtype) = json.get("subtype").and_then(|v| v.as_str())
780 && subtype.starts_with("sdk_control:") {
781 debug!("Received SDK control message: {}", subtype);
783 let _ = sdk_control_tx_clone.send(json.clone()).await;
784 }
786 }
787
788 match crate::message_parser::parse_message(json) {
790 Ok(Some(message)) => {
791 let _ = message_broadcast_tx_clone.send(message);
793 }
794 Ok(None) => {
795 }
797 Err(e) => {
798 warn!("Failed to parse message: {}", e);
799 }
800 }
801 }
802 Err(e) => {
803 warn!("Failed to parse JSON: {} - Line: {}", e, line);
804 }
805 }
806 }
807 info!("Stdout reader ended");
808 });
809
810 let message_broadcast_tx_for_error = message_broadcast_tx.clone();
812 let debug_stderr = self.options.debug_stderr.clone();
813 let stderr_callback = self.options.stderr_callback.clone();
814 tokio::spawn(async move {
815 let reader = BufReader::new(stderr);
816 let mut lines = reader.lines();
817 let mut error_buffer = Vec::new();
818
819 while let Ok(Some(line)) = lines.next_line().await {
820 if !line.trim().is_empty() {
821 if let Some(ref debug_output) = debug_stderr {
823 let mut output = debug_output.lock().await;
824 let _ = writeln!(output, "{line}");
825 let _ = output.flush();
826 }
827
828 if let Some(ref callback) = stderr_callback {
829 callback.as_ref()(line.as_str());
830 }
831
832 error!("Claude CLI stderr: {}", line);
833 error_buffer.push(line.clone());
834
835 if line.contains("command not found") || line.contains("No such file") {
837 error!("Claude CLI binary not found or not executable");
838 } else if line.contains("ENOENT") || line.contains("spawn") {
839 error!("Failed to spawn Claude CLI process - binary may not be installed");
840 } else if line.contains("authentication") || line.contains("API key") || line.contains("Unauthorized") {
841 error!("Claude CLI authentication error - please run 'claude-code api login'");
842 } else if line.contains("model") && (line.contains("not available") || line.contains("not found")) {
843 error!("Model not available for your account: {}", line);
844 } else if line.contains("Error:") || line.contains("error:") {
845 error!("Claude CLI error detected: {}", line);
846 }
847 }
848 }
849
850 if !error_buffer.is_empty() {
852 let error_msg = error_buffer.join("\n");
853 error!("Claude CLI stderr output collected:\n{}", error_msg);
854
855 let _ = message_broadcast_tx_for_error.send(Message::System {
857 subtype: "error".to_string(),
858 data: serde_json::json!({
859 "source": "stderr",
860 "error": "Claude CLI error output",
861 "details": error_msg
862 }),
863 });
864 }
865 });
866
867 self.child = Some(child);
869 self.stdin_tx = Some(stdin_tx);
870 self.message_broadcast_tx = Some(message_broadcast_tx);
871 self.control_rx = Some(control_rx);
872 self.sdk_control_rx = Some(sdk_control_rx);
873 self.state = TransportState::Connected;
874
875 Ok(())
876 }
877}
878
879#[async_trait]
880impl Transport for SubprocessTransport {
881 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
882 self
883 }
884
885 async fn connect(&mut self) -> Result<()> {
886 if self.state == TransportState::Connected {
887 return Ok(());
888 }
889
890 if let Err(e) = self.check_cli_version().await {
892 warn!("CLI version check failed: {}", e);
893 }
894
895 self.spawn_process().await?;
896 info!("Connected to Claude CLI");
897 Ok(())
898 }
899
900 async fn send_message(&mut self, message: InputMessage) -> Result<()> {
901 if self.state != TransportState::Connected {
902 return Err(SdkError::InvalidState {
903 message: "Not connected".into(),
904 });
905 }
906
907 let json = serde_json::to_string(&message)?;
908 debug!("Serialized message: {}", json);
909
910 if let Some(ref tx) = self.stdin_tx {
911 debug!("Sending message to stdin channel");
912 tx.send(json).await?;
913 debug!("Message sent to channel");
914 Ok(())
915 } else {
916 Err(SdkError::InvalidState {
917 message: "Stdin channel not available".into(),
918 })
919 }
920 }
921
922 fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + 'static>> {
923 if let Some(ref tx) = self.message_broadcast_tx {
924 let rx = tx.subscribe();
926 Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
928 |result| async move {
929 match result {
930 Ok(msg) => Some(Ok(msg)),
931 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(
932 n,
933 )) => {
934 warn!("Receiver lagged by {} messages", n);
935 None
936 }
937 }
938 },
939 ))
940 } else {
941 Box::pin(futures::stream::empty())
942 }
943 }
944
945 async fn send_control_request(&mut self, request: ControlRequest) -> Result<()> {
946 if self.state != TransportState::Connected {
947 return Err(SdkError::InvalidState {
948 message: "Not connected".into(),
949 });
950 }
951
952 self.request_counter += 1;
953 let control_msg = match request {
954 ControlRequest::Interrupt { request_id } => {
955 serde_json::json!({
956 "type": "control_request",
957 "request": {
958 "type": "interrupt",
959 "request_id": request_id
960 }
961 })
962 }
963 };
964
965 let json = serde_json::to_string(&control_msg)?;
966
967 if let Some(ref tx) = self.stdin_tx {
968 tx.send(json).await?;
969 Ok(())
970 } else {
971 Err(SdkError::InvalidState {
972 message: "Stdin channel not available".into(),
973 })
974 }
975 }
976
977 async fn receive_control_response(&mut self) -> Result<Option<ControlResponse>> {
978 if let Some(ref mut rx) = self.control_rx {
979 Ok(rx.recv().await)
980 } else {
981 Ok(None)
982 }
983 }
984
985 async fn send_sdk_control_request(&mut self, request: serde_json::Value) -> Result<()> {
986 let json = serde_json::to_string(&request)?;
989
990 if let Some(ref tx) = self.stdin_tx {
991 tx.send(json).await?;
992 Ok(())
993 } else {
994 Err(SdkError::InvalidState {
995 message: "Stdin channel not available".into(),
996 })
997 }
998 }
999
1000 async fn send_sdk_control_response(&mut self, response: serde_json::Value) -> Result<()> {
1001 let control_response = serde_json::json!({
1004 "type": "control_response",
1005 "response": response
1006 });
1007
1008 let json = serde_json::to_string(&control_response)?;
1009
1010 if let Some(ref tx) = self.stdin_tx {
1011 tx.send(json).await?;
1012 Ok(())
1013 } else {
1014 Err(SdkError::InvalidState {
1015 message: "Stdin channel not available".into(),
1016 })
1017 }
1018 }
1019
1020 fn is_connected(&self) -> bool {
1021 self.state == TransportState::Connected
1022 }
1023
1024 async fn disconnect(&mut self) -> Result<()> {
1025 if self.state != TransportState::Connected {
1026 return Ok(());
1027 }
1028
1029 self.state = TransportState::Disconnecting;
1030
1031 self.stdin_tx.take();
1033
1034 if let Some(mut child) = self.child.take() {
1036 match child.kill().await {
1037 Ok(()) => info!("Claude CLI process terminated"),
1038 Err(e) => warn!("Failed to kill Claude CLI process: {}", e),
1039 }
1040 }
1041
1042 self.state = TransportState::Disconnected;
1043 Ok(())
1044 }
1045
1046 fn take_sdk_control_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<serde_json::Value>> {
1047 self.sdk_control_rx.take()
1048 }
1049
1050 async fn end_input(&mut self) -> Result<()> {
1051 self.stdin_tx.take();
1053 Ok(())
1054 }
1055}
1056
1057impl Drop for SubprocessTransport {
1058 fn drop(&mut self) {
1059 if let Some(mut child) = self.child.take() {
1060 let _ = child.start_kill();
1062 }
1063 }
1064}
1065
1066pub fn find_claude_cli() -> Result<PathBuf> {
1073 for cmd_name in &["claude", "claude-code"] {
1075 if let Ok(path) = which::which(cmd_name) {
1076 debug!("Found Claude CLI in PATH at: {}", path.display());
1077 return Ok(path);
1078 }
1079 }
1080
1081 if let Some(cached_path) = crate::cli_download::get_cached_cli_path() {
1083 if cached_path.exists() && cached_path.is_file() {
1084 debug!("Found cached Claude CLI at: {}", cached_path.display());
1085 return Ok(cached_path);
1086 }
1087 }
1088
1089 let home = dirs::home_dir().ok_or_else(|| SdkError::CliNotFound {
1091 searched_paths: "Unable to determine home directory".into(),
1092 })?;
1093
1094 let locations = vec![
1095 home.join(".npm-global/bin/claude"),
1097 home.join(".npm-global/bin/claude-code"),
1098 PathBuf::from("/usr/local/bin/claude"),
1099 PathBuf::from("/usr/local/bin/claude-code"),
1100 home.join(".local/bin/claude"),
1102 home.join(".local/bin/claude-code"),
1103 home.join("node_modules/.bin/claude"),
1104 home.join("node_modules/.bin/claude-code"),
1105 home.join(".yarn/bin/claude"),
1107 home.join(".yarn/bin/claude-code"),
1108 PathBuf::from("/opt/homebrew/bin/claude"),
1110 PathBuf::from("/opt/homebrew/bin/claude-code"),
1111 home.join(".claude/local/claude"),
1113 ];
1114
1115 let mut searched = Vec::new();
1116 for path in &locations {
1117 searched.push(path.display().to_string());
1118 if path.exists() && path.is_file() {
1119 debug!("Found Claude CLI at: {}", path.display());
1120 return Ok(path.clone());
1121 }
1122 }
1123
1124 warn!("Claude CLI not found in any standard location");
1126 warn!("Searched paths: {:?}", searched);
1127
1128 if which::which("node").is_err() && which::which("npm").is_err() {
1130 error!("Node.js/npm not found - Claude CLI requires Node.js");
1131 return Err(SdkError::CliNotFound {
1132 searched_paths: format!(
1133 "Node.js is not installed. Install from https://nodejs.org/\n\n\
1134 Alternatively, enable auto_download_cli to automatically download the CLI:\n\
1135 ```rust\n\
1136 let options = ClaudeCodeOptions::builder()\n\
1137 .auto_download_cli(true)\n\
1138 .build();\n\
1139 ```\n\n\
1140 Searched in:\n{}",
1141 searched.join("\n")
1142 ),
1143 });
1144 }
1145
1146 Err(SdkError::CliNotFound {
1147 searched_paths: format!(
1148 "Claude CLI not found.\n\n\
1149 Option 1 - Auto-download (recommended):\n\
1150 ```rust\n\
1151 let options = ClaudeCodeOptions::builder()\n\
1152 .auto_download_cli(true)\n\
1153 .build();\n\
1154 ```\n\n\
1155 Option 2 - Manual installation:\n\
1156 npm install -g @anthropic-ai/claude-code\n\n\
1157 Searched in:\n{}",
1158 searched.join("\n")
1159 ),
1160 })
1161}
1162
1163pub(crate) fn apply_process_user(cmd: &mut Command, user: &str) -> Result<()> {
1164 let user = user.trim();
1165 if user.is_empty() {
1166 return Err(SdkError::ConfigError(
1167 "options.user must be a non-empty username or uid".into(),
1168 ));
1169 }
1170
1171 apply_process_user_inner(cmd, user)
1172}
1173
1174#[cfg(unix)]
1175fn apply_process_user_inner(cmd: &mut Command, user: &str) -> Result<()> {
1176 use std::ffi::CString;
1177 use std::mem::MaybeUninit;
1178 use std::os::unix::process::CommandExt;
1179 use std::ptr;
1180
1181 fn passwd_buf_len() -> usize {
1182 let buf_len = unsafe { libc::sysconf(libc::_SC_GETPW_R_SIZE_MAX) };
1183 if buf_len <= 0 {
1184 16 * 1024
1185 } else {
1186 buf_len as usize
1187 }
1188 }
1189
1190 fn lookup_by_name(name: &str) -> Result<(u32, u32)> {
1191 let name = CString::new(name).map_err(|_| {
1192 SdkError::ConfigError("options.user must not contain NUL bytes".into())
1193 })?;
1194
1195 let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1196 let mut result: *mut libc::passwd = ptr::null_mut();
1197 let mut buf = vec![0u8; passwd_buf_len()];
1198
1199 let rc = unsafe {
1200 libc::getpwnam_r(
1201 name.as_ptr(),
1202 pwd.as_mut_ptr(),
1203 buf.as_mut_ptr() as *mut libc::c_char,
1204 buf.len(),
1205 &mut result,
1206 )
1207 };
1208 if rc != 0 {
1209 return Err(SdkError::ConfigError(format!(
1210 "Failed to resolve options.user={}: getpwnam_r returned {}",
1211 name.to_string_lossy(),
1212 rc
1213 )));
1214 }
1215 if result.is_null() {
1216 return Err(SdkError::ConfigError(format!(
1217 "User not found: {}",
1218 name.to_string_lossy()
1219 )));
1220 }
1221
1222 let pwd = unsafe { pwd.assume_init() };
1223 Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1224 }
1225
1226 fn lookup_by_uid(uid: u32) -> Result<(u32, u32)> {
1227 let mut pwd = MaybeUninit::<libc::passwd>::zeroed();
1228 let mut result: *mut libc::passwd = ptr::null_mut();
1229 let mut buf = vec![0u8; passwd_buf_len()];
1230
1231 let rc = unsafe {
1232 libc::getpwuid_r(
1233 uid as libc::uid_t,
1234 pwd.as_mut_ptr(),
1235 buf.as_mut_ptr() as *mut libc::c_char,
1236 buf.len(),
1237 &mut result,
1238 )
1239 };
1240 if rc != 0 {
1241 return Err(SdkError::ConfigError(format!(
1242 "Failed to resolve options.user={}: getpwuid_r returned {}",
1243 uid, rc
1244 )));
1245 }
1246 if result.is_null() {
1247 return Err(SdkError::ConfigError(format!("User not found for uid: {}", uid)));
1248 }
1249
1250 let pwd = unsafe { pwd.assume_init() };
1251 Ok((pwd.pw_uid as u32, pwd.pw_gid as u32))
1252 }
1253
1254 let (uid, gid) = match user.parse::<u32>() {
1255 Ok(uid) => lookup_by_uid(uid)?,
1256 Err(_) => lookup_by_name(user)?,
1257 };
1258
1259 cmd.as_std_mut().uid(uid).gid(gid);
1260 Ok(())
1261}
1262
1263#[cfg(not(unix))]
1264fn apply_process_user_inner(_cmd: &mut Command, _user: &str) -> Result<()> {
1265 Err(SdkError::NotSupported {
1266 feature: "options.user is only supported on Unix platforms".into(),
1267 })
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272 use super::*;
1273
1274 #[test]
1275 fn test_find_claude_cli_error_message() {
1276 let error = SdkError::CliNotFound {
1278 searched_paths: "test paths".to_string(),
1279 };
1280 let error_msg = error.to_string();
1281 assert!(error_msg.contains("npm install -g @anthropic-ai/claude-code"));
1282 assert!(error_msg.contains("test paths"));
1283 }
1284
1285 #[tokio::test]
1286 async fn test_transport_lifecycle() {
1287 let options = ClaudeCodeOptions::default();
1288 let transport = SubprocessTransport::new(options).unwrap_or_else(|_| {
1289 SubprocessTransport::with_cli_path(ClaudeCodeOptions::default(), "/usr/bin/true")
1291 });
1292
1293 assert!(!transport.is_connected());
1294 assert_eq!(transport.state, TransportState::Disconnected);
1295 }
1296
1297 #[test]
1298 fn test_semver_parse() {
1299 let v = SemVer::parse("2.0.0").unwrap();
1301 assert_eq!(v.major, 2);
1302 assert_eq!(v.minor, 0);
1303 assert_eq!(v.patch, 0);
1304
1305 let v = SemVer::parse("v2.1.3").unwrap();
1307 assert_eq!(v.major, 2);
1308 assert_eq!(v.minor, 1);
1309 assert_eq!(v.patch, 3);
1310
1311 let v = SemVer::parse("@anthropic-ai/claude-code/2.5.1").unwrap();
1313 assert_eq!(v.major, 2);
1314 assert_eq!(v.minor, 5);
1315 assert_eq!(v.patch, 1);
1316
1317 let v = SemVer::parse("2.1").unwrap();
1319 assert_eq!(v.major, 2);
1320 assert_eq!(v.minor, 1);
1321 assert_eq!(v.patch, 0);
1322 }
1323
1324 #[test]
1325 fn test_semver_compare() {
1326 let v1 = SemVer::new(2, 0, 0);
1327 let v2 = SemVer::new(2, 0, 1);
1328 let v3 = SemVer::new(2, 1, 0);
1329 let v4 = SemVer::new(3, 0, 0);
1330
1331 assert!(v1 < v2);
1332 assert!(v2 < v3);
1333 assert!(v3 < v4);
1334 assert!(v1 < v4);
1335
1336 let min_version = SemVer::new(2, 0, 0);
1337 assert!(SemVer::new(1, 9, 9) < min_version);
1338 assert!(SemVer::new(2, 0, 0) >= min_version);
1339 assert!(SemVer::new(2, 1, 0) >= min_version);
1340 }
1341}