use actix_web::HttpRequest;
use chrono::Utc;
use reqwest::Url;
use serde::Serialize;
use serde_json::{Map, Value, json};
use sqlx::{PgPool, Row};
use std::env;
use std::time::Duration;
use tokio::time::timeout;
use crate::AppState;
use crate::api::auth::static_admin_key_status;
use crate::api::gateway::auth::extract_client_ip;
use crate::api::headers::x_jdbc_url::{jdbc_to_postgres_url, x_jdbc_url};
/// Path literal `/debug/gateway`; presumably the route of the gateway debug
/// endpoint (route registration is not visible in this file).
pub const DEBUG_GATEWAY_ENDPOINT_PATH: &str = "/debug/gateway";
/// Path literal `/debug/s3`; presumably the route of the S3 debug endpoint
/// (route registration is not visible in this file).
pub const DEBUG_S3_ENDPOINT_PATH: &str = "/debug/s3";
/// Name of the environment variable consulted first for the privileged-debug secret.
pub const DEBUG_JWT_SECRET_ENV: &str = "ATHENA_DEBUG_JWT_SECRET";
/// Fallback environment variable read when `ATHENA_DEBUG_JWT_SECRET` is unset or blank.
const LEGACY_DEBUG_JWT_SECRET_ENV: &str = "JWT_SECRET";
/// Upper bound applied to each database probe query issued by the probes in this file.
pub const DEBUG_PROBE_TIMEOUT: Duration = Duration::from_secs(4);
/// Serializable debug report assembled by `build_debug_payload`.
#[derive(Debug, Serialize)]
pub struct RuntimeDebugPayload {
/// Caller-supplied status string.
pub status: String,
/// Caller-supplied service name.
pub service: String,
/// Crate version taken from `CARGO_PKG_VERSION` at compile time.
pub version: String,
/// Caller-supplied path of the debug endpoint that produced this report.
pub debug_endpoint: String,
/// RFC 3339 UTC timestamp captured when the payload was built.
pub generated_at: String,
/// Caller-supplied JSON section; see `build_privileged_debug_payload` for one producer.
pub privileged_debug: Value,
/// Caller-supplied JSON section describing the runtime.
pub runtime: Value,
/// Caller-supplied JSON section describing the request.
pub request: Value,
/// Caller-supplied JSON section describing tenant configuration.
pub tenant_configs: Value,
/// Caller-supplied JSON section with the probe result for the current tenant.
pub current_tenant_probe: Value,
}
/// Verdict produced by `evaluate_privileged_debug_access`.
#[derive(Debug, Clone)]
pub struct PrivilegedDebugAccess {
/// `true` only when the provided secret matched the configured one.
pub enabled: bool,
/// Machine-readable reason code: `server_jwt_secret_unavailable`,
/// `jwt_secret_query_missing`, `jwt_secret_query_matches_env`, or
/// `jwt_secret_query_mismatch`.
pub reason: &'static str,
/// Name of the environment variable the expected secret was read from;
/// `None` when no secret is configured.
pub expected_secret_source: Option<&'static str>,
}
/// Describes one relation to check in `build_relation_resolution_rows`.
#[derive(Debug, Clone)]
pub struct RelationResolutionSpec {
/// Free-form kind label; echoed unchanged into the output row.
pub resource_kind: &'static str,
/// Logical resource name; echoed unchanged into the output row.
pub logical_resource: String,
/// Relation name that is probed in the database as the runtime lookup.
pub runtime_lookup_resource: String,
/// Optional second relation name; when present it is probed as well.
pub configured_mapped_resource: Option<String>,
/// Selects how a missing or differing mapped relation is reported:
/// `missing_mapped` / `mismatch` when `true`,
/// `diagnostic_only_missing` / `diagnostic_only` when `false`.
pub mapping_applied_at_runtime: bool,
/// Free-form note; echoed unchanged into the output row.
pub note: String,
}
/// Session context read from PostgreSQL by `probe_pg_context`.
#[derive(Debug, Clone, Serialize)]
pub struct PgContextProbe {
/// Result of `current_database()`; `None` when the column cannot be decoded.
pub current_database: Option<String>,
/// Result of `current_schema()`; `None` when the column cannot be decoded
/// (for example when it is NULL).
pub runtime_current_schema: Option<String>,
/// Result of `current_setting('search_path', true)`; `None` when it cannot be decoded.
pub runtime_setting_search_path: Option<String>,
/// Result of `current_schemas(false)`; empty when the column cannot be decoded.
pub runtime_effective_schemas: Vec<String>,
}
/// Decides whether a request may see privileged debug output.
///
/// `query_secret` is the secret supplied by the caller (untrusted input). It is
/// trimmed, and a blank value is treated as absent. The expected secret comes
/// from `configured_debug_secret`; both sides pass through
/// `normalize_secret_for_compare` before being compared.
///
/// The returned `reason` is one of `server_jwt_secret_unavailable`,
/// `jwt_secret_query_missing`, `jwt_secret_query_matches_env`, or
/// `jwt_secret_query_mismatch`. `enabled` is `true` only for the matching case.
pub fn evaluate_privileged_debug_access(query_secret: Option<&str>) -> PrivilegedDebugAccess {
    // Best-effort timing-safe equality: every byte position is visited and the
    // differences are OR-ed together, so the comparison does not stop at the
    // first mismatching byte the way `==` on strings does.
    fn secrets_match(provided: &str, expected: &str) -> bool {
        let provided = provided.as_bytes();
        let expected = expected.as_bytes();
        // A length difference alone makes the result non-zero.
        let mut difference = provided.len() ^ expected.len();
        for index in 0..provided.len().max(expected.len()) {
            let left = provided.get(index).copied().unwrap_or(0);
            let right = expected.get(index).copied().unwrap_or(0);
            difference |= usize::from(left ^ right);
        }
        difference == 0
    }

    let Some((expected_secret, source)) = configured_debug_secret() else {
        return PrivilegedDebugAccess {
            enabled: false,
            reason: "server_jwt_secret_unavailable",
            expected_secret_source: None,
        };
    };

    let Some(provided_secret) = query_secret
        .map(str::trim)
        .filter(|value| !value.is_empty())
    else {
        return PrivilegedDebugAccess {
            enabled: false,
            reason: "jwt_secret_query_missing",
            expected_secret_source: Some(source),
        };
    };

    let matches = secrets_match(
        &normalize_secret_for_compare(provided_secret),
        &normalize_secret_for_compare(&expected_secret),
    );

    PrivilegedDebugAccess {
        enabled: matches,
        reason: if matches {
            "jwt_secret_query_matches_env"
        } else {
            "jwt_secret_query_mismatch"
        },
        expected_secret_source: Some(source),
    }
}
pub fn build_privileged_debug_payload(
app_state: &AppState,
access: &PrivilegedDebugAccess,
env_keys: &[&str],
service_specific: Value,
) -> Value {
let mut payload = json!({
"enabled": access.enabled,
"reason": access.reason,
"expected_secret_source": access.expected_secret_source,
});
if let Value::Object(map) = &mut payload
&& access.enabled
{
map.insert(
"config_snapshot".to_string(),
app_state.runtime_debug_config_snapshot.clone(),
);
map.insert("env_snapshot".to_string(), build_env_snapshot(env_keys));
map.insert("process".to_string(), build_process_debug(app_state));
if !service_specific.is_null() {
map.insert("service_specific".to_string(), service_specific);
}
}
payload
}
/// Wraps the caller-provided sections into a `RuntimeDebugPayload`.
///
/// `version` is filled from `CARGO_PKG_VERSION` and `generated_at` with the
/// current UTC time in RFC 3339 form; every other field is passed through.
pub fn build_debug_payload(
    service: &str,
    debug_endpoint: &str,
    status: impl Into<String>,
    privileged_debug: Value,
    runtime: Value,
    request: Value,
    tenant_configs: Value,
    current_tenant_probe: Value,
) -> RuntimeDebugPayload {
    let generated_at = Utc::now().to_rfc3339();
    let version = String::from(env!("CARGO_PKG_VERSION"));
    RuntimeDebugPayload {
        status: status.into(),
        service: service.to_owned(),
        version,
        debug_endpoint: debug_endpoint.to_owned(),
        generated_at,
        privileged_debug,
        runtime,
        request,
        tenant_configs,
        current_tenant_probe,
    }
}
/// Reports how the request's `Origin` header fares against the CORS settings
/// held in `app_state`.
///
/// `origin_allowed_status` is `missing_origin` when the header is absent or
/// blank, `allowed` when any origin is accepted or the origin is in the
/// allowlist, and `not_allowed` otherwise.
pub fn build_origin_policy_debug(req: &HttpRequest, app_state: &AppState) -> Value {
    let request_origin = header_value_trimmed(req, "origin");
    let allow_any = app_state.cors_allow_any_origin;

    let origin_allowed_status = if let Some(origin) = request_origin.as_deref() {
        let permitted = allow_any
            || app_state
                .cors_allowed_origins
                .iter()
                .any(|candidate| candidate == origin);
        if permitted { "allowed" } else { "not_allowed" }
    } else {
        "missing_origin"
    };

    let note = if allow_any {
        "Athena is currently configured to allow any origin."
    } else {
        "Origin evaluation is based on Athena's effective CORS allowlist."
    };

    json!({
        "request_origin": request_origin,
        "cors_allow_any_origin": allow_any,
        "cors_allowed_origins": app_state.cors_allowed_origins,
        "origin_allowed_status": origin_allowed_status,
        "note": note,
    })
}
pub fn build_request_transport_debug(req: &HttpRequest, app_state: &AppState) -> Value {
json!({
"method": req.method().as_str(),
"path": req.path(),
"host_header": header_value_trimmed(req, "host"),
"origin_header": header_value_trimmed(req, "origin"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
"x_forwarded_prefix": header_value_trimmed(req, "x-forwarded-prefix"),
"x_forwarded_for_present": req.headers().contains_key("x-forwarded-for"),
"computed_base_url": computed_base_url(req),
"client_ip": extract_client_ip(req, app_state.logging_trust_x_forwarded_for)
.map(|ip| ip.to_string()),
"static_admin_key_status": format!("{:?}", static_admin_key_status(req)).to_ascii_lowercase(),
"gateway_api_key_present": req.headers().contains_key("x-athena-key"),
"direct_pg_uri_present": x_jdbc_url(req).is_some(),
"direct_pg_uri_redacted": x_jdbc_url(req)
.and_then(|raw| jdbc_to_postgres_url(&raw))
.map(|value| redact_database_url(&value)),
})
}
/// Derives the externally visible base URL (`scheme://host[/prefix]`) of the
/// request.
///
/// Scheme: `x-forwarded-proto`, else the connection info scheme.
/// Host: `x-forwarded-host`, else `host`, else the connection info host;
/// returns `None` when all of them are blank.
/// Prefix: `x-forwarded-prefix` without trailing slashes, given a leading `/`
/// when it lacks one.
pub fn computed_base_url(req: &HttpRequest) -> Option<String> {
    let scheme = match header_value_trimmed(req, "x-forwarded-proto") {
        Some(forwarded) => forwarded,
        None => req.connection_info().scheme().to_string(),
    };

    let header_host = header_value_trimmed(req, "x-forwarded-host")
        .or_else(|| header_value_trimmed(req, "host"));
    let host = match header_host {
        Some(value) => value,
        None => {
            let fallback = req.connection_info().host().trim().to_string();
            if fallback.is_empty() {
                return None;
            }
            fallback
        }
    };

    let raw_prefix = header_value_trimmed(req, "x-forwarded-prefix").unwrap_or_default();
    let prefix = raw_prefix.trim_end_matches('/');

    let mut base_url = format!("{scheme}://{host}");
    if !prefix.is_empty() {
        if !prefix.starts_with('/') {
            base_url.push('/');
        }
        base_url.push_str(prefix);
    }
    Some(base_url)
}
/// Reads header `name` as a trimmed string.
///
/// Returns `None` when the header is absent, is not representable as a string
/// (`to_str` fails), or is blank after trimming.
pub fn header_value_trimmed(req: &HttpRequest, name: &str) -> Option<String> {
    let raw = req.headers().get(name)?;
    let text = raw.to_str().ok()?.trim();
    if text.is_empty() {
        None
    } else {
        Some(text.to_owned())
    }
}
/// Validates and trims a tenant header value.
///
/// Returns `Ok(None)` for a missing or blank value, `Ok(Some(trimmed))` when
/// every character is an ASCII letter, digit, `.`, `-` or `_`, and `Err` with
/// a descriptive message otherwise.
pub fn normalize_tenant_header_value(value: Option<String>) -> Result<Option<String>, String> {
    let candidate = match value.as_deref().map(str::trim) {
        None | Some("") => return Ok(None),
        Some(text) => text,
    };

    let has_illegal_char = candidate
        .chars()
        .any(|ch| !(ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.'));

    if has_illegal_char {
        Err(format!(
            "invalid tenant header value '{candidate}'; expected letters, numbers, '.', '-', or '_'"
        ))
    } else {
        Ok(Some(candidate.to_owned()))
    }
}
/// Queries the pool for the session's database name, current schema,
/// effective schemas and `search_path` setting.
///
/// # Errors
/// Returns a message when the query does not finish within
/// `DEBUG_PROBE_TIMEOUT`, or the stringified database error when it fails.
/// Columns that cannot be decoded become `None` (or an empty list for the
/// effective schemas) instead of failing the probe.
pub async fn probe_pg_context(pool: &PgPool) -> Result<PgContextProbe, String> {
    let statement = sqlx::query(
        r#"
SELECT
current_database() AS current_database,
current_schema() AS current_schema,
current_schemas(false) AS current_schemas,
current_setting('search_path', true) AS search_path
"#,
    );

    let row = match timeout(DEBUG_PROBE_TIMEOUT, statement.fetch_one(pool)).await {
        Err(_) => {
            return Err(format!(
                "probe timed out after {}s",
                DEBUG_PROBE_TIMEOUT.as_secs()
            ));
        }
        Ok(Err(error)) => return Err(error.to_string()),
        Ok(Ok(row)) => row,
    };

    let current_database: Option<String> = row.try_get("current_database").ok();
    let runtime_current_schema: Option<String> = row.try_get("current_schema").ok();
    let runtime_setting_search_path: Option<String> = row.try_get("search_path").ok();
    let runtime_effective_schemas: Vec<String> =
        row.try_get("current_schemas").unwrap_or_default();

    Ok(PgContextProbe {
        current_database,
        runtime_current_schema,
        runtime_setting_search_path,
        runtime_effective_schemas,
    })
}
/// Probes every spec's runtime relation (and its mapped relation, when one is
/// configured) and returns one JSON row per spec, in input order.
///
/// Probes run one after another; each row carries the spec fields, both probe
/// results and the combined `overall_status`.
pub async fn build_relation_resolution_rows(
    pool: &PgPool,
    effective_schemas: &[String],
    specs: &[RelationResolutionSpec],
) -> Vec<Value> {
    let mut report = Vec::with_capacity(specs.len());

    for spec in specs.iter() {
        let runtime_status = probe_relation_status(
            pool,
            spec.runtime_lookup_resource.as_str(),
            effective_schemas,
        )
        .await;

        let mapped_status = if let Some(mapped_name) = spec.configured_mapped_resource.as_deref() {
            Some(probe_relation_status(pool, mapped_name, effective_schemas).await)
        } else {
            None
        };

        let overall_status = relation_overall_status(
            &runtime_status,
            mapped_status.as_ref(),
            spec.mapping_applied_at_runtime,
        );

        let entry = json!({
            "resource_kind": spec.resource_kind,
            "logical_resource": spec.logical_resource,
            "runtime_lookup_resource": spec.runtime_lookup_resource,
            "configured_mapped_resource": spec.configured_mapped_resource,
            "mapping_applied_at_runtime": spec.mapping_applied_at_runtime,
            "runtime_status": runtime_status,
            "mapped_status": mapped_status,
            "overall_status": overall_status,
            "note": spec.note,
        });
        report.push(entry);
    }

    report
}
/// Looks up the privileged-debug secret in the environment.
///
/// `DEBUG_JWT_SECRET_ENV` is checked first, then `LEGACY_DEBUG_JWT_SECRET_ENV`.
/// The first variable that is set to a non-blank value wins; the result is the
/// trimmed value together with the name of the variable it came from.
fn configured_debug_secret() -> Option<(String, &'static str)> {
    for source in [DEBUG_JWT_SECRET_ENV, LEGACY_DEBUG_JWT_SECRET_ENV] {
        let Ok(raw) = env::var(source) else {
            continue;
        };
        let secret = raw.trim();
        if !secret.is_empty() {
            return Some((secret.to_string(), source));
        }
    }
    None
}
/// Canonicalizes a secret before comparison: surrounding whitespace is
/// removed and every remaining space character becomes `+`.
fn normalize_secret_for_compare(value: &str) -> String {
    value
        .trim()
        .chars()
        .map(|ch| if ch == ' ' { '+' } else { ch })
        .collect()
}
/// Builds a JSON object describing the given environment variables.
///
/// Keys are sorted and de-duplicated. A variable that is set to a non-blank
/// value is reported as `{"present": true, "value": <redacted value>}`; an
/// unset, unreadable or blank variable as `{"present": false}`.
fn build_env_snapshot(keys: &[&str]) -> Value {
    let mut names: Vec<&str> = keys.to_vec();
    names.sort_unstable();
    names.dedup();

    let mut snapshot = Map::new();
    for name in names {
        let populated = env::var(name)
            .ok()
            .filter(|value| !value.trim().is_empty());
        let entry = if let Some(value) = populated {
            json!({
                "present": true,
                "value": redact_env_value(name, &value),
            })
        } else {
            json!({ "present": false })
        };
        snapshot.insert(name.to_string(), entry);
    }
    Value::Object(snapshot)
}
/// Chooses how an environment value is shown in the snapshot.
///
/// The key is matched case-insensitively. A Postgres URL stored under a key
/// containing `uri` has its credentials redacted; a value under a
/// secret-looking key is masked; anything else is returned unchanged.
fn redact_env_value(key: &str, value: &str) -> String {
    let lowered_key = key.to_ascii_lowercase();
    let looks_like_postgres_url = ["postgres://", "postgresql://"]
        .iter()
        .any(|scheme| value.starts_with(*scheme));

    if lowered_key.contains("uri") && looks_like_postgres_url {
        redact_database_url(value)
    } else if is_secret_env_key(&lowered_key) {
        mask_secret(value)
    } else {
        value.to_owned()
    }
}
/// Tells whether a key names a secret.
///
/// The match is case-sensitive; the caller visible in this file passes an
/// already lower-cased key. A key is secret when it equals one of the
/// well-known names, ends in `_token` or `_key`, or contains `secret` or
/// `password`.
fn is_secret_env_key(key: &str) -> bool {
    const EXACT_NAMES: [&str; 3] = ["athena_key_12", "jwt_secret", "athena_debug_jwt_secret"];
    const SUFFIXES: [&str; 2] = ["_token", "_key"];
    const FRAGMENTS: [&str; 2] = ["secret", "password"];

    EXACT_NAMES.contains(&key)
        || SUFFIXES.iter().any(|suffix| key.ends_with(*suffix))
        || FRAGMENTS.iter().any(|fragment| key.contains(*fragment))
}
/// Masks a secret for display, keeping only its first and last four
/// characters (`abcd...wxyz`).
///
/// The value is trimmed first. Values of eight characters or fewer are
/// replaced entirely by `<redacted>`, since showing both ends would reveal
/// all of them.
///
/// Lengths and cut points are measured in characters, not bytes, so a secret
/// containing multi-byte UTF-8 characters cannot trigger a slicing panic.
fn mask_secret(value: &str) -> String {
    const VISIBLE_CHARS: usize = 4;

    let trimmed = value.trim();
    let char_count = trimmed.chars().count();
    if char_count <= VISIBLE_CHARS * 2 {
        return "<redacted>".to_string();
    }

    let prefix: String = trimmed.chars().take(VISIBLE_CHARS).collect();
    let suffix: String = trimmed.chars().skip(char_count - VISIBLE_CHARS).collect();
    format!("{prefix}...{suffix}")
}
/// Collects process-level facts for the privileged debug section: pid,
/// working directory, executable path, start time, uptime, and where the
/// runtime configuration came from.
///
/// Paths are converted lossily; a path that cannot be determined is reported
/// as `null`.
fn build_process_debug(app_state: &AppState) -> Value {
    let lossy = |path: std::path::PathBuf| path.to_string_lossy().into_owned();

    let pid = std::process::id();
    let current_dir = std::env::current_dir().ok().map(lossy);
    let current_exe = std::env::current_exe().ok().map(lossy);
    let uptime_seconds = app_state.process_started_at.elapsed().as_secs_f64();

    json!({
        "pid": pid,
        "current_dir": current_dir,
        "current_exe": current_exe,
        "process_start_time_seconds": app_state.process_start_time_seconds,
        "uptime_seconds": uptime_seconds,
        "runtime_config_path": app_state.runtime_config_path.clone(),
        "runtime_config_source_label": app_state.runtime_config_source_label.clone(),
        "runtime_config_seeded_default": app_state.runtime_config_seeded_default,
    })
}
/// Returns `database_url` with its credentials replaced by `***`.
///
/// Redacted parts:
/// - the userinfo username and password, when present;
/// - the values of query parameters named `user` or containing `password`
///   (matched case-insensitively), because libpq and JDBC style URLs may
///   carry credentials there instead of in the userinfo part.
///
/// Other query parameters are left exactly as written. Input that does not
/// parse as a URL yields the placeholder `<invalid_database_url>`.
fn redact_database_url(database_url: &str) -> String {
    // Query parameter names whose values are credentials.
    fn is_credential_param(name: &str) -> bool {
        let name = name.to_ascii_lowercase();
        name == "user" || name.contains("password")
    }

    let Ok(mut parsed) = Url::parse(database_url) else {
        return "<invalid_database_url>".to_string();
    };

    // Setter results are deliberately ignored: redaction is best-effort.
    if !parsed.username().is_empty() {
        let _ = parsed.set_username("***");
    }
    if parsed.password().is_some() {
        let _ = parsed.set_password(Some("***"));
    }

    // Rebuild the query only when a credential parameter was actually found,
    // so URLs without one are emitted exactly as before.
    let masked_query = parsed.query().and_then(|query| {
        let mut redacted_any = false;
        let rebuilt = query
            .split('&')
            .map(|pair| match pair.split_once('=') {
                Some((name, _)) if is_credential_param(name) => {
                    redacted_any = true;
                    format!("{name}=***")
                }
                _ => pair.to_string(),
            })
            .collect::<Vec<_>>()
            .join("&");
        redacted_any.then_some(rebuilt)
    });
    if let Some(query) = masked_query {
        parsed.set_query(Some(&query));
    }

    parsed.to_string()
}
/// Resolves `lookup_name` with `to_regclass` and describes the relation found.
///
/// The returned JSON object has `status` set to:
/// - `probe_failed` when the query times out or errors (with an `error` text);
/// - `missing` when no relation resolves;
/// - `ok` when it resolves, together with its schema, name, qualified name,
///   relation type, and whether its schema is listed in `effective_schemas`.
async fn probe_relation_status(
    pool: &PgPool,
    lookup_name: &str,
    effective_schemas: &[String],
) -> Value {
    let lookup = sqlx::query(
        r#"
SELECT
n.nspname AS schema_name,
c.relname AS relation_name,
CASE c.relkind
WHEN 'r' THEN 'BASE TABLE'
WHEN 'p' THEN 'PARTITIONED TABLE'
WHEN 'v' THEN 'VIEW'
WHEN 'm' THEN 'MATERIALIZED VIEW'
WHEN 'f' THEN 'FOREIGN TABLE'
ELSE c.relkind::text
END AS relation_type
FROM pg_class c
INNER JOIN pg_namespace n
ON n.oid = c.relnamespace
WHERE c.oid = to_regclass($1)
"#,
    )
    .bind(lookup_name)
    .fetch_optional(pool);

    let outcome = timeout(DEBUG_PROBE_TIMEOUT, lookup).await;

    // Timeout and database errors share one result shape.
    let probe_failed = |error: String| {
        json!({
            "status": "probe_failed",
            "lookup_name": lookup_name,
            "error": error,
        })
    };

    let row = match outcome {
        Err(_) => {
            return probe_failed(format!(
                "probe timed out after {}s",
                DEBUG_PROBE_TIMEOUT.as_secs()
            ));
        }
        Ok(Err(error)) => return probe_failed(error.to_string()),
        Ok(Ok(None)) => {
            return json!({
                "status": "missing",
                "lookup_name": lookup_name,
                "found": false,
            });
        }
        Ok(Ok(Some(row))) => row,
    };

    let schema_name: Option<String> = row.try_get("schema_name").ok();
    let relation_name: Option<String> = row.try_get("relation_name").ok();
    let qualified_name = schema_name
        .as_deref()
        .zip(relation_name.as_deref())
        .map(|(schema, relation)| format!("{schema}.{relation}"));
    let visible_on_search_path = schema_name
        .as_ref()
        .map(|schema| effective_schemas.contains(schema));
    let relation_type = row.try_get::<String, _>("relation_type").ok();

    json!({
        "status": "ok",
        "lookup_name": lookup_name,
        "found": true,
        "schema_name": schema_name,
        "relation_name": relation_name,
        "qualified_name": qualified_name,
        "relation_type": relation_type,
        "visible_on_search_path": visible_on_search_path,
    })
}
/// Combines the runtime probe and the optional mapped probe into one verdict.
///
/// Checked in order:
/// 1. runtime probe failed -> `probe_failed`; runtime relation missing -> `missing_runtime`;
/// 2. no mapped probe -> `ok`;
/// 3. mapped probe failed -> `probe_failed`; mapped relation missing ->
///    `missing_mapped` (mapping applied) or `diagnostic_only_missing`;
/// 4. mapped qualified name present and different from the runtime one ->
///    `mismatch` (mapping applied) or `diagnostic_only`;
/// 5. otherwise `ok`.
fn relation_overall_status(
    runtime_status: &Value,
    mapped_status: Option<&Value>,
    mapping_applied_at_runtime: bool,
) -> &'static str {
    // Reads a string field from a probe result object.
    fn text_field<'a>(status: &'a Value, field: &str) -> Option<&'a str> {
        status.get(field).and_then(Value::as_str)
    }

    match text_field(runtime_status, "status") {
        Some("probe_failed") => return "probe_failed",
        Some("missing") => return "missing_runtime",
        _ => {}
    }

    let Some(mapped_status) = mapped_status else {
        return "ok";
    };

    match text_field(mapped_status, "status") {
        Some("probe_failed") => return "probe_failed",
        Some("missing") => {
            return if mapping_applied_at_runtime {
                "missing_mapped"
            } else {
                "diagnostic_only_missing"
            };
        }
        _ => {}
    }

    let runtime_qualified = text_field(runtime_status, "qualified_name").unwrap_or_default();
    let mapped_qualified = text_field(mapped_status, "qualified_name").unwrap_or_default();

    if mapped_qualified.is_empty() || runtime_qualified == mapped_qualified {
        return "ok";
    }
    if mapping_applied_at_runtime {
        "mismatch"
    } else {
        "diagnostic_only"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Surrounding whitespace is stripped from an otherwise valid tenant header.
    #[test]
    fn normalizes_tenant_header_values() {
        let input = Some(String::from(" reporting.v2 "));
        let normalized =
            normalize_tenant_header_value(input).expect("valid header should normalize");
        assert_eq!(normalized, Some(String::from("reporting.v2")));
    }

    /// A character outside the allowed set is rejected with a descriptive error.
    #[test]
    fn rejects_invalid_tenant_header_values() {
        let input = Some(String::from("reporting/v2"));
        let error = normalize_tenant_header_value(input).expect_err("invalid header should fail");
        assert!(error.contains("invalid tenant header value"));
    }

    /// Without a query secret, access is denied whether or not the server has
    /// a secret configured in its environment.
    #[test]
    fn query_secret_access_is_safe_by_default() {
        let access = evaluate_privileged_debug_access(None);
        assert!(!access.enabled);
        assert!(matches!(
            access.reason,
            "server_jwt_secret_unavailable" | "jwt_secret_query_missing"
        ));
    }
}