1use async_trait::async_trait;
2use futures_util::StreamExt;
3use reqwest::{self, header};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use strum_macros::Display;
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, warn};
9
10use crate::api::error::StreamError;
11use crate::api::provider::{CompletionStream, StreamChunk};
12use crate::api::sse::parse_sse_stream;
13use crate::api::{CompletionResponse, Provider, error::ApiError};
14use crate::app::SystemContext;
15use crate::app::conversation::{
16 AssistantContent, Message as AppMessage, ThoughtContent, ToolResult, UserContent,
17};
18use crate::auth::{
19 AnthropicAuth, AuthErrorAction, AuthErrorContext, AuthHeaderContext, InstructionPolicy,
20 RequestKind,
21};
22use crate::auth::{ModelId as AuthModelId, ProviderId as AuthProviderId};
23use crate::config::model::{ModelId, ModelParameters};
24use steer_tools::{InputSchema, ToolCall, ToolSchema};
25
26const API_URL: &str = "https://api.anthropic.com/v1/messages";
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display)]
29pub enum ClaudeMessageRole {
30 #[serde(rename = "user")]
31 #[strum(serialize = "user")]
32 User,
33 #[serde(rename = "assistant")]
34 #[strum(serialize = "assistant")]
35 Assistant,
36 #[serde(rename = "tool")]
37 #[strum(serialize = "tool")]
38 Tool,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct ClaudeMessage {
44 pub role: ClaudeMessageRole,
45 #[serde(flatten)]
46 pub content: ClaudeMessageContent,
47 #[serde(skip_serializing)]
48 pub id: Option<String>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(untagged)]
54pub enum ClaudeMessageContent {
55 Text { content: String },
57 StructuredContent { content: ClaudeStructuredContent },
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63#[serde(transparent)]
64pub struct ClaudeStructuredContent(pub Vec<ClaudeContentBlock>);
65
66#[derive(Clone)]
67enum AuthMode {
68 ApiKey(String),
69 Directive(AnthropicAuth),
70}
71
72#[derive(Clone)]
73pub struct AnthropicClient {
74 http_client: reqwest::Client,
75 auth: AuthMode,
76}
77
78#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
79#[serde(rename_all = "lowercase")]
80#[derive(Default)]
81enum ThinkingType {
82 #[default]
83 Enabled,
84}
85
86#[derive(Debug, Serialize, Deserialize, Clone)]
87struct Thinking {
88 #[serde(rename = "type", default)]
89 thinking_type: ThinkingType,
90 budget_tokens: u32,
91}
92
93#[derive(Debug, Serialize, Clone)]
94struct SystemContentBlock {
95 #[serde(rename = "type")]
96 content_type: String,
97 text: String,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 cache_control: Option<CacheControl>,
100}
101
102#[derive(Debug, Serialize, Clone)]
103#[serde(untagged)]
104enum System {
105 Content(Vec<SystemContentBlock>),
107}
108
109#[derive(Debug, Serialize)]
110struct CompletionRequest {
111 model: String,
112 messages: Vec<ClaudeMessage>,
113 max_tokens: usize,
114 #[serde(skip_serializing_if = "Option::is_none")]
115 system: Option<System>,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 tools: Option<Vec<ClaudeTool>>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 temperature: Option<f32>,
120 #[serde(skip_serializing_if = "Option::is_none")]
121 top_p: Option<f32>,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 top_k: Option<usize>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 stream: Option<bool>,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 thinking: Option<Thinking>,
128}
129
130#[derive(Debug, Serialize, Clone)]
131struct ClaudeTool {
132 name: String,
133 description: String,
134 input_schema: InputSchema,
135}
136
137impl From<ToolSchema> for ClaudeTool {
138 fn from(tool: ToolSchema) -> Self {
139 let tool = adapt_tool_schema_for_claude(tool);
140 Self {
141 name: tool.name,
142 description: tool.description,
143 input_schema: tool.input_schema,
144 }
145 }
146}
147
148#[derive(Debug, Serialize, Deserialize, Clone)]
149struct ClaudeCompletionResponse {
150 id: String,
151 content: Vec<ClaudeContentBlock>,
152 model: String,
153 role: String,
154 #[serde(default)]
155 stop_reason: Option<String>,
156 #[serde(default)]
157 stop_sequence: Option<String>,
158 #[serde(default)]
159 usage: ClaudeUsage,
160 #[serde(flatten)]
162 extra: std::collections::HashMap<String, serde_json::Value>,
163}
164
165fn adapt_tool_schema_for_claude(tool: ToolSchema) -> ToolSchema {
166 let root_schema = tool.input_schema.as_value();
167 let sanitized = sanitize_for_claude(root_schema, root_schema);
168 ToolSchema {
169 input_schema: InputSchema::new(sanitized),
170 ..tool
171 }
172}
173
/// Decodes the JSON Pointer escapes in one path segment (`~1` -> `/`, `~0` -> `~`).
///
/// Segments without a `~` are returned borrowed; otherwise a new string is allocated.
/// `~1` is replaced before `~0` so that an input like `~01` decodes to `~1` rather
/// than `/`.
fn decode_pointer_segment(segment: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    match segment.find('~') {
        None => Cow::Borrowed(segment),
        Some(_) => Cow::Owned(segment.replace("~1", "/").replace("~0", "~")),
    }
}
180
181fn resolve_ref<'a>(root: &'a Value, reference: &str) -> Option<&'a Value> {
182 let path = reference.strip_prefix("#/")?;
183 let mut current = root;
184 for segment in path.split('/') {
185 let decoded = decode_pointer_segment(segment);
186 current = current.get(decoded.as_ref())?;
187 }
188 Some(current)
189}
190
191fn infer_type_from_enum(values: &[Value]) -> Option<String> {
192 let mut has_string = false;
193 let mut has_number = false;
194 let mut has_bool = false;
195 let mut has_object = false;
196 let mut has_array = false;
197
198 for value in values {
199 match value {
200 Value::String(_) => has_string = true,
201 Value::Number(_) => has_number = true,
202 Value::Bool(_) => has_bool = true,
203 Value::Object(_) => has_object = true,
204 Value::Array(_) => has_array = true,
205 Value::Null => {}
206 }
207 }
208
209 let kind_count = u8::from(has_string)
210 + u8::from(has_number)
211 + u8::from(has_bool)
212 + u8::from(has_object)
213 + u8::from(has_array);
214
215 if kind_count != 1 {
216 return None;
217 }
218
219 if has_string {
220 Some("string".to_string())
221 } else if has_number {
222 Some("number".to_string())
223 } else if has_bool {
224 Some("boolean".to_string())
225 } else if has_object {
226 Some("object".to_string())
227 } else if has_array {
228 Some("array".to_string())
229 } else {
230 None
231 }
232}
233
234fn normalize_type(value: &Value) -> Value {
235 if let Some(type_str) = value.as_str() {
236 return Value::String(type_str.to_string());
237 }
238
239 if let Some(type_array) = value.as_array()
240 && let Some(primary_type) = type_array
241 .iter()
242 .find_map(|v| if v.is_null() { None } else { v.as_str() })
243 {
244 return Value::String(primary_type.to_string());
245 }
246
247 Value::String("string".to_string())
248}
249
250fn extract_enum_values(value: &Value) -> Vec<Value> {
251 let Some(obj) = value.as_object() else {
252 return Vec::new();
253 };
254
255 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
256 return enum_values
257 .iter()
258 .filter(|v| !v.is_null())
259 .cloned()
260 .collect();
261 }
262
263 if let Some(const_value) = obj.get("const") {
264 if const_value.is_null() {
265 return Vec::new();
266 }
267 return vec![const_value.clone()];
268 }
269
270 Vec::new()
271}
272
273fn merge_property(properties: &mut serde_json::Map<String, Value>, key: &str, value: &Value) {
274 match properties.get_mut(key) {
275 None => {
276 properties.insert(key.to_string(), value.clone());
277 }
278 Some(existing) => {
279 if existing == value {
280 return;
281 }
282
283 let existing_values = extract_enum_values(existing);
284 let incoming_values = extract_enum_values(value);
285 if incoming_values.is_empty() && existing_values.is_empty() {
286 return;
287 }
288
289 let mut combined = existing_values;
290 for item in incoming_values {
291 if !combined.contains(&item) {
292 combined.push(item);
293 }
294 }
295
296 if combined.is_empty() {
297 return;
298 }
299
300 if let Some(obj) = existing.as_object_mut() {
301 obj.remove("const");
302 obj.insert("enum".to_string(), Value::Array(combined.clone()));
303 if !obj.contains_key("type")
304 && let Some(inferred) = infer_type_from_enum(&combined)
305 {
306 obj.insert("type".to_string(), Value::String(inferred));
307 }
308 }
309 }
310 }
311}
312
313fn merge_union_schemas(
314 root: &Value,
315 variants: &[Value],
316 seen_refs: &mut std::collections::HashSet<String>,
317) -> Value {
318 let mut merged_props = serde_json::Map::new();
319 let mut required_intersection: Option<std::collections::BTreeSet<String>> = None;
320 let mut enum_values: Vec<Value> = Vec::new();
321 let mut type_candidates: Vec<String> = Vec::new();
322
323 for variant in variants {
324 let sanitized = sanitize_for_claude_inner(root, variant, seen_refs);
325
326 if let Some(schema_type) = sanitized.get("type").and_then(|v| v.as_str()) {
327 type_candidates.push(schema_type.to_string());
328 }
329
330 if let Some(props) = sanitized.get("properties").and_then(|v| v.as_object()) {
331 for (key, value) in props {
332 merge_property(&mut merged_props, key, value);
333 }
334 }
335
336 if let Some(req) = sanitized.get("required").and_then(|v| v.as_array()) {
337 let req_set: std::collections::BTreeSet<String> = req
338 .iter()
339 .filter_map(|item| item.as_str().map(|s| s.to_string()))
340 .collect();
341
342 required_intersection = match required_intersection.take() {
343 None => Some(req_set),
344 Some(existing) => Some(
345 existing
346 .intersection(&req_set)
347 .cloned()
348 .collect::<std::collections::BTreeSet<String>>(),
349 ),
350 };
351 }
352
353 if let Some(values) = sanitized.get("enum").and_then(|v| v.as_array()) {
354 for value in values {
355 if value.is_null() {
356 continue;
357 }
358 if !enum_values.contains(value) {
359 enum_values.push(value.clone());
360 }
361 }
362 }
363 }
364
365 let schema_type = if !merged_props.is_empty() {
366 "object".to_string()
367 } else if let Some(inferred) = infer_type_from_enum(&enum_values) {
368 inferred
369 } else if let Some(first) = type_candidates.first() {
370 first.clone()
371 } else {
372 "string".to_string()
373 };
374
375 let mut merged = serde_json::Map::new();
376 merged.insert("type".to_string(), Value::String(schema_type));
377
378 if !merged_props.is_empty() {
379 merged.insert("properties".to_string(), Value::Object(merged_props));
380 }
381
382 if let Some(required_set) = required_intersection
383 && !required_set.is_empty()
384 {
385 merged.insert(
386 "required".to_string(),
387 Value::Array(
388 required_set
389 .into_iter()
390 .map(Value::String)
391 .collect::<Vec<_>>(),
392 ),
393 );
394 }
395
396 if !enum_values.is_empty() {
397 merged.insert("enum".to_string(), Value::Array(enum_values));
398 }
399
400 Value::Object(merged)
401}
402
403fn sanitize_for_claude(root: &Value, schema: &Value) -> Value {
404 let mut seen_refs = std::collections::HashSet::new();
405 sanitize_for_claude_inner(root, schema, &mut seen_refs)
406}
407
408fn fallback_schema() -> Value {
409 let mut out = serde_json::Map::new();
410 out.insert("type".to_string(), Value::String("object".to_string()));
411 out.insert(
412 "properties".to_string(),
413 Value::Object(serde_json::Map::new()),
414 );
415 Value::Object(out)
416}
417
418fn sanitize_for_claude_inner(
419 root: &Value,
420 schema: &Value,
421 seen_refs: &mut std::collections::HashSet<String>,
422) -> Value {
423 if let Some(reference) = schema.get("$ref").and_then(|v| v.as_str()) {
424 if !seen_refs.insert(reference.to_string()) {
425 return fallback_schema();
426 }
427 if let Some(resolved) = resolve_ref(root, reference) {
428 let sanitized = sanitize_for_claude_inner(root, resolved, seen_refs);
429 seen_refs.remove(reference);
430 return sanitized;
431 }
432 seen_refs.remove(reference);
433 }
434
435 let Some(obj) = schema.as_object() else {
436 return schema.clone();
437 };
438
439 if let Some(union) = obj
440 .get("oneOf")
441 .or_else(|| obj.get("anyOf"))
442 .or_else(|| obj.get("allOf"))
443 .and_then(|v| v.as_array())
444 {
445 return merge_union_schemas(root, union, seen_refs);
446 }
447
448 let mut out = serde_json::Map::new();
449 for (key, value) in obj {
450 match key.as_str() {
451 "$ref"
452 | "$defs"
453 | "oneOf"
454 | "anyOf"
455 | "allOf"
456 | "const"
457 | "additionalProperties"
458 | "default"
459 | "examples"
460 | "title"
461 | "pattern"
462 | "minLength"
463 | "maxLength"
464 | "minimum"
465 | "maximum"
466 | "minItems"
467 | "maxItems"
468 | "uniqueItems"
469 | "deprecated" => {}
470 "type" => {
471 out.insert("type".to_string(), normalize_type(value));
472 }
473 "properties" => {
474 if let Some(props) = value.as_object() {
475 let mut sanitized_props = serde_json::Map::new();
476 for (prop_key, prop_value) in props {
477 sanitized_props.insert(
478 prop_key.clone(),
479 sanitize_for_claude_inner(root, prop_value, seen_refs),
480 );
481 }
482 out.insert("properties".to_string(), Value::Object(sanitized_props));
483 }
484 }
485 "items" => {
486 if let Some(items) = value.as_array() {
487 let merged = merge_union_schemas(root, items, seen_refs);
488 out.insert("items".to_string(), merged);
489 } else {
490 out.insert(
491 "items".to_string(),
492 sanitize_for_claude_inner(root, value, seen_refs),
493 );
494 }
495 }
496 "enum" => {
497 let values = value
498 .as_array()
499 .map(|items| {
500 items
501 .iter()
502 .filter(|v| !v.is_null())
503 .cloned()
504 .collect::<Vec<_>>()
505 })
506 .unwrap_or_default();
507 out.insert("enum".to_string(), Value::Array(values));
508 }
509 _ => {
510 out.insert(
511 key.clone(),
512 sanitize_for_claude_inner(root, value, seen_refs),
513 );
514 }
515 }
516 }
517
518 if let Some(const_value) = obj.get("const")
519 && !const_value.is_null()
520 {
521 out.insert("enum".to_string(), Value::Array(vec![const_value.clone()]));
522 if !out.contains_key("type")
523 && let Some(inferred) = infer_type_from_enum(std::slice::from_ref(const_value))
524 {
525 out.insert("type".to_string(), Value::String(inferred));
526 }
527 }
528
529 if out.get("type") == Some(&Value::String("object".to_string()))
530 && !out.contains_key("properties")
531 {
532 out.insert(
533 "properties".to_string(),
534 Value::Object(serde_json::Map::new()),
535 );
536 }
537
538 if !out.contains_key("type") {
539 if out.contains_key("properties") {
540 out.insert("type".to_string(), Value::String("object".to_string()));
541 } else if out.contains_key("items") {
542 out.insert("type".to_string(), Value::String("array".to_string()));
543 } else if let Some(enum_values) = out.get("enum").and_then(|v| v.as_array())
544 && let Some(inferred) = infer_type_from_enum(enum_values)
545 {
546 out.insert("type".to_string(), Value::String(inferred));
547 }
548 }
549
550 Value::Object(out)
551}
552
/// Serde default for a cache-control `type`: the string `ephemeral`.
fn default_cache_type() -> String {
    String::from("ephemeral")
}
556
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Sanitizes `schema` using the schema itself as the `$ref` resolution root.
    fn sanitize(schema: &Value) -> Value {
        sanitize_for_claude(schema, schema)
    }

    /// A self-referencing definition terminates and yields an object placeholder.
    #[test]
    fn sanitize_handles_recursive_ref() {
        let schema = json!({
            "$defs": {
                "node": {
                    "type": "object",
                    "properties": {
                        "next": { "$ref": "#/$defs/node" }
                    }
                }
            },
            "$ref": "#/$defs/node"
        });

        let sanitized = sanitize(&schema);
        let next_type = sanitized
            .pointer("/properties/next/type")
            .and_then(Value::as_str);

        assert_eq!(next_type, Some("object"));
    }

    /// Tuple-style `items` arrays are collapsed into a single schema object.
    #[test]
    fn sanitize_collapses_tuple_items() {
        let schema = json!({
            "type": "array",
            "items": [
                { "type": "string" },
                { "type": "number" }
            ]
        });

        let sanitized = sanitize(&schema);

        assert!(matches!(sanitized.get("items"), Some(Value::Object(_))));
    }

    /// Dropped keywords disappear at every nesting level.
    #[test]
    fn sanitize_removes_unsupported_keywords() {
        let schema = json!({
            "type": "object",
            "title": "ignored",
            "additionalProperties": false,
            "properties": {
                "name": {
                    "type": "string",
                    "pattern": "^[a-z]+$",
                    "default": "x"
                }
            }
        });

        assert_eq!(
            sanitize(&schema),
            json!({
                "type": "object",
                "properties": {
                    "name": { "type": "string" }
                }
            })
        );
    }

    /// `const` becomes a single-value `enum` with an inferred type.
    #[test]
    fn sanitize_converts_const_to_enum_with_type() {
        let schema = json!({
            "const": "fixed"
        });

        assert_eq!(
            sanitize(&schema),
            json!({
                "enum": ["fixed"],
                "type": "string"
            })
        );
    }

    /// `null` entries are stripped from `enum` before the type is inferred.
    #[test]
    fn sanitize_filters_null_enum_values() {
        let schema = json!({
            "enum": ["a", null, "b"]
        });

        assert_eq!(
            sanitize(&schema),
            json!({
                "enum": ["a", "b"],
                "type": "string"
            })
        );
    }

    /// `~1` in a `$ref` is decoded to `/` when looking up the definition key.
    #[test]
    fn sanitize_decodes_json_pointer_refs() {
        let schema = json!({
            "$defs": {
                "a/b": { "type": "string" }
            },
            "$ref": "#/$defs/a~1b"
        });

        assert_eq!(
            sanitize(&schema),
            json!({
                "type": "string"
            })
        );
    }

    /// Union variants contribute all properties; `required` is intersected.
    #[test]
    fn sanitize_merges_union_properties_and_required() {
        let schema = json!({
            "oneOf": [
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "b": { "type": "string" }
                    },
                    "required": ["a"]
                },
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "c": { "type": "string" }
                    },
                    "required": ["a", "c"]
                }
            ]
        });

        assert_eq!(
            sanitize(&schema),
            json!({
                "type": "object",
                "properties": {
                    "a": { "type": "string" },
                    "b": { "type": "string" },
                    "c": { "type": "string" }
                },
                "required": ["a"]
            })
        );
    }

    /// A schema with `items` but no `type` is treated as an array.
    #[test]
    fn sanitize_infers_array_type_from_items() {
        let schema = json!({
            "items": {
                "type": "string"
            }
        });

        assert_eq!(
            sanitize(&schema),
            json!({
                "type": "array",
                "items": { "type": "string" }
            })
        );
    }
}
729
730#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
731pub struct CacheControl {
732 #[serde(rename = "type", default = "default_cache_type")]
733 cache_type: String,
734}
735
736#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
737#[serde(tag = "type")]
738pub enum ClaudeContentBlock {
739 #[serde(rename = "text")]
740 Text {
741 text: String,
742 #[serde(skip_serializing_if = "Option::is_none")]
743 cache_control: Option<CacheControl>,
744 #[serde(flatten)]
745 extra: std::collections::HashMap<String, serde_json::Value>,
746 },
747 #[serde(rename = "tool_use")]
748 ToolUse {
749 id: String,
750 name: String,
751 input: serde_json::Value,
752 #[serde(skip_serializing_if = "Option::is_none")]
753 cache_control: Option<CacheControl>,
754 #[serde(flatten)]
755 extra: std::collections::HashMap<String, serde_json::Value>,
756 },
757 #[serde(rename = "tool_result")]
758 ToolResult {
759 tool_use_id: String,
760 content: Vec<ClaudeContentBlock>,
761 #[serde(skip_serializing_if = "Option::is_none")]
762 cache_control: Option<CacheControl>,
763 #[serde(skip_serializing_if = "Option::is_none")]
764 is_error: Option<bool>,
765 #[serde(flatten)]
766 extra: std::collections::HashMap<String, serde_json::Value>,
767 },
768 #[serde(rename = "thinking")]
769 Thinking {
770 thinking: String,
771 signature: String,
772 #[serde(skip_serializing_if = "Option::is_none")]
773 cache_control: Option<CacheControl>,
774 #[serde(flatten)]
775 extra: std::collections::HashMap<String, serde_json::Value>,
776 },
777 #[serde(rename = "redacted_thinking")]
778 RedactedThinking {
779 data: String,
780 #[serde(skip_serializing_if = "Option::is_none")]
781 cache_control: Option<CacheControl>,
782 #[serde(flatten)]
783 extra: std::collections::HashMap<String, serde_json::Value>,
784 },
785 #[serde(other)]
786 Unknown,
787}
788
789#[derive(Debug, Serialize, Deserialize, Default, Clone)]
790struct ClaudeUsage {
791 #[serde(rename = "input_tokens")]
792 input: usize,
793 #[serde(rename = "output_tokens")]
794 output: usize,
795 #[serde(rename = "cache_creation_input_tokens")]
796 #[serde(skip_serializing_if = "Option::is_none")]
797 cache_creation_input: Option<usize>,
798 #[serde(rename = "cache_read_input_tokens")]
799 #[serde(skip_serializing_if = "Option::is_none")]
800 cache_read_input: Option<usize>,
801}
802
803#[derive(Debug, Deserialize)]
804#[serde(tag = "type")]
805enum ClaudeStreamEvent {
806 #[serde(rename = "message_start")]
807 MessageStart {
808 #[expect(dead_code)]
809 message: ClaudeMessageStart,
810 },
811 #[serde(rename = "content_block_start")]
812 ContentBlockStart {
813 index: usize,
814 content_block: ClaudeContentBlockStart,
815 },
816 #[serde(rename = "content_block_delta")]
817 ContentBlockDelta { index: usize, delta: ClaudeDelta },
818 #[serde(rename = "content_block_stop")]
819 ContentBlockStop { index: usize },
820 #[serde(rename = "message_delta")]
821 MessageDelta {
822 #[expect(dead_code)]
823 delta: ClaudeMessageDeltaData,
824 #[expect(dead_code)]
825 #[serde(default)]
826 usage: Option<ClaudeUsage>,
827 },
828 #[serde(rename = "message_stop")]
829 MessageStop,
830 #[serde(rename = "ping")]
831 Ping,
832 #[serde(rename = "error")]
833 Error { error: ClaudeStreamError },
834}
835
836#[derive(Debug, Deserialize)]
837struct ClaudeMessageStart {
838 #[expect(dead_code)]
839 #[serde(default)]
840 id: String,
841 #[expect(dead_code)]
842 #[serde(default)]
843 model: String,
844}
845
846#[derive(Debug, Deserialize)]
847struct ClaudeContentBlockStart {
848 #[serde(rename = "type")]
849 block_type: String,
850 #[serde(default)]
851 id: Option<String>,
852 #[serde(default)]
853 name: Option<String>,
854 #[serde(default)]
855 text: Option<String>,
856}
857
858#[derive(Debug, Deserialize)]
859#[serde(tag = "type")]
860enum ClaudeDelta {
861 #[serde(rename = "text_delta")]
862 Text { text: String },
863 #[serde(rename = "thinking_delta")]
864 Thinking { thinking: String },
865 #[serde(rename = "input_json_delta")]
866 InputJson { partial_json: String },
867 #[serde(rename = "signature_delta")]
868 Signature { signature: String },
869}
870
871#[derive(Debug, Deserialize)]
872struct ClaudeMessageDeltaData {
873 #[expect(dead_code)]
874 #[serde(default)]
875 stop_reason: Option<String>,
876}
877
878#[derive(Debug, Deserialize)]
879struct ClaudeStreamError {
880 #[serde(default)]
881 message: String,
882 #[serde(rename = "type", default)]
883 error_type: String,
884}
885
886impl AnthropicClient {
887 pub fn new(api_key: &str) -> Result<Self, ApiError> {
888 Self::with_api_key(api_key)
889 }
890
891 pub fn with_api_key(api_key: &str) -> Result<Self, ApiError> {
892 Ok(Self {
893 http_client: Self::build_http_client()?,
894 auth: AuthMode::ApiKey(api_key.to_string()),
895 })
896 }
897
898 pub fn with_directive(directive: AnthropicAuth) -> Result<Self, ApiError> {
899 Ok(Self {
900 http_client: Self::build_http_client()?,
901 auth: AuthMode::Directive(directive),
902 })
903 }
904
905 fn build_http_client() -> Result<reqwest::Client, ApiError> {
906 let mut headers = header::HeaderMap::new();
907 headers.insert(
908 "anthropic-version",
909 header::HeaderValue::from_static("2023-06-01"),
910 );
911 headers.insert(
912 header::CONTENT_TYPE,
913 header::HeaderValue::from_static("application/json"),
914 );
915
916 reqwest::Client::builder()
917 .default_headers(headers)
918 .build()
919 .map_err(ApiError::Network)
920 }
921
922 async fn auth_headers(
923 &self,
924 ctx: AuthHeaderContext,
925 ) -> Result<Vec<(String, String)>, ApiError> {
926 match &self.auth {
927 AuthMode::ApiKey(key) => Ok(vec![("x-api-key".to_string(), key.clone())]),
928 AuthMode::Directive(directive) => {
929 let header_pairs = directive
930 .headers
931 .headers(ctx)
932 .await
933 .map_err(|e| ApiError::AuthError(e.to_string()))?;
934 Ok(header_pairs
935 .into_iter()
936 .map(|pair| (pair.name, pair.value))
937 .collect())
938 }
939 }
940 }
941
942 async fn on_auth_error(
943 &self,
944 status: u16,
945 body: &str,
946 request_kind: RequestKind,
947 ) -> Result<AuthErrorAction, ApiError> {
948 let AuthMode::Directive(directive) = &self.auth else {
949 return Ok(AuthErrorAction::NoAction);
950 };
951 let context = AuthErrorContext {
952 status: Some(status),
953 body_snippet: Some(truncate_body(body)),
954 request_kind,
955 };
956 directive
957 .headers
958 .on_auth_error(context)
959 .await
960 .map_err(|e| ApiError::AuthError(e.to_string()))
961 }
962
963 fn request_url(&self) -> Result<String, ApiError> {
964 let AuthMode::Directive(directive) = &self.auth else {
965 return Ok(API_URL.to_string());
966 };
967
968 let Some(query_params) = &directive.query_params else {
969 return Ok(API_URL.to_string());
970 };
971
972 if query_params.is_empty() {
973 return Ok(API_URL.to_string());
974 }
975
976 let mut url = url::Url::parse(API_URL)
977 .map_err(|e| ApiError::Configuration(format!("Invalid API_URL '{API_URL}': {e}")))?;
978 for param in query_params {
979 url.query_pairs_mut().append_pair(¶m.name, ¶m.value);
980 }
981 Ok(url.to_string())
982 }
983}
984
985fn convert_messages(messages: Vec<AppMessage>) -> Result<Vec<ClaudeMessage>, ApiError> {
987 let claude_messages: Result<Vec<ClaudeMessage>, ApiError> =
988 messages.into_iter().map(convert_single_message).collect();
989
990 claude_messages.map(|messages| {
992 messages
993 .into_iter()
994 .filter(|msg| {
995 match &msg.content {
996 ClaudeMessageContent::Text { content } => !content.trim().is_empty(),
997 ClaudeMessageContent::StructuredContent { .. } => true, }
999 })
1000 .collect()
1001 })
1002}
1003
1004fn convert_single_message(msg: AppMessage) -> Result<ClaudeMessage, ApiError> {
1005 match &msg.data {
1006 crate::app::conversation::MessageData::User { content, .. } => {
1007 let combined_text = content
1009 .iter()
1010 .map(|user_content| match user_content {
1011 UserContent::Text { text } => text.clone(),
1012 UserContent::CommandExecution {
1013 command,
1014 stdout,
1015 stderr,
1016 exit_code,
1017 } => UserContent::format_command_execution_as_xml(
1018 command, stdout, stderr, *exit_code,
1019 ),
1020 })
1021 .collect::<Vec<_>>()
1022 .join("\n");
1023
1024 Ok(ClaudeMessage {
1025 role: ClaudeMessageRole::User,
1026 content: ClaudeMessageContent::Text {
1027 content: combined_text,
1028 },
1029 id: Some(msg.id.clone()),
1030 })
1031 }
1032 crate::app::conversation::MessageData::Assistant { content, .. } => {
1033 let claude_blocks: Vec<ClaudeContentBlock> = content
1035 .iter()
1036 .filter_map(|assistant_content| match assistant_content {
1037 AssistantContent::Text { text } => {
1038 if text.trim().is_empty() {
1039 None
1040 } else {
1041 Some(ClaudeContentBlock::Text {
1042 text: text.clone(),
1043 cache_control: None,
1044 extra: Default::default(),
1045 })
1046 }
1047 }
1048 AssistantContent::ToolCall { tool_call, .. } => {
1049 Some(ClaudeContentBlock::ToolUse {
1050 id: tool_call.id.clone(),
1051 name: tool_call.name.clone(),
1052 input: tool_call.parameters.clone(),
1053 cache_control: None,
1054 extra: Default::default(),
1055 })
1056 }
1057 AssistantContent::Thought { thought } => {
1058 match thought {
1059 ThoughtContent::Signed { text, signature } => {
1060 Some(ClaudeContentBlock::Thinking {
1061 thinking: text.clone(),
1062 signature: signature.clone(),
1063 cache_control: None,
1064 extra: Default::default(),
1065 })
1066 }
1067 ThoughtContent::Redacted { data } => {
1068 Some(ClaudeContentBlock::RedactedThinking {
1069 data: data.clone(),
1070 cache_control: None,
1071 extra: Default::default(),
1072 })
1073 }
1074 ThoughtContent::Simple { text } => {
1075 Some(ClaudeContentBlock::Text {
1077 text: format!("[Thought: {text}]"),
1078 cache_control: None,
1079 extra: Default::default(),
1080 })
1081 }
1082 }
1083 }
1084 })
1085 .collect();
1086
1087 if claude_blocks.is_empty() {
1088 debug!("No content blocks found: {:?}", content);
1089 Err(ApiError::InvalidRequest {
1090 provider: "anthropic".to_string(),
1091 details: format!(
1092 "Assistant message ID {} resulted in no valid content blocks",
1093 msg.id
1094 ),
1095 })
1096 } else {
1097 let claude_blocks = ensure_thinking_first(claude_blocks);
1098 let claude_content = if claude_blocks.len() == 1 {
1099 if let Some(ClaudeContentBlock::Text { text, .. }) = claude_blocks.first() {
1100 ClaudeMessageContent::Text {
1101 content: text.clone(),
1102 }
1103 } else {
1104 ClaudeMessageContent::StructuredContent {
1105 content: ClaudeStructuredContent(claude_blocks),
1106 }
1107 }
1108 } else {
1109 ClaudeMessageContent::StructuredContent {
1110 content: ClaudeStructuredContent(claude_blocks),
1111 }
1112 };
1113
1114 Ok(ClaudeMessage {
1115 role: ClaudeMessageRole::Assistant,
1116 content: claude_content,
1117 id: Some(msg.id.clone()),
1118 })
1119 }
1120 }
1121 crate::app::conversation::MessageData::Tool {
1122 tool_use_id,
1123 result,
1124 ..
1125 } => {
1126 let (result_text, is_error) = if let ToolResult::Error(e) = result {
1129 (e.to_string(), Some(true))
1130 } else {
1131 let text = result.llm_format();
1133 let text = if text.trim().is_empty() {
1134 "(No output)".to_string()
1135 } else {
1136 text
1137 };
1138 (text, None)
1139 };
1140
1141 let claude_blocks = vec![ClaudeContentBlock::ToolResult {
1142 tool_use_id: tool_use_id.clone(),
1143 content: vec![ClaudeContentBlock::Text {
1144 text: result_text,
1145 cache_control: None,
1146 extra: Default::default(),
1147 }],
1148 is_error,
1149 cache_control: None,
1150 extra: Default::default(),
1151 }];
1152
1153 Ok(ClaudeMessage {
1154 role: ClaudeMessageRole::User, content: ClaudeMessageContent::StructuredContent {
1156 content: ClaudeStructuredContent(claude_blocks),
1157 },
1158 id: Some(msg.id.clone()),
1159 })
1160 }
1161 }
1162}
1163fn ensure_thinking_first(blocks: Vec<ClaudeContentBlock>) -> Vec<ClaudeContentBlock> {
1166 let mut thinking_blocks = Vec::new();
1167 let mut other_blocks = Vec::new();
1168
1169 for block in blocks {
1170 match block {
1171 ClaudeContentBlock::Thinking { .. } | ClaudeContentBlock::RedactedThinking { .. } => {
1172 thinking_blocks.push(block);
1173 }
1174 _ => other_blocks.push(block),
1175 }
1176 }
1177
1178 if thinking_blocks.is_empty() {
1179 other_blocks
1180 } else {
1181 thinking_blocks.extend(other_blocks);
1182 thinking_blocks
1183 }
1184}
1185
1186fn convert_claude_content(claude_blocks: Vec<ClaudeContentBlock>) -> Vec<AssistantContent> {
1188 claude_blocks
1189 .into_iter()
1190 .filter_map(|block| match block {
1191 ClaudeContentBlock::Text { text, .. } => Some(AssistantContent::Text { text }),
1192 ClaudeContentBlock::ToolUse {
1193 id, name, input, ..
1194 } => Some(AssistantContent::ToolCall {
1195 tool_call: steer_tools::ToolCall {
1196 id,
1197 name,
1198 parameters: input,
1199 },
1200 thought_signature: None,
1201 }),
1202 ClaudeContentBlock::ToolResult { .. } => {
1203 warn!("Unexpected ToolResult block received in Claude response content");
1204 None
1205 }
1206 ClaudeContentBlock::Thinking {
1207 thinking,
1208 signature,
1209 ..
1210 } => Some(AssistantContent::Thought {
1211 thought: ThoughtContent::Signed {
1212 text: thinking,
1213 signature,
1214 },
1215 }),
1216 ClaudeContentBlock::RedactedThinking { data, .. } => Some(AssistantContent::Thought {
1217 thought: ThoughtContent::Redacted { data },
1218 }),
1219 ClaudeContentBlock::Unknown => {
1220 warn!("Unknown content block received in Claude response content");
1221 None
1222 }
1223 })
1224 .collect()
1225}
1226
1227#[async_trait]
1228impl Provider for AnthropicClient {
1229 fn name(&self) -> &'static str {
1230 "anthropic"
1231 }
1232
1233 async fn complete(
1234 &self,
1235 model_id: &ModelId,
1236 messages: Vec<AppMessage>,
1237 system: Option<SystemContext>,
1238 tools: Option<Vec<ToolSchema>>,
1239 call_options: Option<ModelParameters>,
1240 token: CancellationToken,
1241 ) -> Result<CompletionResponse, ApiError> {
1242 let mut claude_messages = convert_messages(messages)?;
1243 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1244
1245 if claude_messages.is_empty() {
1246 return Err(ApiError::InvalidRequest {
1247 provider: self.name().to_string(),
1248 details: "No messages provided".to_string(),
1249 });
1250 }
1251
1252 let last_message = claude_messages
1253 .last_mut()
1254 .ok_or_else(|| ApiError::InvalidRequest {
1255 provider: self.name().to_string(),
1256 details: "No messages provided".to_string(),
1257 })?;
1258 let cache_setting = Some(CacheControl {
1259 cache_type: "ephemeral".to_string(),
1260 });
1261
1262 let instruction_policy = match &self.auth {
1263 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1264 AuthMode::ApiKey(_) => None,
1265 };
1266 let system_text = apply_instruction_policy(system, instruction_policy);
1267 let system_content = build_system_content(system_text, cache_setting.clone());
1268
1269 match &mut last_message.content {
1270 ClaudeMessageContent::StructuredContent { content } => {
1271 for block in &mut content.0 {
1272 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1273 cache_control.clone_from(&cache_setting);
1274 }
1275 }
1276 }
1277 ClaudeMessageContent::Text { content } => {
1278 let text_content = content.clone();
1279 last_message.content = ClaudeMessageContent::StructuredContent {
1280 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1281 text: text_content,
1282 cache_control: cache_setting,
1283 extra: Default::default(),
1284 }]),
1285 };
1286 }
1287 }
1288
1289 let supports_thinking = call_options
1291 .as_ref()
1292 .and_then(|opts| opts.thinking_config.as_ref())
1293 .is_some_and(|tc| tc.enabled);
1294
1295 let request = if supports_thinking {
1296 let budget = call_options
1298 .as_ref()
1299 .and_then(|o| o.thinking_config)
1300 .and_then(|tc| tc.budget_tokens)
1301 .unwrap_or(4000);
1302 let thinking = Some(Thinking {
1303 thinking_type: ThinkingType::Enabled,
1304 budget_tokens: budget,
1305 });
1306 CompletionRequest {
1307 model: model_id.id.clone(), messages: claude_messages,
1309 max_tokens: call_options
1310 .as_ref()
1311 .and_then(|o| o.max_tokens)
1312 .map_or(32_000, |v| v as usize),
1313 system: system_content.clone(),
1314 tools,
1315 temperature: call_options
1316 .as_ref()
1317 .and_then(|o| o.temperature)
1318 .or(Some(1.0)),
1319 top_p: call_options.as_ref().and_then(|o| o.top_p),
1320 top_k: None,
1321 stream: None,
1322 thinking,
1323 }
1324 } else {
1325 CompletionRequest {
1326 model: model_id.id.clone(), messages: claude_messages,
1328 max_tokens: call_options
1329 .as_ref()
1330 .and_then(|o| o.max_tokens)
1331 .map_or(8000, |v| v as usize),
1332 system: system_content,
1333 tools,
1334 temperature: call_options
1335 .as_ref()
1336 .and_then(|o| o.temperature)
1337 .or(Some(0.7)),
1338 top_p: call_options.as_ref().and_then(|o| o.top_p),
1339 top_k: None,
1340 stream: None,
1341 thinking: None,
1342 }
1343 };
1344
1345 let auth_ctx = auth_header_context(model_id, RequestKind::Complete);
1346 let mut attempts = 0;
1347
1348 loop {
1349 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1350 let url = self.request_url()?;
1351 let mut request_builder = self.http_client.post(&url).json(&request);
1352
1353 for (name, value) in auth_headers {
1354 request_builder = request_builder.header(&name, &value);
1355 }
1356
1357 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1358 request_builder =
1359 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1360 }
1361
1362 let response = tokio::select! {
1363 biased;
1364 () = token.cancelled() => {
1365 debug!(target: "claude::complete", "Cancellation token triggered before sending request.");
1366 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1367 }
1368 res = request_builder.send() => {
1369 res?
1370 }
1371 };
1372
1373 if token.is_cancelled() {
1374 debug!(target: "claude::complete", "Cancellation token triggered after sending request, before status check.");
1375 return Err(ApiError::Cancelled {
1376 provider: self.name().to_string(),
1377 });
1378 }
1379
1380 let status = response.status();
1381 if !status.is_success() {
1382 let error_text = tokio::select! {
1383 biased;
1384 () = token.cancelled() => {
1385 debug!(target: "claude::complete", "Cancellation token triggered while reading error response body.");
1386 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1387 }
1388 text_res = response.text() => {
1389 text_res?
1390 }
1391 };
1392
1393 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1394 let action = self
1395 .on_auth_error(status.as_u16(), &error_text, RequestKind::Complete)
1396 .await?;
1397 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1398 attempts += 1;
1399 continue;
1400 }
1401 return Err(ApiError::AuthenticationFailed {
1402 provider: self.name().to_string(),
1403 details: error_text,
1404 });
1405 }
1406
1407 return Err(match status.as_u16() {
1408 401 | 403 => ApiError::AuthenticationFailed {
1409 provider: self.name().to_string(),
1410 details: error_text,
1411 },
1412 429 => ApiError::RateLimited {
1413 provider: self.name().to_string(),
1414 details: error_text,
1415 },
1416 400..=499 => ApiError::InvalidRequest {
1417 provider: self.name().to_string(),
1418 details: error_text,
1419 },
1420 500..=599 => ApiError::ServerError {
1421 provider: self.name().to_string(),
1422 status_code: status.as_u16(),
1423 details: error_text,
1424 },
1425 _ => ApiError::Unknown {
1426 provider: self.name().to_string(),
1427 details: error_text,
1428 },
1429 });
1430 }
1431
1432 let response_text = tokio::select! {
1433 biased;
1434 () = token.cancelled() => {
1435 debug!(target: "claude::complete", "Cancellation token triggered while reading successful response body.");
1436 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1437 }
1438 text_res = response.text() => {
1439 text_res?
1440 }
1441 };
1442
1443 let claude_completion: ClaudeCompletionResponse = serde_json::from_str(&response_text)
1444 .map_err(|e| ApiError::ResponseParsingError {
1445 provider: self.name().to_string(),
1446 details: format!("Error: {e}, Body: {response_text}"),
1447 })?;
1448 let completion = CompletionResponse {
1449 content: convert_claude_content(claude_completion.content),
1450 };
1451
1452 return Ok(completion);
1453 }
1454 }
1455
1456 async fn stream_complete(
1457 &self,
1458 model_id: &ModelId,
1459 messages: Vec<AppMessage>,
1460 system: Option<SystemContext>,
1461 tools: Option<Vec<ToolSchema>>,
1462 call_options: Option<ModelParameters>,
1463 token: CancellationToken,
1464 ) -> Result<CompletionStream, ApiError> {
1465 let mut claude_messages = convert_messages(messages)?;
1466 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1467
1468 if claude_messages.is_empty() {
1469 return Err(ApiError::InvalidRequest {
1470 provider: self.name().to_string(),
1471 details: "No messages provided".to_string(),
1472 });
1473 }
1474
1475 let last_message = claude_messages
1476 .last_mut()
1477 .ok_or_else(|| ApiError::InvalidRequest {
1478 provider: self.name().to_string(),
1479 details: "No messages provided".to_string(),
1480 })?;
1481 let cache_setting = Some(CacheControl {
1482 cache_type: "ephemeral".to_string(),
1483 });
1484
1485 let instruction_policy = match &self.auth {
1486 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1487 AuthMode::ApiKey(_) => None,
1488 };
1489 let system_text = apply_instruction_policy(system, instruction_policy);
1490 let system_content = build_system_content(system_text, cache_setting.clone());
1491
1492 match &mut last_message.content {
1493 ClaudeMessageContent::StructuredContent { content } => {
1494 for block in &mut content.0 {
1495 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1496 cache_control.clone_from(&cache_setting);
1497 }
1498 }
1499 }
1500 ClaudeMessageContent::Text { content } => {
1501 let text_content = content.clone();
1502 last_message.content = ClaudeMessageContent::StructuredContent {
1503 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1504 text: text_content,
1505 cache_control: cache_setting,
1506 extra: Default::default(),
1507 }]),
1508 };
1509 }
1510 }
1511
1512 let supports_thinking = call_options
1513 .as_ref()
1514 .and_then(|opts| opts.thinking_config.as_ref())
1515 .is_some_and(|tc| tc.enabled);
1516
1517 let request = if supports_thinking {
1518 let budget = call_options
1519 .as_ref()
1520 .and_then(|o| o.thinking_config)
1521 .and_then(|tc| tc.budget_tokens)
1522 .unwrap_or(4000);
1523 let thinking = Some(Thinking {
1524 thinking_type: ThinkingType::Enabled,
1525 budget_tokens: budget,
1526 });
1527 CompletionRequest {
1528 model: model_id.id.clone(),
1529 messages: claude_messages,
1530 max_tokens: call_options
1531 .as_ref()
1532 .and_then(|o| o.max_tokens)
1533 .map_or(32_000, |v| v as usize),
1534 system: system_content.clone(),
1535 tools,
1536 temperature: call_options
1537 .as_ref()
1538 .and_then(|o| o.temperature)
1539 .or(Some(1.0)),
1540 top_p: call_options.as_ref().and_then(|o| o.top_p),
1541 top_k: None,
1542 stream: Some(true),
1543 thinking,
1544 }
1545 } else {
1546 CompletionRequest {
1547 model: model_id.id.clone(),
1548 messages: claude_messages,
1549 max_tokens: call_options
1550 .as_ref()
1551 .and_then(|o| o.max_tokens)
1552 .map_or(8000, |v| v as usize),
1553 system: system_content,
1554 tools,
1555 temperature: call_options
1556 .as_ref()
1557 .and_then(|o| o.temperature)
1558 .or(Some(0.7)),
1559 top_p: call_options.as_ref().and_then(|o| o.top_p),
1560 top_k: None,
1561 stream: Some(true),
1562 thinking: None,
1563 }
1564 };
1565
1566 let auth_ctx = auth_header_context(model_id, RequestKind::Stream);
1567 let mut attempts = 0;
1568
1569 loop {
1570 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1571 let url = self.request_url()?;
1572 let mut request_builder = self.http_client.post(&url).json(&request);
1573
1574 for (name, value) in auth_headers {
1575 request_builder = request_builder.header(&name, &value);
1576 }
1577
1578 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1579 request_builder =
1580 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1581 }
1582
1583 let response = tokio::select! {
1584 biased;
1585 () = token.cancelled() => {
1586 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1587 }
1588 res = request_builder.send() => {
1589 res?
1590 }
1591 };
1592
1593 let status = response.status();
1594 if !status.is_success() {
1595 let error_text = tokio::select! {
1596 biased;
1597 () = token.cancelled() => {
1598 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1599 }
1600 text_res = response.text() => {
1601 text_res?
1602 }
1603 };
1604
1605 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1606 let action = self
1607 .on_auth_error(status.as_u16(), &error_text, RequestKind::Stream)
1608 .await?;
1609 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1610 attempts += 1;
1611 continue;
1612 }
1613 return Err(ApiError::AuthenticationFailed {
1614 provider: self.name().to_string(),
1615 details: error_text,
1616 });
1617 }
1618
1619 return Err(match status.as_u16() {
1620 401 | 403 => ApiError::AuthenticationFailed {
1621 provider: self.name().to_string(),
1622 details: error_text,
1623 },
1624 429 => ApiError::RateLimited {
1625 provider: self.name().to_string(),
1626 details: error_text,
1627 },
1628 400..=499 => ApiError::InvalidRequest {
1629 provider: self.name().to_string(),
1630 details: error_text,
1631 },
1632 500..=599 => ApiError::ServerError {
1633 provider: self.name().to_string(),
1634 status_code: status.as_u16(),
1635 details: error_text,
1636 },
1637 _ => ApiError::Unknown {
1638 provider: self.name().to_string(),
1639 details: error_text,
1640 },
1641 });
1642 }
1643
1644 let byte_stream = response.bytes_stream();
1645 let sse_stream = parse_sse_stream(byte_stream);
1646
1647 let stream = convert_claude_stream(sse_stream, token);
1648
1649 return Ok(Box::pin(stream));
1650 }
1651 }
1652}
1653
1654fn auth_header_context(model_id: &ModelId, request_kind: RequestKind) -> AuthHeaderContext {
1655 AuthHeaderContext {
1656 model_id: Some(AuthModelId {
1657 provider_id: AuthProviderId(model_id.provider.as_str().to_string()),
1658 model_id: model_id.id.clone(),
1659 }),
1660 request_kind,
1661 }
1662}
1663
1664fn is_auth_status(status: reqwest::StatusCode) -> bool {
1665 matches!(
1666 status,
1667 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
1668 )
1669}
1670
/// Returns at most the first 512 characters of `body`, appending `...` when
/// anything was cut off.
///
/// The limit counts `char`s, not bytes, so multi-byte text is never split in
/// the middle of a character.
fn truncate_body(body: &str) -> String {
    const LIMIT: usize = 512;

    // The byte offset of the character just past the limit, if there is one,
    // is exactly where the kept prefix ends.
    match body.char_indices().nth(LIMIT) {
        Some((cut, _)) => format!("{}...", &body[..cut]),
        None => body.to_string(),
    }
}
1681
1682fn apply_instruction_policy(
1683 system: Option<SystemContext>,
1684 policy: Option<&InstructionPolicy>,
1685) -> Option<String> {
1686 let base = system.as_ref().and_then(|context| {
1687 let trimmed = context.prompt.trim();
1688 if trimmed.is_empty() {
1689 None
1690 } else {
1691 Some(trimmed.to_string())
1692 }
1693 });
1694
1695 let context = system
1696 .as_ref()
1697 .and_then(|context| context.render_with_prompt(base.clone()));
1698
1699 match policy {
1700 None => context,
1701 Some(InstructionPolicy::Prefix(prefix)) => {
1702 if let Some(context) = context {
1703 Some(format!("{prefix}\n{context}"))
1704 } else {
1705 Some(prefix.clone())
1706 }
1707 }
1708 Some(InstructionPolicy::DefaultIfEmpty(default)) => {
1709 if context.is_some() {
1710 context
1711 } else {
1712 Some(default.clone())
1713 }
1714 }
1715 Some(InstructionPolicy::Override(override_text)) => {
1716 let mut combined = override_text.clone();
1717 if let Some(system) = system.as_ref() {
1718 let overlay = system.prompt.trim();
1719 if !overlay.is_empty() {
1720 combined.push_str("\n\n## Operating Mode\n");
1721 combined.push_str(overlay);
1722 }
1723
1724 let env = system
1725 .environment
1726 .as_ref()
1727 .map(|env| env.as_context())
1728 .map(|value| value.trim().to_string())
1729 .filter(|value| !value.is_empty());
1730 if let Some(env) = env {
1731 combined.push_str("\n\n");
1732 combined.push_str(&env);
1733 }
1734 }
1735 Some(combined)
1736 }
1737 }
1738}
1739
1740fn build_system_content(
1741 system: Option<String>,
1742 cache_setting: Option<CacheControl>,
1743) -> Option<System> {
1744 system.map(|text| {
1745 System::Content(vec![SystemContentBlock {
1746 content_type: "text".to_string(),
1747 text,
1748 cache_control: cache_setting,
1749 }])
1750 })
1751}
1752
/// Accumulated state of one content block while a streamed response is being
/// received.
#[derive(Debug)]
enum BlockState {
    /// Text collected so far for a text block.
    Text { text: String },
    /// Thinking text collected so far, plus the signature once it has arrived.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// A tool call whose input arrives as JSON fragments appended to `input`.
    ToolUse {
        id: String,
        name: String,
        input: String,
    },
    /// A block whose type is not handled; its content is discarded.
    Unknown,
}
1769
1770fn block_state_to_content(state: BlockState) -> Option<AssistantContent> {
1771 match state {
1772 BlockState::Text { text } => {
1773 if text.is_empty() {
1774 None
1775 } else {
1776 Some(AssistantContent::Text { text })
1777 }
1778 }
1779 BlockState::Thinking { text, signature } => {
1780 if text.is_empty() {
1781 None
1782 } else {
1783 let thought = if let Some(sig) = signature {
1784 ThoughtContent::Signed {
1785 text,
1786 signature: sig,
1787 }
1788 } else {
1789 ThoughtContent::Simple { text }
1790 };
1791 Some(AssistantContent::Thought { thought })
1792 }
1793 }
1794 BlockState::ToolUse { id, name, input } => {
1795 if id.is_empty() || name.is_empty() {
1796 None
1797 } else {
1798 let parameters: serde_json::Value = serde_json::from_str(&input)
1799 .unwrap_or(serde_json::Value::Object(Default::default()));
1800 Some(AssistantContent::ToolCall {
1801 tool_call: ToolCall {
1802 id,
1803 name,
1804 parameters,
1805 },
1806 thought_signature: None,
1807 })
1808 }
1809 }
1810 BlockState::Unknown => None,
1811 }
1812}
1813
1814fn convert_claude_stream(
1815 sse_stream: crate::api::sse::SseStream,
1816 token: CancellationToken,
1817) -> impl futures_core::Stream<Item = StreamChunk> + Send {
1818 async_stream::stream! {
1819 let mut block_states: std::collections::HashMap<usize, BlockState> =
1820 std::collections::HashMap::new();
1821 let mut completed_content: Vec<AssistantContent> = Vec::new();
1822
1823 tokio::pin!(sse_stream);
1824
1825 while let Some(event_result) = sse_stream.next().await {
1826 if token.is_cancelled() {
1827 yield StreamChunk::Error(StreamError::Cancelled);
1828 break;
1829 }
1830
1831 let event = match event_result {
1832 Ok(e) => e,
1833 Err(e) => {
1834 yield StreamChunk::Error(StreamError::SseParse(e));
1835 break;
1836 }
1837 };
1838
1839 let parsed: Result<ClaudeStreamEvent, _> = serde_json::from_str(&event.data);
1840 let stream_event = match parsed {
1841 Ok(e) => e,
1842 Err(_) => continue,
1843 };
1844
1845 match stream_event {
1846 ClaudeStreamEvent::ContentBlockStart { index, content_block } => {
1847 match content_block.block_type.as_str() {
1848 "text" => {
1849 let text = content_block.text.unwrap_or_default();
1850 if !text.is_empty() {
1851 yield StreamChunk::TextDelta(text.clone());
1852 }
1853 block_states.insert(index, BlockState::Text { text });
1854 }
1855 "thinking" => {
1856 block_states.insert(
1857 index,
1858 BlockState::Thinking {
1859 text: String::new(),
1860 signature: None,
1861 },
1862 );
1863 }
1864 "tool_use" => {
1865 let id = content_block.id.unwrap_or_default();
1866 let name = content_block.name.unwrap_or_default();
1867 if !id.is_empty() && !name.is_empty() {
1868 yield StreamChunk::ToolUseStart {
1869 id: id.clone(),
1870 name: name.clone(),
1871 };
1872 }
1873 block_states.insert(
1874 index,
1875 BlockState::ToolUse {
1876 id,
1877 name,
1878 input: String::new(),
1879 },
1880 );
1881 }
1882 _ => {
1883 block_states.insert(index, BlockState::Unknown);
1884 }
1885 }
1886 }
1887 ClaudeStreamEvent::ContentBlockDelta { index, delta } => match delta {
1888 ClaudeDelta::Text { text } => {
1889 match block_states.get_mut(&index) {
1890 Some(BlockState::Text { text: buf }) => buf.push_str(&text),
1891 _ => {
1892 block_states.insert(index, BlockState::Text { text: text.clone() });
1893 }
1894 }
1895 yield StreamChunk::TextDelta(text);
1896 }
1897 ClaudeDelta::Thinking { thinking } => {
1898 match block_states.get_mut(&index) {
1899 Some(BlockState::Thinking { text, .. }) => text.push_str(&thinking),
1900 _ => {
1901 block_states.insert(
1902 index,
1903 BlockState::Thinking {
1904 text: thinking.clone(),
1905 signature: None,
1906 },
1907 );
1908 }
1909 }
1910 yield StreamChunk::ThinkingDelta(thinking);
1911 }
1912 ClaudeDelta::Signature { signature } => {
1913 if let Some(BlockState::Thinking { signature: sig, .. }) =
1914 block_states.get_mut(&index)
1915 {
1916 *sig = Some(signature);
1917 }
1918 }
1919 ClaudeDelta::InputJson { partial_json } => {
1920 if let Some(BlockState::ToolUse { id, input, .. }) =
1921 block_states.get_mut(&index)
1922 {
1923 input.push_str(&partial_json);
1924 if !id.is_empty() {
1925 yield StreamChunk::ToolUseInputDelta {
1926 id: id.clone(),
1927 delta: partial_json,
1928 };
1929 }
1930 }
1931 }
1932 },
1933 ClaudeStreamEvent::ContentBlockStop { index } => {
1934 if let Some(state) = block_states.remove(&index)
1935 && let Some(content) = block_state_to_content(state)
1936 {
1937 completed_content.push(content);
1938 }
1939 yield StreamChunk::ContentBlockStop { index };
1940 }
1941 ClaudeStreamEvent::MessageStop => {
1942 if !block_states.is_empty() {
1943 tracing::warn!(
1944 target: "anthropic::stream",
1945 "MessageStop received with {} unfinished content blocks",
1946 block_states.len()
1947 );
1948 }
1949 let content = std::mem::take(&mut completed_content);
1950 yield StreamChunk::MessageComplete(CompletionResponse { content });
1951 break;
1952 }
1953 ClaudeStreamEvent::Error { error } => {
1954 yield StreamChunk::Error(StreamError::Provider {
1955 provider: "anthropic".into(),
1956 error_type: error.error_type,
1957 message: error.message,
1958 });
1959 }
1960 ClaudeStreamEvent::MessageStart { .. }
1961 | ClaudeStreamEvent::MessageDelta { .. }
1962 | ClaudeStreamEvent::Ping => {}
1963 }
1964 }
1965 }
1966}