agent_core/client/providers/anthropic/
mod.rs1mod sse;
2mod types;
3
4use async_stream::stream;
5use futures::Stream;
6
7use crate::client::error::LlmError;
8use crate::client::http::HttpClient;
9use crate::client::models::{Message, MessageOptions, StreamEvent};
10use crate::client::traits::LlmProvider;
11use std::future::Future;
12use std::pin::Pin;
13
14pub struct AnthropicProvider {
16 pub api_key: String,
18 pub model: String,
20}
21
22impl AnthropicProvider {
23 pub fn new(api_key: String, model: String) -> Self {
25 Self { api_key, model }
26 }
27}
28
29impl LlmProvider for AnthropicProvider {
30 fn send_msg(
31 &self,
32 client: &HttpClient,
33 messages: &[Message],
34 options: &MessageOptions,
35 ) -> Pin<Box<dyn Future<Output = Result<Message, LlmError>> + Send>> {
36 let client = client.clone();
38 let api_key = self.api_key.clone();
39 let model = self.model.clone();
40 let messages = messages.to_vec();
41 let options = options.clone();
42
43 Box::pin(async move {
44 let body = types::build_request_body(&messages, &options, &model)?;
46
47 let headers = types::get_request_headers(&api_key);
49 let headers_ref: Vec<(&str, &str)> = headers
50 .iter()
51 .map(|(k, v)| (*k, v.as_str()))
52 .collect();
53
54 let response = client
56 .post(types::get_api_url(), &headers_ref, &body)
57 .await?;
58
59 types::parse_response(&response)
61 })
62 }
63
64 fn send_msg_stream(
65 &self,
66 client: &HttpClient,
67 messages: &[Message],
68 options: &MessageOptions,
69 ) -> Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, LlmError>> + Send>>, LlmError>> + Send>> {
70 let client = client.clone();
72 let api_key = self.api_key.clone();
73 let model = self.model.clone();
74 let messages = messages.to_vec();
75 let options = options.clone();
76
77 Box::pin(async move {
78 let body = types::build_streaming_request_body(&messages, &options, &model)?;
80
81 let headers = types::get_request_headers(&api_key);
83 let headers_ref: Vec<(&str, &str)> = headers
84 .iter()
85 .map(|(k, v)| (*k, v.as_str()))
86 .collect();
87
88 let byte_stream = client
90 .post_stream(types::get_api_url(), &headers_ref, &body)
91 .await?;
92
93 use futures::StreamExt;
95 let event_stream = stream! {
96 let mut buffer = String::new();
97 let mut byte_stream = byte_stream;
98
99 while let Some(chunk_result) = byte_stream.next().await {
100 match chunk_result {
101 Ok(bytes) => {
102 if let Ok(text) = std::str::from_utf8(&bytes) {
104 buffer.push_str(text);
105 } else {
106 yield Err(LlmError::new("SSE_DECODE_ERROR", "Invalid UTF-8 in stream"));
107 break;
108 }
109
110 let (events, remaining) = sse::parse_sse_chunk(&buffer);
112 buffer = remaining;
113
114 for sse_event in events {
116 match sse::parse_stream_event(&sse_event) {
117 Ok(Some(stream_event)) => yield Ok(stream_event),
118 Ok(None) => {} Err(e) => {
120 yield Err(e);
121 return;
122 }
123 }
124 }
125 }
126 Err(e) => {
127 yield Err(e);
128 break;
129 }
130 }
131 }
132 };
133
134 Ok(Box::pin(event_stream) as Pin<Box<dyn Stream<Item = Result<StreamEvent, LlmError>> + Send>>)
135 })
136 }
137}