scatter-proxy 0.1.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput

scatter-proxy



Features

  • Multi-path race — fan out each request across K proxies simultaneously; first successful response wins, losers are cancelled.
  • Adaptive fan-out — K auto-adjusts based on current success rate and the number of healthy proxies.
  • Per-(proxy, host) rate limiting — configurable minimum intervals with per-host overrides prevent tripping upstream abuse detection.
  • Health tracking — sliding-window success/failure counters per (proxy, host) pair with latency stats.
  • Exponential-backoff cooldown — (proxy, host) pairs that fail cooldown_consecutive_fails times in a row back off automatically, with cooldowns capped at cooldown_max.
  • Per-host circuit breakers — Open → HalfOpen → Closed state machine prevents thundering-herd retries.
  • Proxy eviction — proxies with a 0% global success rate after at least eviction_min_samples samples are marked dead.
  • socks5h:// by default — DNS resolution happens on the proxy side, preventing local DNS leaks.
  • State persistence — JSON snapshots with atomic writes enable hot restarts with no warm-up penalty.
  • Automatic source refresh — proxy lists are periodically re-fetched from configured URLs.
  • Pluggable body classifier — implement the BodyClassifier trait to decide whether a response is good, blocked, or errored.
  • Observability — structured logging via tracing with periodic metrics summaries and real-time PoolMetrics.
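To make the multi-path race concrete, here is a std-only sketch that uses threads and a channel in place of async tasks: K simulated attempts run concurrently and the first success is returned. Unlike the async version described above, the losing threads are not cancelled; they finish on their own and their results are discarded. SimPath and race_first_success are hypothetical names, not part of the scatter-proxy API.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// One simulated proxy path: (proxy label, simulated latency, whether it succeeds).
/// Hypothetical stand-in for a real request sent through a SOCKS5 proxy.
pub type SimPath = (String, Duration, bool);

/// Runs every path concurrently and returns the label of the first path that
/// succeeds, or `None` if all of them fail.
pub fn race_first_success(paths: Vec<SimPath>) -> Option<String> {
    let (tx, rx) = mpsc::channel::<Result<String, String>>();
    for (proxy, latency, succeeds) in paths {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(latency); // pretend to do the network round trip
            let outcome = if succeeds { Ok(proxy) } else { Err(proxy) };
            // Once a winner is chosen the receiver is dropped, so sends may fail.
            let _ = tx.send(outcome);
        });
    }
    // Drop the original sender so `recv` errors once every path has reported.
    drop(tx);
    while let Ok(outcome) = rx.recv() {
        if let Ok(proxy) = outcome {
            return Some(proxy); // first success wins
        }
        // A failed path does not end the race; keep waiting for the others.
    }
    None
}
```

In async Rust the same shape is usually written by polling the attempts together and dropping the remaining futures once one succeeds; dropping a future is what cancels it.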

Quick Start

use scatter_proxy::{
    ScatterProxy, ScatterProxyConfig, RateLimitConfig,
    DefaultClassifier, TaskHandle, ScatterResponse,
};
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ScatterProxyConfig {
        sources: vec!["https://example.com/socks5.txt".into()],
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("slow.com".into(), Duration::from_secs(1)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        prefer_remote_dns: true,
        ..Default::default()
    };

    // Build the proxy pool with the default response classifier.
    let pool = ScatterProxy::new(config, DefaultClassifier).await?;

    // Submit a single request.
    let client = reqwest::Client::new();
    let req = client.get("http://httpbin.org/ip").build()?;
    let handle: TaskHandle = pool.submit(req)?;
    let response: ScatterResponse = handle.await?;

    // ScatterResponse has: status (StatusCode), headers (HeaderMap), body (Bytes)
    println!("Status: {}", response.status);
    println!("Body:   {:?}", response.body);

    // Submit a batch of requests.
    let reqs = vec![
        client.get("http://httpbin.org/ip").build()?,
        client.get("http://httpbin.org/headers").build()?,
        client.get("http://httpbin.org/user-agent").build()?,
    ];
    let handles = pool.submit_batch(reqs)?;
    for h in handles {
        let resp = h.await?;
        println!("{} ({} bytes)", resp.status, resp.body.len());
    }

    // Check real-time metrics.
    let m = pool.metrics();
    println!(
        "Proxies: {} healthy / {} dead | Success rate: {:.0}%",
        m.healthy_proxies, m.dead_proxies, m.success_rate_1m * 100.0,
    );

    // Graceful shutdown (persists state to disk).
    pool.shutdown().await;

    Ok(())
}
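The state_file setting and the graceful shutdown above depend on JSON snapshots being written atomically. A common std-only way to get that property is to write a sibling temporary file, flush it, and rename it over the target; on POSIX systems the rename is atomic when both paths are on the same filesystem. This sketch assumes that approach and a .tmp naming scheme; write_atomic is a hypothetical helper, not scatter-proxy's implementation.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Writes `contents` to `path` so that readers see either the old snapshot or the
/// new one, never a half-written file. Hypothetical helper for illustration.
pub fn write_atomic(path: &Path, contents: &[u8]) -> io::Result<()> {
    // Keep the temp file next to the target so the rename stays on one filesystem.
    let tmp = path.with_extension("tmp");
    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(contents)?;
        file.sync_all()?; // make sure the bytes are on disk before the rename
    }
    // Replace the previous snapshot in a single step.
    fs::rename(&tmp, path)
}
```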

Custom Classifier

Implement BodyClassifier to control how responses are categorised:

use scatter_proxy::{BodyClassifier, BodyVerdict, StatusCode, HeaderMap};

struct MyClassifier;

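impl BodyClassifier for MyClassifier {
    // Headers are not inspected here, hence the leading underscore.
    fn classify(&self, status: StatusCode, _headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
        if status.is_success() && !body.is_empty() {
            // 2xx with a non-empty body: accept the response.
            BodyVerdict::Success
        } else if status.is_server_error() {
            // 5xx: the target itself is failing, not the proxy.
            BodyVerdict::TargetError
        } else {
            // Anything else (empty 2xx, 4xx, ...): assume this proxy was blocked.
            BodyVerdict::ProxyBlocked
        }
    }
}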

// Then pass it when constructing the pool:
// let pool = ScatterProxy::new(config, MyClassifier).await?;
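A classifier often needs to recognise block pages that arrive with a 200 status. The sketch below uses local stand-in types (a u16 status and a Verdict enum mirroring BodyVerdict's three variants) so it compiles without the crate; the "captcha" marker and the 403/429 rule are assumptions about a typical target, not behaviour built into scatter-proxy. A real implementation would put the same logic inside BodyClassifier::classify.

```rust
/// Local stand-in mirroring the three verdicts used above (not the crate's type).
#[derive(Debug, PartialEq, Eq)]
pub enum Verdict {
    Success,
    ProxyBlocked,
    TargetError,
}

/// Hypothetical text a blocked exit IP might receive instead of real content.
const CAPTCHA_MARKER: &[u8] = b"captcha";

/// Case-insensitive byte substring search.
fn contains(haystack: &[u8], needle: &[u8]) -> bool {
    haystack.windows(needle.len()).any(|w| w.eq_ignore_ascii_case(needle))
}

pub fn classify(status: u16, body: &[u8]) -> Verdict {
    match status {
        // 403/429 usually mean this exit IP was refused, so another proxy may work.
        403 | 429 => Verdict::ProxyBlocked,
        // Server errors come from the target and count toward its circuit breaker.
        500..=599 => Verdict::TargetError,
        // An empty page or a challenge page is a block, even with a 2xx status.
        200..=299 if body.is_empty() || contains(body, CAPTCHA_MARKER) => Verdict::ProxyBlocked,
        200..=299 => Verdict::Success,
        // Other statuses: treat as a proxy problem, like MyClassifier above.
        _ => Verdict::ProxyBlocked,
    }
}
```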

Configuration

All fields on ScatterProxyConfig with their types and defaults:

| Field | Type | Default | Description |
|---|---|---|---|
| sources | Vec<String> | [] | URLs of proxy source lists (line-delimited ip:port) |
| source_refresh_interval | Duration | 600s | How often to re-fetch proxy sources |
| rate_limit | RateLimitConfig | (see below) | Per-(proxy, host) rate-limiting settings |
| proxy_timeout | Duration | 8s | Timeout for a single proxy connection attempt |
| task_timeout | Duration | 60s | Overall timeout per task from submission to final failure |
| max_concurrent_per_request | usize | 3 | Number of proxy paths raced per request (K) |
| max_inflight | usize | 100 | Global in-flight concurrency limit |
| max_attempts | usize | 5 | Maximum scheduling attempts per task |
| task_pool_capacity | usize | 1000 | Maximum number of pending tasks in the pool |
| health_window | usize | 30 | Sliding window size for health tracking |
| cooldown_base | Duration | 30s | Base cooldown after consecutive failures |
| cooldown_max | Duration | 300s | Maximum cooldown duration |
| cooldown_consecutive_fails | usize | 3 | Consecutive failures before entering cooldown |
| eviction_min_samples | usize | 30 | Minimum samples before a proxy can be evicted |
| circuit_breaker_threshold | usize | 10 | Target errors before tripping the host circuit breaker |
| circuit_breaker_probe_interval | Duration | 30s | Interval between probe requests when breaker is open |
| state_file | Option<PathBuf> | None | File path for JSON state persistence |
| state_save_interval | Duration | 300s | How often to persist state to disk |
| metrics_log_interval | Duration | 30s | How often to log the metrics summary line |
| prefer_remote_dns | bool | true | Use socks5h:// for remote DNS resolution |
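The cooldown and eviction knobs above can be read as two small rules. The exact formula scatter-proxy uses is not documented here, so this sketch assumes a common choice: no cooldown until cooldown_consecutive_fails failures in a row, then cooldown_base doubling with each further failure and capped at cooldown_max; and eviction once a proxy has at least eviction_min_samples samples with zero successes. Both function names are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical cooldown rule: `None` below `threshold` consecutive failures,
/// otherwise `base * 2^(fails - threshold)`, capped at `max`.
pub fn cooldown(consecutive_fails: usize, threshold: usize, base: Duration, max: Duration) -> Option<Duration> {
    if consecutive_fails < threshold {
        return None;
    }
    // Clamp the exponent so the shift cannot overflow a u32.
    let exponent = (consecutive_fails - threshold).min(31) as u32;
    let factor = 1u32 << exponent;
    Some(base.saturating_mul(factor).min(max))
}

/// Hypothetical eviction rule: enough samples and not a single success.
pub fn should_evict(successes: usize, samples: usize, min_samples: usize) -> bool {
    samples >= min_samples && successes == 0
}
```

With the defaults (30s base, 300s cap, threshold 3), this rule yields 30s, 60s, 120s and 240s for the third through sixth consecutive failures, then 300s from the seventh onward.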

RateLimitConfig fields:

| Field | Type | Default | Description |
|---|---|---|---|
| default_interval | Duration | 500ms | Minimum interval between requests per (proxy, host) pair |
| host_overrides | HashMap<String, Duration> | {} | Per-host interval overrides |
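The RateLimitConfig semantics (a minimum interval per (proxy, host) pair, with per-host overrides) can be sketched as a map from pair to last-use time. PairRateLimiter and try_acquire are hypothetical names; the crate's scheduler may handle a not-yet-ready pair differently, for instance by picking another proxy or waiting.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical per-(proxy, host) limiter: a pair may be used again only after the
/// host's interval (override or default) has elapsed since its last use.
pub struct PairRateLimiter {
    default_interval: Duration,
    host_overrides: HashMap<String, Duration>,
    last_used: HashMap<(String, String), Instant>,
}

impl PairRateLimiter {
    pub fn new(default_interval: Duration, host_overrides: HashMap<String, Duration>) -> Self {
        Self { default_interval, host_overrides, last_used: HashMap::new() }
    }

    /// Per-host override if present, otherwise the default interval.
    fn interval_for(&self, host: &str) -> Duration {
        self.host_overrides.get(host).copied().unwrap_or(self.default_interval)
    }

    /// Returns true (and records the use) if the pair is ready at `now`.
    pub fn try_acquire(&mut self, proxy: &str, host: &str, now: Instant) -> bool {
        let interval = self.interval_for(host);
        let key = (proxy.to_string(), host.to_string());
        let ready = match self.last_used.get(&key) {
            Some(&last) => now.duration_since(last) >= interval,
            None => true, // never used: always ready
        };
        if ready {
            self.last_used.insert(key, now);
        }
        ready
    }
}
```

Passing now explicitly keeps the logic deterministic and easy to test; a scheduler would call it with Instant::now().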

Architecture

                  ┌──────────────┐
                  │  Client Code │
                  └──────┬───────┘
                         │ submit(request)
                         ▼
                  ┌──────────────┐
                  │  ScatterProxy│
                  │  (TaskPool)  │
                  └──────┬───────┘
                         │ pick K healthy proxies
                         ▼
               ┌─────────────────────┐
               │     Scheduler       │
               │  ┌───────────────┐  │
               │  │ Rate Limiters │  │
               │  │ Health Scores │  │
               │  │ Circuit Brkrs │  │
               │  │ Adaptive K    │  │
               │  └───────────────┘  │
               └─────────┬───────────┘
                         │ fan-out K paths
            ┌────────────┼────────────┐
            ▼            ▼            ▼
      ┌──────────┐ ┌──────────┐ ┌──────────┐
      │ Proxy #1 │ │ Proxy #2 │ │ Proxy #3 │
      │ socks5h  │ │ socks5h  │ │ socks5h  │
      └────┬─────┘ └────┬─────┘ └────┬─────┘
           │             │             │
           └─────────────┼─────────────┘
                         │ first good response wins
                         ▼
               ┌───────────────────┐
               │ BodyClassifier    │
               │ → Success         │
               │ → ProxyBlocked    │
               │ → TargetError     │
               └────────┬──────────┘
                        │
                        ▼
               ┌───────────────────┐
               │ ScatterResponse   │
               │ {status, headers, │
               │  body}            │
               └───────────────────┘

  Background tasks:
    • Scheduler loop — polls TaskPool, dispatches fan-out attempts
    • State persistence — periodic JSON snapshots (atomic writes)
    • Metrics logger — periodic tracing summary lines
    • Source refresh — re-fetches proxy lists on interval
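The per-host circuit breaker inside the scheduler can be sketched as a small state machine driven by circuit_breaker_threshold and circuit_breaker_probe_interval. This sketch assumes the threshold counts consecutive target errors and that HalfOpen admits a single probe; HostBreaker, BreakerState and their methods are hypothetical, not the crate's types.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical per-host breaker: trips after `threshold` target errors, allows one
/// probe after `probe_interval`, and closes again if that probe succeeds.
pub struct HostBreaker {
    state: BreakerState,
    target_errors: usize,
    threshold: usize,
    probe_interval: Duration,
    opened_at: Option<Instant>,
}

impl HostBreaker {
    pub fn new(threshold: usize, probe_interval: Duration) -> Self {
        Self { state: BreakerState::Closed, target_errors: 0, threshold, probe_interval, opened_at: None }
    }

    pub fn state(&self) -> BreakerState {
        self.state
    }

    /// May a request to this host be dispatched at `now`?
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            BreakerState::Closed => true,
            BreakerState::HalfOpen => false, // the single probe is already in flight
            BreakerState::Open => {
                let opened = self.opened_at.expect("an open breaker records when it opened");
                if now.duration_since(opened) >= self.probe_interval {
                    self.state = BreakerState::HalfOpen; // let exactly one probe through
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Any success (including a probe) closes the breaker and resets the count.
    pub fn record_success(&mut self) {
        self.state = BreakerState::Closed;
        self.target_errors = 0;
        self.opened_at = None;
    }

    /// Counts a target error; trips on reaching the threshold or on a failed probe.
    pub fn record_target_error(&mut self, now: Instant) {
        self.target_errors += 1;
        if self.state == BreakerState::HalfOpen || self.target_errors >= self.threshold {
            self.state = BreakerState::Open;
            self.opened_at = Some(now);
        }
    }
}
```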

Integration Testing

Unit tests run without any external dependencies:

cargo test

Integration tests exercise the full pipeline against real, free SOCKS5 proxy sources on the internet. They are marked #[ignore], so a plain cargo test skips them. To run them:

SCATTER_INTEGRATION=1 cargo test -- --ignored

Note: Integration tests require network access and depend on third-party proxy sources being available. They may be flaky in CI due to proxy churn.
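One way to combine #[ignore] with the SCATTER_INTEGRATION variable is for each integration test to return early unless the variable is set to 1. This is a hypothetical sketch of that pattern; integration_enabled and the test name are illustrative, and the body of the real tests is not shown.

```rust
/// Hypothetical gate: true only when SCATTER_INTEGRATION=1 is set.
pub fn integration_enabled() -> bool {
    std::env::var("SCATTER_INTEGRATION").map(|v| v == "1").unwrap_or(false)
}

#[test]
#[ignore] // needs network access and live third-party proxy lists
fn fetch_through_public_proxies() {
    if !integration_enabled() {
        eprintln!("skipping: set SCATTER_INTEGRATION=1 to run");
        return;
    }
    // Build a ScatterProxy from real sources and submit requests here.
}
```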

License

Licensed under either of

at your option.