use actix_web::{HttpRequest, HttpResponse, Responder, http::StatusCode, post, web};
use serde_json::{json, to_value as serde_to_value};
use sqlx::{Pool, Postgres};
use std::time::Instant;
use crate::api::gateway::auth::query_right;
use crate::api::gateway::contracts::GatewayOperationKind;
use crate::api::gateway::contracts::{
GatewayDeferredRequest, GatewayRowsMeta, GatewayRowsResponse, GatewaySqlRequest,
};
#[cfg(feature = "deadpool_experimental")]
use crate::api::gateway::deadpool_timeout::deadpool_checkout_timeout;
use crate::api::gateway::lifecycle::{
authorize_and_log_gateway_request, enqueue_gateway_deferred_response,
log_gateway_operation_result,
};
use crate::api::gateway::pool_resolver::resolve_postgres_pool;
use crate::api::gateway::response::{
gateway_bad_request, gateway_internal_error, gateway_service_unavailable,
};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::response::processed_error;
use crate::athena::resolver::{
AthenaClientResolveError, AthenaResolvedQueryBackend, resolve_query_backend,
};
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::{
deadpool_fallback_reason_label, execute_postgres_sql_deadpool,
};
use crate::drivers::postgresql::raw_sql::{execute_postgres_sql, normalize_sql_query};
use crate::drivers::scylla::client::execute_query_with_info;
use crate::error::ProcessedError;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
#[cfg(feature = "deadpool_experimental")]
use crate::error::tokio_postgres_parser::process_tokio_postgres_db_error;
/// Reports whether the request opts into deferred execution through the
/// `X-Athena-Defer` header.
///
/// The header value is trimmed and compared ASCII case-insensitively against
/// `1`, `true` and `yes`, so `True` / `Yes` are honoured the same way as
/// `true` / `YES`. Returns `false` when the header is absent, cannot be read
/// as a string, or carries any other value.
fn x_athena_defer(req: &HttpRequest) -> bool {
    const TRUTHY_VALUES: [&str; 3] = ["1", "true", "yes"];
    req.headers()
        .get("X-Athena-Defer")
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .map(|value| {
            TRUTHY_VALUES
                .iter()
                .any(|truthy| value.eq_ignore_ascii_case(truthy))
        })
        .unwrap_or(false)
}
/// Converts a client-resolution failure into the gateway error response
/// returned by `/gateway/query`.
///
/// A `Lookup` failure is answered through `gateway_service_unavailable`;
/// the `Inactive`, `Frozen` and `InvalidMetadata` variants are answered
/// through `gateway_bad_request`.
fn scylla_resolution_response(err: AthenaClientResolveError) -> HttpResponse {
    let (summary, detail) = match err {
        // The only variant that is not reported as a bad request, so it
        // returns straight away.
        AthenaClientResolveError::Lookup {
            client_name,
            message,
        } => {
            return gateway_service_unavailable(
                GatewayOperationKind::Query,
                "Failed to resolve Scylla client",
                format!("Client '{client_name}' lookup failed: {message}"),
            );
        }
        AthenaClientResolveError::Inactive { client_name } => (
            "Scylla client is inactive",
            format!("Client '{client_name}' is inactive."),
        ),
        AthenaClientResolveError::Frozen { client_name } => (
            "Scylla client is frozen",
            format!("Client '{client_name}' is frozen."),
        ),
        AthenaClientResolveError::InvalidMetadata {
            client_name,
            message,
        } => (
            "Invalid Scylla client metadata",
            format!("Client '{client_name}' {message}"),
        ),
    };
    gateway_bad_request(GatewayOperationKind::Query, summary, detail)
}
async fn resolve_query_scylla_client(
req: &HttpRequest,
app_state: &crate::AppState,
) -> Result<Option<AthenaResolvedQueryBackend>, HttpResponse> {
let client_name = x_athena_client(req);
if client_name.is_empty() {
return Ok(None);
}
resolve_query_backend(app_state, &client_name)
.await
.map_err(scylla_resolution_response)
}
/// `POST /gateway/query` endpoint.
///
/// Unwraps the JSON body into a [`GatewaySqlRequest`] and hands the request
/// over to [`handle_gateway_query_route`], which produces the response.
#[post("/gateway/query")]
pub async fn gateway_query_route(
    req: HttpRequest,
    body: web::Json<GatewaySqlRequest>,
    app_state: web::Data<crate::AppState>,
) -> impl Responder {
    let payload: GatewaySqlRequest = body.into_inner();
    handle_gateway_query_route(req, payload, app_state).await
}
pub(crate) async fn handle_gateway_query_route(
req: HttpRequest,
body: GatewaySqlRequest,
app_state: web::Data<crate::AppState>,
) -> HttpResponse {
let operation_start: Instant = Instant::now();
let auth_context = match authorize_and_log_gateway_request(
&req,
app_state.get_ref(),
None,
vec![query_right()],
)
.await
{
Ok(context) => context,
Err(response) => return response,
};
let auth = auth_context.auth;
let logged_request = auth_context.logged_request;
let normalized_query = normalize_sql_query(&body.query);
if normalized_query.is_empty() {
return gateway_bad_request(
GatewayOperationKind::Query,
"Invalid query",
"Query cannot be empty or contain only semicolons.",
);
}
let client_for_webhook: String = x_athena_client(&req);
let query_payload_for_webhook: serde_json::Value =
serde_to_value(&body).unwrap_or_else(|_| json!({}));
let resolved_backend = match resolve_query_scylla_client(&req, app_state.get_ref()).await {
Ok(backend) => backend,
Err(response) => return response,
};
let explicit_defer_requested: bool = x_athena_defer(&req);
let force_deferred_queue: bool = auth.force_deferred_queue;
let force_deferred_reason: Option<String> = auth.force_deferred_reason.clone();
if explicit_defer_requested || force_deferred_queue {
let client_name: String = x_athena_client(&req);
if client_name.is_empty() {
if explicit_defer_requested {
return gateway_bad_request(
GatewayOperationKind::Query,
"Missing required header",
"X-Athena-Client is required when using X-Athena-Defer for /gateway/query",
);
}
tracing::warn!(
request_id = %auth.request_id,
"Auth fallback requested deferred queueing for /gateway/query, but X-Athena-Client is missing; continuing with inline execution",
);
} else if matches!(
resolved_backend,
Some(AthenaResolvedQueryBackend::Scylla { .. })
) {
if explicit_defer_requested {
return gateway_bad_request(
GatewayOperationKind::Query,
"Deferred execution is not supported",
"Scylla-backed Athena clients do not support deferred /gateway/query execution yet.",
);
}
tracing::warn!(
request_id = %auth.request_id,
client = %client_name,
"Auth fallback requested deferred queueing for a Scylla-backed client; continuing with inline execution",
);
} else {
let deferred_request = GatewayDeferredRequest::for_query(
auth.request_id.clone(),
client_name.clone(),
normalized_query.clone(),
)
.with_reason(force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
let queue_message = if force_deferred_queue && !explicit_defer_requested {
"Query queued for deferred execution (auth fallback mode)"
} else {
"Query queued for deferred execution"
};
return enqueue_gateway_deferred_response(
&req,
app_state.get_ref(),
&auth,
&client_name,
"POST",
&deferred_request,
GatewayOperationKind::Query,
queue_message,
)
.await;
}
}
if let Some(AthenaResolvedQueryBackend::Scylla {
client,
connection_info,
}) = resolved_backend
{
match execute_query_with_info(normalized_query.clone(), &connection_info).await {
Ok((rows, columns)) => {
let returned_row_count = rows.len();
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "scylla");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "scylla",
"engine": format!("{:?}", client.engine).to_ascii_lowercase(),
"columns": columns,
"query": normalized_query,
"returned_row_count": rows.len(),
})),
);
let rows_response: GatewayRowsResponse =
GatewayRowsResponse::new(rows).with_meta(GatewayRowsMeta {
backend: "scylla".to_string(),
statement_count: 1,
rows_affected: 0,
returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
return HttpResponse::Ok().json(success_payload);
}
Err(err) => {
let error_message = err.to_string();
let response = if error_message.contains("connection")
&& (error_message.contains("refused")
|| error_message.contains("Control connection pool error")
|| error_message.contains("target machine actively refused"))
{
app_state
.metrics_state
.record_gateway_backend_unavailable("query", "scylla");
gateway_service_unavailable(
GatewayOperationKind::Query,
"Scylla backend unavailable",
error_message.clone(),
)
} else {
gateway_internal_error(
GatewayOperationKind::Query,
"Query execution failed",
error_message.clone(),
)
};
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
response.status(),
Some(json!({
"backend": "scylla",
"error": err.to_string(),
})),
);
return response;
}
}
}
let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
#[cfg(feature = "deadpool_experimental")]
if deadpool_requested {
match crate::api::gateway::pool_resolver::resolve_deadpool_pool(&req, app_state.get_ref())
.await
{
Ok(pool) => {
match execute_postgres_sql_deadpool(&pool, &body.query, deadpool_checkout_timeout())
.await
{
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "deadpool");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "deadpool",
"deadpool_requested": true,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
})),
);
let rows_response: GatewayRowsResponse =
GatewayRowsResponse::new(result.rows).with_meta(GatewayRowsMeta {
backend: "deadpool".to_string(),
statement_count: result.summary.statement_count,
rows_affected: result.summary.rows_affected,
returned_row_count: result.summary.returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
return HttpResponse::Ok().json(success_payload);
}
Err(err) => {
if err.is_db_error {
let processed = process_tokio_postgres_db_error(
err.sql_state.as_deref().unwrap_or(""),
&err.message,
None,
);
return processed_error(processed);
}
app_state.metrics_state.record_deadpool_fallback(
"/gateway/query",
deadpool_fallback_reason_label(err.reason),
);
tracing::warn!(
request_id = %auth.request_id,
reason = ?err.reason,
"Deadpool query failed; falling back to sqlx"
);
}
}
}
Err(err_resp) => {
tracing::warn!(
request_id = %auth.request_id,
"Deadpool requested but pool could not be resolved; falling back to sqlx"
);
let _ = err_resp;
}
}
}
let pool: Pool<Postgres> = match resolve_postgres_pool(&req, app_state.get_ref()).await {
Ok(p) => p,
Err(resp) => return resp,
};
match execute_postgres_sql(&pool, &normalized_query).await {
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/query", "sqlx");
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
StatusCode::OK,
Some(json!({
"backend": "sqlx",
"deadpool_requested": deadpool_requested,
"query": normalized_query,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
})),
);
let rows_response: GatewayRowsResponse = GatewayRowsResponse::new(result.rows)
.with_meta(GatewayRowsMeta {
backend: "sqlx".to_string(),
statement_count: result.summary.statement_count,
rows_affected: result.summary.rows_affected,
returned_row_count: result.summary.returned_row_count,
});
let success_payload = json!({
"status": "success",
"message": "Query executed",
"data": rows_response.data,
"meta": rows_response.meta,
});
if !client_for_webhook.is_empty() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
client_for_webhook.as_str(),
crate::webhooks::ROUTE_GATEWAY_QUERY,
None,
Some(logged_request.request_id.clone()),
Some(query_payload_for_webhook.clone()),
Some(success_payload.clone()),
),
);
}
HttpResponse::Ok().json(success_payload)
}
Err(err) => {
let processed: ProcessedError = process_sqlx_error_with_context(&err, None);
log_gateway_operation_result(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start,
processed.status_code,
Some(json!({
"error_code": processed.error_code,
"trace_id": processed.trace_id,
})),
);
processed_error(processed)
}
}
}