phi_core/provider/anthropic.rs
1//! Anthropic Claude provider (Messages API with streaming)
2/*
3ARCHITECTURE: AnthropicProvider — one struct, one job
4
5`AnthropicProvider` is a zero-field unit struct (no state). All behaviour
6lives in the `StreamProvider::stream()` method. The provider is stateless:
7it doesn't cache connections or store conversation history — that's the
8agent loop's responsibility.
9
10The overall flow of `stream()`:
11 1. Build the JSON request body (messages → Anthropic format, prompt caching)
12 2. Start an HTTP POST with streaming enabled (reqwest EventSource)
13 3. Process SSE events in a loop until "message_stop" or error
14 4. Assemble the complete `Message::Assistant` from accumulated content + usage
15 5. Send `StreamEvent::Done` on the channel; return the `Message`
16
17ARCHITECTURE: Anthropic SSE event sequence
18
19The Anthropic streaming API emits events in this order:
20 message_start — response metadata, initial input token usage
21 content_block_start — a new content block begins (text / thinking / tool_use)
22 content_block_delta* — incremental content for the current block
23 content_block_stop — content block complete
24 message_delta — final stop_reason and output token usage
25 message_stop — stream ended
26
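27Every block event carries its block `index` and the parser keys all state by it, so it would also cope with interleaved blocks (the documented stream sends them one after another). An interleaved sequence looks like: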
28 content_block_start(0, text)
29 content_block_delta(0, text_delta: "Hello ")
30 content_block_start(1, tool_use: {id, name})
31 content_block_delta(1, input_json_delta: "{\"cmd")
32 content_block_delta(0, text_delta: "world")
33 content_block_delta(1, input_json_delta: "\": \"ls\"}")
34 content_block_stop(0)
35 content_block_stop(1)
36 message_delta(stop_reason: "tool_use")
37 message_stop
38
39ARCHITECTURE: Prompt caching
40
41Anthropic supports prompt caching with `"cache_control": {"type": "ephemeral"}`
42markers on system/tools/messages. The provider places up to 3 cache breakpoints
43(see `build_request_body` comments). On cache hits, the API reports the reused prefix as
44`cache_read_input_tokens` (billed at a reduced rate) and responds with lower latency.
45*/
46
47use super::traits::*;
48use crate::types::*;
49use async_trait::async_trait;
50use futures::StreamExt;
51use reqwest_eventsource::{Event, EventSource};
52use serde::Deserialize;
53use tokio::sync::mpsc;
54use tracing::{debug, warn};
55
/// Value sent in the mandatory `anthropic-version` request header.
const API_VERSION: &str = "2023-06-01";
/// Messages API endpoint that every streaming request is POSTed to.
const API_URL: &str = "https://api.anthropic.com/v1/messages";

/// Stateless Anthropic provider: it carries no fields, and everything it does
/// lives in its `StreamProvider` implementation.
pub struct AnthropicProvider;
63
64#[async_trait]
65impl StreamProvider for AnthropicProvider {
66 fn provider_id(&self) -> &str {
67 "anthropic"
68 }
69
70 async fn stream(
71 &self,
72 config: StreamConfig, // REQUEST — includes api_key (sk-ant-* or sk-ant-oat* for OAuth)
73 tx: mpsc::UnboundedSender<StreamEvent>, // OBSERVER — receives SSE events as they arrive
74 cancel: tokio_util::sync::CancellationToken, // ABORT — races against SSE stream in tokio::select!
75 ) -> Result<Message, ProviderError> {
76 /*
77 ARCHITECTURE: OAuth vs API key auth
78
79 Anthropic supports two auth modes:
80 1. API key (`sk-ant-...`) — simple `x-api-key` header; for direct API access
81 2. OAuth token (`sk-ant-oat...`) — `Authorization: Bearer ...` header;
82 used by Claude Code CLI (authenticated via claude.ai). Adds extra identity
83 headers so Anthropic can attribute usage to the Claude Code product.
84
85 We detect OAuth by prefix: "sk-ant-oat" in the key. This is fragile but matches
86 the real-world key format Anthropic uses for its own OAuth tokens.
87 */
88 // Resolve via CredentialProvider when set, else fall through to the static `api_key`.
89 // Cached once per stream() call — subsequent OAuth detection + header writes share it.
90 let api_key = config.model_config.resolve_api_key().await?;
91 let is_oauth = api_key.contains("sk-ant-oat");
92 let body = build_request_body(&config, is_oauth);
93 debug!(
94 "Anthropic request: model={}, oauth={}",
95 config.model_config.id, is_oauth
96 );
97
98 /*
99 RUST QUIRK: Builder pattern on `reqwest::Client`
100
101 `reqwest::Client::new()` creates a reusable HTTP client (connection pool, TLS).
102 In production you'd store this in the provider, but for simplicity we create
103 one per call here.
104
105 `client.post(URL).header(...).header(...).json(&body)` is a fluent builder:
106 each method takes `self` by value and returns a new `RequestBuilder`.
107 `.json(&body)` serializes `body` (a `serde_json::Value`) to JSON bytes and
108 sets Content-Type: application/json.
109
110 RUST QUIRK: `let mut builder = ...` — reassigning via `mut`
111 We conditionally add different headers based on `is_oauth`.
112 `builder = builder.header(...)` — the old builder is consumed, the new one with
113 the extra header is returned. This is builder chaining with conditional branches.
114 */
115 let client = reqwest::Client::new();
116 let mut builder = client
117 .post(API_URL)
118 .header("anthropic-version", API_VERSION)
119 .header("content-type", "application/json");
120
121 if is_oauth {
122 // OAuth token — Bearer auth with Claude Code identity headers
123 builder = builder
124 .header("authorization", format!("Bearer {}", api_key))
125 .header(
126 "anthropic-beta",
127 "claude-code-20250219,oauth-2025-04-20,fine-grained-tool-streaming-2025-05-14",
128 )
129 .header("anthropic-dangerous-direct-browser-access", "true")
130 .header("user-agent", "claude-cli/2.1.2 (external, cli)")
131 .header("x-app", "cli");
132 } else {
133 // Standard API key auth
134 builder = builder.header("x-api-key", &api_key);
135 }
136
137 let request = builder.json(&body);
138
139 /*
140 RUST QUIRK: `EventSource::new(request).map_err(|e| ProviderError::Network(...))?`
141
142 `EventSource::new(request)` returns `Result<EventSource, Error>`.
143 `.map_err(|e| ProviderError::Network(e.to_string()))` transforms the error type:
144 `Error` (reqwest_eventsource) → `ProviderError::Network(String)`
145 This is needed because `stream()` must return `Result<Message, ProviderError>`,
146 not `Result<Message, reqwest_eventsource::Error>`.
147
148 `.map_err` is the standard "transform the Err variant, leave Ok untouched" combinator.
149 Python analogy: re-raising as a different exception type.
150
151 The `?` then unwraps the `Ok` or returns the `Err` early.
152 */
153 let mut es =
154 EventSource::new(request).map_err(|e| ProviderError::Network(e.to_string()))?;
155
156 /*
157 ARCHITECTURE: Streaming state — accumulator variables
158
159 We accumulate the full response in these variables across many SSE events:
160 `content` — grows as content_block_start/delta events arrive
161 starts empty; we push new Content variants for each new block
162 `usage` — filled by message_start (input tokens) and message_delta (output tokens)
163 `stop_reason` — updated by message_delta; default is Stop until we know better
164
165 At stream end, these three are assembled into `Message::Assistant { ... }`.
166
167 RUST QUIRK: `Vec<Content>` — a growable array on the heap
168 `Vec::new()` creates an empty vector with no allocation.
169 We `.push()` items as blocks are discovered.
170 `content.get_mut(idx)` — returns `Option<&mut Content>` at index `idx`.
171 Returns None if idx >= len, so we protect with `while content.len() <= idx { push }`.
172
173 RUST QUIRK: `let _ = tx.send(...)` — intentionally ignoring the Result
174 `UnboundedSender::send()` returns `Err` if the receiver is dropped.
175 We don't care — if the UI isn't listening, we still want to continue.
176 `let _ = ` explicitly discards the value, silencing the "unused Result" compiler lint.
177 */
178 let mut content: Vec<Content> = Vec::new();
179 let mut usage = Usage::default();
180 let mut stop_reason = StopReason::Stop;
181
182 let _ = tx.send(StreamEvent::Start); // notify UI that streaming has begun
183
184 /*
185 ARCHITECTURE: The SSE event loop — a streaming state machine
186
187 This `loop` processes Anthropic SSE events one by one. It runs until:
188 - "message_stop" arrives → break and assemble the final Message
189 - "error" event → return an error Message
190 - SSE error → return an error Message
191 - Cancellation → return Err(Cancelled)
192
193 `tokio::select!` races two futures each iteration:
194 1. `cancel.cancelled()` — user pressed Ctrl-C or the agent was aborted
195 2. `es.next()` — the next SSE event from Anthropic's HTTP stream
196
197 The first to complete "wins" and its branch runs. The loser is dropped.
198 This is the idiomatic Rust pattern for "process events but stay interruptible."
199 */
200 loop {
201 tokio::select! {
202 _ = cancel.cancelled() => {
203 es.close();
204 return Err(ProviderError::Cancelled);
205 }
206 event = es.next() => {
207 match event {
208 None => break,
209 Some(Ok(Event::Open)) => {}
210 Some(Ok(Event::Message(msg))) => {
211 /*
212 RUST QUIRK: `msg.event.as_str()` for pattern matching
213 `msg.event` is a `String`. We can't match on `String` directly
214 because Rust's match requires compile-time known sizes.
215 `.as_str()` converts `String` → `&str`, which CAN be matched.
216 Each arm is a string literal (a `&'static str`).
217 Python analogy: `match msg.event:` with `case "message_start":` etc.
218 */
219 match msg.event.as_str() {
220 "message_start" => {
221 /*
222 RUST QUIRK: `if let Ok(data) = serde_json::from_str::<T>(&s)`
223 `serde_json::from_str::<AnthropicMessageStart>(&msg.data)` tries
224 to deserialize the JSON string into our Rust struct.
225 If deserialization succeeds → `Ok(data)`, and `if let Ok(data)` binds it.
226 If it fails → `Err(e)`, and we silently skip this event (no panic).
227 We tolerate partial / unknown events gracefully — the stream continues.
228 `::<AnthropicMessageStart>` is a "turbofish" — explicit type parameter.
229 */
230 if let Ok(data) = serde_json::from_str::<AnthropicMessageStart>(&msg.data) {
231 usage.input = data.message.usage.input_tokens;
232 usage.cache_read = data.message.usage.cache_read_input_tokens;
233 usage.cache_write = data.message.usage.cache_creation_input_tokens;
234 }
235 }
236 "content_block_start" => {
237 if let Ok(data) = serde_json::from_str::<AnthropicContentBlockStart>(&msg.data) {
238 let idx = data.index as usize; // u64 → usize (safe: index is tiny)
239 match data.content_block {
240 AnthropicContentBlock::Text { .. } => {
241 // Pad the content Vec with empty Text blocks up to this index
242 while content.len() <= idx {
243 content.push(Content::Text { text: String::new() });
244 }
245 }
246 AnthropicContentBlock::Thinking { .. } => {
247 while content.len() <= idx {
248 content.push(Content::Thinking { thinking: String::new(), signature: None });
249 }
250 }
251 AnthropicContentBlock::ToolUse { id, name, .. } => {
252 while content.len() <= idx {
253 content.push(Content::ToolCall {
254 id: id.clone(),
255 name: name.clone(),
256 // Placeholder — will hold accumulated JSON fragments
257 arguments: serde_json::Value::Object(Default::default()),
258 });
259 }
260 // Notify the UI that a tool call has started
261 let _ = tx.send(StreamEvent::ToolCallStart {
262 content_index: idx,
263 id,
264 name,
265 });
266 }
267 }
268 }
269 }
270 "content_block_delta" => {
271 if let Ok(data) = serde_json::from_str::<AnthropicContentBlockDelta>(&msg.data) {
272 let idx = data.index as usize;
273 match data.delta {
274 AnthropicDelta::TextDelta { text } => {
275 /*
276 RUST QUIRK: `if let Some(Content::Text { text: ref mut t }) = content.get_mut(idx)`
277 Pattern match with multiple levels at once:
278 - `content.get_mut(idx)` → `Option<&mut Content>`
279 - `Some(...)` unpacks the Option
280 - `Content::Text { text: ref mut t }` destructures the enum variant,
281 binding `t` as a mutable reference to the `text` field
282 - `ref mut` means "bind this field BY mutable reference, don't move it"
283 - `t.push_str(&text)` appends to the string IN PLACE
284
285 Python analogy:
286 block = content[idx]
287 if isinstance(block, TextContent):
288 block.text += text
289 */
290 if let Some(Content::Text { text: ref mut t }) = content.get_mut(idx) {
291 t.push_str(&text);
292 }
293 let _ = tx.send(StreamEvent::TextDelta {
294 content_index: idx,
295 delta: text,
296 });
297 }
298 AnthropicDelta::ThinkingDelta { thinking } => {
299 if let Some(Content::Thinking { thinking: ref mut t, .. }) = content.get_mut(idx) {
300 t.push_str(&thinking);
301 }
302 let _ = tx.send(StreamEvent::ThinkingDelta {
303 content_index: idx,
304 delta: thinking,
305 });
306 }
307 AnthropicDelta::InputJsonDelta { partial_json } => {
308 /*
309 ARCHITECTURE: Tool argument JSON accumulation
310 Anthropic streams tool arguments as partial JSON fragments:
311 chunk 1: "{\"cmd\":"
312 chunk 2: " \"ls -la\"}"
313 We can't parse partial JSON, so we buffer in a hidden
314 `__partial_json` key inside the arguments object.
315 At `content_block_stop`, we parse the full accumulated string.
316
317 Why store it in `arguments` itself? To avoid a separate HashMap
318 of scratch buffers indexed by content_block index.
319 */
320 if let Some(Content::ToolCall { ref mut arguments, .. }) = content.get_mut(idx) {
321 // Append to string buffer stored in arguments
322 // We accumulate the raw JSON string and parse it at content_block_stop
323 let buf = arguments
324 .as_object_mut()
325 .and_then(|o| o.get_mut("__partial_json"))
326 .and_then(|v| v.as_str().map(|s| s.to_string()));
327 let new_buf = format!("{}{}", buf.unwrap_or_default(), partial_json);
328 if let Some(obj) = arguments.as_object_mut() {
329 obj.insert("__partial_json".into(), serde_json::Value::String(new_buf));
330 }
331 }
332 let _ = tx.send(StreamEvent::ToolCallDelta {
333 content_index: idx,
334 delta: partial_json,
335 });
336 }
337 AnthropicDelta::SignatureDelta { signature } => {
338 // Extended thinking: the signature authenticates the thinking block
339 if let Some(Content::Thinking { signature: ref mut s, .. }) = content.get_mut(idx) {
340 *s = Some(signature); // `*s` dereferences the &mut Option<String>
341 }
342 }
343 }
344 }
345 }
346 "content_block_stop" => {
347 if let Ok(data) = serde_json::from_str::<serde_json::Value>(&msg.data) {
348 let idx = data["index"].as_u64().unwrap_or(0) as usize;
349 // Parse accumulated JSON for tool calls
350 if let Some(Content::ToolCall { ref mut arguments, .. }) = content.get_mut(idx) {
351 if let Some(partial) = arguments.as_object()
352 .and_then(|o| o.get("__partial_json"))
353 .and_then(|v| v.as_str())
354 .map(|s| s.to_string())
355 {
356 if let Ok(parsed) = serde_json::from_str(&partial) {
357 *arguments = parsed; // replace placeholder with real parsed JSON
358 } else {
359 warn!("Failed to parse tool call JSON: {}", partial);
360 *arguments = serde_json::Value::Object(Default::default());
361 }
362 }
363 }
364 let _ = tx.send(StreamEvent::ToolCallEnd { content_index: idx });
365 }
366 }
367 "message_delta" => {
368 if let Ok(data) = serde_json::from_str::<AnthropicMessageDelta>(&msg.data) {
369 /*
370 RUST QUIRK: `as_deref()` — converting Option<String> to Option<&str>
371 `data.delta.stop_reason` is `Option<String>`.
372 `.as_deref()` converts it to `Option<&str>` — borrowing the inner string.
373 This lets us match with `Some("tool_use")` etc. without cloning.
374 `match data.delta.stop_reason.as_deref()`:
375 Some("tool_use") → StopReason::ToolUse
376 Some("max_tokens") → StopReason::Length
377 Some("end_turn") | None | Some(_) → StopReason::Stop
378 */
379 stop_reason = match data.delta.stop_reason.as_deref() {
380 Some("tool_use") => StopReason::ToolUse,
381 Some("max_tokens") => StopReason::Length,
382 _ => StopReason::Stop,
383 };
384 usage.output = data.usage.output_tokens;
385 }
386 }
387 "message_stop" => break, // stream complete — exit the loop
388 "ping" => {} // Anthropic sends periodic pings; ignore them
389 "error" => {
390 warn!("Anthropic stream error: {}", msg.data);
391 let err_msg = Message::Assistant {
392 content: vec![Content::Text { text: String::new() }],
393 stop_reason: StopReason::Error,
394 model: config.model_config.id.clone(),
395 provider: "anthropic".into(),
396 usage: usage.clone(),
397 timestamp: now_ms(),
398 error_message: Some(msg.data),
399 };
400 let _ = tx.send(StreamEvent::Error { message: err_msg.clone() });
401 return Ok(err_msg);
402 }
403 other => {
404 debug!("Unknown Anthropic event: {}", other);
405 }
406 }
407 }
408 Some(Err(e)) => {
409 let err_str = e.to_string();
410 warn!("SSE error: {}", err_str);
411 let err_msg = Message::Assistant {
412 content: vec![Content::Text { text: String::new() }],
413 stop_reason: StopReason::Error,
414 model: config.model_config.id.clone(),
415 provider: "anthropic".into(),
416 usage: usage.clone(),
417 timestamp: now_ms(),
418 error_message: Some(err_str),
419 };
420 let _ = tx.send(StreamEvent::Error { message: err_msg.clone() });
421 return Ok(err_msg);
422 }
423 }
424 }
425 }
426 }
427
428 let has_tool_calls = content
429 .iter()
430 .any(|c| matches!(c, Content::ToolCall { .. }));
431 if has_tool_calls {
432 stop_reason = StopReason::ToolUse;
433 }
434
435 let message = Message::Assistant {
436 content,
437 stop_reason,
438 model: config.model_config.id.clone(),
439 provider: "anthropic".into(),
440 usage,
441 timestamp: now_ms(),
442 error_message: None,
443 };
444
445 let _ = tx.send(StreamEvent::Done {
446 message: message.clone(),
447 });
448 Ok(message)
449 }
450}
451
452// ---------------------------------------------------------------------------
453// Anthropic API request body construction (core types → Anthropic JSON)
454// ---------------------------------------------------------------------------
455
456/// Builds the JSON request body for the Anthropic Messages API.
457/*
458ARCHITECTURE: build_request_body — translation layer (yo-core types → Anthropic JSON)
459
460The Anthropic API expects a specific JSON format. This function converts:
461 - `Message::User/Assistant/ToolResult` → Anthropic message objects
462 - `Content::Text/Image/Thinking/ToolCall` → Anthropic content blocks
463 - `ThinkingLevel` → `"thinking": { "type": "enabled", "budget_tokens": N }`
464 - `CacheConfig` → `"cache_control": {"type": "ephemeral"}` markers
465
466RUST QUIRK: `serde_json::json!({...})` — macro for inline JSON construction
467 `json!` is a macro that converts a Rust literal into a `serde_json::Value`.
468 It supports Rust expressions inline: `json!({"model": config.model})` → the
469 string value of `config.model` is embedded at the "model" key.
470 Python analogy: dict literals like `{"model": config.model}`.
471
472RUST QUIRK: `&[Content]` — a slice reference as a function parameter
473 `content_to_anthropic(content)` takes `&[Content]`.
474 When called with `content` (a `Vec<Content>`), Rust auto-coerces `Vec<T>` → `&[T]`.
475 The function receives a read-only view of the contents without any allocation.
476*/
477fn build_request_body(
478 config: &StreamConfig, // REQUEST — messages, tools, model, system prompt, cache config
479 is_oauth: bool, // AUTH MODE — true = OAuth (adds claude-code product headers); false = API key only
480) -> serde_json::Value {
481 let mut messages: Vec<serde_json::Value> = Vec::new();
482
483 for msg in &config.messages {
484 match msg {
485 Message::User { content, .. } => {
486 messages.push(serde_json::json!({
487 "role": "user",
488 "content": content_to_anthropic(content),
489 }));
490 }
491 Message::Assistant { content, .. } => {
492 messages.push(serde_json::json!({
493 "role": "assistant",
494 "content": content_to_anthropic(content),
495 }));
496 }
497 Message::ToolResult {
498 tool_call_id,
499 content,
500 is_error,
501 ..
502 } => {
503 let result_content = if content.iter().any(|c| matches!(c, Content::Image { .. })) {
504 // Multi-content with images: use array format
505 serde_json::json!(content_to_anthropic(content))
506 } else {
507 // Text-only: use string shorthand
508 let text = content
509 .iter()
510 .find_map(|c| match c {
511 Content::Text { text } => Some(text.clone()),
512 _ => None,
513 })
514 .unwrap_or_default();
515 serde_json::json!(text)
516 };
517
518 messages.push(serde_json::json!({
519 "role": "user",
520 "content": [{
521 "type": "tool_result",
522 "tool_use_id": tool_call_id,
523 "content": result_content,
524 "is_error": is_error,
525 }],
526 }));
527 }
528 }
529 }
530
531 // -----------------------------------------------------------------------
532 // Prompt caching — place cache_control breakpoints based on CacheConfig.
533 //
534 // Anthropic caches the full prefix (tools → system → messages) up to each
535 // breakpoint. We use up to 3 breakpoints:
536 // 1. System prompt (stable across turns)
537 // 2. Last tool definition (tools rarely change)
538 // 3. Second-to-last message (conversation history grows, cache the prefix)
539 //
540 // When caching is disabled or strategy is Disabled, no markers are added.
541 // -----------------------------------------------------------------------
542 let cache = &config.cache_config;
543 let caching_enabled = cache.enabled && cache.strategy != CacheStrategy::Disabled;
544 let (cache_system, cache_tools, cache_messages) = match &cache.strategy {
545 CacheStrategy::Auto => (true, true, true),
546 CacheStrategy::Disabled => (false, false, false),
547 CacheStrategy::Manual {
548 cache_system,
549 cache_tools,
550 cache_messages,
551 } => (*cache_system, *cache_tools, *cache_messages),
552 };
553
554 // Breakpoint 3: second-to-last message (cache conversation prefix)
555 if caching_enabled && cache_messages && messages.len() >= 2 {
556 let cache_idx = messages.len() - 2;
557 if let Some(content) = messages[cache_idx]["content"].as_array_mut() {
558 if let Some(last_block) = content.last_mut() {
559 last_block["cache_control"] = serde_json::json!({"type": "ephemeral"});
560 }
561 }
562 }
563
564 let mut body = serde_json::json!({
565 "model": config.model_config.id,
566 "max_tokens": config.max_tokens.unwrap_or(8192),
567 "stream": true,
568 "messages": messages,
569 });
570
571 // Breakpoint 1: system prompt
572 if is_oauth {
573 let mut system_blocks = vec![serde_json::json!({
574 "type": "text",
575 "text": "You are Claude Code, Anthropic's official CLI for Claude.",
576 })];
577 if !config.system_prompt.is_empty() {
578 system_blocks.push(serde_json::json!({
579 "type": "text",
580 "text": config.system_prompt,
581 }));
582 }
583 // Cache the last system block
584 if caching_enabled && cache_system {
585 if let Some(last) = system_blocks.last_mut() {
586 last["cache_control"] = serde_json::json!({"type": "ephemeral"});
587 }
588 }
589 body["system"] = serde_json::json!(system_blocks);
590 } else if !config.system_prompt.is_empty() {
591 let mut block = serde_json::json!({
592 "type": "text",
593 "text": config.system_prompt,
594 });
595 if caching_enabled && cache_system {
596 block["cache_control"] = serde_json::json!({"type": "ephemeral"});
597 }
598 body["system"] = serde_json::json!([block]);
599 }
600
601 // Breakpoint 2: last tool definition (tools are stable between turns)
602 if !config.tools.is_empty() {
603 let mut tools: Vec<serde_json::Value> = config
604 .tools
605 .iter()
606 .map(|t| {
607 serde_json::json!({
608 "name": t.name,
609 "description": t.description,
610 "input_schema": t.parameters,
611 })
612 })
613 .collect();
614 if caching_enabled && cache_tools {
615 if let Some(last_tool) = tools.last_mut() {
616 last_tool["cache_control"] = serde_json::json!({"type": "ephemeral"});
617 }
618 }
619 body["tools"] = serde_json::json!(tools);
620 }
621
622 if config.thinking_level != ThinkingLevel::Off {
623 let budget = match config.thinking_level {
624 ThinkingLevel::Minimal => 128,
625 ThinkingLevel::Low => 512,
626 ThinkingLevel::Medium => 2048,
627 ThinkingLevel::High => 8192,
628 ThinkingLevel::Off => 0,
629 };
630 body["thinking"] = serde_json::json!({
631 "type": "enabled",
632 "budget_tokens": budget,
633 });
634 }
635
636 if let Some(temp) = config.temperature {
637 body["temperature"] = serde_json::json!(temp);
638 }
639
640 // Structured-output wiring. Anthropic has no native JSON-mode field; the
641 // canonical emulation is a synthetic `respond_json` tool with the user's
642 // schema (or an open object for JsonObject), plus `tool_choice` to force the
643 // LLM to call it. `Message::extract_json` looks for this exact tool name and
644 // pulls its arguments value as the parsed JSON.
645 match &config.response_format {
646 ResponseFormat::Text => {} // default; omit
647 ResponseFormat::JsonObject | ResponseFormat::JsonSchema { .. } => {
648 let (schema, description) = match &config.response_format {
649 ResponseFormat::JsonSchema { schema, name, .. } => (
650 schema.clone(),
651 format!("Return the response as a JSON object matching `{}`.", name),
652 ),
653 _ => (
654 serde_json::json!({"type": "object", "additionalProperties": true}),
655 "Return the response as a JSON object.".to_string(),
656 ),
657 };
658 let synthetic_tool = serde_json::json!({
659 "name": "respond_json",
660 "description": description,
661 "input_schema": schema,
662 });
663 let existing = body["tools"].as_array_mut();
664 match existing {
665 Some(arr) => arr.push(synthetic_tool),
666 None => body["tools"] = serde_json::json!([synthetic_tool]),
667 }
668 body["tool_choice"] = serde_json::json!({"type": "tool", "name": "respond_json"});
669 }
670 }
671
672 body
673}
674
675fn content_to_anthropic(content: &[Content]) -> Vec<serde_json::Value> {
676 content
677 .iter()
678 .map(|c| match c {
679 Content::Text { text } => serde_json::json!({"type": "text", "text": text}),
680 Content::Image { data, mime_type } => serde_json::json!({
681 "type": "image",
682 "source": {"type": "base64", "media_type": mime_type, "data": data},
683 }),
684 Content::Thinking {
685 thinking,
686 signature,
687 } => serde_json::json!({
688 "type": "thinking",
689 "thinking": thinking,
690 "signature": signature.as_deref().unwrap_or(""),
691 }),
692 Content::ToolCall {
693 id,
694 name,
695 arguments,
696 } => serde_json::json!({
697 "type": "tool_use",
698 "id": id,
699 "name": name,
700 "input": arguments,
701 }),
702 })
703 .collect()
704}
705
706// ---------------------------------------------------------------------------
707// Anthropic SSE event deserialization types (private to this module)
708// ---------------------------------------------------------------------------
709/*
710ARCHITECTURE: Private deserialization types — the "decoder ring" for Anthropic's JSON
711
712These structs mirror Anthropic's SSE event JSON shapes exactly. They exist only
713to deserialize event data — they're never stored or returned to callers.
714Using dedicated structs (vs parsing `serde_json::Value` fields manually) means:
715 - Compile-time field names (typos caught at build time)
716 - Automatic error handling (`serde_json::from_str` returns Err on shape mismatch)
717 - Self-documenting: the struct shows exactly what fields we expect
718
719RUST QUIRK: `#[derive(Deserialize)]` — serde auto-generates deserialization
720 The `Deserialize` derive reads field names from the struct definition.
721 It maps JSON key names to field names. If they don't match, use `#[serde(rename = "...")]`.
722
723RUST QUIRK: `#[serde(tag = "type")]` — "internally tagged" enum deserialization
724 When deserializing `AnthropicContentBlock`, serde looks at the JSON's "type" field
725 to decide which variant to construct:
726 {"type": "text", "text": "Hello"} → Text { text: "Hello" }
727 {"type": "tool_use", "id": ..., "name":...} → ToolUse { id: ..., name: ... }
728 This is the "internally tagged" enum pattern — the discriminant ("type") is a
729 field inside the JSON object, not wrapping the whole thing.
730
731RUST QUIRK: `#[allow(dead_code)]` — suppress "field never read" warnings
732 The `text` field of `AnthropicContentBlock::Text` is present in the JSON but
733 we don't need it (we initialize the content block with an empty string and fill
734 it via deltas). `#[allow(dead_code)]` tells the compiler "yes, I know, I don't care."
735
736RUST QUIRK: `#[serde(default)]` on struct fields
737 If a field is absent in JSON, serde uses `Default::default()` instead of failing.
738 For `u64`, `Default::default()` = `0`. This handles older API responses that
739 don't include all usage fields.
740*/
741
742#[derive(Deserialize)]
743struct AnthropicMessageStart {
744 message: AnthropicMessageInfo,
745}
746
747#[derive(Deserialize)]
748struct AnthropicMessageInfo {
749 usage: AnthropicUsage,
750}
751
752#[derive(Deserialize)]
753struct AnthropicUsage {
754 #[serde(default)]
755 input_tokens: u64,
756 #[serde(default)]
757 output_tokens: u64,
758 #[serde(default)]
759 cache_read_input_tokens: u64,
760 #[serde(default)]
761 cache_creation_input_tokens: u64,
762}
763
764#[derive(Deserialize)]
765struct AnthropicContentBlockStart {
766 index: u64,
767 content_block: AnthropicContentBlock,
768}
769
770/// Anthropic content block type (text, thinking, or tool_use).
771/// Dispatched by the "type" field in the JSON.
772#[derive(Deserialize)]
773#[serde(tag = "type")]
774enum AnthropicContentBlock {
775 #[serde(rename = "text")]
776 Text {
777 #[allow(dead_code)]
778 text: String, // initial text (empty in streaming; filled via TextDelta events)
779 },
780 #[serde(rename = "thinking")]
781 Thinking {
782 #[allow(dead_code)]
783 thinking: String, // initial thinking (empty in streaming)
784 },
785 #[serde(rename = "tool_use")]
786 ToolUse { id: String, name: String },
787}
788
789#[derive(Deserialize)]
790struct AnthropicContentBlockDelta {
791 index: u64,
792 delta: AnthropicDelta,
793}
794
795/// Delta variants for incremental content within a content block.
796/*
797RUST QUIRK: `#[allow(clippy::enum_variant_names)]` — suppress a clippy lint
798
799Clippy warns when all variants of an enum end with the same suffix
800(here: `TextDelta`, `ThinkingDelta`, `InputJsonDelta`, `SignatureDelta` all end in `Delta`).
801Clippy suggests removing the common suffix, but `Delta` is part of the Anthropic API
802terminology, and removing it would make the variants less clear.
803`#[allow(...)]` silences this specific lint for this item only.
804*/
805#[derive(Deserialize)]
806#[serde(tag = "type")]
807#[allow(clippy::enum_variant_names)]
808enum AnthropicDelta {
809 #[serde(rename = "text_delta")]
810 TextDelta { text: String },
811 #[serde(rename = "thinking_delta")]
812 ThinkingDelta { thinking: String },
813 #[serde(rename = "input_json_delta")]
814 InputJsonDelta { partial_json: String },
815 #[serde(rename = "signature_delta")]
816 SignatureDelta { signature: String },
817}
818
819#[derive(Deserialize)]
820struct AnthropicMessageDelta {
821 delta: AnthropicMessageDeltaInner,
822 usage: AnthropicUsage,
823}
824
825#[derive(Deserialize)]
826struct AnthropicMessageDeltaInner {
827 stop_reason: Option<String>,
828}
829
#[cfg(test)]
mod tests {
    //! Unit tests for request-body construction: prompt-cache breakpoint
    //! placement and tool-result content encoding.

    use super::*;
    use crate::provider::traits::ToolDefinition;

    /// Builds a `StreamConfig` for the fixed test model from the parts that vary
    /// between tests. Everything else (thinking off, 1024 max tokens, no
    /// temperature, plain-text responses) is held constant.
    fn config_with(
        system_prompt: &'static str,
        messages: Vec<Message>,
        tools: Vec<ToolDefinition>,
        cache_config: CacheConfig,
    ) -> StreamConfig {
        StreamConfig {
            model_config: crate::provider::ModelConfig::anthropic(
                "claude-sonnet-4-20250514",
                "Claude Sonnet 4",
                "test-key",
            ),
            system_prompt: system_prompt.into(),
            messages,
            tools,
            thinking_level: ThinkingLevel::Off,
            max_tokens: Some(1024),
            temperature: None,
            cache_config,
            response_format: ResponseFormat::Text,
        }
    }

    /// A system prompt, two user turns and one `bash` tool. Every cache
    /// breakpoint location (system, last tool, second-to-last message) exists.
    fn make_config(cache: CacheConfig) -> StreamConfig {
        config_with(
            "You are helpful.",
            vec![
                Message::user("Hello"),
                Message::User {
                    content: vec![Content::Text {
                        text: "What is 2+2?".into(),
                    }],
                    timestamp: 0,
                },
            ],
            vec![ToolDefinition {
                name: "bash".into(),
                description: "Run commands".into(),
                parameters: serde_json::json!({"type": "object"}),
            }],
            cache,
        )
    }

    /// An assistant tool-call turn (id `tc-1`) followed by its tool result
    /// carrying `result_content`. Caching is disabled, no tools are declared,
    /// and the system prompt is empty.
    fn make_tool_result_config(
        tool_name: &'static str,
        arguments: serde_json::Value,
        result_content: Vec<Content>,
    ) -> StreamConfig {
        config_with(
            "",
            vec![
                Message::Assistant {
                    content: vec![Content::ToolCall {
                        id: "tc-1".into(),
                        name: tool_name.into(),
                        arguments,
                    }],
                    stop_reason: StopReason::ToolUse,
                    model: "test".into(),
                    provider: "test".into(),
                    usage: Usage::default(),
                    timestamp: 0,
                    error_message: None,
                },
                Message::ToolResult {
                    tool_call_id: "tc-1".into(),
                    tool_name: tool_name.into(),
                    content: result_content,
                    is_error: false,
                    timestamp: 0,
                },
            ],
            vec![],
            CacheConfig {
                enabled: false,
                strategy: CacheStrategy::Disabled,
            },
        )
    }

    #[test]
    fn test_cache_auto_places_all_breakpoints() {
        let body = build_request_body(&make_config(CacheConfig::default()), false);

        // System prompt should have cache_control
        let system = &body["system"][0];
        assert_eq!(system["cache_control"]["type"], "ephemeral");

        // Last tool should have cache_control
        let tools = body["tools"].as_array().unwrap();
        let last_tool = tools.last().unwrap();
        assert_eq!(last_tool["cache_control"]["type"], "ephemeral");

        // Second-to-last message should have cache_control on its last block
        let msgs = body["messages"].as_array().unwrap();
        let second_to_last = &msgs[msgs.len() - 2];
        let content = second_to_last["content"].as_array().unwrap();
        let last_block = content.last().unwrap();
        assert_eq!(last_block["cache_control"]["type"], "ephemeral");
    }

    #[test]
    fn test_cache_disabled_no_breakpoints() {
        let config = CacheConfig {
            enabled: false,
            strategy: CacheStrategy::Auto,
        };
        let body = build_request_body(&make_config(config), false);

        // System prompt should NOT have cache_control
        let system = &body["system"][0];
        assert!(system.get("cache_control").is_none());

        // Tools should NOT have cache_control
        let tools = body["tools"].as_array().unwrap();
        assert!(tools.last().unwrap().get("cache_control").is_none());

        // Messages should NOT have cache_control on any block
        let msgs = body["messages"].as_array().unwrap();
        for msg in msgs {
            if let Some(content) = msg["content"].as_array() {
                for block in content {
                    assert!(block.get("cache_control").is_none());
                }
            }
        }
    }

    #[test]
    fn test_cache_manual_system_only() {
        let config = CacheConfig {
            enabled: true,
            strategy: CacheStrategy::Manual {
                cache_system: true,
                cache_tools: false,
                cache_messages: false,
            },
        };
        let body = build_request_body(&make_config(config), false);

        // System: cached
        assert_eq!(body["system"][0]["cache_control"]["type"], "ephemeral");
        // Tools: not cached
        assert!(body["tools"]
            .as_array()
            .unwrap()
            .last()
            .unwrap()
            .get("cache_control")
            .is_none());
        // Messages: not cached
        let msgs = body["messages"].as_array().unwrap();
        let second = &msgs[msgs.len() - 2];
        let content = second["content"].as_array().unwrap();
        assert!(content.last().unwrap().get("cache_control").is_none());
    }

    #[test]
    fn test_usage_cache_hit_rate() {
        let usage = Usage {
            input: 100,
            output: 50,
            reasoning: 0,
            cache_read: 900,
            cache_write: 0,
            total_tokens: 1050,
        };
        let rate = usage.cache_hit_rate();
        assert!((rate - 0.9).abs() < 0.001); // 900 / (100 + 900 + 0) = 0.9

        let empty = Usage::default();
        assert_eq!(empty.cache_hit_rate(), 0.0);
    }

    #[test]
    fn test_tool_result_with_image() {
        let config = make_tool_result_config(
            "read_file",
            serde_json::json!({"path": "test.png"}),
            vec![
                Content::Text {
                    text: "screenshot".into(),
                },
                Content::Image {
                    data: "aW1hZ2VkYXRh".into(),
                    mime_type: "image/png".into(),
                },
            ],
        );

        let body = build_request_body(&config, false);
        let msgs = body["messages"].as_array().unwrap();
        // The ToolResult message is the second message.
        let tool_result = &msgs[1]["content"][0];
        assert_eq!(tool_result["type"], "tool_result");
        // Content must be an array (not a string) because it contains an image.
        let content = tool_result["content"].as_array().unwrap();
        assert_eq!(content[0]["type"], "text");
        assert_eq!(content[1]["type"], "image");
        assert_eq!(content[1]["source"]["media_type"], "image/png");
    }

    #[test]
    fn test_tool_result_text_only_uses_string() {
        let config = make_tool_result_config(
            "bash",
            serde_json::json!({"command": "echo hi"}),
            vec![Content::Text {
                text: "hello".into(),
            }],
        );

        let body = build_request_body(&config, false);
        let msgs = body["messages"].as_array().unwrap();
        let tool_result = &msgs[1]["content"][0];
        // Text-only: content should be a plain string
        assert_eq!(tool_result["content"], "hello");
    }
}
1072}