// athena_rs 3.9.0
//
// Hyper-performant polyglot database driver
// Documentation
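//! Delete gateway helpers that remove records through the configured backend and optionally
//! publish audit events.
//!
//! The `/gateway/delete` route validates headers and the request body, routes the delete to the
//! backend selected by the client name (a Postgres pool, a health-aware client, or a Supabase
//! client), and hands an event to the mutation finalizer when the publish-event header is set.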
use actix_web::{
    HttpRequest, HttpResponse, delete,
    http::StatusCode,
    web::{Data, Json},
};
use serde_json::json;
use std::time::Instant;
use supabase_rs::SupabaseClient;

use crate::AppState;
use crate::api::gateway::auth::delete_right_for_resource;
use crate::api::gateway::contracts::{
    GATEWAY_DEFERRED_KIND_DELETE, GatewayDeferredRequest, GatewayDeleteRequest,
    GatewayOperationKind,
};
use crate::api::gateway::lifecycle::{
    MutationPublishEvent, MutationSuccessEffects, authorize_and_log_gateway_request,
    enqueue_gateway_deferred_response, finalize_gateway_mutation_success,
    log_gateway_operation_result,
};
use crate::api::gateway::response::{gateway_bad_request, gateway_error_with_details};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_company_id::get_x_company_id;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_publish_event::get_x_publish_event;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::drivers::postgresql::column_resolver::resolve_information_schema_targets;
use crate::drivers::postgresql::sqlx_driver::delete_rows;
use crate::drivers::supabase::{client_router, client_router_health_aware};
use crate::parser::query_builder::Condition;

#[delete("/gateway/delete")]
/// HTTP entry point for `DELETE /gateway/delete`.
///
/// Unwraps the JSON payload and forwards the request, the payload, and the shared state to
/// `handle_delete_data`, which performs authorization, validation, and the delete itself.
///
/// # Parameters
/// - `req`: Incoming request; its headers name the backend client and carry caller context.
/// - `body`: JSON payload providing `table_name` and `resource_id`.
/// - `app_state`: Shared application state passed through to the delete handler.
///
/// # Headers
/// - `X-Athena-Client` names the backend client that executes the delete.
/// - `X-Company-Id` and `X-Organization-Id` are required; `X-User-Id` is optional.
/// - NOTE(review): earlier documentation states that the company/organization ids may also be
///   resolved via `X-Athena-Key`, and that optional `x-supabase-url`/`x-supabase-key` headers
///   override the default Supabase endpoint. That logic is not visible in this file — confirm
///   against the header and driver helpers.
///
/// # Returns
/// - `200 OK` with success metadata when the delete succeeds (or is queued for later execution
///   in deferred write mode).
/// - `400 Bad Request` when required headers or body fields are missing, or when the Supabase
///   client cannot be resolved.
/// - `503 Service Unavailable` when the backend reports a circuit-breaker condition.
/// - `500 Internal Server Error` on any other backend failure.
/// - Responses produced by the authorization and deferred-queue helpers are returned as-is.
///
/// # Example
/// ```http
/// DELETE /gateway/delete
/// X-Athena-Client: custom_supabase
/// X-Company-Id: comp-1
/// X-Organization-Id: org-1
/// Content-Type: application/json
///
/// {
///   "table_name": "users",
///   "resource_id": "user-123"
/// }
/// ```
pub async fn delete_data(
    req: HttpRequest,
    body: Json<GatewayDeleteRequest>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let payload = body.into_inner();
    handle_delete_data(req, payload, app_state).await
}

/// Produces the table name used when looking up a table's resource-id column.
///
/// The input is trimmed first. A blank name yields an empty string. When
/// `allow_schema_names_prefixed_as_table_name` is `false`, the trimmed name is returned as-is.
/// When it is `true`, the name is passed to `resolve_information_schema_targets`; if that
/// succeeds and the schema equals `public` (ASCII case-insensitive), only the table part is
/// returned. In every other case the trimmed input is returned unchanged.
fn normalize_table_name_for_resource_id_lookup(
    table_name: &str,
    allow_schema_names_prefixed_as_table_name: bool,
) -> String {
    let candidate = table_name.trim();

    // Blank names and the "feature disabled" case both fall back to the trimmed input.
    if candidate.is_empty() || !allow_schema_names_prefixed_as_table_name {
        return candidate.to_string();
    }

    if let Ok((schema, table)) = resolve_information_schema_targets(candidate, true) {
        if schema.eq_ignore_ascii_case("public") {
            return table;
        }
    }

    // Resolution failed or the schema is not `public`: keep the name as supplied.
    candidate.to_string()
}

/// Executes a gateway delete for an already-parsed request body.
///
/// Steps, in order:
/// 1. Authorize and log the request via `authorize_and_log_gateway_request`, requiring the
///    delete right for the target table. A rejection response is returned unchanged.
/// 2. If the auth result sets `force_deferred_queue`, validate the payload and enqueue the
///    request through `enqueue_gateway_deferred_response` instead of executing it.
/// 3. Require the `X-Company-Id` and `X-Organization-Id` headers and non-blank
///    `table_name` / `resource_id`.
/// 4. Run the delete on the first backend that matches the client name: a pool from the
///    Postgres registry (buffered instead when deferred write mode is enabled and a write
///    buffer exists), a health-aware client, or a Supabase client.
/// 5. On success, delegate logging, cache invalidation, and optional event publishing to
///    `finalize_gateway_mutation_success`; on failure, log the result and build an error
///    response.
///
/// # Parameters
/// - `req`: Incoming request used for header lookups and passed to the lifecycle helpers.
/// - `body`: Parsed delete payload (`table_name`, `resource_id`).
/// - `app_state`: Shared application state (pool registry, write buffer, metrics, flags).
///
/// # Returns
/// - `200 OK` when the delete succeeds or is buffered in deferred write mode.
/// - `400 Bad Request` for missing headers, blank payload fields, or an unresolvable
///   Supabase client.
/// - `503 Service Unavailable` when the backend error message contains `"circuit breaker"`.
/// - `500 Internal Server Error` for any other backend failure.
pub(crate) async fn handle_delete_data(
    req: HttpRequest,
    body: GatewayDeleteRequest,
    app_state: Data<AppState>,
) -> HttpResponse {
    let operation_start: Instant = Instant::now();
    // Borrow the request directly; cloning it only to read a header is unnecessary.
    let client_name: String = x_athena_client(&req);
    let auth_context = match authorize_and_log_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![delete_right_for_resource(Some(&body.table_name))],
    )
    .await
    {
        Ok(context) => context,
        Err(response) => return response,
    };
    let auth = auth_context.auth;
    let logged_request = auth_context.logged_request;

    // Auth fallback mode: the request is queued rather than executed right away.
    if auth.force_deferred_queue {
        if body.table_name.trim().is_empty() || body.resource_id.trim().is_empty() {
            return gateway_bad_request(
                GatewayOperationKind::Delete,
                "Invalid delete payload",
                "table_name and resource_id are required",
            );
        }
        // Fall back to a hand-built payload if the body cannot be serialized.
        // `json!` serializes interpolated values by reference, so no clones are needed.
        let request_body = serde_json::to_value(&body).unwrap_or_else(|_| {
            json!({
                "table_name": body.table_name,
                "resource_id": body.resource_id
            })
        });
        let deferred_request = GatewayDeferredRequest::for_request_body(
            GATEWAY_DEFERRED_KIND_DELETE,
            auth.request_id.clone(),
            client_name.clone(),
            request_body,
        )
        .with_reason(auth.force_deferred_reason.clone())
        .with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
        return enqueue_gateway_deferred_response(
            &req,
            app_state.get_ref(),
            &auth,
            &client_name,
            "DELETE",
            &deferred_request,
            GatewayOperationKind::Delete,
            "Delete request queued for deferred execution (auth fallback mode)",
        )
        .await;
    }

    // `X-User-Id` is optional: it is read here but not otherwise used by this handler.
    let _user_id: Option<String> = get_x_user_id(&req);

    // The company id is required and is later attached to the published event.
    let company_id: String = match get_x_company_id(&req) {
        Some(id) => id,
        None => {
            return gateway_bad_request(
                GatewayOperationKind::Delete,
                "Missing required header",
                "X-Company-Id header not found in the request",
            );
        }
    };

    // The organization id only has to be present; its value is not used below.
    if get_x_organization_id(&req).is_none() {
        return gateway_bad_request(
            GatewayOperationKind::Delete,
            "Missing required header",
            "X-Organization-Id header not found in the request",
        );
    }

    let table_name: String = body.table_name.trim().to_string();
    let resource_id: String = body.resource_id.clone();
    if table_name.is_empty() {
        return gateway_bad_request(
            GatewayOperationKind::Delete,
            "Invalid delete payload",
            "table_name is required",
        );
    }
    if resource_id.trim().is_empty() {
        return gateway_bad_request(
            GatewayOperationKind::Delete,
            "Invalid delete payload",
            "resource_id is required",
        );
    }

    let result: Result<(), String> = if let Some(pool) =
        app_state.pg_registry.get_pool(&client_name)
    {
        let id_lookup_table: String = normalize_table_name_for_resource_id_lookup(
            &table_name,
            app_state.gateway_allow_schema_names_prefixed_as_table_name,
        );
        let resource_id_key: String = get_resource_id_key(&id_lookup_table).await;

        // ------------------------------------------------------------------
        // Deferred write mode: buffer the delete and return optimistic OK.
        // ------------------------------------------------------------------
        if app_state.deferred_write_config.enabled {
            if let Some(ref buf) = app_state.write_buffer {
                let stored_conditions = vec![crate::deferred_write::StoredCondition {
                    column: resource_id_key,
                    value: serde_json::Value::String(resource_id.clone()),
                }];
                let entry = crate::deferred_write::DeferredEntry::new_delete(
                    client_name.clone(),
                    table_name.clone(),
                    stored_conditions,
                );
                // A WAL append failure is only logged; the entry is still buffered.
                if let Some(ref wal) = app_state.wal_manager {
                    if let Err(e) = wal.append(&entry).await {
                        tracing::warn!(
                            error = %e,
                            table = %table_name,
                            "deferred_write: WAL append failed for delete"
                        );
                    }
                }
                buf.push(entry).await;
                return HttpResponse::Ok().json(json!({
                    "status": "success",
                    "success": true,
                    "message": "Delete queued (deferred write mode)",
                    "table_name": table_name,
                    "resource_id": resource_id,
                    "client": client_name,
                }));
            }
        }
        // ------------------------------------------------------------------

        // Built only on the direct-execution path, so the deferred branch above does not pay
        // for a filter it never uses.
        let conditions = vec![
            Condition::eq(resource_id_key, resource_id.clone())
                .with_uuid_value_text_cast(app_state.gateway_auto_cast_uuid_filter_values_to_text),
        ];

        delete_rows(&pool, &table_name, &conditions)
            .await
            .map(|_| ())
            .map_err(|e| e.to_string())
    } else if let Ok(health_client) = client_router_health_aware(&client_name) {
        health_client
            .delete(&table_name, &resource_id)
            .await
            .map_err(|e| {
                // The "circuit breaker" wording is what the error branch below keys on to
                // answer with 503 instead of 500.
                if e.downcast_ref::<crate::drivers::scylla::health::HostOffline>()
                    .is_some()
                {
                    format!(
                        "Backend {} temporarily unavailable (circuit breaker)",
                        client_name
                    )
                } else {
                    e.to_string()
                }
            })
    } else {
        let client: SupabaseClient = match client_router(&client_name).await {
            Ok(c) => c,
            Err(err) => {
                return gateway_bad_request(
                    GatewayOperationKind::Delete,
                    "Failed to resolve Supabase client",
                    err,
                );
            }
        };
        client.delete(&table_name, &resource_id).await
    };

    match result {
        Ok(_) => {
            // An event is attached only when the publish-event header asks for it.
            let publish_event = if get_x_publish_event(&req) {
                Some(MutationPublishEvent {
                    company_id,
                    payload: json!({
                        "event": "DELETE",
                        "resource": table_name,
                        "resource_id": resource_id,
                    }),
                })
            } else {
                None
            };

            let delete_response: serde_json::Value = json!({
                "status": "success",
                "success": true,
                "message": "Data deleted successfully",
                "table_name": table_name,
                "resource_id": resource_id,
                "client": client_name,
            });
            let delete_payload: serde_json::Value =
                serde_json::to_value(&body).unwrap_or_else(|_| {
                    json!({
                        "table_name": body.table_name,
                        "resource_id": body.resource_id,
                    })
                });
            let response_payload = finalize_gateway_mutation_success(
                &req,
                app_state.clone(),
                &client_name,
                &logged_request,
                operation_start,
                MutationSuccessEffects {
                    operation: GatewayOperationKind::Delete,
                    log_action: "delete",
                    route_key: crate::webhooks::ROUTE_GATEWAY_DELETE,
                    table_name,
                    log_details: Some(json!({
                        "resource_id": resource_id,
                    })),
                    request_payload: Some(delete_payload),
                    response_payload: delete_response,
                    invalidate_cache: true,
                    publish_event,
                },
            )
            .await;

            HttpResponse::Ok().json(response_payload)
        }
        Err(err) => {
            // NOTE(review): availability is inferred from the message text, so any backend
            // error containing this phrase is reported as 503.
            let is_unavailable = err.contains("circuit breaker");
            if is_unavailable {
                app_state
                    .metrics_state
                    .record_gateway_backend_unavailable("delete", &client_name);
            }
            let status = if is_unavailable {
                StatusCode::SERVICE_UNAVAILABLE
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };
            log_gateway_operation_result(
                Some(app_state.get_ref()),
                &logged_request,
                "delete",
                Some(&table_name),
                operation_start,
                status,
                Some(json!({
                    "error": err,
                })),
            );

            gateway_error_with_details(
                status,
                GatewayOperationKind::Delete,
                "Failed to delete data",
                err,
                Some(json!({
                    "table_name": table_name,
                    "resource_id": resource_id,
                })),
            )
        }
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for `normalize_table_name_for_resource_id_lookup`.
    use super::normalize_table_name_for_resource_id_lookup;

    /// Schema-qualified table name shared by the prefix-handling tests.
    const QUALIFIED_NAME: &str = "public.forms_blocks_summary_sections";

    /// With the flag enabled, a `public.` schema prefix is removed.
    #[test]
    fn strips_public_prefix_when_enabled() {
        let resolved = normalize_table_name_for_resource_id_lookup(QUALIFIED_NAME, true);
        assert_eq!(resolved, "forms_blocks_summary_sections");
    }

    /// With the flag disabled, the qualified name is passed through untouched.
    #[test]
    fn keeps_qualified_name_when_disabled() {
        let resolved = normalize_table_name_for_resource_id_lookup(QUALIFIED_NAME, false);
        assert_eq!(resolved, QUALIFIED_NAME);
    }

    /// Empty and whitespace-only names collapse to an empty string regardless of the flag.
    #[test]
    fn blank_input_yields_empty_string() {
        for allow_prefix in [false, true] {
            assert_eq!(
                normalize_table_name_for_resource_id_lookup("", allow_prefix),
                ""
            );
            assert_eq!(
                normalize_table_name_for_resource_id_lookup("  \t ", allow_prefix),
                ""
            );
        }
    }

    /// Surrounding whitespace is trimmed even when prefix handling is disabled.
    #[test]
    fn trims_surrounding_whitespace_when_disabled() {
        let resolved = normalize_table_name_for_resource_id_lookup("  users \n", false);
        assert_eq!(resolved, "users");
    }
}