cortex_llm/claude_http.rs
1//! HTTP adapter that posts to the Anthropic Messages API.
2//!
3//! [`ClaudeHttpAdapter`] implements [`LlmAdapter`] by forwarding requests to
4//! `https://api.anthropic.com/v1/messages`. Because `ureq` is synchronous,
5//! the blocking I/O is wrapped with `tokio::task::spawn_blocking` so the
6//! adapter can satisfy the async trait contract without blocking the async
7//! executor.
8//!
9//! ## Streaming
10//!
11//! [`ClaudeHttpAdapter::stream_boxed`] overrides the trait default and posts
12//! to `/v1/messages` with `"stream": true`. Anthropic returns a Server-Sent
13//! Events stream. The blocking reader filters `content_block_delta` events to
14//! collect token deltas and emits a terminal [`StreamChunk`] on `message_stop`.
15//!
16//! ## Runtime ceiling
17//!
18//! Per the forthcoming ADR 0048, this adapter carries a `RemoteUnsigned`
19//! runtime ceiling — lower than `OllamaHttpAdapter`'s `LocalUnsigned`
20//! ceiling, because responses arrive from a remote endpoint without
21//! supply-chain or cryptographic non-repudiation. The `RemoteUnsigned`
22//! variant is being added in a separate task; until it lands, callers MUST
23//! treat responses from this adapter as bounded at `LocalUnsigned` or weaker.
24//!
25//! ## Data classification
26//!
27//! Before dispatching a prompt to a remote endpoint, callers should invoke
//! [`check_remote_prompt_sensitivity`] to verify that no high-sensitivity memory
29//! content is included. The real classification logic (ADR 0030) will be
30//! wired in a follow-on task; the current stub always permits.
31//!
32//! ## Construction
33//!
34//! Construction reads `CORTEX_CLAUDE_API_KEY` from the environment and fails
35//! closed if the variable is absent or empty. This prevents accidental use in
36//! environments where the key has not been provisioned.
37
38use std::time::Duration;
39
40use async_trait::async_trait;
41use serde::{Deserialize, Serialize};
42
43use crate::adapter::{
44 blake3_hex, BoxStream, LlmAdapter, LlmError, LlmRequest, LlmResponse, LlmRole, StreamChunk,
45};
46use crate::sensitivity::{check_remote_prompt_sensitivity, MaxSensitivity};
47
/// Stable invariant: `CORTEX_CLAUDE_API_KEY` env var absent or empty (ADR 0048 §4).
// NOTE(review): these invariant IDs are not referenced anywhere in this module;
// presumably callers attach them to audit/telemetry events when mapping the
// corresponding `LlmError`s — confirm against the consumers.
pub const CLAUDE_ADAPTER_API_KEY_MISSING_INVARIANT: &str = "cortex.run.claude.api_key_missing";
/// Stable invariant: model ID not in compile-time allowlist (ADR 0048 §5).
pub const CLAUDE_ADAPTER_MODEL_NOT_ALLOWED_INVARIANT: &str = "cortex.run.claude.model_not_allowed";
/// Stable invariant: endpoint rejected — not `api.anthropic.com` (ADR 0048 §2).
pub const CLAUDE_ADAPTER_ENDPOINT_REJECTED_INVARIANT: &str = "cortex.run.claude.endpoint_rejected";
54
/// HTTP adapter that routes to the Anthropic Messages API.
///
/// Construction reads `CORTEX_CLAUDE_API_KEY` from the environment and
/// validates the model string. The adapter is `Send + Sync` and may be placed
/// behind an `Arc<dyn LlmAdapter>`.
///
/// The `max_sensitivity` field enforces the ADR 0048 §3 data-classification
/// gate: prompts containing inline high-sensitivity markers are rejected before
/// any bytes leave the machine when the gate is set below `High`.
// NOTE(review): `derive(Debug)` includes `api_key` in the debug output — avoid
// logging this struct with `{:?}` in production paths, or consider a manual
// redacting `Debug` impl.
#[derive(Debug, Clone)]
pub struct ClaudeHttpAdapter {
    /// API key loaded from [`ClaudeHttpAdapter::ANTHROPIC_API_KEY_ENV`] at
    /// construction time.
    api_key: String,
    /// Anthropic model identifier, e.g. `claude-3-5-sonnet-20241022`.
    /// Never empty and never a `latest` alias (enforced at construction).
    model: String,
    /// Base URL for the API; defaults to [`ClaudeHttpAdapter::ANTHROPIC_API_BASE`].
    /// Overridable via [`ClaudeHttpAdapter::new_with_base_url`] for testing.
    base_url: String,
    /// Maximum data-classification level permitted in remote prompts (ADR 0048 §3).
    /// Defaults to [`MaxSensitivity::Medium`] when constructed via [`Self::new`].
    max_sensitivity: MaxSensitivity,
}
78
79impl ClaudeHttpAdapter {
80 /// The base URL for all Anthropic API requests.
81 pub const ANTHROPIC_API_BASE: &'static str = "https://api.anthropic.com";
82
83 /// Environment variable that must contain the Anthropic API key.
84 ///
85 /// Construction fails with [`LlmError::InvalidRequest`] if this variable
86 /// is absent or empty.
87 pub const ANTHROPIC_API_KEY_ENV: &'static str = "CORTEX_CLAUDE_API_KEY";
88
89 /// `anthropic-version` header value required by the Messages API.
90 pub const ANTHROPIC_VERSION_HEADER: &'static str = "2023-06-01";
91
92 /// Construct a `ClaudeHttpAdapter` for `model` with `max_sensitivity`.
93 ///
94 /// `max_sensitivity` controls the data-classification gate (ADR 0048 §3).
95 /// Pass `None` to use the default of [`MaxSensitivity::Medium`], which
96 /// blocks high-sensitivity memories from being sent to the remote endpoint.
97 ///
98 /// Returns [`LlmError::InvalidRequest`] when:
99 /// - `CORTEX_CLAUDE_API_KEY` is absent or empty.
100 /// - `model` is empty.
101 /// - `model` contains the string `"latest"` (forbidden to preserve
102 /// audit-trail identity; see ADR 0044 §3 and ADR 0048).
103 pub fn new(model: String, max_sensitivity: Option<MaxSensitivity>) -> Result<Self, LlmError> {
104 let api_key = std::env::var(Self::ANTHROPIC_API_KEY_ENV)
105 .ok()
106 .filter(|v| !v.is_empty())
107 .ok_or_else(|| {
108 LlmError::InvalidRequest(format!(
109 "env var {} is absent or empty; refusing to construct ClaudeHttpAdapter",
110 Self::ANTHROPIC_API_KEY_ENV
111 ))
112 })?;
113
114 if model.is_empty() {
115 return Err(LlmError::InvalidRequest(
116 "model must not be empty".to_string(),
117 ));
118 }
119 if model.contains("latest") {
120 return Err(LlmError::InvalidRequest(format!(
121 "model '{model}' contains 'latest' alias; pin to a specific version for audit-trail identity"
122 )));
123 }
124
125 Ok(Self {
126 api_key,
127 model,
128 base_url: Self::ANTHROPIC_API_BASE.to_string(),
129 max_sensitivity: max_sensitivity.unwrap_or(MaxSensitivity::Medium),
130 })
131 }
132
133 /// Construct a `ClaudeHttpAdapter` with an explicit `base_url`.
134 ///
135 /// This constructor is intended for testing only — it allows tests to point
136 /// the adapter at a mock `TcpListener` instead of `api.anthropic.com`. The
137 /// API key validation and model validation rules are identical to
138 /// [`Self::new`]. `max_sensitivity` follows the same defaulting rule:
139 /// `None` resolves to [`MaxSensitivity::Medium`].
140 #[doc(hidden)]
141 pub fn new_with_base_url(
142 model: String,
143 base_url: String,
144 max_sensitivity: Option<MaxSensitivity>,
145 ) -> Result<Self, LlmError> {
146 let api_key = std::env::var(Self::ANTHROPIC_API_KEY_ENV)
147 .ok()
148 .filter(|v| !v.is_empty())
149 .ok_or_else(|| {
150 LlmError::InvalidRequest(format!(
151 "env var {} is absent or empty; refusing to construct ClaudeHttpAdapter",
152 Self::ANTHROPIC_API_KEY_ENV
153 ))
154 })?;
155
156 if model.is_empty() {
157 return Err(LlmError::InvalidRequest(
158 "model must not be empty".to_string(),
159 ));
160 }
161 if model.contains("latest") {
162 return Err(LlmError::InvalidRequest(format!(
163 "model '{model}' contains 'latest' alias; pin to a specific version for audit-trail identity"
164 )));
165 }
166
167 Ok(Self {
168 api_key,
169 model,
170 base_url,
171 max_sensitivity: max_sensitivity.unwrap_or(MaxSensitivity::Medium),
172 })
173 }
174}
175
176// ---------------------------------------------------------------------------
177// Wire types
178// ---------------------------------------------------------------------------
179
/// Outgoing body for `POST /v1/messages`.
///
/// Borrows `model` and message contents from the caller so prompt text is not
/// copied into the wire struct before serialization.
#[derive(Debug, Serialize)]
struct MessagesRequest<'a> {
    /// Pinned Anthropic model identifier (`latest` aliases rejected at construction).
    model: &'a str,
    /// Hard cap on generated tokens, copied from `LlmRequest::max_tokens`.
    max_tokens: u32,
    /// Conversation turns; only `user`/`assistant` roles are emitted
    /// (see `LlmRole::as_anthropic_str`).
    messages: Vec<AnthropicMessage<'a>>,
    /// `true` selects the SSE streaming response format.
    stream: bool,
}
188
189// ---------------------------------------------------------------------------
190// Streaming SSE wire types
191// ---------------------------------------------------------------------------
192
/// Top-level envelope for a `content_block_delta` SSE event.
///
/// Only the fields this adapter inspects are modeled; serde ignores any other
/// JSON fields by default.
#[derive(Debug, Deserialize)]
struct SseEvent {
    /// Event discriminator, e.g. `content_block_delta` or `message_stop`.
    #[serde(rename = "type")]
    event_type: String,
    /// Present on `content_block_delta` events; `None` otherwise.
    #[serde(default)]
    delta: Option<SseDelta>,
}

/// The `delta` field inside a `content_block_delta` event.
#[derive(Debug, Deserialize)]
struct SseDelta {
    /// Delta discriminator; only `text_delta` is consumed by this adapter.
    #[serde(rename = "type")]
    delta_type: String,
    /// Token text; defaults to empty when the provider omits the field.
    #[serde(default)]
    text: String,
}

/// One message in the Anthropic chat format.
#[derive(Debug, Serialize)]
struct AnthropicMessage<'a> {
    /// `"user"` or `"assistant"`, produced by `LlmRole::as_anthropic_str`.
    role: &'a str,
    /// Message body, borrowed from the incoming `LlmRequest`.
    content: &'a str,
}
217
/// Top-level Anthropic Messages API response envelope.
///
/// All fields are defaulted so a structurally sparse body still parses;
/// the absence of text content is diagnosed later in `call_claude`.
#[derive(Debug, Deserialize)]
struct MessagesResponse {
    /// Ordered content blocks; `text` blocks carry the completion.
    #[serde(default)]
    content: Vec<ContentBlock>,
    /// Model ID echoed by the provider; may be empty, in which case the
    /// adapter substitutes its configured model.
    #[serde(default)]
    model: String,
    /// Token accounting; `None` when the provider omits the field entirely.
    #[serde(default)]
    usage: Option<AnthropicUsage>,
}

/// One content block in the response `content` array.
#[derive(Debug, Deserialize)]
struct ContentBlock {
    /// Block discriminator; only `"text"` blocks are consumed.
    #[serde(rename = "type")]
    block_type: String,
    /// Text payload; defaults to empty for non-text block types.
    #[serde(default)]
    text: String,
}

/// Token-usage field from the Anthropic response.
///
/// Both counts are required when `usage` is present — a malformed `usage`
/// object fails the whole response parse (no `#[serde(default)]` here).
#[derive(Debug, Deserialize)]
struct AnthropicUsage {
    /// Prompt-side tokens consumed (`input_tokens` on the wire).
    input_tokens: u32,
    /// Completion-side tokens generated (`output_tokens` on the wire).
    output_tokens: u32,
}
244
245// ---------------------------------------------------------------------------
246// LlmAdapter implementation
247// ---------------------------------------------------------------------------
248
249#[async_trait]
250impl LlmAdapter for ClaudeHttpAdapter {
251 fn adapter_id(&self) -> &'static str {
252 "claude"
253 }
254
255 async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
256 // ADR 0048 §3: data-classification gate before any remote dispatch.
257 // Assemble the full prompt text (system + all message contents) and run
258 // the sensitivity gate so that high-sensitivity markers are caught
259 // before any bytes leave the machine.
260 let prompt_text: String = std::iter::once(req.system.as_str())
261 .chain(req.messages.iter().map(|m| m.content.as_str()))
262 .collect::<Vec<_>>()
263 .join("\n");
264 check_remote_prompt_sensitivity(&prompt_text, self.max_sensitivity)?;
265
266 let api_key = self.api_key.clone();
267 let model = self.model.clone();
268 let base_url = self.base_url.clone();
269 let timeout_ms = req.timeout_ms;
270
271 let result = tokio::task::spawn_blocking(move || {
272 call_claude(&api_key, &model, &base_url, &req, timeout_ms)
273 })
274 .await
275 .map_err(|e| LlmError::Transport(format!("spawn_blocking join error: {e}")))?;
276
277 result
278 }
279
280 /// Override with true Anthropic SSE streaming via `POST /v1/messages` with
281 /// `"stream": true`.
282 ///
283 /// Uses `ureq` (synchronous) inside `spawn_blocking`. The blocking reader
284 /// collects all SSE lines into a `Vec` before yielding them; the
285 /// `async_stream::stream!` block then emits items one by one.
286 ///
287 /// TODO: replace `ureq` with an async HTTP client to achieve true
288 /// line-by-line streaming without buffering the entire response.
289 fn stream_boxed(&self, req: LlmRequest) -> BoxStream<'_> {
290 stream_claude_sse(
291 self.api_key.clone(),
292 self.model.clone(),
293 self.base_url.clone(),
294 req,
295 )
296 }
297}
298
299/// Synchronous Anthropic HTTP call, executed inside `spawn_blocking`.
300fn call_claude(
301 api_key: &str,
302 model: &str,
303 base_url: &str,
304 req: &LlmRequest,
305 timeout_ms: u64,
306) -> Result<LlmResponse, LlmError> {
307 let url = format!("{base_url}/v1/messages");
308
309 // Build message list from request messages; Anthropic only accepts
310 // `user` and `assistant` roles in the `messages` array.
311 let messages: Vec<AnthropicMessage<'_>> = req
312 .messages
313 .iter()
314 .map(|m| AnthropicMessage {
315 role: m.role.as_anthropic_str(),
316 content: &m.content,
317 })
318 .collect();
319
320 let body = MessagesRequest {
321 model,
322 max_tokens: req.max_tokens,
323 messages,
324 stream: false,
325 };
326
327 let body_value = serde_json::to_value(&body)
328 .map_err(|e| LlmError::Transport(format!("request serialization failed: {e}")))?;
329
330 let timeout = Duration::from_millis(timeout_ms);
331 let agent = ureq::AgentBuilder::new().timeout(timeout).build();
332
333 let raw_response = agent
334 .post(&url)
335 .set("x-api-key", api_key)
336 .set(
337 "anthropic-version",
338 ClaudeHttpAdapter::ANTHROPIC_VERSION_HEADER,
339 )
340 .set("content-type", "application/json")
341 .send_json(body_value)
342 .map_err(|err| map_ureq_error(err, timeout_ms))?;
343
344 let status = raw_response.status();
345 if status != 200 {
346 return Err(LlmError::Upstream(format!("HTTP {status}")));
347 }
348
349 let response_text = raw_response
350 .into_string()
351 .map_err(|e| LlmError::Transport(format!("reading response body: {e}")))?;
352
353 let parsed: MessagesResponse = serde_json::from_str(&response_text)
354 .map_err(|e| LlmError::Parse(format!("anthropic response parse: {e}")))?;
355
356 // Extract the first text block from content[].
357 let text = parsed
358 .content
359 .into_iter()
360 .find(|block| block.block_type == "text")
361 .map(|block| block.text)
362 .ok_or_else(|| {
363 LlmError::Parse("anthropic response contained no text content block".to_string())
364 })?;
365
366 let raw_hash = blake3_hex(response_text.as_bytes());
367 let usage = parsed.usage.map(|u| crate::adapter::TokenUsage {
368 prompt_tokens: u.input_tokens,
369 completion_tokens: u.output_tokens,
370 });
371
372 // Use the model echoed by the provider when present; fall back to the
373 // adapter's configured model so the field is never empty.
374 let response_model = if parsed.model.is_empty() {
375 model.to_string()
376 } else {
377 parsed.model
378 };
379
380 Ok(LlmResponse {
381 text,
382 parsed_json: None,
383 model: response_model,
384 usage,
385 raw_hash,
386 })
387}
388
389// ---------------------------------------------------------------------------
390// Streaming implementation
391// ---------------------------------------------------------------------------
392
393/// Build a `BoxStream` that drives Anthropic SSE streaming.
394///
395/// Extracted as a free function so the `async_stream::stream!` macro is not
396/// nested inside an `impl` block, which can confuse lifetime inference.
397fn stream_claude_sse(
398 api_key: String,
399 model: String,
400 base_url: String,
401 req: LlmRequest,
402) -> BoxStream<'static> {
403 Box::pin(async_stream::stream! {
404 let timeout_ms = req.timeout_ms;
405 let result = tokio::task::spawn_blocking(move || {
406 call_claude_streaming(&api_key, &model, &base_url, &req, timeout_ms)
407 })
408 .await;
409
410 match result {
411 Ok(chunks) => {
412 for chunk in chunks {
413 yield chunk;
414 }
415 }
416 Err(e) => yield Err(LlmError::Transport(format!("spawn_blocking join error: {e}"))),
417 }
418 })
419}
420
/// Synchronous Anthropic SSE streaming call, executed inside `spawn_blocking`.
///
/// Posts to `/v1/messages` with `stream: true`, then reads the response body
/// line by line. SSE protocol:
/// - Empty lines are separators — skip them.
/// - Lines beginning with `event:` are event-type hints — skip them (we parse
///   the type from the `data:` JSON instead).
/// - Lines beginning with `data:` carry the JSON payload.
///
/// For `content_block_delta` events with `delta.type == "text_delta"` we emit
/// a [`StreamChunk`] carrying the token text. On `message_stop` we emit a
/// terminal chunk with `finish_reason = Some("stop")` and return.
///
/// If the body ends without a `message_stop` event, the chunks collected so
/// far are returned without a terminal `finish_reason` marker.
///
/// Errors (serialization, transport, HTTP status, body read) are returned as
/// a single-element `Vec` holding the `Err`; per-line JSON parse failures are
/// pushed as `Err` items while processing continues.
fn call_claude_streaming(
    api_key: &str,
    model: &str,
    base_url: &str,
    req: &LlmRequest,
    timeout_ms: u64,
) -> Vec<Result<StreamChunk, LlmError>> {
    let url = format!("{base_url}/v1/messages");

    // Same role mapping as the non-streaming path: Anthropic accepts only
    // `user` and `assistant` in the `messages` array.
    let messages: Vec<AnthropicMessage<'_>> = req
        .messages
        .iter()
        .map(|m| AnthropicMessage {
            role: m.role.as_anthropic_str(),
            content: &m.content,
        })
        .collect();

    let body = MessagesRequest {
        model,
        max_tokens: req.max_tokens,
        messages,
        stream: true,
    };

    let body_value = match serde_json::to_value(&body) {
        Ok(v) => v,
        Err(e) => {
            return vec![Err(LlmError::Transport(format!(
                "request serialization failed: {e}"
            )))]
        }
    };

    let timeout = Duration::from_millis(timeout_ms);
    let agent = ureq::AgentBuilder::new().timeout(timeout).build();

    let raw_response = match agent
        .post(&url)
        .set("x-api-key", api_key)
        .set(
            "anthropic-version",
            ClaudeHttpAdapter::ANTHROPIC_VERSION_HEADER,
        )
        .set("content-type", "application/json")
        .send_json(body_value)
    {
        Ok(r) => r,
        Err(err) => return vec![Err(map_ureq_error(err, timeout_ms))],
    };

    let status = raw_response.status();
    if status != 200 {
        return vec![Err(LlmError::Upstream(format!("HTTP {status}")))];
    }

    // NOTE: this buffers the ENTIRE SSE body before any chunk is produced —
    // see the `stream_boxed` TODO about moving to an async HTTP client.
    let body_text = match raw_response.into_string() {
        Ok(s) => s,
        Err(e) => {
            return vec![Err(LlmError::Transport(format!(
                "reading streaming response body: {e}"
            )))]
        }
    };

    let mut chunks = Vec::new();

    for line in body_text.lines() {
        // Skip empty lines (SSE event separators) and event-type hint lines.
        if line.is_empty() || line.starts_with("event:") {
            continue;
        }

        // `trim` tolerates the optional space after the SSE `data:` prefix.
        let data = match line.strip_prefix("data:") {
            Some(rest) => rest.trim(),
            None => continue,
        };

        let event: SseEvent = match serde_json::from_str(data) {
            Ok(v) => v,
            Err(e) => {
                // Malformed line: surface as an Err item but keep reading —
                // later events may still be usable.
                chunks.push(Err(LlmError::Parse(format!(
                    "claude SSE data parse: {e}: {data}"
                ))));
                continue;
            }
        };

        match event.event_type.as_str() {
            "content_block_delta" => {
                if let Some(delta) = event.delta {
                    if delta.delta_type == "text_delta" {
                        chunks.push(Ok(StreamChunk {
                            delta: delta.text,
                            finish_reason: None,
                        }));
                    }
                }
            }
            "message_stop" => {
                chunks.push(Ok(StreamChunk {
                    delta: String::new(),
                    finish_reason: Some("stop".into()),
                }));
                // Terminal event — no further lines need processing.
                return chunks;
            }
            _ => {
                // Informational events (message_start, content_block_start,
                // message_delta, ping, etc.) are intentionally ignored.
            }
        }
    }

    chunks
}
549
550/// Map a `ureq` error to an [`LlmError`] variant.
551fn map_ureq_error(err: ureq::Error, timeout_ms: u64) -> LlmError {
552 match err {
553 ureq::Error::Transport(t) => {
554 let msg = t.to_string();
555 if is_timeout_message(&msg) {
556 LlmError::Timeout { timeout_ms }
557 } else {
558 LlmError::Transport(msg)
559 }
560 }
561 ureq::Error::Status(code, _) => LlmError::Upstream(format!("HTTP {code}")),
562 }
563}
564
/// Heuristic: does the transport error message look like a timeout?
///
/// Case-insensitive substring scan over known timeout phrasings; used because
/// the transport error arrives as opaque text rather than a structured kind.
fn is_timeout_message(msg: &str) -> bool {
    const TIMEOUT_MARKERS: [&str; 3] = ["timed out", "deadline exceeded", "timeout"];
    let normalized = msg.to_ascii_lowercase();
    TIMEOUT_MARKERS
        .iter()
        .any(|marker| normalized.contains(marker))
}
570
571// ---------------------------------------------------------------------------
572// Role serialization helper
573// ---------------------------------------------------------------------------
574
575impl LlmRole {
576 /// Return the lowercase string representation used by Anthropic's API.
577 ///
578 /// Anthropic accepts `user` and `assistant`; `tool` is mapped to `user`
579 /// as a conservative fallback (tool-result multi-turn is out of scope
580 /// for this adapter version).
581 fn as_anthropic_str(self) -> &'static str {
582 match self {
583 LlmRole::User | LlmRole::Tool => "user",
584 LlmRole::Assistant => "assistant",
585 }
586 }
587}