Skip to main content

entelix_core/codecs/
anthropic.rs

1//! `AnthropicMessagesCodec` — IR ⇄ Anthropic Messages API
2//! (`POST /v1/messages`).
3//!
4//! Wire format reference: <https://docs.anthropic.com/en/api/messages>.
5//!
6//! Notable mappings:
7//!
8//! - IR `system: Option<String>` → top-level `system` field.
9//! - IR `Role::System` messages → flattened into `system` (Anthropic has no
10//!   system role inside `messages`). Multiple system inputs are joined with
11//!   blank lines.
12//! - IR `Role::Tool` messages → `role: "user"` with `tool_result` blocks
13//!   (Anthropic encodes tool replies as user-authored content).
14//! - IR `ToolChoice` → `tool_choice` object (`auto` / `any` / `tool` /
15//!   `none`).
16//! - IR `Usage` cache fields ↔ Anthropic
17//!   `cache_creation_input_tokens` / `cache_read_input_tokens`.
18//!
19//! Any feature the codec cannot preserve emits a
20//! [`crate::ir::ModelWarning::LossyEncode`] (invariant 6).
21
22#![allow(clippy::cast_possible_truncation)] // u64 → u32 token counts; saturates above
23
24use std::collections::HashMap;
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{BoxByteStream, BoxDeltaStream, Codec, EncodedRequest};
31use crate::error::{Error, Result};
32use crate::ir::{
33    Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
34    ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, RefusalReason,
35    ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent, Usage,
36};
37use crate::rate_limit::RateLimitSnapshot;
38use crate::stream::StreamDelta;
39
40/// Provider key for [`AnthropicMessagesCodec`] — matches `Codec::name`
41/// and identifies this vendor's entries in [`ProviderEchoSnapshot`].
42const PROVIDER_KEY: &str = "anthropic-messages";
43
44const ANTHROPIC_VERSION: &str = "2023-06-01";
45
/// Codec for Anthropic's Messages API.
///
/// The type is a zero-field unit struct, so it holds no state: build it
/// once when the agent is assembled and share or copy it freely.
#[derive(Debug, Default, Clone, Copy)]
pub struct AnthropicMessagesCodec;

impl AnthropicMessagesCodec {
    /// Build a codec value. Usable in `const` contexts; equivalent to
    /// `AnthropicMessagesCodec::default()`.
    pub const fn new() -> Self {
        AnthropicMessagesCodec
    }
}
59
60impl Codec for AnthropicMessagesCodec {
61    fn name(&self) -> &'static str {
62        PROVIDER_KEY
63    }
64
65    fn capabilities(&self, _model: &str) -> Capabilities {
66        // Conservative full-feature defaults that match the documented
67        // Messages API. Per-model precision (vision flag for haiku-3 vs
68        // opus-4 etc.) lands in a follow-up slice.
69        Capabilities {
70            streaming: true,
71            tools: true,
72            multimodal_image: true,
73            multimodal_audio: false,
74            multimodal_video: false,
75            multimodal_document: true,
76            system_prompt: true,
77            structured_output: true,
78            prompt_caching: true,
79            thinking: true,
80            citations: true,
81            web_search: true,
82            computer_use: true,
83            max_context_tokens: 200_000,
84        }
85    }
86
87    fn auto_output_strategy(&self, _model: &str) -> crate::ir::OutputStrategy {
88        // Anthropic's native `output_config.format = json_schema`
89        // is newer than the tool-call surface and ships without a
90        // `strict` toggle — every `strict=true` request emits
91        // `LossyEncode { field: "response_format.strict" }`. The
92        // forced-tool surface is more mature, more consistent with
93        // every Anthropic version, and accepts arbitrary JSON
94        // schemas without strict-mode constraints. Pick Tool until
95        // Anthropic ships a native channel with parity.
96        crate::ir::OutputStrategy::Tool
97    }
98
99    fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
100        let (body, warnings) = build_body(request, false)?;
101        let mut encoded = finalize_request(&body, warnings)?;
102        apply_anthropic_beta_header(&mut encoded, request)?;
103        Ok(encoded)
104    }
105
106    fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
107        let (body, warnings) = build_body(request, true)?;
108        let mut encoded = finalize_request(&body, warnings)?;
109        encoded.headers.insert(
110            http::header::ACCEPT,
111            http::HeaderValue::from_static("text/event-stream"),
112        );
113        apply_anthropic_beta_header(&mut encoded, request)?;
114        Ok(encoded.into_streaming())
115    }
116
117    fn decode_stream<'a>(
118        &'a self,
119        bytes: BoxByteStream<'a>,
120        warnings_in: Vec<ModelWarning>,
121    ) -> BoxDeltaStream<'a> {
122        Box::pin(stream_anthropic_sse(bytes, warnings_in))
123    }
124
125    fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
126        let raw: Value = super::codec::parse_response_body(body, "Anthropic Messages")?;
127        let mut warnings = warnings_in;
128
129        let id = str_field(&raw, "id").to_owned();
130        let model = str_field(&raw, "model").to_owned();
131        let content = decode_content(&raw, &mut warnings);
132        let stop_reason = decode_stop_reason(&raw, &mut warnings);
133        let usage = decode_usage(&raw);
134
135        Ok(ModelResponse {
136            id,
137            model,
138            stop_reason,
139            content,
140            usage,
141            rate_limit: None,
142            warnings,
143            provider_echoes: Vec::new(),
144        })
145    }
146
147    fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
148        let mut snapshot = RateLimitSnapshot::default();
149        let mut populated = false;
150        for (header_name, target) in [
151            (
152                "anthropic-ratelimit-requests-remaining",
153                &mut snapshot.requests_remaining,
154            ),
155            (
156                "anthropic-ratelimit-tokens-remaining",
157                &mut snapshot.tokens_remaining,
158            ),
159        ] {
160            if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
161                && let Ok(parsed) = v.parse::<u64>()
162            {
163                *target = Some(parsed);
164                snapshot.raw.insert(header_name.to_owned(), v.to_owned());
165                populated = true;
166            }
167        }
168        for (header_name, target) in [
169            (
170                "anthropic-ratelimit-requests-reset",
171                &mut snapshot.requests_reset_at,
172            ),
173            (
174                "anthropic-ratelimit-tokens-reset",
175                &mut snapshot.tokens_reset_at,
176            ),
177        ] {
178            if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
179                && let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(v)
180            {
181                *target = Some(parsed.with_timezone(&chrono::Utc));
182                snapshot.raw.insert(header_name.to_owned(), v.to_owned());
183                populated = true;
184            }
185        }
186        populated.then_some(snapshot)
187    }
188}
189
190// ── body / request helpers ─────────────────────────────────────────────────
191
192fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
193    if request.messages.is_empty() {
194        return Err(Error::invalid_request(
195            "Anthropic Messages requires at least one message",
196        ));
197    }
198
199    // Anthropic Messages requires `max_tokens` — the wire field has
200    // no default. Silently injecting one would mask "model truncated
201    // unexpectedly" failures (the operator sees `stop_reason:
202    // max_tokens` without realising the codec, not the caller, set
203    // the cap). Invariant #15 — fail loud when the IR is missing a
204    // mandatory vendor field.
205    let max_tokens = request.max_tokens.ok_or_else(|| {
206        Error::invalid_request(
207            "Anthropic Messages requires max_tokens; \
208             set ModelRequest::max_tokens explicitly",
209        )
210    })?;
211
212    // Most encodes produce 0–1 warnings; pre-allocate one slot so the
213    // first push doesn't reallocate.
214    let mut warnings = Vec::with_capacity(1);
215    let (system_value, wire_messages) = encode_messages(request, &mut warnings);
216
217    let mut body = Map::new();
218    body.insert("model".into(), Value::String(request.model.clone()));
219    body.insert("messages".into(), Value::Array(wire_messages));
220    body.insert("max_tokens".into(), json!(max_tokens));
221    if let Some(value) = system_value {
222        body.insert("system".into(), value);
223    }
224    if let Some(temp) = request.temperature {
225        body.insert("temperature".into(), json!(temp));
226    }
227    if let Some(k) = request.top_k {
228        body.insert("top_k".into(), json!(k));
229    }
230    if let Some(p) = request.top_p {
231        body.insert("top_p".into(), json!(p));
232    }
233    if !request.stop_sequences.is_empty() {
234        body.insert(
235            "stop_sequences".into(),
236            json!(request.stop_sequences.clone()),
237        );
238    }
239    if !request.tools.is_empty() {
240        body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
241        body.insert(
242            "tool_choice".into(),
243            encode_tool_choice(&request.tool_choice),
244        );
245    }
246    if streaming {
247        body.insert("stream".into(), Value::Bool(true));
248    }
249    if let Some(format) = &request.response_format {
250        encode_anthropic_structured_output(format, &request.model, &mut body, &mut warnings)?;
251    }
252    apply_provider_extensions(request, &mut body, &mut warnings);
253    Ok((Value::Object(body), warnings))
254}
255
256/// Merge [`crate::ir::AnthropicExt::betas`] into the
257/// `anthropic-beta` HTTP header. The Anthropic Messages API
258/// documents one comma-separated value per request; the codec
259/// emits no header when the operator's beta list is empty.
260fn apply_anthropic_beta_header(encoded: &mut EncodedRequest, request: &ModelRequest) -> Result<()> {
261    let Some(anthropic) = &request.provider_extensions.anthropic else {
262        return Ok(());
263    };
264    if anthropic.betas.is_empty() {
265        return Ok(());
266    }
267    let value = anthropic.betas.join(",");
268    let header = http::HeaderValue::from_str(&value).map_err(|_| {
269        crate::Error::invalid_request(
270            "AnthropicExt::betas: each entry must contain only visible ASCII",
271        )
272    })?;
273    encoded
274        .headers
275        .insert(http::HeaderName::from_static("anthropic-beta"), header);
276    Ok(())
277}
278
279/// Read [`crate::ir::AnthropicExt`] and merge each set field into the
280/// wire body. Foreign-vendor extensions surface as
281/// [`ModelWarning::ProviderExtensionIgnored`] — the operator
282/// expressed an intent the Anthropic format cannot honour.
283fn apply_provider_extensions(
284    request: &ModelRequest,
285    body: &mut Map<String, Value>,
286    warnings: &mut Vec<ModelWarning>,
287) {
288    let ext = &request.provider_extensions;
289    // IR `parallel_tool_calls` translates onto Anthropic's
290    // `tool_choice.disable_parallel_tool_use` (inverted polarity).
291    // Anthropic only accepts the toggle inside a `tool_choice` block,
292    // which is only emitted when `tools` is non-empty. When the
293    // operator sets `parallel_tool_calls` with an empty tools list,
294    // surface the loss instead of silently discarding the knob.
295    if let Some(parallel) = request.parallel_tool_calls {
296        if let Some(tc) = body.get_mut("tool_choice").and_then(Value::as_object_mut) {
297            tc.insert("disable_parallel_tool_use".into(), json!(!parallel));
298        } else {
299            warnings.push(ModelWarning::LossyEncode {
300                field: "parallel_tool_calls".into(),
301                detail: "Anthropic encodes via tool_choice.disable_parallel_tool_use; \
302                         no tool_choice block (tools list empty) — knob discarded"
303                    .into(),
304            });
305        }
306    }
307    if let Some(user_id) = &request.end_user_id {
308        body.insert("metadata".into(), json!({"user_id": user_id}));
309    }
310    if request.seed.is_some() {
311        warnings.push(ModelWarning::LossyEncode {
312            field: "seed".into(),
313            detail: "Anthropic Messages has no deterministic-sampling knob — drop the field".into(),
314        });
315    }
316    if let Some(effort) = &request.reasoning_effort {
317        encode_anthropic_thinking(&request.model, effort, body, warnings);
318    }
319    if ext.openai_chat.is_some() {
320        warnings.push(ModelWarning::ProviderExtensionIgnored {
321            vendor: "openai_chat".into(),
322        });
323    }
324    if ext.openai_responses.is_some() {
325        warnings.push(ModelWarning::ProviderExtensionIgnored {
326            vendor: "openai_responses".into(),
327        });
328    }
329    if ext.gemini.is_some() {
330        warnings.push(ModelWarning::ProviderExtensionIgnored {
331            vendor: "gemini".into(),
332        });
333    }
334    if ext.bedrock.is_some() {
335        warnings.push(ModelWarning::ProviderExtensionIgnored {
336            vendor: "bedrock".into(),
337        });
338    }
339}
340
341/// Resolve [`OutputStrategy::Auto`] against the codec's preferred
342/// dispatch shape, then emit either the native `output_config`
343/// shape or the forced-tool shape into `body`. `Prompted` is
344/// rejected (1.1 — currently unsupported on every codec).
345fn encode_anthropic_structured_output(
346    format: &ResponseFormat,
347    model: &str,
348    body: &mut Map<String, Value>,
349    warnings: &mut Vec<ModelWarning>,
350) -> Result<()> {
351    // Operator-supplied `response_format` schemas route through the
352    // same envelope strip every advertised tool receives via
353    // `SchemaToolAdapter` — both Native (`output_config.format.schema`)
354    // and Tool (`tools[0].input_schema`) paths consume the
355    // pre-stripped form.
356    let stripped = crate::LlmFacingSchema::strip(&format.json_schema.schema);
357    let strategy = resolve_output_strategy(format.strategy, model);
358    match strategy {
359        OutputStrategy::Native => {
360            // Anthropic native structured outputs — `output_config`
361            // at the request root, raw JSON Schema (no wrapper, no
362            // strict toggle). Anthropic always strict-validates
363            // when the field is set.
364            body.insert(
365                "output_config".into(),
366                json!({
367                    "format": {
368                        "type": "json_schema",
369                        "schema": stripped,
370                    }
371                }),
372            );
373            if !format.strict {
374                warnings.push(ModelWarning::LossyEncode {
375                    field: "response_format.strict".into(),
376                    detail: "Anthropic always strict-validates structured output; \
377                         the strict=false request was approximated"
378                        .into(),
379                });
380            }
381        }
382        OutputStrategy::Tool => {
383            // Forced single tool call carrying the target schema.
384            // Mature surface, parity with every Anthropic version,
385            // accepts arbitrary JSON schemas without strict-mode
386            // constraints. The codec injects the synthetic tool
387            // and a `tool_choice` of `{type: "tool", name: ...}`
388            // ahead of any operator-supplied tools so the model
389            // emits exactly one `tool_use` block whose input
390            // matches the target schema.
391            let tool_name = format.json_schema.name.clone();
392            let synthetic_tool = json!({
393                "type": "custom",
394                "name": tool_name,
395                "description": format!(
396                    "Emit the response as a JSON object matching the {tool_name} schema."
397                ),
398                "input_schema": stripped,
399            });
400            // Prepend the structured-output tool so the model sees
401            // it first. Operator tools survive; the new
402            // `tool_choice` overrides any prior selection because
403            // structured-output dispatch demands a forced single
404            // tool call.
405            let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
406            if let Value::Array(arr) = tools {
407                arr.insert(0, synthetic_tool);
408            }
409            body.insert(
410                "tool_choice".into(),
411                json!({
412                    "type": "tool",
413                    "name": format.json_schema.name,
414                    // Disable parallel tool use so the model cannot
415                    // emit multiple tool_use blocks alongside the
416                    // structured-output call.
417                    "disable_parallel_tool_use": true,
418                }),
419            );
420            if !format.strict {
421                // Tool-input-schema validation is enforced at
422                // construction time by Anthropic regardless of the
423                // strict flag — surface the loss.
424                warnings.push(ModelWarning::LossyEncode {
425                    field: "response_format.strict".into(),
426                    detail: "Anthropic Tool-strategy structured output is always \
427                         schema-validated; strict=false was approximated"
428                        .into(),
429                });
430            }
431        }
432        OutputStrategy::Prompted => {
433            return Err(Error::invalid_request(
434                "OutputStrategy::Prompted is deferred to entelix 1.1; use \
435                 OutputStrategy::Native or OutputStrategy::Tool",
436            ));
437        }
438        OutputStrategy::Auto => {
439            // Resolved above — unreachable. Defensive arm.
440            return Err(Error::invalid_request(
441                "OutputStrategy::Auto did not resolve — codec invariant violation",
442            ));
443        }
444    }
445    Ok(())
446}
447
448/// Resolve [`OutputStrategy::Auto`] to Anthropic's preferred
449/// dispatch shape — currently `Tool` (the native `output_config`
450/// channel ships without a strict toggle and is less mature than
451/// the tool-call surface). The explicit per-variant arms keep the
452/// resolver readable as the cross-vendor `OutputStrategy` enum
453/// gains future variants — Clippy's `match_same_arms` would have
454/// us merge the identity arms but that hides the explicit
455/// per-variant intent.
456#[allow(clippy::match_same_arms)]
457const fn resolve_output_strategy(strategy: OutputStrategy, _model: &str) -> OutputStrategy {
458    match strategy {
459        OutputStrategy::Auto => OutputStrategy::Tool,
460        OutputStrategy::Native => OutputStrategy::Native,
461        OutputStrategy::Tool => OutputStrategy::Tool,
462        OutputStrategy::Prompted => OutputStrategy::Prompted,
463    }
464}
465
/// Report whether `model` belongs to a family treated as adaptive-only
/// for thinking, i.e. one for which the codec never emits an explicit
/// `budget_tokens`.
///
/// The decision is a plain prefix check on the model string; the
/// original note explains that no wire-level signal exists for this
/// constraint.
fn is_anthropic_adaptive_only(model: &str) -> bool {
    // Currently a single entry (Opus 4.7). Further adaptive-only
    // families are added to this table.
    const ADAPTIVE_ONLY_PREFIXES: &[&str] = &["claude-opus-4-7"];
    ADAPTIVE_ONLY_PREFIXES
        .iter()
        .any(|&prefix| model.starts_with(prefix))
}
478
479/// Translate the cross-vendor [`ReasoningEffort`] knob onto the
480/// Anthropic Messages API `thinking` field. Per:
481///
482/// - `Off` → `{type:"disabled"}`
483/// - `Minimal` → `{type:"adaptive", effort:"low"}` (LossyEncode —
484///   Anthropic's smallest adaptive bucket; closer than `enabled`
485///   with a sub-1024 budget which the vendor rejects)
486/// - `Low` → `{type:"enabled", budget_tokens:1024}` (or adaptive on
487///   Opus 4.7)
488/// - `Medium` → `{type:"enabled", budget_tokens:4096}` (or adaptive
489///   on Opus 4.7)
490/// - `High` → `{type:"enabled", budget_tokens:16384}` (or adaptive
491///   on Opus 4.7)
492/// - `Auto` → `{type:"adaptive"}`
493/// - `VendorSpecific(s)` — `s` parses as decimal `budget_tokens`
494///   (Opus 4.7 rejects this with `Error::invalid_request`).
495///   Non-numeric `s` emits `LossyEncode` and falls through to
496///   `Medium`.
497fn encode_anthropic_thinking(
498    model: &str,
499    effort: &ReasoningEffort,
500    body: &mut Map<String, Value>,
501    warnings: &mut Vec<ModelWarning>,
502) {
503    let adaptive_only = is_anthropic_adaptive_only(model);
504    let thinking = match effort {
505        ReasoningEffort::Off => {
506            json!({"type": "disabled"})
507        }
508        ReasoningEffort::Minimal => {
509            warnings.push(ModelWarning::LossyEncode {
510                field: "reasoning_effort".into(),
511                detail:
512                    "Anthropic has no `Minimal` bucket — snapped to `{type:\"adaptive\", effort:\"low\"}`"
513                        .into(),
514            });
515            json!({"type": "adaptive", "effort": "low"})
516        }
517        ReasoningEffort::Low => {
518            if adaptive_only {
519                json!({"type": "adaptive", "effort": "low"})
520            } else {
521                json!({"type": "enabled", "budget_tokens": 1024})
522            }
523        }
524        ReasoningEffort::Medium => {
525            if adaptive_only {
526                json!({"type": "adaptive", "effort": "medium"})
527            } else {
528                json!({"type": "enabled", "budget_tokens": 4096})
529            }
530        }
531        ReasoningEffort::High => {
532            if adaptive_only {
533                json!({"type": "adaptive", "effort": "high"})
534            } else {
535                json!({"type": "enabled", "budget_tokens": 16384})
536            }
537        }
538        ReasoningEffort::Auto => {
539            json!({"type": "adaptive"})
540        }
541        ReasoningEffort::VendorSpecific(literal) => {
542            if adaptive_only {
543                // Opus 4.7 manual-budget rejection — encode-time
544                // failure with a useful diagnostic. Falling back
545                // silently would let the operator's intent (raw
546                // budget tokens) hit the wire and surface as a
547                // 4xx with vague upstream wording.
548                warnings.push(ModelWarning::LossyEncode {
549                    field: "reasoning_effort".into(),
550                    detail: format!(
551                        "Anthropic {model} is adaptive-only — manual budget '{literal}' \
552                         dropped; emitting `{{type:\"adaptive\"}}` instead"
553                    ),
554                });
555                json!({"type": "adaptive"})
556            } else if let Ok(budget) = literal.parse::<u32>() {
557                json!({"type": "enabled", "budget_tokens": budget})
558            } else {
559                warnings.push(ModelWarning::LossyEncode {
560                    field: "reasoning_effort".into(),
561                    detail: format!(
562                        "Anthropic vendor-specific reasoning_effort {literal:?} is not a \
563                         numeric budget_tokens — falling through to `Medium`"
564                    ),
565                });
566                json!({"type": "enabled", "budget_tokens": 4096})
567            }
568        }
569    };
570    body.insert("thinking".into(), thinking);
571}
572
573fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
574    let bytes = serde_json::to_vec(body)?;
575    let mut encoded = EncodedRequest::post_json("/v1/messages", Bytes::from(bytes));
576    encoded.headers.insert(
577        http::HeaderName::from_static("anthropic-version"),
578        http::HeaderValue::from_static(ANTHROPIC_VERSION),
579    );
580    encoded.warnings = warnings;
581    Ok(encoded)
582}
583
584// ── encode helpers ─────────────────────────────────────────────────────────
585
586fn encode_messages(
587    request: &ModelRequest,
588    warnings: &mut Vec<ModelWarning>,
589) -> (Option<Value>, Vec<Value>) {
590    // Two collection paths:
591    // - If any IR system block carries cache_control, we emit the
592    //   array form (preserving per-block cache directives).
593    // - Otherwise the simple string form is sufficient.
594    let mut system_blocks: Vec<(String, Option<crate::ir::CacheControl>)> = request
595        .system
596        .blocks()
597        .iter()
598        .map(|b| (b.text.clone(), b.cache_control))
599        .collect();
600    let mut wire_messages = Vec::with_capacity(request.messages.len());
601
602    for (idx, msg) in request.messages.iter().enumerate() {
603        match msg.role {
604            Role::System => {
605                let mut lossy_non_text = false;
606                let text = msg
607                    .content
608                    .iter()
609                    .filter_map(|part| {
610                        if let ContentPart::Text { text, .. } = part {
611                            Some(text.clone())
612                        } else {
613                            lossy_non_text = true;
614                            None
615                        }
616                    })
617                    .collect::<Vec<_>>()
618                    .join("\n");
619                if lossy_non_text {
620                    warnings.push(ModelWarning::LossyEncode {
621                        field: format!("messages[{idx}].content"),
622                        detail: "non-text parts dropped from system message (Anthropic has no \
623                                 system role)"
624                            .into(),
625                    });
626                }
627                if !text.is_empty() {
628                    system_blocks.push((text, None));
629                }
630            }
631            Role::User | Role::Assistant | Role::Tool => {
632                let role_str = match msg.role {
633                    Role::Assistant => "assistant",
634                    _ => "user",
635                };
636                let content_array = encode_content_parts(&msg.content, warnings, idx);
637                let mut entry = Map::new();
638                entry.insert("role".into(), Value::String(role_str.into()));
639                entry.insert("content".into(), Value::Array(content_array));
640                wire_messages.push(Value::Object(entry));
641            }
642        }
643    }
644
645    // Emit the array form when any block has cache_control set;
646    // otherwise the simpler string form is enough — minimizing
647    // wire bytes when callers don't need caching.
648    let any_cached = system_blocks.iter().any(|(_, cc)| cc.is_some());
649    let system_value = if system_blocks.is_empty() {
650        None
651    } else if any_cached {
652        let array: Vec<Value> = system_blocks
653            .into_iter()
654            .map(|(text, cc)| {
655                let mut obj = Map::new();
656                obj.insert("type".into(), Value::String("text".into()));
657                obj.insert("text".into(), Value::String(text));
658                if let Some(cache) = cc {
659                    obj.insert("cache_control".into(), encode_cache_control(cache));
660                }
661                Value::Object(obj)
662            })
663            .collect();
664        Some(Value::Array(array))
665    } else {
666        Some(Value::String(
667            system_blocks
668                .into_iter()
669                .map(|(text, _)| text)
670                .collect::<Vec<_>>()
671                .join("\n\n"),
672        ))
673    };
674    (system_value, wire_messages)
675}
676
677#[allow(clippy::too_many_lines)] // dispatch over every ContentPart variant
678fn encode_content_parts(
679    parts: &[ContentPart],
680    warnings: &mut Vec<ModelWarning>,
681    msg_idx: usize,
682) -> Vec<Value> {
683    let mut out = Vec::with_capacity(parts.len());
684    for (part_idx, part) in parts.iter().enumerate() {
685        let path = || format!("messages[{msg_idx}].content[{part_idx}]");
686        match part {
687            ContentPart::Text {
688                text,
689                cache_control,
690                ..
691            } => {
692                let mut block = json_block("text", &[("text", Value::String(text.clone()))]);
693                attach_cache_control(&mut block, cache_control.as_ref());
694                out.push(Value::Object(block));
695            }
696            ContentPart::Image {
697                source,
698                cache_control,
699                ..
700            } => {
701                let mut block = json_block(
702                    "image",
703                    &[("source", encode_media_source_anthropic(source))],
704                );
705                attach_cache_control(&mut block, cache_control.as_ref());
706                out.push(Value::Object(block));
707            }
708            ContentPart::Audio { .. } => warnings.push(ModelWarning::LossyEncode {
709                field: path(),
710                detail: "Anthropic Messages does not accept audio inputs; block dropped".into(),
711            }),
712            ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
713                field: path(),
714                detail: "Anthropic Messages does not accept video inputs; block dropped".into(),
715            }),
716            ContentPart::Document {
717                source,
718                name,
719                cache_control,
720                ..
721            } => {
722                let mut block = Map::new();
723                block.insert("type".into(), Value::String("document".into()));
724                block.insert("source".into(), encode_media_source_anthropic(source));
725                if let Some(title) = name {
726                    block.insert("title".into(), Value::String(title.clone()));
727                }
728                attach_cache_control(&mut block, cache_control.as_ref());
729                out.push(Value::Object(block));
730            }
731            ContentPart::Thinking {
732                text,
733                cache_control,
734                provider_echoes,
735            } => {
736                let mut block = Map::new();
737                block.insert("type".into(), Value::String("thinking".into()));
738                block.insert("thinking".into(), Value::String(text.clone()));
739                if let Some(sig) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
740                    .and_then(|e| e.payload_str("signature"))
741                {
742                    block.insert("signature".into(), Value::String(sig.to_owned()));
743                }
744                attach_cache_control(&mut block, cache_control.as_ref());
745                out.push(Value::Object(block));
746            }
747            ContentPart::RedactedThinking { provider_echoes } => {
748                let Some(data) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
749                    .and_then(|e| e.payload_str("data"))
750                else {
751                    // No matching opaque blob — without the original
752                    // ciphertext the wire shape is unrepresentable.
753                    warnings.push(ModelWarning::LossyEncode {
754                        field: path(),
755                        detail: "redacted_thinking part missing 'anthropic-messages' \
756                                 provider_echo with 'data' payload; block dropped"
757                            .into(),
758                    });
759                    continue;
760                };
761                let mut block = Map::new();
762                block.insert("type".into(), Value::String("redacted_thinking".into()));
763                block.insert("data".into(), Value::String(data.to_owned()));
764                out.push(Value::Object(block));
765            }
766            ContentPart::Citation {
767                snippet,
768                source,
769                cache_control,
770                ..
771            } => {
772                // Anthropic carries citations inline on `text` blocks; round-tripping
773                // a standalone Citation IR variant means re-emitting the cited text
774                // with a `citations` array attached.
775                let citation_json = match source {
776                    CitationSource::Url { url, title } => {
777                        let mut o = Map::new();
778                        o.insert(
779                            "type".into(),
780                            Value::String("web_search_result_location".into()),
781                        );
782                        o.insert("url".into(), Value::String(url.clone()));
783                        if let Some(t) = title {
784                            o.insert("title".into(), Value::String(t.clone()));
785                        }
786                        Value::Object(o)
787                    }
788                    CitationSource::Document {
789                        document_index,
790                        title,
791                    } => {
792                        let mut o = Map::new();
793                        o.insert("type".into(), Value::String("char_location".into()));
794                        o.insert("document_index".into(), json!(*document_index));
795                        if let Some(t) = title {
796                            o.insert("document_title".into(), Value::String(t.clone()));
797                        }
798                        Value::Object(o)
799                    }
800                };
801                let mut block = Map::new();
802                block.insert("type".into(), Value::String("text".into()));
803                block.insert("text".into(), Value::String(snippet.clone()));
804                block.insert("citations".into(), Value::Array(vec![citation_json]));
805                attach_cache_control(&mut block, cache_control.as_ref());
806                out.push(Value::Object(block));
807            }
808            ContentPart::ToolUse {
809                id, name, input, ..
810            } => out.push(json!({
811                "type": "tool_use",
812                "id": id,
813                "name": name,
814                "input": input,
815            })),
816            ContentPart::ToolResult {
817                tool_use_id,
818                name: _,
819                content,
820                is_error,
821                cache_control,
822                ..
823            } => out.push(encode_tool_result(
824                tool_use_id,
825                content,
826                *is_error,
827                cache_control.as_ref(),
828                warnings,
829                msg_idx,
830                part_idx,
831            )),
832            ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
833                // Assistant-produced image/audio is output-only; no
834                // Anthropic input shape consumes it. Drop with a
835                // LossyEncode warning so a multi-turn replay that
836                // happens to surface the part is not silently
837                // mis-encoded as user input.
838                warnings.push(ModelWarning::LossyEncode {
839                    field: path(),
840                    detail: "Anthropic Messages does not accept assistant-produced \
841                             image / audio output as input — block dropped"
842                        .into(),
843                });
844            }
845        }
846    }
847    out
848}
849
850/// Build a JSON object block of `{type: <kind>, ...fields}`.
851fn json_block(kind: &str, fields: &[(&str, Value)]) -> Map<String, Value> {
852    let mut block = Map::new();
853    block.insert("type".into(), Value::String(kind.into()));
854    for (k, v) in fields {
855        block.insert((*k).to_owned(), v.clone());
856    }
857    block
858}
859
860/// Insert Anthropic's `cache_control` block. The wire shape is
861/// always `{type: "ephemeral"}` plus an optional sibling `ttl: "1h"`
862/// for the premium tier — `type` never carries the TTL string.
863fn attach_cache_control(block: &mut Map<String, Value>, cache: Option<&crate::ir::CacheControl>) {
864    if let Some(cache) = cache {
865        block.insert("cache_control".into(), encode_cache_control(*cache));
866    }
867}
868
869/// Render a [`crate::ir::CacheControl`] as Anthropic's
870/// `cache_control` JSON value. Single source of truth for the wire
871/// shape — every encode path (system blocks, content blocks, tool
872/// blocks, tool_result blocks) goes through here.
873fn encode_cache_control(cache: crate::ir::CacheControl) -> Value {
874    let mut obj = Map::new();
875    obj.insert("type".into(), Value::String("ephemeral".into()));
876    if let Some(ttl) = cache.ttl.wire_ttl_field() {
877        obj.insert("ttl".into(), Value::String(ttl.into()));
878    }
879    Value::Object(obj)
880}
881
882fn encode_media_source_anthropic(source: &MediaSource) -> Value {
883    match source {
884        MediaSource::Url { url, .. } => json!({
885            "type": "url",
886            "url": url,
887        }),
888        MediaSource::Base64 { media_type, data } => json!({
889            "type": "base64",
890            "media_type": media_type,
891            "data": data,
892        }),
893        MediaSource::FileId { id, .. } => json!({
894            "type": "file",
895            "file_id": id,
896        }),
897    }
898}
899
900fn encode_tool_result(
901    tool_use_id: &str,
902    content: &ToolResultContent,
903    is_error: bool,
904    cache_control: Option<&crate::ir::CacheControl>,
905    warnings: &mut Vec<ModelWarning>,
906    msg_idx: usize,
907    part_idx: usize,
908) -> Value {
909    let content_json = match content {
910        ToolResultContent::Text(s) => Value::String(s.clone()),
911        ToolResultContent::Json(v) => {
912            warnings.push(ModelWarning::LossyEncode {
913                field: format!("messages[{msg_idx}].content[{part_idx}]"),
914                detail: "tool_result Json payload stringified for Anthropic wire format".into(),
915            });
916            Value::String(v.to_string())
917        }
918    };
919    let mut block = Map::new();
920    block.insert("type".into(), Value::String("tool_result".into()));
921    block.insert("tool_use_id".into(), Value::String(tool_use_id.into()));
922    block.insert("content".into(), content_json);
923    if is_error {
924        block.insert("is_error".into(), Value::Bool(true));
925    }
926    attach_cache_control(&mut block, cache_control);
927    Value::Object(block)
928}
929
930fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
931    let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
932    for (idx, t) in tools.iter().enumerate() {
933        let mut obj = match &t.kind {
934            ToolKind::Function { input_schema } => {
935                let mut o = Map::new();
936                o.insert("name".into(), Value::String(t.name.clone()));
937                o.insert("description".into(), Value::String(t.description.clone()));
938                o.insert("input_schema".into(), input_schema.clone());
939                o
940            }
941            ToolKind::WebSearch {
942                max_uses,
943                allowed_domains,
944            } => {
945                let mut o = Map::new();
946                o.insert("type".into(), Value::String("web_search_20250305".into()));
947                o.insert("name".into(), Value::String(t.name.clone()));
948                if let Some(n) = max_uses {
949                    o.insert("max_uses".into(), json!(*n));
950                }
951                if !allowed_domains.is_empty() {
952                    o.insert("allowed_domains".into(), json!(allowed_domains));
953                }
954                o
955            }
956            ToolKind::Computer {
957                display_width,
958                display_height,
959            } => {
960                let mut o = Map::new();
961                o.insert("type".into(), Value::String("computer_20250124".into()));
962                o.insert("name".into(), Value::String(t.name.clone()));
963                o.insert("display_width_px".into(), json!(*display_width));
964                o.insert("display_height_px".into(), json!(*display_height));
965                o
966            }
967            ToolKind::TextEditor => {
968                let mut o = Map::new();
969                o.insert("type".into(), Value::String("text_editor_20250124".into()));
970                o.insert("name".into(), Value::String(t.name.clone()));
971                o
972            }
973            ToolKind::Bash => {
974                let mut o = Map::new();
975                o.insert("type".into(), Value::String("bash_20250124".into()));
976                o.insert("name".into(), Value::String(t.name.clone()));
977                o
978            }
979            ToolKind::CodeExecution => {
980                let mut o = Map::new();
981                o.insert(
982                    "type".into(),
983                    Value::String("code_execution_20250522".into()),
984                );
985                o.insert("name".into(), Value::String(t.name.clone()));
986                o
987            }
988            ToolKind::McpConnector {
989                name,
990                server_url,
991                authorization_token,
992            } => {
993                let mut o = Map::new();
994                o.insert("type".into(), Value::String("mcp".into()));
995                o.insert("name".into(), Value::String(name.clone()));
996                o.insert("server_url".into(), Value::String(server_url.clone()));
997                if let Some(token) = authorization_token {
998                    o.insert("authorization_token".into(), Value::String(token.clone()));
999                }
1000                o
1001            }
1002            ToolKind::Memory => {
1003                let mut o = Map::new();
1004                o.insert("type".into(), Value::String("memory_20250818".into()));
1005                o.insert("name".into(), Value::String(t.name.clone()));
1006                o
1007            }
1008            ToolKind::FileSearch { .. } | ToolKind::CodeInterpreter | ToolKind::ImageGeneration => {
1009                warnings.push(ModelWarning::LossyEncode {
1010                    field: format!("tools[{idx}]"),
1011                    detail: "Anthropic does not natively support OpenAI-only built-ins \
1012                             (file_search / code_interpreter / image_generation) — tool dropped"
1013                        .into(),
1014                });
1015                continue;
1016            }
1017        };
1018        attach_cache_control(&mut obj, t.cache_control.as_ref());
1019        arr.push(Value::Object(obj));
1020    }
1021    Value::Array(arr)
1022}
1023
1024fn encode_tool_choice(choice: &ToolChoice) -> Value {
1025    match choice {
1026        ToolChoice::Auto => json!({ "type": "auto" }),
1027        ToolChoice::Required => json!({ "type": "any" }),
1028        ToolChoice::Specific { name } => json!({ "type": "tool", "name": name }),
1029        ToolChoice::None => json!({ "type": "none" }),
1030    }
1031}
1032
1033// ── decode helpers ─────────────────────────────────────────────────────────
1034
1035fn decode_content(raw: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
1036    let Some(arr) = raw.get("content").and_then(Value::as_array) else {
1037        return Vec::new();
1038    };
1039    let mut out = Vec::with_capacity(arr.len());
1040    for (idx, block) in arr.iter().enumerate() {
1041        match block.get("type").and_then(Value::as_str) {
1042            Some("text") => {
1043                let text = str_field(block, "text").to_owned();
1044                // If the block carries `citations`, emit them as separate
1045                // ContentPart::Citation entries followed by the underlying
1046                // text — preserves provenance while keeping the IR variant
1047                // discriminated.
1048                if let Some(citations) = block.get("citations").and_then(Value::as_array) {
1049                    for c in citations {
1050                        if let Some(source) = decode_citation_source(c) {
1051                            out.push(ContentPart::Citation {
1052                                snippet: text.clone(),
1053                                source,
1054                                cache_control: None,
1055                                provider_echoes: Vec::new(),
1056                            });
1057                        }
1058                    }
1059                }
1060                if !text.is_empty() {
1061                    out.push(ContentPart::text(text));
1062                }
1063            }
1064            Some("thinking") => {
1065                let thinking_text = str_field(block, "thinking").to_owned();
1066                let mut provider_echoes = Vec::new();
1067                if let Some(sig) = block
1068                    .get("signature")
1069                    .and_then(Value::as_str)
1070                    .filter(|s| !s.is_empty())
1071                {
1072                    provider_echoes.push(ProviderEchoSnapshot::for_provider(
1073                        PROVIDER_KEY,
1074                        "signature",
1075                        sig.to_owned(),
1076                    ));
1077                }
1078                out.push(ContentPart::Thinking {
1079                    text: thinking_text,
1080                    cache_control: None,
1081                    provider_echoes,
1082                });
1083            }
1084            Some("redacted_thinking") => {
1085                // The entire redacted block is opaque ciphertext —
1086                // round-trips through provider_echoes; nothing else.
1087                let data = str_field(block, "data").to_owned();
1088                out.push(ContentPart::RedactedThinking {
1089                    provider_echoes: vec![ProviderEchoSnapshot::for_provider(
1090                        PROVIDER_KEY,
1091                        "data",
1092                        data,
1093                    )],
1094                });
1095            }
1096            Some("tool_use") => {
1097                let id = str_field(block, "id").to_owned();
1098                let name = str_field(block, "name").to_owned();
1099                let input = block.get("input").cloned().unwrap_or_else(|| json!({})); // silent-fallback-ok: tool_use without args = empty-args call (vendor sometimes omits)
1100                out.push(ContentPart::ToolUse {
1101                    id,
1102                    name,
1103                    input,
1104                    provider_echoes: Vec::new(),
1105                });
1106            }
1107            Some(other) => {
1108                warnings.push(ModelWarning::LossyEncode {
1109                    field: format!("response.content[{idx}]"),
1110                    detail: format!("unknown content block type '{other}' dropped"),
1111                });
1112            }
1113            None => {
1114                warnings.push(ModelWarning::LossyEncode {
1115                    field: format!("response.content[{idx}]"),
1116                    detail: "content block missing 'type' field".into(),
1117                });
1118            }
1119        }
1120    }
1121    out
1122}
1123
1124fn decode_citation_source(c: &Value) -> Option<CitationSource> {
1125    match c.get("type").and_then(Value::as_str)? {
1126        "web_search_result_location" => Some(CitationSource::Url {
1127            url: str_field(c, "url").to_owned(),
1128            title: c.get("title").and_then(Value::as_str).map(str::to_owned),
1129        }),
1130        "char_location" | "page_location" | "content_block_location" => {
1131            // Invariant #15 — `document_index` is the
1132            // citation's load-bearing pointer; defaulting to 0
1133            // would silently rewrite "vendor failed to identify the
1134            // document" as "the first document". Drop the citation
1135            // entirely when the index is missing or unparseable so
1136            // operators see the absence rather than a wrong link.
1137            let document_index = c
1138                .get("document_index")
1139                .and_then(Value::as_u64)
1140                .and_then(|n| u32::try_from(n).ok())?;
1141            Some(CitationSource::Document {
1142                document_index,
1143                title: c
1144                    .get("document_title")
1145                    .and_then(Value::as_str)
1146                    .map(str::to_owned),
1147            })
1148        }
1149        _ => None,
1150    }
1151}
1152
1153fn decode_stop_reason(raw: &Value, warnings: &mut Vec<ModelWarning>) -> StopReason {
1154    match raw.get("stop_reason").and_then(Value::as_str) {
1155        Some("end_turn") => StopReason::EndTurn,
1156        Some("max_tokens") => StopReason::MaxTokens,
1157        Some("stop_sequence") => StopReason::StopSequence {
1158            sequence: str_field(raw, "stop_sequence").to_owned(),
1159        },
1160        Some("tool_use") => StopReason::ToolUse,
1161        Some("refusal") => StopReason::Refusal {
1162            reason: RefusalReason::Safety,
1163        },
1164        Some(other) => {
1165            warnings.push(ModelWarning::UnknownStopReason {
1166                raw: other.to_owned(),
1167            });
1168            StopReason::Other {
1169                raw: other.to_owned(),
1170            }
1171        }
1172        None => {
1173            // Invariant #15 — silent EndTurn fallback was masking
1174            // truncated stream payloads from callers. Record as
1175            // Other + warning so observability sees the loss.
1176            warnings.push(ModelWarning::LossyEncode {
1177                field: "stop_reason".into(),
1178                detail: "Anthropic Messages payload carried no stop_reason — \
1179                         IR records `Other{raw:\"missing\"}`"
1180                    .into(),
1181            });
1182            StopReason::Other {
1183                raw: "missing".to_owned(),
1184            }
1185        }
1186    }
1187}
1188
1189fn decode_usage(raw: &Value) -> Usage {
1190    let usage = raw.get("usage");
1191    Usage {
1192        input_tokens: u_field(usage, "input_tokens"),
1193        output_tokens: u_field(usage, "output_tokens"),
1194        cached_input_tokens: u_field(usage, "cache_read_input_tokens"),
1195        cache_creation_input_tokens: u_field(usage, "cache_creation_input_tokens"),
1196        reasoning_tokens: 0,
1197        safety_ratings: Vec::new(),
1198    }
1199}
1200
1201fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1202    v.get(key).and_then(Value::as_str).unwrap_or("") // silent-fallback-ok: missing optional string field
1203}
1204
1205fn u_field(v: Option<&Value>, key: &str) -> u32 {
1206    v.and_then(|inner| inner.get(key))
1207        .and_then(Value::as_u64)
1208        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) // silent-fallback-ok: missing usage metric = 0 (vendor didn't report = unused); u64→u32 saturate
1209}
1210
1211// ── SSE streaming parser ───────────────────────────────────────────────────
1212
/// Per-block book-keeping the SSE parser needs to produce the right
/// `StreamDelta` on `content_block_stop`.
///
/// The parser stores one entry per content-block index when it sees
/// `content_block_start` and removes it again on `content_block_stop`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BlockKind {
    /// Recorded for a `content_block_start` whose block type is `text`.
    Text,
    /// Recorded for a `content_block_start` whose block type is `thinking`.
    Thinking,
    /// Recorded for a `content_block_start` whose block type is
    /// `tool_use`; the only kind for which `content_block_stop` yields
    /// `StreamDelta::ToolUseStop`.
    ToolUse,
}
1221
1222#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1223fn stream_anthropic_sse(
1224    bytes: BoxByteStream<'_>,
1225    warnings_in: Vec<ModelWarning>,
1226) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1227    async_stream::stream! {
1228        let mut bytes = bytes;
1229        let mut buf: Vec<u8> = Vec::new();
1230        let mut blocks: HashMap<u64, BlockKind> = HashMap::new();
1231        let mut last_stop_reason = StopReason::EndTurn;
1232        let mut accumulated_usage = Usage::default();
1233        let mut warnings_emitted = false;
1234
1235        while let Some(chunk) = bytes.next().await {
1236            match chunk {
1237                Ok(b) => buf.extend_from_slice(&b),
1238                Err(e) => {
1239                    yield Err(e);
1240                    return;
1241                }
1242            }
1243            // Emit any accumulated encode-time warnings on first byte.
1244            if !warnings_emitted {
1245                warnings_emitted = true;
1246                for w in &warnings_in {
1247                    yield Ok(StreamDelta::Warning(w.clone()));
1248                }
1249            }
1250            // Process every complete `\n\n`-terminated SSE frame.
1251            while let Some(pos) = find_double_newline(&buf) {
1252                let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1253                let Ok(frame_str) = std::str::from_utf8(&frame) else {
1254                    continue;
1255                };
1256                let Some(payload) = parse_sse_data(frame_str) else {
1257                    continue;
1258                };
1259                let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1260                    yield Err(Error::invalid_request(format!(
1261                        "Anthropic stream: malformed event payload: {payload}"
1262                    )));
1263                    return;
1264                };
1265                let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); // silent-fallback-ok: missing event type → no-match fallthrough
1266                match event_type {
1267                    "message_start" => {
1268                        let message = event.get("message").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates through child .get() chain as None
1269                        let id = str_field(message, "id").to_owned();
1270                        let model = str_field(message, "model").to_owned();
1271                        if let Some(usage) = message.get("usage") {
1272                            accumulated_usage.input_tokens = u_field(Some(usage), "input_tokens");
1273                            accumulated_usage.cached_input_tokens =
1274                                u_field(Some(usage), "cache_read_input_tokens");
1275                            accumulated_usage.cache_creation_input_tokens =
1276                                u_field(Some(usage), "cache_creation_input_tokens");
1277                        }
1278                        yield Ok(StreamDelta::Start {
1279                            id,
1280                            model,
1281                            provider_echoes: Vec::new(),
1282                        });
1283                    }
1284                    "content_block_start" => {
1285                        let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
1286                            n
1287                        } else {
1288                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1289                                field: "stream.content_block_start.index".into(),
1290                                detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 to keep stream parser progressing".into(),
1291                            }));
1292                            0
1293                        };
1294                        let block = event.get("content_block").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1295                        match block.get("type").and_then(Value::as_str) {
1296                            Some("text") => {
1297                                blocks.insert(idx, BlockKind::Text);
1298                                if let Some(text) = block.get("text").and_then(Value::as_str)
1299                                    && !text.is_empty()
1300                                {
1301                                    yield Ok(StreamDelta::TextDelta {
1302                                        text: text.to_owned(),
1303                                        provider_echoes: Vec::new(),
1304                                    });
1305                                }
1306                            }
1307                            Some("thinking") => {
1308                                blocks.insert(idx, BlockKind::Thinking);
1309                                let text = block
1310                                    .get("thinking")
1311                                    .and_then(Value::as_str)
1312                                    .unwrap_or("") // silent-fallback-ok: missing thinking text → empty body; downstream is_empty() guard suppresses the StreamDelta
1313                                    .to_owned();
1314                                // Anthropic emits an empty `signature: ""`
1315                                // placeholder on content_block_start; the
1316                                // real signature arrives later via the
1317                                // discrete `signature_delta` event. Skip
1318                                // the placeholder so the aggregated echo
1319                                // list carries only the authoritative
1320                                // value.
1321                                let provider_echoes = block
1322                                    .get("signature")
1323                                    .and_then(Value::as_str)
1324                                    .filter(|s| !s.is_empty())
1325                                    .map(|sig| {
1326                                        vec![ProviderEchoSnapshot::for_provider(
1327                                            PROVIDER_KEY,
1328                                            "signature",
1329                                            sig.to_owned(),
1330                                        )]
1331                                    })
1332                                    .unwrap_or_default();
1333                                if !text.is_empty() || !provider_echoes.is_empty() {
1334                                    yield Ok(StreamDelta::ThinkingDelta {
1335                                        text,
1336                                        provider_echoes,
1337                                    });
1338                                }
1339                            }
1340                            Some("tool_use") => {
1341                                blocks.insert(idx, BlockKind::ToolUse);
1342                                let id = str_field(block, "id").to_owned();
1343                                let name = str_field(block, "name").to_owned();
1344                                yield Ok(StreamDelta::ToolUseStart {
1345                                    id,
1346                                    name,
1347                                    provider_echoes: Vec::new(),
1348                                });
1349                            }
1350                            other => {
1351                                yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1352                                    field: format!("stream.content_block_start[{idx}]"),
1353                                    detail: format!(
1354                                        "unsupported block type {other:?} dropped"
1355                                    ),
1356                                }));
1357                            }
1358                        }
1359                    }
1360                    "content_block_delta" => {
1361                        let delta = event.get("delta").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1362                        match delta.get("type").and_then(Value::as_str) {
1363                            Some("text_delta") => {
1364                                if let Some(text) = delta.get("text").and_then(Value::as_str) {
1365                                    yield Ok(StreamDelta::TextDelta {
1366                                        text: text.to_owned(),
1367                                        provider_echoes: Vec::new(),
1368                                    });
1369                                }
1370                            }
1371                            Some("thinking_delta") => {
1372                                if let Some(text) = delta.get("thinking").and_then(Value::as_str) {
1373                                    yield Ok(StreamDelta::ThinkingDelta {
1374                                        text: text.to_owned(),
1375                                        provider_echoes: Vec::new(),
1376                                    });
1377                                }
1378                            }
1379                            Some("signature_delta") => {
1380                                if let Some(sig) =
1381                                    delta.get("signature").and_then(Value::as_str)
1382                                {
1383                                    yield Ok(StreamDelta::ThinkingDelta {
1384                                        text: String::new(),
1385                                        provider_echoes: vec![
1386                                            ProviderEchoSnapshot::for_provider(
1387                                                PROVIDER_KEY,
1388                                                "signature",
1389                                                sig.to_owned(),
1390                                            ),
1391                                        ],
1392                                    });
1393                                }
1394                            }
1395                            Some("input_json_delta") => {
1396                                if let Some(partial) =
1397                                    delta.get("partial_json").and_then(Value::as_str)
1398                                {
1399                                    yield Ok(StreamDelta::ToolUseInputDelta {
1400                                        partial_json: partial.to_owned(),
1401                                    });
1402                                }
1403                            }
1404                            other => {
1405                                yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1406                                    field: "stream.content_block_delta".into(),
1407                                    detail: format!(
1408                                        "unsupported delta type {other:?} dropped"
1409                                    ),
1410                                }));
1411                            }
1412                        }
1413                    }
1414                    "content_block_stop" => {
1415                        let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
1416                            n
1417                        } else {
1418                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1419                                field: "stream.content_block_stop.index".into(),
1420                                detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 (mirrors the content_block_start handler)".into(),
1421                            }));
1422                            0
1423                        };
1424                        if matches!(blocks.remove(&idx), Some(BlockKind::ToolUse)) {
1425                            yield Ok(StreamDelta::ToolUseStop);
1426                        }
1427                    }
1428                    "message_delta" => {
1429                        if let Some(delta) = event.get("delta")
1430                            && let Some(reason) =
1431                                delta.get("stop_reason").and_then(Value::as_str)
1432                        {
1433                            // Match the non-streaming `decode_stop_reason` exhaustiveness:
1434                            // every documented Anthropic stop reason maps explicitly,
1435                            // and any unknown raw value yields a `UnknownStopReason`
1436                            // warning delta so streaming and non-streaming clients see
1437                            // the same observability signal when a vendor extension
1438                            // arrives. Also extract `stop_sequence` so the matched
1439                            // sequence string is not dropped.
1440                            last_stop_reason = match reason {
1441                                "end_turn" => StopReason::EndTurn,
1442                                "max_tokens" => StopReason::MaxTokens,
1443                                "stop_sequence" => StopReason::StopSequence {
1444                                    sequence: delta
1445                                        .get("stop_sequence")
1446                                        .and_then(Value::as_str)
1447                                        .unwrap_or_default() // silent-fallback-ok: vendor reported stop_sequence stop reason without echoing the matched sequence string; "" preserves the StopReason variant choice
1448                                        .to_owned(),
1449                                },
1450                                "tool_use" => StopReason::ToolUse,
1451                                "refusal" => StopReason::Refusal {
1452                                    reason: RefusalReason::Safety,
1453                                },
1454                                other => {
1455                                    yield Ok(StreamDelta::Warning(
1456                                        ModelWarning::UnknownStopReason {
1457                                            raw: other.to_owned(),
1458                                        },
1459                                    ));
1460                                    StopReason::Other {
1461                                        raw: other.to_owned(),
1462                                    }
1463                                }
1464                            };
1465                        }
1466                        if let Some(usage) = event.get("usage") {
1467                            accumulated_usage.output_tokens =
1468                                u_field(Some(usage), "output_tokens");
1469                            yield Ok(StreamDelta::Usage(accumulated_usage.clone()));
1470                        }
1471                    }
1472                    "message_stop" => {
1473                        yield Ok(StreamDelta::Stop {
1474                            stop_reason: last_stop_reason.clone(),
1475                        });
1476                    }
1477                    "ping" => {
1478                        // Heartbeat — ignore.
1479                    }
1480                    "error" => {
1481                        let err = event.get("error").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates through child .get() chain as None
1482                        let kind = str_field(err, "type");
1483                        let message = str_field(err, "message");
1484                        yield Err(Error::provider_network(format!(
1485                            "Anthropic stream error ({kind}): {message}"
1486                        )));
1487                        return;
1488                    }
1489                    other => {
1490                        yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1491                            field: "stream.event".into(),
1492                            detail: format!("unknown SSE event type {other:?} ignored"),
1493                        }));
1494                    }
1495                }
1496            }
1497        }
1498    }
1499}
1500
/// Locate the earliest SSE frame terminator in `buf`.
///
/// A terminator is either a bare `\n\n` or the CRLF form `\r\n\r\n`
/// (both appear in real-world SSE traffic). Returns the byte offset at
/// which the earliest terminator *starts*, or `None` when the buffer
/// does not yet contain a complete frame boundary.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    // Walk candidate start offsets left to right; the first offset where
    // either delimiter begins is the minimum over both patterns.
    (0..buf.len()).find(|&start| {
        let tail = &buf[start..];
        tail.starts_with(b"\n\n") || tail.starts_with(b"\r\n\r\n")
    })
}
1513
/// Extract the payload carried by the `data:` lines of one SSE frame.
///
/// Every line beginning with `data:` contributes its remainder (minus a
/// single optional leading space); multiple `data:` lines are joined
/// with `\n`, as the SSE spec prescribes. Lines with any other prefix
/// are ignored. Returns `None` when the frame has no `data:` line at
/// all.
fn parse_sse_data(frame: &str) -> Option<String> {
    let pieces: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| {
            value.strip_prefix(' ').unwrap_or(value) // silent-fallback-ok: SSE data line may or may not have leading space; idiomatic strip-or-pass-through
        })
        .collect();

    // An empty `data:` line still counts as a payload (the empty string),
    // so only the complete absence of `data:` lines maps to `None`.
    if pieces.is_empty() {
        None
    } else {
        Some(pieces.join("\n"))
    }
}