dht_crawler/
server.rs

1use crate::error::Result;
2use crate::metadata::RbitFetcher;
3use crate::protocol::{DhtMessage, DhtArgs, DhtResponse};
4use crate::scheduler::MetadataScheduler;
5use crate::types::{DHTOptions, TorrentInfo, NetMode};
6use crate::sharded::{ShardedNodeQueue, NodeTuple};
7use rand::Rng;
8use ahash::AHasher;
9use std::hash::{Hash, Hasher};
10use std::net::{IpAddr, Ipv6Addr, SocketAddr};
11use std::sync::{Arc, RwLock};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::time::Duration;
14use tokio::net::UdpSocket;
15use tokio::sync::{mpsc, Semaphore};
16use socket2::{Socket, Domain, Type, Protocol};
17use std::pin::Pin;
18use std::future::Future;
19
/// Well-known DHT routers, written as `host:port`.
///
/// `DHTServer::bootstrap` resolves each entry and sends it a `find_node`
/// query.
const BOOTSTRAP_NODES: &[&str] = &[
    "router.bittorrent.com:6881",
    "dht.transmissionbt.com:6881",
    "router.utorrent.com:6881",
    "dht.aelitis.com:6881",
];
26
// Type definitions
/// Boxed, sendable future that resolves to a `bool`.
pub type BoxedBoolFuture = Pin<Box<dyn Future<Output = bool> + Send>>;
/// Shared asynchronous predicate that takes a `String` and resolves to a
/// `bool`; this is the stored form of the closure passed to
/// `DHTServer::on_metadata_fetch`.
pub type MetadataFetchCallback = Arc<dyn Fn(String) -> BoxedBoolFuture + Send + Sync>;
30
/// Event emitted when an `announce_peer` query reveals a new info-hash.
#[derive(Debug, Clone)]
pub struct HashDiscovered {
    /// Hex-encoded 20-byte info-hash.
    pub info_hash: String,
    /// IP of the announcing node combined with the port chosen from the announce.
    pub peer_addr: SocketAddr,
    /// Moment the event was created.
    pub discovered_at: std::time::Instant,
}
38
39// ---------------------------------------------------------------
40
/// Stored form of the closure registered with `DHTServer::on_torrent`.
type TorrentCallback = Arc<dyn Fn(TorrentInfo) + Send + Sync>;
/// Stored form of the closure registered with `DHTServer::set_filter`;
/// it is called with the hex-encoded info-hash.
type FilterCallback = Arc<dyn Fn(&str) -> bool + Send + Sync>;
43
/// DHT crawler node.
///
/// Cloning is cheap: the sockets, callbacks, node queue and counters are all
/// behind `Arc`, so every clone shares the same state.
#[derive(Clone)]
pub struct DHTServer {
    // NOTE(review): `options.netmode` is read by several methods, so this
    // `allow` looks stale — confirm and remove.
    #[allow(dead_code)]
    options: DHTOptions,
    /// Random 20-byte node id generated in `new`.
    node_id: Vec<u8>,
    /// Primary socket: IPv4 in `Ipv4Only`/`DualStack` mode, IPv6 in `Ipv6Only` mode.
    socket: Arc<UdpSocket>,
    /// Additional IPv6 socket; `Some` only in `DualStack` mode.
    socket_v6: Option<Arc<UdpSocket>>,
    /// Ten random bytes mixed into every announce token.
    token_secret: Vec<u8>,

    /// Callback registered through `on_torrent`; also handed to the scheduler.
    callback: Arc<RwLock<Option<TorrentCallback>>>,
    /// Predicate registered through `set_filter`.
    filter: Arc<RwLock<Option<FilterCallback>>>,
    /// Callback registered through `on_metadata_fetch`; also handed to the scheduler.
    on_metadata_fetch: Arc<RwLock<Option<MetadataFetchCallback>>>,

    // Sharded locking greatly reduces contention.
    node_queue: Arc<ShardedNodeQueue>,

    // Sends hash-discovered events to the metadata scheduler.
    hash_tx: mpsc::Sender<HashDiscovered>,
    
    // Metadata queue length (drives the adaptive crawl speed).
    metadata_queue_len: Arc<AtomicUsize>,
    /// Copy of `options.max_metadata_queue_size`, used as the pressure denominator.
    max_metadata_queue_size: usize,
}
67
68impl DHTServer {
69    pub async fn new(options: DHTOptions) -> Result<Self> {
70        let (socket, socket_v6) = match options.netmode {
71            NetMode::Ipv4Only => {
72                let sock = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
73                #[cfg(not(windows))]
74                { let _ = sock.set_reuse_port(true); }
75                let _ = sock.set_reuse_address(true);
76                sock.set_nonblocking(true)?;
77                
78                // 增加网络缓冲区以应对高QPS
79                let _ = sock.set_recv_buffer_size(32 * 1024 * 1024);  // 32MB
80                let _ = sock.set_send_buffer_size(8 * 1024 * 1024);   // 8MB
81
82                let addr: SocketAddr = format!("0.0.0.0:{}", options.port).parse().unwrap();
83                sock.bind(&addr.into())?;
84                (Arc::new(UdpSocket::from_std(sock.into())?), None)
85            },
86            NetMode::Ipv6Only => {
87                let sock = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
88                #[cfg(not(windows))]
89                { let _ = sock.set_reuse_port(true); }
90                let _ = sock.set_reuse_address(true);
91                // 设置仅IPv6模式(Windows默认是仅IPv6,Linux/Unix需要设置)
92                #[cfg(not(windows))]
93                { let _ = sock.set_only_v6(true); }
94                sock.set_nonblocking(true)?;
95                
96                let _ = sock.set_recv_buffer_size(32 * 1024 * 1024);
97                let _ = sock.set_send_buffer_size(8 * 1024 * 1024);
98
99                let addr: SocketAddr = format!("[::]:{}", options.port).parse().unwrap();
100                sock.bind(&addr.into())?;
101                (Arc::new(UdpSocket::from_std(sock.into())?), None)
102            },
103            NetMode::DualStack => {
104                // IPv4 socket
105                let sock_v4 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
106                #[cfg(not(windows))]
107                { let _ = sock_v4.set_reuse_port(true); }
108                let _ = sock_v4.set_reuse_address(true);
109                sock_v4.set_nonblocking(true)?;
110                let _ = sock_v4.set_recv_buffer_size(32 * 1024 * 1024);
111                let _ = sock_v4.set_send_buffer_size(8 * 1024 * 1024);
112                let addr_v4: SocketAddr = format!("0.0.0.0:{}", options.port).parse().unwrap();
113                sock_v4.bind(&addr_v4.into())?;
114                let socket = Arc::new(UdpSocket::from_std(sock_v4.into())?);
115
116                // IPv6 socket
117                let sock_v6 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
118                #[cfg(not(windows))]
119                { let _ = sock_v6.set_reuse_port(true); }
120                let _ = sock_v6.set_reuse_address(true);
121                #[cfg(not(windows))]
122                { let _ = sock_v6.set_only_v6(true); }  // 仅IPv6,避免与IPv4冲突
123                sock_v6.set_nonblocking(true)?;
124                let _ = sock_v6.set_recv_buffer_size(32 * 1024 * 1024);
125                let _ = sock_v6.set_send_buffer_size(8 * 1024 * 1024);
126                let addr_v6: SocketAddr = format!("[::]:{}", options.port).parse().unwrap();
127                sock_v6.bind(&addr_v6.into())?;
128                let socket_v6 = Some(Arc::new(UdpSocket::from_std(sock_v6.into())?));
129
130                (socket, socket_v6)
131            },
132        };
133
134        let node_id = generate_random_id();
135        let mut rng = rand::thread_rng();
136        let token_secret: Vec<u8> = (0..10).map(|_| rng.gen()).collect();
137
138        // 使用分片队列
139        // 队列容量:从配置获取
140        let node_queue = ShardedNodeQueue::new(options.node_queue_capacity);
141
142        // -----------------------------------------------------------
143        // 内部初始化 MetadataScheduler
144        // -----------------------------------------------------------
145        let (hash_tx, hash_rx) = mpsc::channel::<HashDiscovered>(10000);
146
147        let fetcher = Arc::new(RbitFetcher::new(options.metadata_timeout));
148        
149        // 创建共享的回调状态
150        let callback = Arc::new(RwLock::new(None));
151        let on_metadata_fetch = Arc::new(RwLock::new(None));
152        
153        // 创建共享的队列长度计数器
154        let metadata_queue_len = Arc::new(AtomicUsize::new(0));
155
156        let scheduler = MetadataScheduler::new(
157            hash_rx,
158            fetcher,
159            options.max_metadata_queue_size,
160            options.max_metadata_worker_count,
161            callback.clone(),
162            on_metadata_fetch.clone(),
163            metadata_queue_len.clone(),
164        );
165
166        // 启动 Scheduler
167        tokio::spawn(async move {
168            scheduler.run().await;
169        });
170
171        let server = Self {
172            options: options.clone(),
173            node_id: node_id.clone(),
174            socket,
175            socket_v6,
176            token_secret,
177            callback,
178            on_metadata_fetch,
179            node_queue: Arc::new(node_queue),
180            filter: Arc::new(RwLock::new(None)),
181            hash_tx,
182            metadata_queue_len,
183            max_metadata_queue_size: options.max_metadata_queue_size,
184        };
185
186        Ok(server)
187    }
188
189    pub fn local_addr(&self) -> Result<SocketAddr> {
190        Ok(self.socket.local_addr()?)
191    }
192
193    /// 验证地址类型是否与当前 netmode 配置匹配
194    /// 
195    /// 防御性编程:虽然 socket 层面理论上不应该接收到不匹配的数据包,
196    /// 但在某些特殊情况下(如系统配置、双栈模式切换等)可能会有问题。
197    /// 此方法确保在应用层也进行验证,避免处理不匹配的地址类型。
198    fn is_addr_allowed(&self, addr: &SocketAddr) -> bool {
199        match self.options.netmode {
200            NetMode::Ipv4Only => addr.is_ipv4(),
201            NetMode::Ipv6Only => addr.is_ipv6(),
202            NetMode::DualStack => true, // 双栈模式接受所有地址类型
203        }
204    }
205
206    /// 根据目标地址选择合适的socket
207    fn select_socket(&self, addr: &SocketAddr) -> &Arc<UdpSocket> {
208        match self.options.netmode {
209            NetMode::Ipv4Only => {
210                // IPv4Only 模式:只有 IPv4 socket
211                &self.socket
212            },
213            NetMode::Ipv6Only => {
214                // IPv6Only 模式:只有 IPv6 socket
215                &self.socket
216            },
217            NetMode::DualStack => {
218                // 双栈模式:根据地址类型选择
219                if addr.is_ipv6() {
220                    self.socket_v6.as_ref().unwrap_or(&self.socket)
221                } else {
222                    &self.socket
223                }
224            },
225        }
226    }
227
228    /// 设置元数据获取前的检查回调
229    ///
230    /// 此回调在发现新的 info_hash 后,但在实际连接对等端获取元数据之前执行。
231    /// 你可以在这里进行去重检查(如查询数据库),返回 `true` 表示继续获取,`false` 表示跳过。
232    ///
233    /// # 注意事项
234    /// - 回调是在 `MetadataScheduler` 的 Worker 线程中异步执行的(通过 `.await`)。
235    /// - 支持耗时操作(如数据库查询),但请注意 Worker 数量限制(默认 500)。
236    /// - 如果回调执行过慢,可能会导致任务队列堆积。
237    ///
238    /// # 示例
239    /// ```rust,ignore
240    /// server.on_metadata_fetch(|hash| async move {
241    ///     // 检查数据库是否存在
242    ///     // let exists = db.has(hash).await;
243    ///     // !exists
244    ///     true
245    /// });
246    /// ```
247    pub fn on_metadata_fetch<F, Fut>(&self, callback: F)
248    where
249        F: Fn(String) -> Fut + Send + Sync + 'static,
250        Fut: Future<Output = bool> + Send + 'static,
251    {
252        *self.on_metadata_fetch.write().unwrap() = Some(Arc::new(move |hash| {
253            Box::pin(callback(hash))
254        }));
255    }
256
257    /// 设置成功获取到种子信息的回调
258    ///
259    /// 当成功从对等端下载并解析出种子元数据(Metadata)后调用。
260    ///
261    /// # 注意事项
262    /// - 此回调是在 Worker 线程中同步执行的。
263    /// - 如果包含耗时操作(如写入大量数据或复杂计算),**必须**在回调内部手动使用 `tokio::spawn`。
264    /// - 否则会阻塞当前的元数据获取 Worker,降低系统吞吐量。
265    ///
266    /// # 示例
267    /// ```rust,ignore
268    /// server.on_torrent(|info| {
269    ///     // 简单操作可以直接做
270    ///     println!("Got torrent: {}", info.name);
271    ///     
272    ///     // 耗时操作建议 spawn
273    ///     tokio::spawn(async move {
274    ///         save_to_db(info).await;
275    ///     });
276    /// });
277    /// ```
278    pub fn on_torrent<F>(&self, callback: F) where F: Fn(TorrentInfo) + Send + Sync + 'static {
279        *self.callback.write().unwrap() = Some(Arc::new(callback));
280    }
281    
282    /// 设置 Hash 过滤器
283    ///
284    /// 在处理 `announce_peer` 消息时,用于快速判断是否应该处理该 Hash。
285    /// 这通常用于布隆过滤器之前的黑名单或白名单机制。
286    ///
287    /// # 注意事项
288    /// - 此回调是在 UDP 处理线程中**同步执行**的。
289    /// - **绝对禁止**执行任何耗时操作(如 IO、数据库查询、锁等待)。
290    /// - 任何延迟都会直接阻塞网络包的接收,导致丢包。
291    /// - 应仅进行纯内存的快速判断。
292    pub fn set_filter<F>(&self, filter: F) where F: Fn(&str) -> bool + Send + Sync + 'static {
293        *self.filter.write().unwrap() = Some(Arc::new(filter));
294    }
295
296
297    pub fn get_node_pool_size(&self) -> usize {
298        self.node_queue.len()
299    }
300
301    pub async fn start(&self) -> Result<()> {
302
303        self.start_receiver();
304        self.bootstrap().await;
305
306        let server = self.clone();
307
308        tokio::spawn(async move {
309            let semaphore = Arc::new(Semaphore::new(2000));
310            let mut loop_tick = 0;
311
312            loop {
313                // 🚀 自适应爬取速度:根据 Metadata 队列负载调整爬取策略
314                let queue_len = server.metadata_queue_len.load(Ordering::Relaxed);
315                let queue_pressure = queue_len as f64 / server.max_metadata_queue_size as f64;
316                
317                // 动态计算批次大小和休眠时间
318                let (batch_size, sleep_duration) = if queue_pressure < 0.8 {
319                    // 🟡 黄区:队列有压力,适度减速
320                    (200, Duration::from_millis(10))
321                } else if queue_pressure < 0.95 {
322                    // 🟠 橙区:队列高压,大幅减速
323                    (20, Duration::from_millis(500))
324                } else {
325                    // 🔴 红区:队列爆满,暂停主动爬取
326                    (0, Duration::from_millis(1000))
327                };
328
329                // 根据配置决定从哪个队列获取节点
330                let filter_ipv6 = match server.options.netmode {
331                    NetMode::Ipv4Only => Some(false),
332                    NetMode::Ipv6Only => Some(true),
333                    NetMode::DualStack => None,
334                };
335                
336                // 检查对应队列是否为空
337                let queue_empty = server.node_queue.is_empty_for(filter_ipv6);
338                
339                let nodes_batch = {
340                    if queue_empty || batch_size == 0 {
341                        None
342                    } else {
343                        Some(server.node_queue.pop_batch(batch_size, filter_ipv6))
344                    }
345                };
346
347                loop_tick += 1;
348                if nodes_batch.is_none() || loop_tick % 50 == 0 {
349                    server.bootstrap().await;
350                    if nodes_batch.is_none() {
351                        tokio::time::sleep(sleep_duration).await;
352                        continue;
353                    }
354                }
355
356                if let Some(nodes) = nodes_batch {
357                    for node in nodes {
358                        let permit = semaphore.clone().acquire_owned().await.unwrap();
359                        let server_clone = server.clone();
360                        tokio::spawn(async move {
361                            let neighbor_id = generate_neighbor_target(&node.id, &server_clone.node_id);
362                            let random_target = generate_random_id();
363                            let _ = server_clone.send_find_node(node.addr, &random_target, &neighbor_id).await;
364                            drop(permit);
365                        });
366                    }
367                }
368
369                tokio::time::sleep(sleep_duration).await;
370            }
371        });
372
373        std::future::pending::<()>().await;
374        Ok(())
375    }
376
377    fn start_receiver(&self) {
378        let socket = self.socket.clone();
379        let socket_v6 = self.socket_v6.clone();
380        let server = self.clone();
381
382        let num_workers = std::thread::available_parallelism()
383            .map(|n| n.get())
384            .unwrap_or(8);
385
386        let queue_size = 5000;
387
388        let mut senders = Vec::with_capacity(num_workers);
389        for _ in 0..num_workers {
390            let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(queue_size);
391            senders.push(tx);
392
393            let server_clone = server.clone();
394
395            tokio::spawn(async move {
396                while let Some((data, addr)) = rx.recv().await {
397                    let _ = server_clone.handle_message(&data, addr).await;
398                }
399            });
400        }
401
402        let senders_for_v6 = senders.clone();
403        tokio::spawn(async move {
404            let mut buf = [0u8; 65536];
405            let mut next_worker_idx = 0;
406
407            loop {
408                match socket.recv_from(&mut buf).await {
409                    Ok((size, addr)) => {
410                        // 🛡️ 安全检查1:拒绝异常大的包(DHT 消息通常 < 2KB)
411                        if size > 8192 {
412                            log::trace!("⚠️ 拒绝异常大的 UDP 包: {} 字节 from {}", size, addr);
413                            continue;
414                        }
415                        
416                        // 🛡️ 安全检查2:快速检查是否是有效的 Bencode 字典
417                        // DHT KRPC 消息(BEP-5)必须是字典,首字符必须是 'd'
418                        if size == 0 || buf[0] != b'd' {
419                            continue;
420                        }
421
422                        let data = buf[..size].to_vec();
423
424                        let tx = &senders[next_worker_idx];
425                        next_worker_idx = (next_worker_idx + 1) % num_workers;
426
427                        match tx.try_send((data, addr)) {
428                            Ok(_) => {},
429                            Err(mpsc::error::TrySendError::Full(_)) => {
430                                #[cfg(debug_assertions)]
431                                log::trace!("UDP worker queue full, dropping packet");
432                            },
433                            Err(_) => { break; }
434                        }
435                    }
436                    Err(_e) => {
437                        tokio::time::sleep(Duration::from_millis(1)).await;
438                    }
439                }
440            }
441        });
442
443        // IPv6 接收任务
444        if let Some(socket_v6) = socket_v6 {
445            let senders_v6 = senders_for_v6;
446            tokio::spawn(async move {
447                let mut buf = [0u8; 65536];
448                let mut next_worker_idx = 0;
449
450                loop {
451                    match socket_v6.recv_from(&mut buf).await {
452                        Ok((size, addr)) => {
453                            if size > 8192 { continue; }
454                            if size == 0 || buf[0] != b'd' { continue; }
455
456                            let data = buf[..size].to_vec();
457
458                            let tx = &senders_v6[next_worker_idx];
459                            next_worker_idx = (next_worker_idx + 1) % num_workers;
460
461                            match tx.try_send((data, addr)) {
462                                Ok(_) => {},
463                                Err(mpsc::error::TrySendError::Full(_)) => {
464                                    #[cfg(debug_assertions)]
465                                    log::trace!("UDP worker queue full, dropping packet");
466                                },
467                                Err(_) => { break; }
468                            }
469                        }
470                        Err(_e) => {
471                            tokio::time::sleep(Duration::from_millis(1)).await;
472                        }
473                    }
474                }
475            });
476        }
477    }
478
479    async fn handle_message(&self, data: &[u8], addr: SocketAddr) -> Result<()> {
480        // 🛡️ 验证地址类型是否与当前 netmode 配置匹配
481        // 防御性编程:虽然 socket 层面理论上不应该接收到不匹配的数据包,
482        // 但在某些特殊情况下(如系统配置、双栈模式切换等)可能会有问题
483        if !self.is_addr_allowed(&addr) {
484            log::trace!("⚠️ 拒绝不匹配的地址类型: {} (当前模式: {:?})", addr, self.options.netmode);
485            return Ok(());
486        }
487
488        let msg: DhtMessage = match serde_bencode::from_bytes(data) {
489            Ok(m) => m,
490            Err(_) => return Ok(()),
491        };
492
493        match msg.y.as_str() {
494            "q" => {
495                if let Some(q_type) = &msg.q {
496                    self.handle_query(&msg, q_type.as_bytes(), addr).await?;
497                }
498            }
499            "r" => {
500                if let Some(response) = &msg.r {
501                    self.handle_response(response).await?;
502                }
503            }
504            _ => {}
505        }
506        Ok(())
507    }
508
509    async fn handle_query(&self, msg: &DhtMessage, query_type: &[u8], addr: SocketAddr) -> Result<()> {
510        let args = match &msg.a {
511            Some(a) => a,
512            None => return Ok(()),
513        };
514
515        let transaction_id = &msg.t;
516        let sender_id: Option<&[u8]> = args.id.as_deref().map(|v| v.as_slice());
517        let target_id_fallback: Option<&[u8]> = args.target.as_deref()
518            .or(args.info_hash.as_deref())
519            .map(|v| v.as_slice());
520
521        let q_str = std::str::from_utf8(query_type).unwrap_or("");
522        
523        if q_str == "announce_peer" {
524            self.handle_announce_peer(args, addr).await?;
525        }
526
527        self.send_response(transaction_id, addr, q_str, sender_id, target_id_fallback).await?;
528        Ok(())
529    }
530
531    async fn handle_announce_peer(&self, args: &DhtArgs, addr: SocketAddr) -> Result<()> {
532        if let Some(token) = &args.token {
533            if !self.validate_token(token, addr) { return Ok(()); }
534        } else {
535            return Ok(());
536        }
537
538        if let Some(info_hash) = &args.info_hash {
539            let info_hash_arr: [u8; 20] = match info_hash.as_ref().try_into() {
540                Ok(arr) => arr, Err(_) => return Ok(()),
541            };
542            let hash_hex = hex::encode(info_hash_arr);
543
544            let filter_cb = self.filter.read().unwrap().clone();
545            if let Some(f) = filter_cb {
546                if !f(&hash_hex) { return Ok(()); }
547            }
548
549            #[cfg(debug_assertions)]
550            log::debug!("🔥 新 Hash: {} 来自 {}", hash_hex, addr);
551
552            // 解耦:发送 hash 发现事件
553            let port = if let Some(implied) = args.implied_port {
554                if implied != 0 { addr.port() } else { args.port.unwrap_or(0) }
555            } else {
556                args.port.unwrap_or(addr.port())
557            };
558
559            if port > 0 {
560                let event = HashDiscovered {
561                    info_hash: hash_hex,
562                    peer_addr: SocketAddr::new(addr.ip(), port),
563                    discovered_at: std::time::Instant::now(),
564                };
565
566                // 使用 try_send,队列满时直接丢弃(背压)
567                if let Err(_) = self.hash_tx.try_send(event) {
568                    #[cfg(debug_assertions)]
569                    log::debug!("⚠️ Hash 队列满,丢弃 hash");
570                }
571            }
572        }
573        Ok(())
574    }
575
576    async fn handle_response(&self, response: &DhtResponse) -> Result<()> {
577        // 处理 IPv4 节点
578        if let Some(nodes_bytes) = &response.nodes {
579            self.process_compact_nodes(nodes_bytes);
580        }
581        // 处理 IPv6 节点
582        if let Some(nodes6_bytes) = &response.nodes6 {
583            self.process_compact_nodes_v6(nodes6_bytes);
584        }
585        Ok(())
586    }
587
588    fn process_compact_nodes(&self, nodes_bytes: &[u8]) {
589        // 根据配置决定是否处理IPv4节点
590        if self.options.netmode == NetMode::Ipv6Only {
591            return;
592        }
593
594        if nodes_bytes.len() % 26 != 0 { return; }
595
596        // 使用分片队列,直接并发插入(无锁竞争)
597        for chunk in nodes_bytes.chunks(26) {
598            let id = chunk[0..20].to_vec();
599            let port = u16::from_be_bytes([chunk[24], chunk[25]]);
600            
601            let ip = std::net::Ipv4Addr::new(chunk[20], chunk[21], chunk[22], chunk[23]);
602            let addr = SocketAddr::new(std::net::IpAddr::V4(ip), port);
603            
604            self.node_queue.push(NodeTuple { id, addr });
605        }
606    }
607
608    fn process_compact_nodes_v6(&self, nodes_bytes: &[u8]) {
609        // 根据配置决定是否处理IPv6节点
610        if self.options.netmode == NetMode::Ipv4Only {
611            return;
612        }
613
614        if nodes_bytes.len() % 38 != 0 { return; }
615        for chunk in nodes_bytes.chunks(38) {
616            let id = chunk[0..20].to_vec();
617            let port = u16::from_be_bytes([chunk[36], chunk[37]]);
618            let ip_bytes: [u8; 16] = match chunk[20..36].try_into() {
619                Ok(b) => b,
620                Err(_) => continue, // 如果转换失败(理论上不会),跳过该节点
621            };
622            let ip = Ipv6Addr::from(ip_bytes);
623            // 过滤掉不可用地址 (组播, 未指定等)
624            if !ip.is_unspecified() && !ip.is_multicast() {
625                let addr = SocketAddr::new(IpAddr::V6(ip), port);
626                self.node_queue.push(NodeTuple { id, addr });
627            }
628        }
629    }
630
631    async fn send_response(
632        &self,
633        tid: &[u8],
634        addr: SocketAddr,
635        query_type: &str,
636        sender_id: Option<&[u8]>,
637        target_id_fallback: Option<&[u8]>,
638    ) -> Result<()> {
639        let mut r_dict = std::collections::HashMap::new();
640
641        let reference_id = sender_id.or(target_id_fallback);
642        let my_id = if let Some(target) = reference_id {
643            generate_neighbor_target(target, &self.node_id)
644        } else {
645            self.node_id.clone()
646        };
647
648        r_dict.insert(b"id".to_vec(), serde_bencode::value::Value::Bytes(my_id));
649        let token = self.generate_token(addr);
650        r_dict.insert(b"token".to_vec(), serde_bencode::value::Value::Bytes(token));
651
652        if query_type == "get_peers" || query_type == "find_node" {
653            // 根据配置和请求方IP类型决定需要获取的节点类型
654            let requestor_is_ipv6 = addr.is_ipv6();
655            let filter_ipv6 = match self.options.netmode {
656                NetMode::Ipv4Only => Some(false),  // 只要 IPv4
657                NetMode::Ipv6Only => Some(true),   // 只要 IPv6
658                NetMode::DualStack => Some(requestor_is_ipv6),  // 双栈模式:根据请求方IP类型返回对应类型的节点
659            };
660            
661            // 使用分片队列获取随机节点(无锁竞争,带地址族过滤)
662            let nodes = self.node_queue.get_random_nodes(8, filter_ipv6);
663            
664            let mut nodes_data = Vec::new();
665            let mut nodes6_data = Vec::new();
666
667            for node in nodes {
668                match node.addr.ip() {
669                    // IPv4 节点
670                    IpAddr::V4(ip) => {
671                        nodes_data.extend_from_slice(&node.id);
672                        nodes_data.extend_from_slice(&ip.octets());
673                        nodes_data.extend_from_slice(&node.addr.port().to_be_bytes());
674                    },
675                    // IPv6 节点
676                    IpAddr::V6(ip) => {
677                        nodes6_data.extend_from_slice(&node.id);
678                        nodes6_data.extend_from_slice(&ip.octets());
679                        nodes6_data.extend_from_slice(&node.addr.port().to_be_bytes());
680                    },
681                }
682            }
683            
684            // 根据请求方IP类型返回对应类型的节点
685            // 在单栈模式下,get_random_nodes 已经过滤了节点类型,所以这里直接根据请求方类型返回即可
686            if requestor_is_ipv6 {
687                // 请求方是IPv6:返回IPv6节点
688                if !nodes6_data.is_empty() {
689                    r_dict.insert(b"nodes6".to_vec(), serde_bencode::value::Value::Bytes(nodes6_data));
690                }
691            } else {
692                // 请求方是IPv4:返回IPv4节点
693                if !nodes_data.is_empty() {
694                    r_dict.insert(b"nodes".to_vec(), serde_bencode::value::Value::Bytes(nodes_data));
695                }
696            }
697        }
698
699        let mut response: std::collections::HashMap<String, serde_bencode::value::Value> = std::collections::HashMap::new();
700        response.insert("t".to_string(), serde_bencode::value::Value::Bytes(tid.to_vec()));
701        response.insert("y".to_string(), serde_bencode::value::Value::Bytes(b"r".to_vec()));
702        response.insert("r".to_string(), serde_bencode::value::Value::Dict(r_dict));
703
704        if let Ok(encoded) = serde_bencode::to_bytes(&response) {
705            let _ = self.select_socket(&addr).send_to(&encoded, addr).await;
706        }
707        Ok(())
708    }
709
710    async fn bootstrap(&self) {
711        let target = generate_random_id();
712        for node in BOOTSTRAP_NODES {
713            match tokio::net::lookup_host(node).await {
714                Ok(addrs) => {
715                    for addr in addrs {
716                        // 根据配置过滤地址
717                        match self.options.netmode {
718                            NetMode::Ipv4Only => {
719                                if addr.is_ipv6() { continue; }
720                            },
721                            NetMode::Ipv6Only => {
722                                if addr.is_ipv4() { continue; }
723                            },
724                            NetMode::DualStack => {
725                                // 双栈模式,接受所有地址
726                            },
727                        }
728                        let _ = self.send_find_node(addr, &target, &self.node_id).await;
729                    }
730                }
731                Err(_) => {}
732            }
733        }
734    }
735
736    async fn send_find_node(&self, addr: SocketAddr, target: &[u8], sender_id: &[u8]) -> Result<()> {
737        let mut args = std::collections::HashMap::new();
738        args.insert(b"id".to_vec(), serde_bencode::value::Value::Bytes(sender_id.to_vec()));
739        args.insert(b"target".to_vec(), serde_bencode::value::Value::Bytes(target.to_vec()));
740
741        let mut msg: std::collections::HashMap<String, serde_bencode::value::Value> = std::collections::HashMap::new();
742        msg.insert("t".to_string(), serde_bencode::value::Value::Bytes(vec![0, 1]));
743        msg.insert("y".to_string(), serde_bencode::value::Value::Bytes(b"q".to_vec()));
744        msg.insert("q".to_string(), serde_bencode::value::Value::Bytes(b"find_node".to_vec()));
745        msg.insert("a".to_string(), serde_bencode::value::Value::Dict(args));
746
747        if let Ok(encoded) = serde_bencode::to_bytes(&msg) {
748            let _ = self.select_socket(&addr).send_to(&encoded, addr).await;
749        }
750        Ok(())
751    }
752
753    fn generate_token(&self, addr: SocketAddr) -> Vec<u8> {
754
755        let mut hasher = AHasher::default();
756        
757        // Hash IP地址
758        match addr.ip() {
759            IpAddr::V4(ip) => ip.octets().hash(&mut hasher),
760            IpAddr::V6(ip) => ip.octets().hash(&mut hasher),
761        }
762        
763        // Hash 密钥
764        self.token_secret.hash(&mut hasher);
765        
766        // 返回 8 字节 token
767        let hash = hasher.finish();
768        hash.to_le_bytes().to_vec()
769    }
770    
771    fn validate_token(&self, token: &[u8], addr: SocketAddr) -> bool {
772        if token.len() != 8 {
773            return false;
774        }
775        let expected = self.generate_token(addr);
776        token == expected.as_slice()
777    }
778}
779
780fn generate_random_id() -> Vec<u8> {
781    let mut rng = rand::thread_rng();
782    (0..20).map(|_| rng.gen()).collect()
783}
784
785fn generate_neighbor_target(remote_id: &[u8], local_id: &[u8]) -> Vec<u8> {
786    let mut id = Vec::with_capacity(20);
787    let prefix_len = std::cmp::min(remote_id.len(), 6);
788    id.extend_from_slice(&remote_id[..prefix_len]);
789    if local_id.len() > prefix_len {
790        id.extend_from_slice(&local_id[prefix_len..]);
791    } else {
792        while id.len() < 20 {
793            id.push(rand::random());
794        }
795    }
796    id
797}