use actix_web::{HttpRequest, HttpResponse, http::StatusCode, put, web::Json};
use chrono::Utc;
use serde_json::{Map, Value, json};
use std::time::Instant;
use tracing::{error, info};
mod identity;
use self::identity::InsertIdentity;
use crate::AppState;
use crate::api::gateway::auth::{authorize_gateway_request, write_right_for_resource};
use crate::api::gateway::update::table_id_map::get_resource_id_key;
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_publish_event::get_x_publish_event;
use crate::data::events::post_event;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_crud::insert_row_deadpool;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::deadpool_fallback_reason_label;
use crate::drivers::postgresql::sqlx_driver::{PostgresInsertError, insert_row};
use crate::drivers::supabase::client_router;
#[cfg(feature = "deadpool_experimental")]
use crate::error::tokio_postgres_parser::process_tokio_postgres_db_error;
use crate::utils::request_logging::LoggedRequest;
use crate::utils::request_logging::{log_operation_event, log_request};
use actix_web::web::Data;
#[allow(unused_imports)]
use moka::future::Cache;
use supabase_rs::SupabaseClient;
#[allow(unused_imports)]
use tokio::sync::MutexGuard;
#[derive(Debug)]
pub enum SupabaseInsertError {
UnknownClient { client_name: String },
ClientInit { message: String },
InsertFailed { message: String },
}
#[derive(Debug)]
enum InsertDriverError {
Postgres(PostgresInsertError),
Supabase(SupabaseInsertError),
}
#[derive(Debug)]
struct InsertError {
status: StatusCode,
code: &'static str,
message: String,
trace_id: String,
details: Value,
}
impl InsertError {
fn new(
status: StatusCode,
code: &'static str,
message: impl Into<String>,
trace_id: String,
details: Map<String, Value>,
) -> Self {
let details: Value = Value::Object(details);
Self {
status,
code,
message: message.into(),
trace_id,
details,
}
}
fn into_response(self) -> HttpResponse {
HttpResponse::build(self.status).json(json!({
"status": "error",
"success": false,
"code": self.code,
"message": self.message,
"details": self.details,
"trace_id": self.trace_id,
}))
}
fn from_driver(
driver_error: InsertDriverError,
table_name: Option<&str>,
client_name: &str,
user_id: Option<&str>,
company_id: Option<&str>,
organization_id: Option<&str>,
trace_id: String,
) -> Self {
let mut metadata: Map<String, Value> = build_error_metadata(
&trace_id,
table_name,
client_name,
user_id,
company_id,
organization_id,
);
let (status, code, message) = match driver_error {
InsertDriverError::Postgres(err) => match err {
PostgresInsertError::InvalidTableName => (
StatusCode::BAD_REQUEST,
"invalid_table_name",
"table name is invalid".to_string(),
),
PostgresInsertError::InvalidPayload(reason) => {
metadata.insert("validation_error".to_string(), json!(reason.clone()));
(
StatusCode::BAD_REQUEST,
"invalid_payload",
format!("insert payload is invalid: {}", reason),
)
}
PostgresInsertError::NoValidColumns => (
StatusCode::BAD_REQUEST,
"no_columns",
"no valid columns provided for insert".to_string(),
),
PostgresInsertError::MissingReturnColumn => (
StatusCode::INTERNAL_SERVER_ERROR,
"missing_return_row",
"insert succeeded but returned no data column".to_string(),
),
PostgresInsertError::SqlExecution { message, sql_state } => {
let db_message: String = message.clone();
if let Some(state) = sql_state.clone() {
metadata.insert("sql_state".to_string(), json!(state));
}
let constraint_name: Option<String> = db_message
.split("constraint \"")
.nth(1)
.and_then(|constraint_start| constraint_start.split('"').next())
.map(str::to_string);
if let Some(constraint_name) = constraint_name.as_deref() {
metadata.insert("constraint".to_string(), json!(constraint_name));
}
metadata.insert("db_error".to_string(), json!(db_message.clone()));
let (status, code, message) = match sql_state.as_deref() {
Some("23505") => {
let unique_violation_message = match constraint_name.as_deref() {
Some(name) => format!(
"Insert would create a duplicate record (unique constraint '{}'). Use /gateway/update to modify the existing record or change the unique field values.",
name
),
None => "Insert would create a duplicate record. Use /gateway/update to modify the existing record or change the unique field values.".to_string(),
};
(
StatusCode::CONFLICT,
"unique_violation",
unique_violation_message,
)
}
Some("23503") => (
StatusCode::UNPROCESSABLE_ENTITY,
"foreign_key_violation",
"failed to execute insert: referenced resource missing or constraint violated"
.to_string(),
),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"sql_execution",
format!("failed to execute insert: {}", db_message),
),
};
(status, code, message)
}
},
InsertDriverError::Supabase(err) => match err {
SupabaseInsertError::UnknownClient { client_name } => {
metadata.insert("unknown_client".to_string(), json!(client_name.clone()));
(
StatusCode::BAD_REQUEST,
"unknown_client",
format!("unknown Supabase client '{}'", client_name),
)
}
SupabaseInsertError::ClientInit { message } => {
metadata.insert("client_init_error".to_string(), json!(message.clone()));
(
StatusCode::BAD_GATEWAY,
"client_initialization_failure",
format!("failed to configure Supabase client: {}", message),
)
}
SupabaseInsertError::InsertFailed { message } => {
metadata.insert("supabase_error".to_string(), json!(message.clone()));
(
StatusCode::INTERNAL_SERVER_ERROR,
"supabase_insert_failed",
format!("failed to insert data via Supabase: {}", message),
)
}
},
};
Self::new(status, code, message, trace_id, metadata)
}
}
fn build_error_metadata(
trace_id: &str,
table_name: Option<&str>,
client_name: &str,
user_id: Option<&str>,
company_id: Option<&str>,
organization_id: Option<&str>,
) -> Map<String, Value> {
let mut metadata: Map<String, Value> = Map::new();
metadata.insert("trace_id".to_string(), json!(trace_id));
metadata.insert("client".to_string(), json!(client_name));
if let Some(table) = table_name {
metadata.insert("table_name".to_string(), json!(table));
}
if let Some(user) = user_id {
metadata.insert("user_id".to_string(), json!(user));
}
if let Some(company) = company_id {
metadata.insert("company_id".to_string(), json!(company));
}
if let Some(org) = organization_id {
metadata.insert("organization_id".to_string(), json!(org));
}
metadata
}
#[put("/gateway/insert")]
pub async fn insert_data(
req: HttpRequest,
body: Json<Value>,
app_state: Data<AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let identity: InsertIdentity = InsertIdentity::from_request(&req);
let trace_id: String = identity.trace_id.clone();
let client_name: String = x_athena_client(&req.clone());
if client_name.is_empty() {
let mut details = build_error_metadata(
&trace_id,
None,
&client_name,
identity.resolved_user_id().map(|s| s.as_str()),
identity.resolved_company_id().map(|s| s.as_str()),
identity.resolved_organization_id().map(|s| s.as_str()),
);
details.insert("missing_header".to_string(), json!("X-Athena-Client"));
return InsertError::new(
StatusCode::BAD_REQUEST,
"missing_client_header",
"X-Athena-Client header is required and cannot be empty",
trace_id.clone(),
details,
)
.into_response();
}
let requested_table_name = body
.get("table_name")
.and_then(Value::as_str)
.map(str::to_string);
let auth = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![write_right_for_resource(requested_table_name.as_deref())],
)
.await;
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
for header in identity.missing_headers() {
info!(
trace_id = %trace_id,
header = %header,
"deprecated Athena header missing"
);
}
let user_id: String = identity.audit_user_id();
let company_id: String = identity.audit_company_id();
let organization_id: String = identity.audit_organization_id();
let metadata_user_id: Option<&str> = identity.resolved_user_id().map(|s| s.as_str());
let metadata_company_id: Option<&str> = identity.resolved_company_id().map(|s| s.as_str());
let metadata_organization_id: Option<&str> =
identity.resolved_organization_id().map(|s| s.as_str());
let table_name: String = match body.get("table_name").and_then(Value::as_str) {
Some(name) => name.to_string(),
None => {
error!("table_name is required in the request body.");
let mut details: Map<String, Value> = build_error_metadata(
&trace_id,
None,
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
);
details.insert("missing_field".to_string(), json!("table_name"));
return InsertError::new(
StatusCode::BAD_REQUEST,
"missing_field",
"table_name is required in the request body",
trace_id.clone(),
details,
)
.into_response();
}
};
let insert_body: Value = match body.get("insert_body") {
Some(body) => body.clone(),
None => {
error!("insert_body is required in the request body.");
let mut details: Map<String, Value> = build_error_metadata(
&trace_id,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
);
details.insert("missing_field".to_string(), json!("insert_body"));
return InsertError::new(
StatusCode::BAD_REQUEST,
"missing_field",
"insert_body is required in the request body",
trace_id.clone(),
details,
)
.into_response();
}
};
let resource_id_key: String = get_resource_id_key(&table_name).await;
let insert_result: Result<(Value, String), InsertDriverError> = if let Some(pool) =
app_state.pg_registry.get_pool(&client_name)
{
#[cfg(feature = "deadpool_experimental")]
'postgres: {
let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
#[cfg(feature = "deadpool_experimental")]
if deadpool_requested {
if let Some(deadpool_pool) = app_state.deadpool_registry.get_pool(&client_name) {
let checkout_timeout_ms: u64 =
std::env::var("ATHENA_DEADPOOL_CHECKOUT_TIMEOUT_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(800);
match insert_row_deadpool(
&deadpool_pool,
&table_name,
&insert_body,
std::time::Duration::from_millis(checkout_timeout_ms),
)
.await
{
Ok(value) => {
app_state
.metrics_state
.record_gateway_postgres_backend("/gateway/insert", "deadpool");
tracing::info!(
trace_id = %trace_id,
backend = "postgres",
pg_backend = "deadpool",
"insert_row finished"
);
break 'postgres Ok((
value,
format!("Inserted row into {} via postgres (deadpool)", table_name),
));
}
Err(err) => {
if err.is_db_error {
let processed = process_tokio_postgres_db_error(
err.sql_state.as_deref().unwrap_or(""),
&err.message,
Some(&table_name),
);
return HttpResponse::build(processed.status_code)
.content_type("application/json")
.json(processed.to_json());
}
app_state.metrics_state.record_deadpool_fallback(
"/gateway/insert",
deadpool_fallback_reason_label(err.reason),
);
tracing::warn!(
trace_id = %trace_id,
reason = ?err.reason,
"deadpool insert failed; falling back to sqlx"
);
}
}
}
}
match insert_row(&pool, &table_name, &insert_body).await {
Ok(value) => {
app_state
.metrics_state
.record_gateway_postgres_backend("/gateway/insert", "sqlx");
Ok((
value,
format!("Inserted row into {} via postgres", table_name),
))
}
Err(err) => Err(InsertDriverError::Postgres(err)),
}
}
#[cfg(not(feature = "deadpool_experimental"))]
match insert_row(&pool, &table_name, &insert_body).await {
Ok(value) => Ok((
value,
format!("Inserted row into {} via postgres", table_name),
)),
Err(err) => Err(InsertDriverError::Postgres(err)),
}
} else {
insert(
table_name.clone(),
insert_body.clone(),
&client_name.clone(),
)
.await
.map_err(InsertDriverError::Supabase)
};
match insert_result {
Ok((inserted_row, _message)) => {
app_state.cache.invalidate_all();
info!("Cache invalidated after insert");
let mut diff_resource = json!({});
if let (Some(update_body), Some(insert_body_obj)) = (
body.get("update_body").and_then(Value::as_object),
insert_body.as_object(),
) {
for (key, new_value) in update_body {
let inserted_value: Option<&Value> = insert_body_obj.get(key);
if !new_value.is_null() {
let should_include: bool = match inserted_value {
Some(existing) => existing != new_value,
None => true,
};
if should_include {
diff_resource[key] = json!({
"blame": {
"user_id": user_id.clone()
},
"new": new_value,
"old": null,
"time": chrono::Utc::now().timestamp(),
"void": false
});
}
}
}
}
let resource_id: String = inserted_row
.as_object()
.and_then(|obj| obj.get(&resource_id_key))
.and_then(Value::as_str)
.map(|s| s.to_string())
.unwrap_or_else(|| trace_id.clone());
let audit_message: String = format!(
"User {} inserted data into [{}] at {}",
user_id,
table_name,
Utc::now().timestamp()
);
let audit_log_result: Result<Value, String> = insert_audit_log(
table_name.clone(),
resource_id.clone(),
insert_body.clone(),
company_id.clone(),
organization_id.clone(),
"domain".to_string(),
user_id.clone(),
audit_message,
"success".to_string(),
"request".to_string(),
user_id.clone(),
format!("insert-{}", table_name.clone()),
diff_resource,
json!({}),
true,
"/data/insert".to_string(),
&client_name,
)
.await;
match &audit_log_result {
Ok(audit_log_data) => info!(
"Successfully inserted audit log. AUDIT_LOG_DATA: {:#?}",
audit_log_data
),
Err(err) => error!(
"Failed to insert audit log: {:?}, AUDIT_LOG_RESULT: {:#?}",
err, audit_log_result
),
}
let x_publish_event: bool = get_x_publish_event(&req);
if x_publish_event {
if let Some(company_for_event) = identity.resolved_company_id().cloned() {
let event: Value = json!({
"event": "INSERT",
"resource": table_name.clone(),
"inserted_by_user": user_id,
"company_id": company_for_event.clone(),
"insert_body": insert_body.clone(),
});
post_event(company_for_event, event).await;
} else {
info!(
trace_id = %trace_id,
"Skipping publish event because company_id is missing"
);
}
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&table_name),
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({
"resource_id": resource_id,
"client": client_name,
})),
);
HttpResponse::Ok().json(json!({
"status": "success",
"success": true,
"message": "Data inserted successfully",
"data": inserted_row,
"resource_id": resource_id,
"table_name": table_name,
}))
}
Err(driver_error) => {
let insert_error: InsertError = InsertError::from_driver(
driver_error,
Some(&table_name),
&client_name,
metadata_user_id,
metadata_company_id,
metadata_organization_id,
trace_id.clone(),
);
error!(
table = %table_name,
client = %client_name,
user = %user_id,
trace_id = %insert_error.trace_id,
code = %insert_error.code,
"Failed to insert data: {}",
insert_error.message
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"insert",
Some(&table_name),
operation_start.elapsed().as_millis(),
insert_error.status,
Some(json!({
"code": insert_error.code,
"details": insert_error.details,
})),
);
insert_error.into_response()
}
}
}
pub async fn insert(
table_name: String,
data: Value,
client_name: &str,
) -> Result<(Value, String), SupabaseInsertError> {
let client: SupabaseClient = client_router(client_name).await.map_err(|err| {
if err.starts_with("Unknown client name") {
SupabaseInsertError::UnknownClient {
client_name: client_name.to_string(),
}
} else {
SupabaseInsertError::ClientInit { message: err }
}
})?;
let insert_result: Result<String, String> = client.insert(&table_name, data.clone()).await;
match insert_result {
Ok(_result) => Ok((
data,
format!("Successfully inserted data into {:?}", table_name),
)),
Err(err) => Err(SupabaseInsertError::InsertFailed { message: err }),
}
}
#[allow(clippy::too_many_arguments)]
async fn insert_audit_log(
_table_name: String,
_resource_id: String,
_insert_body: Value,
_company_id: String,
_organization_id: String,
_domain: String,
_user_id: String,
_message: String,
_status: String,
_source: String,
_actor_id: String,
_action: String,
_diff_resource: Value,
_meta: Value,
_success: bool,
_path: String,
_client_name: &str,
) -> Result<Value, String> {
Ok(json!({ "audit": true }))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sql_23505_unique_violation_gets_universal_message() {
let error = InsertError::from_driver(
InsertDriverError::Postgres(PostgresInsertError::SqlExecution {
message: "duplicate key value violates unique constraint \"users_email_key\""
.to_string(),
sql_state: Some("23505".to_string()),
}),
Some("users"),
"railway_direct",
Some("user-1"),
Some("company-1"),
Some("org-1"),
"trace-1".to_string(),
);
assert_eq!(error.status, StatusCode::CONFLICT);
assert_eq!(error.code, "unique_violation");
assert!(error.message.contains("duplicate record"));
assert!(error.message.contains("/gateway/update"));
assert_eq!(error.details["constraint"], json!("users_email_key"));
assert_eq!(error.details["sql_state"], json!("23505"));
}
}