1use async_trait::async_trait;
2use futures_util::StreamExt;
3use reqwest::{self, header};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use strum_macros::Display;
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, warn};
9
10use crate::api::error::StreamError;
11use crate::api::provider::{CompletionStream, StreamChunk};
12use crate::api::sse::parse_sse_stream;
13use crate::api::{CompletionResponse, Provider, error::ApiError};
14use crate::app::SystemContext;
15use crate::app::conversation::{
16 AssistantContent, ImageSource, Message as AppMessage, ThoughtContent, ToolResult, UserContent,
17};
18use crate::auth::{
19 AnthropicAuth, AuthErrorAction, AuthErrorContext, AuthHeaderContext, InstructionPolicy,
20 RequestKind,
21};
22use crate::auth::{ModelId as AuthModelId, ProviderId as AuthProviderId};
23use crate::config::model::{ModelId, ModelParameters};
24use steer_tools::{InputSchema, ToolCall, ToolSchema};
25
/// Endpoint of the Anthropic Messages API. `AnthropicClient::request_url`
/// returns it as-is, or with directive-supplied query parameters appended.
const API_URL: &str = "https://api.anthropic.com/v1/messages";
27
/// Role attached to a message sent to the Messages API.
///
/// Both serde and `Display` (via strum) render the variants as the lowercase
/// strings `"user"`, `"assistant"` and `"tool"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display)]
pub enum ClaudeMessageRole {
    #[serde(rename = "user")]
    #[strum(serialize = "user")]
    User,
    #[serde(rename = "assistant")]
    #[strum(serialize = "assistant")]
    Assistant,
    // NOTE(review): the conversions visible in this file only produce User and
    // Assistant; tool output is carried as `tool_result` content blocks. Confirm
    // whether a "tool" role is ever actually sent.
    #[serde(rename = "tool")]
    #[strum(serialize = "tool")]
    Tool,
}
40
/// One message in a Messages API request body.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ClaudeMessage {
    /// Author of the message.
    pub role: ClaudeMessageRole,
    /// Flattened so the message carries a top-level `content` field, which holds
    /// either a plain string or an array of content blocks.
    #[serde(flatten)]
    pub content: ClaudeMessageContent,
    /// Local application message ID (copied from the app message during
    /// conversion). It is never serialized into the request.
    #[serde(skip_serializing)]
    pub id: Option<String>,
}
50
/// The `content` of a [`ClaudeMessage`]: either a bare string or a list of
/// typed content blocks.
///
/// The enum is `untagged`: deserialization tries `Text` first and then
/// `StructuredContent`. Both variants serialize to a single `content` key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum ClaudeMessageContent {
    /// `"content": "<string>"`.
    Text { content: String },
    /// `"content": [ {block}, ... ]`.
    StructuredContent { content: ClaudeStructuredContent },
}
60
/// List of content blocks. Because it is `transparent`, it serializes as a
/// plain JSON array.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct ClaudeStructuredContent(pub Vec<ClaudeContentBlock>);
65
/// How the client authenticates its requests.
#[derive(Clone)]
enum AuthMode {
    /// Static API key, sent as the `x-api-key` header.
    ApiKey(String),
    /// Auth directive that supplies request headers (and optionally query
    /// parameters) and may react to auth failures.
    Directive(AnthropicAuth),
}
71
/// Client for the Anthropic Messages API.
#[derive(Clone)]
pub struct AnthropicClient {
    /// HTTP client preconfigured with the `anthropic-version` and JSON
    /// content-type default headers.
    http_client: reqwest::Client,
    /// Authentication strategy used for every request.
    auth: AuthMode,
}
77
/// Value of the `thinking.type` request field. Serialized in lowercase, so the
/// only variant is sent as `"enabled"`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
enum ThinkingType {
    #[default]
    Enabled,
}
85
/// Extended-thinking configuration sent in the request's `thinking` field.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Thinking {
    /// Serialized as `type`. When absent on deserialization it defaults to
    /// `Enabled`.
    #[serde(rename = "type", default)]
    thinking_type: ThinkingType,
    /// Token budget granted to the model for thinking.
    budget_tokens: u32,
}
92
/// `source` object of an `image` content block.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ClaudeImageSource {
    /// Serialized as `type`. This adapter always sets `"base64"`.
    #[serde(rename = "type")]
    source_type: String,
    /// MIME type of the image, e.g. `"image/png"`.
    media_type: String,
    /// Base64-encoded image bytes, taken from a data URL's payload.
    data: String,
}
100
/// One block of the request's `system` prompt.
#[derive(Debug, Serialize, Clone)]
struct SystemContentBlock {
    /// Serialized as `type`, e.g. `"text"`.
    #[serde(rename = "type")]
    content_type: String,
    /// Prompt text.
    text: String,
    /// Optional prompt-caching marker. Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    cache_control: Option<CacheControl>,
}
109
/// The request's `system` field. Because the enum is untagged, the only
/// variant serializes directly as an array of system blocks.
#[derive(Debug, Serialize, Clone)]
#[serde(untagged)]
enum System {
    Content(Vec<SystemContentBlock>),
}
116
/// JSON body of a Messages API request. Every optional field is omitted from
/// the serialized body when it is `None`.
#[derive(Debug, Serialize)]
struct CompletionRequest {
    /// Model identifier sent to the API.
    model: String,
    /// Conversation history, oldest first.
    messages: Vec<ClaudeMessage>,
    /// Upper bound on generated tokens.
    max_tokens: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    system: Option<System>,
    /// Tools whose input schemas were already sanitized for Claude.
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<ClaudeTool>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    top_p: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    top_k: Option<usize>,
    /// `Some(true)` requests a server-sent-event stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    stream: Option<bool>,
    /// Extended-thinking configuration.
    #[serde(skip_serializing_if = "Option::is_none")]
    thinking: Option<Thinking>,
}
137
/// Tool definition in the format expected by the Messages API `tools` array.
#[derive(Debug, Serialize, Clone)]
struct ClaudeTool {
    name: String,
    description: String,
    /// JSON Schema for the tool's arguments. When built through
    /// `From<ToolSchema>`, it has already been passed through
    /// `adapt_tool_schema_for_claude`.
    input_schema: InputSchema,
}
144
145impl From<ToolSchema> for ClaudeTool {
146 fn from(tool: ToolSchema) -> Self {
147 let tool = adapt_tool_schema_for_claude(tool);
148 Self {
149 name: tool.name,
150 description: tool.description,
151 input_schema: tool.input_schema,
152 }
153 }
154}
155
/// Non-streaming Messages API response body.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ClaudeCompletionResponse {
    id: String,
    /// Content blocks generated by the model.
    content: Vec<ClaudeContentBlock>,
    model: String,
    role: String,
    #[serde(default)]
    stop_reason: Option<String>,
    #[serde(default)]
    stop_sequence: Option<String>,
    /// Token accounting. Zeroed when the field is absent.
    #[serde(default)]
    usage: ClaudeUsage,
    /// Any other top-level fields, kept so that unknown keys survive a
    /// round-trip.
    #[serde(flatten)]
    extra: std::collections::HashMap<String, serde_json::Value>,
}
172
173fn adapt_tool_schema_for_claude(tool: ToolSchema) -> ToolSchema {
174 let root_schema = tool.input_schema.as_value();
175 let sanitized = sanitize_for_claude(root_schema, root_schema);
176 ToolSchema {
177 input_schema: InputSchema::new(sanitized),
178 ..tool
179 }
180}
181
/// Unescapes one JSON Pointer reference token (RFC 6901).
///
/// `~1` becomes `/` first, then `~0` becomes `~`, so `~01` correctly decodes to
/// `~1`. Tokens without a `~` are returned borrowed and are not copied.
fn decode_pointer_segment(segment: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if segment.contains('~') {
        let slashes_restored = segment.replace("~1", "/");
        Cow::Owned(slashes_restored.replace("~0", "~"))
    } else {
        Cow::Borrowed(segment)
    }
}
188
189fn resolve_ref<'a>(root: &'a Value, reference: &str) -> Option<&'a Value> {
190 let path = reference.strip_prefix("#/")?;
191 let mut current = root;
192 for segment in path.split('/') {
193 let decoded = decode_pointer_segment(segment);
194 current = current.get(decoded.as_ref())?;
195 }
196 Some(current)
197}
198
199fn infer_type_from_enum(values: &[Value]) -> Option<String> {
200 let mut has_string = false;
201 let mut has_number = false;
202 let mut has_bool = false;
203 let mut has_object = false;
204 let mut has_array = false;
205
206 for value in values {
207 match value {
208 Value::String(_) => has_string = true,
209 Value::Number(_) => has_number = true,
210 Value::Bool(_) => has_bool = true,
211 Value::Object(_) => has_object = true,
212 Value::Array(_) => has_array = true,
213 Value::Null => {}
214 }
215 }
216
217 let kind_count = u8::from(has_string)
218 + u8::from(has_number)
219 + u8::from(has_bool)
220 + u8::from(has_object)
221 + u8::from(has_array);
222
223 if kind_count != 1 {
224 return None;
225 }
226
227 if has_string {
228 Some("string".to_string())
229 } else if has_number {
230 Some("number".to_string())
231 } else if has_bool {
232 Some("boolean".to_string())
233 } else if has_object {
234 Some("object".to_string())
235 } else if has_array {
236 Some("array".to_string())
237 } else {
238 None
239 }
240}
241
242fn normalize_type(value: &Value) -> Value {
243 if let Some(type_str) = value.as_str() {
244 return Value::String(type_str.to_string());
245 }
246
247 if let Some(type_array) = value.as_array()
248 && let Some(primary_type) = type_array
249 .iter()
250 .find_map(|v| if v.is_null() { None } else { v.as_str() })
251 {
252 return Value::String(primary_type.to_string());
253 }
254
255 Value::String("string".to_string())
256}
257
258fn extract_enum_values(value: &Value) -> Vec<Value> {
259 let Some(obj) = value.as_object() else {
260 return Vec::new();
261 };
262
263 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
264 return enum_values
265 .iter()
266 .filter(|v| !v.is_null())
267 .cloned()
268 .collect();
269 }
270
271 if let Some(const_value) = obj.get("const") {
272 if const_value.is_null() {
273 return Vec::new();
274 }
275 return vec![const_value.clone()];
276 }
277
278 Vec::new()
279}
280
281fn merge_property(properties: &mut serde_json::Map<String, Value>, key: &str, value: &Value) {
282 match properties.get_mut(key) {
283 None => {
284 properties.insert(key.to_string(), value.clone());
285 }
286 Some(existing) => {
287 if existing == value {
288 return;
289 }
290
291 let existing_values = extract_enum_values(existing);
292 let incoming_values = extract_enum_values(value);
293 if incoming_values.is_empty() && existing_values.is_empty() {
294 return;
295 }
296
297 let mut combined = existing_values;
298 for item in incoming_values {
299 if !combined.contains(&item) {
300 combined.push(item);
301 }
302 }
303
304 if combined.is_empty() {
305 return;
306 }
307
308 if let Some(obj) = existing.as_object_mut() {
309 obj.remove("const");
310 obj.insert("enum".to_string(), Value::Array(combined.clone()));
311 if !obj.contains_key("type")
312 && let Some(inferred) = infer_type_from_enum(&combined)
313 {
314 obj.insert("type".to_string(), Value::String(inferred));
315 }
316 }
317 }
318 }
319}
320
321fn merge_union_schemas(
322 root: &Value,
323 variants: &[Value],
324 seen_refs: &mut std::collections::HashSet<String>,
325) -> Value {
326 let mut merged_props = serde_json::Map::new();
327 let mut required_intersection: Option<std::collections::BTreeSet<String>> = None;
328 let mut enum_values: Vec<Value> = Vec::new();
329 let mut type_candidates: Vec<String> = Vec::new();
330
331 for variant in variants {
332 let sanitized = sanitize_for_claude_inner(root, variant, seen_refs);
333
334 if let Some(schema_type) = sanitized.get("type").and_then(|v| v.as_str()) {
335 type_candidates.push(schema_type.to_string());
336 }
337
338 if let Some(props) = sanitized.get("properties").and_then(|v| v.as_object()) {
339 for (key, value) in props {
340 merge_property(&mut merged_props, key, value);
341 }
342 }
343
344 if let Some(req) = sanitized.get("required").and_then(|v| v.as_array()) {
345 let req_set: std::collections::BTreeSet<String> = req
346 .iter()
347 .filter_map(|item| item.as_str().map(|s| s.to_string()))
348 .collect();
349
350 required_intersection = match required_intersection.take() {
351 None => Some(req_set),
352 Some(existing) => Some(
353 existing
354 .intersection(&req_set)
355 .cloned()
356 .collect::<std::collections::BTreeSet<String>>(),
357 ),
358 };
359 }
360
361 if let Some(values) = sanitized.get("enum").and_then(|v| v.as_array()) {
362 for value in values {
363 if value.is_null() {
364 continue;
365 }
366 if !enum_values.contains(value) {
367 enum_values.push(value.clone());
368 }
369 }
370 }
371 }
372
373 let schema_type = if !merged_props.is_empty() {
374 "object".to_string()
375 } else if let Some(inferred) = infer_type_from_enum(&enum_values) {
376 inferred
377 } else if let Some(first) = type_candidates.first() {
378 first.clone()
379 } else {
380 "string".to_string()
381 };
382
383 let mut merged = serde_json::Map::new();
384 merged.insert("type".to_string(), Value::String(schema_type));
385
386 if !merged_props.is_empty() {
387 merged.insert("properties".to_string(), Value::Object(merged_props));
388 }
389
390 if let Some(required_set) = required_intersection
391 && !required_set.is_empty()
392 {
393 merged.insert(
394 "required".to_string(),
395 Value::Array(
396 required_set
397 .into_iter()
398 .map(Value::String)
399 .collect::<Vec<_>>(),
400 ),
401 );
402 }
403
404 if !enum_values.is_empty() {
405 merged.insert("enum".to_string(), Value::Array(enum_values));
406 }
407
408 Value::Object(merged)
409}
410
411fn sanitize_for_claude(root: &Value, schema: &Value) -> Value {
412 let mut seen_refs = std::collections::HashSet::new();
413 sanitize_for_claude_inner(root, schema, &mut seen_refs)
414}
415
416fn fallback_schema() -> Value {
417 let mut out = serde_json::Map::new();
418 out.insert("type".to_string(), Value::String("object".to_string()));
419 out.insert(
420 "properties".to_string(),
421 Value::Object(serde_json::Map::new()),
422 );
423 Value::Object(out)
424}
425
426fn sanitize_for_claude_inner(
427 root: &Value,
428 schema: &Value,
429 seen_refs: &mut std::collections::HashSet<String>,
430) -> Value {
431 if let Some(reference) = schema.get("$ref").and_then(|v| v.as_str()) {
432 if !seen_refs.insert(reference.to_string()) {
433 return fallback_schema();
434 }
435 if let Some(resolved) = resolve_ref(root, reference) {
436 let sanitized = sanitize_for_claude_inner(root, resolved, seen_refs);
437 seen_refs.remove(reference);
438 return sanitized;
439 }
440 seen_refs.remove(reference);
441 }
442
443 let Some(obj) = schema.as_object() else {
444 return schema.clone();
445 };
446
447 if let Some(union) = obj
448 .get("oneOf")
449 .or_else(|| obj.get("anyOf"))
450 .or_else(|| obj.get("allOf"))
451 .and_then(|v| v.as_array())
452 {
453 return merge_union_schemas(root, union, seen_refs);
454 }
455
456 let mut out = serde_json::Map::new();
457 for (key, value) in obj {
458 match key.as_str() {
459 "$ref"
460 | "$defs"
461 | "oneOf"
462 | "anyOf"
463 | "allOf"
464 | "const"
465 | "additionalProperties"
466 | "default"
467 | "examples"
468 | "title"
469 | "pattern"
470 | "minLength"
471 | "maxLength"
472 | "minimum"
473 | "maximum"
474 | "minItems"
475 | "maxItems"
476 | "uniqueItems"
477 | "deprecated" => {}
478 "type" => {
479 out.insert("type".to_string(), normalize_type(value));
480 }
481 "properties" => {
482 if let Some(props) = value.as_object() {
483 let mut sanitized_props = serde_json::Map::new();
484 for (prop_key, prop_value) in props {
485 sanitized_props.insert(
486 prop_key.clone(),
487 sanitize_for_claude_inner(root, prop_value, seen_refs),
488 );
489 }
490 out.insert("properties".to_string(), Value::Object(sanitized_props));
491 }
492 }
493 "items" => {
494 if let Some(items) = value.as_array() {
495 let merged = merge_union_schemas(root, items, seen_refs);
496 out.insert("items".to_string(), merged);
497 } else {
498 out.insert(
499 "items".to_string(),
500 sanitize_for_claude_inner(root, value, seen_refs),
501 );
502 }
503 }
504 "enum" => {
505 let values = value
506 .as_array()
507 .map(|items| {
508 items
509 .iter()
510 .filter(|v| !v.is_null())
511 .cloned()
512 .collect::<Vec<_>>()
513 })
514 .unwrap_or_default();
515 out.insert("enum".to_string(), Value::Array(values));
516 }
517 _ => {
518 out.insert(
519 key.clone(),
520 sanitize_for_claude_inner(root, value, seen_refs),
521 );
522 }
523 }
524 }
525
526 if let Some(const_value) = obj.get("const")
527 && !const_value.is_null()
528 {
529 out.insert("enum".to_string(), Value::Array(vec![const_value.clone()]));
530 if !out.contains_key("type")
531 && let Some(inferred) = infer_type_from_enum(std::slice::from_ref(const_value))
532 {
533 out.insert("type".to_string(), Value::String(inferred));
534 }
535 }
536
537 if out.get("type") == Some(&Value::String("object".to_string()))
538 && !out.contains_key("properties")
539 {
540 out.insert(
541 "properties".to_string(),
542 Value::Object(serde_json::Map::new()),
543 );
544 }
545
546 if !out.contains_key("type") {
547 if out.contains_key("properties") {
548 out.insert("type".to_string(), Value::String("object".to_string()));
549 } else if out.contains_key("items") {
550 out.insert("type".to_string(), Value::String("array".to_string()));
551 } else if let Some(enum_values) = out.get("enum").and_then(|v| v.as_array())
552 && let Some(inferred) = infer_type_from_enum(enum_values)
553 {
554 out.insert("type".to_string(), Value::String(inferred));
555 }
556 }
557
558 Value::Object(out)
559}
560
/// Serde default for `CacheControl::cache_type`: the `"ephemeral"` cache type.
fn default_cache_type() -> String {
    String::from("ephemeral")
}
564
// Unit tests for tool-schema sanitization and user image conversion.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A self-referential `$ref` must terminate. The nested occurrence is
    /// expected to come out with `type: "object"` (the cycle fallback).
    #[test]
    fn sanitize_handles_recursive_ref() {
        let schema = json!({
            "$defs": {
                "node": {
                    "type": "object",
                    "properties": {
                        "next": { "$ref": "#/$defs/node" }
                    }
                }
            },
            "$ref": "#/$defs/node"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let next = sanitized
            .get("properties")
            .and_then(|v| v.get("next"))
            .and_then(|v| v.get("type"))
            .and_then(|v| v.as_str());

        assert_eq!(next, Some("object"));
    }

    /// A tuple-style `items` array is collapsed into a single schema object.
    #[test]
    fn sanitize_collapses_tuple_items() {
        let schema = json!({
            "type": "array",
            "items": [
                { "type": "string" },
                { "type": "number" }
            ]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let items = sanitized.get("items");

        assert!(matches!(items, Some(Value::Object(_))));
    }

    /// `title`, `additionalProperties`, `pattern` and `default` are stripped
    /// at every nesting level.
    #[test]
    fn sanitize_removes_unsupported_keywords() {
        let schema = json!({
            "type": "object",
            "title": "ignored",
            "additionalProperties": false,
            "properties": {
                "name": {
                    "type": "string",
                    "pattern": "^[a-z]+$",
                    "default": "x"
                }
            }
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" }
            }
        });

        assert_eq!(sanitized, expected);
    }

    /// `const` becomes a one-element `enum`, and the type is inferred from it.
    #[test]
    fn sanitize_converts_const_to_enum_with_type() {
        let schema = json!({
            "const": "fixed"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "enum": ["fixed"],
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    /// `null` members are dropped from `enum` before the type is inferred.
    #[test]
    fn sanitize_filters_null_enum_values() {
        let schema = json!({
            "enum": ["a", null, "b"]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "enum": ["a", "b"],
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    /// JSON Pointer escapes in `$ref` (`~1` -> `/`) are decoded when resolving.
    #[test]
    fn sanitize_decodes_json_pointer_refs() {
        let schema = json!({
            "$defs": {
                "a/b": { "type": "string" }
            },
            "$ref": "#/$defs/a~1b"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    /// `oneOf` object variants: properties are unioned and `required` is
    /// intersected.
    #[test]
    fn sanitize_merges_union_properties_and_required() {
        let schema = json!({
            "oneOf": [
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "b": { "type": "string" }
                    },
                    "required": ["a"]
                },
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "c": { "type": "string" }
                    },
                    "required": ["a", "c"]
                }
            ]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "object",
            "properties": {
                "a": { "type": "string" },
                "b": { "type": "string" },
                "c": { "type": "string" }
            },
            "required": ["a"]
        });

        assert_eq!(sanitized, expected);
    }

    /// A schema with `items` but no `type` is typed as an array.
    #[test]
    fn sanitize_infers_array_type_from_items() {
        let schema = json!({
            "items": {
                "type": "string"
            }
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "array",
            "items": { "type": "string" }
        });

        assert_eq!(sanitized, expected);
    }

    /// A base64 data URL becomes an `image` block carrying the media type and
    /// the raw payload.
    #[test]
    fn user_image_data_url_to_claude_block() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::DataUrl {
                data_url: "data:image/png;base64,Zm9v".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let block = user_image_to_claude_block(&image).expect("image should convert");
        let json = serde_json::to_value(block).expect("serialize block");
        assert_eq!(json["type"], "image");
        assert_eq!(json["source"]["type"], "base64");
        assert_eq!(json["source"]["media_type"], "image/png");
        assert_eq!(json["source"]["data"], "Zm9v");
    }

    /// Session-file image sources are rejected with `UnsupportedFeature`.
    #[test]
    fn user_image_session_file_source_is_unsupported() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::SessionFile {
                relative_path: "session-1/image.png".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let err = user_image_to_claude_block(&image).expect_err("expected unsupported feature");
        match err {
            ApiError::UnsupportedFeature {
                provider,
                feature,
                details,
            } => {
                assert_eq!(provider, "anthropic");
                assert_eq!(feature, "image input source");
                assert!(details.contains("session file"));
            }
            other => panic!("Expected UnsupportedFeature, got {other:?}"),
        }
    }
}
786
/// Prompt-caching marker attached to a content block.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CacheControl {
    /// Serialized as `type`. Defaults to `"ephemeral"` when absent on input.
    #[serde(rename = "type", default = "default_cache_type")]
    cache_type: String,
}
792
/// A typed content block, internally tagged by its `type` field.
///
/// Every known variant keeps unrecognized fields in a flattened `extra` map,
/// so they survive a round-trip. A block whose `type` is not listed here
/// deserializes as `Unknown`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "type")]
pub enum ClaudeContentBlock {
    /// Plain text.
    #[serde(rename = "text")]
    Text {
        text: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Inline image; see [`ClaudeImageSource`].
    #[serde(rename = "image")]
    Image {
        source: ClaudeImageSource,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// A tool invocation requested by the model. `input` holds the JSON
    /// arguments.
    #[serde(rename = "tool_use")]
    ToolUse {
        id: String,
        name: String,
        input: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Result of a tool call, referencing the originating `tool_use` block by
    /// ID.
    #[serde(rename = "tool_result")]
    ToolResult {
        tool_use_id: String,
        content: Vec<ClaudeContentBlock>,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(skip_serializing_if = "Option::is_none")]
        is_error: Option<bool>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Signed extended-thinking output.
    #[serde(rename = "thinking")]
    Thinking {
        thinking: String,
        signature: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Thinking output delivered as opaque `data`.
    #[serde(rename = "redacted_thinking")]
    RedactedThinking {
        data: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Catch-all for unrecognized block types on deserialization.
    #[serde(other)]
    Unknown,
}
853
/// Token accounting reported by the API. `Default` (all zeros / `None`) is
/// used when a response omits `usage`.
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
struct ClaudeUsage {
    #[serde(rename = "input_tokens")]
    input: usize,
    #[serde(rename = "output_tokens")]
    output: usize,
    /// Tokens written to the prompt cache, when reported.
    #[serde(rename = "cache_creation_input_tokens")]
    #[serde(skip_serializing_if = "Option::is_none")]
    cache_creation_input: Option<usize>,
    /// Tokens read from the prompt cache, when reported.
    #[serde(rename = "cache_read_input_tokens")]
    #[serde(skip_serializing_if = "Option::is_none")]
    cache_read_input: Option<usize>,
}
867
/// One server-sent event from a streaming Messages API response, tagged by its
/// `type` field.
///
/// Fields marked `#[expect(dead_code)]` are parsed for schema completeness but
/// are not read in this file.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ClaudeStreamEvent {
    #[serde(rename = "message_start")]
    MessageStart {
        #[expect(dead_code)]
        message: ClaudeMessageStart,
    },
    /// A new content block at position `index` begins.
    #[serde(rename = "content_block_start")]
    ContentBlockStart {
        index: usize,
        content_block: ClaudeContentBlockStart,
    },
    /// Incremental data for the block at `index`.
    #[serde(rename = "content_block_delta")]
    ContentBlockDelta { index: usize, delta: ClaudeDelta },
    /// The block at `index` is complete.
    #[serde(rename = "content_block_stop")]
    ContentBlockStop { index: usize },
    #[serde(rename = "message_delta")]
    MessageDelta {
        #[expect(dead_code)]
        delta: ClaudeMessageDeltaData,
        #[expect(dead_code)]
        #[serde(default)]
        usage: Option<ClaudeUsage>,
    },
    #[serde(rename = "message_stop")]
    MessageStop,
    /// Keep-alive event.
    #[serde(rename = "ping")]
    Ping,
    /// Error reported inside the stream.
    #[serde(rename = "error")]
    Error { error: ClaudeStreamError },
}
900
/// Subset of the `message` object in a `message_start` event. Both fields
/// default to empty strings when absent.
#[derive(Debug, Deserialize)]
struct ClaudeMessageStart {
    #[expect(dead_code)]
    #[serde(default)]
    id: String,
    #[expect(dead_code)]
    #[serde(default)]
    model: String,
}
910
/// `content_block` payload of a `content_block_start` event.
#[derive(Debug, Deserialize)]
struct ClaudeContentBlockStart {
    /// Raw block `type` string (kept as a `String`, not an enum).
    #[serde(rename = "type")]
    block_type: String,
    /// Presumably set for `tool_use` blocks only — confirm against the stream
    /// handler.
    #[serde(default)]
    id: Option<String>,
    /// Presumably the tool name for `tool_use` blocks — confirm against the
    /// stream handler.
    #[serde(default)]
    name: Option<String>,
    /// Initial text, if the block starts with any.
    #[serde(default)]
    text: Option<String>,
}
922
/// Incremental payload of a `content_block_delta` event, tagged by `type`.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ClaudeDelta {
    /// Appended text.
    #[serde(rename = "text_delta")]
    Text { text: String },
    /// Appended thinking text.
    #[serde(rename = "thinking_delta")]
    Thinking { thinking: String },
    /// Fragment of tool-call arguments. It is not valid JSON on its own; the
    /// fragments must be concatenated before parsing.
    #[serde(rename = "input_json_delta")]
    InputJson { partial_json: String },
    /// Signature for a thinking block.
    #[serde(rename = "signature_delta")]
    Signature { signature: String },
}
935
/// `delta` payload of a `message_delta` event. It is parsed but not read in
/// this file.
#[derive(Debug, Deserialize)]
struct ClaudeMessageDeltaData {
    #[expect(dead_code)]
    #[serde(default)]
    stop_reason: Option<String>,
}
942
/// `error` payload of an in-stream `error` event. Both fields default to empty
/// strings when absent.
#[derive(Debug, Deserialize)]
struct ClaudeStreamError {
    /// Human-readable error message.
    #[serde(default)]
    message: String,
    /// Error category, serialized as `type`.
    #[serde(rename = "type", default)]
    error_type: String,
}
950
951impl AnthropicClient {
952 pub fn new(api_key: &str) -> Result<Self, ApiError> {
953 Self::with_api_key(api_key)
954 }
955
956 pub fn with_api_key(api_key: &str) -> Result<Self, ApiError> {
957 Ok(Self {
958 http_client: Self::build_http_client()?,
959 auth: AuthMode::ApiKey(api_key.to_string()),
960 })
961 }
962
963 pub fn with_directive(directive: AnthropicAuth) -> Result<Self, ApiError> {
964 Ok(Self {
965 http_client: Self::build_http_client()?,
966 auth: AuthMode::Directive(directive),
967 })
968 }
969
970 fn build_http_client() -> Result<reqwest::Client, ApiError> {
971 let mut headers = header::HeaderMap::new();
972 headers.insert(
973 "anthropic-version",
974 header::HeaderValue::from_static("2023-06-01"),
975 );
976 headers.insert(
977 header::CONTENT_TYPE,
978 header::HeaderValue::from_static("application/json"),
979 );
980
981 reqwest::Client::builder()
982 .default_headers(headers)
983 .build()
984 .map_err(ApiError::Network)
985 }
986
987 async fn auth_headers(
988 &self,
989 ctx: AuthHeaderContext,
990 ) -> Result<Vec<(String, String)>, ApiError> {
991 match &self.auth {
992 AuthMode::ApiKey(key) => Ok(vec![("x-api-key".to_string(), key.clone())]),
993 AuthMode::Directive(directive) => {
994 let header_pairs = directive
995 .headers
996 .headers(ctx)
997 .await
998 .map_err(|e| ApiError::AuthError(e.to_string()))?;
999 Ok(header_pairs
1000 .into_iter()
1001 .map(|pair| (pair.name, pair.value))
1002 .collect())
1003 }
1004 }
1005 }
1006
1007 async fn on_auth_error(
1008 &self,
1009 status: u16,
1010 body: &str,
1011 request_kind: RequestKind,
1012 ) -> Result<AuthErrorAction, ApiError> {
1013 let AuthMode::Directive(directive) = &self.auth else {
1014 return Ok(AuthErrorAction::NoAction);
1015 };
1016 let context = AuthErrorContext {
1017 status: Some(status),
1018 body_snippet: Some(truncate_body(body)),
1019 request_kind,
1020 };
1021 directive
1022 .headers
1023 .on_auth_error(context)
1024 .await
1025 .map_err(|e| ApiError::AuthError(e.to_string()))
1026 }
1027
1028 fn request_url(&self) -> Result<String, ApiError> {
1029 let AuthMode::Directive(directive) = &self.auth else {
1030 return Ok(API_URL.to_string());
1031 };
1032
1033 let Some(query_params) = &directive.query_params else {
1034 return Ok(API_URL.to_string());
1035 };
1036
1037 if query_params.is_empty() {
1038 return Ok(API_URL.to_string());
1039 }
1040
1041 let mut url = url::Url::parse(API_URL)
1042 .map_err(|e| ApiError::Configuration(format!("Invalid API_URL '{API_URL}': {e}")))?;
1043 for param in query_params {
1044 url.query_pairs_mut().append_pair(¶m.name, ¶m.value);
1045 }
1046 Ok(url.to_string())
1047 }
1048}
1049
1050fn convert_messages(messages: Vec<AppMessage>) -> Result<Vec<ClaudeMessage>, ApiError> {
1052 let claude_messages: Result<Vec<ClaudeMessage>, ApiError> =
1053 messages.into_iter().map(convert_single_message).collect();
1054
1055 claude_messages.map(|messages| {
1057 messages
1058 .into_iter()
1059 .filter(|msg| {
1060 match &msg.content {
1061 ClaudeMessageContent::Text { content } => !content.trim().is_empty(),
1062 ClaudeMessageContent::StructuredContent { .. } => true, }
1064 })
1065 .collect()
1066 })
1067}
1068
1069fn user_image_to_claude_block(
1070 image: &crate::app::conversation::ImageContent,
1071) -> Result<ClaudeContentBlock, ApiError> {
1072 match &image.source {
1073 ImageSource::DataUrl { data_url } => {
1074 let Some((meta, payload)) = data_url.split_once(',') else {
1075 return Err(ApiError::InvalidRequest {
1076 provider: "anthropic".to_string(),
1077 details: "Image data URL is missing ',' separator".to_string(),
1078 });
1079 };
1080
1081 let Some(meta_body) = meta.strip_prefix("data:") else {
1082 return Err(ApiError::InvalidRequest {
1083 provider: "anthropic".to_string(),
1084 details: "Image data URL must start with 'data:'".to_string(),
1085 });
1086 };
1087
1088 if !meta_body
1089 .split(';')
1090 .any(|segment| segment.eq_ignore_ascii_case("base64"))
1091 {
1092 return Err(ApiError::InvalidRequest {
1093 provider: "anthropic".to_string(),
1094 details: "Anthropic image data URLs must be base64 encoded".to_string(),
1095 });
1096 }
1097
1098 let media_type = meta_body
1099 .split(';')
1100 .next()
1101 .filter(|value| !value.trim().is_empty())
1102 .unwrap_or("application/octet-stream")
1103 .to_string();
1104
1105 if !media_type.starts_with("image/") {
1106 return Err(ApiError::UnsupportedFeature {
1107 provider: "anthropic".to_string(),
1108 feature: "image input mime type".to_string(),
1109 details: format!("Unsupported image media type '{}'", media_type),
1110 });
1111 }
1112
1113 Ok(ClaudeContentBlock::Image {
1114 source: ClaudeImageSource {
1115 source_type: "base64".to_string(),
1116 media_type,
1117 data: payload.to_string(),
1118 },
1119 cache_control: None,
1120 extra: Default::default(),
1121 })
1122 }
1123 ImageSource::Url { url } => Err(ApiError::UnsupportedFeature {
1124 provider: "anthropic".to_string(),
1125 feature: "image input source".to_string(),
1126 details: format!(
1127 "Anthropic image input currently requires data URLs in this adapter; got URL '{}'",
1128 url
1129 ),
1130 }),
1131 ImageSource::SessionFile { relative_path } => Err(ApiError::UnsupportedFeature {
1132 provider: "anthropic".to_string(),
1133 feature: "image input source".to_string(),
1134 details: format!(
1135 "Anthropic adapter cannot access session file '{}' directly; use data URLs",
1136 relative_path
1137 ),
1138 }),
1139 }
1140}
1141
1142fn convert_single_message(msg: AppMessage) -> Result<ClaudeMessage, ApiError> {
1143 match &msg.data {
1144 crate::app::conversation::MessageData::User { content, .. } => {
1145 let mut claude_blocks = Vec::new();
1147 for user_content in content {
1148 match user_content {
1149 UserContent::Text { text } => {
1150 if !text.trim().is_empty() {
1151 claude_blocks.push(ClaudeContentBlock::Text {
1152 text: text.clone(),
1153 cache_control: None,
1154 extra: Default::default(),
1155 });
1156 }
1157 }
1158 UserContent::Image { image } => {
1159 claude_blocks.push(user_image_to_claude_block(image)?);
1160 }
1161 UserContent::CommandExecution {
1162 command,
1163 stdout,
1164 stderr,
1165 exit_code,
1166 } => {
1167 let text = UserContent::format_command_execution_as_xml(
1168 command, stdout, stderr, *exit_code,
1169 );
1170 if !text.trim().is_empty() {
1171 claude_blocks.push(ClaudeContentBlock::Text {
1172 text,
1173 cache_control: None,
1174 extra: Default::default(),
1175 });
1176 }
1177 }
1178 }
1179 }
1180
1181 if claude_blocks.is_empty() {
1182 return Err(ApiError::InvalidRequest {
1183 provider: "anthropic".to_string(),
1184 details: format!(
1185 "User message ID {} resulted in no valid content blocks",
1186 msg.id
1187 ),
1188 });
1189 }
1190
1191 Ok(ClaudeMessage {
1192 role: ClaudeMessageRole::User,
1193 content: ClaudeMessageContent::StructuredContent {
1194 content: ClaudeStructuredContent(claude_blocks),
1195 },
1196 id: Some(msg.id.clone()),
1197 })
1198 }
1199
1200 crate::app::conversation::MessageData::Assistant { content, .. } => {
1201 let claude_blocks: Vec<ClaudeContentBlock> = content
1203 .iter()
1204 .filter_map(|assistant_content| match assistant_content {
1205 AssistantContent::Text { text } => {
1206 if text.trim().is_empty() {
1207 None
1208 } else {
1209 Some(ClaudeContentBlock::Text {
1210 text: text.clone(),
1211 cache_control: None,
1212 extra: Default::default(),
1213 })
1214 }
1215 }
1216 AssistantContent::Image { image } => match user_image_to_claude_block(image) {
1217 Ok(block) => Some(block),
1218 Err(err) => {
1219 debug!(
1220 target: "claude::convert_message",
1221 "Skipping unsupported assistant image block: {}",
1222 err
1223 );
1224 None
1225 }
1226 },
1227 AssistantContent::ToolCall { tool_call, .. } => {
1228 Some(ClaudeContentBlock::ToolUse {
1229 id: tool_call.id.clone(),
1230 name: tool_call.name.clone(),
1231 input: tool_call.parameters.clone(),
1232 cache_control: None,
1233 extra: Default::default(),
1234 })
1235 }
1236 AssistantContent::Thought { thought } => {
1237 match thought {
1238 ThoughtContent::Signed { text, signature } => {
1239 Some(ClaudeContentBlock::Thinking {
1240 thinking: text.clone(),
1241 signature: signature.clone(),
1242 cache_control: None,
1243 extra: Default::default(),
1244 })
1245 }
1246 ThoughtContent::Redacted { data } => {
1247 Some(ClaudeContentBlock::RedactedThinking {
1248 data: data.clone(),
1249 cache_control: None,
1250 extra: Default::default(),
1251 })
1252 }
1253 ThoughtContent::Simple { text } => {
1254 Some(ClaudeContentBlock::Text {
1256 text: format!("[Thought: {text}]"),
1257 cache_control: None,
1258 extra: Default::default(),
1259 })
1260 }
1261 }
1262 }
1263 })
1264 .collect();
1265
1266 if claude_blocks.is_empty() {
1267 debug!("No content blocks found: {:?}", content);
1268 Err(ApiError::InvalidRequest {
1269 provider: "anthropic".to_string(),
1270 details: format!(
1271 "Assistant message ID {} resulted in no valid content blocks",
1272 msg.id
1273 ),
1274 })
1275 } else {
1276 let claude_blocks = ensure_thinking_first(claude_blocks);
1277 let claude_content = if claude_blocks.len() == 1 {
1278 if let Some(ClaudeContentBlock::Text { text, .. }) = claude_blocks.first() {
1279 ClaudeMessageContent::Text {
1280 content: text.clone(),
1281 }
1282 } else {
1283 ClaudeMessageContent::StructuredContent {
1284 content: ClaudeStructuredContent(claude_blocks),
1285 }
1286 }
1287 } else {
1288 ClaudeMessageContent::StructuredContent {
1289 content: ClaudeStructuredContent(claude_blocks),
1290 }
1291 };
1292
1293 Ok(ClaudeMessage {
1294 role: ClaudeMessageRole::Assistant,
1295 content: claude_content,
1296 id: Some(msg.id.clone()),
1297 })
1298 }
1299 }
1300 crate::app::conversation::MessageData::Tool {
1301 tool_use_id,
1302 result,
1303 ..
1304 } => {
1305 let (result_text, is_error) = if let ToolResult::Error(e) = result {
1308 (e.to_string(), Some(true))
1309 } else {
1310 let text = result.llm_format();
1312 let text = if text.trim().is_empty() {
1313 "(No output)".to_string()
1314 } else {
1315 text
1316 };
1317 (text, None)
1318 };
1319
1320 let claude_blocks = vec![ClaudeContentBlock::ToolResult {
1321 tool_use_id: tool_use_id.clone(),
1322 content: vec![ClaudeContentBlock::Text {
1323 text: result_text,
1324 cache_control: None,
1325 extra: Default::default(),
1326 }],
1327 is_error,
1328 cache_control: None,
1329 extra: Default::default(),
1330 }];
1331
1332 Ok(ClaudeMessage {
1333 role: ClaudeMessageRole::User, content: ClaudeMessageContent::StructuredContent {
1335 content: ClaudeStructuredContent(claude_blocks),
1336 },
1337 id: Some(msg.id.clone()),
1338 })
1339 }
1340 }
1341}
1342fn ensure_thinking_first(blocks: Vec<ClaudeContentBlock>) -> Vec<ClaudeContentBlock> {
1345 let mut thinking_blocks = Vec::new();
1346 let mut other_blocks = Vec::new();
1347
1348 for block in blocks {
1349 match block {
1350 ClaudeContentBlock::Thinking { .. } | ClaudeContentBlock::RedactedThinking { .. } => {
1351 thinking_blocks.push(block);
1352 }
1353 _ => other_blocks.push(block),
1354 }
1355 }
1356
1357 if thinking_blocks.is_empty() {
1358 other_blocks
1359 } else {
1360 thinking_blocks.extend(other_blocks);
1361 thinking_blocks
1362 }
1363}
1364
1365fn convert_claude_content(claude_blocks: Vec<ClaudeContentBlock>) -> Vec<AssistantContent> {
1367 claude_blocks
1368 .into_iter()
1369 .filter_map(|block| match block {
1370 ClaudeContentBlock::Text { text, .. } => Some(AssistantContent::Text { text }),
1371 ClaudeContentBlock::Image { source, .. } => {
1372 let media_type = source.media_type;
1373 let data = source.data;
1374 Some(AssistantContent::Image {
1375 image: crate::app::conversation::ImageContent {
1376 mime_type: media_type.clone(),
1377 source: crate::app::conversation::ImageSource::DataUrl {
1378 data_url: format!("data:{};base64,{}", media_type, data),
1379 },
1380 width: None,
1381 height: None,
1382 bytes: None,
1383 sha256: None,
1384 },
1385 })
1386 }
1387
1388 ClaudeContentBlock::ToolUse {
1389 id, name, input, ..
1390 } => Some(AssistantContent::ToolCall {
1391 tool_call: steer_tools::ToolCall {
1392 id,
1393 name,
1394 parameters: input,
1395 },
1396 thought_signature: None,
1397 }),
1398 ClaudeContentBlock::ToolResult { .. } => {
1399 warn!("Unexpected ToolResult block received in Claude response content");
1400 None
1401 }
1402 ClaudeContentBlock::Thinking {
1403 thinking,
1404 signature,
1405 ..
1406 } => Some(AssistantContent::Thought {
1407 thought: ThoughtContent::Signed {
1408 text: thinking,
1409 signature,
1410 },
1411 }),
1412 ClaudeContentBlock::RedactedThinking { data, .. } => Some(AssistantContent::Thought {
1413 thought: ThoughtContent::Redacted { data },
1414 }),
1415 ClaudeContentBlock::Unknown => {
1416 warn!("Unknown content block received in Claude response content");
1417 None
1418 }
1419 })
1420 .collect()
1421}
1422
1423#[async_trait]
1424impl Provider for AnthropicClient {
1425 fn name(&self) -> &'static str {
1426 "anthropic"
1427 }
1428
1429 async fn complete(
1430 &self,
1431 model_id: &ModelId,
1432 messages: Vec<AppMessage>,
1433 system: Option<SystemContext>,
1434 tools: Option<Vec<ToolSchema>>,
1435 call_options: Option<ModelParameters>,
1436 token: CancellationToken,
1437 ) -> Result<CompletionResponse, ApiError> {
1438 let mut claude_messages = convert_messages(messages)?;
1439 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1440
1441 if claude_messages.is_empty() {
1442 return Err(ApiError::InvalidRequest {
1443 provider: self.name().to_string(),
1444 details: "No messages provided".to_string(),
1445 });
1446 }
1447
1448 let last_message = claude_messages
1449 .last_mut()
1450 .ok_or_else(|| ApiError::InvalidRequest {
1451 provider: self.name().to_string(),
1452 details: "No messages provided".to_string(),
1453 })?;
1454 let cache_setting = Some(CacheControl {
1455 cache_type: "ephemeral".to_string(),
1456 });
1457
1458 let instruction_policy = match &self.auth {
1459 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1460 AuthMode::ApiKey(_) => None,
1461 };
1462 let system_text = apply_instruction_policy(system, instruction_policy);
1463 let system_content = build_system_content(system_text, cache_setting.clone());
1464
1465 match &mut last_message.content {
1466 ClaudeMessageContent::StructuredContent { content } => {
1467 for block in &mut content.0 {
1468 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1469 cache_control.clone_from(&cache_setting);
1470 }
1471 }
1472 }
1473 ClaudeMessageContent::Text { content } => {
1474 let text_content = content.clone();
1475 last_message.content = ClaudeMessageContent::StructuredContent {
1476 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1477 text: text_content,
1478 cache_control: cache_setting,
1479 extra: Default::default(),
1480 }]),
1481 };
1482 }
1483 }
1484
1485 let supports_thinking = call_options
1487 .as_ref()
1488 .and_then(|opts| opts.thinking_config.as_ref())
1489 .is_some_and(|tc| tc.enabled);
1490
1491 let request = if supports_thinking {
1492 let budget = call_options
1494 .as_ref()
1495 .and_then(|o| o.thinking_config)
1496 .and_then(|tc| tc.budget_tokens)
1497 .unwrap_or(4000);
1498 let thinking = Some(Thinking {
1499 thinking_type: ThinkingType::Enabled,
1500 budget_tokens: budget,
1501 });
1502 CompletionRequest {
1503 model: model_id.id.clone(), messages: claude_messages,
1505 max_tokens: call_options
1506 .as_ref()
1507 .and_then(|o| o.max_tokens)
1508 .map_or(32_000, |v| v as usize),
1509 system: system_content.clone(),
1510 tools,
1511 temperature: call_options
1512 .as_ref()
1513 .and_then(|o| o.temperature)
1514 .or(Some(1.0)),
1515 top_p: call_options.as_ref().and_then(|o| o.top_p),
1516 top_k: None,
1517 stream: None,
1518 thinking,
1519 }
1520 } else {
1521 CompletionRequest {
1522 model: model_id.id.clone(), messages: claude_messages,
1524 max_tokens: call_options
1525 .as_ref()
1526 .and_then(|o| o.max_tokens)
1527 .map_or(8000, |v| v as usize),
1528 system: system_content,
1529 tools,
1530 temperature: call_options
1531 .as_ref()
1532 .and_then(|o| o.temperature)
1533 .or(Some(0.7)),
1534 top_p: call_options.as_ref().and_then(|o| o.top_p),
1535 top_k: None,
1536 stream: None,
1537 thinking: None,
1538 }
1539 };
1540
1541 let auth_ctx = auth_header_context(model_id, RequestKind::Complete);
1542 let mut attempts = 0;
1543
1544 loop {
1545 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1546 let url = self.request_url()?;
1547 let mut request_builder = self.http_client.post(&url).json(&request);
1548
1549 for (name, value) in auth_headers {
1550 request_builder = request_builder.header(&name, &value);
1551 }
1552
1553 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1554 request_builder =
1555 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1556 }
1557
1558 let response = tokio::select! {
1559 biased;
1560 () = token.cancelled() => {
1561 debug!(target: "claude::complete", "Cancellation token triggered before sending request.");
1562 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1563 }
1564 res = request_builder.send() => {
1565 res?
1566 }
1567 };
1568
1569 if token.is_cancelled() {
1570 debug!(target: "claude::complete", "Cancellation token triggered after sending request, before status check.");
1571 return Err(ApiError::Cancelled {
1572 provider: self.name().to_string(),
1573 });
1574 }
1575
1576 let status = response.status();
1577 if !status.is_success() {
1578 let error_text = tokio::select! {
1579 biased;
1580 () = token.cancelled() => {
1581 debug!(target: "claude::complete", "Cancellation token triggered while reading error response body.");
1582 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1583 }
1584 text_res = response.text() => {
1585 text_res?
1586 }
1587 };
1588
1589 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1590 let action = self
1591 .on_auth_error(status.as_u16(), &error_text, RequestKind::Complete)
1592 .await?;
1593 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1594 attempts += 1;
1595 continue;
1596 }
1597 return Err(ApiError::AuthenticationFailed {
1598 provider: self.name().to_string(),
1599 details: error_text,
1600 });
1601 }
1602
1603 return Err(match status.as_u16() {
1604 401 | 403 => ApiError::AuthenticationFailed {
1605 provider: self.name().to_string(),
1606 details: error_text,
1607 },
1608 429 => ApiError::RateLimited {
1609 provider: self.name().to_string(),
1610 details: error_text,
1611 },
1612 400..=499 => ApiError::InvalidRequest {
1613 provider: self.name().to_string(),
1614 details: error_text,
1615 },
1616 500..=599 => ApiError::ServerError {
1617 provider: self.name().to_string(),
1618 status_code: status.as_u16(),
1619 details: error_text,
1620 },
1621 _ => ApiError::Unknown {
1622 provider: self.name().to_string(),
1623 details: error_text,
1624 },
1625 });
1626 }
1627
1628 let response_text = tokio::select! {
1629 biased;
1630 () = token.cancelled() => {
1631 debug!(target: "claude::complete", "Cancellation token triggered while reading successful response body.");
1632 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1633 }
1634 text_res = response.text() => {
1635 text_res?
1636 }
1637 };
1638
1639 let claude_completion: ClaudeCompletionResponse = serde_json::from_str(&response_text)
1640 .map_err(|e| ApiError::ResponseParsingError {
1641 provider: self.name().to_string(),
1642 details: format!("Error: {e}, Body: {response_text}"),
1643 })?;
1644 let completion = CompletionResponse {
1645 content: convert_claude_content(claude_completion.content),
1646 };
1647
1648 return Ok(completion);
1649 }
1650 }
1651
1652 async fn stream_complete(
1653 &self,
1654 model_id: &ModelId,
1655 messages: Vec<AppMessage>,
1656 system: Option<SystemContext>,
1657 tools: Option<Vec<ToolSchema>>,
1658 call_options: Option<ModelParameters>,
1659 token: CancellationToken,
1660 ) -> Result<CompletionStream, ApiError> {
1661 let mut claude_messages = convert_messages(messages)?;
1662 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1663
1664 if claude_messages.is_empty() {
1665 return Err(ApiError::InvalidRequest {
1666 provider: self.name().to_string(),
1667 details: "No messages provided".to_string(),
1668 });
1669 }
1670
1671 let last_message = claude_messages
1672 .last_mut()
1673 .ok_or_else(|| ApiError::InvalidRequest {
1674 provider: self.name().to_string(),
1675 details: "No messages provided".to_string(),
1676 })?;
1677 let cache_setting = Some(CacheControl {
1678 cache_type: "ephemeral".to_string(),
1679 });
1680
1681 let instruction_policy = match &self.auth {
1682 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1683 AuthMode::ApiKey(_) => None,
1684 };
1685 let system_text = apply_instruction_policy(system, instruction_policy);
1686 let system_content = build_system_content(system_text, cache_setting.clone());
1687
1688 match &mut last_message.content {
1689 ClaudeMessageContent::StructuredContent { content } => {
1690 for block in &mut content.0 {
1691 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1692 cache_control.clone_from(&cache_setting);
1693 }
1694 }
1695 }
1696 ClaudeMessageContent::Text { content } => {
1697 let text_content = content.clone();
1698 last_message.content = ClaudeMessageContent::StructuredContent {
1699 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1700 text: text_content,
1701 cache_control: cache_setting,
1702 extra: Default::default(),
1703 }]),
1704 };
1705 }
1706 }
1707
1708 let supports_thinking = call_options
1709 .as_ref()
1710 .and_then(|opts| opts.thinking_config.as_ref())
1711 .is_some_and(|tc| tc.enabled);
1712
1713 let request = if supports_thinking {
1714 let budget = call_options
1715 .as_ref()
1716 .and_then(|o| o.thinking_config)
1717 .and_then(|tc| tc.budget_tokens)
1718 .unwrap_or(4000);
1719 let thinking = Some(Thinking {
1720 thinking_type: ThinkingType::Enabled,
1721 budget_tokens: budget,
1722 });
1723 CompletionRequest {
1724 model: model_id.id.clone(),
1725 messages: claude_messages,
1726 max_tokens: call_options
1727 .as_ref()
1728 .and_then(|o| o.max_tokens)
1729 .map_or(32_000, |v| v as usize),
1730 system: system_content.clone(),
1731 tools,
1732 temperature: call_options
1733 .as_ref()
1734 .and_then(|o| o.temperature)
1735 .or(Some(1.0)),
1736 top_p: call_options.as_ref().and_then(|o| o.top_p),
1737 top_k: None,
1738 stream: Some(true),
1739 thinking,
1740 }
1741 } else {
1742 CompletionRequest {
1743 model: model_id.id.clone(),
1744 messages: claude_messages,
1745 max_tokens: call_options
1746 .as_ref()
1747 .and_then(|o| o.max_tokens)
1748 .map_or(8000, |v| v as usize),
1749 system: system_content,
1750 tools,
1751 temperature: call_options
1752 .as_ref()
1753 .and_then(|o| o.temperature)
1754 .or(Some(0.7)),
1755 top_p: call_options.as_ref().and_then(|o| o.top_p),
1756 top_k: None,
1757 stream: Some(true),
1758 thinking: None,
1759 }
1760 };
1761
1762 let auth_ctx = auth_header_context(model_id, RequestKind::Stream);
1763 let mut attempts = 0;
1764
1765 loop {
1766 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1767 let url = self.request_url()?;
1768 let mut request_builder = self.http_client.post(&url).json(&request);
1769
1770 for (name, value) in auth_headers {
1771 request_builder = request_builder.header(&name, &value);
1772 }
1773
1774 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1775 request_builder =
1776 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1777 }
1778
1779 let response = tokio::select! {
1780 biased;
1781 () = token.cancelled() => {
1782 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1783 }
1784 res = request_builder.send() => {
1785 res?
1786 }
1787 };
1788
1789 let status = response.status();
1790 if !status.is_success() {
1791 let error_text = tokio::select! {
1792 biased;
1793 () = token.cancelled() => {
1794 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1795 }
1796 text_res = response.text() => {
1797 text_res?
1798 }
1799 };
1800
1801 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1802 let action = self
1803 .on_auth_error(status.as_u16(), &error_text, RequestKind::Stream)
1804 .await?;
1805 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1806 attempts += 1;
1807 continue;
1808 }
1809 return Err(ApiError::AuthenticationFailed {
1810 provider: self.name().to_string(),
1811 details: error_text,
1812 });
1813 }
1814
1815 return Err(match status.as_u16() {
1816 401 | 403 => ApiError::AuthenticationFailed {
1817 provider: self.name().to_string(),
1818 details: error_text,
1819 },
1820 429 => ApiError::RateLimited {
1821 provider: self.name().to_string(),
1822 details: error_text,
1823 },
1824 400..=499 => ApiError::InvalidRequest {
1825 provider: self.name().to_string(),
1826 details: error_text,
1827 },
1828 500..=599 => ApiError::ServerError {
1829 provider: self.name().to_string(),
1830 status_code: status.as_u16(),
1831 details: error_text,
1832 },
1833 _ => ApiError::Unknown {
1834 provider: self.name().to_string(),
1835 details: error_text,
1836 },
1837 });
1838 }
1839
1840 let byte_stream = response.bytes_stream();
1841 let sse_stream = parse_sse_stream(byte_stream);
1842
1843 let stream = convert_claude_stream(sse_stream, token);
1844
1845 return Ok(Box::pin(stream));
1846 }
1847 }
1848}
1849
1850fn auth_header_context(model_id: &ModelId, request_kind: RequestKind) -> AuthHeaderContext {
1851 AuthHeaderContext {
1852 model_id: Some(AuthModelId {
1853 provider_id: AuthProviderId(model_id.provider.as_str().to_string()),
1854 model_id: model_id.id.clone(),
1855 }),
1856 request_kind,
1857 }
1858}
1859
1860fn is_auth_status(status: reqwest::StatusCode) -> bool {
1861 matches!(
1862 status,
1863 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
1864 )
1865}
1866
/// Shortens `body` to at most 512 characters (not bytes), appending `...`
/// when anything was cut. Slicing happens on a char boundary, so multi-byte
/// UTF-8 input is safe.
fn truncate_body(body: &str) -> String {
    const LIMIT: usize = 512;
    // The byte offset of the (LIMIT + 1)-th char exists only if the body is too long.
    match body.char_indices().nth(LIMIT) {
        Some((cut, _)) => format!("{}...", &body[..cut]),
        None => body.to_string(),
    }
}
1877
1878fn apply_instruction_policy(
1879 system: Option<SystemContext>,
1880 policy: Option<&InstructionPolicy>,
1881) -> Option<String> {
1882 let base = system.as_ref().and_then(|context| {
1883 let trimmed = context.prompt.trim();
1884 if trimmed.is_empty() {
1885 None
1886 } else {
1887 Some(trimmed.to_string())
1888 }
1889 });
1890
1891 let context = system
1892 .as_ref()
1893 .and_then(|context| context.render_with_prompt(base.clone()));
1894
1895 match policy {
1896 None => context,
1897 Some(InstructionPolicy::Prefix(prefix)) => {
1898 if let Some(context) = context {
1899 Some(format!("{prefix}\n{context}"))
1900 } else {
1901 Some(prefix.clone())
1902 }
1903 }
1904 Some(InstructionPolicy::DefaultIfEmpty(default)) => {
1905 if context.is_some() {
1906 context
1907 } else {
1908 Some(default.clone())
1909 }
1910 }
1911 Some(InstructionPolicy::Override(override_text)) => {
1912 let mut combined = override_text.clone();
1913 if let Some(system) = system.as_ref() {
1914 let overlay = system.prompt.trim();
1915 if !overlay.is_empty() {
1916 combined.push_str("\n\n## Operating Mode\n");
1917 combined.push_str(overlay);
1918 }
1919
1920 let env = system
1921 .environment
1922 .as_ref()
1923 .map(|env| env.as_context())
1924 .map(|value| value.trim().to_string())
1925 .filter(|value| !value.is_empty());
1926 if let Some(env) = env {
1927 combined.push_str("\n\n");
1928 combined.push_str(&env);
1929 }
1930 }
1931 Some(combined)
1932 }
1933 }
1934}
1935
1936fn build_system_content(
1937 system: Option<String>,
1938 cache_setting: Option<CacheControl>,
1939) -> Option<System> {
1940 system.map(|text| {
1941 System::Content(vec![SystemContentBlock {
1942 content_type: "text".to_string(),
1943 text,
1944 cache_control: cache_setting,
1945 }])
1946 })
1947}
1948
/// Accumulator for one in-flight content block of a streamed Claude response,
/// keyed by the block's `index` until its `content_block_stop` arrives.
#[derive(Debug)]
enum BlockState {
    /// Text collected from the start event and subsequent text deltas.
    Text { text: String },
    /// Thinking text plus the signature delivered by a signature delta, if any.
    Thinking { text: String, signature: Option<String> },
    /// Tool call identity and its raw, concatenated `input_json_delta` fragments.
    ToolUse { id: String, name: String, input: String },
    /// A block type this adapter does not track; it is discarded on stop.
    Unknown,
}
1965
1966fn block_state_to_content(state: BlockState) -> Option<AssistantContent> {
1967 match state {
1968 BlockState::Text { text } => {
1969 if text.is_empty() {
1970 None
1971 } else {
1972 Some(AssistantContent::Text { text })
1973 }
1974 }
1975 BlockState::Thinking { text, signature } => {
1976 if text.is_empty() {
1977 None
1978 } else {
1979 let thought = if let Some(sig) = signature {
1980 ThoughtContent::Signed {
1981 text,
1982 signature: sig,
1983 }
1984 } else {
1985 ThoughtContent::Simple { text }
1986 };
1987 Some(AssistantContent::Thought { thought })
1988 }
1989 }
1990 BlockState::ToolUse { id, name, input } => {
1991 if id.is_empty() || name.is_empty() {
1992 None
1993 } else {
1994 let parameters: serde_json::Value = serde_json::from_str(&input)
1995 .unwrap_or(serde_json::Value::Object(Default::default()));
1996 Some(AssistantContent::ToolCall {
1997 tool_call: ToolCall {
1998 id,
1999 name,
2000 parameters,
2001 },
2002 thought_signature: None,
2003 })
2004 }
2005 }
2006 BlockState::Unknown => None,
2007 }
2008}
2009
2010fn convert_claude_stream(
2011 sse_stream: crate::api::sse::SseStream,
2012 token: CancellationToken,
2013) -> impl futures_core::Stream<Item = StreamChunk> + Send {
2014 async_stream::stream! {
2015 let mut block_states: std::collections::HashMap<usize, BlockState> =
2016 std::collections::HashMap::new();
2017 let mut completed_content: Vec<AssistantContent> = Vec::new();
2018
2019 tokio::pin!(sse_stream);
2020
2021 while let Some(event_result) = sse_stream.next().await {
2022 if token.is_cancelled() {
2023 yield StreamChunk::Error(StreamError::Cancelled);
2024 break;
2025 }
2026
2027 let event = match event_result {
2028 Ok(e) => e,
2029 Err(e) => {
2030 yield StreamChunk::Error(StreamError::SseParse(e));
2031 break;
2032 }
2033 };
2034
2035 let parsed: Result<ClaudeStreamEvent, _> = serde_json::from_str(&event.data);
2036 let stream_event = match parsed {
2037 Ok(e) => e,
2038 Err(_) => continue,
2039 };
2040
2041 match stream_event {
2042 ClaudeStreamEvent::ContentBlockStart { index, content_block } => {
2043 match content_block.block_type.as_str() {
2044 "text" => {
2045 let text = content_block.text.unwrap_or_default();
2046 if !text.is_empty() {
2047 yield StreamChunk::TextDelta(text.clone());
2048 }
2049 block_states.insert(index, BlockState::Text { text });
2050 }
2051 "thinking" => {
2052 block_states.insert(
2053 index,
2054 BlockState::Thinking {
2055 text: String::new(),
2056 signature: None,
2057 },
2058 );
2059 }
2060 "tool_use" => {
2061 let id = content_block.id.unwrap_or_default();
2062 let name = content_block.name.unwrap_or_default();
2063 if !id.is_empty() && !name.is_empty() {
2064 yield StreamChunk::ToolUseStart {
2065 id: id.clone(),
2066 name: name.clone(),
2067 };
2068 }
2069 block_states.insert(
2070 index,
2071 BlockState::ToolUse {
2072 id,
2073 name,
2074 input: String::new(),
2075 },
2076 );
2077 }
2078 _ => {
2079 block_states.insert(index, BlockState::Unknown);
2080 }
2081 }
2082 }
2083 ClaudeStreamEvent::ContentBlockDelta { index, delta } => match delta {
2084 ClaudeDelta::Text { text } => {
2085 match block_states.get_mut(&index) {
2086 Some(BlockState::Text { text: buf }) => buf.push_str(&text),
2087 _ => {
2088 block_states.insert(index, BlockState::Text { text: text.clone() });
2089 }
2090 }
2091 yield StreamChunk::TextDelta(text);
2092 }
2093 ClaudeDelta::Thinking { thinking } => {
2094 match block_states.get_mut(&index) {
2095 Some(BlockState::Thinking { text, .. }) => text.push_str(&thinking),
2096 _ => {
2097 block_states.insert(
2098 index,
2099 BlockState::Thinking {
2100 text: thinking.clone(),
2101 signature: None,
2102 },
2103 );
2104 }
2105 }
2106 yield StreamChunk::ThinkingDelta(thinking);
2107 }
2108 ClaudeDelta::Signature { signature } => {
2109 if let Some(BlockState::Thinking { signature: sig, .. }) =
2110 block_states.get_mut(&index)
2111 {
2112 *sig = Some(signature);
2113 }
2114 }
2115 ClaudeDelta::InputJson { partial_json } => {
2116 if let Some(BlockState::ToolUse { id, input, .. }) =
2117 block_states.get_mut(&index)
2118 {
2119 input.push_str(&partial_json);
2120 if !id.is_empty() {
2121 yield StreamChunk::ToolUseInputDelta {
2122 id: id.clone(),
2123 delta: partial_json,
2124 };
2125 }
2126 }
2127 }
2128 },
2129 ClaudeStreamEvent::ContentBlockStop { index } => {
2130 if let Some(state) = block_states.remove(&index)
2131 && let Some(content) = block_state_to_content(state)
2132 {
2133 completed_content.push(content);
2134 }
2135 yield StreamChunk::ContentBlockStop { index };
2136 }
2137 ClaudeStreamEvent::MessageStop => {
2138 if !block_states.is_empty() {
2139 tracing::warn!(
2140 target: "anthropic::stream",
2141 "MessageStop received with {} unfinished content blocks",
2142 block_states.len()
2143 );
2144 }
2145 let content = std::mem::take(&mut completed_content);
2146 yield StreamChunk::MessageComplete(CompletionResponse { content });
2147 break;
2148 }
2149 ClaudeStreamEvent::Error { error } => {
2150 yield StreamChunk::Error(StreamError::Provider {
2151 provider: "anthropic".into(),
2152 error_type: error.error_type,
2153 message: error.message,
2154 });
2155 }
2156 ClaudeStreamEvent::MessageStart { .. }
2157 | ClaudeStreamEvent::MessageDelta { .. }
2158 | ClaudeStreamEvent::Ping => {}
2159 }
2160 }
2161 }
2162}