# scatter-proxy
[CI](https://ci.codeberg.org/letllmrun/scatter-proxy)
[crates.io](https://crates.io/crates/scatter-proxy)
[docs.rs](https://docs.rs/scatter-proxy)
[License](LICENSE)
**Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput.**
## Features
- **Per-host pool isolation** — [`ScatterProxyRouter`] assigns each target host its own proxy pool, health tracker, and scheduler with its own independent `ScatterProxyConfig`. A struggling host cannot starve tasks for healthy hosts, and proxy eviction is scoped per-host.
- **Multi-path race** — fan out each request across *K* proxies simultaneously; first successful response wins, losers are cancelled.
- **Adaptive fan-out** — *K* auto-adjusts based on current success rate and the number of healthy proxies; boosts automatically during cold start to collect data fast.
- **Never-fail scheduler** — the scheduler retries indefinitely; tasks are never dropped due to attempt limits or internal timeouts. Caller-side deadlines are opt-in via `handle.with_timeout()` or `tokio::time::timeout`.
- **Per-(proxy, host) rate limiting** — configurable minimum intervals with per-host overrides prevent tripping upstream abuse detection.
- **Health tracking** — sliding-window success/failure counters per (proxy, host) pair with latency stats.
- **Exponential-backoff cooldown** — consecutively-failing (proxy, host) pairs back off automatically.
- **Delayed retry queue** — temporarily unrunnable tasks are parked until their next eligible time instead of hot-loop requeueing.
- **Proxy eviction** — proxies with 0% global success rate after sufficient samples are marked dead.
- **`socks5h://` by default** — DNS resolution happens on the proxy side, preventing local DNS leaks.
- **State persistence** — JSON snapshots with atomic writes enable hot restarts with no warm-up penalty.
- **Automatic source refresh** — proxy lists are periodically re-fetched from configured URLs.
- **Pluggable body classifier** — implement the `BodyClassifier` trait to decide whether a response is good, blocked, or errored.
- **Observability** — structured logging via `tracing` with periodic metrics summaries and real-time `PoolMetrics`.
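
To make the health-tracking and eviction bullets concrete, here is a minimal, std-only sketch of a sliding-window tracker. It is not the crate's internal implementation: `HealthWindow`, its methods, and the eviction rule are hypothetical names and assumptions chosen for illustration. In particular, the sketch applies the eviction check to a single window, whereas the crate describes eviction in terms of a proxy's global success rate.

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window tracker for one (proxy, host) pair.
/// Illustrative only; this is not a type exported by scatter-proxy.
pub struct HealthWindow {
    capacity: usize,
    outcomes: VecDeque<bool>, // true = success, oldest outcome at the front
}

impl HealthWindow {
    /// Create a window holding at most `capacity` outcomes (minimum 1).
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self { capacity, outcomes: VecDeque::with_capacity(capacity) }
    }

    /// Record one outcome, dropping the oldest once the window is full.
    pub fn record(&mut self, success: bool) {
        if self.outcomes.len() == self.capacity {
            self.outcomes.pop_front();
        }
        self.outcomes.push_back(success);
    }

    /// Number of outcomes currently in the window.
    pub fn samples(&self) -> usize {
        self.outcomes.len()
    }

    /// Success rate over the window, or `None` before the first sample.
    pub fn success_rate(&self) -> Option<f64> {
        if self.outcomes.is_empty() {
            return None;
        }
        let ok = self.outcomes.iter().filter(|&&s| s).count();
        Some(ok as f64 / self.outcomes.len() as f64)
    }

    /// Assumed eviction rule: enough samples and not a single success.
    pub fn should_evict(&self, min_samples: usize) -> bool {
        self.samples() >= min_samples && self.outcomes.iter().all(|&s| !s)
    }
}
```

With a window of 30 and a minimum of 30 samples (the documented defaults for `health_window` and `eviction_min_samples`), a tracker like this would only flag a proxy after 30 consecutive failures.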
## Quick Start
```rust
use scatter_proxy::{
    ScatterProxy, ScatterProxyConfig, RateLimitConfig,
    DefaultClassifier, ScatterResponse,
};
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sources are optional: scatter-proxy ships with default free proxy sources.
    // Just override the fields you care about:
    let config = ScatterProxyConfig {
        // sources: vec!["https://example.com/socks5.txt".into()], // optional override
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("slow.com".into(), Duration::from_secs(1)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        prefer_remote_dns: true,
        ..Default::default()
    };

    // Build the proxy pool with the default response classifier.
    let pool = ScatterProxy::new(config, DefaultClassifier).await?;
    let client = reqwest::Client::new();

    // Submit a single request. `submit` blocks if the pool is full, then
    // the returned handle blocks until a successful proxied response arrives.
    // The scheduler retries internally forever; use with_timeout for a deadline.
    let req = client.get("http://httpbin.org/ip").build()?;
    let response: ScatterResponse = pool.submit(req).await
        .with_timeout(Duration::from_secs(30)).await?
        .expect("request still pending after timeout");

    // ScatterResponse has: status (StatusCode), headers (HeaderMap), body (Bytes)
    println!("Status: {}", response.status);
    println!("Body: {:?}", response.body);

    // Non-blocking submit: returns Err(PoolFull) immediately when the pool is full.
    let req2 = client.get("http://httpbin.org/ip").build()?;
    if let Ok(handle) = pool.try_submit(req2) {
        let _ = handle.with_timeout(Duration::from_secs(30)).await;
    }

    // Submit a batch of requests (blocks until capacity, sequentially).
    let reqs = vec![
        client.get("http://httpbin.org/ip").build()?,
        client.get("http://httpbin.org/headers").build()?,
        client.get("http://httpbin.org/user-agent").build()?,
    ];
    let handles = pool.submit_batch(reqs).await;
    for h in handles {
        if let Some(resp) = h.with_timeout(Duration::from_secs(30)).await? {
            println!(" → {} ({} bytes)", resp.status, resp.body.len());
        }
    }

    // Check real-time metrics.
    let m = pool.metrics();
    println!(
        "Proxies: {} healthy / {} dead | Success rate: {:.0}%",
        m.healthy_proxies, m.dead_proxies, m.success_rate_1m * 100.0,
    );

    // Graceful shutdown (persists state to disk).
    pool.shutdown().await;
    Ok(())
}
```
## Per-Host Pool Isolation
When crawling multiple target sites, use `ScatterProxyRouter` to give each host its own independent proxy pool. A host where all proxies are in cooldown or getting blocked cannot affect throughput for other hosts.
```rust
use scatter_proxy::{ScatterProxyRouter, ScatterProxyConfig, DefaultClassifier};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Each host gets its own independent config: tune concurrency, sources, and
    // timeouts per target. config.name is auto-set from the hostname if not provided.
    let router = ScatterProxyRouter::new(
        [
            ("szse.cn", ScatterProxyConfig { max_inflight: 20, ..Default::default() }),
            ("sse.com.cn", ScatterProxyConfig { max_inflight: 10, ..Default::default() }),
            ("cninfo.com.cn", ScatterProxyConfig::default()),
        ],
        DefaultClassifier,
    ).await?;
    let client = reqwest::Client::new();

    // Requests are routed to the matching host pool automatically.
    let req = client.get("http://szse.cn/api/data").build()?;
    let handle = router.submit(req).await?;
    // handle.with_timeout(…) as usual

    // Per-host metrics: see exactly which host is struggling.
    for (host, m) in router.all_metrics() {
        println!(
            "[{host}] success={:.0}% proxies={} pending={}",
            m.success_rate_1m * 100.0,
            m.healthy_proxies,
            m.pending_tasks,
        );
    }

    router.shutdown().await;
    Ok(())
}
```
Each host's metrics log lines are automatically prefixed with `[hostname]`, so multi-host
output is easy to grep.
## Custom Classifier
Implement `BodyClassifier` to control how responses are categorised:
```rust
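use scatter_proxy::{BodyClassifier, BodyVerdict, StatusCode, HeaderMap};

struct MyClassifier;

impl BodyClassifier for MyClassifier {
    // `_headers` is unused in this example; the underscore silences the warning.
    fn classify(&self, status: StatusCode, _headers: &HeaderMap, body: &[u8]) -> BodyVerdict {
        if status.is_success() && !body.is_empty() {
            // 2xx with a non-empty body counts as a good response.
            BodyVerdict::Success
        } else if status.is_server_error() {
            // 5xx is attributed to the target site, not the proxy.
            BodyVerdict::TargetError
        } else {
            // Everything else (including empty 2xx bodies) is treated as a proxy block.
            BodyVerdict::ProxyBlocked
        }
    }
}

// Then pass it when constructing the pool:
// let pool = ScatterProxy::new(config, MyClassifier).await?;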
```
## Configuration
All fields on `ScatterProxyConfig` with their types and defaults:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `sources` | `Vec<String>` | `[]` (uses built-in free proxy lists) | URLs of proxy source lists (line-delimited `ip:port`). When empty, defaults to `DEFAULT_PROXY_SOURCES`, a curated set of free SOCKS5 lists. |
| `source_refresh_interval` | `Duration` | `600s` | How often to re-fetch proxy sources |
| `rate_limit` | `RateLimitConfig` | *(see below)* | Per-(proxy, host) rate-limiting settings |
| `proxy_timeout` | `Duration` | `8s` | Timeout for a single proxy connection attempt |
| `max_concurrent_per_request` | `usize` | `3` | Base number of proxy paths raced per request (*K*); boosted automatically during cold start |
| `max_inflight` | `usize` | `100` | Global in-flight concurrency limit |
| `task_pool_capacity` | `usize` | `1000` | Maximum number of pending tasks in the pool |
| `health_window` | `usize` | `30` | Sliding window size for health tracking |
| `cooldown_base` | `Duration` | `30s` | Base cooldown after consecutive failures |
| `cooldown_max` | `Duration` | `300s` | Maximum cooldown duration |
| `cooldown_consecutive_fails` | `usize` | `3` | Consecutive failures before entering cooldown |
| `eviction_min_samples` | `usize` | `30` | Minimum samples before a proxy can be evicted |
| `state_file` | `Option<PathBuf>` | `None` | File path for JSON state persistence |
| `state_save_interval` | `Duration` | `300s` | How often to persist state to disk |
| `metrics_log_interval` | `Duration` | `30s` | How often to log the metrics summary line |
| `prefer_remote_dns` | `bool` | `true` | Use `socks5h://` for remote DNS resolution |
| `name` | `Option<String>` | `None` | Label prepended to metrics log lines as `[name]`; set automatically by `ScatterProxyRouter` |
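
The three cooldown fields interact roughly as follows. This is a sketch under an assumed doubling rule, not the crate's actual formula: `cooldown_for` is a hypothetical helper, and the choice to start at `cooldown_base` on the threshold failure and double on each further failure is an assumption.

```rust
use std::time::Duration;

/// Hypothetical helper: cooldown for a (proxy, host) pair that has failed
/// `consecutive_fails` times in a row. Returns `None` below the threshold.
///
/// Assumed rule: `base` at the threshold, doubling on each further failure,
/// capped at `max`. The real crate may use a different curve.
pub fn cooldown_for(
    consecutive_fails: usize,
    threshold: usize,
    base: Duration,
    max: Duration,
) -> Option<Duration> {
    if consecutive_fails < threshold {
        return None;
    }
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exponent = (consecutive_fails - threshold).min(16) as u32;
    let scaled = base.saturating_mul(1u32 << exponent);
    Some(scaled.min(max))
}
```

With the documented defaults (`cooldown_base` 30s, `cooldown_max` 300s, `cooldown_consecutive_fails` 3), this rule gives 30s, 60s, 120s, and 240s for failures 3 through 6, then stays at 300s.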
**`RateLimitConfig` fields:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `default_interval` | `Duration` | `500ms` | Minimum interval between requests per (proxy, host) pair |
| `host_overrides` | `HashMap<String, Duration>` | `{}` | Per-host interval overrides |
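
As an illustration of how a default interval with per-host overrides can gate requests per (proxy, host) pair, here is a small std-only sketch. `IntervalTable` and `PairLimiter` are hypothetical stand-ins, not crate types, and exact-match host lookup is an assumption; the crate may match hosts differently.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in mirroring the two `RateLimitConfig` fields.
pub struct IntervalTable {
    pub default_interval: Duration,
    pub host_overrides: HashMap<String, Duration>,
}

impl IntervalTable {
    /// Minimum spacing for `host`: the override if present, else the default.
    /// Assumes exact host-name matching.
    pub fn interval_for(&self, host: &str) -> Duration {
        self.host_overrides
            .get(host)
            .copied()
            .unwrap_or(self.default_interval)
    }
}

/// Hypothetical limiter remembering the last send time per (proxy, host).
pub struct PairLimiter {
    intervals: IntervalTable,
    last_sent: HashMap<(String, String), Instant>,
}

impl PairLimiter {
    pub fn new(intervals: IntervalTable) -> Self {
        Self { intervals, last_sent: HashMap::new() }
    }

    /// `Ok(())` if the pair may send at `now` (and records the send);
    /// otherwise `Err(remaining)` with the time left until it is eligible.
    pub fn try_acquire(&mut self, proxy: &str, host: &str, now: Instant) -> Result<(), Duration> {
        let key = (proxy.to_owned(), host.to_owned());
        let interval = self.intervals.interval_for(host);
        if let Some(&last) = self.last_sent.get(&key) {
            let elapsed = now.saturating_duration_since(last);
            if elapsed < interval {
                return Err(interval - elapsed);
            }
        }
        self.last_sent.insert(key, now);
        Ok(())
    }
}
```

Passing `now` in explicitly keeps the limiter deterministic to test. The `Err(remaining)` value is the kind of delay a scheduler could use to park a task instead of retrying in a hot loop.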
## Architecture
```text
           ┌──────────────┐
           │ Client Code  │
           └──────┬───────┘
                  │ submit(request)
                  ▼
           ┌──────────────┐
           │ ScatterProxy │
           │  (TaskPool)  │
           └──────┬───────┘
                  │ pick K healthy proxies
                  ▼
        ┌─────────────────────┐
        │      Scheduler      │
        │  ┌───────────────┐  │
        │  │ Rate Limiters │  │
        │  │ Health Scores │  │
        │  │ Ready Queue   │  │
        │  │ Delayed Heap  │  │
        │  │ Adaptive K    │  │
        │  └───────────────┘  │
        └─────────┬───────────┘
                  │ fan-out K paths
     ┌────────────┼────────────┐
     ▼            ▼            ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Proxy #1 │ │ Proxy #2 │ │ Proxy #3 │
│ socks5h  │ │ socks5h  │ │ socks5h  │
└────┬─────┘ └────┬─────┘ └────┬─────┘
     │            │            │
     └────────────┼────────────┘
                  │ first good response wins
                  ▼
         ┌───────────────────┐
         │  BodyClassifier   │
         │  → Success        │
         │  → ProxyBlocked   │
         │  → TargetError    │
         └────────┬──────────┘
                  │
                  ▼
         ┌───────────────────┐
         │  ScatterResponse  │
         │ {status, headers, │
         │  body}            │
         └───────────────────┘
Background tasks:
• Scheduler workers — drain ready queue, promote delayed tasks, dispatch fan-out attempts
• State persistence — periodic JSON snapshots (atomic writes)
• Metrics logger — periodic tracing summary lines
• Source refresh — re-fetches proxy lists on interval
```
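
The "fan-out K paths, first good response wins" step in the diagram can be sketched with plain threads and a channel. This is a simplified illustration, not how the crate is implemented: scatter-proxy is async and cancels losing attempts, whereas this hypothetical `race_first_ok` helper uses OS threads and simply ignores late results.

```rust
use std::sync::mpsc;
use std::thread;

/// A boxed attempt that can run on another thread.
pub type Attempt<T, E> = Box<dyn FnOnce() -> Result<T, E> + Send>;

/// Hypothetical helper: run every attempt on its own thread and return the
/// first `Ok` value to arrive, or `None` if all attempts fail.
///
/// Losers are not cancelled here; their results are dropped when they finish.
pub fn race_first_ok<T, E>(attempts: Vec<Attempt<T, E>>) -> Option<T>
where
    T: Send + 'static,
    E: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for attempt in attempts {
        let tx = tx.clone();
        thread::spawn(move || {
            // Sending fails only if the receiver is gone, which means a
            // winner was already picked; that is safe to ignore.
            let _ = tx.send(attempt());
        });
    }
    // Drop the original sender so the receiver stops once all workers finish.
    drop(tx);
    // Results arrive in completion order; skip errors, take the first success.
    rx.into_iter().find_map(Result::ok)
}
```

In this sketch a failed path costs nothing beyond its own thread: the caller only waits for the fastest successful path, which is the property the multi-path race relies on when individual proxies are unreliable.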
## Integration Testing
Unit tests run without any external dependencies:
```sh
cargo test
```
Integration tests exercise the full pipeline with real free SOCKS5 proxy sources from the internet. They are marked `#[ignore]` by default. To run them:
```sh
SCATTER_INTEGRATION=1 cargo test -- --ignored
```
> **Note:** Integration tests require network access and depend on third-party proxy sources being available. They may be flaky in CI due to proxy churn.
## Migration Notes
### 0.8.0
- **`ScatterProxyRouter::new()`** signature changed. The second argument (shared `ScatterProxyConfig`) is gone; instead pass `(host, ScatterProxyConfig)` tuples so each host is configured independently. See the example above.
- `direct://localhost` support has been fully removed from `ProxyManager`. All connections go through SOCKS5 proxies.
### 0.5.0
- The circuit breaker subsystem has been removed. Temporarily unrunnable tasks are now handled by the delayed retry queue.
- `PoolMetrics` no longer exposes circuit-breaker state. New counters include delayed tasks, requeues, zero-available events, dispatch count, and skip reasons.
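
For readers migrating from the circuit breaker, the idea behind a delayed retry queue can be sketched as a min-heap keyed by each task's next eligible time. `DelayedQueue` and its methods are hypothetical and only illustrate the technique; the crate's internal queue may be structured differently.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
use std::time::Instant;

/// Hypothetical delayed queue: tasks are parked until their ready time.
pub struct DelayedQueue<T> {
    // Min-heap on (ready_at, id); the id breaks ties in insertion order.
    heap: BinaryHeap<Reverse<(Instant, u64)>>,
    tasks: HashMap<u64, T>,
    next_id: u64,
}

impl<T> DelayedQueue<T> {
    pub fn new() -> Self {
        Self { heap: BinaryHeap::new(), tasks: HashMap::new(), next_id: 0 }
    }

    /// Park `task` until `ready_at`.
    pub fn park(&mut self, task: T, ready_at: Instant) {
        let id = self.next_id;
        self.next_id += 1;
        self.tasks.insert(id, task);
        self.heap.push(Reverse((ready_at, id)));
    }

    /// Remove and return every task whose ready time is at or before `now`,
    /// earliest first.
    pub fn pop_ready(&mut self, now: Instant) -> Vec<T> {
        let mut ready = Vec::new();
        while let Some(&Reverse((at, id))) = self.heap.peek() {
            if at > now {
                break;
            }
            self.heap.pop();
            if let Some(task) = self.tasks.remove(&id) {
                ready.push(task);
            }
        }
        ready
    }

    /// When the next parked task becomes eligible, if any.
    pub fn next_wakeup(&self) -> Option<Instant> {
        self.heap.peek().map(|Reverse((at, _))| *at)
    }
}
```

A worker can sleep until `next_wakeup()` and then call `pop_ready`, so parked tasks cost nothing while they wait.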
## License
Licensed under either of
- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT License ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
at your option.