use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
use serde_json::{Value, json};
use std::time::Instant;
use tracing::warn;
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{
GatewayAuthOptions, GatewayAuthOutcome, authorize_gateway_request_with_options,
};
use crate::api::gateway::contracts::{GatewayDeferredRequest, GatewayOperationKind};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::{
GATEWAY_ERROR_CODE_DEFERRED_QUEUE_UNAVAILABLE, gateway_service_unavailable_with_code,
};
use crate::api::response::api_accepted;
use crate::data::events::post_event;
use crate::data::outbox::{OutboxEventInsert, insert_outbox_event_tx};
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
pub(crate) struct AuthorizedGatewayRequest {
pub(crate) auth: GatewayAuthOutcome,
pub(crate) logged_request: LoggedRequest,
}
pub(crate) struct MutationPublishEvent {
pub(crate) company_id: String,
pub(crate) payload: Value,
}
pub(crate) struct MutationSuccessEffects {
pub(crate) operation: GatewayOperationKind,
pub(crate) log_action: &'static str,
pub(crate) route_key: &'static str,
pub(crate) table_name: String,
pub(crate) log_details: Option<Value>,
pub(crate) request_payload: Option<Value>,
pub(crate) response_payload: Value,
pub(crate) invalidate_cache: bool,
pub(crate) publish_event: Option<MutationPublishEvent>,
}
pub(crate) async fn authorize_and_log_gateway_request(
req: &HttpRequest,
app_state: &AppState,
client_name: Option<&str>,
required_rights: Vec<String>,
) -> Result<AuthorizedGatewayRequest, HttpResponse> {
authorize_and_log_gateway_request_with_options(
req,
app_state,
client_name,
required_rights,
GatewayAuthOptions::default(),
)
.await
}
pub(crate) async fn authorize_and_log_gateway_request_with_options(
req: &HttpRequest,
app_state: &AppState,
client_name: Option<&str>,
required_rights: Vec<String>,
options: GatewayAuthOptions,
) -> Result<AuthorizedGatewayRequest, HttpResponse> {
let auth = authorize_gateway_request_with_options(
req,
app_state,
client_name,
required_rights,
options,
)
.await;
let logged_request = log_request(
req.clone(),
Some(app_state),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(response) = auth.response {
return Err(response);
}
Ok(AuthorizedGatewayRequest {
auth,
logged_request,
})
}
pub(crate) fn log_gateway_operation_result(
app_state: Option<&AppState>,
logged_request: &LoggedRequest,
action: &str,
table_name: Option<&str>,
operation_start: Instant,
status: StatusCode,
details: Option<Value>,
) {
log_operation_event(
app_state,
logged_request,
action,
table_name,
operation_start.elapsed().as_millis(),
status,
details,
);
}
pub(crate) async fn enqueue_gateway_deferred_response(
req: &HttpRequest,
app_state: &AppState,
auth: &GatewayAuthOutcome,
client_name: &str,
method: &str,
deferred_request: &GatewayDeferredRequest,
operation: GatewayOperationKind,
accepted_message: &str,
) -> HttpResponse {
let request_bytes = req
.headers()
.get(actix_web::http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
if let Err(err) = enqueue_gateway_deferred_request(
app_state,
method,
req.path(),
request_bytes,
deferred_request,
)
.await
{
return gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_DEFERRED_QUEUE_UNAVAILABLE,
operation,
"Deferred queue unavailable",
format!(
"Failed to queue deferred {} request: {err}",
operation.as_str()
),
);
}
api_accepted(
accepted_message,
serde_json::json!({
"request_id": auth.request_id,
"status": "queued",
"route": req.path(),
"client": client_name,
}),
)
}
pub(crate) async fn finalize_gateway_mutation_success(
req: &HttpRequest,
app_state: Data<AppState>,
client_name: &str,
logged_request: &LoggedRequest,
operation_start: Instant,
effects: MutationSuccessEffects,
) -> Value {
if effects.invalidate_cache {
let _ =
invalidate_scoped_gateway_cache(app_state.clone(), client_name, &effects.table_name)
.await;
}
if let Some(ref event) = effects.publish_event {
post_event(event.company_id.clone(), event.payload.clone()).await;
}
log_gateway_operation_result(
Some(app_state.get_ref()),
logged_request,
effects.log_action,
Some(&effects.table_name),
operation_start,
StatusCode::OK,
effects.log_details,
);
let trigger = crate::webhooks::gateway_webhook_trigger_from_http(
req,
client_name,
effects.route_key,
Some(effects.table_name.clone()),
Some(logged_request.request_id.clone()),
effects.request_payload.clone(),
Some(effects.response_payload.clone()),
);
write_mutation_outbox_shadow(
&app_state,
client_name,
&effects.table_name,
effects.operation,
logged_request,
&effects.publish_event,
&trigger,
&effects.response_payload,
)
.await;
crate::webhooks::spawn_gateway_webhook_dispatch(app_state, trigger);
effects.response_payload
}
async fn write_mutation_outbox_shadow(
app_state: &AppState,
client_name: &str,
table_name: &str,
operation: GatewayOperationKind,
logged_request: &LoggedRequest,
publish_event: &Option<MutationPublishEvent>,
trigger: &crate::webhooks::GatewayWebhookTrigger,
response_payload: &Value,
) {
let Some(logging_client) = app_state.logging_client_name.as_ref() else {
return;
};
let Some(pool) = app_state.pg_registry.get_pool(logging_client) else {
return;
};
let headers = json!({
"client_name": client_name,
"request_id": logged_request.request_id,
"company_id": publish_event.as_ref().map(|e| e.company_id.as_str()).unwrap_or(""),
});
let wh_payload = json!({
"route_key": trigger.route_key,
"table_name": trigger.table_name,
"request_method": trigger.request_method,
"request_path": trigger.request_path,
"request_payload": trigger.payload,
"response_payload": response_payload,
"headers": trigger.headers,
});
let mut tx = match pool.begin().await {
Ok(t) => t,
Err(err) => {
warn!(
client = %client_name,
error = %err,
"Outbox shadow: failed to begin transaction"
);
return;
}
};
if let Some(evt) = publish_event {
let event_type = match operation {
GatewayOperationKind::Insert => "mutation.insert",
GatewayOperationKind::Update => "mutation.update",
GatewayOperationKind::Delete => "mutation.delete",
_ => "mutation.other",
};
let insert = OutboxEventInsert {
aggregate_type: "gateway".into(),
aggregate_id: table_name.to_string(),
event_type: event_type.to_string(),
payload: evt.payload.clone(),
headers: headers.clone(),
available_at: None,
};
if let Err(err) = insert_outbox_event_tx(&mut tx, insert).await {
warn!(client = %client_name, error = %err, "Outbox shadow: mutation event insert failed");
}
}
let wh_insert = OutboxEventInsert {
aggregate_type: "gateway".into(),
aggregate_id: table_name.to_string(),
event_type: "webhook.trigger".to_string(),
payload: wh_payload,
headers,
available_at: None,
};
if let Err(err) = insert_outbox_event_tx(&mut tx, wh_insert).await {
warn!(client = %client_name, error = %err, "Outbox shadow: webhook event insert failed");
}
if let Err(err) = tx.commit().await {
warn!(client = %client_name, error = %err, "Outbox shadow: commit failed");
}
}