agentkit_provider_anthropic/
lib.rs1mod config;
31mod error;
32mod media;
33mod request;
34mod response;
35mod server_tool;
36mod sse;
37mod stream;
38
39use std::collections::{BTreeSet, VecDeque};
40use std::sync::Arc;
41
42use agentkit_core::TurnCancellation;
43use agentkit_http::{BodyStream, Http, HttpError, HttpRequestBuilder};
44use agentkit_loop::{
45 LoopError, ModelAdapter, ModelSession, ModelTurn, ModelTurnEvent, SessionConfig, TurnRequest,
46};
47use async_trait::async_trait;
48use futures_util::StreamExt;
49use futures_util::future::{Either, select};
50
51use crate::stream::{EventTranslator, SseDecoder};
52
53pub use crate::config::{
54 AnthropicConfig, AnthropicMcpServer, DEFAULT_ANTHROPIC_VERSION, DEFAULT_ENDPOINT, OutputEffort,
55 OutputFormat, ServiceTier, ThinkingConfig, ToolChoice,
56};
57pub use crate::error::AnthropicError;
58pub use crate::server_tool::{
59 BashCodeExecutionTool, CodeExecutionTool, DEFAULT_BASH_EXECUTION_VERSION,
60 DEFAULT_CODE_EXECUTION_VERSION, DEFAULT_TEXT_EDITOR_EXECUTION_VERSION,
61 DEFAULT_WEB_FETCH_VERSION, DEFAULT_WEB_SEARCH_VERSION, RawServerTool, ServerTool,
62 ServerToolHandle, TextEditorCodeExecutionTool, WebFetchTool, WebSearchTool, boxed,
63};
64
65#[derive(Clone)]
68pub struct AnthropicAdapter {
69 client: Http,
70 config: Arc<AnthropicConfig>,
71}
72
73impl AnthropicAdapter {
74 pub fn new(config: AnthropicConfig) -> Result<Self, AnthropicError> {
77 let client = reqwest::Client::builder()
78 .build()
79 .map(Http::new)
80 .map_err(|error| AnthropicError::HttpClient(HttpError::request(error)))?;
81 Ok(Self {
82 client,
83 config: Arc::new(config),
84 })
85 }
86
87 pub fn with_client(config: AnthropicConfig, client: Http) -> Self {
89 Self {
90 client,
91 config: Arc::new(config),
92 }
93 }
94}
95
96pub struct AnthropicSession {
98 client: Http,
99 config: Arc<AnthropicConfig>,
100 _session_config: SessionConfig,
101}
102
103pub struct AnthropicTurn {
109 inner: TurnInner,
110}
111
112enum TurnInner {
113 Buffered { events: VecDeque<ModelTurnEvent> },
115 Streaming(Box<StreamingState>),
120}
121
122struct StreamingState {
123 body: BodyStream,
124 decoder: SseDecoder,
125 translator: EventTranslator,
126 pending: VecDeque<ModelTurnEvent>,
127 eof: bool,
128}
129
130#[async_trait]
131impl ModelAdapter for AnthropicAdapter {
132 type Session = AnthropicSession;
133
134 async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError> {
135 Ok(AnthropicSession {
136 client: self.client.clone(),
137 config: self.config.clone(),
138 _session_config: config,
139 })
140 }
141
142 fn provider_name(&self) -> Option<&str> {
143 Some("anthropic")
144 }
145}
146
147#[async_trait]
148impl ModelSession for AnthropicSession {
149 type Turn = AnthropicTurn;
150
151 async fn begin_turn(
152 &mut self,
153 turn_request: TurnRequest,
154 cancellation: Option<TurnCancellation>,
155 ) -> Result<AnthropicTurn, LoopError> {
156 let config = self.config.clone();
157
158 let request_future = async move {
159 let body = request::build_request_body(&config, &turn_request)
160 .map_err(|e| LoopError::Provider(e.to_string()))?;
161
162 let betas = collect_beta_flags(&config);
163
164 let mut http = self
165 .client
166 .post(&config.base_url)
167 .header("Content-Type", "application/json")
168 .header("anthropic-version", config.anthropic_version.as_str());
169
170 http = attach_auth(http, &config)?;
171
172 if !betas.is_empty() {
173 let joined = betas.into_iter().collect::<Vec<_>>().join(",");
174 http = http.header("anthropic-beta", joined);
175 }
176
177 http = http.header(
178 "User-Agent",
179 concat!("agentkit-provider-anthropic/", env!("CARGO_PKG_VERSION")),
180 );
181
182 if config.streaming {
183 http = http.header("Accept", "text/event-stream");
184 }
185
186 let response = http.json(&body).send().await.map_err(|error| {
187 LoopError::Provider(format!("Anthropic request failed: {error}"))
188 })?;
189
190 let status = response.status();
191
192 if !status.is_success() {
193 let body_text = response.text().await.unwrap_or_default();
196 return Err(LoopError::Provider(format!(
197 "Anthropic request failed with status {status}: {body_text}"
198 )));
199 }
200
201 if config.streaming {
202 Ok(AnthropicTurn {
203 inner: TurnInner::Streaming(Box::new(StreamingState {
204 body: response.bytes_stream(),
205 decoder: SseDecoder::new(),
206 translator: EventTranslator::new(),
207 pending: VecDeque::new(),
208 eof: false,
209 })),
210 })
211 } else {
212 let body_text = response.text().await.map_err(|error| {
213 LoopError::Provider(format!("failed to read Anthropic response body: {error}"))
214 })?;
215
216 let events = response::build_turn_from_response(&body_text)
217 .map_err(|e| LoopError::Provider(e.to_string()))?;
218 Ok(AnthropicTurn {
219 inner: TurnInner::Buffered { events },
220 })
221 }
222 };
223
224 if let Some(cancellation) = cancellation {
225 futures_util::pin_mut!(request_future);
226 let cancelled = cancellation.cancelled();
227 futures_util::pin_mut!(cancelled);
228 match select(request_future, cancelled).await {
229 Either::Left((result, _)) => result,
230 Either::Right((_, _)) => Err(LoopError::Cancelled),
231 }
232 } else {
233 request_future.await
234 }
235 }
236
237 fn model_name(&self) -> Option<&str> {
238 Some(&self.config.model)
239 }
240}
241
242#[async_trait]
243impl ModelTurn for AnthropicTurn {
244 async fn next_event(
245 &mut self,
246 cancellation: Option<TurnCancellation>,
247 ) -> Result<Option<ModelTurnEvent>, LoopError> {
248 if cancellation
249 .as_ref()
250 .is_some_and(TurnCancellation::is_cancelled)
251 {
252 return Err(LoopError::Cancelled);
253 }
254 match &mut self.inner {
255 TurnInner::Buffered { events } => Ok(events.pop_front()),
256 TurnInner::Streaming(state) => {
257 let StreamingState {
258 body,
259 decoder,
260 translator,
261 pending,
262 eof,
263 } = state.as_mut();
264 next_streaming_event(body, decoder, translator, pending, eof, cancellation).await
265 }
266 }
267 }
268}
269
270async fn next_streaming_event(
274 body: &mut BodyStream,
275 decoder: &mut SseDecoder,
276 translator: &mut EventTranslator,
277 pending: &mut VecDeque<ModelTurnEvent>,
278 eof: &mut bool,
279 cancellation: Option<TurnCancellation>,
280) -> Result<Option<ModelTurnEvent>, LoopError> {
281 loop {
282 if let Some(event) = pending.pop_front() {
283 return Ok(Some(event));
284 }
285 if *eof || translator.is_done() {
286 return Ok(None);
287 }
288
289 let chunk = if let Some(cancellation) = cancellation.as_ref() {
292 let next = body.next();
293 futures_util::pin_mut!(next);
294 let cancelled = cancellation.cancelled();
295 futures_util::pin_mut!(cancelled);
296 match select(next, cancelled).await {
297 Either::Left((chunk, _)) => chunk,
298 Either::Right((_, _)) => return Err(LoopError::Cancelled),
299 }
300 } else {
301 body.next().await
302 };
303
304 match chunk {
305 Some(Ok(bytes)) => {
306 let text = std::str::from_utf8(&bytes).map_err(|e| {
307 LoopError::Provider(format!("invalid UTF-8 in Anthropic stream: {e}"))
308 })?;
309 for sse in decoder.feed(text) {
310 for produced in translator.handle(&sse)? {
311 pending.push_back(produced);
312 }
313 }
314 }
315 Some(Err(e)) => {
316 return Err(LoopError::Provider(format!(
317 "Anthropic stream body error: {e}"
318 )));
319 }
320 None => {
321 *eof = true;
322 }
323 }
324 }
325}
326
327fn attach_auth(
328 builder: HttpRequestBuilder,
329 config: &AnthropicConfig,
330) -> Result<HttpRequestBuilder, LoopError> {
331 if let Some(token) = &config.auth_token {
332 return Ok(builder.bearer_auth(token));
333 }
334 if let Some(key) = &config.api_key {
335 return Ok(builder.header("x-api-key", key.as_str()));
336 }
337 Err(LoopError::Provider(
338 AnthropicError::MissingCredentials.to_string(),
339 ))
340}
341
342fn collect_beta_flags(config: &AnthropicConfig) -> BTreeSet<String> {
343 let mut betas: BTreeSet<String> = config.anthropic_beta.iter().cloned().collect();
344 for tool in &config.server_tools {
345 for flag in tool.beta_flags() {
346 betas.insert(flag);
347 }
348 }
349 betas
350}
351
352#[cfg(test)]
353mod tests {
354 use agentkit_core::{CancellationController, FinishReason};
355 use agentkit_http::HttpError;
356 use bytes::Bytes;
357 use futures_util::stream;
358
359 use super::*;
360
361 #[test]
362 fn rejects_zero_max_tokens() {
363 match AnthropicConfig::new("k", "claude-opus-4-7", 0) {
364 Err(AnthropicError::InvalidMaxTokens) => {}
365 other => panic!("expected InvalidMaxTokens, got {:?}", other.map(|_| ())),
366 }
367 }
368
369 #[test]
370 fn beta_flags_union_includes_server_tool_requirements() {
371 let cfg = AnthropicConfig::new("k", "claude-opus-4-7", 1024)
372 .unwrap()
373 .with_beta("extended-thinking-2025-05-07")
374 .with_server_tool(boxed(
375 RawServerTool::new(serde_json::json!({
376 "type": "future_tool_20271231",
377 "name": "future_tool",
378 }))
379 .with_beta("future-tool-2027-12-31"),
380 ));
381 let flags = collect_beta_flags(&cfg);
382 assert!(flags.contains("extended-thinking-2025-05-07"));
383 assert!(flags.contains("future-tool-2027-12-31"));
384 }
385
386 fn streaming_turn_from(chunks: Vec<&'static str>) -> AnthropicTurn {
390 let body: BodyStream = Box::pin(stream::iter(
391 chunks
392 .into_iter()
393 .map(|c| Ok::<_, HttpError>(Bytes::from_static(c.as_bytes()))),
394 ));
395 AnthropicTurn {
396 inner: TurnInner::Streaming(Box::new(StreamingState {
397 body,
398 decoder: SseDecoder::new(),
399 translator: EventTranslator::new(),
400 pending: VecDeque::new(),
401 eof: false,
402 })),
403 }
404 }
405
406 #[tokio::test(flavor = "current_thread")]
407 async fn streaming_turn_drains_to_finished() {
408 let chunks = vec![
409 "event: message_start\ndata: {\"message\":{\"id\":\"m\",\"model\":\"x\",\"usage\":{\"input_tokens\":1,\"output_tokens\":0}}}\n\n",
410 "event: content_block_start\ndata: {\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
411 "event: content_block_delta\ndata: {\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n",
412 "event: content_block_stop\ndata: {\"index\":0}\n\n",
413 "event: message_delta\ndata: {\"delta\":{\"stop_reason\":\"end_turn\"},\"usage\":{\"output_tokens\":1}}\n\n",
414 "event: message_stop\ndata: {}\n\n",
415 ];
416 let mut turn = streaming_turn_from(chunks);
417
418 let mut seen_finished = false;
419 while let Some(event) = turn.next_event(None).await.expect("next_event") {
420 if let ModelTurnEvent::Finished(result) = event {
421 assert_eq!(result.finish_reason, FinishReason::Completed);
422 seen_finished = true;
423 }
424 }
425 assert!(seen_finished, "turn never emitted Finished");
426 }
427
428 #[tokio::test(flavor = "current_thread")]
429 async fn streaming_turn_respects_pre_fired_cancellation() {
430 let chunks = vec![
431 "event: message_start\ndata: {\"message\":{\"id\":\"m\",\"model\":\"x\",\"usage\":{\"input_tokens\":1,\"output_tokens\":0}}}\n\n",
432 ];
433 let mut turn = streaming_turn_from(chunks);
434
435 let controller = CancellationController::new();
436 let checkpoint = TurnCancellation::new(controller.handle());
437 controller.interrupt();
439
440 let err = turn.next_event(Some(checkpoint)).await.unwrap_err();
441 assert!(matches!(err, LoopError::Cancelled));
442 }
443}