use actix_web::{HttpRequest, HttpResponse, Responder, get, web};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::postgres::PgRow;
use sqlx::{Pool, Postgres, Row};
use web::{Data, Query};
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::{pool_for_client, required_client_name};
use crate::api::response::{bad_request, internal_error, processed_error};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::parser::query_builder::sanitize_identifier;
/// JSON response body shared by the client-listing endpoints
/// (`/schema/clients` and `/clients`).
#[derive(Serialize)]
struct SchemaClients {
/// Client names as returned by `app_state.pg_registry.list_clients()`.
clients: Vec<String>,
}
/// One entry of the `/schema/tables` response: a base table identified by
/// its schema and name, both read from `information_schema.tables`.
#[derive(Serialize)]
pub struct SchemaTable {
/// Value of the `table_schema` column for this table.
table_schema: String,
/// Value of the `table_name` column for this table.
table_name: String,
}
/// A table together with its columns, as emitted inside [`SchemaOverview`]
/// by the `/schema` endpoint.
#[derive(Serialize)]
struct SchemaTableWithColumns {
/// Name of the table.
table_name: String,
/// Columns collected for the table, in the order the rows were returned by
/// the query; empty when the join produced no column rows for it.
columns: Vec<SchemaColumn>,
}
/// Description of a single column, populated from the same-named columns of
/// `information_schema.columns`.
#[derive(Serialize)]
struct SchemaColumn {
/// Name of the column.
column_name: String,
/// `data_type` as reported by `information_schema.columns`, if any.
data_type: Option<String>,
/// `column_default` as reported by `information_schema.columns`, if any.
column_default: Option<String>,
/// `is_nullable` as reported by `information_schema.columns`; kept as the
/// raw string rather than converted to a boolean.
is_nullable: Option<String>,
}
/// One constraint entry of the `/schema/constraints` response.
#[derive(Serialize)]
struct SchemaConstraint {
/// Name of the constraint.
constraint_name: String,
/// Names of the columns that belong to the constraint, in the order the
/// rows were returned by the query.
columns: Vec<String>,
}
/// JSON response body of the `/schema` endpoint.
#[derive(Serialize)]
struct SchemaOverview {
/// The (trimmed) schema name the overview was generated for.
schema_name: String,
/// Base tables found in that schema, each with its columns.
tables: Vec<SchemaTableWithColumns>,
}
/// Query-string parameters accepted by the `/schema` endpoint.
#[derive(Deserialize)]
struct SchemaQuery {
/// Schema to inspect; filled in by `default_schema_name` when the
/// parameter is absent from the request.
#[serde(default = "default_schema_name")]
schema_name: String,
}
/// Serde default for `SchemaQuery::schema_name`: the `public` schema.
fn default_schema_name() -> String {
    String::from("public")
}
/// One row of `supabase_migrations.schema_migrations`, as returned by the
/// `/schema/migrations` endpoint.
#[derive(Serialize)]
struct SchemaMigration {
/// Value of the `version` column, if present.
version: Option<String>,
/// Value of the `name` column, if present.
name: Option<String>,
}
/// `GET /schema/clients` — returns the names of all clients known to the
/// Postgres registry.
///
/// The request must first pass `validate_athena_key_12`; when it does not,
/// the rejection response produced by that check is returned unchanged.
#[get("/schema/clients")]
async fn schema_clients(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    match validate_athena_key_12(&req) {
        Err(rejection) => rejection,
        Ok(()) => HttpResponse::Ok().json(SchemaClients {
            clients: app_state.pg_registry.list_clients(),
        }),
    }
}
/// Authorizes `req` by delegating to `authorize_static_admin_key`.
///
/// Returns `Ok(())` when the request is accepted, otherwise the
/// `HttpResponse` that the caller should send back as-is.
fn validate_athena_key_12(req: &HttpRequest) -> Result<(), HttpResponse> {
authorize_static_admin_key(req)
}
/// `GET /clients` — same payload as `/schema/clients`: the names of all
/// clients known to the Postgres registry.
///
/// The request must first pass `validate_athena_key_12`; when it does not,
/// the rejection response produced by that check is returned unchanged.
#[get("/clients")]
async fn list_clients_protected(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    match validate_athena_key_12(&req) {
        Err(rejection) => rejection,
        Ok(()) => HttpResponse::Ok().json(SchemaClients {
            clients: app_state.pg_registry.list_clients(),
        }),
    }
}
/// `GET /schema` — lists every base table of one schema together with its
/// columns.
///
/// The `schema_name` query parameter is trimmed and must be non-empty and
/// accepted by `sanitize_identifier`; otherwise a bad-request response is
/// returned. The client name and its connection pool are then resolved from
/// the request; if either step fails, the response produced by the helper is
/// returned as-is. Database errors are converted through
/// `process_sqlx_error_with_context` / `processed_error`.
///
/// Tables for which the join yields no column rows still appear in the
/// result, with an empty `columns` list.
#[get("/schema")]
async fn schema_overview(
    req: HttpRequest,
    app_state: Data<AppState>,
    query: Query<SchemaQuery>,
) -> impl Responder {
    const OVERVIEW_SQL: &str = r#"
SELECT
t.table_name,
c.column_name,
c.data_type,
c.column_default,
c.is_nullable
FROM information_schema.tables AS t
LEFT JOIN information_schema.columns AS c
ON c.table_schema = t.table_schema
AND c.table_name = t.table_name
WHERE t.table_type = 'BASE TABLE'
AND t.table_schema = $1
ORDER BY t.table_name, c.ordinal_position
"#;

    let schema: &str = query.schema_name.trim();
    let schema_is_valid = !schema.is_empty() && sanitize_identifier(schema).is_some();
    if !schema_is_valid {
        return bad_request("Invalid request", "Invalid schema_name parameter");
    }

    let client_name: String = match required_client_name(&req) {
        Err(resp) => return resp,
        Ok(name) => name,
    };
    let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
        Err(resp) => return resp,
        Ok(handle) => handle,
    };

    let fetched = sqlx::query(OVERVIEW_SQL).bind(schema).fetch_all(&pool).await;
    let records: Vec<PgRow> = match fetched {
        Ok(records) => records,
        Err(err) => {
            return processed_error(process_sqlx_error_with_context(&err, Some(schema)));
        }
    };

    // Rows arrive sorted by table name, so a change of name relative to the
    // last collected entry marks the start of the next table.
    let mut grouped: Vec<SchemaTableWithColumns> = Vec::new();
    for record in records {
        let name: String = record.get("table_name");
        let starts_new_table = match grouped.last() {
            Some(previous) => previous.table_name != name,
            None => true,
        };
        if starts_new_table {
            grouped.push(SchemaTableWithColumns {
                table_name: name,
                columns: Vec::new(),
            });
        }

        // A missing column name means the LEFT JOIN found no column for this
        // table; the table entry is kept but nothing is appended to it.
        let maybe_column: Option<String> = record.get("column_name");
        let (Some(column_name), Some(target)) = (maybe_column, grouped.last_mut()) else {
            continue;
        };
        target.columns.push(SchemaColumn {
            column_name,
            data_type: record.get("data_type"),
            column_default: record.get("column_default"),
            is_nullable: record.get("is_nullable"),
        });
    }

    HttpResponse::Ok().json(SchemaOverview {
        schema_name: schema.to_string(),
        tables: grouped,
    })
}
/// `GET /schema/tables` — lists every base table visible to the requesting
/// client, excluding the `information_schema` and `pg_catalog` schemas.
///
/// The client name and its connection pool are resolved from the request; if
/// either step fails, the response produced by the helper is returned as-is.
/// A query failure is reported through `internal_error`. On success the body
/// is `{"tables": [...]}`, ordered by schema and then table name.
#[get("/schema/tables")]
async fn schema_tables(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    const TABLES_SQL: &str = r#"
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER BY table_schema, table_name
"#;

    let client_name: String = match required_client_name(&req) {
        Err(resp) => return resp,
        Ok(name) => name,
    };
    let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
        Err(resp) => return resp,
        Ok(handle) => handle,
    };

    let fetched = sqlx::query(TABLES_SQL).fetch_all(&pool).await;
    let records: Vec<PgRow> = match fetched {
        Ok(records) => records,
        Err(err) => {
            return internal_error(
                "Failed to fetch tables",
                format!("Failed to fetch tables: {}", err),
            );
        }
    };

    let mut tables: Vec<SchemaTable> = Vec::with_capacity(records.len());
    for record in records {
        tables.push(SchemaTable {
            table_schema: record.get("table_schema"),
            table_name: record.get("table_name"),
        });
    }

    HttpResponse::Ok().json(json!({ "tables": tables }))
}
/// Query-string parameters accepted by the `/schema/columns` endpoint.
#[derive(serde::Deserialize)]
struct ColumnQuery {
/// Table whose columns are requested (required).
table_name: String,
/// Optional schema filter; when omitted the handler's lookup is not
/// restricted to any particular schema.
#[serde(default)]
table_schema: Option<String>,
}
/// Query-string parameters accepted by the `/schema/constraints` endpoint.
#[derive(serde::Deserialize)]
struct ConstraintQuery {
/// Table whose constraints are requested (required).
table_name: String,
/// Optional schema; the handler falls back to `public` when this is
/// missing or blank after trimming.
#[serde(default)]
table_schema: Option<String>,
}
#[get("/schema/columns")]
async fn schema_columns(
req: HttpRequest,
app_state: Data<AppState>,
query: Query<ColumnQuery>,
) -> impl Responder {
let table_name_trimmed = query.table_name.trim();
if table_name_trimmed.is_empty() {
return bad_request("Invalid request", "table_name is required");
}
if sanitize_identifier(table_name_trimmed).is_none() {
return bad_request("Invalid request", "Invalid table_name parameter");
}
if let Some(ref schema) = query.table_schema {
let s = schema.trim();
if s.is_empty() || sanitize_identifier(s).is_none() {
return bad_request("Invalid request", "Invalid table_schema parameter");
}
}
let client_name: String = match required_client_name(&req) {
Ok(value) => value,
Err(_) => {
return bad_request(
"X-Athena-Client header is required",
"Missing required header",
);
}
};
let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let rows: Vec<PgRow> = match query.table_schema.as_ref() {
Some(schema) => {
match sqlx::query(
r#"
SELECT column_name, data_type, column_default, is_nullable
FROM information_schema.columns
WHERE table_name = $1 AND table_schema = $2
ORDER BY ordinal_position
"#,
)
.bind(table_name_trimmed)
.bind(schema.trim())
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(err) => {
return internal_error(
"Failed to fetch columns",
format!("Failed to fetch columns: {}", err),
);
}
}
}
None => {
match sqlx::query(
r#"
SELECT column_name, data_type, column_default, is_nullable
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position
"#,
)
.bind(table_name_trimmed)
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(err) => {
return internal_error(
"Failed to fetch columns",
format!("Failed to fetch columns: {}", err),
);
}
}
}
};
let columns: Vec<SchemaColumn> = rows
.into_iter()
.map(|row| SchemaColumn {
column_name: row.get("column_name"),
data_type: row.get("data_type"),
column_default: row.get("column_default"),
is_nullable: row.get("is_nullable"),
})
.collect();
HttpResponse::Ok().json(json!({ "columns": columns }))
}
/// `GET /schema/constraints` — lists the `UNIQUE` constraints of one table
/// together with the columns each constraint covers.
///
/// Query parameters:
/// - `table_name` (required): trimmed; must be non-empty and accepted by
///   `sanitize_identifier`.
/// - `table_schema` (optional): trimmed; a missing or blank value falls back
///   to `public`. The resulting name must also pass `sanitize_identifier`.
///
/// The client name and its connection pool are resolved from the request; if
/// either step fails, the response produced by the helper is returned as-is.
/// A query failure is reported through `internal_error`. On success the body
/// is `{"constraints": [...]}`.
#[get("/schema/constraints")]
async fn schema_constraints(
    req: HttpRequest,
    app_state: Data<AppState>,
    query: Query<ConstraintQuery>,
) -> impl Responder {
    const UNIQUE_CONSTRAINTS_SQL: &str = r#"
SELECT tc.constraint_name, kcu.column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
AND tc.table_name = kcu.table_name
WHERE tc.constraint_type = 'UNIQUE'
AND tc.table_name = $1
AND tc.table_schema = $2
ORDER BY tc.constraint_name, kcu.ordinal_position
"#;

    let table: &str = query.table_name.trim();
    if table.is_empty() {
        return bad_request("Invalid request", "table_name is required");
    }
    if sanitize_identifier(table).is_none() {
        return bad_request("Invalid request", "Invalid table_name parameter");
    }

    let schema: String = match query.table_schema.as_deref().map(str::trim) {
        Some(given) if !given.is_empty() => given.to_string(),
        _ => "public".to_string(),
    };
    if sanitize_identifier(&schema).is_none() {
        return bad_request("Invalid request", "Invalid table_schema parameter");
    }

    let client_name: String = match required_client_name(&req) {
        Err(resp) => return resp,
        Ok(name) => name,
    };
    let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
        Err(resp) => return resp,
        Ok(handle) => handle,
    };

    let fetched = sqlx::query(UNIQUE_CONSTRAINTS_SQL)
        .bind(table)
        .bind(&schema)
        .fetch_all(&pool)
        .await;
    let records: Vec<PgRow> = match fetched {
        Ok(records) => records,
        Err(err) => {
            return internal_error(
                "Failed to fetch constraints",
                format!("Failed to fetch constraints: {}", err),
            );
        }
    };

    // Rows arrive sorted by constraint name, so consecutive rows with the
    // same name belong to the entry that was pushed last.
    let mut grouped: Vec<SchemaConstraint> = Vec::new();
    for record in records {
        let name: String = record.get("constraint_name");
        let column: String = record.get("column_name");

        let continues_last = grouped
            .last()
            .is_some_and(|open| open.constraint_name == name);
        if !continues_last {
            grouped.push(SchemaConstraint {
                constraint_name: name,
                columns: Vec::new(),
            });
        }
        if let Some(open) = grouped.last_mut() {
            open.columns.push(column);
        }
    }

    HttpResponse::Ok().json(json!({ "constraints": grouped }))
}
#[get("/schema/migrations")]
async fn schema_migrations(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
let client_name: String = match required_client_name(&req) {
Ok(value) => value,
Err(resp) => return resp,
};
let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let result: Result<Vec<PgRow>, sqlx::Error> = sqlx::query(
r#"
SELECT version, name
FROM supabase_migrations.schema_migrations
ORDER BY version
"#,
)
.fetch_all(&pool)
.await;
match result {
Ok(rows) => {
let migrations: Vec<SchemaMigration> = rows
.into_iter()
.map(|row: PgRow| SchemaMigration {
version: row.get("version"),
name: row.get("name"),
})
.collect();
HttpResponse::Ok().json(json!({ "migrations": migrations }))
}
Err(err) => {
let code: Option<String> = err
.as_database_error()
.and_then(|db_err| db_err.code().map(|c| c.to_string()));
if matches!(code.as_deref(), Some("42P01") | Some("3F000")) {
return HttpResponse::Ok().json(json!({
"migrations": [],
"message": "Migration tracking is not available for this database."
}));
}
let processed = process_sqlx_error_with_context(&err, Some("schema_migrations"));
processed_error(processed)
}
}
}
/// Registers every schema endpoint defined in this module on `cfg`, in the
/// same order in which the handlers are declared.
pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(schema_clients);
    cfg.service(list_clients_protected);
    cfg.service(schema_overview);
    cfg.service(schema_tables);
    cfg.service(schema_columns);
    cfg.service(schema_constraints);
    cfg.service(schema_migrations);
}