athena_rs 3.12.0

Hyper performant polyglot Database driver
Documentation
use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
use serde_json::Value;
use std::time::Instant;

use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{GatewayAuthOutcome, authorize_gateway_request};
use crate::api::gateway::contracts::{GatewayDeferredRequest, GatewayOperationKind};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::gateway_service_unavailable;
use crate::api::response::api_accepted;
use crate::data::events::post_event;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

pub(crate) struct AuthorizedGatewayRequest {
    pub(crate) auth: GatewayAuthOutcome,
    pub(crate) logged_request: LoggedRequest,
}

pub(crate) struct MutationPublishEvent {
    pub(crate) company_id: String,
    pub(crate) payload: Value,
}

pub(crate) struct MutationSuccessEffects {
    pub(crate) operation: GatewayOperationKind,
    pub(crate) log_action: &'static str,
    pub(crate) route_key: &'static str,
    pub(crate) table_name: String,
    pub(crate) log_details: Option<Value>,
    pub(crate) request_payload: Option<Value>,
    pub(crate) response_payload: Value,
    pub(crate) invalidate_cache: bool,
    pub(crate) publish_event: Option<MutationPublishEvent>,
}

pub(crate) async fn authorize_and_log_gateway_request(
    req: &HttpRequest,
    app_state: &AppState,
    client_name: Option<&str>,
    required_rights: Vec<String>,
) -> Result<AuthorizedGatewayRequest, HttpResponse> {
    let auth = authorize_gateway_request(req, app_state, client_name, required_rights).await;
    let logged_request = log_request(
        req.clone(),
        Some(app_state),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(response) = auth.response {
        return Err(response);
    }
    Ok(AuthorizedGatewayRequest {
        auth,
        logged_request,
    })
}

pub(crate) fn log_gateway_operation_result(
    app_state: Option<&AppState>,
    logged_request: &LoggedRequest,
    action: &str,
    table_name: Option<&str>,
    operation_start: Instant,
    status: StatusCode,
    details: Option<Value>,
) {
    log_operation_event(
        app_state,
        logged_request,
        action,
        table_name,
        operation_start.elapsed().as_millis(),
        status,
        details,
    );
}

pub(crate) async fn enqueue_gateway_deferred_response(
    req: &HttpRequest,
    app_state: &AppState,
    auth: &GatewayAuthOutcome,
    client_name: &str,
    method: &str,
    deferred_request: &GatewayDeferredRequest,
    operation: GatewayOperationKind,
    accepted_message: &str,
) -> HttpResponse {
    let request_bytes = req
        .headers()
        .get(actix_web::http::header::CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.parse::<u64>().ok());
    if let Err(err) = enqueue_gateway_deferred_request(
        app_state,
        method,
        req.path(),
        request_bytes,
        deferred_request,
    )
    .await
    {
        return gateway_service_unavailable(
            operation,
            "Deferred queue unavailable",
            format!(
                "Failed to queue deferred {} request: {err}",
                operation.as_str()
            ),
        );
    }
    api_accepted(
        accepted_message,
        serde_json::json!({
            "request_id": auth.request_id,
            "status": "queued",
            "route": req.path(),
            "client": client_name,
        }),
    )
}

pub(crate) async fn finalize_gateway_mutation_success(
    req: &HttpRequest,
    app_state: Data<AppState>,
    client_name: &str,
    logged_request: &LoggedRequest,
    operation_start: Instant,
    effects: MutationSuccessEffects,
) -> Value {
    if effects.invalidate_cache {
        let _ =
            invalidate_scoped_gateway_cache(app_state.clone(), client_name, &effects.table_name)
                .await;
    }

    if let Some(event) = effects.publish_event {
        post_event(event.company_id, event.payload).await;
    }

    log_gateway_operation_result(
        Some(app_state.get_ref()),
        logged_request,
        effects.log_action,
        Some(&effects.table_name),
        operation_start,
        StatusCode::OK,
        effects.log_details,
    );

    crate::webhooks::spawn_gateway_webhook_dispatch(
        app_state,
        crate::webhooks::gateway_webhook_trigger_from_http(
            req,
            client_name,
            effects.route_key,
            Some(effects.table_name),
            Some(logged_request.request_id.clone()),
            effects.request_payload,
            Some(effects.response_payload.clone()),
        ),
    );

    effects.response_payload
}