Skip to main content

systemprompt_api/services/gateway/protocol/outbound/anthropic/
mod.rs

1//! Outbound adapter targeting the Anthropic Messages API.
2//!
3//! [`AnthropicOutbound`] builds a Messages request from the canonical model,
4//! sends it upstream, and returns either a buffered [`CanonicalResponse`] or a
5//! stream of canonical events translated from the Anthropic SSE format.
6
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use serde_json::Value;
10use systemprompt_models::wire::anthropic;
11
12use super::super::canonical_response::CanonicalResponse;
13use super::{
14    OutboundAdapter, OutboundCtx, OutboundOutcome, UpstreamError, extract_upstream_message,
15};
16
17mod request;
18mod response;
19mod streaming;
20
/// Re-exports of this adapter's otherwise-private building blocks.
///
/// Only compiled when the `test-api` feature is enabled; presumably intended
/// for tests living outside this module (confirm against the crate's
/// feature documentation).
#[cfg(feature = "test-api")]
pub mod test_api {
    pub use super::{
        request::build_request_body, response::parse_response,
        streaming::sse_to_canonical_events,
    };
}
27
/// Stateless outbound adapter for the Anthropic Messages API.
///
/// The type carries no fields, so it is freely copyable and can be created
/// with [`Default::default`] or by naming the unit struct directly.
#[derive(Clone, Copy, Debug, Default)]
pub struct AnthropicOutbound;
30
31#[async_trait]
32impl OutboundAdapter for AnthropicOutbound {
33    fn provider_tag(&self) -> &'static str {
34        "anthropic"
35    }
36
37    async fn send(&self, ctx: OutboundCtx<'_>) -> Result<OutboundOutcome> {
38        let body = request::build_request_body(ctx.request, ctx.upstream_model);
39        let url = format!("{}/messages", ctx.endpoint.trim_end_matches('/'));
40
41        let client = reqwest::Client::new();
42        let mut req = client.post(&url).json(&body);
43        for (name, value) in anthropic::auth_headers(ctx.api_key) {
44            req = req.header(name, value);
45        }
46        for (name, value) in &ctx.route.extra_headers {
47            req = req.header(name.as_str(), value.as_str());
48        }
49        let upstream_response = req.send().await.map_err(|e| {
50            anyhow::Error::new(UpstreamError::Transport {
51                provider: self.provider_tag(),
52                source: e,
53            })
54        })?;
55
56        let status = upstream_response.status();
57
58        if ctx.request.stream {
59            if !status.is_success() {
60                let err = upstream_response
61                    .text()
62                    .await
63                    .unwrap_or_else(|e| format!("<failed to read upstream body: {e}>"));
64                return Err(anyhow::Error::new(UpstreamError::Status {
65                    provider: self.provider_tag(),
66                    status: status.as_u16(),
67                    message: extract_upstream_message(&err),
68                }));
69            }
70            let stream = upstream_response.bytes_stream();
71            let event_stream = streaming::sse_to_canonical_events(stream);
72            return Ok(OutboundOutcome::Streaming(event_stream));
73        }
74
75        if !status.is_success() {
76            let err = upstream_response
77                .text()
78                .await
79                .unwrap_or_else(|e| format!("<failed to read upstream body: {e}>"));
80            return Err(anyhow::Error::new(UpstreamError::Status {
81                provider: self.provider_tag(),
82                status: status.as_u16(),
83                message: extract_upstream_message(&err),
84            }));
85        }
86
87        let bytes = upstream_response
88            .bytes()
89            .await
90            .map_err(|e| anyhow!("Failed to read Anthropic response: {e}"))?;
91        let value: Value = serde_json::from_slice(&bytes)
92            .map_err(|e| anyhow!("Anthropic response not valid JSON: {e}"))?;
93        let canon: CanonicalResponse = response::parse_response(&value, ctx.request.model.as_str());
94        Ok(OutboundOutcome::Buffered(Box::new(canon)))
95    }
96}