scatter-proxy 0.6.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput
Documentation
use dashmap::DashMap;
use std::collections::HashMap;
use std::time::{Duration, Instant};

use crate::config::RateLimitConfig;

/// Per-(proxy, host) rate limiter.
///
/// Ensures that requests through the same proxy to the same host are spaced
/// apart by at least a configurable interval. The scheduler skips proxy/host
/// combinations that haven't cooled down yet rather than blocking.
///
/// One timestamp is kept for every distinct `(proxy, host)` pair that has ever
/// been marked, so the map grows with the number of pairs seen; entries whose
/// cooldown has elapsed can be dropped with `prune_expired`.
#[derive(Debug)]
pub struct RateLimiter {
    /// Tracks the last request time for each (proxy, host) pair.
    last_access: DashMap<(String, String), Instant>,
    /// Default minimum interval between requests, used for any host without
    /// an entry in `host_overrides`.
    default_interval: Duration,
    /// Per-host overrides for the minimum interval.
    host_overrides: HashMap<String, Duration>,
}

impl RateLimiter {
    /// Create a new `RateLimiter` from the given configuration.
    ///
    /// The configuration's default interval and host overrides are copied;
    /// the limiter starts with no recorded accesses.
    pub fn new(config: &RateLimitConfig) -> Self {
        Self {
            last_access: DashMap::new(),
            default_interval: config.default_interval,
            host_overrides: config.host_overrides.clone(),
        }
    }

    /// Check whether the `(proxy, host)` pair has cooled down and is available
    /// for a new request.
    ///
    /// Returns `true` if sufficient time has elapsed since the last request
    /// (or if the pair has never been used).
    ///
    /// This only inspects state and does not record a use. Calling it and then
    /// [`mark`](Self::mark) is two separate map operations, so concurrent
    /// callers can both see `true` for the same pair; use
    /// [`try_acquire`](Self::try_acquire) when the check and the record must
    /// happen together.
    pub fn is_available(&self, proxy: &str, host: &str) -> bool {
        let key = (proxy.to_string(), host.to_string());
        let interval = self.interval_for(host);
        match self.last_access.get(&key) {
            Some(last) => last.elapsed() >= interval,
            None => true,
        }
    }

    /// Atomically check the `(proxy, host)` pair and, if it has cooled down,
    /// record the current time as its last use.
    ///
    /// Returns `true` when the caller acquired the slot (the pair was never
    /// used, or its interval has elapsed) and `false` otherwise, in which case
    /// the stored timestamp is left untouched. The check and the update run
    /// under a single map-entry lock, so two concurrent callers cannot both
    /// acquire the same pair within one interval.
    #[must_use]
    pub fn try_acquire(&self, proxy: &str, host: &str) -> bool {
        let interval = self.interval_for(host);
        let mut acquired = true;
        self.last_access
            .entry((proxy.to_string(), host.to_string()))
            .and_modify(|last| {
                if last.elapsed() >= interval {
                    *last = Instant::now();
                } else {
                    // Still cooling down: keep the old timestamp.
                    acquired = false;
                }
            })
            .or_insert_with(Instant::now);
        acquired
    }

    /// Mark the `(proxy, host)` pair as just used, recording the current time.
    ///
    /// Unconditionally overwrites any previous timestamp, restarting the
    /// pair's cooldown.
    pub fn mark(&self, proxy: &str, host: &str) {
        let key = (proxy.to_string(), host.to_string());
        self.last_access.insert(key, Instant::now());
    }

    /// Drop every entry whose cooldown has already elapsed.
    ///
    /// Such entries make [`is_available`](Self::is_available) return `true`,
    /// exactly like a pair that was never used, so removing them does not
    /// change any availability answer. It only bounds memory when many
    /// short-lived proxies or hosts pass through the limiter.
    pub fn prune_expired(&self) {
        self.last_access
            .retain(|(_, host), last| last.elapsed() < self.interval_for(host));
    }

    /// Resolve the rate-limit interval for a given host, checking host-specific
    /// overrides first and falling back to the default.
    fn interval_for(&self, host: &str) -> Duration {
        self.host_overrides
            .get(host)
            .copied()
            .unwrap_or(self.default_interval)
    }
}

// ─── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    const PROXY_A: &str = "socks5://1.2.3.4:1080";
    const PROXY_B: &str = "socks5://5.6.7.8:9050";
    const HOST_X: &str = "yunhq.sse.com.cn";
    const HOST_Y: &str = "www.szse.cn";
    const HOST_SLOW: &str = "slow.example.com";

    fn default_config() -> RateLimitConfig {
        RateLimitConfig {
            default_interval: Duration::from_millis(100),
            host_overrides: HashMap::new(),
        }
    }

    fn config_with_overrides() -> RateLimitConfig {
        let mut overrides = HashMap::new();
        overrides.insert(HOST_SLOW.to_string(), Duration::from_millis(500));
        RateLimitConfig {
            default_interval: Duration::from_millis(100),
            host_overrides: overrides,
        }
    }

    // ── Construction ────────────────────────────────────────────────────

    #[test]
    fn new_limiter_has_no_entries() {
        let rl = RateLimiter::new(&default_config());
        assert!(rl.last_access.is_empty());
    }

    #[test]
    fn new_limiter_stores_default_interval() {
        let rl = RateLimiter::new(&default_config());
        assert_eq!(rl.default_interval, Duration::from_millis(100));
    }

    #[test]
    fn new_limiter_stores_host_overrides() {
        let rl = RateLimiter::new(&config_with_overrides());
        assert_eq!(
            rl.host_overrides.get(HOST_SLOW),
            Some(&Duration::from_millis(500))
        );
    }

    // ── is_available ────────────────────────────────────────────────────

    #[test]
    fn available_when_never_accessed() {
        let rl = RateLimiter::new(&default_config());
        assert!(rl.is_available(PROXY_A, HOST_X));
    }

    #[test]
    fn not_available_immediately_after_mark() {
        let rl = RateLimiter::new(&default_config());
        rl.mark(PROXY_A, HOST_X);
        assert!(!rl.is_available(PROXY_A, HOST_X));
    }

    #[test]
    fn available_after_interval_elapses() {
        let config = RateLimitConfig {
            default_interval: Duration::from_millis(20),
            host_overrides: HashMap::new(),
        };
        let rl = RateLimiter::new(&config);
        rl.mark(PROXY_A, HOST_X);
        assert!(!rl.is_available(PROXY_A, HOST_X));

        thread::sleep(Duration::from_millis(30));
        assert!(rl.is_available(PROXY_A, HOST_X));
    }

    #[test]
    fn mark_resets_cooldown() {
        let config = RateLimitConfig {
            default_interval: Duration::from_millis(100),
            host_overrides: HashMap::new(),
        };
        let rl = RateLimiter::new(&config);
        rl.mark(PROXY_A, HOST_X);

        thread::sleep(Duration::from_millis(60));
        // Re-mark before the interval elapses → restarts the cooldown.
        rl.mark(PROXY_A, HOST_X);
        assert!(!rl.is_available(PROXY_A, HOST_X));

        thread::sleep(Duration::from_millis(60));
        // At least 120ms have passed since the first mark, so without the reset
        // the pair would be available. Only ~60ms have passed since the second
        // mark, and the interval is 100ms, so it must still be cooling down.
        // The 40ms margin absorbs normal sleep overshoot.
        assert!(!rl.is_available(PROXY_A, HOST_X));
    }

    // ── Pair independence ───────────────────────────────────────────────

    #[test]
    fn different_proxies_same_host_are_independent() {
        let rl = RateLimiter::new(&default_config());
        rl.mark(PROXY_A, HOST_X);

        // PROXY_B + HOST_X should still be available.
        assert!(rl.is_available(PROXY_B, HOST_X));
        // PROXY_A + HOST_X should not.
        assert!(!rl.is_available(PROXY_A, HOST_X));
    }

    #[test]
    fn same_proxy_different_hosts_are_independent() {
        let rl = RateLimiter::new(&default_config());
        rl.mark(PROXY_A, HOST_X);

        assert!(rl.is_available(PROXY_A, HOST_Y));
        assert!(!rl.is_available(PROXY_A, HOST_X));
    }

    #[test]
    fn completely_separate_pairs_are_independent() {
        let rl = RateLimiter::new(&default_config());
        rl.mark(PROXY_A, HOST_X);
        rl.mark(PROXY_B, HOST_Y);

        assert!(!rl.is_available(PROXY_A, HOST_X));
        assert!(!rl.is_available(PROXY_B, HOST_Y));
        assert!(rl.is_available(PROXY_A, HOST_Y));
        assert!(rl.is_available(PROXY_B, HOST_X));
    }

    // ── Host overrides ──────────────────────────────────────────────────

    #[test]
    fn interval_for_uses_default_when_no_override() {
        let rl = RateLimiter::new(&config_with_overrides());
        assert_eq!(rl.interval_for(HOST_X), Duration::from_millis(100));
    }

    #[test]
    fn interval_for_uses_override_when_present() {
        let rl = RateLimiter::new(&config_with_overrides());
        assert_eq!(rl.interval_for(HOST_SLOW), Duration::from_millis(500));
    }

    #[test]
    fn override_host_has_longer_cooldown() {
        let config = RateLimitConfig {
            default_interval: Duration::from_millis(10),
            host_overrides: {
                let mut m = HashMap::new();
                m.insert(HOST_SLOW.to_string(), Duration::from_millis(200));
                m
            },
        };
        let rl = RateLimiter::new(&config);

        rl.mark(PROXY_A, HOST_X);
        rl.mark(PROXY_A, HOST_SLOW);

        thread::sleep(Duration::from_millis(20));

        // Default host (10ms) should be available.
        assert!(rl.is_available(PROXY_A, HOST_X));
        // Override host (200ms) should NOT be available yet.
        assert!(!rl.is_available(PROXY_A, HOST_SLOW));
    }

    // ── Zero interval ───────────────────────────────────────────────────

    #[test]
    fn zero_interval_always_available() {
        let config = RateLimitConfig {
            default_interval: Duration::ZERO,
            host_overrides: HashMap::new(),
        };
        let rl = RateLimiter::new(&config);
        rl.mark(PROXY_A, HOST_X);
        assert!(rl.is_available(PROXY_A, HOST_X));
    }

    #[test]
    fn zero_override_always_available() {
        let config = RateLimitConfig {
            default_interval: Duration::from_secs(10),
            host_overrides: {
                let mut m = HashMap::new();
                m.insert(HOST_SLOW.to_string(), Duration::ZERO);
                m
            },
        };
        let rl = RateLimiter::new(&config);
        rl.mark(PROXY_A, HOST_SLOW);
        assert!(rl.is_available(PROXY_A, HOST_SLOW));

        // But the default-interval host is still rate-limited.
        rl.mark(PROXY_A, HOST_X);
        assert!(!rl.is_available(PROXY_A, HOST_X));
    }

    // ── Multiple marks ──────────────────────────────────────────────────

    #[test]
    fn multiple_marks_updates_timestamp() {
        let config = RateLimitConfig {
            default_interval: Duration::from_millis(50),
            host_overrides: HashMap::new(),
        };
        let rl = RateLimiter::new(&config);
        rl.mark(PROXY_A, HOST_X);

        thread::sleep(Duration::from_millis(60));
        // Should be available now.
        assert!(rl.is_available(PROXY_A, HOST_X));

        // Mark again → no longer available.
        rl.mark(PROXY_A, HOST_X);
        assert!(!rl.is_available(PROXY_A, HOST_X));
    }

    // ── Multiple host overrides ─────────────────────────────────────────

    #[test]
    fn multiple_overrides_each_respected() {
        let config = RateLimitConfig {
            default_interval: Duration::from_millis(10),
            host_overrides: {
                let mut m = HashMap::new();
                m.insert("fast.example.com".to_string(), Duration::from_millis(5));
                m.insert("slow.example.com".to_string(), Duration::from_millis(200));
                m
            },
        };
        let rl = RateLimiter::new(&config);
        assert_eq!(
            rl.interval_for("fast.example.com"),
            Duration::from_millis(5)
        );
        assert_eq!(
            rl.interval_for("slow.example.com"),
            Duration::from_millis(200)
        );
        assert_eq!(
            rl.interval_for("other.example.com"),
            Duration::from_millis(10)
        );
    }

    // ── Concurrent-safety smoke test (single-threaded, but exercises DashMap API) ──

    #[test]
    fn mark_and_check_many_pairs() {
        let rl = RateLimiter::new(&default_config());
        for i in 0..50 {
            let proxy = format!("socks5://10.0.0.{i}:1080");
            let host = format!("host-{i}.example.com");
            assert!(rl.is_available(&proxy, &host));
            rl.mark(&proxy, &host);
            assert!(!rl.is_available(&proxy, &host));
        }
    }

    // ── Default RateLimitConfig integration ─────────────────────────────

    #[test]
    fn works_with_default_rate_limit_config() {
        let config = RateLimitConfig::default();
        let rl = RateLimiter::new(&config);
        assert_eq!(rl.default_interval, Duration::from_millis(500));
        assert!(rl.host_overrides.is_empty());
        assert!(rl.is_available(PROXY_A, HOST_X));
    }
}