athena_rs 2.9.1

Database gateway API
Documentation
//! Application bootstrap: shared `AppState`, Postgres registry, and pipeline registry.

mod postgres_init;

use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use crate::AppState;
use crate::api::metrics::MetricsState;
use crate::api::pipelines::{PipelineDefinition, load_registry_from_path};
use crate::config::Config;
use crate::data::clients::{
    SaveAthenaClientParams, list_athena_clients, refresh_client_statistics, upsert_athena_client,
};
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry;

use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};
use postgres_init::connection_pool_manager_from_env;

pub use postgres_init::{
    CatalogClientStep, PostgresCatalogMergeReport, client_connection_targets_from_config,
    failed_config_keys_from_errors, merge_athena_clients_from_records,
    merge_catalog_targets_into_registry, plan_catalog_client_step,
    postgres_registry_entries_from_targets,
};

/// Shared state produced by the configuration bootstrap.
pub struct Bootstrap {
    /// Shared Actix state.
    pub app_state: Data<AppState>,
    /// Optional pipeline registry loaded from disk.
    pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}

/// Builds caches, HTTP clients, and the shared AppState that both the server and CLI use.
pub async fn build_shared_state(config: &Config, pipelines_path: &str) -> Result<Bootstrap> {
    let cache_ttl: u64 = config
        .get_cache_ttl()
        .ok_or_else(|| anyhow!("No cache TTL configured"))?
        .parse::<u64>()
        .context("parsing cache_ttl")?;

    let pool_idle_timeout: u64 = config
        .get_pool_idle_timeout()
        .ok_or_else(|| anyhow!("No pool idle timeout configured"))?
        .parse::<u64>()
        .context("parsing pool_idle_timeout")?;

    let cache: Arc<Cache<String, Value>> = Arc::new(
        Cache::builder()
            .time_to_live(Duration::from_secs(cache_ttl))
            .build(),
    );
    let immortal_cache: Arc<Cache<String, serde_json::Value>> = Arc::new(Cache::builder().build());

    let jdbc_pool_cache: Arc<Cache<String, sqlx::postgres::PgPool>> = Arc::new(
        Cache::builder()
            .max_capacity(64)
            .time_to_live(Duration::from_secs(1800))
            .build(),
    );
    #[cfg(feature = "deadpool_experimental")]
    let jdbc_deadpool_cache: Arc<
        Cache<String, Arc<tokio::sync::OnceCell<deadpool_postgres::Pool>>>,
    > = Arc::new(
        Cache::builder()
            .max_capacity(64)
            .time_to_live(Duration::from_secs(1800))
            .build(),
    );
    let client: Client = Client::builder()
        .pool_idle_timeout(Duration::from_secs(pool_idle_timeout))
        .build()
        .context("Failed to build HTTP client")?;

    let config_targets: Vec<ClientConnectionTarget> = client_connection_targets_from_config(config);
    let postgres_entries: Vec<(String, String)> =
        postgres_registry_entries_from_targets(&config_targets);

    let pool_manager = connection_pool_manager_from_env();

    let (registry, failed_connections) =
        PostgresClientRegistry::from_entries(postgres_entries, pool_manager.clone())
            .await
            .context("Failed to build Postgres registry")?;

    #[cfg(feature = "deadpool_experimental")]
    let deadpool_registry: DeadpoolPostgresRegistry = {
        let max_size: usize = std::env::var("ATHENA_PG_POOL_MAX_CONNECTIONS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(50) as usize;
        let warmup_timeout_ms: u64 = std::env::var("ATHENA_DEADPOOL_WARMUP_TIMEOUT_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(800);
        DeadpoolPostgresRegistry::from_entries(
            postgres_registry_entries_from_targets(&config_targets),
            max_size,
            Duration::from_millis(warmup_timeout_ms),
        )
        .await
    };

    let failed_config_client_keys = failed_config_keys_from_errors(&failed_connections);

    for (client_name, err) in &failed_connections {
        tracing::warn!(
            client = %client_name,
            error = %err,
            "Postgres client unavailable, continuing without it"
        );
    }

    if registry.is_empty() {
        tracing::warn!("No Postgres clients connected; Athena will run without Postgres support");
    }

    for target in &config_targets {
        registry.remember_client(
            target.clone(),
            registry.get_pool(&target.client_name).is_some(),
        );
    }

    let logging_client_name: Option<String> = config.get_gateway_logging_client().cloned();
    let gateway_auth_client_name: Option<String> = config.get_gateway_auth_client().cloned();

    if let Some(logging_client) = logging_client_name.as_ref() {
        if let Some(logging_pool) = registry.get_pool(logging_client) {
            for target in &config_targets {
                if let Err(err) = upsert_athena_client(
                    &logging_pool,
                    SaveAthenaClientParams {
                        client_name: target.client_name.clone(),
                        description: target.description.clone(),
                        pg_uri: target.pg_uri.clone(),
                        pg_uri_env_var: target.pg_uri_env_var.clone(),
                        config_uri_template: target.config_uri_template.clone(),
                        source: "config".to_string(),
                        is_active: true,
                        is_frozen: false,
                        metadata: serde_json::json!({ "seeded_from": "config.yaml" }),
                    },
                )
                .await
                {
                    tracing::warn!(
                        client = %target.client_name,
                        error = %err,
                        "Failed to sync config client into athena_clients"
                    );
                }
            }

            match list_athena_clients(&logging_pool).await {
                Ok(db_clients) => {
                    merge_athena_clients_from_records(
                        &registry,
                        db_clients,
                        &failed_config_client_keys,
                    )
                    .await;
                }
                Err(err) => {
                    tracing::warn!(
                        client = %logging_client,
                        error = %err,
                        "Failed to load athena_clients catalog; continuing with config-backed clients only"
                    );
                }
            }

            if let Err(err) = refresh_client_statistics(&logging_pool).await {
                tracing::warn!(error = %err, "Failed to refresh client statistics during bootstrap");
            }
        } else {
            tracing::warn!(
                client = %logging_client,
                "Logging client is not connected; database-backed client catalog is unavailable"
            );
        }
    }

    registry.sync_connection_status();

    let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
        match load_registry_from_path(pipelines_path) {
            Ok(map) => {
                tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
                Some(Arc::new(map))
            }
            Err(err) => {
                tracing::warn!(
                    path = %pipelines_path,
                    error = %err,
                    "Failed to load pipelines registry"
                );
                None
            }
        };

    let app_state: Data<AppState> = Data::new(AppState {
        cache,
        immortal_cache,
        client,
        process_start_time_seconds: SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64,
        process_started_at: Instant::now(),
        pg_registry: Arc::new(registry),
        jdbc_pool_cache,
        #[cfg(feature = "deadpool_experimental")]
        deadpool_registry: Arc::new(deadpool_registry),
        #[cfg(feature = "deadpool_experimental")]
        jdbc_deadpool_cache,
        gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
        gateway_auto_cast_uuid_filter_values_to_text: config
            .get_gateway_auto_cast_uuid_filter_values_to_text(),
        gateway_allow_schema_names_prefixed_as_table_name: config
            .get_gateway_allow_schema_names_prefixed_as_table_name(),
        pipeline_registry: pipeline_registry.clone(),
        logging_client_name,
        gateway_auth_client_name,
        gateway_jdbc_allow_private_hosts: config.get_gateway_jdbc_allow_private_hosts(),
        gateway_jdbc_allowed_hosts: config.get_gateway_jdbc_allowed_hosts(),
        gateway_resilience_timeout_secs: config.get_gateway_resilience_timeout_secs(),
        gateway_resilience_read_max_retries: config.get_gateway_resilience_read_max_retries(),
        gateway_resilience_initial_backoff_ms: config.get_gateway_resilience_initial_backoff_ms(),
        prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
        metrics_state: Arc::new(MetricsState::new()),
    });

    Ok(Bootstrap {
        app_state,
        pipeline_registry,
    })
}