use actix_web::{HttpRequest, Responder, post, web};
use serde::Deserialize;
use serde_json::json;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{error, info, warn};
const R: &str = "\x1b[31m";
const Y: &str = "\x1b[33m";
const Z: &str = "\x1b[0m";
fn is_missing_relation(err: &sqlx::Error) -> bool {
if let sqlx::Error::Database(db) = err {
let msg = db.message();
let code = db.code().as_ref().map(|c| c.to_string());
let code_str = code.as_deref();
code_str == Some("42P01") || msg.contains("does not exist")
} else {
false
}
}
use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::response::{
api_ok, api_success_value, bad_request, internal_error, processed_error, service_unavailable,
};
use crate::drivers::postgresql::raw_sql::{execute_postgres_sql, normalize_sql_query};
use crate::drivers::scylla::client::execute_query;
use crate::drivers::supabase::execute_query_supabase;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
#[derive(Deserialize)]
pub struct SqlQueryRequest {
pub query: String,
pub driver: String,
pub db_name: String,
}
pub struct SqlQuery {
pub query: String,
pub params: HashMap<String, String>,
pub cache_key: String,
pub driver: String,
}
impl SqlQuery {
pub fn new(query: String, params: HashMap<String, String>, cache_key: String) -> Self {
Self {
query,
params,
cache_key,
driver: "scylla".to_string(),
}
}
}
#[post("/query/sql")]
pub async fn sql_query(
req: HttpRequest,
body: web::Json<SqlQueryRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let sql_query: String = body.query.clone();
let driver: String = body.driver.clone();
if driver != "athena" && driver != "postgresql" && driver != "supabase" {
warn!(%driver, "invalid driver");
return bad_request(
"Invalid driver specified",
format!(
"Driver '{}' is not supported. Use athena, postgresql, or supabase.",
driver
),
);
}
if driver == "postgresql" {
let client_name: String = x_athena_client(&req);
if client_name.is_empty() {
return bad_request(
"Missing required header",
"X-Athena-Client header is required when using the postgresql driver",
);
}
if let Some(pool) = app_state.pg_registry.get_pool(&client_name) {
let normalized_sql = normalize_sql_query(&body.query);
let start_time: Instant = Instant::now();
if normalized_sql.is_empty() {
return bad_request(
"Invalid query",
"Query cannot be empty or contain only semicolons.",
);
}
match execute_postgres_sql(&pool, &normalized_sql).await {
Ok(result) => {
let duration: u64 = start_time.elapsed().as_millis() as u64;
info!("postgresql query ok");
return api_success_value(
"Successfully executed query",
json!({
"rows": result.rows,
"db_name": body.db_name.clone(),
"duration_ms": duration,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
}),
);
}
Err(e) => {
if is_missing_relation(&e) {
warn!(
error = %e,
"{}postgresql query failed (missing relation){} — e.g. table/schema not present for this client",
Y, Z
);
} else {
error!(error = %e, "{}postgresql query failed{}", R, Z);
}
let processed = process_sqlx_error_with_context(&e, Some(&body.db_name));
return processed_error(processed);
}
}
} else {
return bad_request(
"Postgres client not configured",
format!("Client '{}' is not available in the registry", client_name),
);
}
}
if driver == "supabase" {
match execute_query_supabase(sql_query.clone(), body.db_name.clone()).await {
Ok(results) => {
info!("supabase query ok");
return api_ok(results);
}
Err(e) => {
error!(error = %e, "supabase query failed");
return internal_error("Query execution failed", format!("Supabase error: {}", e));
}
}
}
match execute_query(sql_query.clone()).await {
Ok((rows, columns)) => api_success_value(
"Successfully executed query",
json!({
"rows": rows,
"columns": columns
}),
),
Err(err) => {
let error_msg: String = err.to_string();
error!(error = %error_msg, "athena query failed");
if error_msg.contains("connection")
&& (error_msg.contains("refused")
|| error_msg.contains("Control connection pool error")
|| error_msg.contains("target machine actively refused"))
{
warn!("athena/scylladb unreachable");
return service_unavailable(
"Athena server is not reachable",
format!(
"Connection error: {}. Ensure ScyllaDB is running on the configured port.",
error_msg
),
);
}
warn!(
failed_query_preview = %sql_query.chars().take(100).collect::<String>(),
"failed query preview"
);
internal_error(
"Query execution failed",
format!("Athena error: {}", error_msg),
)
}
}
}