#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use actix_cors::Cors;
use actix_web::HttpMessage;
use actix_web::body::MessageBody;
use actix_web::dev::ServiceResponse;
use actix_web::http::StatusCode;
use actix_web::http::header;
use actix_web::middleware::{ErrorHandlerResponse, ErrorHandlers};
use actix_web::{App, HttpServer, web};
use sentry::types::Dsn as SentryDsn;
use actix_web::web::Data;
use clap::Parser;
use dotenv::dotenv;
use sentry::ClientInitGuard;
use serde_json::json;
use std::collections::HashMap;
use std::env;
use std::io::Error;
use std::io::ErrorKind::{AddrInUse, Other};
use std::io::IsTerminal;
use std::io::Result as IoResult;
use std::net::{SocketAddr, TcpListener};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread::available_parallelism;
use std::time::{Duration, Instant};
use tokio::sync::OnceCell;
use tracing::info;
use tracing::warn;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt::time::ChronoLocal;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uuid::Uuid;
use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type};
use athena_rs::AppState;
use athena_rs::api::gateway::deferred::{
gateway_deferred_requeue, gateway_deferred_status, spawn_deferred_query_worker,
};
use athena_rs::api::gateway::delete::delete_data;
use athena_rs::api::gateway::fetch::{
fetch_data_route, gateway_update_route, get_data_route, proxy_fetch_data_route,
};
use athena_rs::api::gateway::insert::insert_data;
use athena_rs::api::gateway::postgrest::{
postgrest_delete_route, postgrest_get_route, postgrest_patch_route, postgrest_post_route,
};
use athena_rs::api::gateway::query::gateway_query_route;
use athena_rs::api::gateway::rpc::{gateway_rpc_route, rpc_get_route, rpc_post_route};
use athena_rs::api::headers::request_context::ResolvedAthenaClient;
use athena_rs::api::health::{cluster_health, ping, root};
use athena_rs::api::host_routing::{DEFAULT_WILDCARD_TARGET_BASE_URL, infer_wildcard_host_prefix};
use athena_rs::api::metrics::prometheus_metrics;
use athena_rs::api::pipelines::{list_pipeline_templates, run_pipeline, simulate_pipeline};
use athena_rs::api::public_routes::public_route_dispatch;
use athena_rs::api::query::count::sql_count_query;
use athena_rs::api::query::sql::{gateway_sql_query, sql_query};
use athena_rs::api::registry::{api_registry, api_registry_by_id};
use athena_rs::api::response::{internal_error, not_found, too_many_requests};
use athena_rs::api::supabase::ssl_enforcement;
use athena_rs::api::{
admin, athena_docs, athena_openapi_host, athena_router_registry, athena_wss_openapi_host,
backup, management, provision, schema, storage,
};
use athena_rs::bootstrap::{Bootstrap, build_shared_state};
use athena_rs::cdc::websocket::websocket_server;
use athena_rs::cli::{self, AthenaCli, Command};
use athena_rs::config::{Config, ConfigLocation, DEFAULT_CONFIG_FILE_NAME};
use athena_rs::daemon::{
spawn_connection_monitor, spawn_registry_reconnect_worker, spawn_vacuum_health_collector,
};
use athena_rs::data::public_routes::get_public_gateway_route_by_key;
use athena_rs::parser::{parse_secs_or_default, parse_usize};
use athena_rs::utils::pg_tools::ensure_pg_tools;
use athena_rs::utils::redis_client::{
GLOBAL_REDIS, initialize_global_redis_from_env, note_redis_failure_and_start_cooldown,
note_redis_success, should_bypass_redis_temporarily,
};
use athena_rs::utils::request_logging::{RouteRequestLogContext, log_route_request_event};
use athena_rs::wss::gateway_wss_info;
/// Where admission-limiter counters are stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmissionStoreBackend {
    /// Counters live in this process only, so limits are per-instance.
    Memory,
    /// Counters live in Redis, so limits are shared across instances.
    Redis,
}
/// Behavior when the admission counter store (Redis) cannot be reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmissionStoreFailMode {
    /// Reject requests while the store is unavailable (requests get a 503).
    FailClosed,
    /// Admit requests unchecked while the store is unavailable.
    FailOpen,
}
/// Result of a single admission-limiter check for one request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmissionCheckOutcome {
    /// The request is within the configured limits.
    Allowed,
    /// A global or per-client window limit was exceeded.
    Limited,
    /// The backing store could not answer; fail mode decides what happens.
    StoreUnavailable,
}
/// Fixed-window request admission limiter applied in middleware.
///
/// Cloning is cheap: the in-memory window state is behind an `Arc`, so all
/// clones share the same counters.
#[derive(Debug, Clone)]
struct AdmissionLimiter {
    // Master switch; when false every check returns Allowed.
    enabled: bool,
    // Length of the counting window.
    window: Duration,
    // Max requests per window across all clients; 0 disables the global limit.
    global_limit_per_window: u64,
    // Max requests per window per client; 0 disables the per-client limit.
    per_client_limit_per_window: u64,
    // When true, limited query routes may hint callers toward deferred mode.
    defer_on_limit_enabled: bool,
    // Route prefixes eligible for the deferred-mode hint.
    defer_route_prefixes: Vec<String>,
    // Memory (per-instance) or Redis (cluster-wide) counters.
    store_backend: AdmissionStoreBackend,
    // Fail-open vs fail-closed when the store is unreachable.
    store_fail_mode: AdmissionStoreFailMode,
    // Shared in-memory window counters (used only by the Memory backend).
    state: Arc<Mutex<AdmissionLimiterWindow>>,
}
/// Mutable counters for the current fixed window (Memory backend only).
#[derive(Debug)]
struct AdmissionLimiterWindow {
    // When this window began; counters reset once `window` has elapsed.
    started_at: Instant,
    // Total admitted requests in this window across all clients.
    global_count: u64,
    // Admitted requests in this window keyed by client name.
    by_client: HashMap<String, u64>,
}
impl AdmissionLimiter {
    /// Builds a limiter with fresh (empty) in-memory window state.
    fn new(
        enabled: bool,
        window: Duration,
        global_limit_per_window: u64,
        per_client_limit_per_window: u64,
        defer_on_limit_enabled: bool,
        defer_route_prefixes: Vec<String>,
        store_backend: AdmissionStoreBackend,
        store_fail_mode: AdmissionStoreFailMode,
    ) -> Self {
        Self {
            enabled,
            window,
            global_limit_per_window,
            per_client_limit_per_window,
            defer_on_limit_enabled,
            defer_route_prefixes,
            store_backend,
            store_fail_mode,
            state: Arc::new(Mutex::new(AdmissionLimiterWindow {
                started_at: Instant::now(),
                global_count: 0,
                by_client: HashMap::new(),
            })),
        }
    }
    /// Whether admission limiting is active at all.
    fn is_enabled(&self) -> bool {
        self.enabled
    }
    /// Window length in whole seconds, floored to at least 1 so it is always
    /// usable as a Retry-After value and a Redis TTL.
    fn window_secs(&self) -> u64 {
        self.window.as_secs().max(1)
    }
    /// True when a limited request on `route` should be steered toward
    /// deferred mode (route matches one of the configured prefixes).
    fn should_defer_route(&self, route: &str) -> bool {
        if !self.defer_on_limit_enabled {
            return false;
        }
        let normalized_route = route.trim();
        if normalized_route.is_empty() {
            return false;
        }
        // Prefix match against each configured (trimmed, non-empty) prefix.
        self.defer_route_prefixes
            .iter()
            .map(|prefix| prefix.trim())
            .filter(|prefix| !prefix.is_empty())
            .any(|prefix| normalized_route.starts_with(prefix))
    }
    /// Whether requests should be admitted while the store is unreachable.
    fn fail_open_on_store_unavailable(&self) -> bool {
        self.store_fail_mode == AdmissionStoreFailMode::FailOpen
    }
    /// Checks (and, on success, counts) one request for `client`.
    fn check_allow(&self, client: Option<&str>) -> AdmissionCheckOutcome {
        if !self.enabled {
            return AdmissionCheckOutcome::Allowed;
        }
        match self.store_backend {
            AdmissionStoreBackend::Memory => {
                if self.memory_should_allow(client) {
                    AdmissionCheckOutcome::Allowed
                } else {
                    AdmissionCheckOutcome::Limited
                }
            }
            AdmissionStoreBackend::Redis => self.redis_should_allow(client),
        }
    }
    /// In-process fixed-window check. Counters are only incremented when the
    /// request is admitted, so rejected requests do not consume quota.
    fn memory_should_allow(&self, client: Option<&str>) -> bool {
        if !self.enabled {
            return true;
        }
        // A poisoned lock fails open rather than taking the gateway down.
        let Ok(mut guard) = self.state.lock() else {
            return true;
        };
        // Window expired: reset all counters and start a new window.
        if guard.started_at.elapsed() >= self.window {
            guard.started_at = Instant::now();
            guard.global_count = 0;
            guard.by_client.clear();
        }
        // Global cap first (0 means "no global cap").
        if self.global_limit_per_window > 0 && guard.global_count >= self.global_limit_per_window {
            return false;
        }
        if self.per_client_limit_per_window > 0 {
            // Missing/blank client names are pooled under "unknown".
            let client_key: String = client
                .map(str::trim)
                .filter(|value| !value.is_empty())
                .unwrap_or("unknown")
                .to_string();
            let current_client_count: u64 = guard.by_client.get(&client_key).copied().unwrap_or(0);
            if current_client_count >= self.per_client_limit_per_window {
                return false;
            }
            guard
                .by_client
                .insert(client_key, current_client_count.saturating_add(1));
        }
        guard.global_count = guard.global_count.saturating_add(1);
        true
    }
    /// Redis-backed fixed-window check using per-bucket counters keyed by
    /// `timestamp / window`. Counters are incremented before the limit is
    /// compared, so (unlike the memory path) rejected requests still consume
    /// quota — increment-then-compare is the usual Redis INCR pattern.
    fn redis_should_allow(&self, client: Option<&str>) -> AdmissionCheckOutcome {
        let Some(redis) = GLOBAL_REDIS.get() else {
            return AdmissionCheckOutcome::StoreUnavailable;
        };
        // Cooldown after recent Redis failures: skip Redis entirely.
        if should_bypass_redis_temporarily() {
            return AdmissionCheckOutcome::StoreUnavailable;
        }
        let client_key = client
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .unwrap_or("unknown")
            .to_ascii_lowercase();
        let window_secs = self.window_secs();
        // All instances agree on the bucket because it is derived from wall
        // clock time, not process start time.
        let bucket = chrono::Utc::now()
            .timestamp()
            .div_euclid(window_secs as i64);
        let global_key = format!("athena:admission:global:{bucket}");
        let client_key = format!("athena:admission:client:{client_key}:{bucket}");
        // NOTE(review): increment_counter_with_ttl_blocking presumably does
        // INCR + EXPIRE atomically — verify in utils::redis_client.
        let global_count = match redis.increment_counter_with_ttl_blocking(&global_key, window_secs)
        {
            Ok(value) => {
                note_redis_success();
                value
            }
            Err(_) => {
                note_redis_failure_and_start_cooldown();
                return AdmissionCheckOutcome::StoreUnavailable;
            }
        };
        if self.global_limit_per_window > 0 && global_count > self.global_limit_per_window {
            return AdmissionCheckOutcome::Limited;
        }
        let per_client_count =
            match redis.increment_counter_with_ttl_blocking(&client_key, window_secs) {
                Ok(value) => {
                    note_redis_success();
                    value
                }
                Err(_) => {
                    note_redis_failure_and_start_cooldown();
                    return AdmissionCheckOutcome::StoreUnavailable;
                }
            };
        if self.per_client_limit_per_window > 0
            && per_client_count > self.per_client_limit_per_window
        {
            return AdmissionCheckOutcome::Limited;
        }
        AdmissionCheckOutcome::Allowed
    }
}
/// Returns true when the caller explicitly opted into deferred handling via
/// the `X-Athena-Defer` request header.
///
/// Accepts `1`, `true`, or `yes` (case-insensitive, surrounding whitespace
/// ignored). The previous implementation only matched the exact spellings
/// `TRUE`/`YES` in upper case, silently rejecting `True`/`Yes`; matching
/// case-insensitively is a strict superset of the old behavior.
fn x_athena_defer_requested(req: &actix_web::dev::ServiceRequest) -> bool {
    req.headers()
        .get("X-Athena-Defer")
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .map(|value| {
            value == "1" || value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("yes")
        })
        .unwrap_or(false)
}
/// Idempotent DDL for the admission audit table plus its indexes; executed
/// lazily on first use via `ensure_admission_event_table`.
const ADMISSION_EVENT_TABLE_INIT_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS public.gateway_admission_events (
id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at timestamptz NOT NULL DEFAULT now(),
event_id text NOT NULL,
method text NOT NULL,
route text NOT NULL,
client_name text,
request_bytes bigint,
defer_requested boolean NOT NULL DEFAULT false,
is_query_route boolean NOT NULL DEFAULT false,
decision text NOT NULL,
reason text,
deferred_request_id text,
retry_after_seconds integer,
window_seconds integer,
global_limit_per_window bigint,
per_client_limit_per_window bigint,
meta jsonb NOT NULL DEFAULT '{}'::jsonb
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_gateway_admission_events_event_id
ON public.gateway_admission_events(event_id);
CREATE INDEX IF NOT EXISTS idx_gateway_admission_events_created_at
ON public.gateway_admission_events(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_admission_events_decision
ON public.gateway_admission_events(decision);
CREATE INDEX IF NOT EXISTS idx_gateway_admission_events_client_name
ON public.gateway_admission_events(client_name);
"#;
/// Ensures the DDL above is attempted at most once per process; a failed
/// attempt leaves the cell unset so a later call can retry.
static ADMISSION_EVENT_TABLE_INIT: OnceCell<()> = OnceCell::const_new();
/// Lazily creates the admission audit table on the logging client's database.
///
/// The DDL runs at most once per process (guarded by
/// `ADMISSION_EVENT_TABLE_INIT`); subsequent calls are no-ops. Returns a
/// human-readable error string when the logging client is missing,
/// disconnected, or the DDL fails.
async fn ensure_admission_event_table(state: &AppState) -> Result<(), String> {
    ADMISSION_EVENT_TABLE_INIT
        .get_or_try_init(|| async {
            let logging_client_name = state
                .logging_client_name
                .as_ref()
                .ok_or_else(|| "logging client is not configured".to_string())?;
            let pool = state
                .pg_registry
                .get_pool(logging_client_name)
                .ok_or_else(|| {
                    format!("logging client '{logging_client_name}' is not connected")
                })?;
            sqlx::query(ADMISSION_EVENT_TABLE_INIT_SQL)
                .execute(&pool)
                .await
                .map_err(|err| format!("failed to initialize admission event table: {err}"))?;
            Ok(())
        })
        .await
        .map(|_| ())
}
/// Converts a `u64` to `i64`, clamping values above `i64::MAX` instead of
/// wrapping — used for Postgres `bigint` columns, which are signed.
fn as_i64_saturating(value: u64) -> i64 {
    if value > i64::MAX as u64 {
        i64::MAX
    } else {
        value as i64
    }
}
/// Writes one admission decision to `public.gateway_admission_events` on the
/// logging client's database, creating the table first if needed.
///
/// Missing logging configuration is treated as "nothing to do" (`Ok(())`),
/// since auditing is best-effort; only an actual insert failure is an error.
/// The `.bind` calls below must stay in exactly the order of the `$1..$15`
/// placeholders in the INSERT statement.
async fn persist_admission_event(
    state: &AppState,
    limiter: &AdmissionLimiter,
    method: &str,
    route: &str,
    client_name: Option<&str>,
    request_bytes: Option<u64>,
    defer_requested: bool,
    is_query_route: bool,
    decision: &str,
    reason: &str,
    deferred_request_id: Option<&str>,
    retry_after_seconds: u64,
) -> Result<(), String> {
    ensure_admission_event_table(state).await?;
    // No logging client / pool: silently skip — auditing is optional.
    let Some(logging_client_name) = state.logging_client_name.as_ref() else {
        return Ok(());
    };
    let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
        return Ok(());
    };
    let event_id: String = Uuid::new_v4().to_string();
    // Narrow unsigned values into the signed/int column types, saturating
    // rather than wrapping on overflow.
    let request_bytes_i64: Option<i64> = request_bytes.map(as_i64_saturating);
    let retry_after_i32: i32 = retry_after_seconds.min(i32::MAX as u64) as i32;
    let window_i32: i32 = limiter.window_secs().min(i32::MAX as u64) as i32;
    let global_limit_i64: i64 = as_i64_saturating(limiter.global_limit_per_window);
    let per_client_limit_i64: i64 = as_i64_saturating(limiter.per_client_limit_per_window);
    sqlx::query(
        r#"
INSERT INTO public.gateway_admission_events (
event_id,
method,
route,
client_name,
request_bytes,
defer_requested,
is_query_route,
decision,
reason,
deferred_request_id,
retry_after_seconds,
window_seconds,
global_limit_per_window,
per_client_limit_per_window,
meta
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
"#,
    )
    .bind(event_id)
    .bind(method)
    .bind(route)
    .bind(client_name)
    .bind(request_bytes_i64)
    .bind(defer_requested)
    .bind(is_query_route)
    .bind(decision)
    .bind(reason)
    .bind(deferred_request_id)
    .bind(retry_after_i32)
    .bind(window_i32)
    .bind(global_limit_i64)
    .bind(per_client_limit_i64)
    .bind(json!({
        "defer_on_limit_enabled": limiter.defer_on_limit_enabled,
        "defer_route_prefixes": limiter.defer_route_prefixes,
    }))
    .execute(&pool)
    .await
    .map_err(|err| format!("failed to persist admission event: {err}"))?;
    Ok(())
}
/// Fire-and-forget persistence of an admission decision.
///
/// Spawns a detached tokio task so the request path never waits on the audit
/// insert; failures are logged with `warn!` and otherwise ignored. A missing
/// app state means there is nowhere to persist, so the call is a no-op.
fn spawn_admission_event_persist(
    app_state: Option<Data<AppState>>,
    limiter: AdmissionLimiter,
    method: String,
    route: String,
    client_name: Option<String>,
    request_bytes: Option<u64>,
    defer_requested: bool,
    is_query_route: bool,
    decision: &'static str,
    reason: &'static str,
    deferred_request_id: Option<String>,
    retry_after_seconds: u64,
) {
    let state = match app_state {
        Some(state) => state,
        None => return,
    };
    tokio::spawn(async move {
        let persist_result = persist_admission_event(
            state.get_ref(),
            &limiter,
            &method,
            &route,
            client_name.as_deref(),
            request_bytes,
            defer_requested,
            is_query_route,
            decision,
            reason,
            deferred_request_id.as_deref(),
            retry_after_seconds,
        )
        .await;
        if let Err(err) = persist_result {
            warn!(
                error = %err,
                method = %method,
                route = %route,
                decision = %decision,
                "Failed to persist admission event"
            );
        }
    });
}
/// Outcome of trying to map a wildcard Host header to an Athena client.
#[derive(Debug, Clone)]
struct WildcardHostResolution {
    // Host prefix extracted from the request, if the host matched the
    // wildcard pattern at all.
    prefix: Option<String>,
    // Client name the prefix resolved to, when resolution succeeded.
    resolved_client: Option<String>,
    // Machine-readable outcome tag (e.g. "resolved", "route_not_found").
    outcome: &'static str,
    // Lookup error text, only set for the "lookup_failed" outcome.
    error_message: Option<String>,
}
/// Resolves a wildcard-host request to an Athena client name.
///
/// When the Host header matches the wildcard pattern, the extracted prefix is
/// looked up as a public gateway route key; on success the client name is
/// stored in the request extensions as `ResolvedAthenaClient` for downstream
/// handlers. Every failure path is reported via the returned `outcome` tag
/// rather than an error, so callers can log uniformly.
async fn resolve_wildcard_host_client(
    req: &mut actix_web::dev::ServiceRequest,
    app_state: Option<&Data<AppState>>,
) -> WildcardHostResolution {
    // Small constructor for every path that did NOT resolve to a client.
    fn unresolved(
        prefix: Option<String>,
        outcome: &'static str,
        error_message: Option<String>,
    ) -> WildcardHostResolution {
        WildcardHostResolution {
            prefix,
            resolved_client: None,
            outcome,
            error_message,
        }
    }
    let Some(prefix) = infer_wildcard_host_prefix(req.headers()) else {
        return unresolved(None, "not_wildcard_host", None);
    };
    let Some(state) = app_state.map(Data::get_ref) else {
        return unresolved(Some(prefix), "missing_app_state", None);
    };
    let Some(logging_client_name) = state.logging_client_name.as_ref() else {
        return unresolved(Some(prefix), "missing_logging_client", None);
    };
    let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
        return unresolved(Some(prefix), "missing_logging_pool", None);
    };
    let route = match get_public_gateway_route_by_key(&pool, &prefix).await {
        Ok(Some(route)) => route,
        Ok(None) => return unresolved(Some(prefix), "route_not_found", None),
        Err(err) => {
            warn!(
                prefix = %prefix,
                error = %err,
                "Wildcard host client lookup failed"
            );
            return unresolved(Some(prefix), "lookup_failed", Some(err.to_string()));
        }
    };
    if !route.is_active {
        return unresolved(Some(prefix), "route_inactive", None);
    }
    let resolved_client = route.client_name.trim().to_string();
    if resolved_client.is_empty() {
        return unresolved(Some(prefix), "empty_client_name", None);
    }
    // Make the resolution visible to downstream extractors/handlers.
    req.extensions_mut()
        .insert(ResolvedAthenaClient(resolved_client.clone()));
    WildcardHostResolution {
        prefix: Some(prefix),
        resolved_client: Some(resolved_client),
        outcome: "resolved",
        error_message: None,
    }
}
async fn admission_and_wildcard_middleware<B>(
mut req: actix_web::dev::ServiceRequest,
next: actix_web::middleware::Next<B>,
limiter: AdmissionLimiter,
) -> Result<actix_web::dev::ServiceResponse<actix_web::body::BoxBody>, actix_web::Error>
where
B: MessageBody + 'static,
{
let metrics_state: Option<Data<AppState>> = req.app_data::<web::Data<AppState>>().cloned();
let method: String = req.method().as_str().to_string();
let route: String = req
.match_pattern()
.map(|value| value.to_string())
.unwrap_or_else(|| req.path().to_string());
let request_bytes: Option<u64> = req
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let host = req
.headers()
.get(header::HOST)
.and_then(|value| value.to_str().ok())
.map(str::to_string);
let athena_defer = x_athena_defer_requested(&req);
let started: Instant = Instant::now();
let wildcard_resolution = resolve_wildcard_host_client(&mut req, metrics_state.as_ref()).await;
if let Some(resolved_client) = wildcard_resolution.resolved_client.as_deref() {
info!(
host_pattern = %athena_rs::api::host_routing::wildcard_host_pattern(),
wildcard_target = %DEFAULT_WILDCARD_TARGET_BASE_URL,
resolved_client = %resolved_client,
"Resolved X-Athena-Client from wildcard host route"
);
}
if wildcard_resolution.prefix.is_some() {
log_route_request_event(
metrics_state.as_ref().map(Data::get_ref),
RouteRequestLogContext {
source: Some("wildcard_host".to_string()),
outcome: Some(wildcard_resolution.outcome.to_string()),
method: Some(method.clone()),
path: Some(route.clone()),
host,
wildcard_host_pattern: Some(athena_rs::api::host_routing::wildcard_host_pattern()),
wildcard_prefix: wildcard_resolution.prefix.clone(),
route_key: wildcard_resolution.prefix.clone(),
resolved_client: wildcard_resolution.resolved_client.clone(),
error_message: wildcard_resolution.error_message.clone(),
metadata: Some(json!({
"wildcard_target": DEFAULT_WILDCARD_TARGET_BASE_URL
})),
..Default::default()
},
);
}
let athena_client: Option<String> = req
.extensions()
.get::<ResolvedAthenaClient>()
.map(|value| value.0.clone())
.or_else(|| {
req.headers()
.get("X-Athena-Client")
.and_then(|value| value.to_str().ok())
.map(str::to_string)
});
if let Some(state) = &metrics_state {
state.metrics_state.begin_http_request(&method, &route);
}
let is_query_route: bool = route.starts_with("/gateway/query");
let limiter_for_check = limiter.clone();
let check = limiter_for_check.check_allow(athena_client.as_deref());
let mut allow_request = matches!(check, AdmissionCheckOutcome::Allowed);
let store_unavailable = matches!(check, AdmissionCheckOutcome::StoreUnavailable);
if store_unavailable {
allow_request = limiter_for_check.fail_open_on_store_unavailable();
}
if is_query_route && athena_defer {
allow_request = true;
}
if !allow_request {
let should_defer: bool = limiter.should_defer_route(&route);
let mut status_code: u16 = 429;
let decision: &'static str = "rejected";
let mut reason: &'static str = "admission_limit_exceeded";
let deferred_request_id_for_log: Option<String> = None;
let mut resp: actix_web::HttpResponse = if store_unavailable {
status_code = 503;
reason = "admission_store_unavailable";
actix_web::HttpResponse::ServiceUnavailable().json(json!({
"status": "error",
"code": "admission_store_unavailable",
"message": "Admission limiter unavailable",
"error": "Admission store backend is unavailable. Retry shortly."
}))
} else if is_query_route && should_defer {
reason = "gateway_query_requires_explicit_defer_header";
too_many_requests(
"Too Many Requests",
"Gateway query overload requires explicit deferred mode. Retry with header X-Athena-Defer: true to queue the query.",
)
} else {
too_many_requests(
"Too Many Requests",
"Request rate exceeded gateway admission limit",
)
};
let retry_after_seconds: String = limiter.window_secs().to_string();
let retry_after_seconds_u64: u64 = limiter.window_secs();
if let Ok(retry_after_value) = retry_after_seconds.parse() {
resp.headers_mut()
.insert(header::RETRY_AFTER, retry_after_value);
}
resp.headers_mut()
.insert(header::SERVER, "XYLEX/0".parse().unwrap());
spawn_admission_event_persist(
metrics_state.clone(),
limiter.clone(),
method.clone(),
route.clone(),
athena_client.clone(),
request_bytes,
athena_defer,
is_query_route,
decision,
reason,
deferred_request_id_for_log,
retry_after_seconds_u64,
);
if let Some(state) = metrics_state {
state.metrics_state.finish_http_request(
&method,
&route,
status_code,
started.elapsed().as_secs_f64(),
request_bytes,
None,
athena_client.as_deref(),
);
}
let response = req.into_response(resp).map_into_boxed_body();
return Ok(response);
}
match next.call(req).await {
Ok(mut res) => {
let response_bytes: Option<u64> = res
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
res.headers_mut()
.insert(header::SERVER, "XYLEX/0".parse().unwrap());
let res = res.map_into_boxed_body();
if let Some(state) = metrics_state {
state.metrics_state.finish_http_request(
&method,
&route,
res.status().as_u16(),
started.elapsed().as_secs_f64(),
request_bytes,
response_bytes,
athena_client.as_deref(),
);
}
Ok(res)
}
Err(err) => {
if let Some(state) = metrics_state {
state.metrics_state.record_http_handler_error(
&method,
&route,
started.elapsed().as_secs_f64(),
request_bytes,
athena_client.as_deref(),
);
}
Err(err)
}
}
}
/// Process entry point: loads env/config, initializes Sentry + tracing, then
/// dispatches the CLI subcommand (or boots the server with --api-only /
/// --cdc-only when no subcommand was given).
#[actix_web::main]
async fn main() -> IoResult<()> {
    dotenv().ok();
    initialize_global_redis_from_env();
    // Sentry must come up before tracing so the sentry tracing layer can be
    // attached conditionally.
    let sentry_guard: Option<ClientInitGuard> = init_sentry();
    init_tracing(sentry_guard.is_some());
    // Rebind so the guard stays alive for the whole process; dropping it
    // would shut the Sentry client down.
    let _sentry_guard: Option<ClientInitGuard> = sentry_guard;
    let cli: AthenaCli = AthenaCli::parse();
    let config_path: Option<PathBuf> = cli.config_path.clone();
    let pipelines_path: PathBuf = cli.pipelines_path.clone();
    let port_override: Option<u16> = cli.port;
    let command: Option<Command> = cli.command;
    let api_only: bool = cli.api_only;
    #[cfg(feature = "cdc")]
    let cdc_only: bool = cli.cdc_only;
    let config_overridden: bool = config_path.is_some();
    let config_path: PathBuf = config_path
        .clone()
        .unwrap_or_else(|| PathBuf::from(DEFAULT_CONFIG_FILE_NAME));
    let pipelines_path: String = pipelines_path.to_string_lossy().to_string();
    // Explicit --config paths must load exactly; otherwise fall back to the
    // default search (which may seed a fresh config file).
    let config: Config = if config_overridden {
        Config::load_from(&config_path).map_err(|err| {
            let attempted_locations: Vec<ConfigLocation> = vec![ConfigLocation::new(
                "explicit config path".to_string(),
                config_path.clone(),
            )];
            Error::new(
                Other,
                format!(
                    "Failed to load config '{}': {}. Looked in:\n{}",
                    config_path.display(),
                    err,
                    format_attempted_locations(&attempted_locations)
                ),
            )
        })?
    } else {
        match Config::load_default() {
            Ok(outcome) => {
                if outcome.seeded_default {
                    info!("Seeded default configuration at {}", outcome.path.display());
                }
                outcome.config
            }
            Err(err) => {
                let attempts: Vec<ConfigLocation> = err.attempted_locations.clone();
                return Err(Error::new(
                    Other,
                    format!(
                        "Failed to load config '{}': {}. Looked in:\n{}",
                        config_path.display(),
                        err,
                        format_attempted_locations(&attempts)
                    ),
                ));
            }
        }
    };
    // Subcommand dispatch; shared state is built lazily, only for commands
    // that need it.
    match command {
        Some(Command::Server) => {
            let bootstrap: Bootstrap = build_shared_state(&config, &pipelines_path)
                .await
                .map_err(|err| Error::new(Other, err.to_string()))?;
            run_server(&config, bootstrap, port_override).await
        }
        Some(Command::Pipeline(args)) => {
            let bootstrap: Bootstrap = build_shared_state(&config, &pipelines_path)
                .await
                .map_err(|err| Error::new(Other, err.to_string()))?;
            cli::run_pipeline_command(&bootstrap, args)
                .await
                .map_err(|err| Error::new(Other, err.to_string()))
        }
        Some(Command::Clients { command }) => {
            cli::run_clients_command(command).map_err(|err| Error::new(Other, err.to_string()))
        }
        Some(Command::Fetch(cmd)) => cli::run_fetch_command(cmd)
            .await
            .map_err(|err| Error::new(Other, err.to_string())),
        Some(Command::Provision(args)) => cli::run_provision_command(args)
            .await
            .map_err(|err| Error::new(Other, err.to_string())),
        #[cfg(feature = "cdc")]
        Some(Command::Cdc { command }) => cli::run_cdc_command(&config, command)
            .await
            .map_err(|err| Error::new(Other, err.to_string())),
        Some(Command::Diag) => {
            cli::run_diag_command().map_err(|err| Error::new(Other, err.to_string()))
        }
        Some(Command::Version) => {
            cli::run_version_command();
            Ok(())
        }
        None => {
            // No subcommand: behavior depends on the cdc feature and the
            // --api-only / --cdc-only flags.
            #[cfg(feature = "cdc")]
            {
                if cdc_only {
                    let port: u16 = port_override.unwrap_or(4053);
                    websocket_server(port)
                        .await
                        .map_err(|err| Error::new(Other, err.to_string()))
                } else if api_only {
                    let bootstrap: Bootstrap = build_shared_state(&config, &pipelines_path)
                        .await
                        .map_err(|err| Error::new(Other, err.to_string()))?;
                    run_server(&config, bootstrap, port_override).await
                } else {
                    Err(Error::new(
                        Other,
                        "No command provided; pass --api-only to boot the API, --cdc-only for the CDC WebSocket server, or specify a subcommand.",
                    ))
                }
            }
            #[cfg(not(feature = "cdc"))]
            {
                if api_only {
                    let bootstrap: Bootstrap = build_shared_state(&config, &pipelines_path)
                        .await
                        .map_err(|err| Error::new(Other, err.to_string()))?;
                    run_server(&config, bootstrap, port_override).await
                } else {
                    Err(Error::new(
                        Other,
                        "No command provided; pass --api-only to boot the API or specify a subcommand.",
                    ))
                }
            }
        }
    }
}
fn default_error_handlers<B>() -> ErrorHandlers<B>
where
B: MessageBody + 'static,
{
ErrorHandlers::new()
.handler(StatusCode::NOT_FOUND, |res: ServiceResponse<B>| {
let has_json_body: bool = res
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.is_some_and(|ct| ct.contains("application/json"));
if has_json_body {
Ok(ErrorHandlerResponse::Response(
res.map_into_boxed_body().map_into_right_body(),
))
} else {
let (req, _) = res.into_parts();
let resp: actix_web::HttpResponse =
not_found("Not Found", "The requested resource was not found");
Ok(ErrorHandlerResponse::Response(
ServiceResponse::new(req, resp)
.map_into_boxed_body()
.map_into_right_body(),
))
}
})
.default_handler_server(|res: ServiceResponse<B>| {
let has_json_body: bool = res
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.is_some_and(|ct| ct.contains("application/json"));
if has_json_body {
Ok(ErrorHandlerResponse::Response(
res.map_into_boxed_body().map_into_right_body(),
))
} else {
let (req, _) = res.into_parts();
let resp: actix_web::HttpResponse =
internal_error("Internal Server Error", "An unexpected error occurred");
Ok(ErrorHandlerResponse::Response(
ServiceResponse::new(req, resp)
.map_into_boxed_body()
.map_into_right_body(),
))
}
})
}
async fn run_server(
config: &Config,
bootstrap: Bootstrap,
port_override: Option<u16>,
) -> IoResult<()> {
let port: u16 = if let Some(port) = port_override {
port
} else {
config
.get_api()
.ok_or("No API port configured")
.and_then(|port_str| port_str.parse().map_err(|_| "Invalid port number"))
.expect("Failed to parse API port")
};
let keep_alive: Duration = parse_secs_or_default(config.get_http_keep_alive_secs(), 15);
let client_disconnect_timeout: Duration =
parse_secs_or_default(config.get_client_disconnect_timeout_secs(), 60);
let client_request_timeout =
parse_secs_or_default(config.get_client_request_timeout_secs(), 60);
let worker_count: usize = config
.get_http_workers()
.and_then(|v| parse_usize(v.as_str()))
.unwrap_or_else(|| available_parallelism().map(|n| n.get()).unwrap_or(4));
let max_connections: usize = config
.get_http_max_connections()
.and_then(|v| parse_usize(v.as_str()))
.unwrap_or(10_000);
let backlog: usize = config
.get_http_backlog()
.and_then(|v| parse_usize(v.as_str()))
.unwrap_or(2_048);
let tcp_keepalive: Duration = parse_secs_or_default(config.get_tcp_keepalive_secs(), 75);
let prometheus_metrics_enabled: bool = config.get_prometheus_metrics_enabled();
let admission_window_secs: u64 = config.get_gateway_admission_window_secs().max(1);
let admission_store_backend = match config.get_gateway_admission_store_backend().as_str() {
"memory" => AdmissionStoreBackend::Memory,
_ => AdmissionStoreBackend::Redis,
};
let admission_store_fail_mode = match config.get_gateway_admission_store_fail_mode().as_str() {
"fail_open" => AdmissionStoreFailMode::FailOpen,
_ => AdmissionStoreFailMode::FailClosed,
};
let admission_limiter: AdmissionLimiter = AdmissionLimiter::new(
config.get_gateway_admission_limit_enabled(),
Duration::from_secs(admission_window_secs),
config.get_gateway_admission_global_requests_per_window(),
config.get_gateway_admission_per_client_requests_per_window(),
config.get_gateway_admission_defer_on_limit_enabled(),
config.get_gateway_admission_defer_route_prefixes(),
admission_store_backend,
admission_store_fail_mode,
);
if admission_limiter.is_enabled() {
info!(
admission_window_secs = admission_limiter.window_secs(),
admission_global_requests_per_window =
config.get_gateway_admission_global_requests_per_window(),
admission_per_client_requests_per_window =
config.get_gateway_admission_per_client_requests_per_window(),
admission_defer_on_limit_enabled =
config.get_gateway_admission_defer_on_limit_enabled(),
admission_store_backend = ?admission_store_backend,
admission_store_fail_mode = ?admission_store_fail_mode,
"Gateway admission limiter enabled"
);
}
let addr: SocketAddr = SocketAddr::from(([0, 0, 0, 0], port));
let socket: Socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
socket.set_nonblocking(true)?;
socket.set_reuse_address(true)?;
socket.set_keepalive(true)?;
let keepalive_cfg: TcpKeepalive = TcpKeepalive::new().with_time(tcp_keepalive);
socket.set_tcp_keepalive(&keepalive_cfg)?;
socket.bind(&addr.into()).map_err(|e| {
let msg = if e.kind() == AddrInUse {
format!(
"Address already in use (port {}). Stop the process using this port or choose a different one (e.g. --port).",
port
)
} else {
format!("Failed to bind to {}: {}", addr, e)
};
Error::new(e.kind(), msg)
})?;
let listen_backlog: i32 = backlog.min(i32::MAX as usize) as i32;
socket
.listen(listen_backlog)
.map_err(|e| Error::new(e.kind(), format!("Failed to listen on {}: {}", addr, e)))?;
let listener: TcpListener = socket.into();
ensure_pg_tools()
.await
.map_err(|err| Error::new(Other, format!("PostgreSQL tools unavailable: {err}")))?;
let app_state: Data<AppState> = bootstrap.app_state.clone();
spawn_connection_monitor(app_state.clone());
spawn_vacuum_health_collector(app_state.clone());
spawn_registry_reconnect_worker(app_state.clone());
let deferred_worker_poll_ms: u64 = config.get_gateway_deferred_query_worker_poll_ms().max(100);
spawn_deferred_query_worker(
app_state.clone(),
config.get_gateway_deferred_query_worker_enabled(),
Duration::from_millis(deferred_worker_poll_ms),
);
warn!("CORS is temporarily disabled: allowing any origin");
HttpServer::new(move || {
let limiter: AdmissionLimiter = admission_limiter.clone();
let cors: Cors = Cors::default()
.allow_any_origin()
.allow_any_method()
.allow_any_header();
let mut app = App::new()
.wrap(default_error_handlers())
.wrap(cors)
.wrap(actix_web::middleware::from_fn(move |req, next| {
admission_and_wildcard_middleware(req, next, limiter.clone())
}))
.app_data(app_state.clone())
.service(root)
.service(ping)
.service(cluster_health)
.service(sql_query)
.service(gateway_sql_query)
.service(sql_count_query)
.service(fetch_data_route)
.service(get_data_route)
.service(proxy_fetch_data_route)
.service(gateway_update_route)
.service(gateway_query_route)
.service(gateway_rpc_route)
.service(insert_data)
.service(delete_data)
.service(rpc_get_route)
.service(rpc_post_route)
.service(postgrest_get_route)
.service(postgrest_post_route)
.service(postgrest_patch_route)
.service(postgrest_delete_route)
.service(run_pipeline)
.service(simulate_pipeline)
.service(list_pipeline_templates)
.service(athena_router_registry)
.service(athena_openapi_host)
.service(athena_wss_openapi_host)
.service(api_registry)
.service(athena_docs)
.service(api_registry_by_id)
.service(gateway_deferred_status)
.service(gateway_deferred_requeue)
.service(gateway_wss_info)
.service(public_route_dispatch)
.configure(admin::services)
.configure(backup::services)
.configure(provision::services)
.configure(management::services)
.configure(schema::services)
.configure(storage::services)
.service(ssl_enforcement);
if prometheus_metrics_enabled {
app = app.service(prometheus_metrics);
}
app
})
.workers(worker_count)
.keep_alive(keep_alive)
.client_disconnect_timeout(client_disconnect_timeout)
.client_request_timeout(client_request_timeout)
.max_connections(max_connections)
.backlog(backlog as u32)
.listen(listener)?
.run()
.await
}
fn init_tracing(enable_sentry_layer: bool) {
let filter: EnvFilter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let no_color: bool = env::var("NO_COLOR").is_ok();
let ansi: bool = !no_color
&& env::var("ATHENA_ANSI")
.ok()
.and_then(|v| match v.as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
.unwrap_or_else(|| std::io::stderr().is_terminal());
let fmt_layer: tracing_subscriber::fmt::Layer<
tracing_subscriber::layer::Layered<EnvFilter, tracing_subscriber::Registry>,
tracing_subscriber::fmt::format::DefaultFields,
tracing_subscriber::fmt::format::Format<tracing_subscriber::fmt::format::Full, ChronoLocal>,
> = tracing_subscriber::fmt::layer()
.with_ansi(ansi)
.with_level(true)
.with_target(true)
.with_timer(ChronoLocal::new("%H:%M:%S%.3f".to_string()));
let base: tracing_subscriber::layer::Layered<
tracing_subscriber::fmt::Layer<
tracing_subscriber::layer::Layered<EnvFilter, tracing_subscriber::Registry>,
tracing_subscriber::fmt::format::DefaultFields,
tracing_subscriber::fmt::format::Format<
tracing_subscriber::fmt::format::Full,
ChronoLocal,
>,
>,
tracing_subscriber::layer::Layered<EnvFilter, tracing_subscriber::Registry>,
> = tracing_subscriber::registry().with(filter).with(fmt_layer);
if enable_sentry_layer {
base.with(sentry_tracing::layer()).init();
} else {
base.init();
}
}
// Fallback DSN used when SENTRY_DSN is not set.
// NOTE(review): this is a credential hard-coded in source; consider moving it
// to deploy-time configuration and rotating the token.
const BETTERSTACK_SENTRY_DSN: &str =
    "https://mMR1Bs5K6vSzXT8YGYyUxQSE@s1741777.eu-fsn-3.betterstackdata.com/1741777";
/// Initializes Sentry and returns its guard (must be kept alive for the whole
/// process). Returns `None` when the DSN is explicitly blank or unparseable,
/// which disables Sentry.
///
/// Fix: `SENTRY_SAMPLE_RATE` was previously applied unvalidated; the Sentry
/// SDK documents `sample_rate` as a value in [0.0, 1.0], so out-of-range
/// values are now clamped and non-finite values (NaN/inf) are ignored.
fn init_sentry() -> Option<sentry::ClientInitGuard> {
    // Environment DSN wins; otherwise fall back to the baked-in default.
    let dsn_source: String =
        env::var("SENTRY_DSN").unwrap_or_else(|_| BETTERSTACK_SENTRY_DSN.to_string());
    // Setting SENTRY_DSN to an empty string disables Sentry entirely.
    if dsn_source.trim().is_empty() {
        return None;
    }
    let dsn: SentryDsn = match dsn_source.parse::<SentryDsn>() {
        Ok(dsn) => dsn,
        Err(err) => {
            // Tracing is not initialized yet at this point, so use stderr.
            eprintln!("failed to parse Sentry DSN: {err}");
            return None;
        }
    };
    let mut options: sentry::ClientOptions = sentry::ClientOptions {
        dsn: Some(dsn),
        release: Some(env!("CARGO_PKG_VERSION").into()),
        environment: env::var("SENTRY_ENVIRONMENT").ok().map(Into::into),
        attach_stacktrace: true,
        ..Default::default()
    };
    if let Ok(value) = env::var("SENTRY_SAMPLE_RATE")
        && let Ok(parsed) = value.parse::<f32>()
        && parsed.is_finite()
    {
        options.sample_rate = parsed.clamp(0.0, 1.0);
    }
    Some(sentry::init(options))
}
/// Renders the config search locations as a newline-separated bullet list for
/// error messages.
fn format_attempted_locations(locations: &[ConfigLocation]) -> String {
    let mut lines: Vec<String> = Vec::with_capacity(locations.len());
    for location in locations {
        lines.push(format!("- {}", location.describe()));
    }
    lines.join("\n")
}