# scatter-proxy
[CI](https://github.com/letllmrun/scatter-proxy/actions/workflows/ci.yml) | [crates.io](https://crates.io/crates/scatter-proxy) | [docs.rs](https://docs.rs/scatter-proxy) | [License](LICENSE)

**Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput.**
## Features
- **Multi-path race** — fan out each request across *K* proxies simultaneously; first successful response wins, losers are cancelled.
- **Adaptive fan-out** — *K* auto-adjusts based on current success rate and the number of healthy proxies.
- **Per-(proxy, host) rate limiting** — configurable minimum intervals with per-host overrides prevent tripping upstream abuse detection.
- **Health tracking** — sliding-window success/failure counters per (proxy, host) pair with latency stats.
- **Exponential-backoff cooldown** — consecutively-failing (proxy, host) pairs back off automatically.
- **Per-host circuit breakers** — Open → HalfOpen → Closed state machine prevents thundering-herd retries.
- **Proxy eviction** — proxies with 0% global success rate after sufficient samples are marked dead.
- **`socks5h://` by default** — DNS resolution happens on the proxy side, preventing local DNS leaks.
- **State persistence** — JSON snapshots with atomic writes enable hot restarts with no warm-up penalty.
- **Automatic source refresh** — proxy lists are periodically re-fetched from configured URLs.
- **Pluggable body classifier** — implement the `BodyClassifier` trait to decide whether a response is good, blocked, or errored.
- **Observability** — structured logging via `tracing` with periodic metrics summaries and real-time `PoolMetrics`.
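
To make the multi-path race idea concrete, here is a minimal std-only sketch. It is not the crate's implementation: `attempt` and `race` are hypothetical names, each proxied request is simulated with a sleep, and OS threads stand in for the crate's async tasks. Where the real scheduler cancels the losers, this sketch simply ignores their late results.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for one proxied attempt: waits `latency_ms`,
/// then either succeeds with a body or fails.
fn attempt(proxy: &str, latency_ms: u64, ok: bool) -> Result<String, String> {
    thread::sleep(Duration::from_millis(latency_ms));
    if ok {
        Ok(format!("response via {proxy}"))
    } else {
        Err(format!("{proxy} failed"))
    }
}

/// Races all paths and returns the first successful body, or `None` if every path fails.
fn race(paths: Vec<(&'static str, u64, bool)>) -> Option<String> {
    let (tx, rx) = mpsc::channel();
    for (proxy, latency_ms, ok) in paths {
        let tx = tx.clone();
        thread::spawn(move || {
            // A send error only means the receiver already picked a winner.
            let _ = tx.send(attempt(proxy, latency_ms, ok));
        });
    }
    // Drop the original sender so the iterator ends once all workers have reported.
    drop(tx);
    // Results arrive in completion order; skip failures, stop at the first success.
    rx.into_iter().find_map(Result::ok)
}

fn main() {
    let winner = race(vec![
        ("proxy-a", 200, true),
        ("proxy-b", 10, false),
        ("proxy-c", 50, true),
    ]);
    println!("{winner:?}");
}
```

Here `proxy-b` answers first but fails, so the race continues until `proxy-c` succeeds; the slower `proxy-a` never gets to deliver its result.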
## Quick Start
```rust
use scatter_proxy::{
    ScatterProxy, ScatterProxyConfig, RateLimitConfig,
    DefaultClassifier, TaskHandle, ScatterResponse,
};
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sources are optional: scatter-proxy ships with default free proxy sources.
    // Just override the fields you care about:
    let config = ScatterProxyConfig {
        // sources: vec!["https://example.com/socks5.txt".into()], // optional override
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("slow.com".into(), Duration::from_secs(1)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        prefer_remote_dns: true,
        ..Default::default()
    };

    // Build the proxy pool with the default response classifier.
    let pool = ScatterProxy::new(config, DefaultClassifier).await?;

    // Submit a single request.
    let client = reqwest::Client::new();
    let req = client.get("http://httpbin.org/ip").build()?;
    let handle: TaskHandle = pool.submit(req)?;
    let response: ScatterResponse = handle.await?;

    // ScatterResponse has: status (StatusCode), headers (HeaderMap), body (Bytes)
    println!("Status: {}", response.status);
    println!("Body: {:?}", response.body);

    // Submit a batch of requests.
    let reqs = vec![
        client.get("http://httpbin.org/ip").build()?,
        client.get("http://httpbin.org/headers").build()?,
        client.get("http://httpbin.org/user-agent").build()?,
    ];
    let handles = pool.submit_batch(reqs)?;
    for h in handles {
        let resp = h.await?;
        println!("  → {} ({} bytes)", resp.status, resp.body.len());
    }

    // Check real-time metrics.
    let m = pool.metrics();
    println!(
        "Proxies: {} healthy / {} dead | Success rate: {:.0}%",
        m.healthy_proxies, m.dead_proxies, m.success_rate_1m * 100.0,
    );

    // Graceful shutdown (persists state to disk).
    pool.shutdown().await;
    Ok(())
}
```
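
The `state_file` option above relies on atomic writes so that a crash mid-save cannot leave a truncated snapshot behind. A common way to get that property is to write to a temporary file and rename it over the destination. The sketch below illustrates that general technique only: `write_atomic`, the `.tmp` naming, and the JSON payload are assumptions for illustration, not the crate's actual code or state format.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Writes `contents` to a sibling temp file, then renames it over `path`.
/// Readers see either the old file or the new one, never a partial write
/// (on platforms where rename within one filesystem is atomic).
fn write_atomic(path: &Path, contents: &[u8]) -> io::Result<()> {
    let tmp = path.with_extension("tmp"); // hypothetical temp-file naming
    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(contents)?;
        file.sync_all()?; // flush data to disk before the rename
    }
    fs::rename(&tmp, path)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("scatter_proxy_state_example.json");
    // Placeholder payload; the real snapshot schema is not shown here.
    write_atomic(&path, br#"{"proxies":[]}"#)?;
    println!("{}", fs::read_to_string(&path)?);
    fs::remove_file(&path)
}
```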
## Custom Classifier
Implement `BodyClassifier` to control how responses are categorised:
```rust
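use scatter_proxy::{BodyClassifier, BodyVerdict, StatusCode, HeaderMap};

struct MyClassifier;

impl BodyClassifier for MyClassifier {
    // `_headers` is unused in this example; the underscore silences the warning.
    fn classify(&self, status: StatusCode, _headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
        if status.is_success() && !body.is_empty() {
            // 2xx with a non-empty body counts as a good response.
            BodyVerdict::Success
        } else if status.is_server_error() {
            // 5xx is attributed to the target host, not the proxy.
            BodyVerdict::TargetError
        } else {
            // Everything else is treated as the proxy being blocked.
            BodyVerdict::ProxyBlocked
        }
    }
}

// Then pass it when constructing the pool:
// let pool = ScatterProxy::new(config, MyClassifier).await?;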
```
## Configuration
All fields on `ScatterProxyConfig` with their types and defaults:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `sources` | `Vec<String>` | `[]` (uses built-in free proxy lists) | URLs of proxy source lists (line-delimited `ip:port`). When empty, defaults to `DEFAULT_PROXY_SOURCES`, a curated set of free SOCKS5 lists. |
| `source_refresh_interval` | `Duration` | `600s` | How often to re-fetch proxy sources |
| `rate_limit` | `RateLimitConfig` | *(see below)* | Per-(proxy, host) rate-limiting settings |
| `proxy_timeout` | `Duration` | `8s` | Timeout for a single proxy connection attempt |
| `task_timeout` | `Duration` | `60s` | Overall timeout per task from submission to final failure |
| `max_concurrent_per_request` | `usize` | `3` | Number of proxy paths raced per request (*K*) |
| `max_inflight` | `usize` | `100` | Global in-flight concurrency limit |
| `max_attempts` | `usize` | `5` | Maximum scheduling attempts per task |
| `task_pool_capacity` | `usize` | `1000` | Maximum number of pending tasks in the pool |
| `health_window` | `usize` | `30` | Sliding window size for health tracking |
| `cooldown_base` | `Duration` | `30s` | Base cooldown after consecutive failures |
| `cooldown_max` | `Duration` | `300s` | Maximum cooldown duration |
| `cooldown_consecutive_fails` | `usize` | `3` | Consecutive failures before entering cooldown |
| `eviction_min_samples` | `usize` | `30` | Minimum samples before a proxy can be evicted |
| `circuit_breaker_threshold` | `usize` | `10` | Target errors before tripping the host circuit breaker |
| `circuit_breaker_probe_interval` | `Duration` | `30s` | Interval between probe requests when breaker is open |
| `state_file` | `Option<PathBuf>` | `None` | File path for JSON state persistence |
| `state_save_interval` | `Duration` | `300s` | How often to persist state to disk |
| `metrics_log_interval` | `Duration` | `30s` | How often to log the metrics summary line |
| `prefer_remote_dns` | `bool` | `true` | Use `socks5h://` for remote DNS resolution |
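
To show how `cooldown_base`, `cooldown_max`, and `cooldown_consecutive_fails` might interact, here is a small sketch of an exponential-backoff calculation. The exact formula the crate uses is not documented here, so the doubling rule is an assumption, and `cooldown` is a hypothetical helper, not part of the crate's API.

```rust
use std::time::Duration;

/// Cooldown for a (proxy, host) pair after `consecutive_fails` failures in a row.
/// Assumption: the cooldown starts at `base` once `threshold` is reached,
/// doubles for each further failure, and is capped at `max`.
fn cooldown(consecutive_fails: u32, threshold: u32, base: Duration, max: Duration) -> Option<Duration> {
    if consecutive_fails < threshold {
        return None; // below the threshold: no cooldown yet
    }
    // Bound the exponent so the shift below cannot overflow.
    let doublings = (consecutive_fails - threshold).min(16);
    let scaled = base.saturating_mul(1u32 << doublings);
    Some(scaled.min(max))
}

fn main() {
    // Defaults from the table: base 30s, max 300s, threshold 3.
    let (base, max) = (Duration::from_secs(30), Duration::from_secs(300));
    for fails in 2..=8 {
        println!("{fails} consecutive failures -> {:?}", cooldown(fails, 3, base, max));
    }
}
```

Under these assumptions the defaults give 30s, 60s, 120s, and 240s for the third through sixth consecutive failure, after which the 300s cap applies.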
**`RateLimitConfig` fields:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `default_interval` | `Duration` | `500ms` | Minimum interval between requests per (proxy, host) pair |
| `host_overrides` | `HashMap<String, Duration>` | `{}` | Per-host interval overrides |
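
The sketch below shows one way a per-(proxy, host) minimum interval with host overrides could be enforced. `PairLimiter` and its methods are hypothetical names for illustration; only `default_interval` and `host_overrides` come from the table above. Time is passed in explicitly so the behaviour is easy to test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical limiter keyed by (proxy, host).
struct PairLimiter {
    default_interval: Duration,
    host_overrides: HashMap<String, Duration>,
    last_used: HashMap<(String, String), Instant>,
}

impl PairLimiter {
    fn new(default_interval: Duration, host_overrides: HashMap<String, Duration>) -> Self {
        Self { default_interval, host_overrides, last_used: HashMap::new() }
    }

    /// The override for `host` if one exists, otherwise the default interval.
    fn interval_for(&self, host: &str) -> Duration {
        self.host_overrides.get(host).copied().unwrap_or(self.default_interval)
    }

    /// Returns true (and records `now`) if the pair may be used at `now`.
    fn try_acquire(&mut self, proxy: &str, host: &str, now: Instant) -> bool {
        let interval = self.interval_for(host);
        let key = (proxy.to_string(), host.to_string());
        let allowed = self
            .last_used
            .get(&key)
            .map_or(true, |&last| now.duration_since(last) >= interval);
        if allowed {
            self.last_used.insert(key, now);
        }
        allowed
    }
}

fn main() {
    let overrides = HashMap::from([("slow.com".to_string(), Duration::from_secs(1))]);
    let mut limiter = PairLimiter::new(Duration::from_millis(500), overrides);
    let t0 = Instant::now();
    println!("{}", limiter.try_acquire("p1", "slow.com", t0));
    println!("{}", limiter.try_acquire("p1", "slow.com", t0 + Duration::from_millis(600)));
    println!("{}", limiter.try_acquire("p2", "slow.com", t0 + Duration::from_millis(600)));
}
```

Because the key is the pair rather than the proxy alone, a proxy that is throttled for `slow.com` stays available for other hosts, and other proxies can still reach `slow.com`.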
## Architecture
```text
            ┌──────────────┐
            │ Client Code  │
            └──────┬───────┘
                   │  submit(request)
                   ▼
            ┌──────────────┐
            │ ScatterProxy │
            │  (TaskPool)  │
            └──────┬───────┘
                   │  pick K healthy proxies
                   ▼
         ┌─────────────────────┐
         │      Scheduler      │
         │  ┌───────────────┐  │
         │  │ Rate Limiters │  │
         │  │ Health Scores │  │
         │  │ Circuit Brkrs │  │
         │  │ Adaptive K    │  │
         │  └───────────────┘  │
         └─────────┬───────────┘
                   │  fan-out K paths
      ┌────────────┼────────────┐
      ▼            ▼            ▼
┌──────────┐  ┌──────────┐  ┌──────────┐
│ Proxy #1 │  │ Proxy #2 │  │ Proxy #3 │
│ socks5h  │  │ socks5h  │  │ socks5h  │
└────┬─────┘  └────┬─────┘  └────┬─────┘
     │             │             │
     └─────────────┼─────────────┘
                   │  first good response wins
                   ▼
          ┌───────────────────┐
          │  BodyClassifier   │
          │  → Success        │
          │  → ProxyBlocked   │
          │  → TargetError    │
          └────────┬──────────┘
                   │
                   ▼
          ┌───────────────────┐
          │  ScatterResponse  │
          │ {status, headers, │
          │  body}            │
          └───────────────────┘
Background tasks:
• Scheduler loop — polls TaskPool, dispatches fan-out attempts
• State persistence — periodic JSON snapshots (atomic writes)
• Metrics logger — periodic tracing summary lines
• Source refresh — re-fetches proxy lists on interval
```
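
The per-host circuit breakers shown in the scheduler box can be pictured as a small state machine. The following is a minimal sketch under stated assumptions, not the crate's code: `Breaker` and `State` are hypothetical types, the breaker trips after `threshold` consecutive target errors, and it lets a single probe through once `probe_interval` has elapsed (mirroring `circuit_breaker_threshold` and `circuit_breaker_probe_interval`).

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,                  // traffic flows normally
    Open { since: Instant }, // host is blocked until the probe interval elapses
    HalfOpen,                // one probe is in flight
}

struct Breaker {
    state: State,
    errors: usize,
    threshold: usize,
    probe_interval: Duration,
}

impl Breaker {
    fn new(threshold: usize, probe_interval: Duration) -> Self {
        Self { state: State::Closed, errors: 0, threshold, probe_interval }
    }

    /// Whether a request to this host may be dispatched at `now`.
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed => true,
            State::HalfOpen => false, // wait for the outstanding probe
            State::Open { since } => {
                if now.duration_since(since) >= self.probe_interval {
                    self.state = State::HalfOpen; // let exactly one probe through
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Records the outcome of a request that finished at `now`.
    fn record(&mut self, success: bool, now: Instant) {
        if success {
            self.errors = 0;
            self.state = State::Closed;
        } else {
            self.errors += 1;
            // A failed probe reopens immediately; otherwise trip at the threshold.
            if self.state == State::HalfOpen || self.errors >= self.threshold {
                self.state = State::Open { since: now };
            }
        }
    }
}

fn main() {
    let t0 = Instant::now();
    let mut breaker = Breaker::new(2, Duration::from_secs(30));
    breaker.record(false, t0);
    breaker.record(false, t0);
    println!("{:?}", breaker.allow(t0 + Duration::from_secs(10)));
    println!("{:?}", breaker.allow(t0 + Duration::from_secs(30)));
    breaker.record(true, t0 + Duration::from_secs(31));
    println!("{:?}", breaker.state);
}
```

Allowing only one probe while half-open is what keeps a recovering host from being hit by every queued task at once.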
## Integration Testing
Unit tests run without any external dependencies:
```sh
cargo test
```
Integration tests exercise the full pipeline with real free SOCKS5 proxy sources from the internet. They are marked `#[ignore]` by default. To run them:
```sh
SCATTER_INTEGRATION=1 cargo test -- --ignored
```
> **Note:** Integration tests require network access and depend on third-party proxy sources being available. They may be flaky in CI due to proxy churn.
## License
Licensed under either of
- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT License ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
at your option.