use actix_web::{
HttpRequest, HttpResponse, Responder, delete, get, patch, post, put,
web::{self, Data, Json, Path},
};
use serde::Deserialize;
use serde_json::{Value, json};
use sqlx::{Postgres, Row, Transaction};
use std::collections::HashSet;
use std::time::Instant;
use crate::AppState;
use crate::api::client_context::{
auth_pool, logging_pool, pool_for_client, registered_client_for, required_client_name,
};
use crate::api::gateway::auth::authorize_gateway_request;
use crate::api::management::audit::{
audit_table_exists, build_audit_entry, build_function_ddl_audit_entry,
function_ddl_audit_table_exists, insert_database_audit_log, insert_function_ddl_audit_log,
};
use crate::api::management::sql::{
ALLOWED_COLUMN_DATA_TYPES, ALLOWED_EXTENSIONS, ALLOWED_INDEX_METHODS, ParsedFunctionDdlTarget,
build_create_extension_statement, build_create_index_statement, build_create_table_statement,
build_drop_column_statement, build_drop_index_statement, build_drop_table_statement,
build_edit_table_statements, validate_create_or_replace_function_ddl,
validate_function_arg_types, validate_identifier,
};
use crate::api::management::types::{
CapabilityRight, CreateIndexRequest, CreateTableRequest, DropColumnRequest,
DropFunctionRequest, DropIndexRequest, DropTableRequest, EditTableRequest,
InstallExtensionRequest, ManagementCapabilitiesResponse, ManagementFunctionInfo,
ManagementFunctionsQuery, UpsertFunctionRequest,
};
use crate::api::response::{
api_created, api_success, bad_request, conflict, forbidden, internal_error, not_found,
processed_error, service_unavailable,
};
use crate::data::clients::{SaveAthenaClientParams, list_athena_clients, upsert_athena_client};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::parser::resolve_postgres_uri;
use crate::provisioning::{
LocalClusterCreateDatabaseParams, LocalClusterDatabaseCreateOptions, NeonConnectionParams,
NeonProjectCreateParams, ProvisioningError, RailwayConnectionParams, RailwayPluginCreateParams,
RailwayProjectCreateParams, RailwayServiceCreateParams, RenderConnectionParams,
RenderPostgresCreateParams, create_neon_project, create_postgres_database,
create_railway_plugin, create_railway_project, create_railway_service,
create_render_postgres_service, fetch_neon_connection_uri, fetch_railway_connection_uri,
fetch_railway_project_base_environment_id, fetch_render_connection_uri,
json_object_insert_if_missing, list_postgres_databases, postgres_uri_database_name,
postgres_uri_fingerprint, replace_uri_database_name, run_provision_sql,
};
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
pub mod audit;
pub mod sql;
pub mod types;
const MANAGEMENT_READ_RIGHT: &str = "management.read";
const MANAGEMENT_TABLES_WRITE_RIGHT: &str = "management.tables.write";
const MANAGEMENT_TABLES_DROP_RIGHT: &str = "management.tables.drop";
const MANAGEMENT_COLUMNS_DROP_RIGHT: &str = "management.columns.drop";
const MANAGEMENT_INDEXES_WRITE_RIGHT: &str = "management.indexes.write";
const MANAGEMENT_INDEXES_DROP_RIGHT: &str = "management.indexes.drop";
const MANAGEMENT_EXTENSIONS_WRITE_RIGHT: &str = "management.extensions.write";
const MANAGEMENT_PROVISION_WRITE_RIGHT: &str = "management.provision.write";
const MANAGEMENT_FUNCTIONS_READ_RIGHT: &str = "management.functions.read";
const MANAGEMENT_FUNCTIONS_WRITE_RIGHT: &str = "management.functions.write";
const MANAGEMENT_FUNCTIONS_DROP_RIGHT: &str = "management.functions.drop";
/// Authorize a management-plane request on behalf of `client_name`.
///
/// The shared auth pool must be available; the request is then run through
/// the gateway authorizer with the caller's `required_rights`. Unlike plain
/// gateway auth, management routes additionally insist that the presented
/// API key is bound to exactly `client_name`.
async fn authorize_management_request(
    req: &HttpRequest,
    app_state: &AppState,
    client_name: &str,
    required_rights: Vec<String>,
) -> Result<crate::api::gateway::auth::GatewayAuthOutcome, HttpResponse> {
    // Without the auth pool no key lookups can happen at all.
    auth_pool(app_state)?;
    let outcome =
        authorize_gateway_request(req, app_state, Some(client_name), required_rights).await;
    match outcome.response {
        // The gateway already produced a denial response; pass it through.
        Some(denied) => Err(denied),
        // Key is valid and bound to the expected client.
        None if outcome.bound_client_name.as_deref() == Some(client_name) => Ok(outcome),
        // Valid key, but not bound to this client: refuse.
        None => Err(forbidden(
            "Management API requires a client-bound API key",
            format!(
                "The presented API key must be bound to client '{}'.",
                client_name
            ),
        )),
    }
}
/// Report whether `schema_name.table_name` exists, using information_schema.
async fn table_exists(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    table_name: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = $1
            AND table_name = $2
        ) AS exists
    "#;
    let record = sqlx::query(SQL)
        .bind(schema_name)
        .bind(table_name)
        .fetch_one(&mut **tx)
        .await?;
    record.try_get("exists")
}
/// Report whether column `column_name` exists on `schema_name.table_name`.
async fn column_exists(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    table_name: &str,
    column_name: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.columns
            WHERE table_schema = $1
            AND table_name = $2
            AND column_name = $3
        ) AS exists
    "#;
    let record = sqlx::query(SQL)
        .bind(schema_name)
        .bind(table_name)
        .bind(column_name)
        .fetch_one(&mut **tx)
        .await?;
    record.try_get("exists")
}
/// Report whether index `index_name` exists in `schema_name`, via pg_indexes.
async fn index_exists(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    index_name: &str,
) -> Result<bool, sqlx::Error> {
    const SQL: &str = r#"
        SELECT EXISTS (
            SELECT 1
            FROM pg_indexes
            WHERE schemaname = $1
            AND indexname = $2
        ) AS exists
    "#;
    let record = sqlx::query(SQL)
        .bind(schema_name)
        .bind(index_name)
        .fetch_one(&mut **tx)
        .await?;
    record.try_get("exists")
}
/// Load every plain function (`prokind = 'f'`) named `function_name` in
/// `schema_name`, one JSON object per overload, ordered by oid.
///
/// Each object carries the oid (as text), identity signature, return type,
/// language, set-returning flag, and full `pg_get_functiondef` source.
async fn list_function_definitions(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    function_name: &str,
) -> Result<Vec<Value>, sqlx::Error> {
    const SQL: &str = r#"
        SELECT
            p.oid::text AS function_oid,
            pg_get_function_identity_arguments(p.oid) AS signature,
            pg_get_function_result(p.oid) AS return_type,
            l.lanname AS language,
            p.proretset AS is_set_returning,
            pg_get_functiondef(p.oid) AS definition
        FROM pg_proc p
        JOIN pg_namespace n ON n.oid = p.pronamespace
        JOIN pg_language l ON l.oid = p.prolang
        WHERE p.prokind = 'f'
        AND n.nspname = $1
        AND p.proname = $2
        ORDER BY p.oid ASC
    "#;
    let records = sqlx::query(SQL)
        .bind(schema_name)
        .bind(function_name)
        .fetch_all(&mut **tx)
        .await?;
    let mut definitions = Vec::with_capacity(records.len());
    for record in records {
        // Missing/NULL columns degrade to empty strings / false rather than
        // failing the whole listing.
        definitions.push(json!({
            "function_oid": record.try_get::<String, _>("function_oid").unwrap_or_default(),
            "signature": record.try_get::<String, _>("signature").unwrap_or_default(),
            "return_type": record.try_get::<String, _>("return_type").unwrap_or_default(),
            "language": record.try_get::<String, _>("language").unwrap_or_default(),
            "is_set_returning": record.try_get::<bool, _>("is_set_returning").unwrap_or(false),
            "definition": record.try_get::<String, _>("definition").unwrap_or_default(),
        }));
    }
    Ok(definitions)
}
/// Collapse every run of whitespace in a function identity signature down to
/// a single space so signatures can be compared structurally.
///
/// `split_whitespace` never yields empty tokens, so the joined result can
/// have no leading or trailing whitespace — the original `.trim().to_string()`
/// tail was dead work plus an extra allocation, and is removed here.
fn normalize_identity_signature(signature: &str) -> String {
    signature.split_whitespace().collect::<Vec<_>>().join(" ")
}
/// Extract the `"signature"` string field from a function-definition JSON
/// object, if present and actually a string.
fn function_signature(value: &Value) -> Option<String> {
    match value.get("signature") {
        Some(Value::String(signature)) => Some(signature.clone()),
        _ => None,
    }
}
/// Work out which overload signature a CREATE OR REPLACE FUNCTION statement
/// produced, given the overload sets before (`previous_defs`) and after
/// (`next_defs`) the DDL ran.
///
/// Resolution order:
/// 1. an overload in `next_defs` whose normalized signature matches the
///    identity signature parsed from the DDL target,
/// 2. the sole overload, when exactly one exists afterwards,
/// 3. the single signature present afterwards but not before (a new overload),
/// 4. otherwise the first signature found afterwards, as a best effort.
fn resolve_upserted_function_signature(
    target: &ParsedFunctionDdlTarget,
    previous_defs: &[Value],
    next_defs: &[Value],
) -> Option<String> {
    let expected = normalize_identity_signature(&target.identity_signature);
    // Step 1: whitespace-insensitive match against the parsed DDL target.
    if let Some(matched) = next_defs.iter().find_map(|value| {
        let signature = function_signature(value)?;
        if normalize_identity_signature(&signature) == expected {
            Some(signature)
        } else {
            None
        }
    }) {
        return Some(matched);
    }
    // Step 2: only one overload exists now, so it must be the upserted one.
    if next_defs.len() == 1 {
        return function_signature(&next_defs[0]);
    }
    // Step 3: look for exactly one signature that did not exist before.
    let previous_signatures = previous_defs
        .iter()
        .filter_map(function_signature)
        .map(|signature| normalize_identity_signature(&signature))
        .collect::<HashSet<String>>();
    let mut new_signatures = next_defs
        .iter()
        .filter_map(function_signature)
        .filter(|signature| !previous_signatures.contains(&normalize_identity_signature(signature)))
        .collect::<Vec<String>>();
    if new_signatures.len() == 1 {
        return new_signatures.pop();
    }
    // Step 4: ambiguous — fall back to the first reported signature, if any.
    next_defs.iter().find_map(function_signature)
}
/// Execute `statements` under a savepoint on the open transaction.
///
/// The savepoint lets a failed mutation be undone (see
/// `rollback_to_savepoint`) while keeping the enclosing transaction alive so
/// that a failure audit entry can still be written and committed.
async fn run_audited_statements(
    tx: &mut Transaction<'_, Postgres>,
    statements: &[String],
) -> Result<(), sqlx::Error> {
    sqlx::query("SAVEPOINT athena_management_mutation")
        .execute(&mut **tx)
        .await?;
    // Statements run in order; the first failure aborts the batch.
    for sql in statements.iter() {
        sqlx::query(sql).execute(&mut **tx).await?;
    }
    Ok(())
}
/// Best-effort rollback to the savepoint taken by `run_audited_statements`;
/// errors are deliberately ignored (the transaction may already be aborted).
async fn rollback_to_savepoint(tx: &mut Transaction<'_, Postgres>) {
    sqlx::query("ROLLBACK TO SAVEPOINT athena_management_mutation")
        .execute(&mut **tx)
        .await
        .ok();
}
/// Static description of what this management backend supports: the operation
/// catalog, the API-key right each route requires, and the allow-lists for
/// index methods and column data types.
///
/// Retention/replication mutations are reported unsupported for this
/// PostgreSQL backend.
fn management_capabilities() -> ManagementCapabilitiesResponse {
    ManagementCapabilitiesResponse {
        backend: "postgresql",
        retention_mutation_supported: false,
        replication_mutation_supported: false,
        supported_operations: vec![
            "management.capabilities.read",
            "management.tables.create",
            "management.tables.edit",
            "management.tables.drop",
            "management.columns.drop",
            "management.indexes.create",
            "management.indexes.drop",
            "management.extensions.install",
            "management.functions.list",
            "management.functions.upsert",
            "management.functions.drop",
            "management.provision.providers.neon",
            "management.provision.providers.railway",
            "management.provision.providers.render",
            "management.provision.local.databases.list",
            "management.provision.local.databases.create",
        ],
        // Route-to-right mapping advertised to clients. Note that even the
        // provisioning GET route requires the provision *write* right,
        // matching the enforcement in the route handlers below.
        required_rights: vec![
            CapabilityRight {
                operation: "GET /management/capabilities",
                required_right: MANAGEMENT_READ_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/tables",
                required_right: MANAGEMENT_TABLES_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "PATCH /management/tables/{table_name}",
                required_right: MANAGEMENT_TABLES_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "DELETE /management/tables/{table_name}",
                required_right: MANAGEMENT_TABLES_DROP_RIGHT,
            },
            CapabilityRight {
                operation: "DELETE /management/tables/{table_name}/columns/{column_name}",
                required_right: MANAGEMENT_COLUMNS_DROP_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/indexes",
                required_right: MANAGEMENT_INDEXES_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "DELETE /management/indexes/{index_name}",
                required_right: MANAGEMENT_INDEXES_DROP_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/extensions/install",
                required_right: MANAGEMENT_EXTENSIONS_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "GET /management/functions",
                required_right: MANAGEMENT_FUNCTIONS_READ_RIGHT,
            },
            CapabilityRight {
                operation: "PUT /management/functions",
                required_right: MANAGEMENT_FUNCTIONS_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "DELETE /management/functions",
                required_right: MANAGEMENT_FUNCTIONS_DROP_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/provision/providers/neon",
                required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/provision/providers/railway",
                required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/provision/providers/render",
                required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "GET /management/provision/local/databases",
                required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
            },
            CapabilityRight {
                operation: "POST /management/provision/local/databases",
                required_right: MANAGEMENT_PROVISION_WRITE_RIGHT,
            },
        ],
        allowed_index_methods: ALLOWED_INDEX_METHODS.to_vec(),
        allowed_column_data_types: ALLOWED_COLUMN_DATA_TYPES.to_vec(),
    }
}
/// POST /management/extensions/install — install an allow-listed Postgres
/// extension on the caller's database, with mandatory audit logging.
///
/// Flow: resolve and validate the client, authorize with the extensions-write
/// right, build the CREATE EXTENSION statement (which also enforces the
/// allow-list), then run it inside a transaction that must also record a row
/// in public.database_audit_log. The mutation is refused when the audit table
/// is missing, and rolled back when the success audit insert fails.
#[post("/management/extensions/install")]
async fn management_install_extension(
    req: HttpRequest,
    body: Json<InstallExtensionRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // Snapshot the request body up front for the audit trail.
    let request_payload = body.0.clone();
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_EXTENSIONS_WRITE_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    // Statement construction rejects extensions outside ALLOWED_EXTENSIONS.
    let statement = match build_create_extension_statement(&body.extension_name, body.if_not_exists)
    {
        Ok(statement) => statement,
        Err(err) => {
            return bad_request(
                "Invalid install extension request",
                format!(
                    "{} Allowed extensions: {}.",
                    err,
                    ALLOWED_EXTENSIONS.join(", ")
                ),
            );
        }
    };
    let statements = vec![statement];
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Management mutations are refused unless they can be audited; the early
    // return drops `tx`, implicitly rolling it back.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    let result = run_audited_statements(&mut tx, &statements).await;
    if let Err(err) = result {
        // Undo the mutation but keep the transaction alive so the *failure*
        // audit entry can still be committed below.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            "public",
            None,
            "extension",
            "install_extension",
            "failed",
            json!(request_payload.clone()),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: a failed audit write must not mask the original error.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let _ = tx.commit().await;
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_install_extension",
            Some(&body.extension_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::BAD_REQUEST,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "install_extension",
            "error",
            started.elapsed().as_secs_f64(),
        );
        // Translate the sqlx error into a structured API error response.
        return processed_error(process_sqlx_error_with_context(
            &err,
            Some(&body.extension_name),
        ));
    }
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        "public",
        None,
        "extension",
        "install_extension",
        "success",
        json!(request_payload.clone()),
        statements.clone(),
        json!({ "installed": true, "extension_name": body.extension_name }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    // On the success path the audit write is mandatory: if it fails, the
    // whole transaction (including the install) is rolled back.
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        let _ = tx.rollback().await;
        return service_unavailable(
            "Database audit log write failed",
            format!(
                "The extension was not installed because the audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_install_extension",
        Some(&body.extension_name),
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({ "extension_name": body.extension_name })),
    );
    app_state.metrics_state.record_management_mutation(
        "install_extension",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Installed extension",
        json!({ "extension_name": body.extension_name, "statements": statements }),
    )
}
/// Body for POST /management/provision/providers/neon.
///
/// Either supply `connection_uri` directly, or supply `api_key` plus
/// `project_id` (optionally creating the project first via
/// `create_project_if_missing`) so the URI can be fetched from the Neon API.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementNeonProvisionRequest {
    /// Name under which the provisioned database is registered.
    client_name: String,
    /// Optional human-readable description for the registered client.
    #[serde(default)]
    description: Option<String>,
    /// Direct Postgres URI; when non-blank it bypasses the Neon API lookup.
    #[serde(default)]
    connection_uri: Option<String>,
    /// Neon API key; required when `connection_uri` is absent.
    #[serde(default)]
    api_key: Option<String>,
    /// Existing Neon project id; required unless created on the fly.
    #[serde(default)]
    project_id: Option<String>,
    /// Optional branch to connect to (also filled from project creation).
    #[serde(default)]
    branch_id: Option<String>,
    /// Optional database name passed to the Neon connection lookup.
    #[serde(default)]
    database_name: Option<String>,
    /// Optional role name passed to the Neon connection lookup.
    #[serde(default)]
    role_name: Option<String>,
    /// Optional endpoint id passed to the Neon connection lookup.
    #[serde(default)]
    endpoint_id: Option<String>,
    /// Override for the Neon API base URL.
    #[serde(default)]
    api_base_url: Option<String>,
    /// Create a new Neon project when `project_id` is blank.
    #[serde(default)]
    create_project_if_missing: bool,
    /// Name for the created project; defaults to `client_name` in the handler.
    #[serde(default)]
    project_name: Option<String>,
    /// Raw extra payload forwarded to Neon project creation.
    #[serde(default)]
    project_create_payload: Option<serde_json::Value>,
    // Pipeline toggles: all default to enabled via `default_true`.
    /// Run the Athena provisioning SQL against the new database.
    #[serde(default = "default_true")]
    provision_schema: bool,
    /// Register the database in the in-memory runtime registry.
    #[serde(default = "default_true")]
    register_runtime: bool,
    /// Register the database in the persistent catalog.
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for POST /management/provision/providers/railway.
///
/// Either supply `connection_uri` directly, or supply `api_key` plus the
/// Railway project/environment/service/plugin identifiers — each of which can
/// optionally be created on the fly via the `create_*_if_missing` flags.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementRailwayProvisionRequest {
    /// Name under which the provisioned database is registered.
    client_name: String,
    /// Optional human-readable description for the registered client.
    #[serde(default)]
    description: Option<String>,
    /// Direct Postgres URI; when non-blank it bypasses the Railway API.
    #[serde(default)]
    connection_uri: Option<String>,
    /// Railway API key; required for any API-driven lookup or creation.
    #[serde(default)]
    api_key: Option<String>,
    /// Existing Railway project id.
    #[serde(default)]
    project_id: Option<String>,
    /// Railway environment id within the project.
    #[serde(default)]
    environment_id: Option<String>,
    /// Railway service id hosting Postgres.
    #[serde(default)]
    service_id: Option<String>,
    /// Railway plugin id for the Postgres plugin.
    #[serde(default)]
    plugin_id: Option<String>,
    /// Override for the Railway GraphQL endpoint URL.
    #[serde(default)]
    graphql_url: Option<String>,
    /// Create a project when `project_id` is blank.
    #[serde(default)]
    create_project_if_missing: bool,
    /// Name for a created project.
    #[serde(default)]
    project_name: Option<String>,
    /// Raw extra input forwarded to Railway project creation.
    #[serde(default)]
    project_create_input: Option<serde_json::Value>,
    /// Create a service when `service_id` is blank.
    #[serde(default)]
    create_service_if_missing: bool,
    /// Name for a created service.
    #[serde(default)]
    service_name: Option<String>,
    /// Raw extra input forwarded to Railway service creation.
    #[serde(default)]
    service_create_input: Option<serde_json::Value>,
    /// Create a plugin when `plugin_id` is blank.
    #[serde(default)]
    create_plugin_if_missing: bool,
    /// Name for a created plugin.
    #[serde(default)]
    plugin_name: Option<String>,
    /// Raw extra input forwarded to Railway plugin creation.
    #[serde(default)]
    plugin_create_input: Option<serde_json::Value>,
    // Pipeline toggles: all default to enabled via `default_true`.
    /// Run the Athena provisioning SQL against the new database.
    #[serde(default = "default_true")]
    provision_schema: bool,
    /// Register the database in the in-memory runtime registry.
    #[serde(default = "default_true")]
    register_runtime: bool,
    /// Register the database in the persistent catalog.
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for POST /management/provision/providers/render.
///
/// Either supply `connection_uri` directly, or supply `api_key` plus
/// `service_id` (optionally creating the Render Postgres service first via
/// `create_service_if_missing`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementRenderProvisionRequest {
    /// Name under which the provisioned database is registered.
    client_name: String,
    /// Optional human-readable description for the registered client.
    #[serde(default)]
    description: Option<String>,
    /// Direct Postgres URI; when non-blank it bypasses the Render API.
    #[serde(default)]
    connection_uri: Option<String>,
    /// Render API key; required when `connection_uri` is absent.
    #[serde(default)]
    api_key: Option<String>,
    /// Existing Render Postgres service id.
    #[serde(default)]
    service_id: Option<String>,
    /// Render owner (team/user) id, used when creating a service.
    #[serde(default)]
    owner_id: Option<String>,
    /// Override for the Render API base URL.
    #[serde(default)]
    api_base_url: Option<String>,
    /// Create a Postgres service when `service_id` is blank.
    #[serde(default)]
    create_service_if_missing: bool,
    /// Name for a created service.
    #[serde(default)]
    service_name: Option<String>,
    /// Raw extra payload forwarded to Render service creation.
    #[serde(default)]
    service_create_payload: Option<serde_json::Value>,
    /// Render plan for a created service.
    #[serde(default)]
    plan: Option<String>,
    /// Region for a created service.
    #[serde(default)]
    region: Option<String>,
    /// Postgres major version for a created service.
    #[serde(default)]
    postgres_version: Option<String>,
    /// Disk size (GB) for a created service.
    #[serde(default)]
    disk_size_gb: Option<u32>,
    // Pipeline toggles: all default to enabled via `default_true`.
    /// Run the Athena provisioning SQL against the new database.
    #[serde(default = "default_true")]
    provision_schema: bool,
    /// Register the database in the in-memory runtime registry.
    #[serde(default = "default_true")]
    register_runtime: bool,
    /// Register the database in the persistent catalog.
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Body for POST /management/provision/local/databases — create a database on
/// the same Postgres server the calling client is connected to.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ManagementLocalClusterCreateDatabaseRequest {
    /// Name of the database to CREATE DATABASE.
    database_name: String,
    // Optional CREATE DATABASE options, passed through verbatim.
    #[serde(default)]
    owner: Option<String>,
    #[serde(default)]
    template: Option<String>,
    #[serde(default)]
    encoding: Option<String>,
    #[serde(default)]
    lc_collate: Option<String>,
    #[serde(default)]
    lc_ctype: Option<String>,
    #[serde(default)]
    tablespace: Option<String>,
    /// Client name to register the new database under; defaults to
    /// `database_name` in the handler.
    #[serde(default)]
    register_name: Option<String>,
    /// Optional human-readable description for the registered client.
    #[serde(default)]
    description: Option<String>,
    // Pipeline toggles: all default to enabled via `default_true`.
    /// Run the Athena provisioning SQL against the new database.
    #[serde(default = "default_true")]
    provision_schema: bool,
    /// Register the database in the in-memory runtime registry.
    #[serde(default = "default_true")]
    register_runtime: bool,
    /// Register the database in the persistent catalog.
    #[serde(default = "default_true")]
    register_catalog: bool,
}
/// Serde `#[serde(default = "default_true")]` helper: the provisioning
/// pipeline toggles default to enabled when omitted from the request body.
fn default_true() -> bool {
    true
}
fn provisioning_error_response(context: &str, err: ProvisioningError) -> HttpResponse {
match err {
ProvisioningError::InvalidInput(message) => bad_request(context, message),
ProvisioningError::Conflict(message) => conflict(context, message),
ProvisioningError::Unavailable(message) => service_unavailable(context, message),
ProvisioningError::Execution(message) => internal_error(context, message),
}
}
/// Require a provider field to be present and non-blank, returning a 400
/// that tells the caller the field name (or the `connection_uri` escape
/// hatch) when it is missing or whitespace-only.
fn required_provider_field(name: &str, value: Option<String>) -> Result<String, HttpResponse> {
    match value {
        // The original (untrimmed) value is returned on success.
        Some(supplied) if !supplied.trim().is_empty() => Ok(supplied),
        _ => Err(bad_request(
            "Missing required field",
            format!(
                "Provide '{}' or set 'connection_uri' to bypass provider API lookup.",
                name
            ),
        )),
    }
}
fn resolve_registered_client_uri(
app_state: &AppState,
client_name: &str,
) -> Result<String, HttpResponse> {
let registered = app_state
.pg_registry
.registered_client(client_name)
.ok_or_else(|| {
bad_request(
"Unknown client",
format!("No Postgres client named '{}' is registered.", client_name),
)
})?;
registered
.config_uri_template
.as_deref()
.map(resolve_postgres_uri)
.or(registered.pg_uri)
.ok_or_else(|| {
bad_request(
"Client URI unavailable",
format!("No Postgres URI is available for client '{}'.", client_name),
)
})
}
/// List the databases on the server identified by `server_pg_uri` that are
/// already managed by Athena, i.e. catalog clients whose resolved URI shares
/// the same server fingerprint.
///
/// Entries are deduplicated on (client_name, database_name) and sorted by
/// database name. Clients with no resolvable URI or no database name in their
/// URI are skipped silently.
async fn load_athena_managed_databases(
    app_state: &AppState,
    server_pg_uri: &str,
) -> Result<Vec<serde_json::Value>, HttpResponse> {
    // The fingerprint identifies the server (host/port/etc.) independent of
    // which database the URI points at — TODO confirm exact fields included.
    let target_fingerprint = postgres_uri_fingerprint(server_pg_uri).ok_or_else(|| {
        bad_request(
            "Client URI unavailable",
            "Failed to derive a server fingerprint for the selected client URI.",
        )
    })?;
    let pool = logging_pool(app_state)?;
    let clients = list_athena_clients(&pool)
        .await
        .map_err(|err| internal_error("Failed to load catalog clients", err.to_string()))?;
    let mut seen: HashSet<String> = HashSet::new();
    let mut managed = Vec::new();
    for client in clients {
        // Resolve each client's URI the same way the registry does: template
        // first, stored pg_uri as fallback.
        let candidate_uri = client
            .config_uri_template
            .as_deref()
            .map(resolve_postgres_uri)
            .or(client.pg_uri.clone());
        let Some(candidate_uri) = candidate_uri else {
            continue;
        };
        // Keep only clients pointing at the same server.
        if postgres_uri_fingerprint(&candidate_uri).as_deref() != Some(target_fingerprint.as_str())
        {
            continue;
        }
        let Some(database_name) = postgres_uri_database_name(&candidate_uri) else {
            continue;
        };
        // One entry per (client, database) pair.
        let dedupe = format!("{}:{}", client.client_name, database_name);
        if !seen.insert(dedupe) {
            continue;
        }
        managed.push(json!({
            "client_name": client.client_name,
            "database_name": database_name,
            "source": client.source,
            "is_active": client.is_active,
            "is_frozen": client.is_frozen,
        }));
    }
    // Stable, user-friendly ordering by database name.
    managed.sort_by(|a, b| {
        let a_name = a
            .get("database_name")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default();
        let b_name = b
            .get("database_name")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default();
        a_name.cmp(b_name)
    });
    Ok(managed)
}
/// Register a freshly provisioned database under `client_name` in the
/// in-memory runtime registry and/or the persistent catalog, controlled by
/// the two flags. Returns `(runtime_registered, catalog_registered)`.
///
/// NOTE(review): runtime registration happens first; if the subsequent
/// catalog write fails, the function returns Err while the runtime
/// registration remains in place — callers see an error for a
/// partially-registered client.
async fn register_provider_client(
    app_state: &AppState,
    client_name: &str,
    description: Option<String>,
    pg_uri: &str,
    metadata: serde_json::Value,
    register_runtime: bool,
    register_catalog: bool,
) -> Result<(bool, bool), HttpResponse> {
    let mut runtime_registered = false;
    let mut catalog_registered = false;
    if register_runtime {
        // Runtime registration makes the client usable immediately.
        let target = crate::drivers::postgresql::sqlx_driver::ClientConnectionTarget {
            client_name: client_name.to_string(),
            source: "database".to_string(),
            description: description.clone(),
            pg_uri: Some(pg_uri.to_string()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };
        if let Err(err) = app_state.pg_registry.upsert_client(target).await {
            return Err(internal_error(
                "Failed to register runtime client",
                format!("Runtime client registration failed: {}", err),
            ));
        }
        runtime_registered = true;
    }
    if register_catalog {
        // Catalog registration persists the client across restarts.
        let pool = logging_pool(app_state)?;
        if let Err(err) = upsert_athena_client(
            &pool,
            SaveAthenaClientParams {
                client_name: client_name.to_string(),
                description,
                pg_uri: Some(pg_uri.to_string()),
                pg_uri_env_var: None,
                config_uri_template: None,
                source: "database".to_string(),
                is_active: true,
                is_frozen: false,
                metadata,
            },
        )
        .await
        {
            return Err(internal_error(
                "Failed to register client in catalog",
                err.to_string(),
            ));
        }
        catalog_registered = true;
    }
    Ok((runtime_registered, catalog_registered))
}
/// GET /management/capabilities — return the static capability catalog for
/// the calling client. Requires the management read right; the response does
/// not vary by client.
#[get("/management/capabilities")]
async fn management_capabilities_route(
    req: HttpRequest,
    app_state: Data<AppState>,
) -> impl Responder {
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_READ_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    // Duration is logged as 0: this route does no meaningful work.
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_capabilities",
        None,
        0,
        actix_web::http::StatusCode::OK,
        Some(json!({ "client_name": client_name })),
    );
    api_success(
        "Loaded management capabilities",
        json!(management_capabilities()),
    )
}
/// GET /management/provision/local/databases — list every database on the
/// Postgres server the calling client connects to, alongside the subset that
/// Athena already manages (catalog clients on the same server fingerprint).
///
/// Requires the provision *write* right even though it only reads, matching
/// the advertised capability mapping.
#[get("/management/provision/local/databases")]
async fn management_list_local_cluster_databases(
    req: HttpRequest,
    app_state: Data<AppState>,
) -> impl Responder {
    let caller_client = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &caller_client) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &caller_client,
        vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    // The caller's own URI identifies which server we are enumerating.
    let server_pg_uri = match resolve_registered_client_uri(app_state.get_ref(), &caller_client) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let all_databases = match list_postgres_databases(&pool).await {
        Ok(value) => value,
        Err(err) => {
            return provisioning_error_response("Failed to list local cluster databases", err);
        }
    };
    let managed_databases =
        match load_athena_managed_databases(app_state.get_ref(), &server_pg_uri).await {
            Ok(value) => value,
            Err(resp) => return resp,
        };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_list_local_cluster_databases",
        Some(&caller_client),
        0,
        actix_web::http::StatusCode::OK,
        Some(json!({ "caller_client": caller_client })),
    );
    api_success(
        "Listed local cluster databases",
        json!({
            "client_name": caller_client,
            "all_databases": all_databases,
            "athena_managed_databases": managed_databases,
        }),
    )
}
/// POST /management/provision/local/databases — create a new database on the
/// server the calling client connects to, then optionally run the Athena
/// provisioning SQL against it and register it as a runtime/catalog client.
///
/// The new database's URI is derived by swapping the database name in the
/// caller's own server URI. Registration defaults to the database name unless
/// `register_name` overrides it.
#[post("/management/provision/local/databases")]
async fn management_create_local_cluster_database(
    req: HttpRequest,
    body: Json<ManagementLocalClusterCreateDatabaseRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let caller_client = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &caller_client) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &caller_client,
        vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let server_pg_uri = match resolve_registered_client_uri(app_state.get_ref(), &caller_client) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // CREATE DATABASE options are forwarded verbatim from the request body.
    let create_params = LocalClusterCreateDatabaseParams {
        database_name: body.database_name.clone(),
        options: LocalClusterDatabaseCreateOptions {
            owner: body.owner.clone(),
            template: body.template.clone(),
            encoding: body.encoding.clone(),
            lc_collate: body.lc_collate.clone(),
            lc_ctype: body.lc_ctype.clone(),
            tablespace: body.tablespace.clone(),
        },
    };
    if let Err(err) = create_postgres_database(&pool, &create_params).await {
        return provisioning_error_response("Failed to create local cluster database", err);
    }
    // Same server, new database name.
    let database_uri = match replace_uri_database_name(&server_pg_uri, &body.database_name) {
        Ok(value) => value,
        Err(err) => return provisioning_error_response("Failed to build database URI", err),
    };
    // Optionally run Athena's provisioning SQL; `None` means it was skipped.
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&database_uri).await {
            Ok(total) => total,
            Err(err) => {
                return provisioning_error_response(
                    "Failed to provision newly created database",
                    err,
                );
            }
        })
    } else {
        None
    };
    let register_name = body
        .register_name
        .clone()
        .unwrap_or_else(|| body.database_name.clone());
    let (runtime_registered, catalog_registered) = match register_provider_client(
        app_state.get_ref(),
        &register_name,
        body.description
            .clone()
            .or_else(|| Some(format!("Local cluster database '{}'", body.database_name))),
        &database_uri,
        // Catalog metadata recording how and why this client was created.
        json!({
            "managed_by": "management_api",
            "provider": "local_cluster",
            "server_client_name": caller_client.clone(),
            "database_name": body.database_name.clone(),
            "create_options": {
                "owner": body.owner.clone(),
                "template": body.template.clone(),
                "encoding": body.encoding.clone(),
                "lc_collate": body.lc_collate.clone(),
                "lc_ctype": body.lc_ctype.clone(),
                "tablespace": body.tablespace.clone(),
            },
            "requested_by_client": caller_client.clone(),
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_create_local_cluster_database",
        Some(&register_name),
        0,
        actix_web::http::StatusCode::CREATED,
        Some(json!({
            "caller_client": caller_client,
            "database_name": body.database_name,
            "register_runtime": runtime_registered,
            "register_catalog": catalog_registered,
        })),
    );
    api_created(
        "Created local cluster database",
        json!({
            "server_client_name": caller_client,
            "database_name": body.database_name,
            "database_uri": database_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
                "register_name": register_name,
            }
        }),
    )
}
/// POST /management/provision/providers/neon — provision (or adopt) a Neon
/// Postgres database and register it as a new Athena client.
///
/// Pipeline: authorize the caller → optionally create a Neon project when
/// `project_id` is blank and `create_project_if_missing` is set → resolve the
/// connection URI (direct `connection_uri` wins over Neon API lookup) →
/// optionally run the provisioning SQL → register runtime/catalog client.
#[post("/management/provision/providers/neon")]
async fn management_provision_neon(
    req: HttpRequest,
    body: Json<ManagementNeonProvisionRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let caller_client = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
        return resp;
    }
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &caller_client,
        vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let mut project_id = body.project_id.clone();
    let mut branch_id = body.branch_id.clone();
    // Create the project first when no usable project id was supplied.
    if project_id
        .as_ref()
        .is_none_or(|value| value.trim().is_empty())
        && body.create_project_if_missing
    {
        let api_key = match required_provider_field("api_key", body.api_key.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let created = match create_neon_project(NeonProjectCreateParams {
            api_key,
            // Project name falls back to the target client name.
            project_name: body
                .project_name
                .clone()
                .or_else(|| Some(body.client_name.clone())),
            project_payload: body.project_create_payload.clone(),
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(value) => value,
            Err(err) => return provisioning_error_response("Failed to create Neon project", err),
        };
        project_id = Some(created.project_id);
        // Adopt the created project's default branch unless one was given.
        if branch_id.is_none() {
            branch_id = created.branch_id;
        }
    }
    // A non-blank `connection_uri` bypasses the Neon API entirely.
    let pg_uri = if let Some(uri) = body
        .connection_uri
        .as_ref()
        .filter(|value| !value.trim().is_empty())
    {
        uri.to_string()
    } else {
        let api_key = match required_provider_field("api_key", body.api_key.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        let project_id = match required_provider_field("project_id", project_id.clone()) {
            Ok(value) => value,
            Err(resp) => return resp,
        };
        match fetch_neon_connection_uri(NeonConnectionParams {
            api_key,
            project_id,
            branch_id: branch_id.clone(),
            database_name: body.database_name.clone(),
            role_name: body.role_name.clone(),
            endpoint_id: body.endpoint_id.clone(),
            api_base_url: body.api_base_url.clone(),
        })
        .await
        {
            Ok(uri) => uri,
            Err(err) => {
                return provisioning_error_response("Failed to fetch Neon connection URI", err);
            }
        }
    };
    // Optionally run Athena's provisioning SQL; `None` means it was skipped.
    let statements_executed = if body.provision_schema {
        Some(match run_provision_sql(&pg_uri).await {
            Ok(total) => total,
            Err(err) => {
                return provisioning_error_response("Failed to provision Neon database", err);
            }
        })
    } else {
        None
    };
    let description = body
        .description
        .clone()
        .or_else(|| Some("Neon database managed via Athena management API".to_string()));
    let (runtime_registered, catalog_registered) = match register_provider_client(
        app_state.get_ref(),
        &body.client_name,
        description,
        &pg_uri,
        // Catalog metadata recording provenance of this client.
        json!({
            "managed_by": "management_api",
            "provider": "neon",
            "project_id": project_id,
            "branch_id": branch_id,
            "database_name": body.database_name,
            "role_name": body.role_name,
            "endpoint_id": body.endpoint_id,
            "created_project": body.create_project_if_missing,
            "requested_by_client": caller_client,
        }),
        body.register_runtime,
        body.register_catalog,
    )
    .await
    {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_provision_neon",
        Some(&body.client_name),
        0,
        actix_web::http::StatusCode::OK,
        Some(json!({
            "provider": "neon",
            "caller_client": caller_client,
            "target_client": body.client_name,
            "register_runtime": runtime_registered,
            "register_catalog": catalog_registered,
        })),
    );
    api_success(
        "Provisioned Neon database",
        json!({
            "provider": "neon",
            "client_name": body.client_name,
            "pg_uri": pg_uri,
            "pipeline": {
                "provision_schema": body.provision_schema,
                "statements_executed": statements_executed,
                "register_runtime": runtime_registered,
                "register_catalog": catalog_registered,
            }
        }),
    )
}
#[post("/management/provision/providers/railway")]
async fn management_provision_railway(
req: HttpRequest,
body: Json<ManagementRailwayProvisionRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let caller_client = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&caller_client,
vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let api_key_for_create = body
.api_key
.clone()
.filter(|value| !value.trim().is_empty());
let mut project_id = body.project_id.clone();
let mut environment_id = body.environment_id.clone();
let mut service_id = body.service_id.clone();
let mut plugin_id = body.plugin_id.clone();
if project_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_project_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_input = match json_object_insert_if_missing(
body.project_create_input.clone(),
"name",
serde_json::Value::String(
body.project_name
.clone()
.unwrap_or_else(|| format!("athena-{}", body.client_name)),
),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway project_create_input", err);
}
};
let created = match create_railway_project(RailwayProjectCreateParams {
api_key,
project_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Railway project", err);
}
};
project_id = Some(created.project_id);
if environment_id.is_none() {
environment_id = created.base_environment_id;
}
}
if environment_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& let (Some(api_key), Some(project)) = (api_key_for_create.clone(), project_id.clone())
{
environment_id = match fetch_railway_project_base_environment_id(
&api_key,
&project,
body.graphql_url.as_deref(),
)
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response(
"Failed to resolve Railway base environment",
err,
);
}
};
}
if service_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_service_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let service_input = match json_object_insert_if_missing(
body.service_create_input.clone(),
"projectId",
serde_json::Value::String(project),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway service_create_input", err);
}
};
let service_input = match json_object_insert_if_missing(
Some(service_input),
"name",
serde_json::Value::String(
body.service_name
.clone()
.unwrap_or_else(|| format!("{}-service", body.client_name)),
),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway service_create_input", err);
}
};
let created = match create_railway_service(RailwayServiceCreateParams {
api_key,
service_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Railway service", err);
}
};
service_id = Some(created.service_id);
}
if plugin_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_plugin_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let environment = match required_provider_field("environment_id", environment_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let plugin_input = match json_object_insert_if_missing(
body.plugin_create_input.clone(),
"projectId",
serde_json::Value::String(project),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway plugin_create_input", err);
}
};
let plugin_input = match json_object_insert_if_missing(
Some(plugin_input),
"environmentId",
serde_json::Value::String(environment),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway plugin_create_input", err);
}
};
let plugin_input = match json_object_insert_if_missing(
Some(plugin_input),
"name",
serde_json::Value::String(
body.plugin_name
.clone()
.unwrap_or_else(|| "Postgres".to_string()),
),
) {
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Invalid Railway plugin_create_input", err);
}
};
let created = match create_railway_plugin(RailwayPluginCreateParams {
api_key,
plugin_input,
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Railway plugin", err);
}
};
plugin_id = Some(created.plugin_id);
}
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let project_id = match required_provider_field("project_id", project_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let environment_id = match required_provider_field("environment_id", environment_id.clone())
{
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_railway_connection_uri(RailwayConnectionParams {
api_key,
project_id,
environment_id,
service_id: service_id.clone(),
plugin_id: plugin_id.clone(),
graphql_url: body.graphql_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return provisioning_error_response("Failed to fetch Railway connection URI", err);
}
}
};
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => {
return provisioning_error_response("Failed to provision Railway database", err);
}
})
} else {
None
};
let description = body
.description
.clone()
.or_else(|| Some("Railway database managed via Athena management API".to_string()));
let (runtime_registered, catalog_registered) = match register_provider_client(
app_state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "management_api",
"provider": "railway",
"project_id": project_id,
"environment_id": environment_id,
"service_id": service_id,
"plugin_id": plugin_id,
"created_project": body.create_project_if_missing,
"created_service": body.create_service_if_missing,
"created_plugin": body.create_plugin_if_missing,
"requested_by_client": caller_client,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_provision_railway",
Some(&body.client_name),
0,
actix_web::http::StatusCode::OK,
Some(json!({
"provider": "railway",
"caller_client": caller_client,
"target_client": body.client_name,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
})),
);
api_success(
"Provisioned Railway database",
json!({
"provider": "railway",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[post("/management/provision/providers/render")]
async fn management_provision_render(
req: HttpRequest,
body: Json<ManagementRenderProvisionRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let caller_client = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &caller_client) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&caller_client,
vec![MANAGEMENT_PROVISION_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let api_key_for_create = body
.api_key
.clone()
.filter(|value| !value.trim().is_empty());
let mut service_id = body.service_id.clone();
if service_id
.as_ref()
.is_none_or(|value| value.trim().is_empty())
&& body.create_service_if_missing
{
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let created = match create_render_postgres_service(RenderPostgresCreateParams {
api_key,
owner_id: body.owner_id.clone(),
service_name: body.service_name.clone(),
service_payload: body.service_create_payload.clone(),
plan: body.plan.clone(),
region: body.region.clone(),
postgres_version: body.postgres_version.clone(),
disk_size_gb: body.disk_size_gb,
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(value) => value,
Err(err) => {
return provisioning_error_response("Failed to create Render service", err);
}
};
service_id = Some(created.service_id);
}
let pg_uri = if let Some(uri) = body
.connection_uri
.as_ref()
.filter(|value| !value.trim().is_empty())
{
uri.to_string()
} else {
let api_key = match required_provider_field("api_key", api_key_for_create.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
let service_id = match required_provider_field("service_id", service_id.clone()) {
Ok(value) => value,
Err(resp) => return resp,
};
match fetch_render_connection_uri(RenderConnectionParams {
api_key,
service_id,
api_base_url: body.api_base_url.clone(),
})
.await
{
Ok(uri) => uri,
Err(err) => {
return provisioning_error_response("Failed to fetch Render connection URI", err);
}
}
};
let statements_executed = if body.provision_schema {
Some(match run_provision_sql(&pg_uri).await {
Ok(total) => total,
Err(err) => {
return provisioning_error_response("Failed to provision Render database", err);
}
})
} else {
None
};
let description = body
.description
.clone()
.or_else(|| Some("Render database managed via Athena management API".to_string()));
let (runtime_registered, catalog_registered) = match register_provider_client(
app_state.get_ref(),
&body.client_name,
description,
&pg_uri,
json!({
"managed_by": "management_api",
"provider": "render",
"service_id": service_id,
"owner_id": body.owner_id,
"service_name": body.service_name,
"plan": body.plan,
"region": body.region,
"postgres_version": body.postgres_version,
"disk_size_gb": body.disk_size_gb,
"created_service": body.create_service_if_missing,
"requested_by_client": caller_client,
}),
body.register_runtime,
body.register_catalog,
)
.await
{
Ok(value) => value,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_provision_render",
Some(&body.client_name),
0,
actix_web::http::StatusCode::OK,
Some(json!({
"provider": "render",
"caller_client": caller_client,
"target_client": body.client_name,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
})),
);
api_success(
"Provisioned Render database",
json!({
"provider": "render",
"client_name": body.client_name,
"pg_uri": pg_uri,
"pipeline": {
"provision_schema": body.provision_schema,
"statements_executed": statements_executed,
"register_runtime": runtime_registered,
"register_catalog": catalog_registered,
}
}),
)
}
#[post("/management/tables")]
async fn management_create_table(
req: HttpRequest,
body: Json<CreateTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_create_table_statement(&body) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid create table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &body.table_name).await {
Ok(true) => {
return conflict(
"Table already exists",
format!(
"Table '{}.{}' already exists.",
body.schema_name, body.table_name
),
);
}
Ok(false) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"table",
"create_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_table",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"create_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"table",
"create_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "applied": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not created because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_table",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::CREATED,
Some(json!({ "schema_name": body.schema_name, "table_name": body.table_name })),
);
app_state.metrics_state.record_management_mutation(
"create_table",
"success",
started.elapsed().as_secs_f64(),
);
api_created(
"Created table",
json!({
"schema_name": body.schema_name,
"table_name": body.table_name,
"statements": statements,
}),
)
}
#[patch("/management/tables/{table_name}")]
async fn management_edit_table(
req: HttpRequest,
path: Path<String>,
body: Json<EditTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let table_name = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements =
match build_edit_table_statements(&body.schema_name, &table_name, &body.operations) {
Ok(statements) => statements,
Err(err) => return bad_request("Invalid edit table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"edit_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_edit_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"edit_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"edit_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "applied": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not altered because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_edit_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(json!({ "schema_name": body.schema_name, "table_name": table_name })),
);
app_state.metrics_state.record_management_mutation(
"edit_table",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Updated table",
json!({ "table_name": table_name, "statements": statements }),
)
}
#[delete("/management/tables/{table_name}")]
async fn management_drop_table(
req: HttpRequest,
path: Path<String>,
body: Json<DropTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let table_name = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_DROP_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_drop_table_statement(&body.schema_name, &table_name, body.cascade)
{
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid drop table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"drop_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"drop_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"table",
"drop_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "dropped": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not dropped because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_table",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(json!({ "schema_name": body.schema_name, "table_name": table_name })),
);
app_state.metrics_state.record_management_mutation(
"drop_table",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Dropped table",
json!({ "table_name": table_name, "statements": statements }),
)
}
#[delete("/management/tables/{table_name}/columns/{column_name}")]
async fn management_drop_column(
req: HttpRequest,
path: Path<(String, String)>,
body: Json<DropColumnRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started: Instant = Instant::now();
let request_payload: DropColumnRequest = body.0.clone();
let client_name: String = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let (table_name, column_name) = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_COLUMNS_DROP_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements: Vec<String> = match build_drop_column_statement(
&body.schema_name,
&table_name,
&column_name,
body.cascade,
) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid drop column request", err),
};
let mut tx: Transaction<'_, Postgres> = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match column_exists(&mut tx, &body.schema_name, &table_name, &column_name).await {
Ok(false) => {
return not_found(
"Column not found",
format!(
"Column '{}.{}.{}' does not exist.",
body.schema_name, table_name, column_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result: Result<(), sqlx::Error> = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"column",
"drop_column",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_column",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"drop_column",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry: audit::DatabaseAuditLogEntry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"column",
"drop_column",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "dropped": true, "column_name": column_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The column was not dropped because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_column",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(
json!({ "schema_name": body.schema_name, "table_name": table_name, "column_name": column_name }),
),
);
app_state.metrics_state.record_management_mutation(
"drop_column",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Dropped column",
json!({ "table_name": table_name, "column_name": column_name, "statements": statements }),
)
}
#[post("/management/indexes")]
async fn management_create_index(
req: HttpRequest,
body: Json<CreateIndexRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_INDEXES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let (index_name, statement) = match build_create_index_statement(&body) {
Ok(value) => value,
Err(err) => return bad_request("Invalid create index request", err),
};
let statements = vec![statement];
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &body.table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, body.table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
match index_exists(&mut tx, &body.schema_name, &index_name).await {
Ok(true) => {
return conflict(
"Index already exists",
format!(
"Index '{}.{}' already exists.",
body.schema_name, index_name
),
);
}
Ok(false) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"index",
"create_index",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_index",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string(), "index_name": index_name })),
);
app_state.metrics_state.record_management_mutation(
"create_index",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"index",
"create_index",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "created": true, "index_name": index_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The index was not created because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_index",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::CREATED,
Some(
json!({ "schema_name": body.schema_name, "table_name": body.table_name, "index_name": index_name }),
),
);
app_state.metrics_state.record_management_mutation(
"create_index",
"success",
started.elapsed().as_secs_f64(),
);
api_created(
"Created index",
json!({ "table_name": body.table_name, "index_name": index_name, "statements": statements }),
)
}
/// DELETE /management/indexes/{index_name}
///
/// Drops the named index from the target client's database. The DROP INDEX
/// and its audit entry run in one transaction so they commit or roll back
/// together; on DDL failure the handler rolls back to a savepoint and
/// commits only the "failed" audit entry.
#[delete("/management/indexes/{index_name}")]
async fn management_drop_index(
    req: HttpRequest,
    path: Path<String>,
    body: Json<DropIndexRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // Snapshot of the request body; recorded verbatim in the audit entry.
    let request_payload = body.0.clone();
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let index_name = path.into_inner();
    // The client must be registered before we resolve its connection pool.
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    // Gateway authorization: caller needs the drop-index management right.
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_INDEXES_DROP_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    // Statement construction also validates the schema/index identifiers.
    let statements = match build_drop_index_statement(&body.schema_name, &index_name) {
        Ok(statement) => vec![statement],
        Err(err) => return bad_request("Invalid drop index request", err),
    };
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Management mutations refuse to run without the audit table. Returning
    // here drops `tx`, which rolls the (still empty) transaction back.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    // Preflight: 404 if the index is not there, instead of surfacing a raw
    // DDL error from the DROP.
    match index_exists(&mut tx, &body.schema_name, &index_name).await {
        Ok(false) => {
            return not_found(
                "Index not found",
                format!(
                    "Index '{}.{}' does not exist.",
                    body.schema_name, index_name
                ),
            );
        }
        Ok(true) => {}
        Err(err) => {
            return processed_error(process_sqlx_error_with_context(&err, Some(&index_name)));
        }
    }
    let result = run_audited_statements(&mut tx, &statements).await;
    if let Err(err) = result {
        // DDL failed: restore the savepoint so the transaction stays
        // usable, then persist only the "failed" audit entry.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            &body.schema_name,
            None,
            "index",
            "drop_index",
            "failed",
            json!(request_payload.clone()),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: an audit-write failure must not mask the DDL error.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let _ = tx.commit().await;
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_index",
            None,
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::BAD_REQUEST,
            Some(json!({ "error": err.to_string(), "index_name": index_name })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_index",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return processed_error(process_sqlx_error_with_context(&err, Some(&index_name)));
    }
    // Success: write the audit entry in the same transaction as the DROP.
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        &body.schema_name,
        None,
        "index",
        "drop_index",
        "success",
        json!(request_payload.clone()),
        statements.clone(),
        json!({ "dropped": true, "index_name": index_name }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    // If the audit row cannot be written, abandon the DROP as well: the
    // mutation must never land without its audit record.
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        let _ = tx.rollback().await;
        return service_unavailable(
            "Database audit log write failed",
            format!(
                "The index was not dropped because the audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_drop_index",
        None,
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({ "schema_name": body.schema_name, "index_name": index_name })),
    );
    app_state.metrics_state.record_management_mutation(
        "drop_index",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Dropped index",
        json!({ "index_name": index_name, "statements": statements }),
    )
}
/// GET /management/functions
///
/// Read-only listing of plain functions (`pg_proc.prokind = 'f'`) in the
/// client's database. Query parameters:
/// - `schema`: exact schema name (identifier-validated),
/// - `name_like`: case-insensitive substring match on the function name,
/// - `include_system`: when false, hides pg_catalog / information_schema,
/// - `include_definition`: when true, also returns `pg_get_functiondef`.
#[get("/management/functions")]
async fn management_list_functions(
    req: HttpRequest,
    query: web::Query<ManagementFunctionsQuery>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    let params = query.into_inner();
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // The client must be registered before we resolve its connection pool.
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    // Even read access requires the functions-read management right.
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_FUNCTIONS_READ_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    // Validate the identifier, then strip any surrounding quotes the
    // validator may have applied: the SQL compares the unquoted nspname.
    let schema_filter = if let Some(schema) = params.schema.as_deref() {
        match validate_identifier(schema, "schema name") {
            Ok(value) => Some(value.trim_matches('"').to_string()),
            Err(err) => return bad_request("Invalid functions query", err),
        }
    } else {
        None
    };
    // Turn a non-empty name filter into an ILIKE substring pattern;
    // blank/whitespace-only filters are treated as absent.
    let name_like = params
        .name_like
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(|value| format!("%{}%", value));
    // Bind order: $1 schema filter, $2 ILIKE pattern, $3 include_system,
    // $4 include_definition (gates the potentially large functiondef text).
    let rows = match sqlx::query(
        r#"
        SELECT
            n.nspname AS schema_name,
            p.proname AS function_name,
            pg_get_function_identity_arguments(p.oid) AS signature,
            pg_get_function_result(p.oid) AS return_type,
            l.lanname AS language,
            p.proretset AS is_set_returning,
            CASE WHEN $4 THEN pg_get_functiondef(p.oid) ELSE NULL END AS definition,
            p.oid::text AS function_oid
        FROM pg_proc p
        JOIN pg_namespace n ON n.oid = p.pronamespace
        JOIN pg_language l ON l.oid = p.prolang
        WHERE p.prokind = 'f'
          AND ($1::text IS NULL OR n.nspname = $1)
          AND ($2::text IS NULL OR p.proname ILIKE $2)
          AND ($3::bool OR n.nspname NOT IN ('pg_catalog', 'information_schema'))
        ORDER BY n.nspname ASC, p.proname ASC, signature ASC
        "#,
    )
    .bind(schema_filter)
    .bind(name_like)
    .bind(params.include_system)
    .bind(params.include_definition)
    .fetch_all(&pool)
    .await
    {
        Ok(rows) => rows,
        Err(err) => {
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "management_list_functions",
                None,
                started.elapsed().as_millis(),
                actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
                Some(json!({ "error": err.to_string() })),
            );
            return internal_error("Failed to list functions", err.to_string());
        }
    };
    // Map rows defensively: a missing/mistyped column falls back to a safe
    // default instead of failing the whole listing.
    let functions: Vec<ManagementFunctionInfo> = rows
        .into_iter()
        .map(|row| ManagementFunctionInfo {
            schema_name: row
                .try_get::<String, _>("schema_name")
                .unwrap_or_else(|_| "public".to_string()),
            function_name: row
                .try_get::<String, _>("function_name")
                .unwrap_or_default(),
            signature: row.try_get::<String, _>("signature").unwrap_or_default(),
            return_type: row.try_get::<String, _>("return_type").unwrap_or_default(),
            language: row
                .try_get::<String, _>("language")
                .unwrap_or_else(|_| "sql".to_string()),
            is_set_returning: row.try_get::<bool, _>("is_set_returning").unwrap_or(false),
            // NULL unless include_definition was requested (see CASE above).
            definition: row
                .try_get::<Option<String>, _>("definition")
                .unwrap_or(None),
            metadata: json!({
                "function_oid": row.try_get::<String, _>("function_oid").unwrap_or_default()
            }),
        })
        .collect();
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_list_functions",
        None,
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({ "function_count": functions.len() })),
    );
    api_success("Listed functions", json!({ "functions": functions }))
}
/// PUT /management/functions
///
/// Creates or replaces a function from caller-supplied
/// `CREATE OR REPLACE FUNCTION` DDL. The DDL is validated up front, then
/// executed in one transaction together with two audit records (the generic
/// database audit log and the function-DDL audit log), so the function
/// change and its audit trail commit or roll back as a unit. Definitions
/// are captured before and after execution so the audit entry records the
/// previous/next state and the resulting signature can be resolved.
///
/// Fix: the DDL-failure path previously inserted both failure audit
/// entries and then called `tx.rollback()`, which discarded the audit rows
/// it had just written. It now commits after the savepoint rollback so the
/// failure audit persists, matching the index endpoints' failure paths.
#[put("/management/functions")]
async fn management_upsert_function(
    req: HttpRequest,
    body: Json<UpsertFunctionRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // Snapshot of the request body; recorded verbatim in the audit entry.
    let request_payload = body.0.clone();
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // The client must be registered before we resolve its connection pool.
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    // Gateway authorization: caller needs the functions-write right.
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_FUNCTIONS_WRITE_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    // Reject anything that is not well-formed CREATE OR REPLACE FUNCTION
    // DDL; extracts the target schema/name/identity signature on success.
    let target: ParsedFunctionDdlTarget = match validate_create_or_replace_function_ddl(&body.ddl) {
        Ok(target) => target,
        Err(err) => return bad_request("Invalid function DDL", err),
    };
    // Catalog lookups compare against unquoted identifiers.
    let schema_name_for_query = target.schema_name.trim_matches('"').to_string();
    let function_name_for_query = target.function_name.trim_matches('"').to_string();
    let statements = vec![body.ddl.trim().to_string()];
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Both audit tables are hard prerequisites. Returning here drops `tx`,
    // which rolls the (still empty) transaction back.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    if !matches!(function_ddl_audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Function DDL audit log unavailable",
            "The target database must contain public.function_ddl_audit_log before function mutations can run.",
        );
    }
    // Capture pre-existing definitions for the audit trail and for
    // signature resolution after the DDL runs.
    let previous_defs =
        match list_function_definitions(&mut tx, &schema_name_for_query, &function_name_for_query)
            .await
        {
            Ok(defs) => defs,
            Err(err) => {
                return internal_error(
                    "Failed to inspect existing function definitions",
                    err.to_string(),
                );
            }
        };
    if let Err(err) = run_audited_statements(&mut tx, &statements).await {
        // DDL failed: restore the savepoint so the transaction is still
        // usable, then record the failure in both audit logs.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            &schema_name_for_query,
            Some(&function_name_for_query),
            "function",
            "upsert_function",
            "failed",
            json!(request_payload.clone()),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: an audit-write failure must not mask the DDL error.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let function_audit_entry = build_function_ddl_audit_entry(
            &req,
            &auth.log_context,
            &auth.request_id,
            &client_name,
            &schema_name_for_query,
            &function_name_for_query,
            None,
            "upsert",
            "failed",
            body.ddl.clone(),
            json!(previous_defs),
            json!([]),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
        );
        let _ = insert_function_ddl_audit_log(&mut tx, &function_audit_entry).await;
        // Commit so the failure audit entries persist (was `rollback()`,
        // which silently threw them away).
        let _ = tx.commit().await;
        // NOTE(review): unlike the index endpoints, this failure path emits
        // no log_operation_event — confirm whether that is intentional.
        app_state.metrics_state.record_management_mutation(
            "upsert_function",
            "failed",
            started.elapsed().as_secs_f64(),
        );
        let processed = process_sqlx_error_with_context(&err, Some(&function_name_for_query));
        return processed_error(processed);
    }
    // Capture post-DDL definitions so the audit entry records the new
    // state and the new signature can be identified by diffing.
    let next_defs =
        match list_function_definitions(&mut tx, &schema_name_for_query, &function_name_for_query)
            .await
        {
            Ok(defs) => defs,
            Err(err) => {
                return internal_error(
                    "Failed to inspect resulting function definitions",
                    err.to_string(),
                );
            }
        };
    let signature = resolve_upserted_function_signature(&target, &previous_defs, &next_defs);
    if signature.is_none() {
        // Returning drops `tx`, rolling back the executed DDL: the change
        // is not applied when its signature cannot be resolved.
        return conflict(
            "Function signature could not be resolved",
            "The function DDL executed but Athena could not resolve the resulting function signature.",
        );
    }
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        &schema_name_for_query,
        Some(&function_name_for_query),
        "function",
        "upsert_function",
        "success",
        json!(request_payload),
        statements.clone(),
        json!({
            "schema_name": schema_name_for_query,
            "function_name": function_name_for_query,
            "function_signature": signature,
        }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    // An audit failure abandons the change (dropping `tx` rolls it back):
    // the mutation must never land without its audit record.
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        return internal_error(
            "Database audit log write failed",
            format!(
                "The function was not updated because the audit log write failed: {}",
                err
            ),
        );
    }
    let function_audit_entry = build_function_ddl_audit_entry(
        &req,
        &auth.log_context,
        &auth.request_id,
        &client_name,
        &schema_name_for_query,
        &function_name_for_query,
        signature.clone(),
        "upsert",
        "success",
        body.ddl.clone(),
        json!(previous_defs),
        json!(next_defs),
        None,
        started.elapsed().as_millis() as i64,
    );
    if let Err(err) = insert_function_ddl_audit_log(&mut tx, &function_audit_entry).await {
        return internal_error(
            "Function DDL audit log write failed",
            format!(
                "The function was not updated because the function audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_upsert_function",
        None,
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({
            "schema_name": schema_name_for_query,
            "function_name": function_name_for_query,
            "function_signature": signature,
        })),
    );
    app_state.metrics_state.record_management_mutation(
        "upsert_function",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Upserted function",
        json!({
            "schema_name": schema_name_for_query,
            "function_name": function_name_for_query,
            "function_signature": signature,
            "statements": statements,
        }),
    )
}
/// DELETE /management/functions
///
/// Drops a single function identified by schema, name, and exact argument
/// type list. The generated statement uses RESTRICT, so dependent objects
/// block the drop. The DROP and its two audit records (database audit log
/// and function-DDL audit log) run in one transaction and commit or roll
/// back as a unit; definitions are captured before/after so the audit
/// entries record the full previous/next state.
///
/// Fix: the DDL-failure path previously inserted both failure audit
/// entries and then called `tx.rollback()`, which discarded the audit rows
/// it had just written. It now commits after the savepoint rollback so the
/// failure audit persists, matching the index endpoints' failure paths.
#[delete("/management/functions")]
async fn management_drop_function(
    req: HttpRequest,
    body: Json<DropFunctionRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // Snapshot of the request body; recorded verbatim in the audit entry.
    let request_payload = body.0.clone();
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    // The client must be registered before we resolve its connection pool.
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    // Gateway authorization: caller needs the drop-function right.
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_FUNCTIONS_DROP_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    // Validate every identifier and the argument type list before any SQL
    // is assembled from them.
    let schema_name = match validate_identifier(&body.schema_name, "schema name") {
        Ok(value) => value,
        Err(err) => return bad_request("Invalid drop function request", err),
    };
    let function_name = match validate_identifier(&body.function_name, "function name") {
        Ok(value) => value,
        Err(err) => return bad_request("Invalid drop function request", err),
    };
    let arg_types = match validate_function_arg_types(&body.arg_types) {
        Ok(types) => types,
        Err(err) => return bad_request("Invalid drop function request", err),
    };
    let signature = arg_types.join(", ");
    // RESTRICT: refuse to drop if other objects depend on the function.
    let statement = format!(
        "DROP FUNCTION {}.{}({}) RESTRICT",
        schema_name, function_name, signature
    );
    let statements = vec![statement.clone()];
    // Catalog lookups compare against unquoted identifiers.
    let schema_name_for_query = schema_name.trim_matches('"').to_string();
    let function_name_for_query = function_name.trim_matches('"').to_string();
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Both audit tables are hard prerequisites. Returning here drops `tx`,
    // which rolls the (still empty) transaction back.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    if !matches!(function_ddl_audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Function DDL audit log unavailable",
            "The target database must contain public.function_ddl_audit_log before function mutations can run.",
        );
    }
    // Capture pre-existing definitions: doubles as the 404 existence check
    // and as the "previous state" for the audit trail.
    let previous_defs =
        match list_function_definitions(&mut tx, &schema_name_for_query, &function_name_for_query)
            .await
        {
            Ok(defs) => defs,
            Err(err) => {
                return internal_error(
                    "Failed to inspect existing function definitions",
                    err.to_string(),
                );
            }
        };
    if previous_defs.is_empty() {
        return not_found(
            "Function not found",
            format!(
                "No function named '{}.{}' exists for the target client.",
                schema_name_for_query, function_name_for_query
            ),
        );
    }
    if let Err(err) = run_audited_statements(&mut tx, &statements).await {
        // DDL failed: restore the savepoint so the transaction is still
        // usable, then record the failure in both audit logs.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            &schema_name_for_query,
            Some(&function_name_for_query),
            "function",
            "drop_function",
            "failed",
            json!(request_payload.clone()),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: an audit-write failure must not mask the DDL error.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let function_audit_entry = build_function_ddl_audit_entry(
            &req,
            &auth.log_context,
            &auth.request_id,
            &client_name,
            &schema_name_for_query,
            &function_name_for_query,
            Some(signature.clone()),
            "drop",
            "failed",
            statement.clone(),
            json!(previous_defs),
            json!([]),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
        );
        let _ = insert_function_ddl_audit_log(&mut tx, &function_audit_entry).await;
        // Commit so the failure audit entries persist (was `rollback()`,
        // which silently threw them away).
        let _ = tx.commit().await;
        // NOTE(review): unlike the index endpoints, this failure path emits
        // no log_operation_event — confirm whether that is intentional.
        app_state.metrics_state.record_management_mutation(
            "drop_function",
            "failed",
            started.elapsed().as_secs_f64(),
        );
        let processed = process_sqlx_error_with_context(&err, Some(&function_name_for_query));
        return processed_error(processed);
    }
    // Capture post-DDL definitions (remaining overloads, if any) for the
    // function-DDL audit entry.
    let next_defs =
        match list_function_definitions(&mut tx, &schema_name_for_query, &function_name_for_query)
            .await
        {
            Ok(defs) => defs,
            Err(err) => {
                return internal_error(
                    "Failed to inspect resulting function definitions",
                    err.to_string(),
                );
            }
        };
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        &schema_name_for_query,
        Some(&function_name_for_query),
        "function",
        "drop_function",
        "success",
        json!(request_payload),
        statements.clone(),
        json!({
            "schema_name": schema_name_for_query,
            "function_name": function_name_for_query,
            "function_signature": signature,
        }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    // An audit failure abandons the drop (dropping `tx` rolls it back):
    // the mutation must never land without its audit record.
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        return internal_error(
            "Database audit log write failed",
            format!(
                "The function was not dropped because the audit log write failed: {}",
                err
            ),
        );
    }
    let function_audit_entry = build_function_ddl_audit_entry(
        &req,
        &auth.log_context,
        &auth.request_id,
        &client_name,
        &schema_name_for_query,
        &function_name_for_query,
        Some(signature.clone()),
        "drop",
        "success",
        statement.clone(),
        json!(previous_defs),
        json!(next_defs),
        None,
        started.elapsed().as_millis() as i64,
    );
    if let Err(err) = insert_function_ddl_audit_log(&mut tx, &function_audit_entry).await {
        return internal_error(
            "Function DDL audit log write failed",
            format!(
                "The function was not dropped because the function audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_drop_function",
        None,
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({
            "schema_name": schema_name_for_query,
            "function_name": function_name_for_query,
            "function_signature": signature,
        })),
    );
    app_state.metrics_state.record_management_mutation(
        "drop_function",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Dropped function",
        json!({
            "schema_name": schema_name_for_query,
            "function_name": function_name_for_query,
            "function_signature": signature,
            "statements": statements,
        }),
    )
}
/// Registers every management API route on the actix service config.
///
/// Registration order is preserved exactly, since actix matches routes in
/// the order they are registered.
pub fn services(cfg: &mut web::ServiceConfig) {
    // Capability discovery and provisioning.
    cfg.service(management_capabilities_route);
    cfg.service(management_list_local_cluster_databases);
    cfg.service(management_create_local_cluster_database);
    cfg.service(management_provision_neon);
    cfg.service(management_provision_railway);
    cfg.service(management_provision_render);
    // Table and column DDL.
    cfg.service(management_create_table);
    cfg.service(management_edit_table);
    cfg.service(management_drop_table);
    cfg.service(management_drop_column);
    // Index DDL.
    cfg.service(management_create_index);
    cfg.service(management_drop_index);
    // Function DDL.
    cfg.service(management_list_functions);
    cfg.service(management_upsert_function);
    cfg.service(management_drop_function);
    // Extensions.
    cfg.service(management_install_extension);
}
#[cfg(test)]
mod tests {
    use super::{
        ParsedFunctionDdlTarget, normalize_identity_signature, resolve_upserted_function_signature,
    };
    use serde_json::json;

    /// Builds a parsed DDL target for `"public"."echo"` carrying the given
    /// identity signature, mirroring what the DDL validator produces.
    fn target(signature: &str) -> ParsedFunctionDdlTarget {
        ParsedFunctionDdlTarget {
            schema_name: String::from("\"public\""),
            function_name: String::from("\"echo\""),
            identity_signature: signature.to_owned(),
        }
    }

    #[test]
    fn normalize_identity_signature_compacts_whitespace() {
        let normalized = normalize_identity_signature(" name text , suffix text ");
        assert_eq!(normalized, "name text , suffix text");
    }

    #[test]
    fn resolve_upserted_function_signature_prefers_exact_match() {
        // Both overloads exist afterwards; the parsed target's signature
        // should win over the diff-based fallback.
        let existing = vec![json!({ "signature": "name text" })];
        let resulting = vec![
            json!({ "signature": "name text" }),
            json!({ "signature": "name text, suffix text" }),
        ];
        let resolved = resolve_upserted_function_signature(
            &target("name text, suffix text"),
            &existing,
            &resulting,
        );
        assert_eq!(resolved.as_deref(), Some("name text, suffix text"));
    }

    #[test]
    fn resolve_upserted_function_signature_uses_single_new_signature_fallback() {
        // With no usable target signature, the one signature that is new
        // relative to the previous set should be picked.
        let existing = vec![json!({ "signature": "name text" })];
        let resulting = vec![
            json!({ "signature": "name text" }),
            json!({ "signature": "name text, suffix text" }),
        ];
        let resolved = resolve_upserted_function_signature(&target(""), &existing, &resulting);
        assert_eq!(resolved.as_deref(), Some("name text, suffix text"));
    }

    #[test]
    fn resolve_upserted_function_signature_returns_only_signature_when_one_exists() {
        // A brand-new function with exactly one resulting overload resolves
        // to that overload's signature.
        let existing = vec![];
        let resulting = vec![json!({ "signature": "arr integer[]" })];
        let resolved = resolve_upserted_function_signature(&target(""), &existing, &resulting);
        assert_eq!(resolved.as_deref(), Some("arr integer[]"));
    }
}