Skip to main content

ave_network/
routing.rs

1use futures_timer::Delay;
2
3use libp2p::{
4    Multiaddr, PeerId, StreamProtocol,
5    core::Endpoint,
6    futures::FutureExt,
7    kad::{
8        Behaviour as Kademlia, BootstrapError, Config as KademliaConfig,
9        Event as KademliaEvent, GetClosestPeersError, GetClosestPeersOk,
10        K_VALUE, PeerInfo, QueryResult,
11        store::{MemoryStore, MemoryStoreConfig},
12    },
13    swarm::{
14        CloseConnection, ConnectionDenied, ConnectionId, FromSwarm,
15        NetworkBehaviour, THandler, ToSwarm,
16    },
17};
18use serde::{Deserialize, Serialize};
19
20use std::{
21    collections::{HashMap, VecDeque},
22    task::{Poll, Waker},
23    time::Duration,
24};
25
26use crate::{NodeType, utils::LimitsConfig};
27
28#[cfg(not(any(test, feature = "test")))]
29use crate::utils::{is_dns, is_global, is_loop_back, is_private, is_tcp};
30
31/// The discovery behaviour.
32pub struct Behaviour {
33    /// Boolean that activates the random walk if the node has already finished the initial pre-routing phase.
34    pre_routing: bool,
35
36    /// Kademlia behavior.
37    kademlia: Kademlia<MemoryStore>,
38
39    waker: Option<Waker>,
40
41    /// The next random walk in the Kademlia DHT. `None` if random walks are disabled.
42    next_random_walk: Option<Delay>,
43
44    /// Duration between random walks.
45    duration_to_next_kad: Duration,
46
47    /// Number of nodes we're currently connected to.
48    num_connections: u64,
49
50    /// Number of active connections over which we interrupt the discovery process.
51    discovery_only_if_under_num: u64,
52
53    allow_private_address_in_dht: bool,
54
55    allow_dns_address_in_dht: bool,
56
57    allow_loop_back_address_in_dht: bool,
58
59    /// Peers to close connection
60    close_connections: VecDeque<(PeerId, Option<ConnectionId>)>,
61
62    peer_to_remove: HashMap<PeerId, u8>,
63}
64
65impl Behaviour {
66    /// Creates a new routing `Behaviour`.
67    pub fn new(
68        peer_id: PeerId,
69        config: Config,
70        protocol: StreamProtocol,
71        node_type: NodeType,
72        limits: LimitsConfig,
73    ) -> Self {
74        let Config {
75            dht_random_walk,
76            discovery_only_if_under_num,
77            allow_dns_address_in_dht,
78            allow_private_address_in_dht,
79            allow_loop_back_address_in_dht,
80            kademlia_disjoint_query_paths,
81        } = config;
82
83        let mut kad_config = KademliaConfig::new(protocol);
84
85        kad_config.disjoint_query_paths(kademlia_disjoint_query_paths);
86        kad_config.set_query_timeout(Duration::from_secs(
87            limits.kademlia_query_timeout,
88        ));
89        kad_config.set_replication_interval(None);
90        kad_config.set_caching(libp2p::kad::Caching::Disabled);
91
92        // By default Kademlia attempts to insert all peers into its routing table once a
93        // dialing attempt succeeds. In order to control which peer is added, disable the
94        // auto-insertion and instead add peers manually.
95        kad_config.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual);
96        kad_config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
97
98        let store = MemoryStore::with_config(
99            peer_id,
100            MemoryStoreConfig {
101                max_records: 0,
102                max_value_bytes: 0,
103                max_providers_per_key: K_VALUE.get(),
104                max_provided_keys: 0,
105            },
106        );
107        let mut kad = Kademlia::with_config(peer_id, store, kad_config);
108
109        if let NodeType::Addressable | NodeType::Bootstrap = node_type {
110            kad.set_mode(Some(libp2p::kad::Mode::Server));
111        } else {
112            kad.set_mode(Some(libp2p::kad::Mode::Client));
113        }
114
115        Self {
116            kademlia: kad,
117            next_random_walk: if dht_random_walk
118                && node_type == NodeType::Bootstrap
119            {
120                Some(Delay::new(Duration::new(0, 0)))
121            } else {
122                None
123            },
124            duration_to_next_kad: Duration::from_secs(1),
125            num_connections: 0,
126            discovery_only_if_under_num,
127            allow_dns_address_in_dht,
128            allow_private_address_in_dht,
129            allow_loop_back_address_in_dht,
130            close_connections: VecDeque::new(),
131            pre_routing: true,
132            waker: None,
133            peer_to_remove: HashMap::new(),
134        }
135    }
136
137    pub fn new_close_connections(
138        &mut self,
139        peer_id: PeerId,
140        connection_id: Option<ConnectionId>,
141    ) {
142        self.close_connections.push_back((peer_id, connection_id));
143
144        if let Some(waker) = self.waker.take() {
145            waker.wake();
146        }
147    }
148
149    pub const fn finish_prerouting_state(&mut self) {
150        self.pre_routing = false;
151    }
152
153    /// Returns true if the given peer is known.
154    pub fn is_known_peer(&mut self, peer_id: &PeerId) -> bool {
155        for b in self.kademlia.kbuckets() {
156            if b.iter().any(|x| peer_id == x.node.key.preimage()) {
157                return true;
158            }
159        }
160
161        false
162    }
163
164    pub fn add_peer_to_remove(&mut self, peer_id: &PeerId) {
165        let count = self
166            .peer_to_remove
167            .entry(*peer_id)
168            .and_modify(|x| *x += 1)
169            .or_insert(1);
170
171        if *count >= 3 {
172            self.remove_node(peer_id);
173            self.clean_peer_to_remove(peer_id);
174        }
175    }
176
177    pub fn clean_peer_to_remove(&mut self, peer_id: &PeerId) {
178        self.peer_to_remove.remove(peer_id);
179    }
180
181    /// Add a self-reported address of a remote peer to the k-buckets of the DHT
182    /// if it has compatible `supported_protocols`.
183    ///
184    /// **Note**: It is important that you call this method. The discovery mechanism will not
185    /// automatically add connecting peers to the Kademlia k-buckets.
186    pub fn add_self_reported_address(
187        &mut self,
188        peer_id: &PeerId,
189        addr: &Multiaddr,
190    ) -> bool {
191        if self.is_invalid_address(addr) {
192            return false;
193        }
194
195        self.kademlia.add_address(peer_id, addr.clone());
196        true
197    }
198
199    pub fn is_invalid_address(&self, addr: &Multiaddr) -> bool {
200        #[cfg(not(any(test, feature = "test")))]
201        {
202            // Our transport is TPC only
203            if !is_tcp(addr) {
204                return true;
205            }
206
207            if is_private(addr) {
208                return !self.allow_private_address_in_dht;
209            }
210
211            if is_loop_back(addr) {
212                return !self.allow_loop_back_address_in_dht;
213            }
214
215            if is_dns(addr) {
216                return !self.allow_dns_address_in_dht;
217            }
218
219            !is_global(addr)
220        }
221
222        #[cfg(any(test, feature = "test"))]
223        return false;
224    }
225
226    /// Discover closet peers to the given `PeerId`.
227    pub fn discover(&mut self, peer_id: &PeerId) {
228        self.kademlia.get_closest_peers(*peer_id);
229    }
230
231    /// Remove node from the DHT.
232    pub fn remove_node(&mut self, peer_id: &PeerId) {
233        self.kademlia.remove_peer(peer_id);
234    }
235}
236
237/// Event generated by the `DiscoveryBehaviour`.
238#[derive(Debug)]
239pub enum Event {
240    /// Closest peers to a given key have been found
241    ClosestPeer {
242        peer_id: PeerId,
243        info: Option<PeerInfo>,
244    },
245}
246
247impl NetworkBehaviour for Behaviour {
248    type ConnectionHandler =
249        <Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler;
250    type ToSwarm = Event;
251
252    fn handle_established_inbound_connection(
253        &mut self,
254        connection_id: ConnectionId,
255        peer: PeerId,
256        local_addr: &Multiaddr,
257        remote_addr: &Multiaddr,
258    ) -> Result<THandler<Self>, ConnectionDenied> {
259        self.kademlia.handle_established_inbound_connection(
260            connection_id,
261            peer,
262            local_addr,
263            remote_addr,
264        )
265    }
266
267    fn handle_established_outbound_connection(
268        &mut self,
269        connection_id: ConnectionId,
270        peer: PeerId,
271        addr: &Multiaddr,
272        role_override: Endpoint,
273        port_use: libp2p::core::transport::PortUse,
274    ) -> Result<THandler<Self>, ConnectionDenied> {
275        self.kademlia.handle_established_outbound_connection(
276            connection_id,
277            peer,
278            addr,
279            role_override,
280            port_use,
281        )
282    }
283
284    fn on_swarm_event(&mut self, event: FromSwarm) {
285        match event {
286            FromSwarm::AddressChange(..)
287            | FromSwarm::DialFailure(..)
288            | FromSwarm::ExpiredListenAddr(..)
289            | FromSwarm::ExternalAddrConfirmed(..)
290            | FromSwarm::ExternalAddrExpired(..)
291            | FromSwarm::ListenerClosed(..)
292            | FromSwarm::ListenFailure(..)
293            | FromSwarm::ListenerError(..)
294            | FromSwarm::NewListener(..)
295            | FromSwarm::NewListenAddr(..)
296            | FromSwarm::NewExternalAddrCandidate(..)
297            | FromSwarm::NewExternalAddrOfPeer(..) => {
298                self.kademlia.on_swarm_event(event);
299            }
300            FromSwarm::ConnectionEstablished(e) => {
301                self.num_connections += 1;
302                self.kademlia
303                    .on_swarm_event(FromSwarm::ConnectionEstablished(e));
304            }
305            FromSwarm::ConnectionClosed(e) => {
306                self.num_connections = self.num_connections.saturating_sub(1);
307                self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
308            }
309            _ => self.kademlia.on_swarm_event(event),
310        }
311    }
312
313    fn on_connection_handler_event(
314        &mut self,
315        peer_id: PeerId,
316        connection_id: libp2p::swarm::ConnectionId,
317        event: libp2p::swarm::THandlerOutEvent<Self>,
318    ) {
319        self.kademlia.on_connection_handler_event(
320            peer_id,
321            connection_id,
322            event,
323        );
324    }
325
326    fn poll(
327        &mut self,
328        cx: &mut std::task::Context<'_>,
329    ) -> std::task::Poll<
330        libp2p::swarm::ToSwarm<
331            Self::ToSwarm,
332            libp2p::swarm::THandlerInEvent<Self>,
333        >,
334    > {
335        if let Some((peer_id, connection_id)) =
336            self.close_connections.pop_front()
337        {
338            if let Some(connection_id) = connection_id {
339                return Poll::Ready(ToSwarm::CloseConnection {
340                    peer_id,
341                    connection: CloseConnection::One(connection_id),
342                });
343            } else {
344                return Poll::Ready(ToSwarm::CloseConnection {
345                    peer_id,
346                    connection: CloseConnection::All,
347                });
348            }
349        }
350
351        while let Poll::Ready(ev) = self.kademlia.poll(cx) {
352            match ev {
353                ToSwarm::GenerateEvent(ev) => {
354                    match ev {
355                        KademliaEvent::RoutablePeer { peer, address } => {
356                            self.add_self_reported_address(&peer, &address);
357                        }
358                        KademliaEvent::ModeChanged { .. }
359                        | KademliaEvent::InboundRequest { .. }
360                        | KademliaEvent::UnroutablePeer { .. }
361                        | KademliaEvent::PendingRoutablePeer { .. }
362                        | KademliaEvent::RoutingUpdated { .. } => {
363                            // We are not interested in this event at the moment.
364                        }
365                        KademliaEvent::OutboundQueryProgressed {
366                            result,
367                            ..
368                        } => {
369                            match result {
370                                QueryResult::Bootstrap(bootstrap_ok) => {
371                                    match bootstrap_ok {
372                                        Ok(ok) => {
373                                            self.clean_peer_to_remove(&ok.peer);
374                                        }
375                                        Err(e) => {
376                                            let BootstrapError::Timeout {
377                                                peer,
378                                                ..
379                                            } = e;
380                                            self.add_peer_to_remove(&peer);
381                                        }
382                                    };
383                                }
384                                QueryResult::GetClosestPeers(
385                                    get_closest_peers_ok,
386                                ) => {
387                                    // kademlia.get_closest_peers(PeerId)
388                                    match get_closest_peers_ok {
389                                        Ok(GetClosestPeersOk {
390                                            key,
391                                            peers,
392                                        }) => {
393                                            if let Ok(peer_id) =
394                                                PeerId::from_bytes(&key)
395                                            {
396                                                if let Some(info) =
397                                                    peers.iter().find(|x| {
398                                                        x.peer_id == peer_id
399                                                    })
400                                                {
401                                                    return Poll::Ready(
402                                                    ToSwarm::GenerateEvent(
403                                                        Event::ClosestPeer {
404                                                            peer_id,
405                                                            info: Some(
406                                                                info.clone(),
407                                                            ),
408                                                        },
409                                                    ),
410                                                );
411                                                } else {
412                                                    return Poll::Ready(
413                                                    ToSwarm::GenerateEvent(
414                                                        Event::ClosestPeer {
415                                                            peer_id,
416                                                            info: None,
417                                                        },
418                                                    ),
419                                                );
420                                                }
421                                            };
422                                        }
423                                        Err(
424                                            GetClosestPeersError::Timeout {
425                                                key,
426                                                ..
427                                            },
428                                        ) => {
429                                            if let Ok(peer_id) =
430                                                PeerId::from_bytes(&key)
431                                            {
432                                                return Poll::Ready(
433                                                    ToSwarm::GenerateEvent(
434                                                        Event::ClosestPeer {
435                                                            peer_id,
436                                                            info: None,
437                                                        },
438                                                    ),
439                                                );
440                                            };
441                                        }
442                                    }
443                                }
444                                QueryResult::GetProviders(..)
445                                | QueryResult::StartProviding(..)
446                                | QueryResult::RepublishProvider(..)
447                                | QueryResult::GetRecord(..)
448                                | QueryResult::PutRecord(..)
449                                | QueryResult::RepublishRecord(..) => {
450                                    // We are not interested in this event at the moment.
451                                }
452                            };
453                        }
454                    }
455                }
456                ToSwarm::Dial { opts } => {
457                    return Poll::Ready(ToSwarm::Dial { opts });
458                }
459                ToSwarm::NotifyHandler {
460                    peer_id,
461                    handler,
462                    event,
463                } => {
464                    return Poll::Ready(ToSwarm::NotifyHandler {
465                        peer_id,
466                        handler,
467                        event,
468                    });
469                }
470                ToSwarm::CloseConnection {
471                    peer_id,
472                    connection,
473                } => {
474                    return Poll::Ready(ToSwarm::CloseConnection {
475                        peer_id,
476                        connection,
477                    });
478                }
479                ToSwarm::ExternalAddrConfirmed(e) => {
480                    return Poll::Ready(ToSwarm::ExternalAddrConfirmed(e));
481                }
482                ToSwarm::ExternalAddrExpired(e) => {
483                    return Poll::Ready(ToSwarm::ExternalAddrExpired(e));
484                }
485                ToSwarm::ListenOn { opts } => {
486                    return Poll::Ready(ToSwarm::ListenOn { opts });
487                }
488                ToSwarm::NewExternalAddrCandidate(e) => {
489                    return Poll::Ready(ToSwarm::NewExternalAddrCandidate(e));
490                }
491                ToSwarm::RemoveListener { id } => {
492                    return Poll::Ready(ToSwarm::RemoveListener { id });
493                }
494                _ => {}
495            }
496        }
497
498        // Poll the stream that fires when we need to start a random Kademlia query.
499        if !self.pre_routing
500            && let Some(next) = self.next_random_walk.as_mut()
501            && next.poll_unpin(cx).is_ready()
502        {
503            if self.num_connections < self.discovery_only_if_under_num {
504                self.kademlia.get_closest_peers(PeerId::random());
505            }
506
507            *next = Delay::new(self.duration_to_next_kad);
508            self.duration_to_next_kad = std::cmp::min(
509                self.duration_to_next_kad * 2,
510                Duration::from_secs(120),
511            );
512        }
513
514        self.waker = Some(cx.waker().clone());
515        Poll::Pending
516    }
517
518    fn handle_pending_inbound_connection(
519        &mut self,
520        connection_id: libp2p::swarm::ConnectionId,
521        local_addr: &Multiaddr,
522        remote_addr: &Multiaddr,
523    ) -> Result<(), libp2p::swarm::ConnectionDenied> {
524        self.kademlia.handle_pending_inbound_connection(
525            connection_id,
526            local_addr,
527            remote_addr,
528        )
529    }
530
531    fn handle_pending_outbound_connection(
532        &mut self,
533        connection_id: libp2p::swarm::ConnectionId,
534        maybe_peer: Option<PeerId>,
535        addresses: &[Multiaddr],
536        effective_role: libp2p::core::Endpoint,
537    ) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
538        let addresses = self.kademlia.handle_pending_outbound_connection(
539            connection_id,
540            maybe_peer,
541            addresses,
542            effective_role,
543        )?;
544
545        let filter_addresses = addresses
546            .iter()
547            .filter(|x| !self.is_invalid_address(x))
548            .cloned()
549            .collect::<Vec<Multiaddr>>();
550
551        if filter_addresses.len() != addresses.len() {
552            if let Some(peer_id) = maybe_peer {
553                self.kademlia.remove_peer(&peer_id);
554                for addr in filter_addresses.iter() {
555                    self.kademlia.add_address(&peer_id, addr.clone());
556                }
557            }
558        } else if filter_addresses.is_empty()
559            && let Some(peer_id) = maybe_peer
560        {
561            self.peer_to_remove.remove(&peer_id);
562            self.kademlia.remove_peer(&peer_id);
563        }
564
565        Ok(filter_addresses)
566    }
567}
568
569/// Configuration for the routing behaviour.
570#[derive(Clone, Debug, Deserialize, Serialize)]
571#[serde(default)]
572pub struct Config {
573    /// Whether to enable random walks in the Kademlia DHT.
574    dht_random_walk: bool,
575
576    /// Number of active connections over which we interrupt the discovery process.
577    discovery_only_if_under_num: u64,
578
579    allow_private_address_in_dht: bool,
580
581    allow_dns_address_in_dht: bool,
582
583    allow_loop_back_address_in_dht: bool,
584
585    /// When enabled the number of disjoint paths used equals the configured parallelism.
586    kademlia_disjoint_query_paths: bool,
587}
588
589impl Default for Config {
590    fn default() -> Self {
591        Self {
592            dht_random_walk: true,
593            discovery_only_if_under_num: 25,
594            allow_private_address_in_dht: Default::default(),
595            allow_dns_address_in_dht: Default::default(),
596            allow_loop_back_address_in_dht: Default::default(),
597            kademlia_disjoint_query_paths: true,
598        }
599    }
600}
601
602impl Config {
603    /// Get DHT random walk.
604    pub const fn get_dht_random_walk(&self) -> bool {
605        self.dht_random_walk
606    }
607
608    /// Enables or disables random walks in the Kademlia DHT.
609    pub const fn with_dht_random_walk(mut self, enable: bool) -> Self {
610        self.dht_random_walk = enable;
611        self
612    }
613
614    /// Get discovery limits.
615    pub const fn get_discovery_limit(&self) -> u64 {
616        self.discovery_only_if_under_num
617    }
618
619    /// Sets the number of active connections over which we interrupt the discovery process.
620    pub const fn with_discovery_limit(mut self, num: u64) -> Self {
621        self.discovery_only_if_under_num = num;
622        self
623    }
624
625    /// Get allow_local_address_in_dht.
626    pub const fn get_allow_private_address_in_dht(&self) -> bool {
627        self.allow_private_address_in_dht
628    }
629
630    /// Whether to allow local addresses in the DHT.
631    pub const fn with_allow_private_address_in_dht(
632        mut self,
633        allow: bool,
634    ) -> Self {
635        self.allow_private_address_in_dht = allow;
636        self
637    }
638
639    /// Get allow_dns_address_in_dht.
640    pub const fn get_allow_dns_address_in_dht(&self) -> bool {
641        self.allow_dns_address_in_dht
642    }
643
644    /// Whether to allow non-global addresses in the DHT.
645    pub const fn with_allow_dns_address_in_dht(mut self, allow: bool) -> Self {
646        self.allow_dns_address_in_dht = allow;
647        self
648    }
649
650    /// Get allow_non_globals_in_dht.
651    pub const fn get_allow_loop_back_address_in_dht(&self) -> bool {
652        self.allow_loop_back_address_in_dht
653    }
654
655    /// Whether to allow non-global addresses in the DHT.
656    pub const fn with_allow_loop_back_address_in_dht(
657        mut self,
658        allow: bool,
659    ) -> Self {
660        self.allow_loop_back_address_in_dht = allow;
661        self
662    }
663
664    /// Get allow kademlia disjoint query paths
665    pub const fn get_kademlia_disjoint_query_paths(&self) -> bool {
666        self.kademlia_disjoint_query_paths
667    }
668
669    /// When enabled the number of disjoint paths used equals the configured parallelism.
670    pub const fn with_kademlia_disjoint_query_paths(
671        mut self,
672        enable: bool,
673    ) -> Self {
674        self.kademlia_disjoint_query_paths = enable;
675        self
676    }
677}
678
679/// A node in the routing table.
680#[derive(Clone, Debug, Deserialize, Serialize)]
681pub struct RoutingNode {
682    /// Peer ID.
683    pub peer_id: String,
684    /// Address.
685    pub address: Vec<String>,
686}