1#![cfg_attr(docsrs, feature(doc_cfg))]
43
44mod process;
45mod protocol;
46
47use std::path::Path;
48use std::sync::Arc;
49
50use async_trait::async_trait;
51use tokio::sync::Mutex;
52
53use nucel_agent_core::{
54 AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
55 AvailabilityStatus, EventStream, ExecutorType, MessageEvent, Result, SessionImpl, SpawnConfig,
56};
57use std::time::Duration;
58
59use process::ClaudeProcess;
60
61pub struct ClaudeCodeExecutor {
63 api_key: Option<String>,
64}
65
66impl ClaudeCodeExecutor {
67 pub fn new() -> Self {
68 Self { api_key: None }
69 }
70
71 pub fn with_api_key(api_key: impl Into<String>) -> Self {
72 Self {
73 api_key: Some(api_key.into()),
74 }
75 }
76
77 fn check_cli_available() -> bool {
78 std::process::Command::new("which")
79 .arg("claude")
80 .stdout(std::process::Stdio::null())
81 .stderr(std::process::Stdio::null())
82 .status()
83 .map(|s| s.success())
84 .unwrap_or(false)
85 }
86}
87
88impl Default for ClaudeCodeExecutor {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94struct ClaudeSessionImpl {
96 process: Arc<Mutex<ClaudeProcess>>,
97 cost: Arc<std::sync::Mutex<AgentCost>>,
98 budget: f64,
99}
100
101#[async_trait]
102impl SessionImpl for ClaudeSessionImpl {
103 async fn query(&self, prompt: &str) -> Result<AgentResponse> {
104 {
106 let c = self.cost.lock().unwrap();
107 if c.total_usd >= self.budget {
108 return Err(AgentError::BudgetExceeded {
109 limit: self.budget,
110 spent: c.total_usd,
111 });
112 }
113 }
114
115 let mut proc = self.process.lock().await;
116 proc.send_query(prompt).await?;
117 let resp = proc.read_response(self.budget).await?;
118
119 {
120 let mut c = self.cost.lock().unwrap();
121 c.input_tokens += resp.cost.input_tokens;
122 c.output_tokens += resp.cost.output_tokens;
123 c.total_usd += resp.cost.total_usd;
124 }
125
126 Ok(resp)
127 }
128
129 async fn query_stream(&self, prompt: &str) -> Result<EventStream> {
130 {
132 let c = self.cost.lock().unwrap();
133 if c.total_usd >= self.budget {
134 return Err(AgentError::BudgetExceeded {
135 limit: self.budget,
136 spent: c.total_usd,
137 });
138 }
139 }
140
141 let process = self.process.clone();
142 let cost_handle = self.cost.clone();
143 let budget = self.budget;
144 let prompt_owned = prompt.to_string();
145
146 let (tx, rx) = tokio::sync::mpsc::channel::<Result<MessageEvent>>(64);
147
148 tokio::spawn(async move {
149 let mut proc = process.lock().await;
150 if let Err(e) = proc.send_query(&prompt_owned).await {
151 let _ = tx.send(Err(e)).await;
152 return;
153 }
154 let stderr_buf = proc.stderr_buf.clone();
155
156 let timeout = Duration::from_secs(600);
157 let mut input_tokens = 0_u64;
158 let mut output_tokens = 0_u64;
159 let mut cache_read = 0_u64;
160 let mut cache_creation = 0_u64;
161 let mut total_cost_usd = 0.0_f64;
162 let mut saw_terminal = false;
163
164 let res = tokio::time::timeout(timeout, async {
165 use tokio::io::AsyncBufReadExt;
166 let mut line = String::new();
167 loop {
168 line.clear();
169 let n = match proc.stdout_reader.read_line(&mut line).await {
170 Ok(n) => n,
171 Err(e) => {
172 let _ = tx.send(Err(AgentError::Io(e))).await;
173 return;
174 }
175 };
176 if n == 0 { break; }
177 let trimmed = line.trim();
178 if trimmed.is_empty() { continue; }
179
180 let v: serde_json::Value = match serde_json::from_str(trimmed) {
181 Ok(v) => v,
182 Err(_) => continue,
183 };
184 let msg_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
185 match msg_type {
186 "assistant" => {
187 let blocks = v["message"]["content"].as_array().cloned().unwrap_or_default();
188 for block in &blocks {
189 let bt = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
190 match bt {
191 "text" => {
192 if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
193 let _ = tx.send(Ok(MessageEvent::TextChunk { text: t.to_string() })).await;
194 }
195 }
196 "tool_use" => {
197 let id = block.get("id").and_then(|s| s.as_str()).unwrap_or("").to_string();
198 let name = block.get("name").and_then(|s| s.as_str()).unwrap_or("").to_string();
199 let input = block.get("input").cloned().unwrap_or(serde_json::Value::Null);
200 let _ = tx.send(Ok(MessageEvent::ToolUse { id, name, input })).await;
201 }
202 "thinking" => {
203 let text = block.get("thinking").and_then(|t| t.as_str()).unwrap_or("").to_string();
204 let _ = tx.send(Ok(MessageEvent::Thinking { text })).await;
205 }
206 _ => {}
207 }
208 }
209 if let Some(u) = v["message"].get("usage") {
210 input_tokens += u.get("input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
211 output_tokens += u.get("output_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
212 cache_read += u.get("cache_read_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
213 cache_creation += u.get("cache_creation_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
214 }
215 }
216 "user" => {
217 let blocks = v["message"]["content"].as_array().cloned().unwrap_or_default();
218 for block in &blocks {
219 if block.get("type").and_then(|t| t.as_str()) == Some("tool_result") {
220 let id = block.get("tool_use_id").and_then(|s| s.as_str()).unwrap_or("").to_string();
221 let is_error = block.get("is_error").and_then(|e| e.as_bool()).unwrap_or(false);
222 let output = block.get("content").and_then(|c| c.as_str()).map(String::from)
223 .or_else(|| block.get("content").map(|c| c.to_string()))
224 .unwrap_or_default();
225 let _ = tx.send(Ok(MessageEvent::ToolResult { tool_use_id: id, success: !is_error, output })).await;
226 }
227 }
228 }
229 "rate_limit_event" => {
230 let _ = tx.send(Ok(MessageEvent::RateLimit { message: "rate limit event".into() })).await;
231 }
232 "result" => {
233 let result_text = v.get("result").and_then(|r| r.as_str()).unwrap_or("").to_string();
234 let is_error = v.get("is_error").and_then(|e| e.as_bool()).unwrap_or(false);
235 total_cost_usd = v.get("total_cost_usd").and_then(|c| c.as_f64()).unwrap_or(total_cost_usd);
236 if let Some(u) = v.get("usage") {
237 input_tokens = u.get("input_tokens").and_then(|x| x.as_u64()).unwrap_or(input_tokens);
238 output_tokens = u.get("output_tokens").and_then(|x| x.as_u64()).unwrap_or(output_tokens);
239 let crd = u.get("cache_read_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
240 let ccr = u.get("cache_creation_input_tokens").and_then(|x| x.as_u64()).unwrap_or(0);
241 if crd > 0 { cache_read = crd; }
242 if ccr > 0 { cache_creation = ccr; }
243 }
244 let cost = AgentCost {
245 input_tokens,
246 output_tokens,
247 cache_read_tokens: cache_read,
248 cache_creation_tokens: cache_creation,
249 total_usd: total_cost_usd,
250 };
251 {
252 let mut c = cost_handle.lock().unwrap();
253 c.input_tokens += cost.input_tokens;
254 c.output_tokens += cost.output_tokens;
255 c.cache_read_tokens += cost.cache_read_tokens;
256 c.cache_creation_tokens += cost.cache_creation_tokens;
257 c.total_usd += cost.total_usd;
258 }
259 if total_cost_usd > budget {
260 let _ = tx.send(Err(AgentError::BudgetExceeded { limit: budget, spent: total_cost_usd })).await;
261 saw_terminal = true;
262 return;
263 }
264 let _ = tx.send(Ok(MessageEvent::ResultDone { cost, content: result_text, is_error })).await;
265 saw_terminal = true;
266 return;
267 }
268 _ => {}
269 }
270 }
271 }).await;
272
273 if res.is_err() {
274 let tail = stderr_buf.lock().await.clone();
275 let msg = if tail.is_empty() {
276 "stream timed out".to_string()
277 } else {
278 format!("stream timed out (stderr: {})", tail.trim())
279 };
280 let _ = tx.send(Err(AgentError::Provider { provider: "claude-code".into(), message: msg })).await;
281 } else if !saw_terminal {
282 let _ = tx.send(Err(AgentError::StreamInterrupted("claude stream ended without result".into()))).await;
283 }
284 });
285
286 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
287 Ok(Box::pin(stream))
288 }
289
290 async fn total_cost(&self) -> Result<AgentCost> {
291 Ok(self.cost.lock().unwrap().clone())
292 }
293
294 async fn close(&self) -> Result<()> {
295 let mut proc = self.process.lock().await;
296 proc.shutdown().await
297 }
298}
299
300#[async_trait]
301impl AgentExecutor for ClaudeCodeExecutor {
302 fn executor_type(&self) -> ExecutorType {
303 ExecutorType::ClaudeCode
304 }
305
306 async fn spawn(
307 &self,
308 working_dir: &Path,
309 prompt: &str,
310 config: &SpawnConfig,
311 ) -> Result<AgentSession> {
312 let cost = Arc::new(std::sync::Mutex::new(AgentCost::default()));
313 let budget = config.budget_usd.unwrap_or(f64::MAX);
314
315 if budget <= 0.0 {
316 return Err(AgentError::BudgetExceeded {
317 limit: budget,
318 spent: 0.0,
319 });
320 }
321
322 let mut proc = ClaudeProcess::start(
323 working_dir,
324 prompt,
325 config,
326 self.api_key.as_deref(),
327 )
328 .await?;
329
330 let session_id = proc.session_id().to_string();
332
333 let response = proc.read_response(budget).await?;
334
335 {
336 let mut c = cost.lock().unwrap();
337 *c = response.cost.clone();
338 }
339
340 let inner = Arc::new(ClaudeSessionImpl {
341 process: Arc::new(Mutex::new(proc)),
342 cost: cost.clone(),
343 budget,
344 });
345
346 Ok(AgentSession::new(
347 session_id,
348 ExecutorType::ClaudeCode,
349 working_dir.to_path_buf(),
350 config.model.clone(),
351 inner,
352 ))
353 }
354
355 async fn resume(
356 &self,
357 working_dir: &Path,
358 session_id: &str,
359 prompt: &str,
360 config: &SpawnConfig,
361 ) -> Result<AgentSession> {
362 let cost = Arc::new(std::sync::Mutex::new(AgentCost::default()));
363 let budget = config.budget_usd.unwrap_or(f64::MAX);
364
365 if budget <= 0.0 {
366 return Err(AgentError::BudgetExceeded {
367 limit: budget,
368 spent: 0.0,
369 });
370 }
371
372 let mut proc = ClaudeProcess::start_resume(
375 working_dir,
376 session_id,
377 prompt,
378 config,
379 self.api_key.as_deref(),
380 )
381 .await?;
382
383 let resumed_session_id = proc.session_id().to_string();
384 let response = proc.read_response(budget).await?;
385
386 {
387 let mut c = cost.lock().unwrap();
388 *c = response.cost.clone();
389 }
390
391 let inner = Arc::new(ClaudeSessionImpl {
392 process: Arc::new(Mutex::new(proc)),
393 cost: cost.clone(),
394 budget,
395 });
396
397 Ok(AgentSession::new(
398 resumed_session_id,
399 ExecutorType::ClaudeCode,
400 working_dir.to_path_buf(),
401 config.model.clone(),
402 inner,
403 ))
404 }
405
406 fn capabilities(&self) -> AgentCapabilities {
407 AgentCapabilities {
408 session_resume: true,
409 token_usage: true,
410 mcp_support: true,
411 autonomous_mode: true,
412 structured_output: false,
413 streaming: true,
414 hooks: true,
415 prompt_caching: true,
416 extended_thinking: true,
417 }
418 }
419
420 fn availability(&self) -> AvailabilityStatus {
421 if Self::check_cli_available() {
422 AvailabilityStatus {
423 available: true,
424 reason: None,
425 }
426 } else {
427 AvailabilityStatus {
428 available: false,
429 reason: Some(
430 "`claude` CLI not found. Install: npm install -g @anthropic-ai/claude-code"
431 .to_string(),
432 ),
433 }
434 }
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441
442 #[test]
443 fn executor_type_is_claude_code() {
444 let exec = ClaudeCodeExecutor::new();
445 assert_eq!(exec.executor_type(), ExecutorType::ClaudeCode);
446 }
447
448 #[test]
449 fn capabilities_declares_autonomous_mode() {
450 let exec = ClaudeCodeExecutor::new();
451 let caps = exec.capabilities();
452 assert!(caps.autonomous_mode);
453 assert!(caps.token_usage);
454 assert!(caps.mcp_support);
455 assert!(caps.session_resume, "Claude Code supports --resume flag");
456 }
457
458 #[tokio::test]
459 async fn budget_zero_returns_error_before_spawn() {
460 let exec = ClaudeCodeExecutor::new();
461 let result = exec
462 .spawn(
463 Path::new("/tmp"),
464 "test",
465 &SpawnConfig {
466 budget_usd: Some(0.0),
467 ..Default::default()
468 },
469 )
470 .await;
471 assert!(matches!(result, Err(AgentError::BudgetExceeded { .. })));
472 }
473}