use actix_web::{
HttpRequest, HttpResponse, Responder, delete, get, patch, post,
web::{self, Data, Json, Path},
};
use serde_json::json;
use sqlx::{Postgres, Row, Transaction};
use std::time::Instant;
use crate::AppState;
use crate::api::client_context::{
auth_pool, pool_for_client, registered_client_for, required_client_name,
};
use crate::api::gateway::auth::authorize_gateway_request;
use crate::api::management::audit::{
audit_table_exists, build_audit_entry, insert_database_audit_log,
};
use crate::api::management::sql::{
ALLOWED_COLUMN_DATA_TYPES, ALLOWED_INDEX_METHODS, build_create_index_statement,
build_create_table_statement, build_drop_column_statement, build_drop_index_statement,
build_drop_table_statement, build_edit_table_statements,
};
use crate::api::management::types::{
CapabilityRight, CreateIndexRequest, CreateTableRequest, DropColumnRequest, DropIndexRequest,
DropTableRequest, EditTableRequest, ManagementCapabilitiesResponse,
};
use crate::api::response::{
api_created, api_success, bad_request, conflict, forbidden, internal_error, not_found,
processed_error, service_unavailable,
};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
/// Audit-log helpers used by the mutation routes in this module
/// (`audit_table_exists`, `build_audit_entry`, `insert_database_audit_log`).
pub mod audit;
/// DDL statement builders and the allow-lists for index methods and column
/// data types.
pub mod sql;
/// Request and response payload types for the management API.
pub mod types;
/// Right required to read the management capabilities document.
const MANAGEMENT_READ_RIGHT: &str = "management.read";
/// Right required to create or edit tables.
const MANAGEMENT_TABLES_WRITE_RIGHT: &str = "management.tables.write";
/// Right required to drop tables.
const MANAGEMENT_TABLES_DROP_RIGHT: &str = "management.tables.drop";
/// Right required to drop a column from a table.
const MANAGEMENT_COLUMNS_DROP_RIGHT: &str = "management.columns.drop";
/// Right required to create indexes.
const MANAGEMENT_INDEXES_WRITE_RIGHT: &str = "management.indexes.write";
/// Right required to drop indexes.
const MANAGEMENT_INDEXES_DROP_RIGHT: &str = "management.indexes.drop";
/// Authorizes a management API request on behalf of `client_name`.
///
/// The call first requires the auth pool to be available, then delegates to
/// the gateway authorizer with the given `required_rights`. On top of the
/// gateway decision, the presented API key must be bound to exactly
/// `client_name`.
///
/// # Errors
///
/// Returns the ready-to-send `HttpResponse` when the auth pool is missing,
/// when the gateway authorizer produced a rejection response, or (as a
/// "forbidden" response) when the key is not bound to `client_name`.
async fn authorize_management_request(
    req: &HttpRequest,
    app_state: &AppState,
    client_name: &str,
    required_rights: Vec<String>,
) -> Result<crate::api::gateway::auth::GatewayAuthOutcome, HttpResponse> {
    auth_pool(app_state)?;

    let outcome =
        authorize_gateway_request(req, app_state, Some(client_name), required_rights).await;

    // The gateway already decided to reject: forward its response untouched.
    if let Some(rejection) = outcome.response {
        return Err(rejection);
    }

    // Management routes only accept keys bound to the addressed client.
    let bound_to_client = outcome.bound_client_name.as_deref() == Some(client_name);
    if bound_to_client {
        return Ok(outcome);
    }

    Err(forbidden(
        "Management API requires a client-bound API key",
        format!(
            "The presented API key must be bound to client '{}'.",
            client_name
        ),
    ))
}
/// Reports whether `schema_name.table_name` is listed in
/// `information_schema.tables`, querying inside the supplied transaction.
///
/// # Errors
///
/// Propagates any `sqlx::Error` raised while running the query or decoding
/// the `exists` column.
async fn table_exists(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    table_name: &str,
) -> Result<bool, sqlx::Error> {
    const TABLE_EXISTS_SQL: &str = r#"
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = $1
AND table_name = $2
) AS exists
"#;

    sqlx::query(TABLE_EXISTS_SQL)
        .bind(schema_name)
        .bind(table_name)
        .fetch_one(&mut **tx)
        .await
        .and_then(|found| found.try_get::<bool, _>("exists"))
}
/// Reports whether `column_name` of `schema_name.table_name` is listed in
/// `information_schema.columns`, querying inside the supplied transaction.
///
/// # Errors
///
/// Propagates any `sqlx::Error` raised while running the query or decoding
/// the `exists` column.
async fn column_exists(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    table_name: &str,
    column_name: &str,
) -> Result<bool, sqlx::Error> {
    const COLUMN_EXISTS_SQL: &str = r#"
SELECT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
) AS exists
"#;

    sqlx::query(COLUMN_EXISTS_SQL)
        .bind(schema_name)
        .bind(table_name)
        .bind(column_name)
        .fetch_one(&mut **tx)
        .await
        .and_then(|found| found.try_get::<bool, _>("exists"))
}
/// Reports whether an index named `index_name` in `schema_name` is listed in
/// `pg_indexes`, querying inside the supplied transaction.
///
/// # Errors
///
/// Propagates any `sqlx::Error` raised while running the query or decoding
/// the `exists` column.
async fn index_exists(
    tx: &mut Transaction<'_, Postgres>,
    schema_name: &str,
    index_name: &str,
) -> Result<bool, sqlx::Error> {
    const INDEX_EXISTS_SQL: &str = r#"
SELECT EXISTS (
SELECT 1
FROM pg_indexes
WHERE schemaname = $1
AND indexname = $2
) AS exists
"#;

    sqlx::query(INDEX_EXISTS_SQL)
        .bind(schema_name)
        .bind(index_name)
        .fetch_one(&mut **tx)
        .await
        .and_then(|found| found.try_get::<bool, _>("exists"))
}
/// Executes `statements` in order inside `tx`, after first opening the
/// `athena_management_mutation` savepoint.
///
/// Execution stops at the first failing statement. The savepoint is not
/// rolled back here; callers do that via `rollback_to_savepoint`.
///
/// # Errors
///
/// Returns the first `sqlx::Error` raised by the savepoint or by a statement.
async fn run_audited_statements(
    tx: &mut Transaction<'_, Postgres>,
    statements: &[String],
) -> Result<(), sqlx::Error> {
    const OPEN_SAVEPOINT: &str = "SAVEPOINT athena_management_mutation";

    sqlx::query(OPEN_SAVEPOINT).execute(&mut **tx).await?;

    for ddl in statements.iter().map(String::as_str) {
        sqlx::query(ddl).execute(&mut **tx).await?;
    }

    Ok(())
}
/// Rolls `tx` back to the `athena_management_mutation` savepoint.
///
/// This is best effort: the outcome of the rollback statement is discarded,
/// so callers cannot observe whether it succeeded.
async fn rollback_to_savepoint(tx: &mut Transaction<'_, Postgres>) {
    let rollback_outcome = sqlx::query("ROLLBACK TO SAVEPOINT athena_management_mutation")
        .execute(&mut **tx)
        .await;
    // Deliberately ignored: there is nothing further to do if this fails.
    drop(rollback_outcome);
}
/// Builds the static capabilities document returned by
/// `GET /management/capabilities`.
///
/// It lists the supported operations, the right each route requires, and the
/// allow-lists for index methods and column data types.
fn management_capabilities() -> ManagementCapabilitiesResponse {
    // Pairs a route description with the right it demands.
    let right = |operation, required_right| CapabilityRight {
        operation,
        required_right,
    };

    let supported_operations = vec![
        "management.capabilities.read",
        "management.tables.create",
        "management.tables.edit",
        "management.tables.drop",
        "management.columns.drop",
        "management.indexes.create",
        "management.indexes.drop",
    ];

    let required_rights = vec![
        right("GET /management/capabilities", MANAGEMENT_READ_RIGHT),
        right("POST /management/tables", MANAGEMENT_TABLES_WRITE_RIGHT),
        right(
            "PATCH /management/tables/{table_name}",
            MANAGEMENT_TABLES_WRITE_RIGHT,
        ),
        right(
            "DELETE /management/tables/{table_name}",
            MANAGEMENT_TABLES_DROP_RIGHT,
        ),
        right(
            "DELETE /management/tables/{table_name}/columns/{column_name}",
            MANAGEMENT_COLUMNS_DROP_RIGHT,
        ),
        right("POST /management/indexes", MANAGEMENT_INDEXES_WRITE_RIGHT),
        right(
            "DELETE /management/indexes/{index_name}",
            MANAGEMENT_INDEXES_DROP_RIGHT,
        ),
    ];

    ManagementCapabilitiesResponse {
        backend: "postgresql",
        retention_mutation_supported: false,
        replication_mutation_supported: false,
        supported_operations,
        required_rights,
        allowed_index_methods: ALLOWED_INDEX_METHODS.to_vec(),
        allowed_column_data_types: ALLOWED_COLUMN_DATA_TYPES.to_vec(),
    }
}
#[get("/management/capabilities")]
async fn management_capabilities_route(
req: HttpRequest,
app_state: Data<AppState>,
) -> impl Responder {
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_READ_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_capabilities",
None,
0,
actix_web::http::StatusCode::OK,
Some(json!({ "client_name": client_name })),
);
api_success(
"Loaded management capabilities",
json!(management_capabilities()),
)
}
#[post("/management/tables")]
async fn management_create_table(
req: HttpRequest,
body: Json<CreateTableRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_TABLES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_create_table_statement(&body) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid create table request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &body.table_name).await {
Ok(true) => {
return conflict(
"Table already exists",
format!(
"Table '{}.{}' already exists.",
body.schema_name, body.table_name
),
);
}
Ok(false) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"table",
"create_table",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_table",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"create_table",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"table",
"create_table",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "applied": true }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The table was not created because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_table",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::CREATED,
Some(json!({ "schema_name": body.schema_name, "table_name": body.table_name })),
);
app_state.metrics_state.record_management_mutation(
"create_table",
"success",
started.elapsed().as_secs_f64(),
);
api_created(
"Created table",
json!({
"schema_name": body.schema_name,
"table_name": body.table_name,
"statements": statements,
}),
)
}
/// `PATCH /management/tables/{table_name}` — applies the requested edit
/// operations to an existing table in the calling client's database.
///
/// Steps, in order:
/// 1. Resolve the client name, check it is registered and obtain its pool.
/// 2. Authorize the request with `MANAGEMENT_TABLES_WRITE_RIGHT`.
/// 3. Build the edit statements from the payload (bad request on validation
///    failure).
/// 4. Open a transaction, require the audit table, and answer "not found" if
///    the table does not exist.
/// 5. Run the statements under a savepoint, write the audit row in the same
///    transaction, and commit.
///
/// If a statement fails, the savepoint is rolled back and a "failed" audit
/// row is written and committed on a best-effort basis before the error is
/// returned. If the audit write or the commit fails, the failure is recorded
/// in the operation log and metrics before the error response is returned.
#[patch("/management/tables/{table_name}")]
async fn management_edit_table(
    req: HttpRequest,
    path: Path<String>,
    body: Json<EditTableRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // `json!` serializes by reference, so the payload is borrowed for the
    // audit trail instead of being cloned.
    let request_payload = &body.0;
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let table_name = path.into_inner();
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_TABLES_WRITE_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    let statements =
        match build_edit_table_statements(&body.schema_name, &table_name, &body.operations) {
            Ok(statements) => statements,
            Err(err) => return bad_request("Invalid edit table request", err),
        };
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Mutations are refused unless they can be audited; a failed lookup is
    // treated the same as a missing audit table.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    match table_exists(&mut tx, &body.schema_name, &table_name).await {
        Ok(false) => {
            return not_found(
                "Table not found",
                format!(
                    "Table '{}.{}' does not exist.",
                    body.schema_name, table_name
                ),
            );
        }
        Ok(true) => {}
        Err(err) => {
            return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
        }
    }
    let result = run_audited_statements(&mut tx, &statements).await;
    if let Err(err) = result {
        // Undo the failed statements but keep the transaction usable so the
        // failure itself can still be audited.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            &body.schema_name,
            Some(&table_name),
            "table",
            "edit_table",
            "failed",
            json!(request_payload),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: the original error is what the caller needs to see.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let _ = tx.commit().await;
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_edit_table",
            Some(&table_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::BAD_REQUEST,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "edit_table",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
    }
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        &body.schema_name,
        Some(&table_name),
        "table",
        "edit_table",
        "success",
        json!(request_payload),
        statements.clone(),
        json!({ "applied": true }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        // An unaudited mutation must not persist.
        let _ = tx.rollback().await;
        // NOTE(review): the logged status assumes `service_unavailable`
        // answers with 503 — confirm against the response helpers.
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_edit_table",
            Some(&table_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::SERVICE_UNAVAILABLE,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "edit_table",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return service_unavailable(
            "Database audit log write failed",
            format!(
                "The table was not altered because the audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        // NOTE(review): the logged status assumes `internal_error` answers
        // with 500 — confirm against the response helpers.
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_edit_table",
            Some(&table_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "edit_table",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_edit_table",
        Some(&table_name),
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({ "schema_name": body.schema_name, "table_name": table_name })),
    );
    app_state.metrics_state.record_management_mutation(
        "edit_table",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Updated table",
        json!({ "table_name": table_name, "statements": statements }),
    )
}
/// `DELETE /management/tables/{table_name}` — drops a table from the calling
/// client's database, optionally with `CASCADE` as requested in the body.
///
/// Steps, in order:
/// 1. Resolve the client name, check it is registered and obtain its pool.
/// 2. Authorize the request with `MANAGEMENT_TABLES_DROP_RIGHT`.
/// 3. Build the `DROP TABLE` statement (bad request on validation failure).
/// 4. Open a transaction, require the audit table, and answer "not found" if
///    the table does not exist.
/// 5. Run the DDL under a savepoint, write the audit row in the same
///    transaction, and commit.
///
/// If the DDL fails, the savepoint is rolled back and a "failed" audit row is
/// written and committed on a best-effort basis before the error is returned.
/// If the audit write or the commit fails, the failure is recorded in the
/// operation log and metrics before the error response is returned.
#[delete("/management/tables/{table_name}")]
async fn management_drop_table(
    req: HttpRequest,
    path: Path<String>,
    body: Json<DropTableRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // `json!` serializes by reference, so the payload is borrowed for the
    // audit trail instead of being cloned.
    let request_payload = &body.0;
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let table_name = path.into_inner();
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_TABLES_DROP_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    let statements = match build_drop_table_statement(&body.schema_name, &table_name, body.cascade)
    {
        Ok(statement) => vec![statement],
        Err(err) => return bad_request("Invalid drop table request", err),
    };
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Mutations are refused unless they can be audited; a failed lookup is
    // treated the same as a missing audit table.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    match table_exists(&mut tx, &body.schema_name, &table_name).await {
        Ok(false) => {
            return not_found(
                "Table not found",
                format!(
                    "Table '{}.{}' does not exist.",
                    body.schema_name, table_name
                ),
            );
        }
        Ok(true) => {}
        Err(err) => {
            return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
        }
    }
    let result = run_audited_statements(&mut tx, &statements).await;
    if let Err(err) = result {
        // Undo the failed DDL but keep the transaction usable so the failure
        // itself can still be audited.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            &body.schema_name,
            Some(&table_name),
            "table",
            "drop_table",
            "failed",
            json!(request_payload),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: the original DDL error is what the caller needs to see.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let _ = tx.commit().await;
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_table",
            Some(&table_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::BAD_REQUEST,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_table",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
    }
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        &body.schema_name,
        Some(&table_name),
        "table",
        "drop_table",
        "success",
        json!(request_payload),
        statements.clone(),
        json!({ "dropped": true }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        // An unaudited mutation must not persist.
        let _ = tx.rollback().await;
        // NOTE(review): the logged status assumes `service_unavailable`
        // answers with 503 — confirm against the response helpers.
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_table",
            Some(&table_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::SERVICE_UNAVAILABLE,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_table",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return service_unavailable(
            "Database audit log write failed",
            format!(
                "The table was not dropped because the audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        // NOTE(review): the logged status assumes `internal_error` answers
        // with 500 — confirm against the response helpers.
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_table",
            Some(&table_name),
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
            Some(json!({ "error": err.to_string() })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_table",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_drop_table",
        Some(&table_name),
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({ "schema_name": body.schema_name, "table_name": table_name })),
    );
    app_state.metrics_state.record_management_mutation(
        "drop_table",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Dropped table",
        json!({ "table_name": table_name, "statements": statements }),
    )
}
#[delete("/management/tables/{table_name}/columns/{column_name}")]
async fn management_drop_column(
req: HttpRequest,
path: Path<(String, String)>,
body: Json<DropColumnRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let (table_name, column_name) = path.into_inner();
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_COLUMNS_DROP_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let statements = match build_drop_column_statement(
&body.schema_name,
&table_name,
&column_name,
body.cascade,
) {
Ok(statement) => vec![statement],
Err(err) => return bad_request("Invalid drop column request", err),
};
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match column_exists(&mut tx, &body.schema_name, &table_name, &column_name).await {
Ok(false) => {
return not_found(
"Column not found",
format!(
"Column '{}.{}.{}' does not exist.",
body.schema_name, table_name, column_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"column",
"drop_column",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_column",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string() })),
);
app_state.metrics_state.record_management_mutation(
"drop_column",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(&err, Some(&table_name)));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&table_name),
"column",
"drop_column",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "dropped": true, "column_name": column_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The column was not dropped because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_drop_column",
Some(&table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::OK,
Some(
json!({ "schema_name": body.schema_name, "table_name": table_name, "column_name": column_name }),
),
);
app_state.metrics_state.record_management_mutation(
"drop_column",
"success",
started.elapsed().as_secs_f64(),
);
api_success(
"Dropped column",
json!({ "table_name": table_name, "column_name": column_name, "statements": statements }),
)
}
#[post("/management/indexes")]
async fn management_create_index(
req: HttpRequest,
body: Json<CreateIndexRequest>,
app_state: Data<AppState>,
) -> impl Responder {
let started = Instant::now();
let request_payload = body.0.clone();
let client_name = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
return resp;
}
let pool = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let auth = match authorize_management_request(
&req,
app_state.get_ref(),
&client_name,
vec![MANAGEMENT_INDEXES_WRITE_RIGHT.to_string()],
)
.await
{
Ok(auth) => auth,
Err(resp) => return resp,
};
let logged_request = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
let (index_name, statement) = match build_create_index_statement(&body) {
Ok(value) => value,
Err(err) => return bad_request("Invalid create index request", err),
};
let statements = vec![statement];
let mut tx = match pool.begin().await {
Ok(tx) => tx,
Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
};
if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
return service_unavailable(
"Database audit log unavailable",
"The target database must contain public.database_audit_log before management mutations can run.",
);
}
match table_exists(&mut tx, &body.schema_name, &body.table_name).await {
Ok(false) => {
return not_found(
"Table not found",
format!(
"Table '{}.{}' does not exist.",
body.schema_name, body.table_name
),
);
}
Ok(true) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
match index_exists(&mut tx, &body.schema_name, &index_name).await {
Ok(true) => {
return conflict(
"Index already exists",
format!(
"Index '{}.{}' already exists.",
body.schema_name, index_name
),
);
}
Ok(false) => {}
Err(err) => {
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
}
let result = run_audited_statements(&mut tx, &statements).await;
if let Err(err) = result {
rollback_to_savepoint(&mut tx).await;
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"index",
"create_index",
"failed",
json!(request_payload.clone()),
statements.clone(),
json!({}),
Some(err.to_string()),
started.elapsed().as_millis() as i64,
&auth.request_id,
);
let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
let _ = tx.commit().await;
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_index",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::BAD_REQUEST,
Some(json!({ "error": err.to_string(), "index_name": index_name })),
);
app_state.metrics_state.record_management_mutation(
"create_index",
"error",
started.elapsed().as_secs_f64(),
);
return processed_error(process_sqlx_error_with_context(
&err,
Some(&body.table_name),
));
}
let audit_entry = build_audit_entry(
&req,
&auth.log_context,
&client_name,
&body.schema_name,
Some(&body.table_name),
"index",
"create_index",
"success",
json!(request_payload.clone()),
statements.clone(),
json!({ "created": true, "index_name": index_name }),
None,
started.elapsed().as_millis() as i64,
&auth.request_id,
);
if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
let _ = tx.rollback().await;
return service_unavailable(
"Database audit log write failed",
format!(
"The index was not created because the audit log write failed: {}",
err
),
);
}
if let Err(err) = tx.commit().await {
return internal_error("Failed to commit database transaction", err.to_string());
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"management_create_index",
Some(&body.table_name),
started.elapsed().as_millis(),
actix_web::http::StatusCode::CREATED,
Some(
json!({ "schema_name": body.schema_name, "table_name": body.table_name, "index_name": index_name }),
),
);
app_state.metrics_state.record_management_mutation(
"create_index",
"success",
started.elapsed().as_secs_f64(),
);
api_created(
"Created index",
json!({ "table_name": body.table_name, "index_name": index_name, "statements": statements }),
)
}
/// `DELETE /management/indexes/{index_name}` — drops an index from the
/// calling client's database.
///
/// Steps, in order:
/// 1. Resolve the client name, check it is registered and obtain its pool.
/// 2. Authorize the request with `MANAGEMENT_INDEXES_DROP_RIGHT`.
/// 3. Build the `DROP INDEX` statement (bad request on validation failure).
/// 4. Open a transaction, require the audit table, and answer "not found" if
///    the index does not exist.
/// 5. Run the DDL under a savepoint, write the audit row in the same
///    transaction, and commit.
///
/// If the DDL fails, the savepoint is rolled back and a "failed" audit row is
/// written and committed on a best-effort basis before the error is returned.
/// If the audit write or the commit fails, the failure is recorded in the
/// operation log and metrics before the error response is returned.
#[delete("/management/indexes/{index_name}")]
async fn management_drop_index(
    req: HttpRequest,
    path: Path<String>,
    body: Json<DropIndexRequest>,
    app_state: Data<AppState>,
) -> impl Responder {
    let started = Instant::now();
    // `json!` serializes by reference, so the payload is borrowed for the
    // audit trail instead of being cloned.
    let request_payload = &body.0;
    let client_name = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let index_name = path.into_inner();
    if let Err(resp) = registered_client_for(app_state.get_ref(), &client_name) {
        return resp;
    }
    let pool = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let auth = match authorize_management_request(
        &req,
        app_state.get_ref(),
        &client_name,
        vec![MANAGEMENT_INDEXES_DROP_RIGHT.to_string()],
    )
    .await
    {
        Ok(auth) => auth,
        Err(resp) => return resp,
    };
    let logged_request = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    let statements = match build_drop_index_statement(&body.schema_name, &index_name) {
        Ok(statement) => vec![statement],
        Err(err) => return bad_request("Invalid drop index request", err),
    };
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(err) => return internal_error("Failed to open database transaction", err.to_string()),
    };
    // Mutations are refused unless they can be audited; a failed lookup is
    // treated the same as a missing audit table.
    if !matches!(audit_table_exists(&mut tx).await, Ok(true)) {
        return service_unavailable(
            "Database audit log unavailable",
            "The target database must contain public.database_audit_log before management mutations can run.",
        );
    }
    match index_exists(&mut tx, &body.schema_name, &index_name).await {
        Ok(false) => {
            return not_found(
                "Index not found",
                format!(
                    "Index '{}.{}' does not exist.",
                    body.schema_name, index_name
                ),
            );
        }
        Ok(true) => {}
        Err(err) => {
            return processed_error(process_sqlx_error_with_context(&err, Some(&index_name)));
        }
    }
    let result = run_audited_statements(&mut tx, &statements).await;
    if let Err(err) = result {
        // Undo the failed DDL but keep the transaction usable so the failure
        // itself can still be audited.
        rollback_to_savepoint(&mut tx).await;
        let audit_entry = build_audit_entry(
            &req,
            &auth.log_context,
            &client_name,
            &body.schema_name,
            None,
            "index",
            "drop_index",
            "failed",
            json!(request_payload),
            statements.clone(),
            json!({}),
            Some(err.to_string()),
            started.elapsed().as_millis() as i64,
            &auth.request_id,
        );
        // Best effort: the original DDL error is what the caller needs to see.
        let _ = insert_database_audit_log(&mut tx, &audit_entry).await;
        let _ = tx.commit().await;
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_index",
            None,
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::BAD_REQUEST,
            Some(json!({ "error": err.to_string(), "index_name": index_name })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_index",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return processed_error(process_sqlx_error_with_context(&err, Some(&index_name)));
    }
    let audit_entry = build_audit_entry(
        &req,
        &auth.log_context,
        &client_name,
        &body.schema_name,
        None,
        "index",
        "drop_index",
        "success",
        json!(request_payload),
        statements.clone(),
        json!({ "dropped": true, "index_name": index_name }),
        None,
        started.elapsed().as_millis() as i64,
        &auth.request_id,
    );
    if let Err(err) = insert_database_audit_log(&mut tx, &audit_entry).await {
        // An unaudited mutation must not persist.
        let _ = tx.rollback().await;
        // NOTE(review): the logged status assumes `service_unavailable`
        // answers with 503 — confirm against the response helpers.
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_index",
            None,
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::SERVICE_UNAVAILABLE,
            Some(json!({ "error": err.to_string(), "index_name": index_name })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_index",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return service_unavailable(
            "Database audit log write failed",
            format!(
                "The index was not dropped because the audit log write failed: {}",
                err
            ),
        );
    }
    if let Err(err) = tx.commit().await {
        // NOTE(review): the logged status assumes `internal_error` answers
        // with 500 — confirm against the response helpers.
        log_operation_event(
            Some(app_state.get_ref()),
            &logged_request,
            "management_drop_index",
            None,
            started.elapsed().as_millis(),
            actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
            Some(json!({ "error": err.to_string(), "index_name": index_name })),
        );
        app_state.metrics_state.record_management_mutation(
            "drop_index",
            "error",
            started.elapsed().as_secs_f64(),
        );
        return internal_error("Failed to commit database transaction", err.to_string());
    }
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "management_drop_index",
        None,
        started.elapsed().as_millis(),
        actix_web::http::StatusCode::OK,
        Some(json!({ "schema_name": body.schema_name, "index_name": index_name })),
    );
    app_state.metrics_state.record_management_mutation(
        "drop_index",
        "success",
        started.elapsed().as_secs_f64(),
    );
    api_success(
        "Dropped index",
        json!({ "index_name": index_name, "statements": statements }),
    )
}
/// Registers every management route handler defined in this module on `cfg`.
pub fn services(cfg: &mut web::ServiceConfig) {
    // Read-only capabilities document.
    cfg.service(management_capabilities_route);

    // Table and column mutations.
    cfg.service(management_create_table);
    cfg.service(management_edit_table);
    cfg.service(management_drop_table);
    cfg.service(management_drop_column);

    // Index mutations.
    cfg.service(management_create_index);
    cfg.service(management_drop_index);
}