use crate::error::{Error, Result};
use crate::providers::{ListModels, ProviderApi, ProviderClient};
use crate::sse::{SseFrame, drain_sse_frames};
use crate::types::{
ContentPart, DEFAULT_ANTHROPIC_MODEL, Event, Message, Model, ModelInfo, Provider, Response,
ResponseRequest, Role, ToolCall, ToolSpec,
};
use futures_util::StreamExt;
use serde::Deserialize;
use std::collections::HashMap;
/// Reports whether verbose Anthropic stream diagnostics are switched on.
///
/// Diagnostics are enabled when either `LLMOXIDE_DEBUG_ANTHROPIC_STREAM` or
/// `LLMOXIDE_DEBUG_TOOLS_STREAM` is set to exactly `1`, `true`, or `yes`.
/// Unset variables and any other value count as "off".
pub fn anthropic_stream_debug_enabled() -> bool {
    const FLAG_VARS: [&str; 2] = [
        "LLMOXIDE_DEBUG_ANTHROPIC_STREAM",
        "LLMOXIDE_DEBUG_TOOLS_STREAM",
    ];
    FLAG_VARS.iter().any(|name| {
        std::env::var(name)
            .map(|value| matches!(value.as_str(), "1" | "true" | "yes"))
            .unwrap_or(false)
    })
}
/// Writes one diagnostic line to stderr, prefixed with
/// `[llmoxide anthropic stream]`, but only while stream debugging is enabled
/// (see `anthropic_stream_debug_enabled`).
fn anthropic_stream_dbg(msg: impl std::fmt::Display) {
    if !anthropic_stream_debug_enabled() {
        return;
    }
    eprintln!("[llmoxide anthropic stream] {msg}");
}
/// Shortens `s` to at most `max_chars` characters for log output.
///
/// When anything had to be cut off, a single `…` is appended so the reader can
/// tell the text was truncated. Counting is by `char`, so the cut never lands
/// inside a multi-byte character.
fn dbg_trim(s: &str, max_chars: usize) -> String {
    // The byte offset of character number `max_chars` (if it exists) is where
    // the kept prefix ends.
    match s.char_indices().nth(max_chars) {
        Some((cut, _)) => {
            let mut clipped = String::with_capacity(cut + '…'.len_utf8());
            clipped.push_str(&s[..cut]);
            clipped.push('…');
            clipped
        }
        None => s.to_owned(),
    }
}
/// Deserialized body of the models-listing response.
///
/// Only the `data` array is modelled here; no other top-level field is
/// declared on this struct.
#[derive(Debug, Deserialize)]
struct AnthropicModelsResponse {
/// Model entries, in the order they appear in the response.
data: Vec<AnthropicModel>,
}
/// One entry of the models-listing response.
///
/// Every field except `id` is optional in the deserialized form.
/// `model_infos_from_response` converts these entries into `ModelInfo`.
#[derive(Debug, Deserialize)]
struct AnthropicModel {
/// Model identifier; copied into `ModelInfo::id`.
id: String,
/// Optional display name, passed through unchanged.
display_name: Option<String>,
/// Optional creation timestamp, kept as the raw string from the response.
created_at: Option<String>,
/// Optional input-token limit; narrowed to `u32` during conversion.
max_input_tokens: Option<u64>,
/// Optional limit that conversion exposes as `ModelInfo::max_output_tokens`.
max_tokens: Option<u64>,
}
/// Stateless handle for the Anthropic provider.
///
/// It is a unit struct: the trait methods implemented for it receive the API
/// key and base URL as arguments on every call instead of storing them.
#[derive(Debug, Clone, Copy)]
pub struct AnthropicProvider;
/// Anthropic client configuration: an API key paired with the base URL that
/// requests are sent to.
#[derive(Debug, Clone)]
pub struct AnthropicApi {
    /// Secret sent in the `x-api-key` header.
    api_key: String,
    /// Root of the API; endpoint paths are appended to it.
    base_url: String,
}

impl AnthropicApi {
    /// Creates a client configuration from anything convertible into `String`.
    pub fn new(api_key: impl Into<String>, base_url: impl Into<String>) -> Self {
        let api_key: String = api_key.into();
        let base_url: String = base_url.into();
        Self { api_key, base_url }
    }
}
impl AnthropicProvider {
/// Builds the Messages endpoint URL (`<base>/v1/messages`).
///
/// Trailing slashes on `base_url` are stripped first so the result never
/// contains a doubled `/` at the join.
fn url(base_url: &str) -> String {
    let root = base_url.trim_end_matches('/');
    [root, "/v1/messages"].concat()
}
/// Builds the model-listing endpoint URL (`<base>/v1/models`).
///
/// Trailing slashes on `base_url` are stripped first so the result never
/// contains a doubled `/` at the join.
fn models_url(base_url: &str) -> String {
    let root = base_url.trim_end_matches('/');
    [root, "/v1/models"].concat()
}
fn system_from_messages(messages: &[Message]) -> Option<String> {
let mut out = String::new();
for m in messages {
if m.role != Role::System {
continue;
}
if let Some(t) = m.text_content() {
if !out.is_empty() {
out.push('\n');
}
out.push_str(&t);
}
}
if out.is_empty() { None } else { Some(out) }
}
/// Normalizes stored tool-call arguments into the `input` value of a
/// `tool_use` block.
///
/// Arguments held as a JSON *string* are parsed as JSON; if that text does not
/// parse, an empty object is used instead. Any other value is cloned
/// unchanged.
fn tool_use_input(arguments: &serde_json::Value) -> serde_json::Value {
    if let serde_json::Value::String(encoded) = arguments {
        serde_json::from_str(encoded)
            .unwrap_or_else(|_| serde_json::Value::Object(Default::default()))
    } else {
        arguments.clone()
    }
}
/// Converts conversation messages into the `messages` array of a request body.
///
/// * `Role::System` messages are skipped (they are handled by
///   `system_from_messages`).
/// * `Role::Tool` messages are sent with the `"user"` role.
/// * Each content part becomes one content block; `Citation` parts are
///   dropped.
/// * `Thinking` parts are emitted as `thinking` blocks (with the signature
///   when one is present) only on assistant messages; on any other role they
///   are emitted as plain `text` blocks.
fn messages_from_messages(messages: &[Message]) -> Vec<serde_json::Value> {
    let mut wire = Vec::new();
    for message in messages {
        let wire_role = match message.role {
            Role::System => continue,
            Role::Assistant => "assistant",
            Role::User | Role::Tool => "user",
        };
        let authored_by_assistant = message.role == Role::Assistant;

        let mut blocks: Vec<serde_json::Value> = Vec::new();
        for part in message.content.iter() {
            let block = match part {
                ContentPart::Citation { .. } => continue,
                ContentPart::Text(body) => serde_json::json!({
                    "type": "text",
                    "text": body
                }),
                ContentPart::ImageUrl { url } => serde_json::json!({
                    "type": "image",
                    "source": { "type": "url", "url": url }
                }),
                ContentPart::ImageBase64 { media_type, data } => serde_json::json!({
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": media_type,
                        "data": data
                    }
                }),
                ContentPart::Thinking { text, signature } if authored_by_assistant => {
                    let mut thinking = serde_json::json!({
                        "type": "thinking",
                        "thinking": text
                    });
                    if let Some(sig) = signature {
                        thinking["signature"] = serde_json::json!(sig);
                    }
                    thinking
                }
                // Thinking content on a non-assistant message is sent as text.
                ContentPart::Thinking { text, .. } => serde_json::json!({
                    "type": "text",
                    "text": text
                }),
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } => serde_json::json!({
                    "type": "tool_use",
                    "id": id,
                    "name": name,
                    "input": Self::tool_use_input(arguments),
                }),
                // The tool result is sent as its string rendering.
                ContentPart::ToolResult { id, content, .. } => serde_json::json!({
                    "type": "tool_result",
                    "tool_use_id": id,
                    "content": content.to_string(),
                }),
            };
            blocks.push(block);
        }

        wire.push(serde_json::json!({
            "role": wire_role,
            "content": blocks,
        }));
    }
    wire
}
/// Converts tool specifications into the `tools` array of a request body.
///
/// Each entry carries `name`, `description`, and the spec's `parameters`
/// under the key `input_schema`.
fn tools_from_tools(tools: &[ToolSpec]) -> Vec<serde_json::Value> {
    let mut declared = Vec::with_capacity(tools.len());
    for spec in tools {
        declared.push(serde_json::json!({
            "name": spec.name,
            "description": spec.description,
            "input_schema": spec.parameters,
        }));
    }
    declared
}
/// Chooses a model id for requests that did not name one.
///
/// Lists the provider's models and returns the id of the first entry. If the
/// listing fails or is empty, `DEFAULT_ANTHROPIC_MODEL` is returned instead;
/// the listing error itself is discarded.
async fn resolve_model(http: &reqwest::Client, api_key: &str, base_url: &str) -> String {
    let listing = ListModels::list_models(&AnthropicProvider, http, api_key, base_url).await;
    listing
        .ok()
        .and_then(|models| models.into_iter().next())
        .map_or_else(|| DEFAULT_ANTHROPIC_MODEL.to_string(), |first| first.id)
}
/// Converts a deserialized model listing into `ModelInfo` values.
///
/// Token limits arrive as `u64`; a limit that does not fit into `u32` is
/// reported as `None` rather than truncated.
fn model_infos_from_response(resp: AnthropicModelsResponse) -> Vec<ModelInfo> {
    let narrow = |limit: Option<u64>| limit.and_then(|n| u32::try_from(n).ok());

    let mut infos = Vec::with_capacity(resp.data.len());
    for entry in resp.data {
        infos.push(ModelInfo {
            id: entry.id,
            display_name: entry.display_name,
            provider: Provider::Anthropic,
            created_at: entry.created_at,
            max_input_tokens: narrow(entry.max_input_tokens),
            max_output_tokens: narrow(entry.max_tokens),
        });
    }
    infos
}
/// Extracts the tool calls from a complete (non-streaming) response body.
///
/// Scans the `content` array for blocks of type `tool_use`. Blocks without a
/// non-empty string `name` are ignored. A missing `input` becomes JSON `null`,
/// and a missing or non-string `id` becomes `None`. Returns an empty vector
/// when `content` is absent or not an array.
fn extract_tool_calls(v: &serde_json::Value) -> Vec<ToolCall> {
    let blocks = match v.get("content").and_then(serde_json::Value::as_array) {
        Some(blocks) => blocks,
        None => return Vec::new(),
    };

    blocks
        .iter()
        .filter(|block| {
            block.get("type").and_then(serde_json::Value::as_str) == Some("tool_use")
        })
        .filter_map(|block| {
            let name = block
                .get("name")
                .and_then(serde_json::Value::as_str)
                .filter(|name| !name.is_empty())?;
            Some(ToolCall {
                id: block
                    .get("id")
                    .and_then(serde_json::Value::as_str)
                    .map(str::to_owned),
                name: name.to_owned(),
                arguments: block
                    .get("input")
                    .cloned()
                    .unwrap_or(serde_json::Value::Null),
            })
        })
        .collect()
}
fn apply_stream_json_event<F>(
v: &serde_json::Value,
out: &mut String,
tool_calls: &mut HashMap<String, (Option<String>, Option<String>, String)>,
finished_tool_calls: &mut Vec<ToolCall>,
body: &serde_json::Value,
on_event: &mut F,
) -> Option<Response>
where
F: FnMut(Event) + ?Sized,
{
let t = v.get("type").and_then(|x| x.as_str()).unwrap_or("");
if t == "content_block_start" {
let idx = v
.get("index")
.map(|x| x.to_string())
.unwrap_or_else(|| "0".to_string());
if let Some(block) = v.get("content_block")
&& block.get("type").and_then(|x| x.as_str()) == Some("tool_use")
{
let id = block
.get("id")
.and_then(|x| x.as_str())
.map(|s| s.to_string());
let name = block
.get("name")
.and_then(|x| x.as_str())
.map(|s| s.to_string());
let raw = match block.get("input") {
Some(input) if input.as_object().map(|obj| obj.is_empty()).unwrap_or(false) => {
String::new()
}
Some(input) => input.to_string(),
None => String::new(),
};
anthropic_stream_dbg(format!(
"content_block_start tool_use index={idx} id={id:?} name={name:?} input_seed_len={}",
raw.len()
));
tool_calls.insert(idx, (id, name, raw));
}
} else if t == "content_block_delta" {
if let Some(delta) = v.get("delta") {
let dt = delta.get("type").and_then(|x| x.as_str()).unwrap_or("");
if dt == "text_delta"
&& let Some(text) = delta.get("text").and_then(|x| x.as_str())
{
out.push_str(text);
on_event(Event::TextDelta(text.to_string()));
} else if dt == "input_json_delta"
&& let Some(partial) = delta.get("partial_json").and_then(|x| x.as_str())
{
let idx = v
.get("index")
.map(|x| x.to_string())
.unwrap_or_else(|| "0".to_string());
let entry =
tool_calls
.entry(idx.clone())
.or_insert((None, None, String::new()));
entry.2.push_str(partial);
anthropic_stream_dbg(format!(
"input_json_delta index={idx} partial_len={} accumulated_len={}",
partial.len(),
entry.2.len()
));
}
}
} else if t == "content_block_stop" {
let idx = v
.get("index")
.map(|x| x.to_string())
.unwrap_or_else(|| "0".to_string());
if let Some((id, name_opt, raw)) = tool_calls.remove(&idx) {
let name = name_opt.unwrap_or_else(|| "tool".to_string());
if raw.is_empty() {
anthropic_stream_dbg(format!(
"content_block_stop index={idx} tool={name:?} id={id:?} raw empty — skip ToolCall"
));
} else {
match serde_json::from_str::<serde_json::Value>(&raw) {
Ok(args) => {
anthropic_stream_dbg(format!(
"content_block_stop index={idx} tool={name:?} parsed args ok (raw_len={})",
raw.len()
));
let tc = ToolCall {
id,
name,
arguments: args,
};
on_event(Event::ToolCall(tc.clone()));
finished_tool_calls.push(tc);
}
Err(e) => anthropic_stream_dbg(format!(
"content_block_stop index={idx} tool={name:?} JSON parse FAILED: {e}; raw_len={} raw_prefix={:?}",
raw.len(),
dbg_trim(&raw, 200)
)),
}
}
}
} else if t == "message_stop" {
for (_idx, (id, name_opt, raw)) in tool_calls.drain() {
let name = name_opt.unwrap_or_else(|| "tool".to_string());
if raw.is_empty() {
continue;
}
match serde_json::from_str::<serde_json::Value>(&raw) {
Ok(args) => {
let tc = ToolCall {
id,
name,
arguments: args,
};
on_event(Event::ToolCall(tc.clone()));
finished_tool_calls.push(tc);
}
Err(e) => anthropic_stream_dbg(format!(
"message_stop flush tool={name:?} JSON parse FAILED: {e}; raw_prefix={:?}",
dbg_trim(&raw, 200)
)),
}
}
let resp = Response {
model: Model::new(body["model"].as_str().unwrap_or(DEFAULT_ANTHROPIC_MODEL)),
message: Message::text(Role::Assistant, out.clone()),
tool_calls: std::mem::take(finished_tool_calls),
metadata: serde_json::Value::Null,
#[cfg(feature = "raw-json")]
raw_json: None,
};
anthropic_stream_dbg(format!(
"message_stop assistant_text_len={} finished_tool_calls={}",
out.len(),
resp.tool_calls.len()
));
on_event(Event::Completed(resp.clone()));
return Some(resp);
}
None
}
/// Feeds every queued SSE frame through `apply_stream_json_event`.
///
/// Frames whose `data` is not valid JSON are skipped with a debug message.
/// Processing stops at the first frame that completes the response; that
/// response is returned. The queue is emptied either way, because the drain
/// removes frames that were not reached. Returns `None` when no frame
/// completed the response.
fn drain_stream_frames<F>(
    frames: &mut Vec<SseFrame>,
    out: &mut String,
    tool_calls: &mut HashMap<String, (Option<String>, Option<String>, String)>,
    finished_tool_calls: &mut Vec<ToolCall>,
    body: &serde_json::Value,
    on_event: &mut F,
) -> Option<Response>
where
    F: FnMut(Event) + ?Sized,
{
    for frame in frames.drain(..) {
        let event = match serde_json::from_str::<serde_json::Value>(&frame.data) {
            Ok(event) => event,
            Err(e) => {
                anthropic_stream_dbg(format!(
                    "skip SSE frame: JSON parse error: {e}; data_prefix={:?}",
                    dbg_trim(&frame.data, 240)
                ));
                continue;
            }
        };
        let completed = Self::apply_stream_json_event(
            &event,
            out,
            tool_calls,
            finished_tool_calls,
            body,
            on_event,
        );
        if completed.is_some() {
            return completed;
        }
    }
    None
}
}
/// Object-safe adapter: forwards every call to the stateless
/// `AnthropicProvider`, supplying the API key and base URL stored in `self`.
#[async_trait::async_trait(?Send)]
impl ProviderApi for AnthropicApi {
    /// Always `Provider::Anthropic`.
    fn provider(&self) -> Provider {
        Provider::Anthropic
    }

    /// Sends a non-streaming request using the stored credentials.
    async fn send(&self, http: &reqwest::Client, req: ResponseRequest) -> Result<Response> {
        let backend = AnthropicProvider;
        backend
            .send(http, self.api_key.as_str(), self.base_url.as_str(), req)
            .await
    }

    /// Sends a streaming request, passing each event to `on_event`.
    async fn stream(
        &self,
        http: &reqwest::Client,
        req: ResponseRequest,
        on_event: &mut dyn FnMut(Event),
    ) -> Result<Response> {
        let backend = AnthropicProvider;
        backend
            .stream(
                http,
                self.api_key.as_str(),
                self.base_url.as_str(),
                req,
                on_event,
            )
            .await
    }

    /// Lists the models available to the stored credentials.
    async fn list_models(&self, http: &reqwest::Client) -> Result<Vec<ModelInfo>> {
        let backend = AnthropicProvider;
        backend
            .list_models(http, self.api_key.as_str(), self.base_url.as_str())
            .await
    }
}
impl ListModels for AnthropicProvider {
fn list_models(
&self,
http: &reqwest::Client,
api_key: &str,
base_url: &str,
) -> impl std::future::Future<Output = Result<Vec<ModelInfo>>> + Send {
let url = Self::models_url(base_url);
let http = http.clone();
let api_key = api_key.to_string();
async move {
let resp = http
.get(url)
.header("x-api-key", api_key)
.header("anthropic-version", "2023-06-01")
.send()
.await?;
let status = resp.status();
let text = resp.text().await?;
if !status.is_success() {
return Err(Error::Api {
provider: Provider::Anthropic,
status: status.as_u16(),
body: text,
});
}
let parsed: AnthropicModelsResponse = serde_json::from_str(&text)?;
Ok(Self::model_infos_from_response(parsed))
}
}
}
impl ProviderClient for AnthropicProvider {
/// Performs one non-streaming Messages request and returns the full reply.
///
/// The request model is `req.model` when given, otherwise whatever
/// `resolve_model` picks. `max_tokens` defaults to 1024. `system` and `tools`
/// are included only when there is something to send.
///
/// The returned message text is the concatenation of all `text` content
/// blocks; tool calls come from the `tool_use` blocks.
///
/// # Errors
///
/// Propagates transport and JSON-decoding errors, and returns `Error::Api`
/// (with status code and response body) for any non-success HTTP status.
async fn send(
    &self,
    http: &reqwest::Client,
    api_key: &str,
    base_url: &str,
    req: ResponseRequest,
) -> Result<Response> {
    let endpoint = Self::url(base_url);
    let system_prompt = Self::system_from_messages(&req.messages);
    let wire_messages = Self::messages_from_messages(&req.messages);
    let model = if let Some(chosen) = req.model {
        chosen.0
    } else {
        Self::resolve_model(http, api_key, base_url).await
    };
    let max_tokens = req.max_output_tokens.unwrap_or(1024);

    let mut body = serde_json::json!({
        "model": model,
        "max_tokens": max_tokens,
        "messages": wire_messages,
    });
    if let Some(system) = system_prompt {
        body["system"] = serde_json::json!(system);
    }
    if !req.tools.is_empty() {
        body["tools"] = serde_json::json!(Self::tools_from_tools(&req.tools));
    }

    let reply = http
        .post(endpoint)
        .header("x-api-key", api_key)
        .header("anthropic-version", "2023-06-01")
        .json(&body)
        .send()
        .await?;
    let status = reply.status();
    let payload = reply.text().await?;
    if !status.is_success() {
        return Err(Error::Api {
            provider: Provider::Anthropic,
            status: status.as_u16(),
            body: payload,
        });
    }

    let v: serde_json::Value = serde_json::from_str(&payload)?;
    // Concatenate the text of every `text` block, in order.
    let answer: String = v
        .get("content")
        .and_then(|content| content.as_array())
        .map(|parts| {
            parts
                .iter()
                .filter(|part| part.get("type").and_then(|x| x.as_str()) == Some("text"))
                .filter_map(|part| part.get("text").and_then(|x| x.as_str()))
                .collect()
        })
        .unwrap_or_default();
    let tool_calls = Self::extract_tool_calls(&v);

    Ok(Response {
        model: Model::new(body["model"].as_str().unwrap_or(DEFAULT_ANTHROPIC_MODEL)),
        message: Message::text(Role::Assistant, answer),
        tool_calls,
        metadata: serde_json::Value::Null,
        #[cfg(feature = "raw-json")]
        raw_json: Some(v),
    })
}
/// Performs one streaming Messages request, reporting progress via `on_event`.
///
/// The request is built like `send`'s, plus `"stream": true`. Response bytes
/// are decoded to text, split into SSE frames, and each frame is applied with
/// `drain_stream_frames`. The function returns as soon as a frame completes
/// the response (`message_stop`).
///
/// Network chunks can end in the middle of a multi-byte UTF-8 character.
/// Undecodable trailing bytes are therefore carried over and decoded together
/// with the next chunk, instead of being replaced with U+FFFD per chunk.
/// Bytes that are invalid UTF-8 regardless of chunking are still replaced
/// with U+FFFD.
///
/// If the stream ends without `message_stop`, any buffered partial frame is
/// flushed with a synthetic blank-line delimiter. If that still does not
/// complete the response, a `Response` is built from the text and tool calls
/// gathered so far.
///
/// # Errors
///
/// Propagates transport errors and errors from `drain_sse_frames`, and
/// returns `Error::Api` (with status code and response body) for any
/// non-success HTTP status.
async fn stream<F>(
    &self,
    http: &reqwest::Client,
    api_key: &str,
    base_url: &str,
    req: ResponseRequest,
    on_event: &mut F,
) -> Result<Response>
where
    F: FnMut(Event) + ?Sized,
{
    // Moves the longest decodable prefix of `pending` into a `String`.
    // An incomplete multi-byte sequence at the very end stays in `pending`
    // for the next chunk; bytes that can never be valid become U+FFFD.
    fn take_decodable_utf8(pending: &mut Vec<u8>) -> String {
        let mut decoded = String::with_capacity(pending.len());
        let mut consumed = 0;
        loop {
            match std::str::from_utf8(&pending[consumed..]) {
                Ok(text) => {
                    decoded.push_str(text);
                    consumed = pending.len();
                    break;
                }
                Err(err) => {
                    let valid_end = consumed + err.valid_up_to();
                    decoded.push_str(&String::from_utf8_lossy(&pending[consumed..valid_end]));
                    match err.error_len() {
                        // Definitely invalid bytes: replace and keep going.
                        Some(bad) => {
                            decoded.push('\u{FFFD}');
                            consumed = valid_end + bad;
                        }
                        // Truncated sequence: wait for the rest of it.
                        None => {
                            consumed = valid_end;
                            break;
                        }
                    }
                }
            }
        }
        pending.drain(..consumed);
        decoded
    }

    let url = Self::url(base_url);
    let system = Self::system_from_messages(&req.messages);
    let messages = Self::messages_from_messages(&req.messages);
    let model = match req.model {
        Some(m) => m.0,
        None => Self::resolve_model(http, api_key, base_url).await,
    };
    let max_tokens = req.max_output_tokens.unwrap_or(1024);
    let mut body = serde_json::json!({
        "model": model,
        "max_tokens": max_tokens,
        "messages": messages,
        "stream": true,
    });
    if let Some(system) = system {
        body["system"] = serde_json::json!(system);
    }
    if !req.tools.is_empty() {
        body["tools"] = serde_json::json!(Self::tools_from_tools(&req.tools));
    }
    let resp = http
        .post(url)
        .header("x-api-key", api_key)
        .header("anthropic-version", "2023-06-01")
        .json(&body)
        .send()
        .await?;
    let status = resp.status();
    if !status.is_success() {
        let text = resp.text().await?;
        return Err(Error::Api {
            provider: Provider::Anthropic,
            status: status.as_u16(),
            body: text,
        });
    }
    anthropic_stream_dbg(format!(
        "stream start model={} max_tokens={} tools_in_request={}",
        body["model"].as_str().unwrap_or(DEFAULT_ANTHROPIC_MODEL),
        max_tokens,
        req.tools.len()
    ));
    let mut out = String::new();
    let mut finished_tool_calls: Vec<ToolCall> = Vec::new();
    let mut tool_calls: HashMap<String, (Option<String>, Option<String>, String)> =
        HashMap::new();
    let mut data_buf = String::new();
    let mut frames = Vec::new();
    // Tail bytes of a UTF-8 sequence that was split across network chunks.
    let mut pending_bytes: Vec<u8> = Vec::new();
    let mut bytes = resp.bytes_stream();
    while let Some(chunk) = bytes.next().await {
        let chunk = chunk?;
        pending_bytes.extend_from_slice(&chunk);
        data_buf.push_str(&take_decodable_utf8(&mut pending_bytes));
        drain_sse_frames(&mut data_buf, &mut frames)?;
        if let Some(resp) = Self::drain_stream_frames(
            &mut frames,
            &mut out,
            &mut tool_calls,
            &mut finished_tool_calls,
            &body,
            on_event,
        ) {
            return Ok(resp);
        }
    }
    // The stream is over: a still-incomplete sequence can no longer be
    // completed, so decode whatever is left lossily.
    if !pending_bytes.is_empty() {
        data_buf.push_str(&String::from_utf8_lossy(&pending_bytes));
        pending_bytes.clear();
    }
    if !data_buf.is_empty() {
        anthropic_stream_dbg(format!(
            "eof: flushing trailing SSE buffer ({} bytes) with synthetic delimiter",
            data_buf.len()
        ));
        data_buf.push_str("\n\n");
        drain_sse_frames(&mut data_buf, &mut frames)?;
        if let Some(resp) = Self::drain_stream_frames(
            &mut frames,
            &mut out,
            &mut tool_calls,
            &mut finished_tool_calls,
            &body,
            on_event,
        ) {
            return Ok(resp);
        }
    }
    anthropic_stream_dbg(format!(
        "stream ended without message_stop (fallthrough) assistant_text_len={} finished_tool_calls={} pending_open_tool_blocks={}",
        out.len(),
        finished_tool_calls.len(),
        tool_calls.len()
    ));
    Ok(Response {
        model: Model::new(body["model"].as_str().unwrap_or(DEFAULT_ANTHROPIC_MODEL)),
        message: Message::text(Role::Assistant, out),
        tool_calls: finished_tool_calls,
        metadata: serde_json::Value::Null,
        #[cfg(feature = "raw-json")]
        raw_json: None,
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sse::drain_sse_frames;
use std::collections::HashMap;
/// A `tool_use` block that starts with an empty `{}` placeholder input and
/// then receives its arguments through an `input_json_delta` must produce
/// exactly one finished tool call, with the streamed JSON as its arguments.
#[test]
fn streaming_tool_use_with_empty_placeholder_input_accumulates_partial_json() {
let body = serde_json::json!({ "model": "claude-test" });
let mut out = String::new();
let mut tool_calls_map = HashMap::new();
let mut finished_tool_calls = Vec::new();
let mut noop = |_ev: Event| {};
// 1) Block start: announces the tool with an empty-object placeholder input.
let start = serde_json::json!({
"type": "content_block_start",
"index": 0,
"content_block": {
"type": "tool_use",
"id": "toolu_test",
"name": "add",
"input": {}
}
});
assert!(
AnthropicProvider::apply_stream_json_event(
&start,
&mut out,
&mut tool_calls_map,
&mut finished_tool_calls,
&body,
&mut noop,
)
.is_none()
);
// 2) Delta: carries the complete argument JSON as a single fragment.
let delta = serde_json::json!({
"type": "content_block_delta",
"index": 0,
"delta": {
"type": "input_json_delta",
"partial_json": "{\"a\":19,\"b\":23}"
}
});
AnthropicProvider::apply_stream_json_event(
&delta,
&mut out,
&mut tool_calls_map,
&mut finished_tool_calls,
&body,
&mut noop,
);
// 3) Block stop: the accumulated buffer is parsed and the call is finished.
let stop = serde_json::json!({
"type": "content_block_stop",
"index": 0
});
AnthropicProvider::apply_stream_json_event(
&stop,
&mut out,
&mut tool_calls_map,
&mut finished_tool_calls,
&body,
&mut noop,
);
assert_eq!(finished_tool_calls.len(), 1);
assert_eq!(finished_tool_calls[0].name, "add");
assert_eq!(
finished_tool_calls[0].arguments,
serde_json::json!({ "a": 19, "b": 23 })
);
}
/// A final `message_stop` frame that arrives without the terminating blank
/// line is not emitted by `drain_sse_frames` on its own. Once a `"\n\n"`
/// delimiter is appended (as `stream` does at end of input), the frame must be
/// parsed and complete the response.
#[test]
fn eof_flush_delivers_message_stop_without_trailing_blank_line() {
let body = serde_json::json!({ "model": "claude-test" });
let mut out = String::new();
let mut tool_calls = HashMap::new();
let mut finished_tool_calls = Vec::new();
// Buffer ends after a single newline: the frame is not yet delimited.
let mut buf = String::from("event: message_stop\ndata: {\"type\":\"message_stop\"}\n");
let mut frames = Vec::new();
drain_sse_frames(&mut buf, &mut frames).unwrap();
assert!(
frames.is_empty(),
"without an SSE blank-line delimiter, no complete frame should emit"
);
// Synthetic delimiter, mirroring the end-of-stream flush.
buf.push_str("\n\n");
drain_sse_frames(&mut buf, &mut frames).unwrap();
let mut noop = |_ev: Event| {};
let stopped = AnthropicProvider::drain_stream_frames(
&mut frames,
&mut out,
&mut tool_calls,
&mut finished_tool_calls,
&body,
&mut noop,
)
.is_some();
assert!(
stopped,
"expected completed SSE frame to parse message_stop"
);
}
/// A models-listing payload deserializes (the extra `has_more` field does not
/// cause an error) and converts into `ModelInfo` with every field carried
/// over; `max_tokens` surfaces as `max_output_tokens`.
#[test]
fn parses_models_response_into_model_info() {
let json = r#"
{
"data": [
{
"id": "claude-x",
"display_name": "Claude X",
"created_at": "2026-01-01T00:00:00Z",
"max_input_tokens": 1000,
"max_tokens": 200
}
],
"has_more": false
}
"#;
let parsed: AnthropicModelsResponse = serde_json::from_str(json).unwrap();
let infos = AnthropicProvider::model_infos_from_response(parsed);
assert_eq!(infos.len(), 1);
assert_eq!(infos[0].id, "claude-x");
assert_eq!(infos[0].display_name.as_deref(), Some("Claude X"));
assert_eq!(infos[0].created_at.as_deref(), Some("2026-01-01T00:00:00Z"));
assert_eq!(infos[0].max_input_tokens, Some(1000));
assert_eq!(infos[0].max_output_tokens, Some(200));
}
/// A user turn, an assistant tool call, and a tool result serialize into three
/// wire messages: `text` under "user", `tool_use` under "assistant", and
/// `tool_result` under "user" with string content.
#[test]
fn serializes_tool_use_and_tool_result_for_messages_api() {
let messages = vec![
Message::text(Role::User, "Add 1 and 2."),
Message::tool_call("toolu_01ABC", "add", serde_json::json!({ "a": 1, "b": 2 })),
Message::tool_result_named("toolu_01ABC", "add", serde_json::json!({ "sum": 3 })),
];
let out = AnthropicProvider::messages_from_messages(&messages);
assert_eq!(out.len(), 3);
// Message 0: plain user text.
assert_eq!(out[0]["role"], "user");
let u0 = out[0]["content"].as_array().unwrap();
assert_eq!(u0[0]["type"], "text");
assert_eq!(u0[0]["text"], "Add 1 and 2.");
// Message 1: the assistant's tool call, with object-shaped input.
assert_eq!(out[1]["role"], "assistant");
let a1 = out[1]["content"].as_array().unwrap();
assert_eq!(a1.len(), 1);
assert_eq!(a1[0]["type"], "tool_use");
assert_eq!(a1[0]["id"], "toolu_01ABC");
assert_eq!(a1[0]["name"], "add");
assert_eq!(a1[0]["input"]["a"], 1);
assert_eq!(a1[0]["input"]["b"], 2);
// Message 2: the tool result is sent under the "user" role.
assert_eq!(out[2]["role"], "user");
let u2 = out[2]["content"].as_array().unwrap();
assert_eq!(u2.len(), 1);
assert_eq!(u2[0]["type"], "tool_result");
assert_eq!(u2[0]["tool_use_id"], "toolu_01ABC");
// The result content is serialized as a string, not a nested object.
assert!(u2[0]["content"].as_str().is_some());
}
/// Tool-call arguments stored as a JSON-encoded *string* are parsed, so the
/// serialized `tool_use` block carries a structured `input` object.
#[test]
fn serializes_tool_use_when_arguments_are_json_string() {
let m = Message::tool_call("toolu_xyz", "add", serde_json::json!("{\"a\":5,\"b\":7}"));
let out = AnthropicProvider::messages_from_messages(&[m]);
let block = &out[0]["content"].as_array().unwrap()[0];
assert_eq!(block["input"]["a"], 5);
assert_eq!(block["input"]["b"], 7);
}
}