1#![allow(clippy::cast_possible_truncation)] use std::collections::HashMap;
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{BoxByteStream, BoxDeltaStream, Codec, EncodedRequest};
31use crate::error::{Error, Result};
32use crate::ir::{
33 Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
34 ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, RefusalReason,
35 ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent, Usage,
36};
37use crate::rate_limit::RateLimitSnapshot;
38use crate::stream::StreamDelta;
39
/// Identifier of this codec. It is returned by `Codec::name` and used as the
/// key under which `ProviderEchoSnapshot` payloads (thinking signatures,
/// redacted-thinking data) are stored and looked up.
const PROVIDER_KEY: &str = "anthropic-messages";

/// Value sent in the `anthropic-version` header of every encoded request.
const ANTHROPIC_VERSION: &str = "2023-06-01";
45
/// Codec translating the provider-neutral IR to and from the Anthropic
/// Messages API (`POST /v1/messages`).
///
/// The type is a field-less unit struct. All per-request state lives in the
/// request and response values passed to its methods.
#[derive(Clone, Copy, Debug, Default)]
pub struct AnthropicMessagesCodec;

impl AnthropicMessagesCodec {
    /// Creates the codec. This is equivalent to `AnthropicMessagesCodec::default()`
    /// but usable in `const` contexts.
    pub const fn new() -> Self {
        Self
    }
}
59
/// `Codec` implementation for the Anthropic Messages wire format.
impl Codec for AnthropicMessagesCodec {
    /// Returns the provider key (`PROVIDER_KEY`).
    fn name(&self) -> &'static str {
        PROVIDER_KEY
    }

    /// Static capability table.
    ///
    /// The model name is not consulted, so every model reports the same
    /// capabilities, including a 200 000-token context window.
    fn capabilities(&self, _model: &str) -> Capabilities {
        Capabilities {
            streaming: true,
            tools: true,
            multimodal_image: true,
            multimodal_audio: false,
            multimodal_video: false,
            multimodal_document: true,
            system_prompt: true,
            structured_output: true,
            prompt_caching: true,
            thinking: true,
            citations: true,
            web_search: true,
            computer_use: true,
            max_context_tokens: 200_000,
        }
    }

    /// Structured output defaults to the synthetic-tool strategy for every model.
    fn auto_output_strategy(&self, _model: &str) -> crate::ir::OutputStrategy {
        crate::ir::OutputStrategy::Tool
    }

    /// Encodes a non-streaming request.
    ///
    /// The result carries the JSON body, the `anthropic-version` header and,
    /// when `AnthropicExt::betas` is non-empty, the `anthropic-beta` header.
    ///
    /// # Errors
    /// Propagates body-validation errors (no messages, missing `max_tokens`,
    /// unsupported output strategy), JSON serialization errors, and invalid
    /// beta header values.
    fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
        let (body, warnings) = build_body(request, false)?;
        let mut encoded = finalize_request(&body, warnings)?;
        apply_anthropic_beta_header(&mut encoded, request)?;
        Ok(encoded)
    }

    /// Encodes a streaming request.
    ///
    /// This is the same as `encode`, except that the body carries
    /// `"stream": true`, the `Accept` header asks for `text/event-stream`,
    /// and the request is converted with `into_streaming()`.
    ///
    /// # Errors
    /// Same as `encode`.
    fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
        let (body, warnings) = build_body(request, true)?;
        let mut encoded = finalize_request(&body, warnings)?;
        encoded.headers.insert(
            http::header::ACCEPT,
            http::HeaderValue::from_static("text/event-stream"),
        );
        apply_anthropic_beta_header(&mut encoded, request)?;
        Ok(encoded.into_streaming())
    }

    /// Decodes an SSE byte stream into IR stream deltas.
    ///
    /// `warnings_in` (encode-time warnings) are replayed by the stream parser.
    fn decode_stream<'a>(
        &'a self,
        bytes: BoxByteStream<'a>,
        warnings_in: Vec<ModelWarning>,
    ) -> BoxDeltaStream<'a> {
        Box::pin(stream_anthropic_sse(bytes, warnings_in))
    }

    /// Decodes a complete (non-streaming) response body.
    ///
    /// Missing `id` / `model` decode to empty strings. Content-decode
    /// warnings, then stop-reason warnings, are appended after `warnings_in`.
    /// `rate_limit` is left `None`; header-derived limits come from
    /// `extract_rate_limit` (presumably merged by the caller — confirm).
    ///
    /// # Errors
    /// Propagates failures from `parse_response_body` (e.g. a non-JSON body).
    fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
        let raw: Value = super::codec::parse_response_body(body, "Anthropic Messages")?;
        let mut warnings = warnings_in;

        let id = str_field(&raw, "id").to_owned();
        let model = str_field(&raw, "model").to_owned();
        let content = decode_content(&raw, &mut warnings);
        let stop_reason = decode_stop_reason(&raw, &mut warnings);
        let usage = decode_usage(&raw);

        Ok(ModelResponse {
            id,
            model,
            stop_reason,
            content,
            usage,
            rate_limit: None,
            warnings,
            provider_echoes: Vec::new(),
        })
    }

    /// Builds a rate-limit snapshot from Anthropic response headers.
    ///
    /// - `anthropic-ratelimit-{requests,tokens}-remaining` are parsed as `u64`.
    /// - `anthropic-ratelimit-{requests,tokens}-reset` are parsed as RFC 3339
    ///   timestamps and converted to UTC.
    ///
    /// Headers that are absent, non-ASCII, or unparseable are skipped. The raw
    /// string of every header that did parse is kept in `snapshot.raw`.
    /// Returns `None` when no header parsed.
    fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
        let mut snapshot = RateLimitSnapshot::default();
        let mut populated = false;
        // Integer "remaining" counters. Each tuple borrows a distinct field of
        // `snapshot` mutably, which leaves `snapshot.raw` free to insert into.
        for (header_name, target) in [
            (
                "anthropic-ratelimit-requests-remaining",
                &mut snapshot.requests_remaining,
            ),
            (
                "anthropic-ratelimit-tokens-remaining",
                &mut snapshot.tokens_remaining,
            ),
        ] {
            if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
                && let Ok(parsed) = v.parse::<u64>()
            {
                *target = Some(parsed);
                snapshot.raw.insert(header_name.to_owned(), v.to_owned());
                populated = true;
            }
        }
        // RFC 3339 "reset" timestamps, normalised to UTC.
        for (header_name, target) in [
            (
                "anthropic-ratelimit-requests-reset",
                &mut snapshot.requests_reset_at,
            ),
            (
                "anthropic-ratelimit-tokens-reset",
                &mut snapshot.tokens_reset_at,
            ),
        ] {
            if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
                && let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(v)
            {
                *target = Some(parsed.with_timezone(&chrono::Utc));
                snapshot.raw.insert(header_name.to_owned(), v.to_owned());
                populated = true;
            }
        }
        populated.then_some(snapshot)
    }
}
189
190fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
193 if request.messages.is_empty() {
194 return Err(Error::invalid_request(
195 "Anthropic Messages requires at least one message",
196 ));
197 }
198
199 let max_tokens = request.max_tokens.ok_or_else(|| {
206 Error::invalid_request(
207 "Anthropic Messages requires max_tokens; \
208 set ModelRequest::max_tokens explicitly",
209 )
210 })?;
211
212 let mut warnings = Vec::with_capacity(1);
215 let (system_value, wire_messages) = encode_messages(request, &mut warnings);
216
217 let mut body = Map::new();
218 body.insert("model".into(), Value::String(request.model.clone()));
219 body.insert("messages".into(), Value::Array(wire_messages));
220 body.insert("max_tokens".into(), json!(max_tokens));
221 if let Some(value) = system_value {
222 body.insert("system".into(), value);
223 }
224 if let Some(temp) = request.temperature {
225 body.insert("temperature".into(), json!(temp));
226 }
227 if let Some(k) = request.top_k {
228 body.insert("top_k".into(), json!(k));
229 }
230 if let Some(p) = request.top_p {
231 body.insert("top_p".into(), json!(p));
232 }
233 if !request.stop_sequences.is_empty() {
234 body.insert(
235 "stop_sequences".into(),
236 json!(request.stop_sequences.clone()),
237 );
238 }
239 if !request.tools.is_empty() {
240 body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
241 body.insert(
242 "tool_choice".into(),
243 encode_tool_choice(&request.tool_choice),
244 );
245 }
246 if streaming {
247 body.insert("stream".into(), Value::Bool(true));
248 }
249 if let Some(format) = &request.response_format {
250 encode_anthropic_structured_output(format, &request.model, &mut body, &mut warnings)?;
251 }
252 apply_provider_extensions(request, &mut body, &mut warnings);
253 Ok((Value::Object(body), warnings))
254}
255
256fn apply_anthropic_beta_header(encoded: &mut EncodedRequest, request: &ModelRequest) -> Result<()> {
261 let Some(anthropic) = &request.provider_extensions.anthropic else {
262 return Ok(());
263 };
264 if anthropic.betas.is_empty() {
265 return Ok(());
266 }
267 let value = anthropic.betas.join(",");
268 let header = http::HeaderValue::from_str(&value).map_err(|_| {
269 crate::Error::invalid_request(
270 "AnthropicExt::betas: each entry must contain only visible ASCII",
271 )
272 })?;
273 encoded
274 .headers
275 .insert(http::HeaderName::from_static("anthropic-beta"), header);
276 Ok(())
277}
278
279fn apply_provider_extensions(
284 request: &ModelRequest,
285 body: &mut Map<String, Value>,
286 warnings: &mut Vec<ModelWarning>,
287) {
288 let ext = &request.provider_extensions;
289 if let Some(parallel) = request.parallel_tool_calls {
296 if let Some(tc) = body.get_mut("tool_choice").and_then(Value::as_object_mut) {
297 tc.insert("disable_parallel_tool_use".into(), json!(!parallel));
298 } else {
299 warnings.push(ModelWarning::LossyEncode {
300 field: "parallel_tool_calls".into(),
301 detail: "Anthropic encodes via tool_choice.disable_parallel_tool_use; \
302 no tool_choice block (tools list empty) — knob discarded"
303 .into(),
304 });
305 }
306 }
307 if let Some(user_id) = &request.end_user_id {
308 body.insert("metadata".into(), json!({"user_id": user_id}));
309 }
310 if request.seed.is_some() {
311 warnings.push(ModelWarning::LossyEncode {
312 field: "seed".into(),
313 detail: "Anthropic Messages has no deterministic-sampling knob — drop the field".into(),
314 });
315 }
316 if let Some(effort) = &request.reasoning_effort {
317 encode_anthropic_thinking(&request.model, effort, body, warnings);
318 }
319 if ext.openai_chat.is_some() {
320 warnings.push(ModelWarning::ProviderExtensionIgnored {
321 vendor: "openai_chat".into(),
322 });
323 }
324 if ext.openai_responses.is_some() {
325 warnings.push(ModelWarning::ProviderExtensionIgnored {
326 vendor: "openai_responses".into(),
327 });
328 }
329 if ext.gemini.is_some() {
330 warnings.push(ModelWarning::ProviderExtensionIgnored {
331 vendor: "gemini".into(),
332 });
333 }
334 if ext.bedrock.is_some() {
335 warnings.push(ModelWarning::ProviderExtensionIgnored {
336 vendor: "bedrock".into(),
337 });
338 }
339}
340
341fn encode_anthropic_structured_output(
346 format: &ResponseFormat,
347 model: &str,
348 body: &mut Map<String, Value>,
349 warnings: &mut Vec<ModelWarning>,
350) -> Result<()> {
351 let strategy = resolve_output_strategy(format.strategy, model);
352 match strategy {
353 OutputStrategy::Native => {
354 body.insert(
359 "output_config".into(),
360 json!({
361 "format": {
362 "type": "json_schema",
363 "schema": format.json_schema.schema.clone(),
364 }
365 }),
366 );
367 if !format.strict {
368 warnings.push(ModelWarning::LossyEncode {
369 field: "response_format.strict".into(),
370 detail: "Anthropic always strict-validates structured output; \
371 the strict=false request was approximated"
372 .into(),
373 });
374 }
375 }
376 OutputStrategy::Tool => {
377 let tool_name = format.json_schema.name.clone();
386 let synthetic_tool = json!({
387 "type": "custom",
388 "name": tool_name,
389 "description": format!(
390 "Emit the response as a JSON object matching the {tool_name} schema."
391 ),
392 "input_schema": format.json_schema.schema.clone(),
393 });
394 let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
400 if let Value::Array(arr) = tools {
401 arr.insert(0, synthetic_tool);
402 }
403 body.insert(
404 "tool_choice".into(),
405 json!({
406 "type": "tool",
407 "name": format.json_schema.name,
408 "disable_parallel_tool_use": true,
412 }),
413 );
414 if !format.strict {
415 warnings.push(ModelWarning::LossyEncode {
419 field: "response_format.strict".into(),
420 detail: "Anthropic Tool-strategy structured output is always \
421 schema-validated; strict=false was approximated"
422 .into(),
423 });
424 }
425 }
426 OutputStrategy::Prompted => {
427 return Err(Error::invalid_request(
428 "OutputStrategy::Prompted is deferred to entelix 1.1; use \
429 OutputStrategy::Native or OutputStrategy::Tool",
430 ));
431 }
432 OutputStrategy::Auto => {
433 return Err(Error::invalid_request(
435 "OutputStrategy::Auto did not resolve — codec invariant violation",
436 ));
437 }
438 }
439 Ok(())
440}
441
442#[allow(clippy::match_same_arms)]
451const fn resolve_output_strategy(strategy: OutputStrategy, _model: &str) -> OutputStrategy {
452 match strategy {
453 OutputStrategy::Auto => OutputStrategy::Tool,
454 OutputStrategy::Native => OutputStrategy::Native,
455 OutputStrategy::Tool => OutputStrategy::Tool,
456 OutputStrategy::Prompted => OutputStrategy::Prompted,
457 }
458}
459
/// Returns `true` for models this codec treats as adaptive-thinking-only,
/// i.e. any model id beginning with `claude-opus-4-7`.
fn is_anthropic_adaptive_only(model: &str) -> bool {
    const ADAPTIVE_ONLY_PREFIX: &str = "claude-opus-4-7";
    model.strip_prefix(ADAPTIVE_ONLY_PREFIX).is_some()
}
472
473fn encode_anthropic_thinking(
492 model: &str,
493 effort: &ReasoningEffort,
494 body: &mut Map<String, Value>,
495 warnings: &mut Vec<ModelWarning>,
496) {
497 let adaptive_only = is_anthropic_adaptive_only(model);
498 let thinking = match effort {
499 ReasoningEffort::Off => {
500 json!({"type": "disabled"})
501 }
502 ReasoningEffort::Minimal => {
503 warnings.push(ModelWarning::LossyEncode {
504 field: "reasoning_effort".into(),
505 detail:
506 "Anthropic has no `Minimal` bucket — snapped to `{type:\"adaptive\", effort:\"low\"}`"
507 .into(),
508 });
509 json!({"type": "adaptive", "effort": "low"})
510 }
511 ReasoningEffort::Low => {
512 if adaptive_only {
513 json!({"type": "adaptive", "effort": "low"})
514 } else {
515 json!({"type": "enabled", "budget_tokens": 1024})
516 }
517 }
518 ReasoningEffort::Medium => {
519 if adaptive_only {
520 json!({"type": "adaptive", "effort": "medium"})
521 } else {
522 json!({"type": "enabled", "budget_tokens": 4096})
523 }
524 }
525 ReasoningEffort::High => {
526 if adaptive_only {
527 json!({"type": "adaptive", "effort": "high"})
528 } else {
529 json!({"type": "enabled", "budget_tokens": 16384})
530 }
531 }
532 ReasoningEffort::Auto => {
533 json!({"type": "adaptive"})
534 }
535 ReasoningEffort::VendorSpecific(literal) => {
536 if adaptive_only {
537 warnings.push(ModelWarning::LossyEncode {
543 field: "reasoning_effort".into(),
544 detail: format!(
545 "Anthropic {model} is adaptive-only — manual budget '{literal}' \
546 dropped; emitting `{{type:\"adaptive\"}}` instead"
547 ),
548 });
549 json!({"type": "adaptive"})
550 } else if let Ok(budget) = literal.parse::<u32>() {
551 json!({"type": "enabled", "budget_tokens": budget})
552 } else {
553 warnings.push(ModelWarning::LossyEncode {
554 field: "reasoning_effort".into(),
555 detail: format!(
556 "Anthropic vendor-specific reasoning_effort {literal:?} is not a \
557 numeric budget_tokens — falling through to `Medium`"
558 ),
559 });
560 json!({"type": "enabled", "budget_tokens": 4096})
561 }
562 }
563 };
564 body.insert("thinking".into(), thinking);
565}
566
567fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
568 let bytes = serde_json::to_vec(body)?;
569 let mut encoded = EncodedRequest::post_json("/v1/messages", Bytes::from(bytes));
570 encoded.headers.insert(
571 http::HeaderName::from_static("anthropic-version"),
572 http::HeaderValue::from_static(ANTHROPIC_VERSION),
573 );
574 encoded.warnings = warnings;
575 Ok(encoded)
576}
577
578fn encode_messages(
581 request: &ModelRequest,
582 warnings: &mut Vec<ModelWarning>,
583) -> (Option<Value>, Vec<Value>) {
584 let mut system_blocks: Vec<(String, Option<crate::ir::CacheControl>)> = request
589 .system
590 .blocks()
591 .iter()
592 .map(|b| (b.text.clone(), b.cache_control))
593 .collect();
594 let mut wire_messages = Vec::with_capacity(request.messages.len());
595
596 for (idx, msg) in request.messages.iter().enumerate() {
597 match msg.role {
598 Role::System => {
599 let mut lossy_non_text = false;
600 let text = msg
601 .content
602 .iter()
603 .filter_map(|part| {
604 if let ContentPart::Text { text, .. } = part {
605 Some(text.clone())
606 } else {
607 lossy_non_text = true;
608 None
609 }
610 })
611 .collect::<Vec<_>>()
612 .join("\n");
613 if lossy_non_text {
614 warnings.push(ModelWarning::LossyEncode {
615 field: format!("messages[{idx}].content"),
616 detail: "non-text parts dropped from system message (Anthropic has no \
617 system role)"
618 .into(),
619 });
620 }
621 if !text.is_empty() {
622 system_blocks.push((text, None));
623 }
624 }
625 Role::User | Role::Assistant | Role::Tool => {
626 let role_str = match msg.role {
627 Role::Assistant => "assistant",
628 _ => "user",
629 };
630 let content_array = encode_content_parts(&msg.content, warnings, idx);
631 let mut entry = Map::new();
632 entry.insert("role".into(), Value::String(role_str.into()));
633 entry.insert("content".into(), Value::Array(content_array));
634 wire_messages.push(Value::Object(entry));
635 }
636 }
637 }
638
639 let any_cached = system_blocks.iter().any(|(_, cc)| cc.is_some());
643 let system_value = if system_blocks.is_empty() {
644 None
645 } else if any_cached {
646 let array: Vec<Value> = system_blocks
647 .into_iter()
648 .map(|(text, cc)| {
649 let mut obj = Map::new();
650 obj.insert("type".into(), Value::String("text".into()));
651 obj.insert("text".into(), Value::String(text));
652 if let Some(cache) = cc {
653 obj.insert("cache_control".into(), encode_cache_control(cache));
654 }
655 Value::Object(obj)
656 })
657 .collect();
658 Some(Value::Array(array))
659 } else {
660 Some(Value::String(
661 system_blocks
662 .into_iter()
663 .map(|(text, _)| text)
664 .collect::<Vec<_>>()
665 .join("\n\n"),
666 ))
667 };
668 (system_value, wire_messages)
669}
670
/// Encodes one message's IR content parts as Anthropic content blocks.
///
/// Some parts cannot be sent to Anthropic:
/// - audio and video inputs;
/// - assistant-produced image or audio output;
/// - redacted thinking that lacks an `anthropic-messages` `data` echo.
///
/// These are dropped, each with a `LossyEncode` warning whose `field` is
/// `messages[msg_idx].content[part_idx]`. Every other part maps to exactly
/// one block.
#[allow(clippy::too_many_lines)]
fn encode_content_parts(
    parts: &[ContentPart],
    warnings: &mut Vec<ModelWarning>,
    msg_idx: usize,
) -> Vec<Value> {
    let mut out = Vec::with_capacity(parts.len());
    for (part_idx, part) in parts.iter().enumerate() {
        // Built lazily: only the drop branches need the warning path.
        let path = || format!("messages[{msg_idx}].content[{part_idx}]");
        match part {
            ContentPart::Text {
                text,
                cache_control,
                ..
            } => {
                let mut block = json_block("text", &[("text", Value::String(text.clone()))]);
                attach_cache_control(&mut block, cache_control.as_ref());
                out.push(Value::Object(block));
            }
            ContentPart::Image {
                source,
                cache_control,
                ..
            } => {
                let mut block = json_block(
                    "image",
                    &[("source", encode_media_source_anthropic(source))],
                );
                attach_cache_control(&mut block, cache_control.as_ref());
                out.push(Value::Object(block));
            }
            ContentPart::Audio { .. } => warnings.push(ModelWarning::LossyEncode {
                field: path(),
                detail: "Anthropic Messages does not accept audio inputs; block dropped".into(),
            }),
            ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
                field: path(),
                detail: "Anthropic Messages does not accept video inputs; block dropped".into(),
            }),
            ContentPart::Document {
                source,
                name,
                cache_control,
                ..
            } => {
                let mut block = Map::new();
                block.insert("type".into(), Value::String("document".into()));
                block.insert("source".into(), encode_media_source_anthropic(source));
                // The IR document name is sent as Anthropic's `title`.
                if let Some(title) = name {
                    block.insert("title".into(), Value::String(title.clone()));
                }
                attach_cache_control(&mut block, cache_control.as_ref());
                out.push(Value::Object(block));
            }
            ContentPart::Thinking {
                text,
                cache_control,
                provider_echoes,
            } => {
                let mut block = Map::new();
                block.insert("type".into(), Value::String("thinking".into()));
                block.insert("thinking".into(), Value::String(text.clone()));
                // Replay the signature that `decode_content` stored under
                // PROVIDER_KEY / "signature", if one is present.
                if let Some(sig) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
                    .and_then(|e| e.payload_str("signature"))
                {
                    block.insert("signature".into(), Value::String(sig.to_owned()));
                }
                attach_cache_control(&mut block, cache_control.as_ref());
                out.push(Value::Object(block));
            }
            ContentPart::RedactedThinking { provider_echoes } => {
                // Redacted thinking can only be replayed with its opaque `data`
                // payload; without it the block is dropped.
                let Some(data) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
                    .and_then(|e| e.payload_str("data"))
                else {
                    warnings.push(ModelWarning::LossyEncode {
                        field: path(),
                        detail: "redacted_thinking part missing 'anthropic-messages' \
                                 provider_echo with 'data' payload; block dropped"
                            .into(),
                    });
                    continue;
                };
                let mut block = Map::new();
                block.insert("type".into(), Value::String("redacted_thinking".into()));
                block.insert("data".into(), Value::String(data.to_owned()));
                out.push(Value::Object(block));
            }
            ContentPart::Citation {
                snippet,
                source,
                cache_control,
                ..
            } => {
                // A citation is sent as a text block (the snippet) carrying a
                // single-entry `citations` array that describes its source.
                // NOTE(review): Anthropic's location objects usually also carry
                // fields such as `cited_text` and start/end indices, which are
                // not emitted here — confirm the API accepts this shape.
                let citation_json = match source {
                    CitationSource::Url { url, title } => {
                        let mut o = Map::new();
                        o.insert(
                            "type".into(),
                            Value::String("web_search_result_location".into()),
                        );
                        o.insert("url".into(), Value::String(url.clone()));
                        if let Some(t) = title {
                            o.insert("title".into(), Value::String(t.clone()));
                        }
                        Value::Object(o)
                    }
                    CitationSource::Document {
                        document_index,
                        title,
                    } => {
                        let mut o = Map::new();
                        o.insert("type".into(), Value::String("char_location".into()));
                        o.insert("document_index".into(), json!(*document_index));
                        if let Some(t) = title {
                            o.insert("document_title".into(), Value::String(t.clone()));
                        }
                        Value::Object(o)
                    }
                };
                let mut block = Map::new();
                block.insert("type".into(), Value::String("text".into()));
                block.insert("text".into(), Value::String(snippet.clone()));
                block.insert("citations".into(), Value::Array(vec![citation_json]));
                attach_cache_control(&mut block, cache_control.as_ref());
                out.push(Value::Object(block));
            }
            ContentPart::ToolUse {
                id, name, input, ..
            } => out.push(json!({
                "type": "tool_use",
                "id": id,
                "name": name,
                "input": input,
            })),
            // The IR tool name is not part of Anthropic's tool_result block;
            // the result is linked to its call by `tool_use_id` only.
            ContentPart::ToolResult {
                tool_use_id,
                name: _,
                content,
                is_error,
                cache_control,
                ..
            } => out.push(encode_tool_result(
                tool_use_id,
                content,
                *is_error,
                cache_control.as_ref(),
                warnings,
                msg_idx,
                part_idx,
            )),
            ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
                warnings.push(ModelWarning::LossyEncode {
                    field: path(),
                    detail: "Anthropic Messages does not accept assistant-produced \
                             image / audio output as input — block dropped"
                        .into(),
                });
            }
        }
    }
    out
}
843
844fn json_block(kind: &str, fields: &[(&str, Value)]) -> Map<String, Value> {
846 let mut block = Map::new();
847 block.insert("type".into(), Value::String(kind.into()));
848 for (k, v) in fields {
849 block.insert((*k).to_owned(), v.clone());
850 }
851 block
852}
853
854fn attach_cache_control(block: &mut Map<String, Value>, cache: Option<&crate::ir::CacheControl>) {
858 if let Some(cache) = cache {
859 block.insert("cache_control".into(), encode_cache_control(*cache));
860 }
861}
862
863fn encode_cache_control(cache: crate::ir::CacheControl) -> Value {
868 let mut obj = Map::new();
869 obj.insert("type".into(), Value::String("ephemeral".into()));
870 if let Some(ttl) = cache.ttl.wire_ttl_field() {
871 obj.insert("ttl".into(), Value::String(ttl.into()));
872 }
873 Value::Object(obj)
874}
875
876fn encode_media_source_anthropic(source: &MediaSource) -> Value {
877 match source {
878 MediaSource::Url { url, .. } => json!({
879 "type": "url",
880 "url": url,
881 }),
882 MediaSource::Base64 { media_type, data } => json!({
883 "type": "base64",
884 "media_type": media_type,
885 "data": data,
886 }),
887 MediaSource::FileId { id, .. } => json!({
888 "type": "file",
889 "file_id": id,
890 }),
891 }
892}
893
894fn encode_tool_result(
895 tool_use_id: &str,
896 content: &ToolResultContent,
897 is_error: bool,
898 cache_control: Option<&crate::ir::CacheControl>,
899 warnings: &mut Vec<ModelWarning>,
900 msg_idx: usize,
901 part_idx: usize,
902) -> Value {
903 let content_json = match content {
904 ToolResultContent::Text(s) => Value::String(s.clone()),
905 ToolResultContent::Json(v) => {
906 warnings.push(ModelWarning::LossyEncode {
907 field: format!("messages[{msg_idx}].content[{part_idx}]"),
908 detail: "tool_result Json payload stringified for Anthropic wire format".into(),
909 });
910 Value::String(v.to_string())
911 }
912 };
913 let mut block = Map::new();
914 block.insert("type".into(), Value::String("tool_result".into()));
915 block.insert("tool_use_id".into(), Value::String(tool_use_id.into()));
916 block.insert("content".into(), content_json);
917 if is_error {
918 block.insert("is_error".into(), Value::Bool(true));
919 }
920 attach_cache_control(&mut block, cache_control);
921 Value::Object(block)
922}
923
/// Encodes IR tool specs as Anthropic `tools` entries.
///
/// - Function tools become client tools with a name, description and input
///   schema.
/// - Built-in tool kinds map to the versioned Anthropic `type` strings below.
/// - MCP connectors are emitted as `type: "mcp"` entries.
/// - OpenAI-only built-ins (file search, code interpreter, image generation)
///   are dropped with a `LossyEncode` warning at `tools[idx]`.
///
/// Each emitted tool carries its cache breakpoint, if any.
fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
    let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
    for (idx, t) in tools.iter().enumerate() {
        let mut obj = match &t.kind {
            // User-defined function tool: no `type` field.
            ToolKind::Function { input_schema } => {
                let mut o = Map::new();
                o.insert("name".into(), Value::String(t.name.clone()));
                o.insert("description".into(), Value::String(t.description.clone()));
                o.insert("input_schema".into(), input_schema.clone());
                o
            }
            ToolKind::WebSearch {
                max_uses,
                allowed_domains,
            } => {
                let mut o = Map::new();
                o.insert("type".into(), Value::String("web_search_20250305".into()));
                o.insert("name".into(), Value::String(t.name.clone()));
                if let Some(n) = max_uses {
                    o.insert("max_uses".into(), json!(*n));
                }
                // An empty allow-list is omitted rather than sent as `[]`.
                if !allowed_domains.is_empty() {
                    o.insert("allowed_domains".into(), json!(allowed_domains));
                }
                o
            }
            ToolKind::Computer {
                display_width,
                display_height,
            } => {
                let mut o = Map::new();
                o.insert("type".into(), Value::String("computer_20250124".into()));
                o.insert("name".into(), Value::String(t.name.clone()));
                o.insert("display_width_px".into(), json!(*display_width));
                o.insert("display_height_px".into(), json!(*display_height));
                o
            }
            ToolKind::TextEditor => {
                let mut o = Map::new();
                o.insert("type".into(), Value::String("text_editor_20250124".into()));
                o.insert("name".into(), Value::String(t.name.clone()));
                o
            }
            ToolKind::Bash => {
                let mut o = Map::new();
                o.insert("type".into(), Value::String("bash_20250124".into()));
                o.insert("name".into(), Value::String(t.name.clone()));
                o
            }
            ToolKind::CodeExecution => {
                let mut o = Map::new();
                o.insert(
                    "type".into(),
                    Value::String("code_execution_20250522".into()),
                );
                o.insert("name".into(), Value::String(t.name.clone()));
                o
            }
            // The connector's own `name` is used, not the ToolSpec name.
            ToolKind::McpConnector {
                name,
                server_url,
                authorization_token,
            } => {
                let mut o = Map::new();
                o.insert("type".into(), Value::String("mcp".into()));
                o.insert("name".into(), Value::String(name.clone()));
                o.insert("server_url".into(), Value::String(server_url.clone()));
                if let Some(token) = authorization_token {
                    o.insert("authorization_token".into(), Value::String(token.clone()));
                }
                o
            }
            ToolKind::Memory => {
                let mut o = Map::new();
                o.insert("type".into(), Value::String("memory_20250818".into()));
                o.insert("name".into(), Value::String(t.name.clone()));
                o
            }
            ToolKind::FileSearch { .. } | ToolKind::CodeInterpreter | ToolKind::ImageGeneration => {
                warnings.push(ModelWarning::LossyEncode {
                    field: format!("tools[{idx}]"),
                    detail: "Anthropic does not natively support OpenAI-only built-ins \
                             (file_search / code_interpreter / image_generation) — tool dropped"
                        .into(),
                });
                continue;
            }
        };
        attach_cache_control(&mut obj, t.cache_control.as_ref());
        arr.push(Value::Object(obj));
    }
    Value::Array(arr)
}
1017
1018fn encode_tool_choice(choice: &ToolChoice) -> Value {
1019 match choice {
1020 ToolChoice::Auto => json!({ "type": "auto" }),
1021 ToolChoice::Required => json!({ "type": "any" }),
1022 ToolChoice::Specific { name } => json!({ "type": "tool", "name": name }),
1023 ToolChoice::None => json!({ "type": "none" }),
1024 }
1025}
1026
/// Decodes the response `content` array into IR content parts.
///
/// - `text` blocks: each recognised citation is emitted first as its own
///   `Citation` part, whose snippet is the whole block text. The text itself
///   then follows as a `Text` part when non-empty.
/// - `thinking` blocks keep a non-empty `signature` as a `PROVIDER_KEY` echo
///   so it can be replayed on the next request.
/// - `redacted_thinking` blocks keep their opaque `data` as a `PROVIDER_KEY`
///   echo.
/// - `tool_use` blocks default a missing `input` to `{}`.
/// - Unknown or untyped blocks are dropped with a warning at
///   `response.content[idx]`.
///
/// Returns an empty vector when `content` is absent or not an array.
fn decode_content(raw: &Value, warnings: &mut Vec<ModelWarning>) -> Vec<ContentPart> {
    let Some(arr) = raw.get("content").and_then(Value::as_array) else {
        return Vec::new();
    };
    let mut out = Vec::with_capacity(arr.len());
    for (idx, block) in arr.iter().enumerate() {
        match block.get("type").and_then(Value::as_str) {
            Some("text") => {
                let text = str_field(block, "text").to_owned();
                if let Some(citations) = block.get("citations").and_then(Value::as_array) {
                    // Citations whose location type is unrecognised are skipped.
                    for c in citations {
                        if let Some(source) = decode_citation_source(c) {
                            out.push(ContentPart::Citation {
                                snippet: text.clone(),
                                source,
                                cache_control: None,
                                provider_echoes: Vec::new(),
                            });
                        }
                    }
                }
                if !text.is_empty() {
                    out.push(ContentPart::text(text));
                }
            }
            Some("thinking") => {
                let thinking_text = str_field(block, "thinking").to_owned();
                let mut provider_echoes = Vec::new();
                if let Some(sig) = block
                    .get("signature")
                    .and_then(Value::as_str)
                    .filter(|s| !s.is_empty())
                {
                    provider_echoes.push(ProviderEchoSnapshot::for_provider(
                        PROVIDER_KEY,
                        "signature",
                        sig.to_owned(),
                    ));
                }
                out.push(ContentPart::Thinking {
                    text: thinking_text,
                    cache_control: None,
                    provider_echoes,
                });
            }
            Some("redacted_thinking") => {
                // A missing `data` field is stored as an empty string.
                let data = str_field(block, "data").to_owned();
                out.push(ContentPart::RedactedThinking {
                    provider_echoes: vec![ProviderEchoSnapshot::for_provider(
                        PROVIDER_KEY,
                        "data",
                        data,
                    )],
                });
            }
            Some("tool_use") => {
                let id = str_field(block, "id").to_owned();
                let name = str_field(block, "name").to_owned();
                let input = block.get("input").cloned().unwrap_or_else(|| json!({}));
                out.push(ContentPart::ToolUse {
                    id,
                    name,
                    input,
                    provider_echoes: Vec::new(),
                });
            }
            Some(other) => {
                warnings.push(ModelWarning::LossyEncode {
                    field: format!("response.content[{idx}]"),
                    detail: format!("unknown content block type '{other}' dropped"),
                });
            }
            None => {
                warnings.push(ModelWarning::LossyEncode {
                    field: format!("response.content[{idx}]"),
                    detail: "content block missing 'type' field".into(),
                });
            }
        }
    }
    out
}
1117
1118fn decode_citation_source(c: &Value) -> Option<CitationSource> {
1119 match c.get("type").and_then(Value::as_str)? {
1120 "web_search_result_location" => Some(CitationSource::Url {
1121 url: str_field(c, "url").to_owned(),
1122 title: c.get("title").and_then(Value::as_str).map(str::to_owned),
1123 }),
1124 "char_location" | "page_location" | "content_block_location" => {
1125 let document_index = c
1132 .get("document_index")
1133 .and_then(Value::as_u64)
1134 .and_then(|n| u32::try_from(n).ok())?;
1135 Some(CitationSource::Document {
1136 document_index,
1137 title: c
1138 .get("document_title")
1139 .and_then(Value::as_str)
1140 .map(str::to_owned),
1141 })
1142 }
1143 _ => None,
1144 }
1145}
1146
1147fn decode_stop_reason(raw: &Value, warnings: &mut Vec<ModelWarning>) -> StopReason {
1148 match raw.get("stop_reason").and_then(Value::as_str) {
1149 Some("end_turn") => StopReason::EndTurn,
1150 Some("max_tokens") => StopReason::MaxTokens,
1151 Some("stop_sequence") => StopReason::StopSequence {
1152 sequence: str_field(raw, "stop_sequence").to_owned(),
1153 },
1154 Some("tool_use") => StopReason::ToolUse,
1155 Some("refusal") => StopReason::Refusal {
1156 reason: RefusalReason::Safety,
1157 },
1158 Some(other) => {
1159 warnings.push(ModelWarning::UnknownStopReason {
1160 raw: other.to_owned(),
1161 });
1162 StopReason::Other {
1163 raw: other.to_owned(),
1164 }
1165 }
1166 None => {
1167 warnings.push(ModelWarning::LossyEncode {
1171 field: "stop_reason".into(),
1172 detail: "Anthropic Messages payload carried no stop_reason — \
1173 IR records `Other{raw:\"missing\"}`"
1174 .into(),
1175 });
1176 StopReason::Other {
1177 raw: "missing".to_owned(),
1178 }
1179 }
1180 }
1181}
1182
1183fn decode_usage(raw: &Value) -> Usage {
1184 let usage = raw.get("usage");
1185 Usage {
1186 input_tokens: u_field(usage, "input_tokens"),
1187 output_tokens: u_field(usage, "output_tokens"),
1188 cached_input_tokens: u_field(usage, "cache_read_input_tokens"),
1189 cache_creation_input_tokens: u_field(usage, "cache_creation_input_tokens"),
1190 reasoning_tokens: 0,
1191 safety_ratings: Vec::new(),
1192 }
1193}
1194
1195fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1196 v.get(key).and_then(Value::as_str).unwrap_or("") }
1198
1199fn u_field(v: Option<&Value>, key: &str) -> u32 {
1200 v.and_then(|inner| inner.get(key))
1201 .and_then(Value::as_u64)
1202 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
1204
/// Kind of content block registered for an SSE block `index` while parsing a
/// stream.
///
/// The stream parser keeps these in a `HashMap<u64, BlockKind>`, presumably
/// so that later delta and stop events for that index are routed correctly.
/// Confirm against the rest of `stream_anthropic_sse`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BlockKind {
    /// A `text` content block.
    Text,
    /// A `thinking` content block.
    Thinking,
    /// A `tool_use` content block.
    ToolUse,
}
1215
1216#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1217fn stream_anthropic_sse(
1218 bytes: BoxByteStream<'_>,
1219 warnings_in: Vec<ModelWarning>,
1220) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1221 async_stream::stream! {
1222 let mut bytes = bytes;
1223 let mut buf: Vec<u8> = Vec::new();
1224 let mut blocks: HashMap<u64, BlockKind> = HashMap::new();
1225 let mut last_stop_reason = StopReason::EndTurn;
1226 let mut accumulated_usage = Usage::default();
1227 let mut warnings_emitted = false;
1228
1229 while let Some(chunk) = bytes.next().await {
1230 match chunk {
1231 Ok(b) => buf.extend_from_slice(&b),
1232 Err(e) => {
1233 yield Err(e);
1234 return;
1235 }
1236 }
1237 if !warnings_emitted {
1239 warnings_emitted = true;
1240 for w in &warnings_in {
1241 yield Ok(StreamDelta::Warning(w.clone()));
1242 }
1243 }
1244 while let Some(pos) = find_double_newline(&buf) {
1246 let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1247 let Ok(frame_str) = std::str::from_utf8(&frame) else {
1248 continue;
1249 };
1250 let Some(payload) = parse_sse_data(frame_str) else {
1251 continue;
1252 };
1253 let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1254 yield Err(Error::invalid_request(format!(
1255 "Anthropic stream: malformed event payload: {payload}"
1256 )));
1257 return;
1258 };
1259 let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); match event_type {
1261 "message_start" => {
1262 let message = event.get("message").unwrap_or(&Value::Null); let id = str_field(message, "id").to_owned();
1264 let model = str_field(message, "model").to_owned();
1265 if let Some(usage) = message.get("usage") {
1266 accumulated_usage.input_tokens = u_field(Some(usage), "input_tokens");
1267 accumulated_usage.cached_input_tokens =
1268 u_field(Some(usage), "cache_read_input_tokens");
1269 accumulated_usage.cache_creation_input_tokens =
1270 u_field(Some(usage), "cache_creation_input_tokens");
1271 }
1272 yield Ok(StreamDelta::Start {
1273 id,
1274 model,
1275 provider_echoes: Vec::new(),
1276 });
1277 }
1278 "content_block_start" => {
1279 let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
1280 n
1281 } else {
1282 yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1283 field: "stream.content_block_start.index".into(),
1284 detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 to keep stream parser progressing".into(),
1285 }));
1286 0
1287 };
1288 let block = event.get("content_block").unwrap_or(&Value::Null); match block.get("type").and_then(Value::as_str) {
1290 Some("text") => {
1291 blocks.insert(idx, BlockKind::Text);
1292 if let Some(text) = block.get("text").and_then(Value::as_str)
1293 && !text.is_empty()
1294 {
1295 yield Ok(StreamDelta::TextDelta {
1296 text: text.to_owned(),
1297 provider_echoes: Vec::new(),
1298 });
1299 }
1300 }
1301 Some("thinking") => {
1302 blocks.insert(idx, BlockKind::Thinking);
1303 let text = block
1304 .get("thinking")
1305 .and_then(Value::as_str)
1306 .unwrap_or("") .to_owned();
1308 let provider_echoes = block
1316 .get("signature")
1317 .and_then(Value::as_str)
1318 .filter(|s| !s.is_empty())
1319 .map(|sig| {
1320 vec![ProviderEchoSnapshot::for_provider(
1321 PROVIDER_KEY,
1322 "signature",
1323 sig.to_owned(),
1324 )]
1325 })
1326 .unwrap_or_default();
1327 if !text.is_empty() || !provider_echoes.is_empty() {
1328 yield Ok(StreamDelta::ThinkingDelta {
1329 text,
1330 provider_echoes,
1331 });
1332 }
1333 }
1334 Some("tool_use") => {
1335 blocks.insert(idx, BlockKind::ToolUse);
1336 let id = str_field(block, "id").to_owned();
1337 let name = str_field(block, "name").to_owned();
1338 yield Ok(StreamDelta::ToolUseStart {
1339 id,
1340 name,
1341 provider_echoes: Vec::new(),
1342 });
1343 }
1344 other => {
1345 yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1346 field: format!("stream.content_block_start[{idx}]"),
1347 detail: format!(
1348 "unsupported block type {other:?} dropped"
1349 ),
1350 }));
1351 }
1352 }
1353 }
1354 "content_block_delta" => {
1355 let delta = event.get("delta").unwrap_or(&Value::Null); match delta.get("type").and_then(Value::as_str) {
1357 Some("text_delta") => {
1358 if let Some(text) = delta.get("text").and_then(Value::as_str) {
1359 yield Ok(StreamDelta::TextDelta {
1360 text: text.to_owned(),
1361 provider_echoes: Vec::new(),
1362 });
1363 }
1364 }
1365 Some("thinking_delta") => {
1366 if let Some(text) = delta.get("thinking").and_then(Value::as_str) {
1367 yield Ok(StreamDelta::ThinkingDelta {
1368 text: text.to_owned(),
1369 provider_echoes: Vec::new(),
1370 });
1371 }
1372 }
1373 Some("signature_delta") => {
1374 if let Some(sig) =
1375 delta.get("signature").and_then(Value::as_str)
1376 {
1377 yield Ok(StreamDelta::ThinkingDelta {
1378 text: String::new(),
1379 provider_echoes: vec![
1380 ProviderEchoSnapshot::for_provider(
1381 PROVIDER_KEY,
1382 "signature",
1383 sig.to_owned(),
1384 ),
1385 ],
1386 });
1387 }
1388 }
1389 Some("input_json_delta") => {
1390 if let Some(partial) =
1391 delta.get("partial_json").and_then(Value::as_str)
1392 {
1393 yield Ok(StreamDelta::ToolUseInputDelta {
1394 partial_json: partial.to_owned(),
1395 });
1396 }
1397 }
1398 other => {
1399 yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1400 field: "stream.content_block_delta".into(),
1401 detail: format!(
1402 "unsupported delta type {other:?} dropped"
1403 ),
1404 }));
1405 }
1406 }
1407 }
1408 "content_block_stop" => {
1409 let idx = if let Some(n) = event.get("index").and_then(Value::as_u64) {
1410 n
1411 } else {
1412 yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1413 field: "stream.content_block_stop.index".into(),
1414 detail: "Anthropic SSE event missing spec-mandated 'index' field; falling back to slot 0 (mirrors the content_block_start handler)".into(),
1415 }));
1416 0
1417 };
1418 if matches!(blocks.remove(&idx), Some(BlockKind::ToolUse)) {
1419 yield Ok(StreamDelta::ToolUseStop);
1420 }
1421 }
1422 "message_delta" => {
1423 if let Some(delta) = event.get("delta")
1424 && let Some(reason) =
1425 delta.get("stop_reason").and_then(Value::as_str)
1426 {
1427 last_stop_reason = match reason {
1435 "end_turn" => StopReason::EndTurn,
1436 "max_tokens" => StopReason::MaxTokens,
1437 "stop_sequence" => StopReason::StopSequence {
1438 sequence: delta
1439 .get("stop_sequence")
1440 .and_then(Value::as_str)
1441 .unwrap_or_default() .to_owned(),
1443 },
1444 "tool_use" => StopReason::ToolUse,
1445 "refusal" => StopReason::Refusal {
1446 reason: RefusalReason::Safety,
1447 },
1448 other => {
1449 yield Ok(StreamDelta::Warning(
1450 ModelWarning::UnknownStopReason {
1451 raw: other.to_owned(),
1452 },
1453 ));
1454 StopReason::Other {
1455 raw: other.to_owned(),
1456 }
1457 }
1458 };
1459 }
1460 if let Some(usage) = event.get("usage") {
1461 accumulated_usage.output_tokens =
1462 u_field(Some(usage), "output_tokens");
1463 yield Ok(StreamDelta::Usage(accumulated_usage.clone()));
1464 }
1465 }
1466 "message_stop" => {
1467 yield Ok(StreamDelta::Stop {
1468 stop_reason: last_stop_reason.clone(),
1469 });
1470 }
1471 "ping" => {
1472 }
1474 "error" => {
1475 let err = event.get("error").unwrap_or(&Value::Null); let kind = str_field(err, "type");
1477 let message = str_field(err, "message");
1478 yield Err(Error::provider_network(format!(
1479 "Anthropic stream error ({kind}): {message}"
1480 )));
1481 return;
1482 }
1483 other => {
1484 yield Ok(StreamDelta::Warning(ModelWarning::LossyEncode {
1485 field: "stream.event".into(),
1486 detail: format!("unknown SSE event type {other:?} ignored"),
1487 }));
1488 }
1489 }
1490 }
1491 }
1492 }
1493}
1494
/// Returns the offset at which the first blank-line frame delimiter in `buf` begins.
///
/// Both the LF form (`"\n\n"`) and the CRLF form (`"\r\n\r\n"`) are recognised.
/// When both occur, the earlier one wins. `None` means `buf` does not yet hold a
/// complete frame.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    const LF_DELIM: &[u8] = b"\n\n";
    const CRLF_DELIM: &[u8] = b"\r\n\r\n";
    (0..buf.len()).find(|&at| {
        let tail = &buf[at..];
        tail.starts_with(LF_DELIM) || tail.starts_with(CRLF_DELIM)
    })
}
1507
/// Extracts the `data` payload from a single SSE frame.
///
/// Each `data:` line contributes its value, with at most one leading space
/// stripped, and multiple values are joined with `\n`. Other lines (such as
/// `event:`) are ignored. Returns `None` when the frame contains no `data:` line.
fn parse_sse_data(frame: &str) -> Option<String> {
    let values: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value))
        .collect();
    if values.is_empty() {
        None
    } else {
        Some(values.join("\n"))
    }
}