athena_rs 1.1.0

Database gateway API
Documentation
use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::AppState;
use crate::api::pipelines::{PipelineDefinition, load_registry_from_path};
use crate::config::Config;
use crate::data::clients::{
    SaveAthenaClientParams, list_athena_clients, refresh_client_statistics, upsert_athena_client,
};
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};
use crate::parser::{parse_env_reference, resolve_postgres_uri};

/// Shared state produced by the configuration bootstrap.
pub struct Bootstrap {
    /// Shared Actix state.
    pub app_state: Data<AppState>,
    /// Optional pipeline registry loaded from disk.
    pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}

/// Builds caches, HTTP clients, and the shared AppState that both the server and CLI use.
pub async fn build_shared_state(config: &Config, pipelines_path: &str) -> Result<Bootstrap> {
    let cache_ttl: u64 = config
        .get_cache_ttl()
        .ok_or_else(|| anyhow!("No cache TTL configured"))?
        .parse::<u64>()
        .context("parsing cache_ttl")?;

    let pool_idle_timeout: u64 = config
        .get_pool_idle_timeout()
        .ok_or_else(|| anyhow!("No pool idle timeout configured"))?
        .parse::<u64>()
        .context("parsing pool_idle_timeout")?;

    let cache: Arc<Cache<String, Value>> = Arc::new(
        Cache::builder()
            .time_to_live(Duration::from_secs(cache_ttl))
            .build(),
    );
    let immortal_cache: Arc<Cache<String, serde_json::Value>> = Arc::new(Cache::builder().build());

    let jdbc_pool_cache: Arc<Cache<String, sqlx::postgres::PgPool>> = Arc::new(
        Cache::builder()
            .max_capacity(64)
            .time_to_live(Duration::from_secs(1800))
            .build(),
    );
    let client: Client = Client::builder()
        .pool_idle_timeout(Duration::from_secs(pool_idle_timeout))
        .build()
        .context("Failed to build HTTP client")?;

    let config_targets: Vec<ClientConnectionTarget> = config
        .postgres_clients
        .iter()
        .flat_map(|map| {
            map.iter().map(|(key, uri)| ClientConnectionTarget {
                client_name: key.clone(),
                source: "config".to_string(),
                description: None,
                pg_uri: parse_env_reference(uri)
                    .is_none()
                    .then(|| resolve_postgres_uri(uri)),
                pg_uri_env_var: parse_env_reference(uri),
                config_uri_template: Some(uri.clone()),
                is_active: true,
                is_frozen: false,
            })
        })
        .collect();
    let postgres_entries: Vec<(String, String)> = config_targets
        .iter()
        .filter_map(|target| {
            let uri = target
                .config_uri_template
                .as_ref()
                .map(|value| resolve_postgres_uri(value))
                .or_else(|| target.pg_uri.clone());
            uri.map(|uri| (target.client_name.clone(), uri))
        })
        .collect();

    let (registry, failed_connections) = PostgresClientRegistry::from_entries(postgres_entries)
        .await
        .context("Failed to build Postgres registry")?;

    for (client_name, err) in &failed_connections {
        tracing::warn!(
            client = %client_name,
            error = %err,
            "Postgres client unavailable, continuing without it"
        );
    }

    if registry.is_empty() {
        tracing::warn!("No Postgres clients connected; Athena will run without Postgres support");
    }

    for target in &config_targets {
        registry.remember_client(
            target.clone(),
            registry.get_pool(&target.client_name).is_some(),
        );
    }

    let logging_client_name = config.get_gateway_logging_client().cloned();
    let gateway_auth_client_name = config.get_gateway_auth_client().cloned();

    if let Some(logging_client) = logging_client_name.as_ref() {
        if let Some(logging_pool) = registry.get_pool(logging_client) {
            for target in &config_targets {
                if let Err(err) = upsert_athena_client(
                    &logging_pool,
                    SaveAthenaClientParams {
                        client_name: target.client_name.clone(),
                        description: target.description.clone(),
                        pg_uri: target.pg_uri.clone(),
                        pg_uri_env_var: target.pg_uri_env_var.clone(),
                        config_uri_template: target.config_uri_template.clone(),
                        source: "config".to_string(),
                        is_active: true,
                        is_frozen: false,
                        metadata: serde_json::json!({ "seeded_from": "config.yaml" }),
                    },
                )
                .await
                {
                    tracing::warn!(
                        client = %target.client_name,
                        error = %err,
                        "Failed to sync config client into athena_clients"
                    );
                }
            }

            match list_athena_clients(&logging_pool).await {
                Ok(db_clients) => {
                    for client in db_clients {
                        let target = ClientConnectionTarget {
                            client_name: client.client_name.clone(),
                            source: client.source.clone(),
                            description: client.description.clone(),
                            pg_uri: client.pg_uri.clone(),
                            pg_uri_env_var: client.pg_uri_env_var.clone(),
                            config_uri_template: client.config_uri_template.clone(),
                            is_active: client.is_active,
                            is_frozen: client.is_frozen,
                        };

                        if !target.is_active || target.is_frozen {
                            registry.remember_client(target.clone(), false);
                            registry.mark_unavailable(&target.client_name);
                            continue;
                        }

                        if registry.get_pool(&target.client_name).is_some() {
                            registry.remember_client(target, true);
                            continue;
                        }

                        if let Err(err) = registry.upsert_client(target.clone()).await {
                            tracing::warn!(
                                client = %target.client_name,
                                error = %err,
                                "Failed to load database-backed client into local registry"
                            );
                            registry.remember_client(target, false);
                        }
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        client = %logging_client,
                        error = %err,
                        "Failed to load athena_clients catalog; continuing with config-backed clients only"
                    );
                }
            }

            if let Err(err) = refresh_client_statistics(&logging_pool).await {
                tracing::warn!(error = %err, "Failed to refresh client statistics during bootstrap");
            }
        } else {
            tracing::warn!(
                client = %logging_client,
                "Logging client is not connected; database-backed client catalog is unavailable"
            );
        }
    }

    let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
        match load_registry_from_path(pipelines_path) {
            Ok(map) => {
                tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
                Some(Arc::new(map))
            }
            Err(err) => {
                tracing::warn!(
                    path = %pipelines_path,
                    error = %err,
                    "Failed to load pipelines registry"
                );
                None
            }
        };

    let app_state: Data<AppState> = Data::new(AppState {
        cache,
        immortal_cache,
        client,
        pg_registry: Arc::new(registry),
        jdbc_pool_cache,
        gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
        pipeline_registry: pipeline_registry.clone(),
        logging_client_name,
        gateway_auth_client_name,
        prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
    });

    Ok(Bootstrap {
        app_state,
        pipeline_registry,
    })
}