use athena_pipelines::PipelineDefinition;
use deadpool_postgres::Pool;
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use sqlx::postgres::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{OnceCell, Semaphore};
use crate::api::chat::auth::AuthResolver;
use crate::api::chat::runtime::{ChatMetrics, StorageFacade};
use crate::api::metrics::MetricsState;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use crate::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use crate::deferred_write::{DeferredWriteConfig, WalManager, WriteBuffer};
use crate::utils::client_stats_batcher::ClientStatsBatcher;
use crate::utils::linux_gateway_file_log::LinuxGatewayFileLog;
use crate::utils::request_logging::GatewayLogBatcher;
// Public modules that make up this crate; each is defined in its own file or directory.
pub mod api;
pub mod athena;
pub mod bootstrap;
pub mod cli;
pub mod cli_sql;
pub mod client;
pub mod config;
pub mod config_validation;
pub mod daemon;
pub mod data;
pub mod drivers;
pub mod error;
pub mod parser;
pub mod provisioning;
pub mod runtime;
pub mod webhooks;
pub mod workers;
pub mod wss;
pub mod features;
pub mod deferred_write;
// Compiled only when the `cdc` Cargo feature is enabled.
#[cfg(feature = "cdc")]
pub mod cdc;
/// Shared application state: caches, database registries, gateway settings,
/// rate limiters, logging helpers, and background-worker configuration.
///
/// Field comments state only what the field types show. Anything that depends
/// on code outside this file is marked as an assumption to be confirmed.
pub struct AppState {
// --- Caches and HTTP client ---
/// String-keyed cache of JSON values.
pub cache: Arc<Cache<String, Value>>,
/// A second string-keyed cache of JSON values.
/// NOTE(review): the name suggests entries are not meant to expire — confirm.
pub immortal_cache: Arc<Cache<String, Value>>,
/// Shared `reqwest` HTTP client.
pub client: Client,
// --- Process and runtime-configuration metadata ---
/// Process start time in seconds.
/// NOTE(review): presumably a Unix timestamp — confirm against the code that fills it in.
pub process_start_time_seconds: i64,
/// `Instant` recorded for process start; suitable for elapsed-time measurement.
pub process_started_at: Instant,
/// Path of the runtime configuration, when one is known.
pub runtime_config_path: Option<String>,
/// Label describing where the runtime configuration came from, when known.
pub runtime_config_source_label: Option<String>,
/// NOTE(review): presumably true when a default configuration was seeded — confirm.
pub runtime_config_seeded_default: bool,
/// JSON snapshot of configuration kept for debugging.
pub runtime_debug_config_snapshot: Value,
// --- CORS ---
pub cors_allow_any_origin: bool,
pub cors_allowed_origins: Vec<String>,
// --- PostgreSQL access ---
/// Registry of PostgreSQL clients from the sqlx driver module.
pub pg_registry: Arc<PostgresClientRegistry>,
/// String-keyed cache of sqlx `PgPool`s.
/// NOTE(review): the key is presumably a JDBC/connection URL — confirm.
pub jdbc_pool_cache: Arc<Cache<String, PgPool>>,
/// String-keyed cache of optional relation-kind strings.
pub gateway_insert_relation_kind_cache: Arc<Cache<String, Option<String>>>,
/// String-keyed cache of presigned-URL cache records.
pub storage_file_url_cache: Arc<Cache<String, athena_s3::PresignedUrlCacheRecord>>,
/// Registry of deadpool-based PostgreSQL clients; present only with the
/// `deadpool_experimental` feature.
#[cfg(feature = "deadpool_experimental")]
pub deadpool_registry: Arc<DeadpoolPostgresRegistry>,
/// String-keyed cache of `OnceCell`-wrapped deadpool pools; present only with
/// the `deadpool_experimental` feature.
#[cfg(feature = "deadpool_experimental")]
pub jdbc_deadpool_cache: Arc<Cache<String, Arc<OnceCell<Pool>>>>,
// --- Gateway behaviour flags (their effect is implemented outside this file) ---
pub gateway_force_camel_case_to_snake_case: bool,
pub gateway_auto_cast_uuid_filter_values_to_text: bool,
pub gateway_allow_schema_names_prefixed_as_table_name: bool,
// --- Optional registries, keyed by string ---
/// Pipeline definitions by name, when loaded.
pub pipeline_registry: Option<Arc<HashMap<String, PipelineDefinition>>>,
/// Inbound webhook sink definitions by name, when loaded.
pub webhook_sink_registry:
Option<Arc<HashMap<String, athena_webhooks::InboundWebhookSinkDefinition>>>,
// --- Named clients used for specific purposes (all optional) ---
pub logging_client_name: Option<String>,
pub gateway_auth_client_name: Option<String>,
pub gateway_benchmark_client_name: Option<String>,
// --- Gateway client loading, auth, and JDBC host policy ---
pub gateway_database_backed_client_loading_enabled: bool,
/// Fail mode stored as a string.
/// NOTE(review): the accepted values are defined by the consumer of this field — confirm.
pub gateway_api_key_fail_mode: String,
pub gateway_jdbc_allow_private_hosts: bool,
pub gateway_jdbc_allowed_hosts: Vec<String>,
// --- Gateway resilience (timeout in seconds, retry count, backoff in milliseconds) ---
pub gateway_resilience_timeout_secs: u64,
pub gateway_resilience_read_max_retries: u32,
pub gateway_resilience_initial_backoff_ms: u64,
// --- Admission store (backend and fail mode are stored as strings) ---
pub gateway_admission_store_backend: String,
pub gateway_admission_store_fail_mode: String,
// --- Metrics ---
pub prometheus_metrics_enabled: bool,
pub metrics_state: Arc<MetricsState>,
// --- Insert windowing ---
pub gateway_insert_execution_window_ms: u64,
pub gateway_insert_window_max_batch: usize,
pub gateway_insert_window_max_queued: usize,
pub gateway_insert_merge_deny_tables: HashSet<String>,
/// Shared insert-window coordinator.
pub insert_window_coordinator: Arc<InsertWindowCoordinator>,
// --- Chat and WebSocket components ---
/// Chat application behind a trait object, so the implementation can vary.
pub chat_app: Arc<dyn athena_chat::ChatApp>,
pub ws_hub: Arc<athena_wss::WsHub>,
pub auth_resolver: Arc<AuthResolver>,
pub storage_facade: Arc<StorageFacade>,
pub chat_metrics: Arc<ChatMetrics>,
// --- Logging and statistics helpers (all optional) ---
pub client_stats_batcher: Option<Arc<ClientStatsBatcher>>,
pub gateway_log_batcher: Option<Arc<GatewayLogBatcher>>,
pub linux_gateway_file_log: Option<Arc<LinuxGatewayFileLog>>,
/// Optional semaphore.
/// NOTE(review): presumably bounds concurrently running logging tasks — confirm.
pub logging_task_limiter: Option<Arc<Semaphore>>,
// --- Deferred writes ---
pub deferred_write_config: DeferredWriteConfig,
pub write_buffer: Option<Arc<WriteBuffer>>,
pub wal_manager: Option<Arc<WalManager>>,
// --- Rate limiting (each limiter is optional) ---
pub inbound_rate_limit_storage: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
pub inbound_rate_limit_schema: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
pub inbound_rate_limit_raw_sql: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
pub inbound_rate_limit_backup_admin:
Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
pub inbound_rate_limit_trust_x_forwarded_for: bool,
pub logging_trust_x_forwarded_for: bool,
pub outbound_rate_limit_supabase: Option<Arc<crate::api::rate_limit::KeyedStringRateLimiter>>,
// --- Typesense integration ---
pub typesense_allow_http: bool,
pub typesense_sync_worker_enabled: bool,
pub typesense_sync_worker_poll_ms: u64,
pub typesense_import_max_attempts: usize,
pub typesense_import_retry_base_ms: u64,
pub typesense_sync_saga_backup_enabled: bool,
// --- Backup workers ---
/// Configuration switch for backup workers; `backup_workers_effectively_enabled`
/// combines it with the process-wide force-disable flag.
pub backup_worker_enabled: bool,
pub backup_execution_worker_poll_ms: u64,
pub backup_schedule_worker_poll_ms: u64,
pub backup_worker_max_attempts: i32,
pub backup_worker_lease_ttl_minutes: i32,
}
/// Process-wide override for backup workers. Starts as `false`; it is set to
/// `true` by `disable_backup_workers_for_process`, and nothing in the code
/// visible here sets it back.
static BACKUP_WORKERS_FORCED_DISABLED: AtomicBool = AtomicBool::new(false);
/// Reports whether backup workers should run in this process.
///
/// Returns `true` only when `state.backup_worker_enabled` is set and the
/// process-wide force-disable flag has not been raised. The flag is consulted
/// only when the configuration switch is on.
pub fn backup_workers_effectively_enabled(state: &AppState) -> bool {
    if !state.backup_worker_enabled {
        return false;
    }
    let forced_off = BACKUP_WORKERS_FORCED_DISABLED.load(Ordering::Relaxed);
    !forced_off
}
/// Raises the process-wide force-disable flag for backup workers.
///
/// After this call `backup_workers_effectively_enabled` returns `false`
/// regardless of `AppState::backup_worker_enabled`. Calling it more than once
/// has no additional effect.
pub fn disable_backup_workers_for_process() {
    let forced_off = true;
    BACKUP_WORKERS_FORCED_DISABLED.store(forced_off, Ordering::Relaxed);
}
impl Default for AppState {
fn default() -> Self {
let insert_window_coordinator: Arc<InsertWindowCoordinator> =
InsertWindowCoordinator::new(InsertWindowSettings {
max_batch: 100,
max_queued: 10_000,
deny_tables: HashSet::new(),
});
Self {
cache: Arc::new(Cache::builder().support_invalidation_closures().build()),
immortal_cache: Arc::new(Cache::builder().build()),
client: Client::new(),
process_start_time_seconds: 0,
process_started_at: Instant::now(),
runtime_config_path: None,
runtime_config_source_label: None,
runtime_config_seeded_default: false,
runtime_debug_config_snapshot: Value::Object(Default::default()),
cors_allow_any_origin: false,
cors_allowed_origins: Vec::new(),
pg_registry: Arc::new(PostgresClientRegistry::empty()),
jdbc_pool_cache: Arc::new(Cache::builder().max_capacity(64).build()),
gateway_insert_relation_kind_cache: Arc::new(
Cache::builder()
.max_capacity(20_000)
.time_to_live(Duration::from_secs(60))
.build(),
),
storage_file_url_cache: Arc::new(Cache::builder().max_capacity(20_000).build()),
#[cfg(feature = "deadpool_experimental")]
deadpool_registry: Arc::new(DeadpoolPostgresRegistry::empty()),
#[cfg(feature = "deadpool_experimental")]
jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
gateway_force_camel_case_to_snake_case: false,
gateway_auto_cast_uuid_filter_values_to_text: true,
gateway_allow_schema_names_prefixed_as_table_name: true,
pipeline_registry: None,
webhook_sink_registry: None,
logging_client_name: None,
gateway_auth_client_name: None,
gateway_benchmark_client_name: None,
gateway_database_backed_client_loading_enabled: true,
gateway_api_key_fail_mode: "fail_closed".to_string(),
gateway_jdbc_allow_private_hosts: false,
gateway_jdbc_allowed_hosts: Vec::new(),
gateway_resilience_timeout_secs: 30,
gateway_resilience_read_max_retries: 1,
gateway_resilience_initial_backoff_ms: 100,
gateway_admission_store_backend: "redis".to_string(),
gateway_admission_store_fail_mode: "fail_closed".to_string(),
prometheus_metrics_enabled: false,
metrics_state: Arc::new(MetricsState::new()),
gateway_insert_execution_window_ms: 0,
gateway_insert_window_max_batch: 100,
gateway_insert_window_max_queued: 10_000,
gateway_insert_merge_deny_tables: HashSet::new(),
insert_window_coordinator,
chat_app: Arc::new(athena_chat::NoopChatApp),
ws_hub: Arc::new(athena_wss::WsHub::new(256)),
auth_resolver: Arc::new(AuthResolver),
storage_facade: Arc::new(StorageFacade),
chat_metrics: Arc::new(ChatMetrics),
client_stats_batcher: None,
gateway_log_batcher: None,
linux_gateway_file_log: None,
logging_task_limiter: None,
deferred_write_config: DeferredWriteConfig::default(),
write_buffer: None,
wal_manager: None,
inbound_rate_limit_storage: None,
inbound_rate_limit_schema: None,
inbound_rate_limit_raw_sql: None,
inbound_rate_limit_backup_admin: None,
inbound_rate_limit_trust_x_forwarded_for: false,
logging_trust_x_forwarded_for: true,
outbound_rate_limit_supabase: None,
typesense_allow_http: false,
typesense_sync_worker_enabled: true,
typesense_sync_worker_poll_ms: 30_000,
typesense_import_max_attempts: 3,
typesense_import_retry_base_ms: 400,
typesense_sync_saga_backup_enabled: true,
backup_worker_enabled: true,
backup_execution_worker_poll_ms: 1_500,
backup_schedule_worker_poll_ms: 30_000,
backup_worker_max_attempts: 3,
backup_worker_lease_ttl_minutes: 15,
}
}
}
/// Wrapper around a shared, string-keyed cache of JSON values.
/// NOTE(review): presumably carries the same handle as `AppState::immortal_cache` — confirm against callers.
pub struct ImmortalCache {
/// The wrapped cache handle.
pub cache: Arc<Cache<String, Value>>,
}
pub mod utils;
// Public, but excluded from generated documentation by `#[doc(hidden)]`.
#[doc(hidden)]
pub mod test_support;
// Crate-root re-exports of items from the `client` module, so callers can
// import them directly from the crate root.
pub use client::AthenaClient;
pub use client::backend::BackendType;
pub use client::backend::QueryResult;
pub use client::builder::AthenaClientBuilder;
// Glob re-export: everything public in `client::storage` is exposed at the crate root.
pub use client::storage::*;
pub use client::{Condition, ConditionOperator, OrderDirection};
pub use client::{
Gateway, GatewayDeleteRequest, GatewayDriverRequest, GatewayFetchRequest, GatewayInsertRequest,
GatewayOperation, GatewayPath, GatewayQueryResult, GatewayRequest, GatewayRequestFactory,
GatewayRequestPayload, GatewayRoutes, GatewayRpcFilter, GatewayRpcFilterOperator,
GatewayRpcRequest, GatewaySqlRequest, GatewayUpdateRequest, RpcBuilder, build_gateway_endpoint,
request,
};
// Crate-root aliases for the path constants defined on `Gateway`.
/// Alias of `Gateway::FETCH_PATH`.
pub const GATEWAY_FETCH_PATH: &str = Gateway::FETCH_PATH;
/// Alias of `Gateway::INSERT_PATH`.
pub const GATEWAY_INSERT_PATH: &str = Gateway::INSERT_PATH;
/// Alias of `Gateway::UPDATE_PATH`.
pub const GATEWAY_UPDATE_PATH: &str = Gateway::UPDATE_PATH;
/// Alias of `Gateway::DELETE_PATH`.
pub const GATEWAY_DELETE_PATH: &str = Gateway::DELETE_PATH;
/// Alias of `Gateway::QUERY_PATH`.
pub const GATEWAY_QUERY_PATH: &str = Gateway::QUERY_PATH;
/// Alias of `Gateway::SQL_PATH`.
pub const GATEWAY_SQL_PATH: &str = Gateway::SQL_PATH;
/// Alias of `Gateway::RPC_PATH`.
pub const GATEWAY_RPC_PATH: &str = Gateway::RPC_PATH;
/// Alias of `Gateway::LEGACY_SQL_PATH`.
pub const LEGACY_SQL_PATH: &str = Gateway::LEGACY_SQL_PATH;