dht-crawler
一个基于 Rust 和 Tokio 实现的高性能分布式哈希表 (DHT) 爬虫库。它能够加入 BitTorrent DHT 网络,监听并自动获取种子的元数据(Metadata/InfoHash)。
✨ 核心特性
- 🚀 极致性能:基于
Tokio异步运行时构建,支持数万级的高并发连接处理。 - 📦 自动元数据抓取:内置元数据获取引擎,自动完成从 InfoHash 到种子详情的抓取。
- 🌐 双栈网络支持:完美支持 IPv4 和 IPv6(DualStack 模式),扩大节点覆盖范围。
- ⚡ 高度可配置:支持自定义并发数、队列大小、超时时间等核心参数。
- 📊 监控友好:提供 Prometheus 指标导出接口,轻松监控爬虫状态(可选)。
🏗️ 架构与流程
本库采用了 Reactor 模式 与 Worker Pool 相结合的高并发架构,确保了在处理海量 UDP 数据包时的吞吐量。
系统架构图
graph TD
%% 网络层
Network((DHT Network)) <-->|UDP Packets| Socket[UDP Socket]
%% 接收与分发
subgraph Receiver [Packet Receiver]
Socket -->|recv_from| Reader[UDP Reader / Dispatcher]
Reader -->|Round Robin| Ch1[Channel 1]
Reader -->|Round Robin| Ch2[Channel 2]
Reader -->|...| ChN[Channel N]
end
%% 并行处理
subgraph Processing [Packet Processing Workers]
Ch1 --> W1[Worker 1]
Ch2 --> W2[Worker 2]
ChN --> WN[Worker N]
W1 & W2 & WN -->|Parse & Logic| Logic{Protocol Logic}
end
%% 业务逻辑分支
Logic -->|Discover Node| NodeMgr[Node Queue]
Logic -->|Discover InfoHash| HashQ[Hash Queue]
Logic -->|On Error| ErrorCb[User on_error Callback]
%% 元数据抓取子系统
subgraph Metadata [Metadata Subsystem]
HashQ --> Scheduler[Scheduler]
Scheduler -->|Spawn| MetaW1[Meta Worker 1]
Scheduler -->|...| MetaWN[Meta Worker N]
MetaW1 & MetaWN <-->|TCP / ut_metadata| Peer((Remote Peer))
end
MetaW1 & MetaWN -->|Success| Callback[User Callback]
核心流程解析
-
UDP 读取与分发 (Reader & Dispatcher):
- 独立的 UDP Reader 任务持续从 Socket 读取数据包。
- 使用 Round-Robin 策略将数据包分发给 N 个(默认为 CPU 核心数)处理 Channel,实现无锁的负载均衡。
-
并行协议处理 (Packet Workers):
- N 个 Packet Worker 并行消费 Channel 中的数据。
- 负责 Bencode 解码、KRPC 协议解析、消息路由(Query/Response)。
- 高效处理
get_peers和announce_peer消息,提取 InfoHash。 - 运行时错误通过
on_error回调上报,不触发 panic,便于 JNI 等集成场景。
-
元数据调度 (Metadata Subsystem):
- 提取出的 InfoHash 进入独立的 Hash Queue。
- Scheduler 根据配置的并发度(如 1000+)动态启动 Metadata Worker。
- Worker 通过 TCP 连接 Peer,使用 BEP-0009 协议下载种子元数据。
📦 安装
在你的 Cargo.toml 中添加依赖:
[]
= "0.1"
如果需要 Prometheus 监控支持:
[]
= { = "0.1", = ["metrics"] }
🚀 快速开始
下面是一个最简的启动示例。它会启动一个 DHT 节点,并在抓取到新种子时打印日志。
use *;
async
完整的可运行代码请参考 examples/main.rs
⚙️ 配置详解
DHTOptions 提供了丰富的配置项来调整爬虫行为:
let options = DHTOptions ;
可选 API:server.set_filter(|info_hash_hex| bool) 可在发现阶段过滤要处理的 info_hash(返回 true 表示允许进入元数据队列)。
错误处理
库内采用严格的错误处理策略,避免底层异常导致进程崩溃,便于集成 JNI 或嵌入式场景。
错误类型 DHTError
use ;
// 错误变体包括:
// - DHTError::Network(io::Error) — 网络/IO 错误
// - DHTError::Init(String) — 初始化错误(如 socket、worker)
// - DHTError::Internal(String) — 内部逻辑错误
// - DHTError::LockPoisoned(String) — 锁中毒(预留)
// - DHTError::Other(String) — 其他
注册错误回调 on_error
运行时错误(如协议处理失败)会通过回调上报,而不会 panic:
let server = new.await?;
// 将错误输出到 stderr 或接入自己的日志/监控
server.on_error;
// JNI 场景示例:将错误传回 Java 层
// server.on_error(|err| {
// jni_callback_on_error(env, err.to_string());
// });
同步错误:Result 传播
DHTServer::new() 和 server.start() 返回 Result,调用方需处理或传播:
async
编译示例
1. 启用 mimalloc(内存优化)
在长运行的高并发场景下,使用 mimalloc 可降低约 10–30% 内存占用。本库的 mimalloc feature 仅用于方便编译/运行示例;若将本库作为依赖使用,请在自己的 bin 项目中单独引入并配置 mimalloc 全局分配器。
2. 启用 metrics(监控)
启用后,可通过 HTTP 接口拉取 Prometheus 格式的监控数据。
监控地址:http://localhost:9000/metrics
📜 许可证
MIT License