athena_rs 3.4.7

Database driver
Documentation
//! SQL query endpoint for executing queries against supported drivers.
//!
//! Overview
//! --------
//! This endpoint accepts a JSON payload with `query`, `driver`, and `db_name`,
//! dispatches to the selected backend (Athena/Scylla, PostgreSQL, or Supabase),
//! and returns a normalized JSON response that includes timing and status fields.
//!
//! Request shape (JSON)
//! --------------------
//! - `query`: SQL string to execute.
//! - `driver`: One of `"athena" | "postgresql" | "supabase"`.
//! - `db_name`: Logical database name (used by some drivers, e.g. PostgreSQL/Supabase).
//!
//! For `driver: postgresql`, use `X-Athena-Client` or `X-JDBC-URL` (same resolution rules as gateway `resolve_postgres_pool`).
//!
//! Response shape (success)
//! ------------------------
//! For relational backends (PostgreSQL/Supabase) a typical successful response resembles:
//!
//! ```json
//! {
//!   "data": [ { "col": "value" } ],
//!   "db_name": "example_db",
//!   "duration": 12,
//!   "message": "Successfully executed query",
//!   "status": "success"
//! }
//! ```
//!
//! For Athena/Scylla, results are returned as a JSON array of rows:
//!
//! ```json
//! [ { "col": "value" } ]
//! ```
//!
//! Error responses
//! ---------------
//! - `400 Bad Request` if an unsupported `driver` is provided.
//! - `503 Service Unavailable` when Athena/Scylla is unreachable (connection errors).
//! - `500 Internal Server Error` for driver-specific execution failures.
//!
//! Tracing and logging
//! -------------------
//! - The handler uses `tracing` with `#[instrument]` to attach request-scoped fields
//!   (`driver`, `db_name`, `query_len`).
//! - Logs are structured; long SQL is truncated to a short preview to limit noise.
//! - Configure verbosity with `RUST_LOG`, e.g. `RUST_LOG=info,athena=debug`.
//!
//! Security
//! --------
//! - Ensure queries come from trusted sources or are validated to avoid unsafe operations.
//! - Avoid logging full SQL containing sensitive data; only a short preview is emitted.

use actix_web::{HttpRequest, Responder, post, web};
use serde_json::json;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{error, info, warn};

/// ANSI for log output (when stdout is a TTY).
const R: &str = "\x1b[31m";
const Y: &str = "\x1b[33m";
const Z: &str = "\x1b[0m";

/// True if the error is a missing-relation / undefined-table style error (e.g. auth.users missing).
fn is_missing_relation(err: &sqlx::Error) -> bool {
    if let sqlx::Error::Database(db) = err {
        let msg = db.message();
        let code = db.code().as_ref().map(|c| c.to_string());
        let code_str = code.as_deref();
        code_str == Some("42P01") || msg.contains("does not exist")
    } else {
        false
    }
}

// crate imports
use crate::AppState;
use crate::api::gateway::contracts::GatewaySqlExecutionRequest;
use crate::api::gateway::pool_resolver::resolve_postgres_pool;
use crate::api::response::{
    api_ok, api_success_value, bad_request, internal_error, processed_error, service_unavailable,
};
use crate::drivers::postgresql::raw_sql::{execute_postgres_sql, normalize_sql_query};
use crate::drivers::scylla::client::execute_query;
use crate::drivers::supabase::execute_query_supabase;
use crate::error::sqlx_parser::process_sqlx_error_with_context;

/// Builder-friendly representation of a SQL query request with parameters and cache key.
pub struct SqlQuery {
    pub query: String,
    pub params: HashMap<String, String>,
    pub cache_key: String,
    pub driver: String,
}

impl SqlQuery {
    pub fn new(query: String, params: HashMap<String, String>, cache_key: String) -> Self {
        Self {
            query,
            params,
            cache_key,
            driver: "scylla".to_string(),
        }
    }
}

// #[instrument(
//     skip(body),
//     fields(
//         driver = %body.driver,
//         db_name = %body.db_name,
//         query_len = body.query.len()
//     )
// )]
async fn handle_sql_query(
    req: HttpRequest,
    body: web::Json<GatewaySqlExecutionRequest>,
    app_state: web::Data<AppState>,
) -> actix_web::HttpResponse {
    let sql_text: String = body.query.clone();
    let driver: String = body.driver.clone();
    if driver != "athena" && driver != "postgresql" && driver != "supabase" {
        warn!(%driver, "invalid driver");
        return bad_request(
            "Invalid driver specified",
            format!(
                "Driver '{}' is not supported. Use athena, postgresql, or supabase.",
                driver
            ),
        );
    }

    if driver == "postgresql" {
        let pool = match resolve_postgres_pool(&req, app_state.get_ref()).await {
            Ok(pool) => pool,
            Err(resp) => return resp,
        };

        let normalized_sql = normalize_sql_query(&body.query);
        let start_time: Instant = Instant::now();

        if normalized_sql.is_empty() {
            return bad_request(
                "Invalid query",
                "Query cannot be empty or contain only semicolons.",
            );
        }

        match execute_postgres_sql(&pool, &normalized_sql).await {
            Ok(result) => {
                let duration: u64 = start_time.elapsed().as_millis() as u64;

                info!("postgresql query ok");
                return api_success_value(
                    "Successfully executed query",
                    json!({
                        "rows": result.rows,
                        "db_name": body.db_name.clone(),
                        "duration_ms": duration,
                        "statement_count": result.summary.statement_count,
                        "rows_affected": result.summary.rows_affected,
                        "returned_row_count": result.summary.returned_row_count,
                    }),
                );
            }
            Err(e) => {
                if is_missing_relation(&e) {
                    warn!(
                        error = %e,
                        "{}postgresql query failed (missing relation){} — e.g. table/schema not present for this client",
                        Y, Z
                    );
                } else {
                    error!(error = %e, "{}postgresql query failed{}", R, Z);
                }
                let processed = process_sqlx_error_with_context(&e, Some(&body.db_name));
                return processed_error(processed);
            }
        }
    }

    if driver == "supabase" {
        match execute_query_supabase(sql_text.clone(), body.db_name.clone()).await {
            Ok(results) => {
                info!("supabase query ok");
                return api_ok(results);
            }
            Err(e) => {
                error!(error = %e, "supabase query failed");
                return internal_error("Query execution failed", format!("Supabase error: {}", e));
            }
        }
    }

    // athena
    match execute_query(sql_text.clone()).await {
        Ok((rows, columns)) => api_success_value(
            "Successfully executed query",
            json!({
                "rows": rows,
                "columns": columns
            }),
        ),
        Err(err) => {
            let error_msg: String = err.to_string();
            error!(error = %error_msg, "athena query failed");

            if error_msg.contains("connection")
                && (error_msg.contains("refused")
                    || error_msg.contains("Control connection pool error")
                    || error_msg.contains("target machine actively refused"))
            {
                warn!("athena/scylladb unreachable");
                return service_unavailable(
                    "Athena server is not reachable",
                    format!(
                        "Connection error: {}. Ensure ScyllaDB is running on the configured port.",
                        error_msg
                    ),
                );
            }

            warn!(
                failed_query_preview = %sql_text.chars().take(100).collect::<String>(),
                "failed query preview"
            );

            internal_error(
                "Query execution failed",
                format!("Athena error: {}", error_msg),
            )
        }
    }
}

#[post("/query/sql")]
/// Execute the given SQL against the specified `driver` and return JSON results.
///
/// Examples
/// --------
/// Request (POST `/query/sql`):
///
/// ```json
/// {
///   "query": "select 1 as col",
///   "driver": "postgresql",
///   "db_name": "example_db"
/// }
/// ```
///
/// Successful response (PostgreSQL/Supabase):
///
/// ```json
/// {
///   "data": [{ "col": 1 }],
///   "db_name": "example_db",
///   "duration": 5,
///   "message": "Successfully executed query",
///   "status": "success"
/// }
/// ```
///
/// On failure, a structured error is returned with an appropriate HTTP status code.
pub async fn sql_query(
    req: HttpRequest,
    body: web::Json<GatewaySqlExecutionRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    handle_sql_query(req, body, app_state).await
}

/// Alias route for SQL execution so SDKs can consistently target `/gateway/sql`.
#[post("/gateway/sql")]
pub async fn gateway_sql_query(
    req: HttpRequest,
    body: web::Json<GatewaySqlExecutionRequest>,
    app_state: web::Data<AppState>,
) -> impl Responder {
    handle_sql_query(req, body, app_state).await
}