Skip to main content

ave_network/
routing.rs

1use futures_timer::Delay;
2
3use libp2p::{
4    Multiaddr, PeerId, StreamProtocol,
5    core::Endpoint,
6    futures::FutureExt,
7    kad::{
8        Behaviour as Kademlia, BootstrapError, Config as KademliaConfig,
9        Event as KademliaEvent, GetClosestPeersError, GetClosestPeersOk,
10        K_VALUE, PeerInfo, QueryResult,
11        store::{MemoryStore, MemoryStoreConfig},
12    },
13    swarm::{
14        CloseConnection, ConnectionDenied, ConnectionId, FromSwarm,
15        NetworkBehaviour, THandler, ToSwarm,
16    },
17};
18use serde::{Deserialize, Serialize};
19
20use std::{
21    collections::{HashMap, VecDeque},
22    task::{Poll, Waker},
23    time::Duration,
24};
25
26use crate::{NodeType, utils::LimitsConfig};
27
28#[cfg(not(any(test, feature = "test")))]
29use crate::utils::{is_dns, is_global, is_loop_back, is_private, is_tcp};
30
31/// The discovery behaviour.
32pub struct Behaviour {
33    /// Boolean that activates the random walk if the node has already finished the initial pre-routing phase.
34    pre_routing: bool,
35
36    /// Kademlia behavior.
37    kademlia: Kademlia<MemoryStore>,
38
39    waker: Option<Waker>,
40
41    /// The next random walk in the Kademlia DHT. `None` if random walks are disabled.
42    next_random_walk: Option<Delay>,
43
44    /// Duration between random walks.
45    duration_to_next_kad: Duration,
46
47    /// Number of nodes we're currently connected to.
48    num_connections: u64,
49
50    /// Number of active connections over which we interrupt the discovery process.
51    discovery_only_if_under_num: u64,
52
53    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
54    allow_private_address_in_dht: bool,
55
56    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
57    allow_dns_address_in_dht: bool,
58
59    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
60    allow_loop_back_address_in_dht: bool,
61
62    /// Peers to close connection
63    close_connections: VecDeque<(PeerId, Option<ConnectionId>)>,
64
65    peer_to_remove: HashMap<PeerId, u8>,
66}
67
68impl Behaviour {
69    /// Creates a new routing `Behaviour`.
70    pub fn new(
71        peer_id: PeerId,
72        config: Config,
73        protocol: StreamProtocol,
74        node_type: NodeType,
75        limits: LimitsConfig,
76    ) -> Self {
77        let Config {
78            dht_random_walk,
79            discovery_only_if_under_num,
80            allow_dns_address_in_dht,
81            allow_private_address_in_dht,
82            allow_loop_back_address_in_dht,
83            kademlia_disjoint_query_paths,
84        } = config;
85
86        let mut kad_config = KademliaConfig::new(protocol);
87
88        kad_config.disjoint_query_paths(kademlia_disjoint_query_paths);
89        kad_config.set_query_timeout(Duration::from_secs(
90            limits.kademlia_query_timeout,
91        ));
92        kad_config.set_replication_interval(None);
93        kad_config.set_caching(libp2p::kad::Caching::Disabled);
94
95        // By default Kademlia attempts to insert all peers into its routing table once a
96        // dialing attempt succeeds. In order to control which peer is added, disable the
97        // auto-insertion and instead add peers manually.
98        kad_config.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual);
99        kad_config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
100
101        let store = MemoryStore::with_config(
102            peer_id,
103            MemoryStoreConfig {
104                max_records: 0,
105                max_value_bytes: 0,
106                max_providers_per_key: K_VALUE.get(),
107                max_provided_keys: 0,
108            },
109        );
110        let mut kad = Kademlia::with_config(peer_id, store, kad_config);
111
112        if let NodeType::Addressable | NodeType::Bootstrap = node_type {
113            kad.set_mode(Some(libp2p::kad::Mode::Server));
114        } else {
115            kad.set_mode(Some(libp2p::kad::Mode::Client));
116        }
117
118        Self {
119            kademlia: kad,
120            next_random_walk: if dht_random_walk
121                && node_type == NodeType::Bootstrap
122            {
123                Some(Delay::new(Duration::new(0, 0)))
124            } else {
125                None
126            },
127            duration_to_next_kad: Duration::from_secs(1),
128            num_connections: 0,
129            discovery_only_if_under_num,
130            allow_dns_address_in_dht,
131            allow_private_address_in_dht,
132            allow_loop_back_address_in_dht,
133            close_connections: VecDeque::new(),
134            pre_routing: true,
135            waker: None,
136            peer_to_remove: HashMap::new(),
137        }
138    }
139
140    pub fn new_close_connections(
141        &mut self,
142        peer_id: PeerId,
143        connection_id: Option<ConnectionId>,
144    ) {
145        self.close_connections.push_back((peer_id, connection_id));
146
147        if let Some(waker) = self.waker.take() {
148            waker.wake();
149        }
150    }
151
152    pub const fn finish_prerouting_state(&mut self) {
153        self.pre_routing = false;
154    }
155
156    /// Returns true if the given peer is known.
157    pub fn is_known_peer(&mut self, peer_id: &PeerId) -> bool {
158        for b in self.kademlia.kbuckets() {
159            if b.iter().any(|x| peer_id == x.node.key.preimage()) {
160                return true;
161            }
162        }
163
164        false
165    }
166
167    pub fn add_peer_to_remove(&mut self, peer_id: &PeerId) {
168        let count = self
169            .peer_to_remove
170            .entry(*peer_id)
171            .and_modify(|x| *x += 1)
172            .or_insert(1);
173
174        if *count >= 3 {
175            self.remove_node(peer_id);
176            self.clean_peer_to_remove(peer_id);
177        }
178    }
179
180    pub fn clean_peer_to_remove(&mut self, peer_id: &PeerId) {
181        self.peer_to_remove.remove(peer_id);
182    }
183
184    /// Add a self-reported address of a remote peer to the k-buckets of the DHT
185    /// if it has compatible `supported_protocols`.
186    ///
187    /// **Note**: It is important that you call this method. The discovery mechanism will not
188    /// automatically add connecting peers to the Kademlia k-buckets.
189    pub fn add_self_reported_address(
190        &mut self,
191        peer_id: &PeerId,
192        addr: &Multiaddr,
193    ) -> bool {
194        if self.is_invalid_address(addr) {
195            return false;
196        }
197
198        self.kademlia.add_address(peer_id, addr.clone());
199        true
200    }
201
202    #[cfg(any(test, feature = "test"))]
203    pub fn is_invalid_address(&self, _addr: &Multiaddr) -> bool {
204        return false;
205    }
206
207    #[cfg(not(any(test, feature = "test")))]
208    pub fn is_invalid_address(&self, addr: &Multiaddr) -> bool {
209        {
210            // Our transport is TPC only
211            if !is_tcp(addr) {
212                return true;
213            }
214
215            if is_private(addr) {
216                return !self.allow_private_address_in_dht;
217            }
218
219            if is_loop_back(addr) {
220                return !self.allow_loop_back_address_in_dht;
221            }
222
223            if is_dns(addr) {
224                return !self.allow_dns_address_in_dht;
225            }
226
227            !is_global(addr)
228        }
229    }
230
231    /// Discover closet peers to the given `PeerId`.
232    pub fn discover(&mut self, peer_id: &PeerId) {
233        self.kademlia.get_closest_peers(*peer_id);
234    }
235
236    /// Remove node from the DHT.
237    pub fn remove_node(&mut self, peer_id: &PeerId) {
238        self.kademlia.remove_peer(peer_id);
239    }
240}
241
242/// Event generated by the `DiscoveryBehaviour`.
243#[derive(Debug)]
244pub enum Event {
245    /// Closest peers to a given key have been found
246    ClosestPeer {
247        peer_id: PeerId,
248        info: Option<PeerInfo>,
249    },
250}
251
252impl NetworkBehaviour for Behaviour {
253    type ConnectionHandler =
254        <Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler;
255    type ToSwarm = Event;
256
257    fn handle_established_inbound_connection(
258        &mut self,
259        connection_id: ConnectionId,
260        peer: PeerId,
261        local_addr: &Multiaddr,
262        remote_addr: &Multiaddr,
263    ) -> Result<THandler<Self>, ConnectionDenied> {
264        self.kademlia.handle_established_inbound_connection(
265            connection_id,
266            peer,
267            local_addr,
268            remote_addr,
269        )
270    }
271
272    fn handle_established_outbound_connection(
273        &mut self,
274        connection_id: ConnectionId,
275        peer: PeerId,
276        addr: &Multiaddr,
277        role_override: Endpoint,
278        port_use: libp2p::core::transport::PortUse,
279    ) -> Result<THandler<Self>, ConnectionDenied> {
280        self.kademlia.handle_established_outbound_connection(
281            connection_id,
282            peer,
283            addr,
284            role_override,
285            port_use,
286        )
287    }
288
289    fn on_swarm_event(&mut self, event: FromSwarm) {
290        match event {
291            FromSwarm::AddressChange(..)
292            | FromSwarm::DialFailure(..)
293            | FromSwarm::ExpiredListenAddr(..)
294            | FromSwarm::ExternalAddrConfirmed(..)
295            | FromSwarm::ExternalAddrExpired(..)
296            | FromSwarm::ListenerClosed(..)
297            | FromSwarm::ListenFailure(..)
298            | FromSwarm::ListenerError(..)
299            | FromSwarm::NewListener(..)
300            | FromSwarm::NewListenAddr(..)
301            | FromSwarm::NewExternalAddrCandidate(..)
302            | FromSwarm::NewExternalAddrOfPeer(..) => {
303                self.kademlia.on_swarm_event(event);
304            }
305            FromSwarm::ConnectionEstablished(e) => {
306                self.num_connections += 1;
307                self.kademlia
308                    .on_swarm_event(FromSwarm::ConnectionEstablished(e));
309            }
310            FromSwarm::ConnectionClosed(e) => {
311                self.num_connections = self.num_connections.saturating_sub(1);
312                self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
313            }
314            _ => self.kademlia.on_swarm_event(event),
315        }
316    }
317
318    fn on_connection_handler_event(
319        &mut self,
320        peer_id: PeerId,
321        connection_id: libp2p::swarm::ConnectionId,
322        event: libp2p::swarm::THandlerOutEvent<Self>,
323    ) {
324        self.kademlia.on_connection_handler_event(
325            peer_id,
326            connection_id,
327            event,
328        );
329    }
330
331    fn poll(
332        &mut self,
333        cx: &mut std::task::Context<'_>,
334    ) -> std::task::Poll<
335        libp2p::swarm::ToSwarm<
336            Self::ToSwarm,
337            libp2p::swarm::THandlerInEvent<Self>,
338        >,
339    > {
340        if let Some((peer_id, connection_id)) =
341            self.close_connections.pop_front()
342        {
343            if let Some(connection_id) = connection_id {
344                return Poll::Ready(ToSwarm::CloseConnection {
345                    peer_id,
346                    connection: CloseConnection::One(connection_id),
347                });
348            } else {
349                return Poll::Ready(ToSwarm::CloseConnection {
350                    peer_id,
351                    connection: CloseConnection::All,
352                });
353            }
354        }
355
356        while let Poll::Ready(ev) = self.kademlia.poll(cx) {
357            match ev {
358                ToSwarm::GenerateEvent(ev) => {
359                    match ev {
360                        KademliaEvent::RoutablePeer { peer, address } => {
361                            self.add_self_reported_address(&peer, &address);
362                        }
363                        KademliaEvent::ModeChanged { .. }
364                        | KademliaEvent::InboundRequest { .. }
365                        | KademliaEvent::UnroutablePeer { .. }
366                        | KademliaEvent::PendingRoutablePeer { .. }
367                        | KademliaEvent::RoutingUpdated { .. } => {
368                            // We are not interested in this event at the moment.
369                        }
370                        KademliaEvent::OutboundQueryProgressed {
371                            result,
372                            ..
373                        } => {
374                            match result {
375                                QueryResult::Bootstrap(bootstrap_ok) => {
376                                    match bootstrap_ok {
377                                        Ok(ok) => {
378                                            self.clean_peer_to_remove(&ok.peer);
379                                        }
380                                        Err(e) => {
381                                            let BootstrapError::Timeout {
382                                                peer,
383                                                ..
384                                            } = e;
385                                            self.add_peer_to_remove(&peer);
386                                        }
387                                    };
388                                }
389                                QueryResult::GetClosestPeers(
390                                    get_closest_peers_ok,
391                                ) => {
392                                    // kademlia.get_closest_peers(PeerId)
393                                    match get_closest_peers_ok {
394                                        Ok(GetClosestPeersOk {
395                                            key,
396                                            peers,
397                                        }) => {
398                                            if let Ok(peer_id) =
399                                                PeerId::from_bytes(&key)
400                                            {
401                                                if let Some(info) =
402                                                    peers.iter().find(|x| {
403                                                        x.peer_id == peer_id
404                                                    })
405                                                {
406                                                    return Poll::Ready(
407                                                    ToSwarm::GenerateEvent(
408                                                        Event::ClosestPeer {
409                                                            peer_id,
410                                                            info: Some(
411                                                                info.clone(),
412                                                            ),
413                                                        },
414                                                    ),
415                                                );
416                                                } else {
417                                                    return Poll::Ready(
418                                                    ToSwarm::GenerateEvent(
419                                                        Event::ClosestPeer {
420                                                            peer_id,
421                                                            info: None,
422                                                        },
423                                                    ),
424                                                );
425                                                }
426                                            };
427                                        }
428                                        Err(
429                                            GetClosestPeersError::Timeout {
430                                                key,
431                                                ..
432                                            },
433                                        ) => {
434                                            if let Ok(peer_id) =
435                                                PeerId::from_bytes(&key)
436                                            {
437                                                return Poll::Ready(
438                                                    ToSwarm::GenerateEvent(
439                                                        Event::ClosestPeer {
440                                                            peer_id,
441                                                            info: None,
442                                                        },
443                                                    ),
444                                                );
445                                            };
446                                        }
447                                    }
448                                }
449                                QueryResult::GetProviders(..)
450                                | QueryResult::StartProviding(..)
451                                | QueryResult::RepublishProvider(..)
452                                | QueryResult::GetRecord(..)
453                                | QueryResult::PutRecord(..)
454                                | QueryResult::RepublishRecord(..) => {
455                                    // We are not interested in this event at the moment.
456                                }
457                            };
458                        }
459                    }
460                }
461                ToSwarm::Dial { opts } => {
462                    return Poll::Ready(ToSwarm::Dial { opts });
463                }
464                ToSwarm::NotifyHandler {
465                    peer_id,
466                    handler,
467                    event,
468                } => {
469                    return Poll::Ready(ToSwarm::NotifyHandler {
470                        peer_id,
471                        handler,
472                        event,
473                    });
474                }
475                ToSwarm::CloseConnection {
476                    peer_id,
477                    connection,
478                } => {
479                    return Poll::Ready(ToSwarm::CloseConnection {
480                        peer_id,
481                        connection,
482                    });
483                }
484                ToSwarm::ExternalAddrConfirmed(e) => {
485                    return Poll::Ready(ToSwarm::ExternalAddrConfirmed(e));
486                }
487                ToSwarm::ExternalAddrExpired(e) => {
488                    return Poll::Ready(ToSwarm::ExternalAddrExpired(e));
489                }
490                ToSwarm::ListenOn { opts } => {
491                    return Poll::Ready(ToSwarm::ListenOn { opts });
492                }
493                ToSwarm::NewExternalAddrCandidate(e) => {
494                    return Poll::Ready(ToSwarm::NewExternalAddrCandidate(e));
495                }
496                ToSwarm::RemoveListener { id } => {
497                    return Poll::Ready(ToSwarm::RemoveListener { id });
498                }
499                _ => {}
500            }
501        }
502
503        // Poll the stream that fires when we need to start a random Kademlia query.
504        if !self.pre_routing
505            && let Some(next) = self.next_random_walk.as_mut()
506            && next.poll_unpin(cx).is_ready()
507        {
508            if self.num_connections < self.discovery_only_if_under_num {
509                self.kademlia.get_closest_peers(PeerId::random());
510            }
511
512            *next = Delay::new(self.duration_to_next_kad);
513            self.duration_to_next_kad = std::cmp::min(
514                self.duration_to_next_kad * 2,
515                Duration::from_secs(120),
516            );
517        }
518
519        self.waker = Some(cx.waker().clone());
520        Poll::Pending
521    }
522
523    fn handle_pending_inbound_connection(
524        &mut self,
525        connection_id: libp2p::swarm::ConnectionId,
526        local_addr: &Multiaddr,
527        remote_addr: &Multiaddr,
528    ) -> Result<(), libp2p::swarm::ConnectionDenied> {
529        self.kademlia.handle_pending_inbound_connection(
530            connection_id,
531            local_addr,
532            remote_addr,
533        )
534    }
535
536    fn handle_pending_outbound_connection(
537        &mut self,
538        connection_id: libp2p::swarm::ConnectionId,
539        maybe_peer: Option<PeerId>,
540        addresses: &[Multiaddr],
541        effective_role: libp2p::core::Endpoint,
542    ) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
543        let addresses = self.kademlia.handle_pending_outbound_connection(
544            connection_id,
545            maybe_peer,
546            addresses,
547            effective_role,
548        )?;
549
550        let filter_addresses = addresses
551            .iter()
552            .filter(|x| !self.is_invalid_address(x))
553            .cloned()
554            .collect::<Vec<Multiaddr>>();
555
556        if filter_addresses.len() != addresses.len() {
557            if let Some(peer_id) = maybe_peer {
558                self.kademlia.remove_peer(&peer_id);
559                for addr in filter_addresses.iter() {
560                    self.kademlia.add_address(&peer_id, addr.clone());
561                }
562            }
563        } else if filter_addresses.is_empty()
564            && let Some(peer_id) = maybe_peer
565        {
566            self.peer_to_remove.remove(&peer_id);
567            self.kademlia.remove_peer(&peer_id);
568        }
569
570        Ok(filter_addresses)
571    }
572}
573
574/// Configuration for the routing behaviour.
575#[derive(Clone, Debug, Deserialize, Serialize)]
576#[serde(default)]
577pub struct Config {
578    /// Whether to enable random walks in the Kademlia DHT.
579    dht_random_walk: bool,
580
581    /// Number of active connections over which we interrupt the discovery process.
582    discovery_only_if_under_num: u64,
583
584    allow_private_address_in_dht: bool,
585
586    allow_dns_address_in_dht: bool,
587
588    allow_loop_back_address_in_dht: bool,
589
590    /// When enabled the number of disjoint paths used equals the configured parallelism.
591    kademlia_disjoint_query_paths: bool,
592}
593
594impl Default for Config {
595    fn default() -> Self {
596        Self {
597            dht_random_walk: true,
598            discovery_only_if_under_num: 25,
599            allow_private_address_in_dht: Default::default(),
600            allow_dns_address_in_dht: Default::default(),
601            allow_loop_back_address_in_dht: Default::default(),
602            kademlia_disjoint_query_paths: true,
603        }
604    }
605}
606
607impl Config {
608    /// Get DHT random walk.
609    pub const fn get_dht_random_walk(&self) -> bool {
610        self.dht_random_walk
611    }
612
613    /// Enables or disables random walks in the Kademlia DHT.
614    pub const fn with_dht_random_walk(mut self, enable: bool) -> Self {
615        self.dht_random_walk = enable;
616        self
617    }
618
619    /// Get discovery limits.
620    pub const fn get_discovery_limit(&self) -> u64 {
621        self.discovery_only_if_under_num
622    }
623
624    /// Sets the number of active connections over which we interrupt the discovery process.
625    pub const fn with_discovery_limit(mut self, num: u64) -> Self {
626        self.discovery_only_if_under_num = num;
627        self
628    }
629
630    /// Get allow_local_address_in_dht.
631    pub const fn get_allow_private_address_in_dht(&self) -> bool {
632        self.allow_private_address_in_dht
633    }
634
635    /// Whether to allow local addresses in the DHT.
636    pub const fn with_allow_private_address_in_dht(
637        mut self,
638        allow: bool,
639    ) -> Self {
640        self.allow_private_address_in_dht = allow;
641        self
642    }
643
644    /// Get allow_dns_address_in_dht.
645    pub const fn get_allow_dns_address_in_dht(&self) -> bool {
646        self.allow_dns_address_in_dht
647    }
648
649    /// Whether to allow non-global addresses in the DHT.
650    pub const fn with_allow_dns_address_in_dht(mut self, allow: bool) -> Self {
651        self.allow_dns_address_in_dht = allow;
652        self
653    }
654
655    /// Get allow_non_globals_in_dht.
656    pub const fn get_allow_loop_back_address_in_dht(&self) -> bool {
657        self.allow_loop_back_address_in_dht
658    }
659
660    /// Whether to allow non-global addresses in the DHT.
661    pub const fn with_allow_loop_back_address_in_dht(
662        mut self,
663        allow: bool,
664    ) -> Self {
665        self.allow_loop_back_address_in_dht = allow;
666        self
667    }
668
669    /// Get allow kademlia disjoint query paths
670    pub const fn get_kademlia_disjoint_query_paths(&self) -> bool {
671        self.kademlia_disjoint_query_paths
672    }
673
674    /// When enabled the number of disjoint paths used equals the configured parallelism.
675    pub const fn with_kademlia_disjoint_query_paths(
676        mut self,
677        enable: bool,
678    ) -> Self {
679        self.kademlia_disjoint_query_paths = enable;
680        self
681    }
682}
683
684/// A node in the routing table.
685#[derive(Clone, Debug, Deserialize, Serialize)]
686pub struct RoutingNode {
687    /// Peer ID.
688    pub peer_id: String,
689    /// Address.
690    pub address: Vec<String>,
691}