athena_rs 3.23.0

Hyper-performant polyglot database driver
Documentation
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use tracing::warn;

/// Cooldown applied after a storage-exhaustion error (e.g. "No space left on device").
const STORAGE_EXHAUSTION_BACKOFF: Duration = Duration::from_secs(60);
/// Cooldown applied after the logging connection pool timed out or was closed.
const POOL_TIMEOUT_BACKOFF: Duration = Duration::from_secs(15);

/// Process-wide registry mapping a backoff domain to the instant its suppression ends.
///
/// Lazily initialised by `backoffs()`; guarded by a mutex because any thread may arm or
/// query a backoff.
static BACKOFFS: OnceLock<Mutex<HashMap<&'static str, Instant>>> = OnceLock::new();

/// Reason a best-effort Postgres write backoff was armed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BestEffortPgBackoffCause {
    /// The database reported it ran out of disk space or quota.
    StorageExhaustion,
    /// The connection pool timed out (or was closed) while a connection was requested.
    PoolTimeout,
}

impl BestEffortPgBackoffCause {
    fn cooldown(self) -> Duration {
        match self {
            Self::StorageExhaustion => STORAGE_EXHAUSTION_BACKOFF,
            Self::PoolTimeout => POOL_TIMEOUT_BACKOFF,
        }
    }

    fn as_str(self) -> &'static str {
        match self {
            Self::StorageExhaustion => "storage_exhaustion",
            Self::PoolTimeout => "pool_timeout",
        }
    }
}

/// Returns the process-wide backoff registry, creating an empty one on first use.
fn backoffs() -> &'static Mutex<HashMap<&'static str, Instant>> {
    // `Mutex<HashMap<..>>: Default` builds `Mutex::new(HashMap::new())`.
    BACKOFFS.get_or_init(Default::default)
}

/// Returns `true` when `message` looks like a database storage-exhaustion failure.
///
/// Matching is ASCII case-insensitive and substring-based, so the phrases may appear anywhere
/// inside a wrapped driver error.
pub fn is_storage_exhaustion_error_message(message: &str) -> bool {
    const NEEDLES: [&str; 4] = [
        "no space left on device",
        "disk quota exceeded",
        "sqlstate 53100",
        "could not extend file",
    ];

    let haystack = message.to_ascii_lowercase();
    NEEDLES.iter().any(|&needle| haystack.contains(needle))
}

/// Returns `true` when `message` looks like the connection pool could not hand out a
/// connection, either because acquisition timed out or because the pool was closed.
///
/// Matching is ASCII case-insensitive and substring-based. Both the pool-timeout and the
/// closed-pool wordings emitted by sqlx are covered, alongside the generic phrasings.
pub fn is_pool_timeout_error_message(message: &str) -> bool {
    let normalized = message.to_ascii_lowercase();
    normalized.contains("pool timed out while waiting for an open connection")
        || normalized.contains("database connection pool timed out")
        || normalized.contains("pool is closed")
        // sqlx's `Error::PoolClosed` wording; the generic "pool is closed" check misses it.
        || normalized.contains("acquire a connection on a closed pool")
}

/// Maps an error message to the backoff cause it should trigger, if any.
///
/// Storage exhaustion is checked first, so it wins when a message matches both categories;
/// the pool-timeout check only runs when the storage check fails.
pub fn classify_best_effort_pg_backoff_cause(message: &str) -> Option<BestEffortPgBackoffCause> {
    if is_storage_exhaustion_error_message(message) {
        return Some(BestEffortPgBackoffCause::StorageExhaustion);
    }

    is_pool_timeout_error_message(message).then_some(BestEffortPgBackoffCause::PoolTimeout)
}

/// Reports whether best-effort Postgres writes for `domain` are currently suppressed.
///
/// Returns `true` while the stored deadline for `domain` lies in the future. An expired entry
/// is removed as a side effect so stale deadlines do not accumulate.
///
/// A poisoned lock is recovered instead of being reported as "no backoff". The map is only
/// ever mutated by single insert/remove calls, so its contents remain valid after a panic
/// elsewhere. Treating poison as "inactive" would silently and permanently disable
/// suppression for every domain.
pub fn best_effort_pg_write_backoff_active(domain: &'static str) -> bool {
    let now = Instant::now();
    let mut guard = backoffs()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    match guard.get(domain).copied() {
        Some(until) if until > now => true,
        Some(_) => {
            // Deadline has passed: prune it so the next lookup is a plain miss.
            guard.remove(domain);
            false
        }
        None => false,
    }
}

/// Arms (or extends) the best-effort Postgres write backoff for `domain` when `error_message`
/// matches a known retry-suppression condition.
///
/// Returns `None`, without touching any state, when the message is not recognised. Otherwise:
///
/// - The backoff for `domain` is set to expire `cause.cooldown()` from now.
/// - An existing deadline that is already later is left unchanged. A short pool-timeout
///   cooldown therefore never truncates a longer storage-exhaustion cooldown that is still
///   running.
/// - A warning is logged only when no backoff was active for `domain` beforehand, so repeated
///   failures during a cooldown stay quiet.
///
/// `context` is only used as a field on the emitted warning.
pub fn maybe_activate_best_effort_pg_write_backoff(
    domain: &'static str,
    context: &'static str,
    error_message: &str,
) -> Option<BestEffortPgBackoffCause> {
    let cause = classify_best_effort_pg_backoff_cause(error_message)?;

    let now = Instant::now();
    let cooldown = cause.cooldown();
    let until = now + cooldown;

    // The guard is scoped to this block so the lock is released before logging. Emitting a
    // warning should neither hold up other callers nor poison the registry.
    let already_active = {
        // Every mutation of the map is a single insert/remove, so a panic while the lock was
        // held cannot leave it inconsistent; recovering from poisoning keeps backoff working.
        let mut guard = backoffs()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let existing = guard.get(domain).copied();
        // Never shorten a backoff that is already scheduled to run longer.
        guard.insert(domain, existing.map_or(until, |previous| previous.max(until)));
        existing.is_some_and(|previous| previous > now)
    };

    if !already_active {
        match cause {
            BestEffortPgBackoffCause::StorageExhaustion => warn!(
                domain,
                context,
                cause = cause.as_str(),
                cooldown_secs = cooldown.as_secs(),
                error = error_message,
                "best-effort Postgres logging paused after storage exhaustion"
            ),
            BestEffortPgBackoffCause::PoolTimeout => warn!(
                domain,
                context,
                cause = cause.as_str(),
                cooldown_secs = cooldown.as_secs(),
                error = error_message,
                "best-effort Postgres logging paused after logging pool timeout"
            ),
        }
    }

    Some(cause)
}

#[cfg(test)]
mod tests {
    use super::{
        BestEffortPgBackoffCause, best_effort_pg_write_backoff_active,
        classify_best_effort_pg_backoff_cause, is_pool_timeout_error_message,
        is_storage_exhaustion_error_message, maybe_activate_best_effort_pg_write_backoff,
    };

    #[test]
    fn detects_common_storage_exhaustion_errors() {
        assert!(is_storage_exhaustion_error_message(
            "error returned from database: could not extend file \"base/1/2\": No space left on device"
        ));
        assert!(is_storage_exhaustion_error_message(
            "SQLSTATE 53100 disk full"
        ));
        assert!(!is_storage_exhaustion_error_message("permission denied"));
    }

    #[test]
    fn detects_pool_timeout_errors() {
        assert!(is_pool_timeout_error_message(
            "pool timed out while waiting for an open connection"
        ));
        assert!(is_pool_timeout_error_message(
            "Database connection pool timed out"
        ));
        assert!(!is_pool_timeout_error_message("permission denied"));
    }

    #[test]
    fn detects_remaining_patterns_case_insensitively() {
        assert!(is_storage_exhaustion_error_message(
            "ERROR: Disk quota exceeded"
        ));
        assert!(is_storage_exhaustion_error_message("NO SPACE LEFT ON DEVICE"));
        assert!(is_pool_timeout_error_message("the pool is closed"));
        assert!(!is_storage_exhaustion_error_message(""));
        assert!(!is_pool_timeout_error_message(""));
    }

    #[test]
    fn classifies_storage_and_pool_timeout_errors() {
        assert_eq!(
            classify_best_effort_pg_backoff_cause("No space left on device"),
            Some(BestEffortPgBackoffCause::StorageExhaustion)
        );
        assert_eq!(
            classify_best_effort_pg_backoff_cause(
                "pool timed out while waiting for an open connection"
            ),
            Some(BestEffortPgBackoffCause::PoolTimeout)
        );
        assert_eq!(
            classify_best_effort_pg_backoff_cause("permission denied"),
            None
        );
    }

    #[test]
    fn storage_exhaustion_takes_precedence_over_pool_timeout() {
        assert_eq!(
            classify_best_effort_pg_backoff_cause(
                "pool timed out while waiting for an open connection; No space left on device"
            ),
            Some(BestEffortPgBackoffCause::StorageExhaustion)
        );
    }

    #[test]
    fn arms_backoff_only_for_recognised_errors() {
        // Unique domains keep this test independent of the process-wide backoff registry,
        // which other tests running in parallel may also touch.
        const ARMED: &str = "tests::arms_backoff_only_for_recognised_errors::armed";
        const UNARMED: &str = "tests::arms_backoff_only_for_recognised_errors::unarmed";

        assert_eq!(
            maybe_activate_best_effort_pg_write_backoff(ARMED, "test", "No space left on device"),
            Some(BestEffortPgBackoffCause::StorageExhaustion)
        );
        assert!(best_effort_pg_write_backoff_active(ARMED));

        assert_eq!(
            maybe_activate_best_effort_pg_write_backoff(UNARMED, "test", "permission denied"),
            None
        );
        assert!(!best_effort_pg_write_backoff_active(UNARMED));
    }
}