use crate::admin::state::{RuntimeConfig, SharedState};
use crate::backend::BackendClient;
use crate::metrics::Metrics;
use anyllm_translate::{anthropic, mapping};
use axum::{
extract::{rejection::JsonRejection, FromRequest},
http::StatusCode,
response::{IntoResponse, Json, Response},
};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::Semaphore;
/// JSON body extractor whose rejections are reported as Anthropic-style errors.
///
/// Wraps the deserialized value in field `.0`. On failure it produces a `400 Bad Request`
/// carrying an `InvalidRequestError` body instead of axum's default rejection response.
pub(crate) struct AnthropicJson<T>(pub T);
/// Delegates to axum's `Json` extractor and rewrites any rejection as an Anthropic error.
///
/// Every rejection becomes `400 Bad Request` with an `InvalidRequestError` body whose message
/// is the rejection's `body_text()`.
impl<S, T> FromRequest<S> for AnthropicJson<T>
where
    Json<T>: FromRequest<S, Rejection = JsonRejection>,
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(
        req: axum::extract::Request,
        state: &S,
    ) -> Result<Self, Self::Rejection> {
        let extracted = Json::<T>::from_request(req, state).await;
        extracted
            .map(|Json(payload)| AnthropicJson(payload))
            .map_err(|rejection| {
                let message = rejection.body_text();
                let body = mapping::errors_map::create_anthropic_error(
                    anthropic::ErrorType::InvalidRequestError,
                    message,
                    None,
                );
                (StatusCode::BAD_REQUEST, Json(body)).into_response()
            })
    }
}
/// Outcome of mapping a client-supplied model name to an upstream target.
pub(crate) enum ResolvedModel {
    /// The model router selected a deployment for this model.
    Routed {
        /// Name of the backend that should serve the request.
        backend_name: String,
        /// Model name to send upstream (the router's actual model name).
        model: String,
        /// The deployment chosen by the router.
        deployment: Arc<crate::config::model_router::Deployment>,
    },
    /// The router knows the model, but none of its deployments could take the request
    /// (reported to clients as every deployment being at its RPM limit).
    AllAtLimit,
    /// A router is configured, but it has no entry for this model.
    UnknownModel,
    /// No router is configured; holds the name after the legacy per-backend mapping.
    Legacy(String),
}
/// The components needed to run tool calls, grouped so they can be shared as one unit.
///
/// `AppState::tool_engine` optionally holds this behind an `Arc`.
#[derive(Clone)]
pub struct ToolEngineState {
    /// Registry of the tools that can be invoked.
    pub registry: Arc<crate::tools::ToolRegistry>,
    /// Policy applied when executing tools.
    pub policy: Arc<crate::tools::ToolExecutionPolicy>,
    /// Settings for the tool-call loop (presumably iteration limits and similar — confirm).
    pub loop_config: crate::tools::LoopConfig,
    /// Optional manager for MCP servers.
    pub mcp_manager: Option<Arc<crate::tools::McpServerManager>>,
}
/// Request-handling state for one upstream backend.
///
/// Cloned freely. When the model router targets another backend, a clone with a different
/// `backend` client and `backend_name` may be used for that request.
#[derive(Clone)]
pub struct AppState {
    /// Client for the upstream backend this state targets.
    pub backend: BackendClient,
    /// Metrics handle for this state.
    pub metrics: Metrics,
    /// Mutable runtime settings; read for `model_mappings` and `log_bodies`.
    pub runtime_config: Arc<RwLock<RuntimeConfig>>,
    /// Admin-side shared state; its `managed_backends` registry is consulted when routing.
    pub shared: Option<SharedState>,
    /// Name of this state's backend; key into `model_mappings` and the backend maps.
    pub backend_name: String,
    /// Concurrency semaphore — presumably caps in-flight upstream requests (confirm).
    pub concurrency: Arc<Semaphore>,
    /// Presumably drops `stream_options` from upstream streaming requests (confirm).
    pub omit_stream_options: bool,
    /// Streaming timeout, in seconds per the field name.
    pub stream_timeout_secs: u64,
    /// Presumably controls whether degradation warnings are exposed to clients (confirm).
    pub expose_degradation_warnings: bool,
    /// Optional in-memory cache.
    pub cache: Option<Arc<crate::cache::memory::MemoryCache>>,
    /// Optional model router; when present it takes precedence over legacy model mappings.
    pub model_router: Option<Arc<RwLock<crate::config::model_router::ModelRouter>>>,
    /// Per-backend states keyed by backend name; checked first for a routed backend.
    pub all_backends: Option<Arc<HashMap<String, AppState>>>,
    /// Optional tool-execution engine.
    pub tool_engine: Option<Arc<ToolEngineState>>,
    /// Optional batch engine backed by SQLite job and webhook queues.
    pub batch_engine: Option<
        Arc<
            anyllm_batch_engine::BatchEngine<
                anyllm_batch_engine::queue::sqlite::SqliteQueue,
                anyllm_batch_engine::webhook::sqlite::SqliteWebhookQueue,
            >,
        >,
    >,
}
impl AppState {
pub(crate) fn map_model(&self, model: &str) -> String {
let config = self
.runtime_config
.read()
.unwrap_or_else(|e| e.into_inner());
if let Some(mapping) = config.model_mappings.get(&self.backend_name) {
mapping.map_model(model)
} else {
model.to_string()
}
}
pub(crate) fn resolve_model(&self, model: &str) -> ResolvedModel {
if let Some(ref router_lock) = self.model_router {
let router = router_lock.read().unwrap_or_else(|e| e.into_inner());
if let Some(routed) = router.route(model) {
return ResolvedModel::Routed {
backend_name: routed.backend_name.to_string(),
model: routed.actual_model.to_string(),
deployment: routed.deployment.clone(),
};
}
if router.has_model(model) {
return ResolvedModel::AllAtLimit;
}
return ResolvedModel::UnknownModel;
}
ResolvedModel::Legacy(self.map_model(model))
}
#[allow(clippy::result_large_err)]
pub(crate) fn resolve_model_and_state(
&self,
model: &str,
) -> Result<
(
String,
AppState,
Option<Arc<crate::config::model_router::Deployment>>,
),
Response,
> {
match self.resolve_model(model) {
ResolvedModel::Routed {
backend_name,
model: mapped,
deployment,
} => {
let effective = self
.all_backends
.as_ref()
.and_then(|m| m.get(&backend_name))
.cloned()
.or_else(|| {
self.shared.as_ref().and_then(|s| {
let guard = s.managed_backends
.read()
.ok()
.or_else(|| {
tracing::warn!("managed_backends RwLock is poisoned; skipping managed backend lookup");
None
})?;
guard.get(&backend_name).map(|(_, client)| {
let mut state = self.clone();
state.backend = client.clone();
state.backend_name = backend_name.clone();
state
})
})
})
.unwrap_or_else(|| self.clone());
Ok((mapped, effective, Some(deployment)))
}
ResolvedModel::AllAtLimit => {
let err = mapping::errors_map::create_anthropic_error(
anthropic::ErrorType::RateLimitError,
"all deployments for this model are at their RPM limit".to_string(),
None,
);
Err((StatusCode::TOO_MANY_REQUESTS, Json(err)).into_response())
}
ResolvedModel::UnknownModel => {
let err = mapping::errors_map::create_anthropic_error(
anthropic::ErrorType::InvalidRequestError,
format!("model '{model}' is not configured in model_list"),
None,
);
Err((StatusCode::BAD_REQUEST, Json(err)).into_response())
}
ResolvedModel::Legacy(mapped) => Ok((mapped, self.clone(), None)),
}
}
pub(crate) fn log_bodies(&self) -> bool {
self.runtime_config
.read()
.unwrap_or_else(|e| e.into_inner())
.log_bodies
}
}
/// State that spans every backend rather than a single one.
#[derive(Clone)]
pub(crate) struct GlobalState {
    /// Metrics handles keyed by (presumably) backend name.
    pub(crate) backend_metrics: Arc<HashMap<String, Metrics>>,
}
/// A held permit from a concurrency semaphore (presumably `AppState::concurrency` — confirm).
///
/// The permit sits behind an `Arc` so the type can be `Clone`, presumably so it can travel in
/// request/response extensions (confirm at call sites). The semaphore slot is released only
/// when the last clone is dropped.
#[derive(Clone)]
pub(crate) struct ConcurrencyPermit(
    // Never read: the field exists only for its `Drop`, which returns the slot to the
    // semaphore, so the dead-code lint is a false positive here.
    #[allow(dead_code)] pub(crate) Arc<tokio::sync::OwnedSemaphorePermit>,
);