# scatter-proxy
> 为免费 / 低质量 SOCKS5 代理设计的异步请求调度库。
> 屏蔽代理不稳定性,**多快好省**地完成批量 HTTP 请求任务。
## 0 一句话定义
**给定 M 个待完成任务和 N 个不稳定代理,以"单代理对单 host 限频"为约束,通过多路 Race 并发最大化任务完成数。**
---
## 1 核心约束
| P0 | **稳** | 零代理可用时立即熔断,让节点有时间恢复,绝不雪崩 |
| P1 | **多** | 有代理可用时,多路 Race 并发,压榨每个代理完成尽可能多的任务 |
| P2 | **快** | 多路 Race 本身就是加速——首个成功即返回;同时加速健康标记,让优质节点尽快露出 |
| P3 | **省** | 在限频约束内最大化吞吐,不让代理空闲也不让代理被封 |
---
## 2 系统模型
```
Per-(proxy, host) 限频
┌─────────────────────┐
Task Pool (未完成任务) │ proxy-A → sse: 500ms│
┌────────────────────┐ Scheduler │ proxy-A → szse:500ms│ Target
│ GET sse/600519 │───pick task────► │ proxy-B → sse: 500ms│───► Host
│ GET sse/601166 │ │ ... │
│ GET szse/000001 │◄──fail? 放回── └─────────────────────┘
│ ... │
└────────────────────┘
▲ 新任务入池 同一任务 Race N 路
│ ┌──proxy-A──┐
submit() ├──proxy-B──┼──► 首个 Success wins
└──proxy-D──┘ 其余 abort
```
三个核心组件:
| **Task Pool** | 待完成任务队列。任务提交后入池;某路成功则移出;全部失败则放回池尾等待下次调度 |
| **Scheduler** | 从池中取任务,为其选 K 个代理并发 Race。选代理依据:(proxy, host) 健康分 + 限频可用性 |
| **Health Tracker** | 实时记录每个 (proxy, host) 的成功率、延迟、最近访问时间,驱动调度决策 |
### 2.1 关键设计决策
**限频模型:per-(proxy, host)**
```
proxy-A → sse.com.cn : 每 500ms 最多 1 次
proxy-A → szse.cn : 每 500ms 最多 1 次(独立计时)
proxy-B → sse.com.cn : 每 500ms 最多 1 次(独立计时)
```
同一个代理访问不同 host 互不影响;不同代理访问同一个 host 互不影响。
这是"反爬"的正确粒度——目标站看到的是"某个 IP 的访问频率"。
**并发策略:默认多路 Race**
不高估单节点成功率。每个任务默认派 K 路代理同时跑(K 可配,默认 3)。
多路 Race 的附带收益:**加速节点健康标记**——一次 Race 产生 K 条观测数据,
优质节点能在更少的轮次内积累足够的成功记录,从而更快地被优先选用。
**无 Retry 概念,只有"放回池"**
任务失败不在内部重试。失败的任务放回 Task Pool 尾部,
下次被调度器取出时自然选择不同的(更健康的)代理。
这让调度逻辑保持简单且统一——每次调度都是无差别的"从池中取任务、选代理、Race"。
---
## 3 功能需求
### 3.1 任务池(Task Pool)
| TP-1 | 调用方通过 `submit(request) -> TaskHandle` 将请求放入池中 |
| TP-2 | `TaskHandle` 是一个 `Future`,`.await` 阻塞直到该任务成功或最终放弃 |
| TP-3 | 任务携带最大尝试次数(默认 5),每次被调度算一次。耗尽后返回 `Err(MaxAttemptsExhausted)` |
| TP-4 | 任务携带整体超时(默认 60s),从 submit 算起。超时后无论剩余尝试次数,返回 `Err(Timeout)` |
| TP-5 | 池有容量上限(默认 1000),超出时 `submit()` 返回 `Err(PoolFull)` |
| TP-6 | 失败的任务放回池**尾部**,让其他任务先执行,避免热点任务反复空耗 |
| TP-7 | 支持批量提交 `submit_batch(requests) -> Vec<TaskHandle>` |
### 3.2 Per-(Proxy, Host) 限频
| RL-1 | 为每个 `(proxy, host)` 组合独立维护最近请求时刻 |
| RL-2 | 默认间隔 500ms,可按 host 覆盖(如某些 host 需要更保守的 1000ms) |
| RL-3 | 调度器选代理时,**跳过**限频未冷却的 `(proxy, host)` 组合,而非阻塞等待 |
| RL-4 | 如果所有代理对目标 host 都在限频期内,该任务暂不调度,留在池中等下一轮 |
```rust
pub struct RateLimitConfig {
pub default_interval: Duration, // default 500ms
pub host_overrides: HashMap<String, Duration>,
}
```
### 3.3 代理健康追踪(Health Tracker)
| HT-1 | 维护二维表 `(proxy, host) → Stats`,每次 Race 的每一路完成后立即更新 |
| HT-2 | `Stats` 包含:成功次数、失败次数、**成功率**(滑动窗口,默认最近 30 次)、平均延迟 ms、**最近一次访问时间**、连续失败次数 |
| HT-3 | 额外维护 proxy 维度的全局健康分(跨所有 host 汇总) |
| HT-4 | 多路 Race 产生多条观测记录 → 加速健康数据收敛 → 优质节点更快露出 |
| HT-5 | 健康数据序列化到磁盘(JSON),进程重启热启动 |
| HT-6 | 后台定时持久化(默认 5 分钟) |
```rust
pub struct ProxyHostStats {
pub success: u32,
pub fail: u32,
pub success_rate: f64, // 滑动窗口内
pub avg_latency_ms: f64,
pub last_access: Option<Instant>,
pub last_success: Option<Instant>,
pub consecutive_fails: u32,
}
```
### 3.4 调度器(Scheduler)
| SC-1 | 循环运行:从 Task Pool 取出任务 → 选代理 → 发起 Race → 处理结果 |
| SC-2 | **选谁发**:对目标 host,在限频可用的代理中,按 `score = affinity(proxy, host) × 0.7 + global_health(proxy) × 0.3` 降序,取 Top-K |
| SC-3 | **发几路**:默认 K = `max_concurrent_per_request`(默认 3),不根据成功率缩减。多路 Race 是常态,不是降级手段 |
| SC-4 | **Race 语义**:K 路并发,首个 `Success` 立即返回给 `TaskHandle`,abort 其余 in-flight 请求 |
| SC-5 | **全部失败**:K 路全部返回 ProxyBlocked / 超时 → 任务放回池尾(TP-6),不在本轮重试 |
| SC-6 | **TargetError**:任一路返回 TargetError → 不计入代理健康分,累计触发 host 熔断 |
| SC-7 | **可用代理不足 K 个**:有几个发几个(哪怕只有 1 个),0 个则触发熔断 |
| SC-8 | **全局并发上限**:所有 in-flight 代理连接总数上限(默认 100),使用 Semaphore 控制 |
| SC-9 | 调度循环的频率由任务池和代理可用性自然驱动——有任务且有可用代理时立即调度,否则短暂 sleep(如 50ms) |
### 3.5 熔断保护
> **稳是第一位。零代理可用时,熔断让节点有机会恢复。**
| CB-1 | **零可用代理**(对某 host,所有代理都在冷却/dead/限频中)| 对该 host 触发熔断。该 host 的新任务立即返回 `Err(CircuitOpen)` |
| CB-2 | **目标站故障**(最近 N 次请求跨所有代理全部 `TargetError`)| 对该 host 触发熔断,不怪代理 |
| CB-3 | **熔断恢复**:每隔 `probe_interval`(默认 30s)放行 1 个任务试探。成功 → 关闭熔断;失败 → 重新计时 |
| CB-4 | 熔断期间,该 host 的待处理任务保留在池中不被调度,直到熔断关闭 |
### 3.6 代理生命周期
| PL-1 | **冷却**:某代理对某 host 连续失败 ≥ K 次(默认 3)→ 该 (proxy, host) 进入冷却。指数退避:30s → 60s → 120s → 300s cap |
| PL-2 | **冷却粒度是 (proxy, host)**:proxy-A 对 SSE 冷却,不影响 proxy-A 对 SZSE 的调度 |
| PL-3 | **半开探测**:冷却到期后,该 (proxy, host) 恢复可选状态,下次被选中即为试探 |
| PL-4 | **淘汰**:proxy 全局成功率 = 0% 且样本数 ≥ 30 → 移入 dead 池,仅在代理源刷新时可复活 |
### 3.7 响应校验(Body Classifier)
| BC-1 | 调用方实现 `BodyClassifier` trait,对每个响应做三级判定 |
| BC-2 | `Success`:任务完成,更新代理健康分(成功),返回响应 |
| BC-3 | `ProxyBlocked`:更新代理健康分(失败),该路视为失败 |
| BC-4 | `TargetError`:不计入代理健康分,累计触发 host 熔断 |
| BC-5 | 内置默认分类器:2xx + body 非空 = Success;403/429 = ProxyBlocked;5xx = TargetError |
```rust
pub enum BodyVerdict {
Success,
ProxyBlocked,
TargetError,
}
pub trait BodyClassifier: Send + Sync + 'static {
fn classify(
&self,
status: StatusCode,
headers: &HeaderMap,
body: &[u8],
) -> BodyVerdict;
}
```
### 3.8 代理源管理
| SM-1 | 支持多个 URL 源(逐行 `ip:port` 或 `socks5://ip:port`),自动去重 |
| SM-2 | 后台定时刷新(默认 10 分钟),增量合入新代理 |
| SM-3 | 新代理入池时状态为"未知",首次被选中即为探测 |
| SM-4 | 持久化:代理列表 + 健康数据 → JSON 文件,重启热加载 |
### 3.9 系统可观测性
| LOG-1 | **周期性状态摘要**(默认 30s,`info!`):<br>`throughput=12.5/s | success=87% | pool: 156 healthy / 40 cooldown / 804 dead | tasks: 23 pending | breakers: (none)` |
| LOG-2 | **每个任务完成时**(`debug!`):<br>`task done | host=sse.com.cn | race=3 | winner=socks5://1.2.3.4:1080 | latency=1.2s | attempt=1/5` |
| LOG-3 | **任务放回池**(`debug!`):<br>`task requeued | host=sse.com.cn | reason=all 3 proxies failed | attempt=2/5` |
| LOG-4 | **熔断事件**(`warn!`):`circuit OPEN | host=www.szse.cn | reason=zero_available_proxies` |
| LOG-5 | **代理状态变更**(`debug!`):`proxy socks5://1.2.3.4:1080 | (sse.com.cn) healthy → cooldown | consecutive_fails=3` |
| LOG-6 | **Metrics 结构体**:调用方可随时获取快照 |
```rust
pub struct PoolMetrics {
// 代理池
pub total_proxies: usize,
pub healthy_proxies: usize,
pub cooldown_proxies: usize,
pub dead_proxies: usize,
// 任务池
pub pending_tasks: usize,
pub completed_tasks: u64,
pub failed_tasks: u64,
// 吞吐量(滑动窗口)
pub throughput_1s: f64,
pub throughput_10s: f64,
pub throughput_60s: f64,
// 质量
pub success_rate_1m: f64,
pub avg_latency_ms: f64,
// 资源
pub inflight: usize,
pub circuit_breakers: HashMap<String, bool>,
}
```
---
## 4 公共 API
```rust
// ── 配置 ──
pub struct ScatterProxyConfig {
// 代理源
pub sources: Vec<String>,
pub source_refresh_interval: Duration, // default 10min
// 限频:per-(proxy, host)
pub rate_limit: RateLimitConfig,
// 超时
pub proxy_timeout: Duration, // default 8s(单代理连接超时)
pub task_timeout: Duration, // default 60s(单任务从 submit 到放弃的总时限)
// 调度
pub max_concurrent_per_request: usize, // default 3(每任务 Race 路数)
pub max_inflight: usize, // default 100(全局 in-flight 并发上限)
pub max_attempts: usize, // default 5(单任务最大被调度次数)
// 任务池
pub task_pool_capacity: usize, // default 1000
// 健康管理
pub health_window: usize, // default 30
pub cooldown_base: Duration, // default 30s
pub cooldown_max: Duration, // default 300s
pub cooldown_consecutive_fails: usize, // default 3
pub eviction_min_samples: usize, // default 30
pub circuit_breaker_threshold: usize, // default 10
pub circuit_breaker_probe_interval: Duration, // default 30s
// 持久化
pub state_file: Option<PathBuf>,
pub state_save_interval: Duration, // default 5min
// 日志
pub metrics_log_interval: Duration, // default 30s
// 自定义 client
pub client_builder: Option<Box<dyn Fn() -> reqwest::ClientBuilder + Send + Sync>>,
}
pub struct RateLimitConfig {
pub default_interval: Duration, // default 500ms
pub host_overrides: HashMap<String, Duration>,
}
// ── 核心 ──
pub struct ScatterProxy { /* ... */ }
impl ScatterProxy {
/// 初始化:拉取代理源 → 加载持久化数据 → 启动后台调度循环
pub async fn new(
config: ScatterProxyConfig,
classifier: impl BodyClassifier,
) -> Result<Self, ScatterProxyError>;
/// 提交单个任务,返回 handle。await handle 获取结果。
pub fn submit(
&self,
request: reqwest::Request,
) -> Result<TaskHandle, ScatterProxyError>;
/// 批量提交
pub fn submit_batch(
&self,
requests: Vec<reqwest::Request>,
) -> Result<Vec<TaskHandle>, ScatterProxyError>;
/// 获取当前 metrics 快照
pub fn metrics(&self) -> PoolMetrics;
/// 优雅关闭:等待 in-flight 完成 → 持久化 → 停止后台任务
pub async fn shutdown(self);
}
/// 任务句柄,await 即阻塞等待该任务完成
pub struct TaskHandle { /* oneshot::Receiver */ }
impl Future for TaskHandle {
type Output = Result<reqwest::Response, ScatterProxyError>;
}
// ── 错误 ──
pub enum ScatterProxyError {
/// 该 host 熔断中
CircuitOpen { host: String },
/// 达到最大尝试次数,仍未成功
MaxAttemptsExhausted {
host: String,
attempts: usize,
last_error: String,
},
/// 任务总超时
Timeout { host: String, elapsed: Duration },
/// 任务池已满
PoolFull { capacity: usize },
/// 初始化失败
Init(String),
}
```
---
## 5 调度循环伪代码
```
scheduler_loop:
loop {
task = task_pool.pick_next() // 取出一个待处理任务(跳过熔断 host 的)
if task is None:
sleep(50ms)
continue
host = task.request.url.host
── 熔断检查 ──
if circuit_breaker[host].is_open():
if circuit_breaker[host].should_probe():
// 半开:只放行 1 路试探
candidates = [select_one_best(host)]
else:
task_pool.push_back(task) // 放回,等熔断恢复
continue
else:
── 选代理 ──
available = all proxies where:
- state != dead
- (proxy, host) not in cooldown
- (proxy, host) rate_limit elapsed >= interval
if available.is_empty():
circuit_breaker[host].trip("zero_available_proxies")
task_pool.push_back(task)
continue
// 按 score 排序,取 Top-K
available.sort_by(|p| score(p, host))
K = min(max_concurrent_per_request, available.len())
candidates = available[..K]
── 并发 Race ──
global_semaphore.acquire(candidates.len())
results = race(candidates.map(|proxy| {
// 标记 (proxy, host) 限频时刻
rate_limiter.mark(proxy, host)
send_via_proxy(task.request.clone(), proxy, proxy_timeout)
}))
── 处理结果 ──
match results {
FirstSuccess(proxy, response):
health_tracker.record(proxy, host, success, latency)
// 其他路的失败结果也记录
for (p, fail) in other_results:
health_tracker.record(p, host, fail, latency)
task.complete(Ok(response))
log_debug("task done | winner={proxy} | ...")
AllFailed(failures):
for (proxy, verdict, latency) in failures:
if verdict == TargetError:
circuit_breaker[host].record_target_error()
else:
health_tracker.record(proxy, host, fail, latency)
task.attempts += 1
if task.attempts >= max_attempts or task.elapsed() >= task_timeout:
task.complete(Err(MaxAttemptsExhausted | Timeout))
log_debug("task abandoned | ...")
else:
task_pool.push_back(task) // 放回池尾
log_debug("task requeued | attempt={}/{}| ...", ...)
}
}
```
---
## 6 评分算法
```
score(proxy, host) -> f64:
affinity = (proxy, host) 的成功率 // 无数据时默认 0.5
global = proxy 全局成功率 // 无数据时默认 0.5
recency = 1.0 / (1.0 + minutes_since_last_success(proxy, host))
score = affinity × 0.5 + global × 0.3 + recency × 0.2
```
- **affinity** 权重最高:该代理对该 host 的历史表现最重要
- **global** 补充:新 host 无亲和数据时,靠全局分兜底
- **recency** 奖励:最近成功过的代理得分更高,避免"老数据"遮蔽当前状态
---
## 7 状态机
### 7.1 代理节点 × Host 生命周期
```
┌──────────────────────────┐
▼ │
[Unknown] ──首次被选中──► [Active] ◄──冷却到期──┐ │
│ │ │
连续失败 ≥ K │ │
│ │ │
▼ │ │
[Cooldown] │ │
│ │ │
冷却到期 ─────────────────┘ │
│ │
全局成功率=0% 代理源刷新
且样本≥30 (可复活)
│ │
▼ │
[Dead] ──────────────────┘
注意:Cooldown 粒度是 (proxy, host)。
proxy-A 对 SSE 冷却,不影响 proxy-A 对 SZSE 继续服务。
Dead 是 proxy 全局状态(全局成功率 = 0%)。
```
### 7.2 Host 熔断
```
[Closed] ──零可用代理 or 连续N次TargetError──► [Open]
▲ │
│ probe_interval
│ │
└───试探成功───── [HalfOpen] ◄───────────────┘
│
试探失败
│
▼
[Open](重新计时)
```
---
## 8 持久化格式
`proxy_state.json`:
```json
{
"version": 1,
"saved_at": "2025-07-12T08:30:00Z",
"proxies": {
"socks5://1.2.3.4:1080": {
"state": "active",
"hosts": {
"yunhq.sse.com.cn": {
"success": 60, "fail": 2,
"avg_latency_ms": 1500,
"consecutive_fails": 0
},
"www.szse.cn": {
"success": 22, "fail": 8,
"avg_latency_ms": 2400,
"consecutive_fails": 0
}
}
},
"socks5://5.6.7.8:9050": {
"state": "dead",
"hosts": {}
}
}
}
```
---
## 9 使用示例
```rust
use scatter_proxy::*;
use std::time::Duration;
use std::collections::HashMap;
struct AShareClassifier;
impl BodyClassifier for AShareClassifier {
fn classify(&self, status: StatusCode, _h: &HeaderMap, body: &[u8]) -> BodyVerdict {
let text = String::from_utf8_lossy(body);
match status.as_u16() {
200 if text.is_empty() => BodyVerdict::ProxyBlocked,
200 if text.contains("验证码") => BodyVerdict::ProxyBlocked,
200 => BodyVerdict::Success,
403 | 429 => BodyVerdict::ProxyBlocked,
500..=599 => BodyVerdict::TargetError,
_ => BodyVerdict::ProxyBlocked,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ScatterProxyConfig {
sources: vec![
"https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/socks5.txt".into(),
"https://cdn.jsdelivr.net/gh/monosans/proxy-list@main/proxies/socks5.txt".into(),
],
rate_limit: RateLimitConfig {
default_interval: Duration::from_millis(500),
host_overrides: HashMap::from([
("query.sse.com.cn".into(), Duration::from_millis(1000)),
]),
},
max_concurrent_per_request: 3,
state_file: Some("proxy_state.json".into()),
..Default::default()
};
let pool = ScatterProxy::new(config, AShareClassifier).await?;
// 批量提交任务
let client = reqwest::Client::new();
let symbols = ["600519", "601166", "000001"];
let handles: Vec<_> = symbols.iter().map(|sym| {
let req = client
.get(format!("https://yunhq.sse.com.cn:32042/v1/sh1/line/{sym}"))
.query(&[("callback", "cb")])
.build()
.unwrap();
pool.submit(req).unwrap()
}).collect();
// 并发等待所有任务完成
for (sym, handle) in symbols.iter().zip(handles) {
match handle.await {
Ok(resp) => println!("{sym}: {}", resp.status()),
Err(e) => println!("{sym}: {e}"),
}
}
let m = pool.metrics();
println!(
"throughput={:.1}/s | success={:.0}% | proxies={}/{} | pending={}",
m.throughput_10s,
m.success_rate_1m * 100.0,
m.healthy_proxies,
m.total_proxies,
m.pending_tasks,
);
pool.shutdown().await;
Ok(())
}
```
---
## 10 日志输出示例
```
# 启动
INFO [scatter-proxy] loaded 1204 proxies from 3 sources
INFO [scatter-proxy] restored health data for 186 proxies from proxy_state.json
# 运行中(每 30s)
# 任务级别
DEBUG [scatter-proxy] task abandoned | host=query.sse.com.cn | reason=max_attempts(5) | last_error=ProxyBlocked
# 熔断
# 代理状态
```
---
## 11 非功能需求
| 异步运行时 | tokio 1.x |
| Rust 版本 | ≥ 1.75 stable |
| 核心依赖 | `reqwest` (0.13, socks+query), `tokio`, `serde`/`serde_json`, `tracing`, `dashmap` |
| 可选 | `reqwest-middleware`(实现 Middleware trait) |
| 线程安全 | `ScatterProxy: Send + Sync`,`Arc` 共享 |
| 内存 | 20,000 代理 × 10 host 健康数据 < 50 MB |
| 测试 | 单元测试:评分算法、冷却状态机、限频器;集成测试:mock server 模拟不稳定代理 |