// athena_rs 0.83.0
//
// Database gateway API
// Documentation
use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::AppState;
use crate::api::pipelines::{PipelineDefinition, load_registry_from_path};
use crate::config::Config;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use crate::parser::resolve_postgres_uri;

/// Shared state produced by the configuration bootstrap.
///
/// Returned by [`build_shared_state`]; bundles the Actix application state together with a
/// handle to the pipeline registry.
pub struct Bootstrap {
    /// Shared Actix state.
    pub app_state: Data<AppState>,
    /// Optional pipeline registry loaded from disk.
    ///
    /// `None` when loading the registry failed during bootstrap. When present, it is the same
    /// `Arc` that is stored in `AppState::pipeline_registry`.
    pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}

/// Assembles the caches, the outbound HTTP client, the Postgres client registry, and the
/// optional pipeline registry, then wraps them in the shared [`AppState`] that both the
/// server and CLI use.
///
/// `pipelines_path` is handed to `load_registry_from_path`. A failure there is logged as a
/// warning and leaves the pipeline registry as `None` instead of aborting the bootstrap.
///
/// # Errors
///
/// Returns an error when:
/// - the cache TTL or the pool idle timeout is not configured, or cannot be parsed as `u64`;
/// - the HTTP client cannot be built;
/// - `PostgresClientRegistry::from_entries` itself fails.
///
/// Individual Postgres clients that the registry reports as failed are only logged.
pub async fn build_shared_state(config: &Config, pipelines_path: &str) -> Result<Bootstrap> {
    // Both timing settings arrive as text and are interpreted as whole seconds below.
    let raw_cache_ttl = config
        .get_cache_ttl()
        .ok_or_else(|| anyhow!("No cache TTL configured"))?;
    let cache_ttl: u64 = raw_cache_ttl
        .parse::<u64>()
        .context("parsing cache_ttl")?;

    let raw_idle_timeout = config
        .get_pool_idle_timeout()
        .ok_or_else(|| anyhow!("No pool idle timeout configured"))?;
    let pool_idle_timeout: u64 = raw_idle_timeout
        .parse::<u64>()
        .context("parsing pool_idle_timeout")?;

    // Cache whose entries expire after the configured TTL.
    let expiring_cache: Cache<String, Value> = Cache::builder()
        .time_to_live(Duration::from_secs(cache_ttl))
        .build();
    let cache = Arc::new(expiring_cache);

    // Cache built without any TTL or capacity setting.
    let immortal_cache: Arc<Cache<String, Value>> = Arc::new(Cache::builder().build());

    // Connection-pool cache: at most 64 entries, each living for 30 minutes.
    let pooled_connections: Cache<String, sqlx::postgres::PgPool> = Cache::builder()
        .max_capacity(64)
        .time_to_live(Duration::from_secs(30 * 60))
        .build();
    let jdbc_pool_cache = Arc::new(pooled_connections);

    let idle_timeout = Duration::from_secs(pool_idle_timeout);
    let client: Client = Client::builder()
        .pool_idle_timeout(idle_timeout)
        .build()
        .context("Failed to build HTTP client")?;

    // Flatten every configured client map into (name, resolved URI) pairs.
    let mut postgres_entries: Vec<(String, String)> = Vec::new();
    for client_map in config.postgres_clients.iter() {
        for (name, raw_uri) in client_map.iter() {
            postgres_entries.push((name.clone(), resolve_postgres_uri(raw_uri)));
        }
    }

    let (pg_registry, unavailable) = PostgresClientRegistry::from_entries(postgres_entries)
        .await
        .context("Failed to build Postgres registry")?;

    // Per-client failures are tolerated: report them and carry on.
    for (name, reason) in &unavailable {
        tracing::warn!(
            client = %name,
            error = %reason,
            "Postgres client unavailable, continuing without it"
        );
    }

    if pg_registry.is_empty() {
        tracing::warn!("No Postgres clients connected; Athena will run without Postgres support");
    }

    // A missing or broken pipelines file is not fatal; it just disables the registry.
    let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
        load_registry_from_path(pipelines_path)
            .map(|definitions| {
                tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
                Arc::new(definitions)
            })
            .map_err(|err| {
                tracing::warn!(
                    path = %pipelines_path,
                    error = %err,
                    "Failed to load pipelines registry"
                );
            })
            .ok();

    let state = AppState {
        cache,
        immortal_cache,
        client,
        pg_registry: Arc::new(pg_registry),
        jdbc_pool_cache,
        gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
        // Share the same registry allocation with the returned `Bootstrap`.
        pipeline_registry: pipeline_registry.as_ref().map(Arc::clone),
        logging_client_name: config.get_gateway_logging_client().cloned(),
        gateway_auth_client_name: config.get_gateway_auth_client().cloned(),
        prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
    };
    let app_state: Data<AppState> = Data::new(state);

    Ok(Bootstrap {
        app_state,
        pipeline_registry,
    })
}