athena_rs 0.75.4

WIP Database API gateway
Documentation
use actix_web::{HttpRequest, HttpResponse, Responder, get, web};
use serde::Serialize;
use serde_json::json;
use sqlx::Row;

use crate::AppState;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use crate::parser::query_builder::sanitize_identifier;

/// JSON response body for `GET /schema/clients`.
#[derive(Serialize)]
struct SchemaClients {
    /// Client names as returned by the Postgres registry's `list_clients()`.
    clients: Vec<String>,
}

/// One table entry in the `GET /schema/tables` response.
///
/// Both fields are read from the identically named columns of
/// `information_schema.tables`.
#[derive(Serialize)]
pub struct SchemaTable {
    /// Schema that contains the table.
    table_schema: String,
    /// Name of the table.
    table_name: String,
}

/// One column entry in the `GET /schema/columns` response.
///
/// Every field is read from the identically named column of
/// `information_schema.columns`; the optional fields are `None` when the
/// database reports SQL `NULL` for them.
#[derive(Serialize)]
struct SchemaColumn {
    /// Name of the column.
    column_name: String,
    /// Data type as reported by `information_schema.columns.data_type`.
    data_type: Option<String>,
    /// Default expression of the column, if any.
    column_default: Option<String>,
    /// Nullability flag, passed through as the string the database reports
    /// (presumably `"YES"` / `"NO"` — confirm against the Postgres docs).
    is_nullable: Option<String>,
}

/// Reads the target client name from the `x-athena-client` request header.
///
/// # Errors
///
/// Returns a ready-to-send `400 Bad Request` JSON response when the header is
/// absent, cannot be read as a string, or is empty.
fn client_header(req: &HttpRequest) -> Result<String, HttpResponse> {
    let header_text = req
        .headers()
        .get("x-athena-client")
        .and_then(|raw| raw.to_str().ok());

    match header_text {
        Some(name) if !name.is_empty() => Ok(name.to_string()),
        _ => Err(HttpResponse::BadRequest().json(json!({
            "error": "Missing X-Athena-Client header"
        }))),
    }
}

/// Looks up the connection pool registered under `client_name`.
///
/// # Errors
///
/// Returns a ready-to-send `400 Bad Request` JSON response naming the client
/// when the registry has no pool for it.
fn pool_for_client(
    client_name: &str,
    registry: &PostgresClientRegistry,
) -> Result<sqlx::postgres::PgPool, HttpResponse> {
    match registry.get_pool(client_name) {
        Some(pool) => Ok(pool),
        None => Err(HttpResponse::BadRequest().json(json!({
            "error": format!("Unknown Postgres client '{}'", client_name)
        }))),
    }
}

/// Responds with the names of all clients known to the Postgres registry,
/// wrapped as `{"clients": [...]}`.
#[get("/schema/clients")]
async fn schema_clients(app_state: web::Data<AppState>) -> impl Responder {
    let body = SchemaClients {
        clients: app_state.pg_registry.list_clients(),
    };
    HttpResponse::Ok().json(body)
}

#[get("/schema/tables")]
async fn schema_tables(req: HttpRequest, app_state: web::Data<AppState>) -> impl Responder {
    let client_name = match client_header(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };

    let pool = match pool_for_client(&client_name, &app_state.pg_registry) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    let rows = match sqlx::query(
        r#"
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE table_type = 'BASE TABLE'
          AND table_schema NOT IN ('information_schema', 'pg_catalog')
        ORDER BY table_schema, table_name
        "#,
    )
    .fetch_all(&pool)
    .await
    {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": format!("Failed to fetch tables: {}", err)
            }));
        }
    };

    let tables: Vec<SchemaTable> = rows
        .into_iter()
        .map(|row| SchemaTable {
            table_schema: row.get("table_schema"),
            table_name: row.get("table_name"),
        })
        .collect();

    HttpResponse::Ok().json(json!({ "tables": tables }))
}

/// Query-string parameters accepted by `GET /schema/columns`.
#[derive(serde::Deserialize)]
struct ColumnQuery {
    /// Name of the table whose columns are requested.
    table_name: String,
}

#[get("/schema/columns")]
async fn schema_columns(
    req: HttpRequest,
    app_state: web::Data<AppState>,
    query: web::Query<ColumnQuery>,
) -> impl Responder {
    let client_name = match client_header(&req) {
        Ok(value) => value,
        Err(resp) => return resp,
    };

    let pool = match pool_for_client(&client_name, &app_state.pg_registry) {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    let table_identifier = match sanitize_identifier(&query.table_name) {
        Some(value) => value,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Invalid table_name parameter"
            }));
        }
    };

    let rows = match sqlx::query(
        r#"
        SELECT column_name, data_type, column_default, is_nullable
        FROM information_schema.columns
        WHERE table_name = $1
        ORDER BY ordinal_position
        "#,
    )
    .bind(table_identifier)
    .fetch_all(&pool)
    .await
    {
        Ok(rows) => rows,
        Err(err) => {
            return HttpResponse::InternalServerError().json(json!({
                "error": format!("Failed to fetch columns: {}", err)
            }));
        }
    };

    let columns: Vec<SchemaColumn> = rows
        .into_iter()
        .map(|row| SchemaColumn {
            column_name: row.get("column_name"),
            data_type: row.get("data_type"),
            column_default: row.get("column_default"),
            is_nullable: row.get("is_nullable"),
        })
        .collect();

    HttpResponse::Ok().json(json!({ "columns": columns }))
}

/// Registers the schema-introspection routes (`/schema/clients`,
/// `/schema/tables`, `/schema/columns`) on the given service configuration.
pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(schema_clients);
    cfg.service(schema_tables);
    cfg.service(schema_columns);
}