use actix_web::{HttpRequest, HttpResponse, Responder, http::StatusCode, post, web};
use athena_gateway::{
GatewayQueryRequestParseError, GatewayQueryRequestPlanError, build_gateway_query_request_plan,
parse_gateway_query_request_body,
};
use serde_json::{json, to_value as serde_to_value};
use sqlx::{Pool, Postgres};
use std::time::Instant;
use crate::api::gateway::auth::GatewayAuthOptions;
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_QUERY, GatewayDeferredRequest, GatewayOperationKind, GatewayRowsMeta,
GatewayRowsResponse, GatewaySqlRequest, gateway_sql_execution_mode_to_transaction_mode,
};
#[cfg(feature = "deadpool_experimental")]
use crate::api::gateway::deadpool_timeout::deadpool_checkout_timeout;
use crate::api::gateway::fetch::{execute_structured_fetch_sql, render_structured_fetch_sql};
use crate::api::gateway::lifecycle::{
authorize_and_log_gateway_request_with_options, enqueue_gateway_deferred_response,
log_gateway_operation_result,
};
use crate::api::gateway::pool_resolver::{
gateway_client_name_or_direct_token, request_uses_direct_postgres_uri, resolve_postgres_pool,
};
use crate::api::gateway::response::{
GATEWAY_ERROR_CODE_CLIENT_CATALOG_UNAVAILABLE, GATEWAY_ERROR_CODE_CLIENT_FROZEN,
GATEWAY_ERROR_CODE_CLIENT_INACTIVE, GATEWAY_ERROR_CODE_CLIENT_LOOKUP_FAILED,
GATEWAY_ERROR_CODE_D1_BACKEND_UNAVAILABLE, GATEWAY_ERROR_CODE_DEFERRED_QUERY_NOT_SUPPORTED,
GATEWAY_ERROR_CODE_INVALID_CLIENT_METADATA,
GATEWAY_ERROR_CODE_INVALID_RELATION_SELECT_COMPATIBILITY_QUERY,
GATEWAY_ERROR_CODE_MISSING_CLIENT_HEADER, GATEWAY_ERROR_CODE_QUERY_BACKEND_UNAVAILABLE,
GATEWAY_ERROR_CODE_QUERY_EXECUTION_FAILED, GATEWAY_ERROR_CODE_QUERY_SQL_EXECUTION_FAILED,
GATEWAY_ERROR_CODE_SCYLLA_BACKEND_UNAVAILABLE, GATEWAY_ERROR_CODE_UNSUPPORTED_SCHEMA_NAME,
gateway_bad_request_with_code, gateway_error_with_details, gateway_internal_error_with_code,
gateway_service_unavailable_with_code,
};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::response::processed_error;
use crate::athena::contracts::AthenaEngine;
use crate::athena::postgres_clients::catalog_client_has_database_connection;
use crate::athena::resolver::{
AthenaClientResolveError, AthenaResolvedQueryBackend, resolve_query_backend,
};
use crate::drivers::cloudflare_d1::client::{
HEADER_D1_BOOKMARK, HEADER_D1_SESSION_MODE, execute_query_via_proxy,
};
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::{
deadpool_fallback_reason_label, execute_postgres_sql_deadpool,
};
use crate::drivers::postgresql::raw_sql::execute_postgres_sql_script;
use crate::drivers::scylla::client::execute_query_with_info;
use crate::error::ProcessedError;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
#[cfg(feature = "deadpool_experimental")]
use crate::error::tokio_postgres_parser::process_tokio_postgres_db_error;
/// Returns `true` when the request carries a truthy `X-Athena-Defer` header.
///
/// The header value is trimmed and compared ASCII case-insensitively against `1`, `true`, and
/// `yes`, so `True` and `Yes` are accepted alongside the lower- and upper-case spellings.
/// A missing header, a value that `to_str` rejects, or any other value yields `false`.
fn x_athena_defer(req: &HttpRequest) -> bool {
    const TRUTHY_VALUES: [&str; 3] = ["1", "true", "yes"];

    req.headers()
        .get("X-Athena-Defer")
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .is_some_and(|value| {
            TRUTHY_VALUES
                .iter()
                .any(|truthy| value.eq_ignore_ascii_case(truthy))
        })
}
fn athena_client_resolution_response(err: AthenaClientResolveError) -> HttpResponse {
match err {
AthenaClientResolveError::Inactive { client_name } => gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_CLIENT_INACTIVE,
GatewayOperationKind::Query,
"Athena client is inactive",
format!("Client '{}' is inactive.", client_name),
),
AthenaClientResolveError::Frozen { client_name } => gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_CLIENT_FROZEN,
GatewayOperationKind::Query,
"Athena client is frozen",
format!("Client '{}' is frozen.", client_name),
),
AthenaClientResolveError::InvalidMetadata {
client_name,
message,
} => gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_CLIENT_METADATA,
GatewayOperationKind::Query,
"Invalid Athena client metadata",
format!("Client '{}' {}", client_name, message),
),
AthenaClientResolveError::Lookup {
client_name,
message,
} => gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_CLIENT_LOOKUP_FAILED,
GatewayOperationKind::Query,
"Failed to resolve Athena client",
format!("Client '{}' lookup failed: {}", client_name, message),
),
}
}
/// Labels describing an Athena backend that cannot run deferred `/gateway/query` requests.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct DeferredQueryUnsupportedBackend {
    /// Human-readable backend name interpolated into the client-facing error detail.
    user_label: &'static str,
    /// Machine-oriented backend identifier recorded as the `backend` field of the warning log.
    metric_label: &'static str,
}
/// Reports whether `engine` is unable to run deferred `/gateway/query` requests.
///
/// Returns the labels for Scylla and Cloudflare D1, and `None` for Postgres and S3.
fn deferred_query_unsupported_backend(
    engine: AthenaEngine,
) -> Option<DeferredQueryUnsupportedBackend> {
    let (user_label, metric_label) = match engine {
        AthenaEngine::Postgres | AthenaEngine::S3 => return None,
        AthenaEngine::Scylla => ("Scylla", "scylla"),
        AthenaEngine::D1 => ("Cloudflare D1", "cloudflare-d1"),
    };
    Some(DeferredQueryUnsupportedBackend {
        user_label,
        metric_label,
    })
}
#[cfg(test)]
mod tests {
    use super::{
        DeferredQueryUnsupportedBackend, deferred_query_unsupported_backend,
        gateway_query_request_parse_error_response, gateway_query_request_plan_error_response,
    };
    use crate::athena::contracts::AthenaEngine;
    use actix_web::{HttpResponse, body::to_bytes, http::StatusCode};
    use athena_gateway::{GatewayQueryRequestParseError, GatewayQueryRequestPlanError};
    use serde_json::Value;

    /// Splits a response into its status code and JSON-decoded body.
    async fn status_and_json(response: HttpResponse) -> (StatusCode, Value) {
        let status = response.status();
        let body = to_bytes(response.into_body())
            .await
            .expect("body should serialize");
        let payload: Value = serde_json::from_slice(&body).expect("body should be json");
        (status, payload)
    }

    #[actix_web::test]
    async fn parse_errors_map_to_normalized_gateway_bad_request_envelope() {
        let response =
            gateway_query_request_parse_error_response(GatewayQueryRequestParseError::MissingBody);
        let (status, payload) = status_and_json(response).await;

        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert_eq!(payload["status"], "error");
        assert_eq!(payload["message"], "Invalid request body");
        assert_eq!(
            payload["error"],
            "request body is required for /gateway/query"
        );
        assert_eq!(payload["code"], "missing_request_body");
    }

    #[actix_web::test]
    async fn planning_errors_map_to_normalized_gateway_bad_request_envelope() {
        let response = gateway_query_request_plan_error_response(
            GatewayQueryRequestPlanError::InvalidRelationSelectCompatibility(
                "JOIN USING is not supported".to_string(),
            ),
        );
        let (status, payload) = status_and_json(response).await;

        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert_eq!(
            payload["message"],
            "Invalid relation-select compatibility query"
        );
        assert_eq!(payload["error"], "JOIN USING is not supported");
        assert_eq!(
            payload["code"],
            "invalid_relation_select_compatibility_query"
        );
    }

    #[test]
    fn deferred_query_support_matches_athena_engine_contracts() {
        assert_eq!(
            deferred_query_unsupported_backend(AthenaEngine::Postgres),
            None
        );
        // S3 shares the Postgres arm, so it must also be reported as deferrable.
        assert_eq!(deferred_query_unsupported_backend(AthenaEngine::S3), None);
        assert_eq!(
            deferred_query_unsupported_backend(AthenaEngine::Scylla),
            Some(DeferredQueryUnsupportedBackend {
                user_label: "Scylla",
                metric_label: "scylla",
            })
        );
        assert_eq!(
            deferred_query_unsupported_backend(AthenaEngine::D1),
            Some(DeferredQueryUnsupportedBackend {
                user_label: "Cloudflare D1",
                metric_label: "cloudflare-d1",
            })
        );
    }
}
/// Builds the gateway error response for a failed PostgreSQL SQL script execution.
///
/// `status_hint` values 400 and 503 select the matching HTTP status and error code; every other
/// hint falls back to an internal server error. The statement position and preprocess
/// information from `err` are attached as response details.
fn gateway_sql_script_error_response(
    err: crate::drivers::postgresql::raw_sql::PostgresSqlScriptError,
) -> HttpResponse {
    let (status, code) = match err.status_hint {
        400 => (
            StatusCode::BAD_REQUEST,
            GATEWAY_ERROR_CODE_QUERY_SQL_EXECUTION_FAILED,
        ),
        503 => (
            StatusCode::SERVICE_UNAVAILABLE,
            GATEWAY_ERROR_CODE_QUERY_BACKEND_UNAVAILABLE,
        ),
        _ => (
            StatusCode::INTERNAL_SERVER_ERROR,
            GATEWAY_ERROR_CODE_QUERY_EXECUTION_FAILED,
        ),
    };
    let details = json!({
        "statement_index": err.statement_index,
        "total_statements": err.total_statements,
        "line_start": err.line_start,
        "line_end": err.line_end,
        "preprocess": err.preprocess,
    });
    gateway_error_with_details(
        status,
        code,
        GatewayOperationKind::Query,
        "SQL statement execution failed",
        err.message,
        Some(details),
    )
}
/// Wraps a request-body parse failure in the gateway bad-request envelope for query operations,
/// using the error's own code, summary, and detail.
fn gateway_query_request_parse_error_response(err: GatewayQueryRequestParseError) -> HttpResponse {
    let code = err.code();
    let summary = err.summary();
    let detail = err.detail();
    gateway_bad_request_with_code(code, GatewayOperationKind::Query, summary, detail)
}
/// Wraps a query-planning failure in the gateway bad-request envelope for query operations,
/// using the error's own code, summary, and detail.
fn gateway_query_request_plan_error_response(err: GatewayQueryRequestPlanError) -> HttpResponse {
    let code = err.code();
    let summary = err.summary();
    let detail = err.detail();
    gateway_bad_request_with_code(code, GatewayOperationKind::Query, summary, detail)
}
async fn resolve_query_backend_contract(
req: &HttpRequest,
app_state: &crate::AppState,
) -> Result<Option<AthenaResolvedQueryBackend>, HttpResponse> {
let client_name = x_athena_client(req);
if client_name.is_empty() {
return Ok(None);
}
resolve_query_backend(app_state, &client_name)
.await
.map_err(athena_client_resolution_response)
}
/// `POST /gateway/query` entry point.
///
/// Parses the raw request body and hands the parsed request to
/// `handle_gateway_query_route`; a body that fails to parse is answered with the parse-error
/// response instead.
#[post("/gateway/query")]
pub async fn gateway_query_route(
    req: HttpRequest,
    body: web::Bytes,
    app_state: web::Data<crate::AppState>,
) -> impl Responder {
    match parse_gateway_query_request_body(body.as_ref()) {
        Ok(parsed_body) => handle_gateway_query_route(req, parsed_body, app_state).await,
        Err(parse_err) => gateway_query_request_parse_error_response(parse_err),
    }
}
pub(crate) async fn handle_gateway_query_route(
req: HttpRequest,
body: GatewaySqlRequest,
app_state: web::Data<crate::AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let direct_pg_uri_requested: bool = request_uses_direct_postgres_uri(&req);
let client_name = x_athena_client(&req);
let definitely_postgres = if direct_pg_uri_requested {
true
} else if client_name.is_empty() {
false
} else if app_state
.pg_registry
.registered_client(&client_name)
.is_some()
{
true
} else {
match catalog_client_has_database_connection(app_state.get_ref(), &client_name).await {
Ok(value) => value,
Err(err) => {
return gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_CLIENT_CATALOG_UNAVAILABLE,
GatewayOperationKind::Query,
"Client catalog unavailable",
format!("Failed to resolve catalog-backed Postgres client: {}", err),
);
}
}
};
let query_plan: athena_gateway::GatewayQueryRequestPlan = match build_gateway_query_request_plan(
&body,
definitely_postgres,
app_state.gateway_force_camel_case_to_snake_case,
) {
Ok(plan) => plan,
Err(err) => return gateway_query_request_plan_error_response(err),
};
let normalized_query = query_plan.normalized_query.clone();
let schema_name = query_plan.schema_name.clone();
let execution_mode: athena_driver::postgresql::raw_sql::PostgresSqlTransactionMode =
gateway_sql_execution_mode_to_transaction_mode(query_plan.execution_mode);
let compatibility_rewrite_and_plan = query_plan.compatibility.clone();
let required_rights = query_plan.required_rights();
let auth_context = match authorize_and_log_gateway_request_with_options(
&req,
app_state.get_ref(),
None,
required_rights,
GatewayAuthOptions::default(),
)
.await
{
Ok(context) => context,
Err(response) => return response,
};
let auth = auth_context.auth;
let logged_request = auth_context.logged_request;
let client_for_webhook: String = gateway_client_name_or_direct_token(&req);
let query_payload_for_webhook: serde_json::Value =
serde_to_value(&body).unwrap_or_else(|_| json!({}));
let resolved_backend = match resolve_query_backend_contract(&req, app_state.get_ref()).await {
Ok(backend) => backend,
Err(response) => return response,
};
let explicit_defer_requested: bool = x_athena_defer(&req);
let force_deferred_queue: bool = auth.force_deferred_queue;
let force_deferred_reason: Option<String> = auth.force_deferred_reason.clone();
if explicit_defer_requested || force_deferred_queue {
if direct_pg_uri_requested {
tracing::warn!(
request_id = %auth.request_id,
"Deferred queue requested for direct PostgreSQL URI request; continuing with inline execution",
);
} else {
let client_name: String = x_athena_client(&req);
if client_name.is_empty() {
if explicit_defer_requested {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_MISSING_CLIENT_HEADER,
GatewayOperationKind::Query,
"Missing required header",
"X-Athena-Client is required when using X-Athena-Defer for /gateway/query",
);
}
tracing::warn!(
request_id = %auth.request_id,
"Auth fallback requested deferred queueing for /gateway/query, but X-Athena-Client is missing; continuing with inline execution",
);
} else if let Some(resolved_backend) = resolved_backend.as_ref() {
if let Some(unsupported_backend) =
deferred_query_unsupported_backend(resolved_backend.client().engine)
{
if explicit_defer_requested {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_DEFERRED_QUERY_NOT_SUPPORTED,
GatewayOperationKind::Query,
"Deferred execution is not supported",
format!(
"{}-backed Athena clients do not support deferred /gateway/query execution yet.",
unsupported_backend.user_label
),
);
}
tracing::warn!(
request_id = %auth.request_id,
client = %client_name,
backend = unsupported_backend.metric_label,
"Auth fallback requested deferred queueing for a non-deferred Athena backend; continuing with inline execution",
);
} else {
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_QUERY,
auth.request_id.clone(),
client_name.clone(),
json!({
"query": normalized_query.clone(),
"schema_name": schema_name,
"execution_mode": body.execution_mode,
}),
)
.with_reason(force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
let queue_message = if force_deferred_queue && !explicit_defer_requested {
"Query queued for deferred execution (auth fallback mode)"
} else {
"Query queued for deferred execution"
};
return enqueue_gateway_deferred_response(
&req,
app_state.get_ref(),
&auth,
&client_name,
"POST",
&deferred_request,
GatewayOperationKind::Query,
queue_message,
)
.await;
}
} else {
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_QUERY,
auth.request_id.clone(),
client_name.clone(),
json!({
"query": normalized_query.clone(),
"schema_name": schema_name,
"execution_mode": body.execution_mode,
}),
)
.with_reason(force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
let queue_message = if force_deferred_queue && !explicit_defer_requested {
"Query queued for deferred execution (auth fallback mode)"
} else {
"Query queued for deferred execution"
};
return enqueue_gateway_deferred_response(
&req,
app_state.get_ref(),
&auth,
&client_name,
"POST",
&deferred_request,
GatewayOperationKind::Query,
queue_message,
)
.await;
}
}
}
if let Some(AthenaResolvedQueryBackend::D1 {
client,
connection_info,
}) = resolved_backend.as_ref()
{
if schema_name.is_some() {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_UNSUPPORTED_SCHEMA_NAME,
GatewayOperationKind::Query,
"Unsupported schema_name",
"schema_name is only supported for PostgreSQL /gateway/query execution",
);
}
let requested_session_mode = req
.headers()
.get(HEADER_D1_SESSION_MODE)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let requested_bookmark = req
.headers()
.get(HEADER_D1_BOOKMARK)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
match execute_query_via_proxy(
&app_state.client,
connection_info,
&normalized_query,
vec![],
requested_session_mode,
requested_bookmark,
true,
)
.await
{
Ok(result) => {
let returned_row_count = result.rows.len();
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "cloudflare-d1");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "cloudflare-d1",
"engine": format!("{:?}", client.engine).to_ascii_lowercase(),
"columns": result.columns,
"query": normalized_query,
"returned_row_count": returned_row_count,
"duration_ms": result.duration_ms,
"bookmark": result.bookmark,
"count": result.count,
})),
);
let rows_response: GatewayRowsResponse =
GatewayRowsResponse::new(result.rows.clone()).with_meta(GatewayRowsMeta {
backend: "cloudflare-d1".to_string(),
statement_count: 1,
rows_affected: 0,
returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
"driver_meta": {
"columns": result.columns,
"duration_ms": result.duration_ms,
"bookmark": result.bookmark,
"count": result.count,
"meta": result.meta,
},
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
let mut response = HttpResponse::Ok().json(success_payload);
if let Some(bookmark) = result.bookmark
&& let Ok(header_value) = bookmark.parse()
{
response.headers_mut().insert(
HEADER_D1_BOOKMARK.parse().expect("valid header"),
header_value,
);
}
return response;
}
Err(err) => {
let response = gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_D1_BACKEND_UNAVAILABLE,
GatewayOperationKind::Query,
"Cloudflare D1 backend unavailable",
err.clone(),
);
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
response.status(),
Some(json!({
"backend": "cloudflare-d1",
"error": err,
})),
);
return response;
}
}
}
if let Some(AthenaResolvedQueryBackend::Scylla {
client,
connection_info,
}) = resolved_backend.as_ref()
{
if schema_name.is_some() {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_UNSUPPORTED_SCHEMA_NAME,
GatewayOperationKind::Query,
"Unsupported schema_name",
"schema_name is only supported for PostgreSQL /gateway/query execution",
);
}
match execute_query_with_info(normalized_query.clone(), connection_info).await {
Ok((rows, columns)) => {
let returned_row_count = rows.len();
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "scylla");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "scylla",
"engine": format!("{:?}", client.engine).to_ascii_lowercase(),
"columns": columns,
"query": normalized_query,
"returned_row_count": rows.len(),
})),
);
let rows_response: GatewayRowsResponse =
GatewayRowsResponse::new(rows).with_meta(GatewayRowsMeta {
backend: "scylla".to_string(),
statement_count: 1,
rows_affected: 0,
returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
return HttpResponse::Ok().json(success_payload);
}
Err(err) => {
let error_message = err.to_string();
let response = if error_message.contains("connection")
&& (error_message.contains("refused")
|| error_message.contains("Control connection pool error")
|| error_message.contains("target machine actively refused"))
{
app_state
.metrics_state
.record_gateway_backend_unavailable("query", "scylla");
gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_SCYLLA_BACKEND_UNAVAILABLE,
GatewayOperationKind::Query,
"Scylla backend unavailable",
error_message.clone(),
)
} else {
gateway_internal_error_with_code(
GATEWAY_ERROR_CODE_QUERY_EXECUTION_FAILED,
GatewayOperationKind::Query,
"Query execution failed",
error_message.clone(),
)
};
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
response.status(),
Some(json!({
"backend": "scylla",
"error": err.to_string(),
})),
);
return response;
}
}
}
let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id))
&& query_plan.allows_deadpool_execution();
if let Some(compatibility) = compatibility_rewrite_and_plan {
let rewrite = compatibility.rewrite;
let plan = compatibility.structured_fetch_plan;
let pool: Pool<Postgres> = match resolve_postgres_pool(&req, app_state.get_ref()).await {
Ok(p) => p,
Err(resp) => return resp,
};
let compiled_sql = match render_structured_fetch_sql(&pool, &plan).await {
Ok(sql) => sql,
Err(err) => {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_RELATION_SELECT_COMPATIBILITY_QUERY,
GatewayOperationKind::Query,
"Invalid relation-select compatibility query",
err,
);
}
};
return match execute_structured_fetch_sql(&pool, &plan, &compiled_sql).await {
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "sqlx");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "sqlx",
"deadpool_requested": deadpool_requested,
"query": normalized_query,
"execution_mode": execution_mode,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
"compatibility_rewrite": {
"kind": "relation_select",
"select": rewrite.select,
"table_name": rewrite.table.table_name,
"schema_name": rewrite.table.schema_name,
}
})),
);
let rows_response: GatewayRowsResponse =
GatewayRowsResponse::new(result.rows.clone()).with_meta(GatewayRowsMeta {
backend: "sqlx".to_string(),
statement_count: result.summary.statement_count,
rows_affected: result.summary.rows_affected,
returned_row_count: result.summary.returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
"execution_mode": execution_mode,
"statements": [compiled_sql],
"compatibility_rewrite": {
"kind": "relation_select",
"table_name": plan.table_name,
}
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
HttpResponse::Ok().json(success_payload)
}
Err(err) => {
let processed = process_sqlx_error_with_context(&err, Some(&plan.table_name));
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
processed.status_code,
Some(json!({
"error_code": processed.error_code,
"trace_id": processed.trace_id,
"compatibility_rewrite": "relation_select",
})),
);
processed_error(processed)
}
};
}
#[cfg(feature = "deadpool_experimental")]
if deadpool_requested {
if schema_name.is_some() {
tracing::warn!(
request_id = %auth.request_id,
"Deadpool query requested with schema_name; falling back to sqlx because schema search_path override is not yet supported via deadpool",
);
} else {
match crate::api::gateway::pool_resolver::resolve_deadpool_pool(
&req,
app_state.get_ref(),
)
.await
{
Ok(pool) => {
match execute_postgres_sql_deadpool(
&pool,
&normalized_query,
deadpool_checkout_timeout(),
)
.await
{
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "deadpool");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "deadpool",
"deadpool_requested": true,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
})),
);
let rows_response: GatewayRowsResponse =
GatewayRowsResponse::new(result.rows).with_meta(GatewayRowsMeta {
backend: "deadpool".to_string(),
statement_count: result.summary.statement_count,
rows_affected: result.summary.rows_affected,
returned_row_count: result.summary.returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
return HttpResponse::Ok().json(success_payload);
}
Err(err) => {
if err.is_db_error {
let processed = process_tokio_postgres_db_error(
err.sql_state.as_deref().unwrap_or(""),
&err.message,
None,
);
return processed_error(processed);
}
app_state.metrics_state.record_deadpool_fallback(
"/gateway/query",
deadpool_fallback_reason_label(err.reason),
);
tracing::warn!(
request_id = %auth.request_id,
reason = ?err.reason,
"Deadpool query failed; falling back to sqlx"
);
}
}
}
Err(err_resp) => {
tracing::warn!(
request_id = %auth.request_id,
"Deadpool requested but pool could not be resolved; falling back to sqlx"
);
let _ = err_resp;
}
}
}
}
let pool: Pool<Postgres> = match resolve_postgres_pool(&req, app_state.get_ref()).await {
Ok(p) => p,
Err(resp) => return resp,
};
match execute_postgres_sql_script(
&pool,
&normalized_query,
execution_mode,
schema_name.as_deref(),
)
.await
{
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "sqlx");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "sqlx",
"deadpool_requested": deadpool_requested,
"query": normalized_query,
"execution_mode": execution_mode,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
"preprocess": result.preprocess,
})),
);
let rows_response: GatewayRowsResponse = GatewayRowsResponse::new(result.rows.clone())
.with_meta(GatewayRowsMeta {
backend: "sqlx".to_string(),
statement_count: result.summary.statement_count,
rows_affected: result.summary.rows_affected,
returned_row_count: result.summary.returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
"execution_mode": execution_mode,
"statements": result.statements,
"preprocess": result.preprocess,
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
HttpResponse::Ok().json(success_payload)
}
Err(err) => {
if err.status_hint == 400 || err.status_hint == 503 {
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
match err.status_hint {
400 => StatusCode::BAD_REQUEST,
503 => StatusCode::SERVICE_UNAVAILABLE,
_ => StatusCode::INTERNAL_SERVER_ERROR,
},
Some(json!({
"error": err.message,
"statement_index": err.statement_index,
"line_start": err.line_start,
"line_end": err.line_end,
"preprocess": err.preprocess,
})),
);
return gateway_sql_script_error_response(err);
}
let wrapped = sqlx::Error::Protocol(err.message.clone());
let processed: ProcessedError = process_sqlx_error_with_context(&wrapped, None);
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
processed.status_code,
Some(json!({
"error_code": processed.error_code,
"trace_id": processed.trace_id,
})),
);
processed_error(processed)
}
}
}