scatter-proxy 0.7.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput

Features

Compatibility note: Proxy mode no longer implicitly includes a DIRECT fallback. If you want direct traffic, disable proxy mode in the caller instead of relying on direct://localhost being auto-inserted into the race.

  • Per-host pool isolation — [ScatterProxyRouter] assigns each target host its own proxy pool, health tracker, and scheduler. A struggling host cannot starve tasks for healthy hosts, and proxy eviction is scoped per-host.
  • Multi-path race — fan out each request across K proxies simultaneously; first successful response wins, losers are cancelled.
  • Adaptive fan-out — K auto-adjusts based on the current success rate and the number of healthy proxies; boosts automatically during cold start to collect data fast.
  • Never-fail scheduler — the scheduler retries indefinitely; tasks are never dropped due to attempt limits or internal timeouts. Caller-side deadlines are opt-in via handle.with_timeout() or tokio::time::timeout.
  • Per-(proxy, host) rate limiting — configurable minimum intervals with per-host overrides prevent tripping upstream abuse detection.
  • Health tracking — sliding-window success/failure counters per (proxy, host) pair with latency stats.
  • Exponential-backoff cooldown — consecutively-failing (proxy, host) pairs back off automatically.
  • Delayed retry queue — temporarily unrunnable tasks are parked until their next eligible time instead of hot-loop requeueing.
  • Proxy eviction — proxies with 0% global success rate after sufficient samples are marked dead.
  • socks5h:// by default — DNS resolution happens on the proxy side, preventing local DNS leaks.
  • State persistence — JSON snapshots with atomic writes enable hot restarts with no warm-up penalty.
  • Automatic source refresh — proxy lists are periodically re-fetched from configured URLs.
  • Pluggable body classifier — implement the BodyClassifier trait to decide whether a response is good, blocked, or errored.
  • Observability — structured logging via tracing with periodic metrics summaries and real-time PoolMetrics.
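The multi-path race can be sketched with plain threads and a channel — a deliberately simplified, synchronous model (the crate itself is async and, unlike this sketch, actively cancels the losing attempts):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Simplified model of the multi-path race: K attempts run concurrently,
// the first successful result wins, and slower attempts are ignored.
fn race(attempts: Vec<fn() -> Option<String>>) -> Option<String> {
    let (tx, rx) = mpsc::channel();
    for attempt in attempts {
        let tx = tx.clone();
        thread::spawn(move || {
            if let Some(body) = attempt() {
                let _ = tx.send(body); // losers' sends fail silently once rx is gone
            }
        });
    }
    drop(tx); // so recv() returns Err if every attempt fails
    rx.recv().ok()
}

fn main() {
    // Three simulated proxy paths with different latencies; one is blocked.
    let winner = race(vec![
        || { thread::sleep(Duration::from_millis(80)); Some("slow proxy".to_string()) },
        || { thread::sleep(Duration::from_millis(10)); None }, // blocked proxy
        || { thread::sleep(Duration::from_millis(20)); Some("fast proxy".to_string()) },
    ]);
    println!("{winner:?}"); // the fastest successful path wins
}
```

Here K is simply the vector length; in scatter-proxy, K adapts to the current success rate and is boosted during cold start.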

Quick Start

use scatter_proxy::{
    ScatterProxy, ScatterProxyConfig, RateLimitConfig,
    DefaultClassifier, ScatterResponse,
};
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sources are optional — scatter-proxy ships with default free proxy sources.
    // Just override the fields you care about:
    let config = ScatterProxyConfig {
        // sources: vec!["https://example.com/socks5.txt".into()],  // optional override
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("slow.com".into(), Duration::from_secs(1)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        prefer_remote_dns: true,
        ..Default::default()
    };

    // Build the proxy pool with the default response classifier.
    let pool = ScatterProxy::new(config, DefaultClassifier).await?;

    let client = reqwest::Client::new();

    // Submit a single request. `submit` waits if the pool is full; the
    // returned handle then resolves once a successful proxied response arrives.
    // The scheduler retries internally forever — use with_timeout for a deadline.
    let req = client.get("http://httpbin.org/ip").build()?;
    let response: ScatterResponse = pool.submit(req).await
        .with_timeout(Duration::from_secs(30)).await?
        .expect("request still pending after timeout");

    // ScatterResponse has: status (StatusCode), headers (HeaderMap), body (Bytes)
    println!("Status: {}", response.status);
    println!("Body:   {:?}", response.body);

    // Non-blocking submit: returns Err(PoolFull) immediately when the pool is full.
    let req2 = client.get("http://httpbin.org/ip").build()?;
    if let Ok(handle) = pool.try_submit(req2) {
        let _ = handle.with_timeout(Duration::from_secs(30)).await;
    }

    // Submit a batch of requests (waits for pool capacity for each, in order).
    let reqs = vec![
        client.get("http://httpbin.org/ip").build()?,
        client.get("http://httpbin.org/headers").build()?,
        client.get("http://httpbin.org/user-agent").build()?,
    ];
    let handles = pool.submit_batch(reqs).await;
    for h in handles {
        if let Some(resp) = h.with_timeout(Duration::from_secs(30)).await? {
            println!("{} ({} bytes)", resp.status, resp.body.len());
        }
    }

    // Check real-time metrics.
    let m = pool.metrics();
    println!(
        "Proxies: {} healthy / {} dead | Success rate: {:.0}%",
        m.healthy_proxies, m.dead_proxies, m.success_rate_1m * 100.0,
    );

    // Graceful shutdown (persists state to disk).
    pool.shutdown().await;

    Ok(())
}

Per-Host Pool Isolation

When crawling multiple target sites, use ScatterProxyRouter to give each host its own independent proxy pool. A host where all proxies are in cooldown or getting blocked cannot affect throughput for other hosts.

use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One pool per host — all start with the full proxy list and evolve independently.
    let router = ScatterProxyRouter::new(
        ["szse.cn", "sse.com.cn", "cninfo.com.cn"],
        ScatterProxyConfig {
            // sources: vec!["https://example.com/socks5.txt".into()],
            ..Default::default()
        },
        DefaultClassifier,
    ).await?;

    let client = reqwest::Client::new();

    // Requests are routed to the matching host pool automatically.
    let req = client.get("http://szse.cn/api/data").build()?;
    let handle = router.submit(req).await?;
    // handle.with_timeout(…) as usual

    // Per-host metrics — see exactly which host is struggling.
    for (host, m) in router.all_metrics() {
        println!(
            "[{host}] success={:.0}% proxies={} pending={}",
            m.success_rate_1m * 100.0,
            m.healthy_proxies,
            m.pending_tasks,
        );
    }

    router.shutdown().await;
    Ok(())
}

Each host's metrics log lines are automatically prefixed with [hostname] so multi-host output is easy to grep:

INFO [szse.cn] throughput=2.1/s | success=18% | pool: 4016 healthy …
INFO [sse.com.cn] throughput=8.4/s | success=71% | pool: 4016 healthy …

Custom Classifier

Implement BodyClassifier to control how responses are categorised:

use scatter_proxy::{BodyClassifier, BodyVerdict, StatusCode, HeaderMap};

struct MyClassifier;

impl BodyClassifier for MyClassifier {
    fn classify(&self, status: StatusCode, headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
        if status.is_success() && !body.is_empty() {
            BodyVerdict::Success
        } else if status.is_server_error() {
            BodyVerdict::TargetError
        } else {
            BodyVerdict::ProxyBlocked
        }
    }
}

// Then pass it when constructing the pool:
// let pool = ScatterProxy::new(config, MyClassifier).await?;

Configuration

All fields on ScatterProxyConfig with their types and defaults:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| sources | Vec<String> | [] (uses built-in free proxy lists) | URLs of proxy source lists (line-delimited ip:port). When empty, defaults to DEFAULT_PROXY_SOURCES — a curated set of free SOCKS5 lists. |
| source_refresh_interval | Duration | 600s | How often to re-fetch proxy sources |
| rate_limit | RateLimitConfig | (see below) | Per-(proxy, host) rate-limiting settings |
| proxy_timeout | Duration | 8s | Timeout for a single proxy connection attempt |
| max_concurrent_per_request | usize | 3 | Base number of proxy paths raced per request (K); boosted automatically during cold start |
| max_inflight | usize | 100 | Global in-flight concurrency limit |
| task_pool_capacity | usize | 1000 | Maximum number of pending tasks in the pool |
| health_window | usize | 30 | Sliding window size for health tracking |
| cooldown_base | Duration | 30s | Base cooldown after consecutive failures |
| cooldown_max | Duration | 300s | Maximum cooldown duration |
| cooldown_consecutive_fails | usize | 3 | Consecutive failures before entering cooldown |
| eviction_min_samples | usize | 30 | Minimum samples before a proxy can be evicted |
| state_file | Option<PathBuf> | None | File path for JSON state persistence |
| state_save_interval | Duration | 300s | How often to persist state to disk |
| metrics_log_interval | Duration | 30s | How often to log the metrics summary line |
| prefer_remote_dns | bool | true | Use socks5h:// for remote DNS resolution |
| name | Option<String> | None | Label prepended to metrics log lines as [name]; set automatically by ScatterProxyRouter |
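The three cooldown fields combine into a capped doubling backoff. A minimal sketch of one plausible rule — doubling from cooldown_base once cooldown_consecutive_fails is reached, capped at cooldown_max; the crate's exact formula may differ:

```rust
use std::time::Duration;

/// Hypothetical backoff rule mirroring the cooldown_* defaults above
/// (base = 30s, max = 300s, threshold = 3 consecutive failures).
fn cooldown(consecutive_fails: u32, base: Duration, max: Duration, threshold: u32) -> Option<Duration> {
    if consecutive_fails < threshold {
        return None; // not enough consecutive failures to enter cooldown
    }
    let exp = consecutive_fails - threshold; // 0 at the threshold
    let scaled = base.checked_mul(1u32 << exp.min(16)).unwrap_or(max);
    Some(scaled.min(max)) // 30s, 60s, 120s, 240s, 300s, 300s, …
}
```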

RateLimitConfig fields:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| default_interval | Duration | 500ms | Minimum interval between requests per (proxy, host) pair |
| host_overrides | HashMap<String, Duration> | {} | Per-host interval overrides |
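Resolution is straightforward: a host-specific override wins over default_interval. A sketch (the helper name is illustrative, not part of the crate's API):

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical helper: the minimum interval enforced for a given
/// (proxy, host) pair is the host override if present, else the default.
fn interval_for(host: &str, default: Duration, overrides: &HashMap<String, Duration>) -> Duration {
    overrides.get(host).copied().unwrap_or(default)
}

fn main() {
    let overrides = HashMap::from([("slow.com".to_string(), Duration::from_secs(1))]);
    let default = Duration::from_millis(500);
    assert_eq!(interval_for("slow.com", default, &overrides), Duration::from_secs(1));
    assert_eq!(interval_for("fast.com", default, &overrides), Duration::from_millis(500));
}
```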

Architecture

                  ┌──────────────┐
                  │  Client Code │
                  └──────┬───────┘
                         │ submit(request)
                         ▼
                  ┌──────────────┐
                  │  ScatterProxy│
                  │  (TaskPool)  │
                  └──────┬───────┘
                         │ pick K healthy proxies
                         ▼
               ┌─────────────────────┐
               │     Scheduler       │
               │  ┌───────────────┐  │
               │  │ Rate Limiters │  │
               │  │ Health Scores │  │
               │  │ Ready Queue   │  │
               │  │ Delayed Heap  │  │
               │  │ Adaptive K    │  │
               │  └───────────────┘  │
               └─────────┬───────────┘
                         │ fan-out K paths
            ┌────────────┼────────────┐
            ▼            ▼            ▼
      ┌──────────┐ ┌──────────┐ ┌──────────┐
      │ Proxy #1 │ │ Proxy #2 │ │ Proxy #3 │
      │ socks5h  │ │ socks5h  │ │ socks5h  │
      └────┬─────┘ └─────┬────┘ └────┬─────┘
           │             │           │
           └─────────────┼───────────┘
                         │ first good response wins
                         ▼
               ┌───────────────────┐
               │ BodyClassifier    │
               │ → Success         │
               │ → ProxyBlocked    │
               │ → TargetError     │
               └────────┬──────────┘
                        │
                        ▼
               ┌───────────────────┐
               │ ScatterResponse   │
               │ {status, headers, │
               │  body}            │
               └───────────────────┘

  Background tasks:
    • Scheduler workers — drain ready queue, promote delayed tasks, dispatch fan-out attempts
    • State persistence — periodic JSON snapshots (atomic writes)
    • Metrics logger — periodic tracing summary lines
    • Source refresh — re-fetches proxy lists on interval
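The delayed retry queue from the background-task list can be modelled as a min-heap keyed by next-eligible time — a simplified, single-threaded sketch, not the crate's actual implementation:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

/// Simplified model of the delayed retry queue: tasks park in a min-heap
/// ordered by their next eligible time, and only tasks whose time has
/// arrived are promoted back to the ready queue (no hot-loop requeueing).
struct DelayedQueue {
    heap: BinaryHeap<Reverse<(Instant, u64)>>, // (next_eligible, task_id)
}

impl DelayedQueue {
    fn new() -> Self { Self { heap: BinaryHeap::new() } }

    fn park(&mut self, task_id: u64, next_eligible: Instant) {
        self.heap.push(Reverse((next_eligible, task_id)));
    }

    /// Pop every task that is eligible at `now`, in time order.
    fn promote_ready(&mut self, now: Instant) -> Vec<u64> {
        let mut ready = Vec::new();
        while let Some(Reverse((when, id))) = self.heap.peek().copied() {
            if when > now { break; }
            self.heap.pop();
            ready.push(id);
        }
        ready
    }
}

fn main() {
    let now = Instant::now();
    let mut q = DelayedQueue::new();
    q.park(1, now + Duration::from_millis(50));
    q.park(2, now); // already eligible
    q.park(3, now + Duration::from_secs(10));
    let ready = q.promote_ready(now + Duration::from_millis(100));
    println!("{ready:?}"); // [2, 1] — tasks 2 and 1 eligible, 3 still parked
}
```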

Integration Testing

Unit tests run without any external dependencies:

cargo test

Integration tests exercise the full pipeline with real free SOCKS5 proxy sources from the internet. They are marked #[ignore] by default. To run them:

SCATTER_INTEGRATION=1 cargo test -- --ignored

Note: Integration tests require network access and depend on third-party proxy sources being available. They may be flaky in CI due to proxy churn.

Migration Notes

  • Proxy mode no longer auto-inserts direct://localhost. If you need direct traffic, disable proxy mode in the caller instead of relying on implicit fallback.
  • The circuit breaker subsystem has been removed. Temporary unrunnable tasks are now handled by the delayed retry queue.
  • PoolMetrics no longer exposes circuit-breaker state. New counters include delayed tasks, requeues, zero-available events, dispatch count, and skip reasons.

License

Licensed under either of

at your option.