steam-user 0.2.4

Steam User web client for Rust - HTTP-based Steam Community interactions
Documentation
//! Per-node health tracking (a simple circuit breaker) for the proxy pool.
//!
//! # Why the state is a process-wide static
//!
//! The bot constructs a fresh [`RemoteSteamUser`](super::client::RemoteSteamUser)
//! per account per call (via a builder helper). If the breaker state lived on
//! the client instance it would be born empty and dropped at the end of every
//! call, so consecutive-failure counts could never accumulate and the breaker
//! would do nothing. The state therefore lives in a process-wide
//! [`OnceLock`]`<`[`Mutex`]`<`[`HashMap`]`<String, NodeHealth>>>` keyed by URL,
//! shared across every client in the process. This is the only placement that
//! survives the per-call construction pattern.
//!
//! # Policy
//!
//! * After [`FAILURE_THRESHOLD`] consecutive *retryable* failures (5xx /
//!   connect errors — node-level faults), a node is quarantined for
//!   [`QUARANTINE_DURATION`].
//! * Any success resets the node's failure counter and clears quarantine.
//! * Steam-tunneled errors (the proxy reached Steam, which then returned e.g.
//!   `429`) are *not* node faults — they reset the counter just like a success,
//!   because the node is healthy.
//! * While quarantined a node is skipped during selection, unless *every* node
//!   is quarantined — then all nodes are eligible again (fail-open: a call must
//!   never hard-fail solely because the breaker has quarantined every node).

use std::{
    collections::HashMap,
    sync::{Mutex, OnceLock},
    time::{Duration, Instant},
};

/// Number of back-to-back retryable failures at which a node's breaker trips
/// and the node is quarantined.
pub(crate) const FAILURE_THRESHOLD: u32 = 3;

/// Length of the quarantine window applied each time a node trips (five
/// minutes).
pub(crate) const QUARANTINE_DURATION: Duration = Duration::from_secs(5 * 60);

/// Health bookkeeping for a single proxy node.
///
/// The `Default` value (zero failures, no quarantine deadline) represents a
/// healthy node.
#[derive(Debug, Clone, Default)]
pub(crate) struct NodeHealth {
    /// Retryable failures seen in a row since the counter was last reset.
    consecutive_failures: u32,
    /// Deadline of the quarantine, if one has been imposed. The value is only
    /// a timestamp: whether the node is *currently* quarantined is decided by
    /// comparing it against a clock in [`NodeHealth::is_quarantined`].
    quarantined_until: Option<Instant>,
}

impl NodeHealth {
    /// Reports whether a quarantine deadline exists and still lies strictly in
    /// the future relative to `now`.
    ///
    /// The deadline is exclusive: at exactly `quarantined_until` the node is
    /// no longer considered quarantined.
    fn is_quarantined(&self, now: Instant) -> bool {
        self.quarantined_until
            .is_some_and(|deadline| now < deadline)
    }
}

/// Returns the process-wide breaker table (node URL → [`NodeHealth`]),
/// creating it empty the first time it is requested.
fn registry() -> &'static Mutex<HashMap<String, NodeHealth>> {
    type Table = Mutex<HashMap<String, NodeHealth>>;

    static TABLE: OnceLock<Table> = OnceLock::new();
    TABLE.get_or_init(Mutex::default)
}

/// Result of applying one outcome to a node's health record, returned by
/// [`record`] so the caller can emit `tracing` events without holding the
/// registry lock.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HealthTransition {
    /// Nothing notable changed.
    None,
    /// The node just reached the failure threshold and was quarantined.
    /// `consecutive` is the failure count at the moment of tripping; it can
    /// exceed [`FAILURE_THRESHOLD`] when a node fails again right after a
    /// quarantine expired, because expiry alone does not reset the counter.
    Quarantined { consecutive: u32 },
    /// The node reported a success while a quarantine deadline was still set
    /// on its record — whether that quarantine had already expired or was
    /// still running.
    Recovered,
}

/// Classification of a single node interaction, fed to [`record`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NodeOutcome {
    /// The node served the request (or reached Steam) successfully — healthy.
    /// Steam-tunneled errors belong here too: the node did its job even though
    /// Steam answered with an error.
    Success,
    /// A node-level fault (5xx / connect error) — counts toward quarantine.
    RetryableFault,
}

/// Applies `outcome` to the health record of `url` in the process-wide
/// registry, using the current time, and reports the resulting
/// [`HealthTransition`].
///
/// No `tracing` event is emitted here; the caller decides what to log based on
/// the returned transition.
pub(crate) fn record(url: &str, outcome: NodeOutcome) -> HealthTransition {
    let now = Instant::now();
    record_at(url, outcome, now)
}

/// [`record`] with an injectable clock, for tests.
///
/// * `url` — registry key of the node the outcome belongs to.
/// * `outcome` — what happened on that node.
/// * `now` — the instant to evaluate and start quarantine windows against.
///
/// Returns the [`HealthTransition`] the outcome caused. The registry lock is
/// held only for the duration of this call.
fn record_at(url: &str, outcome: NodeOutcome, now: Instant) -> HealthTransition {
    // A poisoned lock only means some thread panicked while holding it; the
    // map is still usable, so recover the guard instead of propagating.
    let mut map = registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    match outcome {
        NodeOutcome::Success => {
            // A missing entry is read as "healthy, zero failures" everywhere in
            // this module, so a success resets the node by dropping its record.
            // This keeps the hot path free of a `String` allocation for the key
            // and keeps never-failing nodes out of the registry entirely.
            match map.remove(url) {
                // A quarantine deadline was set (active or already expired):
                // this success is the recovery signal.
                Some(previous) if previous.quarantined_until.is_some() => {
                    HealthTransition::Recovered
                }
                _ => HealthTransition::None,
            }
        }
        NodeOutcome::RetryableFault => {
            let entry = map.entry(url.to_owned()).or_default();

            // Don't keep escalating an already-quarantined node; the failure is
            // expected while it's serving no traffic. Only trip on the edge.
            if entry.is_quarantined(now) {
                return HealthTransition::None;
            }

            // Deliberately NOT reset when a quarantine expires: expiry admits a
            // single probe request (classic half-open breaker). If that probe
            // fails, this increment re-trips immediately for another full
            // quarantine window instead of granting a still-sick node a fresh
            // THRESHOLD-sized failure budget. Only a success resets the count.
            entry.consecutive_failures = entry.consecutive_failures.saturating_add(1);
            if entry.consecutive_failures < FAILURE_THRESHOLD {
                return HealthTransition::None;
            }

            entry.quarantined_until = Some(now + QUARANTINE_DURATION);
            HealthTransition::Quarantined {
                consecutive: entry.consecutive_failures,
            }
        }
    }
}

/// Test-only probe: reports whether the registry currently holds a record for
/// `url` whose quarantine is still running at `now`. A URL with no record is
/// not quarantined. Production selection uses [`eligible_indices`] instead.
#[cfg(test)]
fn is_quarantined_at(url: &str, now: Instant) -> bool {
    let table = registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    table
        .get(url)
        .is_some_and(|health| health.is_quarantined(now))
}

/// Picks the positions in `urls` that may be selected right now.
///
/// Nodes whose quarantine is still running are left out. If that would leave
/// nothing, every index is returned instead (fail-open), so the result is only
/// empty when `urls` itself is empty.
pub(crate) fn eligible_indices(urls: &[String]) -> Vec<usize> {
    let now = Instant::now();
    eligible_indices_at(urls, now)
}

/// [`eligible_indices`] evaluated against a caller-supplied `now`, so tests
/// can control the clock.
fn eligible_indices_at(urls: &[String], now: Instant) -> Vec<usize> {
    let table = registry()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Collect, in input order, every index whose node is not quarantined. A
    // URL with no registry record counts as healthy.
    let mut selectable: Vec<usize> = Vec::new();
    for (index, url) in urls.iter().enumerate() {
        let quarantined = table
            .get(url)
            .is_some_and(|health| health.is_quarantined(now));
        if !quarantined {
            selectable.push(index);
        }
    }

    if !selectable.is_empty() {
        return selectable;
    }

    // Every node is quarantined — fail open and offer the caller all of them.
    (0..urls.len()).collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Each test uses a unique URL so the shared process-wide registry doesn't
    /// bleed state across tests running in the same process.
    fn unique(tag: &str) -> String {
        use std::sync::atomic::{AtomicU64, Ordering};
        static N: AtomicU64 = AtomicU64::new(0);
        format!("https://node-{}-{}.test", tag, N.fetch_add(1, Ordering::Relaxed))
    }

    /// Records exactly `FAILURE_THRESHOLD` faults for `url` at `now`, which
    /// trips the breaker for a node that starts out healthy.
    fn trip(url: &str, now: Instant) {
        for _ in 0..FAILURE_THRESHOLD {
            record_at(url, NodeOutcome::RetryableFault, now);
        }
    }

    /// An instant just past the end of a quarantine that started at `start`.
    fn after_quarantine(start: Instant) -> Instant {
        start + QUARANTINE_DURATION + Duration::from_secs(1)
    }

    #[test]
    fn trips_after_threshold_consecutive_faults() {
        let url = unique("trip");
        let t0 = Instant::now();

        // Every fault short of the threshold only counts; no quarantine yet.
        for _ in 1..FAILURE_THRESHOLD {
            assert_eq!(
                record_at(&url, NodeOutcome::RetryableFault, t0),
                HealthTransition::None
            );
        }
        assert!(!is_quarantined_at(&url, t0));

        // The fault that reaches the threshold trips the breaker.
        assert_eq!(
            record_at(&url, NodeOutcome::RetryableFault, t0),
            HealthTransition::Quarantined {
                consecutive: FAILURE_THRESHOLD
            }
        );
        assert!(is_quarantined_at(&url, t0));
    }

    #[test]
    fn success_resets_counter_before_threshold() {
        let url = unique("reset");
        let t0 = Instant::now();

        for _ in 1..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        // Success wipes the counter; the node is not quarantined.
        assert_eq!(record_at(&url, NodeOutcome::Success, t0), HealthTransition::None);

        // It now takes a fresh, full run of faults to trip again.
        for _ in 1..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        assert!(!is_quarantined_at(&url, t0));
        assert_eq!(
            record_at(&url, NodeOutcome::RetryableFault, t0),
            HealthTransition::Quarantined {
                consecutive: FAILURE_THRESHOLD
            }
        );
    }

    #[test]
    fn quarantine_expires_after_duration() {
        let url = unique("expire");
        let t0 = Instant::now();

        trip(&url, t0);
        assert!(is_quarantined_at(&url, t0));

        // Still quarantined just before expiry.
        let almost = t0 + QUARANTINE_DURATION - Duration::from_secs(1);
        assert!(is_quarantined_at(&url, almost));

        // Expired afterwards.
        assert!(!is_quarantined_at(&url, after_quarantine(t0)));
    }

    #[test]
    fn first_success_after_expiry_reports_recovered() {
        let url = unique("recover");
        let t0 = Instant::now();

        trip(&url, t0);
        let after = after_quarantine(t0);
        assert!(!is_quarantined_at(&url, after));

        // The first success after the quarantine window reports recovery.
        assert_eq!(record_at(&url, NodeOutcome::Success, after), HealthTransition::Recovered);
        // A subsequent success is unremarkable.
        assert_eq!(record_at(&url, NodeOutcome::Success, after), HealthTransition::None);
    }

    #[test]
    fn success_during_quarantine_clears_it() {
        let url = unique("early-recover");
        let t0 = Instant::now();

        trip(&url, t0);
        assert!(is_quarantined_at(&url, t0));

        // A quarantined node can still be tried on the fail-open path; any
        // success clears the quarantine immediately.
        assert_eq!(record_at(&url, NodeOutcome::Success, t0), HealthTransition::Recovered);
        assert!(!is_quarantined_at(&url, t0));
    }

    #[test]
    fn probe_failure_after_expiry_retrips_immediately() {
        let url = unique("halfopen");
        let t0 = Instant::now();

        trip(&url, t0);
        let after = after_quarantine(t0);
        assert!(!is_quarantined_at(&url, after));

        // Expiry does not reset the counter, so a single failed probe is
        // enough to start another full quarantine window.
        assert_eq!(
            record_at(&url, NodeOutcome::RetryableFault, after),
            HealthTransition::Quarantined {
                consecutive: FAILURE_THRESHOLD + 1
            }
        );
        assert!(is_quarantined_at(&url, after));
    }

    #[test]
    fn fault_while_quarantined_does_not_re_escalate() {
        let url = unique("noescalate");
        let t0 = Instant::now();

        trip(&url, t0);
        // Further faults while quarantined are no-ops, not repeated trips.
        assert_eq!(record_at(&url, NodeOutcome::RetryableFault, t0), HealthTransition::None);
        assert_eq!(record_at(&url, NodeOutcome::RetryableFault, t0), HealthTransition::None);
    }

    #[test]
    fn steam_tunneled_errors_count_as_healthy() {
        let url = unique("tunnel");
        let t0 = Instant::now();

        // Steam-tunneled errors are recorded as Success (the node reached
        // Steam), so they never trip the breaker no matter how many occur.
        for _ in 0..(FAILURE_THRESHOLD * 3) {
            assert_eq!(record_at(&url, NodeOutcome::Success, t0), HealthTransition::None);
        }
        assert!(!is_quarantined_at(&url, t0));
    }

    #[test]
    fn eligible_skips_quarantined_nodes() {
        let urls = vec![unique("elig-healthy"), unique("elig-sick")];
        let t0 = Instant::now();

        trip(&urls[1], t0);

        // Only the healthy node (index 0) is eligible.
        assert_eq!(eligible_indices_at(&urls, t0), vec![0]);
    }

    #[test]
    fn all_quarantined_falls_back_to_everything() {
        let urls = vec![unique("all-a"), unique("all-b")];
        let t0 = Instant::now();

        trip(&urls[0], t0);
        trip(&urls[1], t0);

        // Both quarantined → fail open, every index eligible.
        assert_eq!(eligible_indices_at(&urls, t0), vec![0, 1]);
    }

    #[test]
    fn empty_url_list_yields_no_indices() {
        // With nothing to choose from, fail-open has nothing to return either.
        assert_eq!(eligible_indices_at(&[], Instant::now()), Vec::<usize>::new());
    }
}