athena_rs 2.0.2

Database gateway API
Documentation
//! Provisioning API endpoints.
//!
//! Exposes the Athena schema provisioning pipeline as HTTP endpoints, allowing
//! callers to apply `sql/provision.sql` to a Postgres database and to check
//! whether a client's database already contains the expected schema tables.
//!
//! ## Endpoints
//!
//! - `POST /admin/provision`          – Provision a database with the Athena schema.
//! - `GET  /admin/provision/status`   – Check whether a client has the expected tables.

use actix_web::{
    HttpRequest, HttpResponse, get, post,
    web::{self, Data, Json, Query},
};
use serde::Deserialize;
use serde_json::json;
use sqlx::Row;
use sqlx::postgres::PgPoolOptions;

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{api_success, bad_request, internal_error, service_unavailable};
use crate::parser::resolve_postgres_uri;
use crate::provision_sql::PROVISION_SQL;

/// Tables that should exist after a successful provisioning run.
const EXPECTED_TABLES: &[&str] = &[
    "gateway_request_log",
    "gateway_operation_log",
    "api_keys",
    "api_key_rights",
    "api_key_right_grants",
    "api_key_config",
    "api_key_client_config",
    "api_key_auth_log",
    "athena_clients",
    "client_statistics",
    "client_table_statistics",
    "client_alert_queries",
    "query_history",
    "saved_queries",
    "ui_request_log",
];

// ── request / response types ─────────────────────────────────────────────────

#[derive(Debug, Deserialize)]
struct ProvisionRequest {
    /// Postgres connection URI to provision.
    /// Mutually exclusive with `client_name`.
    #[serde(default)]
    uri: Option<String>,

    /// Existing registered client whose Postgres URI will be resolved.
    /// Mutually exclusive with `uri`.
    #[serde(default)]
    client_name: Option<String>,

    /// When true and `uri` is provided, register the client in the runtime
    /// catalog after provisioning.
    #[serde(default)]
    register: bool,

    /// Logical name for the client when registering (defaults to
    /// `"provisioned"` if omitted).
    #[serde(default)]
    register_name: Option<String>,
}

#[derive(Debug, Deserialize)]
struct ProvisionStatusQuery {
    /// Logical client name to check.
    client_name: String,
}

// ── helpers ───────────────────────────────────────────────────────────────────

/// Resolve the Postgres URI from either a direct URI or a registered client.
fn resolve_uri(state: &AppState, req: &ProvisionRequest) -> Result<String, HttpResponse> {
    match (&req.uri, &req.client_name) {
        (Some(uri), None) => Ok(uri.clone()),
        (None, Some(client_name)) => {
            let registered = state
                .pg_registry
                .registered_client(client_name)
                .ok_or_else(|| {
                    bad_request(
                        "Unknown client",
                        format!("No Postgres client named '{}' is registered.", client_name),
                    )
                })?;

            registered
                .config_uri_template
                .as_deref()
                .map(resolve_postgres_uri)
                .or(registered.pg_uri)
                .ok_or_else(|| {
                    bad_request(
                        "Client URI unavailable",
                        format!("No Postgres URI is available for client '{}'.", client_name),
                    )
                })
        }
        (Some(_), Some(_)) => Err(bad_request(
            "Ambiguous target",
            "Provide either 'uri' or 'client_name', not both.",
        )),
        (None, None) => Err(bad_request(
            "Missing target",
            "Provide either 'uri' (direct Postgres URI) or 'client_name' (registered client).",
        )),
    }
}

/// Run the provisioning SQL against a Postgres URI.
///
/// Returns the number of statements executed.
async fn run_provision(pg_uri: &str) -> Result<usize, String> {
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(pg_uri)
        .await
        .map_err(|e| format!("Failed to connect to Postgres: {e}"))?;

    let statements: Vec<&str> = PROVISION_SQL
        .split(';')
        .map(str::trim)
        .filter(|s: &&str| !s.is_empty() && !s.starts_with("--"))
        .collect();

    let total = statements.len();
    for (i, statement) in statements.iter().enumerate() {
        sqlx::query(statement).execute(&pool).await.map_err(|e| {
            let preview_len = statement.len().min(120);
            let ellipsis = if statement.len() > 120 { "" } else { "" };
            format!(
                "Statement {}/{} failed: {}{}{}",
                i + 1,
                total,
                &statement[..preview_len],
                ellipsis,
                e
            )
        })?;
    }

    Ok(total)
}

// ── handlers ─────────────────────────────────────────────────────────────────

/// Provision a Postgres database with the full Athena schema.
///
/// Accepts either a direct `uri` or a registered `client_name`.  When
/// `register` is true and a direct `uri` was provided, the client is added
/// to the runtime catalog after provisioning.
///
/// # Request body
/// ```json
/// { "uri": "postgres://user:pass@host/db" }
/// ```
/// or
/// ```json
/// { "client_name": "athena_logging" }
/// ```
#[post("/admin/provision")]
pub async fn admin_provision(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<ProvisionRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let pg_uri = match resolve_uri(state.get_ref(), &body) {
        Ok(uri) => uri,
        Err(resp) => return resp,
    };

    let statement_count = match run_provision(&pg_uri).await {
        Ok(n) => n,
        Err(err) => return internal_error("Provisioning failed", err),
    };

    // Optionally register the client in the runtime catalog.
    let registered_name = if body.register && body.uri.is_some() {
        let name = body
            .register_name
            .clone()
            .unwrap_or_else(|| "provisioned".to_string());

        use crate::drivers::postgresql::sqlx_driver::ClientConnectionTarget;
        let target = ClientConnectionTarget {
            client_name: name.clone(),
            source: "database".to_string(),
            description: Some("Provisioned via API".to_string()),
            pg_uri: Some(pg_uri.clone()),
            pg_uri_env_var: None,
            config_uri_template: None,
            is_active: true,
            is_frozen: false,
        };

        if let Err(err) = state.pg_registry.upsert_client(target).await {
            tracing::warn!(client = %name, error = %err, "provisioned but failed to register client");
        }

        Some(name)
    } else {
        None
    };

    let client_label = body
        .client_name
        .clone()
        .or(registered_name.clone())
        .unwrap_or_else(|| "direct".to_string());

    api_success(
        format!("Provisioned {} statements", statement_count),
        json!({
            "statements_executed": statement_count,
            "client": client_label,
            "registered": registered_name,
            "tables": EXPECTED_TABLES,
        }),
    )
}

/// Check whether a client's database contains the expected Athena schema
/// tables.
///
/// # Query parameters
/// - `client_name` – the registered client to check.
///
/// # Response
/// ```json
/// {
///   "status": "success",
///   "message": "Provisioning status for 'athena_logging'",
///   "data": {
///     "client_name": "athena_logging",
///     "provisioned": true,
///     "present_tables": [...],
///     "missing_tables": []
///   }
/// }
/// ```
#[get("/admin/provision/status")]
pub async fn admin_provision_status(
    req: HttpRequest,
    state: Data<AppState>,
    query: Query<ProvisionStatusQuery>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let pool = state
        .pg_registry
        .get_pool(&query.client_name)
        .ok_or_else(|| {
            service_unavailable(
                "Client unavailable",
                format!("Postgres client '{}' is not connected.", query.client_name),
            )
        });

    let pool = match pool {
        Ok(p) => p,
        Err(resp) => return resp,
    };

    let rows = match sqlx::query(
        "SELECT table_name::text FROM information_schema.tables WHERE table_schema = 'public'",
    )
    .fetch_all(&pool)
    .await
    {
        Ok(rows) => rows,
        Err(err) => return internal_error("Failed to query tables", err.to_string()),
    };

    let existing_tables: Vec<String> = rows
        .iter()
        .filter_map(|row| row.try_get::<String, _>("table_name").ok())
        .collect();

    let present: Vec<&str> = EXPECTED_TABLES
        .iter()
        .filter(|t| existing_tables.iter().any(|e| e == **t))
        .copied()
        .collect();

    let missing: Vec<&str> = EXPECTED_TABLES
        .iter()
        .filter(|t| !existing_tables.iter().any(|e| e == **t))
        .copied()
        .collect();

    let provisioned = missing.is_empty();

    api_success(
        format!("Provisioning status for '{}'", query.client_name),
        json!({
            "client_name": query.client_name,
            "provisioned": provisioned,
            "present_tables": present,
            "missing_tables": missing,
        }),
    )
}

pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(admin_provision).service(admin_provision_status);
}