use actix_web::http::StatusCode;
use actix_web::web::Data;
use chrono::Utc;
use serde_json::{Map, Value, json};
use std::time::Instant;
use tracing::{error, info, warn};
use crate::AppState;
use crate::api::gateway::lifecycle::log_gateway_operation_result;
use crate::data::events::post_event;
use crate::utils::request_logging::LoggedRequest;
use super::backend::{InsertDriverError, postgres_insert_with_timeout};
use super::config::insert_db_timeout_ms;
use super::dedupe::{build_insert_duplicate_signature, lookup_recent_unique_violation};
use super::error::{
InsertError, WindowInsertOutcome, build_error_metadata, log_gateway_insert_success,
observe_insert_error, prefilter_unique_violation_error, trace_insert_error,
};
use super::window::WindowInsertJob;
pub(crate) fn insert_request_has_update_body(body: &Value) -> bool {
body.get("update_body").is_some()
}
/// Decides whether the value returned by an insert warrants a cache invalidation.
///
/// Returns `false` for `null` and for an empty object; any other value
/// (non-empty object, array, string, number, boolean) returns `true`.
pub(crate) fn should_invalidate_cache_after_insert(inserted_row: &Value) -> bool {
    if let Value::Object(fields) = inserted_row {
        return !fields.is_empty();
    }
    !inserted_row.is_null()
}
fn build_diff_resource(body: &Value, insert_body: &Value, user_id: &str) -> Value {
let mut diff_resource = json!({});
if let (Some(update_body), Some(insert_body_obj)) = (
body.get("update_body").and_then(Value::as_object),
insert_body.as_object(),
) {
for (key, new_value) in update_body {
if !new_value.is_null() {
let should_include = match insert_body_obj.get(key) {
Some(existing) => existing != new_value,
None => true,
};
if should_include {
diff_resource[key] = json!({
"blame": { "user_id": user_id },
"new": new_value,
"old": null,
"time": chrono::Utc::now().timestamp(),
"void": false
});
}
}
}
}
diff_resource
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn record_insert_audit_log(
_table_name: String,
_resource_id: String,
_insert_body: Value,
_company_id: String,
_organization_id: String,
_domain: String,
_user_id: String,
_message: String,
_status: String,
_source: String,
_actor_id: String,
_action: String,
_diff_resource: Value,
_meta: Value,
_success: bool,
_path: String,
_client_name: &str,
) -> Result<Value, String> {
Ok(json!({ "audit": true }))
}
pub(crate) async fn finish_postgres_insert_success_json_with_recovery(
app_state: Data<AppState>,
job: &WindowInsertJob,
inserted_row: Value,
) -> WindowInsertOutcome {
let timeout = tokio::time::Duration::from_secs(120);
match tokio::time::timeout(
timeout,
finish_postgres_insert_success_json(app_state.clone(), job, inserted_row),
)
.await
{
Ok(body) => WindowInsertOutcome::Success(body),
Err(_) => {
error!(
trace_id = %job.trace_id,
client = %job.client_name,
table = %job.table_name,
"Response serialization timeout (exceeded 120s) - audit log, webhooks, or cache invalidation likely hung"
);
app_state
.metrics_state
.record_gateway_insert_window_event("response_serialization_timeout");
let mut details = Map::new();
details.insert("table".to_string(), Value::String(job.table_name.clone()));
details.insert("client".to_string(), Value::String(job.client_name.clone()));
details.insert(
"likely_cause".to_string(),
Value::String("audit_log_or_webhook_delay".to_string()),
);
WindowInsertOutcome::Error(InsertError::new(
StatusCode::INTERNAL_SERVER_ERROR,
"response_serialization_timeout",
"Response serialization exceeded maximum time limit (120s); check audit logging, webhooks, and cache invalidation".to_string(),
job.trace_id.clone(),
details,
))
}
}
}
/// Runs the post-insert bookkeeping for a successful insert and returns the
/// JSON success body.
///
/// Steps, in order:
/// 1. build the diff resource from the request body and insert body;
/// 2. derive `resource_id` from the inserted row (falling back to the trace id);
/// 3. call `record_insert_audit_log` and log its outcome and duration;
/// 4. assemble the success payload;
/// 5. when `job.x_publish_event` is set, publish an `INSERT` event via `post_event`;
/// 6. record the gateway operation result;
/// 7. hand a webhook trigger to `spawn_gateway_webhook_dispatch`;
/// 8. log if the whole phase took longer than 5s / 10s.
///
/// An audit-log failure is only logged; it does not change the returned payload.
///
/// # Returns
///
/// A JSON object with the keys `status`, `success`, `message`, `data`,
/// `resource_id` and `table_name`.
///
/// NOTE(review): the slow-phase log message mentions cache invalidation, but no
/// cache-invalidation call is visible in this function — confirm where it happens.
pub(crate) async fn finish_postgres_insert_success_json(
app_state: Data<AppState>,
job: &WindowInsertJob,
inserted_row: Value,
) -> Value {
let response_started = Instant::now();
let diff_resource = build_diff_resource(&job.body, &job.insert_body, &job.user_id);
// Use the string stored under `job.resource_id_key` in the inserted row; fall
// back to the trace id when the row is not an object, or when that field is
// missing or not a string.
let resource_id = inserted_row
.as_object()
.and_then(|object| object.get(&job.resource_id_key))
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| job.trace_id.clone());
let audit_message = format!(
"User {} inserted data into [{}] at {}",
job.user_id,
job.table_name,
Utc::now().timestamp()
);
// Time the audit-log call on its own so a slow audit write can be flagged below.
let audit_log_started = Instant::now();
let audit_log_result = record_insert_audit_log(
job.table_name.clone(),
resource_id.clone(),
job.insert_body.clone(),
job.company_id.clone(),
job.organization_id.clone(),
// NOTE(review): fixed literal passed for the `_domain` parameter — confirm this is intended.
"domain".to_string(),
job.user_id.clone(),
audit_message,
"success".to_string(),
"request".to_string(),
// The same user id is passed again for the `_actor_id` parameter.
job.user_id.clone(),
format!("insert-{}", job.table_name),
diff_resource,
json!({}),
true,
"/data/insert".to_string(),
&job.client_name,
)
.await;
let audit_log_duration = audit_log_started.elapsed();
match &audit_log_result {
Ok(audit_log_data) => {
// The detailed success line is emitted only when verbose logging is on;
// the ANSI-coloured variant is used when `job.ansi_enabled` is set.
if job.verbose_logging {
let audit_log_message = if job.ansi_enabled {
"\u{001b}[32mAudit log inserted successfully\u{001b}[0m (ATHENA_VERBOSE_LOGGING=1)"
} else {
"Audit log inserted successfully (ATHENA_VERBOSE_LOGGING=1)"
};
info!(
trace_id = %job.trace_id,
client = %job.client_name,
table = %job.table_name,
resource_id = %resource_id,
audit_log_duration_ms = audit_log_duration.as_millis(),
audit_payload = ?audit_log_data,
"{}",
audit_log_message
);
}
// The slow-audit warning is only evaluated on the success path.
if audit_log_duration.as_millis() > 5000 {
warn!(
trace_id = %job.trace_id,
client = %job.client_name,
table = %job.table_name,
audit_log_duration_ms = audit_log_duration.as_millis(),
"Slow audit log insertion (>5s)"
);
}
}
Err(err) => {
// Best effort: an audit failure is logged and the insert is still reported as successful.
error!(
trace_id = %job.trace_id,
client = %job.client_name,
table = %job.table_name,
audit_log_duration_ms = audit_log_duration.as_millis(),
error = %err,
"Failed to insert audit log"
);
}
}
let success_json = json!({
"status": "success",
"success": true,
"message": "Data inserted successfully",
"data": inserted_row,
"resource_id": resource_id,
"table_name": job.table_name,
});
// Event publishing is opt-in and additionally requires a resolved company id.
if job.x_publish_event {
if let Some(company_for_event) = job.resolved_company_for_event.clone() {
let event: Value = json!({
"event": "INSERT",
"resource": job.table_name.clone(),
"inserted_by_user": job.user_id.clone(),
"company_id": company_for_event.clone(),
"insert_body": job.insert_body.clone(),
});
post_event(company_for_event, event).await;
} else {
info!(
trace_id = %job.trace_id,
"Skipping publish event because company_id is missing"
);
}
}
// `status_code` and `time` are set to 0 here; the status and the operation
// start are passed separately to `log_gateway_operation_result` below.
let logged_request = LoggedRequest {
request_id: job.logged_request_id.clone(),
client_name: job.logged_client_name.clone(),
method: job.logged_method.clone(),
path: job.logged_path.clone(),
status_code: 0,
time: 0,
};
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&job.table_name),
job.operation_start,
StatusCode::OK,
Some(json!({
"resource_id": resource_id,
"client": job.client_name,
})),
);
// Forward the user id as an `x-user-id` header only when it is non-empty.
let mut wh_headers = Vec::new();
if !job.user_id.is_empty() {
wh_headers.push(("x-user-id".to_string(), job.user_id.clone()));
}
// The trigger carries the original request body as payload and the success
// JSON as response. The call is not awaited here.
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::GatewayWebhookTrigger {
client_name: job.client_name.clone(),
route_key: crate::webhooks::ROUTE_GATEWAY_INSERT.to_string(),
table_name: Some(job.table_name.clone()),
request_id: Some(logged_request.request_id.clone()),
request_method: job.logged_method.clone(),
request_path: job.logged_path.clone(),
headers: wh_headers,
payload: Some(job.body.clone()),
response: Some(success_json.clone()),
},
);
// Tiered slow-path logging for everything since `response_started`:
// warn above 10s, info above 5s, silent otherwise.
let response_duration = response_started.elapsed();
if response_duration.as_millis() > 10000 {
warn!(
trace_id = %job.trace_id,
client = %job.client_name,
table = %job.table_name,
response_serialize_duration_ms = response_duration.as_millis(),
"Slow response serialization phase (>10s) - check audit log, webhooks, cache invalidation"
);
} else if response_duration.as_millis() > 5000 {
info!(
trace_id = %job.trace_id,
client = %job.client_name,
table = %job.table_name,
response_serialize_duration_ms = response_duration.as_millis(),
"Response serialization took >5s"
);
}
success_json
}
pub(crate) async fn run_postgres_insert_to_outcome(
app_state: Data<AppState>,
job: WindowInsertJob,
) -> WindowInsertOutcome {
let trace_id = job.trace_id.clone();
let table_name = job.table_name.clone();
let client_name = job.client_name.clone();
let user_id = job.user_id.clone();
let metadata_user_id = job.metadata_user_id.as_deref();
let metadata_company_id = job.metadata_company_id.as_deref();
let metadata_organization_id = job.metadata_organization_id.as_deref();
let Some(pool) = app_state.pg_registry.get_pool(&client_name) else {
let mut details = build_error_metadata(
&trace_id,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
);
details.insert("unknown_client".to_string(), json!(client_name.clone()));
let insert_error = InsertError::new(
StatusCode::BAD_REQUEST,
"unknown_client",
format!("Postgres client '{}' is not configured", client_name),
trace_id.clone(),
details,
);
observe_insert_error(
app_state.get_ref(),
&client_name,
Some(&table_name),
Some(&job.insert_body),
&insert_error,
);
trace_insert_error(&table_name, &client_name, &user_id, &insert_error);
return WindowInsertOutcome::Error(insert_error);
};
if let Some(signature) =
build_insert_duplicate_signature(&client_name, &table_name, &job.insert_body)
{
let dedupe_started = Instant::now();
if let Some(cached_constraint) = lookup_recent_unique_violation(&signature) {
app_state
.metrics_state
.record_gateway_insert_window_event("gateway_recent_conflict_cache_hit");
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"dedupe_check",
dedupe_started.elapsed().as_secs_f64(),
);
let insert_error = prefilter_unique_violation_error(
trace_id.clone(),
&table_name,
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
cached_constraint.as_deref(),
"recent_unique_violation_cache_window",
);
observe_insert_error(
app_state.get_ref(),
&client_name,
Some(&table_name),
Some(&job.insert_body),
&insert_error,
);
trace_insert_error(&table_name, &client_name, &user_id, &insert_error);
return WindowInsertOutcome::Error(insert_error);
}
app_state
.metrics_state
.record_gateway_insert_window_event("gateway_recent_conflict_cache_miss");
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"dedupe_check",
dedupe_started.elapsed().as_secs_f64(),
);
}
let db_insert_started = Instant::now();
let db_timeout_ms = insert_db_timeout_ms(app_state.get_ref());
let insert_result: Result<Value, InsertDriverError> =
postgres_insert_with_timeout(&pool, &table_name, &job.insert_body, db_timeout_ms).await;
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"db_insert",
db_insert_started.elapsed().as_secs_f64(),
);
match insert_result {
Ok(inserted_row) => {
log_gateway_insert_success(
"athena",
&trace_id,
&client_name,
&table_name,
job.operation_start.elapsed().as_millis(),
db_insert_started.elapsed().as_millis() as u64,
);
finish_postgres_insert_success_json_with_recovery(app_state, &job, inserted_row).await
}
Err(driver_error) => {
let insert_error = InsertError::from_driver(
driver_error,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
trace_id,
);
observe_insert_error(
app_state.get_ref(),
&client_name,
Some(&table_name),
Some(&job.insert_body),
&insert_error,
);
trace_insert_error(&table_name, &client_name, &user_id, &insert_error);
WindowInsertOutcome::Error(insert_error)
}
}
}