use std::{
collections::HashMap,
convert::Infallible,
env,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use axum::{
Json, Router,
body::Body,
extract::State,
http::{HeaderMap, HeaderName, HeaderValue, StatusCode},
response::IntoResponse,
routing::{get, post},
};
use bytes::Bytes;
use futures::{StreamExt, stream};
use llm_bridge_core::{
model::{ApiFormat, StreamDelta, StreamEvent, StreamState, TransformRequest},
stream::events_to_sse,
transform::{
anthropic_response_to_openai_response, anthropic_response_to_responses_response,
anthropic_to_openai, openai_response_to_anthropic_message, openai_to_anthropic,
responses_to_anthropic, transform_stream, transform_stream_to_openai,
transform_stream_to_openai_responses,
},
};
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
use serde_json::json;
use tower_http::limit::RequestBodyLimitLayer;
use tracing::{debug, error, info, warn};
/// One upstream endpoint the proxy can route requests to.
///
/// `Debug` is implemented by hand so that formatting a target — or anything that
/// contains one and derives `Debug`, such as [`UpstreamRouter`] — never prints the
/// API key.
#[derive(Clone)]
struct UpstreamTarget {
    /// Human-readable label used in logs (e.g. `"primary"`, `"backup"`).
    name: String,
    /// Base URL of the upstream API.
    url: String,
    /// Credential sent to this upstream.
    api_key: String,
}

impl std::fmt::Debug for UpstreamTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UpstreamTarget")
            .field("name", &self.name)
            .field("url", &self.url)
            .field("api_key", &"<redacted>")
            .finish()
    }
}
/// Which upstream an [`UpstreamRouter`] currently sends traffic to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ActiveRoute {
    /// The primary upstream; this is the route a new router starts on.
    Primary,
    /// The backup upstream, selected after the primary returns 429 or fails a health check.
    Backup,
}
/// Primary/backup failover state.
///
/// Traffic starts on `primary`. A 429 from the primary or a failed health check moves it
/// to `backup`. A passing health check after the primary was marked unhealthy moves it
/// back. Without a backup, the route never changes.
#[derive(Debug, Clone)]
struct UpstreamRouter {
    /// Preferred upstream.
    primary: UpstreamTarget,
    /// Optional failover upstream; `None` disables failover.
    backup: Option<UpstreamTarget>,
    /// Upstream currently selected for new requests.
    active: ActiveRoute,
    /// Last known primary health; a `false` -> `true` transition triggers failback.
    primary_healthy: bool,
}
impl UpstreamRouter {
    /// Creates a router that starts on the primary upstream.
    ///
    /// `primary_healthy` starts `true` only when a backup exists. The methods in this impl
    /// only consult it when a backup is configured.
    fn new(primary: UpstreamTarget, backup: Option<UpstreamTarget>) -> Self {
        Self {
            primary_healthy: backup.is_some(),
            active: ActiveRoute::Primary,
            primary,
            backup,
        }
    }

    /// Returns the target requests should currently go to.
    ///
    /// Falls back to the primary if the route says `Backup` but no backup is configured.
    fn active_target(&self) -> &UpstreamTarget {
        if self.active == ActiveRoute::Backup {
            if let Some(backup) = &self.backup {
                return backup;
            }
        }
        &self.primary
    }

    /// Fails over to the backup when the primary answers `429 Too Many Requests`.
    ///
    /// This is a no-op when there is no backup or traffic is already on the backup.
    fn record_response_status(&mut self, status: StatusCode) {
        let should_fail_over = status == StatusCode::TOO_MANY_REQUESTS
            && self.active == ActiveRoute::Primary
            && self.backup.is_some();
        if !should_fail_over {
            return;
        }
        warn!("primary upstream returned 429 — failing over to backup");
        self.active = ActiveRoute::Backup;
        self.primary_healthy = false;
    }

    /// Records a passing primary health check.
    ///
    /// If the primary was previously unhealthy, traffic moves back to it.
    fn mark_primary_healthy(&mut self) {
        if self.backup.is_none() || self.primary_healthy {
            return;
        }
        info!("primary health check passed — failing back to primary");
        self.primary_healthy = true;
        self.active = ActiveRoute::Primary;
    }

    /// Records a failing primary health check.
    ///
    /// Moves traffic to the backup if it was on the primary.
    fn mark_primary_unhealthy(&mut self) {
        if self.backup.is_none() {
            return;
        }
        if matches!(self.active, ActiveRoute::Primary) {
            warn!("primary health check failed — failing over to backup");
            self.active = ActiveRoute::Backup;
        }
        self.primary_healthy = false;
    }
}
/// Upper bound on how many bytes of an upstream error body `format_upstream_error_body_for_log`
/// includes before truncating.
const MAX_LOGGED_UPSTREAM_ERROR_BODY_BYTES: usize = 8 * 1024;
/// NOTE(review): not referenced in this part of the file. Judging by the name, this caps how
/// many SSE bytes may be buffered while waiting for a frame boundary — confirm at the use site.
const MAX_SSE_PENDING_BYTES: usize = 8 * 1024 * 1024;
/// Placeholder that `redact_headers` substitutes for credential header values.
const REDACTED_HEADER_VALUE: &str = "<redacted>";
/// Test-only fixed thinking signature (base64 of `llm-bridge-synthetic-thinking-signature`).
#[cfg(test)]
const SYNTHETIC_THINKING_SIGNATURE: &str = "bGxtLWJyaWRnZS1zeW50aGV0aWMtdGhpbmtpbmctc2lnbmF0dXJl";
/// Counter handing out a per-request id (starting at 1) that is attached to log lines.
static NEXT_PROXY_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
/// Rough token estimate for a JSON request body.
///
/// Takes the byte length of all string content (as flattened by `extract_text_from_json`)
/// divided by four, rounded up. Yields 0 when the body contains no text.
fn estimate_tokens(body: &serde_json::Value) -> u64 {
    const BYTES_PER_TOKEN: u64 = 4;
    let byte_len = extract_text_from_json(body).len() as u64;
    byte_len.div_ceil(BYTES_PER_TOKEN)
}
/// Flattens every string found anywhere in `value` into one space-separated `String`.
///
/// Arrays and object values are visited recursively, in iteration order. Object keys,
/// numbers, booleans and nulls contribute nothing (but still get a separator slot
/// inside arrays and objects).
fn extract_text_from_json(value: &serde_json::Value) -> String {
    let pieces: Vec<String> = match value {
        serde_json::Value::String(text) => return text.clone(),
        serde_json::Value::Array(items) => items.iter().map(extract_text_from_json).collect(),
        serde_json::Value::Object(fields) => {
            fields.values().map(extract_text_from_json).collect()
        }
        _ => return String::new(),
    };
    pieces.join(" ")
}
/// Reports whether `name` is a credential-carrying header whose value `redact_headers`
/// must hide.
///
/// Matching is ASCII case-insensitive.
fn is_sensitive_header(name: &str) -> bool {
    const SENSITIVE: [&str; 3] = ["authorization", "proxy-authorization", "x-api-key"];
    SENSITIVE
        .iter()
        .any(|candidate| candidate.eq_ignore_ascii_case(name))
}
/// Returns a copy of `headers` that is safe to log.
///
/// The value of every credential header (per `is_sensitive_header`) is replaced by
/// `REDACTED_HEADER_VALUE`. Header names are kept unchanged.
fn redact_headers(headers: &HashMap<String, String>) -> HashMap<String, String> {
    let mut redacted = HashMap::with_capacity(headers.len());
    for (name, value) in headers {
        let shown = if is_sensitive_header(name) {
            REDACTED_HEADER_VALUE
        } else {
            value.as_str()
        };
        redacted.insert(name.clone(), shown.to_string());
    }
    redacted
}
/// Renders an upstream error body for logging.
///
/// The body is decoded lossily as UTF-8. Only the first `MAX_LOGGED_UPSTREAM_ERROR_BODY_BYTES`
/// bytes are kept, followed by a note with the number of omitted bytes. An empty body is
/// shown as `<empty>`.
fn format_upstream_error_body_for_log(body: &Bytes) -> String {
    let raw: &[u8] = body;
    let shown_len = raw.len().min(MAX_LOGGED_UPSTREAM_ERROR_BODY_BYTES);
    let preview = String::from_utf8_lossy(&raw[..shown_len]).into_owned();
    let omitted = raw.len() - shown_len;
    match (omitted, preview.is_empty()) {
        (0, true) => "<empty>".to_string(),
        (0, false) => preview,
        (omitted, _) => format!("{preview}… <truncated {omitted} bytes>"),
    }
}
/// Maps an HTTP status code to the Anthropic API `error.type` string.
///
/// Unlisted statuses map to `"api_error"`.
fn map_http_status_to_anthropic_error_type(status: StatusCode) -> &'static str {
    const ERROR_TYPES: [(u16, &str); 9] = [
        (400, "invalid_request_error"),
        (401, "authentication_error"),
        (402, "billing_error"),
        (403, "permission_error"),
        (404, "not_found_error"),
        (413, "request_too_large"),
        (429, "rate_limit_error"),
        (504, "timeout_error"),
        (529, "overloaded_error"),
    ];
    let code = status.as_u16();
    ERROR_TYPES
        .iter()
        .find(|&&(candidate, _)| candidate == code)
        .map_or("api_error", |&(_, error_type)| error_type)
}
/// Builds an Anthropic-style error response: `{"type":"error","error":{"type","message"}}`.
///
/// The error type is derived from `status`.
fn build_anthropic_error_response(
    status: StatusCode,
    message: impl Into<String>,
) -> (StatusCode, Json<serde_json::Value>) {
    let kind = map_http_status_to_anthropic_error_type(status);
    let text: String = message.into();
    let payload = json!({
        "type": "error",
        "error": { "type": kind, "message": text },
    });
    (status, Json(payload))
}
/// Builds an OpenAI-style error response: `{"error":{"message","type","code":null}}`.
///
/// The `type` reuses the Anthropic error-type mapping for `status`.
fn build_openai_error_response(
    status: StatusCode,
    message: impl Into<String>,
) -> (StatusCode, Json<serde_json::Value>) {
    let kind = map_http_status_to_anthropic_error_type(status);
    let text: String = message.into();
    let payload = json!({
        "error": { "message": text, "type": kind, "code": serde_json::Value::Null }
    });
    (status, Json(payload))
}
/// Converts a non-streaming Anthropic Messages response body into an OpenAI Chat
/// Completions response body, via `anthropic_response_to_openai_response`.
fn transform_anthropic_response_to_openai_completion(body: &Bytes) -> Result<Bytes, String> {
    let request = TransformRequest {
        path: "/v1/messages".to_string(),
        headers: HashMap::new(),
        body: body.clone(),
    };
    match anthropic_response_to_openai_response(&request) {
        Ok(transformed) => Ok(transformed.body),
        Err(e) => Err(format!(
            "failed to transform upstream Anthropic response body: {e}"
        )),
    }
}
/// Converts a non-streaming Anthropic Messages response body into an OpenAI Responses API
/// body, via `anthropic_response_to_responses_response`.
fn transform_anthropic_response_to_openai_responses(body: &Bytes) -> Result<Bytes, String> {
    let request = TransformRequest {
        path: "/v1/messages".to_string(),
        headers: HashMap::new(),
        body: body.clone(),
    };
    match anthropic_response_to_responses_response(&request) {
        Ok(transformed) => Ok(transformed.body),
        Err(e) => Err(format!(
            "failed to transform upstream Anthropic response body: {e}"
        )),
    }
}
/// Converts a non-streaming OpenAI Chat Completions response body into an Anthropic
/// Messages response body, via `openai_response_to_anthropic_message`.
fn transform_openai_response_to_anthropic_message(body: &Bytes) -> Result<Bytes, String> {
    let request = TransformRequest {
        path: "/v1/chat/completions".to_string(),
        headers: HashMap::new(),
        body: body.clone(),
    };
    match openai_response_to_anthropic_message(&request) {
        Ok(transformed) => Ok(transformed.body),
        Err(e) => Err(format!(
            "failed to transform upstream OpenAI response body: {e}"
        )),
    }
}
#[allow(clippy::too_many_lines)]
fn transform_anthropic_message_to_sse(body: &Bytes) -> Result<Bytes, String> {
let response: serde_json::Value = serde_json::from_slice(body)
.map_err(|e| format!("failed to parse Anthropic message response body: {e}"))?;
let role = response
.get("role")
.and_then(serde_json::Value::as_str)
.unwrap_or("assistant")
.to_string();
let input_tokens = response
.get("usage")
.and_then(|usage| usage.get("input_tokens"))
.and_then(serde_json::Value::as_u64)
.unwrap_or_default();
let output_tokens = response
.get("usage")
.and_then(|usage| usage.get("output_tokens"))
.and_then(serde_json::Value::as_u64)
.unwrap_or_default();
let message_id = response
.get("id")
.and_then(serde_json::Value::as_str)
.unwrap_or("msg_llm_bridge")
.to_string();
let model = response
.get("model")
.and_then(serde_json::Value::as_str)
.unwrap_or("llm-bridge")
.to_string();
let stop_reason = response
.get("stop_reason")
.and_then(serde_json::Value::as_str)
.and_then(|s| match s {
"end_turn" => Some(llm_bridge_core::model::StopReason::EndTurn),
"max_tokens" => Some(llm_bridge_core::model::StopReason::MaxTokens),
"tool_use" => Some(llm_bridge_core::model::StopReason::ToolUse),
"content_filter" => Some(llm_bridge_core::model::StopReason::ContentFilter),
_ => None,
});
let mut events = Vec::new();
events.push(StreamEvent::MessageStart {
role,
message_id,
model,
usage: llm_bridge_core::model::Usage {
input_tokens,
output_tokens: 0,
},
});
if let Some(content_blocks) = response
.get("content")
.and_then(serde_json::Value::as_array)
{
for (index, block) in content_blocks.iter().enumerate() {
let block_type = block
.get("type")
.and_then(serde_json::Value::as_str)
.unwrap_or("");
match block_type {
"thinking" => {
let thinking = block
.get("thinking")
.and_then(serde_json::Value::as_str)
.unwrap_or("");
let signature = block
.get("signature")
.and_then(serde_json::Value::as_str)
.unwrap_or("");
events.push(StreamEvent::ContentBlockStart {
index,
content_block: llm_bridge_core::model::ContentBlock::Thinking {
text: String::new(),
usage: None,
},
});
if !thinking.is_empty() {
events.push(StreamEvent::ContentBlockDelta {
index,
delta: StreamDelta::Thinking {
thinking: thinking.to_string(),
},
});
}
if !signature.is_empty() {
events.push(StreamEvent::ContentBlockDelta {
index,
delta: StreamDelta::Signature {
signature: signature.to_string(),
},
});
}
events.push(StreamEvent::ContentBlockStop { index });
}
"text" => {
let text = block
.get("text")
.and_then(serde_json::Value::as_str)
.unwrap_or("");
events.push(StreamEvent::ContentBlockStart {
index,
content_block: llm_bridge_core::model::ContentBlock::Text {
text: String::new(),
},
});
if !text.is_empty() {
events.push(StreamEvent::ContentBlockDelta {
index,
delta: StreamDelta::Text {
text: text.to_string(),
},
});
}
events.push(StreamEvent::ContentBlockStop { index });
}
"tool_use" => {
let id = block
.get("id")
.and_then(serde_json::Value::as_str)
.unwrap_or("")
.to_string();
let name = block
.get("name")
.and_then(serde_json::Value::as_str)
.unwrap_or("")
.to_string();
let input = block
.get("input")
.cloned()
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
events.push(StreamEvent::ContentBlockStart {
index,
content_block: llm_bridge_core::model::ContentBlock::ToolUse {
id,
name,
input,
},
});
events.push(StreamEvent::ContentBlockStop { index });
}
_ => {}
}
}
}
events.push(StreamEvent::MessageDelta {
stop_reason,
stop_sequence: None,
usage: llm_bridge_core::model::Usage {
input_tokens,
output_tokens,
},
});
events.push(StreamEvent::MessageStop);
Ok(Bytes::from(events_to_sse(&events)))
}
/// Re-encodes a complete (non-streaming) OpenAI Chat Completions response as a
/// `chat.completion.chunk` SSE stream, terminated by `data: [DONE]`.
///
/// Only `choices[0]` is used. The stream contains, in order:
/// * a role chunk;
/// * a `reasoning_content` chunk and a `content` chunk (each only if non-empty);
/// * one chunk per tool call;
/// * a final chunk with `finish_reason` and, if present, the upstream `usage`.
///
/// Streaming tool-call deltas are keyed by `index`, which non-streaming tool calls do not
/// carry. Each tool call therefore gets its array position as `index` unless the upstream
/// already supplied one.
///
/// # Errors
/// Returns an error string in these cases:
/// * the body is not JSON;
/// * `choices[0]` is missing;
/// * `choices[0].message` is missing or not an object;
/// * a chunk cannot be serialized.
#[allow(clippy::too_many_lines)]
fn transform_openai_completion_to_sse(body: &Bytes) -> Result<Bytes, String> {
    let response: serde_json::Value = serde_json::from_slice(body)
        .map_err(|e| format!("failed to parse OpenAI completion response body: {e}"))?;
    let id = response
        .get("id")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("chatcmpl_llm_bridge");
    let model = response
        .get("model")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("llm-bridge");
    let choice = response
        .get("choices")
        .and_then(serde_json::Value::as_array)
        .and_then(|choices| choices.first())
        .ok_or_else(|| "missing choices[0] in OpenAI completion response".to_string())?;
    let message = choice
        .get("message")
        .and_then(serde_json::Value::as_object)
        .ok_or_else(|| "missing choices[0].message in OpenAI completion response".to_string())?;
    let finish_reason = choice
        .get("finish_reason")
        .and_then(serde_json::Value::as_str);
    let usage = response.get("usage").cloned();

    // Every intermediate chunk shares this envelope; only the delta of choice 0 varies.
    let chunk = |delta: serde_json::Value| {
        json!({
            "id": id,
            "object": "chat.completion.chunk",
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
            }],
        })
    };

    let role = message
        .get("role")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("assistant");
    let mut chunks = vec![chunk(json!({ "role": role }))];
    if let Some(reasoning_content) = message
        .get("reasoning_content")
        .and_then(serde_json::Value::as_str)
        .filter(|value| !value.is_empty())
    {
        chunks.push(chunk(json!({ "reasoning_content": reasoning_content })));
    }
    if let Some(content) = message
        .get("content")
        .and_then(serde_json::Value::as_str)
        .filter(|value| !value.is_empty())
    {
        chunks.push(chunk(json!({ "content": content })));
    }
    if let Some(tool_calls) = message
        .get("tool_calls")
        .and_then(serde_json::Value::as_array)
    {
        for (position, tool_call) in tool_calls.iter().enumerate() {
            let mut tool_call = tool_call.clone();
            if let Some(fields) = tool_call.as_object_mut() {
                // Keep an upstream-provided index; otherwise use the array position.
                fields.entry("index").or_insert(json!(position));
            }
            chunks.push(chunk(json!({ "tool_calls": [tool_call] })));
        }
    }

    let mut final_chunk = json!({
        "id": id,
        "object": "chat.completion.chunk",
        "model": model,
        "choices": [{
            "index": 0,
            "delta": {},
            "finish_reason": finish_reason,
        }],
    });
    if let Some(usage) = usage {
        if let Some(object) = final_chunk.as_object_mut() {
            object.insert("usage".to_string(), usage);
        }
    }
    chunks.push(final_chunk);

    let mut out = Vec::with_capacity(1024);
    for chunk in chunks {
        let serialized = serde_json::to_vec(&chunk)
            .map_err(|e| format!("failed to serialize OpenAI SSE chunk: {e}"))?;
        out.extend_from_slice(b"data: ");
        out.extend_from_slice(&serialized);
        out.extend_from_slice(b"\n\n");
    }
    out.extend_from_slice(b"data: [DONE]\n\n");
    Ok(Bytes::from(out))
}
/// Re-encodes a complete (non-streaming) OpenAI Responses API body as a Responses SSE
/// stream, terminated by `data: [DONE]`.
///
/// The stream opens with `response.created` and `response.in_progress`, each carrying an
/// empty in-progress snapshot. Each output item then produces:
/// * `message` items: `response.output_item.added`, then for **every** content part
///   `response.content_part.added`, one text delta, the matching `.done` event and
///   `response.content_part.done`, then `response.output_item.done`. Parts of type
///   `output_text` use the `response.output_text.*` events; any other part type is
///   reported through `response.reasoning_text.*`. A message with no content parts
///   still gets its added/done item events.
/// * `function_call` items: `output_item.added`, one arguments delta, the arguments
///   `.done` event and `output_item.done`.
///
/// Other item types are not streamed; they still appear in the final response. The stream
/// ends with `response.completed` (or `response.incomplete` when `status == "incomplete"`),
/// which carries the full original body. Sequence numbers start at 0 and have no gaps.
///
/// # Errors
/// Returns an error string if the body is not JSON, has no `output` array, or an event
/// cannot be serialized.
#[allow(clippy::too_many_lines)]
fn transform_openai_responses_to_sse(body: &Bytes) -> Result<Bytes, String> {
    let response: serde_json::Value = serde_json::from_slice(body)
        .map_err(|e| format!("failed to parse OpenAI Responses body: {e}"))?;
    let response_id = response
        .get("id")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("resp_llm_bridge");
    let created_at = response
        .get("created_at")
        .and_then(serde_json::Value::as_u64)
        .unwrap_or_default();
    let model = response
        .get("model")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("llm-bridge");
    let status = response
        .get("status")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("completed");
    let output = response
        .get("output")
        .and_then(serde_json::Value::as_array)
        .ok_or_else(|| "missing output in OpenAI Responses body".to_string())?;

    // Hands out consecutive sequence numbers; only emitted events consume one.
    let mut sequence = 0_u64;
    let mut next_seq = || {
        let current = sequence;
        sequence = sequence.saturating_add(1);
        current
    };

    let in_progress_snapshot = json!({
        "id": response_id,
        "object": "response",
        "created_at": created_at,
        "status": "in_progress",
        "model": model,
        "output": [],
        "output_text": "",
        "usage": response.get("usage").cloned().unwrap_or_else(|| json!({})),
    });
    let mut events = vec![
        json!({
            "type": "response.created",
            "sequence_number": next_seq(),
            "response": in_progress_snapshot,
        }),
        json!({
            "type": "response.in_progress",
            "sequence_number": next_seq(),
            "response": in_progress_snapshot,
        }),
    ];

    for (output_index, item) in output.iter().enumerate() {
        let item_type = item
            .get("type")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default();
        match item_type {
            "message" => {
                let item_id = item
                    .get("id")
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or("msg_llm_bridge");
                let parts: &[serde_json::Value] = item
                    .get("content")
                    .and_then(serde_json::Value::as_array)
                    .map(Vec::as_slice)
                    .unwrap_or_default();
                events.push(json!({
                    "type": "response.output_item.added",
                    "sequence_number": next_seq(),
                    "output_index": output_index,
                    "item": {
                        "id": item_id,
                        "type": "message",
                        "role": "assistant",
                        "status": "in_progress",
                        "content": [],
                    },
                }));
                for (content_index, part) in parts.iter().enumerate() {
                    let is_output_text =
                        part.get("type").and_then(serde_json::Value::as_str) == Some("output_text");
                    let text = part
                        .get("text")
                        .and_then(serde_json::Value::as_str)
                        .unwrap_or("");
                    let (empty_part, delta_type, done_type) = if is_output_text {
                        (
                            json!({ "type": "output_text", "text": "", "annotations": [] }),
                            "response.output_text.delta",
                            "response.output_text.done",
                        )
                    } else {
                        (
                            json!({ "type": "reasoning_text", "text": "" }),
                            "response.reasoning_text.delta",
                            "response.reasoning_text.done",
                        )
                    };
                    events.push(json!({
                        "type": "response.content_part.added",
                        "sequence_number": next_seq(),
                        "output_index": output_index,
                        "item_id": item_id,
                        "content_index": content_index,
                        "part": empty_part,
                    }));
                    events.push(json!({
                        "type": delta_type,
                        "sequence_number": next_seq(),
                        "output_index": output_index,
                        "item_id": item_id,
                        "content_index": content_index,
                        "delta": text,
                    }));
                    events.push(json!({
                        "type": done_type,
                        "sequence_number": next_seq(),
                        "output_index": output_index,
                        "item_id": item_id,
                        "content_index": content_index,
                        "text": text,
                    }));
                    events.push(json!({
                        "type": "response.content_part.done",
                        "sequence_number": next_seq(),
                        "output_index": output_index,
                        "item_id": item_id,
                        "content_index": content_index,
                        "part": part,
                    }));
                }
                events.push(json!({
                    "type": "response.output_item.done",
                    "sequence_number": next_seq(),
                    "output_index": output_index,
                    "item": item,
                }));
            }
            "function_call" => {
                let item_id = item
                    .get("id")
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or("fc_llm_bridge");
                let name = item
                    .get("name")
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or("");
                let arguments = item
                    .get("arguments")
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or("");
                events.push(json!({
                    "type": "response.output_item.added",
                    "sequence_number": next_seq(),
                    "output_index": output_index,
                    "item": {
                        "id": item_id,
                        "type": "function_call",
                        "call_id": item.get("call_id").cloned().unwrap_or(serde_json::Value::Null),
                        "name": name,
                        "arguments": "",
                        "status": "in_progress",
                    },
                }));
                events.push(json!({
                    "type": "response.function_call_arguments.delta",
                    "sequence_number": next_seq(),
                    "output_index": output_index,
                    "item_id": item_id,
                    "delta": arguments,
                }));
                events.push(json!({
                    "type": "response.function_call_arguments.done",
                    "sequence_number": next_seq(),
                    "output_index": output_index,
                    "item_id": item_id,
                    "name": name,
                    "arguments": arguments,
                }));
                events.push(json!({
                    "type": "response.output_item.done",
                    "sequence_number": next_seq(),
                    "output_index": output_index,
                    "item": item,
                }));
            }
            // Not streamed; the item still appears in the final `response` payload.
            _ => {}
        }
    }

    let terminal_type = if status == "incomplete" {
        "response.incomplete"
    } else {
        "response.completed"
    };
    events.push(json!({
        "type": terminal_type,
        "sequence_number": next_seq(),
        "response": response,
    }));

    let mut out = Vec::with_capacity(1024);
    for event in events {
        let serialized = serde_json::to_vec(&event)
            .map_err(|e| format!("failed to serialize Responses SSE event: {e}"))?;
        out.extend_from_slice(b"data: ");
        out.extend_from_slice(&serialized);
        out.extend_from_slice(b"\n\n");
    }
    out.extend_from_slice(b"data: [DONE]\n\n");
    Ok(Bytes::from(out))
}
/// Returns `true` only when `body` is JSON whose top-level `stream` field is literally
/// `true`.
///
/// Unparseable bodies and non-boolean values count as non-streaming.
fn is_streaming_request(body: &Bytes) -> bool {
    let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(body) else {
        return false;
    };
    matches!(parsed.get("stream"), Some(serde_json::Value::Bool(true)))
}
/// Extracts the top-level `model` string from a JSON request body.
///
/// Returns `None` if the body is not JSON or `model` is absent or not a string.
fn requested_model(body: &Bytes) -> Option<String> {
    let parsed: serde_json::Value = serde_json::from_slice(body).ok()?;
    match parsed.get("model")? {
        serde_json::Value::String(model) => Some(model.clone()),
        _ => None,
    }
}
/// Test helper: whether the JSON request body has a top-level `thinking` key.
///
/// The key counts even when its value is `null`.
#[cfg(test)]
fn anthropic_request_has_thinking(body: &Bytes) -> bool {
    serde_json::from_slice::<serde_json::Value>(body)
        .is_ok_and(|parsed| parsed.get("thinking").is_some())
}
/// Heuristically detects an Alibaba DashScope upstream.
///
/// Checks whether the URL's host ends with `dashscope.aliyuncs.com`. If the URL cannot be
/// parsed or has no host, it falls back to a substring check on the raw string.
fn is_dashscope_upstream(upstream_url: &str) -> bool {
    const DASHSCOPE_HOST_SUFFIX: &str = "dashscope.aliyuncs.com";
    match reqwest::Url::parse(upstream_url) {
        Ok(parsed) => match parsed.host_str() {
            Some(host) => host.ends_with(DASHSCOPE_HOST_SUFFIX),
            None => upstream_url.contains(DASHSCOPE_HOST_SUFFIX),
        },
        Err(_) => upstream_url.contains(DASHSCOPE_HOST_SUFFIX),
    }
}
/// Detects the first-party Anthropic API.
///
/// The URL's host must be exactly `api.anthropic.com`. If the URL cannot be parsed or has
/// no host, it falls back to a substring check on the raw string.
fn is_anthropic_upstream(upstream_url: &str) -> bool {
    const ANTHROPIC_HOST: &str = "api.anthropic.com";
    match reqwest::Url::parse(upstream_url) {
        Ok(parsed) => match parsed.host_str() {
            Some(host) => host == ANTHROPIC_HOST,
            None => upstream_url.contains(ANTHROPIC_HOST),
        },
        Err(_) => upstream_url.contains(ANTHROPIC_HOST),
    }
}
/// For DashScope upstreams, adds `"enable_thinking": false` to a JSON-object request body
/// that does not already set `enable_thinking`.
///
/// In every other case (non-DashScope upstream, non-JSON or non-object body, key already
/// present, re-serialization failure) the body is returned unchanged.
fn maybe_disable_dashscope_thinking(upstream_url: &str, openai_body: &Bytes) -> Bytes {
    if !is_dashscope_upstream(upstream_url) {
        return openai_body.clone();
    }
    // `None` means "forward the original body untouched".
    let rewritten = || -> Option<Bytes> {
        let mut parsed: serde_json::Value = serde_json::from_slice(openai_body).ok()?;
        let fields = parsed.as_object_mut()?;
        if fields.contains_key("enable_thinking") {
            return None;
        }
        fields.insert(
            "enable_thinking".to_string(),
            serde_json::Value::Bool(false),
        );
        serde_json::to_vec(&parsed).ok().map(Bytes::from)
    };
    rewritten().unwrap_or_else(|| openai_body.clone())
}
/// Whether raw SSE debug logging is enabled.
///
/// This is the case when `DEBUG_ANTHROPIC_SSE` is one of `1`, `true`, `TRUE`, `yes`, `YES`.
/// The variable is re-read on every call.
fn should_log_raw_anthropic_sse() -> bool {
    const TRUTHY: [&str; 5] = ["1", "true", "TRUE", "yes", "YES"];
    match env::var("DEBUG_ANTHROPIC_SSE") {
        Ok(flag) => TRUTHY.contains(&flag.as_str()),
        Err(_) => false,
    }
}
/// Logs an outgoing (downstream) Anthropic SSE chunk verbatim at info level.
///
/// Only does anything when raw SSE debugging is enabled; bytes are decoded lossily as UTF-8.
fn maybe_log_raw_anthropic_sse_chunk(label: &str, bytes: &[u8]) {
    if should_log_raw_anthropic_sse() {
        let text = String::from_utf8_lossy(bytes);
        info!(label, raw_sse = %text, "← downstream anthropic SSE raw");
    }
}
/// Logs an incoming (upstream) SSE chunk verbatim at info level.
///
/// The log line includes the chunk size and how many bytes are still buffered
/// (`pending_len`). It is only emitted when raw SSE debugging is enabled.
fn maybe_log_raw_upstream_sse_chunk(
    proxy_request_id: u64,
    label: &str,
    bytes: &[u8],
    pending_len: usize,
) {
    if should_log_raw_anthropic_sse() {
        let text = String::from_utf8_lossy(bytes);
        let chunk_bytes = bytes.len();
        info!(
            proxy_request_id,
            label,
            chunk_bytes,
            pending_len,
            raw_upstream_sse = %text,
            "← upstream raw SSE chunk"
        );
    }
}
/// Whether a `Content-Type` value denotes Server-Sent Events.
///
/// This is a case-insensitive substring match, so parameters such as `; charset=utf-8`
/// are tolerated.
fn is_event_stream_content_type(content_type: Option<&str>) -> bool {
    match content_type {
        Some(value) => value.to_ascii_lowercase().contains("text/event-stream"),
        None => false,
    }
}
/// Finds the end of the last complete SSE frame in `buffer`.
///
/// A frame ends with a blank line, written as `\n\n` or `\r\n\r\n`. Returns the index just
/// past the **last** such terminator, or `None` if the buffer holds no complete frame.
///
/// Both terminator styles are searched and the larger end offset wins. Previously the
/// CRLF result always overrode the LF one, so a buffer containing an earlier `\r\n\r\n`
/// and a later `\n\n` was cut at the earlier boundary.
fn find_last_sse_frame_boundary(buffer: &[u8]) -> Option<usize> {
    let lf_end = buffer
        .windows(2)
        .rposition(|window| window == b"\n\n")
        .map(|start| start + 2);
    let crlf_end = buffer
        .windows(4)
        .rposition(|window| window == b"\r\n\r\n")
        .map(|start| start + 4);
    // `None < Some(_)`, so `max` picks whichever boundary exists and ends later.
    lf_end.max(crlf_end)
}
/// Removes and returns every complete SSE frame at the front of `buffer`.
///
/// Any trailing partial frame stays in `buffer` for the next read. Returns `None`, leaving
/// `buffer` untouched, if no complete frame is buffered yet.
fn take_complete_sse_frames(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
    find_last_sse_frame_boundary(buffer).map(|frame_end| buffer.drain(..frame_end).collect())
}
/// Collects the values of all `event:` lines in an SSE payload, in order.
///
/// Surrounding whitespace is trimmed. Bytes are decoded lossily as UTF-8.
fn extract_sse_event_types(bytes: &[u8]) -> Vec<String> {
    let text = String::from_utf8_lossy(bytes);
    let mut event_types = Vec::new();
    for line in text.lines() {
        if let Some(rest) = line.trim().strip_prefix("event:") {
            event_types.push(rest.trim().to_string());
        }
    }
    event_types
}
/// Rewrites an upstream error body as an Anthropic-style error object.
///
/// The message is taken from `error.message`, else from `error` if it is a string, else
/// from the raw body (lossy UTF-8). The error type is derived from `status`. If
/// serialization fails, the original body is returned.
fn transform_upstream_error_body_to_anthropic(body: &Bytes, status: StatusCode) -> Bytes {
    let raw_text = || String::from_utf8_lossy(body).into_owned();
    let message = match serde_json::from_slice::<serde_json::Value>(body) {
        Ok(parsed) => {
            let error_field = parsed.get("error");
            error_field
                .and_then(|error| error.get("message"))
                .or(error_field)
                .and_then(serde_json::Value::as_str)
                .map_or_else(raw_text, str::to_string)
        }
        Err(_) => raw_text(),
    };
    let payload = json!({
        "type": "error",
        "error": {
            "type": map_http_status_to_anthropic_error_type(status),
            "message": message,
        },
    });
    match serde_json::to_vec(&payload) {
        Ok(encoded) => Bytes::from(encoded),
        Err(_) => body.clone(),
    }
}
/// Rewrites an upstream error body as an OpenAI-style error object
/// (`{"error":{"message","type","code":null}}`).
///
/// The message is taken from `error.message`, else from `error` if it is a string, else
/// from the raw body (lossy UTF-8). If serialization fails, the original body is returned.
fn transform_upstream_error_body_to_openai(body: &Bytes, status: StatusCode) -> Bytes {
    let raw_text = || String::from_utf8_lossy(body).into_owned();
    let message = match serde_json::from_slice::<serde_json::Value>(body) {
        Ok(parsed) => {
            let error_field = parsed.get("error");
            error_field
                .and_then(|error| error.get("message"))
                .or(error_field)
                .and_then(serde_json::Value::as_str)
                .map_or_else(raw_text, str::to_string)
        }
        Err(_) => raw_text(),
    };
    let payload = json!({
        "error": {
            "message": message,
            "type": map_http_status_to_anthropic_error_type(status),
            "code": serde_json::Value::Null,
        }
    });
    match serde_json::to_vec(&payload) {
        Ok(encoded) => Bytes::from(encoded),
        Err(_) => body.clone(),
    }
}
fn build_anthropic_stream_error_event(
message: String,
message_started: bool,
error_type: &str,
) -> Bytes {
let mut events = vec![StreamEvent::Error {
error_type: error_type.to_string(),
message,
}];
if message_started {
events.push(StreamEvent::MessageStop);
}
Bytes::from(events_to_sse(&events))
}
/// Encodes an OpenAI Chat Completions stream error as a `data:` frame, followed by
/// `data: [DONE]`.
///
/// If serialization fails, only the `[DONE]` frame is emitted.
#[allow(clippy::needless_pass_by_value)]
fn build_openai_stream_error_event(message: String, error_type: &str) -> Bytes {
    const DONE_FRAME: &str = "data: [DONE]\n\n";
    let payload = json!({
        "error": {
            "message": message,
            "type": error_type,
            "code": serde_json::Value::Null,
        }
    });
    serde_json::to_string(&payload).map_or_else(
        |_| Bytes::from_static(DONE_FRAME.as_bytes()),
        |serialized| Bytes::from(format!("data: {serialized}\n\n{DONE_FRAME}")),
    )
}
/// Encodes an OpenAI Responses stream `error` event (sequence number 0) as a `data:` frame,
/// followed by `data: [DONE]`.
///
/// If serialization fails, only the `[DONE]` frame is emitted.
#[allow(clippy::needless_pass_by_value)]
fn build_openai_responses_stream_error_event(message: String, error_type: &str) -> Bytes {
    const DONE_FRAME: &str = "data: [DONE]\n\n";
    let payload = json!({
        "type": "error",
        "sequence_number": 0,
        "code": error_type,
        "message": message,
        "param": serde_json::Value::Null,
    });
    serde_json::to_string(&payload).map_or_else(
        |_| Bytes::from_static(DONE_FRAME.as_bytes()),
        |serialized| Bytes::from(format!("data: {serialized}\n\n{DONE_FRAME}")),
    )
}
/// Whether a header received from the client may be forwarded upstream.
///
/// Blocks hop-by-hop headers, `host`, `content-length`, `accept-encoding` and all
/// credential headers. Matching is ASCII case-insensitive.
fn should_forward_client_header(name: &str) -> bool {
    const BLOCKED: &[&str] = &[
        "host",
        "content-length",
        "connection",
        "proxy-connection",
        "keep-alive",
        "transfer-encoding",
        "te",
        "trailer",
        "upgrade",
        "accept-encoding",
        "x-api-key",
        "authorization",
        "proxy-authorization",
    ];
    let lowered = name.to_ascii_lowercase();
    !BLOCKED.contains(&lowered.as_str())
}
/// Whether a header produced by a request transform may be forwarded upstream.
///
/// Blocks hop-by-hop headers, `content-length` and `authorization`/`proxy-authorization`.
/// Unlike `should_forward_client_header`, it does not block `host`, `accept-encoding` or
/// `x-api-key`. Matching is ASCII case-insensitive.
fn should_forward_transformed_header(name: &str) -> bool {
    const BLOCKED: &[&str] = &[
        "content-length",
        "connection",
        "proxy-connection",
        "keep-alive",
        "transfer-encoding",
        "te",
        "trailer",
        "upgrade",
        "authorization",
        "proxy-authorization",
    ];
    let lowered = name.to_ascii_lowercase();
    !BLOCKED.contains(&lowered.as_str())
}
fn build_upstream_headers(
client_headers: &HashMap<String, String>,
transformed_headers: &HashMap<String, String>,
upstream_api_key: &str,
request_is_streaming: bool,
) -> HashMap<String, String> {
let mut final_headers: HashMap<String, String> = HashMap::new();
for (name, value) in client_headers {
if should_forward_client_header(name) {
final_headers.insert(name.clone(), value.clone());
}
}
for (name, value) in transformed_headers {
if should_forward_transformed_header(name) {
final_headers.insert(name.clone(), value.clone());
}
}
final_headers.insert(
"Authorization".to_string(),
format!("Bearer {upstream_api_key}"),
);
if request_is_streaming {
final_headers.insert("accept".to_string(), "text/event-stream".to_string());
}
final_headers
}
fn build_anthropic_upstream_headers(
client_headers: &HashMap<String, String>,
transformed_headers: &HashMap<String, String>,
upstream_api_key: &str,
request_is_streaming: bool,
) -> HashMap<String, String> {
let mut final_headers: HashMap<String, String> = HashMap::new();
for (name, value) in client_headers {
if should_forward_client_header(name) {
final_headers.insert(name.clone(), value.clone());
}
}
for (name, value) in transformed_headers {
if should_forward_transformed_header(name) {
final_headers.insert(name.clone(), value.clone());
}
}
final_headers.remove("authorization");
final_headers.remove("Authorization");
final_headers.insert("x-api-key".to_string(), upstream_api_key.to_string());
if request_is_streaming {
final_headers.insert("accept".to_string(), "text/event-stream".to_string());
}
final_headers
}
/// Router state shared between handlers. It is a `tokio` mutex because it is locked with
/// `.lock().await` inside async functions.
type SharedRouter = Arc<tokio::sync::Mutex<UpstreamRouter>>;
/// Proxy-wide configuration, passed to handlers as axum `State`.
#[derive(Clone)]
struct ProxyConfig {
    /// Upstream base URL used when `router` is `None`.
    upstream_url: String,
    /// Upstream credential used when `router` is `None`.
    upstream_api_key: String,
    /// Key clients must present (see `check_auth`); `None` disables client authentication.
    proxy_api_key: Option<String>,
    /// Primary/backup routing state; `None` means `upstream_url` is always used.
    router: Option<SharedRouter>,
}
/// Anthropic-compatible endpoint of the primary (DashScope) upstream.
/// NOTE(review): presumably unreferenced at the moment, hence `allow(dead_code)`.
#[allow(dead_code)]
const PRIMARY_ANTHROPIC: &str = "https://coding.dashscope.aliyuncs.com/apps/anthropic";
/// OpenAI-compatible base URL of the primary upstream (used by `ProxyConfig::from_env`).
const PRIMARY_OPENAI: &str = "https://coding.dashscope.aliyuncs.com/v1";
/// Anthropic-compatible endpoint of the backup upstream.
/// NOTE(review): presumably unreferenced at the moment, hence `allow(dead_code)`.
#[allow(dead_code)]
const BACKUP_ANTHROPIC: &str = "https://token-plan.cn-beijing.maas.aliyuncs.com/apps/anthropic";
/// OpenAI-compatible base URL of the backup upstream (used by `ProxyConfig::from_env`).
const BACKUP_OPENAI: &str = "https://token-plan.cn-beijing.maas.aliyuncs.com/compatible-mode/v1";
impl ProxyConfig {
fn from_env() -> Self {
let primary_api_key = env::var("PRIMARY_API_KEY")
.or_else(|_| env::var("UPSTREAM_API_KEY"))
.expect("PRIMARY_API_KEY (or UPSTREAM_API_KEY) must be set");
let backup_api_key = env::var("BACKUP_API_KEY").expect("BACKUP_API_KEY must be set");
let proxy_api_key = env::var("PROXY_API_KEY").ok();
let primary_target = UpstreamTarget {
name: "primary".to_string(),
url: PRIMARY_OPENAI.trim_end_matches('/').to_string(),
api_key: primary_api_key.clone(),
};
let backup_target = UpstreamTarget {
name: "backup".to_string(),
url: BACKUP_OPENAI.trim_end_matches('/').to_string(),
api_key: backup_api_key,
};
info!(
primary_url = &primary_target.url,
backup_url = &backup_target.url,
"starting http proxy with primary/backup routing"
);
let router = Some(Arc::new(tokio::sync::Mutex::new(UpstreamRouter::new(
primary_target,
Some(backup_target),
))));
Self {
upstream_url: PRIMARY_OPENAI.trim_end_matches('/').to_string(),
upstream_api_key: primary_api_key,
proxy_api_key,
router,
}
}
async fn active_upstream(&self) -> UpstreamTarget {
if let Some(ref router) = self.router {
let guard = router.lock().await;
guard.active_target().clone()
} else {
UpstreamTarget {
name: "primary".to_string(),
url: self.upstream_url.clone(),
api_key: self.upstream_api_key.clone(),
}
}
}
async fn record_upstream_status(&self, status: StatusCode) {
if let Some(ref router) = self.router {
let mut guard = router.lock().await;
guard.record_response_status(status);
}
}
}
/// Validates the caller's credential against the optional `PROXY_API_KEY`.
///
/// Returns `None` when the request is authorised (or no proxy key is configured), and
/// `Some(StatusCode::UNAUTHORIZED)` otherwise.
///
/// The key is read from `x-api-key`, or from `authorization` when `x-api-key` is absent.
/// It may be given raw or as a `Bearer` token; the scheme name is matched
/// case-insensitively, as RFC 7235 requires. Keys are compared in constant time so
/// response latency does not reveal how much of a guessed key was correct.
fn check_auth(config: &ProxyConfig, headers: &HeaderMap) -> Option<StatusCode> {
    /// Byte-wise equality that does not short-circuit on the first mismatch (length is
    /// still revealed).
    fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
        a.len() == b.len() && a.iter().zip(b).fold(0_u8, |acc, (x, y)| acc | (x ^ y)) == 0
    }

    let Some(expected) = config.proxy_api_key.as_deref() else {
        return None;
    };
    let provided = headers
        .get("x-api-key")
        .or_else(|| headers.get("authorization"))
        .and_then(|v| v.to_str().ok());
    let Some(key) = provided else {
        return Some(StatusCode::UNAUTHORIZED);
    };
    // `get` (not slicing) so a multi-byte character at position 7 cannot panic.
    let bearer_token = key
        .get(..7)
        .filter(|scheme| scheme.eq_ignore_ascii_case("bearer "))
        .and_then(|_| key.get(7..));
    let authorised = constant_time_eq(key.as_bytes(), expected.as_bytes())
        || bearer_token
            .is_some_and(|token| constant_time_eq(token.as_bytes(), expected.as_bytes()));
    if authorised {
        None
    } else {
        Some(StatusCode::UNAUTHORIZED)
    }
}
#[allow(clippy::too_many_lines)]
async fn handle_anthropic_passthrough(
proxy_request_id: u64,
active: &UpstreamTarget,
headers: &HeaderMap,
body: Bytes,
config: &ProxyConfig,
) -> axum::response::Result<impl IntoResponse> {
let is_streaming = is_streaming_request(&body);
let upstream_url = if active.url.ends_with("/v1") {
format!("{}/v1/messages", active.url)
} else {
format!("{}/v1/messages", active.url.trim_end_matches('/'))
};
let mut final_headers = HeaderMap::new();
final_headers.insert("x-api-key", active.api_key.parse().unwrap());
final_headers.insert("anthropic-version", "2023-06-01".parse().unwrap());
final_headers.insert("content-type", "application/json".parse().unwrap());
if is_streaming {
final_headers.insert("accept", "text/event-stream".parse().unwrap());
}
for (name, value) in headers {
let name_lower = name.to_string().to_lowercase();
if matches!(
name_lower.as_str(),
"authorization"
| "x-api-key"
| "anthropic-version"
| "content-type"
| "accept"
| "host"
) {
continue;
}
final_headers.insert(name.clone(), value.clone());
}
info!(
proxy_request_id,
upstream_url, is_streaming, "⏳ sending Anthropic passthrough upstream"
);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(30))
.http1_only()
.build()
.expect("build reqwest client");
let mut upstream_req = client.post(&upstream_url);
for (k, v) in &final_headers {
upstream_req = upstream_req.header(k.clone(), v.clone());
}
upstream_req = upstream_req.body(body);
let resp = match upstream_req.send().await {
Ok(r) => {
info!(
proxy_request_id,
status = r.status().as_u16(),
"✅ Anthropic upstream responded"
);
r
}
Err(e) => {
error!(proxy_request_id, error = %e, upstream_url, "Anthropic upstream request failed");
let status = if e.is_timeout() {
StatusCode::GATEWAY_TIMEOUT
} else {
StatusCode::BAD_GATEWAY
};
let message = if e.is_timeout() {
format!("upstream timeout: {e}")
} else {
format!("upstream failed: {e}")
};
let (status, err_body) = build_anthropic_error_response(status, message);
return Ok((status, err_body).into_response());
}
};
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
let resp_content_type = resp
.headers()
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(ToString::to_string);
config.record_upstream_status(status).await;
if status.is_success() && is_event_stream_content_type(resp_content_type.as_deref()) {
info!(proxy_request_id, "↔ forwarding Anthropic SSE stream as-is");
let passthrough_stream = stream::unfold(
(proxy_request_id, resp.bytes_stream(), false),
|(proxy_request_id, mut upstream_stream, finished)| async move {
if finished {
info!(
proxy_request_id,
"✓ Anthropic passthrough SSE stream closed"
);
return None;
}
match upstream_stream.next().await {
Some(Ok(chunk)) => Some((
Ok::<Bytes, Infallible>(chunk),
(proxy_request_id, upstream_stream, false),
)),
Some(Err(e)) => {
error!(proxy_request_id, error = %e, "failed to read upstream SSE chunk");
let err = serde_json::to_string(&json!({
"type": "error",
"error": {
"type": "api_error",
"message": format!("upstream stream error: {e}"),
}
}))
.unwrap_or_default();
Some((
Ok(Bytes::from(format!("event: error\ndata: {err}\n\n"))),
(proxy_request_id, upstream_stream, true),
))
}
None => Some((Ok(Bytes::new()), (proxy_request_id, upstream_stream, true))),
}
},
);
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("text/event-stream"),
);
resp_headers.insert(
HeaderName::from_static("cache-control"),
HeaderValue::from_static("no-cache"),
);
return Ok((status, resp_headers, Body::from_stream(passthrough_stream)).into_response());
}
let resp_body = match resp.bytes().await {
Ok(b) => b,
Err(e) => {
error!(proxy_request_id, error = %e, "failed to read upstream response body");
let (s, b) = build_anthropic_error_response(
StatusCode::BAD_GATEWAY,
"failed to read upstream response",
);
return Ok((s, b).into_response());
}
};
let mut resp_headers = HeaderMap::new();
resp_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
Ok((status, resp_headers, resp_body).into_response())
}
#[allow(clippy::too_many_lines)]
async fn handle_anthropic_request(
State(config): State<ProxyConfig>,
headers: HeaderMap,
body: Bytes,
) -> axum::response::Result<impl IntoResponse> {
let proxy_request_id = NEXT_PROXY_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
info!(
proxy_request_id,
debug_anthropic_sse = should_log_raw_anthropic_sse(),
"→ anthropic SSE raw debug flag"
);
if let Some(status) = check_auth(&config, &headers) {
let (status, body) = build_anthropic_error_response(status, "invalid API key");
return Ok((status, body).into_response());
}
let active = config.active_upstream().await;
info!(
proxy_request_id,
upstream_name = active.name,
upstream_url = active.url,
"→ resolved active upstream"
);
if is_anthropic_upstream(&active.url) {
info!(
proxy_request_id,
"→ upstream is Anthropic-native — passthrough mode"
);
return Ok(handle_anthropic_passthrough(
proxy_request_id,
&active,
&headers,
body,
&config,
)
.await
.into_response());
}
let mut req_headers: HashMap<String, String> = HashMap::new();
for (name, value) in &headers {
if let Ok(val_str) = value.to_str() {
req_headers.insert(name.to_string(), val_str.to_string());
}
}
let req = TransformRequest {
headers: req_headers,
path: "/v1/chat/completions".to_string(),
body,
};
let transformed = match anthropic_to_openai(&req) {
Ok(t) => t,
Err(e) => {
error!(proxy_request_id, error = %e, "transform failed");
let (status, body) = build_anthropic_error_response(
StatusCode::BAD_REQUEST,
format!("transform error: {e}"),
);
return Ok((status, body).into_response());
}
};
let upstream_request_body = maybe_disable_dashscope_thinking(&active.url, &transformed.body);
if upstream_request_body != transformed.body {
info!(
proxy_request_id,
"→ inserted default enable_thinking=false for DashScope upstream"
);
}
let request_is_streaming = is_streaming_request(&upstream_request_body);
let requested_anthropic_model = requested_model(&req.body);
info!(
proxy_request_id,
request_is_streaming,
requested_anthropic_model = ?requested_anthropic_model,
"→ request streaming mode"
);
if let Ok(transformed_json) =
serde_json::from_slice::<serde_json::Value>(&upstream_request_body)
{
let token_estimate = estimate_tokens(&transformed_json);
info!(
proxy_request_id,
estimated_tokens = token_estimate,
"→ estimated request tokens"
);
debug!(
proxy_request_id,
model = %transformed_json.get("model").map_or("unknown", |v| v.as_str().unwrap_or_default()),
messages_count = transformed_json.get("messages").map_or(0, |v| v.as_array().map_or(0, Vec::len)),
"→ transformed body metadata"
);
}
info!(
proxy_request_id,
upstream_headers = ?redact_headers(&transformed.headers),
"→ upstream request headers"
);
let transformed_path = if active.url.ends_with("/v1") && transformed.path.starts_with("/v1/") {
transformed
.path
.strip_prefix("/v1")
.unwrap_or(&transformed.path)
} else {
&transformed.path
};
let upstream_url = format!("{}{}", active.url, transformed_path);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(30))
.http1_only()
.build()
.expect("build reqwest client");
let final_headers = build_upstream_headers(
&req.headers,
&transformed.headers,
&active.api_key,
request_is_streaming,
);
info!(
proxy_request_id,
request_is_streaming,
final_accept = ?final_headers.get("accept"),
"→ upstream accept selection"
);
info!(
proxy_request_id,
upstream_headers = ?redact_headers(&final_headers),
upstream_url,
"→ upstream request"
);
info!(
proxy_request_id,
upstream_url, "⏳ sending upstream request..."
);
let mut upstream_req = client.post(&upstream_url);
for (k, v) in &final_headers {
upstream_req = upstream_req.header(k.clone(), v.clone());
}
upstream_req = upstream_req.body(upstream_request_body);
let resp = match upstream_req.send().await {
Ok(r) => {
let response_headers = r.headers();
let response_content_type = response_headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(std::string::ToString::to_string);
let response_content_encoding = response_headers
.get(CONTENT_ENCODING)
.and_then(|value| value.to_str().ok())
.map(std::string::ToString::to_string);
info!(
proxy_request_id,
status = r.status().as_u16(),
response_content_type = ?response_content_type,
response_content_encoding = ?response_content_encoding,
"✅ upstream responded"
);
r
}
Err(e) => {
error!(proxy_request_id, error = %e, upstream_url, "upstream request failed");
let status = if e.is_timeout() {
StatusCode::GATEWAY_TIMEOUT
} else {
StatusCode::BAD_GATEWAY
};
let message = if e.is_timeout() {
format!("upstream request timed out: {e}")
} else {
format!("upstream request failed: {e}")
};
let (status, body) = build_anthropic_error_response(status, message);
return Ok((status, body).into_response());
}
};
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
let response_content_type = resp
.headers()
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(std::string::ToString::to_string);
config.record_upstream_status(status).await;
if status.is_success() && is_event_stream_content_type(response_content_type.as_deref()) {
info!(proxy_request_id, "↔ transforming upstream SSE stream");
let transformed_stream = stream::unfold(
(
proxy_request_id,
resp.bytes_stream(),
Vec::new(),
StreamState {
model_name: requested_anthropic_model.clone(),
..StreamState::default()
},
false,
),
|(proxy_request_id, mut upstream_stream, mut pending, mut stream_state, finished)| async move {
if finished {
info!(proxy_request_id, "✓ downstream anthropic SSE stream closed");
return None;
}
loop {
match upstream_stream.next().await {
Some(Ok(chunk)) => {
maybe_log_raw_upstream_sse_chunk(
proxy_request_id,
"← upstream raw SSE transport chunk",
&chunk,
pending.len(),
);
pending.extend_from_slice(&chunk);
if pending.len() > MAX_SSE_PENDING_BYTES {
error!(
proxy_request_id,
pending_len = pending.len(),
"upstream SSE buffer exceeded — terminating stream"
);
return Some((
Ok::<Bytes, Infallible>(Bytes::from(
serde_json::to_string(&json!({
"error": {
"message": "upstream SSE buffer limit exceeded",
"type": "buffer_limit_exceeded",
}
}))
.unwrap_or_default(),
)),
(
proxy_request_id,
upstream_stream,
pending,
stream_state,
true,
),
));
}
let Some(complete_frames) = take_complete_sse_frames(&mut pending)
else {
info!(
proxy_request_id,
pending_len = pending.len(),
"… waiting for complete upstream SSE frame boundary"
);
continue;
};
maybe_log_raw_upstream_sse_chunk(
proxy_request_id,
"← upstream raw SSE complete frames",
&complete_frames,
pending.len(),
);
match transform_stream(
&complete_frames,
ApiFormat::OpenaiChat,
&mut stream_state,
) {
Ok(transformed_bytes) => {
if transformed_bytes.is_empty() {
continue;
}
let event_types = extract_sse_event_types(&transformed_bytes);
info!(
proxy_request_id,
anthropic_event_types = ?event_types,
chunk_bytes = transformed_bytes.len(),
contains_message_stop = event_types
.iter()
.any(|event_type| event_type == "message_stop"),
"← downstream anthropic SSE chunk"
);
if event_types
.iter()
.any(|event_type| event_type == "message_start")
|| event_types
.iter()
.any(|event_type| event_type == "message_stop")
{
maybe_log_raw_anthropic_sse_chunk(
"← downstream anthropic SSE raw chunk",
&transformed_bytes,
);
}
return Some((
Ok::<Bytes, Infallible>(Bytes::from(transformed_bytes)),
(
proxy_request_id,
upstream_stream,
pending,
stream_state,
false,
),
));
}
Err(e) => {
error!(
proxy_request_id,
error = %e,
"failed to transform upstream SSE chunk"
);
let error_bytes = build_anthropic_stream_error_event(
format!("failed to transform upstream SSE chunk: {e}"),
stream_state.started,
"api_error",
);
return Some((
Ok(error_bytes),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
}
}
Some(Err(e)) => {
error!(proxy_request_id, error = %e, "failed to read upstream SSE chunk");
let error_bytes = build_anthropic_stream_error_event(
format!("failed to read upstream SSE stream: {e}"),
stream_state.started,
"api_error",
);
return Some((
Ok(error_bytes),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
None => {
if pending.is_empty() {
info!(
proxy_request_id,
"✓ upstream SSE exhausted with no pending bytes"
);
return None;
}
match transform_stream(
&pending,
ApiFormat::OpenaiChat,
&mut stream_state,
) {
Ok(transformed_bytes) if transformed_bytes.is_empty() => {
return None;
}
Ok(transformed_bytes) => {
let event_types = extract_sse_event_types(&transformed_bytes);
info!(
proxy_request_id,
anthropic_event_types = ?event_types,
chunk_bytes = transformed_bytes.len(),
contains_message_stop = event_types
.iter()
.any(|event_type| event_type == "message_stop"),
"← downstream anthropic SSE final chunk"
);
maybe_log_raw_anthropic_sse_chunk(
"← downstream anthropic SSE raw final chunk",
&transformed_bytes,
);
return Some((
Ok(Bytes::from(transformed_bytes)),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
Err(e) => {
error!(
proxy_request_id,
error = %e,
"failed to finalize upstream SSE stream"
);
let error_bytes = build_anthropic_stream_error_event(
format!("failed to finalize upstream SSE stream: {e}"),
stream_state.started,
"api_error",
);
return Some((
Ok(error_bytes),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
}
}
}
}
},
);
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("text/event-stream"),
);
resp_headers.insert(
HeaderName::from_static("cache-control"),
HeaderValue::from_static("no-cache"),
);
return Ok((status, resp_headers, Body::from_stream(transformed_stream)).into_response());
}
if request_is_streaming {
error!(
status = status.as_u16(),
response_content_type = ?response_content_type,
"upstream returned non-SSE success response for streaming request"
);
}
info!("⏳ reading upstream body...");
let body_bytes = match resp.bytes().await {
Ok(b) => b,
Err(e) => {
error!(error = %e, "failed to read upstream body");
let (status, body) = build_anthropic_error_response(
StatusCode::BAD_GATEWAY,
format!("failed to read upstream body: {e}"),
);
return Ok((status, body).into_response());
}
};
if !status.is_success() {
error!(
status = status.as_u16(),
upstream_error_body_bytes = body_bytes.len(),
upstream_error_body = %format_upstream_error_body_for_log(&body_bytes),
"← upstream error response body"
);
}
if let Ok(resp_json) = serde_json::from_slice::<serde_json::Value>(&body_bytes) {
let usage_tokens = resp_json
.get("usage")
.and_then(|u| u.get("total_tokens"))
.and_then(serde_json::Value::as_u64);
let token_estimate = estimate_tokens(&resp_json);
info!(
actual_tokens = usage_tokens,
estimated_tokens = token_estimate,
"← upstream response tokens"
);
}
let response_body = if status.is_success() {
match transform_openai_response_to_anthropic_message(&body_bytes) {
Ok(body) if request_is_streaming => match transform_anthropic_message_to_sse(&body) {
Ok(sse_body) => {
let event_types = extract_sse_event_types(&sse_body);
info!(
anthropic_event_types = ?event_types,
chunk_bytes = sse_body.len(),
contains_message_stop = event_types
.iter()
.any(|event_type| event_type == "message_stop"),
"← synthesized downstream anthropic SSE"
);
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("text/event-stream"),
);
resp_headers.insert(
HeaderName::from_static("cache-control"),
HeaderValue::from_static("no-cache"),
);
return Ok((status, resp_headers, sse_body).into_response());
}
Err(e) => {
error!(error = %e, "failed to synthesize Anthropic SSE response");
let (status, body) = build_anthropic_error_response(
StatusCode::BAD_GATEWAY,
format!("failed to synthesize Anthropic SSE response: {e}"),
);
return Ok((status, body).into_response());
}
},
Ok(body) => body,
Err(e) => {
error!(error = %e, "failed to transform upstream success response");
let (status, body) = build_anthropic_error_response(
StatusCode::BAD_GATEWAY,
format!("failed to transform upstream response: {e}"),
);
return Ok((status, body).into_response());
}
}
} else {
transform_upstream_error_body_to_anthropic(&body_bytes, status)
};
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/json"),
);
Ok((status, resp_headers, response_body).into_response())
}
/// Which OpenAI-compatible API surface a client request arrived on.
///
/// `handle_openai_compatible_request` uses it to pick the request transform, the streaming
/// transform, and the error-event builder.
#[derive(Clone, Copy, Debug)]
enum OpenAiEndpoint {
    /// Requests routed from `POST /v1/chat/completions`.
    ChatCompletions,
    /// Requests routed from `POST /v1/responses`.
    Responses,
}
/// Axum handler for `POST /v1/chat/completions`.
///
/// Unwraps the shared proxy state and delegates to `handle_openai_compatible_request` with
/// the Chat Completions endpoint selected.
async fn handle_openai_request(
    State(config): State<ProxyConfig>,
    headers: HeaderMap,
    body: Bytes,
) -> axum::response::Result<impl IntoResponse> {
    let endpoint = OpenAiEndpoint::ChatCompletions;
    handle_openai_compatible_request(config, headers, body, endpoint).await
}
/// Axum handler for `POST /v1/responses`.
///
/// Unwraps the shared proxy state and delegates to `handle_openai_compatible_request` with
/// the Responses endpoint selected.
async fn handle_openai_responses_request(
    State(config): State<ProxyConfig>,
    headers: HeaderMap,
    body: Bytes,
) -> axum::response::Result<impl IntoResponse> {
    let endpoint = OpenAiEndpoint::Responses;
    handle_openai_compatible_request(config, headers, body, endpoint).await
}
#[allow(clippy::too_many_lines)]
async fn handle_openai_compatible_request(
config: ProxyConfig,
headers: HeaderMap,
body: Bytes,
endpoint: OpenAiEndpoint,
) -> axum::response::Result<impl IntoResponse> {
let proxy_request_id = NEXT_PROXY_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
if let Some(status) = check_auth(&config, &headers) {
let (status, body) = build_openai_error_response(status, "invalid API key");
return Ok((status, body).into_response());
}
let active = config.active_upstream().await;
let mut req_headers: HashMap<String, String> = HashMap::new();
for (name, value) in &headers {
if let Ok(val_str) = value.to_str() {
req_headers.insert(name.to_string(), val_str.to_string());
}
}
let req = TransformRequest {
headers: req_headers,
path: match endpoint {
OpenAiEndpoint::ChatCompletions => "/v1/chat/completions",
OpenAiEndpoint::Responses => "/v1/responses",
}
.to_string(),
body,
};
let transformed_result = match endpoint {
OpenAiEndpoint::ChatCompletions => openai_to_anthropic(&req),
OpenAiEndpoint::Responses => responses_to_anthropic(&req),
};
let transformed = match transformed_result {
Ok(t) => t,
Err(e) => {
error!(proxy_request_id, error = %e, "transform failed");
let (status, body) = build_openai_error_response(
StatusCode::BAD_REQUEST,
format!("transform error: {e}"),
);
return Ok((status, body).into_response());
}
};
let request_is_streaming = is_streaming_request(&req.body);
let requested_openai_model = requested_model(&req.body);
let final_headers = build_anthropic_upstream_headers(
&req.headers,
&transformed.headers,
&active.api_key,
request_is_streaming,
);
let transformed_path = if active.url.ends_with("/v1") && transformed.path.starts_with("/v1/") {
transformed
.path
.strip_prefix("/v1")
.unwrap_or(&transformed.path)
} else {
&transformed.path
};
let upstream_url = format!("{}{}", active.url, transformed_path);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(30))
.danger_accept_invalid_certs(true)
.http1_only()
.build()
.expect("build reqwest client");
info!(
proxy_request_id,
upstream_headers = ?redact_headers(&final_headers),
upstream_url,
request_is_streaming,
"→ upstream Anthropic request"
);
let mut upstream_req = client.post(&upstream_url);
for (k, v) in &final_headers {
upstream_req = upstream_req.header(k.clone(), v.clone());
}
upstream_req = upstream_req.body(transformed.body.clone());
let resp = match upstream_req.send().await {
Ok(r) => r,
Err(e) => {
error!(proxy_request_id, error = %e, upstream_url, "upstream request failed");
let status = if e.is_timeout() {
StatusCode::GATEWAY_TIMEOUT
} else {
StatusCode::BAD_GATEWAY
};
let message = if e.is_timeout() {
format!("upstream request timed out: {e}")
} else {
format!("upstream request failed: {e}")
};
let (status, body) = build_openai_error_response(status, message);
return Ok((status, body).into_response());
}
};
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
let response_content_type = resp
.headers()
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(std::string::ToString::to_string);
if status.is_success() && is_event_stream_content_type(response_content_type.as_deref()) {
let transformed_stream = stream::unfold(
(
proxy_request_id,
resp.bytes_stream(),
Vec::new(),
StreamState {
model_name: requested_openai_model.clone(),
..StreamState::default()
},
false,
),
move |(
proxy_request_id,
mut upstream_stream,
mut pending,
mut stream_state,
finished,
)| async move {
if finished {
return None;
}
loop {
match upstream_stream.next().await {
Some(Ok(chunk)) => {
maybe_log_raw_upstream_sse_chunk(
proxy_request_id,
"← upstream raw SSE transport chunk",
&chunk,
pending.len(),
);
pending.extend_from_slice(&chunk);
if pending.len() > MAX_SSE_PENDING_BYTES {
error!(
proxy_request_id,
pending_len = pending.len(),
"upstream SSE buffer exceeded — terminating stream"
);
return Some((
Ok::<Bytes, Infallible>(Bytes::from(
serde_json::to_string(&json!({
"error": {
"message": "upstream SSE buffer limit exceeded",
"type": "buffer_limit_exceeded",
}
}))
.unwrap_or_default(),
)),
(
proxy_request_id,
upstream_stream,
pending,
stream_state,
true,
),
));
}
let Some(complete_frames) = take_complete_sse_frames(&mut pending)
else {
continue;
};
let transformed_chunk = match endpoint {
OpenAiEndpoint::ChatCompletions => transform_stream_to_openai(
&complete_frames,
ApiFormat::AnthropicMessages,
&mut stream_state,
),
OpenAiEndpoint::Responses => transform_stream_to_openai_responses(
&complete_frames,
ApiFormat::AnthropicMessages,
&mut stream_state,
),
};
match transformed_chunk {
Ok(transformed_bytes) => {
if transformed_bytes.is_empty() {
continue;
}
return Some((
Ok::<Bytes, Infallible>(Bytes::from(transformed_bytes)),
(
proxy_request_id,
upstream_stream,
pending,
stream_state,
false,
),
));
}
Err(e) => {
error!(
proxy_request_id,
error = %e,
"failed to transform upstream Anthropic SSE chunk"
);
return Some((
Ok(match endpoint {
OpenAiEndpoint::ChatCompletions => {
build_openai_stream_error_event(
format!(
"failed to transform upstream Anthropic \
SSE chunk: {e}"
),
"api_error",
)
}
OpenAiEndpoint::Responses => {
build_openai_responses_stream_error_event(
format!(
"failed to transform upstream Anthropic \
SSE chunk: {e}"
),
"api_error",
)
}
}),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
}
}
Some(Err(e)) => {
error!(proxy_request_id, error = %e, "failed to read upstream SSE chunk");
return Some((
Ok(match endpoint {
OpenAiEndpoint::ChatCompletions => {
build_openai_stream_error_event(
format!("failed to read upstream SSE stream: {e}"),
"api_error",
)
}
OpenAiEndpoint::Responses => {
build_openai_responses_stream_error_event(
format!("failed to read upstream SSE stream: {e}"),
"api_error",
)
}
}),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
None => {
if pending.is_empty() {
return None;
}
let transformed_chunk = match endpoint {
OpenAiEndpoint::ChatCompletions => transform_stream_to_openai(
&pending,
ApiFormat::AnthropicMessages,
&mut stream_state,
),
OpenAiEndpoint::Responses => transform_stream_to_openai_responses(
&pending,
ApiFormat::AnthropicMessages,
&mut stream_state,
),
};
match transformed_chunk {
Ok(transformed_bytes) if transformed_bytes.is_empty() => {
return None;
}
Ok(transformed_bytes) => {
return Some((
Ok(Bytes::from(transformed_bytes)),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
Err(e) => {
error!(
proxy_request_id,
error = %e,
"failed to finalize upstream Anthropic SSE stream"
);
return Some((
Ok(match endpoint {
OpenAiEndpoint::ChatCompletions => {
build_openai_stream_error_event(
format!(
"failed to finalize upstream Anthropic \
SSE stream: {e}"
),
"api_error",
)
}
OpenAiEndpoint::Responses => {
build_openai_responses_stream_error_event(
format!(
"failed to finalize upstream Anthropic \
SSE stream: {e}"
),
"api_error",
)
}
}),
(
proxy_request_id,
upstream_stream,
Vec::new(),
stream_state,
true,
),
));
}
}
}
}
}
},
);
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("text/event-stream"),
);
resp_headers.insert(
HeaderName::from_static("cache-control"),
HeaderValue::from_static("no-cache"),
);
return Ok((status, resp_headers, Body::from_stream(transformed_stream)).into_response());
}
let body_bytes = match resp.bytes().await {
Ok(b) => b,
Err(e) => {
error!(proxy_request_id, error = %e, "failed to read upstream body");
let (status, body) = build_openai_error_response(
StatusCode::BAD_GATEWAY,
format!("failed to read upstream body: {e}"),
);
return Ok((status, body).into_response());
}
};
let response_body = if status.is_success() {
let transformed_body = match endpoint {
OpenAiEndpoint::ChatCompletions => {
transform_anthropic_response_to_openai_completion(&body_bytes)
}
OpenAiEndpoint::Responses => {
transform_anthropic_response_to_openai_responses(&body_bytes)
}
};
match transformed_body {
Ok(body) if request_is_streaming => {
let sse_result = match endpoint {
OpenAiEndpoint::ChatCompletions => transform_openai_completion_to_sse(&body),
OpenAiEndpoint::Responses => transform_openai_responses_to_sse(&body),
};
match sse_result {
Ok(sse_body) => {
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("text/event-stream"),
);
resp_headers.insert(
HeaderName::from_static("cache-control"),
HeaderValue::from_static("no-cache"),
);
return Ok((status, resp_headers, sse_body).into_response());
}
Err(e) => {
error!(error = %e, "failed to synthesize OpenAI SSE response");
let (status, body) = build_openai_error_response(
StatusCode::BAD_GATEWAY,
format!("failed to synthesize OpenAI SSE response: {e}"),
);
return Ok((status, body).into_response());
}
}
}
Ok(body) => body,
Err(e) => {
error!(error = %e, "failed to transform upstream Anthropic success response");
let (status, body) = build_openai_error_response(
StatusCode::BAD_GATEWAY,
format!("failed to transform upstream response: {e}"),
);
return Ok((status, body).into_response());
}
}
} else {
transform_upstream_error_body_to_openai(&body_bytes, status)
};
let mut resp_headers = HeaderMap::new();
resp_headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/json"),
);
Ok((status, resp_headers, response_body).into_response())
}
/// Liveness endpoint for `GET /health`: always answers `200 OK` with `{"status":"ok"}`.
async fn handle_health() -> impl IntoResponse {
    let payload = Json(json!({ "status": "ok" }));
    (StatusCode::OK, payload).into_response()
}
fn create_router(config: ProxyConfig) -> Router {
Router::new()
.route("/v1/messages", post(handle_anthropic_request))
.route("/v1/chat/completions", post(handle_openai_request))
.route("/v1/responses", post(handle_openai_responses_request))
.route("/health", get(handle_health))
.layer(RequestBodyLimitLayer::new(16 * 1024 * 1024)) .with_state(config)
}
/// Periodically probes the primary upstream and records the result in the shared router.
///
/// Each iteration waits `interval` before probing, so the first probe happens one full
/// interval after start-up. The probe is a plain `GET` of the primary's configured URL with
/// a 10-second deadline:
/// - transport failure or timeout → `mark_primary_unhealthy`;
/// - `429 Too Many Requests` or any `5xx` → `mark_primary_unhealthy`. 429 is the status
///   that makes `UpstreamRouter::record_response_status` fail over, so treating it as
///   healthy would fail straight back onto an upstream that is still rate limiting;
/// - any other `2xx` or `4xx` → `mark_primary_healthy` (the upstream is reachable; a bare
///   base URL commonly answers 401/404, which is fine for a reachability probe);
/// - anything else (1xx/3xx) leaves the recorded state unchanged.
///
/// Loops forever; intended to be run as a spawned background task.
async fn health_check_loop(router: SharedRouter, interval: Duration) {
    let client = reqwest::Client::new();
    loop {
        tokio::time::sleep(interval).await;
        // Copy the URL out so the lock is not held across the network request.
        let primary_url = {
            let guard = router.lock().await;
            guard.primary.url.clone()
        };
        let probe =
            tokio::time::timeout(Duration::from_secs(10), client.get(&primary_url).send()).await;
        let verdict = match probe {
            Ok(Ok(resp)) => {
                let status = resp.status();
                if status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() {
                    warn!(
                        %primary_url,
                        status = status.as_u16(),
                        "primary health check returned an unhealthy status"
                    );
                    Some(false)
                } else if status.is_success() || status.is_client_error() {
                    Some(true)
                } else {
                    None
                }
            }
            _ => {
                warn!(%primary_url, "primary health check failed — unreachable");
                Some(false)
            }
        };
        match verdict {
            Some(true) => {
                let mut guard = router.lock().await;
                guard.mark_primary_healthy();
            }
            Some(false) => {
                let mut guard = router.lock().await;
                guard.mark_primary_unhealthy();
            }
            None => {}
        }
    }
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_target(false)
.with_level(true)
.init();
let config = ProxyConfig::from_env();
let listen = env::var("PROXY_LISTEN").unwrap_or_else(|_| "127.0.0.1:3000".to_string());
if let Some(ref router) = config.router {
tokio::spawn(health_check_loop(router.clone(), Duration::from_secs(60)));
}
let app = create_router(config);
let listener = match tokio::net::TcpListener::bind(&listen).await {
Ok(l) => l,
Err(e) => {
error!(error = %e, listen, "failed to bind");
panic!("failed to bind: {e}");
}
};
info!(listen, "proxy listening");
axum::serve(listener, app).await.expect("serve failed");
}
#[cfg(test)]
mod tests {
use std::{
collections::HashMap,
convert::Infallible,
sync::mpsc::{self, Receiver, Sender},
time::Duration,
};
use axum::{
Json, Router,
body::Body,
extract::State,
http::{HeaderMap, HeaderValue, StatusCode},
response::IntoResponse,
routing::post,
};
use bytes::{Buf, Bytes};
use futures::stream;
use http_body_util::BodyExt;
use reqwest::header::CONTENT_TYPE;
use serde_json::json;
use super::{
ActiveRoute, MAX_LOGGED_UPSTREAM_ERROR_BODY_BYTES, ProxyConfig, REDACTED_HEADER_VALUE,
SYNTHETIC_THINKING_SIGNATURE, UpstreamRouter, UpstreamTarget,
anthropic_request_has_thinking, build_anthropic_error_response,
build_anthropic_upstream_headers, build_openai_error_response, build_upstream_headers,
format_upstream_error_body_for_log, handle_anthropic_passthrough, handle_anthropic_request,
handle_openai_request, handle_openai_responses_request, is_anthropic_upstream,
is_dashscope_upstream, map_http_status_to_anthropic_error_type,
maybe_disable_dashscope_thinking, redact_headers, transform_anthropic_message_to_sse,
transform_anthropic_response_to_openai_completion,
transform_anthropic_response_to_openai_responses, transform_openai_completion_to_sse,
transform_openai_response_to_anthropic_message, transform_openai_responses_to_sse,
transform_upstream_error_body_to_anthropic, transform_upstream_error_body_to_openai,
};
/// One request observed by a mock upstream handler in these tests.
#[derive(Debug)]
struct CapturedRequest {
    /// Request headers whose values converted to `&str`; non-convertible values are skipped
    /// by the mock handlers, and a repeated header name keeps its last value.
    headers: HashMap<String, String>,
    /// The raw request body as received.
    body: Bytes,
}
/// Canned non-streaming OpenAI Chat Completions response: one assistant message with the
/// text "Hello from upstream", `finish_reason: "stop"`, and fixed usage (10 + 4 = 14 tokens).
fn mock_openai_text_response() -> serde_json::Value {
    let assistant_message = json!({
        "role": "assistant",
        "content": "Hello from upstream"
    });
    let usage = json!({
        "prompt_tokens": 10,
        "completion_tokens": 4,
        "total_tokens": 14
    });
    json!({
        "id": "chatcmpl-proxy-test",
        "object": "chat.completion",
        "model": "qwen3.6-plus",
        "choices": [{
            "index": 0,
            "message": assistant_message,
            "finish_reason": "stop"
        }],
        "usage": usage
    })
}
/// Canned non-streaming Anthropic Messages response: a single text block
/// "Hello from Anthropic upstream", `stop_reason: "end_turn"`, and 10 input / 4 output tokens.
fn mock_anthropic_text_response() -> serde_json::Value {
    let text_block = json!({
        "type": "text",
        "text": "Hello from Anthropic upstream"
    });
    let usage = json!({
        "input_tokens": 10,
        "output_tokens": 4
    });
    json!({
        "id": "msg-proxy-test",
        "type": "message",
        "role": "assistant",
        "model": "qwen-plus-anthropic",
        "content": [text_block],
        "stop_reason": "end_turn",
        "stop_sequence": null,
        "usage": usage
    })
}
/// Mock OpenAI-compatible upstream: records the request on the channel and answers
/// `200 OK` with `mock_openai_text_response()` as JSON.
async fn mock_upstream_handler(
    State(sender): State<Sender<CapturedRequest>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let mut captured_headers = HashMap::new();
    for (name, value) in &headers {
        if let Ok(text) = value.to_str() {
            captured_headers.insert(name.to_string(), text.to_string());
        }
    }
    // Send failures (e.g. the test already dropped its receiver) are deliberately ignored.
    let _ = sender.send(CapturedRequest {
        headers: captured_headers,
        body,
    });
    (StatusCode::OK, Json(mock_openai_text_response()))
}
/// Mock Anthropic-native upstream: records the request on the channel and answers
/// `200 OK` with `mock_anthropic_text_response()` as JSON.
async fn mock_anthropic_upstream_handler(
    State(sender): State<Sender<CapturedRequest>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let mut captured_headers = HashMap::new();
    for (name, value) in &headers {
        if let Ok(text) = value.to_str() {
            captured_headers.insert(name.to_string(), text.to_string());
        }
    }
    // Send failures (e.g. the test already dropped its receiver) are deliberately ignored.
    let _ = sender.send(CapturedRequest {
        headers: captured_headers,
        body,
    });
    (StatusCode::OK, Json(mock_anthropic_text_response()))
}
/// Mock OpenAI-compatible upstream that ignores any streaming request and always answers
/// with plain JSON (`mock_openai_text_response()`), after recording the request.
async fn mock_json_for_streaming_upstream_handler(
    State(sender): State<Sender<CapturedRequest>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let mut captured_headers = HashMap::new();
    for (name, value) in &headers {
        if let Ok(text) = value.to_str() {
            captured_headers.insert(name.to_string(), text.to_string());
        }
    }
    // Send failures (e.g. the test already dropped its receiver) are deliberately ignored.
    let _ = sender.send(CapturedRequest {
        headers: captured_headers,
        body,
    });
    (StatusCode::OK, Json(mock_openai_text_response()))
}
/// Mock OpenAI-compatible upstream that records the request and answers with a
/// `text/event-stream` body of OpenAI chat-completion chunks ("Hel" + "lo", then `[DONE]`).
///
/// The first event's terminating blank line arrives in a separate transport chunk, so a
/// consumer must reassemble SSE frames across chunk boundaries.
async fn mock_streaming_upstream_handler(
    State(sender): State<Sender<CapturedRequest>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let mut captured_headers = HashMap::new();
    for (name, value) in &headers {
        if let Ok(text) = value.to_str() {
            captured_headers.insert(name.to_string(), text.to_string());
        }
    }
    // Send failures (e.g. the test already dropped its receiver) are deliberately ignored.
    let _ = sender.send(CapturedRequest {
        headers: captured_headers,
        body,
    });
    let first_event_line = Bytes::from_static(
        b"data: {\"choices\":[{\"delta\":{\"role\":\"assistant\",\"content\":\"Hel\"}}]}\n",
    );
    let first_event_terminator = Bytes::from_static(b"\n");
    let closing_events = Bytes::from_static(
        b"data: {\"choices\":[{\"delta\":{\"content\":\"lo\"},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":2}}\n\ndata: [DONE]\n\n",
    );
    let transport_chunks = vec![
        Ok::<Bytes, Infallible>(first_event_line),
        Ok(first_event_terminator),
        Ok(closing_events),
    ];
    let mut response_headers = HeaderMap::new();
    response_headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
    let sse_body = Body::from_stream(stream::iter(transport_chunks));
    (StatusCode::OK, response_headers, sse_body)
}
/// Mock Anthropic-native upstream that ignores any streaming request and always answers
/// with plain JSON (`mock_anthropic_text_response()`), after recording the request.
async fn mock_anthropic_json_for_streaming_upstream_handler(
    State(sender): State<Sender<CapturedRequest>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let mut captured_headers = HashMap::new();
    for (name, value) in &headers {
        if let Ok(text) = value.to_str() {
            captured_headers.insert(name.to_string(), text.to_string());
        }
    }
    // Send failures (e.g. the test already dropped its receiver) are deliberately ignored.
    let _ = sender.send(CapturedRequest {
        headers: captured_headers,
        body,
    });
    (StatusCode::OK, Json(mock_anthropic_text_response()))
}
/// Mock Anthropic-native upstream that records the request and streams a complete
/// Anthropic SSE conversation ("Hel" + "lo", `end_turn`), one event per transport chunk,
/// with `content-type: text/event-stream`.
async fn mock_anthropic_streaming_upstream_handler(
    State(sender): State<Sender<CapturedRequest>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let mut captured_headers = HashMap::new();
    for (name, value) in &headers {
        if let Ok(text) = value.to_str() {
            captured_headers.insert(name.to_string(), text.to_string());
        }
    }
    // Send failures (e.g. the test already dropped its receiver) are deliberately ignored.
    let _ = sender.send(CapturedRequest {
        headers: captured_headers,
        body,
    });
    let events: [&'static [u8]; 7] = [
        b"event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg-stream\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[],\"model\":\"qwen-plus-anthropic\",\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":10,\"output_tokens\":0}}}\n\n",
        b"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
        b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hel\"}}\n\n",
        b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"lo\"}}\n\n",
        b"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n",
        b"event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":2}}\n\n",
        b"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
    ];
    let transport_chunks: Vec<Result<Bytes, Infallible>> = events
        .into_iter()
        .map(|event| Ok(Bytes::from_static(event)))
        .collect();
    let mut response_headers = HeaderMap::new();
    response_headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
    let sse_body = Body::from_stream(stream::iter(transport_chunks));
    (StatusCode::OK, response_headers, sse_body)
}
async fn spawn_mock_upstream() -> (
String,
Receiver<CapturedRequest>,
tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = mpsc::channel();
let app = Router::new()
.route("/v1/chat/completions", post(mock_upstream_handler))
.with_state(sender);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("mock upstream should bind");
let addr = listener
.local_addr()
.expect("mock upstream should have local addr");
let join_handle = tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("mock upstream should serve successfully");
});
(format!("http://{addr}"), receiver, join_handle)
}
async fn spawn_mock_anthropic_upstream() -> (
String,
Receiver<CapturedRequest>,
tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = mpsc::channel();
let app = Router::new()
.route("/v1/messages", post(mock_anthropic_upstream_handler))
.with_state(sender);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("mock anthropic upstream should bind");
let addr = listener
.local_addr()
.expect("mock anthropic upstream should have local addr");
let join_handle = tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("mock anthropic upstream should serve successfully");
});
(format!("http://{addr}"), receiver, join_handle)
}
async fn spawn_mock_streaming_upstream() -> (
String,
Receiver<CapturedRequest>,
tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = mpsc::channel();
let app = Router::new()
.route(
"/v1/chat/completions",
post(mock_streaming_upstream_handler),
)
.with_state(sender);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("mock streaming upstream should bind");
let addr = listener
.local_addr()
.expect("mock streaming upstream should have local addr");
let join_handle = tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("mock streaming upstream should serve successfully");
});
(format!("http://{addr}"), receiver, join_handle)
}
async fn spawn_mock_json_for_streaming_upstream() -> (
String,
Receiver<CapturedRequest>,
tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = mpsc::channel();
let app = Router::new()
.route(
"/v1/chat/completions",
post(mock_json_for_streaming_upstream_handler),
)
.with_state(sender);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("mock json-for-streaming upstream should bind");
let addr = listener
.local_addr()
.expect("mock json-for-streaming upstream should have local addr");
let join_handle = tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("mock json-for-streaming upstream should serve successfully");
});
(format!("http://{addr}"), receiver, join_handle)
}
async fn spawn_mock_anthropic_streaming_upstream() -> (
String,
Receiver<CapturedRequest>,
tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = mpsc::channel();
let app = Router::new()
.route(
"/v1/messages",
post(mock_anthropic_streaming_upstream_handler),
)
.with_state(sender);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("mock anthropic streaming upstream should bind");
let addr = listener
.local_addr()
.expect("mock anthropic streaming upstream should have local addr");
let join_handle = tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("mock anthropic streaming upstream should serve successfully");
});
(format!("http://{addr}"), receiver, join_handle)
}
async fn spawn_mock_anthropic_json_for_streaming_upstream() -> (
String,
Receiver<CapturedRequest>,
tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = mpsc::channel();
let app = Router::new()
.route(
"/v1/messages",
post(mock_anthropic_json_for_streaming_upstream_handler),
)
.with_state(sender);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("mock anthropic json-for-streaming upstream should bind");
let addr = listener
.local_addr()
.expect("mock anthropic json-for-streaming upstream should have local addr");
let join_handle = tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("mock anthropic json-for-streaming upstream should serve successfully");
});
(format!("http://{addr}"), receiver, join_handle)
}
#[test]
fn test_should_not_forward_stale_content_length_or_client_auth() {
    // Inbound headers: a stale length, a compression preference and two client credentials
    // mixed in with ordinary pass-through headers.
    let client_headers: HashMap<String, String> = [
        ("content-length", "999"),
        ("content-type", "application/json"),
        ("accept-encoding", "gzip, br"),
        ("authorization", "Bearer client-key"),
        ("x-api-key", "proxy-key"),
        ("user-agent", "curl/8.7.1"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), value.to_string()))
    .collect();
    let transformed_headers: HashMap<String, String> = [
        ("authorization", "Bearer transformed-key"),
        ("content-type", "application/json"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), value.to_string()))
    .collect();

    let forwarded =
        build_upstream_headers(&client_headers, &transformed_headers, "upstream-key", false);
    let header = |name: &str| forwarded.get(name).map(String::as_str);

    // None of these may reach the upstream under their lowercase names.
    for dropped in ["content-length", "accept-encoding", "authorization", "x-api-key"] {
        assert_eq!(header(dropped), None);
    }
    // The proxy's own key is sent under the `Authorization` key instead.
    assert_eq!(header("Authorization"), Some("Bearer upstream-key"));
    assert_eq!(header("content-type"), Some("application/json"));
    assert_eq!(header("user-agent"), Some("curl/8.7.1"));
}
#[test]
fn test_should_force_event_stream_accept_header_for_streaming_requests() {
    // The client explicitly asks for JSON; a streaming request must still ask upstream for SSE.
    let client_headers = HashMap::from([
        ("accept".to_owned(), "application/json".to_owned()),
        ("content-type".to_owned(), "application/json".to_owned()),
    ]);
    let transformed_headers = HashMap::from([(
        "content-type".to_owned(),
        "application/json".to_owned(),
    )]);

    let forwarded =
        build_upstream_headers(&client_headers, &transformed_headers, "upstream-key", true);
    let accept = forwarded.get("accept").map(String::as_str);
    let authorization = forwarded.get("Authorization").map(String::as_str);

    assert_eq!(accept, Some("text/event-stream"));
    assert_eq!(authorization, Some("Bearer upstream-key"));
}
#[test]
fn test_should_redact_sensitive_headers_in_logs() {
    let headers: HashMap<String, String> = [
        ("authorization", "Bearer secret-value"),
        ("x-api-key", "proxy-secret"),
        ("content-type", "application/json"),
    ]
    .into_iter()
    .map(|(name, value)| (String::from(name), String::from(value)))
    .collect();

    let redacted = redact_headers(&headers);

    // Credential headers are replaced by the placeholder; other headers keep their value.
    let masked = REDACTED_HEADER_VALUE.to_string();
    for secret_header in ["authorization", "x-api-key"] {
        assert_eq!(redacted.get(secret_header), Some(&masked));
    }
    let untouched = "application/json".to_string();
    assert_eq!(redacted.get("content-type"), Some(&untouched));
}
#[test]
fn test_should_truncate_logged_upstream_error_body() {
    // Five bytes over the log limit: the limit's worth is kept, the rest summarised.
    let oversized_len = MAX_LOGGED_UPSTREAM_ERROR_BODY_BYTES + 5;
    let oversized = Bytes::from(vec![b'a'; oversized_len]);

    let logged = format_upstream_error_body_for_log(&oversized);

    let kept_prefix = "a".repeat(MAX_LOGGED_UPSTREAM_ERROR_BODY_BYTES);
    assert!(logged.starts_with(kept_prefix.as_str()));
    assert!(logged.ends_with("… <truncated 5 bytes>"));
}
#[test]
fn test_should_transform_openai_tool_calls_into_anthropic_tool_use_blocks() {
    // A non-streaming OpenAI completion carrying prose plus one function call.
    let completion = json!({
        "id": "chatcmpl-tool-call",
        "object": "chat.completion",
        "model": "qwen3.6-plus",
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "Let me check that.",
                "tool_calls": [{
                    "id": "call_weather_123",
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "arguments": "{\"city\":\"Paris\"}"
                    }
                }]
            },
            "finish_reason": "tool_calls"
        }],
        "usage": {
            "prompt_tokens": 20,
            "completion_tokens": 6,
            "total_tokens": 26
        }
    });
    let serialized =
        serde_json::to_vec(&completion).expect("mock OpenAI tool-call response should serialize");
    let openai_response = Bytes::from(serialized);

    let converted = transform_openai_response_to_anthropic_message(&openai_response)
        .expect("OpenAI response should transform into Anthropic response");
    let message: serde_json::Value =
        serde_json::from_slice(&converted).expect("Anthropic response should be valid json");

    assert_eq!(message["type"], "message");
    assert_eq!(message["role"], "assistant");

    // Text first, then the call as a `tool_use` block whose JSON-string arguments
    // are exposed as a structured `input` object.
    let text_block = &message["content"][0];
    assert_eq!(text_block["type"], "text");
    assert_eq!(text_block["text"], "Let me check that.");
    let tool_block = &message["content"][1];
    assert_eq!(tool_block["type"], "tool_use");
    assert_eq!(tool_block["id"], "call_weather_123");
    assert_eq!(tool_block["name"], "get_weather");
    assert_eq!(tool_block["input"]["city"], "Paris");

    assert_eq!(message["stop_reason"], "tool_use");
    let usage = &message["usage"];
    assert_eq!(usage["input_tokens"], 20);
    assert_eq!(usage["output_tokens"], 6);
}
#[test]
fn test_should_transform_openai_reasoning_content_into_anthropic_thinking_blocks() {
    // `reasoning_content` alongside ordinary `content` in a single assistant message.
    let completion = json!({
        "id": "chatcmpl-thinking",
        "object": "chat.completion",
        "model": "qwen3.6-plus",
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "reasoning_content": "First I inspect the route.",
                "content": "The route is missing."
            },
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": 24,
            "completion_tokens": 10,
            "total_tokens": 34
        }
    });
    let openai_response = Bytes::from(
        serde_json::to_vec(&completion).expect("mock OpenAI thinking response should serialize"),
    );

    let converted = transform_openai_response_to_anthropic_message(&openai_response)
        .expect("OpenAI response should transform into Anthropic response");
    let message: serde_json::Value =
        serde_json::from_slice(&converted).expect("Anthropic response should be valid json");

    // Reasoning becomes a leading `thinking` block carrying the synthetic signature.
    let thinking_block = &message["content"][0];
    assert_eq!(thinking_block["type"], "thinking");
    assert_eq!(thinking_block["thinking"], "First I inspect the route.");
    assert_eq!(thinking_block["signature"], SYNTHETIC_THINKING_SIGNATURE);

    let text_block = &message["content"][1];
    assert_eq!(text_block["type"], "text");
    assert_eq!(text_block["text"], "The route is missing.");

    assert_eq!(message["stop_reason"], "end_turn");
}
#[test]
fn test_should_transform_anthropic_thinking_message_to_sse() {
    let message = json!({
        "id": "msg-thinking",
        "type": "message",
        "role": "assistant",
        "model": "qwen3.6-plus",
        "content": [
            {
                "type": "thinking",
                "thinking": "Let me reason this through.",
                "signature": SYNTHETIC_THINKING_SIGNATURE
            },
            {
                "type": "text",
                "text": "Done."
            }
        ],
        "stop_reason": "end_turn",
        "stop_sequence": null,
        "usage": {
            "input_tokens": 12,
            "output_tokens": 5
        }
    });
    let anthropic_message =
        Bytes::from(serde_json::to_vec(&message).expect("Anthropic message should serialize"));

    let sse = transform_anthropic_message_to_sse(&anthropic_message)
        .expect("Anthropic message should convert to SSE");
    let sse_text = String::from_utf8(sse.to_vec()).expect("SSE should be valid UTF-8");

    // The stream must carry thinking, signature and text deltas for the two blocks.
    let expected_fragments = [
        "\"type\":\"thinking_delta\"",
        "\"thinking\":\"Let me reason this through.\"",
        "\"type\":\"signature_delta\"",
        "\"type\":\"text_delta\"",
        "\"text\":\"Done.\"",
    ];
    for fragment in expected_fragments {
        assert!(sse_text.contains(fragment));
    }
    assert!(sse_text.contains(SYNTHETIC_THINKING_SIGNATURE));
}
#[test]
fn test_should_transform_anthropic_response_into_openai_completion() {
    // Thinking, text and a tool call in one Anthropic message.
    let message = json!({
        "id": "msg-openai",
        "type": "message",
        "role": "assistant",
        "model": "qwen-plus-anthropic",
        "content": [
            {
                "type": "thinking",
                "thinking": "Let me inspect the route.",
                "signature": SYNTHETIC_THINKING_SIGNATURE
            },
            {
                "type": "text",
                "text": "The route exists."
            },
            {
                "type": "tool_use",
                "id": "toolu_123",
                "name": "codegraph_search",
                "input": { "query": "sso google login" }
            }
        ],
        "stop_reason": "tool_use",
        "stop_sequence": null,
        "usage": {
            "input_tokens": 12,
            "output_tokens": 5
        }
    });
    let anthropic_response =
        Bytes::from(serde_json::to_vec(&message).expect("Anthropic response should serialize"));

    let converted = transform_anthropic_response_to_openai_completion(&anthropic_response)
        .expect("Anthropic response should transform into OpenAI completion");
    let completion: serde_json::Value =
        serde_json::from_slice(&converted).expect("OpenAI completion should be valid json");

    let choice = &completion["choices"][0];
    let assistant = &choice["message"];
    assert_eq!(assistant["reasoning_content"], "Let me inspect the route.");
    assert_eq!(assistant["content"], "The route exists.");
    assert_eq!(assistant["tool_calls"][0]["id"], "toolu_123");
    assert_eq!(choice["finish_reason"], "tool_calls");
    // 12 input + 5 output.
    assert_eq!(completion["usage"]["total_tokens"], 17);
}
#[test]
fn test_should_synthesize_openai_sse_from_openai_completion() {
    let completion = json!({
        "id": "chatcmpl-stream",
        "object": "chat.completion",
        "model": "qwen3.6-plus",
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "reasoning_content": "Let me think.",
                "content": "Done.",
                "tool_calls": [{
                    "id": "toolu_123",
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "arguments": "{\"city\":\"Paris\"}"
                    }
                }]
            },
            "finish_reason": "tool_calls"
        }],
        "usage": {
            "prompt_tokens": 12,
            "completion_tokens": 5,
            "total_tokens": 17
        }
    });
    let openai_response =
        Bytes::from(serde_json::to_vec(&completion).expect("OpenAI response should serialize"));

    let sse = transform_openai_completion_to_sse(&openai_response)
        .expect("OpenAI completion should convert to SSE");
    let sse_text = String::from_utf8(sse.to_vec()).expect("SSE should be valid UTF-8");

    // Every part of the completion must be replayed as chunks, then the `[DONE]` sentinel.
    let expected_fragments = [
        "\"role\":\"assistant\"",
        "\"reasoning_content\":\"Let me think.\"",
        "\"content\":\"Done.\"",
        "\"tool_calls\"",
        "\"finish_reason\":\"tool_calls\"",
        "data: [DONE]",
    ];
    for fragment in expected_fragments {
        assert!(sse_text.contains(fragment));
    }
}
#[test]
fn test_should_transform_anthropic_response_into_openai_responses_body() {
    let message = json!({
        "id": "msg-resp-proxy-test",
        "type": "message",
        "role": "assistant",
        "model": "qwen3.6-plus",
        "content": [
            {
                "type": "thinking",
                "thinking": "Need to inspect the project tree.",
                "signature": SYNTHETIC_THINKING_SIGNATURE
            },
            {
                "type": "text",
                "text": "The route exists."
            },
            {
                "type": "tool_use",
                "id": "toolu_123",
                "name": "find_route",
                "input": {"path": "/v1/responses"}
            }
        ],
        "stop_reason": "tool_use",
        "usage": {
            "input_tokens": 12,
            "output_tokens": 5
        }
    });
    let anthropic_response =
        Bytes::from(serde_json::to_vec(&message).expect("Anthropic response should serialize"));

    let converted = transform_anthropic_response_to_openai_responses(&anthropic_response)
        .expect("Anthropic response should transform into Responses body");
    let responses: serde_json::Value =
        serde_json::from_slice(&converted).expect("Responses body should be valid json");

    assert_eq!(responses["object"], "response");
    assert_eq!(responses["output_text"], "The route exists.");

    // One output item per Anthropic block, in the original order.
    let output = &responses["output"];
    assert_eq!(output[0]["content"][0]["type"], "reasoning_text");
    assert_eq!(output[1]["content"][0]["type"], "output_text");
    assert_eq!(output[2]["type"], "function_call");
}
#[test]
fn test_should_synthesize_openai_responses_sse_from_openai_responses_body() {
    // A completed Responses body with reasoning, text and a function call.
    let completed_response = json!({
        "id": "resp-stream",
        "object": "response",
        "created_at": 1,
        "status": "completed",
        "model": "qwen3.6-plus",
        "output": [
            {
                "id": "resp-stream_item_0",
                "type": "message",
                "role": "assistant",
                "status": "completed",
                "content": [{
                    "type": "reasoning_text",
                    "text": "Need to inspect the code path."
                }]
            },
            {
                "id": "resp-stream_item_1",
                "type": "message",
                "role": "assistant",
                "status": "completed",
                "content": [{
                    "type": "output_text",
                    "text": "Done.",
                    "annotations": []
                }]
            },
            {
                "id": "fc_resp-stream_2",
                "type": "function_call",
                "call_id": "toolu_123",
                "name": "find_route",
                "arguments": "{\"path\":\"/v1/responses\"}",
                "status": "completed"
            }
        ],
        "output_text": "Done.",
        "usage": {
            "input_tokens": 12,
            "input_tokens_details": {"cached_tokens": 0},
            "output_tokens": 5,
            "output_tokens_details": {"reasoning_tokens": 0},
            "total_tokens": 17
        }
    });
    let responses_body = Bytes::from(
        serde_json::to_vec(&completed_response).expect("Responses body should serialize"),
    );

    let sse = transform_openai_responses_to_sse(&responses_body)
        .expect("Responses body should convert to SSE");
    let sse_text = String::from_utf8(sse.to_vec()).expect("SSE should be valid UTF-8");

    // Lifecycle events bracket one delta event per output-item kind.
    let expected_fragments = [
        "\"type\":\"response.created\"",
        "\"type\":\"response.reasoning_text.delta\"",
        "\"type\":\"response.output_text.delta\"",
        "\"type\":\"response.function_call_arguments.delta\"",
        "\"type\":\"response.completed\"",
        "data: [DONE]",
    ];
    for fragment in expected_fragments {
        assert!(sse_text.contains(fragment));
    }
}
#[tokio::test]
async fn test_should_rebuild_upstream_request_when_client_content_length_is_stale() {
    let (upstream_url, receiver, join_handle) = spawn_mock_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let anthropic_request = json!({
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 32,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": "Hello from the proxy test"}]
        }]
    });
    let body = Bytes::from(
        serde_json::to_vec(&anthropic_request).expect("request body should serialize"),
    );
    // The advertised length (999) does not match the real body, and the client also sends
    // a compression preference and its own proxy key.
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("content-length", "999"),
        ("accept-encoding", "gzip, deflate, br, zstd"),
        ("x-api-key", "client-proxy-key"),
        ("user-agent", "test-client"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_anthropic_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock upstream should receive a request");
    join_handle.abort();
    let _ = join_handle.await;

    assert_eq!(parts.status, StatusCode::OK);
    let actual: serde_json::Value =
        serde_json::from_slice(&response_bytes).expect("proxy response should be valid json");
    let expected = json!({
        "id": "chatcmpl-proxy-test",
        "type": "message",
        "role": "assistant",
        "model": "qwen3.6-plus",
        "content": [{
            "type": "text",
            "text": "Hello from upstream"
        }],
        "stop_reason": "end_turn",
        "stop_sequence": null,
        "usage": {
            "input_tokens": 10,
            "output_tokens": 4
        }
    });
    assert_eq!(actual, expected);

    // The upstream must see a Content-Length matching the body that was actually sent.
    let forwarded = &upstream_request.headers;
    let header = |name: &str| forwarded.get(name).map(String::as_str);
    let actual_length = upstream_request.body.len().to_string();
    assert_ne!(header("content-length"), Some("999"));
    assert_eq!(header("accept-encoding"), None);
    assert_eq!(header("content-length"), Some(actual_length.as_str()));
    assert_eq!(header("authorization"), Some("Bearer upstream-secret"));
    assert_eq!(header("user-agent"), Some("test-client"));
}
#[test]
fn test_should_detect_dashscope_upstream() {
    let cases = [
        ("https://coding.dashscope.aliyuncs.com/v1/chat/completions", true),
        ("http://127.0.0.1:18080/v1/chat/completions", false),
    ];
    for (url, expected) in cases {
        assert_eq!(is_dashscope_upstream(url), expected);
    }
}
#[test]
fn test_should_detect_anthropic_upstream() {
    // Both the bare API base and the full Messages endpoint count as Anthropic.
    let cases = [
        ("https://api.anthropic.com/v1", true),
        ("https://api.anthropic.com/v1/messages", true),
        ("https://coding.dashscope.aliyuncs.com/v1", false),
        ("http://127.0.0.1:8080/v1", false),
    ];
    for (url, expected) in cases {
        assert_eq!(is_anthropic_upstream(url), expected);
    }
}
#[test]
fn test_should_disable_dashscope_thinking_by_default() {
    // No `enable_thinking` in the request, so the DashScope path should set it to `false`.
    let request_without_flag = json!({
        "model": "qwen3.6-plus",
        "stream": true,
        "messages": [{"role": "user", "content": "Hello"}]
    });
    let openai_body = Bytes::from(
        serde_json::to_vec(&request_without_flag).expect("OpenAI request body should serialize"),
    );

    let rewritten =
        maybe_disable_dashscope_thinking("https://coding.dashscope.aliyuncs.com/v1", &openai_body);
    let rewritten_json: serde_json::Value =
        serde_json::from_slice(&rewritten).expect("result body should remain valid json");

    assert_eq!(rewritten_json["enable_thinking"], json!(false));
}
#[test]
fn test_should_preserve_explicit_dashscope_thinking_setting() {
    let to_body = |value: serde_json::Value, context: &str| {
        Bytes::from(serde_json::to_vec(&value).expect(context))
    };
    let anthropic_body = to_body(
        json!({
            "model": "claude-sonnet-4-20250514",
            "thinking": {"type": "enabled", "budget_tokens": 1024},
            "messages": [{
                "role": "user",
                "content": [{"type": "text", "text": "Think carefully"}]
            }]
        }),
        "Anthropic request body should serialize",
    );
    let openai_body = to_body(
        json!({
            "model": "qwen3.6-plus",
            "enable_thinking": true,
            "messages": [{"role": "user", "content": "Think carefully"}]
        }),
        "OpenAI request body should serialize",
    );

    assert!(anthropic_request_has_thinking(&anthropic_body));

    // An explicit `enable_thinking: true` must survive the DashScope rewrite.
    let rewritten =
        maybe_disable_dashscope_thinking("https://coding.dashscope.aliyuncs.com/v1", &openai_body);
    let rewritten_json: serde_json::Value =
        serde_json::from_slice(&rewritten).expect("result body should remain valid json");
    assert_eq!(rewritten_json["enable_thinking"], json!(true));
}
#[tokio::test]
async fn test_should_transform_openai_sse_into_anthropic_event_stream() {
    let (upstream_url, receiver, join_handle) = spawn_mock_streaming_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let streaming_request = json!({
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 32,
        "stream": true,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": "Stream hello"}]
        }]
    });
    let body = Bytes::from(
        serde_json::to_vec(&streaming_request).expect("streaming request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("x-api-key", "client-proxy-key"),
        ("user-agent", "test-client"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_anthropic_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a streaming response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy streaming response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock upstream should receive a streaming request");
    join_handle.abort();
    let _ = join_handle.await;
    let response_text = String::from_utf8(response_bytes.to_vec())
        .expect("Anthropic SSE response should be valid utf-8");

    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(
        parts
            .headers
            .get(CONTENT_TYPE)
            .and_then(|value| value.to_str().ok()),
        Some("text/event-stream")
    );
    // Full Anthropic event sequence, both text fragments, and the client's model name.
    for fragment in [
        "event: message_start",
        "event: content_block_start",
        "event: content_block_delta",
        "\"text\":\"Hel\"",
        "\"text\":\"lo\"",
        "\"model\":\"claude-sonnet-4-20250514\"",
        "event: message_delta",
        "\"stop_reason\":\"end_turn\"",
        "\"usage\":{\"output_tokens\":",
        "event: message_stop",
    ] {
        assert!(response_text.contains(fragment));
    }

    assert_eq!(
        upstream_request
            .headers
            .get("authorization")
            .map(String::as_str),
        Some("Bearer upstream-secret")
    );
    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");
    assert_eq!(upstream_body["stream"], json!(true));
}
#[tokio::test]
async fn test_should_synthesize_sse_when_streaming_request_gets_json_response() {
    let (upstream_url, receiver, join_handle) = spawn_mock_json_for_streaming_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let streaming_request = json!({
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 32,
        "stream": true,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": "Stream hello"}]
        }]
    });
    let body = Bytes::from(
        serde_json::to_vec(&streaming_request).expect("streaming request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("x-api-key", "client-proxy-key"),
        ("user-agent", "test-client"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_anthropic_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a synthesized streaming response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy synthesized streaming response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock upstream should receive a streaming request");
    join_handle.abort();
    let _ = join_handle.await;
    let response_text = String::from_utf8(response_bytes.to_vec())
        .expect("Anthropic SSE response should be valid utf-8");

    // Even though the upstream replied with JSON, the client gets an Anthropic SSE stream.
    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(
        parts
            .headers
            .get(CONTENT_TYPE)
            .and_then(|value| value.to_str().ok()),
        Some("text/event-stream")
    );
    for fragment in [
        "event: message_start",
        "event: content_block_start",
        "event: content_block_delta",
        "Hello from upstream",
        "event: message_delta",
        "\"stop_reason\":\"end_turn\"",
        "event: message_stop",
    ] {
        assert!(response_text.contains(fragment));
    }

    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");
    assert_eq!(upstream_body["stream"], json!(true));
}
#[tokio::test]
async fn test_should_proxy_openai_request_to_anthropic_upstream() {
    let (upstream_url, receiver, join_handle) = spawn_mock_anthropic_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let chat_request = json!({
        "model": "qwen3.6-plus",
        "messages": [{
            "role": "user",
            "content": "Hello from OpenAI client"
        }]
    });
    let body = Bytes::from(
        serde_json::to_vec(&chat_request).expect("OpenAI request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("authorization", "Bearer client-openai-key"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_openai_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock anthropic upstream should receive a request");
    join_handle.abort();
    let _ = join_handle.await;

    let completion: serde_json::Value =
        serde_json::from_slice(&response_bytes).expect("proxy response should be valid json");
    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");

    // The client receives an OpenAI chat completion built from the Anthropic reply.
    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(completion["object"], "chat.completion");
    let choice = &completion["choices"][0];
    assert_eq!(choice["message"]["content"], "Hello from Anthropic upstream");
    assert_eq!(choice["finish_reason"], "stop");

    // The upstream is authenticated with `x-api-key`; the client's bearer token is dropped.
    let forwarded = &upstream_request.headers;
    assert_eq!(
        forwarded.get("x-api-key").map(String::as_str),
        Some("upstream-secret")
    );
    assert_eq!(forwarded.get("authorization"), None);

    // The plain-string user message becomes Anthropic content blocks.
    let first_message = &upstream_body["messages"][0];
    assert_eq!(first_message["role"], "user");
    assert_eq!(first_message["content"][0]["text"], "Hello from OpenAI client");
}
#[tokio::test]
async fn test_should_transform_anthropic_sse_into_openai_event_stream() {
    let (upstream_url, receiver, join_handle) = spawn_mock_anthropic_streaming_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let streaming_request = json!({
        "model": "qwen3.6-plus",
        "stream": true,
        "messages": [{
            "role": "user",
            "content": "Stream hello"
        }]
    });
    let body = Bytes::from(
        serde_json::to_vec(&streaming_request)
            .expect("streaming OpenAI request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("authorization", "Bearer client-openai-key"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_openai_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a streaming response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy streaming response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock anthropic upstream should receive a request");
    join_handle.abort();
    let _ = join_handle.await;
    let response_text = String::from_utf8(response_bytes.to_vec())
        .expect("OpenAI SSE response should be valid utf-8");
    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");

    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(
        parts
            .headers
            .get(CONTENT_TYPE)
            .and_then(|value| value.to_str().ok()),
        Some("text/event-stream")
    );
    // Each Anthropic text delta maps to an OpenAI chunk, terminated by `[DONE]`.
    for fragment in [
        "\"role\":\"assistant\"",
        "\"content\":\"Hel\"",
        "\"content\":\"lo\"",
        "\"finish_reason\":\"stop\"",
        "data: [DONE]",
    ] {
        assert!(response_text.contains(fragment));
    }

    assert_eq!(
        upstream_request.headers.get("x-api-key").map(String::as_str),
        Some("upstream-secret")
    );
    assert_eq!(upstream_body["stream"], json!(true));
}
#[tokio::test]
async fn test_should_synthesize_openai_sse_when_streaming_request_gets_anthropic_json_response()
{
    let (upstream_url, receiver, join_handle) =
        spawn_mock_anthropic_json_for_streaming_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let streaming_request = json!({
        "model": "qwen3.6-plus",
        "stream": true,
        "messages": [{
            "role": "user",
            "content": "Stream hello"
        }]
    });
    let body = Bytes::from(
        serde_json::to_vec(&streaming_request)
            .expect("streaming OpenAI request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("authorization", "Bearer client-openai-key"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_openai_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a synthesized streaming response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy synthesized streaming response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock anthropic upstream should receive a request");
    join_handle.abort();
    let _ = join_handle.await;
    let response_text = String::from_utf8(response_bytes.to_vec())
        .expect("OpenAI SSE response should be valid utf-8");
    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");

    // The upstream answered with JSON; the proxy must still hand back OpenAI SSE.
    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(
        parts
            .headers
            .get(CONTENT_TYPE)
            .and_then(|value| value.to_str().ok()),
        Some("text/event-stream")
    );
    for fragment in [
        "\"role\":\"assistant\"",
        "Hello from Anthropic upstream",
        "\"finish_reason\":\"stop\"",
        "data: [DONE]",
    ] {
        assert!(response_text.contains(fragment));
    }
    assert_eq!(upstream_body["stream"], json!(true));
}
#[tokio::test]
async fn test_should_transform_anthropic_response_into_openai_responses_proxy_response() {
    let (upstream_url, receiver, join_handle) = spawn_mock_anthropic_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let responses_request = json!({
        "model": "qwen3.6-plus",
        "instructions": "You are helpful.",
        "input": "Hello from OpenAI Responses client"
    });
    let body = Bytes::from(
        serde_json::to_vec(&responses_request).expect("request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("authorization", "Bearer client-openai-key"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_openai_responses_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a response")
        .into_response()
        .into_parts();
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy response body should be readable")
        .to_bytes();
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock anthropic upstream should receive a request");
    join_handle.abort();
    let _ = join_handle.await;

    let responses: serde_json::Value =
        serde_json::from_slice(&response_bytes).expect("proxy response should be valid json");
    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");

    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(responses["object"], "response");
    assert_eq!(responses["output_text"], "Hello from Anthropic upstream");
    assert_eq!(responses["output"][0]["content"][0]["type"], "output_text");

    // `instructions` becomes the Anthropic system prompt; `input` becomes the first message.
    assert_eq!(upstream_body["system"], "You are helpful.");
    assert_eq!(
        upstream_body["messages"][0]["content"][0]["text"],
        "Hello from OpenAI Responses client"
    );
}
/// A streaming OpenAI Responses request proxied to a mock Anthropic SSE upstream must
/// reach the client as a Responses event stream (`response.created`, text deltas,
/// `response.completed`, then `data: [DONE]`), with `stream: true` forwarded upstream.
#[tokio::test]
async fn test_should_transform_anthropic_sse_into_openai_responses_event_stream() {
    let (upstream_url, receiver, join_handle) = spawn_mock_anthropic_streaming_upstream().await;
    let config = ProxyConfig {
        upstream_url,
        upstream_api_key: "upstream-secret".to_string(),
        proxy_api_key: None,
        router: None,
    };
    let request_json = json!({
        "model": "qwen3.6-plus",
        "stream": true,
        "input": "Stream hello"
    });
    let body = Bytes::from(
        serde_json::to_vec(&request_json)
            .expect("streaming Responses request body should serialize"),
    );
    let mut headers = HeaderMap::new();
    for (name, value) in [
        ("content-type", "application/json"),
        ("authorization", "Bearer client-openai-key"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let (parts, response_body) = handle_openai_responses_request(State(config), headers, body)
        .await
        .expect("proxy handler should return a streaming response")
        .into_response()
        .into_parts();
    let content_type = parts
        .headers
        .get(CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::to_string);
    let response_bytes = response_body
        .collect()
        .await
        .expect("proxy streaming response body should be readable")
        .to_bytes();

    // Take the request the mock captured, then shut the mock server down.
    let upstream_request = receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("mock anthropic upstream should receive a request");
    join_handle.abort();
    let _ = join_handle.await;

    let response_text = String::from_utf8(response_bytes.to_vec())
        .expect("Responses SSE response should be valid utf-8");
    let upstream_body: serde_json::Value = serde_json::from_slice(&upstream_request.body)
        .expect("upstream request body should be valid json");
    let sse_has = |needle: &str| response_text.contains(needle);

    assert_eq!(parts.status, StatusCode::OK);
    assert_eq!(content_type.as_deref(), Some("text/event-stream"));
    assert!(sse_has("\"type\":\"response.created\""));
    assert!(sse_has("\"type\":\"response.output_text.delta\""));
    assert!(sse_has("\"type\":\"response.completed\""));
    assert!(sse_has("data: [DONE]"));
    assert_eq!(upstream_body["stream"], serde_json::Value::Bool(true));
}
/// Table of HTTP status codes and the Anthropic `error.type` each must map to.
#[test]
fn test_should_map_http_status_to_anthropic_error_type() {
    let cases = [
        (StatusCode::BAD_REQUEST, "invalid_request_error"),
        (StatusCode::UNAUTHORIZED, "authentication_error"),
        (StatusCode::GATEWAY_TIMEOUT, "timeout_error"),
        (StatusCode::TOO_MANY_REQUESTS, "rate_limit_error"),
        // All 5xx codes other than 504 collapse to the generic `api_error`.
        (StatusCode::INTERNAL_SERVER_ERROR, "api_error"),
        (StatusCode::BAD_GATEWAY, "api_error"),
        (StatusCode::SERVICE_UNAVAILABLE, "api_error"),
    ];
    for (status, expected) in cases {
        assert_eq!(map_http_status_to_anthropic_error_type(status), expected);
    }
}
/// The Anthropic error envelope is `{"type": "error", "error": {type, message}}`
/// and is returned together with the requested status code.
#[test]
fn test_should_build_anthropic_error_response() {
    let (code, Json(payload)) =
        build_anthropic_error_response(StatusCode::UNAUTHORIZED, "invalid API key");
    let error = &payload["error"];

    assert_eq!(code, StatusCode::UNAUTHORIZED);
    assert_eq!(payload["type"], "error");
    assert_eq!(error["type"], "authentication_error");
    assert_eq!(error["message"], "invalid API key");
}
/// The OpenAI error envelope is `{"error": {type, message, code}}` with a null
/// `code`, returned together with the requested status code.
#[test]
fn test_should_build_openai_error_response() {
    let (code, Json(payload)) =
        build_openai_error_response(StatusCode::UNAUTHORIZED, "invalid API key");
    let error = &payload["error"];

    assert_eq!(code, StatusCode::UNAUTHORIZED);
    assert_eq!(error["type"], "authentication_error");
    assert_eq!(error["message"], "invalid API key");
    assert!(error["code"].is_null());
}
/// An OpenAI-shaped upstream error body keeps its `type` and `message` when it is
/// rewrapped into the Anthropic error envelope.
#[test]
fn test_should_transform_openai_error_body_to_anthropic() {
    let openai_error = Bytes::from(
        json!({
            "error": {
                "code": "invalid_parameter_error",
                "message": "The content field is a required field.",
                "type": "invalid_request_error"
            }
        })
        .to_string(),
    );

    let converted =
        transform_upstream_error_body_to_anthropic(&openai_error, StatusCode::BAD_REQUEST);
    let parsed: serde_json::Value = serde_json::from_slice(&converted).unwrap();
    let error = &parsed["error"];

    assert_eq!(parsed["type"], "error");
    assert_eq!(error["type"], "invalid_request_error");
    assert_eq!(error["message"], "The content field is a required field.");
}
/// A non-JSON upstream error body becomes the message of an Anthropic error whose
/// type is derived from the status (502 -> `api_error`).
#[test]
fn test_should_transform_non_json_error_body_to_anthropic() {
    let plain_text_error = Bytes::from_static(b"something went wrong");

    let converted =
        transform_upstream_error_body_to_anthropic(&plain_text_error, StatusCode::BAD_GATEWAY);
    let parsed: serde_json::Value = serde_json::from_slice(&converted).unwrap();
    let error = &parsed["error"];

    assert_eq!(parsed["type"], "error");
    assert_eq!(error["type"], "api_error");
    assert_eq!(error["message"], "something went wrong");
}
/// An Anthropic-shaped upstream error body is rewrapped into the OpenAI error
/// envelope, keeping `type` and `message` and leaving `code` null.
#[test]
fn test_should_transform_anthropic_error_body_to_openai() {
    let anthropic_error = Bytes::from(
        json!({
            "type": "error",
            "error": {
                "type": "invalid_request_error",
                "message": "max_tokens is required"
            }
        })
        .to_string(),
    );

    let converted =
        transform_upstream_error_body_to_openai(&anthropic_error, StatusCode::BAD_REQUEST);
    let parsed: serde_json::Value = serde_json::from_slice(&converted).unwrap();
    let error = &parsed["error"];

    assert_eq!(error["type"], "invalid_request_error");
    assert_eq!(error["message"], "max_tokens is required");
    assert!(error["code"].is_null());
}
/// Upstream Anthropic headers must carry the proxy's own `x-api-key` (not the
/// client-derived one), drop the client's `authorization`, request SSE when streaming,
/// and forward the client's `user-agent`.
#[test]
fn test_should_build_anthropic_upstream_headers_without_authorization() {
    // Builds an owned header map from borrowed name/value pairs.
    fn owned_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(name, value)| ((*name).to_string(), (*value).to_string()))
            .collect()
    }

    let client_headers = owned_map(&[
        ("authorization", "Bearer client-key"),
        ("content-type", "application/json"),
        ("user-agent", "test-client"),
    ]);
    let transformed_headers = owned_map(&[
        ("x-api-key", "client-derived-key"),
        ("content-type", "application/json"),
    ]);

    let upstream = build_anthropic_upstream_headers(
        &client_headers,
        &transformed_headers,
        "upstream-secret",
        true,
    );

    assert_eq!(
        upstream.get("x-api-key").map(String::as_str),
        Some("upstream-secret")
    );
    assert!(upstream.get("authorization").is_none());
    assert_eq!(
        upstream.get("accept").map(String::as_str),
        Some("text/event-stream")
    );
    assert_eq!(
        upstream.get("user-agent").map(String::as_str),
        Some("test-client")
    );
}
/// Router-test fixture: the primary upstream at `127.0.0.1:8001` with key `pk-primary`.
fn make_primary_target() -> UpstreamTarget {
    let (name, url, api_key) = ("primary", "http://127.0.0.1:8001/v1", "pk-primary");
    UpstreamTarget {
        name: name.into(),
        url: url.into(),
        api_key: api_key.into(),
    }
}
/// Router-test fixture: the backup upstream at `127.0.0.1:8002` with key `pk-backup`.
fn make_backup_target() -> UpstreamTarget {
    let (name, url, api_key) = ("backup", "http://127.0.0.1:8002/v1", "pk-backup");
    UpstreamTarget {
        name: name.into(),
        url: url.into(),
        api_key: api_key.into(),
    }
}
/// Without a backup, a fresh router is on the primary route and resolves to it.
#[test]
fn test_router_defaults_to_primary() {
    let expected = make_primary_target();
    let router = UpstreamRouter::new(expected.clone(), None);

    assert_eq!(router.active, ActiveRoute::Primary);
    let UpstreamTarget { url, api_key, .. } = router.active_target();
    assert_eq!(url, &expected.url);
    assert_eq!(api_key, &expected.api_key);
}
/// Configuring a backup does not change the initial route: primary is still active.
#[test]
fn test_router_with_backup_defaults_to_primary() {
    let expected = make_primary_target();
    let router = UpstreamRouter::new(expected.clone(), Some(make_backup_target()));

    assert_eq!(router.active, ActiveRoute::Primary);
    assert_eq!(router.active_target().url, expected.url);
}
/// A 429 from the primary (with a backup configured) switches the active route to
/// the backup, marks the primary unhealthy, and makes `active_target` resolve to the
/// backup's URL and key.
#[test]
fn test_router_fails_over_to_backup_on_429() {
    let backup = make_backup_target();
    let mut router = UpstreamRouter::new(make_primary_target(), Some(backup.clone()));

    router.record_response_status(StatusCode::TOO_MANY_REQUESTS);

    assert_eq!(router.active, ActiveRoute::Backup);
    // `record_response_status` also clears `primary_healthy` when it fails over;
    // pin that state change so a regression in either half is caught.
    assert!(!router.primary_healthy);
    let target = router.active_target();
    assert_eq!(target.url, backup.url);
    assert_eq!(target.api_key, backup.api_key);
}
/// With no backup to fail over to, a 429 leaves the router on the primary route.
#[test]
fn test_router_no_failover_without_backup() {
    let mut router = UpstreamRouter::new(make_primary_target(), None);
    router.record_response_status(StatusCode::TOO_MANY_REQUESTS);
    assert_eq!(router.active, ActiveRoute::Primary);
}
/// Only 429 triggers failover: a 500 from the primary leaves both the active route
/// and the primary's health flag untouched.
#[test]
fn test_router_non_429_does_not_trigger_failover() {
    let mut router = UpstreamRouter::new(make_primary_target(), Some(make_backup_target()));

    router.record_response_status(StatusCode::INTERNAL_SERVER_ERROR);

    assert_eq!(router.active, ActiveRoute::Primary);
    // With a backup configured the router starts with `primary_healthy == true`;
    // a non-429 status must not flip it.
    assert!(router.primary_healthy);
}
/// After failing over on a 429, a successful primary health check routes traffic
/// back to the primary and marks it healthy again.
#[test]
fn test_router_fails_back_to_primary_on_health_recovery() {
    let mut router = UpstreamRouter::new(make_primary_target(), Some(make_backup_target()));

    router.record_response_status(StatusCode::TOO_MANY_REQUESTS);
    assert_eq!(router.active, ActiveRoute::Backup);

    router.mark_primary_healthy();
    assert_eq!(router.active, ActiveRoute::Primary);
    assert!(router.primary_healthy);
}
/// After failing over on a 429, a failed primary health check keeps traffic on the
/// backup and the primary flagged unhealthy.
#[test]
fn test_router_health_check_failure_keeps_on_backup() {
    let mut router = UpstreamRouter::new(make_primary_target(), Some(make_backup_target()));

    router.record_response_status(StatusCode::TOO_MANY_REQUESTS);
    assert_eq!(router.active, ActiveRoute::Backup);

    router.mark_primary_unhealthy();
    assert_eq!(router.active, ActiveRoute::Backup);
    assert!(!router.primary_healthy);
}
/// Once on the backup, a further 429 does not bounce the route anywhere else.
#[test]
fn test_router_already_on_backup_does_not_switch_again_on_429() {
    let mut router = UpstreamRouter::new(make_primary_target(), Some(make_backup_target()));

    for _ in 0..2 {
        router.record_response_status(StatusCode::TOO_MANY_REQUESTS);
        assert_eq!(router.active, ActiveRoute::Backup);
    }
}
#[tokio::test]
async fn test_should_passthrough_to_anthropic_upstream_without_transform() {
let (upstream_url, receiver, join_handle) = spawn_mock_anthropic_upstream().await;
let config = ProxyConfig {
upstream_url: upstream_url.clone(),
upstream_api_key: "upstream-secret".to_string(),
proxy_api_key: None,
router: None,
};
let anthropic_body = Bytes::from(
serde_json::to_vec(&json!({
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello"}]
}))
.unwrap(),
);
let mut headers = HeaderMap::new();
headers.insert("x-api-key", HeaderValue::from_str("test-key").unwrap());
headers.insert(
"content-type",
HeaderValue::from_str("application/json").unwrap(),
);
let active = UpstreamTarget {
name: "primary".to_string(),
url: upstream_url,
api_key: "upstream-secret".to_string(),
};
let response =
handle_anthropic_passthrough(42, &active, &headers, anthropic_body.clone(), &config)
.await;
let captured = receiver
.recv_timeout(Duration::from_secs(5))
.expect("upstream should receive a request");
let received_body: serde_json::Value =
serde_json::from_slice(&captured.body).expect("body should be valid JSON");
assert_eq!(
received_body["max_tokens"], 1024,
"body should keep max_tokens (Anthropic field)"
);
assert!(
received_body.get("messages").is_some(),
"body should have messages array"
);
assert_eq!(
captured.headers.get("x-api-key"),
Some(&"upstream-secret".to_string()),
"upstream should receive x-api-key header"
);
let (_, resp_body) = response.into_response().into_parts();
let resp_bytes = resp_body
.collect()
.await
.expect("response body should be readable")
.aggregate();
let resp_json: serde_json::Value =
serde_json::from_reader(resp_bytes.reader()).expect("response should be valid JSON");
assert_eq!(
resp_json["type"], "message",
"response should be in Anthropic format"
);
assert_eq!(resp_json["role"], "assistant");
join_handle.abort();
}
}