scatter-proxy 0.2.0

# scatter-proxy

[![CI](https://github.com/letllmrun/scatter-proxy/actions/workflows/ci.yml/badge.svg)](https://github.com/letllmrun/scatter-proxy/actions/workflows/ci.yml)
[![Crates.io](https://img.shields.io/crates/v/scatter-proxy.svg)](https://crates.io/crates/scatter-proxy)
[![docs.rs](https://docs.rs/scatter-proxy/badge.svg)](https://docs.rs/scatter-proxy)
[![License](https://img.shields.io/crates/l/scatter-proxy.svg)](LICENSE)

**Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput.**

## Features

- **Multi-path race** — fan out each request across *K* proxies simultaneously; first successful response wins, losers are cancelled.
- **Adaptive fan-out** — *K* auto-adjusts based on the current success rate and the number of healthy proxies.
- **Per-(proxy, host) rate limiting** — configurable minimum intervals with per-host overrides prevent tripping upstream abuse detection.
- **Health tracking** — sliding-window success/failure counters per (proxy, host) pair with latency stats.
- **Exponential-backoff cooldown** — consecutively-failing (proxy, host) pairs back off automatically.
- **Per-host circuit breakers** — Open → HalfOpen → Closed state machine prevents thundering-herd retries.
- **Proxy eviction** — proxies with 0% global success rate after sufficient samples are marked dead.
- **`socks5h://` by default** — DNS resolution happens on the proxy side, preventing local DNS leaks.
- **State persistence** — JSON snapshots with atomic writes enable hot restarts with no warm-up penalty.
- **Automatic source refresh** — proxy lists are periodically re-fetched from configured URLs.
- **Pluggable body classifier** — implement the `BodyClassifier` trait to decide whether a response is good, blocked, or errored.
- **Observability** — structured logging via `tracing` with periodic metrics summaries and real-time `PoolMetrics`.
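
The Open → HalfOpen → Closed breaker from the list above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `HostBreaker`, `BreakerState`, and their methods are hypothetical names, and the threshold of 10 and the 30 s probe interval simply mirror the defaults in the Configuration table.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-host breaker states (names chosen for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum BreakerState {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

/// Hypothetical breaker: trips after `threshold` target errors in a row.
struct HostBreaker {
    state: BreakerState,
    target_errors: usize,
    threshold: usize,
    probe_interval: Duration,
}

impl HostBreaker {
    fn new(threshold: usize, probe_interval: Duration) -> Self {
        Self { state: BreakerState::Closed, target_errors: 0, threshold, probe_interval }
    }

    /// Returns true if a request to this host may be dispatched at `now`.
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            BreakerState::Closed => true,
            // A probe is already in flight; hold everything else back.
            BreakerState::HalfOpen => false,
            BreakerState::Open { since } => {
                if now.duration_since(since) >= self.probe_interval {
                    // Let exactly one probe through.
                    self.state = BreakerState::HalfOpen;
                    true
                } else {
                    false
                }
            }
        }
    }

    /// A success resets the error count and closes the breaker.
    fn record_success(&mut self) {
        self.target_errors = 0;
        self.state = BreakerState::Closed;
    }

    /// A failed probe, or reaching the threshold, (re)opens the breaker.
    fn record_target_error(&mut self, now: Instant) {
        self.target_errors += 1;
        if self.state == BreakerState::HalfOpen || self.target_errors >= self.threshold {
            self.state = BreakerState::Open { since: now };
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut breaker = HostBreaker::new(10, Duration::from_secs(30));
    for _ in 0..10 {
        breaker.record_target_error(start);
    }
    println!("1s after tripping:  allowed = {}", breaker.allow(start + Duration::from_secs(1)));
    println!("30s after tripping: allowed = {}", breaker.allow(start + Duration::from_secs(30)));
    breaker.record_success();
    println!("after probe success: {:?}", breaker.state);
}
```

Allowing only one probe while half-open is what keeps a recovering host from being hit by every queued task at once.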

## Quick Start

```rust
use scatter_proxy::{
    ScatterProxy, ScatterProxyConfig, RateLimitConfig,
    DefaultClassifier, TaskHandle, ScatterResponse,
};
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sources are optional — scatter-proxy ships with default free proxy sources.
    // Just override the fields you care about:
    let config = ScatterProxyConfig {
        // sources: vec!["https://example.com/socks5.txt".into()],  // optional override
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("slow.com".into(), Duration::from_secs(1)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        prefer_remote_dns: true,
        ..Default::default()
    };

    // Build the proxy pool with the default response classifier.
    let pool = ScatterProxy::new(config, DefaultClassifier).await?;

    // Submit a single request.
    let client = reqwest::Client::new();
    let req = client.get("http://httpbin.org/ip").build()?;
    let handle: TaskHandle = pool.submit(req)?;
    let response: ScatterResponse = handle.await?;

    // ScatterResponse has: status (StatusCode), headers (HeaderMap), body (Bytes)
    println!("Status: {}", response.status);
    println!("Body:   {:?}", response.body);

    // Submit a batch of requests.
    let reqs = vec![
        client.get("http://httpbin.org/ip").build()?,
        client.get("http://httpbin.org/headers").build()?,
        client.get("http://httpbin.org/user-agent").build()?,
    ];
    let handles = pool.submit_batch(reqs)?;
    for h in handles {
        let resp = h.await?;
        println!("  → {} ({} bytes)", resp.status, resp.body.len());
    }

    // Check real-time metrics.
    let m = pool.metrics();
    println!(
        "Proxies: {} healthy / {} dead | Success rate: {:.0}%",
        m.healthy_proxies, m.dead_proxies, m.success_rate_1m * 100.0,
    );

    // Graceful shutdown (persists state to disk).
    pool.shutdown().await;

    Ok(())
}
```

## Custom Classifier

Implement `BodyClassifier` to control how responses are categorised:

```rust
use scatter_proxy::{BodyClassifier, BodyVerdict, StatusCode, HeaderMap};

struct MyClassifier;

impl BodyClassifier for MyClassifier {
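    // `_headers` is unused in this simple classifier; the underscore silences the compiler warning.
    fn classify(&self, status: StatusCode, _headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
        if status.is_success() && !body.is_empty() {
            // 2xx with a non-empty body: accept the response.
            BodyVerdict::Success
        } else if status.is_server_error() {
            // 5xx: treat as a fault of the target host rather than the proxy.
            BodyVerdict::TargetError
        } else {
            // Everything else (empty body, 3xx, 4xx) is attributed to the proxy.
            BodyVerdict::ProxyBlocked
        }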
    }
}

// Then pass it when constructing the pool:
// let pool = ScatterProxy::new(config, MyClassifier).await?;
```
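
Free proxies often answer with a 200 status that carries a captcha or block page, so a classifier may also want to look inside the body. The helper below is a standalone sketch of that check. `looks_blocked` and the marker strings are hypothetical and not part of scatter-proxy; a `classify` implementation could call such a helper and return `BodyVerdict::ProxyBlocked` when it matches.

```rust
/// Hypothetical helper: true when the body contains any known block-page marker.
/// Matching is case-insensitive.
fn looks_blocked(body: &[u8], markers: &[&str]) -> bool {
    // Lossy decoding keeps the check usable on bodies that are not valid UTF-8.
    let text = String::from_utf8_lossy(body).to_lowercase();
    markers.iter().any(|m| text.contains(&m.to_lowercase()))
}

fn main() {
    // Example markers only; tune them to the targets you actually scrape.
    let markers = ["captcha", "access denied"];

    let block_page = b"<html><body>Please solve the CAPTCHA to continue</body></html>";
    let real_page = b"{\"origin\": \"203.0.113.7\"}";

    println!("block page blocked? {}", looks_blocked(block_page, &markers));
    println!("real page blocked?  {}", looks_blocked(real_page, &markers));
}
```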

## Configuration

All fields on `ScatterProxyConfig` with their types and defaults:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `sources` | `Vec<String>` | `[]` (uses built-in free proxy lists) | URLs of proxy source lists (line-delimited `ip:port`). When empty, defaults to `DEFAULT_PROXY_SOURCES` — a curated set of free SOCKS5 lists. |
| `source_refresh_interval` | `Duration` | `600s` | How often to re-fetch proxy sources |
| `rate_limit` | `RateLimitConfig` | *(see below)* | Per-(proxy, host) rate-limiting settings |
| `proxy_timeout` | `Duration` | `8s` | Timeout for a single proxy connection attempt |
| `task_timeout` | `Duration` | `60s` | Overall timeout per task from submission to final failure |
| `max_concurrent_per_request` | `usize` | `3` | Number of proxy paths raced per request (*K*) |
| `max_inflight` | `usize` | `100` | Global in-flight concurrency limit |
| `max_attempts` | `usize` | `5` | Maximum scheduling attempts per task |
| `task_pool_capacity` | `usize` | `1000` | Maximum number of pending tasks in the pool |
| `health_window` | `usize` | `30` | Sliding window size for health tracking |
| `cooldown_base` | `Duration` | `30s` | Base cooldown after consecutive failures |
| `cooldown_max` | `Duration` | `300s` | Maximum cooldown duration |
| `cooldown_consecutive_fails` | `usize` | `3` | Consecutive failures before entering cooldown |
| `eviction_min_samples` | `usize` | `30` | Minimum samples before a proxy can be evicted |
| `circuit_breaker_threshold` | `usize` | `10` | Target errors before tripping the host circuit breaker |
| `circuit_breaker_probe_interval` | `Duration` | `30s` | Interval between probe requests when breaker is open |
| `state_file` | `Option<PathBuf>` | `None` | File path for JSON state persistence |
| `state_save_interval` | `Duration` | `300s` | How often to persist state to disk |
| `metrics_log_interval` | `Duration` | `30s` | How often to log the metrics summary line |
| `prefer_remote_dns` | `bool` | `true` | Use `socks5h://` for remote DNS resolution |
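
To make the three `cooldown_*` fields concrete, here is one way they could combine. This is a sketch under a stated assumption: the README only says the backoff is exponential, so the doubling factor and the function name `cooldown_for` are illustrative, not taken from the crate.

```rust
use std::time::Duration;

/// Assumed policy: no cooldown below `threshold` consecutive failures; at the
/// threshold the pair cools down for `base`, and each further failure doubles
/// the cooldown until it reaches `max`.
fn cooldown_for(
    consecutive_fails: usize,
    threshold: usize,
    base: Duration,
    max: Duration,
) -> Option<Duration> {
    if consecutive_fails < threshold {
        return None;
    }
    // Clamp the exponent so the shift below cannot overflow a u32.
    let extra = (consecutive_fails - threshold).min(16) as u32;
    let scaled = base.saturating_mul(1u32 << extra);
    Some(scaled.min(max))
}

fn main() {
    // Defaults from the table: 3 failures, 30s base, 300s cap.
    let (threshold, base, max) = (3, Duration::from_secs(30), Duration::from_secs(300));
    for fails in 0..=8 {
        println!("{} fails -> {:?}", fails, cooldown_for(fails, threshold, base, max));
    }
}
```

With the default values, this policy reaches the 300 s cap on the seventh consecutive failure.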

**`RateLimitConfig` fields:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `default_interval` | `Duration` | `500ms` | Minimum interval between requests per (proxy, host) pair |
| `host_overrides` | `HashMap<String, Duration>` | `{}` | Per-host interval overrides |
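
The two fields above imply a simple lookup: use the host's override if one exists, otherwise the default interval, and track the last dispatch per (proxy, host) pair. The following standalone sketch shows that idea. `RateLimits` and `PairLimiter` are stand-in types written for this example; they are not the crate's `RateLimitConfig` or its internal limiter.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Stand-in for the two rate-limit settings in the table above.
struct RateLimits {
    default_interval: Duration,
    host_overrides: HashMap<String, Duration>,
}

impl RateLimits {
    /// Per-host override if present, otherwise the default interval.
    fn interval_for(&self, host: &str) -> Duration {
        self.host_overrides.get(host).copied().unwrap_or(self.default_interval)
    }
}

/// Hypothetical limiter that remembers the last dispatch per (proxy, host) pair.
struct PairLimiter {
    limits: RateLimits,
    last_sent: HashMap<(String, String), Instant>,
}

impl PairLimiter {
    /// Returns true (and records `now`) if the pair is allowed to send.
    fn try_acquire(&mut self, proxy: &str, host: &str, now: Instant) -> bool {
        let key = (proxy.to_string(), host.to_string());
        let interval = self.limits.interval_for(host);
        let ready = self
            .last_sent
            .get(&key)
            .map_or(true, |&last| now.duration_since(last) >= interval);
        if ready {
            self.last_sent.insert(key, now);
        }
        ready
    }
}

fn main() {
    let limits = RateLimits {
        default_interval: Duration::from_millis(500),
        host_overrides: HashMap::from([("slow.com".to_string(), Duration::from_secs(1))]),
    };
    let mut limiter = PairLimiter { limits, last_sent: HashMap::new() };
    let t0 = Instant::now();
    let half_second = t0 + Duration::from_millis(500);

    println!("first send:          {}", limiter.try_acquire("p1", "slow.com", t0));
    println!("same pair at +500ms: {}", limiter.try_acquire("p1", "slow.com", half_second));
    println!("other proxy, +500ms: {}", limiter.try_acquire("p2", "slow.com", half_second));
}
```

Keying on the pair rather than on the host alone means that one proxy waiting out its interval does not stop other proxies from reaching the same host.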

## Architecture

```text
                  ┌──────────────┐
                  │  Client Code │
                  └──────┬───────┘
                         │ submit(request)
                  ┌──────────────┐
                  │  ScatterProxy│
                  │  (TaskPool)  │
                  └──────┬───────┘
                         │ pick K healthy proxies
               ┌─────────────────────┐
               │     Scheduler       │
               │  ┌───────────────┐  │
               │  │ Rate Limiters │  │
               │  │ Health Scores │  │
               │  │ Circuit Brkrs │  │
               │  │ Adaptive K    │  │
               │  └───────────────┘  │
               └─────────┬───────────┘
                         │ fan-out K paths
            ┌────────────┼────────────┐
            ▼            ▼            ▼
      ┌──────────┐ ┌──────────┐ ┌──────────┐
      │ Proxy #1 │ │ Proxy #2 │ │ Proxy #3 │
      │ socks5h  │ │ socks5h  │ │ socks5h  │
      └────┬─────┘ └────┬─────┘ └────┬─────┘
           │             │             │
           └─────────────┼─────────────┘
                         │ first good response wins
               ┌───────────────────┐
               │ BodyClassifier    │
               │ → Success         │
               │ → ProxyBlocked    │
               │ → TargetError     │
               └────────┬──────────┘
               ┌───────────────────┐
               │ ScatterResponse   │
               │ {status, headers, │
               │  body}            │
               └───────────────────┘

  Background tasks:
    • Scheduler loop — polls TaskPool, dispatches fan-out attempts
    • State persistence — periodic JSON snapshots (atomic writes)
    • Metrics logger — periodic tracing summary lines
    • Source refresh — re-fetches proxy lists on interval
```
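
The "fan-out K paths, first good response wins" step in the diagram can be illustrated without any async runtime. The sketch below uses plain threads and a channel, which is an assumption made to keep it self-contained; the crate itself is async, and `race` is a hypothetical function. Each tuple stands for one proxy path: a name, a simulated latency in milliseconds, and whether the attempt succeeds.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Races simulated proxy attempts and returns the name of the first one that
/// succeeds, or `None` if every path fails.
fn race(paths: Vec<(&'static str, u64, bool)>) -> Option<&'static str> {
    let (tx, rx) = mpsc::channel::<Result<&'static str, &'static str>>();

    for (proxy, latency_ms, succeeds) in paths {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for a real request through `proxy`.
            thread::sleep(Duration::from_millis(latency_ms));
            let outcome = if succeeds { Ok(proxy) } else { Err(proxy) };
            // The receiver may already be gone if another path won; ignore that.
            let _ = tx.send(outcome);
        });
    }
    // Drop the original sender so the iterator ends once all workers finish.
    drop(tx);

    // Failures are skipped; the first success ends the wait.
    // Note: plain threads cannot be cancelled, so losers simply finish and
    // have their result discarded, unlike the cancellation described above.
    let winner = rx.iter().find_map(|outcome| outcome.ok());
    winner
}

fn main() {
    let winner = race(vec![
        ("proxy-1", 10, false), // fast but fails
        ("proxy-2", 50, true),  // slower, succeeds
        ("proxy-3", 300, true), // succeeds too late
    ]);
    println!("winner: {:?}", winner);
}
```

A fast failure does not end the race: the caller keeps waiting until some path succeeds or all of them have failed.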

## Integration Testing

Unit tests run without any external dependencies:

```sh
cargo test
```

Integration tests exercise the full pipeline with real free SOCKS5 proxy sources from the internet. They are marked `#[ignore]` by default. To run them:

```sh
SCATTER_INTEGRATION=1 cargo test -- --ignored
```

> **Note:** Integration tests require network access and depend on third-party proxy sources being available. They may be flaky in CI due to proxy churn.

## License

Licensed under either of

- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT License ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)

at your option.