use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, post};
use serde_json::{Map, Number, Value, json};
use sqlx::{Pool, Postgres};
use std::time::Instant;
use tracing::error;
use super::conditions::{RequestCondition, to_query_conditions};
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::api::gateway::auth::{
GatewayAuthOutcome, authorize_gateway_request, write_right_for_resource,
};
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_UPDATE, GatewayDeferredRequest, extract_update_payload,
parse_conditions_from_body,
};
#[cfg(feature = "deadpool_experimental")]
use crate::api::gateway::deadpool_timeout::deadpool_checkout_timeout;
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::pool_resolver::{resolve_deadpool_pool, resolve_postgres_pool};
use crate::api::headers::x_athena_client::x_athena_client;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::response::api_accepted;
use crate::drivers::postgresql::column_resolver::get_available_columns;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_crud::update_rows_deadpool;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::deadpool_fallback_reason_label;
use crate::drivers::postgresql::sqlx_driver::update_rows;
use crate::error::sqlx_parser::process_sqlx_error_with_context_and_columns;
#[cfg(feature = "deadpool_experimental")]
use crate::error::tokio_postgres_parser::process_tokio_postgres_db_error;
use crate::error::{ErrorCategory, ProcessedError, generate_trace_id};
use crate::parser::query_builder::Condition;
use crate::utils::format::normalize_column_name;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
use super::response::missing_client_header_response;
use super::room_id;
use crate::api::gateway::contracts::GatewayOperationKind;
use crate::api::gateway::response::{gateway_bad_request, gateway_service_unavailable};
pub(crate) async fn handle_gateway_update_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let client_name: String = x_athena_client(&req.clone());
if client_name.is_empty() {
return missing_client_header_response();
}
let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
let auto_cast_uuid_filter_values_to_text: bool =
app_state.gateway_auto_cast_uuid_filter_values_to_text;
let json_body: &Json<Value> = match &body {
Some(b) => b,
None => {
let auth: GatewayAuthOutcome = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![write_right_for_resource(None)],
)
.await;
let _logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
return gateway_bad_request(
GatewayOperationKind::Update,
"Request body is required",
"request body is required for /gateway/update",
);
}
};
let table_name: String = json_body
.get("table_name")
.and_then(Value::as_str)
.map(String::from)
.unwrap_or_default();
let auth = authorize_gateway_request(
&req,
app_state.get_ref(),
Some(&client_name),
vec![write_right_for_resource(if table_name.is_empty() {
None
} else {
Some(&table_name)
})],
)
.await;
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
if table_name.is_empty() {
return gateway_bad_request(
GatewayOperationKind::Update,
"Missing required field",
"table_name is required",
);
}
let set_payload_map: Map<String, Value> = match extract_update_payload(
json_body,
force_camel_case_to_snake_case,
) {
Some(m) => m,
None => {
return gateway_bad_request(
GatewayOperationKind::Update,
"Missing update payload",
"update payload required: provide 'columns' (array of objects with column names and values), or 'data' / 'set' object",
);
}
};
let set_payload: Value = Value::Object(set_payload_map);
if let Some(additional_conditions) = json_body.get("conditions").and_then(Value::as_array) {
for condition in additional_conditions {
let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) else {
continue;
};
let normalized_for_validation =
normalize_column_name(eq_column, force_camel_case_to_snake_case);
if (normalized_for_validation == "room_id" || eq_column == "roomId")
&& condition.get("eq_value").is_none()
{
return gateway_bad_request(
GatewayOperationKind::Update,
"Invalid condition value",
"room_id is required and must be numeric",
);
}
}
}
let mut conditions: Vec<RequestCondition> = Vec::new();
for condition in parse_conditions_from_body(json_body) {
let normalized_for_validation: String =
normalize_column_name(&condition.eq_column, force_camel_case_to_snake_case);
let eq_value = if normalized_for_validation == "room_id" || condition.eq_column == "roomId"
{
match room_id::parse_room_id_value(&condition.eq_value) {
Ok(room_id) => Value::Number(Number::from(room_id)),
Err(err_msg) => {
return gateway_bad_request(
GatewayOperationKind::Update,
"Invalid condition value",
&err_msg,
);
}
}
} else {
condition.eq_value
};
conditions.push(RequestCondition::new(condition.eq_column, eq_value));
}
if conditions.is_empty() {
return gateway_bad_request(
GatewayOperationKind::Update,
"Missing conditions",
"at least one condition is required for update (e.g. eq_column / eq_value)",
);
}
conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
if auth.force_deferred_queue {
let request_bytes: Option<u64> = req
.headers()
.get(actix_web::http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let deferred_request: GatewayDeferredRequest = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_UPDATE,
auth.request_id.clone(),
client_name.clone(),
json_body.0.clone(),
)
.with_reason(auth.force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
if let Err(err) = enqueue_gateway_deferred_request(
app_state.get_ref(),
"POST",
req.path(),
request_bytes,
&deferred_request,
)
.await
{
return gateway_service_unavailable(
GatewayOperationKind::Update,
"Deferred queue unavailable",
&format!("Failed to queue deferred update request: {err}"),
);
}
return api_accepted(
"Update request queued for deferred execution (auth fallback mode)",
json!({
"request_id": auth.request_id,
"status": "queued",
"route": req.path(),
}),
);
}
let pg_conditions: Vec<Condition> = to_query_conditions(
&conditions[..],
force_camel_case_to_snake_case,
auto_cast_uuid_filter_values_to_text,
);
let pool: Pool<Postgres> = match resolve_postgres_pool(&req, app_state.get_ref()).await {
Ok(p) => p,
Err(resp) => return resp,
};
let mut update_result: Result<Vec<Value>, anyhow::Error> =
Err(anyhow::anyhow!("use_sqlx_fallback"));
#[cfg(feature = "deadpool_experimental")]
{
let deadpool_requested: bool = x_athena_deadpool_enable(&req, Some(&auth.request_id));
if deadpool_requested {
if let Ok(deadpool_pool) = resolve_deadpool_pool(&req, app_state.get_ref()).await {
match update_rows_deadpool(
&deadpool_pool,
&table_name,
&pg_conditions,
&set_payload,
deadpool_checkout_timeout(),
)
.await
{
Ok(rows) => {
app_state
.metrics_state
.record_gateway_postgres_backend("/gateway/update", "deadpool");
update_result = Ok(rows);
}
Err(err) => {
if err.is_db_error {
let processed: ProcessedError = process_tokio_postgres_db_error(
err.sql_state.as_deref().unwrap_or(""),
&err.message,
Some(&table_name),
);
return HttpResponse::build(processed.status_code)
.content_type("application/json")
.json(processed.to_json());
}
app_state.metrics_state.record_deadpool_fallback(
"/gateway/update",
deadpool_fallback_reason_label(err.reason),
);
tracing::warn!(
request_id = %auth.request_id,
reason = ?err.reason,
"Deadpool update failed; falling back to sqlx"
);
}
}
}
}
}
if update_result.is_err() {
update_result = update_rows(&pool, &table_name, &pg_conditions, &set_payload).await;
if update_result.is_ok() {
app_state
.metrics_state
.record_gateway_postgres_backend("/gateway/update", "sqlx");
}
}
let updated_rows: Vec<Value> = match update_result {
Ok(rows) => rows,
Err(err) => {
if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
let available_columns: Option<Vec<String>> = get_available_columns(
&pool,
&table_name,
app_state.gateway_allow_schema_names_prefixed_as_table_name,
)
.await
.ok();
let processed: ProcessedError = process_sqlx_error_with_context_and_columns(
sqlx_err,
Some(&table_name),
available_columns.as_deref(),
);
error!(
error_code = %processed.error_code,
trace_id = %processed.trace_id,
"gateway update_rows failed"
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"gateway_update",
Some(&table_name),
operation_start.elapsed().as_millis(),
processed.status_code,
Some(json!({
"error_code": processed.error_code,
"trace_id": processed.trace_id,
})),
);
return HttpResponse::build(processed.status_code).json(processed.to_json());
}
let processed: ProcessedError = ProcessedError::new(
ErrorCategory::Internal,
StatusCode::INTERNAL_SERVER_ERROR,
"update_execution_error",
"Failed to update rows due to an internal gateway error.",
generate_trace_id(),
)
.with_metadata("table", json!(table_name))
.with_metadata("client", json!(client_name))
.with_metadata("reason", json!(err.to_string()));
error!(
error = %err,
error_code = %processed.error_code,
trace_id = %processed.trace_id,
"gateway update_rows failed"
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"gateway_update",
Some(&table_name),
operation_start.elapsed().as_millis(),
processed.status_code,
Some(json!({
"error_code": processed.error_code,
"trace_id": processed.trace_id,
})),
);
return HttpResponse::build(processed.status_code).json(processed.to_json());
}
};
if !updated_rows.is_empty() {
let _ = invalidate_scoped_gateway_cache(app_state.clone(), &client_name, &table_name).await;
}
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"gateway_update",
Some(&table_name),
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({ "updated_count": updated_rows.len() })),
);
let update_response: Value = json!({ "data": updated_rows });
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
&client_name,
crate::webhooks::ROUTE_GATEWAY_UPDATE,
Some(table_name.clone()),
Some(logged_request.request_id.clone()),
Some((**json_body).clone()),
Some(update_response.clone()),
),
);
HttpResponse::Ok().json(update_response)
}
#[post("/gateway/update")]
pub async fn gateway_update_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_gateway_update_route(req, body, app_state).await
}