bamboo_agent/claude/
runner.rs1use std::path::PathBuf;
8use std::process::Stdio;
9use std::sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc, Mutex,
12};
13
14use tokio::io::AsyncWriteExt;
15use tokio::io::{AsyncBufReadExt, BufReader};
16use tokio::sync::broadcast;
17use tokio_util::sync::CancellationToken;
18
19use crate::agent::core::{AgentEvent, TokenUsage};
20use crate::process::ProcessRegistry;
21
22use super::command::create_tokio_command_with_env;
23use super::stream_json::ClaudeStreamJsonParser;
24
/// Settings for a single Claude Code CLI invocation started by
/// `spawn_claude_code_cli`.
#[derive(Clone, Debug)]
pub struct ClaudeCodeCliConfig {
    /// Program handed to the command builder, i.e. the Claude CLI executable.
    pub claude_path: String,
    /// Working directory the CLI process is started in.
    pub project_path: PathBuf,
    /// Prompt text; written to the child's stdin followed by a newline.
    pub prompt: String,
    /// Value passed to the CLI via `--session-id`; also used when registering
    /// the run with the process registry.
    pub session_id: String,
    /// Exported to the child process as the `ANTHROPIC_BASE_URL` environment
    /// variable.
    pub anthropic_base_url: String,
    /// When set, forwarded to the CLI via `--json-schema`.
    pub json_schema: Option<String>,
    /// When `true`, the CLI is started with `--dangerously-skip-permissions`.
    pub skip_permissions: bool,
    /// When `true`, the CLI is started with `--include-partial-messages`.
    pub include_partial_messages: bool,
}
41
42pub async fn spawn_claude_code_cli(
46 registry: Arc<ProcessRegistry>,
47 event_sender: broadcast::Sender<AgentEvent>,
48 cancel_token: CancellationToken,
49 config: ClaudeCodeCliConfig,
50) -> Result<i64, String> {
51 let mut cmd = create_tokio_command_with_env(&config.claude_path);
52
53 cmd.current_dir(&config.project_path);
54 cmd.env("ANTHROPIC_BASE_URL", &config.anthropic_base_url);
55
56 cmd.arg("-p");
59 cmd.arg("--output-format").arg("stream-json");
60 cmd.arg("--verbose");
62
63 if config.include_partial_messages {
64 cmd.arg("--include-partial-messages");
65 }
66 if config.skip_permissions {
67 cmd.arg("--dangerously-skip-permissions");
68 }
69 cmd.arg("--session-id").arg(&config.session_id);
70
71 if let Some(schema) = &config.json_schema {
72 cmd.arg("--json-schema").arg(schema);
73 }
74
75 cmd.stdin(Stdio::piped());
76 cmd.stdout(Stdio::piped());
77 cmd.stderr(Stdio::piped());
78
79 let mut child = cmd
80 .spawn()
81 .map_err(|e| format!("Failed to spawn Claude Code CLI: {e}"))?;
82
83 if let Some(mut stdin) = child.stdin.take() {
84 stdin
85 .write_all(config.prompt.as_bytes())
86 .await
87 .map_err(|e| format!("Failed writing Claude stdin: {e}"))?;
88 stdin
89 .write_all(b"\n")
90 .await
91 .map_err(|e| format!("Failed writing Claude stdin: {e}"))?;
92 let _ = stdin.shutdown().await;
93 }
94
95 let pid = child.id().unwrap_or(0);
96 let stdout = child
97 .stdout
98 .take()
99 .ok_or_else(|| "Failed to capture Claude stdout".to_string())?;
100 let stderr = child
101 .stderr
102 .take()
103 .ok_or_else(|| "Failed to capture Claude stderr".to_string())?;
104
105 let child_arc = Arc::new(Mutex::new(Some(child)));
106
107 let run_id = registry
108 .register_claude_session(
109 config.session_id.clone(),
110 pid,
111 config.project_path.to_string_lossy().to_string(),
112 config.prompt.clone(),
113 "claude-code-cli".to_string(),
114 child_arc.clone(),
115 )
116 .await?;
117
118 let terminal_sent = Arc::new(AtomicBool::new(false));
119
120 {
122 let registry = registry.clone();
123 let event_sender = event_sender.clone();
124 let terminal_sent = terminal_sent.clone();
125 tokio::spawn(async move {
126 let mut parser = ClaudeStreamJsonParser::default();
127 let mut reader = BufReader::new(stdout).lines();
128
129 while let Ok(Some(line)) = reader.next_line().await {
130 let _ = registry.append_live_output(run_id, &line).await;
131
132 for event in parser.parse_line(&line) {
133 if matches!(
134 event,
135 AgentEvent::Complete { .. } | AgentEvent::Error { .. }
136 ) {
137 terminal_sent.store(true, Ordering::SeqCst);
138 }
139 let _ = event_sender.send(event);
140 }
141 }
142 });
143 }
144
145 {
147 let registry = registry.clone();
148 tokio::spawn(async move {
149 let mut reader = BufReader::new(stderr).lines();
150 while let Ok(Some(line)) = reader.next_line().await {
151 let _ = registry
152 .append_live_output(run_id, &format!("[stderr] {line}"))
153 .await;
154 }
155 });
156 }
157
158 {
160 let registry = registry.clone();
161 let event_sender = event_sender.clone();
162 let terminal_sent = terminal_sent.clone();
163 let child_arc = child_arc.clone();
164 tokio::spawn(async move {
165 let wait_fut = async {
166 let child = {
168 let mut guard = child_arc.lock().map_err(|e| e.to_string())?;
169 guard.take()
170 };
171
172 let mut child = match child {
173 Some(c) => c,
174 None => {
175 return Err::<std::process::ExitStatus, String>(
176 "Child already taken".to_string(),
177 )
178 }
179 };
180
181 child
182 .wait()
183 .await
184 .map_err(|e| format!("Failed to wait for Claude process: {e}"))
185 };
186
187 let status = tokio::select! {
188 _ = cancel_token.cancelled() => {
189 let _ = registry.kill_process(run_id).await;
190 Err("cancelled".to_string())
191 }
192 s = wait_fut => s,
193 };
194
195 if !terminal_sent.load(Ordering::SeqCst) {
197 match status {
198 Ok(exit_status) if exit_status.success() => {
199 let _ = event_sender.send(AgentEvent::Complete {
200 usage: TokenUsage {
201 prompt_tokens: 0,
202 completion_tokens: 0,
203 total_tokens: 0,
204 },
205 });
206 }
207 Ok(exit_status) => {
208 let _ = event_sender.send(AgentEvent::Error {
209 message: format!("Claude Code CLI exited with status: {exit_status}"),
210 });
211 }
212 Err(e) if e == "cancelled" => {
213 let _ = event_sender.send(AgentEvent::Error {
214 message: "Claude Code execution cancelled".to_string(),
215 });
216 }
217 Err(e) => {
218 let _ = event_sender.send(AgentEvent::Error { message: e });
219 }
220 }
221 }
222
223 let _ = registry.unregister_process(run_id).await;
224 });
225 }
226
227 Ok(run_id)
228}