use actix_web::{
HttpRequest, HttpResponse, delete,
http::StatusCode,
web::{Data, Json},
};
use serde_json::json;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use crate::AppState;
use crate::api::gateway::auth::delete_right_for_resource;
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_DELETE, GatewayDeferredRequest, GatewayDeleteRequest,
GatewayOperationKind,
};
use crate::api::gateway::lifecycle::{
MutationPublishEvent, MutationSuccessEffects, authorize_and_log_gateway_request,
enqueue_gateway_deferred_response, finalize_gateway_mutation_success,
log_gateway_operation_result,
};
use crate::api::gateway::response::{gateway_bad_request, gateway_error_with_details};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_publish_event::get_x_publish_event;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::drivers::postgresql::column_resolver::resolve_information_schema_targets;
use crate::drivers::postgresql::sqlx_driver::delete_rows;
use crate::drivers::supabase::{client_router, client_router_health_aware};
use crate::parser::query_builder::{Condition, sanitize_identifier};
use crate::utils::format::normalize_column_name;
#[delete("/gateway/delete")]
pub async fn delete_data(
req: HttpRequest,
body: Json<GatewayDeleteRequest>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_delete_data(req, body.0, app_state).await
}
fn normalize_table_name_for_resource_id_lookup(
table_name: &str,
allow_schema_names_prefixed_as_table_name: bool,
) -> String {
let trimmed = table_name.trim();
if trimmed.is_empty() {
return String::new();
}
if !allow_schema_names_prefixed_as_table_name {
return trimmed.to_string();
}
match resolve_information_schema_targets(trimmed, true) {
Ok((schema, table)) if schema.eq_ignore_ascii_case("public") => table,
_ => trimmed.to_string(),
}
}
pub(crate) async fn resolve_delete_resource_id_column(
table_name: &str,
requested_column_name: Option<&str>,
allow_schema_names_prefixed_as_table_name: bool,
) -> Result<String, String> {
if let Some(column_name) = requested_column_name {
let trimmed = column_name.trim();
if trimmed.is_empty() {
return Err("column_name must not be empty when provided".to_string());
}
let normalized = normalize_column_name(trimmed, true);
if sanitize_identifier(&normalized).is_none() {
return Err(format!("invalid resource_id column_name `{trimmed}`"));
}
return Ok(normalized);
}
let id_lookup_table: String = normalize_table_name_for_resource_id_lookup(
table_name,
allow_schema_names_prefixed_as_table_name,
);
Ok(get_resource_id_key(&id_lookup_table).await)
}
pub(crate) async fn handle_delete_data(
req: HttpRequest,
body: GatewayDeleteRequest,
app_state: Data<AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let client_name: String = x_athena_client(&req.clone());
let auth_context = match authorize_and_log_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![delete_right_for_resource(Some(&body.table_name))],
)
.await
{
Ok(context) => context,
Err(response) => return response,
};
let auth = auth_context.auth;
let logged_request = auth_context.logged_request;
if auth.force_deferred_queue {
if body.table_name.trim().is_empty() || body.resource_id.trim().is_empty() {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
"table_name and resource_id are required",
);
}
let request_body = serde_json::to_value(&body).unwrap_or_else(|_| {
json!({
"table_name": body.table_name.clone(),
"resource_id": body.resource_id.clone()
})
});
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_DELETE,
auth.request_id.clone(),
client_name.clone(),
request_body,
)
.with_reason(auth.force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
return enqueue_gateway_deferred_response(
&req,
app_state.get_ref(),
&auth,
&client_name,
"DELETE",
&deferred_request,
GatewayOperationKind::Delete,
"Delete request queued for deferred execution (auth fallback mode)",
)
.await;
}
#[allow(unused)]
let _user_id: Option<String> = get_x_user_id(&req);
let organization_id: String = match get_x_organization_id(&req) {
Some(id) => id,
None => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Missing required header",
"X-Organization-Id header not found in the request",
);
}
};
let table_name: String = body.table_name.trim().to_string();
let resource_id: String = body.resource_id.clone();
if table_name.is_empty() {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
"table_name is required",
);
}
if resource_id.trim().is_empty() {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
"resource_id is required",
);
}
let explicit_resource_id_key: Option<String> = if body.column_name.is_some() {
match resolve_delete_resource_id_column(
&table_name,
body.column_name.as_deref(),
app_state.gateway_allow_schema_names_prefixed_as_table_name,
)
.await
{
Ok(column) => Some(column),
Err(err) => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
err,
);
}
}
} else {
None
};
let result: Result<(), String> = if let Some(pool) =
app_state.pg_registry.get_pool(&client_name)
{
let resource_id_key: String = if let Some(column) = explicit_resource_id_key.clone() {
column
} else {
match resolve_delete_resource_id_column(
&table_name,
None,
app_state.gateway_allow_schema_names_prefixed_as_table_name,
)
.await
{
Ok(column) => column,
Err(err) => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
err,
);
}
}
};
let conditions = vec![
Condition::eq(resource_id_key.clone(), resource_id.clone())
.with_uuid_value_text_cast(app_state.gateway_auto_cast_uuid_filter_values_to_text)
.with_pg_cast(Some("text")),
];
if app_state.deferred_write_config.enabled {
if let Some(ref buf) = app_state.write_buffer {
let stored_conditions = vec![crate::deferred_write::StoredCondition {
column: resource_id_key.clone(),
value: serde_json::Value::String(resource_id.clone()),
}];
let entry = crate::deferred_write::DeferredEntry::new_delete(
client_name.clone(),
table_name.clone(),
stored_conditions,
);
if let Some(ref wal) = app_state.wal_manager {
if let Err(e) = wal.append(&entry).await {
tracing::error!(
error = %e,
table = %table_name,
"deferred_write: WAL append failed for delete; falling back to immediate DB write"
);
drop(entry);
} else {
buf.push(entry).await;
return HttpResponse::Ok().json(json!({
"status": "success",
"success": true,
"message": "Delete queued (deferred write mode)",
"table_name": table_name,
"column_name": resource_id_key,
"resource_id": resource_id,
"client": client_name,
}));
}
} else {
buf.push(entry).await;
return HttpResponse::Ok().json(json!({
"status": "success",
"success": true,
"message": "Delete queued (deferred write mode)",
"table_name": table_name,
"column_name": resource_id_key,
"resource_id": resource_id,
"client": client_name,
}));
}
}
}
delete_rows(&pool, &table_name, &conditions)
.await
.map(|_| ())
.map_err(|e| e.to_string())
} else if explicit_resource_id_key.is_some() {
Err("column_name delete alias requires a Postgres-backed Athena client".to_string())
} else if let Ok(health_client) = client_router_health_aware(&client_name) {
health_client
.delete(&table_name, &resource_id)
.await
.map_err(|e| {
if e.downcast_ref::<crate::drivers::scylla::health::HostOffline>()
.is_some()
{
format!(
"Backend {} temporarily unavailable (circuit breaker)",
client_name
)
} else {
e.to_string()
}
})
} else {
let client: SupabaseClient = match client_router(&client_name).await {
Ok(c) => c,
Err(err) => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Failed to resolve Supabase client",
err,
);
}
};
client.delete(&table_name, &resource_id).await
};
match result {
Ok(_) => {
let publish_event = if get_x_publish_event(&req) {
Some(MutationPublishEvent {
company_id: organization_id.clone(),
payload: json!({
"event": "DELETE",
"resource": table_name.clone(),
"resource_id": resource_id.clone(),
"column_name": body.column_name.clone(),
}),
})
} else {
None
};
let delete_response: serde_json::Value = json!({
"status": "success",
"success": true,
"message": "Data deleted successfully",
"table_name": table_name,
"column_name": body.column_name.clone(),
"resource_id": resource_id,
"client": client_name,
});
let delete_payload: serde_json::Value =
serde_json::to_value(&body).unwrap_or_else(|_| {
json!({
"table_name": body.table_name,
"resource_id": body.resource_id,
})
});
let response_payload = finalize_gateway_mutation_success(
&req,
app_state.clone(),
&client_name,
&logged_request,
operation_start,
MutationSuccessEffects {
operation: GatewayOperationKind::Delete,
log_action: "delete",
route_key: crate::webhooks::ROUTE_GATEWAY_DELETE,
table_name,
log_details: Some(json!({
"resource_id": resource_id.clone(),
"column_name": body.column_name.clone(),
})),
request_payload: Some(delete_payload),
response_payload: delete_response,
invalidate_cache: true,
publish_event,
},
)
.await;
HttpResponse::Ok().json(response_payload)
}
Err(err) => {
let is_unavailable = err.contains("circuit breaker");
if is_unavailable {
app_state
.metrics_state
.record_gateway_backend_unavailable("delete", &client_name);
}
let status = if is_unavailable {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"delete",
Some(&table_name),
operation_start,
status,
Some(json!({
"error": err,
})),
);
gateway_error_with_details(
status,
GatewayOperationKind::Delete,
"Failed to delete data",
err,
Some(json!({
"table_name": table_name,
"column_name": body.column_name,
"resource_id": resource_id,
})),
)
}
}
}
#[cfg(test)]
mod tests {
use super::{normalize_table_name_for_resource_id_lookup, resolve_delete_resource_id_column};
#[test]
fn strips_public_prefix_when_enabled() {
let resolved = normalize_table_name_for_resource_id_lookup(
"public.forms_blocks_summary_sections",
true,
);
assert_eq!(resolved, "forms_blocks_summary_sections");
}
#[test]
fn keeps_qualified_name_when_disabled() {
let resolved = normalize_table_name_for_resource_id_lookup(
"public.forms_blocks_summary_sections",
false,
);
assert_eq!(resolved, "public.forms_blocks_summary_sections");
}
#[tokio::test]
async fn explicit_delete_resource_id_column_is_used_as_alias() {
let resolved = resolve_delete_resource_id_column("public.any_table", Some("id"), true)
.await
.expect("explicit column should resolve");
assert_eq!(resolved, "id");
}
#[tokio::test]
async fn explicit_delete_resource_id_column_is_normalized() {
let resolved =
resolve_delete_resource_id_column("public.any_table", Some("resourceId"), true)
.await
.expect("explicit column should resolve");
assert_eq!(resolved, "resource_id");
}
#[tokio::test]
async fn explicit_delete_resource_id_column_is_validated() {
let err = resolve_delete_resource_id_column(
"public.any_table",
Some("id; DROP TABLE users"),
true,
)
.await
.expect_err("unsafe column should fail");
assert!(err.contains("invalid resource_id column_name"));
}
}