use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::AppState;
use crate::api::pipelines::{PipelineDefinition, load_registry_from_path};
use crate::config::Config;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use crate::parser::resolve_postgres_uri;
/// Everything produced by [`build_shared_state`] that the caller needs to start
/// serving requests.
pub struct Bootstrap {
    /// Shared application state, wrapped in actix-web's `Data` so it can be
    /// handed to the application.
    pub app_state: Data<AppState>,
    /// Pipeline definitions keyed by string (presumably the pipeline name;
    /// confirm against `load_registry_from_path`), or `None` when the registry
    /// could not be loaded. The same value is also stored inside `app_state`.
    pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}
/// Builds the shared [`AppState`] and loads the pipeline registry.
///
/// Steps, in order:
/// 1. Reads the cache TTL and the HTTP pool idle timeout from `config`, parsing
///    both as whole seconds (`u64`).
/// 2. Creates the TTL-bound cache, a cache with no capacity or expiry settings,
///    and the bounded `PgPool` cache.
/// 3. Builds the shared `reqwest` HTTP client.
/// 4. Builds the Postgres client registry from `config.postgres_clients`, after
///    passing every URI through `resolve_postgres_uri`.
/// 5. Attempts to load pipeline definitions from `pipelines_path`.
///
/// Postgres clients reported as failed, an empty registry, and a pipeline
/// registry that cannot be loaded are only logged as warnings; none of them
/// fails the call. In the last case `pipeline_registry` is `None`.
///
/// # Errors
///
/// Returns an error when:
/// - the cache TTL or pool idle timeout is missing or is not a valid `u64`;
/// - the cache TTL is longer than 1000 years (moka would otherwise panic while
///   building the cache);
/// - the HTTP client cannot be built;
/// - `PostgresClientRegistry::from_entries` itself returns an error.
pub async fn build_shared_state(config: &Config, pipelines_path: &str) -> Result<Bootstrap> {
    // moka's cache builder panics in `build()` when a TTL exceeds 1000 years.
    // The TTL comes from configuration, so reject such values up front and
    // report them as an ordinary error instead of aborting startup.
    const MAX_CACHE_TTL_SECS: u64 = 1_000 * 365 * 24 * 3_600;

    let cache_ttl: u64 = config
        .get_cache_ttl()
        .ok_or_else(|| anyhow!("No cache TTL configured"))?
        .parse::<u64>()
        .context("parsing cache_ttl")?;
    if cache_ttl > MAX_CACHE_TTL_SECS {
        return Err(anyhow!(
            "cache_ttl of {} seconds exceeds the supported maximum of {} seconds (1000 years)",
            cache_ttl,
            MAX_CACHE_TTL_SECS
        ));
    }

    let pool_idle_timeout: u64 = config
        .get_pool_idle_timeout()
        .ok_or_else(|| anyhow!("No pool idle timeout configured"))?
        .parse::<u64>()
        .context("parsing pool_idle_timeout")?;

    // Entries in this cache expire `cache_ttl` seconds after insertion.
    let cache: Arc<Cache<String, Value>> = Arc::new(
        Cache::builder()
            .time_to_live(Duration::from_secs(cache_ttl))
            .build(),
    );

    // Built with no capacity or expiry settings.
    let immortal_cache: Arc<Cache<String, Value>> = Arc::new(Cache::builder().build());

    // At most 64 pools are retained, each for 30 minutes (1800 s) after insertion.
    let jdbc_pool_cache: Arc<Cache<String, sqlx::postgres::PgPool>> = Arc::new(
        Cache::builder()
            .max_capacity(64)
            .time_to_live(Duration::from_secs(1800))
            .build(),
    );

    let client: Client = Client::builder()
        .pool_idle_timeout(Duration::from_secs(pool_idle_timeout))
        .build()
        .context("Failed to build HTTP client")?;

    // Flatten the configured maps into `(client name, resolved URI)` pairs.
    let postgres_entries: Vec<(String, String)> = config
        .postgres_clients
        .iter()
        .flat_map(|map| {
            map.iter()
                .map(|(key, uri)| (key.clone(), resolve_postgres_uri(uri)))
        })
        .collect();

    let (registry, failed_connections) = PostgresClientRegistry::from_entries(postgres_entries)
        .await
        .context("Failed to build Postgres registry")?;

    // Per-client failures are tolerated: log them and keep going.
    for (client_name, err) in &failed_connections {
        tracing::warn!(
            client = %client_name,
            error = %err,
            "Postgres client unavailable, continuing without it"
        );
    }

    if registry.is_empty() {
        tracing::warn!("No Postgres clients connected; Athena will run without Postgres support");
    }

    // A missing or invalid pipelines file is not fatal; the registry is simply absent.
    let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
        match load_registry_from_path(pipelines_path) {
            Ok(map) => {
                tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
                Some(Arc::new(map))
            }
            Err(err) => {
                tracing::warn!(
                    path = %pipelines_path,
                    error = %err,
                    "Failed to load pipelines registry"
                );
                None
            }
        };

    let app_state: Data<AppState> = Data::new(AppState {
        cache,
        immortal_cache,
        client,
        pg_registry: Arc::new(registry),
        jdbc_pool_cache,
        gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
        // Cloning the `Option<Arc<_>>` shares the registry rather than copying it.
        pipeline_registry: pipeline_registry.clone(),
        logging_client_name: config.get_gateway_logging_client().cloned(),
        gateway_auth_client_name: config.get_gateway_auth_client().cloned(),
        prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
    });

    Ok(Bootstrap {
        app_state,
        pipeline_registry,
    })
}