scatter-proxy 0.8.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput
# scatter-proxy

[![CI](https://ci.codeberg.org/api/badges/letllmrun/scatter-proxy/status.svg)](https://ci.codeberg.org/letllmrun/scatter-proxy)
[![Crates.io](https://img.shields.io/crates/v/scatter-proxy.svg)](https://crates.io/crates/scatter-proxy)
[![docs.rs](https://docs.rs/scatter-proxy/badge.svg)](https://docs.rs/scatter-proxy)
[![License](https://img.shields.io/crates/l/scatter-proxy.svg)](LICENSE)

**Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput.**

## Features

- **Per-host pool isolation** — [`ScatterProxyRouter`] assigns each target host its own proxy pool, health tracker, and scheduler with its own independent `ScatterProxyConfig`.  A struggling host cannot starve tasks for healthy hosts, and proxy eviction is scoped per-host.
- **Multi-path race** — fan out each request across *K* proxies simultaneously; first successful response wins, losers are cancelled.
- **Adaptive fan-out** — *K* auto-adjusts based on the current success rate and the number of healthy proxies; it is boosted automatically during cold start to collect data fast.
- **Never-fail scheduler** — the scheduler retries indefinitely; tasks are never dropped due to attempt limits or internal timeouts.  Caller-side deadlines are opt-in via `handle.with_timeout()` or `tokio::time::timeout`.
- **Per-(proxy, host) rate limiting** — configurable minimum intervals with per-host overrides prevent tripping upstream abuse detection.
- **Health tracking** — sliding-window success/failure counters per (proxy, host) pair with latency stats.
- **Exponential-backoff cooldown** — consecutively-failing (proxy, host) pairs back off automatically.
- **Delayed retry queue** — temporarily unrunnable tasks are parked until their next eligible time instead of hot-loop requeueing.
- **Proxy eviction** — proxies with 0% global success rate after sufficient samples are marked dead.
- **`socks5h://` by default** — DNS resolution happens on the proxy side, preventing local DNS leaks.
- **State persistence** — JSON snapshots with atomic writes enable hot restarts with no warm-up penalty.
- **Automatic source refresh** — proxy lists are periodically re-fetched from configured URLs.
- **Pluggable body classifier** — implement the `BodyClassifier` trait to decide whether a response is good, blocked, or errored.
- **Observability** — structured logging via `tracing` with periodic metrics summaries and real-time `PoolMetrics`.
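
The multi-path race idea from the list above can be illustrated with a standard-library sketch. It is not the crate's implementation: scatter-proxy is async and cancels the losing attempts, whereas this version uses plain threads and simply ignores late results. The `race` function and the `Attempt` alias are hypothetical names chosen for the example, and the delays stand in for proxy latency.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Outcome of one simulated proxy attempt (hypothetical type for this sketch).
type Attempt = Result<String, String>;

/// Run every attempt on its own thread and return the index and body of the
/// first one that succeeds, or `None` if all paths fail.
fn race(paths: Vec<(Duration, Attempt)>) -> Option<(usize, String)> {
    let (tx, rx) = mpsc::channel();
    let total = paths.len();
    for (idx, (delay, outcome)) in paths.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(delay); // stand-in for proxy latency
            // The receiver may already be gone if another path won; ignore that.
            let _ = tx.send((idx, outcome));
        });
    }
    drop(tx); // only the clones held by the workers remain

    for _ in 0..total {
        match rx.recv() {
            Ok((idx, Ok(body))) => return Some((idx, body)), // first success wins
            Ok((_, Err(_))) => continue,                      // failed path, keep waiting
            Err(_) => break,                                  // every sender is gone
        }
    }
    None
}

fn main() {
    let winner = race(vec![
        (Duration::from_millis(5), Err("connection refused".into())),
        (Duration::from_millis(20), Ok("203.0.113.7".into())),
        (Duration::from_millis(10), Err("timed out".into())),
    ]);
    println!("{winner:?}");
}
```

Failed paths do not end the race; only a success or the exhaustion of all *K* paths does, which is why a single working proxy among several bad ones is enough to complete the request.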

## Quick Start

```rust
use scatter_proxy::{
    ScatterProxy, ScatterProxyConfig, RateLimitConfig,
    DefaultClassifier, ScatterResponse,
};
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sources are optional — scatter-proxy ships with default free proxy sources.
    // Just override the fields you care about:
    let config = ScatterProxyConfig {
        // sources: vec!["https://example.com/socks5.txt".into()],  // optional override
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("slow.com".into(), Duration::from_secs(1)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        prefer_remote_dns: true,
        ..Default::default()
    };

    // Build the proxy pool with the default response classifier.
    let pool = ScatterProxy::new(config, DefaultClassifier).await?;

    let client = reqwest::Client::new();

    // Submit a single request.  `submit` blocks if the pool is full, then
    // the returned handle blocks until a successful proxied response arrives.
    // The scheduler retries internally forever — use with_timeout for a deadline.
    let req = client.get("http://httpbin.org/ip").build()?;
    let response: ScatterResponse = pool.submit(req).await
        .with_timeout(Duration::from_secs(30)).await?
        .expect("request still pending after timeout");

    // ScatterResponse has: status (StatusCode), headers (HeaderMap), body (Bytes)
    println!("Status: {}", response.status);
    println!("Body:   {:?}", response.body);

    // Non-blocking submit: returns Err(PoolFull) immediately when the pool is full.
    let req2 = client.get("http://httpbin.org/ip").build()?;
    if let Ok(handle) = pool.try_submit(req2) {
        let _ = handle.with_timeout(Duration::from_secs(30)).await;
    }

    // Submit a batch of requests (blocks until capacity, sequentially).
    let reqs = vec![
        client.get("http://httpbin.org/ip").build()?,
        client.get("http://httpbin.org/headers").build()?,
        client.get("http://httpbin.org/user-agent").build()?,
    ];
    let handles = pool.submit_batch(reqs).await;
    for h in handles {
        if let Some(resp) = h.with_timeout(Duration::from_secs(30)).await? {
            println!("  → {} ({} bytes)", resp.status, resp.body.len());
        }
    }

    // Check real-time metrics.
    let m = pool.metrics();
    println!(
        "Proxies: {} healthy / {} dead | Success rate: {:.0}%",
        m.healthy_proxies, m.dead_proxies, m.success_rate_1m * 100.0,
    );

    // Graceful shutdown (persists state to disk).
    pool.shutdown().await;

    Ok(())
}
```
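
The `rate_limit` block in the example sets a default interval plus a per-host override. As a rough illustration of what such a setting implies, the sketch below resolves the interval for a host and gates each (proxy, host) pair on it. `PairLimiter` and its methods are hypothetical and are not part of the scatter-proxy API; the crate's internal limiter may work differently.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical limiter keyed by (proxy, host); not the crate's internal type.
struct PairLimiter {
    default_interval: Duration,
    host_overrides: HashMap<String, Duration>,
    last_used: HashMap<(String, String), Instant>,
}

impl PairLimiter {
    /// Interval for a host: the override if one exists, else the default.
    fn interval_for(&self, host: &str) -> Duration {
        self.host_overrides
            .get(host)
            .copied()
            .unwrap_or(self.default_interval)
    }

    /// Returns true and records `now` if the pair may be used at `now`.
    fn try_acquire(&mut self, proxy: &str, host: &str, now: Instant) -> bool {
        let interval = self.interval_for(host);
        let key = (proxy.to_string(), host.to_string());
        let allowed = match self.last_used.get(&key) {
            Some(&last) => now.duration_since(last) >= interval,
            None => true, // first use of this pair
        };
        if allowed {
            self.last_used.insert(key, now);
        }
        allowed
    }
}

fn main() {
    let mut limiter = PairLimiter {
        default_interval: Duration::from_millis(500),
        host_overrides: HashMap::from([("slow.com".to_string(), Duration::from_secs(1))]),
        last_used: HashMap::new(),
    };
    let t0 = Instant::now();
    let later = t0 + Duration::from_millis(600);
    println!("first use:         {}", limiter.try_acquire("p1", "slow.com", t0));
    println!("same pair, +600ms: {}", limiter.try_acquire("p1", "slow.com", later));
    println!("other proxy:       {}", limiter.try_acquire("p2", "slow.com", later));
}
```

Because the key is the pair rather than the proxy alone, a proxy that is waiting out its interval for one host remains usable for another.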

## Per-Host Pool Isolation

When crawling multiple target sites, use `ScatterProxyRouter` to give each host its own independent proxy pool.  A host where all proxies are in cooldown or getting blocked cannot affect throughput for other hosts.

```rust
use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Each host gets its own independent config — tune concurrency, sources, and
    // timeouts per target.  config.name is auto-set from the hostname if not provided.
    let router = ScatterProxyRouter::new(
        [
            ("szse.cn",       ScatterProxyConfig { max_inflight: 20, ..Default::default() }),
            ("sse.com.cn",    ScatterProxyConfig { max_inflight: 10, ..Default::default() }),
            ("cninfo.com.cn", ScatterProxyConfig::default()),
        ],
        DefaultClassifier,
    ).await?;

    let client = reqwest::Client::new();

    // Requests are routed to the matching host pool automatically.
    let req = client.get("http://szse.cn/api/data").build()?;
    let handle = router.submit(req).await?;
    // handle.with_timeout(…) as usual

    // Per-host metrics — see exactly which host is struggling.
    for (host, m) in router.all_metrics() {
        println!(
            "[{host}] success={:.0}% proxies={} pending={}",
            m.success_rate_1m * 100.0,
            m.healthy_proxies,
            m.pending_tasks,
        );
    }

    router.shutdown().await;
    Ok(())
}
```

Each host's metrics log lines are automatically prefixed with `[hostname]` so multi-host
output is easy to grep:

```
INFO [szse.cn] throughput=2.1/s | success=18% | pool: 4016 healthy …
INFO [sse.com.cn] throughput=8.4/s | success=71% | pool: 4016 healthy …
```
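
The isolation property described in this section can be sketched without any networking. The example below models each host pool as nothing more than an in-flight counter with its own limit; `Router`, `HostPool`, and `SubmitError` are hypothetical stand-ins, not the crate's types, and the real router also keeps a separate proxy pool, health tracker, and scheduler per host.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one host's pool: just an in-flight counter.
struct HostPool {
    max_inflight: usize,
    inflight: usize,
}

/// Hypothetical router holding one independent pool per configured host.
struct Router {
    pools: HashMap<String, HostPool>,
}

#[derive(Debug, PartialEq)]
enum SubmitError {
    UnknownHost,
    PoolFull,
}

impl Router {
    fn new(hosts: &[(&str, usize)]) -> Self {
        let pools = hosts
            .iter()
            .map(|&(host, max_inflight)| {
                (host.to_string(), HostPool { max_inflight, inflight: 0 })
            })
            .collect();
        Router { pools }
    }

    /// Admit one task for `host`, counting only against that host's limit.
    fn try_submit(&mut self, host: &str) -> Result<(), SubmitError> {
        let pool = self.pools.get_mut(host).ok_or(SubmitError::UnknownHost)?;
        if pool.inflight >= pool.max_inflight {
            return Err(SubmitError::PoolFull);
        }
        pool.inflight += 1;
        Ok(())
    }
}

fn main() {
    let mut router = Router::new(&[("szse.cn", 2), ("sse.com.cn", 1)]);
    // Saturate one host.
    println!("{:?}", router.try_submit("szse.cn"));
    println!("{:?}", router.try_submit("szse.cn"));
    println!("{:?}", router.try_submit("szse.cn"));
    // The other host is unaffected by the backlog.
    println!("{:?}", router.try_submit("sse.com.cn"));
}
```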

## Custom Classifier

Implement `BodyClassifier` to control how responses are categorised:

```rust
use scatter_proxy::{BodyClassifier, BodyVerdict, StatusCode, HeaderMap};

struct MyClassifier;

impl BodyClassifier for MyClassifier {
    // `_headers` is unused in this example; the underscore avoids a compiler warning.
    fn classify(&self, status: StatusCode, _headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
        if status.is_success() && !body.is_empty() {
            BodyVerdict::Success
        } else if status.is_server_error() {
            BodyVerdict::TargetError
        } else {
            BodyVerdict::ProxyBlocked
        }
    }
}

// Then pass it when constructing the pool:
// let pool = ScatterProxy::new(config, MyClassifier).await?;
```
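
Some block pages come back with a `200` status, so a classifier may also want to look at the body. The standalone sketch below shows one way to do that. To compile on its own it uses a local `Verdict` enum in place of the crate's `BodyVerdict` and a plain `u16` in place of `StatusCode`; the marker strings are assumptions for illustration and would need to be tuned per target site.

```rust
/// Local stand-in for the crate's `BodyVerdict`, so the sketch compiles alone.
#[derive(Debug, PartialEq)]
enum Verdict {
    Success,
    ProxyBlocked,
    TargetError,
}

/// Hypothetical markers that suggest a block page rather than real content.
const BLOCK_MARKERS: [&str; 2] = ["captcha", "access denied"];

/// Classify a response by status code first, then by body content.
fn classify(status: u16, body: &[u8]) -> Verdict {
    if (500..600).contains(&status) {
        return Verdict::TargetError; // the target itself is failing
    }
    let text = String::from_utf8_lossy(body).to_lowercase();
    let looks_blocked = BLOCK_MARKERS.iter().any(|m| text.contains(*m));
    if (200..300).contains(&status) && !body.is_empty() && !looks_blocked {
        Verdict::Success
    } else {
        Verdict::ProxyBlocked // empty body, block page, or non-2xx status
    }
}

fn main() {
    println!("{:?}", classify(200, b"{\"ok\":true}"));
    println!("{:?}", classify(200, b"<html>Please solve the CAPTCHA</html>"));
    println!("{:?}", classify(503, b""));
}
```

The same logic could be moved into a `BodyClassifier::classify` implementation by swapping the local types for the crate's own.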

## Configuration

All fields on `ScatterProxyConfig` with their types and defaults:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `sources` | `Vec<String>` | `[]` (uses built-in free proxy lists) | URLs of proxy source lists (line-delimited `ip:port`). When empty, defaults to `DEFAULT_PROXY_SOURCES` — a curated set of free SOCKS5 lists. |
| `source_refresh_interval` | `Duration` | `600s` | How often to re-fetch proxy sources |
| `rate_limit` | `RateLimitConfig` | *(see below)* | Per-(proxy, host) rate-limiting settings |
| `proxy_timeout` | `Duration` | `8s` | Timeout for a single proxy connection attempt |
| `max_concurrent_per_request` | `usize` | `3` | Base number of proxy paths raced per request (*K*); boosted automatically during cold start |
| `max_inflight` | `usize` | `100` | Global in-flight concurrency limit |
| `task_pool_capacity` | `usize` | `1000` | Maximum number of pending tasks in the pool |
| `health_window` | `usize` | `30` | Sliding window size for health tracking |
| `cooldown_base` | `Duration` | `30s` | Base cooldown after consecutive failures |
| `cooldown_max` | `Duration` | `300s` | Maximum cooldown duration |
| `cooldown_consecutive_fails` | `usize` | `3` | Consecutive failures before entering cooldown |
| `eviction_min_samples` | `usize` | `30` | Minimum samples before a proxy can be evicted |
| `state_file` | `Option<PathBuf>` | `None` | File path for JSON state persistence |
| `state_save_interval` | `Duration` | `300s` | How often to persist state to disk |
| `metrics_log_interval` | `Duration` | `30s` | How often to log the metrics summary line |
| `prefer_remote_dns` | `bool` | `true` | Use `socks5h://` for remote DNS resolution |
| `name` | `Option<String>` | `None` | Label prepended to metrics log lines as `[name]`; set automatically by `ScatterProxyRouter` |

**`RateLimitConfig` fields:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `default_interval` | `Duration` | `500ms` | Minimum interval between requests per (proxy, host) pair |
| `host_overrides` | `HashMap<String, Duration>` | `{}` | Per-host interval overrides |

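To make the interaction between `cooldown_base`, `cooldown_max`, and `cooldown_consecutive_fails` concrete, here is a small sketch of an exponential backoff capped at a maximum. The doubling formula is an assumption for illustration; the README does not state the exact curve the crate uses, and `cooldown_for` is a hypothetical helper (it takes `u32` counts for simplicity, while the config field is a `usize`).

```rust
use std::time::Duration;

/// Cooldown after `consecutive_fails` failures, assuming the delay starts at
/// `base` once `threshold` is reached and doubles with each further failure,
/// never exceeding `max`. Returns `None` below the threshold.
fn cooldown_for(
    consecutive_fails: u32,
    threshold: u32,
    base: Duration,
    max: Duration,
) -> Option<Duration> {
    if consecutive_fails < threshold {
        return None;
    }
    // Cap the exponent so the shift cannot overflow a u32.
    let exponent = (consecutive_fails - threshold).min(16);
    let scaled = base.saturating_mul(1u32 << exponent);
    Some(scaled.min(max))
}

fn main() {
    let base = Duration::from_secs(30);
    let max = Duration::from_secs(300);
    for fails in 2..=8 {
        println!("{fails} fails -> {:?}", cooldown_for(fails, 3, base, max));
    }
}
```

With the defaults from the table (30s base, 300s cap, threshold 3), this curve would give 30s, 60s, 120s, 240s, and then stay at 300s.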
## Architecture

```text
                  ┌──────────────┐
                  │  Client Code │
                  └──────┬───────┘
                         │ submit(request)
                  ┌──────────────┐
                  │  ScatterProxy│
                  │  (TaskPool)  │
                  └──────┬───────┘
                         │ pick K healthy proxies
               ┌─────────────────────┐
               │     Scheduler       │
               │  ┌───────────────┐  │
               │  │ Rate Limiters │  │
               │  │ Health Scores │  │
               │  │ Ready Queue   │  │
               │  │ Delayed Heap  │  │
               │  │ Adaptive K    │  │
               │  └───────────────┘  │
               └─────────┬───────────┘
                         │ fan-out K paths
            ┌────────────┼────────────┐
            ▼            ▼            ▼
      ┌──────────┐ ┌──────────┐ ┌──────────┐
      │ Proxy #1 │ │ Proxy #2 │ │ Proxy #3 │
      │ socks5h  │ │ socks5h  │ │ socks5h  │
      └────┬─────┘ └────┬─────┘ └────┬─────┘
           │             │             │
           └─────────────┼─────────────┘
                         │ first good response wins
               ┌───────────────────┐
               │ BodyClassifier    │
               │ → Success         │
               │ → ProxyBlocked    │
               │ → TargetError     │
               └────────┬──────────┘
               ┌───────────────────┐
               │ ScatterResponse   │
               │ {status, headers, │
               │  body}            │
               └───────────────────┘

  Background tasks:
    • Scheduler workers — drain ready queue, promote delayed tasks, dispatch fan-out attempts
    • State persistence — periodic JSON snapshots (atomic writes)
    • Metrics logger — periodic tracing summary lines
    • Source refresh — re-fetches proxy lists on interval
```
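
The "Ready Queue" and "Delayed Heap" boxes in the diagram can be pictured as a FIFO plus a min-heap ordered by next eligible time. The sketch below is one plausible shape for that pair, not the crate's actual data structures: `Queues` is a hypothetical type, task ids are plain `u64` values, and time is a millisecond counter supplied by the caller.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// Hypothetical pairing of a ready queue with a delayed min-heap.
struct Queues {
    /// Task ids that can be dispatched right now.
    ready: VecDeque<u64>,
    /// (eligible_at_ms, task id), ordered so the earliest entry is on top.
    delayed: BinaryHeap<Reverse<(u64, u64)>>,
}

impl Queues {
    fn new() -> Self {
        Queues { ready: VecDeque::new(), delayed: BinaryHeap::new() }
    }

    /// Park a task until `eligible_at_ms` instead of requeueing it immediately.
    fn park(&mut self, task: u64, eligible_at_ms: u64) {
        self.delayed.push(Reverse((eligible_at_ms, task)));
    }

    /// Move every task whose time has come into the ready queue.
    fn promote_due(&mut self, now_ms: u64) -> usize {
        let mut moved = 0;
        while let Some(&Reverse((at, task))) = self.delayed.peek() {
            if at > now_ms {
                break; // the earliest entry is still in the future
            }
            self.delayed.pop();
            self.ready.push_back(task);
            moved += 1;
        }
        moved
    }

    /// When a worker should wake up next, if anything is parked.
    fn next_wakeup(&self) -> Option<u64> {
        self.delayed.peek().map(|Reverse((at, _))| *at)
    }
}

fn main() {
    let mut q = Queues::new();
    q.park(1, 300);
    q.park(2, 100);
    q.park(3, 200);
    println!("promoted at t=50:  {}", q.promote_due(50));
    println!("promoted at t=200: {}", q.promote_due(200));
    println!("ready: {:?}, next wakeup: {:?}", q.ready, q.next_wakeup());
}
```

Sleeping until `next_wakeup` rather than polling is what avoids the hot-loop requeueing mentioned in the feature list.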

## Integration Testing

Unit tests run without any external dependencies:

```sh
cargo test
```

Integration tests exercise the full pipeline with real free SOCKS5 proxy sources from the internet. They are marked `#[ignore]` by default. To run them:

```sh
SCATTER_INTEGRATION=1 cargo test -- --ignored
```

> **Note:** Integration tests require network access and depend on third-party proxy sources being available. They may be flaky in CI due to proxy churn.


## Migration Notes

### 0.8.0

- **`ScatterProxyRouter::new()`** signature changed. The second argument (shared `ScatterProxyConfig`) is gone; instead pass `(host, ScatterProxyConfig)` tuples so each host is configured independently. See the example above.
- `direct://localhost` support has been fully removed from `ProxyManager`. All connections go through SOCKS5 proxies.

### 0.5.0

- The circuit breaker subsystem has been removed. Temporarily unrunnable tasks are now handled by the delayed retry queue.
- `PoolMetrics` no longer exposes circuit-breaker state. New counters include delayed tasks, requeues, zero-available events, dispatch count, and skip reasons.

## License

Licensed under either of

- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT License ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)

at your option.