use actix_web::{
HttpRequest, HttpResponse, delete,
http::StatusCode,
web::{Data, Json},
};
use serde_json::json;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{authorize_gateway_request, delete_right_for_resource};
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_DELETE, GatewayDeferredRequest, GatewayDeleteRequest,
GatewayOperationKind,
};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::{
gateway_bad_request, gateway_error_with_details, gateway_service_unavailable,
};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_company_id::get_x_company_id;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_publish_event::get_x_publish_event;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::api_accepted;
use crate::data::events::post_event;
use crate::drivers::postgresql::column_resolver::resolve_information_schema_targets;
use crate::drivers::postgresql::sqlx_driver::delete_rows;
use crate::drivers::supabase::{client_router, client_router_health_aware};
use crate::parser::query_builder::Condition;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
#[delete("/gateway/delete")]
pub async fn delete_data(
req: HttpRequest,
body: Json<GatewayDeleteRequest>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_delete_data(req, body.0, app_state).await
}
fn normalize_table_name_for_resource_id_lookup(
table_name: &str,
allow_schema_names_prefixed_as_table_name: bool,
) -> String {
let trimmed = table_name.trim();
if trimmed.is_empty() {
return String::new();
}
if !allow_schema_names_prefixed_as_table_name {
return trimmed.to_string();
}
match resolve_information_schema_targets(trimmed, true) {
Ok((schema, table)) if schema.eq_ignore_ascii_case("public") => table,
_ => trimmed.to_string(),
}
}
pub(crate) async fn handle_delete_data(
req: HttpRequest,
body: GatewayDeleteRequest,
app_state: Data<AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let client_name: String = x_athena_client(&req.clone());
let auth = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![delete_right_for_resource(Some(&body.table_name))],
)
.await;
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
if auth.force_deferred_queue {
if body.table_name.trim().is_empty() || body.resource_id.trim().is_empty() {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
"table_name and resource_id are required",
);
}
let request_bytes: Option<u64> = req
.headers()
.get(actix_web::http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let request_body = serde_json::to_value(&body).unwrap_or_else(|_| {
json!({
"table_name": body.table_name.clone(),
"resource_id": body.resource_id.clone()
})
});
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_DELETE,
auth.request_id.clone(),
client_name.clone(),
request_body,
)
.with_reason(auth.force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
if let Err(err) = enqueue_gateway_deferred_request(
app_state.get_ref(),
"DELETE",
req.path(),
request_bytes,
&deferred_request,
)
.await
{
return gateway_service_unavailable(
GatewayOperationKind::Delete,
"Deferred queue unavailable",
format!("Failed to queue deferred delete request: {err}"),
);
}
return api_accepted(
"Delete request queued for deferred execution (auth fallback mode)",
json!({
"request_id": auth.request_id,
"status": "queued",
"route": req.path(),
}),
);
}
#[allow(unused)]
let _user_id: Option<String> = get_x_user_id(&req);
let _company_id: String = match get_x_company_id(&req) {
Some(id) => id,
None => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Missing required header",
"X-Company-Id header not found in the request",
);
}
};
let _organization_id: String = match get_x_organization_id(&req) {
Some(id) => id,
None => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Missing required header",
"X-Organization-Id header not found in the request",
);
}
};
let table_name: String = body.table_name.trim().to_string();
let resource_id: String = body.resource_id.clone();
if table_name.is_empty() {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
"table_name is required",
);
}
if resource_id.trim().is_empty() {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Invalid delete payload",
"resource_id is required",
);
}
let result: Result<(), String> = if let Some(pool) =
app_state.pg_registry.get_pool(&client_name)
{
let id_lookup_table: String = normalize_table_name_for_resource_id_lookup(
&table_name,
app_state.gateway_allow_schema_names_prefixed_as_table_name,
);
let resource_id_key: String = get_resource_id_key(&id_lookup_table).await;
let conditions = vec![
Condition::eq(resource_id_key, resource_id.clone())
.with_uuid_value_text_cast(app_state.gateway_auto_cast_uuid_filter_values_to_text),
];
delete_rows(&pool, &table_name, &conditions)
.await
.map(|_| ())
.map_err(|e| e.to_string())
} else if let Ok(health_client) = client_router_health_aware(&client_name) {
health_client
.delete(&table_name, &resource_id)
.await
.map_err(|e| {
if e.downcast_ref::<crate::drivers::scylla::health::HostOffline>()
.is_some()
{
format!(
"Backend {} temporarily unavailable (circuit breaker)",
client_name
)
} else {
e.to_string()
}
})
} else {
let client: SupabaseClient = match client_router(&client_name).await {
Ok(c) => c,
Err(err) => {
return gateway_bad_request(
GatewayOperationKind::Delete,
"Failed to resolve Supabase client",
err,
);
}
};
client.delete(&table_name, &resource_id).await
};
match result {
Ok(_) => {
let _ =
invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;
if get_x_publish_event(&req) {
let event: serde_json::Value = json!({
"event": "DELETE",
"resource": table_name.clone(),
"resource_id": resource_id.clone(),
});
post_event(_company_id, event).await;
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"delete",
Some(&table_name),
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({
"resource_id": resource_id.clone(),
})),
);
let delete_response: serde_json::Value = json!({
"status": "success",
"success": true,
"message": "Data deleted successfully",
"table_name": table_name,
"resource_id": resource_id,
"client": client_name,
});
let delete_payload: serde_json::Value =
serde_json::to_value(&body).unwrap_or_else(|_| {
json!({
"table_name": body.table_name,
"resource_id": body.resource_id,
})
});
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
&client_name,
crate::webhooks::ROUTE_GATEWAY_DELETE,
Some(table_name.clone()),
Some(logged_request.request_id.clone()),
Some(delete_payload),
Some(delete_response.clone()),
),
);
HttpResponse::Ok().json(delete_response)
}
Err(err) => {
let is_unavailable = err.contains("circuit breaker");
if is_unavailable {
app_state
.metrics_state
.record_gateway_backend_unavailable("delete", &client_name);
}
let status = if is_unavailable {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"delete",
Some(&table_name),
operation_start.elapsed().as_millis(),
status,
Some(json!({
"error": err,
})),
);
gateway_error_with_details(
status,
GatewayOperationKind::Delete,
"Failed to delete data",
err,
Some(json!({
"table_name": table_name,
"resource_id": resource_id,
})),
)
}
}
}
#[cfg(test)]
mod tests {
use super::normalize_table_name_for_resource_id_lookup;
#[test]
fn strips_public_prefix_when_enabled() {
let resolved = normalize_table_name_for_resource_id_lookup(
"public.forms_blocks_summary_sections",
true,
);
assert_eq!(resolved, "forms_blocks_summary_sections");
}
#[test]
fn keeps_qualified_name_when_disabled() {
let resolved = normalize_table_name_for_resource_id_lookup(
"public.forms_blocks_summary_sections",
false,
);
assert_eq!(resolved, "public.forms_blocks_summary_sections");
}
}