Skip to main content

agentkit_provider_anthropic/
lib.rs

1//! Anthropic Messages API adapter for the agentkit agent loop.
2//!
3//! This crate implements the agentkit [`ModelAdapter`] directly against
4//! Anthropic's `/v1/messages` endpoint. The API is not OpenAI-compatible
5//! (different message shape, `system` is top-level, tool results live as
6//! content blocks inside user messages, etc.), so the generic completions
7//! adapter is not reused.
8//!
9//! Streaming is on by default: the adapter consumes Anthropic's SSE response
10//! and yields `ModelTurnEvent`s as tokens arrive. Call
11//! [`AnthropicConfig::with_streaming(false)`] to opt out in favour of a single
12//! buffered request.
13//!
14//! # Quick start
15//!
16//! ```rust,ignore
17//! use agentkit_loop::{Agent, SessionConfig};
18//! use agentkit_provider_anthropic::{AnthropicAdapter, AnthropicConfig};
19//!
20//! #[tokio::main]
21//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
22//!     let config = AnthropicConfig::from_env()?;
23//!     let adapter = AnthropicAdapter::new(config)?;
24//!     let agent = Agent::builder().model(adapter).build()?;
25//!     let _driver = agent.start(SessionConfig::new("demo")).await?;
26//!     Ok(())
27//! }
28//! ```
29
30mod config;
31mod error;
32mod media;
33mod request;
34mod response;
35mod server_tool;
36mod sse;
37mod stream;
38
39use std::collections::{BTreeSet, VecDeque};
40use std::sync::Arc;
41
42use agentkit_core::TurnCancellation;
43use agentkit_http::{BodyStream, Http, HttpError, HttpRequestBuilder};
44use agentkit_loop::{
45    LoopError, ModelAdapter, ModelSession, ModelTurn, ModelTurnEvent, SessionConfig, TurnRequest,
46};
47use async_trait::async_trait;
48use futures_util::StreamExt;
49use futures_util::future::{Either, select};
50
51use crate::stream::{EventTranslator, SseDecoder};
52
53pub use crate::config::{
54    AnthropicConfig, AnthropicMcpServer, DEFAULT_ANTHROPIC_VERSION, DEFAULT_ENDPOINT, OutputEffort,
55    OutputFormat, ServiceTier, ThinkingConfig, ToolChoice,
56};
57pub use crate::error::AnthropicError;
58pub use crate::server_tool::{
59    BashCodeExecutionTool, CodeExecutionTool, DEFAULT_BASH_EXECUTION_VERSION,
60    DEFAULT_CODE_EXECUTION_VERSION, DEFAULT_TEXT_EDITOR_EXECUTION_VERSION,
61    DEFAULT_WEB_FETCH_VERSION, DEFAULT_WEB_SEARCH_VERSION, RawServerTool, ServerTool,
62    ServerToolHandle, TextEditorCodeExecutionTool, WebFetchTool, WebSearchTool, boxed,
63};
64
65/// Model adapter that connects the agentkit agent loop to the Anthropic
66/// Messages API.
67#[derive(Clone)]
68pub struct AnthropicAdapter {
69    client: Http,
70    config: Arc<AnthropicConfig>,
71}
72
73impl AnthropicAdapter {
74    /// Creates a new adapter from the given configuration, building a default
75    /// reqwest-backed HTTP client.
76    pub fn new(config: AnthropicConfig) -> Result<Self, AnthropicError> {
77        let client = reqwest::Client::builder()
78            .build()
79            .map(Http::new)
80            .map_err(|error| AnthropicError::HttpClient(HttpError::request(error)))?;
81        Ok(Self {
82            client,
83            config: Arc::new(config),
84        })
85    }
86
87    /// Creates a new adapter using a pre-configured [`Http`] client.
88    pub fn with_client(config: AnthropicConfig, client: Http) -> Self {
89        Self {
90            client,
91            config: Arc::new(config),
92        }
93    }
94}
95
96/// An active session with the Anthropic Messages API.
97pub struct AnthropicSession {
98    client: Http,
99    config: Arc<AnthropicConfig>,
100    _session_config: SessionConfig,
101}
102
103/// A turn in progress against the Messages API.
104///
105/// Either runs in buffered (full-JSON) or streaming (SSE) mode depending on
106/// [`AnthropicConfig::streaming`]. The variant is private because the
107/// streaming state carries opaque decoder/translator types.
108pub struct AnthropicTurn {
109    inner: TurnInner,
110}
111
112enum TurnInner {
113    /// Buffered, non-streaming mode.
114    Buffered { events: VecDeque<ModelTurnEvent> },
115    /// Live SSE stream in progress. Boxed because [`EventTranslator`] carries
116    /// a fairly large state machine and SSE responses are a small fraction of
117    /// total turns; keeping the enum compact avoids a ~350B stack cost on the
118    /// buffered path.
119    Streaming(Box<StreamingState>),
120}
121
122struct StreamingState {
123    body: BodyStream,
124    decoder: SseDecoder,
125    translator: EventTranslator,
126    pending: VecDeque<ModelTurnEvent>,
127    eof: bool,
128}
129
130#[async_trait]
131impl ModelAdapter for AnthropicAdapter {
132    type Session = AnthropicSession;
133
134    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError> {
135        Ok(AnthropicSession {
136            client: self.client.clone(),
137            config: self.config.clone(),
138            _session_config: config,
139        })
140    }
141
142    fn provider_name(&self) -> Option<&str> {
143        Some("anthropic")
144    }
145}
146
147#[async_trait]
148impl ModelSession for AnthropicSession {
149    type Turn = AnthropicTurn;
150
151    async fn begin_turn(
152        &mut self,
153        turn_request: TurnRequest,
154        cancellation: Option<TurnCancellation>,
155    ) -> Result<AnthropicTurn, LoopError> {
156        let config = self.config.clone();
157
158        let request_future = async move {
159            let body = request::build_request_body(&config, &turn_request)
160                .map_err(|e| LoopError::Provider(e.to_string()))?;
161
162            let betas = collect_beta_flags(&config);
163
164            let mut http = self
165                .client
166                .post(&config.base_url)
167                .header("Content-Type", "application/json")
168                .header("anthropic-version", config.anthropic_version.as_str());
169
170            http = attach_auth(http, &config)?;
171
172            if !betas.is_empty() {
173                let joined = betas.into_iter().collect::<Vec<_>>().join(",");
174                http = http.header("anthropic-beta", joined);
175            }
176
177            http = http.header(
178                "User-Agent",
179                concat!("agentkit-provider-anthropic/", env!("CARGO_PKG_VERSION")),
180            );
181
182            if config.streaming {
183                http = http.header("Accept", "text/event-stream");
184            }
185
186            let response = http.json(&body).send().await.map_err(|error| {
187                LoopError::Provider(format!("Anthropic request failed: {error}"))
188            })?;
189
190            let status = response.status();
191
192            if !status.is_success() {
193                // Drain the body for the error message, regardless of mode —
194                // the server typically returns JSON error details here.
195                let body_text = response.text().await.unwrap_or_default();
196                return Err(LoopError::Provider(format!(
197                    "Anthropic request failed with status {status}: {body_text}"
198                )));
199            }
200
201            if config.streaming {
202                Ok(AnthropicTurn {
203                    inner: TurnInner::Streaming(Box::new(StreamingState {
204                        body: response.bytes_stream(),
205                        decoder: SseDecoder::new(),
206                        translator: EventTranslator::new(),
207                        pending: VecDeque::new(),
208                        eof: false,
209                    })),
210                })
211            } else {
212                let body_text = response.text().await.map_err(|error| {
213                    LoopError::Provider(format!("failed to read Anthropic response body: {error}"))
214                })?;
215
216                let events = response::build_turn_from_response(&body_text)
217                    .map_err(|e| LoopError::Provider(e.to_string()))?;
218                Ok(AnthropicTurn {
219                    inner: TurnInner::Buffered { events },
220                })
221            }
222        };
223
224        if let Some(cancellation) = cancellation {
225            futures_util::pin_mut!(request_future);
226            let cancelled = cancellation.cancelled();
227            futures_util::pin_mut!(cancelled);
228            match select(request_future, cancelled).await {
229                Either::Left((result, _)) => result,
230                Either::Right((_, _)) => Err(LoopError::Cancelled),
231            }
232        } else {
233            request_future.await
234        }
235    }
236
237    fn model_name(&self) -> Option<&str> {
238        Some(&self.config.model)
239    }
240}
241
242#[async_trait]
243impl ModelTurn for AnthropicTurn {
244    async fn next_event(
245        &mut self,
246        cancellation: Option<TurnCancellation>,
247    ) -> Result<Option<ModelTurnEvent>, LoopError> {
248        if cancellation
249            .as_ref()
250            .is_some_and(TurnCancellation::is_cancelled)
251        {
252            return Err(LoopError::Cancelled);
253        }
254        match &mut self.inner {
255            TurnInner::Buffered { events } => Ok(events.pop_front()),
256            TurnInner::Streaming(state) => {
257                let StreamingState {
258                    body,
259                    decoder,
260                    translator,
261                    pending,
262                    eof,
263                } = state.as_mut();
264                next_streaming_event(body, decoder, translator, pending, eof, cancellation).await
265            }
266        }
267    }
268}
269
270/// Pulls the next event from an active SSE stream, decoding more bytes as
271/// needed. Returns `Ok(None)` once the translator has emitted `Finished` and
272/// the pending queue is empty.
273async fn next_streaming_event(
274    body: &mut BodyStream,
275    decoder: &mut SseDecoder,
276    translator: &mut EventTranslator,
277    pending: &mut VecDeque<ModelTurnEvent>,
278    eof: &mut bool,
279    cancellation: Option<TurnCancellation>,
280) -> Result<Option<ModelTurnEvent>, LoopError> {
281    loop {
282        if let Some(event) = pending.pop_front() {
283            return Ok(Some(event));
284        }
285        if *eof || translator.is_done() {
286            return Ok(None);
287        }
288
289        // Await the next chunk, racing against cancellation so long-lived
290        // streams can be interrupted mid-response.
291        let chunk = if let Some(cancellation) = cancellation.as_ref() {
292            let next = body.next();
293            futures_util::pin_mut!(next);
294            let cancelled = cancellation.cancelled();
295            futures_util::pin_mut!(cancelled);
296            match select(next, cancelled).await {
297                Either::Left((chunk, _)) => chunk,
298                Either::Right((_, _)) => return Err(LoopError::Cancelled),
299            }
300        } else {
301            body.next().await
302        };
303
304        match chunk {
305            Some(Ok(bytes)) => {
306                let text = std::str::from_utf8(&bytes).map_err(|e| {
307                    LoopError::Provider(format!("invalid UTF-8 in Anthropic stream: {e}"))
308                })?;
309                for sse in decoder.feed(text) {
310                    for produced in translator.handle(&sse)? {
311                        pending.push_back(produced);
312                    }
313                }
314            }
315            Some(Err(e)) => {
316                return Err(LoopError::Provider(format!(
317                    "Anthropic stream body error: {e}"
318                )));
319            }
320            None => {
321                *eof = true;
322            }
323        }
324    }
325}
326
327fn attach_auth(
328    builder: HttpRequestBuilder,
329    config: &AnthropicConfig,
330) -> Result<HttpRequestBuilder, LoopError> {
331    if let Some(token) = &config.auth_token {
332        return Ok(builder.bearer_auth(token));
333    }
334    if let Some(key) = &config.api_key {
335        return Ok(builder.header("x-api-key", key.as_str()));
336    }
337    Err(LoopError::Provider(
338        AnthropicError::MissingCredentials.to_string(),
339    ))
340}
341
342fn collect_beta_flags(config: &AnthropicConfig) -> BTreeSet<String> {
343    let mut betas: BTreeSet<String> = config.anthropic_beta.iter().cloned().collect();
344    for tool in &config.server_tools {
345        for flag in tool.beta_flags() {
346            betas.insert(flag);
347        }
348    }
349    betas
350}
351
#[cfg(test)]
mod tests {
    use agentkit_core::{CancellationController, FinishReason};
    use agentkit_http::HttpError;
    use bytes::Bytes;
    use futures_util::stream;

    use super::*;

    /// A zero `max_tokens` must be refused when the config is constructed.
    #[test]
    fn rejects_zero_max_tokens() {
        let outcome = AnthropicConfig::new("k", "claude-opus-4-7", 0).map(|_| ());
        assert!(
            matches!(outcome, Err(AnthropicError::InvalidMaxTokens)),
            "expected InvalidMaxTokens, got {:?}",
            outcome
        );
    }

    /// Flags set on the config and flags required by server tools both end
    /// up in the collected set.
    #[test]
    fn beta_flags_union_includes_server_tool_requirements() {
        let future_tool = RawServerTool::new(serde_json::json!({
            "type": "future_tool_20271231",
            "name": "future_tool",
        }))
        .with_beta("future-tool-2027-12-31");

        let cfg = AnthropicConfig::new("k", "claude-opus-4-7", 1024)
            .unwrap()
            .with_beta("extended-thinking-2025-05-07")
            .with_server_tool(boxed(future_tool));

        let flags = collect_beta_flags(&cfg);
        assert!(flags.contains("extended-thinking-2025-05-07"));
        assert!(flags.contains("future-tool-2027-12-31"));
    }

    /// Wraps canned chunks in a streaming `AnthropicTurn` so the whole
    /// decode -> translate -> yield path runs without an HTTP connection.
    fn streaming_turn_from(chunks: Vec<&'static str>) -> AnthropicTurn {
        let canned = chunks
            .into_iter()
            .map(|c| Ok::<_, HttpError>(Bytes::from_static(c.as_bytes())));
        let body: BodyStream = Box::pin(stream::iter(canned));

        let state = StreamingState {
            body,
            decoder: SseDecoder::new(),
            translator: EventTranslator::new(),
            pending: VecDeque::new(),
            eof: false,
        };
        AnthropicTurn {
            inner: TurnInner::Streaming(Box::new(state)),
        }
    }

    /// A complete, well-formed stream ends with a `Finished` event whose
    /// finish reason is `Completed`.
    #[tokio::test(flavor = "current_thread")]
    async fn streaming_turn_drains_to_finished() {
        let mut turn = streaming_turn_from(vec![
            "event: message_start\ndata: {\"message\":{\"id\":\"m\",\"model\":\"x\",\"usage\":{\"input_tokens\":1,\"output_tokens\":0}}}\n\n",
            "event: content_block_start\ndata: {\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
            "event: content_block_delta\ndata: {\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n",
            "event: content_block_stop\ndata: {\"index\":0}\n\n",
            "event: message_delta\ndata: {\"delta\":{\"stop_reason\":\"end_turn\"},\"usage\":{\"output_tokens\":1}}\n\n",
            "event: message_stop\ndata: {}\n\n",
        ]);

        let mut seen_finished = false;
        while let Some(event) = turn.next_event(None).await.expect("next_event") {
            match event {
                ModelTurnEvent::Finished(result) => {
                    assert_eq!(result.finish_reason, FinishReason::Completed);
                    seen_finished = true;
                }
                _ => {}
            }
        }
        assert!(seen_finished, "turn never emitted Finished");
    }

    /// A cancellation that fired before the first poll wins over any data
    /// the stream already has available.
    #[tokio::test(flavor = "current_thread")]
    async fn streaming_turn_respects_pre_fired_cancellation() {
        let mut turn = streaming_turn_from(vec![
            "event: message_start\ndata: {\"message\":{\"id\":\"m\",\"model\":\"x\",\"usage\":{\"input_tokens\":1,\"output_tokens\":0}}}\n\n",
        ]);

        let controller = CancellationController::new();
        let checkpoint = TurnCancellation::new(controller.handle());
        // Interrupt first, poll second.
        controller.interrupt();

        let outcome = turn.next_event(Some(checkpoint)).await;
        assert!(matches!(outcome.unwrap_err(), LoopError::Cancelled));
    }
}
443}