agent_core/client/providers/anthropic/
mod.rs

1mod sse;
2mod types;
3
4use async_stream::stream;
5use futures::Stream;
6
7use crate::client::error::LlmError;
8use crate::client::http::HttpClient;
9use crate::client::models::{Message, MessageOptions, StreamEvent};
10use crate::client::traits::LlmProvider;
11use std::future::Future;
12use std::pin::Pin;
13
/// Provider configuration for talking to the Anthropic Claude API.
///
/// Holds the two pieces of per-provider state that every request needs:
/// the credential and the model to address.
pub struct AnthropicProvider {
    /// Secret API key sent with each request to authenticate the caller.
    pub api_key: String,
    /// Name of the model to query, for example "claude-3-5-sonnet-20241022".
    pub model: String,
}
21
22impl AnthropicProvider {
23    /// Create a new Anthropic provider with API key and model.
24    pub fn new(api_key: String, model: String) -> Self {
25        Self { api_key, model }
26    }
27}
28
29impl LlmProvider for AnthropicProvider {
30    fn send_msg(
31        &self,
32        client: &HttpClient,
33        messages: &[Message],
34        options: &MessageOptions,
35    ) -> Pin<Box<dyn Future<Output = Result<Message, LlmError>> + Send>> {
36        // Clone data for the async block
37        let client = client.clone();
38        let api_key = self.api_key.clone();
39        let model = self.model.clone();
40        let messages = messages.to_vec();
41        let options = options.clone();
42
43        Box::pin(async move {
44            // Build request body
45            let body = types::build_request_body(&messages, &options, &model)?;
46
47            // Get headers
48            let headers = types::get_request_headers(&api_key);
49            let headers_ref: Vec<(&str, &str)> = headers
50                .iter()
51                .map(|(k, v)| (*k, v.as_str()))
52                .collect();
53
54            // Make the API call
55            let response = client
56                .post(types::get_api_url(), &headers_ref, &body)
57                .await?;
58
59            // Parse and return the response
60            types::parse_response(&response)
61        })
62    }
63
64    fn send_msg_stream(
65        &self,
66        client: &HttpClient,
67        messages: &[Message],
68        options: &MessageOptions,
69    ) -> Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, LlmError>> + Send>>, LlmError>> + Send>> {
70        // Clone data for the async block
71        let client = client.clone();
72        let api_key = self.api_key.clone();
73        let model = self.model.clone();
74        let messages = messages.to_vec();
75        let options = options.clone();
76
77        Box::pin(async move {
78            // Build streaming request body
79            let body = types::build_streaming_request_body(&messages, &options, &model)?;
80
81            // Get headers
82            let headers = types::get_request_headers(&api_key);
83            let headers_ref: Vec<(&str, &str)> = headers
84                .iter()
85                .map(|(k, v)| (*k, v.as_str()))
86                .collect();
87
88            // Make the streaming API call
89            let byte_stream = client
90                .post_stream(types::get_api_url(), &headers_ref, &body)
91                .await?;
92
93            // Convert byte stream to SSE events stream
94            use futures::StreamExt;
95            let event_stream = stream! {
96                let mut buffer = String::new();
97                let mut byte_stream = byte_stream;
98
99                while let Some(chunk_result) = byte_stream.next().await {
100                    match chunk_result {
101                        Ok(bytes) => {
102                            // Append new bytes to buffer
103                            if let Ok(text) = std::str::from_utf8(&bytes) {
104                                buffer.push_str(text);
105                            } else {
106                                yield Err(LlmError::new("SSE_DECODE_ERROR", "Invalid UTF-8 in stream"));
107                                break;
108                            }
109
110                            // Parse complete SSE events from buffer
111                            let (events, remaining) = sse::parse_sse_chunk(&buffer);
112                            buffer = remaining;
113
114                            // Convert and yield each SSE event
115                            for sse_event in events {
116                                match sse::parse_stream_event(&sse_event) {
117                                    Ok(Some(stream_event)) => yield Ok(stream_event),
118                                    Ok(None) => {} // Skip unknown events
119                                    Err(e) => {
120                                        yield Err(e);
121                                        return;
122                                    }
123                                }
124                            }
125                        }
126                        Err(e) => {
127                            yield Err(e);
128                            break;
129                        }
130                    }
131                }
132            };
133
134            Ok(Box::pin(event_stream) as Pin<Box<dyn Stream<Item = Result<StreamEvent, LlmError>> + Send>>)
135        })
136    }
137}