use actix_web::{
HttpRequest, HttpResponse, delete, get, post,
web::{self, Data, Json, Path, Query},
};
use serde::Deserialize;
use serde_json::{Value, json};
use sqlx::Row;
use sqlx::postgres::PgPool;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};
use crate::data::clients::{
SaveAthenaClientParams, delete_athena_client, get_athena_client_by_name, upsert_athena_client,
};
use crate::drivers::postgresql::sqlx_driver::ClientConnectionTarget;
use crate::parser::resolve_postgres_uri;
use crate::provisioning::{
EXPECTED_TABLES, NeonConnectionParams, NeonProjectCreateParams, ProvisioningError,
RailwayConnectionParams, RailwayPluginCreateParams, RailwayProjectCreateParams,
RailwayServiceCreateParams, RenderConnectionParams, RenderPostgresCreateParams,
SpinUpPostgresParams, create_neon_project, create_railway_plugin, create_railway_project,
create_railway_service, create_render_postgres_service, fetch_neon_connection_uri,
fetch_railway_connection_uri, fetch_railway_project_base_environment_id,
fetch_render_connection_uri, inspect_container, json_object_insert_if_missing,
remove_container, run_provision_sql, spin_up_postgres_instance,
};
/// JSON body accepted by `POST /admin/provision`.
///
/// Exactly one of `uri` or `client_name` must be supplied; `resolve_uri`
/// rejects requests that provide both or neither.
#[derive(Debug, Deserialize)]
struct ProvisionRequest {
/// Direct Postgres URI to run the provisioning SQL against.
#[serde(default)]
uri: Option<String>,
/// Name of an already-registered Postgres client whose URI should be used.
#[serde(default)]
client_name: Option<String>,
/// When `true` and `uri` was supplied, the handler also registers the
/// target as a runtime client after provisioning.
#[serde(default)]
register: bool,
/// Client name used for that registration; the handler falls back to
/// `"provisioned"` when this is absent.
#[serde(default)]
register_name: Option<String>,
}
/// Query string accepted by `GET /admin/provision/status`.
#[derive(Debug, Deserialize)]
struct ProvisionStatusQuery {
/// Registered Postgres client whose `public` schema is compared against
/// `EXPECTED_TABLES`.
client_name: String,
}
/// JSON body accepted by `POST /admin/provision/instances`.
///
/// Every field up to and including `reuse_existing` is forwarded unchanged
/// into `SpinUpPostgresParams`; how omitted optional values are defaulted is
/// decided by `spin_up_postgres_instance`, not by this module.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct SpinUpInstanceRequest {
/// Client name requested for the new instance.
client_name: String,
/// Optional container name override.
#[serde(default)]
container_name: Option<String>,
/// Optional container image override.
#[serde(default)]
image: Option<String>,
/// Optional host override.
#[serde(default)]
host: Option<String>,
/// Optional host port override.
#[serde(default)]
host_port: Option<u16>,
/// Optional database name override.
#[serde(default)]
db_name: Option<String>,
/// Optional database username override.
#[serde(default)]
username: Option<String>,
/// Optional database password override.
#[serde(default)]
password: Option<String>,
/// Optional startup timeout, in seconds (per the field name).
#[serde(default)]
startup_timeout_secs: Option<u64>,
/// Forwarded as `reuse_existing`; defaults to `false` when omitted.
// NOTE(review): presumably allows reusing an existing container — confirm
// against `spin_up_postgres_instance`.
#[serde(default)]
reuse_existing: bool,
/// Run the provisioning SQL against the new instance (default `true`).
#[serde(default = "default_true")]
provision_schema: bool,
/// Register the instance in the runtime Postgres registry (default `true`).
#[serde(default = "default_true")]
register_runtime: bool,
/// Persist the instance in the client catalog (default `true`).
#[serde(default = "default_true")]
register_catalog: bool,
}
/// Optional JSON body accepted by `DELETE /admin/provision/instances/{container_name}`.
///
/// When the body is absent the handler behaves as if `client_name` were
/// `None` and both removal flags were `true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
struct DeleteInstanceRequest {
/// Client to unregister alongside the container; when `None`, no client
/// is removed regardless of the flags below.
#[serde(default)]
client_name: Option<String>,
/// Remove the client from the runtime Postgres registry (default `true`).
#[serde(default = "default_true")]
remove_runtime_client: bool,
/// Delete the client from the client catalog (default `true`).
#[serde(default = "default_true")]
remove_catalog_client: bool,
}
/// Query string accepted by `GET /admin/provision/instances/{container_name}`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct InstanceStatusQuery {
/// Optional client whose runtime and catalog registrations are included
/// in the status response.
#[serde(default)]
client_name: Option<String>,
}
/// JSON body accepted by `POST /admin/provision/providers/neon`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct NeonProviderProvisionRequest {
/// Client name the database is registered under; also the fallback
/// project name when a project is created and `project_name` is absent.
client_name: String,
/// Optional client description; the handler supplies a default when absent.
#[serde(default)]
description: Option<String>,
/// Postgres URI to use directly; when non-blank, the connection-URI
/// lookup through the provider API is skipped.
#[serde(default)]
connection_uri: Option<String>,
/// Provider API key; required whenever the handler calls the provider API.
#[serde(default)]
api_key: Option<String>,
/// Existing project identifier; required for the connection-URI lookup
/// unless a project is created by this request.
#[serde(default)]
project_id: Option<String>,
/// Optional branch identifier; when absent and a project is created, the
/// branch reported by the creation call is used.
#[serde(default)]
branch_id: Option<String>,
/// Optional database name forwarded to the connection-URI lookup.
#[serde(default)]
database_name: Option<String>,
/// Optional role name forwarded to the connection-URI lookup.
#[serde(default)]
role_name: Option<String>,
/// Optional endpoint identifier forwarded to the connection-URI lookup.
#[serde(default)]
endpoint_id: Option<String>,
/// Optional API base URL override forwarded to the provider helpers.
#[serde(default)]
api_base_url: Option<String>,
/// Create a project when `project_id` is missing or blank (default `false`).
#[serde(default)]
create_project_if_missing: bool,
/// Name for a newly created project.
#[serde(default)]
project_name: Option<String>,
/// Optional raw JSON payload forwarded to project creation.
#[serde(default)]
project_create_payload: Option<Value>,
/// Run the provisioning SQL against the database (default `true`).
#[serde(default = "default_true")]
provision_schema: bool,
/// Register the database in the runtime Postgres registry (default `true`).
#[serde(default = "default_true")]
register_runtime: bool,
/// Persist the database in the client catalog (default `true`).
#[serde(default = "default_true")]
register_catalog: bool,
}
/// JSON body accepted by `POST /admin/provision/providers/railway`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RailwayProviderProvisionRequest {
/// Client name the database is registered under; also used to derive
/// default project and service names.
client_name: String,
/// Optional client description; the handler supplies a default when absent.
#[serde(default)]
description: Option<String>,
/// Postgres URI to use directly; when non-blank, the connection-URI
/// lookup through the provider API is skipped.
#[serde(default)]
connection_uri: Option<String>,
/// Provider API key; a blank value is treated as missing.
#[serde(default)]
api_key: Option<String>,
/// Existing project identifier.
#[serde(default)]
project_id: Option<String>,
/// Existing environment identifier; when missing, the handler tries to
/// resolve the project's base environment.
#[serde(default)]
environment_id: Option<String>,
/// Existing service identifier.
#[serde(default)]
service_id: Option<String>,
/// Existing plugin identifier.
#[serde(default)]
plugin_id: Option<String>,
/// Optional GraphQL endpoint override forwarded to the provider helpers.
#[serde(default)]
graphql_url: Option<String>,
/// Create a project when `project_id` is missing or blank (default `false`).
#[serde(default)]
create_project_if_missing: bool,
/// Name for a newly created project; defaults to `athena-{client_name}`.
#[serde(default)]
project_name: Option<String>,
/// Optional JSON input for project creation; passed through
/// `json_object_insert_if_missing` with a `name` key.
#[serde(default)]
project_create_input: Option<Value>,
/// Create a service when `service_id` is missing or blank (default `false`).
#[serde(default)]
create_service_if_missing: bool,
/// Name for a newly created service; defaults to `{client_name}-service`.
#[serde(default)]
service_name: Option<String>,
/// Optional JSON input for service creation; passed through
/// `json_object_insert_if_missing` with `projectId` and `name` keys.
#[serde(default)]
service_create_input: Option<Value>,
/// Create a plugin when `plugin_id` is missing or blank (default `false`).
#[serde(default)]
create_plugin_if_missing: bool,
/// Name for a newly created plugin; defaults to `Postgres`.
#[serde(default)]
plugin_name: Option<String>,
/// Optional JSON input for plugin creation; passed through
/// `json_object_insert_if_missing` with `projectId`, `environmentId` and
/// `name` keys.
#[serde(default)]
plugin_create_input: Option<Value>,
/// Run the provisioning SQL against the database (default `true`).
#[serde(default = "default_true")]
provision_schema: bool,
/// Register the database in the runtime Postgres registry (default `true`).
#[serde(default = "default_true")]
register_runtime: bool,
/// Persist the database in the client catalog (default `true`).
#[serde(default = "default_true")]
register_catalog: bool,
}
/// JSON body accepted by `POST /admin/provision/providers/render`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RenderProviderProvisionRequest {
/// Client name the database is registered under.
client_name: String,
/// Optional client description; the handler supplies a default when absent.
#[serde(default)]
description: Option<String>,
/// Postgres URI to use directly; when non-blank, the connection-URI
/// lookup through the provider API is skipped.
#[serde(default)]
connection_uri: Option<String>,
/// Provider API key; a blank value is treated as missing.
#[serde(default)]
api_key: Option<String>,
/// Existing service identifier; required for the connection-URI lookup
/// unless a service is created by this request.
#[serde(default)]
service_id: Option<String>,
/// Optional owner identifier forwarded to service creation.
#[serde(default)]
owner_id: Option<String>,
/// Optional API base URL override forwarded to the provider helpers.
#[serde(default)]
api_base_url: Option<String>,
/// Create a service when `service_id` is missing or blank (default `false`).
#[serde(default)]
create_service_if_missing: bool,
/// Optional name forwarded to service creation.
#[serde(default)]
service_name: Option<String>,
/// Optional raw JSON payload forwarded to service creation.
#[serde(default)]
service_create_payload: Option<Value>,
/// Optional plan forwarded to service creation.
#[serde(default)]
plan: Option<String>,
/// Optional region forwarded to service creation.
#[serde(default)]
region: Option<String>,
/// Optional Postgres version forwarded to service creation.
#[serde(default)]
postgres_version: Option<String>,
/// Optional disk size forwarded to service creation (gigabytes, per the
/// field name).
#[serde(default)]
disk_size_gb: Option<u32>,
/// Run the provisioning SQL against the database (default `true`).
#[serde(default = "default_true")]
provision_schema: bool,
/// Register the database in the runtime Postgres registry (default `true`).
#[serde(default = "default_true")]
register_runtime: bool,
/// Persist the database in the client catalog (default `true`).
#[serde(default = "default_true")]
register_catalog: bool,
}
/// Serde default helper for boolean request flags that should be enabled
/// when the field is omitted; referenced via
/// `#[serde(default = "default_true")]`.
fn default_true() -> bool {
true
}
/// Translates a [`ProvisioningError`] into the HTTP error response helper
/// that corresponds to its variant.
///
/// `context` is forwarded as the helper's first argument and the message
/// carried by the variant as its second.
fn map_provisioning_error(context: &str, err: ProvisioningError) -> HttpResponse {
    match err {
        ProvisioningError::Execution(detail) => internal_error(context, detail),
        ProvisioningError::Unavailable(detail) => service_unavailable(context, detail),
        ProvisioningError::Conflict(detail) => conflict(context, detail),
        ProvisioningError::InvalidInput(detail) => bad_request(context, detail),
    }
}
/// Determines the Postgres URI a provisioning request targets.
///
/// The request must carry exactly one of `uri` (used verbatim) or
/// `client_name` (looked up in the runtime registry). For a registered
/// client the resolved `config_uri_template` takes precedence over the stored
/// `pg_uri`.
///
/// # Errors
///
/// Returns a bad-request response when both or neither target is supplied,
/// when the client is not registered, or when the client has no usable URI.
fn resolve_uri(state: &AppState, req: &ProvisionRequest) -> Result<String, HttpResponse> {
    // Settle the three cases that need no registry lookup first.
    let client_name = match (req.uri.as_ref(), req.client_name.as_ref()) {
        (Some(uri), None) => return Ok(uri.clone()),
        (Some(_), Some(_)) => {
            return Err(bad_request(
                "Ambiguous target",
                "Provide either 'uri' or 'client_name', not both.",
            ));
        }
        (None, None) => {
            return Err(bad_request(
                "Missing target",
                "Provide either 'uri' (direct Postgres URI) or 'client_name' (registered client).",
            ));
        }
        (None, Some(name)) => name,
    };

    let Some(registered) = state.pg_registry.registered_client(client_name) else {
        return Err(bad_request(
            "Unknown client",
            format!("No Postgres client named '{}' is registered.", client_name),
        ));
    };

    let from_template = registered
        .config_uri_template
        .as_deref()
        .map(resolve_postgres_uri);
    match from_template.or(registered.pg_uri) {
        Some(uri) => Ok(uri),
        None => Err(bad_request(
            "Client URI unavailable",
            format!("No Postgres URI is available for client '{}'.", client_name),
        )),
    }
}
/// Returns the connection pool of the logging client, which backs the client
/// catalog.
///
/// # Errors
///
/// Returns a service-unavailable response when no logging client is
/// configured or when the registry has no pool for it.
fn client_catalog_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
    match state.logging_client_name.as_ref() {
        None => Err(service_unavailable(
            "Client catalog unavailable",
            "No athena_logging client is configured.",
        )),
        Some(logging_client) => match state.pg_registry.get_pool(logging_client) {
            Some(pool) => Ok(pool),
            None => Err(service_unavailable(
                "Client catalog unavailable",
                format!("Logging client '{}' is not connected.", logging_client),
            )),
        },
    }
}
/// Ensures an optional request field is present and not blank.
///
/// On success the value is returned exactly as supplied (it is not trimmed).
///
/// # Errors
///
/// Returns a bad-request response naming the field when the value is `None`
/// or contains only whitespace.
fn required_field(name: &str, value: Option<String>) -> Result<String, HttpResponse> {
    match value {
        Some(text) if !text.trim().is_empty() => Ok(text),
        _ => Err(bad_request(
            "Missing required field",
            format!(
                "Provide '{}' or set 'connection_uri' to bypass provider API lookup.",
                name
            ),
        )),
    }
}
/// Registers a freshly provisioned database with the runtime registry and/or
/// the client catalog.
///
/// Runtime registration runs first. A catalog failure does not undo a
/// runtime registration that already succeeded.
///
/// Returns `(runtime_registered, catalog_registered)`. Every failure returns
/// early, so on success each element equals the corresponding request flag.
///
/// # Errors
///
/// Returns an internal-error response when either registration fails, or the
/// response produced by `client_catalog_pool` when the catalog is unavailable.
async fn register_provisioned_client(
    state: &AppState,
    client_name: &str,
    description: Option<String>,
    pg_uri: &str,
    metadata: serde_json::Value,
    register_runtime: bool,
    register_catalog: bool,
) -> Result<(bool, bool), HttpResponse> {
    if register_runtime {
        let runtime_target = ClientConnectionTarget {
            client_name: client_name.to_string(),
            source: "database".to_string(),
            description: description.clone(),
            pg_uri: Some(pg_uri.to_string()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        state
            .pg_registry
            .upsert_client(runtime_target)
            .await
            .map_err(|err| {
                internal_error(
                    "Failed to register runtime client",
                    format!("Runtime client registration failed: {}", err),
                )
            })?;
    }

    if register_catalog {
        let catalog_pool = client_catalog_pool(state)?;
        let catalog_entry = SaveAthenaClientParams {
            client_name: client_name.to_string(),
            description,
            pg_uri: Some(pg_uri.to_string()),
            pg_uri_env_var: None,
            config_uri_template: None,
            source: "database".to_string(),
            is_active: true,
            is_frozen: false,
            metadata,
        };
        upsert_athena_client(&catalog_pool, catalog_entry)
            .await
            .map_err(|err| {
                internal_error("Failed to register client in catalog", err.to_string())
            })?;
    }

    // Reaching this point means every requested step succeeded.
    Ok((register_runtime, register_catalog))
}
/// `POST /admin/provision` — runs the provisioning SQL against a Postgres
/// target and optionally registers it as a runtime client.
///
/// Requires the static admin key. The target is either a direct `uri` or an
/// already-registered `client_name` (see `resolve_uri`). Registration is only
/// attempted for direct URIs with `register` set. It is best-effort: a
/// failure is logged and the request still succeeds, but the response then
/// reports `"registered": null` instead of claiming the client was registered.
#[post("/admin/provision")]
pub async fn admin_provision(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let pg_uri = match resolve_uri(state.get_ref(), &body) {
        Ok(uri) => uri,
        Err(resp) => return resp,
    };
    let statement_count = match run_provision_sql(&pg_uri).await {
        Ok(n) => n,
        Err(err) => return map_provisioning_error("Provisioning failed", err),
    };
    let registered_name: Option<String> = if body.register && body.uri.is_some() {
        let name: String = body
            .register_name
            .clone()
            .unwrap_or_else(|| "provisioned".to_string());
        let target = ClientConnectionTarget {
            client_name: name.clone(),
            source: "database".to_string(),
            description: Some("Provisioned via API".to_string()),
            pg_uri: Some(pg_uri.clone()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        match state.pg_registry.upsert_client(target).await {
            Ok(_) => Some(name),
            Err(err) => {
                tracing::warn!(
                    client = %name,
                    error = %err,
                    "provisioned but failed to register client"
                );
                // Do not report a registration that did not happen.
                None
            }
        }
    } else {
        None
    };
    let client_label = body
        .client_name
        .clone()
        .or_else(|| registered_name.clone())
        .unwrap_or_else(|| "direct".to_string());
    api_success(
        format!("Provisioned {} statements", statement_count),
        json!({
            "statements_executed": statement_count,
            "client": client_label,
            "registered": registered_name,
            "tables": EXPECTED_TABLES,
        }),
    )
}
/// `GET /admin/provision/status` — reports which of the `EXPECTED_TABLES`
/// exist in the `public` schema of a registered client.
///
/// Requires the static admin key. Responds with service-unavailable when the
/// client has no pool and with an internal error when the catalog query
/// fails. Rows whose `table_name` cannot be decoded are skipped.
#[get("/admin/provision/status")]
pub async fn admin_provision_status(
    req: HttpRequest,
    state: Data<AppState>,
    query: Query<ProvisionStatusQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let Some(pool) = state.pg_registry.get_pool(&query.client_name) else {
        return service_unavailable(
            "Client unavailable",
            format!("Postgres client '{}' is not connected.", query.client_name),
        );
    };

    let fetched = sqlx::query(
        "SELECT table_name::text FROM information_schema.tables WHERE table_schema = 'public'",
    )
    .fetch_all(&pool)
    .await;
    let rows = match fetched {
        Ok(rows) => rows,
        Err(err) => return internal_error("Failed to query tables", err.to_string()),
    };

    let mut existing_tables: Vec<String> = Vec::new();
    for row in rows.iter() {
        if let Ok(table_name) = row.try_get::<String, _>("table_name") {
            existing_tables.push(table_name);
        }
    }

    // Split the expected tables in one pass, keeping their declared order.
    let (present, missing): (Vec<&str>, Vec<&str>) = EXPECTED_TABLES
        .iter()
        .copied()
        .partition(|table| existing_tables.iter().any(|existing| existing == *table));

    api_success(
        format!("Provisioning status for '{}'", query.client_name),
        json!({
            "client_name": query.client_name,
            "provisioned": missing.is_empty(),
            "present_tables": present,
            "missing_tables": missing,
        }),
    )
}
/// `POST /admin/provision/instances` — starts a managed Postgres instance,
/// optionally provisions its schema, and optionally registers it.
///
/// Requires the static admin key. The steps run in order: spin up, provision
/// the schema (when `provision_schema`), register (per `register_runtime` /
/// `register_catalog`). A failing step returns immediately and earlier steps
/// are not rolled back. The success response includes the instance's
/// connection details, including its password and URI.
#[post("/admin/provision/instances")]
pub async fn admin_spin_up_postgres_instance(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<SpinUpInstanceRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let spin_params = SpinUpPostgresParams {
        client_name: body.client_name.clone(),
        container_name: body.container_name.clone(),
        image: body.image.clone(),
        host: body.host.clone(),
        host_port: body.host_port,
        db_name: body.db_name.clone(),
        username: body.username.clone(),
        password: body.password.clone(),
        startup_timeout_secs: body.startup_timeout_secs,
        reuse_existing: body.reuse_existing,
    };
    let instance = match spin_up_postgres_instance(spin_params).await {
        Ok(instance) => instance,
        Err(err) => return map_provisioning_error("Failed to spin up Postgres instance", err),
    };

    let statements_executed: Option<usize> = if body.provision_schema {
        match run_provision_sql(&instance.pg_uri).await {
            Ok(total) => Some(total),
            Err(err) => {
                return map_provisioning_error("Failed to provision Postgres instance", err);
            }
        }
    } else {
        None
    };

    let description = format!("Managed Postgres instance ({})", instance.container_name);
    let catalog_metadata = json!({
        "managed_by": "provision_api",
        "provider": "docker",
        "container_name": instance.container_name,
        "image": instance.image,
        "host": instance.host,
        "host_port": instance.host_port,
    });
    let registration = register_provisioned_client(
        state.get_ref(),
        &instance.client_name,
        Some(description),
        &instance.pg_uri,
        catalog_metadata,
        body.register_runtime,
        body.register_catalog,
    )
    .await;
    let (runtime_registered, catalog_registered) = match registration {
        Ok(flags) => flags,
        Err(resp) => return resp,
    };

    api_success(
        "Postgres instance is ready",
        json!({
            "instance": {
                "client_name": instance.client_name,
                "container_name": instance.container_name,
                "image": instance.image,
                "host": instance.host,
                "host_port": instance.host_port,
                "db_name": instance.db_name,
                "username": instance.username,
                "password": instance.password,
                "pg_uri": instance.pg_uri,
                "created_new_container": instance.created_new_container,
                "reused_existing_container": instance.reused_existing_container,
                "wait_ready_ms": instance.wait_ready_ms,
            },
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// `POST /admin/provision/providers/neon` — provisions and registers a Neon
/// database.
///
/// Requires the static admin key. Steps, in order:
/// 1. Create a project when `project_id` is missing or blank and
///    `create_project_if_missing` is set.
/// 2. Use `connection_uri` when supplied, otherwise fetch the connection URI
///    through the provider API (needs `api_key` and a project id).
/// 3. Run the provisioning SQL when `provision_schema` is set.
/// 4. Register the client per `register_runtime` / `register_catalog`.
///
/// A failing step returns immediately and earlier steps are not rolled back.
/// The `created_project` value stored in the catalog metadata reflects
/// whether this request actually created a project, not merely whether
/// creation was permitted.
#[post("/admin/provision/providers/neon")]
pub async fn admin_provision_neon(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<NeonProviderProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let mut project_id = body.project_id.clone();
    let mut branch_id = body.branch_id.clone();
    // Set only when a project is really created below.
    let mut created_project = false;
    if project_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_project_if_missing
    {
        let api_key = match required_field("api_key", body.api_key.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let created = match create_neon_project(NeonProjectCreateParams {
            api_key,
            project_name: body
                .project_name
                .clone()
                .or_else(|| Some(body.client_name.clone())),
            project_payload: body.project_create_payload.clone(),
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Neon project", err),
        };
        project_id = Some(created.project_id);
        created_project = true;
        // A caller-supplied branch wins over the one reported by creation.
        if branch_id.is_none() {
            branch_id = created.branch_id;
        }
    }
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_field("api_key", body.api_key.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project_id = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_neon_connection_uri(NeonConnectionParams {
            api_key,
            project_id,
            branch_id: branch_id.clone(),
            database_name: body.database_name.clone(),
            role_name: body.role_name.clone(),
            endpoint_id: body.endpoint_id.clone(),
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => return map_provisioning_error("Failed to fetch Neon connection URI", err),
        }
    };
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => return map_provisioning_error("Failed to provision Neon database", err),
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Neon database managed via Athena".to_string()));
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "neon",
            "project_id": project_id,
            "branch_id": branch_id,
            "database_name": body.database_name,
            "role_name": body.role_name,
            "endpoint_id": body.endpoint_id,
            "created_project": created_project,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Provisioned Neon database",
        json!({
            "provider": "neon",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// `POST /admin/provision/providers/railway` — provisions and registers a
/// Railway-hosted database.
///
/// Requires the static admin key. Steps, in order:
/// 1. Create a project when `project_id` is missing or blank and
///    `create_project_if_missing` is set.
/// 2. Resolve the project's base environment when no environment id is known
///    and both an API key and a project id are available.
/// 3. Create a service and/or plugin when their ids are missing or blank and
///    the corresponding `create_*_if_missing` flag is set.
/// 4. Use `connection_uri` when supplied, otherwise fetch the connection URI
///    through the provider API.
/// 5. Run the provisioning SQL when `provision_schema` is set.
/// 6. Register the client per `register_runtime` / `register_catalog`.
///
/// A failing step returns immediately and earlier steps are not rolled back.
/// The `created_project`, `created_service` and `created_plugin` values
/// stored in the catalog metadata reflect what this request actually created,
/// not merely which creations were permitted.
#[post("/admin/provision/providers/railway")]
pub async fn admin_provision_railway(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<RailwayProviderProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // A blank API key is treated the same as a missing one.
    let api_key_for_create = body
        .api_key
        .clone()
        .filter(|value| !value.trim().is_empty());
    let mut project_id = body.project_id.clone();
    let mut environment_id = body.environment_id.clone();
    let mut service_id = body.service_id.clone();
    let mut plugin_id = body.plugin_id.clone();
    // Set only when the corresponding resource is really created below.
    let mut created_project = false;
    let mut created_service = false;
    let mut created_plugin = false;
    if project_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_project_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project_input = match json_object_insert_if_missing(
            body.project_create_input.clone(),
            "name",
            Value::String(
                body.project_name
                    .clone()
                    .unwrap_or_else(|| format!("athena-{}", body.client_name)),
            ),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway project_create_input", err),
        };
        let created = match create_railway_project(RailwayProjectCreateParams {
            api_key,
            project_input,
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Railway project", err),
        };
        project_id = Some(created.project_id);
        created_project = true;
        // A caller-supplied environment wins over the one reported by creation.
        if environment_id.is_none() {
            environment_id = created.base_environment_id;
        }
    }
    // Still no environment: look up the project's base environment, but only
    // when the credentials needed for the lookup are available.
    if environment_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && let (Some(api_key), Some(project)) = (api_key_for_create.clone(), project_id.clone())
    {
        environment_id = match fetch_railway_project_base_environment_id(
            &api_key,
            &project,
            body.graphql_url.as_deref(),
        )
        .await
        {
            Ok(value) => value,
            Err(err) => {
                return map_provisioning_error("Failed to resolve Railway base environment", err);
            }
        };
    }
    if service_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_service_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let service_input = match json_object_insert_if_missing(
            body.service_create_input.clone(),
            "projectId",
            Value::String(project),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway service_create_input", err),
        };
        let service_input = match json_object_insert_if_missing(
            Some(service_input),
            "name",
            Value::String(
                body.service_name
                    .clone()
                    .unwrap_or_else(|| format!("{}-service", body.client_name)),
            ),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway service_create_input", err),
        };
        let created = match create_railway_service(RailwayServiceCreateParams {
            api_key,
            service_input,
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Railway service", err),
        };
        service_id = Some(created.service_id);
        created_service = true;
    }
    if plugin_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_plugin_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let environment = match required_field("environment_id", environment_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let plugin_input = match json_object_insert_if_missing(
            body.plugin_create_input.clone(),
            "projectId",
            Value::String(project),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
        };
        let plugin_input = match json_object_insert_if_missing(
            Some(plugin_input),
            "environmentId",
            Value::String(environment),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
        };
        let plugin_input = match json_object_insert_if_missing(
            Some(plugin_input),
            "name",
            Value::String(
                body.plugin_name
                    .clone()
                    .unwrap_or_else(|| "Postgres".to_string()),
            ),
        ) {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Invalid Railway plugin_create_input", err),
        };
        let created = match create_railway_plugin(RailwayPluginCreateParams {
            api_key,
            plugin_input,
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Railway plugin", err),
        };
        plugin_id = Some(created.plugin_id);
        created_plugin = true;
    }
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project_id = match required_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let environment_id = match required_field("environment_id", environment_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_railway_connection_uri(RailwayConnectionParams {
            api_key,
            project_id,
            environment_id,
            service_id: service_id.clone(),
            plugin_id: plugin_id.clone(),
            graphql_url: body.graphql_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => {
                return map_provisioning_error("Failed to fetch Railway connection URI", err);
            }
        }
    };
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => return map_provisioning_error("Failed to provision Railway database", err),
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Railway database managed via Athena".to_string()));
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "railway",
            "project_id": project_id,
            "environment_id": environment_id,
            "service_id": service_id,
            "plugin_id": plugin_id,
            "created_project": created_project,
            "created_service": created_service,
            "created_plugin": created_plugin,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Provisioned Railway database",
        json!({
            "provider": "railway",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// `POST /admin/provision/providers/render` — provisions and registers a
/// Render Postgres database.
///
/// Requires the static admin key. Steps, in order:
/// 1. Create a Postgres service when `service_id` is missing or blank and
///    `create_service_if_missing` is set.
/// 2. Use `connection_uri` when supplied, otherwise fetch the connection URI
///    through the provider API (needs `api_key` and a service id).
/// 3. Run the provisioning SQL when `provision_schema` is set.
/// 4. Register the client per `register_runtime` / `register_catalog`.
///
/// A failing step returns immediately and earlier steps are not rolled back.
/// The `created_service` value stored in the catalog metadata reflects
/// whether this request actually created a service, not merely whether
/// creation was permitted.
#[post("/admin/provision/providers/render")]
pub async fn admin_provision_render(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<RenderProviderProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    // A blank API key is treated the same as a missing one.
    let api_key_for_create = body
        .api_key
        .clone()
        .filter(|value| !value.trim().is_empty());
    let mut service_id = body.service_id.clone();
    // Set only when a service is really created below.
    let mut created_service = false;
    if service_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_service_if_missing
    {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let created = match create_render_postgres_service(RenderPostgresCreateParams {
            api_key,
            owner_id: body.owner_id.clone(),
            service_name: body.service_name.clone(),
            service_payload: body.service_create_payload.clone(),
            plan: body.plan.clone(),
            region: body.region.clone(),
            postgres_version: body.postgres_version.clone(),
            disk_size_gb: body.disk_size_gb,
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return map_provisioning_error("Failed to create Render service", err),
        };
        service_id = Some(created.service_id);
        created_service = true;
    }
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_field("api_key", api_key_for_create.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let service_id = match required_field("service_id", service_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_render_connection_uri(RenderConnectionParams {
            api_key,
            service_id,
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => {
                return map_provisioning_error("Failed to fetch Render connection URI", err);
            }
        }
    };
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => return map_provisioning_error("Failed to provision Render database", err),
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Render Postgres database managed via Athena".to_string()));
    let (runtime_registered, catalog_registered) = match register_provisioned_client(
        state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        json!({
            "managed_by": "provision_api",
            "provider": "render",
            "service_id": service_id,
            "owner_id": body.owner_id,
            "service_name": body.service_name,
            "plan": body.plan,
            "region": body.region,
            "postgres_version": body.postgres_version,
            "disk_size_gb": body.disk_size_gb,
            "created_service": created_service,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    api_success(
        "Provisioned Render database",
        json!({
            "provider": "render",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
/// `GET /admin/provision/instances/{container_name}` — reports the state of a
/// managed Postgres container and, optionally, of an associated client.
///
/// Requires the static admin key. Responds with not-found when the container
/// does not exist. When `client_name` is given, the runtime registration and
/// the catalog entry are included. An unavailable catalog is tolerated
/// (`catalog_client` stays `null`), whereas a failing catalog query yields an
/// internal error.
#[get("/admin/provision/instances/{container_name}")]
pub async fn admin_get_postgres_instance_status(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    query: Query<InstanceStatusQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let container_name = path.into_inner();
    let status = match inspect_container(&container_name).await {
        Ok(status) => status,
        Err(err) => return map_provisioning_error("Failed to inspect Postgres instance", err),
    };
    if !status.exists {
        return not_found(
            "Instance not found",
            format!("No Postgres container named '{}' exists.", container_name),
        );
    }

    let requested_client = query.client_name.as_ref();
    let runtime_client = match requested_client {
        Some(client_name) => state.pg_registry.registered_client(client_name),
        None => None,
    };

    let catalog_client = match requested_client {
        None => None,
        Some(client_name) => match client_catalog_pool(state.get_ref()) {
            // Catalog not configured/connected: report no catalog entry.
            Err(_) => None,
            Ok(pool) => match get_athena_client_by_name(&pool, client_name).await {
                Ok(entry) => entry,
                Err(err) => {
                    return internal_error("Failed to load catalog client", err.to_string());
                }
            },
        },
    };

    api_success(
        "Loaded Postgres instance status",
        json!({
            "instance": status,
            "client_name": query.client_name,
            "runtime_client": runtime_client,
            "catalog_client": catalog_client,
        }),
    )
}
/// `DELETE /admin/provision/instances/{container_name}` — removes a managed
/// Postgres container and optionally unregisters its client.
///
/// Requires the static admin key. The container is removed first; client
/// cleanup only happens when the body names a `client_name`. Without a body,
/// no client is touched.
///
/// NOTE(review): a catalog failure is reported after the container (and
/// possibly the runtime client) has already been removed, so an error
/// response can follow a partially completed deletion.
#[delete("/admin/provision/instances/{container_name}")]
pub async fn admin_delete_postgres_instance(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Option<Json<DeleteInstanceRequest>>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let container_name = path.into_inner();
    if let Err(err) = remove_container(&container_name).await {
        return map_provisioning_error("Failed to delete Postgres instance", err);
    }

    let payload = match body {
        Some(json_body) => json_body.0,
        None => DeleteInstanceRequest {
            client_name: None,
            remove_runtime_client: true,
            remove_catalog_client: true,
        },
    };

    let (runtime_removed, catalog_removed) = match payload.client_name.as_ref() {
        None => (false, false),
        Some(client_name) => {
            if payload.remove_runtime_client {
                state.pg_registry.remove_client(client_name);
            }
            let catalog_removed = if payload.remove_catalog_client {
                let pool = match client_catalog_pool(state.get_ref()) {
                    Ok(pool) => pool,
                    Err(resp) => return resp,
                };
                match delete_athena_client(&pool, client_name).await {
                    // `Some` means a catalog row was actually deleted.
                    Ok(deleted) => deleted.is_some(),
                    Err(err) => {
                        return internal_error("Failed to delete catalog client", err.to_string());
                    }
                }
            } else {
                false
            };
            (payload.remove_runtime_client, catalog_removed)
        }
    };

    api_success(
        "Deleted Postgres instance",
        json!({
            "container_name": container_name,
            "client_name": payload.client_name,
            "runtime_removed": runtime_removed,
            "catalog_removed": catalog_removed,
        }),
    )
}
/// Registers every admin provisioning handler defined in this module on the
/// given actix-web service configuration.
pub fn services(cfg: &mut web::ServiceConfig) {
    // Schema provisioning against existing targets.
    cfg.service(admin_provision);
    cfg.service(admin_provision_status);
    // Managed instance creation and provider-backed databases.
    cfg.service(admin_spin_up_postgres_instance);
    cfg.service(admin_provision_neon);
    cfg.service(admin_provision_railway);
    cfg.service(admin_provision_render);
    // Managed instance inspection and removal.
    cfg.service(admin_get_postgres_instance_status);
    cfg.service(admin_delete_postgres_instance);
}