1use async_trait::async_trait;
2use futures_util::StreamExt;
3use reqwest::{self, header};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use strum_macros::Display;
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, warn};
9
10use crate::api::error::{ProviderStreamErrorKind, StreamError};
11use crate::api::provider::{CompletionStream, StreamChunk, TokenUsage};
12use crate::api::sse::parse_sse_stream;
13use crate::api::util::map_http_status_to_api_error;
14use crate::api::{CompletionResponse, Provider, error::ApiError};
15use crate::app::SystemContext;
16use crate::app::conversation::{
17 AssistantContent, ImageSource, Message as AppMessage, ThoughtContent, ToolResult, UserContent,
18};
19use crate::auth::{
20 AnthropicAuth, AuthErrorAction, AuthErrorContext, AuthHeaderContext, InstructionPolicy,
21 RequestKind,
22};
23use crate::auth::{ModelId as AuthModelId, ProviderId as AuthProviderId};
24use crate::config::model::{ModelId, ModelParameters};
25use steer_tools::{InputSchema, ToolCall, ToolSchema};
26
/// Endpoint that message requests are sent to (before any directive-supplied
/// query parameters are appended).
const API_URL: &str = "https://api.anthropic.com/v1/messages";
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display)]
30pub enum ClaudeMessageRole {
31 #[serde(rename = "user")]
32 #[strum(serialize = "user")]
33 User,
34 #[serde(rename = "assistant")]
35 #[strum(serialize = "assistant")]
36 Assistant,
37 #[serde(rename = "tool")]
38 #[strum(serialize = "tool")]
39 Tool,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
44pub struct ClaudeMessage {
45 pub role: ClaudeMessageRole,
46 #[serde(flatten)]
47 pub content: ClaudeMessageContent,
48 #[serde(skip_serializing)]
49 pub id: Option<String>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54#[serde(untagged)]
55pub enum ClaudeMessageContent {
56 Text { content: String },
58 StructuredContent { content: ClaudeStructuredContent },
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64#[serde(transparent)]
65pub struct ClaudeStructuredContent(pub Vec<ClaudeContentBlock>);
66
67#[derive(Clone)]
68enum AuthMode {
69 ApiKey(String),
70 Directive(AnthropicAuth),
71}
72
73#[derive(Clone)]
74pub struct AnthropicClient {
75 http_client: reqwest::Client,
76 auth: AuthMode,
77}
78
79#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
80#[serde(rename_all = "lowercase")]
81#[derive(Default)]
82enum ThinkingType {
83 #[default]
84 Enabled,
85}
86
87#[derive(Debug, Serialize, Deserialize, Clone)]
88struct Thinking {
89 #[serde(rename = "type", default)]
90 thinking_type: ThinkingType,
91 budget_tokens: u32,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
95pub struct ClaudeImageSource {
96 #[serde(rename = "type")]
97 source_type: String,
98 media_type: String,
99 data: String,
100}
101
102#[derive(Debug, Serialize, Clone)]
103struct SystemContentBlock {
104 #[serde(rename = "type")]
105 content_type: String,
106 text: String,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 cache_control: Option<CacheControl>,
109}
110
111#[derive(Debug, Serialize, Clone)]
112#[serde(untagged)]
113enum System {
114 Content(Vec<SystemContentBlock>),
116}
117
118#[derive(Debug, Serialize)]
119struct CompletionRequest {
120 model: String,
121 messages: Vec<ClaudeMessage>,
122 max_tokens: usize,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 system: Option<System>,
125 #[serde(skip_serializing_if = "Option::is_none")]
126 tools: Option<Vec<ClaudeTool>>,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 temperature: Option<f32>,
129 #[serde(skip_serializing_if = "Option::is_none")]
130 top_p: Option<f32>,
131 #[serde(skip_serializing_if = "Option::is_none")]
132 top_k: Option<usize>,
133 #[serde(skip_serializing_if = "Option::is_none")]
134 stream: Option<bool>,
135 #[serde(skip_serializing_if = "Option::is_none")]
136 thinking: Option<Thinking>,
137}
138
139#[derive(Debug, Serialize, Clone)]
140struct ClaudeTool {
141 name: String,
142 description: String,
143 input_schema: InputSchema,
144}
145
146impl From<ToolSchema> for ClaudeTool {
147 fn from(tool: ToolSchema) -> Self {
148 let tool = adapt_tool_schema_for_claude(tool);
149 Self {
150 name: tool.name,
151 description: tool.description,
152 input_schema: tool.input_schema,
153 }
154 }
155}
156
157#[derive(Debug, Serialize, Deserialize, Clone)]
158struct ClaudeCompletionResponse {
159 id: String,
160 content: Vec<ClaudeContentBlock>,
161 model: String,
162 role: String,
163 #[serde(default)]
164 stop_reason: Option<String>,
165 #[serde(default)]
166 stop_sequence: Option<String>,
167 #[serde(default)]
168 usage: ClaudeUsage,
169 #[serde(flatten)]
171 extra: std::collections::HashMap<String, serde_json::Value>,
172}
173
174fn adapt_tool_schema_for_claude(tool: ToolSchema) -> ToolSchema {
175 let root_schema = tool.input_schema.as_value();
176 let sanitized = sanitize_for_claude(root_schema, root_schema);
177 ToolSchema {
178 input_schema: InputSchema::new(sanitized),
179 ..tool
180 }
181}
182
/// Decodes the JSON Pointer escapes in one reference token: `~1` becomes `/`
/// and `~0` becomes `~`.
///
/// Returns a borrowed slice when the segment contains no `~`, so the common
/// case does not allocate.
fn decode_pointer_segment(segment: &str) -> std::borrow::Cow<'_, str> {
    if !segment.contains('~') {
        return std::borrow::Cow::Borrowed(segment);
    }
    // `~1` must be replaced before `~0`; otherwise "~01" would wrongly decode to "/".
    std::borrow::Cow::Owned(segment.replace("~1", "/").replace("~0", "~"))
}
189
190fn resolve_ref<'a>(root: &'a Value, reference: &str) -> Option<&'a Value> {
191 let path = reference.strip_prefix("#/")?;
192 let mut current = root;
193 for segment in path.split('/') {
194 let decoded = decode_pointer_segment(segment);
195 current = current.get(decoded.as_ref())?;
196 }
197 Some(current)
198}
199
200fn infer_type_from_enum(values: &[Value]) -> Option<String> {
201 let mut has_string = false;
202 let mut has_number = false;
203 let mut has_bool = false;
204 let mut has_object = false;
205 let mut has_array = false;
206
207 for value in values {
208 match value {
209 Value::String(_) => has_string = true,
210 Value::Number(_) => has_number = true,
211 Value::Bool(_) => has_bool = true,
212 Value::Object(_) => has_object = true,
213 Value::Array(_) => has_array = true,
214 Value::Null => {}
215 }
216 }
217
218 let kind_count = u8::from(has_string)
219 + u8::from(has_number)
220 + u8::from(has_bool)
221 + u8::from(has_object)
222 + u8::from(has_array);
223
224 if kind_count != 1 {
225 return None;
226 }
227
228 if has_string {
229 Some("string".to_string())
230 } else if has_number {
231 Some("number".to_string())
232 } else if has_bool {
233 Some("boolean".to_string())
234 } else if has_object {
235 Some("object".to_string())
236 } else if has_array {
237 Some("array".to_string())
238 } else {
239 None
240 }
241}
242
243fn normalize_type(value: &Value) -> Value {
244 if let Some(type_str) = value.as_str() {
245 return Value::String(type_str.to_string());
246 }
247
248 if let Some(type_array) = value.as_array()
249 && let Some(primary_type) = type_array
250 .iter()
251 .find_map(|v| if v.is_null() { None } else { v.as_str() })
252 {
253 return Value::String(primary_type.to_string());
254 }
255
256 Value::String("string".to_string())
257}
258
259fn extract_enum_values(value: &Value) -> Vec<Value> {
260 let Some(obj) = value.as_object() else {
261 return Vec::new();
262 };
263
264 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
265 return enum_values
266 .iter()
267 .filter(|v| !v.is_null())
268 .cloned()
269 .collect();
270 }
271
272 if let Some(const_value) = obj.get("const") {
273 if const_value.is_null() {
274 return Vec::new();
275 }
276 return vec![const_value.clone()];
277 }
278
279 Vec::new()
280}
281
282fn merge_property(properties: &mut serde_json::Map<String, Value>, key: &str, value: &Value) {
283 match properties.get_mut(key) {
284 None => {
285 properties.insert(key.to_string(), value.clone());
286 }
287 Some(existing) => {
288 if existing == value {
289 return;
290 }
291
292 let existing_values = extract_enum_values(existing);
293 let incoming_values = extract_enum_values(value);
294 if incoming_values.is_empty() && existing_values.is_empty() {
295 return;
296 }
297
298 let mut combined = existing_values;
299 for item in incoming_values {
300 if !combined.contains(&item) {
301 combined.push(item);
302 }
303 }
304
305 if combined.is_empty() {
306 return;
307 }
308
309 if let Some(obj) = existing.as_object_mut() {
310 obj.remove("const");
311 obj.insert("enum".to_string(), Value::Array(combined.clone()));
312 if !obj.contains_key("type")
313 && let Some(inferred) = infer_type_from_enum(&combined)
314 {
315 obj.insert("type".to_string(), Value::String(inferred));
316 }
317 }
318 }
319 }
320}
321
322fn merge_union_schemas(
323 root: &Value,
324 variants: &[Value],
325 seen_refs: &mut std::collections::HashSet<String>,
326) -> Value {
327 let mut merged_props = serde_json::Map::new();
328 let mut required_intersection: Option<std::collections::BTreeSet<String>> = None;
329 let mut enum_values: Vec<Value> = Vec::new();
330 let mut type_candidates: Vec<String> = Vec::new();
331
332 for variant in variants {
333 let sanitized = sanitize_for_claude_inner(root, variant, seen_refs);
334
335 if let Some(schema_type) = sanitized.get("type").and_then(|v| v.as_str()) {
336 type_candidates.push(schema_type.to_string());
337 }
338
339 if let Some(props) = sanitized.get("properties").and_then(|v| v.as_object()) {
340 for (key, value) in props {
341 merge_property(&mut merged_props, key, value);
342 }
343 }
344
345 if let Some(req) = sanitized.get("required").and_then(|v| v.as_array()) {
346 let req_set: std::collections::BTreeSet<String> = req
347 .iter()
348 .filter_map(|item| item.as_str().map(|s| s.to_string()))
349 .collect();
350
351 required_intersection = match required_intersection.take() {
352 None => Some(req_set),
353 Some(existing) => Some(
354 existing
355 .intersection(&req_set)
356 .cloned()
357 .collect::<std::collections::BTreeSet<String>>(),
358 ),
359 };
360 }
361
362 if let Some(values) = sanitized.get("enum").and_then(|v| v.as_array()) {
363 for value in values {
364 if value.is_null() {
365 continue;
366 }
367 if !enum_values.contains(value) {
368 enum_values.push(value.clone());
369 }
370 }
371 }
372 }
373
374 let schema_type = if !merged_props.is_empty() {
375 "object".to_string()
376 } else if let Some(inferred) = infer_type_from_enum(&enum_values) {
377 inferred
378 } else if let Some(first) = type_candidates.first() {
379 first.clone()
380 } else {
381 "string".to_string()
382 };
383
384 let mut merged = serde_json::Map::new();
385 merged.insert("type".to_string(), Value::String(schema_type));
386
387 if !merged_props.is_empty() {
388 merged.insert("properties".to_string(), Value::Object(merged_props));
389 }
390
391 if let Some(required_set) = required_intersection
392 && !required_set.is_empty()
393 {
394 merged.insert(
395 "required".to_string(),
396 Value::Array(
397 required_set
398 .into_iter()
399 .map(Value::String)
400 .collect::<Vec<_>>(),
401 ),
402 );
403 }
404
405 if !enum_values.is_empty() {
406 merged.insert("enum".to_string(), Value::Array(enum_values));
407 }
408
409 Value::Object(merged)
410}
411
412fn sanitize_for_claude(root: &Value, schema: &Value) -> Value {
413 let mut seen_refs = std::collections::HashSet::new();
414 sanitize_for_claude_inner(root, schema, &mut seen_refs)
415}
416
417fn fallback_schema() -> Value {
418 let mut out = serde_json::Map::new();
419 out.insert("type".to_string(), Value::String("object".to_string()));
420 out.insert(
421 "properties".to_string(),
422 Value::Object(serde_json::Map::new()),
423 );
424 Value::Object(out)
425}
426
427fn sanitize_for_claude_inner(
428 root: &Value,
429 schema: &Value,
430 seen_refs: &mut std::collections::HashSet<String>,
431) -> Value {
432 if let Some(reference) = schema.get("$ref").and_then(|v| v.as_str()) {
433 if !seen_refs.insert(reference.to_string()) {
434 return fallback_schema();
435 }
436 if let Some(resolved) = resolve_ref(root, reference) {
437 let sanitized = sanitize_for_claude_inner(root, resolved, seen_refs);
438 seen_refs.remove(reference);
439 return sanitized;
440 }
441 seen_refs.remove(reference);
442 }
443
444 let Some(obj) = schema.as_object() else {
445 return schema.clone();
446 };
447
448 if let Some(union) = obj
449 .get("oneOf")
450 .or_else(|| obj.get("anyOf"))
451 .or_else(|| obj.get("allOf"))
452 .and_then(|v| v.as_array())
453 {
454 return merge_union_schemas(root, union, seen_refs);
455 }
456
457 let mut out = serde_json::Map::new();
458 for (key, value) in obj {
459 match key.as_str() {
460 "$ref"
461 | "$defs"
462 | "oneOf"
463 | "anyOf"
464 | "allOf"
465 | "const"
466 | "additionalProperties"
467 | "default"
468 | "examples"
469 | "title"
470 | "pattern"
471 | "minLength"
472 | "maxLength"
473 | "minimum"
474 | "maximum"
475 | "minItems"
476 | "maxItems"
477 | "uniqueItems"
478 | "deprecated" => {}
479 "type" => {
480 out.insert("type".to_string(), normalize_type(value));
481 }
482 "properties" => {
483 if let Some(props) = value.as_object() {
484 let mut sanitized_props = serde_json::Map::new();
485 for (prop_key, prop_value) in props {
486 sanitized_props.insert(
487 prop_key.clone(),
488 sanitize_for_claude_inner(root, prop_value, seen_refs),
489 );
490 }
491 out.insert("properties".to_string(), Value::Object(sanitized_props));
492 }
493 }
494 "items" => {
495 if let Some(items) = value.as_array() {
496 let merged = merge_union_schemas(root, items, seen_refs);
497 out.insert("items".to_string(), merged);
498 } else {
499 out.insert(
500 "items".to_string(),
501 sanitize_for_claude_inner(root, value, seen_refs),
502 );
503 }
504 }
505 "enum" => {
506 let values = value
507 .as_array()
508 .map(|items| {
509 items
510 .iter()
511 .filter(|v| !v.is_null())
512 .cloned()
513 .collect::<Vec<_>>()
514 })
515 .unwrap_or_default();
516 out.insert("enum".to_string(), Value::Array(values));
517 }
518 _ => {
519 out.insert(
520 key.clone(),
521 sanitize_for_claude_inner(root, value, seen_refs),
522 );
523 }
524 }
525 }
526
527 if let Some(const_value) = obj.get("const")
528 && !const_value.is_null()
529 {
530 out.insert("enum".to_string(), Value::Array(vec![const_value.clone()]));
531 if !out.contains_key("type")
532 && let Some(inferred) = infer_type_from_enum(std::slice::from_ref(const_value))
533 {
534 out.insert("type".to_string(), Value::String(inferred));
535 }
536 }
537
538 if out.get("type") == Some(&Value::String("object".to_string()))
539 && !out.contains_key("properties")
540 {
541 out.insert(
542 "properties".to_string(),
543 Value::Object(serde_json::Map::new()),
544 );
545 }
546
547 if !out.contains_key("type") {
548 if out.contains_key("properties") {
549 out.insert("type".to_string(), Value::String("object".to_string()));
550 } else if out.contains_key("items") {
551 out.insert("type".to_string(), Value::String("array".to_string()));
552 } else if let Some(enum_values) = out.get("enum").and_then(|v| v.as_array())
553 && let Some(inferred) = infer_type_from_enum(enum_values)
554 {
555 out.insert("type".to_string(), Value::String(inferred));
556 }
557 }
558
559 Value::Object(out)
560}
561
/// Serde default for a cache-control `type` field: `"ephemeral"`.
fn default_cache_type() -> String {
    "ephemeral".to_string()
}
565
/// Unit tests for schema sanitization, image conversion, and usage mapping.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::error::SseParseError;
    use crate::api::sse::{SseEvent, SseStream};
    use futures_util::{StreamExt, stream};
    use serde_json::json;

    #[test]
    fn sanitize_handles_recursive_ref() {
        let schema = json!({
            "$defs": {
                "node": {
                    "type": "object",
                    "properties": {
                        "next": { "$ref": "#/$defs/node" }
                    }
                }
            },
            "$ref": "#/$defs/node"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let next = sanitized
            .get("properties")
            .and_then(|v| v.get("next"))
            .and_then(|v| v.get("type"))
            .and_then(|v| v.as_str());

        assert_eq!(next, Some("object"));
    }

    #[test]
    fn sanitize_collapses_tuple_items() {
        let schema = json!({
            "type": "array",
            "items": [
                { "type": "string" },
                { "type": "number" }
            ]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let items = sanitized.get("items");

        assert!(matches!(items, Some(Value::Object(_))));
    }

    #[test]
    fn sanitize_removes_unsupported_keywords() {
        let schema = json!({
            "type": "object",
            "title": "ignored",
            "additionalProperties": false,
            "properties": {
                "name": {
                    "type": "string",
                    "pattern": "^[a-z]+$",
                    "default": "x"
                }
            }
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" }
            }
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_converts_const_to_enum_with_type() {
        let schema = json!({
            "const": "fixed"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "enum": ["fixed"],
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_filters_null_enum_values() {
        let schema = json!({
            "enum": ["a", null, "b"]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "enum": ["a", "b"],
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_decodes_json_pointer_refs() {
        let schema = json!({
            "$defs": {
                "a/b": { "type": "string" }
            },
            "$ref": "#/$defs/a~1b"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_merges_union_properties_and_required() {
        let schema = json!({
            "oneOf": [
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "b": { "type": "string" }
                    },
                    "required": ["a"]
                },
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "c": { "type": "string" }
                    },
                    "required": ["a", "c"]
                }
            ]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "object",
            "properties": {
                "a": { "type": "string" },
                "b": { "type": "string" },
                "c": { "type": "string" }
            },
            "required": ["a"]
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_infers_array_type_from_items() {
        let schema = json!({
            "items": {
                "type": "string"
            }
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "array",
            "items": { "type": "string" }
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn user_image_data_url_to_claude_block() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::DataUrl {
                data_url: "data:image/png;base64,Zm9v".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let block = user_image_to_claude_block(&image).expect("image should convert");
        let json = serde_json::to_value(block).expect("serialize block");
        assert_eq!(json["type"], "image");
        assert_eq!(json["source"]["type"], "base64");
        assert_eq!(json["source"]["media_type"], "image/png");
        assert_eq!(json["source"]["data"], "Zm9v");
    }

    #[test]
    fn user_image_session_file_source_is_unsupported() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::SessionFile {
                relative_path: "session-1/image.png".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let err = user_image_to_claude_block(&image).expect_err("expected unsupported feature");
        match err {
            ApiError::UnsupportedFeature {
                provider,
                feature,
                details,
            } => {
                assert_eq!(provider, "anthropic");
                assert_eq!(feature, "image input source");
                assert!(details.contains("session file"));
            }
            other => panic!("Expected UnsupportedFeature, got {other:?}"),
        }
    }

    #[test]
    fn map_claude_usage_computes_total_when_missing() {
        let usage = ClaudeUsage {
            input: 11,
            output: 7,
            total: None,
            cache_creation_input: None,
            cache_read_input: None,
        };

        assert_eq!(map_claude_usage(&usage), TokenUsage::new(11, 7, 18));
    }

    #[test]
    fn convert_claude_completion_includes_usage() {
        let claude_completion = ClaudeCompletionResponse {
            id: "msg_1".to_string(),
            content: vec![ClaudeContentBlock::Text {
                text: "hi".to_string(),
                cache_control: None,
                extra: Default::default(),
            }],
            model: "claude-test".to_string(),
            role: "assistant".to_string(),
            stop_reason: Some("end_turn".to_string()),
            stop_sequence: None,
            usage: ClaudeUsage {
                input: 12,
                output: 4,
                total: Some(22),
                cache_creation_input: None,
                cache_read_input: None,
            },
            extra: Default::default(),
        };

        let response = convert_claude_completion(claude_completion);

        assert_eq!(response.usage, Some(TokenUsage::new(12, 4, 22)));
        assert!(matches!(
            response.content.first(),
            Some(AssistantContent::Text { text }) if text == "hi"
        ));
    }

    #[tokio::test]
    async fn convert_claude_stream_attaches_latest_usage_on_message_stop() {
        let events = vec![
            Ok::<SseEvent, SseParseError>(SseEvent {
                event_type: None,
                data: r#"{"type":"content_block_start","index":0,"content_block":{"type":"text","text":"Hello"}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"content_block_stop","index":0}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"message_delta","delta":{"stop_reason":null},"usage":{"input_tokens":9,"output_tokens":3}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":10,"output_tokens":5}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"message_stop"}"#.to_string(),
                id: None,
            }),
        ];

        let sse_stream: SseStream = Box::pin(stream::iter(events));
        let token = CancellationToken::new();
        let mut stream = std::pin::pin!(convert_claude_stream(sse_stream, token));

        let mut full_text = String::new();
        let mut completion = None;

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::TextDelta(delta) => full_text.push_str(&delta),
                StreamChunk::ContentBlockStop { .. } => {}
                StreamChunk::MessageComplete(response) => {
                    completion = Some(response);
                    break;
                }
                unexpected => panic!("unexpected chunk: {unexpected:?}"),
            }
        }

        assert_eq!(full_text, "Hello world");

        let response = completion.expect("expected final message completion chunk");
        assert_eq!(response.usage, Some(TokenUsage::new(10, 5, 15)));
        assert!(matches!(
            response.content.first(),
            Some(AssistantContent::Text { text }) if text == "Hello world"
        ));
    }
}
899
900#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
901pub struct CacheControl {
902 #[serde(rename = "type", default = "default_cache_type")]
903 cache_type: String,
904}
905
906#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
907#[serde(tag = "type")]
908pub enum ClaudeContentBlock {
909 #[serde(rename = "text")]
910 Text {
911 text: String,
912 #[serde(skip_serializing_if = "Option::is_none")]
913 cache_control: Option<CacheControl>,
914 #[serde(flatten)]
915 extra: std::collections::HashMap<String, serde_json::Value>,
916 },
917 #[serde(rename = "image")]
918 Image {
919 source: ClaudeImageSource,
920 #[serde(skip_serializing_if = "Option::is_none")]
921 cache_control: Option<CacheControl>,
922 #[serde(flatten)]
923 extra: std::collections::HashMap<String, serde_json::Value>,
924 },
925 #[serde(rename = "tool_use")]
926 ToolUse {
927 id: String,
928 name: String,
929 input: serde_json::Value,
930 #[serde(skip_serializing_if = "Option::is_none")]
931 cache_control: Option<CacheControl>,
932 #[serde(flatten)]
933 extra: std::collections::HashMap<String, serde_json::Value>,
934 },
935 #[serde(rename = "tool_result")]
936 ToolResult {
937 tool_use_id: String,
938 content: Vec<ClaudeContentBlock>,
939 #[serde(skip_serializing_if = "Option::is_none")]
940 cache_control: Option<CacheControl>,
941 #[serde(skip_serializing_if = "Option::is_none")]
942 is_error: Option<bool>,
943 #[serde(flatten)]
944 extra: std::collections::HashMap<String, serde_json::Value>,
945 },
946 #[serde(rename = "thinking")]
947 Thinking {
948 thinking: String,
949 signature: String,
950 #[serde(skip_serializing_if = "Option::is_none")]
951 cache_control: Option<CacheControl>,
952 #[serde(flatten)]
953 extra: std::collections::HashMap<String, serde_json::Value>,
954 },
955 #[serde(rename = "redacted_thinking")]
956 RedactedThinking {
957 data: String,
958 #[serde(skip_serializing_if = "Option::is_none")]
959 cache_control: Option<CacheControl>,
960 #[serde(flatten)]
961 extra: std::collections::HashMap<String, serde_json::Value>,
962 },
963 #[serde(other)]
964 Unknown,
965}
966
967#[derive(Debug, Serialize, Deserialize, Default, Clone)]
968struct ClaudeUsage {
969 #[serde(rename = "input_tokens")]
970 input: usize,
971 #[serde(rename = "output_tokens")]
972 output: usize,
973 #[serde(rename = "total_tokens")]
974 #[serde(skip_serializing_if = "Option::is_none")]
975 total: Option<usize>,
976 #[serde(rename = "cache_creation_input_tokens")]
977 #[serde(skip_serializing_if = "Option::is_none")]
978 cache_creation_input: Option<usize>,
979 #[serde(rename = "cache_read_input_tokens")]
980 #[serde(skip_serializing_if = "Option::is_none")]
981 cache_read_input: Option<usize>,
982}
983
984#[derive(Debug, Deserialize)]
985#[serde(tag = "type")]
986enum ClaudeStreamEvent {
987 #[serde(rename = "message_start")]
988 MessageStart {
989 #[expect(dead_code)]
990 message: ClaudeMessageStart,
991 },
992 #[serde(rename = "content_block_start")]
993 ContentBlockStart {
994 index: usize,
995 content_block: ClaudeContentBlockStart,
996 },
997 #[serde(rename = "content_block_delta")]
998 ContentBlockDelta { index: usize, delta: ClaudeDelta },
999 #[serde(rename = "content_block_stop")]
1000 ContentBlockStop { index: usize },
1001 #[serde(rename = "message_delta")]
1002 MessageDelta {
1003 #[expect(dead_code)]
1004 delta: ClaudeMessageDeltaData,
1005 #[serde(default)]
1006 usage: Option<ClaudeUsage>,
1007 },
1008 #[serde(rename = "message_stop")]
1009 MessageStop,
1010 #[serde(rename = "ping")]
1011 Ping,
1012 #[serde(rename = "error")]
1013 Error { error: ClaudeStreamError },
1014}
1015
1016#[derive(Debug, Deserialize)]
1017struct ClaudeMessageStart {
1018 #[expect(dead_code)]
1019 #[serde(default)]
1020 id: String,
1021 #[expect(dead_code)]
1022 #[serde(default)]
1023 model: String,
1024}
1025
1026#[derive(Debug, Deserialize)]
1027struct ClaudeContentBlockStart {
1028 #[serde(rename = "type")]
1029 block_type: String,
1030 #[serde(default)]
1031 id: Option<String>,
1032 #[serde(default)]
1033 name: Option<String>,
1034 #[serde(default)]
1035 text: Option<String>,
1036}
1037
1038#[derive(Debug, Deserialize)]
1039#[serde(tag = "type")]
1040enum ClaudeDelta {
1041 #[serde(rename = "text_delta")]
1042 Text { text: String },
1043 #[serde(rename = "thinking_delta")]
1044 Thinking { thinking: String },
1045 #[serde(rename = "input_json_delta")]
1046 InputJson { partial_json: String },
1047 #[serde(rename = "signature_delta")]
1048 Signature { signature: String },
1049}
1050
1051#[derive(Debug, Deserialize)]
1052struct ClaudeMessageDeltaData {
1053 #[expect(dead_code)]
1054 #[serde(default)]
1055 stop_reason: Option<String>,
1056}
1057
1058#[derive(Debug, Deserialize)]
1059struct ClaudeStreamError {
1060 #[serde(default)]
1061 message: String,
1062 #[serde(rename = "type", default)]
1063 error_type: String,
1064}
1065
1066impl AnthropicClient {
1067 pub fn new(api_key: &str) -> Result<Self, ApiError> {
1068 Self::with_api_key(api_key)
1069 }
1070
1071 pub fn with_api_key(api_key: &str) -> Result<Self, ApiError> {
1072 Ok(Self {
1073 http_client: Self::build_http_client()?,
1074 auth: AuthMode::ApiKey(api_key.to_string()),
1075 })
1076 }
1077
1078 pub fn with_directive(directive: AnthropicAuth) -> Result<Self, ApiError> {
1079 Ok(Self {
1080 http_client: Self::build_http_client()?,
1081 auth: AuthMode::Directive(directive),
1082 })
1083 }
1084
1085 fn build_http_client() -> Result<reqwest::Client, ApiError> {
1086 let mut headers = header::HeaderMap::new();
1087 headers.insert(
1088 "anthropic-version",
1089 header::HeaderValue::from_static("2023-06-01"),
1090 );
1091 headers.insert(
1092 header::CONTENT_TYPE,
1093 header::HeaderValue::from_static("application/json"),
1094 );
1095
1096 reqwest::Client::builder()
1097 .default_headers(headers)
1098 .build()
1099 .map_err(ApiError::Network)
1100 }
1101
1102 async fn auth_headers(
1103 &self,
1104 ctx: AuthHeaderContext,
1105 ) -> Result<Vec<(String, String)>, ApiError> {
1106 match &self.auth {
1107 AuthMode::ApiKey(key) => Ok(vec![("x-api-key".to_string(), key.clone())]),
1108 AuthMode::Directive(directive) => {
1109 let header_pairs = directive
1110 .headers
1111 .headers(ctx)
1112 .await
1113 .map_err(|e| ApiError::AuthError(e.to_string()))?;
1114 Ok(header_pairs
1115 .into_iter()
1116 .map(|pair| (pair.name, pair.value))
1117 .collect())
1118 }
1119 }
1120 }
1121
1122 async fn on_auth_error(
1123 &self,
1124 status: u16,
1125 body: &str,
1126 request_kind: RequestKind,
1127 ) -> Result<AuthErrorAction, ApiError> {
1128 let AuthMode::Directive(directive) = &self.auth else {
1129 return Ok(AuthErrorAction::NoAction);
1130 };
1131 let context = AuthErrorContext {
1132 status: Some(status),
1133 body_snippet: Some(truncate_body(body)),
1134 request_kind,
1135 };
1136 directive
1137 .headers
1138 .on_auth_error(context)
1139 .await
1140 .map_err(|e| ApiError::AuthError(e.to_string()))
1141 }
1142
1143 fn request_url(&self) -> Result<String, ApiError> {
1144 let AuthMode::Directive(directive) = &self.auth else {
1145 return Ok(API_URL.to_string());
1146 };
1147
1148 let Some(query_params) = &directive.query_params else {
1149 return Ok(API_URL.to_string());
1150 };
1151
1152 if query_params.is_empty() {
1153 return Ok(API_URL.to_string());
1154 }
1155
1156 let mut url = url::Url::parse(API_URL)
1157 .map_err(|e| ApiError::Configuration(format!("Invalid API_URL '{API_URL}': {e}")))?;
1158 for param in query_params {
1159 url.query_pairs_mut().append_pair(¶m.name, ¶m.value);
1160 }
1161 Ok(url.to_string())
1162 }
1163}
1164
1165fn convert_messages(messages: Vec<AppMessage>) -> Result<Vec<ClaudeMessage>, ApiError> {
1167 let claude_messages: Result<Vec<ClaudeMessage>, ApiError> =
1168 messages.into_iter().map(convert_single_message).collect();
1169
1170 claude_messages.map(|messages| {
1172 messages
1173 .into_iter()
1174 .filter(|msg| {
1175 match &msg.content {
1176 ClaudeMessageContent::Text { content } => !content.trim().is_empty(),
1177 ClaudeMessageContent::StructuredContent { .. } => true, }
1179 })
1180 .collect()
1181 })
1182}
1183
1184fn user_image_to_claude_block(
1185 image: &crate::app::conversation::ImageContent,
1186) -> Result<ClaudeContentBlock, ApiError> {
1187 match &image.source {
1188 ImageSource::DataUrl { data_url } => {
1189 let Some((meta, payload)) = data_url.split_once(',') else {
1190 return Err(ApiError::InvalidRequest {
1191 provider: "anthropic".to_string(),
1192 details: "Image data URL is missing ',' separator".to_string(),
1193 });
1194 };
1195
1196 let Some(meta_body) = meta.strip_prefix("data:") else {
1197 return Err(ApiError::InvalidRequest {
1198 provider: "anthropic".to_string(),
1199 details: "Image data URL must start with 'data:'".to_string(),
1200 });
1201 };
1202
1203 if !meta_body
1204 .split(';')
1205 .any(|segment| segment.eq_ignore_ascii_case("base64"))
1206 {
1207 return Err(ApiError::InvalidRequest {
1208 provider: "anthropic".to_string(),
1209 details: "Anthropic image data URLs must be base64 encoded".to_string(),
1210 });
1211 }
1212
1213 let media_type = meta_body
1214 .split(';')
1215 .next()
1216 .filter(|value| !value.trim().is_empty())
1217 .unwrap_or("application/octet-stream")
1218 .to_string();
1219
1220 if !media_type.starts_with("image/") {
1221 return Err(ApiError::UnsupportedFeature {
1222 provider: "anthropic".to_string(),
1223 feature: "image input mime type".to_string(),
1224 details: format!("Unsupported image media type '{}'", media_type),
1225 });
1226 }
1227
1228 Ok(ClaudeContentBlock::Image {
1229 source: ClaudeImageSource {
1230 source_type: "base64".to_string(),
1231 media_type,
1232 data: payload.to_string(),
1233 },
1234 cache_control: None,
1235 extra: Default::default(),
1236 })
1237 }
1238 ImageSource::Url { url } => Err(ApiError::UnsupportedFeature {
1239 provider: "anthropic".to_string(),
1240 feature: "image input source".to_string(),
1241 details: format!(
1242 "Anthropic image input currently requires data URLs in this adapter; got URL '{}'",
1243 url
1244 ),
1245 }),
1246 ImageSource::SessionFile { relative_path } => Err(ApiError::UnsupportedFeature {
1247 provider: "anthropic".to_string(),
1248 feature: "image input source".to_string(),
1249 details: format!(
1250 "Anthropic adapter cannot access session file '{}' directly; use data URLs",
1251 relative_path
1252 ),
1253 }),
1254 }
1255}
1256
1257fn convert_single_message(msg: AppMessage) -> Result<ClaudeMessage, ApiError> {
1258 match &msg.data {
1259 crate::app::conversation::MessageData::User { content, .. } => {
1260 let mut claude_blocks = Vec::new();
1262 for user_content in content {
1263 match user_content {
1264 UserContent::Text { text } => {
1265 if !text.trim().is_empty() {
1266 claude_blocks.push(ClaudeContentBlock::Text {
1267 text: text.clone(),
1268 cache_control: None,
1269 extra: Default::default(),
1270 });
1271 }
1272 }
1273 UserContent::Image { image } => {
1274 claude_blocks.push(user_image_to_claude_block(image)?);
1275 }
1276 UserContent::CommandExecution {
1277 command,
1278 stdout,
1279 stderr,
1280 exit_code,
1281 } => {
1282 let text = UserContent::format_command_execution_as_xml(
1283 command, stdout, stderr, *exit_code,
1284 );
1285 if !text.trim().is_empty() {
1286 claude_blocks.push(ClaudeContentBlock::Text {
1287 text,
1288 cache_control: None,
1289 extra: Default::default(),
1290 });
1291 }
1292 }
1293 }
1294 }
1295
1296 if claude_blocks.is_empty() {
1297 return Err(ApiError::InvalidRequest {
1298 provider: "anthropic".to_string(),
1299 details: format!(
1300 "User message ID {} resulted in no valid content blocks",
1301 msg.id
1302 ),
1303 });
1304 }
1305
1306 Ok(ClaudeMessage {
1307 role: ClaudeMessageRole::User,
1308 content: ClaudeMessageContent::StructuredContent {
1309 content: ClaudeStructuredContent(claude_blocks),
1310 },
1311 id: Some(msg.id.clone()),
1312 })
1313 }
1314
1315 crate::app::conversation::MessageData::Assistant { content, .. } => {
1316 let claude_blocks: Vec<ClaudeContentBlock> = content
1318 .iter()
1319 .filter_map(|assistant_content| match assistant_content {
1320 AssistantContent::Text { text } => {
1321 if text.trim().is_empty() {
1322 None
1323 } else {
1324 Some(ClaudeContentBlock::Text {
1325 text: text.clone(),
1326 cache_control: None,
1327 extra: Default::default(),
1328 })
1329 }
1330 }
1331 AssistantContent::Image { image } => match user_image_to_claude_block(image) {
1332 Ok(block) => Some(block),
1333 Err(err) => {
1334 debug!(
1335 target: "claude::convert_message",
1336 "Skipping unsupported assistant image block: {}",
1337 err
1338 );
1339 None
1340 }
1341 },
1342 AssistantContent::ToolCall { tool_call, .. } => {
1343 Some(ClaudeContentBlock::ToolUse {
1344 id: tool_call.id.clone(),
1345 name: tool_call.name.clone(),
1346 input: tool_call.parameters.clone(),
1347 cache_control: None,
1348 extra: Default::default(),
1349 })
1350 }
1351 AssistantContent::Thought { thought } => {
1352 match thought {
1353 ThoughtContent::Signed { text, signature } => {
1354 Some(ClaudeContentBlock::Thinking {
1355 thinking: text.clone(),
1356 signature: signature.clone(),
1357 cache_control: None,
1358 extra: Default::default(),
1359 })
1360 }
1361 ThoughtContent::Redacted { data } => {
1362 Some(ClaudeContentBlock::RedactedThinking {
1363 data: data.clone(),
1364 cache_control: None,
1365 extra: Default::default(),
1366 })
1367 }
1368 ThoughtContent::Simple { text } => {
1369 Some(ClaudeContentBlock::Text {
1371 text: format!("[Thought: {text}]"),
1372 cache_control: None,
1373 extra: Default::default(),
1374 })
1375 }
1376 }
1377 }
1378 })
1379 .collect();
1380
1381 if claude_blocks.is_empty() {
1382 debug!("No content blocks found: {:?}", content);
1383 Err(ApiError::InvalidRequest {
1384 provider: "anthropic".to_string(),
1385 details: format!(
1386 "Assistant message ID {} resulted in no valid content blocks",
1387 msg.id
1388 ),
1389 })
1390 } else {
1391 let claude_blocks = ensure_thinking_first(claude_blocks);
1392 let claude_content = if claude_blocks.len() == 1 {
1393 if let Some(ClaudeContentBlock::Text { text, .. }) = claude_blocks.first() {
1394 ClaudeMessageContent::Text {
1395 content: text.clone(),
1396 }
1397 } else {
1398 ClaudeMessageContent::StructuredContent {
1399 content: ClaudeStructuredContent(claude_blocks),
1400 }
1401 }
1402 } else {
1403 ClaudeMessageContent::StructuredContent {
1404 content: ClaudeStructuredContent(claude_blocks),
1405 }
1406 };
1407
1408 Ok(ClaudeMessage {
1409 role: ClaudeMessageRole::Assistant,
1410 content: claude_content,
1411 id: Some(msg.id.clone()),
1412 })
1413 }
1414 }
1415 crate::app::conversation::MessageData::Tool {
1416 tool_use_id,
1417 result,
1418 ..
1419 } => {
1420 let (result_text, is_error) = if let ToolResult::Error(e) = result {
1423 (e.to_string(), Some(true))
1424 } else {
1425 let text = result.llm_format();
1427 let text = if text.trim().is_empty() {
1428 "(No output)".to_string()
1429 } else {
1430 text
1431 };
1432 (text, None)
1433 };
1434
1435 let claude_blocks = vec![ClaudeContentBlock::ToolResult {
1436 tool_use_id: tool_use_id.clone(),
1437 content: vec![ClaudeContentBlock::Text {
1438 text: result_text,
1439 cache_control: None,
1440 extra: Default::default(),
1441 }],
1442 is_error,
1443 cache_control: None,
1444 extra: Default::default(),
1445 }];
1446
1447 Ok(ClaudeMessage {
1448 role: ClaudeMessageRole::User, content: ClaudeMessageContent::StructuredContent {
1450 content: ClaudeStructuredContent(claude_blocks),
1451 },
1452 id: Some(msg.id.clone()),
1453 })
1454 }
1455 }
1456}
1457fn ensure_thinking_first(blocks: Vec<ClaudeContentBlock>) -> Vec<ClaudeContentBlock> {
1460 let mut thinking_blocks = Vec::new();
1461 let mut other_blocks = Vec::new();
1462
1463 for block in blocks {
1464 match block {
1465 ClaudeContentBlock::Thinking { .. } | ClaudeContentBlock::RedactedThinking { .. } => {
1466 thinking_blocks.push(block);
1467 }
1468 _ => other_blocks.push(block),
1469 }
1470 }
1471
1472 if thinking_blocks.is_empty() {
1473 other_blocks
1474 } else {
1475 thinking_blocks.extend(other_blocks);
1476 thinking_blocks
1477 }
1478}
1479
1480fn convert_claude_content(claude_blocks: Vec<ClaudeContentBlock>) -> Vec<AssistantContent> {
1482 claude_blocks
1483 .into_iter()
1484 .filter_map(|block| match block {
1485 ClaudeContentBlock::Text { text, .. } => Some(AssistantContent::Text { text }),
1486 ClaudeContentBlock::Image { source, .. } => {
1487 let media_type = source.media_type;
1488 let data = source.data;
1489 Some(AssistantContent::Image {
1490 image: crate::app::conversation::ImageContent {
1491 mime_type: media_type.clone(),
1492 source: crate::app::conversation::ImageSource::DataUrl {
1493 data_url: format!("data:{};base64,{}", media_type, data),
1494 },
1495 width: None,
1496 height: None,
1497 bytes: None,
1498 sha256: None,
1499 },
1500 })
1501 }
1502
1503 ClaudeContentBlock::ToolUse {
1504 id, name, input, ..
1505 } => Some(AssistantContent::ToolCall {
1506 tool_call: steer_tools::ToolCall {
1507 id,
1508 name,
1509 parameters: input,
1510 },
1511 thought_signature: None,
1512 }),
1513 ClaudeContentBlock::ToolResult { .. } => {
1514 warn!("Unexpected ToolResult block received in Claude response content");
1515 None
1516 }
1517 ClaudeContentBlock::Thinking {
1518 thinking,
1519 signature,
1520 ..
1521 } => Some(AssistantContent::Thought {
1522 thought: ThoughtContent::Signed {
1523 text: thinking,
1524 signature,
1525 },
1526 }),
1527 ClaudeContentBlock::RedactedThinking { data, .. } => Some(AssistantContent::Thought {
1528 thought: ThoughtContent::Redacted { data },
1529 }),
1530 ClaudeContentBlock::Unknown => {
1531 warn!("Unknown content block received in Claude response content");
1532 None
1533 }
1534 })
1535 .collect()
1536}
1537
/// Narrows a `usize` to `u32`, clamping values that do not fit to `u32::MAX`.
fn saturating_u32(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
1541
1542fn map_claude_usage(usage: &ClaudeUsage) -> TokenUsage {
1543 let total = usage
1544 .total
1545 .unwrap_or_else(|| usage.input.saturating_add(usage.output));
1546 TokenUsage::new(
1547 saturating_u32(usage.input),
1548 saturating_u32(usage.output),
1549 saturating_u32(total),
1550 )
1551}
1552
1553fn convert_claude_completion(claude_completion: ClaudeCompletionResponse) -> CompletionResponse {
1554 CompletionResponse {
1555 content: convert_claude_content(claude_completion.content),
1556 usage: Some(map_claude_usage(&claude_completion.usage)),
1557 }
1558}
1559
1560#[async_trait]
1561impl Provider for AnthropicClient {
1562 fn name(&self) -> &'static str {
1563 "anthropic"
1564 }
1565
1566 async fn complete(
1567 &self,
1568 model_id: &ModelId,
1569 messages: Vec<AppMessage>,
1570 system: Option<SystemContext>,
1571 tools: Option<Vec<ToolSchema>>,
1572 call_options: Option<ModelParameters>,
1573 token: CancellationToken,
1574 ) -> Result<CompletionResponse, ApiError> {
1575 let mut claude_messages = convert_messages(messages)?;
1576 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1577
1578 if claude_messages.is_empty() {
1579 return Err(ApiError::InvalidRequest {
1580 provider: self.name().to_string(),
1581 details: "No messages provided".to_string(),
1582 });
1583 }
1584
1585 let last_message = claude_messages
1586 .last_mut()
1587 .ok_or_else(|| ApiError::InvalidRequest {
1588 provider: self.name().to_string(),
1589 details: "No messages provided".to_string(),
1590 })?;
1591 let cache_setting = Some(CacheControl {
1592 cache_type: "ephemeral".to_string(),
1593 });
1594
1595 let instruction_policy = match &self.auth {
1596 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1597 AuthMode::ApiKey(_) => None,
1598 };
1599 let system_text = apply_instruction_policy(system, instruction_policy);
1600 let system_content = build_system_content(system_text, cache_setting.clone());
1601
1602 match &mut last_message.content {
1603 ClaudeMessageContent::StructuredContent { content } => {
1604 for block in &mut content.0 {
1605 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1606 cache_control.clone_from(&cache_setting);
1607 }
1608 }
1609 }
1610 ClaudeMessageContent::Text { content } => {
1611 let text_content = content.clone();
1612 last_message.content = ClaudeMessageContent::StructuredContent {
1613 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1614 text: text_content,
1615 cache_control: cache_setting,
1616 extra: Default::default(),
1617 }]),
1618 };
1619 }
1620 }
1621
1622 let supports_thinking = call_options
1624 .as_ref()
1625 .and_then(|opts| opts.thinking_config.as_ref())
1626 .is_some_and(|tc| tc.enabled);
1627
1628 let request = if supports_thinking {
1629 let budget = call_options
1631 .as_ref()
1632 .and_then(|o| o.thinking_config)
1633 .and_then(|tc| tc.budget_tokens)
1634 .unwrap_or(4000);
1635 let thinking = Some(Thinking {
1636 thinking_type: ThinkingType::Enabled,
1637 budget_tokens: budget,
1638 });
1639 CompletionRequest {
1640 model: model_id.id.clone(), messages: claude_messages,
1642 max_tokens: call_options
1643 .as_ref()
1644 .and_then(|o| o.max_output_tokens)
1645 .map_or(32_000, |v| v as usize),
1646 system: system_content.clone(),
1647 tools,
1648 temperature: call_options
1649 .as_ref()
1650 .and_then(|o| o.temperature)
1651 .or(Some(1.0)),
1652 top_p: call_options.as_ref().and_then(|o| o.top_p),
1653 top_k: None,
1654 stream: None,
1655 thinking,
1656 }
1657 } else {
1658 CompletionRequest {
1659 model: model_id.id.clone(), messages: claude_messages,
1661 max_tokens: call_options
1662 .as_ref()
1663 .and_then(|o| o.max_output_tokens)
1664 .map_or(8000, |v| v as usize),
1665 system: system_content,
1666 tools,
1667 temperature: call_options
1668 .as_ref()
1669 .and_then(|o| o.temperature)
1670 .or(Some(0.7)),
1671 top_p: call_options.as_ref().and_then(|o| o.top_p),
1672 top_k: None,
1673 stream: None,
1674 thinking: None,
1675 }
1676 };
1677
1678 let auth_ctx = auth_header_context(model_id, RequestKind::Complete);
1679 let mut attempts = 0;
1680
1681 loop {
1682 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1683 let url = self.request_url()?;
1684 let mut request_builder = self.http_client.post(&url).json(&request);
1685
1686 for (name, value) in auth_headers {
1687 request_builder = request_builder.header(&name, &value);
1688 }
1689
1690 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1691 request_builder =
1692 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1693 }
1694
1695 let response = tokio::select! {
1696 biased;
1697 () = token.cancelled() => {
1698 debug!(target: "claude::complete", "Cancellation token triggered before sending request.");
1699 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1700 }
1701 res = request_builder.send() => {
1702 res?
1703 }
1704 };
1705
1706 if token.is_cancelled() {
1707 debug!(target: "claude::complete", "Cancellation token triggered after sending request, before status check.");
1708 return Err(ApiError::Cancelled {
1709 provider: self.name().to_string(),
1710 });
1711 }
1712
1713 let status = response.status();
1714 if !status.is_success() {
1715 let error_text = tokio::select! {
1716 biased;
1717 () = token.cancelled() => {
1718 debug!(target: "claude::complete", "Cancellation token triggered while reading error response body.");
1719 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1720 }
1721 text_res = response.text() => {
1722 text_res?
1723 }
1724 };
1725
1726 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1727 let action = self
1728 .on_auth_error(status.as_u16(), &error_text, RequestKind::Complete)
1729 .await?;
1730 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1731 attempts += 1;
1732 continue;
1733 }
1734 return Err(ApiError::AuthenticationFailed {
1735 provider: self.name().to_string(),
1736 details: error_text,
1737 });
1738 }
1739
1740 return Err(map_http_status_to_api_error(
1741 self.name(),
1742 status.as_u16(),
1743 error_text,
1744 ));
1745 }
1746
1747 let response_text = tokio::select! {
1748 biased;
1749 () = token.cancelled() => {
1750 debug!(target: "claude::complete", "Cancellation token triggered while reading successful response body.");
1751 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1752 }
1753 text_res = response.text() => {
1754 text_res?
1755 }
1756 };
1757
1758 let claude_completion: ClaudeCompletionResponse = serde_json::from_str(&response_text)
1759 .map_err(|e| ApiError::ResponseParsingError {
1760 provider: self.name().to_string(),
1761 details: format!("Error: {e}, Body: {response_text}"),
1762 })?;
1763 let completion = convert_claude_completion(claude_completion);
1764
1765 return Ok(completion);
1766 }
1767 }
1768
1769 async fn stream_complete(
1770 &self,
1771 model_id: &ModelId,
1772 messages: Vec<AppMessage>,
1773 system: Option<SystemContext>,
1774 tools: Option<Vec<ToolSchema>>,
1775 call_options: Option<ModelParameters>,
1776 token: CancellationToken,
1777 ) -> Result<CompletionStream, ApiError> {
1778 let mut claude_messages = convert_messages(messages)?;
1779 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1780
1781 if claude_messages.is_empty() {
1782 return Err(ApiError::InvalidRequest {
1783 provider: self.name().to_string(),
1784 details: "No messages provided".to_string(),
1785 });
1786 }
1787
1788 let last_message = claude_messages
1789 .last_mut()
1790 .ok_or_else(|| ApiError::InvalidRequest {
1791 provider: self.name().to_string(),
1792 details: "No messages provided".to_string(),
1793 })?;
1794 let cache_setting = Some(CacheControl {
1795 cache_type: "ephemeral".to_string(),
1796 });
1797
1798 let instruction_policy = match &self.auth {
1799 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1800 AuthMode::ApiKey(_) => None,
1801 };
1802 let system_text = apply_instruction_policy(system, instruction_policy);
1803 let system_content = build_system_content(system_text, cache_setting.clone());
1804
1805 match &mut last_message.content {
1806 ClaudeMessageContent::StructuredContent { content } => {
1807 for block in &mut content.0 {
1808 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1809 cache_control.clone_from(&cache_setting);
1810 }
1811 }
1812 }
1813 ClaudeMessageContent::Text { content } => {
1814 let text_content = content.clone();
1815 last_message.content = ClaudeMessageContent::StructuredContent {
1816 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1817 text: text_content,
1818 cache_control: cache_setting,
1819 extra: Default::default(),
1820 }]),
1821 };
1822 }
1823 }
1824
1825 let supports_thinking = call_options
1826 .as_ref()
1827 .and_then(|opts| opts.thinking_config.as_ref())
1828 .is_some_and(|tc| tc.enabled);
1829
1830 let request = if supports_thinking {
1831 let budget = call_options
1832 .as_ref()
1833 .and_then(|o| o.thinking_config)
1834 .and_then(|tc| tc.budget_tokens)
1835 .unwrap_or(4000);
1836 let thinking = Some(Thinking {
1837 thinking_type: ThinkingType::Enabled,
1838 budget_tokens: budget,
1839 });
1840 CompletionRequest {
1841 model: model_id.id.clone(),
1842 messages: claude_messages,
1843 max_tokens: call_options
1844 .as_ref()
1845 .and_then(|o| o.max_output_tokens)
1846 .map_or(32_000, |v| v as usize),
1847 system: system_content.clone(),
1848 tools,
1849 temperature: call_options
1850 .as_ref()
1851 .and_then(|o| o.temperature)
1852 .or(Some(1.0)),
1853 top_p: call_options.as_ref().and_then(|o| o.top_p),
1854 top_k: None,
1855 stream: Some(true),
1856 thinking,
1857 }
1858 } else {
1859 CompletionRequest {
1860 model: model_id.id.clone(),
1861 messages: claude_messages,
1862 max_tokens: call_options
1863 .as_ref()
1864 .and_then(|o| o.max_output_tokens)
1865 .map_or(8000, |v| v as usize),
1866 system: system_content,
1867 tools,
1868 temperature: call_options
1869 .as_ref()
1870 .and_then(|o| o.temperature)
1871 .or(Some(0.7)),
1872 top_p: call_options.as_ref().and_then(|o| o.top_p),
1873 top_k: None,
1874 stream: Some(true),
1875 thinking: None,
1876 }
1877 };
1878
1879 let auth_ctx = auth_header_context(model_id, RequestKind::Stream);
1880 let mut attempts = 0;
1881
1882 loop {
1883 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1884 let url = self.request_url()?;
1885 let mut request_builder = self.http_client.post(&url).json(&request);
1886
1887 for (name, value) in auth_headers {
1888 request_builder = request_builder.header(&name, &value);
1889 }
1890
1891 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1892 request_builder =
1893 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1894 }
1895
1896 let response = tokio::select! {
1897 biased;
1898 () = token.cancelled() => {
1899 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1900 }
1901 res = request_builder.send() => {
1902 res?
1903 }
1904 };
1905
1906 let status = response.status();
1907 if !status.is_success() {
1908 let error_text = tokio::select! {
1909 biased;
1910 () = token.cancelled() => {
1911 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1912 }
1913 text_res = response.text() => {
1914 text_res?
1915 }
1916 };
1917
1918 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1919 let action = self
1920 .on_auth_error(status.as_u16(), &error_text, RequestKind::Stream)
1921 .await?;
1922 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1923 attempts += 1;
1924 continue;
1925 }
1926 return Err(ApiError::AuthenticationFailed {
1927 provider: self.name().to_string(),
1928 details: error_text,
1929 });
1930 }
1931
1932 return Err(map_http_status_to_api_error(
1933 self.name(),
1934 status.as_u16(),
1935 error_text,
1936 ));
1937 }
1938
1939 let byte_stream = response.bytes_stream();
1940 let sse_stream = parse_sse_stream(byte_stream);
1941
1942 let stream = convert_claude_stream(sse_stream, token);
1943
1944 return Ok(Box::pin(stream));
1945 }
1946 }
1947}
1948
1949fn auth_header_context(model_id: &ModelId, request_kind: RequestKind) -> AuthHeaderContext {
1950 AuthHeaderContext {
1951 model_id: Some(AuthModelId {
1952 provider_id: AuthProviderId(model_id.provider.as_str().to_string()),
1953 model_id: model_id.id.clone(),
1954 }),
1955 request_kind,
1956 }
1957}
1958
1959fn is_auth_status(status: reqwest::StatusCode) -> bool {
1960 matches!(
1961 status,
1962 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
1963 )
1964}
1965
/// Shortens `body` to at most 512 characters, appending `...` when anything
/// was cut off. The limit counts `char`s, so the cut never splits a code point.
fn truncate_body(body: &str) -> String {
    const LIMIT: usize = 512;

    // The byte offset of the character at index LIMIT, if there is one, is
    // exactly where the kept prefix ends.
    match body.char_indices().nth(LIMIT) {
        Some((cut, _)) => format!("{}...", &body[..cut]),
        None => body.to_owned(),
    }
}
1976
1977fn apply_instruction_policy(
1978 system: Option<SystemContext>,
1979 policy: Option<&InstructionPolicy>,
1980) -> Option<String> {
1981 let base = system.as_ref().and_then(|context| {
1982 let trimmed = context.prompt.trim();
1983 if trimmed.is_empty() {
1984 None
1985 } else {
1986 Some(trimmed.to_string())
1987 }
1988 });
1989
1990 let context = system
1991 .as_ref()
1992 .and_then(|context| context.render_with_prompt(base.clone()));
1993
1994 match policy {
1995 None => context,
1996 Some(InstructionPolicy::Prefix(prefix)) => {
1997 if let Some(context) = context {
1998 Some(format!("{prefix}\n{context}"))
1999 } else {
2000 Some(prefix.clone())
2001 }
2002 }
2003 Some(InstructionPolicy::DefaultIfEmpty(default)) => {
2004 if context.is_some() {
2005 context
2006 } else {
2007 Some(default.clone())
2008 }
2009 }
2010 Some(InstructionPolicy::Override(override_text)) => {
2011 let mut combined = override_text.clone();
2012 if let Some(system) = system.as_ref() {
2013 let overlay = system.prompt.trim();
2014 if !overlay.is_empty() {
2015 combined.push_str("\n\n## Operating Mode\n");
2016 combined.push_str(overlay);
2017 }
2018
2019 let env = system
2020 .environment
2021 .as_ref()
2022 .map(|env| env.as_context())
2023 .map(|value| value.trim().to_string())
2024 .filter(|value| !value.is_empty());
2025 if let Some(env) = env {
2026 combined.push_str("\n\n");
2027 combined.push_str(&env);
2028 }
2029 }
2030 Some(combined)
2031 }
2032 }
2033}
2034
2035fn build_system_content(
2036 system: Option<String>,
2037 cache_setting: Option<CacheControl>,
2038) -> Option<System> {
2039 system.map(|text| {
2040 System::Content(vec![SystemContentBlock {
2041 content_type: "text".to_string(),
2042 text,
2043 cache_control: cache_setting,
2044 }])
2045 })
2046}
2047
/// Accumulated state of one content block while a streamed response is being
/// consumed. `convert_claude_stream` keeps one entry per block index and turns
/// it into final content with `block_state_to_content` when the block stops.
#[derive(Debug)]
enum BlockState {
    /// A text block; `text` grows as text deltas arrive.
    Text {
        text: String,
    },
    /// A thinking block; `text` grows with thinking deltas and `signature` is
    /// set once a signature delta arrives.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// A tool-use block; `input` collects the partial JSON fragments of the
    /// tool arguments as a raw string.
    ToolUse {
        id: String,
        name: String,
        input: String,
    },
    /// A block of any other type; it produces no content.
    Unknown,
}
2064
2065fn block_state_to_content(state: BlockState) -> Option<AssistantContent> {
2066 match state {
2067 BlockState::Text { text } => {
2068 if text.is_empty() {
2069 None
2070 } else {
2071 Some(AssistantContent::Text { text })
2072 }
2073 }
2074 BlockState::Thinking { text, signature } => {
2075 if text.is_empty() {
2076 None
2077 } else {
2078 let thought = if let Some(sig) = signature {
2079 ThoughtContent::Signed {
2080 text,
2081 signature: sig,
2082 }
2083 } else {
2084 ThoughtContent::Simple { text }
2085 };
2086 Some(AssistantContent::Thought { thought })
2087 }
2088 }
2089 BlockState::ToolUse { id, name, input } => {
2090 if id.is_empty() || name.is_empty() {
2091 None
2092 } else {
2093 let parameters: serde_json::Value = serde_json::from_str(&input)
2094 .unwrap_or(serde_json::Value::Object(Default::default()));
2095 Some(AssistantContent::ToolCall {
2096 tool_call: ToolCall {
2097 id,
2098 name,
2099 parameters,
2100 },
2101 thought_signature: None,
2102 })
2103 }
2104 }
2105 BlockState::Unknown => None,
2106 }
2107}
2108
2109fn convert_claude_stream(
2110 sse_stream: crate::api::sse::SseStream,
2111 token: CancellationToken,
2112) -> impl futures_core::Stream<Item = StreamChunk> + Send {
2113 async_stream::stream! {
2114 let mut block_states: std::collections::HashMap<usize, BlockState> =
2115 std::collections::HashMap::new();
2116 let mut completed_content: Vec<AssistantContent> = Vec::new();
2117 let mut latest_usage: Option<TokenUsage> = None;
2118
2119 tokio::pin!(sse_stream);
2120
2121 while let Some(event_result) = sse_stream.next().await {
2122 if token.is_cancelled() {
2123 yield StreamChunk::Error(StreamError::Cancelled);
2124 break;
2125 }
2126
2127 let event = match event_result {
2128 Ok(e) => e,
2129 Err(e) => {
2130 yield StreamChunk::Error(StreamError::SseParse(e));
2131 break;
2132 }
2133 };
2134
2135 let parsed: Result<ClaudeStreamEvent, _> = serde_json::from_str(&event.data);
2136 let stream_event = match parsed {
2137 Ok(e) => e,
2138 Err(_) => continue,
2139 };
2140
2141 match stream_event {
2142 ClaudeStreamEvent::ContentBlockStart { index, content_block } => {
2143 match content_block.block_type.as_str() {
2144 "text" => {
2145 let text = content_block.text.unwrap_or_default();
2146 if !text.is_empty() {
2147 yield StreamChunk::TextDelta(text.clone());
2148 }
2149 block_states.insert(index, BlockState::Text { text });
2150 }
2151 "thinking" => {
2152 block_states.insert(
2153 index,
2154 BlockState::Thinking {
2155 text: String::new(),
2156 signature: None,
2157 },
2158 );
2159 }
2160 "tool_use" => {
2161 let id = content_block.id.unwrap_or_default();
2162 let name = content_block.name.unwrap_or_default();
2163 if !id.is_empty() && !name.is_empty() {
2164 yield StreamChunk::ToolUseStart {
2165 id: id.clone(),
2166 name: name.clone(),
2167 };
2168 }
2169 block_states.insert(
2170 index,
2171 BlockState::ToolUse {
2172 id,
2173 name,
2174 input: String::new(),
2175 },
2176 );
2177 }
2178 _ => {
2179 block_states.insert(index, BlockState::Unknown);
2180 }
2181 }
2182 }
2183 ClaudeStreamEvent::ContentBlockDelta { index, delta } => match delta {
2184 ClaudeDelta::Text { text } => {
2185 match block_states.get_mut(&index) {
2186 Some(BlockState::Text { text: buf }) => buf.push_str(&text),
2187 _ => {
2188 block_states.insert(index, BlockState::Text { text: text.clone() });
2189 }
2190 }
2191 yield StreamChunk::TextDelta(text);
2192 }
2193 ClaudeDelta::Thinking { thinking } => {
2194 match block_states.get_mut(&index) {
2195 Some(BlockState::Thinking { text, .. }) => text.push_str(&thinking),
2196 _ => {
2197 block_states.insert(
2198 index,
2199 BlockState::Thinking {
2200 text: thinking.clone(),
2201 signature: None,
2202 },
2203 );
2204 }
2205 }
2206 yield StreamChunk::ThinkingDelta(thinking);
2207 }
2208 ClaudeDelta::Signature { signature } => {
2209 if let Some(BlockState::Thinking { signature: sig, .. }) =
2210 block_states.get_mut(&index)
2211 {
2212 *sig = Some(signature);
2213 }
2214 }
2215 ClaudeDelta::InputJson { partial_json } => {
2216 if let Some(BlockState::ToolUse { id, input, .. }) =
2217 block_states.get_mut(&index)
2218 {
2219 input.push_str(&partial_json);
2220 if !id.is_empty() {
2221 yield StreamChunk::ToolUseInputDelta {
2222 id: id.clone(),
2223 delta: partial_json,
2224 };
2225 }
2226 }
2227 }
2228 },
2229 ClaudeStreamEvent::ContentBlockStop { index } => {
2230 if let Some(state) = block_states.remove(&index)
2231 && let Some(content) = block_state_to_content(state)
2232 {
2233 completed_content.push(content);
2234 }
2235 yield StreamChunk::ContentBlockStop { index };
2236 }
2237 ClaudeStreamEvent::MessageStop => {
2238 if !block_states.is_empty() {
2239 tracing::warn!(
2240 target: "anthropic::stream",
2241 "MessageStop received with {} unfinished content blocks",
2242 block_states.len()
2243 );
2244 }
2245 let content = std::mem::take(&mut completed_content);
2246 yield StreamChunk::MessageComplete(CompletionResponse {
2247 content,
2248 usage: latest_usage,
2249 });
2250 break;
2251 }
2252 ClaudeStreamEvent::Error { error } => {
2253 let raw_error_type = if error.error_type.is_empty() {
2254 None
2255 } else {
2256 Some(error.error_type)
2257 };
2258 let kind = raw_error_type.as_deref().map_or_else(
2259 || ProviderStreamErrorKind::StreamError,
2260 ProviderStreamErrorKind::from_provider_error_type,
2261 );
2262 yield StreamChunk::Error(StreamError::Provider {
2263 provider: "anthropic".into(),
2264 kind,
2265 raw_error_type,
2266 message: error.message,
2267 });
2268 }
2269 ClaudeStreamEvent::MessageDelta { usage, .. } => {
2270 if let Some(usage) = usage.as_ref() {
2271 latest_usage = Some(map_claude_usage(usage));
2272 }
2273 }
2274 ClaudeStreamEvent::MessageStart { .. } | ClaudeStreamEvent::Ping => {}
2275 }
2276 }
2277 }
2278}