use crate::admin::state::{AdminEvent, RequestLogEntry, RuntimeConfig, SharedState};
use crate::backend::{BackendClient, BackendError};
use crate::cache::{self, CacheBackend, CacheEntry, CacheNamespace};
use crate::config::{Config, MultiConfig};
use crate::metrics::Metrics;
use crate::server::state::{
AnthropicJson, AppState, ConcurrencyPermit, GlobalState, ToolEngineState,
};
use anyllm_translate::{anthropic, compute_request_warnings, mapping, openai};
use axum::{
extract::{DefaultBodyLimit, State},
http::StatusCode,
response::{IntoResponse, Json, Response},
routing::{get, post},
Router,
};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::Semaphore;
use super::passthrough::anthropic_passthrough;
use super::streaming::messages_stream;
use super::token_counting::count_tokens;
use crate::batch::anthropic_batch;
/// Builds the proxy router for a single-backend [`Config`].
///
/// The config is lifted into a [`MultiConfig`] and then routed exactly like a
/// multi-backend setup via [`app_multi`].
pub fn app(config: Config) -> Router {
    app_multi(MultiConfig::from_single_config(&config))
}
/// Builds the proxy router for a [`MultiConfig`] with no admin state, model router,
/// tool engine, batch engine, or admin-port redirect.
pub fn app_multi(config: MultiConfig) -> Router {
    let (shared, model_router, tool_engine, batch_engine, admin_port) =
        (None, None, None, None, None);
    app_multi_with_shared(
        config,
        shared,
        model_router,
        tool_engine,
        batch_engine,
        admin_port,
    )
}
pub fn app_multi_with_shared(
config: MultiConfig,
shared: Option<SharedState>,
model_router: Option<Arc<RwLock<crate::config::model_router::ModelRouter>>>,
tool_engine: Option<Arc<ToolEngineState>>,
batch_engine: Option<
Arc<
anyllm_batch_engine::BatchEngine<
anyllm_batch_engine::queue::sqlite::SqliteQueue,
anyllm_batch_engine::webhook::sqlite::SqliteWebhookQueue,
>,
>,
>,
admin_port: Option<u16>,
) -> Router {
let mut backend_metrics: HashMap<String, Metrics> = HashMap::new();
let mut router = Router::new();
let runtime_config: Arc<RwLock<RuntimeConfig>> = if let Some(ref s) = shared {
s.runtime_config.clone()
} else {
let mut model_mappings = indexmap::IndexMap::new();
for (name, bc) in &config.backends {
model_mappings.insert(name.clone(), bc.model_mapping.clone());
}
Arc::new(RwLock::new(RuntimeConfig {
model_mappings,
log_level: "info".to_string(),
log_bodies: config.log_bodies,
}))
};
let cache_config = crate::cache::CacheConfig::from_env();
let response_cache = Arc::new(crate::cache::memory::MemoryCache::new(&cache_config));
let mut backend_states: HashMap<String, (AppState, HandlerMode)> = HashMap::new();
for (name, bc) in &config.backends {
let metrics = Metrics::new();
backend_metrics.insert(name.clone(), metrics.clone());
let backend = BackendClient::from_backend_config(bc);
let mode = match &backend {
BackendClient::GeminiNative(_) => HandlerMode::GeminiNative,
BackendClient::Anthropic(_) => HandlerMode::Anthropic,
BackendClient::Bedrock(_) => HandlerMode::Bedrock,
_ => HandlerMode::Translate,
};
let state = AppState {
backend,
metrics,
runtime_config: runtime_config.clone(),
shared: shared.clone(),
backend_name: name.clone(),
concurrency: Arc::new(Semaphore::new(super::middleware::MAX_CONCURRENT_REQUESTS)),
omit_stream_options: bc.omit_stream_options,
stream_timeout_secs: bc.stream_timeout_secs,
expose_degradation_warnings: config.expose_degradation_warnings,
cache: Some(response_cache.clone()),
model_router: model_router.clone(),
all_backends: None,
tool_engine: tool_engine.clone(),
batch_engine: batch_engine.clone(),
};
let sub = backend_router(state.clone(), mode);
backend_states.insert(name.clone(), (state, mode));
router = router.nest(&format!("/{name}"), sub);
}
if model_router.is_some() {
let all_map: Arc<HashMap<String, AppState>> = Arc::new(
backend_states
.iter()
.map(|(k, (s, _))| (k.clone(), s.clone()))
.collect(),
);
for (_, (state, _)) in backend_states.iter_mut() {
state.all_backends = Some(all_map.clone());
}
}
if let Some((default_state, mode)) = backend_states.get(&config.default_backend) {
let default_sub = backend_router(default_state.clone(), *mode);
router = router.merge(default_sub);
}
let global_state = GlobalState {
backend_metrics: Arc::new(backend_metrics),
};
let metrics_route = Router::new()
.route(
"/metrics",
get(|State(gs): State<GlobalState>| async move {
let mut backends = serde_json::Map::new();
let mut total_requests: u64 = 0;
let mut total_success: u64 = 0;
let mut total_error: u64 = 0;
for (name, m) in gs.backend_metrics.iter() {
let snap = m.snapshot();
total_requests += snap.requests_total;
total_success += snap.requests_success;
total_error += snap.requests_error;
backends.insert(
name.clone(),
serde_json::to_value(&snap).unwrap_or_default(),
);
}
Json(serde_json::json!({
"backends": backends,
"total": {
"requests_total": total_requests,
"requests_success": total_success,
"requests_error": total_error,
}
}))
}),
)
.layer(axum::middleware::from_fn(super::middleware::validate_auth));
let mut final_router = Router::new()
.route("/health", get(health))
.merge(metrics_route)
.merge(router)
.fallback(fallback_not_found)
.layer(axum::middleware::from_fn(super::middleware::add_request_id));
if super::middleware::ip_allowlist_active() {
final_router = final_router.layer(axum::middleware::from_fn(
super::middleware::check_ip_allowlist,
));
tracing::info!("IP allowlist middleware enabled");
}
let final_router = final_router.with_state(global_state);
if let Some(port) = admin_port {
Router::new()
.route(
"/",
get(move |headers: axum::http::HeaderMap| async move {
let host = headers
.get("host")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split(':').next())
.unwrap_or("localhost")
.to_owned();
axum::response::Redirect::temporary(&format!("http://{}:{}/admin/", host, port))
}),
)
.merge(final_router)
} else {
final_router
}
}
/// Fallback for any path no route matched: an Anthropic-format `not_found_error` with 404.
async fn fallback_not_found() -> Response {
    let body = Json(mapping::errors_map::create_anthropic_error(
        anthropic::ErrorType::NotFoundError,
        String::from("Not found"),
        None,
    ));
    (StatusCode::NOT_FOUND, body).into_response()
}
/// Which route set `backend_router` installs for a backend.
///
/// Chosen in `app_multi_with_shared` from the `BackendClient` variant.
#[derive(Clone, Copy, Debug)]
enum HandlerMode {
    /// `BackendClient::Anthropic`: `/v1/messages` goes to `anthropic_passthrough`, other
    /// `/v1/*` paths to the generic Anthropic passthrough.
    Anthropic,
    /// `BackendClient::Bedrock`: `/v1/messages` plus native `/model/{model_id}/...` routes.
    Bedrock,
    /// `BackendClient::GeminiNative`: `/v1/messages` goes to the native Gemini handler.
    GeminiNative,
    /// Every other backend: `/v1/messages` is translated (see `messages`), and the
    /// OpenAI-style endpoints are exposed.
    Translate,
}
/// Builds the route table for one backend, specialised by `mode`, and binds it to `state`.
///
/// All modes share `/v1/models`, `/v1/files`, the `/v1/batches*` routes and the Gemini-style
/// `/v1beta/models/{model_action}` input route. Each mode then adds its own `/v1/messages`
/// handler. Anthropic and Translate also register a `/v1/{*path}` catch-all that presumably
/// forwards anything not matched by a more specific route (static routes take priority
/// over wildcards in axum's router).
///
/// Layering: in axum each `.layer()` wraps everything added before it, so the layer listed
/// last runs first. Request order is therefore:
/// `enforce_concurrency` -> `DefaultBodyLimit` -> `log_anthropic_headers` ->
/// `validate_auth` -> `enforce_route_scope` -> handler.
///
/// NOTE(review): because `enforce_concurrency` is outermost, a concurrency permit is taken
/// before authentication runs, so unauthenticated requests also occupy a slot until they
/// are rejected. Confirm that is intended.
fn backend_router(state: AppState, mode: HandlerMode) -> Router<GlobalState> {
    // Routes registered for every handler mode.
    let common_routes: Router<AppState> = Router::new()
        .route("/v1/models", get(models))
        .route("/v1/files", post(crate::batch::routes::upload_file))
        .route(
            "/v1/batches",
            post(crate::batch::routes::create_batch).get(crate::batch::routes::list_batches),
        )
        .route(
            "/v1/batches/{batch_id}",
            get(crate::batch::routes::get_batch),
        )
        .route(
            "/v1/batches/{batch_id}/cancel",
            post(crate::batch::routes::cancel_batch),
        );
    // Gemini-format input (`/v1beta/models/{model}:{action}` style), merged into every mode.
    let gemini_input_routes: Router<AppState> = Router::new().route(
        "/v1beta/models/{model_action}",
        post(super::gemini_input::gemini_input_handler),
    );
    let api_routes = match mode {
        HandlerMode::Anthropic => common_routes
            .route("/v1/messages", post(anthropic_passthrough))
            .route(
                "/v1/{*path}",
                axum::routing::any(super::passthrough::anthropic_generic_passthrough),
            )
            .merge(gemini_input_routes),
        HandlerMode::Bedrock => common_routes
            .route(
                "/v1/messages",
                post(super::bedrock_passthrough::bedrock_passthrough),
            )
            // Native Bedrock runtime API shapes.
            .route(
                "/model/{model_id}/converse",
                post(super::bedrock_native::bedrock_converse),
            )
            .route(
                "/model/{model_id}/converse-stream",
                post(super::bedrock_native::bedrock_converse_stream),
            )
            .route(
                "/model/{model_id}/invoke",
                post(super::bedrock_native::bedrock_invoke),
            )
            .route(
                "/model/{model_id}/invoke-with-response-stream",
                post(super::bedrock_native::bedrock_invoke_stream),
            )
            .merge(gemini_input_routes),
        HandlerMode::GeminiNative => common_routes
            .route(
                "/v1/messages",
                post(super::gemini_native::gemini_native_handler),
            )
            .merge(gemini_input_routes),
        HandlerMode::Translate => common_routes
            .route("/v1/messages", post(messages))
            .route(
                "/v1/chat/completions",
                post(super::chat_completions::chat_completions),
            )
            .route("/v1/messages/count_tokens", post(count_tokens))
            .route(
                "/v1/messages/batches",
                post(anthropic_batch::create_anthropic_batch),
            )
            .route(
                "/v1/messages/batches/{id}",
                get(anthropic_batch::get_anthropic_batch),
            )
            .route(
                "/v1/messages/batches/{id}/results",
                get(anthropic_batch::get_anthropic_batch_results),
            )
            // Forwarded verbatim to the backend via `passthrough_to_backend`.
            .route("/v1/embeddings", post(embeddings))
            .route(
                "/v1/audio/transcriptions",
                post(super::audio::audio_transcriptions),
            )
            .route("/v1/audio/speech", post(super::audio::audio_speech))
            .route(
                "/v1/images/generations",
                post(super::images::image_generations),
            )
            .route("/v1/rerank", post(rerank))
            .route("/v2/rerank", post(v2_rerank))
            .route("/v1/completions", post(completions))
            .route(
                "/v1/{*path}",
                axum::routing::any(super::generic_passthrough::v1_generic_passthrough),
            )
            .merge(gemini_input_routes),
    };
    // Innermost first: each later `.layer()` wraps the ones above it (see doc comment).
    api_routes
        // Presumably reads the `VirtualKeyContext` that `validate_auth` inserts — verify.
        .layer(axum::middleware::from_fn_with_state(
            state.clone(),
            enforce_route_scope,
        ))
        .layer(axum::middleware::from_fn(super::middleware::validate_auth))
        .layer(axum::middleware::from_fn(
            super::middleware::log_anthropic_headers,
        ))
        // Sets the body-size limit used by body extractors in the handlers.
        .layer(DefaultBodyLimit::max(super::middleware::MAX_BODY_SIZE))
        // Outermost: per-backend concurrency cap (429 when saturated).
        .layer(axum::middleware::from_fn_with_state(
            state.clone(),
            enforce_concurrency,
        ))
        .with_state(state)
}
async fn enforce_concurrency(
State(state): State<AppState>,
mut request: axum::extract::Request,
next: axum::middleware::Next,
) -> Response {
let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
let err = mapping::errors_map::create_anthropic_error(
anthropic::ErrorType::RateLimitError,
"Proxy concurrency limit reached".to_string(),
None,
);
return (StatusCode::TOO_MANY_REQUESTS, Json(err)).into_response();
};
request
.extensions_mut()
.insert(ConcurrencyPermit(Arc::new(permit)));
next.run(request).await
}
/// Middleware that enforces a virtual key's route allow-list for this backend.
///
/// Only requests whose `VirtualKeyContext` carries `allowed_routes` are checked, against
/// `state.backend_name`. Everything else passes straight through. A failed check
/// short-circuits with a 403 `permission_error`.
async fn enforce_route_scope(
    State(state): State<AppState>,
    request: axum::extract::Request,
    next: axum::middleware::Next,
) -> Response {
    let allowed_routes = request
        .extensions()
        .get::<super::middleware::VirtualKeyContext>()
        .and_then(|ctx| ctx.allowed_routes.clone());

    if allowed_routes.is_none() {
        // Not route-restricted: nothing to enforce.
        return next.run(request).await;
    }

    if let Err(denied) =
        super::policy::enforce_route_scope(&state.backend_name, &state.shared, &allowed_routes)
            .await
    {
        return route_scope_forbidden_response(denied);
    }
    next.run(request).await
}
/// Renders a route-scope denial as a 403 Anthropic `permission_error`, carrying the
/// policy's own message.
fn route_scope_forbidden_response(error: super::policy::RouteScopeError) -> Response {
    let message = error.message().to_string();
    let body = mapping::errors_map::create_anthropic_error(
        anthropic::ErrorType::PermissionError,
        message,
        None,
    );
    (StatusCode::FORBIDDEN, Json(body)).into_response()
}
/// Claude models always advertised by `GET /v1/models`, in OpenAI list-entry shape.
///
/// For date-suffixed ids (`...-YYYYMMDD`), `created` is the Unix timestamp of that date at
/// 00:00 UTC. The 3.7 Sonnet and Haiku 4.5 snapshots previously carried timestamps one year
/// earlier than their id dates.
///
/// NOTE(review): the undated alias entries (`claude-*-4-5`, `claude-*-4-6`) all use
/// 1715644800 (2024-05-14), which looks like a placeholder rather than a release date.
static STATIC_CLAUDE_MODELS: std::sync::LazyLock<Vec<serde_json::Value>> =
    std::sync::LazyLock::new(|| {
        vec![
            serde_json::json!({"id": "claude-opus-4-6", "object": "model", "created": 1715644800, "owned_by": "anthropic", "display_name": "Claude Opus 4.6"}),
            serde_json::json!({"id": "claude-sonnet-4-6", "object": "model", "created": 1715644800, "owned_by": "anthropic", "display_name": "Claude Sonnet 4.6"}),
            serde_json::json!({"id": "claude-opus-4-5", "object": "model", "created": 1715644800, "owned_by": "anthropic", "display_name": "Claude Opus 4.5"}),
            serde_json::json!({"id": "claude-sonnet-4-5", "object": "model", "created": 1715644800, "owned_by": "anthropic", "display_name": "Claude Sonnet 4.5"}),
            serde_json::json!({"id": "claude-haiku-4-5", "object": "model", "created": 1715644800, "owned_by": "anthropic", "display_name": "Claude Haiku 4.5"}),
            // 2025-10-01T00:00:00Z
            serde_json::json!({"id": "claude-haiku-4-5-20251001", "object": "model", "created": 1759276800, "owned_by": "anthropic", "display_name": "Claude Haiku 4.5 (Oct 2025)"}),
            // 2025-02-19T00:00:00Z
            serde_json::json!({"id": "claude-3-7-sonnet-20250219", "object": "model", "created": 1739923200, "owned_by": "anthropic", "display_name": "Claude 3.7 Sonnet"}),
            serde_json::json!({"id": "claude-3-5-sonnet-20241022", "object": "model", "created": 1729555200, "owned_by": "anthropic", "display_name": "Claude 3.5 Sonnet (Oct 2024)"}),
            serde_json::json!({"id": "claude-3-5-sonnet-20240620", "object": "model", "created": 1718841600, "owned_by": "anthropic", "display_name": "Claude 3.5 Sonnet (Jun 2024)"}),
            serde_json::json!({"id": "claude-3-5-haiku-20241022", "object": "model", "created": 1729555200, "owned_by": "anthropic", "display_name": "Claude 3.5 Haiku"}),
            serde_json::json!({"id": "claude-3-opus-20240229", "object": "model", "created": 1709164800, "owned_by": "anthropic", "display_name": "Claude 3 Opus"}),
            serde_json::json!({"id": "claude-3-haiku-20240307", "object": "model", "created": 1709769600, "owned_by": "anthropic", "display_name": "Claude 3 Haiku"}),
        ]
    });
/// `GET /v1/models`: lists the static Claude catalogue plus any models known to the
/// model router.
///
/// Router models are appended with `created: 0` and `owned_by: "organization"`. Each id is
/// listed at most once. Names already in the static list, and names the router reports
/// more than once, are skipped.
async fn models(State(state): State<AppState>) -> Json<serde_json::Value> {
    let mut data: Vec<serde_json::Value> = STATIC_CLAUDE_MODELS.clone();
    if let Some(ref router_lock) = state.model_router {
        // A poisoned lock is recovered rather than turned into a panic.
        let router = router_lock.read().unwrap_or_else(|e| e.into_inner());
        // Seeded with the static ids. `insert` returns `false` for an id already listed,
        // which also de-duplicates repeated names coming from the router itself.
        let mut listed_ids: std::collections::HashSet<String> = data
            .iter()
            .filter_map(|m| m["id"].as_str().map(|s| s.to_string()))
            .collect();
        for model_name in router.known_models() {
            if listed_ids.insert(model_name.to_string()) {
                data.push(serde_json::json!({
                    "id": model_name,
                    "object": "model",
                    "created": 0,
                    "owned_by": "organization"
                }));
            }
        }
    }
    Json(serde_json::json!({
        "object": "list",
        "data": data,
    }))
}
/// Liveness probe: always answers 200 with a fixed JSON body and never contacts a backend.
async fn health() -> impl IntoResponse {
    const BODY: &str = r#"{"status":"ok"}"#;
    ([("content-type", "application/json")], BODY)
}
/// Converts a [`BackendError`] into an Anthropic-format error response.
///
/// Upstream API errors (those exposing a message and status via `api_error_details`) are
/// mapped to the matching Anthropic error type and that type's HTTP status; an unmappable
/// status becomes 500. Any other failure is logged at `error` level and reported to the
/// client as a generic 500 `api_error` message.
pub(super) fn backend_error_to_response(error: BackendError) -> Response {
    match error.api_error_details() {
        Some((message, status)) => {
            let anthropic_err =
                mapping::errors_map::status_to_anthropic_error(status, &message, None);
            let code = mapping::errors_map::anthropic_error_type_to_status(
                &anthropic_err.error.error_type,
            );
            let http_status =
                StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
            (http_status, Json(anthropic_err)).into_response()
        }
        None => {
            tracing::error!("backend client error: {error}");
            let generic = mapping::errors_map::create_anthropic_error(
                anthropic::ErrorType::ApiError,
                "An internal error occurred while communicating with the upstream service."
                    .to_string(),
                None,
            );
            (StatusCode::INTERNAL_SERVER_ERROR, Json(generic)).into_response()
        }
    }
}
/// Value for the `x-anyllm-cache` header on a freshly computed (non-hit) response:
/// `"bypass"` when caching was disabled for the request, otherwise `"miss"`.
pub(crate) fn cache_header_value(bypass: bool) -> axum::http::HeaderValue {
    let label: &'static str = if bypass { "bypass" } else { "miss" };
    axum::http::HeaderValue::from_static(label)
}
/// Stores `response` (serialized as JSON) in `cache` under `cache_key`, best-effort.
///
/// Does nothing when either the key or the cache is absent, or when serialization fails.
/// The entry is stored with `cache_ttl` when given, otherwise with the cache's
/// `default_ttl_secs`. The entry itself records the caller-supplied `cache_ttl` as-is.
pub(crate) async fn try_cache_response<T: serde::Serialize>(
    cache_key: &Option<String>,
    cache: &Option<Arc<crate::cache::memory::MemoryCache>>,
    cache_ttl: Option<u64>,
    response: &T,
    model: String,
) {
    let (Some(key), Some(store)) = (cache_key.as_ref(), cache.as_ref()) else {
        return;
    };
    let Ok(encoded) = serde_json::to_vec(response) else {
        return;
    };
    let effective_ttl = cache_ttl.unwrap_or(store.default_ttl_secs);
    let entry = CacheEntry {
        response_body: bytes::Bytes::from(encoded),
        model,
        created_at: std::time::Instant::now(),
        ttl_secs: cache_ttl,
    };
    store.put(key, entry, effective_ttl).await;
}
/// Sets `x-anyllm-degradation` from the translation warnings, if there are any.
///
/// Warnings that do not form a valid header value are silently skipped.
pub(crate) fn inject_degradation_header(
    headers: &mut axum::http::HeaderMap,
    warnings: &anyllm_translate::TranslationWarnings,
) {
    let header_value = warnings
        .as_header_value()
        .and_then(|raw| axum::http::HeaderValue::from_str(&raw).ok());
    if let Some(hv) = header_value {
        headers.insert("x-anyllm-degradation", hv);
    }
}
/// Forwards `body` verbatim to `path` on the backend and relays the upstream response.
///
/// The request's `content-type` is forwarded (defaulting to `application/json`). The
/// upstream status, body and headers are copied onto the response. Each upstream header
/// name replaces any default the local response already has (e.g. the content type set by
/// `into_response`). Repeated upstream values for the same name (e.g. several `set-cookie`
/// or `vary` lines) are all preserved rather than collapsed to the last one.
/// Backend failures are converted by `backend_error_to_response`.
///
/// NOTE(review): hop-by-hop / framing headers (e.g. `content-length`,
/// `transfer-encoding`) are copied as-is. Confirm `raw_passthrough` returns headers that
/// still match the body it returns.
async fn passthrough_to_backend(
    state: &AppState,
    headers: &axum::http::HeaderMap,
    body: axum::body::Bytes,
    path: &str,
) -> Response {
    let content_type = headers
        .get(axum::http::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("application/json");
    match state
        .backend
        .raw_passthrough(path, body, content_type)
        .await
    {
        Ok((status, resp_headers, resp_body)) => {
            let mut response = (status, resp_body).into_response();
            let out = response.headers_mut();
            // First occurrence of a name replaces any existing value; later occurrences
            // append, so multi-valued upstream headers survive intact.
            let mut replaced = std::collections::HashSet::new();
            for (name, value) in &resp_headers {
                if replaced.insert(name.clone()) {
                    out.insert(name.clone(), value.clone());
                } else {
                    out.append(name.clone(), value.clone());
                }
            }
            response
        }
        Err(e) => backend_error_to_response(e),
    }
}
/// `POST /v1/embeddings`: forwarded verbatim to the backend's `/v1/embeddings`.
async fn embeddings(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    const UPSTREAM_PATH: &str = "/v1/embeddings";
    passthrough_to_backend(&state, &headers, body, UPSTREAM_PATH).await
}

/// `POST /v1/rerank`: forwarded verbatim to the backend's `/v1/rerank`.
async fn rerank(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    const UPSTREAM_PATH: &str = "/v1/rerank";
    passthrough_to_backend(&state, &headers, body, UPSTREAM_PATH).await
}

/// `POST /v2/rerank`: forwarded verbatim to the backend's `/v2/rerank`.
async fn v2_rerank(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    const UPSTREAM_PATH: &str = "/v2/rerank";
    passthrough_to_backend(&state, &headers, body, UPSTREAM_PATH).await
}

/// `POST /v1/completions` (legacy text completions): forwarded verbatim to the backend.
async fn completions(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    const UPSTREAM_PATH: &str = "/v1/completions";
    passthrough_to_backend(&state, &headers, body, UPSTREAM_PATH).await
}
/// `POST /v1/messages` handler for `HandlerMode::Translate` backends.
///
/// Accepts an Anthropic Messages request and serves it from an OpenAI-compatible
/// (`OpenAI`, `AzureOpenAI`, `Vertex`, `GeminiOpenAI`) or `OpenAIResponses` backend,
/// translating the request and the response. Flow:
/// 1. Record the request metric. For virtual keys, reject disallowed models with 403.
/// 2. Streaming requests (`stream == Some(true)`) resolve the model, re-check route scope
///    against the resolved backend, and delegate to `messages_stream`. They never use the
///    response cache (`x-anyllm-cache: bypass`).
/// 3. Non-streaming requests parse the cache TTL from the body (400 on error) and derive a
///    cache key unless the TTL is `0`. They then resolve the model, re-check route scope,
///    and return a cache hit if one exists.
/// 4. Otherwise the request is dispatched on the resolved backend variant. Usage and cost
///    are recorded, the result is logged and cached, and rate-limit, degradation and cache
///    headers are set.
///
/// NOTE(review): metrics, log entries and the cache lookup use `state` (the backend this
/// route is mounted on) rather than `effective` (the backend the model resolved to).
/// Confirm that is the intended attribution when the model router redirects elsewhere.
/// NOTE(review): early returns (403, 400, resolve failure) and cache hits call
/// `record_request` without a matching `record_success` / `record_error`.
async fn messages(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
    permit: Option<axum::Extension<ConcurrencyPermit>>,
    vk_ctx: Option<axum::Extension<super::middleware::VirtualKeyContext>>,
    AnthropicJson(body): AnthropicJson<anthropic::MessageCreateRequest>,
) -> Response {
    // The permit is only forwarded to the streaming path. The virtual-key context drives
    // model/route policy and usage attribution.
    let permit = permit.map(|axum::Extension(p)| p);
    let vk_ctx = vk_ctx.map(|axum::Extension(c)| c);
    let ctx = RequestCtx {
        // Presumably set by the `add_request_id` middleware; "unknown" if missing.
        request_id: headers
            .get("x-request-id")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("unknown")
            .to_string(),
        start: std::time::Instant::now(),
        model_requested: body.model.clone(),
    };
    state.metrics.record_request();
    // Virtual keys may restrict which models they are allowed to call.
    if let Some(ref ctx) = vk_ctx {
        if !super::policy::is_model_allowed(&body.model, &ctx.allowed_models) {
            let err = mapping::errors_map::create_anthropic_error(
                anthropic::ErrorType::PermissionError,
                format!("Model '{}' is not allowed for this API key.", body.model),
                None,
            );
            return (StatusCode::FORBIDDEN, Json(err)).into_response();
        }
    }
    if state.log_bodies() {
        tracing::debug!(
            model = %body.model,
            stream = ?body.stream,
            message_count = body.messages.len(),
            body = %serde_json::to_string(&body).unwrap_or_else(|_| "[serialization failed]".into()),
            "request body"
        );
    }
    // Computed from the original request so both the streaming and non-streaming paths can
    // report them in `x-anyllm-degradation`.
    let warnings = compute_request_warnings(&body);
    let is_streaming = body.stream == Some(true);
    if is_streaming {
        if state.log_bodies() {
            tracing::debug!(model = %body.model, "streaming request initiated");
        }
        let (mapped_model, effective, deployment) = match state.resolve_model_and_state(&body.model)
        {
            Ok(v) => v,
            Err(resp) => return resp,
        };
        // The route-scope middleware checked the mounted backend. Re-check against the
        // backend the model actually resolved to.
        if let Some(ref ctx) = vk_ctx {
            if let Err(error) = super::policy::enforce_route_scope(
                &effective.backend_name,
                &effective.shared,
                &ctx.allowed_routes,
            )
            .await
            {
                return route_scope_forbidden_response(error);
            }
        }
        if let Some(ref d) = deployment {
            d.record_start();
        }
        let stream_start = std::time::Instant::now();
        match messages_stream(effective, body, ctx, mapped_model, permit, vk_ctx.clone()).await {
            Ok((rate_limits, sse)) => {
                // NOTE(review): this presumably measures time until the stream is
                // established, not until it finishes sending — confirm intended.
                if let Some(ref d) = deployment {
                    d.record_finish(stream_start.elapsed().as_millis() as u64);
                }
                let mut response = sse.into_response();
                rate_limits.inject_anthropic_response_headers(response.headers_mut());
                if state.expose_degradation_warnings {
                    inject_degradation_header(response.headers_mut(), &warnings);
                }
                // Streaming responses are never cached.
                response.headers_mut().insert(
                    "x-anyllm-cache",
                    axum::http::HeaderValue::from_static("bypass"),
                );
                return response;
            }
            Err(e) => {
                if let Some(ref d) = deployment {
                    d.record_finish(stream_start.elapsed().as_millis() as u64);
                }
                return backend_error_to_response(e);
            }
        }
    }
    // Non-streaming: cache settings are read from the request serialized as JSON.
    let body_value = serde_json::to_value(&body).unwrap_or_default();
    let cache_ttl = match cache::parse_cache_ttl(&body_value) {
        Ok(ttl) => ttl,
        Err(msg) => {
            let err = mapping::errors_map::create_anthropic_error(
                anthropic::ErrorType::InvalidRequestError,
                msg,
                None,
            );
            return (StatusCode::BAD_REQUEST, Json(err)).into_response();
        }
    };
    // A TTL of 0 opts out of caching entirely. With no key, there is neither a lookup
    // below nor a store in `try_cache_response`.
    let bypass_cache = cache_ttl == Some(0);
    let cache_key = if !bypass_cache {
        Some(cache::cache_key_for_request(
            &body_value,
            CacheNamespace::Anthropic,
        ))
    } else {
        None
    };
    let (mapped_model, effective, deployment) = match state.resolve_model_and_state(&body.model) {
        Ok(v) => v,
        Err(resp) => return resp,
    };
    // Same re-check as the streaming path, against the resolved backend.
    if let Some(ref ctx) = vk_ctx {
        if let Err(error) = super::policy::enforce_route_scope(
            &effective.backend_name,
            &effective.shared,
            &ctx.allowed_routes,
        )
        .await
        {
            return route_scope_forbidden_response(error);
        }
    }
    // Cache hit: return the stored bytes directly, with no backend call, no
    // success/error metric and no log entry.
    if let (Some(ref key), Some(ref c)) = (&cache_key, &state.cache) {
        if let Some(entry) = c.get(key).await {
            tracing::debug!(cache_key = %key, "cache hit for /v1/messages");
            let mut response = Response::builder()
                .status(StatusCode::OK)
                .header("content-type", "application/json")
                .header("x-anyllm-cache", "hit")
                .body(axum::body::Body::from(entry.response_body))
                .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response());
            if state.expose_degradation_warnings {
                inject_degradation_header(response.headers_mut(), &warnings);
            }
            return response;
        }
    }
    if let Some(ref d) = deployment {
        d.record_start();
    }
    let backend_start = std::time::Instant::now();
    match &effective.backend {
        // OpenAI Chat Completions-compatible backends.
        BackendClient::OpenAI(client)
        | BackendClient::AzureOpenAI(client)
        | BackendClient::Vertex(client)
        | BackendClient::GeminiOpenAI(client) => {
            let mut openai_req = mapping::message_map::anthropic_to_openai_request(&body);
            // Provider-specific thinking knobs (each helper no-ops for other backends).
            inject_gemini_thinking(&body, &effective.backend, &mut openai_req);
            inject_glm_thinking(&body, &effective.backend, &mut openai_req);
            // Gemini/Vertex get tool parameter schemas passed through the Gemini sanitizer.
            if matches!(
                effective.backend,
                BackendClient::GeminiOpenAI(_) | BackendClient::Vertex(_)
            ) {
                if let Some(tools) = openai_req.tools.take() {
                    openai_req.tools = Some(
                        tools
                            .into_iter()
                            .map(|mut t| {
                                if let Some(params) = t.function.parameters.take() {
                                    t.function.parameters = Some(
                                        mapping::tools_map::sanitize_schema_for_gemini(params),
                                    );
                                }
                                t
                            })
                            .collect(),
                    );
                }
            }
            if effective.omit_stream_options {
                openai_req.stream_options = None;
            }
            openai_req.model = mapped_model.clone();
            let mapped_model = openai_req.model.clone();
            // Responses are reported under the model name the client asked for.
            let original_model = body.model.clone();
            match client.chat_completion(&openai_req).await {
                Ok((openai_resp, _status, rate_limits)) => {
                    if let Some(ref d) = deployment {
                        d.record_finish(backend_start.elapsed().as_millis() as u64);
                    }
                    state.metrics.record_success();
                    let anthropic_resp = mapping::message_map::openai_to_anthropic_response(
                        &openai_resp,
                        &original_model,
                    );
                    // With a tool engine, let it run tool calls. Follow-up turns are
                    // translated and sent to the same client under the same mapped model.
                    let anthropic_resp = if let Some(ref engine) = state.tool_engine {
                        let client_for_tools = client.clone();
                        let model_for_tools = mapped_model.clone();
                        let orig_model_for_tools = original_model.clone();
                        // No server-advertised tool names are supplied from this path.
                        let server_advertised_tool_names = std::collections::HashSet::new();
                        let (resp, trace) = crate::tools::execution::maybe_execute_tools(
                            engine,
                            &body,
                            &server_advertised_tool_names,
                            anthropic_resp,
                            |follow_up_req| {
                                let c = client_for_tools.clone();
                                let m = model_for_tools.clone();
                                let om = orig_model_for_tools.clone();
                                async move {
                                    let mut oai_req =
                                        mapping::message_map::anthropic_to_openai_request(
                                            &follow_up_req,
                                        );
                                    oai_req.model = m;
                                    match c.chat_completion(&oai_req).await {
                                        Ok((resp, _, _)) => {
                                            Ok(mapping::message_map::openai_to_anthropic_response(
                                                &resp, &om,
                                            ))
                                        }
                                        Err(e) => Err(format!("{e}")),
                                    }
                                }
                            },
                        )
                        .await;
                        tracing::debug!(
                            termination_reason = ?trace.termination_reason,
                            iterations = trace.iterations.len(),
                            tool_calls = trace.total_tool_calls(),
                            total_ms = trace.total_duration.as_millis(),
                            "tool execution loop complete"
                        );
                        resp
                    } else {
                        anthropic_resp
                    };
                    if state.log_bodies() {
                        tracing::debug!(
                            body = %serde_json::to_string(&anthropic_resp).unwrap_or_else(|_| "[serialization failed]".into()),
                            "response body"
                        );
                    }
                    // Usage (and hence cost) comes from the final response after any tool loop.
                    let cost = record_virtual_key_usage(
                        &state.shared,
                        &vk_ctx,
                        &mapped_model,
                        anthropic_resp.usage.input_tokens as u64,
                        anthropic_resp.usage.output_tokens as u64,
                    );
                    log_request(
                        &state.shared,
                        ctx.log_entry_with_attribution(
                            &state.backend_name,
                            Some(mapped_model),
                            200,
                            Some((
                                anthropic_resp.usage.input_tokens as u64,
                                anthropic_resp.usage.output_tokens as u64,
                            )),
                            false,
                            None,
                            &vk_ctx,
                            Some(cost),
                        ),
                    );
                    try_cache_response(
                        &cache_key,
                        &state.cache,
                        cache_ttl,
                        &anthropic_resp,
                        original_model,
                    )
                    .await;
                    let cache_hv = cache_header_value(bypass_cache);
                    let mut response = (StatusCode::OK, Json(anthropic_resp)).into_response();
                    rate_limits.inject_anthropic_response_headers(response.headers_mut());
                    if state.expose_degradation_warnings {
                        inject_degradation_header(response.headers_mut(), &warnings);
                    }
                    response.headers_mut().insert("x-anyllm-cache", cache_hv);
                    response
                }
                Err(e) => {
                    if let Some(ref d) = deployment {
                        d.record_finish(backend_start.elapsed().as_millis() as u64);
                    }
                    state.metrics.record_error();
                    let backend_error = BackendError::from(e);
                    let mut entry = ctx.log_entry_with_attribution(
                        &state.backend_name,
                        Some(mapped_model),
                        backend_error.status_code(),
                        None,
                        false,
                        Some(backend_error.to_string()),
                        &vk_ctx,
                        None,
                    );
                    set_backend_error_kind(&mut entry, &backend_error);
                    log_request(&state.shared, entry);
                    backend_error_to_response(backend_error)
                }
            }
        }
        // OpenAI Responses API backend: same bookkeeping, different translation, no tool loop.
        BackendClient::OpenAIResponses(client) => {
            let mut responses_req =
                mapping::responses_message_map::anthropic_to_responses_request(&body);
            responses_req.model = mapped_model.clone();
            let mapped_model = responses_req.model.clone();
            let original_model = body.model.clone();
            match client.responses(&responses_req).await {
                Ok((resp, _status, rate_limits)) => {
                    if let Some(ref d) = deployment {
                        d.record_finish(backend_start.elapsed().as_millis() as u64);
                    }
                    state.metrics.record_success();
                    let anthropic_resp =
                        mapping::responses_message_map::responses_to_anthropic_response(
                            &resp,
                            &original_model,
                        );
                    if state.log_bodies() {
                        tracing::debug!(
                            body = %serde_json::to_string(&anthropic_resp).unwrap_or_else(|_| "[serialization failed]".into()),
                            "response body"
                        );
                    }
                    let cost = record_virtual_key_usage(
                        &state.shared,
                        &vk_ctx,
                        &mapped_model,
                        anthropic_resp.usage.input_tokens as u64,
                        anthropic_resp.usage.output_tokens as u64,
                    );
                    log_request(
                        &state.shared,
                        ctx.log_entry_with_attribution(
                            &state.backend_name,
                            Some(mapped_model),
                            200,
                            Some((
                                anthropic_resp.usage.input_tokens as u64,
                                anthropic_resp.usage.output_tokens as u64,
                            )),
                            false,
                            None,
                            &vk_ctx,
                            Some(cost),
                        ),
                    );
                    try_cache_response(
                        &cache_key,
                        &state.cache,
                        cache_ttl,
                        &anthropic_resp,
                        original_model,
                    )
                    .await;
                    let cache_hv = cache_header_value(bypass_cache);
                    let mut response = (StatusCode::OK, Json(anthropic_resp)).into_response();
                    rate_limits.inject_anthropic_response_headers(response.headers_mut());
                    if state.expose_degradation_warnings {
                        inject_degradation_header(response.headers_mut(), &warnings);
                    }
                    response.headers_mut().insert("x-anyllm-cache", cache_hv);
                    response
                }
                Err(e) => {
                    if let Some(ref d) = deployment {
                        d.record_finish(backend_start.elapsed().as_millis() as u64);
                    }
                    state.metrics.record_error();
                    let backend_error = BackendError::from(e);
                    let mut entry = ctx.log_entry_with_attribution(
                        &state.backend_name,
                        Some(mapped_model),
                        backend_error.status_code(),
                        None,
                        false,
                        Some(backend_error.to_string()),
                        &vk_ctx,
                        None,
                    );
                    set_backend_error_kind(&mut entry, &backend_error);
                    log_request(&state.shared, entry);
                    backend_error_to_response(backend_error)
                }
            }
        }
        // These variants get their own `/v1/messages` handlers in `backend_router`.
        // NOTE(review): `effective` comes from `resolve_model_and_state`, so this 500 is also
        // reachable when the model router resolves a Translate-mounted request to one of
        // these backends. Confirm whether that should be routed instead.
        BackendClient::Anthropic(_)
        | BackendClient::Bedrock(_)
        | BackendClient::GeminiNative(_) => {
            let err = mapping::errors_map::create_anthropic_error(
                anthropic::ErrorType::ApiError,
                "This backend does not use the translation handler".to_string(),
                None,
            );
            (StatusCode::INTERNAL_SERVER_ERROR, Json(err)).into_response()
        }
    }
}
/// Per-request bookkeeping captured when a handler starts, used to build log entries.
pub(crate) struct RequestCtx {
    /// Request id copied into every log entry (`messages` fills it from `x-request-id`).
    pub(crate) request_id: String,
    /// Handler start time; `latency_ms` is measured from here.
    pub(crate) start: std::time::Instant,
    /// Model name exactly as the client requested it.
    pub(crate) model_requested: String,
}

impl RequestCtx {
    /// Builds a [`RequestLogEntry`] for this request.
    ///
    /// `latency_ms` is the time elapsed since `start`, and `tokens` is `(input, output)`.
    /// `error_kind`, `key_id` and `cost_usd` are left `None` for callers to fill in.
    pub(crate) fn log_entry(
        &self,
        backend_name: &str,
        model_mapped: Option<String>,
        status_code: u16,
        tokens: Option<(u64, u64)>,
        is_streaming: bool,
        error_message: Option<String>,
    ) -> RequestLogEntry {
        let (input_tokens, output_tokens) = tokens.unzip();
        RequestLogEntry {
            request_id: self.request_id.clone(),
            timestamp: crate::admin::db::now_iso8601(),
            backend: backend_name.to_owned(),
            model_requested: Some(self.model_requested.clone()),
            model_mapped,
            status_code,
            latency_ms: self.start.elapsed().as_millis() as u64,
            input_tokens,
            output_tokens,
            is_streaming,
            error_message,
            error_kind: None,
            key_id: None,
            cost_usd: None,
        }
    }

    /// Like [`RequestCtx::log_entry`], but also attributes the entry to a virtual key
    /// (when present) and records `cost_usd`. Costs that are not strictly positive
    /// (including NaN) are stored as `None`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn log_entry_with_attribution(
        &self,
        backend_name: &str,
        model_mapped: Option<String>,
        status_code: u16,
        tokens: Option<(u64, u64)>,
        is_streaming: bool,
        error_message: Option<String>,
        vk_ctx: &Option<super::middleware::VirtualKeyContext>,
        cost_usd: Option<f64>,
    ) -> RequestLogEntry {
        RequestLogEntry {
            key_id: vk_ctx.as_ref().map(|ctx| ctx.key_id),
            cost_usd: cost_usd.filter(|&c| c > 0.0),
            ..self.log_entry(
                backend_name,
                model_mapped,
                status_code,
                tokens,
                is_streaming,
                error_message,
            )
        }
    }
}
/// For Gemini-over-OpenAI and Vertex backends, forwards an enabled Anthropic thinking
/// budget as `extra.google.thinking_config.thinking_budget`.
///
/// Does nothing for any other backend, or when thinking is not `Enabled`.
pub(crate) fn inject_gemini_thinking(
    body: &anthropic::MessageCreateRequest,
    backend: &BackendClient,
    req: &mut openai::ChatCompletionRequest,
) {
    let is_gemini = matches!(
        backend,
        BackendClient::GeminiOpenAI(_) | BackendClient::Vertex(_)
    );
    let budget = match &body.thinking {
        Some(anthropic::ThinkingConfig::Enabled { budget_tokens }) if is_gemini => budget_tokens,
        _ => return,
    };
    req.extra.insert(
        "google".to_string(),
        serde_json::json!({
            "thinking_config": { "thinking_budget": budget }
        }),
    );
}
/// For OpenAI-compatible backends whose provider id is `"zhipuai"` (GLM), translates an
/// enabled Anthropic thinking config into GLM's `thinking` extra field.
///
/// Any `reasoning_effort` extra is removed first, then
/// `{"type": "enabled", "clear_thinking": false}` is set. Other backends and non-enabled
/// thinking configs are left untouched.
pub(crate) fn inject_glm_thinking(
    body: &anthropic::MessageCreateRequest,
    backend: &BackendClient,
    req: &mut openai::ChatCompletionRequest,
) {
    let BackendClient::OpenAI(client) = backend else {
        return;
    };
    if client.provider_id() != Some("zhipuai") {
        return;
    }
    let thinking_enabled = matches!(
        &body.thinking,
        Some(anthropic::ThinkingConfig::Enabled { .. })
    );
    if !thinking_enabled {
        return;
    }
    req.extra.remove("reasoning_effort");
    req.extra.insert(
        "thinking".to_string(),
        serde_json::json!({"type": "enabled", "clear_thinking": false}),
    );
}
/// Adds `output_tokens` to the virtual key's tokens-per-minute counter, stamped with
/// the current time in milliseconds. A no-op when the request has no virtual key.
pub(crate) fn record_vk_tpm(
    vk_ctx: &Option<super::middleware::VirtualKeyContext>,
    output_tokens: u32,
) {
    let Some(ctx) = vk_ctx else {
        return;
    };
    let now = crate::admin::keys::now_ms();
    ctx.rate_state.record_tpm(now, output_tokens);
}
/// Records usage for a completed request and returns its cost in USD.
///
/// Output tokens feed the virtual key's TPM counter, saturated to `u32::MAX` because that
/// counter takes a `u32`. Input and output tokens are then passed uncapped to
/// `crate::cost::record_cost`, whose result is returned.
pub(crate) fn record_virtual_key_usage(
    shared: &Option<SharedState>,
    vk_ctx: &Option<super::middleware::VirtualKeyContext>,
    model: &str,
    input_tokens: u64,
    output_tokens: u64,
) -> f64 {
    let tpm_tokens = u32::try_from(output_tokens).unwrap_or(u32::MAX);
    record_vk_tpm(vk_ctx, tpm_tokens);
    crate::cost::record_cost(shared, vk_ctx, model, input_tokens, output_tokens)
}
static CALLBACKS: std::sync::OnceLock<Arc<crate::callbacks::CallbackConfig>> =
std::sync::OnceLock::new();
pub fn set_callbacks(config: Arc<crate::callbacks::CallbackConfig>) {
let _ = CALLBACKS.set(config);
}
pub fn get_callbacks() -> Option<&'static Arc<crate::callbacks::CallbackConfig>> {
CALLBACKS.get()
}
/// Publishes a completed-request log entry to every configured sink.
///
/// Installed callbacks are always notified. When admin state is present, the entry is
/// also sent as an `AdminEvent::RequestCompleted` on `events_tx`, and via `try_send` on
/// `log_tx`. Failures from both sends are ignored (best-effort). NOTE(review): `try_send`
/// presumably drops the entry when the log channel is full — confirm.
pub(crate) fn log_request(shared: &Option<SharedState>, entry: RequestLogEntry) {
    if let Some(callbacks) = CALLBACKS.get() {
        callbacks.notify(&entry);
    }
    let Some(shared) = shared else {
        return;
    };
    let event = AdminEvent::RequestCompleted(entry.clone());
    let _ = shared.events_tx.send(event);
    let _ = shared.log_tx.try_send(entry);
}
/// Tags a log entry with the backend error's kind label (`BackendError::error_kind`).
pub(crate) fn set_backend_error_kind(entry: &mut RequestLogEntry, error: &BackendError) {
    let kind = error.error_kind().to_string();
    entry.error_kind = Some(kind);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The deprecated `claude-3-sonnet-20240229` snapshot must never be advertised by
    /// `/v1/models`.
    #[test]
    fn static_model_list_excludes_deprecated_sonnet() {
        let deprecated_listed = STATIC_CLAUDE_MODELS
            .iter()
            .any(|model| model["id"].as_str() == Some("claude-3-sonnet-20240229"));
        assert!(
            !deprecated_listed,
            "claude-3-sonnet-20240229 is deprecated and must not appear in the model list"
        );
    }
}