scatter-proxy 0.2.0

Async request scheduler for unreliable SOCKS5 proxies — multi-path race for maximum throughput
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
# scatter-proxy

> 为免费 / 低质量 SOCKS5 代理设计的异步请求调度库。
> 屏蔽代理不稳定性,**多快好省**地完成批量 HTTP 请求任务。

## 0 一句话定义

**给定 M 个待完成任务和 N 个不稳定代理,以"单代理对单 host 限频"为约束,通过多路 Race 并发最大化任务完成数。**

---

## 1 核心约束

| 优先级 | 约束 | 说明 |
|--------|------|------|
| P0 | **** | 零代理可用时立即熔断,让节点有时间恢复,绝不雪崩 |
| P1 | **** | 有代理可用时,多路 Race 并发,压榨每个代理完成尽可能多的任务 |
| P2 | **** | 多路 Race 本身就是加速——首个成功即返回;同时加速健康标记,让优质节点尽快露出 |
| P3 | **** | 在限频约束内最大化吞吐,不让代理空闲也不让代理被封 |

---

## 2 系统模型

```
                                            Per-(proxy, host) 限频
                                           ┌─────────────────────┐
  Task Pool (未完成任务)                     │ proxy-A → sse: 500ms│
  ┌────────────────────┐    Scheduler       │ proxy-A → szse:500ms│    Target
  │ GET sse/600519     │───pick task────►   │ proxy-B → sse: 500ms│───► Host
  │ GET sse/601166     │                    │ ...                 │
  │ GET szse/000001    │◄──fail? 放回──     └─────────────────────┘
  │ ...                │
  └────────────────────┘
         ▲ 新任务入池                同一任务 Race N 路
         │                         ┌──proxy-A──┐
         submit()                  ├──proxy-B──┼──► 首个 Success wins
                                   └──proxy-D──┘    其余 abort
```

三个核心组件:

| 组件 | 职责 |
|------|------|
| **Task Pool** | 待完成任务队列。任务提交后入池;某路成功则移出;全部失败则放回池尾等待下次调度 |
| **Scheduler** | 从池中取任务,为其选 K 个代理并发 Race。选代理依据:(proxy, host) 健康分 + 限频可用性 |
| **Health Tracker** | 实时记录每个 (proxy, host) 的成功率、延迟、最近访问时间,驱动调度决策 |

### 2.1 关键设计决策

**限频模型:per-(proxy, host)**

```
proxy-A → sse.com.cn : 每 500ms 最多 1 次
proxy-A → szse.cn    : 每 500ms 最多 1 次(独立计时)
proxy-B → sse.com.cn : 每 500ms 最多 1 次(独立计时)
```

同一个代理访问不同 host 互不影响;不同代理访问同一个 host 互不影响。
这是"反爬"的正确粒度——目标站看到的是"某个 IP 的访问频率"。

**并发策略:默认多路 Race**

不高估单节点成功率。每个任务默认派 K 路代理同时跑(K 可配,默认 3)。
多路 Race 的附带收益:**加速节点健康标记**——一次 Race 产生 K 条观测数据,
优质节点能在更少的轮次内积累足够的成功记录,从而更快地被优先选用。

**无 Retry 概念,只有"放回池"**

任务失败不在内部重试。失败的任务放回 Task Pool 尾部,
下次被调度器取出时自然选择不同的(更健康的)代理。
这让调度逻辑保持简单且统一——每次调度都是无差别的"从池中取任务、选代理、Race"。

---

## 3 功能需求

### 3.1 任务池(Task Pool)

| 编号 | 需求 |
|------|------|
| TP-1 | 调用方通过 `submit(request) -> TaskHandle` 将请求放入池中 |
| TP-2 | `TaskHandle` 是一个 `Future``.await` 阻塞直到该任务成功或最终放弃 |
| TP-3 | 任务携带最大尝试次数(默认 5),每次被调度算一次。耗尽后返回 `Err(MaxAttemptsExhausted)` |
| TP-4 | 任务携带整体超时(默认 60s),从 submit 算起。超时后无论剩余尝试次数,返回 `Err(Timeout)` |
| TP-5 | 池有容量上限(默认 1000),超出时 `submit()` 返回 `Err(PoolFull)` |
| TP-6 | 失败的任务放回池**尾部**,让其他任务先执行,避免热点任务反复空耗 |
| TP-7 | 支持批量提交 `submit_batch(requests) -> Vec<TaskHandle>` |

### 3.2 Per-(Proxy, Host) 限频

| 编号 | 需求 |
|------|------|
| RL-1 | 为每个 `(proxy, host)` 组合独立维护最近请求时刻 |
| RL-2 | 默认间隔 500ms,可按 host 覆盖(如某些 host 需要更保守的 1000ms) |
| RL-3 | 调度器选代理时,**跳过**限频未冷却的 `(proxy, host)` 组合,而非阻塞等待 |
| RL-4 | 如果所有代理对目标 host 都在限频期内,该任务暂不调度,留在池中等下一轮 |

```rust
pub struct RateLimitConfig {
    pub default_interval: Duration,              // default 500ms
    pub host_overrides: HashMap<String, Duration>,
}
```

### 3.3 代理健康追踪(Health Tracker)

| 编号 | 需求 |
|------|------|
| HT-1 | 维护二维表 `(proxy, host) → Stats`,每次 Race 的每一路完成后立即更新 |
| HT-2 | `Stats` 包含:成功次数、失败次数、**成功率**(滑动窗口,默认最近 30 次)、平均延迟 ms、**最近一次访问时间**、连续失败次数 |
| HT-3 | 额外维护 proxy 维度的全局健康分(跨所有 host 汇总) |
| HT-4 | 多路 Race 产生多条观测记录 → 加速健康数据收敛 → 优质节点更快露出 |
| HT-5 | 健康数据序列化到磁盘(JSON),进程重启热启动 |
| HT-6 | 后台定时持久化(默认 5 分钟) |

```rust
pub struct ProxyHostStats {
    pub success: u32,
    pub fail: u32,
    pub success_rate: f64,             // 滑动窗口内
    pub avg_latency_ms: f64,
    pub last_access: Option<Instant>,
    pub last_success: Option<Instant>,
    pub consecutive_fails: u32,
}
```

### 3.4 调度器(Scheduler)

| 编号 | 需求 |
|------|------|
| SC-1 | 循环运行:从 Task Pool 取出任务 → 选代理 → 发起 Race → 处理结果 |
| SC-2 | **选谁发**:对目标 host,在限频可用的代理中,按 `score = affinity(proxy, host) × 0.7 + global_health(proxy) × 0.3` 降序,取 Top-K |
| SC-3 | **发几路**:默认 K = `max_concurrent_per_request`(默认 3),不根据成功率缩减。多路 Race 是常态,不是降级手段 |
| SC-4 | **Race 语义**:K 路并发,首个 `Success` 立即返回给 `TaskHandle`,abort 其余 in-flight 请求 |
| SC-5 | **全部失败**:K 路全部返回 ProxyBlocked / 超时 → 任务放回池尾(TP-6),不在本轮重试 |
| SC-6 | **TargetError**:任一路返回 TargetError → 不计入代理健康分,累计触发 host 熔断 |
| SC-7 | **可用代理不足 K 个**:有几个发几个(哪怕只有 1 个),0 个则触发熔断 |
| SC-8 | **全局并发上限**:所有 in-flight 代理连接总数上限(默认 100),使用 Semaphore 控制 |
| SC-9 | 调度循环的频率由任务池和代理可用性自然驱动——有任务且有可用代理时立即调度,否则短暂 sleep(如 50ms) |

### 3.5 熔断保护

> **稳是第一位。零代理可用时,熔断让节点有机会恢复。**

| 编号 | 场景 | 处理 |
|------|------|------|
| CB-1 | **零可用代理**(对某 host,所有代理都在冷却/dead/限频中)| 对该 host 触发熔断。该 host 的新任务立即返回 `Err(CircuitOpen)` |
| CB-2 | **目标站故障**(最近 N 次请求跨所有代理全部 `TargetError`| 对该 host 触发熔断,不怪代理 |
| CB-3 | **熔断恢复**:每隔 `probe_interval`(默认 30s)放行 1 个任务试探。成功 → 关闭熔断;失败 → 重新计时 |
| CB-4 | 熔断期间,该 host 的待处理任务保留在池中不被调度,直到熔断关闭 |

### 3.6 代理生命周期

| 编号 | 需求 |
|------|------|
| PL-1 | **冷却**:某代理对某 host 连续失败 ≥ K 次(默认 3)→ 该 (proxy, host) 进入冷却。指数退避:30s → 60s → 120s → 300s cap |
| PL-2 | **冷却粒度是 (proxy, host)**:proxy-A 对 SSE 冷却,不影响 proxy-A 对 SZSE 的调度 |
| PL-3 | **半开探测**:冷却到期后,该 (proxy, host) 恢复可选状态,下次被选中即为试探 |
| PL-4 | **淘汰**:proxy 全局成功率 = 0% 且样本数 ≥ 30 → 移入 dead 池,仅在代理源刷新时可复活 |

### 3.7 响应校验(Body Classifier)

| 编号 | 需求 |
|------|------|
| BC-1 | 调用方实现 `BodyClassifier` trait,对每个响应做三级判定 |
| BC-2 | `Success`:任务完成,更新代理健康分(成功),返回响应 |
| BC-3 | `ProxyBlocked`:更新代理健康分(失败),该路视为失败 |
| BC-4 | `TargetError`:不计入代理健康分,累计触发 host 熔断 |
| BC-5 | 内置默认分类器:2xx + body 非空 = Success;403/429 = ProxyBlocked;5xx = TargetError |

```rust
pub enum BodyVerdict {
    Success,
    ProxyBlocked,
    TargetError,
}

pub trait BodyClassifier: Send + Sync + 'static {
    fn classify(
        &self,
        status: StatusCode,
        headers: &HeaderMap,
        body: &[u8],
    ) -> BodyVerdict;
}
```

### 3.8 代理源管理

| 编号 | 需求 |
|------|------|
| SM-1 | 支持多个 URL 源(逐行 `ip:port``socks5://ip:port`),自动去重 |
| SM-2 | 后台定时刷新(默认 10 分钟),增量合入新代理 |
| SM-3 | 新代理入池时状态为"未知",首次被选中即为探测 |
| SM-4 | 持久化:代理列表 + 健康数据 → JSON 文件,重启热加载 |

### 3.9 系统可观测性

| 编号 | 需求 |
|------|------|
| LOG-1 | **周期性状态摘要**(默认 30s,`info!`):<br>`throughput=12.5/s | success=87% | pool: 156 healthy / 40 cooldown / 804 dead | tasks: 23 pending | breakers: (none)` |
| LOG-2 | **每个任务完成时**`debug!`):<br>`task done | host=sse.com.cn | race=3 | winner=socks5://1.2.3.4:1080 | latency=1.2s | attempt=1/5` |
| LOG-3 | **任务放回池**`debug!`):<br>`task requeued | host=sse.com.cn | reason=all 3 proxies failed | attempt=2/5` |
| LOG-4 | **熔断事件**`warn!`):`circuit OPEN | host=www.szse.cn | reason=zero_available_proxies` |
| LOG-5 | **代理状态变更**`debug!`):`proxy socks5://1.2.3.4:1080 | (sse.com.cn) healthy → cooldown | consecutive_fails=3` |
| LOG-6 | **Metrics 结构体**:调用方可随时获取快照 |

```rust
pub struct PoolMetrics {
    // 代理池
    pub total_proxies: usize,
    pub healthy_proxies: usize,
    pub cooldown_proxies: usize,
    pub dead_proxies: usize,

    // 任务池
    pub pending_tasks: usize,
    pub completed_tasks: u64,
    pub failed_tasks: u64,

    // 吞吐量(滑动窗口)
    pub throughput_1s: f64,
    pub throughput_10s: f64,
    pub throughput_60s: f64,

    // 质量
    pub success_rate_1m: f64,
    pub avg_latency_ms: f64,

    // 资源
    pub inflight: usize,
    pub circuit_breakers: HashMap<String, bool>,
}
```

---

## 4 公共 API

```rust
// ── 配置 ──

pub struct ScatterProxyConfig {
    // 代理源
    pub sources: Vec<String>,
    pub source_refresh_interval: Duration,       // default 10min

    // 限频:per-(proxy, host)
    pub rate_limit: RateLimitConfig,

    // 超时
    pub proxy_timeout: Duration,                 // default 8s(单代理连接超时)
    pub task_timeout: Duration,                  // default 60s(单任务从 submit 到放弃的总时限)

    // 调度
    pub max_concurrent_per_request: usize,       // default 3(每任务 Race 路数)
    pub max_inflight: usize,                     // default 100(全局 in-flight 并发上限)
    pub max_attempts: usize,                     // default 5(单任务最大被调度次数)

    // 任务池
    pub task_pool_capacity: usize,               // default 1000

    // 健康管理
    pub health_window: usize,                    // default 30
    pub cooldown_base: Duration,                 // default 30s
    pub cooldown_max: Duration,                  // default 300s
    pub cooldown_consecutive_fails: usize,       // default 3
    pub eviction_min_samples: usize,             // default 30
    pub circuit_breaker_threshold: usize,        // default 10
    pub circuit_breaker_probe_interval: Duration, // default 30s

    // 持久化
    pub state_file: Option<PathBuf>,
    pub state_save_interval: Duration,           // default 5min

    // 日志
    pub metrics_log_interval: Duration,          // default 30s

    // 自定义 client
    pub client_builder: Option<Box<dyn Fn() -> reqwest::ClientBuilder + Send + Sync>>,
}

pub struct RateLimitConfig {
    pub default_interval: Duration,              // default 500ms
    pub host_overrides: HashMap<String, Duration>,
}

// ── 核心 ──

pub struct ScatterProxy { /* ... */ }

impl ScatterProxy {
    /// 初始化:拉取代理源 → 加载持久化数据 → 启动后台调度循环
    pub async fn new(
        config: ScatterProxyConfig,
        classifier: impl BodyClassifier,
    ) -> Result<Self, ScatterProxyError>;

    /// 提交单个任务,返回 handle。await handle 获取结果。
    pub fn submit(
        &self,
        request: reqwest::Request,
    ) -> Result<TaskHandle, ScatterProxyError>;

    /// 批量提交
    pub fn submit_batch(
        &self,
        requests: Vec<reqwest::Request>,
    ) -> Result<Vec<TaskHandle>, ScatterProxyError>;

    /// 获取当前 metrics 快照
    pub fn metrics(&self) -> PoolMetrics;

    /// 优雅关闭:等待 in-flight 完成 → 持久化 → 停止后台任务
    pub async fn shutdown(self);
}

/// 任务句柄,await 即阻塞等待该任务完成
pub struct TaskHandle { /* oneshot::Receiver */ }

impl Future for TaskHandle {
    type Output = Result<reqwest::Response, ScatterProxyError>;
}

// ── 错误 ──

pub enum ScatterProxyError {
    /// 该 host 熔断中
    CircuitOpen { host: String },
    /// 达到最大尝试次数,仍未成功
    MaxAttemptsExhausted {
        host: String,
        attempts: usize,
        last_error: String,
    },
    /// 任务总超时
    Timeout { host: String, elapsed: Duration },
    /// 任务池已满
    PoolFull { capacity: usize },
    /// 初始化失败
    Init(String),
}
```

---

## 5 调度循环伪代码

```
scheduler_loop:
    loop {
        task = task_pool.pick_next()      // 取出一个待处理任务(跳过熔断 host 的)
        if task is None:
            sleep(50ms)
            continue

        host = task.request.url.host

        ── 熔断检查 ──
        if circuit_breaker[host].is_open():
            if circuit_breaker[host].should_probe():
                // 半开:只放行 1 路试探
                candidates = [select_one_best(host)]
            else:
                task_pool.push_back(task)  // 放回,等熔断恢复
                continue
        else:
            ── 选代理 ──
            available = all proxies where:
                - state != dead
                - (proxy, host) not in cooldown
                - (proxy, host) rate_limit elapsed >= interval
            if available.is_empty():
                circuit_breaker[host].trip("zero_available_proxies")
                task_pool.push_back(task)
                continue

            // 按 score 排序,取 Top-K
            available.sort_by(|p| score(p, host))
            K = min(max_concurrent_per_request, available.len())
            candidates = available[..K]

        ── 并发 Race ──
        global_semaphore.acquire(candidates.len())

        results = race(candidates.map(|proxy| {
            // 标记 (proxy, host) 限频时刻
            rate_limiter.mark(proxy, host)
            send_via_proxy(task.request.clone(), proxy, proxy_timeout)
        }))

        ── 处理结果 ──
        match results {
            FirstSuccess(proxy, response):
                health_tracker.record(proxy, host, success, latency)
                // 其他路的失败结果也记录
                for (p, fail) in other_results:
                    health_tracker.record(p, host, fail, latency)
                task.complete(Ok(response))
                log_debug("task done | winner={proxy} | ...")

            AllFailed(failures):
                for (proxy, verdict, latency) in failures:
                    if verdict == TargetError:
                        circuit_breaker[host].record_target_error()
                    else:
                        health_tracker.record(proxy, host, fail, latency)
                task.attempts += 1
                if task.attempts >= max_attempts or task.elapsed() >= task_timeout:
                    task.complete(Err(MaxAttemptsExhausted | Timeout))
                    log_debug("task abandoned | ...")
                else:
                    task_pool.push_back(task)   // 放回池尾
                    log_debug("task requeued | attempt={}/{}| ...", ...)
        }
    }
```

---

## 6 评分算法

```
score(proxy, host) -> f64:

    affinity = (proxy, host) 的成功率                // 无数据时默认 0.5
    global   = proxy 全局成功率                      // 无数据时默认 0.5
    recency  = 1.0 / (1.0 + minutes_since_last_success(proxy, host))

    score = affinity × 0.5 + global × 0.3 + recency × 0.2
```

- **affinity** 权重最高:该代理对该 host 的历史表现最重要
- **global** 补充:新 host 无亲和数据时,靠全局分兜底
- **recency** 奖励:最近成功过的代理得分更高,避免"老数据"遮蔽当前状态

---

## 7 状态机

### 7.1 代理节点 × Host 生命周期

```
                      ┌──────────────────────────┐
                      ▼                          │
 [Unknown] ──首次被选中──► [Active] ◄──冷却到期──┐ │
                            │                   │ │
                      连续失败 ≥ K               │ │
                            │                   │ │
                            ▼                   │ │
                        [Cooldown]              │ │
                            │                   │ │
                       冷却到期 ─────────────────┘ │
                            │                     │
                 全局成功率=0%                  代理源刷新
                 且样本≥30                     (可复活)
                            │                     │
                            ▼                     │
                         [Dead] ──────────────────┘

注意:Cooldown 粒度是 (proxy, host)。
      proxy-A 对 SSE 冷却,不影响 proxy-A 对 SZSE 继续服务。
      Dead 是 proxy 全局状态(全局成功率 = 0%)。
```

### 7.2 Host 熔断

```
 [Closed] ──零可用代理 or 连续N次TargetError──► [Open]
    ▲                                            │
    │                                       probe_interval
    │                                            │
    └───试探成功───── [HalfOpen] ◄───────────────┘
                    试探失败
                     [Open](重新计时)
```

---

## 8 持久化格式

`proxy_state.json`:

```json
{
  "version": 1,
  "saved_at": "2025-07-12T08:30:00Z",
  "proxies": {
    "socks5://1.2.3.4:1080": {
      "state": "active",
      "hosts": {
        "yunhq.sse.com.cn": {
          "success": 60, "fail": 2,
          "avg_latency_ms": 1500,
          "consecutive_fails": 0
        },
        "www.szse.cn": {
          "success": 22, "fail": 8,
          "avg_latency_ms": 2400,
          "consecutive_fails": 0
        }
      }
    },
    "socks5://5.6.7.8:9050": {
      "state": "dead",
      "hosts": {}
    }
  }
}
```

---

## 9 使用示例

```rust
use scatter_proxy::*;
use std::time::Duration;
use std::collections::HashMap;

struct AShareClassifier;
impl BodyClassifier for AShareClassifier {
    fn classify(&self, status: StatusCode, _h: &HeaderMap, body: &[u8]) -> BodyVerdict {
        let text = String::from_utf8_lossy(body);
        match status.as_u16() {
            200 if text.is_empty() => BodyVerdict::ProxyBlocked,
            200 if text.contains("验证码") => BodyVerdict::ProxyBlocked,
            200 => BodyVerdict::Success,
            403 | 429 => BodyVerdict::ProxyBlocked,
            500..=599 => BodyVerdict::TargetError,
            _ => BodyVerdict::ProxyBlocked,
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ScatterProxyConfig {
        sources: vec![
            "https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/socks5.txt".into(),
            "https://cdn.jsdelivr.net/gh/monosans/proxy-list@main/proxies/socks5.txt".into(),
        ],
        rate_limit: RateLimitConfig {
            default_interval: Duration::from_millis(500),
            host_overrides: HashMap::from([
                ("query.sse.com.cn".into(), Duration::from_millis(1000)),
            ]),
        },
        max_concurrent_per_request: 3,
        state_file: Some("proxy_state.json".into()),
        ..Default::default()
    };

    let pool = ScatterProxy::new(config, AShareClassifier).await?;

    // 批量提交任务
    let client = reqwest::Client::new();
    let symbols = ["600519", "601166", "000001"];
    let handles: Vec<_> = symbols.iter().map(|sym| {
        let req = client
            .get(format!("https://yunhq.sse.com.cn:32042/v1/sh1/line/{sym}"))
            .query(&[("callback", "cb")])
            .build()
            .unwrap();
        pool.submit(req).unwrap()
    }).collect();

    // 并发等待所有任务完成
    for (sym, handle) in symbols.iter().zip(handles) {
        match handle.await {
            Ok(resp) => println!("{sym}: {}", resp.status()),
            Err(e) => println!("{sym}: {e}"),
        }
    }

    let m = pool.metrics();
    println!(
        "throughput={:.1}/s | success={:.0}% | proxies={}/{} | pending={}",
        m.throughput_10s,
        m.success_rate_1m * 100.0,
        m.healthy_proxies,
        m.total_proxies,
        m.pending_tasks,
    );

    pool.shutdown().await;
    Ok(())
}
```

---

## 10 日志输出示例

```
# 启动
INFO  [scatter-proxy] loaded 1204 proxies from 3 sources
INFO  [scatter-proxy] restored health data for 186 proxies from proxy_state.json

# 运行中(每 30s)
INFO  [scatter-proxy] throughput=8.3/s | success=87% | pool: 142 healthy / 31 cooldown / 1031 dead | tasks: 23 pending 847 done 6 failed | inflight=9 | breakers: (none)

# 任务级别
DEBUG [scatter-proxy] task done | host=yunhq.sse.com.cn | race=3 | winner=socks5://1.2.3.4:1080 | latency=1.2s | attempt=1/5
DEBUG [scatter-proxy] task requeued | host=query.sse.com.cn | reason=3/3 ProxyBlocked | attempt=2/5
DEBUG [scatter-proxy] task abandoned | host=query.sse.com.cn | reason=max_attempts(5) | last_error=ProxyBlocked

# 熔断
WARN  [scatter-proxy] circuit OPEN | host=www.szse.cn | reason=zero available proxies
WARN  [scatter-proxy] circuit probe | host=www.szse.cn | proxy=socks5://1.2.3.4:1080 | result=Success → CLOSED

# 代理状态
DEBUG [scatter-proxy] proxy cooldown | socks5://9.8.7.6:1080 × sse.com.cn | consecutive_fails=3 | cooldown=30s
DEBUG [scatter-proxy] proxy dead | socks5://5.5.5.5:1080 | global success_rate=0% (samples=30)
```

---

## 11 非功能需求

| 项目 | 要求 |
|------|------|
| 异步运行时 | tokio 1.x |
| Rust 版本 | ≥ 1.75 stable |
| 核心依赖 | `reqwest` (0.13, socks+query), `tokio`, `serde`/`serde_json`, `tracing`, `dashmap` |
| 可选 | `reqwest-middleware`(实现 Middleware trait) |
| 线程安全 | `ScatterProxy: Send + Sync``Arc` 共享 |
| 内存 | 20,000 代理 × 10 host 健康数据 < 50 MB |
| 测试 | 单元测试:评分算法、冷却状态机、限频器;集成测试:mock server 模拟不稳定代理 |