use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(not(target_arch = "wasm32"))]
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::stream::StreamExt;
use reqwest::{Client, Url};
use tracing::trace;
use crate::backends::anthropic::wire::{
MessagesRequest, MessagesResponse, StreamEvent, ANTHROPIC_VERSION,
};
use crate::error::{Error, Result};
/// Base URL that `AnthropicClient::new` parses and stores; replaceable afterwards via
/// `AnthropicClient::with_base_url`.
const DEFAULT_BASE_URL: &str = "https://api.anthropic.com";
/// Timeout handed to `ClientBuilder::timeout` when the HTTP client is built.
/// Only defined (and only applied) on non-wasm32 targets.
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Client for the Anthropic Messages endpoint.
///
/// `Debug` is implemented by hand for this type so that the API key is printed as
/// `<redacted>` instead of its real value.
pub struct AnthropicClient {
/// Underlying `reqwest` client used for every request.
http: Client,
/// API key sent in the `x-api-key` header.
api_key: Box<str>,
/// URL that `v1/messages` is joined onto to form the request URL.
base_url: Url,
}
/// Manual `Debug` so the API key never reaches logs: only the base URL is shown and the
/// key is replaced by the fixed text `<redacted>`.
impl fmt::Debug for AnthropicClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("AnthropicClient");
        repr.field("base_url", &self.base_url.as_str());
        repr.field("api_key", &"<redacted>");
        repr.finish()
    }
}
impl AnthropicClient {
pub fn new(api_key: impl Into<String>) -> Result<Self> {
let builder =
Client::builder().user_agent(concat!("localharness/", env!("CARGO_PKG_VERSION")));
#[cfg(not(target_arch = "wasm32"))]
let builder = builder.timeout(DEFAULT_TIMEOUT);
let http = builder
.build()
.map_err(|e| Error::other(format!("reqwest client build: {e}")))?;
Ok(Self {
http,
api_key: api_key.into().into_boxed_str(),
base_url: Url::parse(DEFAULT_BASE_URL).expect("default base url is valid"),
})
}
pub fn with_base_url(mut self, url: Url) -> Self {
self.base_url = url;
self
}
fn messages_url(&self) -> Result<Url> {
self.base_url
.join("v1/messages")
.map_err(|e| Error::other(format!("invalid messages url: {e}")))
}
pub async fn messages(&self, req: &MessagesRequest) -> Result<MessagesResponse> {
let url = self.messages_url()?;
let mut body = req.clone();
body.stream = false;
let response = self
.http
.post(url)
.header("x-api-key", self.api_key.as_ref())
.header("anthropic-version", ANTHROPIC_VERSION)
.header("content-type", "application/json")
.json(&body)
.send()
.await
.map_err(|e| Error::other(format!("anthropic POST: {e}")))?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "<no body>".to_string());
return Err(Error::other(format!("anthropic HTTP {status}: {body}")));
}
response
.json::<MessagesResponse>()
.await
.map_err(|e| Error::other(format!("anthropic JSON: {e}")))
}
pub async fn stream_messages(&self, req: &MessagesRequest) -> Result<MessagesSseStream> {
let url = self.messages_url()?;
let mut body = req.clone();
body.stream = true;
let response = self
.http
.post(url)
.header("x-api-key", self.api_key.as_ref())
.header("anthropic-version", ANTHROPIC_VERSION)
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.json(&body)
.send()
.await
.map_err(|e| Error::other(format!("anthropic POST: {e}")))?;
let debug_sse = std::env::var("LH_DEBUG_SSE").is_ok();
if debug_sse {
eprintln!(
"[anthropic resp] status={} content-type={:?}",
response.status(),
response.headers().get("content-type"),
);
}
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "<no body>".to_string());
if debug_sse {
eprintln!("[anthropic ERROR] HTTP {status}: {body}");
}
return Err(Error::other(format!("anthropic HTTP {status}: {body}")));
}
let byte_stream = response
.bytes_stream()
.map(|res| res.map_err(|e| Error::other(format!("anthropic chunk read: {e}"))));
Ok(MessagesSseStream::new(Box::pin(byte_stream)))
}
}
/// Pinned, boxed stream of raw response chunks consumed by `MessagesSseStream`.
/// On non-wasm32 targets the stream is required to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + 'static>>;
/// wasm32 variant of the same alias, without the `Send` bound.
// NOTE(review): presumably because the wasm response stream is not `Send` — confirm.
#[cfg(target_arch = "wasm32")]
type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + 'static>>;
/// Stream adapter that turns raw server-sent-event bytes into decoded `StreamEvent`s.
pub struct MessagesSseStream {
/// Source of raw byte chunks.
upstream: ByteStream,
/// Bytes received so far that have not yet been split off as a frame.
buffer: BytesMut,
/// Set once `upstream` has reported end of stream; after that only `buffer` is drained.
done: bool,
}
impl MessagesSseStream {
    /// Wraps `upstream` with an empty 8 KiB frame buffer.
    pub fn new(upstream: ByteStream) -> Self {
        let buffer = BytesMut::with_capacity(8 * 1024);
        Self {
            upstream,
            buffer,
            done: false,
        }
    }

    /// Splits the first complete frame off the front of the buffer and returns its
    /// `data:` payload.
    ///
    /// A frame ends at the earliest blank-line delimiter, either `\r\n\r\n` or `\n\n`;
    /// the delimiter is consumed together with the frame. Returns `None` when the buffer
    /// holds no complete frame yet.
    fn take_frame(&mut self) -> Option<Vec<u8>> {
        let pending = &self.buffer[..];
        // Walk every offset in order so the earliest delimiter wins; at a given offset the
        // CRLF form is tested before the LF form.
        let cut = (0..pending.len()).find_map(|start| {
            let tail = &pending[start..];
            if tail.starts_with(b"\r\n\r\n") {
                Some(start + 4)
            } else if tail.starts_with(b"\n\n") {
                Some(start + 2)
            } else {
                None
            }
        })?;
        let frame = self.buffer.split_to(cut);
        Some(extract_data_payload(&frame))
    }

    /// Drains whatever is left in the buffer as one final, unterminated frame and returns
    /// its `data:` payload, or `None` when the buffer is already empty.
    fn take_remaining(&mut self) -> Option<Vec<u8>> {
        let len = self.buffer.len();
        (len > 0).then(|| {
            let tail = self.buffer.split_to(len);
            extract_data_payload(&tail)
        })
    }
}
/// Yields one decoded `StreamEvent` per SSE frame that carries a `data:` payload.
impl Stream for MessagesSseStream {
    type Item = Result<StreamEvent>;

    /// Drains buffered frames first and pulls more bytes from upstream only when no
    /// complete frame is available.
    ///
    /// Frames without a `data:` payload are skipped. Once upstream has ended, the
    /// unterminated tail of the buffer is treated as a last frame before the stream
    /// finishes. Upstream errors are forwarded as `Err` items.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // Pick the next payload: a complete frame if one is buffered, otherwise (only
            // after upstream has ended) the leftover tail.
            let next_payload = match this.take_frame() {
                Some(payload) => Some(payload),
                None if this.done => match this.take_remaining() {
                    Some(payload) => Some(payload),
                    None => return Poll::Ready(None),
                },
                None => None,
            };
            match next_payload {
                // Frame without data (comment, heartbeat, blank): look for the next one.
                Some(payload) if payload.is_empty() => continue,
                Some(payload) => return Poll::Ready(Some(decode_event(&payload))),
                None => {}
            }
            // No complete frame buffered and upstream still open: read another chunk.
            match this.upstream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(chunk))) => {
                    trace!(len = chunk.len(), "anthropic sse bytes");
                    this.buffer.extend_from_slice(&chunk);
                }
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => this.done = true,
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
/// Collects the `data:` field of one SSE frame into a single payload.
///
/// The frame is split into lines on `\n` and trailing `\r` bytes are removed from each
/// line. Every line that starts with `data:` contributes its value, minus one optional
/// leading space; values from several `data:` lines are joined with `\n`. All other lines
/// (`event:`, comments, blanks) are ignored, so a frame without data yields an empty
/// vector.
///
/// Parsing is done on raw bytes rather than on a `str`. A frame containing invalid UTF-8
/// therefore keeps its payload (and fails later in JSON decoding with a visible error)
/// instead of being silently reduced to an empty payload and dropped.
fn extract_data_payload(frame: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(frame.len());
    for raw_line in frame.split(|&b| b == b'\n') {
        // Drop the `\r` of a CRLF line ending (and any further trailing `\r`).
        let mut line = raw_line;
        while let Some(stripped) = line.strip_suffix(b"\r") {
            line = stripped;
        }
        if let Some(value) = line.strip_prefix(b"data:") {
            // A single space after the colon is part of the field syntax, not the value.
            let value = value.strip_prefix(b" ").unwrap_or(value);
            if !out.is_empty() {
                out.push(b'\n');
            }
            out.extend_from_slice(value);
        }
    }
    out
}
fn decode_event(payload: &[u8]) -> Result<StreamEvent> {
serde_json::from_slice::<StreamEvent>(payload).map_err(|e| {
Error::other(format!(
"anthropic sse decode: {e}; payload: {}",
String::from_utf8_lossy(payload)
))
})
}
/// Reference-counted (`Arc`) handle to an `AnthropicClient`.
pub type SharedClient = Arc<AnthropicClient>;
// Unit tests for the SSE decoder. Every test feeds `MessagesSseStream` from an in-memory
// byte stream; none of them performs network I/O.
#[cfg(test)]
mod tests {
use super::*;
use crate::backends::anthropic::wire::{Block, BlockDelta, StopReason};
use futures_util::stream;
/// Builds an in-memory upstream that yields each element of `parts` as one successful chunk.
fn bytes_from(parts: &[&[u8]]) -> ByteStream {
let owned: Vec<Bytes> = parts.iter().map(|b| Bytes::copy_from_slice(b)).collect();
Box::pin(stream::iter(owned.into_iter().map(Ok)))
}
/// Returns the raw frames of a reference response: a text block ("Read" + "ing."), then a
/// `view_file` tool_use block whose JSON input arrives in two fragments, then a
/// `message_delta` with `stop_reason: tool_use` and a final `message_stop`.
fn canonical_frames() -> Vec<Vec<u8>> {
let raw = [
"event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"claude-haiku-4-5-20251001\",\"role\":\"assistant\",\"usage\":{\"input_tokens\":12,\"output_tokens\":1}}}\n\n",
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Read\"}}\n\n",
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"ing.\"}}\n\n",
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n",
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"tool_use\",\"id\":\"toolu_x\",\"name\":\"view_file\"}}\n\n",
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"path\\\":\"}}\n\n",
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"\\\"main.rs\\\"}\"}}\n\n",
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":1}\n\n",
"event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\"},\"usage\":{\"output_tokens\":33}}\n\n",
"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
];
raw.iter().map(|s| s.as_bytes().to_vec()).collect()
}
/// Drains `s` and asserts that it decodes to exactly the content of `canonical_frames`:
/// accumulated text, tool id/name/arguments, stop reason, token counts and the final
/// `message_stop`. Any `Err` item fails the test through `unwrap`.
async fn assert_canonical(mut s: MessagesSseStream) {
let mut text = String::new();
let mut tool_id = String::new();
let mut tool_name = String::new();
let mut tool_args = String::new();
let mut stop: Option<StopReason> = None;
let mut out_tokens: Option<i32> = None;
let mut in_tokens: Option<i32> = None;
let mut saw_stop_event = false;
while let Some(ev) = s.next().await {
match ev.unwrap() {
StreamEvent::MessageStart { message } => {
in_tokens = message.usage.and_then(|u| u.input_tokens);
}
StreamEvent::ContentBlockStart {
content_block: Block::ToolUse { id, name, .. },
..
} => {
tool_id = id;
tool_name = name;
}
StreamEvent::ContentBlockDelta { delta, .. } => match delta {
BlockDelta::TextDelta { text: t } => text.push_str(&t),
BlockDelta::InputJsonDelta { partial_json } => tool_args.push_str(&partial_json),
_ => {}
},
StreamEvent::MessageDelta { delta, usage } => {
stop = delta.stop_reason;
out_tokens = usage.and_then(|u| u.output_tokens);
}
StreamEvent::MessageStop => saw_stop_event = true,
_ => {}
}
}
assert_eq!(text, "Reading.");
assert_eq!(tool_id, "toolu_x");
assert_eq!(tool_name, "view_file");
// The two partial_json fragments must concatenate into one valid JSON object.
let parsed: serde_json::Value = serde_json::from_str(&tool_args).unwrap();
assert_eq!(parsed["path"], "main.rs");
assert_eq!(stop, Some(StopReason::ToolUse));
assert_eq!(out_tokens, Some(33));
assert_eq!(in_tokens, Some(12));
assert!(saw_stop_event);
}
// Whole response delivered as a single chunk.
#[tokio::test]
async fn decodes_canonical_sequence_one_chunk() {
let blob: Vec<u8> = canonical_frames().concat();
let s = MessagesSseStream::new(bytes_from(&[&blob]));
assert_canonical(s).await;
}
// Response cut into fixed 17-byte chunks, without regard to frame boundaries.
#[tokio::test]
async fn decodes_canonical_sequence_split_mid_frame() {
let blob: Vec<u8> = canonical_frames().concat();
let chunks: Vec<&[u8]> = blob.chunks(17).collect();
let s = MessagesSseStream::new(bytes_from(&chunks));
assert_canonical(s).await;
}
// Same response with every `\n` rewritten to `\r\n`, delivered in 13-byte chunks.
#[tokio::test]
async fn decodes_canonical_sequence_crlf() {
let blob: Vec<u8> = canonical_frames().concat();
let crlf: Vec<u8> = String::from_utf8(blob)
.unwrap()
.replace('\n', "\r\n")
.into_bytes();
let chunks: Vec<&[u8]> = crlf.chunks(13).collect();
let s = MessagesSseStream::new(bytes_from(&chunks));
assert_canonical(s).await;
}
// A ping frame is surfaced as `StreamEvent::Ping` and the stream then continues normally.
// NOTE(review): despite the name, only a ping frame is fed in; no unknown event type is
// exercised here.
#[tokio::test]
async fn ignores_ping_and_unknown_events() {
let frames = [
"event: ping\ndata: {\"type\":\"ping\"}\n\n".as_bytes(),
"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n".as_bytes(),
];
let mut s = MessagesSseStream::new(bytes_from(&frames));
let first = s.next().await.unwrap().unwrap();
assert_eq!(first, StreamEvent::Ping);
let second = s.next().await.unwrap().unwrap();
assert_eq!(second, StreamEvent::MessageStop);
assert!(s.next().await.is_none());
}
/// Drains `s` and maps every item to a short label: the event's wire name for `Ok` items,
/// `"ERR"` for `Err` items.
async fn collect_kinds(mut s: MessagesSseStream) -> Vec<String> {
let mut out = Vec::new();
while let Some(ev) = s.next().await {
match ev {
Ok(StreamEvent::MessageStart { .. }) => out.push("message_start".into()),
Ok(StreamEvent::ContentBlockStart { .. }) => out.push("content_block_start".into()),
Ok(StreamEvent::ContentBlockDelta { .. }) => out.push("content_block_delta".into()),
Ok(StreamEvent::ContentBlockStop { .. }) => out.push("content_block_stop".into()),
Ok(StreamEvent::MessageDelta { .. }) => out.push("message_delta".into()),
Ok(StreamEvent::MessageStop) => out.push("message_stop".into()),
Ok(StreamEvent::Ping) => out.push("ping".into()),
Ok(StreamEvent::Error { .. }) => out.push("error".into()),
Ok(StreamEvent::Unknown) => out.push("unknown".into()),
Err(_) => out.push("ERR".into()),
}
}
out
}
// The last frame has no terminating blank line; it must still be decoded at end of stream.
#[tokio::test]
async fn flushes_final_frame_without_trailing_blank_line() {
let frames = [
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n".as_bytes(),
"event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\"},\"usage\":{\"output_tokens\":7}}".as_bytes(),
];
let s = MessagesSseStream::new(bytes_from(&frames));
let kinds = collect_kinds(s).await;
assert_eq!(
kinds,
vec!["content_block_stop", "message_delta"],
"the final unterminated frame must still be decoded, not dropped at EOF"
);
}
// A stream consisting of one frame with no line terminator at all.
#[tokio::test]
async fn flushes_single_unterminated_frame() {
let frames =
["event: message_stop\ndata: {\"type\":\"message_stop\"}".as_bytes()];
let s = MessagesSseStream::new(bytes_from(&frames));
assert_eq!(collect_kinds(s).await, vec!["message_stop"]);
}
// A final frame ending in a single newline instead of a blank line.
#[tokio::test]
async fn flushes_final_frame_with_single_newline() {
let frames =
["event: message_stop\ndata: {\"type\":\"message_stop\"}\n".as_bytes()];
let s = MessagesSseStream::new(bytes_from(&frames));
assert_eq!(collect_kinds(s).await, vec!["message_stop"]);
}
// A comment-only frame and a lone newline chunk carry no `data:` field and must not
// produce any item.
#[tokio::test]
async fn skips_comment_and_blank_heartbeat_frames() {
let frames = [
": this is an SSE comment / keepalive\n\n".as_bytes(),
"\n".as_bytes(), "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n".as_bytes(),
];
let s = MessagesSseStream::new(bytes_from(&frames));
assert_eq!(
collect_kinds(s).await,
vec!["message_stop"],
"comment + blank-line frames must be skipped, not surfaced as events/errors"
);
}
// One JSON object spread over two `data:` lines; the lines are joined with `\n`, which is
// plain whitespace to the JSON parser.
#[tokio::test]
async fn joins_multiline_data_field() {
let frames =
["event: message_stop\ndata: {\"type\":\ndata: \"message_stop\"}\n\n".as_bytes()];
let s = MessagesSseStream::new(bytes_from(&frames));
assert_eq!(collect_kinds(s).await, vec!["message_stop"]);
}
// "é" is the two bytes 0xC3 0xA9 in UTF-8. The input is cut right after 0xC3 so the
// character straddles two chunks, and it must still decode intact.
#[tokio::test]
async fn multibyte_char_split_across_chunks() {
let full = "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"é\"}}\n\n";
let raw = full.as_bytes();
let split_at = raw.iter().position(|&b| b == 0xC3).unwrap();
let (head, tail) = raw.split_at(split_at + 1); let mut s = MessagesSseStream::new(bytes_from(&[head, tail]));
let ev = s.next().await.unwrap().unwrap();
match ev {
StreamEvent::ContentBlockDelta {
delta: BlockDelta::TextDelta { text },
..
} => assert_eq!(text, "é"),
other => panic!("expected a text_delta carrying é, got {other:?}"),
}
assert!(s.next().await.is_none());
}
// A payload that is not valid JSON must come out as an `Err` item rather than a panic.
#[tokio::test]
async fn malformed_json_yields_error_not_panic() {
let frames = ["event: message_delta\ndata: {not valid json}\n\n".as_bytes()];
let mut s = MessagesSseStream::new(bytes_from(&frames));
let item = s.next().await.unwrap();
assert!(item.is_err(), "malformed JSON must be an Err, got {item:?}");
}
// Four content blocks in one message: text, a tool_use with no input deltas, text again,
// and a tool_use with two input fragments. Tool data is tracked per block index.
#[tokio::test]
async fn accumulates_multiple_blocks_with_empty_and_filled_tool_args() {
let frames: Vec<&[u8]> = vec![
"event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"m\",\"usage\":{\"input_tokens\":5,\"output_tokens\":1}}}\n\n".as_bytes(),
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n".as_bytes(),
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"foo \"}}\n\n".as_bytes(),
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n".as_bytes(),
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"tool_use\",\"id\":\"toolu_empty\",\"name\":\"list_subdomains\"}}\n\n".as_bytes(),
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":1}\n\n".as_bytes(),
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":2,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n".as_bytes(),
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":2,\"delta\":{\"type\":\"text_delta\",\"text\":\"bar\"}}\n\n".as_bytes(),
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":2}\n\n".as_bytes(),
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":3,\"content_block\":{\"type\":\"tool_use\",\"id\":\"toolu_full\",\"name\":\"view_file\"}}\n\n".as_bytes(),
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":3,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"path\\\":\"}}\n\n".as_bytes(),
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":3,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"\\\"a.rs\\\"}\"}}\n\n".as_bytes(),
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":3}\n\n".as_bytes(),
"event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\"},\"usage\":{\"output_tokens\":20}}\n\n".as_bytes(),
"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n".as_bytes(),
];
let mut s = MessagesSseStream::new(bytes_from(&frames));
let mut text = String::new();
// Block index -> (tool id, tool name, concatenated partial_json fragments).
let mut tools: std::collections::BTreeMap<u32, (String, String, String)> =
std::collections::BTreeMap::new();
while let Some(ev) = s.next().await {
match ev.unwrap() {
StreamEvent::ContentBlockStart {
index,
content_block: Block::ToolUse { id, name, .. },
} => {
tools.insert(index, (id, name, String::new()));
}
StreamEvent::ContentBlockDelta { index, delta } => match delta {
BlockDelta::TextDelta { text: t } => text.push_str(&t),
BlockDelta::InputJsonDelta { partial_json } => {
if let Some(e) = tools.get_mut(&index) {
e.2.push_str(&partial_json);
}
}
_ => {}
},
_ => {}
}
}
assert_eq!(text, "foo bar", "text from blocks 0 and 2 concatenates");
let empty = &tools[&1];
assert_eq!(empty.0, "toolu_empty");
assert_eq!(empty.1, "list_subdomains");
assert_eq!(empty.2, "", "no input deltas → empty args fragment");
let full = &tools[&3];
assert_eq!(full.0, "toolu_full");
let parsed: serde_json::Value = serde_json::from_str(&full.2).unwrap();
assert_eq!(parsed["path"], "a.rs");
}
}