athena_rs 0.77.1

WIP Database API gateway
Documentation
//! Query gateway route that executes user-provided SQL against configured Postgres pools.
use actix_web::{HttpRequest, HttpResponse, Responder, http::StatusCode, post, web};
use serde::Deserialize;
use serde_json::{Value, json};
use sqlx::postgres::PgRow;
use sqlx::types::Json;
use sqlx::{Pool, Postgres, Row};
use std::time::Instant;

use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::response::processed_error;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::utils::request_logging::{log_operation_event, log_request};

/// JSON request body for `POST /gateway/query`.
///
/// Deserialized from the request payload; the only field is the SQL text to run.
#[derive(Deserialize, Debug)]
struct GatewayQueryRequest {
    /// Raw SQL statement supplied by the caller.
    query: String,
}

#[post("/gateway/query")]
/// Executes the raw SQL statement from the request body using the configured Postgres client.
///
/// # Parameters
/// - `req`: Incoming request that must include `X-Athena-Client`.
/// - `body`: JSON payload wrapping the SQL string in `query`.
/// - `app_state`: Shared state that exposes configured Postgres pools.
///
/// # Returns
/// JSON with a `"data"` array of rows when the query succeeds, or errors when the client is unavailable or the SQL fails.
///
/// # Example
/// ```http
/// POST /gateway/query
/// X-Athena-Client: reporting
/// Content-Type: application/json
///
/// {
///   "query": "SELECT id, name FROM users LIMIT 10"
/// }
/// ```
pub async fn gateway_query_route(
    req: HttpRequest,
    body: web::Json<GatewayQueryRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let logged_request = log_request(req.clone(), Some(app_state.get_ref()));
    let operation_start = Instant::now();
    let client_name = x_athena_client(&req);
    if client_name.is_empty() {
        return HttpResponse::BadRequest()
            .json(json!({ "error": "X-Athena-Client header is required for /gateway/query" }));
    }

    let pool: Pool<Postgres> = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };

    let trimmed_query = body.query.trim();
    if trimmed_query.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "Query cannot be empty"
        }));
    }

    // Wrap the query so every row is serialized as JSON.
    let wrapped_query: String = format!("SELECT to_jsonb(t) AS row FROM ({}) t", trimmed_query);

    let rows: Result<Vec<PgRow>, sqlx::Error> = sqlx::query(&wrapped_query).fetch_all(&pool).await;
    match rows {
        Ok(rows) => {
            let data: Vec<Value> = rows
                .into_iter()
                .filter_map(|row| row.try_get::<Json<Value>, _>("row").ok())
                .map(|json| json.0)
                .collect();
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "query",
                None,
                operation_start.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "query": trimmed_query,
                })),
            );
            HttpResponse::Ok().json(json!({ "data": data }))
        }
        Err(err) => {
            let processed = process_sqlx_error_with_context(&err, None);
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "query",
                None,
                operation_start.elapsed().as_millis(),
                processed.status_code,
                Some(json!({
                    "error_code": processed.error_code,
                    "trace_id": processed.trace_id,
                })),
            );
            processed_error(processed)
        }
    }
}