athena_rs 2.0.2

Database gateway API
Documentation
use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use crate::AppState;
use crate::api::metrics::MetricsState;
use crate::api::pipelines::{PipelineDefinition, load_registry_from_path};
use crate::config::Config;
use crate::data::clients::{
    SaveAthenaClientParams, list_athena_clients, refresh_client_statistics, upsert_athena_client,
};
use crate::drivers::postgresql::pool_manager::ConnectionPoolManager;
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};
use crate::parser::{parse_env_reference, resolve_postgres_uri};

/// Shared state produced by the configuration bootstrap.
pub struct Bootstrap {
    /// Shared Actix state.
    pub app_state: Data<AppState>,
    /// Optional pipeline registry loaded from disk.
    pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}

/// Builds caches, HTTP clients, and the shared AppState that both the server and CLI use.
pub async fn build_shared_state(config: &Config, pipelines_path: &str) -> Result<Bootstrap> {
    let cache_ttl: u64 = config
        .get_cache_ttl()
        .ok_or_else(|| anyhow!("No cache TTL configured"))?
        .parse::<u64>()
        .context("parsing cache_ttl")?;

    let pool_idle_timeout: u64 = config
        .get_pool_idle_timeout()
        .ok_or_else(|| anyhow!("No pool idle timeout configured"))?
        .parse::<u64>()
        .context("parsing pool_idle_timeout")?;

    let cache: Arc<Cache<String, Value>> = Arc::new(
        Cache::builder()
            .time_to_live(Duration::from_secs(cache_ttl))
            .build(),
    );
    let immortal_cache: Arc<Cache<String, serde_json::Value>> = Arc::new(Cache::builder().build());

    let jdbc_pool_cache: Arc<Cache<String, sqlx::postgres::PgPool>> = Arc::new(
        Cache::builder()
            .max_capacity(64)
            .time_to_live(Duration::from_secs(1800))
            .build(),
    );
    let client: Client = Client::builder()
        .pool_idle_timeout(Duration::from_secs(pool_idle_timeout))
        .build()
        .context("Failed to build HTTP client")?;

    let config_targets: Vec<ClientConnectionTarget> = config
        .postgres_clients
        .iter()
        .flat_map(|map| {
            map.iter().map(|(key, uri)| ClientConnectionTarget {
                client_name: key.clone(),
                source: "config".to_string(),
                description: None,
                pg_uri: parse_env_reference(uri)
                    .is_none()
                    .then(|| resolve_postgres_uri(uri)),
                pg_uri_env_var: parse_env_reference(uri),
                config_uri_template: Some(uri.clone()),
                is_active: true,
                is_frozen: false,
            })
        })
        .collect();
    let postgres_entries: Vec<(String, String)> = config_targets
        .iter()
        .filter_map(|target| {
            let uri = target
                .config_uri_template
                .as_ref()
                .map(|value| resolve_postgres_uri(value))
                .or_else(|| target.pg_uri.clone());
            uri.map(|uri| (target.client_name.clone(), uri))
        })
        .collect();

    let pool_manager: ConnectionPoolManager =
        ConnectionPoolManager::new(crate::client::config::PoolConfig {
            max_connections: env::var("ATHENA_PG_POOL_MAX_CONNECTIONS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(50),
            min_connections: env::var("ATHENA_PG_POOL_MIN_CONNECTIONS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(5),
            connection_timeout: Duration::from_secs(
                env::var("ATHENA_PG_POOL_ACQUIRE_TIMEOUT_SECS")
                    .ok()
                    .and_then(|v| v.parse().ok())
                    .unwrap_or(3),
            ),
            idle_timeout: Duration::from_secs(
                env::var("ATHENA_PG_POOL_IDLE_TIMEOUT_SECS")
                    .ok()
                    .and_then(|v| v.parse().ok())
                    .unwrap_or(300),
            ),
        });

    let (registry, failed_connections) =
        PostgresClientRegistry::from_entries(postgres_entries, pool_manager.clone())
            .await
            .context("Failed to build Postgres registry")?;

    for (client_name, err) in &failed_connections {
        tracing::warn!(
            client = %client_name,
            error = %err,
            "Postgres client unavailable, continuing without it"
        );
    }

    if registry.is_empty() {
        tracing::warn!("No Postgres clients connected; Athena will run without Postgres support");
    }

    for target in &config_targets {
        registry.remember_client(
            target.clone(),
            registry.get_pool(&target.client_name).is_some(),
        );
    }

    let logging_client_name = config.get_gateway_logging_client().cloned();
    let gateway_auth_client_name = config.get_gateway_auth_client().cloned();

    if let Some(logging_client) = logging_client_name.as_ref() {
        if let Some(logging_pool) = registry.get_pool(logging_client) {
            for target in &config_targets {
                if let Err(err) = upsert_athena_client(
                    &logging_pool,
                    SaveAthenaClientParams {
                        client_name: target.client_name.clone(),
                        description: target.description.clone(),
                        pg_uri: target.pg_uri.clone(),
                        pg_uri_env_var: target.pg_uri_env_var.clone(),
                        config_uri_template: target.config_uri_template.clone(),
                        source: "config".to_string(),
                        is_active: true,
                        is_frozen: false,
                        metadata: serde_json::json!({ "seeded_from": "config.yaml" }),
                    },
                )
                .await
                {
                    tracing::warn!(
                        client = %target.client_name,
                        error = %err,
                        "Failed to sync config client into athena_clients"
                    );
                }
            }

            match list_athena_clients(&logging_pool).await {
                Ok(db_clients) => {
                    for client in db_clients {
                        let target = ClientConnectionTarget {
                            client_name: client.client_name.clone(),
                            source: client.source.clone(),
                            description: client.description.clone(),
                            pg_uri: client.pg_uri.clone(),
                            pg_uri_env_var: client.pg_uri_env_var.clone(),
                            config_uri_template: client.config_uri_template.clone(),
                            is_active: client.is_active,
                            is_frozen: client.is_frozen,
                        };

                        if !target.is_active || target.is_frozen {
                            registry.remember_client(target.clone(), false);
                            registry.mark_unavailable(&target.client_name);
                            continue;
                        }

                        if registry.get_pool(&target.client_name).is_some() {
                            registry.remember_client(target, true);
                            continue;
                        }

                        if let Err(err) = registry.upsert_client(target.clone()).await {
                            tracing::warn!(
                                client = %target.client_name,
                                error = %err,
                                "Failed to load database-backed client into local registry"
                            );
                            registry.remember_client(target, false);
                        }
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        client = %logging_client,
                        error = %err,
                        "Failed to load athena_clients catalog; continuing with config-backed clients only"
                    );
                }
            }

            if let Err(err) = refresh_client_statistics(&logging_pool).await {
                tracing::warn!(error = %err, "Failed to refresh client statistics during bootstrap");
            }
        } else {
            tracing::warn!(
                client = %logging_client,
                "Logging client is not connected; database-backed client catalog is unavailable"
            );
        }
    }

    let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
        match load_registry_from_path(pipelines_path) {
            Ok(map) => {
                tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
                Some(Arc::new(map))
            }
            Err(err) => {
                tracing::warn!(
                    path = %pipelines_path,
                    error = %err,
                    "Failed to load pipelines registry"
                );
                None
            }
        };

    let app_state: Data<AppState> = Data::new(AppState {
        cache,
        immortal_cache,
        client,
        process_start_time_seconds: SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64,
        process_started_at: Instant::now(),
        pg_registry: Arc::new(registry),
        jdbc_pool_cache,
        gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
        gateway_auto_cast_uuid_filter_values_to_text: config
            .get_gateway_auto_cast_uuid_filter_values_to_text(),
        pipeline_registry: pipeline_registry.clone(),
        logging_client_name,
        gateway_auth_client_name,
        gateway_jdbc_allow_private_hosts: config.get_gateway_jdbc_allow_private_hosts(),
        gateway_jdbc_allowed_hosts: config.get_gateway_jdbc_allowed_hosts(),
        prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
        metrics_state: Arc::new(MetricsState::new()),
    });

    Ok(Bootstrap {
        app_state,
        pipeline_registry,
    })
}