Skip to main content

bamboo_agent/claude/
runner.rs

1//! Claude Code CLI runner (streaming).
2//!
3//! This module spawns `claude` with `--output-format stream-json`, converts the
4//! output stream into Bamboo `AgentEvent`s, and registers the process in
5//! `ProcessRegistry` for cancellation and monitoring.
6
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::sync::{
10    atomic::{AtomicBool, Ordering},
11    Arc, Mutex,
12};
13
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::sync::broadcast;
16use tokio_util::sync::CancellationToken;
17
18use crate::agent::core::{AgentEvent, TokenUsage};
19use crate::process::ProcessRegistry;
20
21use super::command::create_tokio_command_with_env;
22use super::stream_json::ClaudeStreamJsonParser;
23
24#[derive(Debug, Clone)]
25pub struct ClaudeCodeCliConfig {
26    pub claude_path: String,
27    pub project_path: PathBuf,
28    pub prompt: String,
29    /// A UUID string. Claude Code requires session ids to be valid UUIDs.
30    pub session_id: String,
31    /// e.g. `http://localhost:8080/anthropic` (Claude will append `/v1/...`)
32    pub anthropic_base_url: String,
33    /// Optional structured output schema (passed as `--json-schema`).
34    pub json_schema: Option<String>,
35    /// If true, pass `--dangerously-skip-permissions`.
36    pub skip_permissions: bool,
37    /// If true, pass `--include-partial-messages` to improve streaming UX.
38    pub include_partial_messages: bool,
39}
40
41/// Spawn Claude Code CLI and stream its output as `AgentEvent`s.
42///
43/// Returns the registered `run_id` from the `ProcessRegistry`.
44pub async fn spawn_claude_code_cli(
45    registry: Arc<ProcessRegistry>,
46    event_sender: broadcast::Sender<AgentEvent>,
47    cancel_token: CancellationToken,
48    config: ClaudeCodeCliConfig,
49) -> Result<i64, String> {
50    let mut cmd = create_tokio_command_with_env(&config.claude_path);
51
52    cmd.current_dir(&config.project_path);
53    cmd.env("ANTHROPIC_BASE_URL", &config.anthropic_base_url);
54
55    // Non-interactive prompt mode.
56    cmd.arg("-p").arg(&config.prompt);
57    cmd.arg("--output-format").arg("stream-json");
58    // Claude Code requires `--verbose` when using `--print/-p` with `--output-format=stream-json`.
59    cmd.arg("--verbose");
60
61    if config.include_partial_messages {
62        cmd.arg("--include-partial-messages");
63    }
64    if config.skip_permissions {
65        cmd.arg("--dangerously-skip-permissions");
66    }
67    cmd.arg("--session-id").arg(&config.session_id);
68
69    if let Some(schema) = &config.json_schema {
70        cmd.arg("--json-schema").arg(schema);
71    }
72
73    cmd.stdout(Stdio::piped());
74    cmd.stderr(Stdio::piped());
75
76    let mut child = cmd
77        .spawn()
78        .map_err(|e| format!("Failed to spawn Claude Code CLI: {e}"))?;
79
80    let pid = child.id().unwrap_or(0);
81    let stdout = child
82        .stdout
83        .take()
84        .ok_or_else(|| "Failed to capture Claude stdout".to_string())?;
85    let stderr = child
86        .stderr
87        .take()
88        .ok_or_else(|| "Failed to capture Claude stderr".to_string())?;
89
90    let child_arc = Arc::new(Mutex::new(Some(child)));
91
92    let run_id = registry
93        .register_claude_session(
94            config.session_id.clone(),
95            pid,
96            config.project_path.to_string_lossy().to_string(),
97            config.prompt.clone(),
98            "claude-code-cli".to_string(),
99            child_arc.clone(),
100        )
101        .await?;
102
103    let terminal_sent = Arc::new(AtomicBool::new(false));
104
105    // Stream stdout (stream-json) -> AgentEvents.
106    {
107        let registry = registry.clone();
108        let event_sender = event_sender.clone();
109        let terminal_sent = terminal_sent.clone();
110        tokio::spawn(async move {
111            let mut parser = ClaudeStreamJsonParser::default();
112            let mut reader = BufReader::new(stdout).lines();
113
114            while let Ok(Some(line)) = reader.next_line().await {
115                let _ = registry.append_live_output(run_id, &line).await;
116
117                for event in parser.parse_line(&line) {
118                    if matches!(
119                        event,
120                        AgentEvent::Complete { .. } | AgentEvent::Error { .. }
121                    ) {
122                        terminal_sent.store(true, Ordering::SeqCst);
123                    }
124                    let _ = event_sender.send(event);
125                }
126            }
127        });
128    }
129
130    // Stream stderr as plain text into the live output buffer for debugging.
131    {
132        let registry = registry.clone();
133        tokio::spawn(async move {
134            let mut reader = BufReader::new(stderr).lines();
135            while let Ok(Some(line)) = reader.next_line().await {
136                let _ = registry
137                    .append_live_output(run_id, &format!("[stderr] {line}"))
138                    .await;
139            }
140        });
141    }
142
143    // Wait for process completion or cancellation, then emit a terminal event if needed.
144    {
145        let registry = registry.clone();
146        let event_sender = event_sender.clone();
147        let terminal_sent = terminal_sent.clone();
148        let child_arc = child_arc.clone();
149        tokio::spawn(async move {
150            let wait_fut = async {
151                // Take ownership of the child for waiting.
152                let child = {
153                    let mut guard = child_arc.lock().map_err(|e| e.to_string())?;
154                    guard.take()
155                };
156
157                let mut child = match child {
158                    Some(c) => c,
159                    None => {
160                        return Err::<std::process::ExitStatus, String>(
161                            "Child already taken".to_string(),
162                        )
163                    }
164                };
165
166                child
167                    .wait()
168                    .await
169                    .map_err(|e| format!("Failed to wait for Claude process: {e}"))
170            };
171
172            let status = tokio::select! {
173                _ = cancel_token.cancelled() => {
174                    let _ = registry.kill_process(run_id).await;
175                    Err("cancelled".to_string())
176                }
177                s = wait_fut => s.map_err(|e| e),
178            };
179
180            // Ensure we emit a terminal event if Claude didn't provide one.
181            if !terminal_sent.load(Ordering::SeqCst) {
182                match status {
183                    Ok(exit_status) if exit_status.success() => {
184                        let _ = event_sender.send(AgentEvent::Complete {
185                            usage: TokenUsage {
186                                prompt_tokens: 0,
187                                completion_tokens: 0,
188                                total_tokens: 0,
189                            },
190                        });
191                    }
192                    Ok(exit_status) => {
193                        let _ = event_sender.send(AgentEvent::Error {
194                            message: format!("Claude Code CLI exited with status: {exit_status}"),
195                        });
196                    }
197                    Err(e) if e == "cancelled" => {
198                        let _ = event_sender.send(AgentEvent::Error {
199                            message: "Claude Code execution cancelled".to_string(),
200                        });
201                    }
202                    Err(e) => {
203                        let _ = event_sender.send(AgentEvent::Error { message: e });
204                    }
205                }
206            }
207
208            let _ = registry.unregister_process(run_id).await;
209        });
210    }
211
212    Ok(run_id)
213}