use actix_web::{
HttpRequest, HttpResponse, Responder, delete, get, patch, post, put,
web::{self, Data, Json, Path, Query},
};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde_json::json;
use sha256::digest;
use sqlx::postgres::PgPool;
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
api_created, api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};
use crate::config::Config;
use crate::data::admission_events::list_admission_events;
use crate::data::api_keys::{
ApiKeyStoreError, CreateApiKeyParams, SaveApiKeyParams, create_api_key, create_api_key_right,
delete_api_key, delete_api_key_right, delete_client_api_key_config, get_api_key,
get_global_api_key_config, list_api_key_rights, list_api_keys, list_client_api_key_configs,
save_api_key, set_global_api_key_config, update_api_key_right, upsert_client_api_key_config,
};
use crate::data::client_configs::{
delete_athena_client_config_by_name, ensure_athena_client_config_table,
get_athena_client_config_by_name, list_athena_client_configs, upsert_athena_client_config,
};
use crate::data::clients::{
AthenaClientRecord, ClientOperationStatusFilter, SaveAthenaClientParams, delete_athena_client,
get_athena_client_by_name, get_client_statistics, list_athena_clients,
list_client_operation_drilldown, list_client_statistics, list_client_table_statistics,
refresh_client_statistics, set_client_frozen_state, upsert_athena_client,
};
use crate::data::public_routes::{
PatchPublicGatewayRouteParams, SavePublicGatewayRouteParams, create_public_gateway_route,
list_public_gateway_routes, patch_public_gateway_route, soft_delete_public_gateway_route,
};
use crate::data::query_optimization::{
apply_query_optimization_recommendation, list_query_optimization_recommendations,
list_query_optimization_runs, refresh_query_optimization_recommendations,
};
use crate::data::vacuum_health::{
get_latest_vacuum_health_detail, list_latest_vacuum_health_summaries,
};
use crate::drivers::postgresql::sqlx_driver::ClientConnectionTarget;
#[derive(Debug, Deserialize)]
struct CreateApiKeyRequest {
name: String,
#[serde(default)]
description: Option<String>,
#[serde(default)]
client_name: Option<String>,
#[serde(default)]
expires_at: Option<String>,
#[serde(default)]
rights: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct UpdateApiKeyRequest {
#[serde(default)]
name: Option<String>,
#[serde(default)]
description: Option<Option<String>>,
#[serde(default)]
client_name: Option<Option<String>>,
#[serde(default)]
expires_at: Option<Option<String>>,
#[serde(default)]
is_active: Option<bool>,
#[serde(default)]
rights: Option<Vec<String>>,
}
#[derive(Debug, Deserialize)]
struct SaveApiKeyRightRequest {
name: String,
#[serde(default)]
description: Option<String>,
}
#[derive(Debug, Deserialize)]
struct SaveApiKeyEnforcementRequest {
enforce_api_keys: bool,
}
#[derive(Debug, Deserialize)]
struct SaveAthenaClientRequest {
#[serde(default)]
client_name: Option<String>,
#[serde(default)]
description: Option<String>,
#[serde(default)]
pg_uri: Option<String>,
#[serde(default)]
pg_uri_env_var: Option<String>,
#[serde(default)]
is_active: Option<bool>,
}
#[derive(Debug, Deserialize)]
struct FreezeAthenaClientRequest {
is_frozen: bool,
}
#[derive(Debug, Deserialize)]
struct ClientStatisticsDrilldownQuery {
table_name: String,
operation: String,
#[serde(default)]
status: Option<String>,
#[serde(default)]
limit: Option<i64>,
#[serde(default)]
offset: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct AdmissionEventsQuery {
#[serde(default)]
decision: Option<String>,
#[serde(default)]
client: Option<String>,
#[serde(default)]
limit: Option<i64>,
#[serde(default)]
offset: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct QueryOptimizationListQuery {
#[serde(default)]
status: Option<String>,
#[serde(default)]
limit: Option<i64>,
#[serde(default)]
offset: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct QueryOptimizationRunsQuery {
#[serde(default)]
limit: Option<i64>,
#[serde(default)]
offset: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct ApplyQueryOptimizationRequest {
#[serde(default)]
actor: Option<String>,
}
#[derive(Debug, Deserialize)]
struct SaveAthenaClientConfigRequest {
config: serde_json::Value,
#[serde(default)]
metadata: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
struct SavePublicGatewayRouteRequest {
route_key: String,
client_name: String,
allowed_ops: Vec<String>,
#[serde(default = "default_true")]
is_active: bool,
#[serde(default)]
metadata: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
struct PatchPublicGatewayRouteRequest {
#[serde(default)]
client_name: Option<String>,
#[serde(default)]
allowed_ops: Option<Vec<String>>,
#[serde(default)]
is_active: Option<bool>,
#[serde(default)]
metadata: Option<serde_json::Value>,
}
fn default_true() -> bool {
true
}
fn auth_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
let Some(client_name) = state.gateway_auth_client_name.as_ref() else {
return Err(service_unavailable(
"Auth store unavailable",
"No gateway auth client is configured.",
));
};
state.pg_registry.get_pool(client_name).ok_or_else(|| {
service_unavailable(
"Auth store unavailable",
format!("Gateway auth client '{}' is not connected.", client_name),
)
})
}
fn client_catalog_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
let Some(client_name) = state.logging_client_name.as_ref() else {
return Err(service_unavailable(
"Client catalog unavailable",
"No athena_logging client is configured.",
));
};
state.pg_registry.get_pool(client_name).ok_or_else(|| {
service_unavailable(
"Client catalog unavailable",
format!("Logging client '{}' is not connected.", client_name),
)
})
}
fn normalize_client_name(client_name: &str) -> Result<String, HttpResponse> {
let value = client_name.trim();
if value.is_empty() {
Err(bad_request(
"Invalid client name",
"The client name must not be empty.",
))
} else {
Ok(value.to_string())
}
}
fn normalize_route_key(route_key: &str) -> Result<String, HttpResponse> {
let normalized = route_key.trim().to_ascii_lowercase();
if normalized.is_empty() {
return Err(bad_request(
"Invalid route key",
"route_key must not be empty.",
));
}
let valid_chars = normalized
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_' || c == '/');
if !valid_chars {
return Err(bad_request(
"Invalid route key",
"route_key may only contain [a-z0-9-_/].",
));
}
Ok(normalized)
}
fn normalize_allowed_ops(allowed_ops: &[String]) -> Result<Vec<String>, HttpResponse> {
if allowed_ops.is_empty() {
return Err(bad_request(
"Invalid allowed_ops",
"allowed_ops must contain at least one operation.",
));
}
let mut normalized: Vec<String> = Vec::new();
for op in allowed_ops {
let op = op.trim().to_ascii_lowercase();
let is_valid = matches!(
op.as_str(),
"query" | "fetch" | "insert" | "update" | "delete"
);
if !is_valid {
return Err(bad_request(
"Invalid allowed_ops",
format!(
"Unsupported operation '{}'. Allowed values: query, fetch, insert, update, delete.",
op
),
));
}
if !normalized.contains(&op) {
normalized.push(op);
}
}
normalized.sort();
Ok(normalized)
}
async fn ensure_public_route_client_eligible(
pool: &PgPool,
client_name: &str,
) -> Result<(), HttpResponse> {
let Some(client) = get_athena_client_by_name(pool, client_name)
.await
.map_err(|err| database_error_response("Failed to load Athena client", err))?
else {
return Err(bad_request(
"Unknown client",
format!("No Athena client named '{}' was found.", client_name),
));
};
if !client.is_active {
return Err(bad_request(
"Ineligible client",
format!("Client '{}' is not active.", client_name),
));
}
if client.is_frozen {
return Err(bad_request(
"Ineligible client",
format!("Client '{}' is frozen.", client_name),
));
}
Ok(())
}
fn normalize_required_query_value(field_name: &str, value: &str) -> Result<String, HttpResponse> {
let normalized = value.trim();
if normalized.is_empty() {
Err(bad_request(
"Invalid query parameter",
format!("'{}' must not be empty.", field_name),
))
} else {
Ok(normalized.to_string())
}
}
fn normalize_optional_query_value(value: Option<&str>) -> Option<String> {
value
.map(str::trim)
.filter(|v| !v.is_empty())
.map(str::to_string)
}
fn parse_client_statistics_status_filter(
status: Option<&str>,
) -> Result<ClientOperationStatusFilter, HttpResponse> {
match status.map(|value| value.trim().to_ascii_lowercase()) {
None => Ok(ClientOperationStatusFilter::All),
Some(value) if value.is_empty() || value == "all" => Ok(ClientOperationStatusFilter::All),
Some(value) if value == "errors" => Ok(ClientOperationStatusFilter::Errors),
Some(value) if value == "normal" => Ok(ClientOperationStatusFilter::Normal),
Some(_) => Err(bad_request(
"Invalid status filter",
"status must be one of: all, errors, normal",
)),
}
}
fn parse_query_optimization_status_filter(
status: Option<&str>,
) -> Result<Option<String>, HttpResponse> {
match status.map(|value| value.trim().to_ascii_lowercase()) {
None => Ok(None),
Some(value) if value.is_empty() || value == "all" => Ok(None),
Some(value) if value == "open" || value == "applied" || value == "dismissed" => {
Ok(Some(value))
}
Some(_) => Err(bad_request(
"Invalid status filter",
"status must be one of: all, open, applied, dismissed",
)),
}
}
fn client_record_to_runtime_target(record: &AthenaClientRecord) -> ClientConnectionTarget {
ClientConnectionTarget {
client_name: record.client_name.clone(),
source: record.source.clone(),
description: record.description.clone(),
pg_uri: record.pg_uri.clone(),
pg_uri_env_var: record.pg_uri_env_var.clone(),
config_uri_template: record.config_uri_template.clone(),
is_active: record.is_active,
is_frozen: record.is_frozen,
}
}
fn scoped_config_from_boot(
base: &Config,
client_name: &str,
client_record: Option<&AthenaClientRecord>,
) -> serde_json::Value {
let mut scoped: Config = base.clone();
let mut postgres_clients: Vec<HashMap<String, String>> = Vec::new();
if let Some(logging_client) = base.get_gateway_logging_client() {
let mut map: HashMap<String, String> = HashMap::new();
let value = base
.get_postgres_uri(logging_client)
.cloned()
.unwrap_or_else(|| "${POSTGRES_ATHENA_LOGGING_URI}".to_string());
map.insert(logging_client.clone(), value);
postgres_clients.push(map);
}
let client_value = if let Some(record) = client_record {
record
.config_uri_template
.clone()
.or_else(|| {
record
.pg_uri_env_var
.as_ref()
.map(|key| format!("${{{}}}", key))
})
.or_else(|| record.pg_uri.clone())
.or_else(|| base.get_postgres_uri(client_name).cloned())
} else {
base.get_postgres_uri(client_name).cloned()
};
if let Some(value) = client_value {
let mut map: HashMap<String, String> = HashMap::new();
map.insert(client_name.to_string(), value);
postgres_clients.push(map);
}
scoped.postgres_clients = postgres_clients;
serde_json::to_value(scoped).unwrap_or_else(|_| json!({}))
}
fn empty_config() -> Config {
Config {
urls: Vec::new(),
hosts: Vec::new(),
api: Vec::new(),
authenticator: Vec::new(),
postgres_clients: Vec::new(),
gateway: Vec::new(),
backup: Vec::new(),
}
}
fn load_boot_config_fallback() -> Config {
Config::load_default()
.map(|outcome| outcome.config)
.unwrap_or_else(|_| empty_config())
}
async fn apply_client_record_to_runtime(
state: &AppState,
record: &AthenaClientRecord,
) -> Result<(), anyhow::Error> {
let target = client_record_to_runtime_target(record);
if !record.is_active || record.is_frozen {
state.pg_registry.remember_client(target, false);
state.pg_registry.mark_unavailable(&record.client_name);
state.pg_registry.sync_connection_status();
return Ok(());
}
if state.pg_registry.get_pool(&record.client_name).is_some() {
state.pg_registry.remember_client(target, true);
state.pg_registry.sync_connection_status();
return Ok(());
}
state.pg_registry.upsert_client(target).await?;
state.pg_registry.sync_connection_status();
Ok(())
}
fn parse_optional_timestamp(
value: Option<String>,
field_name: &str,
) -> Result<Option<DateTime<Utc>>, HttpResponse> {
match value {
Some(value) => DateTime::parse_from_rfc3339(&value)
.map(|timestamp| Some(timestamp.with_timezone(&Utc)))
.map_err(|_| {
bad_request(
"Invalid timestamp",
format!("'{}' must be an RFC3339 timestamp.", field_name),
)
}),
None => Ok(None),
}
}
fn apply_optional_timestamp_patch(
patch_value: Option<Option<String>>,
current: Option<DateTime<Utc>>,
field_name: &str,
) -> Result<Option<DateTime<Utc>>, HttpResponse> {
match patch_value {
None => Ok(current),
Some(None) => Ok(None),
Some(Some(value)) => parse_optional_timestamp(Some(value), field_name),
}
}
fn database_error_response(message: &str, err: sqlx::Error) -> HttpResponse {
let unique_violation = err
.as_database_error()
.and_then(|db_err| db_err.code().map(|code| code == "23505"))
.unwrap_or(false);
if unique_violation {
conflict(message, err.to_string())
} else {
internal_error(message, err.to_string())
}
}
fn api_key_store_error_response(message: &str, err: ApiKeyStoreError) -> HttpResponse {
match err {
ApiKeyStoreError::MissingRights(missing) => bad_request(
"Unknown API key rights",
format!("These rights do not exist: {}", missing.join(", ")),
),
ApiKeyStoreError::Database(err) => database_error_response(message, err),
}
}
fn generate_public_id() -> String {
Uuid::new_v4()
.simple()
.to_string()
.chars()
.take(16)
.collect()
}
fn generate_secret() -> String {
format!("{}{}", Uuid::new_v4().simple(), Uuid::new_v4().simple())
}
#[get("/admin/api-keys")]
async fn admin_list_api_keys(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match list_api_keys(&pool).await {
Ok(keys) => api_success("Listed API keys", json!({ "api_keys": keys })),
Err(err) => database_error_response("Failed to list API keys", err),
}
}
#[post("/admin/api-keys")]
async fn admin_create_api_key(
req: HttpRequest,
body: Json<CreateApiKeyRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let expires_at = match parse_optional_timestamp(body.expires_at.clone(), "expires_at") {
Ok(value) => value,
Err(resp) => return resp,
};
let public_id = generate_public_id();
let secret = generate_secret();
let key_salt = format!("{}{}", Uuid::new_v4().simple(), Uuid::new_v4().simple());
let key_hash = digest(format!("{}:{}", key_salt, secret));
let params = CreateApiKeyParams {
public_id: public_id.clone(),
name: body.name.clone(),
description: body.description.clone(),
client_name: body.client_name.clone(),
key_salt,
key_hash,
expires_at,
rights: body.rights.clone(),
};
match create_api_key(&pool, params).await {
Ok(record) => api_created(
"Created API key",
json!({
"api_key": format!("ath_{}.{}", public_id, secret),
"record": record,
}),
),
Err(err) => api_key_store_error_response("Failed to create API key", err),
}
}
#[patch("/admin/api-keys/{id}")]
async fn admin_update_api_key(
req: HttpRequest,
path: Path<String>,
body: Json<UpdateApiKeyRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let id = match Uuid::parse_str(&path.into_inner()) {
Ok(value) => value,
Err(_) => {
return bad_request("Invalid API key id", "The API key id must be a UUID.");
}
};
let Some(existing) = (match get_api_key(&pool, id).await {
Ok(record) => record,
Err(err) => return database_error_response("Failed to load API key", err),
}) else {
return not_found(
"API key not found",
format!("No API key exists for '{}'.", id),
);
};
let expires_at = match apply_optional_timestamp_patch(
body.expires_at.clone(),
existing.expires_at,
"expires_at",
) {
Ok(value) => value,
Err(resp) => return resp,
};
let params = SaveApiKeyParams {
id,
name: body.name.clone().unwrap_or(existing.name),
description: body.description.clone().unwrap_or(existing.description),
client_name: body.client_name.clone().unwrap_or(existing.client_name),
expires_at,
is_active: body.is_active.unwrap_or(existing.is_active),
rights: body.rights.clone().unwrap_or(existing.rights),
};
match save_api_key(&pool, params).await {
Ok(Some(record)) => api_success("Updated API key", json!({ "record": record })),
Ok(None) => not_found(
"API key not found",
format!("No API key exists for '{}'.", id),
),
Err(err) => api_key_store_error_response("Failed to update API key", err),
}
}
#[delete("/admin/api-keys/{id}")]
async fn admin_delete_api_key(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let id = match Uuid::parse_str(&path.into_inner()) {
Ok(value) => value,
Err(_) => {
return bad_request("Invalid API key id", "The API key id must be a UUID.");
}
};
match delete_api_key(&pool, id).await {
Ok(true) => api_success("Deleted API key", json!({ "id": id })),
Ok(false) => not_found(
"API key not found",
format!("No API key exists for '{}'.", id),
),
Err(err) => database_error_response("Failed to delete API key", err),
}
}
#[get("/admin/api-key-rights")]
async fn admin_list_api_key_rights(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match list_api_key_rights(&pool).await {
Ok(rights) => api_success("Listed API key rights", json!({ "rights": rights })),
Err(err) => database_error_response("Failed to list API key rights", err),
}
}
#[post("/admin/api-key-rights")]
async fn admin_create_api_key_right(
req: HttpRequest,
body: Json<SaveApiKeyRightRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match create_api_key_right(&pool, &body.name, body.description.as_deref()).await {
Ok(record) => api_created("Created API key right", json!({ "right": record })),
Err(err) => database_error_response("Failed to create API key right", err),
}
}
#[patch("/admin/api-key-rights/{id}")]
async fn admin_update_api_key_right(
req: HttpRequest,
path: Path<String>,
body: Json<SaveApiKeyRightRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let id = match Uuid::parse_str(&path.into_inner()) {
Ok(value) => value,
Err(_) => {
return bad_request("Invalid API key right id", "The right id must be a UUID.");
}
};
match update_api_key_right(&pool, id, &body.name, body.description.as_deref()).await {
Ok(Some(record)) => api_success("Updated API key right", json!({ "right": record })),
Ok(None) => not_found(
"API key right not found",
format!("No right exists for '{}'.", id),
),
Err(err) => database_error_response("Failed to update API key right", err),
}
}
#[delete("/admin/api-key-rights/{id}")]
async fn admin_delete_api_key_right(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let id = match Uuid::parse_str(&path.into_inner()) {
Ok(value) => value,
Err(_) => {
return bad_request("Invalid API key right id", "The right id must be a UUID.");
}
};
match delete_api_key_right(&pool, id).await {
Ok(true) => api_success("Deleted API key right", json!({ "id": id })),
Ok(false) => not_found(
"API key right not found",
format!("No right exists for '{}'.", id),
),
Err(err) => database_error_response("Failed to delete API key right", err),
}
}
#[get("/admin/api-key-config")]
async fn admin_get_api_key_config(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let global = match get_global_api_key_config(&pool).await {
Ok(value) => value,
Err(err) => return database_error_response("Failed to load API key config", err),
};
let clients = match list_client_api_key_configs(&pool).await {
Ok(value) => value,
Err(err) => return database_error_response("Failed to load API key client config", err),
};
api_success(
"Loaded API key config",
json!({
"global": global,
"clients": clients,
}),
)
}
#[put("/admin/api-key-config")]
async fn admin_set_api_key_config(
req: HttpRequest,
body: Json<SaveApiKeyEnforcementRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match set_global_api_key_config(&pool, body.enforce_api_keys).await {
Ok(global) => api_success("Updated API key config", json!({ "global": global })),
Err(err) => database_error_response("Failed to update API key config", err),
}
}
#[get("/admin/api-key-clients")]
async fn admin_list_api_key_clients(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match list_client_api_key_configs(&pool).await {
Ok(clients) => api_success(
"Listed API key client config",
json!({ "clients": clients }),
),
Err(err) => database_error_response("Failed to list API key client config", err),
}
}
#[put("/admin/api-key-clients/{client_name}")]
async fn admin_upsert_api_key_client(
req: HttpRequest,
path: Path<String>,
body: Json<SaveApiKeyEnforcementRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool: sqlx::Pool<sqlx::Postgres> = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name: String = path.into_inner();
if client_name.trim().is_empty() {
return bad_request("Invalid client name", "The client name must not be empty.");
}
match upsert_client_api_key_config(&pool, &client_name, body.enforce_api_keys).await {
Ok(record) => api_success("Updated API key client config", json!({ "client": record })),
Err(err) => database_error_response("Failed to update API key client config", err),
}
}
#[delete("/admin/api-key-clients/{client_name}")]
async fn admin_delete_api_key_client(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool: sqlx::Pool<sqlx::Postgres> = match auth_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = path.into_inner();
match delete_client_api_key_config(&pool, &client_name).await {
Ok(true) => api_success(
"Deleted API key client config",
json!({ "client_name": client_name }),
),
Ok(false) => not_found(
"API key client config not found",
format!("No client config exists for '{}'.", client_name),
),
Err(err) => database_error_response("Failed to delete API key client config", err),
}
}
#[get("/admin/clients")]
async fn admin_list_clients(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let runtime_clients = app_state.pg_registry.list_registered_clients();
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(_) => {
return api_success(
"Listed runtime Athena clients",
json!({ "clients": runtime_clients }),
);
}
};
match list_athena_clients(&pool).await {
Ok(clients) => api_success(
"Listed Athena clients",
json!({
"clients": clients,
"runtime": runtime_clients,
}),
),
Err(err) => database_error_response("Failed to list Athena clients", err),
}
}
#[post("/admin/clients")]
async fn admin_create_client(
req: HttpRequest,
body: Json<SaveAthenaClientRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool: sqlx::Pool<sqlx::Postgres> = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(body.client_name.as_deref().unwrap_or("")) {
Ok(value) => value,
Err(_) => {
return bad_request(
"Missing client name",
"Provide client_name in the request body when creating a client.",
);
}
};
if body.pg_uri.as_deref().unwrap_or("").trim().is_empty()
&& body
.pg_uri_env_var
.as_deref()
.unwrap_or("")
.trim()
.is_empty()
{
return bad_request(
"Missing connection details",
"Provide either pg_uri or pg_uri_env_var for database-backed clients.",
);
}
match get_athena_client_by_name(&pool, &client_name).await {
Ok(Some(_)) => {
return conflict(
"Athena client already exists",
format!("A client named '{}' already exists.", client_name),
);
}
Ok(None) => {}
Err(err) => return database_error_response("Failed to load Athena client", err),
}
let record = match upsert_athena_client(
&pool,
SaveAthenaClientParams {
client_name,
description: body.description.clone(),
pg_uri: body.pg_uri.clone(),
pg_uri_env_var: body.pg_uri_env_var.clone(),
config_uri_template: None,
source: "database".to_string(),
is_active: body.is_active.unwrap_or(true),
is_frozen: false,
metadata: json!({ "managed_by": "admin_api" }),
},
)
.await
{
Ok(record) => record,
Err(err) => return database_error_response("Failed to create Athena client", err),
};
if let Err(err) = apply_client_record_to_runtime(app_state.get_ref(), &record).await {
return internal_error("Failed to activate Athena client", err.to_string());
}
api_created("Created Athena client", json!({ "client": record }))
}
#[patch("/admin/clients/{client_name}")]
async fn admin_update_client(
req: HttpRequest,
path: Path<String>,
body: Json<SaveAthenaClientRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let existing = match get_athena_client_by_name(&pool, &client_name).await {
Ok(Some(record)) => record,
Ok(None) => {
return not_found(
"Athena client not found",
format!("No client exists for '{}'.", client_name),
);
}
Err(err) => return database_error_response("Failed to load Athena client", err),
};
let record = match upsert_athena_client(
&pool,
SaveAthenaClientParams {
client_name: client_name.clone(),
description: body.description.clone().or(existing.description.clone()),
pg_uri: body.pg_uri.clone().or(existing.pg_uri.clone()),
pg_uri_env_var: body
.pg_uri_env_var
.clone()
.or(existing.pg_uri_env_var.clone()),
config_uri_template: existing.config_uri_template.clone(),
source: existing.source.clone(),
is_active: body.is_active.unwrap_or(existing.is_active),
is_frozen: existing.is_frozen,
metadata: existing.metadata.clone(),
},
)
.await
{
Ok(record) => record,
Err(err) => return database_error_response("Failed to update Athena client", err),
};
if let Err(err) = apply_client_record_to_runtime(app_state.get_ref(), &record).await {
return internal_error("Failed to refresh Athena client", err.to_string());
}
api_success("Updated Athena client", json!({ "client": record }))
}
#[put("/admin/clients/{client_name}/freeze")]
async fn admin_freeze_client(
req: HttpRequest,
path: Path<String>,
body: Json<FreezeAthenaClientRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let record = match set_client_frozen_state(&pool, &client_name, body.is_frozen).await {
Ok(Some(record)) => record,
Ok(None) => {
return not_found(
"Athena client not found",
format!("No client exists for '{}'.", client_name),
);
}
Err(err) => return database_error_response("Failed to update Athena client", err),
};
if let Err(err) = apply_client_record_to_runtime(app_state.get_ref(), &record).await {
return internal_error("Failed to refresh Athena client", err.to_string());
}
api_success(
"Updated Athena client freeze state",
json!({ "client": record }),
)
}
#[delete("/admin/clients/{client_name}")]
async fn admin_delete_client(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
match delete_athena_client(&pool, &client_name).await {
Ok(Some(record)) => {
app_state.pg_registry.remove_client(&record.client_name);
api_success("Deleted Athena client", json!({ "client": record }))
}
Ok(None) => not_found(
"Athena client not found",
format!("No client exists for '{}'.", client_name),
),
Err(err) => database_error_response("Failed to delete Athena client", err),
}
}
#[get("/admin/clients/statistics")]
async fn admin_list_client_statistics(
req: HttpRequest,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match list_client_statistics(&pool).await {
Ok(statistics) => api_success(
"Listed client statistics",
json!({ "statistics": statistics }),
),
Err(err) => database_error_response("Failed to list client statistics", err),
}
}
#[post("/admin/clients/statistics/refresh")]
async fn admin_refresh_client_statistics(
req: HttpRequest,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
if let Err(err) = refresh_client_statistics(&pool).await {
return database_error_response("Failed to refresh client statistics", err);
}
match list_client_statistics(&pool).await {
Ok(statistics) => api_success(
"Refreshed client statistics",
json!({ "statistics": statistics }),
),
Err(err) => database_error_response("Failed to list client statistics", err),
}
}
#[get("/admin/clients/{client_name}/statistics")]
async fn admin_get_client_statistics(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let statistics = match get_client_statistics(&pool, &client_name).await {
Ok(Some(record)) => record,
Ok(None) => {
return not_found(
"Client statistics not found",
format!("No statistics exist for '{}'.", client_name),
);
}
Err(err) => return database_error_response("Failed to load client statistics", err),
};
match list_client_table_statistics(&pool, &client_name).await {
Ok(table_statistics) => api_success(
"Loaded client statistics",
json!({
"statistics": statistics,
"tables": table_statistics,
}),
),
Err(err) => database_error_response("Failed to load client table statistics", err),
}
}
#[get("/admin/clients/{client_name}/statistics/drilldown")]
async fn admin_get_client_statistics_drilldown(
req: HttpRequest,
path: Path<String>,
query: Query<ClientStatisticsDrilldownQuery>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let table_name = match normalize_required_query_value("table_name", &query.table_name) {
Ok(value) => value,
Err(resp) => return resp,
};
let operation = match normalize_required_query_value("operation", &query.operation) {
Ok(value) => value,
Err(resp) => return resp,
};
let status_filter = match parse_client_statistics_status_filter(query.status.as_deref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let limit = query.limit.unwrap_or(100).clamp(1, 500);
let offset = query.offset.unwrap_or(0).max(0);
match list_client_operation_drilldown(
&pool,
&client_name,
&table_name,
&operation,
status_filter,
limit,
offset,
)
.await
{
Ok(rows) => api_success(
"Loaded client statistics drilldown",
json!({
"client_name": client_name,
"table_name": table_name,
"operation": operation,
"status": query.status.as_deref().map(str::trim).filter(|s| !s.is_empty()).unwrap_or("all"),
"limit": limit,
"offset": offset,
"rows": rows,
}),
),
Err(err) => database_error_response("Failed to load client statistics drilldown", err),
}
}
#[get("/admin/clients/{client_name}/query-optimizations")]
async fn admin_list_client_query_optimizations(
req: HttpRequest,
path: Path<String>,
query: Query<QueryOptimizationListQuery>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let status = match parse_query_optimization_status_filter(query.status.as_deref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let limit = query.limit.unwrap_or(100).clamp(1, 500);
let offset = query.offset.unwrap_or(0).max(0);
match list_query_optimization_recommendations(
&pool,
&client_name,
status.as_deref(),
limit,
offset,
)
.await
{
Ok(recommendations) => api_success(
"Loaded query optimization recommendations",
json!({
"client_name": client_name,
"status": status.unwrap_or_else(|| "all".to_string()),
"limit": limit,
"offset": offset,
"recommendations": recommendations,
}),
),
Err(err) => {
database_error_response("Failed to load query optimization recommendations", err)
}
}
}
#[post("/admin/clients/{client_name}/query-optimizations/refresh")]
async fn admin_refresh_client_query_optimizations(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
match refresh_query_optimization_recommendations(&pool, &client_name, "manual_admin", None)
.await
{
Ok((run, recommendations)) => api_success(
"Refreshed query optimization recommendations",
json!({
"client_name": client_name,
"run": run,
"recommendations": recommendations,
}),
),
Err(err) => {
database_error_response("Failed to refresh query optimization recommendations", err)
}
}
}
#[get("/admin/clients/{client_name}/query-optimizations/runs")]
async fn admin_list_client_query_optimization_runs(
req: HttpRequest,
path: Path<String>,
query: Query<QueryOptimizationRunsQuery>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let limit = query.limit.unwrap_or(100).clamp(1, 500);
let offset = query.offset.unwrap_or(0).max(0);
match list_query_optimization_runs(&pool, &client_name, limit, offset).await {
Ok(runs) => api_success(
"Loaded query optimization runs",
json!({
"client_name": client_name,
"limit": limit,
"offset": offset,
"runs": runs,
}),
),
Err(err) => database_error_response("Failed to load query optimization runs", err),
}
}
#[post("/admin/clients/{client_name}/query-optimizations/{recommendation_id}/apply")]
async fn admin_apply_client_query_optimization(
req: HttpRequest,
path: Path<(String, i64)>,
body: Option<Json<ApplyQueryOptimizationRequest>>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let (raw_client_name, recommendation_id) = path.into_inner();
let client_name = match normalize_client_name(&raw_client_name) {
Ok(value) => value,
Err(resp) => return resp,
};
let actor = body
.as_ref()
.and_then(|payload| payload.actor.as_deref())
.map(str::trim)
.filter(|value| !value.is_empty());
match apply_query_optimization_recommendation(&pool, &client_name, recommendation_id, actor)
.await
{
Ok(Some(result)) => api_success(
"Applied query optimization recommendation",
json!({
"client_name": client_name,
"result": result,
}),
),
Ok(None) => not_found(
"Query optimization recommendation not found",
format!(
"No recommendation exists for id '{}' and client '{}'.",
recommendation_id, client_name
),
),
Err(err) => {
database_error_response("Failed to apply query optimization recommendation", err)
}
}
}
#[get("/admin/vacuum-health")]
async fn admin_list_vacuum_health(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match list_latest_vacuum_health_summaries(&pool).await {
Ok(snapshots) => api_success(
"Loaded vacuum health snapshots",
json!({
"snapshots": snapshots,
}),
),
Err(err) => database_error_response("Failed to load vacuum health snapshots", err),
}
}
#[get("/admin/vacuum-health/{client_name}")]
async fn admin_get_vacuum_health(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
match get_latest_vacuum_health_detail(&pool, &client_name).await {
Ok(Some((snapshot, tables))) => api_success(
"Loaded vacuum health detail",
json!({
"snapshot": snapshot,
"tables": tables,
}),
),
Ok(None) => not_found(
"Vacuum health not found",
format!("No vacuum health snapshots exist for '{}'.", client_name),
),
Err(err) => database_error_response("Failed to load vacuum health detail", err),
}
}
#[get("/admin/admission-events")]
async fn admin_list_admission_events(
req: HttpRequest,
query: Query<AdmissionEventsQuery>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let decision = normalize_optional_query_value(query.decision.as_deref());
let client = normalize_optional_query_value(query.client.as_deref());
let limit = query.limit.unwrap_or(100).clamp(1, 500);
let offset = query.offset.unwrap_or(0).max(0);
match list_admission_events(&pool, decision.as_deref(), client.as_deref(), limit, offset).await
{
Ok(events) => api_success(
"Loaded admission events",
json!({
"decision": decision,
"client": client,
"limit": limit,
"offset": offset,
"events": events,
}),
),
Err(err) => database_error_response("Failed to load admission events", err),
}
}
#[get("/admin/client-configs")]
async fn admin_list_client_configs(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
if let Err(err) = ensure_athena_client_config_table(&pool).await {
return database_error_response("Failed to initialize client configs", err);
}
match list_athena_client_configs(&pool).await {
Ok(configs) => api_success(
"Loaded Athena client configs",
json!({ "configs": configs }),
),
Err(err) => database_error_response("Failed to list Athena client configs", err),
}
}
#[get("/admin/client-configs/{client_name}")]
async fn admin_get_client_config(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
if let Err(err) = ensure_athena_client_config_table(&pool).await {
return database_error_response("Failed to initialize client configs", err);
}
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
match get_athena_client_config_by_name(&pool, &client_name).await {
Ok(Some(record)) => api_success(
"Loaded Athena client config",
json!({
"client_name": client_name,
"source": "database",
"config": record.config,
"record": record,
}),
),
Ok(None) => {
let client_record = match get_athena_client_by_name(&pool, &client_name).await {
Ok(record) => record,
Err(err) => return database_error_response("Failed to load Athena client", err),
};
let boot_config = load_boot_config_fallback();
let config =
scoped_config_from_boot(&boot_config, &client_name, client_record.as_ref());
api_success(
"Loaded Athena client config",
json!({
"client_name": client_name,
"source": "config.yaml",
"config": config,
}),
)
}
Err(err) => database_error_response("Failed to load Athena client config", err),
}
}
#[put("/admin/client-configs/{client_name}")]
async fn admin_upsert_client_config(
req: HttpRequest,
path: Path<String>,
body: Json<SaveAthenaClientConfigRequest>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
if let Err(err) = ensure_athena_client_config_table(&pool).await {
return database_error_response("Failed to initialize client configs", err);
}
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
if !body.config.is_object() {
return bad_request(
"Invalid config payload",
"config must be a JSON object that mirrors config.yaml shape.",
);
}
let metadata = body
.metadata
.clone()
.unwrap_or_else(|| json!({ "managed_by": "admin_api" }));
match upsert_athena_client_config(&pool, &client_name, body.config.clone(), metadata).await {
Ok(record) => api_success(
"Saved Athena client config",
json!({
"client_name": client_name,
"source": "database",
"record": record,
}),
),
Err(err) => database_error_response("Failed to save Athena client config", err),
}
}
#[delete("/admin/client-configs/{client_name}")]
async fn admin_delete_client_config(
req: HttpRequest,
path: Path<String>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
if let Err(err) = ensure_athena_client_config_table(&pool).await {
return database_error_response("Failed to initialize client configs", err);
}
let client_name = match normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
match delete_athena_client_config_by_name(&pool, &client_name).await {
Ok(Some(record)) => api_success(
"Deleted Athena client config",
json!({
"client_name": client_name,
"record": record,
}),
),
Ok(None) => not_found(
"Athena client config not found",
format!("No client config exists for '{}'.", client_name),
),
Err(err) => database_error_response("Failed to delete Athena client config", err),
}
}
#[post("/admin/client-configs/seed-from-config")]
async fn admin_seed_client_configs_from_boot(
req: HttpRequest,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
if let Err(err) = ensure_athena_client_config_table(&pool).await {
return database_error_response("Failed to initialize client configs", err);
}
let catalog_clients = match list_athena_clients(&pool).await {
Ok(value) => value,
Err(err) => return database_error_response("Failed to list Athena clients", err),
};
let catalog_map: HashMap<String, AthenaClientRecord> = catalog_clients
.into_iter()
.map(|record| (record.client_name.to_ascii_lowercase(), record))
.collect();
let boot_config = load_boot_config_fallback();
let mut client_names: HashSet<String> = HashSet::new();
for map in &boot_config.postgres_clients {
client_names.extend(map.keys().cloned());
}
client_names.extend(
catalog_map
.values()
.map(|record| record.client_name.clone()),
);
if let Some(logging_client) = boot_config.get_gateway_logging_client() {
client_names.remove(logging_client);
}
let mut seeded: Vec<String> = Vec::new();
for client_name in client_names {
let client_record = catalog_map.get(&client_name.to_ascii_lowercase());
let config = scoped_config_from_boot(&boot_config, &client_name, client_record);
if upsert_athena_client_config(
&pool,
&client_name,
config,
json!({
"managed_by": "admin_api",
"seeded_from": "config.yaml",
}),
)
.await
.is_ok()
{
seeded.push(client_name);
}
}
seeded.sort_by_key(|name| name.to_ascii_lowercase());
api_success(
"Seeded Athena client configs from config.yaml",
json!({
"seeded_count": seeded.len(),
"clients": seeded,
}),
)
}
#[get("/admin/public-routes")]
async fn admin_list_public_routes(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match list_public_gateway_routes(&pool).await {
Ok(routes) => api_success("Listed public routes", json!({ "routes": routes })),
Err(err) => database_error_response("Failed to list public routes", err),
}
}
#[post("/admin/public-routes")]
async fn admin_create_public_route(
req: HttpRequest,
app_state: Data<AppState>,
body: Json<SavePublicGatewayRouteRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let route_key = match normalize_route_key(&body.route_key) {
Ok(value) => value,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(&body.client_name) {
Ok(value) => value,
Err(resp) => return resp,
};
let allowed_ops = match normalize_allowed_ops(&body.allowed_ops) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = ensure_public_route_client_eligible(&pool, &client_name).await {
return resp;
}
let params = SavePublicGatewayRouteParams {
route_key,
client_name,
allowed_ops,
is_active: body.is_active,
metadata: body.metadata.clone().unwrap_or_else(|| json!({})),
};
match create_public_gateway_route(&pool, params).await {
Ok(route) => api_created("Created public route", json!({ "route": route })),
Err(err) => database_error_response("Failed to create public route", err),
}
}
#[patch("/admin/public-routes/{route_key}")]
async fn admin_patch_public_route(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<String>,
body: Json<PatchPublicGatewayRouteRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let route_key = match normalize_route_key(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Some(ref next_client_name) = body.client_name {
let normalized = match normalize_client_name(next_client_name) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = ensure_public_route_client_eligible(&pool, &normalized).await {
return resp;
}
}
let normalized_client_name = match body.client_name.as_ref() {
Some(value) => match normalize_client_name(value) {
Ok(v) => Some(v),
Err(resp) => return resp,
},
None => None,
};
let normalized_allowed_ops = match body.allowed_ops.as_ref() {
Some(value) => match normalize_allowed_ops(value) {
Ok(v) => Some(v),
Err(resp) => return resp,
},
None => None,
};
let patch = PatchPublicGatewayRouteParams {
client_name: normalized_client_name,
allowed_ops: normalized_allowed_ops,
is_active: body.is_active,
metadata: body.metadata.clone(),
};
match patch_public_gateway_route(&pool, &route_key, patch).await {
Ok(Some(route)) => api_success("Updated public route", json!({ "route": route })),
Ok(None) => not_found(
"Public route not found",
format!("No public route '{}'.", route_key),
),
Err(err) => database_error_response("Failed to update public route", err),
}
}
#[delete("/admin/public-routes/{route_key}")]
async fn admin_delete_public_route(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<String>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let route_key = match normalize_route_key(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
match soft_delete_public_gateway_route(&pool, &route_key).await {
Ok(Some(route)) => api_success("Deleted public route", json!({ "route": route })),
Ok(None) => not_found(
"Public route not found",
format!("No public route '{}'.", route_key),
),
Err(err) => database_error_response("Failed to delete public route", err),
}
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(admin_list_api_keys)
.service(admin_create_api_key)
.service(admin_update_api_key)
.service(admin_delete_api_key)
.service(admin_list_api_key_rights)
.service(admin_create_api_key_right)
.service(admin_update_api_key_right)
.service(admin_delete_api_key_right)
.service(admin_get_api_key_config)
.service(admin_set_api_key_config)
.service(admin_list_api_key_clients)
.service(admin_upsert_api_key_client)
.service(admin_delete_api_key_client)
.service(admin_list_clients)
.service(admin_create_client)
.service(admin_update_client)
.service(admin_freeze_client)
.service(admin_delete_client)
.service(admin_list_client_statistics)
.service(admin_refresh_client_statistics)
.service(admin_get_client_statistics)
.service(admin_get_client_statistics_drilldown)
.service(admin_list_client_query_optimizations)
.service(admin_refresh_client_query_optimizations)
.service(admin_list_client_query_optimization_runs)
.service(admin_apply_client_query_optimization)
.service(admin_list_vacuum_health)
.service(admin_get_vacuum_health)
.service(admin_list_admission_events)
.service(admin_list_client_configs)
.service(admin_get_client_config)
.service(admin_upsert_client_config)
.service(admin_delete_client_config)
.service(admin_seed_client_configs_from_boot)
.service(admin_list_public_routes)
.service(admin_create_public_route)
.service(admin_patch_public_route)
.service(admin_delete_public_route);
}