use actix_web::{HttpRequest, HttpResponse, Responder, http::StatusCode, post, web};
use serde::Deserialize;
use serde_json::json;
use sqlx::{Pool, Postgres};
use std::time::Instant;
use crate::api::gateway::auth::{authorize_gateway_request, query_right};
use crate::api::gateway::pool_resolver::resolve_postgres_pool;
use crate::api::response::{bad_request, processed_error};
use crate::drivers::postgresql::raw_sql::{execute_postgres_sql, normalize_sql_query};
use crate::error::ProcessedError;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
/// JSON request body accepted by the `POST /gateway/query` route.
#[derive(Debug, Deserialize)]
struct GatewayQueryRequest {
/// SQL text supplied by the caller; the route passes it through
/// `normalize_sql_query` before executing it.
query: String,
}
/// Handles `POST /gateway/query`: executes the caller-supplied SQL against the
/// Postgres pool resolved for this request and returns the rows with a summary.
///
/// Steps, in order:
/// 1. Authorize the request with the `query` right and write the request log entry.
/// 2. Return the authorization response unchanged if one was produced.
/// 3. Resolve the Postgres pool for the request.
/// 4. Normalize the SQL and reject it via `bad_request` when the result is empty.
/// 5. Execute the SQL, emit a `"query"` operation event, and build the response.
///
/// On success the body is `{ "data": rows, "meta": { statement_count,
/// rows_affected, returned_row_count } }`. On an execution error the sqlx error is
/// converted to a `ProcessedError` and returned through `processed_error`.
///
/// Note: the early exits in steps 2-4 return without emitting an operation event;
/// only the execution outcome is recorded via `log_operation_event`.
#[post("/gateway/query")]
pub async fn gateway_query_route(
    req: HttpRequest,
    body: web::Json<GatewayQueryRequest>,
    app_state: web::Data<crate::AppState>,
) -> impl Responder {
    // Started before authorization so the reported duration covers the whole handler.
    let started_at: Instant = Instant::now();
    let state = app_state.get_ref();

    // The request log entry is written before the authorization outcome is checked,
    // so rejected requests are still recorded.
    let auth = authorize_gateway_request(&req, state, None, vec![query_right()]).await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(state),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(rejection) = auth.response {
        return rejection;
    }

    let pool: Pool<Postgres> = match resolve_postgres_pool(&req, state).await {
        Ok(resolved) => resolved,
        Err(failure) => return failure,
    };

    let sql = normalize_sql_query(&body.query);
    if sql.is_empty() {
        return bad_request(
            "Invalid query",
            "Query cannot be empty or contain only semicolons.",
        );
    }

    // Failure path first: log the processed error, then hand it back to the caller.
    let result = match execute_postgres_sql(&pool, &sql).await {
        Ok(result) => result,
        Err(err) => {
            let processed: ProcessedError = process_sqlx_error_with_context(&err, None);
            let elapsed_ms = started_at.elapsed().as_millis();
            let failure_details = json!({
                "error_code": processed.error_code,
                "trace_id": processed.trace_id,
            });
            log_operation_event(
                Some(state),
                &logged_request,
                "query",
                None,
                elapsed_ms,
                processed.status_code,
                Some(failure_details),
            );
            return processed_error(processed);
        }
    };

    // Success path: the same summary figures go to the operation log and the response.
    let summary = &result.summary;
    let elapsed_ms = started_at.elapsed().as_millis();
    let success_details = json!({
        "query": sql,
        "statement_count": summary.statement_count,
        "rows_affected": summary.rows_affected,
        "returned_row_count": summary.returned_row_count,
    });
    log_operation_event(
        Some(state),
        &logged_request,
        "query",
        None,
        elapsed_ms,
        StatusCode::OK,
        Some(success_details),
    );

    HttpResponse::Ok().json(json!({
        "data": result.rows,
        "meta": {
            "statement_count": summary.statement_count,
            "rows_affected": summary.rows_affected,
            "returned_row_count": summary.returned_row_count,
        }
    }))
}