use actix_web::{
HttpRequest, HttpResponse, Responder, delete, get, patch, post,
web::{self, Data, Json, Path},
};
use serde::Deserialize;
use serde_json::json;
use sqlx::{Postgres, Row, Transaction};
use std::time::Instant;
use crate::AppState;
use crate::api::client_context::{
auth_pool, logging_pool, pool_for_client, registered_client_for, required_client_name,
};
use crate::api::gateway::auth::authorize_gateway_request;
use crate::api::management::audit::{
audit_table_exists, build_audit_entry, insert_database_audit_log,
};
use crate::api::management::sql::{
ALLOWED_COLUMN_DATA_TYPES, ALLOWED_EXTENSIONS, ALLOWED_INDEX_METHODS,
build_create_extension_statement, build_create_index_statement, build_create_table_statement,
build_drop_column_statement, build_drop_index_statement, build_drop_table_statement,
build_edit_table_statements,
};
use crate::api::management::types::{
CapabilityRight, CreateIndexRequest, CreateTableRequest, DropColumnRequest, DropIndexRequest,
DropTableRequest, EditTableRequest, InstallExtensionRequest, ManagementCapabilitiesResponse,
};
use crate::api::response::{
api_created, api_success, bad_request, conflict, forbidden, internal_error, not_found,
processed_error, service_unavailable,
};
use crate::data::clients::{SaveAthenaClientParams, upsert_athena_client};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::provisioning::{
NeonConnectionParams, NeonProjectCreateParams, ProvisioningError, RailwayConnectionParams,
RailwayPluginCreateParams, RailwayProjectCreateParams, RailwayServiceCreateParams,
RenderConnectionParams, RenderPostgresCreateParams, create_neon_project, create_railway_plugin,
create_railway_project, create_railway_service, create_render_postgres_service,
fetch_neon_connection_uri, fetch_railway_connection_uri,
fetch_railway_project_base_environment_id, fetch_render_connection_uri,
json_object_insert_if_missing, run_provision_sql,
};
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
pub mod audit;
pub mod sql;
pub mod types;
const MANAGEMENT_READ_RIGHT: &str = "management.read";
const MANAGEMENT_TABLES_WRITE_RIGHT: &str = "management.tables.write";
const MANAGEMENT_TABLES_DROP_RIGHT: &str = "management.tables.drop";
const MANAGEMENT_COLUMNS_DROP_RIGHT: &str = "management.columns.drop";
const MANAGEMENT_INDEXES_WRITE_RIGHT: &str = "management.indexes.write";
const MANAGEMENT_INDEXES_DROP_RIGHT: &str = "management.indexes.drop";
const MANAGEMENT_EXTENSIONS_WRITE_RIGHT: &str = "management.extensions.write";
const MANAGEMENT_PROVISION_WRITE_RIGHT: &str = "management.provision.write";
async fn authorize_management_request(
req: &HttpRequest,
app_state: &AppState,
client_name: &str,
required_rights: Vec<String>,
) -> Result<crate::api::gateway::auth::GatewayAuthOutcome, HttpResponse> {
auth_pool(app_state)?;
let auth = authorize_gateway_request(req, app_state, Some(client_name), required_rights).await;
if let Some(response) = auth.response {
return Err(response);
}
if auth.bound_client_name.as_deref() != Some(client_name) {
return Err(forbidden(
"Management API requires a client-bound API key",
format!(
"The presented API key must be bound to client '{}'.",
client_name
),
));
}
Ok(auth)
}
async fn table_exists(
tx: &mut Transaction<'_, Postgres>,
schema_name: &str,
table_name: &str,
) -> Result<bool, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = $1
AND table_name = $2
) AS exists
"#,
)
.bind(schema_name)
.bind(table_name)
.fetch_one(&mut **tx)
.await?;
row.try_get("exists")
}
async fn column_exists(
tx: &mut Transaction<'_, Postgres>,
schema_name: &str,
table_name: &str,
column_name: &str,
) -> Result<bool, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
) AS exists
"#,
)
.bind(schema_name)
.bind(table_name)
.bind(column_name)
.fetch_one(&mut **tx)
.await?;
row.try_get("exists")
}
async fn index_exists(
tx: &mut Transaction<'_, Postgres>,
schema_name: &str,
index_name: &str,
) -> Result<bool, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT EXISTS (
SELECT 1
FROM pg_indexes
WHERE schemaname = $1
AND indexname = $2
) AS exists
"#,
)
.bind(schema_name)
.bind(index_name)
.fetch_one(&mut **tx)
.await?;
row.try_get("exists")
}
async fn run_audited_statements(
tx: &mut Transaction<'_, Postgres>,
statements: &[String],
) -> Result<(), sqlx::Error> {
sqlx::query("SAVEPOINT athena_management_mutation")
.execute(&mut **tx)
.await?;
for statement in statements {
sqlx::query(statement).execute(&mut **tx).await?;
}
Ok(())
}
async fn rollback_to_savepoint(tx: &mut Transaction<'_, Postgres>) {
let _ = sqlx::query("ROLLBACK TO SAVEPOINT athena_management_mutation")
.execute(&mut **tx)
.await;
}
fn management_capabilities() -> ManagementCapabilitiesResponse {
ManagementCapabilitiesResponse {
backend: "postgresql",
retention_mutation_supported: false,
replication_mutation_supported: false,
supported_operations: vec![
"management.capabilities.read",
"management.tables.create",
"management.tables.edit",
"management.tables.drop",
"management.columns.drop",
"management.indexes.create",
"management.indexes.drop",
"management.extensions.install",
"management.provision.providers.neon",
"management.provision.providers.railway",
"management.provision.providers.render",
],
required_rights: vec![
CapabilityRight {
operation: "GET /management/capabilities",
required_right: MANAGEMENT_READ_RIGHT,
},
CapabilityRight {
operation: "POST /management/tables",
required_right: MANAGEMENT_TABLES_WRITE_RIGHT,
},
CapabilityRight {
operation: "PATCH /management/tables/{table_name}",
required_right: MANAGEMENT_TABLES_WRITE_RIGHT,
},
CapabilityRight {
operation: "DELETE /management/tables/{table_name}",
required_right: MANAGEMENT_TABLES_DROP_RIGHT,
},
CapabilityRight {
operation: "DELETE /management/tables/{table_name}/columns/{column_name}",
required_right: MANAGEMENT_COLUMNS_DROP_RIGHT,
},
CapabilityRight {
operation: "POST /management/indexes",
required_right: MANAGEMENT_INDEXES_WRITE_RIGHT,
},
CapabilityRight {
operation: "DELETE /management/indexes/{index_name}",
required_right: MANAGEMENT_INDEXES_DROP_RIGHT,
},
CapabilityRight {
operation: "POST /management/extensions/install",
required_right: MANAGEMENT_EXTENSIONS_WRITE_RIGHT,
},
CapabilityRight {
operation: "POST /management/provision/providers/neon",
required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
},
CapabilityRight {
operation: "POST /management/provision/providers/railway",
required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
},
CapabilityRight {
operation: "POST /management/provision/providers/render",
required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
},
],
allowed_index_methods: ALLOWED_INDEX_METHODS.to_vec(),
allowed_column_data_types: ALLOWED_COLUMN_DATA_TYPES.to_vec(),
}
}
#[post("/management/extensions/install")]
async fn management_install_extension(
req: HttpRequest,
body: Json<InstallExtensionRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_EXTENSIONS_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statement = match build_create_extension_statement(&body.extension_name, body.if_not_exists)
{
Ok(statement) => statement,
Err(err) => {
return bad_request(
"Invalid install extension request",
format!(
"{} Allowed extensions: {}.",
err,
ALLOWED_EXTENSIONS.join(", ")
),
);
}
};
let statements = vec![statement];
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
"public",
None,
"extension",
"install_extension",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_install_extension",
Some(&body.extension_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"install_extension",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.extension_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
"public",
None,
"extension",
"install_extension",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "installed": true, "extension_name": body.extension_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The extension was not installed because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_install_extension",
Some(&body.extension_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(json!({ "extension_name": body.extension_name })),
);
app_state.metrics_state.record_management_mutation(
"install_extension",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Installed extension",
json!({ "extension_name": body.extension_name, "statements": statements }),
)
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementNeonProvisionRequest {
client_name: String,
#[serde(default)]
description: Option<String>,
#[serde(default)]
connection_uri: Option<String>,
#[serde(default)]
api_key: Option<String>,
#[serde(default)]
project_id: Option<String>,
#[serde(default)]
branch_id: Option<String>,
#[serde(default)]
database_name: Option<String>,
#[serde(default)]
role_name: Option<String>,
#[serde(default)]
endpoint_id: Option<String>,
#[serde(default)]
api_base_url: Option<String>,
#[serde(default)]
create_project_if_missing: bool,
#[serde(default)]
project_name: Option<String>,
#[serde(default)]
project_create_payload: Option<serde_json::Value>,
#[serde(default = "default_true")]
provision_schema: bool,
#[serde(default = "default_true")]
register_runtime: bool,
#[serde(default = "default_true")]
register_catalog: bool,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementRailwayProvisionRequest {
client_name: String,
#[serde(default)]
description: Option<String>,
#[serde(default)]
connection_uri: Option<String>,
#[serde(default)]
api_key: Option<String>,
#[serde(default)]
project_id: Option<String>,
#[serde(default)]
environment_id: Option<String>,
#[serde(default)]
service_id: Option<String>,
#[serde(default)]
plugin_id: Option<String>,
#[serde(default)]
graphql_url: Option<String>,
#[serde(default)]
create_project_if_missing: bool,
#[serde(default)]
project_name: Option<String>,
#[serde(default)]
project_create_input: Option<serde_json::Value>,
#[serde(default)]
create_service_if_missing: bool,
#[serde(default)]
service_name: Option<String>,
#[serde(default)]
service_create_input: Option<serde_json::Value>,
#[serde(default)]
create_plugin_if_missing: bool,
#[serde(default)]
plugin_name: Option<String>,
#[serde(default)]
plugin_create_input: Option<serde_json::Value>,
#[serde(default = "default_true")]
provision_schema: bool,
#[serde(default = "default_true")]
register_runtime: bool,
#[serde(default = "default_true")]
register_catalog: bool,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementRenderProvisionRequest {
client_name: String,
#[serde(default)]
description: Option<String>,
#[serde(default)]
connection_uri: Option<String>,
#[serde(default)]
api_key: Option<String>,
#[serde(default)]
service_id: Option<String>,
#[serde(default)]
owner_id: Option<String>,
#[serde(default)]
api_base_url: Option<String>,
#[serde(default)]
create_service_if_missing: bool,
#[serde(default)]
service_name: Option<String>,
#[serde(default)]
service_create_payload: Option<serde_json::Value>,
#[serde(default)]
plan: Option<String>,
#[serde(default)]
region: Option<String>,
#[serde(default)]
postgres_version: Option<String>,
#[serde(default)]
disk_size_gb: Option<u32>,
#[serde(default = "default_true")]
provision_schema: bool,
#[serde(default = "default_true")]
register_runtime: bool,
#[serde(default = "default_true")]
register_catalog: bool,
}
fn default_true() -> bool {
true
}
fn provisioning_error_response(context: &str, err: ProvisioningError) -> HttpResponse {
match err {
ProvisioningError::InvalidInput(message) => bad_request(context, message),
ProvisioningError::Conflict(message) => conflict(context, message),
ProvisioningError::Unavailable(message) => service_unavailable(context, message),
ProvisioningError::Execution(message) => internal_error(context, message),
}
}
fn required_provider_field(name: &str, value: Option<String>) -> Result<String, HttpResponse> {
let value = value.unwrap_or_default();
if value.trim().is_empty() {
return Err(bad_request(
"Missing required field",
format!(
"Provide '{}' or set 'connection_uri' to bypass provider API lookup.",
name
),
));
}
Ok(value)
}
async fn register_provider_client(
app_state: &AppState,
client_name: &str,
description: Option<String>,
pg_uri: &str,
metadata: serde_json::Value,
register_runtime: bool,
register_catalog: bool,
) -> Result<(bool, bool), HttpResponse> {
let mut runtime_registered = false;
let mut catalog_registered = false;
if register_runtime {
let target = crate::drivers::postgresql::sqlx_driver::ClientConnectionTarget {
client_name: client_name.to_string(),
source: "database".to_string(),
description: description.clone(),
pg_uri: Some(pg_uri.to_string()),
pg_uri_env_var: None,
config_uri_template: None,
is_active: true,
is_frozen: false,
};
if let Err(err) = app_state.pg_registry.upsert_client(target).await {
return Err(internal_error(
"Failed to register runtime client",
format!("Runtime client registration failed: {}", err),
));
}
runtime_registered = true;
}
if register_catalog {
let pool = logging_pool(app_state)?;
if let Err(err) = upsert_athena_client(
&pool,
SaveAthenaClientParams {
client_name: client_name.to_string(),
description,
pg_uri: Some(pg_uri.to_string()),
pg_uri_env_var: None,
config_uri_template: None,
source: "database".to_string(),
is_active: true,
is_frozen: false,
metadata,
},
)
.await
{
return Err(internal_error(
"Failed to register client in catalog",
err.to_string(),
));
}
catalog_registered = true;
}
Ok((runtime_registered, catalog_registered))
}
#[get("/management/capabilities")]
async fn management_capabilities_route(
req: HttpRequest,
app_state: Data<AppState>,
) -> impl Responder {
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_READ_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_capabilities",
None,
0,
actix_web::http::StatusCode::OK,
Some(json!({ "client_name": client_name })),
);
api_success(
"Loaded management capabilities",
json!(management_capabilities()),
)
}
#[post("/management/provision/providers/neon")]
async fn management_provision_neon(
req: HttpRequest,
body: Json<ManagementNeonProvisionRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let caller_client = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&caller_client,
vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let mut project_id = body.project_id.clone();
let mut branch_id = body.branch_id.clone();
if project_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_project_if_missing
{
let api_key = match required_provider_field("api_key", body.api_key.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let created = match create_neon_project(NeonProjectCreateParams {
api_key,
project_name: body
.project_name
.clone()
.or_else(|| Some(body.client_name.clone())),
project_payload: body.project_create_payload.clone(),
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => return provisioning_error_response("Failed to create Neon project", err),
};
project_id = Some(created.project_id);
if branch_id.is_none() {
branch_id = created.branch_id;
}
}
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_provider_field("api_key", body.api_key.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_id = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_neon_connection_uri(NeonConnectionParams {
api_key,
project_id,
branch_id: branch_id.clone(),
database_name: body.database_name.clone(),
role_name: body.role_name.clone(),
endpoint_id: body.endpoint_id.clone(),
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return provisioning_error_response("Failed to fetch Neon connection URI", err);
}
}
};
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => {
return provisioning_error_response("Failed to provision Neon database", err);
}
})
} else {
None
};
let description = body
.description
.clone()
.or_else(|| Some("Neon database managed via Athena management API".to_string()));
let (runtime_registered, catalog_registered) = match register_provider_client(
app_state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "management_api",
"provider": "neon",
"project_id": project_id,
"branch_id": branch_id,
"database_name": body.database_name,
"role_name": body.role_name,
"endpoint_id": body.endpoint_id,
"created_project": body.create_project_if_missing,
"requested_by_client": caller_client,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_provision_neon",
Some(&body.client_name),
0,
actix_web::http::StatusCode::OK,
Some(json!({
"provider": "neon",
"caller_client": caller_client,
"target_client": body.client_name,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
})),
);
api_success(
"Provisioned Neon database",
json!({
"provider": "neon",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[post("/management/provision/providers/railway")]
async fn management_provision_railway(
req: HttpRequest,
body: Json<ManagementRailwayProvisionRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let caller_client = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&caller_client,
vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let api_key_for_create = body
.api_key
.clone()
.filter(|value| !value.trim().is_empty());
let mut project_id = body.project_id.clone();
let mut environment_id = body.environment_id.clone();
let mut service_id = body.service_id.clone();
let mut plugin_id = body.plugin_id.clone();
if project_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_project_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_input = match json_object_insert_if_missing(
body.project_create_input.clone(),
"name",
serde_json::Value::String(
body.project_name
.clone()
.unwrap_or_else(|| format!("athena-{}", body.client_name)),
),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway project_create_input", err);
}
};
let created = match create_railway_project(RailwayProjectCreateParams {
api_key,
project_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Railway project", err);
}
};
project_id = Some(created.project_id);
if environment_id.is_none() {
environment_id = created.base_environment_id;
}
}
if environment_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& let (Some(api_key), Some(project)) = (api_key_for_create.clone(), project_id.clone())
{
environment_id = match fetch_railway_project_base_environment_id(
&api_key,
&project,
body.graphql_url.as_deref(),
)
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response(
"Failed to resolve Railway base environment",
err,
);
}
};
}
if service_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_service_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let service_input = match json_object_insert_if_missing(
body.service_create_input.clone(),
"projectId",
serde_json::Value::String(project),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway service_create_input", err);
}
};
let service_input = match json_object_insert_if_missing(
Some(service_input),
"name",
serde_json::Value::String(
body.service_name
.clone()
.unwrap_or_else(|| format!("{}-service", body.client_name)),
),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway service_create_input", err);
}
};
let created = match create_railway_service(RailwayServiceCreateParams {
api_key,
service_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Railway service", err);
}
};
service_id = Some(created.service_id);
}
if plugin_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_plugin_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let environment = match required_provider_field("environment_id", environment_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let plugin_input = match json_object_insert_if_missing(
body.plugin_create_input.clone(),
"projectId",
serde_json::Value::String(project),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway plugin_create_input", err);
}
};
let plugin_input = match json_object_insert_if_missing(
Some(plugin_input),
"environmentId",
serde_json::Value::String(environment),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway plugin_create_input", err);
}
};
let plugin_input = match json_object_insert_if_missing(
Some(plugin_input),
"name",
serde_json::Value::String(
body.plugin_name
.clone()
.unwrap_or_else(|| "Postgres".to_string()),
),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway plugin_create_input", err);
}
};
let created = match create_railway_plugin(RailwayPluginCreateParams {
api_key,
plugin_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Railway plugin", err);
}
};
plugin_id = Some(created.plugin_id);
}
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_id = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let environment_id = match required_provider_field("environment_id", environment_id.clone())
{
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_railway_connection_uri(RailwayConnectionParams {
api_key,
project_id,
environment_id,
service_id: service_id.clone(),
plugin_id: plugin_id.clone(),
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return provisioning_error_response("Failed to fetch Railway connection URI", err);
}
}
};
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => {
return provisioning_error_response("Failed to provision Railway database", err);
}
})
} else {
None
};
let description = body
.description
.clone()
.or_else(|| Some("Railway database managed via Athena management API".to_string()));
let (runtime_registered, catalog_registered) = match register_provider_client(
app_state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "management_api",
"provider": "railway",
"project_id": project_id,
"environment_id": environment_id,
"service_id": service_id,
"plugin_id": plugin_id,
"created_project": body.create_project_if_missing,
"created_service": body.create_service_if_missing,
"created_plugin": body.create_plugin_if_missing,
"requested_by_client": caller_client,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_provision_railway",
Some(&body.client_name),
0,
actix_web::http::StatusCode::OK,
Some(json!({
"provider": "railway",
"caller_client": caller_client,
"target_client": body.client_name,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
})),
);
api_success(
"Provisioned Railway database",
json!({
"provider": "railway",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[post("/management/provision/providers/render")]
async fn management_provision_render(
req: HttpRequest,
body: Json<ManagementRenderProvisionRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let caller_client = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&caller_client,
vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let api_key_for_create = body
.api_key
.clone()
.filter(|value| !value.trim().is_empty());
let mut service_id = body.service_id.clone();
if service_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_service_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let created = match create_render_postgres_service(RenderPostgresCreateParams {
api_key,
owner_id: body.owner_id.clone(),
service_name: body.service_name.clone(),
service_payload: body.service_create_payload.clone(),
plan: body.plan.clone(),
region: body.region.clone(),
postgres_version: body.postgres_version.clone(),
disk_size_gb: body.disk_size_gb,
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Render service", err);
}
};
service_id = Some(created.service_id);
}
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let service_id = match required_provider_field("service_id", service_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_render_connection_uri(RenderConnectionParams {
api_key,
service_id,
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return provisioning_error_response("Failed to fetch Render connection URI", err);
}
}
};
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => {
return provisioning_error_response("Failed to provision Render database", err);
}
})
} else {
None
};
let description = body
.description
.clone()
.or_else(|| Some("Render database managed via Athena management API".to_string()));
let (runtime_registered, catalog_registered) = match register_provider_client(
app_state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "management_api",
"provider": "render",
"service_id": service_id,
"owner_id": body.owner_id,
"service_name": body.service_name,
"plan": body.plan,
"region": body.region,
"postgres_version": body.postgres_version,
"disk_size_gb": body.disk_size_gb,
"created_service": body.create_service_if_missing,
"requested_by_client": caller_client,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_provision_render",
Some(&body.client_name),
0,
actix_web::http::StatusCode::OK,
Some(json!({
"provider": "render",
"caller_client": caller_client,
"target_client": body.client_name,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
})),
);
api_success(
"Provisioned Render database",
json!({
"provider": "render",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[post("/management/tables")]
async fn management_create_table(
req: HttpRequest,
body: Json<CreateTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_create_table_statement(&body) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid create table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &body.table_name).await {
Ok(true) => {
return conflict(
"Table already exists",
format!(
"Table '{}.{}' already exists.",
body.schema_name, body.table_name
),
);
}
Ok(false) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"table",
"create_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_table",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"create_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"table",
"create_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "applied": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not created because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_table",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::CREATED,
Some(json!({ "schema_name": body.schema_name, "table_name": body.table_name })),
);
app_state.metrics_state.record_management_mutation(
"create_table",
"success",
started.elapsed().as_secs_f64(),
);
api_created(
"Created table",
json!({
"schema_name": body.schema_name,
"table_name": body.table_name,
"statements": statements,
}),
)
}
#[patch("/management/tables/{table_name}")]
async fn management_edit_table(
req: HttpRequest,
path: Path<String>,
body: Json<EditTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let table_name = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements =
match build_edit_table_statements(&body.schema_name, &table_name, &body.operations) {
Ok(statements) => statements,
Err(err) => return bad_request("Invalid edit table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"edit_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_edit_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"edit_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"edit_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "applied": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not altered because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_edit_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(json!({ "schema_name": body.schema_name, "table_name": table_name })),
);
app_state.metrics_state.record_management_mutation(
"edit_table",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Updated table",
json!({ "table_name": table_name, "statements": statements }),
)
}
#[delete("/management/tables/{table_name}")]
async fn management_drop_table(
req: HttpRequest,
path: Path<String>,
body: Json<DropTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let table_name = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_DROP_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_drop_table_statement(&body.schema_name, &table_name, body.cascade)
{
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid drop table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"drop_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"drop_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"drop_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "dropped": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not dropped because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(json!({ "schema_name": body.schema_name, "table_name": table_name })),
);
app_state.metrics_state.record_management_mutation(
"drop_table",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Dropped table",
json!({ "table_name": table_name, "statements": statements }),
)
}
#[delete("/management/tables/{table_name}/columns/{column_name}")]
async fn management_drop_column(
req: HttpRequest,
path: Path<(String, String)>,
body: Json<DropColumnRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let (table_name, column_name) = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_COLUMNS_DROP_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_drop_column_statement(
&body.schema_name,
&table_name,
&column_name,
body.cascade,
) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid drop column request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match column_exists(&mut tx, &body.schema_name, &table_name, &column_name).await {
Ok(false) => {
return not_found(
"Column not found",
format!(
"Column '{}.{}.{}' does not exist.",
body.schema_name, table_name, column_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"column",
"drop_column",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_column",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"drop_column",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"column",
"drop_column",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "dropped": true, "column_name": column_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The column was not dropped because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_column",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(
json!({ "schema_name": body.schema_name, "table_name": table_name, "column_name": column_name }),
),
);
app_state.metrics_state.record_management_mutation(
"drop_column",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Dropped column",
json!({ "table_name": table_name, "column_name": column_name, "statements": statements }),
)
}
#[post("/management/indexes")]
async fn management_create_index(
req: HttpRequest,
body: Json<CreateIndexRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_INDEXES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let (index_name, statement) = match build_create_index_statement(&body) {
Ok(value) => value,
Err(err) => return bad_request("Invalid create index request", err),
};
let statements = vec![statement];
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &body.table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, body.table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
match index_exists(&mut tx, &body.schema_name, &index_name).await {
Ok(true) => {
return conflict(
"Index already exists",
format!(
"Index '{}.{}' already exists.",
body.schema_name, index_name
),
);
}
Ok(false) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"index",
"create_index",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_index",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string(), "index_name": index_name })),
);
app_state.metrics_state.record_management_mutation(
"create_index",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"index",
"create_index",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "created": true, "index_name": index_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The index was not created because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_index",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::CREATED,
Some(
json!({ "schema_name": body.schema_name, "table_name": body.table_name, "index_name": index_name }),
),
);
app_state.metrics_state.record_management_mutation(
"create_index",
"success",
started.elapsed().as_secs_f64(),
);
api_created(
"Created index",
json!({ "table_name": body.table_name, "index_name": index_name, "statements": statements }),
)
}
#[delete("/management/indexes/{index_name}")]
async fn management_drop_index(
req: HttpRequest,
path: Path<String>,
body: Json<DropIndexRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let index_name = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_INDEXES_DROP_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_drop_index_statement(&body.schema_name, &index_name) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid drop index request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match index_exists(&mut tx, &body.schema_name, &index_name).await {
Ok(false) => {
return not_found(
"Index not found",
format!(
"Index '{}.{}' does not exist.",
body.schema_name, index_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&index_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
None,
"index",
"drop_index",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_index",
None,
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string(), "index_name": index_name })),
);
app_state.metrics_state.record_management_mutation(
"drop_index",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&index_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
None,
"index",
"drop_index",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "dropped": true, "index_name": index_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The index was not dropped because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_index",
None,
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(json!({ "schema_name": body.schema_name, "index_name": index_name })),
);
app_state.metrics_state.record_management_mutation(
"drop_index",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Dropped index",
json!({ "index_name": index_name, "statements": statements }),
)
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(management_capabilities_route)
.service(management_provision_neon)
.service(management_provision_railway)
.service(management_provision_render)
.service(management_create_table)
.service(management_edit_table)
.service(management_drop_table)
.service(management_drop_column)
.service(management_create_index)
.service(management_drop_index)
.service(management_install_extension);
}