use actix_web::{HttpRequest, HttpResponse, get, web};
use reqwest::Url;
use serde::Deserialize;
use serde_json::{Map, Value, json};
use sqlx::PgPool;
use sqlx::postgres::PgPoolOptions;
use std::collections::BTreeMap;
use std::time::Instant;
use crate::AppState;
use crate::api::client_context::{auth_pool, logging_pool};
use crate::api::debug::shared::{
DEBUG_GATEWAY_ENDPOINT_PATH, DEBUG_JWT_SECRET_ENV, DEBUG_PROBE_TIMEOUT, PrivilegedDebugAccess,
RelationResolutionSpec, build_debug_payload, build_origin_policy_debug,
build_privileged_debug_payload, build_relation_resolution_rows, build_request_transport_debug,
computed_base_url, evaluate_privileged_debug_access, header_value_trimmed,
normalize_tenant_header_value, probe_pg_context,
};
use crate::api::direct_pg_uri_auth_bypass_enabled;
use crate::api::gateway::auth::{read_right_for_resource, require_admin_or_gateway};
use crate::api::headers::request_context::{
ResolvedAthenaClientSource, resolved_athena_client, resolved_athena_client_source,
resolved_athena_route_target_url,
};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_jdbc_url::{
direct_postgres_client_token, jdbc_to_postgres_url, validate_postgres_target, x_jdbc_url,
};
use crate::athena::contracts::AthenaConnection;
use crate::athena::postgres_clients::ensure_catalog_database_client_loaded;
use crate::athena::resolver::{
AthenaClientResolveError, AthenaResolvedQueryBackend, resolve_query_backend,
};
use crate::data::clients::{AthenaClientRecord, list_athena_clients};
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;
use crate::drivers::supabase::{client_router_health_aware, known_supabase_connection_info};
use crate::utils::sqlx_postgres_connect_uri::sanitize_sqlx_postgres_connect_uri;
const GATEWAY_DEBUG_ENV_KEYS: &[&str] = &[
"ATHENA_KEY_12",
DEBUG_JWT_SECRET_ENV,
"JWT_SECRET",
"ATHENA_ALLOW_DIRECT_PG_URI_WITHOUT_API_KEY",
"ATHENA_WILDCARD_HOST_PATTERN",
"ATHENA_WILDCARD_HOST_ROUTING_ENABLED",
"ATHENA_RATE_LIMIT_TRUST_X_FORWARDED_FOR",
"ATHENA_LOGGING_TRUST_X_FORWARDED_FOR",
"ATHENA_CORS_ALLOWED_ORIGINS",
];
#[derive(Debug, Deserialize)]
pub struct GatewayDebugQuery {
#[serde(default)]
jwt_secret: Option<String>,
}
#[derive(Debug)]
enum GatewayTarget {
DirectPostgres {
client_token: String,
postgres_url: String,
redacted_postgres_url: String,
},
RegisteredPostgres {
client_name: String,
registered: RegisteredClient,
pool: Option<PgPool>,
},
D1 {
client_name: String,
connection: Value,
},
Scylla {
client_name: String,
connection: Value,
},
Supabase {
client_name: String,
connection: Value,
},
}
#[derive(Debug)]
struct GatewayResolution {
configured_header_value: Option<String>,
resolved_tenant: Option<String>,
resolution_status: &'static str,
resolution_mode: Option<&'static str>,
resolution_error: Option<String>,
fallback_header_values: Value,
target: Option<GatewayTarget>,
}
#[get("/debug/gateway")]
pub async fn debug_gateway(
req: HttpRequest,
app_state: web::Data<AppState>,
query: web::Query<GatewayDebugQuery>,
) -> HttpResponse {
if let Err(resp) = require_admin_or_gateway(
&req,
app_state.get_ref(),
None,
vec![read_right_for_resource(None)],
)
.await
{
return resp;
}
let privileged_access = evaluate_privileged_debug_access(query.jwt_secret.as_deref());
let resolution = resolve_gateway_target(&req, app_state.get_ref()).await;
let tenant_configs =
build_gateway_tenant_configs(app_state.get_ref(), privileged_access.enabled).await;
let current_tenant_probe = build_gateway_probe(app_state.get_ref(), &resolution).await;
let probe_status = current_tenant_probe
.get("status")
.and_then(Value::as_str)
.unwrap_or("unknown");
let runtime = build_gateway_runtime(
&req,
app_state.get_ref(),
&privileged_access,
tenant_configs.get("known_tenants"),
);
let request = build_gateway_request_payload(
&req,
app_state.get_ref(),
&resolution,
query.jwt_secret.is_some(),
);
let privileged_debug = build_privileged_debug_payload(
app_state.get_ref(),
&privileged_access,
GATEWAY_DEBUG_ENV_KEYS,
json!({
"request_base_url": computed_base_url(&req),
"direct_pg_uri_auth_bypass_enabled": direct_pg_uri_auth_bypass_enabled(),
}),
);
HttpResponse::Ok().json(build_debug_payload(
"athena-gateway",
DEBUG_GATEWAY_ENDPOINT_PATH,
top_level_status(resolution.resolution_status, probe_status),
privileged_debug,
runtime,
request,
tenant_configs,
current_tenant_probe,
))
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(debug_gateway);
}
fn resolved_client_source_payload(source: Option<ResolvedAthenaClientSource>) -> Value {
match source {
Some(source) => json!({
"source": source.source,
"outcome": source.outcome,
"wildcard_host_pattern": source.wildcard_host_pattern,
"matched_host": source.matched_host,
"wildcard_prefix": source.wildcard_prefix,
"route_key": source.route_key,
"tenant_ref": source.tenant_ref,
"service_key": source.service_key,
"major_version": source.major_version,
}),
None => Value::Null,
}
}
fn wildcard_host_candidate_row(label: &str, value: Option<String>, pattern: &str) -> Value {
let extracted_prefix = value
.as_deref()
.and_then(|host| crate::api::host_routing::extract_prefix_from_host(host, pattern));
json!({
"source": label,
"value": value,
"matches_wildcard_pattern": extracted_prefix.is_some(),
"extracted_prefix": extracted_prefix,
})
}
fn first_forwarded_host_value(req: &HttpRequest) -> Option<String> {
header_value_trimmed(req, "x-forwarded-host")
.and_then(|value| value.split(',').next().map(str::trim).map(str::to_string))
.filter(|value| !value.is_empty())
}
fn forwarded_header_host_value(req: &HttpRequest) -> Option<String> {
let forwarded = header_value_trimmed(req, "forwarded")?;
forwarded
.split(',')
.flat_map(|entry| entry.split(';'))
.filter_map(|part| part.trim().split_once('='))
.find_map(|(key, value)| {
if !key.trim().eq_ignore_ascii_case("host") {
return None;
}
let host = value.trim().trim_matches('"').trim();
if host.is_empty() {
None
} else {
Some(host.to_string())
}
})
}
fn wildcard_host_resolution_debug(req: &HttpRequest) -> Value {
let pattern = crate::api::host_routing::wildcard_host_pattern();
let configured_header_value = header_value_trimmed(req, "x-athena-client");
let direct_pg_uri_header_present = x_jdbc_url(req).is_some();
let inferred_prefix = crate::api::host_routing::infer_wildcard_host_prefix(req.headers());
json!({
"enabled": crate::api::host_routing::wildcard_host_routing_enabled(),
"pattern": pattern,
"policy_inferred_prefix": inferred_prefix,
"explicit_x_athena_client_present": configured_header_value.is_some(),
"direct_pg_uri_header_present": direct_pg_uri_header_present,
"skip_reason": if configured_header_value.is_some() {
Value::String("explicit_x_athena_client_header_wins".to_string())
} else if direct_pg_uri_header_present {
Value::String("direct_postgres_uri_header_wins".to_string())
} else {
Value::Null
},
"candidate_hosts": [
wildcard_host_candidate_row("host", header_value_trimmed(req, "host"), &pattern),
wildcard_host_candidate_row("x-forwarded-host", first_forwarded_host_value(req), &pattern),
wildcard_host_candidate_row("forwarded.host", forwarded_header_host_value(req), &pattern),
],
})
}
async fn resolve_gateway_target(req: &HttpRequest, app_state: &AppState) -> GatewayResolution {
let configured_header_value = header_value_trimmed(req, "x-athena-client");
let resolved_request_context_client = resolved_athena_client(req);
let direct_pg_token = direct_postgres_client_token(req);
let fallback_values = json!({
"resolved_request_context_client": resolved_request_context_client,
"resolved_request_context_source": resolved_client_source_payload(resolved_athena_client_source(req)),
"resolved_route_target_url": resolved_athena_route_target_url(req),
"direct_pg_routing_token": direct_pg_token,
"host_header": header_value_trimmed(req, "host"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
"x_forwarded_prefix": header_value_trimmed(req, "x-forwarded-prefix"),
"wildcard_host_resolution": wildcard_host_resolution_debug(req),
});
if let Some(raw_direct_uri) = x_jdbc_url(req) {
let Some(postgres_url) = jdbc_to_postgres_url(&raw_direct_uri) else {
return GatewayResolution {
configured_header_value,
resolved_tenant: direct_pg_token,
resolution_status: "invalid_header",
resolution_mode: Some("direct_pg_uri"),
resolution_error: Some(
"direct PostgreSQL routing header is not a valid postgres or jdbc:postgresql URI"
.to_string(),
),
fallback_header_values: fallback_values,
target: None,
};
};
if let Err(error) = validate_postgres_target(
&postgres_url,
app_state.gateway_jdbc_allow_private_hosts,
&app_state.gateway_jdbc_allowed_hosts,
) {
return GatewayResolution {
configured_header_value,
resolved_tenant: direct_pg_token,
resolution_status: "invalid_header",
resolution_mode: Some("direct_pg_uri"),
resolution_error: Some(error),
fallback_header_values: fallback_values,
target: None,
};
}
return GatewayResolution {
configured_header_value,
resolved_tenant: direct_postgres_client_token(req),
resolution_status: "ok",
resolution_mode: Some("direct_pg_uri"),
resolution_error: None,
fallback_header_values: fallback_values,
target: Some(GatewayTarget::DirectPostgres {
client_token: direct_postgres_client_token(req).unwrap_or_default(),
redacted_postgres_url: redact_direct_pg_url(&postgres_url),
postgres_url,
}),
};
}
let requested_client = x_athena_client(req);
let normalized_client = match normalize_tenant_header_value(Some(requested_client)) {
Ok(value) => value,
Err(error) => {
return GatewayResolution {
configured_header_value,
resolved_tenant: None,
resolution_status: "invalid_header",
resolution_mode: Some("x-athena-client"),
resolution_error: Some(error),
fallback_header_values: fallback_values,
target: None,
};
}
};
let Some(client_name) = normalized_client else {
return GatewayResolution {
configured_header_value,
resolved_tenant: None,
resolution_status: "unknown_tenant",
resolution_mode: None,
resolution_error: Some(
"no X-Athena-Client header or wildcard host client context was present".to_string(),
),
fallback_header_values: fallback_values,
target: None,
};
};
let catalog_warm_error = ensure_catalog_database_client_loaded(app_state, &client_name)
.await
.err();
if let Some(registered) = app_state.pg_registry.registered_client(&client_name) {
return GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name.clone()),
resolution_status: "ok",
resolution_mode: Some("postgres_registry"),
resolution_error: catalog_warm_error,
fallback_header_values: fallback_values,
target: Some(GatewayTarget::RegisteredPostgres {
pool: app_state.pg_registry.get_pool(&client_name),
client_name,
registered,
}),
};
}
match resolve_query_backend(app_state, &client_name).await {
Ok(Some(AthenaResolvedQueryBackend::D1 {
client,
connection_info,
})) => {
return GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name.clone()),
resolution_status: "ok",
resolution_mode: Some("cloudflare_d1"),
resolution_error: None,
fallback_header_values: fallback_values,
target: Some(GatewayTarget::D1 {
client_name,
connection: json!({
"worker_base_url": connection_info.worker_base_url,
"auth_token_env_var": connection_info.auth_token_env_var,
"database_binding": connection_info.database_binding,
"default_session_mode": connection_info.default_session_mode,
"source": client.source,
"metadata": client.metadata,
}),
}),
};
}
Ok(Some(AthenaResolvedQueryBackend::Scylla {
client,
connection_info,
})) => {
return GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name.clone()),
resolution_status: "ok",
resolution_mode: Some("scylla"),
resolution_error: None,
fallback_header_values: fallback_values,
target: Some(GatewayTarget::Scylla {
client_name,
connection: json!({
"host": connection_info.host,
"extra_hosts": connection_info.extra_hosts,
"keyspace": connection_info.keyspace,
"username_present": !connection_info.username.trim().is_empty(),
"source": client.source,
"metadata": client.metadata,
}),
}),
};
}
Ok(Some(AthenaResolvedQueryBackend::Postgres(client))) => {
let resolved_tenant = client_name.clone();
let pool = app_state.pg_registry.get_pool(&client_name);
let pool_connected = pool.is_some();
let (pg_uri, pg_uri_env_var, config_uri_template) = match &client.connection {
AthenaConnection::Postgres(connection) => (
connection.pg_uri.clone(),
connection.pg_uri_env_var.clone(),
connection.config_uri_template.clone(),
),
_ => (None, None, None),
};
return GatewayResolution {
configured_header_value,
resolved_tenant: Some(resolved_tenant),
resolution_status: "ok",
resolution_mode: Some("postgres_catalog"),
resolution_error: None,
fallback_header_values: fallback_values,
target: Some(GatewayTarget::RegisteredPostgres {
pool,
client_name,
registered: RegisteredClient {
client_name: client.client_name,
source: client.source,
description: client.description,
pg_uri,
pg_uri_env_var,
config_uri_template,
is_active: true,
is_frozen: false,
pool_connected,
},
}),
};
}
Ok(None) => {}
Err(error) => {
return GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name),
resolution_status: "ok",
resolution_mode: Some("catalog_lookup"),
resolution_error: Some(gateway_resolve_error_message(&error)),
fallback_header_values: fallback_values,
target: None,
};
}
}
match known_supabase_connection_info(&client_name) {
Ok(Some(info)) => GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name.clone()),
resolution_status: "ok",
resolution_mode: Some("built_in_supabase"),
resolution_error: None,
fallback_header_values: fallback_values,
target: Some(GatewayTarget::Supabase {
client_name,
connection: json!({
"url": info.url,
"host": info.host,
"name": info.name,
}),
}),
},
Ok(None) => GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name),
resolution_status: "unknown_tenant",
resolution_mode: Some("x-athena-client"),
resolution_error: Some("client is not present in the live registry, catalog, or built-in Supabase mappings".to_string()),
fallback_header_values: fallback_values,
target: None,
},
Err(error) => GatewayResolution {
configured_header_value,
resolved_tenant: Some(client_name),
resolution_status: "ok",
resolution_mode: Some("built_in_supabase"),
resolution_error: Some(error),
fallback_header_values: fallback_values,
target: None,
},
}
}
async fn build_gateway_tenant_configs(
app_state: &AppState,
allow_sensitive_metadata: bool,
) -> Value {
let mut tenants = BTreeMap::<String, Value>::new();
for registered in app_state.pg_registry.list_registered_clients() {
tenants.insert(
registered.client_name.to_ascii_lowercase(),
json!({
"client_name": registered.client_name,
"source": registered.source,
"description": registered.description,
"classification": "postgres",
"is_active": registered.is_active,
"is_frozen": registered.is_frozen,
"pool_connected": registered.pool_connected,
"registered_in_runtime": true,
"catalog_present": false,
}),
);
}
if let Ok(pool) = logging_pool(app_state)
&& let Ok(catalog_clients) = list_athena_clients(&pool).await
{
for record in catalog_clients {
let key = record.client_name.to_ascii_lowercase();
let entry = tenants.entry(key).or_insert_with(|| {
json!({
"client_name": record.client_name,
"description": record.description,
"source": record.source,
"classification": classify_catalog_client(&record),
"is_active": record.is_active,
"is_frozen": record.is_frozen,
"pool_connected": false,
"registered_in_runtime": false,
"catalog_present": true,
})
});
if let Value::Object(map) = entry {
map.insert("catalog_present".to_string(), Value::Bool(true));
map.insert(
"classification".to_string(),
Value::String(classify_catalog_client(&record).to_string()),
);
map.insert("is_active".to_string(), Value::Bool(record.is_active));
map.insert("is_frozen".to_string(), Value::Bool(record.is_frozen));
map.insert(
"metadata".to_string(),
sanitize_gateway_tenant_metadata(&record.metadata, allow_sensitive_metadata),
);
}
}
}
for client_name in ["supabase", "xbp", "athena", "atabex", "athena_dexter"] {
if tenants.contains_key(&client_name.to_ascii_lowercase()) {
continue;
}
if let Ok(Some(info)) = known_supabase_connection_info(client_name) {
tenants.insert(
client_name.to_ascii_lowercase(),
json!({
"client_name": client_name,
"source": "built_in_supabase",
"classification": "supabase",
"is_active": true,
"is_frozen": false,
"pool_connected": false,
"registered_in_runtime": false,
"catalog_present": false,
"host": info.host,
}),
);
}
}
json!({
"header_name": "X-Athena-Client",
"default_tenant": Value::Null,
"logging_client": app_state.logging_client_name,
"auth_client": app_state.gateway_auth_client_name,
"benchmark_client": app_state.gateway_benchmark_client_name,
"known_tenants": tenants.into_values().collect::<Vec<_>>(),
})
}
fn sanitize_gateway_tenant_metadata(value: &Value, allow_sensitive_metadata: bool) -> Value {
if allow_sensitive_metadata {
return value.clone();
}
match value {
Value::Object(map) => {
let mut sanitized = Map::new();
for (key, entry) in map {
sanitized.insert(
key.clone(),
sanitize_gateway_tenant_metadata_entry(key, entry),
);
}
Value::Object(sanitized)
}
Value::Array(values) => Value::Array(
values
.iter()
.map(|entry| sanitize_gateway_tenant_metadata(entry, false))
.collect(),
),
Value::String(text) if looks_like_sensitive_gateway_metadata_uri(text) => {
Value::String("<redacted-uri>".to_string())
}
_ => value.clone(),
}
}
fn sanitize_gateway_tenant_metadata_entry(key: &str, value: &Value) -> Value {
let normalized_key = key.to_ascii_lowercase();
if normalized_key == "host_port" {
return Value::String("<redacted>".to_string());
}
if gateway_metadata_key_is_secret(&normalized_key) {
return Value::String("<redacted>".to_string());
}
sanitize_gateway_tenant_metadata(value, false)
}
fn gateway_metadata_key_is_secret(key: &str) -> bool {
key.contains("password")
|| key.contains("secret")
|| key.ends_with("_token")
|| key.ends_with("_key")
}
fn looks_like_sensitive_gateway_metadata_uri(value: &str) -> bool {
let trimmed = value.trim();
trimmed.starts_with("postgres://")
|| trimmed.starts_with("postgresql://")
|| trimmed.starts_with("athena://")
|| trimmed.starts_with("jdbc:postgresql://")
}
fn build_gateway_runtime(
req: &HttpRequest,
app_state: &AppState,
access: &PrivilegedDebugAccess,
known_tenants: Option<&Value>,
) -> Value {
json!({
"tenant_header_name": "X-Athena-Client",
"debug_endpoint": DEBUG_GATEWAY_ENDPOINT_PATH,
"request_base_url": computed_base_url(req),
"multitenancy_enabled": true,
"enabled_features": {
"database_backed_client_loading_enabled": app_state.gateway_database_backed_client_loading_enabled,
"direct_pg_uri_auth_bypass_enabled": direct_pg_uri_auth_bypass_enabled(),
"wildcard_host_routing_enabled": crate::api::host_routing::wildcard_host_routing_enabled(),
"wildcard_host_pattern": crate::api::host_routing::wildcard_host_pattern(),
"logging_trust_x_forwarded_for": app_state.logging_trust_x_forwarded_for,
"rate_limit_trust_x_forwarded_for": app_state.inbound_rate_limit_trust_x_forwarded_for,
"cors_allow_any_origin": app_state.cors_allow_any_origin,
"cors_allowed_origin_count": app_state.cors_allowed_origins.len(),
"privileged_debug_available": access.expected_secret_source.is_some(),
},
"route_paths": {
"gateway_base_path": "/gateway",
"unified_service_base_paths": ["/db", "/auth", "/storage", "/realtime"],
"public_route_base_path": "/public/{route_key}/{op}",
"debug_endpoint": DEBUG_GATEWAY_ENDPOINT_PATH,
},
"tenant_wildcard_host_equivalence": {
"status": if crate::api::host_routing::wildcard_host_routing_enabled() { "enabled" } else { "disabled" },
"pattern": crate::api::host_routing::wildcard_host_pattern(),
"equivalent_header_name": "X-Athena-Client",
"default_rule": "When no explicit X-Athena-Client or direct PostgreSQL URI header is present, <client>.<wildcard suffix> is treated as X-Athena-Client: <client> for routes that resolve a tenant through the shared Athena client helpers. Canonical public module routing then uses /db, /auth, /storage, or /realtime on that same host.",
"precedence": [
"Direct PostgreSQL URI headers keep direct routing and suppress wildcard host inference.",
"A non-empty X-Athena-Client header wins over wildcard host inference.",
"Optional service alias hosts like db.<client>.<wildcard suffix> still resolve <client> as the route key while the leading host label selects the routed module.",
"An active public_gateway_routes row for the wildcard prefix may override the client name and attach target_url metadata.",
"If no active route row can be used, the wildcard prefix itself is the default client name.",
],
"covered_route_groups": [
"/db/*",
"/auth/*",
"/gateway/data",
"/gateway/fetch",
"/gateway/update",
"/gateway/insert",
"/gateway/delete",
"/gateway/query",
"/gateway/sql",
"/query/sql",
"/storage/*",
"/realtime/*",
"/s3/*",
"/management/*",
"/pipelines",
"PostgREST compatibility routes that call the shared x_athena_client helper",
],
"target_url_note": "target_url is mirror metadata attached by an active public route row. Handlers that implement target_url proxying can forward to that mirror; otherwise the resolved client still behaves like the equivalent X-Athena-Client header.",
},
"origin_policy": build_origin_policy_debug(req, app_state),
"proxy_behavior": {
"host_header": header_value_trimmed(req, "host"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
"x_forwarded_prefix": header_value_trimmed(req, "x-forwarded-prefix"),
},
"auth_store": auth_store_status_payload(app_state),
"logging_store": logging_store_status_payload(app_state),
"known_tenant_count": known_tenants.and_then(Value::as_array).map(|values| values.len()).unwrap_or(0),
"mapping_application_note": "Client registry, wildcard host routing, direct PostgreSQL routing, and built-in upstream selection are runtime-applied. Control-plane table rows below are diagnostic visibility checks only.",
})
}
fn build_gateway_request_payload(
req: &HttpRequest,
app_state: &AppState,
resolution: &GatewayResolution,
jwt_secret_supplied: bool,
) -> Value {
let mut payload = match build_request_transport_debug(req, app_state) {
Value::Object(map) => map,
_ => Map::new(),
};
payload.insert(
"configured_header_name".to_string(),
Value::String("X-Athena-Client".to_string()),
);
payload.insert(
"configured_header_value".to_string(),
resolution
.configured_header_value
.clone()
.map(Value::String)
.unwrap_or(Value::Null),
);
payload.insert(
"fallback_header_values".to_string(),
resolution.fallback_header_values.clone(),
);
payload.insert(
"tenant_resolution_flow".to_string(),
json!({
"configured_header": {
"name": "X-Athena-Client",
"value": resolution.configured_header_value.clone(),
"present": resolution.configured_header_value.is_some(),
},
"request_context": {
"resolved_client": resolved_athena_client(req),
"source": resolved_client_source_payload(resolved_athena_client_source(req)),
"route_target_url": resolved_athena_route_target_url(req),
},
"wildcard_host": wildcard_host_resolution_debug(req),
"effective_client": resolution.resolved_tenant.clone(),
"effective_equivalent_header": resolution.resolved_tenant.as_ref().map(|client| {
json!({
"name": "X-Athena-Client",
"value": client,
})
}).unwrap_or(Value::Null),
"resolution_status": resolution.resolution_status,
"resolution_mode": resolution.resolution_mode,
"resolution_error": resolution.resolution_error.clone(),
}),
);
payload.insert(
"resolved_tenant".to_string(),
resolution
.resolved_tenant
.clone()
.map(Value::String)
.unwrap_or(Value::Null),
);
payload.insert(
"resolution_status".to_string(),
Value::String(resolution.resolution_status.to_string()),
);
payload.insert(
"resolution_mode".to_string(),
resolution
.resolution_mode
.map(|value| Value::String(value.to_string()))
.unwrap_or(Value::Null),
);
payload.insert(
"resolution_error".to_string(),
resolution
.resolution_error
.clone()
.map(Value::String)
.unwrap_or(Value::Null),
);
payload.insert(
"jwt_secret_supplied".to_string(),
Value::Bool(jwt_secret_supplied),
);
Value::Object(payload)
}
async fn build_gateway_probe(app_state: &AppState, resolution: &GatewayResolution) -> Value {
let control_plane_probe = build_gateway_control_plane_probe(app_state).await;
let mut resource_rows = control_plane_probe
.get("resource_resolution")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
let Some(target) = &resolution.target else {
return json!({
"status": if resolution.resolution_status == "ok" { "unavailable" } else { resolution.resolution_status },
"tenant_id": resolution.resolved_tenant,
"resolution_mode": resolution.resolution_mode,
"routing_context": Value::Null,
"search_path": Value::Null,
"upstream_probe": {
"status": if resolution.resolution_status == "ok" { "unavailable" } else { resolution.resolution_status },
"error": resolution.resolution_error,
},
"control_plane_probe": control_plane_probe.get("summary").cloned().unwrap_or(Value::Null),
"resource_resolution": resource_rows,
});
};
match target {
GatewayTarget::DirectPostgres {
client_token,
postgres_url,
redacted_postgres_url,
} => {
let started = Instant::now();
let upstream_probe =
match connect_probe_pool(postgres_url).await {
Ok(pool) => match probe_pg_context(&pool).await {
Ok(context) => {
resource_rows.insert(0, upstream_probe_row(
"direct_postgres",
client_token,
redacted_postgres_url.clone(),
Some(context.clone()),
"Gateway routes would query this direct PostgreSQL target at runtime.",
None,
));
json!({
"status": "ok",
"latency_ms": started.elapsed().as_millis(),
"probe_kind": "postgres_runtime",
"redacted_postgres_url": redacted_postgres_url,
"context": context,
})
}
Err(error) => {
resource_rows.insert(0, upstream_probe_row(
"direct_postgres",
client_token,
redacted_postgres_url.clone(),
None,
"Gateway routes would query this direct PostgreSQL target at runtime.",
Some(error.clone()),
));
json!({
"status": "probe_failed",
"latency_ms": started.elapsed().as_millis(),
"probe_kind": "postgres_runtime",
"redacted_postgres_url": redacted_postgres_url,
"error": error,
})
}
},
Err(error) => {
resource_rows.insert(0, upstream_probe_row(
"direct_postgres",
client_token,
redacted_postgres_url.clone(),
None,
"Gateway routes would query this direct PostgreSQL target at runtime.",
Some(error.clone()),
));
json!({
"status": "probe_failed",
"latency_ms": started.elapsed().as_millis(),
"probe_kind": "postgres_runtime",
"redacted_postgres_url": redacted_postgres_url,
"error": error,
})
}
};
let search_path = upstream_probe
.get("context")
.cloned()
.unwrap_or(Value::Null);
json!({
"status": upstream_probe.get("status").and_then(Value::as_str).unwrap_or("unknown"),
"tenant_id": client_token,
"resolution_mode": resolution.resolution_mode,
"routing_context": {
"backend_kind": "direct_postgres",
"redacted_postgres_url": redacted_postgres_url,
"direct_routing_token": client_token,
},
"search_path": search_path,
"upstream_probe": upstream_probe,
"control_plane_probe": control_plane_probe.get("summary").cloned().unwrap_or(Value::Null),
"resource_resolution": resource_rows,
})
}
GatewayTarget::RegisteredPostgres {
client_name,
registered,
pool,
} => {
let routing_context = json!({
"backend_kind": "postgres",
"client_name": client_name,
"source": registered.source,
"pool_connected": registered.pool_connected,
"registered_pg_uri": registered.pg_uri.as_deref().map(redact_direct_pg_url),
"pg_uri_env_var": registered.pg_uri_env_var,
"config_uri_template": registered.config_uri_template.as_deref().map(redact_direct_pg_url),
});
let upstream_probe = match pool {
Some(pool) => {
let started = Instant::now();
match probe_pg_context(pool).await {
Ok(context) => {
resource_rows.insert(0, upstream_probe_row(
"postgres",
client_name,
registered.pg_uri.as_deref().map(redact_direct_pg_url).unwrap_or_else(|| client_name.clone()),
Some(context.clone()),
"Gateway routes resolve this logical client through the live Postgres registry.",
None,
));
json!({
"status": "ok",
"latency_ms": started.elapsed().as_millis(),
"probe_kind": "postgres_runtime",
"context": context,
})
}
Err(error) => {
resource_rows.insert(0, upstream_probe_row(
"postgres",
client_name,
registered.pg_uri.as_deref().map(redact_direct_pg_url).unwrap_or_else(|| client_name.clone()),
None,
"Gateway routes resolve this logical client through the live Postgres registry.",
Some(error.clone()),
));
json!({
"status": "probe_failed",
"latency_ms": started.elapsed().as_millis(),
"probe_kind": "postgres_runtime",
"error": error,
})
}
}
}
None => {
let reason = unavailable_registered_client_reason(registered);
resource_rows.insert(0, upstream_probe_row(
"postgres",
client_name,
registered.pg_uri.as_deref().map(redact_direct_pg_url).unwrap_or_else(|| client_name.clone()),
None,
"Gateway routes resolve this logical client through the live Postgres registry.",
Some(reason.clone()),
));
json!({
"status": "unavailable",
"probe_kind": "postgres_runtime",
"error": reason,
})
}
};
json!({
"status": upstream_probe.get("status").and_then(Value::as_str).unwrap_or("unknown"),
"tenant_id": client_name,
"resolution_mode": resolution.resolution_mode,
"routing_context": routing_context,
"search_path": upstream_probe.get("context").cloned().unwrap_or(Value::Null),
"upstream_probe": upstream_probe,
"control_plane_probe": control_plane_probe.get("summary").cloned().unwrap_or(Value::Null),
"resource_resolution": resource_rows,
})
}
GatewayTarget::D1 {
client_name,
connection,
} => {
resource_rows.insert(0, json!({
"resource_kind": "upstream",
"logical_resource": client_name,
"runtime_lookup_resource": connection.get("worker_base_url").and_then(Value::as_str),
"configured_mapped_resource": connection.get("database_binding").cloned(),
"mapping_applied_at_runtime": true,
"runtime_status": {
"status": "ok",
"auth_token_env_var": connection.get("auth_token_env_var"),
"default_session_mode": connection.get("default_session_mode"),
},
"mapped_status": Value::Null,
"overall_status": "ok",
"note": "Gateway SQL for this client resolves through the Cloudflare D1 worker proxy at runtime."
}));
json!({
"status": "ok",
"tenant_id": client_name,
"resolution_mode": resolution.resolution_mode,
"routing_context": {
"backend_kind": "cloudflare_d1",
"connection": connection,
},
"search_path": Value::Null,
"upstream_probe": {
"status": "unsupported_live_probe",
"probe_kind": "cloudflare_d1",
"note": "This debug endpoint reports D1 worker routing metadata and env-token wiring, but does not issue a synthetic query."
},
"control_plane_probe": control_plane_probe.get("summary").cloned().unwrap_or(Value::Null),
"resource_resolution": resource_rows,
})
}
GatewayTarget::Scylla {
client_name,
connection,
} => {
resource_rows.insert(0, json!({
"resource_kind": "upstream",
"logical_resource": client_name,
"runtime_lookup_resource": connection.get("host").and_then(Value::as_str),
"configured_mapped_resource": connection.get("keyspace").cloned(),
"mapping_applied_at_runtime": true,
"runtime_status": {
"status": "ok",
"extra_hosts": connection.get("extra_hosts"),
"username_present": connection.get("username_present"),
},
"mapped_status": Value::Null,
"overall_status": "ok",
"note": "Gateway CQL for this client resolves through the configured Scylla hosts at runtime."
}));
json!({
"status": "ok",
"tenant_id": client_name,
"resolution_mode": resolution.resolution_mode,
"routing_context": {
"backend_kind": "scylla",
"connection": connection,
},
"search_path": Value::Null,
"upstream_probe": {
"status": "unsupported_live_probe",
"probe_kind": "scylla",
"note": "This debug endpoint reports Scylla routing metadata, but does not issue a synthetic CQL request."
},
"control_plane_probe": control_plane_probe.get("summary").cloned().unwrap_or(Value::Null),
"resource_resolution": resource_rows,
})
}
GatewayTarget::Supabase {
client_name,
connection,
} => {
let offline = client_router_health_aware(client_name)
.ok()
.and_then(|client| client.is_offline())
.map(|deadline| {
deadline
.checked_duration_since(Instant::now())
.unwrap_or_default()
.as_secs()
});
resource_rows.insert(0, json!({
"resource_kind": "upstream",
"logical_resource": client_name,
"runtime_lookup_resource": connection.get("host").and_then(Value::as_str),
"configured_mapped_resource": connection.get("url").cloned(),
"mapping_applied_at_runtime": true,
"runtime_status": {
"status": if offline.is_some() { "offline" } else { "ok" },
"offline_seconds_remaining": offline,
},
"mapped_status": Value::Null,
"overall_status": if offline.is_some() { "offline" } else { "ok" },
"note": "Gateway CRUD translation for this client uses the built-in Supabase mapping."
}));
json!({
"status": if offline.is_some() { "offline" } else { "ok" },
"tenant_id": client_name,
"resolution_mode": resolution.resolution_mode,
"routing_context": {
"backend_kind": "supabase",
"connection": connection,
},
"search_path": Value::Null,
"upstream_probe": {
"status": if offline.is_some() { "offline" } else { "ok" },
"probe_kind": "supabase_runtime_state",
"offline_seconds_remaining": offline,
"note": "This status reflects the in-process Supabase circuit breaker. The debugger does not perform a synthetic HTTP request."
},
"control_plane_probe": control_plane_probe.get("summary").cloned().unwrap_or(Value::Null),
"resource_resolution": resource_rows,
})
}
}
}
async fn build_gateway_control_plane_probe(app_state: &AppState) -> Value {
let mut resource_rows = Vec::new();
let logging_summary = match logging_pool(app_state) {
Ok(pool) => match probe_pg_context(&pool).await {
Ok(context) => {
resource_rows.extend(
build_relation_resolution_rows(
&pool,
&context.runtime_effective_schemas,
&[
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "athena_clients".to_string(),
runtime_lookup_resource: "public.athena_clients".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for the live client catalog.".to_string(),
},
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "public_gateway_routes".to_string(),
runtime_lookup_resource: "public.public_gateway_routes".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for public route dispatch metadata.".to_string(),
},
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "api_registry".to_string(),
runtime_lookup_resource: "public.api_registry".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for API registry metadata.".to_string(),
},
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "gateway_request_log".to_string(),
runtime_lookup_resource: "public.gateway_request_log".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for request telemetry.".to_string(),
},
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "gateway_operation_log".to_string(),
runtime_lookup_resource: "public.gateway_operation_log".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for operation telemetry.".to_string(),
},
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "gateway_admission_events".to_string(),
runtime_lookup_resource: "public.gateway_admission_events".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for admission limiter events.".to_string(),
},
RelationResolutionSpec {
resource_kind: "control_plane_table",
logical_resource: "gateway_deferred_request_queue".to_string(),
runtime_lookup_resource: "public.gateway_deferred_request_queue".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for deferred gateway queue state.".to_string(),
},
],
)
.await,
);
json!({
"status": "ok",
"client_name": app_state.logging_client_name,
"context": context,
})
}
Err(error) => json!({
"status": "probe_failed",
"client_name": app_state.logging_client_name,
"error": error,
}),
},
Err(response) => json!({
"status": "unavailable",
"error": response_to_status_line(&response),
}),
};
let auth_summary = match auth_pool(app_state) {
Ok(pool) => {
match probe_pg_context(&pool).await {
Ok(context) => {
resource_rows.extend(
build_relation_resolution_rows(
&pool,
&context.runtime_effective_schemas,
&[
RelationResolutionSpec {
resource_kind: "auth_table",
logical_resource: "api_keys".to_string(),
runtime_lookup_resource: "public.api_keys".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for gateway API key storage.".to_string(),
},
RelationResolutionSpec {
resource_kind: "auth_table",
logical_resource: "api_key_rights".to_string(),
runtime_lookup_resource: "public.api_key_rights".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for gateway rights metadata.".to_string(),
},
RelationResolutionSpec {
resource_kind: "auth_table",
logical_resource: "api_key_config".to_string(),
runtime_lookup_resource: "public.api_key_config".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for global auth enforcement state.".to_string(),
},
RelationResolutionSpec {
resource_kind: "auth_table",
logical_resource: "api_key_client_config".to_string(),
runtime_lookup_resource: "public.api_key_client_config".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for per-client auth policy.".to_string(),
},
],
)
.await,
);
json!({
"status": "ok",
"client_name": app_state.gateway_auth_client_name,
"context": context,
})
}
Err(error) => json!({
"status": "probe_failed",
"client_name": app_state.gateway_auth_client_name,
"error": error,
}),
}
}
Err(response) => json!({
"status": "unavailable",
"error": response_to_status_line(&response),
}),
};
json!({
"summary": {
"logging_store": logging_summary,
"auth_store": auth_summary,
},
"resource_resolution": resource_rows,
})
}
async fn connect_probe_pool(postgres_url: &str) -> Result<PgPool, String> {
let sanitized = sanitize_sqlx_postgres_connect_uri(postgres_url);
PgPoolOptions::new()
.max_connections(1)
.acquire_timeout(DEBUG_PROBE_TIMEOUT)
.connect(sanitized.as_ref())
.await
.map_err(|error| error.to_string())
}
fn classify_catalog_client(record: &AthenaClientRecord) -> &'static str {
if record
.pg_uri
.as_ref()
.is_some_and(|value| !value.trim().is_empty())
|| record
.pg_uri_env_var
.as_ref()
.is_some_and(|value| !value.trim().is_empty())
|| record
.config_uri_template
.as_ref()
.is_some_and(|value| !value.trim().is_empty())
{
return "postgres";
}
if crate::drivers::cloudflare_d1::client::D1ConnectionInfo::from_metadata(&record.metadata)
.ok()
.flatten()
.is_some()
{
return "cloudflare_d1";
}
if crate::drivers::scylla::client::ScyllaConnectionInfo::from_metadata(&record.metadata)
.ok()
.flatten()
.is_some()
{
return "scylla";
}
"unknown"
}
fn auth_store_status_payload(app_state: &AppState) -> Value {
match auth_pool(app_state) {
Ok(_) => json!({
"status": "ok",
"client_name": app_state.gateway_auth_client_name,
"api_key_fail_mode": app_state.gateway_api_key_fail_mode,
}),
Err(response) => json!({
"status": if app_state.gateway_auth_client_name.is_some() { "disconnected" } else { "not_configured" },
"client_name": app_state.gateway_auth_client_name,
"api_key_fail_mode": app_state.gateway_api_key_fail_mode,
"error": response_to_status_line(&response),
}),
}
}
fn logging_store_status_payload(app_state: &AppState) -> Value {
match logging_pool(app_state) {
Ok(_) => json!({
"status": "ok",
"client_name": app_state.logging_client_name,
}),
Err(response) => json!({
"status": if app_state.logging_client_name.is_some() { "disconnected" } else { "not_configured" },
"client_name": app_state.logging_client_name,
"error": response_to_status_line(&response),
}),
}
}
fn upstream_probe_row(
backend_kind: &str,
logical_resource: &str,
runtime_lookup_resource: String,
context: Option<crate::api::debug::shared::PgContextProbe>,
note: &str,
error: Option<String>,
) -> Value {
json!({
"resource_kind": "upstream",
"logical_resource": logical_resource,
"runtime_lookup_resource": runtime_lookup_resource,
"configured_mapped_resource": Value::Null,
"mapping_applied_at_runtime": true,
"runtime_status": match context {
Some(context) => json!({
"status": "ok",
"backend_kind": backend_kind,
"context": context,
}),
None => json!({
"status": if error.is_some() { "probe_failed" } else { "unavailable" },
"backend_kind": backend_kind,
"error": error,
}),
},
"mapped_status": Value::Null,
"overall_status": if error.is_some() { "probe_failed" } else { "ok" },
"note": note,
})
}
fn top_level_status(request_status: &str, probe_status: &str) -> String {
if request_status != "ok" {
return request_status.to_string();
}
if probe_status == "unsupported_live_probe" {
return "ok".to_string();
}
probe_status.to_string()
}
fn response_to_status_line(response: &HttpResponse) -> String {
response
.error()
.map(|error| error.to_string())
.unwrap_or_else(|| response.status().to_string())
}
fn unavailable_registered_client_reason(registered: &RegisteredClient) -> String {
if !registered.is_active {
return "client is configured but inactive".to_string();
}
if registered.is_frozen {
return "client is configured but frozen".to_string();
}
"client is configured but no live pool is connected".to_string()
}
fn gateway_resolve_error_message(error: &AthenaClientResolveError) -> String {
match error {
AthenaClientResolveError::Inactive { client_name } => {
format!("catalog client '{client_name}' is inactive")
}
AthenaClientResolveError::Frozen { client_name } => {
format!("catalog client '{client_name}' is frozen")
}
AthenaClientResolveError::InvalidMetadata {
client_name,
message,
} => {
format!("catalog client '{client_name}' has invalid metadata: {message}")
}
AthenaClientResolveError::Lookup {
client_name,
message,
} => {
format!("catalog lookup failed for '{client_name}': {message}")
}
}
}
fn redact_direct_pg_url(value: &str) -> String {
let provided = if value.starts_with("postgres://") || value.starts_with("postgresql://") {
value.to_string()
} else {
jdbc_to_postgres_url(value).unwrap_or_else(|| value.to_string())
};
let Ok(mut parsed) = Url::parse(&provided) else {
return "<invalid_database_url>".to_string();
};
if !parsed.username().is_empty() {
let _ = parsed.set_username("***");
}
if parsed.password().is_some() {
let _ = parsed.set_password(Some("***"));
}
parsed.to_string()
}
#[cfg(test)]
mod tests {
use super::sanitize_gateway_tenant_metadata;
use serde_json::json;
#[test]
fn unprivileged_debug_redacts_sensitive_managed_instance_metadata() {
let metadata = json!({
"provider": "docker",
"host_port": 5433,
"instance": {
"host": "127.0.0.1",
"host_port": 5433,
"username": "athena",
"password": "super-secret"
},
"network": {
"private_pg_uri": "postgresql://athena:super-secret@127.0.0.1:5433/tenant_alpha"
}
});
let sanitized = sanitize_gateway_tenant_metadata(&metadata, false);
assert_eq!(sanitized["provider"], json!("docker"));
assert_eq!(sanitized["host_port"], json!("<redacted>"));
assert_eq!(sanitized["instance"]["host_port"], json!("<redacted>"));
assert_eq!(sanitized["instance"]["password"], json!("<redacted>"));
assert_eq!(
sanitized["network"]["private_pg_uri"],
json!("<redacted-uri>")
);
}
#[test]
fn privileged_debug_retains_sensitive_managed_instance_metadata() {
let metadata = json!({
"instance": {
"host_port": 5433,
"password": "super-secret"
}
});
let sanitized = sanitize_gateway_tenant_metadata(&metadata, true);
assert_eq!(sanitized, metadata);
}
}