mod postgres_init;
use actix_web::web::Data;
use anyhow::{Context, Result, anyhow};
use athena_pipelines::{
PipelineDefinition, ensure_pipeline_step_log_table, ensure_pipeline_template_table,
load_registry_from_path,
};
use athena_webhooks::{InboundWebhookSinkDefinition, load_inbound_sink_registry_from_path};
use chrono::{Duration as ChronoDuration, Utc};
use moka::future::Cache;
use reqwest::Client;
use reqwest::Url;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::Semaphore;
use crate::AppState;
use crate::api::chat::auth::AuthResolver;
use crate::api::chat::runtime::{AppChatPoolResolver, ChatMetrics, StorageFacade, WsHubPublisher};
use crate::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use crate::api::metrics::MetricsState;
use crate::api::rate_limit::build_keyed_limiter;
use crate::config::Config;
use crate::config_validation::runtime_env_settings;
use crate::data::client_configs::ensure_athena_client_config_table;
use crate::data::clients::{
ClientStatisticsRefreshMode, ClientStatisticsRefreshParams, SaveAthenaClientParams,
list_athena_clients, refresh_client_statistics_with_params, upsert_athena_client,
};
use crate::config_validation::RuntimeEnvSettings;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry;
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};
use crate::features::connection_pooler::ConnectionPoolManager;
use crate::utils::client_stats_batcher::{ClientStatsBatcher, ClientStatsBatcherConfig};
use crate::utils::linux_gateway_file_log::LinuxGatewayFileLog;
use crate::utils::request_logging::GatewayLogBatcher;
use postgres_init::connection_pool_manager_from_env;
use sqlx::PgPool;
pub use postgres_init::{
CatalogClientStep, PostgresCatalogMergeReport, client_connection_targets_from_config,
failed_config_keys_from_errors, merge_athena_clients_from_records,
merge_catalog_targets_into_registry, plan_catalog_client_step,
postgres_registry_entries_from_targets,
};
/// Value returned by [`build_shared_state`]: the shared application state plus
/// the pipeline registry that was loaded while building it.
pub struct Bootstrap {
/// Shared application state, wrapped in actix-web's `Data` handle.
pub app_state: Data<AppState>,
/// Pipeline registry loaded from the pipelines path given to
/// [`build_shared_state`]; `None` when loading it failed.
pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
}
/// Relative path probed during bootstrap for the inbound webhook sink registry.
/// When no file exists at this path, no webhook sink registry is loaded.
const DEFAULT_WEBHOOK_SINKS_PATH: &str = "config/webhook-sinks.yaml";
/// Metadata describing where the runtime configuration came from.
///
/// [`build_shared_state`] copies these values into `AppState` and into the
/// runtime debug config snapshot; this module does not interpret them further.
#[derive(Debug, Clone, Default)]
pub struct RuntimeConfigMetadata {
/// Presumably the path the configuration was loaded from (confirm with the caller).
pub config_path: Option<String>,
/// Presumably a human-readable label for the configuration source (confirm with the caller).
pub source_label: Option<String>,
/// NOTE(review): looks like this flags a config seeded from built-in defaults; confirm.
pub seeded_default: bool,
}
/// Derives the logging task limit from the logging pool's maximum connection count.
///
/// The result is a quarter of `pool_max_connections` (integer division), kept
/// within `1..=16`. `build_shared_state` uses it to size the logging semaphore.
fn logging_task_limit_for_pool_max(pool_max_connections: usize) -> usize {
    match pool_max_connections / 4 {
        // Pools with fewer than four connections still get one task slot.
        0 => 1,
        // Large pools are capped so logging never claims more than sixteen slots.
        quarter if quarter > 16 => 16,
        quarter => quarter,
    }
}
/// Returns `database_url` with its credentials masked, for use in debug output.
///
/// * Input that does not parse as a URL yields the literal `<invalid_database_url>`.
/// * A non-empty username and any password in the userinfo part are replaced by `***`.
/// * Query parameters named `user`, `password` or `sslpassword` (ASCII
///   case-insensitive) have their value replaced by `***`, because Postgres
///   connection URIs may carry credentials as query parameters as well as in
///   the userinfo part. All other query text is left exactly as parsed.
///
/// Query keys are compared on their raw text, so a percent-encoded spelling of
/// one of these keys is not recognised.
fn redact_database_url(database_url: &str) -> String {
    const MASK: &str = "***";
    const SECRET_QUERY_KEYS: [&str; 3] = ["user", "password", "sslpassword"];

    let Ok(mut parsed) = Url::parse(database_url) else {
        return "<invalid_database_url>".to_string();
    };

    // The setters report an error for URLs that cannot carry credentials; the
    // result is ignored because this is a best-effort redaction of values the
    // parser has just reported as present.
    if !parsed.username().is_empty() {
        let _ = parsed.set_username(MASK);
    }
    if parsed.password().is_some() {
        let _ = parsed.set_password(Some(MASK));
    }

    // Rebuild the query only when a credential-bearing key is present, so URLs
    // without such keys keep their query byte-for-byte.
    let redacted_query: Option<String> = parsed.query().and_then(|query| {
        let mut masked_any = false;
        let segments: Vec<String> = query
            .split('&')
            .map(|segment| match segment.split_once('=') {
                Some((key, _))
                    if SECRET_QUERY_KEYS
                        .iter()
                        .any(|secret| key.eq_ignore_ascii_case(secret)) =>
                {
                    masked_any = true;
                    format!("{key}={MASK}")
                }
                _ => segment.to_string(),
            })
            .collect();
        masked_any.then(|| segments.join("&"))
    });
    if let Some(query) = redacted_query {
        parsed.set_query(Some(&query));
    }

    parsed.to_string()
}
fn build_postgres_client_debug_entries(config: &Config) -> Vec<Value> {
config
.postgres_clients
.iter()
.filter_map(|entry| {
let client_name = entry
.get("client_name")
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())?;
let pg_uri = entry
.get("pg_uri")
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty());
let pg_uri_env_var = entry
.get("pg_uri_env_var")
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty());
let config_uri_template = entry
.get("config_uri_template")
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty());
let uri_source = if pg_uri.is_some() {
"inline_pg_uri"
} else if pg_uri_env_var.is_some() {
"pg_uri_env_var"
} else if config_uri_template.is_some() {
"config_uri_template"
} else {
"missing"
};
Some(json!({
"client_name": client_name,
"description": entry
.get("description")
.map(|value| value.trim())
.filter(|value| !value.is_empty()),
"uri_source": uri_source,
"pg_uri": pg_uri.as_deref().map(redact_database_url),
"pg_uri_env_var": pg_uri_env_var,
"config_uri_template": config_uri_template.as_deref().map(redact_database_url),
}))
})
.collect()
}
fn build_runtime_debug_config_snapshot(config: &Config, metadata: &RuntimeConfigMetadata) -> Value {
json!({
"config_path": metadata.config_path,
"source_label": metadata.source_label,
"seeded_default": metadata.seeded_default,
"api": {
"cors_allow_any_origin": config.get_cors_allow_any_origin(),
"cors_allowed_origins": config.get_cors_allowed_origins(),
},
"gateway": {
"logging_client": config.get_gateway_logging_client(),
"auth_client": config.get_gateway_auth_client(),
"benchmark_client": config.get_gateway_benchmark_client(),
"database_backed_client_loading_enabled": config.get_gateway_database_backed_client_loading_enabled(),
"api_key_fail_mode": config.get_gateway_api_key_fail_mode(),
"jdbc_allow_private_hosts": config.get_gateway_jdbc_allow_private_hosts(),
"jdbc_allowed_hosts": config.get_gateway_jdbc_allowed_hosts(),
"resilience_timeout_secs": config.get_gateway_resilience_timeout_secs(),
"resilience_read_max_retries": config.get_gateway_resilience_read_max_retries(),
"resilience_initial_backoff_ms": config.get_gateway_resilience_initial_backoff_ms(),
"admission_store_backend": config.get_gateway_admission_store_backend(),
"admission_store_fail_mode": config.get_gateway_admission_store_fail_mode(),
"rate_limit_trust_x_forwarded_for": config.get_gateway_rate_limit_trust_x_forwarded_for(),
"logging_trust_x_forwarded_for": config.get_gateway_logging_trust_x_forwarded_for(),
},
"postgres_clients": build_postgres_client_debug_entries(config),
})
}
/// Produces a single-line explanation of why no Postgres pool is available.
///
/// The summary joins, with `" | "`, up to three findings in this order:
/// 1. no `postgres_clients` entries were configured (`config_targets` is empty);
/// 2. the per-client connection failures, showing at most four of them followed
///    by a count of the remainder;
/// 3. the logging override failure text, when one was recorded.
///
/// When none of these applies a generic fallback sentence is returned instead.
fn summarize_postgres_boot_failures(
    config_targets: &[ClientConnectionTarget],
    failed_connections: &[(String, anyhow::Error)],
    logging_override_failure: Option<&str>,
) -> String {
    const FAILURE_PREVIEW_LIMIT: usize = 4;

    let mut findings: Vec<String> = Vec::new();

    if config_targets.is_empty() {
        findings.push(String::from(
            "no `postgres_clients` entries were configured",
        ));
    }

    if !failed_connections.is_empty() {
        let shown = failed_connections
            .iter()
            .take(FAILURE_PREVIEW_LIMIT)
            .map(|(client_name, err)| format!("{client_name}: {err}"));
        // Failures beyond the preview limit are only counted, not listed.
        let hidden_count = failed_connections
            .len()
            .saturating_sub(FAILURE_PREVIEW_LIMIT);
        let overflow_note =
            (hidden_count > 0).then(|| format!("... and {} more failed client(s)", hidden_count));
        let preview: Vec<String> = shown.chain(overflow_note).collect();
        findings.push(format!("client bootstrap failures: {}", preview.join("; ")));
    }

    findings.extend(logging_override_failure.map(str::to_string));

    if findings.is_empty() {
        findings.push(String::from(
            "no client pools were available after bootstrap, but no specific failure was captured",
        ));
    }

    findings.join(" | ")
}
/// Decides whether startup may continue given the state of the Postgres registry.
///
/// Returns `Ok(())` when `registry` holds at least one client. With an empty
/// registry the outcome depends on `config.get_allow_start_without_postgres()`:
/// when it is `true` a warning is logged and startup continues, otherwise an
/// error is logged and returned.
///
/// # Errors
///
/// Returns an error embedding the output of [`summarize_postgres_boot_failures`]
/// when the registry is empty and starting without Postgres is not allowed.
fn enforce_postgres_boot_policy(
    config: &Config,
    registry: &PostgresClientRegistry,
    config_targets: &[ClientConnectionTarget],
    failed_connections: &[(String, anyhow::Error)],
    logging_override_failure: Option<&str>,
) -> Result<()> {
    let nothing_connected = registry.is_empty();
    if nothing_connected {
        let details = summarize_postgres_boot_failures(
            config_targets,
            failed_connections,
            logging_override_failure,
        );

        return if config.get_allow_start_without_postgres() {
            tracing::warn!(
                details = %details,
                "No Postgres clients connected; continuing only because ATHENA_ALLOW_START_WITHOUT_POSTGRES=true. Postgres-backed routes, auth, logging, and catalog-backed client loading will be unavailable"
            );
            Ok(())
        } else {
            tracing::error!(
                details = %details,
                "No Postgres clients connected; refusing to start"
            );
            Err(anyhow!(
                "No Postgres clients connected; refusing to start. {}. Set the missing POSTGRES_* env vars referenced in config.yaml or fix the configured postgres_clients URIs. Only set ATHENA_ALLOW_START_WITHOUT_POSTGRES=true when you intentionally want a Postgres-free process.",
                details
            ))
        };
    }

    Ok(())
}
pub async fn build_shared_state(
config: &Config,
pipelines_path: &str,
runtime_config_metadata: RuntimeConfigMetadata,
) -> Result<Bootstrap> {
let cache_ttl: u64 = config.get_cache_ttl_secs();
let pool_idle_timeout: u64 = config.get_pool_idle_timeout_secs();
let runtime_env: &RuntimeEnvSettings = runtime_env_settings();
let request_cache_max_capacity: u64 = runtime_env.cache_max_capacity;
let request_cache_max_entry_weight: usize = runtime_env.cache_max_entry_weight;
let cache: Arc<Cache<String, Value>> = Arc::new(
Cache::builder()
.support_invalidation_closures()
.max_capacity(request_cache_max_capacity)
.weigher(move |key: &String, value: &Value| {
let key_weight: usize = key.len();
let value_weight: usize = serde_json::to_vec(value)
.map(|bytes| bytes.len())
.unwrap_or(0)
.min(request_cache_max_entry_weight);
let total_weight: usize = key_weight.saturating_add(value_weight);
u32::try_from(total_weight).unwrap_or(u32::MAX)
})
.time_to_live(Duration::from_secs(cache_ttl))
.build(),
);
let immortal_cache: Arc<Cache<String, serde_json::Value>> = Arc::new(Cache::builder().build());
let jdbc_pool_cache: Arc<Cache<String, sqlx::postgres::PgPool>> = Arc::new(
Cache::builder()
.max_capacity(64)
.time_to_live(Duration::from_secs(1800))
.build(),
);
#[cfg(feature = "deadpool_experimental")]
let jdbc_deadpool_cache: Arc<
Cache<String, Arc<tokio::sync::OnceCell<deadpool_postgres::Pool>>>,
> = Arc::new(
Cache::builder()
.max_capacity(64)
.time_to_live(Duration::from_secs(1800))
.build(),
);
let client: Client = Client::builder()
.pool_idle_timeout(Duration::from_secs(pool_idle_timeout))
.build()
.context("Failed to build HTTP client")?;
let config_targets: Vec<ClientConnectionTarget> = client_connection_targets_from_config(config);
let postgres_entries: Vec<(String, String)> =
postgres_registry_entries_from_targets(&config_targets);
let pool_manager: ConnectionPoolManager = connection_pool_manager_from_env();
let (registry, failed_connections) =
PostgresClientRegistry::from_entries(postgres_entries, pool_manager.clone())
.await
.context("Failed to build Postgres registry")?;
#[cfg(feature = "deadpool_experimental")]
let deadpool_registry: DeadpoolPostgresRegistry = {
let max_size: usize = runtime_env.pg_pool_max_connections as usize;
let warmup_timeout_ms: u64 = runtime_env.deadpool_warmup_timeout_ms;
DeadpoolPostgresRegistry::from_entries(
postgres_registry_entries_from_targets(&config_targets),
max_size,
Duration::from_millis(warmup_timeout_ms),
)
.await
};
let failed_config_client_keys = failed_config_keys_from_errors(&failed_connections);
let mut logging_override_failure: Option<String> = None;
for (client_name, err) in &failed_connections {
tracing::debug!(
client = %client_name,
error = %err,
"Postgres client skipped at bootstrap (per-client details were logged when the connection was attempted)"
);
}
for target in &config_targets {
registry.remember_client(
target.clone(),
registry.get_pool(&target.client_name).is_some(),
);
}
let logging_client_name: Option<String> = config.get_gateway_logging_client();
if let (Some(logging_client), Some(logging_pg_uri)) = (
logging_client_name.as_ref(),
config.get_gateway_logging_pg_uri(),
) {
let override_target: ClientConnectionTarget = ClientConnectionTarget {
client_name: logging_client.clone(),
source: "gateway_logging_override".to_string(),
description: Some(
"Configured from gateway.logging_pg_uri override in config".to_string(),
),
pg_uri: Some(logging_pg_uri),
pg_uri_env_var: None,
config_uri_template: None,
is_active: true,
is_frozen: false,
};
match registry.upsert_client(override_target.clone()).await {
Ok(()) => {
tracing::info!(
client = %override_target.client_name,
"Connected logging client using dedicated gateway logging URI override"
);
}
Err(err) => {
logging_override_failure = Some(format!(
"gateway logging override `{}` failed: {}",
override_target.client_name, err
));
tracing::warn!(
client = %override_target.client_name,
error = %err,
"Failed to connect dedicated gateway logging URI override; falling back to existing registry entry if available"
);
}
}
}
enforce_postgres_boot_policy(
config,
®istry,
&config_targets,
&failed_connections,
logging_override_failure.as_deref(),
)?;
let gateway_auth_client_name: Option<String> = config.get_gateway_auth_client();
let gateway_benchmark_client_name: Option<String> = config.get_gateway_benchmark_client();
let gateway_database_backed_client_loading_enabled: bool =
config.get_gateway_database_backed_client_loading_enabled();
if let Some(logging_client) = logging_client_name.as_ref() {
if let Some(logging_pool) = registry.get_pool(logging_client) {
if let Err(err) = ensure_athena_client_config_table(&logging_pool).await {
tracing::warn!(
client = %logging_client,
error = %err,
"Failed to ensure athena_client_configs table"
);
}
if let Err(err) = ensure_pipeline_template_table(&logging_pool).await {
tracing::warn!(
client = %logging_client,
error = %err,
"Failed to ensure athena_pipeline_templates table"
);
}
if let Err(err) = ensure_pipeline_step_log_table(&logging_pool).await {
tracing::warn!(
client = %logging_client,
error = %err,
"Failed to ensure pipeline_step_log table"
);
}
for target in &config_targets {
if let Err(err) = upsert_athena_client(
&logging_pool,
SaveAthenaClientParams {
client_name: target.client_name.clone(),
description: target.description.clone(),
pg_uri: target.pg_uri.clone(),
pg_uri_env_var: target.pg_uri_env_var.clone(),
config_uri_template: target.config_uri_template.clone(),
source: "config".to_string(),
is_active: true,
is_frozen: false,
metadata: serde_json::json!({ "seeded_from": "config.yaml" }),
},
)
.await
{
tracing::warn!(
client = %target.client_name,
error = %err,
"Failed to sync config client into athena_clients"
);
}
}
if gateway_database_backed_client_loading_enabled {
match list_athena_clients(&logging_pool).await {
Ok(db_clients) => {
merge_athena_clients_from_records(
®istry,
db_clients,
&failed_config_client_keys,
)
.await;
}
Err(err) => {
tracing::warn!(
client = %logging_client,
error = %err,
"Failed to load athena_clients catalog; continuing with config-backed clients only"
);
}
}
} else {
tracing::info!("Database-backed client catalog loading is disabled via config");
}
let pool_for_client_stats: PgPool = logging_pool.clone();
let logging_client_for_stats: String = logging_client.clone();
let startup_refresh_mode =
if config.get_gateway_client_statistics_startup_refresh_full() {
ClientStatisticsRefreshMode::Full
} else {
ClientStatisticsRefreshMode::Fast
};
let startup_refresh_cutoff = config
.get_gateway_client_statistics_startup_refresh_lookback_days()
.and_then(|days| {
Utc::now().checked_sub_signed(ChronoDuration::days(i64::from(days)))
});
let startup_refresh_params = ClientStatisticsRefreshParams {
mode: startup_refresh_mode,
cutoff: startup_refresh_cutoff,
};
tokio::spawn(async move {
tracing::info!(
client = %logging_client_for_stats,
mode = ?startup_refresh_params.mode,
cutoff = ?startup_refresh_params.cutoff,
"Refreshing client_statistics / client_table_statistics in background (does not block API bind)"
);
if let Err(err) = refresh_client_statistics_with_params(
&pool_for_client_stats,
startup_refresh_params,
)
.await
{
tracing::warn!(
client = %logging_client_for_stats,
error = %err,
"Background refresh of client statistics failed"
);
}
});
} else {
tracing::warn!(
client = %logging_client,
"Logging client is not connected; database-backed client catalog is unavailable"
);
}
}
registry.sync_connection_status();
if let Some(logging_client) = logging_client_name.as_ref()
&& registry.get_pool(logging_client).is_some()
{
let logging_manager = ConnectionPoolManager::new(crate::client::config::PoolConfig {
max_connections: runtime_env.logging_pool_max_connections,
min_connections: runtime_env.pg_pool_min_connections,
connection_timeout: std::time::Duration::from_secs(
runtime_env.pg_pool_acquire_timeout_secs,
),
idle_timeout: std::time::Duration::from_secs(runtime_env.pg_pool_idle_timeout_secs),
})
.with_max_lifetime(std::time::Duration::from_secs(
runtime_env.pg_pool_max_lifetime_secs,
));
if let Err(err) = registry
.reconfigure_pool_manager(logging_client, logging_manager)
.await
{
tracing::warn!(
client = %logging_client,
error = %err,
"Failed to reconfigure logging pool with dedicated manager; using global pool config"
);
} else {
tracing::info!(
client = %logging_client,
max_connections = runtime_env.logging_pool_max_connections,
"Logging pool reconfigured with dedicated connection limit"
);
}
}
let pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>> =
match load_registry_from_path(pipelines_path) {
Ok(map) => {
tracing::info!(path = %pipelines_path, "Loaded pipeline registry");
Some(Arc::new(map))
}
Err(err) => {
tracing::warn!(
path = %pipelines_path,
error = %err,
"Failed to load pipelines registry"
);
None
}
};
let webhook_sink_registry: Option<Arc<HashMap<String, InboundWebhookSinkDefinition>>> =
if Path::new(DEFAULT_WEBHOOK_SINKS_PATH).exists() {
match load_inbound_sink_registry_from_path(DEFAULT_WEBHOOK_SINKS_PATH) {
Ok(map) => {
tracing::info!(
path = %DEFAULT_WEBHOOK_SINKS_PATH,
"Loaded inbound webhook sink registry"
);
Some(Arc::new(map))
}
Err(err) => {
tracing::warn!(
path = %DEFAULT_WEBHOOK_SINKS_PATH,
error = %err,
"Failed to load inbound webhook sink registry"
);
None
}
}
} else {
None
};
let insert_window_settings: InsertWindowSettings = InsertWindowSettings {
max_batch: config.get_gateway_insert_window_max_batch(),
max_queued: config.get_gateway_insert_window_max_queued(),
deny_tables: config.get_gateway_insert_merge_deny_tables(),
};
let insert_window_coordinator: Arc<InsertWindowCoordinator> =
InsertWindowCoordinator::new(insert_window_settings.clone());
let client_stats_batcher: Option<Arc<ClientStatsBatcher>> = logging_client_name
.as_ref()
.and_then(|name| registry.get_pool(name))
.map(|pool| {
Arc::new(ClientStatsBatcher::spawn(
pool,
ClientStatsBatcherConfig::default(),
))
});
let linux_gateway_file_log: Option<Arc<LinuxGatewayFileLog>> = LinuxGatewayFileLog::try_init(
config.get_gateway_linux_file_logging_enabled(),
config.get_gateway_linux_file_logging_dir_or_default(),
);
if linux_gateway_file_log.is_some() {
tracing::info!(
path = %config.get_gateway_linux_file_logging_dir_or_default(),
"Linux gateway file logging enabled (gateway_request.log, gateway_operation.log, api_key_auth.log as NDJSON)"
);
}
let gateway_log_batcher: Option<Arc<GatewayLogBatcher>> = if linux_gateway_file_log.is_some() {
None
} else {
logging_client_name
.as_ref()
.and_then(|name| registry.get_pool(name))
.map(|pool| Arc::new(GatewayLogBatcher::spawn(pool, client_stats_batcher.clone())))
};
let logging_task_limit: usize =
logging_task_limit_for_pool_max(runtime_env.logging_pool_max_connections as usize);
let logging_task_limiter: Option<Arc<Semaphore>> = logging_client_name
.as_ref()
.map(|_| Arc::new(Semaphore::new(logging_task_limit)));
let deferred_write_config = crate::deferred_write::DeferredWriteConfig {
enabled: config.get_deferred_writes_enabled(),
batch_window_ms: config.get_deferred_writes_batch_window_ms(),
batch_max_size: config.get_deferred_writes_batch_max_size(),
wal_enabled: config.get_deferred_writes_wal_enabled(),
wal_dir: config.get_deferred_writes_wal_dir(),
skip_cache_invalidation: config.get_deferred_writes_skip_cache_invalidation(),
};
let (write_buffer, wal_manager) = if deferred_write_config.enabled {
let wal: Option<Arc<crate::deferred_write::WalManager>> =
if deferred_write_config.wal_enabled {
match crate::deferred_write::WalManager::new(&deferred_write_config.wal_dir) {
Ok(mgr) => {
tracing::info!(
wal_dir = %deferred_write_config.wal_dir,
"deferred_write: WAL enabled"
);
Some(Arc::new(mgr))
}
Err(e) => {
tracing::warn!(
error = %e,
wal_dir = %deferred_write_config.wal_dir,
"deferred_write: failed to create WAL directory, WAL disabled"
);
None
}
}
} else {
None
};
let buf = Arc::new(crate::deferred_write::WriteBuffer::new(
deferred_write_config.batch_max_size,
));
if let Some(ref wal_mgr) = wal {
crate::deferred_write::recover_from_wal(wal_mgr, &buf).await;
}
(Some(buf), wal)
} else {
(None, None)
};
let inbound_rate_limit_storage = if config.get_gateway_rate_limit_inbound_storage_enabled() {
Some(build_keyed_limiter(
config.get_gateway_rate_limit_inbound_storage_per_second(),
config.get_gateway_rate_limit_inbound_storage_burst(),
))
} else {
None
};
let inbound_rate_limit_schema = if config.get_gateway_rate_limit_inbound_schema_enabled() {
Some(build_keyed_limiter(
config.get_gateway_rate_limit_inbound_schema_per_second(),
config.get_gateway_rate_limit_inbound_schema_burst(),
))
} else {
None
};
let inbound_rate_limit_raw_sql = if config.get_gateway_rate_limit_inbound_raw_sql_enabled() {
Some(build_keyed_limiter(
config.get_gateway_rate_limit_inbound_raw_sql_per_second(),
config.get_gateway_rate_limit_inbound_raw_sql_burst(),
))
} else {
None
};
let inbound_rate_limit_backup_admin =
if config.get_gateway_rate_limit_inbound_backup_admin_enabled() {
Some(build_keyed_limiter(
config.get_gateway_rate_limit_inbound_backup_admin_per_second(),
config.get_gateway_rate_limit_inbound_backup_admin_burst(),
))
} else {
None
};
let outbound_rate_limit_supabase = if config.get_gateway_rate_limit_outbound_supabase_enabled()
{
Some(build_keyed_limiter(
config.get_gateway_rate_limit_outbound_supabase_per_second(),
config.get_gateway_rate_limit_outbound_supabase_burst(),
))
} else {
None
};
let inbound_rate_limit_trust_x_forwarded_for: bool =
config.get_gateway_rate_limit_trust_x_forwarded_for();
let logging_trust_x_forwarded_for: bool = config.get_gateway_logging_trust_x_forwarded_for();
let cors_allow_any_origin = config.get_cors_allow_any_origin();
let cors_allowed_origins = config.get_cors_allowed_origins();
let runtime_debug_config_snapshot =
build_runtime_debug_config_snapshot(config, &runtime_config_metadata);
let pg_registry = Arc::new(registry);
let ws_hub = Arc::new(athena_wss::WsHub::new(512));
let mut excluded_chat_clients = Vec::new();
if let Some(client_name) = logging_client_name.as_ref() {
excluded_chat_clients.push(client_name.clone());
}
if let Some(client_name) = gateway_auth_client_name.as_ref() {
excluded_chat_clients.push(client_name.clone());
}
if let Some(client_name) = gateway_benchmark_client_name.as_ref() {
excluded_chat_clients.push(client_name.clone());
}
let chat_pool_resolver = Arc::new(AppChatPoolResolver::new(
pg_registry.clone(),
excluded_chat_clients,
));
let chat_publisher = Arc::new(WsHubPublisher::new(ws_hub.clone()));
let chat_app: Arc<dyn athena_chat::ChatApp> = Arc::new(athena_chat::SqlxChatApp::new(
chat_pool_resolver,
chat_publisher,
));
let app_state: Data<AppState> = Data::new(AppState {
cache,
immortal_cache,
client,
process_start_time_seconds: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64,
process_started_at: Instant::now(),
runtime_config_path: runtime_config_metadata.config_path.clone(),
runtime_config_source_label: runtime_config_metadata.source_label.clone(),
runtime_config_seeded_default: runtime_config_metadata.seeded_default,
runtime_debug_config_snapshot,
cors_allow_any_origin,
cors_allowed_origins,
pg_registry,
jdbc_pool_cache,
gateway_insert_relation_kind_cache: Arc::new(
Cache::builder()
.max_capacity(20_000)
.time_to_live(Duration::from_secs(60))
.build(),
),
storage_file_url_cache: Arc::new(Cache::builder().max_capacity(20_000).build()),
#[cfg(feature = "deadpool_experimental")]
deadpool_registry: Arc::new(deadpool_registry),
#[cfg(feature = "deadpool_experimental")]
jdbc_deadpool_cache,
gateway_force_camel_case_to_snake_case: config.get_gateway_force_camel_case_to_snake_case(),
gateway_auto_cast_uuid_filter_values_to_text: config
.get_gateway_auto_cast_uuid_filter_values_to_text(),
gateway_allow_schema_names_prefixed_as_table_name: config
.get_gateway_allow_schema_names_prefixed_as_table_name(),
pipeline_registry: pipeline_registry.clone(),
webhook_sink_registry: webhook_sink_registry.clone(),
logging_client_name,
gateway_auth_client_name,
gateway_benchmark_client_name,
gateway_database_backed_client_loading_enabled,
gateway_api_key_fail_mode: config.get_gateway_api_key_fail_mode(),
gateway_jdbc_allow_private_hosts: config.get_gateway_jdbc_allow_private_hosts(),
gateway_jdbc_allowed_hosts: config.get_gateway_jdbc_allowed_hosts(),
gateway_resilience_timeout_secs: config.get_gateway_resilience_timeout_secs(),
gateway_resilience_read_max_retries: config.get_gateway_resilience_read_max_retries(),
gateway_resilience_initial_backoff_ms: config.get_gateway_resilience_initial_backoff_ms(),
gateway_admission_store_backend: config.get_gateway_admission_store_backend(),
gateway_admission_store_fail_mode: config.get_gateway_admission_store_fail_mode(),
prometheus_metrics_enabled: config.get_prometheus_metrics_enabled(),
metrics_state: Arc::new(MetricsState::new()),
gateway_insert_execution_window_ms: config.get_gateway_insert_execution_window_ms(),
gateway_insert_window_max_batch: insert_window_settings.max_batch,
gateway_insert_window_max_queued: insert_window_settings.max_queued,
gateway_insert_merge_deny_tables: insert_window_settings.deny_tables.clone(),
insert_window_coordinator: insert_window_coordinator.clone(),
chat_app,
ws_hub,
auth_resolver: Arc::new(AuthResolver),
storage_facade: Arc::new(StorageFacade),
chat_metrics: Arc::new(ChatMetrics),
client_stats_batcher,
gateway_log_batcher,
linux_gateway_file_log,
logging_task_limiter,
deferred_write_config,
write_buffer: write_buffer.clone(),
wal_manager: wal_manager.clone(),
inbound_rate_limit_storage,
inbound_rate_limit_schema,
inbound_rate_limit_raw_sql,
inbound_rate_limit_backup_admin,
inbound_rate_limit_trust_x_forwarded_for,
logging_trust_x_forwarded_for,
outbound_rate_limit_supabase,
typesense_allow_http: config.get_typesense_allow_http(),
typesense_sync_worker_enabled: config.get_typesense_sync_worker_enabled(),
typesense_sync_worker_poll_ms: config.get_typesense_sync_worker_poll_ms(),
typesense_import_max_attempts: config.get_typesense_import_max_attempts(),
typesense_import_retry_base_ms: config.get_typesense_import_retry_base_ms(),
typesense_sync_saga_backup_enabled: config.get_typesense_sync_saga_backup_enabled(),
backup_worker_enabled: config.get_backup_worker_enabled(),
backup_execution_worker_poll_ms: config.get_backup_execution_worker_poll_ms(),
backup_schedule_worker_poll_ms: config.get_backup_schedule_worker_poll_ms(),
backup_worker_max_attempts: config.get_backup_worker_max_attempts(),
backup_worker_lease_ttl_minutes: config.get_backup_worker_lease_ttl_minutes(),
});
insert_window_coordinator.bind_app_state(app_state.clone());
if let Some(buf) = write_buffer {
let batch_window_ms = app_state.deferred_write_config.batch_window_ms;
crate::deferred_write::spawn_flush_loop(
buf,
wal_manager,
Arc::clone(&app_state.pg_registry),
Arc::clone(&app_state.cache),
batch_window_ms,
);
tracing::info!(batch_window_ms, "deferred_write: flush loop started");
}
Ok(Bootstrap {
app_state,
pipeline_registry,
})
}
/// Unit tests for the bootstrap helpers that can run without a live database.
#[cfg(test)]
mod tests {
    use super::{
        enforce_postgres_boot_policy, logging_task_limit_for_pool_max,
        summarize_postgres_boot_failures,
    };
    use crate::config::Config;
    use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};

    /// The limit is a quarter of the pool size and never drops below one.
    #[test]
    fn logging_task_limit_scales_with_pool_size() {
        assert_eq!(logging_task_limit_for_pool_max(0), 1);
        assert_eq!(logging_task_limit_for_pool_max(1), 1);
        assert_eq!(logging_task_limit_for_pool_max(4), 1);
        assert_eq!(logging_task_limit_for_pool_max(8), 2);
        assert_eq!(logging_task_limit_for_pool_max(40), 10);
    }

    /// Large pools never yield more than sixteen logging tasks.
    #[test]
    fn logging_task_limit_is_capped() {
        assert_eq!(logging_task_limit_for_pool_max(64), 16);
        assert_eq!(logging_task_limit_for_pool_max(400), 16);
    }

    /// With no configured targets the summary says so explicitly.
    #[test]
    fn summarize_postgres_boot_failures_mentions_missing_clients() {
        let summary = summarize_postgres_boot_failures(&[], &[], None);
        assert!(summary.contains("no `postgres_clients` entries were configured"));
    }

    /// An empty registry is a startup error unless the opt-in variable is set.
    #[test]
    fn enforce_postgres_boot_policy_errors_when_empty_by_default() {
        let _guard =
            crate::test_support::TestEnvGuard::unset("ATHENA_ALLOW_START_WITHOUT_POSTGRES");
        let config = Config::empty_fallback();
        let registry = PostgresClientRegistry::empty();
        let failed_connections = vec![(
            "athena_logging".to_string(),
            anyhow::anyhow!("environment variable `POSTGRES_ATHENA_LOGGING_URI` is not set"),
        )];
        let err = enforce_postgres_boot_policy(&config, &registry, &[], &failed_connections, None)
            .expect_err("expected empty Postgres registry to fail startup");
        let message = err.to_string();
        assert!(message.contains("No Postgres clients connected; refusing to start"));
        assert!(message.contains("POSTGRES_ATHENA_LOGGING_URI"));
    }

    /// Setting the opt-in variable to `true` lets an empty registry start.
    #[test]
    fn enforce_postgres_boot_policy_allows_explicit_opt_in() {
        let _guard =
            crate::test_support::TestEnvGuard::set("ATHENA_ALLOW_START_WITHOUT_POSTGRES", "true");
        let config = Config::empty_fallback();
        let registry = PostgresClientRegistry::empty();
        let targets = vec![ClientConnectionTarget {
            client_name: "athena_logging".to_string(),
            source: "config".to_string(),
            description: None,
            pg_uri: None,
            pg_uri_env_var: Some("POSTGRES_ATHENA_LOGGING_URI".to_string()),
            config_uri_template: Some("${POSTGRES_ATHENA_LOGGING_URI}".to_string()),
            is_active: true,
            is_frozen: false,
        }];
        enforce_postgres_boot_policy(&config, &registry, &targets, &[], None)
            .expect("explicit env opt-in should allow empty startup");
    }
}