Skip to main content

nucel_agent_claude_code/
lib.rs

1//! Claude Code provider — wraps the `claude` CLI as a subprocess.
2//!
3//! Spawns `claude -p ... --output-format stream-json --verbose` and speaks
4//! JSONL stdio. Supports:
5//!
6//! - One-shot and multi-turn queries (single subprocess kept alive per session)
7//! - Cost tracking per session (from `usage` events on the wire)
8//! - Permission mode configuration ([`PermissionMode`] → `--permission-mode`)
9//! - Budget enforcement (`budget_usd` → cancel on overrun)
10//!
11//! [`PermissionMode`]: nucel_agent_core::PermissionMode
12//!
13//! # Minimal example
14//!
15//! ```rust,no_run
16//! use nucel_agent_claude_code::ClaudeCodeExecutor;
17//! use nucel_agent_core::{AgentExecutor, SpawnConfig};
18//! use std::path::Path;
19//!
20//! # async fn run() -> nucel_agent_core::Result<()> {
21//! let executor = ClaudeCodeExecutor::new();
22//! let session = executor.spawn(
23//!     Path::new("/my/repo"),
24//!     "Fix the failing test in src/lib.rs",
25//!     &SpawnConfig {
26//!         model: Some("claude-opus-4-6".into()),
27//!         budget_usd: Some(5.0),
28//!         max_turns: Some(10),
29//!         ..Default::default()
30//!     },
31//! ).await?;
32//!
33//! let resp = session.query("Did CI pass?").await?;
34//! println!("{}", resp.content);
35//! session.close().await?;
36//! # Ok(()) }
37//! ```
38//!
39//! See also: [workspace README](https://github.com/nucel-dev/agent-sdk#readme)
40//! and the runnable example `crates/unified/examples/claude_basic.rs`.
41
42#![cfg_attr(docsrs, feature(doc_cfg))]
43
44mod process;
45mod protocol;
46
47use std::path::Path;
48use std::sync::Arc;
49
50use async_trait::async_trait;
51use tokio::sync::Mutex;
52
53use nucel_agent_core::{
54    AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
55    AvailabilityStatus, EventStream, ExecutorType, MessageEvent, Result, SessionImpl, SpawnConfig,
56};
57use std::time::Duration;
58
59use process::ClaudeProcess;
60
/// Claude Code executor — spawns `claude` CLI subprocess.
pub struct ClaudeCodeExecutor {
    /// API key handed to the subprocess launcher when a session is started
    /// or resumed; `None` passes no key.
    api_key: Option<String>,
}

impl ClaudeCodeExecutor {
    /// Creates an executor that does not pass an explicit API key.
    #[must_use]
    pub fn new() -> Self {
        Self { api_key: None }
    }

    /// Creates an executor that passes `api_key` to every process it starts.
    #[must_use]
    pub fn with_api_key(api_key: impl Into<String>) -> Self {
        Self {
            api_key: Some(api_key.into()),
        }
    }

    /// Returns `true` when a `claude` executable can be located on `PATH`.
    ///
    /// The lookup is delegated to the platform's locator command. Any
    /// failure to run the locator itself is treated as "not available".
    fn check_cli_available() -> bool {
        // `which` does not exist on Windows; `where` is its counterpart there.
        let locator = if cfg!(windows) { "where" } else { "which" };

        std::process::Command::new(locator)
            .arg("claude")
            // Only the exit status matters; discard whatever the locator prints.
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status()
            .map(|status| status.success())
            .unwrap_or(false)
    }
}

impl Default for ClaudeCodeExecutor {
    /// Equivalent to [`ClaudeCodeExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}
93
94/// Internal session implementation for Claude Code.
95struct ClaudeSessionImpl {
96    process: Arc<Mutex<ClaudeProcess>>,
97    cost: Arc<std::sync::Mutex<AgentCost>>,
98    budget: f64,
99}
100
101#[async_trait]
102impl SessionImpl for ClaudeSessionImpl {
103    async fn query(&self, prompt: &str) -> Result<AgentResponse> {
104        // Budget guard.
105        {
106            let c = self.cost.lock().unwrap();
107            if c.total_usd >= self.budget {
108                return Err(AgentError::BudgetExceeded {
109                    limit: self.budget,
110                    spent: c.total_usd,
111                });
112            }
113        }
114
115        let mut proc = self.process.lock().await;
116        proc.send_query(prompt).await?;
117        let resp = proc.read_response(self.budget).await?;
118
119        {
120            let mut c = self.cost.lock().unwrap();
121            c.input_tokens += resp.cost.input_tokens;
122            c.output_tokens += resp.cost.output_tokens;
123            c.total_usd += resp.cost.total_usd;
124        }
125
126        Ok(resp)
127    }
128
129    async fn query_stream(&self, prompt: &str) -> Result<EventStream> {
130        // Budget guard up-front; the stream itself also checks.
131        {
132            let c = self.cost.lock().unwrap();
133            if c.total_usd >= self.budget {
134                return Err(AgentError::BudgetExceeded {
135                    limit: self.budget,
136                    spent: c.total_usd,
137                });
138            }
139        }
140
141        let process = self.process.clone();
142        let cost_handle = self.cost.clone();
143        let budget = self.budget;
144        let prompt_owned = prompt.to_string();
145
146        let (tx, rx) = tokio::sync::mpsc::channel::<Result<MessageEvent>>(64);
147
148        tokio::spawn(async move {
149            let mut proc = process.lock().await;
150            if let Err(e) = proc.send_query(&prompt_owned).await {
151                let _ = tx.send(Err(e)).await;
152                return;
153            }
154            let stderr_buf = proc.stderr_buf.clone();
155
156            let timeout = Duration::from_secs(600);
157            let mut input_tokens = 0_u64;
158            let mut output_tokens = 0_u64;
159            let mut cache_read = 0_u64;
160            let mut cache_creation = 0_u64;
161            let mut total_cost_usd = 0.0_f64;
162            let mut saw_terminal = false;
163
164            let res = tokio::time::timeout(timeout, async {
165                use tokio::io::AsyncBufReadExt;
166                let mut line = String::new();
167                loop {
168                    line.clear();
169                    let n = match proc.stdout_reader.read_line(&mut line).await {
170                        Ok(n) => n,
171                        Err(e) => {
172                            let _ = tx.send(Err(AgentError::Io(e))).await;
173                            return;
174                        }
175                    };
176                    if n == 0 { break; }
177                    let trimmed = line.trim();
178                    if trimmed.is_empty() { continue; }
179
180                    let v: serde_json::Value = match serde_json::from_str(trimmed) {
181                        Ok(v) => v,
182                        Err(_) => continue,
183                    };
184                    let msg_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
185                    match msg_type {
186                        "assistant" => {
187                            let blocks = v["message"]["content"].as_array().cloned().unwrap_or_default();
188                            for block in &blocks {
189                                let bt = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
190                                match bt {
191                                    "text" => {
192                                        if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
193                                            let _ = tx.send(Ok(MessageEvent::TextChunk { text: t.to_string() })).await;
194                                        }
195                                    }
196                                    "tool_use" => {
197                                        let id = block.get("id").and_then(|s| s.as_str()).unwrap_or("").to_string();
198                                        let name = block.get("name").and_then(|s| s.as_str()).unwrap_or("").to_string();
199                                        let input = block.get("input").cloned().unwrap_or(serde_json::Value::Null);
200                                        let _ = tx.send(Ok(MessageEvent::ToolUse { id, name, input })).await;
201                                    }
202                                    "thinking" => {
203                                        let text = block.get("thinking").and_then(|t| t.as_str()).unwrap_or("").to_string();
204                                        let _ = tx.send(Ok(MessageEvent::Thinking { text })).await;
205                                    }
206                                    _ => {}
207                                }
208                            }
209                            if let Some(u) = v["message"].get("usage") {
210                                input_tokens += u.get("input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
211                                output_tokens += u.get("output_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
212                                cache_read += u.get("cache_read_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
213                                cache_creation += u.get("cache_creation_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
214                            }
215                        }
216                        "user" => {
217                            let blocks = v["message"]["content"].as_array().cloned().unwrap_or_default();
218                            for block in &blocks {
219                                if block.get("type").and_then(|t| t.as_str()) == Some("tool_result") {
220                                    let id = block.get("tool_use_id").and_then(|s| s.as_str()).unwrap_or("").to_string();
221                                    let is_error = block.get("is_error").and_then(|e| e.as_bool()).unwrap_or(false);
222                                    let output = block.get("content").and_then(|c| c.as_str()).map(String::from)
223                                        .or_else(|| block.get("content").map(|c| c.to_string()))
224                                        .unwrap_or_default();
225                                    let _ = tx.send(Ok(MessageEvent::ToolResult { tool_use_id: id, success: !is_error, output })).await;
226                                }
227                            }
228                        }
229                        "rate_limit_event" => {
230                            let _ = tx.send(Ok(MessageEvent::RateLimit { message: "rate limit event".into() })).await;
231                        }
232                        "result" => {
233                            let result_text = v.get("result").and_then(|r| r.as_str()).unwrap_or("").to_string();
234                            let is_error = v.get("is_error").and_then(|e| e.as_bool()).unwrap_or(false);
235                            total_cost_usd = v.get("total_cost_usd").and_then(|c| c.as_f64()).unwrap_or(total_cost_usd);
236                            if let Some(u) = v.get("usage") {
237                                input_tokens = u.get("input_tokens").and_then(|x| x.as_u64()).unwrap_or(input_tokens);
238                                output_tokens = u.get("output_tokens").and_then(|x| x.as_u64()).unwrap_or(output_tokens);
239                                let crd = u.get("cache_read_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
240                                let ccr = u.get("cache_creation_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
241                                if crd > 0 { cache_read = crd; }
242                                if ccr > 0 { cache_creation = ccr; }
243                            }
244                            let cost = AgentCost {
245                                input_tokens,
246                                output_tokens,
247                                cache_read_tokens: cache_read,
248                                cache_creation_tokens: cache_creation,
249                                total_usd: total_cost_usd,
250                            };
251                            {
252                                let mut c = cost_handle.lock().unwrap();
253                                c.input_tokens += cost.input_tokens;
254                                c.output_tokens += cost.output_tokens;
255                                c.cache_read_tokens += cost.cache_read_tokens;
256                                c.cache_creation_tokens += cost.cache_creation_tokens;
257                                c.total_usd += cost.total_usd;
258                            }
259                            if total_cost_usd > budget {
260                                let _ = tx.send(Err(AgentError::BudgetExceeded { limit: budget, spent: total_cost_usd })).await;
261                                saw_terminal = true;
262                                return;
263                            }
264                            let _ = tx.send(Ok(MessageEvent::ResultDone { cost, content: result_text, is_error })).await;
265                            saw_terminal = true;
266                            return;
267                        }
268                        _ => {}
269                    }
270                }
271            }).await;
272
273            if res.is_err() {
274                let tail = stderr_buf.lock().await.clone();
275                let msg = if tail.is_empty() {
276                    "stream timed out".to_string()
277                } else {
278                    format!("stream timed out (stderr: {})", tail.trim())
279                };
280                let _ = tx.send(Err(AgentError::Provider { provider: "claude-code".into(), message: msg })).await;
281            } else if !saw_terminal {
282                let _ = tx.send(Err(AgentError::StreamInterrupted("claude stream ended without result".into()))).await;
283            }
284        });
285
286        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
287        Ok(Box::pin(stream))
288    }
289
290    async fn total_cost(&self) -> Result<AgentCost> {
291        Ok(self.cost.lock().unwrap().clone())
292    }
293
294    async fn close(&self) -> Result<()> {
295        let mut proc = self.process.lock().await;
296        proc.shutdown().await
297    }
298}
299
300#[async_trait]
301impl AgentExecutor for ClaudeCodeExecutor {
302    fn executor_type(&self) -> ExecutorType {
303        ExecutorType::ClaudeCode
304    }
305
306    async fn spawn(
307        &self,
308        working_dir: &Path,
309        prompt: &str,
310        config: &SpawnConfig,
311    ) -> Result<AgentSession> {
312        let cost = Arc::new(std::sync::Mutex::new(AgentCost::default()));
313        let budget = config.budget_usd.unwrap_or(f64::MAX);
314
315        if budget <= 0.0 {
316            return Err(AgentError::BudgetExceeded {
317                limit: budget,
318                spent: 0.0,
319            });
320        }
321
322        let mut proc = ClaudeProcess::start(
323            working_dir,
324            prompt,
325            config,
326            self.api_key.as_deref(),
327        )
328        .await?;
329
330        // Capture the pre-minted session id before the read may consume `proc`.
331        let session_id = proc.session_id().to_string();
332
333        let response = proc.read_response(budget).await?;
334
335        {
336            let mut c = cost.lock().unwrap();
337            *c = response.cost.clone();
338        }
339
340        let inner = Arc::new(ClaudeSessionImpl {
341            process: Arc::new(Mutex::new(proc)),
342            cost: cost.clone(),
343            budget,
344        });
345
346        Ok(AgentSession::new(
347            session_id,
348            ExecutorType::ClaudeCode,
349            working_dir.to_path_buf(),
350            config.model.clone(),
351            inner,
352        ))
353    }
354
355    async fn resume(
356        &self,
357        working_dir: &Path,
358        session_id: &str,
359        prompt: &str,
360        config: &SpawnConfig,
361    ) -> Result<AgentSession> {
362        let cost = Arc::new(std::sync::Mutex::new(AgentCost::default()));
363        let budget = config.budget_usd.unwrap_or(f64::MAX);
364
365        if budget <= 0.0 {
366            return Err(AgentError::BudgetExceeded {
367                limit: budget,
368                spent: 0.0,
369            });
370        }
371
372        // Use official --resume <session_id> CLI flag. The resume keeps the
373        // original session id so consumers can keep resuming.
374        let mut proc = ClaudeProcess::start_resume(
375            working_dir,
376            session_id,
377            prompt,
378            config,
379            self.api_key.as_deref(),
380        )
381        .await?;
382
383        let resumed_session_id = proc.session_id().to_string();
384        let response = proc.read_response(budget).await?;
385
386        {
387            let mut c = cost.lock().unwrap();
388            *c = response.cost.clone();
389        }
390
391        let inner = Arc::new(ClaudeSessionImpl {
392            process: Arc::new(Mutex::new(proc)),
393            cost: cost.clone(),
394            budget,
395        });
396
397        Ok(AgentSession::new(
398            resumed_session_id,
399            ExecutorType::ClaudeCode,
400            working_dir.to_path_buf(),
401            config.model.clone(),
402            inner,
403        ))
404    }
405
406    fn capabilities(&self) -> AgentCapabilities {
407        AgentCapabilities {
408            session_resume: true,
409            token_usage: true,
410            mcp_support: true,
411            autonomous_mode: true,
412            structured_output: false,
413            streaming: true,
414            hooks: true,
415            prompt_caching: true,
416            extended_thinking: true,
417        }
418    }
419
420    fn availability(&self) -> AvailabilityStatus {
421        if Self::check_cli_available() {
422            AvailabilityStatus {
423                available: true,
424                reason: None,
425            }
426        } else {
427            AvailabilityStatus {
428                available: false,
429                reason: Some(
430                    "`claude` CLI not found. Install: npm install -g @anthropic-ai/claude-code"
431                        .to_string(),
432                ),
433            }
434        }
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;

    /// The executor must identify itself as the Claude Code backend.
    #[test]
    fn executor_type_is_claude_code() {
        let reported = ClaudeCodeExecutor::new().executor_type();
        assert_eq!(reported, ExecutorType::ClaudeCode);
    }

    /// Spot-checks the capability flags other components rely on.
    #[test]
    fn capabilities_declares_autonomous_mode() {
        let AgentCapabilities {
            autonomous_mode,
            token_usage,
            mcp_support,
            session_resume,
            ..
        } = ClaudeCodeExecutor::new().capabilities();

        assert!(autonomous_mode);
        assert!(token_usage);
        assert!(mcp_support);
        assert!(session_resume, "Claude Code supports --resume flag");
    }

    /// A zero budget must be rejected without launching the CLI.
    #[tokio::test]
    async fn budget_zero_returns_error_before_spawn() {
        let zero_budget = SpawnConfig {
            budget_usd: Some(0.0),
            ..Default::default()
        };
        let executor = ClaudeCodeExecutor::new();

        let outcome = executor
            .spawn(Path::new("/tmp"), "test", &zero_budget)
            .await;

        assert!(matches!(outcome, Err(AgentError::BudgetExceeded { .. })));
    }
}
473}