use actix_web::{HttpRequest, HttpResponse, http::StatusCode, put, web::Json};
use chrono::Utc;
use once_cell::sync::Lazy;
use serde_json::{Map, Value, json};
use std::collections::HashMap;
use std::env;
use std::io::IsTerminal;
use std::sync::Mutex as StdMutex;
use std::time::Instant;
use std::time::{Duration as StdDuration, Instant as StdInstant};
use tracing::{error, info, warn};
mod identity;
mod window;
pub use window::{InsertWindowCoordinator, InsertWindowSettings, WindowInsertJob};
use self::identity::InsertIdentity;
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{authorize_gateway_request, write_right_for_resource};
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_INSERT, GatewayDeferredRequest, GatewayInsertRequest,
GatewayOperationKind,
};
#[cfg(feature = "deadpool_experimental")]
use crate::api::gateway::deadpool_timeout::deadpool_checkout_timeout;
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::response::{
gateway_bad_request, gateway_internal_error, gateway_service_unavailable,
};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_athena_insert_window::x_athena_insert_window_ms;
use crate::api::headers::x_publish_event::get_x_publish_event;
use crate::api::response::api_accepted;
use crate::data::events::post_event;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_crud::insert_row_deadpool;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::deadpool_fallback_reason_label;
use crate::drivers::postgresql::sqlx_driver::{PostgresInsertError, insert_row};
use crate::drivers::supabase::{client_router, client_router_health_aware};
#[cfg(feature = "deadpool_experimental")]
use crate::error::tokio_postgres_parser::process_tokio_postgres_db_error;
use crate::utils::request_logging::LoggedRequest;
use crate::utils::request_logging::{log_operation_event, log_request};
use actix_web::web::Data;
#[allow(unused_imports)]
use moka::future::Cache;
use supabase_rs::SupabaseClient;
#[allow(unused_imports)]
use tokio::sync::MutexGuard;
/// Failures surfaced by the Supabase-backed insert path.
#[derive(Debug)]
pub enum SupabaseInsertError {
    /// The requested client name has no configured Supabase client.
    UnknownClient { client_name: String },
    /// The Supabase client could not be constructed/configured.
    ClientInit { message: String },
    /// The insert call itself failed.
    InsertFailed { message: String },
    /// The Supabase backend could not be reached or is unhealthy.
    BackendUnavailable { message: String },
}
/// Backend-agnostic insert failure, unifying Postgres and Supabase errors
/// plus stage-level timeouts so callers can map them to HTTP responses.
#[derive(Debug)]
pub(crate) enum InsertDriverError {
    /// Error from the sqlx/Postgres driver.
    Postgres(PostgresInsertError),
    /// Error from the Supabase driver.
    Supabase(SupabaseInsertError),
    /// A pipeline stage exceeded its deadline before completing.
    StageTimeout {
        // Which backend was being used ("postgres" / "supabase").
        backend: &'static str,
        // Which stage timed out (e.g. "db_insert").
        stage: &'static str,
        // The deadline that was exceeded, in milliseconds.
        timeout_ms: u64,
    },
}
/// Structured, client-facing insert error carrying everything needed to
/// build the JSON error response (status, stable code, message, trace id,
/// and diagnostic details).
#[derive(Debug)]
pub(crate) struct InsertError {
    // HTTP status to respond with.
    status: StatusCode,
    // Stable machine-readable error code (e.g. "unique_violation").
    code: &'static str,
    // Human-readable description of the failure.
    message: String,
    // Correlation id propagated into logs and the response body.
    trace_id: String,
    // JSON object of diagnostic metadata (always Value::Object).
    details: Value,
}
/// Result of a windowed insert job: either the success JSON payload or a
/// structured error ready to be rendered as an HTTP response.
pub(crate) enum WindowInsertOutcome {
    /// Success body to return with HTTP 200.
    Success(Value),
    /// Failure to render via `InsertError::into_response`.
    Error(InsertError),
}
impl WindowInsertOutcome {
    /// Convert the outcome into the HTTP response sent back to the caller:
    /// 200 with the success payload, or the structured error response.
    pub(crate) fn into_http_response(self) -> HttpResponse {
        match self {
            Self::Success(payload) => HttpResponse::Ok().json(payload),
            Self::Error(err) => err.into_response(),
        }
    }
}
pub(crate) fn window_insert_internal_error(message: impl Into<String>) -> WindowInsertOutcome {
WindowInsertOutcome::Error(InsertError::new(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_error",
message,
"internal".to_string(),
Map::new(),
))
}
/// True when the raw request body carries an `update_body` field (used to
/// decide whether an audit diff should be built).
pub(crate) fn insert_request_has_update_body(body: &Value) -> bool {
    matches!(body.get("update_body"), Some(_))
}
pub(crate) fn should_invalidate_cache_after_insert(inserted_row: &Value) -> bool {
match inserted_row {
Value::Null => false,
Value::Object(map) => !map.is_empty(),
_ => true,
}
}
/// Cache entry recording a recently observed unique-constraint violation for
/// a specific insert signature.
#[derive(Clone)]
struct RecentUniqueViolationEntry {
    // Violated constraint name, when it could be extracted from the DB error.
    constraint: Option<String>,
    // When the violation was recorded; checked against the TTL on lookup.
    recorded_at: StdInstant,
}
/// How long a remembered unique violation keeps short-circuiting identical inserts.
const RECENT_UNIQUE_VIOLATION_TTL: StdDuration = StdDuration::from_secs(180);
/// Hard cap on cache size; the stalest entry is evicted once reached.
const RECENT_UNIQUE_VIOLATION_MAX_ENTRIES: usize = 20_000;
/// Process-wide signature -> violation cache guarded by a std mutex.
static RECENT_UNIQUE_VIOLATION_CACHE: Lazy<StdMutex<HashMap<String, RecentUniqueViolationEntry>>> =
    Lazy::new(|| StdMutex::new(HashMap::new()));
/// Read a boolean feature flag from the environment.
///
/// A set variable counts as enabled only for "1"/"true"/"yes"/"on"
/// (case-insensitive, trimmed); any other set value is disabled. An unset
/// variable yields `default`.
fn feature_enabled_from_env(name: &str, default: bool) -> bool {
    match env::var(name) {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            flag == "1" || flag == "true" || flag == "yes" || flag == "on"
        }
        Err(_) => default,
    }
}
/// Read an `f64` from the environment, falling back to `default` when unset
/// or unparsable, then clamp the result into `[min, max]`.
fn bounded_f64_from_env(name: &str, default: f64, min: f64, max: f64) -> f64 {
    let value = env::var(name)
        .ok()
        .as_deref()
        .and_then(|text| text.trim().parse::<f64>().ok())
        .unwrap_or(default);
    value.clamp(min, max)
}
/// Read a `u64` from the environment, falling back to `default` when unset
/// or unparsable, then clamp the result into `[min, max]`.
fn bounded_u64_from_env(name: &str, default: u64, min: u64, max: u64) -> u64 {
    let value = env::var(name)
        .ok()
        .as_deref()
        .and_then(|text| text.trim().parse::<u64>().ok())
        .unwrap_or(default);
    value.clamp(min, max)
}
/// Default insert timeout derived from the gateway resilience setting,
/// converted to milliseconds and clamped to a sane range (250ms..=120s).
fn default_insert_timeout_ms(app_state: &AppState) -> u64 {
    let configured_secs = app_state.gateway_resilience_timeout_secs;
    configured_secs.saturating_mul(1000).clamp(250, 120_000)
}
/// Timeout budget for the DB insert stage; overridable via
/// `ATHENA_INSERT_DB_TIMEOUT_MS` (clamped 100ms..=180s).
fn insert_db_timeout_ms(app_state: &AppState) -> u64 {
    let fallback = default_insert_timeout_ms(app_state);
    bounded_u64_from_env("ATHENA_INSERT_DB_TIMEOUT_MS", fallback, 100, 180_000)
}
/// Total deadline for a windowed insert response: DB timeout + batching
/// window + 250ms slack, overridable via
/// `ATHENA_INSERT_WINDOW_RESPONSE_TIMEOUT_MS` (clamped 250ms..=240s).
fn insert_window_response_timeout_ms(app_state: &AppState, window_ms: u64) -> u64 {
    let fallback = default_insert_timeout_ms(app_state)
        .saturating_add(window_ms)
        .saturating_add(250);
    bounded_u64_from_env(
        "ATHENA_INSERT_WINDOW_RESPONSE_TIMEOUT_MS",
        fallback,
        250,
        240_000,
    )
}
/// Run a Postgres insert with a hard deadline, mapping an elapsed timeout to
/// `InsertDriverError::StageTimeout` and driver failures to `Postgres`.
async fn postgres_insert_with_timeout(
    pool: &sqlx::postgres::PgPool,
    table_name: &str,
    insert_body: &Value,
    timeout_ms: u64,
) -> Result<Value, InsertDriverError> {
    let deadline = tokio::time::Duration::from_millis(timeout_ms);
    let attempt = tokio::time::timeout(deadline, insert_row(pool, table_name, insert_body)).await;
    match attempt {
        Err(_elapsed) => Err(InsertDriverError::StageTimeout {
            backend: "postgres",
            stage: "db_insert",
            timeout_ms,
        }),
        Ok(result) => result.map_err(InsertDriverError::Postgres),
    }
}
/// Run a Supabase insert with a hard deadline, mapping an elapsed timeout to
/// `InsertDriverError::StageTimeout` and driver failures to `Supabase`.
async fn supabase_insert_with_timeout(
    table_name: String,
    insert_body: Value,
    client_name: &str,
    timeout_ms: u64,
) -> Result<(Value, String), InsertDriverError> {
    let deadline = tokio::time::Duration::from_millis(timeout_ms);
    let attempt =
        tokio::time::timeout(deadline, insert(table_name, insert_body, client_name)).await;
    match attempt {
        Err(_elapsed) => Err(InsertDriverError::StageTimeout {
            backend: "supabase",
            stage: "db_insert",
            timeout_ms,
        }),
        Ok(result) => result.map_err(InsertDriverError::Supabase),
    }
}
/// Whether in-window duplicate-insert deduplication is enabled (default: on).
pub(crate) fn insert_window_dedupe_enabled() -> bool {
    feature_enabled_from_env("ATHENA_INSERT_WINDOW_DEDUPE_ENABLED", true)
}
/// Whether the recent unique-violation prefilter cache is enabled (default: on).
pub(crate) fn recent_unique_conflict_cache_enabled() -> bool {
    feature_enabled_from_env("ATHENA_RECENT_UNIQUE_CONFLICT_CACHE_ENABLED", true)
}
/// Whether inserts are deferred to the queue under admission pressure (default: on).
fn insert_admission_defer_enabled() -> bool {
    feature_enabled_from_env("ATHENA_INSERT_ADMISSION_DEFER_ENABLED", true)
}
/// Queue-fullness ratio above which inserts are deferred
/// (default 0.85, clamped to 0.10..=0.99).
fn insert_admission_high_watermark_ratio() -> f64 {
    bounded_f64_from_env("ATHENA_INSERT_ADMISSION_HIGH_WATERMARK", 0.85, 0.10, 0.99)
}
/// Compute the queue-depth threshold at which admission control kicks in:
/// `ceil(max_queued * ratio)` clamped into `1..=max_queued`, or 0 when the
/// queue has no capacity at all.
fn queue_high_watermark(max_queued: usize, ratio: f64) -> usize {
    match max_queued {
        0 => 0,
        capacity => {
            let raw = (capacity as f64 * ratio).ceil() as usize;
            raw.clamp(1, capacity)
        }
    }
}
/// True when the pending queue depth has reached the admission high
/// watermark; a zero watermark (no capacity) never defers.
fn should_defer_insert_for_queue_pressure(
    pending_queue: usize,
    max_queued: usize,
    ratio: f64,
) -> bool {
    match queue_high_watermark(max_queued, ratio) {
        0 => false,
        threshold => pending_queue >= threshold,
    }
}
/// Suggest a Retry-After delay (ms) for overloaded inserts, scaled by queue
/// pressure: the batching window (min 25ms) times a multiplier that grows
/// with fullness, clamped to 50ms..=10s, then overridable via
/// `ATHENA_INSERT_OVERLOAD_RETRY_AFTER_MS` (clamped 50ms..=60s).
fn insert_overload_retry_after_ms(pending_queue: usize, max_queued: usize, window_ms: u64) -> u64 {
    // Zero capacity is treated as fully saturated.
    let raw_ratio = if max_queued == 0 {
        1.0
    } else {
        pending_queue as f64 / max_queued as f64
    };
    let pressure = raw_ratio.clamp(0.0, 2.0);
    let multiplier: u64 = match pressure {
        p if p >= 1.5 => 8,
        p if p >= 1.25 => 6,
        p if p >= 1.0 => 4,
        p if p >= 0.9 => 3,
        _ => 2,
    };
    let base_ms = window_ms.max(25);
    let suggested_ms = base_ms.saturating_mul(multiplier).clamp(50, 10_000);
    bounded_u64_from_env(
        "ATHENA_INSERT_OVERLOAD_RETRY_AFTER_MS",
        suggested_ms,
        50,
        60_000,
    )
}
/// Convert a Retry-After hint from milliseconds to whole seconds, rounding
/// up and clamping into the header-friendly range 1..=60.
fn retry_after_seconds_from_ms(retry_after_ms: u64) -> u64 {
    let whole = retry_after_ms / 1000;
    let rounded_up = if retry_after_ms % 1000 == 0 {
        whole
    } else {
        whole + 1
    };
    rounded_up.clamp(1, 60)
}
/// User-facing message for a duplicate insert, naming the violated unique
/// constraint when known and pointing the caller at /gateway/update.
fn unique_violation_message(constraint: Option<&str>) -> String {
    const GUIDANCE: &str = "Use /gateway/update to modify the existing record or change the unique field values.";
    match constraint {
        Some(name) => format!(
            "Insert would create a duplicate record (unique constraint '{}'). {}",
            name, GUIDANCE
        ),
        None => format!("Insert would create a duplicate record. {}", GUIDANCE),
    }
}
/// Recursively normalize a JSON value for signature building: object keys
/// are re-emitted in sorted order at every depth so logically equal payloads
/// serialize to identical strings; arrays and scalars keep their order/value.
fn normalized_json_for_signature(value: &Value) -> Value {
    match value {
        Value::Object(map) => {
            // BTreeMap iteration yields keys in sorted order.
            let sorted: std::collections::BTreeMap<&String, &Value> = map.iter().collect();
            let mut normalized = Map::new();
            for (key, child) in sorted {
                normalized.insert(key.clone(), normalized_json_for_signature(child));
            }
            Value::Object(normalized)
        }
        Value::Array(items) => {
            let normalized_items: Vec<Value> =
                items.iter().map(normalized_json_for_signature).collect();
            Value::Array(normalized_items)
        }
        scalar => scalar.clone(),
    }
}
/// Build the dedupe-cache key for an insert: client, table, and a normalized
/// payload fingerprint joined by the ASCII unit separator. Returns `None`
/// when the table name is blank (no meaningful signature).
pub(crate) fn build_insert_duplicate_signature(
    client_name: &str,
    table_name: &str,
    insert_body: &Value,
) -> Option<String> {
    let table = table_name.trim();
    if table.is_empty() {
        return None;
    }
    let fingerprint = normalized_json_for_signature(insert_body).to_string();
    Some(format!(
        "{}\u{001f}{}\u{001f}{}",
        client_name.trim(),
        table,
        fingerprint
    ))
}
/// Look up a remembered unique violation for this insert signature.
///
/// Returns `None` when the feature is disabled, the lock is poisoned, the
/// entry is missing, or the entry has expired (expired entries are evicted
/// lazily here). Otherwise returns `Some(constraint)` — the inner option is
/// the constraint name, when one was recorded.
pub(crate) fn lookup_recent_unique_violation(signature: &str) -> Option<Option<String>> {
    if !recent_unique_conflict_cache_enabled() {
        return None;
    }
    let mut cache = RECENT_UNIQUE_VIOLATION_CACHE.lock().ok()?;
    let entry = cache.get(signature)?.clone();
    if entry.recorded_at.elapsed() > RECENT_UNIQUE_VIOLATION_TTL {
        // Expired: drop it so the map does not accumulate stale entries.
        cache.remove(signature);
        None
    } else {
        Some(entry.constraint)
    }
}
/// Record a unique violation for this signature so identical inserts can be
/// rejected without a DB round trip. No-op when the feature is disabled or
/// the cache lock is poisoned; evicts the stalest entry at the size cap.
pub(crate) fn store_recent_unique_violation(signature: String, constraint: Option<String>) {
    if !recent_unique_conflict_cache_enabled() {
        return;
    }
    let Ok(mut cache) = RECENT_UNIQUE_VIOLATION_CACHE.lock() else {
        return;
    };
    if cache.len() >= RECENT_UNIQUE_VIOLATION_MAX_ENTRIES {
        // Single-entry eviction keeps the map bounded without a full sweep.
        let stalest_key = cache
            .iter()
            .min_by_key(|(_, entry)| entry.recorded_at)
            .map(|(key, _)| key.clone());
        if let Some(key) = stalest_key {
            cache.remove(&key);
        }
    }
    let entry = RecentUniqueViolationEntry {
        constraint,
        recorded_at: StdInstant::now(),
    };
    cache.insert(signature, entry);
}
/// Feed a failed insert back into the recent-violation cache, but only when
/// the error was a unique violation and a signature can be built. The
/// constraint name is lifted from the error's `details.constraint`, if any.
pub(crate) fn remember_unique_violation_from_insert(
    client_name: &str,
    table_name: &str,
    insert_body: &Value,
    error_code: &str,
    details: &Value,
) {
    if error_code != "unique_violation" {
        return;
    }
    let signature = match build_insert_duplicate_signature(client_name, table_name, insert_body) {
        Some(signature) => signature,
        None => return,
    };
    let constraint = details
        .get("constraint")
        .and_then(Value::as_str)
        .map(|name| name.to_owned());
    store_recent_unique_violation(signature, constraint);
}
/// Build the 409 `unique_violation` error returned when the dedupe prefilter
/// short-circuits an insert, tagging the details with SQLSTATE 23505, the
/// dedupe source, and the violated constraint when known.
fn prefilter_unique_violation_error(
    trace_id: String,
    table_name: &str,
    client_name: &str,
    user_id: Option<&str>,
    company_id: Option<&str>,
    organization_id: Option<&str>,
    constraint: Option<&str>,
    dedupe_source: &str,
) -> InsertError {
    let message = unique_violation_message(constraint);
    let mut details = build_error_metadata(
        &trace_id,
        Some(table_name),
        client_name,
        user_id,
        company_id,
        organization_id,
    );
    details.insert(String::from("sql_state"), json!("23505"));
    details.insert(String::from("dedupe_source"), json!(dedupe_source));
    if let Some(name) = constraint {
        details.insert(String::from("constraint"), json!(name));
    }
    InsertError::new(
        StatusCode::CONFLICT,
        "unique_violation",
        message,
        trace_id,
        details,
    )
}
pub(crate) fn window_prefilter_unique_violation_outcome(
job: &WindowInsertJob,
constraint: Option<&str>,
dedupe_source: &str,
) -> WindowInsertOutcome {
WindowInsertOutcome::Error(prefilter_unique_violation_error(
job.trace_id.clone(),
&job.table_name,
&job.client_name,
job.metadata_user_id.as_deref(),
job.metadata_company_id.as_deref(),
job.metadata_organization_id.as_deref(),
constraint,
dedupe_source,
))
}
pub(crate) fn window_insert_overloaded_outcome(
job: &WindowInsertJob,
message: &str,
admission_reason: &str,
) -> WindowInsertOutcome {
let mut details: Map<String, Value> = build_error_metadata(
&job.trace_id,
Some(&job.table_name),
&job.client_name,
job.metadata_user_id.as_deref(),
job.metadata_company_id.as_deref(),
job.metadata_organization_id.as_deref(),
);
details.insert("admission_reason".to_string(), json!(admission_reason));
details.insert("status".to_string(), json!("overloaded"));
WindowInsertOutcome::Error(InsertError::new(
StatusCode::TOO_MANY_REQUESTS,
"admission_overloaded",
message.to_string(),
job.trace_id.clone(),
details,
))
}
fn athena_verbose_logging_enabled() -> bool {
env::var("ATHENA_VERBOSE_LOGGING")
.ok()
.map(|v| {
let normalized = v.trim().to_ascii_lowercase();
matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
})
.unwrap_or(false)
}
/// Decide whether ANSI color codes should be emitted in log messages.
///
/// Precedence: any set `NO_COLOR` disables color (https://no-color.org/);
/// then an explicit `ATHENA_ANSI` on/off value wins; otherwise color is
/// enabled only when stderr is a terminal.
fn athena_ansi_enabled() -> bool {
    if env::var("NO_COLOR").is_ok() {
        return false;
    }
    let explicit = env::var("ATHENA_ANSI")
        .ok()
        .and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            // Unrecognized values fall through to terminal detection.
            _ => None,
        });
    explicit.unwrap_or_else(|| std::io::stderr().is_terminal())
}
impl InsertError {
    /// Build a structured insert error; `details` is wrapped into a JSON object.
    fn new(
        status: StatusCode,
        code: &'static str,
        message: impl Into<String>,
        trace_id: String,
        details: Map<String, Value>,
    ) -> Self {
        let details: Value = Value::Object(details);
        Self {
            status,
            code,
            message: message.into(),
            trace_id,
            details,
        }
    }
    /// Render the error as an HTTP response without a Retry-After header.
    fn into_response(self) -> HttpResponse {
        self.into_response_with_retry_after_ms(None)
    }
    /// Render the error as an HTTP JSON response, optionally attaching a
    /// Retry-After header (rounded up to whole seconds, clamped 1..=60).
    fn into_response_with_retry_after_ms(self, retry_after_ms: Option<u64>) -> HttpResponse {
        let mut builder = HttpResponse::build(self.status);
        if let Some(retry_after_ms) = retry_after_ms {
            let retry_after_seconds: u64 = retry_after_seconds_from_ms(retry_after_ms);
            builder.insert_header((
                actix_web::http::header::RETRY_AFTER,
                retry_after_seconds.to_string(),
            ));
        }
        builder.json(json!({
            "status": "error",
            "success": false,
            "code": self.code,
            "message": self.message,
            "details": self.details,
            "trace_id": self.trace_id,
        }))
    }
    /// Map a backend driver failure onto an HTTP status, stable error code,
    /// and message, accumulating diagnostic metadata (validation reasons,
    /// SQLSTATE, constraint name, raw DB error text) into the details object.
    fn from_driver(
        driver_error: InsertDriverError,
        table_name: Option<&str>,
        client_name: &str,
        user_id: Option<&str>,
        company_id: Option<&str>,
        organization_id: Option<&str>,
        trace_id: String,
    ) -> Self {
        let mut metadata: Map<String, Value> = build_error_metadata(
            &trace_id,
            table_name,
            client_name,
            user_id,
            company_id,
            organization_id,
        );
        let (status, code, message) = match driver_error {
            InsertDriverError::Postgres(err) => match err {
                PostgresInsertError::InvalidTableName => (
                    StatusCode::BAD_REQUEST,
                    "invalid_table_name",
                    "table name is invalid".to_string(),
                ),
                PostgresInsertError::InvalidPayload(reason) => {
                    metadata.insert("validation_error".to_string(), json!(reason.clone()));
                    (
                        StatusCode::BAD_REQUEST,
                        "invalid_payload",
                        format!("insert payload is invalid: {}", reason),
                    )
                }
                PostgresInsertError::InvalidJsonString { column, reason } => {
                    metadata.insert("validation_error".to_string(), json!(reason.clone()));
                    metadata.insert("column".to_string(), json!(column.clone()));
                    (
                        StatusCode::BAD_REQUEST,
                        "invalid_json_string",
                        format!(
                            "insert payload contains invalid JSON string for JSON column '{}': {}",
                            column, reason
                        ),
                    )
                }
                PostgresInsertError::NoValidColumns => (
                    StatusCode::BAD_REQUEST,
                    "no_columns",
                    "no valid columns provided for insert".to_string(),
                ),
                PostgresInsertError::MissingReturnColumn => (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "missing_return_row",
                    "insert succeeded but returned no data column".to_string(),
                ),
                PostgresInsertError::SqlExecution { message, sql_state } => {
                    let db_message: String = message.clone();
                    if let Some(state) = sql_state.clone() {
                        metadata.insert("sql_state".to_string(), json!(state));
                    }
                    // Best-effort parse of the violated constraint name out of
                    // the Postgres error text (`... constraint "name" ...`).
                    let constraint_name: Option<String> = db_message
                        .split("constraint \"")
                        .nth(1)
                        .and_then(|constraint_start| constraint_start.split('"').next())
                        .map(str::to_string);
                    if let Some(constraint_name) = constraint_name.as_deref() {
                        metadata.insert("constraint".to_string(), json!(constraint_name));
                    }
                    metadata.insert("db_error".to_string(), json!(db_message.clone()));
                    // Map well-known SQLSTATEs onto specific statuses/codes:
                    // 23505 unique violation -> 409, 23503 FK violation -> 422,
                    // anything else -> generic 500 sql_execution.
                    let (status, code, message) = match sql_state.as_deref() {
                        Some("23505") => {
                            (
                                StatusCode::CONFLICT,
                                "unique_violation",
                                unique_violation_message(constraint_name.as_deref()),
                            )
                        }
                        Some("23503") => (
                            StatusCode::UNPROCESSABLE_ENTITY,
                            "foreign_key_violation",
                            "failed to execute insert: referenced resource missing or constraint violated"
                                .to_string(),
                        ),
                        _ => (
                            StatusCode::INTERNAL_SERVER_ERROR,
                            "sql_execution",
                            format!("failed to execute insert: {}", db_message),
                        ),
                    };
                    (status, code, message)
                }
            },
            InsertDriverError::Supabase(err) => match err {
                SupabaseInsertError::UnknownClient { client_name } => {
                    metadata.insert("unknown_client".to_string(), json!(client_name.clone()));
                    (
                        StatusCode::BAD_REQUEST,
                        "unknown_client",
                        format!("unknown Supabase client '{}'", client_name),
                    )
                }
                SupabaseInsertError::ClientInit { message } => {
                    metadata.insert("client_init_error".to_string(), json!(message.clone()));
                    (
                        StatusCode::BAD_GATEWAY,
                        "client_initialization_failure",
                        format!("failed to configure Supabase client: {}", message),
                    )
                }
                SupabaseInsertError::InsertFailed { message } => {
                    metadata.insert("supabase_error".to_string(), json!(message.clone()));
                    (
                        StatusCode::INTERNAL_SERVER_ERROR,
                        "supabase_insert_failed",
                        format!("failed to insert data via Supabase: {}", message),
                    )
                }
                SupabaseInsertError::BackendUnavailable { message } => {
                    metadata.insert("backend_unavailable".to_string(), json!(message.clone()));
                    (
                        StatusCode::SERVICE_UNAVAILABLE,
                        "backend_unavailable",
                        message.clone(),
                    )
                }
            },
            // Stage timeouts are reported as 503 backend_unavailable with the
            // backend/stage/deadline recorded in the details.
            InsertDriverError::StageTimeout {
                backend,
                stage,
                timeout_ms,
            } => {
                metadata.insert("backend".to_string(), json!(backend));
                metadata.insert("stage".to_string(), json!(stage));
                metadata.insert("timeout_ms".to_string(), json!(timeout_ms));
                (
                    StatusCode::SERVICE_UNAVAILABLE,
                    "backend_unavailable",
                    format!(
                        "Insert stage '{}' timed out after {}ms (backend={}).",
                        stage, timeout_ms, backend
                    ),
                )
            }
        };
        Self::new(status, code, message, trace_id, metadata)
    }
}
/// Assemble the common diagnostic metadata attached to every insert error:
/// trace id and client always; table/user/company/organization only when known.
fn build_error_metadata(
    trace_id: &str,
    table_name: Option<&str>,
    client_name: &str,
    user_id: Option<&str>,
    company_id: Option<&str>,
    organization_id: Option<&str>,
) -> Map<String, Value> {
    let mut metadata = Map::new();
    metadata.insert("trace_id".to_string(), json!(trace_id));
    metadata.insert("client".to_string(), json!(client_name));
    let optional_fields = [
        ("table_name", table_name),
        ("user_id", user_id),
        ("company_id", company_id),
        ("organization_id", organization_id),
    ];
    for (key, value) in optional_fields.iter().copied() {
        if let Some(value) = value {
            metadata.insert(key.to_string(), json!(value));
        }
    }
    metadata
}
/// Wrap the post-insert success pipeline in a hard 120s timeout so a hung
/// audit log write, webhook dispatch, or cache invalidation cannot stall the
/// client response indefinitely; on timeout a 500 with code
/// `response_serialization_timeout` is returned and a metrics event recorded.
pub(crate) async fn finish_postgres_insert_success_json_with_recovery(
    app_state: Data<AppState>,
    job: &WindowInsertJob,
    inserted_row: Value,
) -> WindowInsertOutcome {
    // Generous ceiling — the pipeline normally completes far faster.
    let timeout = tokio::time::Duration::from_secs(120);
    match tokio::time::timeout(
        timeout,
        finish_postgres_insert_success_json(app_state.clone(), job, inserted_row),
    )
    .await
    {
        Ok(body) => WindowInsertOutcome::Success(body),
        Err(_) => {
            error!(
                trace_id = %job.trace_id,
                client = %job.client_name,
                table = %job.table_name,
                "Response serialization timeout (exceeded 120s) - audit log, webhooks, or cache invalidation likely hung"
            );
            app_state
                .metrics_state
                .record_gateway_insert_window_event("response_serialization_timeout");
            let mut details = Map::new();
            details.insert("table".to_string(), Value::String(job.table_name.clone()));
            details.insert("client".to_string(), Value::String(job.client_name.clone()));
            details.insert(
                "likely_cause".to_string(),
                Value::String("audit_log_or_webhook_delay".to_string()),
            );
            WindowInsertOutcome::Error(InsertError::new(
                StatusCode::INTERNAL_SERVER_ERROR,
                "response_serialization_timeout",
                "Response serialization exceeded maximum time limit (120s); check audit logging, webhooks, and cache invalidation".to_string(),
                job.trace_id.clone(),
                details,
            ))
        }
    }
}
/// Post-insert success pipeline for the Postgres path: builds the audit diff,
/// writes the audit log, optionally publishes a company INSERT event, records
/// operation telemetry, fires gateway webhooks, and returns the success JSON
/// payload sent back to the caller. Audit-log failures are logged but never
/// fail the insert.
pub(crate) async fn finish_postgres_insert_success_json(
    app_state: Data<AppState>,
    job: &WindowInsertJob,
    inserted_row: Value,
) -> Value {
    let response_started = std::time::Instant::now();
    let ansi_enabled: bool = job.ansi_enabled;
    let verbose_logging: bool = job.verbose_logging;
    let trace_id: String = job.trace_id.clone();
    let client_name: String = job.client_name.clone();
    let table_name: String = job.table_name.clone();
    let user_id: String = job.user_id.clone();
    let company_id: String = job.company_id.clone();
    let organization_id: String = job.organization_id.clone();
    let insert_body: Value = job.insert_body.clone();
    let body: Value = job.body.clone();
    let resource_id_key: String = job.resource_id_key.clone();
    let operation_start: Instant = job.operation_start;
    // Reconstruct a LoggedRequest for telemetry; status/time are placeholders.
    let logged_request: LoggedRequest = LoggedRequest {
        request_id: job.logged_request_id.clone(),
        client_name: job.logged_client_name.clone(),
        method: job.logged_method.clone(),
        path: job.logged_path.clone(),
        status_code: 0,
        time: 0,
    };
    // Build the audit "diff" from update_body fields that differ from the
    // inserted values; null values and unchanged fields are skipped.
    let mut diff_resource = json!({});
    if let (Some(update_body), Some(insert_body_obj)) = (
        body.get("update_body").and_then(Value::as_object),
        insert_body.as_object(),
    ) {
        for (key, new_value) in update_body {
            if !new_value.is_null() {
                let should_include: bool = match insert_body_obj.get(key) {
                    Some(existing) => existing != new_value,
                    None => true,
                };
                if should_include {
                    diff_resource[key] = json!({
                        "blame": {
                            "user_id": user_id.clone()
                        },
                        "new": new_value,
                        "old": null,
                        "time": chrono::Utc::now().timestamp(),
                        "void": false
                    });
                }
            }
        }
    }
    // Resolve the inserted row's id; fall back to the trace id when the
    // configured id column is absent or not a string.
    let resource_id: String = inserted_row
        .as_object()
        .and_then(|obj| obj.get(&resource_id_key))
        .and_then(Value::as_str)
        .map(|s| s.to_string())
        .unwrap_or_else(|| trace_id.clone());
    let audit_message: String = format!(
        "User {} inserted data into [{}] at {}",
        user_id,
        table_name,
        Utc::now().timestamp()
    );
    let audit_log_started = std::time::Instant::now();
    // Persist the audit trail; the result only affects logging below.
    let audit_log_result: Result<Value, String> = insert_audit_log(
        table_name.clone(),
        resource_id.clone(),
        insert_body.clone(),
        company_id.clone(),
        organization_id.clone(),
        "domain".to_string(),
        user_id.clone(),
        audit_message,
        "success".to_string(),
        "request".to_string(),
        user_id.clone(),
        format!("insert-{}", table_name.clone()),
        diff_resource,
        json!({}),
        true,
        "/data/insert".to_string(),
        &client_name,
    )
    .await;
    let audit_log_duration = audit_log_started.elapsed();
    match &audit_log_result {
        Ok(audit_log_data) => {
            if verbose_logging {
                let audit_log_message = if ansi_enabled {
                    "\u{001b}[32mAudit log inserted successfully\u{001b}[0m (ATHENA_VERBOSE_LOGGING=1)"
                } else {
                    "Audit log inserted successfully (ATHENA_VERBOSE_LOGGING=1)"
                };
                info!(
                    trace_id = %trace_id,
                    client = %client_name,
                    table = %table_name,
                    resource_id = %resource_id,
                    audit_log_duration_ms = audit_log_duration.as_millis(),
                    audit_payload = ?audit_log_data,
                    "{}",
                    audit_log_message
                );
            }
            // Surface slow audit writes (>5s) even without verbose logging.
            if audit_log_duration.as_millis() > 5000 {
                warn!(
                    trace_id = %trace_id,
                    client = %client_name,
                    table = %table_name,
                    audit_log_duration_ms = audit_log_duration.as_millis(),
                    "Slow audit log insertion (>5s)"
                );
            }
        }
        Err(err) => {
            // Audit failure is logged but deliberately does not fail the insert.
            error!(
                trace_id = %trace_id,
                client = %client_name,
                table = %table_name,
                audit_log_duration_ms = audit_log_duration.as_millis(),
                error = %err,
                "Failed to insert audit log"
            );
        }
    }
    // Optionally broadcast an INSERT event when the caller requested it and a
    // company id could be resolved for the event.
    if job.x_publish_event {
        if let Some(company_for_event) = job.resolved_company_for_event.clone() {
            let event: Value = json!({
                "event": "INSERT",
                "resource": table_name.clone(),
                "inserted_by_user": user_id.clone(),
                "company_id": company_for_event.clone(),
                "insert_body": insert_body.clone(),
            });
            post_event(company_for_event, event).await;
        } else {
            info!(
                trace_id = %trace_id,
                "Skipping publish event because company_id is missing"
            );
        }
    }
    // Record the completed operation in the telemetry log.
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "insert",
        Some(&table_name),
        operation_start.elapsed().as_millis(),
        StatusCode::OK,
        Some(json!({
            "resource_id": resource_id,
            "client": client_name,
        })),
    );
    let success_json = json!({
        "status": "success",
        "success": true,
        "message": "Data inserted successfully",
        "data": inserted_row,
        "resource_id": resource_id,
        "table_name": table_name,
    });
    // Fire-and-forget webhook dispatch carrying the request/response pair.
    let mut wh_headers: Vec<(String, String)> = Vec::new();
    if !user_id.is_empty() {
        wh_headers.push(("x-user-id".to_string(), user_id.clone()));
    }
    crate::webhooks::spawn_gateway_webhook_dispatch(
        app_state.clone(),
        crate::webhooks::GatewayWebhookTrigger {
            client_name: client_name.clone(),
            route_key: crate::webhooks::ROUTE_GATEWAY_INSERT.to_string(),
            table_name: Some(table_name.clone()),
            request_id: Some(logged_request.request_id.clone()),
            request_method: job.logged_method.clone(),
            request_path: job.logged_path.clone(),
            headers: wh_headers,
            payload: Some(body.clone()),
            response: Some(success_json.clone()),
        },
    );
    // Warn when this post-processing phase itself took unusually long.
    let response_duration = response_started.elapsed();
    if response_duration.as_millis() > 10000 {
        warn!(
            trace_id = %trace_id,
            client = %client_name,
            table = %table_name,
            response_serialize_duration_ms = response_duration.as_millis(),
            "Slow response serialization phase (>10s) - check audit log, webhooks, cache invalidation"
        );
    } else if response_duration.as_millis() > 5000 {
        info!(
            trace_id = %trace_id,
            client = %client_name,
            table = %table_name,
            response_serialize_duration_ms = response_duration.as_millis(),
            "Response serialization took >5s"
        );
    }
    success_json
}
/// Run one windowed insert job against Postgres and convert the result into a
/// `WindowInsertOutcome`.
///
/// Pipeline: resolve the client's connection pool, short-circuit on a recently
/// cached unique violation, execute the timed insert, invalidate the scoped
/// cache and build the success payload on success, or map driver failures to
/// structured `InsertError`s — recording metrics and operation telemetry
/// throughout.
pub(crate) async fn run_postgres_insert_to_outcome(
    app_state: Data<AppState>,
    job: WindowInsertJob,
) -> WindowInsertOutcome {
    let trace_id: String = job.trace_id.clone();
    let table_name: String = job.table_name.clone();
    let client_name: String = job.client_name.clone();
    let user_id: String = job.user_id.clone();
    let metadata_user_id: Option<&str> = job.metadata_user_id.as_deref();
    let metadata_company_id: Option<&str> = job.metadata_company_id.as_deref();
    let metadata_organization_id: Option<&str> = job.metadata_organization_id.as_deref();
    let ansi_enabled: bool = job.ansi_enabled;
    let verbose_logging: bool = job.verbose_logging;
    // Unknown client name -> 400 with diagnostic metadata.
    let Some(pool) = app_state.pg_registry.get_pool(&client_name) else {
        let mut details: Map<String, Value> = build_error_metadata(
            &trace_id,
            Some(&table_name),
            &client_name,
            metadata_user_id,
            metadata_company_id,
            metadata_organization_id,
        );
        details.insert("unknown_client".to_string(), json!(client_name.clone()));
        return WindowInsertOutcome::Error(InsertError::new(
            StatusCode::BAD_REQUEST,
            "unknown_client",
            format!("Postgres client '{}' is not configured", client_name),
            trace_id.clone(),
            details,
        ));
    };
    // Prefilter: if an identical insert recently hit a unique violation,
    // return 409 immediately without touching the database.
    if let Some(signature) =
        build_insert_duplicate_signature(&client_name, &table_name, &job.insert_body)
    {
        let dedupe_started = Instant::now();
        if let Some(cached_constraint) = lookup_recent_unique_violation(&signature) {
            app_state
                .metrics_state
                .record_gateway_insert_window_event("gateway_recent_conflict_cache_hit");
            app_state
                .metrics_state
                .record_gateway_insert_phase_duration(
                    "dedupe_check",
                    dedupe_started.elapsed().as_secs_f64(),
                );
            return window_prefilter_unique_violation_outcome(
                &job,
                cached_constraint.as_deref(),
                "recent_unique_violation_cache",
            );
        }
        app_state
            .metrics_state
            .record_gateway_insert_window_event("gateway_recent_conflict_cache_miss");
        app_state
            .metrics_state
            .record_gateway_insert_phase_duration(
                "dedupe_check",
                dedupe_started.elapsed().as_secs_f64(),
            );
    }
    // Timed insert against the resolved pool; stage timeouts are counted.
    let db_insert_started = Instant::now();
    let db_timeout_ms: u64 = insert_db_timeout_ms(app_state.get_ref());
    let insert_result: Result<(Value, String), InsertDriverError> =
        match postgres_insert_with_timeout(&pool, &table_name, &job.insert_body, db_timeout_ms)
            .await
        {
            Ok(value) => {
                app_state
                    .metrics_state
                    .record_gateway_postgres_backend("/gateway/insert", "sqlx");
                Ok((
                    value,
                    format!("Inserted row into {} via postgres", table_name),
                ))
            }
            Err(err) => {
                if matches!(err, InsertDriverError::StageTimeout { .. }) {
                    app_state
                        .metrics_state
                        .record_gateway_insert_window_event("db_timeout");
                }
                Err(err)
            }
        };
    app_state
        .metrics_state
        .record_gateway_insert_phase_duration(
            "db_insert",
            db_insert_started.elapsed().as_secs_f64(),
        );
    match insert_result {
        Ok((inserted_row, _message)) => {
            // Invalidate client+table scoped cache entries (may be debounced).
            if should_invalidate_cache_after_insert(&inserted_row) {
                let invalidation =
                    invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name)
                        .await;
                if invalidation.did_run {
                    let cache_log_message = if ansi_enabled {
                        "\u{001b}[33mCache invalidated after insert\u{001b}[0m (reason=table_mutation, scope=client+table)"
                    } else {
                        "Cache invalidated after insert (reason=table_mutation, scope=client+table)"
                    };
                    info!(
                        trace_id = %trace_id,
                        client = %client_name,
                        table = %table_name,
                        invalidated_entries = invalidation.invalidated_entries,
                        cache_entries_remaining = invalidation.remaining_entries,
                        "{}",
                        cache_log_message
                    );
                } else if verbose_logging {
                    info!(
                        trace_id = %trace_id,
                        client = %client_name,
                        table = %table_name,
                        debounce_delay_ms = invalidation.debounce_delay_ms.unwrap_or(0),
                        "Cache invalidation batched into debounce window"
                    );
                }
            }
            // Post-processing (audit log, events, webhooks) + success payload.
            let response_started = Instant::now();
            let body: Value =
                finish_postgres_insert_success_json(app_state.clone(), &job, inserted_row).await;
            app_state
                .metrics_state
                .record_gateway_insert_phase_duration(
                    "response_serialize",
                    response_started.elapsed().as_secs_f64(),
                );
            WindowInsertOutcome::Success(body)
        }
        Err(driver_error) => {
            // Map the driver failure to a structured client-facing error.
            let insert_error: InsertError = InsertError::from_driver(
                driver_error,
                Some(&table_name),
                &client_name,
                metadata_user_id,
                metadata_company_id,
                metadata_organization_id,
                trace_id.clone(),
            );
            // Feed unique violations back into the prefilter cache so repeats
            // of the same payload are rejected without a DB round trip.
            remember_unique_violation_from_insert(
                &client_name,
                &table_name,
                &job.insert_body,
                insert_error.code,
                &insert_error.details,
            );
            if insert_error.code == "unique_violation" {
                app_state
                    .metrics_state
                    .record_gateway_insert_window_event("db_unique_violation");
            }
            error!(
                table = %table_name,
                client = %client_name,
                user = %user_id,
                trace_id = %insert_error.trace_id,
                code = %insert_error.code,
                "Failed to insert data: {}",
                insert_error.message
            );
            // Placeholder LoggedRequest for telemetry; status/time unused here.
            let logged_request: LoggedRequest = LoggedRequest {
                request_id: job.logged_request_id.clone(),
                client_name: job.logged_client_name.clone(),
                method: job.logged_method.clone(),
                path: job.logged_path.clone(),
                status_code: 0,
                time: 0,
            };
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "insert",
                Some(&table_name),
                job.operation_start.elapsed().as_millis(),
                insert_error.status,
                Some(json!({
                    "code": insert_error.code,
                    "details": insert_error.details,
                })),
            );
            if insert_error.code == "backend_unavailable" {
                app_state
                    .metrics_state
                    .record_gateway_backend_unavailable("insert", &client_name);
            }
            WindowInsertOutcome::Error(insert_error)
        }
    }
}
/// PUT /gateway/insert — actix entry point. Unwraps the JSON body and
/// delegates all validation/authorization/execution to `handle_insert_data`.
#[put("/gateway/insert")]
pub async fn insert_data(
    req: HttpRequest,
    body: Json<Value>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_insert_data(req, body.0, app_state).await
}
pub(crate) async fn handle_insert_data(
req: HttpRequest,
body: Value,
app_state: Data<AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let identity: InsertIdentity = InsertIdentity::from_request(&req);
let trace_id: String = identity.trace_id.clone();
let verbose_logging: bool = athena_verbose_logging_enabled();
let ansi_enabled: bool = athena_ansi_enabled();
let client_name: String = x_athena_client(&req.clone());
if client_name.is_empty() {
let mut details = build_error_metadata(
&trace_id,
None,
&client_name,
identity.resolved_user_id().map(|s| s.as_str()),
identity.resolved_company_id().map(|s| s.as_str()),
identity.resolved_organization_id().map(|s| s.as_str()),
);
details.insert("missing_header".to_string(), json!("X-Athena-Client"));
return InsertError::new(
StatusCode::BAD_REQUEST,
"missing_client_header",
"X-Athena-Client header is required and cannot be empty",
trace_id.clone(),
details,
)
.into_response();
}
let requested_table_name = GatewayInsertRequest::table_name_from_body(&body);
let auth = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![write_right_for_resource(requested_table_name.as_deref())],
)
.await;
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
if auth.force_deferred_queue {
if GatewayInsertRequest::from_body(&body).is_none() {
return gateway_bad_request(
GatewayOperationKind::Insert,
"Invalid insert payload",
"table_name and insert_body are required in the request body",
);
}
let request_bytes: Option<u64> = req
.headers()
.get(actix_web::http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_INSERT,
auth.request_id.clone(),
client_name.clone(),
body,
)
.with_reason(auth.force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
if let Err(err) = enqueue_gateway_deferred_request(
app_state.get_ref(),
"PUT",
req.path(),
request_bytes,
&deferred_request,
)
.await
{
return gateway_service_unavailable(
GatewayOperationKind::Insert,
"Deferred queue unavailable",
format!("Failed to queue deferred insert request: {err}"),
);
}
return api_accepted(
"Insert request queued for deferred execution (auth fallback mode)",
json!({
"request_id": auth.request_id,
"status": "queued",
"route": req.path(),
}),
);
}
let user_id: String = identity.audit_user_id();
let company_id: String = identity.audit_company_id();
let organization_id: String = identity.audit_organization_id();
let metadata_user_id: Option<&str> = identity.resolved_user_id().map(|s| s.as_str());
let metadata_company_id: Option<&str> = identity.resolved_company_id().map(|s| s.as_str());
let metadata_organization_id: Option<&str> =
identity.resolved_organization_id().map(|s| s.as_str());
let table_name: String = match GatewayInsertRequest::table_name_from_body(&body) {
Some(name) => name,
None => {
error!("table_name is required in the request body.");
let mut details: Map<String, Value> = build_error_metadata(
&trace_id,
None,
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
);
details.insert("missing_field".to_string(), json!("table_name"));
return InsertError::new(
StatusCode::BAD_REQUEST,
"missing_field",
"table_name is required in the request body",
trace_id.clone(),
details,
)
.into_response();
}
};
let insert_body: Value = match GatewayInsertRequest::insert_body_from_body(&body) {
Some(body) => body,
None => {
error!("insert_body is required in the request body.");
let mut details: Map<String, Value> = build_error_metadata(
&trace_id,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
);
details.insert("missing_field".to_string(), json!("insert_body"));
return InsertError::new(
StatusCode::BAD_REQUEST,
"missing_field",
"insert_body is required in the request body",
trace_id.clone(),
details,
)
.into_response();
}
};
let resource_id_key: String = get_resource_id_key(&table_name).await;
let header_window: Option<u64> = x_athena_insert_window_ms(&req);
let cfg_window: u64 = app_state.gateway_insert_execution_window_ms;
let window_ms: Option<u64> = header_window.or_else(|| {
if cfg_window > 0 {
Some(cfg_window)
} else {
None
}
});
#[cfg(feature = "deadpool_experimental")]
let deadpool_requested_window: bool = x_athena_deadpool_enable(&req, Some(&auth.request_id));
#[cfg(not(feature = "deadpool_experimental"))]
let deadpool_requested_window: bool = false;
let postgres_pool_ready: bool = app_state.pg_registry.get_pool(&client_name).is_some();
let use_insert_window: bool = window_ms.is_some_and(|w| w > 0 && w <= 60_000)
&& postgres_pool_ready
&& !deadpool_requested_window
&& !insert_request_has_update_body(&body);
if !use_insert_window
&& postgres_pool_ready
&& let Some(signature) =
build_insert_duplicate_signature(&client_name, &table_name, &insert_body)
{
let dedupe_started = Instant::now();
if let Some(cached_constraint) = lookup_recent_unique_violation(&signature) {
app_state
.metrics_state
.record_gateway_insert_window_event("gateway_recent_conflict_cache_hit");
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"dedupe_check",
dedupe_started.elapsed().as_secs_f64(),
);
let insert_error = prefilter_unique_violation_error(
trace_id.clone(),
&table_name,
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
cached_constraint.as_deref(),
"recent_unique_violation_cache",
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&table_name),
operation_start.elapsed().as_millis(),
insert_error.status,
Some(json!({
"code": insert_error.code,
"details": insert_error.details,
})),
);
return insert_error.into_response();
}
app_state
.metrics_state
.record_gateway_insert_window_event("gateway_recent_conflict_cache_miss");
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"dedupe_check",
dedupe_started.elapsed().as_secs_f64(),
);
}
if use_insert_window {
if insert_admission_defer_enabled() {
let pending_queue: usize = app_state
.insert_window_coordinator
.pending_queue_len()
.await;
let max_queued: usize = app_state.insert_window_coordinator.max_queued();
let high_watermark_ratio: f64 = insert_admission_high_watermark_ratio();
if should_defer_insert_for_queue_pressure(
pending_queue,
max_queued,
high_watermark_ratio,
) {
app_state
.metrics_state
.record_gateway_insert_window_event("admission_defer_high_watermark");
let request_bytes: Option<u64> = req
.headers()
.get(actix_web::http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_INSERT,
auth.request_id.clone(),
client_name.clone(),
body.clone(),
)
.with_reason(Some("insert_window_queue_high_watermark".to_string()))
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
if let Err(err) = enqueue_gateway_deferred_request(
app_state.get_ref(),
"PUT",
req.path(),
request_bytes,
&deferred_request,
)
.await
{
let retry_after_ms: u64 = insert_overload_retry_after_ms(
pending_queue,
max_queued,
window_ms.unwrap_or(0),
);
let retry_after_seconds: u64 = retry_after_seconds_from_ms(retry_after_ms);
app_state
.metrics_state
.record_gateway_insert_window_event("admission_reject_enqueue_failed");
return HttpResponse::TooManyRequests()
.insert_header((
actix_web::http::header::RETRY_AFTER,
retry_after_seconds.to_string(),
))
.json(json!({
"status": "error",
"code": "admission_overloaded",
"message": "Insert queue is under high load; request rejected to protect database capacity",
"trace_id": trace_id,
"details": {
"reason": "insert_window_queue_high_watermark",
"pending_queue": pending_queue,
"max_queued": max_queued,
"high_watermark_ratio": high_watermark_ratio,
"retry_after_ms": retry_after_ms,
"deferred_enqueue_error": err,
}
}));
}
return api_accepted(
"Insert request queued for deferred execution (admission control)",
json!({
"request_id": auth.request_id,
"status": "queued",
"route": req.path(),
"reason": "insert_window_queue_high_watermark",
"pending_queue": pending_queue,
"max_queued": max_queued,
"high_watermark_ratio": high_watermark_ratio,
}),
);
}
}
let wm: u64 = window_ms.unwrap_or(0);
let due: tokio::time::Instant =
tokio::time::Instant::now() + tokio::time::Duration::from_millis(wm);
let merge_eligible: bool = !insert_request_has_update_body(&body);
let resolved_company_for_event: Option<String> = identity.resolved_company_id().cloned();
let job: WindowInsertJob = WindowInsertJob {
trace_id: trace_id.clone(),
user_id: user_id.clone(),
company_id: company_id.clone(),
organization_id: organization_id.clone(),
metadata_user_id: metadata_user_id.map(str::to_string),
metadata_company_id: metadata_company_id.map(str::to_string),
metadata_organization_id: metadata_organization_id.map(str::to_string),
body: body.clone(),
table_name: table_name.clone(),
insert_body: insert_body.clone(),
resource_id_key: resource_id_key.clone(),
client_name: client_name.clone(),
merge_eligible,
logged_request_id: auth.request_id.clone(),
logged_client_name: logged_request.client_name.clone(),
logged_method: logged_request.method.clone(),
logged_path: logged_request.path.clone(),
operation_start,
verbose_logging,
ansi_enabled,
x_publish_event: get_x_publish_event(&req),
resolved_company_for_event,
due,
};
let (tx, rx) = tokio::sync::oneshot::channel();
app_state.insert_window_coordinator.submit(job, tx).await;
let response_timeout_ms = insert_window_response_timeout_ms(app_state.get_ref(), wm);
let response_timeout = tokio::time::Duration::from_millis(response_timeout_ms);
return match tokio::time::timeout(response_timeout, rx).await {
Ok(Ok(outcome)) => outcome.into_http_response(),
Ok(Err(_)) => gateway_internal_error(
GatewayOperationKind::Insert,
"Insert execution failed",
"Insert window worker dropped response channel",
),
Err(_) => {
app_state
.metrics_state
.record_gateway_insert_window_event("window_wait_timeout");
let pending_queue: usize = app_state
.insert_window_coordinator
.pending_queue_len()
.await;
let max_queued: usize = app_state.insert_window_coordinator.max_queued();
let high_watermark_ratio: f64 = insert_admission_high_watermark_ratio();
let queue_high_watermark: usize =
queue_high_watermark(max_queued, high_watermark_ratio);
let overloaded: bool =
pending_queue >= queue_high_watermark && queue_high_watermark > 0;
let (insert_error, retry_after_ms): (InsertError, Option<u64>) = if overloaded {
let retry_after_ms: u64 =
insert_overload_retry_after_ms(pending_queue, max_queued, wm);
app_state
.metrics_state
.record_gateway_insert_window_event("window_wait_timeout_overloaded");
let mut details: Map<String, Value> = build_error_metadata(
&trace_id,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
);
details.insert("backend".to_string(), json!("postgres"));
details.insert("stage".to_string(), json!("window_wait"));
details.insert("timeout_ms".to_string(), json!(response_timeout_ms));
details.insert("pending_queue".to_string(), json!(pending_queue));
details.insert("max_queued".to_string(), json!(max_queued));
details.insert(
"high_watermark_ratio".to_string(),
json!(high_watermark_ratio),
);
details.insert(
"queue_high_watermark".to_string(),
json!(queue_high_watermark),
);
details.insert("retry_after_ms".to_string(), json!(retry_after_ms));
details.insert("status".to_string(), json!("overloaded"));
(
InsertError::new(
StatusCode::TOO_MANY_REQUESTS,
"admission_overloaded",
format!(
"Insert window wait timed out after {}ms under queue pressure (pending {} / {}, high watermark {}).",
response_timeout_ms,
pending_queue,
max_queued,
queue_high_watermark
),
trace_id.clone(),
details,
),
Some(retry_after_ms),
)
} else {
app_state
.metrics_state
.record_gateway_insert_window_event("window_wait_timeout_backend");
app_state
.metrics_state
.record_gateway_backend_unavailable("insert", &client_name);
(
InsertError::from_driver(
InsertDriverError::StageTimeout {
backend: "postgres",
stage: "window_wait",
timeout_ms: response_timeout_ms,
},
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
trace_id.clone(),
),
None,
)
};
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&table_name),
operation_start.elapsed().as_millis(),
insert_error.status,
Some(json!({
"code": insert_error.code,
"details": insert_error.details,
})),
);
insert_error.into_response_with_retry_after_ms(retry_after_ms)
}
};
}
let db_insert_started = Instant::now();
let db_timeout_ms: u64 = insert_db_timeout_ms(app_state.get_ref());
let insert_result: Result<(Value, String), InsertDriverError> = if let Some(pool) =
app_state.pg_registry.get_pool(&client_name)
{
#[cfg(feature = "deadpool_experimental")]
'postgres: {
let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
#[cfg(feature = "deadpool_experimental")]
if deadpool_requested {
if let Some(deadpool_pool) = app_state.deadpool_registry.get_pool(&client_name) {
match insert_row_deadpool(
&deadpool_pool,
&table_name,
&insert_body,
deadpool_checkout_timeout(),
)
.await
{
Ok(value) => {
app_state
.metrics_state
.record_gateway_postgres_backend("/gateway/insert", "deadpool");
tracing::info!(
trace_id = %trace_id,
backend = "postgres",
pg_backend = "deadpool",
"insert_row finished"
);
break 'postgres Ok((
value,
format!("Inserted row into {} via postgres (deadpool)", table_name),
));
}
Err(err) => {
if err.is_db_error {
let processed = process_tokio_postgres_db_error(
err.sql_state.as_deref().unwrap_or(""),
&err.message,
Some(&table_name),
);
return HttpResponse::build(processed.status_code)
.content_type("application/json")
.json(processed.to_json());
}
app_state.metrics_state.record_deadpool_fallback(
"/gateway/insert",
deadpool_fallback_reason_label(err.reason),
);
tracing::warn!(
trace_id = %trace_id,
reason = ?err.reason,
"deadpool insert failed; falling back to sqlx"
);
}
}
}
}
match postgres_insert_with_timeout(&pool, &table_name, &insert_body, db_timeout_ms)
.await
{
Ok(value) => {
app_state
.metrics_state
.record_gateway_postgres_backend("/gateway/insert", "sqlx");
Ok((
value,
format!("Inserted row into {} via postgres", table_name),
))
}
Err(err) => {
if matches!(err, InsertDriverError::StageTimeout { .. }) {
app_state
.metrics_state
.record_gateway_insert_window_event("db_timeout");
}
Err(err)
}
}
}
#[cfg(not(feature = "deadpool_experimental"))]
match postgres_insert_with_timeout(&pool, &table_name, &insert_body, db_timeout_ms).await {
Ok(value) => Ok((
value,
format!("Inserted row into {} via postgres", table_name),
)),
Err(err) => {
if matches!(err, InsertDriverError::StageTimeout { .. }) {
app_state
.metrics_state
.record_gateway_insert_window_event("db_timeout");
}
Err(err)
}
}
} else {
supabase_insert_with_timeout(
table_name.clone(),
insert_body.clone(),
&client_name.clone(),
db_timeout_ms,
)
.await
};
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"db_insert",
db_insert_started.elapsed().as_secs_f64(),
);
match insert_result {
Ok((inserted_row, _message)) => {
if should_invalidate_cache_after_insert(&inserted_row) {
let invalidation =
invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name)
.await;
if invalidation.did_run {
let cache_log_message = if ansi_enabled {
"\u{001b}[33mCache invalidated after insert\u{001b}[0m (reason=table_mutation, scope=client+table)"
} else {
"Cache invalidated after insert (reason=table_mutation, scope=client+table)"
};
info!(
trace_id = %trace_id,
client = %client_name,
table = %table_name,
invalidated_entries = invalidation.invalidated_entries,
cache_entries_remaining = invalidation.remaining_entries,
"{}",
cache_log_message
);
} else if verbose_logging {
info!(
trace_id = %trace_id,
client = %client_name,
table = %table_name,
debounce_delay_ms = invalidation.debounce_delay_ms.unwrap_or(0),
"Cache invalidation batched into debounce window"
);
}
}
let mut diff_resource = json!({});
if let (Some(update_body), Some(insert_body_obj)) = (
body.get("update_body").and_then(Value::as_object),
insert_body.as_object(),
) {
for (key, new_value) in update_body {
let inserted_value: Option<&Value> = insert_body_obj.get(key);
if !new_value.is_null() {
let should_include: bool = match inserted_value {
Some(existing) => existing != new_value,
None => true,
};
if should_include {
diff_resource[key] = json!({
"blame": {
"user_id": user_id.clone()
},
"new": new_value,
"old": null,
"time": chrono::Utc::now().timestamp(),
"void": false
});
}
}
}
}
let resource_id: String = inserted_row
.as_object()
.and_then(|obj| obj.get(&resource_id_key))
.and_then(Value::as_str)
.map(|s| s.to_string())
.unwrap_or_else(|| trace_id.clone());
let audit_message: String = format!(
"User {} inserted data into [{}] at {}",
user_id,
table_name,
Utc::now().timestamp()
);
let audit_log_result: Result<Value, String> = insert_audit_log(
table_name.clone(),
resource_id.clone(),
insert_body.clone(),
company_id.clone(),
organization_id.clone(),
"domain".to_string(),
user_id.clone(),
audit_message,
"success".to_string(),
"request".to_string(),
user_id.clone(),
format!("insert-{}", table_name.clone()),
diff_resource,
json!({}),
true,
"/data/insert".to_string(),
&client_name,
)
.await;
match &audit_log_result {
Ok(audit_log_data) => {
if verbose_logging {
let audit_log_message = if ansi_enabled {
"\u{001b}[32mAudit log inserted successfully\u{001b}[0m (ATHENA_VERBOSE_LOGGING=1)"
} else {
"Audit log inserted successfully (ATHENA_VERBOSE_LOGGING=1)"
};
info!(
trace_id = %trace_id,
client = %client_name,
table = %table_name,
resource_id = %resource_id,
audit_payload = ?audit_log_data,
"{}",
audit_log_message
);
}
}
Err(err) => error!(
"Failed to insert audit log: {:?}, AUDIT_LOG_RESULT: {:#?}",
err, audit_log_result
),
}
let x_publish_event: bool = get_x_publish_event(&req);
if x_publish_event {
if let Some(company_for_event) = identity.resolved_company_id().cloned() {
let event: Value = json!({
"event": "INSERT",
"resource": table_name.clone(),
"inserted_by_user": user_id,
"company_id": company_for_event.clone(),
"insert_body": insert_body.clone(),
});
post_event(company_for_event, event).await;
} else {
info!(
trace_id = %trace_id,
"Skipping publish event because company_id is missing"
);
}
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&table_name),
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({
"resource_id": resource_id,
"client": client_name,
})),
);
let insert_response = json!({
"status": "success",
"success": true,
"message": "Data inserted successfully",
"data": inserted_row,
"resource_id": resource_id,
"table_name": table_name,
});
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
&client_name,
crate::webhooks::ROUTE_GATEWAY_INSERT,
Some(table_name.clone()),
Some(logged_request.request_id.clone()),
Some(json!({
"table_name": table_name,
"insert_body": insert_body,
})),
Some(insert_response.clone()),
),
);
let response_started = Instant::now();
let response = HttpResponse::Ok().json(insert_response);
app_state
.metrics_state
.record_gateway_insert_phase_duration(
"response_serialize",
response_started.elapsed().as_secs_f64(),
);
response
}
Err(driver_error) => {
let insert_error: InsertError = InsertError::from_driver(
driver_error,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
trace_id.clone(),
);
remember_unique_violation_from_insert(
&client_name,
&table_name,
&insert_body,
insert_error.code,
&insert_error.details,
);
if insert_error.code == "unique_violation" {
app_state
.metrics_state
.record_gateway_insert_window_event("db_unique_violation");
}
error!(
table = %table_name,
client = %client_name,
user = %user_id,
trace_id = %insert_error.trace_id,
code = %insert_error.code,
"Failed to insert data: {}",
insert_error.message
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&table_name),
operation_start.elapsed().as_millis(),
insert_error.status,
Some(json!({
"code": insert_error.code,
"details": insert_error.details,
})),
);
if insert_error.code == "backend_unavailable" {
app_state
.metrics_state
.record_gateway_backend_unavailable("insert", &client_name);
}
insert_error.into_response()
}
}
}
/// Inserts `data` into `table_name` on behalf of `client_name`.
///
/// Prefers the health-aware client router (circuit-breaker aware); when the
/// health-aware router cannot produce a client, falls back to the legacy
/// client router. Returns the inserted payload plus a human-readable message,
/// or a `SupabaseInsertError` describing the failure.
pub async fn insert(
    table_name: String,
    data: Value,
    client_name: &str,
) -> Result<(Value, String), SupabaseInsertError> {
    // Preferred path: health-aware client. Once a client is obtained here we
    // never fall through to the legacy router — its errors are terminal.
    if let Ok(primary) = client_router_health_aware(client_name) {
        return match primary.insert(&table_name, data.clone()).await {
            Ok(_) => Ok((
                data,
                format!("Successfully inserted data into {:?}", table_name),
            )),
            Err(err) => {
                let circuit_open = err
                    .downcast_ref::<crate::drivers::scylla::health::HostOffline>()
                    .is_some();
                if circuit_open {
                    Err(SupabaseInsertError::BackendUnavailable {
                        message: format!(
                            "Backend {} temporarily unavailable (circuit breaker)",
                            client_name
                        ),
                    })
                } else {
                    Err(SupabaseInsertError::InsertFailed {
                        message: err.to_string(),
                    })
                }
            }
        };
    }
    // Legacy path: resolve a plain client and attempt the insert directly.
    let fallback: SupabaseClient = client_router(client_name).await.map_err(|err| {
        if err.starts_with("Unknown client name") {
            SupabaseInsertError::UnknownClient {
                client_name: client_name.to_string(),
            }
        } else {
            SupabaseInsertError::ClientInit { message: err }
        }
    })?;
    fallback
        .insert(&table_name, data.clone())
        .await
        .map(|_result| {
            (
                data,
                format!("Successfully inserted data into {:?}", table_name),
            )
        })
        .map_err(|message| SupabaseInsertError::InsertFailed { message })
}
/// Stub audit-log writer for gateway inserts.
///
/// Every parameter is currently ignored (hence the leading underscores) and a
/// fixed `{"audit": true}` payload is returned, so callers always observe a
/// successful audit write and no audit record is actually persisted.
/// NOTE(review): presumably a placeholder for a real audit sink — confirm
/// before relying on audit trails produced through this path.
#[allow(clippy::too_many_arguments)]
async fn insert_audit_log(
    _table_name: String,
    _resource_id: String,
    _insert_body: Value,
    _company_id: String,
    _organization_id: String,
    _domain: String,
    _user_id: String,
    _message: String,
    _status: String,
    _source: String,
    _actor_id: String,
    _action: String,
    _diff_resource: Value,
    _meta: Value,
    _success: bool,
    _path: String,
    _client_name: &str,
) -> Result<Value, String> {
    // Always succeeds; performs no I/O.
    Ok(json!({ "audit": true }))
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::Instant as TokioInstant;
#[test]
fn backend_unavailable_maps_to_503() {
let error = InsertError::from_driver(
InsertDriverError::Supabase(SupabaseInsertError::BackendUnavailable {
message: "Backend supabase temporarily unavailable (circuit breaker)".to_string(),
}),
Some("users"),
"supabase",
None,
None,
None,
"trace-1".to_string(),
);
assert_eq!(error.status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(error.code, "backend_unavailable");
}
#[test]
fn stage_timeout_maps_to_backend_unavailable_503() {
let error = InsertError::from_driver(
InsertDriverError::StageTimeout {
backend: "postgres",
stage: "db_insert",
timeout_ms: 2500,
},
Some("users"),
"postgres",
None,
None,
None,
"trace-timeout".to_string(),
);
assert_eq!(error.status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(error.code, "backend_unavailable");
assert_eq!(error.details["backend"], json!("postgres"));
assert_eq!(error.details["stage"], json!("db_insert"));
assert_eq!(error.details["timeout_ms"], json!(2500));
}
#[test]
fn sql_23505_unique_violation_gets_universal_message() {
let error = InsertError::from_driver(
InsertDriverError::Postgres(PostgresInsertError::SqlExecution {
message: "duplicate key value violates unique constraint \"users_email_key\""
.to_string(),
sql_state: Some("23505".to_string()),
}),
Some("users"),
"railway_direct",
Some("user-1"),
Some("company-1"),
Some("org-1"),
"trace-1".to_string(),
);
assert_eq!(error.status, StatusCode::CONFLICT);
assert_eq!(error.code, "unique_violation");
assert!(error.message.contains("duplicate record"));
assert!(error.message.contains("/gateway/update"));
assert_eq!(error.details["constraint"], json!("users_email_key"));
assert_eq!(error.details["sql_state"], json!("23505"));
}
#[test]
fn invalid_json_string_maps_to_bad_request() {
let error = InsertError::from_driver(
InsertDriverError::Postgres(PostgresInsertError::InvalidJsonString {
column: "payload".to_string(),
reason: "expected value at line 1 column 1".to_string(),
}),
Some("users"),
"postgres",
None,
None,
None,
"trace-json".to_string(),
);
assert_eq!(error.status, StatusCode::BAD_REQUEST);
assert_eq!(error.code, "invalid_json_string");
assert_eq!(error.details["column"], json!("payload"));
assert_eq!(
error.details["validation_error"],
json!("expected value at line 1 column 1")
);
}
#[test]
fn cache_invalidation_skips_null_insert_payload() {
assert!(!should_invalidate_cache_after_insert(&Value::Null));
}
#[test]
fn cache_invalidation_skips_empty_object_insert_payload() {
assert!(!should_invalidate_cache_after_insert(&json!({})));
}
#[test]
fn cache_invalidation_runs_for_non_empty_insert_payload() {
assert!(should_invalidate_cache_after_insert(&json!({"id": 1})));
assert!(should_invalidate_cache_after_insert(&json!([{"id": 1}])));
}
#[test]
fn duplicate_signature_is_stable_for_object_key_order() {
let a: Value = json!({"b": 2, "a": 1});
let b: Value = json!({"a": 1, "b": 2});
let sig_a = build_insert_duplicate_signature("client", "users", &a);
let sig_b = build_insert_duplicate_signature("client", "users", &b);
assert_eq!(sig_a, sig_b);
}
#[test]
fn recent_unique_violation_cache_roundtrip() {
let sig = "client\u{001f}users\u{001f}{\"id\":1}".to_string();
store_recent_unique_violation(sig.clone(), Some("users_id_key".to_string()));
let found = lookup_recent_unique_violation(&sig);
assert_eq!(found, Some(Some("users_id_key".to_string())));
}
#[test]
fn queue_pressure_uses_high_watermark_threshold() {
assert!(!should_defer_insert_for_queue_pressure(84, 100, 0.85));
assert!(should_defer_insert_for_queue_pressure(85, 100, 0.85));
assert!(should_defer_insert_for_queue_pressure(100, 100, 0.85));
assert!(!should_defer_insert_for_queue_pressure(0, 0, 0.85));
}
#[test]
fn retry_after_seconds_rounds_and_clamps() {
assert_eq!(retry_after_seconds_from_ms(1), 1);
assert_eq!(retry_after_seconds_from_ms(999), 1);
assert_eq!(retry_after_seconds_from_ms(1000), 1);
assert_eq!(retry_after_seconds_from_ms(1001), 2);
assert_eq!(retry_after_seconds_from_ms(60_000), 60);
assert_eq!(retry_after_seconds_from_ms(120_000), 60);
}
#[test]
fn prefilter_unique_violation_matches_db_unique_violation_contract() {
let db_error = InsertError::from_driver(
InsertDriverError::Postgres(PostgresInsertError::SqlExecution {
message: "duplicate key value violates unique constraint \"users_email_key\""
.to_string(),
sql_state: Some("23505".to_string()),
}),
Some("users"),
"client-a",
Some("user-a"),
Some("company-a"),
Some("org-a"),
"trace-db".to_string(),
);
let prefiltered = prefilter_unique_violation_error(
"trace-prefilter".to_string(),
"users",
"client-a",
Some("user-a"),
Some("company-a"),
Some("org-a"),
Some("users_email_key"),
"recent_unique_violation_cache",
);
assert_eq!(db_error.status, StatusCode::CONFLICT);
assert_eq!(prefiltered.status, StatusCode::CONFLICT);
assert_eq!(db_error.code, "unique_violation");
assert_eq!(prefiltered.code, "unique_violation");
assert_eq!(db_error.message, prefiltered.message);
assert_eq!(prefiltered.details["sql_state"], json!("23505"));
assert_eq!(
prefiltered.details["dedupe_source"],
json!("recent_unique_violation_cache")
);
}
#[test]
fn remember_unique_violation_only_records_unique_errors() {
let unique_body = json!({ "email": "unit-test-remember@example.com" });
let unique_signature = build_insert_duplicate_signature("client-b", "users", &unique_body)
.expect("signature should be generated");
remember_unique_violation_from_insert(
"client-b",
"users",
&unique_body,
"sql_execution",
&json!({"constraint": "users_email_key"}),
);
assert_eq!(lookup_recent_unique_violation(&unique_signature), None);
remember_unique_violation_from_insert(
"client-b",
"users",
&unique_body,
"unique_violation",
&json!({"constraint": "users_email_key"}),
);
assert_eq!(
lookup_recent_unique_violation(&unique_signature),
Some(Some("users_email_key".to_string()))
);
}
#[test]
fn window_insert_overloaded_outcome_returns_admission_contract() {
let job = WindowInsertJob {
trace_id: "trace-overload".to_string(),
user_id: "user-1".to_string(),
company_id: "company-1".to_string(),
organization_id: "org-1".to_string(),
metadata_user_id: Some("user-1".to_string()),
metadata_company_id: Some("company-1".to_string()),
metadata_organization_id: Some("org-1".to_string()),
body: json!({"table_name": "users", "insert_body": {"id": 1}}),
table_name: "users".to_string(),
insert_body: json!({"id": 1}),
resource_id_key: "id".to_string(),
client_name: "client-c".to_string(),
merge_eligible: true,
logged_request_id: "req-1".to_string(),
logged_client_name: "client-c".to_string(),
logged_method: "PUT".to_string(),
logged_path: "/gateway/insert".to_string(),
operation_start: Instant::now(),
verbose_logging: false,
ansi_enabled: false,
x_publish_event: false,
resolved_company_for_event: Some("company-1".to_string()),
due: TokioInstant::now(),
};
let outcome = window_insert_overloaded_outcome(&job, "overloaded", "window_queue_full");
match outcome {
WindowInsertOutcome::Error(err) => {
assert_eq!(err.status, StatusCode::TOO_MANY_REQUESTS);
assert_eq!(err.code, "admission_overloaded");
assert_eq!(err.message, "overloaded");
assert_eq!(err.details["admission_reason"], json!("window_queue_full"));
assert_eq!(err.details["table_name"], json!("users"));
}
WindowInsertOutcome::Success(_) => {
panic!("expected error outcome for overload")
}
}
}
#[test]
fn insert_error_with_retry_after_emits_header() {
let error = InsertError::new(
StatusCode::TOO_MANY_REQUESTS,
"admission_overloaded",
"retry later",
"trace-retry-after".to_string(),
Map::new(),
);
let response = error.into_response_with_retry_after_ms(Some(1500));
let header_value = response
.headers()
.get(actix_web::http::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.unwrap_or("0");
assert_eq!(header_value, "2");
}
}