1#![allow(clippy::cast_possible_truncation)] use std::collections::HashMap;
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{BoxByteStream, BoxDeltaStream, Codec, EncodedRequest};
31use crate::error::{Error, Result};
32use crate::ir::{
33 Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
34 ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, RefusalReason,
35 ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent, Usage,
36};
37use crate::rate_limit::RateLimitSnapshot;
38use crate::stream::StreamDelta;
39
/// Provider identifier for this codec: reported by `Codec::name` and used as the
/// lookup key when reading or writing provider echo snapshots.
const PROVIDER_KEY: &str = "anthropic-messages";
43
/// Value sent in the `anthropic-version` header of every encoded request.
const ANTHROPIC_VERSION: &str = "2023-06-01";
45
/// Codec for the Anthropic Messages wire format.
///
/// A field-less unit struct: it carries no state of its own.
#[derive(Debug, Default, Clone, Copy)]
pub struct AnthropicMessagesCodec;
52
53impl AnthropicMessagesCodec {
54 pub const fn new() -> Self {
56 Self
57 }
58}
59
60impl Codec for AnthropicMessagesCodec {
61 fn name(&self) -> &'static str {
62 PROVIDER_KEY
63 }
64
65 fn capabilities(&self, _model: &str) -> Capabilities {
66 Capabilities {
70 streaming: true,
71 tools: true,
72 multimodal_image: true,
73 multimodal_audio: false,
74 multimodal_video: false,
75 multimodal_document: true,
76 system_prompt: true,
77 structured_output: true,
78 prompt_caching: true,
79 thinking: true,
80 citations: true,
81 web_search: true,
82 computer_use: true,
83 max_context_tokens: 200_000,
84 }
85 }
86
87 fn auto_output_strategy(&self, _model: &str) -> crate::ir::OutputStrategy {
88 crate::ir::OutputStrategy::Tool
97 }
98
99 fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
100 let (body, warnings) = build_body(request, false)?;
101 let mut encoded = finalize_request(&body, warnings)?;
102 apply_anthropic_beta_header(&mut encoded, request)?;
103 Ok(encoded)
104 }
105
106 fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
107 let (body, warnings) = build_body(request, true)?;
108 let mut encoded = finalize_request(&body, warnings)?;
109 encoded.headers.insert(
110 http::header::ACCEPT,
111 http::HeaderValue::from_static("text/event-stream"),
112 );
113 apply_anthropic_beta_header(&mut encoded, request)?;
114 Ok(encoded.into_streaming())
115 }
116
117 fn decode_stream<'a>(
118 &'a self,
119 bytes: BoxByteStream<'a>,
120 warnings_in: Vec<ModelWarning>,
121 ) -> BoxDeltaStream<'a> {
122 Box::pin(stream_anthropic_sse(bytes, warnings_in))
123 }
124
125 fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
126 let raw: Value = super::codec::parse_response_body(body, "Anthropic Messages")?;
127 let mut warnings = warnings_in;
128
129 let id = str_field(&raw, "id").to_owned();
130 let model = str_field(&raw, "model").to_owned();
131 let content = decode_content(&raw, &mut warnings);
132 let stop_reason = decode_stop_reason(&raw, &mut warnings);
133 let usage = decode_usage(&raw);
134
135 Ok(ModelResponse {
136 id,
137 model,
138 stop_reason,
139 content,
140 usage,
141 rate_limit: None,
142 warnings,
143 provider_echoes: Vec::new(),
144 })
145 }
146
147 fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
148 let mut snapshot = RateLimitSnapshot::default();
149 let mut populated = false;
150 for (header_name, target) in [
151 (
152 "anthropic-ratelimit-requests-remaining",
153 &mut snapshot.requests_remaining,
154 ),
155 (
156 "anthropic-ratelimit-tokens-remaining",
157 &mut snapshot.tokens_remaining,
158 ),
159 ] {
160 if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
161 && let Ok(parsed) = v.parse::<u64>()
162 {
163 *target = Some(parsed);
164 snapshot.raw.insert(header_name.to_owned(), v.to_owned());
165 populated = true;
166 }
167 }
168 for (header_name, target) in [
169 (
170 "anthropic-ratelimit-requests-reset",
171 &mut snapshot.requests_reset_at,
172 ),
173 (
174 "anthropic-ratelimit-tokens-reset",
175 &mut snapshot.tokens_reset_at,
176 ),
177 ] {
178 if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
179 && let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(v)
180 {
181 *target = Some(parsed.with_timezone(&chrono::Utc));
182 snapshot.raw.insert(header_name.to_owned(), v.to_owned());
183 populated = true;
184 }
185 }
186 populated.then_some(snapshot)
187 }
188}
189
190fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
193 if request.messages.is_empty() {
194 return Err(Error::invalid_request(
195 "Anthropic Messages requires at least one message",
196 ));
197 }
198
199 let max_tokens = request.max_tokens.ok_or_else(|| {
206 Error::invalid_request(
207 "Anthropic Messages requires max_tokens; \
208 set ModelRequest::max_tokens explicitly",
209 )
210 })?;
211
212 let mut warnings = Vec::with_capacity(1);
215 let (system_value, wire_messages) = encode_messages(request, &mut warnings);
216
217 let mut body = Map::new();
218 body.insert("model".into(), Value::String(request.model.clone()));
219 body.insert("messages".into(), Value::Array(wire_messages));
220 body.insert("max_tokens".into(), json!(max_tokens));
221 if let Some(value) = system_value {
222 body.insert("system".into(), value);
223 }
224 if let Some(temp) = request.temperature {
225 body.insert("temperature".into(), json!(temp));
226 }
227 if let Some(k) = request.top_k {
228 body.insert("top_k".into(), json!(k));
229 }
230 if let Some(p) = request.top_p {
231 body.insert("top_p".into(), json!(p));
232 }
233 if !request.stop_sequences.is_empty() {
234 body.insert(
235 "stop_sequences".into(),
236 json!(request.stop_sequences.clone()),
237 );
238 }
239 if !request.tools.is_empty() {
240 body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
241 body.insert(
242 "tool_choice".into(),
243 encode_tool_choice(&request.tool_choice),
244 );
245 }
246 if streaming {
247 body.insert("stream".into(), Value::Bool(true));
248 }
249 if let Some(format) = &request.response_format {
250 encode_anthropic_structured_output(format, &request.model, &mut body, &mut warnings)?;
251 }
252 apply_provider_extensions(request, &mut body, &mut warnings);
253 Ok((Value::Object(body), warnings))
254}
255
256fn apply_anthropic_beta_header(encoded: &mut EncodedRequest, request: &ModelRequest) -> Result<()> {
261 let Some(anthropic) = &request.provider_extensions.anthropic else {
262 return Ok(());
263 };
264 if anthropic.betas.is_empty() {
265 return Ok(());
266 }
267 let value = anthropic.betas.join(",");
268 let header = http::HeaderValue::from_str(&value).map_err(|_| {
269 crate::Error::invalid_request(
270 "AnthropicExt::betas: each entry must contain only visible ASCII",
271 )
272 })?;
273 encoded
274 .headers
275 .insert(http::HeaderName::from_static("anthropic-beta"), header);
276 Ok(())
277}
278
279fn apply_provider_extensions(
284 request: &ModelRequest,
285 body: &mut Map<String, Value>,
286 warnings: &mut Vec<ModelWarning>,
287) {
288 let ext = &request.provider_extensions;
289 if let Some(parallel) = request.parallel_tool_calls {
296 if let Some(tc) = body.get_mut("tool_choice").and_then(Value::as_object_mut) {
297 tc.insert("disable_parallel_tool_use".into(), json!(!parallel));
298 } else {
299 warnings.push(ModelWarning::LossyEncode {
300 field: "parallel_tool_calls".into(),
301 detail: "Anthropic encodes via tool_choice.disable_parallel_tool_use; \
302 no tool_choice block (tools list empty) — knob discarded"
303 .into(),
304 });
305 }
306 }
307 if let Some(user_id) = &request.end_user_id {
308 body.insert("metadata".into(), json!({"user_id": user_id}));
309 }
310 if request.seed.is_some() {
311 warnings.push(ModelWarning::LossyEncode {
312 field: "seed".into(),
313 detail: "Anthropic Messages has no deterministic-sampling knob — drop the field".into(),
314 });
315 }
316 if let Some(effort) = &request.reasoning_effort {
317 encode_anthropic_thinking(&request.model, effort, body, warnings);
318 }
319 if ext.openai_chat.is_some() {
320 warnings.push(ModelWarning::ProviderExtensionIgnored {
321 vendor: "openai_chat".into(),
322 });
323 }
324 if ext.openai_responses.is_some() {
325 warnings.push(ModelWarning::ProviderExtensionIgnored {
326 vendor: "openai_responses".into(),
327 });
328 }
329 if ext.gemini.is_some() {
330 warnings.push(ModelWarning::ProviderExtensionIgnored {
331 vendor: "gemini".into(),
332 });
333 }
334 if ext.bedrock.is_some() {
335 warnings.push(ModelWarning::ProviderExtensionIgnored {
336 vendor: "bedrock".into(),
337 });
338 }
339}
340
341fn encode_anthropic_structured_output(
346 format: &ResponseFormat,
347 model: &str,
348 body: &mut Map<String, Value>,
349 warnings: &mut Vec<ModelWarning>,
350) -> Result<()> {
351 let stripped = crate::LlmFacingSchema::strip(&format.json_schema.schema);
357 let strategy = resolve_output_strategy(format.strategy, model);
358 match strategy {
359 OutputStrategy::Native => {
360 body.insert(
365 "output_config".into(),
366 json!({
367 "format": {
368 "type": "json_schema",
369 "schema": stripped,
370 }
371 }),
372 );
373 if !format.strict {
374 warnings.push(ModelWarning::LossyEncode {
375 field: "response_format.strict".into(),
376 detail: "Anthropic always strict-validates structured output; \
377 the strict=false request was approximated"
378 .into(),
379 });
380 }
381 }
382 OutputStrategy::Tool => {
383 let tool_name = format.json_schema.name.clone();
392 let synthetic_tool = json!({
393 "type": "custom",
394 "name": tool_name,
395 "description": format!(
396 "Emit the response as a JSON object matching the {tool_name} schema."
397 ),
398 "input_schema": stripped,
399 });
400 let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
406 if let Value::Array(arr) = tools {
407 arr.insert(0, synthetic_tool);
408 }
409 body.insert(
410 "tool_choice".into(),
411 json!({
412 "type": "tool",
413 "name": format.json_schema.name,
414 "disable_parallel_tool_use": true,
418 }),
419 );
420 if !format.strict {
421 warnings.push(ModelWarning::LossyEncode {
425 field: "response_format.strict".into(),
426 detail: "Anthropic Tool-strategy structured output is always \
427 schema-validated; strict=false was approximated"
428 .into(),
429 });
430 }
431 }
432 OutputStrategy::Prompted => {
433 return Err(Error::invalid_request(
434 "OutputStrategy::Prompted is deferred to entelix 1.1; use \
435 OutputStrategy::Native or OutputStrategy::Tool",
436 ));
437 }
438 OutputStrategy::Auto => {
439 return Err(Error::invalid_request(
441 "OutputStrategy::Auto did not resolve — codec invariant violation",
442 ));
443 }
444 }
445 Ok(())
446}
447
448#[allow(clippy::match_same_arms)]
457const fn resolve_output_strategy(strategy: OutputStrategy, _model: &str) -> OutputStrategy {
458 match strategy {
459 OutputStrategy::Auto => OutputStrategy::Tool,
460 OutputStrategy::Native => OutputStrategy::Native,
461 OutputStrategy::Tool => OutputStrategy::Tool,
462 OutputStrategy::Prompted => OutputStrategy::Prompted,
463 }
464}
465
/// Returns `true` when the model id starts with `claude-opus-4-7`.
///
/// Callers use this to pick adaptive thinking instead of a manual token budget.
fn is_anthropic_adaptive_only(model: &str) -> bool {
    const ADAPTIVE_ONLY_PREFIX: &str = "claude-opus-4-7";
    model.strip_prefix(ADAPTIVE_ONLY_PREFIX).is_some()
}
478
479fn encode_anthropic_thinking(
498 model: &str,
499 effort: &ReasoningEffort,
500 body: &mut Map<String, Value>,
501 warnings: &mut Vec<ModelWarning>,
502) {
503 let adaptive_only = is_anthropic_adaptive_only(model);
504 let thinking = match effort {
505 ReasoningEffort::Off => {
506 json!({"type": "disabled"})
507 }
508 ReasoningEffort::Minimal => {
509 warnings.push(ModelWarning::LossyEncode {
510 field: "reasoning_effort".into(),
511 detail:
512 "Anthropic has no `Minimal` bucket — snapped to `{type:\"adaptive\", effort:\"low\"}`"
513 .into(),
514 });
515 json!({"type": "adaptive", "effort": "low"})
516 }
517 ReasoningEffort::Low => {
518 if adaptive_only {
519 json!({"type": "adaptive", "effort": "low"})
520 } else {
521 json!({"type": "enabled", "budget_tokens": 1024})
522 }
523 }
524 ReasoningEffort::Medium => {
525 if adaptive_only {
526 json!({"type": "adaptive", "effort": "medium"})
527 } else {
528 json!({"type": "enabled", "budget_tokens": 4096})
529 }
530 }
531 ReasoningEffort::High => {
532 if adaptive_only {
533 json!({"type": "adaptive", "effort": "high"})
534 } else {
535 json!({"type": "enabled", "budget_tokens": 16384})
536 }
537 }
538 ReasoningEffort::Auto => {
539 json!({"type": "adaptive"})
540 }
541 ReasoningEffort::VendorSpecific(literal) => {
542 if adaptive_only {
543 warnings.push(ModelWarning::LossyEncode {
549 field: "reasoning_effort".into(),
550 detail: format!(
551 "Anthropic {model} is adaptive-only — manual budget '{literal}' \
552 dropped; emitting `{{type:\"adaptive\"}}` instead"
553 ),
554 });
555 json!({"type": "adaptive"})
556 } else if let Ok(budget) = literal.parse::<u32>() {
557 json!({"type": "enabled", "budget_tokens": budget})
558 } else {
559 warnings.push(ModelWarning::LossyEncode {
560 field: "reasoning_effort".into(),
561 detail: format!(
562 "Anthropic vendor-specific reasoning_effort {literal:?} is not a \
563 numeric budget_tokens — falling through to `Medium`"
564 ),
565 });
566 json!({"type": "enabled", "budget_tokens": 4096})
567 }
568 }
569 };
570 body.insert("thinking".into(), thinking);
571}
572
573fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
574 let bytes = serde_json::to_vec(body)?;
575 let mut encoded = EncodedRequest::post_json("/v1/messages", Bytes::from(bytes));
576 encoded.headers.insert(
577 http::HeaderName::from_static("anthropic-version"),
578 http::HeaderValue::from_static(ANTHROPIC_VERSION),
579 );
580 encoded.warnings = warnings;
581 Ok(encoded)
582}
583
584fn encode_messages(
587 request: &ModelRequest,
588 warnings: &mut Vec<ModelWarning>,
589) -> (Option<Value>, Vec<Value>) {
590 let mut system_blocks: Vec<(String, Option<crate::ir::CacheControl>)> = request
595 .system
596 .blocks()
597 .iter()
598 .map(|b| (b.text.clone(), b.cache_control))
599 .collect();
600 let mut wire_messages = Vec::with_capacity(request.messages.len());
601
602 for (idx, msg) in request.messages.iter().enumerate() {
603 match msg.role {
604 Role::System => {
605 let mut lossy_non_text = false;
606 let text = msg
607 .content
608 .iter()
609 .filter_map(|part| {
610 if let ContentPart::Text { text, .. } = part {
611 Some(text.clone())
612 } else {
613 lossy_non_text = true;
614 None
615 }
616 })
617 .collect::<Vec<_>>()
618 .join("\n");
619 if lossy_non_text {
620 warnings.push(ModelWarning::LossyEncode {
621 field: format!("messages[{idx}].content"),
622 detail: "non-text parts dropped from system message (Anthropic has no \
623 system role)"
624 .into(),
625 });
626 }
627 if !text.is_empty() {
628 system_blocks.push((text, None));
629 }
630 }
631 Role::User | Role::Assistant | Role::Tool => {
632 let role_str = match msg.role {
633 Role::Assistant => "assistant",
634 _ => "user",
635 };
636 let content_array = encode_content_parts(&msg.content, warnings, idx);
637 let mut entry = Map::new();
638 entry.insert("role".into(), Value::String(role_str.into()));
639 entry.insert("content".into(), Value::Array(content_array));
640 wire_messages.push(Value::Object(entry));
641 }
642 }
643 }
644
645 let any_cached = system_blocks.iter().any(|(_, cc)| cc.is_some());
649 let system_value = if system_blocks.is_empty() {
650 None
651 } else if any_cached {
652 let array: Vec<Value> = system_blocks
653 .into_iter()
654 .map(|(text, cc)| {
655 let mut obj = Map::new();
656 obj.insert("type".into(), Value::String("text".into()));
657 obj.insert("text".into(), Value::String(text));
658 if let Some(cache) = cc {
659 obj.insert("cache_control".into(), encode_cache_control(cache));
660 }
661 Value::Object(obj)
662 })
663 .collect();
664 Some(Value::Array(array))
665 } else {
666 Some(Value::String(
667 system_blocks
668 .into_iter()
669 .map(|(text, _)| text)
670 .collect::<Vec<_>>()
671 .join("\n\n"),
672 ))
673 };
674 (system_value, wire_messages)
675}
676
677#[allow(clippy::too_many_lines)] fn encode_content_parts(
679 parts: &[ContentPart],
680 warnings: &mut Vec<ModelWarning>,
681 msg_idx: usize,
682) -> Vec<Value> {
683 let mut out = Vec::with_capacity(parts.len());
684 for (part_idx, part) in parts.iter().enumerate() {
685 let path = || format!("messages[{msg_idx}].content[{part_idx}]");
686 match part {
687 ContentPart::Text {
688 text,
689 cache_control,
690 ..
691 } => {
692 let mut block = json_block("text", &[("text", Value::String(text.clone()))]);
693 attach_cache_control(&mut block, cache_control.as_ref());
694 out.push(Value::Object(block));
695 }
696 ContentPart::Image {
697 source,
698 cache_control,
699 ..
700 } => {
701 let mut block = json_block(
702 "image",
703 &[("source", encode_media_source_anthropic(source))],
704 );
705 attach_cache_control(&mut block, cache_control.as_ref());
706 out.push(Value::Object(block));
707 }
708 ContentPart::Audio { .. } => warnings.push(ModelWarning::LossyEncode {
709 field: path(),
710 detail: "Anthropic Messages does not accept audio inputs; block dropped".into(),
711 }),
712 ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
713 field: path(),
714 detail: "Anthropic Messages does not accept video inputs; block dropped".into(),
715 }),
716 ContentPart::Document {
717 source,
718 name,
719 cache_control,
720 ..
721 } => {
722 let mut block = Map::new();
723 block.insert("type".into(), Value::String("document".into()));
724 block.insert("source".into(), encode_media_source_anthropic(source));
725 if let Some(title) = name {
726 block.insert("title".into(), Value::String(title.clone()));
727 }
728 attach_cache_control(&mut block, cache_control.as_ref());
729 out.push(Value::Object(block));
730 }
731 ContentPart::Thinking {
732 text,
733 cache_control,
734 provider_echoes,
735 } => {
736 let mut block = Map::new();
737 block.insert("type".into(), Value::String("thinking".into()));
738 block.insert("thinking".into(), Value::String(text.clone()));
739 if let Some(sig) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
740 .and_then(|e| e.payload_str("signature"))
741 {
742 block.insert("signature".into(), Value::String(sig.to_owned()));
743 }
744 attach_cache_control(&mut block, cache_control.as_ref());
745 out.push(Value::Object(block));
746 }
747 ContentPart::RedactedThinking { provider_echoes } => {
748 let Some(data) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
749 .and_then(|e| e.payload_str("data"))
750 else {
751 warnings.push(ModelWarning::LossyEncode {
754 field: path(),
755 detail: "redacted_thinking part missing 'anthropic-messages' \
756 provider_echo with 'data' payload; block dropped"
757 .into(),
758 });
759 continue;
760 };
761 let mut block = Map::new();
762 block.insert("type".into(), Value::String("redacted_thinking".into()));
763 block.insert("data".into(), Value::String(data.to_owned()));
764 out.push(Value::Object(block));
765 }
766 ContentPart::Citation {
767 snippet,
768 source,
769 cache_control,
770 ..
771 } => {
772 let citation_json = match source {
776 CitationSource::Url { url, title } => {
777 let mut o = Map::new();
778 o.insert(
779 "type".into(),
780 Value::String("web_search_result_location".into()),
781 );
782 o.insert("url".into(), Value::String(url.clone()));
783 if let Some(t) = title {
784 o.insert("title".into(), Value::String(t.clone()));
785 }
786 Value::Object(o)
787 }
788 CitationSource::Document {
789 document_index,
790 title,
791 } => {
792 let mut o = Map::new();
793 o.insert("type".into(), Value::String("char_location".into()));
794 o.insert("document_index".into(), json!(*document_index));
795 if let Some(t) = title {
796 o.insert("document_title".into(), Value::String(t.clone()));
797 }
798 Value::Object(o)
799 }
800 };
801 let mut block = Map::new();
802 block.insert("type".into(), Value::String("text".into()));
803 block.insert("text".into(), Value::String(snippet.clone()));
804 block.insert("citations".into(), Value::Array(vec![citation_json]));
805 attach_cache_control(&mut block, cache_control.as_ref());
806 out.push(Value::Object(block));
807 }
808 ContentPart::ToolUse {
809 id, name, input, ..
810 } => out.push(json!({
811 "type": "tool_use",
812 "id": id,
813 "name": name,
814 "input": input,
815 })),
816 ContentPart::ToolResult {
817 tool_use_id,
818 name: _,
819 content,
820 is_error,
821 cache_control,
822 ..
823 } => out.push(encode_tool_result(
824 tool_use_id,
825 content,
826 *is_error,
827 cache_control.as_ref(),
828 warnings,
829 msg_idx,
830 part_idx,
831 )),
832 ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
833 warnings.push(ModelWarning::LossyEncode {
839 field: path(),
840 detail: "Anthropic Messages does not accept assistant-produced \
841 image / audio output as input — block dropped"
842 .into(),
843 });
844 }
845 }
846 }
847 out
848}
849
850fn json_block(kind: &str, fields: &[(&str, Value)]) -> Map<String, Value> {
852 let mut block = Map::new();
853 block.insert("type".into(), Value::String(kind.into()));
854 for (k, v) in fields {
855 block.insert((*k).to_owned(), v.clone());
856 }
857 block
858}
859
860fn attach_cache_control(block: &mut Map<String, Value>, cache: Option<&crate::ir::CacheControl>) {
864 if let Some(cache) = cache {
865 block.insert("cache_control".into(), encode_cache_control(*cache));
866 }
867}
868
869fn encode_cache_control(cache: crate::ir::CacheControl) -> Value {
874 let mut obj = Map::new();
875 obj.insert("type".into(), Value::String("ephemeral".into()));
876 if let Some(ttl) = cache.ttl.wire_ttl_field() {
877 obj.insert("ttl".into(), Value::String(ttl.into()));
878 }
879 Value::Object(obj)
880}
881
882fn encode_media_source_anthropic(source: &MediaSource) -> Value {
883 match source {
884 MediaSource::Url { url, .. } => json!({
885 "type": "url",
886 "url": url,
887 }),
888 MediaSource::Base64 { media_type, data } => json!({
889 "type": "base64",
890 "media_type": media_type,
891 "data": data,
892 }),
893 MediaSource::FileId { id, .. } => json!({
894 "type": "file",
895 "file_id": id,
896 }),
897 }
898}
899
900fn encode_tool_result(
901 tool_use_id: &str,
902 content: &ToolResultContent,
903 is_error: bool,
904 cache_control: Option<&crate::ir::CacheControl>,
905 warnings: &mut Vec<ModelWarning>,
906 msg_idx: usize,
907 part_idx: usize,
908) -> Value {
909 let content_json = match content {
910 ToolResultContent::Text(s) => Value::String(s.clone()),
911 ToolResultContent::Json(v) => {
912 warnings.push(ModelWarning::LossyEncode {
913 field: format!("messages[{msg_idx}].content[{part_idx}]"),
914 detail: "tool_result Json payload stringified for Anthropic wire format".into(),
915 });
916 Value::String(v.to_string())
917 }
918 };
919 let mut block = Map::new();
920 block.insert("type".into(), Value::String("tool_result".into()));
921 block.insert("tool_use_id".into(), Value::String(tool_use_id.into()));
922 block.insert("content".into(), content_json);
923 if is_error {
924 block.insert("is_error".into(), Value::Bool(true));
925 }
926 attach_cache_control(&mut block, cache_control);
927 Value::Object(block)
928}
929
930fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
931 let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
932 for (idx, t) in tools.iter().enumerate() {
933 let mut obj = match &t.kind {
934 ToolKind::Function { input_schema } => {
935 let mut o = Map::new();
936 o.insert("name".into(), Value::String(t.name.clone()));
937 o.insert("description".into(), Value::String(t.description.clone()));
938 o.insert("input_schema".into(), input_schema.clone());
939 o
940 }
941 ToolKind::WebSearch {
942 max_uses,
943 allowed_domains,
944 } => {
945 let mut o = Map::new();
946 o.insert("type".into(), Value::String("web_search_20250305".into()));
947 o.insert("name".into(), Value::String(t.name.clone()));
948 if let Some(n) = max_uses {
949 o.insert("max_uses".into(), json!(*n));
950 }
951 if !allowed_domains.is_empty() {
952 o.insert("allowed_domains".into(), json!(allowed_domains));
953 }
954 o
955 }
956 ToolKind::Computer {
957 display_width,
958 display_height,
959 } => {
960 let mut o = Map::new();
961 o.insert("type".into(), Value::String("computer_20250124".into()));
962 o.insert("name".into(), Value::String(t.name.clone()));
963 o.insert("display_width_px".into(), json!(*display_width));
964 o.insert("display_height_px".into(), json!(*display_height));
965 o
966 }
967 ToolKind::TextEditor => {
968 let mut o = Map::new();
969 o.insert("type".into(), Value::String("text_editor_20250124".into()));
970 o.insert("name".into(), Value::String(t.name.clone()));
971 o
972 }
973 ToolKind::Bash => {
974 let mut o = Map::new();
975 o.insert("type".into(), Value::String("bash_20250124".into()));
976 o.insert("name".into(), Value::String(t.name.clone()));
977 o
978 }
979 ToolKind::CodeExecution => {
980 let mut o = Map::new();
981 o.insert(
982 "type".into(),
983 Value::String("code_execution_20250522".into()),
984 );
985 o.insert("name".into(), Value::String(t.name.clone()));
986 o
987 }
988 ToolKind::McpConnector {
989 name,
990 server_url,
991 authorization_token,
992 } => {
993 let mut o = Map::new();
994 o.insert("type".into(), Value::String("mcp".into()));
995 o.insert("name".into(), Value::String(name.clone()));
996 o.insert("server_url".into(), Value::String(server_url.clone()));
997 if let Some(token) = authorization_token {
998 o.insert("authorization_token".into(), Value::String(token.clone()));
999 }
1000 o
1001 }
1002 ToolKind::Memory => {
1003 let mut o = Map::new();
1004 o.insert("type".into(), Value::String("memory_20250818".into()));
1005 o.insert("name".into(), Value::String(t.name.clone()));
1006 o
1007 }
1008 ToolKind::FileSearch { .. } | ToolKind::CodeInterpreter | ToolKind::ImageGeneration => {
1009 warnings.push(ModelWarning::LossyEncode {
1010 field: format!("tools[{idx}]"),
1011 detail: "Anthropic does not natively support OpenAI-only built-ins \
1012 (file_search / code_interpreter / image_generation) — tool dropped"
1013 .into(),
1014 });
1015 continue;
1016 }
1017 };
1018 attach_cache_control(&mut obj, t.cache_control.as_ref());
1019 arr.push(Value::Object(obj));
1020 }
1021 Value::Array(arr)
1022}
1023
1024fn encode_tool_choice(choice: &ToolChoice) -> Value {
1025 match choice {
1026 ToolChoice::Auto => json!({ "type": "auto" }),
1027 ToolChoice::Required => json!({ "type": "any" }),
1028 ToolChoice::Specific { name } => json!({ "type": "tool", "name": name }),
1029 ToolChoice::None => json!({ "type": "none" }),
1030 }
1031}
1032
1033fn decode_content(raw: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
1036 let Some(arr) = raw.get("content").and_then(Value::as_array) else {
1037 return Vec::new();
1038 };
1039 let mut out = Vec::with_capacity(arr.len());
1040 for (idx, block) in arr.iter().enumerate() {
1041 match block.get("type").and_then(Value::as_str) {
1042 Some("text") => {
1043 let text = str_field(block, "text").to_owned();
1044 if let Some(citations) = block.get("citations").and_then(Value::as_array) {
1049 for c in citations {
1050 if let Some(source) = decode_citation_source(c) {
1051 out.push(ContentPart::Citation {
1052 snippet: text.clone(),
1053 source,
1054 cache_control: None,
1055 provider_echoes: Vec::new(),
1056 });
1057 }
1058 }
1059 }
1060 if !text.is_empty() {
1061 out.push(ContentPart::text(text));
1062 }
1063 }
1064 Some("thinking") => {
1065 let thinking_text = str_field(block, "thinking").to_owned();
1066 let mut provider_echoes = Vec::new();
1067 if let Some(sig) = block
1068 .get("signature")
1069 .and_then(Value::as_str)
1070 .filter(|s| !s.is_empty())
1071 {
1072 provider_echoes.push(ProviderEchoSnapshot::for_provider(
1073 PROVIDER_KEY,
1074 "signature",
1075 sig.to_owned(),
1076 ));
1077 }
1078 out.push(ContentPart::Thinking {
1079 text: thinking_text,
1080 cache_control: None,
1081 provider_echoes,
1082 });
1083 }
1084 Some("redacted_thinking") => {
1085 let data = str_field(block, "data").to_owned();
1088 out.push(ContentPart::RedactedThinking {
1089 provider_echoes: vec![ProviderEchoSnapshot::for_provider(
1090 PROVIDER_KEY,
1091 "data",
1092 data,
1093 )],
1094 });
1095 }
1096 Some("tool_use") => {
1097 let id = str_field(block, "id").to_owned();
1098 let name = str_field(block, "name").to_owned();
1099 let input = block.get("input").cloned().unwrap_or_else(|| json!({})); out.push(ContentPart::ToolUse {
1101 id,
1102 name,
1103 input,
1104 provider_echoes: Vec::new(),
1105 });
1106 }
1107 Some(other) => {
1108 warnings.push(ModelWarning::LossyEncode {
1109 field: format!("response.content[{idx}]"),
1110 detail: format!("unknown content block type '{other}' dropped"),
1111 });
1112 }
1113 None => {
1114 warnings.push(ModelWarning::LossyEncode {
1115 field: format!("response.content[{idx}]"),
1116 detail: "content block missing 'type' field".into(),
1117 });
1118 }
1119 }
1120 }
1121 out
1122}
1123
1124fn decode_citation_source(c: &Value) -> Option<CitationSource> {
1125 match c.get("type").and_then(Value::as_str)? {
1126 "web_search_result_location" => Some(CitationSource::Url {
1127 url: str_field(c, "url").to_owned(),
1128 title: c.get("title").and_then(Value::as_str).map(str::to_owned),
1129 }),
1130 "char_location" | "page_location" | "content_block_location" => {
1131 let document_index = c
1138 .get("document_index")
1139 .and_then(Value::as_u64)
1140 .and_then(|n| u32::try_from(n).ok())?;
1141 Some(CitationSource::Document {
1142 document_index,
1143 title: c
1144 .get("document_title")
1145 .and_then(Value::as_str)
1146 .map(str::to_owned),
1147 })
1148 }
1149 _ => None,
1150 }
1151}
1152
1153fn decode_stop_reason(raw: &Value, warnings: &mut Vec<ModelWarning>) -> StopReason {
1154 match raw.get("stop_reason").and_then(Value::as_str) {
1155 Some("end_turn") => StopReason::EndTurn,
1156 Some("max_tokens") => StopReason::MaxTokens,
1157 Some("stop_sequence") => StopReason::StopSequence {
1158 sequence: str_field(raw, "stop_sequence").to_owned(),
1159 },
1160 Some("tool_use") => StopReason::ToolUse,
1161 Some("refusal") => StopReason::Refusal {
1162 reason: RefusalReason::Safety,
1163 },
1164 Some(other) => {
1165 warnings.push(ModelWarning::UnknownStopReason {
1166 raw: other.to_owned(),
1167 });
1168 StopReason::Other {
1169 raw: other.to_owned(),
1170 }
1171 }
1172 None => {
1173 warnings.push(ModelWarning::LossyEncode {
1177 field: "stop_reason".into(),
1178 detail: "Anthropic Messages payload carried no stop_reason — \
1179 IR records `Other{raw:\"missing\"}`"
1180 .into(),
1181 });
1182 StopReason::Other {
1183 raw: "missing".to_owned(),
1184 }
1185 }
1186 }
1187}
1188
1189fn decode_usage(raw: &Value) -> Usage {
1190 let usage = raw.get("usage");
1191 Usage {
1192 input_tokens: u_field(usage, "input_tokens"),
1193 output_tokens: u_field(usage, "output_tokens"),
1194 cached_input_tokens: u_field(usage, "cache_read_input_tokens"),
1195 cache_creation_input_tokens: u_field(usage, "cache_creation_input_tokens"),
1196 reasoning_tokens: 0,
1197 safety_ratings: Vec::new(),
1198 }
1199}
1200
1201fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1202 v.get(key).and_then(Value::as_str).unwrap_or("") }
1204
1205fn u_field(v: Option<&Value>, key: &str) -> u32 {
1206 v.and_then(|inner| inner.get(key))
1207 .and_then(Value::as_u64)
1208 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
1210
/// Kind of a content block, stored by the SSE stream decoder in a map keyed by
/// a `u64` (presumably the block's `index` from the stream — confirm against
/// the decoder).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockKind {
    /// A text block.
    Text,
    /// A thinking block.
    Thinking,
    /// A tool-use block.
    ToolUse,
}
1221
/// Decodes an Anthropic Messages server-sent-event byte stream into
/// [`StreamDelta`] items.
///
/// Incoming chunks are appended to an internal buffer. Every complete frame
/// (located with `find_double_newline`) is reduced to its `data:` payload by
/// `parse_sse_data`, parsed as JSON, and dispatched on the event's `type`
/// field.
///
/// `warnings_in` is replayed as `StreamDelta::Warning` items once, right after
/// the first chunk has been read successfully and before any frame from it is
/// decoded.
///
/// The stream ends early, after yielding an `Err`, when:
/// - the underlying byte stream yields an error,
/// - a `data:` payload is not valid JSON, or
/// - the provider sends an `error` event.
///
/// Frames that are not valid UTF-8, or that carry no `data:` line, are skipped
/// without any output.
///
/// NOTE(review): if the byte stream ends without producing a chunk, or its
/// first item is an error, `warnings_in` is never yielded — confirm intended.
/// NOTE(review): bytes left in the buffer when the byte stream ends (a frame
/// without a terminating blank line) are never decoded — confirm intended.
#[allow(tail_expr_drop_order, clippy::too_many_lines)]
fn stream_anthropic_sse(
    bytes: BoxByteStream<'_>,
    warnings_in: Vec<ModelWarning>,
) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
    async_stream::stream! {
        let mut bytes = bytes;
        // Undecoded bytes received so far; complete frames are drained from the front.
        let mut buf: Vec<u8> = Vec::new();
        // Open content blocks keyed by their `index`, so `content_block_stop`
        // can tell whether the block that closed was a tool call.
        let mut blocks: HashMap<u64, BlockKind> = HashMap::new();
        // Reported on `message_stop`; stays `EndTurn` unless a `message_delta`
        // carries a `stop_reason`.
        let mut last_stop_reason = StopReason::EndTurn;
        // Input-side counters come from `message_start`, `output_tokens` from
        // `message_delta`.
        let mut accumulated_usage = Usage::default();
        let mut warnings_emitted = false;

        while let Some(chunk) = bytes.next().await {
            match chunk {
                Ok(b) => buf.extend_from_slice(&b),
                Err(e) => {
                    // Transport failure: surface it and end the stream.
                    yield Err(e);
                    return;
                }
            }
            if !warnings_emitted {
                // Replay the caller-supplied warnings exactly once, ahead of
                // any decoded event.
                warnings_emitted = true;
                for w in &warnings_in {
                    yield Ok(StreamDelta::Warning(w.clone()));
                }
            }
            // Decode every complete frame currently sitting in the buffer.
            while let Some(pos) = find_double_newline(&buf) {
                // Removes the frame plus two terminator bytes. After a
                // "\r\n\r\n" terminator the trailing "\r\n" stays in `buf` and
                // becomes a leading empty line of the next frame, which
                // `parse_sse_data` ignores because it lacks the `data:` prefix.
                let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
                let Ok(frame_str) = std::str::from_utf8(&frame) else {
                    // Not valid UTF-8: the frame is dropped without output.
                    continue;
                };
                let Some(payload) = parse_sse_data(frame_str) else {
                    // No `data:` line in this frame: nothing to decode.
                    continue;
                };
                let Ok(event) = serde_json::from_str::<Value>(&payload) else {
                    yield Err(Error::invalid_request(format!(
                        "Anthropic stream: malformed event payload: {payload}"
                    )));
                    return;
                };
                // A missing or non-string `type` becomes "" and lands in the
                // catch-all arm below.
                let event_type = event.get("type").and_then(Value::as_str).unwrap_or("");
                match event_type {
                    "message_start" => {
                        // A missing `message` object degrades to empty id/model
                        // strings rather than an error.
                        let message = event.get("message").unwrap_or(&Value::Null);
                        let id = str_field(message, "id").to_owned();
                        let model = str_field(message, "model").to_owned();
                        if let Some(usage) = message.get("usage") {
                            accumulated_usage.input_tokens = u_field(Some(usage), "input_tokens");
                            accumulated_usage.cached_input_tokens =
                                u_field(Some(usage), "cache_read_input_tokens");
                            accumulated_usage.cache_creation_input_tokens =
                                u_field(Some(usage), "cache_creation_input_tokens");
                        }
                        yield Ok(StreamDelta::Start {
                            id,
                            model,
                            provider_echoes: Vec::new(),
                        });
                    }
                    "content_block_start" => {
                        let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
                            n
                        } else {
                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
                                field: "stream.content_block_start.index".into(),
                                detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 to keep stream parser progressing".into(),
                            }));
                            0
                        };
                        let block = event.get("content_block").unwrap_or(&Value::Null);
                        match block.get("type").and_then(Value::as_str) {
                            Some("text") => {
                                blocks.insert(idx, BlockKind::Text);
                                // Forward any non-empty text already present on
                                // the start event.
                                if let Some(text) = block.get("text").and_then(Value::as_str)
                                    && !text.is_empty()
                                {
                                    yield Ok(StreamDelta::TextDelta {
                                        text: text.to_owned(),
                                        provider_echoes: Vec::new(),
                                    });
                                }
                            }
                            Some("thinking") => {
                                blocks.insert(idx, BlockKind::Thinking);
                                let text = block
                                    .get("thinking")
                                    .and_then(Value::as_str)
                                    .unwrap_or("")
                                    .to_owned();
                                // A non-empty `signature` is carried as a
                                // provider echo alongside the thinking text.
                                let provider_echoes = block
                                    .get("signature")
                                    .and_then(Value::as_str)
                                    .filter(|s| !s.is_empty())
                                    .map(|sig| {
                                        vec![ProviderEchoSnapshot::for_provider(
                                            PROVIDER_KEY,
                                            "signature",
                                            sig.to_owned(),
                                        )]
                                    })
                                    .unwrap_or_default();
                                // Emit nothing when the start event carries
                                // neither text nor a signature.
                                if !text.is_empty() || !provider_echoes.is_empty() {
                                    yield Ok(StreamDelta::ThinkingDelta {
                                        text,
                                        provider_echoes,
                                    });
                                }
                            }
                            Some("tool_use") => {
                                blocks.insert(idx, BlockKind::ToolUse);
                                let id = str_field(block, "id").to_owned();
                                let name = str_field(block, "name").to_owned();
                                yield Ok(StreamDelta::ToolUseStart {
                                    id,
                                    name,
                                    provider_echoes: Vec::new(),
                                });
                            }
                            other => {
                                // Any other block type is not tracked in
                                // `blocks`; only a warning is produced.
                                yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
                                    field: format!("stream.content_block_start[{idx}]"),
                                    detail: format!(
                                        "unsupported block type {other:?} dropped"
                                    ),
                                }));
                            }
                        }
                    }
                    "content_block_delta" => {
                        // The event's `index` is not consulted here; the delta
                        // `type` alone selects the output item.
                        let delta = event.get("delta").unwrap_or(&Value::Null);
                        match delta.get("type").and_then(Value::as_str) {
                            Some("text_delta") => {
                                if let Some(text) = delta.get("text").and_then(Value::as_str) {
                                    yield Ok(StreamDelta::TextDelta {
                                        text: text.to_owned(),
                                        provider_echoes: Vec::new(),
                                    });
                                }
                            }
                            Some("thinking_delta") => {
                                if let Some(text) = delta.get("thinking").and_then(Value::as_str) {
                                    yield Ok(StreamDelta::ThinkingDelta {
                                        text: text.to_owned(),
                                        provider_echoes: Vec::new(),
                                    });
                                }
                            }
                            Some("signature_delta") => {
                                // Surfaced as a thinking delta with empty text
                                // whose only content is the signature echo.
                                if let Some(sig) =
                                    delta.get("signature").and_then(Value::as_str)
                                {
                                    yield Ok(StreamDelta::ThinkingDelta {
                                        text: String::new(),
                                        provider_echoes: vec![
                                            ProviderEchoSnapshot::for_provider(
                                                PROVIDER_KEY,
                                                "signature",
                                                sig.to_owned(),
                                            ),
                                        ],
                                    });
                                }
                            }
                            Some("input_json_delta") => {
                                if let Some(partial) =
                                    delta.get("partial_json").and_then(Value::as_str)
                                {
                                    yield Ok(StreamDelta::ToolUseInputDelta {
                                        partial_json: partial.to_owned(),
                                    });
                                }
                            }
                            other => {
                                yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
                                    field: "stream.content_block_delta".into(),
                                    detail: format!(
                                        "unsupported delta type {other:?} dropped"
                                    ),
                                }));
                            }
                        }
                    }
                    "content_block_stop" => {
                        let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
                            n
                        } else {
                            yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
                                field: "stream.content_block_stop.index".into(),
                                detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 (mirrors the content_block_start handler)".into(),
                            }));
                            0
                        };
                        // Only a closing tool-use block produces an item; text
                        // and thinking blocks are simply forgotten.
                        if matches!(blocks.remove(&idx), Some(BlockKind::ToolUse)) {
                            yield Ok(StreamDelta::ToolUseStop);
                        }
                    }
                    "message_delta" => {
                        if let Some(delta) = event.get("delta")
                            && let Some(reason) =
                                delta.get("stop_reason").and_then(Value::as_str)
                        {
                            // Remember the reason; it is reported later, when
                            // `message_stop` arrives.
                            last_stop_reason = match reason {
                                "end_turn" => StopReason::EndTurn,
                                "max_tokens" => StopReason::MaxTokens,
                                "stop_sequence" => StopReason::StopSequence {
                                    sequence: delta
                                        .get("stop_sequence")
                                        .and_then(Value::as_str)
                                        .unwrap_or_default()
                                        .to_owned(),
                                },
                                "tool_use" => StopReason::ToolUse,
                                "refusal" => StopReason::Refusal {
                                    reason: RefusalReason::Safety,
                                },
                                other => {
                                    // Unrecognised reason: warn, but keep the
                                    // raw value.
                                    yield Ok(StreamDelta::Warning(
                                        ModelWarning::UnknownStopReason {
                                            raw: other.to_owned(),
                                        },
                                    ));
                                    StopReason::Other {
                                        raw: other.to_owned(),
                                    }
                                }
                            };
                        }
                        if let Some(usage) = event.get("usage") {
                            // Only `output_tokens` is refreshed from this
                            // event; the emitted snapshot also carries the
                            // counters captured at `message_start`.
                            accumulated_usage.output_tokens =
                                u_field(Some(usage), "output_tokens");
                            yield Ok(StreamDelta::Usage(accumulated_usage.clone()));
                        }
                    }
                    "message_stop" => {
                        yield Ok(StreamDelta::Stop {
                            stop_reason: last_stop_reason.clone(),
                        });
                    }
                    "ping" => {
                        // Recognised event with nothing to forward.
                    }
                    "error" => {
                        // Provider-reported failure: surface it and end the stream.
                        let err = event.get("error").unwrap_or(&Value::Null);
                        let kind = str_field(err, "type");
                        let message = str_field(err, "message");
                        yield Err(Error::provider_network(format!(
                            "Anthropic stream error ({kind}): {message}"
                        )));
                        return;
                    }
                    other => {
                        // Unknown event types are reported and skipped so the
                        // stream keeps going.
                        yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
                            field: "stream.event".into(),
                            detail: format!("unknown SSE event type {other:?} ignored"),
                        }));
                    }
                }
            }
        }
    }
}
1500
/// Locates the first SSE frame terminator in `buf`.
///
/// A terminator is either `"\n\n"` or `"\r\n\r\n"`. The returned value is the
/// byte offset at which the earliest terminator starts, or `None` when the
/// buffer contains neither sequence.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    // Walk forward once and stop at the first offset where either terminator
    // begins; this is the smaller of the two individual match positions.
    (0..buf.len()).find(|&start| {
        let tail = &buf[start..];
        tail.starts_with(b"\n\n") || tail.starts_with(b"\r\n\r\n")
    })
}
1513
/// Extracts the `data:` payload from one SSE frame.
///
/// Each line beginning with `data:` contributes its value, with a single
/// leading space removed if present. Multiple values are joined with `'\n'`
/// in order of appearance. Returns `None` when the frame has no `data:` line
/// at all; a lone `data:` line with an empty value yields `Some("")`.
fn parse_sse_data(frame: &str) -> Option<String> {
    let values: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value))
        .collect();

    if values.is_empty() {
        None
    } else {
        Some(values.join("\n"))
    }
}