athena_rs 3.22.1

Hyper performant polyglot Database driver
Documentation
use crate::AppState;
use std::env;
use std::io::IsTerminal;

const INSERT_WINDOW_INLINE_BYPASS_TABLES: &[&str] =
    &["storage_profiles", "typesense_profiles", "saved_queries"];

fn feature_enabled_from_env(name: &str, default: bool) -> bool {
    env::var(name)
        .ok()
        .map(|value| {
            let normalized = value.trim().to_ascii_lowercase();
            matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
        })
        .unwrap_or(default)
}

fn bounded_f64_from_env(name: &str, default: f64, min: f64, max: f64) -> f64 {
    let parsed: f64 = env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<f64>().ok())
        .unwrap_or(default);
    parsed.clamp(min, max)
}

fn bounded_u64_from_env(name: &str, default: u64, min: u64, max: u64) -> u64 {
    let parsed: u64 = env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok())
        .unwrap_or(default);
    parsed.clamp(min, max)
}

fn default_insert_timeout_ms(app_state: &AppState) -> u64 {
    app_state
        .gateway_resilience_timeout_secs
        .saturating_mul(1000)
        .clamp(250, 120_000)
}

pub(crate) fn insert_db_timeout_ms(app_state: &AppState) -> u64 {
    bounded_u64_from_env(
        "ATHENA_INSERT_DB_TIMEOUT_MS",
        default_insert_timeout_ms(app_state),
        100,
        180_000,
    )
}

pub(crate) fn insert_window_response_timeout_ms(app_state: &AppState, window_ms: u64) -> u64 {
    let fallback_ms: u64 = default_insert_timeout_ms(app_state)
        .saturating_add(window_ms)
        .saturating_add(250);
    bounded_u64_from_env(
        "ATHENA_INSERT_WINDOW_RESPONSE_TIMEOUT_MS",
        fallback_ms,
        250,
        240_000,
    )
}

pub(crate) fn insert_window_dedupe_enabled() -> bool {
    feature_enabled_from_env("ATHENA_INSERT_WINDOW_DEDUPE_ENABLED", true)
}

pub(crate) fn table_bypasses_insert_window(table_name: &str) -> bool {
    let normalized = table_name.trim();
    if normalized.is_empty() {
        return false;
    }

    let short_name = normalized
        .rsplit_once('.')
        .map(|(_, short)| short)
        .unwrap_or(normalized);

    INSERT_WINDOW_INLINE_BYPASS_TABLES
        .iter()
        .any(|candidate| short_name.eq_ignore_ascii_case(candidate))
}

pub(crate) fn recent_unique_conflict_cache_enabled() -> bool {
    feature_enabled_from_env("ATHENA_RECENT_UNIQUE_CONFLICT_CACHE_ENABLED", true)
}

pub(crate) fn insert_admission_defer_enabled() -> bool {
    feature_enabled_from_env("ATHENA_INSERT_ADMISSION_DEFER_ENABLED", true)
}

pub(crate) fn insert_admission_high_watermark_ratio() -> f64 {
    bounded_f64_from_env("ATHENA_INSERT_ADMISSION_HIGH_WATERMARK", 0.85, 0.10, 0.99)
}

pub(crate) fn queue_high_watermark(max_queued: usize, ratio: f64) -> usize {
    if max_queued == 0 {
        return 0;
    }
    let threshold = (max_queued as f64 * ratio).ceil() as usize;
    threshold.clamp(1, max_queued)
}

pub(crate) fn should_defer_insert_for_queue_pressure(
    pending_queue: usize,
    max_queued: usize,
    ratio: f64,
) -> bool {
    let threshold: usize = queue_high_watermark(max_queued, ratio);
    threshold > 0 && pending_queue >= threshold
}

pub(crate) fn insert_overload_retry_after_ms(
    pending_queue: usize,
    max_queued: usize,
    window_ms: u64,
) -> u64 {
    let pressure_ratio: f64 = if max_queued == 0 {
        1.0
    } else {
        pending_queue as f64 / max_queued as f64
    }
    .clamp(0.0, 2.0);
    let base_ms: u64 = window_ms.max(25);
    let multiplier: u64 = if pressure_ratio >= 1.5 {
        8
    } else if pressure_ratio >= 1.25 {
        6
    } else if pressure_ratio >= 1.0 {
        4
    } else if pressure_ratio >= 0.9 {
        3
    } else {
        2
    };
    let suggested_ms: u64 = base_ms.saturating_mul(multiplier).clamp(50, 10_000);
    bounded_u64_from_env(
        "ATHENA_INSERT_OVERLOAD_RETRY_AFTER_MS",
        suggested_ms,
        50,
        60_000,
    )
}

pub(crate) fn retry_after_seconds_from_ms(retry_after_ms: u64) -> u64 {
    retry_after_ms
        .saturating_add(999)
        .saturating_div(1000)
        .clamp(1, 60)
}

pub(crate) fn athena_verbose_logging_enabled() -> bool {
    env::var("ATHENA_VERBOSE_LOGGING")
        .ok()
        .map(|value| {
            let normalized = value.trim().to_ascii_lowercase();
            matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
        })
        .unwrap_or(false)
}

pub(crate) fn athena_ansi_enabled() -> bool {
    if env::var("NO_COLOR").is_ok() {
        return false;
    }

    env::var("ATHENA_ANSI")
        .ok()
        .and_then(|value| {
            let normalized = value.trim().to_ascii_lowercase();
            match normalized.as_str() {
                "1" | "true" | "yes" | "on" => Some(true),
                "0" | "false" | "no" | "off" => Some(false),
                _ => None,
            }
        })
        .unwrap_or_else(|| std::io::stderr().is_terminal())
}

#[cfg(test)]
mod tests {
    use super::table_bypasses_insert_window;

    #[test]
    fn insert_window_bypass_matches_control_plane_profile_tables() {
        assert!(table_bypasses_insert_window("storage_profiles"));
        assert!(table_bypasses_insert_window("public.storage_profiles"));
        assert!(table_bypasses_insert_window("typesense_profiles"));
        assert!(table_bypasses_insert_window("saved_queries"));
        assert!(!table_bypasses_insert_window("gateway_request_log"));
        assert!(!table_bypasses_insert_window("public.events"));
    }
}