1use crate::error::Result;
2use crate::metadata::RbitFetcher;
3use crate::protocol::{DhtMessage, DhtArgs, DhtResponse};
4use crate::scheduler::MetadataScheduler;
5use crate::types::{DHTOptions, TorrentInfo, NetMode};
6use crate::sharded::{ShardedNodeQueue, NodeTuple};
7use rand::Rng;
8use ahash::AHasher;
9use std::hash::{Hash, Hasher};
10use std::net::{IpAddr, Ipv6Addr, SocketAddr};
11use std::sync::{Arc, RwLock};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::time::Duration;
14use tokio::net::UdpSocket;
15use tokio::sync::{mpsc, Semaphore};
16use socket2::{Socket, Domain, Type, Protocol};
17use std::pin::Pin;
18use std::future::Future;
19
/// Public DHT routers, as `host:port` strings, that `bootstrap` resolves and sends an initial
/// `find_node` query to.
const BOOTSTRAP_NODES: &[&str] = &[
    "router.bittorrent.com:6881",
    "dht.transmissionbt.com:6881",
    "router.utorrent.com:6881",
    "dht.aelitis.com:6881",
];
26
/// A heap-allocated, type-erased future that resolves to a `bool` and may be sent across threads.
pub type BoxedBoolFuture = Pin<Box<dyn Future<Output = bool> + Send>>;

/// A shareable async hook that receives a `String` (the info-hash, as passed by
/// `DHTServer::on_metadata_fetch`) and resolves to a `bool`.
///
/// NOTE(review): the meaning of the returned `bool` is decided by the metadata scheduler, which
/// is not visible here — presumably "proceed with the fetch"; confirm against the scheduler.
pub type MetadataFetchCallback = Arc<dyn Fn(String) -> BoxedBoolFuture + Send + Sync>;
30
/// Event emitted for every accepted `announce_peer` query and pushed onto the hash channel.
#[derive(Debug, Clone)]
pub struct HashDiscovered {
    /// Info-hash encoded as a hexadecimal string.
    pub info_hash: String,
    /// IP of the announcing peer combined with the announced (or implied) port.
    pub peer_addr: SocketAddr,
    /// Monotonic timestamp captured when the event was created.
    pub discovered_at: std::time::Instant,
}
38
39type TorrentCallback = Arc<dyn Fn(TorrentInfo) + Send + Sync>;
42type FilterCallback = Arc<dyn Fn(&str) -> bool + Send + Sync>;
43
44#[derive(Clone)]
45pub struct DHTServer {
46 #[allow(dead_code)]
47 options: DHTOptions,
48 node_id: Vec<u8>,
49 socket: Arc<UdpSocket>,
50 socket_v6: Option<Arc<UdpSocket>>,
51 token_secret: Vec<u8>,
52
53 callback: Arc<RwLock<Option<TorrentCallback>>>,
54 filter: Arc<RwLock<Option<FilterCallback>>>,
55 on_metadata_fetch: Arc<RwLock<Option<MetadataFetchCallback>>>,
56
57 node_queue: Arc<ShardedNodeQueue>,
59
60 hash_tx: mpsc::Sender<HashDiscovered>,
62
63 metadata_queue_len: Arc<AtomicUsize>,
65 max_metadata_queue_size: usize,
66}
67
68impl DHTServer {
69 pub async fn new(options: DHTOptions) -> Result<Self> {
70 let (socket, socket_v6) = match options.netmode {
71 NetMode::Ipv4Only => {
72 let sock = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
73 #[cfg(not(windows))]
74 { let _ = sock.set_reuse_port(true); }
75 let _ = sock.set_reuse_address(true);
76 sock.set_nonblocking(true)?;
77
78 let _ = sock.set_recv_buffer_size(32 * 1024 * 1024); let _ = sock.set_send_buffer_size(8 * 1024 * 1024); let addr: SocketAddr = format!("0.0.0.0:{}", options.port).parse().unwrap();
83 sock.bind(&addr.into())?;
84 (Arc::new(UdpSocket::from_std(sock.into())?), None)
85 },
86 NetMode::Ipv6Only => {
87 let sock = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
88 #[cfg(not(windows))]
89 { let _ = sock.set_reuse_port(true); }
90 let _ = sock.set_reuse_address(true);
91 #[cfg(not(windows))]
93 { let _ = sock.set_only_v6(true); }
94 sock.set_nonblocking(true)?;
95
96 let _ = sock.set_recv_buffer_size(32 * 1024 * 1024);
97 let _ = sock.set_send_buffer_size(8 * 1024 * 1024);
98
99 let addr: SocketAddr = format!("[::]:{}", options.port).parse().unwrap();
100 sock.bind(&addr.into())?;
101 (Arc::new(UdpSocket::from_std(sock.into())?), None)
102 },
103 NetMode::DualStack => {
104 let sock_v4 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
106 #[cfg(not(windows))]
107 { let _ = sock_v4.set_reuse_port(true); }
108 let _ = sock_v4.set_reuse_address(true);
109 sock_v4.set_nonblocking(true)?;
110 let _ = sock_v4.set_recv_buffer_size(32 * 1024 * 1024);
111 let _ = sock_v4.set_send_buffer_size(8 * 1024 * 1024);
112 let addr_v4: SocketAddr = format!("0.0.0.0:{}", options.port).parse().unwrap();
113 sock_v4.bind(&addr_v4.into())?;
114 let socket = Arc::new(UdpSocket::from_std(sock_v4.into())?);
115
116 let sock_v6 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
118 #[cfg(not(windows))]
119 { let _ = sock_v6.set_reuse_port(true); }
120 let _ = sock_v6.set_reuse_address(true);
121 #[cfg(not(windows))]
122 { let _ = sock_v6.set_only_v6(true); } sock_v6.set_nonblocking(true)?;
124 let _ = sock_v6.set_recv_buffer_size(32 * 1024 * 1024);
125 let _ = sock_v6.set_send_buffer_size(8 * 1024 * 1024);
126 let addr_v6: SocketAddr = format!("[::]:{}", options.port).parse().unwrap();
127 sock_v6.bind(&addr_v6.into())?;
128 let socket_v6 = Some(Arc::new(UdpSocket::from_std(sock_v6.into())?));
129
130 (socket, socket_v6)
131 },
132 };
133
134 let node_id = generate_random_id();
135 let mut rng = rand::thread_rng();
136 let token_secret: Vec<u8> = (0..10).map(|_| rng.gen()).collect();
137
138 let node_queue = ShardedNodeQueue::new(options.node_queue_capacity);
141
142 let (hash_tx, hash_rx) = mpsc::channel::<HashDiscovered>(10000);
146
147 let fetcher = Arc::new(RbitFetcher::new(options.metadata_timeout));
148
149 let callback = Arc::new(RwLock::new(None));
151 let on_metadata_fetch = Arc::new(RwLock::new(None));
152
153 let metadata_queue_len = Arc::new(AtomicUsize::new(0));
155
156 let scheduler = MetadataScheduler::new(
157 hash_rx,
158 fetcher,
159 options.max_metadata_queue_size,
160 options.max_metadata_worker_count,
161 callback.clone(),
162 on_metadata_fetch.clone(),
163 metadata_queue_len.clone(),
164 );
165
166 tokio::spawn(async move {
168 scheduler.run().await;
169 });
170
171 let server = Self {
172 options: options.clone(),
173 node_id: node_id.clone(),
174 socket,
175 socket_v6,
176 token_secret,
177 callback,
178 on_metadata_fetch,
179 node_queue: Arc::new(node_queue),
180 filter: Arc::new(RwLock::new(None)),
181 hash_tx,
182 metadata_queue_len,
183 max_metadata_queue_size: options.max_metadata_queue_size,
184 };
185
186 Ok(server)
187 }
188
189 pub fn local_addr(&self) -> Result<SocketAddr> {
190 Ok(self.socket.local_addr()?)
191 }
192
193 fn is_addr_allowed(&self, addr: &SocketAddr) -> bool {
199 match self.options.netmode {
200 NetMode::Ipv4Only => addr.is_ipv4(),
201 NetMode::Ipv6Only => addr.is_ipv6(),
202 NetMode::DualStack => true, }
204 }
205
206 fn select_socket(&self, addr: &SocketAddr) -> &Arc<UdpSocket> {
208 match self.options.netmode {
209 NetMode::Ipv4Only => {
210 &self.socket
212 },
213 NetMode::Ipv6Only => {
214 &self.socket
216 },
217 NetMode::DualStack => {
218 if addr.is_ipv6() {
220 self.socket_v6.as_ref().unwrap_or(&self.socket)
221 } else {
222 &self.socket
223 }
224 },
225 }
226 }
227
228 pub fn on_metadata_fetch<F, Fut>(&self, callback: F)
248 where
249 F: Fn(String) -> Fut + Send + Sync + 'static,
250 Fut: Future<Output = bool> + Send + 'static,
251 {
252 *self.on_metadata_fetch.write().unwrap() = Some(Arc::new(move |hash| {
253 Box::pin(callback(hash))
254 }));
255 }
256
257 pub fn on_torrent<F>(&self, callback: F) where F: Fn(TorrentInfo) + Send + Sync + 'static {
279 *self.callback.write().unwrap() = Some(Arc::new(callback));
280 }
281
282 pub fn set_filter<F>(&self, filter: F) where F: Fn(&str) -> bool + Send + Sync + 'static {
293 *self.filter.write().unwrap() = Some(Arc::new(filter));
294 }
295
296
297 pub fn get_node_pool_size(&self) -> usize {
298 self.node_queue.len()
299 }
300
301 pub async fn start(&self) -> Result<()> {
302
303 self.start_receiver();
304 self.bootstrap().await;
305
306 let server = self.clone();
307
308 tokio::spawn(async move {
309 let semaphore = Arc::new(Semaphore::new(2000));
310 let mut loop_tick = 0;
311
312 loop {
313 let queue_len = server.metadata_queue_len.load(Ordering::Relaxed);
315 let queue_pressure = queue_len as f64 / server.max_metadata_queue_size as f64;
316
317 let (batch_size, sleep_duration) = if queue_pressure < 0.8 {
319 (200, Duration::from_millis(10))
321 } else if queue_pressure < 0.95 {
322 (20, Duration::from_millis(500))
324 } else {
325 (0, Duration::from_millis(1000))
327 };
328
329 let filter_ipv6 = match server.options.netmode {
331 NetMode::Ipv4Only => Some(false),
332 NetMode::Ipv6Only => Some(true),
333 NetMode::DualStack => None,
334 };
335
336 let queue_empty = server.node_queue.is_empty_for(filter_ipv6);
338
339 let nodes_batch = {
340 if queue_empty || batch_size == 0 {
341 None
342 } else {
343 Some(server.node_queue.pop_batch(batch_size, filter_ipv6))
344 }
345 };
346
347 loop_tick += 1;
348 if nodes_batch.is_none() || loop_tick % 50 == 0 {
349 server.bootstrap().await;
350 if nodes_batch.is_none() {
351 tokio::time::sleep(sleep_duration).await;
352 continue;
353 }
354 }
355
356 if let Some(nodes) = nodes_batch {
357 for node in nodes {
358 let permit = semaphore.clone().acquire_owned().await.unwrap();
359 let server_clone = server.clone();
360 tokio::spawn(async move {
361 let neighbor_id = generate_neighbor_target(&node.id, &server_clone.node_id);
362 let random_target = generate_random_id();
363 let _ = server_clone.send_find_node(node.addr, &random_target, &neighbor_id).await;
364 drop(permit);
365 });
366 }
367 }
368
369 tokio::time::sleep(sleep_duration).await;
370 }
371 });
372
373 std::future::pending::<()>().await;
374 Ok(())
375 }
376
377 fn start_receiver(&self) {
378 let socket = self.socket.clone();
379 let socket_v6 = self.socket_v6.clone();
380 let server = self.clone();
381
382 let num_workers = std::thread::available_parallelism()
383 .map(|n| n.get())
384 .unwrap_or(8);
385
386 let queue_size = 5000;
387
388 let mut senders = Vec::with_capacity(num_workers);
389 for _ in 0..num_workers {
390 let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(queue_size);
391 senders.push(tx);
392
393 let server_clone = server.clone();
394
395 tokio::spawn(async move {
396 while let Some((data, addr)) = rx.recv().await {
397 let _ = server_clone.handle_message(&data, addr).await;
398 }
399 });
400 }
401
402 let senders_for_v6 = senders.clone();
403 tokio::spawn(async move {
404 let mut buf = [0u8; 65536];
405 let mut next_worker_idx = 0;
406
407 loop {
408 match socket.recv_from(&mut buf).await {
409 Ok((size, addr)) => {
410 if size > 8192 {
412 log::trace!("⚠️ 拒绝异常大的 UDP 包: {} 字节 from {}", size, addr);
413 continue;
414 }
415
416 if size == 0 || buf[0] != b'd' {
419 continue;
420 }
421
422 let data = buf[..size].to_vec();
423
424 let tx = &senders[next_worker_idx];
425 next_worker_idx = (next_worker_idx + 1) % num_workers;
426
427 match tx.try_send((data, addr)) {
428 Ok(_) => {},
429 Err(mpsc::error::TrySendError::Full(_)) => {
430 #[cfg(debug_assertions)]
431 log::trace!("UDP worker queue full, dropping packet");
432 },
433 Err(_) => { break; }
434 }
435 }
436 Err(_e) => {
437 tokio::time::sleep(Duration::from_millis(1)).await;
438 }
439 }
440 }
441 });
442
443 if let Some(socket_v6) = socket_v6 {
445 let senders_v6 = senders_for_v6;
446 tokio::spawn(async move {
447 let mut buf = [0u8; 65536];
448 let mut next_worker_idx = 0;
449
450 loop {
451 match socket_v6.recv_from(&mut buf).await {
452 Ok((size, addr)) => {
453 if size > 8192 { continue; }
454 if size == 0 || buf[0] != b'd' { continue; }
455
456 let data = buf[..size].to_vec();
457
458 let tx = &senders_v6[next_worker_idx];
459 next_worker_idx = (next_worker_idx + 1) % num_workers;
460
461 match tx.try_send((data, addr)) {
462 Ok(_) => {},
463 Err(mpsc::error::TrySendError::Full(_)) => {
464 #[cfg(debug_assertions)]
465 log::trace!("UDP worker queue full, dropping packet");
466 },
467 Err(_) => { break; }
468 }
469 }
470 Err(_e) => {
471 tokio::time::sleep(Duration::from_millis(1)).await;
472 }
473 }
474 }
475 });
476 }
477 }
478
479 async fn handle_message(&self, data: &[u8], addr: SocketAddr) -> Result<()> {
480 if !self.is_addr_allowed(&addr) {
484 log::trace!("⚠️ 拒绝不匹配的地址类型: {} (当前模式: {:?})", addr, self.options.netmode);
485 return Ok(());
486 }
487
488 let msg: DhtMessage = match serde_bencode::from_bytes(data) {
489 Ok(m) => m,
490 Err(_) => return Ok(()),
491 };
492
493 match msg.y.as_str() {
494 "q" => {
495 if let Some(q_type) = &msg.q {
496 self.handle_query(&msg, q_type.as_bytes(), addr).await?;
497 }
498 }
499 "r" => {
500 if let Some(response) = &msg.r {
501 self.handle_response(response).await?;
502 }
503 }
504 _ => {}
505 }
506 Ok(())
507 }
508
509 async fn handle_query(&self, msg: &DhtMessage, query_type: &[u8], addr: SocketAddr) -> Result<()> {
510 let args = match &msg.a {
511 Some(a) => a,
512 None => return Ok(()),
513 };
514
515 let transaction_id = &msg.t;
516 let sender_id: Option<&[u8]> = args.id.as_deref().map(|v| v.as_slice());
517 let target_id_fallback: Option<&[u8]> = args.target.as_deref()
518 .or(args.info_hash.as_deref())
519 .map(|v| v.as_slice());
520
521 let q_str = std::str::from_utf8(query_type).unwrap_or("");
522
523 if q_str == "announce_peer" {
524 self.handle_announce_peer(args, addr).await?;
525 }
526
527 self.send_response(transaction_id, addr, q_str, sender_id, target_id_fallback).await?;
528 Ok(())
529 }
530
531 async fn handle_announce_peer(&self, args: &DhtArgs, addr: SocketAddr) -> Result<()> {
532 if let Some(token) = &args.token {
533 if !self.validate_token(token, addr) { return Ok(()); }
534 } else {
535 return Ok(());
536 }
537
538 if let Some(info_hash) = &args.info_hash {
539 let info_hash_arr: [u8; 20] = match info_hash.as_ref().try_into() {
540 Ok(arr) => arr, Err(_) => return Ok(()),
541 };
542 let hash_hex = hex::encode(info_hash_arr);
543
544 let filter_cb = self.filter.read().unwrap().clone();
545 if let Some(f) = filter_cb {
546 if !f(&hash_hex) { return Ok(()); }
547 }
548
549 #[cfg(debug_assertions)]
550 log::debug!("🔥 新 Hash: {} 来自 {}", hash_hex, addr);
551
552 let port = if let Some(implied) = args.implied_port {
554 if implied != 0 { addr.port() } else { args.port.unwrap_or(0) }
555 } else {
556 args.port.unwrap_or(addr.port())
557 };
558
559 if port > 0 {
560 let event = HashDiscovered {
561 info_hash: hash_hex,
562 peer_addr: SocketAddr::new(addr.ip(), port),
563 discovered_at: std::time::Instant::now(),
564 };
565
566 if let Err(_) = self.hash_tx.try_send(event) {
568 #[cfg(debug_assertions)]
569 log::debug!("⚠️ Hash 队列满,丢弃 hash");
570 }
571 }
572 }
573 Ok(())
574 }
575
576 async fn handle_response(&self, response: &DhtResponse) -> Result<()> {
577 if let Some(nodes_bytes) = &response.nodes {
579 self.process_compact_nodes(nodes_bytes);
580 }
581 if let Some(nodes6_bytes) = &response.nodes6 {
583 self.process_compact_nodes_v6(nodes6_bytes);
584 }
585 Ok(())
586 }
587
588 fn process_compact_nodes(&self, nodes_bytes: &[u8]) {
589 if self.options.netmode == NetMode::Ipv6Only {
591 return;
592 }
593
594 if nodes_bytes.len() % 26 != 0 { return; }
595
596 for chunk in nodes_bytes.chunks(26) {
598 let id = chunk[0..20].to_vec();
599 let port = u16::from_be_bytes([chunk[24], chunk[25]]);
600
601 let ip = std::net::Ipv4Addr::new(chunk[20], chunk[21], chunk[22], chunk[23]);
602 let addr = SocketAddr::new(std::net::IpAddr::V4(ip), port);
603
604 self.node_queue.push(NodeTuple { id, addr });
605 }
606 }
607
608 fn process_compact_nodes_v6(&self, nodes_bytes: &[u8]) {
609 if self.options.netmode == NetMode::Ipv4Only {
611 return;
612 }
613
614 if nodes_bytes.len() % 38 != 0 { return; }
615 for chunk in nodes_bytes.chunks(38) {
616 let id = chunk[0..20].to_vec();
617 let port = u16::from_be_bytes([chunk[36], chunk[37]]);
618 let ip_bytes: [u8; 16] = match chunk[20..36].try_into() {
619 Ok(b) => b,
620 Err(_) => continue, };
622 let ip = Ipv6Addr::from(ip_bytes);
623 if !ip.is_unspecified() && !ip.is_multicast() {
625 let addr = SocketAddr::new(IpAddr::V6(ip), port);
626 self.node_queue.push(NodeTuple { id, addr });
627 }
628 }
629 }
630
631 async fn send_response(
632 &self,
633 tid: &[u8],
634 addr: SocketAddr,
635 query_type: &str,
636 sender_id: Option<&[u8]>,
637 target_id_fallback: Option<&[u8]>,
638 ) -> Result<()> {
639 let mut r_dict = std::collections::HashMap::new();
640
641 let reference_id = sender_id.or(target_id_fallback);
642 let my_id = if let Some(target) = reference_id {
643 generate_neighbor_target(target, &self.node_id)
644 } else {
645 self.node_id.clone()
646 };
647
648 r_dict.insert(b"id".to_vec(), serde_bencode::value::Value::Bytes(my_id));
649 let token = self.generate_token(addr);
650 r_dict.insert(b"token".to_vec(), serde_bencode::value::Value::Bytes(token));
651
652 if query_type == "get_peers" || query_type == "find_node" {
653 let requestor_is_ipv6 = addr.is_ipv6();
655 let filter_ipv6 = match self.options.netmode {
656 NetMode::Ipv4Only => Some(false), NetMode::Ipv6Only => Some(true), NetMode::DualStack => Some(requestor_is_ipv6), };
660
661 let nodes = self.node_queue.get_random_nodes(8, filter_ipv6);
663
664 let mut nodes_data = Vec::new();
665 let mut nodes6_data = Vec::new();
666
667 for node in nodes {
668 match node.addr.ip() {
669 IpAddr::V4(ip) => {
671 nodes_data.extend_from_slice(&node.id);
672 nodes_data.extend_from_slice(&ip.octets());
673 nodes_data.extend_from_slice(&node.addr.port().to_be_bytes());
674 },
675 IpAddr::V6(ip) => {
677 nodes6_data.extend_from_slice(&node.id);
678 nodes6_data.extend_from_slice(&ip.octets());
679 nodes6_data.extend_from_slice(&node.addr.port().to_be_bytes());
680 },
681 }
682 }
683
684 if requestor_is_ipv6 {
687 if !nodes6_data.is_empty() {
689 r_dict.insert(b"nodes6".to_vec(), serde_bencode::value::Value::Bytes(nodes6_data));
690 }
691 } else {
692 if !nodes_data.is_empty() {
694 r_dict.insert(b"nodes".to_vec(), serde_bencode::value::Value::Bytes(nodes_data));
695 }
696 }
697 }
698
699 let mut response: std::collections::HashMap<String, serde_bencode::value::Value> = std::collections::HashMap::new();
700 response.insert("t".to_string(), serde_bencode::value::Value::Bytes(tid.to_vec()));
701 response.insert("y".to_string(), serde_bencode::value::Value::Bytes(b"r".to_vec()));
702 response.insert("r".to_string(), serde_bencode::value::Value::Dict(r_dict));
703
704 if let Ok(encoded) = serde_bencode::to_bytes(&response) {
705 let _ = self.select_socket(&addr).send_to(&encoded, addr).await;
706 }
707 Ok(())
708 }
709
710 async fn bootstrap(&self) {
711 let target = generate_random_id();
712 for node in BOOTSTRAP_NODES {
713 match tokio::net::lookup_host(node).await {
714 Ok(addrs) => {
715 for addr in addrs {
716 match self.options.netmode {
718 NetMode::Ipv4Only => {
719 if addr.is_ipv6() { continue; }
720 },
721 NetMode::Ipv6Only => {
722 if addr.is_ipv4() { continue; }
723 },
724 NetMode::DualStack => {
725 },
727 }
728 let _ = self.send_find_node(addr, &target, &self.node_id).await;
729 }
730 }
731 Err(_) => {}
732 }
733 }
734 }
735
736 async fn send_find_node(&self, addr: SocketAddr, target: &[u8], sender_id: &[u8]) -> Result<()> {
737 let mut args = std::collections::HashMap::new();
738 args.insert(b"id".to_vec(), serde_bencode::value::Value::Bytes(sender_id.to_vec()));
739 args.insert(b"target".to_vec(), serde_bencode::value::Value::Bytes(target.to_vec()));
740
741 let mut msg: std::collections::HashMap<String, serde_bencode::value::Value> = std::collections::HashMap::new();
742 msg.insert("t".to_string(), serde_bencode::value::Value::Bytes(vec![0, 1]));
743 msg.insert("y".to_string(), serde_bencode::value::Value::Bytes(b"q".to_vec()));
744 msg.insert("q".to_string(), serde_bencode::value::Value::Bytes(b"find_node".to_vec()));
745 msg.insert("a".to_string(), serde_bencode::value::Value::Dict(args));
746
747 if let Ok(encoded) = serde_bencode::to_bytes(&msg) {
748 let _ = self.select_socket(&addr).send_to(&encoded, addr).await;
749 }
750 Ok(())
751 }
752
753 fn generate_token(&self, addr: SocketAddr) -> Vec<u8> {
754
755 let mut hasher = AHasher::default();
756
757 match addr.ip() {
759 IpAddr::V4(ip) => ip.octets().hash(&mut hasher),
760 IpAddr::V6(ip) => ip.octets().hash(&mut hasher),
761 }
762
763 self.token_secret.hash(&mut hasher);
765
766 let hash = hasher.finish();
768 hash.to_le_bytes().to_vec()
769 }
770
771 fn validate_token(&self, token: &[u8], addr: SocketAddr) -> bool {
772 if token.len() != 8 {
773 return false;
774 }
775 let expected = self.generate_token(addr);
776 token == expected.as_slice()
777 }
778}
779
780fn generate_random_id() -> Vec<u8> {
781 let mut rng = rand::thread_rng();
782 (0..20).map(|_| rng.gen()).collect()
783}
784
785fn generate_neighbor_target(remote_id: &[u8], local_id: &[u8]) -> Vec<u8> {
786 let mut id = Vec::with_capacity(20);
787 let prefix_len = std::cmp::min(remote_id.len(), 6);
788 id.extend_from_slice(&remote_id[..prefix_len]);
789 if local_id.len() > prefix_len {
790 id.extend_from_slice(&local_id[prefix_len..]);
791 } else {
792 while id.len() < 20 {
793 id.push(rand::random());
794 }
795 }
796 id
797}