Skip to main content

entelix_core/codecs/
anthropic.rs

1//! `AnthropicMessagesCodec` — IR ⇄ Anthropic Messages API
2//! (`POST /v1/messages`).
3//!
4//! Wire format reference: <https://docs.anthropic.com/en/api/messages>.
5//!
6//! Notable mappings:
7//!
//! - IR `system` blocks → top-level `system` field (plain string, or an
//!   array of text blocks when any block carries `cache_control`).
9//! - IR `Role::System` messages → flattened into `system` (Anthropic has no
10//!   system role inside `messages`). Multiple system inputs are joined with
11//!   blank lines.
12//! - IR `Role::Tool` messages → `role: "user"` with `tool_result` blocks
13//!   (Anthropic encodes tool replies as user-authored content).
14//! - IR `ToolChoice` → `tool_choice` object (`auto` / `any` / `tool` /
15//!   `none`).
16//! - IR `Usage` cache fields ↔ Anthropic
17//!   `cache_creation_input_tokens` / `cache_read_input_tokens`.
18//!
19//! Any feature the codec cannot preserve emits a
20//! [`crate::ir::ModelWarning::LossyEncode`] (invariant 6).
21
22#![allow(clippy::cast_possible_truncation)] // u64 → u32 token counts; saturates above
23
24use std::collections::HashMap;
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{BoxByteStream, BoxDeltaStream, Codec, EncodedRequest};
31use crate::error::{Error, Result};
32use crate::ir::{
33    Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
34    ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, RefusalReason,
35    ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent, Usage,
36};
37use crate::rate_limit::RateLimitSnapshot;
38use crate::stream::StreamDelta;
39
/// Provider key for [`AnthropicMessagesCodec`] — matches `Codec::name`
/// and identifies this vendor's entries in [`ProviderEchoSnapshot`].
const PROVIDER_KEY: &str = "anthropic-messages";

/// Value of the `anthropic-version` request header that
/// `finalize_request` attaches to every encoded request.
const ANTHROPIC_VERSION: &str = "2023-06-01";
45
/// Stateless codec for Anthropic's Messages API.
///
/// Construct once at agent build time and reuse — there is no internal
/// state, just static behaviour. The type is a field-less unit struct,
/// so `Clone` / `Copy` / `Default` are all trivial.
#[derive(Clone, Copy, Debug, Default)]
pub struct AnthropicMessagesCodec;
52
impl AnthropicMessagesCodec {
    /// Create a fresh codec instance.
    ///
    /// Declared `const fn`, so it is usable in `const` / `static`
    /// initialisers as well as at runtime.
    pub const fn new() -> Self {
        Self
    }
}
59
60impl Codec for AnthropicMessagesCodec {
61    fn name(&self) -> &'static str {
62        PROVIDER_KEY
63    }
64
65    fn capabilities(&self, _model: &str) -> Capabilities {
66        // Conservative full-feature defaults that match the documented
67        // Messages API. Per-model precision (vision flag for haiku-3 vs
68        // opus-4 etc.) lands in a follow-up slice.
69        Capabilities {
70            streaming: true,
71            tools: true,
72            multimodal_image: true,
73            multimodal_audio: false,
74            multimodal_video: false,
75            multimodal_document: true,
76            system_prompt: true,
77            structured_output: true,
78            prompt_caching: true,
79            thinking: true,
80            citations: true,
81            web_search: true,
82            computer_use: true,
83            max_context_tokens: 200_000,
84        }
85    }
86
87    fn auto_output_strategy(&self, _model: &str) -> crate::ir::OutputStrategy {
88        // Anthropic's native `output_config.format = json_schema`
89        // is newer than the tool-call surface and ships without a
90        // `strict` toggle — every `strict=true` request emits
91        // `LossyEncode { field: "response_format.strict" }`. The
92        // forced-tool surface is more mature, more consistent with
93        // every Anthropic version, and accepts arbitrary JSON
94        // schemas without strict-mode constraints. Pick Tool until
95        // Anthropic ships a native channel with parity.
96        crate::ir::OutputStrategy::Tool
97    }
98
99    fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
100        let (body, warnings) = build_body(request, false)?;
101        let mut encoded = finalize_request(&body, warnings)?;
102        apply_anthropic_beta_header(&mut encoded, request)?;
103        Ok(encoded)
104    }
105
106    fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
107        let (body, warnings) = build_body(request, true)?;
108        let mut encoded = finalize_request(&body, warnings)?;
109        encoded.headers.insert(
110            http::header::ACCEPT,
111            http::HeaderValue::from_static("text/event-stream"),
112        );
113        apply_anthropic_beta_header(&mut encoded, request)?;
114        Ok(encoded.into_streaming())
115    }
116
117    fn decode_stream<'a>(
118        &'a self,
119        bytes: BoxByteStream<'a>,
120        warnings_in: Vec<ModelWarning>,
121    ) -> BoxDeltaStream<'a> {
122        Box::pin(stream_anthropic_sse(bytes, warnings_in))
123    }
124
125    fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
126        let raw: Value = super::codec::parse_response_body(body, "Anthropic Messages")?;
127        let mut warnings = warnings_in;
128
129        let id = str_field(&raw, "id").to_owned();
130        let model = str_field(&raw, "model").to_owned();
131        let content = decode_content(&raw, &mut warnings);
132        let stop_reason = decode_stop_reason(&raw, &mut warnings);
133        let usage = decode_usage(&raw);
134
135        Ok(ModelResponse {
136            id,
137            model,
138            stop_reason,
139            content,
140            usage,
141            rate_limit: None,
142            warnings,
143            provider_echoes: Vec::new(),
144        })
145    }
146
147    fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
148        let mut snapshot = RateLimitSnapshot::default();
149        let mut populated = false;
150        for (header_name, target) in [
151            (
152                "anthropic-ratelimit-requests-remaining",
153                &mut snapshot.requests_remaining,
154            ),
155            (
156                "anthropic-ratelimit-tokens-remaining",
157                &mut snapshot.tokens_remaining,
158            ),
159        ] {
160            if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
161                && let Ok(parsed) = v.parse::<u64>()
162            {
163                *target = Some(parsed);
164                snapshot.raw.insert(header_name.to_owned(), v.to_owned());
165                populated = true;
166            }
167        }
168        for (header_name, target) in [
169            (
170                "anthropic-ratelimit-requests-reset",
171                &mut snapshot.requests_reset_at,
172            ),
173            (
174                "anthropic-ratelimit-tokens-reset",
175                &mut snapshot.tokens_reset_at,
176            ),
177        ] {
178            if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
179                && let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(v)
180            {
181                *target = Some(parsed.with_timezone(&chrono::Utc));
182                snapshot.raw.insert(header_name.to_owned(), v.to_owned());
183                populated = true;
184            }
185        }
186        populated.then_some(snapshot)
187    }
188}
189
190// ── body / request helpers ─────────────────────────────────────────────────
191
192fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
193    if request.messages.is_empty() {
194        return Err(Error::invalid_request(
195            "Anthropic Messages requires at least one message",
196        ));
197    }
198
199    // Anthropic Messages requires `max_tokens` — the wire field has
200    // no default. Silently injecting one would mask "model truncated
201    // unexpectedly" failures (the operator sees `stop_reason:
202    // max_tokens` without realising the codec, not the caller, set
203    // the cap). Invariant #15 — fail loud when the IR is missing a
204    // mandatory vendor field.
205    let max_tokens = request.max_tokens.ok_or_else(|| {
206        Error::invalid_request(
207            "Anthropic Messages requires max_tokens; \
208             set ModelRequest::max_tokens explicitly",
209        )
210    })?;
211
212    // Most encodes produce 0–1 warnings; pre-allocate one slot so the
213    // first push doesn't reallocate.
214    let mut warnings = Vec::with_capacity(1);
215    let (system_value, wire_messages) = encode_messages(request, &mut warnings);
216
217    let mut body = Map::new();
218    body.insert("model".into(), Value::String(request.model.clone()));
219    body.insert("messages".into(), Value::Array(wire_messages));
220    body.insert("max_tokens".into(), json!(max_tokens));
221    if let Some(value) = system_value {
222        body.insert("system".into(), value);
223    }
224    if let Some(temp) = request.temperature {
225        body.insert("temperature".into(), json!(temp));
226    }
227    if let Some(k) = request.top_k {
228        body.insert("top_k".into(), json!(k));
229    }
230    if let Some(p) = request.top_p {
231        body.insert("top_p".into(), json!(p));
232    }
233    if !request.stop_sequences.is_empty() {
234        body.insert(
235            "stop_sequences".into(),
236            json!(request.stop_sequences.clone()),
237        );
238    }
239    if !request.tools.is_empty() {
240        body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
241        body.insert(
242            "tool_choice".into(),
243            encode_tool_choice(&request.tool_choice),
244        );
245    }
246    if streaming {
247        body.insert("stream".into(), Value::Bool(true));
248    }
249    if let Some(format) = &request.response_format {
250        encode_anthropic_structured_output(format, &request.model, &mut body, &mut warnings)?;
251    }
252    apply_provider_extensions(request, &mut body, &mut warnings);
253    Ok((Value::Object(body), warnings))
254}
255
256/// Merge [`crate::ir::AnthropicExt::betas`] into the
257/// `anthropic-beta` HTTP header. The Anthropic Messages API
258/// documents one comma-separated value per request; the codec
259/// emits no header when the operator's beta list is empty.
260fn apply_anthropic_beta_header(encoded: &mut EncodedRequest, request: &ModelRequest) -> Result<()> {
261    let Some(anthropic) = &request.provider_extensions.anthropic else {
262        return Ok(());
263    };
264    if anthropic.betas.is_empty() {
265        return Ok(());
266    }
267    let value = anthropic.betas.join(",");
268    let header = http::HeaderValue::from_str(&value).map_err(|_| {
269        crate::Error::invalid_request(
270            "AnthropicExt::betas: each entry must contain only visible ASCII",
271        )
272    })?;
273    encoded
274        .headers
275        .insert(http::HeaderName::from_static("anthropic-beta"), header);
276    Ok(())
277}
278
279/// Read [`crate::ir::AnthropicExt`] and merge each set field into the
280/// wire body. Foreign-vendor extensions surface as
281/// [`ModelWarning::ProviderExtensionIgnored`] — the operator
282/// expressed an intent the Anthropic format cannot honour.
283fn apply_provider_extensions(
284    request: &ModelRequest,
285    body: &mut Map<String, Value>,
286    warnings: &mut Vec<ModelWarning>,
287) {
288    let ext = &request.provider_extensions;
289    // IR `parallel_tool_calls` translates onto Anthropic's
290    // `tool_choice.disable_parallel_tool_use` (inverted polarity).
291    // Anthropic only accepts the toggle inside a `tool_choice` block,
292    // which is only emitted when `tools` is non-empty. When the
293    // operator sets `parallel_tool_calls` with an empty tools list,
294    // surface the loss instead of silently discarding the knob.
295    if let Some(parallel) = request.parallel_tool_calls {
296        if let Some(tc) = body.get_mut("tool_choice").and_then(Value::as_object_mut) {
297            tc.insert("disable_parallel_tool_use".into(), json!(!parallel));
298        } else {
299            warnings.push(ModelWarning::LossyEncode {
300                field: "parallel_tool_calls".into(),
301                detail: "Anthropic encodes via tool_choice.disable_parallel_tool_use; \
302                         no tool_choice block (tools list empty) — knob discarded"
303                    .into(),
304            });
305        }
306    }
307    if let Some(user_id) = &request.end_user_id {
308        body.insert("metadata".into(), json!({"user_id": user_id}));
309    }
310    if request.seed.is_some() {
311        warnings.push(ModelWarning::LossyEncode {
312            field: "seed".into(),
313            detail: "Anthropic Messages has no deterministic-sampling knob — drop the field".into(),
314        });
315    }
316    if let Some(effort) = &request.reasoning_effort {
317        encode_anthropic_thinking(&request.model, effort, body, warnings);
318    }
319    if ext.openai_chat.is_some() {
320        warnings.push(ModelWarning::ProviderExtensionIgnored {
321            vendor: "openai_chat".into(),
322        });
323    }
324    if ext.openai_responses.is_some() {
325        warnings.push(ModelWarning::ProviderExtensionIgnored {
326            vendor: "openai_responses".into(),
327        });
328    }
329    if ext.gemini.is_some() {
330        warnings.push(ModelWarning::ProviderExtensionIgnored {
331            vendor: "gemini".into(),
332        });
333    }
334    if ext.bedrock.is_some() {
335        warnings.push(ModelWarning::ProviderExtensionIgnored {
336            vendor: "bedrock".into(),
337        });
338    }
339}
340
341/// Resolve [`OutputStrategy::Auto`] against the codec's preferred
342/// dispatch shape, then emit either the native `output_config`
343/// shape or the forced-tool shape into `body`. `Prompted` is
344/// rejected (1.1 — currently unsupported on every codec).
345fn encode_anthropic_structured_output(
346    format: &ResponseFormat,
347    model: &str,
348    body: &mut Map<String, Value>,
349    warnings: &mut Vec<ModelWarning>,
350) -> Result<()> {
351    let strategy = resolve_output_strategy(format.strategy, model);
352    match strategy {
353        OutputStrategy::Native => {
354            // Anthropic native structured outputs — `output_config`
355            // at the request root, raw JSON Schema (no wrapper, no
356            // strict toggle). Anthropic always strict-validates
357            // when the field is set.
358            body.insert(
359                "output_config".into(),
360                json!({
361                    "format": {
362                        "type": "json_schema",
363                        "schema": format.json_schema.schema.clone(),
364                    }
365                }),
366            );
367            if !format.strict {
368                warnings.push(ModelWarning::LossyEncode {
369                    field: "response_format.strict".into(),
370                    detail: "Anthropic always strict-validates structured output; \
371                         the strict=false request was approximated"
372                        .into(),
373                });
374            }
375        }
376        OutputStrategy::Tool => {
377            // Forced single tool call carrying the target schema.
378            // Mature surface, parity with every Anthropic version,
379            // accepts arbitrary JSON schemas without strict-mode
380            // constraints. The codec injects the synthetic tool
381            // and a `tool_choice` of `{type: "tool", name: ...}`
382            // ahead of any operator-supplied tools so the model
383            // emits exactly one `tool_use` block whose input
384            // matches the target schema.
385            let tool_name = format.json_schema.name.clone();
386            let synthetic_tool = json!({
387                "type": "custom",
388                "name": tool_name,
389                "description": format!(
390                    "Emit the response as a JSON object matching the {tool_name} schema."
391                ),
392                "input_schema": format.json_schema.schema.clone(),
393            });
394            // Prepend the structured-output tool so the model sees
395            // it first. Operator tools survive; the new
396            // `tool_choice` overrides any prior selection because
397            // structured-output dispatch demands a forced single
398            // tool call.
399            let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
400            if let Value::Array(arr) = tools {
401                arr.insert(0, synthetic_tool);
402            }
403            body.insert(
404                "tool_choice".into(),
405                json!({
406                    "type": "tool",
407                    "name": format.json_schema.name,
408                    // Disable parallel tool use so the model cannot
409                    // emit multiple tool_use blocks alongside the
410                    // structured-output call.
411                    "disable_parallel_tool_use": true,
412                }),
413            );
414            if !format.strict {
415                // Tool-input-schema validation is enforced at
416                // construction time by Anthropic regardless of the
417                // strict flag — surface the loss.
418                warnings.push(ModelWarning::LossyEncode {
419                    field: "response_format.strict".into(),
420                    detail: "Anthropic Tool-strategy structured output is always \
421                         schema-validated; strict=false was approximated"
422                        .into(),
423                });
424            }
425        }
426        OutputStrategy::Prompted => {
427            return Err(Error::invalid_request(
428                "OutputStrategy::Prompted is deferred to entelix 1.1; use \
429                 OutputStrategy::Native or OutputStrategy::Tool",
430            ));
431        }
432        OutputStrategy::Auto => {
433            // Resolved above — unreachable. Defensive arm.
434            return Err(Error::invalid_request(
435                "OutputStrategy::Auto did not resolve — codec invariant violation",
436            ));
437        }
438    }
439    Ok(())
440}
441
/// Resolve [`OutputStrategy::Auto`] to Anthropic's preferred
/// dispatch shape — currently `Tool` (the native `output_config`
/// channel ships without a strict toggle and is less mature than
/// the tool-call surface). Every other variant is returned
/// unchanged.
///
/// `_model` is currently unused; the resolution does not vary per
/// model.
///
/// The explicit per-variant arms keep the resolver readable as the
/// cross-vendor `OutputStrategy` enum gains future variants —
/// Clippy's `match_same_arms` would have us merge the identity arms
/// but that hides the explicit per-variant intent.
#[allow(clippy::match_same_arms)]
const fn resolve_output_strategy(strategy: OutputStrategy, _model: &str) -> OutputStrategy {
    match strategy {
        // The only arm that rewrites its input.
        OutputStrategy::Auto => OutputStrategy::Tool,
        OutputStrategy::Native => OutputStrategy::Native,
        OutputStrategy::Tool => OutputStrategy::Tool,
        OutputStrategy::Prompted => OutputStrategy::Prompted,
    }
}
459
/// Report whether `model` only accepts adaptive thinking.
///
/// Opus 4.7 is adaptive-only (no manual `budget_tokens`), while
/// Sonnet 4.6 / 4.5 / Haiku accept either adaptive or an explicit
/// budget. The check is a model-name prefix match because Anthropic
/// does not expose a wire-level "this model is adaptive-only" signal —
/// the constraint is vendor-side request validation that surfaces as
/// a 4xx without useful diagnostic context.
fn is_anthropic_adaptive_only(model: &str) -> bool {
    // Opus 4.7 is the only adaptive-only model in Anthropic's 2026
    // lineup. Future adaptive-only models join this table.
    const ADAPTIVE_ONLY_PREFIXES: [&str; 1] = ["claude-opus-4-7"];
    ADAPTIVE_ONLY_PREFIXES
        .iter()
        .any(|prefix| model.starts_with(*prefix))
}
472
473/// Translate the cross-vendor [`ReasoningEffort`] knob onto the
474/// Anthropic Messages API `thinking` field. Per:
475///
476/// - `Off` → `{type:"disabled"}`
477/// - `Minimal` → `{type:"adaptive", effort:"low"}` (LossyEncode —
478///   Anthropic's smallest adaptive bucket; closer than `enabled`
479///   with a sub-1024 budget which the vendor rejects)
480/// - `Low` → `{type:"enabled", budget_tokens:1024}` (or adaptive on
481///   Opus 4.7)
482/// - `Medium` → `{type:"enabled", budget_tokens:4096}` (or adaptive
483///   on Opus 4.7)
484/// - `High` → `{type:"enabled", budget_tokens:16384}` (or adaptive
485///   on Opus 4.7)
486/// - `Auto` → `{type:"adaptive"}`
487/// - `VendorSpecific(s)` — `s` parses as decimal `budget_tokens`
488///   (Opus 4.7 rejects this with `Error::invalid_request`).
489///   Non-numeric `s` emits `LossyEncode` and falls through to
490///   `Medium`.
491fn encode_anthropic_thinking(
492    model: &str,
493    effort: &ReasoningEffort,
494    body: &mut Map<String, Value>,
495    warnings: &mut Vec<ModelWarning>,
496) {
497    let adaptive_only = is_anthropic_adaptive_only(model);
498    let thinking = match effort {
499        ReasoningEffort::Off => {
500            json!({"type": "disabled"})
501        }
502        ReasoningEffort::Minimal => {
503            warnings.push(ModelWarning::LossyEncode {
504                field: "reasoning_effort".into(),
505                detail:
506                    "Anthropic has no `Minimal` bucket — snapped to `{type:\"adaptive\", effort:\"low\"}`"
507                        .into(),
508            });
509            json!({"type": "adaptive", "effort": "low"})
510        }
511        ReasoningEffort::Low => {
512            if adaptive_only {
513                json!({"type": "adaptive", "effort": "low"})
514            } else {
515                json!({"type": "enabled", "budget_tokens": 1024})
516            }
517        }
518        ReasoningEffort::Medium => {
519            if adaptive_only {
520                json!({"type": "adaptive", "effort": "medium"})
521            } else {
522                json!({"type": "enabled", "budget_tokens": 4096})
523            }
524        }
525        ReasoningEffort::High => {
526            if adaptive_only {
527                json!({"type": "adaptive", "effort": "high"})
528            } else {
529                json!({"type": "enabled", "budget_tokens": 16384})
530            }
531        }
532        ReasoningEffort::Auto => {
533            json!({"type": "adaptive"})
534        }
535        ReasoningEffort::VendorSpecific(literal) => {
536            if adaptive_only {
537                // Opus 4.7 manual-budget rejection — encode-time
538                // failure with a useful diagnostic. Falling back
539                // silently would let the operator's intent (raw
540                // budget tokens) hit the wire and surface as a
541                // 4xx with vague upstream wording.
542                warnings.push(ModelWarning::LossyEncode {
543                    field: "reasoning_effort".into(),
544                    detail: format!(
545                        "Anthropic {model} is adaptive-only — manual budget '{literal}' \
546                         dropped; emitting `{{type:\"adaptive\"}}` instead"
547                    ),
548                });
549                json!({"type": "adaptive"})
550            } else if let Ok(budget) = literal.parse::<u32>() {
551                json!({"type": "enabled", "budget_tokens": budget})
552            } else {
553                warnings.push(ModelWarning::LossyEncode {
554                    field: "reasoning_effort".into(),
555                    detail: format!(
556                        "Anthropic vendor-specific reasoning_effort {literal:?} is not a \
557                         numeric budget_tokens — falling through to `Medium`"
558                    ),
559                });
560                json!({"type": "enabled", "budget_tokens": 4096})
561            }
562        }
563    };
564    body.insert("thinking".into(), thinking);
565}
566
567fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
568    let bytes = serde_json::to_vec(body)?;
569    let mut encoded = EncodedRequest::post_json("/v1/messages", Bytes::from(bytes));
570    encoded.headers.insert(
571        http::HeaderName::from_static("anthropic-version"),
572        http::HeaderValue::from_static(ANTHROPIC_VERSION),
573    );
574    encoded.warnings = warnings;
575    Ok(encoded)
576}
577
578// ── encode helpers ─────────────────────────────────────────────────────────
579
580fn encode_messages(
581    request: &ModelRequest,
582    warnings: &mut Vec<ModelWarning>,
583) -> (Option<Value>, Vec<Value>) {
584    // Two collection paths:
585    // - If any IR system block carries cache_control, we emit the
586    //   array form (preserving per-block cache directives).
587    // - Otherwise the simple string form is sufficient.
588    let mut system_blocks: Vec<(String, Option<crate::ir::CacheControl>)> = request
589        .system
590        .blocks()
591        .iter()
592        .map(|b| (b.text.clone(), b.cache_control))
593        .collect();
594    let mut wire_messages = Vec::with_capacity(request.messages.len());
595
596    for (idx, msg) in request.messages.iter().enumerate() {
597        match msg.role {
598            Role::System => {
599                let mut lossy_non_text = false;
600                let text = msg
601                    .content
602                    .iter()
603                    .filter_map(|part| {
604                        if let ContentPart::Text { text, .. } = part {
605                            Some(text.clone())
606                        } else {
607                            lossy_non_text = true;
608                            None
609                        }
610                    })
611                    .collect::<Vec<_>>()
612                    .join("\n");
613                if lossy_non_text {
614                    warnings.push(ModelWarning::LossyEncode {
615                        field: format!("messages[{idx}].content"),
616                        detail: "non-text parts dropped from system message (Anthropic has no \
617                                 system role)"
618                            .into(),
619                    });
620                }
621                if !text.is_empty() {
622                    system_blocks.push((text, None));
623                }
624            }
625            Role::User | Role::Assistant | Role::Tool => {
626                let role_str = match msg.role {
627                    Role::Assistant => "assistant",
628                    _ => "user",
629                };
630                let content_array = encode_content_parts(&msg.content, warnings, idx);
631                let mut entry = Map::new();
632                entry.insert("role".into(), Value::String(role_str.into()));
633                entry.insert("content".into(), Value::Array(content_array));
634                wire_messages.push(Value::Object(entry));
635            }
636        }
637    }
638
639    // Emit the array form when any block has cache_control set;
640    // otherwise the simpler string form is enough — minimizing
641    // wire bytes when callers don't need caching.
642    let any_cached = system_blocks.iter().any(|(_, cc)| cc.is_some());
643    let system_value = if system_blocks.is_empty() {
644        None
645    } else if any_cached {
646        let array: Vec<Value> = system_blocks
647            .into_iter()
648            .map(|(text, cc)| {
649                let mut obj = Map::new();
650                obj.insert("type".into(), Value::String("text".into()));
651                obj.insert("text".into(), Value::String(text));
652                if let Some(cache) = cc {
653                    obj.insert("cache_control".into(), encode_cache_control(cache));
654                }
655                Value::Object(obj)
656            })
657            .collect();
658        Some(Value::Array(array))
659    } else {
660        Some(Value::String(
661            system_blocks
662                .into_iter()
663                .map(|(text, _)| text)
664                .collect::<Vec<_>>()
665                .join("\n\n"),
666        ))
667    };
668    (system_value, wire_messages)
669}
670
671#[allow(clippy::too_many_lines)] // dispatch over every ContentPart variant
672fn encode_content_parts(
673    parts: &[ContentPart],
674    warnings: &mut Vec<ModelWarning>,
675    msg_idx: usize,
676) -> Vec<Value> {
677    let mut out = Vec::with_capacity(parts.len());
678    for (part_idx, part) in parts.iter().enumerate() {
679        let path = || format!("messages[{msg_idx}].content[{part_idx}]");
680        match part {
681            ContentPart::Text {
682                text,
683                cache_control,
684                ..
685            } => {
686                let mut block = json_block("text", &[("text", Value::String(text.clone()))]);
687                attach_cache_control(&mut block, cache_control.as_ref());
688                out.push(Value::Object(block));
689            }
690            ContentPart::Image {
691                source,
692                cache_control,
693                ..
694            } => {
695                let mut block = json_block(
696                    "image",
697                    &[("source", encode_media_source_anthropic(source))],
698                );
699                attach_cache_control(&mut block, cache_control.as_ref());
700                out.push(Value::Object(block));
701            }
702            ContentPart::Audio { .. } => warnings.push(ModelWarning::LossyEncode {
703                field: path(),
704                detail: "Anthropic Messages does not accept audio inputs; block dropped".into(),
705            }),
706            ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
707                field: path(),
708                detail: "Anthropic Messages does not accept video inputs; block dropped".into(),
709            }),
710            ContentPart::Document {
711                source,
712                name,
713                cache_control,
714                ..
715            } => {
716                let mut block = Map::new();
717                block.insert("type".into(), Value::String("document".into()));
718                block.insert("source".into(), encode_media_source_anthropic(source));
719                if let Some(title) = name {
720                    block.insert("title".into(), Value::String(title.clone()));
721                }
722                attach_cache_control(&mut block, cache_control.as_ref());
723                out.push(Value::Object(block));
724            }
725            ContentPart::Thinking {
726                text,
727                cache_control,
728                provider_echoes,
729            } => {
730                let mut block = Map::new();
731                block.insert("type".into(), Value::String("thinking".into()));
732                block.insert("thinking".into(), Value::String(text.clone()));
733                if let Some(sig) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
734                    .and_then(|e| e.payload_str("signature"))
735                {
736                    block.insert("signature".into(), Value::String(sig.to_owned()));
737                }
738                attach_cache_control(&mut block, cache_control.as_ref());
739                out.push(Value::Object(block));
740            }
741            ContentPart::RedactedThinking { provider_echoes } => {
742                let Some(data) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
743                    .and_then(|e| e.payload_str("data"))
744                else {
745                    // No matching opaque blob — without the original
746                    // ciphertext the wire shape is unrepresentable.
747                    warnings.push(ModelWarning::LossyEncode {
748                        field: path(),
749                        detail: "redacted_thinking part missing 'anthropic-messages' \
750                                 provider_echo with 'data' payload; block dropped"
751                            .into(),
752                    });
753                    continue;
754                };
755                let mut block = Map::new();
756                block.insert("type".into(), Value::String("redacted_thinking".into()));
757                block.insert("data".into(), Value::String(data.to_owned()));
758                out.push(Value::Object(block));
759            }
760            ContentPart::Citation {
761                snippet,
762                source,
763                cache_control,
764                ..
765            } => {
766                // Anthropic carries citations inline on `text` blocks; round-tripping
767                // a standalone Citation IR variant means re-emitting the cited text
768                // with a `citations` array attached.
769                let citation_json = match source {
770                    CitationSource::Url { url, title } => {
771                        let mut o = Map::new();
772                        o.insert(
773                            "type".into(),
774                            Value::String("web_search_result_location".into()),
775                        );
776                        o.insert("url".into(), Value::String(url.clone()));
777                        if let Some(t) = title {
778                            o.insert("title".into(), Value::String(t.clone()));
779                        }
780                        Value::Object(o)
781                    }
782                    CitationSource::Document {
783                        document_index,
784                        title,
785                    } => {
786                        let mut o = Map::new();
787                        o.insert("type".into(), Value::String("char_location".into()));
788                        o.insert("document_index".into(), json!(*document_index));
789                        if let Some(t) = title {
790                            o.insert("document_title".into(), Value::String(t.clone()));
791                        }
792                        Value::Object(o)
793                    }
794                };
795                let mut block = Map::new();
796                block.insert("type".into(), Value::String("text".into()));
797                block.insert("text".into(), Value::String(snippet.clone()));
798                block.insert("citations".into(), Value::Array(vec![citation_json]));
799                attach_cache_control(&mut block, cache_control.as_ref());
800                out.push(Value::Object(block));
801            }
802            ContentPart::ToolUse {
803                id, name, input, ..
804            } => out.push(json!({
805                "type": "tool_use",
806                "id": id,
807                "name": name,
808                "input": input,
809            })),
810            ContentPart::ToolResult {
811                tool_use_id,
812                name: _,
813                content,
814                is_error,
815                cache_control,
816                ..
817            } => out.push(encode_tool_result(
818                tool_use_id,
819                content,
820                *is_error,
821                cache_control.as_ref(),
822                warnings,
823                msg_idx,
824                part_idx,
825            )),
826            ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
827                // Assistant-produced image/audio is output-only; no
828                // Anthropic input shape consumes it. Drop with a
829                // LossyEncode warning so a multi-turn replay that
830                // happens to surface the part is not silently
831                // mis-encoded as user input.
832                warnings.push(ModelWarning::LossyEncode {
833                    field: path(),
834                    detail: "Anthropic Messages does not accept assistant-produced \
835                             image / audio output as input — block dropped"
836                        .into(),
837                });
838            }
839        }
840    }
841    out
842}
843
844/// Build a JSON object block of `{type: <kind>, ...fields}`.
845fn json_block(kind: &str, fields: &[(&str, Value)]) -> Map<String, Value> {
846    let mut block = Map::new();
847    block.insert("type".into(), Value::String(kind.into()));
848    for (k, v) in fields {
849        block.insert((*k).to_owned(), v.clone());
850    }
851    block
852}
853
854/// Insert Anthropic's `cache_control` block. The wire shape is
855/// always `{type: "ephemeral"}` plus an optional sibling `ttl: "1h"`
856/// for the premium tier — `type` never carries the TTL string.
857fn attach_cache_control(block: &mut Map<String, Value>, cache: Option<&crate::ir::CacheControl>) {
858    if let Some(cache) = cache {
859        block.insert("cache_control".into(), encode_cache_control(*cache));
860    }
861}
862
863/// Render a [`crate::ir::CacheControl`] as Anthropic's
864/// `cache_control` JSON value. Single source of truth for the wire
865/// shape — every encode path (system blocks, content blocks, tool
866/// blocks, tool_result blocks) goes through here.
867fn encode_cache_control(cache: crate::ir::CacheControl) -> Value {
868    let mut obj = Map::new();
869    obj.insert("type".into(), Value::String("ephemeral".into()));
870    if let Some(ttl) = cache.ttl.wire_ttl_field() {
871        obj.insert("ttl".into(), Value::String(ttl.into()));
872    }
873    Value::Object(obj)
874}
875
876fn encode_media_source_anthropic(source: &MediaSource) -> Value {
877    match source {
878        MediaSource::Url { url, .. } => json!({
879            "type": "url",
880            "url": url,
881        }),
882        MediaSource::Base64 { media_type, data } => json!({
883            "type": "base64",
884            "media_type": media_type,
885            "data": data,
886        }),
887        MediaSource::FileId { id, .. } => json!({
888            "type": "file",
889            "file_id": id,
890        }),
891    }
892}
893
894fn encode_tool_result(
895    tool_use_id: &str,
896    content: &ToolResultContent,
897    is_error: bool,
898    cache_control: Option<&crate::ir::CacheControl>,
899    warnings: &mut Vec<ModelWarning>,
900    msg_idx: usize,
901    part_idx: usize,
902) -> Value {
903    let content_json = match content {
904        ToolResultContent::Text(s) => Value::String(s.clone()),
905        ToolResultContent::Json(v) => {
906            warnings.push(ModelWarning::LossyEncode {
907                field: format!("messages[{msg_idx}].content[{part_idx}]"),
908                detail: "tool_result Json payload stringified for Anthropic wire format".into(),
909            });
910            Value::String(v.to_string())
911        }
912    };
913    let mut block = Map::new();
914    block.insert("type".into(), Value::String("tool_result".into()));
915    block.insert("tool_use_id".into(), Value::String(tool_use_id.into()));
916    block.insert("content".into(), content_json);
917    if is_error {
918        block.insert("is_error".into(), Value::Bool(true));
919    }
920    attach_cache_control(&mut block, cache_control);
921    Value::Object(block)
922}
923
924fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
925    let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
926    for (idx, t) in tools.iter().enumerate() {
927        let mut obj = match &t.kind {
928            ToolKind::Function { input_schema } => {
929                let mut o = Map::new();
930                o.insert("name".into(), Value::String(t.name.clone()));
931                o.insert("description".into(), Value::String(t.description.clone()));
932                o.insert("input_schema".into(), input_schema.clone());
933                o
934            }
935            ToolKind::WebSearch {
936                max_uses,
937                allowed_domains,
938            } => {
939                let mut o = Map::new();
940                o.insert("type".into(), Value::String("web_search_20250305".into()));
941                o.insert("name".into(), Value::String(t.name.clone()));
942                if let Some(n) = max_uses {
943                    o.insert("max_uses".into(), json!(*n));
944                }
945                if !allowed_domains.is_empty() {
946                    o.insert("allowed_domains".into(), json!(allowed_domains));
947                }
948                o
949            }
950            ToolKind::Computer {
951                display_width,
952                display_height,
953            } => {
954                let mut o = Map::new();
955                o.insert("type".into(), Value::String("computer_20250124".into()));
956                o.insert("name".into(), Value::String(t.name.clone()));
957                o.insert("display_width_px".into(), json!(*display_width));
958                o.insert("display_height_px".into(), json!(*display_height));
959                o
960            }
961            ToolKind::TextEditor => {
962                let mut o = Map::new();
963                o.insert("type".into(), Value::String("text_editor_20250124".into()));
964                o.insert("name".into(), Value::String(t.name.clone()));
965                o
966            }
967            ToolKind::Bash => {
968                let mut o = Map::new();
969                o.insert("type".into(), Value::String("bash_20250124".into()));
970                o.insert("name".into(), Value::String(t.name.clone()));
971                o
972            }
973            ToolKind::CodeExecution => {
974                let mut o = Map::new();
975                o.insert(
976                    "type".into(),
977                    Value::String("code_execution_20250522".into()),
978                );
979                o.insert("name".into(), Value::String(t.name.clone()));
980                o
981            }
982            ToolKind::McpConnector {
983                name,
984                server_url,
985                authorization_token,
986            } => {
987                let mut o = Map::new();
988                o.insert("type".into(), Value::String("mcp".into()));
989                o.insert("name".into(), Value::String(name.clone()));
990                o.insert("server_url".into(), Value::String(server_url.clone()));
991                if let Some(token) = authorization_token {
992                    o.insert("authorization_token".into(), Value::String(token.clone()));
993                }
994                o
995            }
996            ToolKind::Memory => {
997                let mut o = Map::new();
998                o.insert("type".into(), Value::String("memory_20250818".into()));
999                o.insert("name".into(), Value::String(t.name.clone()));
1000                o
1001            }
1002            ToolKind::FileSearch { .. } | ToolKind::CodeInterpreter | ToolKind::ImageGeneration => {
1003                warnings.push(ModelWarning::LossyEncode {
1004                    field: format!("tools[{idx}]"),
1005                    detail: "Anthropic does not natively support OpenAI-only built-ins \
1006                             (file_search / code_interpreter / image_generation) — tool dropped"
1007                        .into(),
1008                });
1009                continue;
1010            }
1011        };
1012        attach_cache_control(&mut obj, t.cache_control.as_ref());
1013        arr.push(Value::Object(obj));
1014    }
1015    Value::Array(arr)
1016}
1017
1018fn encode_tool_choice(choice: &ToolChoice) -> Value {
1019    match choice {
1020        ToolChoice::Auto => json!({ "type": "auto" }),
1021        ToolChoice::Required => json!({ "type": "any" }),
1022        ToolChoice::Specific { name } => json!({ "type": "tool", "name": name }),
1023        ToolChoice::None => json!({ "type": "none" }),
1024    }
1025}
1026
1027// ── decode helpers ─────────────────────────────────────────────────────────
1028
1029fn decode_content(raw: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
1030    let Some(arr) = raw.get("content").and_then(Value::as_array) else {
1031        return Vec::new();
1032    };
1033    let mut out = Vec::with_capacity(arr.len());
1034    for (idx, block) in arr.iter().enumerate() {
1035        match block.get("type").and_then(Value::as_str) {
1036            Some("text") => {
1037                let text = str_field(block, "text").to_owned();
1038                // If the block carries `citations`, emit them as separate
1039                // ContentPart::Citation entries followed by the underlying
1040                // text — preserves provenance while keeping the IR variant
1041                // discriminated.
1042                if let Some(citations) = block.get("citations").and_then(Value::as_array) {
1043                    for c in citations {
1044                        if let Some(source) = decode_citation_source(c) {
1045                            out.push(ContentPart::Citation {
1046                                snippet: text.clone(),
1047                                source,
1048                                cache_control: None,
1049                                provider_echoes: Vec::new(),
1050                            });
1051                        }
1052                    }
1053                }
1054                if !text.is_empty() {
1055                    out.push(ContentPart::text(text));
1056                }
1057            }
1058            Some("thinking") => {
1059                let thinking_text = str_field(block, "thinking").to_owned();
1060                let mut provider_echoes = Vec::new();
1061                if let Some(sig) = block
1062                    .get("signature")
1063                    .and_then(Value::as_str)
1064                    .filter(|s| !s.is_empty())
1065                {
1066                    provider_echoes.push(ProviderEchoSnapshot::for_provider(
1067                        PROVIDER_KEY,
1068                        "signature",
1069                        sig.to_owned(),
1070                    ));
1071                }
1072                out.push(ContentPart::Thinking {
1073                    text: thinking_text,
1074                    cache_control: None,
1075                    provider_echoes,
1076                });
1077            }
1078            Some("redacted_thinking") => {
1079                // The entire redacted block is opaque ciphertext —
1080                // round-trips through provider_echoes; nothing else.
1081                let data = str_field(block, "data").to_owned();
1082                out.push(ContentPart::RedactedThinking {
1083                    provider_echoes: vec![ProviderEchoSnapshot::for_provider(
1084                        PROVIDER_KEY,
1085                        "data",
1086                        data,
1087                    )],
1088                });
1089            }
1090            Some("tool_use") => {
1091                let id = str_field(block, "id").to_owned();
1092                let name = str_field(block, "name").to_owned();
1093                let input = block.get("input").cloned().unwrap_or_else(|| json!({})); // silent-fallback-ok: tool_use without args = empty-args call (vendor sometimes omits)
1094                out.push(ContentPart::ToolUse {
1095                    id,
1096                    name,
1097                    input,
1098                    provider_echoes: Vec::new(),
1099                });
1100            }
1101            Some(other) => {
1102                warnings.push(ModelWarning::LossyEncode {
1103                    field: format!("response.content[{idx}]"),
1104                    detail: format!("unknown content block type '{other}' dropped"),
1105                });
1106            }
1107            None => {
1108                warnings.push(ModelWarning::LossyEncode {
1109                    field: format!("response.content[{idx}]"),
1110                    detail: "content block missing 'type' field".into(),
1111                });
1112            }
1113        }
1114    }
1115    out
1116}
1117
1118fn decode_citation_source(c: &Value) -> Option<CitationSource> {
1119    match c.get("type").and_then(Value::as_str)? {
1120        "web_search_result_location" => Some(CitationSource::Url {
1121            url: str_field(c, "url").to_owned(),
1122            title: c.get("title").and_then(Value::as_str).map(str::to_owned),
1123        }),
1124        "char_location" | "page_location" | "content_block_location" => {
1125            // Invariant #15 — `document_index` is the
1126            // citation's load-bearing pointer; defaulting to 0
1127            // would silently rewrite "vendor failed to identify the
1128            // document" as "the first document". Drop the citation
1129            // entirely when the index is missing or unparseable so
1130            // operators see the absence rather than a wrong link.
1131            let document_index = c
1132                .get("document_index")
1133                .and_then(Value::as_u64)
1134                .and_then(|n| u32::try_from(n).ok())?;
1135            Some(CitationSource::Document {
1136                document_index,
1137                title: c
1138                    .get("document_title")
1139                    .and_then(Value::as_str)
1140                    .map(str::to_owned),
1141            })
1142        }
1143        _ => None,
1144    }
1145}
1146
1147fn decode_stop_reason(raw: &Value, warnings: &mut Vec<ModelWarning>) -> StopReason {
1148    match raw.get("stop_reason").and_then(Value::as_str) {
1149        Some("end_turn") => StopReason::EndTurn,
1150        Some("max_tokens") => StopReason::MaxTokens,
1151        Some("stop_sequence") => StopReason::StopSequence {
1152            sequence: str_field(raw, "stop_sequence").to_owned(),
1153        },
1154        Some("tool_use") => StopReason::ToolUse,
1155        Some("refusal") => StopReason::Refusal {
1156            reason: RefusalReason::Safety,
1157        },
1158        Some(other) => {
1159            warnings.push(ModelWarning::UnknownStopReason {
1160                raw: other.to_owned(),
1161            });
1162            StopReason::Other {
1163                raw: other.to_owned(),
1164            }
1165        }
1166        None => {
1167            // Invariant #15 — silent EndTurn fallback was masking
1168            // truncated stream payloads from callers. Record as
1169            // Other + warning so observability sees the loss.
1170            warnings.push(ModelWarning::LossyEncode {
1171                field: "stop_reason".into(),
1172                detail: "Anthropic Messages payload carried no stop_reason — \
1173                         IR records `Other{raw:\"missing\"}`"
1174                    .into(),
1175            });
1176            StopReason::Other {
1177                raw: "missing".to_owned(),
1178            }
1179        }
1180    }
1181}
1182
1183fn decode_usage(raw: &Value) -> Usage {
1184    let usage = raw.get("usage");
1185    Usage {
1186        input_tokens: u_field(usage, "input_tokens"),
1187        output_tokens: u_field(usage, "output_tokens"),
1188        cached_input_tokens: u_field(usage, "cache_read_input_tokens"),
1189        cache_creation_input_tokens: u_field(usage, "cache_creation_input_tokens"),
1190        reasoning_tokens: 0,
1191        safety_ratings: Vec::new(),
1192    }
1193}
1194
1195fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1196    v.get(key).and_then(Value::as_str).unwrap_or("") // silent-fallback-ok: missing optional string field
1197}
1198
1199fn u_field(v: Option<&Value>, key: &str) -> u32 {
1200    v.and_then(|inner| inner.get(key))
1201        .and_then(Value::as_u64)
1202        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) // silent-fallback-ok: missing usage metric = 0 (vendor didn't report = unused); u64→u32 saturate
1203}
1204
1205// ── SSE streaming parser ───────────────────────────────────────────────────
1206
/// Kind of content block the SSE parser has seen open at a given
/// stream index. The parser records it on `content_block_start` and
/// consults it on `content_block_stop` to decide which `StreamDelta`
/// (if any) to emit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockKind {
    /// Opened by a `text` content block.
    Text,
    /// Opened by a `thinking` content block.
    Thinking,
    /// Opened by a `tool_use` content block; closing it yields
    /// `StreamDelta::ToolUseStop`.
    ToolUse,
}
1215
1216#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1217fn stream_anthropic_sse(
1218    bytes: BoxByteStream<'_>,
1219    warnings_in: Vec<ModelWarning>,
1220) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1221    async_stream::stream! {
1222        let mut bytes = bytes;
1223        let mut buf: Vec<u8> = Vec::new();
1224        let mut blocks: HashMap<u64, BlockKind> = HashMap::new();
1225        let mut last_stop_reason = StopReason::EndTurn;
1226        let mut accumulated_usage = Usage::default();
1227        let mut warnings_emitted = false;
1228
1229        while let Some(chunk) = bytes.next().await {
1230            match chunk {
1231                Ok(b) => buf.extend_from_slice(&b),
1232                Err(e) => {
1233                    yield Err(e);
1234                    return;
1235                }
1236            }
1237            // Emit any accumulated encode-time warnings on first byte.
1238            if !warnings_emitted {
1239                warnings_emitted = true;
1240                for w in &warnings_in {
1241                    yield Ok(StreamDelta::Warning(w.clone()));
1242                }
1243            }
1244            // Process every complete `\n\n`-terminated SSE frame.
1245            while let Some(pos) = find_double_newline(&buf) {
1246                let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1247                let Ok(frame_str) = std::str::from_utf8(&frame) else {
1248                    continue;
1249                };
1250                let Some(payload) = parse_sse_data(frame_str) else {
1251                    continue;
1252                };
1253                let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1254                    yield Err(Error::invalid_request(format!(
1255                        "Anthropic stream: malformed event payload: {payload}"
1256                    )));
1257                    return;
1258                };
1259                let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); // silent-fallback-ok: missing event type → no-match fallthrough
1260                match event_type {
1261                    "message_start" => {
1262                        let message = event.get("message").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates through child .get() chain as None
1263                        let id = str_field(message, "id").to_owned();
1264                        let model = str_field(message, "model").to_owned();
1265                        if let Some(usage) = message.get("usage") {
1266                            accumulated_usage.input_tokens = u_field(Some(usage), "input_tokens");
1267                            accumulated_usage.cached_input_tokens =
1268                                u_field(Some(usage), "cache_read_input_tokens");
1269                            accumulated_usage.cache_creation_input_tokens =
1270                                u_field(Some(usage), "cache_creation_input_tokens");
1271                        }
1272                        yield Ok(StreamDelta::Start {
1273                            id,
1274                            model,
1275                            provider_echoes: Vec::new(),
1276                        });
1277                    }
1278                    "content_block_start" => {
1279                        let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
1280                            n
1281                        } else {
1282                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1283                                field: "stream.content_block_start.index".into(),
1284                                detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 to keep stream parser progressing".into(),
1285                            }));
1286                            0
1287                        };
1288                        let block = event.get("content_block").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1289                        match block.get("type").and_then(Value::as_str) {
1290                            Some("text") => {
1291                                blocks.insert(idx, BlockKind::Text);
1292                                if let Some(text) = block.get("text").and_then(Value::as_str)
1293                                    && !text.is_empty()
1294                                {
1295                                    yield Ok(StreamDelta::TextDelta {
1296                                        text: text.to_owned(),
1297                                        provider_echoes: Vec::new(),
1298                                    });
1299                                }
1300                            }
1301                            Some("thinking") => {
1302                                blocks.insert(idx, BlockKind::Thinking);
1303                                let text = block
1304                                    .get("thinking")
1305                                    .and_then(Value::as_str)
1306                                    .unwrap_or("") // silent-fallback-ok: missing thinking text → empty body; downstream is_empty() guard suppresses the StreamDelta
1307                                    .to_owned();
1308                                // Anthropic emits an empty `signature: ""`
1309                                // placeholder on content_block_start; the
1310                                // real signature arrives later via the
1311                                // discrete `signature_delta` event. Skip
1312                                // the placeholder so the aggregated echo
1313                                // list carries only the authoritative
1314                                // value.
1315                                let provider_echoes = block
1316                                    .get("signature")
1317                                    .and_then(Value::as_str)
1318                                    .filter(|s| !s.is_empty())
1319                                    .map(|sig| {
1320                                        vec![ProviderEchoSnapshot::for_provider(
1321                                            PROVIDER_KEY,
1322                                            "signature",
1323                                            sig.to_owned(),
1324                                        )]
1325                                    })
1326                                    .unwrap_or_default();
1327                                if !text.is_empty() || !provider_echoes.is_empty() {
1328                                    yield Ok(StreamDelta::ThinkingDelta {
1329                                        text,
1330                                        provider_echoes,
1331                                    });
1332                                }
1333                            }
1334                            Some("tool_use") => {
1335                                blocks.insert(idx, BlockKind::ToolUse);
1336                                let id = str_field(block, "id").to_owned();
1337                                let name = str_field(block, "name").to_owned();
1338                                yield Ok(StreamDelta::ToolUseStart {
1339                                    id,
1340                                    name,
1341                                    provider_echoes: Vec::new(),
1342                                });
1343                            }
1344                            other => {
1345                                yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1346                                    field: format!("stream.content_block_start[{idx}]"),
1347                                    detail: format!(
1348                                        "unsupported block type {other:?} dropped"
1349                                    ),
1350                                }));
1351                            }
1352                        }
1353                    }
1354                    "content_block_delta" => {
1355                        let delta = event.get("delta").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1356                        match delta.get("type").and_then(Value::as_str) {
1357                            Some("text_delta") => {
1358                                if let Some(text) = delta.get("text").and_then(Value::as_str) {
1359                                    yield Ok(StreamDelta::TextDelta {
1360                                        text: text.to_owned(),
1361                                        provider_echoes: Vec::new(),
1362                                    });
1363                                }
1364                            }
1365                            Some("thinking_delta") => {
1366                                if let Some(text) = delta.get("thinking").and_then(Value::as_str) {
1367                                    yield Ok(StreamDelta::ThinkingDelta {
1368                                        text: text.to_owned(),
1369                                        provider_echoes: Vec::new(),
1370                                    });
1371                                }
1372                            }
1373                            Some("signature_delta") => {
1374                                if let Some(sig) =
1375                                    delta.get("signature").and_then(Value::as_str)
1376                                {
1377                                    yield Ok(StreamDelta::ThinkingDelta {
1378                                        text: String::new(),
1379                                        provider_echoes: vec![
1380                                            ProviderEchoSnapshot::for_provider(
1381                                                PROVIDER_KEY,
1382                                                "signature",
1383                                                sig.to_owned(),
1384                                            ),
1385                                        ],
1386                                    });
1387                                }
1388                            }
1389                            Some("input_json_delta") => {
1390                                if let Some(partial) =
1391                                    delta.get("partial_json").and_then(Value::as_str)
1392                                {
1393                                    yield Ok(StreamDelta::ToolUseInputDelta {
1394                                        partial_json: partial.to_owned(),
1395                                    });
1396                                }
1397                            }
1398                            other => {
1399                                yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1400                                    field: "stream.content_block_delta".into(),
1401                                    detail: format!(
1402                                        "unsupported delta type {other:?} dropped"
1403                                    ),
1404                                }));
1405                            }
1406                        }
1407                    }
1408                    "content_block_stop" => {
1409                        let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
1410                            n
1411                        } else {
1412                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1413                                field: "stream.content_block_stop.index".into(),
1414                                detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 (mirrors the content_block_start handler)".into(),
1415                            }));
1416                            0
1417                        };
1418                        if matches!(blocks.remove(&idx), Some(BlockKind::ToolUse)) {
1419                            yield Ok(StreamDelta::ToolUseStop);
1420                        }
1421                    }
1422                    "message_delta" => {
1423                        if let Some(delta) = event.get("delta")
1424                            && let Some(reason) =
1425                                delta.get("stop_reason").and_then(Value::as_str)
1426                        {
1427                            // Match the non-streaming `decode_stop_reason` exhaustiveness:
1428                            // every documented Anthropic stop reason maps explicitly,
1429                            // and any unknown raw value yields a `UnknownStopReason`
1430                            // warning delta so streaming and non-streaming clients see
1431                            // the same observability signal when a vendor extension
1432                            // arrives. Also extract `stop_sequence` so the matched
1433                            // sequence string is not dropped.
1434                            last_stop_reason = match reason {
1435                                "end_turn" => StopReason::EndTurn,
1436                                "max_tokens" => StopReason::MaxTokens,
1437                                "stop_sequence" => StopReason::StopSequence {
1438                                    sequence: delta
1439                                        .get("stop_sequence")
1440                                        .and_then(Value::as_str)
1441                                        .unwrap_or_default() // silent-fallback-ok: vendor reported stop_sequence stop reason without echoing the matched sequence string; "" preserves the StopReason variant choice
1442                                        .to_owned(),
1443                                },
1444                                "tool_use" => StopReason::ToolUse,
1445                                "refusal" => StopReason::Refusal {
1446                                    reason: RefusalReason::Safety,
1447                                },
1448                                other => {
1449                                    yield Ok(StreamDelta::Warning(
1450                                        ModelWarning::UnknownStopReason {
1451                                            raw: other.to_owned(),
1452                                        },
1453                                    ));
1454                                    StopReason::Other {
1455                                        raw: other.to_owned(),
1456                                    }
1457                                }
1458                            };
1459                        }
1460                        if let Some(usage) = event.get("usage") {
1461                            accumulated_usage.output_tokens =
1462                                u_field(Some(usage), "output_tokens");
1463                            yield Ok(StreamDelta::Usage(accumulated_usage.clone()));
1464                        }
1465                    }
1466                    "message_stop" => {
1467                        yield Ok(StreamDelta::Stop {
1468                            stop_reason: last_stop_reason.clone(),
1469                        });
1470                    }
1471                    "ping" => {
1472                        // Heartbeat — ignore.
1473                    }
1474                    "error" => {
1475                        let err = event.get("error").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates through child .get() chain as None
1476                        let kind = str_field(err, "type");
1477                        let message = str_field(err, "message");
1478                        yield Err(Error::provider_network(format!(
1479                            "Anthropic stream error ({kind}): {message}"
1480                        )));
1481                        return;
1482                    }
1483                    other => {
1484                        yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1485                            field: "stream.event".into(),
1486                            detail: format!("unknown SSE event type {other:?} ignored"),
1487                        }));
1488                    }
1489                }
1490            }
1491        }
1492    }
1493}
1494
/// Locate the earliest SSE frame terminator in `buf`.
///
/// A terminator is either a bare `\n\n` or its CRLF form `\r\n\r\n`
/// (SSE uses CRLF in the wild). Returns the byte offset at which the
/// first such sequence begins, or `None` when neither is present.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    const TERMINATORS: [&[u8]; 2] = [b"\n\n", b"\r\n\r\n"];
    // Walk candidate start offsets left to right; the first offset at
    // which any terminator begins is the minimum over both patterns.
    (0..buf.len()).find(|&start| {
        let tail = &buf[start..];
        TERMINATORS.iter().any(|terminator| tail.starts_with(terminator))
    })
}
1507
/// Extract the payload carried by an SSE frame.
///
/// Every line starting with `data:` contributes its remainder (minus at
/// most one leading space); the contributions are joined with `\n`, as
/// the spec prescribes for multi-line `data:` fields. Lines with any
/// other prefix are ignored. Returns `None` when the frame contains no
/// `data:` line at all.
fn parse_sse_data(frame: &str) -> Option<String> {
    let payload_lines: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value)) // silent-fallback-ok: SSE data line may or may not have leading space; idiomatic strip-or-pass-through
        .collect();

    if payload_lines.is_empty() {
        None
    } else {
        Some(payload_lines.join("\n"))
    }
}