Skip to main content

bamboo_agent/claude/
runner.rs

//! Claude Code CLI runner (streaming).
//!
//! This module spawns `claude` with `--output-format stream-json`, converts the
//! output stream into Bamboo `AgentEvent`s, and registers the process in
//! `ProcessRegistry` for cancellation and monitoring.
6
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::sync::{
10    atomic::{AtomicBool, Ordering},
11    Arc, Mutex,
12};
13
14use tokio::io::AsyncWriteExt;
15use tokio::io::{AsyncBufReadExt, BufReader};
16use tokio::sync::broadcast;
17use tokio_util::sync::CancellationToken;
18
19use crate::agent::core::{AgentEvent, TokenUsage};
20use crate::process::ProcessRegistry;
21
22use super::command::create_tokio_command_with_env;
23use super::stream_json::ClaudeStreamJsonParser;
24
/// Settings for a single Claude Code CLI invocation.
///
/// Consumed by `spawn_claude_code_cli`, which turns each field into the
/// matching executable path, working directory, environment variable, CLI
/// flag, or stdin payload.
#[derive(Debug, Clone)]
pub struct ClaudeCodeCliConfig {
    /// Path (or name) of the `claude` executable to launch.
    pub claude_path: String,
    /// Working directory the CLI process is started in.
    pub project_path: PathBuf,
    /// Prompt text. It is written to the process's stdin rather than passed
    /// as an argument, to avoid command-line quoting issues.
    pub prompt: String,
    /// A UUID string. Claude Code requires session ids to be valid UUIDs.
    /// Passed as `--session-id`.
    pub session_id: String,
    /// e.g. `http://localhost:9562/anthropic` (Claude will append `/v1/...`).
    /// Exported to the child process as `ANTHROPIC_BASE_URL`.
    pub anthropic_base_url: String,
    /// Optional structured output schema (passed as `--json-schema`).
    pub json_schema: Option<String>,
    /// If true, pass `--dangerously-skip-permissions`.
    pub skip_permissions: bool,
    /// If true, pass `--include-partial-messages` to improve streaming UX.
    pub include_partial_messages: bool,
}
41
42/// Spawn Claude Code CLI and stream its output as `AgentEvent`s.
43///
44/// Returns the registered `run_id` from the `ProcessRegistry`.
45pub async fn spawn_claude_code_cli(
46    registry: Arc<ProcessRegistry>,
47    event_sender: broadcast::Sender<AgentEvent>,
48    cancel_token: CancellationToken,
49    config: ClaudeCodeCliConfig,
50) -> Result<i64, String> {
51    let mut cmd = create_tokio_command_with_env(&config.claude_path);
52
53    cmd.current_dir(&config.project_path);
54    cmd.env("ANTHROPIC_BASE_URL", &config.anthropic_base_url);
55
56    // Non-interactive prompt mode.
57    // Use stdin for the prompt to avoid command-line quoting issues (notably on Windows).
58    cmd.arg("-p");
59    cmd.arg("--output-format").arg("stream-json");
60    // Claude Code requires `--verbose` when using `--print/-p` with `--output-format=stream-json`.
61    cmd.arg("--verbose");
62
63    if config.include_partial_messages {
64        cmd.arg("--include-partial-messages");
65    }
66    if config.skip_permissions {
67        cmd.arg("--dangerously-skip-permissions");
68    }
69    cmd.arg("--session-id").arg(&config.session_id);
70
71    if let Some(schema) = &config.json_schema {
72        cmd.arg("--json-schema").arg(schema);
73    }
74
75    cmd.stdin(Stdio::piped());
76    cmd.stdout(Stdio::piped());
77    cmd.stderr(Stdio::piped());
78
79    let mut child = cmd
80        .spawn()
81        .map_err(|e| format!("Failed to spawn Claude Code CLI: {e}"))?;
82
83    if let Some(mut stdin) = child.stdin.take() {
84        stdin
85            .write_all(config.prompt.as_bytes())
86            .await
87            .map_err(|e| format!("Failed writing Claude stdin: {e}"))?;
88        stdin
89            .write_all(b"\n")
90            .await
91            .map_err(|e| format!("Failed writing Claude stdin: {e}"))?;
92        let _ = stdin.shutdown().await;
93    }
94
95    let pid = child.id().unwrap_or(0);
96    let stdout = child
97        .stdout
98        .take()
99        .ok_or_else(|| "Failed to capture Claude stdout".to_string())?;
100    let stderr = child
101        .stderr
102        .take()
103        .ok_or_else(|| "Failed to capture Claude stderr".to_string())?;
104
105    let child_arc = Arc::new(Mutex::new(Some(child)));
106
107    let run_id = registry
108        .register_claude_session(
109            config.session_id.clone(),
110            pid,
111            config.project_path.to_string_lossy().to_string(),
112            config.prompt.clone(),
113            "claude-code-cli".to_string(),
114            child_arc.clone(),
115        )
116        .await?;
117
118    let terminal_sent = Arc::new(AtomicBool::new(false));
119
120    // Stream stdout (stream-json) -> AgentEvents.
121    {
122        let registry = registry.clone();
123        let event_sender = event_sender.clone();
124        let terminal_sent = terminal_sent.clone();
125        tokio::spawn(async move {
126            let mut parser = ClaudeStreamJsonParser::default();
127            let mut reader = BufReader::new(stdout).lines();
128
129            while let Ok(Some(line)) = reader.next_line().await {
130                let _ = registry.append_live_output(run_id, &line).await;
131
132                for event in parser.parse_line(&line) {
133                    if matches!(
134                        event,
135                        AgentEvent::Complete { .. } | AgentEvent::Error { .. }
136                    ) {
137                        terminal_sent.store(true, Ordering::SeqCst);
138                    }
139                    let _ = event_sender.send(event);
140                }
141            }
142        });
143    }
144
145    // Stream stderr as plain text into the live output buffer for debugging.
146    {
147        let registry = registry.clone();
148        tokio::spawn(async move {
149            let mut reader = BufReader::new(stderr).lines();
150            while let Ok(Some(line)) = reader.next_line().await {
151                let _ = registry
152                    .append_live_output(run_id, &format!("[stderr] {line}"))
153                    .await;
154            }
155        });
156    }
157
158    // Wait for process completion or cancellation, then emit a terminal event if needed.
159    {
160        let registry = registry.clone();
161        let event_sender = event_sender.clone();
162        let terminal_sent = terminal_sent.clone();
163        let child_arc = child_arc.clone();
164        tokio::spawn(async move {
165            let wait_fut = async {
166                // Take ownership of the child for waiting.
167                let child = {
168                    let mut guard = child_arc.lock().map_err(|e| e.to_string())?;
169                    guard.take()
170                };
171
172                let mut child = match child {
173                    Some(c) => c,
174                    None => {
175                        return Err::<std::process::ExitStatus, String>(
176                            "Child already taken".to_string(),
177                        )
178                    }
179                };
180
181                child
182                    .wait()
183                    .await
184                    .map_err(|e| format!("Failed to wait for Claude process: {e}"))
185            };
186
187            let status = tokio::select! {
188                _ = cancel_token.cancelled() => {
189                    let _ = registry.kill_process(run_id).await;
190                    Err("cancelled".to_string())
191                }
192                s = wait_fut => s,
193            };
194
195            // Ensure we emit a terminal event if Claude didn't provide one.
196            if !terminal_sent.load(Ordering::SeqCst) {
197                match status {
198                    Ok(exit_status) if exit_status.success() => {
199                        let _ = event_sender.send(AgentEvent::Complete {
200                            usage: TokenUsage {
201                                prompt_tokens: 0,
202                                completion_tokens: 0,
203                                total_tokens: 0,
204                            },
205                        });
206                    }
207                    Ok(exit_status) => {
208                        let _ = event_sender.send(AgentEvent::Error {
209                            message: format!("Claude Code CLI exited with status: {exit_status}"),
210                        });
211                    }
212                    Err(e) if e == "cancelled" => {
213                        let _ = event_sender.send(AgentEvent::Error {
214                            message: "Claude Code execution cancelled".to_string(),
215                        });
216                    }
217                    Err(e) => {
218                        let _ = event_sender.send(AgentEvent::Error { message: e });
219                    }
220                }
221            }
222
223            let _ = registry.unregister_process(run_id).await;
224        });
225    }
226
227    Ok(run_id)
228}