1use async_trait::async_trait;
2use futures_util::StreamExt;
3use reqwest::{self, header};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use strum_macros::Display;
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, warn};
9
10use crate::api::error::StreamError;
11use crate::api::provider::{CompletionStream, StreamChunk, TokenUsage};
12use crate::api::sse::parse_sse_stream;
13use crate::api::{CompletionResponse, Provider, error::ApiError};
14use crate::app::SystemContext;
15use crate::app::conversation::{
16 AssistantContent, ImageSource, Message as AppMessage, ThoughtContent, ToolResult, UserContent,
17};
18use crate::auth::{
19 AnthropicAuth, AuthErrorAction, AuthErrorContext, AuthHeaderContext, InstructionPolicy,
20 RequestKind,
21};
22use crate::auth::{ModelId as AuthModelId, ProviderId as AuthProviderId};
23use crate::config::model::{ModelId, ModelParameters};
24use steer_tools::{InputSchema, ToolCall, ToolSchema};
25
/// Anthropic Messages API endpoint; `AnthropicClient::request_url` returns it, optionally
/// with directive-supplied query parameters appended.
const API_URL: &str = "https://api.anthropic.com/v1/messages";
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display)]
29pub enum ClaudeMessageRole {
30 #[serde(rename = "user")]
31 #[strum(serialize = "user")]
32 User,
33 #[serde(rename = "assistant")]
34 #[strum(serialize = "assistant")]
35 Assistant,
36 #[serde(rename = "tool")]
37 #[strum(serialize = "tool")]
38 Tool,
39}
40
/// One entry of the request's `messages` array.
///
/// `content` is flattened, so it serializes as a `content` key sitting next to `role`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ClaudeMessage {
    /// Who authored the message.
    pub role: ClaudeMessageRole,
    /// Message body: a plain string or a list of content blocks.
    #[serde(flatten)]
    pub content: ClaudeMessageContent,
    /// Rust-side identifier only; `skip_serializing` keeps it out of the request JSON.
    #[serde(skip_serializing)]
    pub id: Option<String>,
}
50
/// Body of a [`ClaudeMessage`].
///
/// `untagged`: `Text` serializes as `"content": "<string>"` and `StructuredContent` as
/// `"content": [<blocks>]`; deserialization tries the variants in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum ClaudeMessageContent {
    /// Plain-text message body.
    Text { content: String },
    /// Message body made of typed content blocks.
    StructuredContent { content: ClaudeStructuredContent },
}
60
/// List of content blocks; `transparent`, so it serializes as the bare JSON array.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct ClaudeStructuredContent(pub Vec<ClaudeContentBlock>);
65
/// How an [`AnthropicClient`] authenticates its requests.
#[derive(Clone)]
enum AuthMode {
    /// Static key sent in the `x-api-key` header.
    ApiKey(String),
    /// Headers (and optional URL query parameters) supplied by an auth directive.
    Directive(AnthropicAuth),
}
71
/// Client for the Anthropic Messages API.
///
/// `Debug` is deliberately not derived here: `auth` can hold a raw API key. Keep it that
/// way unless the key is redacted.
#[derive(Clone)]
pub struct AnthropicClient {
    /// Shared HTTP client preconfigured with the API version and JSON content-type headers.
    http_client: reqwest::Client,
    /// Credentials used to build per-request auth headers.
    auth: AuthMode,
}
77
78#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
79#[serde(rename_all = "lowercase")]
80#[derive(Default)]
81enum ThinkingType {
82 #[default]
83 Enabled,
84}
85
/// Extended-thinking configuration sent as the request's `thinking` object.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Thinking {
    /// Serialized as `type`; falls back to `ThinkingType::default()` (`"enabled"`) when absent.
    #[serde(rename = "type", default)]
    thinking_type: ThinkingType,
    /// Token budget sent as `budget_tokens`.
    budget_tokens: u32,
}
92
/// `source` object of an `image` content block.
///
/// This adapter only builds `type: "base64"` sources (see `user_image_to_claude_block`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ClaudeImageSource {
    /// Source kind, serialized as `type`.
    #[serde(rename = "type")]
    source_type: String,
    /// MIME type of the image, e.g. `image/png`.
    media_type: String,
    /// Encoded image payload.
    data: String,
}
100
/// One block of the top-level `system` prompt.
#[derive(Debug, Serialize, Clone)]
struct SystemContentBlock {
    /// Block kind, serialized as `type`.
    #[serde(rename = "type")]
    content_type: String,
    /// Prompt text.
    text: String,
    /// Optional prompt-cache marker; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    cache_control: Option<CacheControl>,
}
109
/// Top-level `system` request field.
///
/// `untagged`, so `Content` serializes as a bare array of [`SystemContentBlock`]s.
#[derive(Debug, Serialize, Clone)]
#[serde(untagged)]
enum System {
    Content(Vec<SystemContentBlock>),
}
116
/// JSON body POSTed to the Messages endpoint.
///
/// Every `Option` field is omitted from the JSON when `None`.
#[derive(Debug, Serialize)]
struct CompletionRequest {
    /// Model identifier.
    model: String,
    /// Conversation history.
    messages: Vec<ClaudeMessage>,
    /// Upper bound on generated tokens.
    max_tokens: usize,
    /// Optional system prompt blocks.
    #[serde(skip_serializing_if = "Option::is_none")]
    system: Option<System>,
    /// Optional tool definitions.
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<ClaudeTool>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    top_p: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    top_k: Option<usize>,
    /// `Some(true)` requests a server-sent-event stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    stream: Option<bool>,
    /// Optional extended-thinking configuration.
    #[serde(skip_serializing_if = "Option::is_none")]
    thinking: Option<Thinking>,
}
137
/// Tool definition sent in the request's `tools` array.
#[derive(Debug, Serialize, Clone)]
struct ClaudeTool {
    name: String,
    description: String,
    /// JSON Schema of the tool input, already sanitized for Claude when built via `From`.
    input_schema: InputSchema,
}
144
145impl From<ToolSchema> for ClaudeTool {
146 fn from(tool: ToolSchema) -> Self {
147 let tool = adapt_tool_schema_for_claude(tool);
148 Self {
149 name: tool.name,
150 description: tool.description,
151 input_schema: tool.input_schema,
152 }
153 }
154}
155
/// Body of a non-streaming Messages API response.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ClaudeCompletionResponse {
    id: String,
    /// Generated content blocks.
    content: Vec<ClaudeContentBlock>,
    model: String,
    role: String,
    #[serde(default)]
    stop_reason: Option<String>,
    #[serde(default)]
    stop_sequence: Option<String>,
    /// Token usage; all-zero default when the field is missing.
    #[serde(default)]
    usage: ClaudeUsage,
    /// Any other top-level fields, kept so they are not lost.
    #[serde(flatten)]
    extra: std::collections::HashMap<String, serde_json::Value>,
}
172
173fn adapt_tool_schema_for_claude(tool: ToolSchema) -> ToolSchema {
174 let root_schema = tool.input_schema.as_value();
175 let sanitized = sanitize_for_claude(root_schema, root_schema);
176 ToolSchema {
177 input_schema: InputSchema::new(sanitized),
178 ..tool
179 }
180}
181
/// Unescapes one JSON Pointer reference token (RFC 6901).
///
/// `~1` becomes `/` and `~0` becomes `~`. `~1` is decoded first, so `~01` yields `~1`
/// rather than `/`. Tokens without `~` are returned borrowed, with no allocation.
fn decode_pointer_segment(segment: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if segment.contains('~') {
        let unescaped = segment.replace("~1", "/").replace("~0", "~");
        Cow::Owned(unescaped)
    } else {
        Cow::Borrowed(segment)
    }
}
188
189fn resolve_ref<'a>(root: &'a Value, reference: &str) -> Option<&'a Value> {
190 let path = reference.strip_prefix("#/")?;
191 let mut current = root;
192 for segment in path.split('/') {
193 let decoded = decode_pointer_segment(segment);
194 current = current.get(decoded.as_ref())?;
195 }
196 Some(current)
197}
198
199fn infer_type_from_enum(values: &[Value]) -> Option<String> {
200 let mut has_string = false;
201 let mut has_number = false;
202 let mut has_bool = false;
203 let mut has_object = false;
204 let mut has_array = false;
205
206 for value in values {
207 match value {
208 Value::String(_) => has_string = true,
209 Value::Number(_) => has_number = true,
210 Value::Bool(_) => has_bool = true,
211 Value::Object(_) => has_object = true,
212 Value::Array(_) => has_array = true,
213 Value::Null => {}
214 }
215 }
216
217 let kind_count = u8::from(has_string)
218 + u8::from(has_number)
219 + u8::from(has_bool)
220 + u8::from(has_object)
221 + u8::from(has_array);
222
223 if kind_count != 1 {
224 return None;
225 }
226
227 if has_string {
228 Some("string".to_string())
229 } else if has_number {
230 Some("number".to_string())
231 } else if has_bool {
232 Some("boolean".to_string())
233 } else if has_object {
234 Some("object".to_string())
235 } else if has_array {
236 Some("array".to_string())
237 } else {
238 None
239 }
240}
241
242fn normalize_type(value: &Value) -> Value {
243 if let Some(type_str) = value.as_str() {
244 return Value::String(type_str.to_string());
245 }
246
247 if let Some(type_array) = value.as_array()
248 && let Some(primary_type) = type_array
249 .iter()
250 .find_map(|v| if v.is_null() { None } else { v.as_str() })
251 {
252 return Value::String(primary_type.to_string());
253 }
254
255 Value::String("string".to_string())
256}
257
258fn extract_enum_values(value: &Value) -> Vec<Value> {
259 let Some(obj) = value.as_object() else {
260 return Vec::new();
261 };
262
263 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
264 return enum_values
265 .iter()
266 .filter(|v| !v.is_null())
267 .cloned()
268 .collect();
269 }
270
271 if let Some(const_value) = obj.get("const") {
272 if const_value.is_null() {
273 return Vec::new();
274 }
275 return vec![const_value.clone()];
276 }
277
278 Vec::new()
279}
280
281fn merge_property(properties: &mut serde_json::Map<String, Value>, key: &str, value: &Value) {
282 match properties.get_mut(key) {
283 None => {
284 properties.insert(key.to_string(), value.clone());
285 }
286 Some(existing) => {
287 if existing == value {
288 return;
289 }
290
291 let existing_values = extract_enum_values(existing);
292 let incoming_values = extract_enum_values(value);
293 if incoming_values.is_empty() && existing_values.is_empty() {
294 return;
295 }
296
297 let mut combined = existing_values;
298 for item in incoming_values {
299 if !combined.contains(&item) {
300 combined.push(item);
301 }
302 }
303
304 if combined.is_empty() {
305 return;
306 }
307
308 if let Some(obj) = existing.as_object_mut() {
309 obj.remove("const");
310 obj.insert("enum".to_string(), Value::Array(combined.clone()));
311 if !obj.contains_key("type")
312 && let Some(inferred) = infer_type_from_enum(&combined)
313 {
314 obj.insert("type".to_string(), Value::String(inferred));
315 }
316 }
317 }
318 }
319}
320
321fn merge_union_schemas(
322 root: &Value,
323 variants: &[Value],
324 seen_refs: &mut std::collections::HashSet<String>,
325) -> Value {
326 let mut merged_props = serde_json::Map::new();
327 let mut required_intersection: Option<std::collections::BTreeSet<String>> = None;
328 let mut enum_values: Vec<Value> = Vec::new();
329 let mut type_candidates: Vec<String> = Vec::new();
330
331 for variant in variants {
332 let sanitized = sanitize_for_claude_inner(root, variant, seen_refs);
333
334 if let Some(schema_type) = sanitized.get("type").and_then(|v| v.as_str()) {
335 type_candidates.push(schema_type.to_string());
336 }
337
338 if let Some(props) = sanitized.get("properties").and_then(|v| v.as_object()) {
339 for (key, value) in props {
340 merge_property(&mut merged_props, key, value);
341 }
342 }
343
344 if let Some(req) = sanitized.get("required").and_then(|v| v.as_array()) {
345 let req_set: std::collections::BTreeSet<String> = req
346 .iter()
347 .filter_map(|item| item.as_str().map(|s| s.to_string()))
348 .collect();
349
350 required_intersection = match required_intersection.take() {
351 None => Some(req_set),
352 Some(existing) => Some(
353 existing
354 .intersection(&req_set)
355 .cloned()
356 .collect::<std::collections::BTreeSet<String>>(),
357 ),
358 };
359 }
360
361 if let Some(values) = sanitized.get("enum").and_then(|v| v.as_array()) {
362 for value in values {
363 if value.is_null() {
364 continue;
365 }
366 if !enum_values.contains(value) {
367 enum_values.push(value.clone());
368 }
369 }
370 }
371 }
372
373 let schema_type = if !merged_props.is_empty() {
374 "object".to_string()
375 } else if let Some(inferred) = infer_type_from_enum(&enum_values) {
376 inferred
377 } else if let Some(first) = type_candidates.first() {
378 first.clone()
379 } else {
380 "string".to_string()
381 };
382
383 let mut merged = serde_json::Map::new();
384 merged.insert("type".to_string(), Value::String(schema_type));
385
386 if !merged_props.is_empty() {
387 merged.insert("properties".to_string(), Value::Object(merged_props));
388 }
389
390 if let Some(required_set) = required_intersection
391 && !required_set.is_empty()
392 {
393 merged.insert(
394 "required".to_string(),
395 Value::Array(
396 required_set
397 .into_iter()
398 .map(Value::String)
399 .collect::<Vec<_>>(),
400 ),
401 );
402 }
403
404 if !enum_values.is_empty() {
405 merged.insert("enum".to_string(), Value::Array(enum_values));
406 }
407
408 Value::Object(merged)
409}
410
411fn sanitize_for_claude(root: &Value, schema: &Value) -> Value {
412 let mut seen_refs = std::collections::HashSet::new();
413 sanitize_for_claude_inner(root, schema, &mut seen_refs)
414}
415
416fn fallback_schema() -> Value {
417 let mut out = serde_json::Map::new();
418 out.insert("type".to_string(), Value::String("object".to_string()));
419 out.insert(
420 "properties".to_string(),
421 Value::Object(serde_json::Map::new()),
422 );
423 Value::Object(out)
424}
425
426fn sanitize_for_claude_inner(
427 root: &Value,
428 schema: &Value,
429 seen_refs: &mut std::collections::HashSet<String>,
430) -> Value {
431 if let Some(reference) = schema.get("$ref").and_then(|v| v.as_str()) {
432 if !seen_refs.insert(reference.to_string()) {
433 return fallback_schema();
434 }
435 if let Some(resolved) = resolve_ref(root, reference) {
436 let sanitized = sanitize_for_claude_inner(root, resolved, seen_refs);
437 seen_refs.remove(reference);
438 return sanitized;
439 }
440 seen_refs.remove(reference);
441 }
442
443 let Some(obj) = schema.as_object() else {
444 return schema.clone();
445 };
446
447 if let Some(union) = obj
448 .get("oneOf")
449 .or_else(|| obj.get("anyOf"))
450 .or_else(|| obj.get("allOf"))
451 .and_then(|v| v.as_array())
452 {
453 return merge_union_schemas(root, union, seen_refs);
454 }
455
456 let mut out = serde_json::Map::new();
457 for (key, value) in obj {
458 match key.as_str() {
459 "$ref"
460 | "$defs"
461 | "oneOf"
462 | "anyOf"
463 | "allOf"
464 | "const"
465 | "additionalProperties"
466 | "default"
467 | "examples"
468 | "title"
469 | "pattern"
470 | "minLength"
471 | "maxLength"
472 | "minimum"
473 | "maximum"
474 | "minItems"
475 | "maxItems"
476 | "uniqueItems"
477 | "deprecated" => {}
478 "type" => {
479 out.insert("type".to_string(), normalize_type(value));
480 }
481 "properties" => {
482 if let Some(props) = value.as_object() {
483 let mut sanitized_props = serde_json::Map::new();
484 for (prop_key, prop_value) in props {
485 sanitized_props.insert(
486 prop_key.clone(),
487 sanitize_for_claude_inner(root, prop_value, seen_refs),
488 );
489 }
490 out.insert("properties".to_string(), Value::Object(sanitized_props));
491 }
492 }
493 "items" => {
494 if let Some(items) = value.as_array() {
495 let merged = merge_union_schemas(root, items, seen_refs);
496 out.insert("items".to_string(), merged);
497 } else {
498 out.insert(
499 "items".to_string(),
500 sanitize_for_claude_inner(root, value, seen_refs),
501 );
502 }
503 }
504 "enum" => {
505 let values = value
506 .as_array()
507 .map(|items| {
508 items
509 .iter()
510 .filter(|v| !v.is_null())
511 .cloned()
512 .collect::<Vec<_>>()
513 })
514 .unwrap_or_default();
515 out.insert("enum".to_string(), Value::Array(values));
516 }
517 _ => {
518 out.insert(
519 key.clone(),
520 sanitize_for_claude_inner(root, value, seen_refs),
521 );
522 }
523 }
524 }
525
526 if let Some(const_value) = obj.get("const")
527 && !const_value.is_null()
528 {
529 out.insert("enum".to_string(), Value::Array(vec![const_value.clone()]));
530 if !out.contains_key("type")
531 && let Some(inferred) = infer_type_from_enum(std::slice::from_ref(const_value))
532 {
533 out.insert("type".to_string(), Value::String(inferred));
534 }
535 }
536
537 if out.get("type") == Some(&Value::String("object".to_string()))
538 && !out.contains_key("properties")
539 {
540 out.insert(
541 "properties".to_string(),
542 Value::Object(serde_json::Map::new()),
543 );
544 }
545
546 if !out.contains_key("type") {
547 if out.contains_key("properties") {
548 out.insert("type".to_string(), Value::String("object".to_string()));
549 } else if out.contains_key("items") {
550 out.insert("type".to_string(), Value::String("array".to_string()));
551 } else if let Some(enum_values) = out.get("enum").and_then(|v| v.as_array())
552 && let Some(inferred) = infer_type_from_enum(enum_values)
553 {
554 out.insert("type".to_string(), Value::String(inferred));
555 }
556 }
557
558 Value::Object(out)
559}
560
/// Serde default for [`CacheControl`]'s `type` field: `"ephemeral"`.
fn default_cache_type() -> String {
    String::from("ephemeral")
}
564
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::error::SseParseError;
    use crate::api::sse::{SseEvent, SseStream};
    use futures_util::{StreamExt, stream};
    use serde_json::json;

    #[test]
    fn sanitize_handles_recursive_ref() {
        let schema = json!({
            "$defs": {
                "node": {
                    "type": "object",
                    "properties": {
                        "next": { "$ref": "#/$defs/node" }
                    }
                }
            },
            "$ref": "#/$defs/node"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let next = sanitized
            .get("properties")
            .and_then(|v| v.get("next"))
            .and_then(|v| v.get("type"))
            .and_then(|v| v.as_str());

        assert_eq!(next, Some("object"));
    }

    #[test]
    fn sanitize_collapses_tuple_items() {
        let schema = json!({
            "type": "array",
            "items": [
                { "type": "string" },
                { "type": "number" }
            ]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let items = sanitized.get("items");

        assert!(matches!(items, Some(Value::Object(_))));
    }

    #[test]
    fn sanitize_removes_unsupported_keywords() {
        let schema = json!({
            "type": "object",
            "title": "ignored",
            "additionalProperties": false,
            "properties": {
                "name": {
                    "type": "string",
                    "pattern": "^[a-z]+$",
                    "default": "x"
                }
            }
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" }
            }
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_converts_const_to_enum_with_type() {
        let schema = json!({
            "const": "fixed"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "enum": ["fixed"],
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_filters_null_enum_values() {
        let schema = json!({
            "enum": ["a", null, "b"]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "enum": ["a", "b"],
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_decodes_json_pointer_refs() {
        let schema = json!({
            "$defs": {
                "a/b": { "type": "string" }
            },
            "$ref": "#/$defs/a~1b"
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "string"
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_merges_union_properties_and_required() {
        let schema = json!({
            "oneOf": [
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "b": { "type": "string" }
                    },
                    "required": ["a"]
                },
                {
                    "type": "object",
                    "properties": {
                        "a": { "type": "string" },
                        "c": { "type": "string" }
                    },
                    "required": ["a", "c"]
                }
            ]
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "object",
            "properties": {
                "a": { "type": "string" },
                "b": { "type": "string" },
                "c": { "type": "string" }
            },
            "required": ["a"]
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn sanitize_infers_array_type_from_items() {
        let schema = json!({
            "items": {
                "type": "string"
            }
        });

        let sanitized = sanitize_for_claude(&schema, &schema);
        let expected = json!({
            "type": "array",
            "items": { "type": "string" }
        });

        assert_eq!(sanitized, expected);
    }

    #[test]
    fn user_image_data_url_to_claude_block() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::DataUrl {
                // "Zm9v" is base64 for "foo".
                data_url: "data:image/png;base64,Zm9v".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let block = user_image_to_claude_block(&image).expect("image should convert");
        let json = serde_json::to_value(block).expect("serialize block");
        assert_eq!(json["type"], "image");
        assert_eq!(json["source"]["type"], "base64");
        assert_eq!(json["source"]["media_type"], "image/png");
        assert_eq!(json["source"]["data"], "Zm9v");
    }

    #[test]
    fn user_image_data_url_without_separator_is_invalid() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::DataUrl {
                data_url: "data:image/png;base64".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let err = user_image_to_claude_block(&image).expect_err("expected invalid request");
        match err {
            ApiError::InvalidRequest { provider, .. } => assert_eq!(provider, "anthropic"),
            other => panic!("Expected InvalidRequest, got {other:?}"),
        }
    }

    #[test]
    fn user_image_session_file_source_is_unsupported() {
        let image = crate::app::conversation::ImageContent {
            mime_type: "image/png".to_string(),
            source: crate::app::conversation::ImageSource::SessionFile {
                relative_path: "session-1/image.png".to_string(),
            },
            width: None,
            height: None,
            bytes: None,
            sha256: None,
        };

        let err = user_image_to_claude_block(&image).expect_err("expected unsupported feature");
        match err {
            ApiError::UnsupportedFeature {
                provider,
                feature,
                details,
            } => {
                assert_eq!(provider, "anthropic");
                assert_eq!(feature, "image input source");
                assert!(details.contains("session file"));
            }
            other => panic!("Expected UnsupportedFeature, got {other:?}"),
        }
    }

    #[test]
    fn map_claude_usage_computes_total_when_missing() {
        let usage = ClaudeUsage {
            input: 11,
            output: 7,
            total: None,
            cache_creation_input: None,
            cache_read_input: None,
        };

        assert_eq!(map_claude_usage(&usage), TokenUsage::new(11, 7, 18));
    }

    #[test]
    fn convert_claude_completion_includes_usage() {
        let claude_completion = ClaudeCompletionResponse {
            id: "msg_1".to_string(),
            content: vec![ClaudeContentBlock::Text {
                text: "hi".to_string(),
                cache_control: None,
                extra: Default::default(),
            }],
            model: "claude-test".to_string(),
            role: "assistant".to_string(),
            stop_reason: Some("end_turn".to_string()),
            stop_sequence: None,
            usage: ClaudeUsage {
                input: 12,
                output: 4,
                total: Some(22),
                cache_creation_input: None,
                cache_read_input: None,
            },
            extra: Default::default(),
        };

        let response = convert_claude_completion(claude_completion);

        assert_eq!(response.usage, Some(TokenUsage::new(12, 4, 22)));
        assert!(matches!(
            response.content.first(),
            Some(AssistantContent::Text { text }) if text == "hi"
        ));
    }

    #[tokio::test]
    async fn convert_claude_stream_attaches_latest_usage_on_message_stop() {
        let events = vec![
            Ok::<SseEvent, SseParseError>(SseEvent {
                event_type: None,
                data: r#"{"type":"content_block_start","index":0,"content_block":{"type":"text","text":"Hello"}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"content_block_stop","index":0}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"message_delta","delta":{"stop_reason":null},"usage":{"input_tokens":9,"output_tokens":3}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":10,"output_tokens":5}}"#.to_string(),
                id: None,
            }),
            Ok(SseEvent {
                event_type: None,
                data: r#"{"type":"message_stop"}"#.to_string(),
                id: None,
            }),
        ];

        let sse_stream: SseStream = Box::pin(stream::iter(events));
        let token = CancellationToken::new();
        let mut stream = std::pin::pin!(convert_claude_stream(sse_stream, token));

        let mut full_text = String::new();
        let mut completion = None;

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::TextDelta(delta) => full_text.push_str(&delta),
                StreamChunk::ContentBlockStop { .. } => {}
                StreamChunk::MessageComplete(response) => {
                    completion = Some(response);
                    break;
                }
                unexpected => panic!("unexpected chunk: {unexpected:?}"),
            }
        }

        assert_eq!(full_text, "Hello world");

        let response = completion.expect("expected final message completion chunk");
        assert_eq!(response.usage, Some(TokenUsage::new(10, 5, 15)));
        assert!(matches!(
            response.content.first(),
            Some(AssistantContent::Text { text }) if text == "Hello world"
        ));
    }
}
898
/// Prompt-cache marker attached to content blocks (`cache_control`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CacheControl {
    /// Serialized as `type`; `default_cache_type` (`"ephemeral"`) is used when absent.
    #[serde(rename = "type", default = "default_cache_type")]
    cache_type: String,
}
904
/// A typed content block, internally tagged by its `type` field.
///
/// Each known variant keeps unrecognized sibling fields in `extra` (flattened), so they
/// are preserved on re-serialization.
///
/// NOTE(review): `Unknown` (the `serde(other)` catch-all) drops all fields, and
/// serializing it would emit `{"type":"Unknown"}`. Confirm such blocks are never sent back.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "type")]
pub enum ClaudeContentBlock {
    /// Plain text.
    #[serde(rename = "text")]
    Text {
        text: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Inline image.
    #[serde(rename = "image")]
    Image {
        source: ClaudeImageSource,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Tool invocation emitted by the model; `input` is arbitrary JSON.
    #[serde(rename = "tool_use")]
    ToolUse {
        id: String,
        name: String,
        input: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Result for the `tool_use` block with id `tool_use_id`; content is nested blocks.
    #[serde(rename = "tool_result")]
    ToolResult {
        tool_use_id: String,
        content: Vec<ClaudeContentBlock>,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(skip_serializing_if = "Option::is_none")]
        is_error: Option<bool>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Visible extended-thinking text plus its signature.
    #[serde(rename = "thinking")]
    Thinking {
        thinking: String,
        signature: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Thinking content returned only as opaque `data`.
    #[serde(rename = "redacted_thinking")]
    RedactedThinking {
        data: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        cache_control: Option<CacheControl>,
        #[serde(flatten)]
        extra: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Any other `type` value seen while deserializing.
    #[serde(other)]
    Unknown,
}
965
966#[derive(Debug, Serialize, Deserialize, Default, Clone)]
967struct ClaudeUsage {
968 #[serde(rename = "input_tokens")]
969 input: usize,
970 #[serde(rename = "output_tokens")]
971 output: usize,
972 #[serde(rename = "total_tokens")]
973 #[serde(skip_serializing_if = "Option::is_none")]
974 total: Option<usize>,
975 #[serde(rename = "cache_creation_input_tokens")]
976 #[serde(skip_serializing_if = "Option::is_none")]
977 cache_creation_input: Option<usize>,
978 #[serde(rename = "cache_read_input_tokens")]
979 #[serde(skip_serializing_if = "Option::is_none")]
980 cache_read_input: Option<usize>,
981}
982
/// One server-sent event of a streaming Messages response, tagged by its `type` field.
///
/// NOTE(review): there is no catch-all variant, so an event `type` not listed here fails
/// to deserialize. Confirm how the stream converter treats such failures.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ClaudeStreamEvent {
    /// Start of the message; its payload is parsed but currently unused.
    #[serde(rename = "message_start")]
    MessageStart {
        #[expect(dead_code)]
        message: ClaudeMessageStart,
    },
    /// A new content block begins at position `index`.
    #[serde(rename = "content_block_start")]
    ContentBlockStart {
        index: usize,
        content_block: ClaudeContentBlockStart,
    },
    /// Incremental data for the block at `index`.
    #[serde(rename = "content_block_delta")]
    ContentBlockDelta { index: usize, delta: ClaudeDelta },
    /// The block at `index` is complete.
    #[serde(rename = "content_block_stop")]
    ContentBlockStop { index: usize },
    /// Message-level update; `usage` is `None` when the field is absent.
    #[serde(rename = "message_delta")]
    MessageDelta {
        #[expect(dead_code)]
        delta: ClaudeMessageDeltaData,
        #[serde(default)]
        usage: Option<ClaudeUsage>,
    },
    /// End of the message.
    #[serde(rename = "message_stop")]
    MessageStop,
    /// Keep-alive event.
    #[serde(rename = "ping")]
    Ping,
    /// Error reported in-stream by the API.
    #[serde(rename = "error")]
    Error { error: ClaudeStreamError },
}
1014
/// Subset of the `message` object in a `message_start` event; all other fields are ignored.
#[derive(Debug, Deserialize)]
struct ClaudeMessageStart {
    #[expect(dead_code)]
    #[serde(default)]
    id: String,
    #[expect(dead_code)]
    #[serde(default)]
    model: String,
}
1024
/// `content_block` payload of a `content_block_start` event.
///
/// Only the fields below are captured. Which optional fields are present depends on
/// `block_type`; all default to `None` when absent.
#[derive(Debug, Deserialize)]
struct ClaudeContentBlockStart {
    /// Wire name `type`, e.g. `text` or `tool_use`.
    #[serde(rename = "type")]
    block_type: String,
    #[serde(default)]
    id: Option<String>,
    #[serde(default)]
    name: Option<String>,
    /// Initial text, if the block starts with some.
    #[serde(default)]
    text: Option<String>,
}
1036
/// `delta` payload of a `content_block_delta` event, tagged by its `type` field.
///
/// NOTE(review): any other delta `type` fails to deserialize because there is no
/// catch-all variant.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ClaudeDelta {
    /// Appended text.
    #[serde(rename = "text_delta")]
    Text { text: String },
    /// Appended thinking text.
    #[serde(rename = "thinking_delta")]
    Thinking { thinking: String },
    /// A fragment of a tool call's JSON input; fragments must be concatenated before parsing.
    #[serde(rename = "input_json_delta")]
    InputJson { partial_json: String },
    /// Signature for a thinking block.
    #[serde(rename = "signature_delta")]
    Signature { signature: String },
}
1049
/// `delta` object of a `message_delta` event; parsed but currently unused.
#[derive(Debug, Deserialize)]
struct ClaudeMessageDeltaData {
    #[expect(dead_code)]
    #[serde(default)]
    stop_reason: Option<String>,
}
1056
/// `error` object of an in-stream `error` event; both fields default to empty strings.
#[derive(Debug, Deserialize)]
struct ClaudeStreamError {
    #[serde(default)]
    message: String,
    /// Wire name `type`.
    #[serde(rename = "type", default)]
    error_type: String,
}
1064
1065impl AnthropicClient {
1066 pub fn new(api_key: &str) -> Result<Self, ApiError> {
1067 Self::with_api_key(api_key)
1068 }
1069
1070 pub fn with_api_key(api_key: &str) -> Result<Self, ApiError> {
1071 Ok(Self {
1072 http_client: Self::build_http_client()?,
1073 auth: AuthMode::ApiKey(api_key.to_string()),
1074 })
1075 }
1076
1077 pub fn with_directive(directive: AnthropicAuth) -> Result<Self, ApiError> {
1078 Ok(Self {
1079 http_client: Self::build_http_client()?,
1080 auth: AuthMode::Directive(directive),
1081 })
1082 }
1083
1084 fn build_http_client() -> Result<reqwest::Client, ApiError> {
1085 let mut headers = header::HeaderMap::new();
1086 headers.insert(
1087 "anthropic-version",
1088 header::HeaderValue::from_static("2023-06-01"),
1089 );
1090 headers.insert(
1091 header::CONTENT_TYPE,
1092 header::HeaderValue::from_static("application/json"),
1093 );
1094
1095 reqwest::Client::builder()
1096 .default_headers(headers)
1097 .build()
1098 .map_err(ApiError::Network)
1099 }
1100
1101 async fn auth_headers(
1102 &self,
1103 ctx: AuthHeaderContext,
1104 ) -> Result<Vec<(String, String)>, ApiError> {
1105 match &self.auth {
1106 AuthMode::ApiKey(key) => Ok(vec![("x-api-key".to_string(), key.clone())]),
1107 AuthMode::Directive(directive) => {
1108 let header_pairs = directive
1109 .headers
1110 .headers(ctx)
1111 .await
1112 .map_err(|e| ApiError::AuthError(e.to_string()))?;
1113 Ok(header_pairs
1114 .into_iter()
1115 .map(|pair| (pair.name, pair.value))
1116 .collect())
1117 }
1118 }
1119 }
1120
1121 async fn on_auth_error(
1122 &self,
1123 status: u16,
1124 body: &str,
1125 request_kind: RequestKind,
1126 ) -> Result<AuthErrorAction, ApiError> {
1127 let AuthMode::Directive(directive) = &self.auth else {
1128 return Ok(AuthErrorAction::NoAction);
1129 };
1130 let context = AuthErrorContext {
1131 status: Some(status),
1132 body_snippet: Some(truncate_body(body)),
1133 request_kind,
1134 };
1135 directive
1136 .headers
1137 .on_auth_error(context)
1138 .await
1139 .map_err(|e| ApiError::AuthError(e.to_string()))
1140 }
1141
1142 fn request_url(&self) -> Result<String, ApiError> {
1143 let AuthMode::Directive(directive) = &self.auth else {
1144 return Ok(API_URL.to_string());
1145 };
1146
1147 let Some(query_params) = &directive.query_params else {
1148 return Ok(API_URL.to_string());
1149 };
1150
1151 if query_params.is_empty() {
1152 return Ok(API_URL.to_string());
1153 }
1154
1155 let mut url = url::Url::parse(API_URL)
1156 .map_err(|e| ApiError::Configuration(format!("Invalid API_URL '{API_URL}': {e}")))?;
1157 for param in query_params {
1158 url.query_pairs_mut().append_pair(¶m.name, ¶m.value);
1159 }
1160 Ok(url.to_string())
1161 }
1162}
1163
1164fn convert_messages(messages: Vec<AppMessage>) -> Result<Vec<ClaudeMessage>, ApiError> {
1166 let claude_messages: Result<Vec<ClaudeMessage>, ApiError> =
1167 messages.into_iter().map(convert_single_message).collect();
1168
1169 claude_messages.map(|messages| {
1171 messages
1172 .into_iter()
1173 .filter(|msg| {
1174 match &msg.content {
1175 ClaudeMessageContent::Text { content } => !content.trim().is_empty(),
1176 ClaudeMessageContent::StructuredContent { .. } => true, }
1178 })
1179 .collect()
1180 })
1181}
1182
1183fn user_image_to_claude_block(
1184 image: &crate::app::conversation::ImageContent,
1185) -> Result<ClaudeContentBlock, ApiError> {
1186 match &image.source {
1187 ImageSource::DataUrl { data_url } => {
1188 let Some((meta, payload)) = data_url.split_once(',') else {
1189 return Err(ApiError::InvalidRequest {
1190 provider: "anthropic".to_string(),
1191 details: "Image data URL is missing ',' separator".to_string(),
1192 });
1193 };
1194
1195 let Some(meta_body) = meta.strip_prefix("data:") else {
1196 return Err(ApiError::InvalidRequest {
1197 provider: "anthropic".to_string(),
1198 details: "Image data URL must start with 'data:'".to_string(),
1199 });
1200 };
1201
1202 if !meta_body
1203 .split(';')
1204 .any(|segment| segment.eq_ignore_ascii_case("base64"))
1205 {
1206 return Err(ApiError::InvalidRequest {
1207 provider: "anthropic".to_string(),
1208 details: "Anthropic image data URLs must be base64 encoded".to_string(),
1209 });
1210 }
1211
1212 let media_type = meta_body
1213 .split(';')
1214 .next()
1215 .filter(|value| !value.trim().is_empty())
1216 .unwrap_or("application/octet-stream")
1217 .to_string();
1218
1219 if !media_type.starts_with("image/") {
1220 return Err(ApiError::UnsupportedFeature {
1221 provider: "anthropic".to_string(),
1222 feature: "image input mime type".to_string(),
1223 details: format!("Unsupported image media type '{}'", media_type),
1224 });
1225 }
1226
1227 Ok(ClaudeContentBlock::Image {
1228 source: ClaudeImageSource {
1229 source_type: "base64".to_string(),
1230 media_type,
1231 data: payload.to_string(),
1232 },
1233 cache_control: None,
1234 extra: Default::default(),
1235 })
1236 }
1237 ImageSource::Url { url } => Err(ApiError::UnsupportedFeature {
1238 provider: "anthropic".to_string(),
1239 feature: "image input source".to_string(),
1240 details: format!(
1241 "Anthropic image input currently requires data URLs in this adapter; got URL '{}'",
1242 url
1243 ),
1244 }),
1245 ImageSource::SessionFile { relative_path } => Err(ApiError::UnsupportedFeature {
1246 provider: "anthropic".to_string(),
1247 feature: "image input source".to_string(),
1248 details: format!(
1249 "Anthropic adapter cannot access session file '{}' directly; use data URLs",
1250 relative_path
1251 ),
1252 }),
1253 }
1254}
1255
1256fn convert_single_message(msg: AppMessage) -> Result<ClaudeMessage, ApiError> {
1257 match &msg.data {
1258 crate::app::conversation::MessageData::User { content, .. } => {
1259 let mut claude_blocks = Vec::new();
1261 for user_content in content {
1262 match user_content {
1263 UserContent::Text { text } => {
1264 if !text.trim().is_empty() {
1265 claude_blocks.push(ClaudeContentBlock::Text {
1266 text: text.clone(),
1267 cache_control: None,
1268 extra: Default::default(),
1269 });
1270 }
1271 }
1272 UserContent::Image { image } => {
1273 claude_blocks.push(user_image_to_claude_block(image)?);
1274 }
1275 UserContent::CommandExecution {
1276 command,
1277 stdout,
1278 stderr,
1279 exit_code,
1280 } => {
1281 let text = UserContent::format_command_execution_as_xml(
1282 command, stdout, stderr, *exit_code,
1283 );
1284 if !text.trim().is_empty() {
1285 claude_blocks.push(ClaudeContentBlock::Text {
1286 text,
1287 cache_control: None,
1288 extra: Default::default(),
1289 });
1290 }
1291 }
1292 }
1293 }
1294
1295 if claude_blocks.is_empty() {
1296 return Err(ApiError::InvalidRequest {
1297 provider: "anthropic".to_string(),
1298 details: format!(
1299 "User message ID {} resulted in no valid content blocks",
1300 msg.id
1301 ),
1302 });
1303 }
1304
1305 Ok(ClaudeMessage {
1306 role: ClaudeMessageRole::User,
1307 content: ClaudeMessageContent::StructuredContent {
1308 content: ClaudeStructuredContent(claude_blocks),
1309 },
1310 id: Some(msg.id.clone()),
1311 })
1312 }
1313
1314 crate::app::conversation::MessageData::Assistant { content, .. } => {
1315 let claude_blocks: Vec<ClaudeContentBlock> = content
1317 .iter()
1318 .filter_map(|assistant_content| match assistant_content {
1319 AssistantContent::Text { text } => {
1320 if text.trim().is_empty() {
1321 None
1322 } else {
1323 Some(ClaudeContentBlock::Text {
1324 text: text.clone(),
1325 cache_control: None,
1326 extra: Default::default(),
1327 })
1328 }
1329 }
1330 AssistantContent::Image { image } => match user_image_to_claude_block(image) {
1331 Ok(block) => Some(block),
1332 Err(err) => {
1333 debug!(
1334 target: "claude::convert_message",
1335 "Skipping unsupported assistant image block: {}",
1336 err
1337 );
1338 None
1339 }
1340 },
1341 AssistantContent::ToolCall { tool_call, .. } => {
1342 Some(ClaudeContentBlock::ToolUse {
1343 id: tool_call.id.clone(),
1344 name: tool_call.name.clone(),
1345 input: tool_call.parameters.clone(),
1346 cache_control: None,
1347 extra: Default::default(),
1348 })
1349 }
1350 AssistantContent::Thought { thought } => {
1351 match thought {
1352 ThoughtContent::Signed { text, signature } => {
1353 Some(ClaudeContentBlock::Thinking {
1354 thinking: text.clone(),
1355 signature: signature.clone(),
1356 cache_control: None,
1357 extra: Default::default(),
1358 })
1359 }
1360 ThoughtContent::Redacted { data } => {
1361 Some(ClaudeContentBlock::RedactedThinking {
1362 data: data.clone(),
1363 cache_control: None,
1364 extra: Default::default(),
1365 })
1366 }
1367 ThoughtContent::Simple { text } => {
1368 Some(ClaudeContentBlock::Text {
1370 text: format!("[Thought: {text}]"),
1371 cache_control: None,
1372 extra: Default::default(),
1373 })
1374 }
1375 }
1376 }
1377 })
1378 .collect();
1379
1380 if claude_blocks.is_empty() {
1381 debug!("No content blocks found: {:?}", content);
1382 Err(ApiError::InvalidRequest {
1383 provider: "anthropic".to_string(),
1384 details: format!(
1385 "Assistant message ID {} resulted in no valid content blocks",
1386 msg.id
1387 ),
1388 })
1389 } else {
1390 let claude_blocks = ensure_thinking_first(claude_blocks);
1391 let claude_content = if claude_blocks.len() == 1 {
1392 if let Some(ClaudeContentBlock::Text { text, .. }) = claude_blocks.first() {
1393 ClaudeMessageContent::Text {
1394 content: text.clone(),
1395 }
1396 } else {
1397 ClaudeMessageContent::StructuredContent {
1398 content: ClaudeStructuredContent(claude_blocks),
1399 }
1400 }
1401 } else {
1402 ClaudeMessageContent::StructuredContent {
1403 content: ClaudeStructuredContent(claude_blocks),
1404 }
1405 };
1406
1407 Ok(ClaudeMessage {
1408 role: ClaudeMessageRole::Assistant,
1409 content: claude_content,
1410 id: Some(msg.id.clone()),
1411 })
1412 }
1413 }
1414 crate::app::conversation::MessageData::Tool {
1415 tool_use_id,
1416 result,
1417 ..
1418 } => {
1419 let (result_text, is_error) = if let ToolResult::Error(e) = result {
1422 (e.to_string(), Some(true))
1423 } else {
1424 let text = result.llm_format();
1426 let text = if text.trim().is_empty() {
1427 "(No output)".to_string()
1428 } else {
1429 text
1430 };
1431 (text, None)
1432 };
1433
1434 let claude_blocks = vec![ClaudeContentBlock::ToolResult {
1435 tool_use_id: tool_use_id.clone(),
1436 content: vec![ClaudeContentBlock::Text {
1437 text: result_text,
1438 cache_control: None,
1439 extra: Default::default(),
1440 }],
1441 is_error,
1442 cache_control: None,
1443 extra: Default::default(),
1444 }];
1445
1446 Ok(ClaudeMessage {
1447 role: ClaudeMessageRole::User, content: ClaudeMessageContent::StructuredContent {
1449 content: ClaudeStructuredContent(claude_blocks),
1450 },
1451 id: Some(msg.id.clone()),
1452 })
1453 }
1454 }
1455}
1456fn ensure_thinking_first(blocks: Vec<ClaudeContentBlock>) -> Vec<ClaudeContentBlock> {
1459 let mut thinking_blocks = Vec::new();
1460 let mut other_blocks = Vec::new();
1461
1462 for block in blocks {
1463 match block {
1464 ClaudeContentBlock::Thinking { .. } | ClaudeContentBlock::RedactedThinking { .. } => {
1465 thinking_blocks.push(block);
1466 }
1467 _ => other_blocks.push(block),
1468 }
1469 }
1470
1471 if thinking_blocks.is_empty() {
1472 other_blocks
1473 } else {
1474 thinking_blocks.extend(other_blocks);
1475 thinking_blocks
1476 }
1477}
1478
1479fn convert_claude_content(claude_blocks: Vec<ClaudeContentBlock>) -> Vec<AssistantContent> {
1481 claude_blocks
1482 .into_iter()
1483 .filter_map(|block| match block {
1484 ClaudeContentBlock::Text { text, .. } => Some(AssistantContent::Text { text }),
1485 ClaudeContentBlock::Image { source, .. } => {
1486 let media_type = source.media_type;
1487 let data = source.data;
1488 Some(AssistantContent::Image {
1489 image: crate::app::conversation::ImageContent {
1490 mime_type: media_type.clone(),
1491 source: crate::app::conversation::ImageSource::DataUrl {
1492 data_url: format!("data:{};base64,{}", media_type, data),
1493 },
1494 width: None,
1495 height: None,
1496 bytes: None,
1497 sha256: None,
1498 },
1499 })
1500 }
1501
1502 ClaudeContentBlock::ToolUse {
1503 id, name, input, ..
1504 } => Some(AssistantContent::ToolCall {
1505 tool_call: steer_tools::ToolCall {
1506 id,
1507 name,
1508 parameters: input,
1509 },
1510 thought_signature: None,
1511 }),
1512 ClaudeContentBlock::ToolResult { .. } => {
1513 warn!("Unexpected ToolResult block received in Claude response content");
1514 None
1515 }
1516 ClaudeContentBlock::Thinking {
1517 thinking,
1518 signature,
1519 ..
1520 } => Some(AssistantContent::Thought {
1521 thought: ThoughtContent::Signed {
1522 text: thinking,
1523 signature,
1524 },
1525 }),
1526 ClaudeContentBlock::RedactedThinking { data, .. } => Some(AssistantContent::Thought {
1527 thought: ThoughtContent::Redacted { data },
1528 }),
1529 ClaudeContentBlock::Unknown => {
1530 warn!("Unknown content block received in Claude response content");
1531 None
1532 }
1533 })
1534 .collect()
1535}
1536
/// Narrows a `usize` token count to `u32`, clamping values that do not fit to `u32::MAX`.
fn saturating_u32(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
1540
1541fn map_claude_usage(usage: &ClaudeUsage) -> TokenUsage {
1542 let total = usage
1543 .total
1544 .unwrap_or_else(|| usage.input.saturating_add(usage.output));
1545 TokenUsage::new(
1546 saturating_u32(usage.input),
1547 saturating_u32(usage.output),
1548 saturating_u32(total),
1549 )
1550}
1551
1552fn convert_claude_completion(claude_completion: ClaudeCompletionResponse) -> CompletionResponse {
1553 CompletionResponse {
1554 content: convert_claude_content(claude_completion.content),
1555 usage: Some(map_claude_usage(&claude_completion.usage)),
1556 }
1557}
1558
1559#[async_trait]
1560impl Provider for AnthropicClient {
1561 fn name(&self) -> &'static str {
1562 "anthropic"
1563 }
1564
1565 async fn complete(
1566 &self,
1567 model_id: &ModelId,
1568 messages: Vec<AppMessage>,
1569 system: Option<SystemContext>,
1570 tools: Option<Vec<ToolSchema>>,
1571 call_options: Option<ModelParameters>,
1572 token: CancellationToken,
1573 ) -> Result<CompletionResponse, ApiError> {
1574 let mut claude_messages = convert_messages(messages)?;
1575 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1576
1577 if claude_messages.is_empty() {
1578 return Err(ApiError::InvalidRequest {
1579 provider: self.name().to_string(),
1580 details: "No messages provided".to_string(),
1581 });
1582 }
1583
1584 let last_message = claude_messages
1585 .last_mut()
1586 .ok_or_else(|| ApiError::InvalidRequest {
1587 provider: self.name().to_string(),
1588 details: "No messages provided".to_string(),
1589 })?;
1590 let cache_setting = Some(CacheControl {
1591 cache_type: "ephemeral".to_string(),
1592 });
1593
1594 let instruction_policy = match &self.auth {
1595 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1596 AuthMode::ApiKey(_) => None,
1597 };
1598 let system_text = apply_instruction_policy(system, instruction_policy);
1599 let system_content = build_system_content(system_text, cache_setting.clone());
1600
1601 match &mut last_message.content {
1602 ClaudeMessageContent::StructuredContent { content } => {
1603 for block in &mut content.0 {
1604 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1605 cache_control.clone_from(&cache_setting);
1606 }
1607 }
1608 }
1609 ClaudeMessageContent::Text { content } => {
1610 let text_content = content.clone();
1611 last_message.content = ClaudeMessageContent::StructuredContent {
1612 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1613 text: text_content,
1614 cache_control: cache_setting,
1615 extra: Default::default(),
1616 }]),
1617 };
1618 }
1619 }
1620
1621 let supports_thinking = call_options
1623 .as_ref()
1624 .and_then(|opts| opts.thinking_config.as_ref())
1625 .is_some_and(|tc| tc.enabled);
1626
1627 let request = if supports_thinking {
1628 let budget = call_options
1630 .as_ref()
1631 .and_then(|o| o.thinking_config)
1632 .and_then(|tc| tc.budget_tokens)
1633 .unwrap_or(4000);
1634 let thinking = Some(Thinking {
1635 thinking_type: ThinkingType::Enabled,
1636 budget_tokens: budget,
1637 });
1638 CompletionRequest {
1639 model: model_id.id.clone(), messages: claude_messages,
1641 max_tokens: call_options
1642 .as_ref()
1643 .and_then(|o| o.max_tokens)
1644 .map_or(32_000, |v| v as usize),
1645 system: system_content.clone(),
1646 tools,
1647 temperature: call_options
1648 .as_ref()
1649 .and_then(|o| o.temperature)
1650 .or(Some(1.0)),
1651 top_p: call_options.as_ref().and_then(|o| o.top_p),
1652 top_k: None,
1653 stream: None,
1654 thinking,
1655 }
1656 } else {
1657 CompletionRequest {
1658 model: model_id.id.clone(), messages: claude_messages,
1660 max_tokens: call_options
1661 .as_ref()
1662 .and_then(|o| o.max_tokens)
1663 .map_or(8000, |v| v as usize),
1664 system: system_content,
1665 tools,
1666 temperature: call_options
1667 .as_ref()
1668 .and_then(|o| o.temperature)
1669 .or(Some(0.7)),
1670 top_p: call_options.as_ref().and_then(|o| o.top_p),
1671 top_k: None,
1672 stream: None,
1673 thinking: None,
1674 }
1675 };
1676
1677 let auth_ctx = auth_header_context(model_id, RequestKind::Complete);
1678 let mut attempts = 0;
1679
1680 loop {
1681 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1682 let url = self.request_url()?;
1683 let mut request_builder = self.http_client.post(&url).json(&request);
1684
1685 for (name, value) in auth_headers {
1686 request_builder = request_builder.header(&name, &value);
1687 }
1688
1689 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1690 request_builder =
1691 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1692 }
1693
1694 let response = tokio::select! {
1695 biased;
1696 () = token.cancelled() => {
1697 debug!(target: "claude::complete", "Cancellation token triggered before sending request.");
1698 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1699 }
1700 res = request_builder.send() => {
1701 res?
1702 }
1703 };
1704
1705 if token.is_cancelled() {
1706 debug!(target: "claude::complete", "Cancellation token triggered after sending request, before status check.");
1707 return Err(ApiError::Cancelled {
1708 provider: self.name().to_string(),
1709 });
1710 }
1711
1712 let status = response.status();
1713 if !status.is_success() {
1714 let error_text = tokio::select! {
1715 biased;
1716 () = token.cancelled() => {
1717 debug!(target: "claude::complete", "Cancellation token triggered while reading error response body.");
1718 return Err(ApiError::Cancelled{ provider: self.name().to_string()});
1719 }
1720 text_res = response.text() => {
1721 text_res?
1722 }
1723 };
1724
1725 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1726 let action = self
1727 .on_auth_error(status.as_u16(), &error_text, RequestKind::Complete)
1728 .await?;
1729 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1730 attempts += 1;
1731 continue;
1732 }
1733 return Err(ApiError::AuthenticationFailed {
1734 provider: self.name().to_string(),
1735 details: error_text,
1736 });
1737 }
1738
1739 return Err(match status.as_u16() {
1740 401 | 403 => ApiError::AuthenticationFailed {
1741 provider: self.name().to_string(),
1742 details: error_text,
1743 },
1744 429 => ApiError::RateLimited {
1745 provider: self.name().to_string(),
1746 details: error_text,
1747 },
1748 400..=499 => ApiError::InvalidRequest {
1749 provider: self.name().to_string(),
1750 details: error_text,
1751 },
1752 500..=599 => ApiError::ServerError {
1753 provider: self.name().to_string(),
1754 status_code: status.as_u16(),
1755 details: error_text,
1756 },
1757 _ => ApiError::Unknown {
1758 provider: self.name().to_string(),
1759 details: error_text,
1760 },
1761 });
1762 }
1763
1764 let response_text = tokio::select! {
1765 biased;
1766 () = token.cancelled() => {
1767 debug!(target: "claude::complete", "Cancellation token triggered while reading successful response body.");
1768 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1769 }
1770 text_res = response.text() => {
1771 text_res?
1772 }
1773 };
1774
1775 let claude_completion: ClaudeCompletionResponse = serde_json::from_str(&response_text)
1776 .map_err(|e| ApiError::ResponseParsingError {
1777 provider: self.name().to_string(),
1778 details: format!("Error: {e}, Body: {response_text}"),
1779 })?;
1780 let completion = convert_claude_completion(claude_completion);
1781
1782 return Ok(completion);
1783 }
1784 }
1785
1786 async fn stream_complete(
1787 &self,
1788 model_id: &ModelId,
1789 messages: Vec<AppMessage>,
1790 system: Option<SystemContext>,
1791 tools: Option<Vec<ToolSchema>>,
1792 call_options: Option<ModelParameters>,
1793 token: CancellationToken,
1794 ) -> Result<CompletionStream, ApiError> {
1795 let mut claude_messages = convert_messages(messages)?;
1796 let tools = tools.map(|tools| tools.into_iter().map(ClaudeTool::from).collect());
1797
1798 if claude_messages.is_empty() {
1799 return Err(ApiError::InvalidRequest {
1800 provider: self.name().to_string(),
1801 details: "No messages provided".to_string(),
1802 });
1803 }
1804
1805 let last_message = claude_messages
1806 .last_mut()
1807 .ok_or_else(|| ApiError::InvalidRequest {
1808 provider: self.name().to_string(),
1809 details: "No messages provided".to_string(),
1810 })?;
1811 let cache_setting = Some(CacheControl {
1812 cache_type: "ephemeral".to_string(),
1813 });
1814
1815 let instruction_policy = match &self.auth {
1816 AuthMode::Directive(directive) => directive.instruction_policy.as_ref(),
1817 AuthMode::ApiKey(_) => None,
1818 };
1819 let system_text = apply_instruction_policy(system, instruction_policy);
1820 let system_content = build_system_content(system_text, cache_setting.clone());
1821
1822 match &mut last_message.content {
1823 ClaudeMessageContent::StructuredContent { content } => {
1824 for block in &mut content.0 {
1825 if let ClaudeContentBlock::ToolResult { cache_control, .. } = block {
1826 cache_control.clone_from(&cache_setting);
1827 }
1828 }
1829 }
1830 ClaudeMessageContent::Text { content } => {
1831 let text_content = content.clone();
1832 last_message.content = ClaudeMessageContent::StructuredContent {
1833 content: ClaudeStructuredContent(vec![ClaudeContentBlock::Text {
1834 text: text_content,
1835 cache_control: cache_setting,
1836 extra: Default::default(),
1837 }]),
1838 };
1839 }
1840 }
1841
1842 let supports_thinking = call_options
1843 .as_ref()
1844 .and_then(|opts| opts.thinking_config.as_ref())
1845 .is_some_and(|tc| tc.enabled);
1846
1847 let request = if supports_thinking {
1848 let budget = call_options
1849 .as_ref()
1850 .and_then(|o| o.thinking_config)
1851 .and_then(|tc| tc.budget_tokens)
1852 .unwrap_or(4000);
1853 let thinking = Some(Thinking {
1854 thinking_type: ThinkingType::Enabled,
1855 budget_tokens: budget,
1856 });
1857 CompletionRequest {
1858 model: model_id.id.clone(),
1859 messages: claude_messages,
1860 max_tokens: call_options
1861 .as_ref()
1862 .and_then(|o| o.max_tokens)
1863 .map_or(32_000, |v| v as usize),
1864 system: system_content.clone(),
1865 tools,
1866 temperature: call_options
1867 .as_ref()
1868 .and_then(|o| o.temperature)
1869 .or(Some(1.0)),
1870 top_p: call_options.as_ref().and_then(|o| o.top_p),
1871 top_k: None,
1872 stream: Some(true),
1873 thinking,
1874 }
1875 } else {
1876 CompletionRequest {
1877 model: model_id.id.clone(),
1878 messages: claude_messages,
1879 max_tokens: call_options
1880 .as_ref()
1881 .and_then(|o| o.max_tokens)
1882 .map_or(8000, |v| v as usize),
1883 system: system_content,
1884 tools,
1885 temperature: call_options
1886 .as_ref()
1887 .and_then(|o| o.temperature)
1888 .or(Some(0.7)),
1889 top_p: call_options.as_ref().and_then(|o| o.top_p),
1890 top_k: None,
1891 stream: Some(true),
1892 thinking: None,
1893 }
1894 };
1895
1896 let auth_ctx = auth_header_context(model_id, RequestKind::Stream);
1897 let mut attempts = 0;
1898
1899 loop {
1900 let auth_headers = self.auth_headers(auth_ctx.clone()).await?;
1901 let url = self.request_url()?;
1902 let mut request_builder = self.http_client.post(&url).json(&request);
1903
1904 for (name, value) in auth_headers {
1905 request_builder = request_builder.header(&name, &value);
1906 }
1907
1908 if supports_thinking && matches!(&self.auth, AuthMode::ApiKey(_)) {
1909 request_builder =
1910 request_builder.header("anthropic-beta", "interleaved-thinking-2025-05-14");
1911 }
1912
1913 let response = tokio::select! {
1914 biased;
1915 () = token.cancelled() => {
1916 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1917 }
1918 res = request_builder.send() => {
1919 res?
1920 }
1921 };
1922
1923 let status = response.status();
1924 if !status.is_success() {
1925 let error_text = tokio::select! {
1926 biased;
1927 () = token.cancelled() => {
1928 return Err(ApiError::Cancelled { provider: self.name().to_string() });
1929 }
1930 text_res = response.text() => {
1931 text_res?
1932 }
1933 };
1934
1935 if is_auth_status(status) && matches!(&self.auth, AuthMode::Directive(_)) {
1936 let action = self
1937 .on_auth_error(status.as_u16(), &error_text, RequestKind::Stream)
1938 .await?;
1939 if matches!(action, AuthErrorAction::RetryOnce) && attempts == 0 {
1940 attempts += 1;
1941 continue;
1942 }
1943 return Err(ApiError::AuthenticationFailed {
1944 provider: self.name().to_string(),
1945 details: error_text,
1946 });
1947 }
1948
1949 return Err(match status.as_u16() {
1950 401 | 403 => ApiError::AuthenticationFailed {
1951 provider: self.name().to_string(),
1952 details: error_text,
1953 },
1954 429 => ApiError::RateLimited {
1955 provider: self.name().to_string(),
1956 details: error_text,
1957 },
1958 400..=499 => ApiError::InvalidRequest {
1959 provider: self.name().to_string(),
1960 details: error_text,
1961 },
1962 500..=599 => ApiError::ServerError {
1963 provider: self.name().to_string(),
1964 status_code: status.as_u16(),
1965 details: error_text,
1966 },
1967 _ => ApiError::Unknown {
1968 provider: self.name().to_string(),
1969 details: error_text,
1970 },
1971 });
1972 }
1973
1974 let byte_stream = response.bytes_stream();
1975 let sse_stream = parse_sse_stream(byte_stream);
1976
1977 let stream = convert_claude_stream(sse_stream, token);
1978
1979 return Ok(Box::pin(stream));
1980 }
1981 }
1982}
1983
1984fn auth_header_context(model_id: &ModelId, request_kind: RequestKind) -> AuthHeaderContext {
1985 AuthHeaderContext {
1986 model_id: Some(AuthModelId {
1987 provider_id: AuthProviderId(model_id.provider.as_str().to_string()),
1988 model_id: model_id.id.clone(),
1989 }),
1990 request_kind,
1991 }
1992}
1993
1994fn is_auth_status(status: reqwest::StatusCode) -> bool {
1995 matches!(
1996 status,
1997 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
1998 )
1999}
2000
/// Shortens an error body to at most 512 characters (not bytes), appending `...` when
/// anything was cut off.
fn truncate_body(body: &str) -> String {
    const LIMIT: usize = 512;
    // The byte offset of the (LIMIT + 1)-th character exists only if the body is too long.
    match body.char_indices().nth(LIMIT) {
        Some((cut_at, _)) => format!("{}...", &body[..cut_at]),
        None => body.to_string(),
    }
}
2011
2012fn apply_instruction_policy(
2013 system: Option<SystemContext>,
2014 policy: Option<&InstructionPolicy>,
2015) -> Option<String> {
2016 let base = system.as_ref().and_then(|context| {
2017 let trimmed = context.prompt.trim();
2018 if trimmed.is_empty() {
2019 None
2020 } else {
2021 Some(trimmed.to_string())
2022 }
2023 });
2024
2025 let context = system
2026 .as_ref()
2027 .and_then(|context| context.render_with_prompt(base.clone()));
2028
2029 match policy {
2030 None => context,
2031 Some(InstructionPolicy::Prefix(prefix)) => {
2032 if let Some(context) = context {
2033 Some(format!("{prefix}\n{context}"))
2034 } else {
2035 Some(prefix.clone())
2036 }
2037 }
2038 Some(InstructionPolicy::DefaultIfEmpty(default)) => {
2039 if context.is_some() {
2040 context
2041 } else {
2042 Some(default.clone())
2043 }
2044 }
2045 Some(InstructionPolicy::Override(override_text)) => {
2046 let mut combined = override_text.clone();
2047 if let Some(system) = system.as_ref() {
2048 let overlay = system.prompt.trim();
2049 if !overlay.is_empty() {
2050 combined.push_str("\n\n## Operating Mode\n");
2051 combined.push_str(overlay);
2052 }
2053
2054 let env = system
2055 .environment
2056 .as_ref()
2057 .map(|env| env.as_context())
2058 .map(|value| value.trim().to_string())
2059 .filter(|value| !value.is_empty());
2060 if let Some(env) = env {
2061 combined.push_str("\n\n");
2062 combined.push_str(&env);
2063 }
2064 }
2065 Some(combined)
2066 }
2067 }
2068}
2069
2070fn build_system_content(
2071 system: Option<String>,
2072 cache_setting: Option<CacheControl>,
2073) -> Option<System> {
2074 system.map(|text| {
2075 System::Content(vec![SystemContentBlock {
2076 content_type: "text".to_string(),
2077 text,
2078 cache_control: cache_setting,
2079 }])
2080 })
2081}
2082
/// Accumulator for one content block that is still streaming. `convert_claude_stream`
/// keys these by block index.
#[derive(Debug)]
enum BlockState {
    /// A `text` block; `text` collects the text received so far.
    Text { text: String },
    /// A `thinking` block; `signature` is set by a signature delta, if one arrives.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// A `tool_use` block; `input` is the concatenated raw JSON from input-json deltas,
    /// parsed only when the block stops.
    ToolUse {
        id: String,
        name: String,
        input: String,
    },
    /// A block type this adapter does not track; it yields no content when it stops.
    Unknown,
}
2099
2100fn block_state_to_content(state: BlockState) -> Option<AssistantContent> {
2101 match state {
2102 BlockState::Text { text } => {
2103 if text.is_empty() {
2104 None
2105 } else {
2106 Some(AssistantContent::Text { text })
2107 }
2108 }
2109 BlockState::Thinking { text, signature } => {
2110 if text.is_empty() {
2111 None
2112 } else {
2113 let thought = if let Some(sig) = signature {
2114 ThoughtContent::Signed {
2115 text,
2116 signature: sig,
2117 }
2118 } else {
2119 ThoughtContent::Simple { text }
2120 };
2121 Some(AssistantContent::Thought { thought })
2122 }
2123 }
2124 BlockState::ToolUse { id, name, input } => {
2125 if id.is_empty() || name.is_empty() {
2126 None
2127 } else {
2128 let parameters: serde_json::Value = serde_json::from_str(&input)
2129 .unwrap_or(serde_json::Value::Object(Default::default()));
2130 Some(AssistantContent::ToolCall {
2131 tool_call: ToolCall {
2132 id,
2133 name,
2134 parameters,
2135 },
2136 thought_signature: None,
2137 })
2138 }
2139 }
2140 BlockState::Unknown => None,
2141 }
2142}
2143
2144fn convert_claude_stream(
2145 sse_stream: crate::api::sse::SseStream,
2146 token: CancellationToken,
2147) -> impl futures_core::Stream<Item = StreamChunk> + Send {
2148 async_stream::stream! {
2149 let mut block_states: std::collections::HashMap<usize, BlockState> =
2150 std::collections::HashMap::new();
2151 let mut completed_content: Vec<AssistantContent> = Vec::new();
2152 let mut latest_usage: Option<TokenUsage> = None;
2153
2154 tokio::pin!(sse_stream);
2155
2156 while let Some(event_result) = sse_stream.next().await {
2157 if token.is_cancelled() {
2158 yield StreamChunk::Error(StreamError::Cancelled);
2159 break;
2160 }
2161
2162 let event = match event_result {
2163 Ok(e) => e,
2164 Err(e) => {
2165 yield StreamChunk::Error(StreamError::SseParse(e));
2166 break;
2167 }
2168 };
2169
2170 let parsed: Result<ClaudeStreamEvent, _> = serde_json::from_str(&event.data);
2171 let stream_event = match parsed {
2172 Ok(e) => e,
2173 Err(_) => continue,
2174 };
2175
2176 match stream_event {
2177 ClaudeStreamEvent::ContentBlockStart { index, content_block } => {
2178 match content_block.block_type.as_str() {
2179 "text" => {
2180 let text = content_block.text.unwrap_or_default();
2181 if !text.is_empty() {
2182 yield StreamChunk::TextDelta(text.clone());
2183 }
2184 block_states.insert(index, BlockState::Text { text });
2185 }
2186 "thinking" => {
2187 block_states.insert(
2188 index,
2189 BlockState::Thinking {
2190 text: String::new(),
2191 signature: None,
2192 },
2193 );
2194 }
2195 "tool_use" => {
2196 let id = content_block.id.unwrap_or_default();
2197 let name = content_block.name.unwrap_or_default();
2198 if !id.is_empty() && !name.is_empty() {
2199 yield StreamChunk::ToolUseStart {
2200 id: id.clone(),
2201 name: name.clone(),
2202 };
2203 }
2204 block_states.insert(
2205 index,
2206 BlockState::ToolUse {
2207 id,
2208 name,
2209 input: String::new(),
2210 },
2211 );
2212 }
2213 _ => {
2214 block_states.insert(index, BlockState::Unknown);
2215 }
2216 }
2217 }
2218 ClaudeStreamEvent::ContentBlockDelta { index, delta } => match delta {
2219 ClaudeDelta::Text { text } => {
2220 match block_states.get_mut(&index) {
2221 Some(BlockState::Text { text: buf }) => buf.push_str(&text),
2222 _ => {
2223 block_states.insert(index, BlockState::Text { text: text.clone() });
2224 }
2225 }
2226 yield StreamChunk::TextDelta(text);
2227 }
2228 ClaudeDelta::Thinking { thinking } => {
2229 match block_states.get_mut(&index) {
2230 Some(BlockState::Thinking { text, .. }) => text.push_str(&thinking),
2231 _ => {
2232 block_states.insert(
2233 index,
2234 BlockState::Thinking {
2235 text: thinking.clone(),
2236 signature: None,
2237 },
2238 );
2239 }
2240 }
2241 yield StreamChunk::ThinkingDelta(thinking);
2242 }
2243 ClaudeDelta::Signature { signature } => {
2244 if let Some(BlockState::Thinking { signature: sig, .. }) =
2245 block_states.get_mut(&index)
2246 {
2247 *sig = Some(signature);
2248 }
2249 }
2250 ClaudeDelta::InputJson { partial_json } => {
2251 if let Some(BlockState::ToolUse { id, input, .. }) =
2252 block_states.get_mut(&index)
2253 {
2254 input.push_str(&partial_json);
2255 if !id.is_empty() {
2256 yield StreamChunk::ToolUseInputDelta {
2257 id: id.clone(),
2258 delta: partial_json,
2259 };
2260 }
2261 }
2262 }
2263 },
2264 ClaudeStreamEvent::ContentBlockStop { index } => {
2265 if let Some(state) = block_states.remove(&index)
2266 && let Some(content) = block_state_to_content(state)
2267 {
2268 completed_content.push(content);
2269 }
2270 yield StreamChunk::ContentBlockStop { index };
2271 }
2272 ClaudeStreamEvent::MessageStop => {
2273 if !block_states.is_empty() {
2274 tracing::warn!(
2275 target: "anthropic::stream",
2276 "MessageStop received with {} unfinished content blocks",
2277 block_states.len()
2278 );
2279 }
2280 let content = std::mem::take(&mut completed_content);
2281 yield StreamChunk::MessageComplete(CompletionResponse {
2282 content,
2283 usage: latest_usage,
2284 });
2285 break;
2286 }
2287 ClaudeStreamEvent::Error { error } => {
2288 yield StreamChunk::Error(StreamError::Provider {
2289 provider: "anthropic".into(),
2290 error_type: error.error_type,
2291 message: error.message,
2292 });
2293 }
2294 ClaudeStreamEvent::MessageDelta { usage, .. } => {
2295 if let Some(usage) = usage.as_ref() {
2296 latest_usage = Some(map_claude_usage(usage));
2297 }
2298 }
2299 ClaudeStreamEvent::MessageStart { .. } | ClaudeStreamEvent::Ping => {}
2300 }
2301 }
2302 }
2303}