athena_rs 3.3.0

Database gateway API
Documentation
use actix_web::{HttpRequest, HttpResponse, Responder, get, web};
use serde::Serialize;
use serde_json::json;
use sqlx::postgres::PgRow;
use sqlx::{Pool, Postgres, Row};
use web::{Data, Query};

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::{pool_for_client, required_client_name};
use crate::api::response::{bad_request, processed_error};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::parser::query_builder::sanitize_identifier;

#[derive(Serialize)]
/// Payload returned by `/schema/clients`.
struct SchemaClients {
    clients: Vec<String>,
}

#[derive(Serialize)]
/// Lightweight table metadata (schema + table name) returned by `/schema/tables`.
pub struct SchemaTable {
    table_schema: String,
    table_name: String,
}

#[derive(Serialize)]
/// Column metadata returned by `/schema/columns`.
struct SchemaColumn {
    column_name: String,
    data_type: Option<String>,
    column_default: Option<String>,
    is_nullable: Option<String>,
}

#[derive(Serialize)]
/// Migration record returned by `/schema/migrations` (Supabase-style schema_migrations).
struct SchemaMigration {
    version: Option<String>,
    name: Option<String>,
}

#[get("/schema/clients")]
/// Returns an array of configured Postgres clients for selection by callers.
///
/// Protected by ATHENA_KEY_12. Use `?api_key=`, `Authorization: Bearer <key>`, or
/// `X-Athena-Key: <key>`.
async fn schema_clients(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    if let Err(resp) = validate_athena_key_12(&req) {
        return resp;
    }
    let clients: Vec<String> = app_state.pg_registry.list_clients();
    HttpResponse::Ok().json(SchemaClients { clients })
}

/// Extracts and validates ATHENA_KEY_12 for protected endpoints.
fn validate_athena_key_12(req: &HttpRequest) -> Result<(), HttpResponse> {
    authorize_static_admin_key(req)
}

#[get("/clients")]
/// Returns an array of configured Athena/Postgres clients.
///
/// Protected by ATHENA_KEY_12. Use `?api_key=`, `Authorization: Bearer <key>`, or
/// `X-Athena-Key: <key>`.
async fn list_clients_protected(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    if let Err(resp) = validate_athena_key_12(&req) {
        return resp;
    }
    let clients: Vec<String> = app_state.pg_registry.list_clients();
    HttpResponse::Ok().json(SchemaClients { clients })
}

#[get("/schema/tables")]
/// Lists all tables visible to the supplied `X-Athena-Client`.
///
/// Requires `X-Athena-Client` to look up the appropriate Postgres pool before querying.
///
/// The handler queries `information_schema.tables`, excludes Postgres system schemas,
/// and returns `{ "tables": [ { table_schema, table_name }, ... ] }`.
async fn schema_tables(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    let client_name: String = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };

    let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    let rows: Vec<PgRow> = match sqlx::query(
        r#"
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE table_type = 'BASE TABLE'
          AND table_schema NOT IN ('information_schema', 'pg_catalog')
        ORDER BY table_schema, table_name
        "#,
    )
    .fetch_all(&pool)
    .await
    {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": format!("Failed to fetch tables: {}", err)
            }));
        }
    };

    let tables: Vec<SchemaTable> = rows
        .into_iter()
        .map(|row| SchemaTable {
            table_schema: row.get("table_schema"),
            table_name: row.get("table_name"),
        })
        .collect();

    HttpResponse::Ok().json(json!({ "tables": tables }))
}

#[derive(serde::Deserialize)]
/// Query parameters accepted by `/schema/columns`.
struct ColumnQuery {
    table_name: String,
    /// Optional. When provided, columns are restricted to this schema (avoids ambiguity when multiple schemas have a table with the same name).
    #[serde(default)]
    table_schema: Option<String>,
}

#[get("/schema/columns")]
/// Returns column metadata for `table_name` using `X-Athena-Client`.
///
/// Requires `X-Athena-Client` header. Optional `table_schema` query parameter
/// restricts results to that schema; otherwise all tables named `table_name`
/// in any schema are considered (first match by ordinal).
///
/// Validates `table_name` (and `table_schema` when provided), fetches from
/// `information_schema.columns`, and returns `{ "columns": [ ... ] }` with
/// `column_name`, `data_type`, `column_default`, `is_nullable`.
async fn schema_columns(
    req: HttpRequest,
    app_state: Data<AppState>,
    query: Query<ColumnQuery>,
) -> impl Responder {
    let table_name_trimmed = query.table_name.trim();
    if table_name_trimmed.is_empty() {
        return bad_request("Invalid request", "table_name is required");
    }
    if sanitize_identifier(table_name_trimmed).is_none() {
        return bad_request("Invalid request", "Invalid table_name parameter");
    }
    if let Some(ref schema) = query.table_schema {
        let s = schema.trim();
        if s.is_empty() || sanitize_identifier(s).is_none() {
            return bad_request("Invalid request", "Invalid table_schema parameter");
        }
    }

    let client_name: String = match required_client_name(&req) {
        Ok(value) => value,
        Err(_) => {
            return bad_request(
                "X-Athena-Client header is required",
                "Missing required header",
            );
        }
    };

    let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    let rows: Vec<PgRow> = match query.table_schema.as_ref() {
        Some(schema) => {
            match sqlx::query(
                r#"
                SELECT column_name, data_type, column_default, is_nullable
                FROM information_schema.columns
                WHERE table_name = $1 AND table_schema = $2
                ORDER BY ordinal_position
                "#,
            )
            .bind(table_name_trimmed)
            .bind(schema.trim())
            .fetch_all(&pool)
            .await
            {
                Ok(rows) => rows,
                Err(err) => {
                    return HttpResponse::InternalServerError().json(json!({
                        "error": format!("Failed to fetch columns: {}", err)
                    }));
                }
            }
        }
        None => {
            match sqlx::query(
                r#"
                SELECT column_name, data_type, column_default, is_nullable
                FROM information_schema.columns
                WHERE table_name = $1
                ORDER BY ordinal_position
                "#,
            )
            .bind(table_name_trimmed)
            .fetch_all(&pool)
            .await
            {
                Ok(rows) => rows,
                Err(err) => {
                    return HttpResponse::InternalServerError().json(json!({
                        "error": format!("Failed to fetch columns: {}", err)
                    }));
                }
            }
        }
    };

    let columns: Vec<SchemaColumn> = rows
        .into_iter()
        .map(|row| SchemaColumn {
            column_name: row.get("column_name"),
            data_type: row.get("data_type"),
            column_default: row.get("column_default"),
            is_nullable: row.get("is_nullable"),
        })
        .collect();

    HttpResponse::Ok().json(json!({ "columns": columns }))
}

#[get("/schema/migrations")]
/// Lists migration records from `supabase_migrations.schema_migrations` when available.
///
/// Requires `X-Athena-Client`. If the migrations table does not exist (e.g. non-Supabase
/// or fresh database), returns `{ "migrations": [], "message": "..." }` as a graceful
/// fallback instead of an error.
async fn schema_migrations(req: HttpRequest, app_state: Data<AppState>) -> impl Responder {
    let client_name: String = match required_client_name(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };

    let pool: Pool<Postgres> = match pool_for_client(app_state.get_ref(), &client_name) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    let result = sqlx::query(
        r#"
        SELECT version, name
        FROM supabase_migrations.schema_migrations
        ORDER BY version
        "#,
    )
    .fetch_all(&pool)
    .await;

    match result {
        Ok(rows) => {
            let migrations: Vec<SchemaMigration> = rows
                .into_iter()
                .map(|row: PgRow| SchemaMigration {
                    version: row.get("version"),
                    name: row.get("name"),
                })
                .collect();
            HttpResponse::Ok().json(json!({ "migrations": migrations }))
        }
        Err(err) => {
            // Graceful fallback: when the migrations table/schema does not exist, return empty
            let code: Option<String> = err
                .as_database_error()
                .and_then(|db_err| db_err.code().map(|c| c.to_string()));
            if matches!(code.as_deref(), Some("42P01") | Some("3F000")) {
                return HttpResponse::Ok().json(json!({
                    "migrations": [],
                    "message": "Migration tracking is not available for this database."
                }));
            }
            let processed = process_sqlx_error_with_context(&err, Some("schema_migrations"));
            processed_error(processed)
        }
    }
}

/// Registers the schema endpoints with the Actix router.
pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(schema_clients)
        .service(list_clients_protected)
        .service(schema_tables)
        .service(schema_columns)
        .service(schema_migrations);
}