use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::adapter::{
blake3_hex, BoxStream, LlmAdapter, LlmError, LlmRequest, LlmResponse, LlmRole, StreamChunk,
};
use crate::sensitivity::{check_remote_prompt_sensitivity, MaxSensitivity};
/// Dotted identifier string for the "API key missing" condition of the Claude adapter.
/// NOTE(review): not referenced in the visible part of this file — presumably consumed by
/// callers or reporting code elsewhere; confirm.
pub const CLAUDE_ADAPTER_API_KEY_MISSING_INVARIANT: &str = "cortex.run.claude.api_key_missing";
/// Dotted identifier string for the "model not allowed" condition of the Claude adapter.
/// NOTE(review): not referenced in the visible part of this file; confirm where it is used.
pub const CLAUDE_ADAPTER_MODEL_NOT_ALLOWED_INVARIANT: &str = "cortex.run.claude.model_not_allowed";
/// Dotted identifier string for the "endpoint rejected" condition of the Claude adapter.
/// NOTE(review): not referenced in the visible part of this file; confirm where it is used.
pub const CLAUDE_ADAPTER_ENDPOINT_REJECTED_INVARIANT: &str = "cortex.run.claude.endpoint_rejected";
/// HTTP adapter that sends LLM requests to the Anthropic Messages API.
///
/// Holds the API key read at construction time, the pinned model name, the
/// base URL requests are sent to, and the sensitivity ceiling applied to
/// prompts before they leave the process.
///
/// `Debug` is implemented by hand (not derived) so the API key never ends up
/// in logs, panic messages, or `{:?}` output.
#[derive(Clone)]
pub struct ClaudeHttpAdapter {
    /// Secret sent in the `x-api-key` header; redacted in `Debug` output.
    api_key: String,
    /// Model name placed in the request body.
    model: String,
    /// Base URL that `/v1/messages` is appended to.
    base_url: String,
    /// Ceiling passed to `check_remote_prompt_sensitivity`.
    max_sensitivity: MaxSensitivity,
}

impl std::fmt::Debug for ClaudeHttpAdapter {
    /// Formats the adapter like a derived `Debug` would, with the API key
    /// replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClaudeHttpAdapter")
            .field("api_key", &"<redacted>")
            .field("model", &self.model)
            .field("base_url", &self.base_url)
            .field("max_sensitivity", &self.max_sensitivity)
            .finish()
    }
}
impl ClaudeHttpAdapter {
    /// Default base URL of the Anthropic API.
    pub const ANTHROPIC_API_BASE: &'static str = "https://api.anthropic.com";
    /// Name of the environment variable the API key is read from.
    pub const ANTHROPIC_API_KEY_ENV: &'static str = "CORTEX_CLAUDE_API_KEY";
    /// Value sent in the `anthropic-version` request header.
    pub const ANTHROPIC_VERSION_HEADER: &'static str = "2023-06-01";

    /// Builds an adapter that targets [`Self::ANTHROPIC_API_BASE`].
    ///
    /// `max_sensitivity` defaults to `MaxSensitivity::Medium` when `None`.
    ///
    /// # Errors
    ///
    /// Returns `LlmError::InvalidRequest` when the API-key environment
    /// variable is absent or empty, when `model` is empty, or when `model`
    /// contains the substring `latest`.
    pub fn new(model: String, max_sensitivity: Option<MaxSensitivity>) -> Result<Self, LlmError> {
        // Delegate so both constructors share a single copy of the validation.
        Self::new_with_base_url(model, Self::ANTHROPIC_API_BASE.to_string(), max_sensitivity)
    }

    /// Builds an adapter that targets an arbitrary `base_url`.
    ///
    /// Performs the same validation as [`Self::new`], in the same order:
    /// API key, then empty model, then `latest` alias. Trailing `/`
    /// characters are trimmed from `base_url` so that appending
    /// `/v1/messages` never yields a double slash.
    ///
    /// # Errors
    ///
    /// Returns `LlmError::InvalidRequest` under the same conditions as
    /// [`Self::new`].
    #[doc(hidden)]
    pub fn new_with_base_url(
        model: String,
        base_url: String,
        max_sensitivity: Option<MaxSensitivity>,
    ) -> Result<Self, LlmError> {
        let api_key = std::env::var(Self::ANTHROPIC_API_KEY_ENV)
            .ok()
            .filter(|v| !v.is_empty())
            .ok_or_else(|| {
                LlmError::InvalidRequest(format!(
                    "env var {} is absent or empty; refusing to construct ClaudeHttpAdapter",
                    Self::ANTHROPIC_API_KEY_ENV
                ))
            })?;
        if model.is_empty() {
            return Err(LlmError::InvalidRequest(
                "model must not be empty".to_string(),
            ));
        }
        if model.contains("latest") {
            return Err(LlmError::InvalidRequest(format!(
                "model '{model}' contains 'latest' alias; pin to a specific version for audit-trail identity"
            )));
        }
        // Request URLs are built as `{base_url}/v1/messages`; drop trailing
        // slashes so "http://host/" does not become "http://host//v1/messages".
        let base_url = base_url.trim_end_matches('/').to_string();
        Ok(Self {
            api_key,
            model,
            base_url,
            max_sensitivity: max_sensitivity.unwrap_or(MaxSensitivity::Medium),
        })
    }
}
/// JSON body serialized and POSTed to `{base_url}/v1/messages`.
#[derive(Debug, Serialize)]
struct MessagesRequest<'a> {
// Model name, borrowed from the caller.
model: &'a str,
// Copied from `LlmRequest::max_tokens`.
max_tokens: u32,
// Conversation turns, borrowed from the `LlmRequest`.
messages: Vec<AnthropicMessage<'a>>,
// `false` in `call_claude`, `true` in `call_claude_streaming`.
stream: bool,
}
/// One event parsed from the JSON payload of a `data:` line in the streaming response.
#[derive(Debug, Deserialize)]
struct SseEvent {
// Read from the JSON key `type`; the parser acts on `content_block_delta` and `message_stop`.
#[serde(rename = "type")]
event_type: String,
// Absent in the payload => `None`.
#[serde(default)]
delta: Option<SseDelta>,
}
/// The `delta` object carried by a streaming event.
#[derive(Debug, Deserialize)]
struct SseDelta {
// Read from the JSON key `type`; only `text_delta` deltas are forwarded as chunks.
#[serde(rename = "type")]
delta_type: String,
// Defaults to the empty string when the key is missing.
#[serde(default)]
text: String,
}
/// One entry of the `messages` array in the request body.
#[derive(Debug, Serialize)]
struct AnthropicMessage<'a> {
// Role string produced by `LlmRole::as_anthropic_str`.
role: &'a str,
// Message text, borrowed from the originating request message.
content: &'a str,
}
/// Subset of the non-streaming response body that `call_claude` reads.
/// Every field falls back to its default when missing from the JSON.
#[derive(Debug, Deserialize)]
struct MessagesResponse {
// Content blocks; `call_claude` uses the first one whose type is `text`.
#[serde(default)]
content: Vec<ContentBlock>,
// Model name reported in the response; when empty, `call_claude` falls back to the requested model.
#[serde(default)]
model: String,
// Token accounting, if the response included it.
#[serde(default)]
usage: Option<AnthropicUsage>,
}
/// One element of the response `content` array.
#[derive(Debug, Deserialize)]
struct ContentBlock {
// Read from the JSON key `type`; compared against `"text"` by `call_claude`.
#[serde(rename = "type")]
block_type: String,
// Defaults to the empty string when the key is missing.
#[serde(default)]
text: String,
}
/// Token counts from the response `usage` object.
/// Neither field has a serde default, so both must be present when `usage` is.
#[derive(Debug, Deserialize)]
struct AnthropicUsage {
// Mapped to `TokenUsage::prompt_tokens`.
input_tokens: u32,
// Mapped to `TokenUsage::completion_tokens`.
output_tokens: u32,
}
#[async_trait]
impl LlmAdapter for ClaudeHttpAdapter {
fn adapter_id(&self) -> &'static str {
"claude"
}
async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
let prompt_text: String = std::iter::once(req.system.as_str())
.chain(req.messages.iter().map(|m| m.content.as_str()))
.collect::<Vec<_>>()
.join("\n");
check_remote_prompt_sensitivity(&prompt_text, self.max_sensitivity)?;
let api_key = self.api_key.clone();
let model = self.model.clone();
let base_url = self.base_url.clone();
let timeout_ms = req.timeout_ms;
let result = tokio::task::spawn_blocking(move || {
call_claude(&api_key, &model, &base_url, &req, timeout_ms)
})
.await
.map_err(|e| LlmError::Transport(format!("spawn_blocking join error: {e}")))?;
result
}
fn stream_boxed(&self, req: LlmRequest) -> BoxStream<'_> {
stream_claude_sse(
self.api_key.clone(),
self.model.clone(),
self.base_url.clone(),
req,
)
}
}
/// Performs one blocking, non-streaming POST to `{base_url}/v1/messages` and
/// converts the reply into an `LlmResponse`.
///
/// The request carries the model, `max_tokens`, the conversation messages and,
/// when it is not blank, the request's system prompt as the top-level `system`
/// field. `timeout_ms` is applied as the overall agent timeout.
///
/// The returned text is that of the first content block whose type is `text`;
/// `raw_hash` is the BLAKE3 hex digest of the raw response body; `model` is
/// the model name reported by the response, or the requested `model` when the
/// response omits it.
///
/// # Errors
///
/// * `LlmError::Transport` — request serialization or body read failure, or a
///   transport error that is not a timeout.
/// * `LlmError::Timeout` — transport error whose message looks like a timeout.
/// * `LlmError::Upstream` — any non-200 HTTP status.
/// * `LlmError::Parse` — body is not the expected JSON, or has no text block.
fn call_claude(
    api_key: &str,
    model: &str,
    base_url: &str,
    req: &LlmRequest,
    timeout_ms: u64,
) -> Result<LlmResponse, LlmError> {
    let url = format!("{base_url}/v1/messages");
    let messages: Vec<AnthropicMessage<'_>> = req
        .messages
        .iter()
        .map(|m| AnthropicMessage {
            role: m.role.as_anthropic_str(),
            content: &m.content,
        })
        .collect();
    let body = MessagesRequest {
        model,
        max_tokens: req.max_tokens,
        messages,
        stream: false,
    };
    let mut body_value = serde_json::to_value(&body)
        .map_err(|e| LlmError::Transport(format!("request serialization failed: {e}")))?;
    // The system prompt is part of what the sensitivity gate inspects, so it
    // must also be part of what is sent; previously it was silently dropped.
    // A blank prompt is omitted rather than sent as an empty field.
    let system_prompt = req.system.as_str();
    if !system_prompt.trim().is_empty() {
        if let Some(fields) = body_value.as_object_mut() {
            fields.insert("system".to_owned(), serde_json::Value::from(system_prompt));
        }
    }
    let timeout = Duration::from_millis(timeout_ms);
    let agent = ureq::AgentBuilder::new().timeout(timeout).build();
    let raw_response = agent
        .post(&url)
        .set("x-api-key", api_key)
        .set(
            "anthropic-version",
            ClaudeHttpAdapter::ANTHROPIC_VERSION_HEADER,
        )
        .set("content-type", "application/json")
        .send_json(body_value)
        .map_err(|err| map_ureq_error(err, timeout_ms))?;
    let status = raw_response.status();
    if status != 200 {
        return Err(LlmError::Upstream(format!("HTTP {status}")));
    }
    let response_text = raw_response
        .into_string()
        .map_err(|e| LlmError::Transport(format!("reading response body: {e}")))?;
    let parsed: MessagesResponse = serde_json::from_str(&response_text)
        .map_err(|e| LlmError::Parse(format!("anthropic response parse: {e}")))?;
    let text = parsed
        .content
        .into_iter()
        .find(|block| block.block_type == "text")
        .map(|block| block.text)
        .ok_or_else(|| {
            LlmError::Parse("anthropic response contained no text content block".to_string())
        })?;
    // Hash the exact bytes received, not the re-serialized parse result.
    let raw_hash = blake3_hex(response_text.as_bytes());
    let usage = parsed.usage.map(|u| crate::adapter::TokenUsage {
        prompt_tokens: u.input_tokens,
        completion_tokens: u.output_tokens,
    });
    let response_model = if parsed.model.is_empty() {
        model.to_string()
    } else {
        parsed.model
    };
    Ok(LlmResponse {
        text,
        parsed_json: None,
        model: response_model,
        usage,
        raw_hash,
    })
}
/// Wraps the blocking streaming call in an async stream.
///
/// When first polled, the stream runs `call_claude_streaming` on the blocking
/// thread pool, waits for it to finish, and then yields every collected item
/// in order. If the blocking task cannot be joined, the stream yields a single
/// `LlmError::Transport` instead.
fn stream_claude_sse(
    api_key: String,
    model: String,
    base_url: String,
    req: LlmRequest,
) -> BoxStream<'static> {
    let deadline_ms = req.timeout_ms;
    Box::pin(async_stream::stream! {
        let joined = tokio::task::spawn_blocking(move || {
            call_claude_streaming(&api_key, &model, &base_url, &req, deadline_ms)
        })
        .await;
        // Collapse both outcomes into one list so there is a single yield loop.
        let items = match joined {
            Ok(collected) => collected,
            Err(join_err) => vec![Err(LlmError::Transport(format!(
                "spawn_blocking join error: {join_err}"
            )))],
        };
        for item in items {
            yield item;
        }
    })
}
/// Performs one blocking POST to `{base_url}/v1/messages` with `stream: true`,
/// reads the whole response body, and parses it line by line into chunks.
///
/// The request carries the model, `max_tokens`, the conversation messages and,
/// when it is not blank, the request's system prompt as the top-level `system`
/// field. `timeout_ms` is applied as the overall agent timeout.
///
/// Parsing rules, per line of the body:
/// * empty lines, `event:` lines and lines without a `data:` prefix are skipped;
/// * a `data:` payload that is not valid JSON for `SseEvent` adds a
///   `LlmError::Parse` item and parsing continues;
/// * `content_block_delta` with a `text_delta` delta adds a text chunk;
/// * `message_stop` adds a final empty chunk with `finish_reason` `"stop"` and
///   ends parsing;
/// * every other event type is ignored.
///
/// Failures before parsing (serialization, transport, non-200 status, body
/// read) are returned as a single-element vector holding the error.
fn call_claude_streaming(
    api_key: &str,
    model: &str,
    base_url: &str,
    req: &LlmRequest,
    timeout_ms: u64,
) -> Vec<Result<StreamChunk, LlmError>> {
    let url = format!("{base_url}/v1/messages");
    let messages: Vec<AnthropicMessage<'_>> = req
        .messages
        .iter()
        .map(|m| AnthropicMessage {
            role: m.role.as_anthropic_str(),
            content: &m.content,
        })
        .collect();
    let body = MessagesRequest {
        model,
        max_tokens: req.max_tokens,
        messages,
        stream: true,
    };
    let mut body_value = match serde_json::to_value(&body) {
        Ok(v) => v,
        Err(e) => {
            return vec![Err(LlmError::Transport(format!(
                "request serialization failed: {e}"
            )))]
        }
    };
    // Send the system prompt too; previously it was silently dropped from the
    // request. A blank prompt is omitted rather than sent as an empty field.
    let system_prompt = req.system.as_str();
    if !system_prompt.trim().is_empty() {
        if let Some(fields) = body_value.as_object_mut() {
            fields.insert("system".to_owned(), serde_json::Value::from(system_prompt));
        }
    }
    let timeout = Duration::from_millis(timeout_ms);
    let agent = ureq::AgentBuilder::new().timeout(timeout).build();
    let raw_response = match agent
        .post(&url)
        .set("x-api-key", api_key)
        .set(
            "anthropic-version",
            ClaudeHttpAdapter::ANTHROPIC_VERSION_HEADER,
        )
        .set("content-type", "application/json")
        .send_json(body_value)
    {
        Ok(r) => r,
        Err(err) => return vec![Err(map_ureq_error(err, timeout_ms))],
    };
    let status = raw_response.status();
    if status != 200 {
        return vec![Err(LlmError::Upstream(format!("HTTP {status}")))];
    }
    let body_text = match raw_response.into_string() {
        Ok(s) => s,
        Err(e) => {
            return vec![Err(LlmError::Transport(format!(
                "reading streaming response body: {e}"
            )))]
        }
    };
    let mut chunks = Vec::new();
    for line in body_text.lines() {
        if line.is_empty() || line.starts_with("event:") {
            continue;
        }
        let data = match line.strip_prefix("data:") {
            Some(rest) => rest.trim(),
            None => continue,
        };
        let event: SseEvent = match serde_json::from_str(data) {
            Ok(v) => v,
            Err(e) => {
                // Report the bad payload but keep going: later lines may
                // still hold valid deltas.
                chunks.push(Err(LlmError::Parse(format!(
                    "claude SSE data parse: {e}: {data}"
                ))));
                continue;
            }
        };
        match event.event_type.as_str() {
            "content_block_delta" => {
                if let Some(delta) = event.delta {
                    if delta.delta_type == "text_delta" {
                        chunks.push(Ok(StreamChunk {
                            delta: delta.text,
                            finish_reason: None,
                        }));
                    }
                }
            }
            "message_stop" => {
                chunks.push(Ok(StreamChunk {
                    delta: String::new(),
                    finish_reason: Some("stop".into()),
                }));
                return chunks;
            }
            // Every other event type is ignored.
            _ => {}
        }
    }
    chunks
}
fn map_ureq_error(err: ureq::Error, timeout_ms: u64) -> LlmError {
match err {
ureq::Error::Transport(t) => {
let msg = t.to_string();
if is_timeout_message(&msg) {
LlmError::Timeout { timeout_ms }
} else {
LlmError::Transport(msg)
}
}
ureq::Error::Status(code, _) => LlmError::Upstream(format!("HTTP {code}")),
}
}
/// Reports whether an error message looks like a timeout.
///
/// The match is ASCII-case-insensitive and succeeds when the message contains
/// any of `timed out`, `deadline exceeded`, or `timeout`.
fn is_timeout_message(msg: &str) -> bool {
    const TIMEOUT_MARKERS: [&str; 3] = ["timed out", "deadline exceeded", "timeout"];
    let normalized = msg.to_ascii_lowercase();
    TIMEOUT_MARKERS
        .iter()
        .any(|marker| normalized.contains(*marker))
}
impl LlmRole {
    /// Returns the role string used for this role in the request's `messages` array.
    ///
    /// `User` and `Tool` both map to `"user"`; `Assistant` maps to `"assistant"`.
    fn as_anthropic_str(self) -> &'static str {
        match self {
            LlmRole::Assistant => "assistant",
            LlmRole::User => "user",
            LlmRole::Tool => "user",
        }
    }
}