use std::path::PathBuf;
use std::process::Stdio;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use bamboo_agent_core::{AgentEvent, TokenUsage};
use bamboo_infrastructure::ProcessRegistry;
use super::command::create_tokio_command_with_env;
use super::stream_json::ClaudeStreamJsonParser;
/// Launch parameters for a single Claude Code CLI run, consumed by
/// `spawn_claude_code_cli`.
#[derive(Debug, Clone)]
pub struct ClaudeCodeCliConfig {
/// Program handed to `create_tokio_command_with_env` to build the command.
pub claude_path: String,
/// Working directory the child process is started in; also recorded
/// (lossily converted to a string) when the run is registered.
pub project_path: PathBuf,
/// Prompt text written to the child's stdin, followed by a newline.
pub prompt: String,
/// Value passed to `--session-id`; also used when registering the run.
pub session_id: String,
/// Exported to the child as the `ANTHROPIC_BASE_URL` environment variable.
pub anthropic_base_url: String,
/// When `Some`, forwarded verbatim as the argument of `--json-schema`.
pub json_schema: Option<String>,
/// When `true`, adds `--dangerously-skip-permissions` to the command line.
pub skip_permissions: bool,
/// When `true`, adds `--include-partial-messages` to the command line.
pub include_partial_messages: bool,
}
/// Spawns the Claude Code CLI for one prompt and forwards its output as [`AgentEvent`]s.
///
/// The command is run with `-p --output-format stream-json --verbose` plus the optional
/// flags selected in `config`, inside `config.project_path`, with `ANTHROPIC_BASE_URL`
/// set from the config. The prompt is written to the child's stdin, which is then closed.
///
/// Three background tasks are started:
/// * a stdout reader that appends every line to the registry's live output and
///   broadcasts the events produced by [`ClaudeStreamJsonParser`];
/// * a stderr reader that appends every line, prefixed with `[stderr] `, to the live output;
/// * an exit watcher that waits for the process (or for `cancel_token`), emits a
///   fallback `Complete`/`Error` event if the stream did not already produce one, and
///   finally unregisters the run.
///
/// # Returns
///
/// The run id handed out by `registry.register_claude_session`.
///
/// # Errors
///
/// Returns a message when the process cannot be spawned, when the prompt cannot be
/// written to stdin, when stdout/stderr cannot be captured, or when registration fails.
/// On every failure that happens after the spawn succeeded, the child is killed and
/// reaped before returning so that no unsupervised process is left behind.
pub async fn spawn_claude_code_cli(
    registry: Arc<ProcessRegistry>,
    event_sender: broadcast::Sender<AgentEvent>,
    cancel_token: CancellationToken,
    config: ClaudeCodeCliConfig,
) -> Result<i64, String> {
    let mut cmd = create_tokio_command_with_env(&config.claude_path);
    cmd.current_dir(&config.project_path);
    cmd.env("ANTHROPIC_BASE_URL", &config.anthropic_base_url);
    cmd.arg("-p");
    cmd.arg("--output-format").arg("stream-json");
    cmd.arg("--verbose");
    if config.include_partial_messages {
        cmd.arg("--include-partial-messages");
    }
    if config.skip_permissions {
        cmd.arg("--dangerously-skip-permissions");
    }
    cmd.arg("--session-id").arg(&config.session_id);
    if let Some(schema) = &config.json_schema {
        cmd.arg("--json-schema").arg(schema);
    }
    cmd.stdin(Stdio::piped());
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());

    let mut child = cmd
        .spawn()
        .map_err(|e| format!("Failed to spawn Claude Code CLI: {e}"))?;

    if let Some(mut stdin) = child.stdin.take() {
        let mut write_result = stdin.write_all(config.prompt.as_bytes()).await;
        if write_result.is_ok() {
            write_result = stdin.write_all(b"\n").await;
        }
        if let Err(e) = write_result {
            // Nobody is supervising the child yet; do not leave it running.
            let _ = child.kill().await;
            return Err(format!("Failed writing Claude stdin: {e}"));
        }
        // Closing stdin signals end-of-input; a failure here is not fatal.
        let _ = stdin.shutdown().await;
    }

    let pid = child.id().unwrap_or(0);

    let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
        (Some(out), Some(err)) => (out, err),
        (None, _) => {
            let _ = child.kill().await;
            return Err("Failed to capture Claude stdout".to_string());
        }
        (_, None) => {
            let _ = child.kill().await;
            return Err("Failed to capture Claude stderr".to_string());
        }
    };

    let child_arc = Arc::new(Mutex::new(Some(child)));
    let registration = registry
        .register_claude_session(
            config.session_id.clone(),
            pid,
            config.project_path.to_string_lossy().to_string(),
            config.prompt.clone(),
            "claude-code-cli".to_string(),
            child_arc.clone(),
        )
        .await;
    if registration.is_err() {
        // The run is unknown to the registry, so nothing else would ever stop the child.
        if let Ok(mut orphan) = take_child_handle(&child_arc) {
            let _ = orphan.kill().await;
        }
    }
    let run_id = registration?;

    // Set once the stream itself produced a terminal (`Complete` / `Error`) event.
    let terminal_sent = Arc::new(AtomicBool::new(false));

    // stdout: record raw lines and broadcast the parsed events.
    let stdout_task = {
        let registry = registry.clone();
        let event_sender = event_sender.clone();
        let terminal_sent = terminal_sent.clone();
        tokio::spawn(async move {
            let mut parser = ClaudeStreamJsonParser::default();
            let mut reader = BufReader::new(stdout).lines();
            while let Ok(Some(line)) = reader.next_line().await {
                let _ = registry.append_live_output(run_id, &line).await;
                for event in parser.parse_line(&line) {
                    if matches!(
                        event,
                        AgentEvent::Complete { .. } | AgentEvent::Error { .. }
                    ) {
                        terminal_sent.store(true, Ordering::SeqCst);
                    }
                    let _ = event_sender.send(event);
                }
            }
        })
    };

    // stderr: only recorded in the live output, never turned into events.
    {
        let registry = registry.clone();
        tokio::spawn(async move {
            let mut reader = BufReader::new(stderr).lines();
            while let Ok(Some(line)) = reader.next_line().await {
                let _ = registry
                    .append_live_output(run_id, &format!("[stderr] {line}"))
                    .await;
            }
        });
    }

    // Exit watcher: waits for exit or cancellation, then emits the fallback terminal event.
    {
        let registry = registry.clone();
        let event_sender = event_sender.clone();
        let terminal_sent = terminal_sent.clone();
        let child_arc = child_arc.clone();
        tokio::spawn(async move {
            let status: Result<std::process::ExitStatus, String> =
                match take_child_handle(&child_arc) {
                    Ok(mut child) => {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                let _ = registry.kill_process(run_id).await;
                                // The handle was moved out of the shared slot above, so
                                // also kill (and reap) through it directly; errors are
                                // ignored because the process may already be gone.
                                let _ = child.kill().await;
                                Err("cancelled".to_string())
                            }
                            waited = child.wait() => {
                                waited.map_err(|e| {
                                    format!("Failed to wait for Claude process: {e}")
                                })
                            }
                        }
                    }
                    Err(e) => Err(e),
                };

            // The process can be reported as exited before the stdout reader has parsed
            // the last lines still buffered in the pipe. Let the reader finish first so a
            // terminal event coming from the stream is not preceded by a synthetic one.
            // Cancellation stays an escape hatch in case the pipe is kept open by
            // something other than the child itself.
            tokio::select! {
                _ = cancel_token.cancelled() => {}
                _ = stdout_task => {}
            }

            if !terminal_sent.load(Ordering::SeqCst) {
                match status {
                    Ok(exit_status) if exit_status.success() => {
                        let _ = event_sender.send(AgentEvent::Complete {
                            usage: TokenUsage {
                                prompt_tokens: 0,
                                completion_tokens: 0,
                                total_tokens: 0,
                            },
                        });
                    }
                    Ok(exit_status) => {
                        let _ = event_sender.send(AgentEvent::Error {
                            message: format!("Claude Code CLI exited with status: {exit_status}"),
                        });
                    }
                    Err(e) if e == "cancelled" => {
                        let _ = event_sender.send(AgentEvent::Error {
                            message: "Claude Code execution cancelled".to_string(),
                        });
                    }
                    Err(e) => {
                        let _ = event_sender.send(AgentEvent::Error { message: e });
                    }
                }
            }
            let _ = registry.unregister_process(run_id).await;
        });
    }

    Ok(run_id)
}

/// Moves the value out of a shared `Option` slot, leaving `None` behind.
///
/// Kept synchronous so the mutex guard is released before any `.await`. Fails with the
/// poison message when the lock is poisoned, or with `"Child already taken"` when the
/// slot is already empty.
fn take_child_handle<T>(slot: &Mutex<Option<T>>) -> Result<T, String> {
    let mut guard = slot.lock().map_err(|e| e.to_string())?;
    guard
        .take()
        .ok_or_else(|| "Child already taken".to_string())
}