athena_rs 3.26.1

Hyper-performant polyglot database driver
Documentation
//! Client and pool resolution for API handlers.
//!
//! Use [`required_client_name`] + [`pool_for_client`] or the combined
//! [`client_name_and_pool`] when a handler needs both the client name and a Postgres pool.

use actix_web::{HttpRequest, HttpResponse};
use sqlx::postgres::PgPool;

use crate::AppState;
use crate::api::headers::request_context::resolved_athena_client;
use crate::api::response::{bad_request, postgres_client_not_configured, service_unavailable};
use crate::athena::postgres_clients::{
    catalog_client_has_database_connection, ensure_catalog_database_client_loaded,
};
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;

/// Canonical request header name used to select a logical Athena client.
///
/// [`required_client_name`] includes this name in the error it returns when no
/// client could be determined for a request.
pub const ATHENA_CLIENT_HEADER: &str = "X-Athena-Client";

/// Returns the logical Athena client name for `req`.
///
/// The value produced by [`resolved_athena_client`] is preferred. When it is absent or
/// blank, the [`ATHENA_CLIENT_HEADER`] request header is consulted instead. Candidates
/// are trimmed, and a candidate that is empty after trimming is treated as missing.
///
/// # Errors
///
/// Returns a `bad_request` response ("Missing required header") when neither source
/// yields a non-empty client name, or when the header value is not valid visible ASCII.
pub fn required_client_name(req: &HttpRequest) -> Result<String, HttpResponse> {
    if let Some(client_name) = resolved_athena_client(req)
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
    {
        return Ok(client_name);
    }

    // Look the header up through the shared constant so the lookup key and the header
    // name reported in the error message below cannot drift apart. String-keyed header
    // lookups normalise case, so the mixed-case canonical spelling matches as before.
    req.headers()
        .get(ATHENA_CLIENT_HEADER)
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(ToString::to_string)
        .ok_or_else(|| {
            bad_request(
                "Missing required header",
                format!(
                    "{} header or tenant wildcard hostname is required",
                    ATHENA_CLIENT_HEADER
                ),
            )
        })
}

/// Resolves the Postgres pool for `client_name`, or the HTTP error to send instead.
///
/// Resolution order:
/// 1. A pool already present in `state.pg_registry`.
/// 2. A pool that appears in the registry after
///    [`ensure_catalog_database_client_loaded`] reports that it loaded the client.
///
/// # Errors
///
/// - A `service_unavailable` response ("Client catalog unavailable") when the catalog
///   load itself fails.
/// - The response built by `unavailable_registered_client_response` when the client is
///   registered but no pool could be obtained.
/// - The `postgres_client_not_configured` response when the client is not registered.
pub async fn pool_for_client(state: &AppState, client_name: &str) -> Result<PgPool, HttpResponse> {
    // Fast path: the pool is already registered.
    if let Some(ready) = state.pg_registry.get_pool(client_name) {
        return Ok(ready);
    }

    // Ask the catalog to load the client; a failure here ends the request.
    let loaded = ensure_catalog_database_client_loaded(state, client_name)
        .await
        .map_err(|err| {
            service_unavailable(
                "Client catalog unavailable",
                format!("Failed to resolve catalog-backed Postgres client: {}", err),
            )
        })?;

    // Only re-check the registry when the catalog actually loaded something.
    let pool_after_load = loaded.and_then(|_| state.pg_registry.get_pool(client_name));
    if let Some(fresh) = pool_after_load {
        return Ok(fresh);
    }

    // No pool: distinguish "known but unusable" from "never configured".
    let registered = state.pg_registry.registered_client(client_name);
    match registered {
        Some(registered_client) => Err(unavailable_registered_client_response(
            client_name,
            &registered_client,
        )),
        None => Err(postgres_client_not_configured(client_name)),
    }
}

/// Resolves a Postgres pool when `client_name` is Postgres-backed, and `None` otherwise.
///
/// Intended for mixed-backend routes that may legitimately fall back to Supabase when
/// no Postgres client exists. `client_name` is trimmed first; a blank name yields
/// `Ok(None)`. A client counts as Postgres-backed when it is registered in
/// `state.pg_registry` or when [`catalog_client_has_database_connection`] reports `true`.
///
/// # Errors
///
/// - Any error response from [`pool_for_client`] for a Postgres-backed client
///   (for example inactive, frozen, or temporarily unavailable).
/// - A `service_unavailable` response ("Client catalog unavailable") when the catalog
///   check fails.
pub async fn optional_pool_for_client(
    state: &AppState,
    client_name: &str,
) -> Result<Option<PgPool>, HttpResponse> {
    let name = client_name.trim();
    if name.is_empty() {
        return Ok(None);
    }

    // Registered clients skip the catalog probe entirely.
    let postgres_backed = if state.pg_registry.registered_client(name).is_some() {
        true
    } else {
        catalog_client_has_database_connection(state, name)
            .await
            .map_err(|err| {
                service_unavailable(
                    "Client catalog unavailable",
                    format!("Failed to resolve catalog-backed Postgres client: {}", err),
                )
            })?
    };

    if !postgres_backed {
        return Ok(None);
    }

    let pool = pool_for_client(state, name).await?;
    Ok(Some(pool))
}

/// Resolves the client name and its Postgres pool together (e.g. for gateway handlers).
///
/// Runs [`required_client_name`] on `req`, then [`pool_for_client`] with the result.
///
/// # Errors
///
/// Returns whichever error response the first failing step produced.
pub async fn client_name_and_pool(
    req: &HttpRequest,
    state: &AppState,
) -> Result<(String, PgPool), HttpResponse> {
    let client_name = required_client_name(req)?;
    let lookup = pool_for_client(state, &client_name).await;
    lookup.map(|pool| (client_name, pool))
}

/// Resolves the required Postgres pool selected by `X-Athena-Client`.
///
/// A convenience wrapper over [`client_name_and_pool`] for routes that need only the
/// pool; the resolved client name is discarded.
///
/// # Errors
///
/// Returns the error response from [`client_name_and_pool`] unchanged.
pub async fn required_client_pool(
    req: &HttpRequest,
    state: &AppState,
) -> Result<PgPool, HttpResponse> {
    client_name_and_pool(req, state)
        .await
        .map(|(_name, pool)| pool)
}

/// Looks up the registered-client metadata for `client_name`.
///
/// Useful for handlers that need client flags (`is_active`, `is_frozen`, provider
/// metadata) in addition to the live pool.
///
/// # Errors
///
/// Returns a `bad_request` response ("Unknown client") when the registry has no entry
/// for `client_name`.
pub fn registered_client_for(
    state: &AppState,
    client_name: &str,
) -> Result<RegisteredClient, HttpResponse> {
    match state.pg_registry.registered_client(client_name) {
        Some(client) => Ok(client),
        None => Err(bad_request(
            "Unknown client",
            format!("Client '{}' is not registered.", client_name),
        )),
    }
}

/// Resolves the pool of the configured logging client used by Athena catalog/audit
/// features.
///
/// Delegates to [`logging_client_name_and_pool`] and drops the client name.
///
/// # Errors
///
/// Returns a `503` response when logging is not configured or currently disconnected.
pub fn logging_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
    logging_client_name_and_pool(state).map(|(_name, pool)| pool)
}

/// Resolves the logging client's name together with its pool.
///
/// The name comes from `state.logging_client_name`; the pool is looked up in
/// `state.pg_registry` under that name.
///
/// # Errors
///
/// Returns a `503` response when logging is not configured or currently disconnected.
pub fn logging_client_name_and_pool(state: &AppState) -> Result<(String, PgPool), HttpResponse> {
    let client_name = match state.logging_client_name.as_ref() {
        Some(name) => name,
        None => {
            return Err(service_unavailable(
                "Logging store unavailable",
                "No athena logging client is configured.",
            ));
        }
    };

    match state.pg_registry.get_pool(client_name) {
        Some(pool) => Ok((client_name.clone(), pool)),
        None => Err(service_unavailable(
            "Logging store unavailable",
            format!("Logging client '{}' is not connected.", client_name),
        )),
    }
}

/// Resolves the pool of the configured auth client used for gateway API-key/auth
/// storage.
///
/// The client name comes from `state.gateway_auth_client_name`; the pool is looked up
/// in `state.pg_registry` under that name.
///
/// # Errors
///
/// Returns a `503` response when auth storage is not configured or disconnected.
pub fn auth_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
    let client_name = state.gateway_auth_client_name.as_ref().ok_or_else(|| {
        service_unavailable(
            "Auth store unavailable",
            "No gateway auth client is configured.",
        )
    })?;

    match state.pg_registry.get_pool(client_name) {
        Some(pool) => Ok(pool),
        None => Err(service_unavailable(
            "Auth store unavailable",
            format!("Gateway auth client '{}' is not connected.", client_name),
        )),
    }
}

/// Resolves the request-scoped auth pool selected by `X-Athena-Client`.
///
/// Use this for surfaces where Athena Auth tables are expected to live inside the same
/// client database as the request-bound data plane. The client name is taken from
/// [`required_client_name`] and the pool from [`pool_for_client`].
///
/// # Errors
///
/// Returns the error response of whichever of those two steps fails first.
pub async fn request_auth_pool(
    req: &HttpRequest,
    state: &AppState,
) -> Result<PgPool, HttpResponse> {
    match required_client_name(req) {
        Ok(client_name) => pool_for_client(state, &client_name).await,
        Err(response) => Err(response),
    }
}

/// Resolves the same pool as [`auth_pool`] without building an HTTP error
/// (for background tasks).
///
/// Returns `None` when no gateway auth client name is configured or when the registry
/// has no pool under that name.
pub fn try_auth_pool(state: &AppState) -> Option<PgPool> {
    state
        .gateway_auth_client_name
        .as_ref()
        .and_then(|client_name| state.pg_registry.get_pool(client_name))
}

/// Builds the HTTP response for a registered client that has no usable pool.
///
/// Keeps caller-facing error semantics consistent across the three states, checked in
/// this order:
/// - inactive (`is_active` is false): `bad_request`, regardless of `is_frozen`;
/// - frozen (`is_frozen` is true): `bad_request`;
/// - otherwise (active, not frozen, but disconnected): `service_unavailable`.
fn unavailable_registered_client_response(
    client_name: &str,
    registered_client: &RegisteredClient,
) -> HttpResponse {
    match (registered_client.is_active, registered_client.is_frozen) {
        // Inactive takes precedence over frozen.
        (false, _) => bad_request(
            format!("Client '{}' is inactive", client_name),
            "Postgres client is configured but inactive",
        ),
        (true, true) => bad_request(
            format!("Client '{}' is frozen", client_name),
            "Postgres client is configured but frozen",
        ),
        (true, false) => service_unavailable(
            format!(
                "Client '{}' is configured but currently unavailable",
                client_name
            ),
            "Postgres client is configured but has no active connection pool",
        ),
    }
}