use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::AppState;
use crate::api::pipelines::{PipelineDefinition, load_registry_from_path};
use crate::config::Config;
use crate::data::clients::{
SaveAthenaClientParams, list_athena_clients, refresh_client_statistics, upsert_athena_client,
};
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};
use crate::parser::{parse_env_reference, resolve_postgres_uri};
/// Result of [`build_shared_state`]: everything the caller needs to start serving.
pub struct Bootstrap {
/// Shared application state (caches, HTTP client, Postgres registry, gateway settings)
/// wrapped in actix-web's `Data` so it can be handed to the application.
pub app_state: Data<AppState>,
/// Pipeline registry loaded from the pipelines path, or `None` when loading failed.
/// The same value is also stored inside `app_state`.
pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}
pub async fn build_shared_state(config: &Config, pipelines_path: &str) -> Result<Bootstrap> {
let cache_ttl: u64 = config
.get_cache_ttl()
.ok_or_else(|| anyhow!("No cache TTL configured"))?
.parse::<u64>()
.context("parsing cache_ttl")?;
let pool_idle_timeout: u64 = config
.get_pool_idle_timeout()
.ok_or_else(|| anyhow!("No pool idle timeout configured"))?
.parse::<u64>()
.context("parsing pool_idle_timeout")?;
let cache: Arc<Cache<String, Value>> = Arc::new(
Cache::builder()
.time_to_live(Duration::from_secs(cache_ttl))
.build(),
);
let immortal_cache: Arc<Cache<String, serde_json::Value>> = Arc::new(Cache::builder().build());
let jdbc_pool_cache: Arc<Cache<String, sqlx::postgres::PgPool>> = Arc::new(
Cache::builder()
.max_capacity(64)
.time_to_live(Duration::from_secs(1800))
.build(),
);
let client: Client = Client::builder()
.pool_idle_timeout(Duration::from_secs(pool_idle_timeout))
.build()
.context("Failed to build HTTP client")?;
let config_targets: Vec<ClientConnectionTarget> = config
.postgres_clients
.iter()
.flat_map(|map| {
map.iter().map(|(key, uri)| ClientConnectionTarget {
client_name: key.clone(),
source: "config".to_string(),
description: None,
pg_uri: parse_env_reference(uri)
.is_none()
.then(|| resolve_postgres_uri(uri)),
pg_uri_env_var: parse_env_reference(uri),
config_uri_template: Some(uri.clone()),
is_active: true,
is_frozen: false,
})
})
.collect();
let postgres_entries: Vec<(String, String)> = config_targets
.iter()
.filter_map(|target| {
let uri = target
.config_uri_template
.as_ref()
.map(|value| resolve_postgres_uri(value))
.or_else(|| target.pg_uri.clone());
uri.map(|uri| (target.client_name.clone(), uri))
})
.collect();
let (registry, failed_connections) = PostgresClientRegistry::from_entries(postgres_entries)
.await
.context("Failed to build Postgres registry")?;
for (client_name, err) in &failed_connections {
tracing::warn!(
client = %client_name,
error = %err,
"Postgres client unavailable, continuing without it"
);
}
if registry.is_empty() {
tracing::warn!("No Postgres clients connected; Athena will run without Postgres support");
}
for target in &config_targets {
registry.remember_client(
target.clone(),
registry.get_pool(&target.client_name).is_some(),
);
}
let logging_client_name = config.get_gateway_logging_client().cloned();
let gateway_auth_client_name = config.get_gateway_auth_client().cloned();
if let Some(logging_client) = logging_client_name.as_ref() {
if let Some(logging_pool) = registry.get_pool(logging_client) {
for target in &config_targets {
if let Err(err) = upsert_athena_client(
&logging_pool,
SaveAthenaClientParams {
client_name: target.client_name.clone(),
description: target.description.clone(),
pg_uri: target.pg_uri.clone(),
pg_uri_env_var: target.pg_uri_env_var.clone(),
config_uri_template: target.config_uri_template.clone(),
source: "config".to_string(),
is_active: true,
is_frozen: false,
metadata: serde_json::json!({ "seeded_from": "config.yaml" }),
},
)
.await
{
tracing::warn!(
client = %target.client_name,
error = %err,
"Failed to sync config client into athena_clients"
);
}
}
match list_athena_clients(&logging_pool).await {
Ok(db_clients) => {
for client in db_clients {
let target = ClientConnectionTarget {
client_name: client.client_name.clone(),
source: client.source.clone(),
description: client.description.clone(),
pg_uri: client.pg_uri.clone(),
pg_uri_env_var: client.pg_uri_env_var.clone(),
config_uri_template: client.config_uri_template.clone(),
is_active: client.is_active,
is_frozen: client.is_frozen,
};
if !target.is_active || target.is_frozen {
registry.remember_client(target.clone(), false);
registry.mark_unavailable(&target.client_name);
continue;
}
if registry.get_pool(&target.client_name).is_some() {
registry.remember_client(target, true);
continue;
}
if let Err(err) = registry.upsert_client(target.clone()).await {
tracing::warn!(
client = %target.client_name,
error = %err,
"Failed to load database-backed client into local registry"
);
registry.remember_client(target, false);
}
}
}
Err(err) => {
tracing::warn!(
client = %logging_client,
error = %err,
"Failed to load athena_clients catalog; continuing with config-backed clients only"
);
}
}
if let Err(err) = refresh_client_statistics(&logging_pool).await {
tracing::warn!(error = %err, "Failed to refresh client statistics during bootstrap");
}
} else {
tracing::warn!(
client = %logging_client,
"Logging client is not connected; database-backed client catalog is unavailable"
);
}
}
let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
match load_registry_from_path(pipelines_path) {
Ok(map) => {
tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
Some(Arc::new(map))
}
Err(err) => {
tracing::warn!(
path = %pipelines_path,
error = %err,
"Failed to load pipelines registry"
);
None
}
};
let app_state: Data<AppState> = Data::new(AppState {
cache,
immortal_cache,
client,
pg_registry: Arc::new(registry),
jdbc_pool_cache,
gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
pipeline_registry: pipeline_registry.clone(),
logging_client_name,
gateway_auth_client_name,
prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
});
Ok(Bootstrap {
app_state,
pipeline_registry,
})
}