Skip to main content

tkach/providers/anthropic/
mod.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use eventsource_stream::Eventsource;
5use futures::StreamExt;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9use crate::error::ProviderError;
10use crate::message::{
11    CacheControl, Content, StopReason, ThinkingMetadata, ThinkingProvider, Usage,
12};
13use crate::provider::{
14    LlmProvider, Request, Response, SystemBlock, ThinkingConfig, ThinkingEffort,
15};
16use crate::stream::{ProviderEventStream, StreamEvent};
17
18pub mod batch;
19
/// Default Anthropic API base URL. Override via [`Anthropic::with_base_url`]
/// for testing against a mock server.
pub const DEFAULT_BASE_URL: &str = "https://api.anthropic.com";
/// Value sent in the `anthropic-version` header of the Messages API
/// requests issued by this module.
pub(crate) const API_VERSION: &str = "2023-06-01";
24
/// Anthropic LLM provider (Claude).
///
/// Bundles the credentials, HTTP client, endpoint root and optional
/// default thinking configuration used for requests to the Messages API.
pub struct Anthropic {
    /// API key, sent in the `x-api-key` request header.
    api_key: String,
    /// HTTP client used to send requests.
    client: reqwest::Client,
    /// URL prefix that endpoint paths such as `/v1/messages` are appended to.
    base_url: String,
    /// Instance-level thinking configuration; applied when a request does
    /// not carry its own `thinking` setting.
    thinking: Option<AnthropicThinkingConfig>,
}
32
/// Effort level requested for Anthropic adaptive thinking.
///
/// The named variants cover the documented wire values; anything else is
/// carried verbatim in [`AnthropicEffort::Other`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AnthropicEffort {
    Low,
    Medium,
    High,
    XHigh,
    Max,
    /// Any value that is not one of the named levels, kept exactly as given.
    Other(String),
}

impl AnthropicEffort {
    /// String sent to the API for this effort level.
    pub fn as_wire(&self) -> &str {
        match self {
            Self::Other(custom) => custom.as_str(),
            Self::Low => "low",
            Self::Medium => "medium",
            Self::High => "high",
            Self::XHigh => "xhigh",
            Self::Max => "max",
        }
    }

    /// Look up the named level whose wire string equals `value`, ignoring
    /// ASCII case. Returns `None` for anything else.
    fn parse_known(value: &str) -> Option<Self> {
        IntoIterator::into_iter([Self::Low, Self::Medium, Self::High, Self::XHigh, Self::Max])
            .find(|level| value.eq_ignore_ascii_case(level.as_wire()))
    }
}

impl From<&str> for AnthropicEffort {
    /// Named levels are matched case-insensitively; every other input is
    /// copied into [`AnthropicEffort::Other`].
    fn from(value: &str) -> Self {
        Self::parse_known(value).unwrap_or_else(|| Self::Other(value.to_string()))
    }
}

impl From<String> for AnthropicEffort {
    /// Same mapping as `From<&str>`, but an unrecognised string is moved
    /// into [`AnthropicEffort::Other`] without copying.
    fn from(value: String) -> Self {
        match Self::parse_known(&value) {
            Some(level) => level,
            None => Self::Other(value),
        }
    }
}
91
/// Thinking mode applied to a request.
///
/// Converted into the wire-level `thinking` object (and, for adaptive mode
/// with an effort, an `output_config` object) when the request body is built.
#[derive(Debug, Clone)]
pub(crate) enum AnthropicThinkingConfig {
    /// Thinking with an explicit token budget; serialized with
    /// `type: "enabled"`.
    Manual {
        /// Token budget forwarded as `budget_tokens`.
        budget_tokens: u32,
    },
    /// Adaptive thinking; serialized with `type: "adaptive"`.
    Adaptive {
        /// Optional effort level; when present it is sent in `output_config`.
        effort: Option<AnthropicEffort>,
        /// Display mode sent in the `thinking` object.
        display: AnthropicThinkingDisplay,
    },
}
102
/// Display mode requested for adaptive thinking output.
#[derive(Debug, Clone, Copy)]
pub(crate) enum AnthropicThinkingDisplay {
    /// Ask for summarized thinking content.
    Summarized,
}

impl AnthropicThinkingDisplay {
    /// String placed in the `display` field of the request's `thinking` object.
    fn as_wire_str(self) -> &'static str {
        match self {
            Self::Summarized => "summarized",
        }
    }
}
115
116impl Anthropic {
117    pub fn new(api_key: impl Into<String>) -> Self {
118        Self {
119            api_key: api_key.into(),
120            client: reqwest::Client::new(),
121            base_url: DEFAULT_BASE_URL.to_string(),
122            thinking: None,
123        }
124    }
125
126    /// Create from the `ANTHROPIC_API_KEY` environment variable.
127    pub fn from_env() -> Self {
128        let api_key =
129            std::env::var("ANTHROPIC_API_KEY").expect("ANTHROPIC_API_KEY env var is required");
130        Self::new(api_key)
131    }
132
133    /// Override the API base URL. Pass the **scheme + host** (no
134    /// trailing slash, no `/v1/...` path) — endpoints are appended
135    /// internally. Primarily useful for routing tests through a local
136    /// mock server (e.g. `wiremock`).
137    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
138        self.base_url = base_url.into();
139        self
140    }
141
142    /// Enable Anthropic extended thinking with a manual token budget.
143    ///
144    /// Anthropic requires `budget_tokens >= 1024` and less than
145    /// `max_tokens`; invalid combinations are rejected by the API. When
146    /// enabled, tkach omits `temperature` because Anthropic marks it as
147    /// incompatible with thinking.
148    pub fn with_thinking_budget(mut self, budget_tokens: u32) -> Self {
149        self.thinking = Some(AnthropicThinkingConfig::Manual { budget_tokens });
150        self
151    }
152
153    /// Enable Anthropic adaptive thinking.
154    ///
155    /// Recommended by Anthropic for Claude Opus 4.7+, Opus 4.6, and
156    /// Sonnet 4.6. Adaptive mode lets Claude decide whether and how much
157    /// to think per request. tkach requests summarized display so
158    /// streaming consumers can receive positive `ThinkingDelta` events
159    /// on models whose API default is otherwise omitted thinking.
160    pub fn with_adaptive_thinking(mut self) -> Self {
161        self.thinking = Some(AnthropicThinkingConfig::Adaptive {
162            effort: None,
163            display: AnthropicThinkingDisplay::Summarized,
164        });
165        self
166    }
167
168    /// Enable adaptive thinking with explicit effort.
169    ///
170    /// Anthropic effort values are model-dependent; documented values are
171    /// `low`, `medium`, `high`, `xhigh`, and `max`.
172    pub fn with_adaptive_thinking_effort(mut self, effort: impl Into<AnthropicEffort>) -> Self {
173        self.thinking = Some(AnthropicThinkingConfig::Adaptive {
174            effort: Some(effort.into()),
175            display: AnthropicThinkingDisplay::Summarized,
176        });
177        self
178    }
179
180    /// Endpoint URL for `complete` / `stream` (`{base}/v1/messages`).
181    pub(crate) fn messages_url(&self) -> String {
182        format!("{}/v1/messages", self.base_url)
183    }
184
185    /// Endpoint root for batch operations (`{base}/v1/messages/batches`).
186    pub(crate) fn batches_url(&self) -> String {
187        format!("{}/v1/messages/batches", self.base_url)
188    }
189}
190
191#[async_trait]
192impl LlmProvider for Anthropic {
193    async fn stream(&self, request: Request) -> Result<ProviderEventStream, ProviderError> {
194        let mut body = build_request_body_with_thinking(
195            &request,
196            effective_thinking(&request, self.thinking.as_ref()),
197        );
198        body.stream = true;
199
200        let response = self
201            .client
202            .post(self.messages_url())
203            .header("x-api-key", &self.api_key)
204            .header("anthropic-version", API_VERSION)
205            .header("content-type", "application/json")
206            .header("accept", "text/event-stream")
207            .json(&body)
208            .send()
209            .await?;
210
211        let status = response.status().as_u16();
212
213        if status >= 400 {
214            // Pre-stream error: same classification as complete().
215            let retry_after_ms = parse_retry_after(response.headers());
216            let text = response.text().await.unwrap_or_default();
217            return Err(classify_error(status, text, retry_after_ms));
218        }
219
220        // Stream OK; wrap the byte stream in an SSE parser then walk our
221        // own state machine over Anthropic's event taxonomy.
222        let event_stream = response.bytes_stream().eventsource();
223        Ok(Box::pin(anthropic_event_stream(event_stream)))
224    }
225
226    async fn complete(&self, request: Request) -> Result<Response, ProviderError> {
227        let body = build_request_body_with_thinking(
228            &request,
229            effective_thinking(&request, self.thinking.as_ref()),
230        );
231
232        let response = self
233            .client
234            .post(self.messages_url())
235            .header("x-api-key", &self.api_key)
236            .header("anthropic-version", API_VERSION)
237            .header("content-type", "application/json")
238            .json(&body)
239            .send()
240            .await?;
241
242        let status = response.status().as_u16();
243
244        if status >= 400 {
245            let retry_after_ms = parse_retry_after(response.headers());
246            let text = response.text().await.unwrap_or_default();
247            return Err(classify_error(status, text, retry_after_ms));
248        }
249
250        let api_response: ApiResponse = response.json().await?;
251        Ok(convert_response(api_response))
252    }
253}
254
255/// Classify an Anthropic API error into a [`ProviderError`].
256///
257/// - 429 ⇒ `RateLimit`
258/// - 529, 503 ⇒ `Overloaded` (529 is Anthropic-specific; 503 is generic
259///   service unavailable — both are transient server-side pressure signals)
260/// - 500, 502, 504 ⇒ retryable `Api`
261/// - other 5xx ⇒ retryable `Api`, other 4xx ⇒ non-retryable `Api`
262pub(crate) fn classify_error(
263    status: u16,
264    message: String,
265    retry_after_ms: Option<u64>,
266) -> ProviderError {
267    match status {
268        429 => ProviderError::RateLimit { retry_after_ms },
269        529 | 503 => ProviderError::Overloaded { retry_after_ms },
270        500 | 502 | 504 => ProviderError::Api {
271            status,
272            message,
273            retryable: true,
274        },
275        s => ProviderError::Api {
276            status: s,
277            message,
278            retryable: (500..600).contains(&s),
279        },
280    }
281}
282
283pub(crate) fn parse_retry_after(headers: &reqwest::header::HeaderMap) -> Option<u64> {
284    let raw = headers.get(reqwest::header::RETRY_AFTER)?.to_str().ok()?;
285    // Spec allows either delay-seconds (integer) or HTTP-date. We only
286    // parse the integer form — OpenAI/Anthropic both use seconds in practice.
287    raw.trim().parse::<u64>().ok().map(|s| s * 1_000)
288}
289
290// --- Streaming SSE state machine ---
291//
292// Anthropic emits events in this lifecycle:
293//
294//   message_start            (carries initial input_tokens + cache fields
295//                             in usage)
//   content_block_start[i]   (text, thinking, redacted_thinking or
//                             tool_use; tool_use has id, name)
//   content_block_delta[i] * (text_delta, thinking_delta,
//                             signature_delta or input_json_delta
//                             fragments)
//   content_block_stop[i]    (close block; for tool_use we now emit
//                             one atomic StreamEvent::ToolUse with
//                             accumulated JSON parsed; thinking and
//                             redacted_thinking blocks emit one
//                             StreamEvent::ThinkingBlock)
301//   ... more blocks ...
302//   message_delta            (final stop_reason + final output_tokens;
303//                             cache fields are NOT re-sent here)
304//   message_stop             (terminal)
305//
306// `ping` and unknown event types are ignored. `error` events surface
307// as `Err(ProviderError::Other)` items in the stream and terminate it.
308//
309// Cache token merge: per Anthropic docs, cache_creation_input_tokens
310// and cache_read_input_tokens arrive on `message_start` only. We
311// remember them on the stream state and re-stamp every emitted Usage
312// event with the running maximum, so a consumer that tracks "the
313// latest Usage" never observes the cache fields collapse to 0 on
314// message_delta. `merge_max` semantics are also exposed publicly via
315// `Usage::merge_max`.
316
317#[derive(Deserialize)]
318#[serde(tag = "type")]
319enum StreamingPayload {
320    #[serde(rename = "message_start")]
321    MessageStart {
322        #[serde(default)]
323        message: MessageStartPayload,
324    },
325    #[serde(rename = "content_block_start")]
326    ContentBlockStart {
327        index: usize,
328        content_block: ContentBlockStart,
329    },
330    #[serde(rename = "content_block_delta")]
331    ContentBlockDelta { index: usize, delta: BlockDelta },
332    #[serde(rename = "content_block_stop")]
333    ContentBlockStop { index: usize },
334    #[serde(rename = "message_delta")]
335    MessageDelta {
336        delta: MessageDeltaPayload,
337        #[serde(default)]
338        usage: Option<ApiUsage>,
339    },
340    #[serde(rename = "message_stop")]
341    MessageStop,
342    #[serde(rename = "ping")]
343    Ping,
344    #[serde(rename = "error")]
345    Error { error: ErrorPayload },
346}
347
/// The `message` object of a `message_start` event. Only the `usage`
/// field is read; it may be absent.
#[derive(Deserialize, Default)]
struct MessageStartPayload {
    /// Usage counters reported at the start of the message, if any.
    #[serde(default)]
    usage: Option<ApiUsage>,
}
353
354#[derive(Deserialize)]
355#[serde(tag = "type")]
356enum ContentBlockStart {
357    #[serde(rename = "text")]
358    Text,
359    #[serde(rename = "thinking")]
360    Thinking {
361        #[serde(default)]
362        thinking: String,
363        #[serde(default)]
364        signature: String,
365    },
366    #[serde(rename = "redacted_thinking")]
367    RedactedThinking { data: String },
368    #[serde(rename = "tool_use")]
369    ToolUse {
370        id: String,
371        name: String,
372        #[serde(default)]
373        input: Value,
374    },
375}
376
/// The `delta` object of a `content_block_delta` event, discriminated by
/// its `type` field.
#[derive(Deserialize)]
#[serde(tag = "type")]
enum BlockDelta {
    /// Fragment of visible text.
    #[serde(rename = "text_delta")]
    Text { text: String },
    /// Fragment of thinking text.
    #[serde(rename = "thinking_delta")]
    Thinking { thinking: String },
    /// Fragment of a thinking block's signature.
    #[serde(rename = "signature_delta")]
    Signature { signature: String },
    /// Fragment of a tool call's JSON input.
    #[serde(rename = "input_json_delta")]
    InputJson { partial_json: String },
}
389
/// The `delta` object of a `message_delta` event. Only `stop_reason` is
/// read.
#[derive(Deserialize)]
struct MessageDeltaPayload {
    /// Raw stop reason string, when the event carries one.
    #[serde(default)]
    stop_reason: Option<String>,
}
395
/// The `error` object of an in-stream `error` event. Both fields default
/// to the empty string when missing.
#[derive(Deserialize)]
struct ErrorPayload {
    /// Error category, read from the JSON `type` field.
    #[serde(default, rename = "type")]
    kind: String,
    /// Error message text.
    #[serde(default)]
    message: String,
}
403
/// Per-block accumulation state. Text deltas are emitted live as
/// `ContentDelta`; tool_use blocks instead buffer `partial_json`
/// fragments and emit one atomic `ToolUse` event on `content_block_stop`.
/// Thinking blocks accumulate their text and signature and emit one
/// `ThinkingBlock` event on `content_block_stop`.
enum BlockState {
    /// Text block; nothing is accumulated.
    Text,
    /// Thinking block in progress.
    Thinking {
        /// Thinking text accumulated so far.
        text_buf: String,
        /// Signature accumulated so far (empty until one arrives).
        signature: String,
    },
    /// Redacted thinking block.
    RedactedThinking {
        /// Opaque payload taken from the block start.
        data: String,
    },
    /// Tool call in progress.
    ToolUse {
        /// Tool call id from the block start.
        id: String,
        /// Tool name from the block start.
        name: String,
        /// Concatenated JSON input fragments.
        json_buf: String,
    },
}
422
/// Streaming state carried across `unfold` polls. Holds the running
/// `Usage` so cache_creation/cache_read tokens observed on
/// `message_start` survive into `message_delta` and any subsequent
/// `Usage` emission.
struct StreamState<S> {
    /// Underlying SSE event source.
    sse: S,
    /// Content blocks currently open, keyed by the block index from the
    /// SSE events.
    blocks: HashMap<usize, BlockState>,
    /// Events produced but not yet yielded; drained before the next SSE
    /// event is read.
    buffer: std::collections::VecDeque<Result<StreamEvent, ProviderError>>,
    /// Running merged `Usage` — re-stamped onto each emitted Usage event.
    usage: Usage,
}
434
435fn anthropic_event_stream<S>(
436    sse: S,
437) -> impl futures::Stream<Item = Result<StreamEvent, ProviderError>>
438where
439    S: futures::Stream<
440            Item = Result<
441                eventsource_stream::Event,
442                eventsource_stream::EventStreamError<reqwest::Error>,
443            >,
444        > + Send
445        + Unpin
446        + 'static,
447{
448    use std::collections::VecDeque;
449
450    let initial = StreamState {
451        sse,
452        blocks: HashMap::new(),
453        buffer: VecDeque::new(),
454        usage: Usage::default(),
455    };
456
457    futures::stream::unfold(initial, |mut state| async move {
458        // Drain any pending events first — one SSE event can produce
459        // multiple StreamEvents (e.g. message_delta → MessageDelta + Usage).
460        loop {
461            if let Some(ev) = state.buffer.pop_front() {
462                return Some((ev, state));
463            }
464
465            let next = state.sse.next().await?;
466            let event = match next {
467                Ok(ev) => ev,
468                Err(e) => {
469                    let err = ProviderError::Other(format!("SSE read error: {e}"));
470                    return Some((Err(err), state));
471                }
472            };
473
474            // Ignore unknown event types (ping etc. that we choose not to
475            // type — eventsource-stream gives us them with .event set).
476            // Real Anthropic events all have payload.type matching event.event.
477            let payload: StreamingPayload = match serde_json::from_str(&event.data) {
478                Ok(p) => p,
479                Err(_) => continue, // unknown / malformed payload, skip
480            };
481
482            process_payload(
483                payload,
484                &mut state.blocks,
485                &mut state.buffer,
486                &mut state.usage,
487            );
488        }
489    })
490}
491
492fn process_payload(
493    payload: StreamingPayload,
494    blocks: &mut HashMap<usize, BlockState>,
495    buffer: &mut std::collections::VecDeque<Result<StreamEvent, ProviderError>>,
496    running: &mut Usage,
497) {
498    match payload {
499        StreamingPayload::MessageStart { message } => {
500            if let Some(usage) = message.usage {
501                running.merge_max(&usage_from_api(&usage));
502                buffer.push_back(Ok(StreamEvent::Usage(running.clone())));
503            }
504        }
505        StreamingPayload::ContentBlockStart {
506            index,
507            content_block,
508        } => {
509            let state = match content_block {
510                ContentBlockStart::Text => BlockState::Text,
511                ContentBlockStart::Thinking {
512                    thinking,
513                    signature,
514                } => BlockState::Thinking {
515                    text_buf: thinking,
516                    signature,
517                },
518                ContentBlockStart::RedactedThinking { data } => {
519                    BlockState::RedactedThinking { data }
520                }
521                ContentBlockStart::ToolUse { id, name, input } => BlockState::ToolUse {
522                    id,
523                    name,
524                    // Some Anthropic responses ship a partial input on start;
525                    // serialize it back as JSON to seed the buffer so later
526                    // input_json_delta concatenation yields a valid object.
527                    json_buf: if input.is_null() || input == Value::Object(Default::default()) {
528                        String::new()
529                    } else {
530                        serde_json::to_string(&input).unwrap_or_default()
531                    },
532                },
533            };
534            blocks.insert(index, state);
535        }
536        StreamingPayload::ContentBlockDelta { index, delta } => match delta {
537            BlockDelta::Text { text } => {
538                buffer.push_back(Ok(StreamEvent::ContentDelta(text)));
539            }
540            BlockDelta::Thinking { thinking } => {
541                if let Some(BlockState::Thinking { text_buf, .. }) = blocks.get_mut(&index) {
542                    text_buf.push_str(&thinking);
543                }
544                buffer.push_back(Ok(StreamEvent::ThinkingDelta { text: thinking }));
545            }
546            BlockDelta::Signature { signature } => {
547                if let Some(BlockState::Thinking { signature: sig, .. }) = blocks.get_mut(&index) {
548                    sig.push_str(&signature);
549                }
550            }
551            BlockDelta::InputJson { partial_json } => {
552                if let Some(BlockState::ToolUse { json_buf, .. }) = blocks.get_mut(&index) {
553                    json_buf.push_str(&partial_json);
554                }
555            }
556        },
557        StreamingPayload::ContentBlockStop { index } => {
558            if let Some(block) = blocks.remove(&index) {
559                match block {
560                    BlockState::Text => {}
561                    BlockState::Thinking {
562                        text_buf,
563                        signature,
564                    } => {
565                        buffer.push_back(Ok(StreamEvent::ThinkingBlock {
566                            text: text_buf,
567                            provider: ThinkingProvider::Anthropic,
568                            metadata: ThinkingMetadata::Anthropic {
569                                signature: (!signature.is_empty()).then_some(signature),
570                            },
571                        }));
572                    }
573                    BlockState::RedactedThinking { data } => {
574                        buffer.push_back(Ok(StreamEvent::ThinkingBlock {
575                            text: String::new(),
576                            provider: ThinkingProvider::Anthropic,
577                            metadata: ThinkingMetadata::AnthropicRedacted { data },
578                        }));
579                    }
580                    BlockState::ToolUse { id, name, json_buf } => {
581                        let input: Value = if json_buf.trim().is_empty() {
582                            Value::Object(Default::default())
583                        } else {
584                            serde_json::from_str(&json_buf)
585                                .unwrap_or_else(|_| Value::String(json_buf.clone()))
586                        };
587                        buffer.push_back(Ok(StreamEvent::ToolUse { id, name, input }));
588                    }
589                }
590            }
591        }
592        StreamingPayload::MessageDelta { delta, usage } => {
593            if let Some(stop) = delta.stop_reason {
594                let stop_reason = map_stop_reason(&stop);
595                buffer.push_back(Ok(StreamEvent::MessageDelta { stop_reason }));
596            }
597            if let Some(u) = usage {
598                running.merge_max(&usage_from_api(&u));
599                buffer.push_back(Ok(StreamEvent::Usage(running.clone())));
600            }
601        }
602        StreamingPayload::MessageStop => {
603            buffer.push_back(Ok(StreamEvent::Done));
604        }
605        StreamingPayload::Ping => {
606            // Anthropic uses ping to keep the connection alive; nothing to emit.
607        }
608        StreamingPayload::Error { error } => {
609            buffer.push_back(Err(ProviderError::Other(format!(
610                "anthropic stream error ({}): {}",
611                error.kind, error.message
612            ))));
613        }
614    }
615}
616
617fn map_stop_reason(s: &str) -> StopReason {
618    match s {
619        "end_turn" => StopReason::EndTurn,
620        "tool_use" => StopReason::ToolUse,
621        "max_tokens" => StopReason::MaxTokens,
622        "stop_sequence" => StopReason::StopSequence,
623        "pause_turn" => StopReason::PauseTurn,
624        _ => StopReason::EndTurn,
625    }
626}
627
628pub(crate) fn usage_from_api(api: &ApiUsage) -> Usage {
629    Usage {
630        input_tokens: api.input_tokens,
631        output_tokens: api.output_tokens,
632        cache_creation_input_tokens: api.cache_creation_input_tokens,
633        cache_read_input_tokens: api.cache_read_input_tokens,
634    }
635}
636
637// --- Anthropic API types ---
638
/// JSON body sent to the Messages endpoint. Optional and empty fields are
/// left out of the serialized object.
#[derive(Serialize)]
pub(crate) struct ApiRequest {
    /// Model identifier, copied from the request.
    pub(crate) model: String,
    /// Output token limit, copied from the request.
    pub(crate) max_tokens: u32,
    /// Conversation messages in wire format.
    pub(crate) messages: Vec<ApiMessage>,
    /// Typed system blocks (Anthropic accepts either a free string or
    /// an array of typed blocks; we always emit the array form so
    /// `cache_control` works).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) system: Option<Vec<ApiSystemBlock>>,
    /// Tool definitions; omitted when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub(crate) tools: Vec<ApiTool>,
    /// Sampling temperature; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) temperature: Option<f32>,
    /// Thinking configuration; omitted when thinking is off.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) thinking: Option<ApiThinkingConfig>,
    /// Output configuration (carries the effort level); omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) output_config: Option<ApiOutputConfig>,
    /// `stream: true` switches the response to SSE; default false for
    /// `complete()`. Always false on the batch path.
    /// Omitted from the JSON when false.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub(crate) stream: bool,
}
662
/// Wire form of the request's `thinking` object.
#[derive(Serialize)]
pub(crate) struct ApiThinkingConfig {
    /// Serialized as `type`; set to `"enabled"` (manual budget) or
    /// `"adaptive"` by this module.
    #[serde(rename = "type")]
    pub(crate) kind: &'static str,
    /// Token budget; only set for manual thinking.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) budget_tokens: Option<u32>,
    /// Display mode; only set for adaptive thinking.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) display: Option<&'static str>,
}
672
/// Wire form of the request's `output_config` object.
#[derive(Serialize)]
pub(crate) struct ApiOutputConfig {
    /// Effort level as its wire string (for example `"low"` or `"high"`).
    pub(crate) effort: String,
}
677
/// One conversation message in wire format.
#[derive(Serialize)]
pub(crate) struct ApiMessage {
    /// `"user"` or `"assistant"`.
    pub(crate) role: String,
    /// Content blocks of the message.
    pub(crate) content: Vec<ApiContent>,
}
683
684#[derive(Serialize, Deserialize, Clone)]
685#[serde(tag = "type")]
686pub(crate) enum ApiContent {
687    #[serde(rename = "text")]
688    Text {
689        text: String,
690        #[serde(default, skip_serializing_if = "Option::is_none")]
691        cache_control: Option<CacheControl>,
692    },
693
694    #[serde(rename = "thinking")]
695    Thinking {
696        thinking: String,
697        #[serde(default, skip_serializing_if = "Option::is_none")]
698        signature: Option<String>,
699    },
700
701    #[serde(rename = "redacted_thinking")]
702    RedactedThinking { data: String },
703
704    #[serde(rename = "tool_use")]
705    ToolUse {
706        id: String,
707        name: String,
708        input: Value,
709    },
710
711    #[serde(rename = "tool_result")]
712    ToolResult {
713        tool_use_id: String,
714        content: String,
715        #[serde(default, skip_serializing_if = "std::ops::Not::not")]
716        is_error: bool,
717        #[serde(default, skip_serializing_if = "Option::is_none")]
718        cache_control: Option<CacheControl>,
719    },
720}
721
/// One typed block of the request's `system` array.
#[derive(Serialize)]
pub(crate) struct ApiSystemBlock {
    /// Anthropic's typed system block has `type: "text"`.
    #[serde(rename = "type")]
    pub(crate) kind: &'static str,
    /// System prompt text of this block.
    pub(crate) text: String,
    /// Optional cache breakpoint; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) cache_control: Option<CacheControl>,
}
731
/// One tool definition in the request's `tools` array.
#[derive(Serialize)]
pub(crate) struct ApiTool {
    /// Tool name.
    pub(crate) name: String,
    /// Tool description.
    pub(crate) description: String,
    /// Schema of the tool's input, passed through as raw JSON.
    pub(crate) input_schema: Value,
    /// Optional cache breakpoint; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) cache_control: Option<CacheControl>,
}
740
/// Subset of the non-streaming Messages response that this module reads.
#[derive(Deserialize)]
pub(crate) struct ApiResponse {
    /// Content blocks produced by the model.
    pub(crate) content: Vec<ApiContent>,
    /// Raw stop reason string.
    pub(crate) stop_reason: String,
    /// Token usage counters for the call.
    pub(crate) usage: ApiUsage,
}
747
/// Token usage counters as reported by the API. The two cache counters
/// default to 0 when the field is absent.
#[derive(Deserialize)]
pub(crate) struct ApiUsage {
    /// Input token count.
    pub(crate) input_tokens: u32,
    /// Output token count.
    pub(crate) output_tokens: u32,
    /// Cache-creation input token count (0 when absent).
    #[serde(default)]
    pub(crate) cache_creation_input_tokens: u32,
    /// Cache-read input token count (0 when absent).
    #[serde(default)]
    pub(crate) cache_read_input_tokens: u32,
}
757
758// --- Conversion ---
759
/// Test-only shortcut: build the request body with no thinking
/// configuration applied.
#[cfg(test)]
pub(crate) fn build_request_body(request: &Request) -> ApiRequest {
    build_request_body_with_thinking(request, None)
}
764
765pub(crate) fn build_request_body_with_thinking(
766    request: &Request,
767    thinking: Option<AnthropicThinkingConfig>,
768) -> ApiRequest {
769    let messages = request.messages.iter().filter_map(message_to_api).collect();
770
771    let tools = request
772        .tools
773        .iter()
774        .map(|t| ApiTool {
775            name: t.name.clone(),
776            description: t.description.clone(),
777            input_schema: t.input_schema.clone(),
778            cache_control: t.cache_control.clone(),
779        })
780        .collect();
781
782    let api_thinking = thinking.as_ref().map(api_thinking_config);
783    let output_config = thinking.as_ref().and_then(output_config);
784
785    ApiRequest {
786        model: request.model.clone(),
787        max_tokens: request.max_tokens,
788        messages,
789        system: request.system.as_ref().map(|blocks| {
790            blocks
791                .iter()
792                .map(|b: &SystemBlock| ApiSystemBlock {
793                    kind: "text",
794                    text: b.text.clone(),
795                    cache_control: b.cache_control.clone(),
796                })
797                .collect()
798        }),
799        tools,
800        temperature: thinking.is_none().then_some(request.temperature).flatten(),
801        thinking: api_thinking,
802        output_config,
803        stream: false,
804    }
805}
806
807fn effective_thinking(
808    request: &Request,
809    instance: Option<&AnthropicThinkingConfig>,
810) -> Option<AnthropicThinkingConfig> {
811    match &request.thinking {
812        Some(ThinkingConfig::Disabled) => None,
813        Some(ThinkingConfig::Budget(budget_tokens)) => Some(AnthropicThinkingConfig::Manual {
814            budget_tokens: *budget_tokens,
815        }),
816        Some(ThinkingConfig::Effort(effort)) => Some(AnthropicThinkingConfig::Adaptive {
817            effort: Some(map_thinking_effort(effort)),
818            display: AnthropicThinkingDisplay::Summarized,
819        }),
820        None => instance.cloned(),
821    }
822}
823
824fn map_thinking_effort(effort: &ThinkingEffort) -> AnthropicEffort {
825    match effort {
826        ThinkingEffort::Low => AnthropicEffort::Low,
827        ThinkingEffort::Medium => AnthropicEffort::Medium,
828        ThinkingEffort::High => AnthropicEffort::High,
829        ThinkingEffort::Other(value) => AnthropicEffort::from(value.as_str()),
830    }
831}
832
833fn api_thinking_config(config: &AnthropicThinkingConfig) -> ApiThinkingConfig {
834    match config {
835        AnthropicThinkingConfig::Manual { budget_tokens } => ApiThinkingConfig {
836            kind: "enabled",
837            budget_tokens: Some(*budget_tokens),
838            display: None,
839        },
840        AnthropicThinkingConfig::Adaptive { display, .. } => ApiThinkingConfig {
841            kind: "adaptive",
842            budget_tokens: None,
843            display: Some(display.as_wire_str()),
844        },
845    }
846}
847
848fn output_config(config: &AnthropicThinkingConfig) -> Option<ApiOutputConfig> {
849    match config {
850        AnthropicThinkingConfig::Adaptive {
851            effort: Some(effort),
852            ..
853        } => Some(ApiOutputConfig {
854            effort: effort.as_wire().to_string(),
855        }),
856        AnthropicThinkingConfig::Manual { .. }
857        | AnthropicThinkingConfig::Adaptive { effort: None, .. } => None,
858    }
859}
860
861fn message_to_api(msg: &crate::message::Message) -> Option<ApiMessage> {
862    let content: Vec<ApiContent> = msg.content.iter().filter_map(content_to_api).collect();
863    if content.is_empty() {
864        return None;
865    }
866
867    Some(ApiMessage {
868        role: match msg.role {
869            crate::message::Role::User => "user".to_string(),
870            crate::message::Role::Assistant => "assistant".to_string(),
871        },
872        content,
873    })
874}
875
876fn content_to_api(content: &Content) -> Option<ApiContent> {
877    match content {
878        Content::Text {
879            text,
880            cache_control,
881        } => Some(ApiContent::Text {
882            text: text.clone(),
883            cache_control: cache_control.clone(),
884        }),
885        Content::Thinking {
886            text,
887            provider: ThinkingProvider::Anthropic,
888            metadata: ThinkingMetadata::Anthropic { signature },
889        } => Some(ApiContent::Thinking {
890            thinking: text.clone(),
891            signature: signature.clone(),
892        }),
893        Content::Thinking {
894            provider: ThinkingProvider::Anthropic,
895            metadata: ThinkingMetadata::AnthropicRedacted { data },
896            ..
897        } => Some(ApiContent::RedactedThinking { data: data.clone() }),
898        Content::Thinking { .. } => None,
899        Content::ToolUse { id, name, input } => Some(ApiContent::ToolUse {
900            id: id.clone(),
901            name: name.clone(),
902            input: input.clone(),
903        }),
904        Content::ToolResult {
905            tool_use_id,
906            content,
907            is_error,
908            cache_control,
909        } => Some(ApiContent::ToolResult {
910            tool_use_id: tool_use_id.clone(),
911            content: content.clone(),
912            is_error: *is_error,
913            cache_control: cache_control.clone(),
914        }),
915    }
916}
917
918pub(crate) fn convert_response(api: ApiResponse) -> Response {
919    let content = api
920        .content
921        .into_iter()
922        .map(|c| match c {
923            ApiContent::Text {
924                text,
925                cache_control,
926            } => Content::Text {
927                text,
928                cache_control,
929            },
930            ApiContent::Thinking {
931                thinking,
932                signature,
933            } => Content::Thinking {
934                text: thinking,
935                provider: ThinkingProvider::Anthropic,
936                metadata: ThinkingMetadata::Anthropic { signature },
937            },
938            ApiContent::RedactedThinking { data } => Content::Thinking {
939                text: String::new(),
940                provider: ThinkingProvider::Anthropic,
941                metadata: ThinkingMetadata::AnthropicRedacted { data },
942            },
943            ApiContent::ToolUse { id, name, input } => Content::ToolUse { id, name, input },
944            ApiContent::ToolResult {
945                tool_use_id,
946                content,
947                is_error,
948                cache_control,
949            } => Content::ToolResult {
950                tool_use_id,
951                content,
952                is_error,
953                cache_control,
954            },
955        })
956        .collect();
957
958    let stop_reason = map_stop_reason(&api.stop_reason);
959
960    Response {
961        content,
962        stop_reason,
963        usage: usage_from_api(&api.usage),
964    }
965}
966
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::*;
    use crate::message::{CacheTtl, Message};
    use crate::provider::ToolDefinition;
    use serde_json::json;

    /// Queue that `process_payload` appends stream events to.
    type EventQueue = VecDeque<Result<StreamEvent, ProviderError>>;

    // ---------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------

    /// Request for "claude-test" with the given system blocks and one "hi"
    /// user message.
    fn req_with_system(blocks: Vec<SystemBlock>) -> Request {
        Request {
            model: "claude-test".into(),
            system: Some(blocks),
            messages: vec![Message::user_text("hi")],
            tools: vec![],
            max_tokens: 100,
            temperature: None,
            thinking: None,
        }
    }

    /// Minimal request for model "m" (10 max tokens, no system, tools,
    /// temperature or thinking) carrying only `messages`.
    fn req_with_messages(messages: Vec<Message>) -> Request {
        Request {
            model: "m".into(),
            system: None,
            messages,
            tools: vec![],
            max_tokens: 10,
            temperature: None,
            thinking: None,
        }
    }

    /// Single "solve" user prompt with a 2048-token limit, used by the
    /// thinking serialization tests.
    fn solve_req() -> Request {
        Request {
            max_tokens: 2048,
            ..req_with_messages(vec![Message::user_text("solve")])
        }
    }

    /// Serializes the request body built by `build_request_body` to JSON.
    fn body_json(req: &Request) -> serde_json::Value {
        serde_json::to_value(&build_request_body(req)).unwrap()
    }

    /// Serializes the request body built with an explicit thinking config.
    fn thinking_body_json(
        req: &Request,
        thinking: Option<AnthropicThinkingConfig>,
    ) -> serde_json::Value {
        serde_json::to_value(&build_request_body_with_thinking(req, thinking)).unwrap()
    }

    /// Request-body JSON for a request that only varies in its system blocks.
    fn system_blocks_json(blocks: Vec<SystemBlock>) -> serde_json::Value {
        body_json(&req_with_system(blocks))
    }

    /// Request-body JSON for one plain and one cached system block.
    fn plain_and_cached_json() -> serde_json::Value {
        system_blocks_json(vec![
            SystemBlock::text("base instructions"),
            SystemBlock::cached("long stable context"),
        ])
    }

    /// Deserializes a raw API response and converts it.
    fn parse_response(raw: serde_json::Value) -> Response {
        let api: ApiResponse = serde_json::from_value(raw).unwrap();
        convert_response(api)
    }

    /// Feeds `payloads` through `process_payload` in order, starting from
    /// empty block state and default usage, and returns the emitted events.
    fn drive(payloads: Vec<StreamingPayload>) -> EventQueue {
        let mut blocks: HashMap<usize, BlockState> = HashMap::new();
        let mut events = EventQueue::new();
        let mut running = Usage::default();
        for payload in payloads {
            process_payload(payload, &mut blocks, &mut events, &mut running);
        }
        events
    }

    /// `content_block_start` for an empty thinking block at index 0.
    fn empty_thinking_start() -> StreamingPayload {
        StreamingPayload::ContentBlockStart {
            index: 0,
            content_block: ContentBlockStart::Thinking {
                thinking: String::new(),
                signature: String::new(),
            },
        }
    }

    /// `content_block_delta` carrying a signature for the block at index 0.
    fn signature_delta(signature: &str) -> StreamingPayload {
        StreamingPayload::ContentBlockDelta {
            index: 0,
            delta: BlockDelta::Signature {
                signature: signature.into(),
            },
        }
    }

    /// Builds a request of `[assistant(thinking), user("next")]` and asserts
    /// that only the user message survives serialization.
    fn assert_thinking_only_message_dropped(thinking: Content) {
        let req = req_with_messages(vec![
            Message::assistant(vec![thinking]),
            Message::user_text("next"),
        ]);
        let json = body_json(&req);
        let messages = json["messages"].as_array().unwrap();

        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0]["role"], "user");
        assert_eq!(messages[0]["content"][0]["text"], "next");
    }

    // ---------------------------------------------------------------
    // Effort parsing
    // ---------------------------------------------------------------

    #[test]
    fn anthropic_effort_variants_match_wire_names() {
        let cases = [
            (AnthropicEffort::Low, "low"),
            (AnthropicEffort::Medium, "medium"),
            (AnthropicEffort::High, "high"),
            (AnthropicEffort::XHigh, "xhigh"),
            (AnthropicEffort::Max, "max"),
            (AnthropicEffort::Other("ultra".into()), "ultra"),
        ];
        for (effort, wire) in cases {
            assert_eq!(effort.as_wire(), wire);
        }
    }

    #[test]
    fn anthropic_effort_from_string_is_case_insensitive_and_preserves_other() {
        // Known names are matched regardless of case.
        assert_eq!(AnthropicEffort::from("XHIGH"), AnthropicEffort::XHigh);
        assert_eq!(AnthropicEffort::from("MAX"), AnthropicEffort::Max);

        // Unknown names are kept verbatim, including their case.
        assert_eq!(
            AnthropicEffort::from("ultra"),
            AnthropicEffort::Other("ultra".into())
        );
        assert_eq!(
            AnthropicEffort::from(String::from("ULTRA")),
            AnthropicEffort::Other("ULTRA".into())
        );
    }

    // ---------------------------------------------------------------
    // Request serialization: system blocks, tools, cache control
    // ---------------------------------------------------------------

    #[test]
    fn system_blocks_serialize_as_typed_array() {
        let json = plain_and_cached_json();
        let arr = json["system"].as_array().expect("system should be array");
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["type"], "text");
        assert_eq!(arr[1]["type"], "text");
    }

    #[test]
    fn system_blocks_serialize_text_payloads() {
        let json = plain_and_cached_json();
        assert_eq!(json["system"][0]["text"], "base instructions");
        assert_eq!(json["system"][1]["text"], "long stable context");
    }

    #[test]
    fn system_blocks_serialize_cache_control_only_when_set() {
        let json = plain_and_cached_json();
        assert!(json["system"][0].get("cache_control").is_none());
        assert_eq!(json["system"][1]["cache_control"]["type"], "ephemeral");
    }

    #[test]
    fn system_blocks_with_one_hour_ttl_serialize_inline() {
        let json = system_blocks_json(vec![SystemBlock::cached_1h("long-lived prefix")]);
        let cc = &json["system"][0]["cache_control"];
        assert_eq!(cc["type"], "ephemeral");
        assert_eq!(cc["ttl"], "1h");
    }

    #[test]
    fn tool_definition_cache_control_threads_to_api_tool() {
        let req = Request {
            tools: vec![
                ToolDefinition {
                    name: "first".into(),
                    description: "first tool".into(),
                    input_schema: json!({"type":"object"}),
                    cache_control: None,
                },
                ToolDefinition {
                    name: "last".into(),
                    description: "last tool".into(),
                    input_schema: json!({"type":"object"}),
                    cache_control: Some(CacheControl::ephemeral()),
                },
            ],
            ..req_with_messages(vec![Message::user_text("hi")])
        };
        let json = body_json(&req);
        let tools = json["tools"].as_array().unwrap();
        assert!(tools[0].get("cache_control").is_none());
        assert_eq!(tools[1]["cache_control"]["type"], "ephemeral");
    }

    #[test]
    fn content_text_cache_control_threads_through() {
        let req = req_with_messages(vec![Message::user(vec![Content::Text {
            text: "stable user prefix".into(),
            cache_control: Some(CacheControl::Ephemeral {
                ttl: Some(CacheTtl::FiveMinutes),
            }),
        }])]);
        let json = body_json(&req);
        let block = &json["messages"][0]["content"][0];
        assert_eq!(block["type"], "text");
        assert_eq!(block["cache_control"]["type"], "ephemeral");
        assert_eq!(block["cache_control"]["ttl"], "5m");
    }

    #[test]
    fn tool_result_cache_control_threads_through() {
        let req = req_with_messages(vec![Message::user(vec![Content::ToolResult {
            tool_use_id: "t1".into(),
            content: "long output".into(),
            is_error: false,
            cache_control: Some(CacheControl::ephemeral()),
        }])]);
        let json = body_json(&req);
        let block = &json["messages"][0]["content"][0];
        assert_eq!(block["type"], "tool_result");
        assert_eq!(block["cache_control"]["type"], "ephemeral");
    }

    // ---------------------------------------------------------------
    // Request serialization: thinking
    // ---------------------------------------------------------------

    #[test]
    fn request_with_thinking_budget_serializes_top_level_thinking() {
        let json = thinking_body_json(
            &solve_req(),
            Some(AnthropicThinkingConfig::Manual {
                budget_tokens: 1024,
            }),
        );

        assert_eq!(json["thinking"]["type"], "enabled");
        assert_eq!(json["thinking"]["budget_tokens"], 1024);
        assert!(json["thinking"].get("display").is_none());
    }

    #[test]
    fn request_with_adaptive_thinking_serializes_display_and_effort() {
        let json = thinking_body_json(
            &solve_req(),
            Some(AnthropicThinkingConfig::Adaptive {
                effort: Some(AnthropicEffort::High),
                display: AnthropicThinkingDisplay::Summarized,
            }),
        );

        assert_eq!(json["thinking"]["type"], "adaptive");
        assert_eq!(json["thinking"]["display"], "summarized");
        assert!(json["thinking"].get("budget_tokens").is_none());
        assert_eq!(json["output_config"]["effort"], "high");
    }

    #[test]
    fn request_with_thinking_omits_temperature() {
        let req = Request {
            temperature: Some(0.2),
            ..solve_req()
        };
        let json = thinking_body_json(
            &req,
            Some(AnthropicThinkingConfig::Manual {
                budget_tokens: 1024,
            }),
        );

        assert!(json.get("temperature").is_none());
    }

    #[test]
    fn anthropic_thinking_content_serializes_with_signature() {
        let req = req_with_messages(vec![Message::assistant(vec![
            Content::thinking(
                "reason",
                ThinkingProvider::Anthropic,
                ThinkingMetadata::anthropic(Some("sig".into())),
            ),
            Content::text("visible"),
        ])]);
        let json = body_json(&req);
        let content = json["messages"][0]["content"].as_array().unwrap();

        assert_eq!(content[0]["type"], "thinking");
        assert_eq!(content[0]["thinking"], "reason");
        assert_eq!(content[0]["signature"], "sig");
        assert_eq!(content[1]["type"], "text");
        assert_eq!(content[1]["text"], "visible");
    }

    #[test]
    fn anthropic_redacted_thinking_content_round_trips() {
        // Outbound: a redacted block serializes as `redacted_thinking`.
        let req = req_with_messages(vec![Message::assistant(vec![Content::thinking(
            "",
            ThinkingProvider::Anthropic,
            ThinkingMetadata::anthropic_redacted("opaque"),
        )])]);
        let json = body_json(&req);
        let block = &json["messages"][0]["content"][0];
        assert_eq!(block["type"], "redacted_thinking");
        assert_eq!(block["data"], "opaque");

        // Inbound: the same wire block converts back to redacted metadata
        // with empty text.
        let response = parse_response(json!({
            "content": [{"type":"redacted_thinking","data":"opaque"}],
            "stop_reason": "end_turn",
            "usage": {"input_tokens": 1, "output_tokens": 1}
        }));
        assert!(matches!(
            &response.content[0],
            Content::Thinking {
                text,
                provider: ThinkingProvider::Anthropic,
                metadata: ThinkingMetadata::AnthropicRedacted { data },
            } if text.is_empty() && data == "opaque"
        ));
    }

    #[test]
    fn foreign_thinking_only_message_is_not_serialized_as_empty_message() {
        assert_thinking_only_message_dropped(Content::thinking(
            "foreign",
            ThinkingProvider::OpenAIResponses,
            ThinkingMetadata::openai_responses(Some("rs_1".into()), None, 0, None),
        ));
    }

    #[test]
    fn anthropic_provider_with_mismatched_metadata_is_dropped() {
        assert_thinking_only_message_dropped(Content::thinking(
            "bad metadata",
            ThinkingProvider::Anthropic,
            ThinkingMetadata::None,
        ));
    }

    // ---------------------------------------------------------------
    // Response parsing
    // ---------------------------------------------------------------

    #[test]
    fn response_with_cache_usage_parses_all_four_fields() {
        let resp = parse_response(json!({
            "content": [{"type":"text","text":"ok"}],
            "stop_reason": "end_turn",
            "usage": {
                "input_tokens": 10,
                "output_tokens": 5,
                "cache_creation_input_tokens": 100,
                "cache_read_input_tokens": 200
            }
        }));
        assert_eq!(resp.usage.input_tokens, 10);
        assert_eq!(resp.usage.output_tokens, 5);
        assert_eq!(resp.usage.cache_creation_input_tokens, 100);
        assert_eq!(resp.usage.cache_read_input_tokens, 200);
    }

    #[test]
    fn response_without_cache_usage_defaults_to_zero() {
        let resp = parse_response(json!({
            "content": [{"type":"text","text":"ok"}],
            "stop_reason": "end_turn",
            "usage": {"input_tokens": 1, "output_tokens": 1}
        }));
        assert_eq!(resp.usage.cache_creation_input_tokens, 0);
        assert_eq!(resp.usage.cache_read_input_tokens, 0);
    }

    // ---------------------------------------------------------------
    // Streaming
    // ---------------------------------------------------------------

    #[test]
    fn streaming_thinking_delta_and_signature_emit_final_block() {
        let mut events = drive(vec![
            empty_thinking_start(),
            StreamingPayload::ContentBlockDelta {
                index: 0,
                delta: BlockDelta::Thinking {
                    thinking: "reason".into(),
                },
            },
            signature_delta("sig"),
            StreamingPayload::ContentBlockStop { index: 0 },
        ]);

        // First the incremental delta, then the assembled block.
        assert!(matches!(
            events.pop_front().unwrap().unwrap(),
            StreamEvent::ThinkingDelta { text } if text == "reason"
        ));
        assert!(matches!(
            events.pop_front().unwrap().unwrap(),
            StreamEvent::ThinkingBlock {
                text,
                provider: ThinkingProvider::Anthropic,
                metadata: ThinkingMetadata::Anthropic {
                    signature: Some(signature),
                },
            } if text == "reason" && signature == "sig"
        ));
    }

    #[test]
    fn streaming_redacted_thinking_emits_final_block() {
        let mut events = drive(vec![
            StreamingPayload::ContentBlockStart {
                index: 0,
                content_block: ContentBlockStart::RedactedThinking {
                    data: "opaque".into(),
                },
            },
            StreamingPayload::ContentBlockStop { index: 0 },
        ]);

        assert!(matches!(
            events.pop_front().unwrap().unwrap(),
            StreamEvent::ThinkingBlock {
                text,
                provider: ThinkingProvider::Anthropic,
                metadata: ThinkingMetadata::AnthropicRedacted { data },
            } if text.is_empty() && data == "opaque"
        ));
        assert!(events.is_empty());
    }

    #[test]
    fn streaming_signature_without_thinking_delta_preserves_empty_block() {
        let mut events = drive(vec![
            empty_thinking_start(),
            signature_delta("sig-only"),
            StreamingPayload::ContentBlockStop { index: 0 },
        ]);

        assert!(matches!(
            events.pop_front().unwrap().unwrap(),
            StreamEvent::ThinkingBlock {
                text,
                provider: ThinkingProvider::Anthropic,
                metadata: ThinkingMetadata::Anthropic {
                    signature: Some(signature),
                },
            } if text.is_empty() && signature == "sig-only"
        ));
        assert!(events.is_empty());
    }

    #[test]
    fn streaming_usage_merges_cache_fields_across_message_start_and_delta() {
        // message_start carries cache fields; message_delta does not.
        // The running Usage must keep the cache fields all the way to the
        // final emitted Usage event.
        let start = StreamingPayload::MessageStart {
            message: MessageStartPayload {
                usage: Some(ApiUsage {
                    input_tokens: 50,
                    output_tokens: 0,
                    cache_creation_input_tokens: 0,
                    cache_read_input_tokens: 1000,
                }),
            },
        };
        let delta = StreamingPayload::MessageDelta {
            delta: MessageDeltaPayload {
                stop_reason: Some("end_turn".into()),
            },
            usage: Some(ApiUsage {
                input_tokens: 50,
                output_tokens: 75,
                // Crucially: the API does NOT re-send cache fields here.
                // serde(default) gives 0, so this verifies that merge_max
                // keeps the prior 1000.
                cache_creation_input_tokens: 0,
                cache_read_input_tokens: 0,
            }),
        };

        // Keep only the Usage events; the last one should carry the merged
        // values.
        let usages: Vec<Usage> = drive(vec![start, delta])
            .into_iter()
            .filter_map(Result::ok)
            .filter_map(|event| match event {
                StreamEvent::Usage(usage) => Some(usage),
                _ => None,
            })
            .collect();

        assert_eq!(usages.len(), 2);
        let last = usages.last().unwrap();
        assert_eq!(last.input_tokens, 50);
        assert_eq!(last.output_tokens, 75);
        assert_eq!(last.cache_read_input_tokens, 1000);
    }
}
1565
#[cfg(test)]
mod thinking_override_tests {
    use super::*;
    use crate::provider::{ThinkingConfig, ThinkingEffort};

    /// Message-less request with temperature 0.2 and the given per-request
    /// thinking setting.
    fn request(thinking: Option<ThinkingConfig>) -> Request {
        Request {
            model: "claude-test".into(),
            system: None,
            messages: vec![],
            tools: vec![],
            max_tokens: 100,
            temperature: Some(0.2),
            thinking,
        }
    }

    /// Resolves the effective thinking config for `req` against an optional
    /// instance-level default, builds the request body with it and returns
    /// the body as JSON.
    fn resolved_body_json(
        req: &Request,
        instance_default: Option<&AnthropicThinkingConfig>,
    ) -> serde_json::Value {
        let effective = effective_thinking(req, instance_default);
        serde_json::to_value(build_request_body_with_thinking(req, effective)).unwrap()
    }

    #[test]
    fn request_budget_overrides_instance_default() {
        let req = request(Some(ThinkingConfig::Budget(2048)));
        let instance_default = AnthropicThinkingConfig::Adaptive {
            effort: Some(AnthropicEffort::Low),
            display: AnthropicThinkingDisplay::Summarized,
        };

        let json = resolved_body_json(&req, Some(&instance_default));

        assert_eq!(json["thinking"]["type"], "enabled");
        assert_eq!(json["thinking"]["budget_tokens"], 2048);
        assert!(json.get("temperature").is_none());
    }

    #[test]
    fn request_effort_other_uses_anthropic_parse() {
        let effort = ThinkingEffort::Other("XHIGH".into());
        let req = request(Some(ThinkingConfig::Effort(effort)));

        let json = resolved_body_json(&req, None);

        assert_eq!(json["thinking"]["type"], "adaptive");
        assert_eq!(json["output_config"]["effort"], "xhigh");
    }

    #[test]
    fn request_disabled_omits_instance_default() {
        let req = request(Some(ThinkingConfig::Disabled));
        let instance_default = AnthropicThinkingConfig::Manual {
            budget_tokens: 1024,
        };

        let json = resolved_body_json(&req, Some(&instance_default));

        assert!(json.get("thinking").is_none());
        // With thinking off, the request's temperature is passed through.
        let temperature = json["temperature"].as_f64();
        assert!(temperature.is_some_and(|t| (t - 0.2).abs() < 1e-6));
    }
}