athena_rs 3.4.7

Database driver
Documentation
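//! Delete gateway helpers that remove a record through the backend selected for the client (a
//! registered Postgres pool, a health-aware client, or a Supabase client) and optionally publish
//! audit events.
//!
//! The `/gateway/delete` route authorizes the request, validates headers and payload, routes the
//! delete to the resolved backend, and publishes a `DELETE` event when the caller requests it.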
use actix_web::{
    HttpRequest, HttpResponse, delete,
    http::StatusCode,
    web::{Data, Json},
};
use serde_json::json;
use std::time::Instant;
use supabase_rs::SupabaseClient;

use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{authorize_gateway_request, delete_right_for_resource};
use crate::api::gateway::contracts::{
    GATEWAY_DEFERRED_KIND_DELETE, GatewayDeferredRequest, GatewayDeleteRequest,
    GatewayOperationKind,
};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::{
    gateway_bad_request, gateway_error_with_details, gateway_service_unavailable,
};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_company_id::get_x_company_id;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_publish_event::get_x_publish_event;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::api_accepted;
use crate::data::events::post_event;
use crate::drivers::postgresql::column_resolver::resolve_information_schema_targets;
use crate::drivers::postgresql::sqlx_driver::delete_rows;
use crate::drivers::supabase::{client_router, client_router_health_aware};
use crate::parser::query_builder::Condition;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

#[delete("/gateway/delete")]
/// HTTP entry point for `DELETE /gateway/delete`: removes one record from the requested table.
///
/// This handler only unwraps the JSON payload; authorization, validation, backend selection and
/// response building are performed by `handle_delete_data`.
///
/// # Parameters
/// - `req`: Incoming request; its headers select the backend client and carry tenant context.
/// - `body`: JSON payload naming the `table_name` and the `resource_id` to delete.
/// - `app_state`: Shared application state forwarded unchanged to `handle_delete_data`.
///
/// # Headers
/// - `X-Athena-Client` names the client the delete is routed to.
/// - `X-Company-Id` and `X-Organization-Id` are required when the delete is executed inline;
///   `X-User-Id` is read but optional.
///
/// # Returns
/// - `200 OK` with success metadata when the delete succeeds.
/// - `400 Bad Request` when required headers or body fields are missing.
/// - `500 Internal Server Error` when the backend reports a failure.
/// - `503 Service Unavailable` when the backend error is attributed to the circuit breaker.
/// - The `api_accepted` response when authorization forces the request onto the deferred queue.
///
/// # Example
/// ```http
/// DELETE /gateway/delete
/// X-Athena-Client: custom_supabase
/// X-Company-Id: comp-1
/// X-Organization-Id: org-1
/// Content-Type: application/json
///
/// {
///   "table_name": "users",
///   "resource_id": "user-123"
/// }
/// ```
pub async fn delete_data(
    req: HttpRequest,
    body: Json<GatewayDeleteRequest>,
    app_state: Data<AppState>,
) -> HttpResponse {
    // Unwrap the extractor so the shared implementation receives the plain request struct.
    let payload: GatewayDeleteRequest = body.into_inner();
    handle_delete_data(req, payload, app_state).await
}

/// Produces the table name used to look up a table's resource-id key.
///
/// The input is trimmed first. When `allow_schema_names_prefixed_as_table_name` is `true` and
/// `resolve_information_schema_targets` resolves the name into a `(schema, table)` pair whose
/// schema is `public` (ASCII case-insensitive), only the table part is returned. In every other
/// case — blank input, the flag disabled, a failed resolution, or a non-`public` schema — the
/// trimmed input is returned unchanged.
fn normalize_table_name_for_resource_id_lookup(
    table_name: &str,
    allow_schema_names_prefixed_as_table_name: bool,
) -> String {
    let candidate = table_name.trim();

    // Blank names and the "prefixes not allowed" mode both bypass schema resolution.
    if candidate.is_empty() || !allow_schema_names_prefixed_as_table_name {
        return candidate.to_string();
    }

    if let Ok((schema, table)) = resolve_information_schema_targets(candidate, true) {
        if schema.eq_ignore_ascii_case("public") {
            return table;
        }
    }

    candidate.to_string()
}

/// Core implementation of the gateway delete operation.
///
/// The request passes through the stages below and returns on the first one that produces a
/// response:
/// 1. Resolve the client name from the request and authorize it for the delete right on
///    `body.table_name`. If authorization yields a response, it is returned as-is.
/// 2. If authorization forces the deferred queue, validate the payload, enqueue the request and
///    answer with `api_accepted` instead of executing the delete.
/// 3. Require the `X-Company-Id` and `X-Organization-Id` headers, then a non-blank `table_name`
///    and `resource_id`.
/// 4. Execute the delete on the first backend available for the client: a registered Postgres
///    pool, then the health-aware client, then the Supabase client from `client_router`.
/// 5. On success: invalidate the scoped gateway cache, publish a `DELETE` event when requested,
///    log the operation, dispatch gateway webhooks and return `200 OK`.
///    On failure: log the operation and return `503` when the error message mentions the circuit
///    breaker, `500` otherwise.
///
/// # Parameters
/// - `req`: Incoming request; supplies the headers, the path and the logging context.
/// - `body`: Delete payload with `table_name` and `resource_id`.
/// - `app_state`: Shared application state (Postgres registry, gateway flags, metrics).
///
/// # Returns
/// The `HttpResponse` to send to the caller; this function never returns an error type.
pub(crate) async fn handle_delete_data(
    req: HttpRequest,
    body: GatewayDeleteRequest,
    app_state: Data<AppState>,
) -> HttpResponse {
    let operation_start: Instant = Instant::now();
    let client_name: String = x_athena_client(&req);
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![delete_right_for_resource(Some(&body.table_name))],
    )
    .await;
    // The request is logged before the authorization outcome is inspected, so rejected requests
    // are logged as well.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }

    // Deferred mode: the delete is queued rather than executed in this request.
    if auth.force_deferred_queue {
        if body.table_name.trim().is_empty() || body.resource_id.trim().is_empty() {
            return gateway_bad_request(
                GatewayOperationKind::Delete,
                "Invalid delete payload",
                "table_name and resource_id are required",
            );
        }
        // A missing or unparsable Content-Length is passed on as `None`.
        let request_bytes: Option<u64> = req
            .headers()
            .get(actix_web::http::header::CONTENT_LENGTH)
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.parse::<u64>().ok());
        // Fall back to a hand-built object if the payload cannot be serialized.
        let request_body = serde_json::to_value(&body).unwrap_or_else(|_| {
            json!({
                "table_name": body.table_name,
                "resource_id": body.resource_id
            })
        });
        let deferred_request = GatewayDeferredRequest::for_request_body(
            GATEWAY_DEFERRED_KIND_DELETE,
            auth.request_id.clone(),
            client_name.clone(),
            request_body,
        )
        .with_reason(auth.force_deferred_reason.clone())
        .with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
        if let Err(err) = enqueue_gateway_deferred_request(
            app_state.get_ref(),
            "DELETE",
            req.path(),
            request_bytes,
            &deferred_request,
        )
        .await
        {
            return gateway_service_unavailable(
                GatewayOperationKind::Delete,
                "Deferred queue unavailable",
                format!("Failed to queue deferred delete request: {err}"),
            );
        }
        return api_accepted(
            "Delete request queued for deferred execution (auth fallback mode)",
            json!({
                "request_id": auth.request_id,
                "status": "queued",
                "route": req.path(),
            }),
        );
    }

    // The user id is optional and currently unused by the delete itself; the header is still
    // read so the call sequence matches the other header lookups.
    let _user_id: Option<String> = get_x_user_id(&req);

    // The company id is needed later as the target of the optional audit event.
    let company_id: String = match get_x_company_id(&req) {
        Some(id) => id,
        None => {
            return gateway_bad_request(
                GatewayOperationKind::Delete,
                "Missing required header",
                "X-Company-Id header not found in the request",
            );
        }
    };

    // The organization id is only checked for presence; its value is not used here.
    if get_x_organization_id(&req).is_none() {
        return gateway_bad_request(
            GatewayOperationKind::Delete,
            "Missing required header",
            "X-Organization-Id header not found in the request",
        );
    }

    let table_name: String = body.table_name.trim().to_string();
    // NOTE(review): `resource_id` is validated after trimming but forwarded untrimmed — confirm
    // that surrounding whitespace in identifiers is meant to reach the backend.
    let resource_id: String = body.resource_id.clone();
    if table_name.is_empty() {
        return gateway_bad_request(
            GatewayOperationKind::Delete,
            "Invalid delete payload",
            "table_name is required",
        );
    }
    if resource_id.trim().is_empty() {
        return gateway_bad_request(
            GatewayOperationKind::Delete,
            "Invalid delete payload",
            "resource_id is required",
        );
    }

    // Backend selection, in priority order: Postgres pool, health-aware client, Supabase client.
    let result: Result<(), String> = if let Some(pool) =
        app_state.pg_registry.get_pool(&client_name)
    {
        // The id column is looked up by the normalized table name, while the delete itself
        // targets the table name as supplied (trimmed).
        let id_lookup_table: String = normalize_table_name_for_resource_id_lookup(
            &table_name,
            app_state.gateway_allow_schema_names_prefixed_as_table_name,
        );
        let resource_id_key: String = get_resource_id_key(&id_lookup_table).await;
        let conditions = vec![
            Condition::eq(resource_id_key, resource_id.clone())
                .with_uuid_value_text_cast(app_state.gateway_auto_cast_uuid_filter_values_to_text),
        ];
        delete_rows(&pool, &table_name, &conditions)
            .await
            .map(|_| ())
            .map_err(|e| e.to_string())
    } else if let Ok(health_client) = client_router_health_aware(&client_name) {
        health_client
            .delete(&table_name, &resource_id)
            .await
            .map_err(|e| {
                // A `HostOffline` error is rewritten into the message that the error branch
                // below recognises (by its "circuit breaker" text) as "backend unavailable".
                if e.downcast_ref::<crate::drivers::scylla::health::HostOffline>()
                    .is_some()
                {
                    format!(
                        "Backend {} temporarily unavailable (circuit breaker)",
                        client_name
                    )
                } else {
                    e.to_string()
                }
            })
    } else {
        let client: SupabaseClient = match client_router(&client_name).await {
            Ok(c) => c,
            Err(err) => {
                return gateway_bad_request(
                    GatewayOperationKind::Delete,
                    "Failed to resolve Supabase client",
                    err,
                );
            }
        };
        client.delete(&table_name, &resource_id).await
    };

    match result {
        Ok(_) => {
            // Cache invalidation is best-effort: its outcome does not affect the response.
            let _ =
                invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;

            if get_x_publish_event(&req) {
                let event: serde_json::Value = json!({
                    "event": "DELETE",
                    "resource": table_name,
                    "resource_id": resource_id,
                });
                post_event(company_id, event).await;
            }

            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "delete",
                Some(&table_name),
                operation_start.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "resource_id": resource_id,
                })),
            );

            let delete_response: serde_json::Value = json!({
                "status": "success",
                "success": true,
                "message": "Data deleted successfully",
                "table_name": table_name,
                "resource_id": resource_id,
                "client": client_name,
            });
            // The webhook receives the original request payload alongside the response body.
            let delete_payload: serde_json::Value =
                serde_json::to_value(&body).unwrap_or_else(|_| {
                    json!({
                        "table_name": body.table_name,
                        "resource_id": body.resource_id,
                    })
                });
            crate::webhooks::spawn_gateway_webhook_dispatch(
                app_state.clone(),
                crate::webhooks::gateway_webhook_trigger_from_http(
                    &req,
                    &client_name,
                    crate::webhooks::ROUTE_GATEWAY_DELETE,
                    Some(table_name.clone()),
                    Some(logged_request.request_id.clone()),
                    Some(delete_payload),
                    Some(delete_response.clone()),
                ),
            );

            HttpResponse::Ok().json(delete_response)
        }
        Err(err) => {
            // Unavailability is detected from the error text, so any backend error mentioning
            // "circuit breaker" is reported as 503 and counted in the metrics.
            let is_unavailable = err.contains("circuit breaker");
            if is_unavailable {
                app_state
                    .metrics_state
                    .record_gateway_backend_unavailable("delete", &client_name);
            }
            let status = if is_unavailable {
                StatusCode::SERVICE_UNAVAILABLE
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "delete",
                Some(&table_name),
                operation_start.elapsed().as_millis(),
                status,
                Some(json!({
                    "error": err,
                })),
            );

            gateway_error_with_details(
                status,
                GatewayOperationKind::Delete,
                "Failed to delete data",
                err,
                Some(json!({
                    "table_name": table_name,
                    "resource_id": resource_id,
                })),
            )
        }
    }
}

#[cfg(test)]
mod tests {
    use super::normalize_table_name_for_resource_id_lookup as normalize;

    /// Schema-qualified table name used as the input of both tests.
    const QUALIFIED_NAME: &str = "public.forms_blocks_summary_sections";

    /// With schema-prefixed names allowed, the `public.` qualifier is removed.
    #[test]
    fn strips_public_prefix_when_enabled() {
        let looked_up = normalize(QUALIFIED_NAME, true);
        assert_eq!(looked_up, "forms_blocks_summary_sections");
    }

    /// With schema-prefixed names disallowed, the input is returned untouched.
    #[test]
    fn keeps_qualified_name_when_disabled() {
        let looked_up = normalize(QUALIFIED_NAME, false);
        assert_eq!(looked_up, "public.forms_blocks_summary_sections");
    }
}