use async_trait::async_trait;
use std::collections::HashMap;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tokio_stream::Stream;
use tracing::{debug, error, trace, warn};
use super::Transport;
use crate::errors::{ClaudeSDKError, Result};
use crate::types::*;
/// Fallback (1 MiB, in bytes) used for `max_buffer_size` when
/// `ClaudeAgentOptions::max_buffer_size` is not set.
const DEFAULT_MAX_BUFFER_SIZE: usize = 1024 * 1024;
/// Executable name used when `ClaudeAgentOptions::cli_path` is not set.
const DEFAULT_CLI_PATH: &str = "claude";
/// Transport that talks to the Claude CLI by running it as a child process
/// and exchanging newline-delimited JSON over its standard streams.
pub struct SubprocessTransport {
/// Program that `connect` spawns.
cli_path: PathBuf,
/// Command-line arguments, computed once by `build_args` in `new`.
args: Vec<String>,
/// Environment variables passed to the child, computed by `build_env`.
env: HashMap<String, String>,
/// Capacity given to the buffered reader wrapped around the child's stdout.
max_buffer_size: usize,
/// Handle to the spawned child; `None` until `connect` and after `close`.
process: Option<Child>,
/// Child stdin, only populated by `connect` when in streaming mode.
stdin: Option<Arc<Mutex<tokio::process::ChildStdin>>>,
/// Receiving end of the channel fed by the stdout reader task; can be moved
/// out once via `take_stdout_rx`.
stdout_rx: Option<tokio::sync::mpsc::Receiver<Result<serde_json::Value>>>,
/// Optional callback invoked with every line the child writes to stderr.
stderr_callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
/// Set to `true` at the end of a successful `connect`, `false` by `close`.
ready: bool,
/// `true` when no initial prompt was supplied to `new`.
streaming_mode: bool,
/// Prompt given to `new`; it is stored here but not read elsewhere in this
/// file (hence the `dead_code` allowance) because it is already baked into
/// `args`.
#[allow(dead_code)]
initial_prompt: Option<String>,
/// Working directory for the child process, if one was configured.
cwd: Option<PathBuf>,
}
impl SubprocessTransport {
/// Creates a transport that is configured but not yet connected.
///
/// The CLI path comes from `options.cli_path`, falling back to
/// `DEFAULT_CLI_PATH`. Streaming mode is selected when `initial_prompt` is
/// `None`. No process is spawned here; that happens in `connect`.
///
/// # Errors
///
/// Returns a "CLI not found" error when the path neither exists on disk nor
/// can be resolved by `which`, and propagates any error from `build_args`.
pub fn new(options: &ClaudeAgentOptions, initial_prompt: Option<String>) -> Result<Self> {
    let cli_path = match options.cli_path.as_ref() {
        Some(configured) => configured.clone(),
        None => PathBuf::from(DEFAULT_CLI_PATH),
    };

    // The PATH lookup only runs when the path is not directly present.
    let cli_available = cli_path.exists() || which::which(&cli_path).is_ok();
    if !cli_available {
        return Err(ClaudeSDKError::cli_not_found(format!(
            "Claude CLI not found at '{}'. Please ensure Claude Code is installed.",
            cli_path.display()
        )));
    }

    let streaming_mode = initial_prompt.is_none();
    let args = Self::build_args(options, streaming_mode, initial_prompt.as_deref())?;
    let env = Self::build_env(options);
    let max_buffer_size = match options.max_buffer_size {
        Some(size) => size,
        None => DEFAULT_MAX_BUFFER_SIZE,
    };

    let transport = Self {
        cli_path,
        args,
        env,
        max_buffer_size,
        process: None,
        stdin: None,
        stdout_rx: None,
        stderr_callback: options.stderr.clone(),
        ready: false,
        streaming_mode,
        initial_prompt,
        cwd: options.cwd.clone(),
    };
    Ok(transport)
}
/// Translates `options` into the command-line argument list for the CLI.
///
/// The list always starts with `--output-format stream-json --verbose`.
/// In streaming mode `--input-format stream-json` is added; otherwise, when
/// an initial prompt is present, the list ends with `--print -- <prompt>`.
/// Every other option contributes its flag only when it is set / non-empty.
///
/// # Errors
///
/// Returns a configuration error when the MCP server map, sandbox settings,
/// output format, or agents cannot be serialized to JSON.
fn build_args(
    options: &ClaudeAgentOptions,
    streaming_mode: bool,
    initial_prompt: Option<&str>,
) -> Result<Vec<String>> {
    // Appends `flag` followed by its `value` as two separate arguments.
    fn pair(args: &mut Vec<String>, flag: &str, value: String) {
        args.push(flag.to_string());
        args.push(value);
    }

    let mut args = vec![
        "--output-format".to_string(),
        "stream-json".to_string(),
        "--verbose".to_string(),
    ];
    if streaming_mode {
        pair(&mut args, "--input-format", "stream-json".to_string());
    }

    // With no configured system prompt an explicit empty one is passed; a
    // preset contributes only its optional appended text.
    match options.system_prompt.as_ref() {
        Some(SystemPromptConfig::Text(text)) => pair(&mut args, "--system-prompt", text.clone()),
        Some(SystemPromptConfig::Preset(preset)) => {
            if let Some(extra) = preset.append.as_ref() {
                pair(&mut args, "--append-system-prompt", extra.clone());
            }
        }
        None => pair(&mut args, "--system-prompt", String::new()),
    }

    if let Some(mode) = options.permission_mode {
        let label = match mode {
            PermissionMode::Default => "default",
            PermissionMode::AcceptEdits => "acceptEdits",
            PermissionMode::Plan => "plan",
            PermissionMode::BypassPermissions => "bypassPermissions",
        };
        pair(&mut args, "--permission-mode", label.to_string());
    }

    if let Some(model) = options.model.as_ref() {
        pair(&mut args, "--model", model.clone());
    }
    if let Some(fallback) = options.fallback_model.as_ref() {
        pair(&mut args, "--fallback-model", fallback.clone());
    }
    if let Some(turns) = options.max_turns {
        pair(&mut args, "--max-turns", turns.to_string());
    }
    if let Some(budget) = options.max_budget_usd {
        pair(&mut args, "--max-budget-usd", budget.to_string());
    }
    if let Some(tokens) = options.max_thinking_tokens {
        pair(&mut args, "--max-thinking-tokens", tokens.to_string());
    }

    if options.continue_conversation {
        args.push("--continue".to_string());
    }
    if let Some(session) = options.resume.as_ref() {
        pair(&mut args, "--resume", session.clone());
    }
    if options.fork_session {
        args.push("--fork-session".to_string());
    }

    // Tool lists repeat the flag once per entry.
    for tool in &options.allowed_tools {
        pair(&mut args, "--allowed-tools", tool.clone());
    }
    for tool in &options.disallowed_tools {
        pair(&mut args, "--disallowed-tools", tool.clone());
    }
    match options.tools.as_ref() {
        Some(ToolsConfig::List(list)) => {
            for tool in list {
                pair(&mut args, "--tools", tool.clone());
            }
        }
        Some(ToolsConfig::Preset(preset)) => {
            pair(&mut args, "--tools-preset", preset.preset.clone());
        }
        None => {}
    }

    // An empty server map produces no argument at all.
    match &options.mcp_servers {
        McpServersConfig::Path(path) => {
            pair(&mut args, "--mcp-config", path.to_string_lossy().into_owned());
        }
        McpServersConfig::Map(servers) if !servers.is_empty() => {
            let encoded = serde_json::to_string(servers).map_err(|e| {
                ClaudeSDKError::configuration(format!("Failed to serialize MCP servers: {}", e))
            })?;
            pair(&mut args, "--mcp-servers", encoded);
        }
        _ => {}
    }

    if let Some(user) = options.user.as_ref() {
        pair(&mut args, "--user", user.clone());
    }
    if let Some(settings) = options.settings.as_ref() {
        pair(&mut args, "--settings", settings.clone());
    }
    for source in options.setting_sources.iter().flatten() {
        let label = match source {
            SettingSource::User => "user",
            SettingSource::Project => "project",
            SettingSource::Local => "local",
        };
        pair(&mut args, "--setting-source", label.to_string());
    }
    for dir in &options.add_dirs {
        pair(&mut args, "--add-dir", dir.to_string_lossy().into_owned());
    }

    if options.include_partial_messages {
        args.push("--include-partial-messages".to_string());
    }
    if options.enable_file_checkpointing {
        args.push("--enable-file-checkpointing".to_string());
    }

    // Structured options are handed over as JSON strings.
    if let Some(sandbox) = options.sandbox.as_ref() {
        let encoded = serde_json::to_string(sandbox).map_err(|e| {
            ClaudeSDKError::configuration(format!(
                "Failed to serialize sandbox settings: {}",
                e
            ))
        })?;
        pair(&mut args, "--sandbox", encoded);
    }
    if let Some(format) = options.output_format.as_ref() {
        let encoded = serde_json::to_string(format).map_err(|e| {
            ClaudeSDKError::configuration(format!("Failed to serialize output format: {}", e))
        })?;
        pair(&mut args, "--output-format-schema", encoded);
    }
    if let Some(agents) = options.agents.as_ref() {
        let encoded = serde_json::to_string(agents).map_err(|e| {
            ClaudeSDKError::configuration(format!("Failed to serialize agents: {}", e))
        })?;
        pair(&mut args, "--agents", encoded);
    }

    for beta in &options.betas {
        // Uses the serialized form with surrounding quotes stripped, falling
        // back to the `Debug` rendering if serialization fails.
        let encoded = serde_json::to_string(beta).unwrap_or_else(|_| format!("{:?}", beta));
        pair(&mut args, "--beta", encoded.trim_matches('"').to_string());
    }

    // Pass-through flags: `--key` plus the value when one is provided.
    for (key, value) in &options.extra_args {
        args.push(format!("--{}", key));
        args.extend(value.iter().cloned());
    }

    // One-shot mode: the prompt goes last, after `--`.
    match initial_prompt {
        Some(prompt) if !streaming_mode => {
            args.push("--print".to_string());
            args.push("--".to_string());
            args.push(prompt.to_string());
        }
        _ => {}
    }

    Ok(args)
}
/// Builds the environment map handed to the child process.
///
/// The map starts from the current process environment, is then overridden
/// by `options.env`, and finally gets `CLAUDE_SDK=true`, which therefore
/// always wins over a caller-supplied `CLAUDE_SDK`.
///
/// Variables whose name or value is not valid Unicode are left out of the
/// returned map instead of panicking (which is what `std::env::vars()` does).
/// `connect` adds this map with `Command::envs` and does not clear the
/// inherited environment, so leaving them out here should not remove them
/// from the child.
fn build_env(options: &ClaudeAgentOptions) -> HashMap<String, String> {
    let mut env: HashMap<String, String> = std::env::vars_os()
        .filter_map(|(key, value)| {
            // `into_string` fails for non-Unicode data; skip such entries.
            let key = key.into_string().ok()?;
            let value = value.into_string().ok()?;
            Some((key, value))
        })
        .collect();
    for (key, value) in &options.env {
        env.insert(key.clone(), value.clone());
    }
    env.insert("CLAUDE_SDK".to_string(), "true".to_string());
    env
}
/// Spawns a background task that reads the child's stdout line by line,
/// parses each line as JSON, and forwards the result over a channel.
///
/// `max_buffer_size` is the capacity of the buffered reader wrapped around
/// `stdout`. The returned receiver belongs to a bounded channel of 256
/// entries.
///
/// The task stops when stdout reaches EOF, when a read error occurs (the
/// error is forwarded first), or when the receiver has been dropped. Lines
/// that fail to parse are forwarded as JSON decode errors and reading
/// continues. Whitespace-only lines carry no message and are skipped.
fn spawn_stdout_reader(
    stdout: tokio::process::ChildStdout,
    max_buffer_size: usize,
) -> tokio::sync::mpsc::Receiver<Result<serde_json::Value>> {
    let (tx, rx) = tokio::sync::mpsc::channel(256);
    tokio::spawn(async move {
        let reader = BufReader::with_capacity(max_buffer_size, stdout);
        let mut lines = reader.lines();
        loop {
            match lines.next_line().await {
                Ok(Some(line)) => {
                    // A blank line is not a message; reporting it as a
                    // decode error would only produce noise downstream.
                    if line.trim().is_empty() {
                        continue;
                    }
                    // Byte offset 200 may fall inside a multi-byte UTF-8
                    // character, and slicing there would panic. Back up to
                    // the nearest boundary (offset 0 always is one).
                    let mut display_len = line.len().min(200);
                    while !line.is_char_boundary(display_len) {
                        display_len -= 1;
                    }
                    trace!("Received line from CLI: {}", &line[..display_len]);
                    let result = match serde_json::from_str(&line) {
                        Ok(value) => Ok(value),
                        Err(e) => Err(ClaudeSDKError::json_decode_with_context(
                            "Failed to parse JSON from CLI",
                            Some(line),
                            None,
                            e,
                        )),
                    };
                    if tx.send(result).await.is_err() {
                        debug!("Stdout reader: receiver dropped");
                        break;
                    }
                }
                Ok(None) => {
                    debug!("Stdout reader: EOF received");
                    break;
                }
                Err(e) => {
                    // Best effort: the receiver may already be gone.
                    let _ = tx
                        .send(Err(ClaudeSDKError::cli_connection_with_source(
                            "Failed to read from CLI stdout",
                            e,
                        )))
                        .await;
                    break;
                }
            }
        }
        debug!("Stdout reader task finished");
    });
    rx
}
/// Spawns a background task that consumes the child's stderr line by line.
///
/// Each line is traced and, when `callback` is present, passed to it. The
/// task ends at EOF or on the first read error (which is logged as a
/// warning).
fn spawn_stderr_reader(
    stderr: tokio::process::ChildStderr,
    callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
) {
    tokio::spawn(async move {
        let mut stderr_lines = BufReader::new(stderr).lines();
        loop {
            let line = match stderr_lines.next_line().await {
                Ok(Some(text)) => text,
                Ok(None) => break,
                Err(e) => {
                    warn!("Error reading stderr: {}", e);
                    break;
                }
            };
            trace!("CLI stderr: {}", line);
            if let Some(notify) = callback.as_ref() {
                notify(line);
            }
        }
        debug!("Stderr reader task finished");
    });
}
}
#[async_trait]
impl Transport for SubprocessTransport {
/// Spawns the CLI process and wires up its standard streams.
///
/// stdout and stderr are always piped and handed to background reader
/// tasks. stdin is piped only in streaming mode; otherwise it is connected
/// to the null device. The child is configured to be killed when its handle
/// is dropped. On success the transport is marked ready.
///
/// # Errors
///
/// Returns a "CLI not found" error when spawning fails with `NotFound`, a
/// connection error for any other spawn failure, and a connection error
/// when the expected stdin or stdout pipe is missing.
async fn connect(&mut self) -> Result<()> {
    debug!(
        "Starting CLI process: {} {:?}",
        self.cli_path.display(),
        self.args
    );

    let stdin_cfg = if self.streaming_mode {
        Stdio::piped()
    } else {
        Stdio::null()
    };
    let mut command = Command::new(&self.cli_path);
    command
        .args(&self.args)
        .envs(&self.env)
        .stdin(stdin_cfg)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);
    if let Some(dir) = self.cwd.as_ref() {
        command.current_dir(dir);
    }

    let mut child = match command.spawn() {
        Ok(spawned) => spawned,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err(ClaudeSDKError::cli_not_found(format!(
                "Failed to start Claude CLI at '{}': {}",
                self.cli_path.display(),
                e
            )));
        }
        Err(e) => {
            return Err(ClaudeSDKError::cli_connection_with_source(
                format!("Failed to start Claude CLI: {}", e),
                e,
            ));
        }
    };

    if self.streaming_mode {
        let pipe = child.stdin.take().ok_or_else(|| {
            ClaudeSDKError::cli_connection("Failed to open stdin to CLI process")
        })?;
        self.stdin = Some(Arc::new(Mutex::new(pipe)));
    }

    let stdout_pipe = child.stdout.take().ok_or_else(|| {
        ClaudeSDKError::cli_connection("Failed to open stdout from CLI process")
    })?;
    let receiver = Self::spawn_stdout_reader(stdout_pipe, self.max_buffer_size);
    self.stdout_rx = Some(receiver);

    // A missing stderr pipe is tolerated; there is simply nothing to read.
    if let Some(stderr_pipe) = child.stderr.take() {
        Self::spawn_stderr_reader(stderr_pipe, self.stderr_callback.clone());
    }

    self.process = Some(child);
    self.ready = true;
    debug!("CLI process started successfully");
    Ok(())
}
/// Writes `data` followed by a newline to the child's stdin and flushes.
///
/// The stdin lock is held for the whole write, so the payload, the newline,
/// and the flush are not interleaved with other calls to `write`.
///
/// # Errors
///
/// Returns a connection error when stdin is not available ("Transport not
/// connected") or when writing or flushing fails.
async fn write(&self, data: &str) -> Result<()> {
    let stdin = self
        .stdin
        .as_ref()
        .ok_or_else(|| ClaudeSDKError::cli_connection("Transport not connected"))?;
    let mut stdin_guard = stdin.lock().await;
    // Only a preview of at most 200 bytes is logged. Byte offset 200 may
    // fall inside a multi-byte UTF-8 character, and slicing there would
    // panic, so back up to the nearest boundary (offset 0 always is one).
    let mut preview_len = data.len().min(200);
    while !data.is_char_boundary(preview_len) {
        preview_len -= 1;
    }
    trace!("Writing to CLI: {}", &data[..preview_len]);
    stdin_guard.write_all(data.as_bytes()).await.map_err(|e| {
        ClaudeSDKError::cli_connection_with_source("Failed to write to CLI stdin", e)
    })?;
    stdin_guard.write_all(b"\n").await.map_err(|e| {
        ClaudeSDKError::cli_connection_with_source("Failed to write newline to CLI stdin", e)
    })?;
    stdin_guard.flush().await.map_err(|e| {
        ClaudeSDKError::cli_connection_with_source("Failed to flush CLI stdin", e)
    })?;
    Ok(())
}
/// Returns a stream that ends immediately without yielding any item.
///
/// This implementation does not read from the stdout channel; the messages
/// parsed from the child's stdout are delivered through the receiver
/// obtained from `take_stdout_rx`.
fn message_stream(&self) -> Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send + '_>> {
    Box::pin(futures::stream::empty::<Result<serde_json::Value>>())
}
/// Shuts the transport down and reaps the child process.
///
/// The transport is marked not ready and its stdin handle is released.
/// The child then gets up to two seconds to exit on its own before it is
/// killed. Failures while waiting or killing are logged, not returned, so
/// this always yields `Ok(())`.
async fn close(&mut self) -> Result<()> {
    self.ready = false;
    // Release our handle to the child's stdin.
    drop(self.stdin.take());

    if let Some(mut child) = self.process.take() {
        let grace_period = std::time::Duration::from_secs(2);
        let outcome = tokio::time::timeout(grace_period, child.wait()).await;
        match outcome {
            Err(_) => {
                warn!("CLI process did not exit in time, killing");
                let _ = child.kill().await;
            }
            Ok(Err(e)) => {
                error!("Error waiting for CLI process: {}", e);
            }
            Ok(Ok(status)) => {
                debug!("CLI process exited with status: {:?}", status);
            }
        }
    }
    Ok(())
}
/// Shuts down the write side of the child's stdin.
///
/// Does nothing when no stdin handle is held.
///
/// # Errors
///
/// Returns a connection error when the shutdown call fails.
async fn end_input(&self) -> Result<()> {
    let stdin = match self.stdin.as_ref() {
        Some(handle) => handle,
        None => return Ok(()),
    };
    let mut writer = stdin.lock().await;
    writer.shutdown().await.map_err(|e| {
        ClaudeSDKError::cli_connection_with_source("Failed to shutdown stdin", e)
    })?;
    Ok(())
}
/// Returns the `ready` flag: `true` after a successful `connect`, `false`
/// initially and after `close`.
fn is_ready(&self) -> bool {
self.ready
}
}
impl SubprocessTransport {
    /// Moves the stdout message receiver out of the transport.
    ///
    /// Returns `None` when no receiver is stored, which includes every call
    /// after the first successful one, since the slot is left empty.
    pub fn take_stdout_rx(
        &mut self,
    ) -> Option<tokio::sync::mpsc::Receiver<Result<serde_json::Value>>> {
        std::mem::take(&mut self.stdout_rx)
    }

    /// Reports whether the transport was created without an initial prompt.
    pub fn is_streaming_mode(&self) -> bool {
        let streaming = self.streaming_mode;
        streaming
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the argument list for streaming mode (no initial prompt).
    fn streaming_args(options: &ClaudeAgentOptions) -> Vec<String> {
        SubprocessTransport::build_args(options, true, None).unwrap()
    }

    /// Reports whether `wanted` appears anywhere in `args`.
    fn has_arg(args: &[String], wanted: &str) -> bool {
        args.iter().any(|arg| arg == wanted)
    }

    /// Returns the argument immediately following the first `flag`, if any.
    fn value_after<'a>(args: &'a [String], flag: &str) -> Option<&'a str> {
        args.iter()
            .position(|arg| arg == flag)
            .and_then(|idx| args.get(idx + 1))
            .map(String::as_str)
    }

    /// Options using the `claude_code` preset with the given appended text.
    fn preset_options(append: Option<&str>) -> ClaudeAgentOptions {
        use crate::types::{SystemPromptConfig, SystemPromptPreset};
        let mut options = ClaudeAgentOptions::new();
        options.system_prompt = Some(SystemPromptConfig::Preset(SystemPromptPreset {
            preset_type: "preset".to_string(),
            preset: "claude_code".to_string(),
            append: append.map(str::to_string),
        }));
        options
    }

    #[test]
    fn test_build_args_basic() {
        let args = streaming_args(&ClaudeAgentOptions::default());
        for expected in ["--output-format", "stream-json", "--verbose", "--input-format"] {
            assert!(has_arg(&args, expected));
        }
    }

    #[test]
    fn test_build_args_with_model() {
        let options = ClaudeAgentOptions::new().with_model("claude-3-sonnet");
        let args = streaming_args(&options);
        assert!(has_arg(&args, "--model"));
        assert!(has_arg(&args, "claude-3-sonnet"));
    }

    #[test]
    fn test_build_args_non_streaming() {
        let options = ClaudeAgentOptions::default();
        let args = SubprocessTransport::build_args(&options, false, Some("Hello")).unwrap();
        assert!(has_arg(&args, "--print"));
        assert!(has_arg(&args, "Hello"));
        assert!(!has_arg(&args, "--input-format"));
    }

    #[test]
    fn test_build_env() {
        let mut options = ClaudeAgentOptions::default();
        options
            .env
            .insert("CUSTOM_VAR".to_string(), "value".to_string());
        let env = SubprocessTransport::build_env(&options);
        assert_eq!(env.get("CLAUDE_SDK").map(String::as_str), Some("true"));
        assert_eq!(env.get("CUSTOM_VAR").map(String::as_str), Some("value"));
    }

    #[test]
    fn test_build_args_system_prompt_none() {
        let args = streaming_args(&ClaudeAgentOptions::default());
        assert!(
            has_arg(&args, "--system-prompt"),
            "Should have --system-prompt flag"
        );
        assert_eq!(
            value_after(&args, "--system-prompt"),
            Some(""),
            "System prompt should be empty string"
        );
    }

    #[test]
    fn test_build_args_system_prompt_string() {
        let options = ClaudeAgentOptions::new().with_system_prompt("You are a pirate.");
        let args = streaming_args(&options);
        assert!(
            has_arg(&args, "--system-prompt"),
            "Should have --system-prompt flag"
        );
        assert_eq!(
            value_after(&args, "--system-prompt"),
            Some("You are a pirate."),
            "System prompt should match"
        );
    }

    #[test]
    fn test_build_args_system_prompt_preset_no_append() {
        let args = streaming_args(&preset_options(None));
        assert!(
            !has_arg(&args, "--system-prompt"),
            "Should NOT have --system-prompt flag for preset"
        );
        assert!(
            !has_arg(&args, "--append-system-prompt"),
            "Should NOT have --append-system-prompt flag without append"
        );
    }

    #[test]
    fn test_build_args_system_prompt_preset_with_append() {
        let args = streaming_args(&preset_options(Some("Be concise.")));
        assert!(
            !has_arg(&args, "--system-prompt"),
            "Should NOT have --system-prompt flag for preset"
        );
        assert!(
            has_arg(&args, "--append-system-prompt"),
            "Should have --append-system-prompt flag"
        );
        assert_eq!(
            value_after(&args, "--append-system-prompt"),
            Some("Be concise."),
            "Append text should match"
        );
    }
}