use actix_web::{HttpRequest, HttpResponse, Responder, http::StatusCode, post, web};
use serde::Deserialize;
use serde_json::{Value, json};
use sqlx::postgres::PgRow;
use sqlx::types::Json;
use sqlx::{Pool, Postgres, Row};
use std::time::Instant;
use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::response::processed_error;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::utils::request_logging::{log_operation_event, log_request};
/// JSON request body accepted by the `POST /gateway/query` handler.
#[derive(Debug, Deserialize)]
struct GatewayQueryRequest {
    /// SQL text supplied by the caller; the handler embeds it as a subquery
    /// and executes it against the selected Postgres pool.
    query: String,
}
#[post("/gateway/query")]
pub async fn gateway_query_route(
req: HttpRequest,
body: web::Json<GatewayQueryRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let logged_request = log_request(req.clone(), Some(app_state.get_ref()));
let operation_start = Instant::now();
let client_name = x_athena_client(&req);
if client_name.is_empty() {
return HttpResponse::BadRequest()
.json(json!({ "error": "X-Athena-Client header is required for /gateway/query" }));
}
let pool: Pool<Postgres> = match app_state.pg_registry.get_pool(&client_name) {
Some(pool) => pool,
None => {
return HttpResponse::BadRequest().json(json!({
"error": format!("Postgres client '{}' is not configured", client_name)
}));
}
};
let trimmed_query = body.query.trim();
if trimmed_query.is_empty() {
return HttpResponse::BadRequest().json(json!({
"error": "Query cannot be empty"
}));
}
let wrapped_query: String = format!("SELECT to_jsonb(t) AS row FROM ({}) t", trimmed_query);
let rows: Result<Vec<PgRow>, sqlx::Error> = sqlx::query(&wrapped_query).fetch_all(&pool).await;
match rows {
Ok(rows) => {
let data: Vec<Value> = rows
.into_iter()
.filter_map(|row| row.try_get::<Json<Value>, _>("row").ok())
.map(|json| json.0)
.collect();
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start.elapsed().as_millis(),
StatusCode::OK,
Some(json!({
"query": trimmed_query,
})),
);
HttpResponse::Ok().json(json!({ "data": data }))
}
Err(err) => {
let processed = process_sqlx_error_with_context(&err, None);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"query",
None,
operation_start.elapsed().as_millis(),
processed.status_code,
Some(json!({
"error_code": processed.error_code,
"trace_id": processed.trace_id,
})),
);
processed_error(processed)
}
}
}