#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use actix_cors::Cors;
use actix_web::HttpMessage;
use actix_web::body::MessageBody;
use actix_web::dev::ServiceResponse;
use actix_web::http::{StatusCode, header};
use actix_web::middleware::{DefaultHeaders, ErrorHandlerResponse, ErrorHandlers, Next};
use actix_web::{App, HttpServer, web};
#[cfg(feature = "sentry")]
use sentry::ClientInitGuard;
#[cfg(feature = "sentry")]
use sentry::types::Dsn as SentryDsn;
use actix_web::web::Data;
use clap::Parser;
use colored::Colorize;
use dotenv::dotenv;
use serde_json::json;
use serde_yaml;
use std::collections::HashMap;
use std::env;
use std::fs::OpenOptions;
use std::io::ErrorKind::{AddrInUse, Other};
use std::io::Result as IoResult;
use std::io::{Error, IsTerminal};
use std::net::{SocketAddr, TcpListener};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::Instrument as _;
use tracing::{info, warn};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::Registry;
use tracing_subscriber::fmt::time::ChronoLocal;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uuid::Uuid;
use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type};
use athena_rs::AppState;
use athena_rs::api::gateway::deferred::{gateway_deferred_requeue, gateway_deferred_status};
use athena_rs::api::gateway::delete::delete_data;
use athena_rs::api::gateway::fetch::{
fetch_data_route, gateway_update_route, get_data_route, proxy_fetch_data_route,
};
use athena_rs::api::gateway::flush::gateway_deferred_flush_route;
use athena_rs::api::gateway::insert::insert_data;
use athena_rs::api::gateway::postgrest::{
postgrest_delete_route, postgrest_get_route, postgrest_patch_route, postgrest_post_route,
};
use athena_rs::api::gateway::query::gateway_query_route;
use athena_rs::api::gateway::rpc::{gateway_rpc_route, rpc_get_route, rpc_post_route};
use athena_rs::api::headers::request_context::{
ResolvedAthenaClient, ResolvedAthenaClientSource, ResolvedAthenaRouteTargetUrl,
set_athena_request_id, set_athena_request_trace_id,
};
use athena_rs::api::headers::response_headers::apply_common_response_headers;
use athena_rs::api::health::{cluster_health, health, ping, root};
use athena_rs::api::host_routing::{
DEFAULT_WILDCARD_TARGET_BASE_URL, resolve_public_host_route, wildcard_host_pattern,
};
use athena_rs::api::metrics::prometheus_metrics;
use athena_rs::api::pipelines::{list_pipeline_templates, run_pipeline, simulate_pipeline};
use athena_rs::api::public_routes::public_route_dispatch;
use athena_rs::api::query::count::sql_count_query;
use athena_rs::api::query::sql::{gateway_sql_d1_migrate, gateway_sql_query, sql_query};
use athena_rs::api::registry::{api_registry, api_registry_by_id};
use athena_rs::api::response::{internal_error, not_found, too_many_requests};
use athena_rs::api::service_router::maybe_route_service_request;
use athena_rs::api::supabase::ssl_enforcement;
use athena_rs::api::{
admin, athena_docs, athena_error_catalog, athena_openapi_host, athena_router_registry,
athena_wss_openapi_host, billing, chat, configure_optional_operator_services, daemons, debug,
management, schema, storage, typesense, webhook_sinks,
};
use athena_rs::bootstrap::{Bootstrap, RuntimeConfigMetadata, build_shared_state};
use athena_rs::cdc::websocket::websocket_server;
use athena_rs::cli::{self, AthenaCli, Command};
use athena_rs::config::{Config, ConfigLoadError, ConfigLocation, DEFAULT_CONFIG_FILE_NAME};
use athena_rs::config_validation::initialize_runtime_env_settings;
use athena_rs::data::public_routes::{PublicGatewayRouteRecord, get_public_gateway_route_by_key};
#[cfg(feature = "tokio-console")]
use athena_rs::features::tokio_console;
use athena_rs::utils::best_effort_pg_backoff::{
best_effort_pg_write_backoff_active, maybe_activate_best_effort_pg_write_backoff,
};
use athena_rs::utils::logging_task_limiter::spawn_best_effort_logging_task;
use athena_rs::utils::pg_tools::ensure_pg_tools;
use athena_rs::utils::redis_client::{
GLOBAL_REDIS, initialize_global_redis_from_env, note_redis_failure_and_start_cooldown,
note_redis_success, should_bypass_redis_temporarily,
};
use athena_rs::utils::request_logging::{RouteRequestLogContext, log_route_request_event};
use athena_rs::wss::{gateway_wss_info, gateway_wss_route};
mod tooling_hints;
/// Selects where `AdmissionLimiter::check_allow` keeps its request counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmissionStoreBackend {
/// Counters live in the limiter's own mutex-guarded `AdmissionLimiterWindow`.
Memory,
/// Counters are incremented through the `GLOBAL_REDIS` client.
Redis,
}
/// Policy flag consulted via `AdmissionLimiter::fail_open_on_store_unavailable`.
///
/// NOTE(review): the code that acts on this flag is outside this section; presumably
/// `FailOpen` admits requests when the counter store is unavailable and `FailClosed`
/// rejects them — confirm against the middleware.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmissionStoreFailMode {
FailClosed,
FailOpen,
}
/// Result of a single `AdmissionLimiter::check_allow` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdmissionCheckOutcome {
/// The request is within the configured limits (or the limiter is disabled).
Allowed,
/// A global or per-client limit for the current window was reached.
Limited,
/// The Redis backend could not be consulted: no global client, a temporary bypass is
/// active, or a counter increment failed.
StoreUnavailable,
}
/// Backoff domain key passed to the best-effort Postgres write backoff helpers when
/// persisting admission events.
const ADMISSION_EVENT_BACKOFF_DOMAIN: &str = "gateway_admission_events_pg_logging";
/// Fixed-window request admission limiter with a global and a per-client budget.
///
/// Cloning shares the in-memory counters because `state` is an `Arc`.
#[derive(Debug, Clone)]
struct AdmissionLimiter {
/// When `false`, `check_allow` always returns `Allowed`.
enabled: bool,
/// Length of one counting window. The Redis path uses whole seconds, minimum 1.
window: Duration,
/// Maximum admitted requests per window across all clients; `0` disables the check.
global_limit_per_window: u64,
/// Maximum admitted requests per window for one client; `0` disables the check.
per_client_limit_per_window: u64,
/// Master switch for `should_defer_route`.
defer_on_limit_enabled: bool,
/// Route prefixes for which `should_defer_route` returns `true`.
defer_route_prefixes: Vec<String>,
/// Which counter store `check_allow` consults.
store_backend: AdmissionStoreBackend,
/// Policy reported by `fail_open_on_store_unavailable`.
store_fail_mode: AdmissionStoreFailMode,
/// Counters for the in-memory backend.
state: Arc<Mutex<AdmissionLimiterWindow>>,
}
/// Counters for the current in-memory admission window.
#[derive(Debug)]
struct AdmissionLimiterWindow {
/// When the current window began; counters reset once `window` has elapsed since then.
started_at: Instant,
/// Requests admitted in the current window across all clients.
global_count: u64,
/// Requests admitted in the current window, keyed by trimmed client name
/// (`"unknown"` when the name is absent or blank).
by_client: HashMap<String, u64>,
}
impl AdmissionLimiter {
    /// Builds a limiter whose in-memory window starts now with all counters at zero.
    fn new(
        enabled: bool,
        window: Duration,
        global_limit_per_window: u64,
        per_client_limit_per_window: u64,
        defer_on_limit_enabled: bool,
        defer_route_prefixes: Vec<String>,
        store_backend: AdmissionStoreBackend,
        store_fail_mode: AdmissionStoreFailMode,
    ) -> Self {
        let state = Arc::new(Mutex::new(AdmissionLimiterWindow {
            started_at: Instant::now(),
            global_count: 0,
            by_client: HashMap::new(),
        }));
        Self {
            enabled,
            window,
            global_limit_per_window,
            per_client_limit_per_window,
            defer_on_limit_enabled,
            defer_route_prefixes,
            store_backend,
            store_fail_mode,
            state,
        }
    }

    /// Returns whether admission control is switched on.
    fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Window length in whole seconds, never less than one.
    fn window_secs(&self) -> u64 {
        match self.window.as_secs() {
            0 => 1,
            secs => secs,
        }
    }

    /// Returns `true` when deferral is enabled and `route` (trimmed) starts with any
    /// non-blank configured prefix.
    fn should_defer_route(&self, route: &str) -> bool {
        let route = route.trim();
        if !self.defer_on_limit_enabled || route.is_empty() {
            return false;
        }
        for prefix in &self.defer_route_prefixes {
            let prefix = prefix.trim();
            if !prefix.is_empty() && route.starts_with(prefix) {
                return true;
            }
        }
        false
    }

    /// Returns `true` when the configured store failure mode is `FailOpen`.
    fn fail_open_on_store_unavailable(&self) -> bool {
        matches!(self.store_fail_mode, AdmissionStoreFailMode::FailOpen)
    }

    /// Counts one request for `client` against the configured backend and reports the
    /// admission decision. A disabled limiter always allows.
    fn check_allow(&self, client: Option<&str>) -> AdmissionCheckOutcome {
        if !self.enabled {
            return AdmissionCheckOutcome::Allowed;
        }
        match self.store_backend {
            AdmissionStoreBackend::Redis => self.redis_should_allow(client),
            AdmissionStoreBackend::Memory if self.memory_should_allow(client) => {
                AdmissionCheckOutcome::Allowed
            }
            AdmissionStoreBackend::Memory => AdmissionCheckOutcome::Limited,
        }
    }

    /// Trimmed client name, or `"unknown"` when it is absent or blank.
    fn normalized_client(client: Option<&str>) -> &str {
        match client.map(str::trim) {
            Some(name) if !name.is_empty() => name,
            _ => "unknown",
        }
    }

    /// In-memory fixed-window check. Returns `true` when the request is admitted.
    ///
    /// A poisoned mutex admits the request rather than rejecting it.
    fn memory_should_allow(&self, client: Option<&str>) -> bool {
        if !self.enabled {
            return true;
        }
        let mut counters = match self.state.lock() {
            Ok(guard) => guard,
            Err(_) => return true,
        };

        // Start a fresh window once the previous one has fully elapsed.
        if counters.started_at.elapsed() >= self.window {
            counters.started_at = Instant::now();
            counters.global_count = 0;
            counters.by_client.clear();
        }

        let global_exhausted = self.global_limit_per_window > 0
            && counters.global_count >= self.global_limit_per_window;
        if global_exhausted {
            return false;
        }

        if self.per_client_limit_per_window > 0 {
            // NOTE(review): keys here are case-sensitive, while the Redis backend
            // lowercases the client name — confirm this difference is intended.
            let key = Self::normalized_client(client).to_string();
            let seen = counters.by_client.entry(key).or_insert(0);
            if *seen >= self.per_client_limit_per_window {
                return false;
            }
            *seen = seen.saturating_add(1);
        }

        counters.global_count = counters.global_count.saturating_add(1);
        true
    }

    /// Redis-backed fixed-window check using one counter per window bucket.
    ///
    /// The global counter is incremented first; the per-client counter is only touched
    /// when the global limit has not been exceeded. Any Redis problem yields
    /// `StoreUnavailable`.
    fn redis_should_allow(&self, client: Option<&str>) -> AdmissionCheckOutcome {
        let Some(redis) = GLOBAL_REDIS.get() else {
            return AdmissionCheckOutcome::StoreUnavailable;
        };
        if should_bypass_redis_temporarily() {
            return AdmissionCheckOutcome::StoreUnavailable;
        }

        let client_name: String = Self::normalized_client(client).to_ascii_lowercase();
        let window_secs: u64 = self.window_secs();
        // Bucket index = seconds since the epoch divided by the window length.
        let bucket: i64 = chrono::Utc::now()
            .timestamp()
            .div_euclid(window_secs as i64);
        let global_key: String = format!("athena:admission:global:{bucket}");
        let per_client_key: String = format!("athena:admission:client:{client_name}:{bucket}");

        let global_count: u64 =
            match redis.increment_counter_with_ttl_blocking(&global_key, window_secs) {
                Err(_) => {
                    note_redis_failure_and_start_cooldown();
                    return AdmissionCheckOutcome::StoreUnavailable;
                }
                Ok(count) => {
                    note_redis_success();
                    count
                }
            };
        let global_exceeded =
            self.global_limit_per_window > 0 && global_count > self.global_limit_per_window;
        if global_exceeded {
            return AdmissionCheckOutcome::Limited;
        }

        let per_client_count: u64 =
            match redis.increment_counter_with_ttl_blocking(&per_client_key, window_secs) {
                Err(_) => {
                    note_redis_failure_and_start_cooldown();
                    return AdmissionCheckOutcome::StoreUnavailable;
                }
                Ok(count) => {
                    note_redis_success();
                    count
                }
            };
        let client_exceeded = self.per_client_limit_per_window > 0
            && per_client_count > self.per_client_limit_per_window;
        if client_exceeded {
            AdmissionCheckOutcome::Limited
        } else {
            AdmissionCheckOutcome::Allowed
        }
    }
}
/// Returns `true` when the request carries an `X-Athena-Defer` header with a truthy
/// value.
///
/// The value is trimmed and compared ASCII-case-insensitively against `1`, `true` and
/// `yes`, so `True` and `Yes` are honoured as well as the all-lower and all-upper
/// spellings. A missing header, a non-UTF-8 value, or any other value yields `false`.
fn x_athena_defer_requested(req: &actix_web::dev::ServiceRequest) -> bool {
    const TRUTHY_VALUES: [&str; 3] = ["1", "true", "yes"];
    req.headers()
        .get("X-Athena-Defer")
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        .is_some_and(|value| {
            TRUTHY_VALUES
                .iter()
                .any(|truthy| value.eq_ignore_ascii_case(truthy))
        })
}
// Process-wide list of tracing-appender worker guards.
// NOTE(review): presumably kept here so the non-blocking file writers stay alive for the
// whole process lifetime — confirm against the tracing setup code.
static FILE_LOG_GUARDS: Mutex<Vec<WorkerGuard>> = Mutex::new(Vec::new());
// Log directory candidates and the environment variable name for the tracing log directory.
// NOTE(review): how the primary, fallback and env override are chosen is not visible here.
const PRIMARY_TRACING_LOG_DIR: &str = "/var/log/athena";
const FALLBACK_TRACING_LOG_DIR: &str = "./var/log/athena";
const TRACING_LOG_DIR_ENV: &str = "ATHENA_TRACING_LOG_DIR";
// File names for the success and error log files.
const SUCCESS_LOG_FILE_NAME: &str = "success.log";
const ERROR_LOG_FILE_NAME: &str = "error.log";
// `tokio_unstable` is a custom cfg the compiler does not know about, so the
// `unexpected_cfgs` lint is silenced for this one check.
#[allow(unexpected_cfgs)]
// True when the binary was compiled with `--cfg tokio_unstable`.
const TOKIO_UNSTABLE_ENABLED: bool = cfg!(tokio_unstable);
// Tracing filter directives: `info` overall, `warn` for `sqlx_core::pool::inner`.
// NOTE(review): presumably the fallback when no filter is configured — confirm.
const DEFAULT_TRACING_FILTER: &str = "info,sqlx_core::pool::inner=warn";
/// Snapshot of the terminal environment produced by `detect_terminal_context`.
#[derive(Debug, Clone)]
struct TerminalContext {
/// Human-readable terminal name, or `"unknown"` when no marker matched.
terminal_name: String,
/// File name of `$SHELL` (or `%COMSPEC%`), or `"unknown"`.
shell_name: String,
/// Whether stderr is attached to a terminal.
stderr_is_tty: bool,
/// Whether ANSI escape sequences should be emitted on the console.
ansi_enabled: bool,
/// The environment marker that decided `terminal_name`.
hint: String,
}
/// Configuration provenance details shown in the startup banner and startup log event.
#[derive(Debug, Clone)]
struct StartupConfigContext {
/// Label shown as "config source".
source_label: String,
/// Path shown as "config file".
config_path: PathBuf,
/// Shown as "config seeded default" (`yes`/`no`).
seeded_default: bool,
/// Outcome of the supplemental `.env` handling, shown as "config .env".
supplemental_dotenv_status: SupplementalDotenvStatus,
/// Selects the ANSI-colored banner when `true`, the plain banner otherwise.
terminal_ansi_enabled: bool,
}
/// Outcome of handling the supplemental `.env` file, together with the path involved.
#[derive(Debug, Clone)]
enum SupplementalDotenvStatus {
    NotFound { expected_path: PathBuf },
    Loaded { path: PathBuf },
    Failed { path: PathBuf },
}

impl SupplementalDotenvStatus {
    /// The path this status refers to, whichever variant it is.
    fn path(&self) -> &Path {
        let location: &PathBuf = match self {
            Self::NotFound { expected_path } => expected_path,
            Self::Loaded { path } => path,
            Self::Failed { path } => path,
        };
        location
    }

    /// Short human-readable label for the variant.
    fn status_label(&self) -> &'static str {
        match self {
            Self::Loaded { .. } => "loaded",
            Self::Failed { .. } => "parse failed",
            Self::NotFound { .. } => "not found",
        }
    }

    /// Renders `<path> (<status label>)` for the startup banner.
    fn display_value(&self) -> String {
        let location = self.path().display();
        let label = self.status_label();
        format!("{location} ({label})")
    }
}
/// Everything reported at server startup by `print_server_startup_banner` and
/// `log_server_startup_summary`.
#[derive(Debug, Clone)]
struct ServerStartupSummary {
version: &'static str,
/// Build profile label printed next to the version.
build_profile: &'static str,
/// Current working directory; reported as `"unknown"` when `None`.
working_directory: Option<PathBuf>,
config_context: StartupConfigContext,
listen_addr: SocketAddr,
worker_count: usize,
max_connections: usize,
backlog: usize,
// The three durations below are reported in whole seconds.
keep_alive: Duration,
client_disconnect_timeout: Duration,
client_request_timeout: Duration,
daemon_enabled: bool,
prometheus_enabled: bool,
// Optional client names; reported as `"none"` when absent.
logging_client_name: Option<String>,
gateway_auth_client_name: Option<String>,
gateway_benchmark_client_name: Option<String>,
}
/// Converts `value` to `i64`, clamping anything above `i64::MAX` to `i64::MAX`.
fn as_i64_saturating(value: u64) -> i64 {
    match i64::try_from(value) {
        Ok(converted) => converted,
        Err(_) => i64::MAX,
    }
}
/// Decides whether ANSI escape sequences should be written to the console.
///
/// Precedence:
/// 1. `NO_COLOR` set to a non-empty value disables ANSI. Per the NO_COLOR convention an
///    empty value does not count as set, and a non-Unicode value still does.
/// 2. `ATHENA_ANSI` set to `1`/`true`/`yes` enables it; `0`/`false`/`no` disables it.
/// 3. Otherwise ANSI is enabled exactly when stderr is a terminal.
fn resolve_ansi_enabled_for_console() -> bool {
    // `var_os` is used so a non-Unicode value is still recognised as "set".
    let no_color: bool = env::var_os("NO_COLOR").is_some_and(|value| !value.is_empty());
    if no_color {
        return false;
    }
    env::var("ATHENA_ANSI")
        .ok()
        .and_then(|v| match v.as_str() {
            "1" | "true" | "yes" => Some(true),
            "0" | "false" | "no" => Some(false),
            _ => None,
        })
        .unwrap_or_else(|| std::io::stderr().is_terminal())
}
/// Inspects stderr and well-known environment variables to describe the terminal the
/// process is running in.
///
/// Markers are checked in a fixed priority order and the first match wins. When none
/// match, the terminal name is `"unknown"`.
fn detect_terminal_context() -> TerminalContext {
    let stderr_is_tty: bool = std::io::stderr().is_terminal();
    let ansi_enabled: bool = resolve_ansi_enabled_for_console();

    // Shell: file name of $SHELL, falling back to %COMSPEC%.
    let shell_name: String = env::var("SHELL")
        .or_else(|_| env::var("COMSPEC"))
        .ok()
        .and_then(|raw| {
            Path::new(&raw)
                .file_name()
                .map(|name| name.to_string_lossy().into_owned())
        })
        .unwrap_or_else(|| String::from("unknown"));

    let term_program: Option<String> = env::var("TERM_PROGRAM").ok();
    let term: Option<String> = env::var("TERM").ok();
    let is_set = |name: &str| env::var(name).is_ok();
    let program_is = |expected: &str| {
        term_program
            .as_deref()
            .is_some_and(|value| value.eq_ignore_ascii_case(expected))
    };
    let term_contains = |needle: &str| term.as_deref().is_some_and(|value| value.contains(needle));

    let (terminal_name, hint): (&str, &str) = if is_set("WT_SESSION") {
        ("Windows Terminal", "WT_SESSION")
    } else if program_is("vscode") {
        ("VS Code Integrated Terminal", "TERM_PROGRAM=vscode")
    } else if program_is("Apple_Terminal") {
        ("Apple Terminal", "TERM_PROGRAM=Apple_Terminal")
    } else if program_is("iTerm.app") {
        ("iTerm2", "TERM_PROGRAM=iTerm.app")
    } else if is_set("ALACRITTY_WINDOW_ID") {
        ("Alacritty", "ALACRITTY_WINDOW_ID")
    } else if is_set("KITTY_PID") {
        ("kitty", "KITTY_PID")
    } else if term.as_deref() == Some("xterm-kitty") {
        ("kitty", "TERM=xterm-kitty")
    } else if is_set("ConEmuPID") {
        ("ConEmu", "ConEmuPID")
    } else if term_contains("xterm") {
        ("xterm-compatible terminal", "TERM contains xterm")
    } else if term_contains("screen") {
        ("screen/tmux-compatible terminal", "TERM contains screen")
    } else {
        ("unknown", "no terminal markers detected")
    };

    TerminalContext {
        terminal_name: terminal_name.to_string(),
        shell_name,
        stderr_is_tty,
        ansi_enabled,
        hint: hint.to_string(),
    }
}
/// Returns `"debug"` when compiled with debug assertions, `"release"` otherwise.
fn build_profile_name() -> &'static str {
    match cfg!(debug_assertions) {
        true => "debug",
        false => "release",
    }
}
/// Returns the contained string, or `"none"` when `value` is `None`.
fn option_display(value: Option<&str>) -> &str {
    match value {
        Some(text) => text,
        None => "none",
    }
}
/// Writes the human-readable startup banner to stderr, one `label: value` line per
/// setting.
///
/// When `summary.config_context.terminal_ansi_enabled` is `false` the banner is plain
/// text. Otherwise labels are dimmed and selected values are colored with ANSI escapes.
fn print_server_startup_banner(summary: &ServerStartupSummary) {
    const RESET: &str = "\x1b[0m";
    const BOLD: &str = "\x1b[1m";
    const DIM: &str = "\x1b[2m";
    const CYAN: &str = "\x1b[36m";
    const YELLOW: &str = "\x1b[33m";
    const GREEN: &str = "\x1b[32m";

    fn toggle_label(flag: bool) -> &'static str {
        if flag { "enabled" } else { "disabled" }
    }

    let config = &summary.config_context;
    let config_seeded: &str = if config.seeded_default { "yes" } else { "no" };
    let cwd_display: String = match summary.working_directory.as_ref() {
        Some(dir) => dir.display().to_string(),
        None => "unknown".to_string(),
    };

    // (label, rendered value, color used for the value in ANSI mode).
    let rows: Vec<(&str, String, Option<&str>)> = vec![
        (
            "version",
            format!("{} ({})", summary.version, summary.build_profile),
            Some(YELLOW),
        ),
        ("config source", config.source_label.to_string(), Some(CYAN)),
        (
            "config file",
            config.config_path.display().to_string(),
            Some(YELLOW),
        ),
        ("config seeded default", config_seeded.to_string(), None),
        (
            "config .env",
            config.supplemental_dotenv_status.display_value(),
            Some(YELLOW),
        ),
        ("working directory", cwd_display, Some(YELLOW)),
        ("listen", summary.listen_addr.to_string(), Some(CYAN)),
        ("http workers", summary.worker_count.to_string(), None),
        (
            "http max connections",
            summary.max_connections.to_string(),
            None,
        ),
        ("http backlog", summary.backlog.to_string(), None),
        (
            "http keep-alive",
            format!("{}s", summary.keep_alive.as_secs()),
            None,
        ),
        (
            "client disconnect timeout",
            format!("{}s", summary.client_disconnect_timeout.as_secs()),
            None,
        ),
        (
            "client request timeout",
            format!("{}s", summary.client_request_timeout.as_secs()),
            None,
        ),
        (
            "daemon mode",
            toggle_label(summary.daemon_enabled).to_string(),
            None,
        ),
        (
            "prometheus metrics",
            toggle_label(summary.prometheus_enabled).to_string(),
            None,
        ),
        (
            "gateway auth client",
            option_display(summary.gateway_auth_client_name.as_deref()).to_string(),
            None,
        ),
        (
            "gateway logging client",
            option_display(summary.logging_client_name.as_deref()).to_string(),
            None,
        ),
        (
            "gateway benchmark client",
            option_display(summary.gateway_benchmark_client_name.as_deref()).to_string(),
            None,
        ),
    ];

    if !config.terminal_ansi_enabled {
        eprintln!("=== Athena server starting ===");
        for (label, value, _) in &rows {
            eprintln!("{label}: {value}");
        }
        return;
    }

    eprintln!("{BOLD}{GREEN}=== Athena server starting ==={RESET}");
    for (label, value, color) in &rows {
        match color {
            Some(color) => eprintln!("{DIM}{label}:{RESET} {color}{value}{RESET}"),
            None => eprintln!("{DIM}{label}:{RESET} {value}"),
        }
    }
}
fn log_server_startup_summary(summary: &ServerStartupSummary) {
let cwd_display: String = summary
.working_directory
.as_ref()
.map(|path| path.display().to_string())
.unwrap_or_else(|| "unknown".to_string());
info!(
version = summary.version,
build_profile = summary.build_profile,
config_source = %summary.config_context.source_label,
config_path = %summary.config_context.config_path.display(),
config_seeded_default = summary.config_context.seeded_default,
config_dotenv_path = %summary.config_context.supplemental_dotenv_status.path().display(),
config_dotenv_status = summary.config_context.supplemental_dotenv_status.status_label(),
working_directory = %cwd_display,
listen_addr = %summary.listen_addr,
http_workers = summary.worker_count,
http_max_connections = summary.max_connections,
http_backlog = summary.backlog,
http_keep_alive_secs = summary.keep_alive.as_secs(),
client_disconnect_timeout_secs = summary.client_disconnect_timeout.as_secs(),
client_request_timeout_secs = summary.client_request_timeout.as_secs(),
daemon_enabled = summary.daemon_enabled,
prometheus_enabled = summary.prometheus_enabled,
gateway_auth_client = option_display(summary.gateway_auth_client_name.as_deref()),
gateway_logging_client = option_display(summary.logging_client_name.as_deref()),
gateway_benchmark_client = option_display(summary.gateway_benchmark_client_name.as_deref()),
"Athena server startup summary"
);
}
/// Serializes the parsed configuration to YAML and writes it to stderr.
///
/// With `ansi_enabled` the dump is lightly syntax-colored line by line: comments dimmed,
/// list items green behind a magenta dash, keys blue and scalar values green. Without it
/// the YAML is printed verbatim. A serialization failure is logged as a warning and
/// nothing is printed.
///
/// NOTE(review): the whole config is written to stderr as serialized; whether it can
/// contain secrets depends on `Config`'s `Serialize` impl — confirm.
fn print_colored_boot_config_dump(config: &Config, config_path: &Path, ansi_enabled: bool) {
    const RESET: &str = "\x1b[0m";
    const BOLD: &str = "\x1b[1m";
    const DIM: &str = "\x1b[2m";
    const CYAN: &str = "\x1b[36m";
    const BLUE: &str = "\x1b[34m";
    const GREEN: &str = "\x1b[32m";
    const MAGENTA: &str = "\x1b[35m";
    const YELLOW: &str = "\x1b[33m";

    let rendered: Result<String, _> = serde_yaml::to_string(config);
    let yaml: String = match rendered {
        Err(err) => {
            warn!(error = %err, "Failed to serialize parsed config for startup dump");
            return;
        }
        Ok(text) => text,
    };

    if !ansi_enabled {
        eprintln!("=== Athena parsed configuration ===");
        eprintln!("source: {}", config_path.display());
        eprintln!("{yaml}");
        return;
    }

    eprintln!(
        "{BOLD}{CYAN}=== Athena parsed configuration ==={RESET}\n{DIM}source:{RESET} {YELLOW}{}{RESET}",
        config_path.display()
    );
    for line in yaml.lines() {
        // Split the line into its leading whitespace and the remaining content.
        let content: &str = line.trim_start();
        let (indent, _) = line.split_at(line.len() - content.len());

        if content.starts_with('#') {
            eprintln!("{DIM}{line}{RESET}");
        } else if let Some(item) = content.strip_prefix("- ") {
            eprintln!("{indent}{MAGENTA}-{RESET} {GREEN}{item}{RESET}");
        } else if let Some((key, value)) = content.split_once(':') {
            if value.trim().is_empty() {
                eprintln!("{indent}{BLUE}{key}{RESET}:");
            } else {
                eprintln!("{indent}{BLUE}{key}{RESET}:{GREEN}{value}{RESET}");
            }
        } else {
            eprintln!("{line}");
        }
    }
}
/// Inserts one row describing an admission decision into
/// `public.gateway_admission_events` using the logging client's Postgres pool.
///
/// The write is best-effort. It returns `Ok(())` without writing when the backoff for
/// this domain is active, when no logging client is configured, or when that client has
/// no pool. Unsigned values are clamped to the signed column types before binding.
///
/// # Errors
/// Returns a formatted message when the `INSERT` itself fails.
async fn persist_admission_event(
    state: &AppState,
    limiter: &AdmissionLimiter,
    method: &str,
    route: &str,
    client_name: Option<&str>,
    request_bytes: Option<u64>,
    defer_requested: bool,
    is_query_route: bool,
    decision: &str,
    reason: &str,
    deferred_request_id: Option<&str>,
    retry_after_seconds: u64,
) -> Result<(), String> {
    if best_effort_pg_write_backoff_active(ADMISSION_EVENT_BACKOFF_DOMAIN) {
        return Ok(());
    }
    let Some(pool) = state
        .logging_client_name
        .as_ref()
        .and_then(|logging_client| state.pg_registry.get_pool(logging_client))
    else {
        return Ok(());
    };

    let event_id: String = Uuid::new_v4().to_string();
    let request_bytes_i64: Option<i64> = request_bytes.map(as_i64_saturating);
    let retry_after_i32: i32 = i32::try_from(retry_after_seconds).unwrap_or(i32::MAX);
    let window_i32: i32 = i32::try_from(limiter.window_secs()).unwrap_or(i32::MAX);
    let global_limit_i64: i64 = as_i64_saturating(limiter.global_limit_per_window);
    let per_client_limit_i64: i64 = as_i64_saturating(limiter.per_client_limit_per_window);
    let meta = json!({
        "defer_on_limit_enabled": limiter.defer_on_limit_enabled,
        "defer_route_prefixes": limiter.defer_route_prefixes,
    });

    // Bind order must match the column list in the statement.
    sqlx::query(
        r#"
INSERT INTO public.gateway_admission_events (
event_id,
method,
route,
client_name,
request_bytes,
defer_requested,
is_query_route,
decision,
reason,
deferred_request_id,
retry_after_seconds,
window_seconds,
global_limit_per_window,
per_client_limit_per_window,
meta
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
"#,
    )
    .bind(event_id)
    .bind(method)
    .bind(route)
    .bind(client_name)
    .bind(request_bytes_i64)
    .bind(defer_requested)
    .bind(is_query_route)
    .bind(decision)
    .bind(reason)
    .bind(deferred_request_id)
    .bind(retry_after_i32)
    .bind(window_i32)
    .bind(global_limit_i64)
    .bind(per_client_limit_i64)
    .bind(meta)
    .execute(&pool)
    .await
    .map(|_| ())
    .map_err(|err| format!("failed to persist admission event: {err}"))
}
/// Schedules `persist_admission_event` as a best-effort background logging task.
///
/// Nothing is spawned when there is no application state or while the Postgres write
/// backoff for admission events is active. If the write fails, the error is offered to
/// the backoff helper; a warning is logged only when that helper returns `None`.
fn spawn_admission_event_persist(
    app_state: Option<Data<AppState>>,
    limiter: AdmissionLimiter,
    method: String,
    route: String,
    client_name: Option<String>,
    request_bytes: Option<u64>,
    defer_requested: bool,
    is_query_route: bool,
    decision: &'static str,
    reason: &'static str,
    deferred_request_id: Option<String>,
    retry_after_seconds: u64,
) {
    let state = match app_state {
        Some(state) => state,
        None => return,
    };
    if best_effort_pg_write_backoff_active(ADMISSION_EVENT_BACKOFF_DOMAIN) {
        return;
    }

    let task_limiter: Option<Arc<tokio::sync::Semaphore>> = state.logging_task_limiter.clone();
    let task = async move {
        let outcome = persist_admission_event(
            state.get_ref(),
            &limiter,
            &method,
            &route,
            client_name.as_deref(),
            request_bytes,
            defer_requested,
            is_query_route,
            decision,
            reason,
            deferred_request_id.as_deref(),
            retry_after_seconds,
        )
        .await;
        let Err(err) = outcome else {
            return;
        };
        if maybe_activate_best_effort_pg_write_backoff(
            ADMISSION_EVENT_BACKOFF_DOMAIN,
            "admission_event_persist",
            &err,
        )
        .is_none()
        {
            warn!(
                error = %err,
                method = %method,
                route = %route,
                decision = %decision,
                "Failed to persist admission event"
            );
        }
    };
    spawn_best_effort_logging_task(task_limiter, "admission_event_persist", task);
}
/// Result of `resolve_wildcard_host_client`: what the request's host matched and which
/// client it was mapped to.
#[derive(Debug, Clone)]
struct WildcardHostResolution {
// Host-route details; all `None` when the request did not match a public host route.
matched_host: Option<String>,
route_key: Option<String>,
tenant_ref: Option<String>,
service_key: Option<String>,
major_version: Option<String>,
/// Client name chosen for the request: the stored route's client name when resolved,
/// the route key on the fallback paths, `None` otherwise.
resolved_client: Option<String>,
/// Non-blank target URL from the stored route; only set on the `"resolved"` outcome.
target_url: Option<String>,
/// Machine-readable outcome label, e.g. `"resolved"` or `"not_wildcard_host"`.
outcome: &'static str,
/// Lookup error text; only set when the route lookup failed.
error_message: Option<String>,
}
/// Records the wildcard-host client resolution in the request extensions.
///
/// Two values are inserted: the resolved client name, and a
/// `ResolvedAthenaClientSource` describing the matched host route and the outcome label.
fn set_wildcard_host_client_context(
    req: &mut actix_web::dev::ServiceRequest,
    host_route: &athena_rs::api::host_routing::PublicHostRoute,
    resolved_client: &str,
    outcome: &'static str,
) {
    let client = ResolvedAthenaClient(resolved_client.to_string());
    let source = ResolvedAthenaClientSource {
        source: "wildcard_host".to_string(),
        outcome: outcome.to_string(),
        wildcard_host_pattern: Some(wildcard_host_pattern()),
        matched_host: Some(host_route.matched_host.clone()),
        // The route key is stored both as the wildcard prefix and as the route key.
        wildcard_prefix: Some(host_route.route_key.clone()),
        route_key: Some(host_route.route_key.clone()),
        tenant_ref: Some(host_route.tenant_ref.clone()),
        service_key: host_route.service_key.clone(),
        major_version: host_route.major_version.clone(),
    };

    let mut extensions = req.extensions_mut();
    extensions.insert(client);
    extensions.insert(source);
}
async fn resolve_wildcard_host_client(
req: &mut actix_web::dev::ServiceRequest,
app_state: Option<&Data<AppState>>,
) -> WildcardHostResolution {
let Some(host_route) = resolve_public_host_route(req.headers()) else {
return WildcardHostResolution {
matched_host: None,
route_key: None,
tenant_ref: None,
service_key: None,
major_version: None,
resolved_client: None,
target_url: None,
outcome: "not_wildcard_host",
error_message: None,
};
};
let route_key = host_route.route_key.clone();
let Some(state) = app_state.map(Data::get_ref) else {
let resolved_client = route_key.clone();
set_wildcard_host_client_context(
req,
&host_route,
&resolved_client,
"client_name_fallback_missing_app_state",
);
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key.clone()),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client.to_string()),
target_url: None,
outcome: "client_name_fallback_missing_app_state",
error_message: None,
};
};
let Some(logging_client_name) = state.logging_client_name.as_ref() else {
let resolved_client = route_key.clone();
set_wildcard_host_client_context(
req,
&host_route,
&resolved_client,
"client_name_fallback_missing_logging_client",
);
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key.clone()),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client.to_string()),
target_url: None,
outcome: "client_name_fallback_missing_logging_client",
error_message: None,
};
};
let Some(pool) = state.pg_registry.get_pool(logging_client_name) else {
let resolved_client = route_key.clone();
set_wildcard_host_client_context(
req,
&host_route,
&resolved_client,
"client_name_fallback_missing_logging_pool",
);
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key.clone()),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client.to_string()),
target_url: None,
outcome: "client_name_fallback_missing_logging_pool",
error_message: None,
};
};
let route_lookup: Result<Option<PublicGatewayRouteRecord>, sqlx::Error> =
get_public_gateway_route_by_key(&pool, &route_key).await;
let route: PublicGatewayRouteRecord = match route_lookup {
Ok(Some(route)) => route,
Ok(None) => {
let resolved_client = route_key.clone();
set_wildcard_host_client_context(
req,
&host_route,
&resolved_client,
"client_name_fallback",
);
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key.clone()),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client.to_string()),
target_url: None,
outcome: "client_name_fallback",
error_message: None,
};
}
Err(err) => {
warn!(
route_key = %route_key,
error = %err,
"Wildcard host client lookup failed"
);
let resolved_client = route_key.clone();
set_wildcard_host_client_context(
req,
&host_route,
&resolved_client,
"client_name_fallback_route_lookup_failed",
);
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key.clone()),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client.to_string()),
target_url: None,
outcome: "client_name_fallback_route_lookup_failed",
error_message: Some(err.to_string()),
};
}
};
if !route.is_active {
let resolved_client: String = route_key.clone();
set_wildcard_host_client_context(
req,
&host_route,
&resolved_client,
"client_name_fallback_route_inactive",
);
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key.clone()),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client.to_string()),
target_url: None,
outcome: "client_name_fallback_route_inactive",
error_message: None,
};
}
let resolved_client: String = route.client_name.trim().to_string();
if resolved_client.is_empty() {
return WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(route_key),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: None,
target_url: None,
outcome: "empty_client_name",
error_message: None,
};
}
set_wildcard_host_client_context(req, &host_route, &resolved_client, "resolved");
let target_url = route
.target_url
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string);
if let Some(target_url) = target_url.as_ref() {
req.extensions_mut()
.insert(ResolvedAthenaRouteTargetUrl(target_url.clone()));
}
WildcardHostResolution {
matched_host: Some(host_route.matched_host),
route_key: Some(host_route.route_key),
tenant_ref: Some(host_route.tenant_ref),
service_key: host_route.service_key,
major_version: host_route.major_version,
resolved_client: Some(resolved_client),
target_url,
outcome: "resolved",
error_message: None,
}
}
/// Per-request middleware that tags the request with an id, resolves wildcard-host
/// routing, optionally answers the request via `maybe_route_service_request`, applies
/// the admission limiter, and records HTTP metrics around the downstream handler.
///
/// Every response produced here (routed, rejected, or downstream) gets a `Server`
/// header and the common response headers. The body is always returned boxed.
///
/// # Errors
/// Propagates errors from `maybe_route_service_request` and from `next.call`; the
/// latter is also reported through `record_http_handler_error` when app state exists.
async fn admission_and_wildcard_middleware<B>(
mut req: actix_web::dev::ServiceRequest,
next: actix_web::middleware::Next<B>,
limiter: AdmissionLimiter,
) -> Result<actix_web::dev::ServiceResponse<actix_web::body::BoxBody>, actix_web::Error>
where
B: MessageBody + 'static,
{
// App state is optional: when it is not registered, all metrics calls below are skipped.
let metrics_state: Option<Data<AppState>> = req.app_data::<web::Data<AppState>>().cloned();
// A fresh UUID v4 is used for both the request id and the trace id.
let request_id: String = Uuid::new_v4().to_string();
set_athena_request_id(req.request(), request_id.clone());
set_athena_request_trace_id(req.request(), request_id.clone());
let method: String = req.method().as_str().to_string();
// Prefer the matched route pattern; fall back to the raw path when no pattern matched.
let incoming_route: String = req
.match_pattern()
.map(|value| value.to_string())
.unwrap_or_else(|| req.path().to_string());
// Declared request size; `None` when Content-Length is absent or not a valid u64.
let request_bytes: Option<u64> = req
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let host = req
.headers()
.get(header::HOST)
.and_then(|value| value.to_str().ok())
.map(str::to_string);
let athena_defer: bool = x_athena_defer_requested(&req);
// Timer started before wildcard resolution, so reported durations include it.
let started: Instant = Instant::now();
let wildcard_resolution: WildcardHostResolution =
resolve_wildcard_host_client(&mut req, metrics_state.as_ref()).await;
if let Some(resolved_client) = wildcard_resolution.resolved_client.as_deref() {
info!(
host_pattern = %wildcard_host_pattern(),
wildcard_target = %DEFAULT_WILDCARD_TARGET_BASE_URL,
resolved_client = %resolved_client,
"Resolved X-Athena-Client from wildcard host route"
);
}
// A route-request event is logged whenever the resolution carries a route key,
// whatever its outcome string is.
if wildcard_resolution.route_key.is_some() {
log_route_request_event(
metrics_state.as_ref().map(Data::get_ref),
RouteRequestLogContext {
request_id: Some(request_id.clone()),
source: Some("wildcard_host".to_string()),
outcome: Some(wildcard_resolution.outcome.to_string()),
method: Some(method.clone()),
path: Some(incoming_route.clone()),
host,
wildcard_host_pattern: Some(wildcard_host_pattern()),
wildcard_prefix: wildcard_resolution.route_key.clone(),
route_key: wildcard_resolution.route_key.clone(),
resolved_client: wildcard_resolution.resolved_client.clone(),
error_message: wildcard_resolution.error_message.clone(),
metadata: Some(json!({
"matched_host": wildcard_resolution.matched_host,
"tenant_ref": wildcard_resolution.tenant_ref,
"service_key": wildcard_resolution.service_key,
"major_version": wildcard_resolution.major_version,
"route_target_url": wildcard_resolution.target_url,
"wildcard_target": DEFAULT_WILDCARD_TARGET_BASE_URL
})),
..Default::default()
},
);
}
// Short-circuit path: when the service router produces a response, return it
// without running the admission limiter or calling `next`.
if let Some(mut response) = maybe_route_service_request(&mut req).await? {
// Client name: the resolved extension wins over the raw X-Athena-Client header.
let athena_client: Option<String> = req
.extensions()
.get::<ResolvedAthenaClient>()
.map(|value| value.0.clone())
.or_else(|| {
req.headers()
.get("X-Athena-Client")
.and_then(|value| value.to_str().ok())
.map(str::to_string)
});
let response_bytes: Option<u64> = response
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
if let Some(state) = &metrics_state {
// begin/finish are recorded back-to-back because the response already exists here.
state
.metrics_state
.begin_http_request(&method, &incoming_route);
state.metrics_state.finish_http_request(
&method,
&incoming_route,
response.status().as_u16(),
started.elapsed().as_secs_f64(),
request_bytes,
response_bytes,
athena_client.as_deref(),
);
}
response
.headers_mut()
.insert(header::SERVER, "XYLEX/0".parse().unwrap());
apply_common_response_headers(
response.headers_mut(),
started.elapsed().as_millis(),
Some(&request_id),
None,
athena_client.as_deref(),
);
let response: ServiceResponse = req.into_response(response).map_into_boxed_body();
return Ok(response);
}
// The route and client are re-read here, after wildcard resolution and the
// service-router attempt, both of which received `&mut req`.
let route: String = req
.match_pattern()
.map(|value| value.to_string())
.unwrap_or_else(|| req.path().to_string());
let athena_client: Option<String> = req
.extensions()
.get::<ResolvedAthenaClient>()
.map(|value| value.0.clone())
.or_else(|| {
req.headers()
.get("X-Athena-Client")
.and_then(|value| value.to_str().ok())
.map(str::to_string)
});
if let Some(state) = &metrics_state {
state.metrics_state.begin_http_request(&method, &route);
}
let is_query_route: bool = route.starts_with("/gateway/query");
let limiter_for_check: AdmissionLimiter = limiter.clone();
let check: AdmissionCheckOutcome = limiter_for_check.check_allow(athena_client.as_deref());
let mut allow_request: bool = matches!(check, AdmissionCheckOutcome::Allowed);
let store_unavailable: bool = matches!(check, AdmissionCheckOutcome::StoreUnavailable);
// When the admission store is unavailable, the limiter's fail-open setting decides.
if store_unavailable {
allow_request = limiter_for_check.fail_open_on_store_unavailable();
}
// Query-route requests that asked for deferral are always let through to the handler.
if is_query_route && athena_defer {
allow_request = true;
}
if !allow_request {
// Rejection path: build a 503 (store unavailable) or 429 response, add
// Retry-After, persist an admission event, and finish the metrics record.
let should_defer: bool = limiter.should_defer_route(&route);
let mut status_code: u16 = 429;
let decision: &'static str = "rejected";
let mut reason: &'static str = "admission_limit_exceeded";
let deferred_request_id_for_log: Option<String> = None;
let mut resp: actix_web::HttpResponse = if store_unavailable {
status_code = 503;
reason = "admission_store_unavailable";
actix_web::HttpResponse::ServiceUnavailable().json(json!({
"status": "error",
"code": "admission_store_unavailable",
"message": "Admission limiter unavailable",
"error": "Admission store backend is unavailable. Retry shortly."
}))
} else if is_query_route && should_defer {
// Deferrable query route without the defer header: tell the caller how to opt in.
reason = "gateway_query_requires_explicit_defer_header";
too_many_requests(
"Too Many Requests",
"Gateway query overload requires explicit deferred mode. Retry with header X-Athena-Defer: true to queue the query.",
)
} else {
too_many_requests(
"Too Many Requests",
"Request rate exceeded gateway admission limit",
)
};
// Retry-After is the limiter window in seconds; the header is skipped if the
// string fails to parse into a header value.
let retry_after_seconds: String = limiter.window_secs().to_string();
let retry_after_seconds_u64: u64 = limiter.window_secs();
if let Ok(retry_after_value) = retry_after_seconds.parse() {
resp.headers_mut()
.insert(header::RETRY_AFTER, retry_after_value);
}
resp.headers_mut()
.insert(header::SERVER, "XYLEX/0".parse().unwrap());
apply_common_response_headers(
resp.headers_mut(),
started.elapsed().as_millis(),
Some(&request_id),
None,
athena_client.as_deref(),
);
spawn_admission_event_persist(
metrics_state.clone(),
limiter.clone(),
method.clone(),
route.clone(),
athena_client.clone(),
request_bytes,
athena_defer,
is_query_route,
decision,
reason,
deferred_request_id_for_log,
retry_after_seconds_u64,
);
if let Some(state) = metrics_state {
state.metrics_state.finish_http_request(
&method,
&route,
status_code,
started.elapsed().as_secs_f64(),
request_bytes,
None,
athena_client.as_deref(),
);
}
let response: ServiceResponse = req.into_response(resp).map_into_boxed_body();
return Ok(response);
}
// Admitted: hand the request to the rest of the middleware/handler chain.
match next.call(req).await {
Ok(mut res) => {
// Content-Length is read before the common headers are applied and the body is boxed.
let response_bytes: Option<u64> = res
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
res.headers_mut()
.insert(header::SERVER, "XYLEX/0".parse().unwrap());
apply_common_response_headers(
res.headers_mut(),
started.elapsed().as_millis(),
Some(&request_id),
None,
athena_client.as_deref(),
);
let res = res.map_into_boxed_body();
if let Some(state) = metrics_state {
state.metrics_state.finish_http_request(
&method,
&route,
res.status().as_u16(),
started.elapsed().as_secs_f64(),
request_bytes,
response_bytes,
athena_client.as_deref(),
);
}
Ok(res)
}
Err(err) => {
// Handler errors are reported to metrics and then propagated unchanged.
if let Some(state) = metrics_state {
state.metrics_state.record_http_handler_error(
&method,
&route,
started.elapsed().as_secs_f64(),
request_bytes,
athena_client.as_deref(),
);
}
Err(err)
}
}
}
async fn http_lifecycle_tracing_middleware<B>(
req: actix_web::dev::ServiceRequest,
next: Next<B>,
) -> Result<actix_web::dev::ServiceResponse<B>, actix_web::Error>
where
B: MessageBody + 'static,
{
let method: String = req.method().as_str().to_string();
let path: String = req.path().to_string();
let peer: String = req
.connection_info()
.peer_addr()
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string());
let span: tracing::Span = tracing::info_span!(
"http.request",
http_method = %method,
http_route = %path,
net_peer_addr = %peer,
http_status_code = tracing::field::Empty,
);
let fut: std::pin::Pin<Box<dyn Future<Output = Result<ServiceResponse<B>, actix_web::Error>>>> =
next.call(req);
async move {
let res: ServiceResponse<B> = fut.await?;
let code: u16 = res.status().as_u16();
tracing::Span::current().record("http_status_code", code);
let status = res.status();
if status.is_server_error() {
tracing::error!(
target: "athena::http",
http_method = %method,
http_route = %path,
http_status_code = code,
"request completed"
);
} else if status.is_client_error() {
tracing::warn!(
target: "athena::http",
http_method = %method,
http_route = %path,
http_status_code = code,
"request completed"
);
} else {
tracing::info!(
target: "athena::http",
http_method = %method,
http_route = %path,
http_status_code = code,
"request completed"
);
}
Ok(res)
}
.instrument(span)
.await
}
/// Returns the path of the `.env` file that sits next to `config_file`.
///
/// When `config_file` has no parent component (for example a filesystem root or
/// an empty path), a bare relative `.env` is returned instead.
fn config_sidecar_dotenv_path(config_file: &Path) -> PathBuf {
    match config_file.parent() {
        Some(config_dir) => config_dir.join(".env"),
        None => PathBuf::from(".env"),
    }
}
/// Loads the `.env` file located beside `config_file`, if there is one.
///
/// Returns `NotFound` (with the path that was probed) when no such file exists,
/// `Loaded` when `dotenv::from_path` succeeds, and `Failed` when it returns an
/// error. A failure is logged as a warning and never aborts startup.
fn load_dotenv_beside_config_file(config_file: &Path) -> SupplementalDotenvStatus {
    let sidecar: PathBuf = config_sidecar_dotenv_path(config_file);
    if sidecar.is_file() {
        if let Err(err) = dotenv::from_path(&sidecar) {
            warn!(
                path = %sidecar.display(),
                error = %err,
                "Found .env beside config file but failed to parse it; ignoring \
                (quoted Windows paths: use C:/... not C:\\... inside \"...\")"
            );
            return SupplementalDotenvStatus::Failed { path: sidecar };
        }
        info!(
            path = %sidecar.display(),
            "Loaded supplementary .env beside config file"
        );
        SupplementalDotenvStatus::Loaded { path: sidecar }
    } else {
        SupplementalDotenvStatus::NotFound {
            expected_path: sidecar,
        }
    }
}
/// Process entry point: parses the CLI, loads `.env` and configuration, initializes
/// tracing (and Sentry/Loki when enabled), then dispatches to the selected subcommand
/// or, with no subcommand, to the default server / banner behavior.
///
/// # Errors
/// Returns an `io::Error` of kind `Other` wrapping the message of any configuration,
/// bootstrap, or subcommand failure; `run_server` errors are returned as-is.
#[actix_web::main]
async fn main() -> IoResult<()> {
let cli: AthenaCli = parse_cli_or_exit();
// `audit` is handled before any env/config/tracing setup and returns immediately.
let command: Option<Command> = match cli.command {
Some(Command::Audit(args)) => {
return cli::run_audit_command(args).map_err(|err| Error::new(Other, err.to_string()));
}
other => other,
};
// NOTE(review): this warning is emitted before `init_tracing` runs below — confirm
// whether a subscriber is already installed at this point.
if let Err(err) = dotenv() {
warn!(
error = %err,
"Failed to parse `.env` in the current directory; variables from that file are skipped \
(in quoted values use forward slashes for Windows paths, e.g. C:/Program Files/..., or escape each \\)"
);
}
initialize_global_redis_from_env();
// With the `sentry` feature the guard is bound to `_sentry_guard` so it stays alive
// until `main` returns; tracing is told whether Sentry was initialized.
#[cfg(feature = "sentry")]
let sentry_guard: Option<ClientInitGuard> = init_sentry();
#[cfg(feature = "sentry")]
init_tracing(sentry_guard.is_some());
#[cfg(feature = "sentry")]
let _sentry_guard: Option<ClientInitGuard> = sentry_guard;
#[cfg(not(feature = "sentry"))]
init_tracing(false);
let config_path: Option<PathBuf> = cli.config_path.clone();
let pipelines_path: PathBuf = cli.pipelines_path.clone();
let port_override: Option<u16> = cli.port;
let max_workers_override: Option<usize> = cli.max_workers;
let api_only: bool = cli.api_only;
// `cdc_only` is always false when the `cdc` feature is compiled out.
let cdc_only: bool = {
#[cfg(feature = "cdc")]
{
cli.cdc_only
}
#[cfg(not(feature = "cdc"))]
{
false
}
};
if api_only {
warn!("`--api-only` is deprecated; `athena_rs` now boots the server by default");
}
let config_overridden: bool = config_path.is_some();
let config_path: PathBuf = config_path
.clone()
.unwrap_or_else(|| PathBuf::from(DEFAULT_CONFIG_FILE_NAME));
let pipelines_path: String = pipelines_path.to_string_lossy().to_string();
// Resolve the configuration: an explicit path must load successfully; otherwise
// fall back to `Config::load_default` and its error-resolution helper.
let (config, resolved_config_path, config_source_label, config_seeded_default): (
Config,
PathBuf,
String,
bool,
) = if config_overridden {
let cfg: Config = Config::load_from(&config_path).map_err(|err| {
let attempted_locations: Vec<ConfigLocation> = vec![ConfigLocation::new(
"explicit config path".to_string(),
config_path.clone(),
)];
Error::new(
Other,
format!(
"Failed to load config '{}': {}. Looked in:\n{}",
config_path.display(),
err,
format_attempted_locations(&attempted_locations)
),
)
})?;
(
cfg,
config_path.clone(),
"explicit --config path".to_string(),
false,
)
} else {
match Config::load_default() {
Ok(outcome) => {
let path: PathBuf = outcome.path.clone();
// Use the label of the attempted location matching the loaded path, if any.
let source_label: String = outcome
.attempted_locations
.iter()
.find(|location| location.path == path)
.map(|location| location.label.clone())
.unwrap_or_else(|| "auto-discovered config path".to_string());
if outcome.seeded_default {
info!(
path = %path.display(),
"Seeded default configuration (first run)"
);
} else {
info!(path = %path.display(), "Loaded configuration");
}
(outcome.config, path, source_label, outcome.seeded_default)
}
Err(err) => resolve_default_config_load_error(err, config_path.clone(), cdc_only)?,
}
};
// A second `.env`, located beside the resolved config file, is loaded after the config.
let supplemental_dotenv_status: SupplementalDotenvStatus =
load_dotenv_beside_config_file(&resolved_config_path);
initialize_runtime_env_settings(&config.validation_ranges);
let terminal_context: TerminalContext = detect_terminal_context();
info!(
terminal = %terminal_context.terminal_name,
shell = %terminal_context.shell_name,
stderr_is_tty = terminal_context.stderr_is_tty,
ansi_enabled = terminal_context.ansi_enabled,
detection_hint = %terminal_context.hint,
"Detected terminal runtime context"
);
print_colored_boot_config_dump(
&config,
&resolved_config_path,
terminal_context.ansi_enabled,
);
let startup_config_context: StartupConfigContext = StartupConfigContext {
source_label: config_source_label,
config_path: resolved_config_path.clone(),
seeded_default: config_seeded_default,
supplemental_dotenv_status,
terminal_ansi_enabled: terminal_context.ansi_enabled,
};
let runtime_config_metadata = RuntimeConfigMetadata {
config_path: Some(resolved_config_path.to_string_lossy().to_string()),
source_label: Some(startup_config_context.source_label.clone()),
seeded_default: startup_config_context.seeded_default,
};
// Loki attachment is best-effort: every outcome is logged and startup continues.
{
let loki_cfg: athena_rs::features::loki::LokiConfig =
athena_rs::features::loki::LokiConfig::resolve(&config);
if loki_cfg.is_enabled() {
match athena_rs::features::loki::attach_loki_layer(&loki_cfg) {
Ok(true) => {
info!(
url = %loki_cfg.url.as_deref().unwrap_or_default(),
tenant_id = %loki_cfg.tenant_id.as_deref().unwrap_or(""),
label_count = loki_cfg.labels.len(),
extra_field_count = loki_cfg.extra_fields.len(),
"Attached Grafana Loki tracing layer"
);
}
Ok(false) => {
info!("Loki tracing layer skipped: attach returned false (misconfiguration)");
}
Err(err) => {
warn!(
error = %err,
"failed to attach Grafana Loki tracing layer; continuing without it"
);
}
}
}
}
// Subcommand dispatch. Every arm maps its error into an `io::Error` of kind `Other`.
match command {
Some(Command::Server) => {
let bootstrap: Bootstrap =
build_shared_state(&config, &pipelines_path, runtime_config_metadata.clone())
.await
.map_err(|err| Error::new(Other, err.to_string()))?;
run_server(
&config,
&startup_config_context,
bootstrap,
port_override,
max_workers_override,
)
.await
}
Some(Command::Pipeline(args)) => {
let bootstrap: Bootstrap =
build_shared_state(&config, &pipelines_path, runtime_config_metadata.clone())
.await
.map_err(|err| Error::new(Other, err.to_string()))?;
cli::run_pipeline_command(&bootstrap, args)
.await
.map_err(|err| Error::new(Other, err.to_string()))
}
Some(Command::Clients { command }) => {
cli::run_clients_command(command).map_err(|err| Error::new(Other, err.to_string()))
}
Some(Command::Fetch(cmd)) => cli::run_fetch_command(cmd)
.await
.map_err(|err| Error::new(Other, err.to_string())),
Some(Command::Sql(args)) => athena_rs::cli_sql::run_sql_command(args)
.await
.map_err(|err| Error::new(Other, err.to_string())),
// Not reached at runtime: `Audit` already returned at the top of `main`.
// The arm still keeps this match exhaustive.
Some(Command::Audit(args)) => {
cli::run_audit_command(args).map_err(|err| Error::new(Other, err.to_string()))
}
Some(Command::Pools(args)) => cli::run_pools_command(args)
.await
.map_err(|err| Error::new(Other, err.to_string())),
Some(Command::Provision(args)) => cli::run_provision_command(args)
.await
.map_err(|err| Error::new(Other, err.to_string())),
Some(Command::PressureWorker) => {
athena_rs::features::client_pressure::run_pressure_worker(&config)
.await
.map_err(|err| Error::new(Other, err.to_string()))
}
#[cfg(feature = "cdc")]
Some(Command::Cdc { command }) => cli::run_cdc_command(&config, command)
.await
.map_err(|err| Error::new(Other, err.to_string())),
Some(Command::Diag) => {
cli::run_diag_command().map_err(|err| Error::new(Other, err.to_string()))
}
Some(Command::Install(args)) => {
cli::run_install_command(args).map_err(|err| Error::new(Other, err.to_string()))
}
Some(Command::Version) => {
cli::run_version_command();
Ok(())
}
None => {
// No subcommand: either print an orientation banner and exit, or boot.
let raw_args: Vec<String> = env::args().collect();
let prog_name = raw_args.first().map(|s| s.as_str()).unwrap_or("athena");
let short_name = std::path::Path::new(prog_name)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(prog_name);
let non_prog_args = raw_args.len().saturating_sub(1);
// Banner path: taken with at most one argument after the program name and
// neither `cdc_only` nor `api_only` set.
// NOTE(review): this branch always ends in `return Ok(())`, including the cases
// that print "Booting the API server by default." / "starting server (config
// present)" — confirm whether the server is meant to start in those cases.
if non_prog_args <= 1 && !cdc_only && !api_only {
let is_tty = std::io::stdout().is_terminal();
let has_local_config = std::path::Path::new("config.yaml").is_file()
|| std::path::Path::new("config.yml").is_file();
if is_tty {
eprintln!("{}", "Athena Runtime (athena_rs)".bold().blue());
eprintln!(" Core server and surfaces ({})\n", env!("CARGO_PKG_VERSION"));
if has_local_config {
eprintln!("{} Found {} in current directory.", "→".green(), "config.yaml".cyan());
eprintln!(" Booting the API server by default.");
eprintln!(" Use {} to stop.\n", "Ctrl+C".yellow());
eprintln!(" Recommended CLI: {} (the athena toolchain)", "athena".green());
} else {
eprintln!("This is the Athena runtime.");
eprintln!();
eprintln!("For the user-friendly CLI that interfaces Athena surfaces (APIs, clients, audit, etc.):");
eprintln!(" {}", "athena --help".green());
eprintln!();
eprintln!("Start server explicitly:");
eprintln!(" {} server", short_name.green());
}
eprintln!();
eprintln!("Dev:");
eprintln!(" {} run --bin athena -- --help # the CLI", "cargo".dimmed());
eprintln!(" {} run --bin athena_rs # this runtime", "cargo".dimmed());
} else {
if has_local_config {
eprintln!("athena_rs: starting server (config present)");
} else {
eprintln!("athena_rs: server mode (use the `athena` CLI for operations)");
}
}
return Ok(());
}
// With the `cdc` feature: `cdc_only` runs just the websocket server (port
// override or 4053); otherwise the full API server is booted.
#[cfg(feature = "cdc")]
{
if cdc_only {
let port: u16 = port_override.unwrap_or(4053);
websocket_server(port, config.get_cdc_allow_legacy_axum_wildcards())
.await
.map_err(|err| Error::new(Other, err.to_string()))
} else {
let bootstrap: Bootstrap = build_shared_state(
&config,
&pipelines_path,
runtime_config_metadata.clone(),
)
.await
.map_err(|err| Error::new(Other, err.to_string()))?;
run_server(
&config,
&startup_config_context,
bootstrap,
port_override,
max_workers_override,
)
.await
}
}
// Without the `cdc` feature the only remaining option is the API server.
#[cfg(not(feature = "cdc"))]
{
let bootstrap: Bootstrap =
build_shared_state(&config, &pipelines_path, runtime_config_metadata.clone())
.await
.map_err(|err| Error::new(Other, err.to_string()))?;
run_server(
&config,
&startup_config_context,
bootstrap,
port_override,
max_workers_override,
)
.await
}
}
}
}
/// Parses the command line, or reports the parse error and terminates the process.
///
/// On failure the rendered clap error is written to stderr, followed by an
/// optional "Athena hint" line from `tooling_hints::cli_parse_hint`, and the
/// process exits with the error's own exit code.
fn parse_cli_or_exit() -> AthenaCli {
    AthenaCli::try_parse().unwrap_or_else(|err| {
        let rendered = err.to_string();
        let argv: Vec<String> = env::args().collect();
        // The hint is computed before anything is printed.
        let maybe_hint = tooling_hints::cli_parse_hint(&argv, err.kind());
        eprint!("{rendered}");
        if let Some(hint) = maybe_hint {
            eprintln!("\nAthena hint: {hint}");
        }
        std::process::exit(err.exit_code())
    })
}
/// Builds the error handlers that give 404 and 5xx responses a JSON body.
///
/// A response whose `Content-Type` already contains `application/json` is passed
/// through with its body boxed. Any other 404 is replaced by `not_found(..)`, and
/// any other server error by `internal_error(..)`, keeping the original request.
fn default_error_handlers<B>() -> ErrorHandlers<B>
where
    B: MessageBody + 'static,
{
    /// True when the response declares a JSON content type.
    fn declares_json<T>(res: &ServiceResponse<T>) -> bool {
        matches!(
            res.headers()
                .get(header::CONTENT_TYPE)
                .and_then(|raw| raw.to_str().ok()),
            Some(content_type) if content_type.contains("application/json")
        )
    }

    ErrorHandlers::new()
        .handler(StatusCode::NOT_FOUND, |res: ServiceResponse<B>| {
            if declares_json(&res) {
                return Ok(ErrorHandlerResponse::Response(
                    res.map_into_boxed_body().map_into_right_body(),
                ));
            }
            // Drop the original body and answer with the standard JSON 404 payload.
            let (req, _) = res.into_parts();
            let replacement: actix_web::HttpResponse =
                not_found("Not Found", "The requested resource was not found");
            Ok(ErrorHandlerResponse::Response(
                ServiceResponse::new(req, replacement)
                    .map_into_boxed_body()
                    .map_into_right_body(),
            ))
        })
        .default_handler_server(|res: ServiceResponse<B>| {
            if declares_json(&res) {
                return Ok(ErrorHandlerResponse::Response(
                    res.map_into_boxed_body().map_into_right_body(),
                ));
            }
            // Drop the original body and answer with the standard JSON 500 payload.
            let (req, _) = res.into_parts();
            let replacement: actix_web::HttpResponse =
                internal_error("Internal Server Error", "An unexpected error occurred");
            Ok(ErrorHandlerResponse::Response(
                ServiceResponse::new(req, replacement)
                    .map_into_boxed_body()
                    .map_into_right_body(),
            ))
        })
}
async fn run_server(
config: &Config,
startup_config_context: &StartupConfigContext,
bootstrap: Bootstrap,
port_override: Option<u16>,
max_workers_override: Option<usize>,
) -> IoResult<()> {
let port: u16 = port_override.unwrap_or_else(|| config.get_api_port());
let keep_alive: Duration = Duration::from_secs(config.get_http_keep_alive_timeout_secs());
let client_disconnect_timeout: Duration =
Duration::from_secs(config.get_client_disconnect_timeout_value_secs());
let client_request_timeout: Duration =
Duration::from_secs(config.get_client_request_timeout_value_secs());
let worker_count: usize =
max_workers_override.unwrap_or_else(|| config.get_http_worker_count());
if worker_count == 0 {
return Err(Error::new(
Other,
"Invalid --max-workers value: must be greater than 0.",
));
}
let max_connections: usize = config.get_http_max_connections_value();
let backlog: usize = config.get_http_backlog_value();
let tcp_keepalive: Duration = Duration::from_secs(config.get_tcp_keepalive_timeout_secs());
let admission_window_secs: u64 = config.get_gateway_admission_window_secs().max(1);
let admission_store_backend: AdmissionStoreBackend =
match config.get_gateway_admission_store_backend().as_str() {
"memory" => AdmissionStoreBackend::Memory,
_ => AdmissionStoreBackend::Redis,
};
let admission_store_fail_mode: AdmissionStoreFailMode =
match config.get_gateway_admission_store_fail_mode().as_str() {
"fail_open" => AdmissionStoreFailMode::FailOpen,
_ => AdmissionStoreFailMode::FailClosed,
};
let admission_limiter: AdmissionLimiter = AdmissionLimiter::new(
config.get_gateway_admission_limit_enabled(),
Duration::from_secs(admission_window_secs),
config.get_gateway_admission_global_requests_per_window(),
config.get_gateway_admission_per_client_requests_per_window(),
config.get_gateway_admission_defer_on_limit_enabled(),
config.get_gateway_admission_defer_route_prefixes(),
admission_store_backend,
admission_store_fail_mode,
);
if admission_limiter.is_enabled() {
info!(
admission_window_secs = admission_limiter.window_secs(),
admission_global_requests_per_window =
config.get_gateway_admission_global_requests_per_window(),
admission_per_client_requests_per_window =
config.get_gateway_admission_per_client_requests_per_window(),
admission_defer_on_limit_enabled =
config.get_gateway_admission_defer_on_limit_enabled(),
admission_store_backend = ?admission_store_backend,
admission_store_fail_mode = ?admission_store_fail_mode,
"Gateway admission limiter enabled"
);
}
let addr: SocketAddr = SocketAddr::from(([0, 0, 0, 0], port));
let socket: Socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
socket.set_nonblocking(true)?;
socket.set_reuse_address(true)?;
socket.set_keepalive(true)?;
let keepalive_cfg: TcpKeepalive = TcpKeepalive::new().with_time(tcp_keepalive);
socket.set_tcp_keepalive(&keepalive_cfg)?;
socket.bind(&addr.into()).map_err(|e| {
let msg = if e.kind() == AddrInUse {
format!(
"Address already in use (port {}). Stop the process using this port or choose a different one (e.g. --port).",
port
)
} else {
format!("Failed to bind to {}: {}", addr, e)
};
Error::new(e.kind(), msg)
})?;
let listen_backlog: i32 = backlog.min(i32::MAX as usize) as i32;
socket
.listen(listen_backlog)
.map_err(|e| Error::new(e.kind(), format!("Failed to listen on {}: {}", addr, e)))?;
let listener: TcpListener = socket.into();
let app_state: Data<AppState> = bootstrap.app_state.clone();
let startup_summary: ServerStartupSummary = ServerStartupSummary {
version: env!("CARGO_PKG_VERSION"),
build_profile: build_profile_name(),
working_directory: env::current_dir().ok(),
config_context: startup_config_context.clone(),
listen_addr: addr,
worker_count,
max_connections,
backlog,
keep_alive,
client_disconnect_timeout,
client_request_timeout,
daemon_enabled: config.get_daemon_enabled(),
prometheus_enabled: app_state.prometheus_metrics_enabled,
logging_client_name: app_state.logging_client_name.clone(),
gateway_auth_client_name: app_state.gateway_auth_client_name.clone(),
gateway_benchmark_client_name: app_state.gateway_benchmark_client_name.clone(),
};
print_server_startup_banner(&startup_summary);
log_server_startup_summary(&startup_summary);
if app_state.backup_worker_enabled {
if let Err(err) = ensure_pg_tools().await {
athena_rs::disable_backup_workers_for_process();
warn!(
error = %err,
"PostgreSQL tools unavailable at startup; continuing with backup workers disabled for this process"
);
}
}
let shutdown_write_buffer: Option<Arc<athena_rs::deferred_write::WriteBuffer>> =
app_state.write_buffer.clone();
let shutdown_wal_manager: Option<Arc<athena_rs::deferred_write::WalManager>> =
app_state.wal_manager.clone();
let shutdown_pg_registry: Arc<
athena_rs::drivers::postgresql::sqlx_driver::PostgresClientRegistry,
> = Arc::clone(&app_state.pg_registry);
let shutdown_cache: Arc<moka::future::Cache<String, serde_json::Value>> =
Arc::clone(&app_state.cache);
if config.get_daemon_enabled() {
info!("Daemon mode enabled in config; API runtime skips daemon-owned background workers");
} else {
let inline_workers =
athena_rs::runtime::spawn_api_inline_runtime(app_state.clone(), config);
if !inline_workers.is_empty() {
athena_rs::daemon::spawn_runtime_registry_heartbeat(
app_state.clone(),
"api_inline",
None,
inline_workers,
json!({
"management_mode": "inline_runtime",
"runtime": "api",
}),
);
}
}
warn!("CORS is temporarily disabled: allowing any origin");
HttpServer::new(move || {
let limiter: AdmissionLimiter = admission_limiter.clone();
let cors: Cors = Cors::default()
.allow_any_origin()
.allow_any_method()
.allow_any_header();
let app = App::new()
.wrap(DefaultHeaders::new().add(("X-Athena-Version", env!("CARGO_PKG_VERSION"))))
.wrap(default_error_handlers())
.wrap(cors)
.wrap(actix_web::middleware::from_fn(move |req, next| {
admission_and_wildcard_middleware(req, next, limiter.clone())
}))
.wrap(actix_web::middleware::from_fn(
http_lifecycle_tracing_middleware,
))
.app_data(app_state.clone())
.service(root)
.service(health)
.service(ping)
.service(cluster_health)
.service(sql_query)
.service(gateway_sql_query)
.service(gateway_sql_d1_migrate)
.service(sql_count_query)
.service(fetch_data_route)
.service(get_data_route)
.service(proxy_fetch_data_route)
.service(gateway_update_route)
.service(gateway_query_route)
.service(gateway_rpc_route)
.service(insert_data)
.service(delete_data)
.service(rpc_get_route)
.service(rpc_post_route)
.service(postgrest_get_route)
.service(postgrest_post_route)
.service(postgrest_patch_route)
.service(postgrest_delete_route)
.service(run_pipeline)
.service(simulate_pipeline)
.service(list_pipeline_templates)
.service(athena_router_registry)
.service(athena_openapi_host)
.service(athena_wss_openapi_host)
.service(athena_error_catalog)
.service(api_registry)
.service(athena_docs)
.service(api_registry_by_id)
.service(gateway_deferred_flush_route)
.service(gateway_deferred_status)
.service(gateway_deferred_requeue)
.service(gateway_wss_info)
.service(gateway_wss_route)
.service(public_route_dispatch)
.configure(webhook_sinks::services)
.configure(admin::services)
.configure(billing::services)
.configure(chat::services)
.configure(daemons::services)
.configure(configure_optional_operator_services)
.configure(management::services)
.configure(debug::services)
.configure(schema::services)
.configure(athena_rs::api::service_router::services)
.configure(storage::services)
.configure(typesense::services)
.service(ssl_enforcement)
.service(prometheus_metrics);
app
})
.workers(worker_count)
.keep_alive(keep_alive)
.client_disconnect_timeout(client_disconnect_timeout)
.client_request_timeout(client_request_timeout)
.max_connections(max_connections)
.backlog(backlog as u32)
.listen(listener)?
.run()
.await?;
if let Some(ref buf) = shutdown_write_buffer {
tracing::info!("deferred_write: flushing pending writes on shutdown");
let summary: athena_rs::deferred_write::FlushSummary =
athena_rs::deferred_write::flush_pending(
buf,
shutdown_wal_manager.as_deref(),
&shutdown_pg_registry,
&shutdown_cache,
)
.await;
tracing::info!(
total = summary.total,
succeeded = summary.succeeded,
failed = summary.failed,
"deferred_write: shutdown flush complete"
);
}
Ok(())
}
fn ensure_log_destination(dir: &Path) -> IoResult<()> {
std::fs::create_dir_all(dir)?;
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join(SUCCESS_LOG_FILE_NAME))?;
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join(ERROR_LOG_FILE_NAME))?;
Ok(())
}
/// Lists the directories to try for tracing log files, in priority order.
///
/// The order is: the trimmed, non-empty value of the `TRACING_LOG_DIR_ENV`
/// variable (if set), the primary directory, the fallback directory, and finally
/// `<temp dir>/athena/log`. Duplicates are removed, keeping the first occurrence.
fn tracing_log_directory_candidates() -> Vec<PathBuf> {
    let env_override: Option<PathBuf> = match env::var(TRACING_LOG_DIR_ENV) {
        Ok(raw) if !raw.trim().is_empty() => Some(PathBuf::from(raw.trim())),
        _ => None,
    };
    let built_in: Vec<PathBuf> = vec![
        PathBuf::from(PRIMARY_TRACING_LOG_DIR),
        PathBuf::from(FALLBACK_TRACING_LOG_DIR),
        env::temp_dir().join("athena").join("log"),
    ];

    let mut unique: Vec<PathBuf> = Vec::new();
    for dir in env_override.into_iter().chain(built_in) {
        if !unique.contains(&dir) {
            unique.push(dir);
        }
    }
    unique
}
fn resolve_tracing_log_directory() -> (Option<PathBuf>, Option<String>) {
let mut failures: Vec<(PathBuf, Error)> = Vec::new();
for candidate in tracing_log_directory_candidates() {
match ensure_log_destination(&candidate) {
Ok(()) => {
if failures.is_empty() {
return (Some(candidate), None);
}
let failed_paths: String = failures
.iter()
.map(|(path, err)| format!("{} ({err})", path.display()))
.collect::<Vec<String>>()
.join("; ");
let permission_hint: Option<String> = failures
.iter()
.find(|(_, err)| err.kind() == std::io::ErrorKind::PermissionDenied)
.and_then(|(path, _)| {
tooling_hints::tracing_log_permission_hint(
path.as_path(),
Some(candidate.as_path()),
)
});
let message: String = format!(
"unable to initialize tracing log files at {failed_paths}; using {} (set {} to pin a writable directory)",
candidate.display(),
TRACING_LOG_DIR_ENV
);
return (
Some(candidate.clone()),
Some(match permission_hint {
Some(hint) => format!("{message}. {hint}"),
None => message,
}),
);
}
Err(err) => failures.push((candidate, err)),
}
}
let attempted_paths: String = failures
.iter()
.map(|(path, err)| format!("{} ({err})", path.display()))
.collect::<Vec<String>>()
.join("; ");
let permission_hint: Option<String> = failures
.iter()
.find(|(_, err)| err.kind() == std::io::ErrorKind::PermissionDenied)
.and_then(|(path, _)| tooling_hints::tracing_log_permission_hint(path.as_path(), None));
let message: String = format!(
"unable to initialize tracing log files at {attempted_paths}; continuing without file log sinks. Set {} to a writable directory to enable file sinks.",
TRACING_LOG_DIR_ENV
);
(
None,
Some(match permission_hint {
Some(hint) => format!("{message} {hint}"),
None => message,
}),
)
}
fn is_error_level(metadata: &tracing::Metadata<'_>) -> bool {
*metadata.level() == tracing::Level::ERROR
}
/// Decides whether output for `metadata`'s target should be emitted.
///
/// Everything is allowed unless `suppress_tokio_console_noise` is set. When it is
/// set and the `tokio-console` feature is compiled in, the decision is delegated
/// to `tokio_console::should_suppress_output_target`; without the feature
/// everything is still allowed.
fn allow_output_target(
    metadata: &tracing::Metadata<'_>,
    suppress_tokio_console_noise: bool,
) -> bool {
    if suppress_tokio_console_noise {
        #[cfg(feature = "tokio-console")]
        {
            return !tokio_console::should_suppress_output_target(metadata.target());
        }
        #[cfg(not(feature = "tokio-console"))]
        {
            // Nothing to consult without the feature; keep the parameter "used".
            let _ = metadata;
        }
    }
    true
}
/// Builds the console formatting layer.
///
/// Output includes level and target, uses a local `HH:MM:SS.mmm` timestamp, and
/// emits ANSI colors only when `ansi` is true. Events are filtered per target
/// through `allow_output_target`.
fn athena_tracing_stdout_fmt_layer<S>(
    ansi: bool,
    suppress_tokio_console_noise: bool,
) -> impl tracing_subscriber::layer::Layer<S> + Send + Sync + 'static
where
    S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    let timer = ChronoLocal::new(String::from("%H:%M:%S%.3f"));
    let console_layer = tracing_subscriber::fmt::layer()
        .with_ansi(ansi)
        .with_level(true)
        .with_target(true)
        .with_timer(timer);
    console_layer.with_filter(tracing_subscriber::filter::filter_fn(move |meta| {
        allow_output_target(meta, suppress_tokio_console_noise)
    }))
}
/// Builds a file formatting layer that writes through `writer`.
///
/// ANSI colors are always disabled. With `errors_only` set the layer accepts only
/// `ERROR`-level records; otherwise it accepts everything except `ERROR`. In both
/// cases the target must also pass `allow_output_target`.
fn athena_tracing_file_layer<S>(
    writer: NonBlocking,
    errors_only: bool,
    suppress_tokio_console_noise: bool,
) -> impl tracing_subscriber::layer::Layer<S> + Send + Sync + 'static
where
    S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    let timer = ChronoLocal::new(String::from("%H:%M:%S%.3f"));
    let file_layer = tracing_subscriber::fmt::layer()
        .with_ansi(false)
        .with_level(true)
        .with_target(true)
        .with_timer(timer)
        .with_writer(writer);
    file_layer.with_filter(tracing_subscriber::filter::filter_fn(move |meta| {
        // The error sink wants errors and the success sink wants non-errors, so a
        // record is accepted when its "is error" status equals `errors_only`.
        allow_output_target(meta, suppress_tokio_console_noise)
            && is_error_level(meta) == errors_only
    }))
}
/// Returns the result of `tokio_console::env_opt_in()`.
///
/// Only compiled when the `tokio-console` feature is enabled.
#[cfg(feature = "tokio-console")]
fn tokio_console_env_opt_in() -> bool {
tokio_console::env_opt_in()
}
/// Returns the layer produced by `tokio_console::subscriber_layer()`, typed as an
/// opaque `Layer<S>` for any subscriber `S` that supports span lookup.
///
/// Only compiled when the `tokio-console` feature is enabled.
#[cfg(feature = "tokio-console")]
fn tokio_console_subscriber_layer<S>()
-> impl tracing_subscriber::layer::Layer<S> + Send + Sync + 'static
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
tokio_console::subscriber_layer()
}
/// Returns the `Targets` filter produced by `tokio_console::trace_targets_filter()`.
///
/// Only compiled when the `tokio-console` feature is enabled.
#[cfg(feature = "tokio-console")]
fn tokio_console_trace_targets_filter() -> tracing_subscriber::filter::Targets {
tokio_console::trace_targets_filter()
}
/// Resolves the filter applied to the Loki layer.
///
/// Precedence: `ATHENA_LOKI_FILTER`, then the default `EnvFilter` environment
/// variable, then the literal directive `info` when neither yields a filter.
fn loki_tracing_env_filter() -> EnvFilter {
    if let Ok(dedicated) = EnvFilter::try_from_env("ATHENA_LOKI_FILTER") {
        return dedicated;
    }
    match EnvFilter::try_from_default_env() {
        Ok(shared) => shared,
        Err(_) => EnvFilter::new("info"),
    }
}
/// Boxes the Loki reservation layer together with its own environment filter
/// so it can be stacked directly on a [`Registry`].
fn loki_reservation_stack() -> Box<dyn Layer<Registry> + Send + Sync + 'static> {
    let reservation = athena_rs::features::loki::reservation_layer();
    let loki_filter = loki_tracing_env_filter();
    Box::new(reservation.with_filter(loki_filter))
}
/// Expands to `Some(sentry layer filtered by $filter)` when `$enable` is true
/// and to `None` otherwise. `$filter` is only evaluated in the enabled case.
#[cfg(feature = "sentry")]
macro_rules! maybe_sentry_layer {
    ($enable:expr, $filter:expr) => {
        if $enable {
            Some(sentry_tracing::layer().with_filter($filter))
        } else {
            None
        }
    };
}
/// Sentry-less variant of `maybe_sentry_layer!`: both arguments are accepted
/// for call-site compatibility but never expanded (so they are not evaluated),
/// and the macro always yields `None::<()>`.
#[cfg(not(feature = "sentry"))]
macro_rules! maybe_sentry_layer {
($enable:expr, $filter:expr) => {
None::<()>
};
}
/// Builds the tracing subscriber stack and installs it with `.init()`.
///
/// Every variant stacks, in order: the Loki reservation layer, the
/// tokio-console layer (only when compiled in and opted in), the stdout layer,
/// the success/error file layers (only when a log directory was resolved) and
/// the optional Sentry layer.
///
/// * `enable_sentry_layer` - passed to `maybe_sentry_layer!`; it only has an
///   effect when the `sentry` feature is compiled in.
///
/// The four registry builds below are intentionally near-identical: they differ
/// only in whether the tokio-console layer and the file layers are present and
/// in the `suppress_tokio_console_noise` flag handed to the fmt layers.
fn init_tracing(enable_sentry_layer: bool) {
// tokio-console is used only if it was requested AND the build supports it;
// otherwise the request is reported on stderr and ignored.
#[cfg(feature = "tokio-console")]
let console_opt_in: bool = {
let requested = tokio_console_env_opt_in();
if requested && !TOKIO_UNSTABLE_ENABLED {
eprintln!(
"ATHENA_BENCHMARK_CLIENT=true requested tokio-console, but this build was not compiled with RUSTFLAGS=\"--cfg tokio_unstable\"; continuing without tokio-console subscriber"
);
false
} else {
requested
}
};
// One filter, taken from the default EnvFilter environment variable or
// DEFAULT_TRACING_FILTER, is cloned onto the stdout, file and Sentry layers.
let stdout_filter: EnvFilter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(DEFAULT_TRACING_FILTER));
let ansi: bool = resolve_ansi_enabled_for_console();
// `file_log_warning` is held back until the subscriber exists (see the end).
let (log_dir, file_log_warning) = resolve_tracing_log_directory();
let mut file_log_guards: Vec<WorkerGuard> = Vec::new();
// When a log directory is available, create one non-blocking writer for the
// success log and one for the error log, collecting both worker guards.
let file_writers: Option<(NonBlocking, NonBlocking)> = if let Some(log_dir) = log_dir.as_ref() {
let success_appender: tracing_appender::rolling::RollingFileAppender =
tracing_appender::rolling::never(log_dir, SUCCESS_LOG_FILE_NAME);
let (success_writer, success_guard) = tracing_appender::non_blocking(success_appender);
file_log_guards.push(success_guard);
let error_appender: tracing_appender::rolling::RollingFileAppender =
tracing_appender::rolling::never(log_dir, ERROR_LOG_FILE_NAME);
let (error_writer, error_guard) = tracing_appender::non_blocking(error_appender);
file_log_guards.push(error_guard);
Some((success_writer, error_writer))
} else {
None
};
// Move the guards into FILE_LOG_GUARDS so they outlive this function. If the
// lock cannot be taken the guards are dropped here and only a stderr note is
// printed.
if !file_log_guards.is_empty() {
if let Ok(mut guards) = FILE_LOG_GUARDS.lock() {
guards.extend(file_log_guards);
} else {
eprintln!("failed to retain tracing file log worker guards");
}
}
#[cfg(feature = "tokio-console")]
{
if console_opt_in {
// tokio-console active: fmt layers are built with suppression enabled.
match file_writers {
Some((sw, ew)) => {
tracing_subscriber::registry()
.with(loki_reservation_stack())
.with(
tokio_console_subscriber_layer()
.with_filter(tokio_console_trace_targets_filter()),
)
.with(
athena_tracing_stdout_fmt_layer(ansi, true)
.with_filter(stdout_filter.clone()),
)
.with(
athena_tracing_file_layer(sw, false, true)
.with_filter(stdout_filter.clone()),
)
.with(
athena_tracing_file_layer(ew, true, true)
.with_filter(stdout_filter.clone()),
)
.with(maybe_sentry_layer!(
enable_sentry_layer,
stdout_filter.clone()
))
.init();
}
None => {
tracing_subscriber::registry()
.with(loki_reservation_stack())
.with(
tokio_console_subscriber_layer()
.with_filter(tokio_console_trace_targets_filter()),
)
.with(
athena_tracing_stdout_fmt_layer(ansi, true)
.with_filter(stdout_filter.clone()),
)
.with(maybe_sentry_layer!(
enable_sentry_layer,
stdout_filter.clone()
))
.init();
}
}
info!(
message = "tokio-console subscriber active (ATHENA_BENCHMARK_CLIENT=true); optional TOKIO_CONSOLE_BIND for listen address (default 127.0.0.1:6669); TOKIO_CONSOLE_RETENTION is floored at 1h for completed tasks; stdout/file logs use RUST_LOG (default info); Grafana URL is typically http://<host>:6669"
);
} else {
// Feature compiled in but not opted in: same stack as the feature-less
// build below, with suppression disabled.
match file_writers {
Some((sw, ew)) => {
tracing_subscriber::registry()
.with(loki_reservation_stack())
.with(
athena_tracing_stdout_fmt_layer(ansi, false)
.with_filter(stdout_filter.clone()),
)
.with(
athena_tracing_file_layer(sw, false, false)
.with_filter(stdout_filter.clone()),
)
.with(
athena_tracing_file_layer(ew, true, false)
.with_filter(stdout_filter.clone()),
)
.with(maybe_sentry_layer!(
enable_sentry_layer,
stdout_filter.clone()
))
.init();
}
None => {
tracing_subscriber::registry()
.with(loki_reservation_stack())
.with(
athena_tracing_stdout_fmt_layer(ansi, false)
.with_filter(stdout_filter.clone()),
)
.with(maybe_sentry_layer!(
enable_sentry_layer,
stdout_filter.clone()
))
.init();
}
}
}
}
#[cfg(not(feature = "tokio-console"))]
{
// No tokio-console support compiled in: stdout, optional files, optional Sentry.
match file_writers {
Some((sw, ew)) => {
tracing_subscriber::registry()
.with(loki_reservation_stack())
.with(
athena_tracing_stdout_fmt_layer(ansi, false)
.with_filter(stdout_filter.clone()),
)
.with(
athena_tracing_file_layer(sw, false, false)
.with_filter(stdout_filter.clone()),
)
.with(
athena_tracing_file_layer(ew, true, false)
.with_filter(stdout_filter.clone()),
)
.with(maybe_sentry_layer!(
enable_sentry_layer,
stdout_filter.clone()
))
.init();
}
None => {
tracing_subscriber::registry()
.with(loki_reservation_stack())
.with(
athena_tracing_stdout_fmt_layer(ansi, false)
.with_filter(stdout_filter.clone()),
)
.with(maybe_sentry_layer!(
enable_sentry_layer,
stdout_filter.clone()
))
.init();
}
}
}
// Emitted last so the warning goes through the subscriber installed above.
if let Some(message) = file_log_warning {
warn!(message = %message, "tracing file logging status");
}
}
/// Fallback DSN used by `init_sentry` when the `SENTRY_DSN` environment
/// variable is not set.
///
/// NOTE(review): this embeds an ingest key in the source and binary — confirm
/// it is intended to be public.
#[cfg(feature = "sentry")]
const BETTERSTACK_SENTRY_DSN: &str =
"https://mMR1Bs5K6vSzXT8YGYyUxQSE@s1741777.eu-fsn-3.betterstackdata.com/1741777";
/// Initializes the Sentry client from environment configuration.
///
/// * `SENTRY_DSN` - DSN to report to; falls back to [`BETTERSTACK_SENTRY_DSN`]
///   when unset. A blank value disables Sentry.
/// * `SENTRY_ENVIRONMENT` - optional environment name.
/// * `SENTRY_SAMPLE_RATE` - optional event sample rate; see
///   [`parse_sentry_sample_rate`] for the accepted values. Invalid values are
///   reported on stderr and the client default is kept.
///
/// Returns `None` when Sentry is disabled or the DSN cannot be parsed (the
/// parse error is printed to stderr). Otherwise returns the guard produced by
/// `sentry::init`, which the caller is expected to keep alive.
#[cfg(feature = "sentry")]
fn init_sentry() -> Option<sentry::ClientInitGuard> {
    let dsn_source: String =
        env::var("SENTRY_DSN").unwrap_or_else(|_| BETTERSTACK_SENTRY_DSN.to_string());
    // Trim once so the emptiness check and the parse see the same text.
    let dsn_text: &str = dsn_source.trim();
    if dsn_text.is_empty() {
        return None;
    }
    let dsn: SentryDsn = match dsn_text.parse::<SentryDsn>() {
        Ok(dsn) => dsn,
        Err(err) => {
            eprintln!("failed to parse Sentry DSN: {err}");
            return None;
        }
    };
    let mut options: sentry::ClientOptions = sentry::ClientOptions {
        dsn: Some(dsn),
        release: Some(env!("CARGO_PKG_VERSION").into()),
        environment: env::var("SENTRY_ENVIRONMENT").ok().map(Into::into),
        attach_stacktrace: true,
        ..Default::default()
    };
    if let Ok(raw) = env::var("SENTRY_SAMPLE_RATE") {
        match parse_sentry_sample_rate(&raw) {
            Some(rate) => options.sample_rate = rate,
            // A blank value is treated like an unset variable.
            None if raw.trim().is_empty() => {}
            None => eprintln!(
                "ignoring invalid SENTRY_SAMPLE_RATE {raw:?}; expected a number between 0.0 and 1.0"
            ),
        }
    }
    Some(sentry::init(options))
}

/// Parses a `SENTRY_SAMPLE_RATE` value.
///
/// Surrounding whitespace is ignored. Returns `None` for text that is not a
/// number and for NaN; any other value is clamped into `0.0..=1.0`.
// Only `init_sentry` calls this, and that function is compiled out without the
// `sentry` feature.
#[cfg_attr(not(feature = "sentry"), allow(dead_code))]
fn parse_sentry_sample_rate(raw: &str) -> Option<f32> {
    let rate: f32 = raw.trim().parse::<f32>().ok()?;
    if rate.is_nan() {
        return None;
    }
    Some(rate.clamp(0.0, 1.0))
}
/// Turns a failure to load the default config into either a fallback or an
/// I/O error.
///
/// * `err` - the load failure, including the locations that were tried.
/// * `config_path` - the path that was requested; returned unchanged in the
///   fallback tuple and shown in the error message.
/// * `cdc_only` - with the `cdc` feature compiled in, `true` makes this
///   function log a warning and return `Config::empty_fallback()` with a
///   description string and `false` as the last tuple element. Without the
///   feature the flag is ignored.
///
/// # Errors
///
/// Returns an `Other` I/O error whose message names `config_path`, the
/// underlying error and every attempted location, one per line.
fn resolve_default_config_load_error(
    err: ConfigLoadError,
    config_path: PathBuf,
    cdc_only: bool,
) -> Result<(Config, PathBuf, String, bool), Error> {
    #[cfg(feature = "cdc")]
    if cdc_only {
        warn!(
            error = %err,
            "Failed to load config.yaml; continuing --cdc-only with in-memory fallback config"
        );
        return Ok((
            Config::empty_fallback(),
            config_path,
            "in-memory fallback (--cdc-only)".to_string(),
            false,
        ));
    }
    // The flag is meaningless without the `cdc` feature; consume it so that
    // configuration builds without an unused-variable warning.
    #[cfg(not(feature = "cdc"))]
    let _ = cdc_only;

    // Borrow the attempted locations straight from `err`; no copy is needed,
    // and nothing is computed for them on the fallback path above.
    Err(Error::new(
        Other,
        format!(
            "Failed to load config '{}': {}. Looked in:\n{}",
            config_path.display(),
            err,
            format_attempted_locations(&err.attempted_locations)
        ),
    ))
}
/// Renders `locations` as a bullet list: one `- <description>` line per
/// location, joined with newlines and without a trailing newline.
fn format_attempted_locations(locations: &[ConfigLocation]) -> String {
    let mut bullets: Vec<String> = Vec::with_capacity(locations.len());
    for location in locations {
        bullets.push(format!("- {}", location.describe()));
    }
    bullets.join("\n")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The sidecar `.env` is looked up next to the config file.
    #[test]
    fn config_sidecar_dotenv_path_uses_config_directory() {
        let config_file = Path::new("config/athena/config.yaml");
        let expected = PathBuf::from("config/athena/.env");
        assert_eq!(config_sidecar_dotenv_path(config_file), expected);
    }

    /// The display value ends with the path plus its status; either path
    /// separator is accepted so the test passes on Windows and Unix.
    #[test]
    fn supplemental_dotenv_status_display_value_includes_status() {
        let status = SupplementalDotenvStatus::Loaded {
            path: PathBuf::from("C:/athena/.env"),
        };
        let rendered = status.display_value();
        let has_expected_suffix = rendered.ends_with("athena/.env (loaded)")
            || rendered.ends_with("athena\\.env (loaded)");
        assert!(has_expected_suffix);
        assert_eq!(status.status_label(), "loaded");
    }
}