Skip to main content

steer_core/api/claude/
client.rs

1use async_trait::async_trait;
2use futures_util::StreamExt;
3use reqwest::{self, header};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use strum_macros::Display;
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, warn};
9
10use crate::api::error::StreamError;
11use crate::api::provider::{CompletionStream, StreamChunk};
12use crate::api::sse::parse_sse_stream;
13use crate::api::{CompletionResponse, Provider, error::ApiError};
14use crate::app::SystemContext;
15use crate::app::conversation::{
16    AssistantContent, Message as AppMessage, ThoughtContent, ToolResult, UserContent,
17};
18use crate::auth::{
19    AnthropicAuth, AuthErrorAction, AuthErrorContext, AuthHeaderContext, InstructionPolicy,
20    RequestKind,
21};
22use crate::auth::{ModelId as AuthModelId, ProviderId as AuthProviderId};
23use crate::config::model::{ModelId, ModelParameters};
24use steer_tools::{InputSchema, ToolCall, ToolSchema};
25
26const API_URL: &str = "https://api.anthropic.com/v1/messages";
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display)]
29pub enum ClaudeMessageRole {
30    #[serde(rename = "user")]
31    #[strum(serialize = "user")]
32    User,
33    #[serde(rename = "assistant")]
34    #[strum(serialize = "assistant")]
35    Assistant,
36    #[serde(rename = "tool")]
37    #[strum(serialize = "tool")]
38    Tool,
39}
40
41/// Represents a message to be sent to the Claude API
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct ClaudeMessage {
44    pub role: ClaudeMessageRole,
45    #[serde(flatten)]
46    pub content: ClaudeMessageContent,
47    #[serde(skip_serializing)]
48    pub id: Option<String>,
49}
50
51/// Content types for Claude API messages
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(untagged)]
54pub enum ClaudeMessageContent {
55    /// Simple text content
56    Text { content: String },
57    /// Structured content for tool results or other special content
58    StructuredContent { content: ClaudeStructuredContent },
59}
60
61/// Represents structured content blocks for messages
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63#[serde(transparent)]
64pub struct ClaudeStructuredContent(pub Vec<ClaudeContentBlock>);
65
66#[derive(Clone)]
67enum AuthMode {
68    ApiKey(String),
69    Directive(AnthropicAuth),
70}
71
72#[derive(Clone)]
73pub struct AnthropicClient {
74    http_client: reqwest::Client,
75    auth: AuthMode,
76}
77
78#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
79#[serde(rename_all = "lowercase")]
80#[derive(Default)]
81enum ThinkingType {
82    #[default]
83    Enabled,
84}
85
86#[derive(Debug, Serialize, Deserialize, Clone)]
87struct Thinking {
88    #[serde(rename = "type", default)]
89    thinking_type: ThinkingType,
90    budget_tokens: u32,
91}
92
93#[derive(Debug, Serialize, Clone)]
94struct SystemContentBlock {
95    #[serde(rename = "type")]
96    content_type: String,
97    text: String,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    cache_control: Option<CacheControl>,
100}
101
102#[derive(Debug, Serialize, Clone)]
103#[serde(untagged)]
104enum System {
105    // Structured system prompt represented as a list of content blocks
106    Content(Vec<SystemContentBlock>),
107}
108
109#[derive(Debug, Serialize)]
110struct CompletionRequest {
111    model: String,
112    messages: Vec<ClaudeMessage>,
113    max_tokens: usize,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    system: Option<System>,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    tools: Option<Vec<ClaudeTool>>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    temperature: Option<f32>,
120    #[serde(skip_serializing_if = "Option::is_none")]
121    top_p: Option<f32>,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    top_k: Option<usize>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    stream: Option<bool>,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    thinking: Option<Thinking>,
128}
129
130#[derive(Debug, Serialize, Clone)]
131struct ClaudeTool {
132    name: String,
133    description: String,
134    input_schema: InputSchema,
135}
136
137impl From<ToolSchema> for ClaudeTool {
138    fn from(tool: ToolSchema) -> Self {
139        let tool = adapt_tool_schema_for_claude(tool);
140        Self {
141            name: tool.name,
142            description: tool.description,
143            input_schema: tool.input_schema,
144        }
145    }
146}
147
148#[derive(Debug, Serialize, Deserialize, Clone)]
149struct ClaudeCompletionResponse {
150    id: String,
151    content: Vec<ClaudeContentBlock>,
152    model: String,
153    role: String,
154    #[serde(default)]
155    stop_reason: Option<String>,
156    #[serde(default)]
157    stop_sequence: Option<String>,
158    #[serde(default)]
159    usage: ClaudeUsage,
160    // Allow other fields for API flexibility
161    #[serde(flatten)]
162    extra: std::collections::HashMap<String, serde_json::Value>,
163}
164
165fn adapt_tool_schema_for_claude(tool: ToolSchema) -> ToolSchema {
166    let root_schema = tool.input_schema.as_value();
167    let sanitized = sanitize_for_claude(root_schema, root_schema);
168    ToolSchema {
169        input_schema: InputSchema::new(sanitized),
170        ..tool
171    }
172}
173
/// Unescapes one JSON Pointer reference token.
///
/// `~1` becomes `/` and `~0` becomes `~`, applied in that order so `~01`
/// decodes to `~1`. Segments without a `~` are returned borrowed, with no
/// allocation.
fn decode_pointer_segment(segment: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if segment.contains('~') {
        let slashes_restored = segment.replace("~1", "/");
        Cow::Owned(slashes_restored.replace("~0", "~"))
    } else {
        Cow::Borrowed(segment)
    }
}
180
181fn resolve_ref<'a>(root: &'a Value, reference: &str) -> Option<&'a Value> {
182    let path = reference.strip_prefix("#/")?;
183    let mut current = root;
184    for segment in path.split('/') {
185        let decoded = decode_pointer_segment(segment);
186        current = current.get(decoded.as_ref())?;
187    }
188    Some(current)
189}
190
191fn infer_type_from_enum(values: &[Value]) -> Option<String> {
192    let mut has_string = false;
193    let mut has_number = false;
194    let mut has_bool = false;
195    let mut has_object = false;
196    let mut has_array = false;
197
198    for value in values {
199        match value {
200            Value::String(_) => has_string = true,
201            Value::Number(_) => has_number = true,
202            Value::Bool(_) => has_bool = true,
203            Value::Object(_) => has_object = true,
204            Value::Array(_) => has_array = true,
205            Value::Null => {}
206        }
207    }
208
209    let kind_count = u8::from(has_string)
210        + u8::from(has_number)
211        + u8::from(has_bool)
212        + u8::from(has_object)
213        + u8::from(has_array);
214
215    if kind_count != 1 {
216        return None;
217    }
218
219    if has_string {
220        Some("string".to_string())
221    } else if has_number {
222        Some("number".to_string())
223    } else if has_bool {
224        Some("boolean".to_string())
225    } else if has_object {
226        Some("object".to_string())
227    } else if has_array {
228        Some("array".to_string())
229    } else {
230        None
231    }
232}
233
234fn normalize_type(value: &Value) -> Value {
235    if let Some(type_str) = value.as_str() {
236        return Value::String(type_str.to_string());
237    }
238
239    if let Some(type_array) = value.as_array()
240        && let Some(primary_type) = type_array
241            .iter()
242            .find_map(|v| if v.is_null() { None } else { v.as_str() })
243    {
244        return Value::String(primary_type.to_string());
245    }
246
247    Value::String("string".to_string())
248}
249
250fn extract_enum_values(value: &Value) -> Vec<Value> {
251    let Some(obj) = value.as_object() else {
252        return Vec::new();
253    };
254
255    if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
256        return enum_values
257            .iter()
258            .filter(|v| !v.is_null())
259            .cloned()
260            .collect();
261    }
262
263    if let Some(const_value) = obj.get("const") {
264        if const_value.is_null() {
265            return Vec::new();
266        }
267        return vec![const_value.clone()];
268    }
269
270    Vec::new()
271}
272
273fn merge_property(properties: &mut serde_json::Map<String, Value>, key: &str, value: &Value) {
274    match properties.get_mut(key) {
275        None => {
276            properties.insert(key.to_string(), value.clone());
277        }
278        Some(existing) => {
279            if existing == value {
280                return;
281            }
282
283            let existing_values = extract_enum_values(existing);
284            let incoming_values = extract_enum_values(value);
285            if incoming_values.is_empty() && existing_values.is_empty() {
286                return;
287            }
288
289            let mut combined = existing_values;
290            for item in incoming_values {
291                if !combined.contains(&item) {
292                    combined.push(item);
293                }
294            }
295
296            if combined.is_empty() {
297                return;
298            }
299
300            if let Some(obj) = existing.as_object_mut() {
301                obj.remove("const");
302                obj.insert("enum".to_string(), Value::Array(combined.clone()));
303                if !obj.contains_key("type")
304                    && let Some(inferred) = infer_type_from_enum(&combined)
305                {
306                    obj.insert("type".to_string(), Value::String(inferred));
307                }
308            }
309        }
310    }
311}
312
313fn merge_union_schemas(
314    root: &Value,
315    variants: &[Value],
316    seen_refs: &mut std::collections::HashSet<String>,
317) -> Value {
318    let mut merged_props = serde_json::Map::new();
319    let mut required_intersection: Option<std::collections::BTreeSet<String>> = None;
320    let mut enum_values: Vec<Value> = Vec::new();
321    let mut type_candidates: Vec<String> = Vec::new();
322
323    for variant in variants {
324        let sanitized = sanitize_for_claude_inner(root, variant, seen_refs);
325
326        if let Some(schema_type) = sanitized.get("type").and_then(|v| v.as_str()) {
327            type_candidates.push(schema_type.to_string());
328        }
329
330        if let Some(props) = sanitized.get("properties").and_then(|v| v.as_object()) {
331            for (key, value) in props {
332                merge_property(&mut merged_props, key, value);
333            }
334        }
335
336        if let Some(req) = sanitized.get("required").and_then(|v| v.as_array()) {
337            let req_set: std::collections::BTreeSet<String> = req
338                .iter()
339                .filter_map(|item| item.as_str().map(|s| s.to_string()))
340                .collect();
341
342            required_intersection = match required_intersection.take() {
343                None => Some(req_set),
344                Some(existing) => Some(
345                    existing
346                        .intersection(&req_set)
347                        .cloned()
348                        .collect::<std::collections::BTreeSet<String>>(),
349                ),
350            };
351        }
352
353        if let Some(values) = sanitized.get("enum").and_then(|v| v.as_array()) {
354            for value in values {
355                if value.is_null() {
356                    continue;
357                }
358                if !enum_values.contains(value) {
359                    enum_values.push(value.clone());
360                }
361            }
362        }
363    }
364
365    let schema_type = if !merged_props.is_empty() {
366        "object".to_string()
367    } else if let Some(inferred) = infer_type_from_enum(&enum_values) {
368        inferred
369    } else if let Some(first) = type_candidates.first() {
370        first.clone()
371    } else {
372        "string".to_string()
373    };
374
375    let mut merged = serde_json::Map::new();
376    merged.insert("type".to_string(), Value::String(schema_type));
377
378    if !merged_props.is_empty() {
379        merged.insert("properties".to_string(), Value::Object(merged_props));
380    }
381
382    if let Some(required_set) = required_intersection
383        && !required_set.is_empty()
384    {
385        merged.insert(
386            "required".to_string(),
387            Value::Array(
388                required_set
389                    .into_iter()
390                    .map(Value::String)
391                    .collect::<Vec<_>>(),
392            ),
393        );
394    }
395
396    if !enum_values.is_empty() {
397        merged.insert("enum".to_string(), Value::Array(enum_values));
398    }
399
400    Value::Object(merged)
401}
402
403fn sanitize_for_claude(root: &Value, schema: &Value) -> Value {
404    let mut seen_refs = std::collections::HashSet::new();
405    sanitize_for_claude_inner(root, schema, &mut seen_refs)
406}
407
408fn fallback_schema() -> Value {
409    let mut out = serde_json::Map::new();
410    out.insert("type".to_string(), Value::String("object".to_string()));
411    out.insert(
412        "properties".to_string(),
413        Value::Object(serde_json::Map::new()),
414    );
415    Value::Object(out)
416}
417
418fn sanitize_for_claude_inner(
419    root: &Value,
420    schema: &Value,
421    seen_refs: &mut std::collections::HashSet<String>,
422) -> Value {
423    if let Some(reference) = schema.get("$ref").and_then(|v| v.as_str()) {
424        if !seen_refs.insert(reference.to_string()) {
425            return fallback_schema();
426        }
427        if let Some(resolved) = resolve_ref(root, reference) {
428            let sanitized = sanitize_for_claude_inner(root, resolved, seen_refs);
429            seen_refs.remove(reference);
430            return sanitized;
431        }
432        seen_refs.remove(reference);
433    }
434
435    let Some(obj) = schema.as_object() else {
436        return schema.clone();
437    };
438
439    if let Some(union) = obj
440        .get("oneOf")
441        .or_else(|| obj.get("anyOf"))
442        .or_else(|| obj.get("allOf"))
443        .and_then(|v| v.as_array())
444    {
445        return merge_union_schemas(root, union, seen_refs);
446    }
447
448    let mut out = serde_json::Map::new();
449    for (key, value) in obj {
450        match key.as_str() {
451            "$ref"
452            | "$defs"
453            | "oneOf"
454            | "anyOf"
455            | "allOf"
456            | "const"
457            | "additionalProperties"
458            | "default"
459            | "examples"
460            | "title"
461            | "pattern"
462            | "minLength"
463            | "maxLength"
464            | "minimum"
465            | "maximum"
466            | "minItems"
467            | "maxItems"
468            | "uniqueItems"
469            | "deprecated" => {}
470            "type" => {
471                out.insert("type".to_string(), normalize_type(value));
472            }
473            "properties" => {
474                if let Some(props) = value.as_object() {
475                    let mut sanitized_props = serde_json::Map::new();
476                    for (prop_key, prop_value) in props {
477                        sanitized_props.insert(
478                            prop_key.clone(),
479                            sanitize_for_claude_inner(root, prop_value, seen_refs),
480                        );
481                    }
482                    out.insert("properties".to_string(), Value::Object(sanitized_props));
483                }
484            }
485            "items" => {
486                if let Some(items) = value.as_array() {
487                    let merged = merge_union_schemas(root, items, seen_refs);
488                    out.insert("items".to_string(), merged);
489                } else {
490                    out.insert(
491                        "items".to_string(),
492                        sanitize_for_claude_inner(root, value, seen_refs),
493                    );
494                }
495            }
496            "enum" => {
497                let values = value
498                    .as_array()
499                    .map(|items| {
500                        items
501                            .iter()
502                            .filter(|v| !v.is_null())
503                            .cloned()
504                            .collect::<Vec<_>>()
505                    })
506                    .unwrap_or_default();
507                out.insert("enum".to_string(), Value::Array(values));
508            }
509            _ => {
510                out.insert(
511                    key.clone(),
512                    sanitize_for_claude_inner(root, value, seen_refs),
513                );
514            }
515        }
516    }
517
518    if let Some(const_value) = obj.get("const")
519        && !const_value.is_null()
520    {
521        out.insert("enum".to_string(), Value::Array(vec![const_value.clone()]));
522        if !out.contains_key("type")
523            && let Some(inferred) = infer_type_from_enum(std::slice::from_ref(const_value))
524        {
525            out.insert("type".to_string(), Value::String(inferred));
526        }
527    }
528
529    if out.get("type") == Some(&Value::String("object".to_string()))
530        && !out.contains_key("properties")
531    {
532        out.insert(
533            "properties".to_string(),
534            Value::Object(serde_json::Map::new()),
535        );
536    }
537
538    if !out.contains_key("type") {
539        if out.contains_key("properties") {
540            out.insert("type".to_string(), Value::String("object".to_string()));
541        } else if out.contains_key("items") {
542            out.insert("type".to_string(), Value::String("array".to_string()));
543        } else if let Some(enum_values) = out.get("enum").and_then(|v| v.as_array())
544            && let Some(inferred) = infer_type_from_enum(enum_values)
545        {
546            out.insert("type".to_string(), Value::String(inferred));
547        }
548    }
549
550    Value::Object(out)
551}
552
/// Serde default for a cache-control `type` field: `"ephemeral"`.
fn default_cache_type() -> String {
    String::from("ephemeral")
}
556
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Runs the sanitizer with `schema` acting as its own `$ref` root.
    fn sanitize(schema: &Value) -> Value {
        sanitize_for_claude(schema, schema)
    }

    #[test]
    fn sanitize_handles_recursive_ref() {
        let sanitized = sanitize(&json!({
            "$defs": {
                "node": {
                    "type": "object",
                    "properties": {
                        "next": { "$ref": "#/$defs/node" }
                    }
                }
            },
            "$ref": "#/$defs/node"
        }));

        // The self-reference must terminate in the fallback object schema.
        let next_type = sanitized
            .get("properties")
            .and_then(|props| props.get("next"))
            .and_then(|next| next.get("type"))
            .and_then(Value::as_str);

        assert_eq!(next_type, Some("object"));
    }

    #[test]
    fn sanitize_collapses_tuple_items() {
        let sanitized = sanitize(&json!({
            "type": "array",
            "items": [
                { "type": "string" },
                { "type": "number" }
            ]
        }));

        assert!(matches!(sanitized.get("items"), Some(Value::Object(_))));
    }

    #[test]
    fn sanitize_removes_unsupported_keywords() {
        let sanitized = sanitize(&json!({
            "type": "object",
            "title": "ignored",
            "additionalProperties": false,
            "properties": {
                "name": {
                    "type": "string",
                    "pattern": "^[a-z]+$",
                    "default": "x"
                }
            }
        }));

        assert_eq!(
            sanitized,
            json!({
                "type": "object",
                "properties": {
                    "name": { "type": "string" }
                }
            })
        );
    }

    #[test]
    fn sanitize_converts_const_to_enum_with_type() {
        let sanitized = sanitize(&json!({ "const": "fixed" }));

        assert_eq!(
            sanitized,
            json!({
                "enum": ["fixed"],
                "type": "string"
            })
        );
    }

    #[test]
    fn sanitize_filters_null_enum_values() {
        let sanitized = sanitize(&json!({ "enum": ["a", null, "b"] }));

        assert_eq!(
            sanitized,
            json!({
                "enum": ["a", "b"],
                "type": "string"
            })
        );
    }

    #[test]
    fn sanitize_decodes_json_pointer_refs() {
        let sanitized = sanitize(&json!({
            "$defs": {
                "a/b": { "type": "string" }
            },
            "$ref": "#/$defs/a~1b"
        }));

        assert_eq!(sanitized, json!({ "type": "string" }));
    }

    #[test]
    fn sanitize_merges_union_properties_and_required() {
        let sanitized = sanitize(&json!({
            "oneOf": [
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "b": { "type": "string" }
                    },
                    "required": ["a"]
                },
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "c": { "type": "string" }
                    },
                    "required": ["a", "c"]
                }
            ]
        }));

        assert_eq!(
            sanitized,
            json!({
                "type": "object",
                "properties": {
                    "a": { "type": "string" },
                    "b": { "type": "string" },
                    "c": { "type": "string" }
                },
                "required": ["a"]
            })
        );
    }

    #[test]
    fn sanitize_infers_array_type_from_items() {
        let sanitized = sanitize(&json!({
            "items": {
                "type": "string"
            }
        }));

        assert_eq!(
            sanitized,
            json!({
                "type": "array",
                "items": { "type": "string" }
            })
        );
    }
}
729
730#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
731pub struct CacheControl {
732    #[serde(rename = "type", default = "default_cache_type")]
733    cache_type: String,
734}
735
736#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
737#[serde(tag = "type")]
738pub enum ClaudeContentBlock {
739    #[serde(rename = "text")]
740    Text {
741        text: String,
742        #[serde(skip_serializing_if = "Option::is_none")]
743        cache_control: Option<CacheControl>,
744        #[serde(flatten)]
745        extra: std::collections::HashMap<String, serde_json::Value>,
746    },
747    #[serde(rename = "tool_use")]
748    ToolUse {
749        id: String,
750        name: String,
751        input: serde_json::Value,
752        #[serde(skip_serializing_if = "Option::is_none")]
753        cache_control: Option<CacheControl>,
754        #[serde(flatten)]
755        extra: std::collections::HashMap<String, serde_json::Value>,
756    },
757    #[serde(rename = "tool_result")]
758    ToolResult {
759        tool_use_id: String,
760        content: Vec<ClaudeContentBlock>,
761        #[serde(skip_serializing_if = "Option::is_none")]
762        cache_control: Option<CacheControl>,
763        #[serde(skip_serializing_if = "Option::is_none")]
764        is_error: Option<bool>,
765        #[serde(flatten)]
766        extra: std::collections::HashMap<String, serde_json::Value>,
767    },
768    #[serde(rename = "thinking")]
769    Thinking {
770        thinking: String,
771        signature: String,
772        #[serde(skip_serializing_if = "Option::is_none")]
773        cache_control: Option<CacheControl>,
774        #[serde(flatten)]
775        extra: std::collections::HashMap<String, serde_json::Value>,
776    },
777    #[serde(rename = "redacted_thinking")]
778    RedactedThinking {
779        data: String,
780        #[serde(skip_serializing_if = "Option::is_none")]
781        cache_control: Option<CacheControl>,
782        #[serde(flatten)]
783        extra: std::collections::HashMap<String, serde_json::Value>,
784    },
785    #[serde(other)]
786    Unknown,
787}
788
789#[derive(Debug, Serialize, Deserialize, Default, Clone)]
790struct ClaudeUsage {
791    #[serde(rename = "input_tokens")]
792    input: usize,
793    #[serde(rename = "output_tokens")]
794    output: usize,
795    #[serde(rename = "cache_creation_input_tokens")]
796    #[serde(skip_serializing_if = "Option::is_none")]
797    cache_creation_input: Option<usize>,
798    #[serde(rename = "cache_read_input_tokens")]
799    #[serde(skip_serializing_if = "Option::is_none")]
800    cache_read_input: Option<usize>,
801}
802
803#[derive(Debug, Deserialize)]
804#[serde(tag = "type")]
805enum ClaudeStreamEvent {
806    #[serde(rename = "message_start")]
807    MessageStart {
808        #[expect(dead_code)]
809        message: ClaudeMessageStart,
810    },
811    #[serde(rename = "content_block_start")]
812    ContentBlockStart {
813        index: usize,
814        content_block: ClaudeContentBlockStart,
815    },
816    #[serde(rename = "content_block_delta")]
817    ContentBlockDelta { index: usize, delta: ClaudeDelta },
818    #[serde(rename = "content_block_stop")]
819    ContentBlockStop { index: usize },
820    #[serde(rename = "message_delta")]
821    MessageDelta {
822        #[expect(dead_code)]
823        delta: ClaudeMessageDeltaData,
824        #[expect(dead_code)]
825        #[serde(default)]
826        usage: Option<ClaudeUsage>,
827    },
828    #[serde(rename = "message_stop")]
829    MessageStop,
830    #[serde(rename = "ping")]
831    Ping,
832    #[serde(rename = "error")]
833    Error { error: ClaudeStreamError },
834}
835
836#[derive(Debug, Deserialize)]
837struct ClaudeMessageStart {
838    #[expect(dead_code)]
839    #[serde(default)]
840    id: String,
841    #[expect(dead_code)]
842    #[serde(default)]
843    model: String,
844}
845
846#[derive(Debug, Deserialize)]
847struct ClaudeContentBlockStart {
848    #[serde(rename = "type")]
849    block_type: String,
850    #[serde(default)]
851    id: Option<String>,
852    #[serde(default)]
853    name: Option<String>,
854    #[serde(default)]
855    text: Option<String>,
856}
857
858#[derive(Debug, Deserialize)]
859#[serde(tag = "type")]
860enum ClaudeDelta {
861    #[serde(rename = "text_delta")]
862    Text { text: String },
863    #[serde(rename = "thinking_delta")]
864    Thinking { thinking: String },
865    #[serde(rename = "input_json_delta")]
866    InputJson { partial_json: String },
867    #[serde(rename = "signature_delta")]
868    Signature { signature: String },
869}
870
871#[derive(Debug, Deserialize)]
872struct ClaudeMessageDeltaData {
873    #[expect(dead_code)]
874    #[serde(default)]
875    stop_reason: Option<String>,
876}
877
878#[derive(Debug, Deserialize)]
879struct ClaudeStreamError {
880    #[serde(default)]
881    message: String,
882    #[serde(rename = "type", default)]
883    error_type: String,
884}
885
886impl AnthropicClient {
887    pub fn new(api_key: &str) -> Result<Self, ApiError> {
888        Self::with_api_key(api_key)
889    }
890
891    pub fn with_api_key(api_key: &str) -> Result<Self, ApiError> {
892        Ok(Self {
893            http_client: Self::build_http_client()?,
894            auth: AuthMode::ApiKey(api_key.to_string()),
895        })
896    }
897
898    pub fn with_directive(directive: AnthropicAuth) -> Result<Self, ApiError> {
899        Ok(Self {
900            http_client: Self::build_http_client()?,
901            auth: AuthMode::Directive(directive),
902        })
903    }
904
905    fn build_http_client() -> Result<reqwest::Client, ApiError> {
906        let mut headers = header::HeaderMap::new();
907        headers.insert(
908            "anthropic-version",
909            header::HeaderValue::from_static("2023-06-01"),
910        );
911        headers.insert(
912            header::CONTENT_TYPE,
913            header::HeaderValue::from_static("application/json"),
914        );
915
916        reqwest::Client::builder()
917            .default_headers(headers)
918            .build()
919            .map_err(ApiError::Network)
920    }
921
922    async fn auth_headers(
923        &self,
924        ctx: AuthHeaderContext,
925    ) -> Result<Vec<(String, String)>, ApiError> {
926        match &self.auth {
927            AuthMode::ApiKey(key) => Ok(vec![("x-api-key".to_string(), key.clone())]),
928            AuthMode::Directive(directive) => {
929                let header_pairs = directive
930                    .headers
931                    .headers(ctx)
932                    .await
933                    .map_err(|e| ApiError::AuthError(e.to_string()))?;
934                Ok(header_pairs
935                    .into_iter()
936                    .map(|pair| (pair.name, pair.value))
937                    .collect())
938            }
939        }
940    }
941
942    async fn on_auth_error(
943        &self,
944        status: u16,
945        body: &str,
946        request_kind: RequestKind,
947    ) -> Result<AuthErrorAction, ApiError> {
948        let AuthMode::Directive(directive) = &self.auth else {
949            return Ok(AuthErrorAction::NoAction);
950        };
951        let context = AuthErrorContext {
952            status: Some(status),
953            body_snippet: Some(truncate_body(body)),
954            request_kind,
955        };
956        directive
957            .headers
958            .on_auth_error(context)
959            .await
960            .map_err(|e| ApiError::AuthError(e.to_string()))
961    }
962
963    fn request_url(&self) -> Result<String, ApiError> {
964        let AuthMode::Directive(directive) = &self.auth else {
965            return Ok(API_URL.to_string());
966        };
967
968        let Some(query_params) = &directive.query_params else {
969            return Ok(API_URL.to_string());
970        };
971
972        if query_params.is_empty() {
973            return Ok(API_URL.to_string());
974        }
975
976        let mut url = url::Url::parse(API_URL)
977            .map_err(|e| ApiError::Configuration(format!("Invalid API_URL '{API_URL}': {e}")))?;
978        for param in query_params {
979            url.query_pairs_mut().append_pair(&param.name, &param.value);
980        }
981        Ok(url.to_string())
982    }
983}
984
985// Conversion functions start
986fn convert_messages(messages: Vec<AppMessage>) -> Result<Vec<ClaudeMessage>, ApiError> {
987    let claude_messages: Result<Vec<ClaudeMessage>, ApiError> =
988        messages.into_iter().map(convert_single_message).collect();
989
990    // Filter out any User messages that have empty content after removing app commands
991    claude_messages.map(|messages| {
992        messages
993            .into_iter()
994            .filter(|msg| {
995                match &msg.content {
996                    ClaudeMessageContent::Text { content } => !content.trim().is_empty(),
997                    ClaudeMessageContent::StructuredContent { .. } => true, // Keep all non-text messages
998                }
999            })
1000            .collect()
1001    })
1002}
1003
1004fn convert_single_message(msg: AppMessage) -> Result<ClaudeMessage, ApiError> {
1005    match &msg.data {
1006        crate::app::conversation::MessageData::User { content, .. } => {
1007            // Convert UserContent to Claude format
1008            let combined_text = content
1009                .iter()
1010                .map(|user_content| match user_content {
1011                    UserContent::Text { text } => text.clone(),
1012                    UserContent::CommandExecution {
1013                        command,
1014                        stdout,
1015                        stderr,
1016                        exit_code,
1017                    } => UserContent::format_command_execution_as_xml(
1018                        command, stdout, stderr, *exit_code,
1019                    ),
1020                })
1021                .collect::<Vec<_>>()
1022                .join("\n");
1023
1024            Ok(ClaudeMessage {
1025                role: ClaudeMessageRole::User,
1026                content: ClaudeMessageContent::Text {
1027                    content: combined_text,
1028                },
1029                id: Some(msg.id.clone()),
1030            })
1031        }
1032        crate::app::conversation::MessageData::Assistant { content, .. } => {
1033            // Convert AssistantContent to Claude blocks
1034            let claude_blocks: Vec<ClaudeContentBlock> = content
1035                .iter()
1036                .filter_map(|assistant_content| match assistant_content {
1037                    AssistantContent::Text { text } => {
1038                        if text.trim().is_empty() {
1039                            None
1040                        } else {
1041                            Some(ClaudeContentBlock::Text {
1042                                text: text.clone(),
1043                                cache_control: None,
1044                                extra: Default::default(),
1045                            })
1046                        }
1047                    }
1048                    AssistantContent::ToolCall { tool_call, .. } => {
1049                        Some(ClaudeContentBlock::ToolUse {
1050                            id: tool_call.id.clone(),
1051                            name: tool_call.name.clone(),
1052                            input: tool_call.parameters.clone(),
1053                            cache_control: None,
1054                            extra: Default::default(),
1055                        })
1056                    }
1057                    AssistantContent::Thought { thought } => {
1058                        match thought {
1059                            ThoughtContent::Signed { text, signature } => {
1060                                Some(ClaudeContentBlock::Thinking {
1061                                    thinking: text.clone(),
1062                                    signature: signature.clone(),
1063                                    cache_control: None,
1064                                    extra: Default::default(),
1065                                })
1066                            }
1067                            ThoughtContent::Redacted { data } => {
1068                                Some(ClaudeContentBlock::RedactedThinking {
1069                                    data: data.clone(),
1070                                    cache_control: None,
1071                                    extra: Default::default(),
1072                                })
1073                            }
1074                            ThoughtContent::Simple { text } => {
1075                                // Claude doesn't have a simple thought type, convert to text
1076                                Some(ClaudeContentBlock::Text {
1077                                    text: format!("[Thought: {text}]"),
1078                                    cache_control: None,
1079                                    extra: Default::default(),
1080                                })
1081                            }
1082                        }
1083                    }
1084                })
1085                .collect();
1086
1087            if claude_blocks.is_empty() {
1088                debug!("No content blocks found: {:?}", content);
1089                Err(ApiError::InvalidRequest {
1090                    provider: "anthropic".to_string(),
1091                    details: format!(
1092                        "Assistant message ID {} resulted in no valid content blocks",
1093                        msg.id
1094                    ),
1095                })
1096            } else {
1097                let claude_blocks = ensure_thinking_first(claude_blocks);
1098                let claude_content = if claude_blocks.len() == 1 {
1099                    if let Some(ClaudeContentBlock::Text { text, .. }) = claude_blocks.first() {
1100                        ClaudeMessageContent::Text {
1101                            content: text.clone(),
1102                        }
1103                    } else {
1104                        ClaudeMessageContent::StructuredContent {
1105                            content: ClaudeStructuredContent(claude_blocks),
1106                        }
1107                    }
1108                } else {
1109                    ClaudeMessageContent::StructuredContent {
1110                        content: ClaudeStructuredContent(claude_blocks),
1111                    }
1112                };
1113
1114                Ok(ClaudeMessage {
1115                    role: ClaudeMessageRole::Assistant,
1116                    content: claude_content,
1117                    id: Some(msg.id.clone()),
1118                })
1119            }
1120        }
1121        crate::app::conversation::MessageData::Tool {
1122            tool_use_id,
1123            result,
1124            ..
1125        } => {
1126            // Convert ToolResult to Claude format
1127            // Claude expects tool results as User messages
1128            let (result_text, is_error) = if let ToolResult::Error(e) = result {
1129                (e.to_string(), Some(true))
1130            } else {
1131                // For all other variants, use llm_format
1132                let text = result.llm_format();
1133                let text = if text.trim().is_empty() {
1134                    "(No output)".to_string()
1135                } else {
1136                    text
1137                };
1138                (text, None)
1139            };
1140
1141            let claude_blocks = vec![ClaudeContentBlock::ToolResult {
1142                tool_use_id: tool_use_id.clone(),
1143                content: vec![ClaudeContentBlock::Text {
1144                    text: result_text,
1145                    cache_control: None,
1146                    extra: Default::default(),
1147                }],
1148                is_error,
1149                cache_control: None,
1150                extra: Default::default(),
1151            }];
1152
1153            Ok(ClaudeMessage {
1154                role: ClaudeMessageRole::User, // Tool results are sent as User messages in Claude
1155                content: ClaudeMessageContent::StructuredContent {
1156                    content: ClaudeStructuredContent(claude_blocks),
1157                },
1158                id: Some(msg.id.clone()),
1159            })
1160        }
1161    }
1162}
1163// Conversion functions end
1164
1165fn ensure_thinking_first(blocks: Vec<ClaudeContentBlock>) -> Vec<ClaudeContentBlock> {
1166    let mut thinking_blocks = Vec::new();
1167    let mut other_blocks = Vec::new();
1168
1169    for block in blocks {
1170        match block {
1171            ClaudeContentBlock::Thinking { .. } | ClaudeContentBlock::RedactedThinking { .. } => {
1172                thinking_blocks.push(block);
1173            }
1174            _ => other_blocks.push(block),
1175        }
1176    }
1177
1178    if thinking_blocks.is_empty() {
1179        other_blocks
1180    } else {
1181        thinking_blocks.extend(other_blocks);
1182        thinking_blocks
1183    }
1184}
1185
1186// Convert Claude's content blocks to our provider-agnostic format
1187fn convert_claude_content(claude_blocks: Vec<ClaudeContentBlock>) -> Vec<AssistantContent> {
1188    claude_blocks
1189        .into_iter()
1190        .filter_map(|block| match block {
1191            ClaudeContentBlock::Text { text, .. } => Some(AssistantContent::Text { text }),
1192            ClaudeContentBlock::ToolUse {
1193                id, name, input, ..
1194            } => Some(AssistantContent::ToolCall {
1195                tool_call: steer_tools::ToolCall {
1196                    id,
1197                    name,
1198                    parameters: input,
1199                },
1200                thought_signature: None,
1201            }),
1202            ClaudeContentBlock::ToolResult { .. } => {
1203                warn!("Unexpected ToolResult block received in Claude response content");
1204                None
1205            }
1206            ClaudeContentBlock::Thinking {
1207                thinking,
1208                signature,
1209                ..
1210            } => Some(AssistantContent::Thought {
1211                thought: ThoughtContent::Signed {
1212                    text: thinking,
1213                    signature,
1214                },
1215            }),
1216            ClaudeContentBlock::RedactedThinking { data, .. } => Some(AssistantContent::Thought {
1217                thought: ThoughtContent::Redacted { data },
1218            }),
1219            ClaudeContentBlock::Unknown => {
1220                warn!("Unknown content block received in Claude response content");
1221                None
1222            }
1223        })
1224        .collect()
1225}
1226
1227#[async_trait]
1228impl Provider for AnthropicClient {
1229    fn name(&self) -> &'static str {
1230        "anthropic"
1231    }
1232
1233    async fn complete(
1234        &self,
1235        model_id: &ModelId,
1236        messages: Vec<AppMessage>,
1237        system: Option<SystemContext>,
1238        tools: Option<Vec<ToolSchema>>,
1239        call_options: Option<ModelParameters>,
1240        token: CancellationToken,
1241    ) -> Result<CompletionResponse, ApiError> {
1242        let mut claude_messages = convert_messages(messages)?;
1243        let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1244
1245        if claude_messages.is_empty() {
1246            return Err(ApiError::InvalidRequest {
1247                provider: self.name().to_string(),
1248                details: "No messages provided".to_string(),
1249            });
1250        }
1251
1252        let last_message = claude_messages
1253            .last_mut()
1254            .ok_or_else(|| ApiError::InvalidRequest {
1255                provider: self.name().to_string(),
1256                details: "No messages provided".to_string(),
1257            })?;
1258        let cache_setting = Some(CacheControl {
1259            cache_type: "ephemeral".to_string(),
1260        });
1261
1262        let instruction_policy = match &self.auth {
1263            AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1264            AuthMode::ApiKey(_) => None,
1265        };
1266        let system_text = apply_instruction_policy(system, instruction_policy);
1267        let system_content = build_system_content(system_text, cache_setting.clone());
1268
1269        match &mut last_message.content {
1270            ClaudeMessageContent::StructuredContent { content } => {
1271                for block in &mut content.0 {
1272                    if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1273                        cache_control.clone_from(&cache_setting);
1274                    }
1275                }
1276            }
1277            ClaudeMessageContent::Text { content } => {
1278                let text_content = content.clone();
1279                last_message.content = ClaudeMessageContent::StructuredContent {
1280                    content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1281                        text: text_content,
1282                        cache_control: cache_setting,
1283                        extra: Default::default(),
1284                    }]),
1285                };
1286            }
1287        }
1288
1289        // Extract model-specific logic using ModelId
1290        let supports_thinking = call_options
1291            .as_ref()
1292            .and_then(|opts| opts.thinking_config.as_ref())
1293            .is_some_and(|tc| tc.enabled);
1294
1295        let request = if supports_thinking {
1296            // Use catalog/call options to configure thinking budget when provided
1297            let budget = call_options
1298                .as_ref()
1299                .and_then(|o| o.thinking_config)
1300                .and_then(|tc| tc.budget_tokens)
1301                .unwrap_or(4000);
1302            let thinking = Some(Thinking {
1303                thinking_type: ThinkingType::Enabled,
1304                budget_tokens: budget,
1305            });
1306            CompletionRequest {
1307                model: model_id.id.clone(), // Use the model ID string
1308                messages: claude_messages,
1309                max_tokens: call_options
1310                    .as_ref()
1311                    .and_then(|o| o.max_tokens)
1312                    .map_or(32_000, |v| v as usize),
1313                system: system_content.clone(),
1314                tools,
1315                temperature: call_options
1316                    .as_ref()
1317                    .and_then(|o| o.temperature)
1318                    .or(Some(1.0)),
1319                top_p: call_options.as_ref().and_then(|o| o.top_p),
1320                top_k: None,
1321                stream: None,
1322                thinking,
1323            }
1324        } else {
1325            CompletionRequest {
1326                model: model_id.id.clone(), // Use the model ID string
1327                messages: claude_messages,
1328                max_tokens: call_options
1329                    .as_ref()
1330                    .and_then(|o| o.max_tokens)
1331                    .map_or(8000, |v| v as usize),
1332                system: system_content,
1333                tools,
1334                temperature: call_options
1335                    .as_ref()
1336                    .and_then(|o| o.temperature)
1337                    .or(Some(0.7)),
1338                top_p: call_options.as_ref().and_then(|o| o.top_p),
1339                top_k: None,
1340                stream: None,
1341                thinking: None,
1342            }
1343        };
1344
1345        let auth_ctx = auth_header_context(model_id, RequestKind::Complete);
1346        let mut attempts = 0;
1347
1348        loop {
1349            let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1350            let url = self.request_url()?;
1351            let mut request_builder = self.http_client.post(&url).json(&request);
1352
1353            for (name, value) in auth_headers {
1354                request_builder = request_builder.header(&name, &value);
1355            }
1356
1357            if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1358                request_builder =
1359                    request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1360            }
1361
1362            let response = tokio::select! {
1363                biased;
1364                () = token.cancelled() => {
1365                    debug!(target: "claude::complete", "Cancellation token triggered before sending request.");
1366                    return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1367                }
1368                res = request_builder.send() => {
1369                    res?
1370                }
1371            };
1372
1373            if token.is_cancelled() {
1374                debug!(target: "claude::complete", "Cancellation token triggered after sending request, before status check.");
1375                return Err(ApiError::Cancelled {
1376                    provider: self.name().to_string(),
1377                });
1378            }
1379
1380            let status = response.status();
1381            if !status.is_success() {
1382                let error_text = tokio::select! {
1383                    biased;
1384                    () = token.cancelled() => {
1385                        debug!(target: "claude::complete", "Cancellation token triggered while reading error response body.");
1386                        return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1387                    }
1388                    text_res = response.text() => {
1389                        text_res?
1390                    }
1391                };
1392
1393                if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1394                    let action = self
1395                        .on_auth_error(status.as_u16(), &error_text, RequestKind::Complete)
1396                        .await?;
1397                    if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1398                        attempts += 1;
1399                        continue;
1400                    }
1401                    return Err(ApiError::AuthenticationFailed {
1402                        provider: self.name().to_string(),
1403                        details: error_text,
1404                    });
1405                }
1406
1407                return Err(match status.as_u16() {
1408                    401 | 403 => ApiError::AuthenticationFailed {
1409                        provider: self.name().to_string(),
1410                        details: error_text,
1411                    },
1412                    429 => ApiError::RateLimited {
1413                        provider: self.name().to_string(),
1414                        details: error_text,
1415                    },
1416                    400..=499 => ApiError::InvalidRequest {
1417                        provider: self.name().to_string(),
1418                        details: error_text,
1419                    },
1420                    500..=599 => ApiError::ServerError {
1421                        provider: self.name().to_string(),
1422                        status_code: status.as_u16(),
1423                        details: error_text,
1424                    },
1425                    _ => ApiError::Unknown {
1426                        provider: self.name().to_string(),
1427                        details: error_text,
1428                    },
1429                });
1430            }
1431
1432            let response_text = tokio::select! {
1433                biased;
1434                () = token.cancelled() => {
1435                    debug!(target: "claude::complete", "Cancellation token triggered while reading successful response body.");
1436                    return Err(ApiError::Cancelled { provider: self.name().to_string() });
1437                }
1438                text_res = response.text() => {
1439                    text_res?
1440                }
1441            };
1442
1443            let claude_completion: ClaudeCompletionResponse = serde_json::from_str(&response_text)
1444                .map_err(|e| ApiError::ResponseParsingError {
1445                    provider: self.name().to_string(),
1446                    details: format!("Error: {e}, Body: {response_text}"),
1447                })?;
1448            let completion = CompletionResponse {
1449                content: convert_claude_content(claude_completion.content),
1450            };
1451
1452            return Ok(completion);
1453        }
1454    }
1455
1456    async fn stream_complete(
1457        &self,
1458        model_id: &ModelId,
1459        messages: Vec<AppMessage>,
1460        system: Option<SystemContext>,
1461        tools: Option<Vec<ToolSchema>>,
1462        call_options: Option<ModelParameters>,
1463        token: CancellationToken,
1464    ) -> Result<CompletionStream, ApiError> {
1465        let mut claude_messages = convert_messages(messages)?;
1466        let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1467
1468        if claude_messages.is_empty() {
1469            return Err(ApiError::InvalidRequest {
1470                provider: self.name().to_string(),
1471                details: "No messages provided".to_string(),
1472            });
1473        }
1474
1475        let last_message = claude_messages
1476            .last_mut()
1477            .ok_or_else(|| ApiError::InvalidRequest {
1478                provider: self.name().to_string(),
1479                details: "No messages provided".to_string(),
1480            })?;
1481        let cache_setting = Some(CacheControl {
1482            cache_type: "ephemeral".to_string(),
1483        });
1484
1485        let instruction_policy = match &self.auth {
1486            AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1487            AuthMode::ApiKey(_) => None,
1488        };
1489        let system_text = apply_instruction_policy(system, instruction_policy);
1490        let system_content = build_system_content(system_text, cache_setting.clone());
1491
1492        match &mut last_message.content {
1493            ClaudeMessageContent::StructuredContent { content } => {
1494                for block in &mut content.0 {
1495                    if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1496                        cache_control.clone_from(&cache_setting);
1497                    }
1498                }
1499            }
1500            ClaudeMessageContent::Text { content } => {
1501                let text_content = content.clone();
1502                last_message.content = ClaudeMessageContent::StructuredContent {
1503                    content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1504                        text: text_content,
1505                        cache_control: cache_setting,
1506                        extra: Default::default(),
1507                    }]),
1508                };
1509            }
1510        }
1511
1512        let supports_thinking = call_options
1513            .as_ref()
1514            .and_then(|opts| opts.thinking_config.as_ref())
1515            .is_some_and(|tc| tc.enabled);
1516
1517        let request = if supports_thinking {
1518            let budget = call_options
1519                .as_ref()
1520                .and_then(|o| o.thinking_config)
1521                .and_then(|tc| tc.budget_tokens)
1522                .unwrap_or(4000);
1523            let thinking = Some(Thinking {
1524                thinking_type: ThinkingType::Enabled,
1525                budget_tokens: budget,
1526            });
1527            CompletionRequest {
1528                model: model_id.id.clone(),
1529                messages: claude_messages,
1530                max_tokens: call_options
1531                    .as_ref()
1532                    .and_then(|o| o.max_tokens)
1533                    .map_or(32_000, |v| v as usize),
1534                system: system_content.clone(),
1535                tools,
1536                temperature: call_options
1537                    .as_ref()
1538                    .and_then(|o| o.temperature)
1539                    .or(Some(1.0)),
1540                top_p: call_options.as_ref().and_then(|o| o.top_p),
1541                top_k: None,
1542                stream: Some(true),
1543                thinking,
1544            }
1545        } else {
1546            CompletionRequest {
1547                model: model_id.id.clone(),
1548                messages: claude_messages,
1549                max_tokens: call_options
1550                    .as_ref()
1551                    .and_then(|o| o.max_tokens)
1552                    .map_or(8000, |v| v as usize),
1553                system: system_content,
1554                tools,
1555                temperature: call_options
1556                    .as_ref()
1557                    .and_then(|o| o.temperature)
1558                    .or(Some(0.7)),
1559                top_p: call_options.as_ref().and_then(|o| o.top_p),
1560                top_k: None,
1561                stream: Some(true),
1562                thinking: None,
1563            }
1564        };
1565
1566        let auth_ctx = auth_header_context(model_id, RequestKind::Stream);
1567        let mut attempts = 0;
1568
1569        loop {
1570            let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1571            let url = self.request_url()?;
1572            let mut request_builder = self.http_client.post(&url).json(&request);
1573
1574            for (name, value) in auth_headers {
1575                request_builder = request_builder.header(&name, &value);
1576            }
1577
1578            if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1579                request_builder =
1580                    request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1581            }
1582
1583            let response = tokio::select! {
1584                biased;
1585                () = token.cancelled() => {
1586                    return Err(ApiError::Cancelled { provider: self.name().to_string() });
1587                }
1588                res = request_builder.send() => {
1589                    res?
1590                }
1591            };
1592
1593            let status = response.status();
1594            if !status.is_success() {
1595                let error_text = tokio::select! {
1596                    biased;
1597                    () = token.cancelled() => {
1598                        return Err(ApiError::Cancelled { provider: self.name().to_string() });
1599                    }
1600                    text_res = response.text() => {
1601                        text_res?
1602                    }
1603                };
1604
1605                if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1606                    let action = self
1607                        .on_auth_error(status.as_u16(), &error_text, RequestKind::Stream)
1608                        .await?;
1609                    if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1610                        attempts += 1;
1611                        continue;
1612                    }
1613                    return Err(ApiError::AuthenticationFailed {
1614                        provider: self.name().to_string(),
1615                        details: error_text,
1616                    });
1617                }
1618
1619                return Err(match status.as_u16() {
1620                    401 | 403 => ApiError::AuthenticationFailed {
1621                        provider: self.name().to_string(),
1622                        details: error_text,
1623                    },
1624                    429 => ApiError::RateLimited {
1625                        provider: self.name().to_string(),
1626                        details: error_text,
1627                    },
1628                    400..=499 => ApiError::InvalidRequest {
1629                        provider: self.name().to_string(),
1630                        details: error_text,
1631                    },
1632                    500..=599 => ApiError::ServerError {
1633                        provider: self.name().to_string(),
1634                        status_code: status.as_u16(),
1635                        details: error_text,
1636                    },
1637                    _ => ApiError::Unknown {
1638                        provider: self.name().to_string(),
1639                        details: error_text,
1640                    },
1641                });
1642            }
1643
1644            let byte_stream = response.bytes_stream();
1645            let sse_stream = parse_sse_stream(byte_stream);
1646
1647            let stream = convert_claude_stream(sse_stream, token);
1648
1649            return Ok(Box::pin(stream));
1650        }
1651    }
1652}
1653
1654fn auth_header_context(model_id: &ModelId, request_kind: RequestKind) -> AuthHeaderContext {
1655    AuthHeaderContext {
1656        model_id: Some(AuthModelId {
1657            provider_id: AuthProviderId(model_id.provider.as_str().to_string()),
1658            model_id: model_id.id.clone(),
1659        }),
1660        request_kind,
1661    }
1662}
1663
1664fn is_auth_status(status: reqwest::StatusCode) -> bool {
1665    matches!(
1666        status,
1667        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
1668    )
1669}
1670
/// Returns at most the first 512 characters of `body`, followed by `...` when anything was
/// cut off.
///
/// The limit counts characters rather than bytes, so the cut never lands inside a multi-byte
/// character.
fn truncate_body(body: &str) -> String {
    const LIMIT: usize = 512;
    // The byte offset of the character at index LIMIT (if there is one) is where to cut.
    match body.char_indices().nth(LIMIT) {
        Some((cut, _)) => format!("{}...", &body[..cut]),
        None => body.to_string(),
    }
}
1681
1682fn apply_instruction_policy(
1683    system: Option<SystemContext>,
1684    policy: Option<&InstructionPolicy>,
1685) -> Option<String> {
1686    let base = system.as_ref().and_then(|context| {
1687        let trimmed = context.prompt.trim();
1688        if trimmed.is_empty() {
1689            None
1690        } else {
1691            Some(trimmed.to_string())
1692        }
1693    });
1694
1695    let context = system
1696        .as_ref()
1697        .and_then(|context| context.render_with_prompt(base.clone()));
1698
1699    match policy {
1700        None => context,
1701        Some(InstructionPolicy::Prefix(prefix)) => {
1702            if let Some(context) = context {
1703                Some(format!("{prefix}\n{context}"))
1704            } else {
1705                Some(prefix.clone())
1706            }
1707        }
1708        Some(InstructionPolicy::DefaultIfEmpty(default)) => {
1709            if context.is_some() {
1710                context
1711            } else {
1712                Some(default.clone())
1713            }
1714        }
1715        Some(InstructionPolicy::Override(override_text)) => {
1716            let mut combined = override_text.clone();
1717            if let Some(system) = system.as_ref() {
1718                let overlay = system.prompt.trim();
1719                if !overlay.is_empty() {
1720                    combined.push_str("\n\n## Operating Mode\n");
1721                    combined.push_str(overlay);
1722                }
1723
1724                let env = system
1725                    .environment
1726                    .as_ref()
1727                    .map(|env| env.as_context())
1728                    .map(|value| value.trim().to_string())
1729                    .filter(|value| !value.is_empty());
1730                if let Some(env) = env {
1731                    combined.push_str("\n\n");
1732                    combined.push_str(&env);
1733                }
1734            }
1735            Some(combined)
1736        }
1737    }
1738}
1739
1740fn build_system_content(
1741    system: Option<String>,
1742    cache_setting: Option<CacheControl>,
1743) -> Option<System> {
1744    system.map(|text| {
1745        System::Content(vec![SystemContentBlock {
1746            content_type: "text".to_string(),
1747            text,
1748            cache_control: cache_setting,
1749        }])
1750    })
1751}
1752
/// Accumulator for one streamed content block, stored per block index while the
/// block's deltas are still arriving.
#[derive(Debug)]
enum BlockState {
    /// A text block; `text` holds everything received for it so far.
    Text { text: String },
    /// A thinking block; `text` holds the accumulated thinking and `signature`
    /// is filled in once a signature delta is received for the block.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// A tool-use block; `input` accumulates the partial JSON fragments of the
    /// tool arguments as raw text.
    ToolUse {
        id: String,
        name: String,
        input: String,
    },
    /// A block whose type is not one of the kinds handled above.
    Unknown,
}
1769
1770fn block_state_to_content(state: BlockState) -> Option<AssistantContent> {
1771    match state {
1772        BlockState::Text { text } => {
1773            if text.is_empty() {
1774                None
1775            } else {
1776                Some(AssistantContent::Text { text })
1777            }
1778        }
1779        BlockState::Thinking { text, signature } => {
1780            if text.is_empty() {
1781                None
1782            } else {
1783                let thought = if let Some(sig) = signature {
1784                    ThoughtContent::Signed {
1785                        text,
1786                        signature: sig,
1787                    }
1788                } else {
1789                    ThoughtContent::Simple { text }
1790                };
1791                Some(AssistantContent::Thought { thought })
1792            }
1793        }
1794        BlockState::ToolUse { id, name, input } => {
1795            if id.is_empty() || name.is_empty() {
1796                None
1797            } else {
1798                let parameters: serde_json::Value = serde_json::from_str(&input)
1799                    .unwrap_or(serde_json::Value::Object(Default::default()));
1800                Some(AssistantContent::ToolCall {
1801                    tool_call: ToolCall {
1802                        id,
1803                        name,
1804                        parameters,
1805                    },
1806                    thought_signature: None,
1807                })
1808            }
1809        }
1810        BlockState::Unknown => None,
1811    }
1812}
1813
1814fn convert_claude_stream(
1815    sse_stream: crate::api::sse::SseStream,
1816    token: CancellationToken,
1817) -> impl futures_core::Stream<Item = StreamChunk> + Send {
1818    async_stream::stream! {
1819        let mut block_states: std::collections::HashMap<usize, BlockState> =
1820            std::collections::HashMap::new();
1821        let mut completed_content: Vec<AssistantContent> = Vec::new();
1822
1823        tokio::pin!(sse_stream);
1824
1825        while let Some(event_result) = sse_stream.next().await {
1826            if token.is_cancelled() {
1827                yield StreamChunk::Error(StreamError::Cancelled);
1828                break;
1829            }
1830
1831            let event = match event_result {
1832                Ok(e) => e,
1833                Err(e) => {
1834                    yield StreamChunk::Error(StreamError::SseParse(e));
1835                    break;
1836                }
1837            };
1838
1839            let parsed: Result<ClaudeStreamEvent, _> = serde_json::from_str(&event.data);
1840            let stream_event = match parsed {
1841                Ok(e) => e,
1842                Err(_) => continue,
1843            };
1844
1845            match stream_event {
1846                ClaudeStreamEvent::ContentBlockStart { index, content_block } => {
1847                    match content_block.block_type.as_str() {
1848                        "text" => {
1849                            let text = content_block.text.unwrap_or_default();
1850                            if !text.is_empty() {
1851                                yield StreamChunk::TextDelta(text.clone());
1852                            }
1853                            block_states.insert(index, BlockState::Text { text });
1854                        }
1855                        "thinking" => {
1856                            block_states.insert(
1857                                index,
1858                                BlockState::Thinking {
1859                                    text: String::new(),
1860                                    signature: None,
1861                                },
1862                            );
1863                        }
1864                        "tool_use" => {
1865                            let id = content_block.id.unwrap_or_default();
1866                            let name = content_block.name.unwrap_or_default();
1867                            if !id.is_empty() && !name.is_empty() {
1868                                yield StreamChunk::ToolUseStart {
1869                                    id: id.clone(),
1870                                    name: name.clone(),
1871                                };
1872                            }
1873                            block_states.insert(
1874                                index,
1875                                BlockState::ToolUse {
1876                                    id,
1877                                    name,
1878                                    input: String::new(),
1879                                },
1880                            );
1881                        }
1882                        _ => {
1883                            block_states.insert(index, BlockState::Unknown);
1884                        }
1885                    }
1886                }
1887                ClaudeStreamEvent::ContentBlockDelta { index, delta } => match delta {
1888                    ClaudeDelta::Text { text } => {
1889                        match block_states.get_mut(&index) {
1890                            Some(BlockState::Text { text: buf }) => buf.push_str(&text),
1891                            _ => {
1892                                block_states.insert(index, BlockState::Text { text: text.clone() });
1893                            }
1894                        }
1895                        yield StreamChunk::TextDelta(text);
1896                    }
1897                    ClaudeDelta::Thinking { thinking } => {
1898                        match block_states.get_mut(&index) {
1899                            Some(BlockState::Thinking { text, .. }) => text.push_str(&thinking),
1900                            _ => {
1901                                block_states.insert(
1902                                    index,
1903                                    BlockState::Thinking {
1904                                        text: thinking.clone(),
1905                                        signature: None,
1906                                    },
1907                                );
1908                            }
1909                        }
1910                        yield StreamChunk::ThinkingDelta(thinking);
1911                    }
1912                    ClaudeDelta::Signature { signature } => {
1913                        if let Some(BlockState::Thinking { signature: sig, .. }) =
1914                            block_states.get_mut(&index)
1915                        {
1916                            *sig = Some(signature);
1917                        }
1918                    }
1919                    ClaudeDelta::InputJson { partial_json } => {
1920                        if let Some(BlockState::ToolUse { id, input, .. }) =
1921                            block_states.get_mut(&index)
1922                        {
1923                            input.push_str(&partial_json);
1924                            if !id.is_empty() {
1925                                yield StreamChunk::ToolUseInputDelta {
1926                                    id: id.clone(),
1927                                    delta: partial_json,
1928                                };
1929                            }
1930                        }
1931                    }
1932                },
1933                ClaudeStreamEvent::ContentBlockStop { index } => {
1934                    if let Some(state) = block_states.remove(&index)
1935                        && let Some(content) = block_state_to_content(state)
1936                    {
1937                        completed_content.push(content);
1938                    }
1939                    yield StreamChunk::ContentBlockStop { index };
1940                }
1941                ClaudeStreamEvent::MessageStop => {
1942                    if !block_states.is_empty() {
1943                        tracing::warn!(
1944                            target: "anthropic::stream",
1945                            "MessageStop received with {} unfinished content blocks",
1946                            block_states.len()
1947                        );
1948                    }
1949                    let content = std::mem::take(&mut completed_content);
1950                    yield StreamChunk::MessageComplete(CompletionResponse { content });
1951                    break;
1952                }
1953                ClaudeStreamEvent::Error { error } => {
1954                    yield StreamChunk::Error(StreamError::Provider {
1955                        provider: "anthropic".into(),
1956                        error_type: error.error_type,
1957                        message: error.message,
1958                    });
1959                }
1960                ClaudeStreamEvent::MessageStart { .. }
1961                | ClaudeStreamEvent::MessageDelta { .. }
1962                | ClaudeStreamEvent::Ping => {}
1963            }
1964        }
1965    }
1966}