use actix_web::{HttpRequest, Responder, post, web};
use serde_json::json;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{debug, error, info, warn};
/// Upper bound on the byte length (`String::len`) of the trimmed `driver` value accepted by
/// `handle_sql_query`; longer values are rejected through `bad_request` before the driver
/// name is normalized.
const MAX_SQL_DRIVER_LEN: usize = 32;
fn is_missing_relation(err: &sqlx::Error) -> bool {
if let sqlx::Error::Database(db) = err {
let msg = db.message();
let code = db.code().as_ref().map(|c| c.to_string());
let code_str = code.as_deref();
code_str == Some("42P01") || msg.contains("does not exist")
} else {
false
}
}
/// Builds the `ServiceUnavailable` response for a failed PostgreSQL script statement.
///
/// The JSON body has a fixed `"status"` / `"message"` pair, the script error's `message`
/// under `"error"`, and a `"details"` object that copies the statement position fields
/// (`statement_index`, `total_statements`, `statement`, `line_start`, `line_end`) and the
/// `preprocess` field from `script_err`.
fn sql_script_error_service_unavailable_response(
    script_err: &crate::drivers::postgresql::raw_sql::PostgresSqlScriptError,
) -> actix_web::HttpResponse {
    // Where in the script the failure happened.
    let details = json!({
        "statement_index": script_err.statement_index,
        "total_statements": script_err.total_statements,
        "statement": script_err.statement,
        "line_start": script_err.line_start,
        "line_end": script_err.line_end,
        "preprocess": script_err.preprocess,
    });

    let payload = json!({
        "status": "error",
        "message": "SQL statement execution failed",
        "error": script_err.message,
        "details": details,
    });

    actix_web::HttpResponse::ServiceUnavailable().json(payload)
}
use crate::AppState;
use crate::api::client_context::ATHENA_CLIENT_HEADER;
use crate::api::direct_pg_uri_auth_bypass_enabled;
use crate::api::gateway::auth::{
GatewayAuthOptions, query_right, require_admin_or_gateway_with_options,
};
use crate::api::gateway::contracts::{
GatewaySqlExecutionRequest, gateway_sql_execution_mode_to_transaction_mode,
normalize_gateway_schema_name,
};
use crate::api::gateway::pool_resolver::resolve_postgres_pool;
use crate::api::rate_limit::check_inbound_optional;
use crate::api::response::{
api_ok, api_success_value, bad_request, internal_error, processed_error, service_unavailable,
};
use crate::athena::resolver::{
AthenaClientResolveError, AthenaResolvedQueryBackend, resolve_query_backend,
};
use crate::drivers::cloudflare_d1::client::{
HEADER_D1_BOOKMARK, HEADER_D1_SESSION_MODE, execute_query_via_proxy,
};
use crate::drivers::postgresql::raw_sql::{
PostgresSqlTransactionMode, execute_postgres_sql_script,
};
use crate::drivers::scylla::client::{execute_query, execute_query_with_info};
use crate::drivers::supabase::execute_query_supabase;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
/// A SQL query together with its parameters, cache key, and target driver name.
pub struct SqlQuery {
    /// SQL text of the query.
    pub query: String,
    /// String-keyed, string-valued parameters supplied with the query.
    pub params: HashMap<String, String>,
    /// Cache key associated with this query.
    pub cache_key: String,
    /// Driver name; [`SqlQuery::new`] sets it to `"scylla"`.
    pub driver: String,
}

impl SqlQuery {
    /// Creates a query from its text, parameters, and cache key, with `driver` set to
    /// `"scylla"`.
    pub fn new(query: String, params: HashMap<String, String>, cache_key: String) -> Self {
        let driver = String::from("scylla");
        SqlQuery {
            driver,
            cache_key,
            params,
            query,
        }
    }
}
/// Maps a user-supplied driver name onto its canonical identifier.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive. Returns `None`
/// when the name is not one of the known aliases.
fn normalize_sql_driver(driver: &str) -> Option<&'static str> {
    // (accepted alias, canonical driver name)
    const ALIASES: &[(&str, &str)] = &[
        ("athena", "athena"),
        ("scylla", "athena"),
        ("scylladb", "athena"),
        ("postgresql", "postgresql"),
        ("postgres", "postgresql"),
        ("supabase", "supabase"),
        ("cloudflare-d1", "cloudflare-d1"),
        ("d1", "cloudflare-d1"),
    ];

    let requested = driver.trim();
    ALIASES
        .iter()
        .find(|(alias, _)| requested.eq_ignore_ascii_case(alias))
        .map(|&(_, canonical)| canonical)
}
/// Converts a client-resolution failure into the HTTP response used by the Scylla path.
///
/// `Lookup` failures are reported through `service_unavailable`; `Inactive`, `Frozen`, and
/// `InvalidMetadata` are reported through `bad_request`.
fn scylla_resolution_error_response(err: AthenaClientResolveError) -> actix_web::HttpResponse {
    let (headline, detail) = match err {
        // The only variant that is not a bad request, so it returns straight away.
        AthenaClientResolveError::Lookup {
            client_name,
            message,
        } => {
            return service_unavailable(
                "Failed to resolve Scylla client",
                format!("Client '{}' lookup failed: {}", client_name, message),
            );
        }
        AthenaClientResolveError::Inactive { client_name } => (
            "Scylla client is inactive",
            format!("Client '{}' is inactive.", client_name),
        ),
        AthenaClientResolveError::Frozen { client_name } => (
            "Scylla client is frozen",
            format!("Client '{}' is frozen.", client_name),
        ),
        AthenaClientResolveError::InvalidMetadata {
            client_name,
            message,
        } => (
            "Invalid Scylla client metadata",
            format!("Client '{}' {}", client_name, message),
        ),
    };

    bad_request(headline, detail)
}
/// Converts a client-resolution failure into the HTTP response used by the Cloudflare D1
/// path.
///
/// `Lookup` failures are reported through `service_unavailable`; `Inactive`, `Frozen`, and
/// `InvalidMetadata` are reported through `bad_request`.
fn d1_resolution_error_response(err: AthenaClientResolveError) -> actix_web::HttpResponse {
    let (headline, detail) = match err {
        // The only variant that is not a bad request, so it returns straight away.
        AthenaClientResolveError::Lookup {
            client_name,
            message,
        } => {
            return service_unavailable(
                "Failed to resolve Cloudflare D1 client",
                format!("Client '{}' lookup failed: {}", client_name, message),
            );
        }
        AthenaClientResolveError::Inactive { client_name } => (
            "Cloudflare D1 client is inactive",
            format!("Client '{}' is inactive.", client_name),
        ),
        AthenaClientResolveError::Frozen { client_name } => (
            "Cloudflare D1 client is frozen",
            format!("Client '{}' is frozen.", client_name),
        ),
        AthenaClientResolveError::InvalidMetadata {
            client_name,
            message,
        } => (
            "Invalid Cloudflare D1 client metadata",
            format!("Client '{}' {}", client_name, message),
        ),
    };

    bad_request(headline, detail)
}
async fn execute_scylla_request(
req: &HttpRequest,
app_state: &AppState,
sql_text: String,
) -> Result<actix_web::HttpResponse, actix_web::HttpResponse> {
let client_name = req
.headers()
.get(ATHENA_CLIENT_HEADER)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string);
let resolved_backend = match client_name.as_deref() {
Some(client_name) => match resolve_query_backend(app_state, client_name).await {
Ok(resolution) => resolution,
Err(err) => return Err(scylla_resolution_error_response(err)),
},
None => None,
};
let result = match resolved_backend {
Some(AthenaResolvedQueryBackend::Scylla {
connection_info, ..
}) => execute_query_with_info(sql_text.clone(), &connection_info).await,
_ => execute_query(sql_text.clone()).await,
};
match result {
Ok((rows, columns)) => Ok(api_success_value(
"Successfully executed query",
json!({
"rows": rows,
"columns": columns,
"driver": "scylla",
}),
)),
Err(err) => {
let error_msg: String = err.to_string();
error!(error = %error_msg, "athena query failed");
if error_msg.contains("connection")
&& (error_msg.contains("refused")
|| error_msg.contains("Control connection pool error")
|| error_msg.contains("target machine actively refused"))
{
warn!("athena/scylladb unreachable");
return Err(service_unavailable(
"Athena server is not reachable",
format!(
"Connection error: {}. Ensure ScyllaDB is running on the configured port.",
error_msg
),
));
}
warn!(
client = %client_name.unwrap_or_else(|| "<default>".to_string()),
failed_query_preview = %sql_text.chars().take(100).collect::<String>(),
"failed query preview"
);
Err(internal_error(
"Query execution failed",
format!("Athena error: {}", error_msg),
))
}
}
}
async fn execute_cloudflare_d1_request(
req: &HttpRequest,
app_state: &AppState,
sql_text: String,
params: Vec<serde_json::Value>,
retry_writes: bool,
) -> Result<actix_web::HttpResponse, actix_web::HttpResponse> {
let client_name = req
.headers()
.get(ATHENA_CLIENT_HEADER)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.ok_or_else(|| {
bad_request(
"Missing required header",
"X-Athena-Client header is required when using the cloudflare-d1 driver",
)
})?;
let resolved_backend = resolve_query_backend(app_state, &client_name)
.await
.map_err(d1_resolution_error_response)?;
let Some(AthenaResolvedQueryBackend::D1 {
connection_info, ..
}) = resolved_backend
else {
return Err(bad_request(
"Cloudflare D1 client not configured",
format!(
"Client '{}' is not registered as a Cloudflare D1 backend",
client_name
),
));
};
let requested_session_mode = req
.headers()
.get(HEADER_D1_SESSION_MODE)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let requested_bookmark = req
.headers()
.get(HEADER_D1_BOOKMARK)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let result = execute_query_via_proxy(
&app_state.client,
&connection_info,
&sql_text,
params,
requested_session_mode,
requested_bookmark,
retry_writes,
)
.await
.map_err(|error| service_unavailable("Cloudflare D1 query failed", error))?;
let mut response = api_success_value(
"Successfully executed query",
json!({
"rows": result.rows,
"columns": result.columns,
"driver": "cloudflare-d1",
"duration_ms": result.duration_ms,
"bookmark": result.bookmark,
"count": result.count,
"meta": result.meta,
}),
);
if let Some(bookmark) = result.bookmark
&& let Ok(value) = bookmark.parse()
{
response
.headers_mut()
.insert(HEADER_D1_BOOKMARK.parse().expect("valid header"), value);
}
Ok(response)
}
/// Shared implementation behind the `/query/sql` and `/gateway/sql` endpoints.
///
/// Steps, in order:
/// 1. Validate and normalize the requested `driver`.
/// 2. Normalize `schema_name` (accepted only for the postgresql driver) and pick the
///    transaction mode, defaulting to `SingleTransaction` when the request omits it.
/// 3. Authorize the caller; the direct-PG-URI bypass option is consulted only for the
///    postgresql driver.
/// 4. Apply the inbound raw-SQL rate limit.
/// 5. Dispatch to the driver-specific execution path and return its response.
///
/// Every failure is returned as a response; this function has no separate error channel.
async fn handle_sql_query(
    req: HttpRequest,
    body: web::Json<GatewaySqlExecutionRequest>,
    app_state: web::Data<AppState>,
) -> actix_web::HttpResponse {
    // Client name used for authorization; blank or unreadable header values count as absent.
    let client_for_auth: Option<String> = req
        .headers()
        .get(ATHENA_CLIENT_HEADER)
        .and_then(|raw| raw.to_str().ok())
        .map(str::trim)
        .filter(|name| !name.is_empty())
        .map(str::to_string);

    let requested_driver = body.driver.trim();
    if requested_driver.is_empty() || requested_driver.len() > MAX_SQL_DRIVER_LEN {
        return bad_request(
            "Invalid driver specified",
            "driver must be a non-empty supported identifier",
        );
    }
    let Some(driver) = normalize_sql_driver(requested_driver) else {
        // Only the length is logged, never the caller-supplied text.
        debug!(
            driver_len = requested_driver.len(),
            "unsupported sql driver requested"
        );
        return bad_request(
            "Invalid driver specified",
            "Driver is not supported. Use athena/scylla, postgresql, supabase, or cloudflare-d1.",
        );
    };
    let targets_postgres = driver == "postgresql";

    let schema_name = match normalize_gateway_schema_name(body.schema_name.as_deref()) {
        Ok(name) => name,
        Err(reason) => return bad_request("Invalid schema_name", reason),
    };
    let execution_mode = body.execution_mode.map_or(
        PostgresSqlTransactionMode::SingleTransaction,
        gateway_sql_execution_mode_to_transaction_mode,
    );
    if schema_name.is_some() && !targets_postgres {
        return bad_request(
            "Unsupported schema_name",
            "schema_name is only supported when driver is postgresql/postgres",
        );
    }

    let auth_options = if targets_postgres {
        GatewayAuthOptions {
            allow_direct_pg_uri_without_api_key: direct_pg_uri_auth_bypass_enabled(),
        }
    } else {
        GatewayAuthOptions::default()
    };
    if let Err(denied) = require_admin_or_gateway_with_options(
        &req,
        app_state.get_ref(),
        client_for_auth.as_deref(),
        vec![query_right()],
        auth_options,
    )
    .await
    {
        return denied;
    }

    if let Err(limited) = check_inbound_optional(
        &app_state.inbound_rate_limit_raw_sql,
        app_state.inbound_rate_limit_trust_x_forwarded_for,
        &req,
    ) {
        return limited;
    }

    match driver {
        "postgresql" => {
            let pool = match resolve_postgres_pool(&req, app_state.get_ref()).await {
                Ok(pool) => pool,
                Err(resp) => return resp,
            };

            let started: Instant = Instant::now();
            let outcome = execute_postgres_sql_script(
                &pool,
                &body.query,
                execution_mode,
                schema_name.as_deref(),
            )
            .await;

            let script_err = match outcome {
                Ok(result) => {
                    let duration: u64 = started.elapsed().as_millis() as u64;
                    info!("postgresql query ok");
                    return api_success_value(
                        "Successfully executed query",
                        json!({
                            "rows": result.rows,
                            "db_name": body.db_name.clone(),
                            "duration_ms": duration,
                            "schema_name": schema_name,
                            "execution_mode": execution_mode,
                            "statement_count": result.summary.statement_count,
                            "rows_affected": result.summary.rows_affected,
                            "returned_row_count": result.summary.returned_row_count,
                            "statements": result.statements,
                            "preprocess": result.preprocess,
                        }),
                    );
                }
                Err(script_err) => script_err,
            };

            // The script error carries its own status hint: 400 and 503 are answered
            // directly with the statement details.
            if script_err.status_hint == 400 {
                return actix_web::HttpResponse::BadRequest().json(json!({
                    "status": "error",
                    "message": "SQL statement execution failed",
                    "error": script_err.message,
                    "details": {
                        "statement_index": script_err.statement_index,
                        "total_statements": script_err.total_statements,
                        "statement": script_err.statement,
                        "line_start": script_err.line_start,
                        "line_end": script_err.line_end,
                        "preprocess": script_err.preprocess,
                    }
                }));
            }
            if script_err.status_hint == 503 {
                warn!(error = %script_err.message, "postgresql query failed due to database pool saturation");
                return sql_script_error_service_unavailable_response(&script_err);
            }

            // Anything else is re-wrapped as a sqlx protocol error so the shared sqlx
            // error processor can shape the response.
            let sqlx_like = sqlx::Error::Protocol(script_err.message.clone());
            if is_missing_relation(&sqlx_like) {
                warn!(
                    error = %script_err.message,
                    "postgresql query failed (missing relation) — table/schema may be absent for this client",
                );
            } else {
                error!(error = %script_err.message, "postgresql query failed");
            }
            let processed = process_sqlx_error_with_context(&sqlx_like, Some(&body.db_name));
            processed_error(processed)
        }
        "supabase" => {
            match execute_query_supabase(body.query.clone(), body.db_name.clone()).await {
                Ok(results) => {
                    info!("supabase query ok");
                    api_ok(results)
                }
                Err(e) => {
                    error!(error = %e, "supabase query failed");
                    internal_error("Query execution failed", format!("Supabase error: {}", e))
                }
            }
        }
        // Both result arms already hold a finished response, so they are merged.
        "cloudflare-d1" => execute_cloudflare_d1_request(
            &req,
            app_state.get_ref(),
            body.query.clone(),
            body.params.clone(),
            true,
        )
        .await
        .unwrap_or_else(|response| response),
        // The remaining canonical driver ("athena") runs on the Scylla path.
        _ => execute_scylla_request(&req, app_state.get_ref(), body.query.clone())
            .await
            .unwrap_or_else(|response| response),
    }
}
/// `POST /query/sql` endpoint.
///
/// Forwards the request, its JSON body, and the shared application state unchanged to
/// `handle_sql_query` and returns that function's response.
#[post("/query/sql")]
pub async fn sql_query(
req: HttpRequest,
body: web::Json<GatewaySqlExecutionRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
handle_sql_query(req, body, app_state).await
}
/// `POST /gateway/sql` endpoint.
///
/// Takes the same arguments as `sql_query` and delegates to the same `handle_sql_query`
/// implementation; only the route path differs.
#[post("/gateway/sql")]
pub async fn gateway_sql_query(
req: HttpRequest,
body: web::Json<GatewaySqlExecutionRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
handle_sql_query(req, body, app_state).await
}
#[cfg(test)]
mod tests {
    use super::normalize_sql_driver;

    /// Checks every accepted driver alias against its canonical name, plus one unsupported
    /// driver that must be rejected.
    #[test]
    fn normalize_sql_driver_accepts_scylla_aliases() {
        // (input, expected canonical driver)
        let expectations = [
            ("athena", Some("athena")),
            ("scylla", Some("athena")),
            ("scylladb", Some("athena")),
            ("postgres", Some("postgresql")),
            ("supabase", Some("supabase")),
            ("d1", Some("cloudflare-d1")),
            ("cloudflare-d1", Some("cloudflare-d1")),
            ("mysql", None),
        ];

        for (input, expected) in expectations {
            assert_eq!(
                normalize_sql_driver(input),
                expected,
                "driver input: {input}"
            );
        }
    }
}