bamboo_agent/claude/
runner.rs1use std::path::PathBuf;
8use std::process::Stdio;
9use std::sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc, Mutex,
12};
13
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::sync::broadcast;
16use tokio_util::sync::CancellationToken;
17
18use crate::agent::core::{AgentEvent, TokenUsage};
19use crate::process::ProcessRegistry;
20
21use super::command::create_tokio_command_with_env;
22use super::stream_json::ClaudeStreamJsonParser;
23
24#[derive(Debug, Clone)]
25pub struct ClaudeCodeCliConfig {
26 pub claude_path: String,
27 pub project_path: PathBuf,
28 pub prompt: String,
29 pub session_id: String,
31 pub anthropic_base_url: String,
33 pub json_schema: Option<String>,
35 pub skip_permissions: bool,
37 pub include_partial_messages: bool,
39}
40
41pub async fn spawn_claude_code_cli(
45 registry: Arc<ProcessRegistry>,
46 event_sender: broadcast::Sender<AgentEvent>,
47 cancel_token: CancellationToken,
48 config: ClaudeCodeCliConfig,
49) -> Result<i64, String> {
50 let mut cmd = create_tokio_command_with_env(&config.claude_path);
51
52 cmd.current_dir(&config.project_path);
53 cmd.env("ANTHROPIC_BASE_URL", &config.anthropic_base_url);
54
55 cmd.arg("-p").arg(&config.prompt);
57 cmd.arg("--output-format").arg("stream-json");
58 cmd.arg("--verbose");
60
61 if config.include_partial_messages {
62 cmd.arg("--include-partial-messages");
63 }
64 if config.skip_permissions {
65 cmd.arg("--dangerously-skip-permissions");
66 }
67 cmd.arg("--session-id").arg(&config.session_id);
68
69 if let Some(schema) = &config.json_schema {
70 cmd.arg("--json-schema").arg(schema);
71 }
72
73 cmd.stdout(Stdio::piped());
74 cmd.stderr(Stdio::piped());
75
76 let mut child = cmd
77 .spawn()
78 .map_err(|e| format!("Failed to spawn Claude Code CLI: {e}"))?;
79
80 let pid = child.id().unwrap_or(0);
81 let stdout = child
82 .stdout
83 .take()
84 .ok_or_else(|| "Failed to capture Claude stdout".to_string())?;
85 let stderr = child
86 .stderr
87 .take()
88 .ok_or_else(|| "Failed to capture Claude stderr".to_string())?;
89
90 let child_arc = Arc::new(Mutex::new(Some(child)));
91
92 let run_id = registry
93 .register_claude_session(
94 config.session_id.clone(),
95 pid,
96 config.project_path.to_string_lossy().to_string(),
97 config.prompt.clone(),
98 "claude-code-cli".to_string(),
99 child_arc.clone(),
100 )
101 .await?;
102
103 let terminal_sent = Arc::new(AtomicBool::new(false));
104
105 {
107 let registry = registry.clone();
108 let event_sender = event_sender.clone();
109 let terminal_sent = terminal_sent.clone();
110 tokio::spawn(async move {
111 let mut parser = ClaudeStreamJsonParser::default();
112 let mut reader = BufReader::new(stdout).lines();
113
114 while let Ok(Some(line)) = reader.next_line().await {
115 let _ = registry.append_live_output(run_id, &line).await;
116
117 for event in parser.parse_line(&line) {
118 if matches!(
119 event,
120 AgentEvent::Complete { .. } | AgentEvent::Error { .. }
121 ) {
122 terminal_sent.store(true, Ordering::SeqCst);
123 }
124 let _ = event_sender.send(event);
125 }
126 }
127 });
128 }
129
130 {
132 let registry = registry.clone();
133 tokio::spawn(async move {
134 let mut reader = BufReader::new(stderr).lines();
135 while let Ok(Some(line)) = reader.next_line().await {
136 let _ = registry
137 .append_live_output(run_id, &format!("[stderr] {line}"))
138 .await;
139 }
140 });
141 }
142
143 {
145 let registry = registry.clone();
146 let event_sender = event_sender.clone();
147 let terminal_sent = terminal_sent.clone();
148 let child_arc = child_arc.clone();
149 tokio::spawn(async move {
150 let wait_fut = async {
151 let child = {
153 let mut guard = child_arc.lock().map_err(|e| e.to_string())?;
154 guard.take()
155 };
156
157 let mut child = match child {
158 Some(c) => c,
159 None => {
160 return Err::<std::process::ExitStatus, String>(
161 "Child already taken".to_string(),
162 )
163 }
164 };
165
166 child
167 .wait()
168 .await
169 .map_err(|e| format!("Failed to wait for Claude process: {e}"))
170 };
171
172 let status = tokio::select! {
173 _ = cancel_token.cancelled() => {
174 let _ = registry.kill_process(run_id).await;
175 Err("cancelled".to_string())
176 }
177 s = wait_fut => s.map_err(|e| e),
178 };
179
180 if !terminal_sent.load(Ordering::SeqCst) {
182 match status {
183 Ok(exit_status) if exit_status.success() => {
184 let _ = event_sender.send(AgentEvent::Complete {
185 usage: TokenUsage {
186 prompt_tokens: 0,
187 completion_tokens: 0,
188 total_tokens: 0,
189 },
190 });
191 }
192 Ok(exit_status) => {
193 let _ = event_sender.send(AgentEvent::Error {
194 message: format!("Claude Code CLI exited with status: {exit_status}"),
195 });
196 }
197 Err(e) if e == "cancelled" => {
198 let _ = event_sender.send(AgentEvent::Error {
199 message: "Claude Code execution cancelled".to_string(),
200 });
201 }
202 Err(e) => {
203 let _ = event_sender.send(AgentEvent::Error { message: e });
204 }
205 }
206 }
207
208 let _ = registry.unregister_process(run_id).await;
209 });
210 }
211
212 Ok(run_id)
213}