1use futures_timer::Delay;
2
3use libp2p::{
4 Multiaddr, PeerId, StreamProtocol,
5 core::Endpoint,
6 futures::FutureExt,
7 kad::{
8 Behaviour as Kademlia, BootstrapError, Config as KademliaConfig,
9 Event as KademliaEvent, GetClosestPeersError, GetClosestPeersOk,
10 K_VALUE, PeerInfo, QueryResult,
11 store::{MemoryStore, MemoryStoreConfig},
12 },
13 swarm::{
14 CloseConnection, ConnectionDenied, ConnectionId, FromSwarm,
15 NetworkBehaviour, THandler, ToSwarm,
16 },
17};
18use serde::{Deserialize, Serialize};
19
20use std::{
21 collections::{HashMap, VecDeque},
22 task::{Poll, Waker},
23 time::Duration,
24};
25
26use crate::{NodeType, utils::LimitsConfig};
27
28#[cfg(not(any(test, feature = "test")))]
29use crate::utils::{is_dns, is_global, is_loop_back, is_private, is_tcp};
30
/// Routing behaviour: wraps a Kademlia instance and adds random walks,
/// address filtering, queued connection closes and a per-peer failure counter.
pub struct Behaviour {
    /// `true` from construction until `finish_prerouting_state` is called;
    /// while set, `poll` does not service the random-walk timer.
    pre_routing: bool,

    /// Inner Kademlia behaviour that every swarm callback is forwarded to.
    kademlia: Kademlia<MemoryStore>,

    /// Waker stored by the last `poll` that returned `Pending`; it is woken
    /// when a connection close is queued through `new_close_connections`.
    waker: Option<Waker>,

    /// Timer for the next random walk; `None` when random walks are disabled.
    next_random_walk: Option<Delay>,

    /// Delay used when re-arming the random-walk timer. It doubles after each
    /// firing and is capped at 120 seconds.
    duration_to_next_kad: Duration,

    /// Number of established connections, maintained from
    /// `ConnectionEstablished` / `ConnectionClosed` swarm events.
    num_connections: u64,

    /// A random walk only issues a query while `num_connections` is strictly
    /// below this value.
    discovery_only_if_under_num: u64,

    /// Whether private addresses pass `is_invalid_address`.
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    allow_private_address_in_dht: bool,

    /// Whether DNS addresses pass `is_invalid_address`.
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    allow_dns_address_in_dht: bool,

    /// Whether loopback addresses pass `is_invalid_address`.
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    allow_loop_back_address_in_dht: bool,

    /// Pending connection closes; `poll` emits one per call. A `None`
    /// connection id means "close every connection to that peer".
    close_connections: VecDeque<(PeerId, Option<ConnectionId>)>,

    /// Per-peer counter bumped by `add_peer_to_remove`; when it reaches 3 the
    /// peer is removed from the routing table and the entry is dropped.
    peer_to_remove: HashMap<PeerId, u8>,
}
67
68impl Behaviour {
69 pub fn new(
71 peer_id: PeerId,
72 config: Config,
73 protocol: StreamProtocol,
74 node_type: NodeType,
75 limits: LimitsConfig,
76 ) -> Self {
77 let Config {
78 dht_random_walk,
79 discovery_only_if_under_num,
80 allow_dns_address_in_dht,
81 allow_private_address_in_dht,
82 allow_loop_back_address_in_dht,
83 kademlia_disjoint_query_paths,
84 } = config;
85
86 let mut kad_config = KademliaConfig::new(protocol);
87
88 kad_config.disjoint_query_paths(kademlia_disjoint_query_paths);
89 kad_config.set_query_timeout(Duration::from_secs(
90 limits.kademlia_query_timeout,
91 ));
92 kad_config.set_replication_interval(None);
93 kad_config.set_caching(libp2p::kad::Caching::Disabled);
94
95 kad_config.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual);
99 kad_config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
100
101 let store = MemoryStore::with_config(
102 peer_id,
103 MemoryStoreConfig {
104 max_records: 0,
105 max_value_bytes: 0,
106 max_providers_per_key: K_VALUE.get(),
107 max_provided_keys: 0,
108 },
109 );
110 let mut kad = Kademlia::with_config(peer_id, store, kad_config);
111
112 if let NodeType::Addressable | NodeType::Bootstrap = node_type {
113 kad.set_mode(Some(libp2p::kad::Mode::Server));
114 } else {
115 kad.set_mode(Some(libp2p::kad::Mode::Client));
116 }
117
118 Self {
119 kademlia: kad,
120 next_random_walk: if dht_random_walk
121 && node_type == NodeType::Bootstrap
122 {
123 Some(Delay::new(Duration::new(0, 0)))
124 } else {
125 None
126 },
127 duration_to_next_kad: Duration::from_secs(1),
128 num_connections: 0,
129 discovery_only_if_under_num,
130 allow_dns_address_in_dht,
131 allow_private_address_in_dht,
132 allow_loop_back_address_in_dht,
133 close_connections: VecDeque::new(),
134 pre_routing: true,
135 waker: None,
136 peer_to_remove: HashMap::new(),
137 }
138 }
139
140 pub fn new_close_connections(
141 &mut self,
142 peer_id: PeerId,
143 connection_id: Option<ConnectionId>,
144 ) {
145 self.close_connections.push_back((peer_id, connection_id));
146
147 if let Some(waker) = self.waker.take() {
148 waker.wake();
149 }
150 }
151
    /// Leaves the pre-routing state by clearing `pre_routing`, after which
    /// `poll` starts servicing the random-walk timer.
    ///
    /// NOTE(review): this does not wake the stored waker, so the change is
    /// only observed the next time the behaviour is polled — confirm that
    /// this latency is acceptable.
    pub const fn finish_prerouting_state(&mut self) {
        self.pre_routing = false;
    }
155
156 pub fn is_known_peer(&mut self, peer_id: &PeerId) -> bool {
158 for b in self.kademlia.kbuckets() {
159 if b.iter().any(|x| peer_id == x.node.key.preimage()) {
160 return true;
161 }
162 }
163
164 false
165 }
166
167 pub fn add_peer_to_remove(&mut self, peer_id: &PeerId) {
168 let count = self
169 .peer_to_remove
170 .entry(*peer_id)
171 .and_modify(|x| *x += 1)
172 .or_insert(1);
173
174 if *count >= 3 {
175 self.remove_node(peer_id);
176 self.clean_peer_to_remove(peer_id);
177 }
178 }
179
    /// Forgets the strike counter kept for `peer_id`, if any.
    pub fn clean_peer_to_remove(&mut self, peer_id: &PeerId) {
        self.peer_to_remove.remove(peer_id);
    }
183
184 pub fn add_self_reported_address(
190 &mut self,
191 peer_id: &PeerId,
192 addr: &Multiaddr,
193 ) -> bool {
194 if self.is_invalid_address(addr) {
195 return false;
196 }
197
198 self.kademlia.add_address(peer_id, addr.clone());
199 true
200 }
201
202 #[cfg(any(test, feature = "test"))]
203 pub fn is_invalid_address(&self, _addr: &Multiaddr) -> bool {
204 return false;
205 }
206
207 #[cfg(not(any(test, feature = "test")))]
208 pub fn is_invalid_address(&self, addr: &Multiaddr) -> bool {
209 {
210 if !is_tcp(addr) {
212 return true;
213 }
214
215 if is_private(addr) {
216 return !self.allow_private_address_in_dht;
217 }
218
219 if is_loop_back(addr) {
220 return !self.allow_loop_back_address_in_dht;
221 }
222
223 if is_dns(addr) {
224 return !self.allow_dns_address_in_dht;
225 }
226
227 !is_global(addr)
228 }
229 }
230
    /// Starts a Kademlia closest-peers query keyed by `peer_id`.
    ///
    /// The query id returned by Kademlia is discarded; the outcome is
    /// reported from `poll` as `Event::ClosestPeer`.
    pub fn discover(&mut self, peer_id: &PeerId) {
        self.kademlia.get_closest_peers(*peer_id);
    }
235
    /// Asks Kademlia to remove `peer_id`; the removed entry (if any) is
    /// discarded.
    pub fn remove_node(&mut self, peer_id: &PeerId) {
        self.kademlia.remove_peer(peer_id);
    }
240}
241
/// Events this behaviour reports to the swarm.
#[derive(Debug)]
pub enum Event {
    /// Outcome of a closest-peers query whose key decodes to a `PeerId`.
    ClosestPeer {
        /// Peer the query was keyed on.
        peer_id: PeerId,
        /// Entry for `peer_id` among the peers returned by the query;
        /// `None` when it was not among them or the query timed out.
        info: Option<PeerInfo>,
    },
}
251
252impl NetworkBehaviour for Behaviour {
253 type ConnectionHandler =
254 <Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler;
255 type ToSwarm = Event;
256
    /// Forwards unchanged to the inner Kademlia behaviour, which supplies the
    /// connection handler (or denies the connection).
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.kademlia.handle_established_inbound_connection(
            connection_id,
            peer,
            local_addr,
            remote_addr,
        )
    }
271
    /// Forwards unchanged to the inner Kademlia behaviour, which supplies the
    /// connection handler (or denies the connection).
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
        port_use: libp2p::core::transport::PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.kademlia.handle_established_outbound_connection(
            connection_id,
            peer,
            addr,
            role_override,
            port_use,
        )
    }
288
289 fn on_swarm_event(&mut self, event: FromSwarm) {
290 match event {
291 FromSwarm::AddressChange(..)
292 | FromSwarm::DialFailure(..)
293 | FromSwarm::ExpiredListenAddr(..)
294 | FromSwarm::ExternalAddrConfirmed(..)
295 | FromSwarm::ExternalAddrExpired(..)
296 | FromSwarm::ListenerClosed(..)
297 | FromSwarm::ListenFailure(..)
298 | FromSwarm::ListenerError(..)
299 | FromSwarm::NewListener(..)
300 | FromSwarm::NewListenAddr(..)
301 | FromSwarm::NewExternalAddrCandidate(..)
302 | FromSwarm::NewExternalAddrOfPeer(..) => {
303 self.kademlia.on_swarm_event(event);
304 }
305 FromSwarm::ConnectionEstablished(e) => {
306 self.num_connections += 1;
307 self.kademlia
308 .on_swarm_event(FromSwarm::ConnectionEstablished(e));
309 }
310 FromSwarm::ConnectionClosed(e) => {
311 self.num_connections = self.num_connections.saturating_sub(1);
312 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
313 }
314 _ => self.kademlia.on_swarm_event(event),
315 }
316 }
317
    /// Forwards the handler event unchanged to the inner Kademlia behaviour.
    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: libp2p::swarm::ConnectionId,
        event: libp2p::swarm::THandlerOutEvent<Self>,
    ) {
        self.kademlia.on_connection_handler_event(
            peer_id,
            connection_id,
            event,
        );
    }
330
331 fn poll(
332 &mut self,
333 cx: &mut std::task::Context<'_>,
334 ) -> std::task::Poll<
335 libp2p::swarm::ToSwarm<
336 Self::ToSwarm,
337 libp2p::swarm::THandlerInEvent<Self>,
338 >,
339 > {
340 if let Some((peer_id, connection_id)) =
341 self.close_connections.pop_front()
342 {
343 if let Some(connection_id) = connection_id {
344 return Poll::Ready(ToSwarm::CloseConnection {
345 peer_id,
346 connection: CloseConnection::One(connection_id),
347 });
348 } else {
349 return Poll::Ready(ToSwarm::CloseConnection {
350 peer_id,
351 connection: CloseConnection::All,
352 });
353 }
354 }
355
356 while let Poll::Ready(ev) = self.kademlia.poll(cx) {
357 match ev {
358 ToSwarm::GenerateEvent(ev) => {
359 match ev {
360 KademliaEvent::RoutablePeer { peer, address } => {
361 self.add_self_reported_address(&peer, &address);
362 }
363 KademliaEvent::ModeChanged { .. }
364 | KademliaEvent::InboundRequest { .. }
365 | KademliaEvent::UnroutablePeer { .. }
366 | KademliaEvent::PendingRoutablePeer { .. }
367 | KademliaEvent::RoutingUpdated { .. } => {
368 }
370 KademliaEvent::OutboundQueryProgressed {
371 result,
372 ..
373 } => {
374 match result {
375 QueryResult::Bootstrap(bootstrap_ok) => {
376 match bootstrap_ok {
377 Ok(ok) => {
378 self.clean_peer_to_remove(&ok.peer);
379 }
380 Err(e) => {
381 let BootstrapError::Timeout {
382 peer,
383 ..
384 } = e;
385 self.add_peer_to_remove(&peer);
386 }
387 };
388 }
389 QueryResult::GetClosestPeers(
390 get_closest_peers_ok,
391 ) => {
392 match get_closest_peers_ok {
394 Ok(GetClosestPeersOk {
395 key,
396 peers,
397 }) => {
398 if let Ok(peer_id) =
399 PeerId::from_bytes(&key)
400 {
401 if let Some(info) =
402 peers.iter().find(|x| {
403 x.peer_id == peer_id
404 })
405 {
406 return Poll::Ready(
407 ToSwarm::GenerateEvent(
408 Event::ClosestPeer {
409 peer_id,
410 info: Some(
411 info.clone(),
412 ),
413 },
414 ),
415 );
416 } else {
417 return Poll::Ready(
418 ToSwarm::GenerateEvent(
419 Event::ClosestPeer {
420 peer_id,
421 info: None,
422 },
423 ),
424 );
425 }
426 };
427 }
428 Err(
429 GetClosestPeersError::Timeout {
430 key,
431 ..
432 },
433 ) => {
434 if let Ok(peer_id) =
435 PeerId::from_bytes(&key)
436 {
437 return Poll::Ready(
438 ToSwarm::GenerateEvent(
439 Event::ClosestPeer {
440 peer_id,
441 info: None,
442 },
443 ),
444 );
445 };
446 }
447 }
448 }
449 QueryResult::GetProviders(..)
450 | QueryResult::StartProviding(..)
451 | QueryResult::RepublishProvider(..)
452 | QueryResult::GetRecord(..)
453 | QueryResult::PutRecord(..)
454 | QueryResult::RepublishRecord(..) => {
455 }
457 };
458 }
459 }
460 }
461 ToSwarm::Dial { opts } => {
462 return Poll::Ready(ToSwarm::Dial { opts });
463 }
464 ToSwarm::NotifyHandler {
465 peer_id,
466 handler,
467 event,
468 } => {
469 return Poll::Ready(ToSwarm::NotifyHandler {
470 peer_id,
471 handler,
472 event,
473 });
474 }
475 ToSwarm::CloseConnection {
476 peer_id,
477 connection,
478 } => {
479 return Poll::Ready(ToSwarm::CloseConnection {
480 peer_id,
481 connection,
482 });
483 }
484 ToSwarm::ExternalAddrConfirmed(e) => {
485 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(e));
486 }
487 ToSwarm::ExternalAddrExpired(e) => {
488 return Poll::Ready(ToSwarm::ExternalAddrExpired(e));
489 }
490 ToSwarm::ListenOn { opts } => {
491 return Poll::Ready(ToSwarm::ListenOn { opts });
492 }
493 ToSwarm::NewExternalAddrCandidate(e) => {
494 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(e));
495 }
496 ToSwarm::RemoveListener { id } => {
497 return Poll::Ready(ToSwarm::RemoveListener { id });
498 }
499 _ => {}
500 }
501 }
502
503 if !self.pre_routing
505 && let Some(next) = self.next_random_walk.as_mut()
506 && next.poll_unpin(cx).is_ready()
507 {
508 if self.num_connections < self.discovery_only_if_under_num {
509 self.kademlia.get_closest_peers(PeerId::random());
510 }
511
512 *next = Delay::new(self.duration_to_next_kad);
513 self.duration_to_next_kad = std::cmp::min(
514 self.duration_to_next_kad * 2,
515 Duration::from_secs(120),
516 );
517 }
518
519 self.waker = Some(cx.waker().clone());
520 Poll::Pending
521 }
522
    /// Forwards unchanged to the inner Kademlia behaviour.
    fn handle_pending_inbound_connection(
        &mut self,
        connection_id: libp2p::swarm::ConnectionId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<(), libp2p::swarm::ConnectionDenied> {
        self.kademlia.handle_pending_inbound_connection(
            connection_id,
            local_addr,
            remote_addr,
        )
    }
535
536 fn handle_pending_outbound_connection(
537 &mut self,
538 connection_id: libp2p::swarm::ConnectionId,
539 maybe_peer: Option<PeerId>,
540 addresses: &[Multiaddr],
541 effective_role: libp2p::core::Endpoint,
542 ) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
543 let addresses = self.kademlia.handle_pending_outbound_connection(
544 connection_id,
545 maybe_peer,
546 addresses,
547 effective_role,
548 )?;
549
550 let filter_addresses = addresses
551 .iter()
552 .filter(|x| !self.is_invalid_address(x))
553 .cloned()
554 .collect::<Vec<Multiaddr>>();
555
556 if filter_addresses.len() != addresses.len() {
557 if let Some(peer_id) = maybe_peer {
558 self.kademlia.remove_peer(&peer_id);
559 for addr in filter_addresses.iter() {
560 self.kademlia.add_address(&peer_id, addr.clone());
561 }
562 }
563 } else if filter_addresses.is_empty()
564 && let Some(peer_id) = maybe_peer
565 {
566 self.peer_to_remove.remove(&peer_id);
567 self.kademlia.remove_peer(&peer_id);
568 }
569
570 Ok(filter_addresses)
571 }
572}
573
/// Routing configuration.
///
/// With `#[serde(default)]`, fields missing from the serialized form take the
/// values of the `Default` implementation.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct Config {
    /// Enables the periodic random walk. `Behaviour::new` additionally
    /// requires the node to be a bootstrap node before arming it.
    dht_random_walk: bool,

    /// A random walk only issues a query while the number of established
    /// connections is below this value.
    discovery_only_if_under_num: u64,

    /// Whether private addresses are accepted by the address filter.
    allow_private_address_in_dht: bool,

    /// Whether DNS addresses are accepted by the address filter.
    allow_dns_address_in_dht: bool,

    /// Whether loopback addresses are accepted by the address filter.
    allow_loop_back_address_in_dht: bool,

    /// Passed to the Kademlia configuration's `disjoint_query_paths`.
    kademlia_disjoint_query_paths: bool,
}
593
594impl Default for Config {
595 fn default() -> Self {
596 Self {
597 dht_random_walk: true,
598 discovery_only_if_under_num: 25,
599 allow_private_address_in_dht: Default::default(),
600 allow_dns_address_in_dht: Default::default(),
601 allow_loop_back_address_in_dht: Default::default(),
602 kademlia_disjoint_query_paths: true,
603 }
604 }
605}
606
607impl Config {
608 pub const fn get_dht_random_walk(&self) -> bool {
610 self.dht_random_walk
611 }
612
613 pub const fn with_dht_random_walk(mut self, enable: bool) -> Self {
615 self.dht_random_walk = enable;
616 self
617 }
618
619 pub const fn get_discovery_limit(&self) -> u64 {
621 self.discovery_only_if_under_num
622 }
623
624 pub const fn with_discovery_limit(mut self, num: u64) -> Self {
626 self.discovery_only_if_under_num = num;
627 self
628 }
629
630 pub const fn get_allow_private_address_in_dht(&self) -> bool {
632 self.allow_private_address_in_dht
633 }
634
635 pub const fn with_allow_private_address_in_dht(
637 mut self,
638 allow: bool,
639 ) -> Self {
640 self.allow_private_address_in_dht = allow;
641 self
642 }
643
644 pub const fn get_allow_dns_address_in_dht(&self) -> bool {
646 self.allow_dns_address_in_dht
647 }
648
649 pub const fn with_allow_dns_address_in_dht(mut self, allow: bool) -> Self {
651 self.allow_dns_address_in_dht = allow;
652 self
653 }
654
655 pub const fn get_allow_loop_back_address_in_dht(&self) -> bool {
657 self.allow_loop_back_address_in_dht
658 }
659
660 pub const fn with_allow_loop_back_address_in_dht(
662 mut self,
663 allow: bool,
664 ) -> Self {
665 self.allow_loop_back_address_in_dht = allow;
666 self
667 }
668
669 pub const fn get_kademlia_disjoint_query_paths(&self) -> bool {
671 self.kademlia_disjoint_query_paths
672 }
673
674 pub const fn with_kademlia_disjoint_query_paths(
676 mut self,
677 enable: bool,
678 ) -> Self {
679 self.kademlia_disjoint_query_paths = enable;
680 self
681 }
682}
683
/// Serializable description of a routing node.
///
/// NOTE(review): not used in the visible part of this file; the field
/// meanings below are inferred from names and types — confirm against callers.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RoutingNode {
    /// Peer identifier in textual form (presumably a base58 `PeerId` — TODO confirm).
    pub peer_id: String,
    /// Addresses of the node in textual form (presumably multiaddrs — TODO confirm).
    pub address: Vec<String>,
}