scatter-proxy 0.6.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput
Documentation
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};

/// Snapshot of the overall proxy-pool and task-pool state.
///
/// Returned by `ScatterProxy::metrics()` so callers can build dashboards,
/// alerting, or adaptive back-pressure on top of the scheduler.
///
/// Every field is a plain value; the struct does not update itself after it
/// is produced. `Default` yields an all-zero snapshot, and `PartialEq` lets
/// callers compare snapshots (e.g. to detect change or in tests).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PoolMetrics {
    // ── Proxy pool ───────────────────────────────────────────────────
    /// Number of proxies known to the pool.
    pub total_proxies: usize,
    /// Proxies currently classified as healthy.
    pub healthy_proxies: usize,
    /// Proxies currently in cooldown.
    pub cooldown_proxies: usize,
    /// Proxies currently classified as dead.
    pub dead_proxies: usize,

    // ── Task pool ────────────────────────────────────────────────────
    /// Tasks currently pending.
    pub pending_tasks: usize,
    /// Tasks currently delayed.
    pub delayed_tasks: usize,
    /// Count of completed tasks (presumably cumulative — confirm against producer).
    pub completed_tasks: u64,
    /// Count of failed tasks (presumably cumulative — confirm against producer).
    pub failed_tasks: u64,

    // ── Throughput (sliding window) ──────────────────────────────────
    /// Events per second over the trailing 1 s window.
    pub throughput_1s: f64,
    /// Events per second over the trailing 10 s window.
    pub throughput_10s: f64,
    /// Events per second over the trailing 60 s window.
    pub throughput_60s: f64,

    // ── Quality ──────────────────────────────────────────────────────
    /// Success rate over the trailing minute (assumed to be a fraction in
    /// `0.0..=1.0` — TODO confirm against producer).
    pub success_rate_1m: f64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,

    // ── Resources ────────────────────────────────────────────────────
    /// Requests currently in flight.
    pub inflight: usize,
    /// Count of tasks that were requeued.
    pub requeued_tasks: u64,
    /// Count of events where zero proxies were available.
    pub zero_available_events: u64,
    /// Count of dispatch skips attributed to a missing permit.
    pub skipped_no_permit: u64,
    /// Count of dispatch skips attributed to rate limiting.
    pub skipped_rate_limit: u64,
    /// Count of dispatch skips attributed to proxy cooldown.
    pub skipped_cooldown: u64,
    /// Count of dispatches performed.
    pub dispatch_count: u64,
}

/// Per-(proxy, host) statistics snapshot (user-facing / serialisable).
///
/// `Default` yields an all-zero record (no observations yet); it is derived
/// rather than hand-written because every field's zero value is exactly the
/// type's own default.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct ProxyHostStats {
    /// Number of successful requests recorded.
    pub success: u32,
    /// Number of failed requests recorded.
    pub fail: u32,
    /// Success ratio (presumably `success / (success + fail)` — confirm
    /// against the producer).
    pub success_rate: f64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Number of consecutive failures.
    pub consecutive_fails: u32,
}

// ─── Internal throughput tracker ─────────────────────────────────────────────

/// Sliding-window throughput tracker.
///
/// Stores the [`Instant`] of every recorded event and computes throughput
/// (events / second) over arbitrary windows. Timestamps older than
/// `max_window` are discarded, so a window longer than `max_window` still
/// divides by its full length but only sees the retained events.
///
/// Thread-safe: the inner [`VecDeque`] is wrapped in a [`Mutex`]. A poisoned
/// lock is recovered rather than re-panicking, because the buffer holds only
/// plain `Instant`s and cannot be left half-updated.
pub(crate) struct ThroughputTracker {
    /// Recorded event instants, oldest first when events arrive in order.
    timestamps: Mutex<VecDeque<Instant>>,
    /// Retention horizon: entries older than `now - max_window` are pruned.
    max_window: Duration,
}

impl ThroughputTracker {
    /// Retention window used by [`ThroughputTracker::new`].
    const DEFAULT_MAX_WINDOW: Duration = Duration::from_secs(60);

    /// Create a new tracker that retains timestamps for 60 seconds.
    pub fn new() -> Self {
        Self::with_max_window(Self::DEFAULT_MAX_WINDOW)
    }

    /// Create a tracker with a custom retention window.
    ///
    /// A window reaching back past the earliest representable [`Instant`]
    /// (e.g. [`Duration::MAX`]) means "never prune".
    pub fn with_max_window(max_window: Duration) -> Self {
        Self {
            timestamps: Mutex::new(VecDeque::new()),
            max_window,
        }
    }

    /// Record a completed event at the current instant.
    pub fn record(&self) {
        self.record_at(Instant::now());
    }

    /// Record a completed event at a specific instant (useful for testing).
    ///
    /// Also discards timestamps older than `max_window` before `now`.
    pub(crate) fn record_at(&self, now: Instant) {
        let mut ts = self.lock_timestamps();
        ts.push_back(now);
        Self::prune(&mut ts, now, self.max_window);
    }

    /// Compute throughput (events per second) over the given `window`.
    ///
    /// If `window` is zero the result is `0.0`.
    pub fn throughput(&self, window: Duration) -> f64 {
        self.throughput_at(Instant::now(), window)
    }

    /// Compute throughput relative to a specific instant (useful for testing).
    ///
    /// Counts retained events with timestamp `>= now - window` and divides by
    /// the full `window` length. If `now - window` is not representable as an
    /// [`Instant`], every retained event is counted. A zero `window` yields
    /// `0.0`.
    pub(crate) fn throughput_at(&self, now: Instant, window: Duration) -> f64 {
        if window.is_zero() {
            return 0.0;
        }
        let mut ts = self.lock_timestamps();
        Self::prune(&mut ts, now, self.max_window);

        // `None` means the window starts before any representable instant, so
        // no event can be older than it. Falling back to `now` here would
        // instead count only events recorded at exactly `now`.
        let count = match now.checked_sub(window) {
            Some(cutoff) => ts.iter().filter(|&&t| t >= cutoff).count(),
            None => ts.len(),
        };
        count as f64 / window.as_secs_f64()
    }

    /// Remove timestamps older than `max_window` before `now`.
    ///
    /// Pops from the front only, so it relies on timestamps having been pushed
    /// in (roughly) chronological order; an out-of-order entry merely delays
    /// pruning of the entries behind it. Counting in `throughput_at` checks
    /// every entry and is unaffected by ordering.
    fn prune(ts: &mut VecDeque<Instant>, now: Instant, max_window: Duration) {
        // If `now - max_window` is not representable, nothing can be older
        // than the retention horizon, so there is nothing to prune. (Using
        // `now` as the cutoff would wrongly discard every earlier event.)
        let cutoff = match now.checked_sub(max_window) {
            Some(cutoff) => cutoff,
            None => return,
        };
        while let Some(&front) = ts.front() {
            if front >= cutoff {
                break;
            }
            ts.pop_front();
        }
    }

    /// Lock the timestamp buffer, recovering the guard if the mutex is
    /// poisoned (the data is a plain queue of `Instant`s and stays valid).
    fn lock_timestamps(&self) -> std::sync::MutexGuard<'_, VecDeque<Instant>> {
        self.timestamps
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Return the number of timestamps currently stored.
    ///
    /// Does not prune first: the count reflects the buffer as left by the most
    /// recent `record_at` / `throughput_at` call.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.lock_timestamps().len()
    }
}

// ─── Tests ───────────────────────────────────────────────────────────────────

// Unit tests for `PoolMetrics`, `ProxyHostStats` and `ThroughputTracker`.
// Most tracker tests drive the `*_at` variants with a single fixed `now` so the
// computed rates do not depend on how long the test itself takes to run.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // ── PoolMetrics ──────────────────────────────────────────────────

    #[test]
    fn pool_metrics_default_is_zeroed() {
        let m = PoolMetrics::default();
        assert_eq!(m.total_proxies, 0);
        assert_eq!(m.healthy_proxies, 0);
        assert_eq!(m.cooldown_proxies, 0);
        assert_eq!(m.dead_proxies, 0);
        assert_eq!(m.pending_tasks, 0);
        assert_eq!(m.delayed_tasks, 0);
        assert_eq!(m.completed_tasks, 0);
        assert_eq!(m.failed_tasks, 0);
        assert!((m.throughput_1s).abs() < f64::EPSILON);
        assert!((m.throughput_10s).abs() < f64::EPSILON);
        assert!((m.throughput_60s).abs() < f64::EPSILON);
        assert!((m.success_rate_1m).abs() < f64::EPSILON);
        assert!((m.avg_latency_ms).abs() < f64::EPSILON);
        assert_eq!(m.inflight, 0);
        assert_eq!(m.requeued_tasks, 0);
        assert_eq!(m.zero_available_events, 0);
        assert_eq!(m.skipped_no_permit, 0);
        assert_eq!(m.skipped_rate_limit, 0);
        assert_eq!(m.skipped_cooldown, 0);
        assert_eq!(m.dispatch_count, 0);
    }

    #[test]
    fn pool_metrics_is_clone() {
        let m = PoolMetrics {
            total_proxies: 42,
            ..Default::default()
        };
        let m2 = m.clone();
        assert_eq!(m2.total_proxies, 42);
    }

    // ── ProxyHostStats ───────────────────────────────────────────────

    #[test]
    fn proxy_host_stats_default() {
        let s = ProxyHostStats::default();
        assert_eq!(s.success, 0);
        assert_eq!(s.fail, 0);
        assert!((s.success_rate).abs() < f64::EPSILON);
        assert!((s.avg_latency_ms).abs() < f64::EPSILON);
        assert_eq!(s.consecutive_fails, 0);
    }

    // Round-trips through JSON to check the serde derives keep every field.
    #[test]
    fn proxy_host_stats_serde_round_trip() {
        let stats = ProxyHostStats {
            success: 10,
            fail: 2,
            success_rate: 0.833,
            avg_latency_ms: 120.5,
            consecutive_fails: 0,
        };
        let json = serde_json::to_string(&stats).unwrap();
        let deser: ProxyHostStats = serde_json::from_str(&json).unwrap();
        assert_eq!(deser.success, 10);
        assert_eq!(deser.fail, 2);
        assert!((deser.success_rate - 0.833).abs() < 1e-6);
        assert!((deser.avg_latency_ms - 120.5).abs() < 1e-6);
        assert_eq!(deser.consecutive_fails, 0);
    }

    // ── ThroughputTracker ────────────────────────────────────────────

    #[test]
    fn new_tracker_is_empty() {
        let t = ThroughputTracker::new();
        assert_eq!(t.len(), 0);
        assert!((t.throughput(Duration::from_secs(1))).abs() < f64::EPSILON);
    }

    #[test]
    fn record_increases_count() {
        let t = ThroughputTracker::new();
        t.record();
        assert_eq!(t.len(), 1);
        t.record();
        assert_eq!(t.len(), 2);
    }

    // A zero window short-circuits to 0.0 instead of dividing by zero.
    #[test]
    fn throughput_with_zero_window_is_zero() {
        let t = ThroughputTracker::new();
        t.record();
        assert!((t.throughput(Duration::ZERO)).abs() < f64::EPSILON);
    }

    #[test]
    fn throughput_over_1s_window() {
        let t = ThroughputTracker::new();
        let now = Instant::now();

        // Record 5 events all within the last second.
        for i in 0..5 {
            t.record_at(now - Duration::from_millis(100 * i));
        }

        let tp = t.throughput_at(now, Duration::from_secs(1));
        // 5 events in 1 second = 5.0/s
        assert!((tp - 5.0).abs() < 0.01);
    }

    #[test]
    fn throughput_over_10s_window() {
        let t = ThroughputTracker::new();
        let now = Instant::now();

        // 20 events spread over the last 10 seconds.
        for i in 0..20 {
            t.record_at(now - Duration::from_millis(500 * i));
        }

        let tp = t.throughput_at(now, Duration::from_secs(10));
        // 20 events in 10 seconds = 2.0/s
        assert!((tp - 2.0).abs() < 0.01);
    }

    #[test]
    fn old_entries_are_pruned() {
        let t = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        // Record an event 10s ago — beyond the 5s max_window.
        t.record_at(now - Duration::from_secs(10));
        // Record an event 1s ago — within the window. `record_at` prunes
        // relative to its own instant (cutoff = now-6s), so the 10s-old event
        // is already dropped here.
        t.record_at(now - Duration::from_secs(1));

        // Computing throughput prunes again relative to `now`.
        let tp = t.throughput_at(now, Duration::from_secs(5));
        // Only the recent event should survive.
        assert!((tp - 0.2).abs() < 0.01); // 1 event / 5s = 0.2
        assert_eq!(t.len(), 1);
    }

    // A window longer than max_window is still divided by its full length,
    // even though only max_window of history is retained.
    #[test]
    fn window_larger_than_max_window_still_works() {
        let t = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        t.record_at(now - Duration::from_secs(3));
        t.record_at(now - Duration::from_secs(1));

        // Ask for 60s window, but tracker only keeps 5s.
        let tp = t.throughput_at(now, Duration::from_secs(60));
        // 2 events / 60s ≈ 0.033
        assert!((tp - 2.0 / 60.0).abs() < 0.01);
    }

    // Eight threads each record and query 100 times through a shared Arc.
    #[test]
    fn concurrent_access_does_not_panic() {
        use std::sync::Arc;
        use std::thread;

        let tracker = Arc::new(ThroughputTracker::new());
        let mut handles = Vec::new();

        for _ in 0..8 {
            let t = Arc::clone(&tracker);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    t.record();
                    let _ = t.throughput(Duration::from_secs(1));
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // All 800 events should be recorded (within the 60s window, they all are).
        assert_eq!(tracker.len(), 800);
    }

    #[test]
    fn throughput_only_counts_events_within_requested_window() {
        let t = ThroughputTracker::new();
        let now = Instant::now();

        // 3 events 500ms ago.
        for _ in 0..3 {
            t.record_at(now - Duration::from_millis(500));
        }
        // 2 events 5s ago.
        for _ in 0..2 {
            t.record_at(now - Duration::from_secs(5));
        }

        // 1s window should only see the 3 recent events.
        let tp_1s = t.throughput_at(now, Duration::from_secs(1));
        assert!((tp_1s - 3.0).abs() < 0.01);

        // 10s window should see all 5.
        let tp_10s = t.throughput_at(now, Duration::from_secs(10));
        assert!((tp_10s - 0.5).abs() < 0.01);
    }

    #[test]
    fn custom_max_window() {
        let t = ThroughputTracker::with_max_window(Duration::from_secs(2));
        assert_eq!(t.max_window, Duration::from_secs(2));
    }

    #[test]
    fn default_max_window_is_60s() {
        let t = ThroughputTracker::new();
        assert_eq!(t.max_window, Duration::from_secs(60));
    }
}