use actix_web::{HttpRequest, Responder, post, web};
use serde_json::json;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{debug, error, info, warn};
/// Upper bound, in bytes, on the trimmed `driver` string accepted by the raw SQL handler.
/// Longer (or empty) values are rejected before driver alias normalization is attempted.
const MAX_SQL_DRIVER_LEN: usize = 32;
/// Returns the Athena client name resolved for `req`, or `None` when it is blank.
///
/// The name is whatever [`x_athena_client`] yields for the request. A value that is empty
/// or whitespace-only counts as "no client"; a non-blank value is returned untrimmed.
fn effective_athena_client(req: &HttpRequest) -> Option<String> {
    let resolved = x_athena_client(req);
    let is_blank = resolved.trim().is_empty();
    (!is_blank).then_some(resolved)
}
fn is_missing_relation(err: &sqlx::Error) -> bool {
if let sqlx::Error::Database(db) = err {
let msg = db.message();
let code = db.code().as_ref().map(|c| c.to_string());
let code_str = code.as_deref();
code_str == Some("42P01") || msg.contains("does not exist")
} else {
false
}
}
/// Builds the `503 Service Unavailable` JSON response for a failed SQL script.
///
/// The body carries a fixed status/message pair, the script error message, and a `details`
/// object locating the failing statement (index, total, text, line range, preprocess info).
fn sql_script_error_service_unavailable_response(
    script_err: &crate::drivers::postgresql::raw_sql::PostgresSqlScriptError,
) -> actix_web::HttpResponse {
    let payload = json!({
        "status": "error",
        "message": "SQL statement execution failed",
        "error": script_err.message,
        "details": {
            "statement_index": script_err.statement_index,
            "total_statements": script_err.total_statements,
            "statement": script_err.statement,
            "line_start": script_err.line_start,
            "line_end": script_err.line_end,
            "preprocess": script_err.preprocess,
        }
    });
    let mut builder = actix_web::HttpResponse::ServiceUnavailable();
    builder.json(payload)
}
use crate::AppState;
use crate::api::gateway::auth::{
GatewayAuthOptions, query_right, require_admin_or_gateway_with_options,
};
use crate::api::gateway::contracts::{
D1MigrationExecutionResponse, D1MigrationPreviewResponse, D1MigrationRequest,
GatewaySqlExecutionRequest, gateway_sql_execution_mode_to_transaction_mode,
normalize_gateway_schema_name,
};
use crate::api::gateway::pool_resolver::resolve_postgres_pool;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::query::d1_migration;
use crate::api::rate_limit::check_inbound_optional;
use crate::api::response::{
api_ok, api_success_value, bad_request, internal_error, processed_error, service_unavailable,
};
use crate::athena::resolver::{
AthenaClientResolveError, AthenaResolvedQueryBackend, resolve_query_backend,
};
use crate::drivers::cloudflare_d1::client::{
HEADER_D1_BOOKMARK, HEADER_D1_SESSION_MODE, execute_query_via_proxy,
};
use crate::drivers::postgresql::raw_sql::{
PostgresSqlTransactionMode, execute_postgres_sql_script,
};
use crate::drivers::scylla::client::{execute_query, execute_query_with_info};
use crate::drivers::supabase::execute_query_supabase;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
/// A SQL statement bundled with its parameters, cache key, and target driver name.
#[derive(Debug, Clone)]
pub struct SqlQuery {
    /// SQL text of the query.
    pub query: String,
    /// String parameters keyed by name (presumably bind values; confirm against consumers).
    pub params: HashMap<String, String>,
    /// Key associated with this query for caching purposes.
    pub cache_key: String,
    /// Name of the driver the query targets; [`SqlQuery::new`] sets it to `"scylla"`.
    pub driver: String,
}

impl SqlQuery {
    /// Creates a query whose `driver` is the default `"scylla"`.
    ///
    /// Callers that need a different driver can overwrite the public `driver` field.
    pub fn new(query: String, params: HashMap<String, String>, cache_key: String) -> Self {
        Self {
            query,
            params,
            cache_key,
            driver: "scylla".to_string(),
        }
    }
}
/// Maps a user-supplied driver name to its canonical identifier.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive. Returns
/// `None` for any name that is not a known alias.
fn normalize_sql_driver(driver: &str) -> Option<&'static str> {
    // (canonical name, accepted aliases)
    const ALIASES: [(&str, &[&str]); 4] = [
        ("athena", &["athena", "scylla", "scylladb"]),
        ("postgresql", &["postgresql", "postgres"]),
        ("supabase", &["supabase"]),
        ("cloudflare-d1", &["cloudflare-d1", "d1", "athena-d1"]),
    ];

    let candidate = driver.trim();
    ALIASES
        .iter()
        .find(|(_, aliases)| {
            aliases
                .iter()
                .any(|alias| alias.eq_ignore_ascii_case(candidate))
        })
        .map(|(canonical, _)| *canonical)
}
/// Converts a client-resolution failure into the HTTP response used by the Scylla path.
///
/// Lookup failures are reported through `service_unavailable`; inactive, frozen, and
/// invalid-metadata clients are reported through `bad_request`.
fn scylla_resolution_error_response(err: AthenaClientResolveError) -> actix_web::HttpResponse {
    // Each variant maps to (lookup failure?, title, detail).
    let (lookup_failed, title, detail) = match err {
        AthenaClientResolveError::Inactive { client_name } => (
            false,
            "Scylla client is inactive",
            format!("Client '{client_name}' is inactive."),
        ),
        AthenaClientResolveError::Frozen { client_name } => (
            false,
            "Scylla client is frozen",
            format!("Client '{client_name}' is frozen."),
        ),
        AthenaClientResolveError::InvalidMetadata {
            client_name,
            message,
        } => (
            false,
            "Invalid Scylla client metadata",
            format!("Client '{client_name}' {message}"),
        ),
        AthenaClientResolveError::Lookup {
            client_name,
            message,
        } => (
            true,
            "Failed to resolve Scylla client",
            format!("Client '{client_name}' lookup failed: {message}"),
        ),
    };

    if lookup_failed {
        service_unavailable(title, detail)
    } else {
        bad_request(title, detail)
    }
}
/// Converts a client-resolution failure into the HTTP response used by the Cloudflare D1 path.
///
/// A failed lookup is answered through `service_unavailable`; every other variant describes
/// a problem with the client itself and is answered through `bad_request`.
fn d1_resolution_error_response(err: AthenaClientResolveError) -> actix_web::HttpResponse {
    let (title, detail) = match err {
        // The only variant that is not a client-side problem: answer it right away.
        AthenaClientResolveError::Lookup {
            client_name,
            message,
        } => {
            return service_unavailable(
                "Failed to resolve Cloudflare D1 client",
                format!("Client '{client_name}' lookup failed: {message}"),
            );
        }
        AthenaClientResolveError::Inactive { client_name } => (
            "Cloudflare D1 client is inactive",
            format!("Client '{client_name}' is inactive."),
        ),
        AthenaClientResolveError::Frozen { client_name } => (
            "Cloudflare D1 client is frozen",
            format!("Client '{client_name}' is frozen."),
        ),
        AthenaClientResolveError::InvalidMetadata {
            client_name,
            message,
        } => (
            "Invalid Cloudflare D1 client metadata",
            format!("Client '{client_name}' {message}"),
        ),
    };
    bad_request(title, detail)
}
async fn execute_scylla_request(
req: &HttpRequest,
app_state: &AppState,
sql_text: String,
) -> Result<actix_web::HttpResponse, actix_web::HttpResponse> {
let client_name = effective_athena_client(req);
let resolved_backend = match client_name.as_deref() {
Some(client_name) => match resolve_query_backend(app_state, client_name).await {
Ok(resolution) => resolution,
Err(err) => return Err(scylla_resolution_error_response(err)),
},
None => None,
};
let result = match resolved_backend {
Some(AthenaResolvedQueryBackend::Scylla {
connection_info, ..
}) => execute_query_with_info(sql_text.clone(), &connection_info).await,
_ => execute_query(sql_text.clone()).await,
};
match result {
Ok((rows, columns)) => Ok(api_success_value(
"Successfully executed query",
json!({
"rows": rows,
"columns": columns,
"driver": "scylla",
}),
)),
Err(err) => {
let error_msg: String = err.to_string();
error!(error = %error_msg, "athena query failed");
if error_msg.contains("connection")
&& (error_msg.contains("refused")
|| error_msg.contains("Control connection pool error")
|| error_msg.contains("target machine actively refused"))
{
warn!("athena/scylladb unreachable");
return Err(service_unavailable(
"Athena server is not reachable",
format!(
"Connection error: {}. Ensure ScyllaDB is running on the configured port.",
error_msg
),
));
}
warn!(
client = %client_name.unwrap_or_else(|| "<default>".to_string()),
failed_query_preview = %sql_text.chars().take(100).collect::<String>(),
"failed query preview"
);
Err(internal_error(
"Query execution failed",
format!("Athena error: {}", error_msg),
))
}
}
}
async fn execute_cloudflare_d1_request(
req: &HttpRequest,
app_state: &AppState,
sql_text: String,
params: Vec<serde_json::Value>,
retry_writes: bool,
) -> Result<actix_web::HttpResponse, actix_web::HttpResponse> {
let client_name = effective_athena_client(req).ok_or_else(|| {
bad_request(
"Missing required header",
"X-Athena-Client header or tenant wildcard hostname is required when using the cloudflare-d1 driver",
)
})?;
let resolved_backend = resolve_query_backend(app_state, &client_name)
.await
.map_err(d1_resolution_error_response)?;
let Some(AthenaResolvedQueryBackend::D1 {
connection_info, ..
}) = resolved_backend
else {
return Err(bad_request(
"Cloudflare D1 client not configured",
format!(
"Client '{}' is not registered as a Cloudflare D1 backend",
client_name
),
));
};
let requested_session_mode = req
.headers()
.get(HEADER_D1_SESSION_MODE)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let requested_bookmark = req
.headers()
.get(HEADER_D1_BOOKMARK)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let result = execute_query_via_proxy(
&app_state.client,
&connection_info,
&sql_text,
params,
requested_session_mode,
requested_bookmark,
retry_writes,
)
.await
.map_err(|error| service_unavailable("Cloudflare D1 query failed", error))?;
let mut response = api_success_value(
"Successfully executed query",
json!({
"rows": result.rows,
"columns": result.columns,
"driver": "cloudflare-d1",
"duration_ms": result.duration_ms,
"bookmark": result.bookmark,
"count": result.count,
"meta": result.meta,
}),
);
if let Some(bookmark) = result.bookmark
&& let Ok(value) = bookmark.parse()
{
response
.headers_mut()
.insert(HEADER_D1_BOOKMARK.parse().expect("valid header"), value);
}
Ok(response)
}
/// Builds a `400 Bad Request` JSON response with `status`, `message`, and `error` fields,
/// as used by the D1 migration route for request-validation failures.
fn build_migration_error_response(message: &str, error: &str) -> actix_web::HttpResponse {
    let payload = json!({
        "status": "error",
        "message": message,
        "error": error,
    });
    let mut builder = actix_web::HttpResponse::BadRequest();
    builder.json(payload)
}
async fn execute_d1_migration_batch(
req: &HttpRequest,
app_state: &AppState,
sql_text: String,
params: Vec<serde_json::Value>,
retry_writes: bool,
) -> Result<crate::drivers::cloudflare_d1::client::D1ExecutionResult, actix_web::HttpResponse> {
let client_name = effective_athena_client(req).ok_or_else(|| {
bad_request(
"Missing required header",
"X-Athena-Client header or tenant wildcard hostname is required when using cloudflare-d1",
)
})?;
let resolved_backend = resolve_query_backend(app_state, &client_name)
.await
.map_err(d1_resolution_error_response)?;
let Some(AthenaResolvedQueryBackend::D1 {
connection_info, ..
}) = resolved_backend
else {
return Err(bad_request(
"Cloudflare D1 client not configured",
format!(
"Client '{}' is not registered as a Cloudflare D1 backend",
client_name
),
));
};
let requested_session_mode = req
.headers()
.get(HEADER_D1_SESSION_MODE)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let requested_bookmark = req
.headers()
.get(HEADER_D1_BOOKMARK)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty());
let result = execute_query_via_proxy(
&app_state.client,
&connection_info,
&sql_text,
params,
requested_session_mode,
requested_bookmark,
retry_writes,
)
.await
.map_err(|error| service_unavailable("Cloudflare D1 query failed", error))?;
Ok(result)
}
async fn handle_sql_query(
req: HttpRequest,
body: web::Json<GatewaySqlExecutionRequest>,
app_state: web::Data<AppState>,
) -> actix_web::HttpResponse {
let client_for_auth = effective_athena_client(&req);
let driver_trimmed: String = body.driver.trim().to_string();
if driver_trimmed.is_empty() || driver_trimmed.len() > MAX_SQL_DRIVER_LEN {
return bad_request(
"Invalid driver specified",
"driver must be a non-empty supported identifier",
);
}
let driver: &str = match normalize_sql_driver(&driver_trimmed) {
Some(driver) => driver,
None => {
debug!(
driver_len = driver_trimmed.len(),
"unsupported sql driver requested"
);
return bad_request(
"Invalid driver specified",
"Driver is not supported. Use athena/scylla, postgresql, supabase, cloudflare-d1, or d1/athena-d1.",
);
}
};
let schema_name = match normalize_gateway_schema_name(body.schema_name.as_deref()) {
Ok(value) => value,
Err(err) => {
return bad_request("Invalid schema_name", err);
}
};
let execution_mode = body
.execution_mode
.map(gateway_sql_execution_mode_to_transaction_mode)
.unwrap_or(PostgresSqlTransactionMode::SingleTransaction);
if schema_name.is_some() && driver != "postgresql" {
return bad_request(
"Unsupported schema_name",
"schema_name is only supported when driver is postgresql/postgres",
);
}
let auth_options = GatewayAuthOptions::default();
if let Err(resp) = require_admin_or_gateway_with_options(
&req,
app_state.get_ref(),
client_for_auth.as_deref(),
vec![query_right()],
auth_options,
)
.await
{
return resp;
}
if let Err(resp) = check_inbound_optional(
&app_state.inbound_rate_limit_raw_sql,
app_state.inbound_rate_limit_trust_x_forwarded_for,
&req,
) {
return resp;
}
let sql_text: String = body.query.clone();
if driver == "postgresql" {
let pool = match resolve_postgres_pool(&req, app_state.get_ref()).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let start_time: Instant = Instant::now();
match execute_postgres_sql_script(
&pool,
&body.query,
execution_mode,
schema_name.as_deref(),
)
.await
{
Ok(result) => {
let duration: u64 = start_time.elapsed().as_millis() as u64;
info!("postgresql query ok");
return api_success_value(
"Successfully executed query",
json!({
"rows": result.rows,
"db_name": body.db_name.clone(),
"duration_ms": duration,
"schema_name": schema_name,
"execution_mode": execution_mode,
"statement_count": result.summary.statement_count,
"rows_affected": result.summary.rows_affected,
"returned_row_count": result.summary.returned_row_count,
"statements": result.statements,
"preprocess": result.preprocess,
}),
);
}
Err(script_err) => {
if script_err.status_hint == 400 {
return actix_web::HttpResponse::BadRequest().json(json!({
"status": "error",
"message": "SQL statement execution failed",
"error": script_err.message,
"details": {
"statement_index": script_err.statement_index,
"total_statements": script_err.total_statements,
"statement": script_err.statement,
"line_start": script_err.line_start,
"line_end": script_err.line_end,
"preprocess": script_err.preprocess,
}
}));
}
if script_err.status_hint == 503 {
warn!(error = %script_err.message, "postgresql query failed due to database pool saturation");
return sql_script_error_service_unavailable_response(&script_err);
}
let sqlx_like = sqlx::Error::Protocol(script_err.message.clone());
if is_missing_relation(&sqlx_like) {
warn!(
error = %script_err.message,
"postgresql query failed (missing relation) — table/schema may be absent for this client",
);
} else {
error!(error = %script_err.message, "postgresql query failed");
}
let processed = process_sqlx_error_with_context(&sqlx_like, Some(&body.db_name));
return processed_error(processed);
}
}
}
if driver == "supabase" {
match execute_query_supabase(sql_text.clone(), body.db_name.clone()).await {
Ok(results) => {
info!("supabase query ok");
return api_ok(results);
}
Err(e) => {
error!(error = %e, "supabase query failed");
return internal_error("Query execution failed", format!("Supabase error: {}", e));
}
}
}
if driver == "cloudflare-d1" {
match execute_cloudflare_d1_request(
&req,
app_state.get_ref(),
sql_text.clone(),
body.params.clone(),
true,
)
.await
{
Ok(response) => return response,
Err(response) => return response,
}
}
match execute_scylla_request(&req, app_state.get_ref(), sql_text.clone()).await {
Ok(response) => response,
Err(response) => response,
}
}
/// `POST /query/sql`: executes a raw SQL request.
///
/// Thin route wrapper; all validation, authorization, rate limiting, and driver dispatch
/// happen in `handle_sql_query`.
#[post("/query/sql")]
pub async fn sql_query(
req: HttpRequest,
body: web::Json<GatewaySqlExecutionRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
handle_sql_query(req, body, app_state).await
}
/// `POST /gateway/sql`: executes a raw SQL request.
///
/// Accepts the same body as `/query/sql` and delegates to the same `handle_sql_query`
/// implementation, so both routes behave identically.
#[post("/gateway/sql")]
pub async fn gateway_sql_query(
req: HttpRequest,
body: web::Json<GatewaySqlExecutionRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
handle_sql_query(req, body, app_state).await
}
#[post("/gateway/sql/d1/migrate")]
pub async fn gateway_sql_d1_migrate(
req: HttpRequest,
body: web::Json<D1MigrationRequest>,
app_state: web::Data<AppState>,
) -> impl Responder {
let client_for_auth = effective_athena_client(&req);
let auth_options = GatewayAuthOptions::default();
if let Err(resp) = require_admin_or_gateway_with_options(
&req,
app_state.get_ref(),
client_for_auth.as_deref(),
vec![query_right()],
auth_options,
)
.await
{
return resp;
}
if let Err(resp) = check_inbound_optional(
&app_state.inbound_rate_limit_raw_sql,
app_state.inbound_rate_limit_trust_x_forwarded_for,
&req,
) {
return resp;
}
if body.dialect != crate::api::gateway::contracts::D1MigrationDialect::PostgreSQL {
return build_migration_error_response(
"Unsupported migration dialect",
"Only postgresql is currently supported for D1 migration conversion",
);
}
match body.driver.trim().to_ascii_lowercase().as_str() {
"d1" | "cloudflare-d1" | "athena-d1" => {}
_ => {
return build_migration_error_response(
"Invalid migration driver",
"driver must be cloudflare-d1 (or d1/athena-d1)",
);
}
}
let conversion = d1_migration::convert_pg_schema_to_d1(&body);
if body.strict && !conversion.errors.is_empty() {
return actix_web::HttpResponse::UnprocessableEntity().json(D1MigrationPreviewResponse {
status: "error".to_string(),
original_sql: conversion.original_sql.clone(),
converted_sql: conversion.converted_sql,
statements: conversion.statements,
warnings: conversion.warnings,
errors: conversion.errors,
source_meta: conversion.source_meta,
});
}
if body.dry_run
|| conversion.statements.iter().all(|entry| {
entry.action != crate::api::gateway::contracts::D1MigrationAction::Converted
})
{
return actix_web::HttpResponse::Ok().json(D1MigrationPreviewResponse {
status: "preview".to_string(),
original_sql: conversion.original_sql.clone(),
converted_sql: conversion.converted_sql,
statements: conversion.statements,
warnings: conversion.warnings,
errors: conversion.errors,
source_meta: conversion.source_meta,
});
}
let batches =
d1_migration::build_batches_for_execution(&conversion.statements, body.batch_size);
let mut per_statement_results = Vec::new();
for batch in batches.iter() {
let result = execute_d1_migration_batch(
&req,
app_state.get_ref(),
batch.sql.clone(),
Vec::new(),
true,
)
.await;
match result {
Ok(result) => {
per_statement_results.extend(d1_migration::map_execution_results(
&conversion.statements,
std::slice::from_ref(batch),
result.duration_ms,
result.count,
));
}
Err(error) => {
return error;
}
}
}
actix_web::HttpResponse::Ok().json(D1MigrationExecutionResponse {
status: "applied".to_string(),
plan_id: Some(uuid::Uuid::new_v4().to_string()),
original_sql: conversion.original_sql,
converted_sql: conversion.converted_sql,
per_statement_results,
warnings: conversion.warnings,
errors: conversion.errors,
})
}
#[cfg(test)]
mod tests {
    use super::{gateway_sql_d1_migrate, normalize_sql_driver};
    use actix_web::{App, http::StatusCode, test, web};
    use serde_json::{Value, json};
    use crate::AppState;
    use crate::api::gateway::contracts::D1MigrationDialect;
    use crate::api::gateway::contracts::D1MigrationRequest;
    use crate::test_support::{ATHENA_TEST_ADMIN_KEY, AthAdminKeyGuard};

    /// Route exercised by every HTTP-level test in this module.
    const MIGRATE_URI: &str = "/gateway/sql/d1/migrate";

    /// Posts `payload` to the migrate route on a fresh app with default state, authenticated
    /// with the test admin key, and returns the response status with its parsed JSON body.
    async fn post_migrate(payload: Value) -> (StatusCode, Value) {
        let _admin = AthAdminKeyGuard::new();
        let state = web::Data::new(AppState::default());
        let app = test::init_service(
            App::new()
                .app_data(state.clone())
                .service(gateway_sql_d1_migrate),
        )
        .await;
        let response = test::call_service(
            &app,
            test::TestRequest::post()
                .uri(MIGRATE_URI)
                .insert_header(("x-athena-key", ATHENA_TEST_ADMIN_KEY))
                .set_json(payload)
                .to_request(),
        )
        .await;
        let status = response.status();
        let body: Value = test::read_body_json(response).await;
        (status, body)
    }

    /// Builds a typed migration request targeting `cloudflare-d1` with the PostgreSQL dialect.
    fn migration_request(schema_sql: &str, strict: bool, dry_run: bool) -> D1MigrationRequest {
        D1MigrationRequest {
            driver: "cloudflare-d1".to_string(),
            db_name: "test".to_string(),
            schema_sql: schema_sql.to_string(),
            dialect: D1MigrationDialect::PostgreSQL,
            dry_run,
            strict,
            batch_size: None,
            files: None,
            statements: None,
        }
    }

    /// Serializes a typed request into the JSON value accepted by `post_migrate`.
    fn to_payload(request: &D1MigrationRequest) -> Value {
        serde_json::to_value(request).expect("migration request serializes to JSON")
    }

    /// True when `value` is a JSON array with at least one element.
    fn is_non_empty_array(value: &Value) -> bool {
        value.as_array().is_some_and(|items| !items.is_empty())
    }

    /// True when `value` is a non-empty JSON string.
    fn is_non_empty_str(value: &Value) -> bool {
        !value.as_str().unwrap_or("").is_empty()
    }

    // NOTE(review): `use actix_web::test` presumably also brings actix's `test` attribute
    // macro into scope, which would explain why this synchronous check is declared through
    // `#[actix_web::test]` as an async fn. Confirm before switching it to plain `#[test]`.
    #[actix_web::test]
    async fn normalize_sql_driver_accepts_scylla_aliases() {
        let cases = [
            ("athena", Some("athena")),
            ("scylla", Some("athena")),
            ("scylladb", Some("athena")),
            ("postgres", Some("postgresql")),
            ("supabase", Some("supabase")),
            ("d1", Some("cloudflare-d1")),
            ("cloudflare-d1", Some("cloudflare-d1")),
            ("athena-d1", Some("cloudflare-d1")),
            ("mysql", None),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_sql_driver(input), expected, "driver {input:?}");
        }
    }

    #[actix_web::test]
    async fn migrate_route_preview_returns_original_and_converted_sql() {
        let request = migration_request(
            "CREATE TABLE users (\n id SERIAL PRIMARY KEY,\n created_at TIMESTAMP WITH TIME ZONE\n);",
            true,
            true,
        );
        let (status, payload) = post_migrate(to_payload(&request)).await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(payload["status"], "preview");
        assert_eq!(
            payload["originalSql"].as_str(),
            Some(request.schema_sql.as_str())
        );
        assert!(is_non_empty_str(&payload["convertedSql"]));
    }

    #[actix_web::test]
    async fn migrate_route_strict_mode_blocks_unsupported_statements() {
        let request = migration_request("CREATE EXTENSION IF NOT EXISTS pgcrypto;", true, true);
        let (status, payload) = post_migrate(to_payload(&request)).await;
        assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
        assert_eq!(payload["status"], "error");
        assert!(is_non_empty_array(&payload["errors"]));
    }

    #[actix_web::test]
    async fn migrate_route_exec_mode_without_d1_client_returns_executable_error() {
        let request = migration_request("CREATE TABLE users (id SERIAL PRIMARY KEY);", true, false);
        let (status, payload) = post_migrate(to_payload(&request)).await;
        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert!(
            payload["error"]
                .as_str()
                .unwrap_or("")
                .contains("X-Athena-Client")
        );
        assert_eq!(payload["status"], "error");
    }

    #[actix_web::test]
    async fn migrate_route_non_strict_mode_keeps_partial_plan() {
        let request = migration_request(
            "CREATE EXTENSION IF NOT EXISTS pgcrypto; CREATE TABLE users (id INT);",
            false,
            true,
        );
        let (status, payload) = post_migrate(to_payload(&request)).await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(payload["status"], "preview");
        assert!(is_non_empty_array(&payload["warnings"]));
        assert!(is_non_empty_array(&payload["statements"]));
    }

    #[actix_web::test]
    async fn migrate_route_rejects_invalid_driver() {
        let (status, payload) = post_migrate(json!({
            "driver": "mysql",
            "db_name": "test",
            "schemaSql": "CREATE TABLE users (id INT);",
            "dialect": "postgresql",
            "dryRun": true,
            "strict": true
        }))
        .await;
        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert_eq!(payload["status"], "error");
        assert!(
            payload["error"]
                .as_str()
                .unwrap_or("")
                .contains("driver must be")
        );
    }

    #[actix_web::test]
    async fn migrate_route_defaults_to_dry_run_strict_true_when_omitted() {
        let (status, payload) = post_migrate(json!({
            "driver": "cloudflare-d1",
            "db_name": "test",
            "schemaSql": "CREATE TABLE users (id INT);"
        }))
        .await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(payload["status"], "preview");
    }

    #[actix_web::test]
    async fn migrate_route_accepts_files_and_statements_payload() {
        let (status, payload) = post_migrate(json!({
            "driver": "cloudflare-d1",
            "db_name": "test",
            "schemaSql": "",
            "dialect": "postgresql",
            "dryRun": true,
            "strict": true,
            "files": ["CREATE TABLE users (id INT)"],
            "statements": ["CREATE TABLE orders (id INT)"]
        }))
        .await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(payload["status"], "preview");
        assert!(is_non_empty_str(&payload["convertedSql"]));
        assert_eq!(
            payload["sourceMeta"]["statementCount"]
                .as_u64()
                .unwrap_or(0),
            2
        );
    }

    #[actix_web::test]
    async fn migrate_route_accepts_athena_d1_alias() {
        let (status, payload) = post_migrate(json!({
            "driver": "athena-d1",
            "dbName": "test",
            "schemaSql": "CREATE TABLE users (id INT);",
            "dialect": "postgresql",
            "dryRun": true,
            "strict": true
        }))
        .await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(payload["status"], "preview");
        assert!(is_non_empty_str(&payload["convertedSql"]));
    }

    #[actix_web::test]
    async fn migrate_route_preview_returns_mapping_warning_codes() {
        let (status, payload) = post_migrate(json!({
            "driver": "athena-d1",
            "dbName": "test",
            "schemaSql": "CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
external_id UUID,
payload JSONB,
rate NUMERIC(12,4),
tags TEXT[],
mode ENUM,
created_at TIMESTAMP WITHOUT TIME ZONE
);
CREATE UNIQUE INDEX CONCURRENTLY users_external_id_idx ON users(external_id);
CREATE EXTENSION IF NOT EXISTS pgcrypto;",
            "dialect": "postgresql",
            "dryRun": true,
            "strict": false
        }))
        .await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(payload["status"], "preview");

        let codes: Vec<&str> = payload["warnings"]
            .as_array()
            .map(|warnings| {
                warnings
                    .iter()
                    .filter_map(|warning| warning["code"].as_str())
                    .collect()
            })
            .unwrap_or_default();
        for expected in [
            "type.serial",
            "type.uuid",
            "type.json",
            "type.numeric",
            "type.array",
            "type.custom",
        ] {
            assert!(
                codes.contains(&expected),
                "missing warning code {expected}; got {codes:?}"
            );
        }
    }
}