use actix_web::{HttpRequest, Responder, post, web};
use serde::Deserialize;
use serde_json::{Value, json};
use sqlx::Row;
use sqlx::types::Json;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{error, info, warn};
use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::response::{
api_ok, api_success_value, bad_request, internal_error, service_unavailable,
};
use crate::drivers::scylla::client::execute_query;
use crate::drivers::supabase::execute_query_supabase;
/// JSON request body accepted by the `POST /query/sql` handler.
#[derive(Deserialize)]
pub struct SqlQueryRequest {
/// SQL text to execute; it is handed to whichever driver is selected.
pub query: String,
/// Backend selector. The handler accepts `athena`, `postgresql`, or `supabase`
/// and rejects any other value with a bad-request response.
pub driver: String,
/// Database name. It is forwarded to the Supabase executor and echoed back in
/// the response of the `postgresql` driver.
pub db_name: String,
}
/// A SQL statement bundled with its parameters, a cache key, and the name of
/// the driver it targets.
pub struct SqlQuery {
    /// SQL text of the statement.
    pub query: String,
    /// String-to-string parameter map.
    /// NOTE(review): presumably bind values for `query` — confirm against callers.
    pub params: HashMap<String, String>,
    /// Cache key supplied by the caller; how it is consumed is not visible here.
    pub cache_key: String,
    /// Driver name. [`SqlQuery::new`] always sets this to `"scylla"`.
    pub driver: String,
}

impl SqlQuery {
    /// Creates a query descriptor from its parts, targeting the `"scylla"` driver.
    ///
    /// All three arguments are stored unchanged in the fields of the same name.
    pub fn new(query: String, params: HashMap<String, String>, cache_key: String) -> Self {
        let driver = String::from("scylla");
        SqlQuery {
            driver,
            cache_key,
            params,
            query,
        }
    }
}
#[post("/query/sql")]
pub async fn sql_query(
req: HttpRequest,
body: web::Json<SqlQueryRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let sql_query: String = body.query.clone();
let driver: String = body.driver.clone();
if driver != "athena" && driver != "postgresql" && driver != "supabase" {
warn!(%driver, "invalid driver");
return bad_request(
"Invalid driver specified",
format!(
"Driver '{}' is not supported. Use athena, postgresql, or supabase.",
driver
),
);
}
if driver == "postgresql" {
let client_name: String = x_athena_client(&req);
if client_name.is_empty() {
return bad_request(
"Missing required header",
"X-Athena-Client header is required when using the postgresql driver",
);
}
if let Some(pool) = app_state.pg_registry.get_pool(&client_name) {
let wrapped_query: String = format!("SELECT to_jsonb(t) AS row FROM ({}) t", sql_query);
let start_time: Instant = Instant::now();
match sqlx::query(&wrapped_query).fetch_all(&pool).await {
Ok(rows) => {
let duration: u64 = start_time.elapsed().as_millis() as u64;
let data: Vec<Value> = rows
.into_iter()
.filter_map(|row| row.try_get::<Json<Value>, _>("row").ok())
.map(|json| json.0)
.collect();
info!("postgresql query ok");
return api_success_value(
"Successfully executed query",
json!({
"rows": data,
"db_name": body.db_name.clone(),
"duration_ms": duration
}),
);
}
Err(e) => {
error!(error = %e, "postgresql query failed");
return internal_error(
"Query execution failed",
format!("PostgreSQL error: {}", e),
);
}
}
} else {
return bad_request(
"Postgres client not configured",
format!("Client '{}' is not available in the registry", client_name),
);
}
}
if driver == "supabase" {
match execute_query_supabase(sql_query.clone(), body.db_name.clone()).await {
Ok(results) => {
info!("supabase query ok");
return api_ok(results);
}
Err(e) => {
error!(error = %e, "supabase query failed");
return internal_error("Query execution failed", format!("Supabase error: {}", e));
}
}
}
match execute_query(sql_query.clone()).await {
Ok((rows, columns)) => api_success_value(
"Successfully executed query",
json!({
"rows": rows,
"columns": columns
}),
),
Err(err) => {
let error_msg: String = err.to_string();
error!(error = %error_msg, "athena query failed");
if error_msg.contains("connection")
&& (error_msg.contains("refused")
|| error_msg.contains("Control connection pool error")
|| error_msg.contains("target machine actively refused"))
{
warn!("athena/scylladb unreachable");
return service_unavailable(
"Athena server is not reachable",
format!(
"Connection error: {}. Ensure ScyllaDB is running on the configured port.",
error_msg
),
);
}
warn!(
failed_query_preview = %sql_query.chars().take(100).collect::<String>(),
"failed query preview"
);
internal_error(
"Query execution failed",
format!("Athena error: {}", error_msg),
)
}
}
}