use actix_web::{
HttpRequest, HttpResponse, delete, get, post,
web::{self, Data, Json, Path, Query},
};
use serde::Deserialize;
use serde_json::{Map, Value, json};
use sqlx::postgres::PgPool;
use sqlx::{Pool, Postgres, Row};
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
api_created, api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};
use crate::data::clients::AthenaClientRecord;
use crate::data::clients::{
SaveAthenaClientParams, delete_athena_client, get_athena_client_by_name, list_athena_clients,
upsert_athena_client,
};
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, RegisteredClient};
use crate::parser::resolve_postgres_uri;
use crate::provisioning::{
DockerContainerStatus, DockerManagedContainer, EXPECTED_TABLES,
LocalClusterCreateDatabaseParams, LocalClusterDatabaseCreateOptions, NeonConnectionParams,
NeonProjectCreateParams, NeonProjectCreateResult, ProvisioningError, RailwayConnectionParams,
RailwayPluginCreateParams, RailwayProjectCreateParams, RailwayServiceCreateParams,
RenderConnectionParams, RenderPostgresCreateParams, SpinUpPostgresParams, SpinUpPostgresResult,
create_neon_project, create_postgres_database, create_railway_plugin, create_railway_project,
create_railway_service, create_render_postgres_service, fetch_neon_connection_uri,
fetch_railway_connection_uri, fetch_railway_project_base_environment_id,
fetch_render_connection_uri, inspect_container, json_object_insert_if_missing,
list_managed_postgres_containers, list_postgres_databases, postgres_uri_database_name,
postgres_uri_fingerprint, remove_container, replace_uri_database_name, run_provision_sql,
spin_up_postgres_instance,
};
/// Body for `POST /admin/provision`. Exactly one of `uri` or `client_name`
/// must be supplied (enforced in `resolve_uri`).
#[derive(Debug, Deserialize)]
struct ProvisionRequest {
    // Direct Postgres connection string to provision against.
    #[serde(default)]
    uri: Option<String>,
    // Name of an already-registered runtime client to provision instead.
    #[serde(default)]
    client_name: Option<String>,
    // When true and `uri` was given, register the URI as a runtime client afterwards.
    #[serde(default)]
    register: bool,
    // Runtime client name to register under; handler defaults to "provisioned".
    #[serde(default)]
    register_name: Option<String>,
}
/// Query string for `GET /admin/provision/status`: which registered client to inspect.
#[derive(Debug, Deserialize)]
struct ProvisionStatusQuery {
    client_name: String,
}
/// Query string for `GET /admin/provision/local/databases`: the registered
/// client whose Postgres server should be enumerated.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct LocalClusterDatabasesQuery {
    client_name: String,
}
/// Body for `POST /admin/provision/local/databases`: create a database on an
/// existing (shared) local Postgres cluster, optionally apply the Athena
/// schema and register the result as a client.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct LocalClusterCreateDatabaseRequest {
    // Registered client that points at the target cluster/server.
    client_name: String,
    database_name: String,
    // Optional CREATE DATABASE options, passed through verbatim.
    #[serde(default)]
    owner: Option<String>,
    #[serde(default)]
    template: Option<String>,
    #[serde(default)]
    encoding: Option<String>,
    #[serde(default)]
    lc_collate: Option<String>,
    #[serde(default)]
    lc_ctype: Option<String>,
    #[serde(default)]
    tablespace: Option<String>,
    // Client name to register under; defaults to `database_name` in the handler.
    #[serde(default)]
    register_name: Option<String>,
    #[serde(default)]
    description: Option<String>,
    // Post-creation steps; all enabled unless explicitly turned off.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for `POST /admin/provision/instances`: spin up a managed, dedicated
/// Postgres container (Docker) for `client_name`. All optional knobs fall
/// back to provisioning-layer defaults when omitted.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SpinUpInstanceRequest {
    client_name: String,
    #[serde(default)]
    container_name: Option<String>,
    #[serde(default)]
    image: Option<String>,
    #[serde(default)]
    host: Option<String>,
    #[serde(default)]
    host_port: Option<u16>,
    #[serde(default)]
    db_name: Option<String>,
    #[serde(default)]
    username: Option<String>,
    #[serde(default)]
    password: Option<String>,
    // How long to wait for the container to accept connections.
    #[serde(default)]
    startup_timeout_secs: Option<u64>,
    // Reuse a matching existing container instead of failing; defaults to false here.
    #[serde(default)]
    reuse_existing: bool,
    // Post-creation steps; all enabled unless explicitly turned off.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for instance deletion (handler not visible in this chunk — presumably
/// `DELETE /admin/provision/instances/{name}`; confirm against the route).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct DeleteInstanceRequest {
    // Client linked to the container; when absent the handler likely derives it.
    #[serde(default)]
    client_name: Option<String>,
    // Whether to also unregister the runtime/catalog client entries.
    #[serde(default = "default_true")]
    remove_runtime_client: bool,
    #[serde(default = "default_true")]
    remove_catalog_client: bool,
}
/// Query string for an instance-status endpoint (handler not visible in this
/// chunk); optionally narrows the lookup to one linked client.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct InstanceStatusQuery {
    #[serde(default)]
    client_name: Option<String>,
}
/// Infrastructure strategy for the local provisioning pipeline: either a
/// brand-new Docker container per client, or a new database inside an
/// already-running shared cluster.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
enum LocalProvisionMode {
    DedicatedContainer,
    SharedClusterDatabase,
}
impl Default for LocalProvisionMode {
    // Pipeline requests that omit `mode` get a dedicated container.
    fn default() -> Self {
        Self::DedicatedContainer
    }
}
/// Optional overrides for the local provisioning pipeline. Every field is
/// optional; unset booleans fall back to pipeline defaults via `bool_or_default`.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
struct LocalProvisionAdvancedOptions {
    // Container/instance knobs (dedicated-container mode).
    #[serde(default)]
    container_name: Option<String>,
    #[serde(default)]
    image: Option<String>,
    #[serde(default)]
    host: Option<String>,
    #[serde(default)]
    host_port: Option<u16>,
    #[serde(default)]
    startup_timeout_secs: Option<u64>,
    // Database credentials/name; `db_name` doubles as the database name in
    // shared-cluster mode.
    #[serde(default)]
    db_name: Option<String>,
    #[serde(default)]
    username: Option<String>,
    #[serde(default)]
    password: Option<String>,
    // Tri-state flags: None means "use the pipeline default".
    #[serde(default)]
    reuse_existing: Option<bool>,
    #[serde(default)]
    provision_schema: Option<bool>,
    #[serde(default)]
    register_runtime: Option<bool>,
    #[serde(default)]
    register_catalog: Option<bool>,
    #[serde(default)]
    description: Option<String>,
}
/// Body for `POST /admin/provision/local/pipeline`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct LocalProvisionPipelineRequest {
    client_name: String,
    // Infrastructure strategy; defaults to `dedicated_container`.
    #[serde(default)]
    mode: LocalProvisionMode,
    // Required when mode = `shared_cluster_database`: the registered client
    // that points at the shared cluster.
    #[serde(default)]
    shared_cluster_client_name: Option<String>,
    #[serde(default)]
    advanced: Option<LocalProvisionAdvancedOptions>,
}
/// Body for the Neon provider provisioning endpoint (handler not visible in
/// this chunk). Either supply `connection_uri` directly, or supply `api_key`
/// plus project/branch identifiers so the handler can look the URI up via the
/// Neon API — see `required_field`'s error message for that contract.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct NeonProviderProvisionRequest {
    client_name: String,
    #[serde(default)]
    description: Option<String>,
    // Direct connection string; bypasses the provider API lookup entirely.
    #[serde(default)]
    connection_uri: Option<String>,
    // Neon API credentials / addressing.
    #[serde(default)]
    api_key: Option<String>,
    #[serde(default)]
    project_id: Option<String>,
    #[serde(default)]
    branch_id: Option<String>,
    #[serde(default)]
    database_name: Option<String>,
    #[serde(default)]
    role_name: Option<String>,
    #[serde(default)]
    endpoint_id: Option<String>,
    #[serde(default)]
    api_base_url: Option<String>,
    // Optionally create the Neon project when it does not exist yet.
    #[serde(default)]
    create_project_if_missing: bool,
    #[serde(default)]
    project_name: Option<String>,
    // Raw payload forwarded to project creation (shape defined by the Neon API).
    #[serde(default)]
    project_create_payload: Option<Value>,
    // Post-creation steps; all enabled unless explicitly turned off.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for the Railway provider provisioning endpoint (handler not visible
/// in this chunk). Either supply `connection_uri` directly, or supply
/// `api_key` plus project/environment/service identifiers for a GraphQL
/// lookup; `create_*_if_missing` flags allow on-the-fly creation.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RailwayProviderProvisionRequest {
    client_name: String,
    #[serde(default)]
    description: Option<String>,
    // Direct connection string; bypasses the provider API lookup entirely.
    #[serde(default)]
    connection_uri: Option<String>,
    // Railway API credentials / addressing.
    #[serde(default)]
    api_key: Option<String>,
    #[serde(default)]
    project_id: Option<String>,
    #[serde(default)]
    environment_id: Option<String>,
    #[serde(default)]
    service_id: Option<String>,
    #[serde(default)]
    plugin_id: Option<String>,
    #[serde(default)]
    graphql_url: Option<String>,
    // Optional creation of missing project/service/plugin resources; the
    // `*_create_input` payloads are forwarded to the Railway API verbatim.
    #[serde(default)]
    create_project_if_missing: bool,
    #[serde(default)]
    project_name: Option<String>,
    #[serde(default)]
    project_create_input: Option<Value>,
    #[serde(default)]
    create_service_if_missing: bool,
    #[serde(default)]
    service_name: Option<String>,
    #[serde(default)]
    service_create_input: Option<Value>,
    #[serde(default)]
    create_plugin_if_missing: bool,
    #[serde(default)]
    plugin_name: Option<String>,
    #[serde(default)]
    plugin_create_input: Option<Value>,
    // Post-creation steps; all enabled unless explicitly turned off.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for the Render provider provisioning endpoint (handler not visible in
/// this chunk). Either supply `connection_uri` directly, or supply `api_key`
/// plus `service_id`/`owner_id` so the handler can query the Render API;
/// optionally create the Postgres service when missing.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RenderProviderProvisionRequest {
    client_name: String,
    #[serde(default)]
    description: Option<String>,
    // Direct connection string; bypasses the provider API lookup entirely.
    #[serde(default)]
    connection_uri: Option<String>,
    // Render API credentials / addressing.
    #[serde(default)]
    api_key: Option<String>,
    #[serde(default)]
    service_id: Option<String>,
    #[serde(default)]
    owner_id: Option<String>,
    #[serde(default)]
    api_base_url: Option<String>,
    // Optional service creation parameters (used when the service is missing).
    #[serde(default)]
    create_service_if_missing: bool,
    #[serde(default)]
    service_name: Option<String>,
    // Raw payload forwarded to service creation (shape defined by the Render API).
    #[serde(default)]
    service_create_payload: Option<Value>,
    #[serde(default)]
    plan: Option<String>,
    #[serde(default)]
    region: Option<String>,
    #[serde(default)]
    postgres_version: Option<String>,
    #[serde(default)]
    disk_size_gb: Option<u32>,
    // Post-creation steps; all enabled unless explicitly turned off.
    #[serde(default = "default_true")]
    provision_schema: bool,
    #[serde(default = "default_true")]
    register_runtime: bool,
    #[serde(default = "default_true")]
    register_catalog: bool,
}
// Serde default helper: fields tagged `#[serde(default = "default_true")]`
// are enabled when omitted from the request body.
fn default_true() -> bool {
    true
}
fn map_provisioning_error(context: &str, err: ProvisioningError) -> HttpResponse {
match err {
ProvisioningError::InvalidInput(message) => bad_request(context, message),
ProvisioningError::Conflict(message) => conflict(context, message),
ProvisioningError::Unavailable(message) => service_unavailable(context, message),
ProvisioningError::Execution(message) => internal_error(context, message),
}
}
fn resolve_uri(state: &AppState, req: &ProvisionRequest) -> Result<String, HttpResponse> {
match (&req.uri, &req.client_name) {
(Some(uri), None) => Ok(uri.clone()),
(None, Some(client_name)) => {
let registered: RegisteredClient = state
.pg_registry
.registered_client(client_name)
.ok_or_else(|| {
bad_request(
"Unknown client",
format!("No Postgres client named '{}' is registered.", client_name),
)
})?;
registered
.config_uri_template
.as_deref()
.map(resolve_postgres_uri)
.or(registered.pg_uri)
.ok_or_else(|| {
bad_request(
"Client URI unavailable",
format!("No Postgres URI is available for client '{}'.", client_name),
)
})
}
(Some(_), Some(_)) => Err(bad_request(
"Ambiguous target",
"Provide either 'uri' or 'client_name', not both.",
)),
(None, None) => Err(bad_request(
"Missing target",
"Provide either 'uri' (direct Postgres URI) or 'client_name' (registered client).",
)),
}
}
/// Derive a usable Postgres URI for a registered runtime client.
///
/// Prefers the client's `config_uri_template` (resolved through
/// `resolve_postgres_uri`) and falls back to its raw `pg_uri`. Returns a
/// `bad_request` when the client is unknown or has no URI at all.
fn resolve_registered_client_uri(
    state: &AppState,
    client_name: &str,
) -> Result<String, HttpResponse> {
    let Some(registered) = state.pg_registry.registered_client(client_name) else {
        return Err(bad_request(
            "Unknown client",
            format!("No Postgres client named '{}' is registered.", client_name),
        ));
    };
    let resolved: Option<String> = registered
        .config_uri_template
        .as_deref()
        .map(resolve_postgres_uri)
        .or(registered.pg_uri);
    resolved.ok_or_else(|| {
        bad_request(
            "Client URI unavailable",
            format!("No Postgres URI is available for client '{}'.", client_name),
        )
    })
}
/// Fetch both the live connection pool and the resolved server URI for a
/// registered client that points at a local Postgres cluster. Fails with
/// `service_unavailable` when the client has no connected pool.
fn local_cluster_target(
    state: &AppState,
    client_name: &str,
) -> Result<(PgPool, String), HttpResponse> {
    let Some(pool) = state.pg_registry.get_pool(client_name) else {
        return Err(service_unavailable(
            "Client unavailable",
            format!("Postgres client '{}' is not connected.", client_name),
        ));
    };
    let server_uri = resolve_registered_client_uri(state, client_name)?;
    Ok((pool, server_uri))
}
/// List catalog clients whose URI points at the same Postgres server as
/// `server_pg_uri`, returning one JSON summary per (client, database) pair.
///
/// Server identity is compared via `postgres_uri_fingerprint`, so clients on
/// other servers are skipped. Results are deduplicated on
/// `client_name:database_name` and sorted by database name.
async fn load_athena_managed_databases(
    state: &AppState,
    server_pg_uri: &str,
) -> Result<Vec<Value>, HttpResponse> {
    let target_fingerprint = postgres_uri_fingerprint(server_pg_uri).ok_or_else(|| {
        bad_request(
            "Client URI unavailable",
            "Failed to derive a server fingerprint for the selected client URI.",
        )
    })?;
    let pool: Pool<Postgres> = client_catalog_pool(state)?;
    let clients: Vec<AthenaClientRecord> = list_athena_clients(&pool)
        .await
        .map_err(|err| internal_error("Failed to load catalog clients", err.to_string()))?;
    let mut seen: HashSet<String> = HashSet::new();
    let mut managed: Vec<Value> = Vec::new();
    for client in clients {
        // Prefer the resolved URI template; fall back to the stored raw URI.
        let candidate_uri: Option<String> = client
            .config_uri_template
            .as_deref()
            .map(resolve_postgres_uri)
            .or(client.pg_uri.clone());
        let Some(candidate_uri) = candidate_uri else {
            continue;
        };
        // Skip clients living on a different Postgres server.
        if postgres_uri_fingerprint(&candidate_uri).as_deref() != Some(target_fingerprint.as_str())
        {
            continue;
        }
        let Some(database_name) = postgres_uri_database_name(&candidate_uri) else {
            continue;
        };
        // Emit each (client, database) pair at most once.
        let dedupe: String = format!("{}:{}", client.client_name, database_name);
        if !seen.insert(dedupe) {
            continue;
        }
        managed.push(json!({
            "client_name": client.client_name,
            "database_name": database_name,
            "source": client.source,
            "is_active": client.is_active,
            "is_frozen": client.is_frozen,
        }));
    }
    // Name-ordered output keeps API responses stable across calls.
    managed.sort_by(|a, b| {
        let a_name = a
            .get("database_name")
            .and_then(Value::as_str)
            .unwrap_or_default();
        let b_name = b
            .get("database_name")
            .and_then(Value::as_str)
            .unwrap_or_default();
        a_name.cmp(b_name)
    });
    Ok(managed)
}
/// Pool for the Athena client-catalog database (the configured
/// `athena_logging` client). Fails with `service_unavailable` when no logging
/// client is configured or its pool is not connected.
fn client_catalog_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
    let client_name = match state.logging_client_name.as_ref() {
        Some(name) => name,
        None => {
            return Err(service_unavailable(
                "Client catalog unavailable",
                "No athena_logging client is configured.",
            ));
        }
    };
    match state.pg_registry.get_pool(client_name) {
        Some(pool) => Ok(pool),
        None => Err(service_unavailable(
            "Client catalog unavailable",
            format!("Logging client '{}' is not connected.", client_name),
        )),
    }
}
/// Validate a required provider field: missing and whitespace-only values are
/// both rejected with a `bad_request` naming the field; otherwise the
/// original (untrimmed) value is returned.
fn required_field(name: &str, value: Option<String>) -> Result<String, HttpResponse> {
    match value {
        Some(v) if !v.trim().is_empty() => Ok(v),
        _ => Err(bad_request(
            "Missing required field",
            format!(
                "Provide '{}' or set 'connection_uri' to bypass provider API lookup.",
                name
            ),
        )),
    }
}
/// Register a freshly provisioned database as a client, in the runtime
/// registry and/or the persistent catalog, per the two boolean flags.
///
/// Returns `(runtime_registered, catalog_registered)`. Runtime registration
/// happens first; a catalog failure therefore leaves the runtime entry in
/// place. Both registrations use source = "database", active and unfrozen.
async fn register_provisioned_client(
    state: &AppState,
    client_name: &str,
    description: Option<String>,
    pg_uri: &str,
    metadata: serde_json::Value,
    register_runtime: bool,
    register_catalog: bool,
) -> Result<(bool, bool), HttpResponse> {
    let mut runtime_registered: bool = false;
    let mut catalog_registered: bool = false;
    if register_runtime {
        let target: ClientConnectionTarget = ClientConnectionTarget {
            client_name: client_name.to_string(),
            source: "database".to_string(),
            description: description.clone(),
            pg_uri: Some(pg_uri.to_string()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        if let Err(err) = state.pg_registry.upsert_client(target).await {
            return Err(internal_error(
                "Failed to register runtime client",
                format!("Runtime client registration failed: {}", err),
            ));
        }
        runtime_registered = true;
    }
    if register_catalog {
        // Catalog writes go through the configured athena_logging client.
        let pool: Pool<Postgres> = client_catalog_pool(state)?;
        if let Err(err) = upsert_athena_client(
            &pool,
            SaveAthenaClientParams {
                client_name: client_name.to_string(),
                description,
                pg_uri: Some(pg_uri.to_string()),
                pg_uri_env_var: None,
                config_uri_template: None,
                source: "database".to_string(),
                is_active: true,
                is_frozen: false,
                metadata,
            },
        )
        .await
        {
            return Err(internal_error(
                "Failed to register client in catalog",
                err.to_string(),
            ));
        }
        catalog_registered = true;
    }
    Ok((runtime_registered, catalog_registered))
}
/// Unwrap an optional boolean flag, substituting `default` when it is absent.
fn bool_or_default(value: Option<bool>, default: bool) -> bool {
    match value {
        Some(flag) => flag,
        None => default,
    }
}
/// Whole milliseconds elapsed since `started`; used for per-step durations.
fn now_ms(started: &Instant) -> u128 {
    let elapsed = started.elapsed();
    elapsed.as_millis()
}
/// Mask the password in a Postgres URI for display, e.g.
/// `postgres://user:secret@host/db` -> `postgres://user:***@host/db`.
///
/// URIs without a `user:password@` section are returned unchanged (there is
/// nothing to mask). Fix: split credentials from host on the LAST `@`
/// (`rsplit_once`) instead of the first, so a password containing `@` no
/// longer leaks its tail into the host portion of the masked string.
fn masked_pg_uri(pg_uri: &str) -> String {
    if let Some((scheme, rest)) = pg_uri.split_once("://") {
        // Hostnames cannot contain '@', so the last '@' separates userinfo from host.
        if let Some((auth_part, host_part)) = rest.rsplit_once('@') {
            if let Some((user, _password)) = auth_part.split_once(':') {
                return format!("{scheme}://{user}:***@{host_part}");
            }
        }
    }
    pg_uri.to_string()
}
/// Record the outcome of one pipeline stage under `name` in the step map:
/// a JSON object with `status`, `duration_ms` and free-form `details`.
fn set_pipeline_step(
    steps: &mut Map<String, Value>,
    name: &str,
    status: &str,
    duration_ms: u128,
    details: Value,
) {
    let entry = json!({
        "status": status,
        "duration_ms": duration_ms,
        "details": details,
    });
    steps.insert(name.to_owned(), entry);
}
/// Stable snake_case identifier for a provisioning mode, used in responses
/// and error contexts (mirrors the serde rename of `LocalProvisionMode`).
fn local_mode_as_str(mode: &LocalProvisionMode) -> &'static str {
    if matches!(mode, LocalProvisionMode::DedicatedContainer) {
        "dedicated_container"
    } else {
        "shared_cluster_database"
    }
}
/// Build the HTTP error response for a failed pipeline run, prefixing every
/// message with correlation identifiers (pipeline id, mode, client name) and
/// appending an optional rollback hint. The HTTP status is chosen from the
/// `ProvisioningError` variant; the message layout is shared across variants.
fn pipeline_error_response(
    err: ProvisioningError,
    pipeline_id: &str,
    mode: &LocalProvisionMode,
    client_name: &str,
    rollback_hint: Option<&str>,
) -> HttpResponse {
    let context = format!(
        "pipeline_id={}, mode={}, client_name={}",
        pipeline_id,
        local_mode_as_str(mode),
        client_name
    );
    let suffix = match rollback_hint {
        Some(hint) => format!(" {}.", hint),
        None => String::new(),
    };
    let title = "Local provisioning pipeline failed";
    // One message builder keeps all four variants formatted identically.
    let detail = |message: String| format!("{}: {}.{}", context, message, suffix);
    match err {
        ProvisioningError::InvalidInput(message) => bad_request(title, detail(message)),
        ProvisioningError::Conflict(message) => conflict(title, detail(message)),
        ProvisioningError::Unavailable(message) => service_unavailable(title, detail(message)),
        ProvisioningError::Execution(message) => internal_error(title, detail(message)),
    }
}
/// `POST /admin/provision` — apply the Athena provisioning SQL to a target
/// database, addressed either by direct `uri` or by registered `client_name`.
/// Optionally registers a direct URI as a runtime client afterwards
/// (best-effort: a registration failure is logged, not returned as an error).
#[post("/admin/provision")]
pub async fn admin_provision(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let pg_uri: String = match resolve_uri(state.get_ref(), &body) {
        Ok(uri) => uri,
        Err(resp) => return resp,
    };
    let statement_count: usize = match run_provision_sql(&pg_uri).await {
        Ok(n) => n,
        Err(err) => return map_provisioning_error("Provisioning failed", err),
    };
    // Registration only applies to the direct-URI path; a named client is
    // already registered by definition.
    let registered_name: Option<String> = if body.register && body.uri.is_some() {
        let name: String = body
            .register_name
            .clone()
            .unwrap_or_else(|| "provisioned".to_string());
        let target = ClientConnectionTarget {
            client_name: name.clone(),
            source: "database".to_string(),
            description: Some("Provisioned via API".to_string()),
            pg_uri: Some(pg_uri.clone()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        // Best-effort: the database itself is already provisioned, so only warn.
        if let Err(err) = state.pg_registry.upsert_client(target).await {
            tracing::warn!(
                client = %name,
                error = %err,
                "provisioned but failed to register client"
            );
        }
        Some(name)
    } else {
        None
    };
    // Label for the response: requested client, else the registered name,
    // else "direct" for a raw-URI target.
    let client_label = body
        .client_name
        .clone()
        .or(registered_name.clone())
        .unwrap_or_else(|| "direct".to_string());
    api_success(
        format!("Provisioned {} statements", statement_count),
        json!({
            "statements_executed": statement_count,
            "client": client_label,
            "registered": registered_name,
            "tables": EXPECTED_TABLES,
        }),
    )
}
#[get("/admin/provision/status")]
pub async fn admin_provision_status(
req: HttpRequest,
state: Data<AppState>,
query: Query<ProvisionStatusQuery>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let pool = state
.pg_registry
.get_pool(&query.client_name)
.ok_or_else(|| {
service_unavailable(
"Client unavailable",
format!("Postgres client '{}' is not connected.", query.client_name),
)
});
let pool = match pool {
Ok(pool) => pool,
Err(resp) => return resp,
};
let rows = match sqlx::query(
"SELECT table_name::text FROM information_schema.tables WHERE table_schema = 'public'",
)
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(err) => return internal_error("Failed to query tables", err.to_string()),
};
let existing_tables: Vec<String> = rows
.iter()
.filter_map(|row| row.try_get::<String, _>("table_name").ok())
.collect();
let present: Vec<&str> = EXPECTED_TABLES
.iter()
.filter(|table| existing_tables.iter().any(|existing| existing == **table))
.copied()
.collect();
let missing: Vec<&str> = EXPECTED_TABLES
.iter()
.filter(|table| !existing_tables.iter().any(|existing| existing == **table))
.copied()
.collect();
api_success(
format!("Provisioning status for '{}'", query.client_name),
json!({
"client_name": query.client_name,
"provisioned": missing.is_empty(),
"present_tables": present,
"missing_tables": missing,
}),
)
}
/// `GET /admin/provision/local/databases` — list every database on the
/// Postgres server behind a registered client, alongside the subset that the
/// Athena catalog already tracks on that same server.
#[get("/admin/provision/local/databases")]
pub async fn admin_list_local_cluster_databases(
    req: HttpRequest,
    state: Data<AppState>,
    query: Query<LocalClusterDatabasesQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &query.client_name) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let all_databases: Vec<String> = match list_postgres_databases(&pool).await {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to list local cluster databases", err),
    };
    // Catalog clients whose URI fingerprint matches this server.
    let managed_databases: Vec<Value> =
        match load_athena_managed_databases(state.get_ref(), &server_pg_uri).await {
            Ok(value) => value,
            Err(resp) => return resp,
        };
    api_success(
        "Listed local cluster databases",
        json!({
            "client_name": query.client_name,
            "all_databases": all_databases,
            "athena_managed_databases": managed_databases,
        }),
    )
}
/// `POST /admin/provision/local/databases` — create a database on the shared
/// cluster behind a registered client, then (per request flags) apply the
/// Athena schema and register the new database as a runtime/catalog client.
/// Steps run in order: CREATE DATABASE, schema provision, registration; a
/// failure mid-way returns immediately and earlier steps are NOT rolled back.
#[post("/admin/provision/local/databases")]
pub async fn admin_create_local_cluster_database(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<LocalClusterCreateDatabaseRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &body.client_name) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let create_params = LocalClusterCreateDatabaseParams {
        database_name: body.database_name.clone(),
        options: LocalClusterDatabaseCreateOptions {
            owner: body.owner.clone(),
            template: body.template.clone(),
            encoding: body.encoding.clone(),
            lc_collate: body.lc_collate.clone(),
            lc_ctype: body.lc_ctype.clone(),
            tablespace: body.tablespace.clone(),
        },
    };
    if let Err(err) = create_postgres_database(&pool, &create_params).await {
        return map_provisioning_error("Failed to create local cluster database", err);
    }
    // Point the server URI at the newly created database.
    let database_uri: String = match replace_uri_database_name(&server_pg_uri, &body.database_name)
    {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to build database URI", err),
    };
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&database_uri).await {
            Ok(total) => total,
            Err(err) => {
                return map_provisioning_error("Failed to provision newly created database", err);
            }
        })
    } else {
        None
    };
    // The client registers under `register_name`, defaulting to the database name.
    let register_name: String = body
        .register_name
        .clone()
        .unwrap_or_else(|| body.database_name.clone());
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &register_name,
        body.description
            .clone()
            .or_else(|| Some(format!("Local cluster database '{}'", body.database_name))),
        &database_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "local_cluster",
            "server_client_name": body.client_name.clone(),
            "database_name": body.database_name.clone(),
            "create_options": {
                "owner": body.owner.clone(),
                "template": body.template.clone(),
                "encoding": body.encoding.clone(),
                "lc_collate": body.lc_collate.clone(),
                "lc_ctype": body.lc_ctype.clone(),
                "tablespace": body.tablespace.clone(),
            },
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_created(
        "Created local cluster database",
        json!({
            "server_client_name": body.client_name,
            "database_name": body.database_name,
            "database_uri": database_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
                "register_name": register_name,
            }
        }),
    )
}
/// `POST /admin/provision/local/pipeline` — end-to-end local provisioning:
/// create infrastructure (a dedicated Docker container, or a database inside
/// a shared cluster), apply the Athena schema, then register the result as a
/// runtime and/or catalog client. Every stage is recorded in a step map with
/// status and duration, and every failure message carries a correlation
/// `pipeline_id`. Earlier steps are NOT rolled back on failure; for the
/// dedicated-container mode a rollback hint naming the container is attached.
#[post("/admin/provision/local/pipeline")]
pub async fn admin_run_local_provision_pipeline(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<LocalProvisionPipelineRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Client names are normalized to lowercase for consistent registry keys.
    let client_name = body.client_name.trim().to_lowercase();
    if client_name.is_empty() {
        return bad_request("Missing required field", "'client_name' must not be empty.");
    }
    let mode = body.mode.clone();
    let advanced = body.advanced.clone().unwrap_or_default();
    // Correlation id embedded in every pipeline error message.
    let pipeline_id = Uuid::new_v4().to_string();
    let started = Instant::now();
    let mut steps = Map::new();
    // Tri-state advanced flags default to enabled when unset.
    let should_provision = bool_or_default(advanced.provision_schema, true);
    let register_runtime = bool_or_default(advanced.register_runtime, true);
    let register_catalog = bool_or_default(advanced.register_catalog, true);
    let mut shared_cluster_client_name: Option<String> = None;
    let pg_uri: String;
    let instance_data: Value;
    let infra_started = Instant::now();
    // Step 1: create or reuse the underlying infrastructure.
    match &mode {
        LocalProvisionMode::DedicatedContainer => {
            let spin_result = match spin_up_postgres_instance(SpinUpPostgresParams {
                client_name: client_name.clone(),
                container_name: advanced.container_name.clone(),
                image: advanced.image.clone(),
                host: advanced.host.clone(),
                host_port: advanced.host_port,
                db_name: advanced.db_name.clone(),
                username: advanced.username.clone(),
                password: advanced.password.clone(),
                startup_timeout_secs: advanced.startup_timeout_secs,
                // Unlike the plain spin-up endpoint, the pipeline reuses by default.
                reuse_existing: bool_or_default(advanced.reuse_existing, true),
            })
            .await
            {
                Ok(value) => value,
                Err(err) => {
                    set_pipeline_step(
                        &mut steps,
                        "infra_create_or_reuse",
                        "failed",
                        now_ms(&infra_started),
                        json!({ "error": "Failed to create or reuse Docker container." }),
                    );
                    return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
                }
            };
            set_pipeline_step(
                &mut steps,
                "infra_create_or_reuse",
                "success",
                now_ms(&infra_started),
                json!({
                    "container_name": spin_result.container_name,
                    "created_new_container": spin_result.created_new_container,
                    "reused_existing_container": spin_result.reused_existing_container,
                    "host_port": spin_result.host_port,
                }),
            );
            // Readiness was already awaited inside spin_up_postgres_instance;
            // report the measured wait time as its own step.
            set_pipeline_step(
                &mut steps,
                "readiness_check",
                "success",
                spin_result.wait_ready_ms,
                json!({ "wait_ready_ms": spin_result.wait_ready_ms }),
            );
            pg_uri = spin_result.pg_uri.clone();
            instance_data = json!({
                "container_name": spin_result.container_name,
                "image": spin_result.image,
                "host": spin_result.host,
                "host_port": spin_result.host_port,
                "db_name": spin_result.db_name,
                "username": spin_result.username,
                "password": spin_result.password,
                "created_new_container": spin_result.created_new_container,
                "reused_existing_container": spin_result.reused_existing_container,
            });
        }
        LocalProvisionMode::SharedClusterDatabase => {
            let shared_client = body
                .shared_cluster_client_name
                .as_deref()
                .map(str::trim)
                .unwrap_or_default()
                .to_string();
            if shared_client.is_empty() {
                return bad_request(
                    "Missing required field",
                    "'shared_cluster_client_name' is required for mode='shared_cluster_database'.",
                );
            }
            shared_cluster_client_name = Some(shared_client.clone());
            let (pool, server_pg_uri) = match local_cluster_target(state.get_ref(), &shared_client)
            {
                Ok(value) => value,
                Err(resp) => return resp,
            };
            // Database name defaults to the (normalized) client name.
            let database_name = advanced
                .db_name
                .clone()
                .unwrap_or_else(|| client_name.clone());
            let create_params = LocalClusterCreateDatabaseParams {
                database_name: database_name.clone(),
                options: LocalClusterDatabaseCreateOptions {
                    owner: None,
                    template: None,
                    encoding: None,
                    lc_collate: None,
                    lc_ctype: None,
                    tablespace: None,
                },
            };
            if let Err(err) = create_postgres_database(&pool, &create_params).await {
                set_pipeline_step(
                    &mut steps,
                    "infra_create_or_reuse",
                    "failed",
                    now_ms(&infra_started),
                    json!({ "error": "Failed to create database in shared cluster." }),
                );
                return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
            }
            set_pipeline_step(
                &mut steps,
                "infra_create_or_reuse",
                "success",
                now_ms(&infra_started),
                json!({
                    "shared_cluster_client_name": shared_client,
                    "database_name": database_name,
                }),
            );
            // The shared cluster is already connected, so readiness is a no-op.
            set_pipeline_step(
                &mut steps,
                "readiness_check",
                "success",
                0,
                json!({ "strategy": "shared_cluster_existing_connection" }),
            );
            pg_uri = match replace_uri_database_name(&server_pg_uri, &database_name) {
                Ok(value) => value,
                Err(err) => {
                    return pipeline_error_response(err, &pipeline_id, &mode, &client_name, None);
                }
            };
            instance_data = json!({
                "shared_cluster_client_name": shared_client,
                "database_name": database_name,
            });
        }
    }
    // Step 2: apply the Athena schema (unless disabled).
    let provision_started = Instant::now();
    let mut statements_executed: Option<usize> = None;
    if should_provision {
        match run_provision_sql(&pg_uri).await {
            Ok(total) => {
                statements_executed = Some(total);
                set_pipeline_step(
                    &mut steps,
                    "schema_provision",
                    "success",
                    now_ms(&provision_started),
                    json!({ "statements_executed": total }),
                );
            }
            Err(err) => {
                set_pipeline_step(
                    &mut steps,
                    "schema_provision",
                    "failed",
                    now_ms(&provision_started),
                    json!({ "error": "Failed to apply Athena schema." }),
                );
                // Only the dedicated-container mode leaves a disposable
                // artifact worth naming in a rollback hint.
                let rollback_hint = if let Some(name) =
                    instance_data.get("container_name").and_then(Value::as_str)
                {
                    Some(format!(
                        "Rollback hint: delete container '{}' via DELETE /admin/provision/instances/{}",
                        name, name
                    ))
                } else {
                    None
                };
                return pipeline_error_response(
                    err,
                    &pipeline_id,
                    &mode,
                    &client_name,
                    rollback_hint.as_deref(),
                );
            }
        }
    } else {
        set_pipeline_step(
            &mut steps,
            "schema_provision",
            "skipped",
            0,
            json!({ "reason": "provision_schema=false" }),
        );
    }
    // Step 3: register the new database as a runtime and/or catalog client.
    let register_started = Instant::now();
    let description = advanced.description.clone().or_else(|| match &mode {
        LocalProvisionMode::DedicatedContainer => {
            Some("Managed Postgres instance (pipeline)".to_string())
        }
        LocalProvisionMode::SharedClusterDatabase => {
            Some("Shared cluster database (pipeline)".to_string())
        }
    });
    let metadata = json!({
        "managed_by": "provision_pipeline",
        "provider": match &mode {
            LocalProvisionMode::DedicatedContainer => "docker",
            LocalProvisionMode::SharedClusterDatabase => "local_cluster",
        },
        "mode": local_mode_as_str(&mode),
        "shared_cluster_client_name": shared_cluster_client_name,
        "instance": instance_data,
    });
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &client_name,
        description,
        &pg_uri,
        metadata,
        register_runtime,
        register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    set_pipeline_step(
        &mut steps,
        "runtime_register",
        if register_runtime {
            if runtime_registered {
                "success"
            } else {
                "failed"
            }
        } else {
            "skipped"
        },
        now_ms(&register_started),
        json!({
            "requested": register_runtime,
            "registered": runtime_registered,
        }),
    );
    set_pipeline_step(
        &mut steps,
        "catalog_register",
        if register_catalog {
            if catalog_registered {
                "success"
            } else {
                "failed"
            }
        } else {
            "skipped"
        },
        now_ms(&register_started),
        json!({
            "requested": register_catalog,
            "registered": catalog_registered,
        }),
    );
    api_success(
        "Provisioning pipeline completed",
        json!({
            "pipeline_id": pipeline_id,
            "mode": local_mode_as_str(&mode),
            "client_name": client_name,
            "instance": instance_data,
            "credentials": {
                "masked_pg_uri": masked_pg_uri(&pg_uri),
                "copy_safe_pg_uri": pg_uri,
            },
            "pipeline": {
                "overall_status": "success",
                "total_duration_ms": now_ms(&started),
                "steps": steps,
                "provision_schema": should_provision,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// `GET /admin/provision/instances` — list managed Postgres containers and
/// cross-reference each one with the runtime registry and the client catalog
/// via the container's `athena.client` label (matched case-insensitively).
#[get("/admin/provision/instances")]
pub async fn admin_list_postgres_instances(
    req: HttpRequest,
    state: Data<AppState>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let containers: Vec<DockerManagedContainer> = match list_managed_postgres_containers().await {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to list Postgres instances", err),
    };
    // Lowercased lookup tables so label matching is case-insensitive.
    let runtime_clients: HashMap<String, RegisteredClient> = state
        .pg_registry
        .list_registered_clients()
        .into_iter()
        .map(|client| (client.client_name.to_lowercase(), client))
        .collect();
    // Catalog data is optional: when no catalog pool is configured the
    // catalog side simply stays empty, but a failing catalog query is an error.
    let mut catalog_clients: HashMap<String, AthenaClientRecord> = HashMap::new();
    if let Ok(pool) = client_catalog_pool(state.get_ref()) {
        match list_athena_clients(&pool).await {
            Ok(clients) => {
                catalog_clients = clients
                    .into_iter()
                    .map(|client| (client.client_name.to_lowercase(), client))
                    .collect();
            }
            Err(err) => {
                return internal_error("Failed to list catalog clients", err.to_string());
            }
        }
    }
    let payload: Vec<Value> = containers
        .into_iter()
        .map(|container| {
            let linked_client_name = container
                .labels
                .get("athena.client")
                .cloned()
                .unwrap_or_default();
            let linked_key = linked_client_name.to_lowercase();
            let runtime_client = runtime_clients.get(&linked_key);
            let catalog_client = catalog_clients.get(&linked_key);
            json!({
                "container_name": container.container_name,
                "running": container.running,
                "status": container.status,
                "image": container.image,
                "host_port": container.host_port,
                "labels": container.labels,
                "linked_client": if linked_client_name.is_empty() { Value::Null } else {
                    json!({
                        "client_name": linked_client_name,
                        "runtime_registered": runtime_client.is_some(),
                        "catalog_registered": catalog_client.is_some(),
                    })
                },
                "runtime_client": runtime_client,
                "catalog_client": catalog_client,
            })
        })
        .collect();
    api_success(
        "Listed managed Postgres instances",
        json!({
            "instances": payload,
            "count": payload.len(),
        }),
    )
}
/// Admin-only endpoint: spins up (or reuses) a Docker-managed Postgres
/// container, optionally applies the provisioning SQL to it, and optionally
/// registers the resulting connection as a runtime and/or catalog client.
///
/// Returns `api_success` with the instance details and a pipeline summary,
/// or the mapped provisioning/authorization error response.
#[post("/admin/provision/instances")]
pub async fn admin_spin_up_postgres_instance(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<SpinUpInstanceRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // Step 1: create or reuse the container and wait for Postgres readiness.
    let spin_result: SpinUpPostgresResult = match spin_up_postgres_instance(SpinUpPostgresParams {
        client_name: body.client_name.clone(),
        container_name: body.container_name.clone(),
        image: body.image.clone(),
        host: body.host.clone(),
        host_port: body.host_port,
        db_name: body.db_name.clone(),
        username: body.username.clone(),
        password: body.password.clone(),
        startup_timeout_secs: body.startup_timeout_secs,
        reuse_existing: body.reuse_existing,
    })
    .await
    {
        Ok(result) => result,
        Err(err) => return map_provisioning_error("Failed to spin up Postgres instance", err),
    };
    // Step 2 (optional): run the provisioning SQL. Written as an expression to
    // match the equivalent step in the other provider handlers in this file.
    let statements_executed: Option<usize> = if body.provision_schema {
        Some(match run_provision_sql(&spin_result.pg_uri).await {
            Ok(total) => total,
            Err(err) => {
                return map_provisioning_error("Failed to provision Postgres instance", err);
            }
        })
    } else {
        None
    };
    // Step 3 (optional): register the client with Docker provider metadata so
    // later listing/status endpoints can link container and client.
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &spin_result.client_name,
        Some(format!(
            "Managed Postgres instance ({})",
            spin_result.container_name
        )),
        &spin_result.pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "docker",
            "container_name": spin_result.container_name,
            "image": spin_result.image,
            "host": spin_result.host,
            "host_port": spin_result.host_port,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // NOTE(review): the response body includes the plaintext password and
    // pg_uri; the endpoint is admin-key-gated, but confirm this exposure in
    // logs/clients is intended.
    api_success(
        "Postgres instance is ready",
        json!({
            "instance": {
                "client_name": spin_result.client_name,
                "container_name": spin_result.container_name,
                "image": spin_result.image,
                "host": spin_result.host,
                "host_port": spin_result.host_port,
                "db_name": spin_result.db_name,
                "username": spin_result.username,
                "password": spin_result.password,
                "pg_uri": spin_result.pg_uri,
                "created_new_container": spin_result.created_new_container,
                "reused_existing_container": spin_result.reused_existing_container,
                "wait_ready_ms": spin_result.wait_ready_ms,
            },
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
// Admin-only endpoint: provisions a Neon-hosted Postgres database end to end.
// Pipeline: optionally create a Neon project, resolve a connection URI,
// optionally run the provisioning SQL, then optionally register the client.
#[post("/admin/provision/providers/neon")]
pub async fn admin_provision_neon(
req: HttpRequest,
state: Data<AppState>,
body: Json<NeonProviderProvisionRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
// project_id/branch_id start from the request and may be filled in below
// when a project is created on the fly.
let mut project_id = body.project_id.clone();
let mut branch_id = body.branch_id.clone();
// Step 1 (optional): create a Neon project when no (non-blank) project_id
// was supplied and the caller opted in via create_project_if_missing.
if project_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_project_if_missing
{
// Project creation requires an API key; a missing one returns a 400.
let api_key = match required_field("api_key", body.api_key.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let created: NeonProjectCreateResult = match create_neon_project(NeonProjectCreateParams {
api_key,
// Project name falls back to the client name when not provided.
project_name: body
.project_name
.clone()
.or_else(|| Some(body.client_name.clone())),
project_payload: body.project_create_payload.clone(),
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to create Neon project", err),
};
project_id = Some(created.project_id);
// Adopt the new project's branch only if the caller did not pin one.
if branch_id.is_none() {
branch_id = created.branch_id;
}
}
// Step 2: resolve the connection URI — taken verbatim from the request when
// a non-blank connection_uri was supplied, otherwise fetched from the Neon
// API (which then requires api_key and project_id to be present).
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_field("api_key", body.api_key.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_id = match required_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_neon_connection_uri(NeonConnectionParams {
api_key,
project_id,
branch_id: branch_id.clone(),
database_name: body.database_name.clone(),
role_name: body.role_name.clone(),
endpoint_id: body.endpoint_id.clone(),
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => return map_provisioning_error("Failed to fetch Neon connection URI", err),
}
};
// Step 3 (optional): run the provisioning SQL against the database.
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => return map_provisioning_error("Failed to provision Neon database", err),
})
} else {
None
};
// Step 4 (optional): register the client in the runtime registry and/or
// persistent catalog, attaching Neon provider metadata.
let description = body
.description
.clone()
.or_else(|| Some("Neon database managed via Athena".to_string()));
let (runtime_registered, catalog_registered) = match register_provisioned_client(
state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "provision_api",
"provider": "neon",
"project_id": project_id,
"branch_id": branch_id,
"database_name": body.database_name,
"role_name": body.role_name,
"endpoint_id": body.endpoint_id,
// NOTE(review): this records the request flag, not whether a project
// was actually created during this call.
"created_project": body.create_project_if_missing,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
// The pg_uri (which may embed credentials) is echoed back to the admin caller.
api_success(
"Provisioned Neon database",
json!({
"provider": "neon",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
// Admin-only endpoint: provisions a Railway-hosted Postgres database end to
// end. Pipeline: optionally create a project, resolve the base environment,
// optionally create a service and a plugin, resolve a connection URI,
// optionally run the provisioning SQL, then optionally register the client.
#[post("/admin/provision/providers/railway")]
pub async fn admin_provision_railway(
req: HttpRequest,
state: Data<AppState>,
body: Json<RailwayProviderProvisionRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
// Treat a blank api_key as absent for every creation/fetch step below.
let api_key_for_create = body
.api_key
.clone()
.filter(|value| !value.trim().is_empty());
// These IDs start from the request and are filled in as resources are
// created or resolved along the way.
let mut project_id = body.project_id.clone();
let mut environment_id = body.environment_id.clone();
let mut service_id = body.service_id.clone();
let mut plugin_id = body.plugin_id.clone();
// Step 1 (optional): create a Railway project when no (non-blank)
// project_id was supplied and the caller opted in.
if project_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_project_if_missing
{
let api_key = match required_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
// Inject a default "name" (athena-<client_name>) into the caller-supplied
// GraphQL input only if the caller did not set one.
let project_input = match json_object_insert_if_missing(
body.project_create_input.clone(),
"name",
Value::String(
body.project_name
.clone()
.unwrap_or_else(|| format!("athena-{}", body.client_name)),
),
) {
Ok(value) => value,
Err(err) => return map_provisioning_error("Invalid Railway project_create_input", err),
};
let created = match create_railway_project(RailwayProjectCreateParams {
api_key,
project_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to create Railway project", err),
};
project_id = Some(created.project_id);
// Adopt the project's base environment only if the caller did not pin one.
if environment_id.is_none() {
environment_id = created.base_environment_id;
}
}
// Step 2: if the environment is still unknown but we have both an API key
// and a project, look up the project's base environment.
if environment_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& let (Some(api_key), Some(project)) = (api_key_for_create.clone(), project_id.clone())
{
environment_id = match fetch_railway_project_base_environment_id(
&api_key,
&project,
body.graphql_url.as_deref(),
)
.await
{
Ok(value) => value,
Err(err) => {
return map_provisioning_error("Failed to resolve Railway base environment", err);
}
};
}
// Step 3 (optional): create a Railway service, injecting "projectId" and a
// default "name" (<client_name>-service) into the GraphQL input when absent.
if service_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_service_if_missing
{
let api_key = match required_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project = match required_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let service_input = match json_object_insert_if_missing(
body.service_create_input.clone(),
"projectId",
Value::String(project),
) {
Ok(value) => value,
Err(err) => return map_provisioning_error("Invalid Railway service_create_input", err),
};
let service_input = match json_object_insert_if_missing(
Some(service_input),
"name",
Value::String(
body.service_name
.clone()
.unwrap_or_else(|| format!("{}-service", body.client_name)),
),
) {
Ok(value) => value,
Err(err) => return map_provisioning_error("Invalid Railway service_create_input", err),
};
let created = match create_railway_service(RailwayServiceCreateParams {
api_key,
service_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to create Railway service", err),
};
service_id = Some(created.service_id);
}
// Step 4 (optional): create a Railway plugin, injecting "projectId",
// "environmentId", and a default "name" ("Postgres") when absent.
if plugin_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_plugin_if_missing
{
let api_key = match required_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project = match required_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let environment = match required_field("environment_id", environment_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let plugin_input = match json_object_insert_if_missing(
body.plugin_create_input.clone(),
"projectId",
Value::String(project),
) {
Ok(value) => value,
Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
};
let plugin_input = match json_object_insert_if_missing(
Some(plugin_input),
"environmentId",
Value::String(environment),
) {
Ok(value) => value,
Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
};
let plugin_input = match json_object_insert_if_missing(
Some(plugin_input),
"name",
Value::String(
body.plugin_name
.clone()
.unwrap_or_else(|| "Postgres".to_string()),
),
) {
Ok(value) => value,
Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
};
let created = match create_railway_plugin(RailwayPluginCreateParams {
api_key,
plugin_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to create Railway plugin", err),
};
plugin_id = Some(created.plugin_id);
}
// Step 5: resolve the connection URI — taken verbatim from the request when
// a non-blank connection_uri was supplied, otherwise fetched via the Railway
// API (which then requires api_key, project_id, and environment_id).
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_id = match required_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let environment_id = match required_field("environment_id", environment_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_railway_connection_uri(RailwayConnectionParams {
api_key,
project_id,
environment_id,
service_id: service_id.clone(),
plugin_id: plugin_id.clone(),
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return map_provisioning_error("Failed to fetch Railway connection URI", err);
}
}
};
// Step 6 (optional): run the provisioning SQL against the database.
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => return map_provisioning_error("Failed to provision Railway database", err),
})
} else {
None
};
// Step 7 (optional): register the client in the runtime registry and/or
// persistent catalog, attaching Railway provider metadata.
let description = body
.description
.clone()
.or_else(|| Some("Railway database managed via Athena".to_string()));
let (runtime_registered, catalog_registered) = match register_provisioned_client(
state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "provision_api",
"provider": "railway",
"project_id": project_id,
"environment_id": environment_id,
"service_id": service_id,
"plugin_id": plugin_id,
// NOTE(review): these record the request flags, not whether the
// resources were actually created during this call.
"created_project": body.create_project_if_missing,
"created_service": body.create_service_if_missing,
"created_plugin": body.create_plugin_if_missing,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
// The pg_uri (which may embed credentials) is echoed back to the admin caller.
api_success(
"Provisioned Railway database",
json!({
"provider": "railway",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
// Admin-only endpoint: provisions a Render-hosted Postgres database end to
// end. Pipeline: optionally create a Render Postgres service, resolve a
// connection URI, optionally run the provisioning SQL, then optionally
// register the client.
#[post("/admin/provision/providers/render")]
pub async fn admin_provision_render(
req: HttpRequest,
state: Data<AppState>,
body: Json<RenderProviderProvisionRequest>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
// Treat a blank api_key as absent for the creation/fetch steps below.
let api_key_for_create = body
.api_key
.clone()
.filter(|value| !value.trim().is_empty());
let mut service_id = body.service_id.clone();
// Step 1 (optional): create a Render Postgres service when no (non-blank)
// service_id was supplied and the caller opted in.
if service_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_service_if_missing
{
let api_key = match required_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let created = match create_render_postgres_service(RenderPostgresCreateParams {
api_key,
owner_id: body.owner_id.clone(),
service_name: body.service_name.clone(),
service_payload: body.service_create_payload.clone(),
plan: body.plan.clone(),
region: body.region.clone(),
postgres_version: body.postgres_version.clone(),
disk_size_gb: body.disk_size_gb,
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => return map_provisioning_error("Failed to create Render service", err),
};
service_id = Some(created.service_id);
}
// Step 2: resolve the connection URI — taken verbatim from the request when
// a non-blank connection_uri was supplied, otherwise fetched from the Render
// API (which then requires api_key and service_id to be present).
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let service_id = match required_field("service_id", service_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_render_connection_uri(RenderConnectionParams {
api_key,
service_id,
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return map_provisioning_error("Failed to fetch Render connection URI", err);
}
}
};
// Step 3 (optional): run the provisioning SQL against the database.
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => return map_provisioning_error("Failed to provision Render database", err),
})
} else {
None
};
// Step 4 (optional): register the client in the runtime registry and/or
// persistent catalog, attaching Render provider metadata.
let description = body
.description
.clone()
.or_else(|| Some("Render Postgres database managed via Athena".to_string()));
let (runtime_registered, catalog_registered) = match register_provisioned_client(
state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "provision_api",
"provider": "render",
"service_id": service_id,
"owner_id": body.owner_id,
"service_name": body.service_name,
"plan": body.plan,
"region": body.region,
"postgres_version": body.postgres_version,
"disk_size_gb": body.disk_size_gb,
// NOTE(review): records the request flag, not whether a service was
// actually created during this call.
"created_service": body.create_service_if_missing,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
// The pg_uri (which may embed credentials) is echoed back to the admin caller.
api_success(
"Provisioned Render database",
json!({
"provider": "render",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
/// Admin-only endpoint: reports the Docker status of one managed Postgres
/// container and, when a `client_name` query parameter is supplied, joins in
/// the matching runtime and catalog client registrations.
#[get("/admin/provision/instances/{container_name}")]
pub async fn admin_get_postgres_instance_status(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<InstanceStatusQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let container_name: String = path.into_inner();
    // Inspect the container; a container that does not exist yields a 404.
    let status: DockerContainerStatus = match inspect_container(&container_name).await {
        Ok(value) => value,
        Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
    };
    if !status.exists {
        return not_found(
            "Instance not found",
            format!("No Postgres container named '{}' exists.", container_name),
        );
    }
    // Runtime registry lookup is an in-memory read with no failure path.
    let runtime_client: Option<RegisteredClient> = match query.client_name.as_ref() {
        Some(client_name) => state.pg_registry.registered_client(client_name),
        None => None,
    };
    // Catalog lookup requires both a client name and a reachable catalog
    // pool; an actual read failure is surfaced as a 500 response.
    let mut catalog_client: Option<AthenaClientRecord> = None;
    if let Some(client_name) = query.client_name.as_ref() {
        if let Ok(pool) = client_catalog_pool(state.get_ref()) {
            match get_athena_client_by_name(&pool, client_name).await {
                Ok(record) => catalog_client = record,
                Err(err) => {
                    return internal_error("Failed to load catalog client", err.to_string());
                }
            }
        }
    }
    api_success(
        "Loaded Postgres instance status",
        json!({
            "instance": status,
            "client_name": query.client_name,
            "runtime_client": runtime_client,
            "catalog_client": catalog_client,
        }),
    )
}
#[delete("/admin/provision/instances/{container_name}")]
pub async fn admin_delete_postgres_instance(
req: HttpRequest,
state: Data<AppState>,
path: Path<String>,
body: Option<Json<DeleteInstanceRequest>>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let container_name: String = path.into_inner();
if let Err(err) = remove_container(&container_name).await {
return map_provisioning_error("Failed to delete Postgres instance", err);
}
let payload: DeleteInstanceRequest =
body.as_ref()
.map(|value| value.0.clone())
.unwrap_or(DeleteInstanceRequest {
client_name: None,
remove_runtime_client: true,
remove_catalog_client: true,
});
let mut runtime_removed: bool = false;
let mut catalog_removed: bool = false;
if let Some(client_name) = payload.client_name.as_ref() {
if payload.remove_runtime_client {
state.pg_registry.remove_client(client_name);
runtime_removed = true;
}
if payload.remove_catalog_client {
let pool: Pool<Postgres> = match client_catalog_pool(state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
match delete_athena_client(&pool, client_name).await {
Ok(Some(_)) => catalog_removed = true,
Ok(None) => catalog_removed = false,
Err(err) => {
return internal_error("Failed to delete catalog client", err.to_string());
}
}
}
}
api_success(
"Deleted Postgres instance",
json!({
"container_name": container_name,
"client_name": payload.client_name,
"runtime_removed": runtime_removed,
"catalog_removed": catalog_removed,
}),
)
}
/// Mounts every admin provisioning route onto the given actix service config.
pub fn services(cfg: &mut web::ServiceConfig) {
    // Registration order is kept identical to the original chained form.
    cfg.service(admin_provision);
    cfg.service(admin_provision_status);
    cfg.service(admin_run_local_provision_pipeline);
    cfg.service(admin_list_local_cluster_databases);
    cfg.service(admin_create_local_cluster_database);
    cfg.service(admin_list_postgres_instances);
    cfg.service(admin_spin_up_postgres_instance);
    cfg.service(admin_provision_neon);
    cfg.service(admin_provision_railway);
    cfg.service(admin_provision_render);
    cfg.service(admin_get_postgres_instance_status);
    cfg.service(admin_delete_postgres_instance);
}