use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
use serde_json::Value;
use std::time::Instant;
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{GatewayAuthOutcome, authorize_gateway_request};
use crate::api::gateway::contracts::{GatewayDeferredRequest, GatewayOperationKind};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::gateway_service_unavailable;
use crate::api::response::api_accepted;
use crate::data::events::post_event;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
/// Success value of `authorize_and_log_gateway_request`: the authorization
/// outcome together with the log handle created for the same request.
pub(crate) struct AuthorizedGatewayRequest {
/// Outcome returned by `authorize_gateway_request` for this request.
pub(crate) auth: GatewayAuthOutcome,
/// Handle returned by `log_request`; later passed to the operation logger.
pub(crate) logged_request: LoggedRequest,
}
/// Event to publish once a mutation has succeeded.
///
/// `finalize_gateway_mutation_success` forwards both fields, in this order,
/// to `post_event`.
pub(crate) struct MutationPublishEvent {
/// First argument handed to `post_event`.
pub(crate) company_id: String,
/// JSON payload handed to `post_event` as its second argument.
pub(crate) payload: Value,
}
/// Describes the follow-up work `finalize_gateway_mutation_success` performs
/// after a mutation has succeeded (cache invalidation, event publishing,
/// operation logging and webhook dispatch).
pub(crate) struct MutationSuccessEffects {
/// Kind of gateway operation this describes.
/// NOTE(review): not read by the finalizer in this part of the file — confirm
/// whether other callers rely on it.
pub(crate) operation: GatewayOperationKind,
/// Action label written to the operation log entry.
pub(crate) log_action: &'static str,
/// Route key forwarded to the webhook trigger builder.
pub(crate) route_key: &'static str,
/// Table name used for scoped cache invalidation, the operation log entry
/// and the webhook trigger.
pub(crate) table_name: String,
/// Optional extra details attached to the operation log entry.
pub(crate) log_details: Option<Value>,
/// Optional request payload forwarded to the webhook trigger.
pub(crate) request_payload: Option<Value>,
/// Payload returned to the caller; a clone is forwarded to the webhook trigger.
pub(crate) response_payload: Value,
/// When `true`, the scoped gateway cache is invalidated for the client and table.
pub(crate) invalidate_cache: bool,
/// When `Some`, the event is posted via `post_event`.
pub(crate) publish_event: Option<MutationPublishEvent>,
}
/// Authorizes a gateway request and registers it with the request logger.
///
/// `log_request` is called before the authorization outcome is inspected, so
/// it runs for rejected requests as well. The log call receives the request
/// id and log context carried by the authorization outcome.
///
/// # Errors
///
/// Returns the `HttpResponse` stored in the authorization outcome when one is
/// present.
pub(crate) async fn authorize_and_log_gateway_request(
    req: &HttpRequest,
    app_state: &AppState,
    client_name: Option<&str>,
    required_rights: Vec<String>,
) -> Result<AuthorizedGatewayRequest, HttpResponse> {
    let auth = authorize_gateway_request(req, app_state, client_name, required_rights).await;

    let request_id = auth.request_id.clone();
    let logged_request = log_request(
        req.clone(),
        Some(app_state),
        Some(request_id),
        Some(&auth.log_context),
    );

    // A response on the outcome short-circuits the request; otherwise hand
    // the outcome and the log handle back to the caller.
    match auth.response {
        Some(rejection) => Err(rejection),
        None => Ok(AuthorizedGatewayRequest {
            auth,
            logged_request,
        }),
    }
}
/// Writes an operation log entry for a gateway operation.
///
/// The duration reported to `log_operation_event` is the time elapsed since
/// `operation_start`, in milliseconds, sampled when this function is called.
/// All other arguments are forwarded unchanged.
pub(crate) fn log_gateway_operation_result(
    app_state: Option<&AppState>,
    logged_request: &LoggedRequest,
    action: &str,
    table_name: Option<&str>,
    operation_start: Instant,
    status: StatusCode,
    details: Option<Value>,
) {
    let elapsed_ms = operation_start.elapsed().as_millis();

    log_operation_event(
        app_state,
        logged_request,
        action,
        table_name,
        elapsed_ms,
        status,
        details,
    );
}
/// Queues a deferred gateway request and builds the HTTP response for it.
///
/// The request size passed to the queue is taken from the `Content-Length`
/// header; it is `None` when the header is absent or cannot be read as a
/// `u64`.
///
/// Returns the "accepted" response (request id, `"queued"` status, route and
/// client name) when enqueueing succeeds, or the service-unavailable response
/// for `operation` when it fails.
pub(crate) async fn enqueue_gateway_deferred_response(
    req: &HttpRequest,
    app_state: &AppState,
    auth: &GatewayAuthOutcome,
    client_name: &str,
    method: &str,
    deferred_request: &GatewayDeferredRequest,
    operation: GatewayOperationKind,
    accepted_message: &str,
) -> HttpResponse {
    let route = req.path();

    let request_bytes = req
        .headers()
        .get(actix_web::http::header::CONTENT_LENGTH)
        .and_then(|raw| raw.to_str().ok()?.parse::<u64>().ok());

    let enqueue_failure =
        enqueue_gateway_deferred_request(app_state, method, route, request_bytes, deferred_request)
            .await
            .err();

    if let Some(err) = enqueue_failure {
        let detail = format!(
            "Failed to queue deferred {} request: {err}",
            operation.as_str()
        );
        return gateway_service_unavailable(operation, "Deferred queue unavailable", detail);
    }

    let body = serde_json::json!({
        "request_id": auth.request_id,
        "status": "queued",
        "route": route,
        "client": client_name,
    });
    api_accepted(accepted_message, body)
}
pub(crate) async fn finalize_gateway_mutation_success(
req: &HttpRequest,
app_state: Data<AppState>,
client_name: &str,
logged_request: &LoggedRequest,
operation_start: Instant,
effects: MutationSuccessEffects,
) -> Value {
if effects.invalidate_cache {
let _ =
invalidate_scoped_gateway_cache(app_state.clone(), client_name, &effects.table_name)
.await;
}
if let Some(event) = effects.publish_event {
post_event(event.company_id, event.payload).await;
}
log_gateway_operation_result(
Some(app_state.get_ref()),
logged_request,
effects.log_action,
Some(&effects.table_name),
operation_start,
StatusCode::OK,
effects.log_details,
);
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state,
crate::webhooks::gateway_webhook_trigger_from_http(
req,
client_name,
effects.route_key,
Some(effects.table_name),
Some(logged_request.request_id.clone()),
effects.request_payload,
Some(effects.response_payload.clone()),
),
);
effects.response_payload
}