claude_code_sdk/transport/
subprocess_cli.rs1use futures::Stream;
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::process::Stdio;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::{Child, Command};
10use tokio_stream::{wrappers::LinesStream, StreamExt};
11use tracing::{debug, error, info, warn, instrument};
12use async_stream;
13
14use crate::{
15 errors::*,
16 types::{ClaudeCodeOptions, PermissionMode},
17 transport::Transport,
18 SafetyLimits, SafetyError,
19};
20
21pub struct SubprocessCLITransport {
23 prompt: String,
24 options: ClaudeCodeOptions,
25 cli_path: String,
26 cwd: Option<PathBuf>,
27 process: Option<Child>,
28 safety_limits: SafetyLimits,
29 json_buffer: String,
30}
31
32impl SubprocessCLITransport {
33 #[instrument(level = "debug", skip(prompt, options))]
35 pub fn new(
36 prompt: &str,
37 options: ClaudeCodeOptions,
38 cli_path: Option<&str>,
39 ) -> Result<Self, ClaudeSDKError> {
40 info!("Creating new subprocess CLI transport");
41 debug!(
42 prompt_length = prompt.len(),
43 cli_path = cli_path,
44 cwd = ?options.cwd,
45 "Transport configuration"
46 );
47
48 let cli_path = if let Some(path) = cli_path {
49 debug!(provided_path = path, "Using provided CLI path");
50 path.to_string()
51 } else {
52 debug!("Searching for CLI path");
53 Self::find_cli()?
54 };
55
56 let cwd = options.cwd.clone();
57
58 info!(cli_path = %cli_path, "Successfully created subprocess transport");
59 Ok(Self {
60 prompt: prompt.to_string(),
61 options,
62 cli_path,
63 cwd,
64 process: None,
65 safety_limits: SafetyLimits::default(),
66 json_buffer: String::new(),
67 })
68 }
69
70 pub fn with_safety_limits(mut self, limits: SafetyLimits) -> Self {
72 info!(?limits, "Setting custom safety limits");
73 self.safety_limits = limits;
74 self
75 }
76
77 pub fn try_parse_json_buffer(&mut self) -> Option<Result<HashMap<String, serde_json::Value>, ClaudeSDKError>> {
79 if self.json_buffer.is_empty() {
80 return None;
81 }
82
83 let buffer_size = self.json_buffer.len();
85 if !self.safety_limits.is_line_size_safe(buffer_size) {
86 error!(
87 buffer_size = buffer_size,
88 limit = self.safety_limits.max_line_size,
89 "JSON buffer exceeds safety limit"
90 );
91 self.json_buffer.clear(); return Some(Err(ClaudeSDKError::Safety(SafetyError::LineTooLarge {
93 actual: buffer_size,
94 limit: self.safety_limits.max_line_size,
95 })));
96 }
97
98 debug!(
99 buffer_length = buffer_size,
100 buffer_preview = %self.safety_limits.safe_log_preview(&self.json_buffer),
101 "Attempting to parse JSON buffer"
102 );
103
104 let parse_start = std::time::Instant::now();
106 let parse_result = serde_json::from_str::<HashMap<String, serde_json::Value>>(&self.json_buffer);
107 let parse_duration = parse_start.elapsed();
108
109 if parse_duration.as_millis() > self.safety_limits.json_parse_timeout_ms as u128 {
110 warn!(
111 duration_ms = parse_duration.as_millis(),
112 timeout_ms = self.safety_limits.json_parse_timeout_ms,
113 "JSON parsing took longer than expected"
114 );
115 }
116
117 match parse_result {
118 Ok(data) => {
119 debug!(
120 fields_count = data.len(),
121 parse_duration_ms = parse_duration.as_millis(),
122 buffer_length = buffer_size,
123 "Successfully parsed multiline JSON message"
124 );
125
126 if let Some(message_obj) = data.get("message") {
128 if let Some(content_arr) = message_obj.get("content").and_then(|c| c.as_array()) {
129 for content_item in content_arr {
130 if let Some(text) = content_item.get("text").and_then(|t| t.as_str()) {
131 let text_size = text.len();
132 if !self.safety_limits.is_text_block_safe(text_size) {
133 warn!(
134 text_size = text_size,
135 limit = self.safety_limits.max_text_block_size,
136 text_preview = %self.safety_limits.safe_log_preview(text),
137 "Large text block detected in multiline JSON"
138 );
139 }
140 }
141 }
142 }
143 }
144
145 self.json_buffer.clear(); Some(Ok(data))
147 }
148 Err(e) => {
149 debug!(
151 error = %e,
152 buffer_preview = %self.safety_limits.safe_log_preview(&self.json_buffer),
153 "JSON buffer not yet complete, waiting for more data"
154 );
155 None }
157 }
158 }
159
160 pub fn process_line(&mut self, line: String) -> Option<Result<HashMap<String, serde_json::Value>, ClaudeSDKError>> {
162 let line = line.trim();
163 if line.is_empty() {
164 debug!("Skipping empty line");
165 return None;
166 }
167
168 let line_size = line.len();
170 if !self.safety_limits.is_line_size_safe(line_size) {
171 error!(
172 line_size = line_size,
173 limit = self.safety_limits.max_line_size,
174 "Single line exceeds safety limit"
175 );
176 return Some(Err(ClaudeSDKError::Safety(SafetyError::LineTooLarge {
177 actual: line_size,
178 limit: self.safety_limits.max_line_size,
179 })));
180 }
181
182 debug!(line_length = line_size, "Processing line from subprocess");
183
184 let looks_like_json_start = line.starts_with('{') || line.starts_with('[');
186 let looks_like_json_continuation = !self.json_buffer.is_empty();
187
188 if looks_like_json_start && self.json_buffer.is_empty() {
189 debug!("Starting new JSON buffer");
191 self.json_buffer = line.to_string();
192 } else if looks_like_json_continuation {
193 debug!("Appending to existing JSON buffer");
195 self.json_buffer.push('\n');
196 self.json_buffer.push_str(line);
197 } else if !looks_like_json_start {
198 debug!(
200 line_preview = %self.safety_limits.safe_log_preview(line),
201 "Skipping non-JSON line"
202 );
203 return None;
204 }
205
206 if let Some(result) = self.try_parse_json_buffer() {
208 return Some(result);
209 }
210
211 if self.json_buffer.len() > self.safety_limits.max_line_size / 2 {
213 warn!(
214 buffer_size = self.json_buffer.len(),
215 max_size = self.safety_limits.max_line_size,
216 "JSON buffer growing large without successful parse, might be malformed"
217 );
218 }
219
220 None }
222
223 #[instrument(level = "debug")]
225 fn find_cli() -> Result<String, ClaudeSDKError> {
226 debug!("Searching for Claude Code CLI binary");
227
228 debug!("Checking PATH for 'claude' executable");
230 if let Ok(path) = which::which("claude") {
231 let path_str = path.to_string_lossy().to_string();
232 info!(path = %path_str, "Found Claude CLI in PATH");
233 return Ok(path_str);
234 }
235 debug!("Claude CLI not found in PATH");
236
237 let home_dir = home::home_dir().unwrap_or_else(|| PathBuf::from("/"));
239 debug!(home_dir = %home_dir.display(), "Using home directory");
240
241 let locations = vec![
242 home_dir.join(".npm-global/bin/claude"),
243 PathBuf::from("/usr/local/bin/claude"),
244 home_dir.join(".local/bin/claude"),
245 home_dir.join("node_modules/.bin/claude"),
246 home_dir.join(".yarn/bin/claude"),
247 ];
248
249 debug!(locations_count = locations.len(), "Checking common installation locations");
250 for path in &locations {
251 debug!(path = %path.display(), "Checking location");
252 if path.exists() && path.is_file() {
253 let path_str = path.to_string_lossy().to_string();
254 info!(path = %path_str, "Found Claude CLI at common location");
255 return Ok(path_str);
256 }
257 }
258 debug!("Claude CLI not found in common locations");
259
260 debug!("Checking if Node.js is available");
262 let node_installed = which::which("node").is_ok();
263
264 if !node_installed {
265 error!("Node.js is not installed");
266 let error_msg = "Claude Code requires Node.js, which is not installed.\n\n\
267 Install Node.js from: https://nodejs.org/\n\
268 \nAfter installing Node.js, install Claude Code:\n\
269 npm install -g @anthropic-ai/claude-code";
270 return Err(ClaudeSDKError::CLINotFound(CLINotFoundError::new(error_msg)));
271 }
272 debug!("Node.js is available");
273
274 error!("Claude Code CLI not found in any location");
275 let error_msg = "Claude Code not found. Install with:\n\
276 npm install -g @anthropic-ai/claude-code\n\
277 \nIf already installed locally, try:\n\
278 export PATH=\"$HOME/node_modules/.bin:$PATH\"\n\
279 \nOr specify the path when creating transport";
280 Err(ClaudeSDKError::CLINotFound(CLINotFoundError::new(error_msg)))
281 }
282
283 #[instrument(level = "trace", skip(self))]
285 fn build_command(&self) -> Vec<String> {
286 debug!("Building CLI command with arguments");
287 let mut cmd = vec![
288 self.cli_path.clone(),
289 "--output-format".to_string(),
290 "stream-json".to_string(),
291 "--verbose".to_string(),
292 ];
293
294 if let Some(system_prompt) = &self.options.system_prompt {
295 debug!(system_prompt_length = system_prompt.len(), "Adding system prompt");
296 cmd.extend(["--system-prompt".to_string(), system_prompt.clone()]);
297 }
298
299 if let Some(append_system_prompt) = &self.options.append_system_prompt {
300 debug!(append_system_prompt_length = append_system_prompt.len(), "Adding append system prompt");
301 cmd.extend(["--append-system-prompt".to_string(), append_system_prompt.clone()]);
302 }
303
304 if !self.options.allowed_tools.is_empty() {
305 debug!(allowed_tools = ?self.options.allowed_tools, "Adding allowed tools");
306 cmd.extend([
307 "--allowedTools".to_string(),
308 self.options.allowed_tools.join(","),
309 ]);
310 }
311
312 if let Some(max_turns) = self.options.max_turns {
313 debug!(max_turns, "Adding max turns limit");
314 cmd.extend(["--max-turns".to_string(), max_turns.to_string()]);
315 }
316
317 if !self.options.disallowed_tools.is_empty() {
318 debug!(disallowed_tools = ?self.options.disallowed_tools, "Adding disallowed tools");
319 cmd.extend([
320 "--disallowedTools".to_string(),
321 self.options.disallowed_tools.join(","),
322 ]);
323 }
324
325 if let Some(model) = &self.options.model {
326 debug!(model = %model, "Adding model specification");
327 cmd.extend(["--model".to_string(), model.clone()]);
328 }
329
330 if let Some(permission_prompt_tool_name) = &self.options.permission_prompt_tool_name {
331 debug!(tool_name = %permission_prompt_tool_name, "Adding permission prompt tool");
332 cmd.extend([
333 "--permission-prompt-tool".to_string(),
334 permission_prompt_tool_name.clone(),
335 ]);
336 }
337
338 if let Some(permission_mode) = &self.options.permission_mode {
339 let mode_str = match permission_mode {
340 PermissionMode::Default => "default",
341 PermissionMode::AcceptEdits => "acceptEdits",
342 PermissionMode::BypassPermissions => "bypassPermissions",
343 };
344 debug!(permission_mode = mode_str, "Adding permission mode");
345 cmd.extend(["--permission-mode".to_string(), mode_str.to_string()]);
346 }
347
348 if self.options.continue_conversation {
349 debug!("Adding continue conversation flag");
350 cmd.push("--continue".to_string());
351 }
352
353 if let Some(resume) = &self.options.resume {
354 debug!(resume = %resume, "Adding resume option");
355 cmd.extend(["--resume".to_string(), resume.clone()]);
356 }
357
358 if !self.options.mcp_servers.is_empty() {
359 debug!(mcp_servers_count = self.options.mcp_servers.len(), "Adding MCP servers configuration");
360 let mcp_config = serde_json::json!({
361 "mcpServers": self.options.mcp_servers
362 });
363 cmd.extend([
364 "--mcp-config".to_string(),
365 mcp_config.to_string(),
366 ]);
367 }
368
369 cmd.extend(["--print".to_string(), self.prompt.clone()]);
370 debug!(total_args = cmd.len(), "Built complete CLI command");
371 cmd
372 }
373}
374
375#[async_trait::async_trait]
376impl Transport for SubprocessCLITransport {
377 #[instrument(level = "info", skip(self))]
379 async fn connect(&mut self) -> Result<(), ClaudeSDKError> {
380 if self.process.is_some() {
381 debug!("Process already connected, skipping connection");
382 return Ok(());
383 }
384
385 info!("Starting Claude CLI subprocess");
386 let cmd_args = self.build_command();
387 debug!(args_count = cmd_args.len(), "Built command arguments");
388
389 let mut command = Command::new(&cmd_args[0]);
390 command
391 .args(&cmd_args[1..])
392 .stdin(Stdio::null())
393 .stdout(Stdio::piped())
394 .stderr(Stdio::piped())
395 .env("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
396
397 if let Some(cwd) = &self.cwd {
398 debug!(cwd = %cwd.display(), "Setting working directory");
399 command.current_dir(cwd);
400 }
401
402 debug!("Spawning subprocess");
403 let process = command.spawn().map_err(|e| {
404 if e.kind() == std::io::ErrorKind::NotFound {
405 error!(
406 error = %e,
407 cli_path = %self.cli_path,
408 "Claude Code CLI not found"
409 );
410 ClaudeSDKError::CLINotFound(CLINotFoundError::with_path(
411 "Claude Code not found at",
412 &self.cli_path,
413 ))
414 } else {
415 error!(error = %e, "Failed to spawn Claude Code subprocess");
416 ClaudeSDKError::CLIConnection(CLIConnectionError::new(format!(
417 "Failed to start Claude Code: {}",
418 e
419 )))
420 }
421 })?;
422
423 info!(pid = process.id(), "Successfully started Claude CLI subprocess");
424 self.process = Some(process);
425 Ok(())
426 }
427
428 #[instrument(level = "info", skip(self))]
430 async fn disconnect(&mut self) -> Result<(), ClaudeSDKError> {
431 if let Some(mut process) = self.process.take() {
432 info!(pid = process.id(), "Disconnecting from Claude CLI subprocess");
433
434 if let Ok(Some(status)) = process.try_wait() {
436 if status.success() {
437 info!("Process already finished successfully");
438 } else {
439 warn!(exit_code = status.code(), "Process already finished with error");
440 }
441 return Ok(());
442 }
443
444 debug!("Killing subprocess");
446 if let Err(e) = process.kill().await {
447 warn!(error = %e, "Failed to kill subprocess (might have already exited)");
448 }
449
450 debug!("Waiting for subprocess to exit");
451 match process.wait().await {
452 Ok(status) => {
453 if status.success() {
454 info!("Subprocess terminated successfully");
455 } else {
456 warn!(exit_code = status.code(), "Subprocess terminated with error");
457 }
458 }
459 Err(e) => {
460 warn!(error = %e, "Error waiting for subprocess to terminate");
461 }
462 }
463 } else {
464 debug!("No active subprocess to disconnect");
465 }
466 Ok(())
467 }
468
469 #[instrument(level = "debug", skip(self))]
471 fn receive_messages(&mut self) -> Pin<Box<dyn Stream<Item = Result<HashMap<String, serde_json::Value>, ClaudeSDKError>> + Send + '_>> {
472 if let Some(process) = &mut self.process {
473 if let Some(stdout) = process.stdout.take() {
474 debug!("Setting up message stream from subprocess stdout");
475 let reader = BufReader::new(stdout);
476 let mut lines_stream = LinesStream::new(reader.lines());
477
478 let stream = async_stream::stream! {
481 while let Some(line_result) = lines_stream.next().await {
482 match line_result {
483 Ok(line) => {
484 if let Some(result) = self.process_line(line) {
485 yield result;
486 }
487 }
489 Err(e) => {
490 error!(error = %e, "Error reading line from subprocess stdout");
491 yield Err(ClaudeSDKError::Io(e));
492 }
493 }
494 }
495
496 if !self.json_buffer.is_empty() {
498 warn!(
499 buffer_length = self.json_buffer.len(),
500 buffer_preview = %self.safety_limits.safe_log_preview(&self.json_buffer),
501 "Stream ended with incomplete JSON buffer"
502 );
503 if let Some(result) = self.try_parse_json_buffer() {
505 yield result;
506 } else {
507 let error = ClaudeSDKError::CLIJSONDecode(
509 CLIJSONDecodeError::new(
510 &self.json_buffer,
511 serde_json::Error::io(std::io::Error::new(
512 std::io::ErrorKind::InvalidData,
513 "Incomplete JSON at end of stream"
514 ))
515 )
516 );
517 yield Err(error);
518 self.json_buffer.clear();
519 }
520 }
521 };
522
523 return Box::pin(stream);
524 } else {
525 warn!("No stdout available from subprocess");
526 }
527 } else {
528 warn!("No active subprocess to receive messages from");
529 }
530
531 debug!("Returning empty message stream");
533 Box::pin(tokio_stream::empty())
534 }
535
536 #[instrument(level = "trace", skip(self))]
538 fn is_connected(&self) -> bool {
539 let is_connected = if let Some(_process) = &self.process {
540 true
544 } else {
545 false
546 };
547 debug!(is_connected, "Checked connection status");
548 is_connected
549 }
550}