athena_rs 0.82.2

Database gateway API
Documentation
//! SQL query endpoint for executing queries against supported drivers.
//!
//! Overview
//! --------
//! This endpoint accepts a JSON payload with `query`, `driver`, and `db_name`,
//! dispatches to the selected backend (Athena/Scylla, PostgreSQL, or Supabase),
//! and returns a normalized JSON response that includes timing and status fields.
//!
//! Request shape (JSON)
//! --------------------
//! - `query`: SQL string to execute.
//! - `driver`: One of `"athena" | "postgresql" | "supabase"`.
//! - `db_name`: Logical database name (used by some drivers, e.g. PostgreSQL/Supabase).
//!
//! Response shape (success)
//! ------------------------
//! For relational backends (PostgreSQL/Supabase) a typical successful response resembles:
//!
//! ```json
//! {
//!   "data": [ { "col": "value" } ],
//!   "db_name": "example_db",
//!   "duration": 12,
//!   "message": "Successfully executed query",
//!   "status": "success"
//! }
//! ```
//!
//! For Athena/Scylla, results are returned as a JSON array of rows:
//!
//! ```json
//! [ { "col": "value" } ]
//! ```
//!
//! Error responses
//! ---------------
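//! - `400 Bad Request` if an unsupported `driver` is provided, if the `X-Athena-Client` header
//!   is missing for the `postgresql` driver, or if the named client has no pool in the registry.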
//! - `503 Service Unavailable` when Athena/Scylla is unreachable (connection errors).
//! - `500 Internal Server Error` for driver-specific execution failures.
//!
//! Tracing and logging
//! -------------------
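//! - The handler logs through `tracing` (`info!`, `warn!`, `error!`). The `#[instrument]`
//!   attribute that would attach request-scoped fields (`driver`, `db_name`, `query_len`)
//!   is currently commented out on the handler, so those fields are not attached to spans.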
//! - Logs are structured; long SQL is truncated to a short preview to limit noise.
//! - Configure verbosity with `RUST_LOG`, e.g. `RUST_LOG=info,athena=debug`.
//!
//! Security
//! --------
//! - Ensure queries come from trusted sources or are validated to avoid unsafe operations.
//! - Avoid logging full SQL containing sensitive data; only a short preview is emitted.

use actix_web::{HttpRequest, Responder, post, web};
use serde::Deserialize;
use serde_json::{Value, json};
use sqlx::Row;
use sqlx::types::Json;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{error, info, warn};
// crate imports
use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::response::{
    api_ok, api_success_value, bad_request, internal_error, processed_error, service_unavailable,
};
use crate::drivers::scylla::client::execute_query;
use crate::drivers::supabase::execute_query_supabase;
use crate::error::sqlx_parser::process_sqlx_error_with_context;

/// Incoming request body for `/query/sql`.
///
/// The body is deserialized from JSON. Deserialization itself only requires the three
/// string fields to be present; the `sql_query` handler is what rejects unsupported
/// `driver` values.
#[derive(Deserialize)]
pub struct SqlQueryRequest {
    /// SQL string to execute. The handler forwards it to the selected driver; for
    /// `postgresql` it is first wrapped in a `SELECT to_jsonb(t) ...` subquery.
    pub query: String,
    /// Target driver: `"athena" | "postgresql" | "supabase"`.
    ///
    /// Kept as a free-form string; any other value makes the handler return a bad-request
    /// response.
    pub driver: String,
    /// Logical database name. The handler passes it to the Supabase executor and, for
    /// PostgreSQL, uses it as error context and echoes it in the success payload. The
    /// Athena path does not read it.
    pub db_name: String,
}

/// Builder-friendly representation of a SQL query request with parameters and cache key.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged, duplicated and
/// compared (for example in tests) without callers writing those impls by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqlQuery {
    /// SQL text of the query.
    pub query: String,
    /// String parameters keyed by name.
    // NOTE(review): how these are bound into `query` is decided by the consumer of this
    // type, which is not visible in this file.
    pub params: HashMap<String, String>,
    /// Cache key associated with this query.
    pub cache_key: String,
    /// Name of the driver the query targets; [`SqlQuery::new`] sets it to `"scylla"`.
    pub driver: String,
}

impl SqlQuery {
    /// Driver name assigned by [`SqlQuery::new`].
    const DEFAULT_DRIVER: &'static str = "scylla";

    /// Creates a query from its SQL text, parameters and cache key.
    ///
    /// The `driver` field is initialised to `"scylla"`; because the field is public,
    /// callers that need another driver can overwrite it afterwards.
    pub fn new(query: String, params: HashMap<String, String>, cache_key: String) -> Self {
        Self {
            query,
            params,
            cache_key,
            driver: Self::DEFAULT_DRIVER.to_owned(),
        }
    }
}

// #[instrument(
//     skip(body),
//     fields(
//         driver = %body.driver,
//         db_name = %body.db_name,
//         query_len = body.query.len()
//     )
// )]
#[post("/query/sql")]
/// Execute the given SQL against the specified `driver` and return JSON results.
///
/// Examples
/// --------
/// Request (POST `/query/sql`):
///
/// ```json
/// {
///   "query": "select 1 as col",
///   "driver": "postgresql",
///   "db_name": "example_db"
/// }
/// ```
///
/// Successful response (PostgreSQL/Supabase):
///
/// ```json
/// {
///   "data": [{ "col": 1 }],
///   "db_name": "example_db",
///   "duration": 5,
///   "message": "Successfully executed query",
///   "status": "success"
/// }
/// ```
///
/// On failure, a structured error is returned with an appropriate HTTP status code.
pub async fn sql_query(
    req: HttpRequest,
    body: web::Json<SqlQueryRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    let sql_query: String = body.query.clone();
    let driver: String = body.driver.clone();
    if driver != "athena" && driver != "postgresql" && driver != "supabase" {
        warn!(%driver, "invalid driver");
        return bad_request(
            "Invalid driver specified",
            format!(
                "Driver '{}' is not supported. Use athena, postgresql, or supabase.",
                driver
            ),
        );
    }

    if driver == "postgresql" {
        let client_name: String = x_athena_client(&req);
        if client_name.is_empty() {
            return bad_request(
                "Missing required header",
                "X-Athena-Client header is required when using the postgresql driver",
            );
        }

        if let Some(pool) = app_state.pg_registry.get_pool(&client_name) {
            let wrapped_query: String = format!("SELECT to_jsonb(t) AS row FROM ({}) t", sql_query);
            let start_time: Instant = Instant::now();

            match sqlx::query(&wrapped_query).fetch_all(&pool).await {
                Ok(rows) => {
                    let duration: u64 = start_time.elapsed().as_millis() as u64;
                    let data: Vec<Value> = rows
                        .into_iter()
                        .filter_map(|row| row.try_get::<Json<Value>, _>("row").ok())
                        .map(|json| json.0)
                        .collect();

                    info!("postgresql query ok");
                    return api_success_value(
                        "Successfully executed query",
                        json!({
                            "rows": data,
                            "db_name": body.db_name.clone(),
                            "duration_ms": duration
                        }),
                    );
                }
                Err(e) => {
                    error!(error = %e, "postgresql query failed");
                    let processed = process_sqlx_error_with_context(&e, Some(&body.db_name));
                    return processed_error(processed);
                }
            }
        } else {
            return bad_request(
                "Postgres client not configured",
                format!("Client '{}' is not available in the registry", client_name),
            );
        }
    }

    if driver == "supabase" {
        match execute_query_supabase(sql_query.clone(), body.db_name.clone()).await {
            Ok(results) => {
                info!("supabase query ok");
                return api_ok(results);
            }
            Err(e) => {
                error!(error = %e, "supabase query failed");
                return internal_error("Query execution failed", format!("Supabase error: {}", e));
            }
        }
    }

    // athena
    match execute_query(sql_query.clone()).await {
        Ok((rows, columns)) => api_success_value(
            "Successfully executed query",
            json!({
                "rows": rows,
                "columns": columns
            }),
        ),
        Err(err) => {
            let error_msg: String = err.to_string();
            error!(error = %error_msg, "athena query failed");

            if error_msg.contains("connection")
                && (error_msg.contains("refused")
                    || error_msg.contains("Control connection pool error")
                    || error_msg.contains("target machine actively refused"))
            {
                warn!("athena/scylladb unreachable");
                return service_unavailable(
                    "Athena server is not reachable",
                    format!(
                        "Connection error: {}. Ensure ScyllaDB is running on the configured port.",
                        error_msg
                    ),
                );
            }

            warn!(
                failed_query_preview = %sql_query.chars().take(100).collect::<String>(),
                "failed query preview"
            );

            internal_error(
                "Query execution failed",
                format!("Athena error: {}", error_msg),
            )
        }
    }
}