use actix_web::{
HttpRequest, HttpResponse, delete, get, post,
web::{self, Data, Json, Path, Query},
};
use athena_schema_heal::{
SchemaHealAuditConfig, SchemaHealContext, SchemaHealPlan, SchemaHealer, SchemaMutation,
};
use serde_json::{Map, Value, json};
use sqlx::postgres::PgPool;
use sqlx::{Pool, Postgres, Row};
use std::collections::HashMap;
use std::time::Instant;
use tracing::{error, info};
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
api_created, api_success, bad_request, internal_error, not_found, service_unavailable,
};
use crate::data::clients::AthenaClientRecord;
use crate::data::clients::{
SaveAthenaClientParams, delete_athena_client, get_athena_client_by_name, list_athena_clients,
upsert_athena_client,
};
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, RegisteredClient};
use crate::provisioning::{
CatalogPublicProxyBinding, DockerContainerStatus, DockerManagedContainer,
InstallLocalProvisionDependenciesParams, LocalClusterCreateDatabaseParams,
LocalClusterDatabaseCreateOptions, ProvisioningError, SpinUpPostgresParams,
SpinUpPostgresResult, build_catalog_public_proxy_binding, check_dns_host,
create_postgres_database, inspect_container, inspect_local_provisioning_dependencies,
install_local_provisioning_dependencies, list_managed_postgres_containers,
list_postgres_databases, managed_base_domain_from_pattern, merge_catalog_public_proxy_metadata,
postgres_major_version_from_image, remove_container, replace_uri_database_name,
run_provision_sql, runtime_config_from_root, spin_up_postgres_instance, start_container,
stop_container, summarize_provisioning_metadata, wildcard_host_pattern_from_env,
wildcard_public_host_for_route_key,
};
mod connection_uri;
mod container_resolution;
pub(crate) mod helpers;
mod monitoring;
use self::connection_uri::resolve_client_connection_uri;
use self::container_resolution::{resolve_instance_client_name, resolve_status_client_name_alias};
#[cfg(test)]
use self::helpers::normalize_public_host;
pub(crate) use self::helpers::register_provisioned_client;
use self::helpers::{
bool_or_default, client_catalog_pool, ensure_runtime_client_pool,
load_athena_managed_databases, local_cluster_target, local_mode_as_str, map_provisioning_error,
masked_pg_uri, normalize_route_binding_key, now_ms, pipeline_error_response,
provisioning_error_kind, provisioning_error_message, reconnect_runtime_client,
resolve_managed_catalog_public_proxy, resolve_public_host, resolve_uri, set_pipeline_step,
};
use self::monitoring::{build_provisioning_monitor_entries, summarize_provisioning_monitors};
#[cfg(test)]
use crate::provisioning::ManagedCatalogPublicProxyRequest;
#[cfg(test)]
use crate::provisioning::private_catalog_pg_uri;
use crate::provisioning::{
BindInstancePortRouteRequest, DeleteInstanceRequest, InstallProvisionDependenciesRequest,
InstanceLifecycleRequest, InstanceStatusQuery, LocalClusterCreateDatabaseRequest,
LocalClusterDatabasesQuery, LocalProvisionAdvancedOptions, LocalProvisionMode,
LocalProvisionPipelineRequest, ProvisionRequest, ProvisionStatusQuery, SpinUpInstanceRequest,
};
fn managed_cluster_domain() -> String {
managed_base_domain_from_pattern(&wildcard_host_pattern_from_env())
.unwrap_or_else(|| athena_dns::DEFAULT_MANAGED_BASE_DOMAIN.to_string())
}
fn default_managed_postgres_description() -> String {
format!(
"Managed Postgres instance ({})",
athena_dns::DEFAULT_MANAGED_CLUSTER_NAME
)
}
fn default_shared_cluster_description() -> String {
format!(
"Shared cluster database ({})",
athena_dns::DEFAULT_MANAGED_CLUSTER_NAME
)
}
async fn binding_dns_status(binding: Option<&CatalogPublicProxyBinding>) -> Option<Value> {
let host = binding
.and_then(|value| value.binding.get("public"))
.and_then(|value| value.get("host"))
.and_then(Value::as_str)?;
Some(json!(check_dns_host(host).await))
}
fn deferred_queue_schema_heal_plan() -> SchemaHealPlan {
let context: SchemaHealContext = SchemaHealContext::new("athena_logging", "public")
.with_object_type("deferred_queue_schema_self_heal")
.with_table_name("gateway_deferred_request_queue");
SchemaHealPlan::new("deferred_queue_runtime_storage", context)
.with_mutation(SchemaMutation::ensure_table(
"ensure_gateway_deferred_request_queue_table",
"public",
"gateway_deferred_request_queue",
r#"
CREATE TABLE IF NOT EXISTS public.gateway_deferred_request_queue (
request_id text PRIMARY KEY,
method text NOT NULL,
route text NOT NULL,
client_name text,
request_bytes bigint,
payload jsonb NOT NULL,
status text NOT NULL DEFAULT 'queued',
deferred_reason text,
attempts integer NOT NULL DEFAULT 0,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
next_attempt_at timestamptz,
completed_at timestamptz,
error_message text,
result_summary jsonb
)
"#,
))
.with_mutation(SchemaMutation::ensure_column(
"ensure_gateway_deferred_request_queue_next_attempt_at",
"public",
"gateway_deferred_request_queue",
"next_attempt_at",
"ALTER TABLE public.gateway_deferred_request_queue ADD COLUMN next_attempt_at timestamptz",
))
.with_mutation(SchemaMutation::ensure_column(
"ensure_gateway_deferred_request_queue_result_summary",
"public",
"gateway_deferred_request_queue",
"result_summary",
"ALTER TABLE public.gateway_deferred_request_queue ADD COLUMN result_summary jsonb",
))
.with_mutation(SchemaMutation::ensure_index(
"ensure_idx_gateway_deferred_request_queue_status",
"public",
"gateway_deferred_request_queue",
"idx_gateway_deferred_request_queue_status",
"CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_status ON public.gateway_deferred_request_queue(status)",
))
.with_mutation(SchemaMutation::ensure_index(
"ensure_idx_gateway_deferred_request_queue_created_at",
"public",
"gateway_deferred_request_queue",
"idx_gateway_deferred_request_queue_created_at",
"CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_created_at ON public.gateway_deferred_request_queue(created_at DESC)",
))
.with_mutation(SchemaMutation::ensure_index(
"ensure_idx_gateway_deferred_request_queue_status_created",
"public",
"gateway_deferred_request_queue",
"idx_gateway_deferred_request_queue_status_created",
"CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_status_created ON public.gateway_deferred_request_queue(status, created_at ASC)",
))
.with_mutation(SchemaMutation::ensure_index(
"ensure_idx_gateway_deferred_request_queue_completed_at",
"public",
"gateway_deferred_request_queue",
"idx_gateway_deferred_request_queue_completed_at",
"CREATE INDEX IF NOT EXISTS idx_gateway_deferred_request_queue_completed_at ON public.gateway_deferred_request_queue(completed_at DESC) WHERE status IN ('completed', 'dead_letter')",
))
}
pub(crate) async fn ensure_deferred_queue_table_initialized(pool: &PgPool) -> Result<bool, String> {
let report = SchemaHealer::new(pool)
.with_audit_config(SchemaHealAuditConfig {
enabled: true,
actor: "athena_deferred_runtime".to_string(),
})
.apply_plan(&deferred_queue_schema_heal_plan())
.await
.map_err(|err| format!("failed to self-heal deferred queue table: {err}"))?;
if report.applied_count() > 0 {
info!(
plan = %report.plan_name,
applied = report.applied_count(),
skipped = report.skipped_count(),
"Applied deferred queue runtime schema self-heal mutations"
);
}
Ok(report.applied_count() > 0)
}
#[post("/admin/provision")]
pub async fn admin_provision(
req: HttpRequest,
state: Data<AppState>,
body: Json<ProvisionRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pg_uri: String = match resolve_uri(state.get_ref(), &body) {
Ok(uri) => uri,
Err(resp) => return resp,
};
let statement_count: usize = match run_provision_sql(&pg_uri).await {
Ok(n) => n,
Err(err) => return map_provisioning_error("Provisioning failed", err),
};
let defaults = runtime_config_from_root();
let registered_name: Option<String> = if body.register && body.uri.is_some() {
let name: String = body
.register_name
.clone()
.unwrap_or_else(|| "provisioned".to_string());
let target = ClientConnectionTarget {
client_name: name.clone(),
source: "database".to_string(),
description: Some("Provisioned via API".to_string()),
pg_uri: Some(pg_uri.clone()),
pg_uri_env_var: None,
config_uri_template: None,
is_active: true,
is_frozen: false,
};
if let Err(err) = state.pg_registry.upsert_client(target).await {
tracing::warn!(
client = %name,
error = %err,
"provisioned but failed to register client"
);
}
Some(name)
} else {
None
};
let client_label = body
.client_name
.clone()
.or(registered_name.clone())
.unwrap_or_else(|| "direct".to_string());
api_success(
format!("Provisioned {} statements", statement_count),
json!({
"statements_executed": statement_count,
"client": client_label,
"registered": registered_name,
"tables": defaults.expected_tables,
}),
)
}
#[get("/admin/provision/status")]
pub async fn admin_provision_status(
req: HttpRequest,
state: Data<AppState>,
query: Query<ProvisionStatusQuery>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let requested_client_name = query.client_name.trim().to_string();
if requested_client_name.is_empty() {
return bad_request("Missing required field", "'client_name' must not be empty.");
}
let resolved_client_name =
resolve_status_client_name_alias(state.get_ref(), &requested_client_name).await;
let pool = match ensure_runtime_client_pool(state.get_ref(), &resolved_client_name).await {
Ok(value) => value,
Err(resp) => return resp,
};
let rows = match sqlx::query(
"SELECT table_name::text FROM information_schema.tables WHERE table_schema = 'public'",
)
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(err) => return internal_error("Failed to query tables", err.to_string()),
};
let existing_tables: Vec<String> = rows
.iter()
.filter_map(|row| row.try_get::<String, _>("table_name").ok())
.collect();
let defaults = runtime_config_from_root();
let present: Vec<String> = defaults
.expected_tables
.iter()
.filter(|table| existing_tables.iter().any(|existing| existing == *table))
.cloned()
.collect();
let missing: Vec<String> = defaults
.expected_tables
.iter()
.filter(|table| !existing_tables.iter().any(|existing| existing == *table))
.cloned()
.collect();
api_success(
format!("Provisioning status for '{}'", resolved_client_name),
json!({
"client_name": resolved_client_name,
"requested_client_name": requested_client_name,
"provisioned": missing.is_empty(),
"present_tables": present,
"missing_tables": missing,
}),
)
}
#[get("/admin/provision/local/databases")]
pub async fn admin_list_local_cluster_databases(
req: HttpRequest,
state: Data<AppState>,
query: Query<LocalClusterDatabasesQuery>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &query.client_name) {
Ok(value) => value,
Err(resp) => return resp,
};
let all_databases: Vec<String> = match list_postgres_databases(&pool).await {
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to list local cluster databases", err),
};
let managed_databases: Vec<Value> =
match load_athena_managed_databases(state.get_ref(), &server_pg_uri).await {
Ok(value) => value,
Err(resp) => return resp,
};
api_success(
"Listed local cluster databases",
json!({
"client_name": query.client_name,
"all_databases": all_databases,
"athena_managed_databases": managed_databases,
}),
)
}
#[post("/admin/provision/local/databases")]
pub async fn admin_create_local_cluster_database(
req: HttpRequest,
state: Data<AppState>,
body: Json<LocalClusterCreateDatabaseRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &body.client_name) {
Ok(value) => value,
Err(resp) => return resp,
};
let create_params = LocalClusterCreateDatabaseParams {
database_name: body.database_name.clone(),
options: LocalClusterDatabaseCreateOptions {
owner: body.owner.clone(),
template: body.template.clone(),
encoding: body.encoding.clone(),
lc_collate: body.lc_collate.clone(),
lc_ctype: body.lc_ctype.clone(),
tablespace: body.tablespace.clone(),
},
};
if let Err(err) = create_postgres_database(&pool, &create_params).await {
return map_provisioning_error("Failed to create local cluster database", err);
}
let database_uri: String = match replace_uri_database_name(&server_pg_uri, &body.database_name)
{
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to build database URI", err),
};
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&database_uri).await {
Ok(total) => total,
Err(err) => {
return map_provisioning_error("Failed to provision newly created database", err);
}
})
} else {
None
};
let register_name: String = body
.register_name
.clone()
.unwrap_or_else(|| body.database_name.clone());
let register_description = body
.description
.clone()
.or_else(|| Some(format!("Local cluster database '{}'", body.database_name)));
let cluster_domain = managed_cluster_domain();
let base_metadata = json!({
"managed_by": "provision_api",
"provider": "local_cluster",
"cluster_name": athena_dns::DEFAULT_MANAGED_CLUSTER_NAME,
"cluster_domain": cluster_domain,
"database_engine": "postgresql",
"server_client_name": body.client_name.clone(),
"database_name": body.database_name.clone(),
"instance": {
"database_name": body.database_name.clone(),
"database_engine": "postgresql",
},
"create_options": {
"owner": body.owner.clone(),
"template": body.template.clone(),
"encoding": body.encoding.clone(),
"lc_collate": body.lc_collate.clone(),
"lc_ctype": body.lc_ctype.clone(),
"tablespace": body.tablespace.clone(),
},
});
let (catalog_pg_uri, metadata, public_proxy_binding) =
match resolve_managed_catalog_public_proxy(
&req,
®ister_name,
&database_uri,
base_metadata,
body.catalog_public_proxy.as_ref(),
) {
Ok(value) => value,
Err(resp) => return resp,
};
let public_proxy_dns = binding_dns_status(public_proxy_binding.as_ref()).await;
let (runtime_registered, catalog_registered) = match register_provisioned_client(
state.get_ref(),
®ister_name,
register_description,
&database_uri,
Some(&catalog_pg_uri),
metadata,
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
api_created(
"Created local cluster database",
json!({
"server_client_name": body.client_name,
"database_name": body.database_name,
"database_uri": database_uri,
"catalog_pg_uri": catalog_pg_uri,
"catalog_public_proxy": public_proxy_binding.as_ref().map(|binding| binding.binding.clone()),
"catalog_public_proxy_dns": public_proxy_dns,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
"register_name": register_name,
}
}),
)
}
#[post("/admin/provision/local/pipeline")]
pub async fn admin_run_local_provision_pipeline(
req: HttpRequest,
state: Data<AppState>,
body: Json<LocalProvisionPipelineRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let client_name = body.client_name.trim().to_lowercase();
if client_name.is_empty() {
return bad_request("Missing required field", "'client_name' must not be empty.");
}
let mode: LocalProvisionMode = body.mode.clone();
let defaults = runtime_config_from_root();
let advanced: LocalProvisionAdvancedOptions = body.advanced.clone().unwrap_or_default();
let pipeline_id: String = Uuid::new_v4().to_string();
let started: Instant = Instant::now();
let mut steps: Map<String, Value> = Map::new();
let should_provision: bool = bool_or_default(advanced.provision_schema, true);
let register_runtime: bool = bool_or_default(advanced.register_runtime, true);
let register_catalog: bool = bool_or_default(advanced.register_catalog, true);
let mut shared_cluster_client_name: Option<String> = None;
let pg_uri: String;
let instance_data: Value;
let infra_started = Instant::now();
match &mode {
LocalProvisionMode::DedicatedContainer => {
let spin_result: SpinUpPostgresResult =
match spin_up_postgres_instance(SpinUpPostgresParams {
client_name: client_name.clone(),
container_name: advanced.container_name.clone(),
image: advanced
.image
.clone()
.or_else(|| defaults.postgres_image.clone()),
host: advanced
.host
.clone()
.or_else(|| defaults.instance_host.clone()),
host_port: advanced.host_port,
db_name: advanced.db_name.clone(),
username: advanced.username.clone(),
password: advanced.password.clone(),
startup_timeout_secs: advanced
.startup_timeout_secs
.or(defaults.startup_timeout_secs),
reuse_existing: bool_or_default(advanced.reuse_existing, true),
})
.await
{
Ok(value) => value,
Err(err) => {
set_pipeline_step(
&mut steps,
"infra_create_or_reuse",
"failed",
now_ms(&infra_started),
json!({ "error": "Failed to create or reuse Docker container." }),
);
return pipeline_error_response(
err,
&pipeline_id,
&mode,
&client_name,
None,
);
}
};
set_pipeline_step(
&mut steps,
"infra_create_or_reuse",
"success",
now_ms(&infra_started),
json!({
"container_name": spin_result.container_name,
"created_new_container": spin_result.created_new_container,
"reused_existing_container": spin_result.reused_existing_container,
"host_port": spin_result.host_port,
}),
);
set_pipeline_step(
&mut steps,
"readiness_check",
"success",
spin_result.wait_ready_ms,
json!({ "wait_ready_ms": spin_result.wait_ready_ms }),
);
pg_uri = spin_result.pg_uri.clone();
instance_data = json!({
"container_name": spin_result.container_name,
"image": spin_result.image,
"host": spin_result.host,
"host_port": spin_result.host_port,
"db_name": spin_result.db_name,
"username": spin_result.username,
"database_engine": "postgresql",
"database_version": postgres_major_version_from_image(&spin_result.image),
"password": spin_result.password,
"created_new_container": spin_result.created_new_container,
"reused_existing_container": spin_result.reused_existing_container,
});
}
LocalProvisionMode::SharedClusterDatabase => {
let shared_client: String = body
.shared_cluster_client_name
.as_deref()
.map(str::trim)
.unwrap_or_default()
.to_string();
if shared_client.is_empty() {
return bad_request(
"Missing required field",
"'shared_cluster_client_name' is required for mode='shared_cluster_database'.",
);
}
shared_cluster_client_name = Some(shared_client.clone());
let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &shared_client)
{
Ok(value) => value,
Err(resp) => return resp,
};
let database_name: String = advanced
.db_name
.clone()
.unwrap_or_else(|| client_name.clone());
let create_params: LocalClusterCreateDatabaseParams =
LocalClusterCreateDatabaseParams {
database_name: database_name.clone(),
options: LocalClusterDatabaseCreateOptions {
owner: None,
template: None,
encoding: None,
lc_collate: None,
lc_ctype: None,
tablespace: None,
},
};
if let Err(err) = create_postgres_database(&pool, &create_params).await {
set_pipeline_step(
&mut steps,
"infra_create_or_reuse",
"failed",
now_ms(&infra_started),
json!({ "error": "Failed to create database in shared cluster." }),
);
return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
}
set_pipeline_step(
&mut steps,
"infra_create_or_reuse",
"success",
now_ms(&infra_started),
json!({
"shared_cluster_client_name": shared_client,
"database_name": database_name,
}),
);
set_pipeline_step(
&mut steps,
"readiness_check",
"success",
0,
json!({ "strategy": "shared_cluster_existing_connection" }),
);
pg_uri = match replace_uri_database_name(&server_pg_uri, &database_name) {
Ok(value) => value,
Err(err) => {
return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
}
};
instance_data = json!({
"shared_cluster_client_name": shared_client,
"shared_client_cluster_name": shared_client,
"database_name": database_name,
"database_engine": "postgresql",
});
}
}
let provision_started: Instant = Instant::now();
let mut statements_executed: Option<usize> = None;
if should_provision {
match run_provision_sql(&pg_uri).await {
Ok(total) => {
statements_executed = Some(total);
set_pipeline_step(
&mut steps,
"schema_provision",
"success",
now_ms(&provision_started),
json!({ "statements_executed": total }),
);
}
Err(err) => {
set_pipeline_step(
&mut steps,
"schema_provision",
"failed",
now_ms(&provision_started),
json!({ "error": "Failed to apply Athena schema." }),
);
let rollback_hint = if let Some(name) =
instance_data.get("container_name").and_then(Value::as_str)
{
Some(format!(
"Rollback hint: delete container '{}' via DELETE /admin/provision/instances/{}",
name, name
))
} else {
None
};
return pipeline_error_response(
err,
&pipeline_id,
&mode,
&client_name,
rollback_hint.as_deref(),
);
}
}
} else {
set_pipeline_step(
&mut steps,
"schema_provision",
"skipped",
0,
json!({ "reason": "provision_schema=false" }),
);
}
let register_started = Instant::now();
let description = advanced.description.clone().or_else(|| match &mode {
LocalProvisionMode::DedicatedContainer => Some(default_managed_postgres_description()),
LocalProvisionMode::SharedClusterDatabase => Some(default_shared_cluster_description()),
});
let cluster_domain = managed_cluster_domain();
let metadata = json!({
"managed_by": "provision_pipeline",
"provider": match &mode {
LocalProvisionMode::DedicatedContainer => "docker",
LocalProvisionMode::SharedClusterDatabase => "local_cluster",
},
"mode": local_mode_as_str(&mode),
"shared_cluster_client_name": shared_cluster_client_name,
"shared_client_cluster_name": shared_cluster_client_name,
"cluster_name": athena_dns::DEFAULT_MANAGED_CLUSTER_NAME,
"cluster_domain": cluster_domain,
"database_engine": "postgresql",
"database_version": instance_data.get("database_version").and_then(Value::as_str),
"instance": instance_data,
});
let (catalog_pg_uri, metadata, public_proxy_binding) =
match resolve_managed_catalog_public_proxy(
&req,
&client_name,
&pg_uri,
metadata,
advanced.catalog_public_proxy.as_ref(),
) {
Ok(value) => value,
Err(resp) => return resp,
};
let public_proxy_dns = binding_dns_status(public_proxy_binding.as_ref()).await;
let (runtime_registered, catalog_registered) = match register_provisioned_client(
state.get_ref(),
&client_name,
description,
&pg_uri,
Some(&catalog_pg_uri),
metadata,
register_runtime,
register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
set_pipeline_step(
&mut steps,
"runtime_register",
if register_runtime {
if runtime_registered {
"success"
} else {
"failed"
}
} else {
"skipped"
},
now_ms(®ister_started),
json!({
"requested": register_runtime,
"registered": runtime_registered,
}),
);
set_pipeline_step(
&mut steps,
"catalog_register",
if register_catalog {
if catalog_registered {
"success"
} else {
"failed"
}
} else {
"skipped"
},
now_ms(®ister_started),
json!({
"requested": register_catalog,
"registered": catalog_registered,
}),
);
api_success(
"Provisioning pipeline completed",
json!({
"pipeline_id": pipeline_id,
"mode": local_mode_as_str(&mode),
"client_name": client_name,
"instance": instance_data,
"credentials": {
"masked_pg_uri": masked_pg_uri(&pg_uri),
"copy_safe_pg_uri": pg_uri,
"catalog_pg_uri": catalog_pg_uri,
},
"catalog_public_proxy": public_proxy_binding.as_ref().map(|binding| binding.binding.clone()),
"catalog_public_proxy_dns": public_proxy_dns,
"pipeline": {
"overall_status": "success",
"total_duration_ms": now_ms(&started),
"steps": steps,
"provision_schema": should_provision,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[get("/admin/provision/instances")]
pub async fn admin_list_postgres_instances(
req: HttpRequest,
state: Data<AppState>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let request_started: Instant = Instant::now();
let request_id: String = Uuid::new_v4().to_string();
let dependency_status = inspect_local_provisioning_dependencies().await;
let (containers, instance_runtime_available, instance_runtime_error): (
Vec<DockerManagedContainer>,
bool,
Option<String>,
) = match list_managed_postgres_containers().await {
Ok(value) => (value, true, None),
Err(ProvisioningError::Unavailable(message)) => {
error!(
target: "athena::provisioning",
request_id = %request_id,
route = "/admin/provision/instances",
outcome = "runtime_unavailable",
error_message = %message,
"Provisioning instance runtime unavailable while listing managed containers"
);
(Vec::new(), false, Some(message))
}
Err(err) => {
error!(
target: "athena::provisioning",
request_id = %request_id,
route = "/admin/provision/instances",
outcome = "failed",
error_kind = %provisioning_error_kind(&err),
error_message = %provisioning_error_message(&err),
duration_ms = request_started.elapsed().as_millis(),
"Failed to list managed Postgres instances"
);
return map_provisioning_error("Failed to list Postgres instances", err);
}
};
let runtime_clients: HashMap<String, RegisteredClient> = state
.pg_registry
.list_registered_clients()
.into_iter()
.map(|client| (client.client_name.to_lowercase(), client))
.collect();
let mut catalog_clients: HashMap<String, AthenaClientRecord> = HashMap::new();
if let Ok(pool) = client_catalog_pool(state.get_ref()) {
match list_athena_clients(&pool).await {
Ok(clients) => {
catalog_clients = clients
.into_iter()
.map(|client| (client.client_name.to_lowercase(), client))
.collect();
}
Err(err) => {
return internal_error("Failed to list catalog clients", err.to_string());
}
}
}
let monitors = build_provisioning_monitor_entries(
state.get_ref(),
&containers,
&runtime_clients,
&catalog_clients,
)
.await;
let monitor_summary = summarize_provisioning_monitors(&monitors);
let payload: Vec<Value> = containers
.into_iter()
.map(|container| {
let linked_client_name = container
.labels
.get("athena.client")
.cloned()
.unwrap_or_default();
let linked_key = linked_client_name.to_lowercase();
let runtime_client = runtime_clients.get(&linked_key);
let catalog_client = catalog_clients.get(&linked_key);
let metadata_summary =
catalog_client.map(|client| summarize_provisioning_metadata(&client.metadata));
let runtime_registered = runtime_client
.map(|client| client.pool_connected)
.unwrap_or(false);
json!({
"container_name": container.container_name,
"running": container.running,
"status": container.status,
"image": container.image,
"host_port": container.host_port,
"host": metadata_summary.as_ref().and_then(|value| value.host.clone()),
"database_name": metadata_summary.as_ref().and_then(|value| value.database_name.clone()),
"database_engine": metadata_summary.as_ref().and_then(|value| value.database_engine.clone()),
"database_version": metadata_summary.as_ref().and_then(|value| value.database_version.clone()),
"shared_cluster_client_name": metadata_summary.as_ref().and_then(|value| value.shared_cluster_client_name.clone()),
"shared_client_cluster_name": metadata_summary.as_ref().and_then(|value| value.shared_cluster_client_name.clone()),
"cluster_name": metadata_summary.as_ref().and_then(|value| value.cluster_name.clone()),
"cluster_domain": metadata_summary.as_ref().and_then(|value| value.cluster_domain.clone()),
"labels": container.labels,
"linked_client": if linked_client_name.is_empty() { Value::Null } else {
json!({
"client_name": linked_client_name,
"runtime_registered": runtime_registered,
"runtime_known": runtime_client.is_some(),
"catalog_registered": catalog_client.is_some(),
})
},
"runtime_client": runtime_client,
"catalog_client": catalog_client,
})
})
.collect();
info!(
target: "athena::provisioning",
request_id = %request_id,
route = "/admin/provision/instances",
outcome = if instance_runtime_available { "success" } else { "success_runtime_unavailable" },
managed_instance_count = payload.len(),
monitor_count = monitor_summary.total,
managed_monitor_count = monitor_summary.managed,
docker_monitor_count = monitor_summary.docker,
healthy_probe_count = monitor_summary.healthy,
unreachable_probe_count = monitor_summary.unreachable,
runtime_registered_client_count = runtime_clients.len(),
catalog_registered_client_count = catalog_clients.len(),
instance_runtime_available = instance_runtime_available,
instance_runtime_error = %instance_runtime_error.clone().unwrap_or_default(),
duration_ms = request_started.elapsed().as_millis(),
"Listed managed Postgres instances"
);
api_success(
"Listed managed Postgres instances",
json!({
"instances": payload,
"count": payload.len(),
"dependencies": dependency_status,
"instance_runtime_available": instance_runtime_available,
"instance_runtime_error": instance_runtime_error,
"monitors": monitors,
"monitor_count": monitor_summary.total,
"monitor_summary": monitor_summary,
}),
)
}
#[get("/admin/provision/dependencies")]
pub async fn admin_get_local_provision_dependencies(req: HttpRequest) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let status = inspect_local_provisioning_dependencies().await;
api_success("Loaded local provisioning dependency status", json!(status))
}
#[post("/admin/provision/dependencies/install")]
pub async fn admin_install_local_provision_dependencies(
req: HttpRequest,
body: Json<InstallProvisionDependenciesRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let result =
match install_local_provisioning_dependencies(InstallLocalProvisionDependenciesParams {
install_docker: body.install_docker,
install_postgres: body.install_postgres,
start_services: body.start_services,
use_sudo: body.use_sudo,
})
.await
{
Ok(value) => value,
Err(err) => {
return map_provisioning_error(
"Failed to install local provisioning dependencies",
err,
);
}
};
api_success(
"Local provisioning dependency installation attempt completed",
json!(result),
)
}
#[post("/admin/provision/instances")]
pub async fn admin_spin_up_postgres_instance(
req: HttpRequest,
state: Data<AppState>,
body: Json<SpinUpInstanceRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let defaults = runtime_config_from_root();
let spin_result: SpinUpPostgresResult = match spin_up_postgres_instance(SpinUpPostgresParams {
client_name: body.client_name.clone(),
container_name: body.container_name.clone(),
image: body
.image
.clone()
.or_else(|| defaults.postgres_image.clone()),
host: body.host.clone().or_else(|| defaults.instance_host.clone()),
host_port: body.host_port,
db_name: body.db_name.clone(),
username: body.username.clone(),
password: body.password.clone(),
startup_timeout_secs: body.startup_timeout_secs.or(defaults.startup_timeout_secs),
reuse_existing: body.reuse_existing,
})
.await
{
Ok(result) => result,
Err(err) => return map_provisioning_error("Failed to spin up Postgres instance", err),
};
let mut statements_executed: Option<usize> = None;
if body.provision_schema {
statements_executed = Some(match run_provision_sql(&spin_result.pg_uri).await {
Ok(total) => total,
Err(err) => {
return map_provisioning_error("Failed to provision Postgres instance", err);
}
});
}
let cluster_domain = managed_cluster_domain();
let database_version = postgres_major_version_from_image(&spin_result.image);
let base_metadata = json!({
"managed_by": "provision_api",
"provider": "docker",
"cluster_name": athena_dns::DEFAULT_MANAGED_CLUSTER_NAME,
"cluster_domain": cluster_domain,
"database_engine": "postgresql",
"database_version": database_version.clone(),
"container_name": spin_result.container_name,
"image": spin_result.image,
"host": spin_result.host,
"host_port": spin_result.host_port,
"instance": {
"container_name": spin_result.container_name,
"image": spin_result.image,
"host": spin_result.host,
"host_port": spin_result.host_port,
"db_name": spin_result.db_name,
"username": spin_result.username,
"database_engine": "postgresql",
"database_version": database_version.clone(),
},
});
let (catalog_pg_uri, metadata, public_proxy_binding) =
match resolve_managed_catalog_public_proxy(
&req,
&spin_result.client_name,
&spin_result.pg_uri,
base_metadata,
body.catalog_public_proxy.as_ref(),
) {
Ok(value) => value,
Err(resp) => return resp,
};
let public_proxy_dns = binding_dns_status(public_proxy_binding.as_ref()).await;
let (runtime_registered, catalog_registered) = match register_provisioned_client(
state.get_ref(),
&spin_result.client_name,
Some(default_managed_postgres_description()),
&spin_result.pg_uri,
Some(&catalog_pg_uri),
metadata,
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
api_success(
"Postgres instance is ready",
json!({
"instance": {
"client_name": spin_result.client_name,
"container_name": spin_result.container_name,
"image": spin_result.image,
"host": spin_result.host,
"host_port": spin_result.host_port,
"db_name": spin_result.db_name,
"username": spin_result.username,
"database_engine": "postgresql",
"database_version": database_version.clone(),
"password": spin_result.password,
"pg_uri": spin_result.pg_uri,
"catalog_pg_uri": catalog_pg_uri,
"created_new_container": spin_result.created_new_container,
"reused_existing_container": spin_result.reused_existing_container,
"wait_ready_ms": spin_result.wait_ready_ms,
},
"catalog_public_proxy": public_proxy_binding.as_ref().map(|binding| binding.binding.clone()),
"catalog_public_proxy_dns": public_proxy_dns,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[get("/admin/provision/instances/{container_name}")]
pub async fn admin_get_postgres_instance_status(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
query: Query<InstanceStatusQuery>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
let status: DockerContainerStatus = match inspect_container(&container_name).await {
Ok(status) => status,
Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
};
if !status.exists {
return not_found(
"Instance not found",
format!("No Postgres container named '{}' exists.", container_name),
);
}
let resolved_client_name = if let Some(client_name) = query.client_name.as_ref() {
let resolved = resolve_status_client_name_alias(state.get_ref(), client_name).await;
if resolved.is_empty() {
None
} else {
Some(resolved)
}
} else {
None
};
let runtime_client: Option<RegisteredClient> = resolved_client_name
.as_ref()
.and_then(|client_name| state.pg_registry.registered_client(client_name));
let mut catalog_client: Option<AthenaClientRecord> = None;
if let Some(client_name) = resolved_client_name.as_ref()
&& let Ok(pool) = client_catalog_pool(state.get_ref())
{
catalog_client = match get_athena_client_by_name(&pool, client_name).await {
Ok(value) => value,
Err(err) => return internal_error("Failed to load catalog client", err.to_string()),
};
}
api_success(
"Loaded Postgres instance status",
json!({
"instance": status,
"client_name": resolved_client_name,
"requested_client_name": query.client_name,
"runtime_client": runtime_client,
"catalog_client": catalog_client,
}),
)
}
#[post("/admin/provision/instances/{container_name}/start")]
pub async fn admin_start_postgres_instance(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Option<Json<InstanceLifecycleRequest>>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
if let Err(err) = start_container(&container_name).await {
return map_provisioning_error("Failed to start Postgres instance", err);
}
let status: DockerContainerStatus = match inspect_container(&container_name).await {
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
};
if !status.exists {
return not_found(
"Instance not found",
format!("No Postgres container named '{}' exists.", container_name),
);
}
let payload = body
.as_ref()
.map(|value| value.0.clone())
.unwrap_or(InstanceLifecycleRequest {
client_name: None,
sync_runtime: true,
});
let resolved_client_name =
match resolve_instance_client_name(&container_name, payload.client_name.as_deref()).await {
Ok(value) => value,
Err(resp) => return resp,
};
let runtime_sync = if payload.sync_runtime {
if let Some(client_name) = resolved_client_name.as_deref() {
match reconnect_runtime_client(state.get_ref(), client_name).await {
Ok(()) => json!({
"requested": true,
"client_name": client_name,
"reconnected": true
}),
Err(err) => json!({
"requested": true,
"client_name": client_name,
"reconnected": false,
"error": err
}),
}
} else {
json!({
"requested": true,
"reconnected": false,
"error": "no linked client_name found for this container"
})
}
} else {
json!({
"requested": false,
"reconnected": false
})
};
api_success(
"Started Postgres instance",
json!({
"instance": status,
"client_name": resolved_client_name,
"runtime_sync": runtime_sync,
}),
)
}
#[post("/admin/provision/instances/{container_name}/stop")]
pub async fn admin_stop_postgres_instance(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Option<Json<InstanceLifecycleRequest>>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
if let Err(err) = stop_container(&container_name).await {
return map_provisioning_error("Failed to stop Postgres instance", err);
}
let status: DockerContainerStatus = match inspect_container(&container_name).await {
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
};
if !status.exists {
return not_found(
"Instance not found",
format!("No Postgres container named '{}' exists.", container_name),
);
}
let payload = body
.as_ref()
.map(|value| value.0.clone())
.unwrap_or(InstanceLifecycleRequest {
client_name: None,
sync_runtime: true,
});
let resolved_client_name =
match resolve_instance_client_name(&container_name, payload.client_name.as_deref()).await {
Ok(value) => value,
Err(resp) => return resp,
};
let runtime_sync = if payload.sync_runtime {
if let Some(client_name) = resolved_client_name.as_deref() {
state.pg_registry.mark_unavailable(client_name);
json!({
"requested": true,
"client_name": client_name,
"marked_unavailable": true
})
} else {
json!({
"requested": true,
"marked_unavailable": false,
"error": "no linked client_name found for this container"
})
}
} else {
json!({
"requested": false,
"marked_unavailable": false
})
};
api_success(
"Stopped Postgres instance",
json!({
"instance": status,
"client_name": resolved_client_name,
"runtime_sync": runtime_sync,
}),
)
}
#[post("/admin/provision/instances/{container_name}/bindings")]
pub async fn admin_bind_postgres_instance_port_route(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Json<BindInstancePortRouteRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
let route_key = match normalize_route_binding_key(&body.route_key) {
Ok(value) => value,
Err(resp) => return resp,
};
let status: DockerContainerStatus = match inspect_container(&container_name).await {
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
};
if !status.exists {
return not_found(
"Instance not found",
format!("No Postgres container named '{}' exists.", container_name),
);
}
let Some(target_port) = status.host_port else {
return service_unavailable(
"Port unavailable",
format!(
"Postgres container '{}' is missing a published host port.",
container_name
),
);
};
let client_name =
match resolve_instance_client_name(&container_name, body.client_name.as_deref()).await {
Ok(Some(value)) => value,
Ok(None) => {
return bad_request(
"Missing client name",
"Provide 'client_name' or label the container with 'athena.client'.",
);
}
Err(resp) => return resp,
};
let source_pg_uri = match resolve_client_connection_uri(state.get_ref(), &client_name).await {
Ok(Some(value)) => value,
Ok(None) => {
return service_unavailable(
"Client URI unavailable",
format!(
"No Postgres URI is available for client '{}'. Register runtime/catalog metadata first.",
client_name
),
);
}
Err(resp) => return resp,
};
let public_host = match if let Some(value) = body.public_host.as_deref() {
resolve_public_host(&req, Some(value))
} else if body.use_wildcard_host {
wildcard_public_host_for_route_key(&route_key)
.map_err(|err| bad_request("Invalid wildcard public host", err))
} else {
resolve_public_host(&req, None)
} {
Ok(value) => value,
Err(resp) => return resp,
};
let public_port = body.public_port.unwrap_or(target_port);
let binding = match build_catalog_public_proxy_binding(
&client_name,
&route_key,
&source_pg_uri,
&public_host,
public_port,
) {
Ok(mut binding) => {
if let Some(binding_obj) = binding.binding.as_object_mut() {
binding_obj.insert("container_name".to_string(), json!(container_name));
binding_obj.insert(
"target".to_string(),
json!({
"host": "127.0.0.1",
"port": target_port
}),
);
}
binding
}
Err(err) => return bad_request("Invalid Postgres URI", err),
};
let dns_status = binding_dns_status(Some(&binding)).await;
let mut persisted = false;
if body.persist_in_catalog {
let pool = match client_catalog_pool(state.get_ref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let existing = match get_athena_client_by_name(&pool, &client_name).await {
Ok(value) => value,
Err(err) => return internal_error("Failed to load catalog client", err.to_string()),
};
let Some(existing) = existing else {
return not_found(
"Catalog client not found",
format!("No catalog client '{}' was found.", client_name),
);
};
let metadata = match merge_catalog_public_proxy_metadata(
existing.metadata,
&source_pg_uri,
&binding,
) {
Ok(value) => value,
Err(err) => {
return internal_error("Failed to persist route binding metadata", err);
}
};
if let Err(err) = upsert_athena_client(
&pool,
SaveAthenaClientParams {
client_name: existing.client_name,
description: existing.description,
pg_uri: Some(binding.public_pg_uri.clone()),
pg_uri_env_var: existing.pg_uri_env_var,
config_uri_template: existing.config_uri_template,
source: existing.source,
is_active: existing.is_active,
is_frozen: existing.is_frozen,
metadata,
},
)
.await
{
return internal_error("Failed to persist route binding metadata", err.to_string());
}
persisted = true;
}
api_success(
"Bound managed instance port route",
json!({
"binding": binding.binding,
"catalog_pg_uri": binding.public_pg_uri,
"dns": dns_status,
"persisted_in_catalog": persisted,
}),
)
}
#[delete("/admin/provision/instances/{container_name}")]
pub async fn admin_delete_postgres_instance(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Option<Json<DeleteInstanceRequest>>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
if let Err(err) = remove_container(&container_name).await {
return map_provisioning_error("Failed to delete Postgres instance", err);
}
let payload: DeleteInstanceRequest =
body.as_ref()
.map(|value| value.0.clone())
.unwrap_or(DeleteInstanceRequest {
client_name: None,
remove_runtime_client: true,
remove_catalog_client: true,
});
let mut runtime_removed: bool = false;
let mut catalog_removed: bool = false;
if let Some(client_name) = payload.client_name.as_ref() {
if payload.remove_runtime_client {
state.pg_registry.remove_client(client_name);
runtime_removed = true;
}
if payload.remove_catalog_client {
let pool: Pool<Postgres> = match client_catalog_pool(state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match delete_athena_client(&pool, client_name).await {
Ok(Some(_)) => catalog_removed = true,
Ok(None) => catalog_removed = false,
Err(err) => {
return internal_error("Failed to delete catalog client", err.to_string());
}
}
}
}
api_success(
"Deleted Postgres instance",
json!({
"container_name": container_name,
"client_name": payload.client_name,
"runtime_removed": runtime_removed,
"catalog_removed": catalog_removed,
}),
)
}
pub fn services(cfg: &mut web::ServiceConfig) {
#[cfg(feature = "provisioning")]
{
crate::features::provisioning::api::services(cfg);
return;
}
#[cfg(not(feature = "provisioning"))]
{
cfg.service(admin_provision)
.service(admin_provision_status)
.service(crate::features::provisioning::api::admin_create_clone_job)
.service(crate::features::provisioning::api::admin_list_clone_jobs)
.service(crate::features::provisioning::api::admin_get_clone_job)
.service(crate::features::provisioning::api::admin_cancel_clone_job)
.service(crate::features::provisioning::api::admin_retry_clone_job)
.service(admin_run_local_provision_pipeline)
.service(admin_list_local_cluster_databases)
.service(admin_create_local_cluster_database)
.service(admin_get_local_provision_dependencies)
.service(admin_install_local_provision_dependencies)
.service(admin_list_postgres_instances)
.service(admin_spin_up_postgres_instance)
.service(crate::features::provisioning::api::admin_provision_neon)
.service(crate::features::provisioning::api::admin_provision_railway)
.service(crate::features::provisioning::api::admin_provision_render)
.service(admin_get_postgres_instance_status)
.service(admin_start_postgres_instance)
.service(admin_stop_postgres_instance)
.service(admin_bind_postgres_instance_port_route)
.service(admin_delete_postgres_instance);
}
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::test::TestRequest;
use std::sync::Mutex;
static ENV_LOCK: Mutex<()> = Mutex::new(());
#[test]
fn normalize_route_binding_key_accepts_expected_shape() {
let value: String = normalize_route_binding_key("Tenant_01-ReadWrite").expect("valid key");
assert_eq!(value, "tenant_01-readwrite");
}
#[test]
fn normalize_public_host_strips_scheme_path_and_port() {
let value: String =
normalize_public_host("https://cluster.athena-cluster.com:4052/some/path")
.expect("host");
assert_eq!(value, "cluster.athena-cluster.com");
}
#[test]
fn rewrite_postgres_uri_authority_rewrites_host_and_port() {
let rewritten: String = crate::provisioning::rewrite_postgres_uri_authority(
"postgres://athena:secret@127.0.0.1:5432/example?sslmode=disable",
"cluster.athena-cluster.com",
45432,
)
.expect("rewrite");
assert_eq!(
rewritten,
"postgresql://athena:secret@cluster.athena-cluster.com:45432/example?sslmode=disable"
);
}
#[test]
fn resolve_managed_catalog_public_proxy_uses_wildcard_host_when_requested() {
let _guard = ENV_LOCK.lock().expect("lock env");
unsafe {
std::env::set_var("ATHENA_WILDCARD_HOST_PATTERN", "*.v3.athena-cluster.com");
}
let req = TestRequest::default().to_http_request();
let proxy = ManagedCatalogPublicProxyRequest {
route_key: Some("athena-clients".to_string()),
public_host: None,
public_port: Some(45432),
use_wildcard_host: true,
};
let (catalog_pg_uri, metadata, binding) = resolve_managed_catalog_public_proxy(
&req,
"athena-clients",
"postgres://athena:secret@127.0.0.1:5432/athena_clients",
json!({
"managed_by": "provision_api",
"provider": "docker"
}),
Some(&proxy),
)
.expect("proxy");
assert_eq!(
catalog_pg_uri,
"postgresql://athena:secret@athena-clients.v3.athena-cluster.com:45432/athena_clients"
);
assert_eq!(
binding
.as_ref()
.and_then(|value| value.binding.get("public"))
.and_then(|value| value.get("host"))
.and_then(Value::as_str),
Some("athena-clients.v3.athena-cluster.com")
);
assert_eq!(
private_catalog_pg_uri(&metadata).as_deref(),
Some("postgresql://athena:secret@127.0.0.1:5432/athena_clients")
);
unsafe {
std::env::remove_var("ATHENA_WILDCARD_HOST_PATTERN");
}
}
}