use actix_web::{
HttpRequest, HttpResponse, Responder, put,
web::{self, Data, Json, Path},
};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value, json};
use sqlx::postgres::PgPool;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::provision::helpers::{normalize_public_host, normalize_route_binding_key};
use crate::api::response::{api_success, bad_request, internal_error, not_found};
use crate::athena::postgres_clients::{
configured_catalog_postgres_uri, postgres_uri_targets_loopback_host,
};
use crate::data::clients::{
AthenaClientRecord, SaveAthenaClientParams, get_athena_client_by_name, upsert_athena_client,
};
use crate::data::public_routes::{
PatchPublicGatewayRouteParams, PublicGatewayRouteRecord, SavePublicGatewayRouteParams,
create_public_gateway_route, get_public_gateway_route_by_key, patch_public_gateway_route,
};
use crate::provisioning::{
build_catalog_public_proxy_binding, check_dns_host, merge_catalog_public_proxy_metadata,
postgres_uri_port, private_catalog_pg_uri, wildcard_host_pattern_from_env,
wildcard_public_host_for_route_key,
};
use super::{
apply_client_record_to_runtime, client_catalog_pool, database_error_response,
ensure_public_route_client_eligible, normalize_allowed_ops, normalize_client_name,
normalize_public_route_target_url,
};
/// JSON body accepted by `PUT /admin/tenant-hostnames/{tenant}`.
///
/// Every field may be omitted. Missing fields deserialize to `None`, except
/// `persist_in_catalog`, which deserializes to `true`.
#[derive(Debug, Deserialize)]
struct UpsertTenantHostnameRequest {
/// Client to bind the tenant to; the handler falls back to the normalized
/// tenant key from the path when this is omitted.
#[serde(default)]
client_name: Option<String>,
/// Operations allowed on the HTTP route; the handler substitutes `["fetch"]`
/// when omitted. Only read when the HTTP route is enabled.
#[serde(default)]
allowed_ops: Option<Vec<String>>,
/// Whether to upsert the public HTTP route; treated as `true` when omitted.
#[serde(default)]
enable_http_route: Option<bool>,
/// Whether to build the Postgres binding; when omitted it is enabled only if
/// no `target_url` remains after normalization.
#[serde(default)]
enable_postgres_binding: Option<bool>,
/// Optional target URL for the HTTP route, passed through
/// `normalize_public_route_target_url` before use.
#[serde(default)]
target_url: Option<String>,
/// Explicit public host; when omitted the host is derived from the wildcard
/// pattern for the tenant key.
#[serde(default)]
public_host: Option<String>,
/// Public port for the Postgres binding; when omitted the handler uses the
/// port of the source Postgres URI, then 5432.
#[serde(default)]
public_port: Option<u16>,
/// Metadata overlay for the HTTP route; must be a JSON object (or null/absent).
#[serde(default)]
route_metadata: Option<Value>,
/// Whether the Postgres binding is written back to the client catalog.
#[serde(default = "default_true")]
persist_in_catalog: bool,
}
/// Payload returned by a successful tenant hostname upsert.
#[derive(Debug, Serialize)]
struct UpsertTenantHostnameResponse {
/// Normalized route-binding key taken from the request path.
tenant: String,
/// Public host used for the tenant: the normalized explicit `public_host`, or
/// the host derived from the wildcard pattern.
derived_host: String,
/// The stored public route; `None` when the HTTP route was not enabled.
http_route: Option<PublicGatewayRouteRecord>,
/// The Postgres binding result; `None` when the binding was not enabled.
postgres_binding: Option<TenantHostnamePostgresBindingResponse>,
/// Value returned by `wildcard_host_pattern_from_env()` at request time.
wildcard_pattern: String,
}
/// Outcome of building (and optionally persisting) a tenant's Postgres binding.
#[derive(Debug, Serialize)]
struct TenantHostnamePostgresBindingResponse {
/// `public_pg_uri` of the binding produced by `build_catalog_public_proxy_binding`.
public_pg_uri: String,
/// The binding document produced by `build_catalog_public_proxy_binding`.
binding: Value,
/// Result of `check_dns_host` for the binding's `public.host`; `None` when the
/// binding document carries no such string.
dns: Option<Value>,
/// Echo of the request's `persist_in_catalog` flag. It is `true` whenever
/// persistence was requested, even if the stored record was already up to date.
persisted_in_catalog: bool,
}
/// Serde default for opt-out flags: a field that is absent from the request
/// body deserializes as enabled.
fn default_true() -> bool {
    // Referenced by `#[serde(default = "default_true")]` on `persist_in_catalog`.
    true
}
/// Operations granted to an HTTP route when the request supplies neither
/// `allowed_ops` nor a `target_url`: just `fetch`.
fn default_allowed_ops() -> Vec<String> {
    vec![String::from("fetch")]
}
/// Operations granted to an HTTP route that has a `target_url` when the
/// request does not list `allowed_ops`: just `fetch`.
fn default_target_url_allowed_ops() -> Vec<String> {
    Vec::from(["fetch".to_owned()])
}
/// Validates an optional `route_metadata` value.
///
/// Returns `Ok(None)` when the value is absent or JSON `null`, `Ok(Some(_))`
/// when it is a JSON object, and a 400-style response for any other JSON type.
fn normalize_optional_route_metadata(
    value: Option<&Value>,
) -> Result<Option<&Value>, HttpResponse> {
    // Absent and explicit null both mean "no metadata supplied".
    let Some(candidate) = value else {
        return Ok(None);
    };
    if candidate.is_null() {
        return Ok(None);
    }
    if candidate.is_object() {
        return Ok(Some(candidate));
    }
    Err(bad_request(
        "Invalid route metadata",
        "'route_metadata' must be a JSON object when provided.",
    ))
}
fn merge_route_metadata(
base: Option<&Value>,
patch: Option<&Value>,
) -> Result<Value, HttpResponse> {
let patch = normalize_optional_route_metadata(patch)?;
if patch.is_none() {
return Ok(base.cloned().unwrap_or_else(|| json!({})));
}
let mut merged: Map<String, Value> =
base.and_then(Value::as_object).cloned().unwrap_or_default();
if let Some(Value::Object(patch_obj)) = patch {
for (key, value) in patch_obj {
merged.insert(key.clone(), value.clone());
}
}
Ok(Value::Object(merged))
}
/// Picks the public host for a tenant.
///
/// An explicit host wins and must pass `normalize_public_host`; without one
/// the host is derived from the wildcard pattern via
/// `wildcard_public_host_for_route_key`. Either failure becomes a 400-style
/// response.
fn resolve_tenant_public_host(
    tenant: &str,
    explicit_public_host: Option<&str>,
) -> Result<String, HttpResponse> {
    match explicit_public_host {
        Some(requested) => match normalize_public_host(requested) {
            Some(host) => Ok(host),
            None => Err(bad_request(
                "Invalid public host",
                "Provide a valid 'public_host' or omit it to derive from the wildcard pattern.",
            )),
        },
        None => wildcard_public_host_for_route_key(tenant)
            .map_err(|reason| bad_request("Invalid wildcard public host", reason)),
    }
}
/// Reports whether the client's stored `pg_uri` should be replaced by the
/// public binding URI.
///
/// True when either the configured catalog URI or the private URI recorded in
/// the client's metadata targets a loopback host.
fn client_should_promote_public_pg_uri(record: &AthenaClientRecord) -> bool {
    let targets_loopback = |uri: Option<String>| {
        uri.as_deref()
            .is_some_and(postgres_uri_targets_loopback_host)
    };
    // `||` keeps the metadata lookup lazy: it only runs when the configured
    // URI is missing or not loopback.
    targets_loopback(configured_catalog_postgres_uri(record))
        || targets_loopback(private_catalog_pg_uri(&record.metadata))
}
/// Returns the Postgres URI the binding should proxy to: the private URI from
/// the client's metadata when present, otherwise the configured catalog URI.
fn resolve_source_pg_uri(record: &AthenaClientRecord) -> Option<String> {
    if let Some(private_uri) = private_catalog_pg_uri(&record.metadata) {
        return Some(private_uri);
    }
    configured_catalog_postgres_uri(record)
}
/// Loads the catalog record for `client_name` after checking that the client
/// is eligible for public routes.
///
/// # Errors
/// * whatever response `ensure_public_route_client_eligible` rejects with;
/// * a database error response when the lookup fails;
/// * a not-found response when no record exists under that name.
async fn load_eligible_client_record(
    pool: &PgPool,
    client_name: &str,
) -> Result<AthenaClientRecord, HttpResponse> {
    ensure_public_route_client_eligible(pool, client_name).await?;

    let lookup = get_athena_client_by_name(pool, client_name)
        .await
        .map_err(|err| database_error_response("Failed to load Athena client", err))?;

    lookup.ok_or_else(|| {
        not_found(
            "Athena client not found",
            format!("No client exists for '{}'.", client_name),
        )
    })
}
/// Creates or updates the public gateway route keyed by `tenant`.
///
/// * No existing route: a new active route is created, with metadata built
///   from the patch alone.
/// * Existing route that already matches the requested client, target URL,
///   allowed ops and merged metadata, and is active: returned unchanged
///   without a write.
/// * Otherwise the route is patched and re-activated.
///
/// # Errors
/// Returns a validation response when `route_metadata_patch` is not an object,
/// a database error response when a query fails, and a not-found response when
/// the patch reports that the route no longer exists.
async fn upsert_tenant_public_route(
    pool: &PgPool,
    tenant: &str,
    client_name: &str,
    target_url: Option<String>,
    allowed_ops: Vec<String>,
    route_metadata_patch: Option<&Value>,
) -> Result<PublicGatewayRouteRecord, HttpResponse> {
    let lookup = get_public_gateway_route_by_key(pool, tenant)
        .await
        .map_err(|err| database_error_response("Failed to load public route", err))?;

    let Some(current) = lookup else {
        // First time this tenant key is seen: create the route.
        let metadata = merge_route_metadata(None, route_metadata_patch)?;
        let params = SavePublicGatewayRouteParams {
            route_key: tenant.to_string(),
            client_name: client_name.to_string(),
            target_url,
            allowed_ops,
            is_active: true,
            metadata,
        };
        return create_public_gateway_route(pool, params)
            .await
            .map_err(|err| database_error_response("Failed to create public route", err));
    };

    let next_metadata = merge_route_metadata(Some(&current.metadata), route_metadata_patch)?;

    // Skip the write entirely when the stored route already matches the request.
    let already_current = current.is_active
        && current.client_name == client_name
        && current.target_url == target_url
        && current.allowed_ops == allowed_ops
        && current.metadata == next_metadata;
    if already_current {
        return Ok(current);
    }

    // NOTE(review): `target_url` is forwarded as a plain `Option`. If the patch
    // helper treats `None` as "leave unchanged", an existing target URL cannot
    // be cleared through this path — confirm against `patch_public_gateway_route`.
    let changes = PatchPublicGatewayRouteParams {
        client_name: Some(client_name.to_string()),
        target_url,
        allowed_ops: Some(allowed_ops),
        is_active: Some(true),
        metadata: Some(next_metadata),
    };
    let patched = patch_public_gateway_route(pool, tenant, changes)
        .await
        .map_err(|err| database_error_response("Failed to update public route", err))?;

    patched.ok_or_else(|| {
        not_found(
            "Public route not found",
            format!("No public route '{}'.", tenant),
        )
    })
}
/// Builds the public Postgres proxy binding for a tenant and, when requested,
/// persists it on the client's catalog record.
///
/// Steps, in order:
/// 1. Resolve the source Postgres URI for the client.
/// 2. Pick the public port: the requested one, else the source URI's port,
///    else 5432.
/// 3. Build the binding and run a DNS check on its `public.host`, if present.
/// 4. If `persist_in_catalog` is set, merge the binding into the client's
///    metadata, write the record only when metadata or `pg_uri` changed, and
///    refresh the runtime with the resulting record.
///
/// # Errors
/// Returns an internal-error response when no source URI exists, when the
/// metadata merge fails, or when the runtime refresh fails; a bad-request
/// response when the binding cannot be built; and a database error response
/// when the catalog write fails.
async fn upsert_tenant_postgres_binding(
    state: &AppState,
    pool: &PgPool,
    client_record: &AthenaClientRecord,
    tenant: &str,
    derived_host: &str,
    requested_public_port: Option<u16>,
    persist_in_catalog: bool,
) -> Result<TenantHostnamePostgresBindingResponse, HttpResponse> {
    let Some(source_pg_uri) = resolve_source_pg_uri(client_record) else {
        return Err(internal_error(
            "Client URI unavailable",
            format!(
                "No source Postgres URI is available for client '{}'.",
                client_record.client_name
            ),
        ));
    };

    let public_port = match requested_public_port {
        Some(port) => port,
        None => postgres_uri_port(&source_pg_uri).unwrap_or(5432),
    };

    let binding = match build_catalog_public_proxy_binding(
        &client_record.client_name,
        tenant,
        &source_pg_uri,
        derived_host,
        public_port,
    ) {
        Ok(built) => built,
        Err(err) => return Err(bad_request("Invalid Postgres URI", err)),
    };

    // Copy the host out so no borrow of `binding` is held across the await.
    let public_host = binding
        .binding
        .get("public")
        .and_then(|public| public.get("host"))
        .and_then(Value::as_str)
        .map(str::to_owned);
    let dns = if let Some(host) = public_host {
        Some(json!(check_dns_host(&host).await))
    } else {
        None
    };

    if persist_in_catalog {
        let next_metadata = merge_catalog_public_proxy_metadata(
            client_record.metadata.clone(),
            &source_pg_uri,
            &binding,
        )
        .map_err(|err| internal_error("Failed to persist route binding metadata", err))?;

        let next_pg_uri = if client_should_promote_public_pg_uri(client_record) {
            Some(binding.public_pg_uri.clone())
        } else {
            client_record.pg_uri.clone()
        };

        let unchanged =
            client_record.metadata == next_metadata && client_record.pg_uri == next_pg_uri;

        // Only touch the catalog when something actually changed; either way
        // the runtime is refreshed with the effective record.
        let saved;
        let effective: &AthenaClientRecord = if unchanged {
            client_record
        } else {
            saved = upsert_athena_client(
                pool,
                SaveAthenaClientParams {
                    client_name: client_record.client_name.clone(),
                    description: client_record.description.clone(),
                    pg_uri: next_pg_uri,
                    pg_uri_env_var: client_record.pg_uri_env_var.clone(),
                    config_uri_template: client_record.config_uri_template.clone(),
                    source: client_record.source.clone(),
                    is_active: client_record.is_active,
                    is_frozen: client_record.is_frozen,
                    metadata: next_metadata,
                },
            )
            .await
            .map_err(|err| {
                database_error_response("Failed to persist route binding metadata", err)
            })?;
            &saved
        };

        apply_client_record_to_runtime(state, effective)
            .await
            .map_err(|err| internal_error("Failed to refresh Athena client", err.to_string()))?;
    }

    Ok(TenantHostnamePostgresBindingResponse {
        public_pg_uri: binding.public_pg_uri,
        binding: binding.binding,
        dns,
        persisted_in_catalog: persist_in_catalog,
    })
}
#[put("/admin/tenant-hostnames/{tenant}")]
async fn admin_upsert_tenant_hostname(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<String>,
body: Json<UpsertTenantHostnameRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let target_url = match normalize_public_route_target_url(body.target_url.as_deref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let enable_http_route = body.enable_http_route.unwrap_or(true);
let enable_postgres_binding = body.enable_postgres_binding.unwrap_or(target_url.is_none());
if !enable_http_route && !enable_postgres_binding {
return bad_request(
"Nothing to do",
"Enable at least one of 'enable_http_route' or 'enable_postgres_binding'.",
);
}
let tenant = match normalize_route_binding_key(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let client_name = match normalize_client_name(body.client_name.as_deref().unwrap_or(&tenant)) {
Ok(value) => value,
Err(resp) => return resp,
};
let derived_host = match resolve_tenant_public_host(&tenant, body.public_host.as_deref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let wildcard_pattern = wildcard_host_pattern_from_env();
let pool = match client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let needs_local_client = enable_postgres_binding || (enable_http_route && target_url.is_none());
let client_record = if needs_local_client {
match load_eligible_client_record(&pool, &client_name).await {
Ok(record) => Some(record),
Err(resp) => return resp,
}
} else {
None
};
let http_route = if enable_http_route {
let requested_ops = body.allowed_ops.clone().unwrap_or_else(|| {
if target_url.is_some() {
default_target_url_allowed_ops()
} else {
default_allowed_ops()
}
});
let allowed_ops = match normalize_allowed_ops(&requested_ops) {
Ok(value) => value,
Err(resp) => return resp,
};
match upsert_tenant_public_route(
&pool,
&tenant,
&client_name,
target_url.clone(),
allowed_ops,
body.route_metadata.as_ref(),
)
.await
{
Ok(route) => Some(route),
Err(resp) => return resp,
}
} else {
None
};
let postgres_binding = if enable_postgres_binding {
let Some(client_record) = client_record.as_ref() else {
return internal_error(
"Client URI unavailable",
"PostgreSQL binding requires a locally configured Athena client.",
);
};
match upsert_tenant_postgres_binding(
app_state.get_ref(),
&pool,
client_record,
&tenant,
&derived_host,
body.public_port,
body.persist_in_catalog,
)
.await
{
Ok(binding) => Some(binding),
Err(resp) => return resp,
}
} else {
None
};
api_success(
"Upserted tenant hostname",
json!(UpsertTenantHostnameResponse {
tenant,
derived_host,
http_route,
postgres_binding,
wildcard_pattern,
}),
)
}
#[cfg(test)]
mod tests {
    use super::{
        AthenaClientRecord, client_should_promote_public_pg_uri, default_allowed_ops,
        merge_route_metadata,
    };
    use chrono::Utc;
    use serde_json::{Value, json};

    /// Builds an active, unfrozen, database-sourced client record named
    /// `tenant` with the given `pg_uri` and metadata.
    fn client_fixture(pg_uri: Option<&str>, metadata: Value) -> AthenaClientRecord {
        AthenaClientRecord {
            id: String::from("id"),
            client_name: String::from("tenant"),
            description: Some(String::from("Tenant")),
            pg_uri: pg_uri.map(str::to_string),
            pg_uri_env_var: None,
            config_uri_template: None,
            source: String::from("database"),
            is_active: true,
            is_frozen: false,
            last_synced_from_config_at: None,
            last_seen_at: None,
            metadata,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            deleted_at: None,
        }
    }

    #[test]
    fn default_allowed_ops_match_expected_shape() {
        let expected = vec!["fetch"];
        assert_eq!(default_allowed_ops(), expected);
    }

    #[test]
    fn merge_route_metadata_overlays_object_fields() {
        let base = json!({
            "managed_by": "admin_api",
            "existing": true
        });
        let patch = json!({
            "existing": false,
            "tenant": "acme"
        });

        let merged = merge_route_metadata(Some(&base), Some(&patch)).expect("metadata");

        // Patch keys win; untouched base keys survive.
        let expected = json!({
            "managed_by": "admin_api",
            "existing": false,
            "tenant": "acme"
        });
        assert_eq!(merged, expected);
    }

    #[test]
    fn merge_route_metadata_rejects_non_object_patch() {
        let base = json!({});
        let patch = json!(["bad"]);

        let rejection = merge_route_metadata(Some(&base), Some(&patch))
            .expect_err("expected validation error");

        assert_eq!(
            rejection.status(),
            actix_web::http::StatusCode::BAD_REQUEST
        );
    }

    #[test]
    fn client_promotes_public_pg_uri_for_private_loopback_metadata() {
        let metadata = json!({
            "network": {
                "private_pg_uri": "postgres://athena:secret@127.0.0.1:5432/tenant"
            }
        });
        let client = client_fixture(None, metadata);

        assert!(client_should_promote_public_pg_uri(&client));
    }
}
/// Registers this module's admin endpoint
/// (`PUT /admin/tenant-hostnames/{tenant}`) on the given service config.
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(admin_upsert_tenant_hostname);
}