// cortex_llm/claude_http.rs
1//! HTTP adapter that posts to the Anthropic Messages API.
2//!
3//! [`ClaudeHttpAdapter`] implements [`LlmAdapter`] by forwarding requests to
4//! `https://api.anthropic.com/v1/messages`. Because `ureq` is synchronous,
5//! the blocking I/O is wrapped with `tokio::task::spawn_blocking` so the
6//! adapter can satisfy the async trait contract without blocking the async
7//! executor.
8//!
9//! ## Streaming
10//!
11//! [`ClaudeHttpAdapter::stream_boxed`] overrides the trait default and posts
12//! to `/v1/messages` with `"stream": true`. Anthropic returns a Server-Sent
13//! Events stream. The blocking reader filters `content_block_delta` events to
14//! collect token deltas and emits a terminal [`StreamChunk`] on `message_stop`.
15//!
16//! ## Runtime ceiling
17//!
18//! Per the forthcoming ADR 0048, this adapter carries a `RemoteUnsigned`
19//! runtime ceiling — lower than `OllamaHttpAdapter`'s `LocalUnsigned`
20//! ceiling, because responses arrive from a remote endpoint without
21//! supply-chain or cryptographic non-repudiation. The `RemoteUnsigned`
22//! variant is being added in a separate task; until it lands, callers MUST
23//! treat responses from this adapter as bounded at `LocalUnsigned` or weaker.
24//!
25//! ## Data classification
26//!
//! Before dispatching a prompt to a remote endpoint, callers should invoke
//! [`check_remote_prompt_sensitivity`] to verify that no high-sensitivity
//! memory content is included. The real classification logic (ADR 0030) will
//! be wired in a follow-on task; the current stub always permits.
31//!
32//! ## Construction
33//!
34//! Construction reads `CORTEX_CLAUDE_API_KEY` from the environment and fails
35//! closed if the variable is absent or empty. This prevents accidental use in
36//! environments where the key has not been provisioned.
37
38use std::time::Duration;
39
40use async_trait::async_trait;
41use serde::{Deserialize, Serialize};
42
43use crate::adapter::{
44    blake3_hex, BoxStream, LlmAdapter, LlmError, LlmRequest, LlmResponse, LlmRole, StreamChunk,
45};
46use crate::sensitivity::{check_remote_prompt_sensitivity, MaxSensitivity};
47
// NOTE(review): these invariant IDs are not referenced anywhere in this file;
// presumably they are matched by external audit/telemetry consumers (ADR 0048)
// — confirm before renaming or removing.

/// Stable invariant: `CORTEX_CLAUDE_API_KEY` env var absent or empty (ADR 0048 §4).
pub const CLAUDE_ADAPTER_API_KEY_MISSING_INVARIANT: &str = "cortex.run.claude.api_key_missing";
/// Stable invariant: model ID not in compile-time allowlist (ADR 0048 §5).
pub const CLAUDE_ADAPTER_MODEL_NOT_ALLOWED_INVARIANT: &str = "cortex.run.claude.model_not_allowed";
/// Stable invariant: endpoint rejected — not `api.anthropic.com` (ADR 0048 §2).
pub const CLAUDE_ADAPTER_ENDPOINT_REJECTED_INVARIANT: &str = "cortex.run.claude.endpoint_rejected";
54
55/// HTTP adapter that routes to the Anthropic Messages API.
56///
57/// Construction reads `CORTEX_CLAUDE_API_KEY` from the environment and
58/// validates the model string. The adapter is `Send + Sync` and may be placed
59/// behind an `Arc<dyn LlmAdapter>`.
60///
61/// The `max_sensitivity` field enforces the ADR 0048 §3 data-classification
62/// gate: prompts containing inline high-sensitivity markers are rejected before
63/// any bytes leave the machine when the gate is set below `High`.
64#[derive(Debug, Clone)]
65pub struct ClaudeHttpAdapter {
66    /// API key loaded from [`ClaudeHttpAdapter::ANTHROPIC_API_KEY_ENV`] at
67    /// construction time.
68    api_key: String,
69    /// Anthropic model identifier, e.g. `claude-3-5-sonnet-20241022`.
70    model: String,
71    /// Base URL for the API; defaults to [`ClaudeHttpAdapter::ANTHROPIC_API_BASE`].
72    /// Overridable via [`ClaudeHttpAdapter::new_with_base_url`] for testing.
73    base_url: String,
74    /// Maximum data-classification level permitted in remote prompts (ADR 0048 §3).
75    /// Defaults to [`MaxSensitivity::Medium`] when constructed via [`Self::new`].
76    max_sensitivity: MaxSensitivity,
77}
78
79impl ClaudeHttpAdapter {
80    /// The base URL for all Anthropic API requests.
81    pub const ANTHROPIC_API_BASE: &'static str = "https://api.anthropic.com";
82
83    /// Environment variable that must contain the Anthropic API key.
84    ///
85    /// Construction fails with [`LlmError::InvalidRequest`] if this variable
86    /// is absent or empty.
87    pub const ANTHROPIC_API_KEY_ENV: &'static str = "CORTEX_CLAUDE_API_KEY";
88
89    /// `anthropic-version` header value required by the Messages API.
90    pub const ANTHROPIC_VERSION_HEADER: &'static str = "2023-06-01";
91
92    /// Construct a `ClaudeHttpAdapter` for `model` with `max_sensitivity`.
93    ///
94    /// `max_sensitivity` controls the data-classification gate (ADR 0048 §3).
95    /// Pass `None` to use the default of [`MaxSensitivity::Medium`], which
96    /// blocks high-sensitivity memories from being sent to the remote endpoint.
97    ///
98    /// Returns [`LlmError::InvalidRequest`] when:
99    /// - `CORTEX_CLAUDE_API_KEY` is absent or empty.
100    /// - `model` is empty.
101    /// - `model` contains the string `"latest"` (forbidden to preserve
102    ///   audit-trail identity; see ADR 0044 §3 and ADR 0048).
103    pub fn new(model: String, max_sensitivity: Option<MaxSensitivity>) -> Result<Self, LlmError> {
104        let api_key = std::env::var(Self::ANTHROPIC_API_KEY_ENV)
105            .ok()
106            .filter(|v| !v.is_empty())
107            .ok_or_else(|| {
108                LlmError::InvalidRequest(format!(
109                    "env var {} is absent or empty; refusing to construct ClaudeHttpAdapter",
110                    Self::ANTHROPIC_API_KEY_ENV
111                ))
112            })?;
113
114        if model.is_empty() {
115            return Err(LlmError::InvalidRequest(
116                "model must not be empty".to_string(),
117            ));
118        }
119        if model.contains("latest") {
120            return Err(LlmError::InvalidRequest(format!(
121                "model '{model}' contains 'latest' alias; pin to a specific version for audit-trail identity"
122            )));
123        }
124
125        Ok(Self {
126            api_key,
127            model,
128            base_url: Self::ANTHROPIC_API_BASE.to_string(),
129            max_sensitivity: max_sensitivity.unwrap_or(MaxSensitivity::Medium),
130        })
131    }
132
133    /// Construct a `ClaudeHttpAdapter` with an explicit `base_url`.
134    ///
135    /// This constructor is intended for testing only — it allows tests to point
136    /// the adapter at a mock `TcpListener` instead of `api.anthropic.com`. The
137    /// API key validation and model validation rules are identical to
138    /// [`Self::new`]. `max_sensitivity` follows the same defaulting rule:
139    /// `None` resolves to [`MaxSensitivity::Medium`].
140    #[doc(hidden)]
141    pub fn new_with_base_url(
142        model: String,
143        base_url: String,
144        max_sensitivity: Option<MaxSensitivity>,
145    ) -> Result<Self, LlmError> {
146        let api_key = std::env::var(Self::ANTHROPIC_API_KEY_ENV)
147            .ok()
148            .filter(|v| !v.is_empty())
149            .ok_or_else(|| {
150                LlmError::InvalidRequest(format!(
151                    "env var {} is absent or empty; refusing to construct ClaudeHttpAdapter",
152                    Self::ANTHROPIC_API_KEY_ENV
153                ))
154            })?;
155
156        if model.is_empty() {
157            return Err(LlmError::InvalidRequest(
158                "model must not be empty".to_string(),
159            ));
160        }
161        if model.contains("latest") {
162            return Err(LlmError::InvalidRequest(format!(
163                "model '{model}' contains 'latest' alias; pin to a specific version for audit-trail identity"
164            )));
165        }
166
167        Ok(Self {
168            api_key,
169            model,
170            base_url,
171            max_sensitivity: max_sensitivity.unwrap_or(MaxSensitivity::Medium),
172        })
173    }
174}
175
176// ---------------------------------------------------------------------------
177// Wire types
178// ---------------------------------------------------------------------------
179
/// Outgoing body for `POST /v1/messages`.
///
/// NOTE(review): the Messages API also accepts a top-level `system` string;
/// it is not modeled here, so any system prompt must be injected by call
/// sites into the serialized JSON value.
#[derive(Debug, Serialize)]
struct MessagesRequest<'a> {
    /// Pinned model identifier (never a `latest` alias; see constructors).
    model: &'a str,
    /// Upper bound on generated tokens, copied from `LlmRequest::max_tokens`.
    max_tokens: u32,
    /// Conversation turns; roles are restricted to `user`/`assistant`.
    messages: Vec<AnthropicMessage<'a>>,
    /// `true` selects the SSE streaming response format.
    stream: bool,
}
188
189// ---------------------------------------------------------------------------
190// Streaming SSE wire types
191// ---------------------------------------------------------------------------
192
/// Top-level envelope for one SSE `data:` payload.
///
/// Only `type` and (optionally) `delta` are modeled; all other fields of the
/// Anthropic event JSON are ignored by serde.
#[derive(Debug, Deserialize)]
struct SseEvent {
    /// Event discriminator, e.g. `content_block_delta` or `message_stop`.
    #[serde(rename = "type")]
    event_type: String,
    /// Present on `content_block_delta` events; `None` for other event types.
    #[serde(default)]
    delta: Option<SseDelta>,
}
201
/// The `delta` field inside a `content_block_delta` event.
#[derive(Debug, Deserialize)]
struct SseDelta {
    /// Delta discriminator; only `text_delta` carries token text we forward.
    #[serde(rename = "type")]
    delta_type: String,
    /// Token text fragment; defaults to empty for non-text delta kinds.
    #[serde(default)]
    text: String,
}
210
/// One message in the Anthropic chat format.
#[derive(Debug, Serialize)]
struct AnthropicMessage<'a> {
    /// `"user"` or `"assistant"` (see `LlmRole::as_anthropic_str`).
    role: &'a str,
    /// Raw message text, borrowed from the `LlmRequest`.
    content: &'a str,
}
217
/// Top-level Anthropic Messages API response envelope.
///
/// All fields are `#[serde(default)]` so a sparse provider response still
/// parses; the caller decides whether a missing text block is an error.
#[derive(Debug, Deserialize)]
struct MessagesResponse {
    /// Content blocks; the first block with `type == "text"` is extracted.
    #[serde(default)]
    content: Vec<ContentBlock>,
    /// Model echoed by the provider; may be empty, in which case the
    /// adapter's configured model is used instead.
    #[serde(default)]
    model: String,
    /// Token accounting; optional because the field is not guaranteed.
    #[serde(default)]
    usage: Option<AnthropicUsage>,
}
228
/// One content block in the response `content` array.
#[derive(Debug, Deserialize)]
struct ContentBlock {
    /// Block discriminator; only `"text"` blocks are consumed.
    #[serde(rename = "type")]
    block_type: String,
    /// Block text; defaults to empty for non-text block kinds.
    #[serde(default)]
    text: String,
}
237
/// Token-usage field from the Anthropic response.
///
/// Unlike the envelope fields these are NOT `#[serde(default)]`: if a `usage`
/// object is present at all, both counters must be present or parsing fails.
#[derive(Debug, Deserialize)]
struct AnthropicUsage {
    /// Tokens consumed by the prompt (`input_tokens` on the wire).
    input_tokens: u32,
    /// Tokens generated in the completion (`output_tokens` on the wire).
    output_tokens: u32,
}
244
245// ---------------------------------------------------------------------------
246// LlmAdapter implementation
247// ---------------------------------------------------------------------------
248
249#[async_trait]
250impl LlmAdapter for ClaudeHttpAdapter {
251    fn adapter_id(&self) -> &'static str {
252        "claude"
253    }
254
255    async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
256        // ADR 0048 §3: data-classification gate before any remote dispatch.
257        // Assemble the full prompt text (system + all message contents) and run
258        // the sensitivity gate so that high-sensitivity markers are caught
259        // before any bytes leave the machine.
260        let prompt_text: String = std::iter::once(req.system.as_str())
261            .chain(req.messages.iter().map(|m| m.content.as_str()))
262            .collect::<Vec<_>>()
263            .join("\n");
264        check_remote_prompt_sensitivity(&prompt_text, self.max_sensitivity)?;
265
266        let api_key = self.api_key.clone();
267        let model = self.model.clone();
268        let base_url = self.base_url.clone();
269        let timeout_ms = req.timeout_ms;
270
271        let result = tokio::task::spawn_blocking(move || {
272            call_claude(&api_key, &model, &base_url, &req, timeout_ms)
273        })
274        .await
275        .map_err(|e| LlmError::Transport(format!("spawn_blocking join error: {e}")))?;
276
277        result
278    }
279
280    /// Override with true Anthropic SSE streaming via `POST /v1/messages` with
281    /// `"stream": true`.
282    ///
283    /// Uses `ureq` (synchronous) inside `spawn_blocking`. The blocking reader
284    /// collects all SSE lines into a `Vec` before yielding them; the
285    /// `async_stream::stream!` block then emits items one by one.
286    ///
287    /// TODO: replace `ureq` with an async HTTP client to achieve true
288    /// line-by-line streaming without buffering the entire response.
289    fn stream_boxed(&self, req: LlmRequest) -> BoxStream<'_> {
290        stream_claude_sse(
291            self.api_key.clone(),
292            self.model.clone(),
293            self.base_url.clone(),
294            req,
295        )
296    }
297}
298
299/// Synchronous Anthropic HTTP call, executed inside `spawn_blocking`.
300fn call_claude(
301    api_key: &str,
302    model: &str,
303    base_url: &str,
304    req: &LlmRequest,
305    timeout_ms: u64,
306) -> Result<LlmResponse, LlmError> {
307    let url = format!("{base_url}/v1/messages");
308
309    // Build message list from request messages; Anthropic only accepts
310    // `user` and `assistant` roles in the `messages` array.
311    let messages: Vec<AnthropicMessage<'_>> = req
312        .messages
313        .iter()
314        .map(|m| AnthropicMessage {
315            role: m.role.as_anthropic_str(),
316            content: &m.content,
317        })
318        .collect();
319
320    let body = MessagesRequest {
321        model,
322        max_tokens: req.max_tokens,
323        messages,
324        stream: false,
325    };
326
327    let body_value = serde_json::to_value(&body)
328        .map_err(|e| LlmError::Transport(format!("request serialization failed: {e}")))?;
329
330    let timeout = Duration::from_millis(timeout_ms);
331    let agent = ureq::AgentBuilder::new().timeout(timeout).build();
332
333    let raw_response = agent
334        .post(&url)
335        .set("x-api-key", api_key)
336        .set(
337            "anthropic-version",
338            ClaudeHttpAdapter::ANTHROPIC_VERSION_HEADER,
339        )
340        .set("content-type", "application/json")
341        .send_json(body_value)
342        .map_err(|err| map_ureq_error(err, timeout_ms))?;
343
344    let status = raw_response.status();
345    if status != 200 {
346        return Err(LlmError::Upstream(format!("HTTP {status}")));
347    }
348
349    let response_text = raw_response
350        .into_string()
351        .map_err(|e| LlmError::Transport(format!("reading response body: {e}")))?;
352
353    let parsed: MessagesResponse = serde_json::from_str(&response_text)
354        .map_err(|e| LlmError::Parse(format!("anthropic response parse: {e}")))?;
355
356    // Extract the first text block from content[].
357    let text = parsed
358        .content
359        .into_iter()
360        .find(|block| block.block_type == "text")
361        .map(|block| block.text)
362        .ok_or_else(|| {
363            LlmError::Parse("anthropic response contained no text content block".to_string())
364        })?;
365
366    let raw_hash = blake3_hex(response_text.as_bytes());
367    let usage = parsed.usage.map(|u| crate::adapter::TokenUsage {
368        prompt_tokens: u.input_tokens,
369        completion_tokens: u.output_tokens,
370    });
371
372    // Use the model echoed by the provider when present; fall back to the
373    // adapter's configured model so the field is never empty.
374    let response_model = if parsed.model.is_empty() {
375        model.to_string()
376    } else {
377        parsed.model
378    };
379
380    Ok(LlmResponse {
381        text,
382        parsed_json: None,
383        model: response_model,
384        usage,
385        raw_hash,
386    })
387}
388
389// ---------------------------------------------------------------------------
390// Streaming implementation
391// ---------------------------------------------------------------------------
392
393/// Build a `BoxStream` that drives Anthropic SSE streaming.
394///
395/// Extracted as a free function so the `async_stream::stream!` macro is not
396/// nested inside an `impl` block, which can confuse lifetime inference.
397fn stream_claude_sse(
398    api_key: String,
399    model: String,
400    base_url: String,
401    req: LlmRequest,
402) -> BoxStream<'static> {
403    Box::pin(async_stream::stream! {
404        let timeout_ms = req.timeout_ms;
405        let result = tokio::task::spawn_blocking(move || {
406            call_claude_streaming(&api_key, &model, &base_url, &req, timeout_ms)
407        })
408        .await;
409
410        match result {
411            Ok(chunks) => {
412                for chunk in chunks {
413                    yield chunk;
414                }
415            }
416            Err(e) => yield Err(LlmError::Transport(format!("spawn_blocking join error: {e}"))),
417        }
418    })
419}
420
421/// Synchronous Anthropic SSE streaming call, executed inside `spawn_blocking`.
422///
423/// Posts to `/v1/messages` with `stream: true`, then reads the response body
424/// line by line. SSE protocol:
425/// - Empty lines are separators — skip them.
426/// - Lines beginning with `event:` are event-type hints — skip them (we parse
427///   the type from the `data:` JSON instead).
428/// - Lines beginning with `data:` carry the JSON payload.
429///
430/// For `content_block_delta` events with `delta.type == "text_delta"` we emit
431/// a [`StreamChunk`] carrying the token text. On `message_stop` we emit a
432/// terminal chunk with `finish_reason = Some("stop")` and return.
433fn call_claude_streaming(
434    api_key: &str,
435    model: &str,
436    base_url: &str,
437    req: &LlmRequest,
438    timeout_ms: u64,
439) -> Vec<Result<StreamChunk, LlmError>> {
440    let url = format!("{base_url}/v1/messages");
441
442    let messages: Vec<AnthropicMessage<'_>> = req
443        .messages
444        .iter()
445        .map(|m| AnthropicMessage {
446            role: m.role.as_anthropic_str(),
447            content: &m.content,
448        })
449        .collect();
450
451    let body = MessagesRequest {
452        model,
453        max_tokens: req.max_tokens,
454        messages,
455        stream: true,
456    };
457
458    let body_value = match serde_json::to_value(&body) {
459        Ok(v) => v,
460        Err(e) => {
461            return vec![Err(LlmError::Transport(format!(
462                "request serialization failed: {e}"
463            )))]
464        }
465    };
466
467    let timeout = Duration::from_millis(timeout_ms);
468    let agent = ureq::AgentBuilder::new().timeout(timeout).build();
469
470    let raw_response = match agent
471        .post(&url)
472        .set("x-api-key", api_key)
473        .set(
474            "anthropic-version",
475            ClaudeHttpAdapter::ANTHROPIC_VERSION_HEADER,
476        )
477        .set("content-type", "application/json")
478        .send_json(body_value)
479    {
480        Ok(r) => r,
481        Err(err) => return vec![Err(map_ureq_error(err, timeout_ms))],
482    };
483
484    let status = raw_response.status();
485    if status != 200 {
486        return vec![Err(LlmError::Upstream(format!("HTTP {status}")))];
487    }
488
489    let body_text = match raw_response.into_string() {
490        Ok(s) => s,
491        Err(e) => {
492            return vec![Err(LlmError::Transport(format!(
493                "reading streaming response body: {e}"
494            )))]
495        }
496    };
497
498    let mut chunks = Vec::new();
499
500    for line in body_text.lines() {
501        // Skip empty lines (SSE event separators) and event-type hint lines.
502        if line.is_empty() || line.starts_with("event:") {
503            continue;
504        }
505
506        let data = match line.strip_prefix("data:") {
507            Some(rest) => rest.trim(),
508            None => continue,
509        };
510
511        let event: SseEvent = match serde_json::from_str(data) {
512            Ok(v) => v,
513            Err(e) => {
514                chunks.push(Err(LlmError::Parse(format!(
515                    "claude SSE data parse: {e}: {data}"
516                ))));
517                continue;
518            }
519        };
520
521        match event.event_type.as_str() {
522            "content_block_delta" => {
523                if let Some(delta) = event.delta {
524                    if delta.delta_type == "text_delta" {
525                        chunks.push(Ok(StreamChunk {
526                            delta: delta.text,
527                            finish_reason: None,
528                        }));
529                    }
530                }
531            }
532            "message_stop" => {
533                chunks.push(Ok(StreamChunk {
534                    delta: String::new(),
535                    finish_reason: Some("stop".into()),
536                }));
537                // Terminal event — no further lines need processing.
538                return chunks;
539            }
540            _ => {
541                // Informational events (message_start, content_block_start,
542                // message_delta, ping, etc.) are intentionally ignored.
543            }
544        }
545    }
546
547    chunks
548}
549
550/// Map a `ureq` error to an [`LlmError`] variant.
551fn map_ureq_error(err: ureq::Error, timeout_ms: u64) -> LlmError {
552    match err {
553        ureq::Error::Transport(t) => {
554            let msg = t.to_string();
555            if is_timeout_message(&msg) {
556                LlmError::Timeout { timeout_ms }
557            } else {
558                LlmError::Transport(msg)
559            }
560        }
561        ureq::Error::Status(code, _) => LlmError::Upstream(format!("HTTP {code}")),
562    }
563}
564
/// Heuristic: does the transport error message look like a timeout?
///
/// Case-insensitive substring match against the phrases `ureq` and the OS
/// typically use for deadline expiry.
fn is_timeout_message(msg: &str) -> bool {
    const TIMEOUT_PHRASES: [&str; 3] = ["timed out", "deadline exceeded", "timeout"];
    let normalized = msg.to_ascii_lowercase();
    TIMEOUT_PHRASES.iter().any(|phrase| normalized.contains(phrase))
}
570
571// ---------------------------------------------------------------------------
572// Role serialization helper
573// ---------------------------------------------------------------------------
574
impl LlmRole {
    /// Return the lowercase string representation used by Anthropic's API.
    ///
    /// Anthropic accepts `user` and `assistant`; `tool` is mapped to `user`
    /// as a conservative fallback (tool-result multi-turn is out of scope
    /// for this adapter version).
    ///
    /// The match is deliberately exhaustive (no `_` arm): adding a variant to
    /// `LlmRole` in `crate::adapter` forces a compile error here, so the
    /// mapping cannot silently fall out of date.
    fn as_anthropic_str(self) -> &'static str {
        match self {
            // Tool results are downgraded to plain user turns for now.
            LlmRole::User | LlmRole::Tool => "user",
            LlmRole::Assistant => "assistant",
        }
    }
}