athena_rs 3.22.1

Hyper-performant polyglot database driver
Documentation
use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::warn;

use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{
    GatewayAuthOptions, GatewayAuthOutcome, authorize_gateway_request_with_options,
};
use crate::api::gateway::contracts::{GatewayDeferredRequest, GatewayOperationKind};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::{
    GATEWAY_ERROR_CODE_DEFERRED_QUEUE_UNAVAILABLE, gateway_service_unavailable_with_code,
};
use crate::api::response::api_accepted;
use crate::data::events::post_event;
use crate::data::outbox::{OutboxEventInsert, insert_outbox_event_tx};
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

/// Successful result of `authorize_and_log_gateway_request_with_options`:
/// the authorization outcome together with the request-log handle produced
/// while authorizing.
pub(crate) struct AuthorizedGatewayRequest {
    /// Outcome returned by `authorize_gateway_request_with_options`.
    pub(crate) auth: GatewayAuthOutcome,
    /// Handle returned by `log_request` for this request.
    pub(crate) logged_request: LoggedRequest,
}

/// Event to publish after a successful gateway mutation.
///
/// Both fields are handed to `post_event`; the payload is also stored as the
/// outbox mutation event payload and the company id is copied into the outbox
/// event headers.
pub(crate) struct MutationPublishEvent {
    /// Company identifier passed to `post_event` and recorded as the
    /// `company_id` outbox header.
    pub(crate) company_id: String,
    /// JSON body of the event.
    pub(crate) payload: Value,
}

/// Describes the side effects `finalize_gateway_mutation_success` performs
/// once a gateway mutation has succeeded.
pub(crate) struct MutationSuccessEffects {
    /// Kind of mutation; selects the outbox `event_type`
    /// (`mutation.insert` / `mutation.update` / `mutation.delete` / `mutation.other`).
    pub(crate) operation: GatewayOperationKind,
    /// Action label written to the operation log.
    pub(crate) log_action: &'static str,
    /// Route key forwarded to the webhook trigger.
    pub(crate) route_key: &'static str,
    /// Table the mutation touched; used for cache invalidation, logging, the
    /// webhook trigger and as the outbox `aggregate_id`.
    pub(crate) table_name: String,
    /// Optional extra details attached to the operation log entry.
    pub(crate) log_details: Option<Value>,
    /// Optional request body forwarded to the webhook trigger.
    pub(crate) request_payload: Option<Value>,
    /// Response body: forwarded to the webhook trigger, stored in the outbox
    /// webhook event, and finally returned to the caller.
    pub(crate) response_payload: Value,
    /// When `true`, the scoped gateway cache for `table_name` is invalidated.
    pub(crate) invalidate_cache: bool,
    /// When present, the event is posted via `post_event` and a mutation event
    /// is written to the outbox.
    pub(crate) publish_event: Option<MutationPublishEvent>,
}

/// Authorizes and logs a gateway request using the default
/// [`GatewayAuthOptions`].
///
/// Delegates to [`authorize_and_log_gateway_request_with_options`] and returns
/// its result unchanged: `Err(response)` when the authorization outcome
/// carries a response, `Ok` with the outcome and request-log handle otherwise.
pub(crate) async fn authorize_and_log_gateway_request(
    req: &HttpRequest,
    app_state: &AppState,
    client_name: Option<&str>,
    required_rights: Vec<String>,
) -> Result<AuthorizedGatewayRequest, HttpResponse> {
    let default_options = GatewayAuthOptions::default();
    let authorized = authorize_and_log_gateway_request_with_options(
        req,
        app_state,
        client_name,
        required_rights,
        default_options,
    )
    .await;
    authorized
}

/// Authorizes a gateway request with explicit [`GatewayAuthOptions`] and
/// records it in the request log.
///
/// The request is logged before the rejection check, so it is recorded
/// whether or not authorization succeeded.
///
/// # Errors
///
/// Returns `Err` with the HTTP response carried by the authorization outcome
/// when one is present.
pub(crate) async fn authorize_and_log_gateway_request_with_options(
    req: &HttpRequest,
    app_state: &AppState,
    client_name: Option<&str>,
    required_rights: Vec<String>,
    options: GatewayAuthOptions,
) -> Result<AuthorizedGatewayRequest, HttpResponse> {
    let outcome = authorize_gateway_request_with_options(
        req,
        app_state,
        client_name,
        required_rights,
        options,
    )
    .await;

    let request_id = outcome.request_id.clone();
    let request_log = log_request(
        req.clone(),
        Some(app_state),
        Some(request_id),
        Some(&outcome.log_context),
    );

    // A response on the outcome means the request must not proceed.
    if let Some(rejection) = outcome.response {
        return Err(rejection);
    }

    Ok(AuthorizedGatewayRequest {
        auth: outcome,
        logged_request: request_log,
    })
}

/// Logs the result of a gateway operation via `log_operation_event`.
///
/// The duration reported is the time elapsed since `operation_start`, in
/// milliseconds; every other argument is forwarded unchanged.
pub(crate) fn log_gateway_operation_result(
    app_state: Option<&AppState>,
    logged_request: &LoggedRequest,
    action: &str,
    table_name: Option<&str>,
    operation_start: Instant,
    status: StatusCode,
    details: Option<Value>,
) {
    let elapsed_ms = operation_start.elapsed().as_millis();
    log_operation_event(
        app_state,
        logged_request,
        action,
        table_name,
        elapsed_ms,
        status,
        details,
    );
}

pub(crate) async fn enqueue_gateway_deferred_response(
    req: &HttpRequest,
    app_state: &AppState,
    auth: &GatewayAuthOutcome,
    client_name: &str,
    method: &str,
    deferred_request: &GatewayDeferredRequest,
    operation: GatewayOperationKind,
    accepted_message: &str,
) -> HttpResponse {
    let request_bytes = req
        .headers()
        .get(actix_web::http::header::CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.parse::<u64>().ok());
    if let Err(err) = enqueue_gateway_deferred_request(
        app_state,
        method,
        req.path(),
        request_bytes,
        deferred_request,
    )
    .await
    {
        return gateway_service_unavailable_with_code(
            GATEWAY_ERROR_CODE_DEFERRED_QUEUE_UNAVAILABLE,
            operation,
            "Deferred queue unavailable",
            format!(
                "Failed to queue deferred {} request: {err}",
                operation.as_str()
            ),
        );
    }
    api_accepted(
        accepted_message,
        serde_json::json!({
            "request_id": auth.request_id,
            "status": "queued",
            "route": req.path(),
            "client": client_name,
        }),
    )
}

/// Runs the post-success side effects of a gateway mutation and returns the
/// response payload.
///
/// In order, this:
/// 1. invalidates the scoped gateway cache for the table when
///    `effects.invalidate_cache` is set (the result is deliberately ignored),
/// 2. posts `effects.publish_event` via `post_event` when present,
/// 3. logs the operation with `StatusCode::OK`,
/// 4. builds the webhook trigger for the request,
/// 5. writes the best-effort outbox shadow records,
/// 6. spawns the webhook dispatch.
///
/// Returns `effects.response_payload`.
pub(crate) async fn finalize_gateway_mutation_success(
    req: &HttpRequest,
    app_state: Data<AppState>,
    client_name: &str,
    logged_request: &LoggedRequest,
    operation_start: Instant,
    effects: MutationSuccessEffects,
) -> Value {
    if effects.invalidate_cache {
        let _ =
            invalidate_scoped_gateway_cache(app_state.clone(), client_name, &effects.table_name)
                .await;
    }

    if let Some(event) = &effects.publish_event {
        post_event(event.company_id.clone(), event.payload.clone()).await;
    }

    log_gateway_operation_result(
        Some(app_state.get_ref()),
        logged_request,
        effects.log_action,
        Some(&effects.table_name),
        operation_start,
        StatusCode::OK,
        effects.log_details,
    );

    // `request_payload` is not read again, so it is moved into the trigger
    // rather than cloned. `table_name` and `response_payload` are still needed
    // below and therefore must be cloned.
    let trigger = crate::webhooks::gateway_webhook_trigger_from_http(
        req,
        client_name,
        effects.route_key,
        Some(effects.table_name.clone()),
        Some(logged_request.request_id.clone()),
        effects.request_payload,
        Some(effects.response_payload.clone()),
    );

    // Shadow outbox write: record this side-effect intent in the logging DB so
    // the relay worker can retry on crash/timeout.  Non-fatal — if the logging
    // pool is unavailable the direct dispatch (below) still fires.
    write_mutation_outbox_shadow(
        &app_state,
        client_name,
        &effects.table_name,
        effects.operation,
        logged_request,
        &effects.publish_event,
        &trigger,
        &effects.response_payload,
    )
    .await;

    crate::webhooks::spawn_gateway_webhook_dispatch(app_state, trigger);

    effects.response_payload
}

/// Write a best-effort outbox shadow event to the logging DB.
///
/// Inserts both the CDC mutation event (when `publish_event` is present) and
/// the webhook trigger event in a single transaction.  Failures are warned but
/// do not block the response path — the direct dispatch still fires regardless.
///
/// The write is all-or-nothing: as soon as one insert fails the function
/// returns without attempting the remaining statements or the commit.
///
/// Does nothing when no logging client is configured or its pool is not
/// registered.
async fn write_mutation_outbox_shadow(
    app_state: &AppState,
    client_name: &str,
    table_name: &str,
    operation: GatewayOperationKind,
    logged_request: &LoggedRequest,
    publish_event: &Option<MutationPublishEvent>,
    trigger: &crate::webhooks::GatewayWebhookTrigger,
    response_payload: &Value,
) {
    let Some(logging_client) = app_state.logging_client_name.as_ref() else {
        return;
    };
    let Some(pool) = app_state.pg_registry.get_pool(logging_client) else {
        return;
    };

    // `company_id` is an empty string when there is no publish event.
    let headers = json!({
        "client_name": client_name,
        "request_id": logged_request.request_id,
        "company_id": publish_event.as_ref().map(|e| e.company_id.as_str()).unwrap_or(""),
    });

    let wh_payload = json!({
        "route_key": trigger.route_key,
        "table_name": trigger.table_name,
        "request_method": trigger.request_method,
        "request_path": trigger.request_path,
        "request_payload": trigger.payload,
        "response_payload": response_payload,
        "headers": trigger.headers,
    });

    let mut tx = match pool.begin().await {
        Ok(t) => t,
        Err(err) => {
            warn!(
                client = %client_name,
                error = %err,
                "Outbox shadow: failed to begin transaction"
            );
            return;
        }
    };

    // 1. Optional CDC mutation event.
    if let Some(evt) = publish_event {
        let event_type = match operation {
            GatewayOperationKind::Insert => "mutation.insert",
            GatewayOperationKind::Update => "mutation.update",
            GatewayOperationKind::Delete => "mutation.delete",
            _ => "mutation.other",
        };
        let insert = OutboxEventInsert {
            aggregate_type: "gateway".into(),
            aggregate_id: table_name.to_string(),
            event_type: event_type.to_string(),
            payload: evt.payload.clone(),
            headers: headers.clone(),
            available_at: None,
        };
        if let Err(err) = insert_outbox_event_tx(&mut tx, insert).await {
            warn!(client = %client_name, error = %err, "Outbox shadow: mutation event insert failed");
            // A failed statement leaves a PostgreSQL transaction aborted, so
            // further inserts and the commit cannot succeed. Stop here.
            // NOTE(review): assumes `tx` rolls back when dropped (sqlx-style
            // transaction) — confirm against the pool type.
            return;
        }
    }

    // 2. Webhook trigger event.
    let wh_insert = OutboxEventInsert {
        aggregate_type: "gateway".into(),
        aggregate_id: table_name.to_string(),
        event_type: "webhook.trigger".to_string(),
        payload: wh_payload,
        headers,
        available_at: None,
    };
    if let Err(err) = insert_outbox_event_tx(&mut tx, wh_insert).await {
        warn!(client = %client_name, error = %err, "Outbox shadow: webhook event insert failed");
        // Same reasoning as above: do not attempt to commit an aborted
        // transaction.
        return;
    }

    if let Err(err) = tx.commit().await {
        warn!(client = %client_name, error = %err, "Outbox shadow: commit failed");
    }
}