1use futures_timer::Delay;
2
3use libp2p::{
4 Multiaddr, PeerId, StreamProtocol,
5 core::Endpoint,
6 futures::FutureExt,
7 kad::{
8 Behaviour as Kademlia, BootstrapError, Config as KademliaConfig,
9 Event as KademliaEvent, GetClosestPeersError, GetClosestPeersOk,
10 K_VALUE, PeerInfo, QueryResult,
11 store::{MemoryStore, MemoryStoreConfig},
12 },
13 swarm::{
14 CloseConnection, ConnectionDenied, ConnectionId, FromSwarm,
15 NetworkBehaviour, THandler, ToSwarm,
16 },
17};
18use serde::{Deserialize, Serialize};
19
20use std::{
21 collections::{HashMap, VecDeque},
22 task::{Poll, Waker},
23 time::Duration,
24};
25
26use crate::{NodeType, utils::LimitsConfig};
27
28#[cfg(not(any(test, feature = "test")))]
29use crate::utils::{is_dns, is_global, is_loop_back, is_private, is_tcp};
30
31pub struct Behaviour {
33 pre_routing: bool,
35
36 kademlia: Kademlia<MemoryStore>,
38
39 waker: Option<Waker>,
40
41 next_random_walk: Option<Delay>,
43
44 duration_to_next_kad: Duration,
46
47 num_connections: u64,
49
50 discovery_only_if_under_num: u64,
52
53 allow_private_address_in_dht: bool,
54
55 allow_dns_address_in_dht: bool,
56
57 allow_loop_back_address_in_dht: bool,
58
59 close_connections: VecDeque<(PeerId, Option<ConnectionId>)>,
61
62 peer_to_remove: HashMap<PeerId, u8>,
63}
64
65impl Behaviour {
66 pub fn new(
68 peer_id: PeerId,
69 config: Config,
70 protocol: StreamProtocol,
71 node_type: NodeType,
72 limits: LimitsConfig,
73 ) -> Self {
74 let Config {
75 dht_random_walk,
76 discovery_only_if_under_num,
77 allow_dns_address_in_dht,
78 allow_private_address_in_dht,
79 allow_loop_back_address_in_dht,
80 kademlia_disjoint_query_paths,
81 } = config;
82
83 let mut kad_config = KademliaConfig::new(protocol);
84
85 kad_config.disjoint_query_paths(kademlia_disjoint_query_paths);
86 kad_config.set_query_timeout(Duration::from_secs(
87 limits.kademlia_query_timeout,
88 ));
89 kad_config.set_replication_interval(None);
90 kad_config.set_caching(libp2p::kad::Caching::Disabled);
91
92 kad_config.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual);
96 kad_config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
97
98 let store = MemoryStore::with_config(
99 peer_id,
100 MemoryStoreConfig {
101 max_records: 0,
102 max_value_bytes: 0,
103 max_providers_per_key: K_VALUE.get(),
104 max_provided_keys: 0,
105 },
106 );
107 let mut kad = Kademlia::with_config(peer_id, store, kad_config);
108
109 if let NodeType::Addressable | NodeType::Bootstrap = node_type {
110 kad.set_mode(Some(libp2p::kad::Mode::Server));
111 } else {
112 kad.set_mode(Some(libp2p::kad::Mode::Client));
113 }
114
115 Self {
116 kademlia: kad,
117 next_random_walk: if dht_random_walk
118 && node_type == NodeType::Bootstrap
119 {
120 Some(Delay::new(Duration::new(0, 0)))
121 } else {
122 None
123 },
124 duration_to_next_kad: Duration::from_secs(1),
125 num_connections: 0,
126 discovery_only_if_under_num,
127 allow_dns_address_in_dht,
128 allow_private_address_in_dht,
129 allow_loop_back_address_in_dht,
130 close_connections: VecDeque::new(),
131 pre_routing: true,
132 waker: None,
133 peer_to_remove: HashMap::new(),
134 }
135 }
136
137 pub fn new_close_connections(
138 &mut self,
139 peer_id: PeerId,
140 connection_id: Option<ConnectionId>,
141 ) {
142 self.close_connections.push_back((peer_id, connection_id));
143
144 if let Some(waker) = self.waker.take() {
145 waker.wake();
146 }
147 }
148
149 pub const fn finish_prerouting_state(&mut self) {
150 self.pre_routing = false;
151 }
152
153 pub fn is_known_peer(&mut self, peer_id: &PeerId) -> bool {
155 for b in self.kademlia.kbuckets() {
156 if b.iter().any(|x| peer_id == x.node.key.preimage()) {
157 return true;
158 }
159 }
160
161 false
162 }
163
164 pub fn add_peer_to_remove(&mut self, peer_id: &PeerId) {
165 let count = self
166 .peer_to_remove
167 .entry(*peer_id)
168 .and_modify(|x| *x += 1)
169 .or_insert(1);
170
171 if *count >= 3 {
172 self.remove_node(peer_id);
173 self.clean_peer_to_remove(peer_id);
174 }
175 }
176
177 pub fn clean_peer_to_remove(&mut self, peer_id: &PeerId) {
178 self.peer_to_remove.remove(peer_id);
179 }
180
181 pub fn add_self_reported_address(
187 &mut self,
188 peer_id: &PeerId,
189 addr: &Multiaddr,
190 ) -> bool {
191 if self.is_invalid_address(addr) {
192 return false;
193 }
194
195 self.kademlia.add_address(peer_id, addr.clone());
196 true
197 }
198
199 pub fn is_invalid_address(&self, addr: &Multiaddr) -> bool {
200 #[cfg(not(any(test, feature = "test")))]
201 {
202 if !is_tcp(addr) {
204 return true;
205 }
206
207 if is_private(addr) {
208 return !self.allow_private_address_in_dht;
209 }
210
211 if is_loop_back(addr) {
212 return !self.allow_loop_back_address_in_dht;
213 }
214
215 if is_dns(addr) {
216 return !self.allow_dns_address_in_dht;
217 }
218
219 !is_global(addr)
220 }
221
222 #[cfg(any(test, feature = "test"))]
223 return false;
224 }
225
226 pub fn discover(&mut self, peer_id: &PeerId) {
228 self.kademlia.get_closest_peers(*peer_id);
229 }
230
231 pub fn remove_node(&mut self, peer_id: &PeerId) {
233 self.kademlia.remove_peer(peer_id);
234 }
235}
236
/// Events emitted to the swarm by the routing [`Behaviour`].
#[derive(Debug)]
pub enum Event {
    /// A closest-peers query for `peer_id` has finished.
    ClosestPeer {
        /// The peer that was searched for (decoded from the query key).
        peer_id: PeerId,
        /// Information about the peer when it was among the query results;
        /// `None` when it was not found or the query timed out.
        info: Option<PeerInfo>,
    },
}
246
247impl NetworkBehaviour for Behaviour {
248 type ConnectionHandler =
249 <Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler;
250 type ToSwarm = Event;
251
252 fn handle_established_inbound_connection(
253 &mut self,
254 connection_id: ConnectionId,
255 peer: PeerId,
256 local_addr: &Multiaddr,
257 remote_addr: &Multiaddr,
258 ) -> Result<THandler<Self>, ConnectionDenied> {
259 self.kademlia.handle_established_inbound_connection(
260 connection_id,
261 peer,
262 local_addr,
263 remote_addr,
264 )
265 }
266
267 fn handle_established_outbound_connection(
268 &mut self,
269 connection_id: ConnectionId,
270 peer: PeerId,
271 addr: &Multiaddr,
272 role_override: Endpoint,
273 port_use: libp2p::core::transport::PortUse,
274 ) -> Result<THandler<Self>, ConnectionDenied> {
275 self.kademlia.handle_established_outbound_connection(
276 connection_id,
277 peer,
278 addr,
279 role_override,
280 port_use,
281 )
282 }
283
284 fn on_swarm_event(&mut self, event: FromSwarm) {
285 match event {
286 FromSwarm::AddressChange(..)
287 | FromSwarm::DialFailure(..)
288 | FromSwarm::ExpiredListenAddr(..)
289 | FromSwarm::ExternalAddrConfirmed(..)
290 | FromSwarm::ExternalAddrExpired(..)
291 | FromSwarm::ListenerClosed(..)
292 | FromSwarm::ListenFailure(..)
293 | FromSwarm::ListenerError(..)
294 | FromSwarm::NewListener(..)
295 | FromSwarm::NewListenAddr(..)
296 | FromSwarm::NewExternalAddrCandidate(..)
297 | FromSwarm::NewExternalAddrOfPeer(..) => {
298 self.kademlia.on_swarm_event(event);
299 }
300 FromSwarm::ConnectionEstablished(e) => {
301 self.num_connections += 1;
302 self.kademlia
303 .on_swarm_event(FromSwarm::ConnectionEstablished(e));
304 }
305 FromSwarm::ConnectionClosed(e) => {
306 self.num_connections = self.num_connections.saturating_sub(1);
307 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
308 }
309 _ => self.kademlia.on_swarm_event(event),
310 }
311 }
312
313 fn on_connection_handler_event(
314 &mut self,
315 peer_id: PeerId,
316 connection_id: libp2p::swarm::ConnectionId,
317 event: libp2p::swarm::THandlerOutEvent<Self>,
318 ) {
319 self.kademlia.on_connection_handler_event(
320 peer_id,
321 connection_id,
322 event,
323 );
324 }
325
326 fn poll(
327 &mut self,
328 cx: &mut std::task::Context<'_>,
329 ) -> std::task::Poll<
330 libp2p::swarm::ToSwarm<
331 Self::ToSwarm,
332 libp2p::swarm::THandlerInEvent<Self>,
333 >,
334 > {
335 if let Some((peer_id, connection_id)) =
336 self.close_connections.pop_front()
337 {
338 if let Some(connection_id) = connection_id {
339 return Poll::Ready(ToSwarm::CloseConnection {
340 peer_id,
341 connection: CloseConnection::One(connection_id),
342 });
343 } else {
344 return Poll::Ready(ToSwarm::CloseConnection {
345 peer_id,
346 connection: CloseConnection::All,
347 });
348 }
349 }
350
351 while let Poll::Ready(ev) = self.kademlia.poll(cx) {
352 match ev {
353 ToSwarm::GenerateEvent(ev) => {
354 match ev {
355 KademliaEvent::RoutablePeer { peer, address } => {
356 self.add_self_reported_address(&peer, &address);
357 }
358 KademliaEvent::ModeChanged { .. }
359 | KademliaEvent::InboundRequest { .. }
360 | KademliaEvent::UnroutablePeer { .. }
361 | KademliaEvent::PendingRoutablePeer { .. }
362 | KademliaEvent::RoutingUpdated { .. } => {
363 }
365 KademliaEvent::OutboundQueryProgressed {
366 result,
367 ..
368 } => {
369 match result {
370 QueryResult::Bootstrap(bootstrap_ok) => {
371 match bootstrap_ok {
372 Ok(ok) => {
373 self.clean_peer_to_remove(&ok.peer);
374 }
375 Err(e) => {
376 let BootstrapError::Timeout {
377 peer,
378 ..
379 } = e;
380 self.add_peer_to_remove(&peer);
381 }
382 };
383 }
384 QueryResult::GetClosestPeers(
385 get_closest_peers_ok,
386 ) => {
387 match get_closest_peers_ok {
389 Ok(GetClosestPeersOk {
390 key,
391 peers,
392 }) => {
393 if let Ok(peer_id) =
394 PeerId::from_bytes(&key)
395 {
396 if let Some(info) =
397 peers.iter().find(|x| {
398 x.peer_id == peer_id
399 })
400 {
401 return Poll::Ready(
402 ToSwarm::GenerateEvent(
403 Event::ClosestPeer {
404 peer_id,
405 info: Some(
406 info.clone(),
407 ),
408 },
409 ),
410 );
411 } else {
412 return Poll::Ready(
413 ToSwarm::GenerateEvent(
414 Event::ClosestPeer {
415 peer_id,
416 info: None,
417 },
418 ),
419 );
420 }
421 };
422 }
423 Err(
424 GetClosestPeersError::Timeout {
425 key,
426 ..
427 },
428 ) => {
429 if let Ok(peer_id) =
430 PeerId::from_bytes(&key)
431 {
432 return Poll::Ready(
433 ToSwarm::GenerateEvent(
434 Event::ClosestPeer {
435 peer_id,
436 info: None,
437 },
438 ),
439 );
440 };
441 }
442 }
443 }
444 QueryResult::GetProviders(..)
445 | QueryResult::StartProviding(..)
446 | QueryResult::RepublishProvider(..)
447 | QueryResult::GetRecord(..)
448 | QueryResult::PutRecord(..)
449 | QueryResult::RepublishRecord(..) => {
450 }
452 };
453 }
454 }
455 }
456 ToSwarm::Dial { opts } => {
457 return Poll::Ready(ToSwarm::Dial { opts });
458 }
459 ToSwarm::NotifyHandler {
460 peer_id,
461 handler,
462 event,
463 } => {
464 return Poll::Ready(ToSwarm::NotifyHandler {
465 peer_id,
466 handler,
467 event,
468 });
469 }
470 ToSwarm::CloseConnection {
471 peer_id,
472 connection,
473 } => {
474 return Poll::Ready(ToSwarm::CloseConnection {
475 peer_id,
476 connection,
477 });
478 }
479 ToSwarm::ExternalAddrConfirmed(e) => {
480 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(e));
481 }
482 ToSwarm::ExternalAddrExpired(e) => {
483 return Poll::Ready(ToSwarm::ExternalAddrExpired(e));
484 }
485 ToSwarm::ListenOn { opts } => {
486 return Poll::Ready(ToSwarm::ListenOn { opts });
487 }
488 ToSwarm::NewExternalAddrCandidate(e) => {
489 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(e));
490 }
491 ToSwarm::RemoveListener { id } => {
492 return Poll::Ready(ToSwarm::RemoveListener { id });
493 }
494 _ => {}
495 }
496 }
497
498 if !self.pre_routing
500 && let Some(next) = self.next_random_walk.as_mut()
501 && next.poll_unpin(cx).is_ready()
502 {
503 if self.num_connections < self.discovery_only_if_under_num {
504 self.kademlia.get_closest_peers(PeerId::random());
505 }
506
507 *next = Delay::new(self.duration_to_next_kad);
508 self.duration_to_next_kad = std::cmp::min(
509 self.duration_to_next_kad * 2,
510 Duration::from_secs(120),
511 );
512 }
513
514 self.waker = Some(cx.waker().clone());
515 Poll::Pending
516 }
517
518 fn handle_pending_inbound_connection(
519 &mut self,
520 connection_id: libp2p::swarm::ConnectionId,
521 local_addr: &Multiaddr,
522 remote_addr: &Multiaddr,
523 ) -> Result<(), libp2p::swarm::ConnectionDenied> {
524 self.kademlia.handle_pending_inbound_connection(
525 connection_id,
526 local_addr,
527 remote_addr,
528 )
529 }
530
531 fn handle_pending_outbound_connection(
532 &mut self,
533 connection_id: libp2p::swarm::ConnectionId,
534 maybe_peer: Option<PeerId>,
535 addresses: &[Multiaddr],
536 effective_role: libp2p::core::Endpoint,
537 ) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
538 let addresses = self.kademlia.handle_pending_outbound_connection(
539 connection_id,
540 maybe_peer,
541 addresses,
542 effective_role,
543 )?;
544
545 let filter_addresses = addresses
546 .iter()
547 .filter(|x| !self.is_invalid_address(x))
548 .cloned()
549 .collect::<Vec<Multiaddr>>();
550
551 if filter_addresses.len() != addresses.len() {
552 if let Some(peer_id) = maybe_peer {
553 self.kademlia.remove_peer(&peer_id);
554 for addr in filter_addresses.iter() {
555 self.kademlia.add_address(&peer_id, addr.clone());
556 }
557 }
558 } else if filter_addresses.is_empty()
559 && let Some(peer_id) = maybe_peer
560 {
561 self.peer_to_remove.remove(&peer_id);
562 self.kademlia.remove_peer(&peer_id);
563 }
564
565 Ok(filter_addresses)
566 }
567}
568
/// Configuration of the routing [`Behaviour`].
///
/// Fields missing from serialized input take the values of
/// `Config::default()` (`#[serde(default)]`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct Config {
    /// Whether random walks are enabled. They are only scheduled on
    /// bootstrap nodes.
    dht_random_walk: bool,

    /// A random walk only issues a query while the number of established
    /// connections is below this value.
    discovery_only_if_under_num: u64,

    /// Whether private addresses are accepted for routing.
    allow_private_address_in_dht: bool,

    /// Whether DNS addresses are accepted for routing.
    allow_dns_address_in_dht: bool,

    /// Whether loopback addresses are accepted for routing.
    allow_loop_back_address_in_dht: bool,

    /// Passed to the Kademlia configuration's `disjoint_query_paths`.
    kademlia_disjoint_query_paths: bool,
}
588
589impl Default for Config {
590 fn default() -> Self {
591 Self {
592 dht_random_walk: true,
593 discovery_only_if_under_num: 25,
594 allow_private_address_in_dht: Default::default(),
595 allow_dns_address_in_dht: Default::default(),
596 allow_loop_back_address_in_dht: Default::default(),
597 kademlia_disjoint_query_paths: true,
598 }
599 }
600}
601
602impl Config {
603 pub const fn get_dht_random_walk(&self) -> bool {
605 self.dht_random_walk
606 }
607
608 pub const fn with_dht_random_walk(mut self, enable: bool) -> Self {
610 self.dht_random_walk = enable;
611 self
612 }
613
614 pub const fn get_discovery_limit(&self) -> u64 {
616 self.discovery_only_if_under_num
617 }
618
619 pub const fn with_discovery_limit(mut self, num: u64) -> Self {
621 self.discovery_only_if_under_num = num;
622 self
623 }
624
625 pub const fn get_allow_private_address_in_dht(&self) -> bool {
627 self.allow_private_address_in_dht
628 }
629
630 pub const fn with_allow_private_address_in_dht(
632 mut self,
633 allow: bool,
634 ) -> Self {
635 self.allow_private_address_in_dht = allow;
636 self
637 }
638
639 pub const fn get_allow_dns_address_in_dht(&self) -> bool {
641 self.allow_dns_address_in_dht
642 }
643
644 pub const fn with_allow_dns_address_in_dht(mut self, allow: bool) -> Self {
646 self.allow_dns_address_in_dht = allow;
647 self
648 }
649
650 pub const fn get_allow_loop_back_address_in_dht(&self) -> bool {
652 self.allow_loop_back_address_in_dht
653 }
654
655 pub const fn with_allow_loop_back_address_in_dht(
657 mut self,
658 allow: bool,
659 ) -> Self {
660 self.allow_loop_back_address_in_dht = allow;
661 self
662 }
663
664 pub const fn get_kademlia_disjoint_query_paths(&self) -> bool {
666 self.kademlia_disjoint_query_paths
667 }
668
669 pub const fn with_kademlia_disjoint_query_paths(
671 mut self,
672 enable: bool,
673 ) -> Self {
674 self.kademlia_disjoint_query_paths = enable;
675 self
676 }
677}
678
/// Serializable description of a routing node.
///
/// NOTE(review): not used in this part of the file; presumably describes a
/// known/boot node by textual peer id and addresses — confirm against callers.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RoutingNode {
    /// Peer identifier in string form.
    pub peer_id: String,
    /// Addresses of the node in string form (presumably multiaddrs — TODO confirm).
    pub address: Vec<String>,
}