athena_rs 1.1.0

Database gateway API
Documentation
//! Query gateway route that executes user-provided SQL against configured Postgres pools.
//!
//! Accepts either `X-Athena-Client` (pre-configured pool) or `X-JDBC-URL` (direct connection).
use actix_web::{HttpRequest, HttpResponse, Responder, http::StatusCode, post, web};
use serde::Deserialize;
use serde_json::json;
use sqlx::{Pool, Postgres};
use std::time::Instant;

use crate::api::gateway::auth::{authorize_gateway_request, query_right};
use crate::api::gateway::pool_resolver::resolve_postgres_pool;
use crate::api::response::{bad_request, processed_error};
use crate::drivers::postgresql::raw_sql::{execute_postgres_sql, normalize_sql_query};
use crate::error::ProcessedError;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};

#[derive(Debug, Deserialize)]
/// JSON request body accepted by the `/gateway/query` route.
///
/// Deserialized from the request payload by the route handler; it carries nothing
/// besides the SQL text.
struct GatewayQueryRequest {
    /// SQL text to execute. The route handler passes it through `normalize_sql_query`
    /// and rejects the request when the normalized result is empty.
    query: String,
}

#[post("/gateway/query")]
/// Runs the caller-supplied SQL against a resolved Postgres pool and returns the rows as JSON.
///
/// Processing order: the request is authorized for the query right and logged, the pool is
/// resolved from the request by `resolve_postgres_pool`, the SQL is normalized, and only a
/// non-empty normalized statement is executed. Both the success and the SQL-failure outcome
/// are recorded through `log_operation_event` under the `"query"` operation name.
///
/// # Parameters
/// - `req`: Incoming request. Per the module documentation the pool is selected through either
///   the `X-Athena-Client` header or the `X-JDBC-URL` header; the actual lookup is delegated to
///   `resolve_postgres_pool`.
/// - `body`: JSON payload wrapping the SQL string in `query`.
/// - `app_state`: Shared application state handed to authorization, logging and pool resolution.
///
/// # Returns
/// - `200 OK` with a `"data"` field holding the rows and a `"meta"` object containing
///   `statement_count`, `rows_affected` and `returned_row_count` when execution succeeds.
/// - The response produced by `authorize_gateway_request` when authorization yields one.
/// - The response produced by `resolve_postgres_pool` when no pool can be resolved.
/// - The `bad_request` response when the normalized query is empty.
/// - The `processed_error` response built from the SQL error when execution fails.
///
/// # Example (via X-Athena-Client)
/// ```http
/// POST /gateway/query
/// X-Athena-Client: reporting
/// Content-Type: application/json
///
/// {
///   "query": "SELECT id, name FROM users LIMIT 10"
/// }
/// ```
///
/// # Example (via X-JDBC-URL for direct connection)
/// ```http
/// POST /gateway/query
/// X-JDBC-URL: jdbc:postgresql://localhost:5432/mydb
/// Content-Type: application/json
///
/// {
///   "query": "SELECT id, name FROM users LIMIT 10"
/// }
/// ```
pub async fn gateway_query_route(
    req: HttpRequest,
    body: web::Json<GatewayQueryRequest>,
    app_state: web::Data<crate::AppState>,
) -> impl Responder {
    // Started before authorization so the logged duration covers the whole request.
    let operation_start: Instant = Instant::now();
    let state = app_state.get_ref();

    let auth = authorize_gateway_request(&req, state, None, vec![query_right()]).await;

    // The request is logged before the authorization outcome is inspected, so requests
    // that end in an authorization response are still recorded.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(state),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(early_response) = auth.response {
        return early_response;
    }

    let pool: Pool<Postgres> = match resolve_postgres_pool(&req, state).await {
        Ok(resolved) => resolved,
        Err(failure_response) => return failure_response,
    };

    let sql = normalize_sql_query(&body.query);
    if sql.is_empty() {
        return bad_request(
            "Invalid query",
            "Query cannot be empty or contain only semicolons.",
        );
    }

    // Failure path first: translate the SQL error, record it, and bail out.
    let result = match execute_postgres_sql(&pool, &sql).await {
        Ok(outcome) => outcome,
        Err(err) => {
            let processed: ProcessedError = process_sqlx_error_with_context(&err, None);
            let elapsed_ms = operation_start.elapsed().as_millis();
            let failure_details = json!({
                "error_code": processed.error_code,
                "trace_id": processed.trace_id,
            });
            log_operation_event(
                Some(state),
                &logged_request,
                "query",
                None,
                elapsed_ms,
                processed.status_code,
                Some(failure_details),
            );
            return processed_error(processed);
        }
    };

    // Success path: the same summary figures go to the operation log and to the response.
    let summary = &result.summary;
    let elapsed_ms = operation_start.elapsed().as_millis();
    let success_details = json!({
        "query": sql,
        "statement_count": summary.statement_count,
        "rows_affected": summary.rows_affected,
        "returned_row_count": summary.returned_row_count,
    });
    log_operation_event(
        Some(state),
        &logged_request,
        "query",
        None,
        elapsed_ms,
        StatusCode::OK,
        Some(success_details),
    );

    let response_body = json!({
        "data": result.rows,
        "meta": {
            "statement_count": summary.statement_count,
            "rows_affected": summary.rows_affected,
            "returned_row_count": summary.returned_row_count,
        }
    });
    HttpResponse::Ok().json(response_body)
}