1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::process::Stdio;
5
6use anyhow::Context;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::task::JoinHandle;
12
13use tokio_stream::Stream;
14
15use crate::ai::error::AiError;
16use crate::ai::model::Model;
17use crate::ai::provider::AiProvider;
18use crate::ai::tweaks::ModelTweaks;
19use crate::ai::types::*;
20use crate::settings::config::ToolCallStyle;
21
/// AI provider that talks to Claude by running the Claude Code CLI as a child process.
///
/// The struct only stores launch configuration; a new process is spawned for each
/// request by the methods of this type.
#[derive(Clone)]
pub struct ClaudeCodeProvider {
    /// Executable that is spawned for every request.
    command: PathBuf,
    /// Extra command-line arguments appended after the built-in CLI flags.
    additional_args: Vec<String>,
    /// Environment variables set on the child process after the built-in ones.
    env: HashMap<String, String>,
}
29
30impl ClaudeCodeProvider {
31 fn default_model_mappings() -> HashMap<Model, String> {
33 let mut mappings = HashMap::new();
34 mappings.insert(
35 Model::ClaudeSonnet45,
36 "claude-sonnet-4-5-20250929".to_string(),
37 );
38 mappings.insert(
39 Model::ClaudeHaiku45,
40 "claude-haiku-4-5-20251001".to_string(),
41 );
42 mappings.insert(Model::ClaudeOpus46, "claude-opus-4-6".to_string());
43 mappings.insert(Model::ClaudeOpus45, "claude-opus-4-5-20251101".to_string());
44 mappings
45 }
46
47 pub fn new(
48 command: PathBuf,
49 additional_args: Vec<String>,
50 env: HashMap<String, String>,
51 ) -> Self {
52 Self {
53 command,
54 additional_args,
55 env,
56 }
57 }
58
59 fn resolve_model(&self, requested: &Model) -> String {
60 Self::default_model_mappings()
61 .get(requested)
62 .cloned()
63 .unwrap_or_else(|| "claude-sonnet-4-5-20250929".to_string())
64 }
65
66 fn format_system_prompt(&self, user_prompt: &str) -> Option<String> {
67 let user_prompt = user_prompt.trim();
68 if user_prompt.is_empty() {
69 return None;
70 }
71 Some(user_prompt.to_string())
72 }
73
74 fn build_messages(&self, messages: &[Message]) -> Result<Vec<ClaudeMessage>, AiError> {
75 let mut converted = Vec::new();
76
77 for message in messages {
78 let role = match message.role {
79 MessageRole::User => "user",
80 MessageRole::Assistant => "assistant",
81 };
82
83 let mut content = Vec::new();
84 for block in message.content.blocks() {
85 match block {
86 ContentBlock::Text(text) => {
87 if !text.trim().is_empty() {
88 content.push(ClaudeContentBlock::Text {
89 text: text.trim().to_string(),
90 });
91 }
92 }
93 ContentBlock::ReasoningContent(reasoning) => {
94 if !reasoning.text.trim().is_empty() {
95 content.push(ClaudeContentBlock::Thinking {
96 text: reasoning.text.trim().to_string(),
97 });
98 }
99 }
100 ContentBlock::ToolUse(tool_use) => {
101 content.push(ClaudeContentBlock::ToolUse {
102 id: tool_use.id.clone(),
103 name: tool_use.name.clone(),
104 input: tool_use.arguments.clone(),
105 });
106 }
107 ContentBlock::ToolResult(tool_result) => {
108 if !tool_result.content.trim().is_empty() {
109 content.push(ClaudeContentBlock::ToolResult {
110 tool_use_id: tool_result.tool_use_id.clone(),
111 is_error: tool_result.is_error.then_some(true),
112 content: vec![ClaudeToolResultContent::OutputText {
113 text: tool_result.content.trim().to_string(),
114 }],
115 });
116 }
117 }
118 ContentBlock::Image(image) => {
119 content.push(ClaudeContentBlock::Image {
120 source: ClaudeImageSource {
121 source_type: "base64".to_string(),
122 media_type: image.media_type.clone(),
123 data: image.data.clone(),
124 },
125 });
126 }
127 }
128 }
129
130 if content.is_empty() {
131 continue;
132 }
133
134 converted.push(ClaudeMessage {
135 role: role.to_string(),
136 content,
137 });
138 }
139
140 Ok(converted)
141 }
142
143 fn build_tools(&self, tools: &[ToolDefinition]) -> Option<Vec<ClaudeToolDefinition>> {
144 if tools.is_empty() {
145 return None;
146 }
147
148 Some(
149 tools
150 .iter()
151 .map(|tool| ClaudeToolDefinition {
152 name: tool.name.clone(),
153 description: tool.description.clone(),
154 input_schema: tool.input_schema.clone(),
155 })
156 .collect(),
157 )
158 }
159
160 fn build_thinking(&self, reasoning_budget: &ReasoningBudget) -> Option<ClaudeThinking> {
161 reasoning_budget
162 .get_max_tokens()
163 .map(|budget_tokens| ClaudeThinking {
164 thinking_type: "enabled".to_string(),
165 budget_tokens,
166 })
167 }
168
    /// Runs one CLI invocation to completion and returns the collected response.
    ///
    /// The child process is started with the fixed flags below plus the configured
    /// extra arguments and environment, receives a JSON request on stdin, and its
    /// stdout is read line by line as JSON events that are folded into a
    /// [`StreamState`]. Stderr is collected on a separate task.
    ///
    /// # Parameters
    /// * `messages` - already converted conversation messages.
    /// * `model` - model identifier passed via `--model` and in the request body.
    /// * `system_prompt` - passed via `--system-prompt` unless empty.
    /// * `thinking_budget` - when set, exported as `MAX_THINKING_TOKENS` and added
    ///   to the request body as `thinking`.
    /// * `tools` - optional tool definitions added to the request body.
    /// * `max_tokens` - optional `max_tokens` value for the request body.
    ///
    /// # Errors
    /// * `AiError::Terminal` for spawn, pipe, serialization, stdin, event parsing
    ///   and event handling failures, and for a non-zero exit status.
    /// * `AiError::Retryable` when reading stdout or waiting for the child fails.
    /// * `AiError::InputTooLong` when the child exits unsuccessfully and the error
    ///   message contains "too long".
    async fn invoke_cli(
        &self,
        messages: &[ClaudeMessage],
        model: &str,
        system_prompt: &str,
        thinking_budget: Option<ClaudeThinking>,
        tools: Option<Vec<ClaudeToolDefinition>>,
        max_tokens: Option<u32>,
    ) -> Result<(Vec<ContentBlock>, TokenUsage, StopReason), AiError> {
        let mut command = Command::new(&self.command);
        command.arg("chat").arg("--print").arg("--model").arg(model);

        if !system_prompt.is_empty() {
            command.arg("--system-prompt").arg(system_prompt);
        }

        // Fixed flags: JSON event stream on stdout, a single turn, and the CLI's
        // listed built-in tools disallowed.
        command
            .arg("--output-format")
            .arg("stream-json")
            .arg("--verbose")
            .arg("--max-turns")
            .arg("1")
            .arg("--disallowed-tools")
            .arg("Bash,Edit,Read,WebSearch,Grep,Glob,Task,Write,NotebookEdit,WebFetch,BashOutput,KillShell,Skill,SlashCommand,TodoWrite,EnterPlanMode,ExitPlanMode,AskUserQuestion,TaskOutput");

        command.env("NO_COLOR", "1");

        if let Some(thinking) = thinking_budget.as_ref() {
            command.env("MAX_THINKING_TOKENS", thinking.budget_tokens.to_string());
        }

        // User-configured arguments and environment come after the built-in ones.
        for arg in &self.additional_args {
            command.arg(arg);
        }

        for (key, value) in &self.env {
            command.env(key, value);
        }

        // All three standard streams are piped; the child is killed if the handle
        // is dropped (for example on an early error return).
        let mut child = command
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .map_err(|err| {
                AiError::Terminal(anyhow::anyhow!(
                    "Failed to spawn Claude Code CLI '{}': {err}",
                    self.command.display()
                ))
            })?;

        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| AiError::Terminal(anyhow::anyhow!("Claude CLI stdin is unavailable")))?;

        // Request body: mandatory keys first, optional keys only when present.
        let mut request = serde_json::json!({
            "model": model,
            "messages": messages,
        });

        if let Some(max_tokens) = max_tokens {
            request["max_tokens"] = serde_json::json!(max_tokens);
        }

        if let Some(tools) = tools {
            request["tools"] = serde_json::json!(tools);
        }

        if let Some(thinking) = &thinking_budget {
            request["thinking"] = serde_json::json!(thinking);
        }

        let request_json = serde_json::to_vec(&request).map_err(|err| {
            AiError::Terminal(anyhow::anyhow!("Failed to serialize request: {err}"))
        })?;

        tracing::debug!(
            "Sending to Claude CLI: {}",
            String::from_utf8_lossy(&request_json)
        );

        stdin.write_all(&request_json).await.map_err(|err| {
            AiError::Terminal(anyhow::anyhow!("Failed to write to Claude stdin: {err}"))
        })?;

        stdin.flush().await.map_err(|err| {
            AiError::Terminal(anyhow::anyhow!("Failed to flush Claude stdin: {err}"))
        })?;
        // Dropping the handle closes the pipe so the child sees end of input.
        drop(stdin);

        let stdout = child.stdout.take().ok_or_else(|| {
            AiError::Terminal(anyhow::anyhow!("Claude CLI stdout is unavailable"))
        })?;
        let stderr = child.stderr.take().ok_or_else(|| {
            AiError::Terminal(anyhow::anyhow!("Claude CLI stderr is unavailable"))
        })?;

        let mut stdout_reader = BufReader::new(stdout).lines();
        // Stderr is read to the end on its own task while stdout is processed below.
        let stderr_handle: JoinHandle<Result<String, std::io::Error>> = tokio::spawn(async move {
            let mut buf = String::new();
            let mut reader = BufReader::new(stderr);
            reader.read_to_string(&mut buf).await?;
            Ok(buf)
        });

        let mut stream_state = StreamState::default();

        // One JSON event per non-blank stdout line.
        while let Some(line) = stdout_reader.next_line().await.map_err(|err| {
            AiError::Retryable(anyhow::anyhow!("Failed reading Claude CLI stdout: {err}"))
        })? {
            if line.trim().is_empty() {
                continue;
            }

            tracing::debug!("claude_cli_event" = line);

            let value: Value = serde_json::from_str(&line).map_err(|err| {
                AiError::Terminal(anyhow::anyhow!(
                    "Failed to parse Claude CLI output as JSON: {err}. Line: {line}"
                ))
            })?;

            let event = ParsedEvent::from_value(value).map_err(|err| {
                AiError::Terminal(anyhow::anyhow!(
                    "Failed to interpret Claude CLI event: {err}"
                ))
            })?;

            // `handle_event` returns true once a terminal event has been seen.
            if stream_state.handle_event(event).map_err(|err| {
                AiError::Terminal(anyhow::anyhow!(
                    "Error while processing Claude CLI event: {err}"
                ))
            })? {
                break;
            }
        }

        let status = child.wait().await.map_err(|err| {
            AiError::Retryable(anyhow::anyhow!("Failed waiting for Claude CLI: {err}"))
        })?;

        // Stderr is best-effort diagnostics: failures are logged and treated as empty.
        let stderr_output = match stderr_handle.await {
            Ok(Ok(text)) => text,
            Ok(Err(err)) => {
                tracing::warn!("Failed reading Claude CLI stderr: {err}");
                String::new()
            }
            Err(err) => {
                tracing::warn!("Failed awaiting Claude CLI stderr: {err}");
                String::new()
            }
        };

        if !status.success() {
            // Prefer the error text reported in the event stream over raw stderr.
            let error_message = stream_state
                .error_message
                .as_deref()
                .unwrap_or_else(|| stderr_output.trim());

            if error_message.contains("too long") {
                return Err(AiError::InputTooLong(anyhow::anyhow!(
                    "Claude CLI error: {}",
                    error_message
                )));
            }

            return Err(AiError::Terminal(anyhow::anyhow!(
                "Claude CLI error: {}",
                error_message
            )));
        }

        let (content_blocks, usage, stop_reason) =
            stream_state.finish().map_err(AiError::Terminal)?;
        tracing::info!(?usage, ?stop_reason, "Claude CLI completed");

        Ok((content_blocks, usage, stop_reason))
    }
354}
355
356#[async_trait::async_trait]
357impl AiProvider for ClaudeCodeProvider {
358 fn name(&self) -> &'static str {
359 "ClaudeCode"
360 }
361
362 fn supported_models(&self) -> HashSet<Model> {
363 HashSet::from([
364 Model::ClaudeSonnet45,
365 Model::ClaudeOpus46,
366 Model::ClaudeOpus45,
367 Model::ClaudeHaiku45,
368 ])
369 }
370
371 async fn converse(
372 &self,
373 request: ConversationRequest,
374 ) -> Result<ConversationResponse, AiError> {
375 let model_id = self.resolve_model(&request.model.model);
376 let messages = self.build_messages(&request.messages)?;
377 let system_prompt = self
378 .format_system_prompt(&request.system_prompt)
379 .unwrap_or_else(|| String::new());
380 let thinking_budget = self.build_thinking(&request.model.reasoning_budget);
381 let tools = self.build_tools(&request.tools);
382
383 let (content_blocks, usage, stop_reason) = self
384 .invoke_cli(
385 &messages,
386 &model_id,
387 &system_prompt,
388 thinking_budget,
389 tools,
390 request.model.max_tokens,
391 )
392 .await?;
393
394 Ok(ConversationResponse {
395 content: Content::from(content_blocks),
396 usage,
397 stop_reason,
398 })
399 }
400
401 async fn converse_stream(
402 &self,
403 request: ConversationRequest,
404 ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>, AiError> {
405 let model_id = self.resolve_model(&request.model.model);
406 let messages = self.build_messages(&request.messages)?;
407 let system_prompt = self
408 .format_system_prompt(&request.system_prompt)
409 .unwrap_or_else(|| String::new());
410 let thinking_budget = self.build_thinking(&request.model.reasoning_budget);
411 let tools = self.build_tools(&request.tools);
412 let max_tokens = request.model.max_tokens;
413
414 let command_path = self.command.clone();
415 let additional_args = self.additional_args.clone();
416 let env = self.env.clone();
417
418 let stream = async_stream::stream! {
419 let mut command = Command::new(&command_path);
420 command.arg("chat").arg("--print").arg("--model").arg(&model_id);
421
422 if !system_prompt.is_empty() {
423 command.arg("--system-prompt").arg(&system_prompt);
424 }
425
426 command
427 .arg("--output-format")
428 .arg("stream-json")
429 .arg("--verbose")
430 .arg("--max-turns")
431 .arg("1")
432 .arg("--disallowed-tools")
433 .arg("Bash,Edit,Read,WebSearch,Grep,Glob,Task,Write,NotebookEdit,WebFetch,BashOutput,KillShell,Skill,SlashCommand,TodoWrite,EnterPlanMode,ExitPlanMode,AskUserQuestion,TaskOutput");
434
435 command.env("NO_COLOR", "1");
436
437 if let Some(thinking) = thinking_budget.as_ref() {
438 command.env("MAX_THINKING_TOKENS", thinking.budget_tokens.to_string());
439 }
440
441 for arg in &additional_args {
442 command.arg(arg);
443 }
444
445 for (key, value) in &env {
446 command.env(key, value);
447 }
448
449 let mut child = match command
450 .stdin(Stdio::piped())
451 .stdout(Stdio::piped())
452 .stderr(Stdio::piped())
453 .kill_on_drop(true)
454 .spawn()
455 {
456 Ok(c) => c,
457 Err(err) => {
458 yield Err(AiError::Terminal(anyhow::anyhow!(
459 "Failed to spawn Claude Code CLI: {err}"
460 )));
461 return;
462 }
463 };
464
465 let mut stdin = match child.stdin.take() {
466 Some(s) => s,
467 None => {
468 yield Err(AiError::Terminal(anyhow::anyhow!("Claude CLI stdin unavailable")));
469 return;
470 }
471 };
472
473 let mut cli_request = serde_json::json!({
474 "model": model_id,
475 "messages": messages,
476 });
477
478 if let Some(mt) = max_tokens {
479 cli_request["max_tokens"] = serde_json::json!(mt);
480 }
481
482 if let Some(t) = tools {
483 cli_request["tools"] = serde_json::json!(t);
484 }
485
486 if let Some(thinking) = &thinking_budget {
487 cli_request["thinking"] = serde_json::json!(thinking);
488 }
489
490 let request_json = match serde_json::to_vec(&cli_request) {
491 Ok(j) => j,
492 Err(err) => {
493 yield Err(AiError::Terminal(anyhow::anyhow!("Failed to serialize request: {err}")));
494 return;
495 }
496 };
497
498 if let Err(err) = stdin.write_all(&request_json).await {
499 yield Err(AiError::Terminal(anyhow::anyhow!("Failed to write to stdin: {err}")));
500 return;
501 }
502 if let Err(err) = stdin.flush().await {
503 yield Err(AiError::Terminal(anyhow::anyhow!("Failed to flush stdin: {err}")));
504 return;
505 }
506 drop(stdin);
507
508 let stdout = match child.stdout.take() {
509 Some(s) => s,
510 None => {
511 yield Err(AiError::Terminal(anyhow::anyhow!("Claude CLI stdout unavailable")));
512 return;
513 }
514 };
515
516 let stderr = match child.stderr.take() {
517 Some(s) => s,
518 None => {
519 yield Err(AiError::Terminal(anyhow::anyhow!("Claude CLI stderr unavailable")));
520 return;
521 }
522 };
523
524 let mut stdout_reader = BufReader::new(stdout).lines();
525 let stderr_handle: JoinHandle<Result<String, std::io::Error>> = tokio::spawn(async move {
526 let mut buf = String::new();
527 let mut reader = BufReader::new(stderr);
528 reader.read_to_string(&mut buf).await?;
529 Ok(buf)
530 });
531
532 let mut stream_state = StreamState::default();
533
534 loop {
535 let line = match stdout_reader.next_line().await {
536 Ok(Some(line)) => line,
537 Ok(None) => break,
538 Err(err) => {
539 yield Err(AiError::Retryable(anyhow::anyhow!(
540 "Failed reading Claude CLI stdout: {err}"
541 )));
542 return;
543 }
544 };
545
546 if line.trim().is_empty() {
547 continue;
548 }
549
550 let value: Value = match serde_json::from_str(&line) {
551 Ok(v) => v,
552 Err(err) => {
553 yield Err(AiError::Terminal(anyhow::anyhow!(
554 "Failed to parse Claude CLI output: {err}. Line: {line}"
555 )));
556 return;
557 }
558 };
559
560 let event = match ParsedEvent::from_value(value) {
561 Ok(e) => e,
562 Err(err) => {
563 yield Err(AiError::Terminal(anyhow::anyhow!(
564 "Failed to interpret Claude CLI event: {err}"
565 )));
566 return;
567 }
568 };
569
570 for stream_event in stream_events_from_parsed(&event) {
571 yield Ok(stream_event);
572 }
573
574 let should_stop = match stream_state.handle_event(event) {
575 Ok(stop) => stop,
576 Err(err) => {
577 yield Err(AiError::Terminal(anyhow::anyhow!(
578 "Error processing Claude CLI event: {err}"
579 )));
580 return;
581 }
582 };
583
584 if should_stop {
585 break;
586 }
587 }
588
589 let status = match child.wait().await {
590 Ok(s) => s,
591 Err(err) => {
592 yield Err(AiError::Retryable(anyhow::anyhow!(
593 "Failed waiting for Claude CLI: {err}"
594 )));
595 return;
596 }
597 };
598
599 let stderr_output = match stderr_handle.await {
600 Ok(Ok(text)) => text,
601 _ => String::new(),
602 };
603
604 if !status.success() {
605 let error_message = stream_state
606 .error_message
607 .as_deref()
608 .unwrap_or_else(|| stderr_output.trim());
609
610 yield Err(AiError::Terminal(anyhow::anyhow!(
611 "Claude CLI error: {}",
612 error_message
613 )));
614 return;
615 }
616
617 let (content_blocks, usage, stop_reason) = match stream_state.finish() {
618 Ok(result) => result,
619 Err(err) => {
620 yield Err(AiError::Terminal(err));
621 return;
622 }
623 };
624
625 yield Ok(StreamEvent::MessageComplete {
626 response: ConversationResponse {
627 content: Content::from(content_blocks),
628 usage,
629 stop_reason,
630 },
631 });
632 };
633
634 Ok(Box::pin(stream))
635 }
636
637 fn get_cost(&self, model: &Model) -> Cost {
638 match model {
639 Model::ClaudeOpus46 => Cost::new(5.0, 25.0, 6.25, 0.5),
640 Model::ClaudeOpus45 => Cost::new(5.0, 25.0, 6.25, 0.5),
641 Model::ClaudeSonnet45 => Cost::new(3.0, 15.0, 3.75, 0.3),
642 Model::ClaudeHaiku45 => Cost::new(1.0, 5.0, 1.25, 0.1),
643 _ => Cost::new(0.0, 0.0, 0.0, 0.0),
644 }
645 }
646
647 fn tweaks(&self) -> ModelTweaks {
648 ModelTweaks {
649 tool_call_style: Some(ToolCallStyle::Xml),
650 ..Default::default()
651 }
652 }
653}
654
/// One conversation message as serialized into the CLI request.
#[derive(Debug, Serialize)]
struct ClaudeMessage {
    /// Speaker of the message; the provider writes `"user"` or `"assistant"`.
    role: String,
    /// Content blocks of the message, in order.
    content: Vec<ClaudeContentBlock>,
}
660
/// A single content block inside a [`ClaudeMessage`].
///
/// Serialized as a JSON object whose `type` field carries the variant name given
/// in the `rename` attributes below.
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
enum ClaudeContentBlock {
    /// Plain text.
    #[serde(rename = "text")]
    Text { text: String },
    /// Reasoning text; note that it is serialized under the key `text`.
    #[serde(rename = "thinking")]
    Thinking { text: String },
    /// A tool invocation with its identifier, tool name and JSON arguments.
    #[serde(rename = "tool_use")]
    ToolUse {
        id: String,
        name: String,
        input: Value,
    },
    /// The outcome of a tool invocation, linked to it by `tool_use_id`.
    #[serde(rename = "tool_result")]
    ToolResult {
        tool_use_id: String,
        content: Vec<ClaudeToolResultContent>,
        /// Omitted from the JSON entirely when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        is_error: Option<bool>,
    },
    /// An image attachment.
    #[serde(rename = "image")]
    Image { source: ClaudeImageSource },
}
684
/// Content item inside a tool result, tagged by a `type` field when serialized.
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
enum ClaudeToolResultContent {
    /// Textual tool output, serialized with `type` set to `output_text`.
    #[serde(rename = "output_text")]
    OutputText { text: String },
}
691
/// Payload of an image content block.
#[derive(Debug, Serialize)]
struct ClaudeImageSource {
    /// Serialized as `type`; the provider always writes `"base64"`.
    #[serde(rename = "type")]
    source_type: String,
    /// Media type copied from the source image block.
    media_type: String,
    /// Image data copied from the source image block.
    data: String,
}
699
/// Tool description as serialized into the `tools` array of the request.
#[derive(Debug, Serialize)]
struct ClaudeToolDefinition {
    /// Tool name.
    name: String,
    /// Human-readable description of the tool.
    description: String,
    /// JSON value describing the tool's input.
    input_schema: Value,
}
706
/// The `thinking` section of the request.
#[derive(Debug, Serialize)]
struct ClaudeThinking {
    /// Serialized as `type`; the provider always writes `"enabled"`.
    #[serde(rename = "type")]
    thinking_type: String,
    /// Token budget; also exported to the CLI as `MAX_THINKING_TOKENS`.
    budget_tokens: u32,
}
713
/// Accumulator that folds the CLI's event stream into a final response.
#[derive(Default)]
struct StreamState {
    /// Completed content blocks, in the order they were finished.
    content_blocks: Vec<ContentBlock>,
    /// Blocks that have started but not yet stopped, keyed by block index.
    pending_blocks: HashMap<usize, PendingBlock>,
    /// Sum of all usage reports seen so far, if any.
    usage: Option<TokenUsage>,
    /// Most recently determined stop reason, if any.
    stop_reason: Option<StopReason>,
    /// Stop sequence from the last `message_delta` that carried a stop reason.
    pending_stop_sequence: Option<String>,
    /// Error text taken from a `result` event flagged as an error.
    error_message: Option<String>,
}
723
724impl StreamState {
725 fn merge_usage(&mut self, new_usage: TokenUsage) {
726 if let Some(existing) = &mut self.usage {
727 existing.input_tokens += new_usage.input_tokens;
728 existing.output_tokens += new_usage.output_tokens;
729 existing.total_tokens += new_usage.total_tokens;
730 if let Some(new_cached) = new_usage.cached_prompt_tokens {
731 existing.cached_prompt_tokens =
732 Some(existing.cached_prompt_tokens.unwrap_or(0) + new_cached);
733 }
734 if let Some(new_cache_creation) = new_usage.cache_creation_input_tokens {
735 existing.cache_creation_input_tokens =
736 Some(existing.cache_creation_input_tokens.unwrap_or(0) + new_cache_creation);
737 }
738 if let Some(new_reasoning) = new_usage.reasoning_tokens {
739 existing.reasoning_tokens =
740 Some(existing.reasoning_tokens.unwrap_or(0) + new_reasoning);
741 }
742 } else {
743 self.usage = Some(new_usage);
744 }
745 }
746
    /// Applies one parsed CLI event to the accumulated state.
    ///
    /// Returns `Ok(true)` when the event ends the stream (`message_stop` or
    /// `result`) and `Ok(false)` otherwise.
    ///
    /// # Errors
    /// Fails when the CLI sends an `error` event, or when the buffered input JSON
    /// of a finished tool-use block cannot be parsed.
    // NOTE(review): usage from message_start, message_delta, message_stop,
    // assistant and result events is summed; if any of those reports is already
    // cumulative the total would be over-counted - confirm against the CLI output.
    fn handle_event(&mut self, event: ParsedEvent) -> Result<bool, anyhow::Error> {
        match event {
            // A complete assistant message: its blocks are appended directly,
            // without going through the pending-block buffers.
            ParsedEvent::Assistant(data) => {
                for block in data.message.content {
                    match block {
                        AssistantContentBlock::Text { text } => {
                            // Whitespace-only text is dropped; kept text is not trimmed.
                            if !text.trim().is_empty() {
                                self.content_blocks.push(ContentBlock::Text(text));
                            }
                        }
                        AssistantContentBlock::Thinking { text } => {
                            if !text.trim().is_empty() {
                                self.content_blocks.push(ContentBlock::ReasoningContent(
                                    ReasoningData {
                                        text,
                                        signature: None,
                                        blob: None,
                                        raw_json: None,
                                    },
                                ));
                            }
                        }
                        AssistantContentBlock::ToolUse { id, name, input } => {
                            self.content_blocks.push(ContentBlock::ToolUse(ToolUseData {
                                id,
                                name,
                                arguments: input,
                            }));
                            // A tool call implies a tool-use stop unless a stop
                            // reason has already been recorded.
                            if self.stop_reason.is_none() {
                                self.stop_reason = Some(StopReason::ToolUse);
                            }
                        }
                    }
                }

                if let Some(usage) = data.message.usage {
                    self.merge_usage(usage.into());
                }

                // An explicit stop reason on the message overrides any earlier one.
                if let Some(stop_reason) = data.message.stop_reason {
                    self.stop_reason =
                        Some(map_stop_reason(&stop_reason, data.message.stop_sequence));
                }
            }
            ParsedEvent::MessageStart(data) => {
                if let Some(usage) = data.message.and_then(|m| m.usage) {
                    self.merge_usage(usage.into());
                }
            }
            ParsedEvent::MessageDelta(delta) => {
                if let Some(usage) = delta.delta.usage {
                    self.merge_usage(usage.into());
                }
                if let Some(stop_reason) = delta.delta.stop_reason {
                    let stop_sequence = delta.delta.stop_sequence;
                    self.stop_reason = Some(map_stop_reason(&stop_reason, stop_sequence.clone()));
                    // Remembered so `finish` can fall back to it.
                    self.pending_stop_sequence = stop_sequence;
                }
            }
            ParsedEvent::MessageStop(data) => {
                // Top-level fields win; the nested `message` object is the fallback.
                if let Some(usage) = data
                    .usage
                    .or_else(|| data.message.as_ref().and_then(|m| m.usage.clone()))
                {
                    self.merge_usage(usage.into());
                }

                if let Some(reason) = data
                    .stop_reason
                    .or_else(|| data.message.as_ref().and_then(|m| m.stop_reason.clone()))
                {
                    self.stop_reason = Some(map_stop_reason(
                        &reason,
                        data.stop_sequence.or_else(|| {
                            data.message.as_ref().and_then(|m| m.stop_sequence.clone())
                        }),
                    ));
                }

                // End of message: tell the caller to stop reading.
                return Ok(true);
            }
            // Opens an empty buffer for the block at `index`.
            ParsedEvent::ContentBlockStart(start) => {
                let index = start.index;
                match start.content_block.r#type.as_str() {
                    "text" => {
                        self.pending_blocks
                            .insert(index, PendingBlock::Text(String::new()));
                    }
                    "thinking" | "reasoning" => {
                        self.pending_blocks
                            .insert(index, PendingBlock::Thinking(String::new()));
                    }
                    "tool_use" => {
                        // A missing id is replaced by a fresh UUID, a missing name
                        // by a placeholder.
                        let id = start
                            .content_block
                            .id
                            .clone()
                            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
                        let name = start
                            .content_block
                            .name
                            .clone()
                            .unwrap_or_else(|| "unknown_tool".to_string());

                        let buffer = String::new();

                        self.pending_blocks
                            .insert(index, PendingBlock::ToolUse { id, name, buffer });
                    }
                    other => {
                        tracing::warn!("Unsupported content block type from Claude CLI: {other}");
                    }
                }
            }
            // Appends a fragment to the matching open block; deltas for unknown
            // indices are ignored.
            ParsedEvent::ContentBlockDelta(delta) => {
                if let Some(entry) = self.pending_blocks.get_mut(&delta.index) {
                    match (entry, delta.delta) {
                        (PendingBlock::Text(buffer), BlockDelta::TextDelta { text }) => {
                            buffer.push_str(&text);
                        }
                        (PendingBlock::Thinking(buffer), BlockDelta::ThinkingDelta { text }) => {
                            buffer.push_str(&text);
                        }
                        (
                            PendingBlock::ToolUse { buffer, .. },
                            BlockDelta::InputJsonDelta { partial_json },
                        ) => {
                            buffer.push_str(&partial_json);
                        }
                        (entry, BlockDelta::Other(value)) => {
                            tracing::warn!("Unhandled content_block_delta: {:?}", value);
                            // NOTE(review): the unrecognized delta is re-serialized and
                            // appended to the tool-use input buffer; this looks like it
                            // could make the buffer unparseable - confirm intent.
                            if let PendingBlock::ToolUse { buffer, .. } = entry {
                                if let Ok(fragment) = serde_json::to_string(&value) {
                                    buffer.push_str(&fragment);
                                }
                            }
                        }
                        // Delta kind does not match the kind of the open block.
                        _ => {
                            tracing::warn!("Ignoring unsupported delta for block");
                        }
                    }
                }
            }
            // Closes the block at `index` and moves it into `content_blocks`.
            ParsedEvent::ContentBlockStop(stop) => {
                if let Some(entry) = self.pending_blocks.remove(&stop.index) {
                    match entry {
                        PendingBlock::Text(buffer) => {
                            if !buffer.trim().is_empty() {
                                self.content_blocks
                                    .push(ContentBlock::Text(buffer.trim().to_string()));
                            }
                        }
                        PendingBlock::Thinking(buffer) => {
                            if !buffer.trim().is_empty() {
                                self.content_blocks.push(ContentBlock::ReasoningContent(
                                    ReasoningData {
                                        text: buffer.trim().to_string(),
                                        signature: None,
                                        blob: None,
                                        raw_json: None,
                                    },
                                ));
                            }
                        }
                        PendingBlock::ToolUse { id, name, buffer } => {
                            // An empty buffer becomes JSON null; anything else must
                            // parse as JSON.
                            let arguments = if buffer.trim().is_empty() {
                                Value::Null
                            } else {
                                serde_json::from_str(&buffer).with_context(|| {
                                    format!("Failed to parse tool_use input JSON: {buffer}")
                                })?
                            };
                            self.content_blocks.push(ContentBlock::ToolUse(ToolUseData {
                                id,
                                name,
                                arguments,
                            }));
                            if self.stop_reason.is_none() {
                                self.stop_reason = Some(StopReason::ToolUse);
                            }
                        }
                    }
                }
            }
            ParsedEvent::Result(result) => {
                if let Some(usage) = result.usage {
                    self.merge_usage(usage.into());
                }
                // The error text is only recorded here; the caller reads it after
                // the process has exited.
                if result.is_error.unwrap_or(false) {
                    if let Some(error_msg) = result.result {
                        self.error_message = Some(error_msg);
                    }
                }
                return Ok(true);
            }
            ParsedEvent::Error(error) => {
                return Err(anyhow::anyhow!(
                    "Claude CLI reported error: {}",
                    error.message
                ));
            }
            ParsedEvent::Ping => {}
        }

        Ok(false)
    }
954
955 fn finish(mut self) -> Result<(Vec<ContentBlock>, TokenUsage, StopReason), anyhow::Error> {
956 if !self.pending_blocks.is_empty() {
957 for (_, entry) in self.pending_blocks.drain() {
958 match entry {
959 PendingBlock::Text(buffer) if !buffer.is_empty() => {
960 self.content_blocks
961 .push(ContentBlock::Text(buffer.trim().to_string()));
962 }
963 PendingBlock::Thinking(buffer) if !buffer.is_empty() => {
964 self.content_blocks
965 .push(ContentBlock::ReasoningContent(ReasoningData {
966 text: buffer.trim().to_string(),
967 signature: None,
968 blob: None,
969 raw_json: None,
970 }));
971 }
972 PendingBlock::ToolUse { .. } => {
973 tracing::warn!("Incomplete tool_use block from Claude CLI");
975 }
976 _ => {}
977 }
978 }
979 }
980
981 let usage = self.usage.unwrap_or_else(TokenUsage::empty);
982 let stop_reason = self.stop_reason.unwrap_or_else(|| {
983 if let Some(stop_sequence) = self.pending_stop_sequence {
984 StopReason::StopSequence(stop_sequence)
985 } else {
986 StopReason::EndTurn
987 }
988 });
989
990 Ok((self.content_blocks, usage, stop_reason))
991 }
992}
993
/// A CLI output event after classification by its event name.
#[derive(Debug)]
enum ParsedEvent {
    /// `message_start` event.
    MessageStart(MessageStartEvent),
    /// `message_delta` event.
    MessageDelta(MessageDeltaEvent),
    /// `message_stop` event.
    MessageStop(MessageStopEvent),
    /// `content_block_start` event.
    ContentBlockStart(ContentBlockStartEvent),
    /// `content_block_delta` event.
    ContentBlockDelta(ContentBlockDeltaEvent),
    /// `content_block_stop` event.
    ContentBlockStop(ContentBlockStopEvent),
    /// `assistant` event carrying a complete assistant message.
    Assistant(AssistantMessageEvent),
    /// `result` event.
    Result(ResultEvent),
    /// `error` event.
    Error(ClaudeErrorEvent),
    /// `ping` event; also used for `system` and unrecognized events, which carry
    /// nothing the accumulator needs.
    Ping,
}
1007
1008impl ParsedEvent {
1009 fn from_value(value: Value) -> Result<Self, anyhow::Error> {
1010 if let Some(event_type) = value.get("type").and_then(|v| v.as_str()) {
1011 match event_type {
1012 "event" => {
1013 let event_name = value
1014 .get("event")
1015 .and_then(|v| v.as_str())
1016 .unwrap_or("")
1017 .to_string();
1018 let data = value.get("data").cloned().unwrap_or(Value::Null);
1019 ParsedEvent::from_event_name(&event_name, data)
1020 }
1021 other => ParsedEvent::from_event_name(other, value.clone()),
1022 }
1023 } else {
1024 Err(anyhow::anyhow!("Claude CLI event missing 'type' field"))
1025 }
1026 }
1027
1028 fn from_event_name(event: &str, data: Value) -> Result<Self, anyhow::Error> {
1029 match event {
1030 "message_start" => Ok(ParsedEvent::MessageStart(serde_json::from_value(data)?)),
1031 "message_delta" => Ok(ParsedEvent::MessageDelta(serde_json::from_value(data)?)),
1032 "message_stop" => Ok(ParsedEvent::MessageStop(serde_json::from_value(data)?)),
1033 "content_block_start" => Ok(ParsedEvent::ContentBlockStart(serde_json::from_value(
1034 data,
1035 )?)),
1036 "content_block_delta" => Ok(ParsedEvent::ContentBlockDelta(serde_json::from_value(
1037 data,
1038 )?)),
1039 "content_block_stop" => {
1040 Ok(ParsedEvent::ContentBlockStop(serde_json::from_value(data)?))
1041 }
1042 "assistant" => Ok(ParsedEvent::Assistant(serde_json::from_value(data)?)),
1043 "result" => Ok(ParsedEvent::Result(serde_json::from_value(data)?)),
1044 "error" => Ok(ParsedEvent::Error(serde_json::from_value(data)?)),
1045 "ping" => Ok(ParsedEvent::Ping),
1046 "system" => Ok(ParsedEvent::Ping), other => {
1048 tracing::warn!("Unhandled Claude CLI event type: {other}");
1049 Ok(ParsedEvent::Ping)
1050 }
1051 }
1052 }
1053}
1054
/// Payload of a `message_start` event.
#[derive(Debug, Deserialize)]
struct MessageStartEvent {
    /// Message metadata, when present.
    message: Option<MessageStartData>,
}
1059
/// The `message` object of a `message_start` event; only usage is read.
#[derive(Debug, Deserialize)]
struct MessageStartData {
    /// Token usage reported at message start, when present.
    #[serde(default)]
    usage: Option<ClaudeUsage>,
}
1065
/// Payload of a `message_delta` event.
#[derive(Debug, Deserialize)]
struct MessageDeltaEvent {
    /// The changed message-level fields.
    delta: MessageDeltaData,
}
1070
/// The `delta` object of a `message_delta` event; every field is optional.
#[derive(Debug, Deserialize)]
struct MessageDeltaData {
    /// Token usage carried by the delta, when present.
    #[serde(default)]
    usage: Option<ClaudeUsage>,
    /// Raw stop reason string, when present.
    #[serde(default)]
    stop_reason: Option<String>,
    /// Stop sequence accompanying the stop reason, when present.
    #[serde(default)]
    stop_sequence: Option<String>,
}
1080
/// Payload of a `message_stop` event.
///
/// Usage and stop information may sit at the top level or inside `message`; the
/// event handler prefers the top-level values.
#[derive(Debug, Deserialize)]
struct MessageStopEvent {
    /// Nested message object used as a fallback source for the fields below.
    #[serde(default)]
    message: Option<MessageStopData>,
    /// Token usage, when present at the top level.
    #[serde(default)]
    usage: Option<ClaudeUsage>,
    /// Raw stop reason string, when present at the top level.
    #[serde(default)]
    stop_reason: Option<String>,
    /// Stop sequence, when present at the top level.
    #[serde(default)]
    stop_sequence: Option<String>,
}
1092
/// The nested `message` object of a `message_stop` event; every field is optional.
#[derive(Debug, Deserialize)]
struct MessageStopData {
    /// Token usage, when present.
    #[serde(default)]
    usage: Option<ClaudeUsage>,
    /// Raw stop reason string, when present.
    #[serde(default)]
    stop_reason: Option<String>,
    /// Stop sequence, when present.
    #[serde(default)]
    stop_sequence: Option<String>,
}
1102
/// Payload of an `assistant` event.
#[derive(Debug, Deserialize)]
struct AssistantMessageEvent {
    /// The complete assistant message.
    message: AssistantMessage,
}
1107
/// A complete assistant message as delivered by an `assistant` event.
#[derive(Debug, Deserialize)]
struct AssistantMessage {
    /// Content blocks of the message; required.
    content: Vec<AssistantContentBlock>,
    /// Token usage, when present.
    #[serde(default)]
    usage: Option<ClaudeUsage>,
    /// Raw stop reason string, when present.
    #[serde(default)]
    stop_reason: Option<String>,
    /// Stop sequence, when present.
    #[serde(default)]
    stop_sequence: Option<String>,
}
1118
/// A content block inside an [`AssistantMessage`], selected by its `type` field.
///
/// Any `type` other than the three listed here makes deserialization fail.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum AssistantContentBlock {
    /// Plain text; a missing `text` key yields an empty string.
    #[serde(rename = "text")]
    Text {
        #[serde(default)]
        text: String,
    },
    /// Reasoning text, read from the JSON key `thinking`; a missing key yields an
    /// empty string.
    #[serde(rename = "thinking")]
    Thinking {
        #[serde(default, rename = "thinking")]
        text: String,
    },
    /// A tool invocation; all three fields are required.
    #[serde(rename = "tool_use")]
    ToolUse {
        id: String,
        name: String,
        input: Value,
    },
}
1139
/// Payload of a `content_block_start` event.
#[derive(Debug, Deserialize)]
struct ContentBlockStartEvent {
    /// Index used to match later delta and stop events to this block.
    index: usize,
    /// Description of the block being opened.
    content_block: ContentBlockPayload,
}
1145
/// The `content_block` object of a `content_block_start` event.
#[derive(Debug, Deserialize, Clone)]
struct ContentBlockPayload {
    /// Block type string, e.g. `text`, `thinking` or `tool_use`.
    #[serde(rename = "type")]
    r#type: String,
    /// Tool-use identifier, when present.
    #[serde(default)]
    id: Option<String>,
    /// Tool name, when present.
    #[serde(default)]
    name: Option<String>,
}
1155
/// Payload of a `content_block_delta` event.
#[derive(Debug, Deserialize)]
struct ContentBlockDeltaEvent {
    /// Index of the open block this fragment belongs to.
    index: usize,
    /// The fragment itself.
    delta: BlockDelta,
}
1161
/// An incremental fragment of a content block.
///
/// Deserialized by a hand-written `Deserialize` impl that dispatches on the
/// delta's `type` field.
#[derive(Debug)]
enum BlockDelta {
    /// Fragment of text content.
    TextDelta { text: String },
    /// Fragment of reasoning content.
    ThinkingDelta { text: String },
    /// Fragment of a tool call's JSON input.
    InputJsonDelta { partial_json: String },
    /// Any delta with an unrecognized `type`, kept as raw JSON.
    Other(Value),
}
1169
1170impl<'de> Deserialize<'de> for BlockDelta {
1171 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1172 where
1173 D: serde::Deserializer<'de>,
1174 {
1175 let value = Value::deserialize(deserializer)?;
1176 let delta_type = value.get("type").and_then(Value::as_str).unwrap_or("");
1177
1178 match delta_type {
1179 "text_delta" => {
1180 let text = value
1181 .get("text")
1182 .and_then(Value::as_str)
1183 .unwrap_or_default()
1184 .to_string();
1185 Ok(BlockDelta::TextDelta { text })
1186 }
1187 "thinking_delta" | "reasoning_delta" => {
1188 let text = value
1189 .get("text")
1190 .and_then(Value::as_str)
1191 .unwrap_or_default()
1192 .to_string();
1193 Ok(BlockDelta::ThinkingDelta { text })
1194 }
1195 "input_json_delta" | "output_json_delta" => {
1196 let partial_json = value
1197 .get("partial_json")
1198 .and_then(Value::as_str)
1199 .unwrap_or_default()
1200 .to_string();
1201 Ok(BlockDelta::InputJsonDelta { partial_json })
1202 }
1203 _ => Ok(BlockDelta::Other(value)),
1204 }
1205 }
1206}
1207
1208#[derive(Debug, Deserialize)]
1209struct ContentBlockStopEvent {
1210 index: usize,
1211}
1212
1213#[derive(Debug, Deserialize)]
1214struct ResultEvent {
1215 #[serde(default)]
1216 usage: Option<ClaudeUsage>,
1217 #[serde(default)]
1218 is_error: Option<bool>,
1219 #[serde(default)]
1220 result: Option<String>,
1221}
1222
1223#[derive(Debug, Deserialize)]
1224struct ClaudeErrorEvent {
1225 message: String,
1226}
1227
1228#[derive(Debug, Deserialize, Default, Clone)]
1229struct ClaudeUsage {
1230 #[serde(default)]
1231 input_tokens: u32,
1232 #[serde(default)]
1233 output_tokens: u32,
1234 #[serde(default)]
1235 cache_read_input_tokens: Option<u32>,
1236 #[serde(default)]
1237 cache_creation_input_tokens: Option<u32>,
1238 #[serde(default, alias = "thinking_tokens")]
1239 reasoning_tokens: Option<u32>,
1240}
1241
1242impl From<ClaudeUsage> for TokenUsage {
1243 fn from(usage: ClaudeUsage) -> Self {
1244 let reasoning = usage.reasoning_tokens;
1245 TokenUsage {
1246 input_tokens: usage.input_tokens,
1247 output_tokens: usage.output_tokens,
1248 total_tokens: usage.input_tokens + usage.output_tokens + reasoning.unwrap_or(0),
1249 cached_prompt_tokens: usage.cache_read_input_tokens,
1250 cache_creation_input_tokens: usage.cache_creation_input_tokens,
1251 reasoning_tokens: reasoning,
1252 }
1253 }
1254}
1255
/// A content block held in one of three shapes.
///
/// NOTE(review): presumably the state accumulated while a block is still being
/// streamed; the code that fills it is outside this section — confirm.
enum PendingBlock {
    /// Text content.
    Text(String),
    /// Thinking/reasoning content.
    Thinking(String),
    /// A tool invocation identified by `id` and `name`, with a string
    /// `buffer` (presumably the JSON input gathered so far — confirm).
    ToolUse {
        id: String,
        name: String,
        buffer: String,
    },
}
1265
1266fn stream_events_from_parsed(event: &ParsedEvent) -> Vec<StreamEvent> {
1267 match event {
1268 ParsedEvent::ContentBlockStart(_) => vec![StreamEvent::ContentBlockStart],
1269 ParsedEvent::ContentBlockStop(_) => vec![StreamEvent::ContentBlockStop],
1270 ParsedEvent::ContentBlockDelta(delta) => stream_events_from_delta(&delta.delta),
1271 _ => vec![],
1272 }
1273}
1274
1275fn stream_events_from_delta(delta: &BlockDelta) -> Vec<StreamEvent> {
1276 match delta {
1277 BlockDelta::TextDelta { text } if !text.is_empty() => {
1278 vec![StreamEvent::TextDelta { text: text.clone() }]
1279 }
1280 BlockDelta::ThinkingDelta { text } if !text.is_empty() => {
1281 vec![StreamEvent::ReasoningDelta { text: text.clone() }]
1282 }
1283 _ => vec![],
1284 }
1285}
1286
1287fn map_stop_reason(reason: &str, stop_sequence: Option<String>) -> StopReason {
1288 match reason {
1289 "end_turn" => StopReason::EndTurn,
1290 "max_tokens" => StopReason::MaxTokens,
1291 "stop_sequence" => {
1292 let seq = stop_sequence.unwrap_or_else(|| "".to_string());
1293 StopReason::StopSequence(seq)
1294 }
1295 "tool_use" => StopReason::ToolUse,
1296 other => {
1297 tracing::warn!("Unknown Claude stop_reason: {other}");
1298 StopReason::EndTurn
1299 }
1300 }
1301}
1302
#[cfg(test)]
mod tests {
    //! Integration tests that drive the shared provider test-suite through a
    //! locally installed `claude` executable. All of them are `#[ignore]`d
    //! because they need that CLI to be available.

    use tracing::debug;

    use super::*;
    use crate::ai::tests::{
        test_hello_world, test_reasoning_conversation, test_reasoning_with_tools, test_tool_usage,
    };
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a provider that invokes `claude` with no extra arguments and no
    /// extra environment variables.
    async fn create_claude_provider() -> anyhow::Result<ClaudeCodeProvider> {
        let provider = ClaudeCodeProvider::new(PathBuf::from("claude"), Vec::new(), HashMap::new());
        Ok(provider)
    }

    /// Returns a ready provider, logging and panicking if construction fails.
    async fn provider_or_panic() -> ClaudeCodeProvider {
        create_claude_provider().await.unwrap_or_else(|e| {
            debug!(?e, "Failed to create Claude Code provider");
            panic!("Failed to create Claude Code provider: {e:?}")
        })
    }

    #[tokio::test]
    #[ignore = "requires local Claude CLI"]
    async fn test_claude_hello_world() {
        let provider = provider_or_panic().await;

        if let Err(e) = test_hello_world(provider).await {
            debug!(?e, "Claude Code hello world test failed");
            panic!("Claude Code hello world test failed: {e:?}");
        }
    }

    #[tokio::test]
    #[ignore = "requires local Claude CLI"]
    async fn test_claude_reasoning_conversation() {
        let provider = provider_or_panic().await;

        if let Err(e) = test_reasoning_conversation(provider).await {
            debug!(?e, "Claude Code reasoning conversation test failed");
            panic!("Claude Code reasoning conversation test failed: {e:?}");
        }
    }

    #[tokio::test]
    #[ignore = "requires local Claude CLI"]
    async fn test_claude_tool_usage() {
        let provider = provider_or_panic().await;

        if let Err(e) = test_tool_usage(provider).await {
            debug!(?e, "Claude Code tool usage test failed");
            panic!("Claude Code tool usage test failed: {e:?}");
        }
    }

    #[tokio::test]
    #[ignore = "requires local Claude CLI"]
    async fn test_claude_reasoning_with_tools() {
        let provider = provider_or_panic().await;

        if let Err(e) = test_reasoning_with_tools(provider).await {
            debug!(?e, "Claude Code reasoning with tools test failed");
            panic!("Claude Code reasoning with tools test failed: {e:?}");
        }
    }
}