use actix_web::{
HttpRequest, HttpResponse, delete, get, post,
web::{self, Data, Json, Path, Query},
};
use serde::Deserialize;
use serde_json::{Map, Value, json};
use sqlx::postgres::PgPool;
use sqlx::{Pool, Postgres, Row};
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
api_created, api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};
use crate::data::clients::AthenaClientRecord;
use crate::data::clients::{
SaveAthenaClientParams, delete_athena_client, get_athena_client_by_name, list_athena_clients,
upsert_athena_client,
};
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, RegisteredClient};
use crate::parser::resolve_postgres_uri;
use crate::provisioning::{
DockerContainerStatus, DockerManagedContainer, EXPECTED_TABLES,
LocalClusterCreateDatabaseParams, LocalClusterDatabaseCreateOptions, NeonConnectionParams,
NeonProjectCreateParams, NeonProjectCreateResult, ProvisioningError, RailwayConnectionParams,
RailwayPluginCreateParams, RailwayProjectCreateParams, RailwayServiceCreateParams,
RenderConnectionParams, RenderPostgresCreateParams, SpinUpPostgresParams, SpinUpPostgresResult,
create_neon_project, create_postgres_database, create_railway_plugin, create_railway_project,
create_railway_service, create_render_postgres_service, fetch_neon_connection_uri,
fetch_railway_connection_uri, fetch_railway_project_base_environment_id,
fetch_render_connection_uri, inspect_container, json_object_insert_if_missing,
list_managed_postgres_containers, list_postgres_databases, postgres_uri_database_name,
postgres_uri_fingerprint, remove_container, replace_uri_database_name, run_provision_sql,
spin_up_postgres_instance, start_container, stop_container,
};
/// Body for `POST /admin/provision`. Exactly one of `uri` / `client_name`
/// must be supplied (enforced in `resolve_uri`).
#[derive(Debug, Deserialize)]
struct ProvisionRequest {
    // Direct Postgres URI to provision against.
    #[serde(default)]
    uri: Option<String>,
    // Name of an already-registered Postgres client to provision against.
    #[serde(default)]
    client_name: Option<String>,
    // When true (and `uri` was supplied), register the URI as a runtime client.
    #[serde(default)]
    register: bool,
    // Name to register under; the handler defaults this to "provisioned".
    #[serde(default)]
    register_name: Option<String>,
}
/// Query string for `GET /admin/provision/status`; `client_name` may also be
/// a managed container name (resolved via `resolve_status_client_name_alias`).
#[derive(Debug, Deserialize)]
struct ProvisionStatusQuery {
    client_name: String,
}
/// Query string for `GET /admin/provision/local/databases`: the registered
/// client whose server's databases should be listed.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct LocalClusterDatabasesQuery {
    client_name: String,
}
/// Body for `POST /admin/provision/local/databases`: create a database on an
/// existing cluster client, optionally provision the schema and register the
/// new database as a client.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct LocalClusterCreateDatabaseRequest {
    // Registered client pointing at the target cluster.
    client_name: String,
    database_name: String,
    // Optional CREATE DATABASE options, passed through verbatim.
    #[serde(default)]
    owner: Option<String>,
    #[serde(default)]
    template: Option<String>,
    #[serde(default)]
    encoding: Option<String>,
    #[serde(default)]
    lc_collate: Option<String>,
    #[serde(default)]
    lc_ctype: Option<String>,
    #[serde(default)]
    tablespace: Option<String>,
    // Client name for the new database; defaults to `database_name`.
    #[serde(default)]
    register_name: Option<String>,
    #[serde(default)]
    description: Option<String>,
    // Post-create pipeline switches; all default to true.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for spinning up a dedicated Dockerized Postgres instance for a
/// client; unset fields fall back to provisioning-layer defaults.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SpinUpInstanceRequest {
    client_name: String,
    // Container / connection overrides (all optional).
    #[serde(default)]
    container_name: Option<String>,
    #[serde(default)]
    image: Option<String>,
    #[serde(default)]
    host: Option<String>,
    #[serde(default)]
    host_port: Option<u16>,
    #[serde(default)]
    db_name: Option<String>,
    #[serde(default)]
    username: Option<String>,
    #[serde(default)]
    password: Option<String>,
    #[serde(default)]
    startup_timeout_secs: Option<u64>,
    // When true, reuse a matching existing container instead of failing.
    #[serde(default)]
    reuse_existing: bool,
    // Post-create pipeline switches; all default to true.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for deleting a managed instance; controls whether the linked
/// runtime/catalog client registrations are removed along with it.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct DeleteInstanceRequest {
    // Explicit client override; otherwise resolved from container labels.
    #[serde(default)]
    client_name: Option<String>,
    #[serde(default = "default_true")]
    remove_runtime_client: bool,
    #[serde(default = "default_true")]
    remove_catalog_client: bool,
}
/// Query string for instance status lookups; `client_name` is optional and
/// otherwise resolved from the container's `athena.client` label.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct InstanceStatusQuery {
    #[serde(default)]
    client_name: Option<String>,
}
/// Body for instance start/stop lifecycle endpoints; `sync_runtime` keeps
/// the runtime client registry in step with the container state.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct InstanceLifecycleRequest {
    #[serde(default)]
    client_name: Option<String>,
    #[serde(default = "default_true")]
    sync_runtime: bool,
}
/// Body for binding a public host/port route to a managed instance.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct BindInstancePortRouteRequest {
    // Validated by `normalize_route_binding_key` (lowercase, <= 64 chars).
    route_key: String,
    #[serde(default)]
    client_name: Option<String>,
    // Falls back to X-Forwarded-Host / Host when absent.
    #[serde(default)]
    public_host: Option<String>,
    #[serde(default)]
    public_port: Option<u16>,
    #[serde(default = "default_true")]
    persist_in_catalog: bool,
}
/// How the local provisioning pipeline obtains a Postgres database: a new
/// per-client Docker container, or a database inside an existing cluster.
/// Wire values are snake_case (see `local_mode_as_str`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
enum LocalProvisionMode {
    DedicatedContainer,
    SharedClusterDatabase,
}
// Pipeline mode defaults to a dedicated per-client container.
// NOTE(review): `#[derive(Default)]` + `#[default]` on the enum would be
// equivalent, but that change belongs with the enum definition itself.
impl Default for LocalProvisionMode {
    fn default() -> Self {
        Self::DedicatedContainer
    }
}
/// Optional overrides for the local provisioning pipeline. All fields are
/// optional so an absent `advanced` object behaves like `Default`.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
struct LocalProvisionAdvancedOptions {
    // Container / connection overrides (dedicated-container mode).
    #[serde(default)]
    container_name: Option<String>,
    #[serde(default)]
    image: Option<String>,
    #[serde(default)]
    host: Option<String>,
    #[serde(default)]
    host_port: Option<u16>,
    #[serde(default)]
    startup_timeout_secs: Option<u64>,
    #[serde(default)]
    db_name: Option<String>,
    #[serde(default)]
    username: Option<String>,
    #[serde(default)]
    password: Option<String>,
    // Tri-state flags: `None` means "use the pipeline default" (see
    // `bool_or_default` at the call sites).
    #[serde(default)]
    reuse_existing: Option<bool>,
    #[serde(default)]
    provision_schema: Option<bool>,
    #[serde(default)]
    register_runtime: Option<bool>,
    #[serde(default)]
    register_catalog: Option<bool>,
    #[serde(default)]
    description: Option<String>,
}
/// Body for `POST /admin/provision/local/pipeline`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct LocalProvisionPipelineRequest {
    client_name: String,
    // Defaults to `DedicatedContainer`.
    #[serde(default)]
    mode: LocalProvisionMode,
    // Required when mode == shared_cluster_database.
    #[serde(default)]
    shared_cluster_client_name: Option<String>,
    #[serde(default)]
    advanced: Option<LocalProvisionAdvancedOptions>,
}
/// Body for provisioning against Neon. Either supply `connection_uri`
/// directly, or supply `api_key` + project coordinates so the URI can be
/// fetched (optionally creating the project first).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct NeonProviderProvisionRequest {
    client_name: String,
    #[serde(default)]
    description: Option<String>,
    // Direct URI; when set, the provider API lookup is bypassed.
    #[serde(default)]
    connection_uri: Option<String>,
    // Neon API coordinates used when `connection_uri` is absent.
    #[serde(default)]
    api_key: Option<String>,
    #[serde(default)]
    project_id: Option<String>,
    #[serde(default)]
    branch_id: Option<String>,
    #[serde(default)]
    database_name: Option<String>,
    #[serde(default)]
    role_name: Option<String>,
    #[serde(default)]
    endpoint_id: Option<String>,
    #[serde(default)]
    api_base_url: Option<String>,
    // Project auto-creation controls.
    #[serde(default)]
    create_project_if_missing: bool,
    #[serde(default)]
    project_name: Option<String>,
    // Raw payload forwarded to the Neon project-create call.
    #[serde(default)]
    project_create_payload: Option<Value>,
    // Post-create pipeline switches; all default to true.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for provisioning against Railway. Either supply `connection_uri`
/// directly, or supply `api_key` + project/service/plugin coordinates
/// (each optionally auto-created).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RailwayProviderProvisionRequest {
    client_name: String,
    #[serde(default)]
    description: Option<String>,
    // Direct URI; when set, the provider API lookup is bypassed.
    #[serde(default)]
    connection_uri: Option<String>,
    // Railway GraphQL API coordinates.
    #[serde(default)]
    api_key: Option<String>,
    #[serde(default)]
    project_id: Option<String>,
    #[serde(default)]
    environment_id: Option<String>,
    #[serde(default)]
    service_id: Option<String>,
    #[serde(default)]
    plugin_id: Option<String>,
    #[serde(default)]
    graphql_url: Option<String>,
    // Project auto-creation controls.
    #[serde(default)]
    create_project_if_missing: bool,
    #[serde(default)]
    project_name: Option<String>,
    #[serde(default)]
    project_create_input: Option<Value>,
    // Service auto-creation controls.
    #[serde(default)]
    create_service_if_missing: bool,
    #[serde(default)]
    service_name: Option<String>,
    #[serde(default)]
    service_create_input: Option<Value>,
    // Plugin auto-creation controls.
    #[serde(default)]
    create_plugin_if_missing: bool,
    #[serde(default)]
    plugin_name: Option<String>,
    #[serde(default)]
    plugin_create_input: Option<Value>,
    // Post-create pipeline switches; all default to true.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for provisioning against Render. Either supply `connection_uri`
/// directly, or supply `api_key` + service coordinates (optionally
/// auto-creating the Postgres service).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RenderProviderProvisionRequest {
    client_name: String,
    #[serde(default)]
    description: Option<String>,
    // Direct URI; when set, the provider API lookup is bypassed.
    #[serde(default)]
    connection_uri: Option<String>,
    // Render API coordinates.
    #[serde(default)]
    api_key: Option<String>,
    #[serde(default)]
    service_id: Option<String>,
    #[serde(default)]
    owner_id: Option<String>,
    #[serde(default)]
    api_base_url: Option<String>,
    // Service auto-creation controls.
    #[serde(default)]
    create_service_if_missing: bool,
    #[serde(default)]
    service_name: Option<String>,
    #[serde(default)]
    service_create_payload: Option<Value>,
    #[serde(default)]
    plan: Option<String>,
    #[serde(default)]
    region: Option<String>,
    #[serde(default)]
    postgres_version: Option<String>,
    #[serde(default)]
    disk_size_gb: Option<u32>,
    // Post-create pipeline switches; all default to true.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Helper for `#[serde(default = "default_true")]`: serde cannot express a
/// literal default, so boolean fields defaulting to `true` point here.
fn default_true() -> bool {
    true
}
/// Maps each `ProvisioningError` variant onto the matching HTTP error
/// response helper (bad request / conflict / unavailable / internal),
/// reusing `context` as the response title.
fn map_provisioning_error(context: &str, err: ProvisioningError) -> HttpResponse {
    match err {
        ProvisioningError::InvalidInput(message) => bad_request(context, message),
        ProvisioningError::Conflict(message) => conflict(context, message),
        ProvisioningError::Unavailable(message) => service_unavailable(context, message),
        ProvisioningError::Execution(message) => internal_error(context, message),
    }
}
fn resolve_uri(state: &AppState, req: &ProvisionRequest) -> Result<String, HttpResponse> {
match (&req.uri, &req.client_name) {
(Some(uri), None) => Ok(uri.clone()),
(None, Some(client_name)) => {
let registered: RegisteredClient = state
.pg_registry
.registered_client(client_name)
.ok_or_else(|| {
bad_request(
"Unknown client",
format!("No Postgres client named '{}' is registered.", client_name),
)
})?;
registered
.config_uri_template
.as_deref()
.map(resolve_postgres_uri)
.or(registered.pg_uri)
.ok_or_else(|| {
bad_request(
"Client URI unavailable",
format!("No Postgres URI is available for client '{}'.", client_name),
)
})
}
(Some(_), Some(_)) => Err(bad_request(
"Ambiguous target",
"Provide either 'uri' or 'client_name', not both.",
)),
(None, None) => Err(bad_request(
"Missing target",
"Provide either 'uri' (direct Postgres URI) or 'client_name' (registered client).",
)),
}
}
/// Looks up `client_name` in the runtime registry and returns its Postgres
/// URI, preferring a resolved `config_uri_template` over the stored
/// `pg_uri`. Errors are ready-to-return 400 responses.
fn resolve_registered_client_uri(
    state: &AppState,
    client_name: &str,
) -> Result<String, HttpResponse> {
    let registered: RegisteredClient = state
        .pg_registry
        .registered_client(client_name)
        .ok_or_else(|| {
            bad_request(
                "Unknown client",
                format!("No Postgres client named '{}' is registered.", client_name),
            )
        })?;
    // Template wins over the raw URI when both are present.
    registered
        .config_uri_template
        .as_deref()
        .map(resolve_postgres_uri)
        .or(registered.pg_uri)
        .ok_or_else(|| {
            bad_request(
                "Client URI unavailable",
                format!("No Postgres URI is available for client '{}'.", client_name),
            )
        })
}
/// Returns the connection pool and resolved server URI for a registered
/// local-cluster client, as ready-to-return HTTP errors on failure.
fn local_cluster_target(
    state: &AppState,
    client_name: &str,
) -> Result<(PgPool, String), HttpResponse> {
    let Some(pool) = state.pg_registry.get_pool(client_name) else {
        return Err(service_unavailable(
            "Client unavailable",
            format!("Postgres client '{}' is not connected.", client_name),
        ));
    };
    let pg_uri = resolve_registered_client_uri(state, client_name)?;
    Ok((pool, pg_uri))
}
/// Returns a live connection pool for `client_name`, attempting a single
/// runtime reconnect when the client is registered but currently has no
/// pool. All failures surface as 503 responses.
async fn ensure_runtime_client_pool(
    state: &AppState,
    client_name: &str,
) -> Result<PgPool, HttpResponse> {
    // Fast path: a pool already exists.
    if let Some(pool) = state.pg_registry.get_pool(client_name) {
        return Ok(pool);
    }
    // The detailed reconnect error is intentionally not leaked to callers.
    if let Err(_err) = reconnect_runtime_client(state, client_name).await {
        return Err(service_unavailable(
            "Client unavailable",
            format!(
                "Postgres client '{}' is registered but could not be reconnected.",
                client_name
            ),
        ));
    }
    // Reconnect reported success — verify a pool actually materialized.
    state.pg_registry.get_pool(client_name).ok_or_else(|| {
        service_unavailable(
            "Client unavailable",
            format!(
                "Postgres client '{}' is registered but has no active connection pool.",
                client_name
            ),
        )
    })
}
/// Attempts to re-establish a connection pool for a registered client by
/// re-upserting its existing connection target into the registry. Returns a
/// plain error string; callers decide the HTTP shape.
async fn reconnect_runtime_client(state: &AppState, client_name: &str) -> Result<(), String> {
    let Some(registered_client) = state.pg_registry.registered_client(client_name) else {
        return Err(format!("client '{client_name}' is not registered"));
    };
    // Frozen or inactive clients must not silently regain connectivity.
    if !registered_client.is_active || registered_client.is_frozen {
        return Err(format!(
            "client '{client_name}' is configured but cannot accept new connections"
        ));
    }
    // Rebuild the same target from the registered record; upserting it is
    // expected to re-create the pool (verified below).
    let reconnect_target = ClientConnectionTarget {
        client_name: registered_client.client_name.clone(),
        source: registered_client.source.clone(),
        description: registered_client.description.clone(),
        pg_uri: registered_client.pg_uri.clone(),
        pg_uri_env_var: registered_client.pg_uri_env_var.clone(),
        config_uri_template: registered_client.config_uri_template.clone(),
        is_active: registered_client.is_active,
        is_frozen: registered_client.is_frozen,
    };
    if let Err(err) = state.pg_registry.upsert_client(reconnect_target).await {
        return Err(format!("runtime reconnect failed: {err}"));
    }
    if state.pg_registry.get_pool(client_name).is_none() {
        return Err(format!(
            "client '{client_name}' is still missing an active connection pool"
        ));
    }
    Ok(())
}
/// Accepts either a registered client name or a managed container name and
/// returns the client name to use for status lookups. Falls back to the
/// trimmed input when no alias can be resolved (including Docker errors).
async fn resolve_status_client_name_alias(state: &AppState, requested: &str) -> String {
    let candidate = requested.trim();
    if candidate.is_empty() {
        return String::new();
    }
    // An exact registered-client match wins outright.
    if state.pg_registry.registered_client(candidate).is_some() {
        return candidate.to_string();
    }
    // Best effort: treat the input as a container name and follow its
    // `athena.client` label back to the owning client.
    let containers = match list_managed_postgres_containers().await {
        Ok(value) => value,
        Err(_) => return candidate.to_string(),
    };
    if let Some(container) = containers
        .iter()
        .find(|container| container.container_name.eq_ignore_ascii_case(candidate))
        && let Some(linked_client) = container
            .labels
            .get("athena.client")
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
    {
        return linked_client.to_string();
    }
    candidate.to_string()
}
/// Finds a managed Postgres container by name (case-insensitive); `Ok(None)`
/// when no container matches.
async fn find_managed_container(
    container_name: &str,
) -> Result<Option<DockerManagedContainer>, HttpResponse> {
    let containers: Vec<DockerManagedContainer> = match list_managed_postgres_containers().await {
        Ok(list) => list,
        Err(err) => {
            return Err(map_provisioning_error("Failed to list Postgres instances", err));
        }
    };
    let matched = containers
        .into_iter()
        .find(|candidate| candidate.container_name.eq_ignore_ascii_case(container_name));
    Ok(matched)
}
/// Determines the client name for a container: a non-blank
/// `explicit_client_name` wins; otherwise the container's `athena.client`
/// label is used. `Ok(None)` when neither yields a usable name.
async fn resolve_instance_client_name(
    container_name: &str,
    explicit_client_name: Option<&str>,
) -> Result<Option<String>, HttpResponse> {
    if let Some(client_name) = explicit_client_name
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        return Ok(Some(client_name.to_string()));
    }
    let managed = find_managed_container(container_name).await?;
    Ok(managed.and_then(|container| {
        container
            .labels
            .get("athena.client")
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    }))
}
fn registered_client_connection_uri(registered: &RegisteredClient) -> Option<String> {
registered
.config_uri_template
.as_deref()
.map(resolve_postgres_uri)
.or_else(|| registered.pg_uri.clone())
}
fn catalog_client_connection_uri(client: &AthenaClientRecord) -> Option<String> {
client
.config_uri_template
.as_deref()
.map(resolve_postgres_uri)
.or_else(|| client.pg_uri.clone())
}
/// Resolves a connection URI for `client_name`: runtime registry first,
/// then the catalog table. `Ok(None)` means the client is unknown or has no
/// URI; an unavailable catalog is treated as "not found", not an error.
async fn resolve_client_connection_uri(
    state: &AppState,
    client_name: &str,
) -> Result<Option<String>, HttpResponse> {
    if let Some(runtime_client) = state.pg_registry.registered_client(client_name)
        && let Some(uri) = registered_client_connection_uri(&runtime_client)
    {
        return Ok(Some(uri));
    }
    // Missing catalog pool is not an error here — just report "no URI".
    let pool = match client_catalog_pool(state) {
        Ok(value) => value,
        Err(_) => return Ok(None),
    };
    let catalog_client = get_athena_client_by_name(&pool, client_name)
        .await
        .map_err(|err| internal_error("Failed to load catalog client", err.to_string()))?;
    Ok(catalog_client
        .as_ref()
        .and_then(catalog_client_connection_uri))
}
/// Normalizes a route binding key (trim + lowercase) and validates it:
/// non-empty, at most 64 bytes, and limited to `[a-z0-9_-]`.
fn normalize_route_binding_key(route_key: &str) -> Result<String, HttpResponse> {
    let normalized = route_key.trim().to_ascii_lowercase();
    if normalized.is_empty() {
        return Err(bad_request(
            "Invalid route key",
            "'route_key' must not be empty.",
        ));
    }
    let is_allowed =
        |ch: char| ch.is_ascii_lowercase() || ch.is_ascii_digit() || matches!(ch, '-' | '_');
    if normalized.len() <= 64 && normalized.chars().all(is_allowed) {
        Ok(normalized)
    } else {
        Err(bad_request(
            "Invalid route key",
            "'route_key' may contain only lowercase letters, numbers, '-' and '_' and must be at most 64 characters.",
        ))
    }
}
/// Extracts a bare hostname from a host-like string: strips any scheme,
/// path, extra comma-separated entries (X-Forwarded-Host lists), and a
/// single trailing port; unwraps bracketed IPv6 literals.
fn normalize_public_host(value: &str) -> Option<String> {
    let trimmed = value.trim();
    if trimmed.is_empty() {
        return None;
    }
    // Drop any scheme prefix, then anything after the first path separator.
    let after_scheme = match trimmed.split_once("://") {
        Some((_, rest)) => rest,
        None => trimmed,
    };
    let authority = match after_scheme.split_once('/') {
        Some((head, _)) => head,
        None => after_scheme,
    };
    // X-Forwarded-Host may carry a comma-separated list; keep the first.
    let candidate = match authority.split_once(',') {
        Some((first, _)) => first.trim(),
        None => authority,
    };
    if candidate.is_empty() {
        return None;
    }
    // Bracketed IPv6 literal: return the address between the brackets.
    if let Some(rest) = candidate.strip_prefix('[') {
        let end = rest.find(']')?;
        let inner = &rest[..end];
        return (!inner.is_empty()).then(|| inner.to_string());
    }
    // Exactly one ':' means host:port — drop the port. More than one colon
    // is an unbracketed IPv6 address and is kept whole.
    if candidate.matches(':').count() == 1 {
        let head = candidate.split(':').next().unwrap_or_default();
        return (!head.is_empty()).then(|| head.to_string());
    }
    Some(candidate.to_string())
}
/// Determines the externally reachable host for a port-route binding:
/// explicit `public_host` wins, then `X-Forwarded-Host`, then `Host`.
/// Returns 400 when none of those yields a usable hostname.
fn resolve_public_host(
    req: &HttpRequest,
    explicit_public_host: Option<&str>,
) -> Result<String, HttpResponse> {
    if let Some(value) = explicit_public_host.and_then(normalize_public_host) {
        return Ok(value);
    }
    let host_header = req
        .headers()
        .get("x-forwarded-host")
        .or_else(|| req.headers().get("host"))
        .and_then(|value| value.to_str().ok())
        .and_then(normalize_public_host);
    host_header.ok_or_else(|| {
        bad_request(
            "Missing public host",
            "Provide 'public_host' or send a valid Host/X-Forwarded-Host header.",
        )
    })
}
/// Formats a host for use inside a URI authority: IPv6 literals (anything
/// containing ':') are bracketed unless already bracketed.
fn format_postgres_host(host: &str) -> String {
    let already_bracketed = host.starts_with('[') && host.ends_with(']');
    if !host.contains(':') || already_bracketed {
        host.to_string()
    } else {
        format!("[{host}]")
    }
}
/// Rebuilds a Postgres URI so its authority points at
/// `public_host:public_port`, preserving the scheme, any userinfo, and the
/// database path (and everything after it). Errors are 400 responses.
fn rewrite_postgres_uri_authority(
    source_uri: &str,
    public_host: &str,
    public_port: u16,
) -> Result<String, HttpResponse> {
    let trimmed = source_uri.trim();
    if !(trimmed.starts_with("postgres://") || trimmed.starts_with("postgresql://")) {
        return Err(bad_request(
            "Invalid Postgres URI",
            "Only postgres:// or postgresql:// URIs are supported.",
        ));
    }
    // Defensive: the prefix check above already guarantees "://" exists.
    let Some(scheme_sep) = trimmed.find("://") else {
        return Err(bad_request(
            "Invalid Postgres URI",
            "Postgres URI is missing scheme separator.",
        ));
    };
    let authority_start = scheme_sep + 3;
    // The first '/' after the authority starts the database path.
    let Some(path_start_rel) = trimmed[authority_start..].find('/') else {
        return Err(bad_request(
            "Invalid Postgres URI",
            "Postgres URI is missing database path.",
        ));
    };
    let path_start = authority_start + path_start_rel;
    let authority = &trimmed[authority_start..path_start];
    // rsplit on '@' keeps full userinfo intact even if it contains '@'.
    let userinfo = authority
        .rsplit_once('@')
        .map(|(prefix, _)| format!("{prefix}@"))
        .unwrap_or_default();
    // Bracket IPv6 hosts so the ':' before the port stays unambiguous.
    let host_for_uri = format_postgres_host(public_host);
    Ok(format!(
        "{}{}{}:{}{}",
        &trimmed[..authority_start],
        userinfo,
        host_for_uri,
        public_port,
        &trimmed[path_start..]
    ))
}
/// Lists catalog clients whose URIs point at the same server as
/// `server_pg_uri` (matched by fingerprint), returned as JSON summaries
/// de-duplicated per (client, database) pair and sorted by database name.
async fn load_athena_managed_databases(
    state: &AppState,
    server_pg_uri: &str,
) -> Result<Vec<Value>, HttpResponse> {
    let target_fingerprint = postgres_uri_fingerprint(server_pg_uri).ok_or_else(|| {
        bad_request(
            "Client URI unavailable",
            "Failed to derive a server fingerprint for the selected client URI.",
        )
    })?;
    let pool: Pool<Postgres> = client_catalog_pool(state)?;
    let clients: Vec<AthenaClientRecord> = list_athena_clients(&pool)
        .await
        .map_err(|err| internal_error("Failed to load catalog clients", err.to_string()))?;
    let mut seen: HashSet<String> = HashSet::new();
    let mut managed: Vec<Value> = Vec::new();
    for client in clients {
        // Prefer the resolved URI template over the stored raw URI.
        let candidate_uri: Option<String> = client
            .config_uri_template
            .as_deref()
            .map(resolve_postgres_uri)
            .or(client.pg_uri.clone());
        let Some(candidate_uri) = candidate_uri else {
            continue;
        };
        // Skip clients that live on a different server.
        if postgres_uri_fingerprint(&candidate_uri).as_deref() != Some(target_fingerprint.as_str())
        {
            continue;
        }
        let Some(database_name) = postgres_uri_database_name(&candidate_uri) else {
            continue;
        };
        // De-duplicate on (client, database).
        let dedupe: String = format!("{}:{}", client.client_name, database_name);
        if !seen.insert(dedupe) {
            continue;
        }
        managed.push(json!({
            "client_name": client.client_name,
            "database_name": database_name,
            "source": client.source,
            "is_active": client.is_active,
            "is_frozen": client.is_frozen,
        }));
    }
    // Stable, user-friendly ordering by database name.
    managed.sort_by(|a, b| {
        let a_name = a
            .get("database_name")
            .and_then(Value::as_str)
            .unwrap_or_default();
        let b_name = b
            .get("database_name")
            .and_then(Value::as_str)
            .unwrap_or_default();
        a_name.cmp(b_name)
    });
    Ok(managed)
}
/// Pool backing the client catalog (the configured logging client);
/// 503 when no logging client is configured or it is not connected.
fn client_catalog_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
    let Some(client_name) = state.logging_client_name.as_ref() else {
        return Err(service_unavailable(
            "Client catalog unavailable",
            "No athena_logging client is configured.",
        ));
    };
    state.pg_registry.get_pool(client_name).ok_or_else(|| {
        service_unavailable(
            "Client catalog unavailable",
            format!("Logging client '{}' is not connected.", client_name),
        )
    })
}
/// Unwraps a provider request field that is required unless a direct
/// `connection_uri` was supplied; blank or missing values are a 400.
fn required_field(name: &str, value: Option<String>) -> Result<String, HttpResponse> {
    match value {
        Some(inner) if !inner.trim().is_empty() => Ok(inner),
        _ => Err(bad_request(
            "Missing required field",
            format!(
                "Provide '{}' or set 'connection_uri' to bypass provider API lookup.",
                name
            ),
        )),
    }
}
/// Registers a freshly provisioned database as a client in the runtime
/// registry and/or the persistent catalog, per the two flags. Returns
/// `(runtime_registered, catalog_registered)`; each stage is skipped (and
/// reported false) when its flag is off.
async fn register_provisioned_client(
    state: &AppState,
    client_name: &str,
    description: Option<String>,
    pg_uri: &str,
    metadata: serde_json::Value,
    register_runtime: bool,
    register_catalog: bool,
) -> Result<(bool, bool), HttpResponse> {
    let mut runtime_registered: bool = false;
    let mut catalog_registered: bool = false;
    if register_runtime {
        let target: ClientConnectionTarget = ClientConnectionTarget {
            client_name: client_name.to_string(),
            source: "database".to_string(),
            description: description.clone(),
            pg_uri: Some(pg_uri.to_string()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        if let Err(err) = state.pg_registry.upsert_client(target).await {
            return Err(internal_error(
                "Failed to register runtime client",
                format!("Runtime client registration failed: {}", err),
            ));
        }
        runtime_registered = true;
    }
    if register_catalog {
        // Catalog persistence requires the logging client's pool.
        let pool: Pool<Postgres> = client_catalog_pool(state)?;
        if let Err(err) = upsert_athena_client(
            &pool,
            SaveAthenaClientParams {
                client_name: client_name.to_string(),
                description,
                pg_uri: Some(pg_uri.to_string()),
                pg_uri_env_var: None,
                config_uri_template: None,
                source: "database".to_string(),
                is_active: true,
                is_frozen: false,
                metadata,
            },
        )
        .await
        {
            return Err(internal_error(
                "Failed to register client in catalog",
                err.to_string(),
            ));
        }
        catalog_registered = true;
    }
    Ok((runtime_registered, catalog_registered))
}
/// Resolves a tri-state request flag: `Some` wins, `None` takes `default`.
fn bool_or_default(value: Option<bool>, default: bool) -> bool {
    match value {
        Some(flag) => flag,
        None => default,
    }
}
/// Milliseconds elapsed since `started`; used for pipeline step timings.
fn now_ms(started: &Instant) -> u128 {
    started.elapsed().as_millis()
}
/// Returns `pg_uri` with the userinfo password replaced by `***` for safe
/// echoing in responses/logs; URIs without a `user:password@` userinfo are
/// returned unchanged.
fn masked_pg_uri(pg_uri: &str) -> String {
    // Split on the LAST '@' (mirroring `rewrite_postgres_uri_authority`) so
    // a stray '@' inside the password cannot shift the host boundary and
    // leak part of the secret into the "host" portion.
    pg_uri
        .split_once("://")
        .and_then(|(scheme, rest)| {
            rest.rsplit_once('@').and_then(|(auth_part, host_part)| {
                auth_part
                    .split_once(':')
                    .map(|(user, _password)| format!("{scheme}://{user}:***@{host_part}"))
            })
        })
        .unwrap_or_else(|| pg_uri.to_string())
}
/// Records (or overwrites) one named pipeline step in the steps map with its
/// status, duration, and arbitrary detail payload.
fn set_pipeline_step(
    steps: &mut Map<String, Value>,
    name: &str,
    status: &str,
    duration_ms: u128,
    details: Value,
) {
    let entry = json!({
        "status": status,
        "duration_ms": duration_ms,
        "details": details,
    });
    steps.insert(name.to_owned(), entry);
}
fn local_mode_as_str(mode: &LocalProvisionMode) -> &'static str {
match mode {
LocalProvisionMode::DedicatedContainer => "dedicated_container",
LocalProvisionMode::SharedClusterDatabase => "shared_cluster_database",
}
}
/// Builds the HTTP error response for a failed local provisioning pipeline:
/// the status comes from the `ProvisioningError` variant, the detail string
/// carries pipeline id / mode / client context plus an optional rollback
/// hint.
fn pipeline_error_response(
    err: ProvisioningError,
    pipeline_id: &str,
    mode: &LocalProvisionMode,
    client_name: &str,
    rollback_hint: Option<&str>,
) -> HttpResponse {
    const TITLE: &str = "Local provisioning pipeline failed";
    let context = format!(
        "pipeline_id={}, mode={}, client_name={}",
        pipeline_id,
        local_mode_as_str(mode),
        client_name
    );
    let suffix = rollback_hint
        .map(|hint| format!(" {}.", hint))
        .unwrap_or_default();
    // Format the detail string once instead of repeating it per variant.
    let detail = |message: String| format!("{}: {}.{}", context, message, suffix);
    match err {
        ProvisioningError::InvalidInput(message) => bad_request(TITLE, detail(message)),
        ProvisioningError::Conflict(message) => conflict(TITLE, detail(message)),
        ProvisioningError::Unavailable(message) => service_unavailable(TITLE, detail(message)),
        ProvisioningError::Execution(message) => internal_error(TITLE, detail(message)),
    }
}
/// `POST /admin/provision` — run the Athena provisioning SQL against a
/// direct URI or a registered client; optionally register a direct URI as a
/// new runtime client afterwards.
#[post("/admin/provision")]
pub async fn admin_provision(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let pg_uri: String = match resolve_uri(state.get_ref(), &body) {
        Ok(uri) => uri,
        Err(resp) => return resp,
    };
    let statement_count: usize = match run_provision_sql(&pg_uri).await {
        Ok(n) => n,
        Err(err) => return map_provisioning_error("Provisioning failed", err),
    };
    // Registration only applies when a direct 'uri' was supplied; a
    // 'client_name' target is already registered.
    let registered_name: Option<String> = if body.register && body.uri.is_some() {
        let name: String = body
            .register_name
            .clone()
            .unwrap_or_else(|| "provisioned".to_string());
        let target = ClientConnectionTarget {
            client_name: name.clone(),
            source: "database".to_string(),
            description: Some("Provisioned via API".to_string()),
            pg_uri: Some(pg_uri.clone()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        // Registration failure is non-fatal: provisioning itself succeeded.
        if let Err(err) = state.pg_registry.upsert_client(target).await {
            tracing::warn!(
                client = %name,
                error = %err,
                "provisioned but failed to register client"
            );
        }
        Some(name)
    } else {
        None
    };
    let client_label = body
        .client_name
        .clone()
        .or(registered_name.clone())
        .unwrap_or_else(|| "direct".to_string());
    api_success(
        format!("Provisioned {} statements", statement_count),
        json!({
            "statements_executed": statement_count,
            "client": client_label,
            "registered": registered_name,
            "tables": EXPECTED_TABLES,
        }),
    )
}
#[get("/admin/provision/status")]
pub async fn admin_provision_status(
req: HttpRequest,
state: Data<AppState>,
query: Query<ProvisionStatusQuery>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let requested_client_name = query.client_name.trim().to_string();
if requested_client_name.is_empty() {
return bad_request("Missing required field", "'client_name' must not be empty.");
}
let resolved_client_name =
resolve_status_client_name_alias(state.get_ref(), &requested_client_name).await;
let pool = match ensure_runtime_client_pool(state.get_ref(), &resolved_client_name).await {
Ok(value) => value,
Err(resp) => return resp,
};
let rows = match sqlx::query(
"SELECT table_name::text FROM information_schema.tables WHERE table_schema = 'public'",
)
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(err) => return internal_error("Failed to query tables", err.to_string()),
};
let existing_tables: Vec<String> = rows
.iter()
.filter_map(|row| row.try_get::<String, _>("table_name").ok())
.collect();
let present: Vec<&str> = EXPECTED_TABLES
.iter()
.filter(|table| existing_tables.iter().any(|existing| existing == **table))
.copied()
.collect();
let missing: Vec<&str> = EXPECTED_TABLES
.iter()
.filter(|table| !existing_tables.iter().any(|existing| existing == **table))
.copied()
.collect();
api_success(
format!("Provisioning status for '{}'", resolved_client_name),
json!({
"client_name": resolved_client_name,
"requested_client_name": requested_client_name,
"provisioned": missing.is_empty(),
"present_tables": present,
"missing_tables": missing,
}),
)
}
/// `GET /admin/provision/local/databases` — list every database on the
/// selected cluster plus the subset known to the Athena catalog.
#[get("/admin/provision/local/databases")]
pub async fn admin_list_local_cluster_databases(
    req: HttpRequest,
    state: Data<AppState>,
    query: Query<LocalClusterDatabasesQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &query.client_name) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let all_databases: Vec<String> = match list_postgres_databases(&pool).await {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to list local cluster databases", err),
    };
    // Catalog clients whose URI fingerprint matches this server.
    let managed_databases: Vec<Value> =
        match load_athena_managed_databases(state.get_ref(), &server_pg_uri).await {
            Ok(value) => value,
            Err(resp) => return resp,
        };
    api_success(
        "Listed local cluster databases",
        json!({
            "client_name": query.client_name,
            "all_databases": all_databases,
            "athena_managed_databases": managed_databases,
        }),
    )
}
/// `POST /admin/provision/local/databases` — create a database on a
/// registered cluster client, then (per request flags) provision the Athena
/// schema in it and register it as a runtime/catalog client.
#[post("/admin/provision/local/databases")]
pub async fn admin_create_local_cluster_database(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<LocalClusterCreateDatabaseRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Pool + server URI for the cluster-level client hosting the new DB.
    let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &body.client_name) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let create_params = LocalClusterCreateDatabaseParams {
        database_name: body.database_name.clone(),
        options: LocalClusterDatabaseCreateOptions {
            owner: body.owner.clone(),
            template: body.template.clone(),
            encoding: body.encoding.clone(),
            lc_collate: body.lc_collate.clone(),
            lc_ctype: body.lc_ctype.clone(),
            tablespace: body.tablespace.clone(),
        },
    };
    if let Err(err) = create_postgres_database(&pool, &create_params).await {
        return map_provisioning_error("Failed to create local cluster database", err);
    }
    // Derive the new database's URI from the server URI.
    let database_uri: String = match replace_uri_database_name(&server_pg_uri, &body.database_name)
    {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to build database URI", err),
    };
    // `None` in the response means schema provisioning was skipped.
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&database_uri).await {
            Ok(total) => total,
            Err(err) => {
                return map_provisioning_error("Failed to provision newly created database", err);
            }
        })
    } else {
        None
    };
    let register_name: String = body
        .register_name
        .clone()
        .unwrap_or_else(|| body.database_name.clone());
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &register_name,
        body.description
            .clone()
            .or_else(|| Some(format!("Local cluster database '{}'", body.database_name))),
        &database_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "local_cluster",
            "server_client_name": body.client_name.clone(),
            "database_name": body.database_name.clone(),
            "create_options": {
                "owner": body.owner.clone(),
                "template": body.template.clone(),
                "encoding": body.encoding.clone(),
                "lc_collate": body.lc_collate.clone(),
                "lc_ctype": body.lc_ctype.clone(),
                "tablespace": body.tablespace.clone(),
            },
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_created(
        "Created local cluster database",
        json!({
            "server_client_name": body.client_name,
            "database_name": body.database_name,
            "database_uri": database_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
                "register_name": register_name,
            }
        }),
    )
}
/// POST /admin/provision/local/pipeline — one-shot local provisioning pipeline.
///
/// Depending on `mode`, either spins up (or reuses) a dedicated Docker
/// Postgres container, or creates a database inside an already-registered
/// shared local cluster. It then optionally applies the Athena schema and
/// registers the client in the runtime registry and/or the catalog.
/// Per-step outcomes (status + duration + detail) are accumulated in `steps`
/// and returned alongside the connection credentials.
///
/// Requires the static admin key.
///
/// Fix: the two `runtime_register`/`catalog_register` step timings called
/// `now_ms(®ister_started)` — a mojibake corruption of `&register_started`
/// (the `&reg` byte sequence was rendered as `®`). Restored the intended
/// `now_ms(&register_started)` on both call sites.
#[post("/admin/provision/local/pipeline")]
pub async fn admin_run_local_provision_pipeline(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<LocalProvisionPipelineRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Client names are normalized to lowercase throughout this API.
    let client_name = body.client_name.trim().to_lowercase();
    if client_name.is_empty() {
        return bad_request("Missing required field", "'client_name' must not be empty.");
    }
    let mode = body.mode.clone();
    let advanced = body.advanced.clone().unwrap_or_default();
    let pipeline_id = Uuid::new_v4().to_string();
    let started = Instant::now();
    let mut steps = Map::new();
    // All pipeline toggles default to "on" unless explicitly disabled.
    let should_provision = bool_or_default(advanced.provision_schema, true);
    let register_runtime = bool_or_default(advanced.register_runtime, true);
    let register_catalog = bool_or_default(advanced.register_catalog, true);
    let mut shared_cluster_client_name: Option<String> = None;
    let pg_uri: String;
    let instance_data: Value;
    let infra_started = Instant::now();
    match &mode {
        LocalProvisionMode::DedicatedContainer => {
            // Create or reuse a managed Docker container; this also waits
            // for Postgres readiness (wait_ready_ms).
            let spin_result = match spin_up_postgres_instance(SpinUpPostgresParams {
                client_name: client_name.clone(),
                container_name: advanced.container_name.clone(),
                image: advanced.image.clone(),
                host: advanced.host.clone(),
                host_port: advanced.host_port,
                db_name: advanced.db_name.clone(),
                username: advanced.username.clone(),
                password: advanced.password.clone(),
                startup_timeout_secs: advanced.startup_timeout_secs,
                reuse_existing: bool_or_default(advanced.reuse_existing, true),
            })
            .await
            {
                Ok(value) => value,
                Err(err) => {
                    set_pipeline_step(
                        &mut steps,
                        "infra_create_or_reuse",
                        "failed",
                        now_ms(&infra_started),
                        json!({ "error": "Failed to create or reuse Docker container." }),
                    );
                    return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
                }
            };
            set_pipeline_step(
                &mut steps,
                "infra_create_or_reuse",
                "success",
                now_ms(&infra_started),
                json!({
                    "container_name": spin_result.container_name,
                    "created_new_container": spin_result.created_new_container,
                    "reused_existing_container": spin_result.reused_existing_container,
                    "host_port": spin_result.host_port,
                }),
            );
            set_pipeline_step(
                &mut steps,
                "readiness_check",
                "success",
                spin_result.wait_ready_ms,
                json!({ "wait_ready_ms": spin_result.wait_ready_ms }),
            );
            pg_uri = spin_result.pg_uri.clone();
            instance_data = json!({
                "container_name": spin_result.container_name,
                "image": spin_result.image,
                "host": spin_result.host,
                "host_port": spin_result.host_port,
                "db_name": spin_result.db_name,
                "username": spin_result.username,
                "password": spin_result.password,
                "created_new_container": spin_result.created_new_container,
                "reused_existing_container": spin_result.reused_existing_container,
            });
        }
        LocalProvisionMode::SharedClusterDatabase => {
            let shared_client = body
                .shared_cluster_client_name
                .as_deref()
                .map(str::trim)
                .unwrap_or_default()
                .to_string();
            if shared_client.is_empty() {
                return bad_request(
                    "Missing required field",
                    "'shared_cluster_client_name' is required for mode='shared_cluster_database'.",
                );
            }
            shared_cluster_client_name = Some(shared_client.clone());
            let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &shared_client)
            {
                Ok(value) => value,
                Err(resp) => return resp,
            };
            // The new database defaults to the client name when no explicit
            // db_name was supplied.
            let database_name = advanced
                .db_name
                .clone()
                .unwrap_or_else(|| client_name.clone());
            let create_params = LocalClusterCreateDatabaseParams {
                database_name: database_name.clone(),
                options: LocalClusterDatabaseCreateOptions {
                    owner: None,
                    template: None,
                    encoding: None,
                    lc_collate: None,
                    lc_ctype: None,
                    tablespace: None,
                },
            };
            if let Err(err) = create_postgres_database(&pool, &create_params).await {
                set_pipeline_step(
                    &mut steps,
                    "infra_create_or_reuse",
                    "failed",
                    now_ms(&infra_started),
                    json!({ "error": "Failed to create database in shared cluster." }),
                );
                return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
            }
            set_pipeline_step(
                &mut steps,
                "infra_create_or_reuse",
                "success",
                now_ms(&infra_started),
                json!({
                    "shared_cluster_client_name": shared_client,
                    "database_name": database_name,
                }),
            );
            // The shared cluster connection already proved reachable, so the
            // readiness step is an immediate success.
            set_pipeline_step(
                &mut steps,
                "readiness_check",
                "success",
                0,
                json!({ "strategy": "shared_cluster_existing_connection" }),
            );
            pg_uri = match replace_uri_database_name(&server_pg_uri, &database_name) {
                Ok(value) => value,
                Err(err) => {
                    return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
                }
            };
            instance_data = json!({
                "shared_cluster_client_name": shared_client,
                "database_name": database_name,
            });
        }
    }
    let provision_started = Instant::now();
    let mut statements_executed: Option<usize> = None;
    if should_provision {
        match run_provision_sql(&pg_uri).await {
            Ok(total) => {
                statements_executed = Some(total);
                set_pipeline_step(
                    &mut steps,
                    "schema_provision",
                    "success",
                    now_ms(&provision_started),
                    json!({ "statements_executed": total }),
                );
            }
            Err(err) => {
                set_pipeline_step(
                    &mut steps,
                    "schema_provision",
                    "failed",
                    now_ms(&provision_started),
                    json!({ "error": "Failed to apply Athena schema." }),
                );
                // Only the dedicated-container mode records a container_name,
                // so only that mode gets a rollback hint.
                let rollback_hint = if let Some(name) =
                    instance_data.get("container_name").and_then(Value::as_str)
                {
                    Some(format!(
                        "Rollback hint: delete container '{}' via DELETE /admin/provision/instances/{}",
                        name, name
                    ))
                } else {
                    None
                };
                return pipeline_error_response(
                    err,
                    &pipeline_id,
                    &mode,
                    &client_name,
                    rollback_hint.as_deref(),
                );
            }
        }
    } else {
        set_pipeline_step(
            &mut steps,
            "schema_provision",
            "skipped",
            0,
            json!({ "reason": "provision_schema=false" }),
        );
    }
    let register_started = Instant::now();
    let description = advanced.description.clone().or_else(|| match &mode {
        LocalProvisionMode::DedicatedContainer => {
            Some("Managed Postgres instance (pipeline)".to_string())
        }
        LocalProvisionMode::SharedClusterDatabase => {
            Some("Shared cluster database (pipeline)".to_string())
        }
    });
    let metadata = json!({
        "managed_by": "provision_pipeline",
        "provider": match &mode {
            LocalProvisionMode::DedicatedContainer => "docker",
            LocalProvisionMode::SharedClusterDatabase => "local_cluster",
        },
        "mode": local_mode_as_str(&mode),
        "shared_cluster_client_name": shared_cluster_client_name,
        "instance": instance_data,
    });
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &client_name,
        description,
        &pg_uri,
        metadata,
        register_runtime,
        register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    set_pipeline_step(
        &mut steps,
        "runtime_register",
        if register_runtime {
            if runtime_registered {
                "success"
            } else {
                "failed"
            }
        } else {
            "skipped"
        },
        now_ms(&register_started),
        json!({
            "requested": register_runtime,
            "registered": runtime_registered,
        }),
    );
    set_pipeline_step(
        &mut steps,
        "catalog_register",
        if register_catalog {
            if catalog_registered {
                "success"
            } else {
                "failed"
            }
        } else {
            "skipped"
        },
        now_ms(&register_started),
        json!({
            "requested": register_catalog,
            "registered": catalog_registered,
        }),
    );
    api_success(
        "Provisioning pipeline completed",
        json!({
            "pipeline_id": pipeline_id,
            "mode": local_mode_as_str(&mode),
            "client_name": client_name,
            "instance": instance_data,
            "credentials": {
                "masked_pg_uri": masked_pg_uri(&pg_uri),
                "copy_safe_pg_uri": pg_uri,
            },
            "pipeline": {
                "overall_status": "success",
                "total_duration_ms": now_ms(&started),
                "steps": steps,
                "provision_schema": should_provision,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
#[get("/admin/provision/instances")]
pub async fn admin_list_postgres_instances(
req: HttpRequest,
state: Data<AppState>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let containers: Vec<DockerManagedContainer> = match list_managed_postgres_containers().await {
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to list Postgres instances", err),
};
let runtime_clients: HashMap<String, RegisteredClient> = state
.pg_registry
.list_registered_clients()
.into_iter()
.map(|client| (client.client_name.to_lowercase(), client))
.collect();
let mut catalog_clients: HashMap<String, AthenaClientRecord> = HashMap::new();
if let Ok(pool) = client_catalog_pool(state.get_ref()) {
match list_athena_clients(&pool).await {
Ok(clients) => {
catalog_clients = clients
.into_iter()
.map(|client| (client.client_name.to_lowercase(), client))
.collect();
}
Err(err) => {
return internal_error("Failed to list catalog clients", err.to_string());
}
}
}
let payload: Vec<Value> = containers
.into_iter()
.map(|container| {
let linked_client_name = container
.labels
.get("athena.client")
.cloned()
.unwrap_or_default();
let linked_key = linked_client_name.to_lowercase();
let runtime_client = runtime_clients.get(&linked_key);
let catalog_client = catalog_clients.get(&linked_key);
let runtime_registered = runtime_client
.map(|client| client.pool_connected)
.unwrap_or(false);
json!({
"container_name": container.container_name,
"running": container.running,
"status": container.status,
"image": container.image,
"host_port": container.host_port,
"labels": container.labels,
"linked_client": if linked_client_name.is_empty() { Value::Null } else {
json!({
"client_name": linked_client_name,
"runtime_registered": runtime_registered,
"runtime_known": runtime_client.is_some(),
"catalog_registered": catalog_client.is_some(),
})
},
"runtime_client": runtime_client,
"catalog_client": catalog_client,
})
})
.collect();
api_success(
"Listed managed Postgres instances",
json!({
"instances": payload,
"count": payload.len(),
}),
)
}
/// POST /admin/provision/instances — spin up (or reuse) a managed Docker
/// Postgres container, optionally apply the Athena schema, and register the
/// resulting client in the runtime registry and/or catalog.
///
/// Requires the static admin key. NOTE(review): the success payload echoes
/// the raw password and `pg_uri`; this endpoint is admin-key gated, but
/// confirm that is an acceptable exposure for your deployment.
#[post("/admin/provision/instances")]
pub async fn admin_spin_up_postgres_instance(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<SpinUpInstanceRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Create or reuse the container; spin_up also waits for Postgres
    // readiness (reported back as wait_ready_ms).
    let spin_result: SpinUpPostgresResult = match spin_up_postgres_instance(SpinUpPostgresParams {
        client_name: body.client_name.clone(),
        container_name: body.container_name.clone(),
        image: body.image.clone(),
        host: body.host.clone(),
        host_port: body.host_port,
        db_name: body.db_name.clone(),
        username: body.username.clone(),
        password: body.password.clone(),
        startup_timeout_secs: body.startup_timeout_secs,
        reuse_existing: body.reuse_existing,
    })
    .await
    {
        Ok(result) => result,
        Err(err) => return map_provisioning_error("Failed to spin up Postgres instance", err),
    };
    // Optionally apply the Athena schema; a failure here aborts the request
    // (the container is left running — no automatic rollback).
    let mut statements_executed: Option<usize> = None;
    if body.provision_schema {
        statements_executed = Some(match run_provision_sql(&spin_result.pg_uri).await {
            Ok(total) => total,
            Err(err) => {
                return map_provisioning_error("Failed to provision Postgres instance", err);
            }
        });
    }
    // Register the new instance as a client in runtime registry / catalog,
    // as requested by the caller.
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &spin_result.client_name,
        Some(format!(
            "Managed Postgres instance ({})",
            spin_result.container_name
        )),
        &spin_result.pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "docker",
            "container_name": spin_result.container_name,
            "image": spin_result.image,
            "host": spin_result.host,
            "host_port": spin_result.host_port,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Postgres instance is ready",
        json!({
            "instance": {
                "client_name": spin_result.client_name,
                "container_name": spin_result.container_name,
                "image": spin_result.image,
                "host": spin_result.host,
                "host_port": spin_result.host_port,
                "db_name": spin_result.db_name,
                "username": spin_result.username,
                "password": spin_result.password,
                "pg_uri": spin_result.pg_uri,
                "created_new_container": spin_result.created_new_container,
                "reused_existing_container": spin_result.reused_existing_container,
                "wait_ready_ms": spin_result.wait_ready_ms,
            },
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// POST /admin/provision/providers/neon — provision an Athena client backed
/// by a Neon database.
///
/// Flow: optionally create a Neon project (when no project_id was supplied
/// and `create_project_if_missing` is set), resolve a connection URI (either
/// the caller-supplied `connection_uri` or one fetched from the Neon API),
/// optionally apply the Athena schema, then register the client.
///
/// Requires the static admin key.
#[post("/admin/provision/providers/neon")]
pub async fn admin_provision_neon(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<NeonProviderProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let mut project_id = body.project_id.clone();
    let mut branch_id = body.branch_id.clone();
    // Create a project only when none was supplied (blank counts as missing).
    if project_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_project_if_missing
    {
        let api_key = match required_field("api_key", body.api_key.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let created: NeonProjectCreateResult = match create_neon_project(NeonProjectCreateParams {
            api_key,
            // Default the project name to the client name.
            project_name: body
                .project_name
                .clone()
                .or_else(|| Some(body.client_name.clone())),
            project_payload: body.project_create_payload.clone(),
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Neon project", err),
        };
        project_id = Some(created.project_id);
        // Adopt the project's default branch unless the caller pinned one.
        if branch_id.is_none() {
            branch_id = created.branch_id;
        }
    }
    // Prefer a caller-supplied (non-blank) connection URI; otherwise fetch
    // one from the Neon API, which requires api_key + project_id.
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_field("api_key", body.api_key.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project_id = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_neon_connection_uri(NeonConnectionParams {
            api_key,
            project_id,
            branch_id: branch_id.clone(),
            database_name: body.database_name.clone(),
            role_name: body.role_name.clone(),
            endpoint_id: body.endpoint_id.clone(),
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => return map_provisioning_error("Failed to fetch Neon connection URI", err),
        }
    };
    // Optionally apply the Athena schema against the resolved database.
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => return map_provisioning_error("Failed to provision Neon database", err),
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Neon database managed via Athena".to_string()));
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "neon",
            "project_id": project_id,
            "branch_id": branch_id,
            "database_name": body.database_name,
            "role_name": body.role_name,
            "endpoint_id": body.endpoint_id,
            "created_project": body.create_project_if_missing,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Provisioned Neon database",
        json!({
            "provider": "neon",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// POST /admin/provision/providers/railway — provision an Athena client
/// backed by a Railway Postgres plugin.
///
/// Flow: optionally create project → resolve base environment → optionally
/// create service → optionally create plugin → resolve connection URI →
/// optionally apply the Athena schema → register the client. Each "create"
/// step only runs when the corresponding id is missing/blank AND the
/// matching `create_*_if_missing` flag is set.
///
/// Requires the static admin key.
#[post("/admin/provision/providers/railway")]
pub async fn admin_provision_railway(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<RailwayProviderProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Treat a blank api_key the same as an absent one.
    let api_key_for_create = body
        .api_key
        .clone()
        .filter(|value| !value.trim().is_empty());
    let mut project_id = body.project_id.clone();
    let mut environment_id = body.environment_id.clone();
    let mut service_id = body.service_id.clone();
    let mut plugin_id = body.plugin_id.clone();
    // Step 1: create the project when missing.
    if project_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_project_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        // Fill in a default project name without clobbering a caller-set one.
        let project_input = match json_object_insert_if_missing(
            body.project_create_input.clone(),
            "name",
            Value::String(
                body.project_name
                    .clone()
                    .unwrap_or_else(|| format!("athena-{}", body.client_name)),
            ),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway project_create_input", err),
        };
        let created = match create_railway_project(RailwayProjectCreateParams {
            api_key,
            project_input,
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Railway project", err),
        };
        project_id = Some(created.project_id);
        if environment_id.is_none() {
            environment_id = created.base_environment_id;
        }
    }
    // Step 2: resolve the project's base environment when we have enough
    // credentials to ask for it.
    if environment_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && let (Some(api_key), Some(project)) = (api_key_for_create.clone(), project_id.clone())
    {
        environment_id = match fetch_railway_project_base_environment_id(
            &api_key,
            &project,
            body.graphql_url.as_deref(),
        )
        .await
        {
            Ok(value) => value,
            Err(err) => {
                return map_provisioning_error("Failed to resolve Railway base environment", err);
            }
        };
    }
    // Step 3: create the service when missing.
    if service_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_service_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        // Layer required keys into the GraphQL input, preserving caller values.
        let service_input = match json_object_insert_if_missing(
            body.service_create_input.clone(),
            "projectId",
            Value::String(project),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway service_create_input", err),
        };
        let service_input = match json_object_insert_if_missing(
            Some(service_input),
            "name",
            Value::String(
                body.service_name
                    .clone()
                    .unwrap_or_else(|| format!("{}-service", body.client_name)),
            ),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway service_create_input", err),
        };
        let created = match create_railway_service(RailwayServiceCreateParams {
            api_key,
            service_input,
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Railway service", err),
        };
        service_id = Some(created.service_id);
    }
    // Step 4: create the Postgres plugin when missing; this requires a
    // project AND an environment id.
    if plugin_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_plugin_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let environment = match required_field("environment_id", environment_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let plugin_input = match json_object_insert_if_missing(
            body.plugin_create_input.clone(),
            "projectId",
            Value::String(project),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
        };
        let plugin_input = match json_object_insert_if_missing(
            Some(plugin_input),
            "environmentId",
            Value::String(environment),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
        };
        let plugin_input = match json_object_insert_if_missing(
            Some(plugin_input),
            "name",
            Value::String(
                body.plugin_name
                    .clone()
                    .unwrap_or_else(|| "Postgres".to_string()),
            ),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
        };
        let created = match create_railway_plugin(RailwayPluginCreateParams {
            api_key,
            plugin_input,
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Railway plugin", err),
        };
        plugin_id = Some(created.plugin_id);
    }
    // Step 5: prefer a caller-supplied (non-blank) connection URI; otherwise
    // fetch one from the Railway API.
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project_id = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let environment_id = match required_field("environment_id", environment_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_railway_connection_uri(RailwayConnectionParams {
            api_key,
            project_id,
            environment_id,
            service_id: service_id.clone(),
            plugin_id: plugin_id.clone(),
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => {
                return map_provisioning_error("Failed to fetch Railway connection URI", err);
            }
        }
    };
    // Step 6: optionally apply the Athena schema.
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => return map_provisioning_error("Failed to provision Railway database", err),
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Railway database managed via Athena".to_string()));
    // Step 7: register the client in the runtime registry and/or catalog.
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "railway",
            "project_id": project_id,
            "environment_id": environment_id,
            "service_id": service_id,
            "plugin_id": plugin_id,
            "created_project": body.create_project_if_missing,
            "created_service": body.create_service_if_missing,
            "created_plugin": body.create_plugin_if_missing,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Provisioned Railway database",
        json!({
            "provider": "railway",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// POST /admin/provision/providers/render — provision an Athena client
/// backed by a Render Postgres service.
///
/// Flow: optionally create the Render service (when `service_id` is
/// missing/blank and `create_service_if_missing` is set), resolve a
/// connection URI (caller-supplied or fetched from the Render API),
/// optionally apply the Athena schema, then register the client.
///
/// Requires the static admin key.
#[post("/admin/provision/providers/render")]
pub async fn admin_provision_render(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<RenderProviderProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Treat a blank api_key the same as an absent one.
    let api_key_for_create = body
        .api_key
        .clone()
        .filter(|value| !value.trim().is_empty());
    let mut service_id = body.service_id.clone();
    if service_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_service_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let created = match create_render_postgres_service(RenderPostgresCreateParams {
            api_key,
            owner_id: body.owner_id.clone(),
            service_name: body.service_name.clone(),
            service_payload: body.service_create_payload.clone(),
            plan: body.plan.clone(),
            region: body.region.clone(),
            postgres_version: body.postgres_version.clone(),
            disk_size_gb: body.disk_size_gb,
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Render service", err),
        };
        service_id = Some(created.service_id);
    }
    // Prefer a caller-supplied (non-blank) connection URI; otherwise fetch
    // one from the Render API, which requires api_key + service_id.
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let service_id = match required_field("service_id", service_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_render_connection_uri(RenderConnectionParams {
            api_key,
            service_id,
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => {
                return map_provisioning_error("Failed to fetch Render connection URI", err);
            }
        }
    };
    // Optionally apply the Athena schema.
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => return map_provisioning_error("Failed to provision Render database", err),
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Render Postgres database managed via Athena".to_string()));
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "render",
            "service_id": service_id,
            "owner_id": body.owner_id,
            "service_name": body.service_name,
            "plan": body.plan,
            "region": body.region,
            "postgres_version": body.postgres_version,
            "disk_size_gb": body.disk_size_gb,
            "created_service": body.create_service_if_missing,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Provisioned Render database",
        json!({
            "provider": "render",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// GET /admin/provision/instances/{container_name} — inspect a managed
/// Postgres container and, when a `client_name` query parameter is given,
/// attach the matching runtime and catalog client records.
///
/// Requires the static admin key. Returns 404 when no such container exists.
#[get("/admin/provision/instances/{container_name}")]
pub async fn admin_get_postgres_instance_status(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<InstanceStatusQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let container_name: String = path.into_inner();
    let status: DockerContainerStatus = match inspect_container(&container_name).await {
        Ok(inspected) => inspected,
        Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
    };
    if !status.exists {
        return not_found(
            "Instance not found",
            format!("No Postgres container named '{}' exists.", container_name),
        );
    }
    // Resolve the optional client-name alias; an empty resolution counts as
    // "no client".
    let mut resolved_client_name: Option<String> = None;
    if let Some(alias) = query.client_name.as_ref() {
        let resolved = resolve_status_client_name_alias(state.get_ref(), alias).await;
        if !resolved.is_empty() {
            resolved_client_name = Some(resolved);
        }
    }
    // Runtime registry lookup for the resolved client, if any.
    let mut runtime_client: Option<RegisteredClient> = None;
    if let Some(name) = resolved_client_name.as_ref() {
        runtime_client = state.pg_registry.registered_client(name);
    }
    // Catalog lookup is skipped when no catalog pool is available; a query
    // failure, however, is a hard error.
    let mut catalog_client: Option<AthenaClientRecord> = None;
    if let Some(name) = resolved_client_name.as_ref() {
        if let Ok(pool) = client_catalog_pool(state.get_ref()) {
            catalog_client = match get_athena_client_by_name(&pool, name).await {
                Ok(record) => record,
                Err(err) => {
                    return internal_error("Failed to load catalog client", err.to_string());
                }
            };
        }
    }
    api_success(
        "Loaded Postgres instance status",
        json!({
            "instance": status,
            "client_name": resolved_client_name,
            "requested_client_name": query.client_name,
            "runtime_client": runtime_client,
            "catalog_client": catalog_client,
        }),
    )
}
/// POST /admin/provision/instances/{container_name}/start — start a managed
/// Postgres container and, unless disabled via the optional body, try to
/// reconnect the linked runtime client.
///
/// Requires the static admin key. The request body is optional; when absent,
/// runtime sync defaults to enabled with no explicit client name.
#[post("/admin/provision/instances/{container_name}/start")]
pub async fn admin_start_postgres_instance(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Option<Json<InstanceLifecycleRequest>>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let container_name: String = path.into_inner();
    if let Err(err) = start_container(&container_name).await {
        return map_provisioning_error("Failed to start Postgres instance", err);
    }
    // Re-inspect after starting so the response reflects the live state.
    let status: DockerContainerStatus = match inspect_container(&container_name).await {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
    };
    if !status.exists {
        return not_found(
            "Instance not found",
            format!("No Postgres container named '{}' exists.", container_name),
        );
    }
    // Default payload: sync the runtime client, no explicit client name.
    let payload = body
        .as_ref()
        .map(|value| value.0.clone())
        .unwrap_or(InstanceLifecycleRequest {
            client_name: None,
            sync_runtime: true,
        });
    // The linked client comes from the explicit body field or the
    // container's own metadata (resolved by the helper).
    let resolved_client_name =
        match resolve_instance_client_name(&container_name, payload.client_name.as_deref()).await {
            Ok(value) => value,
            Err(resp) => return resp,
        };
    // Reconnect failures are reported in the payload, not as an HTTP error —
    // the container did start successfully either way.
    let runtime_sync = if payload.sync_runtime {
        if let Some(client_name) = resolved_client_name.as_deref() {
            match reconnect_runtime_client(state.get_ref(), client_name).await {
                Ok(()) => json!({
                    "requested": true,
                    "client_name": client_name,
                    "reconnected": true
                }),
                Err(err) => json!({
                    "requested": true,
                    "client_name": client_name,
                    "reconnected": false,
                    "error": err
                }),
            }
        } else {
            json!({
                "requested": true,
                "reconnected": false,
                "error": "no linked client_name found for this container"
            })
        }
    } else {
        json!({
            "requested": false,
            "reconnected": false
        })
    };
    api_success(
        "Started Postgres instance",
        json!({
            "instance": status,
            "client_name": resolved_client_name,
            "runtime_sync": runtime_sync,
        }),
    )
}
/// POST /admin/provision/instances/{container_name}/stop — stop a managed
/// Postgres container and, unless disabled via the optional body, mark the
/// linked runtime client unavailable.
///
/// Requires the static admin key. The request body is optional; when absent,
/// runtime sync defaults to enabled with no explicit client name.
#[post("/admin/provision/instances/{container_name}/stop")]
pub async fn admin_stop_postgres_instance(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Option<Json<InstanceLifecycleRequest>>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let container_name: String = path.into_inner();
    if let Err(err) = stop_container(&container_name).await {
        return map_provisioning_error("Failed to stop Postgres instance", err);
    }
    // Re-inspect after stopping so the response reflects the live state.
    let status: DockerContainerStatus = match inspect_container(&container_name).await {
        Ok(inspected) => inspected,
        Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
    };
    if !status.exists {
        return not_found(
            "Instance not found",
            format!("No Postgres container named '{}' exists.", container_name),
        );
    }
    // Default payload: sync the runtime client, no explicit client name.
    let payload = match body.as_ref() {
        Some(json_body) => json_body.0.clone(),
        None => InstanceLifecycleRequest {
            client_name: None,
            sync_runtime: true,
        },
    };
    let resolved_client_name =
        match resolve_instance_client_name(&container_name, payload.client_name.as_deref()).await {
            Ok(value) => value,
            Err(resp) => return resp,
        };
    // Runtime-sync outcome is reported in the payload; a missing linked
    // client is not an HTTP error since the container did stop.
    let runtime_sync = match (payload.sync_runtime, resolved_client_name.as_deref()) {
        (true, Some(client_name)) => {
            state.pg_registry.mark_unavailable(client_name);
            json!({
                "requested": true,
                "client_name": client_name,
                "marked_unavailable": true
            })
        }
        (true, None) => json!({
            "requested": true,
            "marked_unavailable": false,
            "error": "no linked client_name found for this container"
        }),
        (false, _) => json!({
            "requested": false,
            "marked_unavailable": false
        }),
    };
    api_success(
        "Stopped Postgres instance",
        json!({
            "instance": status,
            "client_name": resolved_client_name,
            "runtime_sync": runtime_sync,
        }),
    )
}
/// POST /admin/provision/instances/{container_name}/bindings — compute a
/// public host:port route binding for a managed instance and optionally
/// persist it into the catalog client's metadata under
/// `network.pg_route_bindings.<route_key>`.
///
/// Requires the static admin key. PostgreSQL is a TCP protocol, so the
/// binding is host:port based (the response note spells this out).
#[post("/admin/provision/instances/{container_name}/bindings")]
pub async fn admin_bind_postgres_instance_port_route(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<BindInstancePortRouteRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let container_name: String = path.into_inner();
    let route_key = match normalize_route_binding_key(&body.route_key) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let status: DockerContainerStatus = match inspect_container(&container_name).await {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
    };
    if !status.exists {
        return not_found(
            "Instance not found",
            format!("No Postgres container named '{}' exists.", container_name),
        );
    }
    // A binding requires a published host port to route to.
    let Some(target_port) = status.host_port else {
        return service_unavailable(
            "Port unavailable",
            format!(
                "Postgres container '{}' is missing a published host port.",
                container_name
            ),
        );
    };
    // Client name comes from the body or the container's metadata; unlike
    // the lifecycle handlers, a missing name here is a hard error.
    let client_name =
        match resolve_instance_client_name(&container_name, body.client_name.as_deref()).await {
            Ok(Some(value)) => value,
            Ok(None) => {
                return bad_request(
                    "Missing client name",
                    "Provide 'client_name' or label the container with 'athena.client'.",
                );
            }
            Err(resp) => return resp,
        };
    // The public URI is derived from the client's registered URI with its
    // authority rewritten, so credentials/database are preserved.
    let source_pg_uri = match resolve_client_connection_uri(state.get_ref(), &client_name).await {
        Ok(Some(value)) => value,
        Ok(None) => {
            return service_unavailable(
                "Client URI unavailable",
                format!(
                    "No Postgres URI is available for client '{}'. Register runtime/catalog metadata first.",
                    client_name
                ),
            );
        }
        Err(resp) => return resp,
    };
    let public_host = match resolve_public_host(&req, body.public_host.as_deref()) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // Default the public port to the container's published port.
    let public_port = body.public_port.unwrap_or(target_port);
    let public_pg_uri =
        match rewrite_postgres_uri_authority(&source_pg_uri, &public_host, public_port) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
    let binding = json!({
        "route_key": route_key,
        "client_name": client_name,
        "container_name": container_name,
        "target": {
            "host": "127.0.0.1",
            "port": target_port
        },
        "public": {
            "host": public_host,
            "port": public_port
        },
        "public_pg_uri": public_pg_uri,
        "protocol": "postgresql/tcp",
        "note": "PostgreSQL is TCP-based. HTTP path aliases like /xxxxx cannot carry PostgreSQL wire protocol; use host:port routing."
    });
    let mut persisted = false;
    if body.persist_in_catalog {
        let pool = match client_catalog_pool(state.get_ref()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let existing = match get_athena_client_by_name(&pool, &client_name).await {
            Ok(value) => value,
            Err(err) => return internal_error("Failed to load catalog client", err.to_string()),
        };
        let Some(existing) = existing else {
            return not_found(
                "Catalog client not found",
                format!("No catalog client '{}' was found.", client_name),
            );
        };
        // Ensure metadata.network.pg_route_bindings exists as a JSON object,
        // coercing any non-object value that may have been stored before.
        let mut metadata = existing.metadata.as_object().cloned().unwrap_or_default();
        let network = metadata
            .entry("network".to_string())
            .or_insert_with(|| json!({}));
        if !network.is_object() {
            *network = json!({});
        }
        let Some(network_obj) = network.as_object_mut() else {
            return internal_error(
                "Failed to persist route binding metadata",
                "Client metadata network field is not a JSON object.",
            );
        };
        let routes = network_obj
            .entry("pg_route_bindings".to_string())
            .or_insert_with(|| json!({}));
        if !routes.is_object() {
            *routes = json!({});
        }
        let Some(routes_obj) = routes.as_object_mut() else {
            return internal_error(
                "Failed to persist route binding metadata",
                "Client metadata pg_route_bindings field is not a JSON object.",
            );
        };
        // Upsert this route key; all other client fields are written back
        // unchanged.
        routes_obj.insert(route_key.clone(), binding.clone());
        if let Err(err) = upsert_athena_client(
            &pool,
            SaveAthenaClientParams {
                client_name: existing.client_name,
                description: existing.description,
                pg_uri: existing.pg_uri,
                pg_uri_env_var: existing.pg_uri_env_var,
                config_uri_template: existing.config_uri_template,
                source: existing.source,
                is_active: existing.is_active,
                is_frozen: existing.is_frozen,
                metadata: Value::Object(metadata),
            },
        )
        .await
        {
            return internal_error("Failed to persist route binding metadata", err.to_string());
        }
        persisted = true;
    }
    api_success(
        "Bound managed instance port route",
        json!({
            "binding": binding,
            "persisted_in_catalog": persisted,
        }),
    )
}
#[delete("/admin/provision/instances/{container_name}")]
pub async fn admin_delete_postgres_instance(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Option<Json<DeleteInstanceRequest>>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
if let Err(err) = remove_container(&container_name).await {
return map_provisioning_error("Failed to delete Postgres instance", err);
}
let payload: DeleteInstanceRequest =
body.as_ref()
.map(|value| value.0.clone())
.unwrap_or(DeleteInstanceRequest {
client_name: None,
remove_runtime_client: true,
remove_catalog_client: true,
});
let mut runtime_removed: bool = false;
let mut catalog_removed: bool = false;
if let Some(client_name) = payload.client_name.as_ref() {
if payload.remove_runtime_client {
state.pg_registry.remove_client(client_name);
runtime_removed = true;
}
if payload.remove_catalog_client {
let pool: Pool<Postgres> = match client_catalog_pool(state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match delete_athena_client(&pool, client_name).await {
Ok(Some(_)) => catalog_removed = true,
Ok(None) => catalog_removed = false,
Err(err) => {
return internal_error("Failed to delete catalog client", err.to_string());
}
}
}
}
api_success(
"Deleted Postgres instance",
json!({
"container_name": container_name,
"client_name": payload.client_name,
"runtime_removed": runtime_removed,
"catalog_removed": catalog_removed,
}),
)
}
/// Registers every provisioning admin endpoint on the Actix service config.
///
/// Called once at application start-up; one registration per handler.
pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(admin_provision);
    cfg.service(admin_provision_status);
    cfg.service(admin_run_local_provision_pipeline);
    cfg.service(admin_list_local_cluster_databases);
    cfg.service(admin_create_local_cluster_database);
    cfg.service(admin_list_postgres_instances);
    cfg.service(admin_spin_up_postgres_instance);
    cfg.service(admin_provision_neon);
    cfg.service(admin_provision_railway);
    cfg.service(admin_provision_render);
    cfg.service(admin_get_postgres_instance_status);
    cfg.service(admin_start_postgres_instance);
    cfg.service(admin_stop_postgres_instance);
    cfg.service(admin_bind_postgres_instance_port_route);
    cfg.service(admin_delete_postgres_instance);
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn normalize_route_binding_key_accepts_expected_shape() {
        // Mixed-case input is lowercased; `_` and `-` separators survive.
        let normalized = normalize_route_binding_key("Tenant_01-ReadWrite").expect("valid key");
        assert_eq!(normalized, "tenant_01-readwrite");
    }

    #[test]
    fn normalize_public_host_strips_scheme_path_and_port() {
        // Scheme, port, and path are all discarded; only the host remains.
        let host =
            normalize_public_host("https://cluster.athena-db.com:4052/some/path").expect("host");
        assert_eq!(host, "cluster.athena-db.com");
    }

    #[test]
    fn rewrite_postgres_uri_authority_rewrites_host_and_port() {
        // Credentials, database name, and query string must be untouched.
        let input = "postgres://athena:secret@127.0.0.1:5432/example?sslmode=disable";
        let expected =
            "postgres://athena:secret@cluster.athena-db.com:45432/example?sslmode=disable";
        let rewritten =
            rewrite_postgres_uri_authority(input, "cluster.athena-db.com", 45432).expect("rewrite");
        assert_eq!(rewritten, expected);
    }
}