use actix_web::HttpResponse;
use actix_web::http::StatusCode;
use serde_json::{Map, Value, json};
use tracing::{error, info};
use crate::AppState;
use crate::api::headers::response_headers::set_response_trace_id;
use crate::drivers::postgresql::sqlx_driver::PostgresInsertError;
use super::backend::{InsertDriverError, SupabaseInsertError};
use super::config::retry_after_seconds_from_ms;
use super::dedupe::{remember_unique_violation_from_insert, unique_violation_message};
use super::window::WindowInsertJob;
#[derive(Debug)]
pub(crate) struct InsertError {
pub(crate) status: StatusCode,
pub(crate) code: &'static str,
pub(crate) message: String,
pub(crate) trace_id: String,
pub(crate) details: Value,
}
pub(crate) enum WindowInsertOutcome {
Success(Value),
Error(InsertError),
}
impl WindowInsertOutcome {
pub(crate) fn into_http_response(self) -> HttpResponse {
match self {
Self::Success(value) => HttpResponse::Ok().json(value),
Self::Error(error) => error.into_response(),
}
}
}
pub(crate) fn window_insert_internal_error(message: impl Into<String>) -> WindowInsertOutcome {
WindowInsertOutcome::Error(InsertError::new(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_error",
message,
"internal".to_string(),
Map::new(),
))
}
impl InsertError {
pub(crate) fn new(
status: StatusCode,
code: &'static str,
message: impl Into<String>,
trace_id: String,
details: Map<String, Value>,
) -> Self {
Self {
status,
code,
message: message.into(),
trace_id,
details: Value::Object(details),
}
}
pub(crate) fn into_response(self) -> HttpResponse {
self.into_response_with_retry_after_ms(None)
}
pub(crate) fn into_response_with_retry_after_ms(
self,
retry_after_ms: Option<u64>,
) -> HttpResponse {
let InsertError {
status,
code,
message,
trace_id,
details,
} = self;
let mut builder: actix_web::HttpResponseBuilder = HttpResponse::build(status);
if let Some(retry_after_ms) = retry_after_ms {
let retry_after_seconds = retry_after_seconds_from_ms(retry_after_ms);
builder.insert_header((
actix_web::http::header::RETRY_AFTER,
retry_after_seconds.to_string(),
));
}
let mut response = builder.json(json!({
"status": "error",
"success": false,
"code": code,
"message": message,
"details": details,
"trace_id": trace_id,
}));
set_response_trace_id(response.headers_mut(), &trace_id);
response
}
pub(crate) fn from_driver(
driver_error: InsertDriverError,
table_name: Option<&str>,
client_name: &str,
user_id: Option<&str>,
company_id: Option<&str>,
organization_id: Option<&str>,
trace_id: String,
) -> Self {
let mut metadata: Map<String, Value> = build_error_metadata(
&trace_id,
table_name,
client_name,
user_id,
company_id,
organization_id,
);
let (status, code, message) = match driver_error {
InsertDriverError::Postgres(err) => match err {
PostgresInsertError::InvalidTableName => (
StatusCode::BAD_REQUEST,
"invalid_table_name",
"table name is invalid".to_string(),
),
PostgresInsertError::InvalidPayload(reason) => {
metadata.insert("validation_error".to_string(), json!(reason.clone()));
(
StatusCode::BAD_REQUEST,
"invalid_payload",
format!("insert payload is invalid: {}", reason),
)
}
PostgresInsertError::InvalidJsonString { column, reason } => {
metadata.insert("validation_error".to_string(), json!(reason.clone()));
metadata.insert("column".to_string(), json!(column.clone()));
(
StatusCode::BAD_REQUEST,
"invalid_json_string",
format!(
"insert payload contains invalid JSON string for JSON column '{}': {}",
column, reason
),
)
}
PostgresInsertError::NoValidColumns => (
StatusCode::BAD_REQUEST,
"no_columns",
"no valid columns provided for insert".to_string(),
),
PostgresInsertError::MissingReturnColumn => (
StatusCode::INTERNAL_SERVER_ERROR,
"missing_return_row",
"insert succeeded but returned no data column".to_string(),
),
PostgresInsertError::SqlExecution { message, sql_state } => {
let db_message: String = message.clone();
if let Some(state) = sql_state.clone() {
metadata.insert("sql_state".to_string(), json!(state));
}
let constraint_name = db_message
.split("constraint \"")
.nth(1)
.and_then(|value| value.split('"').next())
.map(str::to_string);
if let Some(constraint_name) = constraint_name.as_deref() {
metadata.insert("constraint".to_string(), json!(constraint_name));
}
let lowercase_message = db_message.to_ascii_lowercase();
if sql_state.as_deref() == Some("23505")
|| lowercase_message.contains("duplicate key value")
|| lowercase_message.contains("unique constraint")
{
(
StatusCode::CONFLICT,
"unique_violation",
unique_violation_message(constraint_name.as_deref()),
)
} else if sql_state.as_deref() == Some("23503")
|| lowercase_message.contains("foreign key constraint")
{
(
StatusCode::BAD_REQUEST,
"foreign_key_violation",
"insert references a related record that does not exist".to_string(),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
"sql_execution",
db_message,
)
}
}
},
InsertDriverError::Supabase(err) => match err {
SupabaseInsertError::UnknownClient { client_name } => {
metadata.insert("unknown_client".to_string(), json!(client_name.clone()));
(
StatusCode::BAD_REQUEST,
"unknown_client",
format!("Supabase client '{}' is not configured", client_name),
)
}
SupabaseInsertError::ClientInit { message } => (
StatusCode::BAD_REQUEST,
"supabase_client_init_failed",
message,
),
SupabaseInsertError::InsertFailed { message } => {
let message_lower: String = message.to_ascii_lowercase();
if message_lower.contains("duplicate key")
|| message_lower.contains("unique constraint")
{
(
StatusCode::CONFLICT,
"unique_violation",
unique_violation_message(None),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
"supabase_insert_failed",
message,
)
}
}
SupabaseInsertError::BackendUnavailable { message } => {
metadata.insert("backend".to_string(), json!("supabase"));
(
StatusCode::SERVICE_UNAVAILABLE,
"backend_unavailable",
message,
)
}
},
InsertDriverError::StageTimeout {
backend,
stage,
timeout_ms,
} => {
metadata.insert("backend".to_string(), json!(backend));
metadata.insert("stage".to_string(), json!(stage));
metadata.insert("timeout_ms".to_string(), json!(timeout_ms));
(
StatusCode::SERVICE_UNAVAILABLE,
"backend_unavailable",
format!(
"Insert stage '{}' timed out after {}ms (backend={}).",
stage, timeout_ms, backend
),
)
}
};
Self::new(status, code, message, trace_id, metadata)
}
}
pub(crate) fn build_error_metadata(
trace_id: &str,
table_name: Option<&str>,
client_name: &str,
user_id: Option<&str>,
company_id: Option<&str>,
organization_id: Option<&str>,
) -> Map<String, Value> {
let mut metadata = Map::new();
metadata.insert("trace_id".to_string(), json!(trace_id));
metadata.insert("client".to_string(), json!(client_name));
if let Some(table_name) = table_name {
metadata.insert("table_name".to_string(), json!(table_name));
}
if let Some(user_id) = user_id {
metadata.insert("user_id".to_string(), json!(user_id));
}
if let Some(company_id) = company_id {
metadata.insert("company_id".to_string(), json!(company_id));
}
if let Some(organization_id) = organization_id {
metadata.insert("organization_id".to_string(), json!(organization_id));
}
metadata
}
fn insert_error_event_label(code: &str) -> &'static str {
match code {
"unique_violation" => "error_unique_violation",
"foreign_key_violation" => "error_foreign_key_violation",
"invalid_payload" => "error_invalid_payload",
"invalid_json_string" => "error_invalid_json_string",
"missing_field" => "error_missing_field",
"missing_client_header" => "error_missing_client_header",
"unknown_client" => "error_unknown_client",
"backend_unavailable" => "error_backend_unavailable",
"admission_overloaded" => "error_admission_overloaded",
"sql_execution" => "error_sql_execution",
_ => "error_other",
}
}
pub(crate) fn observe_insert_error(
app_state: &AppState,
client_name: &str,
table_name: Option<&str>,
insert_body: Option<&Value>,
insert_error: &InsertError,
) {
app_state
.metrics_state
.record_gateway_insert_error(insert_error.code, insert_error.status.as_u16());
app_state
.metrics_state
.record_gateway_insert_window_event(insert_error_event_label(insert_error.code));
if insert_error.code == "unique_violation" {
app_state
.metrics_state
.record_gateway_insert_window_event("db_unique_violation");
if let (Some(table_name), Some(insert_body)) = (table_name, insert_body) {
remember_unique_violation_from_insert(
client_name,
table_name,
insert_body,
insert_error.code,
&insert_error.details,
);
}
}
if insert_error.code == "backend_unavailable" {
let backend_label = insert_error
.details
.get("backend")
.and_then(Value::as_str)
.unwrap_or("postgres");
app_state
.metrics_state
.record_gateway_backend_unavailable("insert", backend_label);
}
}
pub(crate) fn trace_insert_error(
table_name: &str,
client_name: &str,
user_id: &str,
insert_error: &InsertError,
) {
error!(
table = %table_name,
client = %client_name,
user = %user_id,
trace_id = %insert_error.trace_id,
status_code = insert_error.status.as_u16(),
code = %insert_error.code,
details = %insert_error.details,
"Failed to insert data: {}",
insert_error.message
);
}
pub(crate) fn log_gateway_insert_success(
outcome: &'static str,
trace_id: &str,
client_name: &str,
table_name: &str,
total_ms: u128,
db_ms: u64,
) {
info!(
target: "athena::gateway::insert",
outcome,
trace_id = %trace_id,
client_name = %client_name,
table_name = %table_name,
total_duration_ms = total_ms,
db_duration_ms = db_ms,
"gateway insert completed"
);
}
pub(crate) fn prefilter_unique_violation_error(
trace_id: String,
table_name: &str,
client_name: &str,
user_id: Option<&str>,
company_id: Option<&str>,
organization_id: Option<&str>,
constraint: Option<&str>,
dedupe_source: &str,
) -> InsertError {
let mut details: Map<String, Value> = build_error_metadata(
&trace_id,
Some(table_name),
client_name,
user_id,
company_id,
organization_id,
);
details.insert("sql_state".to_string(), json!("23505"));
details.insert("dedupe_source".to_string(), json!(dedupe_source));
if let Some(constraint) = constraint {
details.insert("constraint".to_string(), json!(constraint));
}
InsertError::new(
StatusCode::CONFLICT,
"unique_violation",
unique_violation_message(constraint),
trace_id,
details,
)
}
pub(crate) fn window_insert_overloaded_outcome(
job: &WindowInsertJob,
message: &str,
admission_reason: &str,
) -> WindowInsertOutcome {
let mut details: Map<String, Value> = build_error_metadata(
&job.trace_id,
Some(&job.table_name),
&job.client_name,
job.metadata_user_id.as_deref(),
job.metadata_company_id.as_deref(),
job.metadata_organization_id.as_deref(),
);
details.insert("admission_reason".to_string(), json!(admission_reason));
details.insert("status".to_string(), json!("overloaded"));
WindowInsertOutcome::Error(InsertError::new(
StatusCode::TOO_MANY_REQUESTS,
"admission_overloaded",
message.to_string(),
job.trace_id.clone(),
details,
))
}