use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::sync::OnceLock;
use tracing::warn;
// Integer type limits expressed as `f64`, used as the floor/ceiling when a
// configured `NumericRange` is narrowed to an integer type's domain.
//
// `u64::MAX` and `i64::MAX` are not exactly representable in `f64`; the cast
// rounds them up to 2^64 and 2^63 respectively (the same applies to
// `usize::MAX` on 64-bit targets). Casting those values back with `as`
// saturates at the integer type's maximum, so the round trip stays in range.
const U16_MAX_F64: f64 = u16::MAX as f64;
const U32_MAX_F64: f64 = u32::MAX as f64;
const U64_MAX_F64: f64 = u64::MAX as f64;
const USIZE_MAX_F64: f64 = usize::MAX as f64;
const I32_MIN_F64: f64 = i32::MIN as f64;
const I32_MAX_F64: f64 = i32::MAX as f64;
const I64_MIN_F64: f64 = i64::MIN as f64;
const I64_MAX_F64: f64 = i64::MAX as f64;
/// Inclusive numeric bounds used to validate and clamp a setting.
///
/// The type itself does not enforce `min <= max` or finiteness; the range is
/// passed through `sanitize_range` before it is used for clamping.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NumericRange {
/// Lower bound (inclusive).
pub min: f64,
/// Upper bound (inclusive).
pub max: f64,
}
impl NumericRange {
pub const fn new(min: f64, max: f64) -> Self {
Self { min, max }
}
}
/// Per-key range overrides, split by the source of the value being validated.
///
/// Both maps default to empty when absent from the deserialized input, in
/// which case callers' fallback ranges are used.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ValidationRanges {
/// Overrides keyed by environment variable name (looked up by the `env_*` helpers).
#[serde(default)]
pub env: HashMap<String, NumericRange>,
/// Overrides keyed by configuration key (looked up by the `normalize_config_*` functions).
#[serde(default)]
pub config: HashMap<String, NumericRange>,
}
/// Why a provided value was replaced during normalization.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CorrectionReason {
/// The text could not be parsed as the target numeric type
/// (for `f64`, this also covers non-finite results).
ParseError,
/// The parsed value was below the effective minimum and was raised to it.
BelowMin,
/// The parsed value was above the effective maximum and was lowered to it.
AboveMax,
}
/// Result of normalizing one raw value: the final value plus the reason it
/// was corrected, if it was.
#[derive(Debug, Clone, Copy)]
struct ParsedNumeric<T> {
/// The value to use after parsing and clamping.
value: T,
/// `None` when the input was absent/blank or already within range.
reason: Option<CorrectionReason>,
}
/// A record of one environment value that had to be corrected, kept as
/// pre-formatted strings so it can be logged after all settings are read.
#[derive(Debug, Clone)]
struct EnvCorrection {
/// Environment variable name.
key: String,
/// The raw text found in the environment.
provided: String,
/// The value that was used instead, formatted as text.
corrected: String,
/// Lower bound of the resolved range, formatted as text.
min: String,
/// Upper bound of the resolved range, formatted as text.
max: String,
/// Why the provided value was replaced.
reason: CorrectionReason,
}
/// Numeric runtime settings sourced from `ATHENA_*` environment variables.
///
/// Each field is populated by `build_runtime_env_settings` from the
/// environment variable named in its doc comment, after range normalization.
#[derive(Debug, Clone)]
pub struct RuntimeEnvSettings {
/// From `ATHENA_POOL_MONITOR_INTERVAL_SECS`.
pub pool_monitor_interval_secs: u64,
/// From `ATHENA_POOL_MONITOR_RETENTION_HOURS`.
pub pool_monitor_retention_hours: i64,
/// From `ATHENA_VACUUM_HEALTH_INTERVAL_SECS`.
pub vacuum_health_interval_secs: u64,
/// From `ATHENA_VACUUM_HEALTH_RETENTION_DAYS`.
pub vacuum_health_retention_days: i64,
/// From `ATHENA_VACUUM_HEALTH_STATEMENT_TIMEOUT_MS`.
pub vacuum_health_statement_timeout_ms: u64,
/// From `ATHENA_CLIENT_RECONNECT_INTERVAL_SECS`.
pub client_reconnect_interval_secs: u64,
/// From `ATHENA_CACHE_MAX_CAPACITY`.
pub cache_max_capacity: u64,
/// From `ATHENA_CACHE_MAX_ENTRY_WEIGHT`.
pub cache_max_entry_weight: usize,
/// From `ATHENA_PG_POOL_MAX_CONNECTIONS`.
pub pg_pool_max_connections: u32,
/// From `ATHENA_DEADPOOL_WARMUP_TIMEOUT_MS`.
pub deadpool_warmup_timeout_ms: u64,
/// From `ATHENA_PG_POOL_MAX_LIFETIME_SECS`.
pub pg_pool_max_lifetime_secs: u64,
/// From `ATHENA_PG_POOL_MIN_CONNECTIONS`.
pub pg_pool_min_connections: u32,
/// From `ATHENA_PG_POOL_ACQUIRE_TIMEOUT_SECS`.
pub pg_pool_acquire_timeout_secs: u64,
/// From `ATHENA_PG_POOL_IDLE_TIMEOUT_SECS`.
pub pg_pool_idle_timeout_secs: u64,
/// From `ATHENA_DEFERRED_MAX_ATTEMPTS`.
pub deferred_max_attempts: i32,
/// From `ATHENA_DEFERRED_WORKER_NO_STORAGE_POLL_MS`.
pub deferred_worker_no_storage_poll_ms: u64,
/// From `ATHENA_DEADPOOL_CHECKOUT_TIMEOUT_MS`.
pub deadpool_checkout_timeout_ms: u64,
/// From `ATHENA_FETCH_SINGLEFLIGHT_WAIT_TIMEOUT_MS`.
pub fetch_singleflight_wait_timeout_ms: u64,
/// From `ATHENA_CACHE_INVALIDATION_WINDOW_MS`.
pub cache_invalidation_window_ms: u64,
/// From `ATHENA_REDIS_OP_TIMEOUT_MS`.
pub redis_op_timeout_ms: u64,
/// From `ATHENA_REDIS_COOLDOWN_MS`.
pub redis_cooldown_ms: u64,
/// From `ATHENA_VACUUM_HEALTH_BLOAT_DEAD_PCT`.
pub vacuum_health_bloat_dead_pct: f64,
/// From `ATHENA_VACUUM_HEALTH_XID_RISK_PCT`.
pub vacuum_health_xid_risk_pct: f64,
/// From `ATHENA_REALTIME_REGISTRY_PURGE_INTERVAL_SECS`.
pub realtime_registry_purge_interval_secs: u64,
/// From `ATHENA_REALTIME_REGISTRY_INACTIVE_AFTER_SECS`.
pub realtime_registry_inactive_after_secs: u64,
}
impl Default for RuntimeEnvSettings {
fn default() -> Self {
Self {
pool_monitor_interval_secs: 30,
pool_monitor_retention_hours: 24,
vacuum_health_interval_secs: 3600,
vacuum_health_retention_days: 30,
vacuum_health_statement_timeout_ms: 60_000,
client_reconnect_interval_secs: 20,
cache_max_capacity: 10_000,
cache_max_entry_weight: 256 * 1024,
pg_pool_max_connections: 100,
deadpool_warmup_timeout_ms: 800,
pg_pool_max_lifetime_secs: 900,
pg_pool_min_connections: 0,
pg_pool_acquire_timeout_secs: 8,
pg_pool_idle_timeout_secs: 120,
deferred_max_attempts: 3,
deferred_worker_no_storage_poll_ms: 30_000,
deadpool_checkout_timeout_ms: 800,
fetch_singleflight_wait_timeout_ms: 5_000,
cache_invalidation_window_ms: 25,
redis_op_timeout_ms: 30,
redis_cooldown_ms: 5_000,
vacuum_health_bloat_dead_pct: 10.0,
vacuum_health_xid_risk_pct: 50.0,
realtime_registry_purge_interval_secs: 30,
realtime_registry_inactive_after_secs: 300,
}
}
}
// Process-wide settings cell. It is written at most once, either by
// `initialize_runtime_env_settings` or lazily (with defaults) by
// `runtime_env_settings`, whichever runs first.
static RUNTIME_ENV_SETTINGS: OnceLock<RuntimeEnvSettings> = OnceLock::new();
/// Returns the process-wide runtime settings.
///
/// If nothing has initialized the settings yet, this call stores
/// `RuntimeEnvSettings::default()` permanently; a later call to
/// `initialize_runtime_env_settings` will then return those defaults rather
/// than reading the environment.
pub fn runtime_env_settings() -> &'static RuntimeEnvSettings {
    RUNTIME_ENV_SETTINGS.get_or_init(Default::default)
}
/// Reads the runtime settings from the environment once and stores them in
/// the process-wide cell, returning a copy of whatever the cell holds.
///
/// If the cell is already populated (by an earlier call, or by
/// `runtime_env_settings` falling back to defaults), the environment is not
/// read again and the existing settings are returned.
///
/// Every value that had to be corrected is reported through a `warn!` event.
/// Initialization goes through `OnceLock::get_or_init`, so when several
/// threads race only one of them reads the environment and emits warnings,
/// and every caller gets the settings that were actually stored. The previous
/// check-then-set sequence could return settings that differed from the
/// stored global.
pub fn initialize_runtime_env_settings(ranges: &ValidationRanges) -> RuntimeEnvSettings {
    RUNTIME_ENV_SETTINGS
        .get_or_init(|| {
            let (settings, corrections): (RuntimeEnvSettings, Vec<EnvCorrection>) =
                build_runtime_env_settings(ranges);
            for correction in corrections {
                warn!(
                    env_key = %correction.key,
                    provided = %correction.provided,
                    corrected = %correction.corrected,
                    min = %correction.min,
                    max = %correction.max,
                    reason = ?correction.reason,
                    "Normalized numeric environment value using configured range"
                );
            }
            settings
        })
        .clone()
}
/// Normalizes an optional config string into a `u16`.
///
/// The range comes from `ranges.config[key]` when present, otherwise from
/// `fallback_range`. A missing, blank, or unparsable value yields the range
/// minimum; out-of-range values are clamped to the nearest bound.
pub fn normalize_config_u16(
    ranges: &ValidationRanges,
    key: &str,
    raw: Option<&String>,
    fallback_range: NumericRange,
) -> u16 {
    let text: Option<&str> = raw.map(String::as_str);
    clamp_u16(text, resolve_range(&ranges.config, key, fallback_range)).value
}
/// Normalizes an optional config string into a `u32`.
///
/// The range comes from `ranges.config[key]` when present, otherwise from
/// `fallback_range`. A missing, blank, or unparsable value yields the range
/// minimum; out-of-range values are clamped to the nearest bound.
pub fn normalize_config_u32(
    ranges: &ValidationRanges,
    key: &str,
    raw: Option<&String>,
    fallback_range: NumericRange,
) -> u32 {
    let text: Option<&str> = raw.map(String::as_str);
    clamp_u32(text, resolve_range(&ranges.config, key, fallback_range)).value
}
/// Normalizes an optional config string into a `u64`.
///
/// The range comes from `ranges.config[key]` when present, otherwise from
/// `fallback_range`. A missing, blank, or unparsable value yields the range
/// minimum; out-of-range values are clamped to the nearest bound.
pub fn normalize_config_u64(
    ranges: &ValidationRanges,
    key: &str,
    raw: Option<&String>,
    fallback_range: NumericRange,
) -> u64 {
    let text: Option<&str> = raw.map(String::as_str);
    clamp_u64(text, resolve_range(&ranges.config, key, fallback_range)).value
}
/// Normalizes an optional config string into a `usize`.
///
/// The range comes from `ranges.config[key]` when present, otherwise from
/// `fallback_range`. A missing, blank, or unparsable value yields the range
/// minimum; out-of-range values are clamped to the nearest bound.
pub fn normalize_config_usize(
    ranges: &ValidationRanges,
    key: &str,
    raw: Option<&String>,
    fallback_range: NumericRange,
) -> usize {
    let text: Option<&str> = raw.map(String::as_str);
    clamp_usize(text, resolve_range(&ranges.config, key, fallback_range)).value
}
/// Normalizes an optional config string into an `i32`.
///
/// The range comes from `ranges.config[key]` when present, otherwise from
/// `fallback_range`. A missing, blank, or unparsable value yields the range
/// minimum; out-of-range values are clamped to the nearest bound.
pub fn normalize_config_i32(
    ranges: &ValidationRanges,
    key: &str,
    raw: Option<&String>,
    fallback_range: NumericRange,
) -> i32 {
    let text: Option<&str> = raw.map(String::as_str);
    clamp_i32(text, resolve_range(&ranges.config, key, fallback_range)).value
}
/// Normalizes an optional config string into an `f64`.
///
/// The range comes from `ranges.config[key]` when present, otherwise from
/// `fallback_range`. A missing, blank, unparsable, or non-finite value yields
/// the range minimum; out-of-range values are clamped to the nearest bound.
pub fn normalize_config_f64(
    ranges: &ValidationRanges,
    key: &str,
    raw: Option<&String>,
    fallback_range: NumericRange,
) -> f64 {
    let text: Option<&str> = raw.map(String::as_str);
    clamp_f64(text, resolve_range(&ranges.config, key, fallback_range)).value
}
fn build_runtime_env_settings(
ranges: &ValidationRanges,
) -> (RuntimeEnvSettings, Vec<EnvCorrection>) {
let mut corrections: Vec<EnvCorrection> = Vec::new();
let settings: RuntimeEnvSettings = RuntimeEnvSettings {
pool_monitor_interval_secs: env_u64(
"ATHENA_POOL_MONITOR_INTERVAL_SECS",
ranges,
NumericRange::new(30.0, 3600.0),
&mut corrections,
),
pool_monitor_retention_hours: env_i64(
"ATHENA_POOL_MONITOR_RETENTION_HOURS",
ranges,
NumericRange::new(24.0, 720.0),
&mut corrections,
),
vacuum_health_interval_secs: env_u64(
"ATHENA_VACUUM_HEALTH_INTERVAL_SECS",
ranges,
NumericRange::new(300.0, 86_400.0),
&mut corrections,
),
vacuum_health_retention_days: env_i64(
"ATHENA_VACUUM_HEALTH_RETENTION_DAYS",
ranges,
NumericRange::new(30.0, 365.0),
&mut corrections,
),
vacuum_health_statement_timeout_ms: env_u64(
"ATHENA_VACUUM_HEALTH_STATEMENT_TIMEOUT_MS",
ranges,
NumericRange::new(10_000.0, 600_000.0),
&mut corrections,
),
client_reconnect_interval_secs: env_u64(
"ATHENA_CLIENT_RECONNECT_INTERVAL_SECS",
ranges,
NumericRange::new(20.0, 120.0),
&mut corrections,
),
cache_max_capacity: env_u64(
"ATHENA_CACHE_MAX_CAPACITY",
ranges,
NumericRange::new(10_000.0, 1_000_000.0),
&mut corrections,
),
cache_max_entry_weight: env_usize(
"ATHENA_CACHE_MAX_ENTRY_WEIGHT",
ranges,
NumericRange::new(262_144.0, 16_777_216.0),
&mut corrections,
),
pg_pool_max_connections: env_u32(
"ATHENA_PG_POOL_MAX_CONNECTIONS",
ranges,
NumericRange::new(100.0, 500.0),
&mut corrections,
),
deadpool_warmup_timeout_ms: env_u64(
"ATHENA_DEADPOOL_WARMUP_TIMEOUT_MS",
ranges,
NumericRange::new(800.0, 60_000.0),
&mut corrections,
),
pg_pool_max_lifetime_secs: env_u64(
"ATHENA_PG_POOL_MAX_LIFETIME_SECS",
ranges,
NumericRange::new(900.0, 86_400.0),
&mut corrections,
),
pg_pool_min_connections: env_u32(
"ATHENA_PG_POOL_MIN_CONNECTIONS",
ranges,
NumericRange::new(0.0, 500.0),
&mut corrections,
),
pg_pool_acquire_timeout_secs: env_u64(
"ATHENA_PG_POOL_ACQUIRE_TIMEOUT_SECS",
ranges,
NumericRange::new(8.0, 300.0),
&mut corrections,
),
pg_pool_idle_timeout_secs: env_u64(
"ATHENA_PG_POOL_IDLE_TIMEOUT_SECS",
ranges,
NumericRange::new(120.0, 3600.0),
&mut corrections,
),
deferred_max_attempts: env_i32(
"ATHENA_DEFERRED_MAX_ATTEMPTS",
ranges,
NumericRange::new(3.0, 50.0),
&mut corrections,
),
deferred_worker_no_storage_poll_ms: env_u64(
"ATHENA_DEFERRED_WORKER_NO_STORAGE_POLL_MS",
ranges,
NumericRange::new(30_000.0, 300_000.0),
&mut corrections,
),
deadpool_checkout_timeout_ms: env_u64(
"ATHENA_DEADPOOL_CHECKOUT_TIMEOUT_MS",
ranges,
NumericRange::new(800.0, 60_000.0),
&mut corrections,
),
fetch_singleflight_wait_timeout_ms: env_u64(
"ATHENA_FETCH_SINGLEFLIGHT_WAIT_TIMEOUT_MS",
ranges,
NumericRange::new(5_000.0, 120_000.0),
&mut corrections,
),
cache_invalidation_window_ms: env_u64(
"ATHENA_CACHE_INVALIDATION_WINDOW_MS",
ranges,
NumericRange::new(25.0, 10_000.0),
&mut corrections,
),
redis_op_timeout_ms: env_u64(
"ATHENA_REDIS_OP_TIMEOUT_MS",
ranges,
NumericRange::new(30.0, 10_000.0),
&mut corrections,
),
redis_cooldown_ms: env_u64(
"ATHENA_REDIS_COOLDOWN_MS",
ranges,
NumericRange::new(5_000.0, 600_000.0),
&mut corrections,
),
vacuum_health_bloat_dead_pct: env_f64(
"ATHENA_VACUUM_HEALTH_BLOAT_DEAD_PCT",
ranges,
NumericRange::new(10.0, 100.0),
&mut corrections,
),
vacuum_health_xid_risk_pct: env_f64(
"ATHENA_VACUUM_HEALTH_XID_RISK_PCT",
ranges,
NumericRange::new(50.0, 100.0),
&mut corrections,
),
realtime_registry_purge_interval_secs: env_u64(
"ATHENA_REALTIME_REGISTRY_PURGE_INTERVAL_SECS",
ranges,
NumericRange::new(30.0, 3600.0),
&mut corrections,
),
realtime_registry_inactive_after_secs: env_u64(
"ATHENA_REALTIME_REGISTRY_INACTIVE_AFTER_SECS",
ranges,
NumericRange::new(300.0, 86_400.0),
&mut corrections,
),
};
(settings, corrections)
}
fn env_u32(
key: &str,
ranges: &ValidationRanges,
fallback_range: NumericRange,
corrections: &mut Vec<EnvCorrection>,
) -> u32 {
let raw: Option<String> = env::var(key).ok();
let resolved: NumericRange = resolve_range(&ranges.env, key, fallback_range);
let parsed: ParsedNumeric<u32> = clamp_u32(raw.as_deref(), resolved);
maybe_push_correction(
key,
raw.as_deref(),
parsed.reason,
parsed.value.to_string(),
resolved,
corrections,
);
parsed.value
}
fn env_u64(
key: &str,
ranges: &ValidationRanges,
fallback_range: NumericRange,
corrections: &mut Vec<EnvCorrection>,
) -> u64 {
let raw: Option<String> = env::var(key).ok();
let resolved: NumericRange = resolve_range(&ranges.env, key, fallback_range);
let parsed: ParsedNumeric<u64> = clamp_u64(raw.as_deref(), resolved);
maybe_push_correction(
key,
raw.as_deref(),
parsed.reason,
parsed.value.to_string(),
resolved,
corrections,
);
parsed.value
}
fn env_usize(
key: &str,
ranges: &ValidationRanges,
fallback_range: NumericRange,
corrections: &mut Vec<EnvCorrection>,
) -> usize {
let raw: Option<String> = env::var(key).ok();
let resolved: NumericRange = resolve_range(&ranges.env, key, fallback_range);
let parsed: ParsedNumeric<usize> = clamp_usize(raw.as_deref(), resolved);
maybe_push_correction(
key,
raw.as_deref(),
parsed.reason,
parsed.value.to_string(),
resolved,
corrections,
);
parsed.value
}
fn env_i32(
key: &str,
ranges: &ValidationRanges,
fallback_range: NumericRange,
corrections: &mut Vec<EnvCorrection>,
) -> i32 {
let raw: Option<String> = env::var(key).ok();
let resolved: NumericRange = resolve_range(&ranges.env, key, fallback_range);
let parsed: ParsedNumeric<i32> = clamp_i32(raw.as_deref(), resolved);
maybe_push_correction(
key,
raw.as_deref(),
parsed.reason,
parsed.value.to_string(),
resolved,
corrections,
);
parsed.value
}
fn env_i64(
key: &str,
ranges: &ValidationRanges,
fallback_range: NumericRange,
corrections: &mut Vec<EnvCorrection>,
) -> i64 {
let raw: Option<String> = env::var(key).ok();
let resolved: NumericRange = resolve_range(&ranges.env, key, fallback_range);
let parsed: ParsedNumeric<i64> = clamp_i64(raw.as_deref(), resolved);
maybe_push_correction(
key,
raw.as_deref(),
parsed.reason,
parsed.value.to_string(),
resolved,
corrections,
);
parsed.value
}
fn env_f64(
key: &str,
ranges: &ValidationRanges,
fallback_range: NumericRange,
corrections: &mut Vec<EnvCorrection>,
) -> f64 {
let raw: Option<String> = env::var(key).ok();
let resolved: NumericRange = resolve_range(&ranges.env, key, fallback_range);
let parsed: ParsedNumeric<f64> = clamp_f64(raw.as_deref(), resolved);
maybe_push_correction(
key,
raw.as_deref(),
parsed.reason,
format_numeric(parsed.value),
resolved,
corrections,
);
parsed.value
}
/// Appends a correction record, but only when a value was actually provided
/// and it was actually corrected.
///
/// Nothing is recorded when `raw` is `None` (variable absent) or `reason` is
/// `None` (value accepted as-is).
fn maybe_push_correction(
    key: &str,
    raw: Option<&str>,
    reason: Option<CorrectionReason>,
    corrected: String,
    range: NumericRange,
    corrections: &mut Vec<EnvCorrection>,
) {
    if let (Some(provided), Some(reason)) = (raw, reason) {
        let entry = EnvCorrection {
            key: key.to_string(),
            provided: provided.to_string(),
            corrected,
            min: format_numeric(range.min),
            max: format_numeric(range.max),
            reason,
        };
        corrections.push(entry);
    }
}
/// Picks the range for `key`: the override if one exists, else `fallback`.
///
/// The chosen range is passed through `sanitize_range`, so a non-finite
/// override is replaced by `fallback` and an inverted one is swapped.
fn resolve_range(
    overrides: &HashMap<String, NumericRange>,
    key: &str,
    fallback: NumericRange,
) -> NumericRange {
    let chosen = match overrides.get(key) {
        Some(custom) => *custom,
        None => fallback,
    };
    sanitize_range(chosen, fallback)
}
/// Makes a range safe to clamp with.
///
/// Returns `fallback` unchanged if either endpoint of `candidate` is NaN or
/// infinite. Otherwise returns `candidate`, with the endpoints swapped if
/// they were given in the wrong order.
fn sanitize_range(candidate: NumericRange, fallback: NumericRange) -> NumericRange {
    let both_finite = candidate.min.is_finite() && candidate.max.is_finite();
    if !both_finite {
        return fallback;
    }
    // Both endpoints are finite here, so `>` is the exact negation of `<=`.
    if candidate.min > candidate.max {
        return NumericRange::new(candidate.max, candidate.min);
    }
    candidate
}
fn clamp_u16(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<u16> {
let bounded: NumericRange = bounded_u64_range(range, 0.0, U16_MAX_F64);
let min: u16 = bounded.min.ceil() as u16;
let max: u16 = bounded.max.floor() as u16;
parse_and_clamp(raw, min, max, |s| s.parse::<u16>().ok())
}
fn clamp_u32(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<u32> {
let bounded: NumericRange = bounded_u64_range(range, 0.0, U32_MAX_F64);
let min: u32 = bounded.min.ceil() as u32;
let max: u32 = bounded.max.floor() as u32;
parse_and_clamp(raw, min, max, |s| s.parse::<u32>().ok())
}
fn clamp_u64(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<u64> {
let bounded: NumericRange = bounded_u64_range(range, 0.0, U64_MAX_F64);
let min: u64 = bounded.min.ceil() as u64;
let max: u64 = bounded.max.floor() as u64;
parse_and_clamp(raw, min, max, |s| s.parse::<u64>().ok())
}
fn clamp_usize(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<usize> {
let bounded: NumericRange = bounded_u64_range(range, 0.0, USIZE_MAX_F64);
let min: usize = bounded.min.ceil() as usize;
let max: usize = bounded.max.floor() as usize;
parse_and_clamp(raw, min, max, |s| s.parse::<usize>().ok())
}
fn clamp_i32(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<i32> {
let bounded: NumericRange = bounded_i64_range(range, I32_MIN_F64, I32_MAX_F64);
let min: i32 = bounded.min.ceil() as i32;
let max: i32 = bounded.max.floor() as i32;
parse_and_clamp(raw, min, max, |s| s.parse::<i32>().ok())
}
fn clamp_i64(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<i64> {
let bounded: NumericRange = bounded_i64_range(range, I64_MIN_F64, I64_MAX_F64);
let min: i64 = bounded.min.ceil() as i64;
let max: i64 = bounded.max.floor() as i64;
parse_and_clamp(raw, min, max, |s| s.parse::<i64>().ok())
}
fn clamp_f64(raw: Option<&str>, range: NumericRange) -> ParsedNumeric<f64> {
let bounded: NumericRange = sanitize_range(range, NumericRange::new(0.0, 100.0));
let min: f64 = bounded.min;
let max: f64 = bounded.max;
let Some(raw_value) = raw.map(str::trim).filter(|v| !v.is_empty()) else {
return ParsedNumeric {
value: min,
reason: None,
};
};
let Some(parsed) = raw_value.parse::<f64>().ok().filter(|v| v.is_finite()) else {
return ParsedNumeric {
value: min,
reason: Some(CorrectionReason::ParseError),
};
};
if parsed < min {
ParsedNumeric {
value: min,
reason: Some(CorrectionReason::BelowMin),
}
} else if parsed > max {
ParsedNumeric {
value: max,
reason: Some(CorrectionReason::AboveMax),
}
} else {
ParsedNumeric {
value: parsed,
reason: None,
}
}
}
/// Shared parse-then-clamp logic for the integer `clamp_*` helpers.
///
/// * absent or blank (after trimming) input -> `min`, no correction reason
/// * `parser` returns `None` -> `min`, `ParseError`
/// * parsed value below `min` -> `min`, `BelowMin`
/// * parsed value above `max` -> `max`, `AboveMax`
/// * otherwise -> the parsed value, no correction reason
///
/// The lower bound is checked first, so if `min > max` a value between them
/// is reported as `BelowMin`.
fn parse_and_clamp<T>(
    raw: Option<&str>,
    min: T,
    max: T,
    parser: impl Fn(&str) -> Option<T>,
) -> ParsedNumeric<T>
where
    T: Copy + PartialOrd,
{
    let text = match raw.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => {
            return ParsedNumeric {
                value: min,
                reason: None,
            }
        }
    };

    let (value, reason) = match parser(text) {
        None => (min, Some(CorrectionReason::ParseError)),
        Some(number) if number < min => (min, Some(CorrectionReason::BelowMin)),
        Some(number) if number > max => (max, Some(CorrectionReason::AboveMax)),
        Some(number) => (number, None),
    };
    ParsedNumeric { value, reason }
}
/// Narrows `range` so both endpoints lie within `floor..=ceiling`.
///
/// The range is sanitized first, with `floor..=ceiling` as the replacement
/// for a non-finite range. If the endpoints end up inverted after clamping,
/// the range collapses onto the upper endpoint.
fn bounded_u64_range(range: NumericRange, floor: f64, ceiling: f64) -> NumericRange {
    let sane = sanitize_range(range, NumericRange::new(floor, ceiling));
    let lower = sane.min.clamp(floor, ceiling);
    let upper = sane.max.clamp(floor, ceiling);
    if lower > upper {
        NumericRange::new(upper, upper)
    } else {
        NumericRange::new(lower, upper)
    }
}
/// Narrows `range` so both endpoints lie within `floor..=ceiling`; the
/// signed-integer counterpart of `bounded_u64_range`, with the same logic.
///
/// The range is sanitized first, with `floor..=ceiling` as the replacement
/// for a non-finite range. If the endpoints end up inverted after clamping,
/// the range collapses onto the upper endpoint.
fn bounded_i64_range(range: NumericRange, floor: f64, ceiling: f64) -> NumericRange {
    let sane = sanitize_range(range, NumericRange::new(floor, ceiling));
    let lower = sane.min.clamp(floor, ceiling);
    let upper = sane.max.clamp(floor, ceiling);
    if lower > upper {
        NumericRange::new(upper, upper)
    } else {
        NumericRange::new(lower, upper)
    }
}
/// Renders a float for log output.
///
/// Whole numbers are printed with zero decimal places; everything else
/// (including NaN and infinities, whose fractional part is NaN) uses the
/// default `Display` form.
fn format_numeric(value: f64) -> String {
    let is_whole = value.fract() == 0.0;
    match is_whole {
        true => format!("{value:.0}"),
        false => value.to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A missing config value resolves to the minimum of the override range,
    /// not the fallback range.
    #[test]
    fn normalize_config_u64_uses_range_min_when_missing() {
        let mut ranges: ValidationRanges = ValidationRanges::default();
        ranges
            .config
            .insert("api.cache_ttl".to_string(), NumericRange::new(7.0, 30.0));
        let value: u64 = normalize_config_u64(
            &ranges,
            "api.cache_ttl",
            None,
            NumericRange::new(240.0, 86_400.0),
        );
        assert_eq!(value, 7);
    }

    /// A value above the override range is lowered to the override maximum.
    #[test]
    fn normalize_config_u64_clamps_to_max() {
        let mut ranges: ValidationRanges = ValidationRanges::default();
        ranges.config.insert(
            "gateway.admission_window_secs".to_string(),
            NumericRange::new(1.0, 5.0),
        );
        let raw: String = "500".to_string();
        let value: u64 = normalize_config_u64(
            &ranges,
            "gateway.admission_window_secs",
            Some(&raw),
            NumericRange::new(1.0, 3_600.0),
        );
        assert_eq!(value, 5);
    }

    /// An environment value above the override range is lowered to the
    /// override maximum.
    #[test]
    fn env_settings_clamp_client_reconnect_interval() {
        const KEY: &str = "ATHENA_CLIENT_RECONNECT_INTERVAL_SECS";

        let mut ranges: ValidationRanges = ValidationRanges::default();
        ranges
            .env
            .insert(KEY.to_string(), NumericRange::new(20.0, 60.0));

        // SAFETY: mutating the environment is only sound while no other thread
        // reads or writes it concurrently. The other tests in this module do
        // not touch the environment; tests elsewhere in the crate are assumed
        // not to either — TODO confirm.
        unsafe {
            env::set_var(KEY, "500");
        }
        let (settings, _corrections) = build_runtime_env_settings(&ranges);
        // Restore the environment before asserting, so a failing assertion
        // cannot leak the variable into other tests in the same process.
        // SAFETY: same single-writer assumption as for `set_var` above.
        unsafe {
            env::remove_var(KEY);
        }

        assert_eq!(settings.client_reconnect_interval_secs, 60);
    }
}